教你写一个Redis的库,redis 集群驱动

1,358 阅读7分钟

本文只是抛砖引玉; P

在下在 Shopee 工作,不喜欢加班想换个环境的同学可以考虑一下

拒绝 996,那就来 shopee,待遇 work life balance 两不: www.v2ex.com/t/672561#re…

###前言

最近跟同事请教了一下 redis 相关的事情,就找来了一下 redis 的驱动,看看这些库是怎么做 redis clusterpipeline 以及 transaction的,以下就把相关流程的代码剖析一下,还是有一些有意思的点的。

因为 C 语言比较底层,其他语言感觉描述性都差了一点,我找的是 elixir 的库来看的,质量很高。

事后才发现原来这个elixir 的 redis 库的作者是 elixir 这门语言的核心开发者;P

正文开始。

首先呢, Elixir 的这个库不支持 redis 集群,后来有人基于它扩展成支持简单的集群,所以先讲普通的怎么做,再扩展。

架构

这个库是单进程异步,当你发命令过来时,此库处理完后会马上发给 Redis 服务器,然后就可以接收新的命令,当 Redis Server 答复时,会返回此Reply给你。

一般连接池有通用的库,所以交给调用方来做,库只处理每个连接的请求。

RESP (REdis Serialization Protocol)

ps,上面这个标题就是来自 redis 官网的,明显 RE是 typo。

Redis 用的协议RESP是自己定的文本协议,客户端与服务端直接通过 TCP 连接通讯。

这个文本协议,其实就是对数据的序列化,以下就是规则:

  • For Simple Strings the first byte of the reply is "+"
  • For Errors the first byte of the reply is "-"
  • For Integers the first byte of the reply is ":"
  • For Bulk Strings the first byte of the reply is "$"
  • For Arrays the first byte of the reply is "*"

对于客户端而言,发过去给服务器的命令其实数据结构就是数组,所以只需要*数组长度\r\n$数组[0]里命令的长度\r\n 数组[0]里命令

说起来有点抽象,看看实际例子:

  • LLEN mylist 按照协议 encode就变成 *2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n 的文本,

    • 数组里有两个字符串,分别是 4 长度的LLEN以及 6 个字符的mylist
  • SET mykey 1按协议 encode 就变成*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$1\r\n1\r\n"

    • 数组里有三个字符串,分别是 3 长度的SET以及 5 个字符的mykey,还有 1 个字符的1

    可以看看这个库是怎么做的,就是递归拼接,记录数组的长度,最后在最开头拼上*数组长度

  @doc ~S"""
  Packs a list of Elixir terms to a Redis (RESP) array.

  This function returns an iodata (instead of a binary) because the packed
  result is usually sent to Redis through `:gen_tcp.send/2` or similar. It can
  be converted to a binary with `IO.iodata_to_binary/1`.

  All elements of `elems` are converted to strings with `to_string/1`, hence
  this function supports encoding everything that implements `String.Chars`.

  ## Examples

      iex> iodata = Redix.Protocol.pack(["SET", "mykey", 1])
      iex> IO.iodata_to_binary(iodata)
      "*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$1\r\n1\r\n"

  """
  @crlf_iodata [?\r, ?\n]
  @spec pack([binary]) :: iodata
  def pack(items) when is_list(items) do
    pack(items, [], 0)
  end

  defp pack([item | rest], acc, count) do
    item = to_string(item)
    new_acc = [acc, [?$, Integer.to_string(byte_size(item)), @crlf_iodata, item, @crlf_iodata]]
    pack(rest, new_acc, count + 1)
  end

  defp pack([], acc, count) do
    [?*, Integer.to_string(count), @crlf_iodata, acc]
  end

维护长连接

作为 client 的库,维护长连接,避免频繁创建连接,这个是常规操作。

而有趣的是,作者使用了 erlang OTP自带的状态机框架 gen_statem 来维持 TCP 长连接,这个功能是OTP 19也就是 16 年才推出的,在不知道此作者是 elixir 语言的贡献者前,我还小小的膜拜了一下。

状态机如下图,初始状态不是同步连接,就是connecting 状态;同步的话,成功就是处于 connected 状态。

状态的动作依靠 TCP 的事件消息来驱动,状态转移自己控制。

例子:

  def disconnected({:timeout, :reconnect}, _timer_info, %__MODULE__{} = data) do
    {:ok, socket_owner} = SocketOwner.start_link(self(), data.opts, data.table)
    new_data = %{data | socket_owner: socket_owner}
    {:next_state, :connecting, new_data}
  end

以上代码就是在 discconected状态收到 TCP{:timeout, :reconnect}消息,创建一个新的TCP socket进程,将状态转移到:connecting

socket 进程在初始化时,会发送connect消息给自己:

  def handle_info(:connect, state) do
    with {:ok, socket, address} <- Connector.connect(state.opts),
         :ok <- setopts(state, socket, active: :once) do
      send(state.conn, {:connected, self(), socket, address})
      {:noreply, %{state | socket: socket}}
    else
      {:error, reason} -> stop(reason, state)
      {:stop, reason} -> stop(reason, state)
    end
  end

成功了,就发送connected消息给原来的状态机进程(也就是 connection 进程)connection进程处于connecting状态时,接受此消息,更新 socket 信息,状态转移到 connected

  def connecting(
        :info,
        {:connected, owner, socket, address},
        %__MODULE__{socket_owner: owner} = data
      ) do
    if data.backoff_current do
      :telemetry.execute([:redix, :reconnection], %{}, %{
        connection: data.opts[:name] || self(),
        address: address
      })
    end

    data = %{data | socket: socket, backoff_current: nil, connected_address: address}
    {:next_state, :connected, %{data | socket: socket}}
  end

执行命令

Redis 执行命令主要有 ComandPipeline以及Trasaction三种概念:

  • command:一问一答式的,客户端等待 server 返回消息;
  • Pipeline:发送一连串命令,这些命令发往 server,不用一问一答,收到命令马上返回。sever 以队列执行,执行完后全部结果返回回来;
  • Trasaction:依靠MULTI/EXEC命令,MULTI命令开始Trasaction,此后发送的命令都存到 server 的队列里,EXEC命令发送后马上这队列里所有命令;期间不会有其他命令影响这些命令的执行。

库里把 Command 命令用 Pipeline来做,其实本质是一样的。

Pipeline

以下的pipeline就是负责用户调用的函数,:gen_statem.cast就是把消息数据传给状态机,接着就是起了一个进程来监控这个连接,挂了就退出;同时阻塞等待状态机完成处理获得数据后发消息过来。

  def pipeline(conn, commands, timeout) do
    conn = GenServer.whereis(conn)

    request_id = Process.monitor(conn)

    # We cast to the connection process knowing that it will reply at some point,
    # either after roughly timeout or when a response is ready.
    cast = {:pipeline, commands, _from = {self(), request_id}, timeout}
    :ok = :gen_statem.cast(conn, cast)

    receive do
      {^request_id, resp} ->
        _ = Process.demonitor(request_id, [:flush])
        resp

      {:DOWN, ^request_id, _, _, reason} ->
        exit(reason)
    end
  end

状态机这块的代码就是:

  def connected(:cast, {:pipeline, commands, from, timeout}, data) do
    {ncommands, data} = get_client_reply(data, commands)

    if ncommands > 0 do
      {counter, data} = get_and_update_in(data.counter, &{&1, &1 + 1})

      row = {counter, from, ncommands, _timed_out? = false}
      :ets.insert(data.table, row)

      case data.transport.send(data.socket, Enum.map(commands, &Protocol.pack/1)) do
        :ok ->
          actions =
            case timeout do
              :infinity -> []
              _other -> [{{:timeout, {:client_timed_out, counter}}, timeout, from}]
            end

          {:keep_state, data, actions}

        {:error, _reason} ->
          # The socket owner will get a closed message at some point, so we just move to the
          # disconnected state.
          :ok = data.transport.close(data.socket)
          {:next_state, :disconnected, data}
      end
    else
      reply(from, {:ok, []})
      {:keep_state, data}
    end
  end

没什么特别的,get_client_reply就是处理客户端是否想得到服务器回复的命令的 CLIENT REPLY的各种指令,

  defp get_client_reply([command | rest], ncommands, client_reply) do
    case parse_client_reply(command) do
      :off -> get_client_reply(rest, ncommands, :off)
      :skip when client_reply == :off -> get_client_reply(rest, ncommands, :off)
      :skip -> get_client_reply(rest, ncommands, :skip)
      :on -> get_client_reply(rest, ncommands + 1, :on)
      nil when client_reply == :on -> get_client_reply(rest, ncommands + 1, client_reply)
      nil when client_reply == :off -> get_client_reply(rest, ncommands, client_reply)
      nil when client_reply == :skip -> get_client_reply(rest, ncommands, :on)
    end
  end

接着就是把命令序列号成 RESP,使用data.transport.send发送给服务器,其实 Redis 除了 TCP 外还可以使用SSL/TLS 协议,所以就有了这一层抽象。

如果是 TCP,那么socket 服务就会在 redis 服务器返回消息后,此函数接收自动处理:

  def handle_info({transport, socket, data}, %__MODULE__{socket: socket} = state)
      when transport in [:tcp, :ssl] do
    :ok = setopts(state, socket, active: :once)
    state = new_data(state, data)
    {:noreply, state}
  end

支持Redis Cluster

Redis Cluster 的分布式算法

官网写的很好了,我简单说一下好了。

Redis Cluster does not use consistent hashing, but a different form of sharding where every key is conceptually part of what we call an hash slot.

Redis Cluster没有用一致性哈希算法,而是用了hash slot(哈希桶)

There are 16384 hash slots in Redis Cluster, and to compute what is the hash slot of a given key, we simply take the CRC16 of the key modulo 16384.

redis 会固定分配 16384 个 slots 到不同的节点,用的算法就是对 key 做 CRC16 然后对 16384取模: HASH_SLOT = CRC16(key) mod 16384

例子如下:

Every node in a Redis Cluster is responsible for a subset of the hash slots, so for example you may have a cluster with 3 nodes, where:

- Node A contains hash slots from 0 to 5500.
- Node B contains hash slots from 5501 to 11000.
- Node C contains hash slots from 11001 to 16383.

This allows to add and remove nodes in the cluster easily. For example if I want to add a new node D, I need to move some hash slot from nodes A, B, C to D. Similarly if I want to remove node A from the cluster I can just move the hash slots served by A to B and C. When the node A will be empty I can remove it from the cluster completely.

用这样的算法,比一致性哈希方便,更有操作性:

Redis Cluster implements a concept called hash tags that can be used in order to force certain keys to be stored in the same hash slot.

Because moving hash slots from a node to another does not require to stop operations, adding and removing nodes, or changing the percentage of hash slots hold by nodes, does not require any downtime.

对于 redis 或者对用户来说,可以轻松地分配移动 slots;

而一致性哈希就只能自己算虚拟节点,并且『祈求』之后请求量多了最终达到想要的平衡了。

#####redix-cluster

原版没有支持集群,zhongwencool/redix-cluster 写了一个简单的包装版本。

只需要看这段,就很清楚为了集群做了些啥:

  @spec pipeline([command], Keyword.t) :: {:ok, term} |{:error, term}
  def pipeline(pipeline, opts) do
    case RedixCluster.Monitor.get_slot_cache do
      {:cluster, slots_maps, slots, version} ->
         pipeline
           |> parse_keys_from_pipeline
           |> keys_to_slot_hashs
           |> is_same_slot_hashs
           |> get_pool_by_slot(slots_maps, slots, version)
           |> query_redis_pool(pipeline, :pipeline, opts)
      {:not_cluster, version, pool_name} ->
         query_redis_pool({version, pool_name}, pipeline, :pipeline, opts)
    end
  end

|> 就是类似 unix 的 管道 |,把函数返回值当做下个函数的第一个参数传给他。

get_slot_cache就是获取redis的cluster slots这个记录,并且缓存起来。

CLUSTER SLOTS returns details about which cluster slots map to which Redis instances.

  • parse_keys_from_pipeline 将全部 keys 从Pineline 命令里提取出来
  • keys_to_slot_hashs 找出 各个key 在哪个 hash slot
  • is_same_slot_hashs 判断所有 key 是不是在同一个 hash slot,是的,这个还不支持跨 slot,我在准备帮他写一个
  • get_pool_by_slot 项目用了连接池来管理,所以要根据名字找对应的连接
  • query_redis_pool 就是调用 原来的 Redix 做处理了

简单来说,这个库就是残废的,哈哈哈。。。

不支持分布不同 slot,就是玩具。