Skip to content

Commit 101dda4

Browse files
committed
feat: 为channel监听增加signal监听(实验性功能)
1 parent 2f3363c commit 101dda4

2 files changed

Lines changed: 104 additions & 33 deletions

File tree

src/Future.php

Lines changed: 50 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,14 @@ class Future
1414
public static ?Closure $debugFunc = null;
1515
public static array $debugArgs = [];
1616

17+
/** @var bool 使用信号监听 */
18+
public static bool $useSignal = false;
19+
20+
// todo 因为event等事件循环库是对标准信号的监听,所以不能使用自定实时信号SIGRTMIN ~ SIGRTMAX
21+
// todo 暂时使用SIGPOLL,异步IO监听信号,可能影响异步文件IO相关的触发
22+
/** @var int 监听的信号 */
23+
public static int $signal = \SIGPOLL;
24+
1725
/**
1826
* @var array = [id => func]
1927
*/
@@ -22,10 +30,10 @@ class Future
2230
/**
2331
* @param Closure $func
2432
* @param array $args
25-
* @param float|int|null $interval
33+
* @param float|int $interval
2634
* @return int|false
2735
*/
28-
public static function add(Closure $func, array $args = [], float|int|null $interval = null): int|false
36+
public static function add(Closure $func, array $args = [], float|int $interval = 0): int|false
2937
{
3038
if (self::$debug) {
3139
self::$debugFunc = $func;
@@ -37,14 +45,32 @@ public static function add(Closure $func, array $args = [], float|int|null $inte
3745
throw new Error("Event driver error. ");
3846
}
3947

40-
$interval = Worker::$eventLoopClass === Event::class ? 0 : 0.001;
41-
if ($id = method_exists(Worker::$globalEvent, 'delay')
42-
? Worker::$globalEvent->delay($interval, $func, $args)
43-
: Worker::$globalEvent->add($interval, EventInterface::EV_TIMER, $func, $args)
44-
) {
45-
self::$_futures[$id] = $func;
48+
// 使用信号监听
49+
if (self::$useSignal) {
50+
$id = false;
51+
if (
52+
method_exists(Worker::$globalEvent, 'onSignal')
53+
? Worker::$globalEvent->onSignal(self::$signal, function () use ($func, $args) {
54+
call_user_func($func, $args);
55+
})
56+
: Worker::$globalEvent->add(self::$signal, EventInterface::EV_SIGNAL, $func, $args)
57+
) {
58+
self::$_futures[$id = 0] = $func;
59+
}
60+
}
61+
// 使用定时器轮询
62+
else {
63+
$interval = $interval > 0 ? $interval : (Worker::$eventLoopClass === Event::class ? 0 : 0.001);
64+
if (
65+
$id = method_exists(Worker::$globalEvent, 'delay')
66+
? Worker::$globalEvent->delay($interval, $func, $args)
67+
: Worker::$globalEvent->add($interval, EventInterface::EV_TIMER, $func, $args)
68+
) {
69+
self::$_futures[$id] = $func;
70+
}
4671
}
4772

73+
4874
return $id;
4975
}
5076

@@ -64,21 +90,23 @@ public static function del(int|null $id = null): void
6490
throw new Error("Event driver error. ");
6591
}
6692

67-
if ($id !== null) {
68-
if (method_exists(Worker::$globalEvent, 'offDelay')) {
69-
Worker::$globalEvent->offDelay($id);
70-
} else {
71-
Worker::$globalEvent->del($id, EventInterface::EV_TIMER);
93+
$futures = $id === null ? self::$_futures : [$id => (self::$_futures[$id] ?? null)];
94+
foreach ($futures as $id => $fuc) {
95+
// 使用信号监听
96+
if (self::$useSignal and $id === 0) {
97+
if (method_exists(Worker::$globalEvent, 'offSignal')) {
98+
Worker::$globalEvent->offSignal(self::$signal);
99+
} else {
100+
Worker::$globalEvent->del(self::$signal, EventInterface::EV_SIGNAL);
101+
}
72102
}
73-
unset(self::$_futures[$id]);
74-
return;
75-
}
76-
77-
foreach(self::$_futures as $id => $fuc) {
78-
if (method_exists(Worker::$globalEvent, 'offDelay')) {
79-
Worker::$globalEvent->offDelay($id);
80-
} else {
81-
Worker::$globalEvent->del($id, EventInterface::EV_TIMER);
103+
// 使用定时器轮询
104+
else {
105+
if (method_exists(Worker::$globalEvent, 'offDelay')) {
106+
Worker::$globalEvent->offDelay($id);
107+
} else {
108+
Worker::$globalEvent->del($id, EventInterface::EV_TIMER);
109+
}
82110
}
83111
unset(self::$_futures[$id]);
84112
}

src/Traits/ChannelMethods.php

Lines changed: 54 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,24 @@ trait ChannelMethods
2020
/** @var string 通道前缀 */
2121
protected static string $_CHANNEL = '#Channel#';
2222

23+
/** @var string 通道pid列表 */
24+
protected static string $_CHANNEL_PID_LIST = '#ChannelPidList#';
25+
2326
/**
2427
* @var array = [channelKey => futureId]
2528
*/
2629
protected static array $_listeners = [];
2730

2831
/**
29-
* @var float|int|null
32+
* @var float|int
3033
*/
31-
protected static float|int|null $interval = null;
34+
protected static float|int $interval = 0;
3235

3336
/**
34-
* @param float|int|null $interval
37+
* @param float|int $interval
3538
* @return void
3639
*/
37-
public static function SetChannelListenerInterval(float|int|null $interval): void
40+
public static function SetChannelListenerInterval(float|int $interval): void
3841
{
3942
self::$interval = $interval;
4043
}
@@ -48,6 +51,27 @@ public static function GetChannelKey(string $key): string
4851
return self::$_CHANNEL . $key;
4952
}
5053

54+
/**
55+
* 通道全局开启使用信号监听
56+
*
57+
* @param bool $enable
58+
* @return void
59+
*/
60+
public static function channelUseSignalEnable(bool $enable = true): void
61+
{
62+
Future::$useSignal = $enable;
63+
}
64+
65+
/**
66+
* 通道是否使用信号监听
67+
*
68+
* @return bool
69+
*/
70+
public static function isChannelUseSignal(): bool
71+
{
72+
return Future::$useSignal;
73+
}
74+
5175
/**
5276
* 通道获取
5377
*
@@ -124,6 +148,13 @@ protected static function _ChPublish(string $key, mixed $message, bool $store =
124148
}
125149

126150
self::_Set($channelName, $channel);
151+
// 使用信号监听
152+
if (self::isChannelUseSignal()) {
153+
$list = self::_Get(self::$_CHANNEL_PID_LIST, []);
154+
foreach ($list as $pid) {
155+
@posix_kill($pid, Future::$signal);
156+
}
157+
}
127158
return [
128159
'timestamp' => microtime(true),
129160
'method' => $func,
@@ -156,6 +187,12 @@ protected static function _ChCreateListener(string $key, string|int $workerId, C
156187
self::_Atomic($key, function () use (
157188
$key, $workerId, $func, $params, $listener, &$result
158189
) {
190+
// 信号监听则注册pid
191+
if (self::isChannelUseSignal()) {
192+
$channelPidList = self::_Get(self::$_CHANNEL_PID_LIST, []);
193+
$channelPidList[$pid = posix_getpid()] = $pid;
194+
self::_Set(self::$_CHANNEL_PID_LIST, $channelPidList);
195+
}
159196
/**
160197
* [
161198
* workerId = [
@@ -165,11 +202,7 @@ protected static function _ChCreateListener(string $key, string|int $workerId, C
165202
* ]
166203
*/
167204
$channel = self::_Get($channelName = self::GetChannelKey($key), []);
168-
169-
// 设置回调
170-
$channel[$workerId]['futureId'] =
171-
self::$_listeners[$key] =
172-
$result = Future::add(function () use ($key, $workerId, $listener) {
205+
$callback = function () use ($key, $workerId, $listener) {
173206
// 原子性执行
174207
self::_Atomic($key, function () use ($key, $workerId, $listener) {
175208
$channel = self::_Get($channelName = self::GetChannelKey($key), []);
@@ -182,7 +215,9 @@ protected static function _ChCreateListener(string $key, string|int $workerId, C
182215
}
183216

184217
});
185-
}, interval: self::$interval);
218+
};
219+
// 设置回调
220+
$channel[$workerId]['futureId'] = self::$_listeners[$key] = $result = Future::add($callback, interval: self::$interval);
186221
$channel[$workerId]['value'] = [];
187222
// 如果存在默认数据
188223
if ($default = $channel['--default--']['value'] ?? []) {
@@ -218,7 +253,16 @@ protected static function _ChRemoveListener(string $key, string|int $workerId, b
218253
$key, $workerId, $func, $params, $remove
219254
) {
220255
if ($id = self::$_listeners[$key] ?? null) {
256+
// 移除future
221257
Future::del($id);
258+
// 信号监听则注册pid
259+
if (self::isChannelUseSignal()) {
260+
$channelPidList = self::_Get(self::$_CHANNEL_PID_LIST, []);
261+
if ($channelPidList[$pid = posix_getpid()] ?? null) {
262+
unset($channelPidList[$pid]);
263+
self::_Set(self::$_CHANNEL_PID_LIST, $channelPidList);
264+
}
265+
}
222266
if ($remove) {
223267
/**
224268
* [
@@ -234,7 +278,6 @@ protected static function _ChRemoveListener(string $key, string|int $workerId, b
234278
}
235279
unset(self::$_listeners[$key]);
236280
}
237-
238281
return [
239282
'timestamp' => microtime(true),
240283
'method' => $func,

0 commit comments

Comments
 (0)