EventBus
是一个用于组件间通信的框架。它为开发提供一种非常简便的方式来是实现组件间解耦通信,并且提供了线程切换、优先级设置等功能。
从官方的示意图中不难看出,EventBus
使用的是观察者模式
:Subscriber
注册到EventBus
, 当Publisher
使用post
方法将Event
发送给EventBus
,EventBus
就会回调Subscriber
的onEvent
方法。观察者模式能将观察者和订阅者最大程度的解耦,这也是EventBus
的功能所在。
具体用法就不多说了,具体可见官方主页github.com/greenrobot/…
本文解析使用的EventBus
版本是3.0.0
。
进行源码分析之前,先思考一下,如果是自己,会如何实现?
首先,我们需要将注册的类、声明的订阅方法,以及方法执行的线程、优先级都保存下来; 其次,我们要可以根据接收的事件去快速查找到对应的订阅者,当有消息通知时,可以高效通知; 再者,我们还需要自己去处理线程间的切换以满足不同的应用场景; 最后,我们应该提供注销功能以便取消消息订阅。
带着思路,一起来看看源码是如何一步一步实现的。本文根据我们使用EventBus
的步骤来进行讲解的。
注解@Subscribe
在声明订阅方法时,要求使用
@Subscribe
注解,先来看下它的具体定义
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
ThreadMode threadMode() default ThreadMode.POSTING;
boolean sticky() default false;
int priority() default 0;
}
先来看三个元注解:
@Documented
:生成java文档时,会将该注解也写进文档里
@Retention(RetentionPolicy.RUNTIME)
:有效周期是在运行时
@Target({ElementType.METHOD})
:指定对方法有效。
注解里声明了三个成员变量:
threadMode
(订阅方法执行的线程)sticky
(是否是粘性事件)priority
(优先级)。
ThreadMode
是一个枚举类,定义了四种线程模式:
public enum ThreadMode {
POSTING,
MAIN,
BACKGROUND,
ASYNC
}
POSTING: 和发送事件的线程在同一个线程,避免了线程切换开销。
MAIN:订阅在主线程,事件将排队等待交付(非阻塞)。使用此模式的订阅者必须快速返回,以避免阻塞主线程。
BACKGROUND:如果是在主线程发布,则会订阅在一个后台线程,依次排队执行;如果不是在主线程发布,则会订阅在发布所在的线程。
ASYNC: 在非主线程和发布线程中订阅。当处理事件的方法 是耗时的,需要使用此模式。尽量避免同时触发大量的耗时较长的异步操作,EventBus 使用线程池高效的复用已经完成异步操作的线程。
EventBus
之所以要求在订阅方法上加上@Subscribe
注解,就是相当于给订阅者打标签,框架根据注解去找到订阅者。
注册
先来看看注册的过程
public void register(Object subscriber) {
//拿到订阅者的运行时类型Class
Class<?> subscriberClass = subscriber.getClass();
//利用订阅者的Class去查找类中声明的订阅方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
//循环遍历逐个将订阅者和订阅方法订阅到EventBus
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
主要有三步,第一步拿到订阅者的类对象,第二步通过订阅者类对象找到类中的所有订阅方法,第三步进行订阅。
查找订阅方法
在上述第二步使用了一个SubscriberMethodFinder
实例来进行方法查找。SubscriberMethodFinder
这个类是专门用来查找订阅方法的,findSubscriberMethods()
最后返回了一个SubscriberMethod
集合。SubscriberMethod
类则就是对我们声明的订阅方法和参数的封装。可以先略过。
接着看findSubscriberMethods()
是如何通过订阅者的Class对象来进行方法查找的。
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
//从缓存中查找,如果已经有,则直接返回。
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
//如果缓存中没有查找到,就通过订阅者Class对象进行查找。
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
// 使用apt提前解析的订阅者信息
subscriberMethods = findUsingInfo(subscriberClass);
}
// 将查找到的方法存入缓存并返回
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
这里做了个缓存处理METHOD_CACHE
,因为查找是比较耗时的操作,缓存可以提高效率。
private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
METHOD_CACHE
是一个以订阅者Class为Key
, 订阅方法集合为Value的线程安全的的HashMap
。
第一次执行肯定是没有缓存的,然后会根据ignoreGeneratedIndex
来执行不同的方法。从方法名来看,一个是使用反射去查找,另一个是使用已有的信息去查找。
其实这里就是3.0.0引入的优化点:3.0.0引入了APT(Annotation Processing Tool)
,它可以在编译阶段就提前解析注解,提前检索到订阅方法。这样就提高了运行时效率。
ignoreGeneratedIndex
这个值默认是false,因为反射开销大,所以默认是走findUsingInfo()
分支,但是在findUsingInfo()
方法中会检查本地是否有apt
预先解析出的订阅者信息,如果没有,还是会执行反射方法findUsingReflectionInSingleClass()
。
启动apt
的部分涉及到了注解处理器,所以会单独起一篇来讲解,先来看看不使用apt
的情况:
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
//准备一个FindeState实例
FindState findState = prepareFindState();
//将订阅者Class与FindeState实例关联
findState.initForSubscriber(subscriberClass);
//从子类到父类去逐一查找订阅方法
while (findState.clazz != null) {
//使用反射查找一个类的订阅方法 findUsingReflectionInSingleClass(findState);
//将父类赋给findState.clazz,往上进行查找
findState.moveToSuperclass();
}
//返回订阅方法集合并回收FindState实例
return getMethodsAndRelease(findState);
}
这里去查找订阅方法,因为子类会继承父类的方法,所以当子类找不到时,需要去查找父类。这里使用了一个while循环递归进行查找。查找过程使用了一个新的对象FindState
,它是用来存储查找过程中的一些信息,方便进行迭代查找。它的类定义:
static class FindState {
//订阅方法集合
final List<SubscriberMethod> subscriberMethods = new ArrayList<>();
final Map<Class, Object> anyMethodByEventType = new HashMap<>();
final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
final StringBuilder methodKeyBuilder = new StringBuilder(128);
//当前订阅者
Class<?> subscriberClass;
//当前查找的类
Class<?> clazz;
//是否跳过父类查找
boolean skipSuperClasses;
SubscriberInfo subscriberInfo;
}
接着往下走,进入真正开始使用反射解析一个类的订阅方法:findUsingReflectionInSingleClass()
:
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// 只获取非继承的方法,这个方法比getMethods()方法效率高,比如对一个Activity来说。 findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// 异常则直接获取所有方法(包括继承的), 这样就不用再检查父类了。
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
// 遍历所有方法筛选出订阅方法
for (Method method : methods) {
int modifiers = method.getModifiers();
// 必须是public的、非静态、非抽象的方法
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
//获取到所有参数的类型
Class<?>[] parameterTypes = method.getParameterTypes();
// 只能有一个参数
if (parameterTypes.length == 1) {
//获取@Subscribe注解的方法
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
//检查该eventType是否已订阅了,通常订阅者不能有多个 eventType 相同的订阅方法
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
//将订阅方法和对应的接收的Event类型以及注解参数保存
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
这个方法主要是通过修饰符、参数个数、指定注解和是否有EventType
相同的方法这几层筛选,最终将订阅方法添加进 findState
的 subscriberMethods
这个 List 中。
其中重点有一个判断:findState.checkAdd()
,这个方法决定了是否订阅方法可以被保存下来进而能接收到消息。一起来看看它是如何判断的:
private boolean checkAdd(Method method, Class<?> eventType) {
// 1.检查eventType是否已经注册过对应的方法(一般都没有)
Object existing = anyMethodByEventType.put(eventType, method);
if (existing == null) {
return true;
} else {
// 2. 如果已经有方法注册了这个eventType
if (existing instanceof Method) {
if (!checkAddWithMethodSignature((Method) existing, eventType)) {
throw new IllegalStateException();
}
anyMethodByEventType.put(eventType, this);
}
return checkAddWithMethodSignature(method, eventType);
}
}
进行了两种检查,第一种是判断当前类中是否已经有这个EventType
和对应的订阅方法,一般一个类不会有对同一个EventType
写多个方法,会直接返回true
,进行保存。
但是如果出现了同一个类中同样的EventType
写了多个方法,该如何处理?
还有当findUsingReflection()
中进行下一轮循环,会进行父类查找,如果子类继承了父类的订阅方法,又该如何处理呢?
答案就在上边的注释2。关键点就是checkAddWithMethodSignature()
方法:
private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
// 以[方法名>eventType]为Key
methodKeyBuilder.setLength(0);
methodKeyBuilder.append(method.getName());
methodKeyBuilder.append('>').append(eventType.getName());
String methodKey = methodKeyBuilder.toString();
// 拿到新的订阅方法所属类
Class<?> methodClass = method.getDeclaringClass();
Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
// methodClassOld == null或者 methodClassOld是methodClass的父类/同一个类
return true;
} else {
// Revert the put, old class is further down the class hierarchy
subscriberClassByMethodKey.put(methodKey, methodClassOld);
return false;
}
}
对于同一类中同样的EventType
写了多个方法,因为方法名不同,所以[方法名>eventType]
的Key
不同,methodClassOld
会为null
,直接返回 true
。所以这种情况会将所有相同EventType
的方法都进行保存。
对于子类重写父类方法的情况,则methodClassOld
(即子类)不为null
,并且methodClassOld
也不是methodClass
的父类,所以会返回false
。即对于子类重写父类订阅方法,只会保存子类的订阅方法,忽略父类的订阅方法。
至此,findState
的查找任务就结束了,通过循环向父类查找,将订阅者的订阅方法都保存在了其内部变量subscriberMethods
列表中。
最后,跳出循环,回到findUsingReflection()
方法中,最后返回了 getMethodsAndRelease(findState)
private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
findState.recycle();
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
if (FIND_STATE_POOL[i] == null) {
FIND_STATE_POOL[i] = findState;
break;
}
}
}
return subscriberMethods;
}
很好理解,就是将findState
中的subscriberMethods
取出并返回。可以看到作者还是很细心的,将使用完的findState
实例置空恢复后又放回实例池中,将实例回收利用,节省了新的开销。
再回到findSubscriberMethods()
方法中,将查找的方法最后都存进了内存缓存METHOD_CACHE中, 对应关系是订阅类和它的订阅方法:
METHOD_CACHE.put(subscriberClass, subscriberMethods);
到这里,查找订阅方法就结束了。
订阅(subscribe())
回到register()
方法中的第三步,对上一步查找到的订阅方法集合进行了遍历调用subscribe()
方法。来看看subscribe()
方法做了什么事:
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
// 实例一个Subscription对象,内部持有了订阅者和订阅方法
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
// subscriptionsByEventType是以EventType为Key,以它对应Subscription集合的Map
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
// 根据优先级排序Subscription
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
// typesBySubscriber保存了订阅者对应的所有EventType
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
// 粘性订阅方法要立即处理
if (subscriberMethod.sticky) {
// 默认为true
if (eventInheritance) {
// 看当前EventType是否是已有的stickyEvent的子类或父类
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
// 立即投递事件
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
这里其实只做了三件事:
- 将订阅者和订阅方法封装到
subscriptionsByEventType
,它可以根据EventType
拿到所有的Subscription
对象,Subscription
对象中就有订阅者和订阅方法。这样当有EventType
消息过来时,可以快速的传递给订阅者的订阅方法。 - 将订阅者和订阅方法封装到
typesBySubscriber
,它可以根据订阅类拿到所有的EventType
。这样当我们调用调用unregister(this)
时,就可以拿到EventType
,又根据EventType
拿到所有订阅者和方法,进行解绑了。 - 如果当前订阅方法是粘性方法,则立即去查找是否有本地事件,有的话就立即投递。
至此,我们的注册就完成了。可以看到某些方法栈的调用还是非常深的,但是整体流程却很简单。这也是值得我们学习的地方。
注销
注册对应的就是注销,当我们的订阅者不再需要接收事件时,可以调用unregister
进行注销:
public synchronized void unregister(Object subscriber) {
// 通过订阅者拿到它订阅的所有的订阅事件类型
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
// 遍历事件类型集合,根据事件类型解绑
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
// 从记录中移除订阅者
typesBySubscriber.remove(subscriber);
} else {
Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
代码很简单,继续看unsubscribeByEventType
:
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
// 根据事件类型拿到所有的Subscription
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
// 遍历所有Subscription,符合解除条件的进行remove
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}
相比注册流程,注销流程就非常简单了,就是把对应的注册者和对应的注册信息从记录中移除即可。
发送事件
注册完毕后,我们的订阅者和订阅方法都被记录在了EventBus
里,这时就可以给订阅者们发送事件了。EventBus
提供了两种发送方法post()
和postSticky()
。post()
发送的是非粘性的事件,postSticky()
发送的是粘性事件。
post
先来看看post()
发送:
public void post(Object event) {
// 从当前线程中取出PostingThreadState
PostingThreadState postingState = currentPostingThreadState.get();
// 拿到EventType队列
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
// 当前线程是否有消息正在投递
if (!postingState.isPosting) {
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
//正在投递
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
// 循环一个个进行投递
while (!eventQueue.isEmpty()) {
// 投递出去就remove掉
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
// 所有消息都投递完成
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
第一步使用ThreadLocal
,是因为ThreadLocal
保证了数据只对当前线程可见,其他线程是不可见的,这样的话当我们从不同的线程中去取数据,数据相当于是分开保存,设置和读取就会比较快。从当前线程中取出的是一个PostingThreadState
:
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<Object>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}
PostingThreadState
主要包含了当前线程的Event
队列、订阅者信息、事件等数据。接下来就是循环调用postSingleEvent()
方法进行投送了:
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
// 如果是可继承的事件
if (eventInheritance) {
// 查找Event的所有父类、接口类以及父类的接口类(Event的父类和接口也是Event)
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
// 根据查找到的所有Class(Event),逐个寻找订阅者,进行分发event
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
// 如果不是可继承的事件,则直接对
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
// 如果没有找到订阅者,报异常
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
Log.d(TAG, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
这个方法的作用很简单,就是做了个分支处理:如果是可继承的事件,则查找到它的所有父事件,然后再往下分发。继续看postSingleEventForEventType()
方法:
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
// 可能同时在多个线程同时发送Event,subscriptionsByEventType是共有常量,所以需要加锁
synchronized (this) {
// 根据Event的类型拿到所有订阅者
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
// 事件还是Event,但是可能会分发到订阅它父类的订阅者中
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
// 逐个通知订阅者
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
这个方法和上一个方法的职责也很单一,就是查找所有订阅者,然后遍历进行通知。查找用的是一个Map: postToSubscription
,就是在订阅时生成的一个[EventType -> List<Subscription>]
Map, 这样我们就可以根据EventType
查找到所有订阅者。接着看postToSubscription()
:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
// 根据订阅线程模式进行switch
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
// 直接调用在本线程
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
// 如果就在主线程,则直接调用
invokeSubscriber(subscription, event);
} else {
// 如果不在主线程,则使用mainThreadPoster
mainThreadPoster.enqueue(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
// 如果在主线程,使用backgroundPoster
backgroundPoster.enqueue(subscription, event);
} else {
// 如果不在主线程,则直接在当前线程调用
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
// 启动新的线程调用,asyncPoster
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
这里就是EventBus
的线程切换了,主要有POSTING
、MAIN
、BACKGROUND
、ASYNC
四种模式,四种模式在开篇已经介绍过了。这里涉及到了invokeSubscriber()
方法和mainThreadPoster
、backgroundPoster
、asyncPoster
三个poster,接下来我们就分别来看下:
invokeSubscriber()
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
invokeSubscriber()
就是直接在当前线程调用了订阅者的method
对象,这里调用了反射类Method
的方法invoke()
直接调用执行。
mainThreadPoster
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
mainThreadPoster
是个自定义的类HandlerPoster
,它的目的是在主线程中调用订阅方法,而EventBus
使用的就是我们熟悉的Handler
:
final class HandlerPoster extends Handler {
private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
// 用来最后调用方法
this.eventBus = eventBus;
// 最大处理时间
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
// 一个待处理消息的队列
queue = new PendingPostQueue();
}
...
...
}
HandlerPoster
是自定义的Handler
,发送消息使用的是Looper.getMainLooper()
即主线程的Handler
。 内部定义了一个最大处理消息时间,默认是10
毫秒,所以说我们一定不要在订阅方法中做耗时操作。还维护了一个PendingPostQueue
,它是自定义的一个链表队列,这里猜测HandlerPoster
可能是自己维护了消息队列,来看下入队方法:
void enqueue(Subscription subscription, Object event) {
// 获取一个PendingPost实例
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
// 入队
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
// 主线程的handler发送消息,发送到主线程
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
这里出现了一个新的类PendingPost
,它封装了订阅者subscription
实例和事件event
实例;它内部又维护了一个大小为10000
的PendingPost
(数组集合)池,用来重复利用PendingPost
实例。然后将PendingPost
实例入队到上一步说的PendingPostQueue
队列中。接着使用主线程的Handler
发送一个消息。接下来就是在handleMessage()
中如何处理消息了:
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
// 从队列中取出一个pendingPost
PendingPost pendingPost = queue.poll();
// 如果队列里的消息处理完毕,就直接跳出循环。
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
// 调用订阅方法并会回收pendingPost
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
// 如果方法的执行时间超过最大执行时间(默认10毫秒)
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
这里就是使用了while循环,不断从队列中去取PendingPost
处理,但是加了个最大执行时间处理,因为是在主线程调用,所以一旦超时,就退出队列,并重新尝试去再进入队列。
BackgroundPoster
再来看看BackgroundPoster
BackgroundPoster
的作用是将UI线程的订阅方法调度在非UI线程中。即它是要执行在新的Thread
中的,而开启线程我们最常用的就是Runnable
, 来看看源码:
final class BackgroundPoster implements Runnable {
private final PendingPostQueue queue;
private final EventBus eventBus;
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
...
...
}
果不其然,BackgroundPoster
实现了Runnable
接口,这样就可以被线程执行。其内部也是维护了EventBus
和一个PendingPost
队列。
public void enqueue(Subscription subscription, Object event) {
// 从消息池中构建一个PendingPost
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
// 入队
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
// 线程池调度执行
eventBus.getExecutorService().execute(this);
}
}
}
和HandlerPoster
类似:新建一个新的PendingPost
入队,使用了EventBus
里的一个ExecutorService
,它是对线程池定义的一个接口,来看看它的默认值:
public class EventBusBuilder {
private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
...
ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;
}
熟悉的Executors
,创建了一个可缓存的线程池,用来执行BackgroundPoster
这个Runnable
对象,再来看看BackgroundPoster
的run()
方法:
@Override
public void run() {
try {
try {
while (true) {
// 取出队头
PendingPost pendingPost = queue.poll(1000);
// 如果队头为空,说明队列里没有消息了,直接退出循环
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
// 调用订阅方法
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
也和HandlerPoster
类似,但是不同之处在于:
- 使用了
poll(int maxMillisToWait)
方法,这个设计很巧妙,当取到最后发现队列为空后,会wait
1000 毫秒,当有有新的信息来临时就会唤醒线程,poll
出消息。这样设计就减少了发送消息的次数,节省了资源。 - 因为是在子线程执行,所以就没有方法执行时间的限制了。
AsyncPoster
class AsyncPoster implements Runnable {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
//直接开启新的线程执行
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
// 直接取出消息执行
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
可以看到AsyncPoster
和BackgroundPoster``非常的相似,因为它们的功能也非常相似。但是不同之处在于:BackgroundPoster
是尽可能使用一个后台线层去依次排队执行订阅方法;而AsyncPoster
则是每条消息都直接开启新的后台线程立即执行。
至此四个Poster就讲完了,看完是真的爽,不论是从功能抽象到具体细节的把控,EventBus
都处理的很好,非常值得学习。
粘性事件
粘性事件这名字一听很耳熟,没错,安卓四大组件之BroadcastReceiver
就有一种广播叫做粘性广播(StickyBroadcast
),而EventBus
也提供了类似的功能:当注册了粘性事件后,立即能收到还没有注册时系统发出的最后一个事件。
public void postSticky(Object event) {
// 将粘性事件保存下来
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}
postSticky()
方法用来发送一个粘性事件,在这个方法中,直接将粘性事件保存在了一个Map集合中,而key
就是Event
的Class对象。接着就调用正常的post()
方法了。
那为什么我们后注册的方法也能接收到之前发出的粘性事件呢,答案就在上面提到的注册方法subscribe()
中的最后一段:
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
...
// 如果是粘性事件,则直接发送出去
if (subscriberMethod.sticky) {
if (eventInheritance) {
// 从stickyEvents取出粘性事件的Class对象
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
// 如果订阅的事件是保存的粘性事件Class或它的父类
if (eventType.isAssignableFrom(candidateEventType)) {
// 取出缓存的Event
Object stickyEvent = entry.getValue();
// 将缓存的Event发送出去
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
在我们注册订阅方法和事件时,如果是粘性事件,就直接会将事件发送给注册了相同Event
的订阅者,方法中调用了checkPostStickyEventToSubscription(newSubscription, stickyEvent)
方法:
private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
if (stickyEvent != null) {
// If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
// --> Strange corner case, which we don't take care of here.
postToSubscription(newSubscription, stickyEvent, Looper.getMainLooper() == Looper.myLooper());
}
}
很简单,直接又调用了postToSubscription()
方法,根据指定线程分别进行分发。
总结
整体看下来,框架的结构非常的清晰全面,尤其在对方法功能的封装和细节处理上,很是值得学习。尤其在3.0后,EventBus
使用了注解处理器在编译阶段就完成了订阅者的解析,这使得框架更佳的轻量和高效。整片源码读下来还是很爽的,没事还要再复习多看几遍,很多东西都值得学习。共勉!