通过GatewayWorker/Workerman搭建Websocket微服务

5月 11, 2018

背景

最近在一些项目需要用到Websocket实时推送给分组的用户,前端需要传输给后端的信息比较少,通过多方考虑选择了通过GatewayWorker框架(基于Workerman)搭建微服务。

介绍

Workerman

Workerman是一款纯PHP开发的开源高性能的PHP socket 服务框架。

Workerman不是重复造轮子,它不是一个MVC框架,而是一个更底层更通用的socket服务框架,你可以用它开发tcp代理、梯子代理、做游戏服务器、邮件服务器、ftp服务器、甚至开发一个php版本的redis、php版本的数据库、php版本的nginx、php版本的php-fpm等等。Workerman可以说是PHP领域的一次创新,让开发者彻底摆脱了PHP只能做WEB的束缚。

实际上Workerman类似一个PHP版本的nginx,核心也是多进程+Epoll+非阻塞IO。Workerman每个进程能维持上万并发连接。由于本身常住内存,不依赖Apache、nginx、php-fpm这些容器,拥有超高的性能。同时支持TCP、UDP、UNIXSOCKET,支持长连接,支持Websocket、HTTP、WSS、HTTPS等通讯协以及各种自定义协议。拥有定时器、异步socket客户端、异步Mysql、异步Redis、异步Http、异步消息队列等众多高性能组件。

github地址:https://github.com/walkor/Workerman

文档:http://doc.workerman.net/315110

GatewayWorker

GatewayWorker基于Workerman开发的一个项目框架,用于快速开发TCP长连接应用,例如app推送服务端、即时IM服务端、游戏服务端、物联网、智能家居等等

GatewayWorker使用经典的Gateway和Worker进程模型。Gateway进程负责维持客户端连接,并转发客户端的数据给BusinessWorker进程处理,BusinessWorker进程负责处理实际的业务逻辑(默认调用Events.php处理业务),并将结果推送给对应的客户端。Gateway服务和BusinessWorker服务可以分开部署在不同的服务器上,实现分布式集群。

GatewayWorker提供非常方便的API,可以全局广播数据、可以向某个群体广播数据、也可以向某个特定客户端推送数据。配合Workerman的定时器,也可以定时推送数据。

github地址:https://github.com/walkor/GatewayWorker

文档:http://doc2.workerman.net/326102

Workerman与GatewayWorker的关系

Workerman可以看做是一个纯粹的socket类库,可以开发几乎所有的网络应用,不管是TCP的还是UDP的,长连接的还是短连接的。Workerman代码精简,功能强大,使用灵活,能够快速开发出各种网络应用。同时Workerman相比GatewayWorker也更底层,需要开发者有一定的多进程编程经验。

因为绝大多数开发者的目标是基于Workerman开发TCP长连接应用,而长连接应用服务端有很多共同之处,例如它们有相同的进程模型以及单发、群发、广播等接口需求。所以才有了GatewayWorker框架,GatewayWorker是基于Workerman开发的一个TCP长连接框架,实现了单发、群送、广播等长连接必用的接口。GatewayWorker框架实现了Gateway Worker进程模型,天然支持分布式多服务器部署,扩容缩容非常方便,能够应对海量并发连接。可以说GatewayWorker是基于Workerman实现的一个更完善的专门用于实现TCP长连接的项目框架。

GatewayClient

GatewayClient是GatewayWorker的客户端程序,可以进行推送、分组、统计等操作。

#websocket微服务介绍

总体原则,websocket微服务不处理业务逻辑,仅仅是一个单向连接,只负责推送信息。但客户端连接websocket微服务时,websocket微服务返回给客户端clientId,客户端调用接口把clientId传给后端,此时后端就可以通过GatewayClient绑定用户到具体分组。但需要推送时,通过text协议与GatewayWorker通信,把要推送的clientId或者分组传给GatewayWorker,GatewayWorker再推送给客户端。图示如下:

具体实现

安装GatewayWorker内核

新建一个空白项目(不在Laravel/Lumen/ThinkPHP 等PHP框架里),执行

1
composer require workerman/gateway-worker

启动文件

在根目录新建start.php作为启动文件,代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<?php

ini_set('display_errors', 'on');
use Workerman\Worker;

if(strpos(strtolower(PHP_OS), 'win') === 0)
{
exit("start.php not support windows, please use start_for_win.bat\n");
}

// 检查扩展
if(!extension_loaded('pcntl'))
{
exit("Please install pcntl extension. See http://doc3.workerman.net/appendices/install-extension.html\n");
}

if(!extension_loaded('posix'))
{
exit("Please install posix extension. See http://doc3.workerman.net/appendices/install-extension.html\n");
}

// 标记是全局启动
define('GLOBAL_START', 1);

require_once __DIR__ . '/vendor/autoload.php';

// 加载所有Applications/*/start.php,以便启动所有服务
foreach(glob(__DIR__.'/src/start*.php') as $start_file)
{
require_once $start_file;
}
// 运行所有服务
Worker::runAll();

注册Register类

Register类负责注册内部通讯地址。Gateway进程和BusinessWorker进程启动后分别向Register进程注册自己的通讯地址,Gateway进程和BusinessWorker通过Register进程得到通讯地址后,就可以建立起连接并通讯了。

GatewayWorker工作原理

src/start_register.php (目录名可以自己定义) 代码:

1
2
3
4
5
6
<?php 

use \GatewayWorker\Register;

// register 服务必须是text协议
$register = new Register('text://0.0.0.0:1238');

注册Gateway类

Gateway类用于初始化Gateway进程。Gateway进程是暴露给客户端的让其连接的进程。所有客户端的请求都是由Gateway接收然后分发给BusinessWorker处理,同样BusinessWorker也会将要发给客户端的响应通过Gateway转发出去。

src/start_gateway.php 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<?php
use \Workerman\Worker;
use \Workerman\WebServer;
use \GatewayWorker\Gateway;
use \GatewayWorker\BusinessWorker;
use \Workerman\Autoloader;

// gateway 进程
$gateway = new Gateway("websocket://0.0.0.0:8282");
// gateway名称,status方便查看
$gateway->name = 'business-gateway';
// gateway进程数
$gateway->count = 2;
// 本机ip,分布式部署时使用内网ip
$gateway->lanIp = '127.0.0.1';
// 内部通讯起始端口,假如$gateway->count=4,起始端口为4000
// 则一般会使用4000 4001 4002 4003 4个端口作为内部通讯端口
$gateway->startPort = 2900;
// 服务注册地址(register类地址)
$gateway->registerAddress = '127.0.0.1:1238';

注册BusinessWorker类

BusinessWorker是运行业务逻辑的进程,BusinessWorker收到Gateway转发来的事件及请求时会默认调用Events.php中的onConnect onMessage onClose方法处理事件及数据,开发者正是通过实现这些回调控制业务及流程。

src/start_businessworker.php 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<?php

use \Workerman\Worker;
use \Workerman\WebServer;
use \GatewayWorker\Gateway;
use \GatewayWorker\BusinessWorker;
use \Workerman\Autoloader;

// bussinessWorker 进程
$worker = new BusinessWorker();
// worker名称
$worker->name = 'Steam-BusinessWorker';
// bussinessWorker进程数量
$worker->count = 1;
// 服务注册地址
$worker->registerAddress = '127.0.0.1:1238';

Events类

Events类用于捕获GatewayWorker事件,在这里可以写一些回调信息。

src/Events.php 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
<?php

/**
* 用于检测业务代码死循环或者长时间阻塞等问题
* 如果发现业务卡死,可以将下面declare打开(去掉//注释),并执行php start.php reload
* 然后观察一段时间workerman.log看是否有process_timeout异常
*/
//declare(ticks=1);

use \GatewayWorker\Lib\Gateway;

/**
* 主逻辑
* 主要是处理 onConnect onMessage onClose 三个方法
* onConnect 和 onClose 如果不需要可以不用实现并删除
*/
class Events
{
/**
* 当客户端连接时触发
* 如果业务不需此回调可以删除onConnect
*
* @param int $client_id 连接id
*/
public static function onConnect($client_id)
{
// 向当前client_id发送数据
Gateway::sendToClient($client_id, json_encode([
'clientId' => $client_id,
]));
}

/**
* 当客户端发来消息时触发
* @param int $client_id 连接id
* @param mixed $message 具体消息
*/
public static function onMessage($client_id, $message)
{
}

/**
* 当用户断开连接时触发
* @param int $client_id 连接id
*/
public static function onClose($client_id)
{
}
}

在PHP项目分组或推送给客户端

这是在你自己的项目写的代码,过程:前端调用接口传来clientId,后端绑定到分组,再推送信息给分组或者指定的clientId客户端。

需要在项目中引用GatewayClient包

1
composer require workerman/gatewayclient

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// GatewayClient 3.0.0版本以后加了命名空间
use GatewayClient\Gateway;

/**
* === 指定registerAddress表明与哪个GatewayWorker(集群)通讯。===
* GatewayWorker里用Register服务来区分集群,即一个GatewayWorker(集群)只有一个Register服务,
* GatewayClient要与之通讯必须知道这个Register服务地址才能通讯,这个地址格式为 ip:端口 ,
* 其中ip为Register服务运行的ip(如果GatewayWorker是单机部署则ip就是运行GatewayWorker的服务器ip),
* 端口是对应ip的服务器上start_register.php文件中监听的端口,也就是GatewayWorker启动时看到的Register的端口。
* GatewayClient要想推送数据给客户端,必须知道客户端位于哪个GatewayWorker(集群),
* 然后去连这个GatewayWorker(集群)Register服务的 ip:端口,才能与对应GatewayWorker(集群)通讯。
* 这个 ip:端口 在GatewayClient一侧使用 Gateway::$registerAddress 来指定。
*
* === 如果GatewayClient和GatewayWorker不在同一台服务器需要以下步骤 ===
* 1、需要设置start_gateway.php中的lanIp为实际的本机内网ip(如不在一个局域网也可以设置成外网ip),设置完后要重启GatewayWorker
* 2、GatewayClient这里的Gateway::$registerAddress的ip填写填写上面步骤1lanIp所指定的ip,端口
* 3、需要开启GatewayWorker所在服务器的防火墙,让以下端口可以被GatewayClient所在服务器访问,
* 端口包括Rgister服务的端口以及start_gateway.php中lanIp与startPort指定的几个端口
*
* === 如果GatewayClient和GatewayWorker在同一台服务器 ===
* GatewayClient和Register服务都在一台服务器上,ip填写127.0.0.1及即可,无需其它设置。
**/
Gateway::$registerAddress = '127.0.0.1:1236';

// GatewayClient支持GatewayWorker中的所有接口(Gateway::closeCurrentClient Gateway::sendToCurrentClient除外)
Gateway::sendToAll($data);
Gateway::sendToClient($client_id, $data);
Gateway::closeClient($client_id);
Gateway::isOnline($client_id);
Gateway::bindUid($client_id, $uid);
Gateway::isUidOnline($uid);
Gateway::getClientIdByUid($client_id);
Gateway::unbindUid($client_id, $uid);
Gateway::sendToUid($uid, $dat);
Gateway::joinGroup($client_id, $group);
Gateway::sendToGroup($group, $data);
Gateway::leaveGroup($client_id, $group);
Gateway::getClientCountByGroup($group);
Gateway::getClientSessionsByGroup($group);
Gateway::getAllClientCount();
Gateway::getAllClientSessions();
Gateway::setSession($client_id, $session);
Gateway::updateSession($client_id, $session);
Gateway::getSession($client_id);