一、前言
grpc中没有像go-micro那样集成可插拔式的etcd库使用,如何使得grpc能够使用服务注册发现及命名解析的功能,因此本文基于etcd实现了Name Resolver。
二、所需的grpc版本
grpc相关库:
google.golang.org/grpc v1.45.0 google.golang.org/grpc/resolver
etcd相关库:
go.etcd.io/etcd/client/v3 v3.5.7
所需的protoc protoc-gen-go protoc-gen-go-grpc 参考: https://117.119.65.11:10064/?id=3209
三、服务端 server.go
package main import ( "flag" "fmt" proto "gogin/proto/etcd_grpc/hello" "net" "os" "os/signal" "strings" "syscall" "time" "go.etcd.io/etcd/client/v3" "golang.org/x/net/context" "google.golang.org/grpc" ) var host = "127.0.0.1" //服务器主机 var ( Port = flag.Int("Port", 3000, "listening port") //服务器监听端口 ServiceName = flag.String("ServiceName", "greet_service", "service name") //服务名称 EtcdAddr = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address") //etcd的地址 ) var cli *clientv3.Client //rpc服务接口 type greetServer struct { //这行很重要,否则会报错如下 添加proto.UnimplementedGreeterServer 为所有服务(生成的代码的一部分)提供了默认实现 //在结构中包含UnimplementedGreeterServer 意味着如果您将新服务添加到proto 文件,它将无错误地编译,并在客户端尝试调用新服务时返回未实现的错误。 //Cannot use '&greetServer{}' (type *greetServer) as the type GreeterServer //Type does not implement 'GreeterServer' as some methods are missing: mustEmbedUnimplementedGreeterServer() //更多参考:https://github.com/grpc/grpc-go/issues/2318 https://github.com/grpc/grpc-go/issues/3669 proto.UnimplementedGreeterServer } func (gs *greetServer) SayHello(ctx context.Context, req *proto.HelloRequest) (*proto.HelloReply, error) { fmt.Printf("Hello 调用: %s\n", req.Name) return &proto.HelloReply{ Message: "Hello, " + req.Name, }, nil } func (gs *greetServer) SayHelloAgain(ctx context.Context, req *proto.HelloRequest) (*proto.HelloReply, error) { fmt.Printf("Hello 调用: %s\n", req.Name) return &proto.HelloReply{ Message: "Hello, " + req.Name, }, nil } //将服务地址注册到etcd中 func register(etcdAddr, serviceName, serverAddr string, ttl int64) error { var err error if cli == nil { //构建etcd client cli, err = clientv3.New(clientv3.Config{ Endpoints: strings.Split(etcdAddr, ";"), DialTimeout: 15 * time.Second, }) if err != nil { fmt.Printf("连接etcd失败:%s\n", err) return err } } //与etcd建立长连接,并保证连接不断(心跳检测) ticker := time.NewTicker(time.Second * time.Duration(ttl)) go func() { key := getKey(serviceName, serverAddr) for { resp, err := cli.Get(context.Background(), key) //fmt.Printf("resp:%+v\n", resp) if err != nil { fmt.Printf("获取服务地址失败:%s", err) } else if resp.Count == 0 { //尚未注册 err = keepAlive(serviceName, serverAddr, ttl) if err != nil { fmt.Printf("保持连接失败:%s", err) } } <-ticker.C } }() return nil } //组装etcd key func getKey(serviceName, serverAddr string) string { return fmt.Sprintf("/%s/%s/%s", "etcd", serviceName, serverAddr) } //保持服务器与etcd的长连接 func keepAlive(serviceName, serverAddr string, ttl int64) error { //创建租约 leaseResp, err := cli.Grant(context.Background(), ttl) if err != nil { fmt.Printf("创建租期失败:%s\n", err) return err } //将服务地址注册到etcd中 key := getKey(serviceName, serverAddr) _, err = cli.Put(context.Background(), key, serverAddr, clientv3.WithLease(leaseResp.ID)) if err != nil { fmt.Printf("注册服务失败:%s", err) return err } fmt.Printf("etcd服务注册成功,key:%s,value:%s", key, serverAddr) //建立长连接 ch, err := cli.KeepAlive(context.Background(), leaseResp.ID) if err != nil { fmt.Printf("建立长连接失败:%s\n", err) return err } //清空keepAlive返回的channel go func() { for { <-ch } }() return nil } //取消注册 func unRegister(serviceName, serverAddr string) { if cli != nil { key := getKey(serviceName, serverAddr) cli.Delete(context.Background(), key) } } func main() { flag.Parse() //监听网络 listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", *Port)) if err != nil { fmt.Println("监听网络失败:", err) return } defer listener.Close() //创建grpc句柄 srv := grpc.NewServer() defer srv.GracefulStop() //将greetServer结构体注册到grpc服务中 proto.RegisterGreeterServer(srv, &greetServer{}) //将服务地址注册到etcd中 serverAddr := fmt.Sprintf("%s:%d", host, *Port) fmt.Printf("greeting server address: %s\n", serverAddr) register(*EtcdAddr, *ServiceName, serverAddr, 5) //关闭信号处理 ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT) go func() { s := <-ch unRegister(*ServiceName, serverAddr) if i, ok := s.(syscall.Signal); ok { os.Exit(int(i)) } else { os.Exit(0) } }() //监听服务 err = srv.Serve(listener) if err != nil { fmt.Println("监听异常:", err) return } else { fmt.Println("grpc监听开始...") } }
四、客户端 client.go
package main import ( "flag" "fmt" proto "gogin/proto/etcd_grpc/hello" "google.golang.org/grpc/credentials/insecure" "log" "strings" "time" "go.etcd.io/etcd/client/v3" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/resolver" ) var ( ServiceName = flag.String("ServiceName", "greet_service", "service name") //服务名称 EtcdAddr = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address") //etcd的地址 ) var cli *clientv3.Client //etcd解析器 type etcdResolver struct { etcdAddr string clientConn resolver.ClientConn } //初始化一个etcd解析器 func newResolver(etcdAddr string) resolver.Builder { return &etcdResolver{etcdAddr: etcdAddr} } func (r *etcdResolver) Scheme() string { return "etcd" } //watch有变化以后会调用 func (r *etcdResolver) ResolveNow(rn resolver.ResolveNowOptions) { log.Println("ResolveNow") fmt.Println(rn) } //解析器关闭时调用 func (r *etcdResolver) Close() { log.Println("Close") } //构建解析器 grpc.Dial()同步调用 func (r *etcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { var err error fmt.Println("call build...") //构建etcd client if cli == nil { cli, err = clientv3.New(clientv3.Config{ Endpoints: strings.Split(r.etcdAddr, ";"), DialTimeout: 15 * time.Second, }) if err != nil { fmt.Printf("连接etcd失败:%s\n", err) return nil, err } } r.clientConn = clientConn //go r.watch("/" + target.URL.Scheme + "/" + target.Endpoint + "/") //不建议使用,改用下面的方式 go r.watch("/" + target.URL.Scheme + target.URL.Path + "/") return r, nil } //监听etcd中某个key前缀的服务地址列表的变化 func (r *etcdResolver) watch(keyPrefix string) { //初始化服务地址列表 var addrList []resolver.Address fmt.Println("keyPrefix", keyPrefix) resp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix()) if err != nil { fmt.Println("获取服务地址列表失败:", err) } else { for i := range resp.Kvs { addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(resp.Kvs[i].Key), keyPrefix)}) } } //r.clientConn.NewAddress(addrList) //不建议使用,改用下面的方式 state := resolver.State{Addresses: addrList} r.clientConn.UpdateState(state) //监听服务地址列表的变化 rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix()) for n := range rch { for _, ev := range n.Events { addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix) switch ev.Type { case 0: //mvccpb.PUT if !exists(addrList, addr) { addrList = append(addrList, resolver.Address{Addr: addr}) //r.clientConn.NewAddress(addrList)//不建议使用,改用下面的方式 state := resolver.State{Addresses: addrList} r.clientConn.UpdateState(state) } fmt.Println("有新的服务注册:", addr) case 1: //mvccpb.DELETE if s, ok := remove(addrList, addr); ok { addrList = s //r.clientConn.NewAddress(addrList) //不建议使用,改用下面的方式 state := resolver.State{Addresses: addrList} r.clientConn.UpdateState(state) } fmt.Println("服务注销:", addr) } } } } func exists(l []resolver.Address, addr string) bool { for i := range l { if l[i].Addr == addr { return true } } return false } func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) { for i := range s { if s[i].Addr == addr { s[i] = s[len(s)-1] return s[:len(s)-1], true } } return nil, false } /** * 客户端首先需要实现接口resolver.Resolver,其中方法Build()用于创建一个etcd解析器, * grpc.Dial()会同步调用该方法,解析器需要根据key前缀监听etcd中服务地址列表的变化并更新本地列表。 * 然后注册解析器,创建gRPC句柄,使用轮询负载均衡请求服务。 **/ func main() { flag.Parse() //注册etcd解析器 r := newResolver(*EtcdAddr) resolver.Register(r) //客户端连接服务器(负载均衡:轮询) 会同步调用r.Build() conn, err := grpc.Dial( r.Scheme()+"://author/"+*ServiceName, grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`), //grpc.WithInsecure(), //不建议使用,改用下面的方式 grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { fmt.Println("连接服务器失败:", err) } defer conn.Close() fmt.Println("GetState", conn.GetState()) //获得grpc句柄 c := proto.NewGreeterClient(conn) ticker := time.NewTicker(2 * time.Second) i := 1 for range ticker.C { resp1, err := c.SayHello( context.Background(), &proto.HelloRequest{Name: fmt.Sprintf("张三%d", i)}, ) if err != nil { fmt.Println("Hello调用失败:", err) return } fmt.Printf("Hello 响应:%s\n", resp1.Message) i++ } }
五、proto文件 hello.proto
syntax = "proto3"; // 版本信息,不指定会报错 option java_package = "io.grpc.examples"; package hello; //生成go文件的包名 option go_package ="./hello"; // 表示在当前文件夹下的pb包中 //protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative hello/hello.proto // The greeter service definition. service Greeter { // Sends a greeting rpc SayHello (HelloRequest) returns (HelloReply) {} rpc SayHelloAgain (HelloRequest) returns (HelloReply) {} } // The request message containing the user's name. message HelloRequest { string name = 1; } // The response message containing the greetings message HelloReply { string message = 1; }
六、 整体目录结构
sample
hello
hello.proto
server
server.go
client
client.go
七、运行测试
//窗口一 cd server go run server.go -Port 3000 //输出 greeting server address: 127.0.0.1:3000 etcd服务注册成功,key:/etcd/greet_service/127.0.0.1:3000,value:127.0.0.1:3000 Hello 调用: 张三1 //窗口二 cd server go run server.go -Port 3001 //输出 greeting server address: 127.0.0.1:3001 etcd服务注册成功,key:/etcd/greet_service/127.0.0.1:3001,value:127.0.0.1:3001Hello 调用: 张三1 Hello 调用: 张三2 //客户端测试 cd client go run client.go //输出 call build... GetState IDLE keyPrefix /etcd/greet_service/ Hello 响应:Hello, 张三1 Hello 响应:Hello, 张三2 ...
- 本文固定链接: https://117.119.65.11:10064/?id=1480
- 转载请注明: admin 于 PHP面试网 发表
《本文》有 0 条评论