Java实现生产者和消费者模式(多线程)

2,973 阅读4分钟

一、生产者和消费者模式描述 

生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一存储空间,生产者向空间里生产数据,而消费者从空间里取走数据。

二、存在的问题 

  • 当生产者比消费者快时,消费者会漏掉一部分数据; 
  • 当消费者比生产者快时,消费者会取走相同的数据。 
解决方式:要考虑线程安全的问题,解决此问题有三种方式 

同步代码块 
同步方法  
lock锁机制, 通过创建Lock对象,采用lock()加锁,unlock()解锁,来保护指定的代码块

此文采用第二种方式解决了线程安全问题 

三、wait()和notify()方法 

wait()、nofity()这两个方法必须有锁对象调用,而任意对象都可以作为 synchronized 的同步锁,因此这三个方法只能在Object 类中声明 。  

wait():当缓冲区已满/空时,生产者或消费者线程停止自己的执行,释放锁,使自己处于等待状态,让其它线程执行。 

notify():当生产者或消费者向缓冲区放入或取出一个产品时,向其他等待的线程发出通知,同时释放锁,使自己处于等待状态,让其它线程执行。 

四、生产者和消费者模式中涉及到的类 

商店类(Shop):定义一个成员变量,表示第几个面包,提供生产面包和消费面包的操作; 
生产者类(Producer):实现Runnable接口,重写run()方法,调用生产面包的操作; 
消费者类(Consumer):实现Runnable接口,重写run()方法,调用消费面包的操作; 
测试类: 

五、代码实现 

/**
 * 商店类(Shop):定义一个成员变量,表示第几个面包,提供生产面包和消费面包的操作;
 */
class Shop {
    private int bread = 0;
    /**
     * 生产面包
     */
    public synchronized void produceBread() {
        if (bread < 10) {
            bread++;
            System.out.println(Thread.currentThread().getName() + ":开始生产第" + bread + "个面包");
            notify(); // 唤醒消费者线程
        } else {
            try {
                wait(); // 告诉生产者等待一下
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    /**
     * 消费面包
     */
    public synchronized void consumeBread() {
        if (bread > 0) {
            System.out.println(Thread.currentThread().getName() + ":开始消费第" + bread + "个面包");
            bread--;
            notify(); // 唤醒生产者线程
        } else {
            try {
                wait(); // 告诉消费者等待一下
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

/**
 * 生产者类(Producer):实现Runnable接口,重写run()方法,调用生产面包的操作
 */
class Producer extends Thread {
    private Shop shop;

    public Producer(Shop shop) {
        this.shop = shop;
    }

    @Override
    public void run() {
        System.out.println(getName() + ":开始生产面包.....");
        while (true) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            shop.produceBread();
        }
    }
}

/**
 * 消费者类(Consumer):实现Runnable接口,重写run()方法,调用消费面包的操作
 */
class Consumer extends Thread {
    private Shop shop;
    public Consumer(Shop shop) {
        this.shop = shop;
    }
    
    @Override
    public void run() {
        System.out.println(getName() + ":开始消费面包.....");
        while (true) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            shop.consumeBread();
        }
    }
}

public class BreadTest {
    public static void main(String[] args) {
        // 创建商店对象
        Shop shop = new Shop();
        // 创建生产者对象,把商店对象作为构造方法参数传入生产者对象中
        Producer p1 = new Producer(shop);
        p1.setName("生产者");
        // 创建消费者对象,把商店对象作为构造方法参数传入消费者对象中
        Consumer c1 = new Consumer(shop);
        c1.setName("消费者");

        p1.start();
        c1.start();
        
    }
}

代码执行结果如下: 

消费者:开始消费面包.....
生产者:开始生产面包.....
生产者:开始生产第1个面包
生产者:开始生产第2个面包
生产者:开始生产第3个面包
生产者:开始生产第4个面包
消费者:开始消费第4个面包
生产者:开始生产第4个面包
生产者:开始生产第5个面包
生产者:开始生产第6个面包
生产者:开始生产第7个面包
生产者:开始生产第8个面包
消费者:开始消费第8个面包
生产者:开始生产第8个面包
生产者:开始生产第9个面包
生产者:开始生产第10个面包
消费者:开始消费第10个面包
生产者:开始生产第10个面包
消费者:开始消费第10个面包
生产者:开始生产第10个面包
消费者:开始消费第10个面包
生产者:开始生产第10个面包
消费者:开始消费第10个面包

...

最后

欢迎大家有兴趣的可以关注我的公众号【java小瓜哥的分享平台】,文章都会在里面更新,还有各种java的资料都是免费分享的。希望与大家一起进步努力!