在分析源码之前,首先引用一段源码中的注释说明AsyncTask的定义:
AsyncTask能够正确,方便的使用UI线程。AsyncTask可以在不用使用Threads 和 handlers的情况下,操作后台线程并且将结果发送给UI线程。 AsyncTask是为Thread和Handler而设计的帮助类,并且它应当使用在一些耗时较短的操作上,如果需要让线程保持长时间的运行,强烈的推荐使用 java.util.concurrent包中的如Excutor、ThreadPoolExecutor、FutureTask。
在分析源码之前,先看一段简单的使用例子,方便理解:
private class DownloadFilesTask extends AsyncTask(URL, Integer, Long) {
protected Long doInBackground(URL... urls) {
int count = urls.length;
long totalSize = 0;
for (int i = 0; i < count; i++) {
totalSize += Downloader.downloadFile(urls[i]);
//将进度返回给主线程,用于更新进度。
publishProgress((int) ((i / (float) count) * 100));
// Escape early if cancel() is called
if (isCancelled()) break;
}
return totalSize;
}
protected void onProgressUpdate(Integer... progress) {
setProgressPercent(progress[0]);
}
protected void onPostExecute(Long result) {
showDialog("Downloaded " + result + " bytes");
}
}
//调用
new DownloadFilesTask(url, 0, 1L).execute();
通过上面的例子我们可以看出AsyncTask的基本用法:就是用一个类继承AsyncTask,然后分别实现三个方法: doInBackground()
、onProgressUpdate()
、onPostExecute()
。使用的时候,只需要调用execute()
方法即可。那么下面我们来看一下AsyncTask的源码是怎么实现的:
public abstract class AsyncTask<Params, Progress, Result>
AsyncTask是一个抽象类,并且包含了三个泛型参数,它们的含义我们从名字应该能看得出来分别表示后台任务的参数、进度、以及结果。 分别对应上面例子中的URL、Integer、Long。
接下来我们进入它的执行方法execute():
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}
可以看到,execute()调用了execureOnExecutor()这个方法 (关于execute()有一点需要说明的是,AsyncTask最初是使用串行的方式处理多任务,但是从Android 1.6开始,将其改为并行的,但是为了避免并行带来的错误,自从Android 3.2 之后又改回了串行模式 ) ,那么下面看看execureOnExecutor()的源代码:
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING;
onPreExecute();
mWorker.mParams = params;
exec.execute(mFuture);
return this;
}
从上面代码中可以看出,首先会判断当前AsyncTask的状态是否为等待状态,如果不是则报错,是等待状态将会执行onPreExecute(),也就是我们自定义的进行后台任务前的方法。然后会调用exec的execute()方法,那么这个exec是什么呢? 从execute()方法的参数可以看出,exec是一个叫做sDefaultExecutor的变量,那么我们去看看这个变量是什么:
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
原来是SerialExecutor这个类的一个对象,再继续看看SerialExecutor这个类的定义:
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
//如果当前没有正在运行的任务
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
//如果任务队列不为空
if ((mActive = mTasks.poll()) != null) {
//从任务队列中取到任务并调用线程池执行
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
可以看出,在SerialExecutor内部,实现了一个task任务的队列(ArrayDeque),并且从这里的处理,我们也能看出来之前在execute()方法解释的时候说任务是串行的原理了(注释有说明),而实现任务的串行执行的正是THREAD_POOL_EXECUTOR,从名字就可以看出来是一个线程池的定义,我们先来看看对于这个线程池的定义:
//线程池核心线程数
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
//线程池最大线程数
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
//非核心线程存活时间
private static final int KEEP_ALIVE_SECONDS = 30;
//线程工厂类
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
//大小为128的阻塞队列
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
/**
* An {@link Executor} that can be used to execute tasks in parallel.
*/
public static final Executor THREAD_POOL_EXECUTOR;
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
从上面的注释我们可以看出,对于线程池的大小定义是和当前设备的CPU有关的,而核心线程数最大也不会超过4。那么当如果线程数超过了最大线程数,其他的只能阻塞排队等待,那也就是说当有多个线程的时候不适合使用AsyncTask。
我们再回到SerialExecutor类中,可以看到在添加任务进队列队尾的时候,调用了r.run()这个方法,那么我们就需要知道这个r代表什么,回看方法调用流程,我们发现在executeOnExecutor()方法中,调用exec.execute()传入了一个mFuture参数,那么来看一下mFuture的定义:
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
可以看出,mFuture是一个FutureTask的对象,首先来看一下FutureTask的类定义:
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
...
public void run() {
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//调用Callable的call()
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
//将结果保存起来
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
FutureTask的参数为一个runnable与一个Result的对象,FutureTask是一个可以执行runnable,并且返回执行结果result的类。之前提到,在SerialExecutor中调用了FutureTask的run方法,从上面的run方法定义中可以看出,实际上是调用了传入的runnable转换后的Callable的call(),并且将结果保存起来,然后返回:
protected void set(V v) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = v;
U.putOrderedInt(this, STATE, NORMAL); // final state
finishCompletion();
}
}
...
private void finishCompletion() {
...
done();
callable = null; // to reduce footprint
}
最后通过调用done()方法,表示已经运行完毕。
到这里我们发现,实际上最终运行的是FutureTask中的这个Runnable,那么回到mFutureTask的定义,看到传入的是一个mWorker,那么看看mWorker的定义
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Result result = null;
try {
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
result = doInBackground(mParams);
Binder.flushPendingCommands();
} catch (Throwable tr) {
mCancelled.set(true);
throw tr;
} finally {
postResult(result);
}
return result;
}
};
...
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
Params[] mParams;
}
可以看到,mWorker就是一个Callable对象,并且在其call()方法内,调用了doInBackground()也就是我们自定义的后台操作,在返回结果之后,再调用postResult()返回给主线程:
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
可以看到,postResult内部也是使用handler去返回给主线程的,然后再来看看Handler的定义:
private static class InternalHandler extends Handler {
public InternalHandler(Looper looper) {
super(looper);
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// There is only one result
// 调用finish方法,并将result返回
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
可以看到,在Handler中调用了finish()这个方法:
private void finish(Result result) {
if (isCancelled()) {
onCancelled(result);
} else {
onPostExecute(result);
}
mStatus = Status.FINISHED;
}
在finish中,将结果返回给用户,调用我们自定义的onPostExecute,并且将当前的状态置为FINISHED,完成了整个流程。