rxjava3.0 入门到精通系列(一)初步入门篇

4,673 阅读6分钟

简介

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)

RxJava是Reactive Extensions的Java实现,用于通过Observable序列来构建异步和基于事件的程序的库。

初学者只需要把握两点:观察者模式和异步就基本可以熟练使用RxJava了。

异步没有很高深的东西,不做过多解释,就是多线程,线程切换的东西。

观察者模式

举个例子:

妈妈叫孩子吃饭

  • 妈妈(被观察者)作为事件的发起方,是主动的,作为"叫吃饭"事件的起点。
  • 孩子(观察者)作为事件的接收方,是被动的,作为"叫吃饭"事件的终点。
  • 在起点和终点之间还可以被加工处理,具体为过滤,转换,合并等操作符(后面会进行说明)。例如妈妈让 大儿子去叫二儿子吃饭,大儿子看二儿子正在小伙伴家吃饭,就不叫二儿子吃饭了,这个是过滤操作符。

上面三点对理解Rxjava十分重要,因为它们分别对应了Rxjava中的观察者(Observable)、观察者(Obserer)、操作符的职能。

Rxjava 是基于观察者模式来组建程序逻辑的,构建被观察者和观察者 然后建立二者订阅关系,就像妈妈喊话的声音,连接起孩子和妈妈,在实际传递过程中,还可以对事件进行各种处理。

创建被观察者

private Observable createMotherCallEatObservable() {
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("baby ,it's time for lunch");
            }
        });
    }

创建观察者

    private Observer createChildObserver() {
        return new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String str) {
                //处理传递过来的事件
                Toast.makeText(MainActivity.this, str, Toast.LENGTH_LONG).show();
            }

            @Override
            public void onError(Throwable e) {
                //事件出错会调用这个方法
            }

            @Override
            public void onComplete() {
                //事件结束会执行到这里
            }
        };
    }

实现订阅关系

    private void callEat(){
        Observable motherCallEat = createMotherCallEatObservable();
        motherCallEat.subscribeWith(createChildObserver());
    }

这里motherCallEat.subscribeWith(createChildObserver())大家可能会有疑问,语法上为什么是被观察者订阅了观察者呢?应该是观察者订阅被观察者才对呀, 为什么?

是这样,逻辑上应该是观察者订阅被观察者,但是为了保证流式API调用风格,什么是流式API调用风格呢?把上面订阅的代码简写成如下:

private void callEatSimpleOperator() {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) {
                emitter.onNext("baby ,it's time for lunch");
            }
        }).filter(new Predicate<String>(){

            @Override
            public boolean test(String str) throws Throwable {
                if("baby ,it's time for lunch".equals(str))
                    return true;
                return false;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) {
                Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
            }
        });
    }

这就是流式调用风格,被观察者是流的上游,而观察者是流的下游。这里还增加了过滤操作符,传递的数据不是"baby ,it's time for lunch"事件将不传递下去。同一个调用主体调用下来一气呵成。 代码和流程图对应关系如下:

可以看到上游(Observable)和 下游(Observer)之间可以有n个操作符。上面已经详细介绍了rxjava事件起点和终点的观察者架构,下面来说一说事件中间处理过程,也就是操作符。

操作符

事件的起点和事件的重点可能没有什么称奇的地方,而事件传递过程各种操作符才是rxjava最大的亮点。这里我们只介绍一下最常用到的map和flatmap操作符。map和flatmap 都是用于对Observable发射的数据执行变换的操作符。

  • map 对序列的每一项都应用一个函数来变换Observable发射的数据序列。 代码示例:
Observable.just(R.mipmap.ic_launcher).subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.io()).map(new Function<Integer, Bitmap>() {
            @Override
            public Bitmap apply(Integer resource) {
                return BitmapFactory.decodeResource(getResources(), R.mipmap.ic_launcher);
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Bitmap>() {
            @Override
            public void accept(Bitmap bitmap) throws Exception {
                mImageView.setImageBitmap(bitmap);
            }
        });

这是一个将int类型资源转换为Bitmap类型,然后做显示的例子。

  • flatmap 将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable。
Observable.just(R.mipmap.ic_launcher).subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.io()).flatMap(new Function<Integer, ObservableSource<Bitmap>>() {
            @Override
            public ObservableSource<Bitmap> apply(Integer integer) throws Exception {
                return  Observable.just(BitmapFactory.decodeResource(getResources(), R.mipmap.ic_launcher));
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Bitmap>() {

            @Override
            public void accept(Bitmap bitmap) throws Exception {
                mImageView.setImageBitmap(bitmap);
            }
        });

同样是显示图片的例子,flatmap 会将数据转换为observable,并且将Observable 中数据传递到下游,当然Observable中可以有n个数据,这样会发射n个事件传递给下游。

操作符还有很多,主要分为创建操作、变换操作、过滤操作、结合操作等类型,如题可参考如下: RxJava操作符的文档地址(中文)

异步

异步就是相对与主线程的多线程调度。

调度器 功能
AndroidSchedulers.mainThread() 需要引用rxandroid, 切换到UI线程
Schedulers.computation() 用于计算任务,如事件循环和回调处理,默认线程数等于处理器数量
Schedulers.io() 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需求,它默认是一个CacheThreadScheduler
Schedulers.newThread() 为每一个任务创建一个新线程
Schedulers.trampoline() 在当前线程中立刻执行,如当前线程中有任务在执行则将其暂停, 等插入进来的任务执行完成之后,在将未完成的任务继续完成。
Scheduler.from(executor) 指定Executor作为调度器
     //new Observable.just()执行在新线程
  Observable.just(getFilePath())
           //指定在新线程中创建被观察者
          .subscribeOn(Schedulers.newThread())
          //将接下来执行的线程环境指定为io线程
          .observeOn(Schedulers.io())
            //map就处在io线程
          .map(mMapOperater)
            //将后面执行的线程环境切换为主线程,
            //但是这一句依然执行在io线程
          .observeOn(AndroidSchedulers.mainThread())
          //指定线程无效,但这句代码本身执行在主线程
          .subscribeOn(Schedulers.io())
          //执行在主线程
          .subscribe(mSubscriber);
 

rxjava中线程调度只需要用到subscribeOn()observeOn()两个方法

  • subscribeOn 指定 Observable 事件序列在某调度器上创建,只会执行一次,如指定多次以第一次为准。
  • observeOn 指定 事件传输过程加工的操作符及事件最终处理的Observer 发生在哪一个调度器线程中。可指定多次,每一次指定都会在下一步生效。

总结

本篇文章是一篇rxjava入门文章,介绍了rxjava概念以及需要掌握的两点观察者模式和异步。中间还介绍了常用的符号map和flatmap,后面准备从不同角度深入rxjava。

参考资料