注册中心篇(三):Consul 服务发现的底层实现


我们先回顾下通过客户端请求服务端部署的服务:

// Create a new service. Optionally include some options here.
 service := micro.NewService(micro.Name("go.micro.cli.greeter"))
 service.Init()

 // Create new greeter client
 greeter := proto.NewGreeterService("go.micro.srv.greeter", service.Client())

 // Call the greeter
 rsp, err := greeter.Hello(context.TODO(), &proto.HelloRequest{Name: "学院君"})
 if err != nil {
	 fmt.Println(err)
 }

 // Print response
 fmt.Println(rsp.Greeting)

需要指定调用的服务名称,端点信息(Greeter.Hello),已经请求参数(可选),如果区分版本的话还要指定服务的版本,我们在终端运行这个客户端调用,就会打印出服务端返回的数据:

在终端运行客户端

然后在运行 Consul 代理的终端窗口,可以看到如下这条日志,表示有请求过来查询可用的服务节点信息:

Consul 代理日志

下面我们分析下 Go Micro 中客户端如何从 Consul Server 中查询服务名称对应的节点信息,已经如何根据获取到的可用节点发起服务请求。关于这一块和我们前面介绍的服务发现实现原理类似,只不过这一次更加底层,更加彻底。

根据前面的介绍我们知道,客户端服务调用最终会走到 github.com/micro/go-micro/client/rpc_client.goCall 方法,在该方法中会通过 next() 方法获取部署微服务的某个节点,然后针对该节点发起请求调用:

node, err := next()
...

  // make the call
  err = rcall(ctx, node, request, response, callOpts)

我们先来看看服务节点的查询,进入 next 方法源码:

func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) {
	service := request.Service()

	...

	// get next nodes from the selector
	next, err := r.opts.Selector.Select(service, opts.SelectOptions...)
	if err != nil && err == selector.ErrNotFound {
		return nil, errors.NotFound("go.micro.client", "service %s: %v", service, err.Error())
	} else if err != nil {
		return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %v", service, err.Error())
	}

	return next, nil
}

request 对象实例数据如下:

request 对象实例数据

service 即我们要获取的服务名称 go.micro.srv.greeter,Go Micro 框架在 Registry 之上封装了 Selector 组件来对服务节点进行负载均衡,所以服务节点的信息通过 Selector 组件来获取,默认的 Selector 类是 registrySelector,对应源码位于 github.com/micro/go-micro/selector/default.go,我们来看其中的 Select 方法是如何实现服务节点查询的:

func (c *registrySelector) Select(service string, opts ...SelectOption) (Next, error) {
	sopts := SelectOptions{
		Strategy: c.so.Strategy,
	}

	for _, opt := range opts {
		opt(&sopts)
	}

	// get the service
	// try the cache first
	// if that fails go directly to the registry
	services, err := c.rc.GetService(service)
	if err != nil {
		return nil, err
	}

	// apply the filters
	for _, filter := range sopts.Filters {
		services = filter(services)
	}

	// if there's nothing left, return
	if len(services) == 0 {
		return nil, ErrNoneAvailable
	}

	return sopts.Strategy(services), nil
} 

c.so 属性在 Selector 的构造函数中初始化:

func NewSelector(opts ...Option) Selector {
	sopts := Options{
		Strategy: Random,
	}

	for _, opt := range opts {
		opt(&sopts)
	}

	if sopts.Registry == nil {
		sopts.Registry = registry.DefaultRegistry
	}

	s := &registrySelector{
		so: sopts,
	}
	s.rc = s.newCache()

	return s
}

默认的负载均衡策略是随机算法,即从所有可用服务节点中随机选择一个,默认的注册中心属性即 Go Micro 框架默认的注册中心,此外还提供了缓存层对注册中心查询结果进行缓存(基于 Registry 组件中的 Cache 类,源码位于 github.com/micro/go-micro/registry/cache 目录)。

回到 Select 方法,设置了节点选择策略后,我们通过 c.rc.getService(service) 从缓存层获取 指定服务名称对应的所有节点,该方法源码定义在 github.com/micro/go-micro/registry/cache/rcache.go 中:

func (c *cache) GetService(service string) ([]*registry.Service, error) {
	// get the service
	services, err := c.get(service)
	
	...

	// return services
	return services, nil
}

核心逻辑定义在同一个类中的 get 方法:

func (c *cache) get(service string) ([]*registry.Service, error) {
	// read lock
	c.RLock()

	// check the cache first
	services := c.cache[service]
	// get cache ttl
	ttl := c.ttls[service]

	// got services && within ttl so return cache
	if c.isValid(services, ttl) {
		// make a copy
		cp := c.cp(services)
		// unlock the read
		c.RUnlock()
		// return servics
		return cp, nil
	}

	// get does the actual request for a service and cache it
	get := func(service string) ([]*registry.Service, error) {
		// ask the registry
		services, err := c.Registry.GetService(service)
		if err != nil {
			return nil, err
		}

		// cache results
		c.Lock()
		c.set(service, c.cp(services))
		c.Unlock()

		return services, nil
	}

	// watch service if not watched
	if _, ok := c.watched[service]; !ok {
		go c.run(service)
	}

	// unlock the read lock
	c.RUnlock()

	// get and return services
	return get(service)
}

在该方法中,先判断缓存中是否有记录且有效,如果有效的话直接返回,否则通过 c.Registry.GetService(service) 从注册中心中查询对应的服务节点,c.Registry 在构造 Cache 类的时候已经被初始化为和 Selector 组件一致的默认注册中心实例,也就是 consul,然后我们调用 consulRegistryGetService 方法获取对应服务节点,源码定义在 github.com/micro/go-micro/registry/consul/consul.go 文件中:

func (c *consulRegistry) GetService(name string) ([]*registry.Service, error) {
	var rsp []*consul.ServiceEntry
	var err error

	// if we're connect enabled only get connect services
	if c.connect {
		rsp, _, err = c.Client.Health().Connect(name, "", false, c.queryOptions)
	} else {
		rsp, _, err = c.Client.Health().Service(name, "", false, c.queryOptions)
	}
	if err != nil {
		return nil, err
	}

	serviceMap := map[string]*registry.Service{}

	for _, s := range rsp {
		if s.Service.Service != name {
			continue
		}

		// version is now a tag
		version, _ := decodeVersion(s.Service.Tags)
		// service ID is now the node id
		id := s.Service.ID
		// key is always the version
		key := version

		// address is service address
		address := s.Service.Address

		// use node address
		if len(address) == 0 {
			address = s.Node.Address
		}

		svc, ok := serviceMap[key]
		if !ok {
			svc = &registry.Service{
				Endpoints: decodeEndpoints(s.Service.Tags),
				Name:      s.Service.Service,
				Version:   version,
			}
			serviceMap[key] = svc
		}

		var del bool

		for _, check := range s.Checks {
			// delete the node if the status is critical
			if check.Status == "critical" {
				del = true
				break
			}
		}

		// if delete then skip the node
		if del {
			continue
		}

		svc.Nodes = append(svc.Nodes, &registry.Node{
			Id:       id,
			Address:  address,
			Port:     s.Service.Port,
			Metadata: decodeMetadata(s.Service.Tags),
		})
	}

	var services []*registry.Service
	for _, service := range serviceMap {
		services = append(services, service)
	}
	return services, nil
}

在这个方法中,无论 c.connect 是否为 true,代码都会执行到 Health 类的 Service 方法,然后调用 service 方法执行核心逻辑,源码位于 github.com/hashicorp/consul/api/health.go

func (h *Health) service(service string, tags []string, passingOnly bool, q *QueryOptions, connect bool) ([]*ServiceEntry, *QueryMeta, error) {
	path := "/v1/health/service/" + service
	if connect {
		path = "/v1/health/connect/" + service
	}
	r := h.c.newRequest("GET", path)
	r.setQueryOptions(q)
	if len(tags) > 0 {
		for _, tag := range tags {
			r.params.Add("tag", tag)
		}
	}
	if passingOnly {
		r.params.Set(HealthPassing, "1")
	}
	rtt, resp, err := requireOK(h.c.doRequest(r))
	if err != nil {
		return nil, nil, err
	}
	defer resp.Body.Close()

	qm := &QueryMeta{}
	parseQueryMeta(resp, qm)
	qm.RequestTime = rtt

	var out []*ServiceEntry
	if err := decodeBody(resp, &out); err != nil {
		return nil, nil, err
	}
	return out, qm, nil
}

在这里,我们正式组装出针对 Consul 提供的 HTTP API 调用 URL 和参数并将其设置到请求对象 r 上,最后通过 h.c.doRequest(r) 发起请求查询指定服务名称下所有有效的服务节点,h.c 实例对应的类是定义在 github.com/hashicorp/consul/api/api.go 中的 Client 类,对应的请求组装、发起方法源码都在这个类中,对应的请求日志正是我们开头看到的 Consul 终端输出的那个 GET 请求。

如果调用过程中没有错误,则会对响应实体进行解码后返回给 Go Micro 中的 consul 组件,回到 GetService 方法,对响应结果进行遍历,然后将有效的服务节点信息添加到 serviceMap 字典并将其返回给上一层的缓存方法缓存起来。

缓存方法中还有一段监听该服务的代码:

go c.run(service)

这段代码通过协程异步启动,最终调用的是 consul.goWatch 方法,其用途是监听到指定服务名称下有新的节点加入将其添加到缓存,如果有节点删除或出现故障,则将起从缓存中删除,这样一来,客户端和注册中心、微服务节点完全解耦,同时还能保证服务的可用性。

接下来,我们回到 registrySelector 类的 Select 方法,获取到从缓存层缓存的节点信息后,如果设置了过滤器的话还会运行过滤器对服务节点进行过滤(比如 IP 黑名单之类的),然后运行指定的负载均衡策略算法从服务节点中选取一个进行通信。

在 Go Micro 框架中,最终针对实际部署微服务的节点发起请求并获取响应结果的逻辑位于 github.com/micro/go-micro/client/rpc_client.gocall 方法:

func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, resp interface{}, opts CallOptions) error {
	...

	go func() {
		...

		// send request
		if err := stream.Send(req.Body()); err != nil {
			ch <- err
			return
		}

		// recv request
		if err := stream.Recv(resp); err != nil {
			ch <- err
			return
		}

		// success
		ch <- nil
	}()

	...
}

stream.Send(req.Body()) 方法对应源码位于 github.com/micro/go-micro/client/rpc_stream.go 文件中,req 实例和上面截图数据一致:

func (r *rpcStream) Send(msg interface{}) error {
	r.Lock()
	defer r.Unlock()

	if r.isClosed() {
		r.err = errShutdown
		return errShutdown
	}

	req := codec.Message{
		Id:       r.id,
		Target:   r.request.Service(),
		Method:   r.request.Method(),
		Endpoint: r.request.Endpoint(),
		Type:     codec.Request,
	}

	if err := r.codec.Write(&req, msg); err != nil {
		r.err = err
		return err
	}

	return nil
}

r.codec 在上述 rpc_client.go 文件的 call 方法中有设置:

codec := newRpcCodec(msg, c, cf)

对应的 Write() 方法定义在 github.com/micro/go-micro/client/rpc_codec.go 中:

func (c *rpcCodec) Write(m *codec.Message, body interface{}) error {
	
	...
	
	setHeaders(m)
	
	if body != nil {
		b, ok := body.(*raw.Frame)
		if ok {
			// set body
			m.Body = b.Data
			body = nil
		}
	}

	if len(m.Body) == 0 {
		// write to codec
		if err := c.codec.Write(m, body); err != nil {
			return errors.InternalServerError("go.micro.client.codec", err.Error())
		}
		// set body
		m.Body = c.buf.wbuf.Bytes()
	}

	// create new transport message
	msg := transport.Message{
		Header: m.Header,
		Body:   m.Body,
	}
	// send the request
	if err := c.client.Send(&msg); err != nil {
		return errors.InternalServerError("go.micro.client.transport", err.Error())
	 }
	return nil
}

在这个方法中,先通过 setHeaders(m) 设置请求头(包含服务名、端点、版本),c.codec 对应的是默认 app lication/protobuf 类型对应的 Codec 组件 proro,相应源码位于 github.com/micro/go-micro/codec/proto/proto.go,通过编码器提供的方法对请求实体(req.Body() 返回数据)进行编码,然后经过 transport.Message 转化后再通过 c.client.Send() 方法发送,c.client 是从连接池中取出的 poolConn 指针,对应数据结构定义在 github.com/micro/go-micro/client/rpc_pool.go 中:

type poolConn struct {
	transport.Client
	created int64
}

Send 方法即定义在这个类中,Go Micro 框架默认的 transport 传输协议是 HTTP,所以对应的 Client 是 httpTransportClient,源码位于 github.com/micro/go-micro/transport/http_transport.go,最终对应的 Send() 方法定义如下:

func (h *httpTransportClient) Send(m *Message) error {
	header := make(http.Header)

	for k, v := range m.Header {
		header.Set(k, v)
	}

	reqB := bytes.NewBuffer(m.Body)
	defer reqB.Reset()
	buf := &buffer{
		reqB,
	}

	req := &http.Request{
		Method: "POST",
		URL: &url.URL{
			Scheme: "http",
			Host:   h.addr,
		},
		Header:        header,
		Body:          buf,
		ContentLength: int64(reqB.Len()),
		Host:          h.addr,
	}

	h.Lock()
	h.bl = append(h.bl, req)
	select {
	case h.r <- h.bl[0]:
		h.bl = h.bl[1:]
	default:
	}
	h.Unlock()

	// set timeout if its greater than 0
	if h.ht.opts.Timeout > time.Duration(0) {
		h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
	}

	return req.Write(h.conn)
}

在这里会构建 HTTP 请求,并发送这个请求到部署微服务的指定节点(之前从通过 Selector 组件从注册中心获取到的服务节点)。

响应的接收和请求是分离的,不过分析思路类似。


点赞 取消点赞 收藏 取消收藏

<< 上一篇: 注册中心篇(二):Consul 服务注册与删除的底层实现

>> 下一篇: 注册中心篇(四):通过 Consul 集群实现服务注册与发现