RxJava 的使用与理解(一)

3,987 阅读6分钟

ReactiveX编程简称Rx编程,又叫响应式编程、响应式扩展,英文为Reactive Extensions。可以查看官方网站www.reactive.io,就像其网站说的”Expertise makes better software.”,响应式编程的目标是提供一致的编程接口, 帮助开发者更方便的处理异步数据流,使软件开发更高效、更简洁。Rx是一个多语言的实现,已经支持多种语言包括Java、Swift、C++、.NET、JavaScript、Ruby、Groovy、Scala等等,支持的库包括:RxJavaRxSwift、Rx.NET、RxJS、RXRuby等等,真是屌炸天。在Android上我们添加 RxAndroid 库就可以,RxAndroid 是对 RxJava 一种更接地气的扩展。下面让我们通过 RxAndroid 去使用、理解 RxJava 吧。

Rx使用观察者模式

创建:Rx可以方便的创建事件流和数据流
组合:Rx使用查询式的操作符组合和变换数据流
监听:Rx可以订阅任何可观察的数据流并执行操作

Rx使代码简化

函数式风格:对可观察数据流使用无副作用的输入输出函数,避免了程序里错综复杂的状态
简化代码:Rx的操作符通通常可以将复杂的难题简化为很少的几行代码
异步错误处理:传统的try/catch没办法处理异步计算,Rx提供了合适的错误处理机制
轻松使用并发:Rx的Observables和Schedulers让开发者可以摆脱底层的线程同步和各种并发问题

先看简单的例子,通过RxJava将Integer类型转成String类型

private void funcDemo() {
    Observable.OnSubscribe onSubscribe1 = new Observable.OnSubscribe() {
        @Override
        public void call(Subscriber subscriber) {
            subscriber.onNext(100);
        }
    };

    Func1 func1 = new Func1() {
        @Override
        public String call(Integer integer) {
            return String.valueOf(integer);
        }
    };

    Subscriber subscriber1 = new Subscriber() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onNext(String s) {
            Log.d("onNext(): ", s);
        }
    };

    Observable.create(onSubscribe1)
            .map(func1)
            .subscribe(subscriber1);

    // 将上面分解成三步执行
    // Observable observable1 = Observable.create(onSubscribe1);
    // Observable observable2 = observable1.map(func1);
    // observable2.subscribe(subscriber1);
}

主要关注这里面生成的四个对象:observable1、onSubscribe1、func1、subscriber1,如果对相应的单词不清晰,移步下面的备注。
这是典型的的被观察者与观察者的关系,或者叫被订阅者与订阅者的关系;下面理一下他们的角色:

observable1:被观察者,是subscriber1要订阅的对象。  
onSubscribe1:被观察者的行为,是subscriber1要订阅的行为,subscribe()时被执行。  
subscriber1:订阅者,是抽象类实现了Observer接口,可以叫观察者,其实就是观察者的角色。

// 分解成三步执行的代码
Observable observable1 = Observable.create(onSubscribe1);
Observable observable2 = observable1.map(func1);
observable2.subscribe(subscriber1);

这段代码的意思就是:创建一个被观察者observable1,给被观察者指定其所发布的行为(onSubscribe1来实现); 指定观察者时,只需指定相应的观察回调即可;在完成订阅的操作时,是先调用subscriber1的onStart方法, 之后通过订阅行为onSubscribe1来调用subscriber1完成相应的订阅操作;最后若出现异常则会回调subscriber1的onError方法。

换句话说就是:被观察者发布的行为是传递Integer类型的数值100,map()变换后,观察者收到了String类型的字符串”100”。

简单点说就是:观察者订阅了被观察者要发布的行为。

源码参考个人作品【图灵机器人】

现在结合源码,一步步看它到底是怎么执行的!

public static  Observable create(OnSubscribe f) {
    return new Observable(hook.onCreate(f));
}

protected Observable(OnSubscribe f) {
    this.onSubscribe = f;
}

这里面泛型比较多,先忽略泛型符号;hook又是什么东东?翻译过来就是“钩子”,到底是什么钩子呢,查看源码后知道, `hook` 是一个 proxy 对象, 仅仅用作调试的时候可以插入一些测试代码的,那也先忽略。

所以Observable.create(onSubscribe1)干了两件事,第一 new 一个 observable1 对象,第二将 new 出的 onSubscribe1 对象通过 Observable 的构造函数赋值给它的成员变量 onSubscribe。

第二步 observable1.map(func1)

先看func1对象,RxJava有一系列的(Func+数字)的接口和一系列(Action+数字)接口,这些接口中都只有一个call方法,其中(Func+数字)接口的call方法都有返回值,而(Action+数字)接口的call方法都没有返回值,后面的那个数字表示call方法接受几个泛型类型的参数。看一下Func1和Action1的源码:

/**
 * Represents a function with one argument.
 */
public interface Func1 extends Function {
    R call(T t);
}

/**
 * A one-argument action.
 */
public interface Action1 extends Action {
    void call(T t);
}

Func1 和 Action(Action1继承Action) 都继承 Function 接口,Func1 接收一个 T 泛型类型的参数,call 回调后,返回一个 R 泛型类型的值,是一种“变换”函数接口,我们可以在 call 回调中处理这种“变换”的需求。
接下来看看 map(func1) 干了神马,上源码。。。

public final  Observable map(Func1 func) {
    return lift(new OperatorMap(func));
}

public final  Observable lift(final Operator operator) {
    return new Observable(new OnSubscribe() {
        @Override
        public void call(Subscriber o) {
            // 核心代码
            Subscriber subscriber2 = hook.onLift(operator).call(o);
            subscriber2.onStart();
            onSubscribe1.call(subscriber2);
        }
    });
}

map()函数里直接调用的lift()函数,先看看OperatorMap和lift()函数的参数Operator是什么玩意,再上源码。。。

/**
 * Operator function for lifting into an Observable.
 */
public interface Operator extends Func1, Subscriber> {
    // cover for generics insanity
}

public final class OperatorMap implements Operator {

    final Func1 transformer;

    public OperatorMap(Func1 transformer) {
        this.transformer = transformer;
    }

    @Override
    public Subscriber call(final Subscriber o) {
        return new Subscriber(o) {
            @Override
            public void onCompleted() {
                o.onCompleted();
            }
            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }
            @Override
            public void onNext(T t) {
                o.onNext(transformer.call(t));
            }
        };
    }
}

Operator 继承 Func1,Operator 和 Func1 都是一种“变换”接口,比如输入Integer类型参数经过处理返回String类型值, OperatorMap 继承 Operator,但它的参数又和Operator相反,难道经过OperatorMap又把String类型变换成Integer类型值啦, 其实 OperatorMap 类有个Func1属性transformer(transformer就是func1),执行o.onNext(transformer.call(t))就将func1变换的结果传递下去了。

lift()返回的是个新的被观察者对象observable2,同时创建一个新的OnSubscribe对象,暂时标记为onSubscribe2; 在onSubscribe2回调中调用hook.onLift(operator).call(o),变换后并生成新的观察者subscriber2对象, 新的观察者subscriber2对象被绑定到原来创建onSubscribe1对象上,可以理解为subscriber2已经订阅到经过变换后的 observable1要发布的行为,最后这种变化后的行为继续被发送到subscriber1观察者那里。

subscriber1开始订阅被观察者的行为,也可以说被观察者的行为subscribe()时被执行;我们可以在观察者的回调中处理我们最终得到的结果; 在执行订阅后返回了Subscription对象,里面包含两个方法:

public interface Subscription {
    // 取消订阅
    void unsubscribe();
    // 订阅是否被取消
    boolean isUnsubscribed();
}

最后总结下吧,为了了解RxJava的运行机制;我使用一个简单的函数,实现的功能就是通过RxJava将Integer类型转成String类型并打印出来,并结合源码理解其执行过程;这里面泛型、接口的回调和各种角色的变换确实不好理解;接下来我会将RxJava和Retrofit2结合起来使用, 所以【图灵机器人】就这样产生啦。

参考文章:

彻底搞懂 RxJava — 基础篇
彻底搞懂 RxJava — 中级篇
彻底搞懂 RxJava — 高级篇

大头鬼Bruce
RxJava基本流程和lift源码分析

扔物线
给 Android 开发者的 RxJava 详解

备注:

observable:
adj. 显著的;觉察得到的;看得见的
n. [物] 可观察量;感觉到的事物

observer:
n. 观察者;[天] 观测者;遵守者

subscribe:
vi. 订阅;捐款;认购;赞成;签署
vt. 签署;赞成;捐助

subscriber:
n. 订户;签署者;捐献者