新的worker添加,等待审批节点预留

This commit is contained in:
even 2025-05-28 19:31:30 +08:00
parent 7d38e59482
commit de80ed56fe
21 changed files with 149 additions and 59 deletions

View File

@ -6,23 +6,18 @@ import cd.casic.ci.process.engine.manager.RunContextManager;
import cd.casic.ci.process.engine.manager.WorkerManager;
import cd.casic.ci.process.engine.message.TaskRunMessage;
import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.engine.worker.BaseWorker;
import cd.casic.ci.process.engine.worker.base.BaseWorker;
import cd.casic.ci.process.process.dataObject.task.PipTask;
import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import cd.casic.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import jodd.util.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

View File

@ -3,28 +3,15 @@ package cd.casic.ci.process.engine.worker;
import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.constant.AFLConstant;
import cd.casic.ci.process.engine.constant.DIYImageExecuteCommandConstant;
import cd.casic.ci.process.engine.constant.EngineRuntimeConstant;
import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.engine.runContext.PipelineRunContext;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
import cd.casic.ci.process.engine.worker.base.SshWorker;
import cd.casic.ci.process.process.dataObject.machine.MachineInfo;
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import cd.casic.ci.process.process.dataObject.target.TargetVersion;
import cd.casic.ci.process.process.dataObject.task.PipTask;
import cd.casic.ci.process.process.service.target.TargetVersionService;
import cd.casic.ci.process.util.CryptogramUtil;
import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import com.jcraft.jsch.*;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Map;
@Plugin(taskType = "AFL")

View File

@ -4,8 +4,8 @@ import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.constant.EngineRuntimeConstant;
import cd.casic.ci.process.engine.context.ConstantContextHolder;
import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.engine.runContext.PipelineRunContext;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.engine.worker.base.HttpWorker;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
@ -41,8 +41,8 @@ import java.util.*;
* @Description:
*/
@Slf4j
@Plugin(taskType = "application")
public class ApplicationWorker extends HttpWorker{
@Plugin(taskType = "APPLICATION")
public class ApplicationWorker extends HttpWorker {
private static final int POLLING_INTERVAL = 5000; // 轮询间隔单位毫秒
private static final int MAX_POLLING_TIMES = 100; // 最大退出次数

View File

@ -4,6 +4,7 @@ import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.constant.EngineRuntimeConstant;
import cd.casic.ci.process.engine.context.ConstantContextHolder;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.engine.worker.base.HttpWorker;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
import cd.casic.ci.process.process.dataObject.task.PipTask;
@ -29,8 +30,8 @@ import java.util.*;
* @Description:
*/
@Slf4j
@Plugin(taskType = "coding")
public class CodingWorker extends HttpWorker{
@Plugin(taskType = "CODING")
public class CodingWorker extends HttpWorker {
private static final int POLLING_INTERVAL = 5000; // 轮询间隔单位毫秒
private static final int MAX_POLLING_TIMES = 100; // 最大退出次数

View File

@ -0,0 +1,25 @@
package cd.casic.ci.process.engine.worker;
import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.engine.worker.base.BaseWorker;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
/**
* 自定义编译
* */
@Slf4j
@Plugin(taskType = "CUSTOM_COMPILE")
public class CustomCompilerWorker extends BaseWorker {
@Override
public void execute(TaskRunContext context) {
PipBaseElement contextDef = context.getContextDef();
String id = contextDef.getId();
log.info("==============触发worker执行========");
log.info("==========运行context{}===========", JSON.toJSONString(context));
}
}

View File

@ -0,0 +1,25 @@
package cd.casic.ci.process.engine.worker;
import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.engine.worker.base.BaseWorker;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
/**
* 数据库模糊测试
* */
@Slf4j
@Plugin(taskType = "DB_FUZZ_TESTING")
public class DBFuzzTestingWorker extends BaseWorker {
@Override
public void execute(TaskRunContext context) {
PipBaseElement contextDef = context.getContextDef();
String id = contextDef.getId();
log.info("==============触发worker执行========");
log.info("==========运行context{}===========", JSON.toJSONString(context));
}
}

View File

@ -4,19 +4,14 @@ import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.constant.DIYImageExecuteCommandConstant;
import cd.casic.ci.process.engine.constant.EngineRuntimeConstant;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.engine.worker.base.SshWorker;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
import cd.casic.ci.process.process.dataObject.machine.MachineInfo;
import cd.casic.ci.process.process.dataObject.task.PipTask;
import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 自定义镜像执行命令

View File

@ -0,0 +1,25 @@
package cd.casic.ci.process.engine.worker;
import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.engine.worker.base.BaseWorker;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
/**
* sast
* */
@Slf4j
@Plugin(taskType = "SAST")
public class SastWorker extends BaseWorker {
@Override
public void execute(TaskRunContext context) {
PipBaseElement contextDef = context.getContextDef();
String id = contextDef.getId();
log.info("==============触发worker执行========");
log.info("==========运行context{}===========", JSON.toJSONString(context));
}
}

View File

@ -4,6 +4,7 @@ import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.context.ConstantContextHolder;
import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.engine.worker.base.HttpWorker;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import cd.casic.ci.process.process.dataObject.target.TargetVersion;
@ -38,8 +39,8 @@ import java.util.*;
* @Description:
*/
@Slf4j
@Plugin(taskType = "binary")
public class ScaBinaryWorker extends HttpWorker{
@Plugin(taskType = "BINARY")
public class ScaBinaryWorker extends HttpWorker {
private static final int POLLING_INTERVAL = 5000; // 轮询间隔单位毫秒
private static final int MAX_POLLING_TIMES = 100; // 最大退出次数

View File

@ -1,12 +1,11 @@
package cd.casic.ci.process.engine.worker;
import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.constant.EngineRuntimeConstant;
import cd.casic.ci.process.engine.context.ConstantContextHolder;
import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.engine.worker.base.HttpWorker;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import cd.casic.ci.process.process.dataObject.target.TargetVersion;
import cd.casic.ci.process.process.dataObject.task.PipTask;
@ -41,8 +40,8 @@ import java.util.*;
* @Description:
*/
@Slf4j
@Plugin(taskType = "mirror")
public class ScaMirrorWorker extends HttpWorker{
@Plugin(taskType = "MIRROR")
public class ScaMirrorWorker extends HttpWorker {
private static final int POLLING_INTERVAL = 5000; // 轮询间隔单位毫秒
private static final int MAX_POLLING_TIMES = 100; // 最大退出次数

View File

@ -5,6 +5,7 @@ import cd.casic.ci.process.engine.constant.EngineRuntimeConstant;
import cd.casic.ci.process.engine.context.ConstantContextHolder;
import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.engine.worker.base.HttpWorker;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
@ -41,7 +42,7 @@ import java.util.*;
*/
@Slf4j
@Plugin(taskType = "SCA_S_BOM")
public class ScaSbomWorker extends HttpWorker{
public class ScaSbomWorker extends HttpWorker {
private static final int POLLING_INTERVAL = 5000; // 轮询间隔单位毫秒
private static final int MAX_POLLING_TIMES = 100; // 最大退出次数

View File

@ -2,6 +2,7 @@ package cd.casic.ci.process.engine.worker;
import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.engine.worker.base.BaseWorker;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.machine.MachineInfo;
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
@ -13,7 +14,6 @@ import cd.casic.ci.process.util.CryptogramUtil;
import cd.casic.ci.process.util.SftpUploadUtil;
import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import com.alibaba.fastjson.JSON;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@ -27,7 +27,7 @@ import java.util.Map;
* */
@Slf4j
@Plugin(taskType = "code")
public class TargetHandleWorker extends BaseWorker{
public class TargetHandleWorker extends BaseWorker {
@Resource
private TargetVersionService targetVersionService;
@Resource

View File

@ -1,10 +1,10 @@
package cd.casic.ci.process.engine.worker;
import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.constant.AFLConstant;
import cd.casic.ci.process.engine.constant.DIYImageExecuteCommandConstant;
import cd.casic.ci.process.engine.constant.TestCaseGenerationConstant;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.engine.worker.base.SshWorker;
import cd.casic.ci.process.process.dataObject.machine.MachineInfo;
import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import cd.casic.ci.process.process.dataObject.task.PipTask;
@ -16,7 +16,7 @@ import java.util.Map;
@Plugin(taskType = "TEST_CASE_GENERATION")
@Slf4j
public class TestCaseGenerationWorker extends SshWorker{
public class TestCaseGenerationWorker extends SshWorker {
@Override
public void execute(TaskRunContext context) {
int statusCode = -1;

View File

@ -2,6 +2,7 @@ package cd.casic.ci.process.engine.worker;
import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.engine.worker.base.BaseWorker;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
@ -9,7 +10,7 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
@Plugin(taskType = "Github")
public class TestGitWorker extends BaseWorker{
public class TestGitWorker extends BaseWorker {
@Override

View File

@ -1,7 +1,7 @@
package cd.casic.ci.process.engine.worker;
import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.engine.worker.base.BaseWorker;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
@ -9,7 +9,7 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
//@Plugin(taskType = "test")
public class TestWorker extends BaseWorker{
public class TestWorker extends BaseWorker {
@Override

View File

@ -0,0 +1,25 @@
package cd.casic.ci.process.engine.worker;
import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.engine.worker.base.BaseWorker;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
/**
* 单元测试
* */
@Slf4j
@Plugin(taskType = "UNIT_TESTING")
public class UnitTestingWorker extends BaseWorker {
@Override
public void execute(TaskRunContext context) {
PipBaseElement contextDef = context.getContextDef();
String id = contextDef.getId();
log.info("==============触发worker执行========");
log.info("==========运行context{}===========", JSON.toJSONString(context));
}
}

View File

@ -1,4 +1,4 @@
package cd.casic.ci.process.engine.worker;
package cd.casic.ci.process.engine.worker.base;
import cd.casic.ci.process.constant.CommandConstant;
@ -58,12 +58,12 @@ public abstract class BaseWorker implements Runnable{
if (context instanceof TaskRunContext taskRunContext){
try {
taskRunContext.changeContextState(ContextStateEnum.READY);
if (this instanceof PassableWorker passableWorker) {
taskRunContext.changeContextState(ContextStateEnum.SUSPEND);
passableWorker.waitForPermission();
} else {
taskRunContext.changeContextState(ContextStateEnum.RUNNING);
PipBaseElement contextDef = taskRunContext.getContextDef();
PipTaskLog pipTaskLog = new PipTaskLog();
pipTaskLog.setId(contextDef.getId());
pipTaskLog.setContent("");
taskRunContext.getLocalVariables().put(EngineRuntimeConstant.LOG_KEY,pipTaskLog);
}
execute(taskRunContext);
} catch (Exception e) {
log.error("================worker执行报错",e);

View File

@ -1,6 +1,7 @@
package cd.casic.ci.process.engine.worker;
package cd.casic.ci.process.engine.worker.base;
import cd.casic.ci.process.engine.worker.base.BaseWorker;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
@ -15,7 +16,6 @@ import javax.net.ssl.SSLContext;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.X509Certificate;
/**
* @author HopeLi
@ -24,7 +24,7 @@ import java.security.cert.X509Certificate;
* @Date: 2025/5/21 20:13
* @Description:
*/
public abstract class HttpWorker extends BaseWorker{
public abstract class HttpWorker extends BaseWorker {
public static RestTemplate getRestTemplateWithoutSANCheck() throws NoSuchAlgorithmException, KeyManagementException, KeyStoreException, KeyStoreException {
// 创建信任所有证书的SSL上下文
SSLContext sslContext = SSLContextBuilder.create()

View File

@ -0,0 +1,10 @@
package cd.casic.ci.process.engine.worker.base;
import cd.casic.ci.process.engine.enums.ContextStateEnum;
public abstract class PassableWorker extends BaseWorker{
/**
* 这个方法用于阻塞
* */
public abstract void waitForPermission();
}

View File

@ -1,9 +1,9 @@
package cd.casic.ci.process.engine.worker;
package cd.casic.ci.process.engine.worker.base;
import cd.casic.ci.process.constant.CommandConstant;
import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.engine.worker.base.BaseWorker;
import cd.casic.ci.process.enums.MachineSystemEnum;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
import cd.casic.ci.process.process.dataObject.machine.MachineInfo;
import cd.casic.ci.process.ssh.SshClient;
import cd.casic.ci.process.ssh.SshClientFactory;
@ -16,7 +16,7 @@ import java.util.List;
* 使用ssh的worker的基类
* */
@Slf4j
public abstract class SshWorker extends BaseWorker{
public abstract class SshWorker extends BaseWorker {
/**
* 执行shell命令
* @param machineInfo 机器

View File

@ -141,7 +141,7 @@ public class PipelineServiceImpl extends ServiceImpl<PipelineDao, PipPipeline> i
PipTask childTask11 = new PipTask();
childTask11.setCreateTime(LocalDateTime.now());
childTask11.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
childTask11.setTaskName("串行任务-1-1");
childTask11.setTaskName("目标");
childTask11.setPipelineId(pipeline.getId());
childTask11.setTaskType("code");
childTask11.setTaskSort(1);