阅读 3724

史上最全的Rxjava2讲解(使用篇)

1.前言

在很久之前就一直想整理一下rxjava,但是一直没有时间,最近是因为离职了,总算有时间整理一下了。因为打算每篇博客都记录一个框架。所以为了描述清楚,本篇博客可能略长(包含rxjava的简介,使用,背压,原理等),希望你们能认真的读完,收获肯定还是有的,也会采用大量的图来介绍,这样可以加深理解。也可以当一个工具博客,需要的使用的话随时查阅。

后续还会继续出背压和原理篇,敬请期待

2.简介

什么是rxjava? 是一种事件驱动的基于异步数据流的编程模式,整个数据流就像一条河流,它可以被观测(监听),过滤,操控或者与其他数据流合并为一条新的数据流。

三要素

  1. 被观察者(Observable)
  2. 观察者(Observer)
  3. 订阅(subscribe)

好了,因为秉持着要有图的思想,在介绍rxjava各个操作符的时候,会采用大量的图示来表示,图示来源于官方,这里先给大家介绍一下怎么看。
ok,进入到撸码环节

3.简单使用

1.首先要在 build.gradle 文件中添加依赖

implementation 'io.reactivex.rxjava2:rxjava:2.1.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
复制代码

2.依赖搭建完毕了,我们先写个最简单的案例,一共3步走

  • 2.1 创建被观察者
   // 创建被观察者
   Observable.create(new ObservableOnSubscribe<String>() {

       @Override
       public void subscribe(ObservableEmitter<String> emitter) throws Exception {

            emitter.onNext("你好呀");
            emitter.onNext("我爱中国");
            emitter.onNext("祝愿祖国繁荣富强");
            emitter.onComplete();
        }
   });
复制代码
  • 2.2 创建观察者
   // 创建观察者
   Observer observer = new Observer<String>(){

       @Override
       public void onSubscribe(Disposable d) {

           Log.i("lybj", "准备监听");
       }

       @Override
       public void onNext(String s) {

           Log.i("lybj", s);
       }

       @Override
       public void onError(Throwable e) {

           Log.i("lybj", "error");
       }

       @Override
       public void onComplete() {

           Log.i("lybj", "监听完毕");
       }
   };
复制代码
  • 2.3 订阅(也就是将被观察者和观察者关联)
   // 订阅
   observable.subscribe(observer);
复制代码

这就完事了,看下结果

是不是很简单,几个概念再介绍一下

  • onNext():当被观察者(observable)通过调用onNext()发射数据的时候,观察者(observer)调用onNext()接收数据
  • onError():当被观察者(observable)调用该函数时,观察者(observer)调用onError(),其他事件将不会继续发送
  • onComplete():当被观察者(observable)调用该函数时,观察者(observer)调用onComplete(),其他事件将不会继续发送

其实rxjava,打个比方,就类似花洒的头,数据流就类似水流,它的被观察者(observable)的各种操作符就是花洒的那个头,可以有各种模式,比如中间喷水的,周围喷水的,喷水雾的等等。根据操作符的不同,可以改变数据的各种样式,根据花洒头的不同,可以把水流改成各种样式。 接下来,就来学习下observable的丰富的操作符。

先看看大纲


4.创建操作符

1.create()

1.1 做啥的?

创建被观察者对象

1.2 如何用?

// 创建被观察者
Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                emitter.onNext("你好呀");
                emitter.onNext("我爱中国");
                emitter.onNext("祝愿祖国繁荣富强");
                emitter.onComplete();
            }
        }).subscribe(new Observer<String>(){ // 关联观察者

            @Override
            public void onSubscribe(Disposable d) {

                Log.i("lybj", "准备监听");
            }

            @Override
            public void onNext(String s) {

                Log.i("lybj", s);
            }

            @Override
            public void onError(Throwable e) {

                Log.i("lybj", "error");
            }

            @Override
            public void onComplete() {

                Log.i("lybj", "监听完毕");
            }
        });
复制代码

1.3 结果

可以直接链式调用关联观察者


2 just()

2.1 做啥的?

通过上面的图,应该很形象的说明了,主要作用就是创建一个被观察者,并发送事件,但是发送的事件不可以超过10个以上。

2.2 如何用?

Observable.just("小明", "小红", "小兰").subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {

                Log.i("lybj", "准备监听");
            }

            @Override
            public void onNext(String s) {

                Log.i("lybj", s+"来了");
            }

            @Override
            public void onError(Throwable e) {

                Log.i("lybj", "Error");
            }

            @Override
            public void onComplete() {

                Log.i("lybj", "完毕");
            }
        });
复制代码

2.3 结果


3 timer()

3.1 做啥的?

当到指定时间后就会发送一个 0 的值给观察者。 在项目中,可以做一些延时的处理,类似于Handler中的延时

3.2 如何用?

Observable.timer(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {

        @Override
        public void accept(Long aLong) throws Exception {

             Log.i("lybj", aLong+"");
        }
});
复制代码

3.3 结果

延迟2秒后,将结果发送给观察者,Consumer和Observer是创建观察者的两种写法,相当于观察者中的onNext方法。


4 interval()

4.1 做啥的?

每隔一段时间就会发送一个事件,这个事件是从0开始,不断增1的数字。 类似于项目中的timer,做计时器

4.2 如何用?

Observable.interval(3,TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
            
     @Override
     public void accept(Long aLong) throws Exception {

          Log.i("lybj", aLong+"");
     }
});
复制代码

4.3 结果


5 intervalRange()

5.1 做啥的?

可以指定发送事件的开始值和数量,其他与 interval() 的功能一样。

5.2 如何用?

Observable.intervalRange(100, 4, 0, 10, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
           
      @Override
      public void accept(Long aLong) throws Exception {

           Log.i("lybj", aLong+"");
      }
});
复制代码

5.3 结果

参数依次是:开始值,循环执行的次数,初始延迟时间,执行间隔时间,时间单位


6 range()

6.1 做啥的?

同时发送一定范围的事件序列。

6.2 如何用?

Observable.range(0,10).subscribe(new Consumer<Integer>() {
            
    @Override
    public void accept(Integer integer) throws Exception {

         Log.i("lybj", integer+"");
    }
});
复制代码

6.3 结果


7 rangeLong()

7.1 做啥的?

作用与 range() 一样,只是数据类型为 Long

7.2 如何用?

Observable.rangeLong(0,10).subscribe(new Consumer<Long>() {
            
    @Override
    public void accept(Long aLong) throws Exception {

         Log.i("lybj", aLong+"");
    }
});
复制代码

7.3 结果


8 empty() & never() & error()

8.1 做啥的?

  • never():不发送任何事件
  • error():发送 onError() 事件
  • empty() : 直接发送 onComplete() 事件

8.2 如何用?

 private void  empty_never_error(){

        Observable.empty().subscribe(new Observer(){
            @Override
            public void onSubscribe(Disposable d) {

                Log.i("lybj", "准备监听");
            }

            @Override
            public void onNext(Object o) {

                Log.i("lybj", o+"");
            }

            @Override
            public void onError(Throwable e) {

                Log.i("lybj", "onError");
            }

            @Override
            public void onComplete() {

                Log.i("lybj", "onComplete");
            }
        });
复制代码

8.3 结果

如果是empty() 则:

如果是error() 则:

如果是never()则:



5.转换操作符

1 map()

1.1 做啥的?

map 可以将被观察者发送的数据类型转变成其他的类型

1.2 怎么用?

Observable.just("中国", "祖国", "中国军人")
                .map(new Function<String, String>() {

                    @Override
                    public String apply(String s) throws Exception {

                        return "我爱" + s;
                    }
                })
                .subscribe(new Consumer<String>() {

                    @Override
                    public void accept(String s) throws Exception {

                        Log.i("lybj", s);
                    }
                });
复制代码

1.3 结果

简单来讲,就是可以对发射过来的数据进行再加工,再传给观察者


2 flatMap()

2.1 做啥的?

这个方法可以将事件序列中的元素进行整合加工,返回一个新的被观察者。 flatMap() 其实与 map() 类似,但是 flatMap() 返回的是一个 Observerable,map()只是返回数据,如果在元素再加工的时候,想再使用上面的创建操作符的话,建议使用flatMap(),而非map()。

2.2 怎么用?

 Observable.just("中国", "祖国", "中国军人", "贪官")
                .flatMap(new Function<String, ObservableSource<String>>() {

                    @Override
                    public ObservableSource<String> apply(String s) throws Exception {

                        if(s.equals("贪官")){

                            return Observable.error(new Exception("贪官不能被喜欢"));
                        }
                        return Observable.just("我爱"+s);
                    }
                })
                .subscribe(new Consumer<String>() {

                    @Override
                    public void accept(String s) throws Exception {

                        Log.i("lybj", s);
                    }
                }, new Consumer<Throwable>() {

                    @Override
                    public void accept(Throwable throwable) throws Exception {

                        Log.i("lybj", throwable.getMessage());
                    }
                });
复制代码

2.3 结果

new Consumer方法监听的是Observable.error()


3 concatMap()

3.1 做啥的?

concatMap() 和 flatMap() 基本上是一样的,只不过 concatMap() 转发出来的事件是有序的,而 flatMap() 是无序的。

3.2 怎么用?

Observable.just("中国", "祖国", "中国军人", "贪官")
                .concatMap(new Function<String, ObservableSource<String>>() {

                    @Override
                    public ObservableSource<String> apply(String s) throws Exception {

                        if(s.equals("贪官")){

                            return Observable.error(new Exception("贪官不能被喜欢"));
                        }
                        return Observable.just("我爱"+s);
                    }
                })
                .subscribe(new Consumer<String>() {

                    @Override
                    public void accept(String s) throws Exception {

                        Log.i("lybj", s);
                    }
                }, new Consumer<Throwable>() {

                    @Override
                    public void accept(Throwable throwable) throws Exception {

                        Log.i("lybj", throwable.getMessage());
                    }
                });
复制代码

3.3 结果


4 buffer()

4.1 做啥的?

从需要发送的事件当中获取一定数量的事件,并将这些事件放到缓冲区当中一并发出。

4.2 怎么用?

buffer 有两个参数,一个是 count,另一个 skip。count 缓冲区元素的数量,skip 就代表缓冲区满了之后,发送下一次事件序列的时候要跳过多少元素。

Observable.just("1", "2", "3", "4", "5")
                .buffer(2,1)
                .subscribe(new Consumer<List<String>>() {

                    @Override
                    public void accept(List<String> strings) throws Exception {

                        Log.d("lybj", "缓冲区大小: " + strings.size());
                        for (String s : strings){
                            Log.d("lybj",  s);
                        }
                    }
                });
复制代码

4.3 结果


5 scan()

5.1 做啥的?

将发射的数据通过一个函数进行变换,然后将变换后的结果作为参数跟下一个发射的数据一起继续通过那个函数变换,这样依次连续发射得到最终结果。

5.2 怎么用

Observable.just(1, 2, 3, 4, 5)
                .scan(new BiFunction<Integer, Integer, Integer>() {

                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {

                        Log.i("lybj", "integer01: " + integer + " integer02: "+ integer2);
                        return integer + integer2;
                    }
                }).subscribe(new Consumer<Integer>() {

                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj", "accept: " + integer);
                    }
                });
复制代码

5.3 结果

简单来说,先将第一个元素返回给观察者,然后将1,2的和返给观察者,然后将上一次计算的和,当第一个元素,也就是3,第2个元素,是一直按顺序取值,取第3个元素也就是3,那么将,3+3 =6,返回给观察者,以此类推,将6作为第一个元素,第二个元素取值4,将6+4=10返回给观察者。

sacn操作符是遍历源Observable产生的结果,再按照自定义规则进行运算,依次输出每次计算后的结果给订阅者


6 window()

6.1 做啥的?

发送事件时,将这些事件分为按数量重新分组。window 中的 count 的参数就是代表指定的数量,例如将 count 指定为2,那么每发2个数据就会将这2个数据分成一组。

window与buffer区别:window是把数据分割成了Observable,buffer是把数据分割成List

6.2 如何用?

Observable.just("鲁班", "孙尚香", "亚索","火女","盖伦")
                .window(2)
                .subscribe(new Consumer<Observable<String>>() {

                    @Override
                    public void accept(Observable<String> stringObservable) throws Exception {

                        Log.i("lybj", "分组开始");
                        stringObservable.subscribe(new Consumer<String>() {

                            @Override
                            public void accept(String s) throws Exception {

                                Log.i("lybj", s);
                            }
                        });
                    }
                });
复制代码

6.3 结果


6.组合操作符

1.concat()

1.1 做啥的?

可以将多个观察者组合在一起,然后按照之前发送顺序发送事件。需要注意的是,concat() 最多只可以发送4个事件。

1.2 怎么用?

  private void concat(){

        Observable.concat(
                Observable.just(1, 2, 3),
                Observable.just(4, 5),
                Observable.just(6, 7),
                Observable.just(8, 9))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj", integer+"");
                    }
                });
    }
复制代码

1.3 结果


2.concatArray()

2.1 做啥的?

与 concat() 作用一样,不过 concatArray() 可以发送多于 4 个被观察者。

2.2 怎么用?

Observable.concatArray(Observable.just(1, 2, 3, 4),
                Observable.just(5, 6),
                Observable.just(7, 8, 9, 10),
                Observable.just(11, 12, 13),
                Observable.just(14, 15),
                Observable.just(16))
                .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {

                Log.i("lybj", integer+"");
            }
        });
复制代码

2.3 结果


3.merge()

3.1 做啥的?

这个方法与 concat() 作用基本一样,但是 concat() 是串行发送事件,而 merge() 并行发送事件,也是只能发送4个。

3.2 怎么用?

Observable.merge(Observable.just(1, 2, 3, 4),
                Observable.just(5, 6),
                Observable.just(7, 8, 9, 10),
                Observable.just(11, 12, 13))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj", integer+"");
                    }
                });
复制代码

3.3 结果


4.zip()

4.1 做啥的?

zip操作符用于将多个数据源合并,并生成一个新的数据源。新生成的数据源严格按照合并前的数据源的数据发射顺序,并且新数据源的数据个数等于合并前发射数据个数最少的那个数据源的数据个数。

4.2 怎么用?

Observable.zip(Observable.just(1, 2, 3),
                Observable.just("A", "B", "C", "D", "E"),
                new BiFunction<Integer, String, String>(){

                    @Override
                    public String apply(Integer o1, String o2) throws Exception {

                        return o1 +"_"+ o2;
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String o) throws Exception {

                        Log.i("lybj", o);
                    }
                });
复制代码

4.3 结果


5.startWith() & startWithArray()

5.1 做啥的?

在发送事件之前追加事件,startWith() 追加一个事件,startWithArray() 可以追加多个事件。追加的事件会先发出。

5.2 怎么用?

Observable.just(1, 2, 3)
               .startWithArray(4, 5)
               .subscribe(new Consumer<Integer>() {

                   @Override
                   public void accept(Integer integer) throws Exception {

                       Log.i("lybj", integer+"");
                   }
               });
复制代码

5.3 结果


6.count()

6.1 做啥的?

返回被观察者发送事件的数量。

6.2 怎么用?

Observable.just(2, 3, 4, 5, 6)
                .count()
                .subscribe(new Consumer<Long>() {

                    @Override
                    public void accept(Long aLong) throws Exception {

                        Log.i("lybj", "事件数量:" + aLong);
                    }
                });
复制代码

6.3 结果


7.功能操作符

1.delay()

1.1 做啥的?

延迟一段事件发送事件。

1.2 怎么用?

Observable.just(1,2,3,4)
                .delay(3, TimeUnit.SECONDS)
                .subscribe(new Consumer<Integer>() {

                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
复制代码

1.3 结果


2.周期函数

2.1 做啥的?

doOnEach(): 每次发送事件之前都会回调这个方法

doOnNext(): Observable 每发送 onNext() 之前都会先回调这个方法。

doAfterNext(): Observable 每发送 onNext() 之后都会回调这个方法。

doOnComplete(): Observable 每发送 onComplete() 之前都会回调这个方法。

doOnError(): Observable 每发送 onError() 之前都会回调这个方法。

doOnSubscribe(): Observable 每发送 onSubscribe()之前都会回调这个方法。

doOnDispose(): 当调用 Disposable 的 dispose() 之后回调该方法。

doOnTerminate(): 在 onError 或者 onComplete 发送之前回调。

doAfterTerminate(): onError 或者 onComplete 发送之后回调。

doFinally(): 在所有事件发送完毕之后回调该方法。如果取消订阅之后doAfterTerminate()就不会被回调,而doFinally()无论怎么样都会被回调,且都会在事件序列的最后。

2.2 怎么用?

Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).doOnEach(new Consumer<Notification<Integer>>() {

            @Override
            public void accept(Notification<Integer> integerNotification) throws Exception {

                Log.i("lybj",  "doOnEach 方法执行了, 结果:"+ integerNotification.getValue());
            }
        }).doOnNext(new Consumer<Integer>() {

            @Override
            public void accept(Integer integer) throws Exception {

                Log.i("lybj",  "doOnNext 方法执行了, 结果:"+ integer);
            }
        }).doAfterNext(new Consumer<Integer>() {

            @Override
            public void accept(Integer integer) throws Exception {

                Log.i("lybj",  "doAfterNext 方法执行了, 结果:"+ integer);
            }
        }).doOnComplete(new Action() {

            @Override
            public void run() throws Exception {

                Log.i("lybj",  "doOnComplete 方法执行了");
            }
        }).doOnError(new Consumer<Throwable>() {

            @Override
            public void accept(Throwable throwable) throws Exception {

                Log.i("lybj",  "doOnError 方法执行了");
            }
        }).doOnSubscribe(new Consumer<Disposable>() {

            @Override
            public void accept(Disposable disposable) throws Exception {

                Log.i("lybj",  "doOnSubscribe 方法执行了");
            }
        }).doOnDispose(new Action() {

            @Override
            public void run() throws Exception {

                Log.i("lybj",  "doOnDispose 方法执行了");
            }
        }).doOnTerminate(new Action() {

            @Override
            public void run() throws Exception {

                Log.i("lybj",  "doOnTerminate 方法执行了");
            }
        }).doAfterTerminate(new Action() {

            @Override
            public void run() throws Exception {

                Log.i("lybj",  "doAfterTerminate 方法执行了");
            }
        }).doFinally(new Action() {

            @Override
            public void run() throws Exception {

                Log.i("lybj",  "doFinally 方法执行了");
            }
        }).subscribe(new Observer<Integer>() {

            private Disposable disposable;

            @Override
            public void onSubscribe(Disposable d) {

                disposable = d;
                Log.i("lybj", "------观察者onSubscribe()执行");
            }

            @Override
            public void onNext(Integer integer) {

                Log.i("lybj", "------观察者onNext()执行:"+integer);
                if(integer == 2){
//                    disposable.dispose(); // 取消订阅
                }
            }

            @Override
            public void onError(Throwable e) {

                Log.i("lybj", "------观察者onError()执行");
            }

            @Override
            public void onComplete() {

                Log.i("lybj", "------观察者onComplete()执行");
            }
        });
复制代码

2.3 结果


3.onErrorReturn()

3.1 做啥的?

当接受到一个 onError() 事件之后回调,返回的值会回调 onNext() 方法,并正常结束该事件序列。

3.2 怎么用?

Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                emitter.onNext("小明:到");
                emitter.onError(new IllegalStateException("error"));
                emitter.onNext("小方:到");
            }
        }).onErrorReturn(new Function<Throwable, String>() {

            @Override
            public String apply(Throwable throwable) throws Exception {

                Log.i("lybj",  "小红请假了");
                return "小李:到";
            }
        }).subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(String s) {

                Log.i("lybj",  s);
            }

            @Override
            public void onError(Throwable e) {

                Log.i("lybj",  e.getMessage());
            }

            @Override
            public void onComplete() {
            }
        });
复制代码

3.3 结果


4.onErrorResumeNext()

4.1 做啥的?

当接收到 onError() 事件时,返回一个新的 Observable,并正常结束事件序列。

4.2 怎么用?

Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                emitter.onNext("小明");
                emitter.onNext("小方");
                emitter.onNext("小红");
                emitter.onError(new NullPointerException("error"));
            }
        }).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends String>>() {

            @Override
            public ObservableSource<? extends String> apply(Throwable throwable) throws Exception {

                return Observable.just("1", "2", "3");
            }
        }).subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {

                Log.i("lybj",  "准备监听");
            }

            @Override
            public void onNext(String s) {

                Log.i("lybj",  s);
            }

            @Override
            public void onError(Throwable e) {

                Log.i("lybj",  e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.i("lybj",  "onComplete");
            }
        });
复制代码

4.3 结果


5.onExceptionResumeNext()

5.1 做啥的?

与 onErrorResumeNext() 作用基本一致,但是这个方法只能捕捉 Exception。

5.2 怎么用?

Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                emitter.onNext("小明");
                emitter.onNext("小方");
                emitter.onNext("小红");
                emitter.onError(new Error("error"));
            }
        }).onExceptionResumeNext(new Observable<String>() {

            @Override
            protected void subscribeActual(Observer observer) {

                observer.onNext("小张");
            }
        }).subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {

                Log.i("lybj",  "准备监听");
            }

            @Override
            public void onNext(String s) {

                Log.i("lybj",  s);
            }

            @Override
            public void onError(Throwable e) {

                Log.i("lybj",  e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.i("lybj",  "onComplete");
            }
        });
复制代码

5.3 结果


6.retry()

6.1 做啥的?

如果出现错误事件,则会重新发送所有事件序列。times 是代表重新发的次数。

6.2 怎么用?

Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onError(new IllegalStateException());
            }
        }).retry(2)
          .subscribe(new Observer<String>() {

              @Override
              public void onSubscribe(Disposable d) {

                  Log.i("lybj",  "准备监听");
              }

              @Override
              public void onNext(String s) {

                  Log.i("lybj",  s);
              }

              @Override
              public void onError(Throwable e) {

                  Log.i("lybj",  e.getMessage());
              }

              @Override
              public void onComplete() {
                  Log.i("lybj",  "onComplete");
              }
          });
复制代码

6.3 结果


7.retryUntil()

7.1 做啥的?

出现错误事件之后,可以通过此方法判断是否继续发送事件。

7.2 怎么用?

Observable.create(new ObservableOnSubscribe<String>() {

            public void subscribe(@NonNull ObservableEmitter<String> emitter){

                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onError(new NullPointerException("error"));
                emitter.onNext("4");
                emitter.onNext("5");
            }
        }).retryUntil(new BooleanSupplier() {
            @Override
            public boolean getAsBoolean() throws Exception {

                Log.i("lybj",  "getAsBoolean");
                return true;
            }
        }).subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(String s) {

                Log.i("lybj",  s);
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });
复制代码

7.3 结果


8.repeat()

8.1 做啥的?

重复发送被观察者的事件,times 为发送次数。

8.2 怎么用?

Observable.just(1,2,3)
                .repeat(2)
                .subscribe(new Consumer<Integer>() {

                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
复制代码

8.3 结果


9.subscribeOn() & observeOn()

9.1 做啥的?

subscribeOn(): 指定被观察者的线程,如果多次调用此方法,只有第一次有效。 observeOn(): 指定观察者的线程

9.2 怎么用?

Observable.create(new ObservableOnSubscribe<String>() {

            public void subscribe(@NonNull ObservableEmitter<String> emitter){

                emitter.onNext("1");
                Log.i("lybj",  Thread.currentThread().getName());
            }
        }).subscribeOn(Schedulers.io())
          .observeOn(Schedulers.newThread())
          .subscribe(new Consumer<String>() {

               @Override
               public void accept(String s) throws Exception {

                   Log.i("lybj",  s);
                   Log.i("lybj",  Thread.currentThread().getName());
               }
          });
复制代码

9.3 结果


8.过滤操作符

1.filter()

1.1 做啥的?

如果返回 true 则会发送事件,否则不会发送

1.2 怎么用?

Observable.just(1,2,3,4,5)
          .filter(new Predicate<Integer>() {
                    
                    @Override
                    public boolean test(Integer integer) throws Exception {

                        if(integer > 4){
                            return true;
                        }
                        return false;
                    }
           }).subscribe(new Consumer<Integer>() {
            
               @Override
               public void accept(Integer integer) throws Exception {

                   Log.i("lybj",  integer+"");
               }
          });
复制代码

1.3 结果


2.ofType()

2.1 做啥的?

可以过滤不符合该类型事件

2.2 怎么用?

Observable.just(1, 2, 3, "小明", "小方")
                .ofType(String.class)
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {

                        Log.i("lybj",  s+"");
                    }
                });
复制代码

2.3 结果


3.skip()

3.1 做啥的?

跳过正序某些事件,count 代表跳过事件的数量

3.2 怎么用?

Observable.just(1,2,3,4,5,6,7)
                .skip(2)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
复制代码

3.3 结果


4.distinct()

4.1 做啥的?

过滤事件序列中的重复事件。

4.2 做啥的?

Observable.just(1,2,3,1,4,1,2)
                .distinct()
                .subscribe(new Consumer<Integer>() {

                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
复制代码

4.3 结果


5.distinctUntilChanged()

5.1 做啥的?

过滤掉连续重复的事件

5.2 做啥的?

Observable.just(1,2,3,3,1,5,6)
        .distinctUntilChanged()
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {

                Log.i("lybj",  integer+"");
            }
        });
复制代码

5.3 结果


6.take()

6.1 做啥的?

控制观察者接收的事件的数量。

6.2 怎么用?

 Observable.just(1,2,3,4,5,6)
                .take(3)
                .subscribe(new Consumer<Integer>() {

                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
复制代码

6.3 结果


7.debounce()

7.1 做啥的?

如果两件事件发送的时间间隔小于设定的时间间隔则前一件事件就不会发送给观察者。 简单来说就是防抖动,比如按钮控制快速点击等。

7.2 怎么用?

Observable.just(1,2,3,4,5)
                .map(new Function<Integer, Integer>() {

                    @Override
                    public Integer apply(Integer integer) throws Exception {

                        Thread.sleep(900);
                        return integer;
                    }
                })
                .debounce(1,TimeUnit.SECONDS)
                .subscribe(new Consumer<Integer>() {

                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
复制代码

7.3 结果


8.firstElement() && lastElement() && elementAt()

8.1 做啥的?

firstElement(): 取事件序列的第一个元素。

lastElement(): 取事件序列的最后一个元素。

elementAt(): 以指定取出事件序列中事件,但是输入的 index 超出事件序列的总数的话就不会出现任何结果。

8.2 怎么用?

Observable.just(1,2,3,4)
                .firstElement()
                .subscribe(new Consumer<Integer>() {

                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
复制代码

8.3 结果


9.条件操作符

1.all()

1.1 做啥的?

判断事件序列是否全部满足某个事件,如果都满足则返回 true,反之则返回 false。

1.2 怎么用?

Observable.just(1, 2, 3, 4, 5)
                .all(new Predicate<Integer>() {

                    @Override
                    public boolean test(Integer integer) throws Exception {

                        return integer <= 4;
                    }
                }).subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Exception {

                Log.i("lybj",  aBoolean+"");
            }
        });
复制代码

1.3 结果


2.takeWhile() & takeUntil()

2.1 做啥的?

takeWhile(): 从左边开始,将满足条件的元素取出来,直到遇到第一个不满足条件的元素,则终止 takeUntil(): 从左边开始,将满足条件的元素取出来,直到遇到第一个满足条件的元素,则终止 filter(): 是将所有满足条件的数据都取出。

2.2 怎么用?

Observable.just(1, 2, 3, 4, 5)
                .takeWhile(new Predicate<Integer>() {

                    @Override
                    public boolean test(Integer integer) throws Exception {

                        return integer < 3;
                    }
                }).subscribe(new Consumer<Integer>() {
            
                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
复制代码

2.3 结果


3.skipWhile

3.1 做啥的?

从左边开始,根据条件跳过元素

3.2 怎么用?

Observable.just(1,2,3,4,5,3,2,1,7)
                .skipWhile(new Predicate<Integer>() {

                    @Override
                    public boolean test(Integer integer) throws Exception {

                        return integer < 3;
                    }
                }).subscribe(new Consumer<Integer>() {

                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
复制代码

3.3 结束


4.isEmpty() & defaultIfEmpty()

4.1 做啥的?

isEmpty(): 判断事件序列是否为空。

defaultIfEmpty(): 如果观察者只发送一个 onComplete() 事件,则可以利用这个方法发送一个值。

4.2 怎么用?

Observable.create(new ObservableOnSubscribe<String>() {

            public void subscribe(@NonNull ObservableEmitter<String> emitter){

                emitter.onComplete();
            }
        }).isEmpty()
          .subscribe(new Consumer<Boolean>() {

              @Override
              public void accept(Boolean aBoolean) throws Exception {

                  Log.i("lybj",  aBoolean+"");
              }
          });
复制代码

4.3 结果


5.contains()

5.1 做啥的?

判断事件序列中是否含有某个元素,如果有则返回 true,如果没有则返回 false。

5.2 怎么用?

在Observable.just(1,2,3,4,5,6)
                .contains(2)
                .subscribe(new Consumer<Boolean>() {

                    @Override
                    public void accept(Boolean aBoolean) throws Exception {

                        Log.i("lybj",  aBoolean+"");
                    }
                });
复制代码

5.3 结果


6.sequenceEqual()

6.1 做啥的?

判断两个 Observable 发送的事件是否相同。

6.2 怎么用?

Observable.sequenceEqual(Observable.just("小明", "小方", "小李"),
                Observable.just("小明", "小方", "小李", "小张"))
                .subscribe(new Consumer<Boolean>() {

                    @Override
                    public void accept(Boolean aBoolean) throws Exception {

                        Log.i("lybj",  aBoolean+"");
                    }
                });
复制代码

6.3 结果


10.源码

demo下载

关注下面的标签,发现更多相似文章
评论