From f0b36fad6dfa302977371e724c9c60c753ce43a9 Mon Sep 17 00:00:00 2001 From: even <827656971@qq.com> Date: Thu, 29 May 2025 10:15:10 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A4=B1=E8=B4=A5=E7=8A=B6=E6=80=81=E9=80=BB?= =?UTF-8?q?=E8=BE=91=E6=9B=B4=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dispatcher/impl/ParallelDispatcher.java | 5 +++ .../dispatcher/impl/SerialDispatcher.java | 3 +- .../engine/runContext/BaseRunContext.java | 35 +++++++++++++++++-- .../engine/worker/base/BaseWorker.java | 1 + 4 files changed, 40 insertions(+), 4 deletions(-) diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java index c3b4832c..66c1ac40 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java @@ -60,6 +60,11 @@ public class ParallelDispatcher implements BaseDispatcher{ // 等待当前阶段执行 latch.await(); // TODO 检查是否全部执行成功 ,目前没有逻辑就是忽略错误 + // 当前执行失败 + while (pipelineRunContext.getState().get() != ContextStateEnum.RUNNING.getCode()) { + // 想办法借助工具类 或者直接wait + pipelineRunContext.pause(); + } } } @Override diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java index e600efc3..82a608b0 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java @@ -51,7 +51,8 @@ public class SerialDispatcher implements BaseDispatcher { AtomicInteger state = taskRunContext.getState(); while (state.get() != ContextStateEnum.HAPPY_ENDING.getCode() && state.get() != ContextStateEnum.BAD_ENDING.getCode()) { - Thread.sleep(1000L); +// Thread.sleep(1000L); + taskRunContext.pause(); } // } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java index d93d762d..28186a38 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java @@ -6,13 +6,16 @@ import cd.casic.framework.commons.exception.ServiceException; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.Data; +import lombok.extern.slf4j.Slf4j; import org.springframework.data.annotation.Transient; import java.time.LocalDateTime; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; @Data +@Slf4j public abstract class BaseRunContext { /** * 当前上下文的定义 @@ -46,6 +49,10 @@ public abstract class BaseRunContext { * */ private Map localVariables; private Map childContext; + /** + * 用来在控制其他地方的阻塞放行,countDown为1 + * */ + private CountDownLatch countDownLatch; public BaseRunContext(PipBaseElement contextDef,Integer childCount,BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetVersionId, String targetType, Map globalVariables, Map localVariables, Map childContext) { this.contextDef = contextDef; @@ -81,6 +88,8 @@ public abstract class BaseRunContext { if (parentContext==null) { return; } + // 如果之前有暂停监听状态的.则停止暂停 + unpause(); if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) { this.endTime=LocalDateTime.now(); parentContext.checkChildEnd(); @@ -96,11 +105,12 @@ public abstract class BaseRunContext { * */ public void checkChildEnd() throws ServiceException{ Map childContext = getChildContext(); - if (childContext.size()!=childCount) { - return; - } + int result = ContextStateEnum.HAPPY_ENDING.getCode(); for (Map.Entry entry : childContext.entrySet()) { + if (childContext.size()!=childCount) { + return; + } BaseRunContext child = entry.getValue(); int state = child.getState().get(); if (!ContextStateEnum.HAPPY_ENDING.getCode().equals(state)&&!ContextStateEnum.BAD_ENDING.getCode().equals(state)) { @@ -163,4 +173,23 @@ public abstract class BaseRunContext { } } } + public void pause(){ + if (countDownLatch==null) { + synchronized(this) { + if (this.countDownLatch == null) { + this.countDownLatch= new CountDownLatch(1); + } + } + } + try { + this.countDownLatch.await(); + } catch (InterruptedException e) { + log.error(e.getMessage()); + } + } + private void unpause(){ + if (this.countDownLatch!=null) { + this.countDownLatch.countDown(); + } + } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/base/BaseWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/base/BaseWorker.java index be64100f..0d630482 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/base/BaseWorker.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/base/BaseWorker.java @@ -68,6 +68,7 @@ public abstract class BaseWorker implements Runnable{ } catch (Exception e) { log.error("================worker执行报错:",e); taskRunContext.changeContextState(ContextStateEnum.BAD_ENDING); + append(context,e.getMessage()); return; } // TODO 执行结束修改context的state,并且通知父类