1.流水线从错误处开始执行&从头开始执行,运行中流程重复发起运行报错(初步测试通过)

2.流水线执行历史查询与入库方法(初步测试通过)
3.context获取用户获取不到改为使用springsecurity的ContextHolder
This commit is contained in:
even 2025-06-05 15:27:24 +08:00
parent 8acb7b04e1
commit 020c4a0a6d
33 changed files with 476 additions and 91 deletions

View File

@ -194,6 +194,27 @@
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<annotationProcessorPaths>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</path>
<path>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct-processor</artifactId>
<version>${mapstruct.version}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -20,6 +20,7 @@ import org.springframework.web.context.request.ServletRequestAttributes;
public class WebFrameworkUtils {
private static final String REQUEST_ATTRIBUTE_LOGIN_USER_ID = "login_user_id";
private static final String REQUEST_ATTRIBUTE_LOGIN_USER_NAME = "login_user_name";
private static final String REQUEST_ATTRIBUTE_LOGIN_USER_TYPE = "login_user_type";
private static final String REQUEST_ATTRIBUTE_COMMON_RESULT = "common_result";
@ -38,7 +39,6 @@ public class WebFrameworkUtils {
public WebFrameworkUtils(WebProperties webProperties) {
WebFrameworkUtils.properties = webProperties;
}
/**
* 获得租户编号 header
* 考虑到其它 framework 组件也会使用到租户编号所以不得不放在 WebFrameworkUtils 统一提供
@ -54,6 +54,18 @@ public class WebFrameworkUtils {
public static void setLoginUserId(ServletRequest request, Long userId) {
request.setAttribute(REQUEST_ATTRIBUTE_LOGIN_USER_ID, userId);
}
public static void setLoginNickName(ServletRequest request, String userName) {
request.setAttribute(REQUEST_ATTRIBUTE_LOGIN_USER_ID, userName);
}
public static String getLoginNickName() {
return getLoginNickName(getRequest());
}
public static String getLoginNickName(HttpServletRequest request) {
if (request == null) {
return null;
}
return (String) request.getAttribute(REQUEST_ATTRIBUTE_LOGIN_USER_NAME);
}
/**
* 设置用户类型

View File

@ -10,7 +10,7 @@ import org.mapstruct.factory.Mappers;
*
* @author mianbin modified from yudao
*/
@Mapper
@Mapper(componentModel = "spring")
public interface TenantConvert {
TenantConvert INSTANCE = Mappers.getMapper(TenantConvert.class);

View File

@ -127,6 +127,7 @@ public class SecurityFrameworkUtils {
// 原因是Spring Security Filter ApiAccessLogFilter 后面在它记录访问日志时线上上下文已经没有用户编号等信息
WebFrameworkUtils.setLoginUserId(request, loginUser.getId());
WebFrameworkUtils.setLoginUserType(request, loginUser.getUserType());
WebFrameworkUtils.setLoginNickName(request,loginUser.getInfo().get("nickName"));
}
private static Authentication buildAuthentication(LoginUser loginUser, HttpServletRequest request) {

View File

@ -0,0 +1,32 @@
package cd.casic.ci.api;
import cd.casic.ci.process.process.dataObject.history.PipPipelineHisInstance;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
import cd.casic.ci.process.process.service.history.PipelineHistoryService;
import cd.casic.ci.process.process.service.taskLog.TaskLogService;
import cd.casic.framework.commons.pojo.CommonResult;
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController
@RequestMapping("/history")
public class PipHistoryController {
@Resource
TaskLogService taskLogService;
@Resource
PipelineHistoryService pipelineHistoryService;
@GetMapping("/getLogById")
public CommonResult<PipTaskLog> getLogById(String id){
PipTaskLog byId = taskLogService.getById(id);
return CommonResult.success(byId);
}
@GetMapping("/list")
public CommonResult<List<PipPipelineHisInstance>> list(String pipelineId){
List<PipPipelineHisInstance> list = pipelineHistoryService.list();
return CommonResult.success(list);
}
}

View File

@ -9,13 +9,19 @@ import cd.casic.ci.process.dal.resp.context.TreeRunContextResp;
import cd.casic.ci.process.dal.resp.pipeline.PipelineFindResp;
import cd.casic.ci.process.engine.executor.PipelineExecutor;
import cd.casic.ci.process.engine.runContext.PipelineRunContext;
import cd.casic.ci.process.enums.PiplineTriggerModeEnum;
import cd.casic.ci.process.process.service.pipeline.PipelineService;
import cd.casic.framework.commons.pojo.CommonResult;
import cd.casic.framework.commons.pojo.PageResult;
import cd.casic.framework.commons.util.util.WebFrameworkUtils;
import cd.casic.framework.security.core.LoginUser;
import jakarta.annotation.Resource;
import jakarta.annotation.security.PermitAll;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.validation.Valid;
import org.jetbrains.annotations.NotNull;
import org.springframework.boot.actuate.autoconfigure.metrics.MetricsProperties;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@ -99,9 +105,11 @@ public class PipelineController {
return CommonResult.success();
}
@Resource
private HttpServletRequest request;
@PostMapping("/executePipeline/{pipelineId}")
public CommonResult<PipelineRunContext> executePipeline(@PathVariable String pipelineId){
PipelineRunContext execute = pipelineExecutor.execute(pipelineId);
public CommonResult<PipelineRunContext> executePipeline(@PathVariable("pipelineId") String pipelineId,@RequestParam(required = false,defaultValue = "false") Boolean fromError){
PipelineRunContext execute = pipelineExecutor.execute(pipelineId, PiplineTriggerModeEnum.HAND,fromError);
return CommonResult.success(execute);
}
@PostMapping("/getStageRunState/{pipelineId}")

View File

@ -1,5 +1,5 @@
package cd.casic.ci.process.engine.constant;
public class EngineRuntimeConstant {
public static final String LOG_KEY = "logContent";
public static final String HIS_LOG_KEY = "logId";
}

View File

@ -10,5 +10,5 @@ public class PipelineBehaviorConstant {
/**
* 流水线是否从上次错误处执行
* */
public static final String PIPELINE_EXECUTE_FROM_ERROR="";
public static final String PIPELINE_EXECUTE_FROM_ERROR="fromError";
}

View File

@ -1,17 +1,22 @@
package cd.casic.ci.process.engine.dispatcher.impl;
import cd.casic.ci.process.engine.constant.PipelineBehaviorConstant;
import cd.casic.ci.process.engine.dispatcher.BaseDispatcher;
import cd.casic.ci.process.engine.enums.ContextStateEnum;
import cd.casic.ci.process.engine.manager.RunContextManager;
import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.engine.runContext.PipelineRunContext;
import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.process.dataObject.stage.PipStage;
import cd.casic.framework.mq.redis.core.RedisMQTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.CollectionUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@Slf4j
@ -44,9 +49,24 @@ public class ParallelDispatcher implements BaseDispatcher{
}
CountDownLatch latch = new CountDownLatch(stageList.size());
for (PipStage secondStage : stageList) {
Map<String, Object> globalVariables = pipelineRunContext.getGlobalVariables();
Object fromError = globalVariables.get(PipelineBehaviorConstant.PIPELINE_EXECUTE_FROM_ERROR);
SecondStageRunContext context = null;
if (!Boolean.TRUE.equals(fromError)) {
// 注册taskContext且发送消息至消息队列给work执行, 如果需要则传入参数
// 二阶段下所有task是串行所以不用关心线程安全相关信息
SecondStageRunContext context = new SecondStageRunContext(secondStage,secondStage.getTaskValues().size(),pipelineRunContext,new ConcurrentHashMap<>());
context = new SecondStageRunContext(secondStage,secondStage.getTaskValues().size(),pipelineRunContext,new ConcurrentHashMap<>());
runContextManager.contextRegister(context);
} else {
BaseRunContext baseRunContext = runContextManager.getContext(secondStage.getId());
if (baseRunContext!=null) {
context = (SecondStageRunContext) baseRunContext;
} else {
// 防止运行停止以后添加节点 ,删除节点的同时也要删除上下文
context = new SecondStageRunContext(secondStage,secondStage.getTaskValues().size(),pipelineRunContext,new ConcurrentHashMap<>());
runContextManager.contextRegister(context);
}
}
SerialDispatcher serialDispatcher = new SerialDispatcher(context,latch,runContextManager,redisMQTemplate);
// 给线程池进行执行
taskExecutor.execute(serialDispatcher);

View File

@ -1,9 +1,11 @@
package cd.casic.ci.process.engine.dispatcher.impl;
import cd.casic.ci.process.engine.constant.PipelineBehaviorConstant;
import cd.casic.ci.process.engine.dispatcher.BaseDispatcher;
import cd.casic.ci.process.engine.enums.ContextStateEnum;
import cd.casic.ci.process.engine.manager.RunContextManager;
import cd.casic.ci.process.engine.message.TaskRunMessage;
import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
@ -15,6 +17,7 @@ import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@ -41,9 +44,23 @@ public class SerialDispatcher implements BaseDispatcher {
@Override
public void dispatch() throws InterruptedException {
for (PipTask pipTask : taskList) {
TaskRunContext taskRunContext = null;
Map<String, Object> globalVariables = stageRunContext.getGlobalVariables();
Object fromError = globalVariables.get(PipelineBehaviorConstant.PIPELINE_EXECUTE_FROM_ERROR);
if (!Boolean.TRUE.equals(fromError)) {
// 注册taskContext且发送消息至消息队列给work执行, 如果需要则传入参数
TaskRunContext taskRunContext = new TaskRunContext(pipTask,stageRunContext,new HashMap<>());
taskRunContext = new TaskRunContext(pipTask,stageRunContext,new HashMap<>());
contextManager.contextRegister(taskRunContext);
} else {
BaseRunContext context = contextManager.getContext(pipTask.getId());
if (context != null) {
taskRunContext= (TaskRunContext) context;
} else {
// 防止运行停止以后添加节点,删除节点的同时也要删除上下文
taskRunContext = new TaskRunContext(pipTask,stageRunContext,new HashMap<>());
contextManager.contextRegister(taskRunContext);
}
}
taskRunContext.changeContextState(ContextStateEnum.READY);
TaskRunMessage taskRunMessage = new TaskRunMessage(pipTask);
redisMQTemplate.send(taskRunMessage);

View File

@ -27,6 +27,7 @@ public enum ContextStateEnum {
TRANSITIONS.put(SUSPEND, Set.of(SUSPEND,INIT, READY, BAD_ENDING, RUNNING,STOP));
//...初始化其他状态转移关系
TRANSITIONS.put(SKIP_TO,Collections.emptySet());
// TRANSITIONS.put(HAPPY_ENDING,Set.of(RUNNING));
}
ContextStateEnum(Integer code, String msg) {

View File

@ -1,7 +1,8 @@
package cd.casic.ci.process.engine.executor;
import cd.casic.ci.process.engine.runContext.PipelineRunContext;
import cd.casic.ci.process.enums.PiplineTriggerModeEnum;
public interface PipelineExecutor {
PipelineRunContext execute(String pipelineId);
PipelineRunContext execute(String pipelineId, PiplineTriggerModeEnum triggerModeEnum,Boolean formError);
}

View File

@ -1,9 +1,13 @@
package cd.casic.ci.process.engine.executor.impl;
import cd.casic.ci.process.engine.constant.PipelineBehaviorConstant;
import cd.casic.ci.process.engine.dispatcher.impl.ParallelDispatcher;
import cd.casic.ci.process.engine.executor.PipelineExecutor;
import cd.casic.ci.process.engine.manager.LoggerManager;
import cd.casic.ci.process.engine.manager.RunContextManager;
import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.engine.runContext.PipelineRunContext;
import cd.casic.ci.process.enums.PiplineTriggerModeEnum;
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import cd.casic.ci.process.process.dataObject.stage.PipStage;
import cd.casic.ci.process.process.service.pipeline.PipelineService;
@ -33,8 +37,10 @@ public class DefaultPipelineExecutor implements PipelineExecutor {
private ThreadPoolTaskExecutor serialExecutor;
@Resource
private RedisMQTemplate redisMQTemplate;
@Resource
private LoggerManager loggerManager;
@Override
public PipelineRunContext execute(String pipelineId) {
public PipelineRunContext execute(String pipelineId, PiplineTriggerModeEnum triggerModeEnum,Boolean formError) {
PipPipeline pipeline = pipelineService.getById(pipelineId);
if (pipeline==null) {
return null;
@ -53,8 +59,24 @@ public class DefaultPipelineExecutor implements PipelineExecutor {
for (PipStage stage : mainStage) {
childCount+=stage.getStageList().size();
}
PipelineRunContext pipelineRunContext = null;
if (!formError) {
// 从头执行清理缓存日志
loggerManager.clear(pipelineId);
ConcurrentHashMap<String, Object> globalVariable = null;
globalVariable = new ConcurrentHashMap<>();
// 如果要做 容灾就需要重新将数据库存的记录按顺序加载入
PipelineRunContext pipelineRunContext = new PipelineRunContext(pipeline,childCount,null,new ConcurrentHashMap<>(),new ConcurrentHashMap<>());
pipelineRunContext=new PipelineRunContext(pipeline,childCount,null,globalVariable,new ConcurrentHashMap<>());
} else {
try {
pipelineRunContext = (PipelineRunContext)runContextManager.getContext(pipelineId);
} catch (Exception e) {
// 以后可以改成从 数据库重载
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"未找到上次的记录");
}
}
pipelineRunContext.setTriggerMode(triggerModeEnum);
pipelineRunContext.getGlobalVariables().put(PipelineBehaviorConstant.PIPELINE_EXECUTE_FROM_ERROR,formError);
runContextManager.contextRegister(pipelineRunContext);
ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager,redisMQTemplate,serialExecutor);
parallelExecutor.execute(parallelDispatcher);

View File

@ -1,5 +1,6 @@
package cd.casic.ci.process.engine.manager;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import jakarta.servlet.http.HttpServletRequest;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@ -13,5 +14,6 @@ public interface LoggerManager {
SseEmitter subscribe(String taskId,HttpServletRequest request);
void append(String taskId,String logContent);
String getLogContent(String taskId);
void flushMemory(List<String> taskIdList);
void flushMemory(List<TaskRunContext> taskContextList);
void clear(String pipelineId);
}

View File

@ -28,4 +28,6 @@ public interface RunContextManager {
* */
void contextRegister(BaseRunContext context);
BaseRunContext getContext(String key);
public void toHistory(String pipelineId);
}

View File

@ -1,5 +1,6 @@
package cd.casic.ci.process.engine.manager.impl;
import cd.casic.ci.process.dal.req.pipeline.PipelineQueryReq;
import cd.casic.ci.process.engine.enums.ContextStateEnum;
import cd.casic.ci.process.engine.manager.LoggerManager;
import cd.casic.ci.process.engine.manager.RunContextManager;
@ -7,27 +8,47 @@ import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.engine.runContext.PipelineRunContext;
import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.enums.PiplineTriggerModeEnum;
import cd.casic.ci.process.process.dal.history.PipPipelineHisInstanceDao;
import cd.casic.ci.process.process.dal.pipeline.PipTaskDao;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.history.PipPipelineHisInstance;
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import cd.casic.ci.process.process.dataObject.task.PipTask;
import cd.casic.ci.process.process.service.pipeline.PipelineService;
import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import cd.casic.framework.commons.util.util.WebFrameworkUtils;
import cd.casic.framework.security.core.LoginUser;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@Component
@Slf4j
public class DefaultRunContextManager implements RunContextManager {
private final Map<String,PipelineRunContext> contextMap= new ConcurrentHashMap();
@Resource
private LoggerManager loggerManager;
@Resource
private PipTaskDao taskDao;
@Resource
private PipPipelineHisInstanceDao hisInstanceDao;
@Resource
private PipelineService pipelineService;
@Override
public Boolean stopPipeline(String pipelineId) {
return null;
@ -65,13 +86,26 @@ public class DefaultRunContextManager implements RunContextManager {
if (context instanceof PipelineRunContext pipelineRunContext) {
if (contextMap.containsKey(id)) {
PipelineRunContext oldPipeline = contextMap.get(id);
oldPipeline.changeContextStateAndChild(ContextStateEnum.BAD_ENDING);
LambdaQueryWrapper<PipTask> wrapper = new LambdaQueryWrapper<>();
List<PipTask> taskList = taskDao.selectList(wrapper);
List<String> taskIdList = taskList.stream().map(PipTask::getId).toList();
// 清空上一次的日志
loggerManager.flushMemory(taskIdList);
if (!ContextStateEnum.HAPPY_ENDING.getCode().equals(oldPipeline.getState().get())
&&!ContextStateEnum.BAD_ENDING.getCode().equals(oldPipeline.getState().get())) {
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"有未完成执行的流水线请稍后再试");
}
}
pipelineRunContext.setLoggerManager(loggerManager);
pipelineRunContext.setHisInstanceDao(hisInstanceDao);
SecurityContext securityContext = SecurityContextHolder.getContext();
if (securityContext !=null) {
Authentication authentication = securityContext.getAuthentication();
if (authentication!=null) {
if (authentication.getPrincipal() instanceof LoginUser user) {
pipelineRunContext.setUserId(String.valueOf(user.getId()));
pipelineRunContext.setNickName(user.getInfo().get("nickname"));
}
}
}
pipelineRunContext.setContextManager(this);
contextMap.put(id,pipelineRunContext);
} else {
if (parentContext==null) {
@ -112,5 +146,43 @@ public class DefaultRunContextManager implements RunContextManager {
}
pipelineRunContext.changeContextStateAndChild(stateEnum);
}
@Transactional(rollbackFor = Exception.class)
public void toHistory(String pipelineId){
log.info("========================开始入库");
BaseRunContext context = getContext(pipelineId);
PipBaseElement contextDef = context.getContextDef();
if (context instanceof PipelineRunContext pipelineRunContext) {
if (contextDef instanceof PipPipeline pipeline){
Collection<BaseRunContext> values = pipelineRunContext.getChildContext().values();
List<TaskRunContext> taskContextList = new LinkedList<>();
for (BaseRunContext value : values) {
Collection<BaseRunContext> childContext = value.getChildContext().values();
List<TaskRunContext> list = childContext.stream().map(it -> (TaskRunContext) it).toList();
taskContextList.addAll(list);
}
loggerManager.flushMemory(taskContextList);
PipPipelineHisInstance pipPipelineHisInstance = new PipPipelineHisInstance();
pipPipelineHisInstance.setPipelineId(pipeline.getId());
pipPipelineHisInstance.setPipelineName(pipeline.getName());
AtomicInteger state = pipelineRunContext.getState();
pipPipelineHisInstance.setState(String.valueOf(state.get()));
pipPipelineHisInstance.setStateName(ContextStateEnum.getByCode(state.get()).getMsg());
pipPipelineHisInstance.setStartTime(pipelineRunContext.getStartTime());
pipPipelineHisInstance.setExecutorUserId(pipelineRunContext.getUserId());
pipPipelineHisInstance.setExecutorUserName(pipelineRunContext.getNickName());
pipPipelineHisInstance.setEndTime(pipelineRunContext.getEndTime());
PiplineTriggerModeEnum triggerMode = pipelineRunContext.getTriggerMode();
pipPipelineHisInstance.setTriggerMode(triggerMode.getCode());
pipPipelineHisInstance.setContextTree(JSON.toJSONString(pipelineRunContext));
PipelineQueryReq pipelineQueryReq = new PipelineQueryReq();
pipelineQueryReq.setId(pipelineId);
pipPipelineHisInstance.setDefTree(JSON.toJSONString(pipelineService.findPipelineById(pipelineQueryReq)));
pipPipelineHisInstance.setStateTree(JSON.toJSONString(pipelineService.getPipelineRunState(pipelineId)));
pipPipelineHisInstance.setTargetVersionName("");
pipPipelineHisInstance.setTargetVersionId(pipeline.getTargetVersionId());
hisInstanceDao.insert(pipPipelineHisInstance);
}
}
}
}

View File

@ -1,8 +1,14 @@
package cd.casic.ci.process.engine.manager.impl;
import cd.casic.ci.process.engine.constant.EngineRuntimeConstant;
import cd.casic.ci.process.engine.manager.LoggerManager;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.process.dal.pipeline.PipTaskLogDao;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
import cd.casic.ci.process.process.dataObject.task.PipTask;
import cd.casic.ci.process.process.service.task.TaskService;
import cd.casic.ci.process.util.snowflake.SnowflakeIdWorker;
import cd.casic.ci.process.util.snowflake.SnowflakeIdentifierGenerator;
import cd.casic.framework.commons.util.network.IpUtil;
import cd.casic.framework.commons.util.util.WebFrameworkUtils;
import jakarta.annotation.Resource;
@ -31,6 +37,10 @@ public class MemoryLogManager implements LoggerManager {
private final Map<String, List<SseEmitter>> taskIdSSEMap = new ConcurrentHashMap<>();
private final Map<String,StringBuffer> taskIdMemoryLogMap = new ConcurrentHashMap<>();
@Resource
private SnowflakeIdentifierGenerator identifierGenerator;
@Resource
private TaskService taskService;
public final Integer FLUSH_DB_SIZE=2*1024*1024;
// public final Integer FLUSH_DB_SIZE=1000;
@ -107,22 +117,23 @@ public class MemoryLogManager implements LoggerManager {
}
}
}
public void flushMemory(List<String> taskIdList){
public void flushMemory(List<TaskRunContext> taskContextList){
List<PipTaskLog> insertList = new ArrayList<>();
List<PipTaskLog> updateList = new ArrayList<>();
for (String taskId : taskIdList) {
for (TaskRunContext taskRunContext : taskContextList) {
String taskId = taskRunContext.getContextDef().getId();
StringBuffer logCache = taskIdMemoryLogMap.get(taskId);
if (logCache!=null) {
if (taskIdDbMap.containsKey(taskId)) {
// 之前已经入库过
// 存在则更新
String id = taskIdDbMap.get(taskId);
PipTaskLog pipTaskLog = logDao.selectById(id);
// TODO 之后优化
pipTaskLog.setContent(pipTaskLog.getContent()+logCache.toString());
if (logCache!=null) {
pipTaskLog.setContent(pipTaskLog.getContent()+logCache);
}
updateList.add(pipTaskLog);
taskIdDbMap.remove(taskId);
taskIdMemoryLogMap.remove(taskId);
taskRunContext.getLocalVariables().put(EngineRuntimeConstant.HIS_LOG_KEY,pipTaskLog.getId());
List<SseEmitter> sseEmitters = taskIdSSEMap.get(taskId);
if (!CollectionUtils.isEmpty(sseEmitters)) {
sseEmitters.forEach(ResponseBodyEmitter::complete);
@ -132,19 +143,20 @@ public class MemoryLogManager implements LoggerManager {
// 不存在就新增
PipTaskLog pipTaskLog = new PipTaskLog();
pipTaskLog.setTaskId(taskId);
if (logCache!=null) {
pipTaskLog.setContent(logCache.toString());
// logDao.insert(pipTaskLog);
// taskIdDbMap.put(taskId,pipTaskLog.getId());
}
pipTaskLog.setId(identifierGenerator.nextUUID(null));
insertList.add(pipTaskLog);
taskIdDbMap.remove(taskId);
taskRunContext.getLocalVariables().put(EngineRuntimeConstant.HIS_LOG_KEY,pipTaskLog.getId());
taskIdMemoryLogMap.remove(taskId);
taskIdDbMap.put(taskId,pipTaskLog.getId());
List<SseEmitter> sseEmitters = taskIdSSEMap.get(taskId);
if (!CollectionUtils.isEmpty(sseEmitters)) {
sseEmitters.forEach(ResponseBodyEmitter::complete);
}
}
}
}
if (!CollectionUtils.isEmpty(insertList)) {
logDao.insertBatch(insertList);
}
@ -164,6 +176,12 @@ public class MemoryLogManager implements LoggerManager {
}
return logCache.toString();
}
public void clear(String pipelineId){
PipTask query = new PipTask();
query.setPipelineId(pipelineId);
List<PipTask> taskList = taskService.getTask(query);
taskList.forEach(it->taskIdDbMap.remove(it.getId()));
}
private void emitterInit(SseEmitter emitter,String taskId){
// 维持心跳

View File

@ -4,6 +4,7 @@ import cd.casic.ci.process.engine.enums.ContextStateEnum;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import com.alibaba.fastjson.annotation.JSONField;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@ -22,39 +23,41 @@ public abstract class BaseRunContext {
/**
* 当前上下文的定义
* */
private PipBaseElement contextDef;
protected PipBaseElement contextDef;
@Transient
@JsonIgnore
private BaseRunContext parentContext;
private Integer childCount;
@JSONField(serialize = false)
protected BaseRunContext parentContext;
protected Integer childCount;
/**
* 运行状态
* */
private final AtomicInteger state;
protected final AtomicInteger state;
/**
* 启动时间
* */
private LocalDateTime startTime;
protected LocalDateTime startTime;
/**
* 结束时间
* */
private LocalDateTime endTime;
private String resourceId;
private String targetVersionId;
private String targetType;
protected LocalDateTime endTime;
protected String resourceId;
protected String targetVersionId;
protected String targetType;
/**
* 整个流水线全局的变量
* */
private Map<String,Object> globalVariables;
protected final Map<String,Object> globalVariables;
/**
* 当前上下文局部变量
* */
private Map<String,Object> localVariables;
private Map<String,BaseRunContext> childContext;
protected final Map<String,Object> localVariables;
protected Map<String,BaseRunContext> childContext;
/**
* 用来在控制其他地方的阻塞放行,countDown为1
* */
private CountDownLatch countDownLatch;
@JsonIgnore
protected CountDownLatch countDownLatch;
public BaseRunContext(PipBaseElement contextDef,Integer childCount,BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetVersionId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) {
this.contextDef = contextDef;
@ -77,9 +80,10 @@ public abstract class BaseRunContext {
||ContextStateEnum.BAD_ENDING.equals(stateEnum)
||ContextStateEnum.SKIP_TO.equals(stateEnum)) {
this.endTime=LocalDateTime.now();
if(this instanceof PipelineRunContext pipelineRunContext){
// 流水线执行结束 进行入库
}
if(this instanceof PipelineRunContext){
log.info("debug专用");
}
callParentChange(stateEnum);
} else {
@ -193,7 +197,8 @@ public abstract class BaseRunContext {
if (ContextStateEnum.canGoto(ContextStateEnum.getByCode(state.get()),ContextStateEnum.RUNNING)) {
this.changeContextState(ContextStateEnum.RUNNING);
} else{
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"状态有误");
log.error("非正常状态扭转{}->{}",ContextStateEnum.getByCode(state.get()),ContextStateEnum.RUNNING);
// throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"状态有误");
}
}
}

View File

@ -1,18 +1,45 @@
package cd.casic.ci.process.engine.runContext;
import cd.casic.ci.process.engine.enums.ContextStateEnum;
import cd.casic.ci.process.engine.manager.LoggerManager;
import cd.casic.ci.process.engine.manager.RunContextManager;
import cd.casic.ci.process.enums.PiplineTriggerModeEnum;
import cd.casic.ci.process.process.dal.history.PipPipelineHisInstanceDao;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.history.PipPipelineHisInstance;
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import cd.casic.framework.commons.util.util.WebFrameworkUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.annotation.JSONField;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Setter
@Getter
@Slf4j
public class PipelineRunContext extends BaseRunContext{
@JsonIgnore
@JSONField(serialize = false)
private PipPipelineHisInstanceDao hisInstanceDao;
@JsonIgnore
@JSONField(serialize = false)
private LoggerManager loggerManager;
private PiplineTriggerModeEnum triggerMode;
private String userId;
private String nickName;
@JSONField(serialize = false)
private RunContextManager contextManager;
public PipelineRunContext(PipPipeline pipeline,Integer childCount) {
public PipelineRunContext(PipPipeline pipeline, Integer childCount) {
this(pipeline,childCount,null,new ConcurrentHashMap<>(),new ConcurrentHashMap<>());
}
@ -60,4 +87,22 @@ public class PipelineRunContext extends BaseRunContext{
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"不支持类型");
}
}
@Override
public void changeContextState(ContextStateEnum stateEnum){
ContextStateEnum curr = ContextStateEnum.getByCode(getState().get());
if (ContextStateEnum.canGoto(curr,stateEnum)) {
state.compareAndExchange(curr.getCode(),stateEnum.getCode());
if (ContextStateEnum.HAPPY_ENDING.equals(stateEnum)
||ContextStateEnum.BAD_ENDING.equals(stateEnum)
||ContextStateEnum.SKIP_TO.equals(stateEnum)) {
this.endTime=LocalDateTime.now();
// 入库保存
contextManager.toHistory(getContextDef().getId());
}
callParentChange(stateEnum);
} else {
log.error("非法状态扭转直接忽略");
}
}
}

View File

@ -1,6 +1,7 @@
package cd.casic.ci.process.engine.scheduler.job;
import cd.casic.ci.process.engine.executor.PipelineExecutor;
import cd.casic.ci.process.enums.PiplineTriggerModeEnum;
import jakarta.annotation.Resource;
import org.joda.time.LocalDateTime;
import org.quartz.Job;
@ -27,7 +28,7 @@ public class PipelineSchedulingJob implements Job {
String pipelineId = dataMap.getString(JOB_PARAM_PIPELINE_ID);
if (pipelineId != null && executor != null) {
executor.execute(pipelineId);
executor.execute(pipelineId, PiplineTriggerModeEnum.TIMING,null);
System.out.println("定时任务开始执行,当前执行时间为" + new LocalDateTime());
}
}

View File

@ -58,7 +58,6 @@ public class ApplicationWorker extends HttpWorker {
public void execute(TaskRunContext context) {
int statusCode = 0;
Map<String, Object> localVariables = context.getLocalVariables();
PipTaskLog pipTaskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY);
PipBaseElement contextDef = context.getContextDef();
append(context,"SCA-应用包审查分析节点开始执行");
@ -105,7 +104,6 @@ public class ApplicationWorker extends HttpWorker {
}
}
localVariables.put("statusCode", statusCode + "");
localVariables.put(EngineRuntimeConstant.LOG_KEY, pipTaskLog);
}
private void handleUpload(Map<String,Object> applicationConfigInfo, File file, BaseRunContext context) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {

View File

@ -49,7 +49,6 @@ public class CodingWorker extends HttpWorker {
public void execute(TaskRunContext context) {
int statusCode = 0;
Map<String, Object> localVariables = context.getLocalVariables();
PipTaskLog pipTaskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY);
PipBaseElement contextDef = context.getContextDef();
append(context,"SCA-代码仓库管理节点开始执行");
@ -132,7 +131,6 @@ public class CodingWorker extends HttpWorker {
}
localVariables.put("statusCode", statusCode + "");
localVariables.put(EngineRuntimeConstant.LOG_KEY, pipTaskLog);
}

View File

@ -29,7 +29,6 @@ public class DIYImageExecuteCommandWorker extends SshWorker {
public void execute(TaskRunContext context) {
int statusCode = -1;
Map<String, Object> localVariables = context.getLocalVariables();
PipTaskLog taskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY);
if (context.getContextDef() instanceof PipTask taskDef) {
log.info(taskDef.getTaskName());
Map<String, Object> taskProperties = taskDef.getTaskProperties();

View File

@ -58,7 +58,6 @@ public class ScaSbomWorker extends HttpWorker {
public void execute(TaskRunContext context) {
int statusCode = 0;
Map<String, Object> localVariables = context.getLocalVariables();
PipTaskLog pipTaskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY);
PipBaseElement contextDef = context.getContextDef();
append(context,"SCA-SBOM节点开始执行");
@ -107,7 +106,6 @@ public class ScaSbomWorker extends HttpWorker {
}
localVariables.put("statusCode", statusCode + "");
localVariables.put(EngineRuntimeConstant.LOG_KEY, pipTaskLog);
}
private void handleUpload(Map<String,Object> scaSbomConfigInfo, File file, BaseRunContext context) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {

View File

@ -61,6 +61,12 @@ public abstract class BaseWorker implements Runnable{
if (context instanceof TaskRunContext taskRunContext){
try {
taskRunContext.changeContextState(ContextStateEnum.READY);
Map<String, Object> globalVariables = context.getGlobalVariables();
Object fromError = globalVariables.get(PipelineBehaviorConstant.PIPELINE_EXECUTE_FROM_ERROR);
if (Boolean.TRUE.equals(fromError)&&ContextStateEnum.HAPPY_ENDING.getCode().equals(taskRunContext.getState().get())) {
append(taskRunContext,"跳过执行"+CommandConstant.ENTER);
return;
} else{
if (this instanceof PassableWorker passableWorker) {
taskRunContext.changeContextState(ContextStateEnum.SUSPEND);
passableWorker.waitForPermission();
@ -68,6 +74,7 @@ public abstract class BaseWorker implements Runnable{
taskRunContext.changeContextState(ContextStateEnum.RUNNING);
}
execute(taskRunContext);
}
} catch (Exception e) {
log.error("================worker执行报错",e);
// todo 根据配置决定失败是跳过还是直接失败如果直接跳过状态改为跳过如果直接失败状态就改为失败

View File

@ -0,0 +1,7 @@
package cd.casic.ci.process.process.dal.history;
import cd.casic.ci.process.process.dataObject.history.PipPipelineHisInstance;
import cd.casic.framework.mybatis.core.mapper.BaseMapperX;
public interface PipPipelineHisInstanceDao extends BaseMapperX<PipPipelineHisInstance> {
}

View File

@ -0,0 +1,33 @@
package cd.casic.ci.process.process.dataObject.history;
import cd.casic.framework.commons.dataobject.BaseDO;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.time.LocalDateTime;
@EqualsAndHashCode(callSuper = true)
@Data
@TableName("pip_pipeline_his_instance")
public class PipPipelineHisInstance extends BaseDO {
@TableId(type = IdType.ASSIGN_ID)
private String id;
private String pipelineId;
private String contextTree;
private String stateTree;
private String defTree;
private String executorUserId;
private String executorUserName;
private String triggerMode;
private LocalDateTime startTime;
private LocalDateTime endTime;
private String state;
private String stateName;
private String pipelineName;
private String targetVersionId;
private String targetVersionName;
private String description;
}

View File

@ -0,0 +1,11 @@
package cd.casic.ci.process.process.service.history;
import cd.casic.ci.process.process.dataObject.history.PipPipelineHisInstance;
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import com.baomidou.mybatisplus.extension.service.IService;
import java.util.List;
public interface PipelineHistoryService extends IService<PipPipelineHisInstance> {
List<PipPipelineHisInstance> getListByPipelineId(String pipelineId);
}

View File

@ -0,0 +1,27 @@
package cd.casic.ci.process.process.service.history.impl;
import cd.casic.ci.process.process.dal.history.PipPipelineHisInstanceDao;
import cd.casic.ci.process.process.dal.pipeline.PipelineDao;
import cd.casic.ci.process.process.dataObject.history.PipPipelineHisInstance;
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import cd.casic.ci.process.process.service.history.PipelineHistoryService;
import cd.casic.ci.process.process.service.pipeline.PipelineService;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import jakarta.annotation.Resource;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class PipelineHistoryServiceImpl extends ServiceImpl<PipPipelineHisInstanceDao, PipPipelineHisInstance> implements PipelineHistoryService {
@Resource
private PipPipelineHisInstanceDao pipelineHisInstanceDao;
@Override
public List<PipPipelineHisInstance> getListByPipelineId(String pipelineId) {
LambdaQueryWrapper<PipPipelineHisInstance> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(PipPipelineHisInstance::getPipelineId,pipelineId);
return pipelineHisInstanceDao.selectList(wrapper);
}
}

View File

@ -0,0 +1,8 @@
package cd.casic.ci.process.process.service.taskLog;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
import cd.casic.ci.process.process.dataObject.task.PipTask;
import com.baomidou.mybatisplus.extension.service.IService;
public interface TaskLogService extends IService<PipTaskLog> {
}

View File

@ -0,0 +1,11 @@
package cd.casic.ci.process.process.service.taskLog.impl;
import cd.casic.ci.process.process.dal.pipeline.PipTaskLogDao;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
import cd.casic.ci.process.process.service.taskLog.TaskLogService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Service;
@Service
public class TaskLogServiceImpl extends ServiceImpl<PipTaskLogDao, PipTaskLog> implements TaskLogService {
}

View File

@ -5,6 +5,7 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.ComponentScans;
import org.springframework.security.core.context.SecurityContextHolder;
/**
* 项目的启动类
@ -17,7 +18,6 @@ import org.springframework.context.annotation.ComponentScans;
@MapperScan(basePackages = {"cd.casic.**.dal","cd.casic.**.dao"})
@SpringBootApplication(scanBasePackages = {"${ops.info.base-package}.server", "${ops.info.base-package}.module"})
public class OpsServerApplication {
public static void main(String[] args) {
SpringApplication.run(OpsServerApplication.class, args);
}

View File

@ -1,6 +1,7 @@
package cd.casic.server;
import cd.casic.ci.process.engine.executor.PipelineExecutor;
import cd.casic.ci.process.enums.PiplineTriggerModeEnum;
import cd.casic.ci.process.process.service.pipeline.PipelineService;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
@ -17,7 +18,7 @@ public class PipelineExecuteTest {
// 执行pipeline
@Test
public void executePipeline(){
pipelineExecutor.execute("716299522803896320");
pipelineExecutor.execute("716299522803896320", PiplineTriggerModeEnum.HAND);
}
// 获取pipeline执行状态
@Test
@ -27,11 +28,18 @@ public class PipelineExecuteTest {
@Test
public void taskSkipExecute(){
// 这个流水线包含了故意报错的worker可以验证跳过效果
pipelineExecutor.execute("718104543308681216");
pipelineExecutor.execute("718104543308681216",PiplineTriggerModeEnum.HAND);
}
@Test
public void taskSkipGetState(){
// 这个流水线包含了故意报错的worker可以验证跳过效果
pipelineService.getPipelineRunState("718104543308681216");
}
@Test
public void fromError(){
// 这个流水线包含了故意报错的worker可以验证跳过效果
// 传false 就是从头执行传false就是跳过正确的节点从错误处执行
pipelineExecutor.execute("718104543308681216",PiplineTriggerModeEnum.HAND,false);
pipelineExecutor.execute("718104543308681216",PiplineTriggerModeEnum.HAND,true);
}
}