golang应用平滑重启

3,684 阅读6分钟
平滑重启:

     在当前的软件系统中,在不关闭服务的情况下部署一个新的版本或者是修改一些配置信息已经成为了必备的要求。这里介绍不同的方法去平滑重启应用,同时用些实例去挖掘其中的细节信息。这里通过介绍Teleport来展开,Teleport是为Kubernetes权限控制而设计的,对于不熟悉的可以查看这个链接https://gravitational.com/teleport/。


SO_REUSERPORT vs Duplicating Sockets:

       为了Teleport更加高可用,我们最近花费了一些时间如何去平滑重启Teleport的TLS和SSH的监听者,我们的目标是在不生成一个新的实例的情况下去升级Teleport的包。

       两种通用的实现方法在这篇文章中有介绍,https://blog.cloudflare.com/the-sad-state-of-linux-socket-balancing,其方法大概就是这样:

        》你可以在使用socket时设置 SO_REUSERPORT ,这样就允许多个进程绑定同一个端口,采用这种方法时每个进程都有一个对应的接收处理队列。

       》你也可以复用socket,通过将其传递给子进程的方式来使用,这种方式就是多个进程共用一个接收队列。

      对于SO_REUSERPORT有一些负面的影响,一个是我们的工程师以前用过这种方式,这种多个接收队列的方式有时会导致tcp连接的中断。另外 Go不容易去设置SO_REUSERPORT这个参数。

     第二种方法由于大部分开发者都比较熟悉其简单的unix fork/exec模型 反而是比较吸引的。这种方式可以把所有的文件描述符都传递给子进程,不过在go中 os/exec包目前是不允许这样的,可能是因为安全问题,只能传递 stdin stdou和stderr给子进程。但是os包有比较底层的包可以传递所有的文件描述符给子进程,而这正是我们要做的。


信号控制进程切换:

 在讲正式的源码前,先说下这种方式工作的细节信息。

 开始一个新的Teleport进程时会创建一个socket listener,其会接收发送给目的端口的所有traffic。我们增加一个信号处理函数来处理 SIGUSR2,这个信号可以使Teleport复制一份lisenter socket然后传递的文件描述符和其环境变量的元数据信息生成一个新的进程。一旦一个新的进程开始,就使用前面传递过来的文件描述符合元素开始改造socket并开始处traffic。

   这里应该注意下 socket被复用后,两个socket是循环均衡的处理traffic,具体可以查看下面的图。这意味这Teleport进程每一段时间将接受新的连接。


                 Figure 1: Teleport可以复用自身,与其余复用的进程共享数据传输

    父进程(PID2))的关闭方式是一样的,只是顺序反过来。一旦一个Teleport进程接收了SIGOUT信号将会开始关闭进程,其流程:先停止接收新连接,然后等待所有连接退出。然后父进程将关闭它自己的listener socket并退出。现在内核只发送traffic给新的进程了。



                            Figure 2: 一旦第一个进程关闭了,所有的traffic将不再进行复用,

实例:

我们使用这种方法写了一个小应用。源代码在底部。首先我们来编译然后开始应用:

$ go build restart.go
$ ./restart &
[1] 95147
$ Created listener file descriptor for :8080.

$ curl http://localhost:8080/hello
Hello from 95147!

  发送USR2信号给原始进程,现在你点击发送http请求时,将会返回两个进程的pid号:

$ kill -SIGUSR2 95147
user defined signal 2 signal received.
Forked child 95170.
$ Imported listener file descriptor for :8080.

$ curl http://localhost:8080/hello
Hello from 95170!
$ curl http://localhost:8080/hello
Hello from 95147!

kil掉原始的进程,你将会发现其返回一个新的pid号:

$ kill -SIGTERM 95147
signal: killed
[1]+  Exit 1                  go run restart.go
$ curl http://localhost:8080/hello
Hello from 95170!
$ curl http://localhost:8080/hello
Hello from 95170!

最后kill调新的进行,整个进程就别干掉了。

$ kill -SIGTERM 95170
$ curl http://localhost:8080/hello
curl: (7) Failed to connect to localhost port 8080: Connection refused

   正如你看到的,一旦你了解其是如何工作的,用go写一个平滑重启的服务是很easy的,同时可以极大的提升你的服务的效率。

Golang Graceful Restart Source Example

package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"net"
	"net/http"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"
	"time"
)

type listener struct {
	Addr     string `json:"addr"`
	FD       int    `json:"fd"`
	Filename string `json:"filename"`
}

func importListener(addr string) (net.Listener, error) {
	// Extract the encoded listener metadata from the environment.
	listenerEnv := os.Getenv("LISTENER")
	if listenerEnv == "" {
		return nil, fmt.Errorf("unable to find LISTENER environment variable")
	}

	// Unmarshal the listener metadata.
	var l listener
	err := json.Unmarshal([]byte(listenerEnv), &l)
	if err != nil {
		return nil, err
	}
	if l.Addr != addr {
		return nil, fmt.Errorf("unable to find listener for %v", addr)
	}

	// The file has already been passed to this process, extract the file
	// descriptor and name from the metadata to rebuild/find the *os.File for
	// the listener.
	listenerFile := os.NewFile(uintptr(l.FD), l.Filename)
	if listenerFile == nil {
		return nil, fmt.Errorf("unable to create listener file: %v", err)
	}
	defer listenerFile.Close()

	// Create a net.Listener from the *os.File.
	ln, err := net.FileListener(listenerFile)
	if err != nil {
		return nil, err
	}

	return ln, nil
}

func createListener(addr string) (net.Listener, error) {
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return nil, err
	}

	return ln, nil
}

func createOrImportListener(addr string) (net.Listener, error) {
	// Try and import a listener for addr. If it's found, use it.
	ln, err := importListener(addr)
	if err == nil {
		fmt.Printf("Imported listener file descriptor for %v.\n", addr)
		return ln, nil
	}

	// No listener was imported, that means this process has to create one.
	ln, err = createListener(addr)
	if err != nil {
		return nil, err
	}

	fmt.Printf("Created listener file descriptor for %v.\n", addr)
	return ln, nil
}

func getListenerFile(ln net.Listener) (*os.File, error) {
	switch t := ln.(type) {
	case *net.TCPListener:
		return t.File()
	case *net.UnixListener:
		return t.File()
	}
	return nil, fmt.Errorf("unsupported listener: %T", ln)
}

func forkChild(addr string, ln net.Listener) (*os.Process, error) {
	// Get the file descriptor for the listener and marshal the metadata to pass
	// to the child in the environment.
	lnFile, err := getListenerFile(ln)
	if err != nil {
		return nil, err
	}
	defer lnFile.Close()
	l := listener{
		Addr:     addr,
		FD:       3,
		Filename: lnFile.Name(),
	}
	listenerEnv, err := json.Marshal(l)
	if err != nil {
		return nil, err
	}

	// Pass stdin, stdout, and stderr along with the listener to the child.
	files := []*os.File{
		os.Stdin,
		os.Stdout,
		os.Stderr,
		lnFile,
	}

	// Get current environment and add in the listener to it.
	environment := append(os.Environ(), "LISTENER="+string(listenerEnv))

	// Get current process name and directory.
	execName, err := os.Executable()
	if err != nil {
		return nil, err
	}
	execDir := filepath.Dir(execName)

	// Spawn child process.
	p, err := os.StartProcess(execName, []string{execName}, &os.ProcAttr{
		Dir:   execDir,
		Env:   environment,
		Files: files,
		Sys:   &syscall.SysProcAttr{},
	})
	if err != nil {
		return nil, err
	}

	return p, nil
}

func waitForSignals(addr string, ln net.Listener, server *http.Server) error {
	signalCh := make(chan os.Signal, 1024)
	signal.Notify(signalCh, syscall.SIGHUP, syscall.SIGUSR2, syscall.SIGINT, syscall.SIGQUIT)
	for {
		select {
		case s := <-signalCh:
			fmt.Printf("%v signal received.\n", s)
			switch s {
			case syscall.SIGHUP:
				// Fork a child process.
				p, err := forkChild(addr, ln)
				if err != nil {
					fmt.Printf("Unable to fork child: %v.\n", err)
					continue
				}
				fmt.Printf("Forked child %v.\n", p.Pid)

				// Create a context that will expire in 5 seconds and use this as a
				// timeout to Shutdown.
				ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
				defer cancel()

				// Return any errors during shutdown.
				return server.Shutdown(ctx)
			case syscall.SIGUSR2:
				// Fork a child process.
				p, err := forkChild(addr, ln)
				if err != nil {
					fmt.Printf("Unable to fork child: %v.\n", err)
					continue
				}

				// Print the PID of the forked process and keep waiting for more signals.
				fmt.Printf("Forked child %v.\n", p.Pid)
			case syscall.SIGINT, syscall.SIGQUIT:
				// Create a context that will expire in 5 seconds and use this as a
				// timeout to Shutdown.
				ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
				defer cancel()

				// Return any errors during shutdown.
				return server.Shutdown(ctx)
			}
		}
	}
}

func handler(w http.ResponseWriter, r *http.Request) {
	fmt.Fprintf(w, "Hello from %v!\n", os.Getpid())
}

func startServer(addr string, ln net.Listener) *http.Server {
	http.HandleFunc("/hello", handler)

	httpServer := &http.Server{
		Addr: addr,
	}
	go httpServer.Serve(ln)

	return httpServer
}

func main() {
	// Parse command line flags for the address to listen on.
	var addr string
	flag.StringVar(&addr, "addr", ":8080", "Address to listen on.")

	// Create (or import) a net.Listener and start a goroutine that runs
	// a HTTP server on that net.Listener.
	ln, err := createOrImportListener(addr)
	if err != nil {
		fmt.Printf("Unable to create or import a listener: %v.\n", err)
		os.Exit(1)
	}
	server := startServer(addr, ln)

	// Wait for signals to either fork or quit.
	err = waitForSignals(addr, ln, server)
	if err != nil {
		fmt.Printf("Exiting: %v\n", err)
		return
	}
	fmt.Printf("Exiting.\n")
}

注意:golang1.8及以上,因为server.shutdown优雅的关闭是1.8才加上的特性。

英文原文:https://gravitational.com/blog/golang-ssh-bastion-graceful-restarts/