基于 Thrift + Laravel 构建微服务(二):引入 Swoole 重构服务端实现


上篇分享学院君给大家演示了如何在 Laravel 项目中集成 Thrift 提供远程 RPC 服务调用,不过,Thrift 默认是基于 PHP 同步阻塞机制的,在应对高并发场景时性能上是个硬伤,因此,今天这篇分享我们将引入 Swoole 来实现异步 TCP 服务器。

服务端代码

首先我们还是从服务端入手,在 app 目录下新建一个 Swoole 目录用于存放 Swoole 相关代码,首先我们创建一个 ServerTransport.php 用来存放服务端代理类,并编写代码如下:

<?php
namespace App\Swoole;

use Thrift\Exception\TTransportException;
use Thrift\Server\TServerTransport;
use Swoole\Server as SwooleServer;

class ServerTransport extends TServerTransport
{
    /**
     * @var array 服务器选项
     */
    public $options = [
        'worker_num' => 1,
        'dispatch_mode' => 1, //1: 轮循, 3: 争抢
        'open_length_check' => true, //打开包长检测
        'package_max_length' => 8192000, //最大的请求包长度,8M
        'package_length_type' => 'N', //长度的类型,参见PHP的pack函数
        'package_length_offset' => 0,   //第N个字节是包长度的值
        'package_body_offset' => 4,   //从第几个字节计算长度
    ];
    
    /**
     * @var SwooleServer
     */
    public $server;

    /**
     * SwooleServerTransport constructor.
     * @param $host
     * @param int $port
     * @param int $mode
     * @param int $sockType
     * @param array $options
     */
    public function __construct($host, $port = 9999, $mode = SWOOLE_PROCESS, $sockType = SWOOLE_SOCK_TCP, $options = [])
    {
        $this->server = new SwooleServer($host, $port, $mode, $sockType);
        $options = array_merge($this->options, $options);
        $this->server->set($options);
    }
    /**
     * List for new clients
     *
     * @return void
     * @throws TTransportException
     */
    public function listen()
    {
        if (!$this->server->start()) {
            throw new TTransportException('Swoole ServerTransport start failed.', TTransportException::UNKNOWN);
        }
    }
    /**
     * Close the server
     *
     * @return void
     */
    public function close()
    {
        $this->server->shutdown();
    }
    /**
     * Swoole服务端通过回调函数获取请求,不可以调用accept方法
     * @return TTransport
     */
    protected function acceptImpl()
    {
        return null;
    }
}

我们在代理类的构造函数中初始化 Swoole TCP 服务器参数,然后在该类中定义 listen 方法启动这个 TCP 服务器并监听客户端请求,此外,我们还定义了一个 close 方法关闭该服务器。

接下来,我们在 app/Swoole 目录下创建 Transport.php 文件用于存放基于 Swoole 的传输层实现代码:

<?php
namespace App\Swoole;

use Swoole\Server as SwooleServer;
use Thrift\Exception\TTransportException;
use Thrift\Transport\TTransport;

class Transport extends TTransport
{
    /**
     * @var swoole服务器实例
     */
    protected $server;
    /**
     * @var int 客户端连接描述符
     */
    protected $fd = -1;
    /**
     * @var string 数据
     */
    protected $data = '';
    /**
     * @var int 数据读取指针
     */
    protected $offset = 0;
    
    /**
     * SwooleTransport constructor.
     * @param SwooleServer $server
     * @param int $fd
     * @param string $data
     */
    public function __construct(SwooleServer $server, $fd, $data)
    {
        $this->server = $server;
        $this->fd = $fd;
        $this->data = $data;
    }
    
    /**
     * Whether this transport is open.
     *
     * @return boolean true if open
     */
    public function isOpen()
    {
        return $this->fd > -1;
    }
    
    /**
     * Open the transport for reading/writing
     *
     * @throws TTransportException if cannot open
     */
    public function open()
    {
        if ($this->isOpen()) {
            throw new TTransportException('Swoole Transport already connected.', TTransportException::ALREADY_OPEN);
        }
    }
    
    /**
     * Close the transport.
     * @throws TTransportException
     */
    public function close()
    {
        if (!$this->isOpen()) {
            throw new TTransportException('Swoole Transport not open.', TTransportException::NOT_OPEN);
        }
        $this->server->close($this->fd, true);
        $this->fd = -1;
    }
    
    /**
     * Read some data into the array.
     *
     * @param int $len How much to read
     * @return string The data that has been read
     * @throws TTransportException if cannot read any more data
     */
    public function read($len)
    {
        if (strlen($this->data) - $this->offset < $len) {
            throw new TTransportException('Swoole Transport[' . strlen($this->data) . '] read ' . $len . ' bytes failed.');
        }
        $data = substr($this->data, $this->offset, $len);
        $this->offset += $len;
        return $data;
    }
   
    /**
     * Writes the given data out.
     *
     * @param string $buf The data to write
     * @throws TTransportException if writing fails
     */
    public function write($buf)
    {
        if (!$this->isOpen()) {
            throw new TTransportException('Swoole Transport not open.', TTransportException::NOT_OPEN);
        }
        $this->server->send($this->fd, $buf);
    }
}

Transport 类主要用于从传输层写入或读取数据,最后我们创建 Server.php 文件,用于存放基于 Swoole 的 RPC 服务器类:

<?php
namespace App\Swoole;

use Swoole\Server as SwooleServer;
use Thrift\Server\TServer;

class Server extends TServer
{
    public function serve()
    {
        $this->transport_->server->on('receive', [$this, 'handleReceive']);
        $this->transport_->listen();
    }

    public function stop()
    {
        $this->transport_->close();
    }

    /**
     * 处理RPC请求
     * @param Server $server
     * @param int $fd
     * @param int $fromId
     * @param string $data
     */
    public function handleReceive(SwooleServer $server, $fd, $fromId, $data)
    {
        $transport = new Transport($server, $fd, $data);
        $inputTransport = $this->inputTransportFactory_->getTransport($transport);
        $outputTransport = $this->outputTransportFactory_->getTransport($transport);
        $inputProtocol = $this->inputProtocolFactory_->getProtocol($inputTransport);
        $outputProtocol = $this->outputProtocolFactory_->getProtocol($outputTransport);
        $this->processor_->process($inputProtocol, $outputProtocol);
    }
}

该类继承自 Thrift\Server\TServer,在子类中需要实现 servestop 方法,分别定义服务器启动和关闭逻辑,这里我们在 serve 方法中定义了 Swoole TCP 服务器收到请求时的回调处理函数,其中 $this->transport 指向 App\Swoole\ServerTransport 实例,回调函数 handleReceive 中我们会将请求数据传入传输层处理类 Transport 进行初始化,然后再通过一系列转化通过处理器对请求进行处理,该方法中 $this 指针指向的属性都是在外部启动 RPC 服务器时传入的,后面我们会看到。定义好请求回调后,即可通过 $this->transport_->listen() 启动服务器并监听请求。

下面,我们还是通过 Artisan 命令来启动 RPC 服务器,创建一个新的命令类 SwooleServerStart

php artisan make:command SwooleServerStart

编写命令类实现代码如下:

<?php

namespace App\Console\Commands;

use App\Services\Server\UserService;
use App\Swoole\Server;
use App\Swoole\ServerTransport;
use App\Swoole\TFramedTransportFactory;
use App\Thrift\User\UserProcessor;
use Illuminate\Console\Command;
use Thrift\Exception\TException;
use Thrift\Factory\TBinaryProtocolFactory;

class SwooleServerStart extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'swoole:start';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Start Swoole Thrift RPC Server';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return mixed
     */
    public function handle()
    {
        try {
            $processor = new UserProcessor(new UserService());
            $tFactory = new TFramedTransportFactory();
            $pFactory = new TBinaryProtocolFactory();
            // 监听本地 9999 端口,等待客户端连接请求
            $transport = new ServerTransport('127.0.0.1', 9999);
            $server = new Server($processor, $transport, $tFactory, $tFactory, $pFactory, $pFactory);
            $this->info("服务监听地址: 127.0.0.1:9999");
            $server->serve();
        } catch (TException $exception) {
            $this->error("服务启动失败!");
        }
    }
}

基本逻辑和上篇一致,这里我们使用了一个自定义的 TFramedTransportFactory 类,对应源码位于 app/Swoole/TFramedTransportFactory.php

<?php
namespace App\Swoole;

use Thrift\Factory\TTransportFactory;
use Thrift\Transport\TFramedTransport;
use Thrift\Transport\TTransport;

class TFramedTransportFactory extends TTransportFactory
{
    public static function getTransport(TTransport $transport)
    {
        return new TFramedTransport($transport);
    }
}

该类重写了 getTransport 类方法来返回经过 TFramedTransport 封装的 Transport,以便被Swoole 服务器处理。

最后,我们在 app/Console/Kernel.php 中注册 SwooleServerStart 这个命令类使其生效:

use App\Console\Commands\SwooleServerStart;

protected $commands = [
    RpcServerStart::class,
    SwooleServerStart::class,
];

客户端代码

接下来,我们来修改客户端请求服务端远程接口的代码,在此之前在 app/Swoole 目录下新建一个 ClientTransport.php 来存放客户端与服务端通信的传输层实现代码:

<?php
namespace App\Swoole;

use Swoole\Client;
use Thrift\Exception\TTransportException;
use Thrift\Transport\TTransport;

class ClientTransport extends TTransport
{
    /**
     * @var string 连接地址
     */
    protected $host;
    /**
     * @var int 连接端口
     */
    protected $port;
    /**
     * @var Client
     */
    protected $client;

    /**
     * ClientTransport constructor.
     * @param string $host
     * @param int $port
     */
    public function __construct($host, $port)
    {
        $this->host = $host;
        $this->port = $port;
        $this->client = new Client(SWOOLE_SOCK_TCP);
    }

    /**
     * Whether this transport is open.
     *
     * @return boolean true if open
     */
    public function isOpen()
    {
        return $this->client->sock > 0;
    }

    /**
     * Open the transport for reading/writing
     *
     * @throws TTransportException if cannot open
     */
    public function open()
    {
        if ($this->isOpen()) {
            throw new TTransportException('ClientTransport already open.', TTransportException::ALREADY_OPEN);
        }
        if (!$this->client->connect($this->host, $this->port)) {
            throw new TTransportException(
                'ClientTransport could not open:' . $this->client->errCode,
                TTransportException::UNKNOWN
            );
        }
    }

    /**
     * Close the transport.
     * @throws TTransportException
     */
    public function close()
    {
        if (!$this->isOpen()) {
            throw new TTransportException('ClientTransport not open.', TTransportException::NOT_OPEN);
        }
        $this->client->close();
    }

    /**
     * Read some data into the array.
     *
     * @param int $len How much to read
     * @return string The data that has been read
     * @throws TTransportException if cannot read any more data
     */
    public function read($len)
    {
        if (!$this->isOpen()) {
            throw new TTransportException('ClientTransport not open.', TTransportException::NOT_OPEN);
        }
        return $this->client->recv($len, true);
    }
    
    /**
     * Writes the given data out.
     *
     * @param string $buf The data to write
     * @throws TTransportException if writing fails
     */
    public function write($buf)
    {
        if (!$this->isOpen()) {
            throw new TTransportException('ClientTransport not open.', TTransportException::NOT_OPEN);
        }
        $this->client->send($buf);
    }
}

然后我们在 app/Services/Client/UserService.php 中改写 UserService 类实现代码如下:

<?php
namespace App\Services\Client;

use App\Swoole\ClientTransport;
use App\Thrift\User\UserClient;
use Thrift\Exception\TException;
use Thrift\Protocol\TBinaryProtocol;
use Thrift\Protocol\TMultiplexedProtocol;
use Thrift\Transport\TBufferedTransport;
use Thrift\Transport\TFramedTransport;
use Thrift\Transport\TSocket;

class UserService
{
    public function getUserInfoViaRpc(int $id)
    {
        try {
            // 建立与 RpcServer 的连接
            $socket = new TSocket("127.0.0.1", 8888);
            $socket->setRecvTimeout(30000);  // 超时时间
            $socket->setDebug(true);
            $transport = new TBufferedTransport($socket, 1024, 1024);
            $protocol = new TBinaryProtocol($transport);
            $thriftProtocol = new TMultiplexedProtocol($protocol, 'UserService');
            $client = new UserClient($thriftProtocol);
            $transport->open();
            $result = $client->getInfo($id);
            $transport->close();
            return $result;
        } catch (TException $TException) {
            dd($TException);
        }
    }

    public function getUserInfoViaSwoole(int $id)
    {
        try {
            // 建立与 SwooleServer 的连接
            $socket = new ClientTransport("127.0.0.1", 9999);
            $transport = new TFramedTransport($socket);
            $protocol = new TBinaryProtocol($transport);
            $client = new UserClient($protocol);
            $transport->open();
            $result = $client->getInfo($id);
            $transport->close();
            return $result;
        } catch (TException $TException) {
            dd($TException);
        }
    }
}

新增了一个 getUserInfoViaSwoole 方法来定义与 Swoole TCP 服务器通信访问远程服务接口的实现。基本逻辑和 getUserInfoViaRpc 差不多,只是将 Transport 实现改为 ClientTransport 类,并且通过 TFramedTransport 进行封装,从而与服务端保持一致实现数据的正常读取与写入。

为了测试该方法,需要在 routes/web.php 中修改之前的路由定义:

Route::get('/user/{id}', function($id) {
    $userService = new UserService();
    //$user = $userService->getUserInfoViaRpc($id);
    $user = $userService->getUserInfoViaSwoole($id);
    return $user;
});

测试接口访问

至此,所有编码工作告一段落,我们新开一个终端窗口,启动 Swoole TCP 服务器:

启动 Swoole TCP 服务器

然后在客户端还是通过 php artisan serve 启动 Laravel 应用,然后在浏览器中访问 http://127.0.0.1:8000/user/1,得到的结果和上篇分享一致:

测试接口访问


点赞 取消点赞 收藏 取消收藏

<< 上一篇: 基于 Thrift + Laravel 构建微服务(一):RPC 调用实现

>> 下一篇: 基于 Thrift + Laravel 构建微服务(三):引入 Zookeeper 作为注册中心