聊一聊 EventBus 源码和设计之禅

9,405 阅读13分钟

前言

笔者看过一些知名开源项目的源码,认为 EventBus 算是其中最简单的,甚至复杂程度不在一个级别上。解析源码前先提一下以下几个变量和类,掌握了这些变量和类基本上 EventBus 已经就掌握一半了。

  • METHOD_CACHEMap<Class<?>, List<SubscriberMethod>> 类型。键为注册类的 Class,值为该类中所有 EventBus 回调的方法链表(也就是被 @Subscribe 标记的方法们)。
  • typesBySubscriberMap<Object, List<Class<?>>> 类型。键为对象本身(例如 Activity 对象),值为该对象中所有的 Event 的类类型。该字段只用于仅用于判断某个对象是否注册过,在日常使用中几乎没什么作用(感谢评论区指出)。
  • Subscription 类(文中称订阅信息):关注类中两个字段,一个是 Object 类型的 subscriber,该字段即为注册的对象(在 Android 中时常为 Activity);另一个是 SubscriberMethod 类型的 subscriberMethod,细节如下:
    • subscriberMethodSubscriberMethod 类型(文中称订阅方法)。关注类中有个字段 eventTypeClass<?> 类型,代表 Event 的类类型。
  • subscribtionsByEventTypeMap<Class<?>, CopyonWriteArrayList<Subscribtion>> 类型。键为 Event 的类类型,值为元素为 Subscription(订阅信息)链表。核心字段。

register()

直接查看 EventBus#register() 源码:

public void register(Object subscriber) {
    Class<?> subscriberClass = subscriber.getClass();
    // 根据当前注册类获取 List<SubscriberMethod>
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
    synchronized (this) {
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
        	// subsciber 对 List<SubscriberMethod> 中每个 SubscriberMethod 进行订阅
            subscribe(subscriber, subscriberMethod);
        }
    }
}

获取当前注册对象所有订阅方法信息

先查看如何根据当前注册类获取 List 的,SubscriberMethodFinder#findSubscriberMethods(Class<?> subscriberClass) 源码精简如下:

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
    List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
    // 如果已存在则返回
    if (subscriberMethods != null) {
        return subscriberMethods;
    }

    subscriberMethods = findUsingReflection(subscriberClass);
    METHOD_CACHE.put(subscriberClass, subscriberMethods);
    
    return subscriberMethods;
}

METHOD_CACHE 前面提到过,是存着注册类与其所有需要回调的 Event 方法列表的键值对。如果已经存在则直接返回,如果否则需要通过 findUsingReflection(subscriberClass) 方法进行查找再返回,当然,返回之前需要存入 METHOD_CACHE 中,否则该 METHOD_CACHE 就没有存在的意义了。

SubscriberMethodFinder#findUsingReflection() 源码如下:

private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
    FindState findState = prepareFindState();
    findState.initForSubscriber(subscriberClass);
    while (findState.clazz != null) {
    	// 通过纯反射去获取被 @Subscribe 所修饰的方法
        findUsingReflectionInSingleClass(findState);
        // 将当前 class 的父类 class 赋值给 findState.clazz 
        findState.moveToSuperclass();
    }
    // 重置 FindState 便于下一次回收利用
    return getMethodsAndRelease(findState);
}    

初始化 FindState 对象后,会进入一个 while 循环中,不停地去反射获取当前类和其父类(注意,在 Java 中,如果当前类实现了一个接口,即使该接口的方法被 @Subscribe 所修饰,当前类中的方法也是不包含该注解属性的,所以如果在接口中对某个方法使用了 @Subscribe 修饰然后让类去实现这个接口是没有任何作用的)的订阅方法并添入列表中,最终返回这个列表并重置 FindState 对象利于下一次重复使用。反射获取当前类和其父类的订阅方法源码简化如下:

private void findUsingReflectionInSingleClass(FindState findState) {
    Method[] methods;
    try {
    	// 返回当前类自身方法和显式重载的父类方法
        methods = findState.clazz.getDeclaredMethods();
    } catch (Throwable th) {
        methods = findState.clazz.getMethods();
        findState.skipSuperClasses = true;
    }
    for (Method method : methods) {
        int modifiers = method.getModifiers();
        if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (parameterTypes.length == 1) {
                Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                if (subscribeAnnotation != null) {
                    Class<?> eventType = parameterTypes[0];
                    // needCheck
                    if (findState.checkAdd(method, eventType)) {
                        ThreadMode threadMode = subscribeAnnotation.threadMode();
                        findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                    }
                }
            }
        }
    }
}

这里想要提及的一点事,获取到 @Subscribe 修饰的目标方法后,并非无脑地添入 subscriberMethods 中,而实际上是需要过滤一遍的,讲解 checkAdd() 源码前,希望读者思考以下几个问题:

  • 对于同一个 Event,当前类对该对象使用了多个方法进行了多次订阅,那么如果该 Event 被发射的时候,当前类会如何调用这些方法?
  • 对于同一个 Event,父类对该对象进行了一次订阅,子类重写该订阅方法,那么如果该 Event 被发射的时候,父类子类当中会如何处理这些方法?

解决这些方法就需要去看看 checkAdd() 的底层实现了——

boolean checkAdd(Method method, Class<?> eventType) {
    Object existing = anyMethodByEventType.put(eventType, method);
    if (existing == null) {
        return true;
    } else {
        return checkAddWithMethodSignature(method, eventType);
    }
}

可以看到 anyMethodByEventType 使用了 Event 的 Class 作为键,这像是意味着一个类对于同一个 Event 只能订阅一次,事实上是不是这样,还得继续看看 checkAddWithMethodSignature(),其源码简化如下:

private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
    methodKeyBuilder.setLength(0);
    methodKeyBuilder.append(method.getName());
    methodKeyBuilder.append('>').append(eventType.getName());

    String methodKey = methodKeyBuilder.toString();
    Class<?> methodClass = method.getDeclaringClass();
    Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
    if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
        return true;
    } else {
        subscriberClassByMethodKey.put(methodKey, methodClassOld);
        return false;
    }
}

可以看到 subscriberClassByMethodKey 使用方法名 + '>' + 事件类型作为键,这意味着对于同一个类来说,subscriberClassByMethodKey 肯定不会键重复(毕竟一个类中不能够方法名相同且方法参数、个数都相同),因此它最终会返回 true。这意味着一个类如果使用了多个方法对同一个 Event 对象进行注册,那么当该 Event 对象被发射出来的时候,所有的方法都将会得到回调。

但是当父类执行上述操作的时候,如果子类有「显示」实现父类的订阅方法,那么此时 subscriberClassByMethodKey.put(methodKey, methodClass) 返回值不会为空,且为子类的 Class,此时 if 上分支将会判断子类 Class 是否 isAssignableFrom 父类 Class,这肯定是会为 false 的,这将会走入 if 下分支并返回 false。这意味着当子类「显示」实现父类的订阅方法的时候,如果此时发射指定 Event 的话,父类的订阅方法将不会执行,而仅会执行子类的订阅方法。

subscribe()

获取到相应的 SubscriberMethod 链表后,就是对链表中的 SubscriberMethod 对象进行订阅了,EventBus#subscribe() 方法源码精简如下:

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    Class<?> eventType = subscriberMethod.eventType;
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    }

    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
        	// 根据 priority 大小放入 List 中
            subscriptions.add(i, newSubscription);
            break;
        }
    }

    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    subscribedEvents.add(eventType);

    // 省略 sticky 事件
}

subscriptionsByEventType 根据 Event 事件类类型获取订阅信息链表,当然,如果没有的话那就 new 一个并放入其中。接着根据订阅方法的优先级塞入该链表中。最后 typesBySubscriber 获取该 subsciber 的所有 Event 事件类型链表,并添加当前 Event 事件类型。关于 sticky 事件的具体内容在 sticky 中会具体讲解。

至此 EventBus#register(Object) 方法算是结束了。

post()

EventBus#post(Object) 源码精简如下:

public void post(Object event) {
    PostingThreadState postingState = currentPostingThreadState.get();
    List<Object> eventQueue = postingState.eventQueue;
    eventQueue.add(event);

	// 确保不会被调用多次
    if (!postingState.isPosting) {
        postingState.isMainThread = isMainThread();
        postingState.isPosting = true;
        try {
            while (!eventQueue.isEmpty()) {
            	// 分发 Event 事件
                postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
        	// 最后要 reset flag
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}

currentPostingThreadState 是一个 ThreadLocal 类,通过它获取到 PostingThreadState 对象,再根据该对象获取到 event 链表(有没有联想到 Android 中的消息机制?),并将传入的 event 塞入该链表。为了控制 Event 出队列不会被调用多次,PostingThreadState 对象有一个 isPosting 来标记当前链表是否已经开始进行回调操作,通过源码可以看到,每次分发完一个 Event 事件,该事件也会被从链表中 remove 出去。

postSingleEvent()

具体 postSingleEvent() 源码精简如下:

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    Class<?> eventClass = event.getClass();
    postSingleEventForEventType(event, postingState, eventClass);
}

追溯 EventBus#postSingleEventForEventType() 源码精简如下:

private void postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {
        subscriptions = subscriptionsByEventType.get(eventClass);
    }
    if (subscriptions != null && !subscriptions.isEmpty()) {
        for (Subscription subscription : subscriptions) {
            postingState.event = event;
            postingState.subscription = subscription;
            try {
                postToSubscription(subscription, event, postingState.isMainThread);
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
        }
    }
}

通过 subscriptionsByEventType 获取该 Event 事件对应的订阅信息链表,然后将该订阅信息Event 和当前线程信息传给了 postToSubscription() 方法,该方法戳进去一看就知道是用来去回调所有订阅方法的,该方法的具体分析在 threadMode 中。实际上到这里 post() 流程就算是结束了。所以实际上核心方法 post() 的源码是十分简单的,也可以看得到,核心字段也仅有 subscriptionsByEventType 一个而已。

unregister()

EventBus#unregister(Object) 方法源码精简如下:

public synchronized void unregister(Object subscriber) {
    List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
    if (subscribedTypes != null) {
        for (Class<?> eventType : subscribedTypes) {
            unsubscribeByEventType(subscriber, eventType);
        }
        typesBySubscriber.remove(subscriber);
    }
}

整体看来分两步走,一步是移除注册对象和其所有 Event 事件链表,即 typesBySubscriber 移除相关键值对的;再就是在 unsubscribeByEventType() 方法中对 subscriptionsByEventType 移除了该 subscriber 的所有订阅信息(可以看到实际上没有对 METHOD_CACHE 进行相关移除操作,便于下一次注册的时候可以很方便拿到之前的信息,这便是缓存的作用所在)。

threadMode

在 EventBus 中,共有四种 threadMode,如下:

public enum ThreadMode {
    POSTING,

    MAIN,

    MAIN_ORDERED,

    BACKGROUND,

    ASYNC
}

  • POSTING:接收事件方法应执行在发射事件方法所在的线程(由于发射事件方法线程可能是主线程,这意味着接收方法不能执行耗时操作,否则会阻塞主线程)
  • MAIN:在 Android 中则接收事件方法应执行在主线程,否则(在 Java 项目中)等同于 POSTING。如果发射事件方法已位于主线程,那么接收事件方法会被「立即」调用(这意味着接收事件方法不能执行耗时操作,否则会阻塞主线程;同时,由于是「立即」调用,所以发射事件方法此时是会被接收事件方法所阻塞的),否则等同于 MAIN_ORDERED
  • MAIN_ORDERED:在 Android 中则接收事件方法会被扔进 MessageQueue 中等待执行(这意味着发射事件方法是不会被阻塞的),否则(在 Java 项目中)等同于 POSTING
  • BACKGROUND
    • 在 Android 中
      • 发射事件方法在主线程中执行,则接收事件方法应执行在子线程执行,但该子线程是 EventBus 维护的单一子线程,所以为了避免影响到其他接收事件方法的执行,该方法不应太耗时避免该子线程阻塞。
      • 发射事件方法在子线程中执行,则接收事件方法应执行在发射事件方法所在的线程。
    • 在 Java 项目中,接收事件方法会始终执行在 EventBus 维护的单一子线程中。
  • ASYNC:接收方法应执行在不同于发射事件方法所在的另一个线程。常用于耗时操作,例如网络访问。当然,尽量避免在同一个时间大量触发此类型方法,尽管 EventBus 为此专门创建了线程池来管理回收利用这些线程。

关于以上 threadMode 哪几种应避免耗时操作,耗时时阻塞的是哪条线程,希望各位读者能够仔细阅读。

说完几种 threadMode 之后,再来看看前文遗留下来的问题——postToSubscription() 源码如下:

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case MAIN_ORDERED:
            if (mainThreadPoster != null) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                // temporary: technically not correct as poster not decoupled from subscriber
                invokeSubscriber(subscription, event);
            }
            break;
        case BACKGROUND:
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}

细看源码,其实可以发现只用到了两种方法,一种是 invokeSubscriber 意味着立即调用该方法,另一种是 xxxPoster.enqueue() 意味着需要使用其他线程来执行该方法。

invokeSubscriber()

源码如下:

void invokeSubscriber(Subscription subscription, Object event) {
    try {
    	//纯反射
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}

实在是简单粗暴直接通俗易懂,笔者佩服。

那么那些情况会使用 invokeSubscriber() 方法呢?

  • POSTING:不用说,既然和发射事件线程同一条线程执行,那么当然直接调用 invokeSubscriber() 即可。
  • MAIN:在确保发射事件线程是主线程的情况下,直接调用 invokeSubscriber()
  • MAIN_ORDERED:如果当前项目不是 Android 项目情况下(纯 Java 项目),将会直接调用 invokeSubscriber()
  • BACKGROUND:前面提到如果发射事件线程不是主线程的话,接收事件将会执行于发射事件所在的线程,所以也会直接调用 invokeSubscriber()

文中已多次提到 Android 项目和纯 Java 项目,是由于在 Java 项目中大部分情况下不需要特地区分主线程和子线程(这一点笔者也得到了女票的证实)。其实不仅是 EventBus,RxJava 也是如此,RxJava 中是没有 Schedulers.mainThread() 一说的,仅有 Schedulers.trampoline() 表当前线程。

Poster#enqueue()

根据源码可以看出来分为以下三种:

这里写图片描述

HandlerPoster 源码不在此扩展了,熟悉 Android 的读者们应该都猜得到 HandlerPoster 底层实现肯定是通过 Handler 机制来实现的,HandlerPoster#enqueue() 方法的实现离不开 Hanlder#sendMessage(),而处理方式肯定就是在 Hanlder#handleMessage() 中去调用 invokeSubscriber()

BackgroundPoster 源码也不在此扩展了,前面提到 EventBus 会维护单一线程去执行接收事件方法,所以肯定会在 Runnable#run() 中去调用 invokeSubscriber()

AsyncPoster 的底层实现实际上与 BackgroundPoster 大同小异,但是有读者会疑惑了,BackgroundPoster 底层维护的是「单一」线程,而 AsyncPoster 肯定不是这样的啊。这里的细节留到设计技巧一节再来细说。

sticky

什么叫做 sticky 事件笔者此处就不做扩展了。项目中如果想要发射 sticky 事件需要通过 EventBus#postSticky() 方式,源码如下:

public void postSticky(Object event) {
    synchronized (stickyEvents) {
        stickyEvents.put(event.getClass(), event);
    }
    post(event);
}

可以看到第一步是将该事件放入 stickyEvents 中,第二步则是正常 post()。为避免多线程操作 postSticky(Object)removeStickyEvent(Class<?>) 引发的冲突,所以对 stickyEvents 对象添加了 synchronized 关键字,不得不说 EventBus 作者的设计实在是缜密啊。前文提到 EventBus#register() 中关于 sticky 事件的代码简化如下:

if (subscriberMethod.sticky) {
    Object stickyEvent = stickyEvents.get(eventType);
    if (stickyEvent != null) {
        postToSubscription(newSubscription, stickyEvent, isMainThread());
    }
}

可以看到,没有什么特殊的地方,判断当前事件是否 sticky,如果 sticky 则从 stickyEvents 拿出该事件并执行 postToSubscription() 方法。

优化操作

eventInheritance

不知道各位读者在日常使用 EventBus 中会不会在 Event 之间存在继承关系,反正笔者是没这样用过。也正是存在笔者这种不会这样使用 Event 和会使用 Event 继承的开发者之间的矛盾才会有这个字段出现。全局搜索该字段仅用于发射事件的时候判断是否需要发射父类事件,由于该字段默认为 true,所以如果各位读者和笔者一样在项目开发中 Event 不存在继承关系的话,可以将该字段设为 false 以提高性能。

APT

EventBus 内部使用了大量的反射去寻找接收事件方法,实际上有经验的小伙伴知道可以使用 APT 来优化。这也就是 EventBus 3.0 引入的技术,此处的使用便不在此处扩展了,代码中通过 ignoreGeneratedIndex 来判断是否使用生成的 APT 代码去优化寻找接收事件的过程,如果开启了的话,那么将会通过 subscriberInfoIndexes 来快速得到接收事件方法的相关信息。所以各位读者如果没有在项目中接入 EventBus 的 APT,那么可以将 ignoreGeneratedIndex 设为 false 提高性能。

设计技巧

反射方法

EventBus 在获取接收事件方法的信息中,通过 getDeclaredMethods() 来获取类中所有方法而并不是通过 getMethods(),由于前者只反射当前类的方法(不包括隐式继承的父类方法),所以前者的效率较后者更高些。

FindState

以下代码是 FindState 的获取:

private FindState prepareFindState() {
    synchronized (FIND_STATE_POOL) {
        for (int i = 0; i < POOL_SIZE; i++) {
            FindState state = FIND_STATE_POOL[i];
            if (state != null) {
                FIND_STATE_POOL[i] = null;
                return state;
            }
        }
    }
    return new FindState();
}

以下代码是 FindState 的回收复用:

private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
    List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
    findState.recycle();
    synchronized (FIND_STATE_POOL) {
        for (int i = 0; i < POOL_SIZE; i++) {
            if (FIND_STATE_POOL[i] == null) {
                FIND_STATE_POOL[i] = findState;
                break;
            }
        }
    }
    return subscriberMethods;
}

可以看到,EventBus 使用 FindState 并不是简单的 new,由于 FindState 在注册流程中使用频繁且创建耗费资源,故创建 FindState 池复用 FindState 对象,与此相同的还有 PendingPost,它用于反射调用接收事件方法,具体不在此扩展。

AsyncPoster、BackgroundPoster

前面提到 AsyncPosterBackgroundPoster 的底层实现是一样的,但是有读者会疑惑了,BackgroundPoster 底层维护的是「单一」线程,而 AsyncPoster 肯定不是这样的啊——笔者也是读了源码之后才发现被 EventBus 作者摆了一道——在默认情况下实际上两者底层维护的都是 Executors.newCachedThreadPool(),这是一个有则用、无则创建、无数量上限的线程池。而 BackgroundPoster 是如何控制「单一」的呢?其在 Executor#execute() 上添加了 synchronized 并设立 flag,保证任一时间只且仅能有一个任务会被线程池执行;而 AsyncPoster 只需无脑地将传来的任务塞入线程池即可。

后记

EventBus 源码虽简单,但是当中的很多设计技巧是非常值得学习的,例如前文提到的复用池,以及遍布 EventBus 源码各处的 synchronized 关键字。希望各位读者也能够深入到其中去探索一番,寻找到笔者未找到的宝藏。

另外,我建了一个群,如果你对文章有什么疑问,或者想和我讨论 Android 技术,欢迎入群哈。