Skip to content

Commit 5f2414c

Browse files
committed
feat: 增加分布式服务注册支持
1 parent 6a5a8bf commit 5f2414c

5 files changed

Lines changed: 62 additions & 13 deletions

File tree

README.md

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,13 @@
4646
┌─────────────┐ 2 | 3
4747
┌───> | Push-server | ─── ─ · ─
4848
| └─────────────┘ 1 | 4 ··· n
49-
| Hash |
49+
| Hash | register
5050
| | PUB | SUB
5151
┌────────────────────┐ ──┘ ┌──────────────┐ <────┘
5252
| webman-push-server | ──────> | Redis-server |
5353
└────────────────────┘ ──┐ └──────────────┘ <────┐
5454
| | PUB | SUB
55-
| Hash |
55+
| Hash | register
5656
| ┌────────────┐ 2 | 3
5757
└────> | API-server | ─── ─ · ─
5858
└────────────┘ 1 | 4 ··· n
@@ -77,6 +77,7 @@
7777
|-- process.php # 启动进程
7878
|-- redis.php # redis配置
7979
|-- route.php # APIs路由信息
80+
|-- registrar.php # 分布式服务注册器配置
8081
```
8182

8283
### 频道说明
@@ -150,8 +151,6 @@ composer require workbunny/webman-push-server
150151
| POST | /apps/[app_id]/users/[user_id]/terminate_connections | [对应的pusher文档地址](https://pusher.com/docs/channels/library_auth_reference/rest-api/#post-terminate-user-connections) |
151152
| GET | /apps/[app_id]/channels/[channel_name]/users | [对应的pusher文档地址](https://pusher.com/docs/channels/library_auth_reference/rest-api/#get-users) |
152153

153-
154-
155154
## 客户端
156155

157156
### javascript客户端

src/Registrars/RedisRegistrar.php

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class RedisRegistrar implements RegistrarInterface
1414
/** @inheritDoc */
1515
public function get(string $name, string $ip, int $port): ?array
1616
{
17-
$client = Redis::connection('plugin.workbunny.webman-push-server.server-register')->client();
17+
$client = Redis::connection('plugin.workbunny.webman-push-server.server-registrar')->client();
1818
try {
1919
$result = $client->hGetAll($this->_registrarKey($name, $ip, $port));
2020
foreach ($result as &$value) {
@@ -36,7 +36,7 @@ public function get(string $name, string $ip, int $port): ?array
3636
/** @inheritDoc */
3737
public function query(?string $name): ?array
3838
{
39-
$client = Redis::connection('plugin.workbunny.webman-push-server.server-register')->client();
39+
$client = Redis::connection('plugin.workbunny.webman-push-server.server-registrar')->client();
4040
$hash = [];
4141
try {
4242
while(
@@ -68,8 +68,12 @@ public function register(string $name, string $ip, int $port, string|null $worke
6868
{
6969
$workerId = $workerId ?: '';
7070
try {
71-
$client = Redis::connection('plugin.workbunny.webman-push-server.server-register')->client();
72-
$res = $client->hSet($key = $this->_registrarKey($name, $ip, $port), $workerId, json_encode($metadata, JSON_UNESCAPED_UNICODE));
71+
$client = Redis::connection('plugin.workbunny.webman-push-server.server-registrar')->client();
72+
$res = $client->hSet(
73+
$key = $this->_registrarKey($name, $ip, $port),
74+
$workerId,
75+
$metadata ? json_encode($metadata, JSON_UNESCAPED_UNICODE) : '{}'
76+
);
7377
// 如果存在定时间隔,则存在定时上报,则开启键值过期
7478
if ($interval = config('plugin.workbunny.webman-push-server.registrar.interval')) {
7579
$client->expire($key, $interval * 1.5);
@@ -92,8 +96,12 @@ public function report(string $name, string $ip, int $port, string|null $workerI
9296
{
9397
$workerId = $workerId ?: '';
9498
try {
95-
$client = Redis::connection('plugin.workbunny.webman-push-server.server-register')->client();
96-
$res = $client->hSet($key = $this->_registrarKey($name, $ip, $port), $workerId, json_encode($metadata, JSON_UNESCAPED_UNICODE));
99+
$client = Redis::connection('plugin.workbunny.webman-push-server.server-registrar')->client();
100+
$res = $client->hSet(
101+
$key = $this->_registrarKey($name, $ip, $port),
102+
$workerId,
103+
$metadata ? json_encode($metadata, JSON_UNESCAPED_UNICODE) : '{}'
104+
);
97105
// 如果存在定时间隔,则存在定时上报,则开启键值过期
98106
if ($interval = config('plugin.workbunny.webman-push-server.registrar.interval')) {
99107
$client->expire($key, $interval * 1.5);
@@ -116,7 +124,7 @@ public function unregister(string $name, string $ip, int $port, string|null $wor
116124
{
117125
$workerId = $workerId ?: '';
118126
try {
119-
$client = Redis::connection('plugin.workbunny.webman-push-server.server-register')->client();
127+
$client = Redis::connection('plugin.workbunny.webman-push-server.server-registrar')->client();
120128
return boolval(
121129
$client->hDel($this->_registrarKey($name, $ip, $port), $workerId)
122130
);

src/Traits/RegistrarMethods.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public function registrarStart(Worker $worker): void
4343
$port = $this->registrarGetHostPort()
4444
) {
4545
// 注册
46-
$registrar->register($serverName, $ip, $port, $id = (string)$worker->id);
46+
$registrar->register($serverName, $ip, $port, $id = ($worker->id === 0 ? 'master' : strval($worker->id)));
4747
// 定时上报
4848
if ($interval = config('plugin.workbunny.webman-push-server.registrar.interval')) {
4949
$this->registrarTimerId = Timer::add($interval, function () use ($registrar, $serverName, $ip, $port, $id) {
@@ -70,7 +70,7 @@ public function registrarStop(Worker $worker): void
7070
$ip = $this->registrarGetHostIp() and
7171
$port = $this->registrarGetHostPort()
7272
) {
73-
$registrar->unregister($serverName, $ip, $port, (string)$worker->id);
73+
$registrar->unregister($serverName, $ip, $port, ($worker->id === 0 ? 'master' : strval($worker->id)));
7474
}
7575
}
7676

tests/ApiServerBaseTest.php

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@
1313

1414
namespace Tests;
1515

16+
use support\Redis;
1617
use Tests\MockClass\MockTcpConnection;
1718
use Webman\Http\Request;
1819
use Webman\Http\Response;
1920
use Workbunny\WebmanPushServer\ApiRoute;
21+
use Workerman\Worker;
2022

2123
/**
2224
* @runTestsInSeparateProcesses
@@ -131,4 +133,23 @@ public function testApiServerOnMessageWithArrayData()
131133
$this->assertEquals(400, $mockConnection->getSendBuffer()->getStatusCode());
132134
$this->assertEquals('application/json', $mockConnection->getSendBuffer()->getHeader('Content-Type'));
133135
}
136+
137+
public function testApiServerRegistrar()
138+
{
139+
$client = Redis::connection('plugin.workbunny.webman-push-server.server-registrar')->client();
140+
141+
$this->assertEquals([], $client->keys("registrar:*"));
142+
143+
$this->getPushServer()->registrarStart($worker = new Worker());
144+
145+
$this->assertNotEquals([], $keys = $client->keys("registrar:*"));
146+
$this->assertEquals([
147+
'master' => '{}'
148+
], $client->hGetAll($key = $keys[0]));
149+
150+
$this->getPushServer()->registrarStop($worker);
151+
152+
$this->assertEquals(false, $client->exists($key));
153+
$this->assertEquals([], $client->keys("registrar:*"));
154+
}
134155
}

tests/PushServerBaseTest.php

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@
1313

1414
namespace Tests;
1515

16+
use support\Redis;
1617
use Tests\MockClass\MockTcpConnection;
1718
use Workbunny\WebmanPushServer\Events\Ping;
1819
use Workbunny\WebmanPushServer\Events\Subscribe;
1920
use Workbunny\WebmanPushServer\PublishTypes\AbstractPublishType;
2021
use Workbunny\WebmanPushServer\PushServer;
22+
use Workerman\Worker;
2123
use const Workbunny\WebmanPushServer\EVENT_CONNECTION_ESTABLISHED;
2224
use const Workbunny\WebmanPushServer\EVENT_ERROR;
2325
use const Workbunny\WebmanPushServer\EVENT_PONG;
@@ -485,4 +487,23 @@ public function testPushServerSubscribeResponse()
485487
$wsConnection->setSendBuffer(null);
486488
}
487489

490+
public function testPushServerRegistrar()
491+
{
492+
$client = Redis::connection('plugin.workbunny.webman-push-server.server-registrar')->client();
493+
494+
$this->assertEquals([], $client->keys("registrar:*"));
495+
496+
$this->getPushServer()->registrarStart($worker = new Worker());
497+
498+
$this->assertNotEquals([], $keys = $client->keys("registrar:*"));
499+
$this->assertEquals([
500+
'master' => '{}'
501+
], $client->hGetAll($key = $keys[0]));
502+
503+
$this->getPushServer()->registrarStop($worker);
504+
505+
$this->assertEquals(false, $client->exists($key));
506+
$this->assertEquals([], $client->keys("registrar:*"));
507+
}
508+
488509
}

0 commit comments

Comments
 (0)