【Laravel-海贼王系列】第十章,Job&队列存储端实现

1,684 阅读3分钟

任务的存储端

Job 的定义

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;

class TestJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function __construct()
    {
        echo '开始构造Job';
    }
    
    public function handle()
    {
        echo '开始处理Job';
    }
}

新建的 TestJob 类,这个类实现了序列化模型,队列功能等等都是通过trait类来补充的。 这些特性我们通过使用来分解。

运行一个任务

dispatch(new TestJob());

这里就是执行一个 TestJob 的任务,接下去看看 dispatch() 这个方法

function dispatch($job)
    {
        if ($job instanceof Closure) {
            $job = new CallQueuedClosure(new SerializableClosure($job));
        }

        return new PendingDispatch($job);
    }

这里会返回一个 Illuminate\Foundation\Bus\PendingDispatch 对象

TestJob 这个对象里面通过 use Queueable 引入的几个成员属性。 目前为止我们看到只不过是实例化了一个对象,同时将 TestJob 传给 PendingDispatch

我们来解读 PendingDispatch 这个类

<?php

namespace Illuminate\Foundation\Bus;

use Illuminate\Contracts\Bus\Dispatcher;

class PendingDispatch
{
    protected $job;
   
    public function __construct($job)
    {
        // "接收传入的 job 对象"
        $this->job = $job; 
    }

    public function onConnection($connection)
    {
        // "设置任务指定连接"
        $this->job->onConnection($connection); 

        return $this;
    }

    public function onQueue($queue)
    {
        // "设置任务队列名"
        $this->job->onQueue($queue);

        return $this;
    }

    public function allOnConnection($connection)
    {
        // "设置工作链所有需要的连接"
        $this->job->allOnConnection($connection);

        return $this;
    }

    public function allOnQueue($queue)
    {
        // "设置工作链的队列"
        $this->job->allOnQueue($queue);

        return $this;
    }

    public function delay($delay)
    {
        // "设置延迟时间"
        $this->job->delay($delay);

        return $this;
    }
  
    public function chain($chain)
    {
        // "设置工作链任务"
        $this->job->chain($chain);

        return $this;
    }

    public function __destruct()
    {
        // "通过析构函数来转发job"
        app(Dispatcher::class)->dispatch($this->job);
    }
}

分解完这个类,其实大部分都是设置参数的过程,也是通过这些参数来控制任务的执行状态,比如延迟,工作链模式运行等等。

重点在析构函数,当运行完 return new PendingDispatch($job); 之后对象如果没有被任何变量接收,那么对象的内存空间会被回收,从而触发析构函数执行,也是触发 job 继续执行的方式!

public function __destruct()
{
    // "通过析构函数来转发job"
    app(Dispatcher::class)->dispatch($this->job);
}

获取任务对应的解析器

app(Dispatcher::class) 传入的参数是 Illuminate\Bus\Dispatcher , 这个契约对应的绑定类是通过配置文件 app.providers.Illuminate\Bus\BusServiceProvider::class 来加载的 关于 provider 的启动在第九章中有讲,我们直接看启动方法

public function register()
{
    $this->app->singleton(Dispatcher::class, function ($app) {
        return new Dispatcher($app, function ($connection = null) use ($app) {
            return $app[QueueFactoryContract::class]->connection($connection);
        });
    });

    $this->app->alias(
        Dispatcher::class, DispatcherContract::class
    );

    $this->app->alias(
        Dispatcher::class, QueueingDispatcherContract::class
    );
}

app(Dispatcher::class) 的实质就是这个闭包的返回

function ($app) {
    return new Dispatcher($app, function ($connection = null) use ($app) {
        return $app[QueueFactoryContract::class]->connection($connection);
    });
}

看看 Dispatcher 构造函数

public function __construct(Container $container, Closure $queueResolver = null)
{
    $this->container = $container;
    $this->queueResolver = $queueResolver;
    $this->pipeline = new Pipeline($container);
}

接受两个参数,第一个是容器,第二个就是闭包所以 $this->queueResolver 就是

function ($connection = null) use ($app) {
                return $app[QueueFactoryContract::class]->connection($connection);
         }

我管这个 $this->queueResolver 叫解析器,作用是接收一个 $connection 然后从容器中解析出队列的驱动并进行连接。

QueueFactoryContract::class 是通过 provider 加载的

位于 app.providers.Illuminate\Queue\QueueServiceProvider::class,

返回的对象是 Illuminate\Queue\QueueManager 由于 'default' => env('QUEUE_CONNECTION', 'sync'),

中配置的 redis 所以最后返回的对象是 Illuminate\Queue\RedisQueue

分发任务到队列

public function dispatch($command)
{
    // "$this->queueResolver 这个队列解析器是在构造的时候注入的"
    if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
        return $this->dispatchToQueue($command);
    } 

    return $this->dispatchNow($command);
}

上面的方法明确了任务是该通过队列还是同步执行。

这里我们看,传入的 $command 就是开始的 TestJob 对象。

还记得 Laravel 文档说的如果要通过队列实现需要实现一个指定的接口吗

implements ShouldQueue,这段代码就是解释了原因。

protected function commandShouldBeQueued($command)
{
    return $command instanceof ShouldQueue;
}

继续下去,通过上面的判断之后我们进入 dispatchToQueue($command) 这里

public function dispatchToQueue($command)
{
    $connection = $command->connection ?? null;

    $queue = call_user_func($this->queueResolver, $connection);

    if (! $queue instanceof Queue) {
        throw new RuntimeException('Queue resolver did not return a Queue implementation.');
    }

    if (method_exists($command, 'queue')) {
        return $command->queue($queue, $command);
    }

    return $this->pushCommandToQueue($queue, $command);
}

上面解析过了 $queue 就是 Illuminate\Queue\RedisQueue 这个对象

// "返回 false"
if (method_exists($command, 'queue')) {
        return $command->queue($queue, $command);
    }

所有最后执行了 return $this->pushCommandToQueue($queue, $command);

protected function pushCommandToQueue($queue, $command)
{
    // "如果存在指定的队列和延迟,则推入指定队列+延迟"
    if (isset($command->queue, $command->delay)) {
        return $queue->laterOn($command->queue, $command->delay, $command);
    } 

    // "如果存在指定的队列则push到指定的队列"
    if (isset($command->queue)) {
        return $queue->pushOn($command->queue, $command);
    }

    // "只存在延迟设置,推入延迟"
    if (isset($command->delay)) {
        return $queue->later($command->delay, $command);
    }

    // "默认"
    return $queue->push($command);
}

构造数据

上面已经到了最终的调用,那么接下来的事情就是构造一个什么样格式的数据存入redis

追踪 $queue->push($command)

// "这里的 $job 就是最开始传入的 TestJob 对象!"

public function push($job, $data = '', $queue = null)
{
    return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
}

构造 payload

protected function createPayload($job, $queue, $data = '')
{
    $payload = json_encode($this->createPayloadArray($job, $queue, $data));

    if (JSON_ERROR_NONE !== json_last_error()) {
        throw new InvalidPayloadException(
            'Unable to JSON encode payload. Error code: '.json_last_error()
        );
    }

    return $payload;
}

这里的 createPayloadArray() 先调用Illuminate\Queue\RedisQueue对象的

protected function createPayloadArray($job, $queue, $data = '')
{
    return array_merge(parent::createPayloadArray($job, $queue, $data), [
        'id' => $this->getRandomId(),
        'attempts' => 0,
    ]);
}

追踪父类Illuminate\Queue\Queue 方法

protected function createPayloadArray($job, $queue, $data = '')
{
    return is_object($job)
                ? $this->createObjectPayload($job, $queue)
                : $this->createStringPayload($job, $queue, $data);
}    
    
// "$job 是对象的时候格式化方式"
protected function createObjectPayload($job, $queue)
{
    $payload = $this->withCreatePayloadHooks($queue, [
        'displayName' => $this->getDisplayName($job),
        'job' => 'Illuminate\Queue\CallQueuedHandler@call',
        'maxTries' => $job->tries ?? null, // "这是任务设置的重试次数"
        'timeout' => $job->timeout ?? null, // "这是超时时间"
        'timeoutAt' => $this->getJobExpiration($job), // "获取处理过期时间"
        'data' => [
            'commandName' => $job,
            'command' => $job,
        ],
    ]);

    return array_merge($payload, [
        'data' => [
            'commandName' => get_class($job),
            'command' => serialize(clone $job), 
                        // "序列化,这里的序列化会调用
                           SerializesModels 特质类的__sleep()方法
                           在开头的时候所有的 Job 类都有use"
        ],
    ]);
}
    
// "$job 是字符串的时候格式化方式"
protected function createStringPayload($job, $queue, $data)
{
    return $this->withCreatePayloadHooks($queue, [
        'displayName' => is_string($job) ? explode('@', $job)[0] : null,
        'job' => $job,
        'maxTries' => null,
        'timeout' => null,
        'data' => $data,
    ]);
}    

将获取的最后 json 字符串 rpushredis 中。

public function pushRaw($payload, $queue = null, array $options = [])
{
    $this->getConnection()->rpush($this->getQueue($queue), $payload); 
    return json_decode($payload, true)['id'] ?? null; 
}    

至于延迟任务return $queue->later($command->delay, $command);, 逻辑基本上一样,只不过最后存入的队列是名不一样

小结

到这里位置关于任务和队列的应用写入端口已经完成,最终是把指定的格式的数据存入配置的存储驱动中的过程。