Mongo 代理程序实现-代码实战篇

2,167 阅读4分钟
原文链接: zhuanlan.zhihu.com

延续上一篇文章 Mongo 代理程序实现-复制集搭建及抓包篇,接下来,就正式开始我们的代码实战。

根据一贯的风格,我们先来梳理下项目目录结构,结构如下:

.
|__ bin/                   # 用于存放编译后生成的二进制文件
|__ config/                # 用于存放配置文件
|__ connection/            # 存放连接相关的文件
|   |__ proxy.go           # 代理组件
|   |__ pool.go            # 连接池组件
|   |__ repl_set.go        # 复制集组件
|   |__ conn.go            # 连接对象组件
|__ internal/              # 存放 mongo 内部协议相关文件
|   |__ auth.go            # 握手鉴权组件
|   |__ protocol.go        # 协议解析组件
|   |__ request.go         # 请求重写组件
|   |__ response.go        # 响应重写组件
|__ statistics/            # 存放指标统计上报组件
|__ test/                  # 存放各种语言驱动测试代码的文件夹
|__ utils/                 # 工具函数文件夹
|__ glide.yaml             # 依赖包配置文件
|__ main.go                # 入口文件

限于篇幅的原因,不可能把上面的细节一一讲个遍,我只挑选 proxy、pool 两个组件来讲...想了解更多实现细节的童鞋,可以私信我。

proxy 实现

最简单的 proxy 实现套路就像下面这样:

// main.go
func main() {
  // 传入配置参数,实例化一个代理对象
  p := NewProxy(conf)
  // 卡住,循环接受客户端请求
  p.LoopAccept()
}

接着来实现 NewProxy、LoopAccept 方法:

// connection/proxy.go
type Proxy struct {
    sync.RWMutex

    listener            net.Listener
    writePool, readPool *pool
}

func NewProxy(conf config.UserConf) *Proxy {
  // 开始监听本地端口
    listener, err := net.Listen("tcp", ":"+conf.GetString("port"))
    if err != nil {
        log.Fatalln(err)
    }

  p := &Proxy{
    listener: listener,
  }

  // 实例化连接池
  p.readPool, p.writePool, err = newPool(p)
    if err != nil {
        panic(err)
    }

  return p 
}

func (p *Proxy) LoopAccept() {
  for {
    client, err := p.listener.Accept()

    go func(c net.Conn) {
      defer c.Close()

      // 一个连接在多次 messageHandler 中共用一个 Reader 对象
      cr := bufio.NewReader(c)
      // 因为一个连接可能会进行多次读或写操作
      for {
        // 将客户端请求代理给服务端,服务端响应代理回客户端
        // 同时中间对请求或响应进行重写操作
        err := p.messageHandler(cr, c)

        if err != nil {
          // 只要出现错误,就执行到上面的 defer c.Close() 来关闭连接
          return
        }
      }
    }(client)
  }
}

接着来实现核心逻辑 messageHandler:

// connection/proxy.go
func (p *Proxy) messageHandler(cr *bufio.Reader, c net.Conn) error {
  // 对请求报文进行解析操作
  req, err := internal.Decode(clientReader)
  if err != nil {
        return errors.New("decode error")
    }

  // 将客户端请求发送给数据库服务器
  res, err := p.clientToServer(req)
  if err != nil {
    return errors.New("request error")
  }

  // 将数据库服务器响应返回给客户端
  return res.WriteTo(c)
}

func (p *Proxy) clientToServer(req *internal.Message) (*internal.Message, error) {
  var server net.Conn
  // 如果是读操作,就从读池中取出连接
  if req.IsReadOp() {
    host := req.GetHost()
    // 某些读操作需要发送到指定的读库上,所以需要传 host,来获取指定读库连接
    server = p.readPool.Acquire(host)
  // 反之,写操作从写池中取出连接
  } else {
    // 由于写库只有一个,所以不用传 host 参数了
    server = p.writePool.Acquire()
  }

  // 将客户端请求发送给数据库服务器
  err := req.WriteTo(server)
  if err != nil {
    return nil, err
  }

  // 获取解析数据库服务器响应
  res, err := internal.Decode(bufio.NewReader(server))
  return res, err
}

大致逻辑就是,客户端通过代理把请求发给服务端,服务端响应也通过代理响应回客户端。

------------  request  -----------  request  ------------
|          | --------> |         | --------> |          |
|  client  |           |  proxy  |           | repl_set |
|          | <-------- |         | <-------- |          |
------------  response -----------  response ------------

呐~,当然还有非常多的细节,由于篇幅原因不得不省略...

pool 实现

由 proxy 的代码逻辑来看,我们取读或写库连接是通过读或写池的 Acquire 方法来取的:

// connection/pool.go
type pool struct {
  sync.RWMutex

  connCh   chan net.Conn
  newConn  func(string) (net.Conn, error)
  freeConn func(net.Conn) error
}

func (p *pool) Acquire(opts ...interface{}) (net.Conn, error) {
  host := ""
  if len(opts) > 0 {
    host, _ = (opts[0]).(string)
  }

  chLen := len(p.connCh)
  // 从 channel 中遍历剩余数量的 conn
  for i := 0; i < chLen; i++ {
    select {
    case conn, ok := <- ch:
      if ok {
        if len(host) > 0 {
          if conn.RemoteAddr().String() == host {
            return conn, nil
          }
          // 没有找到对应 host 的 conn,则把 conn 重新放回 channel
          // 你可以简单理解为只是执行了 p.connCh <- conn 操作
          p.freeConn(conn)
        } else {
          return conn, nil
        }
      }
    // 避免数量不住而导致 channel 阻塞等待
    default:
    }
  }

  // 若还没有从 channel 中取到 conn,则立马 new 一个
  conn, err := p.newConn(host)
  if err != nil {
    return nil, err
  }

  return conn, nil
}

池的实现大致就是实现了一个循环队列,连接从池中取,取出的连接在使用完后,可以放回池中。

总结

聪明的童鞋可能已经看出,我在定义各种 struct 的时候,基本没有添加什么状态量,因为在并发场景下,对状态量的把控不好会导致一些很严重的问题,读者可以自由发挥设计功底,使用 atomic 或 go 1.9 提供的 sync.Map 等无锁操作来解决这些问题。

结束语

一溜写下来,看过抓包篇的童鞋可能会说,mmp 你根本就没讲如何实现自动主备切换的逻辑。我表示确实是立了个大 flag (老脸一红...

但我要真的一字一句写下来,恐怕很多人看都不想看,文章篇幅就是要简短明了,才有看下去的勇气。当然你真想知道细节,可以私信我,我一定知而答(233。