Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,4 @@ jobs:
sleep 10

- name: Run Tests
run: docker compose exec tests vendor/bin/phpunit
run: docker compose exec tests vendor/bin/phpunit --debug
4 changes: 2 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
"swoole/ide-helper": "5.1.2",
"textalk/websocket": "1.5.2",
"phpunit/phpunit": "^9.5.5",
"workerman/workerman": "^4.0",
"phpstan/phpstan": "^1.8",
"workerman/workerman": "4.1.*",
"phpstan/phpstan": "^1.12",
"laravel/pint": "^1.15"
}
}
165 changes: 81 additions & 84 deletions composer.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions php-8.0.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM composer:2.0 as composer
FROM composer:2.0 AS composer

ARG TESTING=false
ENV TESTING=$TESTING
Expand All @@ -15,7 +15,7 @@ RUN composer install \
--no-scripts \
--prefer-dist

FROM appwrite/utopia-base:php-8.0-0.1.0 as final
FROM appwrite/utopia-base:php-8.0-0.1.0 AS final

RUN docker-php-ext-install sockets pcntl

Expand Down
4 changes: 2 additions & 2 deletions php-8.1.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM composer:2.0 as composer
FROM composer:2.0 AS composer

ARG TESTING=false
ENV TESTING=$TESTING
Expand All @@ -15,7 +15,7 @@ RUN composer install \
--no-scripts \
--prefer-dist

FROM appwrite/utopia-base:php-8.1-0.1.0 as final
FROM appwrite/utopia-base:php-8.1-0.1.0 AS final

RUN docker-php-ext-install sockets pcntl

Expand Down
4 changes: 2 additions & 2 deletions php-8.2.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM composer:2.0 as composer
FROM composer:2.0 AS composer

ARG TESTING=false
ENV TESTING=$TESTING
Expand All @@ -15,7 +15,7 @@ RUN composer install \
--no-scripts \
--prefer-dist

FROM appwrite/utopia-base:php-8.2-0.1.0 as final
FROM appwrite/utopia-base:php-8.2-0.1.0 AS final

RUN docker-php-ext-install sockets pcntl

Expand Down
4 changes: 2 additions & 2 deletions php-8.3.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM composer:2.0 as composer
FROM composer:2.0 AS composer

ARG TESTING=false
ENV TESTING=$TESTING
Expand All @@ -15,7 +15,7 @@ RUN composer install \
--no-scripts \
--prefer-dist

FROM appwrite/utopia-base:php-8.3-0.1.0 as final
FROM appwrite/utopia-base:php-8.3-0.1.0 AS final

RUN docker-php-ext-install sockets pcntl

Expand Down
8 changes: 4 additions & 4 deletions phpunit.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
convertErrorsToExceptions="true"
convertNoticesToExceptions="true"
convertWarningsToExceptions="true"
processIsolation="false"
stopOnFailure="false"
processIsolation="true"
stopOnFailure="true"
>
<testsuites>
<!-- <testsuite name="Unit">
<testsuite name="Unit">
<directory>./tests/unit/</directory>
</testsuite> -->
</testsuite>
<testsuite name="E2E">
<directory>./tests/e2e/</directory>
</testsuite>
Expand Down
265 changes: 265 additions & 0 deletions src/WebSocket/Client.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
<?php

namespace Utopia\WebSocket;

use Swoole\Coroutine\Http\Client as SwooleClient;
use Swoole\WebSocket\Frame;

class Client
{
private SwooleClient $client;
private bool $connected = false;
private string $host;
private int $port;
private string $path;
/** @var array<string, string> */
private array $headers;
private float $timeout;

// Event handlers
private ?\Closure $onMessage = null;
private ?\Closure $onClose = null;
private ?\Closure $onError = null;
private ?\Closure $onOpen = null;
private ?\Closure $onPing = null;
private ?\Closure $onPong = null;

/**
* @param string $url
* @param array{headers?: array<string, string>, timeout?: float} $options
*/
public function __construct(string $url, array $options = [])
{
$parsedUrl = parse_url($url);
if ($parsedUrl === false) {
throw new \InvalidArgumentException('Invalid WebSocket URL');
}

if (!isset($parsedUrl['host'])) {
throw new \InvalidArgumentException('WebSocket URL must contain a host');
}

$this->host = $parsedUrl['host'];
$this->port = $parsedUrl['port'] ?? (isset($parsedUrl['scheme']) && $parsedUrl['scheme'] === 'wss' ? 443 : 80);
$this->path = $parsedUrl['path'] ?? '/';
if (isset($parsedUrl['query'])) {
$this->path .= '?' . $parsedUrl['query'];
}

$this->headers = $options['headers'] ?? [];
$this->timeout = $options['timeout'] ?? 30;
}

public function connect(): void
{
$this->client = new SwooleClient($this->host, $this->port, $this->port === 443);
$this->client->set([
'timeout' => $this->timeout,
'websocket_compression' => true,
'max_frame_size' => 32 * 1024 * 1024, // 32MB max frame size
]);

if (!empty($this->headers)) {
$this->client->setHeaders($this->headers);
}

$success = $this->client->upgrade($this->path);

if (!$success) {
$error = new \RuntimeException(
"WebSocket connection failed: {$this->client->errCode} - {$this->client->errMsg}"
);
$this->emit('error', $error);
throw $error;
}

$this->connected = true;
$this->emit('open');
}

public function listen(): void
{
while ($this->connected) {
try {
$frame = $this->client->recv($this->timeout);

if ($frame === false) {
if ($this->client->errCode === SWOOLE_ERROR_CLIENT_NO_CONNECTION) {
$this->handleClose();
break;
}
throw new \RuntimeException(
"Failed to receive data: {$this->client->errCode} - {$this->client->errMsg}"
);
}

if ($frame === "") {
continue;
}

if ($frame instanceof Frame) {
$this->handleFrame($frame);
}
} catch (\Throwable $e) {
$this->emit('error', $e);
$this->handleClose();
break;
}
}
}

private function handleFrame(Frame $frame): void
{
switch ($frame->opcode) {
case WEBSOCKET_OPCODE_TEXT:
$this->emit('message', $frame->data);
break;
case WEBSOCKET_OPCODE_CLOSE:
$this->handleClose();
break;
case WEBSOCKET_OPCODE_PING:
$this->emit('ping', $frame->data);
$this->client->push('', WEBSOCKET_OPCODE_PONG);
break;
case WEBSOCKET_OPCODE_PONG:
$this->emit('pong', $frame->data);
break;
}
}

private function handleClose(): void
{
if ($this->connected) {
$this->connected = false;
$this->emit('close');
$this->client->close();
}
}

public function send(string $data): void
{
if (!$this->connected) {
throw new \RuntimeException('Not connected to WebSocket server');
}

$success = $this->client->push($data);

if ($success === false) {
$error = new \RuntimeException(
"Failed to send data: {$this->client->errCode} - {$this->client->errMsg}"
);
$this->emit('error', $error);
throw $error;
}
}

public function close(): void
{
$this->handleClose();
}

public function isConnected(): bool
{
return $this->connected;
}

// Event handling methods
public function onMessage(\Closure $callback): self
{
$this->onMessage = $callback;
return $this;
}

public function onClose(\Closure $callback): self
{
$this->onClose = $callback;
return $this;
}

public function onError(\Closure $callback): self
{
$this->onError = $callback;
return $this;
}

public function onOpen(\Closure $callback): self
{
$this->onOpen = $callback;
return $this;
}

public function onPing(\Closure $callback): self
{
$this->onPing = $callback;
return $this;
}

public function onPong(\Closure $callback): self
{
$this->onPong = $callback;
return $this;
}

/**
* @param string $event
* @param mixed $data
*/
private function emit(string $event, mixed $data = null): void
{
$handler = match ($event) {
'message' => $this->onMessage,
'close' => $this->onClose,
'error' => $this->onError,
'open' => $this->onOpen,
'ping' => $this->onPing,
'pong' => $this->onPong,
default => null
};

if ($handler !== null) {
$handler($data);
}
}

public function receive(): ?string
{
if (!$this->connected) {
throw new \RuntimeException('Not connected to WebSocket server');
}

$frame = $this->client->recv($this->timeout);

if ($frame === false) {
if ($this->client->errCode === SWOOLE_ERROR_CLIENT_NO_CONNECTION) {
$this->handleClose();
return null;
}
throw new \RuntimeException(
"Failed to receive data: {$this->client->errCode} - {$this->client->errMsg}"
);
}

if ($frame === "") {
return null;
}

if ($frame instanceof Frame) {
switch ($frame->opcode) {
case WEBSOCKET_OPCODE_TEXT:
return $frame->data;
case WEBSOCKET_OPCODE_CLOSE:
$this->handleClose();
return null;
case WEBSOCKET_OPCODE_PING:
$this->emit('ping', $frame->data);
$this->client->push('', WEBSOCKET_OPCODE_PONG);
return null;
case WEBSOCKET_OPCODE_PONG:
$this->emit('pong', $frame->data);
return null;
}
}

return null;
}
}
Loading