Java并发实战(六) 任务执行

1,834 阅读5分钟

1.任务

把程序抽象成多个任务。

2.现代web程序划分任务边界

以独立的客户请求为边界。就是一个请求一个任务。

3.任务调度策略

3.1串行的执行

糟糕的响应性和吞吐量。

3.2为每一个任务创建一个线程

结论:

  • 任务交由子线程处理,提高了响应性和吞吐量。
  • 任务处理的代码必须是线程安全的。

不足:

  • 线程生命周期的开销非常高。
  • 资源消耗。可运行线程多于可用处理器的数量,会有线程闲置占用内存,且大量线程竞争CPU时将产生其他性能开销。
  • 稳定性。不同平台可创建线程的数量有限制。

4.Java中Executor框架的设计

4.1设计理念

Java提供了Executor框架来执行任务。基于生产者-消费者模式。提交任务就是操作相当于生产者,执行任务的线程相当于消费者。(解耦,削峰)

    public interface Executor {
        void execute(Runnable command);
    }

4.2执行策略

任务的提交代码散布在整个程序的业务代码中。
执行策略则统一交由框架处理。

执行策略中定义了任务执行的"What,Where,When,How"等方面,包括:

  • 在什么(What)线程中执行任务?
  • 任务按照什么(What)顺序执行(优先级)?
  • 有多少个(How Many)任务能并发执行?
  • 在队列中有多少个(How Many)任务在等待执行?
  • 系统该怎么(How)拒绝任务?
  • 在任务执行前后,应该进行哪些(What)动作?

通过将任务提交与任务的执行策略分离,有助于在部署阶段选择与可用硬件资源最匹配的执行策略。

4.3线程池

Executor任务执行框架将"为每一个任务分配一个线程"策略编程基于线程池的策略。
类库提供了一个灵活的线程池及一些有用的默认配置。如newFixedThreadpool。

  • Web服务器不会再高负载情况下失败。
  • 但是任务到达的速度总是超过任务执行的速度,服务器仍有可能耗尽内存。

4.4Executor的生命周期

Executor扩展了ExecutorService接口,添加了一些用于生命周期管理的方法。

    public interface ExecutorService extends Executor {
        /**
         * 平缓的关闭过程:不再接受新任务,等待已经提交的任务执行完成。
         */
        void shutdown();
        
        /**
         * 粗暴的关闭过程:它将尝试取消所有运行中的任务,不在启动队列中尚未开始执行的任务。
         */
        list<Runnable> shutdownNow();
        
        boolean isShutdown();
        boolean isTerminated();
        boolean awaitTermination(long timeout, TimeUnit unit);
    }

4.5延迟任务和周期任务

JAVA中提供Timer来管理定时任务。

  • Timer执行定时任务只会创建一个线程。
  • Timer是基于绝对时间的调度机制,对系统时间敏感。
  • Timer存在线程泄露问题(Timer不捕获异常,当抛出一个未检查异常时线程将终止)。

ScheduledThreadPoolExecutor更优质的管理定时任务。

  • 其内部是一个线程池。
  • 其很好的解决了Timer的线程泄露问题。

不适用于分布式环境。

5.找出可利用的并行性

本章提供一些示例来发掘在一个请求中的并行性。

5.1 示例:串行的页面渲染器

假设页面 = 文本标签 + 图片
如下代码串行的执行渲染。

    public class SingleThreadRenderer {
        void renderPage(CharSequence source) {
            renderText(source);
            List<ImageData> imageData = new ArrayList<ImageData>();
            for (ImageInfo imageInfo : scanForImageInfo(source))
                imageData.add(imageInfo.downloadImage());
            for (ImageData data : imageData)
                renderImage(data);
        }
    }

5.2携带结果的任务Callable与Future

Runnable作为基本的任务表现形式。缺陷:1.无返回值。2.不能抛出一个受检查异常。

  • Callable接口

它是任务更好的抽象,描述了一个任务的返回值和异常。

    public interface Callable<V> {
        V call() throws Exception;
    }
  • Future接口

它表示一个任务的生命周期,并提供了相应的方法来判断任务是否已经完成或取消。

    public interface Future<V> {
        boolean cancel(boolean mayInterruptIfRunning);
        boolean isCancelled();
        boolean isDone();
        V get() throws Exception;
        V get(long timeout, TimeUnit unit);
    }

5.3示例:使用Future实现页面渲染器

将渲染过程分解成两个任务,一个是渲染所有的文本,一个是下载所有图像。

    代码略。

渲染文本和渲染图片并发执行。

5.4在异构任务并行化中存在的局限

上例中一般渲染文本的速度远远高于渲染图片的速度,程序最终和串行执行效率差别不大,代码确变得更复杂了。

只有大量相互独立且同构的任务可以并发进行处理时,才能体现出性能的提升。

5.5CompletionService:Executor与BlockingQueue

提交一组任务,简单的写法。

    @Test
    public void test() throws Exception{
        ExecutorService executor = Executors.newFixedThreadPool(5);
        List<Future<String>> futures = new ArrayList();
        for (int i=0; i<5; i++){
            final int param = i;
            Future<String> future = executor.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    Thread.sleep(param * 1000);
                    return "result" + param;
                }
            });
            futures.add(future);
        }

        for (int i=4; i>0; i--) {
            System.out.println(futures.get(i).get());
        }
    }

CompletionService将Executor和BlockingQueue的功能融合。你可以将Callable任务提交给它执行,然后使用类似队列操作的take和poll方法来获得已完成的结果。

5.6示例:使用CompletionService实现页面渲染器

书上的示例:略。

    @Test
    public void test() throws Exception{
        ExecutorService executor = Executors.newFixedThreadPool(5);
        CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
        for (int i=4; i>0; i--){
            final int param = i;
            completionService.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    Thread.sleep(param * 1000);
                    return "result" + param;
                }
            });
        }
        for (int i=0; i<4; i++) {
            System.out.println(completionService.take().get());
        }
    }
    输出:
        result1
        result2
        result3
        result4

5.7为任务设置时限

为单个任务设置时间。

    @Test
    public void singleTaskTest(){
        ExecutorService executor = Executors.newFixedThreadPool(5);
        Future<String> future = executor.submit(new Callable<String>() {
            @Override
            public String call() {
                try {
                    Thread.sleep(2000L);
                }catch (Exception e){
                    e.printStackTrace();
                }
                System.out.println("任务执行完毕...");
                return "singleTask.";
            }
        });
        try {
            System.out.println(future.get(1, TimeUnit.SECONDS));
        }catch (TimeoutException e){
            System.out.println("任务超时...");
            future.cancel(true); // 这句话的是否注销影响运行情况,原理未知?
        }catch (InterruptedException e){
            e.printStackTrace();
        }catch (ExecutionException e){
            e.printStackTrace();
        }
    }

5.8示例:陆行预定门户网站

未多个任务设置超时时间。

6.总结

本章主要是介绍了Java的Executor框架的优点和一些常见需求。
还有对任务的划分粒度,要根据业务场景分析任务边界。