PHP 多进程处理任务

1,684 阅读1分钟

pcntl 模块(非 Unix 类系统不支持此模块)

一个 PHP 多进程简单例子大概是这个样子:

// 5 个子进程处理任务
for ($i = 0; $i < 5; $i++) {
    $pid = pcntl_fork();
    if ($pid == -1) {
        die("could not fork");
    } elseif ($pid) {
        echo "I'm the Parent $i\n";
    } else { // 子进程处理
        echo "I'm the Child $i\n";
        // 业务处理
        exit($i); // 一定要注意退出子进程,否则 pcntl_fork() 会被子进程再 fork,带来处理上的影响。
    }
}

// 等待子进程执行结束
while (pcntl_waitpid(0, $status) != -1) {
    $status = pcntl_wexitstatus($status);
    echo "Child $status completed\n";
}

当然实际应用中我们不能够这样输出代码,不够健壮,也不够优雅,我所以找了个基于 pcntl 封装的扩展包来使用。

spatie/async - 基于 pcntl 封装的扩展包

以下是我使用 spatie/async 来优化一个多进程请求的例子

原代码(耗时 20s 左右)- github.com/guanguans/m…

原代码
/**
 * @param string $keyword
 *
 * @return array
 */
public function searchAll(string $keyword): array
{
    $songAll = [];

    foreach ($this->platforms as $platform) {
        $songAll = array_merge($songAll, $this->search($platform, $keyword));
    }

    return $songAll;
}

/**
 * @param string $platform
 * @param string $keyword
 *
 * @return mixed
 */
public function search(string $platform, string $keyword)
{
    $meting = $this->getMeting($platform);

    $songs = json_decode($meting->format()->search($keyword), true);

    foreach ($songs as $key => &$song) {
        $detail = json_decode($meting->format()->url($song['url_id']), true);
        if (empty($detail['url'])) {
            unset($songs[$key]);
        }
        $song = array_merge($song, $detail);
    }
    unset($song);

    return $songs;
}

改进后(耗时 4s 左右)- github.com/guanguans/m…

改进后
/**
 * @param string $keyword
 *
 * @return array
 */
public function searchAll(string $keyword): array
{
    $songAll = [];
    $pool = Pool::create();
    foreach ($this->platforms as $platform) {
        $pool->add(function () use ($platform, $keyword) {
            return $this->search($platform, $keyword);
        }, $this->getSerializedOutput())->then(function ($output) use (&$songAll) {
            $songAll = array_merge($songAll, $output);
        })->catch(function (\Throwable $exception) {
            exit($exception->getMessage());
        });
    }
    $pool->wait();

    return $songAll;
}

/**
 * @return mixed
 */
public function search(string $platform, string $keyword)
{
    $meting = $this->getMeting($platform);
    $songs = json_decode($meting->format()->search($keyword), true);

    $pool = Pool::create();
    foreach ($songs as $key => &$song) {
        $pool->add(function () use ($meting, $song) {
            return json_decode($meting->format()->url($song['url_id']), true);
        })->then(function ($output) use (&$songs, &$song, $key) {
            $song = array_merge($song, $output);
            if (empty($song['url'])) {
                unset($songs[$key]);
            }
        })->catch(function (\Throwable $exception) {
            exit($exception->getMessage());
        });
    }
    unset($song);
    $pool->wait();

    return $songs;
}

相关链接

原文链接