1. 几个会反复出现的术语的理解
resolver:解析器,可以粗略理解为把一个字符串映射到一组服务地址。比如经典的 DNS 就是完成一个域名到一组 IP 的映射解析;etcd 则自己定义了一个 endpoint 解析器,它有自己的格式,但最终同样是映射到服务地址。
clientconns:连接池。可以粗略理解为这个池子缓存着与 resolver 解析出的每个地址之间的连接。数据从哪里来?原始数据由 resolver 解析完之后存进来,之后由后面介绍的 balancer 小心地维护这个池子。
balancer:均衡器。它最操心:它要 watch 连接池里的连接有没有变化,有的话就更新。另外,它还以某种策略(比如最简单的 round-robin,即 rr)从 conns 里面取连接使用;当某个连接不可用时,它会自己默默地换一个可用的,并且更新 conns 的状态,不用上层操心(对上层透明)。
2. 基本原理
基本原理是gRPC的client端为它要访问的target地址维护了ClientConn。
同时grpc提供注册机制,可以注册自己的balancer,resolver,picker。这些注册要在初始化的时候完成。
3. 注册自己的均衡器相关的重要的接口
type Balancer
// Balancer is the load-balancing policy interface discussed in this article.
type Balancer interface {
	// HandleSubConnStateChange is the balancer's reaction when gRPC detects
	// that the connectivity state of sc has changed. Typically the balancer
	// re-aggregates all SubConns, refreshes its own cache so the picker can
	// use it, and reports the result back to gRPC.
	HandleSubConnStateChange(sc SubConn, state connectivity.State)
	// HandleResolvedAddrs is called by gRPC to deliver address-change
	// notifications to the balancer. The balancer may rebuild conns or remove
	// conns that are no longer valid.
	HandleResolvedAddrs([]resolver.Address, error)
	// Close shuts this balancer down. It may leave the entries in the
	// connection pool unclosed.
	Close()
}
type Builder // 用来创建balancer
// Builder creates balancers.
type Builder interface {
	// Build creates a new balancer with the ClientConn.
	Build(cc ClientConn, opts BuildOptions) Balancer
	// Name returns the name of balancers built by this builder.
	// It will be used to pick balancers (for example in service config).
	Name() string
}
type Picker //用来选clientconn
// Picker selects the SubConn on which an RPC is sent.
type Picker interface {
	// Pick returns the SubConn to be used to send the RPC.
	Pick(ctx context.Context, opts PickOptions) (conn SubConn, done func(DoneInfo), err error)
}
type Resolver
// Resolver is the name-resolution interface discussed in this article.
type Resolver interface {
	// ResolveNow will be called by gRPC to try to resolve the target name
	// again. It's just a hint, resolver can ignore this if it's not necessary.
	//
	// It could be called multiple times concurrently.
	ResolveNow(ResolveNowOption)
	// Close closes the resolver.
	Close()
}
type Builder // 用来创建resolver
// Builder creates resolvers.
type Builder interface {
	// Build creates a new resolver for the given target.
	//
	// gRPC dial calls Build synchronously, and fails if the returned error is
	// not nil.
	Build(target Target, cc ClientConn, opts BuildOption) (Resolver, error)
	// Scheme returns the scheme supported by this resolver.
	// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
	Scheme() string
}
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error)
// ClientConn represents a client connection to an RPC server.
type ClientConn struct {
	ctx    context.Context    // context; created via context.WithCancel(context.Background())
	cancel context.CancelFunc // cancel function produced together with ctx

	target       string          // target the connection was initialized with
	parsedTarget resolver.Target // represents a target for gRPC
	authority    string
	dopts        dialOptions
	csMgr        *connectivityStateManager

	// Important. NOTE(review): open question from the author — what is the
	// Dialer field inside this struct used for?
	balancerBuildOpts balancer.BuildOptions
	// The resolver. It is produced from the resolverBuilder and then started;
	// a dedicated goroutine keeps running for it.
	resolverWrapper *ccResolverWrapper
	blockingpicker  *pickerWrapper

	mu    sync.RWMutex
	sc    ServiceConfig
	scRaw string
	conns map[*addrConn]struct{} // important: the connection pool
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	preBalancerName string // previous balancer name.
	curAddresses    []resolver.Address
	balancerWrapper *ccBalancerWrapper
	retryThrottler  atomic.Value

	channelzID          int64 // channelz unique identification number
	czmu                sync.RWMutex
	callsStarted        int64
	callsSucceeded      int64
	callsFailed         int64
	lastCallStartedTime time.Time
}
在dialcontext的过程中,依次做了几件事:
(1)set resolver builder。set 的逻辑是:如果已经注册过,就用注册的 builder;否则尝试根据给定 target 的 scheme 进行解析;实在不行就采用默认配置。最终配置好 cc.dopts.resolverBuilder 以及 parsedTarget 的值。
(2)build the resolver。主要是调用newCCResolverWrapper生成:
// Build the resolver.
cc.resolverWrapper, err = newCCResolverWrapper(cc)
if err != nil {
	return nil, fmt.Errorf("failed to build resolver: %v", err)
}
// Start the resolver wrapper goroutine after resolverWrapper is created.
//
// If the goroutine is started before resolverWrapper is ready, the
// following may happen: The goroutine sends updates to cc. cc forwards
// those to balancer. Balancer creates new addrConn. addrConn fails to
// connect, and calls resolveNow(). resolveNow() tries to use the non-ready
// resolverWrapper.
cc.resolverWrapper.start()
// newCCResolverWrapper parses cc.target for scheme and gets the resolver // builder for this scheme and builds the resolver. The monitoring goroutine // for it is not started yet and can be created by calling start(). // // If withResolverBuilder dial option is set, the specified resolver will be // used instead. func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) { rb := cc.dopts.resolverBuilder if rb == nil { return nil, fmt.Errorf("could not get resolver for scheme: %q", cc.parsedTarget.Scheme) } ccr := &ccResolverWrapper{ cc: cc, addrCh: make(chan []resolver.Address, 1), scCh: make(chan string, 1), done: make(chan struct{}), } var err error ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, resolver.BuildOption{DisableServiceConfig: cc.dopts.disableServiceConfig}) if err != nil { return nil, err } return ccr, nil }
// start runs watcher in its own goroutine.
func (ccr *ccResolverWrapper) start() {
	go ccr.watcher()
}

// watcher handles address updates and service config updates one at a time,
// in a single goroutine. Processing them sequentially avoids having to resolve
// races between the two kinds of update (e.g. when they specify different
// balancer types). It returns once a receive on ccr.done succeeds.
func (ccr *ccResolverWrapper) watcher() {
	// finished is a non-blocking poll of the done channel.
	finished := func() bool {
		select {
		case <-ccr.done:
			return true
		default:
			return false
		}
	}

	for !finished() {
		select {
		case addrs := <-ccr.addrCh:
			// Re-check done so an update is not forwarded after shutdown.
			if finished() {
				return
			}
			grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs)
			ccr.cc.handleResolvedAddrs(addrs, nil)
		case sc := <-ccr.scCh:
			if finished() {
				return
			}
			grpclog.Infof("ccResolverWrapper: got new service config: %v", sc)
			ccr.cc.handleServiceConfig(sc)
		case <-ccr.done:
			return
		}
	}
}
func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) { cc.mu.Lock() defer cc.mu.Unlock() if cc.conns == nil { // cc was closed. return } if reflect.DeepEqual(cc.curAddresses, addrs) { return } cc.curAddresses = addrs if cc.dopts.balancerBuilder == nil { // Only look at balancer types and switch balancer if balancer dial // option is not set. var isGRPCLB bool for _, a := range addrs { if a.Type == resolver.GRPCLB { isGRPCLB = true break } } var newBalancerName string if isGRPCLB { newBalancerName = grpclbName } else { // Address list doesn't contain grpclb address. Try to pick a // non-grpclb balancer. newBalancerName = cc.curBalancerName // If current balancer is grpclb, switch to the previous one. if newBalancerName == grpclbName { newBalancerName = cc.preBalancerName } // The following could be true in two cases: // - the first time handling resolved addresses // (curBalancerName="") // - the first time handling non-grpclb addresses // (curBalancerName="grpclb", preBalancerName="") if newBalancerName == "" { newBalancerName = PickFirstBalancerName } } cc.switchBalancer(newBalancerName) } else if cc.balancerWrapper == nil { // Balancer dial option was set, and this is the first time handling // resolved addresses. Build a balancer with dopts.balancerBuilder. cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts) } cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
《本文》有 0 条评论