实现生产者消费者模式的三种方式

1,419 阅读3分钟

什么是生产者消费者模式

简单来说,生产者消费者模式就是缓冲区。

image.png 那么这么做有两个好处,一个是解耦,第二个是平衡生产能力和消费能力的差,因为生产者和消费者的速度是不一样的,有了这个缓冲区就可以平衡这样一个落差,达到动态平衡。

那么这个缓冲区其实就是一个队列,它的规则就是当队列是满的时候,生产者会被阻塞。当队列为空的时候,消费者会被阻塞, 在java中实现生产者消费者模式有多种方式,主要是线程间的通信,这里介绍三种:wait/notify、await/signal和阻塞队列。

wait/notify

wait的时候阻塞当前线程,然后释放锁,这时候消费者就有机会抢占到锁了,代码如下:

//waitnotify模式的生产者消费者,泛型类
public class WaitNotifyProducerConsumer<T> {

    //储存元素的队列
    public ArrayList<T> list = new ArrayList<>();

    //队列最大长度5
    private int maxSize = 5;

    //生产者
    public void put(T item) throws InterruptedException {
        //锁住
        synchronized (list) {
            //如果已经达到5了,证明满了,阻塞线程,释放锁
            if (list.size() == maxSize) {
                list.wait();
            }
            //添加队列元素
            list.add(item);
            System.out.println(Thread.currentThread().getName() + "生产元素" + item + ",队列里还剩" + list.size() + "个元素");
            //通知消费者
            list.notify();
        }
    }


    public T take() throws InterruptedException {
        //锁住,和生产者共用一把锁
        synchronized (list) {
            //如果队列为空,阻塞线程释放锁,等待生产者放进去了唤醒
            if (list.isEmpty()) {
                list.wait();
            }
            //队列,先进先出,取出第一个元素
            T item = list.remove(0);
            System.out.println(Thread.currentThread().getName() + "消费元素" + item + ",队列里还剩" + list.size() + "个元素");
            //通知生产者
            list.notify();
            return item;
        }
    }


    public static void main(String[] args) throws InterruptedException {
        WaitNotifyProducerConsumer<String> wnpc = new WaitNotifyProducerConsumer<>();
        //生产者线程,十轮
        Thread t1 = new Thread(() -> {
            Random random = new Random();
            for (int i = 1; i <= 10; i++) {
                //元素
                String item = "item-" + i;
                try {
                    //如果队列满了,put会阻塞
                    wnpc.put(item);
                    //随机休息
                    Thread.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t1.start();
        //让生产者先启动生产数据
        Thread.sleep(1000);
        Thread t2 = new Thread(() -> {
            Random random = new Random();
            //自旋
            for (; ; ) {
                try {
                    //消费元素
                    String item = wnpc.take();
                    //随机休息
                    Thread.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t2.start();
    }
}

执行一下,生产者和消费者轮流生产消费元素。

image.png

await/signal

await和signal是Condition里面的,作用和synchronized一样,await让线程阻塞,并且释放锁,signal唤醒阻塞的线程,代码如下:

//Condition实现生产者消费者模式,泛型类
public class ConditionProducerConsumer<T> {

    //锁
    private Lock lock = new ReentrantLock();

    //储存元素的队列
    public ArrayList<T> list = new ArrayList<>();

    //队列最大长度5
    private int maxSize = 5;

    //使用两个条件队列condition来实现精确通知,生产者condition
    private final Condition producerCondition = lock.newCondition();

    //消费者condition
    private final Condition consumerCondition = lock.newCondition();


    //生产者
    public void put(T item) throws InterruptedException {
        //锁住
        lock.lock();
        try {
            //如果队列已满,阻塞当前线程,添加到生产者Condition等待队列,释放锁
            if (list.size() == maxSize) {
                producerCondition.await();

            }
            //添加元素
            list.add(item);
            System.out.println(Thread.currentThread().getName() + "生产元素" + item + ",队列里还剩" + list.size() + "个元素");
            //唤醒消费者Condition队列阻塞的线程
            consumerCondition.signal();
        } finally {
            lock.unlock();
        }
    }


    public T take() throws InterruptedException {
        //加锁
        lock.lock();
        try {
            //如果队列为空,释放锁,阻塞线程,添加到消费者等待队列
            if (list.isEmpty()) {
                consumerCondition.await();

            }
            //取出第一个元素
            T item = list.remove(0);
            System.out.println(Thread.currentThread().getName() + "消费元素" + item + ",队列里还剩" + list.size() + "个元素");
            //唤醒生产者队列阻塞的线程
            producerCondition.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }


    public static void main(String[] args) throws InterruptedException {
        ConditionProducerConsumer<String> cpc = new ConditionProducerConsumer<>();
        //生产者线程,十轮
        Thread t1 = new Thread(() -> {
            Random random = new Random();
            for (int i = 1; i <= 10; i++) {
                //元素
                String item = "item-" + i;
                try {
                    //如果队列满了,put会阻塞
                    cpc.put(item);
                    //随机休息
                    Thread.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t1.start();
        //让生产者先启动生产数据
        Thread.sleep(1000);
        Thread t2 = new Thread(() -> {
            Random random = new Random();
            //自旋
            for (; ; ) {
                try {
                    //消费元素
                    String item = cpc.take();
                    //随机休息
                    Thread.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t2.start();
    }

}

运行一下:

image.png

阻塞队列

JUC中的阻塞队列

  • ArrayBlockingQueue 基于数组结构
  • LinkedBlockingQueue 基于链表结构
  • PriorityBlcokingQueue 基于优先级队列
  • DelayQueue 允许延时执行的队列
  • SynchronousQueue 没有任何存储结构的的队列,它的应用场景是newCachedThreadPool()线程池。可以处理非常大请求的任务,1000个任务过来,那么线程池需要分配1000个线程来执行。关于线程池的原理可以查看《线程池源码精讲》

image.png

阻塞队列中的方法

添加元素

  • add -> 如果队列满了,抛出异常
  • offer -> true/false , 添加成功返回true,否则返回false
  • put -> 如果队列满了,则一直阻塞
  • offer(timeout) -> 带了一个超时时间。如果添加一个元素,队列满了,此时会阻塞timeout时长,超过阻塞时长,返回false。

移除元素

  • element-> 队列为空,抛异常
  • peek -> true/false , 移除成功返回true,否则返回false
  • take -> 一直阻塞
  • poll(timeout) -> 如果超时了,还没有元素,则返回null

实现生产者消费者

上面用Condition中的await和signal其实是就是在手写阻塞队列,但是我们可以不用这么麻烦,直接用java中封装好的阻塞队列,这些阻塞队列的源码点进去看其实用的就是await和signal,只不过封装好的代码比我们自己写的代码要健壮。我们这里用的是阻塞队列提供的put()和take()方法,代码如下:

public class BlockingQueueProducerConsumer<T> {
    
    //阻塞队列,长度是5
    BlockingQueue<T> blockingQueue = new ArrayBlockingQueue(5);


    //生产者,调阻塞队列的put方法
    public void put(T item) throws InterruptedException {
        blockingQueue.put(item);
        System.out.println(Thread.currentThread().getName() + "生产元素" + item + ",队列里还剩" + blockingQueue.size() + "个元素");
    }


    //消费者,调阻塞队列的take方法
    public T take() throws InterruptedException {
        T item = blockingQueue.take();
        System.out.println(Thread.currentThread().getName() + "消费元素" + item + ",队列里还剩" + blockingQueue.size() + "个元素");
        return item;
    }


    public static void main(String[] args) throws InterruptedException {
        BlockingQueueProducerConsumer<String> bqpc = new BlockingQueueProducerConsumer<>();
        //生产者线程,十轮
        Thread t1 = new Thread(() -> {
            Random random = new Random();
            for (int i = 1; i <= 10; i++) {
                //元素
                String item = "item-" + i;
                try {
                    //如果队列满了,put会阻塞
                    bqpc.put(item);
                    //随机休息
                    Thread.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t1.start();
        //让生产者启动生产数据
        Thread.sleep(1000);
        Thread t2 = new Thread(() -> {
            Random random = new Random();
            //自旋
            for (; ; ) {
                try {
                    //消费元素
                    String item = bqpc.take();
                    //随机休息
                    Thread.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t2.start();
    }
}

运行一下: image.png

我们可以看到用java中自带的阻塞队列实现生产者消费者是最简单的,两行代码搞定,但是另外两种写法也得懂,主要是线程间的通信,毕竟要知其然知其所以然,最后感谢大家收看~