基于 Swoole 实现协程篇(三):在 Laravel 框架中使用协程

前两篇教程我们陆续介绍了 Swoole 协程的实现原理,以及如何在 PHP 中基于 Swoole 协程实现并发编程,今天,学院君将介绍如何在 Laravel 框架中使用 Swoole 协程,还是基于 LaravelS 扩展包为例来进行演示。

通过上篇教程的示例代码,你应该已经知道,多个 Swoole 协程的执行顺序时是无序的,这通常取决于每个 Swoole 协程任务的时间复杂度,因此,对于处理 Web 请求的 HTTP 服务器来说,每个请求的数据都要与特定的协程 ID 关联起来,才能让业务逻辑正确执行。

不过在 Laravel 框架中,有很多单例和全局的静态属性,在基于 Swoole 实现的 HTTP 服务器中,这种单例和静态属性一旦初始化后,会常驻在内存中,不会随着请求的结束而销毁,下次同一个 worker 进程中有新的请求过来,依然使用上个请求初始化的单例和静态属性,这就会导致不同请求之间数据的相互影响,比如数据库连接实例就是单例。这种状况在同步阻塞模式下不会有什么问题,但是在基于协程的异步非阻塞模式下,就会有问题,因为每个数据库操作都要建立新的连接,并维护对应连接的 IO 状态,否则就会因为状态异常导致操作失败,要解决这个问题,需要引入连接池,但是 LaravelS 扩展包不支持连接池功能,所以不要在这些场景下使用协程,一般只有在用户自定义的进程中才能使用。

LaravelS 配置文件 config/laravels.php 中有两个与协程相关的配置:

'enable_coroutine_runtime' => false,
...
'swoole' => [
    ...
    'enable_coroutine' => false,
    ...
]

第一个配置 enable_coroutine_runtime 设置为 true 的话,表示可以在运行时动态将基于php_stream 实现的扩展、PHP 网络客户端代码一键协程化,该特性需要 Swoole 4.1.0 及以上版本才支持,这些扩展目前包括 PHP 官方 redis、pdo、mysqli 扩展等。

第二个配置 enable_coroutine 设置为 true 的话则表示在 LaravelS 代码中启用 Swoole 协程。这里,我们将该配置值设置为 true

接下来,我们创建一个用户自定义的进程 TestProcess 并在其中使用协程,自定义进程需要实现 CustomProcessInterface 接口:

<?php
namespace App\Processes;

use App\Jobs\TestTask;
use Hhxsv5\LaravelS\Swoole\Process\CustomProcessInterface;
use Hhxsv5\LaravelS\Swoole\Task\Task;
use Illuminate\Support\Facades\Log;
use Swoole\Coroutine;
use Swoole\Http\Server;
use Swoole\Process;

class TestProcess implements CustomProcessInterface
{
    public static function getName()
    {
        return "test"; // 进程名称
    }

    public static function callback(Server $swoole, Process $process)
    {
        // 回调函数不能退出,一旦退出,Manager 会自动创建该进程,频繁退出/创建进程会消耗系统性能
        Log::info(__METHOD__, [posix_getpid(), $swoole->stats()]);
        while (true) {  // 需要通过 while(true) 进行循环保持不退出
            Log::info('随便做点什么...');
            // 相当于 PHP 的 sleep 函数,但是底层会自动启动协程,让出时间片,睡眠时间结束后恢复运行
            Coroutine::sleep(1);
            // 在自定义进程中调度任务, 但不支持任务的 finish 回调
            // 注意:
            // 1、第二个参数设置为 true,表示通过消息管道通信
            // 2、在 config/laravels.php 配置文件中设置 task_ipc_mode 为 1 或 2
            // task_ipc_mode 用于设置 task worker 与 worker 进程之间的通信模式,1 表示通过 unix socket,2 表示使用消息队列
            $ret = Task::deliver(new TestTask('task data'), true);
            var_dump($ret);
            // 上一层会捕获本函数抛出的异常并将其记录到 Swoole 日志中,如果异常数超过10个,该进程会退出,然后 Master 进程会重新创建这个进程
        }
    }

    public static function onReload(Server $swoole, Process $process)
    {
        // 结束进程
        $process->exit(0);
    }
}

将该文件保存到 app/Processes 目录下,然后在配置文件 config/laravels.php 中注册自定义进程:

'processes'  => [
    [
        'class'    => \App\Processes\TestProcess::class,
        'redirect' => false, // 是否将输入输出重定向到 stdin/stdout, true or false
        'pipe'     => 0 // 管道类型, 0: 不使用管道 1: SOCK_STREAM 2: SOCK_DGRAM
    ],
],

用户自定义进程生命周期和 Master 和 Manager 进程一样,在服务器启动时创建,因此可用于监控、上报或完成其它特殊的全局任务,这里我们使用 TestProcess 来调度 TestTask 任务的执行。关于用户自定义进程的明细可以参考 Swoole Server 文档

接下来,在项目根目录下启动基于 Swoole 的 laravels 服务器:

./bin/laravels start

然后我们就可以在 storage/logs 目录下最新的日志文件中看到 TestProcess 回调函数的触发和 TestTask 任务的执行:

[2019-07-02 22:31:46] local.INFO: App\Processes\TestProcess::callback [373,{"start_time":1562077903,"connection_num":0,"accept_count":0,"close_count":0,"worker_num":18,"idle_worker_num":16,"tasking_num":0,"request_count":0,"worker_request_count":0,"worker_dispatch_count":0,"coroutine_num":1}] 
[2019-07-02 22:31:46] local.INFO: Do something
...
[2019-07-02 22:38:39] local.INFO: App\Jobs\TestTask: 开始处理任务 ["task data"] 
[2019-07-02 22:38:39] local.INFO: Do something  
[2019-07-02 22:38:40] local.INFO: App\Jobs\TestTask: 开始处理任务 ["task data"] 
[2019-07-02 22:38:40] local.INFO: Do something  
[2019-07-02 22:38:41] local.INFO: App\Jobs\TestTask: 开始处理任务 ["task data"] 
[2019-07-02 22:38:41] local.INFO: Do something  
[2019-07-02 22:38:42] local.INFO: App\Jobs\TestTask: 开始处理任务 ["task data"] 
[2019-07-02 22:38:42] local.INFO: Do something   
...

由于使用协程的缘故,这些任务都是每隔 1 秒执行一次,而且是异步非阻塞的。

上一篇: 基于 Swoole 实现协程篇(二):通过协程实现并发编程

下一篇: 基于 SMProxy 通过协程调度实现 MySQL 连接池