Kotlin从0到开发一个 App — Part 5:Kotlin,RxJava&RxAndroid

1,000 阅读5分钟

在这章我们将讨论 Kotlin 中的 RxJava.这是一个不可思议的组合,我们将利用它们使我们的 App 到达另一个层次,解耦我们的 UI 线程与后台线程,在这种情况下从服务端请求 Reddit 新闻.

全部章节:

Kotlin — Part 0:关于这个系列
Kotlin — Part 1:配置 Android Studio
Kotlin — Part 2:语法,空安全,静态类型
Kotlin — Part 3:扩展函数、Android 扩展、委托属性
Kotlin — Part 4:RecyclerView— Kotlin 适配器委托&数据类
Kotlin — Part 5:Kotlin,RxJava&RxAndroid
Kotlin — Part 6:API-Retrofit&Kotlin)
Kotlin — Part 7:无限滑动:高阶函数& Lambdas
Kotlin — Part 8:方向改变(序列化&数据类)
Kotlin — Part 9:单元测试与 Kotlin(Mockito,RxJava)

Github 仓库:github.com/imuhao/Kedd…

如果现在你第一次听说 RxJava,我推荐你先阅读这个文章.

RxJava 是一个 响应式扩展实现,一个由异步和基于事件驱动的第三方框架.

我们将使用 Observable 类避免从主线程中请求服务端数据(或任何长时间操作)

News Manager

NewsManager 类从 Reddit API 提供新闻数据.它负责制定服务请求,给我们一个新闻 item 集合.

这个主要的思想是使我们的 API 调用在主线程以外的其它线程执行.一个大的图片将解释我们所要做的:

Observable

这个 NewsManager 类将提供一个方法返回一个 Observable 对象,允许你在另一个地方运行一块代码(在这种情况下载一个新的线程).

fun getNews(): Observable<List<RedditNewsItem>> {
    ...
}

我们将在 NewsManager 中创建 Observable 对象.我们将提供这个对象的实现.

class NewsManager {
    fun getNews(): Observable<List<RedditNewsItem>> {
        return Observable.create { subscriber ->
            var news = mutableListOf<RedditNewsItem>()
            subscriber.onNext(news)
            subscriber.onCompleted()
        }
    }
}

感谢 Kotlin,我们可以使用 lambda 表达式为 Observable.create() 方法提供需求.这个 create() 方法需要你提供一个 subscriber 对象,这个 subscriber 对象允许你通过3个方法发送事件到订阅者.

  • onNext(item:T): 我们调用这个方法为订阅者提供从服务端接受到的新闻数据,在这种情况下根据 getNews() 的返回类型推断出 T 将是” List “类型
  • onCompleted():调用这个方法代表Observable 已经完成提供所有的数据,我们将在调用 onNext() 方法之后调用这个方法,准备好结束这个进程.
  • onError(e:Throwable): 如果我们调用 API 出现任何错误将使用这个方法告诉订阅者,我们将之后通过 SnackBar 向用户展示错误信息.

Lazy NewsManager

我们为 NewsManager 使用 lazy 属性

private val newsManager by lazy {
        NewsManager()
    }

这个 newsManager 将只在第一次使用时初始化 NewsManager()

从 NewsFragment 请求新闻

现在是时候使用我们的 NewsManager, 请求一些新闻显示在 RecyclerView.

Subscribe>Subscription>Observer

为了从 NewsManager请求 Reddit 新闻, 我们在 NewsFragment 将NewsManager.getNews() 得到的 Observable 转变为 Observer. 我们要做的是调用 getNews() 方法,调用这个方法的” subscribe()”接受这个 Observable:

val subscription = newsManager.getNews().subscribe (
        { retrievedNews ->
            ...
        },
        { e ->
            ...
        }

这个 subscribe 有好几个重载方法,我们将使用这个:

public final Subscription subscribe(
       final Action1<? super T> onNext, 
       final Action1<Throwable> onError) {
       ...
}

它接受两个函数

  • onNext:当 Observable 的 onNext 调用时这个函数将被调用,我们将使用它设置 NewsAdapter.
  • onError: 当 Observable 的 onError 调用时候这个函数被调用,我们使用它显示 SncakBar 与错误消息.

它返回一个 Subscription 对象,这个对象允许我们管理这个subscription, 像检测是否订阅或取消订阅.

再一次感谢 Kotlin, 我们可以对这两个函数使用 lambda 表达式.从这个例子的第一个函数,这个 retrievedNews 变量是我们从 onNext() 方法接受新闻数据的名字.

{ 
    retrievedNews ->
     (news_list.adapter as NewsAdapter).addNews(retrievedNews)
}

记得onNext 返回类型是” List “,所以我要做的是直接将这些数据设置给 NewsAdapter.

在 onError 函数,我只是告诉用户错误信息,我可以使用” e”,一个 Throwable 类型得到详细错误信息

{ e ->
    Snackbar.make(...).show()
}

现在我们仍然在主线程

如果我们现在运行 APP, 它仍然工作,因为我们现在的数据是从 Observable 中获取的,但是如果我们在 Observable 中指定长时间的操作 APP 将会停止运行,因为现在还在主线程中.

我们没有为 Observable 指定任何具体细节,它将运行默认的配置,代码会执行在调用时相同的线程.所以让我们配置我们的 Observable 在其它线程执行,但是在主线程中通知事件.

SubscribeOn

val subscription = newsManager.getNews()
        .subscribeOn(Schedulers.io())
        .subscribe (...)

这个方法 subscribeOn(…)允许你移动Observable 代码到其它线程,使用指定的行为.要做到这一点, RxJava 使用 Schedulers 提供了一个调度列表:

  • io: IO操作线程
  • computation:计算线程
  • newThread:为每一次工作创建一个新的线程
  • test:通常用来测试

在这种情况下,我们使用 Schedulers.io()来执行 API 请求

ObserveOn

ObserveOn 是另一个 Observable 类的方法,它允许你定义在哪里执行”subscribe(…)”方法.换句话说,它可以指定我们的 onNext和 onError 函数所在的线程.我们需要指定这些代码在主线程中执行.

在这里我们需要添加RxAndroid 依赖有一个新的调度器叫:

AndroidSchedulers.mainThread()

之后我们需要更新我们的代码,设置 observeOn 调度器

val subscription = newsManager.getNews()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe ( ...
)

Subscriptions

为了避免在订阅期间我们的 fragment 被关闭或者应用关闭,我们必须管理所有的订阅者,在关闭时对它们全部解绑.一个好的办法是创建一个 CompositeSubscription对象添加所有创建的订阅者,这个类通过 RxJava 提供,运行你通过调用一个方法解绑所有的观察者.我们在 onResume 中初始化 CompositeSubscription对象,在 onPause 方法中解绑.

var subscriptions = CompositeSubscription()
override fun onResume() {
    super.onResume()
    subscriptions = CompositeSubscription()
}
override fun onPause() {
    super.onPause()
    subscriptions.clear()
}

现在我们唯一要做的事情就是添加我们的订阅者到这个新的CompositeSubscription对象,在任何时候我们都可以离开我们的 App.

private fun requestNews() {
    val subscription = newsManager.getNews()
            .subscribeOn(Schedulers.io())
            .subscribe (...)
    subscriptions.add(subscription) // add the subscription
}

为了使代码更加可读,我将这些代码移动到一个新的基类” BaseFragment”中,使我们的 fragment 默认有这些逻辑.

总结

正如你看到的 Kotlin 和 RxJava 可以完美的集合再一起,在这个部分我们只是看到了一小部分,你可以看到 RxJava 与 Kotlin 集合,代码看起来更加可读和理解.