From 889026d53c70633aea7227de07d612d40265dcf3 Mon Sep 17 00:00:00 2001 From: even <827656971@qq.com> Date: Mon, 19 May 2025 15:12:51 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=B2=E8=A1=8C=E5=88=86=E5=8F=91=E5=99=A8?= =?UTF-8?q?=EF=BC=8C=E6=B7=BB=E5=8A=A0=E9=98=BB=E5=A1=9E=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../engine/dispatcher/impl/ParallelDispatcher.java | 2 +- .../engine/dispatcher/impl/SerialDispatcher.java | 10 ++++++++-- .../ci/process/engine/runContext/BaseRunContext.java | 2 +- .../src/test/java/cd/casic/server/RedisMqTest.java | 2 +- 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java index 49db9c80..c162b19a 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java @@ -60,7 +60,7 @@ public class ParallelDispatcher implements BaseDispatcher{ // 等待当前阶段执行 latch.await(); } - + // 入库 } @Override public void run() { diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java index a1e9bc0b..3f302f62 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java @@ -38,7 +38,7 @@ public class SerialDispatcher implements BaseDispatcher { } @Override - public void dispatch() { + public void dispatch() throws InterruptedException { for (PipTask pipTask : taskList) { // 注册taskContext,且发送消息至消息队列给work执行, 如果需要则传入参数 TaskRunContext taskRunContext = new TaskRunContext(pipTask,stageRunContext,new HashMap<>()); @@ -46,7 +46,13 @@ public class SerialDispatcher implements BaseDispatcher { taskRunContext.changeContextState(ContextStateEnum.READY); TaskRunMessage taskRunMessage = new TaskRunMessage(pipTask); redisMQTemplate.send(taskRunMessage); - // TODO 监听当前taskContext状态 + // TODO 监听当前taskContext状态变成执行成功或者执行失败(worker当中改变状态为运行中、执行成功、执行失败) + // + AtomicInteger state = taskRunContext.getState(); + while (state.get() != ContextStateEnum.HAPPY_ENDING.getCode() + && state.get() != ContextStateEnum.BAD_ENDING.getCode()) { + Thread.sleep(1000L); + } } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java index 6c45e759..a764465a 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java @@ -61,7 +61,7 @@ public abstract class BaseRunContext { callParentChange(stateEnum); } } - + // 保证一直都操作同一个引用的值 private void setState(AtomicInteger state) { this.state = state; } diff --git a/ops-server/src/test/java/cd/casic/server/RedisMqTest.java b/ops-server/src/test/java/cd/casic/server/RedisMqTest.java index 9510ac9d..42faccfd 100644 --- a/ops-server/src/test/java/cd/casic/server/RedisMqTest.java +++ b/ops-server/src/test/java/cd/casic/server/RedisMqTest.java @@ -46,7 +46,7 @@ public class RedisMqTest { stage.setId("testStage"); stage.setParentId("testPipeline"); SecondStageRunContext secondStageRunContext = new SecondStageRunContext(stage,pipelineRunContext,new ConcurrentHashMap<>()); - TaskRunContext taskRunContext = new TaskRunContext(pipTask,secondStageRunContext); + TaskRunContext taskRunContext = new TaskRunContext(pipTask,secondStageRunContext,new HashMap<>()); contextManager.contextRegister(pipelineRunContext); contextManager.contextRegister(secondStageRunContext); contextManager.contextRegister(taskRunContext);