函数式编程|打造自定义的RxJs操作符

321 阅读2分钟

最近在学习函数式编程相关的知识,这篇文章源于学习自定义RxJs操作符的一点想法,如有理解不正确的地方,还请指出。


第一步:搭建环境

1. 安装项目依赖
mkdir rxjs && cd rxjs
yarn init -y
yarn add rxjs rxjs-compat webpack webpack-dev-server typescript ts-loader --save
yarn add webpack-cli --save-dev
2. 设置webpack和typescript

① 创建webpack.cofig.js文件

const path = require('path');

module.exports = {
  entry: './src/index.ts',
  devtool: 'inline-source-map',
  module: {
    rules: [
      {
        test: /\.tsx?$/,
        use: 'ts-loader',
        exclude: /node_modules/
      }
    ]
  },
  resolve: {
    extensions: [ '.ts', '.js', '.tsx' ]
  },
  output: {
    filename: 'bundle.js',
    path: path.resolve(__dirname, 'dist')
  }
};

② 创建typescript.json文件

{
  "compilerOptions": {
    "outDir": "./dist/",
    "noImplicitAny": true,
    "module": "es6",
    "moduleResolution": "node",
    "sourceMap": true,
    "target": "es6",
    "typeRoots": [
      "node_modules/@types"
    ],
    "lib": [
      "es2017",
      "dom"
    ]
  }
}

③ 在package.json文件下添加启动命令

"scripts": {
    "start": "webpack-dev-server --mode development"
  },
3. 完成项目设置

① 创建index.html并复制粘贴如下代码

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>Learn RxJS with Coursetro</title>

    <style>
        body { font-family: 'Arial'; background: #ececec; }
        ul { list-style-type: none; padding: 20px; }
        li { padding: 20px; background: white; margin-bottom: 5px; }
    </style>
</head>
<body>
    <script src="/bundle.js"></script>
</body>
</html>

② 创建src文件夹,在项目下创建``index.ts

import * as Rx from "rxjs/Observable";

console.log(Rx);

③ 在浏览器中访问 http://localhost:8080,打开控制台即可看到 Rx 对象数据

第二步:创建通用subscriber

import {from} from "rxjs";

const observable$ = from([1,2,3,4,5]);

const subscriber = {
    next: (value: any) => console.log(value),
    complete: () => console.log('has something done.'),
    error: (err: any) => console.error('something error: ', err)
};

observable$.subscribe(subscriber);

// 输出:
// 1
// 2
// 3
// 4
// 5 

第三步:继承Subscriber并重写_next函数

class DoubleSubscriber extends Subscriber<number> {
    protected _next(value: number): void {
        // @ts-ignore
        this.destination.next(value * 2);
    }
}

observable$.subscribe(new DoubleSubscriber(subscriber));

// 输出:
// 2
// 4
// 6
// 8
// 10

第四步:连接source和subscribe,创建一个基础 pipe 操作符

① 手动连接

const double = (source: any) => {
  const o$ = new Observable();
  o$.source = source;
  o$.operator = {
      call(sub, source) {
          // @ts-ignore
          source.subscribe(new DoubleSubscriber(sub))
      }
  };
  return o$;
};

observable$.pipe(double).subscribe(subscriber);

② 使用lift链接source和subscriber

const double = (source: any) => source.lift({
    call(subscriber: Subscriber<number>, source: any): any {
        source.subscribe(new DoubleSubscriber(subscriber))
    }
});

第五步:创建一个可复用的 操作符

① 使用lift创建可复用的pipe操作符

// src/operators/multiply/index.ts
/**
 * @format
 * @Author: Alvin
 * @Date 2020-03-05
 * @Last modified by: Alvin
 * @Last modified time: 2020-03-05
 */
import {Subscriber} from "rxjs";
import {PartialObserver} from "rxjs/src/internal/types";

class MultiplySubscriber extends Subscriber<number> {
    constructor(subscriber: PartialObserver<any> | ((value: number) => void), number: number) {
        super(subscriber);
        // @ts-ignore
        this.number = number;
    }
    protected _next(value: number): void {
        // @ts-ignore
        this.destination.next(value * this.number);
    }
}

export const multiply = (number: number) => (source: any) => source.lift({
    call(subscriber: Subscriber<number>, source: any): any {
        source.subscribe(new MultiplySubscriber(subscriber, number))
    }
});


// src/index.ts
observable$.pipe(multiply(3)).subscribe(subscriber);
observable$.pipe(multiply(45)).subscribe(subscriber);

② 复用现有的map实现相同功能的multiply

import {map} from "rxjs/operators";

export const multiply = (number: number) => map((value: number) => value * number);

③ 自定义pipe构建自定义操作符

const pipe = (...fns: Array<Function>) => (source: any) => fns.reduce((acc: any, fn: any) => fn(acc), source)

export const multiply = (number: number) => pipe(
    map((value: number) => value * number),
    filter((value: number) => value < 100)
);

observable$.pipe(multiply(3)).subscribe(subscriber);
observable$.pipe(multiply(45)).subscribe(subscriber);

第六步:创建有RXJS特色的高阶map

在前面我们拆解了与map类似的操作符multiply的实现方式,这个操作符其实非常好理解,因为有JavaScript原生的map对应。接下来分析真正有RxJs特色的高阶map —— 把Observable对象玩得更加出神入化的操作符。

① mergeMap

mergeMap marble diagram

mergeMap能够解决异步操作的问题,最典型的应该属于AJAX请求的处理。在网页应用当中,每点击按钮一次就发送一个AJAX

请求给server,同时还要根据返回的结果在ui上更新状态。传统的方法来解决这样的异步操作代码会比较繁杂。

mergeMap把用户的点击操作看作一个数据流。把AJAX的返回结果也看做一个数据流,为原本繁杂的解决方式提供了一种简单解。

fromEvent(document.querySelector("#send"), 'click').pipe(
    mergeMap(() => ajax("apiUrl"))
).subscribe((result: any) => {
    // 常规处理ajax返回的结果
})

稍微讲解了merge的使用场景,接下来看看如果手动实现这样的功能,该如何做呢?

import {fromEvent, Observable, of, Subscriber} from "rxjs";
import {PartialObserver} from "rxjs/src/internal/types";
import {delay, scan} from "rxjs/operators";

class MergeMapSubscriber extends Subscriber<Function> {
    constructor(
        destination: PartialObserver<any> | ((value: any) => void),
        private readonly func: Function
    ) {
        super(destination);
        this.func = func;
    }

    protected _next(value: any): void {
        const o$ = this.func(value);

        o$.subscribe({
            next: (value: any) => {
                this.destination.next(value);
            }
        })
    }
}

const mergeMap = (func: any) => (source: Observable<any>) => source.lift({
    call(subscriber: Subscriber<any>, source: any): any {
        source.subscribe(new MergeMapSubscriber(subscriber, func))
    }
});

const observable$ = fromEvent(document, 'click').pipe(
    scan(i => i + 1, 0),
    mergeMap((value: any) => of(value).pipe(delay(500)))
);

const subscriber = {
    next: (value: any) => console.log("subscriber function output ::: ", value),
    complete: () => console.log('has something done.'),
    error: (err: any) => console.error('something error: ', err)
};

observable$.subscribe(subscriber);

② switchMap

switchMap marble diagram

上面介绍了一个适合于AJAX的mergeMap,但mergeMap会存在一个问题,每一个上游数据都将会引发调用Ajax而且会将每一个Ajax结果传递给下游。这样的处理方式似乎并不是适合所有的场景。比如对股票等网页系统等对数据显示实时性要求比较高的情况下相对来说,如果没有处理好将会出现重大经济损失。而switchMap正是解决了mergeMap这样弊端的操作符。

import {fromEvent, Observable, of, Subscriber} from "rxjs";
import {PartialObserver} from "rxjs/src/internal/types";
import {delay, scan} from "rxjs/operators";

class SwitchMapSubscriber extends Subscriber<Function> {
    private innerSubscription: any;
    constructor(
        destination: PartialObserver<any> | ((value: any) => void),
        private readonly func: Function
    ) {
        super(destination);
        this.func = func;
    }

    protected _next(value: any): void {
        const o$ = this.func(value);

      	// 保证获取到最新的数据流
        if(this.innerSubscription) {
            this.innerSubscription.unsubscribe();
        }

        this.innerSubscription = o$.subscribe({
            next: (value: any) => {
                this.destination.next(value);
            }
        })
    }
}

const switchMap = (func: any) => (source: Observable<any>) => source.lift({
    call(subscriber: Subscriber<any>, source: any): any {
        source.subscribe(new SwitchMapSubscriber(subscriber, func))
    }
});

export const observable$ = fromEvent(document, 'click').pipe(
    scan(i => i + 1, 0),
    switchMap((value: any) => of(value).pipe(delay(500)))
);

③ concatMap

concatMap marble diagram

concatMap的模式与switchMap基本相似,差别在于获取最新的数据流后concatMap将其顺序推进数据流,而switchMap则将原本的数据流取消订阅,转而订阅最新的数据流。

class ConcatMapSubscriber extends Subscriber<Function> {
    private innerSubscription: any;
    private buffer: Array<any> = [];
    constructor(
        destination: PartialObserver<any> | ((value: any) => void),
        private readonly func: Function
    ) {
        super(destination);
        this.func = func;
    }

    protected _next(value: any): void {
        const {isStopped} = this.innerSubscription || {isStopped: true};

        if(!isStopped) {
            this.buffer = [...this.buffer, value];
        } else {
            const o$ = this.func(value);
            this.innerSubscription = o$.subscribe({
                next: (value: any) => {
                    this.destination.next(value);
                },
                complete: () => {
                    if(this.buffer.length) {
                        const [first, ...rest] = this.buffer;
                        this.buffer = rest;
                        this._next(first);
                    }
                }
            });
						
          	// 顺序推进数据流
            this.add(this.innerSubscription);
        }
    }
}

const concatMap = (func: any) => (source: Observable<any>) => source.lift({
    call(subscriber: Subscriber<any>, source: any): any {
        source.subscribe(new ConcatMapSubscriber(subscriber, func))
    }
});

export const observable$ = fromEvent(document, 'click').pipe(
    scan(i => i + 1, 0),
    concatMap((value: any) => of(value).pipe(delay(500))),
    takeUntil(fromEvent(document, 'keydown'))
);

最后

关于rxjs的学习总结将不定期更新,如果觉得文章不错,请点个赞让我收到你对rxjs相关文章的期待。顺带推下公众号