首页 > golang > grpc与etcd实现服务注册和服务发现亲测版
2023
03-04

grpc与etcd实现服务注册和服务发现亲测版

一、前言

grpc中没有像go-micro那样集成可插拔式的etcd库使用,如何使得grpc能够使用服务注册发现及命名解析的功能,因此本文基于etcd实现了Name Resolver。

二、所需的grpc版本

grpc相关库:

google.golang.org/grpc v1.45.0
google.golang.org/grpc/resolver

etcd相关库:

go.etcd.io/etcd/client/v3 v3.5.7


所需的protoc protoc-gen-go  protoc-gen-go-grpc 参考: https://117.119.65.11:10064/?id=3209


三、服务端 server.go

package main

import (
   "flag"
   "fmt"
   proto "gogin/proto/etcd_grpc/hello"
   "net"
   "os"
   "os/signal"
   "strings"
   "syscall"
   "time"

   "go.etcd.io/etcd/client/v3"
   "golang.org/x/net/context"
   "google.golang.org/grpc"
)

var host = "127.0.0.1" //服务器主机
var (
   Port        = flag.Int("Port", 3000, "listening port")                           //服务器监听端口
   ServiceName = flag.String("ServiceName", "greet_service", "service name")        //服务名称
   EtcdAddr    = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address") //etcd的地址
)
var cli *clientv3.Client

//rpc服务接口
type greetServer struct {
   //这行很重要,否则会报错如下 添加proto.UnimplementedGreeterServer 为所有服务(生成的代码的一部分)提供了默认实现
   //在结构中包含UnimplementedGreeterServer 意味着如果您将新服务添加到proto 文件,它将无错误地编译,并在客户端尝试调用新服务时返回未实现的错误。
   //Cannot use '&greetServer{}' (type *greetServer) as the type GreeterServer
   //Type does not implement 'GreeterServer' as some methods are missing: mustEmbedUnimplementedGreeterServer()
   //更多参考:https://github.com/grpc/grpc-go/issues/2318 https://github.com/grpc/grpc-go/issues/3669
   proto.UnimplementedGreeterServer
}

func (gs *greetServer) SayHello(ctx context.Context, req *proto.HelloRequest) (*proto.HelloReply, error) {
   fmt.Printf("Hello 调用: %s\n", req.Name)
   return &proto.HelloReply{
      Message: "Hello, " + req.Name,
   }, nil
}

func (gs *greetServer) SayHelloAgain(ctx context.Context, req *proto.HelloRequest) (*proto.HelloReply, error) {
   fmt.Printf("Hello 调用: %s\n", req.Name)
   return &proto.HelloReply{
      Message: "Hello, " + req.Name,
   }, nil
}

//将服务地址注册到etcd中
func register(etcdAddr, serviceName, serverAddr string, ttl int64) error {
   var err error

   if cli == nil {
      //构建etcd client
      cli, err = clientv3.New(clientv3.Config{
         Endpoints:   strings.Split(etcdAddr, ";"),
         DialTimeout: 15 * time.Second,
      })
      if err != nil {
         fmt.Printf("连接etcd失败:%s\n", err)
         return err
      }
   }

   //与etcd建立长连接,并保证连接不断(心跳检测)
   ticker := time.NewTicker(time.Second * time.Duration(ttl))
   go func() {
      key := getKey(serviceName, serverAddr)
      for {
         resp, err := cli.Get(context.Background(), key)
         //fmt.Printf("resp:%+v\n", resp)
         if err != nil {
            fmt.Printf("获取服务地址失败:%s", err)
         } else if resp.Count == 0 { //尚未注册
            err = keepAlive(serviceName, serverAddr, ttl)
            if err != nil {
               fmt.Printf("保持连接失败:%s", err)
            }
         }
         <-ticker.C
      }
   }()

   return nil
}

//组装etcd key
func getKey(serviceName, serverAddr string) string {
   return fmt.Sprintf("/%s/%s/%s", "etcd", serviceName, serverAddr)
}

//保持服务器与etcd的长连接
func keepAlive(serviceName, serverAddr string, ttl int64) error {
   //创建租约
   leaseResp, err := cli.Grant(context.Background(), ttl)
   if err != nil {
      fmt.Printf("创建租期失败:%s\n", err)
      return err
   }

   //将服务地址注册到etcd中
   key := getKey(serviceName, serverAddr)
   _, err = cli.Put(context.Background(), key, serverAddr, clientv3.WithLease(leaseResp.ID))

   if err != nil {
      fmt.Printf("注册服务失败:%s", err)
      return err
   }
   fmt.Printf("etcd服务注册成功,key:%s,value:%s", key, serverAddr)
   //建立长连接
   ch, err := cli.KeepAlive(context.Background(), leaseResp.ID)
   if err != nil {
      fmt.Printf("建立长连接失败:%s\n", err)
      return err
   }

   //清空keepAlive返回的channel
   go func() {
      for {
         <-ch
      }
   }()
   return nil
}

//取消注册
func unRegister(serviceName, serverAddr string) {
   if cli != nil {
      key := getKey(serviceName, serverAddr)
      cli.Delete(context.Background(), key)
   }
}

func main() {
   flag.Parse()

   //监听网络
   listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", *Port))
   if err != nil {
      fmt.Println("监听网络失败:", err)
      return
   }
   defer listener.Close()

   //创建grpc句柄
   srv := grpc.NewServer()
   defer srv.GracefulStop()

   //将greetServer结构体注册到grpc服务中
   proto.RegisterGreeterServer(srv, &greetServer{})

   //将服务地址注册到etcd中
   serverAddr := fmt.Sprintf("%s:%d", host, *Port)
   fmt.Printf("greeting server address: %s\n", serverAddr)
   register(*EtcdAddr, *ServiceName, serverAddr, 5)

   //关闭信号处理
   ch := make(chan os.Signal, 1)
   signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
   go func() {
      s := <-ch
      unRegister(*ServiceName, serverAddr)
      if i, ok := s.(syscall.Signal); ok {
         os.Exit(int(i))
      } else {
         os.Exit(0)
      }
   }()

   //监听服务
   err = srv.Serve(listener)
   if err != nil {
      fmt.Println("监听异常:", err)
      return
   } else {
      fmt.Println("grpc监听开始...")
   }
}

四、客户端 client.go

package main

import (
   "flag"
   "fmt"
   proto "gogin/proto/etcd_grpc/hello"
   "google.golang.org/grpc/credentials/insecure"
   "log"
   "strings"
   "time"

   "go.etcd.io/etcd/client/v3"
   "golang.org/x/net/context"
   "google.golang.org/grpc"
   "google.golang.org/grpc/resolver"
)

var (
   ServiceName = flag.String("ServiceName", "greet_service", "service name")        //服务名称
   EtcdAddr    = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address") //etcd的地址
)

var cli *clientv3.Client

//etcd解析器
type etcdResolver struct {
   etcdAddr   string
   clientConn resolver.ClientConn
}

//初始化一个etcd解析器
func newResolver(etcdAddr string) resolver.Builder {
   return &etcdResolver{etcdAddr: etcdAddr}
}

func (r *etcdResolver) Scheme() string {
   return "etcd"
}

//watch有变化以后会调用
func (r *etcdResolver) ResolveNow(rn resolver.ResolveNowOptions) {
   log.Println("ResolveNow")
   fmt.Println(rn)
}

//解析器关闭时调用
func (r *etcdResolver) Close() {
   log.Println("Close")
}

//构建解析器 grpc.Dial()同步调用
func (r *etcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
   var err error
   fmt.Println("call build...")
   //构建etcd client
   if cli == nil {
      cli, err = clientv3.New(clientv3.Config{
         Endpoints:   strings.Split(r.etcdAddr, ";"),
         DialTimeout: 15 * time.Second,
      })
      if err != nil {
         fmt.Printf("连接etcd失败:%s\n", err)
         return nil, err
      }
   }

   r.clientConn = clientConn

   //go r.watch("/" + target.URL.Scheme + "/" + target.Endpoint + "/") //不建议使用,改用下面的方式
   go r.watch("/" + target.URL.Scheme + target.URL.Path + "/")

   return r, nil
}

//监听etcd中某个key前缀的服务地址列表的变化
func (r *etcdResolver) watch(keyPrefix string) {
   //初始化服务地址列表
   var addrList []resolver.Address

   fmt.Println("keyPrefix", keyPrefix)
   resp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
   if err != nil {
      fmt.Println("获取服务地址列表失败:", err)
   } else {
      for i := range resp.Kvs {
         addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(resp.Kvs[i].Key), keyPrefix)})
      }
   }
   //r.clientConn.NewAddress(addrList)   //不建议使用,改用下面的方式
   state := resolver.State{Addresses: addrList}
   r.clientConn.UpdateState(state)

   //监听服务地址列表的变化
   rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
   for n := range rch {
      for _, ev := range n.Events {
         addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
         switch ev.Type {
         case 0: //mvccpb.PUT
            if !exists(addrList, addr) {
               addrList = append(addrList, resolver.Address{Addr: addr})
               //r.clientConn.NewAddress(addrList)//不建议使用,改用下面的方式
               state := resolver.State{Addresses: addrList}
               r.clientConn.UpdateState(state)
            }
            fmt.Println("有新的服务注册:", addr)
         case 1: //mvccpb.DELETE
            if s, ok := remove(addrList, addr); ok {
               addrList = s
               //r.clientConn.NewAddress(addrList) //不建议使用,改用下面的方式
               state := resolver.State{Addresses: addrList}
               r.clientConn.UpdateState(state)
            }
            fmt.Println("服务注销:", addr)
         }
      }
   }
}

func exists(l []resolver.Address, addr string) bool {
   for i := range l {
      if l[i].Addr == addr {
         return true
      }
   }
   return false
}

func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
   for i := range s {
      if s[i].Addr == addr {
         s[i] = s[len(s)-1]
         return s[:len(s)-1], true
      }
   }
   return nil, false
}

/**
 * 客户端首先需要实现接口resolver.Resolver,其中方法Build()用于创建一个etcd解析器,
 * grpc.Dial()会同步调用该方法,解析器需要根据key前缀监听etcd中服务地址列表的变化并更新本地列表。
 * 然后注册解析器,创建gRPC句柄,使用轮询负载均衡请求服务。
 **/
func main() {
   flag.Parse()

   //注册etcd解析器
   r := newResolver(*EtcdAddr)
   resolver.Register(r)
   //客户端连接服务器(负载均衡:轮询) 会同步调用r.Build()
   conn, err := grpc.Dial(
      r.Scheme()+"://author/"+*ServiceName,
      grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
      //grpc.WithInsecure(), //不建议使用,改用下面的方式
      grpc.WithTransportCredentials(insecure.NewCredentials()),
   )
   if err != nil {
      fmt.Println("连接服务器失败:", err)
   }
   defer conn.Close()

   fmt.Println("GetState", conn.GetState())

   //获得grpc句柄
   c := proto.NewGreeterClient(conn)
   ticker := time.NewTicker(2 * time.Second)
   i := 1
   for range ticker.C {

      resp1, err := c.SayHello(
         context.Background(),
         &proto.HelloRequest{Name: fmt.Sprintf("张三%d", i)},
      )
      if err != nil {
         fmt.Println("Hello调用失败:", err)
         return
      }
      fmt.Printf("Hello 响应:%s\n", resp1.Message)

      i++
   }
}

五、proto文件 hello.proto

syntax = "proto3";  // 版本信息,不指定会报错

option java_package = "io.grpc.examples";

package hello; //生成go文件的包名
option go_package ="./hello"; // 表示在当前文件夹下的pb包中

//protoc --go_out=. --go_opt=paths=source_relative  --go-grpc_out=. --go-grpc_opt=paths=source_relative hello/hello.proto
// The greeter service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
  rpc SayHelloAgain (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

六、 整体目录结构

sample

    hello

        hello.proto

    server

        server.go

    client

        client.go


七、运行测试

//窗口一
cd server
go run server.go -Port 3000
//输出
greeting server address: 127.0.0.1:3000
etcd服务注册成功,key:/etcd/greet_service/127.0.0.1:3000,value:127.0.0.1:3000

Hello 调用: 张三1

//窗口二
cd server
go run server.go -Port 3001
//输出
greeting server address: 127.0.0.1:3001
etcd服务注册成功,key:/etcd/greet_service/127.0.0.1:3001,value:127.0.0.1:3001Hello 调用: 张三1
Hello 调用: 张三2

//客户端测试
cd client
go run client.go
//输出
call build...
GetState IDLE
keyPrefix /etcd/greet_service/
Hello 响应:Hello, 张三1
Hello 响应:Hello, 张三2
...


本文》有 0 条评论

留下一个回复