diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java index 99d43b8d..49db9c80 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java @@ -52,12 +52,12 @@ public class ParallelDispatcher implements BaseDispatcher{ for (PipStage secondStage : stageList) { // 二阶段下所有task是串行所以不用关心线程安全相关信息 SecondStageRunContext context = new SecondStageRunContext(secondStage,pipelineRunContext,new ConcurrentHashMap<>()); - runContextManager.contextRegister(context); SerialDispatcher serialDispatcher = new SerialDispatcher(context,latch,runContextManager,redisMQTemplate); // 给线程池进行执行 taskExecutor.execute(serialDispatcher); } + // 等待当前阶段执行 latch.await(); } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java index 515b146b..a1e9bc0b 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java @@ -12,6 +12,7 @@ import cd.casic.ci.process.process.dataObject.stage.PipStage; import cd.casic.ci.process.process.dataObject.task.PipTask; import cd.casic.framework.mq.redis.core.RedisMQTemplate; +import java.util.HashMap; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; @@ -39,12 +40,13 @@ public class SerialDispatcher implements BaseDispatcher { @Override public void dispatch() { for (PipTask pipTask : taskList) { - // 注册taskContext,且发送消息至消息队列给work执行 - TaskRunContext taskRunContext = new TaskRunContext(pipTask,stageRunContext); + // 注册taskContext,且发送消息至消息队列给work执行, 如果需要则传入参数 + TaskRunContext taskRunContext = new TaskRunContext(pipTask,stageRunContext,new HashMap<>()); contextManager.contextRegister(taskRunContext); taskRunContext.changeContextState(ContextStateEnum.READY); TaskRunMessage taskRunMessage = new TaskRunMessage(pipTask); redisMQTemplate.send(taskRunMessage); + // TODO 监听当前taskContext状态 } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/TaskRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/TaskRunContext.java index 1d0d026d..f63d6634 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/TaskRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/TaskRunContext.java @@ -11,8 +11,8 @@ import java.util.HashMap; import java.util.Map; public class TaskRunContext extends BaseRunContext{ - public TaskRunContext(PipTask contextDef, SecondStageRunContext parentContext) { - super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), parentContext.getGlobalVariables(), new HashMap<>(), new HashMap<>()); + public TaskRunContext(PipTask contextDef, SecondStageRunContext parentContext,Map localVariable) { + super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), parentContext.getGlobalVariables(),localVariable, new HashMap<>()); } private TaskRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map globalVariables, Map localVariables, Map childContext) { super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, childContext);