Node.js 代码阅读笔记系列(0)Timer 的实现

1,263 阅读11分钟

setImmediate

先来看看当我们使用 setImmediate 的时候经历了那些过程

我们先这样用

setImmediate(fn, arg)

可以看到 setImmediate 接收到了 callback, arg1等几个参数

exports.setImmediate = function(callback, arg1, arg2, arg3) {
  if (typeof callback !== 'function') {
    throw new TypeError('"callback" argument must be a function');
  }

  var i, args;

    // 判断传入参数数量
  switch (arguments.length) {
       // 如果只有 callback 不带其他参数的话,立即退出这里的switch
    // fast cases
    case 1:
      break;
    case 2:
        // 只有一个参数的话,设置 `args` 为有包含一个参数的数组
      args = [arg1];
      break;
    case 3:
      args = [arg1, arg2];
      break;
    default:
        // 参数长度超过 4 的话,遍历之后的参数填入 `args`
      args = [arg1, arg2, arg3];
      for (i = 4; i < arguments.length; i++)
            // 这里也有提到在 Node 6.0.0 之后使用 `apply` 会比目前这种动态扩展数组快很多
        // extend array dynamically, makes .apply run much faster in v6.0.0
        args[i - 1] = arguments[i];
      break;
  }
  // 前面主要工作是参数的判断和包装,在这里开始创建 `Immediate`
  return createImmediate(args, callback);
};

前面主要工作是参数的判断和包装,在这里开始创建 Immediate

createImmediate


function createImmediate(args, callback) {
  // 这里注释提到,在使用 `const immediate` 在 6.0.0 中不能被优化
  // 创建 `Immediate` 节点,并给节点赋参数, 值得注意的是 `_callback` 和 `_onImmediate` 同样都是赋 `callback`

  var immediate = new Immediate();
  immediate._callback = callback;
  immediate._argv = args;
  immediate._onImmediate = callback;

    // 设置 `process._needImmediateCallback` 标记,并给 `processImmediate ` 赋值到 `process._immediateCallback` ,用于原生模块调用
  if (!process._needImmediateCallback) {
    process._needImmediateCallback = true;
    process._immediateCallback = processImmediate;
  }

  // `immediateQueue` 队列链表中加入 immediate 节点
  immediateQueue.append(immediate);

  return immediate;
}

这里的 createImmediate 根据接收的参数创建 immediate ,并把它加入到 immediateQueue 的队列,在线程中设置需要执行Immediate回调的标记。

Immediate 队列节点

这里用到的 Immediate 任务队列节点的构造函数。这里 ImmediateQueue 采用的的是一个无序链表。

function Immediate() {
    // 直接注册 callback 会导致优化不稳定(node v6.0.0, v8 5.0.71.35 老铁不稳啊)
    // 所以先就声明,有个疑问,这里是 hidden class 的问题吗?
  this._idleNext = null;
  this._idlePrev = null;
  this._callback = null;
  this._argv = null;
  this._onImmediate = null;
  // 设置为当前线程的域
  this.domain = process.domain;
}

processImmediate

function processImmediate() {
    // 取队列的头尾,申明 `domain` 也就是域
  var immediate = immediateQueue.head;
  var tail = immediateQueue.tail;
  var domain;

  // 清空队列头尾
  immediateQueue.head = immediateQueue.tail = null;

  while (immediate) {
       // immediate 任务的域
    domain = immediate.domain;

    // 如果没有回调就下一个
    if (!immediate._onImmediate) {
      immediate = immediate._idleNext;
      continue;
    }

    if (domain)
      domain.enter();
    // 不是很明白这里,之前不是给它俩都赋值了 `callback` 么 😯
    immediate._callback = immediate._onImmediate;

    // 先暂存一个下一个节点,避免 `clearImmediate(immediate)` 被调用时被清理。 
    var next = immediate._idleNext;

    tryOnImmediate(immediate, tail);

    if (domain)
      domain.exit();

    // 如果有调用 `clearImmediate(immediate)` 的话就使用之前暂存的next,没有的话,那就调用 `immediate._idleNext`
    if (immediate._idleNext)
      immediate = immediate._idleNext;
    else
      immediate = next;
  }

  // 判断 immediate 队列为空的话设置 `_needImmediateCallback ` 标志为false
  // 需要提到的是这里的逻辑 C++ 模块中有实现
  if (!immediateQueue.head) {
    process._needImmediateCallback = false;
  }
}

上面实现了 processImmediate 主要的作用是遍历 immediateQueue 中的节点,并调用 tryOnImmediate 尝试执行任务。

可以看到它被设置在 process_immediateCallback 。那么有一个疑问,他是在什么时候被调用执行的?

可以看到这里在env全局环境变量上设置 _immediateCallback 的的代理符号

// src/env.h

V(immediate_callback_string, "_immediateCallback")  

  static inline Environment* from_immediate_check_handle(uv_check_t* handle);
  static inline Environment* from_destroy_ids_idle_handle(uv_idle_t* handle);
  inline uv_check_t* immediate_check_handle();
  inline uv_idle_t* immediate_idle_handle();
  inline uv_idle_t* destroy_ids_idle_handle();
// src/node.cc

static void CheckImmediate(uv_check_t* handle) {
  Environment* env = Environment::from_immediate_check_handle(handle);
  HandleScope scope(env->isolate());
  Context::Scope context_scope(env->context());
  MakeCallback(env, env->process_object(), env->immediate_callback_string());
}

看到这里 CheckImmediate 感觉已经快接近答案了。

tryOnImmediate

我们继续回到 JS

function tryOnImmediate(immediate, oldTail) {
  var threw = true;
  try {
    // 这里是因为之前的 v8 会放弃优化带有`try/finally`的function,所以这里把执行函数再外置到一个小函数,small function 会得到v8优化
    runCallback(immediate);
    threw = false;
  } finally {
      // 如果执行成功并且有下一个节点
    if (threw && immediate._idleNext) {
      // 处理正常的话,继续下一个
      const curHead = immediateQueue.head;
      const next = immediate._idleNext;

      if (curHead) {
        curHead._idlePrev = oldTail;
        oldTail._idleNext = curHead;
        next._idlePrev = null;
        immediateQueue.head = next;
      } else {
        immediateQueue.head = next;
        immediateQueue.tail = oldTail;
      }
      // 下一个事件循环中继续处理 Immediate 任务队列
      process.nextTick(processImmediate);
    }
  }
}

前面提到为了获得v8优化的 tryOnImmediatetry/finally 中将执行节点的callback放在了 runCallback 这个 small function 中。

runCallback

function runCallback(timer) {
  const argv = timer._argv;
  const argc = argv ? argv.length : 0;
  switch (argc) {
    // 这里可以回头看看上面开始的创建时的参数处理
    case 0:
      return timer._callback();
    case 1:
      return timer._callback(argv[0]);
    case 2:
      return timer._callback(argv[0], argv[1]);
    case 3:
      return timer._callback(argv[0], argv[1], argv[2]);
    // more than 3 arguments run slower with .apply
    default:
      return timer._callback.apply(timer, argv);
  }
}

好像终于把 setImmediate 的创建处理部分 👀看完了

setTimeout

这里的参数处理和之前 setImmediate 参数处理很像

exports.setTimeout = function(callback, after, arg1, arg2, arg3) {
  if (typeof callback !== 'function') {
    throw new TypeError('"callback" argument must be a function');
  }

  var len = arguments.length;
  var args;
  if (len === 3) {
    args = [arg1];
  } else if (len === 4) {
    args = [arg1, arg2];
  } else if (len > 4) {
    args = [arg1, arg2, arg3];
    for (var i = 5; i < len; i++)
      args[i - 2] = arguments[i];
  }

  return createSingleTimeout(callback, after, args);
};

createSingleTimeout

这里开始有点不一样了,继续看代码

function createSingleTimeout(callback, after, args) {
    // 尝试转换为 Number 或者 NaN
  after *= 1;
  // 如果 after 小于 1 或者 after > TIMEOUT_MAX
  // after = 1
  if (!(after >= 1 && after <= TIMEOUT_MAX))
    after = 1;

    // 根据参数创建新的 Timeout 队列节点
  var timer = new Timeout(after, callback, args);
  if (process.domain)
    timer.domain = process.domain;

  // 加入到Timeout 队列
  active(timer);

  return timer;
}

const TIMEOUT_MAX = 2147483647; // 2^31-1

补充一下, TIMEOUT_MAX 的值为 2^31-1,也就是我们最多可以通过 setTimeout 延迟执行大约 2147483647 ms,也就是 24 天左右。

Timeout 节点的构造函数

function Timeout(after, callback, args) {
  this._called = false;
  this._idleTimeout = after;
  this._idlePrev = this;
  this._idleNext = this;
  this._idleStart = null;
  this._onTimeout = callback;
  this._timerArgs = args;
  // 这里会和setInterval联系起来
  this._repeat = null;
}

将 timeout 计时器插入计时器列表

这里的叫做 时间轮算法,这里给相同 ms 级的 timeout 任务共用了一个 timeWrap,相同时间的任务分配在同一个链表,使计时任务的调度和新增的复杂度都是 O(1), 也达到高效复用了同一个 timeWrap。

const active = exports.active = function(item) {
  insert(item, false);
};

// 计时器的调度或者重新调度的底层逻辑
// 将会添加计时器到已存在的计时器列表的末尾,或者创建新的列表

function insert(item, unrefed) {
  const msecs = item._idleTimeout;
  if (msecs < 0 || msecs === undefined) return;

    // TimerWrap 是原生模块 timer_wrap
  item._idleStart = TimerWrap.now();

  const lists = unrefed === true ? unrefedLists : refedLists;

    // 创建或者使用已存在的队列
  var list = lists[msecs];
  if (!list) {
    debug('no %d list was found in insert, creating a new one', msecs);
    lists[msecs] = list = createTimersList(msecs, unrefed);
  }

  L.append(list, item);
  assert(!L.isEmpty(list)); // list is not empty
}

创建 timeout 计时器列表

function createTimersList (msecs, unrefed) {
    // 创建一个新的链表并创建一个 TimerWrap 实例来对链表进行调度
  const list = new TimersList(msecs, unrefed);
  L.init(list);
  list._timer._list = list;

  if (unrefed === true) list._timer.unref();
  list._timer.start(msecs);

  list._timer[kOnTimeout] = listOnTimeout;

  return list;
}

TimersList

这里的链表节点和之前的 Immediate 不同的地方是 this._timer = new TimerWrap(), 这里创建了一个新的 TimerWrap 实例。

function TimersList (msecs, unrefed) {
  this._idleNext = null; // Create the list with the linkedlist properties to
  this._idlePrev = null; // prevent any unnecessary hidden class changes.
  this._timer = new TimerWrap();
  this._unrefed = unrefed;
  this.msecs = msecs;
  this.nextTick = false;
}

TimerWrap

TimerWrap 是 Nodejs中的一个类,实现在 /src/timer_wrap.cc, 是一个 uv_timer_t 的封装,是连接 JavaScript 和 libuv 的一个 brige。

我们先通过这个例子来看看 TimerWrap 能实现什么功能。


const TimerWrap = process.binding('timer_wrap').Timer
const kOnTimeout = TimerWrap.kOnTimeout | 0

let timer = new TimerWrap();
timer.start(2333);

console.log('started');

timer[kOnTimeout] = function () {
  console.log('2333!');
};

输出:
started

2333 // 2.333s之后

在 libuv 的 uv_timer_t 实现中使用的是 最小堆 的数据结构,节点的最小判断依据就是它的 timeout, 如果是相同 timeout 的话,则判断两个节点的 start_id, start_id 是一个递增的节点计数,这样也就保证了调用时序。

// deps/uv/src/unix/timer.c

static int timer_less_than(const struct heap_node* ha,
                           const struct heap_node* hb) {
  const uv_timer_t* a;
  const uv_timer_t* b;

  a = container_of(ha, uv_timer_t, heap_node);
  b = container_of(hb, uv_timer_t, heap_node);

  if (a->timeout < b->timeout)
    return 1;
  if (b->timeout < a->timeout)
    return 0;

  /* Compare start_id when both have the same timeout. start_id is
   * allocated with loop->timer_counter in uv_timer_start().
   */
  if (a->start_id < b->start_id)
    return 1;
  if (b->start_id < a->start_id)
    return 0;

  return 0;
}

TimerWrap 源码

TimerWrap 作为一个连接 libuv 的 birge,所以我们容易看到在 Start 方法中调用了uv_timer_start,传递了自己的指针,第二个参数为回调,第三个参数便是 timeout。

我们继续看看 OnTimeout, 它的主要工作就是调用 key 为 kOnTimeout 的回调,也就触发了我们 JavaScript 层的回调函数了。

// src/timer_wrap.cc
class TimerWrap : public HandleWrap {
...
private:
  static void Start(const FunctionCallbackInfo<Value>& args) {
    TimerWrap* wrap = Unwrap<TimerWrap>(args.Holder());

    CHECK(HandleWrap::IsAlive(wrap));

    int64_t timeout = args[0]->IntegerValue();
    int err = uv_timer_start(&wrap->handle_, OnTimeout, timeout, 0);
    args.GetReturnValue().Set(err);
  }

  static void OnTimeout(uv_timer_t* handle) {
    TimerWrap* wrap = static_cast<TimerWrap*>(handle->data);
    Environment* env = wrap->env();
    HandleScope handle_scope(env->isolate());
    Context::Scope context_scope(env->context());
    wrap->MakeCallback(kOnTimeout, 0, nullptr);
  }

我们先回到 createTimersList, 刚才简单介绍的 TimerWrap ,现在,我们就能继续愉快往下看了。

function createTimersList (msecs, unrefed) {
    // 创建一个新的链表并创建一个 TimerWrap 实例来对链表进行调度
  const list = new TimersList(msecs, unrefed);
  L.init(list);
  list._timer._list = list;

  if (unrefed === true) list._timer.unref();

  // 这里设置延时
  list._timer.start(msecs);

    // 这里设置延时的回调函数, 下一步,继续看👉 listOnTimeout
  list._timer[kOnTimeout] = listOnTimeout;

  return list;
}

listOnTimeout

这里的套路到是和 processImmediate 类似

function listOnTimeout() {
  var list = this._list;
  var msecs = list.msecs;

  // 如果 list.nextTick 为 true, 在下一个事件循环调用 listOnTimeoutNT 立即执行
  if (list.nextTick) {
    list.nextTick = false;
    process.nextTick(listOnTimeoutNT, list);
    return;
  }

  debug('timeout callback %d', msecs);

    // 获取当前运行时间
  var now = TimerWrap.now();
  debug('now: %d', now);

  var diff, timer;
  while (timer = L.peek(list)) {
    diff = now - timer._idleStart;
    // 判断这里的循环是否被过早调用
    if (diff < msecs) {
      var timeRemaining = msecs - (TimerWrap.now() - timer._idleStart);
      if (timeRemaining < 0) {
        timeRemaining = 0;
      }
      this.start(timeRemaining);
      debug('%d list wait because diff is %d', msecs, diff);
      return;
    }

     // 开始进入 timeout 逻辑 

     // 从链表中删除当前计时器节点 
    L.remove(timer);
    // 检测是否从链表中移除
    assert(timer !== L.peek(list));
     // 没有回调函数的情况,跳到下一次循环
    if (!timer._onTimeout) continue;

    var domain = timer.domain;
    if (domain) {
        // 如果计数器回调抛出错误, domain 和 uncaughtException 都忽略异常,其他计时器正常执行
      // https://github.com/nodejs/node-v0.x-archive/issues/2631

      if (domain._disposed)
        continue;

      domain.enter();
    }

    tryOnTimeout(timer, list);

    if (domain)
      domain.exit();
  }

  // 计时器已经全部被调用,链表也已经清空,调用 TimerWrap 的 close 进行清理处理
  debug('%d list empty', msecs);
  assert(L.isEmpty(list));
  this.close();

  // Either refedLists[msecs] or unrefedLists[msecs] may have been removed and
  // recreated since the reference to `list` was created. Make sure they're
  // the same instance of the list before destroying.

  // 清理
  if (list._unrefed === true && list === unrefedLists[msecs]) {
    delete unrefedLists[msecs];
  } else if (list === refedLists[msecs]) {
    delete refedLists[msecs];
  }
}

tryOnTimeout

tryOnTimeout 和之前的 tryOnImmediate的处理方式大体还是一样

// 这里和 tryOnImmediate一样 也考虑到 v8 的优化,所以使用 small function 来执行 timer

function tryOnTimeout(timer, list) {
  timer._called = true;
  var threw = true;
  try {
    ontimeout(timer);
    threw = false;
  } finally {
    // 如果没抛出错误,直接结束
    if (!threw) return;
     // 抛出错误未正常执行情况下
     // 为了保证执行顺序,推迟列表中所有事件到下一周期。

    const lists = list._unrefed === true ? unrefedLists : refedLists;

    for (var key in lists) {
      if (key > list.msecs) {
        lists[key].nextTick = true;
      }
    }

    // We need to continue processing after domain error handling
    // is complete, but not by using whatever domain was left over
    // when the timeout threw its exception.

    const domain = process.domain;
    process.domain = null;

    // 如果抛出错误,在 nextTick 中执行接下来的计数器回调
    process.nextTick(listOnTimeoutNT, list);
    process.domain = domain;
  }
}

ontimeout

function ontimeout(timer) {
  var args = timer._timerArgs;
  var callback = timer._onTimeout;
  if (!args)
    callback.call(timer);
  else {
    switch (args.length) {
      case 1:
        callback.call(timer, args[0]);
        break;
      case 2:
        callback.call(timer, args[0], args[1]);
        break;
      case 3:
        callback.call(timer, args[0], args[1], args[2]);
        break;
      default:
        callback.apply(timer, args);
    }
  }
  // 这里就是 setInterval 的实现了,之后再细看
  if (timer._repeat)
    rearm(timer);
}

setInterval

这里的实现和 setTimeout , setImmediate 几乎一样。

exports.setInterval = function(callback, repeat, arg1, arg2, arg3) {
  if (typeof callback !== 'function') {
    throw new TypeError('"callback" argument must be a function');
  }

  var len = arguments.length;
  var args;
  if (len === 3) {
    args = [arg1];
  } else if (len === 4) {
    args = [arg1, arg2];
  } else if (len > 4) {
    args = [arg1, arg2, arg3];
    for (var i = 5; i < len; i++)
      // extend array dynamically, makes .apply run much faster in v6.0.0
      args[i - 2] = arguments[i];
  }

  return createRepeatTimeout(callback, repeat, args);
};

interval === repeat timeout ?

setInterval 的实现和 setTimeout 不同在于 timer._repeat = repeat

function createRepeatTimeout(callback, repeat, args) {
  repeat *= 1; // coalesce to number or NaN
  if (!(repeat >= 1 && repeat <= TIMEOUT_MAX))
    repeat = 1; // schedule on next tick, follows browser behaviour

  var timer = new Timeout(repeat, callback, args);
  timer._repeat = repeat;
  if (process.domain)
    timer.domain = process.domain;

  active(timer);
  return timer;
}

clear

之前看了创建 3 种时间调度的方法,在看看清理的 timer 的代码。

clearImmediate

exports.clearImmediate = function(immediate) {
  if (!immediate) return;

  immediate._onImmediate = null;

  immediateQueue.remove(immediate);

  if (!immediateQueue.head) {
    process._needImmediateCallback = false;
  }
};

clearTimeout

const clearTimeout = exports.clearTimeout = function(timer) {
  if (timer && (timer[kOnTimeout] || timer._onTimeout)) {
    timer[kOnTimeout] = timer._onTimeout = null;
    if (timer instanceof Timeout) {
      timer.close(); // for after === 0
    } else {
      unenroll(timer);
    }
  }
};

Timeout.unref

这里的 timer 提供了 closeunrefref 3 个方法,其中 refunref 通过 TimerWrap 调用底层的 uv_ref()uv_unref()

在 Nodejs 官方文档提到

When called, the active Timeout object will not require the Node.js event loop to remain active. If there is no other activity keeping the event loop running, the process may exit before the Timeout object's callback is invoked.

主动调用 unref(),如果没有其他活跃的对象,可能会使 Nodejs 的事件循环提前退出

Timeout.prototype.unref = function() {
  if (this._handle) {
    this._handle.unref();
  } else if (typeof this._onTimeout === 'function') {
    var now = TimerWrap.now();
    if (!this._idleStart) this._idleStart = now;
    var delay = this._idleStart + this._idleTimeout - now;
    if (delay < 0) delay = 0;

    // 防止在调用 `unref()`之后 再次运行回调
    if (this._called && !this._repeat) {
      unenroll(this);
      return;
    }

    var handle = reuse(this);

    this._handle = handle || new TimerWrap();
    this._handle.owner = this;
    this._handle[kOnTimeout] = unrefdHandle;
    this._handle.start(delay);
    this._handle.domain = this.domain;
    this._handle.unref();
  }
  return this;
};

Timeout.ref

Timeout.prototype.ref = function() {
  if (this._handle)
    this._handle.ref();
  return this;
};

Timeout.close

Timeout.prototype.close = function() {
  this._onTimeout = null;
  if (this._handle) {
    this._idleTimeout = -1;
    this._handle[kOnTimeout] = null;
    this._handle.close();
  } else {
    unenroll(this);
  }
  return this;
};


// 移除计时器,取消延时以及重置有关的计时器属性
const unenroll = exports.unenroll = function(item) {
  var handle = reuse(item);
  if (handle) {
    debug('unenroll: list empty');
    handle.close();
  }
  // 确保之后不会被继续插入队列
  item._idleTimeout = -1;
};


// 为了复用 TimerWrap 的一简单的转换函数
//
// This mostly exists to fix https://github.com/nodejs/node/issues/1264.
// Handles in libuv take at least one `uv_run` to be registered as unreferenced.
// Re-using an existing handle allows us to skip that, so that a second `uv_run`
// will return no active handles, even when running `setTimeout(fn).unref()`.

function reuse(item) {
  L.remove(item);

  var list = refedLists[item._idleTimeout];
  // if empty - reuse the watcher
  if (list && L.isEmpty(list)) {
    debug('reuse hit');
    list._timer.stop();
    delete refedLists[item._idleTimeout];
    return list._timer;
  }

  return null;
}

clearInterval

exports.clearInterval = function(timer) {
  if (timer && timer._repeat) {
    timer._repeat = null;
    clearTimeout(timer);
  }
};

结尾 💊

先上图

   ┌───────────────────────┐
┌─>│        timers         │
│  └──────────┬────────────┘
│  ┌──────────┴────────────┐
│  │     I/O callbacks     │
│  └──────────┬────────────┘
│  ┌──────────┴────────────┐
│  │     idle, prepare     │
│  └──────────┬────────────┘      ┌───────────────┐
│  ┌──────────┴────────────┐      │   incoming:   │
│  │         poll          │<─────┤  connections, │
│  └──────────┬────────────┘      │   data, etc.  │
│  ┌──────────┴────────────┐      └───────────────┘
│  │        check          │
│  └──────────┬────────────┘
│  ┌──────────┴────────────┐
└──┤    close callbacks    │
   └───────────────────────┘

setImmediate 一般在 check 阶段执行,也有可能在 poll 阶段执行
setTimeout setInterval 在 timer 阶段执行

来一个问题:setTimeout(fn, 0) setImmediate(fn) 谁会先执行?

setTimeout(console.log, 0, 1);
setImmediate(console.log, 2);

// event loop 每个阶段都比较空闲的话,一次 event loop 小于 1ms 时: 
2
1

// 超过 1ms 时也可能是

1
2

如果在一个I/O循环内调用,immediate 始终会比 setTimeout 先执行。因为immediate 会在 event loop 中 poll 完成之后立即执行,setTimeout 则是到下一个 timers 阶段。

var fs = require('fs')

fs.readFile(__filename, () => {
    setTimeout(console.log, 0, 1);
    setImmediate(console.log, 2);
})

// 输出: 
2
1

再来一个

我们在 Nodejs 中这样写, 会怎么输出?

var a = setTimeout(console.log, 50, 2333);
a._repeat = true;

这样呢?

var a = setTimeout(console.log, 1000, 2333);
a.close()

这样呢?

var a = setTimeout(console.log, 1000, 2333);
a.unref()

参考资料:

node/lib/timers.js

node/lib/internal/linkedlist.js

node/src/timer_wrap.cc

event-loop-timers-and-nexttick

Optimizing _unrefActive