基于 Laravel + Swoole + Vue 搭建实时在线聊天室(八):Websocket 服务端重构与用户认证

上篇教程学院君给大家演示了基于 Vue + Muse UI 前端登录到聊天室的实现,不过这一块的实现主要是前端与 Swoole HTTP 服务器的交互,未涉及到 WebSocket 连接,今天我们就来演示基于 Websocket 连接的认证实现,在此之前,我们还要费一番功夫重构下目前的 Websocket 后端实现,以便更好的进行后续开发。

Websocket 服务端重构

之前在建立 socket.io 客户端与 Websocket 服务端连接的时候,我们已经预留了 App\Services\WebSocket\WebSocket 类这个伏笔,并且提到后续与 Websocket 通信相关的业务逻辑都迁移到这个类中实现,\App\Services\WebSocket\WebSocketHandler 将只承担入口功能,所以今天的重构将重点围绕这个 WebSocket 类进行。

创建房间接口及实现类

在开始编写 WebSocket 类代码之前,我们先解决外围业务逻辑,聊天室一个密不可分的功能模块房间模块,这里我们先参照 swooletw/laravel-swoole 这个扩展包的实现将其房间相关类都迁移过来,保存到 app/Services/WebSocket/Rooms 目录下:

Websocket目录结构

其中,RoomContract.php 代码如下:

<?php

namespace App\Services\Websocket\Rooms;

interface RoomContract
{
    /**
     * Rooms key
     *
     * @const string
     */
    public const ROOMS_KEY = 'rooms';

    /**
     * Descriptors key
     *
     * @const string
     */
    public const DESCRIPTORS_KEY = 'fds';

    /**
     * Do some init stuffs before workers started.
     *
     * @return RoomContract
     */
    public function prepare(): RoomContract;

    /**
     * Add multiple socket fds to a room.
     *
     * @param int fd
     * @param array|string rooms
     */
    public function add(int $fd, $rooms);

    /**
     * Delete multiple socket fds from a room.
     *
     * @param int fd
     * @param array|string rooms
     */
    public function delete(int $fd, $rooms);

    /**
     * Get all sockets by a room key.
     *
     * @param string room
     *
     * @return array
     */
    public function getClients(string $room);

    /**
     * Get all rooms by a fd.
     *
     * @param int fd
     *
     * @return array
     */
    public function getRooms(int $fd);
}

这里面定义的是房间的接口,默认的两个实现一个是基于 Redis 作为存储媒介的 RedisRoom.php

<?php

namespace App\Services\Websocket\Rooms;

use Illuminate\Support\Arr;
use Predis\Client as RedisClient;
use Predis\Pipeline\Pipeline;

/**
 * Class RedisRoom
 */
class RedisRoom implements RoomContract
{
    /**
     * @var \Predis\Client
     */
    protected $redis;

    /**
     * @var array
     */
    protected $config;

    /**
     * @var string
     */
    protected $prefix = 'swoole:';

    /**
     * RedisRoom constructor.
     *
     * @param array $config
     */
    public function __construct(array $config)
    {
        $this->config = $config;
    }

    /**
     * @param \Predis\Client|null $redis
     *
     * @return RoomContract
     */
    public function prepare(RedisClient $redis = null): RoomContract
    {
        $this->setRedis($redis);
        $this->setPrefix();
        $this->cleanRooms();

        return $this;
    }

    /**
     * Set redis client.
     *
     * @param \Predis\Client|null $redis
     */
    public function setRedis(?RedisClient $redis = null)
    {
        if (! $redis) {
            $server = Arr::get($this->config, 'server', []);
            $options = Arr::get($this->config, 'options', []);

            // forbid setting prefix from options
            if (Arr::has($options, 'prefix')) {
                $options = Arr::except($options, 'prefix');
            }

            $redis = new RedisClient($server, $options);
        }

        $this->redis = $redis;
    }

    /**
     * Set key prefix from config.
     */
    protected function setPrefix()
    {
        if ($prefix = Arr::get($this->config, 'prefix')) {
            $this->prefix = $prefix;
        }
    }

    /**
     * Get redis client.
     */
    public function getRedis()
    {
        return $this->redis;
    }

    /**
     * Add multiple socket fds to a room.
     *
     * @param int fd
     * @param array|string rooms
     */
    public function add(int $fd, $rooms)
    {
        $rooms = is_array($rooms) ? $rooms : [$rooms];

        $this->addValue($fd, $rooms, RoomContract::DESCRIPTORS_KEY);

        foreach ($rooms as $room) {
            $this->addValue($room, [$fd], RoomContract::ROOMS_KEY);
        }
    }

    /**
     * Delete multiple socket fds from a room.
     *
     * @param int fd
     * @param array|string rooms
     */
    public function delete(int $fd, $rooms)
    {
        $rooms = is_array($rooms) ? $rooms : [$rooms];
        $rooms = count($rooms) ? $rooms : $this->getRooms($fd);

        $this->removeValue($fd, $rooms, RoomContract::DESCRIPTORS_KEY);

        foreach ($rooms as $room) {
            $this->removeValue($room, [$fd], RoomContract::ROOMS_KEY);
        }
    }

    /**
     * Add value to redis.
     *
     * @param $key
     * @param array $values
     * @param string $table
     *
     * @return $this
     */
    public function addValue($key, array $values, string $table)
    {
        $this->checkTable($table);
        $redisKey = $this->getKey($key, $table);

        $this->redis->pipeline(function (Pipeline $pipe) use ($redisKey, $values) {
            foreach ($values as $value) {
                $pipe->sadd($redisKey, $value);
            }
        });

        return $this;
    }

    /**
     * Remove value from reddis.
     *
     * @param $key
     * @param array $values
     * @param string $table
     *
     * @return $this
     */
    public function removeValue($key, array $values, string $table)
    {
        $this->checkTable($table);
        $redisKey = $this->getKey($key, $table);

        $this->redis->pipeline(function (Pipeline $pipe) use ($redisKey, $values) {
            foreach ($values as $value) {
                $pipe->srem($redisKey, $value);
            }
        });

        return $this;
    }

    /**
     * Get all sockets by a room key.
     *
     * @param string room
     *
     * @return array
     */
    public function getClients(string $room)
    {
        return $this->getValue($room, RoomContract::ROOMS_KEY) ?? [];
    }

    /**
     * Get all rooms by a fd.
     *
     * @param int fd
     *
     * @return array
     */
    public function getRooms(int $fd)
    {
        return $this->getValue($fd, RoomContract::DESCRIPTORS_KEY) ?? [];
    }

    /**
     * Check table for rooms and descriptors.
     *
     * @param string $table
     */
    protected function checkTable(string $table)
    {
        if (! in_array($table, [RoomContract::ROOMS_KEY, RoomContract::DESCRIPTORS_KEY])) {
            throw new \InvalidArgumentException("Invalid table name: `{$table}`.");
        }
    }

    /**
     * Get value.
     *
     * @param string $key
     * @param string $table
     *
     * @return array
     */
    public function getValue(string $key, string $table)
    {
        $this->checkTable($table);

        $result = $this->redis->smembers($this->getKey($key, $table));

        // Try to fix occasional non-array returned result
        return is_array($result) ? $result : [];
    }

    /**
     * Get key.
     *
     * @param string $key
     * @param string $table
     *
     * @return string
     */
    public function getKey(string $key, string $table)
    {
        return "{$this->prefix}{$table}:{$key}";
    }

    /**
     * Clean all rooms.
     */
    protected function cleanRooms(): void
    {
        if (count($keys = $this->redis->keys("{$this->prefix}*"))) {
            $this->redis->del($keys);
        }
    }
}

一个是基于 Swoole Table 作为存储媒介的 TableRoom.php

<?php
/**
 * Created by PhpStorm.
 * User: sunqiang
 * Date: 2019/11/7
 * Time: 3:14 PM
 */

namespace App\Services\Websocket\Rooms;

use Swoole\Table;

class TableRoom implements RoomContract
{
    /**
     * @var array
     */
    protected $config;

    /**
     * @var Table
     */
    protected $rooms;

    /**
     * @var Table
     */
    protected $fds;

    /**
     * TableRoom constructor.
     *
     * @param array $config
     */
    public function __construct(array $config)
    {
        $this->config = $config;
    }

    /**
     * Do some init stuffs before workers started.
     *
     * @return RoomContract
     */
    public function prepare(): RoomContract
    {
        $this->initRoomsTable();
        $this->initFdsTable();

        return $this;
    }

    /**
     * Add a socket fd to multiple rooms.
     *
     * @param int fd
     * @param array|string rooms
     */
    public function add(int $fd, $roomNames)
    {
        $rooms = $this->getRooms($fd);
        $roomNames = is_array($roomNames) ? $roomNames : [$roomNames];

        foreach ($roomNames as $room) {
            $fds = $this->getClients($room);

            if (in_array($fd, $fds)) {
                continue;
            }

            $fds[] = $fd;
            $rooms[] = $room;

            $this->setClients($room, $fds);
        }

        $this->setRooms($fd, $rooms);
    }

    /**
     * Delete a socket fd from multiple rooms.
     *
     * @param int fd
     * @param array|string rooms
     */
    public function delete(int $fd, $roomNames = [])
    {
        $allRooms = $this->getRooms($fd);
        $roomNames = is_array($roomNames) ? $roomNames : [$roomNames];
        $rooms = count($roomNames) ? $roomNames : $allRooms;

        $removeRooms = [];
        foreach ($rooms as $room) {
            $fds = $this->getClients($room);

            if (! in_array($fd, $fds)) {
                continue;
            }

            $this->setClients($room, array_values(array_diff($fds, [$fd])));
            $removeRooms[] = $room;
        }

        $this->setRooms($fd, array_values(array_diff($allRooms, $removeRooms)));
    }

    /**
     * Get all sockets by a room key.
     *
     * @param string room
     *
     * @return array
     */
    public function getClients(string $room)
    {
        return $this->getValue($room, RoomContract::ROOMS_KEY) ?? [];
    }

    /**
     * Get all rooms by a fd.
     *
     * @param int fd
     *
     * @return array
     */
    public function getRooms(int $fd)
    {
        return $this->getValue($fd, RoomContract::DESCRIPTORS_KEY) ?? [];
    }

    /**
     * @param string $room
     * @param array $fds
     *
     * @return TableRoom
     */
    protected function setClients(string $room, array $fds): TableRoom
    {
        return $this->setValue($room, $fds, RoomContract::ROOMS_KEY);
    }

    /**
     * @param int $fd
     * @param array $rooms
     *
     * @return TableRoom
     */
    protected function setRooms(int $fd, array $rooms): TableRoom
    {
        return $this->setValue($fd, $rooms, RoomContract::DESCRIPTORS_KEY);
    }

    /**
     * Init rooms table
     */
    protected function initRoomsTable(): void
    {
        $this->rooms = new Table($this->config['room_rows']);
        $this->rooms->column('value', Table::TYPE_STRING, $this->config['room_size']);
        $this->rooms->create();
    }

    /**
     * Init descriptors table
     */
    protected function initFdsTable()
    {
        $this->fds = new Table($this->config['client_rows']);
        $this->fds->column('value', Table::TYPE_STRING, $this->config['client_size']);
        $this->fds->create();
    }

    /**
     * Set value to table
     *
     * @param $key
     * @param array $value
     * @param string $table
     *
     * @return $this
     */
    public function setValue($key, array $value, string $table)
    {
        $this->checkTable($table);

        $this->$table->set($key, ['value' => json_encode($value)]);

        return $this;
    }

    /**
     * Get value from table
     *
     * @param string $key
     * @param string $table
     *
     * @return array|mixed
     */
    public function getValue(string $key, string $table)
    {
        $this->checkTable($table);

        $value = $this->$table->get($key);

        return $value ? json_decode($value['value'], true) : [];
    }

    /**
     * Check table for exists
     *
     * @param string $table
     */
    protected function checkTable(string $table)
    {
        if (! property_exists($this, $table) || ! $this->$table instanceof Table) {
            throw new \InvalidArgumentException("Invalid table name: `{$table}`.");
        }
    }
}

后面我们还会创建一个基于数据库作为存储媒介的 DatabaseRoom 类,这个留到后面具体演示房间功能的时候再实现。此外,我们在 app/Services/WebSocket/Facades 目录下为房间服务创建一个门面代理类 Room.php,并初始化代码如下:

<?php

namespace App\Services\Websocket\Facades;

use App\Services\Websocket\Rooms\RoomContract;
use Illuminate\Support\Facades\Facade;

/**
 * @method static $this prepare()
 * @method static $this add($fd, $rooms)
 * @method static $this delete($fd, $rooms)
 * @method static array getClients($room)
 * @method static array getRooms($fd)
 *
 * @see RoomContract
 */
class Room extends Facade
{
    /**
     * Get the registered name of the component.
     *
     * @return string
     */
    protected static function getFacadeAccessor()
    {
        return 'swoole.room';
    }
}

编写用户认证实现代码

房间是静态的空间,在房间里聊天的是动态的用户,因此,我们还要编写用户认证相关实现代码,以便唯一区分不同用户以及管理不同房间、不同用户的聊天信息,这里我们仍然参照 swooletw/laravel-swoole 的实现,在 app/Services/WebSocket 目录下创建一个 Authenticatable Trait 来实现用户认证相关业务逻辑:

<?php

namespace App\Services\WebSocket;

use Illuminate\Contracts\Auth\Authenticatable as AuthenticatableContract;
use InvalidArgumentException;

/**
 * Trait Authenticatable
 */
trait Authenticatable
{
    protected $userId;

    /**
     * Login using current user.
     *
     * @param \Illuminate\Contracts\Auth\Authenticatable $user
     *
     * @return mixed
     */
    public function loginUsing(AuthenticatableContract $user)
    {
        return $this->loginUsingId($user->getAuthIdentifier());
    }

    /**
     * Login using current userId.
     *
     * @param $userId
     *
     * @return mixed
     */
    public function loginUsingId($userId)
    {
        return $this->join(static::USER_PREFIX . $userId);
    }

    /**
     * Logout with current sender's fd.
     *
     * @return mixed
     */
    public function logout()
    {
        if (is_null($userId = $this->getUserId())) {
            return null;
        }

        return $this->leave(static::USER_PREFIX . $userId);
    }

    /**
     * Set multiple recepients' fds by users.
     *
     * @param $users
     *
     * @return Authenticatable
     */
    public function toUser($users)
    {
        $users = is_object($users) ? func_get_args() : $users;

        $userIds = array_map(function (AuthenticatableContract $user) {
            $this->checkUser($user);

            return $user->getAuthIdentifier();
        }, $users);

        return $this->toUserId($userIds);
    }

    /**
     * Set multiple recepients' fds by userIds.
     *
     * @param $userIds
     *
     * @return Authenticatable
     */
    public function toUserId($userIds)
    {
        $userIds = is_string($userIds) || is_integer($userIds) ? func_get_args() : $userIds;

        foreach ($userIds as $userId) {
            $fds = $this->room->getClients(static::USER_PREFIX . $userId);
            $this->to($fds);
        }

        return $this;
    }

    /**
     * Get current auth user id by sender's fd.
     */
    public function getUserId()
    {
        if (! is_null($this->userId)) {
            return $this->userId;
        }

        $rooms = $this->room->getRooms($this->getSender());

        foreach ($rooms as $room) {
            if (count($explode = explode(static::USER_PREFIX, $room)) === 2) {
                $this->userId = $explode[1];
            }
        }

        return $this->userId;
    }

    /**
     * Check if a user is online by given userId.
     *
     * @param $userId
     *
     * @return bool
     */
    public function isUserIdOnline($userId)
    {
        return ! empty($this->room->getClients(static::USER_PREFIX . $userId));
    }

    /**
     * Check if user object implements AuthenticatableContract.
     *
     * @param $user
     */
    protected function checkUser($user)
    {
        if (! $user instanceOf AuthenticatableContract) {
            throw new InvalidArgumentException('user object must implement ' . AuthenticatableContract::class);
        }
    }
}

实现 Websocket 核心类

完成用户认证和房间这两个外围功能后,接下来,正式开始 Websocket 类的实现,还是参考 swooletw/laravel-swoole 的实现,只是对部分实现做了调整来简化并适配 hhxsv5/laravel-s 扩展包:

<?php
namespace App\Services\WebSocket;

use App\Services\Websocket\Rooms\RoomContract;
use Illuminate\Support\Facades\App;
use Swoole\WebSocket\Server;

class WebSocket
{
    use Authenticatable;

    const PUSH_ACTION = 'push';
    const EVENT_CONNECT = 'connect';
    const USER_PREFIX = 'uid_';

    /**
     * Websocket Server
     * @var Server
     */
    protected $server;

    /**
     * Determine if to broadcast.
     *
     * @var boolean
     */
    protected $isBroadcast = false;

    /**
     * Scoket sender's fd.
     *
     * @var integer
     */
    protected $sender;

    /**
     * Recepient's fd or room name.
     *
     * @var array
     */
    protected $to = [];

    /**
     * Websocket event callbacks.
     *
     * @var array
     */
    protected $callbacks = [];

    /**
     * Room adapter.
     *
     * @var RoomContract
     */
    protected $room;

    /**
     * DI Container.
     *
     * @var \Illuminate\Contracts\Container\Container
     */
    protected $container;

    /**
     * Websocket constructor.
     *
     * @param RoomContract $room
     */
    public function __construct(RoomContract $room)
    {
        $this->room = $room;
    }

    /**
     * Set broadcast to true.
     */
    public function broadcast(): self
    {
        $this->isBroadcast = true;

        return $this;
    }

    /**
     * Set multiple recipients fd or room names.
     *
     * @param integer, string, array
     *
     * @return $this
     */
    public function to($values): self
    {
        $values = is_string($values) || is_integer($values) ? func_get_args() : $values;

        foreach ($values as $value) {
            if (! in_array($value, $this->to)) {
                $this->to[] = $value;
            }
        }

        return $this;
    }

    /**
     * Join sender to multiple rooms.
     *
     * @param string, array $rooms
     *
     * @return $this
     */
    public function join($rooms): self
    {
        $rooms = is_string($rooms) || is_integer($rooms) ? func_get_args() : $rooms;

        $this->room->add($this->sender, $rooms);

        return $this;
    }

    /**
     * Make sender leave multiple rooms.
     *
     * @param array $rooms
     *
     * @return $this
     */
    public function leave($rooms = []): self
    {
        $rooms = is_string($rooms) || is_integer($rooms) ? func_get_args() : $rooms;

        $this->room->delete($this->sender, $rooms);

        return $this;
    }

    /**
     * Emit data and reset some status.
     *
     * @param string
     * @param mixed
     *
     * @return boolean
     */
    public function emit(string $event, $data): bool
    {
        $fds = $this->getFds();
        $assigned = ! empty($this->to);

        // if no fds are found, but rooms are assigned
        // that means trying to emit to a non-existing room
        // skip it directly instead of pushing to a task queue
        if (empty($fds) && $assigned) {
            return false;
        }

        $payload = [
            'sender'    => $this->sender,
            'fds'       => $fds,
            'broadcast' => $this->isBroadcast,
            'assigned'  => $assigned,
            'event'     => $event,
            'message'   => $data,
        ];

        $server = app('swoole');
        $pusher = Pusher::make($payload, $server);
        $parser = app('swoole.parser');
        $pusher->push($parser->encode($pusher->getEvent(), $pusher->getMessage()));

        $this->reset();

        return true;
    }

    /**
     * An alias of `join` function.
     *
     * @param string
     *
     * @return $this
     */
    public function in($room)
    {
        $this->join($room);

        return $this;
    }

    /**
     * Register an event name with a closure binding.
     *
     * @param string
     * @param callback
     *
     * @return $this
     */
    public function on(string $event, $callback)
    {
        if (! is_string($callback) && ! is_callable($callback)) {
            throw new \InvalidArgumentException(
                'Invalid websocket callback. Must be a string or callable.'
            );
        }

        $this->callbacks[$event] = $callback;

        return $this;
    }

    /**
     * Check if this event name exists.
     *
     * @param string
     *
     * @return boolean
     */
    public function eventExists(string $event)
    {
        return array_key_exists($event, $this->callbacks);
    }

    /**
     * Execute callback function by its event name.
     *
     * @param string
     * @param mixed
     *
     * @return mixed
     */
    public function call(string $event, $data = null)
    {
        if (! $this->eventExists($event)) {
            return null;
        }

        // inject request param on connect event
        $isConnect = $event === static::EVENT_CONNECT;
        $dataKey = $isConnect ? 'request' : 'data';

        return App::call($this->callbacks[$event], [
            'websocket' => $this,
            $dataKey => $data,
        ]);
    }

    /**
     * Set sender fd.
     *
     * @param integer
     *
     * @return $this
     */
    public function setSender(int $fd)
    {
        $this->sender = $fd;

        return $this;
    }

    /**
     * Get current sender fd.
     */
    public function getSender()
    {
        return $this->sender;
    }

    /**
     * Get broadcast status value.
     */
    public function getIsBroadcast()
    {
        return $this->isBroadcast;
    }

    /**
     * Get push destinations (fd or room name).
     */
    public function getTo()
    {
        return $this->to;
    }

    /**
     * Get all fds we're going to push data to.
     */
    protected function getFds()
    {
        $fds = array_filter($this->to, function ($value) {
            return is_integer($value);
        });
        $rooms = array_diff($this->to, $fds);

        foreach ($rooms as $room) {
            $clients = $this->room->getClients($room);
            // fallback fd with wrong type back to fds array
            if (empty($clients) && is_numeric($room)) {
                $fds[] = $room;
            } else {
                $fds = array_merge($fds, $clients);
            }
        }

        return array_values(array_unique($fds));
    }

    /**
     * Reset some data status.
     *
     * @param bool $force
     *
     * @return $this
     */
    public function reset($force = false)
    {
        $this->isBroadcast = false;
        $this->to = [];

        if ($force) {
            $this->sender = null;
            $this->userId = null;
        }

        return $this;
    }
}

Websocket 中,我们引入了用户认证实现 Authenticatable Trait 以及房间实例 $room,该属性是 RoomContract 接口的实现类实例,关于其绑定我们马上会介绍。与原来的实现相比,这里移除了中间件功能,纯粹为了简化业务逻辑,如果有需要的话后面再补上。

我们大致来介绍下每个方法对功能:

  • to/setSender:指定消息发送的对象,比如某个客户端、房间;
  • join/in:将当前用户加入某个房间;
  • leave:离开某个房间;
  • emit:封装消息发送逻辑,之前写在 WebsocketHandler 中的消息发送代码将移植到这里,从而让代码职能更清晰,这里仅作消息发送,不涉及业务逻辑处理;
  • on:用于注册 Websocket 事件路由,注册实现拆分到 routes/websocket.php 中定义,从而方便维护,也让代码结构更加清晰,具体的业务逻辑处理都将在这里定义,处理完成后调用上述 emit 方法发送消息给客户端;
  • eventExists:判断指定事件路由是否存在;
  • call:如果某个事件路由存在,则调用对应的事件路由业务逻辑,所以 oneventExistscall 三者环环相扣:on 负责注册、eventExits 负责匹配、call 负责执行;
  • getFds:获取要发送消息的所有对象;
  • reset:重置某些数据的状态,避免复用。

和房间一样,这里我们也在 app/Services/WebSocket/Facades 目录下为其创建一个门面代理类:

<?php

namespace App\Services\Websocket\Facades;

use Illuminate\Support\Facades\Facade;

/**
 * @method static $this broadcast()
 * @method static $this to($values)
 * @method static $this join($rooms)
 * @method static $this leave($rooms)
 * @method static boolean emit($event, $data)
 * @method static $this in($room)
 * @method static $this on($event, $callback)
 * @method static boolean eventExists($event)
 * @method static mixed call($event, $data)
 * @method static boolean close($fd)
 * @method static $this setSender($fd)
 * @method static int getSender()
 * @method static boolean getIsBroadcast()
 * @method static array getTo()
 * @method static $this reset()
 * @method static $this middleware($middleware)
 * @method static $this setContainer($container)
 * @method static $this setPipeline($pipeline)
 * @method static \Illuminate\Contracts\Pipeline\Pipeline getPipeline()
 * @method static mixed loginUsing($user)
 * @method static $this loginUsingId($userId)
 * @method static $this logout()
 * @method static $this toUser($users)
 * @method static $this toUserId($userIds)
 * @method static string getUserId()
 * @method static boolean isUserIdOnline($userId)
 *
 * @see \App\Services\WebSocket\WebSocket
 */
class Websocket extends Facade
{
    /**
     * Get the registered name of the component.
     *
     * @return string
     */
    protected static function getFacadeAccessor()
    {
        return 'swoole.websocket';
    }
}

至此,我们的 app/Services/WebSocket 目录代码结构如下所示:

Websocket目录结构

这个 Websocket 核心类中涉及到一些绑定到容器的对象解析,以及 Websocket 事件路由注册,接下来我们就来介绍对应的实现代码。

注册 Websocket 事件路由

首先我们在 routes 目录下新建一个 websocket.php 文件来存放这些事件路由:

<?php

use Swoole\Http\Request;
use App\Services\WebSocket\WebSocket;
use App\Services\Websocket\Facades\Websocket as WebsocketProxy;

/*
|--------------------------------------------------------------------------
| Websocket Routes
|--------------------------------------------------------------------------
|
| Here is where you can register websocket events for your application.
|
*/

WebsocketProxy::on('connect', function (WebSocket $websocket, Request $request) {
    // 发送欢迎信息
    $websocket->setSender($request->fd);
    $websocket->emit('connect', '欢迎访问聊天室');

});

WebsocketProxy::on('disconnect', function (WebSocket $websocket) {
    // called while socket on disconnect
});

WebsocketProxy::on('login', function (WebSocket $websocket, $data) {
    if (!empty($data['token']) && ($user = \App\User::where('api_token', $data['token'])->first())) {
        $websocket->loginUsing($user);
        // todo 读取未读消息
        $websocket->toUser($user)->emit('login', '登录成功');
    } else {
        $websocket->emit('login', '登录后才能进入聊天室');
    }
});

之所以叫事件路由,是因为这些路由都是根据客户端传递的事件名称来匹配调用的,这里我们初始化了 connectlogin 这两个事件路由的闭包实现,这里的 WebsocketProxy 对应的是 Websocket 门面类,所以静态 on 方法调用最终还是落到 Websocketon 方法去执行,即注册某个事件对应的业务逻辑,在闭包实现参数中,$websocket 对应的是 call 方法中传递过来的 $this 对象,$data 则是经过 Parser 实现类解析的消息数据。因此,在闭包函数中,我们可以调用 Websocket 类的任何方法,最后再通过 emit 方法将消息发送给客户端。

容器绑定和路由加载

再来看容器绑定,包括门面与代理类的绑定、接口与实现类的绑定,以及上述事件路由何时加载,最合适的时机是在 Swoole Worker Start 事件触发的时候,因为对 laravels 扩展包而言,Laravel 容器是在这之前进行初始化的:

public function onWorkerStart(HttpServer $server, $workerId)
{
    parent::onWorkerStart($server, $workerId);

    // To implement gracefully reload
    // Delay to create Laravel
    // Delay to include Laravel's autoload.php
    $this->laravel = $this->initLaravel($this->laravelConf, $this->swoole);

    // Fire WorkerStart event
    $this->fireEvent('WorkerStart', WorkerStartInterface::class, func_get_args());
}

实现起来也很简单,在应用中监听 Swoole 的 WorkerStart 事件即可(注意不是 Laravel 的事件,所以不能使用 Laravel 事件监听机制来实现),监听该事件注册容器绑定和加载路由的另一个好处是有利于提高应用性能,如果我们是在 onRequestonOpenOnMesssage 之类的事件触发时绑定,则每次请求都要注册一遍,就和基于 PHP-FPM 模式的 Laravel 应用没啥区别了,此外,也不能在 onWorkerStart 执行之前绑定,因为这个时候 Laravel 还没有初始化,还没有 Application 容器这个东西,往哪注册呢?

好了,废话不多说,我们在 app/Events 目录下创建 WorkerStartEvent.php 文件,然后编写容器绑定和路由加载代码如下:

<?php
/**
 * Websocket 相关服务容器绑定和路由加载
 * Author:学院君
 */
namespace App\Events;

use App\Services\WebSocket\Parser;
use App\Services\Websocket\Rooms\RoomContract;
use App\Services\WebSocket\WebSocket;
use Hhxsv5\LaravelS\Swoole\Events\WorkerStartInterface;
use Illuminate\Container\Container;
use Illuminate\Pipeline\Pipeline;
use Swoole\Http\Server;

class WorkerStartEvent implements WorkerStartInterface
{
    public function __construct()
    {
    }

    public function handle(Server $server, $workerId)
    {
        $isWebsocket = config('laravels.websocket.enable') == true;
        if (!$isWebsocket) {
            return;
        }
        // WorkerStart 事件发生时 Laravel 已经初始化完成,在这里做一些组件绑定到容器的初始化工作最合适
        app()->singleton(Parser::class, function () {
            $parserClass = config('laravels.websocket.parser');
            return new $parserClass;
        });
        app()->alias(Parser::class, 'swoole.parser');

        app()->singleton(RoomContract::class, function () {
            $driver = config('laravels.websocket.drivers.default', 'table');
            $driverClass = config('laravels.websocket.drivers.' . $driver);
            $driverConfig = config('laravels.websocket.drivers.settings.' . $driver);
            $roomInstance = new $driverClass($driverConfig);
            if ($roomInstance instanceof RoomContract) {
                $roomInstance->prepare();
            }
            return $roomInstance;
        });
        app()->alias(RoomContract::class, 'swoole.room');

        app()->singleton(WebSocket::class, function (Container $app) {
            return new WebSocket($app->make(RoomContract::class));
        });
        app()->alias(WebSocket::class, 'swoole.websocket');

        // 引入 Websocket 路由文件
        $routePath = base_path('routes/websocket.php');
        require $routePath;
    }
}

注意,我这里判断了只有在 Websocket 服务器启动的情况下,才做相应的 Websocket 服务容器绑定了和路由加载,包括 ParserRoomContractWebSocket 的具体实现绑定和别名设置(可用于门面类的解析),最后,加载 Websocket 事件路由文件让对应的事件路由生效。

最后,不要忘了在 config/laravels.phpevent_handlers 中绑定事件与对应的处理器使其生效:

'event_handlers' => [
    'WorkerStart' => \App\Events\WorkerStartEvent::class,
],

完善 Websocket 配置

在上面的容器绑定实现中,还新增了一些配置项,我们在 config/laravels.phpwebsocket 中补充这些配置:

'websocket' => [
   'enable' => true,
   'handler' => \App\Services\WebSocket\WebSocketHandler::class,
   'middleware' => [
       //\Illuminate\Auth\Middleware\Authenticate::class,
       //\App\Http\Middleware\VerifyCsrfToken::class,
   ],
   'parser' => \App\Services\WebSocket\SocketIO\SocketIOParser::class,
   'drivers' => [
       'default' => 'table',
       'table' => \App\Services\Websocket\Rooms\TableRoom::class,
       'redis' => \App\Services\Websocket\Rooms\RedisRoom::class,
       'settings' => [
           'table' => [
               'room_rows' => 4096,
               'room_size' => 2048,
               'client_rows' => 8192,
               'client_size' => 2048,
           ],
           'redis' => [
               'server' => [
                   'host' => env('REDIS_HOST', '127.0.0.1'),
                   'password' => env('REDIS_PASSWORD', null),
                   'port' => env('REDIS_PORT', 6379),
                   'database' => 0,
                   'persistent' => true,
               ],
               'options' => [
                   //
               ],
               'prefix' => 'swoole:',
           ],
       ],
   ],
],

至此,我们就初步完成后 Websocket 后端的业务逻辑重构,就差最后一步在处理器入口中将它们串联起来了。

重构 WebSocketHandler 处理器

那我们就来补上这一环,重构 App\Services\WebSocket\WebSocketHandler 处理器实现类如下:

<?php
namespace App\Services\WebSocket;

use App\Services\WebSocket\SocketIO\Packet;
use Hhxsv5\LaravelS\Swoole\WebSocketHandlerInterface;
use Illuminate\Support\Facades\Log;
use Swoole\Http\Request;
use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server;

class WebSocketHandler implements WebSocketHandlerInterface
{
    /**
     * @var WebSocket
     */
    protected $websocket;
    /**
     * @var Parser
     */
    protected $parser;

    public function __construct()
    {
        $this->websocket = app('swoole.websocket');
        $this->parser = app('swoole.parser');
    }

    // 连接建立时触发
    public function onOpen(Server $server, Request $request)
    {
        // 如果未建立连接,先建立连接
        if (!request()->input('sid')) {
            // 初始化连接信息 socket.io-client
            $payload = json_encode([
                'sid' => base64_encode(uniqid()),
                'upgrades' => [],
                'pingInterval' => config('laravels.swoole.heartbeat_idle_time') * 1000,
                'pingTimeout' => config('laravels.swoole.heartbeat_check_interval') * 1000,
            ]);
            $initPayload = Packet::OPEN . $payload;
            $connectPayload = Packet::MESSAGE . Packet::CONNECT;
            $server->push($request->fd, $initPayload);
            $server->push($request->fd, $connectPayload);
        }
        Log::info('WebSocket 连接建立:' . $request->fd);
        if ($this->websocket->eventExists('connect')) {
            $this->websocket->call('connect', $request);
        }
    }

    // 收到消息时触发
    public function onMessage(Server $server, Frame $frame)
    {
        // $frame->fd 是客户端 id,$frame->data 是客户端发送的数据
        Log::info("从 {$frame->fd} 接收到的数据: {$frame->data}");
        if ($this->parser->execute($server, $frame)) {
            return;
        }
        $payload = $this->parser->decode($frame);
        ['event' => $event, 'data' => $data] = $payload;
        $this->websocket->reset(true)->setSender($frame->fd);
        if ($this->websocket->eventExists($event)) {
            $this->websocket->call($event, $data);
        } else {
            // 兜底处理,一般不会执行到这里
            return;
        }
    }

    // 连接关闭时触发
    public function onClose(Server $server, $fd, $reactorId)
    {
        Log::info('WebSocket 连接关闭:' . $fd);
        $this->websocket->setSender($fd);
        if ($this->websocket->eventExists('disconnect')) {
            $this->websocket->call('disconnect', '连接关闭');
        }
    }
}

之前冗杂的代码都已经拆分到 Websocket 类的事件路由、事件调用以及消息发送实现代码中,整个处理器实现变得更加精简,更加优雅了,在这里,我们将通过 Parser 实现类从客户端数据解析出事件名称和消息内容,然后调用 Websocket 实例的 eventExists 方法判断对应事件路由是否存在,如果存在,则通过 call 方法调用对应闭包函数,处理业务逻辑,最后发送消息给对应客户端。

到这里,我们的 Swoole Websocket 服务端代码就算重构完成了,后面可能还会有细节优化,不过主体结构已经完成,而且扩展性非常好,尤其是事件路由这一块的处理,使得基于 Swoole 的 Laravel 应用真正得以与 Laravel 思想融为一体,这一点要感谢 swooletw/laravel-swoole 提供的设计思路。

Websocket 客户端与服务端的用户认证

如果是基于 Session 进行认证的话,可以完全沿用 swooletw/laravel-swoole 自带的中间件和用户认证实现,不过我们这个项目是前后端分离的,我这里简单在 login 事件路由中通过从请求数据中获取 token 字段进行认证:

WebsocketProxy::on('login', function (WebSocket $websocket, $data) {
    if (!empty($data['token']) && ($user = \App\User::where('api_token', $data['token'])->first())) {
        $websocket->loginUsing($user);
        // todo 读取未读消息
        $websocket->toUser($user)->emit('login', '登录成功');
    } else {
        $websocket->emit('login', '登录后才能进入聊天室');
    }
});

对应的客户端代码如下(resources/js/app.js):

const userId = store.state.userInfo.userid;
const token = store.state.userInfo.token;
if (userId) {
    socket.emit('login', {
        name: userId,
        token: token
    });
}

重新启动 Swoole Websocket 服务器使重构代码生效:

bin/laravels reload

然后再刷新聊天室首页,通过 F12->Network->WS 就可以看到新的 Websocket 通信数据了:

Websocket通信

下一篇教程,我们将实现前后端房间初始化和客户端代码优化。

本项目代码已经提交到 Github 仓库:https://github.com/nonfu/webchat,你可以从这里获取最新代码。

上一篇: 基于 Laravel + Swoole + Vue 搭建实时在线聊天室(七):基于 Muse UI 3.0 的前端用户认证功能实现

下一篇: 没有下一篇了