Springboot中动态管理定时任务

xing-chen-d / 2024-09-25 / 原文

引言

基于cron表达式的定时任务实现,因为cron表达式对于每个任务不确定,所以使用线程池来动态的创建和销毁定时任务

依赖

因为使用的spring自带的调度功能,所以没有额外的依赖,我的项目版本为:Static Badge Static Badge

使用

首先需要定义一个线程池,使用@configuration 注解配置

import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.scheduling.TaskScheduler;  
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@Configuration
public class SchedulerConfig {
    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        // 设置线程池大小
        scheduler.setPoolSize(10);
        scheduler.setThreadNamePrefix("workshop-scheduled-task-");
        scheduler.initialize();
        return scheduler;
    }
}

这个配置中定义了一个调度的线程池,并且配置了线程池大小、线程名称前缀以及初始化操作

然后实现一个定时管理

import com.google.common.collect.Maps;  
import lombok.extern.slf4j.Slf4j;  
import org.springframework.scheduling.TaskScheduler;  
import org.springframework.scheduling.support.CronTrigger;  
import org.springframework.stereotype.Service;  
  
import javax.annotation.Resource;  
import java.time.LocalDateTime;  
import java.time.format.DateTimeFormatter;  
import java.util.Map;  
import java.util.concurrent.ScheduledFuture;

@Slf4j
@Service
public class DynamicScheduledTaskService {

    @Resource
    private TaskScheduler taskScheduler;
    
    /**  
     * 使用Map来关联任务ID和ScheduledFuture
     * 如有必要,可以通过Redis等数据库进行管理
     */
    private final Map<String, ScheduledFuture<?>> tasks = Maps.newConcurrentMap();

	/**
	 * 周期调度执行任务
	 *  
	 * @param taskId         任务ID  
	 * @param cronExpression cron表达式  
	 * @param task           实际任务  
	 */
    public void schedulingTask(String taskId, String cronExpression, Runnable task) {  
        log.info("添加定时调度任务:{},cron为:{}", taskId, cronExpression);  
  
        // 取消已存在的同ID任务
        ScheduledFuture<?> existingTask = tasks.get(taskId);
        if (existingTask != null && !existingTask.isCancelled()) {  
            existingTask.cancel(false);  
        }  
  
        // 包装任务以便在执行完毕后自动取消  
        Runnable wrappedTask = () -> {
            log.info("{} 执行定时调度任务:{}", DateUtil.nowToString(), taskId);  
            task.run();  
        };  
        // 安排任务并保存其Future  
        ScheduledFuture<?> future = taskScheduler.schedule(wrappedTask, new CronTrigger(cronExpression));  
        tasks.put(taskId, future);  
    }  
	    
	/**
	 * 定时单次执行调度任务
	 *  
	 * @param taskId   任务ID
	 * @param execTime 执行时间
	 * @param task     实际任务
	 */
	public void singleScheduleTask(String taskId, LocalDateTime execTime, Runnable task) {
	    log.info("添加定时调度任务:{},执行时间为:{}", taskId, execTime);
	  
	    DateTimeFormatter cronTimeFormatter = DateTimeFormatter.ofPattern("ss mm HH dd MM ?");
	    // 取消已存在的同ID任务
	    ScheduledFuture<?> existingTask = tasks.get(taskId);
	    if (existingTask != null && !existingTask.isCancelled()) {
	        existingTask.cancel(false);
	    }
	  
	    // 包装任务以便在执行完毕后自动取消
	    Runnable wrappedTask = () -> {
	        log.info("{} 执行单次调度任务:{}", LocalDateTime.now(), taskId);
	        try {
	            task.run();
	        } finally {  
	            // 无论任务成功还是异常终止,都取消后续执行  
	            this.stopTask(taskId);  
	        }  
	    };  
	    // 安排任务并保存其Future  
	    ScheduledFuture<?> future = taskScheduler.schedule(wrappedTask, new CronTrigger(cronTimeFormatter.format(execTime)));  
	    tasks.put(taskId, future);
	}
    public void stopTask(String taskId) {  
        ScheduledFuture<?> future = tasks.get(taskId);  
        if (future != null && !future.isCancelled()) {  
            future.cancel(false);  
            tasks.remove(taskId);  
            log.info("{} 停止调度任务:{}", DateUtil.nowToString(), taskId);  
        }  
    }  
  
}


此定时管理服务中一共实现了两种情况

  1. 周期性的执行任务,手动取消才进行取消
  2. 单次执行任务,执行后自动销毁任务

使用服务样例

@Slf4j
@Service
public class UseScheduleService{

    @Resource
    private DynamicScheduledTaskService scheduledTaskService;

    public void startSchedulingTask(){
        // 其余逻辑
        String taskId="taskId";
        // cron表达式,以从0分钟开始,每隔一分钟执行一次为例
        String crontab = "0 0/1 * * * ?";

        scheduledTaskService.schedulingTask(taskId, crontab, () -> {
            // 此处是执行任务的逻辑
        });
    
    }

    
    public void startScheduleTask(){
        // 其余逻辑
        String taskId="taskId";
        // 执行的时间
        LocalDateTime execTime = LocalDateTime.now();

        scheduledTaskService.scheduleTask(taskId, execTime, () -> {
            // 此处是执行任务的逻辑
        });
    }

    public void stopTask(String taskId){
        scheduledTaskService.stopTask(taskId);
    }

}

这样就完成了定时任务的动态管理

定时管理之外

初始化

在使用定时任务的场景中,我们一般还会有重启服务时候,需要针对重启之前已经在执行的任务进行恢复定时,这里我选择使用Spring的 ApplicationRunner 进行服务启动后执行逻辑的管理

import lombok.extern.slf4j.Slf4j;  
import org.springframework.stereotype.Service;
import org.springframework.boot.ApplicationRunner;  
import org.springframework.boot.ApplicationArguments;
  
@Slf4j  
@Service
public class UseScheduleService implements ApplicationRunner{
@Resource
private DynamicScheduledTaskService dynamicScheduledTaskService;

    @Override  
    public void run(ApplicationArguments args) {
        log.info("============= 初始化定时任务begin =============");
        // 获取需要启动的列表
        List<Object> needStartList = new ArrayList<>();
        // 进行定时任务的写入
        needStartList.forEach(i->dynamicScheduledTaskService.startSchedulingTask(i.getTaskId(), i.getCronTab(),
        () -> {
        // 执行的逻辑
        }));
        log.info("============= 初始化定时任务end =============");
    }
}

定时任务工具类

我在使用定时任务时,还遇到了需要判定是否是合规的cron表达式以及根据cron表达式获取下次执行时间的需求,封装的工具类如下

import org.springframework.scheduling.support.CronSequenceGenerator;  
  
import java.time.LocalDateTime;  
import java.time.ZoneId;  
import java.util.Date;
  
public class CronUtils {  
  
    private CronUtils() {  
    }  
  
    /**  
     * 判断cron表达式是否无效  
     *  
     * @param cron cron表达式  
     * @return 判定结果  
     */  
    public static Boolean isInvalidCron(String cron) {  
        return !isValidCron(cron);  
    }  
  
    /**  
     * 判断cron表达式是否有效  
     *  
     * @param cron cron表达式  
     * @return 判定结果  
     */  
    public static Boolean isValidCron(String cron) {  
        try {  
            new CronSequenceGenerator(cron);  
            return true;  
        } catch (IllegalArgumentException ex) {  
            return false;  
        }  
    }  
  
    /**  
     * 获取下次执行时间  
     *  
     * @param cron cron表达式  
     * @return 下次执行时间  
     */  
    public static LocalDateTime getNextExecTime(String cron) {
        // 因为CronSequenceGenerator.next接受的入参只有java.util.Date,而我又习惯使用LocalDateTime,因此加入此转换
        return new CronSequenceGenerator(cron).next(new Date()).toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();  
    }  
  
}