用Go实现Redis之四实现Redis的协议交互

764 阅读3分钟
原文链接: segmentfault.com

写在前面

本文实现的Godis代码版本为:v0.0.3

在前三篇文章中,实现了客户端/服务端的交互(基于textprotoco)、服务端初始化和get/set命令。如果阅读过或者调试过粗略的代码实现,会发现使用文本协议进行交互,除了容易阅读之外,解析效率是比较低下的。因为我们的示例是"set alpha 123n",工整的单个空格和n分割,可能在分割上效率还好;既要分割,不免低效。

在本文,将替换文本协议为Redis1.2版本后的统一协议。

Redis通信协议

Redis通信协议解析高效、二进制安全,同时也对人类友好(可直接阅读解析)。

协议格式

Redis在发送命令和返回结果中均使用同一套标准协议。Reids协议“肉眼可辨”,在发送命令是使用类型为"multi bulk reply"的协议类型,回复时根据结果的不同使用不同类型协议。

通过检查服务器发回数据的第一个字节, 可以确定这个回复是什么类型:

  1. 状态回复(status reply)的第一个字节是 "+"
  2. 错误回复(error reply)的第一个字节是 "-"
  3. 整数回复(integer reply)的第一个字节是 ":"
  4. 批量回复(bulk reply)的第一个字节是 "$"
  5. 多条批量回复(multi bulk reply)的第一个字节是 "*"

举两个例子:

1.客户端执行命令"set alpha 123", 服务器返回 "OK"该类型即为状态恢复,服务器返回的结果封装为标准协议是"+OKrn",客户端解释协议结果,将之反馈给使用者。

2.还是客户端执行命令"set alpha 123",在发送给服务端时也是以协议格式交互的。前文提到发送命令使用的是”多条批量回复“类型协议,封装好的命令就是*3\r\n$3\r\nset\r\n$5\r\nalpha\r\n$3\r\n123\r\n
对应的ASCII码如下:

clipboard.png

  1. 符号'*'标识协议类型是多条批量回复,"rn"为元素分割标记;
  2. '$'标识接下来的是批量回复协议,要按照批量回复格式解析;
  3. '3'代表该批量回复长度为3字节;
  4. "set"为批量回复协议内容;
  5. 重复2-4直到协议解析完成。

可以看出,协议的生成和解析可以简化理解为两段文本处理程序。

Godis实现Redis通信协议

GO版本协议实现初探

很多Redis相关的GO组件、模块、工具都有协议的生成和解析实现,并历经生产环境的考验。如go-rediscodis等知名项目。
不提性能和扩展性,协议生成的GO代码可以实现如下:

//将命令行转换为协议
func Cmd2Protocol(cmd string) (pro string) {
    //cmd := "set alpha 123"
    ret := strings.Split(cmd, " ")
    //todo validate cmd and params
    for k, v := range ret {
        if k == 0 {
            pro = fmt.Sprintf("*%d\r\n", len(ret))
        }
        pro += fmt.Sprintf("$%d\r\n%s\r\n", len(v), v)
    }
    return
}

以上代码便可以将命令"set alpha 123"转换为Redis的标准协议格式。

而协议的解析,可以拆解为如下流程:

clipboard.png

以前文示例,拆解过程如下:

clipboard.png

最终的操作只是单独的数据类型解析,数字解析将数字转成文字、文本解析读取对应字节数量的字符即可。

//将协议转成argc、argv
func Protocol2Args(protocol string) (argv []string, argc int) {
    parts := strings.Split(strings.Trim(protocol, " "), "\r\n")
    if len(parts) == 0 {
        return nil, 0
    }
    argc, err := strconv.Atoi(parts[0][1:])
    if err != nil {
        return nil, 0
    }
    j := 0
    var vlen []int
    for _, v := range parts[1:] {
        if len(v) == 0 {
            continue
        }
        if v[0] == '$' {
            tmpl, err := strconv.Atoi(v[1:])
            if err == nil {
                vlen = append(vlen, tmpl)
            }
        } else {
            if j < len(vlen) && vlen[j] == len(v) {
                j++
                argv = append(argv, v)
            }
        }
    }
    return argv, argc
}

协议最终实现

在实现协议的编码过程中,一直希望编码能尽可能简单、又有值得思考和改进的地方,无奈能力有限,远不如codis的实现优雅。还是觉得使用codis的实现方案,才是值得一看的代码。对codis的代码做了部分修改,如果想直接看codis的实现,可以点这里直达。
在Godis的协议实现中,去掉了codis的错误处理和一部分I/O优化,希望尽量让其看起来简单,希望不会生硬:)。
主要增加了两个包:
其一为共用的带缓冲I/O包,封装了ByteReader的一些byte级操作
其二为proto包,分别可实例化为proto.Encoder和proto.Decoder来处理协议编解码

协议编码

将release v0.0.2中的纯文本协议交互改为编码后的协议交互:

func send2Server(msg string, conn net.Conn) (n int, err error) {
    p, e := proto.EncodeCmd(msg)
    if e != nil {
        return 0, e
    }
    //fmt.Println("proto encode", p, string(p))
    n, err = conn.Write(p)
    return n, err
}

前文说过,编码使用的协议类型是多条批量回复。这里仍然以"set alpha 123"命令为例。
首先,拆解字符串为[set alpha 123]三部分(请暂时忽略异常格式)。三部分分别是一条批量回复,每一部分按照一个批量回复格式编码处理即可。
在proto包,使用如下结构体保存协议格式和数据信息:

type Resp struct {
    Type byte

    Value []byte
    Array []*Resp
}

以上文例子,单条批量回复"set",填充进Resp结构的方法是:

// NewBulkBytes 批量回复类型
func NewBulkBytes(value []byte) *Resp {
    r := &Resp{}
    r.Type = TypeBulkBytes//批量回复类型
    r.Value = value
    return r
}

"set","alpha","123"三条批量回复构成多条批量回复类型的方法如下:

// NewArray 多条批量回复类型
func NewArray(array []*Resp) *Resp {
    r := &Resp{}
    r.Type = TypeArray//多条批量回复
    r.Array = array
    return r
}

这样就将[set alpha 123]构成了多条批量回复类型的协议。而在将该多条批量回复类型的协议编码的操作伪代码如下:

// encodeResp 编码
func (e *Encoder) encodeResp(r *Resp) error {
    if err := e.bw.WriteByte(byte(r.Type)); err != nil {
        return errorsTrace(err)
    }
    switch r.Type {
    case TypeString, TypeError, TypeInt:
        return e.encodeTextBytes(r.Value)
    case TypeBulkBytes:
        return e.encodeBulkBytes(r.Value)
    case TypeArray:
        return e.encodeArray(r.Array)
    default:
        return errorsTrace(e.Err)
    }
}
// encodeArray encode 多条批量回复
func (e *Encoder) encodeArray(array []*Resp) error {
    if array == nil {
        return e.encodeInt(-1)
    } else {
        if err := e.encodeInt(int64(len(array))); err != nil {
            return err
        }
        for _, r := range array {
            if err := e.encodeResp(r); err != nil {
                return err
            }
        }
        return nil
    }
}

——编码多条批量回复的操作是先逐条编码Resp.Array数组的元素,比如"set",真正的编码操作为将"set"长度、分隔符"rn"和"set"本身分别追加到协议,
结果就是$3\r\nset\r\n

协议解码

协议生成的过程只依赖多条批量回复类型,而客户端在解读服务端的返回时,会面临不同的回复类型:

// decodeResp 根据返回类型调用不同解析实现
func (d *Decoder) decodeResp() (*Resp, error) {
    b, err := d.br.ReadByte()
    if err != nil {
        return nil, errorsTrace(err)
    }
    r := &Resp{}
    r.Type = byte(b)
    switch r.Type {
    default:
        return nil, errorsTrace(err)
    case TypeString, TypeError, TypeInt:
        r.Value, err = d.decodeTextBytes()
    case TypeBulkBytes:
        r.Value, err = d.decodeBulkBytes()
    case TypeArray:
        r.Array, err = d.decodeArray()
    }
    return r, err
}

该过程与编码过程操作类似,不再赘述。下面的代码是为服务端增加协议解析:


// ProcessInputBuffer 处理客户端请求信息
func (c *Client) ProcessInputBuffer() error {
    //r := regexp.MustCompile("[^\\s]+")
    decoder := proto.NewDecoder(bytes.NewReader([]byte(c.QueryBuf)))
    //decoder := proto.NewDecoder(bytes.NewReader([]byte("*2\r\n$3\r\nget\r\n")))
    if resp, err := decoder.DecodeMultiBulk(); err == nil {
        c.Argc = len(resp)
        c.Argv = make([]*GodisObject, c.Argc)
        for k, s := range resp {
            c.Argv[k] = CreateObject(ObjectTypeString, string(s.Value))
        }
        return nil
    }
    return errors.New("ProcessInputBuffer failed")
}

这里是一些调试信息:

clipboard.png

最后请看添加了协议实现之后的演示:

clipboard.png

因为都是经过客户端/服务端的编解码之后的结果,并不能看出协议本身的内容。感兴趣的读者可以直接编译本篇的release版本v0.0.3,打开调试日志查看交互过程的协议实现。

本篇问题

  1. bufio包的实现中,涉及到一些GO版本和读写操作的问题,细节不容易讲清楚;
  2. 单独编写的Encoder和Decoder在实现上有一些效率和扩展性问题,欢迎讨论。

下集预告

  1. AOF持久化——数据保存;
  2. AOF持久化——启动加载。