Swoole-Demo(4) Yield协程

877 阅读4分钟
原文链接: zhuanlan.zhihu.com

上一章,我们讲到如何使用swoole实现数据库连接池,可是为了要实现控制器同一个方法内多次查询必须嵌套各种callback函数,人都要疯了。

public function getUserinfo(){
    $sql = "select * from users where id = 1";
    Pool::getInstance()->query($sql ,function($user){
        $sql = "select * from address where user_id = {$user['user_id']}"; 
        Pool::getInstance()->query($sql , function($result) use ($user){
            $this->context->response->end(json_encode(["user" => $user, "address" => $result]));
        });
    });
}

试想,如何不用callback的方式来实现如下的结果呢?

public function getUserinfo(){
   [1] $user = Pool::getInstance()->query("select * from users where id = 1");
   [2] $address = Pool::getInstance()->query("select * from address where user_id = {$user['user_id']}");
   [3] $this->context->response->end(json_encode(["user" => $user, "address" => $address]));
}

嗯,对,看来这才是我们真正需要的同步代码,而不是烦死人的异步回调,那么有一个问题在这,如何实现?

首先,来看看我们期望的代码的执行过程,

完了,我swoole_mysql->query是要有第二个callback参数的,你这个封装的把我的callback都封装没了?so 这种方案一看就不合理。

要是能让程序在这里给我$user一个值就好了,我下面的方法就能继续写了。忽然想到了一段代码:

function logger(){
    while(true) {
        $log = yield . "\n" ;
        echo $log ;
    }
}
$l = logger(); 
$l->send("第一段日志");
$l->send("第二段日志");

运行一下,会发现在屏幕上输出

第一段日志
第二段日志

先暂且不讨论这段代码的运行原理,我惊叹的地方是 logger 函数明明没有参数,我居然能变相的向他传递值进去,并且把这个值赋给 $log变量。

看来代码块[1] 处的实现有戏了。不就是说让我们输送一个值进去给 $user 然后让程序继续运行吗、代码[2] 处也是一样实现,也就是说我们只要想办法把执行sql的结果返回过去给$user就行了。

于是代码会变成这个样子

public function getUserinfo(){
    $user = yield Pool::getInstance()->query("select * from users where id = 1");
    $address = yield Pool::getInstance()->query("select * from address where user_id = {$user['user_id']}");
    $this->context->response->end(json_encode(["user" => $user, "address" => $address]));
}

对比一下上面的 logger 函数,发现我们少了一个可以给 yield 传送数据的东西,我暂时把这个东西叫做调度器。

回想一下swoole_http_server的代码

$server = new swoole_http_server("0.0.0.0", 9501);
$server->on("request",function($request,$response){
    // 获取控制器
    // 注入上下文->控制器
    // $返回结果 = 执行控制器方法 
});
$server->start();

那么现在 返回结果是 yield生成的迭代器Generator。但是迭代器只执行了第一个yield,并没有继续向下处理,怎么办呢?于是我们要自己调度该请求过程中迭代器对象中所有的步骤一步一步完成.

class Controller {
    protected $context;
    public function getUserinfo(){
        $user = yield Pool::getInstance()->query("select * from users where id = 1");
        $address = yield Pool::getInstance()->query("select * from address where user_id = {$user['user_id']}");
        $this->context->response->end(json_encode(["user" => $user, "address" => $address]));
    }
    public function setContext(Context $context){$this->context = $context;}
}
class Context{
    public $request;  // $request
    public $response; // $response
    public function __construct($request,$response){
        $this->request = $request;
        $this->response = $response;
    }
}

$server = new swoole_http_server("0.0.0.0", 9501);
$server->on("request",function($request,$response){
    $context = new Context($request,$response);
    $controller = new Controller();
    $controller->setContext($context);
    $result = $controller->getUserinfo();
    if($result instanceof Generator) {
        // yield 迭代器需要自己调度.  
    }
});
$server->start();

我的思路:

  • 定义一个调度器类,全局唯一
  • 定义一个任务类,一次请求如果返回的是Generator,就用任务类封装成任务,push进调度器的任务队列
  • 调度器每隔一段时间运行一次,遍历该任务队列,并且执行任务,而任务就针对控制器返回的Generator进行send数据

好吧,看样子可以解决了

// 调度器
class Scheduler{
    protected $taskList = [];
    protected $tickerTime = 1 * 1000;
    public function newTask(Task $task)
    {
        $this->taskList [] = $task ;
    }

    public function ticker()
    {
        swoole_timer_tick($this->tickerTime, function($timerId){
            foreach ($this->taskList as $key => $task)
            {
                if($task->isFinished()){
                    unset($this->taskList[$key]);
                }
                $task->run();
            }
        });
    }
}

// 任务类 
class Task{
    public $generator ;
    public function __construct(Generator $generator)
    {
        $this->generator = $generator;
    }

    public function run()
    {
       // 具体的 与控制器通信逻辑. 
    }
   
    public function isFinished(){
       // todo 
    }
}

// onrequest 
$scheduler = new Scheduler();
$server->on("request",function($request,$response) use($scheduler) {
    $context = new Context($request,$response);
    $controller = new Controller();
    $controller->setContext($context);
    $result = $controller->getUserinfo();
    if($result instanceof Generator) {
         $task = new Task($result);
         $scheduler->newTask($task);
    }
});
$scheduler->ticker();
$server->start();

接下来就是看 Task::run() 具体需要干什么了。我们来分析一下:

   public function getUserinfo(){
        $user = [1] yield Pool::getInstance()->query("select * from users where id = 1");
        $address = [2] yield Pool::getInstance()->query("select * from address where user_id = {$user['user_id']}");
        [3] $this->context->response->end(json_encode(["user" => $user, "address" => $address]));
    }
  • 第一遍运行的时候 Task::generator 的值 是 [1] 处的 yield query()方法的返回值.
  • 我们需要根据 [1]处的值来判断是否需要进行 send 操作,将query() 的结果发送到 $user . 由于数据库执行是有时间延迟的,并不能保证第一次运行task的时候query()函数就有值了,因此调度器的执行时间间隔得把握的稍微小一点,当 task 命中query()有值的时候,立即将该值返回,并交给$user. 然后向下执行,
  • 在 2 处又返回了一个Generator对象,于是又重复进行调度,等到第二个query()有值的时候,把这个值发送给$address. 然后继续向下执行,
  • 在 3 处已经没有yield了,直接一次运行通过了。将response发送到客户端。
Schedule 任务调度

根据推断,我们的query()函数到底需要返回什么才是程序需要的?

  • query() 里面应该是拿不到 sql 的执行结果的,因为是异步的。

假想一下:给Pool 对象两个属性:

  • $results [] 结果集
  • $resultKey 结果集对应的key,

我们在异步查询之前生成好key. 然后异步查询完了之后,把key所对应的result放进$results[]数组里面,任务队列拿到这个key. 再轮询从$results[]里面判断key存在不存在。ok:

class Pool{
    protected $results = [] ;

    protected $resultKey = 0 ;

    public function query($query)
    {
        $db = $this->connections->shift();
        $key = $this->resultKey ++ ;
        $db->query($query ,function($db , $result) use($key){
            $this->connections->push($db);
            $this->results [$key] = $result ;
        });
        return $key ;
    }

    public function getResult($key){
          if(isset($this->results [$key]))
          {
               return $this->results [$key];
          }
          return null ;
    }

}
class Task{
    public $generator ;
    protected $finished = false;
    public function __construct(Generator $generator)
    {
        $this->generator = $generator;
    }

    public function run()
    {
        $key = $this->generator->current();
        $result = Pool::getInstance()->getResult($key);
        if(!is_null($result)) {
             $this->generator->send($result);
             // 为了让任务结束,我们判断一下迭代器里面是否还有值
             if(!$this->generator()->valid()) {
                  $this->finished = true;
             }
        }
    }

    public function isFinished(){
        return $this->finished ;
    }
}

大功告成.

~$ curl http://localhost:9501
[[{"id":"1","username":"1111","date":null}],[{"id":"2","username":"sadasd","date":null}],[{"id":"2","username":"sadasd","date":null}]]

由于篇幅有限,实现方式略显简陋,这只是一种实现思想,Task里面并不需要绑定Pool相关的东西进去,相反可以注入接口,以接口的形式来调用。

完整代码 传送门

推荐阅读:

在PHP中使用协程实现多任务调度 | 风雪之隅www.laruence.com

如有错误,敬请纠正!

本代码已托管到github: github.com/clearcodecn

另外:clearcode.cn是本人正在建设中的一个社区,旨在讲解服务器端技术与原理,提倡开源精神,授人以鱼不如授人以渔,由于时间不足,导致进展缓慢,希望有兴趣的朋友一起加入.

QQ群: 139348611

转载请申明来源!

路人张:Swoole-Demo(3) Swoole数据库连接池zhuanlan.zhihu.com图标 路人张:Swoole-Demo(2) Swoole对象池原理 zhuanlan.zhihu.com 图标 路人张:Swoole-Demo(1) TCP服务端简单实现zhuanlan.zhihu.com图标