EvenBus源码分析

987 阅读9分钟

第一次在平台写博客,之前只是在公众号上写,虽然没人看但是自娱自乐的坚持了一段时间。想想毕竟自己也翻过资料看过别人的博客才写出来的应该也有一些价值,不管有没有人看也应该写一写为开发社区贡献一点见解,毕竟开源的目的不是汲取而是分享。

正文

1、基本使用


  • 自定义一个事件类,用于传递数据
public static class MessageEvent { /* Additional fields if needed */ }
  • 在需要订阅事件的地方注册事件
 @Override
 public void onStart() {     
 super.onStart();    
  EventBus.getDefault().register(this);
 } 
  
 @Override
 public void onStop() { 
     super.onStop();     
     EventBus.getDefault().unregister(this);
 }
  • 发送事件
//普通事件,注册后发送才可以接收
EventBus.getDefault().post(new MessageEvent());
//黏性事件,发送后才注册也可以接收。订阅者注解中需要添加sticky=true
EventBus.getDefault().postSticky(new MessageEvent());
  • 处理事件
**
* threadMode类型有四种:
* POSTING:默认,从什么线程发出就在什么线程接收
* MAIN:事件处理在主线程执行,时间太长会导致ANR
* BACKGROUNP:在主线程发出在主线程接收、在子线程发出在原线程接收
* ASYNC:无论从哪里发出,都将在子线程接收。禁止ui操作
*
* @Subscrbe:订阅者注解
*/
@Subscribe(threadMode = ThreadMode.MAIN)  
public void onMessageEvent(MessageEvent event) {
/* Do something */
};

2、源码分析

  • 注册分析

根据源码分析后画的图。其实关于EventBus的源码分析的博客有很多,这只是方便自己日后看回来更好回忆

  • getDefault()
/** Convenience singleton for apps using a process-wide EventBus instance. */
public static EventBus getDefault() {
    EventBus instance = defaultInstance;
    if (instance == null) {
        synchronized (EventBus.class) {
            instance = EventBus.defaultInstance;
            if (instance == null) {
                instance = EventBus.defaultInstance = new EventBus();
            }
        }
    }
    return instance;
}

可以发现,只是一个简单的单例模式,那么看看构造方法做了什么操作。

  • 构造方法
public EventBus() {
    this(DEFAULT_BUILDER);
}

EventBus(EventBusBuilder builder) {
    logger = builder.getLogger();
    subscriptionsByEventType = new HashMap<>();
    typesBySubscriber = new HashMap<>();
    stickyEvents = new ConcurrentHashMap<>();
    mainThreadSupport = builder.getMainThreadSupport();
    mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
    backgroundPoster = new BackgroundPoster(this);
    asyncPoster = new AsyncPoster(this);
    indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
    subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
            builder.strictMethodVerification, builder.ignoreGeneratedIndex);
    logSubscriberExceptions = builder.logSubscriberExceptions;
    logNoSubscriberMessages = builder.logNoSubscriberMessages;
    sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
    sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
    throwSubscriberException = builder.throwSubscriberException;
    eventInheritance = builder.eventInheritance;
    executorService = builder.executorService;
}

很明显构造方法只是进行了需要用到对象的初始化,其中EventBusBuilder是一个建造者模式的建造者。

  • register(Object)
/**
 * Registers the given subscriber to receive events. Subscribers must call {@link #unregister(Object)} once they
 * are no longer interested in receiving events.
 * <p/> * Subscribers have event handling methods that must be annotated by {@link Subscribe}.
 * The {@link Subscribe} annotation also allows configuration like {@link
 * ThreadMode} and priority.
 */
public void register(Object subscriber) {
    Class<?> subscriberClass = subscriber.getClass();
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
    synchronized (this) {
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            subscribe(subscriber, subscriberMethod);
        }
    }
}

会发现两个核心的方法
findSubscriberMethods
subscribe
还有一个核心的类
SubscribeMethod

  • SubscribeMethod
/** Used internally by EventBus and generated subscriber indexes. */
public class SubscriberMethod {
    final Method method;
    final ThreadMode threadMode;
    final Class<?> eventType;
    final int priority;
    final boolean sticky;
    /** Used for efficient comparison */
    String methodString;

    public SubscriberMethod(Method method, Class<?> eventType, ThreadMode threadMode, int priority, boolean sticky) {
        this.method = method;
        this.threadMode = threadMode;
        this.eventType = eventType;
        this.priority = priority;
        this.sticky = sticky;
    }

    @Override
    public boolean equals(Object other) {
        if (other == this) {
            return true;
        } else if (other instanceof SubscriberMethod) {
            checkMethodString();
            SubscriberMethod otherSubscriberMethod = (SubscriberMethod)other;
            otherSubscriberMethod.checkMethodString();
            // Don't use method.equals because of http://code.google.com/p/android/issues/detail?id=7811#c6
            return methodString.equals(otherSubscriberMethod.methodString);
        } else {
            return false;
        }
    }

    private synchronized void checkMethodString() {
        if (methodString == null) {
            // Method.toString has more overhead, just take relevant parts of the method
            StringBuilder builder = new StringBuilder(64);
            builder.append(method.getDeclaringClass().getName());
            builder.append('#').append(method.getName());
            builder.append('(').append(eventType.getName());
            methodString = builder.toString();
        }
    }

    @Override
    public int hashCode() {
        return method.hashCode();
    }
}

发现SubscribeMethod就是一个封装了订阅者的线程优先级、黏性事件、订阅的线程模式、订阅者方法等等信息的类

  • findSubscriberMethods

顾名思义,就是查找SubscribeMethods的方法。具体看看怎么实现的

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
    List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
    if (subscriberMethods != null) {
        return subscriberMethods;
    }

    if (ignoreGeneratedIndex) {
        subscriberMethods = findUsingReflection(subscriberClass);
    } else {
        subscriberMethods = findUsingInfo(subscriberClass);
    }
    if (subscriberMethods.isEmpty()) {
        throw new EventBusException("Subscriber " + subscriberClass
                + " and its super classes have no public methods with the @Subscribe annotation");
    } else {
        METHOD_CACHE.put(subscriberClass, subscriberMethods);
        return subscriberMethods;
    }
}

METHOD_CACHE是一个map集合,用于缓存。代码中可以看到如果读取到有缓存,会立即返回。如果没有缓存最后也会添加到缓存中。

ignoreGeneratedIndex是一个boolean变量,默认值为false。用于判断是否忽略注解生成的MyEventBusIndex,因为我们通常都是使用单例获取Evenbus实例,所以不会有MyEventBusIndex。而如果使用的是以下方式

 EventBus.builder().addIndex(new MyEventBusIndex()).build()

会生成一个MyEventBusIndex文件。由于很少用到我也不是很懂,并且这只是另外一种使用方式而已,不会影响对核心源码的理解。所以不做过多讲解。因此ignoreGeneratedIndex这个值一般都为false。因此先看看findUsingInfo这个方法

    • findUsingInfo
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {

   FindState findState = prepareFindState();
    findState.initForSubscriber(subscriberClass);
    while (findState.clazz != null) {
        findState.subscriberInfo = getSubscriberInfo(findState);
        if (findState.subscriberInfo != null) {
            SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
            for (SubscriberMethod subscriberMethod : array) {
                if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                    findState.subscriberMethods.add(subscriberMethod);
                }
            }
        } else {
            findUsingReflectionInSingleClass(findState);
        }
        findState.moveToSuperclass();
    }
    return getMethodsAndRelease(findState);
}

会发现有个比较经常出现的类FindState。这是一个内部类。封装了定义订阅者的公共方法。结构如下:

      • FindState
static class FindState {

        final List<SubscriberMethod> subscriberMethods = new ArrayList<>();
        final Map<Class, Object> anyMethodByEventType = new HashMap<>();
        final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
        final StringBuilder methodKeyBuilder = new StringBuilder(128);

        Class<?> subscriberClass;
        Class<?> clazz;
        boolean skipSuperClasses;
        SubscriberInfo subscriberInfo;

        void initForSubscriber(Class<?> subscriberClass) {

...
        }

        boolean checkAdd(Method method, Class<?> eventType) {
...
        }

        private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
...
        }

        void moveToSuperclass() {
...
        }
    }

这种面向对象的封装思想其实在自定义控件中如果对于一些变量有计算的我们同样可以定义一个内部类做封装,这样的显得代码更有可读性。getSubscriberInfo 方法是获取订阅者信息的,它的返回结果会影响后面if-else执行。通过源码发现getSubscriberInfo里面有三种返回结果

      • getSubscriberInfo
private SubscriberInfo getSubscriberInfo(FindState findState) {

    if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
        SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
        if (findState.clazz == superclassInfo.getSubscriberClass()) {
            return superclassInfo;
        }
    }
    if (subscriberInfoIndexes != null) {
        for (SubscriberInfoIndex index : subscriberInfoIndexes) {
            SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
            if (info != null) {
                return info;
            }
        }
    }
    return null;
}

返回superclassinfo和info,都表示有在EventBusBuilder里面有配置MyEventBusIndex,正常不会走这里。所以先看看返回null的情况,这种情况会执行findUsingReflectionInSingleClass 方法。

      • findUsingReflectionInSingleClass
private void findUsingReflectionInSingleClass(FindState findState) {
    Method[] methods;
    try {
        // This is faster than getMethods, especially when subscribers are fat classes like Activities
        methods = findState.clazz.getDeclaredMethods();
    } catch (Throwable th) {
        // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
        methods = findState.clazz.getMethods();
        findState.skipSuperClasses = true;
    }
    for (Method method : methods) {
        int modifiers = method.getModifiers();
        if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (parameterTypes.length == 1) {
                Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                if (subscribeAnnotation != null) {
                    Class<?> eventType = parameterTypes[0];
                    if (findState.checkAdd(method, eventType)) {
                        ThreadMode threadMode = subscribeAnnotation.threadMode();
                        findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                    }
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException("@Subscribe method " + methodName +
                        "must have exactly 1 parameter but has " + parameterTypes.length);
            }
        } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
            String methodName = method.getDeclaringClass().getName() + "." + method.getName();
            throw new EventBusException(methodName +
                    " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
        }
    }
}

会发现这里是通过反射获取到订阅者的所有方法和属性保存到FindState中。这里提一下的是,反射是写架构常常用到的知识点,有人说反射是java语言的灵魂。

因为我们一开始的目的是找到 SubscribeMethod类,这个类的作用上面说了是一个封装订阅者信息的类,而这时我们已经通过反射拿到了订阅者的所有方法和属性。再到后面的添加缓存和读取缓存其实只是为了优化性能用到的了,所以到这里查找订阅者的订阅方法算是完成了。


看回一开始的注册方法,第二个关键方法是订阅subscribe

  • subscribe
// Must be called in synchronized block
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    Class<?> eventType = subscriberMethod.eventType;
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                    + eventType);
        }
    }

    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }

    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    subscribedEvents.add(eventType);

    if (subscriberMethod.sticky) {
        if (eventInheritance) {
            // Existing sticky events of all subclasses of eventType have to be considered.
            // Note: Iterating over all events may be inefficient with lots of sticky events,
            // thus data structure should be changed to allow a more efficient lookup
            // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            Object stickyEvent = stickyEvents.get(eventType);
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }
}

Subscription(订阅对象)是根据Subscriber(订阅者)和SubscribeMethod(订阅方法)封装的一个对象。

subscriptionByEventType是一个map对象,根据eventtype(事件类型)获取Subscribptions集合,如果为空重新创建并根据eventtype再保存到map中。

往下的for循环是根据订阅方法的优先级插入订阅对象的集合中,最后的 subscribeMethod.sticky 判断是否是黏性事件。会发现未处理的黏性事件(先发送后注册)其实是在注册方法里遍历取出处理的,而黏性事件的Set集合是由postSticky时保存的。

  • 简单总结

订阅方法做了两件事情,第一是Subscription根据eventType封装到subscriptionByEventType中,将SubscribeEvents根据 Subscriber封装到 typesBySubscribe中,第二是黏性事件的处理。


  • 事件发送分析

  • post(Object)
/** Posts the given event to the event bus. */
public void post(Object event) {
    PostingThreadState postingState = currentPostingThreadState.get();
    List<Object> eventQueue = postingState.eventQueue;
    eventQueue.add(event);

    if (!postingState.isPosting) {
        postingState.isMainThread = isMainThread();
        postingState.isPosting = true;
        if (postingState.canceled) {
            throw new EventBusException("Internal error. Abort state was not reset");
        }
        try {
            while (!eventQueue.isEmpty()) {
                postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}

首先从PostingThredState中取出事件队列集合并将当前的事件插入到集合中。最后循环交给postSingleEvent 处理

  • postSingleEvent
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    Class<?> eventClass = event.getClass();
    boolean subscriptionFound = false;
    if (eventInheritance) {
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            Class<?> clazz = eventTypes.get(h);
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
    if (!subscriptionFound) {
        if (logNoSubscriberMessages) {
            logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
        }
        if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                eventClass != SubscriberExceptionEvent.class) {
            post(new NoSubscriberEvent(this, event));
        }
    }
}

eventInheritance表示是否向上查找事件的父类,可以通过EventbusBuilder进行配置,默认值为true。这时会通过lookupAllEventType查找父类父类事件并交给postSingleEventForEventType逐一处理。

  • postSingleEventForEventType
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {
        subscriptions = subscriptionsByEventType.get(eventClass);
    }
    if (subscriptions != null && !subscriptions.isEmpty()) {
        for (Subscription subscription : subscriptions) {
            postingState.event = event;
            postingState.subscription = subscription;
            boolean aborted = false;
            try {
                postToSubscription(subscription, event, postingState.isMainThread);
                aborted = postingState.canceled;
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            if (aborted) {
                break;
            }
        }
        return true;
    }
    return false;

postSingleEventForEventType会同步取出订阅对象。传递给PostingThreadState,PostingThreadState其实也只是一个变量的封装。交给PostingThreadState后再把事件交给postToSubscription 处理

  • postToSubscription
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case MAIN_ORDERED:
            if (mainThreadPoster != null) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                // temporary: technically not correct as poster not decoupled from subscriber
                invokeSubscriber(subscription, event);
            }
            break;
        case BACKGROUND:
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}

这个方法的作用是取出订阅方法的线程模式,然后根据不同的线程模式做不同的处理。举例MAIN,如果是主线程则通过反射直接运行订阅方法,而如果不是主线程则需要通过mainThreadPost添加到主线程队列中。mainThreadPost是HandlePost类型继承自Handle。通过Handle对订阅方法的线程进行切换。

  • 简单总结

所以post方法经过一轮转换最后就是通过订阅方法找到线程模式,然后经过线程调度后再通过反射执行到对应的订阅方法。

  • 心得

最后的反注册方法毫无疑问就是去对应的容器中去除订阅方法这里不做过多讲解,看到的朋友觉得写的不好的地方欢迎指正,如果觉得还可以的朋友可以关注我的公众号,但写作对我来说纯属只是给自己一个时间静静学习的机会,即便没有人认可我也会坚持 。


最开始坚持的地方,记录学习与生活的点点滴滴