This commit is contained in:
HopeLi 2025-05-27 21:25:23 +08:00
parent 4029e7d497
commit 8c6011fcb4
17 changed files with 493 additions and 400 deletions

View File

@ -1,14 +1,15 @@
package cd.casic.ci.process.engine.scheduler;
import cd.casic.ci.process.engine.enums.ContextStateEnum;
import cd.casic.ci.process.engine.executor.PipelineExecutor;
import cd.casic.ci.process.engine.scheduler.config.PipelineSchedulerConfig;
import cd.casic.ci.process.engine.scheduler.config.QuartzSchedulerManager;
import cd.casic.ci.process.engine.scheduler.dao.PipelineSchedulingPropertiesDao;
import cd.casic.ci.process.engine.scheduler.dateObject.PipelineSchedulingProperties;
import cd.casic.ci.process.engine.scheduler.service.impl.PipelineSchedulingPropertiesServiceImpl;
import cd.casic.ci.process.engine.scheduler.req.PipelineSchedulingReq;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
@ -16,29 +17,32 @@ import java.util.List;
* @author HopeLi
* @version v1.0
* @ClassName PipelineSchedulingBootstrapper
* @Date: 2025/5/26 17:28
* @Description:
* @Date: 2025/5/27 17:46
* @Description:
*/
@Component
@RequiredArgsConstructor
public class PipelineSchedulingBootstrapper {
@Resource
private PipelineSchedulingPropertiesServiceImpl taskService;
private final QuartzSchedulerManager quartzSchedulerManager;
private final PipelineExecutor pipelineExecutor;
@Resource
private PipelineSchedulerConfig pipelineSchedulerConfig;
@Resource
private PipelineExecutor pipelineExecutor;
private PipelineSchedulingPropertiesDao pipelineSchedulingPropertiesDao;
@PostConstruct
public void init() {
List<PipelineSchedulingProperties> tasks = taskService.getAllTasks();
for (PipelineSchedulingProperties task : tasks) {
if (ContextStateEnum.RUNNING.getCode().equals(Integer.parseInt(task.getStatus()))) {
pipelineSchedulerConfig.addTask(task.getPipelineId(), ()->{
pipelineExecutor.execute(task.getPipelineId());
}, task.getCronExpression());
public void init(){
List<PipelineSchedulingProperties> tasks = pipelineSchedulingPropertiesDao.selectList();
if (!CollectionUtils.isEmpty(tasks)){
for (PipelineSchedulingProperties task : tasks) {
try {
PipelineSchedulingReq req = new PipelineSchedulingReq();
req.setPipelineId(task.getPipelineId());
req.setCron(task.getCron());
quartzSchedulerManager.addTask(req);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

View File

@ -1,150 +0,0 @@
package cd.casic.ci.process.engine.scheduler.config;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.CronTask;
import org.springframework.scheduling.config.ScheduledTask;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.stereotype.Component;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
/**
* @author HopeLi
* @version v1.0
* @ClassName PipelineSchedulerConfig
* @Date: 2025/5/26 10:55
* @Description:
*/
@Configuration
@EnableScheduling
@Slf4j
@Component("pipelineSchedulerConfig")
public class PipelineSchedulerConfig implements SchedulingConfigurer {
private ScheduledTaskRegistrar taskRegistrar;
private final Map<String, ScheduledTask> taskFutures = new HashMap<>();
private final Map<String, CronTask> cronTaskMap = new HashMap<>();
@Override
public void configureTasks(@NotNull ScheduledTaskRegistrar taskRegistrar) {
this.taskRegistrar = taskRegistrar;
log.info("ScheduledTaskRegistrar 初始化完成");
}
public void addTask(String taskId, Runnable task, String cronExpression) {
if (taskRegistrar == null) {
log.error("【定时任务失败】taskRegistrar 尚未初始化,请检查调度器是否已启动");
return;
}
CronTask cronTask = new CronTask(task, cronExpression);
ScheduledTask scheduledTask = taskRegistrar.scheduleCronTask(cronTask);
taskFutures.put(taskId, scheduledTask);
cronTaskMap.put(taskId, cronTask);
log.info("任务 [{}] 已添加cron: {}", taskId, cronExpression);
}
public void removeTask(String taskId) {
ScheduledTask scheduledTask = taskFutures.get(taskId);
if (scheduledTask != null) {
ScheduledFuture<?> future = getFuture(scheduledTask);
if (future != null) {
future.cancel(true);
}
taskFutures.remove(taskId);
cronTaskMap.remove(taskId);
log.info("任务 [{}] 已删除", taskId);
}
}
public void updateTask(String taskId, String cronExpression) {
removeTask(taskId);
CronTask oldTask = cronTaskMap.get(taskId);
if (oldTask == null) {
throw new IllegalArgumentException("未找到对应的任务: " + taskId);
}
addTask(taskId, oldTask.getRunnable(), cronExpression);
log.info("任务 [{}] 已更新,新 cron: {}", taskId, cronExpression);
}
public void startTask(String taskId) {
ScheduledTask scheduledTask = taskFutures.get(taskId);
if (scheduledTask == null) {
log.warn("任务 [{}] 不存在", taskId);
return;
}
ScheduledFuture<?> future = getFuture(scheduledTask);
if (future != null && !future.isCancelled()) {
log.info("任务 [{}] 正在运行中,无需重启", taskId);
return;
}
CronTask cronTask = cronTaskMap.get(taskId);
if (cronTask == null) {
log.warn("未找到对应的任务定义: {}", taskId);
return;
}
ScheduledTask newTask = taskRegistrar.scheduleCronTask(cronTask);
taskFutures.put(taskId, newTask);
log.info("任务 [{}] 已重启", taskId);
}
/**
* 停止一个任务
*/
public void stopTask(String taskId) {
removeTask(taskId);
log.info("任务 [{}] 已停止", taskId);
}
/**
* 获取 ScheduledTask 中的 future 字段
*/
private ScheduledFuture<?> getFuture(ScheduledTask scheduledTask) {
try {
Field field = scheduledTask.getClass().getDeclaredField("future");
field.setAccessible(true);
return (ScheduledFuture<?>) field.get(scheduledTask);
} catch (Exception e) {
log.error("反射获取 future 失败: {}", e.getMessage());
return null;
}
}
/**
* 判断任务是否已取消
*/
public boolean isTaskCancelled(String taskId) {
ScheduledTask scheduledTask = taskFutures.get(taskId);
if (scheduledTask == null) return true;
ScheduledFuture<?> future = getFuture(scheduledTask);
return future != null && future.isCancelled();
}
// @Component
// public static class PipelineSchedulerTask implements Runnable {
//
// @Override
// public void run() {
// // 定时任务逻辑代码
// System.out.println("Pipeline Task: " + new Date());
// }
// }
}

View File

@ -0,0 +1,39 @@
package cd.casic.ci.process.engine.scheduler.config;
import org.quartz.Scheduler;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.scheduling.quartz.SpringBeanJobFactory;
/**
* @author HopeLi
* @version v1.0
* @ClassName QuartzConfig
* @Date: 2025/5/27 21:16
* @Description:
*/
@Configuration
public class QuartzConfig {
private final ApplicationContext applicationContext;
public QuartzConfig(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
@Bean
public Scheduler scheduler() throws Exception {
SpringBeanJobFactory jobFactory = new SpringBeanJobFactory();
jobFactory.setApplicationContext(applicationContext);
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setJobFactory(jobFactory);
factory.afterPropertiesSet();
Scheduler scheduler = factory.getScheduler();
scheduler.start();
return scheduler;
}
}

View File

@ -0,0 +1,155 @@
package cd.casic.ci.process.engine.scheduler.config;
import cd.casic.ci.process.engine.scheduler.dao.PipelineSchedulingPropertiesDao;
import cd.casic.ci.process.engine.scheduler.dateObject.PipelineSchedulingProperties;
import cd.casic.ci.process.engine.scheduler.job.PipelineSchedulingJob;
import cd.casic.ci.process.engine.scheduler.req.PipelineSchedulingReq;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.TriggerBuilder.newTrigger;
/**
* @author HopeLi
* @version v1.0
* @ClassName QuartzSchedulerManager
* @Date: 2025/5/27 17:31
* @Description:
*/
@Component
@Slf4j
public class QuartzSchedulerManager {
private static final String JOB_GROUP_NAME = "PIPELINE_JOB_GROUP";
private static final String TRIGGER_GROUP_NAME = "PIPELINE_TRIGGER_GROUP";
@Resource
private Scheduler scheduler;
@Resource
private PipelineSchedulingPropertiesDao pipelineSchedulingPropertiesDao;
@PostConstruct
public void init() throws SchedulerException {
log.info("Quartz 调度器初始化完成");
}
/**
* 添加一个定时任务
*/
public void addTask(PipelineSchedulingReq req) throws Exception {
JobDetail jobDetail = newJob(PipelineSchedulingJob.class)
.withIdentity(req.getPipelineId(), JOB_GROUP_NAME)
.usingJobData(PipelineSchedulingJob.JOB_PARAM_PIPELINE_ID, req.getPipelineId())
.build();
Trigger trigger = newTrigger()
.withIdentity(req.getPipelineId(), TRIGGER_GROUP_NAME)
.withSchedule(CronScheduleBuilder.cronSchedule(req.getCron()))
.build();
scheduler.scheduleJob(jobDetail, trigger);
log.info("任务 [{}] 已添加cron: {}", req.getPipelineId(), req.getCron());
QueryWrapper<PipelineSchedulingProperties> wrapper = new QueryWrapper<>();
wrapper.eq("pipeline_id", req.getPipelineId());
wrapper.eq("cron", req.getCron());
List<PipelineSchedulingProperties> list = pipelineSchedulingPropertiesDao.selectList(wrapper);
//为空则数据入库否则仅为重新添加任务进调度器
if (!CollectionUtils.isEmpty(list)){
PipelineSchedulingProperties pipelineSchedulingProperties = new PipelineSchedulingProperties();
pipelineSchedulingProperties.setPipelineId(req.getPipelineId());
pipelineSchedulingProperties.setCron(req.getCron());
pipelineSchedulingPropertiesDao.insert(pipelineSchedulingProperties);
// pipelineSchedulingProperties.setStatus(PipelineSchedulingStatusEnum.READY.getCode());
}
}
/**
* 删除任务
*/
public void removeTask(PipelineSchedulingReq req) throws SchedulerException {
TriggerKey triggerKey = TriggerKey.triggerKey(req.getPipelineId(), TRIGGER_GROUP_NAME);
scheduler.pauseTrigger(triggerKey);
scheduler.unscheduleJob(triggerKey);
scheduler.deleteJob(JobKey.jobKey(req.getPipelineId(), JOB_GROUP_NAME));
log.info("任务 [{}] 已删除", req.getPipelineId());
//数据删除
QueryWrapper<PipelineSchedulingProperties> wrapper = new QueryWrapper<>();
wrapper.eq("pipeline_id", req.getPipelineId());
wrapper.eq("cron", req.getCron());
List<PipelineSchedulingProperties> list = pipelineSchedulingPropertiesDao.selectList(wrapper);
if (!CollectionUtils.isEmpty(list)){
List<String> idList = list.stream().map(PipelineSchedulingProperties::getId).toList();
pipelineSchedulingPropertiesDao.deleteByIds(idList);
}
}
/**
* 更新任务
*/
public void updateTask(PipelineSchedulingReq req) throws Exception {
removeTask(req);
addTask(req);
log.info("任务 [{}] 已更新,新 cron: {}", req.getPipelineId(), req.getCron());
}
/**
* 启动/重启任务
*/
public void startTask(PipelineSchedulingReq req) throws SchedulerException {
JobKey jobKey = JobKey.jobKey(req.getPipelineId(), JOB_GROUP_NAME);
if (!scheduler.checkExists(jobKey)) {
log.warn("任务 [{}] 不存在", req.getPipelineId());
return;
}
if (scheduler.isShutdown()) {
scheduler.start();
}
List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
for (Trigger trigger : triggers) {
if (trigger.getNextFireTime() == null) {
scheduler.rescheduleJob(TriggerKey.triggerKey(req.getPipelineId(), TRIGGER_GROUP_NAME), trigger);
}
}
log.info("任务 [{}] 已重启", req.getPipelineId());
}
/**
* 判断任务是否已停止
*/
public boolean isTaskPaused(PipelineSchedulingReq req) throws SchedulerException {
JobKey jobKey = JobKey.jobKey(req.getPipelineId(), JOB_GROUP_NAME);
return !scheduler.isStarted() || scheduler.getJobDetail(jobKey) == null;
}
/**
* 获取所有任务 ID
*/
public Set<String> getAllTaskIds() throws SchedulerException {
Set<JobKey> jobKeys = scheduler.getJobKeys(GroupMatcher.jobGroupEquals(JOB_GROUP_NAME));
Set<String> taskIds = new HashSet<>();
for (JobKey jobKey : jobKeys) {
taskIds.add(jobKey.getName());
}
return taskIds;
}
}

View File

@ -0,0 +1,52 @@
package cd.casic.ci.process.engine.scheduler.controller;
import cd.casic.ci.process.engine.scheduler.config.QuartzSchedulerManager;
import cd.casic.ci.process.engine.scheduler.req.PipelineSchedulingReq;
import cd.casic.framework.commons.pojo.CommonResult;
import jakarta.annotation.Resource;
import jakarta.validation.Valid;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author HopeLi
* @version v1.0
* @ClassName PipelineTaskController
* @Date: 2025/5/27 19:18
* @Description:
*/
@RestController
@RequestMapping("/scheduler")
public class PipelineSchedulingController {
@Resource
private QuartzSchedulerManager quartzSchedulerManager;
@PostMapping("/add")
public CommonResult<Void> add(@RequestBody @Valid PipelineSchedulingReq req) throws Exception {
quartzSchedulerManager.addTask(req);
return CommonResult.success();
}
@PostMapping("/remove")
public CommonResult<Void> remove(@RequestBody @Valid PipelineSchedulingReq req) throws Exception {
quartzSchedulerManager.removeTask(req);
return CommonResult.success();
}
@PostMapping("/update")
public CommonResult<Void> update(@RequestBody @Valid PipelineSchedulingReq req) throws Exception {
quartzSchedulerManager.updateTask(req);
return CommonResult.success();
}
@PostMapping("/start")
public CommonResult<Void> start(@RequestBody @Valid PipelineSchedulingReq req) throws Exception {
quartzSchedulerManager.startTask(req);
return CommonResult.success();
}
}

View File

@ -1,54 +0,0 @@
package cd.casic.ci.process.engine.scheduler.controller;
import cd.casic.ci.process.engine.scheduler.dateObject.PipelineSchedulingProperties;
import cd.casic.ci.process.engine.scheduler.service.PipelineSchedulingPropertiesService;
import cd.casic.ci.process.process.dataObject.base.BaseIdReq;
import jakarta.annotation.Resource;
import jakarta.validation.Valid;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* @author HopeLi
* @version v1.0
* @ClassName PipelineSchedulingPropertiesController
* @Date: 2025/5/26 16:56
* @Description:
*/
@RestController
@RequestMapping("/scheduling")
public class PipelineSchedulingPropertiesController {
@Resource
private PipelineSchedulingPropertiesService pipelineSchedulingPropertiesService;
@PostMapping("/getAll")
public List<PipelineSchedulingProperties> getAll() {
return pipelineSchedulingPropertiesService.getAllTasks();
}
@PostMapping("/create")
public void create(@RequestBody @Valid PipelineSchedulingProperties task) {
pipelineSchedulingPropertiesService.addTask(task);
}
@PostMapping("/update")
public void update(@RequestBody @Valid PipelineSchedulingProperties task) {
pipelineSchedulingPropertiesService.updateTask(task);
}
@PostMapping("/delete")
public void delete(@RequestBody @Valid BaseIdReq req) {
pipelineSchedulingPropertiesService.deleteTask(req.getId());
}
@PostMapping("/start")
public void start(@RequestBody @Valid BaseIdReq req) {
pipelineSchedulingPropertiesService.startTask(req.getId());
}
@PostMapping("/stop")
public void stop(@RequestBody @Valid BaseIdReq req) {
pipelineSchedulingPropertiesService.stopTask(req.getId());
}
}

View File

@ -9,8 +9,8 @@ import lombok.EqualsAndHashCode;
* @author HopeLi
* @version v1.0
* @ClassName PipelineSchedulingProperties
* @Date: 2025/5/26 16:27
* @Description:
* @Date: 2025/5/27 19:57
* @Description:
*/
@EqualsAndHashCode(callSuper = true)
@ -22,15 +22,10 @@ public class PipelineSchedulingProperties extends PipBaseElement {
*/
private String pipelineId;
/**
* 任务名称
*/
private String pipelineName;
/**
* 定时表达式
*/
private String cronExpression;
private String cron;
/**
* 执行状态

View File

@ -0,0 +1,56 @@
package cd.casic.ci.process.engine.scheduler.enums;
import lombok.Getter;
import java.util.EnumMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@Getter
public enum PipelineSchedulingStatusEnum {
INIT("0","初始化"),
READY("1","就绪"),
RUNNING("2","运行"),
SUSPEND("3","挂起"),
STOP("-1","停止"),
HAPPY_ENDING("4","执行成功"),
BAD_ENDING("5","执行失败")
;
private String code;
private String msg;
private static final Map<PipelineSchedulingStatusEnum, Set<PipelineSchedulingStatusEnum>> TRANSITIONS = new EnumMap<>(PipelineSchedulingStatusEnum.class);
static {
TRANSITIONS.put(INIT, Set.of(READY, SUSPEND, BAD_ENDING, STOP));
TRANSITIONS.put(READY, Set.of(READY,RUNNING, SUSPEND, BAD_ENDING, STOP));
TRANSITIONS.put(RUNNING, Set.of(RUNNING,SUSPEND, HAPPY_ENDING, BAD_ENDING, STOP));
TRANSITIONS.put(SUSPEND, Set.of(SUSPEND,INIT, READY, BAD_ENDING, RUNNING,STOP));
//...初始化其他状态转移关系
}
PipelineSchedulingStatusEnum(String code, String msg) {
this.code = code;
this.msg = msg;
}
public static Boolean canGoto(PipelineSchedulingStatusEnum from, PipelineSchedulingStatusEnum to){
try {
if (Objects.isNull(from) || Objects.isNull(to)) {
return false;
}
return TRANSITIONS.get(from).contains(to);
} catch (Exception e){
return false;
}
}
public static PipelineSchedulingStatusEnum getByCode(String code){
for (PipelineSchedulingStatusEnum value : values()) {
if (value.getCode().equals(code)) {
return value;
}
}
return null;
}
}

View File

@ -0,0 +1,33 @@
package cd.casic.ci.process.engine.scheduler.job;
import cd.casic.ci.process.engine.executor.PipelineExecutor;
import jakarta.annotation.Resource;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
/**
* @author HopeLi
* @version v1.0
* @ClassName PipelineTaskJob
* @Date: 2025/5/27 17:28
* @Description:
*/
public class PipelineSchedulingJob implements Job {
public static final String JOB_PARAM_PIPELINE_ID = "pipelineId";
public static final String JOB_PARAM_EXECUTOR = "executor";
@Resource
private PipelineExecutor executor;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
JobDataMap dataMap = context.getMergedJobDataMap();
String pipelineId = dataMap.getString(JOB_PARAM_PIPELINE_ID);
if (pipelineId != null && executor != null) {
executor.execute(pipelineId);
}
}
}

View File

@ -0,0 +1,30 @@
package cd.casic.ci.process.engine.scheduler.listener;
import lombok.RequiredArgsConstructor;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
/**
* @author HopeLi
* @version v1.0
* @ClassName QuartzStartupListener
* @Date: 2025/5/27 20:42
* @Description:
*/
@Component
@RequiredArgsConstructor
public class QuartzStartupListener {
private final Scheduler scheduler;
@EventListener(ApplicationReadyEvent.class)
public void onApplicationReady() throws SchedulerException {
if (!scheduler.isStarted()) {
scheduler.start();
System.out.println("Quartz 调度器已手动启动");
}
}
}

View File

@ -0,0 +1,20 @@
package cd.casic.ci.process.engine.scheduler.req;
import lombok.Data;
/**
* @author HopeLi
* @version v1.0
* @ClassName PipelineSchedulingReq
* @Date: 2025/5/27 19:24
* @Description:
*/
@Data
public class PipelineSchedulingReq {
private String pipelineId;
private String cron;
private String status;
}

View File

@ -1,27 +0,0 @@
package cd.casic.ci.process.engine.scheduler.service;
import cd.casic.ci.process.engine.scheduler.dateObject.PipelineSchedulingProperties;
import jakarta.validation.Valid;
import java.util.List;
/**
* @author HopeLi
* @version v1.0
* @ClassName PipelineSchedulingPropertiesService
* @Date: 2025/5/26 16:48
* @Description:
*/
public interface PipelineSchedulingPropertiesService {
List<PipelineSchedulingProperties> getAllTasks();
void addTask(@Valid PipelineSchedulingProperties task);
void updateTask(@Valid PipelineSchedulingProperties task);
void deleteTask(String id);
void startTask(String id);
void stopTask(String id);
}

View File

@ -1,71 +0,0 @@
package cd.casic.ci.process.engine.scheduler.service.impl;
import cd.casic.ci.process.engine.enums.ContextStateEnum;
import cd.casic.ci.process.engine.scheduler.config.PipelineSchedulerConfig;
import cd.casic.ci.process.engine.scheduler.dao.PipelineSchedulingPropertiesDao;
import cd.casic.ci.process.engine.scheduler.dateObject.PipelineSchedulingProperties;
import cd.casic.ci.process.engine.scheduler.service.PipelineSchedulingPropertiesService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* @author HopeLi
* @version v1.0
* @ClassName PipelineSchedulingPropertiesServiceImpl
* @Date: 2025/5/26 16:49
* @Description:
*/
@Service
@Slf4j
public class PipelineSchedulingPropertiesServiceImpl extends ServiceImpl<PipelineSchedulingPropertiesDao, PipelineSchedulingProperties> implements PipelineSchedulingPropertiesService {
@Resource
private PipelineSchedulingPropertiesDao taskRepository;
@Resource
private PipelineSchedulerConfig pipelineSchedulerConfig;
public List<PipelineSchedulingProperties> getAllTasks() {
return taskRepository.selectList(null);
}
public void addTask(PipelineSchedulingProperties task) {
if (ContextStateEnum.RUNNING.getCode() == Integer.parseInt(task.getStatus())) {
pipelineSchedulerConfig.addTask(task.getPipelineId(), () -> {
},task.getCronExpression());
}
taskRepository.insert(task);
}
public void updateTask(PipelineSchedulingProperties task) {
PipelineSchedulingProperties old = taskRepository.selectById(task.getId());
if (old != null) {
pipelineSchedulerConfig.updateTask(old.getPipelineId(), task.getCronExpression());
}
taskRepository.updateById(task);
}
public void deleteTask(String taskId) {
pipelineSchedulerConfig.removeTask(taskId);
taskRepository.delete(new QueryWrapper<PipelineSchedulingProperties>().eq("task_id", taskId));
}
public void startTask(String taskId) {
pipelineSchedulerConfig.startTask(taskId);
taskRepository.update(null, new UpdateWrapper<PipelineSchedulingProperties>()
.set("status", String.valueOf(ContextStateEnum.RUNNING.getCode())).eq("task_id", taskId));
}
public void stopTask(String taskId) {
pipelineSchedulerConfig.stopTask(taskId);
taskRepository.update(null, new UpdateWrapper<PipelineSchedulingProperties>()
.set("status", String.valueOf(ContextStateEnum.STOP.getCode())).eq("task_id", taskId));
}
}

View File

@ -1,19 +0,0 @@
package cd.casic.ci.process.engine.scheduler.trigger;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;
/**
* @author HopeLi
* @version v1.0
* @ClassName SchedulingTrigger
* @Date: 2025/5/27 15:05
* @Description:
*/
@Configuration
public class SchedulingTrigger {
@Scheduled(fixedRate = 5000)
public void triggerScheduler() {
// 用于触发调度器初始化
}
}

View File

@ -4,7 +4,6 @@ import cd.casic.ci.common.pipeline.req.pipeline.PipelineCreateReq;
import cd.casic.ci.common.pipeline.req.pipeline.PipelineQueryReq;
import cd.casic.ci.common.pipeline.req.pipeline.PipelineReq;
import cd.casic.ci.common.pipeline.req.pipeline.PipelineUpdateReq;
import cd.casic.ci.common.pipeline.resp.context.SingletonRunContextResp;
import cd.casic.ci.common.pipeline.resp.context.TreeRunContextResp;
import cd.casic.ci.common.pipeline.resp.pipeline.PipelineFindResp;
import cd.casic.ci.common.pipeline.resp.stage.StageResp;
@ -143,7 +142,7 @@ public class PipelineServiceImpl extends ServiceImpl<PipelineDao, PipPipeline> i
childTask11.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
childTask11.setTaskName("串行任务-1-1");
childTask11.setPipelineId(pipeline.getId());
childTask11.setTaskType("test");
childTask11.setTaskType("code");
childTask11.setTaskSort(1);
childTask11.setStageId(childStage1.getId());
taskService.save(childTask11);
@ -176,31 +175,31 @@ public class PipelineServiceImpl extends ServiceImpl<PipelineDao, PipPipeline> i
childTask21.setTaskName("串行任务-2-1");
childTask21.setCreateTime(LocalDateTime.now());
childTask21.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
childTask21.setTaskType("test");
childTask21.setTaskType("TEST_CASE_GENERATION");
childTask21.setTaskSort(1);
childTask21.setStageId(childStage21.getId());
taskService.save(childTask21);
//------------------------------------------------------------------
PipStage childStage22 = new PipStage();
childStage22.setStageName("并行阶段-2-2");
childStage22.setPipelineId(pipeline.getId());
childStage22.setCreateTime(LocalDateTime.now());
childStage22.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
childStage22.setStageSort(2);
childStage22.setCode(false);
childStage22.setParentId(stageReq2.getId());
stageService.save(childStage22);
PipTask childTask22 = new PipTask();
childTask22.setCreateTime(LocalDateTime.now());
childTask22.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
childTask22.setTaskName("串行任务-2-2");
childTask22.setPipelineId(pipeline.getId());
childTask22.setTaskType("test");
childTask22.setTaskSort(2);
childTask22.setStageId(childStage22.getId());
taskService.save(childTask22);
// PipStage childStage22 = new PipStage();
// childStage22.setStageName("并行阶段-2-2");
// childStage22.setPipelineId(pipeline.getId());
// childStage22.setCreateTime(LocalDateTime.now());
// childStage22.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
// childStage22.setStageSort(2);
// childStage22.setCode(false);
// childStage22.setParentId(stageReq2.getId());
// stageService.save(childStage22);
//
// PipTask childTask22 = new PipTask();
// childTask22.setCreateTime(LocalDateTime.now());
// childTask22.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
// childTask22.setTaskName("串行任务-2-2");
// childTask22.setPipelineId(pipeline.getId());
// childTask22.setTaskType("test");
// childTask22.setTaskSort(2);
// childTask22.setStageId(childStage22.getId());
// taskService.save(childTask22);
//创建第三个阶段
PipStage stageReq3 = new PipStage();
@ -228,41 +227,42 @@ public class PipelineServiceImpl extends ServiceImpl<PipelineDao, PipPipeline> i
childTask31.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
childTask31.setTaskName("串行任务-3-1");
childTask31.setPipelineId(pipeline.getId());
childTask31.setTaskType("test");
childTask31.setTaskType("AFL");
childTask31.setTaskSort(1);
childTask31.setStageId(childStage31.getId());
taskService.save(childTask31);
//创建第四个阶段
PipStage stageReq4 = new PipStage();
stageReq4.setPipelineId(pipeline.getId());
stageReq4.setStageName("阶段-4");
stageReq4.setCreateTime(LocalDateTime.now());
stageReq4.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
stageReq4.setStageSort(4);
stageReq4.setParentId("-1");
stageReq4.setCode(true);
stageService.save(stageReq4);
PipStage childStage41 = new PipStage();
childStage41.setStageName("并行任务-4-1");
childStage41.setPipelineId(pipeline.getId());
childStage41.setCreateTime(LocalDateTime.now());
childStage41.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
childStage41.setStageSort(1);
childStage41.setCode(false);
childStage41.setParentId(stageReq4.getId());
stageService.save(childStage41);
PipTask childTask41 = new PipTask();
childTask41.setCreateTime(LocalDateTime.now());
childTask41.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
childTask41.setTaskName("串行任务-4-1");
childTask41.setPipelineId(pipeline.getId());
childTask41.setTaskType("test");
childTask41.setTaskSort(1);
childTask41.setStageId(childStage41.getId());
taskService.save(childTask41);
//TODO
// PipStage stageReq4 = new PipStage();
// stageReq4.setPipelineId(pipeline.getId());
// stageReq4.setStageName("阶段-4");
// stageReq4.setCreateTime(LocalDateTime.now());
// stageReq4.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
// stageReq4.setStageSort(4);
// stageReq4.setParentId("-1");
// stageReq4.setCode(true);
// stageService.save(stageReq4);
//
// PipStage childStage41 = new PipStage();
// childStage41.setStageName("并行任务-4-1");
// childStage41.setPipelineId(pipeline.getId());
// childStage41.setCreateTime(LocalDateTime.now());
// childStage41.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
// childStage41.setStageSort(1);
// childStage41.setCode(false);
// childStage41.setParentId(stageReq4.getId());
// stageService.save(childStage41);
//
// PipTask childTask41 = new PipTask();
// childTask41.setCreateTime(LocalDateTime.now());
// childTask41.setCreator(String.valueOf(WebFrameworkUtils.getLoginUserId()));
// childTask41.setTaskName("串行任务-4-1");
// childTask41.setPipelineId(pipeline.getId());
// childTask41.setTaskType("REPORT");
// childTask41.setTaskSort(1);
// childTask41.setStageId(childStage41.getId());
// taskService.save(childTask41);
//TODO 创建对应的鉴权关系
//TODO 创建对应的消息分发

View File

@ -70,7 +70,7 @@ spring:
# Redis 配置。Redisson 默认的配置足够使用,一般不需要进行调优
data:
redis:
host: 192.168.1.120 # 地址
host: 127.0.0.1 # 地址
port: 6379 # 端口
database: 0 # 数据库索引
# password: dev # 密码,建议生产环境开启

View File

@ -183,3 +183,33 @@ debug: false
mybatis-plus:
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
# Quartz 配置项,对应 QuartzProperties 配置类
spring:
quartz:
auto-startup: true # 本地开发环境,尽量不要开启 Job
scheduler-name: schedulerName # Scheduler 名字。默认为 schedulerName
job-store-type: jdbc # Job 存储器类型。默认为 memory 表示内存,可选 jdbc 使用数据库。
wait-for-jobs-to-complete-on-shutdown: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
properties: # 添加 Quartz Scheduler 附加属性,更多可以看 http://www.quartz-scheduler.org/documentation/2.4.0-SNAPSHOT/configuration.html 文档
org:
quartz:
# Scheduler 相关配置
scheduler:
instanceName: schedulerName
instanceId: AUTO # 自动生成 instance ID
# JobStore 相关配置
jobStore:
# JobStore 实现类。可见博客https://blog.csdn.net/weixin_42458219/article/details/122247162
class: org.springframework.scheduling.quartz.LocalDataSourceJobStore
isClustered: true # 是集群模式
clusterCheckinInterval: 15000 # 集群检查频率,单位:毫秒。默认为 15000即 15 秒
misfireThreshold: 60000 # misfire 阀值,单位:毫秒。
# 线程池相关配置
threadPool:
threadCount: 25 # 线程池大小。默认为 10 。
threadPriority: 5 # 线程优先级
class: org.quartz.simpl.SimpleThreadPool # 线程池类型
jdbc: # 使用 JDBC 的 JobStore 的时候JDBC 的配置
initialize-schema: NEVER # 是否自动使用 SQL 初始化 Quartz 表结构。这里设置成 never ,我们手动创建表结构。