diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java index 49db9c80..c162b19a 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java @@ -60,7 +60,7 @@ public class ParallelDispatcher implements BaseDispatcher{ // 等待当前阶段执行 latch.await(); } - + // 入库 } @Override public void run() { diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java index a1e9bc0b..3f302f62 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java @@ -38,7 +38,7 @@ public class SerialDispatcher implements BaseDispatcher { } @Override - public void dispatch() { + public void dispatch() throws InterruptedException { for (PipTask pipTask : taskList) { // 注册taskContext,且发送消息至消息队列给work执行, 如果需要则传入参数 TaskRunContext taskRunContext = new TaskRunContext(pipTask,stageRunContext,new HashMap<>()); @@ -46,7 +46,13 @@ public class SerialDispatcher implements BaseDispatcher { taskRunContext.changeContextState(ContextStateEnum.READY); TaskRunMessage taskRunMessage = new TaskRunMessage(pipTask); redisMQTemplate.send(taskRunMessage); - // TODO 监听当前taskContext状态 + // TODO 监听当前taskContext状态变成执行成功或者执行失败(worker当中改变状态为运行中、执行成功、执行失败) + // + AtomicInteger state = taskRunContext.getState(); + while (state.get() != ContextStateEnum.HAPPY_ENDING.getCode() + && state.get() != ContextStateEnum.BAD_ENDING.getCode()) { + Thread.sleep(1000L); + } } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java index 6c45e759..a764465a 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java @@ -61,7 +61,7 @@ public abstract class BaseRunContext { callParentChange(stateEnum); } } - + // 保证一直都操作同一个引用的值 private void setState(AtomicInteger state) { this.state = state; } diff --git a/ops-server/src/test/java/cd/casic/server/RedisMqTest.java b/ops-server/src/test/java/cd/casic/server/RedisMqTest.java index 9510ac9d..42faccfd 100644 --- a/ops-server/src/test/java/cd/casic/server/RedisMqTest.java +++ b/ops-server/src/test/java/cd/casic/server/RedisMqTest.java @@ -46,7 +46,7 @@ public class RedisMqTest { stage.setId("testStage"); stage.setParentId("testPipeline"); SecondStageRunContext secondStageRunContext = new SecondStageRunContext(stage,pipelineRunContext,new ConcurrentHashMap<>()); - TaskRunContext taskRunContext = new TaskRunContext(pipTask,secondStageRunContext); + TaskRunContext taskRunContext = new TaskRunContext(pipTask,secondStageRunContext,new HashMap<>()); contextManager.contextRegister(pipelineRunContext); contextManager.contextRegister(secondStageRunContext); contextManager.contextRegister(taskRunContext);