From 6e9d521dc5f5f48e19b0b7bbaddc1b567002f533 Mon Sep 17 00:00:00 2001 From: HopeLi <1278288511@qq.com> Date: Fri, 23 May 2025 10:00:50 +0800 Subject: [PATCH] 0523 ljc --- .../engine/worker/ApplicationWorker.java | 21 +- .../process/engine/worker/CodingWorker.java | 227 ++++++++++++++++ .../engine/worker/ScaBinaryWorker.java | 243 +++++++++++++++++ .../engine/worker/ScaMirrorWorker.java | 251 ++++++++++++++++++ .../process/engine/worker/ScaSbomWorker.java | 4 +- 5 files changed, 738 insertions(+), 8 deletions(-) create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/CodingWorker.java create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ScaBinaryWorker.java create mode 100644 modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ScaMirrorWorker.java diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ApplicationWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ApplicationWorker.java index db52acb6..79b81ecf 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ApplicationWorker.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ApplicationWorker.java @@ -55,16 +55,18 @@ public class ApplicationWorker extends HttpWorker{ public void execute(TaskRunContext context) { int statusCode = 0; Map localVariables = context.getLocalVariables(); + PipTaskLog pipTaskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY); PipBaseElement contextDef = context.getContextDef(); - PipTaskLog pipTaskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY); - pipTaskLog.setContent("SCA-SBOM节点开始执行"); + pipTaskLog.append("SCA-应用包审查分析节点开始执行"); + pipTaskLog.append("SCA-应用包审查分析节点配置:" + contextDef); if (ObjectUtil.isEmpty(contextDef)) { log.error("未查询到节点[application]配置"); localVariables.put("statusCode", "-1"); } if (ObjectUtil.isEmpty(contextDef)) { log.error("未查询到节点[application]配置"); + pipTaskLog.append("未查询到节点[application]配置"); localVariables.put("statusCode", "-1"); } @@ -86,19 +88,24 @@ public class ApplicationWorker extends HttpWorker{ File file = new File(filePath); if (!file.exists() || !file.canRead()) { log.error("目标文件不存在或不可读"); -// nodeLogger.appendErrorNow("目标文件不存在或不可读"); localVariables.put("statusCode", "-1"); + pipTaskLog.append("目标文件不存在或不可读"); } - handleUpload(pipTask.getTaskProperties(), file); + handleUpload(pipTask.getTaskProperties(), file,pipTaskLog); }catch (Exception e){ - throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败"); + pipTaskLog.append("==================SCA-应用包审查分析节点执行失败================="); + pipTaskLog.append("SCA-应用包审查分析节点执行失败失败,请检查当前节点配置!"); + pipTaskLog.append(e.getMessage()); + log.error("==================SCA-应用包审查分析节点执行失败=================",e); + throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-应用包审查分析节点执行失败"); } } localVariables.put("statusCode", statusCode + ""); + localVariables.put(EngineRuntimeConstant.LOG_KEY, pipTaskLog); } - private void handleUpload(Map applicationConfigInfo, File file) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException { + private void handleUpload(Map applicationConfigInfo, File file, PipTaskLog pipTaskLog) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException { RestTemplate restTemplate = getRestTemplateWithoutSANCheck(); String scaUploadUrl = ConstantContextHolder.getScaIp() + "/openapi/v1/app-package/detect-file"; MultiValueMap body = buildRequestBody(applicationConfigInfo, file); @@ -112,6 +119,7 @@ public class ApplicationWorker extends HttpWorker{ String message = response.getString("message"); if (message.equals("success")) { + pipTaskLog.append("===============SCA上传成功================="); JSONObject data = response.getJSONObject("data"); Integer scaTaskId = data.getInteger("scaTaskId"); @@ -125,6 +133,7 @@ public class ApplicationWorker extends HttpWorker{ } pollTaskStatus(restTemplate, oldScaTaskId); } else { + pipTaskLog.append("==================SCA接口异常,调用失败================="); throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-应用包审查分析节点执行失败"); } Thread.currentThread().interrupt(); diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/CodingWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/CodingWorker.java new file mode 100644 index 00000000..0dcc13fd --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/CodingWorker.java @@ -0,0 +1,227 @@ +package cd.casic.ci.process.engine.worker; + +import cd.casic.ci.common.pipeline.annotation.Plugin; +import cd.casic.ci.process.engine.constant.EngineRuntimeConstant; +import cd.casic.ci.process.engine.context.ConstantContextHolder; +import cd.casic.ci.process.engine.runContext.TaskRunContext; +import cd.casic.ci.process.process.dataObject.base.PipBaseElement; +import cd.casic.ci.process.process.dataObject.log.PipTaskLog; +import cd.casic.ci.process.process.dataObject.task.PipTask; +import cd.casic.ci.process.process.service.pipeline.PipelineService; +import cd.casic.ci.process.process.service.target.impl.TargetVersionServiceImpl; +import cd.casic.framework.commons.exception.ServiceException; +import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; +import cn.hutool.core.util.ObjectUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.*; +import org.springframework.web.client.RestTemplate; + +import java.util.*; + +/** + * @author HopeLi + * @version v1.0 + * @ClassName ScaSbomWorker + * @Date: 2025/5/21 9:30 + * @Description: + */ +@Slf4j +@Plugin(taskType = "coding") +public class CodingWorker extends HttpWorker{ + + private static final int POLLING_INTERVAL = 5000; // 轮询间隔,单位:毫秒 + private static final int MAX_POLLING_TIMES = 100; // 最大退出次数 + + public static final Integer REPO_TYPE_GIT=8; + + @Resource + private PipelineService pipelineService; + + @Resource + private TargetVersionServiceImpl targetVersionService; + + @Override + public void execute(TaskRunContext context) { + int statusCode = 0; + Map localVariables = context.getLocalVariables(); + PipTaskLog pipTaskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY); + + PipBaseElement contextDef = context.getContextDef(); + pipTaskLog.append("SCA-代码仓库管理节点开始执行"); + pipTaskLog.append("SCA-代码仓库管理节点配置:" + contextDef); + if (ObjectUtil.isEmpty(contextDef)) { + log.error("未查询到节点[coding]配置"); + localVariables.put("statusCode", "-1"); + } + if (ObjectUtil.isEmpty(contextDef)) { + log.error("未查询到节点[coding]配置"); + pipTaskLog.append("未查询到节点[coding]配置"); + localVariables.put("statusCode", "-1"); + } + + String filePath = ""; + if (contextDef instanceof PipTask pipTask){ + Map codingConfigInfo = pipTask.getTaskProperties(); + String scaUploadUrl = ConstantContextHolder.getScaIp() + "/openapi/v1/vcs/detect"; + + try { + + RestTemplate restTemplate = getRestTemplateWithoutSANCheck(); + // 1. 构建表单数据(文件 + 文本参数) + HashMap body = new HashMap<>(); + body.put("projectName", codingConfigInfo.get("projectName")); // 项目名称(必填) + body.put("applicationName", codingConfigInfo.get("applicationName")); // 应用名称(必填) + body.put("applicationVersion", codingConfigInfo.get("applicationVersion")); // 应用版本(必填) + body.put("applicationDescription", codingConfigInfo.get("applicationDescription")); // 应用描述(可选) + Map vcsRepo = new HashMap<>(); + body.put("vcsRepo",vcsRepo); + Integer type = (Integer) codingConfigInfo.get("type"); + vcsRepo.put("type", type); + if (REPO_TYPE_GIT.equals(type)) { + HashMap gitVCSAssetRepo = new HashMap<>(); + vcsRepo.put("gitVCSAssetRepo",gitVCSAssetRepo); + gitVCSAssetRepo.put("url", (String) codingConfigInfo.get("url")); + gitVCSAssetRepo.put("username", (String) codingConfigInfo.get("username")); + gitVCSAssetRepo.put("password", (String) codingConfigInfo.get("password")); + gitVCSAssetRepo.put("branch", (String) codingConfigInfo.get("branch")); + } + + // 2. 设置请求头 + HttpHeaders headers = createHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.add("OpenApiUserToken", ConstantContextHolder.getScaToken()); + String bodyStr = JSON.toJSONString(body); + // 3. 封装请求实体 + HttpEntity requestEntity = new HttpEntity<>(bodyStr, headers); + // 4. 发送请求并获取响应 + log.info("SCA上传接口:" + scaUploadUrl); + JSONObject response = restTemplate.postForObject(scaUploadUrl, requestEntity, JSONObject.class); + + if (response.getString("message").equals("success")) { + pipTaskLog.append("===============SCA上传成功================="); + JSONObject data = response.getJSONObject("data"); + Integer scaTaskId = data.getInteger("scaTaskId"); + //轮询请求状态 + pollTaskStatus(restTemplate, scaTaskId); + } else if (response.getString("message").equals("应用已经存在")){ + Integer oldScaTaskId = (Integer) codingConfigInfo.get("taskId"); + Integer oldApplicationId = (Integer) codingConfigInfo.get("applicationId"); + int restartResult = reStartTask(restTemplate, oldApplicationId); + if (restartResult != 0) { + localVariables.put("statusCode", "-1"); + } + //轮询请求状态 + pollTaskStatus(restTemplate, oldScaTaskId); + } else { + //基于第三方文档表明500状态基本属于无镜像配置,对比失败,于是打印日志为无法对比镜像 + pipTaskLog.append("==================SCA校验失败================="); + pipTaskLog.append("接口异常调用失败"); + localVariables.put("statusCode", "-1"); + } + }catch (Exception e){ + pipTaskLog.append("==================SCA-CODING节点执行失败================="); + pipTaskLog.append("SCA-CODING节点执行失败失败,请检查当前节点配置!"); + pipTaskLog.append(e.getMessage()); + log.error("==================SCA-CODING节点执行失败=================",e); + throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败"); + } + } + + localVariables.put("statusCode", statusCode + ""); + localVariables.put(EngineRuntimeConstant.LOG_KEY, pipTaskLog); + } + + + /** + * 创建请求头 + * + * @return HttpHeaders + */ + private HttpHeaders createHeaders() { + HttpHeaders headers = new HttpHeaders(); + headers.add("OpenApiUserToken", ConstantContextHolder.getScaToken()); + return headers; + } + + /** + * 轮询请求任务状态 + * + * @param restTemplate + * @param scaTaskId + */ + public void pollTaskStatus(RestTemplate restTemplate, Integer scaTaskId) { + int currentPollingTimes = 0; + while (currentPollingTimes < MAX_POLLING_TIMES) { + try { + HttpHeaders headers = new HttpHeaders(); + headers.add("OpenApiUserToken", ConstantContextHolder.getScaToken()); + HttpEntity requestEntity = new HttpEntity<>(null, headers); + String scaStatusUrl = ConstantContextHolder.getScaIp() + "/openapi/v1/task/" + scaTaskId; + ResponseEntity response = restTemplate.exchange(scaStatusUrl, HttpMethod.GET, requestEntity, JSONObject.class); + + if (Objects.requireNonNull(response.getBody()).getString("message").equals("success")) { + //"status": 5, //状态 0-未审计 1-未检测 2-排队中 3-检测中 4-检测暂停 5-检测完成 6-检测超时 7-手动停止 8-检测异常 9-已删除 10-拉取中 11-停止中 12-下载中 + int status = response.getBody().getJSONObject("data").getInteger("status"); + log.info("当前任务状态: " + status); + if (status == 5) { + System.out.println("任务已完成,停止轮询。"); + log.info("任务已完成,停止轮询。"); + break; + } + } else { + log.error("获取任务状态失败: " + response.getBody().getString("message")); + break; + } + } catch (Exception e) { + log.error("获取任务状态时发生错误: " + e.getMessage()); + } + try { + // 轮询间隔 5 秒 + Thread.sleep(POLLING_INTERVAL); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("轮询被中断: " + e.getMessage()); + } + currentPollingTimes++; + } + System.out.println("停止轮询"); + } + + /** + * 重新检测接口 + * + * @param restTemplate + * @param applicationId + */ + public int reStartTask(RestTemplate restTemplate, Integer applicationId) { + try { + String url = ConstantContextHolder.getScaIp() + "/openapi/v1/task/batch/detect"; + HttpHeaders headers = new HttpHeaders(); + headers.add("OpenApiUserToken", ConstantContextHolder.getScaToken()); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.add("Accept", MediaType.APPLICATION_JSON.toString()); + Map> param = new HashMap<>(); + param.put("applicationIds", Arrays.asList(applicationId)); + String s = JSON.toJSONString(param); + HttpEntity formEntry = new HttpEntity<>(s, headers); + + JSONObject res = restTemplate.postForObject(url, formEntry, JSONObject.class); + if (res.getString("message").equals("success")) { + log.info("重新检测成功"); + return 0; + } else { + log.error("重新检测失败"); + log.error(res.getString("message")); + return -1; + } + } catch (Exception e) { + log.error("重新检测失败"); + log.error(e.getMessage()); + } + return 0; + } + +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ScaBinaryWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ScaBinaryWorker.java new file mode 100644 index 00000000..38154495 --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ScaBinaryWorker.java @@ -0,0 +1,243 @@ +package cd.casic.ci.process.engine.worker; + +import cd.casic.ci.common.pipeline.annotation.Plugin; +import cd.casic.ci.process.engine.constant.EngineRuntimeConstant; +import cd.casic.ci.process.engine.context.ConstantContextHolder; +import cd.casic.ci.process.engine.runContext.TaskRunContext; +import cd.casic.ci.process.process.dataObject.base.PipBaseElement; +import cd.casic.ci.process.process.dataObject.log.PipTaskLog; +import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; +import cd.casic.ci.process.process.dataObject.target.TargetVersion; +import cd.casic.ci.process.process.dataObject.task.PipTask; +import cd.casic.ci.process.process.service.pipeline.PipelineService; +import cd.casic.ci.process.process.service.target.impl.TargetVersionServiceImpl; +import cd.casic.framework.commons.exception.ServiceException; +import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; +import cn.hutool.core.util.ObjectUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.core.io.FileSystemResource; +import org.springframework.http.*; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.web.client.RestTemplate; + +import java.io.File; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.util.*; + +/** + * @author HopeLi + * @version v1.0 + * @ClassName ScaSbomWorker + * @Date: 2025/5/21 9:30 + * @Description: + */ +@Slf4j +@Plugin(taskType = "binary") +public class ScaBinaryWorker extends HttpWorker{ + + private static final int POLLING_INTERVAL = 5000; // 轮询间隔,单位:毫秒 + private static final int MAX_POLLING_TIMES = 100; // 最大退出次数 + + @Resource + private PipelineService pipelineService; + + @Resource + private TargetVersionServiceImpl targetVersionService; + + @Override + public void execute(TaskRunContext context) { + int statusCode = 0; + Map localVariables = context.getLocalVariables(); + PipTaskLog pipTaskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY); + + PipBaseElement contextDef = context.getContextDef(); + pipTaskLog.append("SCA-BINARY节点开始执行"); + pipTaskLog.append("SCA-BINARY节点配置:" + contextDef); + if (ObjectUtil.isEmpty(contextDef)) { + log.error("未查询到节点[ScaBinary]配置"); + localVariables.put("statusCode", "-1"); + } + if (ObjectUtil.isEmpty(contextDef)) { + log.error("未查询到节点[ScaBinary]配置"); + pipTaskLog.append("未查询到节点[ScaBinary]配置"); + localVariables.put("statusCode", "-1"); + } + + String filePath = ""; + if (contextDef instanceof PipTask pipTask){ + // 查询并下载目标文件 + String pipelineId = pipTask.getPipelineId(); + //根据流水线id查询流水线信息 + PipPipeline pipeline = pipelineService.getById(pipelineId); + //根据目标id查询目标信息 + if (StringUtils.isEmpty(pipeline.getTargetVersionId())){ + throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"目标文件不存在"); + } + TargetVersion targetVersion = targetVersionService.getById(pipeline.getTargetVersionId()); + filePath = targetVersion.getFilePath(); + + try { + + + File file = new File(filePath); + if (!file.exists() || !file.canRead()) { + log.error("目标文件不存在或不可读"); + localVariables.put("statusCode", "-1"); + pipTaskLog.append("目标文件不存在或不可读"); + } + + handleUpload(pipTask.getTaskProperties(), file,pipTaskLog); + }catch (Exception e){ + pipTaskLog.append("==================SCA-BINARY节点执行失败================="); + pipTaskLog.append("SCA-BINARY节点执行失败失败,请检查当前节点配置!"); + pipTaskLog.append(e.getMessage()); + log.error("==================SCA-BINARY节点执行失败=================",e); + throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败"); + } + } + + localVariables.put("statusCode", statusCode + ""); + localVariables.put(EngineRuntimeConstant.LOG_KEY, pipTaskLog); + } + + private void handleUpload(Map scaBinaryConfigInfo, File file, PipTaskLog pipTaskLog) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException { + RestTemplate restTemplate = getRestTemplateWithoutSANCheck(); + String scaUploadUrl = ConstantContextHolder.getScaIp() + "/openapi/v1/binary/detect-file"; + MultiValueMap body = buildRequestBody(scaBinaryConfigInfo, file); + HttpHeaders headers = createHeaders(); + headers.setContentType(MediaType.MULTIPART_FORM_DATA); + headers.add("OpenApiUserToken", ConstantContextHolder.getScaToken()); + HttpEntity> requestEntity = new HttpEntity<>(body, headers); + + log.info("SCA上传接口:" + scaUploadUrl); + JSONObject response = restTemplate.postForObject(scaUploadUrl, requestEntity, JSONObject.class); + String message = response.getString("message"); + + if (message.equals("success")) { + pipTaskLog.append("===============SCA上传成功================="); + JSONObject data = response.getJSONObject("data"); + Integer scaTaskId = data.getInteger("scaTaskId"); + + pollTaskStatus(restTemplate, scaTaskId); + } else if (message.equals("应用已经存在")) { + Integer oldScaTaskId = (Integer) scaBinaryConfigInfo.get("taskId"); + Integer oldApplicationId = (Integer) scaBinaryConfigInfo.get("applicationId"); + int restartResult = reStartTask(restTemplate, oldApplicationId); + if (restartResult != 0) { + return; + } + pollTaskStatus(restTemplate, oldScaTaskId); + } else { + pipTaskLog.append("==================SCA接口异常,调用失败================="); + throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败"); + } + Thread.currentThread().interrupt(); + } + + private MultiValueMap buildRequestBody(Map scaBinaryConfigInfo, File file) { + MultiValueMap body = new LinkedMultiValueMap<>(); + body.add("file", new FileSystemResource(file)); + body.add("projectName", scaBinaryConfigInfo.get("projectName")); + body.add("applicationName", scaBinaryConfigInfo.get("applicationName")); + body.add("applicationVersion", scaBinaryConfigInfo.get("applicationVersion")); + body.add("applicationDescription", scaBinaryConfigInfo.get("applicationDescription")); + return body; + } + + /** + * 创建请求头 + * + * @return HttpHeaders + */ + private HttpHeaders createHeaders() { + HttpHeaders headers = new HttpHeaders(); + headers.add("OpenApiUserToken", ConstantContextHolder.getScaToken()); + return headers; + } + + /** + * 轮询请求任务状态 + * + * @param restTemplate + * @param scaTaskId + */ + public void pollTaskStatus(RestTemplate restTemplate, Integer scaTaskId) { + int currentPollingTimes = 0; + while (currentPollingTimes < MAX_POLLING_TIMES) { + try { + HttpHeaders headers = new HttpHeaders(); + headers.add("OpenApiUserToken", ConstantContextHolder.getScaToken()); + HttpEntity requestEntity = new HttpEntity<>(null, headers); + String scaStatusUrl = ConstantContextHolder.getScaIp() + "/openapi/v1/task/" + scaTaskId; + ResponseEntity response = restTemplate.exchange(scaStatusUrl, HttpMethod.GET, requestEntity, JSONObject.class); + + if (Objects.requireNonNull(response.getBody()).getString("message").equals("success")) { + //"status": 5, //状态 0-未审计 1-未检测 2-排队中 3-检测中 4-检测暂停 5-检测完成 6-检测超时 7-手动停止 8-检测异常 9-已删除 10-拉取中 11-停止中 12-下载中 + int status = response.getBody().getJSONObject("data").getInteger("status"); + log.info("当前任务状态: " + status); + if (status == 5) { + System.out.println("任务已完成,停止轮询。"); + log.info("任务已完成,停止轮询。"); + break; + } + } else { + log.error("获取任务状态失败: " + response.getBody().getString("message")); + break; + } + } catch (Exception e) { + log.error("获取任务状态时发生错误: " + e.getMessage()); + } + try { + // 轮询间隔 5 秒 + Thread.sleep(POLLING_INTERVAL); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("轮询被中断: " + e.getMessage()); + } + currentPollingTimes++; + } + System.out.println("停止轮询"); + } + + /** + * 重新检测接口 + * + * @param restTemplate + * @param applicationId + */ + public int reStartTask(RestTemplate restTemplate, Integer applicationId) { + try { + String url = ConstantContextHolder.getScaIp() + "/openapi/v1/task/batch/detect"; + HttpHeaders headers = new HttpHeaders(); + headers.add("OpenApiUserToken", ConstantContextHolder.getScaToken()); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.add("Accept", MediaType.APPLICATION_JSON.toString()); + Map> param = new HashMap<>(); + param.put("applicationIds", Arrays.asList(applicationId)); + String s = JSON.toJSONString(param); + HttpEntity formEntry = new HttpEntity<>(s, headers); + + JSONObject res = restTemplate.postForObject(url, formEntry, JSONObject.class); + if (res.getString("message").equals("success")) { + log.info("重新检测成功"); + return 0; + } else { + log.error("重新检测失败"); + log.error(res.getString("message")); + return -1; + } + } catch (Exception e) { + log.error("重新检测失败"); + log.error(e.getMessage()); + } + return 0; + } + +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ScaMirrorWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ScaMirrorWorker.java new file mode 100644 index 00000000..1c2926ca --- /dev/null +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ScaMirrorWorker.java @@ -0,0 +1,251 @@ +package cd.casic.ci.process.engine.worker; + +import cd.casic.ci.common.pipeline.annotation.Plugin; +import cd.casic.ci.process.engine.constant.EngineRuntimeConstant; +import cd.casic.ci.process.engine.context.ConstantContextHolder; +import cd.casic.ci.process.engine.runContext.TaskRunContext; +import cd.casic.ci.process.process.dataObject.base.PipBaseElement; +import cd.casic.ci.process.process.dataObject.log.PipTaskLog; +import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline; +import cd.casic.ci.process.process.dataObject.target.TargetVersion; +import cd.casic.ci.process.process.dataObject.task.PipTask; +import cd.casic.ci.process.process.service.pipeline.PipelineService; +import cd.casic.ci.process.process.service.target.impl.TargetVersionServiceImpl; +import cd.casic.framework.commons.exception.ServiceException; +import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants; +import cn.hutool.core.util.ObjectUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.core.io.FileSystemResource; +import org.springframework.http.*; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.util.ObjectUtils; +import org.springframework.web.client.RestTemplate; + +import java.io.File; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.util.*; + +/** + * @author HopeLi + * @version v1.0 + * @ClassName ScaSbomWorker + * @Date: 2025/5/21 9:30 + * @Description: + */ +@Slf4j +@Plugin(taskType = "mirror") +public class ScaMirrorWorker extends HttpWorker{ + + private static final int POLLING_INTERVAL = 5000; // 轮询间隔,单位:毫秒 + private static final int MAX_POLLING_TIMES = 100; // 最大退出次数 + + @Resource + private PipelineService pipelineService; + + @Resource + private TargetVersionServiceImpl targetVersionService; + + @Override + public void execute(TaskRunContext context) { + int statusCode = 0; + Map localVariables = context.getLocalVariables(); + PipTaskLog pipTaskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY); + + PipBaseElement contextDef = context.getContextDef(); + pipTaskLog.append("SCA-MIRROR节点开始执行"); + pipTaskLog.append("SCA-MIRROR节点配置:" + contextDef); + if (ObjectUtil.isEmpty(contextDef)) { + log.error("未查询到节点[ScaMirror]配置"); + localVariables.put("statusCode", "-1"); + } + if (ObjectUtil.isEmpty(contextDef)) { + log.error("未查询到节点[ScaMirror]配置"); + pipTaskLog.append("未查询到节点[ScaMirror]配置"); + localVariables.put("statusCode", "-1"); + } + + String filePath = ""; + if (contextDef instanceof PipTask pipTask){ + // 查询并下载目标文件 + String pipelineId = pipTask.getPipelineId(); + //根据流水线id查询流水线信息 + PipPipeline pipeline = pipelineService.getById(pipelineId); + //根据目标id查询目标信息 + if (StringUtils.isEmpty(pipeline.getTargetVersionId())){ + throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"目标文件不存在"); + } + TargetVersion targetVersion = targetVersionService.getById(pipeline.getTargetVersionId()); + filePath = targetVersion.getFilePath(); + + try { + + + File file = new File(filePath); + if (!file.exists() || !file.canRead()) { + log.error("目标文件不存在或不可读"); + localVariables.put("statusCode", "-1"); + pipTaskLog.append("目标文件不存在或不可读"); + } + + handleUpload(pipTask.getTaskProperties(), file,pipTaskLog); + }catch (Exception e){ + pipTaskLog.append("==================SCA-MIRROR节点执行失败================="); + pipTaskLog.append("SCA-MIRROR节点执行失败失败,请检查当前节点配置!"); + pipTaskLog.append(e.getMessage()); + log.error("==================SCA-MIRROR节点执行失败=================",e); + throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败"); + } + } + + localVariables.put("statusCode", statusCode + ""); + localVariables.put(EngineRuntimeConstant.LOG_KEY, pipTaskLog); + } + + private void handleUpload(Map scaMirrorConfigInfo, File file, PipTaskLog pipTaskLog) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException { + RestTemplate restTemplate = getRestTemplateWithoutSANCheck(); + String scaUploadUrl = ConstantContextHolder.getScaIp() + "/openapi/v1/image/detect-file"; + MultiValueMap body = buildRequestBody(scaMirrorConfigInfo, file); + HttpHeaders headers = createHeaders(); + headers.setContentType(MediaType.MULTIPART_FORM_DATA); + headers.add("OpenApiUserToken", ConstantContextHolder.getScaToken()); + HttpEntity> requestEntity = new HttpEntity<>(body, headers); + + log.info("SCA上传接口:" + scaUploadUrl); + JSONObject response = restTemplate.postForObject(scaUploadUrl, requestEntity, JSONObject.class); + String message = response.getString("message"); + + if (message.equals("success")) { + pipTaskLog.append("===============SCA上传成功================="); + JSONObject data = response.getJSONObject("data"); + Integer scaTaskId = data.getInteger("scaTaskId"); + + pollTaskStatus(restTemplate, scaTaskId); + } else if (message.equals("应用已经存在")) { + Integer oldScaTaskId = (Integer) scaMirrorConfigInfo.get("taskId"); + Integer oldApplicationId = (Integer) scaMirrorConfigInfo.get("applicationId"); + int restartResult = reStartTask(restTemplate, oldApplicationId); + if (restartResult != 0) { + return; + } + pollTaskStatus(restTemplate, oldScaTaskId); + } else { + pipTaskLog.append("==================SCA接口异常,调用失败================="); + throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败"); + } + Thread.currentThread().interrupt(); + } + + private MultiValueMap buildRequestBody(Map scaMirrorConfigInfo, File file) { + MultiValueMap body = new LinkedMultiValueMap<>(); + body.add("file", new FileSystemResource(file)); + body.add("projectName", scaMirrorConfigInfo.get("projectName")); + body.add("applicationName", scaMirrorConfigInfo.get("applicationName")); + + // 敏感信息检测(非必填) + if (!ObjectUtils.isEmpty(scaMirrorConfigInfo.get("sensitive"))){ + body.add("sensitive", scaMirrorConfigInfo.get("sensitive")); + } + // 二进制分析(非必填) + if (!ObjectUtils.isEmpty(scaMirrorConfigInfo.get("binaryDetect"))){ + body.add("binaryDetect", scaMirrorConfigInfo.get("binaryDetect")); + } + return body; + } + + /** + * 创建请求头 + * + * @return HttpHeaders + */ + private HttpHeaders createHeaders() { + HttpHeaders headers = new HttpHeaders(); + headers.add("OpenApiUserToken", ConstantContextHolder.getScaToken()); + return headers; + } + + /** + * 轮询请求任务状态 + * + * @param restTemplate + * @param scaTaskId + */ + public void pollTaskStatus(RestTemplate restTemplate, Integer scaTaskId) { + int currentPollingTimes = 0; + while (currentPollingTimes < MAX_POLLING_TIMES) { + try { + HttpHeaders headers = new HttpHeaders(); + headers.add("OpenApiUserToken", ConstantContextHolder.getScaToken()); + HttpEntity requestEntity = new HttpEntity<>(null, headers); + String scaStatusUrl = ConstantContextHolder.getScaIp() + "/openapi/v1/task/" + scaTaskId; + ResponseEntity response = restTemplate.exchange(scaStatusUrl, HttpMethod.GET, requestEntity, JSONObject.class); + + if (Objects.requireNonNull(response.getBody()).getString("message").equals("success")) { + //"status": 5, //状态 0-未审计 1-未检测 2-排队中 3-检测中 4-检测暂停 5-检测完成 6-检测超时 7-手动停止 8-检测异常 9-已删除 10-拉取中 11-停止中 12-下载中 + int status = response.getBody().getJSONObject("data").getInteger("status"); + log.info("当前任务状态: " + status); + if (status == 5) { + System.out.println("任务已完成,停止轮询。"); + log.info("任务已完成,停止轮询。"); + break; + } + } else { + log.error("获取任务状态失败: " + response.getBody().getString("message")); + break; + } + } catch (Exception e) { + log.error("获取任务状态时发生错误: " + e.getMessage()); + } + try { + // 轮询间隔 5 秒 + Thread.sleep(POLLING_INTERVAL); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("轮询被中断: " + e.getMessage()); + } + currentPollingTimes++; + } + System.out.println("停止轮询"); + } + + /** + * 重新检测接口 + * + * @param restTemplate + * @param applicationId + */ + public int reStartTask(RestTemplate restTemplate, Integer applicationId) { + try { + String url = ConstantContextHolder.getScaIp() + "/openapi/v1/task/batch/detect"; + HttpHeaders headers = new HttpHeaders(); + headers.add("OpenApiUserToken", ConstantContextHolder.getScaToken()); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.add("Accept", MediaType.APPLICATION_JSON.toString()); + Map> param = new HashMap<>(); + param.put("applicationIds", Arrays.asList(applicationId)); + String s = JSON.toJSONString(param); + HttpEntity formEntry = new HttpEntity<>(s, headers); + + JSONObject res = restTemplate.postForObject(url, formEntry, JSONObject.class); + if (res.getString("message").equals("success")) { + log.info("重新检测成功"); + return 0; + } else { + log.error("重新检测失败"); + log.error(res.getString("message")); + return -1; + } + } catch (Exception e) { + log.error("重新检测失败"); + log.error(e.getMessage()); + } + return 0; + } + +} diff --git a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ScaSbomWorker.java b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ScaSbomWorker.java index 1c814e47..5e57d791 100644 --- a/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ScaSbomWorker.java +++ b/modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/worker/ScaSbomWorker.java @@ -39,7 +39,7 @@ import java.util.*; * @Description: */ @Slf4j -@Plugin(taskType = "test") +@Plugin(taskType = "sbom") public class ScaSbomWorker extends HttpWorker{ private static final int POLLING_INTERVAL = 5000; // 轮询间隔,单位:毫秒 @@ -136,9 +136,9 @@ public class ScaSbomWorker extends HttpWorker{ pollTaskStatus(restTemplate, oldScaTaskId); } else { pipTaskLog.append("==================SCA接口异常,调用失败================="); - log.error(message); throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败"); } + Thread.currentThread().interrupt(); } private MultiValueMap buildRequestBody(Map scaSbomConfigInfo, File file) {