From c5ec84681d8a38ec05f193355383d113c20f236c Mon Sep 17 00:00:00 2001 From: even <827656971@qq.com> Date: Fri, 6 Jun 2025 12:20:46 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E6=96=AD=E7=82=B9=E8=BF=90=E8=A1=8C?= =?UTF-8?q?=E7=9B=B8=E5=85=B3=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cd/casic/ci/api/PipHistoryController.java | 4 ++ .../cd/casic/ci/api/PipelineController.java | 5 +++ .../resp/context/SingletonRunContextResp.java | 1 + .../dispatcher/impl/ParallelDispatcher.java | 4 +- .../dispatcher/impl/SerialDispatcher.java | 8 ++-- .../engine/enums/ContextStateEnum.java | 7 ++- .../engine/executor/PipelineExecutor.java | 1 + .../impl/DefaultPipelineExecutor.java | 43 ++++++++++++++++++- .../impl/DefaultRunContextManager.java | 16 ++++++- .../manager/impl/DefaultWorkerManager.java | 1 + .../engine/runContext/BaseRunContext.java | 9 ++-- .../engine/runContext/PipelineRunContext.java | 2 +- .../engine/worker/TargetHandleWorker.java | 24 +++++++++-- .../engine/worker/base/BaseWorker.java | 1 - 14 files changed, 106 insertions(+), 20 deletions(-) diff --git a/modules/module-ci-process-api/src/main/java/cd/casic/ci/api/PipHistoryController.java b/modules/module-ci-process-api/src/main/java/cd/casic/ci/api/PipHistoryController.java index 4048ccf0..6fe4aebc 100644 --- a/modules/module-ci-process-api/src/main/java/cd/casic/ci/api/PipHistoryController.java +++ b/modules/module-ci-process-api/src/main/java/cd/casic/ci/api/PipHistoryController.java @@ -28,4 +28,8 @@ public class PipHistoryController { public CommonResult> list(@RequestBody PipelineHistoryQueryReq req){ return CommonResult.success(pipelineHistoryService.getPageByPipelineId(req)); } + @PostMapping("/getById/{id}") + public CommonResult getById(@PathVariable String id){ + return CommonResult.success(pipelineHistoryService.getById(id)); + } } diff --git a/modules/module-ci-process-api/src/main/java/cd/casic/ci/api/PipelineController.java b/modules/module-ci-process-api/src/main/java/cd/casic/ci/api/PipelineController.java index a63533b3..6611be93 100644 --- a/modules/module-ci-process-api/src/main/java/cd/casic/ci/api/PipelineController.java +++ b/modules/module-ci-process-api/src/main/java/cd/casic/ci/api/PipelineController.java @@ -116,4 +116,9 @@ public class PipelineController { public CommonResult getPipelineRunState(@PathVariable String pipelineId){ return CommonResult.success(pipelineService.getPipelineRunState(pipelineId)); } + @PostMapping("/traversePipelineContext/{pipelineId}") + public CommonResult traversePipelineContext(@PathVariable("pipelineId") String pipelineId,@RequestParam(required = false,defaultValue = "false") Boolean fromError){ + pipelineExecutor.traversePipelineContext(pipelineId); + return CommonResult.success(); + } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/dal/resp/context/SingletonRunContextResp.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/dal/resp/context/SingletonRunContextResp.java index f91d4447..7c70aaf3 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/dal/resp/context/SingletonRunContextResp.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/dal/resp/context/SingletonRunContextResp.java @@ -20,4 +20,5 @@ public class SingletonRunContextResp { private LocalDateTime startTime; private LocalDateTime endTime; private String duration; + private String logId; } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java index 3579f8a7..0fa14cca 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java @@ -61,8 +61,10 @@ public class ParallelDispatcher implements BaseDispatcher{ BaseRunContext baseRunContext = runContextManager.getContext(secondStage.getId()); if (baseRunContext!=null) { context = (SecondStageRunContext) baseRunContext; + context.setContextDef(secondStage); + context.setChildCount(secondStage.getTaskValues().size()); } else { - // 防止运行停止以后添加节点 ,删除节点的同时也要删除上下文 + // 防止运行停止以后添加节点,以及到达之前未执行的节点 ,todo 删除节点的同时也要删除上下文 context = new SecondStageRunContext(secondStage,secondStage.getTaskValues().size(),pipelineRunContext,new ConcurrentHashMap<>()); runContextManager.contextRegister(context); } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java index b7d728b6..652f8068 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java @@ -42,7 +42,7 @@ public class SerialDispatcher implements BaseDispatcher { } @Override - public void dispatch() throws InterruptedException { + public void dispatch() { for (PipTask pipTask : taskList) { TaskRunContext taskRunContext = null; Map globalVariables = stageRunContext.getGlobalVariables(); @@ -55,8 +55,9 @@ public class SerialDispatcher implements BaseDispatcher { BaseRunContext context = contextManager.getContext(pipTask.getId()); if (context != null) { taskRunContext= (TaskRunContext) context; + context.setContextDef(pipTask); } else { - // 防止运行停止以后添加节点,删除节点的同时也要删除上下文 + // 防止运行停止以后添加节点,以及到达之前未执行的节点 ,todo 删除节点的同时也要删除上下文 taskRunContext = new TaskRunContext(pipTask,stageRunContext,new HashMap<>()); contextManager.contextRegister(taskRunContext); } @@ -64,8 +65,7 @@ public class SerialDispatcher implements BaseDispatcher { taskRunContext.changeContextState(ContextStateEnum.READY); TaskRunMessage taskRunMessage = new TaskRunMessage(pipTask); redisMQTemplate.send(taskRunMessage); - // TODO 监听当前taskContext状态变成执行成功或者执行失败(worker当中改变状态为运行中、执行成功、执行失败) - + //监听当前taskContext状态变成执行成功或者执行失败(worker当中改变状态为运行中、执行成功、执行失败) AtomicInteger state = taskRunContext.getState(); // 如果不为正常执行成功就暂时阻塞直到有状态更改 while (state.get() != ContextStateEnum.HAPPY_ENDING.getCode() diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/enums/ContextStateEnum.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/enums/ContextStateEnum.java index 1c5f0910..828561ba 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/enums/ContextStateEnum.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/enums/ContextStateEnum.java @@ -13,7 +13,8 @@ public enum ContextStateEnum { STOP(-1,"停止"), HAPPY_ENDING(4,"执行成功"), BAD_ENDING(5,"执行失败"), - SKIP_TO(6,"跳过") + SKIP_TO(6,"跳过"), + ERROR_EXECUTE_READY(7,"从错误处执行准备完毕") ; private Integer code; @@ -26,7 +27,9 @@ public enum ContextStateEnum { TRANSITIONS.put(RUNNING, Set.of(RUNNING,SUSPEND, HAPPY_ENDING, BAD_ENDING, STOP,SKIP_TO)); TRANSITIONS.put(SUSPEND, Set.of(SUSPEND,INIT, READY, BAD_ENDING, RUNNING,STOP)); //...初始化其他状态转移关系 - TRANSITIONS.put(SKIP_TO,Collections.emptySet()); + TRANSITIONS.put(SKIP_TO,Set.of(HAPPY_ENDING)); + TRANSITIONS.put(BAD_ENDING,Set.of(ERROR_EXECUTE_READY)); + TRANSITIONS.put(ERROR_EXECUTE_READY,Set.of(ERROR_EXECUTE_READY,READY,RUNNING,SUSPEND,SKIP_TO,BAD_ENDING,HAPPY_ENDING)); // TRANSITIONS.put(HAPPY_ENDING,Set.of(RUNNING)); } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/PipelineExecutor.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/PipelineExecutor.java index 892b7e97..826ae651 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/PipelineExecutor.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/PipelineExecutor.java @@ -5,4 +5,5 @@ import cd.casic.ci.process.enums.PiplineTriggerModeEnum; public interface PipelineExecutor { PipelineRunContext execute(String pipelineId, PiplineTriggerModeEnum triggerModeEnum,Boolean formError); + void traversePipelineContext(String pipelineId); } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java index ebfe20df..41c7af91 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java @@ -2,11 +2,13 @@ package cd.casic.ci.process.engine.executor.impl; import cd.casic.ci.process.engine.constant.PipelineBehaviorConstant; import cd.casic.ci.process.engine.dispatcher.impl.ParallelDispatcher; +import cd.casic.ci.process.engine.enums.ContextStateEnum; import cd.casic.ci.process.engine.executor.PipelineExecutor; import cd.casic.ci.process.engine.manager.LoggerManager; import cd.casic.ci.process.engine.manager.RunContextManager; import cd.casic.ci.process.engine.runContext.BaseRunContext; import cd.casic.ci.process.engine.runContext.PipelineRunContext; +import cd.casic.ci.process.engine.runContext.TaskRunContext; import cd.casic.ci.process.enums.PiplineTriggerModeEnum; import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; import cd.casic.ci.process.process.dataObject.stage.PipStage; @@ -16,14 +18,18 @@ import cd.casic.framework.commons.exception.ServiceException; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; import cd.casic.framework.mq.redis.core.RedisMQTemplate; import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; +import java.util.Collection; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @Component +@Slf4j public class DefaultPipelineExecutor implements PipelineExecutor { @Resource private PipelineService pipelineService; @@ -66,7 +72,8 @@ public class DefaultPipelineExecutor implements PipelineExecutor { ConcurrentHashMap globalVariable = null; globalVariable = new ConcurrentHashMap<>(); // 如果要做 容灾就需要重新将数据库存的记录按顺序加载入 - pipelineRunContext=new PipelineRunContext(pipeline,childCount,null,globalVariable,new ConcurrentHashMap<>()); + pipelineRunContext=new PipelineRunContext(pipeline,childCount,null,globalVariable,new ConcurrentHashMap<>()); + runContextManager.contextRegister(pipelineRunContext); } else { try { pipelineRunContext = (PipelineRunContext)runContextManager.getContext(pipelineId); @@ -74,12 +81,44 @@ public class DefaultPipelineExecutor implements PipelineExecutor { // 以后可以改成从 数据库重载 throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到上次的记录"); } + fromErrorInit(pipelineId); } + pipelineRunContext.setTriggerMode(triggerModeEnum); pipelineRunContext.getGlobalVariables().put(PipelineBehaviorConstant.PIPELINE_EXECUTE_FROM_ERROR,formError); - runContextManager.contextRegister(pipelineRunContext); + + ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager,redisMQTemplate,serialExecutor); parallelExecutor.execute(parallelDispatcher); return pipelineRunContext; } + public void fromErrorInit(String pipelineId){ + List allContextList = new LinkedList<>(); + BaseRunContext context = runContextManager.getContext(pipelineId); + if (context ==null) { + return; + } + allContextList.add(context); + Collection stageContextList = context.getChildContext().values(); + allContextList.addAll(stageContextList); + for (BaseRunContext baseRunContext : stageContextList) { + Collection taskContextList = baseRunContext.getChildContext().values().stream().map(item->(TaskRunContext)item).toList(); + allContextList.addAll(taskContextList); + } + log.info("上下文分组完毕"); + for (BaseRunContext taskRunContext : allContextList) { + if (ContextStateEnum.BAD_ENDING.getCode().equals(taskRunContext.getState().get())) + taskRunContext.changeContextState(ContextStateEnum.ERROR_EXECUTE_READY); + } + } + public void traversePipelineContext(String pipelineId){ + BaseRunContext context = runContextManager.getContext(pipelineId); + Collection stageContextList = context.getChildContext().values(); + List allTaskList = new LinkedList<>(); + for (BaseRunContext baseRunContext : stageContextList) { + Collection taskContextList = baseRunContext.getChildContext().values().stream().map(item->(TaskRunContext)item).toList(); + allTaskList.addAll(taskContextList); + } + log.info("上下文分组完毕"); + } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultRunContextManager.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultRunContextManager.java index 3e36e177..8115cf2e 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultRunContextManager.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultRunContextManager.java @@ -1,6 +1,9 @@ package cd.casic.ci.process.engine.manager.impl; import cd.casic.ci.process.dal.req.pipeline.PipelineQueryReq; +import cd.casic.ci.process.dal.resp.context.TreeRunContextResp; +import cd.casic.ci.process.engine.constant.EngineRuntimeConstant; +import cd.casic.ci.process.engine.constant.PipelineBehaviorConstant; import cd.casic.ci.process.engine.enums.ContextStateEnum; import cd.casic.ci.process.engine.manager.LoggerManager; import cd.casic.ci.process.engine.manager.RunContextManager; @@ -37,6 +40,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; @Component @Slf4j @@ -178,7 +183,16 @@ public class DefaultRunContextManager implements RunContextManager { PipelineQueryReq pipelineQueryReq = new PipelineQueryReq(); pipelineQueryReq.setId(pipelineId); pipPipelineHisInstance.setDefTree(JSON.toJSONString(pipelineService.findPipelineById(pipelineQueryReq), SerializerFeature.DisableCircularReferenceDetect)); - pipPipelineHisInstance.setStateTree(JSON.toJSONString(pipelineService.getPipelineRunState(pipelineId),SerializerFeature.DisableCircularReferenceDetect)); + TreeRunContextResp pipelineRunState = pipelineService.getPipelineRunState(pipelineId); + Map collect = taskContextList.stream().collect(Collectors.toMap(item -> item.getContextDef().getId(), Function.identity())); + for (TreeRunContextResp treeRunContextResp : pipelineRunState.getTaskList()) { + if (collect.containsKey(treeRunContextResp.getId())) { + Map localVariables = collect.get(treeRunContextResp.getId()).getLocalVariables(); + Object logId = localVariables.get(EngineRuntimeConstant.HIS_LOG_KEY); + treeRunContextResp.setLogId(String.valueOf(logId)); + } + } + pipPipelineHisInstance.setStateTree(JSON.toJSONString(pipelineRunState,SerializerFeature.DisableCircularReferenceDetect)); pipPipelineHisInstance.setTargetVersionName(""); pipPipelineHisInstance.setTargetVersionId(pipeline.getTargetVersionId()); hisInstanceDao.insert(pipPipelineHisInstance); diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultWorkerManager.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultWorkerManager.java index d6082719..2d7d057a 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultWorkerManager.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultWorkerManager.java @@ -80,6 +80,7 @@ public class DefaultWorkerManager extends WorkerManager { } ContextStateEnum byCode = ContextStateEnum.getByCode(context.getState().get()); if (ContextStateEnum.SUSPEND.equals(byCode)) { + // todo 可以添加进内存或者什么别的地方中,等待审批以后重新加入队列 throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"当前task为挂起状态,进入队列重新执行"); } String taskType = task.getTaskType(); diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java index 50bba3b8..374efa6f 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java @@ -83,7 +83,7 @@ public abstract class BaseRunContext { } callParentChange(stateEnum); } else { - log.error("非法状态扭转直接忽略"); + log.error("非法状态扭转直接忽略,{},{}",curr,stateEnum); } } public void changeContextStateAndChild(ContextStateEnum stateEnum){ @@ -125,9 +125,7 @@ public abstract class BaseRunContext { * */ public void checkChildEnd() throws ServiceException{ Map childContext = getChildContext(); - if (childContext.size()!=childCount) { - return; - } + int result = ContextStateEnum.HAPPY_ENDING.getCode(); for (Map.Entry entry : childContext.entrySet()) { BaseRunContext child = entry.getValue(); @@ -144,6 +142,9 @@ public abstract class BaseRunContext { } boolean end = false; if (ContextStateEnum.HAPPY_ENDING.getCode()==result) { + if (childContext.size()!=childCount) { + return; + } if (ContextStateEnum.canGoto(ContextStateEnum.getByCode(state.get()),ContextStateEnum.HAPPY_ENDING)) { this.changeContextState(ContextStateEnum.HAPPY_ENDING); end = true; diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java index 5dbd9d1a..988b00f9 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java @@ -101,7 +101,7 @@ public class PipelineRunContext extends BaseRunContext{ } callParentChange(stateEnum); } else { - log.error("非法状态扭转直接忽略"); + log.error("非法状态扭转直接忽略,{},{}",curr,stateEnum); } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TargetHandleWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TargetHandleWorker.java index 6020087f..17d07f8a 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TargetHandleWorker.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TargetHandleWorker.java @@ -2,13 +2,27 @@ package cd.casic.ci.process.engine.worker; import cd.casic.ci.process.common.WorkAtom; +import cd.casic.ci.process.dal.resp.resource.ResourceFindResp; import cd.casic.ci.process.engine.runContext.TaskRunContext; import cd.casic.ci.process.engine.worker.base.BaseWorker; +import cd.casic.ci.process.process.dataObject.base.PipBaseElement; +import cd.casic.ci.process.process.dataObject.machine.MachineInfo; +import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; +import cd.casic.ci.process.process.dataObject.resource.PipResourceMachine; +import cd.casic.ci.process.process.dataObject.target.TargetVersion; +import cd.casic.ci.process.process.dataObject.task.PipTask; import cd.casic.ci.process.process.service.machine.MachineInfoService; import cd.casic.ci.process.process.service.target.TargetVersionService; +import cd.casic.ci.process.util.CryptogramUtil; import cd.casic.ci.process.util.SftpUploadUtil; +import cd.casic.framework.commons.exception.ServiceException; +import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.io.File; +import java.util.Map; /** * 目标处理worker @@ -23,6 +37,7 @@ public class TargetHandleWorker extends BaseWorker { private MachineInfoService machineInfoService; @Override public void execute(TaskRunContext context) { + // 暂无获取文件方式 todo 先注释掉 // String filePath = ""; // Map localVariables = context.getLocalVariables(); // PipBaseElement taskContextDef = context.getContextDef(); @@ -35,6 +50,7 @@ public class TargetHandleWorker extends BaseWorker { // if (StringUtils.isEmpty(pipeline.getTargetVersionId())){ // throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"目标文件不存在"); // } +// // TargetVersion targetVersion = targetVersionService.getById(pipeline.getTargetVersionId()); // filePath = targetVersion.getFilePath(); // File file = new File(filePath); @@ -45,16 +61,16 @@ public class TargetHandleWorker extends BaseWorker { // toBadEnding(); // } // // 上传文件 -// String machineId = pipeline.getMachineId(); -// MachineInfo byId = machineInfoService.getById(machineId); +// ResourceFindResp resourceById = getResourceManagerService().findResourceById(pipeline.getResourceId()); +// PipResourceMachine resourceMachine = resourceById.getResourceMachine(); // append(context,"开始文件上传"); // try { -// SftpUploadUtil.uploadFileViaSftp(byId.getMachineHost(),byId.getSshPort(),byId.getUsername(), CryptogramUtil.doDecrypt(byId.getPassword()),null,file.getAbsolutePath(),"/home/casic/706/ai_test_527",file.getName()); +// SftpUploadUtil.uploadFileViaSftp(resourceMachine.getMachineHost(),Integer.valueOf(resourceMachine.getSshPort()),resourceMachine.getUsername(), CryptogramUtil.doDecrypt(resourceMachine.getPassword()),null,file.getAbsolutePath(),"/home/casic/706/ai_test_527",file.getName()); // } catch (SftpUploadUtil.SftpUploadException e) { // log.error("文件上传失败",e); // toBadEnding(); // } -// append(context,"文件上传至"+byId.getMachineHost()+" /home/casic/706/ai_test_527"); +// append(context,"文件上传至"+resourceMachine.getMachineHost()+"/home/casic/706/ai_test_527"); // } } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/base/BaseWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/base/BaseWorker.java index 9c0d7d05..f1dedc45 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/base/BaseWorker.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/base/BaseWorker.java @@ -68,7 +68,6 @@ public abstract class BaseWorker implements Runnable{ Object fromError = globalVariables.get(PipelineBehaviorConstant.PIPELINE_EXECUTE_FROM_ERROR); if (Boolean.TRUE.equals(fromError)&&ContextStateEnum.HAPPY_ENDING.getCode().equals(taskRunContext.getState().get())) { append(taskRunContext,"跳过执行"+CommandConstant.ENTER); - return; } else{ if (this instanceof PassableWorker passableWorker) { taskRunContext.changeContextState(ContextStateEnum.SUSPEND); From 4487f4e97a695055ef50404db319a97b65317974 Mon Sep 17 00:00:00 2001 From: even <827656971@qq.com> Date: Fri, 6 Jun 2025 12:55:19 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E5=88=86=E9=A1=B5=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ci/process/dal/req/history/PipelineHistoryQueryReq.java | 5 +++-- .../cd/casic/ci/process/engine/worker/base/BaseWorker.java | 2 +- .../service/history/impl/PipelineHistoryServiceImpl.java | 4 +++- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/dal/req/history/PipelineHistoryQueryReq.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/dal/req/history/PipelineHistoryQueryReq.java index c51ad5e1..90436a8a 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/dal/req/history/PipelineHistoryQueryReq.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/dal/req/history/PipelineHistoryQueryReq.java @@ -2,9 +2,10 @@ package cd.casic.ci.process.dal.req.history; import cd.casic.framework.commons.pojo.PageParam; import lombok.Data; +import lombok.EqualsAndHashCode; +@EqualsAndHashCode(callSuper = true) @Data -public class PipelineHistoryQueryReq { +public class PipelineHistoryQueryReq extends PageParam{ private String pipelineId; - private PageParam pageParam; } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/base/BaseWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/base/BaseWorker.java index f1dedc45..13341f95 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/base/BaseWorker.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/base/BaseWorker.java @@ -63,7 +63,6 @@ public abstract class BaseWorker implements Runnable{ BaseRunContext context = contextManager.getContext(contextKey); if (context instanceof TaskRunContext taskRunContext){ try { - taskRunContext.changeContextState(ContextStateEnum.READY); Map globalVariables = context.getGlobalVariables(); Object fromError = globalVariables.get(PipelineBehaviorConstant.PIPELINE_EXECUTE_FROM_ERROR); if (Boolean.TRUE.equals(fromError)&&ContextStateEnum.HAPPY_ENDING.getCode().equals(taskRunContext.getState().get())) { @@ -73,6 +72,7 @@ public abstract class BaseWorker implements Runnable{ taskRunContext.changeContextState(ContextStateEnum.SUSPEND); passableWorker.waitForPermission(); } else { + taskRunContext.changeContextState(ContextStateEnum.READY); taskRunContext.changeContextState(ContextStateEnum.RUNNING); } execute(taskRunContext); diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/history/impl/PipelineHistoryServiceImpl.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/history/impl/PipelineHistoryServiceImpl.java index 34cd5973..07b3932e 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/history/impl/PipelineHistoryServiceImpl.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/history/impl/PipelineHistoryServiceImpl.java @@ -8,6 +8,7 @@ import cd.casic.ci.process.process.dataObject.history.PipPipelineHisInstance; import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; import cd.casic.ci.process.process.service.history.PipelineHistoryService; import cd.casic.ci.process.process.service.pipeline.PipelineService; +import cd.casic.framework.commons.pojo.PageParam; import cd.casic.framework.commons.pojo.PageResult; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -25,7 +26,8 @@ public class PipelineHistoryServiceImpl extends ServiceImpl getPageByPipelineId(PipelineHistoryQueryReq req) { LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); wrapper.eq(PipPipelineHisInstance::getPipelineId,req.getPipelineId()); - Page page = pipelineHisInstanceDao.selectPage(new Page(), wrapper); + wrapper.orderByDesc(PipPipelineHisInstance::getCreateTime); + Page page = pipelineHisInstanceDao.selectPage(new Page<>(req.getPageNo(),req.getPageSize()), wrapper); return new PageResult(page.getRecords(), page.getTotal(), page.getCurrent(), page.getSize()); } } From 43ede4dadee222b32af6b524b811f7d3f9fa7076 Mon Sep 17 00:00:00 2001 From: even <827656971@qq.com> Date: Fri, 6 Jun 2025 14:43:35 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E7=99=BB=E5=BD=95=E7=94=A8=E6=88=B7?= =?UTF-8?q?=E8=8E=B7=E5=8F=96=E6=96=B9=E6=B3=95=E6=9B=B4=E6=8D=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../impl/DefaultRunContextManager.java | 1 - .../engine/manager/impl/MemoryLogManager.java | 2 +- .../engine/runContext/PipelineRunContext.java | 1 - .../pipeline/impl/PipelineServiceImpl.java | 2 +- .../service/stage/impl/StageServiceImpl.java | 2 +- .../service/task/impl/TaskServiceImpl.java | 2 +- .../ci/process/util/WebFrameworkUtils.java | 57 +++++++++++++++++++ 7 files changed, 61 insertions(+), 6 deletions(-) create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/util/WebFrameworkUtils.java diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultRunContextManager.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultRunContextManager.java index 8115cf2e..6229ed0e 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultRunContextManager.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultRunContextManager.java @@ -21,7 +21,6 @@ import cd.casic.ci.process.process.dataObject.task.PipTask; import cd.casic.ci.process.process.service.pipeline.PipelineService; import cd.casic.framework.commons.exception.ServiceException; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; -import cd.casic.framework.commons.util.util.WebFrameworkUtils; import cd.casic.framework.security.core.LoginUser; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/MemoryLogManager.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/MemoryLogManager.java index cacaa1ec..d6b81c91 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/MemoryLogManager.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/MemoryLogManager.java @@ -10,7 +10,7 @@ import cd.casic.ci.process.process.service.task.TaskService; import cd.casic.ci.process.util.snowflake.SnowflakeIdWorker; import cd.casic.ci.process.util.snowflake.SnowflakeIdentifierGenerator; import cd.casic.framework.commons.util.network.IpUtil; -import cd.casic.framework.commons.util.util.WebFrameworkUtils; +import cd.casic.ci.process.util.WebFrameworkUtils; import jakarta.annotation.Resource; import jakarta.servlet.http.HttpServletRequest; import lombok.extern.slf4j.Slf4j; diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java index 988b00f9..676f397f 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java @@ -10,7 +10,6 @@ import cd.casic.ci.process.process.dataObject.history.PipPipelineHisInstance; import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; import cd.casic.framework.commons.exception.ServiceException; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; -import cd.casic.framework.commons.util.util.WebFrameworkUtils; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.annotation.JSONField; import com.fasterxml.jackson.annotation.JsonIgnore; diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/pipeline/impl/PipelineServiceImpl.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/pipeline/impl/PipelineServiceImpl.java index 049b8d99..6ef74ac2 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/pipeline/impl/PipelineServiceImpl.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/pipeline/impl/PipelineServiceImpl.java @@ -28,7 +28,7 @@ import cd.casic.ci.process.process.service.template.impl.TemplateManagerServiceI import cd.casic.framework.commons.exception.ServiceException; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; import cd.casic.framework.commons.pojo.PageResult; -import cd.casic.framework.commons.util.util.WebFrameworkUtils; +import cd.casic.ci.process.util.WebFrameworkUtils; import cd.casic.framework.security.dal.user.AdminUserDO; import cd.casic.framework.tenant.core.service.AdminUserServiceImpl; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/stage/impl/StageServiceImpl.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/stage/impl/StageServiceImpl.java index 754ce1f9..0623e0a4 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/stage/impl/StageServiceImpl.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/stage/impl/StageServiceImpl.java @@ -16,7 +16,7 @@ import cd.casic.ci.process.process.service.stage.StageService; import cd.casic.ci.process.process.service.task.TaskService; import cd.casic.framework.commons.exception.ServiceException; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; -import cd.casic.framework.commons.util.util.WebFrameworkUtils; +import cd.casic.ci.process.util.WebFrameworkUtils; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/task/impl/TaskServiceImpl.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/task/impl/TaskServiceImpl.java index 30a76bc9..cf8b71df 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/task/impl/TaskServiceImpl.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/task/impl/TaskServiceImpl.java @@ -11,7 +11,7 @@ import cd.casic.ci.process.process.service.task.TaskService; import cd.casic.framework.commons.exception.ServiceException; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; import cd.casic.framework.commons.pojo.CommonResult; -import cd.casic.framework.commons.util.util.WebFrameworkUtils; +import cd.casic.ci.process.util.WebFrameworkUtils; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import jakarta.annotation.Resource; diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/util/WebFrameworkUtils.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/util/WebFrameworkUtils.java new file mode 100644 index 00000000..ce790948 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/util/WebFrameworkUtils.java @@ -0,0 +1,57 @@ +package cd.casic.ci.process.util; + +import cd.casic.framework.commons.config.WebProperties; +import cd.casic.framework.commons.enums.TerminalEnum; +import cd.casic.framework.commons.enums.UserTypeEnum; +import cd.casic.framework.commons.pojo.CommonResult; +import cd.casic.framework.security.core.LoginUser; +import cn.hutool.core.util.NumberUtil; +import jakarta.servlet.ServletRequest; +import jakarta.servlet.http.HttpServletRequest; +import org.springframework.security.core.Authentication; +import org.springframework.security.core.context.SecurityContextHolder; +import org.springframework.web.context.request.RequestAttributes; +import org.springframework.web.context.request.RequestContextHolder; +import org.springframework.web.context.request.ServletRequestAttributes; + +/** + * 专属于 web 包的工具类 + * + * @author mianbin modified from yudao + */ +public class WebFrameworkUtils { + + + + + + public static String getLoginNickName() { + try { + Authentication authentication = SecurityContextHolder.getContext().getAuthentication(); + LoginUser principal = (LoginUser) authentication.getPrincipal(); + return principal.getInfo().get("nickName"); + } catch (Exception e){ + return ""; + } + } + + public static Long getLoginUserId() { + try { + Authentication authentication = SecurityContextHolder.getContext().getAuthentication(); + LoginUser principal = (LoginUser) authentication.getPrincipal(); + return principal.getId(); + } catch (Exception e){ + return null; + } + } + public static String getLoginUserIdStr(){ + Long loginUserId = getLoginUserId(); + if (loginUserId!=null) { + return String.valueOf(loginUserId); + } + return ""; + } + + + +} From 5a4ca4f971d86e2c61e9752bcac992a91bfeb907 Mon Sep 17 00:00:00 2001 From: even <827656971@qq.com> Date: Fri, 6 Jun 2025 15:42:43 +0800 Subject: [PATCH 4/4] =?UTF-8?q?mybatis=E5=AD=97=E6=AE=B5=E8=87=AA=E5=8A=A8?= =?UTF-8?q?=E6=B3=A8=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../process/config/DefaultDBFieldHandler.java | 62 +++++++++++++++++++ .../config/MybatisAutoConfiguration.java | 15 +++++ .../ci/process/util/WebFrameworkUtils.java | 4 -- 3 files changed, 77 insertions(+), 4 deletions(-) create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/config/DefaultDBFieldHandler.java create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/config/MybatisAutoConfiguration.java diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/config/DefaultDBFieldHandler.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/config/DefaultDBFieldHandler.java new file mode 100644 index 00000000..7f6d61a6 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/config/DefaultDBFieldHandler.java @@ -0,0 +1,62 @@ +package cd.casic.ci.process.config; + +import cd.casic.ci.process.util.WebFrameworkUtils; +import cd.casic.framework.commons.dataobject.BaseDO; +import com.baomidou.mybatisplus.core.handlers.MetaObjectHandler; +import org.apache.ibatis.reflection.MetaObject; + +import java.time.LocalDateTime; +import java.util.Objects; + +/** + * 通用参数填充实现类 + * + * 如果没有显式的对通用参数进行赋值,这里会对通用参数进行填充、赋值 + * + * @author hexiaowu + */ +public class DefaultDBFieldHandler implements MetaObjectHandler { + + @Override + public void insertFill(MetaObject metaObject) { + if (Objects.nonNull(metaObject) && metaObject.getOriginalObject() instanceof BaseDO) { + BaseDO baseDO = (BaseDO) metaObject.getOriginalObject(); + + LocalDateTime current = LocalDateTime.now(); + // 创建时间为空,则以当前时间为插入时间 + if (Objects.isNull(baseDO.getCreateTime())) { + baseDO.setCreateTime(current); + } + // 更新时间为空,则以当前时间为更新时间 + if (Objects.isNull(baseDO.getUpdateTime())) { + baseDO.setUpdateTime(current); + } + + Long userId = WebFrameworkUtils.getLoginUserId(); + // 当前登录用户不为空,创建人为空,则当前登录用户为创建人 + if (Objects.nonNull(userId) && Objects.isNull(baseDO.getCreator())) { + baseDO.setCreator(userId.toString()); + } + // 当前登录用户不为空,更新人为空,则当前登录用户为更新人 + if (Objects.nonNull(userId) && Objects.isNull(baseDO.getUpdater())) { + baseDO.setUpdater(userId.toString()); + } + } + } + + @Override + public void updateFill(MetaObject metaObject) { + // 更新时间为空,则以当前时间为更新时间 + Object modifyTime = getFieldValByName("updateTime", metaObject); + if (Objects.isNull(modifyTime)) { + setFieldValByName("updateTime", LocalDateTime.now(), metaObject); + } + + // 当前登录用户不为空,更新人为空,则当前登录用户为更新人 + Object modifier = getFieldValByName("updater", metaObject); + Long userId = WebFrameworkUtils.getLoginUserId(); + if (Objects.nonNull(userId) && Objects.isNull(modifier)) { + setFieldValByName("updater", userId.toString(), metaObject); + } + } +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/config/MybatisAutoConfiguration.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/config/MybatisAutoConfiguration.java new file mode 100644 index 00000000..bfc58cb5 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/config/MybatisAutoConfiguration.java @@ -0,0 +1,15 @@ +package cd.casic.ci.process.config; + +import com.baomidou.mybatisplus.core.handlers.MetaObjectHandler; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; + +@Configuration +public class MybatisAutoConfiguration { + @Bean("processMetaObjectHandler") + @Primary + public MetaObjectHandler processMetaObjectHandler() { + return new DefaultDBFieldHandler(); // 自动填充参数类 + } +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/util/WebFrameworkUtils.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/util/WebFrameworkUtils.java index ce790948..6b157d74 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/util/WebFrameworkUtils.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/util/WebFrameworkUtils.java @@ -21,10 +21,6 @@ import org.springframework.web.context.request.ServletRequestAttributes; */ public class WebFrameworkUtils { - - - - public static String getLoginNickName() { try { Authentication authentication = SecurityContextHolder.getContext().getAuthentication();