java并发编程系列:生产者-消费者模式

1,578 阅读7分钟

标签: 「我们都是小青蛙」公众号文章

目光从厕所转到饭馆,一个饭馆里通常都有好多厨师以及好多服务员,这里我们把厨师称为生产者,把服务员称为消费者,厨师和服务员是不直接打交道的,而是在厨师做好菜之后放到窗口,服务员从窗口直接把菜端走给客人就好了,这样会极大的提升工作效率,因为省去了生产者和消费者之间的沟通成本。从java的角度看这个事情,每一个厨师就相当于一个生产者线程,每一个服务员都相当于一个消费者线程,而放菜的窗口就相当于一个缓冲队列,生产者线程不断把生产好的东西放到缓冲队列里,消费者线程不断从缓冲队列里取东西,画个图就像是这样:

image_1c216f3an1cbj1v1j1hp83o01a6a9.png-50.3kB

现实中放菜的窗口能放的菜数量是有限的,我们假设这个窗口只能放5个菜。那么厨师在做完菜之后需要看一下窗口是不是满了,如果窗口已经满了的话,就在一旁抽根烟等待,直到有服务员来取菜的时候通知一下厨师窗口有了空闲,可以放菜了,这时厨师再把自己做的菜放到窗口上去炒下一个菜。从服务员的角度来说,如果窗口是空的,那么也去一旁抽根烟等待,直到有厨师把菜做好了放到窗口上,并且通知他们一下,然后再把菜端走。

我们先用java抽象一下

public class Food {
    private static int counter = 0;

    private int i;  //代表生产的第几个菜

    public Food() {
        i = ++counter;
    }

    @Override
    public String toString() {
        return "第" + i + "个菜";
    }
}

每次创建Food对象,字段i的值都会加1,代表这是创建的第几道菜。

为了故事的顺利进行,我们首先定义一个工具类:

class SleepUtil {

    private static Random random = new Random();
    
    public static void randomSleep() {
        try {
            Thread.sleep(random.nextInt(1000));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

SleepUtil的静态方法randomSleep代表当前线程随机休眠一秒内的时间。

然后我们再用java定义一下厨师:

public class Cook extends Thread {

    private Queue<Food> queue;

    public Cook(Queue<Food> queue, String name) {
        super(name);
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            SleepUtil.randomSleep();    //模拟厨师炒菜时间
            Food food = new Food();
            System.out.println(getName() + " 生产了" + food);
            synchronized (queue) {
                while (queue.size() > 4) {
                    try {
                        System.out.println("队列元素超过5个,为:" + queue.size() + " " + getName() + "抽根烟等待中");
                        queue.wait();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                queue.add(food);
                queue.notifyAll();
            }
        }
    }
}

我们说每一个厨师Cook都是一个线程,内部维护了一个名叫queue的队列。在run方法中是一个死循环,代表不断的生产Food。他每生产一个Food后,都要判断queue队列中元素的个数是不是大于4,如果大于4的话,就调用queue.wait()等待,如果不大于4的话,就把创建号的Food对象放到queue队列中,由于可能多个线程同时访问queue的各个方法,所以对这段代码用queue对象来加锁保护。当向队列添加完刚创建的Food对象之后,就可以通知queue这个锁对象关联的等待队列中的服务员线程们可以继续端菜了。

然后我们再用java定义一下服务员:

class Waiter extends Thread {

    private Queue<Food> queue;

    public Waiter(Queue<Food> queue, String name) {
        super(name);
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            Food food;
            synchronized (queue) {
                while (queue.size() < 1) {
                    try {
                        System.out.println("队列元素个数为: " + queue.size() + "," + getName() + "抽根烟等待中");
                        queue.wait();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                food = queue.remove();
                System.out.println(getName() + " 获取到:" + food);
                queue.notifyAll();
            }

            SleepUtil.randomSleep();    //模拟服务员端菜时间
        }
    }
}

每个服务员也是一个线程,和厨师一样,都在内部维护了一个名叫queue的队列。在run方法中是一个死循环,代表不断的从队列中取走Food。每次在从queue队列中取Food对象的时候,都需要判断一下队列中的元素是否小于1,如果小于1的话,就调用queue.wait()等待,如果不小于1的话,也就是队列里有元素,就从队列里取走一个Food对象,并且通知与queue这个锁对象关联的等待队列中的厨师线程们可以继续向队列里放入Food对象了。

在厨师和服务员线程类都定义好了之后,我们再创建一个Restaurant类,来看看在餐馆里真实发生的事情:

public class Restaurant {

    public static void main(String[] args) {

        Queue<Food> queue = new LinkedList<>();
        new Cook(queue, "1号厨师").start();
        new Cook(queue, "2号厨师").start();
        new Cook(queue, "3号厨师").start();
        new Waiter(queue, "1号服务员").start();
        new Waiter(queue, "2号服务员").start();
        new Waiter(queue, "3号服务员").start();
    }
}

我们在Restaurant中安排了3个厨师和3个服务员,大家执行一下这个程序,会发现在如果厨师生产的过快,厨师就会等待,如果服务员端菜速度过快,服务员就会等待。但是整个过程厨师和服务员是没有任何关系的,它们是通过队列queue实现了所谓的解耦。

这个过程虽然不是很复杂,但是使用中还是需要注意一些问题:

  • 我们这里的厨师和服务员使用同一个锁queue

    使用同一个锁是因为对queue的操作只能用同一个锁来保护,假设使用不同的锁,厨师线程调用queue.add方法,服务员线程调用queue.remove方法,这两个方法都不是原子操作,多线程并发执行的时候会出现不可预测的结果,所以我们使用同一个锁来保护对queue这个变量的操作,这一点我们在唠叨设计线程安全类的时候已经强调过了。

  • 厨师和服务员线程使用同一个锁queue的后果就是厨师线程和服务员线程使用的是同一个等待队列。

    但是同一时刻厨师线程和服务员线程不会同时在等待队列中,因为当厨师线程在wait的时候,队列里的元素肯定是5,此时服务员线程肯定是不会wait的,但是消费的过程是被锁对象queue保护的,所以在一个服务员线程消费了一个Food之后,就会调用notifyAll来唤醒等待队列中的厨师线程们;当消费者线程在wait的时候,队列里的元素肯定是0,此时厨师线程肯定是不会wait的,生产的过程是被锁对象queue保护的,所以在一个厨师线程生产了一个Food对象之后,就会调用notifyAll来唤醒等待队列中的服务员线程们。所以同一时刻厨师线程和服务员线程不会同时在等待队列中。

  • 在生产和消费过程,我们都调用了SleepUtil.randomSleep();

    我们这里的生产者-消费者模型是把实际使用的场景进行了简化,真正的实际场景中生产过程和消费过程一般都会很耗时,这些耗时的操作最好不要放在同步代码块中,这样会造成别的线程的长时间阻塞。如果把生产过程和消费过程都放在同步代码块中,也就是说在一个厨师炒菜的同时不允许别的厨师炒菜,在一个服务员端菜的同时不允许别的服务员端菜,这个显然是不合理的,大家需要注意这一点。

以上就是wait/notify机制的一个现实应用:生产者-消费者模式的一个简介。

题外话

写文章挺累的,有时候你觉得阅读挺流畅的,那其实是背后无数次修改的结果。如果你觉得不错请帮忙转发一下,万分感谢~ 这里是我的公众号,里边有更多技术干货,时不时扯一下犊子,欢迎关注:

小册

另外,作者还写了一本MySQL小册:《MySQL是怎样运行的:从根儿上理解MySQL》的链接 。小册的内容主要是从小白的角度出发,用比较通俗的语言讲解关于MySQL进阶的一些核心概念,比如记录、索引、页面、表空间、查询优化、事务和锁等,总共的字数大约是三四十万字,配有上百幅原创插图。主要是想降低普通程序员学习MySQL进阶的难度,让学习曲线更平滑一点~ 有在MySQL进阶方面有疑惑的同学可以看一下: