-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathfunctions.php
160 lines (141 loc) · 5.47 KB
/
functions.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
<?php
namespace Clue\React\Promise\Stream;
use React\Stream\ReadableStreamInterface;
use React\Promise;
use React\Promise\PromiseInterface;
use React\Stream\WritableStreamInterface;
use Evenement\EventEmitterInterface;
/**
* Creates a `Promise` which resolves with the stream data buffer
*
* @param ReadableStreamInterface $stream
* @return CancellablePromiseInterface Promise<string, Exception>
*/
function buffer(ReadableStreamInterface $stream)
{
// stream already ended => resolve with empty buffer
if (!$stream->isReadable()) {
return Promise\resolve('');
}
$buffer = '';
$bufferer = function ($data) use (&$buffer) {
$buffer .= $data;
};
$stream->on('data', $bufferer);
$promise = new Promise\Promise(function ($resolve, $reject) use ($stream, &$buffer) {
$stream->on('error', function ($error) use ($reject) {
$reject(new \RuntimeException('An error occured on the underlying stream while buffering', 0, $error));
});
$stream->on('close', function () use ($resolve, &$buffer) {
$resolve($buffer);
});
}, function ($_, $reject) {
$reject(new \RuntimeException('Cancelled buffering'));
});
return $promise->then(null, function ($error) use (&$buffer, $bufferer, $stream) {
// promise rejected => clear buffer and buffering
$buffer = '';
$stream->removeListener('data', $bufferer);
throw $error;
});
}
/**
* Creates a `Promise` which resolves with the first event data
*
* @param ReadableStreamInterface|WritableStreamInterface $stream
* @param string $event
* @return CancellablePromiseInterface Promise<mixed, Exception>
*/
function first(EventEmitterInterface $stream, $event = 'data')
{
if ($stream instanceof ReadableStreamInterface) {
// readable or duplex stream not readable => already closed
// a half-open duplex stream is considered closed if its readable side is closed
if (!$stream->isReadable()) {
return Promise\reject(new \RuntimeException('Stream already closed'));
}
} elseif ($stream instanceof WritableStreamInterface) {
// writable-only stream (not duplex) not writable => already closed
if (!$stream->isWritable()) {
return Promise\reject(new \RuntimeException('Stream already closed'));
}
}
return new Promise\Promise(function ($resolve, $reject) use ($stream, $event, &$listener) {
$listener = function ($data) use ($stream, $event, &$listener, $resolve) {
$stream->removeListener($event, $listener);
$resolve($data);
};
$stream->on($event, $listener);
$stream->on('close', function () use ($stream, $event, $listener, $reject) {
$stream->removeListener($event, $listener);
$reject(new \RuntimeException('Stream closed'));
});
}, function ($_, $reject) use ($stream, $event, &$listener) {
$stream->removeListener($event, $listener);
$reject(new \RuntimeException('Operation cancelled'));
});
}
/**
* Creates a `Promise` which resolves with an array of all the event data
*
* @param ReadableStreamInterface|WritableStreamInterface $stream
* @param string $event
* @return CancellablePromiseInterface Promise<string, Exception>
*/
function all(EventEmitterInterface $stream, $event = 'data')
{
// stream already ended => resolve with empty buffer
if ($stream instanceof ReadableStreamInterface) {
// readable or duplex stream not readable => already closed
// a half-open duplex stream is considered closed if its readable side is closed
if (!$stream->isReadable()) {
return Promise\resolve(array());
}
} elseif ($stream instanceof WritableStreamInterface) {
// writable-only stream (not duplex) not writable => already closed
if (!$stream->isWritable()) {
return Promise\resolve(array());
}
}
$buffer = array();
$bufferer = function ($data) use (&$buffer) {
$buffer []= $data;
};
$stream->on($event, $bufferer);
$promise = new Promise\Promise(function ($resolve, $reject) use ($stream, &$buffer) {
$stream->on('error', function ($error) use ($reject) {
$reject(new \RuntimeException('An error occured on the underlying stream while buffering', 0, $error));
});
$stream->on('close', function () use ($resolve, &$buffer) {
$resolve($buffer);
});
}, function ($_, $reject) {
$reject(new \RuntimeException('Cancelled buffering'));
});
return $promise->then(null, function ($error) use (&$buffer, $bufferer, $stream, $event) {
// promise rejected => clear buffer and buffering
$buffer = array();
$stream->removeListener($event, $bufferer);
throw $error;
});
}
/**
* unwrap a `Promise` which resolves with a `ReadableStreamInterface`.
*
* @param PromiseInterface $promise Promise<ReadableStreamInterface, Exception>
* @return ReadableStreamInterface
*/
function unwrapReadable(PromiseInterface $promise)
{
return new UnwrapReadableStream($promise);
}
/**
* unwrap a `Promise` which resolves with a `WritableStreamInterface`.
*
* @param PromiseInterface $promise Promise<WritableStreamInterface, Exception>
* @return WritableStreamInterface
*/
function unwrapWritable(PromiseInterface $promise)
{
return new UnwrapWritableStream($promise);
}