执行器相关逻辑

This commit is contained in:
even 2025-05-19 15:22:16 +08:00
parent 889026d53c
commit ab924c0e6b
3 changed files with 3 additions and 0 deletions

View File

@ -53,6 +53,7 @@ public class SerialDispatcher implements BaseDispatcher {
&& state.get() != ContextStateEnum.BAD_ENDING.getCode()) { && state.get() != ContextStateEnum.BAD_ENDING.getCode()) {
Thread.sleep(1000L); Thread.sleep(1000L);
} }
//
} }
} }

View File

@ -52,6 +52,7 @@ public class DefaultPipelineExecutor implements PipelineExecutor {
if (CollectionUtils.isEmpty(mainStage)) { if (CollectionUtils.isEmpty(mainStage)) {
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到有效阶段信息"); throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到有效阶段信息");
} }
// 如果要做 容灾就需要重新将数据库存的记录按顺序加载入
PipelineRunContext pipelineRunContext = new PipelineRunContext(null,pipeline,new ConcurrentHashMap<>(),new ConcurrentHashMap<>()); PipelineRunContext pipelineRunContext = new PipelineRunContext(null,pipeline,new ConcurrentHashMap<>(),new ConcurrentHashMap<>());
ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager,redisMQTemplate,serialExecutor); ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager,redisMQTemplate,serialExecutor);
parallelExecutor.execute(parallelDispatcher); parallelExecutor.execute(parallelDispatcher);

View File

@ -36,6 +36,7 @@ public abstract class BaseWorker implements Runnable{
BaseRunContext context = contextManager.getContext(contextKey); BaseRunContext context = contextManager.getContext(contextKey);
if (context instanceof TaskRunContext taskRunContext){ if (context instanceof TaskRunContext taskRunContext){
try { try {
taskRunContext.changeContextState(ContextStateEnum.RUNNING);
execute(taskRunContext); execute(taskRunContext);
} catch (Exception e) { } catch (Exception e) {
taskRunContext.changeContextState(ContextStateEnum.BAD_ENDING); taskRunContext.changeContextState(ContextStateEnum.BAD_ENDING);