Commit a000c746 by liuyongshuai

1.去除区域碰撞标识号的重复数据,优化效率,这样入库标识号就会少很多,调取关联关系也会节省很多时间;

2.修改区域碰撞的处理逻辑,分两步,首先调取区域碰撞的标识号信息,并更新任务状态位;然后调取标识号的关联数据信息,并任务状态。
parent c88261f7
package com.founder.interservice.regionalanalysis.repository;
import com.founder.interservice.regionalanalysis.model.RegionalTask;
import com.founder.interservice.regionalanalysis.model.RegionalTaskResult;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import java.util.List;
/**
* @ClassName: RegionalRepository
* @Auther: 曹鹏
......@@ -12,5 +15,5 @@ import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
* @Version: 1.0
*/
public interface RegionalTaskResultRepository extends JpaRepository<RegionalTaskResult,String>,JpaSpecificationExecutor<RegionalTaskResult> {
public List<RegionalTaskResult> findAllByTaskId(String taskId);
}
......@@ -72,28 +72,23 @@ public class ScheduledService {
}
});
if(taskList != null && !taskList.isEmpty()){
JSONObject jsonObject = null;
String progress=null ,state=null;
String taskId = null,progress=null,state=null,result=null;Map<String,String> taskMap=null;
for (RegionalTask task:taskList) {
String status_url = REGIONAL_ANALYSIS_TASK_STATUS + "&taskId="+task.getTaskId();
String result = HttpUtil.getWaData(status_url);
//String statusStr = "{\"progress\":0.8,\"state\":\"TIMEOUT\"}";
if(null != result && result.startsWith("{")){
jsonObject = JSONObject.parseObject(result);
}
if(jsonObject!=null){
progress = jsonObject.getString("progress"); //进度
state = jsonObject.getString("state"); //状态
}
taskId = task.getTaskId();
//查询比对结果状态
taskMap = queryTaskStates(taskId);
progress = taskMap.get("progress");
state = taskMap.get("state");
if("1".equals(progress) && "FINISHED".equals(state)){
String info_url = REGIONAL_ANALYSIS_TASK_INFO + "&taskId=" + task.getTaskId();
String info_url = REGIONAL_ANALYSIS_TASK_INFO + "&taskId=" + taskId;
String taskInfoResult = HttpUtil.getWaData(info_url);
//String taskInfoResult = "{\"results\":[],\"status\":\"ok\"}";
//调取结果成功 然后将结果保存入库
if(!taskInfoResult.startsWith("Rate")){
if(null != result && result.startsWith("{")){
if(null != taskInfoResult && taskInfoResult.startsWith("{")){
getAndSaveInfo(taskInfoResult,task);
updateTaskStates(task.getTaskId(), progress, state);
updateTaskStates(taskId,progress,state);
}
}
}
......@@ -105,6 +100,48 @@ public class ScheduledService {
}
/**
* 第二步,调取标识号的关联信息
* 1.将比对碰撞首次调取标识号成功的任务过滤出来即状态位为FINISHED的
* 2.开启新线程调取标识号的关联数据
* 3.调取标识号关联数据完成,然后更新任务
*/
@Scheduled(initialDelay = 120000,fixedDelay = 180000) //项目启动后延迟两分钟执行,每次执行完后两分钟后再次执行
public void queryFinishTaskResult(){
System.out.println("=============开始执行调取关联数据定时任务================");
System.out.println("线程名称========== " + Thread.currentThread().getName());
try{
//1 下去查询任务表中status = "QUEUEING","STARTING","RUNNING"的任务
List<RegionalTask> taskList = regionalTaskRepository.findAll(new Specification<RegionalTask>() {
@Override
public Predicate toPredicate(Root<RegionalTask> root, CriteriaQuery<?> criteriaQuery, CriteriaBuilder criteriaBuilder) {
List<Predicate> list = new ArrayList<>();
Expression<String> exp = root.<String>get("state");
list.add(exp.in(Arrays.asList("FINISHED")));
Predicate[] p = new Predicate[list.size()];
return criteriaBuilder.and(list.toArray(p));
}
});
if(taskList != null && !taskList.isEmpty()){
for (RegionalTask task:taskList) {
List<RegionalTaskResult> taskResults = taskResultRepository.findAllByTaskId(task.getTaskId());
//保存区域碰撞结果翻译后的数据
getAndSaveRydetail(taskResults);
//查询状态,并更新状态
Map<String,String> taskMap = queryTaskStates(task.getTaskId());
String progress = taskMap.get("progress");
String state = taskMap.get("state");
//判断当前任务状态和查询任务状态是否一致,如果不一致 进行更新
if(!state.equals(task.getState())){
updateTaskStates(task.getTaskId(), progress, state);
}
}
}
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 通过区域碰撞的结果 使用第四个接口调取关联数据
*/
public void getAndSaveRydetail(List<RegionalTaskResult> resultsList) throws Exception {
......@@ -184,7 +221,7 @@ public class ScheduledService {
}
/**
* 保存任务结果数据
* 保存任务结果数据(调取标识号完成,即第一步完成)
*
* @param taskInfoResult
* @param task
......@@ -210,8 +247,6 @@ public class ScheduledService {
}
//保存区域碰撞结果数据
taskResultRepository.save(taskResults);
//保存区域碰撞结果翻译后的数据
getAndSaveRydetail(taskResults);
}
}
}catch (Exception e){
......@@ -219,6 +254,32 @@ public class ScheduledService {
}
}
/**
* 查询任务状态
* @param taskId
* @return
*/
public Map<String,String> queryTaskStates(String taskId){
JSONObject jsonObject = null;
String progress=null ,state=null;
Map<String,String> map = new HashMap();
String status_url = REGIONAL_ANALYSIS_TASK_STATUS + "&taskId="+taskId;
String result = HttpUtil.getWaData(status_url);
//String statusStr = "{\"progress\":0.8,\"state\":\"TIMEOUT\"}";
if(null != result && result.startsWith("{")){
jsonObject = JSONObject.parseObject(result);
}
if(jsonObject!=null){
progress = jsonObject.getString("progress"); //进度
state = jsonObject.getString("state"); //状态
}
map.put("progress",progress);
map.put("state",state);
return map;
}
/**
* 修改任务状态
* @param taskId
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment