Skip to content

Commit c60f13b

Browse files
committed
fix(instance): 优化实例心跳机制及IP获取逻辑
- 修复 Client 魔术方法对属性名的类型校验,提升异常准确性 - 新增 get_local_ip 函数,优先通过本地DNS解析获取非回环IP,失败则调用公网服务 - InstanceRegistrarProcess 支持获取本机IP作为实例IP,兼容容器和云环境 - 修改实例心跳逻辑,仅对临时实例发送心跳,默认 ephemeral 为 true - 增加实例心跳失败计数,连续失败超过配置次数后重启进程,避免雪崩效应 - 兼容 Guzzle 请求参数映射,支持 FORM_PARAMS、JSON、BODY 等多种格式 - 多处 provider 请求改用异步事件循环版本,提升异步处理能力 - 配置新增实例心跳失败最大次数参数,默认值为3 - 更新配置监听器中 config_path 字段名称,统一配置文件路径标识
1 parent f3736ec commit c60f13b

12 files changed

Lines changed: 164 additions & 60 deletions

src/Client.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public function getName(): ?string
129129
*/
130130
public function __get($name)
131131
{
132-
if (!isset($name) || !isset($this->alias[$name])) {
132+
if (!is_string($name) || $name === '' || !isset($this->alias[$name])) {
133133
throw new NacosException("{$name} is invalid.");
134134
}
135135

src/Process/AbstractProcess.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ protected function _stop(int|float $sleep = 0): void
4646
if ($sleep > 0) {
4747
Timer::add($sleep, function () {
4848
Worker::stopAll();
49-
});
49+
}, [], false);
5050

5151
return;
5252
}

src/Process/ConfigListenerProcess.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,10 @@ public function onWorkerStart(Worker $worker)
8282
Timer::add(0.0, (float) $this->longPullingInterval, function () {
8383
$promises = [];
8484
foreach ($this->configListeners as $listener) {
85-
list($dataId, $group, $tenant, $configPath) = $listener;
85+
$dataId = $listener['data_id'];
86+
$group = $listener['group_name'];
87+
$tenant = $listener['namespace_id'];
88+
$configPath = $listener['config_path'] ?? $listener['file_path'] ?? null;
8689
if (file_exists($configPath)) {
8790
$promises[] = $this->client->config->listenerAsync(
8891
$dataId,

src/Process/InstanceRegistrarProcess.php

Lines changed: 108 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use GuzzleHttp\Exception\GuzzleException;
88
use GuzzleHttp\Promise\Utils;
99
use Psr\Http\Message\ResponseInterface;
10+
use function Workbunny\WebmanNacos\get_local_ip;
1011
use Workerman\Timer;
1112
use Workerman\Worker;
1213

@@ -26,15 +27,28 @@ class InstanceRegistrarProcess extends AbstractProcess
2627
*/
2728
protected array $heartbeatTimers = [];
2829

30+
/**
31+
* 每个实例的连续心跳失败计数
32+
* @var array<string,int>
33+
*/
34+
protected array $heartbeatFailCount = [];
35+
2936
/**
3037
* @var float
3138
*/
3239
protected float $heartbeat;
3340

41+
/**
42+
* 允许的最大连续心跳失败次数,超过才重启进程
43+
* @var int
44+
*/
45+
protected int $heartbeatFailMax;
46+
3447
public function __construct()
3548
{
3649
parent::__construct();
3750
$this->heartbeat = (float) config('plugin.workbunny.webman-nacos.app.instance_heartbeat', 5.0);
51+
$this->heartbeatFailMax = (int) config('plugin.workbunny.webman-nacos.app.instance_heartbeat_fail_max', 3);
3852
}
3953

4054
/**
@@ -44,50 +58,97 @@ public function __construct()
4458
*/
4559
protected function _heartbeat(string $name): void
4660
{
47-
if (isset($this->instanceRegistrars[$name])) {
48-
list($serviceName, $ip, $port, $option) = $this->instanceRegistrars[$name];
49-
if (isset($option['ephemeral'])) {
50-
$option['ephemeral'] = (is_string($option['ephemeral']) ? filter_var($option['ephemeral'], FILTER_VALIDATE_BOOLEAN, FILTER_NULL_ON_FAILURE) : (bool) $option['ephemeral']);
61+
if (!isset($this->instanceRegistrars[$name])) {
62+
return;
63+
}
64+
$instanceRegistrar = $this->instanceRegistrars[$name];
65+
$serviceName = $instanceRegistrar['service_name'];
66+
$ip = $instanceRegistrar['pod_ip'] ?: get_local_ip();
67+
$port = $instanceRegistrar['pod_port'];
68+
$option = $instanceRegistrar['options'] ?? [];
69+
70+
// 关键修复:Nacos OpenAPI 的 ephemeral 默认值为 true(临时实例)
71+
// 当用户未显式指定时必须按 true 处理,否则会导致临时实例不发心跳而被 Nacos 摘除
72+
if (array_key_exists('ephemeral', $option)) {
73+
$ephemeral = is_string($option['ephemeral'])
74+
? filter_var($option['ephemeral'], FILTER_VALIDATE_BOOLEAN)
75+
: (bool) $option['ephemeral'];
76+
} else {
77+
$ephemeral = true;
78+
}
79+
$option['ephemeral'] = $ephemeral;
80+
81+
// 仅对临时实例进行心跳(永久实例由服务端健康检查维持)
82+
if (!$ephemeral) {
83+
return;
84+
}
85+
86+
$this->heartbeatFailCount[$name] = 0;
87+
$this->heartbeatTimers[$name] = Timer::add($this->heartbeat, function () use ($name, $serviceName, $ip, $port, $option) {
88+
if ($this->is_stopping) {
89+
return;
5190
}
52-
// 仅对非永久实例进行心跳
53-
if ($option['ephemeral'] ?? false) {
54-
$this->heartbeatTimers[$name] = Timer::add($this->heartbeat, function () use ($name, $serviceName, $ip, $port, $option) {
55-
if ($this->is_stopping) {
56-
return;
57-
}
58-
try {
59-
if (!$this->client->instance->beat(
60-
$serviceName,
61-
array_filter([
62-
'ip' => $ip,
63-
'port' => $port,
64-
'serviceName' => $serviceName,
65-
] + $option, fn ($value) => $value !== null),
66-
$option['groupName'] ?? null,
67-
$option['namespaceId'] ?? null,
68-
$option['ephemeral'] ?? null,
69-
false,
70-
$this->heartbeat
71-
)) {
72-
$this->logger()->error(
73-
"Nacos instance heartbeat failed: [0] {$this->client->instance->getMessage()}.",
74-
['name' => $name, 'trace' => []]
75-
);
76-
$this->_stop($this->retry_interval);
91+
// beat JSON 只保留 Nacos 识别的标准字段,避免非标字段污染
92+
$beatData = array_filter([
93+
'ip' => $ip,
94+
'port' => $port,
95+
'serviceName' => $serviceName,
96+
'cluster' => $option['clusterName'] ?? ($option['cluster'] ?? null),
97+
'weight' => $option['weight'] ?? null,
98+
'metadata' => $option['metadata'] ?? null,
99+
'scheduled' => $option['scheduled'] ?? null,
100+
], fn ($value) => $value !== null);
77101

78-
return;
79-
}
80-
} catch (GuzzleException $exception) {
81-
$this->logger()->error(
82-
"Nacos instance heartbeat failed: [{$exception->getCode()}] {$exception->getMessage()}.",
83-
['name' => $name, 'trace' => $exception->getTrace()]
84-
);
85-
$this->_stop($this->retry_interval);
86-
87-
return;
88-
}
89-
});
102+
try {
103+
$result = $this->client->instance->beat(
104+
$serviceName,
105+
$beatData,
106+
$option['groupName'] ?? null,
107+
$option['namespaceId'] ?? null,
108+
$option['ephemeral'] ?? null,
109+
false,
110+
$this->heartbeat
111+
);
112+
if ($result === false) {
113+
$this->_onHeartbeatFail(
114+
$name,
115+
"Nacos instance heartbeat failed: [0] {$this->client->instance->getMessage()}.",
116+
[]
117+
);
118+
119+
return;
120+
}
121+
// 心跳成功,重置失败计数
122+
$this->heartbeatFailCount[$name] = 0;
123+
} catch (\Throwable $exception) {
124+
$this->_onHeartbeatFail(
125+
$name,
126+
"Nacos instance heartbeat failed: [{$exception->getCode()}] {$exception->getMessage()}.",
127+
$exception->getTrace()
128+
);
90129
}
130+
});
131+
}
132+
133+
/**
134+
* 心跳失败处理:累计 N 次失败才重启进程,避免偶发抖动导致雪崩
135+
* @param string $name
136+
* @param string $message
137+
* @param array $trace
138+
* @return void
139+
*/
140+
protected function _onHeartbeatFail(string $name, string $message, array $trace = []): void
141+
{
142+
$this->heartbeatFailCount[$name] = ($this->heartbeatFailCount[$name] ?? 0) + 1;
143+
$count = $this->heartbeatFailCount[$name];
144+
$this->logger()->error($message, [
145+
'name' => $name,
146+
'fail_count'=> $count,
147+
'max' => $this->heartbeatFailMax,
148+
'trace' => $trace,
149+
]);
150+
if ($count >= $this->heartbeatFailMax) {
151+
$this->_stop($this->retry_interval);
91152
}
92153
}
93154

@@ -103,7 +164,7 @@ public function onWorkerStart(Worker $worker)
103164
foreach ($instanceRegistrars as $name => $instanceRegistrar) {
104165
// 拆解配置
105166
$serviceName = $instanceRegistrar['service_name'];
106-
$ip = $instanceRegistrar['pod_ip'];
167+
$ip = $instanceRegistrar['pod_ip'] ?: get_local_ip();
107168
$port = $instanceRegistrar['pod_port'];
108169
$option = $instanceRegistrar['options'] ?? [];
109170
// 注册
@@ -153,18 +214,19 @@ public function onWorkerStop(Worker $worker)
153214
}
154215
// 拆解配置
155216
$serviceName = $instanceRegistrar['service_name'];
156-
$ip = $instanceRegistrar['pod_ip'];
217+
$ip = $instanceRegistrar['pod_ip'] ?: get_local_ip();
157218
$port = $instanceRegistrar['pod_port'];
158219
$option = $instanceRegistrar['options'] ?? [];
159-
// 注销实例
220+
// 注销实例(groupName 未配置时使用 Nacos 默认的 DEFAULT_GROUP,避免 null 触发 TypeError)
160221
if (!$this->client->instance->delete(
161222
$serviceName,
162-
$option['groupName'] ?? null,
223+
(string) ($option['groupName'] ?? 'DEFAULT_GROUP'),
163224
$ip,
164225
$port,
165226
[
166227
'namespaceId' => $option['namespaceId'] ?? null,
167228
'ephemeral' => $option['ephemeral'] ?? null,
229+
'clusterName' => $option['clusterName'] ?? null,
168230
]
169231
)) {
170232
$this->logger()->error(
@@ -173,7 +235,7 @@ public function onWorkerStop(Worker $worker)
173235
);
174236
}
175237
}
176-
} catch (GuzzleException $exception) {
238+
} catch (\Throwable $exception) {
177239
$this->logger()->error(
178240
"Nacos instance delete failed: [{$exception->getCode()}] {$exception->getMessage()}.",
179241
['name' => '#base', 'trace' => $exception->getTrace()]

src/Provider/AbstractProvider.php

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,13 +217,25 @@ public function requestAsyncUseEventLoop(string $method, string $uri, array $opt
217217
$headers = array_merge($options[RequestOptions::HEADERS] ?? [], [
218218
'Connection' => 'keep-alive',
219219
]);
220+
// 兼容 Guzzle 的 FORM_PARAMS / JSON / BODY,映射到 workerman/http-client 的 data
221+
$data = [];
222+
if (isset($options[RequestOptions::FORM_PARAMS])) {
223+
$data = $options[RequestOptions::FORM_PARAMS];
224+
} elseif (isset($options[RequestOptions::JSON])) {
225+
$data = json_encode($options[RequestOptions::JSON]);
226+
$headers['Content-Type'] = 'application/json';
227+
} elseif (isset($options[RequestOptions::BODY])) {
228+
$data = $options[RequestOptions::BODY];
229+
} elseif (isset($options['data'])) {
230+
$data = $options['data'];
231+
}
220232
$this->httpClientAsync()->request(
221233
sprintf('http://%s:%d/%s?%s', $this->host, $this->port, $uri, $queryString),
222234
[
223235
'method' => $method,
224236
'version' => '1.1',
225237
'headers' => $headers,
226-
'data' => $options['data'] ?? [],
238+
'data' => $data,
227239
'success' => $options['success'] ?? function (Response $response) {
228240
},
229241
'error' => $options['error'] ?? function (\Exception $exception) {

src/Provider/ConfigProvider.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,7 @@ public function listenerAsyncUseEventLoop(array $options, ?callable $success = n
329329
($options['group'] ?? null) . self::WORD_SEPARATOR .
330330
($options['contentMD5'] ?? null) . self::WORD_SEPARATOR .
331331
($options['tenant'] ?? null) . self::LINE_SEPARATOR;
332+
$timeout = $options['timeout'] ?? null;
332333

333334
return $this->requestAsyncUseEventLoop(self::LISTENER_METHOD, self::LISTENER_URL, [
334335
RequestOptions::QUERY => [

src/Provider/InstanceProvider.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ public function detail(string $ip, int $port, string $serviceName, array $option
420420
*/
421421
public function detailAsync(string $ip, int $port, string $serviceName, array $optional = [])
422422
{
423-
return $this->request(self::DETAIL_METHOD, self::DETAIL_URL, [
423+
return $this->requestAsync(self::DETAIL_METHOD, self::DETAIL_URL, [
424424
RequestOptions::QUERY => $this->filter(array_merge($optional, [
425425
'ip' => $ip,
426426
'port' => $port,
@@ -665,7 +665,7 @@ public function beatAsyncUseEventLoop(array $options, bool $lightBeatEnabled = f
665665
['timeout', 'is_float', false],
666666
]);
667667

668-
return $this->requestAsync(self::BEAT_METHOD, self::BEAT_URL, [
668+
return $this->requestAsyncUseEventLoop(self::BEAT_METHOD, self::BEAT_URL, [
669669
RequestOptions::QUERY => $this->filter([
670670
'serviceName' => $options['serviceName'] ?? null,
671671
'ip' => $options['beat']['ip'] ?? null,

src/Provider/OperatorProvider.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ public function getLeaderAsync()
237237
*/
238238
public function getLeaderAsyncUseEventLoop(?callable $success = null, ?callable $error = null): bool
239239
{
240-
return $this->requestAsync(self::GET_LEADER_METHOD, self::GET_LEADER_URL, [
240+
return $this->requestAsyncUseEventLoop(self::GET_LEADER_METHOD, self::GET_LEADER_URL, [
241241
OPTIONS_SUCCESS => $success,
242242
OPTIONS_ERROR => $error,
243243
]);

src/Provider/ServiceProvider.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ public function deleteAsyncUseEventLoop(array $options, ?callable $success = nul
157157
['namespaceId', 'is_string', false],
158158
]);
159159

160-
return $this->requestAsync(self::DELETE_METHOD, self::DELETE_URL, [
160+
return $this->requestAsyncUseEventLoop(self::DELETE_METHOD, self::DELETE_URL, [
161161
RequestOptions::QUERY => $this->filter([
162162
'serviceName' => $options['serviceName'] ?? null,
163163
'groupName' => $options['groupName'] ?? null,
@@ -226,7 +226,7 @@ public function updateAsync(string $serviceName, array $optional = [])
226226
*/
227227
public function updateAsyncUseEventLoop(string $serviceName, array $optional = [], ?callable $success = null, ?callable $error = null): bool
228228
{
229-
return $this->requestAsync(self::UPDATE_METHOD, self::UPDATE_URL, [
229+
return $this->requestAsyncUseEventLoop(self::UPDATE_METHOD, self::UPDATE_URL, [
230230
RequestOptions::QUERY => $this->filter(array_merge($optional, [
231231
'serviceName' => $serviceName,
232232
])),
@@ -290,7 +290,7 @@ public function getAsyncUseEventLoop(array $options, ?callable $success = null,
290290
['namespaceId', 'is_string', false],
291291
]);
292292

293-
return $this->requestAsync(self::GET_METHOD, self::GET_URL, [
293+
return $this->requestAsyncUseEventLoop(self::GET_METHOD, self::GET_URL, [
294294
RequestOptions::QUERY => $this->filter([
295295
'serviceName' => $options['serviceName'] ?? null,
296296
'groupName' => $options['groupName'] ?? null,
@@ -362,7 +362,7 @@ public function listAsyncUseEventLoop(array $options, ?callable $success = null,
362362
['namespaceId', 'is_string', false],
363363
]);
364364

365-
return $this->requestAsync(self::LIST_METHOD, self::LIST_URL, [
365+
return $this->requestAsyncUseEventLoop(self::LIST_METHOD, self::LIST_URL, [
366366
RequestOptions::QUERY => $this->filter([
367367
'pageNo' => $options['pageNo'] ?? null,
368368
'pageSize' => $options['pageSize'] ?? null,

src/config/plugin/workbunny/webman-nacos/app.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
'long_pulling_interval' => 30,
1111
/* float 实例心跳间隔 秒 */
1212
'instance_heartbeat' => 5.0,
13+
/* int 实例心跳连续失败达该次数后才重启进程,避免偶发抖动导致服务摘除 */
14+
'instance_heartbeat_fail_max' => 3,
1315
/* int 进程重试间隔 秒 */
1416
'process_retry_interval' => 5,
1517
/* 日志 channel,为 null 时使用默认通道 */

0 commit comments

Comments
 (0)