From 020c4a0a6d8e55315a76e4af591c8177ef167dce Mon Sep 17 00:00:00 2001 From: even <827656971@qq.com> Date: Thu, 5 Jun 2025 15:27:24 +0800 Subject: [PATCH] =?UTF-8?q?1.=E6=B5=81=E6=B0=B4=E7=BA=BF=E4=BB=8E=E9=94=99?= =?UTF-8?q?=E8=AF=AF=E5=A4=84=E5=BC=80=E5=A7=8B=E6=89=A7=E8=A1=8C&?= =?UTF-8?q?=E4=BB=8E=E5=A4=B4=E5=BC=80=E5=A7=8B=E6=89=A7=E8=A1=8C=EF=BC=8C?= =?UTF-8?q?=E8=BF=90=E8=A1=8C=E4=B8=AD=E6=B5=81=E7=A8=8B=E9=87=8D=E5=A4=8D?= =?UTF-8?q?=E5=8F=91=E8=B5=B7=E8=BF=90=E8=A1=8C=E6=8A=A5=E9=94=99=EF=BC=88?= =?UTF-8?q?=E5=88=9D=E6=AD=A5=E6=B5=8B=E8=AF=95=E9=80=9A=E8=BF=87=EF=BC=89?= =?UTF-8?q?=202.=E6=B5=81=E6=B0=B4=E7=BA=BF=E6=89=A7=E8=A1=8C=E5=8E=86?= =?UTF-8?q?=E5=8F=B2=E6=9F=A5=E8=AF=A2=E4=B8=8E=E5=85=A5=E5=BA=93=E6=96=B9?= =?UTF-8?q?=E6=B3=95=EF=BC=88=E5=88=9D=E6=AD=A5=E6=B5=8B=E8=AF=95=E9=80=9A?= =?UTF-8?q?=E8=BF=87=EF=BC=89=203.context=E8=8E=B7=E5=8F=96=E7=94=A8?= =?UTF-8?q?=E6=88=B7=E8=8E=B7=E5=8F=96=E4=B8=8D=E5=88=B0=E6=94=B9=E4=B8=BA?= =?UTF-8?q?=E4=BD=BF=E7=94=A8springsecurity=E7=9A=84ContextHolder?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- framework/commons/pom.xml | 23 ++++- .../commons/util/util/WebFrameworkUtils.java | 14 +++- .../datapermission/convert/TenantConvert.java | 2 +- .../core/util/SecurityFrameworkUtils.java | 1 + .../cd/casic/ci/api/PipHistoryController.java | 32 +++++++ .../cd/casic/ci/api/PipelineController.java | 12 ++- .../constant/EngineRuntimeConstant.java | 2 +- .../constant/PipelineBehaviorConstant.java | 2 +- .../dispatcher/impl/ParallelDispatcher.java | 26 +++++- .../dispatcher/impl/SerialDispatcher.java | 23 ++++- .../engine/enums/ContextStateEnum.java | 1 + .../engine/executor/PipelineExecutor.java | 3 +- .../impl/DefaultPipelineExecutor.java | 28 ++++++- .../process/engine/manager/LoggerManager.java | 4 +- .../engine/manager/RunContextManager.java | 2 + .../impl/DefaultRunContextManager.java | 84 +++++++++++++++++-- .../engine/manager/impl/MemoryLogManager.java | 80 +++++++++++------- .../engine/runContext/BaseRunContext.java | 39 +++++---- .../engine/runContext/PipelineRunContext.java | 49 ++++++++++- .../scheduler/job/PipelineSchedulingJob.java | 3 +- .../engine/worker/ApplicationWorker.java | 2 - .../process/engine/worker/CodingWorker.java | 2 - .../worker/DIYImageExecuteCommandWorker.java | 1 - .../process/engine/worker/ScaSbomWorker.java | 2 - .../engine/worker/base/BaseWorker.java | 19 +++-- .../history/PipPipelineHisInstanceDao.java | 7 ++ .../history/PipPipelineHisInstance.java | 33 ++++++++ .../history/PipelineHistoryService.java | 11 +++ .../impl/PipelineHistoryServiceImpl.java | 27 ++++++ .../service/taskLog/TaskLogService.java | 8 ++ .../taskLog/impl/TaskLogServiceImpl.java | 11 +++ .../cd/casic/server/OpsServerApplication.java | 2 +- .../cd/casic/server/PipelineExecuteTest.java | 12 ++- 33 files changed, 476 insertions(+), 91 deletions(-) create mode 100644 modules/module-ci-process-api/src/main/java/cd/casic/ci/api/PipHistoryController.java create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dal/history/PipPipelineHisInstanceDao.java create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dataObject/history/PipPipelineHisInstance.java create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/history/PipelineHistoryService.java create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/history/impl/PipelineHistoryServiceImpl.java create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/taskLog/TaskLogService.java create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/taskLog/impl/TaskLogServiceImpl.java diff --git a/framework/commons/pom.xml b/framework/commons/pom.xml index bb63b37e..44bb9141 100644 --- a/framework/commons/pom.xml +++ b/framework/commons/pom.xml @@ -194,6 +194,27 @@ - + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + org.projectlombok + lombok + ${lombok.version} + + + org.mapstruct + mapstruct-processor + ${mapstruct.version} + + + + + + \ No newline at end of file diff --git a/framework/commons/src/main/java/cd/casic/framework/commons/util/util/WebFrameworkUtils.java b/framework/commons/src/main/java/cd/casic/framework/commons/util/util/WebFrameworkUtils.java index 23be0ef6..484c91d6 100644 --- a/framework/commons/src/main/java/cd/casic/framework/commons/util/util/WebFrameworkUtils.java +++ b/framework/commons/src/main/java/cd/casic/framework/commons/util/util/WebFrameworkUtils.java @@ -20,6 +20,7 @@ import org.springframework.web.context.request.ServletRequestAttributes; public class WebFrameworkUtils { private static final String REQUEST_ATTRIBUTE_LOGIN_USER_ID = "login_user_id"; + private static final String REQUEST_ATTRIBUTE_LOGIN_USER_NAME = "login_user_name"; private static final String REQUEST_ATTRIBUTE_LOGIN_USER_TYPE = "login_user_type"; private static final String REQUEST_ATTRIBUTE_COMMON_RESULT = "common_result"; @@ -38,7 +39,6 @@ public class WebFrameworkUtils { public WebFrameworkUtils(WebProperties webProperties) { WebFrameworkUtils.properties = webProperties; } - /** * 获得租户编号,从 header 中 * 考虑到其它 framework 组件也会使用到租户编号,所以不得不放在 WebFrameworkUtils 统一提供 @@ -54,6 +54,18 @@ public class WebFrameworkUtils { public static void setLoginUserId(ServletRequest request, Long userId) { request.setAttribute(REQUEST_ATTRIBUTE_LOGIN_USER_ID, userId); } + public static void setLoginNickName(ServletRequest request, String userName) { + request.setAttribute(REQUEST_ATTRIBUTE_LOGIN_USER_ID, userName); + } + public static String getLoginNickName() { + return getLoginNickName(getRequest()); + } + public static String getLoginNickName(HttpServletRequest request) { + if (request == null) { + return null; + } + return (String) request.getAttribute(REQUEST_ATTRIBUTE_LOGIN_USER_NAME); + } /** * 设置用户类型 diff --git a/framework/spring-boot-starter-biz-data-permission/src/main/java/cd/casic/framework/datapermission/convert/TenantConvert.java b/framework/spring-boot-starter-biz-data-permission/src/main/java/cd/casic/framework/datapermission/convert/TenantConvert.java index d03871bb..45487c68 100644 --- a/framework/spring-boot-starter-biz-data-permission/src/main/java/cd/casic/framework/datapermission/convert/TenantConvert.java +++ b/framework/spring-boot-starter-biz-data-permission/src/main/java/cd/casic/framework/datapermission/convert/TenantConvert.java @@ -10,7 +10,7 @@ import org.mapstruct.factory.Mappers; * * @author mianbin modified from yudao */ -@Mapper +@Mapper(componentModel = "spring") public interface TenantConvert { TenantConvert INSTANCE = Mappers.getMapper(TenantConvert.class); diff --git a/framework/spring-boot-starter-security/src/main/java/cd/casic/framework/security/core/util/SecurityFrameworkUtils.java b/framework/spring-boot-starter-security/src/main/java/cd/casic/framework/security/core/util/SecurityFrameworkUtils.java index 3287388b..ca6cce4f 100644 --- a/framework/spring-boot-starter-security/src/main/java/cd/casic/framework/security/core/util/SecurityFrameworkUtils.java +++ b/framework/spring-boot-starter-security/src/main/java/cd/casic/framework/security/core/util/SecurityFrameworkUtils.java @@ -127,6 +127,7 @@ public class SecurityFrameworkUtils { // 原因是,Spring Security 的 Filter 在 ApiAccessLogFilter 后面,在它记录访问日志时,线上上下文已经没有用户编号等信息 WebFrameworkUtils.setLoginUserId(request, loginUser.getId()); WebFrameworkUtils.setLoginUserType(request, loginUser.getUserType()); + WebFrameworkUtils.setLoginNickName(request,loginUser.getInfo().get("nickName")); } private static Authentication buildAuthentication(LoginUser loginUser, HttpServletRequest request) { diff --git a/modules/module-ci-process-api/src/main/java/cd/casic/ci/api/PipHistoryController.java b/modules/module-ci-process-api/src/main/java/cd/casic/ci/api/PipHistoryController.java new file mode 100644 index 00000000..0fc1cf9f --- /dev/null +++ b/modules/module-ci-process-api/src/main/java/cd/casic/ci/api/PipHistoryController.java @@ -0,0 +1,32 @@ +package cd.casic.ci.api; + +import cd.casic.ci.process.process.dataObject.history.PipPipelineHisInstance; +import cd.casic.ci.process.process.dataObject.log.PipTaskLog; +import cd.casic.ci.process.process.service.history.PipelineHistoryService; +import cd.casic.ci.process.process.service.taskLog.TaskLogService; +import cd.casic.framework.commons.pojo.CommonResult; +import jakarta.annotation.Resource; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +@RestController +@RequestMapping("/history") +public class PipHistoryController { + @Resource + TaskLogService taskLogService; + @Resource + PipelineHistoryService pipelineHistoryService; + @GetMapping("/getLogById") + public CommonResult getLogById(String id){ + PipTaskLog byId = taskLogService.getById(id); + return CommonResult.success(byId); + } + @GetMapping("/list") + public CommonResult> list(String pipelineId){ + List list = pipelineHistoryService.list(); + return CommonResult.success(list); + } +} diff --git a/modules/module-ci-process-api/src/main/java/cd/casic/ci/api/PipelineController.java b/modules/module-ci-process-api/src/main/java/cd/casic/ci/api/PipelineController.java index ef4a083a..a63533b3 100644 --- a/modules/module-ci-process-api/src/main/java/cd/casic/ci/api/PipelineController.java +++ b/modules/module-ci-process-api/src/main/java/cd/casic/ci/api/PipelineController.java @@ -9,13 +9,19 @@ import cd.casic.ci.process.dal.resp.context.TreeRunContextResp; import cd.casic.ci.process.dal.resp.pipeline.PipelineFindResp; import cd.casic.ci.process.engine.executor.PipelineExecutor; import cd.casic.ci.process.engine.runContext.PipelineRunContext; +import cd.casic.ci.process.enums.PiplineTriggerModeEnum; import cd.casic.ci.process.process.service.pipeline.PipelineService; import cd.casic.framework.commons.pojo.CommonResult; import cd.casic.framework.commons.pojo.PageResult; +import cd.casic.framework.commons.util.util.WebFrameworkUtils; +import cd.casic.framework.security.core.LoginUser; import jakarta.annotation.Resource; import jakarta.annotation.security.PermitAll; +import jakarta.servlet.http.HttpServletRequest; import jakarta.validation.Valid; import org.jetbrains.annotations.NotNull; +import org.springframework.boot.actuate.autoconfigure.metrics.MetricsProperties; +import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.web.bind.annotation.*; import java.util.List; @@ -99,9 +105,11 @@ public class PipelineController { return CommonResult.success(); } + @Resource + private HttpServletRequest request; @PostMapping("/executePipeline/{pipelineId}") - public CommonResult executePipeline(@PathVariable String pipelineId){ - PipelineRunContext execute = pipelineExecutor.execute(pipelineId); + public CommonResult executePipeline(@PathVariable("pipelineId") String pipelineId,@RequestParam(required = false,defaultValue = "false") Boolean fromError){ + PipelineRunContext execute = pipelineExecutor.execute(pipelineId, PiplineTriggerModeEnum.HAND,fromError); return CommonResult.success(execute); } @PostMapping("/getStageRunState/{pipelineId}") diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/constant/EngineRuntimeConstant.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/constant/EngineRuntimeConstant.java index e1e46406..d11b2612 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/constant/EngineRuntimeConstant.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/constant/EngineRuntimeConstant.java @@ -1,5 +1,5 @@ package cd.casic.ci.process.engine.constant; public class EngineRuntimeConstant { - public static final String LOG_KEY = "logContent"; + public static final String HIS_LOG_KEY = "logId"; } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/constant/PipelineBehaviorConstant.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/constant/PipelineBehaviorConstant.java index c4a81d78..a3fbd093 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/constant/PipelineBehaviorConstant.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/constant/PipelineBehaviorConstant.java @@ -10,5 +10,5 @@ public class PipelineBehaviorConstant { /** * 流水线是否从上次错误处执行 * */ - public static final String PIPELINE_EXECUTE_FROM_ERROR=""; + public static final String PIPELINE_EXECUTE_FROM_ERROR="fromError"; } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java index 1c0250e9..3579f8a7 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java @@ -1,17 +1,22 @@ package cd.casic.ci.process.engine.dispatcher.impl; +import cd.casic.ci.process.engine.constant.PipelineBehaviorConstant; import cd.casic.ci.process.engine.dispatcher.BaseDispatcher; import cd.casic.ci.process.engine.enums.ContextStateEnum; import cd.casic.ci.process.engine.manager.RunContextManager; +import cd.casic.ci.process.engine.runContext.BaseRunContext; import cd.casic.ci.process.engine.runContext.PipelineRunContext; import cd.casic.ci.process.engine.runContext.SecondStageRunContext; +import cd.casic.ci.process.engine.runContext.TaskRunContext; import cd.casic.ci.process.process.dataObject.stage.PipStage; import cd.casic.framework.mq.redis.core.RedisMQTemplate; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.CollectionUtils; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @Slf4j @@ -44,9 +49,24 @@ public class ParallelDispatcher implements BaseDispatcher{ } CountDownLatch latch = new CountDownLatch(stageList.size()); for (PipStage secondStage : stageList) { - // 二阶段下所有task是串行所以不用关心线程安全相关信息 - SecondStageRunContext context = new SecondStageRunContext(secondStage,secondStage.getTaskValues().size(),pipelineRunContext,new ConcurrentHashMap<>()); - runContextManager.contextRegister(context); + Map globalVariables = pipelineRunContext.getGlobalVariables(); + Object fromError = globalVariables.get(PipelineBehaviorConstant.PIPELINE_EXECUTE_FROM_ERROR); + SecondStageRunContext context = null; + if (!Boolean.TRUE.equals(fromError)) { + // 注册taskContext,且发送消息至消息队列给work执行, 如果需要则传入参数 + // 二阶段下所有task是串行所以不用关心线程安全相关信息 + context = new SecondStageRunContext(secondStage,secondStage.getTaskValues().size(),pipelineRunContext,new ConcurrentHashMap<>()); + runContextManager.contextRegister(context); + } else { + BaseRunContext baseRunContext = runContextManager.getContext(secondStage.getId()); + if (baseRunContext!=null) { + context = (SecondStageRunContext) baseRunContext; + } else { + // 防止运行停止以后添加节点 ,删除节点的同时也要删除上下文 + context = new SecondStageRunContext(secondStage,secondStage.getTaskValues().size(),pipelineRunContext,new ConcurrentHashMap<>()); + runContextManager.contextRegister(context); + } + } SerialDispatcher serialDispatcher = new SerialDispatcher(context,latch,runContextManager,redisMQTemplate); // 给线程池进行执行 taskExecutor.execute(serialDispatcher); diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java index a2e49f14..b7d728b6 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java @@ -1,9 +1,11 @@ package cd.casic.ci.process.engine.dispatcher.impl; +import cd.casic.ci.process.engine.constant.PipelineBehaviorConstant; import cd.casic.ci.process.engine.dispatcher.BaseDispatcher; import cd.casic.ci.process.engine.enums.ContextStateEnum; import cd.casic.ci.process.engine.manager.RunContextManager; import cd.casic.ci.process.engine.message.TaskRunMessage; +import cd.casic.ci.process.engine.runContext.BaseRunContext; import cd.casic.ci.process.engine.runContext.SecondStageRunContext; import cd.casic.ci.process.engine.runContext.TaskRunContext; import cd.casic.ci.process.process.dataObject.base.PipBaseElement; @@ -15,6 +17,7 @@ import lombok.extern.slf4j.Slf4j; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; @Slf4j @@ -41,9 +44,23 @@ public class SerialDispatcher implements BaseDispatcher { @Override public void dispatch() throws InterruptedException { for (PipTask pipTask : taskList) { - // 注册taskContext,且发送消息至消息队列给work执行, 如果需要则传入参数 - TaskRunContext taskRunContext = new TaskRunContext(pipTask,stageRunContext,new HashMap<>()); - contextManager.contextRegister(taskRunContext); + TaskRunContext taskRunContext = null; + Map globalVariables = stageRunContext.getGlobalVariables(); + Object fromError = globalVariables.get(PipelineBehaviorConstant.PIPELINE_EXECUTE_FROM_ERROR); + if (!Boolean.TRUE.equals(fromError)) { + // 注册taskContext,且发送消息至消息队列给work执行, 如果需要则传入参数 + taskRunContext = new TaskRunContext(pipTask,stageRunContext,new HashMap<>()); + contextManager.contextRegister(taskRunContext); + } else { + BaseRunContext context = contextManager.getContext(pipTask.getId()); + if (context != null) { + taskRunContext= (TaskRunContext) context; + } else { + // 防止运行停止以后添加节点,删除节点的同时也要删除上下文 + taskRunContext = new TaskRunContext(pipTask,stageRunContext,new HashMap<>()); + contextManager.contextRegister(taskRunContext); + } + } taskRunContext.changeContextState(ContextStateEnum.READY); TaskRunMessage taskRunMessage = new TaskRunMessage(pipTask); redisMQTemplate.send(taskRunMessage); diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/enums/ContextStateEnum.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/enums/ContextStateEnum.java index b2621c23..1c5f0910 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/enums/ContextStateEnum.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/enums/ContextStateEnum.java @@ -27,6 +27,7 @@ public enum ContextStateEnum { TRANSITIONS.put(SUSPEND, Set.of(SUSPEND,INIT, READY, BAD_ENDING, RUNNING,STOP)); //...初始化其他状态转移关系 TRANSITIONS.put(SKIP_TO,Collections.emptySet()); +// TRANSITIONS.put(HAPPY_ENDING,Set.of(RUNNING)); } ContextStateEnum(Integer code, String msg) { diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/PipelineExecutor.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/PipelineExecutor.java index 2ca6e10a..892b7e97 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/PipelineExecutor.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/PipelineExecutor.java @@ -1,7 +1,8 @@ package cd.casic.ci.process.engine.executor; import cd.casic.ci.process.engine.runContext.PipelineRunContext; +import cd.casic.ci.process.enums.PiplineTriggerModeEnum; public interface PipelineExecutor { - PipelineRunContext execute(String pipelineId); + PipelineRunContext execute(String pipelineId, PiplineTriggerModeEnum triggerModeEnum,Boolean formError); } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java index 9e6e59c9..ebfe20df 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/executor/impl/DefaultPipelineExecutor.java @@ -1,9 +1,13 @@ package cd.casic.ci.process.engine.executor.impl; +import cd.casic.ci.process.engine.constant.PipelineBehaviorConstant; import cd.casic.ci.process.engine.dispatcher.impl.ParallelDispatcher; import cd.casic.ci.process.engine.executor.PipelineExecutor; +import cd.casic.ci.process.engine.manager.LoggerManager; import cd.casic.ci.process.engine.manager.RunContextManager; +import cd.casic.ci.process.engine.runContext.BaseRunContext; import cd.casic.ci.process.engine.runContext.PipelineRunContext; +import cd.casic.ci.process.enums.PiplineTriggerModeEnum; import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; import cd.casic.ci.process.process.dataObject.stage.PipStage; import cd.casic.ci.process.process.service.pipeline.PipelineService; @@ -33,8 +37,10 @@ public class DefaultPipelineExecutor implements PipelineExecutor { private ThreadPoolTaskExecutor serialExecutor; @Resource private RedisMQTemplate redisMQTemplate; + @Resource + private LoggerManager loggerManager; @Override - public PipelineRunContext execute(String pipelineId) { + public PipelineRunContext execute(String pipelineId, PiplineTriggerModeEnum triggerModeEnum,Boolean formError) { PipPipeline pipeline = pipelineService.getById(pipelineId); if (pipeline==null) { return null; @@ -53,8 +59,24 @@ public class DefaultPipelineExecutor implements PipelineExecutor { for (PipStage stage : mainStage) { childCount+=stage.getStageList().size(); } - // 如果要做 容灾就需要重新将数据库存的记录按顺序加载入 - PipelineRunContext pipelineRunContext = new PipelineRunContext(pipeline,childCount,null,new ConcurrentHashMap<>(),new ConcurrentHashMap<>()); + PipelineRunContext pipelineRunContext = null; + if (!formError) { + // 从头执行清理缓存日志 + loggerManager.clear(pipelineId); + ConcurrentHashMap globalVariable = null; + globalVariable = new ConcurrentHashMap<>(); + // 如果要做 容灾就需要重新将数据库存的记录按顺序加载入 + pipelineRunContext=new PipelineRunContext(pipeline,childCount,null,globalVariable,new ConcurrentHashMap<>()); + } else { + try { + pipelineRunContext = (PipelineRunContext)runContextManager.getContext(pipelineId); + } catch (Exception e) { + // 以后可以改成从 数据库重载 + throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到上次的记录"); + } + } + pipelineRunContext.setTriggerMode(triggerModeEnum); + pipelineRunContext.getGlobalVariables().put(PipelineBehaviorConstant.PIPELINE_EXECUTE_FROM_ERROR,formError); runContextManager.contextRegister(pipelineRunContext); ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager,redisMQTemplate,serialExecutor); parallelExecutor.execute(parallelDispatcher); diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/LoggerManager.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/LoggerManager.java index f03ae802..90d660e3 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/LoggerManager.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/LoggerManager.java @@ -1,5 +1,6 @@ package cd.casic.ci.process.engine.manager; +import cd.casic.ci.process.engine.runContext.TaskRunContext; import jakarta.servlet.http.HttpServletRequest; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; @@ -13,5 +14,6 @@ public interface LoggerManager { SseEmitter subscribe(String taskId,HttpServletRequest request); void append(String taskId,String logContent); String getLogContent(String taskId); - void flushMemory(List taskIdList); + void flushMemory(List taskContextList); + void clear(String pipelineId); } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/RunContextManager.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/RunContextManager.java index b0b8710f..6bec42be 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/RunContextManager.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/RunContextManager.java @@ -28,4 +28,6 @@ public interface RunContextManager { * */ void contextRegister(BaseRunContext context); BaseRunContext getContext(String key); + + public void toHistory(String pipelineId); } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultRunContextManager.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultRunContextManager.java index 9db52659..39b9362b 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultRunContextManager.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultRunContextManager.java @@ -1,5 +1,6 @@ package cd.casic.ci.process.engine.manager.impl; +import cd.casic.ci.process.dal.req.pipeline.PipelineQueryReq; import cd.casic.ci.process.engine.enums.ContextStateEnum; import cd.casic.ci.process.engine.manager.LoggerManager; import cd.casic.ci.process.engine.manager.RunContextManager; @@ -7,27 +8,47 @@ import cd.casic.ci.process.engine.runContext.BaseRunContext; import cd.casic.ci.process.engine.runContext.PipelineRunContext; import cd.casic.ci.process.engine.runContext.SecondStageRunContext; import cd.casic.ci.process.engine.runContext.TaskRunContext; +import cd.casic.ci.process.enums.PiplineTriggerModeEnum; +import cd.casic.ci.process.process.dal.history.PipPipelineHisInstanceDao; import cd.casic.ci.process.process.dal.pipeline.PipTaskDao; import cd.casic.ci.process.process.dataObject.base.PipBaseElement; +import cd.casic.ci.process.process.dataObject.history.PipPipelineHisInstance; +import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; import cd.casic.ci.process.process.dataObject.task.PipTask; +import cd.casic.ci.process.process.service.pipeline.PipelineService; import cd.casic.framework.commons.exception.ServiceException; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; +import cd.casic.framework.commons.util.util.WebFrameworkUtils; +import cd.casic.framework.security.core.LoginUser; +import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.security.core.Authentication; +import org.springframework.security.core.context.SecurityContext; +import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import java.util.Collection; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; @Component +@Slf4j public class DefaultRunContextManager implements RunContextManager { private final Map contextMap= new ConcurrentHashMap(); @Resource private LoggerManager loggerManager; @Resource private PipTaskDao taskDao; + @Resource + private PipPipelineHisInstanceDao hisInstanceDao; + @Resource + private PipelineService pipelineService; @Override public Boolean stopPipeline(String pipelineId) { return null; @@ -65,13 +86,26 @@ public class DefaultRunContextManager implements RunContextManager { if (context instanceof PipelineRunContext pipelineRunContext) { if (contextMap.containsKey(id)) { PipelineRunContext oldPipeline = contextMap.get(id); - oldPipeline.changeContextStateAndChild(ContextStateEnum.BAD_ENDING); - LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); - List taskList = taskDao.selectList(wrapper); - List taskIdList = taskList.stream().map(PipTask::getId).toList(); - // 清空上一次的日志 - loggerManager.flushMemory(taskIdList); + if (!ContextStateEnum.HAPPY_ENDING.getCode().equals(oldPipeline.getState().get()) + &&!ContextStateEnum.BAD_ENDING.getCode().equals(oldPipeline.getState().get())) { + throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"有未完成执行的流水线请稍后再试"); + } } + pipelineRunContext.setLoggerManager(loggerManager); + pipelineRunContext.setHisInstanceDao(hisInstanceDao); + SecurityContext securityContext = SecurityContextHolder.getContext(); + if (securityContext !=null) { + Authentication authentication = securityContext.getAuthentication(); + if (authentication!=null) { + if (authentication.getPrincipal() instanceof LoginUser user) { + pipelineRunContext.setUserId(String.valueOf(user.getId())); + pipelineRunContext.setNickName(user.getInfo().get("nickname")); + } + } + + } + + pipelineRunContext.setContextManager(this); contextMap.put(id,pipelineRunContext); } else { if (parentContext==null) { @@ -112,5 +146,43 @@ public class DefaultRunContextManager implements RunContextManager { } pipelineRunContext.changeContextStateAndChild(stateEnum); } + @Transactional(rollbackFor = Exception.class) + public void toHistory(String pipelineId){ + log.info("========================开始入库"); + BaseRunContext context = getContext(pipelineId); + PipBaseElement contextDef = context.getContextDef(); + if (context instanceof PipelineRunContext pipelineRunContext) { + if (contextDef instanceof PipPipeline pipeline){ + Collection values = pipelineRunContext.getChildContext().values(); + List taskContextList = new LinkedList<>(); + for (BaseRunContext value : values) { + Collection childContext = value.getChildContext().values(); + List list = childContext.stream().map(it -> (TaskRunContext) it).toList(); + taskContextList.addAll(list); + } + loggerManager.flushMemory(taskContextList); + PipPipelineHisInstance pipPipelineHisInstance = new PipPipelineHisInstance(); + pipPipelineHisInstance.setPipelineId(pipeline.getId()); + pipPipelineHisInstance.setPipelineName(pipeline.getName()); + AtomicInteger state = pipelineRunContext.getState(); + pipPipelineHisInstance.setState(String.valueOf(state.get())); + pipPipelineHisInstance.setStateName(ContextStateEnum.getByCode(state.get()).getMsg()); + pipPipelineHisInstance.setStartTime(pipelineRunContext.getStartTime()); + pipPipelineHisInstance.setExecutorUserId(pipelineRunContext.getUserId()); + pipPipelineHisInstance.setExecutorUserName(pipelineRunContext.getNickName()); + pipPipelineHisInstance.setEndTime(pipelineRunContext.getEndTime()); + PiplineTriggerModeEnum triggerMode = pipelineRunContext.getTriggerMode(); + pipPipelineHisInstance.setTriggerMode(triggerMode.getCode()); + pipPipelineHisInstance.setContextTree(JSON.toJSONString(pipelineRunContext)); + PipelineQueryReq pipelineQueryReq = new PipelineQueryReq(); + pipelineQueryReq.setId(pipelineId); + pipPipelineHisInstance.setDefTree(JSON.toJSONString(pipelineService.findPipelineById(pipelineQueryReq))); + pipPipelineHisInstance.setStateTree(JSON.toJSONString(pipelineService.getPipelineRunState(pipelineId))); + pipPipelineHisInstance.setTargetVersionName(""); + pipPipelineHisInstance.setTargetVersionId(pipeline.getTargetVersionId()); + hisInstanceDao.insert(pipPipelineHisInstance); + } + } + } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/MemoryLogManager.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/MemoryLogManager.java index f2d52adf..cacaa1ec 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/MemoryLogManager.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/MemoryLogManager.java @@ -1,8 +1,14 @@ package cd.casic.ci.process.engine.manager.impl; +import cd.casic.ci.process.engine.constant.EngineRuntimeConstant; import cd.casic.ci.process.engine.manager.LoggerManager; +import cd.casic.ci.process.engine.runContext.TaskRunContext; import cd.casic.ci.process.process.dal.pipeline.PipTaskLogDao; import cd.casic.ci.process.process.dataObject.log.PipTaskLog; +import cd.casic.ci.process.process.dataObject.task.PipTask; +import cd.casic.ci.process.process.service.task.TaskService; +import cd.casic.ci.process.util.snowflake.SnowflakeIdWorker; +import cd.casic.ci.process.util.snowflake.SnowflakeIdentifierGenerator; import cd.casic.framework.commons.util.network.IpUtil; import cd.casic.framework.commons.util.util.WebFrameworkUtils; import jakarta.annotation.Resource; @@ -31,6 +37,10 @@ public class MemoryLogManager implements LoggerManager { private final Map> taskIdSSEMap = new ConcurrentHashMap<>(); private final Map taskIdMemoryLogMap = new ConcurrentHashMap<>(); + @Resource + private SnowflakeIdentifierGenerator identifierGenerator; + @Resource + private TaskService taskService; public final Integer FLUSH_DB_SIZE=2*1024*1024; // public final Integer FLUSH_DB_SIZE=1000; @@ -107,41 +117,43 @@ public class MemoryLogManager implements LoggerManager { } } } - public void flushMemory(List taskIdList){ + public void flushMemory(List taskContextList){ List insertList = new ArrayList<>(); List updateList = new ArrayList<>(); - for (String taskId : taskIdList) { + for (TaskRunContext taskRunContext : taskContextList) { + String taskId = taskRunContext.getContextDef().getId(); StringBuffer logCache = taskIdMemoryLogMap.get(taskId); - if (logCache!=null) { - if (taskIdDbMap.containsKey(taskId)) { - // 之前已经入库过 - // 存在则更新 - String id = taskIdDbMap.get(taskId); - PipTaskLog pipTaskLog = logDao.selectById(id); - // TODO 之后优化 - pipTaskLog.setContent(pipTaskLog.getContent()+logCache.toString()); - updateList.add(pipTaskLog); - taskIdDbMap.remove(taskId); - taskIdMemoryLogMap.remove(taskId); - List sseEmitters = taskIdSSEMap.get(taskId); - if (!CollectionUtils.isEmpty(sseEmitters)) { - sseEmitters.forEach(ResponseBodyEmitter::complete); - } -// logDao.updateById(pipTaskLog); - } else { - // 不存在就新增 - PipTaskLog pipTaskLog = new PipTaskLog(); - pipTaskLog.setTaskId(taskId); + if (taskIdDbMap.containsKey(taskId)) { + // 之前已经入库过 + // 存在则更新 + String id = taskIdDbMap.get(taskId); + PipTaskLog pipTaskLog = logDao.selectById(id); + if (logCache!=null) { + pipTaskLog.setContent(pipTaskLog.getContent()+logCache); + } + updateList.add(pipTaskLog); + taskIdMemoryLogMap.remove(taskId); + taskRunContext.getLocalVariables().put(EngineRuntimeConstant.HIS_LOG_KEY,pipTaskLog.getId()); + List sseEmitters = taskIdSSEMap.get(taskId); + if (!CollectionUtils.isEmpty(sseEmitters)) { + sseEmitters.forEach(ResponseBodyEmitter::complete); + } +// logDao.updateById(pipTaskLog); + } else { + // 不存在就新增 + PipTaskLog pipTaskLog = new PipTaskLog(); + pipTaskLog.setTaskId(taskId); + if (logCache!=null) { pipTaskLog.setContent(logCache.toString()); -// logDao.insert(pipTaskLog); -// taskIdDbMap.put(taskId,pipTaskLog.getId()); - insertList.add(pipTaskLog); - taskIdDbMap.remove(taskId); - taskIdMemoryLogMap.remove(taskId); - List sseEmitters = taskIdSSEMap.get(taskId); - if (!CollectionUtils.isEmpty(sseEmitters)) { - sseEmitters.forEach(ResponseBodyEmitter::complete); - } + } + pipTaskLog.setId(identifierGenerator.nextUUID(null)); + insertList.add(pipTaskLog); + taskRunContext.getLocalVariables().put(EngineRuntimeConstant.HIS_LOG_KEY,pipTaskLog.getId()); + taskIdMemoryLogMap.remove(taskId); + taskIdDbMap.put(taskId,pipTaskLog.getId()); + List sseEmitters = taskIdSSEMap.get(taskId); + if (!CollectionUtils.isEmpty(sseEmitters)) { + sseEmitters.forEach(ResponseBodyEmitter::complete); } } } @@ -164,6 +176,12 @@ public class MemoryLogManager implements LoggerManager { } return logCache.toString(); } + public void clear(String pipelineId){ + PipTask query = new PipTask(); + query.setPipelineId(pipelineId); + List taskList = taskService.getTask(query); + taskList.forEach(it->taskIdDbMap.remove(it.getId())); + } private void emitterInit(SseEmitter emitter,String taskId){ // 维持心跳 diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java index c94ddea8..d8633ebf 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java @@ -4,6 +4,7 @@ import cd.casic.ci.process.engine.enums.ContextStateEnum; import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import cd.casic.framework.commons.exception.ServiceException; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; +import com.alibaba.fastjson.annotation.JSONField; import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -22,39 +23,41 @@ public abstract class BaseRunContext { /** * 当前上下文的定义 * */ - private PipBaseElement contextDef; + protected PipBaseElement contextDef; @Transient @JsonIgnore - private BaseRunContext parentContext; - private Integer childCount; + @JSONField(serialize = false) + protected BaseRunContext parentContext; + protected Integer childCount; /** * 运行状态 * */ - private final AtomicInteger state; + protected final AtomicInteger state; /** * 启动时间 * */ - private LocalDateTime startTime; + protected LocalDateTime startTime; /** * 结束时间 * */ - private LocalDateTime endTime; - private String resourceId; - private String targetVersionId; - private String targetType; + protected LocalDateTime endTime; + protected String resourceId; + protected String targetVersionId; + protected String targetType; /** * 整个流水线全局的变量 * */ - private Map globalVariables; + protected final Map globalVariables; /** * 当前上下文局部变量 * */ - private Map localVariables; - private Map childContext; + protected final Map localVariables; + protected Map childContext; /** * 用来在控制其他地方的阻塞放行,countDown为1 * */ - private CountDownLatch countDownLatch; + @JsonIgnore + protected CountDownLatch countDownLatch; public BaseRunContext(PipBaseElement contextDef,Integer childCount,BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetVersionId, String targetType, Map globalVariables, Map localVariables, Map childContext) { this.contextDef = contextDef; @@ -77,9 +80,10 @@ public abstract class BaseRunContext { ||ContextStateEnum.BAD_ENDING.equals(stateEnum) ||ContextStateEnum.SKIP_TO.equals(stateEnum)) { this.endTime=LocalDateTime.now(); - } - if(this instanceof PipelineRunContext){ - log.info("debug专用"); + if(this instanceof PipelineRunContext pipelineRunContext){ + // 流水线执行结束 进行入库 + + } } callParentChange(stateEnum); } else { @@ -193,7 +197,8 @@ public abstract class BaseRunContext { if (ContextStateEnum.canGoto(ContextStateEnum.getByCode(state.get()),ContextStateEnum.RUNNING)) { this.changeContextState(ContextStateEnum.RUNNING); } else{ - throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"状态有误"); + log.error("非正常状态扭转{}->{}",ContextStateEnum.getByCode(state.get()),ContextStateEnum.RUNNING); +// throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"状态有误"); } } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java index a02d9c81..5dbd9d1a 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/PipelineRunContext.java @@ -1,18 +1,45 @@ package cd.casic.ci.process.engine.runContext; import cd.casic.ci.process.engine.enums.ContextStateEnum; +import cd.casic.ci.process.engine.manager.LoggerManager; +import cd.casic.ci.process.engine.manager.RunContextManager; +import cd.casic.ci.process.enums.PiplineTriggerModeEnum; +import cd.casic.ci.process.process.dal.history.PipPipelineHisInstanceDao; import cd.casic.ci.process.process.dataObject.base.PipBaseElement; +import cd.casic.ci.process.process.dataObject.history.PipPipelineHisInstance; import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; import cd.casic.framework.commons.exception.ServiceException; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; +import cd.casic.framework.commons.util.util.WebFrameworkUtils; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.annotation.JSONField; +import com.fasterxml.jackson.annotation.JsonIgnore; +import lombok.Data; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import java.time.LocalDateTime; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; - +@Setter +@Getter +@Slf4j public class PipelineRunContext extends BaseRunContext{ + @JsonIgnore + @JSONField(serialize = false) + private PipPipelineHisInstanceDao hisInstanceDao; + @JsonIgnore + @JSONField(serialize = false) + private LoggerManager loggerManager; + private PiplineTriggerModeEnum triggerMode; + private String userId; + private String nickName; + @JSONField(serialize = false) + private RunContextManager contextManager; - public PipelineRunContext(PipPipeline pipeline,Integer childCount) { + public PipelineRunContext(PipPipeline pipeline, Integer childCount) { this(pipeline,childCount,null,new ConcurrentHashMap<>(),new ConcurrentHashMap<>()); } @@ -60,4 +87,22 @@ public class PipelineRunContext extends BaseRunContext{ throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"不支持类型"); } } + @Override + public void changeContextState(ContextStateEnum stateEnum){ + ContextStateEnum curr = ContextStateEnum.getByCode(getState().get()); + if (ContextStateEnum.canGoto(curr,stateEnum)) { + state.compareAndExchange(curr.getCode(),stateEnum.getCode()); + if (ContextStateEnum.HAPPY_ENDING.equals(stateEnum) + ||ContextStateEnum.BAD_ENDING.equals(stateEnum) + ||ContextStateEnum.SKIP_TO.equals(stateEnum)) { + this.endTime=LocalDateTime.now(); + // 入库保存 + contextManager.toHistory(getContextDef().getId()); + } + callParentChange(stateEnum); + } else { + log.error("非法状态扭转直接忽略"); + } + } + } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/job/PipelineSchedulingJob.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/job/PipelineSchedulingJob.java index ddc2368a..abd30829 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/job/PipelineSchedulingJob.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/job/PipelineSchedulingJob.java @@ -1,6 +1,7 @@ package cd.casic.ci.process.engine.scheduler.job; import cd.casic.ci.process.engine.executor.PipelineExecutor; +import cd.casic.ci.process.enums.PiplineTriggerModeEnum; import jakarta.annotation.Resource; import org.joda.time.LocalDateTime; import org.quartz.Job; @@ -27,7 +28,7 @@ public class PipelineSchedulingJob implements Job { String pipelineId = dataMap.getString(JOB_PARAM_PIPELINE_ID); if (pipelineId != null && executor != null) { - executor.execute(pipelineId); + executor.execute(pipelineId, PiplineTriggerModeEnum.TIMING,null); System.out.println("定时任务开始执行,当前执行时间为" + new LocalDateTime()); } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ApplicationWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ApplicationWorker.java index fad8241c..0c272a7e 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ApplicationWorker.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ApplicationWorker.java @@ -58,7 +58,6 @@ public class ApplicationWorker extends HttpWorker { public void execute(TaskRunContext context) { int statusCode = 0; Map localVariables = context.getLocalVariables(); - PipTaskLog pipTaskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY); PipBaseElement contextDef = context.getContextDef(); append(context,"SCA-应用包审查分析节点开始执行"); @@ -105,7 +104,6 @@ public class ApplicationWorker extends HttpWorker { } } localVariables.put("statusCode", statusCode + ""); - localVariables.put(EngineRuntimeConstant.LOG_KEY, pipTaskLog); } private void handleUpload(Map applicationConfigInfo, File file, BaseRunContext context) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException { diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/CodingWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/CodingWorker.java index a54ea8b5..72a467b4 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/CodingWorker.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/CodingWorker.java @@ -49,7 +49,6 @@ public class CodingWorker extends HttpWorker { public void execute(TaskRunContext context) { int statusCode = 0; Map localVariables = context.getLocalVariables(); - PipTaskLog pipTaskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY); PipBaseElement contextDef = context.getContextDef(); append(context,"SCA-代码仓库管理节点开始执行"); @@ -132,7 +131,6 @@ public class CodingWorker extends HttpWorker { } localVariables.put("statusCode", statusCode + ""); - localVariables.put(EngineRuntimeConstant.LOG_KEY, pipTaskLog); } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/DIYImageExecuteCommandWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/DIYImageExecuteCommandWorker.java index c32adf25..a88f0476 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/DIYImageExecuteCommandWorker.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/DIYImageExecuteCommandWorker.java @@ -29,7 +29,6 @@ public class DIYImageExecuteCommandWorker extends SshWorker { public void execute(TaskRunContext context) { int statusCode = -1; Map localVariables = context.getLocalVariables(); - PipTaskLog taskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY); if (context.getContextDef() instanceof PipTask taskDef) { log.info(taskDef.getTaskName()); Map taskProperties = taskDef.getTaskProperties(); diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ScaSbomWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ScaSbomWorker.java index 60d04c19..4b55c689 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ScaSbomWorker.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ScaSbomWorker.java @@ -58,7 +58,6 @@ public class ScaSbomWorker extends HttpWorker { public void execute(TaskRunContext context) { int statusCode = 0; Map localVariables = context.getLocalVariables(); - PipTaskLog pipTaskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY); PipBaseElement contextDef = context.getContextDef(); append(context,"SCA-SBOM节点开始执行"); @@ -107,7 +106,6 @@ public class ScaSbomWorker extends HttpWorker { } localVariables.put("statusCode", statusCode + ""); - localVariables.put(EngineRuntimeConstant.LOG_KEY, pipTaskLog); } private void handleUpload(Map scaSbomConfigInfo, File file, BaseRunContext context) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException { diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/base/BaseWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/base/BaseWorker.java index aca23f05..131bee17 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/base/BaseWorker.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/base/BaseWorker.java @@ -61,13 +61,20 @@ public abstract class BaseWorker implements Runnable{ if (context instanceof TaskRunContext taskRunContext){ try { taskRunContext.changeContextState(ContextStateEnum.READY); - if (this instanceof PassableWorker passableWorker) { - taskRunContext.changeContextState(ContextStateEnum.SUSPEND); - passableWorker.waitForPermission(); - } else { - taskRunContext.changeContextState(ContextStateEnum.RUNNING); + Map globalVariables = context.getGlobalVariables(); + Object fromError = globalVariables.get(PipelineBehaviorConstant.PIPELINE_EXECUTE_FROM_ERROR); + if (Boolean.TRUE.equals(fromError)&&ContextStateEnum.HAPPY_ENDING.getCode().equals(taskRunContext.getState().get())) { + append(taskRunContext,"跳过执行"+CommandConstant.ENTER); + return; + } else{ + if (this instanceof PassableWorker passableWorker) { + taskRunContext.changeContextState(ContextStateEnum.SUSPEND); + passableWorker.waitForPermission(); + } else { + taskRunContext.changeContextState(ContextStateEnum.RUNNING); + } + execute(taskRunContext); } - execute(taskRunContext); } catch (Exception e) { log.error("================worker执行报错:",e); // todo 根据配置决定失败是跳过还是直接失败,如果直接跳过,状态改为跳过,如果直接失败状态就改为失败 diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dal/history/PipPipelineHisInstanceDao.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dal/history/PipPipelineHisInstanceDao.java new file mode 100644 index 00000000..68111fbb --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dal/history/PipPipelineHisInstanceDao.java @@ -0,0 +1,7 @@ +package cd.casic.ci.process.process.dal.history; + +import cd.casic.ci.process.process.dataObject.history.PipPipelineHisInstance; +import cd.casic.framework.mybatis.core.mapper.BaseMapperX; + +public interface PipPipelineHisInstanceDao extends BaseMapperX { +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dataObject/history/PipPipelineHisInstance.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dataObject/history/PipPipelineHisInstance.java new file mode 100644 index 00000000..60d57b9b --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dataObject/history/PipPipelineHisInstance.java @@ -0,0 +1,33 @@ +package cd.casic.ci.process.process.dataObject.history; + +import cd.casic.framework.commons.dataobject.BaseDO; +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; +import lombok.EqualsAndHashCode; + +import java.time.LocalDateTime; + +@EqualsAndHashCode(callSuper = true) +@Data +@TableName("pip_pipeline_his_instance") +public class PipPipelineHisInstance extends BaseDO { + @TableId(type = IdType.ASSIGN_ID) + private String id; + private String pipelineId; + private String contextTree; + private String stateTree; + private String defTree; + private String executorUserId; + private String executorUserName; + private String triggerMode; + private LocalDateTime startTime; + private LocalDateTime endTime; + private String state; + private String stateName; + private String pipelineName; + private String targetVersionId; + private String targetVersionName; + private String description; +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/history/PipelineHistoryService.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/history/PipelineHistoryService.java new file mode 100644 index 00000000..d47bec4c --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/history/PipelineHistoryService.java @@ -0,0 +1,11 @@ +package cd.casic.ci.process.process.service.history; + +import cd.casic.ci.process.process.dataObject.history.PipPipelineHisInstance; +import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; +import com.baomidou.mybatisplus.extension.service.IService; + +import java.util.List; + +public interface PipelineHistoryService extends IService { + List getListByPipelineId(String pipelineId); +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/history/impl/PipelineHistoryServiceImpl.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/history/impl/PipelineHistoryServiceImpl.java new file mode 100644 index 00000000..19eb0845 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/history/impl/PipelineHistoryServiceImpl.java @@ -0,0 +1,27 @@ +package cd.casic.ci.process.process.service.history.impl; + +import cd.casic.ci.process.process.dal.history.PipPipelineHisInstanceDao; +import cd.casic.ci.process.process.dal.pipeline.PipelineDao; +import cd.casic.ci.process.process.dataObject.history.PipPipelineHisInstance; +import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; +import cd.casic.ci.process.process.service.history.PipelineHistoryService; +import cd.casic.ci.process.process.service.pipeline.PipelineService; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import jakarta.annotation.Resource; +import org.springframework.stereotype.Service; + +import java.util.List; + +@Service +public class PipelineHistoryServiceImpl extends ServiceImpl implements PipelineHistoryService { + @Resource + private PipPipelineHisInstanceDao pipelineHisInstanceDao; + @Override + public List getListByPipelineId(String pipelineId) { + LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); + wrapper.eq(PipPipelineHisInstance::getPipelineId,pipelineId); + return pipelineHisInstanceDao.selectList(wrapper); + + } +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/taskLog/TaskLogService.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/taskLog/TaskLogService.java new file mode 100644 index 00000000..16a3b051 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/taskLog/TaskLogService.java @@ -0,0 +1,8 @@ +package cd.casic.ci.process.process.service.taskLog; + +import cd.casic.ci.process.process.dataObject.log.PipTaskLog; +import cd.casic.ci.process.process.dataObject.task.PipTask; +import com.baomidou.mybatisplus.extension.service.IService; + +public interface TaskLogService extends IService { +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/taskLog/impl/TaskLogServiceImpl.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/taskLog/impl/TaskLogServiceImpl.java new file mode 100644 index 00000000..6c7cf3c5 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/taskLog/impl/TaskLogServiceImpl.java @@ -0,0 +1,11 @@ +package cd.casic.ci.process.process.service.taskLog.impl; + +import cd.casic.ci.process.process.dal.pipeline.PipTaskLogDao; +import cd.casic.ci.process.process.dataObject.log.PipTaskLog; +import cd.casic.ci.process.process.service.taskLog.TaskLogService; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import org.springframework.stereotype.Service; + +@Service +public class TaskLogServiceImpl extends ServiceImpl implements TaskLogService { +} diff --git a/ops-server/src/main/java/cd/casic/server/OpsServerApplication.java b/ops-server/src/main/java/cd/casic/server/OpsServerApplication.java index 006a842d..34d5091f 100644 --- a/ops-server/src/main/java/cd/casic/server/OpsServerApplication.java +++ b/ops-server/src/main/java/cd/casic/server/OpsServerApplication.java @@ -5,6 +5,7 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.ComponentScans; +import org.springframework.security.core.context.SecurityContextHolder; /** * 项目的启动类 @@ -17,7 +18,6 @@ import org.springframework.context.annotation.ComponentScans; @MapperScan(basePackages = {"cd.casic.**.dal","cd.casic.**.dao"}) @SpringBootApplication(scanBasePackages = {"${ops.info.base-package}.server", "${ops.info.base-package}.module"}) public class OpsServerApplication { - public static void main(String[] args) { SpringApplication.run(OpsServerApplication.class, args); } diff --git a/ops-server/src/test/java/cd/casic/server/PipelineExecuteTest.java b/ops-server/src/test/java/cd/casic/server/PipelineExecuteTest.java index fbd10884..d36e4a03 100644 --- a/ops-server/src/test/java/cd/casic/server/PipelineExecuteTest.java +++ b/ops-server/src/test/java/cd/casic/server/PipelineExecuteTest.java @@ -1,6 +1,7 @@ package cd.casic.server; import cd.casic.ci.process.engine.executor.PipelineExecutor; +import cd.casic.ci.process.enums.PiplineTriggerModeEnum; import cd.casic.ci.process.process.service.pipeline.PipelineService; import jakarta.annotation.Resource; import org.junit.jupiter.api.Test; @@ -17,7 +18,7 @@ public class PipelineExecuteTest { // 执行pipeline @Test public void executePipeline(){ - pipelineExecutor.execute("716299522803896320"); + pipelineExecutor.execute("716299522803896320", PiplineTriggerModeEnum.HAND); } // 获取pipeline执行状态 @Test @@ -27,11 +28,18 @@ public class PipelineExecuteTest { @Test public void taskSkipExecute(){ // 这个流水线包含了故意报错的worker,可以验证跳过效果 - pipelineExecutor.execute("718104543308681216"); + pipelineExecutor.execute("718104543308681216",PiplineTriggerModeEnum.HAND); } @Test public void taskSkipGetState(){ // 这个流水线包含了故意报错的worker,可以验证跳过效果 pipelineService.getPipelineRunState("718104543308681216"); } + @Test + public void fromError(){ + // 这个流水线包含了故意报错的worker,可以验证跳过效果 + // 传false 就是从头执行,传false就是跳过正确的节点从错误处执行 + pipelineExecutor.execute("718104543308681216",PiplineTriggerModeEnum.HAND,false); + pipelineExecutor.execute("718104543308681216",PiplineTriggerModeEnum.HAND,true); + } }