Compare commits

...

13 Commits

Author SHA1 Message Date
even ead82b946d Execution logic change: on each start, mark the previous run as failed 2025-05-30 14:52:57 +08:00
even 82f1aaa72f Engine logic changes 2025-05-30 12:53:21 +08:00
even 00a172715f Merge branch 'temp' of http://1.14.125.6:3000/mianbin/ops-pro into temp 2025-05-30 11:40:23 +08:00
even f596fd101e Worker changes 2025-05-30 11:39:24 +08:00
even 65a5b75a84 Add null checks 2025-05-29 20:03:04 +08:00
even c249975546 Command changes 2025-05-29 19:23:28 +08:00
even 59cdbf7401 Merge branch 'temp' of http://1.14.125.6:3000/mianbin/ops-pro into temp 2025-05-29 16:19:27 +08:00
even 056f15d68b Blocking logic changes 2025-05-29 14:11:37 +08:00
even 06b1bea076 Blocking logic changes 2025-05-29 10:40:07 +08:00
even 1055eb3dcd Merge branch 'temp' of http://1.14.125.6:3000/mianbin/ops-pro into temp 2025-05-29 10:36:26 +08:00
even 14c4e9b6e5 Blocking logic changes 2025-05-29 10:32:58 +08:00
even f0b36fad6d Failure state logic changes 2025-05-29 10:15:10 +08:00
even 5be88bf251 Change abstract class to an interface 2025-05-29 09:24:27 +08:00
19 changed files with 176 additions and 50 deletions

View File

@@ -73,7 +73,7 @@ public class OpsTenantAutoConfiguration {
// ========== Security ==========
@Bean
// @Bean
public FilterRegistrationBean<TenantSecurityWebFilter> tenantSecurityWebFilter(TenantProperties tenantProperties,
WebProperties webProperties,
GlobalExceptionHandler globalExceptionHandler,

View File

@@ -13,7 +13,7 @@ public class ExecutorConfig {
@Bean("parallelExecutor")
public ThreadPoolTaskExecutor pipelineExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("Parallel-");
@@ -25,7 +25,7 @@ public class ExecutorConfig {
@Bean("serialExecutor")
public ThreadPoolTaskExecutor serialExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("Serial-");
@@ -37,7 +37,7 @@ public class ExecutorConfig {
@Bean("workerExecutor")
public ThreadPoolTaskExecutor workerExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("Worker-");
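
Note on the corePoolSize change above: Spring's ThreadPoolTaskExecutor only creates threads beyond corePoolSize once its queue is full, so with a 100-slot queue the old value of 5 effectively capped concurrency at 5 regardless of maxPoolSize. A minimal stand-alone sketch (not project code) that shows the behaviour:

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// Stand-alone sketch: with corePoolSize=5, maxPoolSize=10 and queueCapacity=100,
// the executor never grows past 5 threads until the queue overflows, so at most
// 5 tasks run concurrently. Setting corePoolSize to 10 removes that hidden cap.
public class ExecutorSizingDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);   // old value from the diff
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("Demo-");
        executor.initialize();

        // Submit 20 long-running tasks; only corePoolSize of them start at once,
        // the rest sit in the queue.
        for (int i = 0; i < 20; i++) {
            executor.execute(() -> {
                try { Thread.sleep(2_000L); } catch (InterruptedException ignored) { }
            });
        }
        Thread.sleep(500L);
        System.out.println("active threads: " + executor.getActiveCount()); // prints 5
        executor.shutdown();
    }
}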

View File

@@ -2,6 +2,7 @@ package cd.casic.ci.process.engine.dispatcher;
import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import lombok.extern.slf4j.Slf4j;
import java.util.List;

View File

@@ -8,6 +8,7 @@ import cd.casic.ci.process.engine.runContext.SecondStageRunContext;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
import cd.casic.ci.process.process.dataObject.stage.PipStage;
import cd.casic.framework.mq.redis.core.RedisMQTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.CollectionUtils;
@@ -19,7 +20,7 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@Slf4j
public class ParallelDispatcher implements BaseDispatcher{
private List<PipStage> firstStageList;
@@ -34,7 +35,6 @@ public class ParallelDispatcher implements BaseDispatcher{
this.pipelineRunContext = context;
this.stageIndex = 0;
this.runContextManager = contextManager;
contextManager.contextRegister(context);
this.redisMQTemplate = redisMQTemplate;
this.taskExecutor = taskExecutor;
}
@@ -59,7 +59,16 @@ public class ParallelDispatcher implements BaseDispatcher{
}
// wait for the current stage to finish executing
latch.await();
if (pipelineRunContext.getState().get()== ContextStateEnum.BAD_ENDING.getCode()) {
log.error("parallel execution stopped");
break;
}
// TODO check whether every task succeeded; there is no such logic yet, so errors are simply ignored
// current execution failed
// while (pipelineRunContext.getState().get() != ContextStateEnum.RUNNING.getCode()) {
// // find a way to do this with a utility class, or just wait directly
// pipelineRunContext.pause();
// }
}
}
@Override
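
Note on the new BAD_ENDING check after latch.await(): the dispatcher now stops scheduling further stages once any task has driven the pipeline context into a failed state. A stand-alone sketch of that stage-barrier pattern with plain JDK types (state codes and class names here are illustrative, not the project's):

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-alone sketch: each stage counts down a latch as its tasks finish, and
// after awaiting the latch the dispatcher stops instead of starting the next
// stage when the shared state has reached BAD_ENDING.
public class StageBarrierDemo {
    static final int RUNNING = 1;
    static final int BAD_ENDING = 4; // assumed code, for illustration only

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger pipelineState = new AtomicInteger(RUNNING);
        ExecutorService pool = Executors.newFixedThreadPool(4);

        Runnable ok = () -> { };
        Runnable failing = () -> pipelineState.set(BAD_ENDING);
        Runnable stage2Task = () -> System.out.println("stage 2 should never run");
        List<List<Runnable>> stages = List.of(List.of(ok, failing), List.of(stage2Task));

        for (List<Runnable> stage : stages) {
            CountDownLatch latch = new CountDownLatch(stage.size());
            for (Runnable task : stage) {
                pool.execute(() -> { try { task.run(); } finally { latch.countDown(); } });
            }
            latch.await(); // wait for the whole stage, as in the diff
            if (pipelineState.get() == BAD_ENDING) { // the new short-circuit
                System.out.println("parallel execution stopped");
                break;
            }
        }
        pool.shutdown();
    }
}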

View File

@@ -11,12 +11,13 @@ import cd.casic.ci.process.process.dataObject.pipeline.PipPipeline;
import cd.casic.ci.process.process.dataObject.stage.PipStage;
import cd.casic.ci.process.process.dataObject.task.PipTask;
import cd.casic.framework.mq.redis.core.RedisMQTemplate;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class SerialDispatcher implements BaseDispatcher {
private SecondStageRunContext stageRunContext;
private List<PipTask> taskList;
@@ -47,13 +48,20 @@ public class SerialDispatcher implements BaseDispatcher {
TaskRunMessage taskRunMessage = new TaskRunMessage(pipTask);
redisMQTemplate.send(taskRunMessage);
// TODO listen for the current taskContext state turning into success or failure (the worker sets the state to running / success / failure)
//
AtomicInteger state = taskRunContext.getState();
// if it has not finished successfully yet, block until the state changes
while (state.get() != ContextStateEnum.HAPPY_ENDING.getCode()
&& state.get() != ContextStateEnum.BAD_ENDING.getCode()) {
Thread.sleep(1000L);
&& state.get() != ContextStateEnum.BAD_ENDING.getCode()
) {
// Thread.sleep(1000L);
taskRunContext.pause();
}
//
if (state.get()== ContextStateEnum.BAD_ENDING.getCode()) {
log.error("serial execution stopped");
break;
}
}
}
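
Note on replacing Thread.sleep(1000L) with taskRunContext.pause(): instead of polling once per second, the dispatcher thread now blocks on a CountDownLatch and is woken exactly when the worker moves the task context into a terminal state (see the pause/unpause methods added to BaseRunContext below). A stand-alone sketch of that handshake with plain JDK types (constants and names are stand-ins):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-alone sketch of the pause/unpause handshake that replaces sleep-polling:
// the dispatcher thread blocks on a latch; the worker thread sets a terminal
// state and counts the latch down, waking the dispatcher exactly when the state
// changes.
public class PauseHandshakeDemo {
    static final int RUNNING = 1, HAPPY_ENDING = 3, BAD_ENDING = 4; // illustrative codes

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger state = new AtomicInteger(RUNNING);
        CountDownLatch gate = new CountDownLatch(1);

        // Worker: finishes after some work and releases the gate.
        Thread worker = new Thread(() -> {
            try { Thread.sleep(300L); } catch (InterruptedException ignored) { }
            state.set(HAPPY_ENDING); // or BAD_ENDING on failure
            gate.countDown();        // roughly what unpause() does in BaseRunContext
        });
        worker.start();

        // Dispatcher: instead of sleeping in a loop, wait until the state is terminal.
        while (state.get() != HAPPY_ENDING && state.get() != BAD_ENDING) {
            gate.await();            // roughly what taskRunContext.pause() does
        }
        System.out.println(state.get() == BAD_ENDING ? "serial execution stopped"
                                                     : "task finished, move to next");
        worker.join();
    }
}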

View File

@@ -61,6 +61,7 @@ public class DefaultPipelineExecutor implements PipelineExecutor {
}
// for disaster recovery, the records stored in the database would need to be reloaded in order
PipelineRunContext pipelineRunContext = new PipelineRunContext(pipeline,childCount,null,new ConcurrentHashMap<>(),new ConcurrentHashMap<>());
runContextManager.contextRegister(pipelineRunContext);
ParallelDispatcher parallelDispatcher = new ParallelDispatcher(mainStage,pipelineRunContext,runContextManager,redisMQTemplate,serialExecutor);
parallelExecutor.execute(parallelDispatcher);
return pipelineRunContext;

View File

@@ -1,5 +1,6 @@
package cd.casic.ci.process.engine.manager.impl;
import cd.casic.ci.process.engine.enums.ContextStateEnum;
import cd.casic.ci.process.engine.manager.RunContextManager;
import cd.casic.ci.process.engine.runContext.BaseRunContext;
import cd.casic.ci.process.engine.runContext.PipelineRunContext;
@@ -10,6 +11,7 @@ import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -51,6 +53,10 @@ public class DefaultRunContextManager implements RunContextManager {
String id = contextDef.getId();
BaseRunContext parentContext = context.getParentContext();
if (context instanceof PipelineRunContext pipelineRunContext) {
if (contextMap.containsKey(id)) {
PipelineRunContext oldPipeline = contextMap.get(id);
oldPipeline.changeContextStateAndChild(ContextStateEnum.BAD_ENDING);
}
contextMap.put(id,pipelineRunContext);
} else {
if (parentContext==null) {
@@ -84,4 +90,12 @@ public class DefaultRunContextManager implements RunContextManager {
}
return null;
}
public void changePipelineState(String pipelineId,ContextStateEnum stateEnum){
PipelineRunContext pipelineRunContext = contextMap.get(pipelineId);
if (pipelineRunContext==null) {
return;
}
pipelineRunContext.changeContextStateAndChild(stateEnum);
}
}
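
Note on the containsKey branch added to contextRegister: this is the change behind the commit "on each start, mark the previous run as failed" — re-registering a pipeline id moves the stale context and all of its children to BAD_ENDING before the new context replaces it. A reduced sketch of that policy (Ctx is a stand-in type, not the project's):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Reduced sketch: registering an id that is already present marks the old
// context and, recursively, its children as BAD_ENDING before it is replaced.
public class ReRegisterDemo {
    static class Ctx {
        String id;
        String state = "RUNNING";
        List<Ctx> children = new ArrayList<>();
        Ctx(String id) { this.id = id; }
        void failWithChildren() {
            state = "BAD_ENDING";
            children.forEach(Ctx::failWithChildren); // cascade, like changeContextStateAndChild
        }
    }

    static final Map<String, Ctx> contextMap = new ConcurrentHashMap<>();

    static void register(Ctx ctx) {
        Ctx old = contextMap.get(ctx.id);
        if (old != null) {
            old.failWithChildren(); // previous run of the same pipeline is marked failed
        }
        contextMap.put(ctx.id, ctx);
    }

    public static void main(String[] args) {
        Ctx first = new Ctx("pipeline-1");
        first.children.add(new Ctx("stage-1"));
        register(first);
        register(new Ctx("pipeline-1")); // e.g. the engine restarts and re-runs the pipeline
        System.out.println(first.state + " / " + first.children.get(0).state); // BAD_ENDING / BAD_ENDING
    }
}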

View File

@@ -6,13 +6,18 @@ import cd.casic.framework.commons.exception.ServiceException;
import cd.casic.framework.commons.exception.enums.GlobalErrorCodeConstants;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.annotation.Transient;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@Data
@Slf4j
public abstract class BaseRunContext {
/**
* 当前上下文的定义
@@ -46,6 +51,10 @@ public abstract class BaseRunContext {
* */
private Map<String,Object> localVariables;
private Map<String,BaseRunContext> childContext;
/**
* used to control blocking and release elsewhere; the countDown value is 1
* */
private CountDownLatch countDownLatch;
public BaseRunContext(PipBaseElement contextDef,Integer childCount,BaseRunContext parentContext, LocalDateTime startTime, String resourceId, String targetVersionId, String targetType, Map<String, Object> globalVariables, Map<String, Object> localVariables, Map<String, BaseRunContext> childContext) {
this.contextDef = contextDef;
@@ -64,9 +73,23 @@
ContextStateEnum curr = ContextStateEnum.getByCode(state.get());
if (ContextStateEnum.canGoto(curr,stateEnum)) {
state.compareAndExchange(curr.getCode(),stateEnum.getCode());
// if something was previously paused listening for this state, stop the pause
// unpause();
callParentChange(stateEnum);
}
}
public void changeContextStateAndChild(ContextStateEnum stateEnum){
ContextStateEnum curr = ContextStateEnum.getByCode(state.get());
if (ContextStateEnum.canGoto(curr,stateEnum)) {
state.compareAndExchange(curr.getCode(),stateEnum.getCode());
Collection<BaseRunContext> values = this.getChildContext().values();
if (!CollectionUtils.isEmpty(values)) {
for (BaseRunContext value : values) {
value.changeContextStateAndChild(stateEnum);
}
}
}
}
// make sure the same reference is always operated on
private void setState(AtomicInteger state) {
this.state = state;
@@ -83,6 +106,7 @@
}
if (ContextStateEnum.HAPPY_ENDING.equals(state)||ContextStateEnum.BAD_ENDING.equals(state)) {
this.endTime=LocalDateTime.now();
unpause();
parentContext.checkChildEnd();
} else if(ContextStateEnum.READY.equals(state)){
parentContext.checkChildReady();
@@ -163,4 +187,25 @@
}
}
}
public void pause(){
if (countDownLatch==null) {
synchronized(this) {
if (this.countDownLatch == null) {
this.countDownLatch= new CountDownLatch(1);
}
}
}
try {
this.countDownLatch.await();
} catch (InterruptedException e) {
log.error(e.getMessage());
}
}
private void unpause(){
if (this.countDownLatch!=null) {
this.countDownLatch.countDown();
if (this.countDownLatch.getCount()==0) {
this.countDownLatch=null;
}
}
}
}

View File

@@ -44,7 +44,7 @@ public class AFLWorker extends SshWorker {
// get the machine
MachineInfo machineInfoDO = this.getMachineInfoService().getById(machineId);
statusCode = shell(machineInfoDO, CryptogramUtil.doDecrypt(machineInfoDO.getPassword()), context,
statusCode = shell(machineInfoDO, null, context,
"echo \"executing custom image command\"",
commandScript
);
@@ -56,8 +56,10 @@
}
if (statusCode == 0) {
log.info("node execution finished");
append(context,"node execution finished");
} else {
log.error("node execution failed");
append(context,"node execution failed");
}
localVariables.put(DIYImageExecuteCommandConstant.STATUS_CODE, statusCode);
}

View File

@@ -8,6 +8,7 @@ import cd.casic.ci.process.engine.worker.base.SshWorker;
import cd.casic.ci.process.process.dataObject.log.PipTaskLog;
import cd.casic.ci.process.process.dataObject.machine.MachineInfo;
import cd.casic.ci.process.process.dataObject.task.PipTask;
import cd.casic.ci.process.util.CryptogramUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -55,7 +56,8 @@ public class DIYImageExecuteCommandWorker extends SshWorker {
// get the machine
MachineInfo machineInfoDO = this.getMachineInfoService().getById(machineId);
statusCode = shell(machineInfoDO,"Hnidc@0626cn!@#zyx",context,
//TODO this needs to be changed
statusCode = shell(machineInfoDO, null,context,
"echo \"executing custom image command\"",
commandScript
);

View File

@@ -34,38 +34,38 @@ public class TargetHandleWorker extends BaseWorker {
private MachineInfoService machineInfoService;
@Override
public void execute(TaskRunContext context) {
String filePath = "";
Map<String, Object> localVariables = context.getLocalVariables();
PipBaseElement taskContextDef = context.getContextDef();
if (taskContextDef instanceof PipTask pipTask){
// query and download the target file
String pipelineId = pipTask.getPipelineId();
// look up the pipeline info by pipeline id
PipPipeline pipeline = (PipPipeline) getContextManager().getContext(pipelineId).getContextDef();
// look up the target info by target id
if (StringUtils.isEmpty(pipeline.getTargetVersionId())){
throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"target file does not exist");
}
TargetVersion targetVersion = targetVersionService.getById(pipeline.getTargetVersionId());
filePath = targetVersion.getFilePath();
File file = new File(filePath);
if (!file.exists() || !file.canRead()) {
log.error("target file does not exist or is not readable");
localVariables.put("statusCode", "-1");
append(context,"target file does not exist or is not readable");
toBadEnding();
}
// upload the file
String machineId = pipeline.getMachineId();
MachineInfo byId = machineInfoService.getById(machineId);
append(context,"starting file upload");
try {
SftpUploadUtil.uploadFileViaSftp(byId.getMachineHost(),byId.getSshPort(),byId.getUsername(), CryptogramUtil.doDecrypt(byId.getPassword()),null,file.getAbsolutePath(),"/home/casic/706/ai_test_527",file.getName());
} catch (SftpUploadUtil.SftpUploadException e) {
log.error("file upload failed",e);
toBadEnding();
}
append(context,"file uploaded to "+byId.getMachineHost()+" /home/casic/706/ai_test_527");
}
// String filePath = "";
// Map<String, Object> localVariables = context.getLocalVariables();
// PipBaseElement taskContextDef = context.getContextDef();
// if (taskContextDef instanceof PipTask pipTask){
// // query and download the target file
// String pipelineId = pipTask.getPipelineId();
// // look up the pipeline info by pipeline id
// PipPipeline pipeline = (PipPipeline) getContextManager().getContext(pipelineId).getContextDef();
// // look up the target info by target id
// if (StringUtils.isEmpty(pipeline.getTargetVersionId())){
// throw new ServiceException(GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR.getCode(),"target file does not exist");
// }
// TargetVersion targetVersion = targetVersionService.getById(pipeline.getTargetVersionId());
// filePath = targetVersion.getFilePath();
// File file = new File(filePath);
// if (!file.exists() || !file.canRead()) {
// log.error("target file does not exist or is not readable");
// localVariables.put("statusCode", "-1");
// append(context,"target file does not exist or is not readable");
// toBadEnding();
// }
// // upload the file
// String machineId = pipeline.getMachineId();
// MachineInfo byId = machineInfoService.getById(machineId);
// append(context,"starting file upload");
// try {
// SftpUploadUtil.uploadFileViaSftp(byId.getMachineHost(),byId.getSshPort(),byId.getUsername(), CryptogramUtil.doDecrypt(byId.getPassword()),null,file.getAbsolutePath(),"/home/casic/706/ai_test_527",file.getName());
// } catch (SftpUploadUtil.SftpUploadException e) {
// log.error("file upload failed",e);
// toBadEnding();
// }
// append(context,"file uploaded to "+byId.getMachineHost()+" /home/casic/706/ai_test_527");
// }
}
}

View File

@@ -1,5 +1,6 @@
package cd.casic.ci.process.engine.worker;
import cd.casic.ci.common.pipeline.annotation.Plugin;
import cd.casic.ci.process.engine.runContext.TaskRunContext;
import cd.casic.ci.process.engine.worker.base.BaseWorker;
import cd.casic.ci.process.process.dataObject.base.PipBaseElement;
@@ -8,7 +9,7 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
//@Plugin(taskType = "test")
@Plugin(taskType = "GIT")
public class TestWorker extends BaseWorker {

View File

@@ -68,6 +68,7 @@ public abstract class BaseWorker implements Runnable{
} catch (Exception e) {
log.error("================worker execution error",e);
taskRunContext.changeContextState(ContextStateEnum.BAD_ENDING);
append(context,e.getMessage());
return;
}
// TODO when execution finishes, update the context state and notify the parent

View File

@@ -2,7 +2,7 @@ package cd.casic.ci.process.engine.worker.base;
import cd.casic.ci.process.engine.enums.ContextStateEnum;
public abstract class PassableWorker extends BaseWorker{
public interface PassableWorker{
/**
* this method is used for blocking
* */

View File

@@ -190,7 +190,15 @@ public class SshCommand implements SshClient {
} else {
realCmd = cmd;
}
commander.append(realCmd).append(CommandConstant.ENTER);
for (String s : realCmd.split("\n")) {
commander.append(s).append(CommandConstant.ENTER);
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
commander.append("exit").append(CommandConstant.ENTER);
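
Note on the new loop: instead of appending the whole multi-line command at once, each line is now sent with its own ENTER and a one-second pause, so the remote shell can finish one command before the next arrives. A stand-alone sketch of the same pacing against an ordinary OutputStream (System.out and the ENTER constant are stand-ins for the SSH channel stream and CommandConstant.ENTER):

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Stand-alone sketch of the line-by-line sending loop: each script line is
// written separately, followed by a short pause, rather than pushing the whole
// script into the channel in one write.
public class LineByLineSendDemo {
    private static final String ENTER = "\n"; // assumption: CommandConstant.ENTER is a newline

    public static void main(String[] args) throws IOException, InterruptedException {
        String realCmd = "cd /opt/app\n./run.sh --check\necho done";
        OutputStream channel = System.out; // stand-in for the SSH session's stream

        for (String line : realCmd.split("\n")) {
            channel.write((line + ENTER).getBytes(StandardCharsets.UTF_8));
            channel.flush();
            Thread.sleep(1000L); // same pacing as the diff
        }
        channel.write(("exit" + ENTER).getBytes(StandardCharsets.UTF_8));
        channel.flush();
    }
}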

View File

@@ -97,4 +97,26 @@
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<annotationProcessorPaths>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</path>
<path>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct-processor</artifactId>
<version>${mapstruct.version}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
</plugins>
</build>
</project>
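
Note on the added annotationProcessorPaths: listing lombok ahead of mapstruct-processor lets MapStruct generate mappers against the accessors Lombok produces. An illustrative Java sketch of the kind of code this enables (class names are made up; newer Lombok versions may also need the lombok-mapstruct-binding artifact):

import lombok.Data;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;

// Illustrative only: Lombok generates the getters/setters, and MapStruct's
// processor then generates a UserConvert implementation that copies matching
// fields between the two classes without any hand-written accessors.
@Data
class UserDO { private String id; private String name; }

@Data
class UserVO { private String id; private String name; }

@Mapper
interface UserConvert {
    UserConvert INSTANCE = Mappers.getMapper(UserConvert.class);
    UserVO toVO(UserDO source);
}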

View File

@@ -1,5 +1,6 @@
package cd.casic.module.system.service.dict;
import cd.casic.framework.commons.util.collection.CollectionUtils;
import cd.casic.module.system.controller.admin.dict.vo.data.DictDataRespVO;
import cd.casic.module.system.controller.admin.dict.vo.data.DictDataTreeVO;
import cd.casic.module.system.convert.dict.DictDataConvert;
@@ -7,7 +8,6 @@ import cd.casic.module.system.convert.dict.DictTypeConvert;
import cn.hutool.core.collection.CollUtil;
import cd.casic.framework.commons.enums.CommonStatusEnum;
import cd.casic.framework.commons.pojo.PageResult;
import cd.casic.framework.commons.util.collection.CollectionUtils;
import cd.casic.framework.commons.util.object.BeanUtils;
import cd.casic.module.system.controller.admin.dict.vo.data.DictDataPageReqVO;
import cd.casic.module.system.controller.admin.dict.vo.data.DictDataSaveReqVO;
@@ -192,7 +192,9 @@ public class DictDataServiceImpl implements DictDataService {
if (CollUtil.isEmpty(list)) {
return dataTree;
}
if (org.springframework.util.CollectionUtils.isEmpty(list)) {
return new ArrayList<>();
}
Map<String, List<DictDataRespVO>> map = list.stream().collect(Collectors.groupingBy(DictDataRespVO::getDictType));
for (DictDataTreeVO treeVO : dataTree) {
treeVO.setChildren(map.get(treeVO.getType()));

View File

@@ -0,0 +1,6 @@
package cd.casic.server;
public class LogTest {
public static void main(String[] args) {
}
}

File diff suppressed because one or more lines are too long