grpc重構過程

重構grpc已變後續拓展


原本的程式碼

單一Server

func InitGrpcServer(port int) {
	tools.Log("start grpc server ...")
	localIP := tools.GetLocalIP()
	lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", localIP, port))
	tools.HandelError("listen errors ", err)
	s := grpc.NewServer()
	echo := grpcserver.NewEchoServer()
	protobuf.RegisterEchoServer(s, echo)
 
	var heart int64 = 3
	etcd := etcdservice.RegisterETCD([]string{"127.0.0.1:2379"}, heart)
	leaseId := etcd.Register("echo_service", fmt.Sprintf("%s:%d", localIP, port), 0)
 
	go func() {
		for {
			etcd.Register("echo_service", fmt.Sprintf("%s:%d", localIP, port), leaseId)
			time.Sleep(time.Duration(heart)*time.Second - 100*time.Millisecond)
		}
	}()
 
	go func() {
		c := make(chan os.Signal, 1)
		signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
		sig := <-c
		tools.Log(fmt.Sprintf("receive a signal %s", sig.String()))
		etcd.UnRegister("echo_service", fmt.Sprintf("%s:%d", localIP, port))
		os.Exit(0)
	}()
 
	err = s.Serve(lis)
	tools.HandelError("start grpc server error", err,
		func(args ...interface{}) {
			etcd.UnRegister("echo_service", fmt.Sprintf("%s:%d", localIP, port))
		})
}

如何支援多server?

  • 微服務世界中會把不同的服務當成不同server,這邊已開在不同port模擬
  • 現階段問題
    1. 重複的etcd註冊
    2. 每次新增一個server要寫一堆重複程式

重構Etcd部分

  • 將使用到etcd分離
type EtcdRegister struct {
	client  *etcdservice.EtcdService
	leaseId clientv3.LeaseID
	heart   int64
}
 
func NewEtcdRegister(endpoints []string, heart int64) *EtcdRegister {
	return &EtcdRegister{
		client: etcdservice.RegisterETCD(endpoints, heart),
		heart:  heart,
	}
}
 
func (e *EtcdRegister) Register(serviceName, addr string) {
	e.leaseId = e.client.Register(serviceName, addr, 0)
	tools.Log(fmt.Sprintf("Registered service %s at %s", serviceName, addr))
	go func() {
		for {
			e.client.Register(serviceName, addr, e.leaseId)
			time.Sleep(time.Duration(e.heart)*time.Second - 100*time.Millisecond)
		}
	}()
}
 
func (e *EtcdRegister) UnRegister(serviceName, addr string) {
	e.client.UnRegister(serviceName, addr)
	tools.Log(fmt.Sprintf("unregiter service: %s from etcd, addr: %s", serviceName, addr))
}
 
func (e *EtcdRegister) ListenExit(serviceName, addr string) {
	go func() {
		c := make(chan os.Signal, 1)
		signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
		sig := <-c
		tools.Log(fmt.Sprintf("receive a signal %s", sig.String()))
		e.client.UnRegister(serviceName, addr)
		os.Exit(0)
	}()
}

現在依然有個問題

  • 註冊grpc server的時候要寫很多重複代碼且拓展性不加

抽離server服務當作基底

  • 後續只要從基底進行拓展即可
  • baseserver.go
type GrpcServer interface {
	InitServer(port int)
}
 
type BaseServer struct {
	ServiceName  string
	Registerfunc func(*grpc.Server)
	interceptors []grpc.UnaryServerInterceptor
}
 
func (b *BaseServer) RegisterInterceptors(interceptors ...grpc.UnaryServerInterceptor) {
	b.interceptors = append(b.interceptors, interceptors...)
}
 
func (b *BaseServer) InitServer(port int) {
	tools.Log(fmt.Sprintf("Starting gRPC server [%s] on port %d ...", b.ServiceName, port))
	addr := tools.FormatAddr(port)
	lis, err := net.Listen("tcp", addr)
	tools.HandelError("error in listen addr", err)
 
	var opts []grpc.ServerOption
	if len(b.interceptors) > 0 {
		opts = append(opts, grpc.ChainUnaryInterceptor(b.interceptors...))
	}
	s := grpc.NewServer(opts...)
  b.Registerfunc(s)
 
	etcd := etcdregister.NewEtcdRegister([]string{"127.0.0.1:2379"}, 3)
	etcd.Register(b.ServiceName, addr)
	etcd.ListenExit(b.ServiceName, addr)
 
	err = s.Serve(lis)
	tools.HandelError("start grpc server error", err,
		func(args ...interface{}) {
			etcd.UnRegister(b.ServiceName, addr)
		})
}

此時重構echo server只需要向struct填充即可

type EchoServer struct {
	BaseServer
}
 
func RegisterEchoServer() *EchoServer {
	echogrpc := &EchoServer{
		BaseServer{
			ServiceName: "echo_service",
			Registerfunc: func(server *grpc.Server) {
				echo := grpcserver.NewEchoServer()
				protobuf.RegisterEchoServer(server, echo)
				tools.Log("register echo server success")
			},
		},
	}
	echogrpc.RegisterInterceptors(interceptors.LoggingInterceptor)
	return echogrpc
}

啟動Server

echo := server.RegisterEchoServer()
echo.InitServer(*port)
  • 假設有多服務
    • 由於s.Serve會阻塞,因此要開goruntine
servers := []server.GrpcServer{
		server.RegisterEchoServer(),
		server.RegisterLoginServer(),
		server.RegisterTokenTestServer(),
	}
	ports := []int{8081, 8082, 8083}
	for i, gserver := range servers {
		go gserver.InitServer(ports[i])
	}