EventBus源码解析

9,113 阅读24分钟

前言

最近跟一位前辈聊了一下学习方法,聊了很多,也收获了很多。从交流的过程中前辈送给我一句话:“学以致用,格物致知”。听完之后意识到之前的学习方法有很大的问题,在以后的学习中需要更多的和实践相结合,做到学以致用。各位小伙伴们有什么更好的学习方法吗?欢迎留言交流。

简介

本篇文章将会讲一下EventBus相关的知识,EventBus各位已经都很熟悉了,在日常的开发中也有使用,我们看一下官方是如何介绍EventBus的。
EventBus是适用于AndroidJava的开源库,使用发布者/订阅者模式进行松散耦合。EventBus使中央通信只需几行代码即可解耦类,从而简化了代码,消除了依赖关系并加快了应用程序的开发。

EventBus的优点:

1、简化了组件之间的通信
2、将事件的发送者和接受者分离
3、避免了因为复杂且容易出错的依赖性和生命周期造成的问题
4、体积小
...

EventBus的功能:

1、简单但功能强大: EventBus是一个微型库,具有易于学习的API。但是,通过解耦组件,您的软件体系结构可能会受益匪浅:在使用事件时,订阅者不了解发送者。
2、经过实战测试: EventBus是最常用的Android库之一:成千上万的应用程序使用EventBus,其中包括非常流行的应用程序。超过十亿的应用安装说明了一切。
3、高性能:特别是在Android上,性能至关重要。对EventBus进行了概要分析和优化;可能使其成为同类产品中最快的解决方案。
4、方便的基于注释的API (不牺牲性能):只需将@Subscribe注释放入您的订户方法中。由于注释的建立时间已建立索引,因此EventBus无需在应用程序运行时进行注释反射,这在Android上非常慢。
5、Android主线程传送:与UI交互时,EventBus可以在主线程中传送事件,而不管事件的发布方式如何。
6、后台线程传递:如果您的订户执行长时间运行的任务,EventBus也可以使用后台线程来避免UI阻塞。
7、事件和订阅者继承:在EventBus中,面向对象的范例适用于事件和订阅者类。假设事件类AB的超类。类型B的已发布事件也将被发布给对A感兴趣的订户。类似地,考虑订户类的继承。
8、零配置: 您可以使用代码中任何位置都可用的默认EventBus实例立即开始使用。
9、可配置: 要调整EventBus到您的要求,您可以使用构建器模式来调整其行为。

读了这么多官方文档,可以从里面总结成一句话:EventBus就是牛X,用就完事了,不用你就Out了。

使用

本篇文章使用的是官方最新版本3.1.1
我们按照官方的使用步骤写一个小Demo

第一步:定义事件

public class SayHelloEvent {
    private String message;

    public void sayHellow(String message) {
        this.message = message;
    }
}

这里事件被定义成一个普通类,在类的内部创建一个字段,同时创建一个方法。

第二步:准备订阅者


@Subscribe(threadMode = ThreadMode.MAIN)
public void onMessageEvent(SayHelloEvent event) {
    String message = event.getMessage();
    ...
}

@Override
public void onStart() {
    super.onStart();
    EventBus.getDefault().register(this);
}
 
@Override
public void onStop() {
    EventBus.getDefault().unregister(this);
    super.onStop();
}

这里需要在创建需要订阅者是调用EventBus.getDefault().register(this)方法进行注册,在适当的位置调用EventBus.getDefault().unregister(this)方法进行解注册。同时创建一个接收事件的方法onMessageEvent方法,使用注解@Subscribe进行标示,同时生命事件接收的线程为主线程。

第三步:发送事件

EventBus.getDefault().post(new SayHelloEvent("Hello,EventBus!!!"));

在需要发送事件的地方调用EventBus.getDefault().post方法,将第一步创建的事件对象传入即可。
到这里EventBus的使用方法就算结束了,看起来挺简单的,但是这里面有很多需要注意的地方,别着急,这些都会在代码探究中进行讲解。

知识点准备

在进行源码分析之前,我们要知道EventBus中的三个知识点:角色分配、线程模型和事件类型。

角色分配

Event

事件,它可以是任意类型,EventBus会根据事件的类型进行全局发送。

Subscriber

事件订阅者,也可以称之为事件的接收者。在EventBus3.0之前,使用时必须以OnEvent开头的几种接收方法来接收事件。这些方法分别为:onEventonEventMainThreadonEventBackgroundThreadonEventAsync,但是在3.0之后处理方法可以自定义,但是要以注解@subscribe进行标示,同时指定线程模型,线程模型默认为POSTING的方式。

Publisher

事件的发布者,它可以在任意线程中发布事件。在使用时通常调用EventBus.getDefault()方法获取一个EventBus对象,再通过链式编程调用post()方法进行事件发送。

线程模型

事件模型是指事件订阅者所在线程和发布事件者所在线程的关系。

POSTING

事件的订阅和事件的发布处于同一线程。这是默认设置。事件传递意味着最少的开销,因为它完全避免了线程切换。因此,对于已知在非常短的时间内完成而不需要主线程的简单任务,这是推荐的模式。使用此模式的事件处理程序必须快速返回,以避免阻塞可能是主线程的发布线程。

MAIN

Android上,用户将在Android的主线程(UI线程)中被调用。如果发布线程是主线程,则将直接调用订阅方方法,从而阻塞发布线程。否则,事件将排队等待传递(非阻塞)。使用此模式的订阅服务器必须快速返回,以避免阻塞主线程。如果不在Android上,则行为与POSTING相同。

MAIN_ORDERED

Android上,用户将在Android的主线程(UI线程)中被调用。与{@link#MAIN}不同,事件将始终排队等待传递。这确保了post调用是非阻塞的。

BACKGROUND

Android上,用户将在后台线程中被调用。如果发布线程不是主线程,则将在发布线程中直接调用订阅方方法。如果发布线程是主线程,则EventBus使用单个后台线程,该线程将按顺序传递其所有事件。使用此模式的订阅服务器应尝试快速返回,以避免阻塞后台线程。如果不在Android上,则始终使用后台线程。

ASYNC

订阅服务器将在单独的线程中调用这始终独立于发布线程和主线程。发布事件从不等待使用此模式的订阅服务器方法。如果订户方法的执行可能需要一些时间(例如,用于网络访问),则应使用此模式。避免同时触发大量长时间运行的异步订阅服务器方法来限制并发线程的数量。EventBus使用线程池有效地重用来自已完成的异步订阅服务器通知的线程。

事件类型

普通事件

普通事件是指已有的事件订阅者能够收到事件发送者发送的事件,在事件发送之后注册的事件接收者将无法收到事件。发送普通事件可以调用EventBus.getDefault().post()方法进行发送。

粘性事件

粘性事件是指,不管是在事件发送之前注册的事件接收者还是在事件发送之后注册的事件接收者都能够收到事件。这里于普通事件的区别之处在于事件接收处需要定义事件接收类型,它可以通过@Subscribe(threadMode = xxx, sticky = true)的方式进行声明;在事件发送时需要调用EventBus.getDefault().postSticky()方法进行发送。事件类型默认为普通事件。

事件优先级

订阅者优先级以影响事件传递顺序。在同一传递线程ThreadMode中,优先级较高的订阅者将在优先级较低的其他订阅者之前接收事件。默认优先级为0。注意:优先级不影响具有不同ThreadMode的订阅服务器之间的传递顺序!

源码分析

EventBus.getDefault()创建EventBus对象

在EventBus使用的过程中,不管是注册、解注册或者是发送事件都会用到这个方法,所以我们首先看一下这个方法中都是做了什么。

public class EventBus {
    static volatile EventBus defaultInstance;
    private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
    private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
    private final Map<Object, List<Class<?>>> typesBySubscriber;
        private final Map<Class<?>, Object> stickyEvents;
    ...
    // 1、单例设计模式返回EventBus对象
    public static EventBus getDefault() {
        if (defaultInstance == null) {
            synchronized (EventBus.class) {
                if (defaultInstance == null) {
                    // 2、调用EventBus构造方法
                    defaultInstance = new EventBus();
                }
            }
        }
        return defaultInstance;
    }
    ...
    public EventBus() {
        // 3、调用有参构造方法,传入一个EventBusBuilder对象
        this(DEFAULT_BUILDER);
    }
    
    EventBus(EventBusBuilder builder) {
        //日志
        logger = builder.getLogger();
        //这个集合可以根据事件类型获取订阅者
        //key:事件类型,value:订阅该事件的订阅者集合
        subscriptionsByEventType = new HashMap<>();
        //订阅者所订阅的事件集合
        //key:订阅者,value:该订阅者订阅的事件集合
        typesBySubscriber = new HashMap<>();
        //粘性事件集合
        //key:事件Class对象,value:事件对象
        stickyEvents = new ConcurrentHashMap<>();
        //Android主线程处理事件
        mainThreadSupport = builder.getMainThreadSupport();
        mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
        //Background事件发送者
        backgroundPoster = new BackgroundPoster(this);
        //异步事件发送者
        asyncPoster = new AsyncPoster(this);
        indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
        //订阅者订阅事件查找对象
        subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
                builder.strictMethodVerification, builder.ignoreGeneratedIndex);
        logSubscriberExceptions = builder.logSubscriberExceptions;
        logNoSubscriberMessages = builder.logNoSubscriberMessages;
        sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
        sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
        throwSubscriberException = builder.throwSubscriberException;
        eventInheritance = builder.eventInheritance;
        executorService = builder.executorService;
    }
}

这个方法内部首先通过单例模式创建一个EventBus对象,在创建EventBus时最终会调用它的有参构造函数,传入一个EventBus.Builder对象。在这个有参构造函数内部对属性进行初始化,其中有几个比较重要的属性:subscriptionsByEventTypetypesBySubscriberstickyEventssubscriberMethodFinder,这几个属性的作用已经在注释中给出。

register(Object subscriber)

创建EventBus对象之后需要将事件订阅者在EventBus中进行注册。

public class EventBus {
    /**
     * Registers the given subscriber to receive events. Subscribers must call {@link #unregister(Object)} once they
     * are no longer interested in receiving events.
     * <p/>
     * Subscribers have event handling methods that must be annotated by {@link Subscribe}.
     * The {@link Subscribe} annotation also allows configuration like {@link
     * ThreadMode} and priority.
     */
    public void register(Object subscriber) {
        // 1、通过反射获取到订阅者的Class对象
        Class<?> subscriberClass = subscriber.getClass();
        // 2、通过subscriberMethodFinder对象获取订阅者所订阅事件的集合
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            // 3、遍历集合进行注册
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }
    
    private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        // 4、获取事件类型
        Class<?> eventType = subscriberMethod.eventType;
        // 5、封装Subscription对象
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        // 6、通过事件类型获取该事件的订阅者集合
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        // 7、如果没有订阅者订阅该事件
        if (subscriptions == null) {
            // 创建集合,存入subscriptionsByEventType集合中
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else { // 8、如果有订阅者已经订阅了该事件
            // 判断这些订阅者中是否有重复订阅的现象
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }
        int size = subscriptions.size();
        // 9、遍历该事件的所有订阅者
        for (int i = 0; i <= size; i++) {
            // 按照优先级高低进行插入,如果优先级最低,插入到集合尾部
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }
        
        // 10、获取该事件订阅者订阅的所有事件集合
        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        // 11、将该事件加入到集合中
        subscribedEvents.add(eventType);
        
        // 12、判断该事件是否是粘性事件
        if (subscriberMethod.sticky) {
            if (eventInheritance) { // 13、判断事件的继承性,默认是不可继承
                // 14、获取所有粘性事件并遍历,判断继承关系
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        // 15、调用checkPostStickyEventToSubscription方法
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }
    
    private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
        if (stickyEvent != null) {
            // 16、如果粘性事件不为空
            postToSubscription(newSubscription, stickyEvent, isMainThread());
        }
    }


    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        // 17、根据threadMode的类型去选择是直接反射调用方法,还是将事件插入队列
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    // 18、通过反射的方式调用
                    invokeSubscriber(subscription, event);
                } else {
                    // 19、将粘性事件插入到队列中
                    // 最后还是会调用EventBus.invokeSubscriber(PendingPost pendingPost)方法。
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }
    
    void invokeSubscriber(PendingPost pendingPost) {
        Object event = pendingPost.event;
        Subscription subscription = pendingPost.subscription;
        PendingPost.releasePendingPost(pendingPost);
        if (subscription.active) {
            invokeSubscriber(subscription, event);
        }
    }
    
    void invokeSubscriber(Subscription subscription, Object event) {
        try {
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }
}


public class SubscriberMethod {
    final Method method; // 处理事件的Method对象
    final ThreadMode threadMode; //线程模型
    final Class<?> eventType; //事件类型
    final int priority; //事件优先级
    final boolean sticky; //是否是粘性事件
    String methodString;
}

final class Subscription {
    final Object subscriber;
    final SubscriberMethod subscriberMethod;
}

还记得上面代码注释2处通过调用subscriberMethodFinder.findSubscriberMethods(subscriberClass)方法,获取到了订阅者所有的订阅事件集合。

class SubscriberMethodFinder {
    private static final int MODIFIERS_IGNORE = Modifier.ABSTRACT | Modifier.STATIC | BRIDGE | SYNTHETIC;
    private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
    private static final int POOL_SIZE = 4;
    private static final FindState[] FIND_STATE_POOL = new FindState[POOL_SIZE];
    
    List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        // 1、先从之前缓存的集合中获取
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            // 2、如果之前缓存了,直接返回
            return subscriberMethods;
        }

        if (ignoreGeneratedIndex) { //ignoreGeneratedIndex一般为false
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            // 3、获取所有订阅方法集合
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            // 4、放入缓存集合中
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }
    
    private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
        // 5、从数组中获取FindState对象
        // 如果有直接返回,如果没有创建一个新的FindState对象
        FindState findState = prepareFindState();
        // 6、根据事件订阅者初始化findState
        findState.initForSubscriber(subscriberClass);
        while (findState.clazz != null) {
            // 7、获取subscriberInfo,初始化为null
            findState.subscriberInfo = getSubscriberInfo(findState);
            if (findState.subscriberInfo != null) {
                SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                for (SubscriberMethod subscriberMethod : array) {
                    if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                        findState.subscriberMethods.add(subscriberMethod);
                    }
                }
            } else {
                // 8、通过反射的方式获取订阅者中的Method
                findUsingReflectionInSingleClass(findState);
            }
            findState.moveToSuperclass();
        }
        return getMethodsAndRelease(findState);
    }
    
    private FindState prepareFindState() {
        synchronized (FIND_STATE_POOL) {
            for (int i = 0; i < POOL_SIZE; i++) {
                FindState state = FIND_STATE_POOL[i];
                if (state != null) {
                    FIND_STATE_POOL[i] = null;
                    return state;
                }
            }
        }
        return new FindState();
    }
    
    private void findUsingReflectionInSingleClass(FindState findState) {
        Method[] methods;
        try {
            // 9、订阅者中所有声明的方法,放入数组中
            methods = findState.clazz.getDeclaredMethods();
        } catch (Throwable th) {
            // 10、获取订阅者中声明的public方法,设置跳过父类
            methods = findState.clazz.getMethods();
            findState.skipSuperClasses = true;
        }
        // 遍历这些方法
        for (Method method : methods) {
            // 11、获取方法的修饰符:public、private等等
            int modifiers = method.getModifiers();
            // 12、订阅方法为public同时不是abstract、static
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                // 13、方法参数类型数组
                Class<?>[] parameterTypes = method.getParameterTypes();
                if (parameterTypes.length == 1) {
                    // 14、获取方法的注解
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    // 15、如果有注解
                    if (subscribeAnnotation != null) {
                        Class<?> eventType = parameterTypes[0];
                        // 16、将method和eventType放入到findState进行检查
                        if (findState.checkAdd(method, eventType)) {
                            // 17、获取注解中的threadMode对象
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            // 18、新建一个SubscriberMethod对象,同时加入到findState中
                            findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                        }
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException("@Subscribe method " + methodName +
                            "must have exactly 1 parameter but has " + parameterTypes.length);
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException(methodName +
                        " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
            }
        }
    }
    
    // 从findState中获取订阅者所有方法并释放
    private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
        // 获取订阅者所有订阅方法集合
        List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
        // findState进行回收
        findState.recycle();
        synchronized (FIND_STATE_POOL) {
            for (int i = 0; i < POOL_SIZE; i++) {
                if (FIND_STATE_POOL[i] == null) {
                    FIND_STATE_POOL[i] = findState;
                    break;
                }
            }
        }
        // 返回集合
        return subscriberMethods;
    }
}

到这里,注册的流程差不多就走完了,可以稍作休息,放松一下。

小结

在订阅者注册的过程中主要进行了以下几个步骤,详细的步骤都在代码的注释中,大家可以自行查看。

1、根据单例设计模式创建一个EventBus对象,同时创建一个EventBus.Builder对象对EventBus进行初始化,其中有三个比较重要的集合和一个SubscriberMethodFinder对象。
2、调用register方法,首先通过反射获取到订阅者的Class对象。
3、通过SubscriberMethodFinder对象获取订阅者中所有订阅的事件集合,它先从缓存中获取,如果缓存中有,直接返回;如果缓存中没有,通过反射的方式去遍历订阅者内部被注解的方法,将这些方法放入到集合中进行返回。
4、遍历第三步获取的集合,将订阅者和事件进行绑定。
5、在绑定之后会判断绑定的事件是否是粘性事件,如果是粘性事件,直接调用postToSubscription方法,将之前发送的粘性事件发送给订阅者。其实这也很好理解,在讲粘性事件时说过,如果在粘性事件发送之前注册的订阅者,当发送粘性事件时,会接收到该事件;如果是粘性事件发送之后注册的订阅者,同样也能接收到事件,原因就在这里。

EventBus.getDefault().post(Object event)

public class EventBus {
    ...
    public void post(Object event) {
        // 1、获取当前线程的PostingThreadState,这是一个ThreadLocal对象
        PostingThreadState postingState = currentPostingThreadState.get();
        // 2、当前线程的事件集合
        List<Object> eventQueue = postingState.eventQueue;
        // 3、将要发送的事件加入到集合中
        eventQueue.add(event);

        // 查看是否正在发送事件
        if (!postingState.isPosting) {
            // 判断是否是主线程
            postingState.isMainThread = isMainThread();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                // 4、只要事件集合中还有事件,就一直发送
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }
    
    // currentPostingThreadState是包含了PostingThreadState的ThreadLocal对象
    // ThreadLocal是一个线程内部的数据存储类,通过它可以在指定的线程中存储数据, 并且线程之间的数据是相互独立的。
    // 其内部通过创建一个它包裹的泛型对象的数组,不同的线程对应不同的数组索引,每个线程通过get方法获取对应的线程数据。
    private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
        @Override
        protected PostingThreadState initialValue() {
            return new PostingThreadState();
        }
    };
    
    // 每个线程中存储的数据
    final static class PostingThreadState {
        final List<Object> eventQueue = new ArrayList<>(); // 线程的事件队列
        boolean isPosting; //是否正在发送中
        boolean isMainThread; //是否在主线程中发送
        Subscription subscription; //事件订阅者和订阅事件的封装
        Object event; //事件对象
        boolean canceled; //是否被取消发送
    }
    
    ...
    
    private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        // 5、获取事件的Class对象
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        if (eventInheritance) { // eventInheritance一般为true
            // 6、 找到当前的event的所有 父类和实现的接口 的class集合
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                // 7、遍历集合发送单个事件
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }
    
    private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
        synchronized (eventTypesCache) {
            // 获取事件集合
            List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
            if (eventTypes == null) { //如果为空
                eventTypes = new ArrayList<>();
                Class<?> clazz = eventClass;
                while (clazz != null) {
                    eventTypes.add(clazz); //添加事件
                    addInterfaces(eventTypes, clazz.getInterfaces()); //添加当前事件的接口class
                    clazz = clazz.getSuperclass();// 获取当前事件的父类
                }
                eventTypesCache.put(eventClass, eventTypes);
            }
            return eventTypes;
        }
    }
    
    //循环添加当前事件的接口class
    static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
        for (Class<?> interfaceClass : interfaces) {
            if (!eventTypes.contains(interfaceClass)) {
                eventTypes.add(interfaceClass);
                addInterfaces(eventTypes, interfaceClass.getInterfaces());
            }
        }
    }

    private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            // 8、根据事件获取所有订阅它的订阅者
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {
            // 9、遍历集合
            for (Subscription subscription : subscriptions) {
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    // 10、将事件发送给订阅者
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    // 11、重置postingState
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }
    
    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        // 12、根据订阅方法的线程模式调用订阅方法
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING: //默认类型,表示发送事件操作直接调用订阅者的响应方法,不需要进行线程间的切换
                invokeSubscriber(subscription, event);
                break;
            case MAIN: //主线程,表示订阅者的响应方法在主线程进行接收事件
                if (isMainThread) { //如果发送者在主线程
                    invokeSubscriber(subscription, event);//直接调用订阅者的响应方法
                } else { //如果事件的发送者不是主线程
                    //添加到mainThreadPoster的队列中去,在主线程中调用响应方法
                    mainThreadPoster.enqueue(subscription, event); 
                }
                break;
            case MAIN_ORDERED:// 主线程优先模式
                if (mainThreadPoster != null) {
                    
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    //如果不是主线程就在消息发送者的线程中进行调用响应方法
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    // 如果事件发送者在主线程,加入到backgroundPoster的队列中,在线程池中调用响应方法
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    // 如果不是主线程,在事件发送者所在的线程调用响应方法
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                //这里没有进行线程的判断,也就是说不管是不是在主线程中,都会在子线程中调用响应方法
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }
    ...
}

小结

有关发送事件的分析先到这里,我们先来小结一下事件发送都是做了什么。

1、获取当前线程的事件集合,将要发送的事件加入到集合中。
2、通过循环,只要事件集合中还有事件,就一直发送。
3、获取事件的Class对象,找到当前的event的所有父类和实现的接口的class集合。遍历这个集合,调用发送单个事件的方法进行发送。
4、根据事件获取所有订阅它的订阅者集合,遍历集合,将事件发送给订阅者。
5、发送给订阅者时,根据订阅方法的线程模式调用订阅方法,如果需要线程切换,则切换线程进行调用;否则,直接调用。

有关线程切换的问题会放到后面的下文中做一个单独的主题进行探究。

EventBus.getDefault().postSticky(Object event)

接下来我们来看一下发送粘性事件。

public class EventBus {
    public void postSticky(Object event) {
        synchronized (stickyEvents) {
            // 1、将事件添加到粘性事件集合中
            stickyEvents.put(event.getClass(), event);
        }
        // 2、发送事件
        post(event);
    }
}

小结

发送粘性事件做了两件事:

1、将粘性事件加入到EventBus对象的粘性事件集合中,当有新的订阅者进入后,如果该订阅者订阅了该粘性事件,可以直接发送给订阅者。
2、将粘性事件发送给已有的事件订阅者。

EventBus.getDefault().unregister(Object subscriber)

前面已经把订阅者注册和消息的发送进行了探究,接下来我们再来看一下解注册的方法。

public class EventBus {
    ...
    public synchronized void unregister(Object subscriber) {
        // 1、获取订阅者订阅的所有事件
        List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
            // 2、遍历集合
            for (Class<?> eventType : subscribedTypes) {
                // 3、将该订阅者的从订阅该事件的所有订阅者集合中移除
                unsubscribeByEventType(subscriber, eventType);
            }
            // 4、将订阅者从集合中移除
            typesBySubscriber.remove(subscriber);
        } else {
            logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
        }
    }
    
    private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
        // 获取该事件的所有订阅者
        List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions != null) {
            int size = subscriptions.size();
            // 遍历集合
            for (int i = 0; i < size; i++) {
                Subscription subscription = subscriptions.get(i);
                // 将订阅者从集合中移除
                if (subscription.subscriber == subscriber) {
                    subscription.active = false;
                    subscriptions.remove(i);
                    i--;
                    size--;
                }
            }
        }
    }
    ...
}

小结

解注册一共就做了两件事:

1、获取订阅者的所有订阅方法,遍历这些方法。然后拿到每个方法对应的所有订阅者集合,将订阅者从集合中移除。
2、移除订阅者中所有的订阅方法。

EventBus是如何进行线程间切换的?

由于事件的发送牵扯到前程的切换,在平时的面试中被提到的几率会比较大,所以作为一个单独的主题来进行探究。

    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

这个方法中根据ThreadMode类型来调用订阅者的订阅方法,有关ThreadMode的解释已经在前面给出,如果不太清楚的可以看一下。
这个方法中涉及到的前程切换的地方有两个:mainThreadPosterbackgroundPoster,有关ASYNC模式下是如何工作的,这里也会进行探究。

mainThreadPoster.enqueue

mainThreadPosterEventBus进行初始化时创建的,我们来看一下。

    ...
    private final MainThreadSupport mainThreadSupport;
    private final Poster mainThreadPoster;
    ...
    EventBus(EventBusBuilder builder) {
        ...
        // 1、创建mainThreadSupport
        mainThreadSupport = builder.getMainThreadSupport();
        // 2、创建mainThreadPoster
        mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
        ...
    }

这里创建了两个对象:mainThreadSupport和mainThreadPoster,我们先来看一下mainThreadSupport是什么。

    MainThreadSupport getMainThreadSupport() {
        if (mainThreadSupport != null) {
            return mainThreadSupport;
        } else if (Logger.AndroidLogger.isAndroidLogAvailable()) {
            // 获取主线程Looper对象,这里又可能为空
            Object looperOrNull = getAndroidMainLooperOrNull();
            // 根据主线程Looper对象返回AndroidHandlerMainThreadSupport对象
            return looperOrNull == null ? null :
                    new MainThreadSupport.AndroidHandlerMainThreadSupport((Looper) looperOrNull);
        } else {
            return null;
        }
    }
    
    Object getAndroidMainLooperOrNull() {
        try {
            // 返回主线程Looper
            return Looper.getMainLooper();
        } catch (RuntimeException e) {
            return null;
        }
    }
    
public interface MainThreadSupport {
    boolean isMainThread();
    Poster createPoster(EventBus eventBus);
    
    class AndroidHandlerMainThreadSupport implements MainThreadSupport {
        private final Looper looper;
        public AndroidHandlerMainThreadSupport(Looper looper) {
            // 根据外部传入的looper对象进行本地初始化
            this.looper = looper;
        }

        @Override
        public boolean isMainThread() {
            // 判断是否为主线程
            return looper == Looper.myLooper();
        }

        @Override
        public Poster createPoster(EventBus eventBus) {
            // 当调用createPoster方法时创建一个HandlerPoster对象
            return new HandlerPoster(eventBus, looper, 10);
        }
    }
}

我们看到创建mainThreadSupport对象实际上是根据主线程的Looper对象来创建的,MainThreadSupport实际上是一个接口,所以这里返回的是它的实现类AndroidHandlerMainThreadSupport对象。
接下来让我们看一下mainThreadPoster对象,在创建mainThreadPoster对象时,调用了mainThreadSupport.createPoster方法,由于MainThreadSupport是一个接口,所以实际上是调用了它的实现类对象的createPoster方法,在方法的内部创建了一个HandlerPoster对象并返回,我们看一下HandlerPoster

//这个类继承自Handler并且实现了Poster接口
public class HandlerPoster extends Handler implements Poster {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        //这里传入的是主线程的Looper对象,所以这个Handler对象是主线程的Handler
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        //创建一个事件队列
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        //根据传入的参数封装一个PendingPost对象
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            //将PendingPost加入到队列中
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                // 调用sendMessage,发送事件回到主线程
                // 最终会调用下面的handleMessage方法
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            //无限循环
            while (true) {
                PendingPost pendingPost = queue.poll();
                //进行两次检查,确保pendingPost不为null,如果为null直接跳出循环
                if (pendingPost == null) {
                    synchronized (this) {
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                //使用反射的方法调用订阅者的订阅方法。
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}

mainThreadPoster.enqueue我们已经探究完毕,其实还是运行了Handler机制。

backgroundPoster.enqueue

backgroundPoster还是在EventBus初始化时一并初始化

public class EventBus {
    ...
    private final BackgroundPoster backgroundPoster;
    //线程池对象
    private final ExecutorService executorService;
    EventBus(EventBusBuilder builder) {
        ...
        backgroundPoster = new BackgroundPoster(this);
        ...
    }
    ...
}

public class EventBusBuilder {
    // 创建线程池
    private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
    
    ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;
}

public class Executors {
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
}

这里同时初始化了一个线程池对象,这个线程池会在后面用到。 看一下BackgroundPoster的内部结构。

final class BackgroundPoster implements Runnable, Poster {
    private final PendingPostQueue queue;
    private final EventBus eventBus;
    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        //初始化队列
        queue = new PendingPostQueue();
    }
    
    public void enqueue(Subscription subscription, Object event) {
        //封装PendingPost对象
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            //将PendingPost对象加入到队列中
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                //这里使用到之前初始化的线程池
                eventBus.getExecutorService().execute(this);
            }
        }
    }
    
    //线程池的执行回调
    @Override
    public void run() {
        try {
            try {
                //无限循环
                while (true) {
                    //获取队列中的PendingPost,进行双重检查
                    //如果为null直接返回,结束循环
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    //使用反射的方式调用订阅者的订阅方法
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }
}

从上面的代码中我们可以看到backgroundPoster.enqueue实际上是使用了线程池的方式。

asyncPoster.enqueue

首先我们还是看一下asyncPoster的内部结构。

class AsyncPoster implements Runnable, Poster {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    AsyncPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }
    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        eventBus.getExecutorService().execute(this);
    }

    @Override
    public void run() {
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(pendingPost);
    }
}

从它的内部结构来看有一种似曾相识的感觉,没错它的内部结构相当于BackgroundPoster的简化版,但是原理还是一样的,也是使用了线程池。

总结

到这里EventBus的探究算是告一段落,关键步骤都在文中代码的注释中给出,如果有哪里写的不对的地方,恳请大家批评指正,感谢。最后给大家留一个问题:EventBus有哪些不足之处?

参考资料

EventBus官网