Golang实现延迟队列(DelayQueue)

7,012 阅读7分钟

背景

延迟队列是一种特殊的队列,元素入队时需要指定到期时间(或延迟时间),从队头出队的元素必须是已经到期的,而且最先到期的元素最先出队,也就是队列里面的元素是按照到期时间排序的,添加元素和从队头出队的时间复杂度是O(log(n))。

由于以上性质,延迟队列一般可以用于以下场景(定时任务、延迟任务):

  • 缓存:用户淘汰过期元素
  • 通知:在指定时间通知用户,比如会议开始前30分钟
  • 订单:30分钟未支付取消订单
  • 超时:服务器自动断开太长时间没有心跳的连接

其实在Golang中是自带定时器的,也就是time.After()time.AfterFunc()等函数,它们的性能也是非常好的,随着Golang版本升级还会优化。但是对于某些场景来说确实不够方便,比如缓存场景我们需要能够支持随机删除定时器,随机重置过期时间,更加灵活的删除一小批过期元素。而且像Kafka的时间轮算法(TimeWheel)里面也用到了延迟队列,因此还是有必要了解下如何实现延迟队列。

原理

延迟队列每次出队的是最小到期时间的元素,而堆就是用来获取最值的数据结构。使用堆我们可以实现O(log(n))时间复杂度添加元素和移除最小到期时间元素。

随机删除

有时候延迟队列还需要具有随机删除元素的能力,可以通过以下方式实现:

  • 元素添加删除标记字段:堆中每个元素都添加一个删除标记字段,并把这个元素的地址返回给用户,用户就可以标记元素的这个字段为true,这样元素到达堆顶时如果判断到这个字段为true就会被清除,而延迟队列里的元素逻辑上是一定会到达堆顶的(因为时间会流逝)。这是一种懒删除的方式。
  • 元素添加堆中下标字段(或用map记录下标):堆中每个元素都添加一个堆中下标字段,并把这个元素的地址返回给用户,这样我们就可以通过这个元素里面记录的下标快速定位元素在堆中的位置,从而删除元素。详细可以看文章如何实现一个支持O(log(n))随机删除元素的堆

重置元素到期时间

如果需要重置延迟队列里面元素的到期时间,则必须知道元素在堆中的下标,因为重置到期时间之后必须对堆进行调整,因此只能是元素添加堆中下标字段

Golang实现

这里我们实现一个最简单的延迟队列,也就是不支持随机删除元素和重置元素的到期时间,因为有些场景只需要添加元素和获取到期元素这两个功能,比如Kafka中的时间轮,而且这种简单实现性能会高一点。

代码地址

数据结构

主要的结构可以看到就是一个heap,entry是每个元素在堆中的表示,value是具体的元素值,expiration是为了堆中元素根据到期时间排序。

mutex是一个互斥锁,主要是保证操作并发安全。

sleeping则是表示Take(),也就是阻塞获取元素操作,是否在等待队列不为空或者有更小到期时间元素加入。这样Push(),也就是添加元素操作,才知道是否去唤醒Take()。(重点)

wakeup是一个通道,通过它实现添加元素的时候唤醒等待的Take()。(重点)

简单来说就是sleeping是表示是否要唤醒Take(),唤醒操作则是通过wakeup通道实现。

type entry[T any] struct {
	value      T
	expiration time.Time // 到期时间
}

// 延迟队列
type DelayQueue[T any] struct {
	h *heap.Heap[*entry[T]]
	// // 保证并发安全
	mutex sync.Mutex
	// 表示Take()是否正在等待队列不为空或更早到期的元素
	// 0表示Take()没在等待,1表示Take()在等待
	sleeping int32
	// 唤醒通道
	wakeup chan struct{}
}

// 创建延迟队列
func New[T any]() *DelayQueue[T] {
	return &DelayQueue[T]{
		h: heap.New(nil, func(e1, e2 *entry[T]) bool {
			return e1.expiration.Before(e2.expiration)
		}),
		wakeup: make(chan struct{}),
	}
}

实现原理

Take()的时候如果队列已经没有元素,或者没有元素到期,那么协程就需要挂起等待。而被唤醒的条件是元素到期队列不为空或者有更小到期时间元素加入

其中元素到期协程在Take()时发现堆顶元素还没到期,因此这个条件可以自己构造并等待。但是条件队列不为空有更小到期时间元素加入则需要另外一个协程在Push()时才能满足,因此必须通过一个中间结构来进行协程间通信,一般Golang里面会使用Channel来实现。而Take()是否在等待则是通过sleeping来表示。

添加元素

一开始加了一个互斥锁,避免并发冲突,然后把元素加到堆里。

因为我们Take()操作在不满足条件时会去设置sleeping为1表示正在等待Push()来唤醒它,并在wakeup通道阻塞读取。因为sleeping可能被Take()和Push()同时操作,因此使用CAS()来进行设置,也就是如果sleeping本来是1,我们就设置为0,然后往wakeup写入一个元素,表示如果有Take()在等待,则唤醒它。

// 添加延迟元素到队列
func (q *DelayQueue[T]) Push(value T, delay time.Duration) {
	q.mutex.Lock()
	defer q.mutex.Unlock()
	entry := &entry[T]{
		value:      value,
		expiration: time.Now().Add(delay),
	}
	q.h.Push(entry)
	// 唤醒等待的Take()
	// 这里表示新添加的元素到期时间是最早的,或者原来队列为空
	// 因此必须唤醒等待的Take(),因为可以拿到更早到期的元素
	if q.h.Peek() == entry {
		// 把sleeping从1修改成0,也就是唤醒等待的Take()
		if atomic.CompareAndSwapInt32(&q.sleeping, 1, 0) {
			q.wakeup <- struct{}{}
		}
	}
}

阻塞获取元素

这里先判断堆是否有元素,如果有获取堆顶元素,然后判断是否已经到期,如果到期则直接出堆并返回。

否则等待直到超时或者元素到期或者有新的元素到达。

在这里需要设置sleeping为1表示Take()正在等待。

// 等待直到有元素到期
// 或者ctx被关闭
func (q *DelayQueue[T]) Take(ctx context.Context) (T, bool) {
	for {
		var timer *time.Timer
		q.mutex.Lock()
		// 有元素
		if !q.h.Empty() {
			// 获取元素
			entry := q.h.Peek()
			now := time.Now()
			if now.After(entry.expiration) {
				q.h.Pop()
				q.mutex.Unlock()
				return entry.value, true
			}
			// 到期时间,使用time.NewTimer()才能够调用Stop(),从而释放定时器
			timer = time.NewTimer(entry.expiration.Sub(now))
		}
		// 走到这里表示需要等待了,设置为1告诉Push()在有新元素时要通知
		atomic.StoreInt32(&q.sleeping, 1)
		q.mutex.Unlock()

		// 不为空,需要同时等待元素到期,并且除非timer到期,否则都需要关闭timer避免泄露
		if timer != nil {
			select {
			case <-q.wakeup: // 新的更快到期元素
				timer.Stop()
			case <-timer.C: // 首元素到期
				// 设置为0,如果原来也为0表示有Push()正在q.wakeup被阻塞
				if atomic.SwapInt32(&q.sleeping, 0) == 0 {
					// 避免Push()的协程被阻塞
					<-q.wakeup
				}
			case <-ctx.Done(): // 被关闭
				timer.Stop()
				var t T
				return t, false
			}
		} else {
			select {
			case <-q.wakeup: // 新的更快到期元素
			case <-ctx.Done(): // 被关闭
				var t T
				return t, false
			}
		}
	}
}

Channel方式阻塞读取

Golang里面可以使用Channel进行流式消费,因此简单包装一个Channel形式的阻塞读取接口,给通道一点缓冲区大小可以带来更好的性能。

// 返回一个通道,输出到期元素
// size是通道缓存大小
func (q *DelayQueue[T]) Channel(ctx context.Context, size int) <-chan T {
	out := make(chan T, size)
	go func() {
		for {
			entry, ok := q.Take(ctx)
			if !ok {
				close(out)
				return
			}
			out <- entry
		}
	}()
	return out
}

使用方式

for entry := range q.Channel(context.Background(), 10) {
    // do something
}

性能测试

这里进行一个简单的性能测试,也就是先添加元素,然后等待到期后全部拿出来。

func BenchmarkPushAndTake(b *testing.B) {
	q := New[int]()
	b.ResetTimer()
        
        // 添加元素
	for i := 0; i < b.N; i++ {
		q.Push(i, time.Duration(i))
	}
        
        // 等待全部元素到期
	b.StopTimer()
	time.Sleep(time.Duration(b.N))
	b.StartTimer()

        // 获取元素
	for i := 0; i < b.N; i++ {
		_, ok := q.Take(context.Background())
		if !ok {
			b.Errorf("want %v, but %v", true, ok)
		}
	}
}

测试结果:

BenchmarkPushAndTake-8           2361145               454.7 ns/op            75 B/op          1 allocs/op

总结

堆实现的延迟队列是一种实现起来比较简单的定时器(当然阻塞读取Take()是比较复杂的),由于时间复杂度是O(log(n)),因此可以满足定时任务数量不是特别多的场景。堆实现的延迟队列也是可以随机删除元素的,可以根据具体任务选择是否实现。如果对定时器性能要求比较敏感的话可以选择使用时间轮实现定时器,它可以在O(1)的时间复杂度添加和删除一个定时器,具体可以阅读实现代码