高效定时器的实现

2,637 阅读4分钟
原文链接: jiachuhuang.github.io

场景

linux crontab在实现定时任务中起到重要的作用,那如果我们去实现定时一个contab又如何实现呢?怎么样才能高效呢?本文会介绍Nginx内部的定时器的实现和golang的Tick定时器的实现。

常规思路

在一个死循环里每秒(精度为秒)去轮询查看有没有到期的任务需要执行。

问题一

有必要每秒轮询一次吗?下一秒、下下一秒、下下下一秒都不一定会有到期执行的任务,那样不就浪费了CPU了,为何不一直休眠到最近一个到期的任务的时间点,再去遍历呢?

问题二

用什么样的数据结构去存储所有任务,才能起到高效的轮询呢?用数组?每次都遍历一遍?很明显,不高效。在Nginx里,使用了红黑树,golang中使用了小堆。

golang tick实现

type timer struct {
    i int // heap index
    when   int64
    period int64
    f      func(interface{}, uintptr)
    arg    interface{}
    seq    uintptr
}

golang用一个timer结构体来表示一个定时任务,其中when表示到期的绝对时间戳,也是用来进行排序构建小堆的关键字段。这样子,最先到期的任务,就会是小堆堆头的那个节点了。period字段用来表示是否循环执行这个定时任务,如果是循环任务,执行完后会重新修改when为下一次执行的时间点,调整小堆。否则,把定时任务从小堆中删除。

func addtimerLocked(t *timer) {
    if t.when < 0 {
        t.when = 1<<63 - 1
    }
    t.i = len(timers.t)
    timers.t = append(timers.t, t) // 插入到小堆尾部
    siftupTimer(t.i)  // 堆调整
    if t.i == 0 {
        // 如果新加入的timer比之前最早到期的timer还早
        // 更新后台运行的检查到期timer 的 goroutine 的睡眠时间
        if timers.sleeping {
            timers.sleeping = false
            notewakeup(&timers.waitnote)
        }
        if timers.rescheduling {
            timers.rescheduling = false
            goready(timers.gp, 0)
        }
    }
    if !timers.created {
        // 如果是第一次创建的定时器维护
        // 启动一条后台goroutine来维护定时器
        timers.created = true
        go timerproc()
    }
}
func timerproc() {
    timers.gp = getg()
    for {
        lock(&timers.lock)
        timers.sleeping = false
        now := nanotime()
        delta := int64(-1)
        for {
            if len(timers.t) == 0 {
                delta = -1
                break
            }
            t := timers.t[0] // 获取小堆第一个定时任务,即最早到期的任务
            delta = t.when - now
            if delta > 0 {
                break
            }
            if t.period > 0 {
                // 如果是循环执行的定时任务,重新计算下次执行时间,重新调整堆内位置
                t.when += t.period * (1 + -delta/t.period)
                siftdownTimer(0)
            } else {
                // 如果是只执行一次,直接把它从堆内删除
                last := len(timers.t) - 1
                if last > 0 {
                    timers.t[0] = timers.t[last]
                    timers.t[0].i = 0
                }
                timers.t[last] = nil
                timers.t = timers.t[:last]
                if last > 0 {
                    siftdownTimer(0)
                }
                t.i = -1 // mark as removed
            }
            f := t.f
            arg := t.arg
            seq := t.seq
            unlock(&timers.lock)
            if raceenabled {
                raceacquire(unsafe.Pointer(t))
            }
            // 找到到期的任务,并执行
            f(arg, seq)
            lock(&timers.lock)
        }
        if delta < 0 || faketime > 0 {
            // No timers left - put goroutine to sleep.
            timers.rescheduling = true
            goparkunlock(&timers.lock, "timer goroutine (idle)", traceEvGoBlock, 1)
            continue
        }
        // 休眠到最早到期的定时任务
        timers.sleeping = true
        noteclear(&timers.waitnote)
        unlock(&timers.lock)
        notetsleepg(&timers.waitnote, delta)
    }
}

Nginx定时器

Nginx运用定时器的地方很多,例如读取http头部超时。Nginx使用红黑树维护所有定时任务事件,进程在每次事件轮询返回后,都会检查一遍红黑树,处理过期的定时事件,设置ngx_event_t结构体里的timedout字段为1。

// ngx_event_timer.h
ngx_int_t ngx_event_timer_init(ngx_log_t *log); // 初始化定时任务管理器
ngx_msec_t ngx_event_find_timer(void);          // 查找定时恩物
void ngx_event_expire_timers(void);             // 处理过期的定时任务
ngx_int_t ngx_event_no_timers_left(void);
extern ngx_rbtree_t  ngx_event_timer_rbtree;    // 全局变量,维护保存所有定时任务时间
  • Nginx woker进程检查处理过期的定时事件

    // worker的事件和定时任务处理函数
    void
    ngx_process_events_and_timers(ngx_cycle_t *cycle)
    {
        ngx_uint_t  flags;
        ngx_msec_t  timer, delta;
        // ...
        delta = ngx_current_msec;
        (void) ngx_process_events(cycle, timer, flags);
        delta = ngx_current_msec - delta;
        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "timer delta: %M", delta);
        // 事件处理
        ngx_event_process_posted(cycle, &ngx_posted_accept_events);
        if (ngx_accept_mutex_held) {
            ngx_shmtx_unlock(&ngx_accept_mutex);
        }
        if (delta) {
            // 处理过期的定时任务
            ngx_event_expire_timers();
        }
        // ...
    }
  • ngx_event_expire_timers过期定时任务处理

    void
    ngx_event_expire_timers(void)
    {
        ngx_event_t        *ev;
        ngx_rbtree_node_t  *node, *root, *sentinel;
        sentinel = ngx_event_timer_rbtree.sentinel;
        // 循环查找处理当前所有过期的时间
        for ( ;; ) {
            root = ngx_event_timer_rbtree.root;
            if (root == sentinel) {
                return;
            }
            node = ngx_rbtree_min(root, sentinel);  // 在红黑树种查找当前时间最小的节点
            if ((ngx_msec_int_t) (node->key - ngx_current_msec) > 0) {
                // 如果时间最小的节点都还没过期,直接返回
                return;
            }
            ev = (ngx_event_t *) ((char *) node - offsetof(ngx_event_t, timer));
            ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                           "event timer del: %d: %M",
                           ngx_event_ident(ev->data), ev->timer.key);
            ngx_rbtree_delete(&ngx_event_timer_rbtree, &ev->timer); // 把时间从红黑树种删除
            ev->timer_set = 0;
            ev->timedout = 1;  // 把时间设为过期
            ev->handler(ev);   // 回调事件处理函数
        }
    }
  • 回调事件处理函数,拿http头部处理函数(ngx_http_process_request_headers)来举例吧

    static void
    ngx_http_process_request_headers(ngx_event_t *rev)
    {
        // ...
        
        c = rev->data;
        r = c->data;
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, rev->log, 0,
                       "http process request header line");
        // 如果过期了,在上一步ngx_event_expire_timers中,timedout字段会被设为1,表示头部处理超时了,就给客户端错误提示
        if (rev->timedout) {
            ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out");
            c->timedout = 1;
            // 超时,关闭链接,发送request timeout错误
            ngx_http_close_request(r, NGX_HTTP_REQUEST_TIME_OUT);
            return;
        }
        // ...
    }

总结

实现一个定时任务调度器,需要一个进程(线程或者协程) + 一个高效数据结构(满足高效查询、频繁插入删除),例如Nginx使用的红黑树、golang使用的小堆,或者skip list(跳跃表),跳跃表是有序链表,同时插入、删除、查找性能效率也不俗,实现起来还容易,可以考虑下。