【Laravel-海贼王系列】第十九章, 精通 Horizon 之后可以为所欲为(上)

1,543 阅读3分钟

关于 Horizon

HorizonLaravel 作者对框架 Queue 的一个补充包,提供了友好的 UI 界面和 Supervisor 进程管理


启动

`php artisan horizon`

完整结构

Horizon 在 Redis 中的完整结构 , 后面解释字段含义。

源码解析

开始进入源码部分了,请坐稳扶好, 这是我们启动 Horizon 的时候触发的代码,标注了 6 处待分析的地方

public function handle(MasterSupervisorRepository $masters)
{
    if ($masters->find(MasterSupervisor::name())) {
        return $this->comment('A master supervisor is already running on this machine.');
    }  1️⃣ 

    $master = (new MasterSupervisor)->handleOutputUsing(function ($type, $line) {
        $this->output->write($line);
    }); 2️⃣

    ProvisioningPlan::get(MasterSupervisor::name())->deploy(
        $this->option('environment') ?? config('horizon.env') ?? config('app.env')
    ); 3️⃣

    $this->info('Horizon started successfully.');

    pcntl_async_signals(true); 4️⃣

    pcntl_signal(SIGINT, function () use ($master) {
        $this->line('Shutting down...');

        return $master->terminate();
    }); 5️⃣

    $master->monitor(); 6️⃣
}

6 块代码分析

  • 1️⃣ 这里就是生成一个 gethostname() + Str::random(4)Master 进程名称,如果不幸产生了同名的进程,那么直接返回一个错误提醒~

  • 2️⃣初始化 MasterSupervisor 对象

    $this->name 在第一步就已经生成了静态的名字了

    $this->supervisors 可以看出给了一个集合,一看就是要支持多个进程的样子

    $this->output 第二步就是让 output 属性能拥有输出到缓冲区的能力

    最后一步是刷新,刷新高清大图中的 horizon:master:gethostname()+Str::random(4) 这个进程名称

    关于这个 flush 方法干了啥大家一定很好奇 $this->connection()->del('commands:'.$name);

    这就是它做的事情,去 Redis 删除这个 key。


  • 3️⃣过了四级的我翻译了一下:

    创建一个资源分配计划!

    这个就是 Horizon 的配置文件,里面的 processes 进程数,balance 进程分配策略等等各项参数

    就是通过这个 ProvisioningPlan 类解析的内容。

    🏁这里我们解析一下它对应的方法

    ProvisioningPlan::get(MasterSupervisor::name())
    这时候相当于得到了一个具有指定配置文件和进程 `Name` 的对象。
    

    接着

    (new ProvisioningPlan)->deploy(
        $this->option('environment') ?? config('horizon.env') ?? config('app.env')
    ); 
    

    部署指定环境的配置,这段代码目的是将所有后面要执行的指令先 rpushRedis 中。

    实际就是执行下图方法,将参数存到 Redis

    上图中的代码不会立即创建这些数据,而是暂存了相关创建指令到 Redis

    存到Redis的格式是,变量代表本机名称: 'commands:master:gethostname()+Str::random(4)': json_encode([AddSupervisor::class,$options])

    $options 如图

    这样理解,就是把操作和配置存到 Redis,后面要执行的时候就可以查出所有相关的数据生成对应的进程!

    🏁动手确认

    执行 php artisan horizon 去看看 Redis 中的变化

    要不是这个 dd() 这个数据立马就被消费了!

    我们此时可以大胆猜想下图中就是后续要创建的进程!

    这时候大家肯定有疑问了,接下来 Horizon 将会在什么时候消费掉 Redis 中的数据,然后生成对应的 进程呢。这个就要继续往后看了!


  • 4️⃣这里没什么好讲的,在php7.1之后,有了新的信号处理函数:pcntl_async_signals,返回或设置是否异步信号处理。


  • 5️⃣这里注册一个 terminate的信号,功能是关闭进程。比如我们在 Mac OsCommand + C的时候就是发送这个信号。我们先大概看一下,是如何关闭所有进程的,具体实现放到后面的篇幅讲。


  • 6️⃣ 接下来就是核心部分了

    • $this->ensureNoOtherMasterSupervisors(),确保是否存在相同名字的进程,如果有,就抛出异常。

    • $this->listenForSignals(),注册将要处理的信号

    • $this->persist()master supervisors 持久化到 Redis,后面图解解析后的数据

    • 接着执行 $this->loop(), 启动进程。

    • $this->processPendingCommands(); 这行就是从 Redis 取出刚刚保存的数据

    • 开始启动进程 $this->monitorSupervisors()

    上面的 supervisors 里面信息量很大,接下来就是循环 supervisors

    执行 Symfony\Component\Process()->start() 启动 supervisors 所有相关的子进程。

    然后执行里面的 commandline 中的命令,执行部分如下

    • 启动完成后,打印一下本机的进程,可以看到和刚才执行的命令相同


总结

通过各种方式将要启动的进程相关数据保存到 MasterSupervisor 对象中,在最后根据对象中的数据创建进程。

进程的终止,暂停,重启通过 pcntl 拓展实现监听信号实现,进程创建通过 Symfony\Component\Processstart()方法

进行实现。