DelayQueue实现指定时间后业务取消或确认

酸菜鱼没有鱼 / 2023-08-19 / 原文

DelayQueue实现指定时间后业务取消或确认

需求背景:

项目中经常会遇到一些业务在某些情况下需要延迟一些时间后进行取消或确认操作,常见的就是订单支付时15分钟内未支付取消订单。具体使用如下:

1.先定义一个执行延迟任务的接口,执行延迟任务的业务相关类必须实现此接口(也就是我们xxxService),具体如下:

你也可以补充自定定义一个接口,提供你需要的功能

import java.util.List;

/**
 * 描述:JDK延迟队列执行task接口
 *
 * @author SXT
 * @version 1.0
 * @date 2023/8/17
 */
public interface DelayTaskService<E> {

    /**
     * 无返回值执行延迟任务
     * @param data 参数
     */
    void executeTask(List<E> data);
}

2.创建一个通用接受延迟任务的class,并实现Delayed接口。主要目的是为了接受任何一个业务需要的延迟任务数据并进行到点执行。接受两个泛型,E代表的是延迟任务的业务类, S是对应的Service,此泛型必须实现上一步定义的DelayTaskService接口,主要是到点执行对应的业务逻辑。

具体实现如下:


import com.test.DelayTaskService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * 描述:延迟任务的通用实体类
 *
 * @author SXT
 * @version 1.0
 * @date 2023/8/17
 */
public class DelayItem<E, S extends DelayTaskService<E>> implements Delayed, Serializable {

    private static final long serialVersionUID = -9064181344914970072L;

    private static Logger log = LoggerFactory.getLogger(DelayItem.class);

    /** 唯一标识,用来移除未执行的延迟任务,也可以当作唯一的业务标识 */
    private String bizId;

    /** 类型(根据需要可有可无) */
    private final String type;

    /** 延迟时间 */
    private long expiredTime;

    /** 延迟任务执行的服务接口 */
    private final S taskService;

    /** 延迟任务执行是需要的数据 */
    private List<E> data;

    /**
     * 创建延迟任务实例
     * @param bizId 唯一标识,可以为null
     * @param type 类型 可以为null
     * @param taskService 延迟任务执行的服务接口
     * @param data 延迟任务执行是需要的数据
     * @param creatTime 延迟任务的创建时间
     * @param expiredTime 过期时间 单位s
     */
    public DelayItem(String bizId,String type, S taskService, List<E> data, long creatTime, long expiredTime){
        if (creatTime < 1) {
            creatTime = Instant.now().toEpochMilli();
        }
        this.bizId = bizId;
        this.type = type;
        this.taskService = taskService;
        this.data = data;
        this.expiredTime = creatTime + TimeUnit.MILLISECONDS.convert(expiredTime, TimeUnit.SECONDS);
    }

    /**
     * 创建延迟任务实例
     * @param taskService 延迟任务执行的服务接口
     * @param data 延迟任务执行是需要的数据
     * @param creatTime 延迟任务的创建时间
     * @param expiredTime 过期时间 单位s
     */
    public DelayItem(S taskService, List<E> data, long creatTime, long expiredTime){
        this(null, null, taskService, data, creatTime, expiredTime);
    }

    /**
     * 创建延迟任务实例,以当前时间为创建时间
     * @param taskService 延迟任务执行的服务接口
     * @param data 延迟任务执行是需要的数据
     * @param expiredTime 过期时间 单位s
     */
    public DelayItem(S taskService, List<E> data, long expiredTime){
        this(null, null, taskService, data, Instant.now().toEpochMilli(), expiredTime);
    }



    /**
     * 获取延迟时间多久进行
     * @param unit 时间单位
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        log.info("延迟时间~~~~");
        return unit.convert(expiredTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * 比较延迟任务先后
     * @param o
     * @return
     */
    @Override
    public int compareTo(Delayed o) {
        return Long.compare(expiredTime, ((DelayItem)o).expiredTime);
    }

    /**
     * 执行延迟任务
     */
    public void execute(){
        try {
            taskService.executeTask(data);
        }catch (Exception e) {
            log.error("延迟任务执行错误, 参数: {}", this.toString());
            log.error("异常详情: ", e);
        }
    }

    @Override
    public String toString() {
        return "DelayItem{" +
                "bizId='" + bizId + '\'' +
                ", type='" + type + '\'' +
                ", expiredTime=" + expiredTime +
                ", taskService=" + taskService +
                ", data=" + data +
                '}';
    }
}

3.定义一个延迟任务工具类,方便添加延迟任务

具体如下:


import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.test.DelayItem;
import com.test.DelayTaskService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;

/**
 * 描述:延迟任务工具类,在使用前需要调用DelayUtil.start()开启延迟任务监控线程
 *
 * @author SXT
 * @version 1.0
 * @date 2023/8/17
 */
public class DelayUtil {

    private static Logger log = LoggerFactory.getLogger(DelayUtil.class);

    /** 是否开启线程执行延迟任务, true:已开启 */
    private static boolean isRun = false;

    private final static ExecutorService threadService = new ThreadPoolExecutor(
            // 核心线程池大小
            3,
            // 最大线程池大小
            6,
            // 线程最大空闲时间
            15,
            // 时间单位
            TimeUnit.SECONDS,
            // 线程等待队列
            new LinkedBlockingQueue<>(),
            // 线程创建工厂
            new ThreadFactoryBuilder().setNameFormat("Delay-pool-%d").build(),
            // 拒绝策略
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    private final static DelayQueue<DelayItem<?, ? extends DelayTaskService<?>>> QUEUE = new DelayQueue<>();

    /**
     * 向延迟队列中添加执行的任务
     * @param item
     */
    public static void addItem (DelayItem<?, ? extends DelayTaskService<?>> item) {
        QUEUE.put(item);
    }

    /**
     * 向延迟队列中添加执行的任务
     * @param items
     */
    public static void addItem (List<DelayItem<?, ? extends DelayTaskService<?>>> items) {
        items.forEach(QUEUE::put);
    }

    /**
     * 向延迟队列中移除等待执行的任务
     * @param bizId 移除指定标识的元素
     */
    public static void remove (String bizId) {
        if (null != bizId && !bizId.isEmpty()) {
            if (QUEUE.removeIf(el -> Objects.equals(el.getBizId(), bizId))) {
                log.debug("{} 延迟任务移除成功", bizId);
                return;
            }
            log.info("{} 延迟任务移除失败", bizId);
        }
    }

    /**
     * 向延迟队列中移除等待执行的任务
     * @param bizIds 移除指定标识的元素集合
     */
    public static synchronized void removeAll (Collection<String> bizIds) {
        for (String bizId : bizIds) {
            remove(bizId);
        }
    }

    /**
     * 阻塞获取队列元素
     */
    public static DelayItem<?, ? extends DelayTaskService<?>> take () {
        try {
            return QUEUE.take();
        } catch (InterruptedException e) {
            log.error("阻塞获取延迟实例失败: {}", e);
        }
        return null;
    }

    /**
     * 非阻塞获取队列元素, 没有到期的元素直接返回 null
     */
    public static DelayItem<?, ? extends DelayTaskService<?>> poll () {
        return QUEUE.poll();
    }

    /**
     * 开启线程执行延迟队列任务
     */
    public static synchronized void start () {
        if (isRun) {
            return;
        }
        isRun = true;
        log.info("开始运行延迟队列~~~");
        threadService.execute(() -> {
            while (true) {
                //获取要执行的任务
                final DelayItem<?, ? extends DelayTaskService<?>> task = take();
                if (null == task) {
                    log.warn("延迟任务未获取到数据....");
                    continue;
                }
                //开启一个新的线程进行执行任务
                CompletableFuture.runAsync(task::execute, threadService);
                log.info("延迟任务执行, 执行时间: {}", LocalDateTime.now());
            }
        });
    }


}

以上就是使用jdk提供的DelayQueue实现的通用延迟队列实现。

下面是具体的使用示例:

1.分别创建UserService和PersonService并是实现自定义的DelayService接口


public class UserService implements DelayTaskService<User> {

    private static Logger log = LoggerFactory.getLogger(UserService.class);

    @Override
    public void executeTask(List<User> data) {
        log.info("UserService服务执行...");
        log.info("data: {}", Arrays.toString(data.toArray()));
    }
}

/*****************************************************/

public class PersonService implements DelayTaskService<Person> {

    private static Logger log = LoggerFactory.getLogger(PersonService.class);

    @Override
    public void executeTask(List<Person> data) {
        log.info("PersonService服务执行...");
        log.info("data: {}", Arrays.toString(data.toArray()));
    }
}

2.进行测试


import com.test.DelayItem;
import com.test.DelayUtil;
import com.test.Person;
import com.test.User;
import com.test.PersonService;
import com.test.UserService;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Instant;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

/**
 * 描述:
 *
 * @author SXT
 * @version 1.0
 * @date 2023/8/17
 */
public class DelayTaskTest {

    private UserService userService;
    private PersonService personService;

    /** 模拟service依赖注入 */
    @BeforeEach
    void before(){
        userService = new UserService();
        personService = new PersonService();
    }

    @AfterEach
    void end(){
        System.out.println("测试结束 = 。=");
    }

    /**
     * 测试延迟任务执行
     * @throws InterruptedException
     */
    @Test
    void testDelayTask() throws InterruptedException {
        final long milli = Instant.now().toEpochMilli();
        User user = new User();
        user.setAge(12);
        user.setName("12");
        user.setId("12");
        DelayItem<User, UserService> userITem = new DelayItem<>(userService, Arrays.asList(user), milli,15);
        DelayUtil.addItem(userITem);

        Person person = new Person();
        person.setAge(14);
        person.setName("14");
        person.setId("14");
        DelayItem<Person, PersonService> personItem = new DelayItem<>(personService, Arrays.asList(person),milli, 15);
        DelayUtil.addItem(personItem);
        DelayUtil.start();
        TimeUnit.SECONDS.sleep(60);
    }

    /**
     * 测试取消延迟任务
     * @throws InterruptedException
     */
    @Test
    void testCancelDelayTask() throws InterruptedException {
        System.out.println("测试取消延迟任务");
        final long milli = Instant.now().toEpochMilli();
        User user = new User();
        user.setAge(12);
        user.setName("12");
        user.setId("12");
        DelayItem<User, UserService> userITem = new DelayItem<>("12", "user",userService, Arrays.asList(user), milli,15);
        DelayUtil.addItem(userITem);

        Person person = new Person();
        person.setAge(14);
        person.setName("14");
        person.setId("14");
        DelayItem<Person, PersonService> personItem = new DelayItem<>("14", "person",personService, Arrays.asList(person),milli, 15);
        DelayUtil.addItem(personItem);
        DelayUtil.start();

        //取消未执行的延迟任务
        TimeUnit.SECONDS.sleep(3);
        DelayUtil.remove("12");

        TimeUnit.SECONDS.sleep(30);
    }

}

3.测试结果:

测试延迟任务执行:

测试取消延迟任务:

总结:

  1. 此方式只适用于数据量不大的情况下使用,适用于单机版
  2. 在项目中使用需要注意在项目启动时或启动后执行DelayUtil.start()开启延迟线程
  3. 服务停止会将未执行的延迟任务清空,所有在启动项目时要做获取需要执行的延迟任务数据,并添加到延迟队列中

在使用Spring框架下,可以定义一个class专门在项目启动时执行总结的点三点。具体可以时在Bean实例化后进行(实现InitializingBean接口),也可以使用@PostConstruct。当然也可以使用其他的方式