我所了解的RxJS

2,709 阅读6分钟

简介

RxJS 是使用 Observables的响应式编程的库,它使编写异步或基于回调的代码更容易,是ReactiveX编程理念的JavaScript版本。RxJS的强大之处正是它使用纯函数来产生值的能力。这意味着你的代码更不容易出错。

安装

官方安装

npm install rxjs
/// 导入整个核心功能集:
import Rx from 'rxjs/Rx';
Rx.Observable.of(1,2,3)

推荐安装

根据官方安装发现rxjs不能完全加载,需要依赖rxjs-compat包,推荐使用以下安装

npm i -s Rxjs@6 rxjs-compat@6
import * as Rx from 'rxjs/Rx'

RxJS核心概念

Observable简介

Observable举例说明

  Rx.Observable.of('1', '2', '3').map(x=>x*10).filter(x=>x>5).subscribe(x=>console.log(x))
  • 创建过程:Rx.Observable.of('1', '2', '3') 创建一个依次发送1、2、3的observable
  • 逻辑过程:*.map().filter()*每个值乘以10,然后去过滤出大于5的值。如果先写filter操作符,然后再map,则得不到数据
  • 订阅过程:*subscribe()*类似回调函数。这个过程会得到一个对象subscription。
  • 执行过程:x=>console.log(x) 默认情况下为执行next回调
  • 清理过程:示例如下
const subscription = Rx.Observable.of('1','2','3').map(x=>x*10).filter(x=>x>5).delay(1000).subscribe(x=>console.log(x));
subscription.unsubscribe()

Subject简介

什么是 Subject? - RxJS Subject 是一种特殊类型的 Observable,它允许将值多播给多个观察者,所以 Subject 是多播的,而普通的 Observables 是单播的(每个已订阅的观察者都拥有 Observable 的独立执行)。 每个 Subject 都是 Observable 。 - 对于 Subject,你可以提供一个观察者并使用 subscribe 方法,就可以开始正常接收值。从观察者的角度而言,它无法判断 Observable 执行是来自普通的 Observable 还是 Subject 。 在 Subject 的内部,subscribe 不会调用发送值的新执行。它只是将给定的观察者注册到观察者列表中,类似于其他库或语言中的 addListener 的工作方式。 每个 Subject 都是观察者。 - Subject 是一个有如下方法的对象: next(v)、error(e) 和 complete() 。要给 Subject 提供新值,只要调用 next(theValue),它会将值多播给已注册监听该 Subject 的观察者们。 Subject 像是 Observable,但是可以多播给多个观察者。Subject 还像是 EventEmitters,维护着多个监听器的注册表。 根据官网,我们大概可以以下理解: Observable类似单车道单行线,逆行或者多辆车同时开都是不允许的 Subject类似没有监控的双行线,随你往哪里开,怎么开,多少车开都没有问题 所以可以理解Subject是一类特殊的Observable,它可以向多个Observer多路推送数值。普通的Observable并不具备多路推送的能力(每一个Observer都有自己独立的执行环境),而Subject可以共享一个执行环境

Subject举例说明

const test = Observable.interval(1000).take(3);
const observerA = {
  v => console.log(`a:${v}`) 
}
const observerB = {
  v => console.log(`b:${v}`)
}                                                                              ///定义好observable
test .subscribe(observerA)
setTimeout(() => {test .subscribe(observerB) }, 2000)    
///因为observable是单播的,所以会输出 a:0、a:1、b:0、a:2、b:1、b:2
const subject = new Subject()
subject.subscribe(observerA)
test.subscribe(subject)
setTimeout(() => {subject.subscribe(observerB)}, 2000)
///因为Subject是多播的,共享一个执行,所以输出为:a:0、a:1、a:2、b:2

Subject多态

由于subject的特殊性,衍生出多种subject的变体,具体就不阐述了,他们的对比如下图

Rxjs 是否存储数据 是否需要初始值 何时向订阅者发布数据
Subject 及时发布,有新数据就发布
BehaviorSubject 是,存储最后一条数据或者初始值 及时发布,有新数据就发布
ReplaySubject 是,存储所有数据 及时发布,有新数据就发布
AsyncSubject 是,存储最后一条数据 延时发布,只有当数据源完成时才会发布

Scheduler简介

什么是Scheduler? - Scheduler控制着何时启动 subscription 和何时发送通知。它由三部分组成

调度器是一种数据结构。它知道如何根据优先级或其他标准来存储任务和将任务进行排序。 调度器是执行上下文。 它表示在何时何地执行任务(举例来说,立即的,或另一种回调函数机制(比如 setTimeout 或 process.nextTick),或动画帧)。 调度器有一个(虚拟的)时钟。 调度器功能通过它的 getter 方法 now() 提供了“时间”的概念。在具体调度器上安排的任务将严格遵循该时钟所表示的时间。 调度器可以让你规定 Observable 在什么样的执行上下文中发送通知给它的观察者。

操作符归纳

RxJS提供了各种API来创建数据流:

单值:of, empty, never 多值:from 定时:interval, timer 从事件创建:fromEvent 从Promise创建:fromPromise 自定义创建:create

创建出来的数据流是一种可观察的序列,可以被订阅,也可以被用来做一些转换操作,比如:

改变数据形态:map, mapTo, pluck 过滤一些值:filter, skip, first, last, take 时间轴上的操作:delay, timeout, throttle, debounce, audit, bufferTime 累加:reduce, scan 异常处理:throw, catch, finally, retry, 条件执行:takeUntil, delayWhen, retryWhen, subscribeOn, ObserveOn 转接:switch

也可以对若干个数据流进行组合:

concat,保持原来的序列顺序连接两个数据流 merge,合并序列 race,预设条件为其中一个数据流完成 forkJoin,预设条件为所有数据流都完成 zip,取各来源数据流最后一个值合并为对象 combineLatest,取各来源数据流最后一个值合并为数组

RxJS 难点

RxJS 处理异步逻辑,数据流,事件非常擅长。使用Rxjs前处理数据一般是处于一种'上帝'视角来对数据可视化的调试,Rxjs大大缩短了代码量的同时能够更好的达到数据的处理(纯净性)。正是由于其强大的特性,所以学习Rxjs有以下难点(个人认为) 1、抽象程度比较高,需要开发人员具备比较强的归纳总结能力 2、操作符多而且杂,需要花大力气记住并且合理使用各个操作符

测试题

  • 1、鼠标点击后console相隔2秒输出5的倍数
  • 2、现有3个异步操作a、b、c,请提供让三个异步并行完成后同时输出值的方法
  • 3、’人和未来大数据‘ ===》 取最后4个字(多种方法)
  • 4、模拟一个程序员,工资不涨,每天赚相同的钱n,钱足够了(100n)就买房,买了房然后把房子租给别人,每个月收取房租m(5n),然后收入变成n+m,然后钱足够了继续买房,然后继续租给访客,收入变成n+2m

参考答案

//////题目1
const timer = Rx.Observable.interval(2000);
const event = Rx.Observable.fromEvent(document, 'click')
event.switchMap(() => timer)
 .map(x => x * 5)
 .subscribe(x => console.log('第1题:' + x));
/////题目2
const fa = (cb) => {
  setTimeout(() => cb('a'), 1000);
}
const fb = (cb) => {
  setTimeout(() => cb('b'), 2000);
}
const fc = (cb) => {
  setTimeout(() => cb('c'), 4000);
}
const oa = Rx.Observable.bindCallback(fa);
const ob = Rx.Observable.bindCallback(fb);
const oc = Rx.Observable.bindCallback(fc);

Rx.Observable.combineLatest(oa(),ob(),oc())
  .subscribe(x => console.log('第2题:' + x));
  /////同时还可以用forkJoin,zip
//////题目3
const str = "人和未来大数据";
const param = str.split('');
Rx.Observable.from(param)
  .takeLast(4)
  .subscribe(x => console.log('第3题:' + x));
///////////////////////////////////////////////////////
Rx.Observable.from(param).subscribe(new ReplaySubject(3))
///////题目4
const house$ = new Subject()  ///房子
const houseCount$ = house$.scan((acc, num) => acc + num, 0).startWith(0) ///房子数

// 工资始终不涨
const salary$ = Observable.interval(100).mapTo(1) //程序员工资n
const rent$ = Observable.interval(3000)
  .withLatestFrom(houseCount$)
  .map(arr => arr[1] * 5)

// 一买了房,就没现金了……
const income$ = Observable.merge(salary$, rent$)
const cash$ = income$
  .scan((acc, num) => {
    const newSum = acc + num
    const newHouse = Math.floor(newSum / 100)
    if (newHouse > 0) {
      house$.next(newHouse)
    }
    return newSum % 100
  }, 0)
houseCount$.subscribe(num => console.log(`houseCount: ${num}`))
cash$.subscribe(num => console.log(`cash: ${num}`))

原文链接:tech.gtxlab.com/sth-about-r…


作者简介: 张栓,人和未来大数据前端工程师,专注于html/css/js的学习与开发。