Android进阶1:EventBus 3.0 源码分析

330 阅读17分钟

转载请注明出处:YingKe

综述

EventBus是我们项目中最常用的开源框架之一。对于EventBus的使用方法也非常简单。然而EventBus的内部实现原理也不是很复杂。EventBus是一个Android事件发布/订阅框架,通过解耦发布者和订阅者简化 Android 事件传递

EventBus 3.0图解

对于EventBus的工作过程很简单,用一句话概括:事件被提交到EventBus后进行查找所有订阅该事件的方法并执行这些订阅方法

普及名词

  1. Subscriber :事件订阅者,用于接收事件
  2. onEvent : 发送者在哪个线程发送的,该方法就在哪个线程处理事件,即处理者与发送者同线程
  3. onEventMainThread :不管发送者在哪个线程发送的,该方法都会在主线程处理任务
  4. onEventBackgroundThread : 如果发送者是在子线程发送的,该方法也在同一个子线程处理事件,如果发送者在主线程,该方法在一个线程次中处理事件,即处理事件一定在子线程
  5. onEventAsync : 不管发送者在哪个线程发送的,该方法都在线程池中执行

使用方法

1. 注册订阅者 首先我们需要将我们希望订阅事件的类,通过EventBus类注册,注册代码如下:

//3.0版本的注册
EventBus.getDefault().register(this);
       
//2.x版本的注册
EventBus.getDefault().register(this);
EventBus.getDefault().register(this, 100);
EventBus.getDefault().registerSticky(this, 100);
EventBus.getDefault().registerSticky(this);

可以看到2.x版本中有四种注册方法,区分了普通注册和粘性事件注册,并且在注册时可以选择接收事件的优先级,这里我们就不对2.x版本做过多的研究了.由于3.0版本将粘性事件以及订阅事件的优先级换了一种更好的实现方式,所以3.0版本中的注册就变得简单,只有一个register()方法即可.

2. 编写响应事件订阅方法 注册之后,我们需要编写响应事件的方法,代码如下:

/3.0版本
@Subscribe(threadMode = ThreadMode.BACKGROUND, sticky = true, priority = 100)
public void test(XXXEvent str) {
    
}

//2.x版本
public void onEvent(XXXEvent event) {

}
public void onEventMainThread(XXXEvent str) {

}
public void onEventBackgroundThread(XXXEvent str) {

}

在2.x版本中只有通过onEvent开头的方法会被注册,而且响应事件方法触发的线程通过onEventMainThread或onEventBackgroundThread这些方法名区分, 而在3.0版本中.通过@Subscribe注解,来确定运行的线程threadMode,是否接受粘性事件sticky以及事件优先级priority,而且方法名不在需要onEvent开头,所以又简洁灵活了不少.

3. 发送事件

我们可以通过EventBus的post()方法来发送事件,发送之后就会执行注册过这个事件的对应类的方法.或者通过postSticky()来发送一个粘性事件.在代码是2.x版本和3.0版本是一样的.

EventBus.getDefault().post(new XXXEvent());
EventBus.getDefault().postSticky(new XXXEvent());

4. 解除注册 当我们不在需要接收事件的时候需要解除注册unregister,2.x和3.0的解除注册也是相同的.代码如下:

EventBus.getDefault().unregister(this);

源码分析

1. 类图

EventBus 3.0

2. 创建EventBus 一般情况下我们都是通过EventBus.getDefault()获取到EventBus对象,从而在进行register()或者post()等等,所以我们看看getDefault()方法的实现:

//默认的EventBus 构建者
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
 
//单例
public static EventBus getDefault() {
        if (defaultInstance == null) {
            synchronized (EventBus.class) {
                if (defaultInstance == null) {
                    defaultInstance = new EventBus();
                }
            }
        }
        return defaultInstance;
    }

 /**
     * Creates a new EventBus instance; each instance is a separate scope in which events are delivered. To use a
     * central bus, consider {@link #getDefault()}.
     */
    public EventBus() {
        this(DEFAULT_BUILDER);
    }
    //构造函数
    EventBus(EventBusBuilder builder) {
        //日志
        logger = builder.getLogger();
        //key:订阅的事件类型,value:订阅这个事件的所有订阅者集合 线程安全的  按订阅事件分类
        subscriptionsByEventType = new HashMap<>();
        //key:订阅者对象,value:这个订阅者订阅的事件集合  按订阅者分类
        typesBySubscriber = new HashMap<>();
        //粘性事件 key:粘性事件的class对象, value:事件对象
        stickyEvents = new ConcurrentHashMap<>();
        //事件主线程处理
        mainThreadSupport = builder.getMainThreadSupport();
        mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
        //事件 Background 处理
        backgroundPoster = new BackgroundPoster(this);
        //事件异步线程处理
        asyncPoster = new AsyncPoster(this);
        
        indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
        
        //订阅者响应函数信息存储和查找类  这个比较重要
        //默认情况下 subscriberInfoIndexes == null 和ignoreGeneratedIndex == false 后面查找Method时会用
        subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
                builder.strictMethodVerification, builder.ignoreGeneratedIndex);
                
        logSubscriberExceptions = builder.logSubscriberExceptions;
        logNoSubscriberMessages = builder.logNoSubscriberMessages;
        sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
        sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
        throwSubscriberException = builder.throwSubscriberException;
        eventInheritance = builder.eventInheritance;
        executorService = builder.executorService;
    }


3. 注册过程源码分析

3.1 register()方法的实现

3.0的注册只提供一个register()方法了,所以我们先来看看register()方法做了什么:



    /**
     * 注册一个事件订阅者来接收事件。当订阅者不在对接收事件感兴趣,必须调用{@link #unregister(Object)}
     * <p/>
     * Subscribers have event handling methods that must be annotated by {@link Subscribe}.
     * The {@link Subscribe} annotation also allows configuration like {@link
     * ThreadMode} and priority.
     * 订阅者处理事件的方法 必须使用注解@Subscribe,并且允许配置ThreadMode和priority
     */
    public void register(Object subscriber) {
        //首先获得订阅者的class对象
        Class<?> subscriberClass = subscriber.getClass();
        
        //通过subscriberMethodFinder来找到订阅者订阅了哪些事件.返回一个SubscriberMethod对象的List,SubscriberMethod
        //里包含了这个方法的Method对象,以及将来响应订阅是在哪个线程的ThreadMode,以及订阅的事件类型eventType,以及订阅的优
        //先级priority,以及是否接收粘性sticky事件的boolean值.
        
        //订阅者可以订阅多个Event事件
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        
        synchronized (this) {
            //遍历 订阅
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }
    //订阅  主要作用 注册到subscriptionsByEventType 和typesBySubscriber 两个集合中
    // Must be called in synchronized block
    private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
       //订阅事件的类型class
        Class<?> eventType = subscriberMethod.eventType;
        //对subscriber和subscriberMethod的封装
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        //根据事件类型,获取对该事件类型感兴趣的所有订阅者
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }
        //遍历 对该事件类型感兴趣的 所有订阅者
        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            //1,插在末尾,2,插在优先级的位置,subscriptions按优先级顺序的
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }
        //根据该订阅者,获取它已经订阅的所有事件类型
        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        //把当前事件加进去,多一个
        subscribedEvents.add(eventType);
         
        //如果事件是粘性的
        if (subscriberMethod.sticky) {
            if (eventInheritance) {
                // Existing sticky events of all subclasses of eventType have to be considered.
                // Note: Iterating over all events may be inefficient with lots of sticky events,
                // thus data structure should be changed to allow a more efficient lookup
                // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
                
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }

    
    
    //订阅者Method
    public class SubscriberMethod {
        //处理事件方法的Method对象
        final Method method;
        //响应订阅是在哪个线程的ThreadMode,
        final ThreadMode threadMode;
        //订阅的事件类型eventType
        final Class<?> eventType;
        //优先级
        final int priority;
        //是否粘性
        final boolean sticky;
        /** Used for efficient comparison */
        String methodString;
   }
   //订阅
   final class Subscription {
        //订阅者
        final Object subscriber;
        //订阅者Method
        final SubscriberMethod subscriberMethod;
    }
    
    

可以看到register()方法很简洁,代码里的注释也很清楚了,我们可以看出通过subscriberMethodFinder.findSubscriberMethods(subscriberClass)方法就能返回一个SubscriberMethod的对象,而SubscriberMethod里包含了所有我们需要的接下来执行subscribe()的信息.所以我们先去看看findSubscriberMethods()是怎么实现的

3.2 SubscriberMethodFinder的实现

findSubscriberMethods()到底是如何实现的:

 //缓存方法 列表
 private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();

 List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        //缓存了当前类的方法信息 列表
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        
        if (subscriberMethods != null) {
            //之前有缓存 的就返回
            return subscriberMethods;
        }
        //根据前面EvenBus实例化 一般为false
        if (ignoreGeneratedIndex) {
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            //获取订阅Method
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            //放入缓存并返回
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }
    

看看findUsingInfo方法的实现:

  private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
        //从缓存中得到一个FindState对象
        FindState findState = prepareFindState();
        // 初始化刚才得到的findState
        findState.initForSubscriber(subscriberClass);
        
        while (findState.clazz != null) {
             //获取subscriberInfo 存入findState中,一般为null
            findState.subscriberInfo = getSubscriberInfo(findState);
            //一般null
            if (findState.subscriberInfo != null) {
                SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                for (SubscriberMethod subscriberMethod : array) {
                    if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                        findState.subscriberMethods.add(subscriberMethod);
                    }
                }
            } else {
                //在一个class钟使用反射 获取Method
                findUsingReflectionInSingleClass(findState);
            }
            findState.moveToSuperclass();
        }
        //获取Method并且释放
        return getMethodsAndRelease(findState);
    }
    
    
   
    //缓存了4个FindState对象
    private static final int POOL_SIZE = 4;
    private static final FindState[] FIND_STATE_POOL = new FindState[POOL_SIZE];
    //从缓存中拿
    private FindState prepareFindState() {
        synchronized (FIND_STATE_POOL) {
            for (int i = 0; i < POOL_SIZE; i++) {
                FindState state = FIND_STATE_POOL[i];
                //不为null,就返回
                if (state != null) {
                    FIND_STATE_POOL[i] = null;
                    return state;
                }
            }
        }
        //没有就new个新的
        return new FindState();
    }
      //FindState类
      static class FindState {
        // 申明fianl 避免重新分配内存
        //订阅的Method的列表
        final List<SubscriberMethod> subscriberMethods = new ArrayList<>();
        //根据事件类型分类 Method
        final Map<Class, Object> anyMethodByEventType = new HashMap<>();
        //根据Method分类
        final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
        final StringBuilder methodKeyBuilder = new StringBuilder(128);

        //订阅者类型
        Class<?> subscriberClass;
        //订阅者类型
        Class<?> clazz;
        //跳过父类 一般为false
        boolean skipSuperClasses;
        //订阅者info,一般为null
        SubscriberInfo subscriberInfo;

        void initForSubscriber(Class<?> subscriberClass) {
            this.subscriberClass = clazz = subscriberClass;
            skipSuperClasses = false;
            subscriberInfo = null;
        }
      }
    

//在一个class中使用反射 获取Method,,subscriberMethods都保存在findState中
 private void findUsingReflectionInSingleClass(FindState findState) {
        //Method数组
        Method[] methods;
        try {
            // This is faster than getMethods, especially when subscribers are fat classes like Activities
            //获取订阅者 已经定义的方法
            methods = findState.clazz.getDeclaredMethods();
            
        } catch (Throwable th) {
            // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
            methods = findState.clazz.getMethods();
            findState.skipSuperClasses = true;
        }
        //遍历每个方法
        for (Method method : methods) {
            //标识符 public/private/protect
            int modifiers = method.getModifiers();
            
            //订阅方法 一定是public,non-static, and non-abstract
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                
                //方法 的参数类型
                Class<?>[] parameterTypes = method.getParameterTypes();
                
                //参数只能为1个
                if (parameterTypes.length == 1) {
                    
                    //method获得 Subscribe注解
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    if (subscribeAnnotation != null) {
                        // 如果是被Subscribe注解修饰的方法,第一个参数类型就是 订阅者关注的事件类型eventType
                        Class<?> eventType = parameterTypes[0];
                        //把这个eventType和method加入findState中
                        if (findState.checkAdd(method, eventType)) {
                            //注解对象的threadMode 变量
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            //findState 中添加subscriberMethod 
                            findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                        }
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException("@Subscribe method " + methodName +
                            "must have exactly 1 parameter but has " + parameterTypes.length);
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException(methodName +
                        " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
            }
        }
    }
    
    
     //从findState中获取subscriberMethods并且释放
     private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
        //findState的subscriberMethods
        List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
        
        //findState对象回收,其实是清空那4个fianl 集合,避免重建,开销大
        findState.recycle();
        //缓存起来
        synchronized (FIND_STATE_POOL) {
            for (int i = 0; i < POOL_SIZE; i++) {
                if (FIND_STATE_POOL[i] == null) {
                    FIND_STATE_POOL[i] = findState;
                    break;
                }
            }
        }
        //返回
        return subscriberMethods;
    }


以上就是所有注册过程,现在再来看这张图就会特别清晰EventBus的register()过程了:

register 流程

3.3 事件分发过程源码分析

通过第二节我们知道可以通过EventBus.getDefault().post(XXXEvent event);来发送一个事件,所以我们就从这行代码开始分析,首先看看post()方法是如何实现的:


    /** Posts the given event to the event bus. */
    public void post(Object event) {
        //得到当前线程的Posting状态.  postingState是 当前调用线程保存的变量
        PostingThreadState postingState = currentPostingThreadState.get();
        //当前线程里 保存的事件集合
        List<Object> eventQueue = postingState.eventQueue;
        //把要发送的事件 保存在事件队列中
        eventQueue.add(event);
         
         //如果当前线程的发送状态postingState  没有在发送中就发送,那么其他的event就会直接进队列
        if (!postingState.isPosting) {
            //当前发送线程 是否主线程
            postingState.isMainThread = isMainThread();
            //正在发送中
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                //循环发送 单个事件
                while (!eventQueue.isEmpty()) {
                    // 发送单个事件 
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                //把当前线程 维护的发送状态 初始化
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

首先是通过currentPostingThreadState.get()方法来得到当前线程PostingThreadState的对象,为什么是说当前线程我们来看看currentPostingThreadState的实现:

private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
        @Override
        protected PostingThreadState initialValue() {
            return new PostingThreadState();
        }
    };

currentPostingThreadState的实现是一个包含了PostingThreadState的ThreadLocal对象,关于ThreadLocal:ThreadLocal 是一个线程内部的数据存储类,通过它可以在指定的线程中存储数据, 而这段数据是不会与其他线程共享的。其内部原理是通过生成一个它包裹的泛型对象的数组,在不同的线程会有不同的数组索引值,通过这样就可以做到每个线程通过get()方法获取的时候,取到的只能是自己线程所对应的数据。 看一下PostingThreadState:

 /** For ThreadLocal, much faster to set (and get multiple values). */
     //线程中 保存的数据状态
    final static class PostingThreadState {
        //线程保存的 event队列
        final List<Object> eventQueue = new ArrayList<>();
        //正在发送中
        boolean isPosting;
        //post发送线程 是否主线程
        boolean isMainThread;
        
        //event 的订阅者和订阅Method
        Subscription subscription;
        //event对象
        Object event;
        //是否被取消
        boolean canceled;
    }

接下来我们来看postSingleEvent()方法:

// 发送单个事件
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        //事件event的class
        Class<?> eventClass = event.getClass();
        
        //是否发现subscription 标志位
        boolean subscriptionFound = false;
        //这个一般为true  见EventBus初始化,EventBusBuilder里默认为true
        if (eventInheritance) {
            //找到当前的event的所有 父类和实现的接口 的class集合
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            //遍历每个event/event的父类
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                //发送单个事件 对单个event的class
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }

我们来看一下lookupAllEventTypes:

 /** Looks up all Class objects including super classes and interfaces. Should also work for interfaces. */
    
    // 找当前发送event的 所有父类和实现的接口 的class集合 放入缓存
    private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
        synchronized (eventTypesCache) {
            List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
            //如果是空 新添加
            if (eventTypes == null) {
                eventTypes = new ArrayList<>();
                Class<?> clazz = eventClass;
                //循环添加
                while (clazz != null) {
                    //添加当前 eventClass
                    eventTypes.add(clazz);
                    //递归添加 当前eventClass 的接口 class
                    addInterfaces(eventTypes, clazz.getInterfaces());
                    
                    //再去找当前eventClass 的父类
                    clazz = clazz.getSuperclass();
                }
                eventTypesCache.put(eventClass, eventTypes);
            }
            return eventTypes;
        }
    }

    /** Recurses through super interfaces. */
    //递归添加 当前eventClass 的接口 class
    static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
        for (Class<?> interfaceClass : interfaces) {
            if (!eventTypes.contains(interfaceClass)) {
                eventTypes.add(interfaceClass);
                addInterfaces(eventTypes, interfaceClass.getInterfaces());
            }
        }
    }

以上说明 当发送post某个事件Event时,连同该Event的父类Event 也一起post了。

最后看下postSingleEventForEventType


   //针对某个具体的event 发送单个事件
    private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            //当前事件event的所有订阅者+订阅Method
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        
        if (subscriptions != null && !subscriptions.isEmpty()) {
            //遍历所有订阅者 
            for (Subscription subscription : subscriptions) {
            
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    //真正发送去执行
                    postToSubscription(subscription, event, postingState.isMainThread);
                    
                
                    aborted = postingState.canceled;
                } finally {
                    //把postingState 重置
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }

 //根据订阅方法的线程模式  执行订阅方法
 private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                //默认的 表示在执行 Post 操作的线程直接调用订阅者的事件响应方法
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    //如果post的线程是主线程 ,就在主线程执行响应方法
                    invokeSubscriber(subscription, event);
                } else {
                    //如果post线程 不是主线程,就添加到mainThreadPoster队列,在主线程执行响应方法
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED://优先主线程
            
                if (mainThreadPoster != null) {
                    //优先主线程
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    //mainThreadPoster 等于null 就在post线程中 继续执行响应方法
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    //如果post线程是主线程 则加入backgroundPoster 的队列,在线程池中执行
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    //如果post线程是子线程,在子线程中继续执行响应方法
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                //无论post线程在主线程还是子线程,都在子线程中执行响应方法
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

总结上面的代码就是,首先从subscriptionsByEventType里获得所有订阅了这个事件的Subscription列表,然后在通过postToSubscription()方法来分发事件,在postToSubscription()通过不同的threadMode在不同的线程里invoke()订阅者的方法,ThreadMode共有四类:

  • PostThread:默认的 ThreadMode,表示在执行 Post 操作的线程直接调用订阅者的事件响应方法,不论该线程是否为主线程(UI 线程)。当该线程为主线程时,响应方法中不能有耗时操作,否则有卡主线程的风险。适用场景:对于是否在主线程执行无要求,但若 Post 线程为主线程,不能耗时的操作;

  • MainThread在主线程中执行响应方法。如果发布线程就是主线程,则直接调用订阅者的事件响应方法,否则通过主线程的 Handler 发送消息在主线程中处理——调用订阅者的事件响应函数。显然,MainThread类的方法也不能有耗时操作,以避免卡主线程。适用场景:必须在主线程执行的操作;

  • BackgroundThread在后台线程中执行响应方法。如果发布线程不是主线程,则直接调用订阅者的事件响应函数,否则启动唯一的后台线程去处理。由于后台线程是唯一的,当事件超过一个的时候,它们会被放在队列中依次执行,因此该类响应方法虽然没有PostThread类和MainThread类方法对性能敏感,但最好不要有重度耗时的操作或太频繁的轻度耗时操作,以造成其他操作等待。适用场景:操作轻微耗时且不会过于频繁,即一般的耗时操作都可以放在这里;

  • Async不论发布线程是否为主线程,都使用一个空闲线程来处理。和BackgroundThread不同的是,Async类的所有线程是相互独立的,因此不会出现卡线程的问题。适用场景:长耗时操作,例如网络访问。

我们来看上面到底是怎么进行线程切换,在主线程,子线程执行响应函数的,即看下面代码实现:

 //切换到主线程 执行响应函数
 mainThreadPoster.enqueue(subscription, event);
 //切换到子线程 执行响应函数
 backgroundPoster.enqueue(subscription, event);

在EvnetBus初始化时:

        
 mainThreadSupport = builder.getMainThreadSupport();
 mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
 backgroundPoster = new BackgroundPoster(this);
        
 //接着看mainThreadSupport.createPoster(this)
  //主线程支持接口
  public interface MainThreadSupport {

    boolean isMainThread();

    Poster createPoster(EventBus eventBus);

    class AndroidHandlerMainThreadSupport implements MainThreadSupport {
        //这个传进来的是mainlooper
        private final Looper looper;

        public AndroidHandlerMainThreadSupport(Looper looper) {
            this.looper = looper;
        }

        @Override
        public boolean isMainThread() {
            return looper == Looper.myLooper();
        }

        @Override
        public Poster createPoster(EventBus eventBus) {
            //new了一个HandlerPoster对象
            return new HandlerPoster(eventBus, looper, 10);
        }
    }

}      
    //看一下HandlerPoster,即mainThreadPoster 就是一个HandlerPoster 对象
    
    public class HandlerPoster extends Handler implements Poster {
    //未执行post的队列
    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        //新建了一个队列
        queue = new PendingPostQueue();
    }

    //这就是上面那个mainThreadPoster.enqueue(subscription, event);
    
    public void enqueue(Subscription subscription, Object event) {
        //获得一个要执行的post
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        
        synchronized (this) {
            //加入队列
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                 //然后发送一个Message ,就在就到主线程去处理了  就是下面的handleMessage
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            //死循环 处理队列
            while (true) {
                //队列是null、退出
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                //真正的处理响应方法  通过反射
                eventBus.invokeSubscriber(pendingPost);
                
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
        
  //看看BackgroundPoster
  
/**
 * Posts events in background.
 * 发送一个事件到子线程
 
 * @author Markus
 */
final class BackgroundPoster implements Runnable, Poster {
    //还是有一个队列
    private final PendingPostQueue queue;
    //eventbus
    private final EventBus eventBus;
    
    //是否线程正在执行
    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        //新建一个队列
        queue = new PendingPostQueue();
    }
    //加入队列  就是前面的 backgroundPoster.enqueue(subscription, event);
    
    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            //获取一个post、加入队列
            queue.enqueue(pendingPost);
            
            //如果线程没有正在执行,就加入线程池执行 就执行下面的run方法
            if (!executorRunning) {
                executorRunning = true;
                eventBus.getExecutorService().execute(this);
            }
        }
    }

    @Override
    public void run() {
        try {
            try {
                
                //死循环 处理post队列
                while (true) {
                    PendingPost pendingPost = queue.poll(1000);
                    //对列是null
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    ////真正的处理响应方法  通过反射
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }

}

        

以上就解释了怎样切换 主线程和子线程中 处理响应函数的。

看一下通过反射执行方法invokeSubscriber



    void invokeSubscriber(Subscription subscription, Object event) {
        try {
            //反射执行方法
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }
    
    //final类 暂未执行的post
final class PendingPost {

    //作为缓存池用的避免重复的创建对象
    private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
    
    //事件对象 event
    Object event;
    //对该event的 订阅者+订阅方法
    Subscription subscription;
    //下一个暂未执行的post
    PendingPost next;

    private PendingPost(Object event, Subscription subscription) {
        this.event = event;
        this.subscription = subscription;
    }

    static PendingPost obtainPendingPost(Subscription subscription, Object event) {
        synchronized (pendingPostPool) {
            int size = pendingPostPool.size();
            if (size > 0) {
                PendingPost pendingPost = pendingPostPool.remove(size - 1);
                pendingPost.event = event;
                pendingPost.subscription = subscription;
                pendingPost.next = null;
                return pendingPost;
            }
        }
        //缓存池里没有对象才会new一个
        return new PendingPost(event, subscription);
    }

    static void releasePendingPost(PendingPost pendingPost) {
        pendingPost.event = null;
        pendingPost.subscription = null;
        pendingPost.next = null;
        synchronized (pendingPostPool) {
            // Don't let the pool grow indefinitely
            if (pendingPostPool.size() < 10000) {
                pendingPostPool.add(pendingPost);
            }
        }
    }

}