阅读 106

【并发编程】JUC组件扩展(Callable、FutureTask、Fork/Join 框架、BlockingQueue)

前言

之前我们已经学习了创建线程的2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口。

这2种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。

如果要获得返回值就必须通过共享变量或者线程间通信的方式,实现起来较复杂。

因此在Java5开始提供了CallableFuture,通过它们可以在任务执行完毕之后得到任务执行结果。

Callable

首先我们来看一下Runnable的源码

@FunctionalInterface
public interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();
}
复制代码

可以看到run方法的返回值为void

下面来看一下Callable的源码

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}
复制代码

可以看到,这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。

Callable的使用会在后面的代码演示中给出。

Future

Future类位于java.util.concurrent包下,它是一个接口.

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
复制代码

Future接口中声明了5个方法,下面依次解释每个方法的作用:

  • cancel方法:用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果任务已经完成,则无论mayInterruptIfRunningtrue还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true是什么,肯定返回true

  • isCancelled方法:表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true

  • isDone方法:表示任务是否已经完成,若任务完成,则返回true

  • get()方法:用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;

  • get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。   

也就是说Future提供了三种功能:

  • 1.判断任务是否完成;

  • 2.能够中断任务;

  • 3.能够获取任务执行结果。

Future的使用

@Slf4j
public class FutureExample {

    static class MyCallable implements Callable<String> {

        @Override
        public String call() throws Exception {
            log.info("do sth in callable");
            Thread.sleep(5000);
            return "done";
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService executorService = Executors.newCachedThreadPool();
        Future<String> future = executorService.submit(new MyCallable());
        log.info("do sth in main");
        Thread.sleep(1000);
        String result = future.get();
        log.info("result: {}", result);
    }
}
复制代码

运行结果

FutureTask

FutureTask位于JUC包内但不是AQS的子类。

FutureTask实现了FutureRunnable接口,可以获取线程的返回值。

FutureTask的使用

@Slf4j
public class FutureTaskExample {


    public static void main(String[] args) throws InterruptedException, ExecutionException {

        FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                log.info("do sth. in callable");
                Thread.sleep(5000);
                return "Done";
            }
        });

        new Thread(futureTask).start();
        log.info("do sth. in main");
        Thread.sleep(1000);
        String result = futureTask.get();
        log.info("result: {}", result);
    }
}
复制代码

运行结果与上面相同。

从使用例子中可以看出FutureTask是非常方便的,什么时候想用就什么时候启动线程就可以了。

FutureTask还可以传入多种参数类型,我们进入它的源码看一下

/**
 * Creates a {@code FutureTask} that will, upon running, execute the
 * given {@code Callable}.
 *
 * @param  callable the callable task
 * @throws NullPointerException if the callable is null
 */
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

/**
 * Creates a {@code FutureTask} that will, upon running, execute the
 * given {@code Runnable}, and arrange that {@code get} will return the
 * given result on successful completion.
 *
 * @param runnable the runnable task
 * @param result the result to return on successful completion. If
 * you don't need a particular result, consider using
 * constructions of the form:
 * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
 * @throws NullPointerException if the runnable is null
 */
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
复制代码

可以看到FutureTask可以传入CallableRunnable,传入Runnable时还可以指定返回值类型。

Fork/Join 框架

Fork/Join 框架Java7提供的用于并行执行任务的框架,通过把大任务分成多个小任务最终汇总每个小任务结果来得到最终结果。

它主要采用的是工作窃取算法,这个算法是指某个线程从其他队列里窃取任务来执行,过程如下图

线程的任务采用了双端队列,窃取任务时只能从尾部窃取。这个算法的优点就是可以充分利用线程进行并行计算,并减少了线程间的竞争。缺点是在极端情况下还是存在竞争,如队列中只有一个任务时。

Fork/Join框架有一定的局限性:

  • 任务只能使用fork和join操作作为同步机制,如果使用了其他同步机制,工作线程执行时就不能执行其他任务了。
  • 任务不应该执行IO操作
  • 任务不能抛出检查异常,必须通过必要的代码来处理它们。

Coding演示

@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {

    public static final int threshold = 2;
    private int start;
    private int end;

    public ForkJoinTaskExample (int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;

        //如果任务足够小就计算任务
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            //如果任务大于阈值,就分裂成两个子任务计算
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);

            //执行任务
            leftTask.fork();
            rightTask.fork();

            //任务结束后合并结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            //合并子任务
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);

        Future<Integer> result = forkJoinPool.submit(task);

        try{
            log.info("result:{}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        }
    }
}
复制代码

重写的compute方法就是递归调用自身不断将大任务拆分成小任务,最后汇总结果得到最终结果的。

BlockingQueue

BlockingQueue即阻塞队列。

当阻塞队列满时进行入队操作和当阻塞队列空时进行出队操作都会使线程进入阻塞状态。

主要用于生产者消费者场景。

ArrayBlockingQueue

大小固定,内部实现是一个数组,在初始化时需要指定容量且不能改变。

先进先出的方式存储数据,最新数据在尾部。

DelayQueue

阻塞的是内部元素。DelayQueue中的元素必须实现JUC里的Delay接口(Delay接口继承了Comparable接口),一般都以元素过期时间的优先级进行排序。

内部实现是优先队列和lock。

LinkedBlockingQueue

大小配置可选,初始化时指定大小就是有边界的,若不指定就是无边界的。

内部实现是链表,其他特点与ArrayBlockingQueue一致。

PriorityBlockingQueue

没有边界,有排序规则,允许插入null。

所有插入PriorityBlockingQueue的对象必须实现Comparable接口。

SynchronousQueue

同步队列

内部仅允许容纳一个元素,内部插入一个元素后就会被阻塞,除非这个元素被其他线程消费。

Written by Autu

2019.7.21

关注下面的标签,发现更多相似文章
评论