阅读 325

Java并发——阻塞队列集(上)

简介

阻塞队列是一个支持两个附加操作的队列,这两个附加操作支持阻塞的插入和移除方法
①.支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,直至队列不满
②.支持阻塞的移除方法:当队列空时,获取元素的线程会等待队列变为非空

在阻塞队列不可用时,这两个附加操作提供了4种处理方式,如下

方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用

阻塞队列

ArrayBlockingQueue:由数组结构组成的有界阻塞队列
LinkedBlockingQueue:由链表结构组成的有界阻塞队列
PriorityBlockingQueue:支持优先级排序的无界阻塞队列
DelayQueue:使用优先级队列实现的无界阻塞队列
SynchronousQueue:不存储元素的阻塞队列
LinkedTransferQueue:由链表结构组成的无界阻塞队列
LinkedBlockingDeque:由链表结构组成的双向阻塞队列

ArrayBlockingQueue

ArrayBlockingQueue是一个用数组实现的有界阻塞队列,队列按照先进先出(FIFO)原则对元素进行排序。默认采用不公平访问,因为公平性通常会降低吞吐量。

主要属性


    private static final long serialVersionUID = -817911632652898426L;
    /** 数组用来维护ArrayBlockingQueue中的元素 */
    final Object[] items;
    /** 出队首位置索引 */
    int takeIndex;
    /** 入队末位置索引 */
    int putIndex;
    /** 元素个数 */
    int count;
    
    final ReentrantLock lock;
    /** 出队等待队列 */
    private final Condition notEmpty;
    /** 入队等待队列 */
    private final Condition notFull;
复制代码

put

ArrayBlockingQueue提供了很多方法入队:add()、offer()、put()等。我们以阻塞式方法为主,put()方法其源码如下


    public void put(E e) throws InterruptedException {
        // 校验元素是否为空
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 响应中断式获取同步,若线程被中断会抛出异常
        lock.lockInterruptibly();
        try {
            // 当队列已满,将线程添加到notFull等待队列中
            while (count == items.length)
                notFull.await();
            // 若没有满,进行入队    
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
复制代码

当队列满时,会调用Condition的await()方法将线程添加到等待队列中。若队列未满调用enqueue()进行入队操作(所有入队方法最终都将调用该方法在队列尾部插入元素


    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        // 入队
        items[putIndex] = x;
        // 当数组添加满后,重新从0开始
        if (++putIndex == items.length)
            putIndex = 0;
        // 元素个数+1    
        count++;
        // 唤醒出队等待队列中的线程
        notEmpty.signal();
    }
复制代码

take

出队方法有:poll()、remove(),take()等,take()方法其源码如下


    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 响应中断式获取同步,若线程被中断会抛出异常
        lock.lockInterruptibly();
        try {
            // 若队列空,将线程添加到notEmpty等待队列中
            while (count == 0)
                notEmpty.await();
            // 获取数据    
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
复制代码

当队列为空,会调用condition的await()方法将线程添加到notEmpty等待队列中,若队列不为空则调用dequeue()获取数据


    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        // 获取数据
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        // 元素个数-1    
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        // 通知入队等待队列中的线程
        notFull.signal();
        return x;
    }
复制代码

从源码中可以发现ArrayBlockingQueue通过condition的等待唤醒机制完成可阻塞式的入队和出队

LinkedBlockingQueue

LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为 Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序

主要属性


    /** 容量 */
    private final int capacity;
    /** 元素个数 */
    private final AtomicInteger count = new AtomicInteger();
    /** 头节点 */
    transient Node head;
    /** 尾节点 */
    private transient Node last;
    /** 出队锁 */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** 出队等待队列 */
    private final Condition notEmpty = takeLock.newCondition();
    /** 入队锁 */
    private final ReentrantLock putLock = new ReentrantLock();
    /** 入队等待队列 */
    private final Condition notFull = putLock.newCondition();
复制代码

从属性上来看LinkedBlockingQueue维护两个锁在入队和出队时保证线程安全,两个锁降低线程由于线程无法获取lock而进入WAITING状态的可能性提高了线程并发执行的效率,并且count属性使用AtomicInteger原子操作类(可能两个线程一个出队一个入队操作count,各自的锁显然起不到用处)

put


    public void put(E e) throws InterruptedException {
        // 若新增元素为null抛异常
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        // 获取当前元素个数
        final AtomicInteger count = this.count;
        // 响应中断式获取锁,若线程被中断会抛出异常
        putLock.lockInterruptibly();
        try {
            // 若当前队列已满,将线程添加到notFull等待队列中
            while (count.get() == capacity) {
                notFull.await();
            }
            // 若没有满,进行入队
            enqueue(node);
            // 元素个数+1
            c = count.getAndIncrement();
            // 若当前元素个数+1还未到定义的最大容量,则唤醒入队等待队列中的线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
 复制代码

take


    public E take() throws InterruptedException {
        E x;
        int c = -1;
        // 获取当前元素个数
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        // 响应中断式获取锁,若线程被中断会抛出异常
        takeLock.lockInterruptibly();
        try {
            // 若当前队列为空,则将线程添加到notEmpty等待队列中
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 获取数据
            x = dequeue();
            // 当前元素个数-1
            c = count.getAndDecrement();
            // 若队列中还有元素,唤醒阻塞的出队线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
 复制代码

PriorityBlockingQueue

PriorityBlockingQueue是一个支持优先级的无界阻塞队列,虽然无界但由于资源耗尽,尝试的添加可能会失败(导致OutOfMemoryError ),默认情况下元素采取自然顺序升序排序,也可以通过构造函数来指定Comparator来对元素进行排序,需要注意的是PriorityBlockingQueue不能保证同优先级元素的顺序

主要属性


    /** 默认容量 */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;
    /** 最大容量 */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    /** 内置数组 */
    private transient Object[] queue;
    /** 元素个数 */
    private transient int size;
    /** 比较器,为空则自然排序 */
    private transient Comparator comparator;
    
    private final ReentrantLock lock;
    /** 出队等待队列 */
    private final Condition notEmpty;
    /** 用于CAS扩容时用 */
    private transient volatile int allocationSpinLock;

    private PriorityQueue q;
复制代码

可以发现PriorityBlockingQueue只有一个condition,因为PriorityBlockingQueue是一个无界队列,插入始终成功,也正因为此所以其入队用lock.lock()方法不响应中断,而出队用lock.lockInterruptibly()响应中断式获取锁

put


    public void put(E e) {
        // 不需要阻塞
        offer(e); // never need to block
    }
    
    public boolean offer(E e) {
        // 判空
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        // 获取锁
        lock.lock();
        int n, cap;
        Object[] array;
        // 若大于等于当前数组长度则扩容
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            // 获取比较器
            Comparator cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            // 元素个数+1    
            size = n + 1;
            // 唤醒
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }
复制代码

tryGrow扩容


    private void tryGrow(Object[] array, int oldCap) {
        // 必须先释放锁
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        // CAS设置占用
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                // 新容量
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                //  新容量若超过最大值                  
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                // 若新容量大于旧容量且当前数组相等,创建新容量数组
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
        // CAS设置allocationSpinLock失败,表明有其他线程也正在扩容,让给其他线程处理
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        // 获取锁    
        lock.lock();
        if (newArray != null && queue == array) {
            queue = newArray;
            // 数组复制
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }
复制代码

从源码中可以发现为了尽可能提高并发效率,先释放锁在计算新容量时利用CAS设置allocationSpinLock来保证线程安全,再最后获取锁进行数组复制扩容。扩容完后,根据比较器的排序规则进行新增

siftUpComparable(),比较器comparator为null时采取自然排序调用此方法


    private static  void siftUpComparable(int k, T x, Object[] array) {
        Comparable key = (Comparable) x;
        // 若当前元素个数大于0,即队列不为空
        while (k > 0) {
            // (n - 1) / 2
            int parent = (k - 1) >>> 1;
            // 获取parent位置上的元素
            Object e = array[parent];
            // 从队列的最后往上调整堆,直到不小于其父节点为止
            if (key.compareTo((T) e) >= 0)
                break;
            // 如果当前节点小于其父节点,则将其与父节点进行交换,并继续往上访问父节点    
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }
复制代码

此方法为建堆过程,假定PriorityBlockingQueue内部数组如下:

转换为堆(堆是一种二叉树结构):

往其添加元素2,k为当前元素个数12,计算parent为5,e为6,e大于2,交换位置

第二次循环,k=5,parent=2,e=5,5>2交换位置

第三次循环,k=2,parent=0,e=1,1<2退出循环,第2个位置给新元素2

其主要思路 末位置寻找其父节点,若新增元素小于父节点则将其与父节点进行交换,并继续往上访问父节点,直到大于等于其父节点为止

siftUpUsingComparator(),当比较器不为null,采用指定比较器,调用此方法


    private static  void siftUpUsingComparator(int k, T x, Object[] array,
                                       Comparator cmp) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (cmp.compare(x, (T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = x;
    }
复制代码

take


    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }
复制代码

获取锁后,调用dequeue()


    private E dequeue() {
        // 若队列为空,返回null
        int n = size - 1;
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            // 出队元素,首元素
            E result = (E) array[0];
            // 最后一个元素
            E x = (E) array[n];
            array[n] = null;
            Comparator cmp = comparator;
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            return result;
        }
    }
复制代码

自然排序处理siftDownComparable()


    private static  void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
        if (n > 0) {
            Comparable key = (Comparable)x;
            int half = n >>> 1;           // loop while a non-leaf
            while (k < half) {
                // 左节点
                int child = (k << 1) + 1; // assume left child is least
                Object c = array[child];
                // 右节点
                int right = child + 1;
                if (right < n &&
                    ((Comparable) c).compareTo((T) array[right]) > 0)
                    c = array[child = right];
                if (key.compareTo((T) c) <= 0)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = key;
        }
    }
复制代码

指定排序siftDownUsingComparator()


    private static  void siftDownUsingComparator(int k, T x, Object[] array,
                                                    int n,
                                                    Comparator cmp) {
        if (n > 0) {
            int half = n >>> 1;
            while (k < half) {
                int child = (k << 1) + 1;
                Object c = array[child];
                int right = child + 1;
                if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
                    c = array[child = right];
                if (cmp.compare(x, (T) c) <= 0)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = x;
        }
    }
复制代码

以上面最后一个图为基础出队第一个元素

第一次循环:k=0,n=12,half=6,child=1,c为图中节点3,right=2,经过子节点比较找出较小值2,2与末尾值节点6相比,末位置更大,首位置与右子节点交换位置

第二次循环:k=2,child=5,c为图中节点5,right=6,经过子节点比较找出较小值5,5与末位置节点6相比,末位置更大,与左子节点交换位置

第三次循环:k=5,child=11,c为图中节点8,right=12,经过子节点比较找出较小值末位置节点6相比

其主要思路:首位置寻找其子节点,找出两个子节点的较小的与末尾位置节点比较若末尾节点小,则将其置入首位置,否则首位置与较小子节点替换位置,以此略推继续往下找

DelayQueue

DelayQueue是一个支持延时获取元素的无界阻塞队列,队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素,可以将其应用在缓存、定时任务调度等场景

Delayed接口

DelayQueue队列中的元素必须实现Delayed接口,我们先看Delayed接口继承关系

从图中我们可以知道,实现Delayed接口,我们必须实现其自定义的getDelay()方法以及继承过来的compareTo()方法

主要属性


    private final transient ReentrantLock lock = new ReentrantLock();
    /** 优先级队列 */
    private final PriorityQueue q = new PriorityQueue();

    private Thread leader = null;

    private final Condition available = lock.newCondition();
复制代码

put


    public void put(E e) {
        offer(e);
    }
    
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 向PriorityQueue添加元素
            q.offer(e);
            // 若当前元素
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
复制代码

其添加操作基于PriorityQueue的offer方法


    public boolean offer(E e) {
        // 判空
        if (e == null)
            throw new NullPointerException();
        // 修改次数
        modCount++;
        int i = size;
        // 判断是否需要扩容
        if (i >= queue.length)
            grow(i + 1);
        // 元素个数+1    
        size = i + 1;
        // 若队列为空,首元素置为e
        if (i == 0)
            queue[0] = e;
        else
            siftUp(i, e);
        return true;
    }
    
    private void siftUp(int k, E x) {
        if (comparator != null)
            siftUpUsingComparator(k, x);
        // 自然排序
        else
            siftUpComparable(k, x);
    }
    
    /**
     * 自然排序
     */
    private void siftUpComparable(int k, E x) {
        Comparable key = (Comparable) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (key.compareTo((E) e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = key;
    }
    
    /**
     * 指定比较器
     */
    private void siftUpUsingComparator(int k, E x) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (comparator.compare(x, (E) e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = x;
    }
复制代码

PriorityQueue的自然排序或指定比较器处理新增操作与PriorityBlockingQueue的逻辑差不多,这里就不再过多分析,但是从源码我们发现了modCount,表明PriorityQueue是线程不安全的,但是由于DelayQueue可以依靠ReentrantLock来确保同步安全。新增完后会判断新增元素是否为队列首元素,若是将leader设置为空,并唤醒所有等待线程

take


    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 死循环
            for (;;) {
                // 获取队列首元素,若队列为空返回null
                E first = q.peek();
                // 若队列为空
                if (first == null)
                    available.await();
                else {
                    // 获取剩余延迟时间
                    long delay = first.getDelay(NANOSECONDS);
                    // 若小于0表明已过期,出队
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    // 若leader!= null 表明有其他线程正在操作
                    if (leader != null)
                        available.await();
                    else {
                        // 否则将leader置为当前线程
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 指定时间等待
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
复制代码

整体出队逻辑不再多述,来说下leader和first

  • leader
  • 从源码我们可以看到leader属性在put()与take()方法中都有出现,其作用在于减少不必要的竞争,若leader不为空说明已经有线程正在操作,直接一直等待即可没必要再争。举个例子假定有线程A、B、C依次要出队,线程A先获取锁由于首元素未过期,指定剩余时间等待,若不采用leader直接一直等待,线程B和C也指定时间等待,那么会造成三个线程同时竞争首元素,本来A→B→C的顺序可能导致乱序不是线程所想要的元素

  • first
  • 在take()方法中为什么要将first置为null,英文注解当等待时不要持有依赖。若不置空假定线程A等待,其栈帧中会存有first局部变量所指元素引用,线程B请求仍然等待其栈帧也存有first局部变量所指元素引用,以此略推后来线程等待后栈帧中都会存有,那么当线程A成功出队首元素,其他线程依然占有其引用,导致一直回收不了,这样就可能会造成内存泄漏

    感谢

    《java并发编程的艺术》
    cmsblogs.com/?p=2407

    关注下面的标签,发现更多相似文章
    评论