This commit is contained in:
even 2025-04-15 17:00:38 +08:00
commit b8e6b45a65
10 changed files with 504 additions and 0 deletions

View File

@ -38,4 +38,9 @@ public class BuildParameters {
this.desc = desc; this.desc = desc;
this.defaultValue = defaultValue; this.defaultValue = defaultValue;
} }
public BuildParameters(String key, String value) {
this.key = key;
this.value = value;
}
} }

View File

@ -2,6 +2,7 @@ package cd.casic.ci.process.api.process.pojo;
import cd.casic.ci.common.pipeline.enums.BuildTaskStatus; import cd.casic.ci.common.pipeline.enums.BuildTaskStatus;
import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.util.Map; import java.util.Map;
@ -14,6 +15,7 @@ import java.util.Map;
* @FilenameBuildTask * @FilenameBuildTask
* @descriptionTodo * @descriptionTodo
*/ */
@Data
@Schema(title = "流水线模型-构建任务") @Schema(title = "流水线模型-构建任务")
public class BuildTask { public class BuildTask {
@Schema(title = "构建ID", required = true) @Schema(title = "构建ID", required = true)

View File

@ -13,6 +13,12 @@
<artifactId>module-ci-worker</artifactId> <artifactId>module-ci-worker</artifactId>
<dependencies> <dependencies>
<dependency>
<groupId>cd.casic.boot</groupId>
<artifactId>module-ci-process-api</artifactId>
</dependency>
<dependency> <dependency>
<groupId>cd.casic.boot</groupId> <groupId>cd.casic.boot</groupId>
<artifactId>module-ci-common-pipeline</artifactId> <artifactId>module-ci-common-pipeline</artifactId>

View File

@ -0,0 +1,28 @@
package cd.casic.devops.common.worker.api.log;
import cd.casic.ci.log.dal.pojo.TaskBuildLogProperty;
import cd.casic.ci.log.dal.pojo.enums.LogStorageMode;
import cd.casic.ci.log.dal.pojo.message.LogMessage;
import java.util.List;
public interface LogSDKApi extends WorkerRestApiSDK {
Result<Boolean> addLogMultiLine(String buildId, List<LogMessage> logMessages);
Result<Boolean> finishLog(
String tag,
String jobId,
Integer executeCount,
String subTag,
LogStorageMode logMode
);
Result<Boolean> updateStorageMode(
List<TaskBuildLogProperty> propertyList,
int executeCount
);
}

View File

@ -0,0 +1,27 @@
package cd.casic.devops.common.worker.task;
import cd.casic.ci.common.pipeline.enums.ErrorCode;
import cd.casic.ci.common.pipeline.pojo.ErrorType;
import java.io.File;
import cd.casic.ci.process.api.process.pojo.BuildTask;
import cd.casic.ci.process.api.process.pojo.BuildVariables;
public class EmptyTask extends ITask {
private final String type;
public EmptyTask(String type) {
this.type = type;
}
@Override
public void execute(BuildTask buildTask, BuildVariables buildVariables, File workspace) {
throw new TaskExecuteException(
"Received unimplemented build task: " + type,
ErrorCode.USER_INPUT_INVAILD,
ErrorType.USER
);
}
}

View File

@ -0,0 +1,149 @@
package cd.casic.devops.common.worker.task;
import cd.casic.ci.common.pipeline.enums.ErrorCode;
import cd.casic.ci.common.pipeline.pojo.BuildParameters;
import cd.casic.ci.common.pipeline.pojo.ErrorType;
import cd.casic.ci.process.api.process.pojo.BuildTask;
import cd.casic.ci.process.api.process.pojo.BuildVariables;
import cd.casic.devops.common.worker.env.BuildEnv;
import cd.casic.devops.common.worker.env.BuildType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@SuppressWarnings({"NestedMethodCall", "MethodCount"})
public abstract class ITask {
private final Logger logger = LoggerFactory.getLogger(ITask.class);
private final Map<String, String> environment = new HashMap<>();
private final Map<String, Object> monitorData = new HashMap<>();
private String platformCode;
private Integer platformErrorCode;
private Boolean finishKillFlag;
/* 存储常量的key */
private List<String> constVar;
public void run(BuildTask buildTask, BuildVariables buildVariables, File workspace) {
// 过滤只读变量并收集键名到列表
constVar = buildVariables.getVariablesWithType().stream()
.filter(variable -> variable.isReadOnly())
.map(variable -> variable.getKey())
.collect(Collectors.toList());
execute(buildTask, buildVariables, workspace);
}
private BuildVariables combineVariables(BuildTask buildTask, BuildVariables buildVariables) {
Map<String, String> buildVariable = buildTask.getBuildVariable();
if (buildVariable == null) {
return buildVariables;
}
Map<String, String> newVariables = buildVariables.getVariables();
newVariables.putAll(buildVariable);
List<BuildParameters> buildParameters = buildVariable.entrySet().stream().map(entry -> new BuildParameters(entry.getKey(), entry.getValue())
).collect(Collectors.toList());
Map<String, BuildParameters> newBuildParameters =
buildVariables.getVariablesWithType().stream()
.collect(Collectors.toMap(
BuildParameters::getKey,
param -> param,
(a, b) -> b // 合并函数新值优先
));
buildParameters.forEach(param -> newBuildParameters.put(param.getKey(), param));
BuildVariables restBuildVariables = new BuildVariables();
restBuildVariables.setVariables(newVariables);
restBuildVariables.setVariablesWithType(newBuildParameters.values().stream().collect(Collectors.toList()));
return restBuildVariables;
}
abstract void execute(BuildTask buildTask, BuildVariables buildVariables, File workspace);
protected void addEnv(Map<String, String> env) {
if (!constVar.isEmpty()) {
boolean errFlag = false;
for (Map.Entry<String, String> entry : env.entrySet()) {
String key = entry.getKey();
if (constVar.contains(key)) {
LoggerService.addErrorLine("Variable " + key + " is read-only and cannot be modified.");
errFlag = true;
}
}
if (errFlag) {
throw new TaskExecuteException(
"[Finish task] status: false, errorType: " + ErrorType.USER.getNum() +
", errorCode: " + ErrorCode.USER_INPUT_INVAILD +
", message: read-only cannot be modified.",
ErrorType.USER,
ErrorCode.USER_INPUT_INVAILD
);
}
environment.putAll(env);
}
}
protected void addEnv(String key, String value) {
environment.put(key, value);
}
protected String getEnv(String key) {
return environment.getOrDefault(key, "");
}
public Map<String, String> getAllEnv() {
return environment;
}
protected void addMonitorData(Map<String, Object> monitorDataMap) {
monitorData.putAll(monitorDataMap);
}
public Map<String, Object> getMonitorData() {
return new HashMap<>(monitorData);
}
protected void addPlatformCode(String taskPlatformCode) {
this.platformCode = taskPlatformCode;
}
public String getPlatformCode() {
return platformCode;
}
protected void addPlatformErrorCode(int taskPlatformErrorCode) {
this.platformErrorCode = taskPlatformErrorCode;
}
public Integer getPlatformErrorCode() {
return platformErrorCode;
}
protected void addFinishKillFlag(boolean taskFinishKillFlag) {
this.finishKillFlag = taskFinishKillFlag;
}
public Boolean getFinishKillFlag() {
return finishKillFlag;
}
protected boolean isThirdAgent() {
return BuildEnv.getBuildType() == BuildType.AGENT;
}
}

View File

@ -0,0 +1,17 @@
package cd.casic.devops.common.worker.task;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target({ElementType.TYPE, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface TaskClassType {
String[] classTypes();
int priority() default 0;
}

View File

@ -0,0 +1,127 @@
package cd.casic.devops.common.worker.task;
import cd.casic.ci.common.pipeline.enums.ErrorCode;
import cd.casic.ci.common.pipeline.pojo.ErrorType;
import cd.casic.ci.process.api.process.pojo.BuildTask;
import cd.casic.ci.process.api.process.pojo.BuildTaskResult;
import cd.casic.ci.process.api.process.pojo.BuildVariables;
import com.github.xiaoymin.knife4j.core.util.CommonUtils;
import java.io.File;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.*;
import static cd.casic.ci.process.api.process.utils.Constants.PIPELINE_TASK_MESSAGE_STRING_LENGTH_MAX;
public class TaskDaemon implements Callable<Map<String, String>> {
private final ITask task;
private final BuildTask buildTask;
private final BuildVariables buildVariables;
private final File workspace;
private static final int PARAM_MAX_LENGTH = 4000; // 流水线参数最大长度
public TaskDaemon(ITask task, BuildTask buildTask, BuildVariables buildVariables, File workspace) {
this.task = task;
this.buildTask = buildTask;
this.buildVariables = buildVariables;
this.workspace = workspace;
}
@Override
public Map<String, String> call() {
task.run(buildTask, buildVariables, workspace);
return task.getAllEnv();
}
public void runWithTimeout() {
int timeout = TaskUtil.getTimeOut(buildTask);
ExecutorService executor = Executors.newCachedThreadPool();
String taskId = buildTask.getTaskId();
if (taskId != null) {
TaskExecutorCache.put(taskId, executor);
}
Future<Map<String, String>> f1 = executor.submit(this);
try {
if (f1.get(timeout, TimeUnit.MINUTES) == null) {
throw new TimeoutException("Task[" + buildTask.getElementName() +
"] timeout: " + timeout + " minutes");
}
} catch (Exception e) {
throw new TaskExecuteException(
ErrorType.USER,
ErrorCode.USER_TASK_OUTTIME_LIMIT,
e.getMessage() != null ? e.getMessage() :
"Task[" + buildTask.getElementName() + "] timeout: " + timeout + " minutes"
);
} finally {
executor.shutdownNow();
if (taskId != null) {
TaskExecutorCache.invalidate(taskId);
}
}
}
private Map<String, String> getAllEnv() {
return task.getAllEnv();
}
private Map<String, Object> getMonitorData() {
return task.getMonitorData();
}
public BuildTaskResult getBuildResult(
Boolean isSuccess,
String errorMessage,
String errorType,
Integer errorCode) {
Map<String, String> allEnv = getAllEnv();
Map<String, String> buildResult = new HashMap<>();
if (allEnv != null && !allEnv.isEmpty()) {
for (Map.Entry<String, String> entry : allEnv.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
// 检查值长度
if (value != null && value.length() > PARAM_MAX_LENGTH) {
LoggerService.getInstance().addWarnLine(
"Warning, assignment to variable [" + key + "] failed, " +
"more than " + PARAM_MAX_LENGTH + " characters(len=" + value.length() + ")"
);
continue;
}
if (SensitiveValueService.matchSensitiveValue(value)) {
LoggerService.getInstance().addWarnLine(
"Warning, credentials cannot be assigned to variable[" + key + "]"
);
continue;
}
buildResult.put(key, value);
}
}
return new BuildTaskResult(
buildTask.getTaskId(),
buildTask.getTaskId(),
buildVariables.getContainerHashId(),
buildTask.getElementVersion(),
isSuccess,
buildTask.getExecuteCount(),
buildResult,
errorMessage != null ?
CommonUtils.interceptStringInLength(
SensitiveValueService.fixSensitiveContent(errorMessage),
PIPELINE_TASK_MESSAGE_STRING_LENGTH_MAX
) : null,
buildTask.getType(),
errorType,
errorCode,
task.getPlatformCode(),
task.getPlatformErrorCode(),
getMonitorData()
);
}
}

View File

@ -0,0 +1,44 @@
package cd.casic.devops.common.worker.task;
import cd.casic.ci.process.api.engine.common.Timeout;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Cache;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
public final class TaskExecutorCache {
private static final TaskExecutorCache INSTANCE = new TaskExecutorCache();
// Caffeine 缓存
private final Cache<String, ExecutorService> taskExecutorCache = Caffeine.newBuilder()
.maximumSize(50)
.expireAfterWrite(Timeout.MAX_JOB_RUN_DAYS, TimeUnit.DAYS)
.build();
// 私有构造函数
private TaskExecutorCache() {}
// 获取单例实例
public static TaskExecutorCache getInstance() {
return INSTANCE;
}
public void invalidate(String taskId) {
taskExecutorCache.invalidate(taskId);
}
public void put(String taskId, ExecutorService executor) {
taskExecutorCache.put(taskId, executor);
}
public ExecutorService getIfPresent(String taskId) {
return taskExecutorCache.getIfPresent(taskId);
}
public Map<String, ExecutorService> getAllPresent(Set<String> taskIds) {
return taskExecutorCache.getAllPresent(taskIds);
}
}

View File

@ -0,0 +1,99 @@
package cd.casic.devops.common.worker.task;
import cd.casic.ci.common.pipeline.pojo.element.agent.LinuxScriptElement;
import cd.casic.ci.common.pipeline.pojo.element.agent.WindowsScriptElement;
import cd.casic.ci.common.pipeline.pojo.element.market.MarketBuildAtomElement;
import cd.casic.ci.common.pipeline.pojo.element.market.MarketBuildLessAtomElement;
import org.reflections.Reflections;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
public final class TaskFactory {
// 单例实例
private static final TaskFactory INSTANCE = new TaskFactory();
// 任务类型映射表
private final Map<String, Class<? extends ITask>> taskMap = new ConcurrentHashMap<>();
// 私有构造方法
private TaskFactory() {}
// 获取单例实例
public static TaskFactory getInstance() {
return INSTANCE;
}
@SuppressWarnings("all")
public void init() {
// 注册基础任务类型
register(LinuxScriptElement.classType, LinuxScriptTask.class);
register(WindowsScriptElement.classType, WindowsScriptTask.class);
register(MarketBuildAtomElement.classType, MarketAtomTask.class);
register(MarketBuildLessAtomElement.classType, MarketAtomTask.class);
// 使用反射扫描任务类
Reflections reflections = new Reflections("com.tencent.devops.plugin.worker.task");
Set<Class<? extends ITask>> taskClasses = reflections.getSubTypesOf(ITask.class);
Map<String, Integer> candidatePriorityMap = new HashMap<>();
Map<String, Class<? extends ITask>> candidateMap = new HashMap<>();
if (taskClasses != null) {
for (Class<? extends ITask> taskClazz : taskClasses) {
if (!Modifier.isAbstract(taskClazz.getModifiers())) {
TaskClassType taskClassType = taskClazz.getAnnotation(TaskClassType.class);
if (taskClassType != null) {
for (String classType : taskClassType.classTypes()) {
boolean find = false;
Integer priority = candidatePriorityMap.get(classType);
if (taskClassType.priority() > (priority != null ? priority : 0)) {
priority = taskClassType.priority();
find = true;
}
if (priority == null || find) {
candidatePriorityMap.put(classType, priority != null ? priority : 0);
candidateMap.put(classType, taskClazz);
}
}
}
}
}
}
// 注册扫描到的任务类
for (Map.Entry<String, Class<? extends ITask>> entry : candidateMap.entrySet()) {
register(entry.getKey(), entry.getValue());
}
}
// 注册任务类型
private void register(String classType, Class<? extends ITask> taskClass) {
taskMap.put(classType, taskClass);
}
// 创建任务实例
public ITask create(String type) {
Class<? extends ITask> clazz = taskMap.get(type);
if (clazz == null) {
return new EmptyTask(type);
}
try {
// 尝试获取无参构造器
java.lang.reflect.Constructor<? extends ITask> ctor = clazz.getDeclaredConstructor();
if (ctor != null) {
return ctor.newInstance();
}
} catch (Exception e) {
// 构造失败时返回EmptyTask
}
return new EmptyTask(type);
}
}