基于 Consul 的 Go Micro 客户端服务发现是如何实现的

服务发现

上篇分享我们介绍了基于 Consul 作为注册中心的 Go Micro 服务注册底层实现原理,今天我们来看看 Go Micro 中客户端服务发现是如何实现的。

客户端服务发现要复杂一些,涉及到服务发现 Registry 和节点选择 Selector 两部分。

所谓服务发现指的是当我们从客户端向指定服务发起请求时,可以通过名字识别服务,然后通过服务发现获取到包含 IP 地址和端口号的对应远程服务实例,远程服务会在启动时向注册中心注册,退出时注销,客户端无需关心这些细节,由 Go Micro 的 Registry 组件统一处理服务注册与发现逻辑(这里,我们基于 Consul 作为 Registry 的具体实现插件)。

而节点选择指的是,远程服务实例通常部署在多个节点上,通过指定的服务名称可以获取到一个地址列表,节点选择要做的事情是通过某种策略从列表中获取指定的 IP 进行访问,这就是 Go Micro 中 Selector 组件发挥作用的地方,它基于 Registry 组件实现,提供了负载均衡策略,比如轮询或随机,以及过滤、缓存和黑名单的功能。

下面我们还是通过分析客户端调用底层源码来看下 Go Micro 框架中服务发现与节点选择的具体实现。首先打开 ~/go/hello/src/hello/client.go 文件,在 main 函数中,通过一系列的初始化操作后,真正发起服务调用的代码是 greeter.Hello 函数调用:

func main() {
	// Create a new service. Optionally include some options here.
	service := micro.NewService(micro.Name("go.micro.cli.greeter"))
	service.Init()

	// Create new greeter client
	greeter := proto.NewGreeterService("go.micro.srv.greeter", service.Client())

	// Call the greeter
	rsp, err := greeter.Hello(context.TODO(), &proto.HelloRequest{Name: "学院君"})
	if err != nil {
		fmt.Println(err)
	}

	// Print response
	fmt.Println(rsp.Greeting)
}

greeter.Hello 函数定义在 ~/go/hello/src/hello/proto/hello.micro.go 中:

func (c *greeterService) Hello(ctx context.Context, in *HelloRequest, opts ...client.CallOption) (*HelloResponse, error) {
	req := c.c.NewRequest(c.name, "Greeter.Hello", in)
	out := new(HelloResponse)
	err := c.c.Call(ctx, req, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

在这个函数中,通过 NewRequest 初始化了请求实例,包含服务名称及请求端点、参数信息,然后初始化了响应实例,接下来是调用 ClientCall 函数,所有请求调用处理的核心逻辑(服务发现、节点选择、请求处理、超时、重试、编码)都在这个函数里,最终源码对应 ~/go/hello/src/github.com/micro/go-micro/client/rpc_client.goCall 函数,这个函数代码量较大,我们选取一些关键片段进行解读。

首先会通过 rpcClient 类的 next 方法获取远程服务节点,在未设置系统环境变量 MICRO_PROXY_ADDRESS 的情况下会执行 Selector 的 Select 方法获取服务节点,默认的 Selector 初始化操作位于 ~/go/hello/src/github.com/micro/go-micro/client/options.gonewOptions 方法(该方法会在 client.go 的初始化操作中调用):

if opts.Selector == nil {
    opts.Selector = selector.NewSelector(
        selector.Registry(opts.Registry),
    )
}

这里我们可以看到 Selector 依赖于 Registry 组件,如果没有额外设置的话,基于系统默认的 Registry 实现(这里是 Consul),NewSelector 方法源码如下:

func NewSelector(opts ...Option) Selector {
    sopts := Options{
        Strategy: Random,
    }
    
    for _, opt := range opts {
        opt(&sopts)
    }
    
    if sopts.Registry == nil {
        sopts.Registry = registry.DefaultRegistry
    }
    
    s := &registrySelector{
        so: sopts,
    }
    
    s.rc = s.newCache()
    
    return s
}

Selector 默认的负载均衡策略使用的是随机算法(关于 Selector 支持的所有负载均衡算法后面我们还会单独介绍),并且会在本地对节点选择结果进行缓存。

回到 Selector 的 Select 函数,该函数源码定义在 ~/go/hello/src/github.com/micro/go-micro/selector/default.go 中:

func (c *registrySelector) Select(service string, opts ...SelectOption) (Next, error) {
	sopts := SelectOptions{
		Strategy: c.so.Strategy,
	}

	for _, opt := range opts {
		opt(&sopts)
	}

	// get the service
	// try the cache first
	// if that fails go directly to the registry
	services, err := c.rc.GetService(service)
	if err != nil {
		return nil, err
	}

	// apply the filters
	for _, filter := range sopts.Filters {
		services = filter(services)
	}

	// if there's nothing left, return
	if len(services) == 0 {
		return nil, ErrNoneAvailable
	}

	return sopts.Strategy(services), nil
}

通过 c.rc.GetService(service) 传入指定服务名称,再通过默认 Registry 实现 Consul 获取对应的服务实例列表并缓存(如果已缓存则直接返回提高性能,在通过 Registry 获取服务节点列表时还会单独跑一个协程去监听服务注册,如果有新节点注册进来,则加到缓存中,如果有节点故障则删除缓存中的节点信息,具体源码位于 ~/go/hello/src/github.com/micro/go-micro/registry/cache/rcache.go),应用过滤器后最后通过默认负载均衡实现(这里是 Random 算法)返回指定节点,获取到远程服务实例节点后,就可以发起远程服务请求了,回到 rpc_client.goCall 函数,对应的远程调用代码逻辑实现片段如下:

call := func(i int) error {
  ...
  
  // select next node
	node, err := next()
	if err != nil && err == selector.ErrNotFound {
		return errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
	} else if err != nil {
		return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
	}

	// make the call
	err = rcall(ctx, node, request, response, callOpts)
	r.opts.Selector.Mark(request.Service(), node, err)
	return err
   
   ...
  }

上述调用 rpcClientnext 函数返回的并不是真正的节点而是一个匿名函数,到 node, err := next() 这里才真正调用对应的函数返回远程服务节点信息,如果返回节点成功则调用 rcall 函数(即 rpcClientcall 函数)发起远程网络请求(通过协程实现),如果请求处理出错则返回相应错误信息,然后对服务调用成功与否通过 Selector 的 Mark 函数进行标记(以便后续对服务进行监控和治理),最后在 Call 函数中,也是通过协程发起对上述 call 匿名函数的调用:

...

for i := 0; i <= callOpts.Retries; i++ {
	go func(i int) {
		ch <- call(i)  // 调用上面定义的 call 匿名函数
	}(i)

	select {
	case <-ctx.Done():
		return errors.Timeout("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()))
	case err := <-ch:
		// if the call succeeded lets bail early
		if err == nil {
			return nil
		}

		retry, rerr := callOpts.Retry(ctx, request, i, err)
		if rerr != nil {
			return rerr
		}

		if !retry {
			return err
		}

		gerr = err
	}
}

...

如果服务调用失败,则进行重试或报错处理。

以上就是在 Go Micro 体系内客户端请求服务发现与节点选择的底层实现,如果是以 HTTP 方式从外部通过 Micro API 网关形式对远程服务发起请求,则 API 网关会将 HTTP 请求解析并转化为默认的服务形式,比如 /greeter/say/hello 请求会被转化为服务名为 go.micro.api.greeter,方法名为 Say.Hello 的请求,然后调用 go.micro.srv.greeter 远程服务,后续处理逻辑与上面完全一致。请求处理完成后,返回处理结果给 API 网关,API 网关将其转化为 HTTP 响应返回给调用客户端。

上一篇: Go Micro 底层是如何将服务注册到 Consul 的

下一篇: 微服务中的 API 网关模式概述