gRPC负载均衡-Golang

11,669 阅读3分钟

一. 负载均衡三种解决方案

构建高可用、高性能的通信服务,通常采用服务注册与发现、负载均衡和容错处理等机制实现。根据负载均衡实现所在的位置不同,通常可分为以下三种解决方案:

  • 1、集中式LB(Proxy Model)
  • 2、进程内LB(Balancing-aware Client)
  • 3、独立 LB 进程(External Load Balancing Service)

出处在这里,写的很详细: 链接地址

二. gRPC的准备

gRPC 默认使用 protocol buffers,这是 Google 开源的一套成熟的结构数据序列化机制(当然也可以使用其他数据格式如 JSON)。其客户端提供Objective-C、Java接口,服务器侧则有Java、Golang、C++等接口,从而为移动端(iOS/Androi)到服务器端通讯提供了一种解决方案。链接地址

1.安装brew,这个自己百度谷歌.

2.终端 :

brew install autoconf automake libtool

3.安装golang protobuf

go get -u github.com/golang/protobuf/proto // golang protobuf 库
go get -u github.com/golang/protobuf/protoc-gen-go //protoc --go_out 工具

三. 简单的protobuf

../proto/hello.proto

syntax = "proto3";

package proto;

message SayReq {
    string content = 1;
}

message SayResp {
    string content = 1;
}

service Test{
    rpc Say(SayReq) returns (SayResp) {}
}

在proto下 , 输入终端命令 protoc --go_out=plugins=grpc:. hello.proto

生成 hello.pb.go 文件,通过protoc就能生成不同语言需要的.pb.go文件


这里是介绍四种protoc,如果不用到流,可以不看:

1:简单 RPC

2:服务器端流式 RPC

3:客户端流式 RPC

4:双向流式 RPC

4种方式与以下定义四种服务对应: test.proto文件: service Test{

rpc LZX1(SayReq) returns (SayResp) {}
rpc LZX2(SayReq) returns (stream SayResp) {}
rpc LZX3(stream SayReq) returns (SayResp) {}
rpc LZX4(stream SayReq) returns (stream SayResp) {}

}

生成的test.pb.go文件:

type TestClient interface {

LZX1(ctx context.Context, in *SayReq, opts ...grpc.CallOption) (*SayResp, error)
LZX2(ctx context.Context, in *SayReq, opts ...grpc.CallOption) (Test_LZX2Client, error)
LZX3(ctx context.Context, opts ...grpc.CallOption) (Test_LZX3Client, error)
LZX4(ctx context.Context, opts ...grpc.CallOption) (Test_LZX4Client, error)

}

所以无流的函数,也就是第一种简单的RPC,都只是一一对应.只要有流,返回的类型都是 服务结构名_函数名Client;只要客户端是流式,传参将不包含其他参数.


四.6种负载均衡算法

1、轮询法

2、随机法

3、源地址哈希法

4、加权轮询法

5、加权随机法

6、最小连接数法

算法的描述看这里:链接地址

五. gRPC的例子

例子出处链接:链接地址

架构:

以下用的是随机负载均衡:

客户端 etcd/client/random/main.go

package main

import (
	etcd "github.com/coreos/etcd/client"
	grpclb "github.com/liyue201/grpc-lb"
	"github.com/liyue201/grpc-lb/examples/proto"
	registry "github.com/liyue201/grpc-lb/registry/etcd"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"log"
	"strconv"
	"time"
)

func main() {
	etcdConfg := etcd.Config{
		Endpoints: []string{"http://120.24.44.201:2379"},
	}
	r := registry.NewResolver("/grpc-lb", "test", etcdConfg) // 加载registry
	b := grpclb.NewBalancer(r, grpclb.NewRandomSelector())   //加载grpclbs
	c, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithBalancer(b))
	if err != nil {
		log.Printf("grpc dial: %s", err)
		return
	}
	defer c.Close()

	client := proto.NewTestClient(c)
	var num int
	for i := 0; i < 1000; i++ {
		resp, err := client.Say(context.Background(), &proto.SayReq{Content: "random"})
		if err != nil {
			log.Println(err)
			time.Sleep(time.Second)
			continue
		}
		time.Sleep(time.Second)
		num++
		log.Printf(resp.Content + ",  clientOfnum: " + strconv.Itoa(num))
	}
}

服务端 etcd/server/main.go

package main

import (
	"flag"
	"fmt"
	etcd "github.com/coreos/etcd/client"
	"github.com/liyue201/grpc-lb/examples/proto"
	registry "github.com/liyue201/grpc-lb/registry/etcd"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"log"
	"net"
	"sync"
	"time"
)

var nodeID = flag.String("node", "node1", "node ID")
var port = flag.Int("port", 8080, "listening port")

type RpcServer struct {
	addr string
	s    *grpc.Server
}

func NewRpcServer(addr string) *RpcServer {
	s := grpc.NewServer()
	rs := &RpcServer{
		addr: addr,
		s:    s,
	}
	return rs
}

func (s *RpcServer) Run() {
	listener, err := net.Listen("tcp", s.addr)
	if err != nil {
		log.Printf("failed to listen: %v", err)
		return
	}
	log.Printf("rpc listening on:%s", s.addr)

	proto.RegisterTestServer(s.s, s)
	s.s.Serve(listener)
}

func (s *RpcServer) Stop() {
	s.s.GracefulStop()
}

var num int

func (s *RpcServer) Say(ctx context.Context, req *proto.SayReq) (*proto.SayResp, error) {
	num++
	text := "Hello " + req.Content + ", I am " + *nodeID + ", serverOfnum: " + strconv.Itoa(num)
	log.Println(text)

	return &proto.SayResp{Content: text}, nil
}

func StartService() {
	etcdConfg := etcd.Config{
		Endpoints: []string{"http://120.24.44.201:2379"},
	}

	registry, err := registry.NewRegistry(
		registry.Option{
			EtcdConfig:  etcdConfg,
			RegistryDir: "/grpc-lb",
			ServiceName: "test",
			NodeID:      *nodeID,
			NData: registry.NodeData{
				Addr: fmt.Sprintf("127.0.0.1:%d", *port),
				//Metadata: map[string]string{"weight": "1"},
			},
			Ttl: 10 * time.Second,
		})
	if err != nil {
		log.Panic(err)
		return
	}
	server := NewRpcServer(fmt.Sprintf("0.0.0.0:%d", *port))
	wg := sync.WaitGroup{}

	wg.Add(1)
	go func() {
		server.Run()
		wg.Done()
	}()

	wg.Add(1)
	go func() {
		registry.Register()
		wg.Done()
	}()

	//stop the server after one minute
	//go func() {
	//	time.Sleep(time.Minute)
	//	server.Stop()
	//	registry.Deregister()
	//}()

	wg.Wait()
}

func main() {
	flag.Parse()
	StartService()
}

服务端的代码如下, 使用以下命令运行3个服务进程,再启动客户端。

go run main.go -node node1 -port 28544

go run main.go -node node2 -port 18562

go run main.go -node node3 -port 27772

六.最终效果

(客户端不停的随机访问三个服务端)

客户端:


服务端node1:


服务端node2:


服务端node3:


断开服务端node1,node3,client照样能跑,并只连接到node2服务端:

客户端:

服务器node2:


再次启动node1,node3服务端,client自动连接上,并继续随机访问3台服务端:

客户端:

服务端node1:

服务端node3:


最后,关闭所有服务端,客户端处于阻塞状态.