Rxjava源码思路分析与程序员的思考

1,974 阅读4分钟

Rxjava近来在技术圈非常火,个人认为其优势主要是两点:

  • 线程切换
    使用Rxjava 可以非常方便指定订阅者对执行线程,这一点对于安卓开发中来说简直美妙
  • 逻辑清晰
    使用Rxjava自带的几个函数对数据进行处理, 代码可以非常整齐,逻辑很清晰。

java作为一个静态语言,缺少了很多灵活性,使用起来很多时候感觉非常死板,使用Rxjava后其类似函数式语言的特性让人感觉很灵动。本文主要来分析一下Rxjava源码是如何将这样一个个复杂的流程清晰地展现出来的。

先来看一下一个Rxjava使用的一个例子:

Observable.just("a", "b", "c", "d")
                .observeOn(Schedulers.computation())
                .map(new Func1<String, String>() {
                    @Override
                    public String call(String s) {
                        System.out.print(Thread.currentThread().getName() + ":first--" + s);
                        return s + s;
                    }
                })
                .observeOn(Schedulers.io())
                .map(new Func1<String, String>() {
                    @Override
                    public String call(String s) {
                        System.out.print(Thread.currentThread().getName() + ":second--" + s);
                        return s + s;
                    }
                })
                .observeOn(Schedulers.newThread())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        System.out.print(Thread.currentThread().getName());
                        System.out.print("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.print("error");
                    }

                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });

这是一个非常简单的流程,但是也进行了三次线程的切换,对原始数据进行了两次map,可以看到使用Rxjava非常优雅,逻辑也很清楚。以下来通过该过程对Rxjava的逻辑进行一些分析。

关键类

首先来介绍一下源码中的几个重要的类:

  1. Observable
    Observable是使用Rxjava的入口,很多重要的方法都在此类中,如常用的create(), map(), filter(), subscribe()等。

  2. OnSubscribe
    OnSubscribe是Observable中定义的接口,继承自Action1接口,实际只有一个call方法,但是Rxjava流程中多个环节中的多个类都实现了该接口,之后通过链表到方式连接起来。

  3. Operator
    Operator是Observable中定义的接口,继承自Func1接口,在Rxjava中需要切换线程的时候均需要实现该接口,java中不能直接传递函数,所以只能通过接口的方式来实现。
  4. ObserveOnSubseriber
    ObserveOnSubseriber是OperatorObserveOn类中的静态类,它是数据处理的关键,该类继承自Subscriber类,实现基本的onNext,onError, onCompleate方法。
  5. Worker
    Worker是Scheduler中的一个抽象静态类,worker是真正执行任务的地方,在worker中含有各个线程池的引用,在这里进行线程的执行。

Rxjava流程

Rxjava流程实际上时一个链表结构,代码例子每一个"."在具体实现中都创建了一个Observable对象,然后通过parent或者source属性连接起来,当subscibe方法执行后,通过一个list连接各个subscriber,通过链表回溯到最开始的地方,然后从最开始处理数据,数据再一级一级传递,直到所有数据处理完成。
下边以一个简单的流程示意一下,可能不太准确,但是原理差不多:

  • 链表创建:A(just)-B(lift(线程切换实际是通过lift实现))-C(map)-D(lift)-(map)
  • subscribe开始执行代码
  • 通过回溯链表创建subscriber list
  • 回溯到数据源,开始通过fun对数据进行处理,数据处理通过创建的subscriber list不断传输处理,直到最终的subsciber执行完成。

实际基本过程进行了三遍。

以下是一张根据实际源码绘制的流程图,理解原理后结合图就可以愉快地看源码了:

Paste_Image.png

可以根据顺序标号走一遍流程,基本源码也就理解一些了。本文实际上并没有贴源码,主要介绍一下思路,然后跟着流程图,源码必须是自己读并进行代码调试才是有效果的。

Rxjava之路

这里mark一下,希望通过阅读Rxjava, 自己可以实现一个简易的函数式流程。

NotRxJava guide for lazy folks中作者嫌Rxjava代码量大,于是介绍了一下自己的思路。文中的代码并不能实现一个非常安全可靠的代码库,但是其思路很值得学习。

另谷歌开源了ageragithub,也是一个函数式的例子,代码量比Rxjava要少许多,其亮点是实现了数据的分离,“Push event,pull data model”是其目标。其源码思路非常清晰,简直是写代码的典范。github,MaterialDesignExample中是MaterialDesign使用的例子,其中使用agera对网络访问到数据渲染到流程进行控制,数据来源是知乎日报,感兴趣的可以看下。

参考

gank.io/post/560e15…
blog.csdn.net/wl9739/arti…
yarikx.github.io/NotRxJava/