【laravel】消息队列在laravel中的使用

4,828 阅读3分钟

作者:不洗碗工作室 - molym
出处:helloyz.cn/article/34
版权归作者所有,转载请注明出处

前言

消息队列这层中间件在分布式系统中是重要的组件,主要能解决异步消息,流量削峰等问题,在laravel中有一套框架提供的消息队列,Laravel 的队列服务为不同的队列后端系统,比如 Beanstalk,Amazon SQS,Redis,甚至是关系型数据库,提供了一套统一的 API 。队列允许你将一个耗时的任务进行延迟处理,例如像 e-mail 发送。这能让应用程序对页面的请求有更快的响应。

驱动的设置

我们使用database 这个队列驱动的话,需要一张表来存储任务的命令,在laravel中,提供了queue:table这个artisan命令来创建job表的迁移,非常方便

php artisan queue:table
php artisan migrate

配置的修改

任务默认是用同步的方式执行的,要使用队列,分为两步

  • 首先需要在 .env 文件中修改QUEUE_DRIVER=database
  • 我们在config\queue.php中可以看到
 /*
    |--------------------------------------------------------------------------
    | Default Queue Driver
    |--------------------------------------------------------------------------
    |
    | Laravel's queue API supports an assortment of back-ends via a single
    | API, giving you convenient access to each back-end using the same
    | syntax for each one. Here you may set the default queue driver.
    |
    | Supported: "sync", "database", "beanstalkd", "sqs", "redis", "null"
    |
    */

    'default' => env('QUEUE_DRIVER', 'sync'),

laravel为我们提供了多样的队列驱动,并在上层封装好了api,调用的方法都是一样的,对开发者是透明的,只要在这里设置使用哪个驱动就好了,我们把这里的sync修改为database

创建任务

laravel将队列的任务类都默认放在了app/jobs目录下,使用如下命令会生成一个新的队列任务(目录也会被创建如果它不存在的话)

php artisan make:job SendJugdeRequest

生成的类实现了Illuminate\Contracts\Queue\ShouldQueue 这样任务将会进入队列,不再跟以往一样同步执行

任务类的结构

包含了一个让队列用来调用此任务的handle方法,这里可以是调用一个比较耗时的service,比如我这里举的例子是调用我们的判题机器,得到判题数据

<?php

namespace App\Jobs;

use App\Jobs\Job;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Bus\SelfHandling;
use Illuminate\Contracts\Queue\ShouldQueue;
use Requests;

class SendJudgeRequest implements ShouldQueue

{
    use InteractsWithQueue, SerializesModels;




    /**
     * Create a new job instance.
     */
    public function __construct()
    {

    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle(RoleRepository $repository)
    {


        $data = array
            ("src" => "# include<stdio.h> \n int main()\n  {\n printf(\"1\");\n return 0;\n}",
            "language" => "c",
            "max_cpu_time" => 1000,
            "max_memory" => 395671011,
            "test_case_id" => 1001);

        $res = Requests::post("http://domain.com", array("token" => "domain","Content-Type"=>"application/json"), json_encode($data));
        $repository->insert(['result'=>$res->body]);

        return;
    }
}

分发任务

写好任务类,通过dispatch就可以分发它了,传递这个函数的参数是这个任务类的实例,是不是很方便

<?php

namespace App\Http\Controllers;

use App\Jobs\SendJudgeRequest;


class TestController extends Controller
{
    function test(){

       $this->dispatch(new SendJudgeRequest());

    }
}

延时分发

为了避开高峰期,可以延迟执行队列中的任务,可以使用任务实例的delay方法。这个方法是Illuminate\Bus\Queueable
trait提供的,这个在自动生成的任务类中都有默认添加

$job = (new JobInstance()->delay(Carbon::now()->addMinutes(5);
dispath($job)

比如延迟个5分钟

还有一些自定义的队列,可以查看手册,这里就不赘述了

运行队列处理器

php artisan queue:work/listen

你必须让它一直在后台运行,不然队列任务不会被执行,而且做了改动后必须通过重启处理器,才能把修改应用到队列上来

任务过期 & 超时

  • 任务过期
    config/queue.php配置文件中,每个队列都定义了retry_after选项,指定了任务最长处理时间,超时的任务会被释放回队列
  • 队列处理器超时
    --timeout选项,这个选项指定了laravel队列处理器最多执行时间后就应该被关闭掉,这个选项可以帮助你移除僵死进程

上述的两个选项--timeout应该比retry_after短几秒钟,这样保证任务进程在重试前就被杀死了,否则任务会被执行两次

请求

队列在第一次启动的时候,可能需要几秒钟才有返回,之后都能在几十毫秒就能直接返回结果,这样一来,用户的体验是很棒的。