阅读 3987

实战|我还是很建议你用DelayQueue搞定超时订单的-(1)

一、用三根鸡毛做引言

  • 真的! 不骗你们的喔~ 相信大家都遇到类似于:订单30min后未支付自动取消的开发任务
  • 那么今日份就来了解一下怎么用延时队列 DelayQueue搞定单机版的超时订单

二、延时队列使用场景

那么什么时候需要用延时队列呢?常见的延时任务场景 举栗子:

  1. 订单在30分钟之内未支付则自动取消。
  2. 重试机制实现,把调用失败的接口放入一个固定延时的队列,到期后再重试。
  3. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
  4. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  5. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。
  6. 关闭空闲连接,服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。
  7. 清理过期数据业务。比如缓存中的对象,超过了空闲时间,需要从缓存中移出。
  8. 多考生考试,到期全部考生必须交卷,要求时间非常准确的场景。

三、解决办法多如鸡毛

  1. 定期轮询(数据库等)
  2. JDK DelayQueue
  3. JDK Timer
  4. ScheduledExecutorService 周期性线程池
  5. 时间轮(kafka)
  6. 时间轮(Netty的HashedWheelTimer)
  7. Redis有序集合(zset)
  8. zookeeper之curator
  9. RabbitMQ
  10. Quartz,xxljob等定时任务框架
  11. Koala(考拉)
  12. JCronTab(仿crontab的java调度器)
  13. SchedulerX(阿里)
  14. 有赞延迟队列
  15. .....(鸡毛)
  • 解决问题方法真是不胜枚举,正所谓一呼百应,一千个读者眼里有一千个哈姆雷特

🌱 🌱 🌱 🌱 🌱 🌱 🌱 🌱 🌱 🌱

  • 那我们第一篇先来实战JDK的DelayQueue,万祖归宗,万法同源,学会了最基础的Queue,就不愁其他的了
  • 后续再写几篇使用Redis,Zk,MQ的一些机制,实战分布式情况下的使用

四、先认亲

延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队入队是有方向性的,元素从一端进入,从另一端取出。

其次,延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。

一言以蔽之曰 : 延时队列就是用来存放需要在指定时间被处理的元素的队列。

1) DelayQueue 是谁,上族谱

看的出来到DelayQueue这一代已经第五代传人了,

要知道 DelayQueue自幼生在八戒家,长大就往外面拉,熊熊烈火它不怕,水是水来渣是渣。

不过它真的是文韬武略,有一把ReentrantLock就是它的九齿钉耙,抗的死死の捍卫着自己的PriorityQueue.

有典故曰:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
           implements BlockingQueue<E> {
// 用于控制并发的 可重入 全局 锁
private final transient ReentrantLock lock = new ReentrantLock();
// 根据Delay时间排序的 无界的 优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用于优化阻塞通知的线程元素leader,标记当前是否有线程在排队(仅用于取元素时)
private Thread leader = null;
// 条件,用于阻塞和通知的Condition对象,表示现在是否有可取的元素
private final Condition available = lock.newCondition();

       /**
        * 省洛方法代码.....  你们懂我的省洛吗?
        */
复制代码
  • 注释的已经很清楚他们的意思了,也具备了并发编程之艺术的 锁,队列,状态(条件)
  • 他的几个方法也是通过 锁-->维护队列-->出队,入队-->根据Condition进行条件的判断-->进行线程之间的通信和唤起
  • 以支持优先级无界队列的PriorityQueue作为一个容器,容器里面的元素都应该实现Delayed接口,在每次往优先级队列中添加元素时以元素的过期时间作为排序条件,最先过期的元素放在优先级最高。
  • DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

2) 优先级队列 PriorityQueue

因为我们的DelayQueue里面维护了一个优先级的队列PriorityQueue 简单的看下:

    //默认容量11
     private static final int DEFAULT_INITIAL_CAPACITY = 11;
    //存储元素的地方 数组
    transient Object[] queue; // non-private to simplify nested class access
    //元素个数
    private int size = 0;
    //比较器
    private final Comparator<? super E> comparator;
复制代码
  1. 默认容量是11;
  2. queue,元素存储在数组中,这跟我们之前说的堆一般使用数组来存储是一致的;
  3. comparator,比较器,在优先级队列中,也有两种方式比较元素,一种是元素的自然顺序,一种是通过比较器来比较;
  4. modCount,修改次数,有这个属性表示PriorityQueue也是fast-fail的;
  5. PriorityQueue不是有序的,只有堆顶存储着最小的元素;
  6. PriorityQueue 是非线程安全的;

3) DelayQueue的方法简介

  • 入队方法 : 若添加的元素是队首(堆顶)元素,就把leader置为空,并唤醒等待在条件available上的线程;
public boolean add(E e) {    return offer(e);}
public void put(E e) {    offer(e);}
public boolean offer(E e, long timeout, TimeUnit unit) {    return offer(e);}
public boolean offer(E e) {    
    final ReentrantLock lock = this.lock;    
    lock.lock();   //加锁 因为优先队列线程不安全
    try {
        q.offer(e);  //判断优先级 进行入队      
    if (q.peek() == e) {    //-----[1]
        //leader记录了被阻塞在等待队列头生效的线程 新增一个元素到队列头,
        //表示等待原来队列头生效的阻塞的线程已经失去了阻塞的意义
        //,此时需要获取新的队列头进行返回了
        leader = null;      
        //获取队列头的线程被唤起,主要有两种场景:
        //1. 之前队列为空,导致被阻塞的线程
        //2. 之前队列非空,但是队列头没有生效(到期)导致被阻塞的线程
        available.signal();     
    }        
        return true; //因为是无界队列 所以添加元素肯定成功  直到OOM
    } finally {    
        lock.unlock();   //释放锁
    }
}
复制代码

offer()方法,首先获取独占锁,然后添加元素到优先级队列,由于q是优先级队列,所以添加元素后,peek并不一定是当前添加的元素,如果[1]为true,说明当前元素e的优先级最小也就即将过期的,这时候激活avaliable变量条件队列里面的线程,通知他们队列里面有元素了。

  • 出队方法 take()

请看我详细的注释,绝不是蜻蜓点水

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock; //获取锁 
    lock.lockInterruptibly(); //可中断锁     并发类里面凡是调用了await的方法获取锁时候都是使用的lockInterruptibly方法而不是Lock. 
    //也是一种fail-fast思想吧,await()方法会在中断标志设置后抛出InterruptedException异常后退出 不至于死死的等待
    try {
        for (;;) {//会写死循环的都是高手
            E first = q.peek();//get队头元素
            if (first == null)
                // 队列头为空,则阻塞,直到新增一个入队为止(1)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);//获取剩余时间
                if (delay <= 0)
                    // 若队列头元素已生效,则直接返回(2)
                    return q.poll();
                first = null; // don't retain ref while waiting 等待的时候不能引用,表示释放当前引用的(3)
                if (leader != null)
                    // leader 非空时,表示有其他的一个线程在出队阻塞中 (4.1)
                    // 此时挂住当前线程,等待另一个线程出队完成
                    available.await();
                else {
                    //标识当前线程处于等待队列头生效的阻塞中 (4.2.1)
                    Thread thisThread = Thread.currentThread(); 
                    leader = thisThread;
                    try {
                        // 等待队列头元素生效(4.2.2)
                        available.awaitNanos(delay);
                    } finally {
                        //最终释放当前的线程 设置leader为null (4.2.3)
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }     //(5)
    } finally {
        if (leader == null && q.peek() != null)
            // 当前线程出队完成,通知其他出队阻塞的线程继续执行(6)
            available.signal();
            lock.unlock();//解锁结束
    }
}
复制代码

那么,下面的结论肉眼可见:

  1. 如果队列为空,则阻塞,直到有个线程(生产者投递数据)完成入队操作
  2. 获取队列头,若队列头已生效,则直接返回
  3. 未生效则释放当前引用
  4. 当队列头部没有生效时候:
    1. 若有另一个线程已经处于等待队列头生效的阻塞过程中,则阻塞当前线程,直到另一个线程完成出队操作
    2. 若没有其他线程阻塞在出队过程中,即当前线程为第一个获取队列头的线程
      • 标识当前线程处于等待队列头生效的阻塞中(leader = thisThread
      • 阻塞当前线程,等待队列头生效
      • 队列头生效之后,清空标识(leader=null)
  5. 再次进入循环,获取队列头并返回
  6. 最后,当前线程出队完成,通知其他出队阻塞的线程继续执行

4) Leader/Follower模式

  1. 如果不是队首节点,根本不需要唤醒操作!
  2. 假设取值时,延时时间还没有到,那么需要等待,但这个时候,队列中新加入了一个延时更短的,并放在了队首,那么 此时,for循环由开始了,取得是新加入的元素,那之前的等待就白等了,明显可以早点退出等待!
  3. 还有就是如果好多线程都在此等待,如果时间到了,同时好多线程会充等待队列进入锁池中,去竞争锁资源,但结果只能是一个成功, 多了写无畏的竞争!(多次的等待和唤醒)

5)Delayed

public interface Delayed extends Comparable<Delayed> { 
    long getDelay(TimeUnit unit);
}
复制代码

据情报显示:Delayed是一个继承自Delayed的接口,并且定义了一个Delayed方法,用于表示还有多少时间到期,到期了应返回小于等于0的数值。

很简答就是定义了一个,一个哈,一个表延迟的接口,就是个规范接口,目的就是骗我们去实现它的方法.哼~

五、再实战

说了那么多废话,让我想起了那句名言:一切没有代码实操的讲解都是耍流氓 至今深深的烙在我心中,所以我一定要实战给你们看,显得我不是流氓...

  • 实战以 订单下单后三十分钟内未支付则自动取消 为业务场景

  • 该场景的代码逻辑分析如下:

    1. 下单后将订单直接放入未支付的延时队列中
    2. 如果超时未支付,则从队列中取出,进行修改为取消状态的订单
    3. 如果支付了,则不去进行取消,或者取消的时候做个状态筛选,即可避免更新
    4. 或者支付完成后,做个主动出队
    5. 还有就是用户主动取消订单,也做个主动出队
  • 那么我们写代码一定要通用,先来写个通用的Delayed 通用...嗯! 泛型的


import lombok.Getter;
import lombok.Setter;

import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * @author LiJing
 * @ClassName: ItemDelayed
 * @Description: 数据延迟实现实例 用以包装具体的实例转型
 * @date 2019/9/16 15:53
 */

@Setter
@Getter
public class ItemDelayed<T> implements Delayed {

    /**默认延迟30分钟*/
    private final static long DELAY = 30 * 60 * 1000L;
    /**数据id*/
    private Long dataId;
    /**开始时间*/
    private long startTime;
    /**到期时间*/
    private long expire;
    /**创建时间*/
    private Date now;
    /**泛型data*/
    private T data;
    
    public ItemDelayed(Long dataId, long startTime, long secondsDelay) {
        super();
        this.dataId = dataId;
        this.startTime = startTime;
        this.expire = startTime + (secondsDelay * 1000);
        this.now = new Date();
    }

    public ItemDelayed(Long dataId, long startTime) {
        super();
        this.dataId = dataId;
        this.startTime = startTime;
        this.expire = startTime + DELAY;
        this.now = new Date();
    }

    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
}
复制代码
  • 再写个通用的接口,用于规范和方便统一实现 这样任何类型的订单都可以实现这个接口 进行延时任务的处理

public interface DelayOrder<T> {


    /**
     * 添加延迟对象到延时队列
     *
     * @param itemDelayed 延迟对象
     * @return boolean
     */
    boolean addToOrderDelayQueue(ItemDelayed<T> itemDelayed);

    /**
     * 根据对象添加到指定延时队列
     *
     * @param data 数据对象
     * @return boolean
     */
    boolean addToDelayQueue(T data);

    /**
     * 移除指定的延迟对象从延时队列中
     *
     * @param data
     */
    void removeToOrderDelayQueue(T data);
}
复制代码
  • 来具体的任务,具体的逻辑具体实现
@Slf4j
@Lazy(false)
@Component
public class DelayOwnOrderImpl implements DelayOrder<Order> {

    @Autowired
    private OrderService orderService;

    @Autowired
    private ExecutorService delayOrderExecutor;

    private final static DelayQueue<ItemDelayed<Order>> DELAY_QUEUE = new DelayQueue<>();

    /**
     * 初始化时加载数据库中需处理超时的订单
     * 系统启动:扫描数据库中未支付(要在更新时:加上已支付就不用更新了),未过期的的订单
     */
    @PostConstruct
    public void init() {
        log.info("系统启动:扫描数据库中未支付,未过期的的订单");
        List<Order> orderList = orderService.selectFutureOverTimeOrder();
        for (Order order : orderList) {
            ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime());
            this.addToOrderDelayQueue(orderDelayed);
        }
        log.info("系统启动:扫描数据库中未支付的订单,总共扫描了" + orderList.size() + "个订单,推入检查队列,准备到期检查...");

        /*启动一个线程,去取延迟订单*/
        delayOrderExecutor.execute(() -> {
            log.info("启动处理的订单线程:" + Thread.currentThread().getName());
            ItemDelayed<Order> orderDelayed;
            while (true) {
                try {
                    orderDelayed = DELAY_QUEUE.take();
                    //处理超时订单
                    orderService.updateCloseOverTimeOrder(orderDelayed.getDataId());
                } catch (Exception e) {
                    log.error("执行自营超时订单的_延迟队列_异常:" + e);
                }
            }
        });
    }

    /**
     * 加入延迟消息队列
     **/
    @Override
    public boolean addToOrderDelayQueue(ItemDelayed<Order> orderDelayed) {
        return DELAY_QUEUE.add(orderDelayed);
    }

    /**
     * 加入延迟消息队列
     **/
    @Override
    public boolean addToDelayQueue(Order order) {
        ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime());
        return DELAY_QUEUE.add(orderDelayed);
    }

    /**
     * 从延迟队列中移除 主动取消就主动从队列中取出
     **/
    @Override
    public void removeToOrderDelayQueue(Order order) {
        if (order == null) {
            return;
        }
        for (Iterator<ItemDelayed<Order>> iterator = DELAY_QUEUE.iterator(); iterator.hasNext(); ) {
            ItemDelayed<Order> queue = iterator.next();
            if (queue.getDataId().equals(order.getId())) {
                DELAY_QUEUE.remove(queue);
            }
        }
    }
}
复制代码

解释一番上面的写的东东

  1. delayOrderExecutor是注入的一个专门处理出队的一个线程
  2. @PostConstruct是啥呢,是在容器启动后只进行一次初始化动作的一个注解,相当实用
  3. 启动后呢,我们去数据库扫描一遍,防止有漏网之鱼,因为单机版吗,队列的数据是在内存中的,重启后肯定原先的数据会丢失,所以为保证服务质量,我们可能会录音.....所以为保证重启后数据的恢复,我们需要重新扫描数据库把未支付的数据重新装载到内存的队列中
  4. 接下来就是用这个线程去一直不停的访问队列的take()方法,当队列无数据就一直阻塞,或者数据没到期继续阻塞着,直到到期出队,然后获取订单的信息,去处理订单的更新操作

六、后总结

  • 这就是单机的不好处,也是一个痛点,所以肯定是不太适合订单量特别大的场景 大家也要酌情考虑和运用
  • 相对于同等量级的数据库轮询操作来说,真是节省了不少数据库的压力和连接,还是值得一用的,我们可以只保存订单的id到延时实例中,这样缩减队列单个实例内存存储
  • 那还有技巧就是更新的时候注意控制好幂等性,控制好幂等性,会让你轻松很多,顺畅很多,但是数据量大了,要蛀牙的哦

那今日份的讲解就到此结束,具体的代码请移步我的gitHub的mybot项目Master分支查阅,fork体验一把,或者评论区留言探讨,写的不好,请多多指教~~