Skip to content

Commit 0dab433

Browse files
authored
Add optional keepalive and session timeouts (#23)
1 parent 486be0b commit 0dab433

File tree

5 files changed

+183
-6
lines changed

5 files changed

+183
-6
lines changed

README-CN.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,22 @@ TRIGGER_PASSWORD=password
102102
...
103103
~~~
104104

105+
### 连接超时(建议)
106+
107+
如果你的 MySQL(或其前置代理)会断开长时间空闲的连接(例如 `wait_timeout` / `interactive_timeout` 设置较小),trigger 进程可能会出现类似错误并退出:
108+
109+
`SQLSTATE[HY000] ... 4031 The client was disconnected by the server because of inactivity.`
110+
111+
为提升稳定性,你可以开启 keepalive 心跳查询,及/或为 trigger 的 MySQL 元数据连接设置 session 变量:
112+
113+
~~~env
114+
# 定期 ping MySQL(秒)。设置为 0 表示禁用。
115+
TRIGGER_KEEPALIVE=60
116+
117+
# 在连接建立后应用的 session 变量(逗号分隔 key=value)。
118+
TRIGGER_SESSION_VARIABLES=wait_timeout=7200,interactive_timeout=7200,net_read_timeout=3600,net_write_timeout=3600
119+
~~~
120+
105121
## 启动服务
106122

107123
~~~bash

README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,22 @@ TRIGGER_PASSWORD=password
106106
...
107107
~~~
108108

109+
### Connection Timeouts (Recommended)
110+
111+
If your MySQL server (or a proxy in front of it) disconnects idle clients (low `wait_timeout` / `interactive_timeout`), the trigger process may crash with errors like:
112+
113+
`SQLSTATE[HY000] ... 4031 The client was disconnected by the server because of inactivity.`
114+
115+
To improve stability, enable a keepalive ping and/or set session variables for the trigger's MySQL metadata connection:
116+
117+
~~~env
118+
# Ping MySQL periodically (seconds). Set to 0 to disable.
119+
TRIGGER_KEEPALIVE=60
120+
121+
# Session variables applied on connect (comma-separated key=value pairs).
122+
TRIGGER_SESSION_VARIABLES=wait_timeout=7200,interactive_timeout=7200,net_read_timeout=3600,net_write_timeout=3600
123+
~~~
124+
109125
## Usage
110126

111127
Start the trigger service to begin listening for MySQL events:

config/trigger.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,17 @@
2525
'tables' => env('TRIGGER_TABLES', '') ? explode(',', env('TRIGGER_TABLES')) : [],
2626

2727
'heartbeat' => (int) env('TRIGGER_HEARTBEAT', 3),
28+
29+
// Periodically ping the MySQL metadata connection to avoid server-side idle disconnects.
30+
// Set to 0 to disable.
31+
'keepalive' => (int) env('TRIGGER_KEEPALIVE', 0),
32+
33+
// MySQL session variables to apply on connect (for the metadata connection).
34+
// Example:
35+
// - wait_timeout=7200,interactive_timeout=7200
36+
'session_variables' => env('TRIGGER_SESSION_VARIABLES', '')
37+
? array_filter(array_map('trim', explode(',', (string) env('TRIGGER_SESSION_VARIABLES'))))
38+
: [],
2839
'subscribers' => [
2940
// Huangdijia\Trigger\Subscribers\Heartbeat::class,
3041
],

src/Console/StartCommand.php

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,12 @@
1111

1212
namespace Huangdijia\Trigger\Console;
1313

14+
use Doctrine\DBAL\Exception as DbalException;
1415
use Huangdijia\Trigger\Facades\Trigger;
1516
use Illuminate\Console\Command;
1617
use MySQLReplication\Exception\MySQLReplicationException;
18+
use PDOException;
19+
use Throwable;
1720

1821
class StartCommand extends Command
1922
{
@@ -91,6 +94,18 @@ public function handle(): int
9194
$this->info('Retry now');
9295
sleep(1);
9396

97+
goto start;
98+
} catch (DbalException|PDOException $e) {
99+
$this->error($e->getMessage());
100+
101+
if (! $this->shouldRetry($e)) {
102+
throw $e;
103+
}
104+
105+
// Keep current binlog position so we can resume after reconnect.
106+
$this->info('Retry now');
107+
sleep(1);
108+
94109
goto start;
95110
}
96111

@@ -123,4 +138,32 @@ protected function listenForSignals(): void
123138
pcntl_signal(SIGTERM, $handler);
124139
pcntl_signal(SIGINT, $handler);
125140
}
141+
142+
private function shouldRetry(Throwable $e): bool
143+
{
144+
$pdo = $e instanceof PDOException ? $e : null;
145+
146+
if ($pdo === null && $e->getPrevious() instanceof PDOException) {
147+
$pdo = $e->getPrevious();
148+
}
149+
150+
if ($pdo !== null && isset($pdo->errorInfo[1])) {
151+
$driverCode = (int) $pdo->errorInfo[1];
152+
153+
return in_array($driverCode, [2006, 2013, 2055, 4031], true);
154+
}
155+
156+
$message = strtolower($e->getMessage());
157+
158+
if (str_contains($message, 'access denied') || str_contains($message, 'unknown database')) {
159+
return false;
160+
}
161+
162+
return str_contains($message, 'server has gone away')
163+
|| str_contains($message, 'lost connection')
164+
|| str_contains($message, 'disconnected by the server')
165+
|| str_contains($message, 'connection refused')
166+
|| str_contains($message, 'connection timed out')
167+
|| str_contains($message, 'broken pipe');
168+
}
126169
}

src/Trigger.php

Lines changed: 97 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Huangdijia\Trigger;
1313

1414
use Closure;
15+
use Doctrine\DBAL\Connection;
1516
use Exception;
1617
use Illuminate\Container\Container;
1718
use Illuminate\Contracts\Queue\ShouldQueue;
@@ -31,6 +32,10 @@ class Trigger
3132
{
3233
protected \Illuminate\Contracts\Cache\Repository $cache;
3334

35+
protected ?Connection $dbConnection = null;
36+
37+
protected int $lastKeepalivePingAt = 0;
38+
3439
protected array $events = [];
3540

3641
protected int $bootTime;
@@ -137,12 +142,17 @@ public function loadRoutes(): void
137142
*/
138143
public function start(bool $keepUp = true): void
139144
{
140-
tap(new MySQLReplicationFactory($this->configure($keepUp)), function (MySQLReplicationFactory $binLogStream) {
141-
collect($this->getSubscribers())
142-
->reject(fn ($subscriber) => ! is_subclass_of($subscriber, EventSubscriber::class))
143-
->unique()
144-
->each(fn ($subscriber) => $binLogStream->registerSubscriber(new $subscriber($this)));
145-
})->run();
145+
$binLogStream = new MySQLReplicationFactory($this->configure($keepUp));
146+
147+
$this->dbConnection = $binLogStream->getDbConnection();
148+
$this->applySessionVariables($this->dbConnection);
149+
150+
collect($this->getSubscribers())
151+
->reject(fn ($subscriber) => ! is_subclass_of($subscriber, EventSubscriber::class))
152+
->unique()
153+
->each(fn ($subscriber) => $binLogStream->registerSubscriber(new $subscriber($this)));
154+
155+
$binLogStream->run();
146156
}
147157

148158
/**
@@ -183,6 +193,8 @@ public function isTerminated(): bool
183193
public function heartbeat(EventDTO $event): void
184194
{
185195
$this->rememberCurrent($event->getEventInfo()->binLogCurrent);
196+
197+
$this->keepalive();
186198
}
187199

188200
/**
@@ -344,6 +356,85 @@ public function getTables(): array
344356
return $tables;
345357
}
346358

359+
private function applySessionVariables(?Connection $connection): void
360+
{
361+
if ($connection === null) {
362+
return;
363+
}
364+
365+
$variables = $this->getConfig('session_variables', []);
366+
367+
if (is_string($variables)) {
368+
$variables = array_filter(array_map('trim', explode(',', $variables)));
369+
}
370+
371+
if (! is_array($variables) || $variables === []) {
372+
return;
373+
}
374+
375+
foreach ($variables as $name => $value) {
376+
if (is_int($name)) {
377+
$pair = trim((string) $value);
378+
379+
if ($pair === '' || ! str_contains($pair, '=')) {
380+
continue;
381+
}
382+
383+
[$name, $value] = array_map('trim', explode('=', $pair, 2));
384+
}
385+
386+
if (! is_string($name) || $name === '' || ! preg_match('/^[A-Za-z0-9_]+$/', $name)) {
387+
continue;
388+
}
389+
390+
if (is_array($value) || is_object($value)) {
391+
continue;
392+
}
393+
394+
if (is_string($value) && preg_match('/^-?\d+$/', $value)) {
395+
$value = (int) $value;
396+
}
397+
398+
try {
399+
$connection->executeStatement("SET SESSION {$name} = ?", [$value]);
400+
} catch (Throwable) {
401+
// Ignore session variable failures (permission/unsupported variables).
402+
}
403+
}
404+
}
405+
406+
private function keepalive(): void
407+
{
408+
$period = (int) $this->getConfig('keepalive', 0);
409+
410+
if ($period <= 0 || $this->dbConnection === null) {
411+
return;
412+
}
413+
414+
$now = time();
415+
416+
if ($this->lastKeepalivePingAt > 0 && ($now - $this->lastKeepalivePingAt) < $period) {
417+
return;
418+
}
419+
420+
$this->lastKeepalivePingAt = $now;
421+
422+
try {
423+
$this->dbConnection->executeQuery($this->dbConnection->getDatabasePlatform()->getDummySelectSQL());
424+
} catch (Throwable) {
425+
try {
426+
$this->dbConnection->close();
427+
428+
// DBAL will reconnect automatically on the next query.
429+
$this->dbConnection->executeQuery($this->dbConnection->getDatabasePlatform()->getDummySelectSQL());
430+
431+
$this->applySessionVariables($this->dbConnection);
432+
} catch (Throwable) {
433+
// If reconnect fails, the next metadata query will throw and the daemon can restart.
434+
}
435+
}
436+
}
437+
347438
/**
348439
* Parse action.
349440
*

0 commit comments

Comments
 (0)