AsyncTask源码详解

394 阅读6分钟

在分析源码之前,首先引用一段源码中的注释说明AsyncTask的定义:

AsyncTask能够正确,方便的使用UI线程。AsyncTask可以在不用使用Threads 和 handlers的情况下,操作后台线程并且将结果发送给UI线程。 AsyncTask是为Thread和Handler而设计的帮助类,并且它应当使用在一些耗时较短的操作上,如果需要让线程保持长时间的运行,强烈的推荐使用 java.util.concurrent包中的如Excutor、ThreadPoolExecutor、FutureTask。

在分析源码之前,先看一段简单的使用例子,方便理解:

private class DownloadFilesTask extends AsyncTask(URL, Integer, Long) {
      protected Long doInBackground(URL... urls) {
          int count = urls.length;
          long totalSize = 0;
          for (int i = 0; i < count; i++) {
              totalSize += Downloader.downloadFile(urls[i]);
              //将进度返回给主线程,用于更新进度。
              publishProgress((int) ((i / (float) count) * 100));
              // Escape early if cancel() is called
              if (isCancelled()) break;
          }
          return totalSize;
      }
 
      protected void onProgressUpdate(Integer... progress) {
          setProgressPercent(progress[0]);
      }
 
      protected void onPostExecute(Long result) {
          showDialog("Downloaded " + result + " bytes");
      }
 }
    
    //调用
    new DownloadFilesTask(url, 0, 1L).execute();

通过上面的例子我们可以看出AsyncTask的基本用法:就是用一个类继承AsyncTask,然后分别实现三个方法: doInBackground()onProgressUpdate()onPostExecute()。使用的时候,只需要调用execute()方法即可。那么下面我们来看一下AsyncTask的源码是怎么实现的:

public abstract class AsyncTask<Params, Progress, Result>

AsyncTask是一个抽象类,并且包含了三个泛型参数,它们的含义我们从名字应该能看得出来分别表示后台任务的参数、进度、以及结果。 分别对应上面例子中的URL、Integer、Long。

接下来我们进入它的执行方法execute():

 public final AsyncTask<Params, Progress, Result> execute(Params... params) {
        return executeOnExecutor(sDefaultExecutor, params);
 }

可以看到,execute()调用了execureOnExecutor()这个方法 (关于execute()有一点需要说明的是,AsyncTask最初是使用串行的方式处理多任务,但是从Android 1.6开始,将其改为并行的,但是为了避免并行带来的错误,自从Android 3.2 之后又改回了串行模式 ) ,那么下面看看execureOnExecutor()的源代码:

public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
    Params... params) {
    if (mStatus != Status.PENDING) {
        switch (mStatus) {
            case RUNNING:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task is already running.");
            case FINISHED:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task has already been executed "
                        + "(a task can be executed only once)");
        }
    }

    mStatus = Status.RUNNING;
    
    onPreExecute();
    
    mWorker.mParams = params;
    exec.execute(mFuture);
    
    return this;
}

从上面代码中可以看出,首先会判断当前AsyncTask的状态是否为等待状态,如果不是则报错,是等待状态将会执行onPreExecute(),也就是我们自定义的进行后台任务前的方法。然后会调用exec的execute()方法,那么这个exec是什么呢? 从execute()方法的参数可以看出,exec是一个叫做sDefaultExecutor的变量,那么我们去看看这个变量是什么:

public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;

原来是SerialExecutor这个类的一个对象,再继续看看SerialExecutor这个类的定义:

private static class SerialExecutor implements Executor {
    final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
    Runnable mActive;

    public synchronized void execute(final Runnable r) {
        mTasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
      //如果当前没有正在运行的任务
        if (mActive == null) {
            scheduleNext();
        }
    }

    protected synchronized void scheduleNext() {
	//如果任务队列不为空
        if ((mActive = mTasks.poll()) != null) {
          //从任务队列中取到任务并调用线程池执行	
            THREAD_POOL_EXECUTOR.execute(mActive);
        }
    }
}

可以看出,在SerialExecutor内部,实现了一个task任务的队列(ArrayDeque),并且从这里的处理,我们也能看出来之前在execute()方法解释的时候说任务是串行的原理了(注释有说明),而实现任务的串行执行的正是THREAD_POOL_EXECUTOR,从名字就可以看出来是一个线程池的定义,我们先来看看对于这个线程池的定义:

	//线程池核心线程数	
    private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
	//线程池最大线程数
    private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
		//非核心线程存活时间
    private static final int KEEP_ALIVE_SECONDS = 30;
		//线程工厂类
    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);

        public Thread newThread(Runnable r) {
            return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
        }
    };
		//大小为128的阻塞队列
    private static final BlockingQueue<Runnable> sPoolWorkQueue =
            new LinkedBlockingQueue<Runnable>(128);

    /**
     * An {@link Executor} that can be used to execute tasks in parallel.
     */
    public static final Executor THREAD_POOL_EXECUTOR;

    static {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                sPoolWorkQueue, sThreadFactory);
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        THREAD_POOL_EXECUTOR = threadPoolExecutor;
    }

从上面的注释我们可以看出,对于线程池的大小定义是和当前设备的CPU有关的,而核心线程数最大也不会超过4。那么当如果线程数超过了最大线程数,其他的只能阻塞排队等待,那也就是说当有多个线程的时候不适合使用AsyncTask。

我们再回到SerialExecutor类中,可以看到在添加任务进队列队尾的时候,调用了r.run()这个方法,那么我们就需要知道这个r代表什么,回看方法调用流程,我们发现在executeOnExecutor()方法中,调用exec.execute()传入了一个mFuture参数,那么来看一下mFuture的定义:

mFuture = new FutureTask<Result>(mWorker) {
            @Override
            protected void done() {
                try {
                    postResultIfNotInvoked(get());
                } catch (InterruptedException e) {
                    android.util.Log.w(LOG_TAG, e);
                } catch (ExecutionException e) {
                    throw new RuntimeException("An error occurred while executing doInBackground()",
                            e.getCause());
                } catch (CancellationException e) {
                    postResultIfNotInvoked(null);
                }
            }
        };

可以看出,mFuture是一个FutureTask的对象,首先来看一下FutureTask的类定义:

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
...
public void run() {
    if (state != NEW ||
        !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
          //调用Callable的call()
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
              //将结果保存起来
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

FutureTask的参数为一个runnable与一个Result的对象,FutureTask是一个可以执行runnable,并且返回执行结果result的类。之前提到,在SerialExecutor中调用了FutureTask的run方法,从上面的run方法定义中可以看出,实际上是调用了传入的runnable转换后的Callable的call(),并且将结果保存起来,然后返回:

protected void set(V v) {
    if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
        outcome = v;
        U.putOrderedInt(this, STATE, NORMAL); // final state
        finishCompletion();
    }
}

...

private void finishCompletion() {
    ...

    done();
    callable = null;        // to reduce footprint
}

最后通过调用done()方法,表示已经运行完毕。

到这里我们发现,实际上最终运行的是FutureTask中的这个Runnable,那么回到mFutureTask的定义,看到传入的是一个mWorker,那么看看mWorker的定义

mWorker = new WorkerRunnable<Params, Result>() {
        public Result call() throws Exception {
            mTaskInvoked.set(true);
            Result result = null;
            try {
                Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                //noinspection unchecked
                result = doInBackground(mParams);
                Binder.flushPendingCommands();
            } catch (Throwable tr) {
                mCancelled.set(true);
                throw tr;
            } finally {
                postResult(result);
            }
            return result;
        }
    };
...
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
    Params[] mParams;
}

可以看到,mWorker就是一个Callable对象,并且在其call()方法内,调用了doInBackground()也就是我们自定义的后台操作,在返回结果之后,再调用postResult()返回给主线程:

private Result postResult(Result result) {
    @SuppressWarnings("unchecked")
    Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
            new AsyncTaskResult<Result>(this, result));
    message.sendToTarget();
    return result;
}

可以看到,postResult内部也是使用handler去返回给主线程的,然后再来看看Handler的定义:

private static class InternalHandler extends Handler {
    public InternalHandler(Looper looper) {
        super(looper);
    }

    @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
    @Override
    public void handleMessage(Message msg) {
        AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
        switch (msg.what) {
            case MESSAGE_POST_RESULT:
                // There is only one result
            		// 调用finish方法,并将result返回
                result.mTask.finish(result.mData[0]);
                break;
            case MESSAGE_POST_PROGRESS:
                result.mTask.onProgressUpdate(result.mData);
                break;
        }
    }
}

可以看到,在Handler中调用了finish()这个方法:

private void finish(Result result) {
    if (isCancelled()) {
        onCancelled(result);
    } else {
        onPostExecute(result);
    }
    mStatus = Status.FINISHED;
}

在finish中,将结果返回给用户,调用我们自定义的onPostExecute,并且将当前的状态置为FINISHED,完成了整个流程。