阅读 989

【临实战】使用 Python 从 Redis 中删除 4000W 个 KEY

本文主要涉及 Redis 的以下两个操作和其 Python 实现,目录:
  • SCAN 命令
  • DEL 命令
  • 使用 Python SCAN
  • 使用 Python DEL
  • 成果展示

SCAN 命令

SCAN 命令及相关的 SSCAN、HSCAN 和 ZSCAN 命令都用于增量迭代(incrementally iterate)一个集合的元素(a collection of elements):
  • SCAN 用于迭代当前数据库中的数据库键
  • SSCAN 用于迭代集合键中的元素
  • HSCAN 用于迭代哈希键中的键值对
  • ZSCAN 用于迭代有序集合中的元素(包括元素分值和元素分值)
以上四列命令都支持增量迭代,每次执行都会返回少量元素,所以他们都可以用于生产环境,而不会出现像 KEYS、SMEMBERS 命令一样 -- 可能会阻塞服务器

不过,增量式迭代命令也不是没有缺点的:
举个例子,使用 SMEMBERS 命令可以返回集合键当前包含的所有元素,但是对于 SCAN 这类增量迭代命令来说,因为在堆键进行增量迭代的过程中,键可能会被改变,所以增量式迭代命令只能对被返回的元素提供有限的保证(offer limited guarantees about the returned elements)。

因为 SCAN、SSCAN、HSCAN 和 ZSCAN 命令的工作方式都非常相似,但是要记住:
  • SSCAN、HSCAN 和 ZSCAN 命令的第一个参数总是一个数据库键;
  • SCAN 命令则不需要在第一个参数提供任何数据库键 -- 因为它迭代的是当前数据库中的所有数据库键。

SCAN 命令的基本用法
SCAN 命令是一个基于游标的迭代器(cursor based iterator):
SCAN 命令每次被调用后,都会向用户返回一个新的游标,用户在下次迭代时需要使用这个新游标作为 SCAN 命令的游标参数,以此来延续之前的迭代过程。
当 SCAN 命令的游标参数被设置为 0 时,服务器开始一次新的迭代,而当服务器向用户返回值为 0 的游标时,表示迭代结束。
示例:
redis 127.0.0.1:6379> scan 0
1) "17"
2)  1) "key:12"
    2) "key:8"
    3) "key:4"
    4) "key:14"
    5) "key:16"
    6) "key:17"
    7) "key:15"
    8) "key:10"
    9) "key:3"
    10) "key:7"
    11) "key:1"

redis 127.0.0.1:6379> scan 17
1) "0"
2) 1) "key:5"
   2) "key:18"
   3) "key:0"
   4) "key:2"
   5) "key:19"
   6) "key:13"
   7) "key:6"
   8) "key:9"
   9) "key:11"复制代码
上面的例子中,第一次迭代用 0 作为游标,表示开始第一次迭代。
第二次迭代使用第一次迭代时返回的游标,即:17。
从示例可以看出,SCAN 命令的返回是一个两个元素的数组,第一个元素是新游标,第二个元素也是一个数组,包含有所被包含的元素。
第二次调用 SCAN 命令时,返回游标 0,这表示迭代已经结束了,整个数据集(collection)已经被完整遍历过一遍了。
这个过程被称为一次完整遍历(full iteration)。

精简一下内容,补充三点:
  1. 因为 SCAN 命令仅仅使用游标来记录迭代状态,所以在迭代过程中,如果这个数据集的元素有增减,如果是减,不保证元素不返回;如果是增,也不保证一定返回;而且在某种情况下同一个元素还可能被返回多次。所以对迭代返回的元素所执行的操作最好可以重复执行多次(幂等)。
  2. 增量迭代命令不保证每次迭代所返回的元素数量(没扫到嘛),但是我们可以使用 COUNT 选项对命令的行为进行一定程度的调整。COUNT 参数的默认值为 10,在迭代一个足够大的、由哈希表实现的数据库、集合键、哈希键或者有序集合键时,如果用户没有使用 MATCH 选项,那么命令返回的数量通常和 COUNT 选项指定的一样,或者多一些(😓),在迭代编码为整数集合(intset:一个由整数值构成的小集合)或编码为压缩列表(ziplist:由不同值构成的一个小哈希或者一个小有序集合)时,会无视 COUNT 选项指定的值,在第一次迭代就将数据集的所有元素都返回给用户。
  3. MATCH 选项,直接看示例吧,如下
示例:
redis 127.0.0.1:6379> sadd myset 1 2 3 foo foobar feelsgood
(integer) 6

redis 127.0.0.1:6379> sscan myset 0 match f*
1) "0"
2) 1) "foo"
   2) "feelsgood"
   3) "foobar"复制代码
注意:对元素的模式匹配工作是在命令从数据集中取出元素之后,向客户端返回元素之前进行的,所以有可能返回空
示例:
redis 127.0.0.1:6379> scan 0 MATCH *11*
1) "288"
2) 1) "key:911"

redis 127.0.0.1:6379> scan 288 MATCH *11*
1) "224"
2) (empty list or set)

redis 127.0.0.1:6379> scan 224 MATCH *11*
1) "80"
2) (empty list or set)

redis 127.0.0.1:6379> scan 80 MATCH *11*
1) "176"
2) (empty list or set)

redis 127.0.0.1:6379> scan 176 MATCH *11* COUNT 1000
1) "0"
2)  1) "key:611"
    2) "key:711"
    3) "key:118"
    4) "key:117"
    5) "key:311"
    6) "key:112"
    7) "key:111"
    8) "key:110"
    9) "key:113"
   10) "key:211"
   11) "key:411"
   12) "key:115"
   13) "key:116"
   14) "key:114"
   15) "key:119"
   16) "key:811"
   17) "key:511"
   18) "key:11"复制代码
注意:最后一次迭代,通过 COUNT 选项指定为 1000 强制命令为本次迭代扫描更多元素,从而使返回的元素也变多了。

DEL 命令

这个比较简单,删除给定的一个或者多个 key
redis> SET name "redis"
OK
redis> SET type "key-value store"
OK
redis> SET website "redis.com"
OK
redis> DEL name type website
(integer) 3复制代码

使用 Python SCAN

安装 redis 包
pip install redis复制代码
完整代码示例:
import redis

pool=redis.ConnectionPool(host='redis_hostname', port=6379, max_connections=100)
r = redis.StrictRedis(connection_pool=pool)

cursor_number, keys = r.execute_command('scan', 0, "count", 200000)

while True:
    if cursor_number == 0:
        # 结束一次完整的比遍历
        break
    cursor_number, keys = r.execute_command('scan', cursor_number, "count", 200000)
    # do something with keys

复制代码
我将需要删除的 key 存在一个文件里,有 2.2G,大概 4000W 个,下一步就是删除了

使用 Python DEL

因为文件很大,我们用到一个小技巧,分块读取
with open("/data/rediskeys") as kf:
    lines = kf.readlines(1024*1024)复制代码
调用 delete 方法时,用到一个小技巧就是『*』星号
r.delete(*taskkey_list)复制代码
我们看一下定义就清楚了:
delete method
放上完整代码:
import redis
import time

pool=redis.ConnectionPool(host='redis_hostname', port=6379, max_connections=100)
r = redis.StrictRedis(connection_pool=pool)

start_time = time.time()
SUCCESS_DELETED = 0

with open("/data/rediskeys") as kf:
    while True:
        lines = kf.readlines(1024*1024)
        if not lines:
            break
        else:
            taskkey_list = [i.strip() for i in lines if i.startswith("UCS:TASKKEY")]
            SUCCESS_DELETED += r.delete(*taskkey_list)

        print SUCCESS_DELETED

end_time = time.time()
print end_time - start_time, SUCCESS_DELETED复制代码

成果展示

结束,下篇再见


关注下面的标签,发现更多相似文章
评论
说说你的看法