【并发编程系列9】阻塞队列之PriorityBlockingQueue,DelayQueue原理分析

1,307 阅读7分钟

推荐一个学习Java的UP主:space.bilibili.com/430946906

前言

前面我们介绍了ArrayBlockingQueue,LinkedBlockingQueue,LinkedBlockingDeque三种阻塞队列,今天继续介绍PriorityBlockingQueue和DelayQueue两个阻塞队列,在介绍这两个阻塞队列之前,需要先了解一种数据结构:二叉堆。因为PriorityBlockingQueue内部使用了最小二叉堆算法来保证每次弹出的元素是最小元素,而DelayQueue又依赖于PriorityBlockingQueue。

二叉堆

堆的数据结构是一颗完全二叉树,完全二叉树指的是除了最后一层,其余层均有左右子节点。二叉堆又可以分为最大二叉堆和最小二叉堆。

最大二叉堆的某个结点的值最多与其父结点一样大,最小堆则是某个结点的值最多与其父结点一样小。所以最大堆中最大的结点永远是根结点,最小堆中最小的结点永远是根节点。

用数组(下标从1开始算)来存储二叉堆的话有以下几个特性:

  • 第n个位置的子节点分别在index[2n]和index[2n+1]。如index[1]位置的子节点在index[2]和index[3],而index[2]位置的子节点为index[4]和[5],以此类推。
  • 叶子节点的下标为index[n/2+1]到index[n]。如一个长度为9的数组中,index[5]到index[9]位置的元素均属于叶子节点。
  • 第n个位置(非根节点)的父节点为:n/2

如下图就是一个二叉堆(

圆圈内的数字表示数组下标,并不表示真实元素的值

):
在这里插入图片描述
比如说4这个位置,他的左右子节点就是2

4和2

4+1,即8和9
n/2+1=5,说明从5到n即9都是叶子节点。

堆有三种基本操作:初始化,上沉,下浮(下面以最小二叉堆为例来说明):

  • 初始化:将一个无序的数组初始化成堆。从最后一个非叶子结点开始,将父节点和子节点进行比较,如果父节点大于子节点,则将父节点和子节点替换,确保父节点<=子节点
  • 上沉:用于插入元素。在数组的末尾插入新的元素,然后和父节点比较,如果比父节点大,则插入完成,如果比父节点小,则交换位置,并以此类推。
  • 下浮:用于移除元素。移除头节点元素,此时会将数组末尾的数据拿过来先放到头节点,然后和子节点进行比较,如果比子节点大,则交换位置,以此类推。

注意:二叉堆和二叉树不一样,二叉堆并不保证左右节点的大小

PriorityBlockingQueue

PriorityBlockingQueue是一个支持优先级的无界阻塞队列(大小受限于内存)。和前面介绍的三种有界队列相比,无界队列的最大区别是即使初始化的时候指定了长度,那么当队列元素达到上限后队列也会自动进行扩容,所以PriorityBlockingQueue在添加元素的时候不会发生阻塞,而如果扩容后的大小超过了内存限制,会抛出OutOfMemoryError错误。

默认情况下PriorityBlockingQueue队列元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者在初始化时,可以指定构造参数Comparator来对元素进行排序。
注意:PriorityBlockingQueue不能保证相同优先级元素的顺序(即两个值排序一样时,不保证顺序)。

下面还是先来看看PriorityBlockingQueue类图:
在这里插入图片描述
可以看到提供了4个 构造器:

  • PriorityBlockingQueue():
    初始化一个默认大小(11)长度的队列,并使用默认自然排序。
  • PriorityBlockingQueue(int):
    初始化一个指定大小的长度的队列,并使用默认自然排序。
  • PriorityBlockingQueue(int,Comparator):
    初始化一个指定大小的队列,并按照指定比较器进行排序。
  • PriorityBlockingQueue(Collection):
    根据传入的集合进行初始化并堆化,如果当前集合是SortedSet或者PriorityBlockingQueue类型,则保持原有顺序,否则使用自然排序进行堆化。

初始化

前面两个构造器最后都会调用第三个构造器去初始化一个队列:
在这里插入图片描述
我们看到只有一个Condition队列,这个是用来阻塞出队线程的,入队线程不会被阻塞。
接下来我们主要看看第4个构造器,是如何初始化一个队列的:

public PriorityBlockingQueue(Collection<? extends E> c) {
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        boolean heapify = true; //true表示需要堆化即需要重排序
        boolean screen = true;  //true表示需要筛选空值
        if (c instanceof SortedSet<?>) {
            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
            this.comparator = (Comparator<? super E>) ss.comparator();
            heapify = false;//如果比较器是SortedSet类型则不需要堆化
        }
        else if (c instanceof PriorityBlockingQueue<?>) {
            PriorityBlockingQueue<? extends E> pq =
                (PriorityBlockingQueue<? extends E>) c;
            this.comparator = (Comparator<? super E>) pq.comparator();
            screen = false;//如果比较器是PriorityBlockingQueue类型则不需要筛选空值
            if (pq.getClass() == PriorityBlockingQueue.class) // exact match
                heapify = false;//如果pq就是一个PriorityBlockingQueue则不需要堆化
        }
        Object[] a = c.toArray();
        int n = a.length;
        // If c.toArray incorrectly doesn't return Object[], copy it.
        if (a.getClass() != Object[].class)
            a = Arrays.copyOf(a, n, Object[].class);//如果c.torray()失败,重新复制一个数组
        if (screen && (n == 1 || this.comparator != null)) {//??
            for (int i = 0; i < n; ++i)
                if (a[i] == null)
                    throw new NullPointerException();
        }
        this.queue = a;
        this.size = n;
        if (heapify)
            heapify();//堆化(排序)
    }

上面标注了?的if判断,没想到什么场景会发生,如果有知道的,恳请留言告知,非常感谢!

这一段代码看起来很长,实际上就是将指定的集合赋值给队列,并确认排序规则,如果需要排序,则调用heapify()方法,这个初始化排序才是关键:
在这里插入图片描述
下图就是一个数组[8,5,2,7,6,4,1,9,3]的二叉堆表现形式:

在这里插入图片描述
首先看到代码437行,从二叉堆的特性知道,二叉堆的初始化会从最后一个非叶子节点开始,也就是n/2开始,但是因为这种算法是基于元素从1开始算的,而数组是从0开始,所以这里需要减1,也就是从下图中的位置3(元素7)开始往前面循环。

后面就是两个排序规则判断,代码逻辑是一样的,我们进入siftDownComparable方法,这个方法主要就是完成元素的下沉操作
在这里插入图片描述
主要逻辑为:

  1. 将当前循环节点的左右子节点比较,确保拿到最小子节点的下标child
  2. 再将child对应的元素和父节点比较,确保父节点<最小子节点
  3. 最后会再次确认当前元素与最小子节点(可能是左也可能是右)的子节点(如果有的话)进行大小比较,依此类推,完成元素下沉。
**注意:除了阻塞队列,我还分享了最新Java架构项目实战教程+大厂面试题库,有兴趣的[点击此处](https://docs.qq.com/doc/DVEVEeWJudlpGRmVY)免费获取,没基础勿进!**

第一次下沉

从元素7开始循环,首先将元素9和元素3比较,发现9>3,临时变量替换一下,
然后将元素7和元素3比较,发现7>3,所以直接将父节点和右子节点替换,完成了第一次循环(因为子节点已经是叶子节点了,所以不满足二次循环条件)。
在这里插入图片描述

第二次下沉

第二次循环就到了下标2的位置,也就是元素2,和第一次循环类似,因为子节点是叶子节点,所以也是一次循环就结束,直接完成了父节点和最小子节点的替换,升级过程如下:
在这里插入图片描述

第三次下沉

第三次循环就到了下标1的位置,也就是元素5,这时候因为左子节点本来就小于右子节点,所以不需要临时替换,直接比较左子节点和父子节点,注意这里图2是一个临时过程,因为是首先将左子节点赋值给父节点,然后发现左子节点下面还有子节点会再进行一次循环,直到通过break跳出循环之后才会将5赋值给左子节点,完成替换:
在这里插入图片描述

第四次下沉

第四次循环就到了下标0的位置,也就是元素8,首先完成1和3的替换,然后完成3和8的替换:
在这里插入图片描述
这时候因为最小子节点的下标是2,2<half,所以会再次循环(注意再次循环的时候还是拿最开始的元素8来和左右子节点进行比较),然后又会将8和2进行替换,将元素2赋值到下标2的位置,然后这时候不满足循环条件了,结束循环,这时候才正式将元素8赋值到下标6的位置:
在这里插入图片描述
上图中两个流程可以看到,元素8会一路下沉到最后。

到这里完成了初始化排序,最终数组由:[8,5,2,7,6,4,1,9,3]变为[1,3,2,5,6,4,8,9,7]。

添加元素(生产者)

put(E)方法会调用offer(E)方法,上一篇阻塞队列的文章中,我们知道,offer(E)方法是不阻塞的,而这里是无界数组也不会阻塞,所以直接调用offer(E)方法就可以了:
在这里插入图片描述
这里逻辑比较简单,首先看有没有越界,越界了就先进行扩容,扩容放在后面讲。
然后添加元素主要就是进行上浮过程,进入默认的排序规则上浮方法siftUpComparable:
在这里插入图片描述
还是用上面排序后的二叉堆,假如我们现在添加一个元素4,会得到下面这样一个二叉堆:
在这里插入图片描述
这时候为了确保新添加的元素按照排序规则不会比根节点小,需要将新添加的元素进行上浮操作。

第一次上浮

发现4<6,所以将6放到队尾,注意这时候4并不会赋值到队列中,因为4还需要继续上浮确认放在哪个位置
在这里插入图片描述

第二次上浮

第二次上浮会发现4<3,不满足所以会跳出循环,确认将4放在了下标4的位置,完成插入元素操作
在这里插入图片描述

获取元素(消费者)

调用take()方法获取元素
在这里插入图片描述
主要看dequeue()方法:
在这里插入图片描述
这个方法的主要逻辑为:
1、先拿到第一个元素(需要返回)和最后一个元素
2、然后将最后一个元素置为空
3、用存好的最后一个元素的值从头开始下沉
最后一步下沉操作和初始化的最后一步下沉操作是一样的处理方式,直到完成下沉就会诞生一个最小的元素重新放到头节点

扩容

最后我们来分析下扩容tryGrow方法

    private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); //扩容前先释放锁(扩容可能会费时,先让出锁,让出队线程可以正常操作)
        Object[] newArray = null;
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {//通过CAS操作确保只有一个线程可以扩容
            try {
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {//大于当前最大容量则可能溢出
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)//扩大一个元素也溢出或者超过最大容量则抛出异常
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;//扩容后如果超过最大容量,则只扩大到最大容量
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];//根据最新容量初始化一个新数组
            } finally {
                allocationSpinLock = 0;
            }
        }
        if (newArray == null) //如果是空,说明前面CAS失败,有线程在扩容,让出CPU
            Thread.yield();
        lock.lock();//这里重新加锁是确保数组复制操作只有一个线程能进行
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);//将旧的元素复制到新数组
        }
    }

DelayQueue

DelayQueue是一个支持延时获取元素的无界阻塞队列。队列中使用PriorityQueue来实现。队列中的元素必须实现Delayed接口:
在这里插入图片描述
接口里定义了一个getDelay方法来获取当前剩余的过期时间,另外因实现了Comparable接口,所以还会有一个compareTo方法。

DelayQueue使用示例

1、新建一个对象,实现Delayed ,并重写getDelay和compareTo

package com.zwx.concurrent.queue.block.model;

import java.sql.Time;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class MyElement implements Delayed {
    private long expireTime;//过期时间(毫秒)
    private int id;

    public long getExpireTime() {
        return expireTime;
    }

    public void setExpireTime(long expireTime) {
        this.expireTime = expireTime;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public MyElement(int id, long expireTime) {
        this.id = id;
        this.expireTime = System.currentTimeMillis() + expireTime;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        //类里面接收的是毫秒,但是getDelay方法在DelayQeue里面传的是纳秒,所以这里需要进行一次单位转换
        return unit.convert(expireTime - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        //注意,这里的排序要确定最先到期的放在第一位,否则会阻塞住后面未到期的
        return Long.valueOf(expireTime).compareTo(((MyElement) o).expireTime);
    }
}

package com.zwx.concurrent.queue.block;

import com.zwx.concurrent.queue.block.model.MyElement;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;

public class DelayQueueDemo {
    public static void main(String[] args) {
        List<MyElement> list = new ArrayList<>();
        for (int i=1;i<=5;i++){
            MyElement myElement = new MyElement(i,i*1000);
            list.add(myElement);
        }
        DelayQueue delayQueue = new DelayQueue(list);

        while (true){
            try {
                MyElement myElement = (MyElement) delayQueue.take();
                System.out.println(myElement.getId());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

DelayQueue类图

接下来看看类图
在这里插入图片描述
只有两个构造器,第一个是空的构造器,第二个是默认初始化一个集合。

初始化

在这里插入图片描述
通过循环调用add(e)方法进行添加,然后add方法又去调用了offer(e)方法:
在这里插入图片描述

添加元素(消费者)

DelayQueue队列的元素是存在其内部维护的PriorityQueue上,所以上面调用了q.offer(e)方法。
leader表示获取到锁的线程。q.peek()==e表示当前第一个元素就是刚刚添加进去的元素,所以需要将leader设置为空,唤醒出队(消费者)线程重新争抢锁。

q.offer(e)方法的处理方式基本和上面讲的PriorityBlockingQueue中逻辑一致
在这里插入图片描述

获取元素(消费者)

take方法会依次获取元素,如果第一个元素没到期,则会一直阻塞:

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();//队列为空,则阻塞
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();//如果到期了,则调用poll方法取元素并直接返回
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();//头节点不为空,说明有线程持有锁并正在等待到期时间,所以直接阻塞
                    else {//leader==null
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;//设置头节点为当前线程,表名有线程在等待头节点元素过期
                        try {
                            available.awaitNanos(delay);//阻塞指定时间
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

Leader-Follower线程模型

在Leader-follower线程模型中每个线程有三种模式:

  • leader:只有一个线程成为leader,如DelayQueue如果有一个线程在等待元素到期,则其他线程就会阻塞等待
  • follower:会一直尝试争抢leader,抢到leader之后才开始干活
  • processing:处理中的线程

DelayQueue队列中有一个leader属性:private Thread leader = null;用到的就是Leader-Follower线程模型。
当有一个线程持有锁,设置了leader属性,正在等待元素到期时,则成为了leader,其他线程就直接阻塞。