阅读 741

重学Android-我对Handler有了新认识

Handler,我们Android工程师每天都在用的东西,它主要可以用来做线程间通信,这一块知识其实我们很熟悉了,但是最近我又翻了一遍Handler源码,发现我对Handler又有了新的认识,所以总结了这篇文章记录一下, 本文的源代码基于8.0。

在文章开始之前,友情提示,我会默认大家有了解Handler,Looper,MessageQueue的基础知识,不清楚的同学可以自行查阅别的资料哈。

由于Handler在Framework层的代码量也是比较大的,一篇文章不可能面面俱到,所以我打算从Handler的使用入手,只对关键节点进行深入,这样既不会太陷入细节,也能把握Handler背后的整个流程。

使用-消息的发送

先看一下Handler的使用,我们先定义一个MyHandler静态内部类如下:

    private static class MyHandler extends Handler {

        WeakReference<MainActivity> mMainActWeakRef;

        public MyHandler(MainActivity activity) {
            this.mMainActWeakRef = new WeakReference<>(activity);
        }

        @Override
        public void handleMessage(Message msg) {
            super.handleMessage(msg);
        }
    }
复制代码

然后我们new一个MyHandler对象,就可以通过向MyHandler发消息来实现我们的逻辑了。

    // MainActivity,onCreate
    mMyHandler = new MyHandler(this);
    mMyHandler.sendMessage(new Message());
复制代码

我们调用了sendMessage方法,最终会触发mMyHandler的handleMessage方法被回调,参数就是我们发送的Message。我们看下sendMessage的实现

    public final boolean sendMessage(Message msg){
        return sendMessageDelayed(msg, 0);
    }
    
    public final boolean sendMessageDelayed(Message msg, long delayMillis){
        if (delayMillis < 0) {
            delayMillis = 0;
        }
        return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
    }
    
    public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
        MessageQueue queue = mQueue;
        if (queue == null) {
            RuntimeException e = new RuntimeException(
                    this + " sendMessageAtTime() called with no mQueue");
            Log.w("Looper", e.getMessage(), e);
            return false;
        }
        return enqueueMessage(queue, msg, uptimeMillis);
    }  

    private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
        msg.target = this;
        if (mAsynchronous) {
            msg.setAsynchronous(true);
        }
        return queue.enqueueMessage(msg, uptimeMillis);
    }
复制代码

可以看到,先调用Handler的sendMessageDelayed方法,由于delay为0,所以消息的发送时间是SystemClock.uptimeMillis()获得的当前时间,接着调用enqueueMessage方法,内部调用了MessageQueue的enqueueMessage方法,具体发送到哪个MessageQueue由Handler初始化时绑定的Looper决定的,一般我们在主线程new的Handler默认使用MainLooper,MainLooper的初始化在ActivityThread的main方法中。

接下来我们看下MessageQueue的enqueueMessage方法的实现,关键方法多已经添加注释了。

    boolean enqueueMessage(Message msg, long when) {
    // ....
        synchronized (this) {
            // 设置消息inUse标识
            msg.markInUse();
            // 消息何时被处理
            msg.when = when;
            // mMessages是消息队列头,注意数据结构是单链表
            Message p = mMessages;
            boolean needWake;
            // 如果队列是空,或者when的时间为0,或者当前插入的时间小于表头消息时间
            if (p == null || when == 0 || when < p.when) {
                // New head, wake up the event queue if blocked.
                // 将消息插入表头,并检查当前是否被阻塞,如果被阻塞,设置needWake标识
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;
            } else {
                // Inserted within the middle of the queue.  Usually we don't have to wake
                // up the event queue unless there is a barrier at the head of the queue
                // and the message is the earliest asynchronous message in the queue.
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                // 这一坨逻辑就是将新的msg插入到一个合适的位置,按照when字段从小到大排序
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
            }

            // We can assume mPtr != 0 because mQuitting is false.
            // 如果需要唤醒,唤醒Looper
            if (needWake) {
                nativeWake(mPtr);
            }
        }
        return true;
    }
复制代码

所以插入消息到消息队列是立即插入的,按照消息的when字段来排序,如果是delay的消息,它的when就是当前时间+delay时间,最后有一个很关键的方法调用 nativeWake(mPtr),它的作用是唤醒Looper,具体实现这里先放一放,后面会做详细分析。

消息的监听

先来复习一下Looper的知识,我们知道Handler需要绑定一个Looper,主线程创建的Handler默认绑定的就是主线程的Looper,当然在子线程也可以用Looper

// 子线程中使用Looper
Looper.prepare();
Looper.loop();
复制代码

prepare方法就是新建一个Looper,并设置给sThreadLocal对象。 我们重点看loop方法的实现

    public static void loop() {
        // ...省略检查代码...
        for (;;) {
            // 从MessageQueue中获取下一条消息
            Message msg = queue.next(); // might block
            if (msg == null) {
                // No message indicates that the message queue is quitting.
                return;
            }

            // ...省略代码...
            try {
                // 消息分发
                msg.target.dispatchMessage(msg);
                dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
            } finally {
                if (traceTag != 0) {
                    Trace.traceEnd(traceTag);
                }
            }
            // ...省略代码...
        }
    }
复制代码

我们看到先是一个死循环,然后调用了queue.next()这个方法,从MessageQueue获取下一条消息,而且可能会阻塞,我们继续跟进next方法的实现

    // MessageQueue.java
    Message next() {
        // 获取ptr,这是NativeMessageQueue的指针
        final long ptr = mPtr;
        if (ptr == 0) {
            return null;
        }

        int pendingIdleHandlerCount = -1; // -1 only during first iteration
        int nextPollTimeoutMillis = 0;
        for (;;) {
            // pollOnce,会阻塞
            nativePollOnce(ptr, nextPollTimeoutMillis);
            
            // 从消息队列中拿消息,注意这里加锁了
            synchronized (this) {
                // Try to retrieve the next message.  Return if found.
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;
                if (msg != null && msg.target == null) {
                    // Stalled by a barrier.  Find the next asynchronous message in the queue.
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }
                if (msg != null) {
                    if (now < msg.when) {
                        // Next message is not ready.  Set a timeout to wake up when it is ready.
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        // Got a message.
                        mBlocked = false;
                        if (prevMsg != null) {
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        msg.next = null;
                        if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                        msg.markInUse();
                        return msg;
                    }
                } else {
                    // No more messages.
                    nextPollTimeoutMillis = -1;
                }
            // ...省略了一坨代码...    
            
            // 执行idle Handler
            // Run the idle handlers.
            // We only ever reach this code block during the first iteration.
            for (int i = 0; i < pendingIdleHandlerCount; i++) {
                final IdleHandler idler = mPendingIdleHandlers[i];
                mPendingIdleHandlers[i] = null; // release the reference to the handler

                boolean keep = false;
                try {
                    keep = idler.queueIdle();
                } catch (Throwable t) {
                    Log.wtf(TAG, "IdleHandler threw exception", t);
                }

                if (!keep) {
                    synchronized (this) {
                        mIdleHandlers.remove(idler);
                    }
                }
            }
        }
    }
复制代码

方法比较长,不重要的代码我已经省略了,我们先列下重点流程,然后逐个分析。

  • 获取mPtr,long类型,看名字它应该是一个指针,它其实是Native层的MessageQueue对象指针
  • 调用nativePollOnce,参数是mPtr和Poll超时时间,这个方法会阻塞
  • 加锁,然后从消息队列中取出一个Message,这里有一坨逻辑,我们后面会细说
  • 如果消息队列已经为空了,会执行idle handlers,

mPtr的初始化

mPtr的初始化是在MessageQueue的构造方法中初始化的

    MessageQueue(boolean quitAllowed) {
        mQuitAllowed = quitAllowed;
        mPtr = nativeInit();
    }
复制代码

nativeInit是一个Jni方法,继续跟进

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if (!nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return 0;
    }

    nativeMessageQueue->incStrong(env);
    return reinterpret_cast<jlong>(nativeMessageQueue);
}
复制代码

可以看到new了一个NativeMessageQueue对象(它继承与RefBase,是一个智能指针对象),然后将它的引用计数+1,这样就不会自动回收,最后将它的对象指针地址返回。所以我们就清楚了,应用层MessageQueue中的mPtr就是Native层NativeMessageQueue的指针。

nativePollOnce

nativePollOnce也是一个Jni方法,我们看下实现

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
复制代码

先将参数的mPtr转化为NativeMessageQueue指针,然后调用了pollOnce方法,继续跟进

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;

    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}
复制代码

调用了mLooper的pollOnce,这个mLooper是native层的Looper对象,我们继续跟进

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
        // 省略了一坨代码
        result = pollInner(timeoutMillis);
    }
}
复制代码

调用了自身的pollInnner方法

int Looper::pollInner(int timeoutMillis) {
    // 省略了一坨代码
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
  // 省略了一坨代码
}
复制代码

这里只贴出关键代码,我们看到调用了epoll_wait方法,这个方法可能会阻塞,正因为这个方法,所以我们前面介绍的Looper中的死循环不会导致主线程一直在空转(面试常见考点)。这个方法返回会有几种情况。

  1. 发生错误
  2. 超过了我们传入的timeoutMills时间
  3. 等待的mEpollFd被唤醒,具体的唤醒过程后面会介绍

当nativePollOnce方法返回后,进入下一步

加锁,取下一条消息

我们重新回到MessageQueue.java中,这里再贴一次代码,我直接在代码中添加注释了

        final long now = SystemClock.uptimeMillis();
        Message prevMsg = null;
        // 获取消息队列的头
        Message msg = mMessages;
        // 如果msg.target==null,说明这是一个消息屏障,如果是消息屏障,会遍历整个消息队列,
        // 去搜索是否有异步消息,如果有异步消息,会将第一个异步消息取出
        if (msg != null && msg.target == null) {
            // Stalled by a barrier.  Find the next asynchronous message in the queue.
            do {
                prevMsg = msg;
                msg = msg.next;
            } while (msg != null && !msg.isAsynchronous());
        }
        if (msg != null) {
            // 如果消息还没到时间,Looper继续休眠
            if (now < msg.when) {
                // Next message is not ready.  Set a timeout to wake up when it is ready.
                nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
            } else {
                // Got a message.
                mBlocked = false;
                if (prevMsg != null) {
                    prevMsg.next = msg.next;
                } else {
                    mMessages = msg.next;
                }
                msg.next = null;
                if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                msg.markInUse();
                // 从消息队列移除消息,并返回
                return msg;
            }
        } else {
            // No more messages.
            // 如果消息队列是空,继续休眠,而且休眠时间是永久,直到被唤醒
            nextPollTimeoutMillis = -1;
        }
复制代码

这里涉及到二个概念,需要解释一下

  • 消息屏障,它也是一个消息,只不过它的target是null,我们可以通过MessageQueue的postSyncBarrier方法来发送一个消息屏障插入到消息队列,排在它后面的普通消息会一直等待,直到该消息屏障被移除才会有机会被处理,当然需要清除就调用removeSyncBarrier。
  • 异步消息,异步消息通过Message的setAsynchronous方法来设置,异步消息不受消息屏障的影响,所以如果希望消息能立刻被执行,可以发送异步消息,并将其插入到消息队列的头部。
    Message message = Message.obtain();
    if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.LOLLIPOP_MR1) {
        message.setAsynchronous(true);
    }
    mMyHandler.sendMessageAtFrontOfQueue(message);
复制代码

执行idle handlers

当消息队列中没有消息或者消息队列现在被消息屏障阻塞着的时候,会执行idle handlers,所以它的意义在于,当消息队列中没有任务在执行时,给我们一个回调,让我们有机会去执行一些非紧急任务,而不用担心抢占主线程紧急任务的CPU执行时间片,因为此刻消息队列是空闲状态,Idle Handler的一个典型应用就是:我们经常在App启动的时候会用到Idle Handler来执行一些非紧急的初始化任务。可以通过下面的代码向MessageQueue中添加IdleHandler

    mMessageQueue.addIdleHandler(new MessageQueue.IdleHandler() {
        @Override
        public boolean queueIdle() {
            return true;
        }
    });
复制代码

这里queueIdle的返回值是有意义的,如果返回false,IdleHandler被回调后就会被移除,也就是说只会回调一次,如果返回true,会多次调用。当然IdleHandler也是执行在主线程,如果做过重的任务,仍然有可能会阻塞消息队列,导致后面的消息处理不及时。

Looper唤醒-从消息队列中取消息

前面已经说过了,Looper调用MessageQueue的next方法会阻塞,它的底层实现是epoll_wait,那么在阻塞的时候,如果有新消息过来,肯定是要唤醒Looper来处理消息。唤醒方法我们 前面提到过,我们再回到MessageQueue的enquque方法

    if (needWake) {
        nativeWake(mPtr);
    }
复制代码

它也是一个native方法,调用是NativeMessageQueue的wake方法,它最终调用的是native层Looper的wake方法。

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->wake();
}

void NativeMessageQueue::wake() {
    mLooper->wake();
}

void Looper::wake() {
    // 省略判断代码
    uint64_t inc = 1;
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    // 省略判断代码
}
复制代码

在Looper的wake方法中,调用了write方法,它的第一个参数是mWakeEventFd,其实就是往mWakeEventFd这个描述符对应的文件中写入了一个1,很显然就是这个写入将把Looper从epoll_wait的等待 中唤醒,那mWakeEventFd跟我们的epoll_wait有什么关系呢?

我们查看下Looper的构造函数

Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",
                        strerror(errno));
    AutoMutex _l(mLock);
    rebuildEpollLocked();
}
复制代码

可以看到在构造器内,mWakeEventFd被初始化,然后调用了rebuildEpollLocked()方法,看名字应该跟构建Epoll有关,跟进去看看

void Looper::rebuildEpollLocked() {
    // 省略一坨代码
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeEventFd;
    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
    // 省略一坨代码
}
复制代码

先创建了mEpollFd,接着调用epoll_ctl方法,这个方法就是将mEpollFd和mWakeEventFd关联起来,当向mWakeEventFd中写入数据时,mEpollFd将会被唤醒,epoll_wait方法就会返回。到这里其实就很清楚了,我们上面的wake方法,就是向mWakeEventFd中写入了一个1,来唤醒mEpollFd。

消息的分发

消息的分发在Lopper.java类中,我们看下实现

    try {
        msg.target.dispatchMessage(msg);
        dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
    } finally {
        if (traceTag != 0) {
            Trace.traceEnd(traceTag);
        }
    }
复制代码

最终调用了msg的target字段来分发,我们前面说过这个target就是Handler对象

    public void dispatchMessage(Message msg) {
        if (msg.callback != null) {
            handleCallback(msg);
        } else {
            if (mCallback != null) {
                if (mCallback.handleMessage(msg)) {
                    return;
                }
            }
            handleMessage(msg);
        }
    }
复制代码

这段逻辑很简单,这里简单提一下,先判断msg是否有callback,如果没有再判断Handler是否有Callback,如果没有,才最终回调Handler的handleMessage方法来处理。

总结

到这里,Android中Handler的整个原理就讲完了,我们从Message的整个生命周期入手,从消息被创建,到加入消息队列,到被消费处理然后被回收,我们都深入到了native层,了解了底层的技术原理。

总结一下

本文主要分析了以下几点

  1. 消息是如何创建然后插入到消息队列的
  2. Looper在死循环中等待消息用到了 epoll_wait机制,从而不会卡死主线程
  3. Looper的唤醒机制,通过向mWakeEventFd中写入数据来唤醒Looper
  4. 消息的分发机制,而且有介绍消息屏障和异步消息
  5. idle Handler 的回调时机分析

了解这些原理,可以指导我们写出更优的应用代码,希望本文对你有帮助!

最后我想贴一段代码,来演示epoll的使用,算是彩蛋吧!

为什么会有这个想法呢?因为我相信大部分同学包括我自己,对Linux都算不上熟悉,看源码内的很多系统调用很容易一脸懵逼,这样理解源码起来就很困难。所以我就一直想能有个机会真实操作一把Linux编程,刚好这次分析的Handler底层利用的epoll不是很难,我就撸了一个demo来感受一下Linux编程,直接上代码(环境是ubuntu 14, 编译器是clion)。

需求非常简单:主进程fork一个子进程出来,然后主进程epoll_wait等待,子进程先sleep 2s,然后唤醒主进程,程序结束

#include <sys/eventfd.h>
#include <sys/epoll.h>

static const int EPOLL_SIZE_HINT = 8;
int main(void) {
    // 创建时间描述符
    int mEventFd = eventfd(0, 0);
    if (mEventFd == -1) {
        printf("create efd error!");
        exit(0);
    }
    // 创建epoll描述符
    int mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    struct epoll_event eventItem;
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mEventFd;
    // 绑定事件描述符到epoll
    epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mEventFd, &eventItem);
    
    // 创建一个子进程,注意fork出来的进程继承主进程的内存布局,而vfork不会
    _pid_t pid = fork();
    // 注意fork方法会返回二次,==0 是在子进程,>0在父进程
    if (pid == 0) {
        // 子进程
        printf("child process sleep 2s\n");
        // 先sleep 2s
        sleep(2);
        uint64_t event = 1;
        // 写入1到mEventFd, 唤醒父进程的epoll
        write(mEventFd, &event, sizeof(uint64_t);
        printf(child process write event finish!\n);
    } else {
        // 父进程
        struct epoll_event eventItems[EPOLL_SIZE_HINT * 100];
        printf("parent process start wating!\n");
        // 父进程epoll阻塞等待,最后一个参数是-1,表示永远等待
        int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_SIZE_HINT*100, -1);
        // 被唤醒 epoll_wait返回,
        printf("get event : %d\n", eventCount);
    }
    exit(0);
}

复制代码

控制台输出结果如下:

代码不难,Linux环境可以直接跑的,大家可以自己动手敲一遍,加深理解! 以上!!

关注下面的标签,发现更多相似文章
评论