【并发编程系列8】阻塞队列之ArrayBlockingQueue,LinkedBlockingQueue,LinkedBlockingDeque原理分析

1,634 阅读8分钟

整理了一份Java JVM 多线程 MySQL Redis Kafka Docker RocketMQ Nginx MQ队列 数据结构 并发编程 并发压测 秒杀架构等技术知识点PDF,如果你有需要的话,可以点击这里领取

什么是阻塞队列

阻塞队列有两个特点:

  • 当队列中没有元素时,从队列中获取元素会被阻塞
  • 当队列满了时,添加元素会被阻塞

阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素,消费者则从队列里取元素。

队列Queue接口核心方法

阻塞队列,本质上来说还是属于队列,也就是说阻塞队列继承了队列的功能,这里我们先来看看Queue接口中的几个核心方法:

方法功能
add(e)添加一个元素,成功返回true,如果空间满了则抛出异常
offer(e)添加一个元素,成功返回true,如果空间满了则返回false,处理有界队列时优于add方法
remove()检索并移除队列头元素,成功则返回移除的元素,如果队列为空则抛出异常
poll()检索并移除队列头元素,成功则返回移除的元素,如果队列为空则返回null
element()检索并返回队列头元素,如果队列为空则抛出异常
peek()检索并返回队列头元素,如果位列为空则返回null

这几个方法是队列接口所提供的,然而这些方法并不会阻塞,所以需要重新定义阻塞队列的接口,下面我们看看阻塞队列中的核心方法。

阻塞队列BlockigQueue接口核心方法

方法功能
put(e)添加一个元素,成功返回true,**如果空间满了则阻塞等待**
offer(e,time,unit)添加一个元素,成功返回true,**如果空间满了则阻塞指定时间**,到达指定时间还没空间则返回null
take()检索并移除队列头元素,成功则返回移除的元素,**如果队列为空则阻塞**
poll(time,unit)检索并移除队列头元素,成功则返回移除的元素,**如果队列为空则阻塞指定时间**,到达指定时间后队列还是空则返回null
drainTo(Collection)一次性获取队列所有元素放到指定的集合中,并返回转移个数
drainTo(c,n)一次性获取队列中指定个数的元素放到指定的集合中,并返回转移个数
remainingCapacity()返回队列中理想情况下可添加元素个数

在Java中,提供了7种常用的阻塞队列。

  • ArrayBlockingQueue:由数组结构组成的有界阻塞队列
  • LinkedBlockingQueue:由链表结构组成的有界阻塞队列
  • PriorityBlockingQueue:支持优先级排序的无界阻塞队列
  • DelayQueue:使用优先级队列实现的无界阻塞队列
  • SynchronousQueue:不存储元素的阻塞队列
  • LinkedTransferQueue:由链表结构组成的无界阻塞队列
  • LinkedBlockingDeque:由链表结构组成的双向阻塞队列

ArrayBlockingQueue

ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下采用非公平锁的方式实现,可以通过构造器传参控制是采用公平锁还是非公平锁实现。先看看ArrayBlockingQueue类图关系:
在这里插入图片描述
可以看到有3个构造器,其实最终都是会调用上图中第二个构造器进行初始化,第三个构造器在初始化之后会再进行赋值(如果传入的Collection不为空)。

ArrayBlockingQueue nonFairQueue = new ArrayBlockingQueue(10);//默认非公平锁实现
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(10,true);//true表示公平锁

模拟实现生产者消费者

package com.zwx.concurrent.queue.block;

import java.util.concurrent.ArrayBlockingQueue;

public class ArrayBlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue(100);//默认非公平锁实现
        new Thread(new ConsumerThread(queue)).start();
        Thread.sleep(2000);
        new Thread(new ProcuctThread(queue)).start();
    }
}
    class ProcuctThread extends Thread{
        private ArrayBlockingQueue queue;
        public ProcuctThread(ArrayBlockingQueue queue) {
            this.queue = queue;
        }
        @Override
        public void run() {
           for (int i=0;i<100;i++){
               try {
                   queue.put(i);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
        }
    }

    class ConsumerThread extends Thread{
        private ArrayBlockingQueue queue;
        public ConsumerThread(ArrayBlockingQueue queue) {
            this.queue = queue;
        }
        @Override
        public void run() {
            while (true){
                try {
                    int a = (int)queue.take();
                    System.out.println("消费:" + a);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

上面的示例中,我们先启动了消费者模式,运行之后可以发现,因为队列是空的,所以会阻塞等待生产者添加元素之后输出。

初始化队列

首先是初始化两个Condition队列分别用于阻塞生产者和消费者线程

在这里插入图片描述

添加元素(生产者)

添加元素的时候,如果队列满了,就阻塞,不满则调用enque方法添加元素
在这里插入图片描述
获取元素时通过内部维护的putIndex逐个往后添加元素,满了则重新从0开始
在这里插入图片描述

获取元素(消费者)

首先会判断队列是不是空了,空了就阻塞,否则调用dequeue方法获取元素
在这里插入图片描述
调用dequeue方法移除元素,并唤醒添加元素线程。
在这里插入图片描述

LinkedBlockingQueue

一个由链表结构组成的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序,和ArrayBlockingQueue的区别是ArrayBlockingQueue内部维护的是一个数组,通过数组下标来维护队列,而LinkedBlockingQueue维护的是一个链表,通过Node来维护队列。还是先来看看类图关系:
在这里插入图片描述
LinkedBlockingQueue依然有3个构造函数,第一个和第三个构造器最终也是会调用第二个构造器,而且默认会初始化一个Integer.MAX_VALUE大小队列,第三个构造器在初始化队列之后会进行赋值(如果传入的Collection不为空)。

初始化队列

在这里插入图片描述
Node是LinkedBlockingQueue中的一个静态内部类:
在这里插入图片描述
所以第一次初始化之后Node节点中的item是默认为null值的。初始化之后得到如下队列:
在这里插入图片描述
这个头节点也是一个哨兵,和AQS同步队列一样,设置了一个空信息的节点作为哨兵。

添加元素(生产者)

在这里插入图片描述
添加元素获取的是putLock,后面可以看到,获取元素获取的的takeLock,采用了读写双锁分离方式实现性能上的提升。
再看下添加元素的enqueue方法:
在这里插入图片描述
添加元素后得到如下队列:
在这里插入图片描述

获取元素(消费者)

在这里插入图片描述
这里消费者获取的是另一把锁takeLock,这里的逻辑也应该是比较容易看懂,我们进入真正获取元素的dequeue方法:
在这里插入图片描述
1、我们先看219行,执行之后得到如下队列:
在这里插入图片描述2、继续看221行代码,执行之后得到如下结果:
在这里插入图片描述
经过这两步,其实原先的E1节点已经被移除了,那么从上图可以看到,原先旧的head节点中next还持有了当前head节点的引用,这时候根据GC的可达性算法,是无法回收的,所以需要将next的引用去掉(也就是上面218行代码的作用),这样Node节点就没有持有其他对象的引用了,GC就可以将其当做垃圾回收掉,这个和之前在AQS同步队列以及Condition队列中做法是一个意思,都是为了取消其引用,方便GC。

LinkedBlockingDeque

LinkedBlockingDeque是和LinkedBlockingQeque一样均是由链表结构组成,也就是说内部都是通过一个Node内部类来实现聊表,而LinkedBlockingDeque是双向的阻塞队列,故而Node中肯定会比单向的多了一个prev指向前一个节点。

双向队列因为多了一个操作队列的入口,所以相比较于LinkedBlockingQeque单向队列中多了addFirst、 addLast、offerFirst、offerLast、peekFirst和peekLast等方法。另外,插入方法add等同于addLast,移除方法remove等效于removeFirst,而take方法却等同于takeFirst,这些事实需要注意的,为了避免混乱,建议使用的时候还是带上First和Last关键字。
首先还是来看一看类图:
在这里插入图片描述
可以看出相比较于单项队列,多了一个Deque(双向)接口,构造器和单向列表一样,也是提供了3个。

初始化队列

在这里插入图片描述
这里可以看到,初始化的时候没有设置任何节点,仅仅只是设置了一个容量。

添加元素(生产者)

从First添加

在这里插入图片描述
这里基本没有逻辑,主要看linkFirst(Node)方法:
在这里插入图片描述

从Last添加

在这里插入图片描述
这个方法也是一样没有逻辑,主要看linkLast(Node)方法:
在这里插入图片描述

获取元素(消费者)

从First获取

在这里插入图片描述
继续看unlinkFirst这个方法:
在这里插入图片描述
这些逻辑和上面的单向队列是差不多的,只是多了一个prev指向

从Last获取

在这里插入图片描述
继续看unlinkLast()方法:
在这里插入图片描述

总结

本文主要讲述了Java提供的7种队列中的其中三种队列,这三种队列实现方式比较接近,而且源码比较容易理解。