深入Android系统(五)Android的同步和消息机制

2,015 阅读27分钟

Android 的消息机制

消息驱动是一种进程或线程的运行模式。内部、外部的各种事件都可以放到消息队列中按序处理。这种模式特别适合处理大量的交互事件。

Android 应用的UI线程,同样采用了消息驱动模式,所有外部来的按键事件、触屏事件、各种系统Intent、广播等都会转化为内部的消息,然后在主线程中分发处理。

消息模型

现在的操作系统普遍采用消息驱动模式。Windows操作系统就是经典的消息驱动类型。但是,Android的消息处理机制和Windows的不太相同,我们看下二者的对比图:

image

上图:

  • Windows的消息处理模型中,存在一些系统消息队列,这个队列是整个进程的核心,几乎所有动作都要转换成消息,然后放到这个队列中,消息的处理只能在主线程来完成。
  • Android没有全局的消息队列,消息队列是和某个线程关联在一起的。每个线程最多有一个消息队列,消息的取出和处理在线程中完成。

比较而言:

  • Windows的消息模型比较简单
    • 由于全局队列的存在
      • 消息的发送简单方便
    • 但是
      • 容易成为瓶颈,如果某个消息不能及时完成,整个进程都会挂起
      • 全局队列,需要频繁的同步,会增大系统开销
  • Android的消息模型
    • 相对复杂很多
      • 使用前必须为线程构造消息队列
      • 发送消息必须先得到消息队列Handler对象
    • 但是由于消息队列在各个线程中
      • 对于线程内部的消息,不会存在额外的系统开销

我们来看下Android中与消息机制相关的类,主要是:

  • Looper类:Looper对象时线程的消息循环处理器,每个线程只能有一个Looper对象。
    • Looper内部有一个消息队列MessageQueue,所有线程的消息都存放在这个队列中。
    • 新创建一个线程时,系统不会马上为这个线程创建一个Looper对象,需要程序自己创建
    • Android在启动时,会为UI线程创建了一个Looper对象
  • Handler类:对象是Message的接收者和处理者。
    • 可以使用Handler对象把Message添加到消息队列中
    • 通过Handler的回调方法handleMessage来对消息队列中的Message进行处理
    • Handler对象在构造时会和某个Looper对象关联在一起
    • HandlerLooper是多对一的关系,多个Handler可以和同一个Looper对象关联在一起
  • Message类和MessageQueueMessage是消息的载体。
    • Message继承了Parcelable类,这表明Message对象可以通过binder来跨进程发送
    • MessageQueue存放Message对象的链表

这些类之间的关系如下图:

image

理解 Looper 类

Looper类的主要成员变量和方法如下:

public final class Looper {
    static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
    private static Looper sMainLooper;  // guarded by Looper.class
    final MessageQueue mQueue;
    final Thread mThread;
    
    public static void prepare() {
        prepare(true);
    }

    private static void prepare(boolean quitAllowed) {......}
    public static void prepareMainLooper() {......}
    public static Looper getMainLooper() {......}
    public static void loop() {......}
}
  • 每个线程只能有一个Looper类的实例对象
  • Looper类的实例对象必须通过prepare()创建
    • prepare()会创建一个Looper对象,并保存到静态变量sThreadLocal

需要注意的是:一个线程中多次调用prepare()会抛出异常

    private static void prepare(boolean quitAllowed) {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new Looper(quitAllowed));
    }

创建完Looper类的实例对象后,可以通过myLooper()获得这个对象

    public static @Nullable Looper myLooper() {
        return sThreadLocal.get();
    }

静态变量sThreadLocal的类型是ThreadLocal<T>,它通过将需要保存的对象线程ID关联在一起的方式实现了线程本地储存的功能。

Looper类的getMainLooper()方法将返回主线程Looper对象。

    public static Looper getMainLooper() {
        synchronized (Looper.class) {
            return sMainLooper;
        }
    }

Android应用启动时会创建主线程,同时会创建一个Looper对象和主线程相关联,所以prepareMainLooper()函数并不是给应用层使用的(主要是看注释部分哈)

    /**
     * Initialize the current thread as a looper, marking it as an
     * application's main looper. The main looper for your application
     * is created by the Android environment, so you should never need
     * to call this function yourself.  See also: {@link #prepare()}
     */
    public static void prepareMainLooper() {
        prepare(false);
        synchronized (Looper.class) {
            if (sMainLooper != null) {
                throw new IllegalStateException("The main Looper has already been prepared.");
            }
            sMainLooper = myLooper();
        }
    }

有了Looper对象后,可以调用Looper类的loop()方法来进入消息循环。

咱们看看官网上Looper的典型用法

class LooperThread extends Thread {
      public Handler mHandler;

      public void run() {
          Looper.prepare();

          mHandler = new Handler() {
              public void handleMessage(Message msg) {
                  // process incoming messages here
              }
          };

          Looper.loop();
      }
  }

loop()方法的主要作用是分发消息队列中消息,函数的代码如下:

    public static void loop() {
        final Looper me = myLooper();
        //......一些 Looper 对象有关的异常判断
        final MessageQueue queue = me.mQueue;
        //......一些线程ID相关的信息处理
        for (;;) {
            Message msg = queue.next(); // might block
            if (msg == null) {
                // No message indicates that the message queue is quitting.
                return;
            }
            // ......
            msg.target.dispatchMessage(msg);
            // ......
            msg.recycleUnchecked();
        }
    }
  • loop()方法会循环从MessageQueue队列中取出消息,然后把消息分发出去
  • 消息分发通过Message对象中的target变量完成,这个变量的类型是Handler
  • Message是消息的载体,发送者把需要传递的消息放在Message对象中
  • Message对象创建的时候需要指定它的处理对象target,也就是Handler

理解 Handler 类

Handler类主要负责消息的发送和处理。

在一个线程中可以只用一个Handler对象来处理所有消息,也可以使用多个。

对于构造一个Handler对象来说,需要两个参数:

  • 线程的Looper对象
    • 用来指定要给哪个线程的Looper对象发消息
    • 必需
  • 消息的处理函数callback
    • 通过callback实现对消息的集中处理
    • 也可以把callback直接放在消息对象
    • 非必需

Handler类是消息框架的一部分,在消息的定义和响应方面设计的非常灵活,具体的消息类型和响应逻辑都需要需要在应用层的代码中完成

  • 传统的消息模型中,某个线程能处理的消息的种类必须与先定义好,使用者只能使用它们来给某个线程发送消息
  • Android把消息的定义和处理完全独立出来了,线程只是提供了一个消息队列消息分发的运行环境

例如,Android主线程的实现都在Framework中,但是我们可以使用下面的方法来构造一个带有callback方法的消息发送给主线程

public static Message obtain(Handler h, Runnable callback)

这样,这个callback方法将在主线程执行

而对于Handler类的消息发送接口,可以分成两大类:

  • 一类是send
  • 一类是post

不过,在深入了解Handler类的这两种接口前,我们先简单看一下MessageQueue类中的关键接口:

  • 扫一眼后你会发现,MessageQueue类中其实只有一个插入消息的接口
  • 就是boolean enqueueMessage(Message msg, long when)

MessageQueue类中就一个接口,Handler类中哪来的那么多发送指令

不急,我们看看这两种到底是啥?

send类接口

先看下Handlersend打头的方法:

    public final boolean sendMessage(Message msg)
    public final boolean sendEmptyMessage(int what)
    public final boolean sendEmptyMessageDelayed(int what, long delayMillis)
    public final boolean sendEmptyMessageAtTime(int what, long uptimeMillis)
    public final boolean sendMessageDelayed(Message msg, long delayMillis)
    public boolean sendMessageAtTime(Message msg, long uptimeMillis)
    public final boolean sendMessageAtFrontOfQueue(Message msg)

大家随便追踪一下代码就可以找到,代码最后都走到了Handler类中的enqueueMessage方法

    private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
        msg.target = this;
        if (mAsynchronous) {
            msg.setAsynchronous(true);
        }
        return queue.enqueueMessage(msg, uptimeMillis);
    }
  • 所谓的send其实也只是把Message插入到MessageQueue中,同时指定消息的处理时间
  • 如果指定的时间为0,表示要立即处理,MessageQueue会把这条消息查到队列的头部

MessageQueue中的插入方法,除了消息参数msg外,只有一个时间参数uptimeMillis

因此,Handler类里面发送消息的接口虽然多,但是都是在时间上做动作,让应用方便使用而已。send类接口总结如下:

  • 如果希望马上处理,但是不打算插队,使用sendMessage
  • 如果非常紧急,希望尽快处理,使用sendMessageAtFrontOfQueue
  • 如果希望延时一段时间处理,使用sendMessageDelayed
  • 如果希望在指定时间处理,使用sendMessageAtTime
  • 如果定义的消息只有消息ID,不用附加参数,可以使用sendEmptyMessage

post类接口

其实,看到了实现也就明白post类接口是在干啥了

    public final boolean post(Runnable r)
    {
       return  sendMessageDelayed(getPostMessage(r), 0);
    }
    public final boolean postAtTime(Runnable r, long uptimeMillis)
    {
        return sendMessageAtTime(getPostMessage(r), uptimeMillis);
    }
    //......省略部分post函数
    public final boolean postDelayed(Runnable r, Object token, long delayMillis)
    {
        return sendMessageDelayed(getPostMessage(r, token), delayMillis);
    }
    public final boolean postAtFrontOfQueue(Runnable r)
    {
        return sendMessageAtFrontOfQueue(getPostMessage(r));
    }
    private static Message getPostMessage(Runnable r, Object token) {
        Message m = Message.obtain();
        m.obj = token;
        m.callback = r;
        return m;
    }

从代码的实现上看

  • 这些post类型的方法也是在使用send类型的方法
  • 只是在send类型的方法上增加了Runnable类的参数
  • 通过getPostMessage来打包出一个Message对象
  • 最后通过send类型的方法把对象添加到MessageQueue

到这里我们可以把这两个接口的特性归纳一下了:

  • post类接口用来发送带有处理方法(Runnable对象)的消息
  • send类接口则用于发送传统带有消息ID的消息

还记得Looperloop循环里会调用msg.target.dispatchMessage函数么

我们来看下实现,注释很详细哟:

    //请注意,这部分代码一共出现了三个回调功能,我们逐一注释哈
    public void dispatchMessage(Message msg) {
        if (msg.callback != null) {
            // 这里是第一个回调函数
            // 当判断到消息中携带了Runnable的对象时
            // 直接执行消息中的 run 函数
            // 并退出
            handleCallback(msg);
        } else {
            // 此处是第二个回调函数
            // 当我们创建Handler对象时可能会传入的 Handler.Callback 对象
            if (mCallback != null) {
                // Handler.Callback 对象如果不为空,会执行回调函数
                if (mCallback.handleMessage(msg)) {
                    // 请注意此处
                    // 如果在 Handler.Callback 的回调中返回了 true
                    // 就不会执行到第三个回调函数了
                    return;
                }
            }
            // 此处是第三个回调函数
            // 当 Handler.Callback 对象为空
            // 或者当 Handler.Callback 对象返回 false 时,执行此函数
            // 此函数是一个空方法体
            // Handler 的子类可以通过重载该方法来达到处理消息的目的
            handleMessage(msg);
        }
    }
    private static void handleCallback(Message message) {
        message.callback.run();
    }
    public void handleMessage(Message msg) {
    }

dispatchMessage的代码来看:

  • 消息会优先分给消息中自带的回调方法
  • 否则,如果Handler定义了回调方法,先调用这个方法处理
  • 如果Handler的回调方法没有处理,还会调用Handler自己的handleMessage

MessageQueue类的铺垫内容

这部分是对研究MessageQueue类的补充。内容并不多,重点是Message类的一个setAsynchronous函数。

如果不了解MessagesetAsynchronous函数,对于后面MessageQueue类的一些逻辑判断可能会有些晕晕的

AndroidMessage类中,有一个setAsynchronous(boolean async)方法,它的作用是设置一个标志位,代码如下:

    public void setAsynchronous(boolean async) {
        if (async) {
            flags |= FLAG_ASYNCHRONOUS;
        } else {
            flags &= ~FLAG_ASYNCHRONOUS;
        }
    }

这个函数到底怎么使用呢?

  • MessageQueue中有一个方法叫postSyncBarrier()
    • 调用这个方法会在消息队列中插入一个没有Handler对象的消息
    • 这个没有Handler对象的消息称为SyncBarrier
  • MessageQueue将暂停处理队列中SyncBarrier以后的消息
    • 好比一群人排队买票,有人在队列中放了一个牌子从这开始,停止销售
  • 但是此时如果还有消息需要处理,可以使用setAsynchronous(boolean async)方法给这个消息做个标记
  • MessageQueue检测到这个标记后,会正常处理这条信息
  • 其他未做标记的信息还是暂停处理,直到通过removeSyncBarrier移走这个SyncBarrier

好滴,铺垫完成,继续学习吧~

分析MessageQueue

MessageQueue类的构造方法

我们先看下MessageQueue类的构造方法

    MessageQueue(boolean quitAllowed) {
        mQuitAllowed = quitAllowed;
        mPtr = nativeInit();
    }

非常简洁。。。:

  • MessageQueue对象的构造是调用本地方法nativeInit()完成的
  • nativeInit()方法对应的JNI函数是native层android_os_MessageQueue_nativeInit()函数
    • 咋还调用到native层了,这就坏滴很啊

我们继续看下android_os_MessageQueue_nativeInit()的函数代码:

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    //......
}

android_os_MessageQueue_nativeInit()函数主要的功能是新创建了一个本地的NativeMessageQueue对象。我们再来看下NativeMessageQueue的构造函数:

NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);
    }
}
  • NativeMessageQueue的构造函数只是创建一个本地的Looper类对象。

NativeMessageQueue类的代码来看,它本质上是一个代理类。它把Java层的调用转变为对native层Looper类的调用

native层Looper类也实现了一套完整的消息处理机制。但是Java层Looper类native层Looper类并没有直接关系。

MessageQueue虽然使用了native层的Looper类,但也是只使用了它的等待/唤醒机制,其余的如消息队列的实现还是在Java层

因此,对于MessageQueue中的JNI调用,我们直接分析native层Looper类就好了,我们先来简单看下native层Looper类的构造函数:

Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    // 通过系统调用eventfd来创建一个eventfd对象
    // 这个函数会创建一个用于事件通知的文件描述符。
    // 并把 flag 设置成
        // EFD_CLOEXEC : 文件被设置成 O_CLOEXEC,创建子进程 (fork) 时不继承父进程的文件描述符。
        // EFD_NONBLOCK : 文件被设置成 O_NONBLOCK,执行 read / write 操作时,不会阻塞。
    mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    // ...... 省略异常判断
    // 通过epoll操作监听 mWakeEventFd 文件描述符
    rebuildEpollLocked();
}
void Looper::rebuildEpollLocked() {
    // Close old epoll instance if we have one.
    // ...... 省略部分操作
    // Allocate the new epoll instance and register the wake pipe.
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    //......
    // 一些对象数据填充
    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeEventFd;
    // 添加epoll监听
    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
    // ......
}

在上面的代码中,Looper类的构造函数做了两件事情:

  • 创建一个用于事件通知的文件描述符,eventfd对象
  • 使用epoll来监听数据

MessageQueue类的构造方法就分析到这里了

MessageQueue中的消息处理过程

MessageQueue中的消息循环在方法next()中,代码有点长,先简单描述下功能:

  1. 检查队列中的第一条信息是否为SyncBarrier消息
    • 如果是,寻找队列中标志为FLAG_ASYNCHRONOUS的消息,把找到的第一消息个作为当前处理消息
    • 如果不是,取当前队列的第一条消息作为当前处理消息
  2. 如果当前取出的消息不为NULL,检查该消息的处理时间是否已经超时
    • 如果没有超时,计算等待时间
    • 如果时间到了,next()方法将返回该消息并退出
  3. 如果取出的消息为NUll,表示队列中没有可以处理的消息。
    • 设置等待时间为-1,永久等待
  4. 检查队列中的退出标志
    • 如果检测到退出标志,则销毁native层中创建的对象,然后next()方法退出
  5. 检查是否存在IdleHandler的回调函数
    • 如果不存在,继续循环,通过nativePollOnce()方法挂起线程并等待新消息到来
    • 如果存在
      • 则调用所有IdleHandler的回调函数
      • 对于返回false的回调函数,在调用完成后从队列中移除
      • 最后把epoll等待时间设置为0

在看next()代码前,我们先看下nativePollOnce()方法的调用流程:

  • nativePollOnce()调用了NativeMessageQueue::pollOnce
  • NativeMessageQueue::pollOnce调用了Looper::pollOnce
  • Looper::pollOnce调用了Looper::pollInner
  • 最后,Looper::pollInner调用了epoll_wait

我们看下调用epoll_wait部分的代码:

//......
    // We are about to idle.
    mPolling = true;

    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // No longer idling.
    mPolling = false;
//......
  • 调用epoll_wait会阻塞当前线程
  • 当有数据进来后或到达超时时间,epoll_wait才会返回

接下来再看下next()代码部分:

    Message next() {
        // Return here if the message loop has already quit and been disposed.
        // This can happen if the application tries to restart a looper after quit
        // which is not supported.
        final long ptr = mPtr;
        if (ptr == 0) {
            return null;
        }
        int pendingIdleHandlerCount = -1; // -1 only during first iteration
        int nextPollTimeoutMillis = 0;
        for (;;) {
            if (nextPollTimeoutMillis != 0) {
                Binder.flushPendingCommands();
            }
            // 调用本地方法等待nextPollTimeoutMillis毫秒
            // -1表示永远阻塞
            nativePollOnce(ptr, nextPollTimeoutMillis);
            // 此处使用了同步
            synchronized (this) {
                // Try to retrieve the next message.  Return if found.
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;
                if (msg != null && msg.target == null) {
                    // Stalled by a barrier.  Find the next asynchronous message in the queue.
                    // SyncBarrier 的标志就是消息的target为null
                    // 如果队列的第一条消息就是 SyncBarrier
                    // 忽略普通消息,查找第一条异步消息
                    // 即执行过setAsynchronous(true)的消息
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }
                if (msg != null) {
                    // 找到消息后,先进行时间判断
                    if (now < msg.when) {
                        // 还未到处理这条消息的时间,计算需要等待的时长
                        // Next message is not ready.  Set a timeout to wake up when it is ready.
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        // Got a message.
                        // 取消阻塞标志
                        mBlocked = false;
                        // 从队列中取出消息
                        if (prevMsg != null) {
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        msg.next = null;
                        // 标记使用并返回消息
                        msg.markInUse();
                        return msg;
                    }
                } else {
                    // 表示队列中现在没有需要处理的消息
                    // No more messages.
                    nextPollTimeoutMillis = -1;
                }
                if (mQuitting) {
                    // 如果设置了退出标志,则销毁native对象,并返回
                    dispose();
                    return null;
                }
                // If first time idle, then get the number of idlers to run.
                // Idle handles only run if the queue is empty or if the first message
                // in the queue (possibly a barrier) is due to be handled in the future.
                // 检查是否安装了IdleHandler
                if (pendingIdleHandlerCount < 0
                        && (mMessages == null || now < mMessages.when)) {
                    pendingIdleHandlerCount = mIdleHandlers.size();
                }
                if (pendingIdleHandlerCount <= 0) {
                    // No idle handlers to run.  Loop and wait some more.
                    mBlocked = true;
                    continue;
                }
                //将Idle Handler放到数组mPendingIdleHandlers中
                if (mPendingIdleHandlers == null) {
                    mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
                }
                mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
            }

            // Run the idle handlers.
            // We only ever reach this code block during the first iteration.
            // 处理所有的Idle Handler,如果回调结果为false,表示不再继续处理
            // 则从列表中移除该Idle Handler
            for (int i = 0; i < pendingIdleHandlerCount; i++) {
                final IdleHandler idler = mPendingIdleHandlers[i];
                mPendingIdleHandlers[i] = null; // release the reference to the handler

                boolean keep = false;
                try {
                    // 大家可以在主线程注册一个IdleHandler回调
                    // 回调中延时10s,看看有木有什么奇怪的地方,哈哈哈
                    // 也可以试试弹个窗啥的,会有惊喜哟
                    keep = idler.queueIdle();
                } catch (Throwable t) {
                    Log.wtf(TAG, "IdleHandler threw exception", t);
                }
                if (!keep) {
                    //此处为移除
                    synchronized (this) {
                        mIdleHandlers.remove(idler);
                    }
                }
            }
            // Reset the idle handler count to 0 so we do not run them again.
            pendingIdleHandlerCount = 0;
            // While calling an idle handler, a new message could have been delivered
            // so go back and look again for a pending message without waiting.
            nextPollTimeoutMillis = 0;
        }
    }

MessageQueue发消息

MessageQueue发消息使用的是enqueueMessage()函数,来瞅一瞅,很详细的注释哈:

    boolean enqueueMessage(Message msg, long when) {
        if (msg.target == null) {
            // 如果要插入消息的target为null,说明没有指定Handler,抛出异常
            throw new IllegalArgumentException("Message must have a target.");
        }
        if (msg.isInUse()) {
            // 如果要插入的消息处于正在使用中,说明是消息被重复加入,抛出异常
            throw new IllegalStateException(msg + " This message is already in use.");
        }
        // 增加同步操作
        synchronized (this) {
            if (mQuitting) {
                // 如果已经处于退出状态,返回false,打印警告信息
                IllegalStateException e = new IllegalStateException(
                        msg.target + " sending message to a Handler on a dead thread");
                Log.w(TAG, e.getMessage(), e);
                msg.recycle();
                return false;
            }
            // 做一些带插入消息的状态处理
            msg.markInUse();
            msg.when = when;
            Message p = mMessages;
            // 是否需要唤醒的标志
            boolean needWake;
            if (p == null || when == 0 || when < p.when) {
                // p == null 说明队列中没有消息
                // 或者 (when == 0 || when < p.when) 说明当前消息需要插入到头部
                // New head, wake up the event queue if blocked.
                msg.next = p;
                mMessages = msg;
                // 根据 next() 的阻塞状态来设置是否需要唤醒
                // 如果线程阻塞了,则需要唤醒
                needWake = mBlocked;
            } else {
                // 队列中有消息
                // 只有在头部p.target == null,说明是SyncBarrier
                // 并且当前消息为异步消息时 msg.isAsynchronous()
                // 才需要唤醒
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                // 寻找合适的位置插入消息
                Message prev;
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        break;
                    }
                    // 请注意此处,只有在消息插入前才会走到这里
                    // 如果消息插入前发现已经有消息被设置为异步消息
                    // 则本消息不需要唤醒
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
            }
            // We can assume mPtr != 0 because mQuitting is false.
            if (needWake) {
                // 唤醒线程的操作
                nativeWake(mPtr);
            }
        }
        return true;
    }
  • enqueueMessage方法插入消息时根据时间来排序,时间早的在前
  • 消息队列的组织结构利用了Message类中的next指针形成一个单向链表
  • 在插入消息时会计算是否需要唤醒线程,enqueueMessage希望尽量避免唤醒处理线程
    • 在插入了一条马上要处理的消息后会唤醒线程
    • SyncBarrier状态下又插入了一条异步消息后会唤醒线程

最后,我们看下唤醒线程的操作:

void Looper::wake() {
    // ......移除一些debug代码
    uint64_t inc = 1;
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    // ......移除一些debug代码
}
  • TEMP_FAILURE_RETRY只是一个do..while的宏定义,核心是系统调用write函数
  • wake()通过系统调用write写入数据来唤醒消息处理线程
  • 处理线程通过epoll监听eventfd上的数据
  • 一旦数据到来,线程就会被唤醒,next()方法会继续处理消息

到这里,我们就把MessageQueue这部分给串起来了:

  1. Looper类中,通过Looper.prepareLooper.loop来初始化和启动消息服务,这里面包括:
    • MessageQueue的初始化
    • 创建一个无限循环用来处理消息
    • 循环通过MessageQueuenext()函数取消息
    • next()会根据队列中消息的状态等待,执行epoll_wait操作阻塞线程等待消息到达通知
  2. Handler类中通过send*或者post*来完成消息发送,这里面包括:
    • 真正执行的是通过enqueueMessage插入消息
    • 需要唤醒的情况下,通过系统调用write来通知唤醒线程
  3. 而对于MessageQueue的状态,我们可以通过IdleHandler来监听是否队列是否为空

进程间的消息传递

Android 的消息可以在进程之间传递。当然,进程间消息的传递是建立在Binder通信基础上的。

我们知道,只要有了Binder引用对象就可以发起远程调用Android中如果希望向另一个进程的Handler发送消息,一定要通过某个Binder代理对象完成。

我们再看下Handler类中,方法getIMessage()会创建一个Binder对象

    final IMessenger getIMessenger() {
        synchronized (mQueue) {
            if (mMessenger != null) {
                return mMessenger;
            }
            mMessenger = new MessengerImpl();
            return mMessenger;
        }
    }

getIMessenger中创建的对象类型是MessengerImpl,它是一个Binder的服务类,从IMessenger.Stub类继承过来的。MessengerImpl中的send()方法的作用就是给Handler发消息。代码如下:

    private final class MessengerImpl extends IMessenger.Stub {
        public void send(Message msg) {
            msg.sendingUid = Binder.getCallingUid();
            Handler.this.sendMessage(msg);
        }
    }

好了,接口有了。

  • 调用Handler对象的getIMessenger方法就能得到给这个Handler发送消息的Binder对象

但是要跨进程发送消息,怎么在调用进程得到这个Binder对象? 我们来深入了解下Messenger

理解Messenger

Messenger类实际上是对IMessenger对象的包装,它里面包含了一个Handler对象关联的MessengerImpl

类信息如下:

public final class Messenger implements Parcelable {
    private final IMessenger mTarget;
    public Messenger(Handler target) {
        mTarget = target.getIMessenger();
    }
    public void send(Message message) throws RemoteException {
        mTarget.send(message);
    }
    //......省略一些方法
    // 请注意这个构造方法哈,在 bindService 时会用到
    public Messenger(IBinder target) {
        mTarget = IMessenger.Stub.asInterface(target);
    }
}
  • 从代码看,Messenger还是用的Binder通信的那一套逻辑
    • 实现了Parcelable接口,Binder通信时可以直接传输过去
    • Messenger持有HandlerIMessenger对象
    • 这样,调用进程在得到Messenger对象的同时也就获得了Handler发送消息的能力
  • 使用Messenger的好处就是隐藏了通信中使用Binder的细节,让整个过程看起来就像在发送一个本地消息一样简单

我们看个简单的示例:

  1. 定义一个远程Service,记得配置android:process属性哈
public class RemoteService extends Service {
    private static final String TAG = "HuaLee";
    private static final Handler mHandler = new Handler(new Handler.Callback() {
        @Override
        public boolean handleMessage(Message msg) {
            Log.d(TAG, "handleMessage=" + msg);
            return true;
        }
    });
    private static final Messenger messenger = new Messenger(mHandler);

    @Override
    public IBinder onBind(Intent intent) {
        return messenger.getBinder();
    }
}
  1. 找一个Activity,绑定服务,并转换成Messenger对象
    private Messenger mRemoteMessenger = null;
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        //......
        Intent intent = new Intent(this, RemoteService.class);
        bindService(intent, new ServiceConnection() {
            @Override
            public void onServiceConnected(ComponentName name, IBinder service) {
                Log.d(TAG, "onServiceConnected");
                mRemoteMessenger = new Messenger(service);
            }

            @Override
            public void onServiceDisconnected(ComponentName name) {

            }
        }, BIND_AUTO_CREATE);
    }
  1. 这样,我们就可以通过Messenger对象来发送消息给服务端了
    Message msg = Message.obtain();
    mRemoteMessenger.send(msg);

是不是有了例子更好理解啦,哈哈哈哈

如果只需要在两个进程间进行简单的消息往来,上面这部分已经够用了。为了方便应用使用,Android 还提供了 AsyncChannel 类来建立双向通信。

建立通信通道-AyncChannel类的作用

AyncChannel类对上层应用来说又是一个隐藏类,源码路径在:frameworks/base/core/java/com/android/internal/util/AsyncChannel.java。是用来建立两个Handler之间的通信通道。这两个Handler可以在一个进程中,也可以在两个进程中。

通过AyncChannel类建立的通信双方的地位并不是对等的

  • 一方要充当Service并响应AyncChannel类中定义的消息
  • 另一方则充当client的角色,主动去连接对方

使用AyncChannel类首先要明确通信双方使用半连接模式还是全连接模式

  • 半连接模式是指建立连接后,只能客户端主动给服务端发送消息,服务端在收到客户端的消息后,利用消息中附带的Messenger对象来给客户端回复消息
  • 全连接模式则是双方可以主动发送消息

显然,全连接模式半连接模式占用更多的系统资源。

半连接模式

AyncChannel类中提供很多个连接方法,客户端需要根据情况使用。但是在连接前,还是要先得到服务端Messenger对象。

前面介绍过,Java层Binder传递有两种方式:

  • 一种是通过已经建立好的Binder通道来传递Binder对象
  • 一种是通过组件Service来获得Service中包含的Binder对象

AyncChannel类同时支持上面这两种方式。

如果客户端服务端已经建立了Binder通道,那么服务端Messenger对象就可以通过这个通道传递到客户端。这样就可以使用AyncChannel类的connect方法来建立两者之间的联系:

    public void connect(Context srcContext, Handler srcHandler, Messenger dstMessenger) {
        // We are connected
        connected(srcContext, srcHandler, dstMessenger);
        // Tell source we are half connected
        replyHalfConnected(STATUS_SUCCESSFUL);
    }
  • srcContext客户端的上下文
  • srcHandler客户端Handler对象
  • dstMessenger:从服务端传递来的Messenger对象

AyncChannel类还为通信双方定义了非常简单的握手协议

  • connect方法会调用replyHalfConnected方法
  • replyHalfConnected方法会发送CMD_CHANNEL_HALF_CONNECTED消息给客户端srcHandler
    • 怎么是客户端srcHandler?不应该是服务端的么?
      • 首先,握手协议是给全连接模式准备的,对于半连接模式只有在特殊情况下才会严格执行握手协议
        • 一般情况下,半连接模式只要得到了对方的Messenger就可以通信了,握不握手不重要
        • 但是全连接模式必须先确保通信双方都准备好了才能开始通信,所以需要一个简单的握手
      • 因此,在半连接模式中,srcHandler对象不需要处理CMD_CHANNEL_HALF_CONNECTED消息
      • 半连接模式在调用完connect方法后就可以开始发送消息了

如果客户端服务端没有现成的Binder通道,可以通过启动组件Service的方式建立一个Binder通道。按照上面的Messenger类的示例一样就可以。

如果不愿意重新写一个ServiceAndorid中提供了一个抽象类AsyncService服务端可以继承它来创建一个Service。在这种情况下,客户端需要使用AsyncChannel类的另一个connect方法来建立连接:

    public void connect(Context srcContext, Handler srcHandler, String dstPackageName,String dstClassName) {
        //......
        final class ConnectAsync implements Runnable {
            //......
            @Override
            public void run() {
                int result = connectSrcHandlerToPackageSync(mSrcCtx, mSrcHdlr, mDstPackageName,
                        mDstClassName);
                replyHalfConnected(result);
            }
        }
        ConnectAsync ca = new ConnectAsync(srcContext, srcHandler, dstPackageName, dstClassName);
        new Thread(ca).start();
        //......
    }
    public int connectSrcHandlerToPackageSync(
            Context srcContext, Handler srcHandler, String dstPackageName, String dstClassName) {
        //......
        Intent intent = new Intent(Intent.ACTION_MAIN);
        intent.setClassName(dstPackageName, dstClassName);
        boolean result = srcContext.bindService(intent, mConnection, Context.BIND_AUTO_CREATE);
        return result ? STATUS_SUCCESSFUL : STATUS_BINDING_UNSUCCESSFUL;
    }
    class AsyncChannelConnection implements ServiceConnection {
        //......
        @Override
        public void onServiceConnected(ComponentName className, IBinder service) {
            mDstMessenger = new Messenger(service);
            replyHalfConnected(STATUS_SUCCESSFUL);
        }
        @Override
        public void onServiceDisconnected(ComponentName className) {
            replyDisconnected(STATUS_SUCCESSFUL);
        }
    }

上面代码把关键的几个方法都列了出来,其实也是利用Service的流程大概是:

  • connect方法会根据传入的包名类名去启动Service,启动的时间可能比较长,因此connect方法中创建一个线程去启动:new Thread(ca).start();
  • run()方法中调用了connectSrcHandlerToPackageSync方法,它会调用bindService来启动一个组件Service
  • Service启动后,使用ServiceConnection接口的实现类AsyncChannelConnection来接收Service传递回来的Binder对象
  • 然后在onServiceConnected中把Binder对象包装成Messenger对象mDstMessenger
    • 半连接模式下给服务端发送消息,用的就是mDstMessenger对象

请注意意,在这个过程中,一共会执行两次replyHalfConnected,个人理解:

  • connectSrcHandlerToPackageSync中的replyHalfConnected主要是为了绑定失败后的提示

在这种情况下,客户端Handler对象只有在收到CMD_CHANNEL_HALF_CONNECTED消息后,才能说明通信建立成功了。

使用AsyncService个人觉得没啥必要,实现起来还是通过Service这种方法。自己写多解气,哈哈哈

全连接模式

全连接模式是建立在半连接模式基础上的:

  • 客户端Handler对象收到消息CMD_CHANNEL_HALF_CONNECTED
  • 如果希望建立全连接客户端需要:
    • 服务端发送CMD_CHANNEL_FULL_CONNECTION消息
    • 同时附上客户端Messenger对象
      • Message对象中有个replyTo
  • 服务端收到消息后
    • 客户端回复CMD_CHANNEL_FULLY_CONNECTED
    • 如果服务端同意建立全连接,会将消息的第一个参数msg.arg1的值设置为0,否则设置为非0值

如下是全连接模式的消息交互图:
image

我们以Android中的WifiServiceWifiManager(客户端)为例来了解下全连接模式的建立过程:

Wifi这部分的细节还是很多的,我们这里只是简单介绍下客户端服务端Handler的通信过程哈

服务端客户端的两个Handler

  1. 首先,作为客户端WifiManager中定义的Handler如下:
private class ServiceHandler extends Handler {
        @Override
        public void handleMessage(Message message) {
            synchronized (sServiceHandlerDispatchLock) {
                dispatchMessageToListeners(message);
            }
        }
        private void dispatchMessageToListeners(Message message) {
            Object listener = removeListener(message.arg2);
            switch (message.what) {
                case AsyncChannel.CMD_CHANNEL_HALF_CONNECTED:
                    // 半连接部分连接成功
                    if (message.arg1 == AsyncChannel.STATUS_SUCCESSFUL) {
                       // 发送全连接请求
                       mAsyncChannel.sendMessage(AsyncChannel.CMD_CHANNEL_FULL_CONNECTION);
                    } else {
                        Log.e(TAG, "Failed to set up channel connection");
                        // This will cause all further async API calls on the WifiManager
                        // to fail and throw an exception
                        mAsyncChannel = null;
                    }
                    mConnected.countDown();
                    break;
                case AsyncChannel.CMD_CHANNEL_FULLY_CONNECTED:
                    // Ignore
                    break;
                //......
            }
        }
    }
  1. 作为服务端WifiService类中的Handler定义如下:
    private class ClientHandler extends WifiHandler {
        //......
        @Override
        public void handleMessage(Message msg) {
            super.handleMessage(msg);
            switch (msg.what) {
                case AsyncChannel.CMD_CHANNEL_HALF_CONNECTED: {
                    if (msg.arg1 == AsyncChannel.STATUS_SUCCESSFUL) {
                        Slog.d(TAG, "New client listening to asynchronous messages");
                        // We track the clients by the Messenger
                        // since it is expected to be always available
                        mTrafficPoller.addClient(msg.replyTo);
                    } else {
                        Slog.e(TAG, "Client connection failure, error=" + msg.arg1);
                    }
                    break;
                }
                //......
                case AsyncChannel.CMD_CHANNEL_FULL_CONNECTION: {
                    // 收到客户端的全连接请求后,创建AsyncChannel对象
                    AsyncChannel ac = mFrameworkFacade.makeWifiAsyncChannel(TAG);
                    // 并根据客户端消息携带的Messenger对象来建立连接
                    ac.connect(mContext, this, msg.replyTo);
                    break;
                }
                //......
            }
        }

根据上面两个Handler,我们能够推理出的流程如下:

  • WifiManager作为客户端,在完成半连接后,会发送一条CMD_CHANNEL_FULL_CONNECTION的消息
  • WifiService作为服务端,在收到CMD_CHANNEL_FULL_CONNECTION
    • 先创建一个服务端自己的AsyncChannel对象
    • 然后通过AsyncChannel对象的connect方法去连接客户端
    • 然后,connect方法中会给服务端自己的Handler对象发送一个CMD_CHANNEL_HALF_CONNECTED消息
  • WifiService作为服务端,在收到CMD_CHANNEL_HALF_CONNECTED
    • 客户端发送过来的Messenger对象保存到成员变量mTrafficPoller
    • mTrafficPoller对象中持有一个MessengerList集合
  • 请注意,WifiService并没有回复CMD_CHANNEL_FULLY_CONNECTED。惊不惊喜,意不意外

发送同步消息

AyncChannel类的sendMessageSynchronously()方法来发送同步消息。sendMessageSynchronously()方法在发送完消息后会挂起线程进入等待状态,收到消息的回复后再恢复线程的运行。

AyncChannel类的sendMessageSynchronously()的核心代码如下:

    private static Message sendMessageSynchronously(Messenger dstMessenger, Message msg) {
            SyncMessenger sm = SyncMessenger.obtain();
            Message resultMsg = null;
            try {
                if (dstMessenger != null && msg != null) {
                    // 请注意这里的replyTo,设置成了 SyncMessenger 对象
                    msg.replyTo = sm.mMessenger;
                    synchronized (sm.mHandler.mLockObject) {
                        if (sm.mHandler.mResultMsg != null) {
                            Slog.wtf(TAG, "mResultMsg should be null here");
                            sm.mHandler.mResultMsg = null;
                        }
                        // 向服务端发送消息
                        dstMessenger.send(msg);
                        // 此处调用 wait 等待唤醒
                        sm.mHandler.mLockObject.wait();
                        resultMsg = sm.mHandler.mResultMsg;
                        sm.mHandler.mResultMsg = null;
                    }
                }
            } 
            //......
            sm.recycle();
            return resultMsg;
    }
  • 创建SyncMessenger对象
  • Message.replyTo设置成SyncMessenger对象
    • 这样消息的接收就转到SyncMessenger对象中了
  • 发送消息,并调用wait()方法挂起线程

对于服务端发送的回复消息,将在SyncMessenger中定义的SyncHandler中处理:

        private class SyncHandler extends Handler {
            /** The object used to wait/notify */
            private Object mLockObject = new Object();
            /** The resulting message */
            private Message mResultMsg;
            /......
            /** Handle of the reply message */
            @Override
            public void handleMessage(Message msg) {
                Message msgCopy = Message.obtain();
                msgCopy.copyFrom(msg);
                synchronized(mLockObject) {
                    mResultMsg = msgCopy;
                    mLockObject.notify();
                }
            }
        }

这部分的处理逻辑也比较简单:

  • 把接收到的消息保存到变量mResultMsg
  • 调用notify()方法唤醒线程

此时,通过wait()方法挂起的线程会被唤醒,然后返回resultMsg

Android 同步相关的概念

书中把这部分加在这里感觉有点不搭,多线程与同步相关的知识很是很多的,先摘一部分概念学习下,Java部分的多线程已经整过,只是不怎么全,后面再单独梳理吧

前面介绍了,Android程序运行时会根据需要自动产生Binder线程,因此即使上层应用代码不创造任何线程,Android应用进程中还是会有多个线程在运行。

这就会不可避免地遇到资源竞争问题。Linux下的线程的运行模式是抢占式,因此,对于共享资源访问,需要使用系统提供的同步机制来保证线程安全。

虽然同步机制能解决资源访问的冲突问题,但也不可避免地带来了性能上的损失。因此,在不影响正确性的前提下,应对尽量避免使用同步机制

原子操作

对于简单类型的全局变量进行操作时,即使是一些简单的操作,如加减法等,在汇编级别上也需要多条指令才能完成。

整个操作过程的完成需要:

  • 先读取内存中的值
  • 在CPU中计算
  • 然后写回内存

如果中间发生了线程切换并改变了内存中的值,这样最后的执行结果就会偏离预期。这种情况我们可以增加来解决;但避免这种问题发生的最好办法就是使用原子操作

Android 中用汇编语言实现了一套原子操作函数,这些函数在同步机制的实现中被广泛使用

Android 的原子操作函数

这部分函数在system/core/libcutils/include/atomic.h中定义

先来看一下和原子变量操作相关的函数:

// 原子变量的自增操作
int android_atomic_inc(volatile int32_t* addr);
// 原子变量的自减操作
int android_atomic_dec(volatile int32_t* addr);
// 原子变量的加法操作
int android_atomic_add(int32_t value, volatile int32_t* addr);
// 原子变量的与操作
int android_atomic_and(int32_t value, volatile int32_t* addr);
// 原子变量的或操作
int android_atomic_or(int32_t value, volatile int32_t* addr);
// 原子变量的读取操作
int android_atomic_acquire_load(volatile const int32_t* addr);
int android_atomic_release_load(volatile const int32_t* addr);
// 原子变量的设置操作
void android_atomic_acquire_store(int32_t value, volatile int32_t* addr);
void android_atomic_release_store(int32_t value, volatile int32_t* addr);
// 原子变量的比较并交换
int android_atomic_acquire_cas(int32_t oldvalue, int32_t newvalue,volatile int32_t* addr);
int android_atomic_release_cas(int32_t oldvalue, int32_t newvalue,volatile int32_t* addr);
// 两个原子变量的宏定义
#define android_atomic_write android_atomic_release_store
#define android_atomic_cmpxchg android_atomic_release_cas

原子操作的实现方式和CPU架构有着密切的关系,现在的原子操作一般都是在CPI指令级别实现的。这种方式不但简单,而且效率非常高。

内存屏障和编译屏障

背景知识

  1. 现代CPU中指令的执行次序不一定按照开发者编写的顺序执行,没有相关性的指令可以打乱次序执行,以充分利用CPU的指令流水线,提高执行速度。使用内存屏障来对策
  2. 编译器本身也会对指令进行优化。例如,调整指令顺序来利用CPU的指令流水线。使用编译屏障来对策

这些其实就是内存乱序访问内存乱序访问行为出现的理由是为了提升程序运行时的性能。关于乱序执行,我们可以参考wiki百科

但是对于一些对执行顺序有着很严格要求的程序,乱序执行可能会照成灾难性的后果。

这种情况就需要我们做一些特殊处理。

概念描述

也称内存栅栏内存栅障屏障指令等。它使得 CPU编译器在对内存进行操作的时候, 严格按照一定的顺序来执行, 也就是说在memory barrier 之前的指令和memory barrier之后的指令不会由于系统优化等原因而导致乱序

大多数现代计算机为了提高性能而采取乱序执行,这使得内存屏障成为必须。

语义上,内存屏障之前的所有写操作都要写入内存;内存屏障之后的读操作都可以获得同步屏障之前的写操作的结果。因此,对于敏感的程序块,写操作之后、读操作之前可以插入内存屏障