小豹子带你看源码:Java 线程池(三)提交任务

2,421 阅读9分钟

承上启下:上一篇文章小豹子讲了线程池的实例化过程,粗略介绍了线程池的状态转换;这篇文章主要讲了我运行线程池时遇到的小问题,以及 execute 方法的源码理解。

4 并不算疑难的 Bug

按照我们的规划,下一步就应该提交任务,探究线程池执行任务时的内部行为,但首先,我要提交一个任务嘛。于是,接着上一篇文章的代码,我提交了一个任务:

@Test
public void submitTest() {
    // 创建线程池
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, 
        new LinkedBlockingQueue<Runnable>(), 
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread();
            }
        }, new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println("拒绝服务");
        }
    });
    // 提交任务,该任务为睡眠 1 秒后打印 Hello
    threadPoolExecutor.submit(new Callable<String>() {
        @Override
        public String call() throws InterruptedException {
            Thread.sleep(1000L);
            System.out.println("Hello");
            return null;
        }
    });
}

而我并没有看到任何输出,程序也并没有睡眠一秒,而是马上结束了。哦对,我想起来,我们创建的线程默认是守护线程,当所有用户线程结束之后,程序就会结束了,并不会理会是否还有守护线程在运行。那么我们用一个简单易行的办法来解决这个问题 —— 不让用户线程结束,让它多睡一会:

@Test
public void submitTest() throws InterruptedException {
    // 创建线程池
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, 
        new LinkedBlockingQueue<Runnable>(), 
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread();
            }
        }, new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println("拒绝服务");
        }
    });
    // 提交任务,该任务为睡眠 1 秒后打印 Hello
    threadPoolExecutor.submit(new Callable<String>() {
        @Override
        public String call() throws InterruptedException {
            Thread.sleep(1000L);
            System.out.println("Hello");
            return null;
        }
    });
    // 使主线程休眠 5 秒,防止守护线程意外退出
    Thread.sleep(5000L);
}

然而,程序等待 5 秒之后,依旧没有输出。我的第一个反应是,我对于线程池的用法不对。是不是还需要调用某个方法来“激活”或者“启动”线程池?而无论在文档中,还是各博客的例子中,我都没有找到类似的方法。我们仔细思考一下这个 Bug,产生这样问题的可能原因有三:

  1. ThreadPoolExecutor 内部代码有问题
  2. 我对 ThreadPoolExecutor 的使用方法不对
  3. 我设计的 ThreadFactoryRejectedExecutionHandler 有问题

原因 1,可能性太小,几乎没有。那么原因2、3,我们现在没法排除,于是我尝试构建一个最小可重现错误,将 ThreadPoolExecutor 剥离出来,看 Bug 是否重现:

最小可重现(minimal reproducible)这个思想是我在翻译《使用 Rust 开发一个简单的 Web 应用,第 4 部分 —— CLI 选项解析》时,作者用到的思想。就是在我们无法定位 Bug 时,剥离出当前代码中我们认为无关的部分,剥离后观察 Bug 是否重现,一步步缩小 Bug 的范围。通俗的说,就是排除法。

private class MyThreadFactory implements ThreadFactory{
    @Override
    public Thread newThread(Runnable r) {
        return new Thread();
    }
}

@Test
public void reproducibleTest() throws InterruptedException {
    new MyThreadFactory().newThread(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Hello");
        }
    }).start();
    Thread.sleep(5000L);
}

还是没有任何输出,不过这是一个好消息,这意味着我们定位了问题所在:现在问题只可能出现在 MyThreadFactory 中,短短 6 行代码会有什么问题呢?哎呦(拍大腿),我没有把 Runnable r 传给 new Thread() 啊,我一直在执行一个空线程啊,怎么可能有任何输出!于是:return new Thread(r); 这样一改就好了。

5 重构

上面的问题看似简单,但能出现这么低级的错误,值得我思考。我因为产生该错误的原因有二:

  1. 我不了解 ThreadPoolExecutor 的原理,从语法上看 ThreadFactory 的实现类只需要传出一个 Thread 实例就行了,却不知 Runnable r 不可或缺。
  2. 测试代码结构凌乱不堪。即便是测试代码,也不应该写成面条,自己看尚不能清楚明了,何谈读者?

于是,我决定对测试代码进行重构。这次重构,一要使线程工厂产生非守护线程,防止因为主进程的退出导致线程池中线程全部意外退出;二要对每个操作打日志,我们要能直观的观察到线程池在做什么,值得一提的是,对于阻塞队列的日志操作,我使用了动态代理的方式对每一个方法打日志,不熟悉动态代理的童鞋可以戳我之前写的小豹子带你看源码:JDK 动态代理

// import...

public class ThreadPoolExecutorTest {
    /**
     * 记录启动时间
     */
    private final static long START_TIME = System.currentTimeMillis();

    /**
     * 自定义线程工厂,产生非守护线程,并打印日志
     */
    private class MyThreadFactory implements ThreadFactory {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(false);
            debug("创建线程 - %s", thread.getName());
            return thread;
        }
    }

    /**
     * 自定义拒绝服务异常处理器,打印拒绝服务信息
     */
    private class MyRejectedExecutionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            debug("拒绝请求,Runnable:%s,ThreadPoolExecutor:%s", r, executor);
        }
    }

    /**
     * 自定义任务,休眠 1 秒后打印当前线程名,并返回线程名
     */
    private class MyTask implements Callable<String> {

        @Override
        public String call() throws InterruptedException {
            Thread.sleep(1000L);
            String threadName = Thread.currentThread().getName();
            debug("MyTask - %s", threadName);
            return threadName;
        }
    }

    /**
     * 对 BlockingQueue 的动态代理,实现对 BlockingQueue 的所有方法调用打 Log
     */
    private class PrintInvocationHandler implements InvocationHandler {
        private final BlockingQueue<?> blockingQueue;

        private PrintInvocationHandler(BlockingQueue<?> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            debug("BlockingQueue - %s,参数为:%s", method.getName(), Arrays.toString(args));
            Object result = method.invoke(blockingQueue, args);
            debug("BlockingQueue - %s 执行完毕,返回值为:%s", method.getName(), String.valueOf(result));
            return result;
        }
    }

    /**
     * 产生 BlockingQueue 代理类
     * @param blockingQueue 原 BlockingQueue
     * @param <E> 任意类型
     * @return 动态代理 BlockingQueue,执行任何方法时会打 Log
     */
    @SuppressWarnings("unchecked")
    private <E> BlockingQueue<E> debugQueue(BlockingQueue<E> blockingQueue) {
        return (BlockingQueue<E>) Proxy.newProxyInstance(this.getClass().getClassLoader(),
                new Class<?>[]{BlockingQueue.class},
                new PrintInvocationHandler(blockingQueue));
    }

    /**
     * 实例化一个 核心池为 3,最大池为 5,存活时间为 20s,利用上述阻塞队列、线程工厂、拒绝服务处理器的线程池实例
     * @return 返回 ThreadPoolExecutor 实例
     */
    private ThreadPoolExecutor newTestPoolInstance() {
        return new ThreadPoolExecutor(3, 5, 20,
                TimeUnit.SECONDS, debugQueue(new LinkedBlockingQueue<>()),
                new MyThreadFactory(), new MyRejectedExecutionHandler());
    }

    /**
     * 向控制台打印日志,自动输出时间,线程等信息
     * @param info
     * @param arg
     */
    private void debug(String info, Object... arg) {
        long time = System.currentTimeMillis() - START_TIME;
        System.out.println(String.format(((double) time / 1000) + "-" + Thread.currentThread().getName() + "-" + info, arg));
    }

    /**
     * 测试实例化操作
     */
    private void newInstanceTest() {
        newTestPoolInstance();
    }

    /**
     * 测试提交操作,提交 10 次任务
     */
    private void submitTest() {
        ThreadPoolExecutor threadPool = newTestPoolInstance();
        for (int i = 0; i < 10; i++) {
            threadPool.submit(new MyTask());
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutorTest test = new ThreadPoolExecutorTest();
        test.submitTest();
    }
}

编译,运行 =>

0.047-main-创建线程 - Thread-0
0.064-main-创建线程 - Thread-1
0.064-main-创建线程 - Thread-2
0.064-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@4d7e1886]
0.064-main-BlockingQueue - offer 执行完毕,返回值为:true
0.064-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@3cd1a2f1]
0.065-main-BlockingQueue - offer 执行完毕,返回值为:true
0.065-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@2f0e140b]
0.065-main-BlockingQueue - offer 执行完毕,返回值为:true
0.065-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@7440e464]
0.065-main-BlockingQueue - offer 执行完毕,返回值为:true
0.065-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@49476842]
0.065-main-BlockingQueue - offer 执行完毕,返回值为:true
0.065-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@78308db1]
0.065-main-BlockingQueue - offer 执行完毕,返回值为:true
0.065-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@27c170f0]
0.065-main-BlockingQueue - offer 执行完毕,返回值为:true
1.065-Thread-1-MyTask - Thread-1
1.065-Thread-0-MyTask - Thread-0
1.065-Thread-2-MyTask - Thread-2
1.065-Thread-1-BlockingQueue - take,参数为:null
1.065-Thread-0-BlockingQueue - take,参数为:null
1.065-Thread-2-BlockingQueue - take,参数为:null
1.065-Thread-0-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@3cd1a2f1
1.065-Thread-2-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@2f0e140b
1.065-Thread-1-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@4d7e1886
2.065-Thread-1-MyTask - Thread-1
2.065-Thread-2-MyTask - Thread-2
2.065-Thread-0-MyTask - Thread-0
2.065-Thread-1-BlockingQueue - take,参数为:null
2.065-Thread-2-BlockingQueue - take,参数为:null
2.065-Thread-0-BlockingQueue - take,参数为:null
2.065-Thread-1-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@7440e464
2.065-Thread-2-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@49476842
2.065-Thread-0-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@78308db1
3.066-Thread-1-MyTask - Thread-1
3.066-Thread-2-MyTask - Thread-2
3.066-Thread-0-MyTask - Thread-0
3.066-Thread-2-BlockingQueue - take,参数为:null
3.066-Thread-1-BlockingQueue - take,参数为:null
3.066-Thread-0-BlockingQueue - take,参数为:null
3.066-Thread-2-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@27c170f0
4.067-Thread-2-MyTask - Thread-2
4.067-Thread-2-BlockingQueue - take,参数为:null

日志的格式是:时间(秒)-线程名-信息

从日志输出中,我们可以获知:

  • 当队列为空,线程数少于核心线程数时,提交任务会触发创建线程,并立即执行任务
  • 当核心线程均忙,再提交的请求会被存储至阻塞队列,等待线程空闲后执行队列中的任务
  • 除主线程外,始终只有三个工作线程
  • 当队列为空,工作线程还在运行的时候,工作线程会因为阻塞队列的 take 方法阻塞(这一点由日志后几行可以看出,只有调用日志,没有调用完成的日志)

由此,我产生一个疑问:为什么始终只有三个线程?我的设置不是“核心池为 3,最大池为 5”吗?为什么只有三个线程在工作呢?

6 submit 任务

终于开始看源码了,我们以 submit 为切入点,探寻我们提交任务时,线程池做了什么,submit 方法本身很简单,就是将传入参数封装为 RunnableFuture 实例,然后调用 execute 方法,以下给出 submit 多个重载方法其中之一:

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

那么,我们继续看 execute 的代码:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

我们首先解释一下 addWorker 方法,暂时我们只需要了解几件事情就可以理解 execute 代码了:

  • 该方法用于新建一个工作线程
  • 该方法线程安全
  • 该方法第一个参数是新线程要执行的第一个任务,第二个参数是是否新建核心线程
  • 该方法如果新建线程成功,则返回 true,否则返回 false

那么我们回过头来理解 execute 代码:

为了帮助理解,我根据代码逻辑画了一个流程图:

execute 方法流程图

现在我明白了,只有等待队列插入失败(如达到容量上限等)情况下,才会创建非核心线程来处理任务,也就是说,我们使用的 LinkedBlockingQueue 队列来作为等待队列,那是看不到非核心线程被创建的现象的。

有心的读者可能注意到了,整个过程没有加锁啊,怎样保证并发安全呢?我们观察这段代码,其实没必要全部加锁,只需要保证 addWorkerremoveworkQueue.offer 三个方法的线程安全,该方法就没必要加锁。事实上,在 addWorker 中是有对线程池状态的 recheck 的,如果创建失败会返回 false。

系列文章

小豹子还是一个大三的学生,小豹子希望你能“批判性的”阅读本文,对本文内容中不正确、不妥当之处进行严厉的批评,小豹子感激不尽。