From 0491cf5ccbdc1e5755df56cfb98d12c90c6c2065 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=92=B2=E5=85=88=E7=94=9F?= <821039958@qq.com> Date: Mon, 14 Apr 2025 17:38:21 +0800 Subject: [PATCH 1/6] =?UTF-8?q?worker=20pom=20=E6=B7=BB=E5=8A=A0=20module-?= =?UTF-8?q?ci-process-api=20=E4=BE=9D=E8=B5=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- modules/module-ci-worker/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/modules/module-ci-worker/pom.xml b/modules/module-ci-worker/pom.xml index 47fd3cc..6f19bac 100644 --- a/modules/module-ci-worker/pom.xml +++ b/modules/module-ci-worker/pom.xml @@ -13,6 +13,12 @@ module-ci-worker + + + cd.casic.boot + module-ci-process-api + + cd.casic.boot module-ci-common-pipeline From a100eae6c1565c9cd1551fcaaf53febf2c424379 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=92=B2=E5=85=88=E7=94=9F?= <821039958@qq.com> Date: Tue, 15 Apr 2025 13:50:57 +0800 Subject: [PATCH 2/6] =?UTF-8?q?worker=20=E6=A8=A1=E5=9D=97=E6=97=A5?= =?UTF-8?q?=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/worker/api/log/LogSDKApi.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/api/log/LogSDKApi.java diff --git a/modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/api/log/LogSDKApi.java b/modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/api/log/LogSDKApi.java new file mode 100644 index 0000000..099fe5e --- /dev/null +++ b/modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/api/log/LogSDKApi.java @@ -0,0 +1,28 @@ +package cd.casic.devops.common.worker.api.log; + +import cd.casic.ci.log.dal.pojo.TaskBuildLogProperty; +import cd.casic.ci.log.dal.pojo.enums.LogStorageMode; +import cd.casic.ci.log.dal.pojo.message.LogMessage; + +import java.util.List; + +public interface LogSDKApi extends WorkerRestApiSDK { + + + Result addLogMultiLine(String buildId, List logMessages); + + + Result finishLog( + String tag, + String jobId, + Integer executeCount, + String subTag, + LogStorageMode logMode + ); + + + Result updateStorageMode( + List propertyList, + int executeCount + ); +} \ No newline at end of file From 12ccae1f2acd63387b13af6a99f29ad29d526562 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=92=B2=E5=85=88=E7=94=9F?= <821039958@qq.com> Date: Tue, 15 Apr 2025 14:13:42 +0800 Subject: [PATCH 3/6] =?UTF-8?q?worker=20common=20=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../devops/common/worker/task/ITask.java | 149 ++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/task/ITask.java diff --git a/modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/task/ITask.java b/modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/task/ITask.java new file mode 100644 index 0000000..af0100c --- /dev/null +++ b/modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/task/ITask.java @@ -0,0 +1,149 @@ +package cd.casic.devops.common.worker.task; + +import cd.casic.ci.common.pipeline.enums.ErrorCode; +import cd.casic.ci.common.pipeline.pojo.BuildParameters; +import cd.casic.ci.common.pipeline.pojo.ErrorType; +import cd.casic.ci.process.api.process.pojo.BuildTask; +import cd.casic.ci.process.api.process.pojo.BuildVariables; +import cd.casic.devops.common.worker.env.BuildEnv; +import cd.casic.devops.common.worker.env.BuildType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +@SuppressWarnings({"NestedMethodCall", "MethodCount"}) +public abstract class ITask { + + private final Logger logger = LoggerFactory.getLogger(ITask.class); + + private final Map environment = new HashMap<>(); + + private final Map monitorData = new HashMap<>(); + + private String platformCode; + + private Integer platformErrorCode; + + private Boolean finishKillFlag; + + /* 存储常量的key */ + private List constVar; + + public void run(BuildTask buildTask, BuildVariables buildVariables, File workspace) { + // 过滤只读变量并收集键名到列表 + constVar = buildVariables.getVariablesWithType().stream() + .filter(variable -> variable.isReadOnly()) + .map(variable -> variable.getKey()) + .collect(Collectors.toList()); + + execute(buildTask, buildVariables, workspace); + } + + private BuildVariables combineVariables(BuildTask buildTask, BuildVariables buildVariables) { + + Map buildVariable = buildTask.getBuildVariable(); + if (buildVariable == null) { + return buildVariables; + } + + Map newVariables = buildVariables.getVariables(); + newVariables.putAll(buildVariable); + + List buildParameters = buildVariable.entrySet().stream().map(entry -> new BuildParameters(entry.getKey(), entry.getValue()) + ).collect(Collectors.toList()); + + Map newBuildParameters = + buildVariables.getVariablesWithType().stream() + .collect(Collectors.toMap( + BuildParameters::getKey, + param -> param, + (a, b) -> b // 合并函数(新值优先) + )); + buildParameters.forEach(param -> newBuildParameters.put(param.getKey(), param)); + + BuildVariables restBuildVariables = new BuildVariables(); + restBuildVariables.setVariables(newVariables); + restBuildVariables.setVariablesWithType(newBuildParameters.values().stream().collect(Collectors.toList())); + return restBuildVariables; + } + + abstract void execute(BuildTask buildTask, BuildVariables buildVariables, File workspace); + + protected void addEnv(Map env) { + if (!constVar.isEmpty()) { + boolean errFlag = false; + for (Map.Entry entry : env.entrySet()) { + String key = entry.getKey(); + if (constVar.contains(key)) { + LoggerService.addErrorLine("Variable " + key + " is read-only and cannot be modified."); + errFlag = true; + } + } + + if (errFlag) { + throw new TaskExecuteException( + "[Finish task] status: false, errorType: " + ErrorType.USER.getNum() + + ", errorCode: " + ErrorCode.USER_INPUT_INVAILD + + ", message: read-only cannot be modified.", + ErrorType.USER, + ErrorCode.USER_INPUT_INVAILD + ); + } + environment.putAll(env); + } + } + + protected void addEnv(String key, String value) { + environment.put(key, value); + } + + protected String getEnv(String key) { + return environment.getOrDefault(key, ""); + } + + public Map getAllEnv() { + return environment; + } + + protected void addMonitorData(Map monitorDataMap) { + monitorData.putAll(monitorDataMap); + } + + public Map getMonitorData() { + return new HashMap<>(monitorData); + } + + protected void addPlatformCode(String taskPlatformCode) { + this.platformCode = taskPlatformCode; + } + + public String getPlatformCode() { + return platformCode; + } + + protected void addPlatformErrorCode(int taskPlatformErrorCode) { + this.platformErrorCode = taskPlatformErrorCode; + } + + public Integer getPlatformErrorCode() { + return platformErrorCode; + } + + protected void addFinishKillFlag(boolean taskFinishKillFlag) { + this.finishKillFlag = taskFinishKillFlag; + } + + public Boolean getFinishKillFlag() { + return finishKillFlag; + } + + protected boolean isThirdAgent() { + return BuildEnv.getBuildType() == BuildType.AGENT; + } + +} \ No newline at end of file From d48e2225b6144480e47f75fbad093d8edf7958bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=92=B2=E5=85=88=E7=94=9F?= <821039958@qq.com> Date: Tue, 15 Apr 2025 14:18:16 +0800 Subject: [PATCH 4/6] =?UTF-8?q?worker=20common=20=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../devops/common/worker/task/EmptyTask.java | 27 +++++++++++++++++++ .../common/worker/task/TaskClassType.java | 17 ++++++++++++ 2 files changed, 44 insertions(+) create mode 100644 modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/task/EmptyTask.java create mode 100644 modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/task/TaskClassType.java diff --git a/modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/task/EmptyTask.java b/modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/task/EmptyTask.java new file mode 100644 index 0000000..0dbb682 --- /dev/null +++ b/modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/task/EmptyTask.java @@ -0,0 +1,27 @@ +package cd.casic.devops.common.worker.task; + +import cd.casic.ci.common.pipeline.enums.ErrorCode; +import cd.casic.ci.common.pipeline.pojo.ErrorType; +import java.io.File; +import cd.casic.ci.process.api.process.pojo.BuildTask; +import cd.casic.ci.process.api.process.pojo.BuildVariables; + +public class EmptyTask extends ITask { + private final String type; + + public EmptyTask(String type) { + this.type = type; + } + + @Override + public void execute(BuildTask buildTask, BuildVariables buildVariables, File workspace) { + throw new TaskExecuteException( + "Received unimplemented build task: " + type, + ErrorCode.USER_INPUT_INVAILD, + ErrorType.USER + ); + } +} + + + diff --git a/modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/task/TaskClassType.java b/modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/task/TaskClassType.java new file mode 100644 index 0000000..06f55a5 --- /dev/null +++ b/modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/task/TaskClassType.java @@ -0,0 +1,17 @@ +package cd.casic.devops.common.worker.task; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + + +@Target({ElementType.TYPE, ElementType.FIELD}) +@Retention(RetentionPolicy.RUNTIME) +public @interface TaskClassType { + + String[] classTypes(); + + + int priority() default 0; +} From ee9ac3b7b48085c2ecd589d8c20a023748fbde45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=92=B2=E5=85=88=E7=94=9F?= <821039958@qq.com> Date: Tue, 15 Apr 2025 15:22:26 +0800 Subject: [PATCH 5/6] =?UTF-8?q?worker=20common=20=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/pipeline/pojo/BuildParameters.java | 5 + .../process/api/process/pojo/BuildTask.java | 2 + .../devops/common/worker/task/TaskDaemon.java | 127 ++++++++++++++++++ 3 files changed, 134 insertions(+) create mode 100644 modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/task/TaskDaemon.java diff --git a/modules/module-ci-common-pipeline/src/main/java/cd/casic/ci/common/pipeline/pojo/BuildParameters.java b/modules/module-ci-common-pipeline/src/main/java/cd/casic/ci/common/pipeline/pojo/BuildParameters.java index d71480d..0dec43e 100644 --- a/modules/module-ci-common-pipeline/src/main/java/cd/casic/ci/common/pipeline/pojo/BuildParameters.java +++ b/modules/module-ci-common-pipeline/src/main/java/cd/casic/ci/common/pipeline/pojo/BuildParameters.java @@ -38,4 +38,9 @@ public class BuildParameters { this.desc = desc; this.defaultValue = defaultValue; } + + public BuildParameters(String key, String value) { + this.key = key; + this.value = value; + } } diff --git a/modules/module-ci-process-api/src/main/java/cd/casic/ci/process/api/process/pojo/BuildTask.java b/modules/module-ci-process-api/src/main/java/cd/casic/ci/process/api/process/pojo/BuildTask.java index e3d49a4..e333fbb 100644 --- a/modules/module-ci-process-api/src/main/java/cd/casic/ci/process/api/process/pojo/BuildTask.java +++ b/modules/module-ci-process-api/src/main/java/cd/casic/ci/process/api/process/pojo/BuildTask.java @@ -2,6 +2,7 @@ package cd.casic.ci.process.api.process.pojo; import cd.casic.ci.common.pipeline.enums.BuildTaskStatus; import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; import java.util.Map; @@ -14,6 +15,7 @@ import java.util.Map; * @Filename:BuildTask * @description:Todo */ +@Data @Schema(title = "流水线模型-构建任务") public class BuildTask { @Schema(title = "构建ID", required = true) diff --git a/modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/task/TaskDaemon.java b/modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/task/TaskDaemon.java new file mode 100644 index 0000000..b804844 --- /dev/null +++ b/modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/task/TaskDaemon.java @@ -0,0 +1,127 @@ +package cd.casic.devops.common.worker.task; +import cd.casic.ci.common.pipeline.enums.ErrorCode; +import cd.casic.ci.common.pipeline.pojo.ErrorType; +import cd.casic.ci.process.api.process.pojo.BuildTask; +import cd.casic.ci.process.api.process.pojo.BuildTaskResult; +import cd.casic.ci.process.api.process.pojo.BuildVariables; +import com.github.xiaoymin.knife4j.core.util.CommonUtils; +import java.io.File; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.*; + +import static cd.casic.ci.process.api.process.utils.Constants.PIPELINE_TASK_MESSAGE_STRING_LENGTH_MAX; + +public class TaskDaemon implements Callable> { + private final ITask task; + private final BuildTask buildTask; + private final BuildVariables buildVariables; + private final File workspace; + private static final int PARAM_MAX_LENGTH = 4000; // 流水线参数最大长度 + + public TaskDaemon(ITask task, BuildTask buildTask, BuildVariables buildVariables, File workspace) { + this.task = task; + this.buildTask = buildTask; + this.buildVariables = buildVariables; + this.workspace = workspace; + } + + @Override + public Map call() { + task.run(buildTask, buildVariables, workspace); + return task.getAllEnv(); + } + + public void runWithTimeout() { + int timeout = TaskUtil.getTimeOut(buildTask); + ExecutorService executor = Executors.newCachedThreadPool(); + String taskId = buildTask.getTaskId(); + + if (taskId != null) { + TaskExecutorCache.put(taskId, executor); + } + Future> f1 = executor.submit(this); + try { + if (f1.get(timeout, TimeUnit.MINUTES) == null) { + throw new TimeoutException("Task[" + buildTask.getElementName() + + "] timeout: " + timeout + " minutes"); + } + } catch (Exception e) { + throw new TaskExecuteException( + ErrorType.USER, + ErrorCode.USER_TASK_OUTTIME_LIMIT, + e.getMessage() != null ? e.getMessage() : + "Task[" + buildTask.getElementName() + "] timeout: " + timeout + " minutes" + ); + } finally { + executor.shutdownNow(); + if (taskId != null) { + TaskExecutorCache.invalidate(taskId); + } + } + } + + private Map getAllEnv() { + return task.getAllEnv(); + } + + private Map getMonitorData() { + return task.getMonitorData(); + } + + public BuildTaskResult getBuildResult( + Boolean isSuccess, + String errorMessage, + String errorType, + Integer errorCode) { + Map allEnv = getAllEnv(); + Map buildResult = new HashMap<>(); + + if (allEnv != null && !allEnv.isEmpty()) { + for (Map.Entry entry : allEnv.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + + // 检查值长度 + if (value != null && value.length() > PARAM_MAX_LENGTH) { + LoggerService.getInstance().addWarnLine( + "Warning, assignment to variable [" + key + "] failed, " + + "more than " + PARAM_MAX_LENGTH + " characters(len=" + value.length() + ")" + ); + continue; + } + + if (SensitiveValueService.matchSensitiveValue(value)) { + LoggerService.getInstance().addWarnLine( + "Warning, credentials cannot be assigned to variable[" + key + "]" + ); + continue; + } + + buildResult.put(key, value); + } + } + + return new BuildTaskResult( + buildTask.getTaskId(), + buildTask.getTaskId(), + buildVariables.getContainerHashId(), + buildTask.getElementVersion(), + isSuccess, + buildTask.getExecuteCount(), + buildResult, + errorMessage != null ? + CommonUtils.interceptStringInLength( + SensitiveValueService.fixSensitiveContent(errorMessage), + PIPELINE_TASK_MESSAGE_STRING_LENGTH_MAX + ) : null, + buildTask.getType(), + errorType, + errorCode, + task.getPlatformCode(), + task.getPlatformErrorCode(), + getMonitorData() + ); + + } +} \ No newline at end of file From 0924444849d0eb214e3b0a49d820fbe417e35fe1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=92=B2=E5=85=88=E7=94=9F?= <821039958@qq.com> Date: Tue, 15 Apr 2025 16:15:39 +0800 Subject: [PATCH 6/6] =?UTF-8?q?worker=20common=20=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/worker/task/TaskExecutorCache.java | 44 +++++++++ .../common/worker/task/TaskFactory.java | 99 +++++++++++++++++++ 2 files changed, 143 insertions(+) create mode 100644 modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/task/TaskExecutorCache.java create mode 100644 modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/task/TaskFactory.java diff --git a/modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/task/TaskExecutorCache.java b/modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/task/TaskExecutorCache.java new file mode 100644 index 0000000..18d392a --- /dev/null +++ b/modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/task/TaskExecutorCache.java @@ -0,0 +1,44 @@ +package cd.casic.devops.common.worker.task; + +import cd.casic.ci.process.api.engine.common.Timeout; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Cache; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +public final class TaskExecutorCache { + + private static final TaskExecutorCache INSTANCE = new TaskExecutorCache(); + + // Caffeine 缓存 + private final Cache taskExecutorCache = Caffeine.newBuilder() + .maximumSize(50) + .expireAfterWrite(Timeout.MAX_JOB_RUN_DAYS, TimeUnit.DAYS) + .build(); + + // 私有构造函数 + private TaskExecutorCache() {} + + // 获取单例实例 + public static TaskExecutorCache getInstance() { + return INSTANCE; + } + + public void invalidate(String taskId) { + taskExecutorCache.invalidate(taskId); + } + + public void put(String taskId, ExecutorService executor) { + taskExecutorCache.put(taskId, executor); + } + + public ExecutorService getIfPresent(String taskId) { + return taskExecutorCache.getIfPresent(taskId); + } + + public Map getAllPresent(Set taskIds) { + return taskExecutorCache.getAllPresent(taskIds); + } +} \ No newline at end of file diff --git a/modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/task/TaskFactory.java b/modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/task/TaskFactory.java new file mode 100644 index 0000000..083d916 --- /dev/null +++ b/modules/module-ci-worker/src/main/java/cd/casic/devops/common/worker/task/TaskFactory.java @@ -0,0 +1,99 @@ +package cd.casic.devops.common.worker.task; + +import cd.casic.ci.common.pipeline.pojo.element.agent.LinuxScriptElement; +import cd.casic.ci.common.pipeline.pojo.element.agent.WindowsScriptElement; +import cd.casic.ci.common.pipeline.pojo.element.market.MarketBuildAtomElement; +import cd.casic.ci.common.pipeline.pojo.element.market.MarketBuildLessAtomElement; +import org.reflections.Reflections; + +import java.lang.reflect.Modifier; +import java.util.HashMap; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.Map; + +public final class TaskFactory { + // 单例实例 + private static final TaskFactory INSTANCE = new TaskFactory(); + + // 任务类型映射表 + private final Map> taskMap = new ConcurrentHashMap<>(); + + // 私有构造方法 + private TaskFactory() {} + + // 获取单例实例 + public static TaskFactory getInstance() { + return INSTANCE; + } + + @SuppressWarnings("all") + public void init() { + // 注册基础任务类型 + register(LinuxScriptElement.classType, LinuxScriptTask.class); + register(WindowsScriptElement.classType, WindowsScriptTask.class); + register(MarketBuildAtomElement.classType, MarketAtomTask.class); + register(MarketBuildLessAtomElement.classType, MarketAtomTask.class); + + // 使用反射扫描任务类 + Reflections reflections = new Reflections("com.tencent.devops.plugin.worker.task"); + Set> taskClasses = reflections.getSubTypesOf(ITask.class); + + Map candidatePriorityMap = new HashMap<>(); + Map> candidateMap = new HashMap<>(); + + if (taskClasses != null) { + for (Class taskClazz : taskClasses) { + if (!Modifier.isAbstract(taskClazz.getModifiers())) { + TaskClassType taskClassType = taskClazz.getAnnotation(TaskClassType.class); + if (taskClassType != null) { + for (String classType : taskClassType.classTypes()) { + boolean find = false; + Integer priority = candidatePriorityMap.get(classType); + + if (taskClassType.priority() > (priority != null ? priority : 0)) { + priority = taskClassType.priority(); + find = true; + } + + if (priority == null || find) { + candidatePriorityMap.put(classType, priority != null ? priority : 0); + candidateMap.put(classType, taskClazz); + } + } + } + } + } + } + + // 注册扫描到的任务类 + for (Map.Entry> entry : candidateMap.entrySet()) { + register(entry.getKey(), entry.getValue()); + } + } + + + // 注册任务类型 + private void register(String classType, Class taskClass) { + taskMap.put(classType, taskClass); + } + + // 创建任务实例 + public ITask create(String type) { + Class clazz = taskMap.get(type); + if (clazz == null) { + return new EmptyTask(type); + } + + try { + // 尝试获取无参构造器 + java.lang.reflect.Constructor ctor = clazz.getDeclaredConstructor(); + if (ctor != null) { + return ctor.newInstance(); + } + } catch (Exception e) { + // 构造失败时返回EmptyTask + } + return new EmptyTask(type); + } +} \ No newline at end of file