查漏补缺-Java多线程

1,707 阅读16分钟

线程

线程和进程的区别

进程是一个可执行的程序,是系统资源分配的基本单位。线程是进程内部相对独立的可执行单元,是操作系统进行任务调度的基本单位。

多线程的优缺点

优点:

  • 充分利用多核多cpu的资源,提高cpu的使用率,提高了程序的运行效率。 缺点:
  • 线程数过多会影响性能,操作系统会在线程切换之间增加内存的开销。
  • 存在线程同步和安全问题
  • 可能产生死锁
  • 增加了开发人员的技术难度

线程有几种状态

一共五种状态:分别是新建,就绪,运行,阻塞和死亡状态。详细见下图:

  • 新建状态:当用new创建一个线程时,线程还没有开始运行,此时线程处于新建状态。处于新建状态的线程还没有开始运行。
  • 就绪状态:一个新建的线程并不会自动运行,要执行线程,要手动调用线程的start()方法,当start()方法返回后,线程就处于就绪状态,等待处理器的调度。
  • 运行状态:当线程获取了CPU的时间后,它才进入运行状态,真正的执行run()方法里的内容。
  • 阻塞状态:线程运行过程中,可能因为各种原因进入阻塞状态:比如调用sleep()进入休眠状态;调用一个在IO上被阻塞的操作,即该操作在输入输出操作完成之前不会返回到它的调用者;等待获取锁被阻塞;线程在等待其他的触发条件。所谓的阻塞状态就是正在运行的线程没有运行结束,暂时让出CPU资源。
  • 死亡状态:有两个原因会导致线程死亡:run()方法正常结束;一个未捕获的异常终止了run()方法而导致线程猝死。

经典题:一个线程OOM了,其他线程是否还能运行?

答案是还能运行。虽然说堆是线程共享的区域,一个线程堆抛出OOM异常,你可能会觉得其他线程也会抛出OOM异常。但其实不然,当一个线程抛出OOM异常后,它所占据的内存会全部释放掉,从而不会影响其他线程的运行。 另外如果主线程异常了,子线程还能运行吗?这个问题也是可以运行的。线程不像进程,一个进程之间的线程之间是没有父子之分的,都是平级关系。即线程都是一样的,退出了一个不会影响另外一个。

创建线程的几种方法

  • 继承Thread类,重写run()方法,利用Thread.start()启动线程。
  • 实现Runnable接口,重写run()方法,通过new Thread(Runnable a)创建线程,并调用start()方法启动线程。
  • 通过callable和futuretask创建线程,实现callable接口,重写call方法,利用future对象包装callable实例,通过new Thread方法创建线程。
  • 通过线程池创建线程。

sleep和wait方法的区别

  • wait只能在synchronized中调用,属于对象级别的方法,sleep不需要,属于Thread的方法
  • 调用wait方法会释放锁,sleep不会释放锁
  • wait超时之后线程进入就绪状态,等待获取cpu继续执行。

yield和join区别

  • yield会释放cpu资源,不会释放锁,让当前线程进入就绪状态,只能使同优先级或更高优先级的线程有执行的机会。
  • join会释放cpu资源和锁,底层是wait()方法实现的,join会等待调用join方法的线程执行完成之后再继续执行。

死锁

什么叫死锁

死锁是指两个或两个以上的线程在执行过程中,因争夺资源而造成的一种互相等待的现象。若无外力作用,他们都将无法进行运行下去。

死锁产生的原因

  • 系统资源的竞争:系统中拥有不可剥夺的资源,其数量不足以满足多个线程运行的需要,使得线程在运行过程中因为争夺资源而陷入僵局。
  • 线程推进顺序非法:线程在获得一个锁L1的情况下再去申请另外一个锁L2,也就是在没有释放锁L1的情况下,又去申请锁L2,这个是产生死锁最根本的原因。

死锁产生的必要条件

  • 互斥条件:进程要求对所分配的资源在一段时间内只能由一个进程所拥有。
  • 不可剥夺条件:资源在进程未使用完成之前,不能被其他进程夺走,除非是主动释放。
  • 请求和保持条件:进程已经保持了至少一个资源,但是又申请新的资源,但是该资源已经被其他进程所拥有,这就陷入了死循环。
  • 循环等待条件:总的来说,就是进程资源循环等待,A拥有资源1,B拥有资源2,同时A在没有释放资源1的情况下又去申请资源2,B在没有释放资源2的情况下又去申请资源1。

如何避免死锁

  • 加锁顺序要合理:线程要按照一定的顺序加锁
  • 加锁时限要适当:线程尝试获取锁的时候要加上一定时限,超时就要放弃锁请求,并释放自己占有的锁。不能无限等待。
  • 死锁检测

ThreadLocal

ThreadLocal是什么

  • ThreadLocal是一个关于创建线程局部变量的类,主要作用是存储线程的局部变量,做到数据隔离。保存在ThreadLocal中的数据只属于当前线程,对其他线程是不可见的。在多线程环境下,可以防止自己的变量被其他线程修改而导致数据不一致性。
  • ThreadLocal设计的目的就是为了能够使得当前线程拥有属于自己的变量,并不是为了解决并发或者共享变量的问题
  • 底层使用ThreadLocalMap实现,每个线程都拥有自己的ThreadLocalMap,内部是继承了WeakReference的Entry数组,包含的Key为ThreadLocal,值为Object。

ThreadLocalMap的内存泄露问题

我们知道ThreadLocalMap使用ThreadLocal的弱引用作为key,如果一个ThreadLocal没有外部强引用时,在GC的时候,这个ThreadLocal必然会被回收,但是对应的value不会被回收掉直到线程结束才会被回收。如果当前线程一直处于运行中,那么这些Entry对象中的value就可能一直无法回收,就会发生内存泄露。实际开发中,会使用线程池维护线程的创建和复用,线程为了复用是不会主动结束的,那么ThreadLocal设置的value值就一直被引用,就会发生内存泄露。

线程池

概念

  • 简单理解,就是一个管理线程的池子。
  • 它帮我们管理线程,避免增加创建线程和销毁线程的资源损耗。因为线程本身也是一个对象,创建需要经过类加载,回收也要GC回收。
  • 它提高了响应速度。如果任务到达了,相对于从线程池拿线程,重新创建线程执行要慢很多。
  • 重复利用。线程用完,再返回池子,可以达到重复利用的效果,节省资源。

创建线程池

线程池可以通过ThreadPoolExecutor来创建,看下构造函数:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit,
   BlockingQueue<Runnable> workQueue,
   ThreadFactory threadFactory,
   RejectedExecutionHandler handler) 

几个核心参数的作用:

  • corePoolSize:核心线程的数量
  • maximumPoolSize:线程池中最大的线程数量
  • keepAliveTime:线程池中非核心线程空闲的存活时间
  • TimeUnit:线程空闲存活时间的时间单位
  • workQueue:存放任务的阻塞队列
  • threadFactory:用于创建核心线程的线程工厂,可以给创建的线程自定义名字,方便查日志
  • handler:线程池的饱和策略(拒绝策略),有四种类型。

线程池的执行流程

文字表述就是:

  • 提交一个任务,如果线程池里存活的核心线程数<corePoolSize,线程池会创建一个核心线程去执行任务。
  • 如果线程池核心线程数已满,即线程数已经等于coolPoolSize,会把任务先塞到阻塞队列中。
  • 如果线程池里核心线程数都忙着干其他事,并且任务队列也满了塞不下了,别急还没到执行拒绝策略的时候,判断线程数是否达到了最大线程数maximumPoolSize,如果没达到,好说,创建一个非核心的线程执行你的任务,否则的话只能对你的任务执行拒绝策略了。

其实线程池为了执行你的任务也是操碎了心有木有,先用核心线程执行你的任务,核心线程没空,把你加到核心线程的任务队列里(核心线程空下来就执行你的任务)。核心线程的排期也满了,不能再加了,还是想办法在最大线程数的范围内创建非核心的线程去执行你的任务,如果你的任务太多,那不好意思,只能用拒绝策略招待你了。

四种拒绝策略:

  • AbortPolicy:抛出一个异常,默认的拒绝策略
  • DiscardPolicy:直接丢弃任务
  • DiscardOldestPolicy:丢弃队列里最老的任务,将当前这个任务继续提交给线程池。
  • CallerRunsPolicy:交给线程池调用所在的线程进行处理。

线程池的工作队列

ArrayBlockingQueue

有界队列,是一个用数组实现的有界阻塞队列,按FIFO排序。

LinkedBlockingQueue

基于链表实现的阻塞队列,按FIFO排序任务,容量可以设置,不设置的话将是一个无边界的阻塞队列(最大长度是Integet.Max_VLAUE),吞吐量通常要高于ArrayBlockingQueue;newFixedThreadPool线程池使用了这个队列。

DelayQueue

DelayQueue是一个任务定时周期延迟执行的队列。根据指定的执行从小到大排序,否则根据插入到队列的先后顺序。newScheduledThreadPool线程池使用了这个队列。

PriorityBlockingQueue

优先级队列是具有优先级的无界阻塞队列。

SynchronousQueue

同步队列,一个不存储元素的阻塞队列,每个插入操作都必须等待另一个线程调用移除操作,否则插入操作将一直处于阻塞状态。吞吐量通常要高于LinkedBlockingQueue,newCachedThreadPool使用了这个队列。

几种常用的线程池

  • newFixedThreadPool(固定数目线程的线程池,内部使用LinkedBlockingQueue)
  • newCachedThreadPool(可缓存线程的线程池,内部使用SynchronousBlockingQueue)
  • newSingleThreadPool(单线程的线程池,内部使用LinkedBlockingQueue)
  • newScheduledThreadPool(定时及周期性执行的线程池,内部使用DelayQueue)

newFixedThreadPool

  public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

线程池特点

  • 核心线程数量=最大线程数量
  • 没有所谓的非空闲时间,即keepAliveTime=0
  • 阻塞队列为无界队列LinkedBlockingQueue 提交任务的执行流程参照ThreadPoolExecutor及newFixedThreadPool自行得出。此处有个问题:使用无界队列的线程池会导致内存飙升吗?答案是会的。因为如果提交的任务很多,超过了newFixedThreadPool设置的核心线程数,会一直往无界队列里塞任务,任务累积越来越多,导致机器内存飙升,直到OOM。
    使用场景:newFixedThreadPool适用于处理CPU密集型的任务,确保CPU在长期工作线程使用的情况下,尽可能少的分配线程。

newCachedThreadPool

   public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

线程池特点:

  • 核心线程数为0,最大线程树为Integer.Max_VALUE
  • 非核心线程空闲的存活时间为60s
  • 阻塞队列是SynchronousQueue

执行流程参照ThreadPoolExecutor和池特点自行得出。这个池也有一个问题:当提交任务的数量大于处理任务的数量时,每次提交一个任务必然会创建一个非核心线程,极端情况下会创建过多的线程(最大Integer.Max_VALUE),耗尽CPU和内存资源。当然如果没有任务了,这些空闲的非核心线程在存活60s后被回收。 使用场景: 用于并发量大执行大量短期的小任务。

newSingleThreadExecutor

  public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

线程池特点:

  • 核心线程数=最大线程数=1,也就是这个池子里从始最终只有一个活着的线程。
  • keepAliveTime=0,这个参数无效。
  • 阻塞队列是无界的LinkedBlockingQueue

提交任务的执行流程可自行得出。其实这个池子里干活的只有一个人,不管你往里塞多少任务,都是它按部就班的从队列里获取任务执行,适用于串行执行任务的情景,一个任务接一个任务的执行。

newScheduledThreadPool

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

线程池特点:

  • 核心线程数自定义,最大线程数为Integer.MAX_VALUE
  • keepAliveTime=0
  • 阻塞队列为DelayedWorkQueue 这里scheduleAtFixedRate和scheduleWithFixedDelay两个方法的区别如下:
  • scheduleAtFixedRate:按某种速率周期性的执行,不管上一个任务有没有执行结束。也就是说是从上一个任务开始执行算起的+一个周期作为下一个任务的开始执行时间。
  • scheduleWithFixedDelay:在某个延迟后执行,是要等上一个任务执行结束算起的。也就是说是从上一个任务执行结束时间+一个周期作为下一个任务的开始执行时间。

使用场景:周期性的执行任务的场景,做一些简单的定时调度。

线程池状态

线程池有这几个状态:RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED

   //线程池状态
   private static final int RUNNING    = -1 << COUNT_BITS;
   private static final int SHUTDOWN   =  0 << COUNT_BITS;
   private static final int STOP       =  1 << COUNT_BITS;
   private static final int TIDYING    =  2 << COUNT_BITS;
   private static final int TERMINATED =  3 << COUNT_BITS;

RUNNING

  • 该状态的线程池会接收新任务,并处理阻塞队列中的任务
  • 调用shutdown()方法可以切换到SHUTDOWN状态
  • 调用shutdownNow()方法可以切换到STOP状态

SHUTDOWN

  • 该状态的线程池不会接收新任务,但会处理阻塞队列中的任务
  • 队列为空,并且线程池中执行的任务也为空,进入TIDYING状态

STOP

  • 该状态的线程池不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在执行中的任务
  • 线程池中执行的任务一旦变为空,进入TIDYING状态

TIDYING

  • 该状态表明所有的任务已经运行终止,记录的任务数量为0
  • terminated()执行完毕,进入TERMINATED状态

TERMINATED

该状态表明线程池彻底终止或死亡

四种并发工具类

这里按照使用频率简单说下四种并发工具类CountDownLatch,CyclicBarrier,Semaphore和Exchanger的区别和实现。

CountDownLatch和CyclicBarrier

这两太容易弄混了。

  • CountDownLatch具有计数器的功能,意思是主线程遇到CountDownLatch阻塞在那,要等待CountDownLatch里的所有线程都执行完毕,主线程才能继续执行。需要注意的是CountDownLatch创建的线程数和每个线程里countDown的总次数需要和初始化CountDownLatch传入的线程数相等,不然的话主线程将一直处于等待状态。

  • CyclicBarrier就是循环栅栏,意思是多个线程相互阻塞,只有多个线程都达到了栅栏时候,才能同时执行后续的逻辑。 区别就是CountDownLatch是一个线程(或多个)等待另外的N个线程完成某个事情后才能执行。CyclicBarrier是N个线程相互等待,任何一个线程达到栅栏前,所有线程都阻塞在那。

下面给两个Demo
一个老师(主线程)等10个学生(其他线程)都来教室了才开始上课。是CountdownLatch模型。

public class CountDownLatchDemo {

    public static final Integer STUDENT_COUNT = 10;

    public static CountDownLatch countDownLatch = new CountDownLatch(STUDENT_COUNT);

    public static void main(String [] args) {
        Thread teacher = new Thread(new Teacher());
        teacher.start();
        for (int i=0; i<STUDENT_COUNT; i++) {
            Thread student = new Thread(new Student());
            student.start();
        }
    }

    public static class Teacher implements Runnable {

        @Override
        public void run() {
            System.out.println("老师"+Thread.currentThread().getName()+"来了,等所有同学都到了才上课...");
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("所有同学都到齐了,开始上课...");
        }
    }

    public static class Student implements Runnable {

        @Override
        public void run() {
            System.out.println("同学"+Thread.currentThread().getName()+"进入了教室...");
            countDownLatch.countDown();
        }
    }
}

一个任务需要10个工人(10个线程)都到岗之后才能开始进行,否则只能等待,是CyclicBarrier模型。

public class CyclicBarrierDemo {
    
    public static final Integer WORKER_COUNT = 10;

    private static CyclicBarrier cyclicBarrier;
    
    public static void main(String [] args) {
        cyclicBarrier = new CyclicBarrier(WORKER_COUNT, new Runnable() {
            @Override
            public void run() {
                System.out.println(WORKER_COUNT+"个工人已就位,可以开始干活了...");
            }
        });
        for (int i=0; i<WORKER_COUNT; i++) {
            Thread worker = new Thread(new Worker());
            worker.start();
        }
    }

    public static class Worker implements Runnable {
        @Override
        public void run() {
            System.out.println("工人"+Thread.currentThread().getName()+"就位...");
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

Semaphore

Semaphore信号量,就是用来控制访问有限资源的线程数量,线程要访问资源首先要获得许可证,一个许可证对应一个资源,如果资源不足了,线程就要等待,如果其他线程释放了一个资源(许可证),那么信号量就通知等待的一个线程,分配给它一个许可证。内部实现这里不再赘述,下面是4个人看电影,但是电影院只有2个位置,也就是一次只能2个人在里面看(这里限制不能站着看),有人离开位置,后面的人才可以进来看电影。

public class SemaphoreDemo {

    public static final Integer SETS_NUMBER = 2;
    public static final Integer PERSON_NUMBER = 4;

    public static void main(String [] args) {
        WatchMovie watchMovie = new WatchMovie(SETS_NUMBER);
        for (int i=0; i<PERSON_NUMBER; i++) {
            Thread person = new Thread(new Person(watchMovie));
            person.start();
        }
    }

    public static class WatchMovie {

        private Semaphore semaphore;

        WatchMovie(int setCount) {
            semaphore = new Semaphore(setCount);
        }

        public void watchMovie() {
            try {
                //获取信号量
                semaphore.acquire();
                long time = (long) (Math.random()*10+1);
                System.out.println(Thread.currentThread().getName()+"看了"+time+"秒的电影");
                Thread.sleep(time);
                System.out.println(Thread.currentThread().getName()+"让出座位,离开了电影院");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                //释放信号量
                semaphore.release();
            }

        }
    }

    public static class Person implements Runnable{

        private WatchMovie watchMovie;

        Person(WatchMovie watchMovie) {
            this.watchMovie = watchMovie;
        }

        @Override
        public void run() {
            watchMovie.watchMovie();
        }
    }
}

Exchanger

Exchanger理解其意思就行,就是线程间的数据交换。是怎么交换呢?实际上是共用一个Exchange的多个线程,在全部都到达栅栏的时候才可以进行数据的交换,否则的话先到达的线程只能等待其他的线程达到栅栏。那怎么才算到达栅栏呢,就是线程内部调用exchanger.exchange方法。然后这个方法的返回值就是其他线程交换的数据,参数就是当前线程想要交还给其他线程的数据。

三大并发容器

CopyOnWrite

JDK里提供了CopyOnWriteArrayList和CopyOnWriteArraySet,它的实现机制就是读写分离,简单的说就是任何时候都可以读,写需要加锁,写时复制。什么是写复制呢?就是对容器的修改加锁后,通过copy一个新的容器来进行修改,修改完毕后将容器替换为新的容器。 不贴源码了,自己去看吧。下面说下优缺点。

  • 优点很显然,可以并发读,而且解决了迭代过程多线程修改导致的并发问题,还记得那些年代码里抛出的CurrentModificationException吗?嘿嘿,现在知道为什么用CopyOnWrite可以解决了吧,因为多线程修改每次都是创建新的容器而不是操作的原容器。
  • 缺点也是伴随着优点,每次修改创建新的容器肯定要申请新的内存空间,高并发修改频繁的情况下,会造成频繁的GC。

ConcurrentHashMap

这个实在是太重要了!!!但是几百个字也说不清楚,这里只点到为止,说一下ConcurrentHashMap在1.7和1.8中的结构,具体的源码和实现原理以后有机会再整理。

1.7的ConcurrentHashMap

  • 在jdk1.7中,ConcurrentHashMap是由Segment+HashEntry数组的方式实现的。结构如上图所示,一个ConcurrentHashMap中包含了一个Segment数组,Segment数组和HashMap类似,是数组+链表结构,一个Segment又包含一个HashEntry数组,每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组里的数据进行修改时,首先要获得对应的Segment的锁。
  • 这就是所谓的分段锁机制:首先将数据分成一段段存储,每一段叫一个Segment,然后给每段数据配一把锁,叫Segment锁。当一个线程想要访问某段Segment的数据,它需要获得锁,然而其他段的数据还能被其他线程访问,这在一定程度上降低了锁粒度。

1.8中的ConcurrentHashMap

jdk1.8的实现已经摈弃了Segment的概念,而是直接用Node数组+链表+红黑树的数据结构来实现,并发控制使用Synchronized和CAS来操作,整个看起来就像是优化过且线程安全的HashMap。

ConcurrentSkipListMap

ConcurrentSkipListMap内部使用跳表的数据结构来实现,他的结构很简单。跳表就是一个多层的链表,底层是一个普通的链表,然后逐层减少,通常通过一个简单的算法实现每一层元素是下一层的元素的二分之一,这样当搜索元素时从最顶层开始搜索,可以说是另一种形式的二分查找。在理论上它的查找、插入和删除时间复杂度为log(N)。在并发上,ConcurrentSkipListMap采用无锁的CAS+自旋来控制。也就是说整个插入的过程都是通过CAS来实现的。下面看下一个三层的跳表添加元素的过程:

插入值为15的元素

插入后:

总结

多线程不仅有synchronized,volatile,lock,CAS和AQS等,还有最基本的线程以及线程池。建议在掌握基础的同时,多阅读源码理解实现原理,多在实践中把线程相关的东西用起来,那样才印象深刻。