阅读 323

RxJS与Redux结合使用(一):打造自己的redux-observable

背景

Redux 的核心理念是单向数据流,只能通过 dispatch(action) 的方式修改状态,使用react-redux可以在组件和redux之间形成下面这么一个数据流闭环:

view ->  action -> reducer -> state -> view
复制代码

然而,在实际业务中往往有大量异步场景,最直接的做法是在React组件中发起异步请求,在拿到数据后调用dispatch(action)去数据层修改数据。不过这样的做法使得视图层和数据层耦合在一起,会造成后期维护的困难。

Redux作者建议用中间件来处理异步流,因为在中间件中我们可以灵活地控制 dispatch的时机,这对于处理异步场景非常有效。较为常见的做法主要有两种:

  1. 更改action的类型,如redux-thunk,用函数替换了action;
  2. 在middleware中接收到action的时候做出一些对应处理,如redux-saga。

而我们今天要讲的rxjs与redux的结合,采用了第二种方式来处理异步流程。

中间件干预处理

市面上已经存在这么个中间件了:redux-observable。而我们今天要做的就是带领大家,一步一步慢慢实现自己的一个redux-observable。

这个中间件的原理我可以简化为下面的代码:

export default store => next => action => {
    const result = next(action);
    if (action.type === 'ping') {
        store.dispatch({ type: 'pong' })
    }
    return result;
}
复制代码

原理实在简单,在next(action)之后去根据action做判断,做一些异步逻辑,再发起dispatch修改数据即可,而redux-observable也只是在这个基础之上加入RxJs的一些特性。

处理异步逻辑的思路

如果你比较熟悉redux的话,就会知道,redux的中间件就是一个洋葱模型,上面我们也说了,我们会在中间件的后面根据你的action再去重新dispatch一些action,而Rxjs最核心的思想就是将数据都流化。所以你可以理解为action在中间件的末端流入一个管道,最后从管道又流出一些action,这些action最终会再次被store dispatch。

image

至于在这个管道中进行了什么样的变化、操作,那就是Rxjs的管辖范围,通过Rxjs的强大的操作符,我们可以非常优雅地实现异步逻辑。

所以,需要有个流来承载所有的action,这样你就可以通过这个action$来进行fetch:

action$.pipe(
    switchMap(
        () => fromPromise(fetch('/api/whatever')).pipe(
            map(res => action)
        )
    ),
    catchError(() => {})
)
复制代码

这样就将异步逻辑嵌入到流当中。

创建Action流

我们的核心思想是action in, action out,所以最终流出的action是要重新被store.dispatch消费的,所以action$是一个Observable对象。

同时,在dispatch的时候,action经过中间件,action中需要放入这个action,所以action也是一个observer。

因此,action$既是观察者又是可观察对象,是一个Subject对象:

替换中间件的简单写法,变成:

import { Subject } from 'rxjs/Subject';

export default (store) => {

  const action$ = new Subject();
  
  action$.subscribe(store.dispatch);
  
  return next => (action) => {
    const result = next(action);
    action$.next(action);
    return result;
  };
};
复制代码

在上面代码中我们在middleware中去放入action,然后通过订阅,store会触发dispatch。

但是,如果我们就这么写的话,这是个死循环,因为任何action在进入到action后就立马被消费者store.dipatch(action)执行了,这个action又会在后面的流程中重新被送到action去。

聪明的你应该想到了,我们对于进入到action$的action还没有进行任何过滤,而这个过滤过程也正是我们需要的处理异步逻辑的地方。

下面我们要把这个步骤加上。

流的转化器Epic

为了达到action的一个转化处理,我们将这个过程抽离出来,这个中间处理的逻辑称为Epic,epic的形式大概可以写为:

const epic = (action$) => {
    return action$.pipe(
        // 因为所有的action都会过来
        // 所以我们只需要处理我们想要的aciton
        filter(action => action.type === 'GET_USER'),
        switchMap(
            // 将fetch也转化为流
            () => fromPromise(fetch('/api/user/get', {
                method: 'POST',
                body: {
                    id: 1
                },
            })).pipe(
                map(res => ({ type: 'GET_USER_SUCCESS', payload: res })),
                catchError(error => ({ type: 'GET_USER_FAILED', payload: error }))
            )
        )
    )
}

复制代码

epic本质是一个函数,在这个函数中,我们在action$的基础上,加入了管道控制,产生了另外一个流,而这个流就是最终我们要的,对action进行了控制的action流,上面的fetch只是一个例子,在这个管道中,你可以处理任意的异步逻辑。

而我们要做的就是将这个Epic,整合进刚才的中间中。

做法也很简单,我们只需要将订阅从action$换到新的流上就可以了:

import { Subject } from 'rxjs/Subject';

export default (store) => {

  const action$ = new Subject();
  const newAction$ = epic(action$);
  
  newAction$.subscribe(store.dispatch);
  
  return next => (action) => {
    const result = next(action);
    action$.next(action);
    return result;
  };
};
复制代码

这样,action$在接收到新的action的时候,会流经epic定义的管道,然后才出发dispatch

多个Epic合并

到此,我们的中间件已经有初步处理异步逻辑的能力,但是,在现实中,我们的异步逻辑不可能只有一个,所以epic是会有很多的,而store去订阅的流只能是一个,所以这么多的epic产生的流要合并成一个流。

合并流的操作,强大的RxJs自然是有安排的,相信你想到了操作符merge,我们可以提供一个combineEpics的函数:

export const combineEpics = (...epics) => {
  const merger = (...args) => merge(
    ...epics.map((epic) => {
      const output$ = epic(...args);
      return output$;
    })
  );
  return merger;
};
复制代码

上面的代码不难理解,combineEpics整合了所有传入的epic,然后返回一个merger,这个merger是利用merge操作符,将所有的epic产生的流合并成一个流。

image

代码形式为:

const pingEpic = action$ => action$.pipe(
  filter(action => action.type === 'ping'),
  map(() => ({ type: 'pong' })),
);

const getUserEpic = action$ => action$.pipe(
  filter(action => action.type === 'GET_USER'),
  map(() => ({ type: 'GET_USER_SUCCESS', payload: { user: { name: 'kang' } } })),
);

const rootEpic = combineEpics(pingEpic, getUserEpic);

export default (store) => {
  const action$ = new Subject();
  const newAction$ = rootEpic(action$);

  newAction$.subscribe(store.dispatch);
  return next => (action) => {
    const result = next(action);
    action$.next(action);
    return result;
  };
};
复制代码

state获取

在epic中我们不可避免地要借助state里面的数据进行不同的处理,所以我们是需要获取到state的,所以你可以在中间件中的epci执行函数中添加一个参数,将state获取函数暴露出去:

export default (store) => {
  ...
  const newAction$ = rootEpic(action$, store.getState);
  ...
};
复制代码

这样epic里就可以用getState()获取state:

const pingEpic = (action$, getState) => action$.pipe(
  filter(action => action.type === 'ping'),
  map(() => ({ type: 'pong', payload: getState() })),
);
复制代码

进一步优化:将state也流化

上面的做法是直接去获取state,这样的做法是主动获取,不符合函数响应式编程模式。函数响应式中,state的改变状态,应该是要能被观察的。

当state也能被响应观察,我们就可以做更多的功能,例如:当state的某些数据在发生变化的时候,我们要去进行实时保存。

在传统模式的做法中,你可以在中间件中这样写:

export default store => next => action => {
    const oldState = store.getState();
    const result = next(action);
    const newState = store.getState();
    // 类似这样的写法
    if (newState.xxx !== oldState.xxx) {
        fetch('/api/save', {
            method: 'POST',
            body: {
            
            }
        }).then(() => {}).catch(() => {})
    }
    return result;
}
复制代码

这个处理逻辑要独立为一个中间件,而如果你将state也流化,你可以直接使用epic这样处理:

const saveEpic = (action$, state$) => state$.pipe(
const autoSaveEpic = (action$, state$) =>
  return action$.pipe(
    filter(action => action.type === 'AUTO_SAVE_ENABLE'), // 自动保存的开启
    exhaustMap(() => state$.pipe(
        pluck('xxx'), // 获取state.xxx
        distinctUntilChanged(), // 前后值不同时才将其发出。
        concatMap((value) => {
            // fetch to save
        }),
        // 自动保存的关闭
        takeUntil(action$.pipe(
            filter(action => action.type === 'AUTO_SAVE_DISABLE')
        ))
    ))
  )
)
复制代码

如果仔细阅读这段代码,可以发现这样的方式可以使用非常优雅的方式控制这个自动保存,可以和action$结合使用,快速开关自动保存,可以利用RxJs的特性解决保存的异步执行延迟问题。

如果你只是单存想要获取最新state,可以使用withLatestFrom操作符:

const countEpic = (action$, state$) => action$.pipe(
  filter(action => action.type === 'count'),
  withLatestFrom(state$),
  switchMap(([action, state]) => {
    return of({ type: 'whatever' });
  })
);
复制代码

在中间件加入state流:

export default (store) => {
  const action$ = new Subject();
  const state$ = new Subject();
  const source$ = rootEpic(action$, state$);

  source$.subscribe(store.dispatch);
  return next => (action) => {
    const result = next(action);
    state$.next(store.getState());
    action$.next(action);
    return result;
  };
};
复制代码

注意state.next要先执行,这样在epic中才会拿到最新的,另外可以知道一下,redux在init的时候不会经过中间件,所以当你没有dispatch任何action的时候,state最新值不是默认state。

Action的顺序问题

如果你有耐心看到这里,那么说明你对于redux结合RxJs的使用已经理解得差不多了,但是这里还是有个问题,就是action的生效顺序,我们可以直接看个例子说明,假设有下面这样两个epic:

const epic1 = action$ => action$.pipe(
  filter(action => action.type === 'one'),
  mergeMap(() => of({ type: 'two' }, { type: 'three' })),
);

const epic2 = action$ => action$.pipe(
  filter(action => action.type === 'two'),
  mergeMap(() => of({ type: 'four' })),
);
复制代码

store.dispatch({ type: 'one' }) 的时候,action的顺序为:

'one' -> 'two' -> 'four' -> 'three'
复制代码

可见,action的执行顺序并不是如我们预期的那样,在two触发后就发出了four,这是因为RxJs默认的调度器是同步的,用一段简单的代码,上面的效果类似于:

class Print {
  constructor(name, nexts = []) {
    this.name = name;
    this.nexts = nexts;
  }
  print() {
    console.log(this.name);
    this.nexts.forEach((p) => {
      p.print();
    });
  }
}
const three = new Print('three');
const four = new Print('four');
const two = new Print('two', [four]);
const one = new Print('one', [two, three]);
one.print(); // one, two, four, three
复制代码

换成上面的代码的话你就不陌生了吧,也会对于输出的结果表示肯定,但是我们需要的效果是

'one' -> 'two' -> 'three' -> 'four'
复制代码

这如何做到?

明显,需要将调度器换成其他的,RxJs有这么几种调度器:null(同步)、asap、queue、async、animationFrame。最后一种是动画场景的调度器,直接剔除,默认是第一种,那么就剩下asap、queue、async。在这个场景下,这三种调度器都是可行的,但是queue在大量的数据的时候对于性能是有利的,所以这里可以使用它。不过,记住,这三种调度器是有区别的,大家有兴趣的自己去google一下,只提示:asap是Micro Task、async是Macro Task、queue在延迟为0的时候接近于同步,在延迟不为0的时候与async一样。

中间件中:

  const action$ = new Subject().pipe(
    observeOn(queue)
  );
复制代码

这样得到的结果就是:

'one' -> 'two' -> 'three' -> 'four'
复制代码

如果用简单的代码,相当于发生了这样的变化:

class Print {
  constructor(name, nexts = []) {
    this.name = name;
    this.nexts = nexts;
  }
  print() {
    console.log(this.name);
    this.nexts.forEach((p) => {
      setTimeout(() => p.print(), 0);
    });
  }
}
const three = new Print('three');
const four = new Print('four');
const two = new Print('two', [four]);
const one = new Print('one', [two, three]);
one.print(); // one, two, three, four

复制代码

总结

本文就讲到这里,这次介绍了如何自己实现一个redux-observable,下次会讲redux-observable在实战中的一些应用,例如怎么类似dva那样进行模块化开发、如何统一处理loading、error等。

关注下面的标签,发现更多相似文章
评论