使用 LiveDataBus 替代 EventBus

4,726 阅读8分钟

前言

引用官方 LiveData 介绍

LiveData is an observable data holder class. Unlike a regular observable, LiveData is lifecycle-aware, meaning it respects the lifecycle of other app components, such as activities, fragments, or services. This awareness ensures LiveData only updates app component observers that are in an active lifecycle state.

译:LiveData 是可观察的数据持有者类。与常规的可观察对象不同,LiveData 具有生命周期感知功能,这意味着它遵循其他应用程序组件(例如 Activity ,Fragment 或 Service)的生命周期。这种意识确保 LiveData 仅更新处于活动生命周期状态的应用程序组件观察者。

LiveDataBus 和 EventBus 一样作为消息总线来使用,但实际上它算是一种模式,例如 RxBus 依托于 RxJava 的支持,仅用了不到 30 行代码便可实现一个新的消息总线;而 LiveDataBus 与之类似,依托于官方的 LiveData ,我们便可以自己写出一个消息总线,并且这个消息总线是具有生命周期感知能力的。

实现

object LiveDataBus {
    
    private val bus: MutableMap<String, MutableLiveData<Any>> = mutableMapOf()
    
    fun <T> with(key: String): MutableLiveData<T> {
        if (!bus.containsKey(key)) {
            bus[key] = MutableLiveData()
        }
        return bus[key] as MutableLiveData<T>
    }
}

嗯?!这样就完了?对的,使用 Kotlin 的话代码很少很简洁。首先这个 LiveDataBus 是一个单例,内部维护着一个可变 map ,这个 map 通过 key 值来返回相应的 LiveData 对象(如果没有这个对象的话就新建一个放进 map 再返回),而拿到这个对象就可以进行订阅或者发送消息了。

使用

发送消息

class MainActivity : AppCompatActivity() {
    
    override fun onCreate(savedInstanceState: Bundle?) {
        // ...
        
        // post
        LiveDataBus.with<String>("from_main")
      		.value = "From MainActivity."
        Thread {
            // 非主线程下使用 postValue
            LiveDataBus.with<String>("from_main")
            	.postValue("From MainActivity.")
        }.start
    }
}

订阅接收消息

class SecondActivity : AppCompatActivity() {
    
    private val observer = Observer<String> {
        Log.e(TAG, it)
    }
    
    override fun onCreate(savedInstanceState: Bundle?) {
        // ...
        
        // 使用 observe 函数进行订阅的话,只会在宿主处于前台的状态才会进行回调,
        // 也就是如果 activity 被挂在后台了,那么消息会等到 activity 返回前台
        // 时再进行回调,这就是 LiveData 的生命周期感知能力带来的好处。
        LiveDataBus.with<String>("from_main")
        	.observe(this::getLifecycle) {
                Log.e(TAG, it)
            }
        
        // 但是如果我们希望即使 activity 不在前台也可以收到消息,这时就要使用
        // observeForever 这个函数了,使用这种方式订阅相当于忽略宿主的生命周期,
        // 任何时刻只要宿主还存活着,就一定会回调消息,缺点是需要自己注册与反注册。
        LiveDataBus.with<String>("from_main")
        	.observeForever(observer)
    }
    
    override fun onDestroy() {
        // ...
        LiveDataBus.with<String>("from_main")
       		.removeObserver(observer)
    }
}

真滴棒,仅仅用了几行代码就实现了一个消息总线,虽然是在 LiveData 的基础上将其发扬光大,但还是牛逼。于是开始投入使用,用着用着突然发现,好像有点不对劲,为什么我一订阅了就马上收到消息了,而且这个消息是在我订阅之前就发出的,坑爹啊......

问题

回顾一下上面遇到的问题,在订阅消息之后立刻就能收到订阅前发出的消息,这种属于粘性消息,用过 EventBus 的都知道,但问题是我们并不需要每次都接收粘性消息呀,如果消息不能准确接收的话那这个消息总线还是正宗的消息总线吗?

出现这个问题的原因: LiveData 内部的 mVersion 属性和 LiveData 内部类 ObserverWrapper 内部的 mLastVersion 属性之间会作比较,在订阅时只要 mLastVersion 小于 mVersion 就会强制推送最新的消息。LiveData 只要调用过 setValue 或者 postValue 都会使 mVersion++ ,而不管是 LiveData 或者是 LiveData.ObserverWrapper ,其 version 初始值都是 -1 ,只要用 LiveData 设置了值后它的 mVersion 值无论如何都大于 -1 了,这时再进行订阅就会马上收到最后发出的消息了。

源码分析

通过阅读源码可以发现,每当我们调用 LiveData.observe 函数时,内部都会创建一个 LiveData.LifecycleBoundObserver ,它是 ObserverWrapper 子类,并且实现了 GenericLifecycleObserver 接口,从名字上看可以知道这是一个绑定宿主生命周期的类。当 LifecycleOwner 的状态发生变化时,会调用 LiveData.LifecycleBoundObserver 的 onStateChanged 函数。

接着在 onStateChanged 函数内部进行判断,如果宿主不是 DESTROYED 状态就调用从 ObserverWrapper 继承过来的 activeStateChanged 函数,如果这个时候 ObserverWrapper 的状态是 active ,就继续调用 LiveData.dispatchingValue 函数。

LiveData.dispatchingValue 函数中又会调用 LiveData.considerNotify 函数。

LiveData.considerNotify 函数中,可以看到红框中的代码就是根本原因了,如果 ObserverWrapper 的 mLastVersion 小于 LiveData 的 mVersion ,就回调 onChanged() 函数,并且每个新的订阅者其 version 都是 -1 ,LiveData 一旦设置过 value 其 version 都是大于 -1 的,这样直接导致了每注册一个新的订阅者,这个订阅者马上就收到消息,即使这个发布消息的动作发生在订阅之前。

解决问题

通过阅读源码,终于弄明白了为什么会出现这种问题,那么既然出现了这个问题,就得想办法解决。根据上面的分析,只要在注册一个新的订阅者时把 ObserverWrapper 的 mLastVersion 设置成和 LiveData 的 mVersion 一样即可,看图。

我们直接从红框部分入手,LifecycleBoundObserver 是 ObserverWrapper 子类,创建完之后被放进了 mObservers ,mObservers 是 LiveData 的成员变量(SafeIterableMap<Observer<? super T>, ObserverWrapper>),实际上它是一个链表,从 mObservers 的源码中可以发现有一个 protected 的 get 函数,因此我们可以在调用 observe 的时候通过反射拿到 mObservers 中的 LifecycleBoundObserver ,再把它的 mLastVersion 设置成和 LiveData 一致。

而对于非生命周期感知的 observeForever 函数,函数内部生成的不是 LifecycleBoundObserver ,而是 AlwaysActivityObserver ,并且我们没有机会在 observeForever 调用完成后再去更改 AlwaysActiveObserver 的 mLastVersion ,因为再 observeForever 函数体内,回调就已经发生了,看下图。

对于 observeForever ,既然是在函数体中就发生消息回调的,那么可以写一个自己的 ObserverWrapper ,把真正的回调给包装起来。把我们自己的 ObserverWrapper 传给 observeForever ,然后在回调的前去检查调用栈,如果回调是 observeForever 引起的就不回调真正的订阅者。

改良后的 LiveDataBus

object LiveDataBus {

    private val bus: MutableMap<String, BusMutableLiveData<Any>> = mutableMapOf()

    fun <T> with(key: String): BusMutableLiveData<T> {
        if (!bus.containsKey(key)) {
            bus[key] = BusMutableLiveData()
        }
        return bus[key] as BusMutableLiveData<T>
    }

    class BusMutableLiveData<T> : MutableLiveData<T>() {
        private val observerMap: MutableMap<Observer<*>, Observer<*>> = mutableMapOf()

        fun observeSticky(
            owner: () -> Lifecycle,
            observer: (T) -> Unit
        ) {
            super.observe(owner, observer)
        }

        fun observeForeverSticky(observer: Observer<in T>) {
            super.observeForever(observer)
        }

        override fun observeForever(observer: Observer<in T>) {
            if (!observerMap.containsKey(observer)) {
                observerMap[observer] = ObserverWrapper(observer)
            }
            super.observeForever(observerMap[observer] as Observer<in T>)
        }

        override fun removeObserver(observer: Observer<in T>) {
            val realObserver: Observer<in T> =
                if (observerMap.containsKey(observer)) {
                    observerMap.remove(observer) as Observer<in T>
                } else {
                    observer
                }
            super.removeObserver(realObserver)
        }

        override fun observe(
            owner: LifecycleOwner,
            observer: Observer<in T>
        ) {
            super.observe(owner, observer)
            try {
                hook(observer)
            } catch (e: Exception) {
                e.printStackTrace()
            }
        }

        /**
         * 利用反射将 LiveData 的 mVersion 赋值给 ObserverWrapper 的 mLastVersion
         */
        @Throws(Exception::class)
        private fun hook(observer: Observer<*>) {
            // Get wrapper's version.
            val liveDataClass = LiveData::class.java
            // SafeIterableMap<Observer<? super T>, ObserverWrapper> mObservers
            val observersField = liveDataClass.getDeclaredField("mObservers")
            observersField.isAccessible = true
            val observers = observersField[this]
            val observersClass: Class<*> = observers.javaClass
            // It's mObservers's get method.
            val methodGet = observersClass.getDeclaredMethod("get", Any::class.java)
            methodGet.isAccessible = true
            val observerWrapperEntry = methodGet.invoke(observers, observer)
            var observerWrapper: Any? = null
            if (observerWrapperEntry is Map.Entry<*, *>) {
                // Now we got observerWrapper.
                observerWrapper = observerWrapperEntry.value
            }
            if (observerWrapper == null) {
                throw NullPointerException("Wrapper can not be null!")
            }
            val observerWrapperParentClass: Class<*>? = observerWrapper.javaClass.superclass
            val lastVersionField = observerWrapperParentClass!!.getDeclaredField("mLastVersion")
            lastVersionField.isAccessible = true
            // Get livedata's version.
            val versionField = liveDataClass.getDeclaredField("mVersion")
            versionField.isAccessible = true
            val version = versionField[this]
            // Set wrapper's version.
            lastVersionField[observerWrapper] = version
        }
    }

    private class ObserverWrapper<T>(val observer: Observer<T>) : Observer<T> {
        override fun onChanged(t: T) {
            if (isCallOnObserverForever()) {
                return
            }
            observer.onChanged(t)
        }

        private fun isCallOnObserverForever(): Boolean {
            val stackTrace = Thread.currentThread().stackTrace
            for (element in stackTrace) {
                if ("androidx.lifecycle.LiveData" == element.className
                    && "observeForever" == element.methodName
                ) {
                    return true
                }
            }
            return false
        }
    }
}

在这段源码中,用了 BusMutableLiveData 替换了 MutableLiveData ,并且增加了 2 个订阅粘性消息的函数,毕竟有时确实需要用到粘性消息,不可能赶尽杀绝,而原函数 observeobserveForever 则统一变成了非粘性订阅。

总结

可以看到相比第一版的 LiveDataBus ,改良版增加了许多代码,因为 version 机制,但反过来一想,如果没有这个 version ,我们一样还是要自己实现粘性订阅的功能,在 Android 帮我们实现了这个功能后,我们再自己增加非粘性订阅的功能,其实都差不多。

总结一下 LiveDataBus 的优缺点:

优点

  1. UI 和数据保持实时同步,因为 LiveData 采用观察者模式,只要数据发生改变就立刻通知订阅者。
  2. 避免内存泄漏,订阅者和生命周期绑定,当宿主销毁时订阅者能够马上清理自身数据。
  3. 解决 Configuration Change 问题,在屏幕发生旋转或宿主被回收重建时,能够第一时间接收到最新消息。
  4. 延续了传统消息总线的功能,如果宿主在非活跃状态下仍然需要进行消息回调,可以通过 observeForever 进行回调。
  5. 相比较 EventBus 和 RxBus 有着更好的支持,因为是官方组件,不需要额外进行依赖,减少 APK 包体大小。

缺点

  1. 订阅的时候有 Crash 风险,因为我们需要自己指定返回的消息类型,如果发布的消息类型与订阅时指定的消息类型不一致就直接 Crash 了。
  2. 如果需要和同事联调,因为这个消息类型会比较头疼,需要双方约定好每个 key 的消息类型,不然也是 Crash 。
  3. 其实就是上面 2 个缺点结合,增加了使用难度,在指定消息类型的时候需要谨慎。

本文参考了美团大佬的 LiveDataBus 文章,在这基础上阅读源码并写下这篇文章作为记录,美团大佬原文链接 Android消息总线的演进之路:用LiveDataBus替代RxBus、EventBus