RxJava2.0│基础简介

94 阅读6分钟
原文链接: mp.weixin.qq.com

☝点击上方蓝字,关注我们!

RxJava2是基于观察者模式的,针对事件流链式操作的响应式编程,在Android开发中已经进行了广泛的应用。本文通过对RxJava2基础流程、线程调度、背压策略的源码解析,对RxJava2在Android中的用法进行归纳和总结。目的帮助大家了解RxJava2的实现机制,和在不同的业务场景选择合适的用法。

1、RxJava2的思维导图

2、五种观察者模式

被观察者

观察者

描述

应用场景

Observable

Observer

能够发射0或n个数据,并以成功或错误事件终止。

常规

Flowable

Subscriber

能发射0或n个数据,并以成功或错误事件终止。支持背压,可以控制数据源发射的速度。

下载/上传或其他需要回调进度的情况,数据库操作,rxbus等

Single

SingleObserver

只发射单个数据或错误事件。没有onComplete

上游必须发射数据给下游处理,否则任务失败

Completable

CompletableObserver

从来不发射数据,只处理onComplete和onError事件。可以看成是Rx的Runnable。

耗时操作,只处理完成或任务失败的情况,不在意上游处理后的返回数据。

Maybe

MaybeObserver

能够发射0或1个数据,要么成功onSuccess,要么失败onError,要么onComplete。

http网络请求。

 

3、Transformer和compose()

消除重复代码

Transformer能够将一个Observable/Flowable/Single/Completable/Maybe对象转换成另一个Observable/Flowable/Single/Completable/Maybe对象。通常可配合compose()操作符使用。非常适合用于封装常用的操作符组合。

先看如下代码:

1Observable.from(someSource) 2        .map(new Func1<Data, Data>() { 3            @Override public Data call(Data data) { 4                return manipulate(data); 5            } 6        }).subscribeOn(Schedulers.io()) 7        .observeOn(AndroidSchedulers.mainThread()) 8        .subscribe(new Action1<Data>() { 9            @Override public void call(Data data) {10                doSomething(data);11            }12        });

我们通常都希望在工作线程中处理数据,然后在主线程中处理结果,会经常使用subscribeOn()和observeOn()进行线程切换。所以我们可以对此进行封装,例如:

1<T> Observable<T> applySchedulers(Observable<T> observable) {2    return observable.subscribeOn(Schedulers.io())3            .observeOn(AndroidSchedulers.mainThread());4}

1applySchedulers(Observable.from(someSource).map(new Func1<Data, Data>() { 2    @Override public Data call(Data data) { 3        return manipulate(data); 4    } 5}) 6        ).subscribe(new Action1<Data>() { 7    @Override public void call(Data data) { 8        doSomething(data); 9    }10});

但是这段代码看起来不美观,并且不符合链式结构。如果在一个数据流中反复使用,代码看起来会非常丑。而Transformer和compose()的结合,能帮助我们优雅的解决这个问题。

我们先通过Transformer对Observable的线程调度进行封装:

1<T> ObservableTransformer<T, T> applySchedulers() {2    return new ObservableTransformer<T, T>() {3        @Override4        public ObservableSource<T> apply(Observable<T> upstream) {5            return upstream.subscribeOn(Schedulers.io())6                    .observeOn(AndroidSchedulers.mainThread());7        }8    };9}

然后通过compose()操作符进行连接:

1Observable.from(someSource) 2        .map(new Func1<Data, Data>() { 3            @Override public Data call(Data data) { 4                return manipulate(data); 5            } 6        }) 7        .compose(this.<YourType>applySchedulers()) 8        .subscribe(new Action1<Data>() { 9            @Override public void call(Data data) {10                doSomething(data);11            }12        });

3.1 compose()与flatMap():

相同:

返回值都是Observable

不同:

(1) 只能通过 compose() 来从数据流中获取源 Observable 。所以对于影响整个事件流的操作函数(例如 subscribeOn() 和 observeOn())需要使用 compose()。如果你把 subscribeOn()/observeOn() 在 flatMap 内使用,则只会对在 flatMap 里面创建的 Observable 有用而不是整个数据流;

(2) 一旦创建了事件流,compose() 就立刻开始执行了。而flatMap() 只有当每次 onNext() 调用的时候才开始执行。也就是说,flatMap() 转换的是每个单独的数据而 compose() 是作用在整个数据流上的;

(3) 由于每次调用 Observable 的 onNext() 函数 flatMap() 都会创建一个新的 Observable,所以 flatMap() 的效率并不是很好。

4、Hot Observable和Cold Observable

在RxJava中,Observable有Hot和Cold之分。这是Observable的基本概念,但是由于客户端对此的应用场景有限(绝大多数时候用的是cold),所以放到最后来说明。

Hot Observable:无论有没有Observer进行订阅,事件始终都会发生。有多个Observer订阅时,Hot Observable与Observer的关系是一对多的关系。

Cold Observable:只有Observer订阅了,才开始发射数据。Cold Observable与Observer是一对一的关系。有多个Observer时,它们各自的事件是独立的。

我们常见的工厂方法,如just, create, range, fromXXX等创建的Observable都是cold的。

Cold Observable转换为Hot Observable有两类方式:

1.publish/replay: 线程安全;

2.Subject/Processor: 线程不安全。

4.1 publish/replay:

publish和replay将原先的Observable转换为ConnectableObservable。两者的不同是,Observer订阅之后,replay会先收到Observable之前发送的N个数据。

工作流程如下:

个人理解,refCount()后的Observable,介于cold和hot之间。属于广播式的observable,但是需要被订阅才可被激活,在最后一个Observer解绑后停止。

4.2 Subject/Processor:

Subject与Processor作用相同。不同点在于,Processor继承Flowable,支持背压。

Subject既是Observable,又是Observer (继承Observable,实现Observer接口)。作为Observer,可以订阅Cold Observable,是对方开始发送时间。同时,又作为Observable转发或发送新的事件。让Cold Observable转换为Hot Observable。

 


狐友技术团队其他精彩文章

Swift之Codable实战技巧

不了解GIF的加载原理?看我就够了!

安卓系统权限,你真的了解吗?

AspectJ在Android中的应用


加入搜狐技术作者天团

千元稿费等你来!

戳这里!☛