AFL节点后置处理

This commit is contained in:
even 2025-07-08 10:29:52 +08:00
parent 88219bff9a
commit 17da55937e
5 changed files with 75 additions and 25 deletions

View File

@ -5,4 +5,6 @@ public class AFLConstant {
public static final String COMMAND_END="commandEnd"; public static final String COMMAND_END="commandEnd";
public static final String EXECUTABLE_NAME ="executableName"; public static final String EXECUTABLE_NAME ="executableName";
public static final String RUNNING_TIME ="runningTime"; public static final String RUNNING_TIME ="runningTime";
public static final String GROUP_ID= "groupId";
} }

View File

@ -1,9 +1,9 @@
package cd.casic.ci.process.engine.manager; package cd.casic.ci.process.engine.manager;
import cd.casic.ci.process.engine.postHandler.PipExecutePostHandler; import cd.casic.ci.process.engine.postHandler.ExecuteTaskPostHandler;
import cd.casic.ci.process.process.dataObject.history.PipPipelineHisInstance; import cd.casic.ci.process.process.dataObject.history.PipPipelineHisInstance;
public interface PostHandlerManager { public interface PostHandlerManager {
public void registerPostHandler(PipExecutePostHandler handler); public void registerPostHandler(ExecuteTaskPostHandler handler);
public void executePostHandler(PipPipelineHisInstance hisInstance); public void executePostHandler(PipPipelineHisInstance hisInstance);
} }

View File

@ -1,7 +1,7 @@
package cd.casic.ci.process.engine.manager.impl; package cd.casic.ci.process.engine.manager.impl;
import cd.casic.ci.process.engine.manager.PostHandlerManager; import cd.casic.ci.process.engine.manager.PostHandlerManager;
import cd.casic.ci.process.engine.postHandler.PipExecutePostHandler; import cd.casic.ci.process.engine.postHandler.ExecuteTaskPostHandler;
import cd.casic.ci.process.process.dataObject.history.PipPipelineHisInstance; import cd.casic.ci.process.process.dataObject.history.PipPipelineHisInstance;
import cd.casic.framework.commons.exception.ServiceException; import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
@ -15,24 +15,25 @@ import java.util.concurrent.CopyOnWriteArrayList;
@Component @Component
public class MemoryPostHandlerManager implements PostHandlerManager { public class MemoryPostHandlerManager implements PostHandlerManager {
private ConcurrentHashMap<String, List<PipExecutePostHandler>> handlerMap = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, List<ExecuteTaskPostHandler>> handlerMap = new ConcurrentHashMap<>();
@Override @Override
public void registerPostHandler(PipExecutePostHandler handler) { public void registerPostHandler(ExecuteTaskPostHandler handler) {
if (handler==null|| StringUtils.isNotEmpty(handler.getPipelineId())) { if (handler==null|| StringUtils.isNotEmpty(handler.getPipelineId())) {
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"注册后置处理器失败"); throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"注册后置处理器失败");
} }
List<PipExecutePostHandler> orDefault = handlerMap.getOrDefault(handler.getPipelineId(), new CopyOnWriteArrayList<>()); List<ExecuteTaskPostHandler> orDefault = handlerMap.getOrDefault(handler.getPipelineId(), new CopyOnWriteArrayList<>());
orDefault.add(handler); orDefault.add(handler);
handlerMap.put(handler.getPipelineId(),orDefault);
} }
@Override @Override
public void executePostHandler(PipPipelineHisInstance hisInstance) { public void executePostHandler(PipPipelineHisInstance hisInstance) {
String pipelineId = hisInstance.getPipelineId(); String pipelineId = hisInstance.getPipelineId();
List<PipExecutePostHandler> pipExecutePostHandlers = handlerMap.get(pipelineId); List<ExecuteTaskPostHandler> pipExecutePostHandlers = handlerMap.get(pipelineId);
if (CollectionUtils.isEmpty(pipExecutePostHandlers)) { if (CollectionUtils.isEmpty(pipExecutePostHandlers)) {
return; return;
} }
for (PipExecutePostHandler postHandler : pipExecutePostHandlers) { for (ExecuteTaskPostHandler postHandler : pipExecutePostHandlers) {
postHandler.executeAfterDone(hisInstance); postHandler.executeAfterDone(hisInstance);
} }
} }

View File

@ -6,7 +6,7 @@ import lombok.Data;
@Data @Data
@AllArgsConstructor @AllArgsConstructor
public abstract class PipExecutePostHandler { public abstract class ExecuteTaskPostHandler {
private String taskId; private String taskId;
private String pipelineId; private String pipelineId;

View File

@ -6,11 +6,12 @@ import cd.casic.ci.process.dto.req.resource.ResourceQueryReq;
import cd.casic.ci.process.dto.resp.resource.ResourceFindResp; import cd.casic.ci.process.dto.resp.resource.ResourceFindResp;
import cd.casic.ci.process.dto.resp.taskResource.TaskResourceFindResp; import cd.casic.ci.process.dto.resp.taskResource.TaskResourceFindResp;
import cd.casic.ci.process.engine.constant.AFLConstant; import cd.casic.ci.process.engine.constant.AFLConstant;
import cd.casic.ci.process.engine.constant.AFLSlotCompileConstant;
import cd.casic.ci.process.engine.constant.DIYImageExecuteCommandConstant; import cd.casic.ci.process.engine.constant.DIYImageExecuteCommandConstant;
import cd.casic.ci.process.engine.constant.PipelineVariableConstant; import cd.casic.ci.process.engine.manager.PostHandlerManager;
import cd.casic.ci.process.engine.postHandler.ExecuteTaskPostHandler;
import cd.casic.ci.process.engine.runContext.TaskRunContext; import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.engine.worker.base.DockerWorker; import cd.casic.ci.process.engine.worker.base.DockerWorker;
import cd.casic.ci.process.process.dataObject.history.PipPipelineHisInstance;
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import cd.casic.ci.process.process.dataObject.target.TargetVersion; import cd.casic.ci.process.process.dataObject.target.TargetVersion;
import cd.casic.ci.process.process.dataObject.task.PipTask; import cd.casic.ci.process.process.dataObject.task.PipTask;
@ -18,13 +19,17 @@ import cd.casic.ci.process.process.service.aflManager.AflInfoService;
import cd.casic.ci.process.process.service.aflManager.AflPlotInfoService; import cd.casic.ci.process.process.service.aflManager.AflPlotInfoService;
import cd.casic.ci.process.process.service.aflManager.AflSeedInfoService; import cd.casic.ci.process.process.service.aflManager.AflSeedInfoService;
import cd.casic.ci.process.process.service.target.TargetVersionService; import cd.casic.ci.process.process.service.target.TargetVersionService;
import cd.casic.ci.process.util.SftpUploadUtil;
import com.alibaba.fastjson.JSON;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.scheduling.annotation.Async;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.io.File; import java.io.File;
import java.util.Map; import java.util.Map;
import java.util.UUID;
import static cd.casic.ci.process.engine.constant.AFLConstant.*; import static cd.casic.ci.process.engine.constant.AFLConstant.*;
import static cd.casic.ci.process.engine.constant.PipelineVariableConstant.*; import static cd.casic.ci.process.engine.constant.PipelineVariableConstant.*;
@ -34,6 +39,14 @@ import static cd.casic.ci.process.engine.constant.PipelineVariableConstant.*;
public class AFLWorker extends DockerWorker { public class AFLWorker extends DockerWorker {
@Resource @Resource
private TargetVersionService targetVersionService; private TargetVersionService targetVersionService;
@Resource
private AflInfoService aflInfoService;
@Resource
private AflSeedInfoService aflSeedInfoService;
@Resource
private AflPlotInfoService aflPlotInfoService;
@Resource
private PostHandlerManager postHandlerManager;
@Override @Override
public void execute(TaskRunContext context) { public void execute(TaskRunContext context) {
int statusCode = -1; int statusCode = -1;
@ -107,7 +120,8 @@ public class AFLWorker extends DockerWorker {
toBadEnding(); toBadEnding();
} }
localVariables.put(DIYImageExecuteCommandConstant.STATUS_CODE, statusCode); localVariables.put(DIYImageExecuteCommandConstant.STATUS_CODE, statusCode);
afterTaskExecute(context);
afterPipelineExecute(context);
} }
} }
private String cdSourceName(String fileName){ private String cdSourceName(String fileName){
@ -123,17 +137,50 @@ public class AFLWorker extends DockerWorker {
} }
return null; return null;
} }
@Resource @Async
private AflInfoService aflInfoService; public void afterTaskExecute(TaskRunContext context){
@Resource append(context,"开始拉取afl输出信息");
private AflSeedInfoService aflSeedInfoService; PipTask contextDef = (PipTask)context.getContextDef();
@Resource Map<String, Object> localVariables = context.getLocalVariables();
private AflPlotInfoService aflPlotInfoService; String pipelineId =contextDef.getPipelineId();
// private void afterTaskExecute(){ String taskId = contextDef.getId();
// AflManagerReq aflManagerReq = new AflManagerReq(); String groupId = UUID.randomUUID().toString().replace("-","");
// aflInfoService.saveAflInfo(); localVariables.put(GROUP_ID,groupId);
// } AflManagerReq req = new AflManagerReq();
// private void afterPipelineExecute(){ req.setGroupIdentifier(groupId);
// req.setTaskId(taskId);
// } req.setPipelineId(pipelineId);
try {
aflInfoService.saveAflInfo(req);
} catch (SftpUploadUtil.SftpUploadException e) {
log.error("保存afl信息失败,入参{}", JSON.toJSONString(req),e);
}
aflSeedInfoService.saveAflSeedInfo(req);
try {
aflPlotInfoService.saveAflPlotInfo(req);
} catch (SftpUploadUtil.SftpUploadException e) {
log.error("保存afl信息失败,入参{}", JSON.toJSONString(req),e);
}
}
private void afterPipelineExecute(TaskRunContext context){
append(context,"开始拉取afl输出信息");
PipTask contextDef = (PipTask)context.getContextDef();
String pipelineId =contextDef.getPipelineId();
String taskId = contextDef.getId();
AflManagerReq req = new AflManagerReq();
req.setTaskId(taskId);
req.setPipelineId(pipelineId);
postHandlerManager.registerPostHandler(new ExecuteTaskPostHandler(taskId,pipelineId) {
@Override
public void executeAfterDone(PipPipelineHisInstance pipPipelineHisInstance) {
Map<String, Object> localVariables = context.getLocalVariables();
String groupId = localVariables.get(GROUP_ID) instanceof String ? ((String) localVariables.get(GROUP_ID)) : null;
req.setGroupIdentifier(groupId);
req.setPipelineHistoryId(pipPipelineHisInstance.getId());
aflInfoService.updateHistoryPipelineIdByAflInfo(req);
aflSeedInfoService.updateHistoryPipelineIdByAflSeedInfo(req);
aflPlotInfoService.updateHistoryPipelineIdByAflPlotInfo(req);
}
});
}
} }