本文为 RxJS 中文社区 原创文章,如需转载,请注明出处,谢谢合作!
写在前面的话:
1. 本文内容仅适合 RxJS 初学者。
2. Observable 是 RxJS 中的核心,理解 Observable 是学习 RxJS 的基石。 本文中将讲解如何从头创建一个简易版的 Observable (麻雀虽小,五脏俱全),从而揭开 Observable 的神秘面纱,以加深对 Observable 的理解。
3. 本文内容来源于 Ben Lesh 的视频 Creating Observable From Scratch ,如果我的文字不能有助于你加深理解的话,请直接观看视频。
4. Ben Lesh 是谁?如果你已经在学习 RxJS 还有这个疑问的话,那么确实需要好好了解下此人,他是 RxJS 5 的领导者及布道者。所以请相信他的视频往往能助你拨开云雾见青天。
本文将由浅入深,一步一步地创建出更完善的 Observable 的简易版实现。
Talk is cheep. Show me the code! Let's Go!
阶段一: 极简版
function simpleObservable(observer) {
for (let i = 0; i < 10; i++) {
observer.next(i);
}
observer.complete();
}
const observer = {
next: value => console.log(`next -> ${value}`),
error: () => {},
complete: () => console.log('complete')
};
simpleObservable(observer);
// 同步输出
// next -> 0
// next -> 1
// next -> 2
// next -> 3
// next -> 4
// next -> 5
// next -> 6
// next -> 7
// next -> 8
// next -> 9
// complete
simpleObservable 其实并无任何神秘之处,它无非是一个接受 observer (观察者) 对象的函数而已,该函数定义了 Observable 的行为,在本例中是发出10个数字后完成。而 observer 仍是一如既往的简单,它只是一个带有三个回调函数的对象而已,这三个回调函数分别是 next、error、complete 。next 是发出值时的回调,error 是发出错误时的回调,complete 是发出完成通知时回调。最重要的是这三个回调函数还都是可选的。
这样一个最简单的 Observable 就诞生了,很简单是吧。如果你认为这样就可以了,那可真是 Too Young Too Simple ~
你可能会说,“我平时用的 Observable 都是异步的,但你这个 simpleObservable 却只是同步的啊!” 。确实如此啊,没有异步的 Observable 就是耍流氓,我们进入下一阶段。
阶段二: 极简版 + 异步
在 JavaScript 中实现异步最简单最直接的方式就是使用定时器 setTimeout 和 setInterval ,所以 RxJS 也不例外,来看代码:
function simpleObservable(observer) {
let i = 0;
const id = setInterval(() => {
if (i < 10) {
observer.next(i++);
} else {
observer.complete();
clearInterval(id);
}
}, 100);
}
const observer = {
next: value => console.log(`next -> ${value}`),
error: () => {},
complete: () => console.log('complete')
};
simpleObservable(observer);
// 异步输出
// next -> 0
// next -> 1
// next -> 2
// next -> 3
// next -> 4
// next -> 5
// next -> 6
// next -> 7
// next -> 8
// next -> 9
// complete
与阶段一不同之处就在于 simpleObservable 的实现,这里利用了 setInterval 实现异步,每100毫秒执行一次 observer 的 next 方法。既然用异步取代了同步,那必将会带来新的问题,比如在异步执行的过程中,我想要取消执行怎么办,看来我们的 simpleObservable 远没有如此 simple ,还要提供取消订阅的功能。
阶段三: 极简版+异步+取消订阅
既然在阶段二中使用了 setInterval 来实现异步,那么取消订阅就必然要使用其相对应的 clearInterval 了,至于具体怎么实现,我们来看代码:
function simpleObservable(observer) {
let i = 0;
const id = setInterval(() => {
if (i < 10) {
observer.next(i++);
} else {
observer.complete();
clearInterval(id);
}
}, 100);
return () => {
console.log('disposed!');
clearInterval(id);
}
}
const observer = {
next: value => console.log(`next -> ${value}`),
error: () => {},
complete: () => console.log('complete')
};
const unsub = simpleObservable(observer);
setTimeout(unsub, 500);
// 异步输出
// next -> 0
// next -> 1
// next -> 2
// next -> 3
// next -> 4
// disposed!
首先在 simpleObservable 内部返回一个调用 clearInterval 的函数以备取消时使用。而在 simpleObservable 调用的时候将返回的函数赋给 unsub,unsub 就是取消订阅的函数,在本例中将在500毫秒后运行 unsub 以取消订阅。
这样我们的 simpleObservable 就可以了吧?不,不,不,先来看一段代码:
function simpleObservable(observer) {
let i = 0;
const id = setInterval(() => {
if (i < 10) {
observer.next(i++);
} else {
observer.complete();
observer.next('stop me!!!'); // 新增代码
clearInterval(id);
}
}, 100);
}
// 为了篇幅,略去未改动的代码
// 异步输出
// next -> 0
// next -> 1
// next -> 2
// next -> 3
// next -> 4
// next -> 5
// next -> 6
// next -> 7
// next -> 8
// next -> 9
// complete
// next -> stop me!!!
这段代码以阶段二的 simpleObservable 为基础,在 observer.complete(); 之后又调用了 next 方法,对于 Observable 来说,complete() 之后不会发出任何值了,但本例中 next -> stop me!!! 依旧出现在了 complete 之后,这说明我们的 observer 还不够健壮,还有需要完善之处。
阶段四: SafeObserver 类的实现
话不多说,先看代码:
class SafeObserver {
constructor(destination) {
this.destination = destination;
}
next(value) {
const destination = this.destination;
if (destination.next && !this.isUnsubscribed) {
destination.next && destination.next(value);
}
}
error(err) {
const destination = this.destination;
if (!this.isUnsubscribed) {
this.isUnsubscribed = true;
if (destination.error) {
destination.error(err);
}
}
}
complete() {
const destination = this.destination;
if (!this.isUnsubscribed) {
this.isUnsubscribed = true;
if (destination.complete) {
destination.complete();
}
}
}
}
function simpleObservable(observer) {
const safeObserver = new SafeObserver(observer);
let i = 0;
const id = setInterval(() => {
if (i < 10) {
safeObserver.next(i++);
} else {
safeObserver.complete();
safeObserver.next('stop me!');
clearInterval(id);
}
}, 100);
return () => {
console.log('disposed!');
clearInterval(id);
}
}
const observer = {
next: value => console.log(`next -> ${value}`),
error: () => {},
complete: () => console.log('complete')
};
simpleObservable(observer);
// 异步输出
// next -> 0
// next -> 1
// next -> 2
// next -> 3
// next -> 4
// next -> 5
// next -> 6
// next -> 7
// next -> 8
// next -> 9
// complete
simpleObservable 的变化不大,只是在开头处用 SafeObserver 类生成一个 safeObserver,并用 safeObserver 替代之前的 observer 。SafeObserver 是一个新增的类,它会用传给 simpleObservable 的 observer 来生成一个更安全的 safeObserver,它会确保 Observable 在 complete() 之后 observer 的任何回调都不会生效。这个关键点就是 isUnsubscribed 属性,它会在首次调用 error 或 complete 时变为 true,以表示 Observable 完成并取消订阅了。
好了,现在该完善的功能也完善了,总该差不多了吧,但不知道你是否注意到了,我们的 simpleObservable 还只是个函数,为什么我们不把它弄的更专业些呢,像 SafeObserver 一样抽象成一个类。
阶段五: Observable 类的实现
class SafeObserver {
constructor(destination) {
this.destination = destination;
}
next(value) {
const destination = this.destination;
if (destination.next && !this.isUnsubscribed) {
destination.next && destination.next(value);
}
}
error(err) {
const destination = this.destination;
if (!this.isUnsubscribed) {
this.isUnsubscribed = true;
if (destination.error) {
destination.error(err);
}
}
}
complete() {
const destination = this.destination;
if (!this.isUnsubscribed) {
this.isUnsubscribed = true;
if (destination.complete) {
destination.complete();
}
}
}
}
class Observable {
constructor(_subscribe) {
this._subscribe = _subscribe;
}
subscribe(observer) {
const safeObserver = new SafeObserver(observer);
return this._subscribe(safeObserver);
}
}
const simpleObservable = new Observable((observer) => {
let i = 0;
const id = setInterval(() => {
if (i < 10) {
observer.next(i++);
} else {
observer.complete();
observer.next('stop me!');
clearInterval(id);
}
}, 100);
return () => {
console.log('disposed!');
clearInterval(id);
}
});
const observer = {
next: value => console.log(`next -> ${value}`),
error: () => {},
complete: () => console.log('complete')
};
simpleObservable.subscribe(observer);
// 异步输出
// next -> 0
// next -> 1
// next -> 2
// next -> 3
// next -> 4
// next -> 5
// next -> 6
// next -> 7
// next -> 8
// next -> 9
// complete
WoW ~ 我们终于有了 Observable 类,这看起来专业多了。
simpleObservable 不再是一个函数了,它现在是一个彻彻底底的 Observable 了,由我们的 Observable 类生成的 Observable 。而之前 simpleObservable 函数所做的事现在全权交给 Observable 类了。创建 Observable 时传入的是一个 _subscribe 函数,即之前 simpleObservable 函数所做的事,它负责 Observable 的行为。
Observable 只实现了一个 subscribe 方法,它负责接收 observer 对象并生成 safeObserver ,然后返回 _subscribe 调用的结果,即取消订阅函数。
最后,现在的 simpleObservable 要调用 subscribe 方法,来取代之前的直接调用。因为它现在已经是 Observable,而不在是简易版当中的函数了。
其实,写到这我已经是精疲力尽了,我相信你们也已经看烦了,但是真的只差那么一点点,我们再共同坚持一下,将取消订阅的执行重构至 SafeObserver 中。
阶段六: 终结版
class SafeObserver {
constructor(destination) {
this.destination = destination;
}
next(value) {
const destination = this.destination;
if (destination.next && !this.isUnsubscribed) {
destination.next && destination.next(value);
}
}
error(err) {
const destination = this.destination;
if (!this.isUnsubscribed) {
if (destination.error) {
destination.error(err);
}
this.unsubscribe();
}
}
complete() {
const destination = this.destination;
if (!this.isUnsubscribed) {
if (destination.complete) {
destination.complete();
}
this.unsubscribe();
}
}
unsubscribe() {
this.isUnsubscribed = true;
if (this._unsubscribe) {
this._unsubscribe();
}
}
}
class Observable {
constructor(_subscribe) {
this._subscribe = _subscribe;
}
subscribe(observer) {
const safeObserver = new SafeObserver(observer);
safeObserver._unsubscribe = this._subscribe(safeObserver);
return () => safeObserver.unsubscribe();
}
}
const simpleObservable = new Observable((observer) => {
let i = 0;
const id = setInterval(() => {
if (i < 10) {
observer.next(i++);
} else {
observer.complete();
observer.next('stop me!');
clearInterval(id);
}
}, 100);
return () => {
console.log('disposed!');
clearInterval(id);
}
});
const observer = {
next: value => console.log(`next -> ${value}`),
error: () => {},
complete: () => console.log('complete')
};
const unsub = simpleObservable.subscribe(observer);
setTimeout(unsub, 500);
// 异步输出
// next -> 0
// next -> 1
// next -> 2
// next -> 3
// next -> 4
// disposed!
改动点主要是 Observable 类的 subscribe 方法和 SafeObserver 类的 unsubscribe 方法,没有根本的改变什么,只是简单的重构,大家自行对比下:
subscribe(observer) {
const safeObserver = new SafeObserver(observer);
safeObserver._unsubscribe = this._subscribe(safeObserver);
return () => safeObserver.unsubscribe();
}
unsubscribe() {
this.isUnsubscribed = true;
if (this._unsubscribe) {
this._unsubscribe();
}
}
最后,还是要向 Ben Lesh 老兄致敬,能在短短10几分钟的视频中,编纂出一套 Observable 的编年史。希望大家看完后,能对 Observable 有一个更清晰的认知,感谢大家的耐心观看。