rxjs核心概念之Observable

1,310 阅读3分钟

RxJS简介

RxJS 是基于观察者模式和迭代器模式以函数式编程思维来实现的。简单来讲RxJS的运行就是基于Observable和Observer之间的数据生产和消费的互动过程。因此,理解RxJs,首先要对Observable和Observer有深入的理解。顾名思义,Observable就是可被观察者,Observer就是观察者,两者通过Observable上的sunscribe方法作为桥梁将两者联系起来。 RxJS的数据流就是Observable对象,其实现了观察者模式(Observer Pattern)和迭代器模式(Iterator Pattern)这两种设计模式。

观察者模式

Observer Pattern 定义

观察者模式是软件设计模式的一种。在此种模式中,一个目标对象管理所有相依于它的观察者对象,并且在它本身的状态改变时主动发出通知。这通常透过呼叫各观察者所提供的方法来实现。此种模式通常被用来实时事件处理系统。----维基百科

当对象之间存在一对多的关系,就可以使用观察者模式,比如当一个对象更新时通知并更新到给多个对象。其中发布通知的是发布者,接收通知执行更新的是观察者。观察者模式属于行为模式。在RxJS中,Observable就是发布者,Observer几件事观察者,二者通过订阅subscribe进行关联。

发布者和观察者的关系
source$就是一个Observable对象,是发布者,它产生的连续数据通知给console.log这个观察者。

import { of } from 'rxjs';
const source$ = of(1, [2]);
source$.subscribe(console.log);

观察者模式就是分而治之的思想。其就是将复制的问题分隔开,每一部分有独立的功能模块进行处理。在RxJS其主要是将复杂的问题被分解成三个小问题:

  • 如何产生事件,这是发布者的责任,在RxJS中是Observable对象的工作。
  • 如何响应事件,这是观察者的责任,在RxJS中由subscribe的参数来决定。
  • 如何将发布者与观察者进行关联,也就是何时调用subscribe。 由此可见观察者模式的治的思想,就是将逻辑分为发布者(Publisher)和观察者(Observer),其中发布者只管负责产生事件,产生数据时通知所有注册挂的观察者,而不关心这些观察者如何处理这些事件,相对的,观察者可以被注册到某个发布者,只管接收到事件之后就处理,而不关心这些数据是如何产生的。观察者模式带来的好处很明显,这个模式中的两方都可以专心做一件事,而且可以任意组合。

迭代器模式

Iterator Pattern 定义

在 面向对象编程里,迭代器模式是一种设计模式,是一种最简单也最常见的设计模式。它可以让用户透过特定的接口巡访容器中的每一个元素而不用了解底层的实现。此外,也可以实现特定目的版本的迭代器。----维基百科

迭代器(Iterator)模式,又叫做游标(Cursor)模式。提供一个通用的接口对数据集合进行遍历,而不需要关系数据集合的内部实现。就像是一个移动的指针一样,从集合中一个元素移到另一个元素,完成对整个集合的遍历。

const array = [1, 2, 3];
const iterator = array[Symbol.iterator]();
iterator.next(); // {value: 1, done: false}

在ES6中迭代器模式:

  • next方法将游标移动到下一个元素
  • value属性获取当前的值
  • done属性判断是否已经遍历完所有的元素 在RxJS中,是看不到类似上面的代码,RxJs作为迭代器的使用者,并不需要主动去从Observable中“拉”取生产的数据进行消费,仅需要subscribe上Observable对象之后,自然就能够收到消息的推送,这就是观察者模式和迭代器两种模式结合的优势之处。

创建Observable

创建Observable已经很简单,RxJS提供了很多创建类操作符。

import { interval } from 'rxjs';
const source$ = interval(1000);
source$.subscribe(console.log);

Observable对象调用观察者next传递出数据的动作,实际上可以在subscribe桥梁方法中传入一个观察者对象类似{next(){}, complete(){}, error(){}},分别对Observable对象传递对数据、完结状态、错误进行处理。也可以分别传递三个函数参数。需要注意的是,Observable对象只有一种终结状态,完结(complete),要么是出错(error),一旦进入出错状态,这个Observable对象也就终结了,再不会调用对应Observer的next函数,也不会再调用Observer的complete函数;同样,如果一个Observable对象进入了完结状态,也不能再调用Observer的next和error。

Observable的终结状态

退订Observable

Observable和Observer之间初了通过subscribe将两者进行联系,有时也需要在一定条件下将两者的关系进行断开。在RxJS中,subscribe函数的返回结果是subscription对象,调用了subscription上的unsubscribe函数就可以退订Observable。

import { interval } from 'rxjs';
const source$ = interval(1000);
const subscription = source$.subscribe(console.log);
setTimeout(() => subscription.unsubscribe(), 3000); // 3秒后退订

hot & cold

Think of a hot Observable as a radio station. All of the listeners that are listening to it at this moment listen to the same song.A cold Observable is a music CD. Many people can buy it and listen to it independently.by Nickolay Tsvetinov

Observable对象就是一个数据流,可以在一个时间范围内生产一系列数据,如果仅有一个Observer的简单场景无需考虑太多,但是对于存在多个Observer的场景的复杂场景,尤其是当多个Observer并不是同时订阅,那就需要考虑两个传递合适产生的数据给Observer。两种选择:

  • A:Cold Observable,不能错过,需要获取Observable产生的所有数据。
  • B:Hot Observable,可以错过,获取订阅开始的时间算起Observable产生的所有数据。

Hot Observable无论是否被订阅,事件一直发生数据也一直产生。当Hot Observable有多个订阅者时,Hot Observable 与每一个订阅者共享信息,与订阅着是一对多的关系。Cold Observable只有被订阅时,才开始执行发射数据流的代码。并且Cold Observable和 订阅者是一对一的关系,当有多个不同的订阅者时,消息是重新完整发送的。也就是说对 Cold Observable 而言,有多个订阅者时,他们各自的事件是独立的。对于是使用Hot Observable还是Cold Observable,不同的场景可以有不同期望值。而RxJS同时提供这这种需求场景。并且也提供了hot与cold之间的互相转换。