diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ApplicationWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ApplicationWorker.java new file mode 100644 index 00000000..db52acb6 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ApplicationWorker.java @@ -0,0 +1,234 @@ +package cd.casic.ci.process.engine.worker; + +import cd.casic.ci.common.pipeline.annotation.Plugin; +import cd.casic.ci.process.engine.constant.EngineRuntimeConstant; +import cd.casic.ci.process.engine.context.ConstantContextHolder; +import cd.casic.ci.process.engine.runContext.TaskRunContext; +import cd.casic.ci.process.process.dataObject.base.PipBaseElement; +import cd.casic.ci.process.process.dataObject.log.PipTaskLog; +import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; +import cd.casic.ci.process.process.dataObject.target.TargetVersion; +import cd.casic.ci.process.process.dataObject.task.PipTask; +import cd.casic.ci.process.process.service.pipeline.PipelineService; +import cd.casic.ci.process.process.service.target.impl.TargetVersionServiceImpl; +import cd.casic.framework.commons.exception.ServiceException; +import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; +import cn.hutool.core.util.ObjectUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.core.io.FileSystemResource; +import org.springframework.http.*; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.web.client.RestTemplate; + +import java.io.File; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.util.*; + +/** + * @author HopeLi + * @version v1.0 + * @ClassName ApplicationWorker + * @Date: 2025/5/22 10:40 + * @Description: + */ +@Slf4j +@Plugin(taskType = "application") +public class ApplicationWorker extends HttpWorker{ + private static final int POLLING_INTERVAL = 5000; // 轮询间隔,单位:毫秒 + private static final int MAX_POLLING_TIMES = 100; // 最大退出次数 + + @Resource + private PipelineService pipelineService; + + @Resource + private TargetVersionServiceImpl targetVersionService; + + + @Override + public void execute(TaskRunContext context) { + int statusCode = 0; + Map localVariables = context.getLocalVariables(); + + PipBaseElement contextDef = context.getContextDef(); + PipTaskLog pipTaskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY); + pipTaskLog.setContent("SCA-SBOM节点开始执行"); + if (ObjectUtil.isEmpty(contextDef)) { + log.error("未查询到节点[application]配置"); + localVariables.put("statusCode", "-1"); + } + if (ObjectUtil.isEmpty(contextDef)) { + log.error("未查询到节点[application]配置"); + localVariables.put("statusCode", "-1"); + } + + String filePath = ""; + if (contextDef instanceof PipTask pipTask){ + // 查询并下载目标文件 + String pipelineId = pipTask.getPipelineId(); + //根据流水线id查询流水线信息 + PipPipeline pipeline = pipelineService.getById(pipelineId); + //根据目标id查询目标信息 + if (StringUtils.isEmpty(pipeline.getTargetVersionId())){ + throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"目标文件不存在"); + } + TargetVersion targetVersion = targetVersionService.getById(pipeline.getTargetVersionId()); + filePath = targetVersion.getFilePath(); + + try { + + File file = new File(filePath); + if (!file.exists() || !file.canRead()) { + log.error("目标文件不存在或不可读"); +// nodeLogger.appendErrorNow("目标文件不存在或不可读"); + localVariables.put("statusCode", "-1"); + } + + handleUpload(pipTask.getTaskProperties(), file); + }catch (Exception e){ + throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败"); + } + } + localVariables.put("statusCode", statusCode + ""); + } + + private void handleUpload(Map applicationConfigInfo, File file) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException { + RestTemplate restTemplate = getRestTemplateWithoutSANCheck(); + String scaUploadUrl = ConstantContextHolder.getScaIp() + "/openapi/v1/app-package/detect-file"; + MultiValueMap body = buildRequestBody(applicationConfigInfo, file); + HttpHeaders headers = createHeaders(); + headers.setContentType(MediaType.MULTIPART_FORM_DATA); + headers.add("OpenApiUserToken", ConstantContextHolder.getScaToken()); + HttpEntity> requestEntity = new HttpEntity<>(body, headers); + + log.info("SCA上传接口:" + scaUploadUrl); + JSONObject response = restTemplate.postForObject(scaUploadUrl, requestEntity, JSONObject.class); + String message = response.getString("message"); + + if (message.equals("success")) { + JSONObject data = response.getJSONObject("data"); + Integer scaTaskId = data.getInteger("scaTaskId"); + + pollTaskStatus(restTemplate, scaTaskId); + } else if (message.equals("应用已经存在")) { + Integer oldScaTaskId = (Integer) applicationConfigInfo.get("taskId"); + Integer oldApplicationId = (Integer) applicationConfigInfo.get("applicationId"); + int restartResult = reStartTask(restTemplate, oldApplicationId); + if (restartResult != 0) { + return; + } + pollTaskStatus(restTemplate, oldScaTaskId); + } else { + throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-应用包审查分析节点执行失败"); + } + Thread.currentThread().interrupt(); + } + + private MultiValueMap buildRequestBody(Map applicationConfigInfo, File file) { + MultiValueMap body = new LinkedMultiValueMap<>(); + body.add("file", new FileSystemResource(file)); // 文件参数(必填) + body.add("projectName", applicationConfigInfo.get("projectName")); // 项目名称(必填) + body.add("applicationName", applicationConfigInfo.get("applicationName")); // 应用名称(必填) + body.add("applicationVersion", applicationConfigInfo.get("applicationVersion")); // 应用版本(必填) + body.add("applicationDescription", applicationConfigInfo.get("applicationDescription")); // 应用描述(可选) + return body; + } + + + /** + * 创建请求头 + * @return HttpHeaders + */ + private HttpHeaders createHeaders() { + HttpHeaders headers = new HttpHeaders(); + headers.add("OpenApiUserToken", ConstantContextHolder.getScaToken()); + return headers; + } + + + /** + * 轮询请求任务状态 + * @param restTemplate RestTemplate + * @param scaTaskId 任务ID + */ + public void pollTaskStatus(RestTemplate restTemplate, Integer scaTaskId) { + int currentPollingTimes = 0; + while (currentPollingTimes < MAX_POLLING_TIMES) { + try { + // 根据任务 id 获取任务状态 + HttpHeaders headers = new HttpHeaders(); + headers.add("OpenApiUserToken", ConstantContextHolder.getScaToken()); + HttpEntity requestEntity = new HttpEntity<>(null, headers); + String scaStatusUrl = ConstantContextHolder.getScaIp() + "/openapi/v1/task/" + scaTaskId; + ResponseEntity response = restTemplate.exchange(scaStatusUrl, HttpMethod.GET, requestEntity, JSONObject.class); + + if (Objects.requireNonNull(response.getBody()).getString("message").equals("success")) { + //"status": 5, //状态 0-未审计 1-未检测 2-排队中 3-检测中 4-检测暂停 5-检测完成 6-检测超时 7-手动停止 8-检测异常 9-已删除 10-拉取中 11-停止中 12-下载中 + int status = response.getBody().getJSONObject("data").getInteger("status"); + log.info("当前任务状态: " + status); + if (status == 5) { + System.out.println("任务已完成,停止轮询。"); + log.info("任务已完成,停止轮询。"); + break; + } + } else { + log.error("获取任务状态失败: " + response.getBody().getString("message")); + break; + } + } catch (Exception e) { + log.error("获取任务状态时发生错误: " + e.getMessage()); + } + try { + // 轮询间隔 5 秒 + Thread.sleep(POLLING_INTERVAL); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("轮询被中断: " + e.getMessage()); + } + currentPollingTimes++; + } + System.out.println("停止轮询"); + } + + + /** + * 重新检测接口 + * + * @param restTemplate + * @param applicationId + */ + public int reStartTask(RestTemplate restTemplate, Integer applicationId) { + try { + String url = ConstantContextHolder.getScaIp() + "/openapi/v1/task/batch/detect"; + HttpHeaders headers = new HttpHeaders(); + headers.add("OpenApiUserToken", ConstantContextHolder.getScaToken()); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.add("Accept", MediaType.APPLICATION_JSON.toString()); + Map> param = new HashMap<>(); + param.put("applicationIds", Arrays.asList(applicationId)); + String s = JSON.toJSONString(param); + HttpEntity formEntry = new HttpEntity<>(s, headers); + // + JSONObject res = restTemplate.postForObject(url, formEntry, JSONObject.class); + if (res.getString("message").equals("success")) { + log.info("重新检测成功"); + return 0; + } else { + log.error("重新检测失败"); + log.error(res.getString("message")); + return -1; + } + } catch (Exception e) { + log.error("重新检测失败"); + log.error(e.getMessage()); + } + return 0; + } + +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ScaSbomWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ScaSbomWorker.java index 19fd61ff..2886cebe 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ScaSbomWorker.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ScaSbomWorker.java @@ -1,10 +1,11 @@ package cd.casic.ci.process.engine.worker; import cd.casic.ci.common.pipeline.annotation.Plugin; +import cd.casic.ci.process.engine.constant.EngineRuntimeConstant; import cd.casic.ci.process.engine.context.ConstantContextHolder; -import cd.casic.ci.process.engine.runContext.BaseRunContext; import cd.casic.ci.process.engine.runContext.TaskRunContext; import cd.casic.ci.process.process.dataObject.base.PipBaseElement; +import cd.casic.ci.process.process.dataObject.log.PipTaskLog; import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; import cd.casic.ci.process.process.dataObject.target.TargetVersion; import cd.casic.ci.process.process.dataObject.task.PipTask; @@ -38,7 +39,7 @@ import java.util.*; * @Description: */ @Slf4j -@Plugin(taskType = "ScaSbom") +@Plugin(taskType = "scaSbom") public class ScaSbomWorker extends HttpWorker{ private static final int POLLING_INTERVAL = 5000; // 轮询间隔,单位:毫秒 @@ -50,19 +51,23 @@ public class ScaSbomWorker extends HttpWorker{ @Resource private TargetVersionServiceImpl targetVersionService; - - public String work(BaseRunContext workerParam) { + @Override + public void execute(TaskRunContext context) { int statusCode = 0; + Map localVariables = context.getLocalVariables(); + PipTaskLog pipTaskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY); - PipBaseElement contextDef = workerParam.getContextDef(); - log.info("================SCA-SBOM节点执行==================="); + PipBaseElement contextDef = context.getContextDef(); + pipTaskLog.append("SCA-SBOM节点开始执行"); + pipTaskLog.append("SCA-SBOM节点配置:" + contextDef); if (ObjectUtil.isEmpty(contextDef)) { - log.error("未查询到节点[{}]配置,taskType = ScaSbom"); - return "-1"; + log.error("未查询到节点[ScaSbom]配置"); + localVariables.put("statusCode", "-1"); } if (ObjectUtil.isEmpty(contextDef)) { - log.error("未查询到节点[{}]配置,taskType = ScaSbom"); - return "-1"; + log.error("未查询到节点[ScaSbom]配置"); + pipTaskLog.append("未查询到节点[ScaSbom]配置"); + localVariables.put("statusCode", "-1"); } String filePath = ""; @@ -84,22 +89,24 @@ public class ScaSbomWorker extends HttpWorker{ File file = new File(filePath); if (!file.exists() || !file.canRead()) { log.error("目标文件不存在或不可读"); -// nodeLogger.appendErrorNow("目标文件不存在或不可读"); - return "-1"; + localVariables.put("statusCode", "-1"); + pipTaskLog.append("目标文件不存在或不可读"); } - handleUpload(workerParam, contextDef, pipTask.getTaskProperties(), file); + handleUpload(pipTask.getTaskProperties(), file,pipTaskLog); }catch (Exception e){ + pipTaskLog.append("==================SCA-SBOM节点执行失败================="); + pipTaskLog.append("SCA-SBOM节点执行失败失败,请检查当前节点配置!"); + pipTaskLog.append(e.getMessage()); throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败"); } } - - return statusCode + ""; + localVariables.put("statusCode", statusCode + ""); + localVariables.put(EngineRuntimeConstant.LOG_KEY, pipTaskLog); } - private void handleUpload(BaseRunContext workerParam, PipBaseElement pipelineNodeConfigInfo, - Map scaSbomConfigInfo, File file) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException { + private void handleUpload(Map scaSbomConfigInfo, File file, PipTaskLog pipTaskLog) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException { RestTemplate restTemplate = getRestTemplateWithoutSANCheck(); String scaUploadUrl = ConstantContextHolder.getScaIp() + "/openapi/v1/sbom/detect-file"; MultiValueMap body = buildRequestBody(scaSbomConfigInfo, file); @@ -113,6 +120,7 @@ public class ScaSbomWorker extends HttpWorker{ String message = response.getString("message"); if (message.equals("success")) { + pipTaskLog.append("===============SCA上传成功================="); JSONObject data = response.getJSONObject("data"); Integer scaTaskId = data.getInteger("scaTaskId"); @@ -126,6 +134,7 @@ public class ScaSbomWorker extends HttpWorker{ } pollTaskStatus(restTemplate, oldScaTaskId); } else { + pipTaskLog.append("==================SCA接口异常,调用失败================="); throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败"); } } @@ -229,8 +238,4 @@ public class ScaSbomWorker extends HttpWorker{ return 0; } - @Override - public void execute(TaskRunContext context) { - - } } diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TestWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TestWorker.java index 3e0b227a..df9f1ba2 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TestWorker.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/TestWorker.java @@ -5,7 +5,6 @@ import cd.casic.ci.process.engine.runContext.TaskRunContext; import cd.casic.ci.process.process.dataObject.base.PipBaseElement; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @Slf4j