Compare commits
5 Commits
b8170bc955
...
b8e344547a
Author | SHA1 | Date | |
---|---|---|---|
![]() |
b8e344547a | ||
![]() |
60b753e2eb | ||
![]() |
a986288195 | ||
![]() |
02447a79a4 | ||
![]() |
e16d90d3c4 |
@ -9,6 +9,7 @@ import jakarta.annotation.Resource;
|
|||||||
import jakarta.servlet.http.HttpServletRequest;
|
import jakarta.servlet.http.HttpServletRequest;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.springframework.util.CollectionUtils;
|
||||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -16,6 +17,10 @@ import java.util.ArrayList;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 流水线运行时日志管理
|
* 流水线运行时日志管理
|
||||||
* */
|
* */
|
||||||
@ -25,11 +30,12 @@ public class MemoryLogManager implements LoggerManager {
|
|||||||
/**
|
/**
|
||||||
* 第一级 是taskId,可能同一个账号、IP、建立多个连接所以保存为list
|
* 第一级 是taskId,可能同一个账号、IP、建立多个连接所以保存为list
|
||||||
* */
|
* */
|
||||||
private final static Map<String, List<SseEmitter>> taskIdSSEMap = new ConcurrentHashMap<>();
|
private final Map<String, List<SseEmitter>> taskIdSSEMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
private final static Map<String,StringBuffer> taskIdMemoryLogMap = new ConcurrentHashMap<>();
|
private final Map<String,StringBuffer> taskIdMemoryLogMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public static final Integer FLUSH_DB_SIZE=2*1024*1024;
|
public final Integer FLUSH_DB_SIZE=2*1024*1024;
|
||||||
|
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
|
||||||
/**
|
/**
|
||||||
* 缓存最近一次执行的日志,key是taskId,val是数据库id(已入库的情况下)用于buffer满了增加日志内容
|
* 缓存最近一次执行的日志,key是taskId,val是数据库id(已入库的情况下)用于buffer满了增加日志内容
|
||||||
* 读取日志的时候同时读取数据库和内存中的日志
|
* 读取日志的时候同时读取数据库和内存中的日志
|
||||||
@ -40,15 +46,16 @@ public class MemoryLogManager implements LoggerManager {
|
|||||||
@Override
|
@Override
|
||||||
public SseEmitter subscribe(String taskId,HttpServletRequest request) {
|
public SseEmitter subscribe(String taskId,HttpServletRequest request) {
|
||||||
String ipAddr = IpUtil.getIpAddr(request);
|
String ipAddr = IpUtil.getIpAddr(request);
|
||||||
SseEmitter emitter = new SseEmitter(3000L);
|
SseEmitter emitter = new SseEmitter(30000L);
|
||||||
emitterInit(emitter,taskId);
|
emitterInit(emitter,taskId);
|
||||||
Long loginUserId = WebFrameworkUtils.getLoginUserId();
|
Long loginUserId = WebFrameworkUtils.getLoginUserId();
|
||||||
log.info("SSE连接建立");
|
log.info("SSE连接建立");
|
||||||
log.info("当前请求ip:{}",ipAddr);
|
log.info("当前请求ip:{}",ipAddr);
|
||||||
log.info("当前用户id:{}",loginUserId);
|
log.info("当前用户id:{}",loginUserId);
|
||||||
log.info("当前taskId:{}",loginUserId);
|
log.info("当前taskId:{}",loginUserId);
|
||||||
List<SseEmitter> taskIdSSEList = taskIdSSEMap.getOrDefault(taskId, new ArrayList<>(1));
|
List<SseEmitter> taskIdSSEList = taskIdSSEMap.computeIfAbsent(taskId, k->new ArrayList<>(1));
|
||||||
taskIdSSEList.add(emitter);
|
taskIdSSEList.add(emitter);
|
||||||
|
|
||||||
return emitter;
|
return emitter;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -59,7 +66,7 @@ public class MemoryLogManager implements LoggerManager {
|
|||||||
* 内存满4mb入库,查询日志和入库操作用同一把锁
|
* 内存满4mb入库,查询日志和入库操作用同一把锁
|
||||||
* 然后新内容推sse
|
* 然后新内容推sse
|
||||||
* */
|
* */
|
||||||
StringBuffer logCache = taskIdMemoryLogMap.getOrDefault(taskId, new StringBuffer());
|
StringBuffer logCache = taskIdMemoryLogMap.computeIfAbsent(taskId, k -> new StringBuffer());
|
||||||
int length = logCache.length();
|
int length = logCache.length();
|
||||||
if (length>=FLUSH_DB_SIZE) {
|
if (length>=FLUSH_DB_SIZE) {
|
||||||
synchronized (this){
|
synchronized (this){
|
||||||
@ -88,6 +95,16 @@ public class MemoryLogManager implements LoggerManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
logCache.append(logContent);
|
logCache.append(logContent);
|
||||||
|
List<SseEmitter> sseEmitters = taskIdSSEMap.get(taskId);
|
||||||
|
if (!CollectionUtils.isEmpty(sseEmitters)) {
|
||||||
|
for (SseEmitter sseEmitter : sseEmitters) {
|
||||||
|
try {
|
||||||
|
sseEmitter.send(logContent);
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.error("",e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -102,6 +119,14 @@ public class MemoryLogManager implements LoggerManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void emitterInit(SseEmitter emitter,String taskId){
|
private void emitterInit(SseEmitter emitter,String taskId){
|
||||||
|
// 维持心跳
|
||||||
|
scheduler.scheduleWithFixedDelay(()-> {
|
||||||
|
try {
|
||||||
|
emitter.send("\n\n");
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.error("",e);
|
||||||
|
}
|
||||||
|
},0,30,TimeUnit.SECONDS);
|
||||||
emitter.onCompletion(()->{
|
emitter.onCompletion(()->{
|
||||||
taskIdSSEMap.remove(taskId);
|
taskIdSSEMap.remove(taskId);
|
||||||
log.info("===================taskId:{}断开连接===============",taskId);
|
log.info("===================taskId:{}断开连接===============",taskId);
|
||||||
|
@ -124,7 +124,6 @@ public class TaskServiceImpl extends ServiceImpl<PipTaskDao, PipTask> implements
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CommonResult<String> getLogContentByTaskId(String taskId) {
|
public CommonResult<String> getLogContentByTaskId(String taskId) {
|
||||||
|
|
||||||
return CommonResult.success(loggerManager.getLogContent(taskId));
|
return CommonResult.success(loggerManager.getLogContent(taskId));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user