Compare commits

...

5 Commits

Author SHA1 Message Date
even
b8e344547a 日志修改 2025-05-26 20:41:07 +08:00
even
60b753e2eb Merge branch 'temp' of http://1.14.125.6:3000/mianbin/ops-pro into temp 2025-05-26 20:37:02 +08:00
even
a986288195 日志修改 2025-05-26 20:35:38 +08:00
even
02447a79a4 日志修改 2025-05-26 20:04:50 +08:00
even
e16d90d3c4 日志修改 2025-05-26 19:55:33 +08:00
2 changed files with 31 additions and 7 deletions

View File

@ -9,6 +9,7 @@ import jakarta.annotation.Resource;
import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletRequest;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException; import java.io.IOException;
@ -16,6 +17,10 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/** /**
* 流水线运行时日志管理 * 流水线运行时日志管理
* */ * */
@ -25,11 +30,12 @@ public class MemoryLogManager implements LoggerManager {
/** /**
* 第一级 是taskId可能同一个账号IP建立多个连接所以保存为list * 第一级 是taskId可能同一个账号IP建立多个连接所以保存为list
* */ * */
private final static Map<String, List<SseEmitter>> taskIdSSEMap = new ConcurrentHashMap<>(); private final Map<String, List<SseEmitter>> taskIdSSEMap = new ConcurrentHashMap<>();
private final static Map<String,StringBuffer> taskIdMemoryLogMap = new ConcurrentHashMap<>(); private final Map<String,StringBuffer> taskIdMemoryLogMap = new ConcurrentHashMap<>();
public static final Integer FLUSH_DB_SIZE=2*1024*1024; public final Integer FLUSH_DB_SIZE=2*1024*1024;
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
/** /**
* 缓存最近一次执行的日志key是taskIdval是数据库id已入库的情况下用于buffer满了增加日志内容 * 缓存最近一次执行的日志key是taskIdval是数据库id已入库的情况下用于buffer满了增加日志内容
* 读取日志的时候同时读取数据库和内存中的日志 * 读取日志的时候同时读取数据库和内存中的日志
@ -40,15 +46,16 @@ public class MemoryLogManager implements LoggerManager {
@Override @Override
public SseEmitter subscribe(String taskId,HttpServletRequest request) { public SseEmitter subscribe(String taskId,HttpServletRequest request) {
String ipAddr = IpUtil.getIpAddr(request); String ipAddr = IpUtil.getIpAddr(request);
SseEmitter emitter = new SseEmitter(3000L); SseEmitter emitter = new SseEmitter(30000L);
emitterInit(emitter,taskId); emitterInit(emitter,taskId);
Long loginUserId = WebFrameworkUtils.getLoginUserId(); Long loginUserId = WebFrameworkUtils.getLoginUserId();
log.info("SSE连接建立"); log.info("SSE连接建立");
log.info("当前请求ip{}",ipAddr); log.info("当前请求ip{}",ipAddr);
log.info("当前用户id{}",loginUserId); log.info("当前用户id{}",loginUserId);
log.info("当前taskId{}",loginUserId); log.info("当前taskId{}",loginUserId);
List<SseEmitter> taskIdSSEList = taskIdSSEMap.getOrDefault(taskId, new ArrayList<>(1)); List<SseEmitter> taskIdSSEList = taskIdSSEMap.computeIfAbsent(taskId, k->new ArrayList<>(1));
taskIdSSEList.add(emitter); taskIdSSEList.add(emitter);
return emitter; return emitter;
} }
@ -59,7 +66,7 @@ public class MemoryLogManager implements LoggerManager {
* 内存满4mb入库查询日志和入库操作用同一把锁 * 内存满4mb入库查询日志和入库操作用同一把锁
* 然后新内容推sse * 然后新内容推sse
* */ * */
StringBuffer logCache = taskIdMemoryLogMap.getOrDefault(taskId, new StringBuffer()); StringBuffer logCache = taskIdMemoryLogMap.computeIfAbsent(taskId, k -> new StringBuffer());
int length = logCache.length(); int length = logCache.length();
if (length>=FLUSH_DB_SIZE) { if (length>=FLUSH_DB_SIZE) {
synchronized (this){ synchronized (this){
@ -88,6 +95,16 @@ public class MemoryLogManager implements LoggerManager {
} }
} }
logCache.append(logContent); logCache.append(logContent);
List<SseEmitter> sseEmitters = taskIdSSEMap.get(taskId);
if (!CollectionUtils.isEmpty(sseEmitters)) {
for (SseEmitter sseEmitter : sseEmitters) {
try {
sseEmitter.send(logContent);
} catch (IOException e) {
log.error("",e);
}
}
}
} }
@Override @Override
@ -102,6 +119,14 @@ public class MemoryLogManager implements LoggerManager {
} }
private void emitterInit(SseEmitter emitter,String taskId){ private void emitterInit(SseEmitter emitter,String taskId){
// 维持心跳
scheduler.scheduleWithFixedDelay(()-> {
try {
emitter.send("\n\n");
} catch (IOException e) {
log.error("",e);
}
},0,30,TimeUnit.SECONDS);
emitter.onCompletion(()->{ emitter.onCompletion(()->{
taskIdSSEMap.remove(taskId); taskIdSSEMap.remove(taskId);
log.info("===================taskId:{}断开连接===============",taskId); log.info("===================taskId:{}断开连接===============",taskId);

View File

@ -124,7 +124,6 @@ public class TaskServiceImpl extends ServiceImpl<PipTaskDao, PipTask> implements
@Override @Override
public CommonResult<String> getLogContentByTaskId(String taskId) { public CommonResult<String> getLogContentByTaskId(String taskId) {
return CommonResult.success(loggerManager.getLogContent(taskId)); return CommonResult.success(loggerManager.getLogContent(taskId));
} }
} }