From ddf3ff70699651200266638ec475440e5d87be47 Mon Sep 17 00:00:00 2001 From: even <827656971@qq.com> Date: Sat, 17 May 2025 00:56:49 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E6=89=A7=E8=A1=8C=E5=BC=95=E6=93=8E?= =?UTF-8?q?=E9=83=A8=E5=88=86=E4=BB=A3=E7=A0=81=EF=BC=88=E6=9C=AA=E5=AE=8C?= =?UTF-8?q?=E6=88=90=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../process/engine/config/ExecutorConfig.java | 25 +++++++++ .../engine/dispatcher/BaseDispatcher.java | 4 +- .../dispatcher/impl/ParallelDispatcher.java | 51 +++++++++++++++++-- .../dispatcher/impl/SerialDispatcher.java | 32 ++++++++++-- .../engine/enums/ContextStateEnum.java | 3 ++ .../impl/DefaultPipelineExecutor.java | 19 +++++-- .../engine/manager/RunContextManager.java | 6 +-- .../engine/runContext/BaseRunContext.java | 51 ++++++++++++++++++- .../engine/runContext/MainStageContext.java | 22 -------- .../engine/runContext/PipelineRunContext.java | 13 +++-- ...ontext.java => SecondStageRunContext.java} | 8 +-- .../engine/runContext/TaskRunContext.java | 6 +-- .../dataObject/pipeline/PipPipeline.java | 2 + 13 files changed, 187 insertions(+), 55 deletions(-) create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/config/ExecutorConfig.java delete mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/MainStageContext.java rename modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/{StageRunContext.java => SecondStageRunContext.java} (66%) diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/config/ExecutorConfig.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/config/ExecutorConfig.java new file mode 100644 index 0000000..7ac1c45 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/config/ExecutorConfig.java @@ -0,0 +1,25 @@ +package cd.casic.ci.process.engine.config; + +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; + +@Configuration +public class ExecutorConfig { + @Bean("pipelineExecutor") + public ThreadPoolTaskExecutor taskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(5); + executor.setMaxPoolSize(10); + executor.setQueueCapacity(100); + executor.setThreadNamePrefix("Pipeline-"); + ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy(); + executor.setRejectedExecutionHandler(callerRunsPolicy); + executor.initialize(); // 必须手动触发初始化 + return executor; + } +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/BaseDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/BaseDispatcher.java index b032b82..0ea126d 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/BaseDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/BaseDispatcher.java @@ -5,6 +5,6 @@ import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import java.util.List; -public interface BaseDispatcher { - void dispatch(); +public interface BaseDispatcher extends Runnable{ + void dispatch() throws InterruptedException; } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java index 316f14e..c317d37 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java @@ -1,26 +1,67 @@ package cd.casic.ci.process.engine.dispatcher.impl; import cd.casic.ci.process.engine.dispatcher.BaseDispatcher; +import cd.casic.ci.process.engine.enums.ContextStateEnum; +import cd.casic.ci.process.engine.manager.RunContextManager; import cd.casic.ci.process.engine.runContext.PipelineRunContext; +import cd.casic.ci.process.engine.runContext.SecondStageRunContext; import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import cd.casic.ci.process.process.dataObject.stage.PipStage; +import org.springframework.core.task.TaskExecutor; +import org.springframework.util.CollectionUtils; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; -public class ParallelDispatcher implements BaseDispatcher { +public class ParallelDispatcher implements BaseDispatcher{ private List firstStageList; private Integer stageIndex; - private PipelineRunContext context; + private PipelineRunContext pipelineRunContext; + private RunContextManager runContextManager; - public ParallelDispatcher(List firstStageList, PipelineRunContext context) { + public ParallelDispatcher(List firstStageList, PipelineRunContext context,RunContextManager contextManager) { this.firstStageList = firstStageList; - this.context = context; + this.pipelineRunContext = context; this.stageIndex = 0; + this.runContextManager = contextManager; + contextManager.contextRegister(context); } @Override - public void dispatch() { + public void dispatch() throws InterruptedException { + // 负责依次执行阶段 + for (PipStage stage : firstStageList) { + List stageList = stage.getStageList(); + if (CollectionUtils.isEmpty(stageList)) { + // 此处可以暂时记录日志 + continue; + } + CountDownLatch latch = new CountDownLatch(stageList.size()); + for (PipStage secondStage : stageList) { + // 二阶段下所有task是串行所以不用关心线程安全相关信息 + SecondStageRunContext context = new SecondStageRunContext(secondStage,pipelineRunContext,new HashMap<>(),new HashMap<>()); + + runContextManager.contextRegister(context); + SerialDispatcher serialDispatcher = new SerialDispatcher(context,latch); + // 给线程池进行执行 + } + latch.await(); + } } + @Override + public void run() { + // TODO 计时 + try { + dispatch(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java index 49f2a2d..ff2dc94 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java @@ -1,17 +1,43 @@ package cd.casic.ci.process.engine.dispatcher.impl; import cd.casic.ci.process.engine.dispatcher.BaseDispatcher; -import cd.casic.ci.process.engine.runContext.StageRunContext; +import cd.casic.ci.process.engine.runContext.SecondStageRunContext; import cd.casic.ci.process.process.dataObject.base.PipBaseElement; +import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; +import cd.casic.ci.process.process.dataObject.stage.PipStage; import cd.casic.ci.process.process.dataObject.task.PipTask; import java.util.List; +import java.util.concurrent.CountDownLatch; public class SerialDispatcher implements BaseDispatcher { - private StageRunContext stageRunContext; - private List itemList; + private SecondStageRunContext stageRunContext; + private List taskList; + private CountDownLatch latch; + + public SerialDispatcher(SecondStageRunContext stageRunContext, CountDownLatch latch) { + this.stageRunContext = stageRunContext; + PipBaseElement contextDef = stageRunContext.getContextDef(); + if (contextDef instanceof PipStage) { + this.taskList = ((PipStage)contextDef).getTaskValues(); + } + this.latch = latch; + } + @Override public void dispatch() { + for (PipTask pipTask : taskList) { + // TODO 注册taskContext,且发送消息至消息队列给work执行 + } + } + @Override + public void run() { + try { + dispatch(); + latch.countDown(); + } catch (Exception e) { + throw new RuntimeException(e); + } } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/enums/ContextStateEnum.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/enums/ContextStateEnum.java index 8263f10..a8e8ade 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/enums/ContextStateEnum.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/enums/ContextStateEnum.java @@ -11,6 +11,7 @@ public enum ContextStateEnum { { add(READY); add(SUSPEND); + add(BAD_ENDING); add(STOP); } }), @@ -18,6 +19,7 @@ public enum ContextStateEnum { { add(RUNNING); add(SUSPEND); + add(BAD_ENDING); add(STOP); } }), @@ -34,6 +36,7 @@ public enum ContextStateEnum { add(INIT); add(READY); add(RUNNING); + add(BAD_ENDING); } }), STOP(-1,"停止", new HashSet<>()), diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java index cc0e919..b3a934a 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java @@ -10,10 +10,16 @@ import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; import cd.casic.ci.process.process.dataObject.stage.PipStage; import cd.casic.ci.process.process.service.pipeline.PipelineService; import cd.casic.ci.process.process.service.stage.StageService; +import cd.casic.framework.commons.exception.ServiceException; +import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; import jakarta.annotation.Resource; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import java.time.LocalDateTime; +import java.util.Collection; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -26,6 +32,9 @@ public class DefaultPipelineExecutor implements PipelineExecutor { private StageService stageService; @Resource private RunContextManager runContextManager; + @Resource + @Qualifier("pipelineExecutor") + private ThreadPoolTaskExecutor taskExecutor; @Override public PipelineRunContext execute(String pipelineId) { PipPipeline pipeline = pipelineService.getById(pipelineId); @@ -36,10 +45,12 @@ public class DefaultPipelineExecutor implements PipelineExecutor { String executeStatus = pipeline.getExecuteStatus(); // TODO 如果判断成功则查询所有的阶段信息 List mainStage = stageService.findAllFirstStagesAndChild(pipelineId); + if (CollectionUtils.isEmpty(mainStage)) { + throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到有效阶段信息"); + } PipelineRunContext pipelineRunContext = new PipelineRunContext(null,pipeline,new ConcurrentHashMap<>(),new ConcurrentHashMap<>()); - runContextManager.contextRegister(pipelineRunContext); -// ParallelDispatcher parallelDispatcher = new ParallelDispatcher(); - - return null; + ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager); + taskExecutor.execute(parallelDispatcher); + return pipelineRunContext; } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/RunContextManager.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/RunContextManager.java index f3347c2..b0b8710 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/RunContextManager.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/RunContextManager.java @@ -12,7 +12,7 @@ public interface RunContextManager { * */ Boolean notifyPipeline(String pipelineId); /** - * 挂起流水线-预留 + * 挂起流水线-预留 TODO 根据配置判断是否存入数据库,恢复执行再加载入内存 * */ Boolean suspendPipeline(String pipelineId); /** @@ -20,11 +20,11 @@ public interface RunContextManager { * */ Boolean notifyStage(String pipelineId,String stageId); /** - * 挂起子阶段-预留 + * 挂起子阶段-预留 TODO 根据配置判断是否存入数据库,恢复执行再加载入内存 * */ Boolean suspendStage(String pipelineId,String stageId); /** - * 判断相应的context类型,放入注册Map中 + * 判断相应的context类型,放入注册Map中(自动维护父子context关系) * */ void contextRegister(BaseRunContext context); BaseRunContext getContext(String key); diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java index 65fd83e..94cacbe 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java @@ -63,10 +63,14 @@ public abstract class BaseRunContext { public void callParentChange(ContextStateEnum state){ if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) { checkChildEnd(); + } else if(ContextStateEnum.READY.equals(state)){ + checkChildReady(); + } else if(ContextStateEnum.RUNNING.equals(state)){ + checkChildRunning(); } } /** - * 查找子类是否全部完成,如果子类全部完成则父类也全部完成 + * 查找子类是否全部完成,如果子类全部完成则父类也完成 * */ public void checkChildEnd() throws ServiceException{ int result = ContextStateEnum.HAPPY_ENDING.getCode(); @@ -94,4 +98,49 @@ public abstract class BaseRunContext { throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"状态有误"); } } + /** + * 查找子类是否全部就绪,如果子类全部完成则父类也就绪 + * */ + public void checkChildReady() throws ServiceException{ + int result = ContextStateEnum.READY.getCode(); + for (Map.Entry entry : childContext.entrySet()) { + BaseRunContext child = entry.getValue(); + int state = child.getState().get(); + if (!ContextStateEnum.READY.getCode().equals(state)) { + return; + } + result&=state; + } + boolean ready = false; + if (ContextStateEnum.READY.getCode()==result) { + if (ContextStateEnum.canGoto(ContextStateEnum.getByCode(state.get()),ContextStateEnum.READY)) { + this.state.compareAndExchange(state.get(),ContextStateEnum.READY.getCode()); + ready = true; + } + } + if (!ready) { + throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"状态有误"); + } + } + /** + * 查找子类是否存在开始运行的,如果有则父状态变成running + * */ + public void checkChildRunning() throws ServiceException{ + Boolean runningFlag = false; + for (Map.Entry entry : childContext.entrySet()) { + BaseRunContext child = entry.getValue(); + int state = child.getState().get(); + if (ContextStateEnum.READY.getCode().equals(state)) { + runningFlag = true; + break; + } + } + if (runningFlag) { + if (ContextStateEnum.canGoto(ContextStateEnum.getByCode(state.get()),ContextStateEnum.RUNNING)) { + this.state.compareAndExchange(state.get(),ContextStateEnum.RUNNING.getCode()); + } else{ + throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"状态有误"); + } + } + } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/MainStageContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/MainStageContext.java deleted file mode 100644 index 1486cab..0000000 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/MainStageContext.java +++ /dev/null @@ -1,22 +0,0 @@ -package cd.casic.ci.process.engine.runContext; - -import cd.casic.ci.process.process.dataObject.base.PipBaseElement; - -import java.time.LocalDateTime; -import java.util.Map; - -public class MainStageContext extends BaseRunContext{ - public MainStageContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map globalVariables, Map localVariables, Map childContext) { - super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, childContext); - } - - @Override - public BaseRunContext getChildRunContext(String key) { - return null; - } - - @Override - public void putChildRunContext(String key, BaseRunContext context) { - - } -} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java index 8a4b672..3aedf6f 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java @@ -1,23 +1,22 @@ package cd.casic.ci.process.engine.runContext; -import cd.casic.ci.process.api.process.pojo.Pipeline; -import cd.casic.ci.process.engine.enums.ContextStateEnum; import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; import cd.casic.framework.commons.exception.ServiceException; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; import java.time.LocalDateTime; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; public class PipelineRunContext extends BaseRunContext{ - public PipelineRunContext(BaseRunContext parentContext,PipPipeline pipeline,Map globalVariables,Map localVariables) { - this(pipeline,parentContext,LocalDateTime.now(),pipeline.getResourceId(),pipeline.getTargetId(),pipeline.getTargetType(),globalVariables,localVariables,new ConcurrentHashMap<>()); + public PipelineRunContext(PipPipeline pipeline) { + this(null,pipeline,new ConcurrentHashMap<>(),new ConcurrentHashMap<>()); + } + public PipelineRunContext(BaseRunContext parentContext, PipPipeline pipeline, Map globalVariables, Map localVariables) { + this(pipeline,parentContext,LocalDateTime.now(),pipeline.getResourceId(),pipeline.getTargetId(),pipeline.getTargetType(),globalVariables,localVariables,new ConcurrentHashMap<>()); } private PipelineRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map globalVariables, Map localVariables, Map childContext) { @@ -50,7 +49,7 @@ public class PipelineRunContext extends BaseRunContext{ @Override public void putChildRunContext(String key, BaseRunContext context) { Map childContext = getChildContext(); - if (context instanceof StageRunContext) { + if (context instanceof SecondStageRunContext) { childContext.put(key,context); } else { throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"不支持类型"); diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/StageRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/SecondStageRunContext.java similarity index 66% rename from modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/StageRunContext.java rename to modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/SecondStageRunContext.java index f61223e..d02a623 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/StageRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/SecondStageRunContext.java @@ -1,15 +1,17 @@ package cd.casic.ci.process.engine.runContext; import cd.casic.ci.process.process.dataObject.base.PipBaseElement; +import cd.casic.ci.process.process.dataObject.stage.PipStage; import cd.casic.framework.commons.exception.ServiceException; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; import java.time.LocalDateTime; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; -public class StageRunContext extends BaseRunContext{ - public StageRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map globalVariables, Map localVariables, Map childContext) { - super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, childContext); +public class SecondStageRunContext extends BaseRunContext{ + public SecondStageRunContext(PipStage contextDef, PipelineRunContext parentContext, Map globalVariables, Map localVariables) { + super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), globalVariables, localVariables, new ConcurrentHashMap<>(contextDef.getStageList().size())); } @Override diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/TaskRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/TaskRunContext.java index 89a89f7..a0118e9 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/TaskRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/TaskRunContext.java @@ -4,10 +4,6 @@ import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import cd.casic.ci.process.process.dataObject.task.PipTask; import cd.casic.framework.commons.exception.ServiceException; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.NoArgsConstructor; import org.apache.commons.lang3.StringUtils; import java.time.LocalDateTime; @@ -15,7 +11,7 @@ import java.util.HashMap; import java.util.Map; public class TaskRunContext extends BaseRunContext{ - public TaskRunContext(PipTask contextDef, StageRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map globalVariables, Map localVariables) { + public TaskRunContext(PipTask contextDef, SecondStageRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map globalVariables, Map localVariables) { super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, new HashMap<>()); } private TaskRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map globalVariables, Map localVariables, Map childContext) { diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dataObject/pipeline/PipPipeline.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dataObject/pipeline/PipPipeline.java index 7839298..2212c6d 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dataObject/pipeline/PipPipeline.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dataObject/pipeline/PipPipeline.java @@ -1,8 +1,10 @@ package cd.casic.ci.process.process.dataObject.pipeline; +import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import lombok.Data; +import lombok.EqualsAndHashCode; import java.time.LocalDateTime; From 69b0c0b765c02ff25047c654ab8b042be8a86021 Mon Sep 17 00:00:00 2001 From: even <827656971@qq.com> Date: Sat, 17 May 2025 00:59:27 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E6=89=A7=E8=A1=8C=E5=BC=95=E6=93=8E?= =?UTF-8?q?=E9=83=A8=E5=88=86=E4=BB=A3=E7=A0=81=EF=BC=88=E6=9C=AA=E5=AE=8C?= =?UTF-8?q?=E6=88=90=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ci/process/engine/dispatcher/impl/SerialDispatcher.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java index ff2dc94..d4d559b 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java @@ -37,7 +37,8 @@ public class SerialDispatcher implements BaseDispatcher { dispatch(); latch.countDown(); } catch (Exception e) { - throw new RuntimeException(e); +// throw new RuntimeException(e); + // TODO 当前stage标记为失败等待父context发现并处理 } } } From 69d168f4a806079f766d5be3aa1fcc9d7afeea4b Mon Sep 17 00:00:00 2001 From: even <827656971@qq.com> Date: Sat, 17 May 2025 01:10:21 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E6=89=A7=E8=A1=8C=E5=BC=95=E6=93=8E?= =?UTF-8?q?=E9=83=A8=E5=88=86=E4=BB=A3=E7=A0=81=EF=BC=88=E6=9C=AA=E5=AE=8C?= =?UTF-8?q?=E6=88=90=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../process/engine/dispatcher/impl/SerialDispatcher.java | 8 ++++++-- .../ci/process/process/dataObject/stage/PipStage.java | 4 ++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java index d4d559b..e958c0c 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java @@ -14,12 +14,14 @@ public class SerialDispatcher implements BaseDispatcher { private SecondStageRunContext stageRunContext; private List taskList; private CountDownLatch latch; + private String triggerModel; public SerialDispatcher(SecondStageRunContext stageRunContext, CountDownLatch latch) { this.stageRunContext = stageRunContext; PipBaseElement contextDef = stageRunContext.getContextDef(); - if (contextDef instanceof PipStage) { - this.taskList = ((PipStage)contextDef).getTaskValues(); + if (contextDef instanceof PipStage secondStage) { + this.triggerModel = secondStage.getTriggerMode(); + this.taskList = secondStage.getTaskValues(); } this.latch = latch; } @@ -34,6 +36,8 @@ public class SerialDispatcher implements BaseDispatcher { @Override public void run() { try { + // TODO 检测触发方式如果需要手动触发,挂起当前stage,等待父级执行相应操作 + // TODO 看看需要内存入库还是忽略掉当前执行,进行入库(countDown放行) dispatch(); latch.countDown(); } catch (Exception e) { diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dataObject/stage/PipStage.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dataObject/stage/PipStage.java index a93b298..ad3117d 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dataObject/stage/PipStage.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dataObject/stage/PipStage.java @@ -30,6 +30,10 @@ public class PipStage extends PipBaseElement { //@ApiProperty(name = "code",desc="是否是源码") private Boolean code = false; + /** + * 触发方式 0-手动触发/1-自动触发 + * */ + private String triggerMode; //@ApiProperty(name = "taskValues",desc="阶段任务") @TableField(exist = false) From 57a6f9613852acf80d20f9c9c5fe53f9c5501e29 Mon Sep 17 00:00:00 2001 From: even <827656971@qq.com> Date: Sat, 17 May 2025 01:14:44 +0800 Subject: [PATCH 4/4] =?UTF-8?q?TODO=E6=B7=BB=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ci/process/engine/dispatcher/impl/SerialDispatcher.java | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java index e958c0c..e4d019b 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java @@ -38,6 +38,7 @@ public class SerialDispatcher implements BaseDispatcher { try { // TODO 检测触发方式如果需要手动触发,挂起当前stage,等待父级执行相应操作 // TODO 看看需要内存入库还是忽略掉当前执行,进行入库(countDown放行) +// stageRunContext.callParentChange(); dispatch(); latch.countDown(); } catch (Exception e) {