From ead82b946d597f6d8e38915e881f31f5a61eddce Mon Sep 17 00:00:00 2001 From: even <827656971@qq.com> Date: Fri, 30 May 2025 14:52:57 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=A7=E8=A1=8C=E9=80=BB=E8=BE=91=E4=BF=AE?= =?UTF-8?q?=E6=94=B9-=E6=AF=8F=E6=AC=A1=E5=90=AF=E5=8A=A8=E6=A0=87?= =?UTF-8?q?=E8=AE=B0=E4=B8=8A=E4=B8=80=E6=AC=A1=E6=89=A7=E8=A1=8C=E4=B8=BA?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E5=A4=B1=E8=B4=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../engine/dispatcher/BaseDispatcher.java | 1 + .../dispatcher/impl/ParallelDispatcher.java | 8 ++++++-- .../dispatcher/impl/SerialDispatcher.java | 7 ++++++- .../executor/impl/DefaultPipelineExecutor.java | 1 + .../manager/impl/DefaultRunContextManager.java | 14 ++++++++++++++ .../engine/runContext/BaseRunContext.java | 17 ++++++++++++++++- 6 files changed, 44 insertions(+), 4 deletions(-) diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/BaseDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/BaseDispatcher.java index 0ea126dc..756a96af 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/BaseDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/BaseDispatcher.java @@ -2,6 +2,7 @@ package cd.casic.ci.process.engine.dispatcher; import cd.casic.ci.process.engine.runContext.BaseRunContext; import cd.casic.ci.process.process.dataObject.base.PipBaseElement; +import lombok.extern.slf4j.Slf4j; import java.util.List; diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java index 1d973640..ce4dda1c 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java @@ -8,6 +8,7 @@ import cd.casic.ci.process.engine.runContext.SecondStageRunContext; import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import cd.casic.ci.process.process.dataObject.stage.PipStage; import cd.casic.framework.mq.redis.core.RedisMQTemplate; +import lombok.extern.slf4j.Slf4j; import org.springframework.core.task.TaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.CollectionUtils; @@ -19,7 +20,7 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; - +@Slf4j public class ParallelDispatcher implements BaseDispatcher{ private List firstStageList; @@ -34,7 +35,6 @@ public class ParallelDispatcher implements BaseDispatcher{ this.pipelineRunContext = context; this.stageIndex = 0; this.runContextManager = contextManager; - contextManager.contextRegister(context); this.redisMQTemplate = redisMQTemplate; this.taskExecutor = taskExecutor; } @@ -59,6 +59,10 @@ public class ParallelDispatcher implements BaseDispatcher{ } // 等待当前阶段执行 latch.await(); + if (pipelineRunContext.getState().get()== ContextStateEnum.BAD_ENDING.getCode()) { + log.error("并行执行停止"); + break; + } // TODO 检查是否全部执行成功 ,目前没有逻辑就是忽略错误 // 当前执行失败 // while (pipelineRunContext.getState().get() != ContextStateEnum.RUNNING.getCode()) { diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java index 14315275..996dc436 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java @@ -11,12 +11,13 @@ import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; import cd.casic.ci.process.process.dataObject.stage.PipStage; import cd.casic.ci.process.process.dataObject.task.PipTask; import cd.casic.framework.mq.redis.core.RedisMQTemplate; +import lombok.extern.slf4j.Slf4j; import java.util.HashMap; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; - +@Slf4j public class SerialDispatcher implements BaseDispatcher { private SecondStageRunContext stageRunContext; private List taskList; @@ -57,6 +58,10 @@ public class SerialDispatcher implements BaseDispatcher { taskRunContext.pause(); } // + if (state.get()== ContextStateEnum.BAD_ENDING.getCode()) { + log.error("串行执行停止"); + break; + } } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java index 233736cb..260b26ca 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java @@ -61,6 +61,7 @@ public class DefaultPipelineExecutor implements PipelineExecutor { } // 如果要做 容灾就需要重新将数据库存的记录按顺序加载入 PipelineRunContext pipelineRunContext = new PipelineRunContext(pipeline,childCount,null,new ConcurrentHashMap<>(),new ConcurrentHashMap<>()); + runContextManager.contextRegister(pipelineRunContext); ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager,redisMQTemplate,serialExecutor); parallelExecutor.execute(parallelDispatcher); return pipelineRunContext; diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultRunContextManager.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultRunContextManager.java index 7c4bfb1c..1e0a89b6 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultRunContextManager.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultRunContextManager.java @@ -1,5 +1,6 @@ package cd.casic.ci.process.engine.manager.impl; +import cd.casic.ci.process.engine.enums.ContextStateEnum; import cd.casic.ci.process.engine.manager.RunContextManager; import cd.casic.ci.process.engine.runContext.BaseRunContext; import cd.casic.ci.process.engine.runContext.PipelineRunContext; @@ -10,6 +11,7 @@ import cd.casic.framework.commons.exception.ServiceException; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; import org.springframework.stereotype.Component; +import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -51,6 +53,10 @@ public class DefaultRunContextManager implements RunContextManager { String id = contextDef.getId(); BaseRunContext parentContext = context.getParentContext(); if (context instanceof PipelineRunContext pipelineRunContext) { + if (contextMap.containsKey(id)) { + PipelineRunContext oldPipeline = contextMap.get(id); + oldPipeline.changeContextStateAndChild(ContextStateEnum.BAD_ENDING); + } contextMap.put(id,pipelineRunContext); } else { if (parentContext==null) { @@ -84,4 +90,12 @@ public class DefaultRunContextManager implements RunContextManager { } return null; } + public void changePipelineState(String pipelineId,ContextStateEnum stateEnum){ + PipelineRunContext pipelineRunContext = contextMap.get(pipelineId); + if (pipelineRunContext==null) { + return; + } + pipelineRunContext.changeContextStateAndChild(stateEnum); + } + } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java index 2e2fa2db..3aa552b9 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java @@ -8,8 +8,10 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.data.annotation.Transient; +import org.springframework.util.CollectionUtils; import java.time.LocalDateTime; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -72,10 +74,22 @@ public abstract class BaseRunContext { if (ContextStateEnum.canGoto(curr,stateEnum)) { state.compareAndExchange(curr.getCode(),stateEnum.getCode()); // 如果之前有暂停监听状态的.则停止暂停 - unpause(); +// unpause(); callParentChange(stateEnum); } } + public void changeContextStateAndChild(ContextStateEnum stateEnum){ + ContextStateEnum curr = ContextStateEnum.getByCode(state.get()); + if (ContextStateEnum.canGoto(curr,stateEnum)) { + state.compareAndExchange(curr.getCode(),stateEnum.getCode()); + Collection values = this.getChildContext().values(); + if (!CollectionUtils.isEmpty(values)) { + for (BaseRunContext value : values) { + value.changeContextStateAndChild(stateEnum); + } + } + } + } // 保证一直都操作同一个引用的值 private void setState(AtomicInteger state) { this.state = state; @@ -92,6 +106,7 @@ public abstract class BaseRunContext { } if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) { this.endTime=LocalDateTime.now(); + unpause(); parentContext.checkChildEnd(); } else if(ContextStateEnum.READY.equals(state)){ parentContext.checkChildReady();