EventBus从源码角度来谈谈设计原理

947 阅读8分钟

前言

EventBus是我们常用的第三方框架,主要作用是在各个组件之间进行通信。而且使用起来非常简单,内部原理也并不复杂,如果还不会用的人,可以参考我的这边文章EventBus(一)基础使用。这里对于使用不做说明,我们主要是来看看内部实现方式和原理。

源码解析

eventBus.getDefault()

/** Convenience singleton for apps using a process-wide EventBus instance. */
    public static EventBus getDefault() {
        if (defaultInstance == null) {
            synchronized (EventBus.class) {
                if (defaultInstance == null) {
                    defaultInstance = new EventBus();
                }
            }
        }
        return defaultInstance;
    }

我们看到,在getDefault()方法里面其实是对EventBus进行了初始化操作。里面运用的是一个双重检索形式的单例模式,确保一次只能实例化一个EventBus对象。最后将这个实例化的对象给返回回来,我们后面做的所有操作都是基于这个对象来操作的。

注册对象——register()

 
    public void register(Object subscriber) {
        Class<?> subscriberClass = subscriber.getClass();
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }

从注册源码我们可以看到,我们通过findSubscriberMethods()这个方法找到所有的被订阅的方法,即被subscribe修饰的方法,将他们以一个List集合的形式返回回来,之后通过一个for循环,遍历集合中所有被subscribe修饰的方法,将他们放到subscribe()订阅方法里面去处理。

那么EventBus是如何查找到被订阅的方法的?

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }
 
        if (ignoreGeneratedIndex) {
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }

跳进findSubscribeMethod()这个方法里面去,我们看看里面做了什么操作。可以看到,首先会从一个缓存的Map集合METHOD_CACHE里面查找被订阅的方法,如果能够找得到不为空(null),则将这个查找出来的List集合返回。如果在缓存里面找不到,则这里会进行一次判断,是从通过索引找还是通过反射来查找。这里需要说明一下,EventBus2.0和3.0的区别就在这里。在2.0时代,EventBus都是通过在运行的时候通过反射来查找所有的订阅方法,而在3.0时代EventBus开始在编译的时候就通过反射来查找,这样比起2.0更加快更有效率。

再来看下findUsingReflection()

    private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
        FindState findState = prepareFindState();
        findState.initForSubscriber(subscriberClass);
        while (findState.clazz != null) {
            findUsingReflectionInSingleClass(findState);
            findState.moveToSuperclass();
        }
        return getMethodsAndRelease(findState);
    }
    
    
    private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
        List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
        findState.recycle();
        synchronized (FIND_STATE_POOL) {
            for (int i = 0; i < POOL_SIZE; i++) {
                if (FIND_STATE_POOL[i] == null) {
                    FIND_STATE_POOL[i] = findState;
                    break;
                }
            }
        }
        return subscriberMethods;
    }


我们获取FindState,然后通过while循环从其类和其父类中获取到对订阅的方法,然后将findState放入到getMethodsAndRelease()中,在这个方法里面先new一个list集合,然后我们再次通过for遍历,从里面查找方法,最后将list集合返回回去。

之后通过反射找到了被订阅的事件之后我们将他放入缓存Map集合中并将他return返回回来。之后将结果放入到subscribe()方法中。

// Must be called in synchronized block
    private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        Class<?> eventType = subscriberMethod.eventType;
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }
 
        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }
 
        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        subscribedEvents.add(eventType);
 
        if (subscriberMethod.sticky) {
            if (eventInheritance) {
                // Existing sticky events of all subclasses of eventType have to be considered.
                // Note: Iterating over all events may be inefficient with lots of sticky events,
                // thus data structure should be changed to allow a more efficient lookup
                // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }

创建一个subscription来保存订阅者和订阅方法,然后是获取当前订阅事件的集合CopyWriteArrayList,之后通过一个循环按照优先级的高低将这个集合中的订阅事件添加到subscription当中去。到此,所有注册事件全部完毕。

注销对象——unregister()

public synchronized void unregister(Object subscriber) {
        List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
            for (Class<?> eventType : subscribedTypes) {
                unsubscribeByEventType(subscriber, eventType);
            }
            typesBySubscriber.remove(subscriber);
        } else {
            logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
        }
    }

我们看到,在注销方法中没有做什么很深入的操作。整体看来分两步走,一步是移除注册对象和其所有 Event 事件链表,即 typesBySubscriber 移除相关键值对的;再就是在 unsubscribeByEventType() 方法中对 subscriptionsByEventType 移除了该 subscriber 的所有订阅信息(可以看到实际上没有对 METHOD_CACHE 进行相关移除操作,便于下一次注册的时候可以很方便拿到之前的信息,这便是缓存的作用所在)。

发射事件对象——post()

 public void post(Object event) {
        PostingThreadState postingState = currentPostingThreadState.get();
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);
 
        if (!postingState.isPosting) {
            postingState.isMainThread = isMainThread();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

在这里面,我们先通过currentPostingThreadState获取到一个postingState对象,这个currentPostingThreadState其实就是一个ThreadLocal,这里有点像Handler的感觉。之后通过postingState获取到一个list集合,事件队列,将post中发射的event事件存放到这个集合当中去。为了防止高并发下,一个event不会被调用多次,所以在这里有一个isPosting来判断是否已经开始回调,并且在最后将list集合中的event放入到postSingleEvent中去处理,放入一个就remove掉一个,防止Event多次被调用。

进入postSignleEvent()中,我们看到这里面层层递进,最终调用的是postToSubscription() 方法,我们来看下这个方法

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

发现这个方法里面就开始使用了threadMode线程模式了。这里再说下这四种模式的区别。

  • POSTING: 接收事件方法应执行在发射事件方法所在的线程(由于发射事件方法线程可能是主线程,这意味着接收方法不能执行耗时操作,否则会阻塞主线程)

  • MAIN: 在 Android 中则接收事件方法应执行在主线程,否则(在 Java 项目中)等同于 POSTING。如果发射事件方法已位于主线程,那么接收事件方法会被「立即」调用(这意味着接收事件方法不能执行耗时操作,否则会阻塞主线程;同时,由于是「立即」调用,所以发射事件方法此时是会被接收事件方法所阻塞的),否则等同于 MAIN_ORDERED

  • MAIN_ORDERED: 在 Android 中则接收事件方法会被扔进 MessageQueue 中等待执行(这意味着发射事件方法是不会被阻塞的),否则(在 Java 项目中)等同于 POSTING。

  • BACKGROUND:

    • 发射事件方法在主线程中执行,则接收事件方法应执行在子线程执行,但该子线程是 EventBus 维护的单一子线程,所以为了避免影响到其他接收事件方法的执行,该方法不应太耗时避免该子线程阻塞。
    • 发射事件方法在子线程中执行,则接收事件方法应执行在发射事件方法所在的线程。
    • 在 Java 项目中,接收事件方法会始终执行在 EventBus 维护的单一子线程中。
  • ASYNC: 接收方法应执行在不同于发射事件方法所在的另一个线程。常用于耗时操作,例如网络访问。当然,尽量避免在同一个时间大量触发此类型方法,尽管 EventBus 为此专门创建了线程池来管理回收利用这些线程。

我们可以看到,在这些线程模式当中,我们主要调用的方式就两种,一种invokeSubscriber(),一种是enqueue()。那么那些情况会使用 invokeSubscriber() 方法呢?

  • POSTING: 不用说,既然和发射事件线程同一条线程执行,那么当然直接调用 invokeSubscriber() 即可。

  • MAIN: 在确保发射事件线程是主线程的情况下,直接调用 invokeSubscriber()。

  • MAIN_ORDERED: 如果当前项目不是 Android 项目情况下(纯 Java 项目),将会直接调用 invokeSubscriber()。

  • BACKGROUND: 前面提到如果发射事件线程不是主线程的话,接收事件将会执行于发射事件所在的线程,所以也会直接调用 invokeSubscriber()。

而在enqueue中,我们可以看到里面有一个HandlerPoster,不用说这个肯定是继承自Handler。那么也就是说,在EventBus中我们是通过Handler的形式来通知线程来进行切换的。

EventBus内部主要依靠的是观察者的设计模式,register通过注册,不断的在观察是否有订阅者的事件发生,如果有则将其加入到subscribe中去订阅。而post则将事件Event发送出去,被register检测到了就可以开始进行消息的传递和处理了。当然,我们也可以指定发射消息和接收消息所处在的线程,从而做一些耗时操作或者更新UI的操作。

到此,EventBus全部分析完成!