rxjs核心概念之Subject

414 阅读1分钟

订阅 Observable

我们直到rxjs是基于观察者模式与迭代器模式的实现的ObservableObserver之间数据推送体系(具体内容参看博客: rxjs核心概念之Observable)。

import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

const sub$ = interval(1000).pipe(take(4));
sub$.subscribe(val => console.log(`A: ${val}`));
setTimeout(() => sub$.subscribe(val => console.log(`B: ${val}`)), 2000);

输出结果如下:

订阅Observable
由此我们可以看出来:

  • Observable 对象可以被重复多次订阅。
  • Observable 对象每次被订阅后,都会重新执行。也就是说是cold observable

我们甚至可以简单的认为每次订阅都是再次执行一次函数行为,函数模拟如下:

function interval(desc, count, delay) {
	let index = 0;
	const timer = setInterval(() => {
		if (index > count) {
			clearInterval(timer);
		}
		console.log(`${desc}: ${index}`);
	}, delay);
}

interval('A', 4, 1000);

setTimeout(() => interval('B', 4, 1000), 2000);

输出结果如下:

模拟Observable
这种一对一的默认推送关系,能满足大部分使用场景。但也有时我们也许一对多的关系,也就是hot observable,不会从头开始接收Observable发出的值,而是从第一次订阅当前正在处理的值开始发送,这也是RxJS中的组播,类似与node中的EventEmitter技术。这也就是我们下面要讲的Subject

Subject

Subject类似与node中的EventEmitter技术,也就是说Subject内部维护了一个Observer列表,当有数据更新是会发布通知到所有的观察者。因此,RxJSSubject即实现了Observable也实现了Observer

import { Subject } from 'rxjs';

const sub$ = new Subject();
sub$.subscribe(val => console.log(`Subject A: ${val}`));

for (let i = 0; i < 4; i++) {
	if (i === 1) sub$.subscribe(val => console.log(`Subject B: ${val}`));
	sub$.next(i);
}

输出结果如下:

Subject
由此可见:

  • Subject既是Observable对象,又是Observer对象。
  • 当有新消息时,Subject会通知内部的所有观察者。

Subject & Observable

Subject是一类特殊的Observable,它可以向多个Observer多路推送数值。因为Subject内部维护了一个观察者列表,所以当观察者订阅Subject对象时,Subject对象会把订阅者添加到观察者列表中,每当有Subject对象接收到新值时,它就会遍历观察者列表,依次调用观察者内部的next()方法,把值一一送出。 由于Subject即实现了Observable又实现了Observer,所以在Subject类中有五个重要的方法:

  • next —— 每当 Subject 对象接收到新值的时候,next 方法会被调用。
  • error —— 运行中出现异常,error 方法会被调用。
  • complete —— Subject 订阅的 Observable 对象结束后,complete 方法会被调用。
  • subscribe —— 添加观察者。
  • unsubscribe —— 取消订阅(设置终止标识符、清空观察者列表)。

RxJS除了提供Subject之外,还提供了Subject的几种变体:BehaviorSubjectReplaySubjectAsyncSubject

BehaviorSubject

BehaviorSubject具有“最新的值”的概念,即是保存当前的最新状态。当有Observer订阅时它会将保存的最新的状态推送出去,当一个Observer订阅后,它会即刻从BehaviorSubject收到“最新的值”。

import { BehaviorSubject } from 'rxjs';

const sub$ = new BehaviorSubject(0);
sub$.subscribe(val => console.log(`BehaviorSubject A: ${val}`));

setTimeout(() => sub$.subscribe(val => console.log(`BehaviorSubject B: ${val}`)), 2000);
for (let i = 0; i < 4; i++) {
	sub$.next(i);
}

输出结果如下:

订阅BehaviorSubject
如果将BehaviorSubject换成SubjectsetTimeout之后订阅的observer将不输出任何值,因为延迟之后next输出已经完成了。

ReplaySubject

ReplaySubject可以向新的订阅者推送一段时间内的旧数值,就像一个录像机ReplaySubject可以记录Observable的一部分状态(过去时间内推送的值),而并不只是最新值。有时可能会有人认为 ReplaySubject(1) 是不是等同于 BehaviorSubject,其实它们是不一样的。在创建BehaviorSubject 对象时,是设置初始值,它用于表示 Subject 对象当前的状态,而 ReplaySubject 只是事件的重放。

import { ReplaySubject } from 'rxjs';

const sub$ = new ReplaySubject(2 /* 回放数量 */);
sub$.subscribe(val => console.log(`ReplaySubject A: ${val}`));

setTimeout(() => sub$.subscribe(val => console.log(`ReplaySubject B: ${val}`)), 2000);
for (let i = 0; i < 4; i++) {
	sub$.next(i);
}

输出结果如下:

订阅ReplaySubject
除了支持回放数量,ReplaySubject还可以指定“窗口时间”,决定ReplaySubject记录多久以前Observable推送的数值。

import { ReplaySubject } from 'rxjs';
const sub$ = new ReplaySubject(2 /* 回放数量 */, 1000 /* 窗口时间 */);
sub$.subscribe(val => console.log(`ReplaySubject A: ${val}`));

setTimeout(() => sub$.subscribe(val => console.log(`ReplaySubject B: ${val}`)), 1000);
for (let i = 0; i < 4; i++) {
	((val) => setTimeout(() => sub$.next(val), 500))(i);
}

输出结果如下:

窗口时间

AsyncSubject

AsyncSubject类似于last操作符,它会在Subject执行完成后,推送执行环境中的最后一个值。

const sub$ = new AsyncSubject();
sub$.subscribe(val => console.log(`AsyncSubject A: ${val}`));

for (let i = 0; i < 4; i++) {
	sub$.next(i);
}
sub$.complete();
setTimeout(() => sub$.subscribe(val => console.log(`AsyncSubject B: ${val}`)), 1000);

输出结果如下:

AsyncSubject描述