Go Micro 框架底层组件篇 —— Broker 底层源码剖析


微服务底层通信原理

Broker接口

今天这篇分享我们来看看 Go Micro 框架中 Broker 组件的底层实现。和其他组件一样,Go Micro 也是通过抽象接口的方式提供了 Broker 接口:

type Broker interface {
	Init(...Option) error
	Options() Options
	Address() string
	Connect() error
	Disconnect() error
	Publish(topic string, m *Message, opts ...PublishOption) error
	Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
	String() string
}

从而方便开发者按照自己的业务需求去实现和扩展 Broker 接口,系统默认的 Broker 组件实现类是点对点的 HTTP 系统,以便最小化对其他工具的依赖,此外还开箱支持 NATS,你还可以通过 Go Plugins 引入其它消息系统插件,比如 Redis、gRPC、RabbitMQ、SQS、Kafka、Google Pub/Sub 等:

broker支持的消息系统

Broker 默认实现类初始化

我们以默认的 HTTP 系统为例介绍其底层实现,打开 user/main.go,在通过 micro.NewService 创建新的服务时,就包含了对 Broker 组件的初始化,这个初始化动作在 micro.newService 方法中完成(源码位于 src/github.com/micro/go-micro/service.go):

func newService(opts ...Option) Service {
	options := newOptions(opts...)

	options.Client = &clientWrapper{
		options.Client,
		metadata.Metadata{
			HeaderPrefix + "From-Service": options.Server.Options().Name,
		},
	}

	return &service{
		opts: options,
	}
}

我们点开 newOptions 方法,就可以看到 Broker 组件和 Cmd、Client、Server、Registry、Transport 等其他组件一起被初始化:

func newOptions(opts ...Option) Options {
	opt := Options{
		Broker:    broker.DefaultBroker,
		Cmd:       cmd.DefaultCmd,
		Client:    client.DefaultClient,
		Server:    server.DefaultServer,
		Registry:  registry.DefaultRegistry,
		Transport: transport.DefaultTransport,
		Context:   context.Background(),
	}

	for _, o := range opts {
		o(&opt)
	}

	return opt
}

被通过 Options 类封装起来设置到 Service 的 opts 属性上,以便后续调用。这里的 broker.DefaultBrokersrc/github.com/micro/go-micro/broker/broker.go 中通过如下这行代码返回的 httpBroker 实例:

var (
	DefaultBroker Broker = newHttpBroker()
)

httpBroker 就是实现了 Broker 接口的 HTTP Broker 实现类,具体的初始化逻辑感兴趣的可以自行去查看,这就是默认的 Broker 组件初始化逻辑。

接下来,回到 user/main.go,接下来执行 srv.Init() 方法,该方法源码仍然位于 src/github.com/micro/go-micro/service.go 中:

func (s *service) Init(opts ...Option) {
	// process options
	for _, o := range opts {
		o(&s.opts)
	}

	s.once.Do(func() {
		// Initialise the command flags, overriding new service
		_ = s.opts.Cmd.Init(
			cmd.Broker(&s.opts.Broker),
			cmd.Registry(&s.opts.Registry),
			cmd.Transport(&s.opts.Transport),
			cmd.Client(&s.opts.Client),
			cmd.Server(&s.opts.Server),
		)
	})
}

可以看到在这里,Broker 组件会和 Registry、Transport、Client、Server 一起从系统环境变量/命令行参数读取配置来覆盖默认的 Broker 实现实例,相应的实现逻辑和之前介绍的 Registry 和 Transport 一样,只不过对应的系统环境变量名是 MICRO_BROKER,参数选项名是 --broker,关于其具体实现可以在 src/github.com/micro/go-micro/config/cmd/cmd.go 中查看:

cli.StringFlag{
		Name:   "broker",
		EnvVar: "MICRO_BROKER",
		Usage:  "Broker for pub/sub. http, nats, rabbitmq",
  },

比如我们在系统环境变量中设置 MICRO_BROKER=nats,或者在启动 UserService 的时候执行的命令是 go run main.go --broker=nats,则对应的 Broker 实例则变成 nats.NewBroker 返回的 natsBroker 实例并覆盖默认的 httpBroker 实现,当然要使用 NATS 作为 Broker 消息系统还要安装并运行 NATS 软件。

发布事件

接下来,我们就可以在服务中通过 srv.Server().Options().Broker 获取系统的 Broker 组件实例了,该实例以 srv.Init() 运行之后为准。然后我们再把它注册到服务处理处理器上:

proto.RegisterUserServiceHandler(srv.Server(), &service{repo, pubsub})

以便我们在服务端口中通过该实例来发布或者订阅指定主题消息:

srv.PubSub.Publish(topic, msg);

其中 topic 是字符串类型的消息主题,msgbroker.Message 类型的经过编码的消息头和消息主体,通常,我们以 JSON 格式对消息进行编码。

httpBroker 类为例,对应的 Publish 方法实现源码如下:

func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
	// create the message first
	m := &Message{
		Header: make(map[string]string),
		Body:   msg.Body,
	}

	for k, v := range msg.Header {
		m.Header[k] = v
	}

	m.Header[":topic"] = topic

	// encode the message
	b, err := h.opts.Codec.Marshal(m)
	if err != nil {
		return err
	}

	// save the message
	h.saveMessage(topic, b)

	// now attempt to get the service
	h.RLock()
	s, err := h.r.GetService("topic:" + topic)
	if err != nil {
		h.RUnlock()
		// ignore error
		return nil
	}
	h.RUnlock()

	pub := func(node *registry.Node, t string, b []byte) error {
		scheme := "http"

		// check if secure is added in metadata
		if node.Metadata["secure"] == "true" {
			scheme = "https"
		}

		vals := url.Values{}
		vals.Add("id", node.Id)

		uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultSubPath, vals.Encode())
		r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
		if err != nil {
			return err
		}

		// discard response body
		io.Copy(ioutil.Discard, r.Body)
		r.Body.Close()
		return nil
	}

	srv := func(s []*registry.Service, b []byte) {
		for _, service := range s {
			// only process if we have nodes
			if len(service.Nodes) == 0 {
				continue
			}

			switch service.Version {
			// broadcast version means broadcast to all nodes
			case broadcastVersion:
				var success bool

				// publish to all nodes
				for _, node := range service.Nodes {
					// publish async
					if err := pub(node, topic, b); err == nil {
						success = true
					}
				}

				// save if it failed to publish at least once
				if !success {
					h.saveMessage(topic, b)
				}
			default:
				// select node to publish to
				node := service.Nodes[rand.Int()%len(service.Nodes)]

				// publish async to one node
				if err := pub(node, topic, b); err != nil {
					// if failed save it
					h.saveMessage(topic, b)
				}
			}
		}
	}

	// do the rest async
	go func() {
		// get a third of the backlog
		messages := h.getMessage(topic, 8)
		delay := (len(messages) > 1)

		// publish all the messages
		for _, msg := range messages {
			// serialize here
			srv(s, msg)

			// sending a backlog of messages
			if delay {
				time.Sleep(time.Millisecond * 100)
			}
		}
	}()

	return nil
}

在此之前,我们在 user/main.go 中通过 srv.run() 启动服务过程中,会启动 Transport 和 Broker 监听请求,对应代码位于 rpcServer.Start() 方法中:

func (s *rpcServer) Start() error {
    ...
    ts, err := config.Transport.Listen(config.Address)
    ...
    log.Logf("Transport [%s] Listening on %s", config.Transport.String(), ts.Addr())
    
    // connect to the broker
    if err := config.Broker.Connect(); err != nil {
        return err
    }
    bname := config.Broker.String()
    log.Logf("Broker [%s] Connected to %s", bname, config.Broker.Address())
    ...
}

对应的 Connect() 方法实现位于 httpBroker 中,主要是启动 HTTP 服务并初始化 httpBroker.r 属性,即通过缓存封装的默认 Registry 组件实现实例,以便后续查询指定 topic 对应服务。

我们回到 Publish() 方法,在该方法中首先对消息进行编码并保存到 httpBroker 的 inbox 中,inbox 是一个字典结构,以 topic 作为键,以消息数组作为值,新增的消息都会追加到这个数组上。

然后通过 h.r.GetService("topic:" + topic) 从 Registry 获取部署 topic 服务的节点信息,我们前面提到 h.r 是通过 cache.New 返回的封装了默认 Registry 实现的 cache 类实例,这里的 h.r 类似我们前介绍 Selector 组件时提到的 registrySelector.rc 属性,对应的 GetService 方法定义在 src/github.com/micro/go-micro/registry/cache/rcache.go 中,关于其具体实现可以参考 Selector 组件中的介绍,这里不再赘述了。

对于 httpBroker 而言,"topic:" + topic 对应服务节点是在运行 httpBroker.httpBroker 时注册的,所以在没有运行 EmailService 服务时,这里返回的 s 值为空并且 err 值不为空,所以直接退出了,但是在 cache 层 GetService 实现中会监听这个 key 对应的服务。

假设我们启动了订阅该 topic 的服务,则会注册对应服务,此时,再运行 Publish() 方法时,会运行到末尾的协程代码,从 httpBroker 的 inbox 中读取所有消息,然后调用 srv 方法将每条消息推送到服务节点去处理。所以,如果发布事件方未主动再次触发 Publish 方法,则堆积在 inbox 中的消息不会被订阅方拉取消费。

订阅事件

接下来,我们来到 mail/main.go 文件,看看订阅方的处理逻辑底层实现。srv.Init() 及之前的 Broker 组件实例初始化逻辑和 user 完全一致, 这里也是 httpBroker,下面我们重点来看一下这段订阅代码:

pubSub := srv.Server().Options().Broker
if err := pubSub.Connect(); err != nil {
    fmt.Errorf("broker connect error: %v\n", err)
}

// 订阅消息
_, err := pubSub.Subscribe(topic, func(pub broker.Event) error {
    var user *userProto.User
    if err := json.Unmarshal(pub.Message().Body, &user); err != nil {
        fmt.Errorf("process message failed: %v\n", err)
        return err
    }
    fmt.Printf("[User Registered]: %v\n", user)
    go sendEmail(user)
    return nil
})

首先我们从 srv 中获取默认的 Broker 实现实例,并调用其 Connect() 方法启动 HTTP 消息系统服务并初始化 httpBroker.r,注意这里的 httpBroker 实例不同于上一个 user 服务中的 httpBroker 实例,是一个新的实例,接下来我们调用这新实例的 Subscribe() 方法,其底层源码如下所示:

func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
	var err error
	var host, port string
	options := NewSubscribeOptions(opts...)

	// parse address for host, port
	host, port, err = net.SplitHostPort(h.Address())
	if err != nil {
		return nil, err
	}

	addr, err := maddr.Extract(host)
	if err != nil {
		return nil, err
	}

	// create unique id
	id := h.id + "." + uuid.New().String()

	var secure bool

	if h.opts.Secure || h.opts.TLSConfig != nil {
		secure = true
	}

	// register service
	node := &registry.Node{
		Id:      id,
		Address: mnet.HostPort(addr, port),
		Metadata: map[string]string{
			"secure": fmt.Sprintf("%t", secure),
		},
	}

	// check for queue group or broadcast queue
	version := options.Queue
	if len(version) == 0 {
		version = broadcastVersion
	}

	service := &registry.Service{
		Name:    "topic:" + topic,
		Version: version,
		Nodes:   []*registry.Node{node},
	}

	// generate subscriber
	subscriber := &httpSubscriber{
		opts:  options,
		hb:    h,
		id:    id,
		topic: topic,
		fn:    handler,
		svc:   service,
	}

	// subscribe now
	if err := h.subscribe(subscriber); err != nil {
		return nil, err
	}

	// return the subscriber
	return subscriber, nil
}

首先,我们将 email 启动的 HTTP 消息系统服务节点信息封装后设置到 httpSubscriber 实例中,该实例还包含了订阅主题、传入 Subscribe 方法的事件处理器、当前的 httpBroker 实例等信息,然后将其作为参数传递到当前 httpBroker 实例的 subscribe 方法:

func (h *httpBroker) subscribe(s *httpSubscriber) error {
	h.Lock()
	defer h.Unlock()

	if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil {
		return err
	}

	h.subscribers[s.topic] = append(h.subscribers[s.topic], s)
	return nil
}

该方法会调用 h.r.Register 注册 "topic:" + topic 对应的 HTTP 消息系统服务节点信息,并将传入的 httpSubscriber 实例追加到 h.subscribers[s.topic] 数组中。

user 服务中指定 topic 对应事件 user.registered 发布,会通过 httpBroker 的 Publish 方法调用其中的匿名函数 pub 将消息推送到订阅该 topic 的 email 服务中来:

uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultSubPath, vals.Encode())
r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))

其中的 node.Address 即通过 Registry 获取到的 email 服务 HTTP 消息系统 IP + 端口号,然后在 email 服务中,通过 httpBroker 实现的 ServeHTTP() 方法处理请求(httpBroker 同时也实现了 http.Handler 接口):

func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if req.Method != "POST" {
		err := merr.BadRequest("go.micro.broker", "Method not allowed")
		http.Error(w, err.Error(), http.StatusMethodNotAllowed)
		return
	}
	defer req.Body.Close()

	req.ParseForm()

	b, err := ioutil.ReadAll(req.Body)
	if err != nil {
		errr := merr.InternalServerError("go.micro.broker", "Error reading request body: %v", err)
		w.WriteHeader(500)
		w.Write([]byte(errr.Error()))
		return
	}

	var m *Message
	if err = h.opts.Codec.Unmarshal(b, &m); err != nil {
		errr := merr.InternalServerError("go.micro.broker", "Error parsing request body: %v", err)
		w.WriteHeader(500)
		w.Write([]byte(errr.Error()))
		return
	}

	topic := m.Header[":topic"]
	delete(m.Header, ":topic")

	if len(topic) == 0 {
		errr := merr.InternalServerError("go.micro.broker", "Topic not found")
		w.WriteHeader(500)
		w.Write([]byte(errr.Error()))
		return
	}

	p := &httpEvent{m: m, t: topic}
	id := req.Form.Get("id")

	h.RLock()
	for _, subscriber := range h.subscribers[topic] {
		if id == subscriber.id {
			// sub is sync; crufty rate limiting
			// so we don't hose the cpu
			subscriber.fn(p)
		}
	}
	h.RUnlock()
}

该方法只接受 POST 请求,对于读取到的异步消息会通过 httpBroker 实现的解码器进行解码后,从 h.subscribers[topic] 中获取所有的 httpSubscriber 实例,并调用其中的 fn 属性指向的处理器方法对消息进行处理,这里的处理器就是我们在调用 pubSub.Subscribe 时传入的第二个匿名函数,这里的 topic 就是第一个参数 user.registered

以上就是 Broker 组件默认实现类 httpBroker 底层的实现逻辑,你可以参考此流程分析下集成其他消息系统的 Broker 组件实现类,比如 NATS,我这里就不一一介绍了。


点赞 取消点赞 收藏 取消收藏

<< 上一篇: 通过 Broker 在 Go Micro 中实现基于事件驱动的异步通信

>> 下一篇: Go Micro 框架增补篇:Protobuf 快速入门