up. 添加gateway服务
This commit is contained in:
parent
3539555fe8
commit
f5dbbf936b
@ -13,7 +13,7 @@ use think\facade\Console;
|
|||||||
/**
|
/**
|
||||||
* 定时任务
|
* 定时任务
|
||||||
*/
|
*/
|
||||||
class Crontab extends Command
|
class SysCrontab extends Command
|
||||||
{
|
{
|
||||||
protected function configure()
|
protected function configure()
|
||||||
{
|
{
|
||||||
94
app/command/admin/SysGateway.php
Normal file
94
app/command/admin/SysGateway.php
Normal file
@ -0,0 +1,94 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace app\command\admin;
|
||||||
|
|
||||||
|
use GatewayWorker\Register;
|
||||||
|
use think\console\Command;
|
||||||
|
use think\console\Input;
|
||||||
|
use think\console\input\Argument;
|
||||||
|
use think\console\input\Option;
|
||||||
|
use think\console\Output;
|
||||||
|
use \GatewayWorker\BusinessWorker;
|
||||||
|
|
||||||
|
class SysGateway extends Command
|
||||||
|
{
|
||||||
|
protected function configure()
|
||||||
|
{
|
||||||
|
$this->setName('admin:gateway')
|
||||||
|
->addArgument('action', Argument::OPTIONAL, "start|stop|restart|reload|status|connections", 'start')
|
||||||
|
->addOption('mode', 'm', Option::VALUE_OPTIONAL, 'Run the workerman server in daemon mode.')
|
||||||
|
->setDescription('后台系统网关服务');
|
||||||
|
}
|
||||||
|
|
||||||
|
protected function execute(Input $input, Output $output)
|
||||||
|
{
|
||||||
|
$action = $input->getArgument('action');
|
||||||
|
$mode = $input->getOption('mode');
|
||||||
|
global $argv;
|
||||||
|
$argv = [];
|
||||||
|
array_unshift($argv, 'think', $action);
|
||||||
|
if ($mode == 'd') {
|
||||||
|
$argv[] = '-d';
|
||||||
|
} else if ($mode == 'g') {
|
||||||
|
$argv[] = '-g';
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
$register = $this->startRegister();
|
||||||
|
$gateway = $this->startGateway();
|
||||||
|
|
||||||
|
$worker = new BusinessWorker();
|
||||||
|
$worker->name = 'ChatBusinessWorker';
|
||||||
|
$worker->count = 4;
|
||||||
|
$worker->registerAddress = '127.0.0.1:1236';
|
||||||
|
|
||||||
|
// 运行所有服务
|
||||||
|
Worker::runAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private function startRegister(): Register
|
||||||
|
{
|
||||||
|
return new Register('text://127.0.0.1:1236');
|
||||||
|
}
|
||||||
|
private function startGateway(): \GatewayWorker\Gateway
|
||||||
|
{
|
||||||
|
// gateway 进程
|
||||||
|
$gateway = new \GatewayWorker\Gateway("Websocket://0.0.0.0:7272");
|
||||||
|
// 设置名称,方便status时查看
|
||||||
|
$gateway->name = 'ChatGateway';
|
||||||
|
// 设置进程数,一般两个进程就足够
|
||||||
|
$gateway->count = 2;
|
||||||
|
// 分布式部署时请设置成内网ip(非127.0.0.1)
|
||||||
|
$gateway->lanIp = '127.0.0.1';
|
||||||
|
// 内部通讯起始端口。假如$gateway->count=2,起始端口为2300
|
||||||
|
// 则一般会使用2300 2301 2个端口作为内部通讯端口
|
||||||
|
$gateway->startPort = 2300;
|
||||||
|
// 心跳间隔
|
||||||
|
$gateway->pingInterval = 10;
|
||||||
|
// 心跳数据
|
||||||
|
$gateway->pingData = '{"type":"ping"}';
|
||||||
|
// 服务注册地址
|
||||||
|
$gateway->registerAddress = '127.0.0.1:1236';
|
||||||
|
|
||||||
|
/*
|
||||||
|
// 当客户端连接上来时,设置连接的onWebSocketConnect,即在websocket握手时的回调
|
||||||
|
$gateway->onConnect = function($connection)
|
||||||
|
{
|
||||||
|
$connection->onWebSocketConnect = function($connection , $http_header)
|
||||||
|
{
|
||||||
|
// 可以在这里判断连接来源是否合法,不合法就关掉连接
|
||||||
|
// $_SERVER['HTTP_ORIGIN']标识来自哪个站点的页面发起的websocket链接
|
||||||
|
if($_SERVER['HTTP_ORIGIN'] != 'http://chat.workerman.net')
|
||||||
|
{
|
||||||
|
$connection->close();
|
||||||
|
}
|
||||||
|
// onWebSocketConnect 里面$_GET $_SERVER是可用的
|
||||||
|
// var_dump($_GET, $_SERVER);
|
||||||
|
};
|
||||||
|
};
|
||||||
|
*/
|
||||||
|
|
||||||
|
return $gateway;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -5,7 +5,8 @@
|
|||||||
return [
|
return [
|
||||||
// 指令定义
|
// 指令定义
|
||||||
'commands' => [
|
'commands' => [
|
||||||
\app\command\admin\Crontab::class,
|
\app\command\admin\SysCrontab::class,
|
||||||
|
\app\command\admin\SysGateway::class,
|
||||||
\app\command\admin\Worker::class,
|
\app\command\admin\Worker::class,
|
||||||
],
|
],
|
||||||
];
|
];
|
||||||
|
|||||||
2
extend/.gitignore
vendored
2
extend/.gitignore
vendored
@ -1,2 +0,0 @@
|
|||||||
*
|
|
||||||
!.gitignore
|
|
||||||
3
extend/GatewayClient/.gitignore
vendored
Normal file
3
extend/GatewayClient/.gitignore
vendored
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
/vendor
|
||||||
|
/.vscode
|
||||||
|
/.idea
|
||||||
115
extend/GatewayClient/Context.php
Normal file
115
extend/GatewayClient/Context.php
Normal file
@ -0,0 +1,115 @@
|
|||||||
|
<?php
|
||||||
|
namespace GatewayClient;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This file is part of workerman.
|
||||||
|
*
|
||||||
|
* Licensed under The MIT License
|
||||||
|
* For full copyright and license information, please see the MIT-LICENSE.txt
|
||||||
|
* Redistributions of files must retain the above copyright notice.
|
||||||
|
*
|
||||||
|
* @author walkor<walkor@workerman.net>
|
||||||
|
* @copyright walkor<walkor@workerman.net>
|
||||||
|
* @link http://www.workerman.net/
|
||||||
|
* @license http://www.opensource.org/licenses/mit-license.php MIT License
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 上下文 包含当前用户uid, 内部通信local_ip local_port socket_id ,以及客户端client_ip client_port
|
||||||
|
*/
|
||||||
|
class Context
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* 内部通讯id
|
||||||
|
* @var string
|
||||||
|
*/
|
||||||
|
public static $local_ip;
|
||||||
|
/**
|
||||||
|
* 内部通讯端口
|
||||||
|
* @var int
|
||||||
|
*/
|
||||||
|
public static $local_port;
|
||||||
|
/**
|
||||||
|
* 客户端ip
|
||||||
|
* @var string
|
||||||
|
*/
|
||||||
|
public static $client_ip;
|
||||||
|
/**
|
||||||
|
* 客户端端口
|
||||||
|
* @var int
|
||||||
|
*/
|
||||||
|
public static $client_port;
|
||||||
|
/**
|
||||||
|
* client_id
|
||||||
|
* @var string
|
||||||
|
*/
|
||||||
|
public static $client_id;
|
||||||
|
/**
|
||||||
|
* 连接connection->id
|
||||||
|
* @var int
|
||||||
|
*/
|
||||||
|
public static $connection_id;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 旧的session
|
||||||
|
*
|
||||||
|
* @var string
|
||||||
|
*/
|
||||||
|
public static $old_session;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 编码session
|
||||||
|
* @param mixed $session_data
|
||||||
|
* @return string
|
||||||
|
*/
|
||||||
|
public static function sessionEncode($session_data = '')
|
||||||
|
{
|
||||||
|
if ($session_data !== '') {
|
||||||
|
return serialize($session_data);
|
||||||
|
}
|
||||||
|
|
||||||
|
return '';
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 解码session
|
||||||
|
* @param string $session_buffer
|
||||||
|
* @return mixed
|
||||||
|
*/
|
||||||
|
public static function sessionDecode($session_buffer)
|
||||||
|
{
|
||||||
|
return unserialize($session_buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 清除上下文
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
public static function clear()
|
||||||
|
{
|
||||||
|
static::$local_ip = static::$local_port = static::$client_ip = static::$client_port =
|
||||||
|
static::$client_id = static::$connection_id = static::$old_session = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 通讯地址到client_id的转换
|
||||||
|
* @return string
|
||||||
|
*/
|
||||||
|
public static function addressToClientId($local_ip, $local_port, $connection_id)
|
||||||
|
{
|
||||||
|
return bin2hex(pack('NnN', $local_ip, $local_port, $connection_id));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* client_id到通讯地址的转换
|
||||||
|
* @return array
|
||||||
|
*/
|
||||||
|
public static function clientIdToAddress($client_id)
|
||||||
|
{
|
||||||
|
if (strlen($client_id) !== 20) {
|
||||||
|
throw new \Exception("client_id $client_id is invalid");
|
||||||
|
}
|
||||||
|
return unpack('Nlocal_ip/nlocal_port/Nconnection_id' ,pack('H*', $client_id));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
1400
extend/GatewayClient/Gateway.php
Normal file
1400
extend/GatewayClient/Gateway.php
Normal file
File diff suppressed because it is too large
Load Diff
187
extend/GatewayClient/GatewayProtocol.php
Normal file
187
extend/GatewayClient/GatewayProtocol.php
Normal file
@ -0,0 +1,187 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace GatewayClient;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This file is part of workerman.
|
||||||
|
*
|
||||||
|
* Licensed under The MIT License
|
||||||
|
* For full copyright and license information, please see the MIT-LICENSE.txt
|
||||||
|
* Redistributions of files must retain the above copyright notice.
|
||||||
|
*
|
||||||
|
* @author walkor<walkor@workerman.net>
|
||||||
|
* @copyright walkor<walkor@workerman.net>
|
||||||
|
* @link http://www.workerman.net/
|
||||||
|
* @license http://www.opensource.org/licenses/mit-license.php MIT License
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gateway 与 Worker 间通讯的二进制协议
|
||||||
|
*
|
||||||
|
* struct GatewayProtocol
|
||||||
|
* {
|
||||||
|
* unsigned int pack_len,
|
||||||
|
* unsigned char cmd,//命令字
|
||||||
|
* unsigned int local_ip,
|
||||||
|
* unsigned short local_port,
|
||||||
|
* unsigned int client_ip,
|
||||||
|
* unsigned short client_port,
|
||||||
|
* unsigned int connection_id,
|
||||||
|
* unsigned char flag,
|
||||||
|
* unsigned short gateway_port,
|
||||||
|
* unsigned int ext_len,
|
||||||
|
* char[ext_len] ext_data,
|
||||||
|
* char[pack_length-HEAD_LEN] body//包体
|
||||||
|
* }
|
||||||
|
* NCNnNnNCnN
|
||||||
|
*/
|
||||||
|
class GatewayProtocol
|
||||||
|
{
|
||||||
|
// 发给worker,gateway有一个新的连接
|
||||||
|
const CMD_ON_CONNECT = 1;
|
||||||
|
// 发给worker的,客户端有消息
|
||||||
|
const CMD_ON_MESSAGE = 3;
|
||||||
|
// 发给worker上的关闭链接事件
|
||||||
|
const CMD_ON_CLOSE = 4;
|
||||||
|
// 发给gateway的向单个用户发送数据
|
||||||
|
const CMD_SEND_TO_ONE = 5;
|
||||||
|
// 发给gateway的向所有用户发送数据
|
||||||
|
const CMD_SEND_TO_ALL = 6;
|
||||||
|
// 发给gateway的踢出用户
|
||||||
|
// 1、如果有待发消息,将在发送完后立即销毁用户连接
|
||||||
|
// 2、如果无待发消息,将立即销毁用户连接
|
||||||
|
const CMD_KICK = 7;
|
||||||
|
// 发给gateway的立即销毁用户连接
|
||||||
|
const CMD_DESTROY = 8;
|
||||||
|
// 发给gateway,通知用户session更新
|
||||||
|
const CMD_UPDATE_SESSION = 9;
|
||||||
|
// 获取在线状态
|
||||||
|
const CMD_GET_ALL_CLIENT_SESSIONS = 10;
|
||||||
|
// 判断是否在线
|
||||||
|
const CMD_IS_ONLINE = 11;
|
||||||
|
// client_id绑定到uid
|
||||||
|
const CMD_BIND_UID = 12;
|
||||||
|
// 解绑
|
||||||
|
const CMD_UNBIND_UID = 13;
|
||||||
|
// 向uid发送数据
|
||||||
|
const CMD_SEND_TO_UID = 14;
|
||||||
|
// 根据uid获取绑定的clientid
|
||||||
|
const CMD_GET_CLIENT_ID_BY_UID = 15;
|
||||||
|
// 批量获取uid列表批量获取绑定的clientid
|
||||||
|
const CMD_BATCH_GET_CLIENT_ID_BY_UID = 16;
|
||||||
|
// 加入组
|
||||||
|
const CMD_JOIN_GROUP = 20;
|
||||||
|
// 离开组
|
||||||
|
const CMD_LEAVE_GROUP = 21;
|
||||||
|
// 向组成员发消息
|
||||||
|
const CMD_SEND_TO_GROUP = 22;
|
||||||
|
// 获取组成员
|
||||||
|
const CMD_GET_CLIENT_SESSIONS_BY_GROUP = 23;
|
||||||
|
// 获取组在线连接数
|
||||||
|
const CMD_GET_CLIENT_COUNT_BY_GROUP = 24;
|
||||||
|
// 按照条件查找
|
||||||
|
const CMD_SELECT = 25;
|
||||||
|
// 获取在线的群组ID
|
||||||
|
const CMD_GET_GROUP_ID_LIST = 26;
|
||||||
|
// 取消分组
|
||||||
|
const CMD_UNGROUP = 27;
|
||||||
|
// 批量获取群组ID内客户端个数
|
||||||
|
const CMD_BATCH_GET_CLIENT_COUNT_BY_GROUP = 28;
|
||||||
|
// worker连接gateway事件
|
||||||
|
const CMD_WORKER_CONNECT = 200;
|
||||||
|
// 心跳
|
||||||
|
const CMD_PING = 201;
|
||||||
|
// GatewayClient连接gateway事件
|
||||||
|
const CMD_GATEWAY_CLIENT_CONNECT = 202;
|
||||||
|
// 根据client_id获取session
|
||||||
|
const CMD_GET_SESSION_BY_CLIENT_ID = 203;
|
||||||
|
// 发给gateway,覆盖session
|
||||||
|
const CMD_SET_SESSION = 204;
|
||||||
|
// 当websocket握手时触发,只有websocket协议支持此命令字
|
||||||
|
const CMD_ON_WEBSOCKET_CONNECT = 205;
|
||||||
|
// 包体是标量
|
||||||
|
const FLAG_BODY_IS_SCALAR = 0x01;
|
||||||
|
// 通知gateway在send时不调用协议encode方法,在广播组播时提升性能
|
||||||
|
const FLAG_NOT_CALL_ENCODE = 0x02;
|
||||||
|
/**
|
||||||
|
* 包头长度
|
||||||
|
*
|
||||||
|
* @var int
|
||||||
|
*/
|
||||||
|
const HEAD_LEN = 28;
|
||||||
|
public static $empty = array(
|
||||||
|
'cmd' => 0,
|
||||||
|
'local_ip' => 0,
|
||||||
|
'local_port' => 0,
|
||||||
|
'client_ip' => 0,
|
||||||
|
'client_port' => 0,
|
||||||
|
'connection_id' => 0,
|
||||||
|
'flag' => 0,
|
||||||
|
'gateway_port' => 0,
|
||||||
|
'ext_data' => '',
|
||||||
|
'body' => '',
|
||||||
|
);
|
||||||
|
/**
|
||||||
|
* 返回包长度
|
||||||
|
*
|
||||||
|
* @param string $buffer
|
||||||
|
* @return int return current package length
|
||||||
|
*/
|
||||||
|
public static function input($buffer)
|
||||||
|
{
|
||||||
|
if (strlen($buffer) < self::HEAD_LEN) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
$data = unpack("Npack_len", $buffer);
|
||||||
|
return $data['pack_len'];
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* 获取整个包的 buffer
|
||||||
|
*
|
||||||
|
* @param mixed $data
|
||||||
|
* @return string
|
||||||
|
*/
|
||||||
|
public static function encode($data)
|
||||||
|
{
|
||||||
|
$flag = (int)is_scalar($data['body']);
|
||||||
|
if (!$flag) {
|
||||||
|
$data['body'] = serialize($data['body']);
|
||||||
|
}
|
||||||
|
$data['flag'] |= $flag;
|
||||||
|
$ext_len = strlen($data['ext_data']);
|
||||||
|
$package_len = self::HEAD_LEN + $ext_len + strlen($data['body']);
|
||||||
|
return pack("NCNnNnNCnN", $package_len,
|
||||||
|
$data['cmd'], $data['local_ip'],
|
||||||
|
$data['local_port'], $data['client_ip'],
|
||||||
|
$data['client_port'], $data['connection_id'],
|
||||||
|
$data['flag'], $data['gateway_port'],
|
||||||
|
$ext_len) . $data['ext_data'] . $data['body'];
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* 从二进制数据转换为数组
|
||||||
|
*
|
||||||
|
* @param string $buffer
|
||||||
|
* @return array
|
||||||
|
*/
|
||||||
|
public static function decode($buffer)
|
||||||
|
{
|
||||||
|
$data = unpack("Npack_len/Ccmd/Nlocal_ip/nlocal_port/Nclient_ip/nclient_port/Nconnection_id/Cflag/ngateway_port/Next_len",
|
||||||
|
$buffer);
|
||||||
|
if ($data['ext_len'] > 0) {
|
||||||
|
$data['ext_data'] = substr($buffer, self::HEAD_LEN, $data['ext_len']);
|
||||||
|
if ($data['flag'] & self::FLAG_BODY_IS_SCALAR) {
|
||||||
|
$data['body'] = substr($buffer, self::HEAD_LEN + $data['ext_len']);
|
||||||
|
} else {
|
||||||
|
$data['body'] = unserialize(substr($buffer, self::HEAD_LEN + $data['ext_len']));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
$data['ext_data'] = '';
|
||||||
|
if ($data['flag'] & self::FLAG_BODY_IS_SCALAR) {
|
||||||
|
$data['body'] = substr($buffer, self::HEAD_LEN);
|
||||||
|
} else {
|
||||||
|
$data['body'] = unserialize(substr($buffer, self::HEAD_LEN));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return $data;
|
||||||
|
}
|
||||||
|
}
|
||||||
21
extend/GatewayClient/MIT-LICENSE.txt
Normal file
21
extend/GatewayClient/MIT-LICENSE.txt
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
The MIT License
|
||||||
|
|
||||||
|
Copyright (c) 2009-2015 walkor<walkor@workerman.net> and contributors (see https://github.com/walkor/workerman/contributors)
|
||||||
|
|
||||||
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
of this software and associated documentation files (the "Software"), to deal
|
||||||
|
in the Software without restriction, including without limitation the rights
|
||||||
|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
copies of the Software, and to permit persons to whom the Software is
|
||||||
|
furnished to do so, subject to the following conditions:
|
||||||
|
|
||||||
|
The above copyright notice and this permission notice shall be included in
|
||||||
|
all copies or substantial portions of the Software.
|
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||||
|
THE SOFTWARE.
|
||||||
91
extend/GatewayClient/README.md
Normal file
91
extend/GatewayClient/README.md
Normal file
@ -0,0 +1,91 @@
|
|||||||
|
# GatewayClient
|
||||||
|
|
||||||
|
GatewayWorker1.0请使用[1.0版本的GatewayClient](https://github.com/walkor/GatewayClient/releases/tag/v1.0)
|
||||||
|
|
||||||
|
GatewayWorker2.0.1-2.0.4请使用[2.0.4版本的GatewayClient](https://github.com/walkor/GatewayClient/releases/tag/2.0.4)
|
||||||
|
|
||||||
|
GatewayWorker2.0.5-2.0.6版本请使用[2.0.6版本的GatewayClient](https://github.com/walkor/GatewayClient/releases/tag/2.0.6)
|
||||||
|
|
||||||
|
GatewayWorker2.0.7版本请使用 [2.0.7版本的GatewayClient](https://github.com/walkor/GatewayClient/releases/tag/v2.0.7)
|
||||||
|
|
||||||
|
GatewayWorker3.0.0-3.0.7版本请使用 [3.0.0版本的GatewayClient](https://github.com/walkor/GatewayClient/releases/tag/v3.0.0)<br>
|
||||||
|
|
||||||
|
GatewayWorker3.0.8及以上版本请使用 [3.0.13版本的GatewayClient](https://github.com/walkor/GatewayClient/releases/tag/v3.0.13)<br>
|
||||||
|
|
||||||
|
注意:GatewayClient3.0.0以后支持composer并加了命名空间```GatewayClient``` <br>
|
||||||
|
|
||||||
|
[如何查看GatewayWorker版本请点击这里](http://doc2.workerman.net/get-gateway-version.html)
|
||||||
|
|
||||||
|
## 安装
|
||||||
|
**方法一**
|
||||||
|
```
|
||||||
|
composer require workerman/gatewayclient
|
||||||
|
```
|
||||||
|
使用时引入`vendor/autoload.php` 类似如下:
|
||||||
|
```php
|
||||||
|
use GatewayClient\Gateway;
|
||||||
|
require_once '真实路径/vendor/autoload.php';
|
||||||
|
```
|
||||||
|
|
||||||
|
**方法二**
|
||||||
|
下载源文件到任意目录,手动引入 `GatewayClient/Gateway.php`, 类似如下:
|
||||||
|
```php
|
||||||
|
use GatewayClient\Gateway;
|
||||||
|
require_once '真实路径/GatewayClient/Gateway.php';
|
||||||
|
```
|
||||||
|
|
||||||
|
## 使用
|
||||||
|
```php
|
||||||
|
// GatewayClient 3.0.0版本以后加了命名空间
|
||||||
|
use GatewayClient\Gateway;
|
||||||
|
|
||||||
|
// composer安装
|
||||||
|
require_once '真实路径/vendor/autoload.php';
|
||||||
|
|
||||||
|
// 源文件引用
|
||||||
|
//require_once '真实路径/GatewayClient/Gateway.php';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* === 指定registerAddress表明与哪个GatewayWorker(集群)通讯。===
|
||||||
|
* GatewayWorker里用Register服务来区分集群,即一个GatewayWorker(集群)只有一个Register服务,
|
||||||
|
* GatewayClient要与之通讯必须知道这个Register服务地址才能通讯,这个地址格式为 ip:端口 ,
|
||||||
|
* 其中ip为Register服务运行的ip(如果GatewayWorker是单机部署则ip就是运行GatewayWorker的服务器ip),
|
||||||
|
* 端口是对应ip的服务器上start_register.php文件中监听的端口,也就是GatewayWorker启动时看到的Register的端口。
|
||||||
|
* GatewayClient要想推送数据给客户端,必须知道客户端位于哪个GatewayWorker(集群),
|
||||||
|
* 然后去连这个GatewayWorker(集群)Register服务的 ip:端口,才能与对应GatewayWorker(集群)通讯。
|
||||||
|
* 这个 ip:端口 在GatewayClient一侧使用 Gateway::$registerAddress 来指定。
|
||||||
|
*
|
||||||
|
* === 如果GatewayClient和GatewayWorker不在同一台服务器需要以下步骤 ===
|
||||||
|
* 1、需要设置start_gateway.php中的lanIp为实际的本机内网ip(如不在一个局域网也可以设置成外网ip),设置完后要重启GatewayWorker
|
||||||
|
* 2、GatewayClient这里的Gateway::$registerAddress的地址填写实际运行Register的服务器ip和端口
|
||||||
|
* 3、需要开启GatewayWorker所在服务器的防火墙,让以下端口可以被GatewayClient所在服务器访问,
|
||||||
|
* 端口包括Rgister服务的端口以及start_gateway.php中lanIp与startPort指定的几个端口
|
||||||
|
*
|
||||||
|
* === 如果GatewayClient和GatewayWorker在同一台服务器 ===
|
||||||
|
* GatewayClient和Register服务都在一台服务器上,ip填写127.0.0.1及即可,无需其它设置。
|
||||||
|
**/
|
||||||
|
Gateway::$registerAddress = '127.0.0.1:1236';
|
||||||
|
|
||||||
|
// GatewayClient支持GatewayWorker中的所有接口(Gateway::closeCurrentClient Gateway::sendToCurrentClient除外)
|
||||||
|
Gateway::sendToAll($data);
|
||||||
|
Gateway::sendToClient($client_id, $data);
|
||||||
|
Gateway::closeClient($client_id);
|
||||||
|
Gateway::isOnline($client_id);
|
||||||
|
Gateway::bindUid($client_id, $uid);
|
||||||
|
Gateway::isUidOnline($uid);
|
||||||
|
Gateway::isUidsOnline($uids);
|
||||||
|
Gateway::getClientIdByUid($uid);
|
||||||
|
Gateway::unbindUid($client_id, $uid);
|
||||||
|
Gateway::sendToUid($uid, $dat);
|
||||||
|
Gateway::joinGroup($client_id, $group);
|
||||||
|
Gateway::sendToGroup($group, $data);
|
||||||
|
Gateway::leaveGroup($client_id, $group);
|
||||||
|
Gateway::getClientCountByGroup($group);
|
||||||
|
Gateway::getClientSessionsByGroup($group);
|
||||||
|
Gateway::getAllClientCount();
|
||||||
|
Gateway::getAllClientSessions();
|
||||||
|
Gateway::setSession($client_id, $session);
|
||||||
|
Gateway::updateSession($client_id, $session);
|
||||||
|
Gateway::getSession($client_id);
|
||||||
|
```
|
||||||
|
|
||||||
11
extend/GatewayClient/composer.json
Normal file
11
extend/GatewayClient/composer.json
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
{
|
||||||
|
"name" : "workerman/gatewayclient",
|
||||||
|
"type" : "library",
|
||||||
|
"homepage": "http://www.workerman.net",
|
||||||
|
"license" : "MIT",
|
||||||
|
"autoload": {
|
||||||
|
"psr-4": {
|
||||||
|
"GatewayClient\\": "./"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
522
extend/GatewayWorker/BusinessWorker.php
Normal file
522
extend/GatewayWorker/BusinessWorker.php
Normal file
@ -0,0 +1,522 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This file is part of workerman.
|
||||||
|
*
|
||||||
|
* Licensed under The MIT License
|
||||||
|
* For full copyright and license information, please see the MIT-LICENSE.txt
|
||||||
|
* Redistributions of files must retain the above copyright notice.
|
||||||
|
*
|
||||||
|
* @author walkor<walkor@workerman.net>
|
||||||
|
* @copyright walkor<walkor@workerman.net>
|
||||||
|
* @link http://www.workerman.net/
|
||||||
|
* @license http://www.opensource.org/licenses/mit-license.php MIT License
|
||||||
|
*/
|
||||||
|
namespace GatewayWorker;
|
||||||
|
|
||||||
|
use RuntimeException;
|
||||||
|
use Workerman\Connection\TcpConnection;
|
||||||
|
|
||||||
|
use Workerman\Events\Swoole;
|
||||||
|
use Workerman\Events\Swow;
|
||||||
|
use Workerman\Worker;
|
||||||
|
use Workerman\Timer;
|
||||||
|
use Workerman\Connection\AsyncTcpConnection;
|
||||||
|
use GatewayWorker\Protocols\GatewayProtocol;
|
||||||
|
use GatewayWorker\Lib\Context;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* BusinessWorker 用于处理Gateway转发来的数据
|
||||||
|
*
|
||||||
|
* @author walkor<walkor@workerman.net>
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
class BusinessWorker extends Worker
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* 保存与 gateway 的连接 connection 对象
|
||||||
|
*
|
||||||
|
* @var array
|
||||||
|
*/
|
||||||
|
public $gatewayConnections = array();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 注册中心地址
|
||||||
|
*
|
||||||
|
* @var string|array
|
||||||
|
*/
|
||||||
|
public $registerAddress = '127.0.0.1:1236';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 事件处理类,默认是 Event 类
|
||||||
|
*
|
||||||
|
* @var string
|
||||||
|
*/
|
||||||
|
public $eventHandler = 'Events';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 秘钥
|
||||||
|
*
|
||||||
|
* @var string
|
||||||
|
*/
|
||||||
|
public $secretKey = '';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* businessWorker进程将消息转发给gateway进程的发送缓冲区大小
|
||||||
|
*
|
||||||
|
* @var int
|
||||||
|
*/
|
||||||
|
public $sendToGatewayBufferSize = 10240000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 保存用户设置的 worker 启动回调
|
||||||
|
*
|
||||||
|
* @var callable|null
|
||||||
|
*/
|
||||||
|
protected $_onWorkerStart = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 保存用户设置的 workerReload 回调
|
||||||
|
*
|
||||||
|
* @var callable|null
|
||||||
|
*/
|
||||||
|
protected $_onWorkerReload = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 保存用户设置的 workerStop 回调
|
||||||
|
*
|
||||||
|
* @var callable|null
|
||||||
|
*/
|
||||||
|
protected $_onWorkerStop= null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 到注册中心的连接
|
||||||
|
*
|
||||||
|
* @var AsyncTcpConnection
|
||||||
|
*/
|
||||||
|
protected $_registerConnection = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处于连接状态的 gateway 通讯地址
|
||||||
|
*
|
||||||
|
* @var array
|
||||||
|
*/
|
||||||
|
protected $_connectingGatewayAddresses = array();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 所有 geteway 内部通讯地址
|
||||||
|
*
|
||||||
|
* @var array
|
||||||
|
*/
|
||||||
|
protected $_gatewayAddresses = array();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 等待连接个 gateway 地址
|
||||||
|
*
|
||||||
|
* @var array
|
||||||
|
*/
|
||||||
|
protected $_waitingConnectGatewayAddresses = array();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Event::onConnect 回调
|
||||||
|
*
|
||||||
|
* @var callable|null
|
||||||
|
*/
|
||||||
|
protected $_eventOnConnect = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Event::onMessage 回调
|
||||||
|
*
|
||||||
|
* @var callable|null
|
||||||
|
*/
|
||||||
|
protected $_eventOnMessage = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Event::onClose 回调
|
||||||
|
*
|
||||||
|
* @var callable|null
|
||||||
|
*/
|
||||||
|
protected $_eventOnClose = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* websocket回调
|
||||||
|
*
|
||||||
|
* @var null
|
||||||
|
*/
|
||||||
|
protected $_eventOnWebSocketConnect = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* SESSION 版本缓存
|
||||||
|
*
|
||||||
|
* @var array
|
||||||
|
*/
|
||||||
|
protected $_sessionVersion = array();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 用于保持长连接的心跳时间间隔
|
||||||
|
*
|
||||||
|
* @var int
|
||||||
|
*/
|
||||||
|
const PERSISTENCE_CONNECTION_PING_INTERVAL = 25;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 构造函数
|
||||||
|
*
|
||||||
|
* @param string $socket_name
|
||||||
|
* @param array $context_option
|
||||||
|
*/
|
||||||
|
public function __construct($socket_name = '', $context_option = array())
|
||||||
|
{
|
||||||
|
parent::__construct($socket_name, $context_option);
|
||||||
|
$backrace = debug_backtrace();
|
||||||
|
$this->_autoloadRootPath = dirname($backrace[0]['file']);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritdoc}
|
||||||
|
*/
|
||||||
|
public function run(): void
|
||||||
|
{
|
||||||
|
$this->_onWorkerStart = $this->onWorkerStart;
|
||||||
|
$this->_onWorkerReload = $this->onWorkerReload;
|
||||||
|
$this->_onWorkerStop = $this->onWorkerStop;
|
||||||
|
$this->onWorkerStop = array($this, 'onWorkerStop');
|
||||||
|
$this->onWorkerStart = array($this, 'onWorkerStart');
|
||||||
|
$this->onWorkerReload = array($this, 'onWorkerReload');
|
||||||
|
parent::run();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 当进程启动时一些初始化工作
|
||||||
|
*
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
protected function onWorkerStart()
|
||||||
|
{
|
||||||
|
if (function_exists('opcache_reset')) {
|
||||||
|
opcache_reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!class_exists('\Protocols\GatewayProtocol')) {
|
||||||
|
class_alias('GatewayWorker\Protocols\GatewayProtocol', 'Protocols\GatewayProtocol');
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!is_array($this->registerAddress)) {
|
||||||
|
$this->registerAddress = array($this->registerAddress);
|
||||||
|
}
|
||||||
|
$this->connectToRegister();
|
||||||
|
|
||||||
|
\GatewayWorker\Lib\Gateway::setBusinessWorker($this);
|
||||||
|
\GatewayWorker\Lib\Gateway::$secretKey = $this->secretKey;
|
||||||
|
if ($this->_onWorkerStart) {
|
||||||
|
call_user_func($this->_onWorkerStart, $this);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (is_callable($this->eventHandler . '::onWorkerStart')) {
|
||||||
|
call_user_func($this->eventHandler . '::onWorkerStart', $this);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 设置回调
|
||||||
|
if (is_callable($this->eventHandler . '::onConnect')) {
|
||||||
|
$this->_eventOnConnect = $this->eventHandler . '::onConnect';
|
||||||
|
}
|
||||||
|
|
||||||
|
if (is_callable($this->eventHandler . '::onMessage')) {
|
||||||
|
$this->_eventOnMessage = $this->eventHandler . '::onMessage';
|
||||||
|
} else {
|
||||||
|
echo "Waring: {$this->eventHandler}::onMessage is not callable\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
if (is_callable($this->eventHandler . '::onClose')) {
|
||||||
|
$this->_eventOnClose = $this->eventHandler . '::onClose';
|
||||||
|
}
|
||||||
|
|
||||||
|
if (is_callable($this->eventHandler . '::onWebSocketConnect')) {
|
||||||
|
$this->_eventOnWebSocketConnect = $this->eventHandler . '::onWebSocketConnect';
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* onWorkerReload 回调
|
||||||
|
*
|
||||||
|
* @param Worker $worker
|
||||||
|
*/
|
||||||
|
protected function onWorkerReload($worker)
|
||||||
|
{
|
||||||
|
// 防止进程立刻退出
|
||||||
|
$worker->reloadable = false;
|
||||||
|
// 延迟 0.05 秒退出,避免 BusinessWorker 瞬间全部退出导致没有可用的 BusinessWorker 进程
|
||||||
|
Timer::add(0.05, array('Workerman\Worker', 'stopAll'));
|
||||||
|
// 执行用户定义的 onWorkerReload 回调
|
||||||
|
if ($this->_onWorkerReload) {
|
||||||
|
call_user_func($this->_onWorkerReload, $this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 当进程关闭时一些清理工作
|
||||||
|
*
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
protected function onWorkerStop()
|
||||||
|
{
|
||||||
|
if ($this->_onWorkerStop) {
|
||||||
|
call_user_func($this->_onWorkerStop, $this);
|
||||||
|
}
|
||||||
|
if (is_callable($this->eventHandler . '::onWorkerStop')) {
|
||||||
|
call_user_func($this->eventHandler . '::onWorkerStop', $this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 连接服务注册中心
|
||||||
|
*
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
public function connectToRegister()
|
||||||
|
{
|
||||||
|
foreach ($this->registerAddress as $register_address) {
|
||||||
|
$register_connection = new AsyncTcpConnection("text://{$register_address}");
|
||||||
|
$secret_key = $this->secretKey;
|
||||||
|
$register_connection->onConnect = function () use ($register_connection, $secret_key, $register_address) {
|
||||||
|
$register_connection->send('{"event":"worker_connect","secret_key":"' . $secret_key . '"}');
|
||||||
|
// 如果Register服务器不在本地服务器,则需要保持心跳
|
||||||
|
if (strpos($register_address, '127.0.0.1') !== 0) {
|
||||||
|
$register_connection->ping_timer = Timer::add(self::PERSISTENCE_CONNECTION_PING_INTERVAL, function () use ($register_connection) {
|
||||||
|
$register_connection->send('{"event":"ping"}');
|
||||||
|
});
|
||||||
|
}
|
||||||
|
};
|
||||||
|
$register_connection->onClose = function ($register_connection) {
|
||||||
|
if(!empty($register_connection->ping_timer)) {
|
||||||
|
Timer::del($register_connection->ping_timer);
|
||||||
|
}
|
||||||
|
$register_connection->reconnect(1);
|
||||||
|
};
|
||||||
|
$register_connection->onMessage = array($this, 'onRegisterConnectionMessage');
|
||||||
|
$register_connection->connect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 当注册中心发来消息时
|
||||||
|
*
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
public function onRegisterConnectionMessage($register_connection, $data)
|
||||||
|
{
|
||||||
|
$data = json_decode($data, true);
|
||||||
|
if (!isset($data['event'])) {
|
||||||
|
echo "Received bad data from Register\n";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
$event = $data['event'];
|
||||||
|
switch ($event) {
|
||||||
|
case 'broadcast_addresses':
|
||||||
|
if (!is_array($data['addresses'])) {
|
||||||
|
echo "Received bad data from Register. Addresses empty\n";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
$addresses = $data['addresses'];
|
||||||
|
$this->_gatewayAddresses = array();
|
||||||
|
foreach ($addresses as $addr) {
|
||||||
|
$this->_gatewayAddresses[$addr] = $addr;
|
||||||
|
}
|
||||||
|
$this->checkGatewayConnections($addresses);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
echo "Receive bad event:$event from Register.\n";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 当 gateway 转发来数据时
|
||||||
|
*
|
||||||
|
* @param TcpConnection $connection
|
||||||
|
* @param mixed $data
|
||||||
|
*/
|
||||||
|
public function onGatewayMessage($connection, $data)
|
||||||
|
{
|
||||||
|
$cmd = $data['cmd'];
|
||||||
|
if ($cmd === GatewayProtocol::CMD_PING) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// 上下文数据
|
||||||
|
Context::$client_ip = $data['client_ip'];
|
||||||
|
Context::$client_port = $data['client_port'];
|
||||||
|
Context::$local_ip = $data['local_ip'];
|
||||||
|
Context::$local_port = $data['local_port'];
|
||||||
|
Context::$connection_id = $data['connection_id'];
|
||||||
|
Context::$client_id = Context::addressToClientId($data['local_ip'], $data['local_port'],
|
||||||
|
$data['connection_id']);
|
||||||
|
// $_SERVER 变量
|
||||||
|
$_SERVER = array(
|
||||||
|
'REMOTE_ADDR' => long2ip($data['client_ip']),
|
||||||
|
'REMOTE_PORT' => $data['client_port'],
|
||||||
|
'GATEWAY_ADDR' => long2ip($data['local_ip']),
|
||||||
|
'GATEWAY_PORT' => $data['gateway_port'],
|
||||||
|
'GATEWAY_CLIENT_ID' => Context::$client_id,
|
||||||
|
);
|
||||||
|
// 检查session版本,如果是过期的session数据则拉取最新的数据
|
||||||
|
if ($cmd !== GatewayProtocol::CMD_ON_CLOSE && isset($this->_sessionVersion[Context::$client_id]) && $this->_sessionVersion[Context::$client_id] !== crc32($data['ext_data'])) {
|
||||||
|
$_SESSION = Context::$old_session = \GatewayWorker\Lib\Gateway::getSession(Context::$client_id);
|
||||||
|
$this->_sessionVersion[Context::$client_id] = crc32($data['ext_data']);
|
||||||
|
} else {
|
||||||
|
if (!isset($this->_sessionVersion[Context::$client_id])) {
|
||||||
|
$this->_sessionVersion[Context::$client_id] = crc32($data['ext_data']);
|
||||||
|
}
|
||||||
|
// 尝试解析 session
|
||||||
|
if ($data['ext_data'] != '') {
|
||||||
|
Context::$old_session = $_SESSION = Context::sessionDecode($data['ext_data']);
|
||||||
|
} else {
|
||||||
|
Context::$old_session = $_SESSION = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 尝试执行 Event::onConnection、Event::onMessage、Event::onClose
|
||||||
|
switch ($cmd) {
|
||||||
|
case GatewayProtocol::CMD_ON_CONNECT:
|
||||||
|
if ($this->_eventOnConnect) {
|
||||||
|
call_user_func($this->_eventOnConnect, Context::$client_id);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case GatewayProtocol::CMD_ON_MESSAGE:
|
||||||
|
if ($this->_eventOnMessage) {
|
||||||
|
call_user_func($this->_eventOnMessage, Context::$client_id, $data['body']);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case GatewayProtocol::CMD_ON_CLOSE:
|
||||||
|
unset($this->_sessionVersion[Context::$client_id]);
|
||||||
|
if ($this->_eventOnClose) {
|
||||||
|
call_user_func($this->_eventOnClose, Context::$client_id);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case GatewayProtocol::CMD_ON_WEBSOCKET_CONNECT:
|
||||||
|
if ($this->_eventOnWebSocketConnect) {
|
||||||
|
call_user_func($this->_eventOnWebSocketConnect, Context::$client_id, $data['body']);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// session 必须是数组
|
||||||
|
if ($_SESSION !== null && !is_array($_SESSION)) {
|
||||||
|
throw new RuntimeException('$_SESSION must be an array. But $_SESSION=' . var_export($_SESSION, true) . ' is not array.');
|
||||||
|
}
|
||||||
|
|
||||||
|
// 判断 session 是否被更改
|
||||||
|
if ($_SESSION !== Context::$old_session && $cmd !== GatewayProtocol::CMD_ON_CLOSE) {
|
||||||
|
// 如果是swoole或者swow环境,不允许使用 $_SESSION 全局变量,协程会导致 $_SESSION 污染
|
||||||
|
if (Worker::$eventLoopClass === Swoole::class || Worker::$eventLoopClass === Swow::class) {
|
||||||
|
echo new RuntimeException('Can not use $_SESSION in swoole or swow environment. Please use \GatewayWorker\Lib\Gateway::setSession to set session data.');
|
||||||
|
}
|
||||||
|
$session_str_now = $_SESSION !== null ? Context::sessionEncode($_SESSION) : '';
|
||||||
|
\GatewayWorker\Lib\Gateway::setSocketSession(Context::$client_id, $session_str_now);
|
||||||
|
$this->_sessionVersion[Context::$client_id] = crc32($session_str_now);
|
||||||
|
}
|
||||||
|
|
||||||
|
Context::clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 当与 Gateway 的连接断开时触发
|
||||||
|
*
|
||||||
|
* @param TcpConnection $connection
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
public function onGatewayClose($connection)
|
||||||
|
{
|
||||||
|
$addr = $connection->remoteAddr;
|
||||||
|
unset($this->gatewayConnections[$addr], $this->_connectingGatewayAddresses[$addr]);
|
||||||
|
if (isset($this->_gatewayAddresses[$addr]) && !isset($this->_waitingConnectGatewayAddresses[$addr])) {
|
||||||
|
Timer::add(1, array($this, 'tryToConnectGateway'), array($addr), false);
|
||||||
|
$this->_waitingConnectGatewayAddresses[$addr] = $addr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 尝试连接 Gateway 内部通讯地址
|
||||||
|
*
|
||||||
|
* @param string $addr
|
||||||
|
*/
|
||||||
|
public function tryToConnectGateway($addr)
|
||||||
|
{
|
||||||
|
if (!isset($this->gatewayConnections[$addr]) && !isset($this->_connectingGatewayAddresses[$addr]) && isset($this->_gatewayAddresses[$addr])) {
|
||||||
|
$gateway_connection = new AsyncTcpConnection("GatewayProtocol://$addr");
|
||||||
|
$gateway_connection->remoteAddr = $addr;
|
||||||
|
$gateway_connection->onConnect = array($this, 'onConnectGateway');
|
||||||
|
$gateway_connection->onMessage = array($this, 'onGatewayMessage');
|
||||||
|
$gateway_connection->onClose = array($this, 'onGatewayClose');
|
||||||
|
$gateway_connection->onError = array($this, 'onGatewayError');
|
||||||
|
$gateway_connection->maxSendBufferSize = $this->sendToGatewayBufferSize;
|
||||||
|
if (TcpConnection::$defaultMaxSendBufferSize == $gateway_connection->maxSendBufferSize) {
|
||||||
|
$gateway_connection->maxSendBufferSize = 50 * 1024 * 1024;
|
||||||
|
}
|
||||||
|
$gateway_data = GatewayProtocol::$empty;
|
||||||
|
$gateway_data['cmd'] = GatewayProtocol::CMD_WORKER_CONNECT;
|
||||||
|
$gateway_data['body'] = json_encode(array(
|
||||||
|
'worker_key' =>"{$this->name}:{$this->id}",
|
||||||
|
'secret_key' => $this->secretKey,
|
||||||
|
));
|
||||||
|
$gateway_connection->send($gateway_data);
|
||||||
|
$gateway_connection->connect();
|
||||||
|
$this->_connectingGatewayAddresses[$addr] = $addr;
|
||||||
|
}
|
||||||
|
unset($this->_waitingConnectGatewayAddresses[$addr]);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 检查 gateway 的通信端口是否都已经连
|
||||||
|
* 如果有未连接的端口,则尝试连接
|
||||||
|
*
|
||||||
|
* @param array $addresses_list
|
||||||
|
*/
|
||||||
|
public function checkGatewayConnections($addresses_list)
|
||||||
|
{
|
||||||
|
if (empty($addresses_list)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
foreach ($addresses_list as $addr) {
|
||||||
|
if (!isset($this->_waitingConnectGatewayAddresses[$addr])) {
|
||||||
|
$this->tryToConnectGateway($addr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 当连接上 gateway 的通讯端口时触发
|
||||||
|
* 将连接 connection 对象保存起来
|
||||||
|
*
|
||||||
|
* @param TcpConnection $connection
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
public function onConnectGateway($connection)
|
||||||
|
{
|
||||||
|
$this->gatewayConnections[$connection->remoteAddr] = $connection;
|
||||||
|
unset($this->_connectingGatewayAddresses[$connection->remoteAddr], $this->_waitingConnectGatewayAddresses[$connection->remoteAddr]);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 当与 gateway 的连接出现错误时触发
|
||||||
|
*
|
||||||
|
* @param TcpConnection $connection
|
||||||
|
* @param int $error_no
|
||||||
|
* @param string $error_msg
|
||||||
|
*/
|
||||||
|
public function onGatewayError($connection, $error_no, $error_msg)
|
||||||
|
{
|
||||||
|
echo "GatewayConnection Error : $error_no ,$error_msg\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取所有 Gateway 内部通讯地址
|
||||||
|
*
|
||||||
|
* @return array
|
||||||
|
*/
|
||||||
|
public function getAllGatewayAddresses()
|
||||||
|
{
|
||||||
|
return $this->_gatewayAddresses;
|
||||||
|
}
|
||||||
|
}
|
||||||
1216
extend/GatewayWorker/Gateway.php
Normal file
1216
extend/GatewayWorker/Gateway.php
Normal file
File diff suppressed because it is too large
Load Diff
136
extend/GatewayWorker/Lib/Context.php
Normal file
136
extend/GatewayWorker/Lib/Context.php
Normal file
@ -0,0 +1,136 @@
|
|||||||
|
<?php
|
||||||
|
/**
|
||||||
|
* This file is part of workerman.
|
||||||
|
*
|
||||||
|
* Licensed under The MIT License
|
||||||
|
* For full copyright and license information, please see the MIT-LICENSE.txt
|
||||||
|
* Redistributions of files must retain the above copyright notice.
|
||||||
|
*
|
||||||
|
* @author walkor<walkor@workerman.net>
|
||||||
|
* @copyright walkor<walkor@workerman.net>
|
||||||
|
* @link http://www.workerman.net/
|
||||||
|
* @license http://www.opensource.org/licenses/mit-license.php MIT License
|
||||||
|
*/
|
||||||
|
namespace GatewayWorker\Lib;
|
||||||
|
|
||||||
|
use Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 上下文 包含当前用户 uid, 内部通信 local_ip local_port socket_id,以及客户端 client_ip client_port
|
||||||
|
*/
|
||||||
|
class Context
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* 内部通讯 id
|
||||||
|
*
|
||||||
|
* @var string
|
||||||
|
*/
|
||||||
|
public static $local_ip;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 内部通讯端口
|
||||||
|
*
|
||||||
|
* @var int
|
||||||
|
*/
|
||||||
|
public static $local_port;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 客户端 ip
|
||||||
|
*
|
||||||
|
* @var string
|
||||||
|
*/
|
||||||
|
public static $client_ip;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 客户端端口
|
||||||
|
*
|
||||||
|
* @var int
|
||||||
|
*/
|
||||||
|
public static $client_port;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* client_id
|
||||||
|
*
|
||||||
|
* @var string
|
||||||
|
*/
|
||||||
|
public static $client_id;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 连接 connection->id
|
||||||
|
*
|
||||||
|
* @var int
|
||||||
|
*/
|
||||||
|
public static $connection_id;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 旧的session
|
||||||
|
*
|
||||||
|
* @var string
|
||||||
|
*/
|
||||||
|
public static $old_session;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 编码 session
|
||||||
|
*
|
||||||
|
* @param mixed $session_data
|
||||||
|
* @return string
|
||||||
|
*/
|
||||||
|
public static function sessionEncode($session_data = '')
|
||||||
|
{
|
||||||
|
if ($session_data !== '') {
|
||||||
|
return serialize($session_data);
|
||||||
|
}
|
||||||
|
return '';
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 解码 session
|
||||||
|
*
|
||||||
|
* @param string $session_buffer
|
||||||
|
* @return mixed
|
||||||
|
*/
|
||||||
|
public static function sessionDecode($session_buffer)
|
||||||
|
{
|
||||||
|
return unserialize($session_buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 清除上下文
|
||||||
|
*
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
public static function clear()
|
||||||
|
{
|
||||||
|
self::$local_ip = self::$local_port = self::$client_ip = self::$client_port =
|
||||||
|
self::$client_id = self::$connection_id = self::$old_session = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 通讯地址到 client_id 的转换
|
||||||
|
*
|
||||||
|
* @param int $local_ip
|
||||||
|
* @param int $local_port
|
||||||
|
* @param int $connection_id
|
||||||
|
* @return string
|
||||||
|
*/
|
||||||
|
public static function addressToClientId($local_ip, $local_port, $connection_id)
|
||||||
|
{
|
||||||
|
return bin2hex(pack('NnN', $local_ip, $local_port, $connection_id));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* client_id 到通讯地址的转换
|
||||||
|
*
|
||||||
|
* @param string $client_id
|
||||||
|
* @return array
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public static function clientIdToAddress($client_id)
|
||||||
|
{
|
||||||
|
if (strlen($client_id) !== 20) {
|
||||||
|
echo new Exception("client_id $client_id is invalid");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return unpack('Nlocal_ip/nlocal_port/Nconnection_id', pack('H*', $client_id));
|
||||||
|
}
|
||||||
|
}
|
||||||
1459
extend/GatewayWorker/Lib/Gateway.php
Normal file
1459
extend/GatewayWorker/Lib/Gateway.php
Normal file
File diff suppressed because it is too large
Load Diff
228
extend/GatewayWorker/Protocols/GatewayProtocol.php
Normal file
228
extend/GatewayWorker/Protocols/GatewayProtocol.php
Normal file
@ -0,0 +1,228 @@
|
|||||||
|
<?php
|
||||||
|
/**
|
||||||
|
* This file is part of workerman.
|
||||||
|
*
|
||||||
|
* Licensed under The MIT License
|
||||||
|
* For full copyright and license information, please see the MIT-LICENSE.txt
|
||||||
|
* Redistributions of files must retain the above copyright notice.
|
||||||
|
*
|
||||||
|
* @author walkor<walkor@workerman.net>
|
||||||
|
* @copyright walkor<walkor@workerman.net>
|
||||||
|
* @link http://www.workerman.net/
|
||||||
|
* @license http://www.opensource.org/licenses/mit-license.php MIT License
|
||||||
|
*/
|
||||||
|
namespace GatewayWorker\Protocols;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gateway 与 Worker 间通讯的二进制协议
|
||||||
|
*
|
||||||
|
* struct GatewayProtocol
|
||||||
|
* {
|
||||||
|
* unsigned int pack_len,
|
||||||
|
* unsigned char cmd,//命令字
|
||||||
|
* unsigned int local_ip,
|
||||||
|
* unsigned short local_port,
|
||||||
|
* unsigned int client_ip,
|
||||||
|
* unsigned short client_port,
|
||||||
|
* unsigned int connection_id,
|
||||||
|
* unsigned char flag,
|
||||||
|
* unsigned short gateway_port,
|
||||||
|
* unsigned int ext_len,
|
||||||
|
* char[ext_len] ext_data,
|
||||||
|
* char[pack_length-HEAD_LEN] body//包体
|
||||||
|
* }
|
||||||
|
* NCNnNnNCnN
|
||||||
|
*/
|
||||||
|
class GatewayProtocol
|
||||||
|
{
|
||||||
|
// 发给worker,gateway有一个新的连接
|
||||||
|
const CMD_ON_CONNECT = 1;
|
||||||
|
|
||||||
|
// 发给worker的,客户端有消息
|
||||||
|
const CMD_ON_MESSAGE = 3;
|
||||||
|
|
||||||
|
// 发给worker上的关闭链接事件
|
||||||
|
const CMD_ON_CLOSE = 4;
|
||||||
|
|
||||||
|
// 发给gateway的向单个用户发送数据
|
||||||
|
const CMD_SEND_TO_ONE = 5;
|
||||||
|
|
||||||
|
// 发给gateway的向所有用户发送数据
|
||||||
|
const CMD_SEND_TO_ALL = 6;
|
||||||
|
|
||||||
|
// 发给gateway的踢出用户
|
||||||
|
// 1、如果有待发消息,将在发送完后立即销毁用户连接
|
||||||
|
// 2、如果无待发消息,将立即销毁用户连接
|
||||||
|
const CMD_KICK = 7;
|
||||||
|
|
||||||
|
// 发给gateway的立即销毁用户连接
|
||||||
|
const CMD_DESTROY = 8;
|
||||||
|
|
||||||
|
// 发给gateway,通知用户session更新
|
||||||
|
const CMD_UPDATE_SESSION = 9;
|
||||||
|
|
||||||
|
// 获取在线状态
|
||||||
|
const CMD_GET_ALL_CLIENT_SESSIONS = 10;
|
||||||
|
|
||||||
|
// 判断是否在线
|
||||||
|
const CMD_IS_ONLINE = 11;
|
||||||
|
|
||||||
|
// client_id绑定到uid
|
||||||
|
const CMD_BIND_UID = 12;
|
||||||
|
|
||||||
|
// 解绑
|
||||||
|
const CMD_UNBIND_UID = 13;
|
||||||
|
|
||||||
|
// 向uid发送数据
|
||||||
|
const CMD_SEND_TO_UID = 14;
|
||||||
|
|
||||||
|
// 根据uid获取绑定的clientid
|
||||||
|
const CMD_GET_CLIENT_ID_BY_UID = 15;
|
||||||
|
|
||||||
|
// 批量获取uid列表批量获取绑定的clientid
|
||||||
|
const CMD_BATCH_GET_CLIENT_ID_BY_UID = 16;
|
||||||
|
|
||||||
|
// 加入组
|
||||||
|
const CMD_JOIN_GROUP = 20;
|
||||||
|
|
||||||
|
// 离开组
|
||||||
|
const CMD_LEAVE_GROUP = 21;
|
||||||
|
|
||||||
|
// 向组成员发消息
|
||||||
|
const CMD_SEND_TO_GROUP = 22;
|
||||||
|
|
||||||
|
// 获取组成员
|
||||||
|
const CMD_GET_CLIENT_SESSIONS_BY_GROUP = 23;
|
||||||
|
|
||||||
|
// 获取组在线连接数
|
||||||
|
const CMD_GET_CLIENT_COUNT_BY_GROUP = 24;
|
||||||
|
|
||||||
|
// 按照条件查找
|
||||||
|
const CMD_SELECT = 25;
|
||||||
|
|
||||||
|
// 获取在线的群组ID
|
||||||
|
const CMD_GET_GROUP_ID_LIST = 26;
|
||||||
|
|
||||||
|
// 取消分组
|
||||||
|
const CMD_UNGROUP = 27;
|
||||||
|
|
||||||
|
// 批量获取群组ID内客户端个数
|
||||||
|
const CMD_BATCH_GET_CLIENT_COUNT_BY_GROUP = 28;
|
||||||
|
|
||||||
|
// worker连接gateway事件
|
||||||
|
const CMD_WORKER_CONNECT = 200;
|
||||||
|
|
||||||
|
// 心跳
|
||||||
|
const CMD_PING = 201;
|
||||||
|
|
||||||
|
// GatewayClient连接gateway事件
|
||||||
|
const CMD_GATEWAY_CLIENT_CONNECT = 202;
|
||||||
|
|
||||||
|
// 根据client_id获取session
|
||||||
|
const CMD_GET_SESSION_BY_CLIENT_ID = 203;
|
||||||
|
|
||||||
|
// 发给gateway,覆盖session
|
||||||
|
const CMD_SET_SESSION = 204;
|
||||||
|
|
||||||
|
// 当websocket握手时触发,只有websocket协议支持此命令字
|
||||||
|
const CMD_ON_WEBSOCKET_CONNECT = 205;
|
||||||
|
|
||||||
|
// 包体是标量
|
||||||
|
const FLAG_BODY_IS_SCALAR = 0x01;
|
||||||
|
|
||||||
|
// 通知gateway在send时不调用协议encode方法,在广播组播时提升性能
|
||||||
|
const FLAG_NOT_CALL_ENCODE = 0x02;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 包头长度
|
||||||
|
*
|
||||||
|
* @var int
|
||||||
|
*/
|
||||||
|
const HEAD_LEN = 28;
|
||||||
|
|
||||||
|
public static $empty = array(
|
||||||
|
'cmd' => 0,
|
||||||
|
'local_ip' => 0,
|
||||||
|
'local_port' => 0,
|
||||||
|
'client_ip' => 0,
|
||||||
|
'client_port' => 0,
|
||||||
|
'connection_id' => 0,
|
||||||
|
'flag' => 0,
|
||||||
|
'gateway_port' => 0,
|
||||||
|
'ext_data' => '',
|
||||||
|
'body' => '',
|
||||||
|
);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 返回包长度
|
||||||
|
*
|
||||||
|
* @param string $buffer
|
||||||
|
* @return int return current package length
|
||||||
|
*/
|
||||||
|
public static function input($buffer)
|
||||||
|
{
|
||||||
|
if (strlen($buffer) < self::HEAD_LEN) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
$data = unpack("Npack_len", $buffer);
|
||||||
|
return $data['pack_len'];
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取整个包的 buffer
|
||||||
|
*
|
||||||
|
* @param mixed $data
|
||||||
|
* @return string
|
||||||
|
*/
|
||||||
|
public static function encode($data)
|
||||||
|
{
|
||||||
|
$flag = (int)is_scalar($data['body']);
|
||||||
|
if (!$flag) {
|
||||||
|
$data['body'] = serialize($data['body']);
|
||||||
|
}
|
||||||
|
$data['flag'] |= $flag;
|
||||||
|
$ext_len = strlen($data['ext_data']??'');
|
||||||
|
$package_len = self::HEAD_LEN + $ext_len + strlen($data['body']);
|
||||||
|
return pack("NCNnNnNCnN", $package_len,
|
||||||
|
$data['cmd'], $data['local_ip'],
|
||||||
|
$data['local_port'], $data['client_ip'],
|
||||||
|
$data['client_port'], $data['connection_id'],
|
||||||
|
$data['flag'], $data['gateway_port'],
|
||||||
|
$ext_len) . $data['ext_data'] . $data['body'];
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 从二进制数据转换为数组
|
||||||
|
*
|
||||||
|
* @param string $buffer
|
||||||
|
* @return array
|
||||||
|
*/
|
||||||
|
public static function decode($buffer)
|
||||||
|
{
|
||||||
|
$data = unpack("Npack_len/Ccmd/Nlocal_ip/nlocal_port/Nclient_ip/nclient_port/Nconnection_id/Cflag/ngateway_port/Next_len",
|
||||||
|
$buffer);
|
||||||
|
if ($data['ext_len'] > 0) {
|
||||||
|
$data['ext_data'] = substr($buffer, self::HEAD_LEN, $data['ext_len']);
|
||||||
|
if ($data['flag'] & self::FLAG_BODY_IS_SCALAR) {
|
||||||
|
$data['body'] = substr($buffer, self::HEAD_LEN + $data['ext_len']);
|
||||||
|
} else {
|
||||||
|
// 防止反序列化成类实例
|
||||||
|
try {
|
||||||
|
$data['body'] = unserialize(substr($buffer, self::HEAD_LEN + $data['ext_len']), ['allowed_classes' => false]);
|
||||||
|
} catch (\Throwable $e) {}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
$data['ext_data'] = '';
|
||||||
|
if ($data['flag'] & self::FLAG_BODY_IS_SCALAR) {
|
||||||
|
$data['body'] = substr($buffer, self::HEAD_LEN);
|
||||||
|
} else {
|
||||||
|
// 防止反序列化成类实例
|
||||||
|
try {
|
||||||
|
$data['body'] = unserialize(substr($buffer, self::HEAD_LEN), ['allowed_classes' => false]);
|
||||||
|
} catch (\Throwable $e) {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return $data;
|
||||||
|
}
|
||||||
|
}
|
||||||
194
extend/GatewayWorker/Register.php
Normal file
194
extend/GatewayWorker/Register.php
Normal file
@ -0,0 +1,194 @@
|
|||||||
|
<?php
|
||||||
|
/**
|
||||||
|
* This file is part of workerman.
|
||||||
|
*
|
||||||
|
* Licensed under The MIT License
|
||||||
|
* For full copyright and license information, please see the MIT-LICENSE.txt
|
||||||
|
* Redistributions of files must retain the above copyright notice.
|
||||||
|
*
|
||||||
|
* @author walkor<walkor@workerman.net>
|
||||||
|
* @copyright walkor<walkor@workerman.net>
|
||||||
|
* @link http://www.workerman.net/
|
||||||
|
* @license http://www.opensource.org/licenses/mit-license.php MIT License
|
||||||
|
*/
|
||||||
|
namespace GatewayWorker;
|
||||||
|
|
||||||
|
use Workerman\Worker;
|
||||||
|
use Workerman\Timer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* 注册中心,用于注册 Gateway 和 BusinessWorker
|
||||||
|
*
|
||||||
|
* @author walkor<walkor@workerman.net>
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
class Register extends Worker
|
||||||
|
{
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 秘钥
|
||||||
|
* @var string
|
||||||
|
*/
|
||||||
|
public $secretKey = '';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 所有 gateway 的连接
|
||||||
|
*
|
||||||
|
* @var array
|
||||||
|
*/
|
||||||
|
protected $_gatewayConnections = array();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 所有 worker 的连接
|
||||||
|
*
|
||||||
|
* @var array
|
||||||
|
*/
|
||||||
|
protected $_workerConnections = array();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 进程启动时间
|
||||||
|
*
|
||||||
|
* @var int
|
||||||
|
*/
|
||||||
|
protected $_startTime = 0;
|
||||||
|
|
||||||
|
|
||||||
|
public function __construct(string $socketName = '', array $contextOption = [])
|
||||||
|
{
|
||||||
|
$this->name = 'Register';
|
||||||
|
$this->reloadable = false;
|
||||||
|
parent::__construct($socketName, $contextOption);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritdoc}
|
||||||
|
*/
|
||||||
|
public function run(): void
|
||||||
|
{
|
||||||
|
// 设置 onMessage 连接回调
|
||||||
|
$this->onConnect = array($this, 'onConnect');
|
||||||
|
|
||||||
|
// 设置 onMessage 回调
|
||||||
|
$this->onMessage = array($this, 'onMessage');
|
||||||
|
|
||||||
|
// 设置 onClose 回调
|
||||||
|
$this->onClose = array($this, 'onClose');
|
||||||
|
|
||||||
|
// 记录进程启动的时间
|
||||||
|
$this->_startTime = time();
|
||||||
|
|
||||||
|
// 强制使用text协议
|
||||||
|
$this->protocol = '\Workerman\Protocols\Text';
|
||||||
|
|
||||||
|
// reusePort
|
||||||
|
$this->reusePort = false;
|
||||||
|
|
||||||
|
// 运行父方法
|
||||||
|
parent::run();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 设置个定时器,将未及时发送验证的连接关闭
|
||||||
|
*
|
||||||
|
* @param \Workerman\Connection\ConnectionInterface $connection
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
public function onConnect($connection)
|
||||||
|
{
|
||||||
|
$connection->timeout_timerid = Timer::add(10, function () use ($connection) {
|
||||||
|
Worker::log("Register auth timeout (".$connection->getRemoteIp()."). See http://doc2.workerman.net/register-auth-timeout.html");
|
||||||
|
$connection->close();
|
||||||
|
}, null, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 设置消息回调
|
||||||
|
*
|
||||||
|
* @param \Workerman\Connection\ConnectionInterface $connection
|
||||||
|
* @param string $buffer
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
public function onMessage($connection, $buffer)
|
||||||
|
{
|
||||||
|
// 删除定时器
|
||||||
|
Timer::del($connection->timeout_timerid);
|
||||||
|
$data = @json_decode($buffer, true);
|
||||||
|
if (empty($data['event'])) {
|
||||||
|
$error = "Bad request for Register service. Request info(IP:".$connection->getRemoteIp().", Request Buffer:$buffer). See http://doc2.workerman.net/register-auth-timeout.html";
|
||||||
|
Worker::log($error);
|
||||||
|
return $connection->close($error);
|
||||||
|
}
|
||||||
|
$event = $data['event'];
|
||||||
|
$secret_key = isset($data['secret_key']) ? $data['secret_key'] : '';
|
||||||
|
// 开始验证
|
||||||
|
switch ($event) {
|
||||||
|
// 是 gateway 连接
|
||||||
|
case 'gateway_connect':
|
||||||
|
if (empty($data['address'])) {
|
||||||
|
echo "address not found\n";
|
||||||
|
return $connection->close();
|
||||||
|
}
|
||||||
|
if ($secret_key !== $this->secretKey) {
|
||||||
|
Worker::log("Register: Key does not match ".var_export($secret_key, true)." !== ".var_export($this->secretKey, true));
|
||||||
|
return $connection->close();
|
||||||
|
}
|
||||||
|
$this->_gatewayConnections[$connection->id] = $data['address'];
|
||||||
|
$this->broadcastAddresses();
|
||||||
|
break;
|
||||||
|
// 是 worker 连接
|
||||||
|
case 'worker_connect':
|
||||||
|
if ($secret_key !== $this->secretKey) {
|
||||||
|
Worker::log("Register: Key does not match ".var_export($secret_key, true)." !== ".var_export($this->secretKey, true));
|
||||||
|
return $connection->close();
|
||||||
|
}
|
||||||
|
$this->_workerConnections[$connection->id] = $connection;
|
||||||
|
$this->broadcastAddresses($connection);
|
||||||
|
break;
|
||||||
|
case 'ping':
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
Worker::log("Register unknown event:$event IP: ".$connection->getRemoteIp()." Buffer:$buffer. See http://doc2.workerman.net/register-auth-timeout.html");
|
||||||
|
$connection->close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 连接关闭时
|
||||||
|
*
|
||||||
|
* @param \Workerman\Connection\ConnectionInterface $connection
|
||||||
|
*/
|
||||||
|
public function onClose($connection)
|
||||||
|
{
|
||||||
|
// 删除定时器
|
||||||
|
Timer::del($connection->timeout_timerid);
|
||||||
|
if (isset($this->_gatewayConnections[$connection->id])) {
|
||||||
|
unset($this->_gatewayConnections[$connection->id]);
|
||||||
|
$this->broadcastAddresses();
|
||||||
|
}
|
||||||
|
if (isset($this->_workerConnections[$connection->id])) {
|
||||||
|
unset($this->_workerConnections[$connection->id]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 向 BusinessWorker 广播 gateway 内部通讯地址
|
||||||
|
*
|
||||||
|
* @param \Workerman\Connection\ConnectionInterface $connection
|
||||||
|
*/
|
||||||
|
public function broadcastAddresses($connection = null)
|
||||||
|
{
|
||||||
|
$data = array(
|
||||||
|
'event' => 'broadcast_addresses',
|
||||||
|
'addresses' => array_unique(array_values($this->_gatewayConnections)),
|
||||||
|
);
|
||||||
|
$buffer = json_encode($data);
|
||||||
|
if ($connection) {
|
||||||
|
$connection->send($buffer);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
foreach ($this->_workerConnections as $con) {
|
||||||
|
$con->send($buffer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue
Block a user