注册中心篇(二):Consul 服务注册与删除的底层实现


上篇分享我们简单介绍了 Consul 的基本原理,以及在开发模式下的启动过程,今天我们还是以开发模式为例,介绍在 Consul 底层,服务注册和发现是如何实现的。

服务注册

假设你已经在本地环境通过 consul agent -dev 启动了 Consul 代理,然后到之前示例的微服务项目 hello 中启动微服务完成服务注册:

Consul 服务注册流程

然后我们在启动 Consul 代理的终端窗口也可以看到服务注册日志:

Consul 服务注册日志

关于 Go Micro 框架中启动服务时注册服务到 Consul 的底层源码我们在服务注册底层实现中已经详细介绍过,这里我们重点看下 Consul 这边做了什么。

服务可以通过服务定义文件或者是 HTTP API 的方式进行注册,基于 Go Micro 框架的 Registry 组件 consul 注册服务显然是通过 HTTP API 的方式,在日志中也可以看到这个 PUT 请求,对应的服务注册接口是 /v1/agent/service/register,这个逻辑可以在 github.com/hashicorp/consul/api/agent.goServiceRegister 方法中看到:

func (a *Agent) ServiceRegister(service *AgentServiceRegistration) error {
    r := a.c.newRequest("PUT", "/v1/agent/service/register")
    r.obj = service
    _, resp, err := requireOK(a.c.doRequest(r))
    if err != nil {
        return err
    }
    resp.Body.Close()
    return nil
}

服务信息数据会从 github.com/micro/go-micro/registry/consul/consul.goRegister 方法中传过来,对应的数据结构是个 AgentServiceRegistration 类:

// AgentServiceRegistration is used to register a new service
type AgentServiceRegistration struct {
    	Kind              ServiceKind       `json:",omitempty"`
    	ID                string            `json:",omitempty"`
    	Name              string            `json:",omitempty"`
    	Tags              []string          `json:",omitempty"`
    	Port              int               `json:",omitempty"`
    	Address           string            `json:",omitempty"`
    	EnableTagOverride bool              `json:",omitempty"`
    	Meta              map[string]string `json:",omitempty"`
    	Weights           *AgentWeights     `json:",omitempty"`
    	Check             *AgentServiceCheck
    	Checks            AgentServiceChecks
    	// DEPRECATED (ProxyDestination) - remove this field
    	ProxyDestination string                          `json:",omitempty"`
    	Proxy            *AgentServiceConnectProxyConfig `json:",omitempty"`
    	Connect          *AgentServiceConnect            `json:",omitempty"`
}

其中 ID 是服务节点 ID,即 go.micro.srv.greeter-eac4032b-5b26-45fe-a1bd-0e71266d042eName 即服务节点名称 go.micro.srv.greeterAddress 是服务节点的 IP 地址,Port 则是服务节点上运行微服务的进程端口,Tags 则是对服务节点的元数据 Metadata 以及所有端点(Endpoint)、版本信息进行编码后的切片:

// encode the tags
tags := encodeMetadata(node.Metadata)
tags = append(tags, encodeEndpoints(s.Endpoints)...)
tags = append(tags, encodeVersion(s.Version)...)

服务节点的 Metadata 数据结构如下:

服务节点的 Metadata 数据结构

AgentServiceRegistration 中的 Check 包含服务健康检查信息(包含 IP、端口、间隔时间)。所有这些信息组装处理后在 consul 组件的服务注册方法(Register)中调用代理的 ServiceRegister 发送过来:

// register the service
asr := &consul.AgentServiceRegistration{
    ID:      node.Id,
    Name:    s.Name,
    Tags:    tags,
    Port:    node.Port,
    Address: node.Address,
    Check:   check,
 }

 // Specify consul connect
 if c.connect {
	  asr.Connect = &consul.AgentServiceConnect{
	      Native: true,
	  }
 }

 if err := c.Client.Agent().ServiceRegister(asr); err != nil {
     return err
 }

服务数据的结构符合 Consul 官方文档约定的格式才可以通过 HTTP API 进行注册:https://www.consul.io/docs/agent/services.html

服务注册成功后如果是集群部署的话还会把这个服务从领导者同步到其它 Consul Server Follower 节点。

健康检查

服务注册时会带上健康检查信息,Go Micro 实现的 consul 组件支持通过 TCP 或 TTL 两种方式来实现健康检查,在 Register 方法中可以看到对应的实现:

var regTCPCheck bool
var regInterval time.Duration

var options registry.RegisterOptions
for _, o := range opts {
    o(&options)
}

// 如果 consul 组件上下文不为空且包含 consul_tcp_check 配置,则设置通过 TCP 进行健康检查
if c.opts.Context != nil {
    if tcpCheckInterval, ok := c.opts.Context.Value("consul_tcp_check").(time.Duration); ok {
        	regTCPCheck = true
        	regInterval = tcpCheckInterval
    }
}

...

// 如果服务已经注册并匹配到则对其进行健康检查,如果通过则不再重新注册服务节点信息
if ok && v == h {
    // 如果 options.TTL = 0
    if options.TTL == time.Duration(0) {
        // 确保服务节点没有被 Consul 删除
        if time.Since(lastChecked) <= getDeregisterTTL(regInterval) {
            return nil
        }
        // 检查服务节点是否健康,如果健康并且与传入 node.id 相匹配则表示不需要进行注册
        services, _, err := c.Client.Health().Checks(s.Name, c.queryOptions)
        if err == nil {
        	for _, v := range services {
        		if v.ServiceID == node.Id {
        			return nil
        		}
        	}
        }
    } else {
        // 如果 options.TTL > 0,则调用 Consul 代理健康检查 API 查看服务是否健康,
        // 调用成功则不再注册服务节点,不成功则不能确保服务状态,故而重新注册
        if err := c.Client.Agent().PassTTL("service:"+node.Id, ""); err == nil {
        	return nil
        }
    }
}

...

var check *consul.AgentServiceCheck

if regTCPCheck {
    // 如果是通过 TCP 进行健康检查,则通过 regInterval 设置 TTL
    deregTTL := getDeregisterTTL(regInterval)
    // 然后初始化健康检查接口数据
    check = &consul.AgentServiceCheck{
        TCP:                            fmt.Sprintf("%s:%d", node.Address, node.Port),
        Interval:                       fmt.Sprintf("%v", regInterval),
        DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL),
    }
} else if options.TTL > time.Duration(0) {
    // 如果 options.TTL 大于0,则通过 TTL 进行健康检查,通过该值设置 TTL
    deregTTL := getDeregisterTTL(options.TTL)
    // 然后初始化健康检查接口数据
    check = &consul.AgentServiceCheck{
        TTL:                            fmt.Sprintf("%v", options.TTL),
        DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL),
    }
}

...

// 如果 TTL = 0,服务注册后不进行健康检查
if options.TTL == time.Duration(0) {
    return nil
}

// 如果 TTL > 0,则服务注册后进行一次健康检查
return c.Client.Agent().PassTTL("service:"+node.Id, "")

Consul 健康检查相关接口 API 方法和数据结构都定义在 github.com/hashicorp/consul/api/agent.go 中,可以自行去查看,在 Go Micro 中,options.TTL 可以通过启动服务时传递 MICRO_REGISTER_TTL(过期时间) 和 MICRO_REGISTER_INTERVAL (重新注册间隔时间)参数来设置(或者将它们放到环境变量中),或者你还可以在设置 consul_tcp_check 配置项通过 TCP 来进行健康检查。

默认情况下,既没有设置 options.TTL,也没有设置 consul_tcp_check,所以在本地示例微服务运行过程中,没有进行健康检查,相应的注册服务节点时对应的 check 实例值为 nil。

服务删除

当我们通过 Ctrl+C 终止服务的时候,会对 Consul 中的服务节点进行反注册操作(删除):

Consul 服务节点删除

从 Consul 代理运行窗口的日志中也可以看到相应的操作流程,也是通过调用对应的 HTTP API 接口实现的:

Consul 服务节点删除流程

对应的 API 接口是 /v1/agent/service/deregister/<node.id>,请求方式是 PUT

Go Micro 框架的 consul 组件对应的方法是 Deregister

func (c *consulRegistry) Deregister(s *registry.Service) error {
    if len(s.Nodes) == 0 {
    	    return errors.New("Require at least one node")
    }
        
    // delete our hash and time check of the service
    c.Lock()
    delete(c.register, s.Name)
    delete(c.lastChecked, s.Name)
    c.Unlock()
        
    node := s.Nodes[0]
    return c.Client.Agent().ServiceDeregister(node.Id)
}

Consul Agent 对应的调用方法实现如下:

// ServiceDeregister is used to deregister a service with
// the local agent
func (a *Agent) ServiceDeregister(serviceID string) error {
    r := a.c.newRequest("PUT", "/v1/agent/service/deregister/"+serviceID)
    _, resp, err := requireOK(a.c.doRequest(r))
    if err != nil {
    	   return err
    }
    resp.Body.Close()
    return nil
}

点赞 取消点赞 收藏 取消收藏

<< 上一篇: 注册中心篇(一):Consul 简介和使用入门

>> 下一篇: 注册中心篇(三):Consul 服务发现的底层实现