grpc重構過程
重構grpc已變後續拓展
原本的程式碼
單一Server
func InitGrpcServer(port int) {
tools.Log("start grpc server ...")
localIP := tools.GetLocalIP()
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", localIP, port))
tools.HandelError("listen errors ", err)
s := grpc.NewServer()
echo := grpcserver.NewEchoServer()
protobuf.RegisterEchoServer(s, echo)
var heart int64 = 3
etcd := etcdservice.RegisterETCD([]string{"127.0.0.1:2379"}, heart)
leaseId := etcd.Register("echo_service", fmt.Sprintf("%s:%d", localIP, port), 0)
go func() {
for {
etcd.Register("echo_service", fmt.Sprintf("%s:%d", localIP, port), leaseId)
time.Sleep(time.Duration(heart)*time.Second - 100*time.Millisecond)
}
}()
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
sig := <-c
tools.Log(fmt.Sprintf("receive a signal %s", sig.String()))
etcd.UnRegister("echo_service", fmt.Sprintf("%s:%d", localIP, port))
os.Exit(0)
}()
err = s.Serve(lis)
tools.HandelError("start grpc server error", err,
func(args ...interface{}) {
etcd.UnRegister("echo_service", fmt.Sprintf("%s:%d", localIP, port))
})
}
如何支援多server?
- 微服務世界中會把不同的服務當成不同server,這邊已開在不同port模擬
- 現階段問題
- 重複的etcd註冊
- 每次新增一個server要寫一堆重複程式
重構Etcd部分
- 將使用到etcd分離
type EtcdRegister struct {
client *etcdservice.EtcdService
leaseId clientv3.LeaseID
heart int64
}
func NewEtcdRegister(endpoints []string, heart int64) *EtcdRegister {
return &EtcdRegister{
client: etcdservice.RegisterETCD(endpoints, heart),
heart: heart,
}
}
func (e *EtcdRegister) Register(serviceName, addr string) {
e.leaseId = e.client.Register(serviceName, addr, 0)
tools.Log(fmt.Sprintf("Registered service %s at %s", serviceName, addr))
go func() {
for {
e.client.Register(serviceName, addr, e.leaseId)
time.Sleep(time.Duration(e.heart)*time.Second - 100*time.Millisecond)
}
}()
}
func (e *EtcdRegister) UnRegister(serviceName, addr string) {
e.client.UnRegister(serviceName, addr)
tools.Log(fmt.Sprintf("unregiter service: %s from etcd, addr: %s", serviceName, addr))
}
func (e *EtcdRegister) ListenExit(serviceName, addr string) {
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
sig := <-c
tools.Log(fmt.Sprintf("receive a signal %s", sig.String()))
e.client.UnRegister(serviceName, addr)
os.Exit(0)
}()
}
現在依然有個問題
- 註冊grpc server的時候要寫很多重複代碼且拓展性不加
抽離server服務當作基底
- 後續只要從基底進行拓展即可
- baseserver.go
type GrpcServer interface {
InitServer(port int)
}
type BaseServer struct {
ServiceName string
Registerfunc func(*grpc.Server)
interceptors []grpc.UnaryServerInterceptor
}
func (b *BaseServer) RegisterInterceptors(interceptors ...grpc.UnaryServerInterceptor) {
b.interceptors = append(b.interceptors, interceptors...)
}
func (b *BaseServer) InitServer(port int) {
tools.Log(fmt.Sprintf("Starting gRPC server [%s] on port %d ...", b.ServiceName, port))
addr := tools.FormatAddr(port)
lis, err := net.Listen("tcp", addr)
tools.HandelError("error in listen addr", err)
var opts []grpc.ServerOption
if len(b.interceptors) > 0 {
opts = append(opts, grpc.ChainUnaryInterceptor(b.interceptors...))
}
s := grpc.NewServer(opts...)
b.Registerfunc(s)
etcd := etcdregister.NewEtcdRegister([]string{"127.0.0.1:2379"}, 3)
etcd.Register(b.ServiceName, addr)
etcd.ListenExit(b.ServiceName, addr)
err = s.Serve(lis)
tools.HandelError("start grpc server error", err,
func(args ...interface{}) {
etcd.UnRegister(b.ServiceName, addr)
})
}
此時重構echo server只需要向struct填充即可
type EchoServer struct {
BaseServer
}
func RegisterEchoServer() *EchoServer {
echogrpc := &EchoServer{
BaseServer{
ServiceName: "echo_service",
Registerfunc: func(server *grpc.Server) {
echo := grpcserver.NewEchoServer()
protobuf.RegisterEchoServer(server, echo)
tools.Log("register echo server success")
},
},
}
echogrpc.RegisterInterceptors(interceptors.LoggingInterceptor)
return echogrpc
}
啟動Server
echo := server.RegisterEchoServer()
echo.InitServer(*port)
- 假設有多服務
- 由於s.Serve會阻塞,因此要開goruntine
servers := []server.GrpcServer{
server.RegisterEchoServer(),
server.RegisterLoginServer(),
server.RegisterTokenTestServer(),
}
ports := []int{8081, 8082, 8083}
for i, gserver := range servers {
go gserver.InitServer(ports[i])
}