PHP中实现MySQL连接池与持久化

MySQL的连接方式

MySQL的连接方式分为两种,一种是socket套接字,一种是基于TCP/IP协议的。如果通过IP访问数据库就是基于TCP/IP协议,如果是本机登录不通过IP访问就使用socket。

具体:

  • TCP/IP:mysql -h 127.0.0.1 -uroot -p
  • socket:mysql -h localhost -uroot -p 或者 mysql -uroot -p

可以通过tcpdump抓包看看,命令tcpdump -i lo port 3306 ,下图是用mysql -uroot -p连接,可以看到并没有抓取到什么内容,说明是不经过网卡;

image-20190320144200537

下图是用mysql -h 127.0.0.1 -uroot -p进行连接,可以看到MySQL的连接过程,是基于TCP/IP协议的。

image-20190320144357782

当退出MySQL,会接收发送4条记录,也就是TCP的4次挥手。

image-20190320144420131

socket的连接方式会快于TCP/IP连接,所以MySQL服务跟其它服务同一机器的话,用socket的连接方式速度会快一些。MySQL使用线程来处理连接,每当一个请求进来,MySQL会创建一个线程去处理请求,这点可以通过在请求前后用show status命令对比Thread_connected数。在高并发下,这将给MySQL服务器带来巨大的压力,消耗服务器资源。

MySQL 线程池

实际上,MySQL实现了线程池,当客户端断开连接后,MySQL将当前线程缓存起来,在接到新的连接请求时快速响应,无需创建新的线程。

查看线程池大小:

show variables like 'thread_cache_size';

设置线程池大小:

set global thread_cache_size = 20;

查看线程池状态:

show status like 'Threads_%';

可以看到以下值,通过这些值适当去调整线程池的大小。

+-------------------+-------+
| Variable_name     | Value |
+-------------------+-------+
| Threads_cached    | 5     |
| Threads_connected | 83    |
| Threads_created   | 11535 |
| Threads_running   | 1     |
+-------------------+-------+
4 rows in set (0.00 sec)
  • **Threads_cached:空闲线程数量。**当有新请求是,MySQL不会立即创建线程去处理,而是去Threads_cached查看空间的连接线程,如果存在则使用,不存在则创建新的线程。

  • Threads_connected:当前处于连接状态的线程个数。

  • Threads_created:创建过的线程数,如果发现Threads_created值过大的话,表明MySQL服务器一直在创建线程,这也是比较耗资源,可以适当增加配置文件中thread_cache_size值。

  • Threads_running:处于激活状态的线程的个数,这个一般都是远小于Threads_connected的。

线程池避免了频繁的创建连接和销毁连接,但有了线程池还是不够的,不能解决客户端频繁连接MySQL数据库的问题,会进行一些频繁的TCP握手挥手。

查看是否持久连接

开启 general_log

// 查看是否开启
mysql> show variables like "general_%";
+------------------+----------------------------------+
| Variable_name    | Value                            |
+------------------+----------------------------------+
| general_log      | OFF                              |
| general_log_file | /var/lib/mysql/VM_0_6_centos.log |
+------------------+----------------------------------+
2 rows in set (0.13 sec)

如果general_log为OFF,则开启:

SET global general_log = on;

这时可以监听general_log_file:

tailf /var/lib/mysql/VM_0_6_centos.log

搭建一个server,或者在cli模式执行PHP:

<?php

try {
    $dbh = new \PDO("mysql:dbname=database_name;host=127.0.0.1", 'username', 'password', [
        \PDO::ATTR_PERSISTENT => true,
    ]);

    $dbh->exec("select id from table_name ORDER BY created_at limit 1");
} catch (\PDOException $e) {
    echo $e->getMessage();
}

下图中的111163-111174就是线程ID,不一致说明没有持久连接。

image-20190321103815556

show processlist

除了开启general_log,还可以在MySQL中执行show processlist;查看连接。

MySQL长连接

在 PHP 脚本的生命周期里,每个请求都会新建一个MySQL数据库连接,在PHP脚本执行结束时,PHP会自动关闭所有连接,所有资源都会自动回收。在并发很高的情况下,MySQL的连接数很快就会被消耗完,这将给MySQL服务器带来巨大的压力。而长连接就是在PHP回收资源时不关闭连接,由某个进程继续管理这个连接,达到复用的效果,减少数据库连接的开销时间。长连接可以避免每次请求都创建连接的开销,节省了时间和IO消耗。

Apache

把 PHP 用作Apache的一个模块,对于一个多进程的服务器,其典型特征是有一个父进程和一组子进程协调运行,其中实际生成 web 页面的是子进程。每当客户端向父进程提出请求时,该请求会被传递给还没有被其它的客户端请求占用的子进程。这也就是说当相同的客户端第二次向服务端提出请求时,它将有可能被一个不同的子进程来处理。在开启了一个持久连接后,所有请求 SQL 服务的后继页面都能够重用这个已经建立的 SQL Server 连接。

Nginx

Nginx的 stream 模块实现了 TCP/UDP 服务的负载均衡,同时借助 stream-lua 模块,我们就可以实现可编程的 stream 服务,也就是用 Nginx 实现自定义的 TCP/UDP 服务!我们可以选择 OpenResty 库来完成MySQL的连接池功能,OpenResty是一个非常强大,而且功能完善的Nginx Lua框架,他封装了Socket、MySQL, Redis, Memcache 等操作。通常大部分 PHP 是搭配 Nginx 来使用的,而且 PHP 和 Nginx 多半是在同一台服务器上。有了这个客观条件,我们就可以利用 Nginx 来实现一个连接池,在 Nginx 上完成连接 MySQL 等服务的工作,然后 PHP 通过本地的 Unix Domain Socket 来连接 Nginx,如此一来既规避了短链接的种种弊端,也享受到了连接池带来的种种好处。

##MySQL连接池

还有一种复用MySQL连接的方式是使用连接池,原理是连接池使用一个共享资源的模式,如并发100个请求,实际上并不是每个请求的所有时间都在执行SQL查询。这样100个请求,共享20个MySQL连接就可以满足需求了。当一个请求操作完数据库后,这时就会释放数据库连接给其他的请求使用。

连接池仅在超大型应用中才有价值。普通的应用采用MySQL长连接方案,每个php-fpm创建一个MySQL连接,每台机器开启100个php-fpm进程。如果有10台机器,每台机器并发的请求为100。实际上只需要创建1000个MySQL连接就能满足需求,数据库的压力并不大。即使有100台机器,硬件配置好的存储服务器依然可以承受。

达到数百或者数千台应用服务器时,MySQL服务器就需要维持十万级的连接。这时数据库的压力就会非常大了。连接池技术就可以派上用场了,可以大大降低数据库连接数。

Swoole同步阻塞模式

基于swooleAsyncTask模块实现的连接池,编程简单,没有数据同步和锁的问题。甚至可以多个服务共享连接池。缺点是:

  1. 灵活性不如多线程连接池,无法动态增减连接
  2. 有一次进程间通信的开销。
<?php

$serv = new swoole_server("127.0.0.1", 9508);
$serv->set(array(
    'worker_num'      => 100,
    'task_worker_num' => 1, //MySQL连接的数量
));

function my_onReceive($serv, $fd, $from_id, $data)
{
    //taskwait就是投递一条任务,这里直接传递SQL语句了
    //然后阻塞等待SQL完成
    $result = $serv->taskwait("show tables");
    if ($result !== false) {
        list($status, $db_res) = explode(':', $result, 2);
        if ($status == 'OK') {
            //数据库操作成功了,执行业务逻辑代码,这里就自动释放掉MySQL连接的占用
            $serv->send($fd, var_export(unserialize($db_res), true) . "\n");
        } else {
            $serv->send($fd, $db_res);
        }
        return;
    } else {
        $serv->send($fd, "Error. Task timeout\n");
    }
}

function my_onTask($serv, $task_id, $from_id, $sql)
{
    static $link = null;
    if ($link == null) {
        $link = mysqli_connect('127.0.0.1', 'root', 'root', 'root');
        if ($link == null || mysqli_ping($link) == false) {
            $link = null;
            $serv->finish("ER:" . mysqli_error($link));
            return;
        }
    }
    $result = $link->query($sql);
    if (!$result) {
        $serv->finish("ER:" . mysqli_error($link));
        return;
    }
    $data = $result->fetch_all(MYSQLI_ASSOC);
    print_r($data);
    $serv->finish("OK:" . serialize($data));
}

function my_onFinish($serv, $data)
{
    echo "AsyncTask Finish:Connect.PID=" . posix_getpid() . PHP_EOL;
}

$serv->on('Receive', 'my_onReceive');
$serv->on('Task', 'my_onTask');
$serv->on('Finish', 'my_onFinish');
$serv->start();

协程模式

<?php

use Swoole\Coroutine\Channel;

abstract class AbstractPool
{
    private $min;//最少连接数
    private $max;//最大连接数
    private $count;//当前连接数
    private $connections;//连接池组
    protected $spareTime;//用于空闲连接回收判断
    
    //数据库配置
    protected $dbConfig = array(
        'host'     => '127.0.0.1',
        'port'     => 3306,
        'user'     => 'root',
        'password' => 'root',
        'database' => 'test',
        'charset'  => 'utf8',
        'timeout'  => 2,
    );

    private $inited = false;

    protected abstract function createDb();

    public function __construct()
    {
        $this->min = 10;
        $this->max = 100;
        $this->spareTime = 10 * 3600;
        $this->connections = new Channel($this->max + 1);
    }

    protected function createObject()
    {
        $obj = null;
        $db = $this->createDb();
        if ($db) {
            $obj = [
                'last_used_time' => time(),
                'db'             => $db,
            ];
        }
        return $obj;
    }

    /**
     * 初始换最小数量连接池
     *
     * @return $this|null
     */
    public function init()
    {
        if ($this->inited) {
            return null;
        }
        for ($i = 0; $i < $this->min; $i++) {
            $obj = $this->createObject();
            $this->count++;
            $this->connections->push($obj);
        }
        return $this;
    }

    public function getConnection($timeOut = 3)
    {
        $obj = null;
        if ($this->connections->isEmpty()) {
            if ($this->count < $this->max) {//连接数没达到最大,新建连接入池
                $this->count++;
                $obj = $this->createObject();
            } else {
                $obj = $this->connections->pop($timeOut);//timeout为出队的最大的等待时间
            }
        } else {
            $obj = $this->connections->pop($timeOut);
        }
        return $obj;
    }

    public function free($obj)
    {
        if ($obj) {
            $this->connections->push($obj);
        }
    }

    /**
     * 处理空闲连接
     */
    public function gcSpareObject()
    {
        //大约2分钟检测一次连接
        swoole_timer_tick(120000, function () {
            $list = [];
            /*echo "开始检测回收空闲链接" . $this->connections->length() . PHP_EOL;*/
            if ($this->connections->length() < intval($this->max * 0.5)) {
                echo "请求连接数还比较多,暂不回收空闲连接\n";
            }#1
            while (true) {
                if (!$this->connections->isEmpty()) {
                    $obj = $this->connections->pop(0.001);
                    $last_used_time = $obj['last_used_time'];
                    if ($this->count > $this->min && (time() - $last_used_time > $this->spareTime)) {
                        //回收
                        $this->count--;
                    } else {
                        array_push($list, $obj);
                    }
                } else {
                    break;
                }
            }
            foreach ($list as $item) {
                $this->connections->push($item);
            }
            unset($list);
        });
    }
}

PDO实现抽象类

<?php

class MysqlPoolPdo extends AbstractPool
{
    protected $dbConfig = array(
        'host'     => 'mysql:host=127.0.0.1:3306;dbname=test',
        'port'     => 3306,
        'user'     => 'root',
        'password' => 'root',
        'database' => 'test',
        'charset'  => 'utf8',
        'timeout'  => 2,
    );
    public static $instance;

    public static function getInstance()
    {
        if (is_null(self::$instance)) {
            self::$instance = new MysqlPoolPdo();
        }
        return self::$instance;
    }

    protected function createDb()
    {
        return new PDO($this->dbConfig['host'], $this->dbConfig['user'], $this->dbConfig['password']);
    }
}

$httpServer = new swoole_http_server('0.0.0.0', 9501);
$httpServer->set([
        'worker_num' => 1
    ]
);
$httpServer->on("WorkerStart", function () {
    MysqlPoolPdo::getInstance()->init();
});
$httpServer->on("request", function ($request, $response) {
    $db = null;
    $obj = MysqlPoolPdo::getInstance()->getConnection();
    if (!empty($obj)) {
        $db = $obj ? $obj['db'] : null;
    }
    if ($db) {
        $db->query("select sleep(2)");
        $ret = $db->query("select * from guestbook limit 1");
        MysqlPoolPdo::getInstance()->free($obj);
        $response->end(json_encode($ret));
    }
});
$httpServer->start();

参考资料

数据库持久连接

MySQL的连接池、异步、断线重连

基于swoole扩展实现真正的PHP数据库连接池

用Swoole4 打造高并发的PHP协程Mysql连接池