项目实战之Rxjava、RxBinding在实际项目中的使用

5,466 阅读8分钟

前言

网上很多讲rxjava入门的文章,讲了什么是rxjava以及一些高大上的操作符,但是真正在项目中使用的场景很少讲,那本篇文章主要讲一下rxjava在实际项目中的应用场景,rxjava结合rxbinding在实际项目中的使用姿势了解一下。因为rxbind2 本身依赖rxjava2,所以项目中引入rxbinding就可以了,rxjava2就不用引入了。

implementation 'com.jakewharton.rxbinding2:rxbinding:2.1.1'

引入完了就看一下常用的使用场景吧:

1、优化搜索

基本上app里都有搜索这个功能需求吧,监听et的文本变化然后请求服务器拉取数据,如果不优化处理器的话,每次et的值发生变化都会请求服务器,在弱网环境下很可能出现数据错乱的问题。如果不用rxjava来处理,各种timer会把人写晕掉吧,那么看看rxjava怎么来优雅的处理:

//优化搜索功能
    RxTextView.textChanges(mBinding.etSearch)
            // 跳过一开始et内容为空时的搜索
            .skip(1)
            //debounce 在一定的时间内没有操作就会发送事件
            .debounce(1000, TimeUnit.MILLISECONDS)
            //下面这两个都是数据转换
            //flatMap:当同时多个网络请求访问的时候,前面的网络数据会覆盖后面的网络数据
            //switchMap:当同时多个网络请求访问的时候,会以最后一个发送请求为准,前面网路数据会被最后一个覆盖
            .switchMap(new Function<CharSequence, ObservableSource<List<String>>>() {
                @Override
                public ObservableSource<List<String>> apply(CharSequence charSequence) throws Exception {
                    String searchKey = charSequence.toString();
                    System.out.println("binding=======搜索内容:" + searchKey);
                    //这里执行网络操作,获取数据
                    List<String> list = new ArrayList<String>();
                    list.add("小刘哥");
                    list.add("可爱多");

                    return Observable.just(list);
                }
            })
            // .onErrorResumeNext()
            //网络操作,获取我们需要的数据
            .subscribeOn(Schedulers.io())
            //界面更新在主线程
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<List<String>>() {
                @Override
                public void accept(List<String> strings) throws Exception {
                    System.out.println("binding=======搜索到" + strings.size() + "条数据");
                }
            });

注释写的很清楚了,不用解释了吧,需要注意的一点就是 .skip(1) 这个操作符不能少,不然页面一打开就会执行一次搜索的。

2、结合rxbinding防手抖

    /**
     * 防止多次点击--2秒内执行一次点击
     */
    RxView.clicks(mBinding.btClick)
            .throttleFirst(2, TimeUnit.SECONDS)
            .subscribe(c -> System.out.println("binding=======点击了按钮"));

假如一个页面有一个按钮,点击一次要请求一下服务器或者其他操作都可以,这里做了2秒内响应一次点击事件,很常用的场景。

3、长按事件

    /**
     * 长按事件
     */
    RxView.longClicks(mBinding.btClick)
            .subscribe(c->System.out.println("binding=======长按了按钮"));

长按事件,这个不用说了吧。

4、监听view的选中状态

    /**
     * checkbox 选中就修改textview
     */
    RxCompoundButton.checkedChanges(mBinding.checkbox)
            .subscribe(new Consumer<Boolean>() {
                @Override
                public void accept(Boolean aBoolean) throws Exception {
                    mBinding.tvCb.setText(aBoolean ? "按钮选中了" : "按钮未选中");
                }
            });

假如页面有一个cb,比如选中表示同意阅读了用户协议什么的,用来监听选中状态来做一些逻辑操作,几行代码就搞定。

5、注册、登录等获取验证码时的倒计时操作

/**
 * 倒计时操作
 */
public void clickTimer(View view) {

    // 2 秒后发送数据
    Observable.timer(2, TimeUnit.SECONDS)
            .subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Long value) {
                    System.out.println("binding=======value:" + value);//0
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });

    //倒计时操作
    final int count = 10;
    Observable.interval(0, 1, TimeUnit.SECONDS)//设置0延迟,每隔一秒发送一条数据
            .take(count + 1)//设置循环次数
            .map(new Function<Long, Long>() {
                @Override
                public Long apply(Long aLong) throws Exception {

                    return count - aLong;
                }
            })
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(Disposable disposable) throws Exception {
                    //在发送数据的时候设置为不能点击
                    mBinding.btCutdown.setEnabled(false);

                    //背景色
                    mBinding.btCutdown.setBackgroundColor(Color.parseColor("#39c6c1"));
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Long value) {
                    mBinding.btCutdown.setText("" + value);
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    mBinding.btCutdown.setText("重新获取");
                    mBinding.btCutdown.setEnabled(true);
                    mBinding.btCutdown.setBackgroundColor(Color.parseColor("#d1d1d1"));
                }
            });

}

很简洁吧

6、注册登录等情况下,所有输入都合法再点亮登录按钮

/**
     * 注册登录等情况下,所有输入都合法再点亮登录按钮
     */
    Observable<CharSequence> name = RxTextView.textChanges(mBinding.etName).skip(1);
    Observable<CharSequence> age = RxTextView.textChanges(mBinding.etAge).skip(1);

    Observable.combineLatest(name, age, new BiFunction<CharSequence, CharSequence, Boolean>() {
        @Override
        public Boolean apply(CharSequence charSequence, CharSequence charSequence2) throws Exception {

            boolean isNameEmpty = TextUtils.isEmpty(mBinding.etName.getText());
            boolean isAgeEmpty = TextUtils.isEmpty(mBinding.etAge.getText());

            return !isNameEmpty && !isAgeEmpty;
        }
    })
            .subscribe(new Consumer<Boolean>() {
                @Override
                public void accept(Boolean aBoolean) throws Exception {
                    System.out.println("bt======" + aBoolean);
                    mBinding.btSubmit.setEnabled(aBoolean);
                }
            });

7、使用interval做周期性操作

 /**
 * 每隔2秒 输出一次日志
 */
Disposable mDisposable;
public void clickIntervar(View view) {

    Observable.interval(2, TimeUnit.SECONDS)
            .subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
                    mDisposable =d;

                }

                @Override
                public void onNext(Long value) {
                    System.out.println("binding=======输出日志:" + value);
                    if (value == 5L) {
                        System.out.println("binding=======dispose");
                        mDisposable.dispose();
                    }
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });
}

8、使用schedulePeriodically做轮询请求

/**
 * 使用schedulePeriodically做轮询请求 3秒轮询一次
 */
Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(final ObservableEmitter<String> e) throws Exception {

            Schedulers.newThread().createWorker()
                    .schedulePeriodically(new Runnable() {
                        @Override
                        public void run() {
                            e.onNext("net work-----");
                        }
                    }, 0, 3, TimeUnit.SECONDS);

        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("binding=======net work");
        }
    });

9、网络出错重试

/**
 * 网络错误重试
 * 这里just操作符 改为retrofit 网络请求返回的即可。
 */
int mRetryCount;

public void clickRetry(View view) {
    Observable.just("retry")
            .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {

                    // 参数Observable<Throwable>中的泛型 = 上游操作符抛出的异常,可通过该条件来判断异常的类型
                    return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                        @Override
                        public ObservableSource<?> apply(Throwable throwable) throws Exception {

                            // 判断异常信息  根据异常信息判断是否需要重试
                            if (throwable instanceof IOException) {
                                System.out.println("retry======y==");
                                // 重试
                                // 判断重试次数 这里设置最多重试5次
                                if (mRetryCount < 5) {
                                    mRetryCount++;
                                    /**
                                     * 1、通过返回的Observable发送的事件 = Next事件,从而使得retryWhen()重订阅,最终实现重试功能
                                     * 2、延迟1段时间再重试  采用delay操作符 = 延迟一段时间发送,以实现重试间隔设置
                                     * 3、在delay操作符的等待时间内设置 = 每重试1次,增多延迟重试时间1s
                                     */
                                    int time = 1000 + mRetryCount * 1000;
                                    return Observable.just(1).delay(time, TimeUnit.MILLISECONDS);
                                } else {
                                    System.out.println("retry======5==");
                                    return Observable.error(new Throwable("已重试5次 放弃治疗"));
                                }

                            } else {
                                // 不重试
                                System.out.println("retry======n==");
                                return Observable.error(new Throwable("发生了非网络异常(非I/O异常)"));
                            }
                        }
                    });
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(String value) {
                    System.out.println("retry======suc==" + value);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("retry======err==" + e.toString());
                }

                @Override
                public void onComplete() {

                }
            });

}

10、解决网络嵌套请求

/**
 * 优化网络嵌套请求问题
 * 以下为了方便演示 写的伪代码
 */
public void clickRequest(View view) {
    Observable<String> requestLogin = Observable.just("requestLogin");
    final Observable<String> request2 = Observable.just("request2");

    requestLogin.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    System.out.println("flat=======loginsuccess");
                }
            })
            .observeOn(Schedulers.io())
            .flatMap(new Function<String, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(String s) throws Exception {
                    // 将网络请求1转换成网络请求2,即发送网络请求2
                    return request2;
                }
            })
            // (新被观察者,同时也是新观察者)切换到IO线程去发起登录请求
            //  特别注意:因为flatMap是对初始被观察者作变换,所以对于旧被观察者,它是新观察者,所以通过observeOn切换线程
            // 但对于初始观察者,它则是新的被观察者
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    System.out.println("flat=======第二次请求成功");
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    System.out.println("flat=======loginerr");
                }
            });
}

11、背压--这个就是记录一下

/**
 * 背压 Flowable g观察者使用
 * 解决发送和订阅事件 流速不一致的问题
 * <p>
 * 注意:同步订阅中,被观察者 & 观察者工作于同1线程,同步订阅关系中没有缓存区。
 * 被观察者在发送1个事件后,必须等待观察者接收后,才能继续发下1个事件.若Subscription.request没有设置,
 * 观察者接收不到事件,会抛出MissingBackpressureException异常。
 */
Subscription mSubscription;

public void clickFlow(View view) {

    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> e) throws Exception {

            /**
             * 同步订阅:
             * 同步订阅的情况下,调用e.requested()方法,获取当前观察者需要接收的事件数量.
             * 根据当前观察者需要接收的事件数量来发送事件
             *
             * 异步订阅:
             * 由于二者处于不同线程,所以被观察者 无法通过 FlowableEmitter.requested()知道观察者自身接收事件能力。
             * 异步的反向控制:
             */
            long count = e.requested();
            System.out.println("flowable======需要接收的事件数量=" + count);

            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onNext(4);
            e.onNext(5);
            e.onComplete();
        }
    }, BackpressureStrategy.ERROR)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {

                    // 作用:决定观察者能够接收多少个事件,多出的事件放入缓存区.若不设置,则不接收事件.
                    // 不过被观察者仍然在发送事件(存放在缓存区,大小为128),等观察者需要时 再取出被观察者事件(比如点击事件里).
                    // 但是 当缓存区满时  就会溢出报错
                    // 官方默认推荐使用Long.MAX_VALUE,即s.request(Long.MAX_VALUE);
                    mSubscription = s;
                    s.request(2);
                    // s.request(1); // 同步订阅 观察者连续要求接收事件的话,被观察者e.requested() 返回3
                }

                @Override
                public void onNext(Integer integer) {

                    System.out.println("flowable=======" + integer);
                }

                @Override
                public void onError(Throwable t) {

                }

                @Override
                public void onComplete() {

                }
            });

}

12、补充一个动态权限

添加依赖 compile 'com.tbruyelle.rxpermissions2:rxpermissions:0.9.5@aar'

// 记得危险权限 清单文件里也需要配置。
// 因为各个业务组件都可能使用到危险权限,我把权限统一写在了commonLibrary里
 RxPermissions permissions = new RxPermissions(this);
    RxView.clicks(mBinding.btPermission)
            .throttleFirst(1, TimeUnit.SECONDS)
            .subscribeOn(AndroidSchedulers.mainThread())
            .compose(permissions.ensure(Manifest.permission.CAMERA))
            .subscribe(new Consumer<Boolean>() {
                @Override
                public void accept(Boolean aBoolean) throws Exception {
                    if (aBoolean) {
                        System.out.println("binding=======允许");
                    } else {
                        System.out.println("binding=======拒绝");
                    }
                }
            });

13、结合retrofit网络请求封装、统一错误预处理等

以上的使用场景是在现有项目中,而项目架构搭建的初期涉及到的网络封装、统一错误预处理等由于篇幅问题,要拿出来单独写了。网络返回的数据一般情况下是后台封装好的固定格式(比如错误码、错误信息由后台接口设定),这样处理起来还简单一点。但是有时候api返回的数据格式是原生的http响应格式,这样封装处理的话外面又要套一层response泛型类,处理起来稍微比第一种情况复杂一点。下篇博客再写吧。

其他项目中使用到的场景,遇到了会更新在本博客……

最后,国际惯例 贴出项目地址:点我点我查看demo,如果对你有帮助,麻烦动动小手start鼓励一下,谢谢。