首页 > golang > etcd clientv3 源码解读 - grpc负载均衡原理
2023
03-03

etcd clientv3 源码解读 - grpc负载均衡原理

1.  几个会反复出现的术语的理解

resolver:解析器,可以粗暴理解为从一个字符串映射到一堆服务。比如经典的dns就是完成一个域名到一堆ip的映射解析,在etcd中,它自己定义了一个endpoint的解析器。有自己的格式,最终也是映射到服务地址。

clientconns: 连接池。可以粗暴理解为这个池子缓存着与每个resolver解析后的地址的连接,数据哪里来呢? 原始数据会根据resolver解析完存进来,然后后面介绍的balancer会小心的维护这个池子。

balancer:均衡器。它最操心。它要watch连接池里有没有要变化的,有的话就更新。另外,它还以某种机制(比如最简单的rr)从conns里面拿连接用,当有连接不可用时,自己默默的换一个可用的,并且更新conns的状态,不用上层操心(对上层透明)


2. 基本原理

    基本原理是gRPC的client端为它要访问的target地址维护了ClientConn。

    同时grpc提供注册机制,可以注册自己的balancer,resolver,picker。这些注册要在初始化的时候完成。

3. 注册自己的均衡器相关的重要的接口

type Balancer

type Balancer interface {
    //当grpc检测到sc的连接状态发生改变时,balancer的策略,一般来说,
    //balancer需要重新聚合下所有的subconn,更新自己的cache,方便picker使用,并且通告给grpc。
    HandleSubConnStateChange(sc SubConn, state connectivity.State)
    //这个接口由grpc调用,用来发送地址变更消息给balancers。balancer可以重建conns或者删除失效的conns。
    HandleResolvedAddrs([]resolver.Address, error)
    //关闭这个balancer。可以不关闭连接池中的数据。
    Close()
}

type Builder   // 用来创建balancer

type Builder interface {
    // Build creates a new balancer with the ClientConn.
    Build(cc ClientConn, opts BuildOptions) Balancer
    // Name returns the name of balancers built by this builder.
    // It will be used to pick balancers (for example in service config).
    Name() string
}

type Picker   //用来选clientconn

type Picker interface {
    // Pick returns the SubConn to be used to send the RPC.
    Pick(ctx context.Context, opts PickOptions) (conn SubConn, done func(DoneInfo), err error)
}

type Resolver

type Resolver interface {
    // ResolveNow will be called by gRPC to try to resolve the target name
    // again. It's just a hint, resolver can ignore this if it's not necessary.
    //
    // It could be called multiple times concurrently.
    ResolveNow(ResolveNowOption)
    // Close closes the resolver.
    Close()
}

type Builder // 用来创建resolver

type Builder interface {
    // Build creates a new resolver for the given target.
    //
    // gRPC dial calls Build synchronously, and fails if the returned error is
    // not nil.
    Build(target Target, cc ClientConn, opts BuildOption) (Resolver, error)
    // Scheme returns the scheme supported by this resolver.
    // Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
    Scheme() string
}

4. 实现细节

grpc在执行DialContext的时候,会根据DialOption初始化负载均衡策略,返回一个*ClientConn

func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error)

首先先了解一下ClientConn的结构

// ClientConn represents a client connection to an RPC server.
type ClientConn struct {
	ctx    context.Context      //上下文,由context.WithCancel(context.Background())创建
	cancel context.CancelFunc   // 与ctx一起生成的cancel函数
 
	target       string      //初始化连接的目标
	parsedTarget resolver.Target  //represents a target for gRPC
	authority    string
	dopts        dialOptions
	csMgr        *connectivityStateManager
 
	balancerBuildOpts balancer.BuildOptions  //important!!! struct中的Dialer是干什么的
	resolverWrapper   *ccResolverWrapper  //the resolver, 根据resolverBuilder生成,然后start,一直有个单独的goroutine在跑
	blockingpicker    *pickerWrapper
 
	mu    sync.RWMutex
	sc    ServiceConfig
	scRaw string
	conns map[*addrConn]struct{}  //important!! 连接池
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	preBalancerName string // previous balancer name.
	curAddresses    []resolver.Address
	balancerWrapper *ccBalancerWrapper
	retryThrottler  atomic.Value
 
	channelzID          int64 // channelz unique identification number
	czmu                sync.RWMutex
	callsStarted        int64
	callsSucceeded      int64
	callsFailed         int64
	lastCallStartedTime time.Time
}

在dialcontext的过程中,依次做了几件事:

(1)set resolver builder 。set的逻辑是:如果已经被注册了,就用注册的builder,否则尝试根据给的单个target的scheme进行解析,实在不行就采取默认的配置,配置好cc.dopts.resolverBuilder以及parsedTarge的值。

(2)build the resolver。主要是调用newCCResolverWrapper生成:

	// Build the resolver.
	cc.resolverWrapper, err = newCCResolverWrapper(cc)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}
	// Start the resolver wrapper goroutine after resolverWrapper is created.
	//
	// If the goroutine is started before resolverWrapper is ready, the
	// following may happen: The goroutine sends updates to cc. cc forwards
	// those to balancer. Balancer creates new addrConn. addrConn fails to
	// connect, and calls resolveNow(). resolveNow() tries to use the non-ready
	// resolverWrapper.
	cc.resolverWrapper.start()

其中newCCResolverWrapper 根据DialOption中指定的resolverBuilder。并调用builder生成相应的resolver

// newCCResolverWrapper parses cc.target for scheme and gets the resolver
// builder for this scheme and builds the resolver. The monitoring goroutine
// for it is not started yet and can be created by calling start().
//
// If withResolverBuilder dial option is set, the specified resolver will be
// used instead.
func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
	rb := cc.dopts.resolverBuilder
	if rb == nil {
		return nil, fmt.Errorf("could not get resolver for scheme: %q", cc.parsedTarget.Scheme)
	}
 
	ccr := &ccResolverWrapper{
		cc:     cc,
		addrCh: make(chan []resolver.Address, 1),
		scCh:   make(chan string, 1),
		done:   make(chan struct{}),
	}
 
	var err error
	ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, resolver.BuildOption{DisableServiceConfig: cc.dopts.disableServiceConfig})
	if err != nil {
		return nil, err
	}
	return ccr, nil
}

(3)start the resolver。执行resolverWrapper.start(). 这个函数开启一个goroutine,watch是否有新的addr出现,如果有的话,调用clientConn 的 handleResolvedAddrs相关的逻辑。

func (ccr *ccResolverWrapper) start() {
	go ccr.watcher()
}
 
// watcher processes address updates and service config updates sequentially.
// Otherwise, we need to resolve possible races between address and service
// config (e.g. they specify different balancer types).
func (ccr *ccResolverWrapper) watcher() {
	for {
		select {
		case <-ccr.done:
			return
		default:
		}
 
		select {
		case addrs := <-ccr.addrCh:
			select {
			case <-ccr.done:
				return
			default:
			}
			grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs)
			ccr.cc.handleResolvedAddrs(addrs, nil)
		case sc := <-ccr.scCh:
			select {
			case <-ccr.done:
				return
			default:
			}
			grpclog.Infof("ccResolverWrapper: got new service config: %v", sc)
			ccr.cc.handleServiceConfig(sc)
		case <-ccr.done:
			return
		}
	}
}

(4)resolver获取到一堆addr,然后初始化一遍balancerbuilder,得到响应的balancer, 同时也会生成一个watcher, 使用相应的balancer对这堆地址进行处理。

func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	if cc.conns == nil {
		// cc was closed.
		return
	}
 
	if reflect.DeepEqual(cc.curAddresses, addrs) {
		return
	}
 
	cc.curAddresses = addrs
 
	if cc.dopts.balancerBuilder == nil {
		// Only look at balancer types and switch balancer if balancer dial
		// option is not set.
		var isGRPCLB bool
		for _, a := range addrs {
			if a.Type == resolver.GRPCLB {
				isGRPCLB = true
				break
			}
		}
		var newBalancerName string
		if isGRPCLB {
			newBalancerName = grpclbName
		} else {
			// Address list doesn't contain grpclb address. Try to pick a
			// non-grpclb balancer.
			newBalancerName = cc.curBalancerName
			// If current balancer is grpclb, switch to the previous one.
			if newBalancerName == grpclbName {
				newBalancerName = cc.preBalancerName
			}
			// The following could be true in two cases:
			// - the first time handling resolved addresses
			//   (curBalancerName="")
			// - the first time handling non-grpclb addresses
			//   (curBalancerName="grpclb", preBalancerName="")
			if newBalancerName == "" {
				newBalancerName = PickFirstBalancerName
			}
		}
		cc.switchBalancer(newBalancerName)
	} else if cc.balancerWrapper == nil {
		// Balancer dial option was set, and this is the first time handling
		// resolved addresses. Build a balancer with dopts.balancerBuilder.
		cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
	}
 
	cc.balancerWrapper.handleResolvedAddrs(addrs, nil)


本文》有 0 条评论

留下一个回复