掌握RxJS系列(03):剖析Observable

628 阅读10分钟

掌握RxJS系列(03):剖析Observable

前言

这是掌握RxJS系列的第三篇文章,这篇文章主要是和大家一起来剖析一下RxJS中的Observable

初步了解

  • 创建一个Observable

    我们首先看一下下面创建一个Observable的代码:

    import { Observable } from "rxjs";
    
    // 创建一个Observable
    const observable = Observable.create(function subscribe(observer) {
      observer.next("Hello, World!");
    });
    // 订阅一个Observable
    observable.subscribe(val => {
      console.log(val);
    });

    我们首先从rxjs库里面导出Observable,然后通过它的create方法创建了一个observable; 在observable对象上面调用subscribe方法就完成了对observable的订阅。

    运行上面的代码,控制台就会打印出:

    Hello, World!
    

    到此为止,你已经学会了如何使用rxjs了,本次讲解到这里就结束了。 开个玩笑 😁

    说起创建一个Observable除了使用Observable.create我们还可以使用RxJS提供的创建操作符 在平时的工作中,我们使用这些创建操作符会更加频繁一些。

    rxjs中导出的Observable其实是一个,我们上面使用的Observable.create,这个类的静态方法的内部直接调用了Observable的构造器,可以看一下它的源码或者这部分的 文档; 我们也可以将上面代码中的Observable.create替换为new Observable

    传递给Observable.create的是一个subscribe函数,这个函数对于我们了解一个Observable是非常重要的,下文会有一些详细的讲解; 这个函数有一个默认的参数observer,通过调用observernext方法,我们可以把一个值传递给observable的订阅者。

  • 订阅一个Observable

    上面的代码也演示了如何订阅一个Observable对象,通过调用observablesubscribe方法,我们就可以订阅observable对象; observable.subscribe方法的参数可以是一到三个函数,还可以是一个observer类型的对象。 详情可以看subscribe

    你也许会注意到observable.subscribe中的subscribeObservable.create(function subscribe(observer) {...}中作为参数的subscribe函数的名字一样, 这不是一个巧合;虽然在代码层面,它们确实是不一样的,但是出于实用目的,你可以认为它们在概念上是相同的。

    我们需要注意的是,对于一个Observable对象可能会有多个订阅(subscribe),但是这些订阅都是独立的,它们之间没有任何的共享;也就是说,observable的每一次订阅, 与之对应的Observable.create里面的subscribe函数里面的程序就会重新运行一次;

    我们修改修改一下上面的代码,修改后如下:

    // 创建一个Observable
    const observable = Observable.create(function subscribe(observer) {
      console.log("------>");
      observer.next("Hello, World!");
    });
    // 第一个订阅
    observable.subscribe(val => {
      console.log(val);
    });
    // 第二个订阅
    observable.subscribe(val => {
      console.log(val);
    });

    我们可以看到控制台的输出如下所示,------>打印了两遍,说明Observable.create里作为参数的subscribe函数运行了两次。

    ------>
    Hello, World!
    ------>
    Hello, World!
    

    还有,我们需要知道,对于一个Observable的订阅,就像是调用一个函数那样;数据将会传递到它提供的回调函数里面。

    我们接下来在Observable.create里面添加一个异步的操作,修改后的代码如下所示:

    const observable = Observable.create(function subscribe(observer) {
      observer.next("Hello, World!");
      setTimeout(() => {
        observer.next("setTimeout");
      });
    });
    console.log("observable.subscribe begin ------>");
    observable.subscribe(val => {
      console.log(val);
    });
    console.log("observable.subscribe end ------>");
    

    上面代码的运行结果如下:

    observable.subscribe begin ------>
    Hello, World!
    observable.subscribe end ------>
    setTimeout
    

    代码的运行结果也表明了我们上面所说的,在传入Observable.createsubscribe函数中: 如果通过observer.next传递给外面的值是同步传递的,那么我们在订阅这个Observable的时候也会同步得到这个值; 如果通过observer.next传递给外面的值是异步传递的,那么我们在订阅这个Observable的时候也会异步得到这个值;

    还有一些需要知道的是:observable.subscribeaddEventListenerremoveEventListener等一些事件处理的API是不一样的; 对于Observable对象的subscribe我们传递的是一个observer, 但是这个observer并不是作为一个事件监听器注册在Observable中,Observable甚至都不会维护附加的observer列表。

    对一个Observable对象的订阅,是一种启动Observable execution的简单方法,然后把相应的值和事件传递到相应的订阅函数中。

  • 执行一个Observable

    Observable.create(function subscribe(observer) {...})中省略的代码,代表一个Observable executionObservable execution是延迟计算的, 只有当一个Observable对象被订阅的时候Observable execution才会被计算运行。Observable execution会同步或者异步的产生许多值。

    Observable execution可以产生三种类型的通知:

    • Next类型的通知:发送NumberStringObject等类型的值。
    • Error类型的通知:发送JavaScript的异常或者错误。
    • Complete类型的通知:不发送任何值。

    Next类型的通知是最重要和最常用的,这种类型的通知把数据传递给相应的ObserverErrorComplete这两种类型的通知 在Observable execution的执行过程中,只会发送一种;要么是Error类型,要么是Complete类型;并且只会发送一次。 因为一旦Error或者Complete类型的通知发送完毕,整个Observable execution就结束了。

    这三种通知的关系,我们可以使用Observable风格的约定来表示,表示如下:

    next*(error|complete)?
    

    我们接下来使用代码来实践一下上面所说的内容:

    const observable = Observable.create(function subscribe(observer) {
        observer.next(1);
        observer.next(2);
        observer.complete();
        observer.next(3) // 3 不会被打印出来
    });
    observable.subscribe(val => {
        console.log(val);
    });

    上面代码的运行结果如下所示:

    1
    2
    

    这表明了,在Observable execution在发出一个Complete通知后,整个Observable execution执行结束,后面的代码不会再执行了。

    接下来我们来通过Observable execution发送一个Error通知,代码如下所示:

    const observable = Observable.create(function subscribe(observer) {
        observer.next(1);
        observer.next(2);
        observer.error(0);
        observer.next(3) // 3 不会被打印出来
    });
    observable.subscribe(val => {
        console.log(val);
    });

    代码的运行结果如下所示:

    1
    2
    Uncaught 0 // 这一行在控制台显示为红色
    

    从上面的结果我们可以看出,当一个Observable execution发出一个Error通知之后,整个Observable execution执行结束,接下来的代码也就不会执行了。 但是控制台抛出了一个错误Uncaught 0,这是因为我们没有捕获这个错误,所以控制台就抛出了这个错误。关于这部分我们在下面的文章中有详细的说明。

    最佳实践: 我们可以使用try/catch来包裹我们Observable execution里面的代码;这样一来当我们的代码抛出错误的时候,就会发送一个Error类型的通知, 我们就可以及时地捕获到这个错误,方便我们下一步的处理。

  • 取消Observable Execution的执行

    因为有些Observable Execution的执行是无限的,所以我们需要一些方法取消Observable Execution的执行;先看下面的代码:

    import { interval } from "rxjs";
    const observable = interval(1000);
    const subscription = observable.subscribe(val => {
      console.log(val);
    });
    setTimeout(() => subscription.unsubscribe(), 4000);

    上面的代码,先从rxjs中导出interval创建操作符,它可以直接生成一个Observable对象;const observable = interval(1000);这条语句表明 我们生成的这个Observable对象每一秒会往外面发送一个Next类型的通知,并且会传递一个升序整数,从0开始,每秒增加1,一直持续下去。 如果我们想在一段时间后取消这个Observable Execution执行过程,我们应该怎么做呢?

    observable.subscribe方法会返回一个Subscription类型的对象, 它代表着对应的持续运行的Observable Execution,这个对象上面有一个unsubscribe方法,通过调用这个方法,我们可以实现对Observable Execution执行过程的取消。

    一般情况下,我们通过创建操作符生成的Observable,通过调用其subscriptionunsubscribe方法我们可以取消其Observable Execution的执行过程。 但是如果我们是通过Observable.create生成Observable对象的话,我们就需要自己定义如何取消Observable Execution执行的方法;我们可以通过返回一个unsubscribe 函数,来取消Observable Execution的执行。下面的代码是一个相应的例子:

    import { Observable } from "rxjs";
    const observable = Observable.create(observer => {
      const intervalId = setInterval(() => {
        console.log("inner interval");
        observer.next("hello, world!");
      }, 1000);
      return function unsubscribe() {
        clearInterval(intervalId);
      };
    });
    const subscription = observable.subscribe(val => {
      console.log(val);
    });
    
    setTimeout(() => {
      subscription.unsubscribe();
    }, 4000);

    运行的结果如下:

    inner interval
    hello, world!
    inner interval
    hello, world!
    inner interval
    hello, world!
    inner interval
    hello, world!
    

    从上面的结果我们可以看到,通过返回一个unsubscribe函数(在这个函数里面我们需要手动的取消上面定义的定时器), 当外部的subscription调用unsubscribe方法的时候,我们就可以取消整个Observable execution的执行过程。如果我们没有返回 这个unsubscribe函数,那么我们上面所定义的定时器会一直运行,就不会被清除。然后我们就看到控制台上会一直打印inner interval

    实际上,如果我们移除包裹在代码外层的ReactiveX类型,我们就可以得到如下所示很直观的JavaScript代码:

    const subscribe = observer => {
      const intervalId = setInterval(() => {
        console.log("inner interval");
        observer.next("hello, world!");
      }, 1000);
      return function unsubscribe() {
        clearInterval(intervalId);
      };
    };
    
    const unsubscribe = subscribe({
      next: val => {
        console.log(val);
      }
    });
    
    setTimeout(() => {
      unsubscribe();
    }, 4000);

    大家可能会想,为什么要在这么直观的代码上包裹一层ReactiveX类型的代码?主要原因是因为有了这层包裹, 我们就获得和RxJS类型的操作符一起使用的安全性和可组合性。

深入了解

接下来我们来深入了解一下我们上面学习到的一些内容,首先是Observable, 这是一个类,用来创建Observable;我们一般会使用它的create 方法来创建一个Observable对象;create方法其实调用的是Observable的构造器constructor()

那我们来看一下这个构造器需要传递的参数是什么,看下面的代码

...
constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic)
...

我们可以知道,传递的参数是一个subscribe,它是一个函数;这个函数会在Observable对象调用subscribe方法的时候执行;这个函数有一个Subscriber类型的值, 然后我们可以通过这个subscriber,发送三种类型的通知,并且可以传递值给外面的 observer, 或者相应接收值的函数。

Observable对象的subscribe方法的参数可以是一个Observer类型的对象, 或者一到三个函数;如果是一个对象的话,需要满足Observer接口的一些属性; 一般情况下这个对象上面至少有一个next方法,来接收相应的Observable传递过来的值。如果给subscribe方法传递的参数是函数的话,那么可以传递一到三个, 第一个接收Next类型通知,第二个接收Error类型的通知,第三个接收Complete类型的通知;如果相应的通知可以传递值的话,那么我们函数的参数就是相应要传递的值。

Observable对象调用subscribe方法之后,返回的是一个Subscription类型的对象,它有一个unsubscribe方法,可以取消Observable execution的执行过程。

结束语

到这里我们这一篇文章就算是结束啦,如果文章中有什么不正确的地方,也希望大家指出来;以免误导别的读者。如果你有什么建议,反馈或者想法可以写在这里

版权声明:知识共享许可协议 共享-保持署名-非商业性使用-禁止演绎