Merge branch 'temp' of http://1.14.125.6:3000/mianbin/ops-pro into temp
# Conflicts: # modules/module-ci-process-biz/src/main/java/cd/casic/ci/process/engine/scheduler/PipelineSchedulingBootstrapper.java
This commit is contained in:
commit
d7a852d556
@ -1,14 +1,15 @@
|
||||
package cd.casic.ci.process.engine.scheduler;
|
||||
|
||||
import cd.casic.ci.process.engine.enums.ContextStateEnum;
|
||||
import cd.casic.ci.process.engine.executor.PipelineExecutor;
|
||||
import cd.casic.ci.process.engine.scheduler.config.PipelineSchedulerConfig;
|
||||
import cd.casic.ci.process.engine.scheduler.config.QuartzSchedulerManager;
|
||||
import cd.casic.ci.process.engine.scheduler.dao.PipelineSchedulingPropertiesDao;
|
||||
import cd.casic.ci.process.engine.scheduler.dateObject.PipelineSchedulingProperties;
|
||||
import cd.casic.ci.process.engine.scheduler.service.impl.PipelineSchedulingPropertiesServiceImpl;
|
||||
import cd.casic.ci.process.engine.scheduler.req.PipelineSchedulingReq;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@ -16,29 +17,32 @@ import java.util.List;
|
||||
* @author HopeLi
|
||||
* @version v1.0
|
||||
* @ClassName PipelineSchedulingBootstrapper
|
||||
* @Date: 2025/5/26 17:28
|
||||
* @Date: 2025/5/27 17:46
|
||||
* @Description:
|
||||
*/
|
||||
//@Component
|
||||
//@RequiredArgsConstructor
|
||||
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class PipelineSchedulingBootstrapper {
|
||||
@Resource
|
||||
private PipelineSchedulingPropertiesServiceImpl taskService;
|
||||
private final QuartzSchedulerManager quartzSchedulerManager;
|
||||
private final PipelineExecutor pipelineExecutor;
|
||||
|
||||
@Resource
|
||||
private PipelineSchedulerConfig pipelineSchedulerConfig;
|
||||
|
||||
@Resource
|
||||
private PipelineExecutor pipelineExecutor;
|
||||
private PipelineSchedulingPropertiesDao pipelineSchedulingPropertiesDao;
|
||||
|
||||
@PostConstruct
|
||||
public void init(){
|
||||
List<PipelineSchedulingProperties> tasks = taskService.getAllTasks();
|
||||
List<PipelineSchedulingProperties> tasks = pipelineSchedulingPropertiesDao.selectList();
|
||||
if (!CollectionUtils.isEmpty(tasks)){
|
||||
for (PipelineSchedulingProperties task : tasks) {
|
||||
if (ContextStateEnum.RUNNING.getCode().equals(Integer.parseInt(task.getStatus()))) {
|
||||
pipelineSchedulerConfig.addTask(task.getPipelineId(), ()->{
|
||||
pipelineExecutor.execute(task.getPipelineId());
|
||||
}, task.getCronExpression());
|
||||
try {
|
||||
PipelineSchedulingReq req = new PipelineSchedulingReq();
|
||||
req.setPipelineId(task.getPipelineId());
|
||||
req.setCron(task.getCron());
|
||||
quartzSchedulerManager.addTask(req);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,150 +0,0 @@
|
||||
package cd.casic.ci.process.engine.scheduler.config;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
import org.springframework.scheduling.annotation.SchedulingConfigurer;
|
||||
import org.springframework.scheduling.config.CronTask;
|
||||
import org.springframework.scheduling.config.ScheduledTask;
|
||||
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
|
||||
/**
|
||||
* @author HopeLi
|
||||
* @version v1.0
|
||||
* @ClassName PipelineSchedulerConfig
|
||||
* @Date: 2025/5/26 10:55
|
||||
* @Description:
|
||||
*/
|
||||
|
||||
@Configuration
|
||||
@EnableScheduling
|
||||
@Slf4j
|
||||
@Component("pipelineSchedulerConfig")
|
||||
public class PipelineSchedulerConfig implements SchedulingConfigurer {
|
||||
private ScheduledTaskRegistrar taskRegistrar;
|
||||
|
||||
private final Map<String, ScheduledTask> taskFutures = new HashMap<>();
|
||||
|
||||
private final Map<String, CronTask> cronTaskMap = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public void configureTasks(@NotNull ScheduledTaskRegistrar taskRegistrar) {
|
||||
this.taskRegistrar = taskRegistrar;
|
||||
log.info("ScheduledTaskRegistrar 初始化完成");
|
||||
}
|
||||
|
||||
|
||||
public void addTask(String taskId, Runnable task, String cronExpression) {
|
||||
if (taskRegistrar == null) {
|
||||
log.error("【定时任务失败】taskRegistrar 尚未初始化,请检查调度器是否已启动");
|
||||
return;
|
||||
}
|
||||
|
||||
CronTask cronTask = new CronTask(task, cronExpression);
|
||||
ScheduledTask scheduledTask = taskRegistrar.scheduleCronTask(cronTask);
|
||||
taskFutures.put(taskId, scheduledTask);
|
||||
cronTaskMap.put(taskId, cronTask);
|
||||
|
||||
log.info("任务 [{}] 已添加,cron: {}", taskId, cronExpression);
|
||||
}
|
||||
|
||||
public void removeTask(String taskId) {
|
||||
ScheduledTask scheduledTask = taskFutures.get(taskId);
|
||||
if (scheduledTask != null) {
|
||||
ScheduledFuture<?> future = getFuture(scheduledTask);
|
||||
if (future != null) {
|
||||
future.cancel(true);
|
||||
}
|
||||
taskFutures.remove(taskId);
|
||||
cronTaskMap.remove(taskId);
|
||||
|
||||
log.info("任务 [{}] 已删除", taskId);
|
||||
}
|
||||
}
|
||||
|
||||
public void updateTask(String taskId, String cronExpression) {
|
||||
removeTask(taskId);
|
||||
CronTask oldTask = cronTaskMap.get(taskId);
|
||||
if (oldTask == null) {
|
||||
throw new IllegalArgumentException("未找到对应的任务: " + taskId);
|
||||
}
|
||||
addTask(taskId, oldTask.getRunnable(), cronExpression);
|
||||
log.info("任务 [{}] 已更新,新 cron: {}", taskId, cronExpression);
|
||||
}
|
||||
|
||||
public void startTask(String taskId) {
|
||||
ScheduledTask scheduledTask = taskFutures.get(taskId);
|
||||
if (scheduledTask == null) {
|
||||
log.warn("任务 [{}] 不存在", taskId);
|
||||
return;
|
||||
}
|
||||
|
||||
ScheduledFuture<?> future = getFuture(scheduledTask);
|
||||
if (future != null && !future.isCancelled()) {
|
||||
log.info("任务 [{}] 正在运行中,无需重启", taskId);
|
||||
return;
|
||||
}
|
||||
|
||||
CronTask cronTask = cronTaskMap.get(taskId);
|
||||
if (cronTask == null) {
|
||||
log.warn("未找到对应的任务定义: {}", taskId);
|
||||
return;
|
||||
}
|
||||
|
||||
ScheduledTask newTask = taskRegistrar.scheduleCronTask(cronTask);
|
||||
taskFutures.put(taskId, newTask);
|
||||
log.info("任务 [{}] 已重启", taskId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 停止一个任务
|
||||
*/
|
||||
public void stopTask(String taskId) {
|
||||
removeTask(taskId);
|
||||
log.info("任务 [{}] 已停止", taskId);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 获取 ScheduledTask 中的 future 字段
|
||||
*/
|
||||
private ScheduledFuture<?> getFuture(ScheduledTask scheduledTask) {
|
||||
try {
|
||||
Field field = scheduledTask.getClass().getDeclaredField("future");
|
||||
field.setAccessible(true);
|
||||
return (ScheduledFuture<?>) field.get(scheduledTask);
|
||||
} catch (Exception e) {
|
||||
log.error("反射获取 future 失败: {}", e.getMessage());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 判断任务是否已取消
|
||||
*/
|
||||
public boolean isTaskCancelled(String taskId) {
|
||||
ScheduledTask scheduledTask = taskFutures.get(taskId);
|
||||
if (scheduledTask == null) return true;
|
||||
|
||||
ScheduledFuture<?> future = getFuture(scheduledTask);
|
||||
return future != null && future.isCancelled();
|
||||
}
|
||||
|
||||
// @Component
|
||||
// public static class PipelineSchedulerTask implements Runnable {
|
||||
//
|
||||
// @Override
|
||||
// public void run() {
|
||||
// // 定时任务逻辑代码
|
||||
// System.out.println("Pipeline Task: " + new Date());
|
||||
// }
|
||||
// }
|
||||
}
|
@ -0,0 +1,39 @@
|
||||
package cd.casic.ci.process.engine.scheduler.config;
|
||||
|
||||
import org.quartz.Scheduler;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
|
||||
import org.springframework.scheduling.quartz.SpringBeanJobFactory;
|
||||
|
||||
/**
|
||||
* @author HopeLi
|
||||
* @version v1.0
|
||||
* @ClassName QuartzConfig
|
||||
* @Date: 2025/5/27 21:16
|
||||
* @Description:
|
||||
*/
|
||||
|
||||
@Configuration
|
||||
public class QuartzConfig {
|
||||
private final ApplicationContext applicationContext;
|
||||
|
||||
public QuartzConfig(ApplicationContext applicationContext) {
|
||||
this.applicationContext = applicationContext;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Scheduler scheduler() throws Exception {
|
||||
SpringBeanJobFactory jobFactory = new SpringBeanJobFactory();
|
||||
jobFactory.setApplicationContext(applicationContext);
|
||||
|
||||
SchedulerFactoryBean factory = new SchedulerFactoryBean();
|
||||
factory.setJobFactory(jobFactory);
|
||||
factory.afterPropertiesSet();
|
||||
|
||||
Scheduler scheduler = factory.getScheduler();
|
||||
scheduler.start();
|
||||
return scheduler;
|
||||
}
|
||||
}
|
@ -0,0 +1,155 @@
|
||||
package cd.casic.ci.process.engine.scheduler.config;
|
||||
|
||||
import cd.casic.ci.process.engine.scheduler.dao.PipelineSchedulingPropertiesDao;
|
||||
import cd.casic.ci.process.engine.scheduler.dateObject.PipelineSchedulingProperties;
|
||||
import cd.casic.ci.process.engine.scheduler.job.PipelineSchedulingJob;
|
||||
import cd.casic.ci.process.engine.scheduler.req.PipelineSchedulingReq;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.quartz.*;
|
||||
import org.quartz.impl.matchers.GroupMatcher;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.quartz.JobBuilder.newJob;
|
||||
import static org.quartz.TriggerBuilder.newTrigger;
|
||||
|
||||
/**
|
||||
* @author HopeLi
|
||||
* @version v1.0
|
||||
* @ClassName QuartzSchedulerManager
|
||||
* @Date: 2025/5/27 17:31
|
||||
* @Description:
|
||||
*/
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
public class QuartzSchedulerManager {
|
||||
private static final String JOB_GROUP_NAME = "PIPELINE_JOB_GROUP";
|
||||
private static final String TRIGGER_GROUP_NAME = "PIPELINE_TRIGGER_GROUP";
|
||||
|
||||
@Resource
|
||||
private Scheduler scheduler;
|
||||
|
||||
@Resource
|
||||
private PipelineSchedulingPropertiesDao pipelineSchedulingPropertiesDao;
|
||||
|
||||
@PostConstruct
|
||||
public void init() throws SchedulerException {
|
||||
log.info("Quartz 调度器初始化完成");
|
||||
}
|
||||
|
||||
/**
|
||||
* 添加一个定时任务
|
||||
*/
|
||||
public void addTask(PipelineSchedulingReq req) throws Exception {
|
||||
JobDetail jobDetail = newJob(PipelineSchedulingJob.class)
|
||||
.withIdentity(req.getPipelineId(), JOB_GROUP_NAME)
|
||||
.usingJobData(PipelineSchedulingJob.JOB_PARAM_PIPELINE_ID, req.getPipelineId())
|
||||
.build();
|
||||
|
||||
Trigger trigger = newTrigger()
|
||||
.withIdentity(req.getPipelineId(), TRIGGER_GROUP_NAME)
|
||||
.withSchedule(CronScheduleBuilder.cronSchedule(req.getCron()))
|
||||
.build();
|
||||
|
||||
scheduler.scheduleJob(jobDetail, trigger);
|
||||
log.info("任务 [{}] 已添加,cron: {}", req.getPipelineId(), req.getCron());
|
||||
|
||||
QueryWrapper<PipelineSchedulingProperties> wrapper = new QueryWrapper<>();
|
||||
wrapper.eq("pipeline_id", req.getPipelineId());
|
||||
wrapper.eq("cron", req.getCron());
|
||||
List<PipelineSchedulingProperties> list = pipelineSchedulingPropertiesDao.selectList(wrapper);
|
||||
|
||||
//为空则数据入库,否则仅为重新添加任务进调度器
|
||||
if (!CollectionUtils.isEmpty(list)){
|
||||
PipelineSchedulingProperties pipelineSchedulingProperties = new PipelineSchedulingProperties();
|
||||
pipelineSchedulingProperties.setPipelineId(req.getPipelineId());
|
||||
pipelineSchedulingProperties.setCron(req.getCron());
|
||||
pipelineSchedulingPropertiesDao.insert(pipelineSchedulingProperties);
|
||||
// pipelineSchedulingProperties.setStatus(PipelineSchedulingStatusEnum.READY.getCode());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除任务
|
||||
*/
|
||||
public void removeTask(PipelineSchedulingReq req) throws SchedulerException {
|
||||
TriggerKey triggerKey = TriggerKey.triggerKey(req.getPipelineId(), TRIGGER_GROUP_NAME);
|
||||
scheduler.pauseTrigger(triggerKey);
|
||||
scheduler.unscheduleJob(triggerKey);
|
||||
scheduler.deleteJob(JobKey.jobKey(req.getPipelineId(), JOB_GROUP_NAME));
|
||||
log.info("任务 [{}] 已删除", req.getPipelineId());
|
||||
|
||||
//数据删除
|
||||
QueryWrapper<PipelineSchedulingProperties> wrapper = new QueryWrapper<>();
|
||||
wrapper.eq("pipeline_id", req.getPipelineId());
|
||||
wrapper.eq("cron", req.getCron());
|
||||
List<PipelineSchedulingProperties> list = pipelineSchedulingPropertiesDao.selectList(wrapper);
|
||||
|
||||
if (!CollectionUtils.isEmpty(list)){
|
||||
List<String> idList = list.stream().map(PipelineSchedulingProperties::getId).toList();
|
||||
pipelineSchedulingPropertiesDao.deleteByIds(idList);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新任务
|
||||
*/
|
||||
public void updateTask(PipelineSchedulingReq req) throws Exception {
|
||||
removeTask(req);
|
||||
addTask(req);
|
||||
log.info("任务 [{}] 已更新,新 cron: {}", req.getPipelineId(), req.getCron());
|
||||
}
|
||||
|
||||
/**
|
||||
* 启动/重启任务
|
||||
*/
|
||||
public void startTask(PipelineSchedulingReq req) throws SchedulerException {
|
||||
JobKey jobKey = JobKey.jobKey(req.getPipelineId(), JOB_GROUP_NAME);
|
||||
if (!scheduler.checkExists(jobKey)) {
|
||||
log.warn("任务 [{}] 不存在", req.getPipelineId());
|
||||
return;
|
||||
}
|
||||
|
||||
if (scheduler.isShutdown()) {
|
||||
scheduler.start();
|
||||
}
|
||||
|
||||
List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
|
||||
for (Trigger trigger : triggers) {
|
||||
if (trigger.getNextFireTime() == null) {
|
||||
scheduler.rescheduleJob(TriggerKey.triggerKey(req.getPipelineId(), TRIGGER_GROUP_NAME), trigger);
|
||||
}
|
||||
}
|
||||
|
||||
log.info("任务 [{}] 已重启", req.getPipelineId());
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断任务是否已停止
|
||||
*/
|
||||
public boolean isTaskPaused(PipelineSchedulingReq req) throws SchedulerException {
|
||||
JobKey jobKey = JobKey.jobKey(req.getPipelineId(), JOB_GROUP_NAME);
|
||||
return !scheduler.isStarted() || scheduler.getJobDetail(jobKey) == null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取所有任务 ID
|
||||
*/
|
||||
public Set<String> getAllTaskIds() throws SchedulerException {
|
||||
Set<JobKey> jobKeys = scheduler.getJobKeys(GroupMatcher.jobGroupEquals(JOB_GROUP_NAME));
|
||||
Set<String> taskIds = new HashSet<>();
|
||||
for (JobKey jobKey : jobKeys) {
|
||||
taskIds.add(jobKey.getName());
|
||||
}
|
||||
return taskIds;
|
||||
}
|
||||
}
|
@ -0,0 +1,52 @@
|
||||
package cd.casic.ci.process.engine.scheduler.controller;
|
||||
|
||||
import cd.casic.ci.process.engine.scheduler.config.QuartzSchedulerManager;
|
||||
import cd.casic.ci.process.engine.scheduler.req.PipelineSchedulingReq;
|
||||
import cd.casic.framework.commons.pojo.CommonResult;
|
||||
import jakarta.annotation.Resource;
|
||||
import jakarta.validation.Valid;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
/**
|
||||
* @author HopeLi
|
||||
* @version v1.0
|
||||
* @ClassName PipelineTaskController
|
||||
* @Date: 2025/5/27 19:18
|
||||
* @Description:
|
||||
*/
|
||||
|
||||
|
||||
@RestController
|
||||
@RequestMapping("/scheduler")
|
||||
public class PipelineSchedulingController {
|
||||
|
||||
@Resource
|
||||
private QuartzSchedulerManager quartzSchedulerManager;
|
||||
|
||||
@PostMapping("/add")
|
||||
public CommonResult<Void> add(@RequestBody @Valid PipelineSchedulingReq req) throws Exception {
|
||||
quartzSchedulerManager.addTask(req);
|
||||
return CommonResult.success();
|
||||
}
|
||||
|
||||
@PostMapping("/remove")
|
||||
public CommonResult<Void> remove(@RequestBody @Valid PipelineSchedulingReq req) throws Exception {
|
||||
quartzSchedulerManager.removeTask(req);
|
||||
return CommonResult.success();
|
||||
}
|
||||
|
||||
@PostMapping("/update")
|
||||
public CommonResult<Void> update(@RequestBody @Valid PipelineSchedulingReq req) throws Exception {
|
||||
quartzSchedulerManager.updateTask(req);
|
||||
return CommonResult.success();
|
||||
}
|
||||
|
||||
@PostMapping("/start")
|
||||
public CommonResult<Void> start(@RequestBody @Valid PipelineSchedulingReq req) throws Exception {
|
||||
quartzSchedulerManager.startTask(req);
|
||||
return CommonResult.success();
|
||||
}
|
||||
}
|
@ -1,54 +0,0 @@
|
||||
package cd.casic.ci.process.engine.scheduler.controller;
|
||||
|
||||
import cd.casic.ci.process.engine.scheduler.dateObject.PipelineSchedulingProperties;
|
||||
import cd.casic.ci.process.engine.scheduler.service.PipelineSchedulingPropertiesService;
|
||||
import cd.casic.ci.process.process.dataObject.base.BaseIdReq;
|
||||
import jakarta.annotation.Resource;
|
||||
import jakarta.validation.Valid;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author HopeLi
|
||||
* @version v1.0
|
||||
* @ClassName PipelineSchedulingPropertiesController
|
||||
* @Date: 2025/5/26 16:56
|
||||
* @Description:
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("/scheduling")
|
||||
public class PipelineSchedulingPropertiesController {
|
||||
@Resource
|
||||
private PipelineSchedulingPropertiesService pipelineSchedulingPropertiesService;
|
||||
|
||||
@PostMapping("/getAll")
|
||||
public List<PipelineSchedulingProperties> getAll() {
|
||||
return pipelineSchedulingPropertiesService.getAllTasks();
|
||||
}
|
||||
|
||||
@PostMapping("/create")
|
||||
public void create(@RequestBody @Valid PipelineSchedulingProperties task) {
|
||||
pipelineSchedulingPropertiesService.addTask(task);
|
||||
}
|
||||
|
||||
@PostMapping("/update")
|
||||
public void update(@RequestBody @Valid PipelineSchedulingProperties task) {
|
||||
pipelineSchedulingPropertiesService.updateTask(task);
|
||||
}
|
||||
|
||||
@PostMapping("/delete")
|
||||
public void delete(@RequestBody @Valid BaseIdReq req) {
|
||||
pipelineSchedulingPropertiesService.deleteTask(req.getId());
|
||||
}
|
||||
|
||||
@PostMapping("/start")
|
||||
public void start(@RequestBody @Valid BaseIdReq req) {
|
||||
pipelineSchedulingPropertiesService.startTask(req.getId());
|
||||
}
|
||||
|
||||
@PostMapping("/stop")
|
||||
public void stop(@RequestBody @Valid BaseIdReq req) {
|
||||
pipelineSchedulingPropertiesService.stopTask(req.getId());
|
||||
}
|
||||
}
|
@ -9,7 +9,7 @@ import lombok.EqualsAndHashCode;
|
||||
* @author HopeLi
|
||||
* @version v1.0
|
||||
* @ClassName PipelineSchedulingProperties
|
||||
* @Date: 2025/5/26 16:27
|
||||
* @Date: 2025/5/27 19:57
|
||||
* @Description:
|
||||
*/
|
||||
|
||||
@ -22,15 +22,10 @@ public class PipelineSchedulingProperties extends PipBaseElement {
|
||||
*/
|
||||
private String pipelineId;
|
||||
|
||||
/**
|
||||
* 任务名称
|
||||
*/
|
||||
private String pipelineName;
|
||||
|
||||
/**
|
||||
* 定时表达式
|
||||
*/
|
||||
private String cronExpression;
|
||||
private String cron;
|
||||
|
||||
/**
|
||||
* 执行状态
|
||||
|
@ -0,0 +1,56 @@
|
||||
package cd.casic.ci.process.engine.scheduler.enums;
|
||||
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.EnumMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
@Getter
|
||||
public enum PipelineSchedulingStatusEnum {
|
||||
INIT("0","初始化"),
|
||||
READY("1","就绪"),
|
||||
RUNNING("2","运行"),
|
||||
SUSPEND("3","挂起"),
|
||||
STOP("-1","停止"),
|
||||
HAPPY_ENDING("4","执行成功"),
|
||||
BAD_ENDING("5","执行失败")
|
||||
;
|
||||
|
||||
private String code;
|
||||
private String msg;
|
||||
private static final Map<PipelineSchedulingStatusEnum, Set<PipelineSchedulingStatusEnum>> TRANSITIONS = new EnumMap<>(PipelineSchedulingStatusEnum.class);
|
||||
|
||||
static {
|
||||
TRANSITIONS.put(INIT, Set.of(READY, SUSPEND, BAD_ENDING, STOP));
|
||||
TRANSITIONS.put(READY, Set.of(READY,RUNNING, SUSPEND, BAD_ENDING, STOP));
|
||||
TRANSITIONS.put(RUNNING, Set.of(RUNNING,SUSPEND, HAPPY_ENDING, BAD_ENDING, STOP));
|
||||
TRANSITIONS.put(SUSPEND, Set.of(SUSPEND,INIT, READY, BAD_ENDING, RUNNING,STOP));
|
||||
//...初始化其他状态转移关系
|
||||
}
|
||||
|
||||
PipelineSchedulingStatusEnum(String code, String msg) {
|
||||
this.code = code;
|
||||
this.msg = msg;
|
||||
|
||||
}
|
||||
public static Boolean canGoto(PipelineSchedulingStatusEnum from, PipelineSchedulingStatusEnum to){
|
||||
try {
|
||||
if (Objects.isNull(from) || Objects.isNull(to)) {
|
||||
return false;
|
||||
}
|
||||
return TRANSITIONS.get(from).contains(to);
|
||||
} catch (Exception e){
|
||||
return false;
|
||||
}
|
||||
}
|
||||
public static PipelineSchedulingStatusEnum getByCode(String code){
|
||||
for (PipelineSchedulingStatusEnum value : values()) {
|
||||
if (value.getCode().equals(code)) {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
@ -0,0 +1,32 @@
|
||||
package cd.casic.ci.process.engine.scheduler.job;
|
||||
|
||||
import cd.casic.ci.process.engine.executor.PipelineExecutor;
|
||||
import jakarta.annotation.Resource;
|
||||
import org.quartz.Job;
|
||||
import org.quartz.JobDataMap;
|
||||
import org.quartz.JobExecutionContext;
|
||||
import org.quartz.JobExecutionException;
|
||||
|
||||
/**
|
||||
* @author HopeLi
|
||||
* @version v1.0
|
||||
* @ClassName PipelineTaskJob
|
||||
* @Date: 2025/5/27 17:28
|
||||
* @Description:
|
||||
*/
|
||||
public class PipelineSchedulingJob implements Job {
|
||||
public static final String JOB_PARAM_PIPELINE_ID = "pipelineId";
|
||||
|
||||
@Resource
|
||||
private PipelineExecutor executor;
|
||||
|
||||
@Override
|
||||
public void execute(JobExecutionContext context) throws JobExecutionException {
|
||||
JobDataMap dataMap = context.getMergedJobDataMap();
|
||||
String pipelineId = dataMap.getString(JOB_PARAM_PIPELINE_ID);
|
||||
|
||||
if (pipelineId != null && executor != null) {
|
||||
executor.execute(pipelineId);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,30 @@
|
||||
package cd.casic.ci.process.engine.scheduler.listener;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.quartz.Scheduler;
|
||||
import org.quartz.SchedulerException;
|
||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author HopeLi
|
||||
* @version v1.0
|
||||
* @ClassName QuartzStartupListener
|
||||
* @Date: 2025/5/27 20:42
|
||||
* @Description:
|
||||
*/
|
||||
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class QuartzStartupListener {
|
||||
private final Scheduler scheduler;
|
||||
|
||||
@EventListener(ApplicationReadyEvent.class)
|
||||
public void onApplicationReady() throws SchedulerException {
|
||||
if (!scheduler.isStarted()) {
|
||||
scheduler.start();
|
||||
System.out.println("Quartz 调度器已手动启动");
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,20 @@
|
||||
package cd.casic.ci.process.engine.scheduler.req;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* @author HopeLi
|
||||
* @version v1.0
|
||||
* @ClassName PipelineSchedulingReq
|
||||
* @Date: 2025/5/27 19:24
|
||||
* @Description:
|
||||
*/
|
||||
@Data
|
||||
public class PipelineSchedulingReq {
|
||||
|
||||
private String pipelineId;
|
||||
|
||||
private String cron;
|
||||
|
||||
private String status;
|
||||
}
|
@ -1,27 +0,0 @@
|
||||
package cd.casic.ci.process.engine.scheduler.service;
|
||||
|
||||
import cd.casic.ci.process.engine.scheduler.dateObject.PipelineSchedulingProperties;
|
||||
import jakarta.validation.Valid;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author HopeLi
|
||||
* @version v1.0
|
||||
* @ClassName PipelineSchedulingPropertiesService
|
||||
* @Date: 2025/5/26 16:48
|
||||
* @Description:
|
||||
*/
|
||||
public interface PipelineSchedulingPropertiesService {
|
||||
List<PipelineSchedulingProperties> getAllTasks();
|
||||
|
||||
void addTask(@Valid PipelineSchedulingProperties task);
|
||||
|
||||
void updateTask(@Valid PipelineSchedulingProperties task);
|
||||
|
||||
void deleteTask(String id);
|
||||
|
||||
void startTask(String id);
|
||||
|
||||
void stopTask(String id);
|
||||
}
|
@ -1,71 +0,0 @@
|
||||
package cd.casic.ci.process.engine.scheduler.service.impl;
|
||||
|
||||
import cd.casic.ci.process.engine.enums.ContextStateEnum;
|
||||
import cd.casic.ci.process.engine.scheduler.config.PipelineSchedulerConfig;
|
||||
import cd.casic.ci.process.engine.scheduler.dao.PipelineSchedulingPropertiesDao;
|
||||
import cd.casic.ci.process.engine.scheduler.dateObject.PipelineSchedulingProperties;
|
||||
import cd.casic.ci.process.engine.scheduler.service.PipelineSchedulingPropertiesService;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author HopeLi
|
||||
* @version v1.0
|
||||
* @ClassName PipelineSchedulingPropertiesServiceImpl
|
||||
* @Date: 2025/5/26 16:49
|
||||
* @Description:
|
||||
*/
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
public class PipelineSchedulingPropertiesServiceImpl extends ServiceImpl<PipelineSchedulingPropertiesDao, PipelineSchedulingProperties> implements PipelineSchedulingPropertiesService {
|
||||
|
||||
@Resource
|
||||
private PipelineSchedulingPropertiesDao taskRepository;
|
||||
|
||||
@Resource
|
||||
private PipelineSchedulerConfig pipelineSchedulerConfig;
|
||||
|
||||
public List<PipelineSchedulingProperties> getAllTasks() {
|
||||
return taskRepository.selectList(null);
|
||||
}
|
||||
|
||||
public void addTask(PipelineSchedulingProperties task) {
|
||||
if (ContextStateEnum.RUNNING.getCode() == Integer.parseInt(task.getStatus())) {
|
||||
pipelineSchedulerConfig.addTask(task.getPipelineId(), () -> {
|
||||
},task.getCronExpression());
|
||||
}
|
||||
taskRepository.insert(task);
|
||||
}
|
||||
|
||||
public void updateTask(PipelineSchedulingProperties task) {
|
||||
PipelineSchedulingProperties old = taskRepository.selectById(task.getId());
|
||||
if (old != null) {
|
||||
pipelineSchedulerConfig.updateTask(old.getPipelineId(), task.getCronExpression());
|
||||
}
|
||||
taskRepository.updateById(task);
|
||||
}
|
||||
|
||||
public void deleteTask(String taskId) {
|
||||
pipelineSchedulerConfig.removeTask(taskId);
|
||||
taskRepository.delete(new QueryWrapper<PipelineSchedulingProperties>().eq("task_id", taskId));
|
||||
}
|
||||
|
||||
public void startTask(String taskId) {
|
||||
pipelineSchedulerConfig.startTask(taskId);
|
||||
taskRepository.update(null, new UpdateWrapper<PipelineSchedulingProperties>()
|
||||
.set("status", String.valueOf(ContextStateEnum.RUNNING.getCode())).eq("task_id", taskId));
|
||||
}
|
||||
|
||||
public void stopTask(String taskId) {
|
||||
pipelineSchedulerConfig.stopTask(taskId);
|
||||
taskRepository.update(null, new UpdateWrapper<PipelineSchedulingProperties>()
|
||||
.set("status", String.valueOf(ContextStateEnum.STOP.getCode())).eq("task_id", taskId));
|
||||
}
|
||||
}
|
@ -1,19 +0,0 @@
|
||||
package cd.casic.ci.process.engine.scheduler.trigger;
|
||||
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
|
||||
/**
|
||||
* @author HopeLi
|
||||
* @version v1.0
|
||||
* @ClassName SchedulingTrigger
|
||||
* @Date: 2025/5/27 15:05
|
||||
* @Description:
|
||||
*/
|
||||
@Configuration
|
||||
public class SchedulingTrigger {
|
||||
@Scheduled(fixedRate = 5000)
|
||||
public void triggerScheduler() {
|
||||
// 用于触发调度器初始化
|
||||
}
|
||||
}
|
@ -4,7 +4,6 @@ import cd.casic.ci.common.pipeline.req.pipeline.PipelineCreateReq;
|
||||
import cd.casic.ci.common.pipeline.req.pipeline.PipelineQueryReq;
|
||||
import cd.casic.ci.common.pipeline.req.pipeline.PipelineReq;
|
||||
import cd.casic.ci.common.pipeline.req.pipeline.PipelineUpdateReq;
|
||||
import cd.casic.ci.common.pipeline.resp.context.SingletonRunContextResp;
|
||||
import cd.casic.ci.common.pipeline.resp.context.TreeRunContextResp;
|
||||
import cd.casic.ci.common.pipeline.resp.pipeline.PipelineFindResp;
|
||||
import cd.casic.ci.common.pipeline.resp.stage.StageResp;
|
||||
@ -143,7 +142,7 @@ public class PipelineServiceImpl extends ServiceImpl<PipelineDao, PipPipeline> i
|
||||
childTask11.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
|
||||
childTask11.setTaskName("串行任务-1-1");
|
||||
childTask11.setPipelineId(pipeline.getId());
|
||||
childTask11.setTaskType("test");
|
||||
childTask11.setTaskType("code");
|
||||
childTask11.setTaskSort(1);
|
||||
childTask11.setStageId(childStage1.getId());
|
||||
taskService.save(childTask11);
|
||||
@ -176,31 +175,31 @@ public class PipelineServiceImpl extends ServiceImpl<PipelineDao, PipPipeline> i
|
||||
childTask21.setTaskName("串行任务-2-1");
|
||||
childTask21.setCreateTime(LocalDateTime.now());
|
||||
childTask21.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
|
||||
childTask21.setTaskType("test");
|
||||
childTask21.setTaskType("TEST_CASE_GENERATION");
|
||||
childTask21.setTaskSort(1);
|
||||
childTask21.setStageId(childStage21.getId());
|
||||
taskService.save(childTask21);
|
||||
|
||||
//------------------------------------------------------------------
|
||||
PipStage childStage22 = new PipStage();
|
||||
childStage22.setStageName("并行阶段-2-2");
|
||||
childStage22.setPipelineId(pipeline.getId());
|
||||
childStage22.setCreateTime(LocalDateTime.now());
|
||||
childStage22.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
|
||||
childStage22.setStageSort(2);
|
||||
childStage22.setCode(false);
|
||||
childStage22.setParentId(stageReq2.getId());
|
||||
stageService.save(childStage22);
|
||||
|
||||
PipTask childTask22 = new PipTask();
|
||||
childTask22.setCreateTime(LocalDateTime.now());
|
||||
childTask22.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
|
||||
childTask22.setTaskName("串行任务-2-2");
|
||||
childTask22.setPipelineId(pipeline.getId());
|
||||
childTask22.setTaskType("test");
|
||||
childTask22.setTaskSort(2);
|
||||
childTask22.setStageId(childStage22.getId());
|
||||
taskService.save(childTask22);
|
||||
// PipStage childStage22 = new PipStage();
|
||||
// childStage22.setStageName("并行阶段-2-2");
|
||||
// childStage22.setPipelineId(pipeline.getId());
|
||||
// childStage22.setCreateTime(LocalDateTime.now());
|
||||
// childStage22.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
|
||||
// childStage22.setStageSort(2);
|
||||
// childStage22.setCode(false);
|
||||
// childStage22.setParentId(stageReq2.getId());
|
||||
// stageService.save(childStage22);
|
||||
//
|
||||
// PipTask childTask22 = new PipTask();
|
||||
// childTask22.setCreateTime(LocalDateTime.now());
|
||||
// childTask22.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
|
||||
// childTask22.setTaskName("串行任务-2-2");
|
||||
// childTask22.setPipelineId(pipeline.getId());
|
||||
// childTask22.setTaskType("test");
|
||||
// childTask22.setTaskSort(2);
|
||||
// childTask22.setStageId(childStage22.getId());
|
||||
// taskService.save(childTask22);
|
||||
|
||||
//创建第三个阶段
|
||||
PipStage stageReq3 = new PipStage();
|
||||
@ -228,41 +227,42 @@ public class PipelineServiceImpl extends ServiceImpl<PipelineDao, PipPipeline> i
|
||||
childTask31.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
|
||||
childTask31.setTaskName("串行任务-3-1");
|
||||
childTask31.setPipelineId(pipeline.getId());
|
||||
childTask31.setTaskType("test");
|
||||
childTask31.setTaskType("AFL");
|
||||
childTask31.setTaskSort(1);
|
||||
childTask31.setStageId(childStage31.getId());
|
||||
taskService.save(childTask31);
|
||||
|
||||
//创建第四个阶段
|
||||
PipStage stageReq4 = new PipStage();
|
||||
stageReq4.setPipelineId(pipeline.getId());
|
||||
stageReq4.setStageName("阶段-4");
|
||||
stageReq4.setCreateTime(LocalDateTime.now());
|
||||
stageReq4.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
|
||||
stageReq4.setStageSort(4);
|
||||
stageReq4.setParentId("-1");
|
||||
stageReq4.setCode(true);
|
||||
stageService.save(stageReq4);
|
||||
|
||||
PipStage childStage41 = new PipStage();
|
||||
childStage41.setStageName("并行任务-4-1");
|
||||
childStage41.setPipelineId(pipeline.getId());
|
||||
childStage41.setCreateTime(LocalDateTime.now());
|
||||
childStage41.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
|
||||
childStage41.setStageSort(1);
|
||||
childStage41.setCode(false);
|
||||
childStage41.setParentId(stageReq4.getId());
|
||||
stageService.save(childStage41);
|
||||
|
||||
PipTask childTask41 = new PipTask();
|
||||
childTask41.setCreateTime(LocalDateTime.now());
|
||||
childTask41.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
|
||||
childTask41.setTaskName("串行任务-4-1");
|
||||
childTask41.setPipelineId(pipeline.getId());
|
||||
childTask41.setTaskType("test");
|
||||
childTask41.setTaskSort(1);
|
||||
childTask41.setStageId(childStage41.getId());
|
||||
taskService.save(childTask41);
|
||||
//TODO
|
||||
// PipStage stageReq4 = new PipStage();
|
||||
// stageReq4.setPipelineId(pipeline.getId());
|
||||
// stageReq4.setStageName("阶段-4");
|
||||
// stageReq4.setCreateTime(LocalDateTime.now());
|
||||
// stageReq4.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
|
||||
// stageReq4.setStageSort(4);
|
||||
// stageReq4.setParentId("-1");
|
||||
// stageReq4.setCode(true);
|
||||
// stageService.save(stageReq4);
|
||||
//
|
||||
// PipStage childStage41 = new PipStage();
|
||||
// childStage41.setStageName("并行任务-4-1");
|
||||
// childStage41.setPipelineId(pipeline.getId());
|
||||
// childStage41.setCreateTime(LocalDateTime.now());
|
||||
// childStage41.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
|
||||
// childStage41.setStageSort(1);
|
||||
// childStage41.setCode(false);
|
||||
// childStage41.setParentId(stageReq4.getId());
|
||||
// stageService.save(childStage41);
|
||||
//
|
||||
// PipTask childTask41 = new PipTask();
|
||||
// childTask41.setCreateTime(LocalDateTime.now());
|
||||
// childTask41.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
|
||||
// childTask41.setTaskName("串行任务-4-1");
|
||||
// childTask41.setPipelineId(pipeline.getId());
|
||||
// childTask41.setTaskType("REPORT");
|
||||
// childTask41.setTaskSort(1);
|
||||
// childTask41.setStageId(childStage41.getId());
|
||||
// taskService.save(childTask41);
|
||||
|
||||
//TODO 创建对应的鉴权关系
|
||||
//TODO 创建对应的消息分发
|
||||
|
@ -70,7 +70,7 @@ spring:
|
||||
# Redis 配置。Redisson 默认的配置足够使用,一般不需要进行调优
|
||||
data:
|
||||
redis:
|
||||
host: 192.168.1.120 # 地址
|
||||
host: 127.0.0.1 # 地址
|
||||
port: 6379 # 端口
|
||||
database: 0 # 数据库索引
|
||||
# password: dev # 密码,建议生产环境开启
|
||||
|
@ -183,3 +183,33 @@ debug: false
|
||||
mybatis-plus:
|
||||
configuration:
|
||||
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
|
||||
|
||||
|
||||
# Quartz 配置项,对应 QuartzProperties 配置类
|
||||
spring:
|
||||
quartz:
|
||||
auto-startup: true # 本地开发环境,尽量不要开启 Job
|
||||
scheduler-name: schedulerName # Scheduler 名字。默认为 schedulerName
|
||||
job-store-type: jdbc # Job 存储器类型。默认为 memory 表示内存,可选 jdbc 使用数据库。
|
||||
wait-for-jobs-to-complete-on-shutdown: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
|
||||
properties: # 添加 Quartz Scheduler 附加属性,更多可以看 http://www.quartz-scheduler.org/documentation/2.4.0-SNAPSHOT/configuration.html 文档
|
||||
org:
|
||||
quartz:
|
||||
# Scheduler 相关配置
|
||||
scheduler:
|
||||
instanceName: schedulerName
|
||||
instanceId: AUTO # 自动生成 instance ID
|
||||
# JobStore 相关配置
|
||||
jobStore:
|
||||
# JobStore 实现类。可见博客:https://blog.csdn.net/weixin_42458219/article/details/122247162
|
||||
class: org.springframework.scheduling.quartz.LocalDataSourceJobStore
|
||||
isClustered: true # 是集群模式
|
||||
clusterCheckinInterval: 15000 # 集群检查频率,单位:毫秒。默认为 15000,即 15 秒
|
||||
misfireThreshold: 60000 # misfire 阀值,单位:毫秒。
|
||||
# 线程池相关配置
|
||||
threadPool:
|
||||
threadCount: 25 # 线程池大小。默认为 10 。
|
||||
threadPriority: 5 # 线程优先级
|
||||
class: org.quartz.simpl.SimpleThreadPool # 线程池类型
|
||||
jdbc: # 使用 JDBC 的 JobStore 的时候,JDBC 的配置
|
||||
initialize-schema: NEVER # 是否自动使用 SQL 初始化 Quartz 表结构。这里设置成 never ,我们手动创建表结构。
|
||||
|
Loading…
x
Reference in New Issue
Block a user