diff --git a/modules/module-ci-process-api/src/main/java/cd/casic/ci/api/TestController.java b/modules/module-ci-process-api/src/main/java/cd/casic/ci/api/TestController.java new file mode 100644 index 00000000..f4f35055 --- /dev/null +++ b/modules/module-ci-process-api/src/main/java/cd/casic/ci/api/TestController.java @@ -0,0 +1,19 @@ +package cd.casic.ci.api; + +import cd.casic.ci.process.util.CryptogramUtil; +import cd.casic.ci.process.util.SftpUploadUtil; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController("/test") +public class TestController { + @PostMapping("/upload") + public void uploadTest(){ + + try { + SftpUploadUtil.uploadFileViaSftp("175.6.27.228",22,"hnidc", CryptogramUtil.doDecrypt("cb2ee50ff663312808773f1698b801d2f9d6073f9684473e090767edbc2dba93"),null,"/ops/ops-pro/ops-server.jar","/home/casic/706/ai_test_527","ops-server.jar"); + } catch (SftpUploadUtil.SftpUploadException e) { + throw new RuntimeException(e); + } + } +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/constant/PipelineBehaviorConstant.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/constant/PipelineBehaviorConstant.java new file mode 100644 index 00000000..c4a81d78 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/constant/PipelineBehaviorConstant.java @@ -0,0 +1,14 @@ +package cd.casic.ci.process.engine.constant; +/** + * 用于存放系统变量 + * */ +public class PipelineBehaviorConstant { + /** + * 发生错误是否跳过,存放于taskProperties + * */ + public static final String TASK_SKIP_KEY = "taskSkip"; + /** + * 流水线是否从上次错误处执行 + * */ + public static final String PIPELINE_EXECUTE_FROM_ERROR=""; +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java index 7f20abeb..1c0250e9 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/ParallelDispatcher.java @@ -53,16 +53,12 @@ public class ParallelDispatcher implements BaseDispatcher{ } // 等待当前阶段执行 latch.await(); + // TODO 检查是否全部执行成功 ,目前没有逻辑就是忽略错误 + // 当前执行失败 if (pipelineRunContext.getState().get()== ContextStateEnum.BAD_ENDING.getCode()) { log.error("并行执行停止"); break; } - // TODO 检查是否全部执行成功 ,目前没有逻辑就是忽略错误 - // 当前执行失败 -// while (pipelineRunContext.getState().get() != ContextStateEnum.RUNNING.getCode()) { -// // 想办法借助工具类 或者直接wait -// pipelineRunContext.pause(); -// } } } @Override diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java index 996dc436..a2e49f14 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/dispatcher/impl/SerialDispatcher.java @@ -53,6 +53,7 @@ public class SerialDispatcher implements BaseDispatcher { // 如果不为正常执行成功就暂时阻塞直到有状态更改 while (state.get() != ContextStateEnum.HAPPY_ENDING.getCode() && state.get() != ContextStateEnum.BAD_ENDING.getCode() + && state.get() != ContextStateEnum.SKIP_TO.getCode() ) { // Thread.sleep(1000L); taskRunContext.pause(); diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/enums/ContextStateEnum.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/enums/ContextStateEnum.java index b37c2852..b2621c23 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/enums/ContextStateEnum.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/enums/ContextStateEnum.java @@ -12,7 +12,8 @@ public enum ContextStateEnum { SUSPEND(3,"挂起"), STOP(-1,"停止"), HAPPY_ENDING(4,"执行成功"), - BAD_ENDING(5,"执行失败") + BAD_ENDING(5,"执行失败"), + SKIP_TO(6,"跳过") ; private Integer code; @@ -22,9 +23,10 @@ public enum ContextStateEnum { static { TRANSITIONS.put(INIT, Set.of(READY, SUSPEND, BAD_ENDING, STOP)); TRANSITIONS.put(READY, Set.of(READY,RUNNING, SUSPEND, BAD_ENDING, STOP)); - TRANSITIONS.put(RUNNING, Set.of(RUNNING,SUSPEND, HAPPY_ENDING, BAD_ENDING, STOP)); + TRANSITIONS.put(RUNNING, Set.of(RUNNING,SUSPEND, HAPPY_ENDING, BAD_ENDING, STOP,SKIP_TO)); TRANSITIONS.put(SUSPEND, Set.of(SUSPEND,INIT, READY, BAD_ENDING, RUNNING,STOP)); //...初始化其他状态转移关系 + TRANSITIONS.put(SKIP_TO,Collections.emptySet()); } ContextStateEnum(Integer code, String msg) { diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultRunContextManager.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultRunContextManager.java index 8c18f510..9db52659 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultRunContextManager.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/DefaultRunContextManager.java @@ -12,6 +12,7 @@ import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import cd.casic.ci.process.process.dataObject.task.PipTask; import cd.casic.framework.commons.exception.ServiceException; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import jakarta.annotation.Resource; import org.springframework.stereotype.Component; @@ -65,7 +66,8 @@ public class DefaultRunContextManager implements RunContextManager { if (contextMap.containsKey(id)) { PipelineRunContext oldPipeline = contextMap.get(id); oldPipeline.changeContextStateAndChild(ContextStateEnum.BAD_ENDING); - List taskList = taskDao.selectList("pipelineId", id); + LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); + List taskList = taskDao.selectList(wrapper); List taskIdList = taskList.stream().map(PipTask::getId).toList(); // 清空上一次的日志 loggerManager.flushMemory(taskIdList); diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java index 3aa552b9..c94ddea8 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/runContext/BaseRunContext.java @@ -30,7 +30,7 @@ public abstract class BaseRunContext { /** * 运行状态 * */ - private AtomicInteger state; + private final AtomicInteger state; /** * 启动时间 * */ @@ -73,9 +73,17 @@ public abstract class BaseRunContext { ContextStateEnum curr = ContextStateEnum.getByCode(state.get()); if (ContextStateEnum.canGoto(curr,stateEnum)) { state.compareAndExchange(curr.getCode(),stateEnum.getCode()); - // 如果之前有暂停监听状态的.则停止暂停 -// unpause(); + if (ContextStateEnum.HAPPY_ENDING.equals(stateEnum) + ||ContextStateEnum.BAD_ENDING.equals(stateEnum) + ||ContextStateEnum.SKIP_TO.equals(stateEnum)) { + this.endTime=LocalDateTime.now(); + } + if(this instanceof PipelineRunContext){ + log.info("debug专用"); + } callParentChange(stateEnum); + } else { + log.error("非法状态扭转直接忽略"); } } public void changeContextStateAndChild(ContextStateEnum stateEnum){ @@ -90,10 +98,6 @@ public abstract class BaseRunContext { } } } - // 保证一直都操作同一个引用的值 - private void setState(AtomicInteger state) { - this.state = state; - } /** * 获取当前或者子上下文 @@ -104,10 +108,11 @@ public abstract class BaseRunContext { if (parentContext==null) { return; } - if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) { - this.endTime=LocalDateTime.now(); - unpause(); + if (ContextStateEnum.HAPPY_ENDING.equals(state) + ||ContextStateEnum.BAD_ENDING.equals(state) + ||ContextStateEnum.SKIP_TO.equals(state)) { parentContext.checkChildEnd(); + unpause(); } else if(ContextStateEnum.READY.equals(state)){ parentContext.checkChildReady(); } else if(ContextStateEnum.RUNNING.equals(state)){ @@ -127,10 +132,15 @@ public abstract class BaseRunContext { for (Map.Entry entry : childContext.entrySet()) { BaseRunContext child = entry.getValue(); int state = child.getState().get(); - if (!ContextStateEnum.HAPPY_ENDING.getCode().equals(state)&&!ContextStateEnum.BAD_ENDING.getCode().equals(state)) { + if (!ContextStateEnum.HAPPY_ENDING.getCode().equals(state) + &&!ContextStateEnum.BAD_ENDING.getCode().equals(state) + &&!ContextStateEnum.SKIP_TO.getCode().equals(state)) { return; } - result&=state; + // 子成员中有一个失败则状态为失败 + if (ContextStateEnum.BAD_ENDING.getCode().equals(state)) { + result=state; + } } boolean end = false; if (ContextStateEnum.HAPPY_ENDING.getCode()==result) { @@ -208,4 +218,22 @@ public abstract class BaseRunContext { } } } + + @Override + public String toString() { + return "BaseRunContext{" + + "contextDef=" + contextDef + + ", childCount=" + childCount + + ", state=" + state + + ", startTime=" + startTime + + ", endTime=" + endTime + + ", resourceId='" + resourceId + '\'' + + ", targetVersionId='" + targetVersionId + '\'' + + ", targetType='" + targetType + '\'' + + ", globalVariables=" + globalVariables + + ", localVariables=" + localVariables + + ", childContext=" + childContext + + ", countDownLatch=" + countDownLatch + + '}'; + } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TargetHandleWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TargetHandleWorker.java index a81c2577..6020087f 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TargetHandleWorker.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TargetHandleWorker.java @@ -6,6 +6,7 @@ import cd.casic.ci.process.engine.runContext.TaskRunContext; import cd.casic.ci.process.engine.worker.base.BaseWorker; import cd.casic.ci.process.process.service.machine.MachineInfoService; import cd.casic.ci.process.process.service.target.TargetVersionService; +import cd.casic.ci.process.util.SftpUploadUtil; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TestErrorWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TestErrorWorker.java new file mode 100644 index 00000000..cab21128 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TestErrorWorker.java @@ -0,0 +1,31 @@ +package cd.casic.ci.process.engine.worker; + + +import cd.casic.ci.process.common.WorkAtom; +import cd.casic.ci.process.engine.runContext.TaskRunContext; +import cd.casic.ci.process.engine.worker.base.BaseWorker; +import cd.casic.ci.process.process.dataObject.base.PipBaseElement; +import cd.casic.ci.process.process.dataObject.task.PipTask; +import cd.casic.framework.commons.exception.ServiceException; +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; + +import java.util.Map; + + +@Slf4j +@WorkAtom(taskType = "ERROR_SUCCESS") +public class TestErrorWorker extends BaseWorker { + @Override + public void execute(TaskRunContext context) { + if (context.getContextDef() instanceof PipTask task) { + Map taskProperties = task.getTaskProperties(); + String s = String.valueOf(taskProperties.get("buildScript")); + if ("error".equals(s)) { + append(context,"执行报错"); + throw new ServiceException(111,"模拟测试报错"); + } + append(context,"执行成功"); + } + } +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/base/BaseWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/base/BaseWorker.java index 0d630482..aca23f05 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/base/BaseWorker.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/base/BaseWorker.java @@ -3,6 +3,7 @@ package cd.casic.ci.process.engine.worker.base; import cd.casic.ci.process.constant.CommandConstant; import cd.casic.ci.process.engine.constant.EngineRuntimeConstant; +import cd.casic.ci.process.engine.constant.PipelineBehaviorConstant; import cd.casic.ci.process.engine.enums.ContextStateEnum; import cd.casic.ci.process.engine.manager.LoggerManager; import cd.casic.ci.process.engine.manager.RunContextManager; @@ -13,6 +14,7 @@ import cd.casic.ci.process.enums.MachineSystemEnum; import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import cd.casic.ci.process.process.dataObject.log.PipTaskLog; import cd.casic.ci.process.process.dataObject.machine.MachineInfo; +import cd.casic.ci.process.process.dataObject.task.PipTask; import cd.casic.ci.process.process.service.machine.MachineInfoService; import cd.casic.ci.process.ssh.SshClient; @@ -29,6 +31,7 @@ import org.apache.commons.lang3.StringUtils; import javax.swing.text.StringContent; import java.util.Arrays; import java.util.List; +import java.util.Map; @Data @@ -67,8 +70,20 @@ public abstract class BaseWorker implements Runnable{ execute(taskRunContext); } catch (Exception e) { log.error("================worker执行报错:",e); - taskRunContext.changeContextState(ContextStateEnum.BAD_ENDING); - append(context,e.getMessage()); + // todo 根据配置决定失败是跳过还是直接失败,如果直接跳过,状态改为跳过,如果直接失败状态就改为失败 + PipBaseElement contextDef = taskRunContext.getContextDef(); + if (contextDef instanceof PipTask task) { + Map taskProperties = task.getTaskProperties(); + Object taskSkip = taskProperties.get(PipelineBehaviorConstant.TASK_SKIP_KEY); + if (Boolean.TRUE.equals(taskSkip)) { + taskRunContext.changeContextState(ContextStateEnum.SKIP_TO); + append(context,e.getMessage()); + } else{ + taskRunContext.changeContextState(ContextStateEnum.BAD_ENDING); + append(context,e.getMessage()); + } + } + return; } // TODO 执行结束修改context的state,并且通知父类 diff --git a/ops-server/src/test/java/cd/casic/server/PipelineExecuteTest.java b/ops-server/src/test/java/cd/casic/server/PipelineExecuteTest.java index 3f12b34e..fbd10884 100644 --- a/ops-server/src/test/java/cd/casic/server/PipelineExecuteTest.java +++ b/ops-server/src/test/java/cd/casic/server/PipelineExecuteTest.java @@ -24,4 +24,14 @@ public class PipelineExecuteTest { public void getRunState(){ pipelineService.getPipelineRunState("716299522803896320"); } + @Test + public void taskSkipExecute(){ + // 这个流水线包含了故意报错的worker,可以验证跳过效果 + pipelineExecutor.execute("718104543308681216"); + } + @Test + public void taskSkipGetState(){ + // 这个流水线包含了故意报错的worker,可以验证跳过效果 + pipelineService.getPipelineRunState("718104543308681216"); + } }