Swoole-Demo(3) Swoole数据库连接池

3,275 阅读4分钟
原文链接: zhuanlan.zhihu.com

摘自百度百科:

数据库连接池负责分配、管理和释放数据库连接,它允许应用程序重复使用一个现有的数据库连接,而不是再重新建立一个;释放空闲时间超过最大空闲时间的数据库连接来避免因为没有释放数据库连接而引起的数据库连接遗漏。这项技术能明显提高对数据库操作的性能。

数据库连接是一种关键的、有限的、昂贵的资源,这一点在多用户的网页应用程序中体现得尤为突出。对数据库连接的管理能显著影响到整个应用程序的伸缩性和健壮性,影响到程序的性能指标。数据库连接池正是针对这个问题提出来的。

数据库连接池的最小连接数和最大连接数的设置要考虑到下列几个因素:

1. 最小连接数

是连接池一直保持的数据库连接,所以如果应用程序对数据库连接的使用量不大,将会有大量的数据库连接资源被浪费。

2. 最大连接数

是连接池能申请的最大连接数,如果数据库连接请求超过此数,后面的数据库连接请求将被加入到等待队列中,这会影响之后的数据库操作。

3. 最小连接数与最大连接数差距

最小连接数与最大连接数相差太大,那么最先的连接请求将会获利,之后超过最小连接数量的连接请求等价于建立一个新的数据库连接。不过,这些大于最小连接数的数据库连接在使用完不会马上被释放,它将被放到连接池中等待重复使用或是空闲超时后被释放。

在PHP-FPM中,数据库是没有连接池的,请求开始,在需要查询数据的地方开始建立数据库连接,之后查询数据,请求完成连接关闭。下一次请求继续重复这样的操作,弊端在于需要在每次请求中初始化数据库连接操作。

在Swoole中,由于对象式持久化的,那么就可以在服务器启动初期事先建立好一定数量的数据库连接放在那,应用程序需要连接的时候,在去获取,同时应用每隔一段时间把数据库连接维持在min-max之间,并且向数据库发送一段简单查询以维持数据库连接不断,这样就省去了建立连接的过程。

Swoole已经提供了Mysql异步客户端。下面将用Mysql异步客户端来实现连接池。

<?php

/**
 * author : rookiejin <mrjnamei@gmail.com>
 * createTime : 2018/1/16 14:32
 * description: Server.php - swoole-demo
 */
class Pool
{
    // 连接池数组 .
    protected $connections ;

    // 最大连接数
    protected $max ;

    // 最小连接数
    protected $min ;

    // 已连接数
    protected $count = 0 ;

    protected $inited = false ;

    // 单例
    private static $instance ;

    //数据库配置
    protected $config  = array(
        'host' => '127.0.0.1',
        'port' => 3306,
        'user' => 'root',
        'password' => 'rootpassword',
        'database' => 'swoole',
        'charset' => 'utf8',
        'timeout' => 2,
    );

    public function __construct()
    {
        //初始化连接是一个Spl队列
        $this->connections = new SplQueue() ;
        $this->max = 30 ;
        $this->min = 5 ;
        // 绑定单例
        self::$instance = & $this ;
    }

    // worker启动的时候 建立 min 个连接
    public function init()
    {
        if($this->inited){
            return ;
        }
        for($i = 0; $i < $this->min ; $i ++){
            $this->generate();
        }
        return $this ;
    }

    /**
     * 维持当前的连接数不断线,并且剔除断线的链接 .
     */
    public function keepAlive()
    {
        // 2分钟检测一次连接
        swoole_timer_tick( 1000 , function(){
            // 维持连接
            while ($this->connections->count() >0 && $next=$this->connections->shift()){
                $next->query("select 1" , function($db ,$res){
                    if($res == false){
                        return ;
                    }
                    echo "当前连接数:" . $this->connections->count() . PHP_EOL ;
                    $this->connections->push($db);
                });
            }
        });

        swoole_timer_tick(1000 , function(){
            // 维持活跃的链接数在 min-max之间
            if($this->connections->count() > $this->max) {
                while($this->max < $this->connections->count()){
                    $next = $this->connections->shift();
                    $next->close();
                    $this->count-- ;
                    echo "关闭连接...\n" ;
                }
            }
        });
    }

    // 建立一个新的连接
    public function generate($callback = null)
    {
        $db = new swoole_mysql ;
        $db->connect($this->config , function($db , $res) use($callback) {
            if($res == false){
                throw new Exception("数据库连接错误::" . $db->connect_errno . $db->connect_error);
            }
            $this->count ++ ;
            $this->addConnections($db);
            if(is_callable($callback)){
                call_user_func($callback);
            }
        });
    }

    // 连接推进队列
    public function addConnections($db)
    {
        $this->connections->push($db);
        return $this;
    }

    // 执行数据库命令 . 会判断连接数够不够,够就直接执行,不够就新建连接执行
    public function query($query , $callback)
    {
        if($this->connections->count() == 0) {
            $this->generate(function() use($query,$callback){
                $this->exec($query,$callback);
            });
        }
        else{
           $this->exec($query,$callback);
        }
    }
    // 直接执行数据库命令并且 callback();
    private function exec($query, $callback)
    {
        $db = $this->connections->shift();
        $db->query($query ,function($db , $result) use($callback){
            $this->connections->push($db);
            $callback($result);
        });
    }

    public static function getInstance()
    {
        if(is_null(self::$instance)){
            new Pool();
        }
        return self::$instance;
    }
}

$server = new swoole_http_server("0.0.0.0",9501);
$server->set([
    'worker_num' => 4 ,
]);
$server->on("WorkerStart",function($server , $wid){
    Pool::getInstance()->init()->keepAlive();
});
$server->on("request",function($request,$response){
    $pool = Pool::getInstance()->query("select * from users", function($res) use($response) {
        $response->end(json_encode($res));
    });
});
$server->start();

代码解读:

  • workerStart的时候先实例化单例Pool . Pool初始化配置,并且绑定单例到本身
  • 执行init()函数,该函数主要是初始化连接池的数量为Pool::min的数量
  • 执行keepAlive() , 该函数主要是用2个定时器,定时检查数据库连接的健康状态,以及连接数量的大小。具体多少时间可以根据业务来定。
  • 监听http请求,在需要使用数据库的地方执行sql就行了。可以将sql封装成orm来操作。
  • 最后返回response.

连接池虽然已经实现了,但是在开发的过程中,我们却被无限个 callback 函数嵌套苦恼着,

下一节将带来如何用同步的写法来实现异步操作。

如有错误,敬请纠正!

本代码已托管到github: clearcodecn/swoole-demo

另外:clearcode.cn是本人正在建设中的一个社区,旨在讲解服务器端技术与原理,提倡开源精神,授人以鱼不如授人以渔,由于时间不足,导致进展缓慢,希望有兴趣的朋友一起加入.

QQ群: 139348611

转载请申明来源!