加深对RxJava线程切换的理解之ObserveOn和SubscribeOn

4,128 阅读3分钟

这几天看到一篇些文章,关于RxJava线程切换的,说到RxJava的线程切换,就一定会涉及ObserveOn和SubscribeOn两个方法。

网上对于ObserveOn和SubscribeOn的调用顺序、调用有效等有很多文章,读者可以自行搜索。

胖子只是为了加深对RxJava线程切换的理解,同时完善一些网上结论不严谨的地方,毕竟实践出真理。

这里只做结论,对于原理,会另起一篇,介绍RxJava原理机制时,详细介绍。

先来说说大概介绍下ObserveOn和SubscribeOn分别是做什么的,看官网的描述和图

As shown in this illustration, the SubscribeOn operator designates which thread the 
Observable will begin operating on, no matter at what point in the chain of operators 
that operator is called. ObserveOn, on the other hand, affects the thread that the 
Observable will use below where that operator appears. For this reason, you may call 
ObserveOn multiple times at various points during the chain of Observable operators in 
order to change on which threads certain of those operators operate.
翻译
如此图所示,SubscribeOn运算符指定Observable将在哪个线程上开始操作,无论该运算符在运算符链中的哪个点被调用。另一方面,ObserveOn影响Observable将在该运算符出现的位置使用的线程。因此,您可以在Observable运算符链中的各个点多次调用ObserveOn,以更改某些运算符在哪些线程上进行操作。
比较通熟易懂点的结论:
     1.SubscribeOn切换的是订阅的线程
     2.ObserveOn切换的是发送的线程
再通熟易懂点一点:
     1.SubscribeOn切换的是Observable执行的线程
     2.ObserveOn改切换的是观察Observable的线程
简单、粗暴:
     1.SubscribeOn在ObserveOn调用前,切换网络请求、IO等耗时操作的线程。
     2.耗时操作完后,ObserveOn切换回来,更新UI。

ObserveOn和SubscribeOn.png

关于subscribeOn

activityMainBinding.button.setOnClickListener(v->{
            Observable.just(3)//传一个3
                    .subscribeOn(Schedulers.newThread())//注意这里1
                    .map(str->{
                        Log.e(TAG, "第一次map,参数 = "+str+",当前线程 = "+Thread.currentThread().getName());
                        return Math.pow(str,2);
                    })
                    .subscribeOn(Schedulers.newThread())//注意这里2
                    .map(str->{
                        Log.e(TAG, "第二次map,参数 = "+str+",当前线程 = "+Thread.currentThread().getName());
                        return Math.pow(str,2);
                    })
                    .subscribeOn(Schedulers.newThread())//注意这里3
                    .map(str->{
                        Log.e(TAG, "第三次map,参数 = "+str+",当前线程 = "+Thread.currentThread().getName());
                        return Math.pow(str,2);
                    })
                    .subscribe(integer->{
                Log.e(TAG,"最后接收到的为" + integer+",当前线程 = "+Thread.currentThread().getName());
            });
        });

image-20200925140548098.png再来看看另一种

activityMainBinding.button.setOnClickListener(v->{
            Observable.just(3)//传一个3
                    .subscribeOn(Schedulers.newThread())//注意这里1
                    .map(str->{
                        Log.e(TAG, "第一次map,参数 = "+str+",当前线程 = "+Thread.currentThread().getName());
                        return Math.pow(str,2);
                    })
                    .subscribeOn(Schedulers.io())//注意这里2
                    .map(str->{
                        Log.e(TAG, "第二次map,参数 = "+str+",当前线程 = "+Thread.currentThread().getName());
                        return Math.pow(str,2);
                    })
                    .subscribeOn(AndroidSchedulers.mainThread())//注意这里3
                    .map(str->{
                        Log.e(TAG, "第三次map,参数 = "+str+",当前线程 = "+Thread.currentThread().getName());
                        return Math.pow(str,2);
                    })
                    .subscribe(integer->{
                Log.e(TAG,"最后接收到的为" + integer+",当前线程 = "+Thread.currentThread().getName());
            });
        });

image-20200925141306317.png再来

activityMainBinding.button.setOnClickListener(v->{
            Observable.just(3)//传一个3
                    .subscribeOn(Schedulers.newThread())
                    .map(str->{
                        Log.e(TAG, "第一次map,参数 = "+str+",当前线程 = "+Thread.currentThread().getName());
                        return Math.pow(str,2);
                    })
                    .subscribeOn(Schedulers.newThread())
                    .map(str->{
                        Log.e(TAG, "第二次map,参数 = "+str+",当前线程 = "+Thread.currentThread().getName());
                        return Math.pow(str,2);
                    })
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .map(str->{
                        Log.e(TAG, "第三次map,参数 = "+str+",当前线程 = "+Thread.currentThread().getName());
                        return Math.pow(str,2);
                    })
                    .subscribe(integer->{
                Log.e(TAG,"最后接收到的为" + integer+",当前线程 = "+Thread.currentThread().getName());
            });
        });

image-20200925141651668.png再来

activityMainBinding.button.setOnClickListener(v->{
            Observable.just(3)//传一个3
                    .subscribeOn(Schedulers.io())
                    .map(str->{
                        Log.e(TAG, "第一次map,参数 = "+str+",当前线程 = "+Thread.currentThread().getName());
                        return Math.pow(str,2);
                    })
                    .subscribeOn(Schedulers.io())
                    .map(str->{
                        Log.e(TAG, "第二次map,参数 = "+str+",当前线程 = "+Thread.currentThread().getName());
                        return Math.pow(str,2);
                    })
                    .subscribeOn(Schedulers.io())
                    .map(str->{
                        Log.e(TAG, "第三次map,参数 = "+str+",当前线程 = "+Thread.currentThread().getName());
                        return Math.pow(str,2);
                    })
                    .subscribe(integer->{
                Log.e(TAG,"最后接收到的为" + integer+",当前线程 = "+Thread.currentThread().getName());
            });
        });

image-20200925144538226.png

对于subscribeOn

不同的调用,只保留第一次调用。

如:

.subscribeOn(Schedulers.newThread())//保留

.subscribeOn(AndroidSchedulers.mainThread())

Schedulers.io()多次调用,调用次数为双数为2,单数为1。

如:

.subscribeOn(Schedulers.io())会切换到RxCachedThreadScheduler-1

or

.subscribeOn(Schedulers.io())

.subscribeOn(Schedulers.io())会切换到RxCachedThreadScheduler-2

or

.subscribeOn(Schedulers.io())

.subscribeOn(Schedulers.io())

.subscribeOn(Schedulers.io())会切换到RxCachedThreadScheduler-1

...以此内推

Schedulers.newThread()多次调用,叠加。

如:

.subscribeOn(Schedulers.newThread())

.subscribeOn(Schedulers.newThread())

.subscribeOn(Schedulers.newThread())会切换到RxNewThreadScheduler-3

关于ObserveOn

activityMainBinding.button.setOnClickListener(v->{
    Observable.just(3)//传一个3
            .observeOn(Schedulers.newThread())//第一次切换
            .map(str->{
                Log.e(TAG, "第一次map,参数 = "+str+",当前线程 = "+Thread.currentThread().getName());
                return Math.pow(str,2);
            })
            .observeOn(Schedulers.io())//第二次切换
            .map(str->{
                Log.e(TAG, "第二次map,参数 = "+str+",当前线程 = "+Thread.currentThread().getName());
                return Math.pow(str,2);
            })
            .observeOn(AndroidSchedulers.mainThread())//第三次切换
            .map(str->{
                Log.e(TAG, "第三次map,参数 = "+str+",当前线程 = "+Thread.currentThread().getName());
                return Math.pow(str,2);
            })
            .subscribe(integer->{
        Log.e(TAG,"最后接收到的为" + integer+",当前线程 = "+Thread.currentThread().getName());
    });
});

image-20200925143403055.pngobserveOn比较直接,调用一次,切换一次,次次生效。