Angular与Rxjs学习

1,763

简介初级核心概念创建Observableof()from()fromEvent()fromPromise()interval()ajax()操作符(Operators)mapfiltershare其他错误处理重试失败的可观察对象问题记录fromPromise no longer exported in master (v6)参考文献

简介

RxJSReactiveX编程理念的JavaScript版本。ReactiveX来自微软,它是一种针对异步数据流的编程。简单来说,它将一切数据,包括HTTP请求,DOM事件或者普通数据等包装成流的形式,然后用强大丰富的操作符对流进行处理,使你能以同步编程的方式处理异步数据,并组合不同的操作符来轻松优雅的实现你所需要的功能。

RxJS 提供了一种对 Observable 类型的实现,直到 Observable 成为了 JavaScript 语言的一部分并且浏览器支持它之前,它都是必要的。这个库还提供了一些工具函数,用于创建和使用可观察对象。这些工具函数可用于:

  • 把现有的异步代码转换成可观察对象
  • 迭代流中的各个值
  • 把这些值映射成其它类型
  • 对流进行过滤
  • 组合多个流

初级核心概念

  • Observable: 一系列值的生产者
  • Observer: 它是observable值的消费者
  • Operator: 可以在数据流的途中对值进行转换的操作符

Observables 作为被观察者,是一个值或事件的流集合;而 Observer 则作为观察者,根据 Observables 进行处理。

Observables 与 Observer 之间的订阅发布关系(观察者模式) 如下:

  • 订阅:Observer 通过 Observable 提供的 subscribe() 方法订阅 Observable。
  • 发布:Observable 通过回调 next 方法向 Observer 发布事件。

创建Observable

RxJS 中提供了很多操作符,用于创建 Observable 对象,常用的操作符如下:

  • of(), 将普通JavaScript数据转为 Observable
  • from(), 把数组或iterable对象转换成Observable
  • create(), 返回一个可以在Observer上调用方法的Observable.
  • fromEvent(), 把event转换成Observable.
  • fromPromise(), 把Promise转换成Observable.
  • ajax(), 从ajax创建一个observable

of()

import { Component, OnInit } from '@angular/core';
import { Observable,of } from 'rxjs';

@Component({
  selector'app-observable',
  templateUrl'
    <h2>使用of来创建可观察对象</h2>
    <div>
      <button (click)="getData()">Click here</button>
    </div>
  '

})
export class ObservableComponent implements OnInit {
  constructor() { }

  ngOnInit(): void {
  }

  getData() {
    // Create simple observable that emits three values
    const myObservable = of(123);

    // Create observer object
    const myObserver = {
      nextx => console.log('Observer got a next value: ' + x),
      errorerr => console.error('Observer got an error: ' + err),
      complete() => console.log('Observer got a complete notification'),
    };

    // Execute with the observer object
    myObservable.subscribe(myObserver);
  }

}

启动该项目,打开页面并点击按钮,出现这样的结果:

from()

import { Component, OnInit } from '@angular/core';
import { Observable,from } from 'rxjs';

@Component({
  selector'app-observable',
  templateUrl'
    <h2>使用from函数创建可观察对象</h2>
    <div>
      <button (click)="fromData()">根据数组创建Observable</button>
    </div>
  '

})
export class ObservableComponent implements OnInit {
  constructor() { }

  ngOnInit(): void {
  }

  fromData() {
    let persons = [
      { name'Dave'age34salary2000 },
      { name'Nick'age37salary32000 },
      { name'Howie'age40salary26000 },
      { name'Brian'age40salary30000 },
      { name'Kevin'age47salary24000 },
    ];

    const myObservable = from(persons);
    myObservable.subscribe(person => console.log(person));
  }

}

页面测试结果为:

fromEvent()

import { Component, OnInit } from '@angular/core';
import { Observable,fromEvent } from 'rxjs';

@Component({
  selector'app-observable',
  templateUrl'
    <h2>使用fromEvent函数创建可观察对象</h2>
    <div>
      <p id="content">Hello Hresh</p>
    </div>
  '

})
export class ObservableComponent implements OnInit {
  constructor() { }

  ngOnInit(): void {
      this.fromEvent();
  }

  fromEvent() {
    const el = document.getElementById('content');

    const mouseMoves = fromEvent(el, 'click');

    const subscription = mouseMoves.subscribe(() => {
      el.style.color = 'red';
    });
  }
}

运行项目,点击文本,文本将会变为红色。

关于 fromEvent 在实际生产中有个典型的应用:输入提示(type-ahead)建议

可观察对象可以简化输入提示建议的实现方式。典型的输入提示要完成一系列独立的任务:

  • 从输入中监听数据。
  • 移除输入值前后的空白字符,并确认它达到了最小长度。
  • 防抖(这样才能防止连续按键时每次按键都发起 API 请求,而应该等到按键出现停顿时才发起)
  • 如果输入值没有变化,则不要发起请求(比如按某个字符,然后快速按退格)。
  • 如果已发出的 AJAX 请求的结果会因为后续的修改而变得无效,那就取消它。

完全用 JavaScript 的传统写法实现这个功能可能需要大量的工作。使用可观察对象,你可以使用这样一个 RxJS 操作符的简单序列:

import { fromEvent } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { debounceTime, distinctUntilChanged, filter, map, switchMap } from 'rxjs/operators';


const searchBox = document.getElementById('search-box');

const typeahead = fromEvent(searchBox, 'input').pipe(
  map((e: KeyboardEvent) => (e.target as HTMLInputElement).value),
  filter(text => text.length > 2),        //判断输入内容长度是否大于2
  debounceTime(500),    //等待,直到用户停止输入(这个例子中是停止 1/2 秒)
  distinctUntilChanged(),    // 等待搜索文本发生变化。
  switchMap(() => ajax('/api/endpoint'))        //将搜索请求发送到服务。
);

typeahead.subscribe(data => {
 // Handle the data from the API
});

fromPromise()

import { Component, OnInit } from '@angular/core';
import { from, Observable } from 'rxjs';
import { fromPromise } from 'rxjs/internal/observable/fromPromise';

@Component({
  selector'app-observable',
  templateUrl'
    <h2>使用from函数创建可观察对象</h2>
    <div>
      <button (click)="fromPromiseData()">根据Promise创建Observable</button>
    </div>
  '

})
export class ObservableComponent implements OnInit {
  constructor() { }

  ngOnInit(): void {
  }

  fromPromiseData() {
    const myObservable = fromPromise(new Promise((resolve, reject) => {
      setTimeout(() => {
        // tslint:disable-next-line:prefer-const
        let username = 'hresh----Promise';
        resolve(username);
      }, 2000);
    }));
    myObservable.subscribe({
      next(data) { console.log(data); },
      error(err) { console.error('Error' + err); },
      complete() { console.log('completed'); }
    });
  }

}

在 Rxjs6 之后,from 可以用来代替 fromPromise,所以上述代码改为 from()也是可以的。页面测试结果为:

这里扩展一下,官方文档有个案例是通过 fetch 方法返回 Promise 对象,这里我做了一些修改:

import { Component, OnInit } from '@angular/core';
import { from, Observable } from 'rxjs';
import { fromPromise } from 'rxjs/internal/observable/fromPromise';

@Component({
  selector'app-observable',
  templateUrl'
    <h2>使用from函数创建可观察对象</h2>
    <div>
      <button (click)="fromPromiseData()">根据Promise创建Observable</button>
      <br>
      <button (click)="fromData2()">根据Promise得到的数组创建Observable</button>
    </div>
  '

})
export class ObservableComponent implements OnInit {
  constructor() { }

  ngOnInit(): void {
  }

  fromPromiseData() {
    const myObservable = from(fetch('http://a.itying.com/api/productlist'));
    myObservable.subscribe({
      next(data) { console.log(data); },
      error(err) { console.error('Error' + err); },
      complete() { console.log('completed'); }
    });
  }

  fromData2() {
    //fetch()返回的Promise,获取Promise对象中的内容,即数组
    let arrayData = [];
    fetch('http://a.itying.com/api/productlist').then(response => response.json()).then((data => {
      arrayData = data.result;
    }));

    let myObservable = null;
    setTimeout(() => {
      myObservable = from(arrayData);
      myObservable.subscribe(data => console.log(data));
    }, 2000);
  }

}

页面测试结果为:

interval()

import { Component, OnInit } from '@angular/core';
import { interval, Observable } from 'rxjs';

@Component({
  selector'app-observable',
  templateUrl'
    <h2>使用interval函数创建可观察对象</h2>
    <div>
      <button (click)="interval2()">根据interval创建Observable</button>
    </div>
  '

})
export class ObservableComponent implements OnInit {
  constructor() { }

  ngOnInit(): void {
  }

  interval2() {
    const secondsCounter = interval(1000);
    // Subscribe to begin publishing values
    const oData = secondsCounter.subscribe(n =>
      console.log(`It's been ${n} seconds since subscribing!`));
    setTimeout(() => {
      console.log('取消计数操作');
      oData.unsubscribe();  /*5s后取消数据显示*/
    }, 5000);
  }

}

interval 操作符支持一个数值类型的参数,用于表示定时的间隔。上面代码表示每隔 1s,会输出一个递增的值,初始值从 0 开始。页面测试结果为:

ajax()

import { Component, OnInit } from '@angular/core';
import {ajax} from 'rxjs/ajax';

@Component({
  selector'app-observable',
  templateUrl'
    <h2>使用ajax函数创建可观察对象</h2>
    <div>
      <button (click)="ajax2()">根据ajax创建Observable</button>
    </div>
  '

})
export class ObservableComponent implements OnInit {
  constructor() { }

  ngOnInit(): void {
  }

  ajax2(){
    const apiData = ajax('http://a.itying.com/api/productlist');
    // Subscribe to create the request
    apiData.subscribe(res => console.log(res.status, res.response));
  }

}

页面测试结果:

操作符(Operators)

上述我们讲解的创建 Observable 的方法其实就是 Rxjs 的操作符,操作符是基于可观察对象构建的一些对集合进行复杂操作的函数。RxJS 还定义了一些操作符,比如 map()filter()concat()flatMap()

操作符接受一些配置项,然后返回一个以来源可观察对象为参数的函数。当执行这个返回的函数时,这个操作符会观察来源可观察对象中发出的值,转换它们,并返回由转换后的值组成的新的可观察对象。

map

import { map } from 'rxjs/operators';

const nums = of(123);

const squareValues = map((val: number) => val * val);
const squaredNums = squareValues(nums);

squaredNums.subscribe(x => console.log(x));

// Logs
// 1
// 4
// 9

可以看到 map 接受一个 function 作为参数, 通过该 function 可以把每个元素做平方运算进行转换.。

filter

你可以使用管道来把这些操作符链接起来。管道让你可以把多个由操作符返回的函数组合成一个。pipe() 函数以你要组合的这些函数作为参数,并且返回一个新的函数,当执行这个新函数时,就会顺序执行那些被组合进去的函数。

应用于某个可观察对象上的一组操作符就像一个处理流程 —— 也就是说,对你感兴趣的这些值进行处理的一组操作步骤。这个处理流程本身不会做任何事。你需要调用 subscribe() 来通过处理流程得出并生成一个结果。

import { filter, map } from 'rxjs/operators';

const nums = of(12345);

// Create a function that accepts an Observable.
const squareOddVals = pipe(
  filter((n: number) => n % 2 !== 0),
  map(n => n * n)
);

// Create an Observable that will run the filter and map functions
const squareOdd = squareOddVals(nums);

// Subscribe to run the combined functions
squareOdd.subscribe(x => console.log(x));

// Logs
// 1
// 9
// 25

pipe() 函数也同时是 RxJS 的 Observable 上的一个方法,所以你可以用下列简写形式来达到同样的效果:

import { filter, map } from 'rxjs/operators';

const squareOdd = of(12345)
  .pipe(
    filter(n => n % 2 !== 0),
    map(n => n * n)
  );

// Subscribe to get values
squareOdd.subscribe(x => console.log(x));

share

在讲解 share 操作符前,我们需要了解一下冷热两种模式下的 Observables。官方定义如下:

Cold Observables 在被订阅后运行,也就是说,observables 序列仅在 subscribe 函数被调用后才会推送数据。与 Hot Observables 不同之处在于,Hot Observables 在被订阅之前就已经开始产生数据,例如mouse move事件。

从原理来说是这样的: Cold内部会创建一个新的数据生产者, 而Hot则会一直使用外部的数据生产者.

举个例子:

Cold: 就相当于我在B站看英雄联盟赛事重播,可以从头看起。

Hot: 就相当于看英雄联盟赛事直播, 如果来晚了, 那么前面就看不到了。

share() 操作符允许多个订阅者共享同一个 Observable. 也就是把 Cold 变成 Hot。看如下示例:

import { Component, OnInit } from '@angular/core';
import { interval } from 'rxjs';
import { share, take } from 'rxjs/operators';

@Component({
  selector'app-observable',
  templateUrl'
    <h2>share操作符</h2>
    <div>
      <button (click)="shareData()">Click</button>
    </div>
  '

})
export class ObservableComponent implements OnInit {
  constructor() { }

  ngOnInit(): void {
  }

  shareData() {
    const numbers = interval(1000).pipe(
      take(5),
      share()
    );

    function subscribeToNumbers(name{
      numbers.subscribe(
        x => console.log(`${name}${x}`)
      );
    }

    subscribeToNumbers('Dave');

    const anotherSubscription = () => subscribeToNumbers('Nick');

    setTimeout(anotherSubscription, 2500);
  }

}

页面测试结果为:

这里 interval 是每隔1秒产生一个数据, take(5)表示取5个数, 也就是1,2,3,4,5,然后 share()就把这个 observable 从 cold 变成了 hot 的。后边 Dave 进行了订阅,2.5秒以后, Nick 进行了订阅。

其他

RxJS 提供了很多操作符,不过只有少数是常用的。 如下图所示:

关于这些操作符的使用可以参看 RxJS 系列之三 - Operators 详解RxJS API 文档

错误处理

除了可以在订阅时提供 error() 处理器外,RxJS 还提供了 catchError 操作符,它允许你在管道中处理已知错误。

假设你有一个可观察对象,它发起 API 请求,然后对服务器返回的响应进行映射。如果服务器返回了错误或值不存在,就会生成一个错误。如果你捕获这个错误并提供了一个默认值,流就会继续处理这些值,而不会报错。

import { ajax } from 'rxjs/ajax';
import { map, catchError } from 'rxjs/operators';
// Return "response" from the API. If an error happens,
// return an empty array.
const apiData = ajax('http://a.itying.com/api/productlist').pipe(
  map(res => {
    if (!res.response) {
      throw new Error('Value expected!');
    }
    return res.response;
  }),
  catchError(err => of([]))
);

apiData.subscribe({
  next(x) { console.log('data: ', x); },
  error(err) { console.log('errors already caught... will not run'); }
});

页面测试结果:

重试失败的可观察对象

catchError 提供了一种简单的方式进行恢复,而 retry 操作符让你可以尝试失败的请求。

可以在 catchError 之前使用 retry 操作符。它会订阅到原始的来源可观察对象,它可以重新运行导致结果出错的动作序列。如果其中包含 HTTP 请求,它就会重新发起那个 HTTP 请求。

下列代码把前面的例子改成了在捕获错误之前重发请求:

import { ajax } from 'rxjs/ajax';
import { map, retry, catchError } from 'rxjs/operators';

const apiData = ajax('/api/data').pipe(
  retry(3), // Retry up to 3 times before failing
  map(res => {
    if (!res.response) {
      throw new Error('Value expected!');
    }
    return res.response;
  }),
  catchError(err => of([]))
);

apiData.subscribe({
  next(x) { console.log('data: ', x); },
  error(err) { console.log('errors already caught... will not run'); }
});

注意:不要重试登录认证请求,这些请求只应该由用户操作触发。我们肯定不会希望自动重复发送登录请求导致用户的账号被锁定。

问题记录

fromPromise no longer exported in master (v6)

RxJS version:
6.0.0-alpha.3

Code to reproduce:

import { fromPromise } from 'rxjs';

Expected behavior:
should work just as it did with v5.5 (but different location)

解决办法:统一使用 from 来代替 fromPromise。

参考文献

关于Angular6版本升级和RXJS6新特性的讲解

RxJS: 简单入门

RxJS 系列之二 - Observable 详解

RxJS 系列之三 - Operators 详解

RxJS速成 (上)