[译] RxJS: 白话 Subjects

1,738 阅读3分钟

原文链接: netbasal.com/rxjs-subjec…

本文为 RxJS 中文社区 翻译文章,如需转载,请注明出处,谢谢合作!

如果你也想和我们一起,翻译更多优质的 RxJS 文章以奉献给大家,请点击【这里】

我已经发表过一篇关于 Subject 的文章 (中文),但这次我想尝试一种不同的方式。

要理解 Subject 是什么的最简单的方式就是重新创建一个。我们来创建一个简易版的 Subject

注意: 下面的示例只是为了阐述概念,还不足以应用于实际开发之中,还有它们并不是 Rx 中 Subjects 的真正完整实现。

我们来看看真相。

Subject 既是 Observable,又是 Observer 。

Subject 是 Observable

这表示它拥有所有的操作符 (mapfilter,等等) 并且你可以订阅它。

class MySubject extends Rx.Observable {
 
 constructor() {
    super();
 }
 
}

这是第一部分所需的一切了。它可以通过扩展 Observable 类成为 Observable

Subject 是 Observer

这表示它必须实现 next()error()complete() 方法。

class MySubject extends Rx.Observable {
 
 constructor() {
   super();
 }

 next() {}
  
 error() {} 
  
 complete() {}
 
}

好了,我们来看下一个真相。

Subject 可以扮演源 observable 和 众多观察者之间的桥梁或代理,使得多个观察者可以共享同一个 observable 执行。

class MySubject extends Rx.Observable {
 
 constructor() {
   super();
   this.observers = [];
 }
 
 subscribe(observer) {
   this.observers.push(observer);
 }
 
 next(value) {
   this.observers.forEach(observer => observer.next(value));
 }
 
 error(error) {
   this.observers.forEach(observer => observer.error(error));
 }
 
 complete() {
   this.observers.forEach(observer => observer.complete());
 }
 
}

当你调用 subscribe() 方法时,仅仅是将 observer 添加到一个数组中。next()error()completed() 方法会调用数组中每个 observer 的对应方法。

来使用我们的 Subject 。

const interval$ = Rx.Observable.interval(1000).take(7);

const subject = new MySubject();

subject.map(value => `Observer one ${value}`).subscribe(value => {
  console.log(value);
});

interval$.subscribe(subject);

setTimeout(() => {
  subject.map(value => `Observer two ${value}`).subscribe(value => {
     console.log(value);
  });
}, 2000);

当使用 Subject 时,无论你何时 subscribe, 你永远都会得到相同的执行,这点不同于典型的 observable,每次 subscribe 都会开启有个新的执行。(在我们的案例中,这表示你会有两个不相关的 intervals)

Subject 让你同享相同的 observable 执行

我们来总结一下这里发生了什么。

当对 subject 调用 subscribe 时,只是将 observer 添加到数组中。

subject 扮演 observer 时,每当源 observable (在我们的案例中是指 interval) 发出值时,它会调用数组中每个 observernext() 方法。

BehaviorSubject

现在让我们来尝试实现 BehaviorSubject 的简易版。

我们来看看真相

  • BehaviorSubject 需要一个初始值,因为它必须始终返回一个订阅值,即使它还没接收到 next() 调用。
  • 被订阅后,它会返回 subject 的最新值。
  • 无论在任何时候,你都可以在非 observable 的代码中使用 getValue() 方法来获取 subject 的最新值。
class MyBehaviorSubject extends Rx.Observable {

  constructor(initialValue) {
    super();
    this.observers = [];

    if (typeof initialValue === 'undefined') {
      throw new Error('You need to provide initial value');
    }

    this.lastValue = initialValue;
  }

  subscribe(observer) {
    this.observers.push(observer);
    observer.next(this.lastValue);
  }

  next(value) {
    this.lastValue = value;
    this.observers.forEach(observer => observer.next(value));
  }
  
  getValue() {
    return this.lastValue;
  }


}

来使用我们的 BehaviorSubject

const subject = new MyBehaviorSubject('initialValue');

subject.map(value => `Observer one ${value}`).subscribe(function(value) {
  console.log(value);
});

subject.next('New value');

setTimeout(() => {
  subject.map(value => `Observer two ${value}`).subscribe(function(value) {
    console.log(value);
  });
}, 2000);

ReplaySubject

现在让我们来尝试实现 ReplaySubject 的简易版。

我们来看看真相.

  • ReplaySubject 表示一个对象既是 observable 序列,又是 observer 。
  • 每次通知都会广播给所有已经订阅和未来的 observers,observers 会遵循缓冲调整策略。
class MyReplaySubject extends Rx.Observable {

  constructor(bufferSize) {
    super();
    this.observers = [];
    this.bufferSize = bufferSize;
    this.lastValues = [];
  }

  subscribe(observer) {
    this.lastValues.forEach(val => observer.next(val));
    this.observers.push(observer);
  }

  next(value) {
    if (this.lastValues.length === this.bufferSize) {
      this.lastValues.shift();
    }

    this.lastValues.push(value);
    this.observers.forEach(observer => observer.next(value));
  }
}

来使用我们的 ReplaySubject

const subject = new MyReplaySubject(3);

subject.next('One');
subject.next('Two');
subject.next('Three');
subject.next('Four');

setTimeout(() => {
 subject.map(value => `Later Observer ${value}`).subscribe(function(value) {
   console.log(value);
 });
}, 2000);

何时使用 Subject

  • 需要共享相同的 observable 执行。
  • 当需要决定观察者迟来时该怎么做,是否使用 ReplaySubjectBehaviorSubject
  • 需要完全控制 next()error()completed() 方法。

MediumTwitter 关注我,以阅读更多 Angular、Vue 和 JS 相关内容!