From b050ca2acb322f14fb52f3867fd7f63562e2cc60 Mon Sep 17 00:00:00 2001 From: even <827656971@qq.com> Date: Mon, 19 May 2025 20:41:36 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=A7=E8=A1=8C=E6=B5=81=E7=A8=8B=E8=B0=83?= =?UTF-8?q?=E8=AF=95=E6=9A=82=E6=97=B6=E5=AE=8C=E6=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/compiler.xml | 2 +- .../framework/commons/dataobject/BaseDO.java | 2 +- .../dispatcher/impl/ParallelDispatcher.java | 2 +- .../engine/enums/ContextStateEnum.java | 73 +++++++------------ .../impl/DefaultPipelineExecutor.java | 6 +- .../manager/impl/DefaultWorkerManager.java | 1 + .../engine/runContext/BaseRunContext.java | 33 +++++---- .../engine/runContext/PipelineRunContext.java | 12 +-- .../runContext/SecondStageRunContext.java | 4 +- .../engine/runContext/TaskRunContext.java | 4 +- .../ci/process/engine/worker/BaseWorker.java | 3 +- .../process/engine/worker/TestGitWorker.java | 22 ++++++ .../ci/process/engine/worker/TestWorker.java | 2 +- .../server/controller/PipelineController.java | 9 +++ .../java/cd/casic/server/RedisMqTest.java | 4 +- 15 files changed, 101 insertions(+), 78 deletions(-) create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TestGitWorker.java diff --git a/.idea/compiler.xml b/.idea/compiler.xml index ac357cf3..b1f55977 100644 --- a/.idea/compiler.xml +++ b/.idea/compiler.xml @@ -14,9 +14,9 @@ - + diff --git a/framework/commons/src/main/java/cd/casic/framework/commons/dataobject/BaseDO.java b/framework/commons/src/main/java/cd/casic/framework/commons/dataobject/BaseDO.java index ada3e2f1..31c316ef 100644 --- a/framework/commons/src/main/java/cd/casic/framework/commons/dataobject/BaseDO.java +++ b/framework/commons/src/main/java/cd/casic/framework/commons/dataobject/BaseDO.java @@ -52,7 +52,7 @@ public abstract class BaseDO implements Serializable, TransPojo { /** * 是否删除 */ - @TableLogic +// @TableLogic private Boolean deleted; } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java index c162b19a..e3bc027b 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java @@ -51,7 +51,7 @@ public class ParallelDispatcher implements BaseDispatcher{ CountDownLatch latch = new CountDownLatch(stageList.size()); for (PipStage secondStage : stageList) { // 二阶段下所有task是串行所以不用关心线程安全相关信息 - SecondStageRunContext context = new SecondStageRunContext(secondStage,pipelineRunContext,new ConcurrentHashMap<>()); + SecondStageRunContext context = new SecondStageRunContext(secondStage,secondStage.getTaskValues().size(),pipelineRunContext,new ConcurrentHashMap<>()); runContextManager.contextRegister(context); SerialDispatcher serialDispatcher = new SerialDispatcher(context,latch,runContextManager,redisMQTemplate); // 给线程池进行执行 diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/enums/ContextStateEnum.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/enums/ContextStateEnum.java index a8e8ade8..ed8e5cdb 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/enums/ContextStateEnum.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/enums/ContextStateEnum.java @@ -2,65 +2,46 @@ package cd.casic.ci.process.engine.enums; import lombok.Getter; -import java.util.HashSet; -import java.util.Objects; -import java.util.Set; +import java.util.*; + @Getter public enum ContextStateEnum { - INIT(0,"初始化", new HashSet<>(){ - { - add(READY); - add(SUSPEND); - add(BAD_ENDING); - add(STOP); - } - }), - READY(1,"就绪", new HashSet<>(){ - { - add(RUNNING); - add(SUSPEND); - add(BAD_ENDING); - add(STOP); - } - }), - RUNNING(2,"运行", new HashSet<>(){ - { - add(SUSPEND); - add(STOP); - add(HAPPY_ENDING); - add(BAD_ENDING); - } - }), - SUSPEND(3,"挂起", new HashSet<>(){ - { - add(INIT); - add(READY); - add(RUNNING); - add(BAD_ENDING); - } - }), - STOP(-1,"停止", new HashSet<>()), - HAPPY_ENDING(4,"执行成功", new HashSet<>()), - BAD_ENDING(5,"执行失败", new HashSet<>()) + INIT(0,"初始化"), + READY(1,"就绪"), + RUNNING(2,"运行"), + SUSPEND(3,"挂起"), + STOP(-1,"停止"), + HAPPY_ENDING(4,"执行成功"), + BAD_ENDING(5,"执行失败") ; private Integer code; private String msg; - /** - * 包含当前所有合法的下一个状态 - * */ - private Set nextStep; + private static final Map> TRANSITIONS = new EnumMap<>(ContextStateEnum.class); - ContextStateEnum(Integer code, String msg, Set nextStep) { + static { + TRANSITIONS.put(INIT, Set.of(READY, SUSPEND, BAD_ENDING, STOP)); + TRANSITIONS.put(READY, Set.of(READY,RUNNING, SUSPEND, BAD_ENDING, STOP)); + TRANSITIONS.put(RUNNING, Set.of(RUNNING,SUSPEND, HAPPY_ENDING, BAD_ENDING, STOP)); + TRANSITIONS.put(SUSPEND, Set.of(SUSPEND,INIT, READY, BAD_ENDING, RUNNING,STOP)); + //...初始化其他状态转移关系 + } + + ContextStateEnum(Integer code, String msg) { this.code = code; this.msg = msg; - this.nextStep = nextStep; + } public static Boolean canGoto(ContextStateEnum from,ContextStateEnum to){ - if (Objects.isNull(from) || Objects.isNull(to)) { + try { + if (Objects.isNull(from) || Objects.isNull(to)) { + return false; + } + return TRANSITIONS.get(from).contains(to); + } catch (Exception e){ + System.out.println(""); return false; } - return from.nextStep.contains(to); } public static ContextStateEnum getByCode(Integer code){ for (ContextStateEnum value : values()) { diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java index 0d8e0e32..4ba6f78f 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java @@ -52,8 +52,12 @@ public class DefaultPipelineExecutor implements PipelineExecutor { if (CollectionUtils.isEmpty(mainStage)) { throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到有效阶段信息"); } + Integer childCount= 0; + for (PipStage stage : mainStage) { + childCount+=stage.getStageList().size(); + } // 如果要做 容灾就需要重新将数据库存的记录按顺序加载入 - PipelineRunContext pipelineRunContext = new PipelineRunContext(null,pipeline,new ConcurrentHashMap<>(),new ConcurrentHashMap<>()); + PipelineRunContext pipelineRunContext = new PipelineRunContext(pipeline,childCount,null,new ConcurrentHashMap<>(),new ConcurrentHashMap<>()); ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager,redisMQTemplate,serialExecutor); parallelExecutor.execute(parallelDispatcher); return pipelineRunContext; diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultWorkerManager.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultWorkerManager.java index 2ca1b724..df8163d6 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultWorkerManager.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultWorkerManager.java @@ -76,6 +76,7 @@ public class DefaultWorkerManager extends AbstractRedisStreamMessageListener localVariables; private Map childContext; - public BaseRunContext(PipBaseElement contextDef,BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map globalVariables, Map localVariables, Map childContext) { + public BaseRunContext(PipBaseElement contextDef,Integer childCount,BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map globalVariables, Map localVariables, Map childContext) { this.contextDef = contextDef; + this.childCount = childCount; this.parentContext = parentContext; this.state = new AtomicInteger(ContextStateEnum.INIT.getCode()); this.startTime = startTime; @@ -88,6 +94,10 @@ public abstract class BaseRunContext { * 查找子类是否全部完成,如果子类全部完成则父类也完成 * */ public void checkChildEnd() throws ServiceException{ + Map childContext = getChildContext(); + if (childContext.size()!=childCount) { + return; + } int result = ContextStateEnum.HAPPY_ENDING.getCode(); for (Map.Entry entry : childContext.entrySet()) { BaseRunContext child = entry.getValue(); @@ -100,18 +110,16 @@ public abstract class BaseRunContext { boolean end = false; if (ContextStateEnum.HAPPY_ENDING.getCode()==result) { if (ContextStateEnum.canGoto(ContextStateEnum.getByCode(state.get()),ContextStateEnum.HAPPY_ENDING)) { - this.state.compareAndExchange(state.get(),ContextStateEnum.HAPPY_ENDING.getCode()); + this.changeContextState(ContextStateEnum.HAPPY_ENDING); end = true; } } else { if (ContextStateEnum.canGoto(ContextStateEnum.getByCode(state.get()),ContextStateEnum.BAD_ENDING)) { - this.state.compareAndExchange(state.get(),ContextStateEnum.BAD_ENDING.getCode()); + this.changeContextState(ContextStateEnum.BAD_ENDING); end = true; } } - if (end) { - this.changeContextState(ContextStateEnum.getByCode(result)); - } + // 当前执行结束看看是否要执行后置处理 } /** * 查找子类是否全部就绪,如果子类全部完成则父类也就绪 @@ -127,16 +135,11 @@ public abstract class BaseRunContext { } result&=state; } - boolean ready = false; - if (ContextStateEnum.READY.getCode()==result) { + if (ContextStateEnum.READY.getCode().equals(result)) { if (ContextStateEnum.canGoto(ContextStateEnum.getByCode(state.get()),ContextStateEnum.READY)) { - this.state.compareAndExchange(state.get(),ContextStateEnum.READY.getCode()); - ready = true; + this.changeContextState(ContextStateEnum.READY); } } - if (!ready) { - throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"状态有误"); - } } /** * 查找子类是否存在开始运行的,如果有则父状态变成running @@ -147,14 +150,14 @@ public abstract class BaseRunContext { for (Map.Entry entry : childContext.entrySet()) { BaseRunContext child = entry.getValue(); int state = child.getState().get(); - if (ContextStateEnum.READY.getCode().equals(state)) { + if (ContextStateEnum.RUNNING.getCode().equals(state)) { runningFlag = true; break; } } if (runningFlag) { if (ContextStateEnum.canGoto(ContextStateEnum.getByCode(state.get()),ContextStateEnum.RUNNING)) { - this.state.compareAndExchange(state.get(),ContextStateEnum.RUNNING.getCode()); + this.changeContextState(ContextStateEnum.RUNNING); } else{ throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"状态有误"); } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java index 27d3a0ae..3565ae51 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java @@ -1,5 +1,6 @@ package cd.casic.ci.process.engine.runContext; +import cd.casic.ci.process.engine.enums.ContextStateEnum; import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; import cd.casic.framework.commons.exception.ServiceException; @@ -11,16 +12,17 @@ import java.util.concurrent.ConcurrentHashMap; public class PipelineRunContext extends BaseRunContext{ - public PipelineRunContext(PipPipeline pipeline) { - this(null,pipeline,new ConcurrentHashMap<>(),new ConcurrentHashMap<>()); + public PipelineRunContext(PipPipeline pipeline,Integer childCount) { + this(pipeline,childCount,null,new ConcurrentHashMap<>(),new ConcurrentHashMap<>()); } - public PipelineRunContext(BaseRunContext parentContext, PipPipeline pipeline, Map globalVariables, Map localVariables) { - this(pipeline,parentContext,LocalDateTime.now(),pipeline.getResourceId(),pipeline.getTargetId(),pipeline.getTargetType(),globalVariables,localVariables,new ConcurrentHashMap<>()); + public PipelineRunContext(PipPipeline pipeline,Integer childCount,BaseRunContext parentContext, Map globalVariables, Map localVariables) { + this(pipeline,childCount,parentContext,LocalDateTime.now(),pipeline.getResourceId(),pipeline.getTargetId(),pipeline.getTargetType(),globalVariables,localVariables,new ConcurrentHashMap<>()); } - private PipelineRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map globalVariables, Map localVariables, Map childContext) { + private PipelineRunContext(PipBaseElement contextDef,Integer childCount, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map globalVariables, Map localVariables, Map childContext) { super( contextDef + ,childCount ,parentContext ,startTime , resourceId diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/SecondStageRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/SecondStageRunContext.java index c9efb652..fcc2d72a 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/SecondStageRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/SecondStageRunContext.java @@ -11,8 +11,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class SecondStageRunContext extends BaseRunContext{ - public SecondStageRunContext(PipStage contextDef, PipelineRunContext parentContext, Map localVariables) { - super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), parentContext.getGlobalVariables(), localVariables, new ConcurrentHashMap<>()); + public SecondStageRunContext(PipStage contextDef,Integer childCount, PipelineRunContext parentContext, Map localVariables) { + super(contextDef,childCount, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), parentContext.getGlobalVariables(), localVariables, new ConcurrentHashMap<>()); } @Override diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/TaskRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/TaskRunContext.java index f63d6634..584429ff 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/TaskRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/TaskRunContext.java @@ -12,10 +12,10 @@ import java.util.Map; public class TaskRunContext extends BaseRunContext{ public TaskRunContext(PipTask contextDef, SecondStageRunContext parentContext,Map localVariable) { - super(contextDef, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), parentContext.getGlobalVariables(),localVariable, new HashMap<>()); + super(contextDef,0, parentContext, LocalDateTime.now(), parentContext.getResourceId(), parentContext.getTargetId(), parentContext.getTargetType(), parentContext.getGlobalVariables(),localVariable, new HashMap<>()); } private TaskRunContext(PipBaseElement contextDef, BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetId, String targetType, Map globalVariables, Map localVariables, Map childContext) { - super(contextDef, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, childContext); + super(contextDef,0, parentContext, startTime, resourceId, targetId, targetType, globalVariables, localVariables, childContext); } /** diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/BaseWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/BaseWorker.java index d31d7928..0c2bfcc1 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/BaseWorker.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/BaseWorker.java @@ -17,7 +17,6 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @Data - public abstract class BaseWorker implements Runnable{ // 一些属性 @Resource @@ -36,10 +35,12 @@ public abstract class BaseWorker implements Runnable{ BaseRunContext context = contextManager.getContext(contextKey); if (context instanceof TaskRunContext taskRunContext){ try { + taskRunContext.changeContextState(ContextStateEnum.READY); taskRunContext.changeContextState(ContextStateEnum.RUNNING); execute(taskRunContext); } catch (Exception e) { taskRunContext.changeContextState(ContextStateEnum.BAD_ENDING); + return; } // TODO 执行结束修改context的state,并且通知父类 taskRunContext.changeContextState(ContextStateEnum.HAPPY_ENDING); diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TestGitWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TestGitWorker.java new file mode 100644 index 00000000..a7573325 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TestGitWorker.java @@ -0,0 +1,22 @@ +package cd.casic.ci.process.engine.worker; + +import cd.casic.ci.common.pipeline.annotation.Plugin; +import cd.casic.ci.process.engine.runContext.TaskRunContext; +import cd.casic.ci.process.process.dataObject.base.PipBaseElement; +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +@Plugin(taskType = "Github") +public class TestGitWorker extends BaseWorker{ + + + @Override + public void execute(TaskRunContext context) { + PipBaseElement contextDef = context.getContextDef(); + String id = contextDef.getId(); + log.info("==============触发worker执行========"); + log.info("==========运行context:{}===========", JSON.toJSONString(context)); + } +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TestWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TestWorker.java index 9e3cb5cf..85173715 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TestWorker.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TestWorker.java @@ -9,7 +9,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @Slf4j -@Plugin(taskType = "testTask") +@Plugin(taskType = "test") public class TestWorker extends BaseWorker{ diff --git a/ops-server/src/main/java/cd/casic/server/controller/PipelineController.java b/ops-server/src/main/java/cd/casic/server/controller/PipelineController.java index 5b855b8d..3980a76d 100644 --- a/ops-server/src/main/java/cd/casic/server/controller/PipelineController.java +++ b/ops-server/src/main/java/cd/casic/server/controller/PipelineController.java @@ -6,6 +6,8 @@ import cd.casic.ci.common.pipeline.req.pipeline.PipelineReq; import cd.casic.ci.common.pipeline.req.pipeline.PipelineUpdateReq; import cd.casic.ci.common.pipeline.resp.pipeline.PipelineFindResp; import cd.casic.ci.common.pipeline.utils.PageResult; +import cd.casic.ci.process.engine.executor.PipelineExecutor; +import cd.casic.ci.process.engine.runContext.PipelineRunContext; import cd.casic.ci.process.process.service.pipeline.PipelineService; import cd.casic.framework.commons.pojo.CommonResult; import jakarta.annotation.Resource; @@ -32,6 +34,8 @@ public class PipelineController { @Resource private PipelineService pipelineService; + @Resource + private PipelineExecutor pipelineExecutor; @PermitAll @PostMapping(path="/createPipeline") @@ -96,4 +100,9 @@ public class PipelineController { return CommonResult.success(); } + @PostMapping("/executePipeline") + public CommonResult executePipeline(String pipelineId){ + PipelineRunContext execute = pipelineExecutor.execute(pipelineId); + return CommonResult.success(execute); + } } diff --git a/ops-server/src/test/java/cd/casic/server/RedisMqTest.java b/ops-server/src/test/java/cd/casic/server/RedisMqTest.java index 42faccfd..9436d364 100644 --- a/ops-server/src/test/java/cd/casic/server/RedisMqTest.java +++ b/ops-server/src/test/java/cd/casic/server/RedisMqTest.java @@ -41,11 +41,11 @@ public class RedisMqTest { pipTask.setStageId("testStage"); PipPipeline pipeline = new PipPipeline(); pipeline.setId("testPipeline"); - PipelineRunContext pipelineRunContext = new PipelineRunContext(pipeline); + PipelineRunContext pipelineRunContext = new PipelineRunContext(pipeline,1); PipStage stage = new PipStage(); stage.setId("testStage"); stage.setParentId("testPipeline"); - SecondStageRunContext secondStageRunContext = new SecondStageRunContext(stage,pipelineRunContext,new ConcurrentHashMap<>()); + SecondStageRunContext secondStageRunContext = new SecondStageRunContext(stage,1,pipelineRunContext,new ConcurrentHashMap<>()); TaskRunContext taskRunContext = new TaskRunContext(pipTask,secondStageRunContext,new HashMap<>()); contextManager.contextRegister(pipelineRunContext); contextManager.contextRegister(secondStageRunContext);