Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/__init__.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
'core/Coroutine/Barrier.php',
'core/Coroutine/Http/ClientProxy.php',
'core/Coroutine/Http/functions.php',
# <core for Http2> #
'core/Coroutine/Http2/Client2.php',
'core/Coroutine/Http2/ChannelManager.php',
# <core for connection pool> #
'core/ConnectionPool.php',
'core/Database/ObjectProxy.php',
Expand Down
61 changes: 61 additions & 0 deletions src/core/Coroutine/Http2/ChannelManager.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?php

declare(strict_types=1);

namespace Swoole\Coroutine\Http2;

use Swoole\Coroutine\Channel;

class ChannelManager
{
/**
* @var Channel[]
*/
protected array $channels = [];

public function get(int $streamId, bool $initialize = false): ?Channel
{
if (isset($this->channels[$streamId])) {
return $this->channels[$streamId];
}

if ($initialize) {
return $this->channels[$streamId] = $this->make(1);
}

return null;
}

public function make(int $limit): Channel
{
return new Channel($limit);
}

public function close(int $streamId): void
{
if ($channel = $this->channels[$streamId] ?? null) {
$channel->close();
}

unset($this->channels[$streamId]);
}

public function getChannels(): array
{
return $this->channels;
}

public function flush(): void
{
$channels = $this->getChannels();
$streamIds = array_keys($channels);
foreach ($streamIds as $streamId) {
$this->close($streamId);
}
}

public function isEmpty(): bool
{
return count($this->channels) === 0;
}
}
147 changes: 147 additions & 0 deletions src/core/Coroutine/Http2/Client2.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
<?php

declare(strict_types=1);

namespace Swoole\Coroutine\Http2;

use Swoole\Coroutine\Channel;
use Swoole\Http2\Request;
use Swoole\Http2\Response;
use Throwable;

use function Swoole\Coroutine\go;

class Client2 extends Client
{
protected ?Channel $chan = null;

protected ?Channel $sleepChan = null;

protected ChannelManager $channelManager;

protected bool $idleClose = false;

protected int $lastSendTime = 0;

public function __construct(string $host, int $port = 80, bool $open_ssl = false)
{
parent::__construct($host, $port, $open_ssl);
$this->channelManager = new ChannelManager();
}

public function request(Request $request, float $timeout = -1): false|Response
{
$this->loop();
$streamId = $this->send($request);
$this->lastSendTime = time();

if ($streamId === false) {
return false;
}
$manager = $this->getChannelManager();
$chan = $manager->get($streamId, true);
try {
$data = $chan->pop($timeout);
} finally {
$manager->close($streamId);
}

return $data;
}

public function close(): bool
{
$this->getChannelManager()->flush();
$this->chan?->close();
$this->chan = null;
$this->sleepChan?->close();
$this->sleepChan = null;
return parent::close();
}

protected function getChannelManager(): ChannelManager
{
return $this->channelManager;
}

protected function reconnect(): bool
{
parent::close();
return parent::connect();
}

protected function loop(): void
{
$this->idleClose();

if ($this->chan !== null) {
return;
}
$this->chan = new Channel(65535);

if (! $this->ping()) {
$this->reconnect();
}
go(
function () {
$reason = '';
try {
$chan = $this->chan;
while (true) {
$response = $this->recv();

if ($chan?->errCode !== SWOOLE_CHANNEL_OK) {
$reason = 'channel closed.';
break;
}

if ($response === false) {
$reason = 'client broken.';
break;
}

if ($channel = $this->getChannelManager()->get($response->streamId)) {
$channel->push($response);
}
}
} catch (Throwable $exception) {
swoole_error_log(SWOOLE_LOG_ERROR, (string) $exception);
} finally {
swoole_error_log(SWOOLE_LOG_DEBUG, 'Recv loop broken, wait to restart in next time. The reason is ' . $reason);
$this->close();
}
}
);
}

protected function idleClose(): void
{
if (! $this->idleClose) {
$this->idleClose = true;
go(
function () {
try {
while (true) {
$this->sleep(3);
if ($this->chan === null) {
break;
}
if ($this->channelManager->isEmpty() && time() - $this->lastSendTime > 10) {
$this->close();
break;
}
}
} finally {
$this->idleClose = false;
}
}
);
}
}

protected function sleep(float $timeout = -1): void
{
$this->sleepChan ??= new Channel(1);
$this->sleepChan->pop($timeout);
}
}
59 changes: 59 additions & 0 deletions tests/unit/Coroutine/Http2/ChannelManagerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?php

declare(strict_types=1);

namespace Swoole\Coroutine\Http2;

use PHPUnit\Framework\Attributes\CoversClass;
use PHPUnit\Framework\TestCase;
use Swoole\Coroutine\Channel;

use function Swoole\Coroutine\go;
use function Swoole\Coroutine\run;

/**
* @internal
*/
#[CoversClass(ChannelManager::class)]
class ChannelManagerTest extends TestCase
{
public function testChannelManager()
{
run(
function () {
$manager = new ChannelManager();
$chan = $manager->get(1, true);
$this->assertInstanceOf(Channel::class, $chan);
$chan = $manager->get(1);
$this->assertInstanceOf(Channel::class, $chan);
go(
function () use ($chan) {
usleep(10 * 1000);
$chan->push('Hello World.');
}
);

$this->assertSame('Hello World.', $chan->pop());
$manager->close(1);
$this->assertNull($manager->get(1));
}
);
}

public function testChannelFlush()
{
run(
function () {
$manager = new ChannelManager();
$manager->get(1, true);
$manager->get(2, true);
$manager->get(4, true);
$manager->get(5, true);

$this->assertSame(4, count($manager->getChannels()));
$manager->flush();
$this->assertSame(0, count($manager->getChannels()));
}
);
}
}
57 changes: 57 additions & 0 deletions tests/unit/Coroutine/Http2/Client2Test.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
<?php

/**
* This file is part of Swoole.
*
* @see https://www.swoole.com
* @contact team@swoole.com
* @license https://github.com/swoole/library/blob/master/LICENSE
*/

declare(strict_types=1);

namespace Swoole\Coroutine\Http2;

use PHPUnit\Framework\Attributes\CoversClass;
use PHPUnit\Framework\TestCase;
use Swoole\Http2\Request;

use function Swoole\Coroutine\go;
use function Swoole\Coroutine\run;

/**
* @internal
*/
#[CoversClass(Client2::class)]
class Client2Test extends TestCase
{
public function testRequest(): void
{
run(function () {
$domain = 'httpbin.org';
$client = new Client2($domain, 443, true);
$client->set([
'timeout' => -1,
'ssl_host_name' => $domain,
]);
$client->connect();
for ($i = 1; $i < 30; ++$i) {
go(function () use ($client, $i) {
$req = new Request();
$req->method = 'POST';
$req->path = '/post';
$req->headers = [
'host' => '127.0.0.1',
'user-agent' => 'Chrome/49.0.2587.3',
'accept' => 'text/html,application/xhtml+xml,application/xml',
'accept-encoding' => 'gzip',
];
$req->data = (string) $i;
$data = $client->request($req);
$result = json_decode($data->data, true);
$this->assertEquals($i, (int) $result['data']);
});
}
});
}
}