断点运行相关代码

This commit is contained in:
even 2025-06-06 12:20:46 +08:00
parent 430086cfa8
commit c5ec84681d
14 changed files with 106 additions and 20 deletions

View File

@ -28,4 +28,8 @@ public class PipHistoryController {
public CommonResult<PageResult<PipPipelineHisInstance>> list(@RequestBody PipelineHistoryQueryReq req){
    // Page through history instances for the pipeline named in the request body.
    PageResult<PipPipelineHisInstance> page = pipelineHistoryService.getPageByPipelineId(req);
    return CommonResult.success(page);
}
@PostMapping("/getById/{id}")
public CommonResult<PipPipelineHisInstance> getById(@PathVariable String id){
    // Look up a single history instance by its primary key.
    PipPipelineHisInstance hisInstance = pipelineHistoryService.getById(id);
    return CommonResult.success(hisInstance);
}
}

View File

@ -116,4 +116,9 @@ public class PipelineController {
public CommonResult<TreeRunContextResp> getPipelineRunState(@PathVariable String pipelineId){
    // Return the current run-state tree for the given pipeline.
    TreeRunContextResp runState = pipelineService.getPipelineRunState(pipelineId);
    return CommonResult.success(runState);
}
@PostMapping("/traversePipelineContext/{pipelineId}")
// Debug endpoint: walks the in-memory run-context tree of the pipeline.
// NOTE(review): fromError is accepted but never forwarded to the executor — confirm whether
// PipelineExecutor.traversePipelineContext should take it, or drop the parameter.
// NOTE(review): declared to return PipelineRunContext but always responds with an empty
// success body — confirm the intended payload.
public CommonResult<PipelineRunContext> traversePipelineContext(@PathVariable("pipelineId") String pipelineId,@RequestParam(required = false,defaultValue = "false") Boolean fromError){
pipelineExecutor.traversePipelineContext(pipelineId);
return CommonResult.success();
}
}

View File

@ -20,4 +20,5 @@ public class SingletonRunContextResp {
// Wall-clock time the node started running.
private LocalDateTime startTime;
// Wall-clock time the node finished.
private LocalDateTime endTime;
// Human-readable elapsed time — NOTE(review): formatted elsewhere, confirm the format.
private String duration;
// NOTE(review): presumably the id of the persisted history log record for this node
// (see HIS_LOG_KEY usage in DefaultRunContextManager) — confirm.
private String logId;
}

View File

@ -61,8 +61,10 @@ public class ParallelDispatcher implements BaseDispatcher{
BaseRunContext baseRunContext = runContextManager.getContext(secondStage.getId());
if (baseRunContext!=null) {
context = (SecondStageRunContext) baseRunContext;
context.setContextDef(secondStage);
context.setChildCount(secondStage.getTaskValues().size());
} else {
// 防止运行停止以后添加节点 ,删除节点的同时也要删除上下文
// 防止运行停止以后添加节点,以及到达之前未执行的节点 ,todo 删除节点的同时也要删除上下文
context = new SecondStageRunContext(secondStage,secondStage.getTaskValues().size(),pipelineRunContext,new ConcurrentHashMap<>());
runContextManager.contextRegister(context);
}

View File

@ -42,7 +42,7 @@ public class SerialDispatcher implements BaseDispatcher {
}
@Override
public void dispatch() throws InterruptedException {
public void dispatch() {
for (PipTask pipTask : taskList) {
TaskRunContext taskRunContext = null;
Map<String, Object> globalVariables = stageRunContext.getGlobalVariables();
@ -55,8 +55,9 @@ public class SerialDispatcher implements BaseDispatcher {
BaseRunContext context = contextManager.getContext(pipTask.getId());
if (context != null) {
taskRunContext= (TaskRunContext) context;
context.setContextDef(pipTask);
} else {
// 防止运行停止以后添加节点,删除节点的同时也要删除上下文
// 防止运行停止以后添加节点,以及到达之前未执行的节点 ,todo 删除节点的同时也要删除上下文
taskRunContext = new TaskRunContext(pipTask,stageRunContext,new HashMap<>());
contextManager.contextRegister(taskRunContext);
}
@ -64,8 +65,7 @@ public class SerialDispatcher implements BaseDispatcher {
taskRunContext.changeContextState(ContextStateEnum.READY);
TaskRunMessage taskRunMessage = new TaskRunMessage(pipTask);
redisMQTemplate.send(taskRunMessage);
// TODO 监听当前taskContext状态变成执行成功或者执行失败(worker当中改变状态为运行中执行成功执行失败)
//监听当前taskContext状态变成执行成功或者执行失败(worker当中改变状态为运行中执行成功执行失败)
AtomicInteger state = taskRunContext.getState();
// 如果不为正常执行成功就暂时阻塞直到有状态更改
while (state.get() != ContextStateEnum.HAPPY_ENDING.getCode()

View File

@ -13,7 +13,8 @@ public enum ContextStateEnum {
STOP(-1,"停止"),
HAPPY_ENDING(4,"执行成功"),
BAD_ENDING(5,"执行失败"),
SKIP_TO(6,"跳过")
SKIP_TO(6,"跳过"),
ERROR_EXECUTE_READY(7,"从错误处执行准备完毕")
;
private Integer code;
@ -26,7 +27,9 @@ public enum ContextStateEnum {
TRANSITIONS.put(RUNNING, Set.of(RUNNING,SUSPEND, HAPPY_ENDING, BAD_ENDING, STOP,SKIP_TO));
TRANSITIONS.put(SUSPEND, Set.of(SUSPEND,INIT, READY, BAD_ENDING, RUNNING,STOP));
//...初始化其他状态转移关系
TRANSITIONS.put(SKIP_TO,Collections.emptySet());
TRANSITIONS.put(SKIP_TO,Set.of(HAPPY_ENDING));
TRANSITIONS.put(BAD_ENDING,Set.of(ERROR_EXECUTE_READY));
TRANSITIONS.put(ERROR_EXECUTE_READY,Set.of(ERROR_EXECUTE_READY,READY,RUNNING,SUSPEND,SKIP_TO,BAD_ENDING,HAPPY_ENDING));
// TRANSITIONS.put(HAPPY_ENDING,Set.of(RUNNING));
}

View File

@ -5,4 +5,5 @@ import cd.casic.ci.process.enums.PiplineTriggerModeEnum;
public interface PipelineExecutor {
    /**
     * Starts (or resumes) an execution of the pipeline identified by {@code pipelineId}.
     *
     * @param pipelineId      id of the pipeline definition to run
     * @param triggerModeEnum how this run was triggered
     * @param fromError       when {@code true}, resume from previously failed nodes
     *                        (successful nodes are skipped) instead of starting fresh;
     *                        renamed from the typo {@code formError} — source-compatible,
     *                        Java callers bind by position, not parameter name
     * @return the root {@code PipelineRunContext} of this execution
     */
    PipelineRunContext execute(String pipelineId, PiplineTriggerModeEnum triggerModeEnum, Boolean fromError);

    /**
     * Walks the registered in-memory run-context tree of the given pipeline
     * (debug/inspection hook for breakpoint runs).
     *
     * @param pipelineId id of the pipeline whose contexts are traversed
     */
    void traversePipelineContext(String pipelineId);
}

View File

@ -2,11 +2,13 @@ package cd.casic.ci.process.engine.executor.impl;
import cd.casic.ci.process.engine.constant.PipelineBehaviorConstant;
import cd.casic.ci.process.engine.dispatcher.impl.ParallelDispatcher;
import cd.casic.ci.process.engine.enums.ContextStateEnum;
import cd.casic.ci.process.engine.executor.PipelineExecutor;
import cd.casic.ci.process.engine.manager.LoggerManager;
import cd.casic.ci.process.engine.manager.RunContextManager;
import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.engine.runContext.PipelineRunContext;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.enums.PiplineTriggerModeEnum;
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import cd.casic.ci.process.process.dataObject.stage.PipStage;
@ -16,14 +18,18 @@ import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import cd.casic.framework.mq.redis.core.RedisMQTemplate;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
@Component
@Slf4j
public class DefaultPipelineExecutor implements PipelineExecutor {
@Resource
private PipelineService pipelineService;
@ -66,7 +72,8 @@ public class DefaultPipelineExecutor implements PipelineExecutor {
ConcurrentHashMap<String, Object> globalVariable = null;
globalVariable = new ConcurrentHashMap<>();
// 如果要做 容灾就需要重新将数据库存的记录按顺序加载入
pipelineRunContext=new PipelineRunContext(pipeline,childCount,null,globalVariable,new ConcurrentHashMap<>());
pipelineRunContext=new PipelineRunContext(pipeline,childCount,null,globalVariable,new ConcurrentHashMap<>());
runContextManager.contextRegister(pipelineRunContext);
} else {
try {
pipelineRunContext = (PipelineRunContext)runContextManager.getContext(pipelineId);
@ -74,12 +81,44 @@ public class DefaultPipelineExecutor implements PipelineExecutor {
// 以后可以改成从 数据库重载
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到上次的记录");
}
fromErrorInit(pipelineId);
}
pipelineRunContext.setTriggerMode(triggerModeEnum);
pipelineRunContext.getGlobalVariables().put(PipelineBehaviorConstant.PIPELINE_EXECUTE_FROM_ERROR,formError);
runContextManager.contextRegister(pipelineRunContext);
ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager,redisMQTemplate,serialExecutor);
parallelExecutor.execute(parallelDispatcher);
return pipelineRunContext;
}
/**
 * Prepares a resume-from-error run: flattens the pipeline's run-context tree
 * (pipeline -> stages -> tasks) and moves every context that ended in
 * BAD_ENDING into ERROR_EXECUTE_READY so it can be re-dispatched.
 */
public void fromErrorInit(String pipelineId){
    BaseRunContext pipelineContext = runContextManager.getContext(pipelineId);
    if (pipelineContext == null) {
        // Nothing registered for this pipeline — nothing to reset.
        return;
    }
    // Flatten in the same order as registration: pipeline, all stages, then all tasks.
    List<BaseRunContext> flattened = new LinkedList<>();
    flattened.add(pipelineContext);
    Collection<BaseRunContext> stageContexts = pipelineContext.getChildContext().values();
    flattened.addAll(stageContexts);
    for (BaseRunContext stageContext : stageContexts) {
        for (BaseRunContext child : stageContext.getChildContext().values()) {
            flattened.add((TaskRunContext) child);
        }
    }
    log.info("上下文分组完毕");
    // Flip every failed context to "ready to re-execute from error".
    for (BaseRunContext ctx : flattened) {
        if (ContextStateEnum.BAD_ENDING.getCode().equals(ctx.getState().get())) {
            ctx.changeContextState(ContextStateEnum.ERROR_EXECUTE_READY);
        }
    }
}
/**
 * Debug/inspection hook: flattens the pipeline's registered run-context tree
 * (stages -> tasks) and logs when grouping is done.
 * NOTE(review): the flattened task list is not used further yet — presumably a
 * placeholder for breakpoint-run inspection; confirm intended follow-up.
 */
public void traversePipelineContext(String pipelineId){
    BaseRunContext context = runContextManager.getContext(pipelineId);
    if (context == null) {
        // No context registered (pipeline never ran, or already cleaned up).
        // Guard mirrors fromErrorInit; without it this endpoint NPEs.
        return;
    }
    Collection<BaseRunContext> stageContextList = context.getChildContext().values();
    List<TaskRunContext> allTaskList = new LinkedList<>();
    for (BaseRunContext baseRunContext : stageContextList) {
        Collection<TaskRunContext> taskContextList = baseRunContext.getChildContext().values().stream().map(item->(TaskRunContext)item).toList();
        allTaskList.addAll(taskContextList);
    }
    log.info("上下文分组完毕");
}
}

View File

@ -1,6 +1,9 @@
package cd.casic.ci.process.engine.manager.impl;
import cd.casic.ci.process.dal.req.pipeline.PipelineQueryReq;
import cd.casic.ci.process.dal.resp.context.TreeRunContextResp;
import cd.casic.ci.process.engine.constant.EngineRuntimeConstant;
import cd.casic.ci.process.engine.constant.PipelineBehaviorConstant;
import cd.casic.ci.process.engine.enums.ContextStateEnum;
import cd.casic.ci.process.engine.manager.LoggerManager;
import cd.casic.ci.process.engine.manager.RunContextManager;
@ -37,6 +40,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
@Component
@Slf4j
@ -178,7 +183,16 @@ public class DefaultRunContextManager implements RunContextManager {
PipelineQueryReq pipelineQueryReq = new PipelineQueryReq();
pipelineQueryReq.setId(pipelineId);
pipPipelineHisInstance.setDefTree(JSON.toJSONString(pipelineService.findPipelineById(pipelineQueryReq), SerializerFeature.DisableCircularReferenceDetect));
pipPipelineHisInstance.setStateTree(JSON.toJSONString(pipelineService.getPipelineRunState(pipelineId),SerializerFeature.DisableCircularReferenceDetect));
TreeRunContextResp pipelineRunState = pipelineService.getPipelineRunState(pipelineId);
Map<String, TaskRunContext> collect = taskContextList.stream().collect(Collectors.toMap(item -> item.getContextDef().getId(), Function.identity()));
for (TreeRunContextResp treeRunContextResp : pipelineRunState.getTaskList()) {
if (collect.containsKey(treeRunContextResp.getId())) {
Map<String, Object> localVariables = collect.get(treeRunContextResp.getId()).getLocalVariables();
Object logId = localVariables.get(EngineRuntimeConstant.HIS_LOG_KEY);
treeRunContextResp.setLogId(String.valueOf(logId));
}
}
pipPipelineHisInstance.setStateTree(JSON.toJSONString(pipelineRunState,SerializerFeature.DisableCircularReferenceDetect));
pipPipelineHisInstance.setTargetVersionName("");
pipPipelineHisInstance.setTargetVersionId(pipeline.getTargetVersionId());
hisInstanceDao.insert(pipPipelineHisInstance);

View File

@ -80,6 +80,7 @@ public class DefaultWorkerManager extends WorkerManager {
}
ContextStateEnum byCode = ContextStateEnum.getByCode(context.getState().get());
if (ContextStateEnum.SUSPEND.equals(byCode)) {
// todo 可以添加进内存或者什么别的地方中等待审批以后重新加入队列
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"当前task为挂起状态进入队列重新执行");
}
String taskType = task.getTaskType();

View File

@ -83,7 +83,7 @@ public abstract class BaseRunContext {
}
callParentChange(stateEnum);
} else {
log.error("非法状态扭转直接忽略");
log.error("非法状态扭转直接忽略,{},{}",curr,stateEnum);
}
}
public void changeContextStateAndChild(ContextStateEnum stateEnum){
@ -125,9 +125,7 @@ public abstract class BaseRunContext {
* */
public void checkChildEnd() throws ServiceException{
Map<String, BaseRunContext> childContext = getChildContext();
if (childContext.size()!=childCount) {
return;
}
int result = ContextStateEnum.HAPPY_ENDING.getCode();
for (Map.Entry<String, BaseRunContext> entry : childContext.entrySet()) {
BaseRunContext child = entry.getValue();
@ -144,6 +142,9 @@ public abstract class BaseRunContext {
}
boolean end = false;
if (ContextStateEnum.HAPPY_ENDING.getCode()==result) {
if (childContext.size()!=childCount) {
return;
}
if (ContextStateEnum.canGoto(ContextStateEnum.getByCode(state.get()),ContextStateEnum.HAPPY_ENDING)) {
this.changeContextState(ContextStateEnum.HAPPY_ENDING);
end = true;

View File

@ -101,7 +101,7 @@ public class PipelineRunContext extends BaseRunContext{
}
callParentChange(stateEnum);
} else {
log.error("非法状态扭转直接忽略");
log.error("非法状态扭转直接忽略,{},{}",curr,stateEnum);
}
}

View File

@ -2,13 +2,27 @@ package cd.casic.ci.process.engine.worker;
import cd.casic.ci.process.common.WorkAtom;
import cd.casic.ci.process.dal.resp.resource.ResourceFindResp;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.engine.worker.base.BaseWorker;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.machine.MachineInfo;
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import cd.casic.ci.process.process.dataObject.resource.PipResourceMachine;
import cd.casic.ci.process.process.dataObject.target.TargetVersion;
import cd.casic.ci.process.process.dataObject.task.PipTask;
import cd.casic.ci.process.process.service.machine.MachineInfoService;
import cd.casic.ci.process.process.service.target.TargetVersionService;
import cd.casic.ci.process.util.CryptogramUtil;
import cd.casic.ci.process.util.SftpUploadUtil;
import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.io.File;
import java.util.Map;
/**
* 目标处理worker
@ -23,6 +37,7 @@ public class TargetHandleWorker extends BaseWorker {
private MachineInfoService machineInfoService;
@Override
public void execute(TaskRunContext context) {
// 暂无获取文件方式 todo 先注释掉
// String filePath = "";
// Map<String, Object> localVariables = context.getLocalVariables();
// PipBaseElement taskContextDef = context.getContextDef();
@ -35,6 +50,7 @@ public class TargetHandleWorker extends BaseWorker {
// if (StringUtils.isEmpty(pipeline.getTargetVersionId())){
// throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"目标文件不存在");
// }
//
// TargetVersion targetVersion = targetVersionService.getById(pipeline.getTargetVersionId());
// filePath = targetVersion.getFilePath();
// File file = new File(filePath);
@ -45,16 +61,16 @@ public class TargetHandleWorker extends BaseWorker {
// toBadEnding();
// }
// // 上传文件
// String machineId = pipeline.getMachineId();
// MachineInfo byId = machineInfoService.getById(machineId);
// ResourceFindResp resourceById = getResourceManagerService().findResourceById(pipeline.getResourceId());
// PipResourceMachine resourceMachine = resourceById.getResourceMachine();
// append(context,"开始文件上传");
// try {
// SftpUploadUtil.uploadFileViaSftp(byId.getMachineHost(),byId.getSshPort(),byId.getUsername(), CryptogramUtil.doDecrypt(byId.getPassword()),null,file.getAbsolutePath(),"/home/casic/706/ai_test_527",file.getName());
// SftpUploadUtil.uploadFileViaSftp(resourceMachine.getMachineHost(),Integer.valueOf(resourceMachine.getSshPort()),resourceMachine.getUsername(), CryptogramUtil.doDecrypt(resourceMachine.getPassword()),null,file.getAbsolutePath(),"/home/casic/706/ai_test_527",file.getName());
// } catch (SftpUploadUtil.SftpUploadException e) {
// log.error("文件上传失败",e);
// toBadEnding();
// }
// append(context,"文件上传至"+byId.getMachineHost()+" /home/casic/706/ai_test_527");
// append(context,"文件上传至"+resourceMachine.getMachineHost()+"/home/casic/706/ai_test_527");
// }
}
}

View File

@ -68,7 +68,6 @@ public abstract class BaseWorker implements Runnable{
Object fromError = globalVariables.get(PipelineBehaviorConstant.PIPELINE_EXECUTE_FROM_ERROR);
if (Boolean.TRUE.equals(fromError)&&ContextStateEnum.HAPPY_ENDING.getCode().equals(taskRunContext.getState().get())) {
append(taskRunContext,"跳过执行"+CommandConstant.ENTER);
return;
} else{
if (this instanceof PassableWorker passableWorker) {
taskRunContext.changeContextState(ContextStateEnum.SUSPEND);