EventBus3.0源码解析

1,558 阅读20分钟

EventBus 是一个用于组件间通信的框架。它为开发提供一种非常简便的方式来是实现组件间解耦通信,并且提供了线程切换、优先级设置等功能。

EventBus示意图

从官方的示意图中不难看出,EventBus使用的是观察者模式Subscriber注册到EventBus, 当Publisher使用post方法将Event发送给EventBusEventBus就会回调SubscriberonEvent方法。观察者模式能将观察者和订阅者最大程度的解耦,这也是EventBus的功能所在。

具体用法就不多说了,具体可见官方主页github.com/greenrobot/…

本文解析使用的EventBus版本是3.0.0

进行源码分析之前,先思考一下,如果是自己,会如何实现?

首先,我们需要将注册的类、声明的订阅方法,以及方法执行的线程、优先级都保存下来; 其次,我们要可以根据接收的事件去快速查找到对应的订阅者,当有消息通知时,可以高效通知; 再者,我们还需要自己去处理线程间的切换以满足不同的应用场景; 最后,我们应该提供注销功能以便取消消息订阅。

带着思路,一起来看看源码是如何一步一步实现的。本文根据我们使用EventBus的步骤来进行讲解的。

注解@Subscribe

在声明订阅方法时,要求使用 @Subscribe注解,先来看下它的具体定义

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
    ThreadMode threadMode() default ThreadMode.POSTING;

    boolean sticky() default false;

    int priority() default 0;
}

先来看三个元注解:

@Documented :生成java文档时,会将该注解也写进文档里

@Retention(RetentionPolicy.RUNTIME):有效周期是在运行时

@Target({ElementType.METHOD}):指定对方法有效。

注解里声明了三个成员变量:

  • threadMode(订阅方法执行的线程)
  • sticky(是否是粘性事件)
  • priority(优先级)。

ThreadMode是一个枚举类,定义了四种线程模式:

public enum ThreadMode {
    POSTING,
    MAIN,
    BACKGROUND,
    ASYNC
}

POSTING: 和发送事件的线程在同一个线程,避免了线程切换开销。

MAIN:订阅在主线程,事件将排队等待交付(非阻塞)。使用此模式的订阅者必须快速返回,以避免阻塞主线程。

BACKGROUND:如果是在主线程发布,则会订阅在一个后台线程,依次排队执行;如果不是在主线程发布,则会订阅在发布所在的线程。

ASYNC: 在非主线程和发布线程中订阅。当处理事件的方法 是耗时的,需要使用此模式。尽量避免同时触发大量的耗时较长的异步操作,EventBus 使用线程池高效的复用已经完成异步操作的线程。

EventBus 之所以要求在订阅方法上加上@Subscribe注解,就是相当于给订阅者打标签,框架根据注解去找到订阅者。

注册

先来看看注册的过程

public void register(Object subscriber) {

    //拿到订阅者的运行时类型Class
    Class<?> subscriberClass = subscriber.getClass();
    
    //利用订阅者的Class去查找类中声明的订阅方法
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        
    //循环遍历逐个将订阅者和订阅方法订阅到EventBus
    synchronized (this) {
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
        }
    }
}

主要有三步,第一步拿到订阅者的类对象,第二步通过订阅者类对象找到类中的所有订阅方法,第三步进行订阅。

查找订阅方法

在上述第二步使用了一个SubscriberMethodFinder实例来进行方法查找。SubscriberMethodFinder这个类是专门用来查找订阅方法的,findSubscriberMethods()最后返回了一个SubscriberMethod集合。SubscriberMethod类则就是对我们声明的订阅方法和参数的封装。可以先略过。

接着看findSubscriberMethods()是如何通过订阅者的Class对象来进行方法查找的。

   List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
   
        //从缓存中查找,如果已经有,则直接返回。
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }

        //如果缓存中没有查找到,就通过订阅者Class对象进行查找。
        if (ignoreGeneratedIndex) {
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            // 使用apt提前解析的订阅者信息
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        
        // 将查找到的方法存入缓存并返回
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }

这里做了个缓存处理METHOD_CACHE,因为查找是比较耗时的操作,缓存可以提高效率。

private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();

METHOD_CACHE是一个以订阅者Class为Key, 订阅方法集合为Value的线程安全的的HashMap

第一次执行肯定是没有缓存的,然后会根据ignoreGeneratedIndex来执行不同的方法。从方法名来看,一个是使用反射去查找,另一个是使用已有的信息去查找。

其实这里就是3.0.0引入的优化点:3.0.0引入了APT(Annotation Processing Tool),它可以在编译阶段就提前解析注解,提前检索到订阅方法。这样就提高了运行时效率。

ignoreGeneratedIndex这个值默认是false,因为反射开销大,所以默认是走findUsingInfo()分支,但是在findUsingInfo()方法中会检查本地是否有apt预先解析出的订阅者信息,如果没有,还是会执行反射方法findUsingReflectionInSingleClass()

启动apt的部分涉及到了注解处理器,所以会单独起一篇来讲解,先来看看不使用apt的情况:

    private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
        //准备一个FindeState实例
        FindState findState = prepareFindState();
        //将订阅者Class与FindeState实例关联
        findState.initForSubscriber(subscriberClass);
        
        //从子类到父类去逐一查找订阅方法
        while (findState.clazz != null) {
           //使用反射查找一个类的订阅方法 findUsingReflectionInSingleClass(findState);
           //将父类赋给findState.clazz,往上进行查找
            findState.moveToSuperclass();
        }
        //返回订阅方法集合并回收FindState实例
        return getMethodsAndRelease(findState);
    }

这里去查找订阅方法,因为子类会继承父类的方法,所以当子类找不到时,需要去查找父类。这里使用了一个while循环递归进行查找。查找过程使用了一个新的对象FindState,它是用来存储查找过程中的一些信息,方便进行迭代查找。它的类定义:

    static class FindState {
        //订阅方法集合
        final List<SubscriberMethod> subscriberMethods = new ArrayList<>();
        final Map<Class, Object> anyMethodByEventType = new HashMap<>();
        final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
        final StringBuilder methodKeyBuilder = new StringBuilder(128);

        //当前订阅者
        Class<?> subscriberClass;
        //当前查找的类
        Class<?> clazz;
        //是否跳过父类查找
        boolean skipSuperClasses;
        SubscriberInfo subscriberInfo;
    }

接着往下走,进入真正开始使用反射解析一个类的订阅方法:findUsingReflectionInSingleClass()

private void findUsingReflectionInSingleClass(FindState findState) {
        Method[] methods;
        try {
            // 只获取非继承的方法,这个方法比getMethods()方法效率高,比如对一个Activity来说。 findState.clazz.getDeclaredMethods();
        } catch (Throwable th) {
            // 异常则直接获取所有方法(包括继承的), 这样就不用再检查父类了。
            methods = findState.clazz.getMethods();
            findState.skipSuperClasses = true;
        }
        // 遍历所有方法筛选出订阅方法
        for (Method method : methods) {
            int modifiers = method.getModifiers();
            // 必须是public的、非静态、非抽象的方法
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                //获取到所有参数的类型
                Class<?>[] parameterTypes = method.getParameterTypes();
                // 只能有一个参数
                if (parameterTypes.length == 1) {
                    //获取@Subscribe注解的方法
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    if (subscribeAnnotation != null) {
                        Class<?> eventType = parameterTypes[0];
                        //检查该eventType是否已订阅了,通常订阅者不能有多个 eventType 相同的订阅方法
                        if (findState.checkAdd(method, eventType)) {
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            //将订阅方法和对应的接收的Event类型以及注解参数保存
                            findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                        }
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException("@Subscribe method " + methodName +
                            "must have exactly 1 parameter but has " + parameterTypes.length);
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException(methodName +
                        " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
            }
        }
    }

这个方法主要是通过修饰符、参数个数、指定注解和是否有EventType相同的方法这几层筛选,最终将订阅方法添加进 findStatesubscriberMethods 这个 List 中。

其中重点有一个判断:findState.checkAdd(),这个方法决定了是否订阅方法可以被保存下来进而能接收到消息。一起来看看它是如何判断的:

private boolean checkAdd(Method method, Class<?> eventType) {
    // 1.检查eventType是否已经注册过对应的方法(一般都没有) 
    Object existing = anyMethodByEventType.put(eventType, method);
    if (existing == null) {
        return true;
    } else {
        // 2. 如果已经有方法注册了这个eventType
        if (existing instanceof Method) {
            if (!checkAddWithMethodSignature((Method) existing, eventType)) {
                throw new IllegalStateException();
            }
            anyMethodByEventType.put(eventType, this);
        }
        return checkAddWithMethodSignature(method, eventType);
    }
}

进行了两种检查,第一种是判断当前类中是否已经有这个EventType和对应的订阅方法,一般一个类不会有对同一个EventType写多个方法,会直接返回true,进行保存。

但是如果出现了同一个类中同样的EventType写了多个方法,该如何处理?

还有当findUsingReflection()中进行下一轮循环,会进行父类查找,如果子类继承了父类的订阅方法,又该如何处理呢?

答案就在上边的注释2。关键点就是checkAddWithMethodSignature()方法:

private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
    // 以[方法名>eventType]为Key
    methodKeyBuilder.setLength(0);                                                   
    methodKeyBuilder.append(method.getName());                                       
    methodKeyBuilder.append('>').append(eventType.getName());
    String methodKey = methodKeyBuilder.toString();                  
    
    // 拿到新的订阅方法所属类
    Class<?> methodClass = method.getDeclaringClass();                               
    Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
    
    if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {    
        // methodClassOld == null或者 methodClassOld是methodClass的父类/同一个类     
        return true;                                                                 
    } else {                      
        // Revert the put, old class is further down the class hierarchy             
        subscriberClassByMethodKey.put(methodKey, methodClassOld);                   
        return false;                                                                
    }                                                                                
}

对于同一类中同样的EventType写了多个方法,因为方法名不同,所以[方法名>eventType]Key不同,methodClassOld会为null,直接返回 true。所以这种情况会将所有相同EventType的方法都进行保存。

对于子类重写父类方法的情况,则methodClassOld(即子类)不为null,并且methodClassOld也不是methodClass的父类,所以会返回false。即对于子类重写父类订阅方法,只会保存子类的订阅方法,忽略父类的订阅方法。

至此,findState的查找任务就结束了,通过循环向父类查找,将订阅者的订阅方法都保存在了其内部变量subscriberMethods列表中。

最后,跳出循环,回到findUsingReflection()方法中,最后返回了 getMethodsAndRelease(findState)

    private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
        List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
        findState.recycle();
        synchronized (FIND_STATE_POOL) {
            for (int i = 0; i < POOL_SIZE; i++) {
                if (FIND_STATE_POOL[i] == null) {
                    FIND_STATE_POOL[i] = findState;
                    break;
                }
            }
        }
        return subscriberMethods;
    }

很好理解,就是将findState中的subscriberMethods取出并返回。可以看到作者还是很细心的,将使用完的findState实例置空恢复后又放回实例池中,将实例回收利用,节省了新的开销。

再回到findSubscriberMethods()方法中,将查找的方法最后都存进了内存缓存METHOD_CACHE中, 对应关系是订阅类和它的订阅方法:

METHOD_CACHE.put(subscriberClass, subscriberMethods);

到这里,查找订阅方法就结束了。

订阅(subscribe())

回到register()方法中的第三步,对上一步查找到的订阅方法集合进行了遍历调用subscribe()方法。来看看subscribe()方法做了什么事:

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        Class<?> eventType = subscriberMethod.eventType;
        // 实例一个Subscription对象,内部持有了订阅者和订阅方法
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        // subscriptionsByEventType是以EventType为Key,以它对应Subscription集合的Map
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }

        // 根据优先级排序Subscription
        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }

        // typesBySubscriber保存了订阅者对应的所有EventType
        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        subscribedEvents.add(eventType);

        // 粘性订阅方法要立即处理
        if (subscriberMethod.sticky) {
            // 默认为true
            if (eventInheritance) {
                // 看当前EventType是否是已有的stickyEvent的子类或父类
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        // 立即投递事件
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }

这里其实只做了三件事:

  1. 将订阅者和订阅方法封装到subscriptionsByEventType,它可以根据EventType拿到所有的Subscription对象,Subscription对象中就有订阅者和订阅方法。这样当有EventType消息过来时,可以快速的传递给订阅者的订阅方法。
  2. 将订阅者和订阅方法封装到typesBySubscriber,它可以根据订阅类拿到所有的EventType。这样当我们调用调用 unregister(this) 时,就可以拿到EventType,又根据EventType拿到所有订阅者和方法,进行解绑了。
  3. 如果当前订阅方法是粘性方法,则立即去查找是否有本地事件,有的话就立即投递。

至此,我们的注册就完成了。可以看到某些方法栈的调用还是非常深的,但是整体流程却很简单。这也是值得我们学习的地方。

注销

注册对应的就是注销,当我们的订阅者不再需要接收事件时,可以调用unregister进行注销:

public synchronized void unregister(Object subscriber) {
        // 通过订阅者拿到它订阅的所有的订阅事件类型
        List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
            // 遍历事件类型集合,根据事件类型解绑
            for (Class<?> eventType : subscribedTypes) {
                unsubscribeByEventType(subscriber, eventType);
            }
            // 从记录中移除订阅者
            typesBySubscriber.remove(subscriber);
        } else {
            Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
        }
    }

代码很简单,继续看unsubscribeByEventType

private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
        // 根据事件类型拿到所有的Subscription
        List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions != null) {
            // 遍历所有Subscription,符合解除条件的进行remove
            int size = subscriptions.size();
            for (int i = 0; i < size; i++) {
                Subscription subscription = subscriptions.get(i);
                if (subscription.subscriber == subscriber) {
                    subscription.active = false;
                    subscriptions.remove(i);
                    i--;
                    size--;
                }
            }
        }
    }

相比注册流程,注销流程就非常简单了,就是把对应的注册者和对应的注册信息从记录中移除即可。

发送事件

注册完毕后,我们的订阅者和订阅方法都被记录在了EventBus里,这时就可以给订阅者们发送事件了。EventBus提供了两种发送方法post()postSticky()post()发送的是非粘性的事件,postSticky()发送的是粘性事件。

post

先来看看post()发送:

public void post(Object event) {
        // 从当前线程中取出PostingThreadState
        PostingThreadState postingState = currentPostingThreadState.get();
        // 拿到EventType队列
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);

        // 当前线程是否有消息正在投递
        if (!postingState.isPosting) {
            postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
            //正在投递
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                // 循环一个个进行投递
                while (!eventQueue.isEmpty()) {
                    // 投递出去就remove掉
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                // 所有消息都投递完成
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

第一步使用ThreadLocal,是因为ThreadLocal保证了数据只对当前线程可见,其他线程是不可见的,这样的话当我们从不同的线程中去取数据,数据相当于是分开保存,设置和读取就会比较快。从当前线程中取出的是一个PostingThreadState:

    final static class PostingThreadState {
        final List<Object> eventQueue = new ArrayList<Object>();
        boolean isPosting;
        boolean isMainThread;
        Subscription subscription;
        Object event;
        boolean canceled;
    }

PostingThreadState 主要包含了当前线程的Event队列、订阅者信息、事件等数据。接下来就是循环调用postSingleEvent()方法进行投送了:

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        // 如果是可继承的事件
        if (eventInheritance) {
            // 查找Event的所有父类、接口类以及父类的接口类(Event的父类和接口也是Event)
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                // 根据查找到的所有Class(Event),逐个寻找订阅者,进行分发event
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            // 如果不是可继承的事件,则直接对
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        // 如果没有找到订阅者,报异常
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                Log.d(TAG, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }

这个方法的作用很简单,就是做了个分支处理:如果是可继承的事件,则查找到它的所有父事件,然后再往下分发。继续看postSingleEventForEventType()方法:

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        // 可能同时在多个线程同时发送Event,subscriptionsByEventType是共有常量,所以需要加锁
        synchronized (this) {
            // 根据Event的类型拿到所有订阅者
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {
            for (Subscription subscription : subscriptions) {
                // 事件还是Event,但是可能会分发到订阅它父类的订阅者中
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    // 逐个通知订阅者
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }

这个方法和上一个方法的职责也很单一,就是查找所有订阅者,然后遍历进行通知。查找用的是一个Map: postToSubscription,就是在订阅时生成的一个[EventType -> List<Subscription>] Map, 这样我们就可以根据EventType查找到所有订阅者。接着看postToSubscription()

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        // 根据订阅线程模式进行switch
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                // 直接调用在本线程
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    // 如果就在主线程,则直接调用
                    invokeSubscriber(subscription, event);
                } else {
                    // 如果不在主线程,则使用mainThreadPoster
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    // 如果在主线程,使用backgroundPoster
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    // 如果不在主线程,则直接在当前线程调用
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                // 启动新的线程调用,asyncPoster
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

这里就是EventBus的线程切换了,主要有POSTINGMAINBACKGROUNDASYNC四种模式,四种模式在开篇已经介绍过了。这里涉及到了invokeSubscriber()方法和mainThreadPosterbackgroundPosterasyncPoster三个poster,接下来我们就分别来看下:

invokeSubscriber()

    void invokeSubscriber(Subscription subscription, Object event) {
        try {
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }

invokeSubscriber()就是直接在当前线程调用了订阅者的method对象,这里调用了反射类Method的方法invoke()直接调用执行。

mainThreadPoster

mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);

mainThreadPoster是个自定义的类HandlerPoster,它的目的是在主线程中调用订阅方法,而EventBus使用的就是我们熟悉的Handler

final class HandlerPoster extends Handler {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        // 用来最后调用方法
        this.eventBus = eventBus;
        // 最大处理时间
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        // 一个待处理消息的队列
        queue = new PendingPostQueue();
    }
    
    ...
    ...
}

HandlerPoster是自定义的Handler,发送消息使用的是Looper.getMainLooper()即主线程的Handler。 内部定义了一个最大处理消息时间,默认是10毫秒,所以说我们一定不要在订阅方法中做耗时操作。还维护了一个PendingPostQueue,它是自定义的一个链表队列,这里猜测HandlerPoster可能是自己维护了消息队列,来看下入队方法:

    void enqueue(Subscription subscription, Object event) {
        // 获取一个PendingPost实例
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            // 入队
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                // 主线程的handler发送消息,发送到主线程
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

这里出现了一个新的类PendingPost,它封装了订阅者subscription实例和事件event实例;它内部又维护了一个大小为10000PendingPost(数组集合)池,用来重复利用PendingPost实例。然后将PendingPost实例入队到上一步说的PendingPostQueue队列中。接着使用主线程的Handler发送一个消息。接下来就是在handleMessage()中如何处理消息了:

@Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                // 从队列中取出一个pendingPost
                PendingPost pendingPost = queue.poll();
                // 如果队列里的消息处理完毕,就直接跳出循环。
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                // 调用订阅方法并会回收pendingPost
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                // 如果方法的执行时间超过最大执行时间(默认10毫秒)
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }

这里就是使用了while循环,不断从队列中去取PendingPost处理,但是加了个最大执行时间处理,因为是在主线程调用,所以一旦超时,就退出队列,并重新尝试去再进入队列。

BackgroundPoster

再来看看BackgroundPoster

BackgroundPoster的作用是将UI线程的订阅方法调度在非UI线程中。即它是要执行在新的Thread中的,而开启线程我们最常用的就是Runnable, 来看看源码:

final class BackgroundPoster implements Runnable {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }
    
    ...
    ...
}

果不其然,BackgroundPoster实现了Runnable接口,这样就可以被线程执行。其内部也是维护了EventBus和一个PendingPost队列。

    public void enqueue(Subscription subscription, Object event) {
        // 从消息池中构建一个PendingPost
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            // 入队
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                // 线程池调度执行
                eventBus.getExecutorService().execute(this);
            }
        }
    }

HandlerPoster类似:新建一个新的PendingPost入队,使用了EventBus里的一个ExecutorService,它是对线程池定义的一个接口,来看看它的默认值:

public class EventBusBuilder {
    private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
    
    ...
    ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;
}

熟悉的Executors,创建了一个可缓存的线程池,用来执行BackgroundPoster这个Runnable 对象,再来看看BackgroundPosterrun()方法:

    @Override
    public void run() {
        try {
            try {
                while (true) {
                    // 取出队头
                    PendingPost pendingPost = queue.poll(1000);
                    // 如果队头为空,说明队列里没有消息了,直接退出循环
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    // 调用订阅方法
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }

也和HandlerPoster类似,但是不同之处在于:

  1. 使用了poll(int maxMillisToWait)方法,这个设计很巧妙,当取到最后发现队列为空后,会wait 1000 毫秒,当有有新的信息来临时就会唤醒线程,poll出消息。这样设计就减少了发送消息的次数,节省了资源。
  2. 因为是在子线程执行,所以就没有方法执行时间的限制了。

AsyncPoster

class AsyncPoster implements Runnable {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    AsyncPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        //直接开启新的线程执行
        eventBus.getExecutorService().execute(this);
    }

    @Override
    public void run() {
        //  直接取出消息执行
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(pendingPost);
    }

}

可以看到AsyncPosterBackgroundPoster``非常的相似,因为它们的功能也非常相似。但是不同之处在于:BackgroundPoster是尽可能使用一个后台线层去依次排队执行订阅方法;而AsyncPoster则是每条消息都直接开启新的后台线程立即执行。

至此四个Poster就讲完了,看完是真的爽,不论是从功能抽象到具体细节的把控,EventBus都处理的很好,非常值得学习。

粘性事件

粘性事件这名字一听很耳熟,没错,安卓四大组件之BroadcastReceiver就有一种广播叫做粘性广播(StickyBroadcast),而EventBus也提供了类似的功能:当注册了粘性事件后,立即能收到还没有注册时系统发出的最后一个事件。

    public void postSticky(Object event) {
        // 将粘性事件保存下来
        synchronized (stickyEvents) {
            stickyEvents.put(event.getClass(), event);
        }
        // Should be posted after it is putted, in case the subscriber wants to remove immediately
        post(event);
    }

postSticky()方法用来发送一个粘性事件,在这个方法中,直接将粘性事件保存在了一个Map集合中,而key就是Event的Class对象。接着就调用正常的post()方法了。

那为什么我们后注册的方法也能接收到之前发出的粘性事件呢,答案就在上面提到的注册方法subscribe()中的最后一段:

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    ...
    
    // 如果是粘性事件,则直接发送出去
    if (subscriberMethod.sticky) {
            if (eventInheritance) {
                // 从stickyEvents取出粘性事件的Class对象
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    // 如果订阅的事件是保存的粘性事件Class或它的父类
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        // 取出缓存的Event
                        Object stickyEvent = entry.getValue();
                        // 将缓存的Event发送出去
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
}

在我们注册订阅方法和事件时,如果是粘性事件,就直接会将事件发送给注册了相同Event的订阅者,方法中调用了checkPostStickyEventToSubscription(newSubscription, stickyEvent)方法:

    private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
        if (stickyEvent != null) {
            // If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
            // --> Strange corner case, which we don't take care of here.
            postToSubscription(newSubscription, stickyEvent, Looper.getMainLooper() == Looper.myLooper());
        }
    }

很简单,直接又调用了postToSubscription()方法,根据指定线程分别进行分发。

总结

整体看下来,框架的结构非常的清晰全面,尤其在对方法功能的封装和细节处理上,很是值得学习。尤其在3.0后,EventBus使用了注解处理器在编译阶段就完成了订阅者的解析,这使得框架更佳的轻量和高效。整片源码读下来还是很爽的,没事还要再复习多看几遍,很多东西都值得学习。共勉!