RxJS学习之路五(Operators(二))

1,156 阅读6分钟

这次介绍剩下的operators,这些操作符很多都是用来处理各个observable之间的相互关系的,在实际应用中可以解决很多复杂的异步交互逻辑的问题。这部分操作符的理解也会较为复杂。

startWith

startWith是在接收的observable前添加一个元素,和concat的区别是不能添加observable。

    interval(1000).pipe(
      startWith(666)
    ).subscribe(res => console.log(res))
    
    source :     ----0----1----2----3--...
                startWith(0)
    startWith: (0)----0----1----2----3--...

merge

merge是用来合并observable的,它和concat也有明显的区别,concat是合并之后依次执行,merge合并后是同时执行

    merge(
      interval(1000),
      interval(500)
    ).subscribe(res => console.log(res))
    
    
    source1: ----0----1----2---
    source2: --0--1--2--3--4--5---
            merge()
    merge: --0--(01)--2--(13)--4--(24)--5--

可以看出,merge后的两个observable是同步运行的,触发哪个输出哪个,这种类型的operator通常用于需要同时监听多个操作,输出结果类似的情况。 比如播放器同时监听暂停和停止按钮,瀑布流同时监听滚动条事件和点击‘更多’按钮的事件。

combineLatest

merge有点像or(||)操作,任意一个有值都可以输出。而combineLatest则是有点像and(&&)操作符

combineLatest是取得各个observable 最后发出的值,再输出成一个值,如下例子:

combineLatest(
      interval(500),
      interval(300)
    ).subscribe(([source1, source2]) => {
      console.log(source1 + source2);
    });
    
source1 : ----0----1----2----3
source2 : --0--1--2--3--4--5--6--

    combineLatest(source1 + source2);

combineLatest : ----01--23-4--(56)--7|

这个看起来有点抽象,初次接触很难理解这个输出的原因。

我们需要理解它的运作原理:当任意一个observable触发一个值时,这个值将会与另外其他的observable的现有值结合起来发送出去,当前的值会替换掉之前的值。

例子中的触发顺序:

  1. source2触发0,source1无值,不触发;
  2. source1触发0,source2为0,触发0+0=0;
  3. source2触发1,source1为0,触发1+0=1;
  4. source2触发2,source1为0,触发2+0=2;
  5. source1触发1,source2为2,触发1+2=3; .......

以此类推

这个operator非常常用,在监听多个相关联的值的时候会用到。 例如两个obervable为总数据(dataSet)和筛选因子(fliterStatus),其中任意一个发生变化都需要重新处理数据渲染页面。则可以这样处理。

combineLatest(
      $dataSet,
      $fliterStatus
    ).subscribe(callback)

zip

zip也是合并observable的操作符,combineLatest的区别是zip为对应了index的合并打包,source1的第一个元素只和source2的第一个元素合并,source1的第二个元素只和source2的第二个元素合并。

zip(
      interval(500),
      interval(300)
    ).subscribe(([source1, source2]) => {
      console.log(source1 + source2);
    });
    
    
source1 : ----0----1----2----3
source2 : --0--1--2--3--4--5--6--

    zip(source1 + source2);

zip : ----0----2----4----6----8--

zip经常用来把发送事件和时间间隔结合起来,例如实现每隔1s发送一条数据:

zip(interval(1000), $sendInfo).subscribe(callback)

withLatestFrom

withLatestFrom和combineLatest的触发方式几乎一样,唯一的区别就是withLatestFrom具有主从关系,只有主observable触发时才输出。

    interval(1000).pipe(
      withLatestFrom(interval(500), (x, y) => x + y)
    ).subscribe(res => console.log(res))
    
source1 : ----0----1----2----3
source2 : --0--1--2--3--4--5--6--

    withLatestFrom(source1 + source2);

withLatestFrom : ----1----4----7----6----9--
    

如例子所示,仅在source1触发的时候才会去检查source2的当前值,然后再进行处理输出。

这种操作可以用在后台数据不断更新我们也不断接收,但仅在点击事件发生时才在某处进行展示。

scan

scan和数组中的reduce方法比较相似,是针对于obervable的累加器

    interval(1000).pipe(
      scan((origin, next) => origin + next)
    ).subscribe(res => console.log(res))
    
source1 : --0--1--2--3--4--5--6--

    scan((origin, next) => origin + next)

scan : --0--1--3--6--10--

scan还有第三个参数为index下标

buffer

buffer是一个系列的操作符:

  • buffer
  • bufferCount
  • bufferTime
  • bufferToggle
  • bufferWhen

buffer是用一个observable将目标observable进行缓存输出

如下例子:

    interval(300).pipe(
      buffer(interval(1000))
    ).subscribe(res => console.log(res))
    
source1 : --0--1--2--3--4--5--6--
source2 : ------0------1----
     source1.buffer(source2)
     
buffer: ------(012)------(345)---

bufferTime是用时间来缓存

    interval(300).pipe(
      bufferTime(1000)
    ).subscribe(res => console.log(res))
 source1 : --0--1--2--3--4--5--6--
     source1.bufferTime(1000)
     
buffer: ------(012)------(345)---   

bufferCount是用数量来缓存

    interval(300).pipe(
      bufferCount(2)
    ).subscribe(res => console.log(res))
    
 source1 : --0--1--2--3--4--5--6--
     source1.bufferCount(2)
 buffer: ----(01)----(23)--- 

另外两个很少使用,bufferWhen传入一个function来决定何时关闭、发出、重置缓存。bufferToggle,第一个参数为一个observable来规定开启缓存期,第二个参数为函数返回缓存时间。

delay 和 delayWhen

delay可以延迟observable第一次发送元素的时间点,delayWhen也是延迟发送,但是它是传入一个返回observable的回调函数针对每一个元素触发延迟.

    interval(300).pipe(
      delay(300)
    ).subscribe(res => console.log(res))
    
source1 : --0--1--2--3--4--5--6--
            delay(300)
delay     ----0--1--2--3--4--5-



    interval(300).pipe(
      delayWhen(x => empty().pipe(delay(100 * x))
    ).subscribe(res => console.log(res))
    
 source1 : --0--1--2--3--4--5--6--
    delayWhen(x => empty().pipe(delay(100 * x))
delay     --0---1----2-----3------4--   

delayWhen的回调参数x为当前传入的元素。

这两个操作符通常用来进行一些ui交互的延迟操作。

throttleTime 和 debounceTime

debounceTime是非常常用的一个用来处理防抖的操作符,它可以将传入的元素缓存规定的时间,若时间内有新的元素进来,则刷新时间和元素,待到时间结束,发出元素。

    interval(300).pipe(
      take(5),
      debounceTime(1000)
    ).subscribe(res => console.log(res))

 source1 : --0--1--2--3--4|
        debounceTime(1000)
debounce:  --------------4|
    

debounceTime常用于输入框的联想功能对后端发送请求的防抖,也用于异步校验器的防抖优化。

throttleTime和debounce类似,在收到第一个元素时,立刻送出,但是会在规定时间内不接收任何元素。

    interval(300).pipe(
      take(5),
      throttleTime(1000)
    ).subscribe(res => console.log(res))

 source1 : --0--1--2--3--4|
        throttleTime(1000)
debounce:  --0----------4|

throttleTime常用于防止重复向后端发送请求的情况,例如下载按钮,点击一次后进入沉默状态一段时间,防止多次下载。

distinct

distinct是一个过滤的操作符,可以帮我们把之前出现过的元素过滤掉

    of(1, 2, 3, 1, 2).pipe(
      distinct()
    ).subscribe(res => console.log(res))
    // 输出123, 12被过滤掉

distinct有两个参数,第一个参数为一个回调函数,可以用来进行筛选条件的判断,第二个参数为一个observable,用来触发何时清除记录的缓存。

distinctUntilChanged

distinctUntilChanged和distinct类似,也是过滤相同元素,不同的是distinctUntilChanged只会和最后一个元素作比较,只要相邻的两个不相同就可以正常发送

of(1, 2, 3, 3, 1, 2).pipe(
      distinct()
    ).subscribe(res => console.log(res))
    // 输出12312, 3被过滤掉

小结

RxJS的操作符还有不少,不过其他都很少在日常中使用到。这些操作符灵活使用可以让observable之间的交互变得更加容易和直观。