PHP中实现MySQL连接池与持久化
MySQL的连接方式
MySQL的连接方式分为两种,一种是socket套接字,一种是基于TCP/IP协议的。如果通过IP访问数据库就是基于TCP/IP协议,如果是本机登录不通过IP访问就使用socket。
具体:
- TCP/IP:mysql -h 127.0.0.1 -uroot -p
- socket:mysql -h localhost -uroot -p 或者 mysql -uroot -p
可以通过tcpdump抓包看看,命令tcpdump -i lo port 3306
,下图是用mysql -uroot -p
连接,可以看到并没有抓取到什么内容,说明是不经过网卡;

下图是用mysql -h 127.0.0.1 -uroot -p
进行连接,可以看到MySQL的连接过程,是基于TCP/IP协议的。

当退出MySQL,会接收发送4条记录,也就是TCP的4次挥手。

socket的连接方式会快于TCP/IP连接,所以MySQL服务跟其它服务同一机器的话,用socket的连接方式速度会快一些。MySQL使用线程来处理连接,每当一个请求进来,MySQL会创建一个线程去处理请求,这点可以通过在请求前后用show status
命令对比Thread_connected
数。在高并发下,这将给MySQL服务器带来巨大的压力,消耗服务器资源。
MySQL 线程池
实际上,MySQL实现了线程池,当客户端断开连接后,MySQL将当前线程缓存起来,在接到新的连接请求时快速响应,无需创建新的线程。
查看线程池大小:
show variables like 'thread_cache_size';
设置线程池大小:
set global thread_cache_size = 20;
查看线程池状态:
show status like 'Threads_%';
可以看到以下值,通过这些值适当去调整线程池的大小。
+-------------------+-------+
| Variable_name | Value |
+-------------------+-------+
| Threads_cached | 5 |
| Threads_connected | 83 |
| Threads_created | 11535 |
| Threads_running | 1 |
+-------------------+-------+
4 rows in set (0.00 sec)
-
**Threads_cached:空闲线程数量。**当有新请求是,MySQL不会立即创建线程去处理,而是去Threads_cached查看空间的连接线程,如果存在则使用,不存在则创建新的线程。
-
Threads_connected:当前处于连接状态的线程个数。
-
Threads_created:创建过的线程数,如果发现Threads_created值过大的话,表明MySQL服务器一直在创建线程,这也是比较耗资源,可以适当增加配置文件中thread_cache_size值。
-
Threads_running:处于激活状态的线程的个数,这个一般都是远小于Threads_connected的。
线程池避免了频繁的创建连接和销毁连接,但有了线程池还是不够的,不能解决客户端频繁连接MySQL数据库的问题,会进行一些频繁的TCP握手挥手。
查看是否持久连接
开启 general_log
// 查看是否开启
mysql> show variables like "general_%";
+------------------+----------------------------------+
| Variable_name | Value |
+------------------+----------------------------------+
| general_log | OFF |
| general_log_file | /var/lib/mysql/VM_0_6_centos.log |
+------------------+----------------------------------+
2 rows in set (0.13 sec)
如果general_log为OFF,则开启:
SET global general_log = on;
这时可以监听general_log_file:
tailf /var/lib/mysql/VM_0_6_centos.log
搭建一个server,或者在cli模式执行PHP:
<?php
try {
$dbh = new \PDO("mysql:dbname=database_name;host=127.0.0.1", 'username', 'password', [
\PDO::ATTR_PERSISTENT => true,
]);
$dbh->exec("select id from table_name ORDER BY created_at limit 1");
} catch (\PDOException $e) {
echo $e->getMessage();
}
下图中的111163-111174就是线程ID,不一致说明没有持久连接。

show processlist
除了开启general_log,还可以在MySQL中执行show processlist;
查看连接。
MySQL长连接
在 PHP 脚本的生命周期里,每个请求都会新建一个MySQL数据库连接,在PHP脚本执行结束时,PHP会自动关闭所有连接,所有资源都会自动回收。在并发很高的情况下,MySQL的连接数很快就会被消耗完,这将给MySQL服务器带来巨大的压力。而长连接就是在PHP回收资源时不关闭连接,由某个进程继续管理这个连接,达到复用的效果,减少数据库连接的开销时间。长连接可以避免每次请求都创建连接的开销,节省了时间和IO消耗。
Apache
把 PHP 用作Apache的一个模块,对于一个多进程的服务器,其典型特征是有一个父进程和一组子进程协调运行,其中实际生成 web 页面的是子进程。每当客户端向父进程提出请求时,该请求会被传递给还没有被其它的客户端请求占用的子进程。这也就是说当相同的客户端第二次向服务端提出请求时,它将有可能被一个不同的子进程来处理。在开启了一个持久连接后,所有请求 SQL 服务的后继页面都能够重用这个已经建立的 SQL Server 连接。
Nginx
Nginx的 stream 模块实现了 TCP/UDP 服务的负载均衡,同时借助 stream-lua 模块,我们就可以实现可编程的 stream 服务,也就是用 Nginx 实现自定义的 TCP/UDP 服务!我们可以选择 OpenResty 库来完成MySQL的连接池功能,OpenResty是一个非常强大,而且功能完善的Nginx Lua框架,他封装了Socket、MySQL, Redis, Memcache 等操作。通常大部分 PHP 是搭配 Nginx 来使用的,而且 PHP 和 Nginx 多半是在同一台服务器上。有了这个客观条件,我们就可以利用 Nginx 来实现一个连接池,在 Nginx 上完成连接 MySQL 等服务的工作,然后 PHP 通过本地的 Unix Domain Socket 来连接 Nginx,如此一来既规避了短链接的种种弊端,也享受到了连接池带来的种种好处。
##MySQL连接池
还有一种复用MySQL连接的方式是使用连接池,原理是连接池使用一个共享资源的模式,如并发100个请求,实际上并不是每个请求的所有时间都在执行SQL查询。这样100个请求,共享20个MySQL连接就可以满足需求了。当一个请求操作完数据库后,这时就会释放数据库连接给其他的请求使用。
连接池仅在超大型应用中才有价值。普通的应用采用MySQL长连接方案,每个php-fpm创建一个MySQL连接,每台机器开启100个php-fpm进程。如果有10台机器,每台机器并发的请求为100。实际上只需要创建1000个MySQL连接就能满足需求,数据库的压力并不大。即使有100台机器,硬件配置好的存储服务器依然可以承受。
达到数百或者数千台应用服务器时,MySQL服务器就需要维持十万级的连接。这时数据库的压力就会非常大了。连接池技术就可以派上用场了,可以大大降低数据库连接数。
Swoole同步阻塞模式
基于swoole
的AsyncTask
模块实现的连接池,编程简单,没有数据同步和锁的问题。甚至可以多个服务共享连接池。缺点是:
- 灵活性不如多线程连接池,无法动态增减连接
- 有一次进程间通信的开销。
<?php
$serv = new swoole_server("127.0.0.1", 9508);
$serv->set(array(
'worker_num' => 100,
'task_worker_num' => 1, //MySQL连接的数量
));
function my_onReceive($serv, $fd, $from_id, $data)
{
//taskwait就是投递一条任务,这里直接传递SQL语句了
//然后阻塞等待SQL完成
$result = $serv->taskwait("show tables");
if ($result !== false) {
list($status, $db_res) = explode(':', $result, 2);
if ($status == 'OK') {
//数据库操作成功了,执行业务逻辑代码,这里就自动释放掉MySQL连接的占用
$serv->send($fd, var_export(unserialize($db_res), true) . "\n");
} else {
$serv->send($fd, $db_res);
}
return;
} else {
$serv->send($fd, "Error. Task timeout\n");
}
}
function my_onTask($serv, $task_id, $from_id, $sql)
{
static $link = null;
if ($link == null) {
$link = mysqli_connect('127.0.0.1', 'root', 'root', 'root');
if ($link == null || mysqli_ping($link) == false) {
$link = null;
$serv->finish("ER:" . mysqli_error($link));
return;
}
}
$result = $link->query($sql);
if (!$result) {
$serv->finish("ER:" . mysqli_error($link));
return;
}
$data = $result->fetch_all(MYSQLI_ASSOC);
print_r($data);
$serv->finish("OK:" . serialize($data));
}
function my_onFinish($serv, $data)
{
echo "AsyncTask Finish:Connect.PID=" . posix_getpid() . PHP_EOL;
}
$serv->on('Receive', 'my_onReceive');
$serv->on('Task', 'my_onTask');
$serv->on('Finish', 'my_onFinish');
$serv->start();
协程模式
<?php
use Swoole\Coroutine\Channel;
abstract class AbstractPool
{
private $min;//最少连接数
private $max;//最大连接数
private $count;//当前连接数
private $connections;//连接池组
protected $spareTime;//用于空闲连接回收判断
//数据库配置
protected $dbConfig = array(
'host' => '127.0.0.1',
'port' => 3306,
'user' => 'root',
'password' => 'root',
'database' => 'test',
'charset' => 'utf8',
'timeout' => 2,
);
private $inited = false;
protected abstract function createDb();
public function __construct()
{
$this->min = 10;
$this->max = 100;
$this->spareTime = 10 * 3600;
$this->connections = new Channel($this->max + 1);
}
protected function createObject()
{
$obj = null;
$db = $this->createDb();
if ($db) {
$obj = [
'last_used_time' => time(),
'db' => $db,
];
}
return $obj;
}
/**
* 初始换最小数量连接池
*
* @return $this|null
*/
public function init()
{
if ($this->inited) {
return null;
}
for ($i = 0; $i < $this->min; $i++) {
$obj = $this->createObject();
$this->count++;
$this->connections->push($obj);
}
return $this;
}
public function getConnection($timeOut = 3)
{
$obj = null;
if ($this->connections->isEmpty()) {
if ($this->count < $this->max) {//连接数没达到最大,新建连接入池
$this->count++;
$obj = $this->createObject();
} else {
$obj = $this->connections->pop($timeOut);//timeout为出队的最大的等待时间
}
} else {
$obj = $this->connections->pop($timeOut);
}
return $obj;
}
public function free($obj)
{
if ($obj) {
$this->connections->push($obj);
}
}
/**
* 处理空闲连接
*/
public function gcSpareObject()
{
//大约2分钟检测一次连接
swoole_timer_tick(120000, function () {
$list = [];
/*echo "开始检测回收空闲链接" . $this->connections->length() . PHP_EOL;*/
if ($this->connections->length() < intval($this->max * 0.5)) {
echo "请求连接数还比较多,暂不回收空闲连接\n";
}#1
while (true) {
if (!$this->connections->isEmpty()) {
$obj = $this->connections->pop(0.001);
$last_used_time = $obj['last_used_time'];
if ($this->count > $this->min && (time() - $last_used_time > $this->spareTime)) {
//回收
$this->count--;
} else {
array_push($list, $obj);
}
} else {
break;
}
}
foreach ($list as $item) {
$this->connections->push($item);
}
unset($list);
});
}
}
PDO实现抽象类
<?php
class MysqlPoolPdo extends AbstractPool
{
protected $dbConfig = array(
'host' => 'mysql:host=127.0.0.1:3306;dbname=test',
'port' => 3306,
'user' => 'root',
'password' => 'root',
'database' => 'test',
'charset' => 'utf8',
'timeout' => 2,
);
public static $instance;
public static function getInstance()
{
if (is_null(self::$instance)) {
self::$instance = new MysqlPoolPdo();
}
return self::$instance;
}
protected function createDb()
{
return new PDO($this->dbConfig['host'], $this->dbConfig['user'], $this->dbConfig['password']);
}
}
$httpServer = new swoole_http_server('0.0.0.0', 9501);
$httpServer->set([
'worker_num' => 1
]
);
$httpServer->on("WorkerStart", function () {
MysqlPoolPdo::getInstance()->init();
});
$httpServer->on("request", function ($request, $response) {
$db = null;
$obj = MysqlPoolPdo::getInstance()->getConnection();
if (!empty($obj)) {
$db = $obj ? $obj['db'] : null;
}
if ($db) {
$db->query("select sleep(2)");
$ret = $db->query("select * from guestbook limit 1");
MysqlPoolPdo::getInstance()->free($obj);
$response->end(json_encode($ret));
}
});
$httpServer->start();