阅读 52

node - timer学习

之前大概看了libuv的源码,看到eventloop里每个phase,一旦进入之后,在这个phase的回调队列的所有回调被执行完之前,是不会返回的。这就和node的表现有出入了,node 11之后,每执行一个回调,都会看一看有没有待执行 nextTick 和 microtask回调。 感觉有点诡异,所以就看了一下node这边timer的源码。

流程

先看看暴露给咱用的setTimeout函数:

function setTimeout(callback, after, arg1, arg2, arg3) {
  if (typeof callback !== 'function') {
    throw new ERR_INVALID_CALLBACK(callback);
  }

  var i, args;
  switch (arguments.length) {
    // fast cases
    case 1:
    case 2:
      break;
    case 3:
      args = [arg1];
      break;
    case 4:
      args = [arg1, arg2];
      break;
    default:
      args = [arg1, arg2, arg3];
      for (i = 5; i < arguments.length; i++) {
        // Extend array dynamically, makes .apply run much faster in v6.0.0
        args[i - 2] = arguments[i];
      }
      break;
  }

  // 初始化一个Timeout对象(双向链表节点)
  const timeout = new Timeout(callback, after, args, false);
  active(timeout); // 插入到对应的链表里

  return timeout;
}
复制代码

需要注意的是这里生成了一个Timeout对象,那Timeout长啥样?

function Timeout(callback, after, args, isRepeat) {
  after *= 1; // Coalesce to number or NaN
  if (!(after >= 1 && after <= TIMEOUT_MAX)) {
    if (after > TIMEOUT_MAX) {
      process.emitWarning(`${after} does not fit into` +
                          ' a 32-bit signed integer.' +
                          '\nTimeout duration was set to 1.',
                          'TimeoutOverflowWarning');
    }
    after = 1; // Schedule on next tick, follows browser behavior
  }

  this._idleTimeout = after;  // 延迟时间
  this._idlePrev = this;   // 前一个Timeout指针
  this._idleNext = this;   // 后一个Timeout指针
  this._idleStart = null;
  // This must be set to null first to avoid function tracking
  // on the hidden class, revisit in V8 versions after 6.2
  this._onTimeout = null;
  this._onTimeout = callback;  // 传进来的回调函数
  this._timerArgs = args;
  this._repeat = isRepeat ? after : null;  // 是否需要重复
  this._destroyed = false;

  this[kRefed] = null;

  initAsyncResource(this, 'Timeout');
}
复制代码

可以看到,Timeout对象在初始化的时候干了3件事:

  1. 看一看传进来的延时的参数是不是合规(1 - 2*31 - 1),不合规的话就设成1。 注意,延迟0ms 也是不合法的,也就是说平时咱写的 setTimout(cb, 0)其实就等于写了 setTimout(cb, 1)
  2. 初始化一堆自己的属性,需要注意的是这里多了一个_idlePrev 和 _idleNext参数,看起来像是个双向链表? 事实就是的, 为了优化性能, node会根据延时参数创建多个链表,每个Timeout对象都是链表的节点,后面还会总结一下。
  3. 初始化async资源,这个就是为了实现async_hooks的功能了,和timer没啥关系。

回到setTimeout,在创建好Timeout对象后,就调用了active函数把这个timeout放到了列表里:

function active(item) {
  insert(item, true, getLibuvNow());
}
复制代码

这里介绍一下node对Timeout的处理,其实 internal/timers.js开头的注释也说的很清楚了:

// Object maps are kept which contain linked lists keyed by their duration in
// milliseconds.
//
/* eslint-disable node-core/non-ascii-character */
//
// ╔════ > Object Map
// ║
// ╠══
// ║ lists: { '40': { }, '320': { etc } } (keys of millisecond duration)
// ╚══          ┌────┘
//              │
// ╔══          │
// ║ TimersList { _idleNext: { }, _idlePrev: (self) }
// ║         ┌────────────────┘
// ║    ╔══  │                              ^
// ║    ║    { _idleNext: { },  _idlePrev: { }, _onTimeout: (callback) }
// ║    ║      ┌───────────┘
// ║    ║      │                                  ^
// ║    ║      { _idleNext: { etc },  _idlePrev: { }, _onTimeout: (callback) }
// ╠══  ╠══
// ║    ║
// ║    ╚════ >  Actual JavaScript timeouts
// ║
// ╚════ > Linked List
//
/* eslint-enable node-core/non-ascii-character */
//
// With this, virtually constant-time insertion (append), removal, and timeout
// is possible in the JavaScript layer. Any one list of timers is able to be
// sorted by just appending to it because all timers within share the same
// duration. Therefore, any timer added later will always have been scheduled to
// timeout later, thus only needing to be appended.
// Removal from an object-property linked list is also virtually constant-time
// as can be seen in the lib/internal/linkedlist.js implementation.
// Timeouts only need to process any timers currently due to expire, which will
// always be at the beginning of the list for reasons stated above. Any timers
// after the first one encountered that does not yet need to timeout will also
// always be due to timeout at a later time.
复制代码

大概意思就是 根据延时时间的不同,比如现在有 m个30ms, n个50ms的setTimeout调用,node就会生成m个Timeout对象组成一个链表,放到一个对象里,key为30ms,value是这个链表。 同理也会生成n个Timeout组成50ms的链表。 这么干好处是啥呢, 注释里也说了,这么搞的话, Timer的 插入、删除等操作的时间复杂度都差不多是常量。 想想也是,大家都是30ms的回调,新来的timer肯定比之前就来的更晚过期么,所以直接塞到队尾就好了。

结构说清楚了再来看看insert函数,

function insert(item, refed, start) {
  let msecs = item._idleTimeout;
  if (msecs < 0 || msecs === undefined)
    return;

  // Truncate so that accuracy of sub-milisecond timers is not assumed.
  msecs = Math.trunc(msecs);

  item._idleStart = start;   // 当前事件循环开始的时间,在libuv每个时间循环开始都会更新一次

  // 看一下有没有对应的延时链表,没有的话,就再创建一个
  var list = timerListMap[msecs];
  if (list === undefined) {
    debug('no %d list was found in insert, creating a new one', msecs);
    const expiry = start + msecs;
    timerListMap[msecs] = list = new TimersList(expiry, msecs);
    timerListQueue.insert(list);

    // 如果过期时间比之前最近的过期时间还早,那就也schedule一下
    if (nextExpiry > expiry) {
      scheduleTimer(msecs);
      nextExpiry = expiry;
    }
  }

  ......

  // 把Timeout放到链表最后面
  L.append(list, item);
}
复制代码

首先会看一下是不是已经有对应延时的链表了,如果没有,就新建一个。建好之后,直接扔到链表的最后。

这里需要注意的是 scheduleTimer, 如果新的Timeout过期时间最近,那就要schedule这个Timeout。

再看看scheduleTimer函数,

void Environment::ScheduleTimer(int64_t duration_ms) {
  if (started_cleanup_) return;
  uv_timer_start(timer_handle(), RunTimers, duration_ms, 0);
}
复制代码

这里的uv_timer_start函数其实就是往libuv的timer phase注册了一个回调,这里咱重点关注一下传入的回调函数RunTimers,

void Environment::RunTimers(uv_timer_t* handle) {
  .... setup ...

  Local<Function> cb = env->timers_callback_function();
  MaybeLocal<Value> ret;
  Local<Value> arg = env->GetNow();
  /* This code will loop until all currently due timers will    * process. It is impossible for us to end up in an          * infinite loop due to how the JS-side
   * /
  // is structured.
  do {
    TryCatchScope try_catch(env);
    try_catch.SetVerbose(true);
    ret = cb->Call(env->context(), process, 1, &arg);
  } while (ret.IsEmpty() && env->can_call_into_js());

  ......
}
复制代码

这里只需要关注到调用了env->timers_callback_function()这个函数,这个函数其实是通过binding通过processTimers做的封装,看一看processTimers

function processTimers(now) {
    debug('process timer lists %d', now);
    nextExpiry = Infinity;

    let list;
    let ranAtLeastOneList = false;
    while (list = timerListQueue.peek()) {
      if (list.expiry > now) {
        nextExpiry = list.expiry;
        return refCount > 0 ? nextExpiry : -nextExpiry;
      }
      if (ranAtLeastOneList)
        runNextTicks(); // 执行nextTick回调
      else
        ranAtLeastOneList = true;
      listOnTimeout(list, now);  // 执行一个list里所有到期的回调
    }
    return 0;
  }
复制代码

其实就是每次拿出过期时间最近的Timeout,看看时候到时了,到时了的话,就对这个Timeout调用listOnTimeout函数。

function listOnTimeout(list, now) {
    const msecs = list.msecs;

    debug('timeout callback %d', msecs);

    var diff, timer;
    let ranAtLeastOneTimer = false;
    while (timer = L.peek(list)) {
      diff = now - timer._idleStart;

      // Check if this loop iteration is too early for the next timer.
      // This happens if there are more timers scheduled for later in the list.
      if (diff < msecs) {
        list.expiry = Math.max(timer._idleStart + msecs, now + 1);
        list.id = timerListId++;
        timerListQueue.percolateDown(1); // 调整timerListQueue顺序,把过期时间最近的放到前面去
        debug('%d list wait because diff is %d', msecs, diff);
        return;
      }

      if (ranAtLeastOneTimer)
        runNextTicks();
      else
        ranAtLeastOneTimer = true;

      // 把timer从列表中删掉
      L.remove(timer);

      ......

      let start;
      if (timer._repeat)
        start = getLibuvNow();

      try {
        const args = timer._timerArgs;
        // 执行js传入的回调
        if (args === undefined)
          timer._onTimeout();  
        else
          Reflect.apply(timer._onTimeout, timer, args);
      } finally {
        // 如果是需要重复的timer,重新insert进链表里
        if (timer._repeat && timer._idleTimeout !== -1) {
          timer._idleTimeout = timer._repeat;
          if (start === undefined)
            start = getLibuvNow();
          insert(timer, timer[kRefed], start);
        } else if (!timer._idleNext && !timer._idlePrev) {
          if (timer[kRefed])
            refCount--;
          timer[kRefed] = null;

          if (destroyHooksExist() && !timer._destroyed) {
            emitDestroy(timer[async_id_symbol]);
            timer._destroyed = true;
          }
        }
      }

      emitAfter(asyncId);
    }
    ....
  }
复制代码

可以看到,和之前的逻辑差不多,就在链表里一直拿出Timeout,直到拿出的Timeout还没到过期时间,这个时候就把当前的list,在timerListQueue往后移动。 比如timerListQueue里面本来按次序放着30ms, 50ms,70ms 3个list, 这里就会先看30ms的list,有哪些Timeout过期了的就执行掉,然后会再看50ms和70ms的,这里看完之后,这个时间循环阶段所有的timer也就都看完了。

需要特别注意的是,在看一看processTimers和listOnTimeout里,如果不是第一次执行,都会先调用runNextTicks这个函数,看名字就知道是干啥的了,执行nextTick回调

这个函数其实是封装了processTicksAndRejections,

function processTicksAndRejections() {
  let tock;
  do {
    while (tock = queue.shift()) {
      const asyncId = tock[async_id_symbol];
      emitBefore(asyncId, tock[trigger_async_id_symbol]);
      
      if (destroyHooksExist())
        emitDestroy(asyncId);

      const callback = tock.callback;
      if (tock.args === undefined)
        callback();
      else
        Reflect.apply(callback, undefined, tock.args);

      emitAfter(asyncId);
    }
    setHasTickScheduled(false);
    runMicrotasks();
  } while (!queue.isEmpty() || processPromiseRejections());
  setHasRejectionToWarn(false);
}
复制代码

可以看到,每次执行完nextTickQueue里的回调后,会调用runMicroTasks这个函数,这函数就是去执行v8的微任务队列。

看到这里,一开始的疑惑基本就解开了。node是把咱传给setTimeout的回调给封装了一下(processTimers), 封装里面,调用实际传入的回调前,都会先去检查一下nextTickQueue和microTaskQueue, 如果里面有回调的话,得先执行完。

参考

  1. New Changes to the Timers and Microtasks in Node v11.0.0 ( and above)
  2. node source code
  3. libuv source code