阅读 455

Android 源码分析(三)安卓中的线程

上一节我们一起探索了 Handler 的跨线程通信机制,这里我么趁热打铁,一举拿下 Android 中的线程。

今天我们主要探索 AsyncTask、HandlerThread、IntentService 三个类,这三个类的表现形式和传统线程有一定的区别,但是本质还是基于线程做的处理。

本文将会涉及很多 hadnler 的知识点,建议学习本文的小伙伴先学习上一章 Android 源码分析(二)handler 机制.

线程的概念及基础操作我在 Java 基础中讲了,如果不清楚的同学可以去翻一翻 Java 基础线程篇,我们直接开始讲重点了。

AsyncTask

这个类大家应该都知道吧,没用过但是肯定听说过。

来看看类注释

AsyncTask enables proper and easy use of the UI thread. This class allows yo to perform background operations and publish results on the UI thread without having to manipulate threads and/or handlers

翻译大概告诉我们这个类运行用户在 UI 线程上执行异步任务,而不需要操作线程。

跟我们想象中差不多,反正大家也都知道这个功能,我们先来看一下几个核心方法,在去阅读源码吧。

execute(Params... params)

AsyncTask的执行方法,调用这个方法,这个任务才会被丢进线程池执行,注意,这个方法只能调用一次。

protected void onPreExecute()

这个方法在excuse()方法里面被调用,所以和 AsyncTask 的 excuse调用在一个线程,很多博客都说只能在主线程调用,但是我亲测在子线程也是可以调用的,因为源码里面并没有做线程检测。补充一下,AsyncTask 类加载必须发生在主线程,因为几个类静态成员 handler是需要绑定主线程的 Looper,但是在 Android4.1及以上版本已经被系统完成了。

protected abstract Result doInBackground(Params... params);

这是唯一一个必须实现的抽象方法,这线程池中被调用。这个方法里面可以调用publishProgress 方法来更新任务进度,publishProgress 会通过 handler调用onProgressUpdate。最后这个方法需要将结果返回,

onProgressUpdate(Progress... values)

在主线程中执行,上面说了,这个方法在 doInBackground 里面通过 publishProgress 调用 handler,然后切回主线程回调这个方法。

onPostExecute(Result result)

在主线程中执行,在异步任务执行之后,此方法会被调用,其中 result 参数是doInBackground 的返回值。

so?接下来,就看源码吧,也没什么好看的,我讲一遍核心源码,大家看了我的解析之后,再跟着我的理解,回头读一遍源码就行了。

AsyncTask 源码解析

其实源码真的没什么看点。。。。 那就,先从构造方法看起吧~

有三个构造方法,另外两个最终都会调用这个构造方法

public AsyncTask(@Nullable Looper callbackLooper) {
    mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()
        ? getMainHandler()
        : new Handler(callbackLooper);

    mWorker = new WorkerRunnable<Params, Result>() {
        public Result call() throws Exception {
            mTaskInvoked.set(true);
            Result result = null;
            try {
                Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                //noinspection unchecked
                result = doInBackground(mParams);
                Binder.flushPendingCommands();
            } catch (Throwable tr) {
                mCancelled.set(true);
                throw tr;
            } finally {
                postResult(result);
            }
            return result;
        }
    };

    mFuture = new FutureTask<Result>(mWorker) {
        @Override
        protected void done() {
            try {
                postResultIfNotInvoked(get());
            } catch (InterruptedException e) {
                android.util.Log.w(LOG_TAG, e);
            } catch (ExecutionException e) {
                throw new RuntimeException("An error occurred while executing doInBackground()",
                        e.getCause());
            } catch (CancellationException e) {
                postResultIfNotInvoked(null);
            }
        }
    };
}
复制代码

mmp,有两个构造方法被隐藏了,明明是公共构造方法。。。。。 好吧,那就当只有一个无参的公共构造方法吧,上面这个构造方法也是不能直接调用。 扯远了,来分析这个构造方法吧。这个方法就初始化了三个变量mHandler、mWorker、mFuture。

这里 mHandler 被绑定了主线程的 Looper(如果你要用反射质量的方法强行。。。可以绑定子线程 Looper),然后就是一个 WorkerRunnable 和 FutureTask 对象。这两个类是线程相关的概念,不了解的可以去了解一下多线程篇,反正你记住,FutureTask的 run 方法里面会调用WorkerRunnable的 call 方法,所以 mWorker 的 call 方法会在线程池中被调用,也就是异步线程里面被调用。

接下来,我们就从 execute方法被调用开始吧

public final AsyncTask<Params, Progress, Result> execute(Params... params) {
	return executeOnExecutor(sDefaultExecutor, params);
}
复制代码

这个方法比较简单,直接调用了executeOnExecutor方法,但是这里有个参数sDefaultExecutor 需要注意一下。先来看这个参数的定义:

private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();

private static class SerialExecutor implements Executor {
    final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
    Runnable mActive;

    public synchronized void execute(final Runnable r) {
        mTasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (mActive == null) {
            scheduleNext();
        }
    }

    protected synchronized void scheduleNext() {
        if ((mActive = mTasks.poll()) != null) {
            THREAD_POOL_EXECUTOR.execute(mActive);
        }
    }
}
复制代码

从代码中我们可以看到sDefaultExecutor 是一个自定义的静态线程池,注意,是静态的哦,至于里面的逻辑,我们稍后再讲,先来看 executeOnExecutor 方法吧。

public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
        Params... params) {
    if (mStatus != Status.PENDING) {
        switch (mStatus) {
            case RUNNING:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task is already running.");
            case FINISHED:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task has already been executed "
                        + "(a task can be executed only once)");
        }
    }

    mStatus = Status.RUNNING;

    onPreExecute();

    mWorker.mParams = params;
    exec.execute(mFuture);

    return this;
}
复制代码

第一步:这个方法做了状态监测,一旦执行了这个方法 mStatus 状态就不再是 PENDING 了,所以当多次调用 execute 方法多次调用就会异常。 第二步:这个方法调用了onPreExecute 方法,注意,这里没有做任何线程切换,所以onPreExecute 方法回调和 execute 方法是在同一个线程。 第三步:把 params 赋值给mWorker.mParams,这个没什么好说的,大家记得mFuture的 run 方法会调用mWorker的 call 方法即可。 第四步:把构造方法里面的 mFuture 丢进了线程池里面,这个线程池就是刚刚我们说的sDefaultExecutor,现在,我们来看sDefaultExecutor线程池的代码吧。

private static class SerialExecutor implements Executor {
    final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
    Runnable mActive;

    public synchronized void execute(final Runnable r) {
        mTasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (mActive == null) {
            scheduleNext();
        }
    }

    protected synchronized void scheduleNext() {
        if ((mActive = mTasks.poll()) != null) {
            THREAD_POOL_EXECUTOR.execute(mActive);
        }
    }
}
复制代码

这里大家可以看到,在execute(final Runnable r)方法里面的参数 r 就是mFuture,然后存到队列 mTask 里面去了,这里代码应该不太好理解,我直接根据结果分析原因了。就是为了保证mTasks队列里面的任务只能“串行”执行,也就是不允许并发。又因为这里是静态资源,独立与某个 AsyncTask,而每个 AsyncTask 的 mFuture又被保存在这里面,等待执行。所以,AsyncTask任务实际上是串行执行的哦。

如果需要并发执行 AsyncTask,可以直接调用 executeOnExecutor 方法即可。 以上分析如果有不理解的童鞋需要对着源码理解一遍,源码不难读。

好,我们继续往下看,上面我们说到mFuture 被保存在SerialExecutor 的队列里面,在 scheduleNext 方法里面被消费,那么我们来看看 THREAD_POOL_EXECUTOR 到底是什么鬼~~

public static final Executor THREAD_POOL_EXECUTOR;

static {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
            sPoolWorkQueue, sThreadFactory);
    threadPoolExecutor.allowCoreThreadTimeOut(true);
    THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
复制代码

哦,THREAD_POOL_EXECUTOR 这货也是一个线程池,也就是说这里才是真正执行 AsyncTask 任务的线程池,但是被 sDefaultExecutor 线程池限制了一次只给它一个任务执行。

好,到这里我们一个 AsyncTask 任务已经被丢进线程池消费了,接下来我们我们的 mFuture 被线程池执行会运行 run 方法,run 方法会调用mWorker的 call 方法。我们来看看 mWorker 的 call 方法吧。

mWorker = new WorkerRunnable<Params, Result>() {
        public Result call() throws Exception {
            mTaskInvoked.set(true);
            Result result = null;
            try {
                Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                //noinspection unchecked
                result = doInBackground(mParams);
                Binder.flushPendingCommands();
            } catch (Throwable tr) {
                mCancelled.set(true);
                throw tr;
            } finally {
                postResult(result);
            }
            return result;
        }
    };
复制代码

注意,call 方法是在线程池里面被调用的,此时已经是在异步线程了哦。

首先是第一行一个原子 boolean,这只是一个状态的变化。然后调用Process修改了线程的优先级。 敲黑板!!!!!!注意这行代码result = doInBackground(mParams);,这里就是doInBackground 的回调,在子线程,也是我们唯一需要实现的抽象方法,这个方法里面是做我们异步任务的,然后刚刚我们说了,doInBackground 里面怎么做进度更新,这里就不在赘述了。 然后就是 doInBackground 经过一番耗时操作,得到了结果是怎么被处理的。我们来看postResult方法。

private Result postResult(Result result) {
    @SuppressWarnings("unchecked")
    Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
            new AsyncTaskResult<Result>(this, result));
    message.sendToTarget();
    return result;
}
复制代码

可能这个方法读起来不是那么顺畅,因为这里发生信息竟然是 message.sendToTarget(),跟我们 handler 发生信息的方式不一样,其实不用纠结,就等同于 getHandler.sendMessag(message)。getHandler()获得的是谁,其实就是我们在构造方法里面讲过的那个 handler。然后我们来看看这个 handler 处理逻辑吧。

private static class InternalHandler extends Handler {
    public InternalHandler(Looper looper) {
        super(looper);
    }

    @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
    @Override
    public void handleMessage(Message msg) {
        AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
        switch (msg.what) {
            case MESSAGE_POST_RESULT:
                // There is only one result
                result.mTask.finish(result.mData[0]);
                break;
            case MESSAGE_POST_PROGRESS:
                result.mTask.onProgressUpdate(result.mData);
                break;
        }
    }
}
复制代码

刚刚我们发生的 message.what是MESSAGE_POST_RESULT,所以根据 case,这里面调用的是 AsyncTask 的finish方法。

private void finish(Result result) {
    if (isCancelled()) {
        onCancelled(result);
    } else {
        onPostExecute(result);
    }
    mStatus = Status.FINISHED;
}
复制代码

这里面的逻辑就很简单了。

至此,一个 AsyncTask 任务执行结束,AsyncTask 源码分析结束。小伙伴们记得根据我的逻辑去理解一遍源码哦,自己理解了一遍就能巩固了。

没有总结了,所有的东西都在源码分析中提到了。

HandlerThread

这是一个很简单的类,加上各种注释也才160行代码。

HandlerThread继承了 Thread,它是一种可以用 Handler 的 Thread,实现也很简单,就是在 run 方法里面开启 Looper 循环,然后通过 Handler往循环里面丢任务,这样就能异步执行了。

@Override
public void run() {
    mTid = Process.myTid();
    Looper.prepare();
    synchronized (this) {
        mLooper = Looper.myLooper();
        notifyAll();
    }
    Process.setThreadPriority(mPriority);
    onLooperPrepared();
    Looper.loop();
    mTid = -1;
}
复制代码

呐,大家瞅一瞅这个 run 方法把,相信读过我前面文章的小伙伴理解这个类应该不难吧。

需要注意的是,在不需要 HandlerThread 执行任务的时候,需要调用 quit 或者 quitSafely 方法来结束循环哦。

IntentService

刚刚我们讲了 HandlerThread,HandlerThread 在 Android 中的一个具体使用场景就是 IntentService。

突然想起以前为了应付面试被的一个知识点。

Q:什么是 IntentService?

A:就是一个干完活可以自己死去的 Service。

其实并没有用过 IntentService,只知道继承自 Service,然后背了这个标准答案。确实这句话是对 IntentService 的一个高度概括,但是如果只背了这句话,未免有点囫囵吞枣,今天我们来全面了解IntentService。

IntentService是一个抽象类,加上50行的类注释也不过才150行代码,同学们学习这个不要有心理压力,这只是一个小知识点。

IntentService 可用于执行后台耗时的任务,当任务执行后它会自动停止,同时由于 IntentService 是服务的原因,这导致它的优先级别比单纯的线程要高很多,不容易被杀死,所以 IntentService 比较适合执行一些高优先级别的后台任务。在实现上,IntentService 封装了 HandlerThread 和 Handler。

我们先来看 oncreate 方法把

@Override
public void onCreate() {
    // TODO: It would be nice to have an option to hold a partial wakelock
    // during processing, and to have a static startService(Context, Intent)
    // method that would launch the service & hand off a wakelock.

    super.onCreate();
    HandlerThread thread = new HandlerThread("IntentService[" + mName + "]");
    thread.start();

    mServiceLooper = thread.getLooper();
    mServiceHandler = new ServiceHandler(mServiceLooper);
}
复制代码

当 IntentService 第一次启动时,它的 onCreate 方法会被调用,然后初始化HandlerThread和 Handler。

然后看 onStart 方法

@Override
public void onStart(@Nullable Intent intent, int startId) {
    Message msg = mServiceHandler.obtainMessage();
    msg.arg1 = startId;
    msg.obj = intent;
    mServiceHandler.sendMessage(msg);
}
复制代码

直接把 intent 保存到message 里面去了,然后发送到通过 handler 发送到 HandlerThread 里面去执行了。 执行结束会回调 handler 的 handleMessage 方法,我们来看看这个方法。

private final class ServiceHandler extends Handler {
    public ServiceHandler(Looper looper) {
        super(looper);
    }

    @Override
    public void handleMessage(Message msg) {
        onHandleIntent((Intent)msg.obj);
        stopSelf(msg.arg1);
    }
}
复制代码

直接调用抽象方法 onHandleIntent 来处理异步方法,注意这里onHandleIntent 是在子线程中被调用的哦。

然后会调用stopSelf(msg.arg1)方法来尝试关闭服务,这是 Service 的方法,这个方法并不会立即关闭服务,而是会等待所有消息都处理完毕之后才会终止服务,具体原因这里就不做分析了,以后 Service 篇会讲的。

好了,分析完了。

线程池

之前讲 concurrent 包简单提了一下线程池,这里我们来补充一点线程池基础吧。

使用线程池的优点

  • 重用线程池中的线程,避免创建线程带来的开销
  • 能有效的控制线程池的最大并发数,避免大量线程之间相互抢资源导致阻塞
  • 能简单管理线程,并提供定时执行以及指定时间间隔执行等功能

ThreadPoolExecutor 的几个核心参数

  • corePoolSize:线程的核心线程数
  • maximumPoolSize:线程池所能容纳的最大线程数,当活动线程达到这个数值后,后续的新任务将会被阻塞
  • keepAliveTime:非核心线程的超时时长,超过就会被回收,当 allowCoreThreadTimeout 属性设置为 true 时,keepAliveTime 同样会作用与核心线程
  • unit:keepAliveTime的时间单位
  • workQueue:任务队列
  • threadFactory:线程工厂,用来创建新线程

ThreadPoolExecutor 执行大致规则:

  • 如果线程池中的线程数量未达到核心线程数量,那么会直接启动一个核心线程来执行任务
  • 如果线程池中的线程数量已经达到核心线程数量或者超过,那么任务会被插入到任务队列中排队等待执行
  • 如果上一步中无法将任务插入到任务队列中,是因为任务队列满了,这时如果线程数量未达到线程池最大值,那么会立即启动一个非核心线程来执行任务
  • 如果上一步线程数量达到线程池规定的最大值,那么就拒接执行此任务,会调用 RejeceedExecutionHandler 的 rejectedExecution 方法来通知调用者。

Executors

Executors能直接创建最常见的几种具有不同功能的线程池。

  • Executors.newFixedThreadPool(int nThreads) 这是一种线程数量固定的线程池,当线程处于空闲时也不会被回收,除非线程池被关闭。

  • Executors.newCachedThreadPool(ThreadFactor factor) 这是一个0核心线程、非核心线程可以达到无限大且超时时长为60秒的线程池。说白了就是可以无限任务同时执行的线程,适合大量耗时段的任务需求。

  • Executors.newScheduledThreadPool(int corePoolSize) 核心线程固定,非核心线程无限多且非核心线程用完马上被回收。

  • Executors.newSingleThreadExecutor() 只有一个核心线程,保证所有任务串行执行。好像 AsyncTask 里面的 SerialExecutor 就可以用这个替换掉。

以上的种种,都可以通过配置 ThreadPoolExecutor实现。

Android 线程篇就到这里吧。

参考资料

《Android 开发艺术探索》

关注下面的标签,发现更多相似文章
评论