Skip to content

Commit 819ad65

Browse files
committed
feat: add hasIncomingMessage and ensureCoroutine
1 parent 629e536 commit 819ad65

File tree

3 files changed

+198
-124
lines changed

3 files changed

+198
-124
lines changed

src/WebSocket/Client.php

+142-81
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
use Swoole\Coroutine\Http\Client as SwooleClient;
66
use Swoole\WebSocket\Frame;
7+
use Swoole\Coroutine;
8+
9+
use function Swoole\Coroutine\run as Co;
710

811
class Client
912
{
@@ -50,62 +53,95 @@ public function __construct(string $url, array $options = [])
5053
$this->timeout = $options['timeout'] ?? 30;
5154
}
5255

53-
public function connect(): void
56+
/**
57+
* Ensures code runs in a coroutine context
58+
* @param callable $callback Function to run in coroutine
59+
* @return mixed Result of the callback
60+
* @throws \Throwable If the callback throws an exception
61+
*/
62+
private function ensureCoroutine(callable $callback): mixed
5463
{
55-
$this->client = new SwooleClient($this->host, $this->port, $this->port === 443);
56-
$this->client->set([
57-
'timeout' => $this->timeout,
58-
'websocket_compression' => true,
59-
'max_frame_size' => 32 * 1024 * 1024, // 32MB max frame size
60-
]);
61-
62-
if (!empty($this->headers)) {
63-
$this->client->setHeaders($this->headers);
64-
}
64+
if (Coroutine::getCid() === -1) {
65+
$result = null;
66+
$exception = null;
67+
68+
Co(function () use ($callback, &$result, &$exception) {
69+
try {
70+
$result = $callback();
71+
} catch (\Throwable $e) {
72+
$exception = $e;
73+
}
74+
});
6575

66-
$success = $this->client->upgrade($this->path);
76+
if ($exception !== null) {
77+
throw $exception;
78+
}
6779

68-
if (!$success) {
69-
$error = new \RuntimeException(
70-
"WebSocket connection failed: {$this->client->errCode} - {$this->client->errMsg}"
71-
);
72-
$this->emit('error', $error);
73-
throw $error;
80+
return $result;
7481
}
82+
return $callback();
83+
}
7584

76-
$this->connected = true;
77-
$this->emit('open');
85+
public function connect(): void
86+
{
87+
$this->ensureCoroutine(function () {
88+
$this->client = new SwooleClient($this->host, $this->port, $this->port === 443);
89+
$this->client->set([
90+
'timeout' => $this->timeout,
91+
'websocket_compression' => true,
92+
'max_frame_size' => 32 * 1024 * 1024, // 32MB max frame size
93+
]);
94+
95+
if (!empty($this->headers)) {
96+
$this->client->setHeaders($this->headers);
97+
}
98+
99+
$success = $this->client->upgrade($this->path);
100+
101+
if (!$success) {
102+
$error = new \RuntimeException(
103+
"WebSocket connection failed: {$this->client->errCode} - {$this->client->errMsg}"
104+
);
105+
$this->emit('error', $error);
106+
throw $error;
107+
}
108+
109+
$this->connected = true;
110+
$this->emit('open');
111+
});
78112
}
79113

80114
public function listen(): void
81115
{
82-
while ($this->connected) {
83-
try {
84-
$frame = $this->client->recv($this->timeout);
85-
86-
if ($frame === false) {
87-
if ($this->client->errCode === SWOOLE_ERROR_CLIENT_NO_CONNECTION) {
88-
$this->handleClose();
89-
break;
116+
$this->ensureCoroutine(function () {
117+
while ($this->connected) {
118+
try {
119+
$frame = $this->client->recv($this->timeout);
120+
121+
if ($frame === false) {
122+
if ($this->client->errCode === SWOOLE_ERROR_CLIENT_NO_CONNECTION) {
123+
$this->handleClose();
124+
break;
125+
}
126+
throw new \RuntimeException(
127+
"Failed to receive data: {$this->client->errCode} - {$this->client->errMsg}"
128+
);
90129
}
91-
throw new \RuntimeException(
92-
"Failed to receive data: {$this->client->errCode} - {$this->client->errMsg}"
93-
);
94-
}
95130

96-
if ($frame === "") {
97-
continue;
98-
}
131+
if ($frame === "") {
132+
continue;
133+
}
99134

100-
if ($frame instanceof Frame) {
101-
$this->handleFrame($frame);
135+
if ($frame instanceof Frame) {
136+
$this->handleFrame($frame);
137+
}
138+
} catch (\Throwable $e) {
139+
$this->emit('error', $e);
140+
$this->handleClose();
141+
break;
102142
}
103-
} catch (\Throwable $e) {
104-
$this->emit('error', $e);
105-
$this->handleClose();
106-
break;
107143
}
108-
}
144+
});
109145
}
110146

111147
private function handleFrame(Frame $frame): void
@@ -138,18 +174,25 @@ private function handleClose(): void
138174

139175
public function send(string $data): void
140176
{
141-
if (!$this->connected) {
142-
throw new \RuntimeException('Not connected to WebSocket server');
143-
}
177+
try {
178+
$this->ensureCoroutine(function () use ($data) {
179+
if (!$this->connected) {
180+
throw new \RuntimeException('Not connected to WebSocket server');
181+
}
144182

145-
$success = $this->client->push($data);
183+
$success = $this->client->push($data);
146184

147-
if ($success === false) {
148-
$error = new \RuntimeException(
149-
"Failed to send data: {$this->client->errCode} - {$this->client->errMsg}"
150-
);
151-
$this->emit('error', $error);
152-
throw $error;
185+
if ($success === false) {
186+
$error = new \RuntimeException(
187+
"Failed to send data: {$this->client->errCode} - {$this->client->errMsg}"
188+
);
189+
$this->emit('error', $error);
190+
throw $error;
191+
}
192+
});
193+
} catch (\Throwable $e) {
194+
$this->emit('error', $e);
195+
throw $e;
153196
}
154197
}
155198

@@ -223,43 +266,61 @@ private function emit(string $event, mixed $data = null): void
223266

224267
public function receive(): ?string
225268
{
226-
if (!$this->connected) {
227-
throw new \RuntimeException('Not connected to WebSocket server');
228-
}
269+
/** @var string|null */
270+
return $this->ensureCoroutine(function (): ?string {
271+
if (!$this->connected) {
272+
throw new \RuntimeException('Not connected to WebSocket server');
273+
}
229274

230-
$frame = $this->client->recv($this->timeout);
275+
$frame = $this->client->recv($this->timeout);
231276

232-
if ($frame === false) {
233-
if ($this->client->errCode === SWOOLE_ERROR_CLIENT_NO_CONNECTION) {
234-
$this->handleClose();
277+
if ($frame === false) {
278+
if ($this->client->errCode === SWOOLE_ERROR_CLIENT_NO_CONNECTION) {
279+
$this->handleClose();
280+
return null;
281+
}
282+
throw new \RuntimeException(
283+
"Failed to receive data: {$this->client->errCode} - {$this->client->errMsg}"
284+
);
285+
}
286+
287+
if ($frame === "") {
235288
return null;
236289
}
237-
throw new \RuntimeException(
238-
"Failed to receive data: {$this->client->errCode} - {$this->client->errMsg}"
239-
);
240-
}
241290

242-
if ($frame === "") {
291+
if ($frame instanceof Frame) {
292+
switch ($frame->opcode) {
293+
case WEBSOCKET_OPCODE_TEXT:
294+
return $frame->data;
295+
case WEBSOCKET_OPCODE_CLOSE:
296+
$this->handleClose();
297+
return null;
298+
case WEBSOCKET_OPCODE_PING:
299+
$this->emit('ping', $frame->data);
300+
$this->client->push('', WEBSOCKET_OPCODE_PONG);
301+
return null;
302+
case WEBSOCKET_OPCODE_PONG:
303+
$this->emit('pong', $frame->data);
304+
return null;
305+
}
306+
}
307+
243308
return null;
244-
}
309+
});
310+
}
245311

246-
if ($frame instanceof Frame) {
247-
switch ($frame->opcode) {
248-
case WEBSOCKET_OPCODE_TEXT:
249-
return $frame->data;
250-
case WEBSOCKET_OPCODE_CLOSE:
251-
$this->handleClose();
252-
return null;
253-
case WEBSOCKET_OPCODE_PING:
254-
$this->emit('ping', $frame->data);
255-
$this->client->push('', WEBSOCKET_OPCODE_PONG);
256-
return null;
257-
case WEBSOCKET_OPCODE_PONG:
258-
$this->emit('pong', $frame->data);
259-
return null;
312+
/**
313+
* Check if there is an incoming message available without consuming it
314+
*/
315+
public function hasIncomingMessage(): bool
316+
{
317+
return (bool) $this->ensureCoroutine(function (): bool {
318+
if (!$this->connected) {
319+
return false;
260320
}
261-
}
262321

263-
return null;
322+
// Small timeout to check if there is an incoming message
323+
return $this->client->recv(0.001) !== false;
324+
});
264325
}
265326
}

tests/e2e/AdapterTest.php

+39-43
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
use PHPUnit\Framework\TestCase;
66
use Utopia\WebSocket\Client;
77

8-
use function Swoole\Coroutine\run;
9-
108
class AdapterTest extends TestCase
119
{
1210
private function getWebsocket(string $host, int $port): Client
@@ -32,46 +30,44 @@ public function testWorkerman(): void
3230

3331
private function testServer(string $host, int $port): void
3432
{
35-
run(function () use ($host, $port) {
36-
$client = $this->getWebsocket($host, $port);
37-
$client->connect();
38-
39-
$client->send('ping');
40-
$this->assertEquals('pong', $client->receive());
41-
$this->assertEquals(true, $client->isConnected());
42-
43-
$clientA = $this->getWebsocket($host, $port);
44-
$clientA->connect();
45-
$clientB = $this->getWebsocket($host, $port);
46-
$clientB->connect();
47-
48-
$clientA->send('ping');
49-
$this->assertEquals('pong', $clientA->receive());
50-
$clientB->send('pong');
51-
$this->assertEquals('ping', $clientB->receive());
52-
53-
$clientA->send('broadcast');
54-
$this->assertEquals('broadcast', $client->receive());
55-
$this->assertEquals('broadcast', $clientA->receive());
56-
$this->assertEquals('broadcast', $clientB->receive());
57-
58-
$clientB->send('broadcast');
59-
$this->assertEquals('broadcast', $client->receive());
60-
$this->assertEquals('broadcast', $clientA->receive());
61-
$this->assertEquals('broadcast', $clientB->receive());
62-
63-
$clientA->close();
64-
$clientB->close();
65-
66-
$client->send('disconnect');
67-
$this->assertEquals('disconnect', $client->receive());
68-
69-
try {
70-
$client->receive();
71-
$this->fail('Expected RuntimeException was not thrown');
72-
} catch (\RuntimeException $e) {
73-
$this->assertStringContainsString('Failed to receive data:', $e->getMessage());
74-
}
75-
});
33+
$client = $this->getWebsocket($host, $port);
34+
$client->connect();
35+
36+
$client->send('ping');
37+
$this->assertEquals('pong', $client->receive());
38+
$this->assertEquals(true, $client->isConnected());
39+
40+
$clientA = $this->getWebsocket($host, $port);
41+
$clientA->connect();
42+
$clientB = $this->getWebsocket($host, $port);
43+
$clientB->connect();
44+
45+
$clientA->send('ping');
46+
$this->assertEquals('pong', $clientA->receive());
47+
$clientB->send('pong');
48+
$this->assertEquals('ping', $clientB->receive());
49+
50+
$clientA->send('broadcast');
51+
$this->assertEquals('broadcast', $client->receive());
52+
$this->assertEquals('broadcast', $clientA->receive());
53+
$this->assertEquals('broadcast', $clientB->receive());
54+
55+
$clientB->send('broadcast');
56+
$this->assertEquals('broadcast', $client->receive());
57+
$this->assertEquals('broadcast', $clientA->receive());
58+
$this->assertEquals('broadcast', $clientB->receive());
59+
60+
$clientA->close();
61+
$clientB->close();
62+
63+
$client->send('disconnect');
64+
$this->assertEquals('disconnect', $client->receive());
65+
66+
try {
67+
$client->receive();
68+
$this->fail('Expected RuntimeException was not thrown');
69+
} catch (\RuntimeException $e) {
70+
$this->assertStringContainsString('Failed to receive data:', $e->getMessage());
71+
}
7672
}
7773
}

tests/unit/ClientTest.php

+17
Original file line numberDiff line numberDiff line change
@@ -188,4 +188,21 @@ public function testListenWithError(): void
188188
$this->assertTrue($errorReceived);
189189
$this->assertFalse($this->client->isConnected());
190190
}
191+
192+
public function testHasIncomingMessage(): void
193+
{
194+
$swooleClient = $this->createMock(\Swoole\Coroutine\Http\Client::class);
195+
$swooleClient->method('recv')
196+
->willReturnOnConsecutiveCalls(
197+
new \Swoole\WebSocket\Frame(),
198+
false
199+
);
200+
201+
$reflectionClass = new \ReflectionClass(Client::class);
202+
$reflectionClass->getProperty('connected')->setValue($this->client, true);
203+
$reflectionClass->getProperty('client')->setValue($this->client, $swooleClient);
204+
205+
$this->assertTrue($this->client->hasIncomingMessage());
206+
$this->assertFalse($this->client->hasIncomingMessage());
207+
}
191208
}

0 commit comments

Comments
 (0)