This commit is contained in:
HopeLi 2025-05-22 15:54:46 +08:00
parent f93004d132
commit a6929ff1c1
3 changed files with 260 additions and 22 deletions

View File

@ -0,0 +1,234 @@
package cd.casic.ci.process.engine.worker;
import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.constant.EngineRuntimeConstant;
import cd.casic.ci.process.engine.context.ConstantContextHolder;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import cd.casic.ci.process.process.dataObject.target.TargetVersion;
import cd.casic.ci.process.process.dataObject.task.PipTask;
import cd.casic.ci.process.process.service.pipeline.PipelineService;
import cd.casic.ci.process.process.service.target.impl.TargetVersionServiceImpl;
import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.io.FileSystemResource;
import org.springframework.http.*;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;
import java.io.File;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.*;
/**
* @author HopeLi
* @version v1.0
* @ClassName ApplicationWorker
* @Date: 2025/5/22 10:40
* @Description:
*/
@Slf4j
@Plugin(taskType = "application")
public class ApplicationWorker extends HttpWorker{
private static final int POLLING_INTERVAL = 5000; // 轮询间隔单位毫秒
private static final int MAX_POLLING_TIMES = 100; // 最大退出次数
@Resource
private PipelineService pipelineService;
@Resource
private TargetVersionServiceImpl targetVersionService;
@Override
public void execute(TaskRunContext context) {
int statusCode = 0;
Map<String, Object> localVariables = context.getLocalVariables();
PipBaseElement contextDef = context.getContextDef();
PipTaskLog pipTaskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY);
pipTaskLog.setContent("SCA-SBOM节点开始执行");
if (ObjectUtil.isEmpty(contextDef)) {
log.error("未查询到节点[application]配置");
localVariables.put("statusCode", "-1");
}
if (ObjectUtil.isEmpty(contextDef)) {
log.error("未查询到节点[application]配置");
localVariables.put("statusCode", "-1");
}
String filePath = "";
if (contextDef instanceof PipTask pipTask){
// 查询并下载目标文件
String pipelineId = pipTask.getPipelineId();
//根据流水线id查询流水线信息
PipPipeline pipeline = pipelineService.getById(pipelineId);
//根据目标id查询目标信息
if (StringUtils.isEmpty(pipeline.getTargetVersionId())){
throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"目标文件不存在");
}
TargetVersion targetVersion = targetVersionService.getById(pipeline.getTargetVersionId());
filePath = targetVersion.getFilePath();
try {
File file = new File(filePath);
if (!file.exists() || !file.canRead()) {
log.error("目标文件不存在或不可读");
// nodeLogger.appendErrorNow("目标文件不存在或不可读");
localVariables.put("statusCode", "-1");
}
handleUpload(pipTask.getTaskProperties(), file);
}catch (Exception e){
throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败");
}
}
localVariables.put("statusCode", statusCode + "");
}
private void handleUpload(Map<String,Object> applicationConfigInfo, File file) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
RestTemplate restTemplate = getRestTemplateWithoutSANCheck();
String scaUploadUrl = ConstantContextHolder.getScaIp() + "/openapi/v1/app-package/detect-file";
MultiValueMap<String, Object> body = buildRequestBody(applicationConfigInfo, file);
HttpHeaders headers = createHeaders();
headers.setContentType(MediaType.MULTIPART_FORM_DATA);
headers.add("OpenApiUserToken", ConstantContextHolder.getScaToken());
HttpEntity<MultiValueMap<String, Object>> requestEntity = new HttpEntity<>(body, headers);
log.info("SCA上传接口:" + scaUploadUrl);
JSONObject response = restTemplate.postForObject(scaUploadUrl, requestEntity, JSONObject.class);
String message = response.getString("message");
if (message.equals("success")) {
JSONObject data = response.getJSONObject("data");
Integer scaTaskId = data.getInteger("scaTaskId");
pollTaskStatus(restTemplate, scaTaskId);
} else if (message.equals("应用已经存在")) {
Integer oldScaTaskId = (Integer) applicationConfigInfo.get("taskId");
Integer oldApplicationId = (Integer) applicationConfigInfo.get("applicationId");
int restartResult = reStartTask(restTemplate, oldApplicationId);
if (restartResult != 0) {
return;
}
pollTaskStatus(restTemplate, oldScaTaskId);
} else {
throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-应用包审查分析节点执行失败");
}
Thread.currentThread().interrupt();
}
private MultiValueMap<String, Object> buildRequestBody(Map<String, Object> applicationConfigInfo, File file) {
MultiValueMap<String, Object> body = new LinkedMultiValueMap<>();
body.add("file", new FileSystemResource(file)); // 文件参数必填
body.add("projectName", applicationConfigInfo.get("projectName")); // 项目名称必填
body.add("applicationName", applicationConfigInfo.get("applicationName")); // 应用名称必填
body.add("applicationVersion", applicationConfigInfo.get("applicationVersion")); // 应用版本必填
body.add("applicationDescription", applicationConfigInfo.get("applicationDescription")); // 应用描述可选
return body;
}
/**
* 创建请求头
* @return HttpHeaders
*/
private HttpHeaders createHeaders() {
HttpHeaders headers = new HttpHeaders();
headers.add("OpenApiUserToken", ConstantContextHolder.getScaToken());
return headers;
}
/**
* 轮询请求任务状态
* @param restTemplate RestTemplate
* @param scaTaskId 任务ID
*/
public void pollTaskStatus(RestTemplate restTemplate, Integer scaTaskId) {
int currentPollingTimes = 0;
while (currentPollingTimes < MAX_POLLING_TIMES) {
try {
// 根据任务 id 获取任务状态
HttpHeaders headers = new HttpHeaders();
headers.add("OpenApiUserToken", ConstantContextHolder.getScaToken());
HttpEntity<String> requestEntity = new HttpEntity<>(null, headers);
String scaStatusUrl = ConstantContextHolder.getScaIp() + "/openapi/v1/task/" + scaTaskId;
ResponseEntity<JSONObject> response = restTemplate.exchange(scaStatusUrl, HttpMethod.GET, requestEntity, JSONObject.class);
if (Objects.requireNonNull(response.getBody()).getString("message").equals("success")) {
//"status": 5, //状态 0-未审计 1-未检测 2-排队中 3-检测中 4-检测暂停 5-检测完成 6-检测超时 7-手动停止 8-检测异常 9-已删除 10-拉取中 11-停止中 12-下载中
int status = response.getBody().getJSONObject("data").getInteger("status");
log.info("当前任务状态: " + status);
if (status == 5) {
System.out.println("任务已完成,停止轮询。");
log.info("任务已完成,停止轮询。");
break;
}
} else {
log.error("获取任务状态失败: " + response.getBody().getString("message"));
break;
}
} catch (Exception e) {
log.error("获取任务状态时发生错误: " + e.getMessage());
}
try {
// 轮询间隔 5
Thread.sleep(POLLING_INTERVAL);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("轮询被中断: " + e.getMessage());
}
currentPollingTimes++;
}
System.out.println("停止轮询");
}
/**
* 重新检测接口
*
* @param restTemplate
* @param applicationId
*/
public int reStartTask(RestTemplate restTemplate, Integer applicationId) {
try {
String url = ConstantContextHolder.getScaIp() + "/openapi/v1/task/batch/detect";
HttpHeaders headers = new HttpHeaders();
headers.add("OpenApiUserToken", ConstantContextHolder.getScaToken());
headers.setContentType(MediaType.APPLICATION_JSON);
headers.add("Accept", MediaType.APPLICATION_JSON.toString());
Map<String, List<Integer>> param = new HashMap<>();
param.put("applicationIds", Arrays.asList(applicationId));
String s = JSON.toJSONString(param);
HttpEntity<String> formEntry = new HttpEntity<>(s, headers);
//
JSONObject res = restTemplate.postForObject(url, formEntry, JSONObject.class);
if (res.getString("message").equals("success")) {
log.info("重新检测成功");
return 0;
} else {
log.error("重新检测失败");
log.error(res.getString("message"));
return -1;
}
} catch (Exception e) {
log.error("重新检测失败");
log.error(e.getMessage());
}
return 0;
}
}

View File

@ -1,10 +1,11 @@
package cd.casic.ci.process.engine.worker;
import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.constant.EngineRuntimeConstant;
import cd.casic.ci.process.engine.context.ConstantContextHolder;
import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import cd.casic.ci.process.process.dataObject.target.TargetVersion;
import cd.casic.ci.process.process.dataObject.task.PipTask;
@ -38,7 +39,7 @@ import java.util.*;
* @Description:
*/
@Slf4j
@Plugin(taskType = "ScaSbom")
@Plugin(taskType = "scaSbom")
public class ScaSbomWorker extends HttpWorker{
private static final int POLLING_INTERVAL = 5000; // 轮询间隔单位毫秒
@ -50,19 +51,23 @@ public class ScaSbomWorker extends HttpWorker{
@Resource
private TargetVersionServiceImpl targetVersionService;
public String work(BaseRunContext workerParam) {
@Override
public void execute(TaskRunContext context) {
int statusCode = 0;
Map<String, Object> localVariables = context.getLocalVariables();
PipTaskLog pipTaskLog = (PipTaskLog) localVariables.get(EngineRuntimeConstant.LOG_KEY);
PipBaseElement contextDef = workerParam.getContextDef();
log.info("================SCA-SBOM节点执行===================");
PipBaseElement contextDef = context.getContextDef();
pipTaskLog.append("SCA-SBOM节点开始执行");
pipTaskLog.append("SCA-SBOM节点配置" + contextDef);
if (ObjectUtil.isEmpty(contextDef)) {
log.error("未查询到节点[{}]配置,taskType = ScaSbom");
return "-1";
log.error("未查询到节点[ScaSbom]配置");
localVariables.put("statusCode", "-1");
}
if (ObjectUtil.isEmpty(contextDef)) {
log.error("未查询到节点[{}]配置,taskType = ScaSbom");
return "-1";
log.error("未查询到节点[ScaSbom]配置");
pipTaskLog.append("未查询到节点[ScaSbom]配置");
localVariables.put("statusCode", "-1");
}
String filePath = "";
@ -84,22 +89,24 @@ public class ScaSbomWorker extends HttpWorker{
File file = new File(filePath);
if (!file.exists() || !file.canRead()) {
log.error("目标文件不存在或不可读");
// nodeLogger.appendErrorNow("目标文件不存在或不可读");
return "-1";
localVariables.put("statusCode", "-1");
pipTaskLog.append("目标文件不存在或不可读");
}
handleUpload(workerParam, contextDef, pipTask.getTaskProperties(), file);
handleUpload(pipTask.getTaskProperties(), file,pipTaskLog);
}catch (Exception e){
pipTaskLog.append("==================SCA-SBOM节点执行失败=================");
pipTaskLog.append("SCA-SBOM节点执行失败失败请检查当前节点配置!");
pipTaskLog.append(e.getMessage());
throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败");
}
}
return statusCode + "";
localVariables.put("statusCode", statusCode + "");
localVariables.put(EngineRuntimeConstant.LOG_KEY, pipTaskLog);
}
private void handleUpload(BaseRunContext workerParam, PipBaseElement pipelineNodeConfigInfo,
Map<String,Object> scaSbomConfigInfo, File file) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
private void handleUpload(Map<String,Object> scaSbomConfigInfo, File file, PipTaskLog pipTaskLog) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
RestTemplate restTemplate = getRestTemplateWithoutSANCheck();
String scaUploadUrl = ConstantContextHolder.getScaIp() + "/openapi/v1/sbom/detect-file";
MultiValueMap<String, Object> body = buildRequestBody(scaSbomConfigInfo, file);
@ -113,6 +120,7 @@ public class ScaSbomWorker extends HttpWorker{
String message = response.getString("message");
if (message.equals("success")) {
pipTaskLog.append("===============SCA上传成功=================");
JSONObject data = response.getJSONObject("data");
Integer scaTaskId = data.getInteger("scaTaskId");
@ -126,6 +134,7 @@ public class ScaSbomWorker extends HttpWorker{
}
pollTaskStatus(restTemplate, oldScaTaskId);
} else {
pipTaskLog.append("==================SCA接口异常调用失败=================");
throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"SCA-SBOM节点执行失败");
}
}
@ -229,8 +238,4 @@ public class ScaSbomWorker extends HttpWorker{
return 0;
}
@Override
public void execute(TaskRunContext context) {
}
}

View File

@ -5,7 +5,6 @@ import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Slf4j