New log logic (pending optimization)

This commit is contained in:
even 2025-07-02 16:09:16 +08:00
parent 046c03ec68
commit 79e4df8a60
10 changed files with 121 additions and 87 deletions
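
In short, this change moves task logs from the single content column on PipTaskLog to one PipTaskLogLine row per appended chunk, keyed by logId. A minimal sketch of the new read path, assuming the PipTaskLogLineService added further down in this diff (the wrapping class and field wiring are illustrative only, not part of the commit):

import cd.casic.ci.process.process.dataObject.log.PipTaskLogLine;
import cd.casic.ci.process.process.service.taskLog.PipTaskLogLineService;

public class LogReadSketch {
    private final PipTaskLogLineService logLineService;

    public LogReadSketch(PipTaskLogLineService logLineService) {
        this.logLineService = logLineService;
    }

    // Concatenates up to maxLines PipTaskLogLine rows belonging to one log id,
    // mirroring what the reworked /history/getLogById endpoint returns.
    public String readLog(String logId, int maxLines) {
        StringBuilder sb = new StringBuilder();
        for (PipTaskLogLine line : logLineService.getLineListByLogId(logId, maxLines)) {
            sb.append(line.getLineContent());
        }
        return sb.toString();
    }
}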

View File

@@ -3,24 +3,28 @@ package cd.casic.ci.api;
import cd.casic.ci.process.dto.req.history.PipelineHistoryQueryReq;
import cd.casic.ci.process.process.dataObject.history.PipPipelineHisInstance;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
import cd.casic.ci.process.process.dataObject.log.PipTaskLogLine;
import cd.casic.ci.process.process.service.history.PipelineHistoryService;
import cd.casic.ci.process.process.service.taskLog.PipTaskLogLineService;
import cd.casic.ci.process.process.service.taskLog.TaskLogService;
import cd.casic.framework.commons.pojo.CommonResult;
import cd.casic.framework.commons.pojo.PageResult;
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@RestController
@RequestMapping("/history")
public class PipHistoryController {
@Resource
TaskLogService taskLogService;
private PipelineHistoryService pipelineHistoryService;
@Resource
PipelineHistoryService pipelineHistoryService;
private PipTaskLogLineService logLineService;
@GetMapping("/getLogById")
public CommonResult<PipTaskLog> getLogById(@RequestParam String id){
PipTaskLog byId = taskLogService.getById(id);
return CommonResult.success(byId);
public CommonResult<String> getLogById(@RequestParam String id){
String content = logLineService.getLineContentByLogId(id, 10000);
return CommonResult.success(content);
}
@PostMapping("/list")
public CommonResult<PageResult<PipPipelineHisInstance>> list(@RequestBody PipelineHistoryQueryReq req){
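
For reference, a hedged example of calling the reworked endpoint from plain Java; the base URL and id value are placeholders, and the response body is a CommonResult<String> whose data field holds the concatenated log lines:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class GetLogByIdExample {
    public static void main(String[] args) throws Exception {
        // Placeholder base URL and log id; adjust to the actual deployment.
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/history/getLogById?id=1234567890"))
                .GET()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        // The JSON body wraps at most 10000 concatenated log lines.
        System.out.println(response.body());
    }
}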

View File

@@ -4,12 +4,16 @@ import cd.casic.ci.process.engine.constant.EngineRuntimeConstant;
import cd.casic.ci.process.engine.manager.LoggerManager;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.process.dao.pipeline.PipTaskLogDao;
import cd.casic.ci.process.process.dao.pipeline.PipTaskLogLineDao;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
import cd.casic.ci.process.process.dataObject.log.PipTaskLogLine;
import cd.casic.ci.process.process.dataObject.task.PipTask;
import cd.casic.ci.process.process.service.task.TaskService;
import cd.casic.ci.process.process.service.taskLog.PipTaskLogLineService;
import cd.casic.ci.process.util.snowflake.SnowflakeIdentifierGenerator;
import cd.casic.framework.commons.util.network.IpUtil;
import cd.casic.ci.process.util.WebFrameworkUtils;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import jakarta.annotation.Resource;
import jakarta.servlet.http.HttpServletRequest;
import lombok.extern.slf4j.Slf4j;
@@ -35,15 +39,8 @@ public class MemoryLogManager implements LoggerManager {
* The first-level key is the taskId; the same account/IP may open multiple connections, so the value is kept as a list
* */
private final Map<String, List<SseEmitter>> taskIdSSEMap = new ConcurrentHashMap<>();
private final Map<String,StringBuffer> taskIdMemoryLogMap = new ConcurrentHashMap<>();
@Resource
private SnowflakeIdentifierGenerator identifierGenerator;
@Resource
private TaskService taskService;
public final Integer FLUSH_DB_SIZE=2*1024*1024;
// public final Integer FLUSH_DB_SIZE=1000;
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
/**
* Caches the most recent execution's log: the key is the taskId and the value is the database id; once persisted, it is used to append log content when the buffer fills
@@ -52,6 +49,8 @@ public class MemoryLogManager implements LoggerManager {
private final static Map<String,String> taskIdDbMap = new ConcurrentHashMap<>();
@Resource
private PipTaskLogDao logDao;
@Resource
private PipTaskLogLineService logLineService;
@Override
public SseEmitter subscribe(String taskId,HttpServletRequest request) {
if (request!=null) {
@@ -77,35 +76,27 @@
* When the in-memory buffer reaches 4 MB, flush it to the database; the log query and the flush share the same lock,
* then push the new content over SSE
* */
StringBuffer logCache = taskIdMemoryLogMap.computeIfAbsent(taskId, k -> new StringBuffer());
int length = logCache.length();
if (length>=FLUSH_DB_SIZE) {
synchronized (this){
if (length>=FLUSH_DB_SIZE) {
StringBuffer stringBuffer = new StringBuffer();
// reset the cache
taskIdMemoryLogMap.put(taskId, stringBuffer);
// insert or update in the database
if (taskIdDbMap.containsKey(taskId)) {
// already persisted earlier
// update the existing record
String id = taskIdDbMap.get(taskId);
PipTaskLog pipTaskLog = logDao.selectById(id);
pipTaskLog.setContent(pipTaskLog.getContent()+logCache.toString());
logDao.updateById(pipTaskLog);
} else {
// otherwise insert a new record
PipTaskLog pipTaskLog = new PipTaskLog();
pipTaskLog.setTaskId(taskId);
pipTaskLog.setContent(logCache.toString());
logDao.insert(pipTaskLog);
taskIdDbMap.put(taskId,pipTaskLog.getId());
}
logCache = stringBuffer;
}
}
// insert or update in the database
if (taskIdDbMap.containsKey(taskId)) {
// already persisted earlier
// update the existing record
String id = taskIdDbMap.get(taskId);
PipTaskLogLine line = new PipTaskLogLine();
line.setLogId(id);
line.setLineContent(logContent);
logLineService.save(line);
} else {
// otherwise insert a new record
PipTaskLog pipTaskLog = new PipTaskLog();
pipTaskLog.setTaskId(taskId);
// pipTaskLog.setContent(logCache.toString());
logDao.insert(pipTaskLog);
taskIdDbMap.put(taskId,pipTaskLog.getId());
PipTaskLogLine line = new PipTaskLogLine();
line.setLogId(pipTaskLog.getTaskId());
line.setLineContent(logContent);
logLineService.save(line);
}
logCache.append(logContent);
List<SseEmitter> sseEmitters = taskIdSSEMap.get(taskId);
if (!CollectionUtils.isEmpty(sseEmitters)) {
for (SseEmitter sseEmitter : sseEmitters) {
@@ -120,63 +111,31 @@
@Transactional(rollbackFor = Exception.class)
public void flushMemory(List<TaskRunContext> taskContextList){
log.info("流水线日志开始入库");
List<PipTaskLog> insertList = new ArrayList<>();
List<PipTaskLog> updateList = new ArrayList<>();
for (TaskRunContext taskRunContext : taskContextList) {
String taskId = taskRunContext.getContextDef().getId();
StringBuffer logCache = taskIdMemoryLogMap.get(taskId);
if (taskIdDbMap.containsKey(taskId)) {
// already persisted earlier
// update the existing record
String id = taskIdDbMap.get(taskId);
PipTaskLog pipTaskLog = logDao.selectById(id);
if (logCache!=null) {
pipTaskLog.setContent(pipTaskLog.getContent()+logCache);
}
updateList.add(pipTaskLog);
taskIdMemoryLogMap.remove(taskId);
taskRunContext.getLocalVariables().put(EngineRuntimeConstant.HIS_LOG_KEY,pipTaskLog.getId());
List<SseEmitter> sseEmitters = taskIdSSEMap.get(taskId);
if (!CollectionUtils.isEmpty(sseEmitters)) {
sseEmitters.forEach(ResponseBodyEmitter::complete);
}
logDao.updateById(pipTaskLog);
} else {
// otherwise insert a new record
PipTaskLog pipTaskLog = new PipTaskLog();
pipTaskLog.setTaskId(taskId);
if (logCache!=null) {
pipTaskLog.setContent(logCache.toString());
}
pipTaskLog.setId(identifierGenerator.nextUUID(null));
insertList.add(pipTaskLog);
taskRunContext.getLocalVariables().put(EngineRuntimeConstant.HIS_LOG_KEY,pipTaskLog.getId());
taskIdMemoryLogMap.remove(taskId);
taskIdDbMap.put(taskId,pipTaskLog.getId());
taskRunContext.getLocalVariables().put(EngineRuntimeConstant.HIS_LOG_KEY,id);
List<SseEmitter> sseEmitters = taskIdSSEMap.get(taskId);
if (!CollectionUtils.isEmpty(sseEmitters)) {
sseEmitters.forEach(ResponseBodyEmitter::complete);
}
}
}
if (!CollectionUtils.isEmpty(insertList)) {
logDao.insertBatch(insertList);
}
if (!CollectionUtils.isEmpty(updateList)) {
logDao.updateBatch(updateList);
}
}
@Override
public String getLogContent(String taskId) {
StringBuffer logCache = taskIdMemoryLogMap.getOrDefault(taskId, new StringBuffer());
String id = taskIdDbMap.get(taskId);
if (id != null) {
PipTaskLog pipTaskLog = logDao.selectById(id);
return pipTaskLog.getContent()+logCache.toString();
List<PipTaskLogLine> lineList = logLineService.getLineListByLogId(id,10000);
StringBuilder stringBuilder = new StringBuilder();
for (PipTaskLogLine line : lineList) {
stringBuilder.append(line.getLineContent());
}
return stringBuilder.toString();
}
return logCache.toString();
return "";
}
public void clear(String pipelineId){
PipTask query = new PipTask();
@@ -210,4 +169,6 @@ public class MemoryLogManager implements LoggerManager {
log.error("===================超时taskId:{}断开连接===============",taskId);
});
}
}
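
As a standalone illustration of the new append path (one PipTaskLogLine per chunk instead of a 2 MB in-memory buffer), a minimal sketch; the method name appendLine, the UUID stand-in for the snowflake id, and the omitted DAO/SSE calls are assumptions, not the manager's actual code:

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
import cd.casic.ci.process.process.dataObject.log.PipTaskLogLine;

public class LineAppendSketch {
    // taskId -> persisted PipTaskLog id, mirroring taskIdDbMap in MemoryLogManager
    private final Map<String, String> taskIdDbMap = new ConcurrentHashMap<>();

    // Persists one log chunk as a PipTaskLogLine and returns the owning log id.
    public String appendLine(String taskId, String logContent) {
        String logId = taskIdDbMap.computeIfAbsent(taskId, t -> {
            PipTaskLog pipTaskLog = new PipTaskLog();
            pipTaskLog.setTaskId(t);
            // In MemoryLogManager, logDao.insert(pipTaskLog) assigns the ASSIGN_ID key;
            // a random UUID stands in for it in this sketch.
            pipTaskLog.setId(UUID.randomUUID().toString());
            return pipTaskLog.getId();
        });
        PipTaskLogLine line = new PipTaskLogLine();
        line.setLogId(logId);
        line.setLineContent(logContent);
        // logLineService.save(line) would persist the chunk,
        // then the SSE emitters registered for taskId would receive the new content.
        return logId;
    }
}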

View File

@@ -93,8 +93,8 @@ public class AFLWorker extends DockerWorker {
// get the docker endpoint; a fixed value is used for now
dockerRun(commandScript,resourceListByType.getDockerEndpointList().get(0),context, runningTime);
} catch (Exception e) {
String errorMessage = "该节点配置信息为空,请先配置该节点信息" + "\r\n";
log.error("执行ssh失败:", e);
String errorMessage = "执行afl失败"+e.getMessage() + "\r\n";
log.error("执行afl失败", e);
append(context, errorMessage);
toBadEnding();
}

View File

@@ -31,6 +31,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import javax.swing.text.StringContent;
import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

View File

@@ -2,6 +2,7 @@ package cd.casic.ci.process.process.dao.pipeline;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
import cd.casic.framework.mybatis.core.mapper.BaseMapperX;
import org.apache.ibatis.annotations.Param;
public interface PipTaskLogDao extends BaseMapperX<PipTaskLog> {
}

View File

@@ -0,0 +1,8 @@
package cd.casic.ci.process.process.dao.pipeline;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
import cd.casic.ci.process.process.dataObject.log.PipTaskLogLine;
import cd.casic.framework.mybatis.core.mapper.BaseMapperX;
public interface PipTaskLogLineDao extends BaseMapperX<PipTaskLogLine> {
}

View File

@@ -11,11 +11,6 @@ import lombok.EqualsAndHashCode;
@Data
public class PipTaskLog extends BaseDO {
private String taskId;
private String content;
@TableId(type = IdType.ASSIGN_ID)
private String id;
public void append(String content){
this.content += CommandConstant.ENTER;
this.content +=content;
}
}

View File

@@ -0,0 +1,13 @@
package cd.casic.ci.process.process.dataObject.log;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import lombok.Data;
@Data
public class PipTaskLogLine {
@TableId(type = IdType.ASSIGN_ID)
private String id;
private String logId;
private String lineContent;
}

View File

@@ -0,0 +1,12 @@
package cd.casic.ci.process.process.service.taskLog;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
import cd.casic.ci.process.process.dataObject.log.PipTaskLogLine;
import com.baomidou.mybatisplus.extension.service.IService;
import java.util.List;
public interface PipTaskLogLineService extends IService<PipTaskLogLine> {
String getLineContentByLogId(String logId,Integer count);
List<PipTaskLogLine> getLineListByLogId(String logId, Integer count);
}

View File

@@ -0,0 +1,39 @@
package cd.casic.ci.process.process.service.taskLog.impl;
import cd.casic.ci.process.process.dao.pipeline.PipTaskLogLineDao;
import cd.casic.ci.process.process.dataObject.log.PipTaskLogLine;
import cd.casic.ci.process.process.service.taskLog.PipTaskLogLineService;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.IService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import jakarta.annotation.Resource;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.Collections;
import java.util.List;
@Service
public class PipTaskLogLineServiceImpl extends ServiceImpl<PipTaskLogLineDao,PipTaskLogLine> implements PipTaskLogLineService {
@Resource
private PipTaskLogLineDao logLineDao;
@Override
public String getLineContentByLogId(String logId,Integer count){
List<PipTaskLogLine> lineList = getLineListByLogId(logId, count);
if (CollectionUtils.isEmpty(lineList)) {
return "";
}
StringBuilder stringBuilder = new StringBuilder();
for (PipTaskLogLine line : lineList) {
stringBuilder.append(line.getLineContent());
}
return stringBuilder.toString();
}
@Override
public List<PipTaskLogLine> getLineListByLogId(String logId, Integer count){
LambdaQueryWrapper<PipTaskLogLine> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(PipTaskLogLine::getLogId,logId);
wrapper.last("LIMIT "+count);
return logLineDao.selectList(wrapper);
}
}
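
One usage note on the query above: wrapper.last("LIMIT " + count) caps the result but there is no explicit ordering, so the concatenation order depends on the database. A hedged sketch of the same method with an order by id (ASSIGN_ID snowflake ids increase over time, so this keeps lines in write order); whether that ordering is wanted is an assumption, and the snippet reuses the imports and logLineDao field of the class above:

@Override
public List<PipTaskLogLine> getLineListByLogId(String logId, Integer count) {
    LambdaQueryWrapper<PipTaskLogLine> wrapper = new LambdaQueryWrapper<>();
    wrapper.eq(PipTaskLogLine::getLogId, logId)
           .orderByAsc(PipTaskLogLine::getId)   // keep lines in insertion order
           .last("LIMIT " + count);
    return logLineDao.selectList(wrapper);
}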