阅读 787

Java多线程-五种线程池分析以及AnsyncTask源码分析

这是小编个人的学习总结,还望各位大神多多吐槽~

一.前言

线程是我们处理耗时操作的利器但是每个线程的创建和销毁都需要一定的开销。只是偶尔的使用线程去做一些操作的时候还是可以直接new的,如果需要大量的线程去做一些操作的时候我们就要慎重考虑了,比如某个软件下载APP这样的。这里呢Java 1.5中提供了Executor框架用于把任务的提交和执行解耦,任务的提交交给了Runable或者Callable,而Executor框架用来处理任务。ThreadPollExecutor则是Executor框架的核心实现类。下面我们就来看下这个类。

(1)ThreadPollExecutor

我们可以直接用该类去创建一个线程池,我们先来看一下ThreadPollExecutor类最多的一个参数列表,读完你就知道怎么使用了。

  public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable>                    workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {复制代码
  • corePoolSize: 核心线程数。默认情况下线程池是空的,只有有任务提交的时候才会去创建线程。当运行的线程少于corePoolSize的时候,新建任务将创建核心线程,如果的等于或者大于corePoolSize的时候将不会创建核心线程,创建普通线程。可以调用prestartAllcoreThread的方法来提前创建并启动所有的核心线程。

  • maximumPoolSize:线程池所允许的最大的线程数量。当任务队列满了的话线程数小于maximumPoolSize的值的话,新建任务还是会创建新的线程的。

  • keepAliveTime : 非核心线程闲置的超时时间,第一是非核心的线程第二是闲置状态。调用allowCoreThreadTimeOut的时候该设置也会作用再核心的线程上面。

  • TimeUnit : 超时的时间单位。DAYS(天),HOURS(小时),MINUTES(f分钟),SECONDS(秒),MILLISECONDS(毫秒).

  • workQueue : 任务队列(阻塞队列)。当前线程书大于corePoolSize的大小的时候,会将任务加入阻塞队列里面。该队列是BlockingQueue类型的。

  • ThreadFactory : 线程工场。我们可以自己去定义。

  • RejectedExecutionHandler : 饱和策略。这是当任务队列和线程数都满了的时候所采取的的对应策略默认是AbordPolicy表示无法处理新的任务,并抛出RejectedExecutionException异常。

    1.CallerRunsPolicy : 用调用者所在的线程来处理任务。

    2.DisCardPolicy : 不能执行任务并将任务删除。

    2.DisCardOldesPolicy : 丢弃队列最近的任务,并执行当前的任务, 会一直执行下去。

(2)FixedThreadPool

FixedThreadPool是可重的固定线程数的线程池。

//创建FixedThreadPool
 Executors.newFixedThreadPool(5);

//创建所需要的参数
  public static ExecutorService newFixedThreadPool(int nThreads) {
 //最后调用的都是ThreadPoolExecutor
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }复制代码

我们只需要传如需要创建的线程的数量,也就是线程池的大小。我们可以看到构造方法,传入的线程的数量就是核心线程的数量。也就是FixedThreadPool会创建固定数量核心线程的线程池,并且这些核心线程不会被回收,任务超过线程的数量将存入队列中。

(2)CachedThreadPool

该线程池是根据需要去创建线程的。

//创建缓存线程池
Executors.newCachedThreadPool();
//构造方法
  public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }复制代码

我们可以看出,这里面的核心线程数是0,而线程的最大值是Integer.Max_VALUE。闲置超时间是一分钟。默认队列可以保证任务顺序的执行。CacheThreadPool适合大量的需要立即处理并且耗时较少的任务。

(4)SingleThreadExecutor

该类是使用单个线程的线程池。

//创建方法
  Executors.newSingleThreadExecutor();
  //构造方法
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }复制代码

corePoolSize和maximumPoolSize都是1,意味着SingleThreadExecutor只有一个核心线程。其他的参数和FixedThreadPool一样。如果已经创建了一个线程再来一个任务的时候会将该任务加入到任务队列里面,确保了所有任务的执行顺序。

(5)ScheduledThreadPool

ScheduledThreadPool是一个能实现定是和周期性任务的线程池。

//创建
 Executors.newScheduledThreadPool(5);
//构造方法
     public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }
//最后还是调用的
ThreadPoolExecutor的构造方法。复制代码

当执行任务的时候,会先将任务包装成ScheduledFutrueTask并添加到DelayedWorkQueue里面,当没超过corePoolSize的时候,会创建线程,人后去DelayedWorkQueue队列里面去拿任务,并不是立即的执行。当执行完任务的时候会将ScheduledFutrueTask中的time变量改为下次要执行的时间并放回DelayedWorkQueue中。

二.AsyncTask的源码分析

有了之前的铺垫我们就可以去看下AsyncTask的源码了。我相信再很早的时候大家都会接触到AsyncTask加HttpUrlConnection的组合去下载网络数据。我们先来看下构造:

public abstract class AsyncTask<Params, Progress, Result> {
}复制代码

AsyncTask是一个抽象的泛型类,他有三个泛型,第一个是要传入的参数的类型Params,第二个是进度会掉的参数类型Progress,第三个是返回值类型额参数Result。AsyncTask有四个核心的方法。

  • onPreExecute() : 再任务启动前调用,最先调用。

  • doInbackground(Parsms..) : 在线程池中执行。再onPreExecute执行后执行,该方法运行再子线程中。再该方法中去处理耗时的操作。

  • onProgressUpdate(Progress...) : 再主线程中执行。当调用publishProgress(Progress...)的时候,此方法会将进度更新到UI上。

  • onPostExecute(Result result): 再主线程中。接受doInbackground方法返回的值。

AsyncTask再3.0之前和之后的改动比较大。我们直接来看3.0之后的版本。先来看下构造方法:

    public AsyncTask() {
    //1
        mWorker = new WorkerRunnable<Params, Result>() {
            public Result call() throws Exception {
                mTaskInvoked.set(true);
                Result result = null;
                try {
                    Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                    //noinspection unchecked
                    result = doInBackground(mParams);
                    Binder.flushPendingCommands();
                } catch (Throwable tr) {
                    mCancelled.set(true);
                    throw tr;
                } finally {
                    postResult(result);
                }
                return result;
            }
        };
//2
        mFuture = new FutureTask<Result>(mWorker) {
            @Override
            protected void done() {
                try {
                    postResultIfNotInvoked(get());
                } catch (InterruptedException e) {
                    android.util.Log.w(LOG_TAG, e);
                } catch (ExecutionException e) {
                    throw new RuntimeException("An error occurred while executing doInBackground()",
                            e.getCause());
                } catch (CancellationException e) {
                    postResultIfNotInvoked(null);
                }
            }
        };
    }复制代码

从注释1我们可以看到WorkerRunable实现了Callable接口,并实现了Call方法再Call方法里面调用了doInBackground方法来处理任务并得到结果,然后通通过postResult方法将结果传递出去。这就解释了为什么doInBackground的方法是在子线程中,但是postResult也在call里面啊,别急慢慢来。注释2这里是一个FutureTask的可管理的异步任务,它实现了Runable和Future接口。因此可以包装成Runable和Callable,提供给Executor执行。也可以直接调用FutureTask.run()方法。这里new FutureTask(mWorker) 将WorkerRunnable通过参数保存到自己的方法内存中,在执行的时候会用到。当调用了exeture()方法时:

  public final AsyncTask<Params, Progress, Result> execute(Params... params) {
        return executeOnExecutor(sDefaultExecutor, params);
    }

//全局变量定义
    private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;

//调用到
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
            Params... params) {
        if (mStatus != Status.PENDING) {
            switch (mStatus) {
                case RUNNING:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task is already running.");
                case FINISHED:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task has already been executed "
                            + "(a task can be executed only once)");
            }
        }

        mStatus = Status.RUNNING;

        onPreExecute();

        mWorker.mParams = params;
        exec.execute(mFuture);

        return this;
    }复制代码

这里呢就会看到首先调用的是onPreExecute的方法了。当exec.execute(mFuture);的时候FutureTask作为一个参数传进了sDefaultExecutor的execute方法里面。sDefaultExecutor是一个串行的SerialExecutor;

    private static class SerialExecutor implements Executor {
        final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
        Runnable mActive;

        public synchronized void execute(final Runnable r) {
            mTasks.offer(new Runnable() {
                public void run() {
                    try {
                        r.run();
                    } finally {
                        scheduleNext();
                    }
                }
            });
            if (mActive == null) {
                scheduleNext();
            }
        }

        protected synchronized void scheduleNext() {
            if ((mActive = mTasks.poll()) != null) {
                THREAD_POOL_EXECUTOR.execute(mActive);
            }
        }
    }复制代码

我们可以看出当exec.execute(mFuture);执行的时候,会将FutureTask加入到mTaks中。无论任务执行如何都将调用schedleNext方法,它会从mTasks中取出FutureTask任务并交给THREAD_POOL_EXECUTOR处理。然后再FutrueTask的run方法执行的时候也活调用传入的WorkerRunable的call方法

//FutureTask的run方法。
 public void run() {
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                //这里调用
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        }
        ........
   }复制代码

然后将结果传递出去:

private Result postResult(Result result) {
        @SuppressWarnings("unchecked")
        Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
                new AsyncTaskResult<Result>(this, result));
        message.sendToTarget();
        return result;
    }复制代码

这里会创建一个Message,将消息发送到handler.这里就解释了为什么postResult里面是主线程了。我们看一下getHandler()方法。

   private static Handler getHandler() {
        synchronized (AsyncTask.class) {
            if (sHandler == null) {
                sHandler = new InternalHandler();
            }
            return sHandler;
        }
    }复制代码

这个InternalHandler是继承与Handler 的类,我们都很清除handler,这里就不再详细的描述了,接着我们看下全局变量都是什么意思:

   //获取CPU的数量,我们可以借鉴哦
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
   //这里使用的还是ThradPoolExecutor,这里是核心线程的数量。
    private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
    //这是最大线程的数量,跟cpu的数量有关。
    private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
    //闲置线程的存活时间,我记得之前是60s。
    private static final int KEEP_ALIVE_SECONDS = 30;
    //自己定义的ThreadFactory我们可以借鉴。
    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);

        public Thread newThread(Runnable r) {
            return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
        }
    };
    //定义的任务队列,这里是LinkedBlockingQueue,详情可以百度一下阻塞队列。大小是128.
    private static final BlockingQueue<Runnable> sPoolWorkQueue =
            new LinkedBlockingQueue<Runnable>(128);
   //AsyncTask.execute()最终调用的是THREAD_POOL_EXECUTOR的execute()方法。我们可以看到它是ThreadPoolExecutor 。
    public static final Executor THREAD_POOL_EXECUTOR;

    static {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                sPoolWorkQueue, sThreadFactory);
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        THREAD_POOL_EXECUTOR = threadPoolExecutor;
    }

    public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
    //Message发送消息的标记位。
    private static final int MESSAGE_POST_RESULT = 0x1;
     //Message发送消息的标记位。
    private static final int MESSAGE_POST_PROGRESS = 0x2;

    private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
    //自定义的Handler
    private static InternalHandler sHandler;

    private final WorkerRunnable<Params, Result> mWorker;
    private final FutureTask<Result> mFuture;

    private volatile Status mStatus = Status.PENDING;

    private final AtomicBoolean mCancelled = new AtomicBoolean();
    private final AtomicBoolean mTaskInvoked = new AtomicBoolean();复制代码

这里呢基本上我们就了解AsyncTask 的内部构造了。Android 3.0 以上使用的话,SerialExecutor作为默认的线程,它将任务串行的处理。我们想要并行的处理的话需要这样:

asynctask。executeOnExecutor(自定义线程,"");复制代码

三.总结

我相信线程池构造方法知道后我们就可以去使用了,使用起来也是很简单的这里就不在叙述了,不懂的话大家可以评论,我会实时回复的啊~

欢迎大家关注,点个赞吧~:
我的CSDN
我的掘金
我的简书

关注下面的标签,发现更多相似文章
评论