联调问题修复,日志修改

This commit is contained in:
even 2025-05-26 19:18:25 +08:00
parent 1520c8c103
commit 4b9c846a7a
19 changed files with 123 additions and 67 deletions

View File

@ -1,6 +1,9 @@
package cd.casic.ci.common.pipeline.resp.context; package cd.casic.ci.common.pipeline.resp.context;
import lombok.Data; import lombok.Data;
import java.time.LocalDateTime;
/** /**
* 单节点上下文状态返回对象 * 单节点上下文状态返回对象
* */ * */
@ -14,4 +17,6 @@ public class SingletonRunContextResp {
* 状态 详见 ContextStateEnum * 状态 详见 ContextStateEnum
* */ * */
private Integer state; private Integer state;
private LocalDateTime startTime;
private LocalDateTime endTime;
} }

View File

@ -3,10 +3,12 @@ package cd.casic.ci.common.pipeline.resp.context;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import java.util.List;
import java.util.Map; import java.util.Map;
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
@Data @Data
public class TreeRunContextResp extends SingletonRunContextResp{ public class TreeRunContextResp extends SingletonRunContextResp{
private Map<String,TreeRunContextResp> child; private Map<String,TreeRunContextResp> child;
private List<TreeRunContextResp> taskList;
} }

View File

@ -105,5 +105,15 @@ public class PipelineFindResp {
private List<StageResp> stageList; private List<StageResp> stageList;
/**
* 目标id
*/
private String targetVersionId;
/**
* 目标类型
*/
private String targetType;
} }

View File

@ -19,12 +19,12 @@ public class TaskTemplateResp {
/** /**
* name * name
*/ */
private String name; private String taskType;
/** /**
* 标签 * 标签
*/ */
private String label; private String taskName;
/** /**
* 描述 * 描述

View File

@ -42,6 +42,9 @@ public class DefaultPipelineExecutor implements PipelineExecutor {
@Override @Override
public PipelineRunContext execute(String pipelineId) { public PipelineRunContext execute(String pipelineId) {
PipPipeline pipeline = pipelineService.getById(pipelineId); PipPipeline pipeline = pipelineService.getById(pipelineId);
if (pipeline==null) {
return null;
}
// TODO 判断状态不能重复运行 // TODO 判断状态不能重复运行
Integer state = pipeline.getState(); Integer state = pipeline.getState();
// TODO 判断资源是否申请成功是否处于可运行状态 // TODO 判断资源是否申请成功是否处于可运行状态

View File

@ -89,6 +89,9 @@ public class DefaultWorkerManager extends WorkerManager {
} }
String taskType = task.getTaskType(); String taskType = task.getTaskType();
BaseWorker baseWorker = taskTypeWorkerMap.get(taskType); BaseWorker baseWorker = taskTypeWorkerMap.get(taskType);
if (baseWorker==null) {
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"找不到worker");
}
baseWorker.setContextKey(task.getId()); baseWorker.setContextKey(task.getId());
workerExecutor.execute(baseWorker); workerExecutor.execute(baseWorker);
}catch (Exception e){ }catch (Exception e){

View File

@ -82,6 +82,7 @@ public abstract class BaseRunContext {
return; return;
} }
if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) { if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) {
this.endTime=LocalDateTime.now();
parentContext.checkChildEnd(); parentContext.checkChildEnd();
} else if(ContextStateEnum.READY.equals(state)){ } else if(ContextStateEnum.READY.equals(state)){
parentContext.checkChildReady(); parentContext.checkChildReady();

View File

@ -3,6 +3,8 @@ package cd.casic.ci.process.engine.worker;
import cd.casic.ci.common.pipeline.annotation.Plugin; import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.constant.EngineRuntimeConstant; import cd.casic.ci.process.engine.constant.EngineRuntimeConstant;
import cd.casic.ci.process.engine.context.ConstantContextHolder; import cd.casic.ci.process.engine.context.ConstantContextHolder;
import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.engine.runContext.PipelineRunContext;
import cd.casic.ci.process.engine.runContext.TaskRunContext; import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog; import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
@ -58,15 +60,15 @@ public class ApplicationWorker extends HttpWorker{
PipTaskLog pipTaskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY); PipTaskLog pipTaskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY);
PipBaseElement contextDef = context.getContextDef(); PipBaseElement contextDef = context.getContextDef();
pipTaskLog.append("SCA-应用包审查分析节点开始执行"); append(context,"SCA-应用包审查分析节点开始执行");
pipTaskLog.append("SCA-应用包审查分析节点配置:" + contextDef); append(context,"SCA-应用包审查分析节点配置:" + contextDef);
if (ObjectUtil.isEmpty(contextDef)) { if (ObjectUtil.isEmpty(contextDef)) {
log.error("未查询到节点[application]配置"); log.error("未查询到节点[application]配置");
localVariables.put("statusCode", "-1"); localVariables.put("statusCode", "-1");
} }
if (ObjectUtil.isEmpty(contextDef)) { if (ObjectUtil.isEmpty(contextDef)) {
log.error("未查询到节点[application]配置"); log.error("未查询到节点[application]配置");
pipTaskLog.append("未查询到节点[application]配置"); append(context,"未查询到节点[application]配置");
localVariables.put("statusCode", "-1"); localVariables.put("statusCode", "-1");
} }
@ -89,14 +91,14 @@ public class ApplicationWorker extends HttpWorker{
if (!file.exists() || !file.canRead()) { if (!file.exists() || !file.canRead()) {
log.error("目标文件不存在或不可读"); log.error("目标文件不存在或不可读");
localVariables.put("statusCode", "-1"); localVariables.put("statusCode", "-1");
pipTaskLog.append("目标文件不存在或不可读"); append(context,"目标文件不存在或不可读");
} }
handleUpload(pipTask.getTaskProperties(), file,pipTaskLog); handleUpload(pipTask.getTaskProperties(), file,context);
}catch (Exception e){ }catch (Exception e){
pipTaskLog.append("==================SCA-应用包审查分析节点执行失败================="); append(context,"==================SCA-应用包审查分析节点执行失败=================");
pipTaskLog.append("SCA-应用包审查分析节点执行失败失败,请检查当前节点配置!"); append(context,"SCA-应用包审查分析节点执行失败失败,请检查当前节点配置!");
pipTaskLog.append(e.getMessage()); append(context,e.getMessage());
log.error("==================SCA-应用包审查分析节点执行失败=================",e); log.error("==================SCA-应用包审查分析节点执行失败=================",e);
throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-应用包审查分析节点执行失败"); throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-应用包审查分析节点执行失败");
} }
@ -105,7 +107,7 @@ public class ApplicationWorker extends HttpWorker{
localVariables.put(EngineRuntimeConstant.LOG_KEY, pipTaskLog); localVariables.put(EngineRuntimeConstant.LOG_KEY, pipTaskLog);
} }
private void handleUpload(Map<String,Object> applicationConfigInfo, File file, PipTaskLog pipTaskLog) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException { private void handleUpload(Map<String,Object> applicationConfigInfo, File file, BaseRunContext context) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
RestTemplate restTemplate = getRestTemplateWithoutSANCheck(); RestTemplate restTemplate = getRestTemplateWithoutSANCheck();
String scaUploadUrl = ConstantContextHolder.getScaIp() + "/openapi/v1/app-package/detect-file"; String scaUploadUrl = ConstantContextHolder.getScaIp() + "/openapi/v1/app-package/detect-file";
MultiValueMap<String, Object> body = buildRequestBody(applicationConfigInfo, file); MultiValueMap<String, Object> body = buildRequestBody(applicationConfigInfo, file);
@ -119,7 +121,7 @@ public class ApplicationWorker extends HttpWorker{
String message = response.getString("message"); String message = response.getString("message");
if (message.equals("success")) { if (message.equals("success")) {
pipTaskLog.append("===============SCA上传成功================="); append(context,"===============SCA上传成功=================");
JSONObject data = response.getJSONObject("data"); JSONObject data = response.getJSONObject("data");
Integer scaTaskId = data.getInteger("scaTaskId"); Integer scaTaskId = data.getInteger("scaTaskId");
@ -133,7 +135,7 @@ public class ApplicationWorker extends HttpWorker{
} }
pollTaskStatus(restTemplate, oldScaTaskId); pollTaskStatus(restTemplate, oldScaTaskId);
} else { } else {
pipTaskLog.append("==================SCA接口异常调用失败================="); append(context,"==================SCA接口异常调用失败=================");
throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-应用包审查分析节点执行失败"); throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-应用包审查分析节点执行失败");
} }
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();

View File

@ -4,6 +4,7 @@ package cd.casic.ci.process.engine.worker;
import cd.casic.ci.process.constant.CommandConstant; import cd.casic.ci.process.constant.CommandConstant;
import cd.casic.ci.process.engine.constant.EngineRuntimeConstant; import cd.casic.ci.process.engine.constant.EngineRuntimeConstant;
import cd.casic.ci.process.engine.enums.ContextStateEnum; import cd.casic.ci.process.engine.enums.ContextStateEnum;
import cd.casic.ci.process.engine.manager.LoggerManager;
import cd.casic.ci.process.engine.manager.RunContextManager; import cd.casic.ci.process.engine.manager.RunContextManager;
import cd.casic.ci.process.engine.runContext.BaseRunContext; import cd.casic.ci.process.engine.runContext.BaseRunContext;
@ -25,6 +26,7 @@ import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import javax.swing.text.StringContent;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
@ -37,7 +39,10 @@ public abstract class BaseWorker implements Runnable{
private RunContextManager contextManager; private RunContextManager contextManager;
private String contextKey; private String contextKey;
@Resource @Resource
public MachineInfoService machineInfoService; private MachineInfoService machineInfoService;
@Resource
private LoggerManager loggerManager;
@Override @Override
public void run() { public void run() {
@ -69,6 +74,11 @@ public abstract class BaseWorker implements Runnable{
taskRunContext.changeContextState(ContextStateEnum.HAPPY_ENDING); taskRunContext.changeContextState(ContextStateEnum.HAPPY_ENDING);
} }
} }
public void append(BaseRunContext context, String content){
if (context instanceof TaskRunContext taskRunContext) {
loggerManager.append(taskRunContext.getContextDef().getId(),content);
}
}
public abstract void execute(TaskRunContext context); public abstract void execute(TaskRunContext context);
public void toBadEnding(){ public void toBadEnding(){
throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),""); throw new ServiceException(GlobalErrorCodeConstants.PIPELINE_ERROR.getCode(),"");

View File

@ -50,15 +50,15 @@ public class CodingWorker extends HttpWorker{
PipTaskLog pipTaskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY); PipTaskLog pipTaskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY);
PipBaseElement contextDef = context.getContextDef(); PipBaseElement contextDef = context.getContextDef();
pipTaskLog.append("SCA-代码仓库管理节点开始执行"); append(context,"SCA-代码仓库管理节点开始执行");
pipTaskLog.append("SCA-代码仓库管理节点配置:" + contextDef); append(context,"SCA-代码仓库管理节点配置:" + contextDef);
if (ObjectUtil.isEmpty(contextDef)) { if (ObjectUtil.isEmpty(contextDef)) {
log.error("未查询到节点[coding]配置"); log.error("未查询到节点[coding]配置");
localVariables.put("statusCode", "-1"); localVariables.put("statusCode", "-1");
} }
if (ObjectUtil.isEmpty(contextDef)) { if (ObjectUtil.isEmpty(contextDef)) {
log.error("未查询到节点[coding]配置"); log.error("未查询到节点[coding]配置");
pipTaskLog.append("未查询到节点[coding]配置"); append(context,"未查询到节点[coding]配置");
localVariables.put("statusCode", "-1"); localVariables.put("statusCode", "-1");
} }
@ -100,7 +100,7 @@ public class CodingWorker extends HttpWorker{
JSONObject response = restTemplate.postForObject(scaUploadUrl, requestEntity, JSONObject.class); JSONObject response = restTemplate.postForObject(scaUploadUrl, requestEntity, JSONObject.class);
if (response.getString("message").equals("success")) { if (response.getString("message").equals("success")) {
pipTaskLog.append("===============SCA上传成功================="); append(context,"===============SCA上传成功=================");
JSONObject data = response.getJSONObject("data"); JSONObject data = response.getJSONObject("data");
Integer scaTaskId = data.getInteger("scaTaskId"); Integer scaTaskId = data.getInteger("scaTaskId");
//轮询请求状态 //轮询请求状态
@ -116,14 +116,14 @@ public class CodingWorker extends HttpWorker{
pollTaskStatus(restTemplate, oldScaTaskId); pollTaskStatus(restTemplate, oldScaTaskId);
} else { } else {
//基于第三方文档表明500状态基本属于无镜像配置对比失败于是打印日志为无法对比镜像 //基于第三方文档表明500状态基本属于无镜像配置对比失败于是打印日志为无法对比镜像
pipTaskLog.append("==================SCA校验失败================="); append(context,"==================SCA校验失败=================");
pipTaskLog.append("接口异常调用失败"); append(context,"接口异常调用失败");
localVariables.put("statusCode", "-1"); localVariables.put("statusCode", "-1");
} }
}catch (Exception e){ }catch (Exception e){
pipTaskLog.append("==================SCA-CODING节点执行失败================="); append(context,"==================SCA-CODING节点执行失败=================");
pipTaskLog.append("SCA-CODING节点执行失败失败请检查当前节点配置!"); append(context,"SCA-CODING节点执行失败失败请检查当前节点配置!");
pipTaskLog.append(e.getMessage()); append(context,e.getMessage());
log.error("==================SCA-CODING节点执行失败=================",e); log.error("==================SCA-CODING节点执行失败=================",e);
throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败"); throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败");
} }

View File

@ -60,14 +60,14 @@ public class DIYImageExecuteCommandWorker extends SshWorker {
//获取机器 //获取机器
MachineInfo machineInfoDO = this.getMachineInfoService().getById(machineId); MachineInfo machineInfoDO = this.getMachineInfoService().getById(machineId);
statusCode = shell(machineInfoDO,"Hnidc@0626cn!@#zyx",taskLog, statusCode = shell(machineInfoDO,"Hnidc@0626cn!@#zyx",context,
"echo \"自定义镜像执行命令\"", "echo \"自定义镜像执行命令\"",
commandScript commandScript
); );
} catch (Exception e) { } catch (Exception e) {
String errorMessage = "该节点配置信息为空,请先配置该节点信息" + "\r\n"; String errorMessage = "该节点配置信息为空,请先配置该节点信息" + "\r\n";
log.error("执行ssh失败:",e); log.error("执行ssh失败:",e);
taskLog.append(errorMessage); append(context,errorMessage);
toBadEnding(); toBadEnding();
} }
if (statusCode == 0) { if (statusCode == 0) {

View File

@ -3,6 +3,7 @@ package cd.casic.ci.process.engine.worker;
import cd.casic.ci.common.pipeline.annotation.Plugin; import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.constant.EngineRuntimeConstant; import cd.casic.ci.process.engine.constant.EngineRuntimeConstant;
import cd.casic.ci.process.engine.context.ConstantContextHolder; import cd.casic.ci.process.engine.context.ConstantContextHolder;
import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.engine.runContext.TaskRunContext; import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog; import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
@ -55,18 +56,18 @@ public class ScaBinaryWorker extends HttpWorker{
public void execute(TaskRunContext context) { public void execute(TaskRunContext context) {
int statusCode = 0; int statusCode = 0;
Map<String, Object> localVariables = context.getLocalVariables(); Map<String, Object> localVariables = context.getLocalVariables();
PipTaskLog pipTaskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY); // PipTaskLog pipTaskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY);
PipBaseElement contextDef = context.getContextDef(); PipBaseElement contextDef = context.getContextDef();
pipTaskLog.append("SCA-BINARY节点开始执行"); append(context,"SCA-BINARY节点开始执行");
pipTaskLog.append("SCA-BINARY节点配置" + contextDef); append(context,"SCA-BINARY节点配置" + contextDef);
if (ObjectUtil.isEmpty(contextDef)) { if (ObjectUtil.isEmpty(contextDef)) {
log.error("未查询到节点[ScaBinary]配置"); log.error("未查询到节点[ScaBinary]配置");
localVariables.put("statusCode", "-1"); localVariables.put("statusCode", "-1");
} }
if (ObjectUtil.isEmpty(contextDef)) { if (ObjectUtil.isEmpty(contextDef)) {
log.error("未查询到节点[ScaBinary]配置"); log.error("未查询到节点[ScaBinary]配置");
pipTaskLog.append("未查询到节点[ScaBinary]配置"); append(context,"未查询到节点[ScaBinary]配置");
localVariables.put("statusCode", "-1"); localVariables.put("statusCode", "-1");
} }
@ -90,24 +91,24 @@ public class ScaBinaryWorker extends HttpWorker{
if (!file.exists() || !file.canRead()) { if (!file.exists() || !file.canRead()) {
log.error("目标文件不存在或不可读"); log.error("目标文件不存在或不可读");
localVariables.put("statusCode", "-1"); localVariables.put("statusCode", "-1");
pipTaskLog.append("目标文件不存在或不可读"); append(context,"目标文件不存在或不可读");
} }
handleUpload(pipTask.getTaskProperties(), file,pipTaskLog); handleUpload(pipTask.getTaskProperties(), file,context);
}catch (Exception e){ }catch (Exception e){
pipTaskLog.append("==================SCA-BINARY节点执行失败================="); append(context,"==================SCA-BINARY节点执行失败=================");
pipTaskLog.append("SCA-BINARY节点执行失败失败请检查当前节点配置!"); append(context,"SCA-BINARY节点执行失败失败请检查当前节点配置!");
pipTaskLog.append(e.getMessage()); append(context,e.getMessage());
log.error("==================SCA-BINARY节点执行失败=================",e); log.error("==================SCA-BINARY节点执行失败=================",e);
throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败"); throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败");
} }
} }
localVariables.put("statusCode", statusCode + ""); localVariables.put("statusCode", statusCode + "");
localVariables.put(EngineRuntimeConstant.LOG_KEY, pipTaskLog); // localVariables.put(EngineRuntimeConstant.LOG_KEY, pipTaskLog);
} }
private void handleUpload(Map<String,Object> scaBinaryConfigInfo, File file, PipTaskLog pipTaskLog) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException { private void handleUpload(Map<String,Object> scaBinaryConfigInfo, File file, BaseRunContext context) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
RestTemplate restTemplate = getRestTemplateWithoutSANCheck(); RestTemplate restTemplate = getRestTemplateWithoutSANCheck();
String scaUploadUrl = ConstantContextHolder.getScaIp() + "/openapi/v1/binary/detect-file"; String scaUploadUrl = ConstantContextHolder.getScaIp() + "/openapi/v1/binary/detect-file";
MultiValueMap<String, Object> body = buildRequestBody(scaBinaryConfigInfo, file); MultiValueMap<String, Object> body = buildRequestBody(scaBinaryConfigInfo, file);
@ -121,7 +122,7 @@ public class ScaBinaryWorker extends HttpWorker{
String message = response.getString("message"); String message = response.getString("message");
if (message.equals("success")) { if (message.equals("success")) {
pipTaskLog.append("===============SCA上传成功================="); append(context,"===============SCA上传成功=================");
JSONObject data = response.getJSONObject("data"); JSONObject data = response.getJSONObject("data");
Integer scaTaskId = data.getInteger("scaTaskId"); Integer scaTaskId = data.getInteger("scaTaskId");
@ -135,7 +136,7 @@ public class ScaBinaryWorker extends HttpWorker{
} }
pollTaskStatus(restTemplate, oldScaTaskId); pollTaskStatus(restTemplate, oldScaTaskId);
} else { } else {
pipTaskLog.append("==================SCA接口异常调用失败================="); append(context,"==================SCA接口异常调用失败=================");
throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败"); throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败");
} }
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();

View File

@ -3,6 +3,7 @@ package cd.casic.ci.process.engine.worker;
import cd.casic.ci.common.pipeline.annotation.Plugin; import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.constant.EngineRuntimeConstant; import cd.casic.ci.process.engine.constant.EngineRuntimeConstant;
import cd.casic.ci.process.engine.context.ConstantContextHolder; import cd.casic.ci.process.engine.context.ConstantContextHolder;
import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.engine.runContext.TaskRunContext; import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog; import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
@ -56,18 +57,18 @@ public class ScaMirrorWorker extends HttpWorker{
public void execute(TaskRunContext context) { public void execute(TaskRunContext context) {
int statusCode = 0; int statusCode = 0;
Map<String, Object> localVariables = context.getLocalVariables(); Map<String, Object> localVariables = context.getLocalVariables();
PipTaskLog pipTaskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY); // PipTaskLog pipTaskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY);
PipBaseElement contextDef = context.getContextDef(); PipBaseElement contextDef = context.getContextDef();
pipTaskLog.append("SCA-MIRROR节点开始执行"); append(context,"SCA-MIRROR节点开始执行");
pipTaskLog.append("SCA-MIRROR节点配置" + contextDef); append(context,"SCA-MIRROR节点配置" + contextDef);
if (ObjectUtil.isEmpty(contextDef)) { if (ObjectUtil.isEmpty(contextDef)) {
log.error("未查询到节点[ScaMirror]配置"); log.error("未查询到节点[ScaMirror]配置");
localVariables.put("statusCode", "-1"); localVariables.put("statusCode", "-1");
} }
if (ObjectUtil.isEmpty(contextDef)) { if (ObjectUtil.isEmpty(contextDef)) {
log.error("未查询到节点[ScaMirror]配置"); log.error("未查询到节点[ScaMirror]配置");
pipTaskLog.append("未查询到节点[ScaMirror]配置"); append(context,"未查询到节点[ScaMirror]配置");
localVariables.put("statusCode", "-1"); localVariables.put("statusCode", "-1");
} }
@ -91,24 +92,24 @@ public class ScaMirrorWorker extends HttpWorker{
if (!file.exists() || !file.canRead()) { if (!file.exists() || !file.canRead()) {
log.error("目标文件不存在或不可读"); log.error("目标文件不存在或不可读");
localVariables.put("statusCode", "-1"); localVariables.put("statusCode", "-1");
pipTaskLog.append("目标文件不存在或不可读"); append(context,"目标文件不存在或不可读");
} }
handleUpload(pipTask.getTaskProperties(), file,pipTaskLog); handleUpload(pipTask.getTaskProperties(), file,context);
}catch (Exception e){ }catch (Exception e){
pipTaskLog.append("==================SCA-MIRROR节点执行失败================="); append(context,"==================SCA-MIRROR节点执行失败=================");
pipTaskLog.append("SCA-MIRROR节点执行失败失败请检查当前节点配置!"); append(context,"SCA-MIRROR节点执行失败失败请检查当前节点配置!");
pipTaskLog.append(e.getMessage()); append(context,e.getMessage());
log.error("==================SCA-MIRROR节点执行失败=================",e); log.error("==================SCA-MIRROR节点执行失败=================",e);
throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败"); throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败");
} }
} }
localVariables.put("statusCode", statusCode + ""); localVariables.put("statusCode", statusCode + "");
localVariables.put(EngineRuntimeConstant.LOG_KEY, pipTaskLog); // localVariables.put(EngineRuntimeConstant.LOG_KEY, pipTaskLog);
} }
private void handleUpload(Map<String,Object> scaMirrorConfigInfo, File file, PipTaskLog pipTaskLog) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException { private void handleUpload(Map<String,Object> scaMirrorConfigInfo, File file, BaseRunContext context) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
RestTemplate restTemplate = getRestTemplateWithoutSANCheck(); RestTemplate restTemplate = getRestTemplateWithoutSANCheck();
String scaUploadUrl = ConstantContextHolder.getScaIp() + "/openapi/v1/image/detect-file"; String scaUploadUrl = ConstantContextHolder.getScaIp() + "/openapi/v1/image/detect-file";
MultiValueMap<String, Object> body = buildRequestBody(scaMirrorConfigInfo, file); MultiValueMap<String, Object> body = buildRequestBody(scaMirrorConfigInfo, file);
@ -122,7 +123,7 @@ public class ScaMirrorWorker extends HttpWorker{
String message = response.getString("message"); String message = response.getString("message");
if (message.equals("success")) { if (message.equals("success")) {
pipTaskLog.append("===============SCA上传成功================="); append(context,"===============SCA上传成功=================");
JSONObject data = response.getJSONObject("data"); JSONObject data = response.getJSONObject("data");
Integer scaTaskId = data.getInteger("scaTaskId"); Integer scaTaskId = data.getInteger("scaTaskId");
@ -136,7 +137,7 @@ public class ScaMirrorWorker extends HttpWorker{
} }
pollTaskStatus(restTemplate, oldScaTaskId); pollTaskStatus(restTemplate, oldScaTaskId);
} else { } else {
pipTaskLog.append("==================SCA接口异常调用失败================="); append(context,"==================SCA接口异常调用失败=================");
throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败"); throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败");
} }
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();

View File

@ -3,6 +3,7 @@ package cd.casic.ci.process.engine.worker;
import cd.casic.ci.common.pipeline.annotation.Plugin; import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.constant.EngineRuntimeConstant; import cd.casic.ci.process.engine.constant.EngineRuntimeConstant;
import cd.casic.ci.process.engine.context.ConstantContextHolder; import cd.casic.ci.process.engine.context.ConstantContextHolder;
import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.engine.runContext.TaskRunContext; import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog; import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
@ -58,15 +59,15 @@ public class ScaSbomWorker extends HttpWorker{
PipTaskLog pipTaskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY); PipTaskLog pipTaskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY);
PipBaseElement contextDef = context.getContextDef(); PipBaseElement contextDef = context.getContextDef();
pipTaskLog.append("SCA-SBOM节点开始执行"); append(context,"SCA-SBOM节点开始执行");
pipTaskLog.append("SCA-SBOM节点配置" + contextDef); append(context,"SCA-SBOM节点配置" + contextDef);
if (ObjectUtil.isEmpty(contextDef)) { if (ObjectUtil.isEmpty(contextDef)) {
log.error("未查询到节点[ScaSbom]配置"); log.error("未查询到节点[ScaSbom]配置");
localVariables.put("statusCode", "-1"); localVariables.put("statusCode", "-1");
} }
if (ObjectUtil.isEmpty(contextDef)) { if (ObjectUtil.isEmpty(contextDef)) {
log.error("未查询到节点[ScaSbom]配置"); log.error("未查询到节点[ScaSbom]配置");
pipTaskLog.append("未查询到节点[ScaSbom]配置"); append(context,"未查询到节点[ScaSbom]配置");
localVariables.put("statusCode", "-1"); localVariables.put("statusCode", "-1");
} }
@ -90,14 +91,14 @@ public class ScaSbomWorker extends HttpWorker{
if (!file.exists() || !file.canRead()) { if (!file.exists() || !file.canRead()) {
log.error("目标文件不存在或不可读"); log.error("目标文件不存在或不可读");
localVariables.put("statusCode", "-1"); localVariables.put("statusCode", "-1");
pipTaskLog.append("目标文件不存在或不可读"); append(context,"目标文件不存在或不可读");
} }
handleUpload(pipTask.getTaskProperties(), file,pipTaskLog); handleUpload(pipTask.getTaskProperties(), file,context);
}catch (Exception e){ }catch (Exception e){
pipTaskLog.append("==================SCA-SBOM节点执行失败================="); append(context,"==================SCA-SBOM节点执行失败=================");
pipTaskLog.append("SCA-SBOM节点执行失败失败请检查当前节点配置!"); append(context,"SCA-SBOM节点执行失败失败请检查当前节点配置!");
pipTaskLog.append(e.getMessage()); append(context,e.getMessage());
log.error("==================SCA-SBOM节点执行失败=================",e); log.error("==================SCA-SBOM节点执行失败=================",e);
throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败"); throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败");
} }
@ -107,7 +108,7 @@ public class ScaSbomWorker extends HttpWorker{
localVariables.put(EngineRuntimeConstant.LOG_KEY, pipTaskLog); localVariables.put(EngineRuntimeConstant.LOG_KEY, pipTaskLog);
} }
private void handleUpload(Map<String,Object> scaSbomConfigInfo, File file, PipTaskLog pipTaskLog) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException { private void handleUpload(Map<String,Object> scaSbomConfigInfo, File file, BaseRunContext context) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
RestTemplate restTemplate = getRestTemplateWithoutSANCheck(); RestTemplate restTemplate = getRestTemplateWithoutSANCheck();
String scaUploadUrl = ConstantContextHolder.getScaIp() + "/openapi/v1/sbom/detect-file"; String scaUploadUrl = ConstantContextHolder.getScaIp() + "/openapi/v1/sbom/detect-file";
MultiValueMap<String, Object> body = buildRequestBody(scaSbomConfigInfo, file); MultiValueMap<String, Object> body = buildRequestBody(scaSbomConfigInfo, file);
@ -121,7 +122,7 @@ public class ScaSbomWorker extends HttpWorker{
String message = response.getString("message"); String message = response.getString("message");
if (message.equals("success")) { if (message.equals("success")) {
pipTaskLog.append("===============SCA上传成功================="); append(context,"===============SCA上传成功=================");
JSONObject data = response.getJSONObject("data"); JSONObject data = response.getJSONObject("data");
Integer scaTaskId = data.getInteger("scaTaskId"); Integer scaTaskId = data.getInteger("scaTaskId");
@ -135,7 +136,7 @@ public class ScaSbomWorker extends HttpWorker{
} }
pollTaskStatus(restTemplate, oldScaTaskId); pollTaskStatus(restTemplate, oldScaTaskId);
} else { } else {
pipTaskLog.append("==================SCA接口异常调用失败================="); append(context,"==================SCA接口异常调用失败=================");
throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败"); throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败");
} }
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();

View File

@ -1,6 +1,7 @@
package cd.casic.ci.process.engine.worker; package cd.casic.ci.process.engine.worker;
import cd.casic.ci.process.constant.CommandConstant; import cd.casic.ci.process.constant.CommandConstant;
import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.enums.MachineSystemEnum; import cd.casic.ci.process.enums.MachineSystemEnum;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog; import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
import cd.casic.ci.process.process.dataObject.machine.MachineInfo; import cd.casic.ci.process.process.dataObject.machine.MachineInfo;
@ -22,7 +23,7 @@ public abstract class SshWorker extends BaseWorker{
* @param commands 命令 * @param commands 命令
* @return 0 成功其他值 失败 * @return 0 成功其他值 失败
*/ */
public int shell(MachineInfo machineInfo,String sudoPassword, PipTaskLog taskLog, String... commands) { public int shell(MachineInfo machineInfo, String sudoPassword, BaseRunContext context, String... commands) {
List<String> commandList = Arrays.asList(commands); List<String> commandList = Arrays.asList(commands);
if(MachineSystemEnum.WINDOWS.getSystem().equals(machineInfo.getOsSystem())){ if(MachineSystemEnum.WINDOWS.getSystem().equals(machineInfo.getOsSystem())){
return powerShell(machineInfo, commandList); return powerShell(machineInfo, commandList);
@ -38,14 +39,14 @@ public abstract class SshWorker extends BaseWorker{
//执行命令并且把命令的执行回传到前端 //执行命令并且把命令的执行回传到前端
// TODO 记录日志 // TODO 记录日志
// loggerService.sendMessage(key, var); // loggerService.sendMessage(key, var);
statusCode = ssh.execNew(commandList,sudoPassword, taskLog::append); statusCode = ssh.execNew(commandList,sudoPassword, (content)->append(context,content));
log.info("exit-status: " + statusCode); log.info("exit-status: " + statusCode);
//主动释放当前socket连接 //主动释放当前socket连接
// loggerService.close(key); // loggerService.close(key);
} catch (Exception e) { } catch (Exception e) {
String errorMessage = "与机器建立SSH连接出错" + CommandConstant.ENTER; String errorMessage = "与机器建立SSH连接出错" + CommandConstant.ENTER;
// errorHandle(e, errorMessage); // errorHandle(e, errorMessage);
taskLog.append(errorMessage); append(context,errorMessage);
} finally { } finally {
if(ssh!=null) { if(ssh!=null) {
ssh.disconnect(); ssh.disconnect();

View File

@ -32,12 +32,12 @@ public class PipTaskTemplate extends BaseDO {
/** /**
* name * name
*/ */
private String name; private String taskType;
/** /**
* 标签 * 标签
*/ */
private String label; private String taskName;
/** /**
* 描述 * 描述

View File

@ -532,9 +532,13 @@ public class PipelineServiceImpl extends ServiceImpl<PipelineDao, PipPipeline> i
TreeRunContextResp pipeline = new TreeRunContextResp(); TreeRunContextResp pipeline = new TreeRunContextResp();
pipeline.setId(pipelineRunContext.getContextDef().getId()); pipeline.setId(pipelineRunContext.getContextDef().getId());
pipeline.setState(pipelineRunContext.getState().get()); pipeline.setState(pipelineRunContext.getState().get());
pipeline.setStartTime(pipelineRunContext.getStartTime());
pipeline.setEndTime(pipelineRunContext.getEndTime());
Map<String,TreeRunContextResp> secondStageStateMap = new HashMap<>(pipelineRunContext.getChildContext().size()); Map<String,TreeRunContextResp> secondStageStateMap = new HashMap<>(pipelineRunContext.getChildContext().size());
Map<String, BaseRunContext> childContext = pipelineRunContext.getChildContext(); Map<String, BaseRunContext> childContext = pipelineRunContext.getChildContext();
pipeline.setChild(secondStageStateMap); pipeline.setChild(secondStageStateMap);
LinkedList<TreeRunContextResp> taskList = new LinkedList<>();
pipeline.setTaskList(taskList);
for (Map.Entry<String, BaseRunContext> secondEntry : childContext.entrySet()) { for (Map.Entry<String, BaseRunContext> secondEntry : childContext.entrySet()) {
BaseRunContext value = secondEntry.getValue(); BaseRunContext value = secondEntry.getValue();
String key = secondEntry.getKey(); String key = secondEntry.getKey();
@ -551,7 +555,10 @@ public class PipelineServiceImpl extends ServiceImpl<PipelineDao, PipPipeline> i
TreeRunContextResp taskState = new TreeRunContextResp(); TreeRunContextResp taskState = new TreeRunContextResp();
taskState.setId(taskId); taskState.setId(taskId);
taskState.setState(taskContext.getState().get()); taskState.setState(taskContext.getState().get());
taskState.setStartTime(taskContext.getStartTime());
taskState.setEndTime(taskContext.getEndTime());
taskStateMap.put(taskId,taskState); taskStateMap.put(taskId,taskState);
taskList.add(taskState);
} }
} }
return pipeline; return pipeline;

View File

@ -5,10 +5,17 @@ import cd.casic.ci.common.pipeline.resp.task.TasksResp;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog; import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
import cd.casic.ci.process.process.service.task.TaskService; import cd.casic.ci.process.process.service.task.TaskService;
import cd.casic.framework.commons.pojo.CommonResult; import cd.casic.framework.commons.pojo.CommonResult;
import cd.casic.framework.commons.util.network.IpUtil;
import cd.casic.framework.commons.util.redis.core.utils.IPUtils;
import cd.casic.framework.commons.util.util.WebFrameworkUtils;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.NotNull;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
@RestController @RestController
@RequestMapping("/task") @RequestMapping("/task")
@ -43,4 +50,5 @@ public class TasksController {
public CommonResult<PipTaskLog> getLogContentByTaskId(@PathVariable String taskId){ public CommonResult<PipTaskLog> getLogContentByTaskId(@PathVariable String taskId){
return taskService.getLogContentByTaskId(taskId); return taskService.getLogContentByTaskId(taskId);
} }
} }

View File

@ -121,6 +121,7 @@ ops:
security: security:
permit-all_urls: permit-all_urls:
- /admin-api/mp/open/** # 微信公众号开放平台,微信回调接口,不需要登录 - /admin-api/mp/open/** # 微信公众号开放平台,微信回调接口,不需要登录
- /sse/**
websocket: websocket:
enable: true # websocket的开关 enable: true # websocket的开关
path: /infra/ws # 路径 path: /infra/ws # 路径