多线程程序加速指南

1,705 阅读15分钟

虽然对于一个计算机程序来说最重要的是正确性,如果一个程序没办法产出正确的结果,那么这个程序的价值就大打折扣了。但程序性能也是很重要的一个方面,如果程序运行得太慢,那也会影响到程序的适用范围和硬件配置的成本。

在之前的文章《4.多线程中那些看不到的陷阱》中,我们了解了线程间的同步机制,这主要是为了保证程序在多线程环境下的正确性。在这篇文章中我们将会深入探究多线程程序的性能瓶颈和多种不同的优化方式,那么我们首先就从对程序性能的测量与分析开始吧。

分析多线程程序的性能

我们先来看一个使用AtomicLong进行多线程计数的程序,下面的程序中会启动两个线程,每个线程会对静态变量count进行一亿次(10的8次方)的累加操作,这段代码在开始和结束的时候都获取了当前时间,然后通过这两个时间值计算程序的运行耗时。

public class AtomicIntegerTest {

    private static AtomicLong count = new AtomicLong(0);

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();
        Runnable task = new Runnable() {
            public void run() {
                for (int i = 0; i < 1e8; ++i) {
                    count.incrementAndGet();
                }
            }
        };

        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("count = " + count);

        long endTime = System.currentTimeMillis();
        System.out.println(String.format("总耗时:%.2fs", (endTime - startTime) / 1e3));
    }

}

在我的电脑上运行这段程序最后输出的结果是2.44s,看起来有点长了,那么我们再看一下如果直接在单个线程中对一个整型变量累加两亿次会是什么结果。下面是一个在单个线程中累加两亿次的程序代码:

public class SingleThreadTest {

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();

        long count = 0;
        for (int i = 0; i < 2e8; ++i) {
            count += 1;
        }
        System.out.println("count = " + count);

        long endTime = System.currentTimeMillis();
        System.out.println(String.format("总耗时:%.2fs", (endTime - startTime) / 1e3));
    }

}

这段代码运行耗时只有0.33s,我们通过两个线程进行累加的代码竟然比单线程的代码还要慢得多,这不是就和我们使用多线程加快程序运行的初衷相违背了吗?

多线程程序的线程间同步是影响多线程程序性能的关键所在,一方面程序中必须串行化的部分会使系统整体的耗时显著增加,另一方面同步行为本身的开销也比较大,特别是在发生冲突的情况下。在上文的代码中,多线程累加的程序之所以会比单线程还慢得多就是因为在AtomicLong类型的静态变量count上有两个线程同时调用incrementAndGet方法进行累加,这就会导致在这个静态变量上存在很严重的冲突。

当一个线程成功修改了变量count的值后,另外一个正在修改的线程就会修改失败并且会再次重试累加操作。并且因为AtomicLong类型的对象中是用一个volatile变量来保存实际的整型值的,而我们在之前的文章《多线程中那些看不到的陷阱》中可以了解到,对volatile变量的修改操作一定要把修改后的数据从高速缓存写回内存当中,这也是用AtomicLong进行累加的耗时比单线程累加版本还要多这么多的主要原因。

那么我们有没有更好的方法可以解决这个问题呢?

使用任务拆分进行优化

在上面的例子中,我们需要的只是最终累加的结果,所以为了减小线程间同步的开销,我们可以将累加任务拆分到不同的线程中执行,到最后再把每个线程的结果加在一起就可以得到最终的结果了。在下面的代码中我们就使用了这种方法,t1在count1上累加一亿次,t2在count2上累加一亿次,最后把count1和count2相加得到最终的结果,我们来一起运行一下,看看效果如何。

public class TwoThreadTest {

    private static long count1 = 0;
    private static long count2 = 0;

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();

        Thread t1 = new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < 1e8; ++i) {
                    count1 += 1;
                }
            }
        });
        Thread t2 = new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < 1e8; ++i) {
                    count2 += 1;
                }
            }
        });

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        long count = count1 + count2;
        System.out.println("count = " + count);

        long endTime = System.currentTimeMillis();
        System.out.println(String.format("总耗时:%.2fs", (endTime - startTime) / 1e3));
    }

}

这段程序在我的电脑上的耗时是0.20s,比之前单线程的0.33s有了不小的提高,更是遥遥领先原先用两个线程累加AtomicLong类型变量版本的2.44s。这说明我们之前的分析是正确的,CAS重试和volatile写回内存两个操作所引起的开销是AtomicLong版本程序性能低下的罪魁祸首。

但是这个版本的结构还是略显原始了,在应付累加这种简单的需求时可能还比较容易,但是一旦面临复杂的并发任务,那可能就要写很多复杂的代码,并且很容易出现错误了。例如我们如果想把任务拆分到10个线程中运行,那么我们就要首先把两亿次的累加任务拆分为10份,然后还要创建一个包含10个Thread对象的数组让他们分别的对不同范围进行累加,最后还要通过join方法等待这10个线程都执行完成,这个任务听起来就不太容易。没关系,下面我们将会介绍一种目前比较常用的任务拆分与运行框架来解决这个问题,通过这个框架我们可以很容易地写出易于编写和扩展的任务拆分式程序。

使用ForkJoinPool进行任务

JDK 1.7中引入了一个新的多线程任务执行框架,被称为ForkJoinPoolForkJoinPool是一个Java类,它实现了代表线程池功能的ExecutorService接口,所以它在使用方法上和常用的线程池类ThreadPoolExecutor相似,但在本节中我们并不需要了解线程池的详细用法,不过感兴趣的读者可以参考这篇文章从0到1玩转线程池来了解一下。

线程池就是一个线程的集合,其中的线程会一直等待执行任务,所以我们可以把任务以任务对象的形式提交到线程池,然后线程池就会利用其中的线程来执行任务。在ForkJoinPool的使用中,线程池指的就是ForkJoinPool类型的对象,而任务对象指的就是继承自ForkJoinTask的类的对象。在下面的示例代码中,我们使用了自定义的RecursiveTask的子类来作为任务类,RecursiveTask类就继承自ForkJoinTask类。

Recursive的意思是递归,也就是说我们在这个任务类的执行过程中可能会创建新的任务类对象来代表当前任务的子任务,然后通过结合多个子任务的结果来返回当前任务的结果。比如一开始的任务是累加两亿次,但那么我们就可以把它分为两个分别累加一亿次的子任务的结果之和,同样的道理,累加一亿次的子任务也可以再被分为两个累加五千万次的子子任务。这样的拆分会一直持续到我们认为任务规模已经足够小的时候,这时子任务的结果就会被计算,然后再返回给上层任务进行处理之后就得到上层任务的结果了。

如果前面这一段文字描述看不明白也没关系,我们在代码中找一找答案:

public class ForkJoinTest {

    private static class AccumulateTask extends RecursiveTask<Long> {

        private long start;
        private long end;
        private long threshold;

        /**
         * 任务的构造函数
         *
         * @param start         任务处理范围的起始点(包含)
         * @param end           任务处理范围的结束点(不包含)
         * @param threshold     任务拆分的阈值
         */
        public AccumulateTask(long start, long end, long threshold) {
            this.start = start;
            this.end = end;
            this.threshold = threshold;
        }

        @Override
        protected Long compute() {
            long left = start;
            long right = end;

            // 终止条件:如果当前处理的范围小于等于阈值(threshold),
            //                    那么就直接通过循环执行累加操作
            if (right - left <= (int) threshold) {
                long result = 0;
                for (long i = left; i < right; ++i) {
                    result += 1;
                }
                return result;
            }

            // 获取当前处理范围的中心点
            long mid = (start + end) / 2;

            // 拆分出两个子任务,一个从start到mid,一个从mid到end
            ForkJoinTask<Long> leftTask = new AccumulateTask(start, mid, threshold);
            ForkJoinTask<Long> rightTask = new AccumulateTask(mid, end, threshold);

            // 通过当前线程池运行两个子任务
            leftTask.fork();
            rightTask.fork();

            try {
                // 获取两个子任务的结果并返回
                return leftTask.get() + rightTask.get();
            } catch (Exception e) {
                return 0L;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();

        // 创建总任务,范围是从1到两亿(包含),阈值为10的7次方,所以最终至少会有10个任务进行for循环的累加
        AccumulateTask forkJoinTask = new AccumulateTask(1, (int) 2e8+1, (long) 1e7);
        // 使用一个新创建的ForkJoinPool任务池运行ForkJoin任务
        new ForkJoinPool().submit(forkJoinTask);

        // 打印任务结果
        System.out.println("count = " + forkJoinTask.get());

        // 计算程序耗时并打印
        long endTime = System.currentTimeMillis();
        System.out.println(String.format("总耗时:%.2fs", (endTime - startTime) / 1e3));
    }

}

在上面的代码中,我们会不断创建在指定范围内累加的子任务,直到任务范围小于阈值threshold(在代码中是10的7次方)时才不再拆分子任务,而是通过循环来得到累加结果。之后子任务的返回结果在上层任务中相加并作为上层任务的结果返回。到最后我们就可以得到累加两亿次的结果了。

在这个程序中,最重要的是对任务对象的三个操作:

  1. 创建任务对象,代码中使用的是new AccumulateTask(start, mid, threshold)new AccumulateTask(mid, end, threshold),这两段代码会创建除了范围不同其他逻辑与父任务完全一致的子任务,每个子任务会负责执行父任务范围中的一半;
  2. 执行子任务,通过调用任务对象的fork()方法可以让子任务被提交到当前ForkJoinPool中执行;
  3. 等待子任务返回结果,通过调用子任务任务对象的get()方法,父任务将会等待子任务执行完成并返回结果,然后将两个子任务的结果相加得到父任务的执行结果。

为什么同样都是线程池,但是ThreadPoolExecutor类就很难实现这样的执行方式呢?细心的读者可能已经发现了,我们在一个任务中会拆分出两个子任务,并且要等待这两个子任务都执行完成才能返回父任务的结果。如果是在ThreadPoolExecutor中,在等待子任务运行完成得到结果时,父任务会一直阻塞并且占用一个线程,这样的话如果父任务太多就会导致子任务没有线程可供使用了,这个运行流程就没办法继续执行下去了。而ForkJoinPool这个特殊的线程池就解决了这个问题,父任务在等待子任务执行时可以让出线程给其他任务,这样就不会导致线程都被阻塞状态的父任务所阻塞了。

这种将任务拆分为互不依赖的子任务,然后分别在不同的线程上执行,最后再将结果进行逐步合并的方法就被称为Map-Reduce。这种方法在离线大数据技术中被广泛应用,甚至可以说大数据相关技术就是在Map-Reduce思想基础上发展起来的也不为过。

线程内变量

通过上面的几个例子,我们可以看到,对于多线程程序来说,共享数据就是最大的问题。共享数据不但可能引起数据竞争问题,导致程序出现问题;而且随着引入的线程同步操作又会拉低程序的性能,甚至可能使多线程程序的执行时间比单线程程序还长得多。在上面的例子中,我们通过使用ForkJoinPool拆分并执行了一个累加任务,各个子任务之间基本完全独立,做到了最大程度的并行化。但是在一些情况下,我们可能没办法做到如此理想的方案,在一些情况下还是会留有一定的线程同步操作和对应的代码临界区。

那么在这些情况下我们如何处理能让程序的性能尽可能高呢?

假设我们现在要统计一个方法的调用次数,如果可能有多个线程同时调用该方法,那么就需要对多个线程的调用同时计数。这种情况下我们可以考虑在每个线程里各自保留一个整型变量用于保存每个线程内的调用次数,然后在获取总数时只需要把每个线程中的数量加在一起就能计算出来了。而在累加计数时我们只需要修改当前线程对应的变量就可以了,自然就没有了数据竞争问题。java.util.concurrent.atomic包中的累加器LongAdder类采用的就是这样的思路,这种思路也有自己专门的专业术语,被称为**“线程封闭”**,线程封闭指的就是这种通过线程内变量来避免线程间共享数据的优化方式。

Java中也有专门的ThreadLocal类可以处理线程内变量,只是因为性能和线程销毁时的数据保存之类的原因一般不会用于多线程累加这样的数据聚合场景,但是在保存和获取数据方面非常的便利,有兴趣的读者可以了解一下。

ConcurrentHashMap

java.util.concurrent包为我们提供了锁、原子类、线程池、ForkJoinPool等一大批并发编程工具。最后,我们来了解一下java.util.concurrent包中为我们提供的一种线程安全数据结构。

在Java中,我们常用的Map类是HashMap,但是这个类并不是线程安全的,如果我们在多个线程中同时对HashMap对象进行读写,那么就有可能引发一些程序问题。还有一个从JDK 1.0起就已经存在的Hashtable类可以保证线程安全,但是我们打开这个类的源代码可以看到,这个类中的大部分方法上都加上了synchronized标记,其中包括了最常用的getput方法,这意味着Hashtable类的对象同一时间几乎只能被一个线程所使用,这样的效率相对是比较低的。

但是其实熟悉HashMap结构的朋友可能会知道,HashMap内部的结构是分为很多的桶的,每个键值对都会根据key值的hashCode值被放到不同的桶中。其实在做修改操作时我们只需要对对应的一个桶加锁就可以了,而在执行读操作时,在大多数情况下是不用加锁的。JDK 1.5中引入的ConcurrentHashMap基本能达到这两点。

在这个类中,我们通过两种方式来优化了并发的性能:

  1. 通过限制锁保护的代码范围来减少了锁冲突发生的可能性,而且也减少了需要的锁数量,减少了同步产生的开销;
  2. 另一方面因为读取的时候并不需要加锁,而只有写操作才需要加锁,在一些读操作较多但是写操作较少的情况下,我们就能大大降低读操作的成本,从而提高了程序的性能。

而且ConcurrentHashMap中的大多数方法不仅优化了同步机制的效率,而且提供了很多原子性的类似于CAS的操作方法,下面是ConcurrentHashMap类常用的操作:

  • V putIfAbsent(K key, V value),原子性操作,如果map中不包含key,则执行map.put(key, value)并将put方法的返回值返回,否则直接返回map.get(key)的值,即当前值;
  • boolean remove(Object key, Object value),原子性操作,如果满足条件 (map中包含key对应的键值对 && value参数等于键值对中当前的value值) 则移除key对应的键值对并返回true,否则返回false;
  • boolean replace(K key, V oldValue, V newValue),原子性操作,当满足 (map中包含key对应的键值对 && oldValue参数等于键值对中当前的value值)时,将key对应的值改为newValue并返回true,否则返回false。

总结

我们在这篇文章中从对一个使用AtomicLong进行多线程累加的程序的性能测试开始,通过Map-Reduce思想大大优化了这个程序的性能,在此过程中还涉及到了ForkJoinPool类的使用。之后我们通过线程内变量正式提出了**“线程封闭”的概念,如果我们能做到线程封闭**,那么因为减少了线程间同步的开销,所以线程的性能一定会有很大的提高。最后我们介绍了java.util.concurrent包中为我们提供的并发安全数据类ConcurrentHashMap。相信通过这篇文章大家能够了解到多种多线程性能优化方法,但最重要的还是要找出多线程程序性能的瓶颈所在,这样才能在实际的实践场景中根据不同的情况因时制宜,使用合适的方法解决不同的瓶颈问题。本文中的观点是多线程程序的瓶颈主要在于共享数据引起的数据竞争问题,如果能够让不同的线程间不存在或者尽可能少地存在共享数据与临界区代码,就能够对多线程程序的性能起到正面的影响。