Go Micro 框架底层组件篇 —— Transport 底层源码剖析(下)

micro-arch

上篇我们介绍了基于 Go Micro 的微服务服务端启动时 Transport 组件底层的功能,主要是启动底层的 HTTP 服务器监听并处理客户端请求,请求的具体处理逻辑还是在 Server 层完成,今天我们来看看客户端发起请求时 Transport 组件发挥了怎样的作用。

首先我们来看看基于 Go Micro 框架的客户端代码。

打开 src/hello/client.go,在 main 函数中,跳过新建服务和服务初始化的代码,接下来是调用官方 SDK 创建客户端服务的代码:

greeter := proto.NewGreeterService("go.micro.srv.greeter", service.Client())

greeter 对应的是定义在 hello.micro.go 中的 greeterService 实例,其客户端属性 c 默认是 rpcClient (对应源码位于 src/github.com/micro/go-micro/client/rpc_client.go)实例,name 则是这里传入的要调用的服务名称 go.micro.srv.greeter

然后,就是调用 greeter 实例上的 Hello 方法调用远程服务,并且传递了请求参数 {Name: "学院君"}

greeter.Hello(context.TODO(), &proto.HelloRequest{Name: "学院君"})

下面我们简单看下 Hello 方法的源码:

func (c *greeterService) Hello(ctx context.Context, in *HelloRequest, opts ...client.CallOption) (*HelloResponse, error) {
	req := c.c.NewRequest(c.name, "Greeter.Hello", in)
	out := new(HelloResponse)
	err := c.c.Call(ctx, req, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

这里先初始化了远程请求对象 req,这里传入了远程服务名称和端点信息 Greeter.Hello(对应我们在上篇分享中在服务端注册的路由),以及包含请求参数信息的本地请求实例 in,然后实例化了响应实例,接着就是调用默认客户端 rpcClient 实例上的 Call 方法发起远程服务请求。

接下来的桥段我们前面介绍 Selector 和 Registry 组件的时候都已经分析过,先通过服务名称从 Selector 层获取服务节点,然后建立与服务节点的连接,再通过端点信息和请求参数访问服务节点对应路由,最后把处理结果返回给客户端,下面我们来重点关注下涉及到 Transport 组件的代码部分,这部分代码在获取到服务节点之后,主要逻辑封装在 rpcClientcall 方法中:

func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, resp interface{}, opts CallOptions) error {
	address := node.Address

	msg := &transport.Message{
		Header: make(map[string]string),
	}

	md, ok := metadata.FromContext(ctx)
	if ok {
		for k, v := range md {
			msg.Header[k] = v
		}
	}

	// set timeout in nanoseconds
	msg.Header["Timeout"] = fmt.Sprintf("%d", opts.RequestTimeout)
	// set the content type for the request
	msg.Header["Content-Type"] = req.ContentType()
	// set the accept header
	msg.Header["Accept"] = req.ContentType()

	// setup old protocol
	cf := setupProtocol(msg, node)

	// no codec specified
	if cf == nil {
		var err error
		cf, err = r.newCodec(req.ContentType())
		if err != nil {
			return errors.InternalServerError("go.micro.client", err.Error())
		}
	}

	var grr error
	c, err := r.pool.getConn(address, r.opts.Transport, transport.WithTimeout(opts.DialTimeout))
	if err != nil {
		return errors.InternalServerError("go.micro.client", "connection error: %v", err)
	}
	defer func() {
		// defer execution of release
		r.pool.release(address, c, grr)
	}()

	seq := atomic.LoadUint64(&r.seq)
	atomic.AddUint64(&r.seq, 1)
	codec := newRpcCodec(msg, c, cf)

	rsp := &rpcResponse{
		socket: c,
		codec:  codec,
	}

	stream := &rpcStream{
		context:  ctx,
		request:  req,
		response: rsp,
		codec:    codec,
		closed:   make(chan bool),
		id:       fmt.Sprintf("%v", seq),
	}
	defer stream.Close()

	ch := make(chan error, 1)

	go func() {
		defer func() {
			if r := recover(); r != nil {
				ch <- errors.InternalServerError("go.micro.client", "panic recovered: %v", r)
			}
		}()

		// send request
		if err := stream.Send(req.Body()); err != nil {
			ch <- err
			return
		}

		// recv request
		if err := stream.Recv(resp); err != nil {
			ch <- err
			return
		}

		// success
		ch <- nil
	}()

	select {
	case err := <-ch:
		grr = err
		return err
	case <-ctx.Done():
		grr = ctx.Err()
		return errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err()))
	}
}

前面都是设置 Transport 组件请求头信息:

Transport 组件请求头信息

其中比较重要的是 Content-Type 字段,用于设置内容编码格式,默认是 application/protobuf,然后我们会通过它调用 rpcClientnewCodec 方法设置 Codec 组件,关于这一组件的底层实现我们放到下一篇去讲,这里默认的 Codec 实现是 proto,对应源码位于 src/github.com/micro/go-micro/codec/proto/proto.go,我们通过该实现类对服务请求和响应信息进行编解码。

接下来从客户端连接池中获取一个连接,如果连接池为空的话,则调用 Transport 默认实现类(这里是 httpTransport )的 Dial 方法新建连接并返回:

func (h *httpTransport) Dial(addr string, opts ...DialOption) (Client, error) {
	dopts := DialOptions{
		Timeout: DefaultDialTimeout,
	}

	for _, opt := range opts {
		opt(&dopts)
	}

	var conn net.Conn
	var err error

	// TODO: support dial option here rather than using internal config
	if h.opts.Secure || h.opts.TLSConfig != nil {
		config := h.opts.TLSConfig
		if config == nil {
			config = &tls.Config{
				InsecureSkipVerify: true,
			}
		}
		config.NextProtos = []string{"http/1.1"}
		conn, err = newConn(func(addr string) (net.Conn, error) {
			return tls.DialWithDialer(&net.Dialer{Timeout: dopts.Timeout}, "tcp", addr, config)
		})(addr)
	} else {
		conn, err = newConn(func(addr string) (net.Conn, error) {
			return net.DialTimeout("tcp", addr, dopts.Timeout)
		})(addr)
	}

	if err != nil {
		return nil, err
	}

	return &httpTransportClient{
		ht:       h,
		addr:     addr,
		conn:     conn,
		buff:     bufio.NewReader(conn),
		dialOpts: dopts,
		r:        make(chan *http.Request, 1),
		local:    conn.LocalAddr().String(),
		remote:   conn.RemoteAddr().String(),
	}, nil
}

该方法的核心是调用 newConn 建立与指定服务节点(即从注册中心获取的部署微服务服务端代码的节点)的连接,这里用到了 net 包提供的 DialTimeout 方法建立网络连接。连接建立成功后会返回一个 httpTransportClient 实例,其中包含了客户端连接的所有信息,在 rpc_pool.gogetConn 方法中,又将其封装到 poolConn 对象中并返回。

回到 rpcClientcall 方法中,接下来是一个 defer 语句,用于将本次新建立的连接信息添加到客户端连接池以便下次复用。再往后是编码对象和响应对象的初始化,以及初始化一个流对象,流对象中包含了请求实例、响应实例、请求上下文、编码实例、请求序列号以及关闭信号,我们将通过这个流对象在指定连接上发送请求、接收响应,对应的实现在后面的 go 协程语句中,意味着我们可以在客户端并发请求远程服务,最后 call 方法执行完毕调用,调用 defer stream.Close() 关闭这个流。

下面我们深入分析下在指定连接上发送请求、接收响应的源码,Go Micro 框架将这两个操作封装到了 Codec 层,我们可以看下 stream.Sendstream.Recv 的源码:

func (r *rpcStream) Send(msg interface{}) error {
	r.Lock()
	defer r.Unlock()

	if r.isClosed() {
		r.err = errShutdown
		return errShutdown
	}

	req := codec.Message{
		Id:       r.id,
		Target:   r.request.Service(),
		Method:   r.request.Method(),
		Endpoint: r.request.Endpoint(),
		Type:     codec.Request,
	}

	if err := r.codec.Write(&req, msg); err != nil {
		r.err = err
		return err
	}

	return nil
}

func (r *rpcStream) Recv(msg interface{}) error {
	r.Lock()
	defer r.Unlock()

	if r.isClosed() {
		r.err = errShutdown
		return errShutdown
	}

	var resp codec.Message

	if err := r.codec.ReadHeader(&resp, codec.Response); err != nil {
		if err == io.EOF && !r.isClosed() {
			r.err = io.ErrUnexpectedEOF
			return io.ErrUnexpectedEOF
		}
		r.err = err
		return err
	}

	switch {
	case len(resp.Error) > 0:
		// We've got an error response. Give this to the request;
		// any subsequent requests will get the ReadResponseBody
		// error if there is one.
		if resp.Error != lastStreamResponseError {
			r.err = serverError(resp.Error)
		} else {
			r.err = io.EOF
		}
		if err := r.codec.ReadBody(nil); err != nil {
			r.err = err
		}
	default:
		if err := r.codec.ReadBody(msg); err != nil {
			r.err = err
		}
	}

	return r.err
}

这样做的好处是统一对请求和响应进行编码、解码操作,关于编码和解码的细节我们放到 Codec 中去介绍,发送请求操作定义在 codec 实现类的 Write 方法中,这里就是 codec/proto 包里的 Write 方法:

func (c *Codec) Write(m *codec.Message, b interface{}) error {
	p, ok := b.(proto.Message)
	if !ok {
		return nil
	}
	buf, err := proto.Marshal(p)
	if err != nil {
		return err
	}
	_, err = c.Conn.Write(buf)
	return err
}

这里的 c.Conn 就是我们在前面初始化 Codec 对象时传入的 poolConn 实例,对应的 Write 方法实现则在 httpTransportClient 类中,最终追溯到 net 包的 (*Conn).Write 方法,通过 HTTP 协议从客户端将编码后的请求信息发送给服务端。

服务端如果已经启动则会监听客户端请求,收到请求后进行处理并返回响应,客户端通过 stream.Recv 方法接收响应,该方法先读取响应头,如果没有报错则继续读取响应实体,codec/protoReadHeader 实现为空,默认永远不会报错,然后我们来看下 ReadBody 的实现:

func (c *Codec) ReadBody(b interface{}) error {
	if b == nil {
		return nil
	}
	buf, err := ioutil.ReadAll(c.Conn)
	if err != nil {
		return err
	}
	return proto.Unmarshal(buf, b.(proto.Message))
}

对应的响应获取逻辑主要在 ioutil.ReadAll(c.Conn) 中,这段代码调用了 ioutil 包的 ReadAll 方法从指定客户端连接 httpTransportClient 实例上读取所有响应数据,如果没有错误的话将缓冲数据解码后返回给客户端。

最后,greeter.Hello 方法将最终响应信息返回,交给客户端打印出来:

fmt.Println(rsp.Greeting)

就是我们在终端看到的结果了:

请求处理结果

上一篇: Go Micro 框架底层组件篇 —— Transport 底层源码剖析(上)

下一篇: Go Micro 框架底层组件篇 —— Codec 底层源码剖析