通过 Swoole\Table 实现 Swoole 多进程数据共享

第三方存储媒介

前面我们介绍了基于 Swoole 的 ProcessProcess\Pool 模块在 PHP 中实现多进程管理,但是多进程模式下进程间是相互隔离的,无法共享数据和变量,即便是通过 global 定义的全局或超全局变量,也只是在所属进程中有效,如果要在 Swoole 实现的多进程间共享数据,需要借助第三方存储媒介实现:

  • 数据库:MySQL、MongoDB
  • 缓存:Redis、Memcached
  • 磁盘文件

但是这也会引入新的问题,多进程同时操作一条记录或一个文件存在并发访问问题,以数据库操作为例,两个进程可能会同时读取一条数据,或者一个进程对某条记录进行更新处理时,另一个进程也来读取这条记录并进行操作,会导致最终结果数据与预期不一致的情况,这个时候,我们就需要引入锁的概念,当一个进程(比如进程A)对某个记录进行写操作时,对该记录加锁,这样其它进程就无法操作该条记录, 直到进程 A 事务提交再释放这个锁,让其他进程可以进行操作。

内存共享

PHP 相关扩展

对于单机操作来说,除了这些第三方存储媒介之外,还可以通过共享内存的方式实现进程间数据读写操作,有多个 PHP 扩展可以支持共享内存数据操作:

  • Semaphore 扩展:可通过该扩展包提供的 shm_get_varshm_put_var 函数实现内存共享数据的读写操作;
  • Shmop 扩展:可通过该扩展包提供的 shmop_readshmop_write 函数实现内存共享数据的读写操作;
  • APCu(APC User Cache)扩展:可通过该扩展包提供的 apc_fetchapc_store 实现内存共享数据的读写操作。

Swoole Table

但是上述扩展要么不支持锁,要么高并发时性能比较差,所以 Swoole 自己实现了一个共享内存读写工具 —— Swoole\Table,该工具是一个基于共享内存和锁实现的高性能并发数据结构,可用于解决多进程/多线程数据共享和同步加锁问题:

  • 性能强悍,单线程每秒可读写200万次;
  • 应用代码无需加锁,内置行锁自旋锁,所有操作均是多线程/多进程安全,用户层完全不需要考虑数据同步问题;
  • 支持多进程,可用于多进程之间共享数据;
  • 使用行锁,而不是全局锁,仅当 2 个进程在同一 CPU 时间,并发读取同一条数据才会进行发生抢锁。

Swoole\Table 支持以 Key-Value 方式读写,使用起来非常简单:

<?php

// 初始化一个容量为 1024 的 Swoole Table
$table = new \Swoole\Table(1024);
// 在 Table 中新增 id 列
$table->column('id', \Swoole\Table::TYPE_INT);
// 在 Table 中新增 name 列,长度为 50
$table->column('name', \Swoole\Table::TYPE_STRING, 10);
// 在 Table 中新泽 score 列
$table->column('score', \Swoole\Table::TYPE_FLOAT);
// 创建这个 Swoole Table
$table->create();


// 设置 Key-Value 值
$table->set('student-1', ['id' => 1, 'name' => '学小君', 'score' => 80]);
$table->set('student-2', ['id' => 2, 'name' => '学院君', 'score' => 90]);

// 如果指定 Key 值存在则打印对应 Value 值
if ($table->exist('student-1')) {
    echo "Student-" . $table->get('student-1', 'id') . ':' . $table->get('student-1', 'name').":".
        $table->get('student-1', 'score') . "\n";
}

// 自增操作
$table->incr('student-2', 'score', 5);
// 自减操作
$table->decr('student-2', 'score', 5);

// 表中总记录数
$count = $table->count();

// 删除指定表记录
$table->del('student-1');

此外 Swoole\Table 类还实现了迭代器接口,支持通过 foreach 进行遍历。

在 Laravel 中使用 Swoole\Table

如果要在 Laravel 中集成 Swoole 使用 Swoole\Table,以 LaravelS 扩展包为例,首先要在配置文件 config/laravels.php 中定义 swoole_tables 配置项:

'swoole_tables'            => [
    'ws' => [ // 表名,会加上 Table 后缀,比如这里是 wsTable
        'size'   => 102400, //  表容量
        'column' => [ // 表字段,字段名为 value
            ['name' => 'value', 'type' => \Swoole\Table::TYPE_INT, 'size' => 8],
        ],
    ],
    ... // 还可以定义其它表
],

然后我们可以在代码中通过 swoole 实例上的 wsTable 属性访问 SwooleTable:

class WebSocketService implements WebSocketHandlerInterface
{
    ...

    // 连接建立时触发
    public function onOpen(Server $server, Request $request)
    {
        // 在触发 WebSocket 连接建立事件之前,Laravel 应用初始化的生命周期已经结束,你可以在这里获取 Laravel 请求和会话数据
        // 调用 push 方法向客户端推送数据,fd 是客户端连接标识字段
        Log::info('WebSocket 连接建立:' . $request->fd);
        app('swoole')->wsTable->set('fd:' . $request->fd, ['value' => $request->fd]);
        $server->push($request->fd, 'Welcome to WebSocket Server built on LaravelS');
    }

    // 收到消息时触发
    public function onMessage(Server $server, Frame $frame)
    {
        foreach (app('swoole')->wsTable as $key => $row) {
            if (strpos($key, 'fd:') === 0 && $server->exist($row['value'])) {
                Log::info('Receive message from client: ' . $row['value']);
                // 调用 push 方法向客户端推送数据
                $server->push($frame->fd, 'This is a message sent from WebSocket Server at ' . date('Y-m-d H:i:s'));
            }
        }
    }
    
    ...

}

然后我们参考在 Laravel 中集成 Swoole 实现 WebSocket 服务器这篇教程从客户端向 WebSocket 服务器发起请求,即可在最新日志文件中看到相应的日志信息:

[2019-06-19 22:09:03] local.INFO: WebSocket 连接建立:1  
[2019-06-19 22:09:07] local.INFO: Receive message from client: 1

上一篇: 基于 Process\Pool 通过进程池实现数据库和 Redis 的持久连接

下一篇: 基于 Swoole 实现协程篇(一):基本概念和底层原理