通过 Broker 在 Go Micro 中实现基于事件驱动的异步通信

broker

同步通信 vs 异步通信

今天我们来介绍 Go Micro 的最后一个组件 —— Broker,Broker 是一个异步消息组件,可用于在 Go Micro 微服体系中通过事件驱动实现基于发布/订阅机制的异步通信,与之相对的是我们前面介绍的 Transport 组件,该组件实现的是服务间的同步通信,在同步通信中,服务之间需要通过接口进行应答式通信:

同步应答式通信

我们回顾下,以 HTTP 协议为例,服务端先启动 HTTP 服务器并通过 httpTransport.Listen 方法监听请求,然后通过 httpTransportListener.Accept 方法处理客户端请求;客户端则通过 httpTransport.Dial 方法来建立与服务端的连接并发送请求、接收响应。

Broker 是与 Transport 并列的通信组件,只不过采用的是异步通信:

异步通信

比如新用户注册成功后,我们需要给这个用户发送邮件和短信通知,面对这种场景我们就可以在用户注册完成后,将该事件发布到消息系统的 user.registered 主题,订阅该主题的其他服务就会收到通知,然后执行对应的操作,比如发邮件、发短信、初始化积分等等,这样一来,就极大增强了系统的扩展性和灵活性,这种方式显然优于注册成功后再通过访问指定接口去发送邮件和短信。

下面,我们先来通过实例演示如何在 Go Micro 中基于 Broker 实现事件驱动的异步通信,然后再介绍其底层实现。

定义发布事件的 user 服务

首先,我们在 hello 项目的 src 目录下创建一个 user 目录,用于存放 user 服务相关代码,然后仿照之前的 hello 服务在 user 目录下创建服务接口原型文件 proto/user.proto,并初始化代码如下:

syntax = "proto3";

package user;

service UserService {
    rpc Create(User) returns (Response) {}
    rpc Get(User) returns (Response) {}
}

message User {
    string id = 1;
    string name = 2;
    string email = 3;
    string password = 4;
}

message Error {
    int32 code = 1;
    string description = 2;
}

message Request {}

message Response {
    User user = 1;
    repeated User users = 2;
    repeated Error errors = 3;
}

接下来,通过 protoc 指令根据上述接口原型文件快速生成 user 服务相关基础类/接口代码:

protoc --proto_path=. --micro_out=. --go_out=. proto/user.proto

代码目录结构

然后我们在 user 目录下创建 main.go 文件用于定义服务端逻辑,由于我们需要实现用户注册功能,所以,这里我们先编写数据库相关逻辑,在本示例项目中,我们将通过 jinzhu/gorm 这个包来实现与数据库的交互,所以在开始之前先安装这个依赖包:

go get -u github.com/jinzhu/gorm

安装完成后,我们到本地数据库(以 MySQL 为例)创建一个数据库 greeter,然后编写数据库连接代码如下:

// 建立数据库连接
func CreateConnection() (*gorm.DB, error) {
    // Get database details from environment variables
    host := "localhost"
    port := "3306"
    user := "root"
    password := "root"
    DbName := "greeter"

    return gorm.Open(
        "mysql",
        fmt.Sprintf(
            "%s:%s@(%s:%s)/%s?charset=utf8&parseTime=True&loc=Local",
            user, password, host, port, DbName,
        ),
    )
}

接下来,定义一个 UserRepository 作为模型类,并编写新增用户和查询用户相关逻辑:

// 通过 Repository 与数据库进行交互
type Repository interface {
    Get(id string) (*proto.User, error)
    Create(user *proto.User) error
    GetByEmail(email string) (*proto.User, error)
}

type UserRepository struct {
    db *gorm.DB
}

func (repo *UserRepository) Get(id string) (*proto.User, error) {
    var user proto.User
    user.Id = id
    if err := repo.db.First(&user).Error; err != nil {
        return nil, err
    }
    return &user, nil
}

func (repo *UserRepository) Create(user *proto.User) error {
    if err := repo.db.Create(user).Error; err != nil {
        return err
    }
    return nil
}

func (repo *UserRepository) GetByEmail(email string) (*proto.User, error) {
    user := &proto.User{}
    if err := repo.db.Where("email = ?", email).First(&user).Error; err != nil {
        return nil, err
    }
    return user, nil
}

可以看到,我们通过 gorm.DB 类提供的方法实现数据库的增删改查操作,UserRepository 中传入的 db 指针正是上述 CreateConnection() 方法的返回值。

定义好数据库交互逻辑后,我们再来编写服务端口:

const topic = "user.registered"

// 服务端提供的服务端口
type service struct {
    repo    Repository
    PubSub  broker.Broker
}

func (srv *service) Get(ctx context.Context, req *proto.User, res *proto.Response) error {
    user, err := srv.repo.Get(req.Id)
    if err != nil {
        return err
    }
    res.User = user
    return nil
}

func (srv *service) Create(ctx context.Context, req *proto.User, res *proto.Response) error {
    // Generates a hashed version of our password
    hashedPass, err := bcrypt.GenerateFromPassword([]byte(req.Password), bcrypt.DefaultCost)
    if err != nil {
        return err
    }
    req.Password = string(hashedPass)
    if err := srv.repo.Create(req); err != nil {
        return err
    }

    res.User = req
    body, err := json.Marshal(res.User)
    if err != nil {
        return err
    }

    // Create a broker message
    msg := &broker.Message{
        Header: map[string]string{
            "id": res.User.Id,
        },
        Body: body,
    }
    if err := srv.PubSub.Publish(topic, msg); err != nil {
        fmt.Errorf("publish message failed: %v\n", err)
        return err
    }

    return nil
}

上述代码中的 service 类实现了我们在原型文件中定义的 UserService 接口,此外,我们还引入了两个新的属性 repopubsub,分别用于实现与底层 UserRepository 的交互以及通过 Broker 组件发布消息。在 Get 方法中,我们只是简单查询数据库并返回用户实例,在 Create 方法中,当我们将用户信息成功保存到数据库之后,还要调用 Broker 组件实现类(默认是 httpBroker)的 Publish 方法将编码后的消息发布到指定 topic 上,这里是通过常量定义的 user.registered

至此,我们就完成了服务端各个组件的实现逻辑,接下来,在 main() 方法中将它们组合起来,并且对外提供服务,最终的 main.go 文件完整代码如下所示:

package main

import (
    "encoding/json"
    "fmt"
    "github.com/jinzhu/gorm"
    _ "github.com/jinzhu/gorm/dialects/mysql"
    "github.com/micro/go-micro"
    "github.com/micro/go-micro/broker"
    _ "github.com/micro/go-plugins/registry/etcd"
    "golang.org/x/crypto/bcrypt"
    "golang.org/x/net/context"
    proto "user/proto"
)

const topic = "user.registered"

// 服务端提供的服务端口
type service struct {
    repo    Repository
    PubSub  broker.Broker
}

func (srv *service) Get(ctx context.Context, req *proto.User, res *proto.Response) error {
    user, err := srv.repo.Get(req.Id)
    if err != nil {
        return err
    }
    res.User = user
    return nil
}

func (srv *service) Create(ctx context.Context, req *proto.User, res *proto.Response) error {
    // Generates a hashed version of our password
    hashedPass, err := bcrypt.GenerateFromPassword([]byte(req.Password), bcrypt.DefaultCost)
    if err != nil {
        return err
    }
    req.Password = string(hashedPass)
    if err := srv.repo.Create(req); err != nil {
        return err
    }

    res.User = req
    body, err := json.Marshal(res.User)
    if err != nil {
        return err
    }

    // Create a broker message
    msg := &broker.Message{
        Header: map[string]string{
            "id": res.User.Id,
        },
        Body: body,
    }
    if err := srv.PubSub.Publish(topic, msg); err != nil {
        fmt.Errorf("publish message failed: %v\n", err)
        return err
    }

    return nil
}

// 建立数据库连接
func CreateConnection() (*gorm.DB, error) {
    // Get database details from environment variables
    host := "localhost"
    port := "3306"
    user := "root"
    password := "root"
    DbName := "greeter"

    return gorm.Open(
        "mysql",
        fmt.Sprintf(
            "%s:%s@(%s:%s)/%s?charset=utf8&parseTime=True&loc=Local",
            user, password, host, port, DbName,
        ),
    )
}

// 通过 Repository 与数据库进行交互
type Repository interface {
    Get(id string) (*proto.User, error)
    Create(user *proto.User) error
    GetByEmail(email string) (*proto.User, error)
}

type UserRepository struct {
    db *gorm.DB
}

func (repo *UserRepository) Get(id string) (*proto.User, error) {
    var user proto.User
    user.Id = id
    if err := repo.db.First(&user).Error; err != nil {
        return nil, err
    }
    return &user, nil
}

func (repo *UserRepository) Create(user *proto.User) error {
    if err := repo.db.Create(user).Error; err != nil {
        return err
    }
    return nil
}

func (repo *UserRepository) GetByEmail(email string) (*proto.User, error) {
    user := &proto.User{}
    if err := repo.db.Where("email = ?", email).First(&user).Error; err != nil {
        return nil, err
    }
    return user, nil
}

//  UserService 服务端入口函数
func main() {
    // Creates a database connection and handles
    // closing it again before exit.
    db, err := CreateConnection()
    defer db.Close()

    if err != nil {
        fmt.Errorf("Could not connect to DB: %v\n", err)
    }

    // Automatically migrates the user struct
    // into database columns/types etc. This will
    // check for changes and migrate them each time
    // this service is restarted.
    db.AutoMigrate(&proto.User{})

    repo := &UserRepository{db}

    // Create a new service. Optionally include some options here.
    srv := micro.NewService(
        // This name must match the package name given in your protobuf definition
        micro.Name("go.micro.srv.user"),
        micro.Version("latest"),
    )

    // Init will parse the command line flags.
    srv.Init()

    pubsub := srv.Server().Options().Broker

    // Register handler
    proto.RegisterUserServiceHandler(srv.Server(), &service{repo, pubsub})

    // Run the server
    if err := srv.Run(); err != nil {
        fmt.Println(err)
    }
}

main 方法中,我们先建立与数据库的连接,然后运行迁移命令,如果数据表未创建的话会自动创建,接下来,将数据库连接实例赋值给 UserRepository,并且在注册 UserService 处理器的时候和 Broker 实例一起传递到 proto.RegisterUserServiceHandler 方法中,这样,当客户端发起请求时,就可以在执行相应服务端口时调用这里初始化的 repopubsub 实例了。其他逻辑和之前介绍的 hello 服务一样,不再赘述。

user 服务客户端实现

服务端逻辑编写好了之后,我们再在 user 目录下创建一个 client.go 用于定义客户端逻辑,在客户端代码中,主要传入参数完成用户注册并且测试下查询方法是否可用:

package main

import (
    "fmt"
    "github.com/micro/go-micro"
    _ "github.com/micro/go-plugins/registry/etcd"
    "golang.org/x/net/context"
    "math/rand"
    proto "user/proto"
)

func main() {
    // Create a new service. Optionally include some options here.
    service := micro.NewService(micro.Name("go.micro.cli.user"))
    service.Init()

    // Create new user client
    userService := proto.NewUserService("go.micro.srv.user", service.Client())

    id := randStr(16)
    name := "学院君"
    email := "yaojinbu@163.com"
    password := "test123"

    fmt.Println(id, name, email, password)

    // Call the greeter
    rsp, err := userService.Create(context.TODO(), &proto.User{
        Id: id,
        Name: name,
        Email: email,
        Password: password,
    })
    if err != nil {
        fmt.Errorf("User Register Failed: %v\n", err);
    }
    fmt.Println("User Registered:", rsp.User.Id)

    resp, err := userService.Get(context.TODO(), &proto.User{
        Id: rsp.User.Id,
    })
    if err != nil {
        fmt.Errorf("Could not get user: %v\n", err)
    }
    fmt.Println("User Info:", resp.User)
}

func randStr(n int) string {
    var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890")
    b := make([]rune, n)
    for i := range b {
        b[i] = letters[rand.Intn(len(letters))]
    }
    return string(b)
}

用户注册注册成功后,服务端会发布事件到 user.registered,这样,订阅该主题的其他服务就可以异步执行相应的业务逻辑,下面我们就来编写一个简单的 email 服务来订阅并处理用户注册事件。

订阅用户注册的 email 服务

我们在 hello/src 目录下创建一个新的子目录 email 用来存放邮件服务相关代码,由于这里我们仅将其用作演示用户注册事件订阅及处理,所以不再定义服务原型接口文件,只在 email 目录下创建一个 main.go 用于启动 email 服务并 Broker 订阅 user.registered 事件:

package main

import (
    "encoding/json"
    "fmt"
    "github.com/micro/go-micro"
    "github.com/micro/go-micro/broker"
    _ "github.com/micro/go-plugins/registry/etcd"
    userProto "user/proto"
)

const topic = "user.registered"

func main() {
    srv := micro.NewService(
        micro.Name("go.micro.srv.email"),
        micro.Version("latest"),
    )
    srv.Init()

    pubSub := srv.Server().Options().Broker
    if err := pubSub.Connect(); err != nil {
        fmt.Errorf("broker connect error: %v\n", err)
    }

    // 订阅消息
    _, err := pubSub.Subscribe(topic, func(pub broker.Event) error {
        var user *userProto.User
        if err := json.Unmarshal(pub.Message().Body, &user); err != nil {
            fmt.Errorf("process message failed: %v\n", err)
            return err
        }
        fmt.Printf("[User Registered]: %v\n", user)
        go sendEmail(user)
        return nil
    })

    if err != nil {
        fmt.Printf("sub error: %v\n", err)
    }

    if err := srv.Run(); err != nil {
        fmt.Errorf("srv run error: %v\n", err)
    }
}

func sendEmail(user *userProto.User) error {
    fmt.Printf("[SENDING A EMAIL TO %s...]\n", user.Name)
    return nil
}

在邮件服务的 main() 方法中,我们通过默认的 Broker 组件实现类实例提供的方法建立与消息系统的连接,连接成功则订阅指定 topic,这里是 user.registered,并通过一个回调函数来定义事件发生时的处理逻辑,这里我们对收到的消息进行解码并打印,然后通过协程调用邮件发送逻辑,这里为了简化代码,我们只是通过打印一行语句模拟邮件发送。其他逻辑和普通的服务端启动并无区别。

完整流程演示

好了,现在我们已经有了发布用户注册事件的服务端,有了触发用户注册事件的客户端,还有了订阅用户注册事件的其他服务端,接下来,我们可以来完整演示 Broker 基于事件驱动实现异步通信了。

首先启动 UserService 服务,通过日志我们可以看到 Broker 默认基于 http 系统:

启动 UserService 服务

然后我们新开一个 Terminal 窗口来启动 EmailService 服务:

启动 EmailService 服务

最后,再新开一个 Terminal 窗口在 user 目录下运行客户端代码模拟用户注册:

客户端代码模拟用户注册

通过输出的日志可以看到用户注册成功,并且可以在数据库查询到对应的记录,此时切换到 EmailService 所在的窗口,可以看到如下输出:

邮件发送日志

则表示它已经成功接收到 UserService 服务发布的事件并且做了发送邮件的处理。

以上就是在 Go Micro 框架中通过 Broker 组件实现异步通信的简单实现,当然在实际生产环境中,我们基本不会使用 http 作为消息系统,而是使用更加工业级的 NATS、RabbitMQ、Redis 或者其他云服务组件,比如 GoolePubsub、AWS 的 SQS 等。切换起来也很简单,和前面介绍的注册中心切换逻辑类似,下篇分享介绍 Broker 底层实现逻辑的时候,我们会捎带提一下。

上一篇: Go Micro 框架底层组件篇 —— Codec 底层源码剖析

下一篇: Go Micro 框架底层组件篇 —— Broker 底层源码剖析