阅读 169

简析redux技术栈(二):认识saga的buffer和chanel

本文地址 我们知道redux-saga 也是通过中间件的形式与 redux 本身连接起来。例如下面使用了redux-saga的react项目需要以下这样的 初始化

function configureStore(initialState) {
  // 运行返回一个redux middleware
  const sagaMiddleware = createSagaMiddleware();
  return {
    ...createStore(
      reducer,
      initialState,
      applyMiddleware(middleware1, middleware2, sagaMiddleware)
    ),
    runSaga: sagaMiddleware.run
  };
}
复制代码

所以分析 redux-saga 的第一步,就从 redux-saga 的中间件开始。我们平时写代码在 react 中与 saga 进行交互,都是dispatch一个action到与我们的 saga 逻辑进行交互。翻看createSagaMiddleware源码,可以很清晰的看到,这就是使用了中间件后,我们每次dispatch一个 action 后,在 saga 中间件内会往channelput这个action,进而触发我们 saga 里面的逻辑。就实现了 react 组件和 saga 的交互了。那么这个stdChannel是什么呢

// 省略一些多余部分
function sagaMiddlewareFactory({ channel = stdChannel() } = {}) {
  //...
  function sagaMiddleware({ getState, dispatch }) {
    return next => action => {
      const result = next(action);
      // 实现了react和saga的交互
      channel.put(action);
      return result;
    };
  }
  //...
  return sagaMiddleware;
}
复制代码

在了解 saga 的运行机制之前,先学习 redux-saga 源码内部的两个比较常用的数据结构bufferchanel

buffer

buffer 是一个固定长度类似队列的数据结构,它有四种类型(下面介绍),对外暴露了几个函数,如下

  • put 用来缓存 action
  • take 取出一个 action
  • isEmpty 判断 buffer 是否为空
  • flush 取出缓存的内的所有 action

我们知道如果我们直接使用数组的 push/unshift(pop/shift)函数实现队列的话,当我们出队列的时候时间复杂度是o(n)。而这里的 buffer 实现是比较巧妙的。数据存储是使用定长数组。通过pushIndexpopIndex标识位来记录出入队列的位置,它们的初始值都是 0,出队列的时候直接把popIndex位置空,然后值+1。入队列则是pushIndex+1。这样,无论take还是put,时间复杂度都是o(1)

pushIndex达到了 buffer 的长度的时候,buffer 的处理会根据 buffer 类型不同进行处理

1、ON_OVERFLOW_THROW:超出限制直接报错

2、ON_OVERFLOW_SLIDE:类似于环状队列,达到长度限制后,从索引 0 继续存储。

3、ON_OVERFLOW_EXPAND:达到限制后,长度自动变大 2 倍。

4、ON_OVERFLOW_DROP:达到限制后,后续的都丢弃

chanel

chanel 的实现是类似发布/订阅的设计模式。chanel.take(taker)存入一个 taker 函数,chanel.put(action)时,取出 cb 函数执行,action 是用来消费 taker 的

  • 普通 chanel(单播)

特点:当put一个 action 时,如果没有taker的时候,会将这个 action 存起来,存 action 是用了上面提到的buffer这个数据结构。等到有 taker 的时候可以马上调用 action。

一个简化版的单播 chanel 实现如下

class Chanel {
  constructor() {
    // 存action
    this.buffers = [];
    // 存taker
    this.takers = [];
    this.isClosed = false;
  }
  take(cb) {
    if (this.isClosed) {
      return;
    }
    if (this.buffers.length > 0) {
      cb(this.buffers.shift());
    } else {
      this.takers.push(cb);
    }
  }
  put(action) {
    if (this.takers.length === 0) {
      this.buffers.push(action);
    } else {
      this.takers.shift()(action);
    }
  }
  close() {
    if (this.isClosed) {
      return;
    }
  }
}
复制代码

eventChanel 是在普通 Chanel 基础上实现,是用来用于订阅外部的事件源。chanel的一些使用参考可以看文档

简化的 eventChanel 实现如下,其实给订阅函数传进一个函数,调用这个函数可以往 Chanel 内 put 东西。

class eventChanel extends Chanel {
  constructor(subscribe) {
    super();
    this.unscribe = subscribe(action => {
      super.put(action);
    });
  }

  close() {
    this.unscribe();
    this.isClosed = true;
  }
}
复制代码
  • 多播(multiCast) chanel

从上面的中间件源码可以看到,redux-saga 默认情况下的ChanelstdChannelstdChannel就是基于多播 chanel (multiCastChanel)实现,只不过添加了redux-saga本身的调度系统。multiCastChanel和 nodejs 的eventEmiter是非常类似的,multiCastChaneltake类似于eventEmiteronce,multiCastChanelput类似于eventEmiteremit

通俗的理解,saga 内 multiCastChanel 和 Chanel 最大的区别是,multiCastChanel 不能存 action,只能存 taker,能根据 action 的 type 判断是否执行 taker;chanel 可以缓存 action 和 taker,接收到 action 马上触发 taker,不会判断 type,类似于两个人对话的样子(单播)

一个简化版的 multiCastChanel 实现如下

class Chanel {
  constructor() {
    this.isClosed = false;
    this.takers = [];
  }
  put(action) {
    if (this.isClosed) {
      return;
    }
    const takers = this.takers;
    for (let i = 0, len = takers.length; i < len; i++) {
      if (!takers[i].MATCH || action.type === takers[i].MATCH) {
        takers[i](action);
        takers.splice(takers.indexOf(takers[i]), 1);
      }
    }
  }
  take(cb, match) {
    cb["MATCH"] = match;
    this.takers.push(cb);
  }
  close() {
    this.isClosed = true;
  }
}
复制代码

源码中的 stdChanel 实现

export function stdChannel() {
  const chan = multicastChannel();
  const { put } = chan;
  chan.put = input => {
    // saga的action,不进入调度状态
    if (input[SAGA_ACTION]) {
      put(input);
      return;
    }
    asap(() => {
      put(input);
    });
  };
  return chan;
}
复制代码

上面代码中的multicastChannel和我们的简化版 chanel 原理是一样的。我们可以看到,stdChanel是对multicastChannelput方法进行了重写。只是对于非 saga 内置action使用asap(() => { put(input); });进行调用,这个asap方法其实是 saga 内部调度系统的一个执行函数,它的作用是如果当前 saga 是空闲状态,则执行我们的回调;如果是挂起状态则将回调存进任务队列中。后面会专门介绍 saga 的调度系统。