并发编程实战(五):构建块

439 阅读12分钟

0 前言

1 同步容器

1.1 同步容器中出现的问题

1.2 迭代器和ConcurrentModificationException

ConcurrentModificationException也可能出现在单线程代码中;当对象并非通过Iterator.remove,而是直接从容器中删除时,就会出现这种情况。

public class Test {
    public static void main(String[] args)  {
        ArrayList list = new ArrayList();
        list.add(2);
        Iterator iterator = list.iterator();
        while(iterator.hasNext()){
            Integer integer = iterator.next();
            if(integer==2)
                list.remove(integer);
        }
    }
}

1.21 解决办法

因此一般有2种解决办法:Java ConcurrentModificationException异常原因和解决办法

  • 1.在使用iterator迭代的时候使用synchronized或者Lock进行同步;                           
  • 2使用并发容器CopyOnWriteArrayList代替ArrayList和Vector。

1.3 隐藏迭代器

2 并发容器

  • 1.使用ConcurrentHashMap,来替代同步的哈希 Map实现;
  • 2.当多数操作为读取时,CopyOnWriteArrayList是List相应的同步实现。
  • 3.Queue用来临时保存正在等待被进一步处理的一系列元素(LinkList其实就是实现了Queue)。
  • 4.BlockingQueue扩展了Queue增加了可阻塞的插入和获取操作。如果队列是空的,一个获取操作会一直阻塞直到队列中存在可用元素;如果队列是满的,插入操作会一直阻塞到队列存在可用空间。阻塞队列在生产者-消费者设计中非常有用,第3节会有更详细介绍。
  • 5.java 6同样新增了两个容器类型Deque和BlockingDeque,他们分别扩展了queque和BlockingQueue。
  • 2.1 ConcurrentHashMap

  • 2.1.1 弱一致性

  • 2.1.2 附加的原子操作

  • 2.3 CopyOnWriteArrayList

CopyOnWriteArrayList使用了一种叫写时复制的方法,当有新元素添加到CopyOnWriteArrayList时,先从原有的数组中拷贝一份出来,然后在新的数组做写操作,写完之后,再将原来的数组引用指向到新数组。

  • CopyOnWriteArrayList的整个add操作都是在锁的保护下进行的,这样做是为了避免在多线程并发add的时候,复制出多个副本出来,把数据搞乱了,导致最终的数组数据不是我们期望的。
  • remove,删除元素,很简单,就是判断要删除的元素是否最后一个,如果最后一个直接在复制副本数组的时候,复制长度为旧数组的length-1即可;但是如果不是最后一个元素,就先复制旧的数组的index前面元素到新数组中,然后再复制旧数组中index后面的元素到数组中,最后再把新数组复制给旧数组的引用。

当对容器迭代操作的频率远远高于对容器的修改时,使用“写入时复制”容器是个合理的选择。

2.3.1 优点

  • 1提供了更好的并发性,避免了在迭代期间对容器进行加锁和复制。
  • 2“读写操作”容器的返回迭代器不会抛出ConcurrentModificationException,并且返回的参数严格与迭代器创建的时相一致,不会考虑后续的修改。

2.3.1 缺点

  • 1 由于写操作的时候,需要拷贝数组,会消耗内存
  • 2不能用于实时读的场景,像拷贝数组、新增元素都需要时间,所以调用一个set操作后,读取到数据可能还是旧的,虽然CopyOnWriteArrayList 能做到最终一致性,但是还是没法满足实时性要求
  • 2.3 阻塞队列和生产者-消费者模式

2.3.1 BlockingQueueBlockingQueue深入解析

  • 1、ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的.
  • 2、 LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的
  • 3、PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序.
  • 4、SynchronousQueue:队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲数据到队列中。可以这样来理解:生产者和消费者互相等待对方,握手,然后一起离开。

注意:BlockingQueue 不接受null 元素。试图add、put 或offer 一个null 元素时,某些实现会抛出NullPointerException。null 被用作指示poll 操作失败的警戒值.

2.3.1 生产者-消费者模式

生产者和消费者是围绕阻塞队列展开设计,生产者把数据放进队列,不用考虑消费者的消费能力,甚至可以根本没有消费者。类似的,消费者也不需要知道生产者是谁,只要从队列中获取数据即可。 阻塞队列可以使用任意数量的生产者和消费者。 简化了开发,因为它解除了生产者类和消费者类之间相互依赖的代码;解耦不同速度的生产,消费等活动。

2.3.2 双端队列和窃取工作模式

Deque 是 Double ended queue (双端队列) 的缩写,读音和 deck 一样,蛋壳。 Deque 继承自 Queue,直接实现了它的有,ArrayDeque,llist ,ConcurrentLinkedDeque 等。 Deque 支持容量受限的双端队列,也支持大小不固定的。一般双端队列大小不确定。 Deque 接口定义了一些从头部和尾部访问元素的方法。比如分别在头部、尾部进行插入、删除、获取元素。和 Queue 类似,每个操作都有两种方法,一种在异常情况下直接抛出异常奔溃,另一种则不会抛异常,而是返回特殊的值,比如 false, null。

  • 窃取工作:每个消费者都有一个自己的双端队列。如果一个消费者完成了自己的双端队列中的全部工作,它可以偷取其他双端队列中 末尾 的任务

优点:工作的线程不会竞争一个共享的任务队列,所以该模式更具有可伸缩性; 减少竞争,即使要从其他双端队列中获取任务,也是从 末尾 获取,不会与原本线程发生竞争 解决消费者与生产者同体的问题:当一个消费者线程在执行任务的过程中,发现了更多的的任务,就把新发现的任务放到自己队列的末尾(或者其他工作者的队列中)例如:网络爬虫在处理一个页面时,发现了更多新的页面。

2.4 阻塞和可中断方法

1.线程可能在执行过程中阻塞或者暂停执行,例如等待IO结束,等待获得一个锁,等待从Thread.sleep方法中醒来,或者等待另一个线程的计算结果。

2.阻塞方法;当一个方法能够抛出检查的InterruptedException时,是在告诉你这个方法是一个可阻塞方法。BlockingQueue的put和take方法抛出InterruptedException,还有类似的方法如Thread.sleep会抛出InterruptedException异常。如果这个方法被中断,他将努力提前结束阻塞状态。

3.中断是一种协作机制,一个线程不能强制要求其他线程停止正在执行的操作而去执行其他操作。当线程A中断线程B时,A只是要求B在执行到某个可以暂停的地方停止正在执行的操作。但是实际怎样处理中断是由线程B自己决定的。所以在,类中调用阻塞方法时,需要添加中断处理。

4.什么时候会发生中断:

  • 点击某个桌面应用中的取消按钮时;
  • 某个操作超过了一定的执行时间限制需要中止时;
  • 多个线程做相同的事情,只要一个线程成功其它线程都可以取消时;
  • 一组线程中的一个或多个出现错误导致整组都无法继续时;
  • 当一个应用或服务需要停止时。

5.中断处理策略:

  • 传递InterruptedException:将中断处理交给方法的调用者,包括根本不捕获异常或者捕获异常后进行简单的处理之后再抛出异常。
  • 恢复中断:有时候不能抛出InterruptedException,例如在Runnable中,这时需要捕获异常并且恢复异常。
  • 屏蔽中断:中断发生后捕获异常但是什么都不做。 6.中断方法
  • 1、public void interrupt():中断线程
  • 2、public static boolean interrupted():查询当前线程中断状态,如果已经中断,就清除中断。
  • 2、public boolean isInterrupted():查询当前线程中断状态,但是不会改变中断状态。

2.5 synchronizer

2.51 闭锁

  • 1、什么是闭锁?

闭锁(latch)是一种Synchronizer,可以延迟线程的进度直到线程到达终止状态。一个闭锁工作起来就像是一道大门:直到闭锁达到终点状态之前,门一直是关闭的,没有线程能够通过,在终点状态到来的时候,所有线程都可以通过。

  • 2、闭锁的实现

CountDownLatch是一个同步辅助类,存在于java.util.concurrent包下,灵活的实现了闭锁,它允许一个或多个线程等待一个事件集的发生。CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数的值就会减1。当计数器值到达0时,它所表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

  • 3、应用场景   

闭锁可以用来确保特定活动直到其他的活动都完成后才开始发生,比如:

  • 确保一个计算不会执行,直到它所需要的资源被初始化
  • 确保一个服务不会开始,直到它依赖的其他服务都已经开始
  • 等待,直到活动的所有部分都为继续处理做好充分准备
  • 死锁检测,可以使用n个线程访问共享资源,在每次测试阶段的线程数目是不同的,并尝试产生死锁

2.5.2 futureTask

先上一个场景:假如你突然想做饭,但是没有厨具,也没有食材。网上购买厨具比较方便,食材去超市买更放心。 实现分析:在快递员送厨具的期间,我们肯定不会闲着,可以去超市买食材。所以,在主线程里面另起一个子线程去网购厨具

Java现在的多线程机制,核心方法run是没有返回值的;如果要保存run方法里面的计算结果,必须等待run方法计算完,无论计算过程多么耗时。

面对这种尴尬的处境,程序员就会想:在子线程run方法计算的期间,能不能在主线程里面继续异步执行。

Callable onlineShopping = new Callable() {

        @Override
        public Chuju call() throws Exception {
            System.out.println("第一步:下单");
            System.out.println("第一步:等待送货");
            Thread.sleep(5000);  // 模拟送货时间
            System.out.println("第一步:快递送到");
            return new Chuju();
        }
        
    };
    FutureTask<Chuju> task = new FutureTask<Chuju>(onlineShopping);
    new Thread(task).start();

把Callable实例当作参数,生成一个FutureTask的对象,然后把这个对象当作一个Runnable,作为参数另起线程。

FutureTask提供以下方法

  • get方法:获取计算结果(如果还没计算完,也是必须等待的)

  • cancel方法:还没计算完,可以取消计算过程

  • isDone方法:判断是否计算完

  • isCancelled方法:判断计算是否被取消

2.5.3 信号量

Semaphore可以控制某个资源可被同时访问的个数,提供了同步机制,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。

Semaphore实现的功能就类似厕所有5个坑,假如有10个人要上厕所,那么同时只能有多少个人去上厕所呢?同时只能有5个人能够占用,当5个人中的任何一个人让开后,其中等待的另外5个人中又有一个人可以占用了。另外等待的5个人中可以是随机获得优先机会,也可以是按照先来后到的顺序获得机会,这取决于构造Semaphore对象时传入的参数选项。 单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了“锁”,再由另一个线程释放“锁”,这可应用于死锁恢复的一些场合。

2.5.4 关卡 CyclicBarrier

关卡类似于闭锁,它们都能阻塞一组线程,直到某些事件发生。 关卡和闭锁关键的不同在于,所有线程必须同时到达关卡点,才能继续处理。闭锁等待的是事件,关卡等待的是其他线程。

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。

当线程到达关卡点时,调用await方法,await会被阻塞,直到所有的线程都到达关卡点。如果所有的线程都到达了关卡点,关卡就会被突破,这样所有的线程都被释放,关卡会重置以备下一次使用。如果对await的方法调用超时,或者阻塞中的线程被中断,那么关卡就被认为是失败的,所有对await未完成的调用都通过BrokenBarrierException终止。

CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

2.5.5 为计算结果建立高效可伸缩缓存