探究 Go 的高级特性之 【redis分布式限流器】

1,233 阅读11分钟

限 流

限流的目的是通过对并发访问请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理。通过对并发(或者一定时间窗口内)请求进行限速来保护系统,一旦达到限制速率则拒绝服务(定向到错误页或告知资源没有了)、排队等待(比如秒杀、评论、下单)、降级(返回兜底数据或默认数据)。

一般来说,限流的常用处理手段有:

  • 计数器
  • 滑动窗口
  • 漏桶
  • 令牌桶

固定窗口限流

固定窗口法是限流算法里面最简单的,比如我想限制1分钟以内请求为100个,从现在算起的一分钟内,请求就最多就是100个,这分钟过完的那一刻把计数器归零,重新计算,周而复始。

image.png

def can_pass_fixed_window(user, action, time_zone=60, times=30):
    """
    :param user: 用户唯一标识
    :param action: 用户访问的接口标识(即用户在客户端进行的动作)
    :param time_zone: 接口限制的时间段
    :param time_zone: 限制的时间段内允许多少请求通过
    """
    key = '{}:{}'.format(user, action)
    # redis_conn 表示redis连接对象
    count = redis_conn.get(key)
    if not count:
        count = 1
        redis_conn.setex(key, time_zone, count)
    if count < times:
        redis_conn.incr(key)
        return True

    return False

具体功能如下:

  • 根据用户标识和接口标识生成一个用于在 Redis 中存储计数器的键。
  • 通过 Redis 连接对象查询键对应的计数器值。
  • 如果计数器值不存在,说明是该用户的第一次请求,将计数器值设置为 1,并设置计数器的过期时间为时间窗口长度。
  • 如果计数器值小于指定的请求次数上限,将计数器的值加 1,并返回 True 表示请求允许通过。
  • 如果计数器值达到或超过请求次数上限,返回 False 表示请求被限制。
package redis

import (
 "context"
 "fmt"
 "strconv"
 "time"

 "github.com/go-redis/redis/v8"
)

type FixedWindowRateLimiter struct {
 RedisClient *redis.Client
 WindowSize  int
 MaxRequests int
 WindowKey   string
 CountKey    string
 ctx         context.Context
}

func NewFixedWindowRateLimiter(redisClient *redis.Client, windowSize, maxRequests int, windowKey, countKey string) *FixedWindowRateLimiter {
 return &FixedWindowRateLimiter{
  RedisClient: redisClient,
  WindowSize:  windowSize,
  MaxRequests: maxRequests,
  WindowKey:   windowKey,
  CountKey:    countKey,
  ctx:         context.Background(),
 }
}

func (limiter *FixedWindowRateLimiter) Allow() bool {
 // 获取当前时间戳
 currentTime := time.Now().Unix()

 // 判断当前时间窗口是否重置
 resetTimeStr, err := limiter.RedisClient.Get(limiter.ctx, limiter.WindowKey).Result()
 if err != nil && err != redis.Nil {
  panic(err)
 }
 if resetTimeStr == "" {
  // 如果重置时间不存在,说明窗口尚未初始化,设置当前时间为重置时间,并设置计数器初始值为1
  resetTime := fmt.Sprintf("%v", currentTime+int64(limiter.WindowSize))
  limiter.RedisClient.Set(limiter.ctx, limiter.WindowKey, resetTime, 0)
  limiter.RedisClient.Set(limiter.ctx, limiter.CountKey, 1, 0)
 } else {
  resetTime, _ := strconv.ParseInt(resetTimeStr, 10, 64)
  if currentTime > resetTime {
   // 如果当前时间超过了重置时间,说明窗口需要重置,设置新的重置时间并将计数器重置为1
   resetTime := fmt.Sprintf("%v", currentTime+int64(limiter.WindowSize))
   limiter.RedisClient.Set(limiter.ctx, limiter.WindowKey, resetTime, 0)
   limiter.RedisClient.Set(limiter.ctx, limiter.CountKey, 1, 0)
  } else {
   // 如果当前时间未超过重置时间,说明窗口仍在有效期内,增加计数器的值
   limiter.RedisClient.Incr(limiter.ctx, limiter.CountKey)
  }
 }

 // 获取当前计数器的值
 totalRequests, err := limiter.RedisClient.Get(limiter.ctx, limiter.CountKey).Int64()
 if err != nil {
  panic(err)
 }

 // 检查请求数量是否超过限制
 if totalRequests > int64(limiter.MaxRequests) {
  return false
 }
 return true
}

func main() {
 // 创建Redis客户端
 redisClient := redis.NewClient(&redis.Options{
  Addr:     "localhost:6379",
  Password: "", // 如果设置了密码
  DB:       0,  // 使用默认的数据库
 })

 // 创建固定窗口限流器实例
 limiter := NewFixedWindowRateLimiter(redisClient, 10, 100, "fixedwindow", "fixedwindow:count")

 // 测试请求是否通过限流
 if limiter.Allow() {
  fmt.Println("请求通过")
 } else {
  fmt.Println("请求超出限制")
 }
}

当一个时间窗口结束时,下一个时间窗口立即开始,这就意味着窗口切换是瞬间完成的。在窗口切换的瞬间,如果有大量请求同时到达,就会出现流量的剧烈波动。例如,在一个窗口结束时,有很多请求被允许通过,而下一个窗口开始时,大量请求被阻塞。这种波动可能会对系统造成压力,导致延迟增加,甚至出现系统故障。

为了解决固定窗口算法的临界问题,一种改进的算法是滑动窗口算法。滑动窗口算法从固定窗口算法引申而来,它解决了窗口切换瞬间流量剧烈波动的问题。滑动窗口算法中,窗口不是瞬间切换的,而是通过一个滑动窗口的概念,使窗口平滑地移动。这样可以更均匀地限制流量,减少突发请求对系统的冲击

滑动窗口

滑动窗口法,简单来说就像是一扇不断移动的窗户,随着时间的推移,窗户会不断前进。窗户内有一个计数器,它持续记录窗户内发生的请求数量,这样就可以确保在任何时间段内请求数量不会超过最大允许的值。例如,假设当前的时间窗口是从 0 秒开始到 60 秒结束,窗户内的请求数是 40。当时间过去了 10 秒后,窗户的位置就向前移动了,变成了从 10 秒开始到 70 秒结束,而窗户内的请求数变成了 60。

为了实现滑动窗口的功能和计数器,我们可以利用 Redis 的有序集合(sorted set)来完成。这样,我们可以使用以毫秒级时间戳表示的 score 值来表示每个请求的发生时间。通过计算当前时间戳减去时间窗口大小,我们可以确定窗户的边界,并通过对 score 值进行范围筛选,就可以筛选出在窗户内的请求记录。最后,我们只需要统计窗户内的请求数量,并进行判断,就可以进行请求限制的逻辑。

package redis

import (
   "fmt"
   "github.com/go-redis/redis/v8"
   "strconv"
   "time"
)

type RateLimiter struct {
   client   *redis.Client
   rate     int           // 固定的窗口大小
   interval time.Duration // 时间窗口大小
   key      string        // Redis键名
}

func NewRateLimiter(rate int, interval time.Duration, key string) *RateLimiter {
   client := redis.NewClient(&redis.Options{
      Addr: "localhost:6379", // Redis服务器地址和端口号
   })

   return &RateLimiter{
      client:   client,
      rate:     rate,
      interval: interval,
      key:      key,
   }
}

func (limiter *RateLimiter) Allow() (bool, error) {
   now := time.Now()

   pipe := limiter.client.Pipeline()

   pipe.ZRemRangeByScore(limiter.client.Context(), limiter.key, "-inf", strconv.FormatInt(now.Add(-limiter.interval).Unix(), 10))
   pipe.ZCard(limiter.client.Context(), limiter.key)
   _, err := pipe.ZAdd(limiter.client.Context(), limiter.key, &redis.Z{
      Score:  float64(now.Unix()),
      Member: fmt.Sprintf("%d", now.UnixNano()),
   }).Result()
   if err != nil {
      return false, err
   }

   cmds, err := pipe.Exec(limiter.client.Context())
   if err != nil {
      return false, err
   }

   count, _ := cmds[1].(*redis.IntCmd).Result()
   return int(count) <= limiter.rate, nil
}
func TestSoildRateLimiter(t *testing.T) {
   limiter := NewRateLimiter(5, time.Second*60, "my-rate-limiter")

   for i := 0; i < 10; i++ {
      allowed, err := limiter.Allow()
      if err != nil {
         fmt.Println("Error:", err)
         continue
      }

      if allowed {
         fmt.Println("Request passed.")
      } else {
         fmt.Println("Request blocked.")
      }

      time.Sleep(200 * time.Millisecond)
   }
}

image.png

ZRemRangeByScore:使用 ZRemRangeByScore 命令从 Redis 有序集合中移除指定时间范围之前的所有成员。在这里,我们使用 -inf 作为最小分数,即移除所有时间窗口之前的记录。

ZCard:使用 ZCard 命令获取 Redis 有序集合中的成员数量,即当前时间窗口内的请求数量。

ZAdd:使用 ZAdd 命令向 Redis 有序集合中添加新的成员,即当前请求的时间戳。时间戳的分数(score)设置为当前时间的 Unix 时间戳,成员(member)设置为当前时间的纳秒级表示。

pipe.Exec():执行 Redis 管道中的所有命令,包括 ZRemRangeByScoreZCard 和 ZAddExec() 方法会返回执行结果的切片。

检查计数器是否超过限流阈值:通过获取 ZCard 命令返回的计数器值,将该值与限流器设置的固定窗口大小进行比较。如果计数器值大于固定窗口大小,则代表超过了限流阈值,请求将被阻塞;否则,请求将被允许通过。

漏桶限流

image.png 一个固定容量的桶,有水流进来,也有水流出去。对于流进来的水来说,我们无法预计一共有多少水会流进来,也无法预计水流的速度。但是对于流出去的水来说,这个桶可以固定水流出的速率(处理速度),从而达到 流量整形 和 流量控制 的效果。

package redis

import (
	"context"
	"errors"
	"fmt"
	"strconv"
	"time"

	"github.com/go-redis/redis/v8"
)

var (
	ErrAcquireFailed = errors.New("acquire failed")
)

type LeakyBucketLimiter struct {
	peakLevel       int
	currentVelocity int
	key             string
	client          *redis.Client
}

func NewLeakyBucketLimiter(client *redis.Client, key string, peakLevel, currentVelocity int) *LeakyBucketLimiter {
	return &LeakyBucketLimiter{
		peakLevel:       peakLevel,
		currentVelocity: currentVelocity,
		key:             key,
		client:          client,
	}
}

func (l *LeakyBucketLimiter) acquireToken(ctx context.Context) (bool, error) {
	pipe := l.client.TxPipeline()
	pipe.HGet(ctx, l.key, "lastTime")
	pipe.HGet(ctx, l.key, "currentLevel")
	_, err := pipe.Exec(ctx)
	if err != nil {
		return false, err
	}

	result, err := pipe.Exec(ctx)
	if err != nil {
		return false, err
	}

	lastTimeStr, _ := result[0].(*redis.StringCmd).Result()
	currentLevelStr, _ := result[1].(*redis.StringCmd).Result()

	lastTime, _ := strconv.ParseInt(lastTimeStr, 10, 64)
	currentLevel, _ := strconv.ParseInt(currentLevelStr, 10, 64)

	now := time.Now().Unix()
	interval := now - lastTime
	newLevel := currentLevel - (interval * int64(l.currentVelocity))

	if newLevel < 0 {
		newLevel = 0
	}

	if newLevel >= int64(l.peakLevel) {
		return false, nil
	}

	newLevel++
	pipe.HSet(ctx, l.key, "currentLevel", newLevel)
	pipe.HSet(ctx, l.key, "lastTime", now)
	pipe.Expire(ctx, l.key, time.Duration(l.peakLevel/l.currentVelocity)*time.Second)
	_, err = pipe.Exec(ctx)
	if err != nil {
		return false, err
	}

	return true, nil
}

func (l *LeakyBucketLimiter) TryAcquire(ctx context.Context) error {
	success, err := l.acquireToken(ctx)
	if err != nil {
		return err
	}

	if !success {
		return ErrAcquireFailed
	}

	return nil
}

func main() {
	// 创建 Redis 客户端连接
	client := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379", // Redis 服务器地址
		Password: "",               // Redis 服务器密码
		DB:       0,                // Redis 数据库
	})

	// 创建漏桶限流器
	limiter := NewLeakyBucketLimiter(client, "my_bucket", 100, 10) // 最高水位为 100,水流速度为 10

	// 尝试获取令牌
	err := limiter.TryAcquire(context.Background())
	if err != nil {
		if err == ErrAcquireFailed {
			// 请求失败,超出限流水位
			fmt.Println("请求失败,超出限流水位")
		} else {
			// 其他错误处理
			fmt.Println(err.Error())
		}
	} else {
		// 请求成功
		fmt.Println("请求成功")
	}
}

LeakyBucketLimiter 结构体:

  • 该结构体用来表示漏桶限流器,包括漏桶的配置参数(peakLevel、currentVelocity)、Redis 客户端连接(client)以及漏桶的键值(key)等。

NewLeakyBucketLimiter 函数:

  • 该函数用于创建一个新的 LeakyBucketLimiter 实例,接受 Redis 客户端连接、键值、漏桶的最高水位和水流速度作为参数。
  • 通过该函数可以创建多个不同配置的漏桶限流器实例。

acquireToken 方法:

  • 该方法用于从漏桶中获取令牌,控制请求的速率。
  • 通过 Redis 的事务管道(TxPipeline)来操作多个命令,获取当前漏桶状态、计算新的漏桶状态并更新。
  • 根据获取令牌的结果返回 true 或 false。

TryAcquire 方法:

  • 该方法对 acquireToken 方法的结果进行处理,如果获取令牌成功,则返回 nil,否则返回 ErrAcquireFailed 错误。

漏桶算法有以下特点

  • 漏桶具有固定容量,出水速率是固定常量(流出请求)
  • 如果桶是空的,则不需流出水滴
  • 可以以任意速率流入水滴到漏桶(流入请求)
  • 如果流入水滴超出了桶的容量,则流入的水滴溢出(新请求被拒绝)

漏桶限制的是常量流出速率(即流出速率是一个固定常量值),所以最大的速率就是出水的速率,不能出现突发流量。

令牌桶限流

image.png

令牌桶算法(Token Bucket)是网络流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一种算法。典型情况下,令牌桶算法用来控制发送到网络上的数据的数目,并允许突发数据的发送。

我们有一个固定的桶,桶里存放着令牌(token)。一开始桶是空的,系统按固定的时间(rate)往桶里添加令牌,直到桶里的令牌数满,多余的请求会被丢弃。当请求来的时候,从桶里移除一个令牌,如果桶是空的则拒绝请求或者阻塞。

package redis

import (
	"context"
	"github.com/go-redis/redis/v8"
	"log"
	"strconv"
	"time"
)

type TokenBucket struct {
	rdb            *redis.Client
	leakyBucketKey string
	capacity       int64 // 漏桶容量
	rate           int64 // 每秒的令牌添加速率
}

func NewTokenLeakyBucket(address, password, key string, capacity, rate int64) *TokenBucket {
	rdb := redis.NewClient(&redis.Options{
		Addr:     address,
		Password: password,
		DB:       0,
	})

	return &TokenBucket{
		rdb:            rdb,
		leakyBucketKey: key,
		capacity:       capacity,
		rate:           rate,
	}
}

func (lb *TokenBucket) Start() {
	go func() {
		for {
			time.Sleep(time.Second / time.Duration(lb.rate))
			err := lb.rdb.ZAddNX(context.Background(), lb.leakyBucketKey, &redis.Z{
				Score:  float64(time.Now().UnixNano()),
				Member: "token",
			}).Err()
			if err != nil {
				log.Println("Failed to add token:", err)
			}
		}
	}()
}

func (lb *TokenBucket) ProcessRequest() bool {
	result, err := lb.rdb.ZRangeByScore(context.Background(), lb.leakyBucketKey, &redis.ZRangeBy{
		Min:    "0",
		Max:    strconv.FormatInt(time.Now().UnixNano(), 10),
		Offset: 0,
		Count:  1,
	}).Result()

	if err != nil {
		log.Println("Failed to get token:", err)
		return false
	}

	if len(result) > 0 {
		_, err := lb.rdb.ZRem(context.Background(), lb.leakyBucketKey, result[0]).Result()
		if err != nil {
			log.Println("Failed to remove token:", err)
		}
		return true
	}

	return false
}

func (lb *TokenBucket) Close() {
	if err := lb.rdb.Close(); err != nil {
		log.Println("Failed to close Redis connection:", err)
	}
}

func main() {
	leakyBucket := NewTokenLeakyBucket("localhost:6379", "", "myleakybucket", 100, 10)
	defer leakyBucket.Close()

	leakyBucket.Start()

	for i := 0; i < 20; i++ {
		if leakyBucket.ProcessRequest() {
			log.Println("Processed request:", i)
			// TODO: 执行具体请求操作
		} else {
			log.Println("Rejected request:", i)
			// TODO: 执行额外处理逻辑,如返回错误信息等
		}

		time.Sleep(time.Second) // 请求之间的间隔
	}
}

在代码中,TokenBucket 结构体表示令牌桶限流器,包含了Redis客户端连接实例 rdb、漏桶的键值 leakyBucketKey、桶的容量 capacity 和每秒钟添加的令牌速率 rate

NewTokenLeakyBucket 函数用于创建一个新的令牌桶限流器实例,参数包括Redis服务器地址、密码、键值、容量和速率。函数内部创建了Redis客户端连接实例,并返回令牌桶限流器实例。

Start 方法用于启动一个goroutine,以固定速率往桶中添加令牌。该方法会通过 time.Sleep 来控制添加令牌的时间间隔,并使用Redis的有序集合进行添加令牌的操作。

ProcessRequest 方法用于处理请求,检查桶中是否有令牌可用。该方法通过使用 ZRangeByScore 命令从有序集合中获取最早的一个令牌,并使用 ZRem 命令将该令牌从集合中移除。

Close 方法用于关闭与Redis的连接。

在 main 函数中,创建了一个令牌桶限流器实例 leakyBucket,通过调用 NewTokenLeakyBucket 函数进行初始化。

然后调用 leakyBucket.Start 方法来启动添加令牌的goroutine。

接下来,使用一个循环来模拟请求的过程,每次调用 leakyBucket.ProcessRequest 方法来处理请求。如果方法返回 true,表示请求通过;如果返回 false,表示请求被限流。

令牌桶有以下特点

  • 令牌按固定的速率被放入令牌桶中
  • 桶中最多存放 B 个令牌,当桶满时,新添加的令牌被丢弃或拒绝
  • 如果桶中的令牌不足 N 个,则不会删除令牌,且请求将被限流(丢弃或阻塞等待)

令牌桶限制的是平均流入速率(允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,4个令牌...),并允许一定程度突发流量。