diff --git a/modules/module-ci-process-api/src/main/java/cd/casic/ci/api/PipHistoryController.java b/modules/module-ci-process-api/src/main/java/cd/casic/ci/api/PipHistoryController.java index 84de8693..9c84bd42 100644 --- a/modules/module-ci-process-api/src/main/java/cd/casic/ci/api/PipHistoryController.java +++ b/modules/module-ci-process-api/src/main/java/cd/casic/ci/api/PipHistoryController.java @@ -3,24 +3,28 @@ package cd.casic.ci.api; import cd.casic.ci.process.dto.req.history.PipelineHistoryQueryReq; import cd.casic.ci.process.process.dataObject.history.PipPipelineHisInstance; import cd.casic.ci.process.process.dataObject.log.PipTaskLog; +import cd.casic.ci.process.process.dataObject.log.PipTaskLogLine; import cd.casic.ci.process.process.service.history.PipelineHistoryService; +import cd.casic.ci.process.process.service.taskLog.PipTaskLogLineService; import cd.casic.ci.process.process.service.taskLog.TaskLogService; import cd.casic.framework.commons.pojo.CommonResult; import cd.casic.framework.commons.pojo.PageResult; import jakarta.annotation.Resource; import org.springframework.web.bind.annotation.*; +import java.util.List; + @RestController @RequestMapping("/history") public class PipHistoryController { @Resource - TaskLogService taskLogService; + private PipelineHistoryService pipelineHistoryService; @Resource - PipelineHistoryService pipelineHistoryService; + private PipTaskLogLineService logLineService; @GetMapping("/getLogById") - public CommonResult getLogById(@RequestParam String id){ - PipTaskLog byId = taskLogService.getById(id); - return CommonResult.success(byId); + public CommonResult getLogById(@RequestParam String id){ + String content = logLineService.getLineContentByLogId(id, 10000); + return CommonResult.success(content); } @PostMapping("/list") public CommonResult> list(@RequestBody PipelineHistoryQueryReq req){ diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/MemoryLogManager.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/MemoryLogManager.java index b4d0c572..7b3bf4b7 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/MemoryLogManager.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/manager/impl/MemoryLogManager.java @@ -4,12 +4,16 @@ import cd.casic.ci.process.engine.constant.EngineRuntimeConstant; import cd.casic.ci.process.engine.manager.LoggerManager; import cd.casic.ci.process.engine.runContext.TaskRunContext; import cd.casic.ci.process.process.dao.pipeline.PipTaskLogDao; +import cd.casic.ci.process.process.dao.pipeline.PipTaskLogLineDao; import cd.casic.ci.process.process.dataObject.log.PipTaskLog; +import cd.casic.ci.process.process.dataObject.log.PipTaskLogLine; import cd.casic.ci.process.process.dataObject.task.PipTask; import cd.casic.ci.process.process.service.task.TaskService; +import cd.casic.ci.process.process.service.taskLog.PipTaskLogLineService; import cd.casic.ci.process.util.snowflake.SnowflakeIdentifierGenerator; import cd.casic.framework.commons.util.network.IpUtil; import cd.casic.ci.process.util.WebFrameworkUtils; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import jakarta.annotation.Resource; import jakarta.servlet.http.HttpServletRequest; import lombok.extern.slf4j.Slf4j; @@ -35,15 +39,8 @@ public class MemoryLogManager implements LoggerManager { * 第一级 是taskId,可能同一个账号、IP、建立多个连接所以保存为list * */ private final Map> taskIdSSEMap = new ConcurrentHashMap<>(); - - private final Map taskIdMemoryLogMap = new ConcurrentHashMap<>(); - @Resource - private SnowflakeIdentifierGenerator identifierGenerator; @Resource private TaskService taskService; - - public final Integer FLUSH_DB_SIZE=2*1024*1024; -// public final Integer FLUSH_DB_SIZE=1000; private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); /** * 缓存最近一次执行的日志,key是taskId,val是数据库id(已入库的情况下)用于buffer满了增加日志内容 @@ -52,6 +49,8 @@ public class MemoryLogManager implements LoggerManager { private final static Map taskIdDbMap = new ConcurrentHashMap<>(); @Resource private PipTaskLogDao logDao; + @Resource + private PipTaskLogLineService logLineService; @Override public SseEmitter subscribe(String taskId,HttpServletRequest request) { if (request!=null) { @@ -77,35 +76,27 @@ public class MemoryLogManager implements LoggerManager { * 内存满4mb入库,查询日志和入库操作用同一把锁 * 然后新内容推sse * */ - StringBuffer logCache = taskIdMemoryLogMap.computeIfAbsent(taskId, k -> new StringBuffer()); - int length = logCache.length(); - if (length>=FLUSH_DB_SIZE) { - synchronized (this){ - if (length>=FLUSH_DB_SIZE) { - StringBuffer stringBuffer = new StringBuffer(); - // 清空缓存 - taskIdMemoryLogMap.put(taskId, stringBuffer); - // 入库或者更新 - if (taskIdDbMap.containsKey(taskId)) { - // 之前已经入库过 - // 存在则更新 - String id = taskIdDbMap.get(taskId); - PipTaskLog pipTaskLog = logDao.selectById(id); - pipTaskLog.setContent(pipTaskLog.getContent()+logCache.toString()); - logDao.updateById(pipTaskLog); - } else { - // 不存在就新增 - PipTaskLog pipTaskLog = new PipTaskLog(); - pipTaskLog.setTaskId(taskId); - pipTaskLog.setContent(logCache.toString()); - logDao.insert(pipTaskLog); - taskIdDbMap.put(taskId,pipTaskLog.getId()); - } - logCache = stringBuffer; - } - } + // 入库或者更新 + if (taskIdDbMap.containsKey(taskId)) { + // 之前已经入库过 + // 存在则更新 + String id = taskIdDbMap.get(taskId); + PipTaskLogLine line = new PipTaskLogLine(); + line.setLogId(id); + line.setLineContent(logContent); + logLineService.save(line); + } else { + // 不存在就新增 + PipTaskLog pipTaskLog = new PipTaskLog(); + pipTaskLog.setTaskId(taskId); +// pipTaskLog.setContent(logCache.toString()); + logDao.insert(pipTaskLog); + taskIdDbMap.put(taskId,pipTaskLog.getId()); + PipTaskLogLine line = new PipTaskLogLine(); + line.setLogId(pipTaskLog.getTaskId()); + line.setLineContent(logContent); + logLineService.save(line); } - logCache.append(logContent); List sseEmitters = taskIdSSEMap.get(taskId); if (!CollectionUtils.isEmpty(sseEmitters)) { for (SseEmitter sseEmitter : sseEmitters) { @@ -120,63 +111,31 @@ public class MemoryLogManager implements LoggerManager { @Transactional(rollbackFor = Exception.class) public void flushMemory(List taskContextList){ log.info("流水线日志开始入库"); - List insertList = new ArrayList<>(); - List updateList = new ArrayList<>(); for (TaskRunContext taskRunContext : taskContextList) { String taskId = taskRunContext.getContextDef().getId(); - StringBuffer logCache = taskIdMemoryLogMap.get(taskId); if (taskIdDbMap.containsKey(taskId)) { - // 之前已经入库过 - // 存在则更新 String id = taskIdDbMap.get(taskId); - PipTaskLog pipTaskLog = logDao.selectById(id); - if (logCache!=null) { - pipTaskLog.setContent(pipTaskLog.getContent()+logCache); - } - updateList.add(pipTaskLog); - taskIdMemoryLogMap.remove(taskId); - taskRunContext.getLocalVariables().put(EngineRuntimeConstant.HIS_LOG_KEY,pipTaskLog.getId()); - List sseEmitters = taskIdSSEMap.get(taskId); - if (!CollectionUtils.isEmpty(sseEmitters)) { - sseEmitters.forEach(ResponseBodyEmitter::complete); - } - logDao.updateById(pipTaskLog); - } else { - // 不存在就新增 - PipTaskLog pipTaskLog = new PipTaskLog(); - pipTaskLog.setTaskId(taskId); - if (logCache!=null) { - pipTaskLog.setContent(logCache.toString()); - } - pipTaskLog.setId(identifierGenerator.nextUUID(null)); - insertList.add(pipTaskLog); - taskRunContext.getLocalVariables().put(EngineRuntimeConstant.HIS_LOG_KEY,pipTaskLog.getId()); - taskIdMemoryLogMap.remove(taskId); - taskIdDbMap.put(taskId,pipTaskLog.getId()); + taskRunContext.getLocalVariables().put(EngineRuntimeConstant.HIS_LOG_KEY,id); List sseEmitters = taskIdSSEMap.get(taskId); if (!CollectionUtils.isEmpty(sseEmitters)) { sseEmitters.forEach(ResponseBodyEmitter::complete); } } } - if (!CollectionUtils.isEmpty(insertList)) { - logDao.insertBatch(insertList); - } - if (!CollectionUtils.isEmpty(updateList)) { - logDao.updateBatch(updateList); - } - } @Override public String getLogContent(String taskId) { - StringBuffer logCache = taskIdMemoryLogMap.getOrDefault(taskId, new StringBuffer()); String id = taskIdDbMap.get(taskId); if (id != null) { - PipTaskLog pipTaskLog = logDao.selectById(id); - return pipTaskLog.getContent()+logCache.toString(); + List lineList = logLineService.getLineListByLogId(id,10000); + StringBuilder stringBuilder = new StringBuilder(); + for (PipTaskLogLine line : lineList) { + stringBuilder.append(line.getLineContent()); + } + return stringBuilder.toString(); } - return logCache.toString(); + return ""; } public void clear(String pipelineId){ PipTask query = new PipTask(); @@ -210,4 +169,6 @@ public class MemoryLogManager implements LoggerManager { log.error("===================超时,taskId:{}断开连接===============",taskId); }); } + + } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/afl/AFLWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/afl/AFLWorker.java index 7f7f5ab8..3eccaff2 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/afl/AFLWorker.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/afl/AFLWorker.java @@ -93,8 +93,8 @@ public class AFLWorker extends DockerWorker { // 获取docker 暂时先写固定值 dockerRun(commandScript,resourceListByType.getDockerEndpointList().get(0),context, runningTime); } catch (Exception e) { - String errorMessage = "该节点配置信息为空,请先配置该节点信息" + "\r\n"; - log.error("执行ssh失败:", e); + String errorMessage = "执行afl失败"+e.getMessage() + "\r\n"; + log.error("执行afl失败", e); append(context, errorMessage); toBadEnding(); } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/base/BaseWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/base/BaseWorker.java index 53cb5661..7c7feea4 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/base/BaseWorker.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/base/BaseWorker.java @@ -31,6 +31,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import javax.swing.text.StringContent; +import java.io.File; import java.util.Arrays; import java.util.List; import java.util.Map; diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dao/pipeline/PipTaskLogDao.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dao/pipeline/PipTaskLogDao.java index 3d3ce30c..3e57a9f8 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dao/pipeline/PipTaskLogDao.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dao/pipeline/PipTaskLogDao.java @@ -2,6 +2,7 @@ package cd.casic.ci.process.process.dao.pipeline; import cd.casic.ci.process.process.dataObject.log.PipTaskLog; import cd.casic.framework.mybatis.core.mapper.BaseMapperX; +import org.apache.ibatis.annotations.Param; public interface PipTaskLogDao extends BaseMapperX { } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dao/pipeline/PipTaskLogLineDao.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dao/pipeline/PipTaskLogLineDao.java new file mode 100644 index 00000000..0a264da6 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dao/pipeline/PipTaskLogLineDao.java @@ -0,0 +1,8 @@ +package cd.casic.ci.process.process.dao.pipeline; + +import cd.casic.ci.process.process.dataObject.log.PipTaskLog; +import cd.casic.ci.process.process.dataObject.log.PipTaskLogLine; +import cd.casic.framework.mybatis.core.mapper.BaseMapperX; + +public interface PipTaskLogLineDao extends BaseMapperX { +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dataObject/log/PipTaskLog.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dataObject/log/PipTaskLog.java index 558e1da2..8bd0dea4 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dataObject/log/PipTaskLog.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dataObject/log/PipTaskLog.java @@ -11,11 +11,6 @@ import lombok.EqualsAndHashCode; @Data public class PipTaskLog extends BaseDO { private String taskId; - private String content; @TableId(type = IdType.ASSIGN_ID) private String id; - public void append(String content){ - this.content += CommandConstant.ENTER; - this.content +=content; - } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dataObject/log/PipTaskLogLine.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dataObject/log/PipTaskLogLine.java new file mode 100644 index 00000000..81309390 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/dataObject/log/PipTaskLogLine.java @@ -0,0 +1,13 @@ +package cd.casic.ci.process.process.dataObject.log; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import lombok.Data; + +@Data +public class PipTaskLogLine { + @TableId(type = IdType.ASSIGN_ID) + private String id; + private String logId; + private String lineContent; +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/taskLog/PipTaskLogLineService.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/taskLog/PipTaskLogLineService.java new file mode 100644 index 00000000..dec646ea --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/taskLog/PipTaskLogLineService.java @@ -0,0 +1,12 @@ +package cd.casic.ci.process.process.service.taskLog; + +import cd.casic.ci.process.process.dataObject.log.PipTaskLog; +import cd.casic.ci.process.process.dataObject.log.PipTaskLogLine; +import com.baomidou.mybatisplus.extension.service.IService; + +import java.util.List; + +public interface PipTaskLogLineService extends IService { + String getLineContentByLogId(String logId,Integer count); + List getLineListByLogId(String logId, Integer count); +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/taskLog/impl/PipTaskLogLineServiceImpl.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/taskLog/impl/PipTaskLogLineServiceImpl.java new file mode 100644 index 00000000..98f1c1c0 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/process/service/taskLog/impl/PipTaskLogLineServiceImpl.java @@ -0,0 +1,39 @@ +package cd.casic.ci.process.process.service.taskLog.impl; + +import cd.casic.ci.process.process.dao.pipeline.PipTaskLogLineDao; +import cd.casic.ci.process.process.dataObject.log.PipTaskLogLine; +import cd.casic.ci.process.process.service.taskLog.PipTaskLogLineService; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.extension.service.IService; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import jakarta.annotation.Resource; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import java.util.Collections; +import java.util.List; + +@Service +public class PipTaskLogLineServiceImpl extends ServiceImpl implements PipTaskLogLineService { + @Resource + private PipTaskLogLineDao logLineDao; + @Override + public String getLineContentByLogId(String logId,Integer count){ + List lineList = getLineListByLogId(logId, count); + if (CollectionUtils.isEmpty(lineList)) { + return ""; + } + StringBuilder stringBuilder = new StringBuilder(); + for (PipTaskLogLine line : lineList) { + stringBuilder.append(line.getLineContent()); + } + return stringBuilder.toString(); + } + @Override + public List getLineListByLogId(String logId, Integer count){ + LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); + wrapper.eq(PipTaskLogLine::getLogId,logId); + wrapper.last("LIMIT "+count); + return logLineDao.selectList(wrapper); + } +}