从bgsave命令看redis的RDB持久化机制

1,294 阅读5分钟

redis通过bgSave命令将数据持久化到磁盘上,在启动的时候,可以从磁盘上加载bgSave生成的RDB文件,恢复数据

save命令会阻塞,不推荐使用

RDB持久化机制简介

redis的RDB结构大致如下

以hashtable为例

REDIS|db_version|SELECTDB|0|REDIS_TYPE_HASH|hash_size|key1_len|key1|key1_value_len|key1_value|EOF|checksum
  • REDIS:放在文件开头的标识符
  • db_version:当前RDB的版本
  • SELECTDB:标识符,接下来要读到的是server中的数据库下标
  • 0:表示第0个db,默认有16个
  • REDIS_TYPE_HASH:在db中存了hashTable结构
  • hash_size:hashTable中一共有多少个元素
  • key1_len:第一个key占的字节数
  • key1:第一个key的字面值
  • key1_value_len:第一个key对应的value的字节数
  • key1_value:第一个key对应的value的值
  • EOF:数据结束的标识符,表示所有数据库的数据都已写完
  • checksum:RDB文件的校验和,校验内容的完整性

调用bgSave进行存储

当用户执行bgSave命令的时候,redis会fork出子进程进行处理,使得其他命令不会被阻塞执行

Code.SLICE.source("if ((childpid = fork()) == 0) {" +
"        //..." +
"        retval = rdbSave(filename,rsi);" +
"        if (retval == C_OK) {" +
"        //..." +
"            server.child_info_data.cow_size = private_dirty;" +
"            sendChildInfo(CHILD_INFO_TYPE_RDB);" +
"        }" +
"        exitFromChild((retval == C_OK) ? 0 : 1);" +
"    } else {" +
"        /* Parent */" +
"        //..." +
"        server.rdb_save_time_start = time(NULL);" +
"        server.rdb_child_pid = childpid;" +
"        server.rdb_child_type = RDB_CHILD_TYPE_DISK;" +
"        updateDictResizePolicy();" +
"        return C_OK;" +
"    }")
.interpretation("创建子进程,子进程负责做rdb相关的处理,父进程记下处理中的子进程ID,返回当前bgsave的执行,也就是说bgsave不会阻塞其它命令的执行");
   

在存储数据进入RDB的时候,首先会在文件头写入 REDIS 字符串,拼上当前RDB的版本

Code.SLICE.source("snprintf(magic,sizeof(magic),\"REDIS%04d\",RDB_VERSION);" +
"    if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;")
.interpretation("首先在文件中写下 REDIS字符串和RDB的版本");

紧接着遍历redis的server中所有的数据库,一个个地写入数据。根据数据的类型不同,采用不同的TYPE来标识,然后记下对应的长度,再存入值,比如要存储的对象的值是hashTable

Code.SLICE.source("else if (o->type == OBJ_HASH) {" +
"        /* Save a hash value */" +
"        if (o->encoding == OBJ_ENCODING_ZIPLIST) {" +
"            size_t l = ziplistBlobLen((unsigned char*)o->ptr);" +
"" +
"            if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;" +
"            nwritten += n;" +
"" +
"        } else if (o->encoding == OBJ_ENCODING_HT) {" +
"            dictIterator *di = dictGetIterator(o->ptr);" +
"            dictEntry *de;" +
"" +
"            if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) {" +
"                dictReleaseIterator(di);" +
"                return -1;" +
"            }" +
"            nwritten += n;" +
"" +
"            while((de = dictNext(di)) != NULL) {" +
"                sds field = dictGetKey(de);" +
"                sds value = dictGetVal(de);" +
"" +
"                if ((n = rdbSaveRawString(rdb,(unsigned char*)field," +
"                        sdslen(field))) == -1)" +
"                {" +
"                    dictReleaseIterator(di);" +
"                    return -1;" +
"                }" +
"                nwritten += n;" +
"                if ((n = rdbSaveRawString(rdb,(unsigned char*)value," +
"                        sdslen(value))) == -1)" +
"                {" +
"                    dictReleaseIterator(di);" +
"                    return -1;" +
"                }" +
"                nwritten += n;" +
"            }" +
"            dictReleaseIterator(di);" +
"        } else {" +
"            serverPanic(\"Unknown hash encoding\");" +
"        }" +
"    } ")
.interpretation("以hash的编码方式为例,看底层的实现")
.interpretation("1: hash的底层实现如果是ziplist,那么拿到ziplist的长度,将ziplist转为字符串存储")
.interpretation("2: hash的底层实现方式为 hasttable,那么一个个的遍历key,value,将它们分别转成String的形式再存储");

当所有数据记录完成之后,写入EOF结束标记,最后加上校验和,至此完成内存数据序列化,存储到磁盘

Code.SLICE.source("if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;")
        .interpretation("写入EOF标记,代表所有db的数据都已经写入了");
Code.SLICE.source("cksum = rdb->cksum;" +
        "    memrev64ifbe(&cksum);" +
        "    if (rioWrite(rdb,&cksum,8) == 0) goto werr;")
        .interpretation("写入校验和,完整的内存数据写入完毕");

启动加载

在redis的启动的过程中会进行加载,它实质上就是存储的反序列化过程,首先是读取字符串 REDIS

 Code.SLICE.source("if (rioRead(rdb,buf,9) == 0) goto eoferr;" +
    "    buf[9] = '\\0';" +
    "    if (memcmp(buf,\"REDIS\",5) != 0)")
    .interpretation("读取文件的前9个字节,前5个必定是REDIS字符,否则出错");

接下来便可以按照序列化的规则,进行反序列化,直到读取完成

Code.SLICE.source("while(1) {..." +
"if ((type = rdbLoadType(rdb)) == -1) goto eoferr;" +
"..." +
" else if (type == RDB_OPCODE_EOF) {" +
"            /* EOF: End of file, exit the main loop. */" +
"            break;" +
"..." +
"else if (type == RDB_OPCODE_RESIZEDB){...}" +
"..." +
"if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr;" +
"if ((val = rdbLoadObject(type,rdb)) == NULL) goto eoferr;" +
"}")
.interpretation("循环读取文件的内容,首先读到接下来的类型")
.interpretation("1: 读到EOF结束")
.interpretation("2: 读取到对应的标记,就继续读取后面的字节,直到读到key")
.interpretation("3: 读取key,读取val");

value以hashtable为例,会构造出对应的结构

  Code.SLICE.source("else if (rdbtype == RDB_TYPE_HASH) {" +
    "        len = rdbLoadLen(rdb, NULL);" +
    "..." +
    "        o = createHashObject();" +
    "        /* ... */" +
    "        while (o->encoding == OBJ_ENCODING_ZIPLIST && len > 0) {" +
    "            len--;" +
    "            /* Load raw strings */" +
    "            if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))" +
    "                == NULL) return NULL;" +
    "            if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))" +
    "                == NULL) return NULL;" +
    "" +
    "            /* Add pair to ziplist */" +
    "            o->ptr = ziplistPush(o->ptr, (unsigned char*)field," +
    "                    sdslen(field), ZIPLIST_TAIL);" +
    "            o->ptr = ziplistPush(o->ptr, (unsigned char*)value," +
    "                    sdslen(value), ZIPLIST_TAIL);" +
    "" +
    "            /* Convert to hash table if size threshold is exceeded */" +
    "            if (sdslen(field) > server.hash_max_ziplist_value ||" +
    "                sdslen(value) > server.hash_max_ziplist_value)" +
    "            {" +
    "                sdsfree(field);" +
    "                sdsfree(value);" +
    "                hashTypeConvert(o, OBJ_ENCODING_HT);" +
    "                break;" +
    "            }" +
    "            sdsfree(field);" +
    "            sdsfree(value);" +
    "        }" +
    " ........"+
    "        /* Load remaining fields and values into the hash table */" +
    "        while (o->encoding == OBJ_ENCODING_HT && len > 0) {" +
    "            len--;" +
    "            /* Load encoded strings */" +
    "            if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))" +
    "                == NULL) return NULL;" +
    "            if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))" +
    "                == NULL) return NULL;" +
    "" +
    "            /* Add pair to hash table */" +
    "            ret = dictAdd((dict*)o->ptr, field, value);" +
    "            if (ret == DICT_ERR) {" +
    "                rdbExitReportCorruptRDB(\"Duplicate keys detected\");" +
    "            }" +
    "        }" +
    "    }")
    .interpretation("以hashtable为例,读取到对应的数据长度,创建对象,根据对象的编码方式,分别解析成ziplist或者是hashtable来存储");
 

总结

  1. bgsave不会阻塞redis其它命令的运行,通过fork子进程实现;
  2. RDB序列化内存对象的机制是先写入数据的类型标识,然后记下数据量,再记下数据值的长度,再记下数据本身;
  3. 启动加载RDB文件的解析就是按照既定的保存规则进行反序列化

RDB的优势与劣势

  • 优势:RDB是一个紧凑压缩的二进制文件,适用于备份,全量复制的场景;它的恢复速度远快于AOF
  • 劣势:不适用于实时持久化,因为每次bgsave都要fork子进程,属于重量级操作,频繁执行成本高;老版本的Redis服务无法兼容新版本的Redis产生的RDB文件

附录

RDB启动加载源码
bgSave执行源码
书籍:Redis设计与实现、Redis开发与运维
AOF机制介绍