队列


1、介绍

Laravel队列为不同的后台队列服务提供统一的API,例如Beanstalk,Amazon SQS, Redis,甚至其他基于关系型数据库的队列。队列的目的是将耗时的任务延时处理,比如发送邮件,从而大幅度缩短Web请求和相应的时间。

队列配置文件存放在config/queue.php。每一种队列驱动的配置都可以在该文件中找到,包括数据库、Beanstalkd Amazon SQS、 Redis以及同步(本地使用)驱动。其中还包含了一个null队列驱动用于那些放弃队列的任务。

连接和队列的关系

在开始使用Laravel队列以前,了解“连接”和“队列”的关系非常重要。在配置文件 config/queue.php 有关于“连接(connections)”的配置项。该配置项定义了后台队列服务类型,如Amazon SQS, Beanstalk, 或Redis. 显而易见,每种连接都可以有很多队列,可以想象在银行办理现金业务的各个窗口队列。

请注意每个连接都有一个“queue”配置项。当新的队列任务被添加到指定的连接时,该配置项的值就是默认监听的队列(名称)。换种说法,如果你没有指派特别的队列名称,那么“queue”的值,也是该任务默认添加到的队列(名称)。

// 以下的任务将被委派到默认队列...
dispatch(new Job);

// 以下任务将被委派到 "emails" 队列...
dispatch((new Job)->onQueue('emails'));

多数应用并不需要将任务分配成多到队列,单个队列已经非常适用。但是,应用的任务有优先级差异或者类别差异的时候,多队列将是更好地选择,尤其Laravel的队列进程已经支持的情况下。举个例子,你可以将高优先级的任务委派到“high”(高优先级)队列,从而让它优先执行。

php artisan queue:work --queue=high,default

驱动预备知识

数据库

如果使用数据库来驱动队列,你需要数据表保存任务信息。可以很容易的通过Artisan命令建立这些表,相关知识需要参考migration,代码示例如下:

php artisan queue:table  //生成数据库队列的migration
php artisan migrate      //创建该数据库队列表

其他驱动预备知识

如果使用以下几种队列驱动,需要相应的依赖:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~3.0
  • Redis: predis/predis ~1.0

2、创建任务

生成任务类

通常,所有的任务类都保存在 app/Jobs 目录.如果app/Jobs 不存在,在运行Artisan命令 make:job 的时候,它将会自动创建。你可以通过Artisan CLI来生成队列任务类:

php artisan make:job SendReminderEmail

生成的类都实现了 Illuminate\Contracts\Queue\ShouldQueue 接口, 告诉Laravel将该任分配到队列,而不是立即运行。

任务类结构

任务类非常简单,通常只包含处理该任务的“handle”方法,让我们看一个任务类的例子。我们模拟管理播客发布服务,并在发布以前上传相应的播客文件:

<?php

namespace App\Jobs;

use App\Podcast;
use App\AudioProcessor;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;

class ProcessPodcast implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    /**
     * 创建任务实例
     *
     * @param  Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * 执行任务
     *
     * @param  AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // 执行播客的上传…
    }
}

在本示例中,我们将Eloquent模型作为参数直接传递到构造函数。因为该任务使用了SerializesModels trait,Eloquent模型将会在任务被执行是优雅地序列化和反序列化。如果你的队列任务在构造函数中接收Eloquent模型,只有模型的主键会被序列化到队列,当任务真正被执行的时候,队列系统会自动从数据库中获取整个模型实例。这对应用而言是完全透明的,从而避免序列化整个Eloquent模型实例引起的问题。

handle方法在任务被处理的时候调用,注意我们可以在任务的handle方法中进行依赖注入。Laravel服务容器会自动注入这些依赖。

注:二进制数据,如原生图片内容,在传递给队列任务之前先经过base64_encode方法处理,此外,该任务被推送到队列时将不会被序列化为JSON格式。

3、委派任务

一旦你创建了任务类,就可以通过辅助函数 dispatch 委派它到队列。辅助函数 dispatch 需要的唯一参数就是该任务的实例:

<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 保存新的播客.
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建新的播客

        dispatch(new ProcessPodcast($podcast));
    }
}
辅助函数dispatch  是全局且易用的函数,而且非常容易测试。请通过Laravel的测试文档可以了解更多细节。

延时任务

有时候你可能想要延迟队列任务的执行,可以通过在任务实例使用delay 方法。该delay 方法由Illuminate\Bus\Queueable trait提供,已经自动添加在通过命令行生成的任务类中。例如你希望将某个任务在创建10分钟以后才执行:

<?php
namespace App\Http\Controllers;

use Carbon\Carbon;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 保存播客.
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客...

        $job = (new ProcessPodcast($pocast))
                    ->delay(Carbon::now()->addMinutes(10));

        dispatch($job);
    }
}
Amazon SQS 的队列服务最长延时15分钟。

自定义队列和连接

委派到指定的队列

由于任务可推送到的不同队列,你应该将队列任务进行“分类”,甚至根据优先级来分配每个队列的进程数。请注意,这并不意味着使用了配置项中那些不同的连接来管理队列,实际上只有单一连接会被用到。要指定队列,请在任务实例使用 onQueue 方法:

<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 保存播客
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客

        $job = (new ProcessPodcast($podcast))->onQueue('processing');

        dispatch($job);
    }
}

委派到指定的连接

如果你的项目使用多个连接来管理队列,那么你会用到委派任务到指定的连接。请在任务实例使用 onConnection 方法来指定连接:

<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 保存播客
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客

        $job = (new ProcessPodcast($podcast))->onConnection('sqs');

        dispatch($job);
    }
}

当然,你可以同时使用 onConnection 和onQueue 方法来切换不同的连接和队列:

$job = (new ProcessPodcast($podcast))
                ->onConnection('sqs')
                ->onQueue('processing');

处理错误

如果任务在处理的时候有异常抛出,则该任务将会被自动释放回队列以便再次尝试执行。任务会持续被释放直到尝试次数达到应用允许的最大次数。最大尝试次数通过Artisan任务queue:listenqueue:work上的--tries开关来定义。关于运行队列监听器的更多信息可以在下面看到。

4、运行队列进程

Laravel 包含了一个 Artisan 命令用来运行被推送到队列的新任务。你可以使用 queue:work  命令运行队列进程。请注意,队列进程开始运行后,会持续监听队列,直至你手动停止或php周期结束停止。

php artisan queue:work
注:为了保持队列进程queue:work 持续在后台运行,你需要使用进程守护程序,比如 Supervisor 来确保队列进程持续运行。

请记住,队列进程是长生命周期的进程,会在启动后驻留内存。若应用有任何改动将不会影响到已经启动的进程。所以请在发布程序后,重启队列进程。

指定连接和队列

队列进程同样可以自定义连接和队列。队列进程使用到的名称必须在config/queue.php 中已经配置:

php artisan queue:work redis

你可以自定义将某个队列进程指定某个连接来管理。举例来说,如果所有的邮件任务都是通过redis 连接管理,那么可以用以下示例代码来启动单一进程仅仅出来单一队列:

php artisan queue:work redis --queue=emails

队列优先级

有时候你需要区分任务的优先级。比如,在配置文件config/queue.php 中,你可能定义连接 redis 的默认队列为 low. 可是,偶尔需要将任务委派到高优先级high , 可以这么做:

dispatch((new Job)->onQueue('high'));

如果期望所有high 高优先级的队列都将先于low 低优先级的任务执行,可以像这样启动队列进程

php artisan queue:work --queue=high,low

队列进程 & 部署

前文已经提到队列进程是长生命周期的进程,在重启以前,所有源码的修改并不会对其产生影响。所以,最简单的方法是在每次发布新版本后重新启动队列进程。你可以通过Aritisan命令queue:restart 来优雅的重启队列进程:

php artisan queue:restart

该命令将在队列进程完成正在进行的任务后,结束该进程,避免队列任务的丢失或错误。你仍然需要通过Supervisor 进程守护程序来确保队列进程的运行。

任务过期和超时

任务执行时间

在配置文件 config/queue.php 中,每个连接都定义了 retry_after 项。该项的目的是定义任务在执行以后多少秒后释放回队列.如果retry_after 设定的值为 90, 任务在运行90秒后还未完成,那么将被释放会队列而不是删除掉。毫无疑问,你需要把 retry_after 的值设定为任务执行时间的最大可能值。

注:只有亚马逊 SQS配置信息不包含 retry_after 项。亚马逊SQS 的任务执行时间基于 Default Visibility Timeout ,该项在亚马逊AWS控制台配置。

队列进程超时

队列进程 queue:work 可以设定超时--timeout 项。该 --timeout 控制队列进程执行每个任务的最长时间,如果超时,该进程将被关闭。各种错误都可能导致某个任务处于“冻结”状态,比如HTTP无响应等。队列进程超时就是为了将这些“冻结”的进程关闭。

php artisan queue:work --timeout=60

配置项 retry_after 和 Aritisan参数项 --timeout 不同,但目的都是为了确保任务的安全,并且只被成功的执行一次。

注:参数项 --timeout 的值应该是中小于配置项retry_after 的值,这是为了确保队列进程总在任务重试以前关闭。如果--timeout 比retry_after 大,则你的任务可能被执行两次。

5、守护进程Supervisor的配置

安装Supervisor

Supervisor是Linux系统中常用的进程守护程序。如果队列进程queue:work意外关闭,它会自动重启启动队列进程。在Ubuntu安装Supervisor 非常简单:

sudo apt-get install supervisor
注:如果自己配置Supervisor有困难,可以考虑使用Laravel Forge,它会为Laravel项目自动安装并配置Supervisor。

配置Supervisor

Supervisor配置文件通常存放在/etc/supervisor/conf.d目录,在该目录中,可以创建多个配置文件指示Supervisor如何监视进程,例如,让我们创建一个开启并监视queue:work进程的laravel-worker.conf文件:

[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log

在本例中,numprocs指令让Supervisor运行8个queue:work进程并监视它们,如果失败的话自动重启。配置文件创建好了之后,可以使用如下命令更新Supervisor配置并开启进程:

启动Supervisor

当你成功创建配置文件后,你需要刷新Supervisor 的配置信息:

sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start laravel-worker:*

你可以通过Supervisor官方文档获的更多信息 Supervisor文档.

6、处理失败的任务

不可避免会出现失败的任务。不必担心,Laravel很容易设置任务允许的最大尝试次数,若是执行次数达到该限定,任务会被插入到failed_jobs表,失败任务的名字可以通过配置文件config/queue.php来配置。

要创建一个failed_jobs表的迁移,可以使用queue:failed-table命令

php artisan queue:failed-table
php artisan migrate

通过 --tries 参数项来设置队列任务允许的最大尝试次数:

php artisan queue:work redis --tries=3

清理失败的任务

你可以在任务类中定义 failed 方法,  从而允许你在失败发生时执行指定的动作,比如发送任务失败的通知,记录日志等。异常抛出的的时候,就会传递到failed 方法:

<?php

namespace App\Jobs;

use Exception;
use App\Podcast;
use App\AudioProcessor;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;

class ProcessPodcast implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    /**
     * Create a new job instance.
     *
     * @param  Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * Execute the job.
     *
     * @param  AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // Process uploaded podcast...
    }

    /**
     * The job failed to process.
     *
     * @param  Exception  $exception
     * @return void
     */
    public function failed(Exception $e)
    {
        // 发送失败通知, etc...
    }
}

任务失败事件

如果你期望在任务失败的时候触发某个事件,可以使用Queue::failing 方法。该事件通过邮件或HipChat通知团队。举个例子,我么可以在Laravel自带的AppServiceProvider中附加一个回调到该事件:

<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Support\ServiceProvider;

class AppServiceProvider extends ServiceProvider
{
    /**
     * Bootstrap any application services.
     *
     * @return void
     */
    public function boot()
    {
        Queue::failing(function (JobFailed $event) {
            // $event->connectionName
            // $event->job
            // $event->exception
        });
    }

    /**
     * Register the service provider.
     *
     * @return void
     */
    public function register()
    {
        //
    }
}

重试失败的任务

要查看已插入到failed_jobs数据表中的所有失败任务,可以使用Artisan命令queue:failed

php artisan queue:failed

该命令将会列出任务ID,连接,对列和失败时间,任务ID可用于重试失败任务,例如,要重试一个ID为5的失败任务,要用到下面的命令

php artisan queue:retry 5

要重试所有失败任务,使用如下命令即可:

php artisan queue:retry all

如果你要删除一个失败任务,可以使用queue:forget命令:

php artisan queue:forget 5

要删除所有失败任务,可以使用queue:flush命令

php artisan queue:flush

7、任务事件

当你使用Queue门面的时候,可以使用 before 和 after 方法。你可以自定义在任务开始前或者结束后执行某个回调。这些回调可用来记录日志或者记录统计数据。通常,你可以在服务提供者中使用这些 方法。比如,我们可能在AppServiceProvider 这样用:

<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;

class AppServiceProvider extends ServiceProvider
{
    /**
     * Bootstrap any application services.
     *
     * @return void
     */
    public function boot()
    {
        Queue::before(function (JobProcessing $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });

        Queue::after(function (JobProcessed $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });
    }

    /**
     * Register the service provider.
     *
     * @return void
     */
    public function register()
    {
        //
    }
}
声明:本文档由网友AC1982提供翻译支持,学院君进行校注。感谢AC1982的努力和付出!

点赞 取消点赞 收藏 取消收藏

<< 上一篇: 通知

>> 下一篇: 起步