转载请注明出处:YingKe
综述
EventBus是我们项目中最常用的开源框架之一。对于EventBus的使用方法也非常简单。然而EventBus的内部实现原理也不是很复杂。EventBus是一个Android事件发布/订阅框架,通过解耦发布者和订阅者简化 Android 事件传递。
对于EventBus的工作过程很简单,用一句话概括:事件被提交到EventBus后进行查找所有订阅该事件的方法并执行这些订阅方法
普及名词
- Subscriber :事件订阅者,用于接收事件
- onEvent : 发送者在哪个线程发送的,该方法就在哪个线程处理事件,即
处理者与发送者同线程
- onEventMainThread :不管发送者在哪个线程发送的,
该方法都会在主线程处理任务
- onEventBackgroundThread : 如果发送者是在子线程发送的,该方法也在同一个子线程处理事件,如果发送者在主线程,该方法在一个线程次中处理事件,即
处理事件一定在子线程
- onEventAsync : 不管发送者在哪个线程发送的,
该方法都在线程池中执行
。
使用方法
1. 注册订阅者 首先我们需要将我们希望订阅事件的类,通过EventBus类注册,注册代码如下:
//3.0版本的注册
EventBus.getDefault().register(this);
//2.x版本的注册
EventBus.getDefault().register(this);
EventBus.getDefault().register(this, 100);
EventBus.getDefault().registerSticky(this, 100);
EventBus.getDefault().registerSticky(this);
可以看到2.x版本中有四种注册方法,区分了普通注册和粘性事件注册,并且在注册时可以选择接收事件的优先级,这里我们就不对2.x版本做过多的研究了.由于3.0版本将粘性事件以及订阅事件的优先级换了一种更好的实现方式,所以3.0版本中的注册就变得简单,只有一个register()方法即可
.
2. 编写响应事件订阅方法 注册之后,我们需要编写响应事件的方法,代码如下:
/3.0版本
@Subscribe(threadMode = ThreadMode.BACKGROUND, sticky = true, priority = 100)
public void test(XXXEvent str) {
}
//2.x版本
public void onEvent(XXXEvent event) {
}
public void onEventMainThread(XXXEvent str) {
}
public void onEventBackgroundThread(XXXEvent str) {
}
在2.x版本中只有通过onEvent开头的方法会被注册,而且响应事件方法触发的线程通过onEventMainThread或onEventBackgroundThread这些方法名区分,
而在3.0版本中.通过@Subscribe注解,来确定运行的线程threadMode,是否接受粘性事件sticky以及事件优先级priority
,而且方法名不在需要onEvent开头
,所以又简洁灵活了不少.
3. 发送事件
我们可以通过EventBus的post()
方法来发送事件,发送之后就会执行注册过这个事件的对应类的方法.或者通过postSticky()来发送一个粘性事件.在代码是2.x版本和3.0版本是一样的.
EventBus.getDefault().post(new XXXEvent());
EventBus.getDefault().postSticky(new XXXEvent());
4. 解除注册
当我们不在需要接收事件的时候需要解除注册unregister
,2.x和3.0的解除注册也是相同的.代码如下:
EventBus.getDefault().unregister(this);
源码分析
1. 类图
2. 创建EventBus
一般情况下我们都是通过EventBus.getDefault()
获取到EventBus对象,从而在进行register()或者post()
等等,所以我们看看getDefault()方法的实现:
//默认的EventBus 构建者
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
//单例
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
/**
* Creates a new EventBus instance; each instance is a separate scope in which events are delivered. To use a
* central bus, consider {@link #getDefault()}.
*/
public EventBus() {
this(DEFAULT_BUILDER);
}
//构造函数
EventBus(EventBusBuilder builder) {
//日志
logger = builder.getLogger();
//key:订阅的事件类型,value:订阅这个事件的所有订阅者集合 线程安全的 按订阅事件分类
subscriptionsByEventType = new HashMap<>();
//key:订阅者对象,value:这个订阅者订阅的事件集合 按订阅者分类
typesBySubscriber = new HashMap<>();
//粘性事件 key:粘性事件的class对象, value:事件对象
stickyEvents = new ConcurrentHashMap<>();
//事件主线程处理
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
//事件 Background 处理
backgroundPoster = new BackgroundPoster(this);
//事件异步线程处理
asyncPoster = new AsyncPoster(this);
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
//订阅者响应函数信息存储和查找类 这个比较重要
//默认情况下 subscriberInfoIndexes == null 和ignoreGeneratedIndex == false 后面查找Method时会用
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}
3. 注册过程源码分析
3.1 register()方法的实现
3.0的注册只提供一个register()方法了,所以我们先来看看register()方法做了什么:
/**
* 注册一个事件订阅者来接收事件。当订阅者不在对接收事件感兴趣,必须调用{@link #unregister(Object)}
* <p/>
* Subscribers have event handling methods that must be annotated by {@link Subscribe}.
* The {@link Subscribe} annotation also allows configuration like {@link
* ThreadMode} and priority.
* 订阅者处理事件的方法 必须使用注解@Subscribe,并且允许配置ThreadMode和priority
*/
public void register(Object subscriber) {
//首先获得订阅者的class对象
Class<?> subscriberClass = subscriber.getClass();
//通过subscriberMethodFinder来找到订阅者订阅了哪些事件.返回一个SubscriberMethod对象的List,SubscriberMethod
//里包含了这个方法的Method对象,以及将来响应订阅是在哪个线程的ThreadMode,以及订阅的事件类型eventType,以及订阅的优
//先级priority,以及是否接收粘性sticky事件的boolean值.
//订阅者可以订阅多个Event事件
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
//遍历 订阅
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
//订阅 主要作用 注册到subscriptionsByEventType 和typesBySubscriber 两个集合中
// Must be called in synchronized block
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
//订阅事件的类型class
Class<?> eventType = subscriberMethod.eventType;
//对subscriber和subscriberMethod的封装
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
//根据事件类型,获取对该事件类型感兴趣的所有订阅者
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
//遍历 对该事件类型感兴趣的 所有订阅者
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
//1,插在末尾,2,插在优先级的位置,subscriptions按优先级顺序的
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
//根据该订阅者,获取它已经订阅的所有事件类型
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
//把当前事件加进去,多一个
subscribedEvents.add(eventType);
//如果事件是粘性的
if (subscriberMethod.sticky) {
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
//订阅者Method
public class SubscriberMethod {
//处理事件方法的Method对象
final Method method;
//响应订阅是在哪个线程的ThreadMode,
final ThreadMode threadMode;
//订阅的事件类型eventType
final Class<?> eventType;
//优先级
final int priority;
//是否粘性
final boolean sticky;
/** Used for efficient comparison */
String methodString;
}
//订阅
final class Subscription {
//订阅者
final Object subscriber;
//订阅者Method
final SubscriberMethod subscriberMethod;
}
可以看到register()方法很简洁,代码里的注释也很清楚了,我们可以看出通过subscriberMethodFinder.findSubscriberMethods(subscriberClass)
方法就能返回一个SubscriberMethod
的对象,而SubscriberMethod里包含了所有我们需要的接下来执行subscribe()的信息.所以我们先去看看findSubscriberMethods()是怎么实现的
3.2 SubscriberMethodFinder的实现
看findSubscriberMethods()
到底是如何实现的:
//缓存方法 列表
private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
//缓存了当前类的方法信息 列表
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
//之前有缓存 的就返回
return subscriberMethods;
}
//根据前面EvenBus实例化 一般为false
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
//获取订阅Method
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
//放入缓存并返回
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
看看findUsingInfo
方法的实现:
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
//从缓存中得到一个FindState对象
FindState findState = prepareFindState();
// 初始化刚才得到的findState
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
//获取subscriberInfo 存入findState中,一般为null
findState.subscriberInfo = getSubscriberInfo(findState);
//一般null
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
//在一个class钟使用反射 获取Method
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
//获取Method并且释放
return getMethodsAndRelease(findState);
}
//缓存了4个FindState对象
private static final int POOL_SIZE = 4;
private static final FindState[] FIND_STATE_POOL = new FindState[POOL_SIZE];
//从缓存中拿
private FindState prepareFindState() {
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
FindState state = FIND_STATE_POOL[i];
//不为null,就返回
if (state != null) {
FIND_STATE_POOL[i] = null;
return state;
}
}
}
//没有就new个新的
return new FindState();
}
//FindState类
static class FindState {
// 申明fianl 避免重新分配内存
//订阅的Method的列表
final List<SubscriberMethod> subscriberMethods = new ArrayList<>();
//根据事件类型分类 Method
final Map<Class, Object> anyMethodByEventType = new HashMap<>();
//根据Method分类
final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
final StringBuilder methodKeyBuilder = new StringBuilder(128);
//订阅者类型
Class<?> subscriberClass;
//订阅者类型
Class<?> clazz;
//跳过父类 一般为false
boolean skipSuperClasses;
//订阅者info,一般为null
SubscriberInfo subscriberInfo;
void initForSubscriber(Class<?> subscriberClass) {
this.subscriberClass = clazz = subscriberClass;
skipSuperClasses = false;
subscriberInfo = null;
}
}
//在一个class中使用反射 获取Method,,subscriberMethods都保存在findState中
private void findUsingReflectionInSingleClass(FindState findState) {
//Method数组
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
//获取订阅者 已经定义的方法
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
//遍历每个方法
for (Method method : methods) {
//标识符 public/private/protect
int modifiers = method.getModifiers();
//订阅方法 一定是public,non-static, and non-abstract
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
//方法 的参数类型
Class<?>[] parameterTypes = method.getParameterTypes();
//参数只能为1个
if (parameterTypes.length == 1) {
//method获得 Subscribe注解
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
// 如果是被Subscribe注解修饰的方法,第一个参数类型就是 订阅者关注的事件类型eventType
Class<?> eventType = parameterTypes[0];
//把这个eventType和method加入findState中
if (findState.checkAdd(method, eventType)) {
//注解对象的threadMode 变量
ThreadMode threadMode = subscribeAnnotation.threadMode();
//findState 中添加subscriberMethod
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
//从findState中获取subscriberMethods并且释放
private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
//findState的subscriberMethods
List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
//findState对象回收,其实是清空那4个fianl 集合,避免重建,开销大
findState.recycle();
//缓存起来
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
if (FIND_STATE_POOL[i] == null) {
FIND_STATE_POOL[i] = findState;
break;
}
}
}
//返回
return subscriberMethods;
}
以上就是所有注册过程,现在再来看这张图就会特别清晰EventBus的register()
过程了:
3.3 事件分发过程源码分析
通过第二节我们知道可以通过EventBus.getDefault().post(XXXEvent event);
来发送一个事件,所以我们就从这行代码开始分析,首先看看post()方法是如何实现的:
/** Posts the given event to the event bus. */
public void post(Object event) {
//得到当前线程的Posting状态. postingState是 当前调用线程保存的变量
PostingThreadState postingState = currentPostingThreadState.get();
//当前线程里 保存的事件集合
List<Object> eventQueue = postingState.eventQueue;
//把要发送的事件 保存在事件队列中
eventQueue.add(event);
//如果当前线程的发送状态postingState 没有在发送中就发送,那么其他的event就会直接进队列
if (!postingState.isPosting) {
//当前发送线程 是否主线程
postingState.isMainThread = isMainThread();
//正在发送中
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
//循环发送 单个事件
while (!eventQueue.isEmpty()) {
// 发送单个事件
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
//把当前线程 维护的发送状态 初始化
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
首先是通过currentPostingThreadState.get()
方法来得到当前线程PostingThreadState
的对象,为什么是说当前线程我们来看看currentPostingThreadState的实现:
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
currentPostingThreadState
的实现是一个包含了PostingThreadState的ThreadLocal对象
,关于ThreadLocal:ThreadLocal 是一个线程内部的数据存储类,通过它可以在指定的线程中存储数据,
而这段数据是不会与其他线程共享的。其内部原理是通过生成一个它包裹的泛型对象的数组,在不同的线程会有不同的数组索引值,通过这样就可以做到每个线程通过get()
方法获取的时候,取到的只能是自己线程所对应的数据。
看一下PostingThreadState:
/** For ThreadLocal, much faster to set (and get multiple values). */
//线程中 保存的数据状态
final static class PostingThreadState {
//线程保存的 event队列
final List<Object> eventQueue = new ArrayList<>();
//正在发送中
boolean isPosting;
//post发送线程 是否主线程
boolean isMainThread;
//event 的订阅者和订阅Method
Subscription subscription;
//event对象
Object event;
//是否被取消
boolean canceled;
}
接下来我们来看postSingleEvent()
方法:
// 发送单个事件
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
//事件event的class
Class<?> eventClass = event.getClass();
//是否发现subscription 标志位
boolean subscriptionFound = false;
//这个一般为true 见EventBus初始化,EventBusBuilder里默认为true
if (eventInheritance) {
//找到当前的event的所有 父类和实现的接口 的class集合
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
//遍历每个event/event的父类
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
//发送单个事件 对单个event的class
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
我们来看一下lookupAllEventTypes
:
/** Looks up all Class objects including super classes and interfaces. Should also work for interfaces. */
// 找当前发送event的 所有父类和实现的接口 的class集合 放入缓存
private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
synchronized (eventTypesCache) {
List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
//如果是空 新添加
if (eventTypes == null) {
eventTypes = new ArrayList<>();
Class<?> clazz = eventClass;
//循环添加
while (clazz != null) {
//添加当前 eventClass
eventTypes.add(clazz);
//递归添加 当前eventClass 的接口 class
addInterfaces(eventTypes, clazz.getInterfaces());
//再去找当前eventClass 的父类
clazz = clazz.getSuperclass();
}
eventTypesCache.put(eventClass, eventTypes);
}
return eventTypes;
}
}
/** Recurses through super interfaces. */
//递归添加 当前eventClass 的接口 class
static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
for (Class<?> interfaceClass : interfaces) {
if (!eventTypes.contains(interfaceClass)) {
eventTypes.add(interfaceClass);
addInterfaces(eventTypes, interfaceClass.getInterfaces());
}
}
}
以上说明 当发送post某个事件Event时,连同该Event的父类Event 也一起post了。
最后看下postSingleEventForEventType
//针对某个具体的event 发送单个事件
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
//当前事件event的所有订阅者+订阅Method
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
//遍历所有订阅者
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
//真正发送去执行
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
//把postingState 重置
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
//根据订阅方法的线程模式 执行订阅方法
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
//默认的 表示在执行 Post 操作的线程直接调用订阅者的事件响应方法
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
//如果post的线程是主线程 ,就在主线程执行响应方法
invokeSubscriber(subscription, event);
} else {
//如果post线程 不是主线程,就添加到mainThreadPoster队列,在主线程执行响应方法
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED://优先主线程
if (mainThreadPoster != null) {
//优先主线程
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
//mainThreadPoster 等于null 就在post线程中 继续执行响应方法
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
//如果post线程是主线程 则加入backgroundPoster 的队列,在线程池中执行
backgroundPoster.enqueue(subscription, event);
} else {
//如果post线程是子线程,在子线程中继续执行响应方法
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
//无论post线程在主线程还是子线程,都在子线程中执行响应方法
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
总结上面的代码就是,首先从subscriptionsByEventType
里获得所有订阅了这个事件的Subscription列表
,然后在通过postToSubscription()
方法来分发事件,在postToSubscription()
通过不同的threadMode
在不同的线程里invoke()订阅者的方法
,ThreadMode共有四类:
-
PostThread
:默认的 ThreadMode,表示在执行 Post 操作的线程直接调用订阅者的事件响应方法
,不论该线程是否为主线程(UI 线程)。当该线程为主线程时,响应方法中不能有耗时操作,否则有卡主线程的风险。适用场景:对于是否在主线程执行无要求,但若 Post 线程为主线程,不能耗时的操作; -
MainThread
:在主线程中执行响应方法
。如果发布线程就是主线程,则直接调用订阅者的事件响应方法,否则通过主线程的 Handler 发送消息在主线程中处理——调用订阅者的事件响应函数。显然,MainThread类的方法也不能有耗时操作,以避免卡主线程。适用场景:必须在主线程执行的操作; -
BackgroundThread
:在后台线程中执行响应方法
。如果发布线程不是主线程,则直接调用订阅者的事件响应函数,否则启动唯一的后台线程去处理。由于后台线程是唯一的,当事件超过一个的时候,它们会被放在队列中依次执行,因此该类响应方法虽然没有PostThread类和MainThread类方法对性能敏感,但最好不要有重度耗时的操作或太频繁的轻度耗时操作,以造成其他操作等待。适用场景:操作轻微耗时且不会过于频繁,即一般的耗时操作都可以放在这里; -
Async
:不论发布线程是否为主线程,都使用一个空闲线程来处理
。和BackgroundThread不同的是,Async类的所有线程是相互独立的,因此不会出现卡线程的问题。适用场景:长耗时操作,例如网络访问。
我们来看上面到底是怎么进行线程切换,在主线程,子线程执行响应函数的,即看下面代码实现:
//切换到主线程 执行响应函数
mainThreadPoster.enqueue(subscription, event);
//切换到子线程 执行响应函数
backgroundPoster.enqueue(subscription, event);
在EvnetBus初始化时:
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
backgroundPoster = new BackgroundPoster(this);
//接着看mainThreadSupport.createPoster(this)
//主线程支持接口
public interface MainThreadSupport {
boolean isMainThread();
Poster createPoster(EventBus eventBus);
class AndroidHandlerMainThreadSupport implements MainThreadSupport {
//这个传进来的是mainlooper
private final Looper looper;
public AndroidHandlerMainThreadSupport(Looper looper) {
this.looper = looper;
}
@Override
public boolean isMainThread() {
return looper == Looper.myLooper();
}
@Override
public Poster createPoster(EventBus eventBus) {
//new了一个HandlerPoster对象
return new HandlerPoster(eventBus, looper, 10);
}
}
}
//看一下HandlerPoster,即mainThreadPoster 就是一个HandlerPoster 对象
public class HandlerPoster extends Handler implements Poster {
//未执行post的队列
private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
//新建了一个队列
queue = new PendingPostQueue();
}
//这就是上面那个mainThreadPoster.enqueue(subscription, event);
public void enqueue(Subscription subscription, Object event) {
//获得一个要执行的post
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//加入队列
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
//然后发送一个Message ,就在就到主线程去处理了 就是下面的handleMessage
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
//死循环 处理队列
while (true) {
//队列是null、退出
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
//真正的处理响应方法 通过反射
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
//看看BackgroundPoster
/**
* Posts events in background.
* 发送一个事件到子线程
* @author Markus
*/
final class BackgroundPoster implements Runnable, Poster {
//还是有一个队列
private final PendingPostQueue queue;
//eventbus
private final EventBus eventBus;
//是否线程正在执行
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
//新建一个队列
queue = new PendingPostQueue();
}
//加入队列 就是前面的 backgroundPoster.enqueue(subscription, event);
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//获取一个post、加入队列
queue.enqueue(pendingPost);
//如果线程没有正在执行,就加入线程池执行 就执行下面的run方法
if (!executorRunning) {
executorRunning = true;
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
//死循环 处理post队列
while (true) {
PendingPost pendingPost = queue.poll(1000);
//对列是null
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
////真正的处理响应方法 通过反射
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}
以上就解释了怎样切换 主线程和子线程中 处理响应函数的。
看一下通过反射执行方法invokeSubscriber
:
void invokeSubscriber(Subscription subscription, Object event) {
try {
//反射执行方法
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
//final类 暂未执行的post
final class PendingPost {
//作为缓存池用的避免重复的创建对象
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
//事件对象 event
Object event;
//对该event的 订阅者+订阅方法
Subscription subscription;
//下一个暂未执行的post
PendingPost next;
private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.event = event;
pendingPost.subscription = subscription;
pendingPost.next = null;
return pendingPost;
}
}
//缓存池里没有对象才会new一个
return new PendingPost(event, subscription);
}
static void releasePendingPost(PendingPost pendingPost) {
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;
synchronized (pendingPostPool) {
// Don't let the pool grow indefinitely
if (pendingPostPool.size() < 10000) {
pendingPostPool.add(pendingPost);
}
}
}
}