Rxjava2操作符入门(一)

1,599 阅读7分钟

友情链接:
Rxjava2操作符(一)
Rxjava2操作符(二)
Rxjava2操作符(三)
demo地址:github

1. 概述:

用官网的一句话:"a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准。也就是咱们常说的链式编程

2. Rxjava的好处

异步操作很关键的一点是程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。 Android 创造的 AsyncTask 和Handler ,其实都是为了让异步代码更加简洁。RxJava的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁(本段内容摘自扔物线的博客)。

3. 观察者和被观察者

  • Observable(被观察者)/Observer(观察者) Obsesrver用来连接这两个
  • Flowable(被观察者)/Subscriber(观察者) (2.0出现的支持背压) subscribe用来连接这两个

4. 简单使用

发射源有多少个onNext就会发射多少次,onComplete 和 onError是冲突的两个方法,有你没我,有我没你 如果在onComplete或者onError调用OnNext方法不会再起作用

    //创建被观察者
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onComplete();
                e.onNext(3);
                //e.onError(new Throwable());
            }
        });
        //创建观察者
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG,"onSubscribe订阅了");
            }

            @Override
            public void onNext(Integer integer) {
                Log.i(TAG,"onNext"+integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG,"onError");
            }

            @Override
            public void onComplete() {
                Log.i(TAG,"onComplete");
            }
        };
        // 开始订阅
        observable.subscribe(observer);

打印出来的Log为

onSubscribe订阅了
onNext1
onNext2
onComplete

4.1 Observer(观察者)中的方法

  • onSubscribe(Disposable d)
    当订阅到被观察者的时候调用 , Disposable 用来解除订阅的,防止内存泄漏
  • onNext(T t)
    被观察者发送OnNext方法的时候调用
  • onComplete()
    当被观察者调用onComplete方法时执行
  • onError(Throwable e)
    当被观察者调用onError方法时执行

subscribe()方法内可以传递的东西

  • Consumer<? super T> onNext
    表示被观察者只关注onNext
  • Consumer<? super T> onNext, Consumer<? super Throwable> onError)
    表示被观察者只关注onNext 和 onError
  • Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete)
    表示被观察者只关注onNext 和 onError 和 onComplete
  • Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe)
    表示被观察者四个方法都关注

5. 大致的操作符分为以下几点:

当然了下面的也不是所有的操作符

创建型 Create Just fromIterable Timer Interval Repeat
转化型 Map FlatMap Buffer Scan Window GroupBy
过滤型 Filter Distince Skip Take Last Debounce
组合型 Zip Join And Switch Merge StartWith
错误处理性 Retry Catch
辅助型 SubscribeOn ObserveOn Timer Interval DoOnNext Delay
条件和布尔 All SkipUntil TakeUntil Contains Amb
算数和聚合型 Conact Count Max Min Sum
连接型 Connect Publish Replay RefCount
异步操作 Start ToAsync StartFuture FromAction FromCallable RunAsync
阻塞操作 ForEach Firsh Last MostRecent Next Single
字符串操作 Split Decode Encode Join Form ByLine

6. 创建型(Creating): 也就是创建 Observable (被观察者)

6.1 Create (表示只发送OnNext方法)

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
        }
    }).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.i(TAG,integer+"");
        }
    });

打印出来

12-12 05:17:00.307 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 1
12-12 05:17:00.307 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 2
12-12 05:17:00.307 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 3

6.2 just (将传入的参数依次发送出来)

    Observable.just(1,2,3).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.i(TAG,integer+"");
        }
    });

打印出来

12-12 05:17:32.349 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 1
12-12 05:17:32.349 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 2
12-12 05:17:32.349 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 3

6.3 fromIterable (将Iterable中的对象依次发送出去)

同样 fromArray 是将 数组 中的数据依次发送出去

    ArrayList<String >  arrayList = new ArrayList<>();
    for(int i = 0;i<3;i++) {
        arrayList.add(""+i+i);
    }
    Observable.fromIterable(arrayList).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.i(TAG,s+"");
        }
    });

打印出来

12-12 05:17:46.458 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 00
12-12 05:17:46.458 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 11
12-12 05:17:46.458 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 22

6.4 Timer (它在一个给定的延迟后发射一个特殊的值,等同于Android中Handler的postDelay( )方法)

//                        DAYS,HOURS,MICROSECONDS,MILLISECONDS,MINUTES,NANOSECONDS,SECONDS;
//                        天   小时   微秒         毫秒         分钟    纳秒       秒                             
   final long start = System.currentTimeMillis();
                Observable.timer(1000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        long end = System.currentTimeMillis();
                        Log.i(TAG,"时间差:"+(end-start)+"ms");
                    }
                });

打印出来

12-12 05:20:22.483 20471-20629/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 时间差:1008ms

6.5 Interval (创建一个按固定时间间隔发射整数序列)

可以用来当做计时器,或者间隔性请求网络数据

 Observable.interval(1000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                         Log.i(TAG,""+aLong);
                    }
                });

打印出来

12-12 05:25:35.379 21915-21971/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 0
12-12 05:25:36.379 21915-21971/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 1
12-12 05:25:37.379 21915-21971/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 2
12-12 05:25:38.379 21915-21971/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 3
12-12 05:25:39.379 21915-21971/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 4
12-12 05:25:40.379 21915-21971/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 5
...
...
...

6.6 repeat (创建一个重复发射特定数据的Observable)

可以用来当做计时器,或者间隔性请求网络数据

  Observable.just(1).repeat(2).subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer i) throws Exception {
                        mCount++;
                        Log.i(TAG,"第:"+mCount+"次"+"数据为:"+i);
                    }
                });

打印出来

12-12 05:25:56.585 21915-21915/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 第:1次数据为:1
12-12 05:25:56.585 21915-21915/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 第:2次数据为:1

7. 既然讲了创建被观察者(Observable)和观察者(Observer), 那么先讲一下Schedulers线程调度器

如果Observable默认的是在主线程中,Observer默认跟随Observable的线程

  • Schedulers.computation()
    计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

  • Schedulers.newThread()
    开启一个新的线程

 Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext(1);
                        Log.i(TAG, "发布的线程是:" + Thread.currentThread().getName());
                    }
                })
                        .subscribeOn(Schedulers.io())
                        .observeOn(Schedulers.newThread())
                        .subscribe(new Consumer<Integer>() {
                            @Override
                            public void accept(Integer integer) throws Exception {
                                Log.i(TAG, "订阅的线程是:" + Thread.currentThread().getName());
                            }
                        });

打印出来

12-13 06:59:25.085 11730-12398/niezhiyang.cn.rxjava2_android_samples I/SchedulersActivity: 发布的线程是:RxCachedThreadScheduler-2
12-13 06:59:25.086 11730-12573/niezhiyang.cn.rxjava2_android_samples I/SchedulersActivity: 订阅的线程是:RxNewThreadScheduler-1
  • Schedulers.io()
    主要用于一些耗时操作,比如读写文件,数据库存取,网络交互等。 这个调度器根据需要,增加或者减少线程池中的线程数量。需要注意的是Schedulers.i0()中的线程池数量是无限制大的,大量的I/0操作将创建许多线程,我们需要在性能和线程数量中做出取舍。
 Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext(1);
                        Log.i(TAG, "发布的线程是:" + Thread.currentThread().getName());
                    }
                })
                        .subscribeOn(Schedulers.io())
                        .observeOn(Schedulers.io())
                        .subscribe(new Consumer<Integer>() {
                            @Override
                            public void accept(Integer integer) throws Exception {
                                Log.i(TAG, "订阅的线程是:" + Thread.currentThread().getName());
                            }
                        });

打印出来

12-13 06:58:22.448 11730-12398/niezhiyang.cn.rxjava2_android_samples I/SchedulersActivity: 发布的线程是:RxCachedThreadScheduler-2
12-13 06:58:22.448 11730-12399/niezhiyang.cn.rxjava2_android_samples I/SchedulersActivity: 订阅的线程是:RxCachedThreadScheduler-3
  • AndroidSchedulers.mainThread()
    Android中专用的,指定的操作在Android的主线程(UI线程中)运行
 Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext(1);
                        Log.i(TAG, "发布的线程是:" + Thread.currentThread().getName());
                    }
                })
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe(new Consumer<Integer>() {
                            @Override
                            public void accept(Integer integer) throws Exception {
                                Log.i(TAG, "订阅的线程是:" + Thread.currentThread().getName());
                            }
                        });

打印出来

12-13 06:54:57.969 11730-11782/niezhiyang.cn.rxjava2_android_samples I/SchedulersActivity: 发布的线程是:RxCachedThreadScheduler-1
12-13 06:54:57.970 11730-11730/niezhiyang.cn.rxjava2_android_samples I/SchedulersActivity: 订阅的线程是:main