-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathtrickle_publisher.py
More file actions
536 lines (464 loc) · 20.3 KB
/
trickle_publisher.py
File metadata and controls
536 lines (464 loc) · 20.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
from __future__ import annotations
import asyncio
from dataclasses import dataclass
import logging
import time
from typing import Optional, AsyncIterator, Callable
import aiohttp
from .errors import LivepeerGatewayError
_LOG = logging.getLogger(__name__)
@dataclass(frozen=True)
class TricklePublisherStats:
    """Immutable point-in-time snapshot of TricklePublisher counters.

    Produced by TricklePublisher.get_stats(); all counters are cumulative
    since the publisher was constructed.
    """

    elapsed_s: float
    segments_started: int
    segments_completed: int
    segments_failed: int
    post_attempts: int
    post_retries_no_body_consumed: int
    post_success: int
    post_http_failures: int
    post_exceptions: int
    post_404: int
    segment_writer_put_timeouts: int
    bytes_submitted_to_transport: int
    terminal_failures: int
    seq: int
    consecutive_failures: int
    terminal_error: bool

    def __str__(self) -> str:
        # Compact single-line summary; deliberately shows only a subset of
        # fields (the full set is available via the dataclass repr).
        parts = (
            f"elapsed_s={self.elapsed_s:.1f}",
            f"segments_started={self.segments_started}",
            f"segments_completed={self.segments_completed}",
            f"segments_failed={self.segments_failed}",
            f"post_attempts={self.post_attempts}",
            f"post_http_failures={self.post_http_failures}",
            f"post_exceptions={self.post_exceptions}",
            f"segment_writer_put_timeouts={self.segment_writer_put_timeouts}",
            f"terminal_failures={self.terminal_failures}",
            f"terminal_error={self.terminal_error}",
        )
        return "TricklePublisherStats(" + ", ".join(parts) + ")"
class TricklePublishError(LivepeerGatewayError):
    """Common base class for all trickle-publisher errors."""
class TrickleSegmentWriteError(TricklePublishError):
    """Failure while writing one segment; the publisher itself may recover."""

    def __init__(
        self,
        message: str,
        *,
        seq: int,
        url: Optional[str] = None,
        status: Optional[int] = None,
    ) -> None:
        super().__init__(message)
        # Diagnostic context: which segment, which endpoint, and (when the
        # failure was an HTTP error) the response status.
        self.status = status
        self.url = url
        self.seq = seq
class TricklePublisherTerminalError(TricklePublishError):
    """Raised once the publisher has failed permanently; no further writes succeed."""

    def __init__(
        self,
        message: str,
        *,
        consecutive_failures: int,
        url: Optional[str] = None,
    ) -> None:
        super().__init__(message)
        # How many back-to-back segment failures led here, and the channel URL.
        self.url = url
        self.consecutive_failures = consecutive_failures
class TricklePublisher:
    """
    Trickle publisher that streams bytes to a sequence of HTTP POST endpoints:
    - Create stream: POST {base_url}
    - Write segment: POST {base_url}/{seq} (streaming body)
    - Close stream: DELETE {base_url}
    The API matches the usage pattern:
    async with TricklePublisher(url, "application/json") as pub:
    async with await pub.next() as seg:
    await seg.write(b"...")
    """
    def __init__(
        self,
        url: str,
        mime_type: str,
        *,
        start_seq: int = -1,
        connection_close: bool = False,
        max_consecutive_failures: int = 3,
    ):
        """Configure the publisher; performs no network I/O.

        Args:
            url: Base channel URL; trailing slashes are stripped.
            mime_type: Content-Type header value for every segment POST.
            start_seq: Initial sequence number. A negative value makes the
                first ``next()`` resolve the sequence via GET {url}/next.
            connection_close: If True, send "Connection: close" on segment POSTs.
            max_consecutive_failures: Segment failures tolerated before entering
                the terminal error state (clamped to at least 1).
        """
        self.url = url.rstrip("/")
        self.mime_type = mime_type
        self.connection_close = connection_close
        self._max_consecutive_failures = max(1, int(max_consecutive_failures))
        self.seq = int(start_seq)
        # Lazily initialized async runtime bits (safe to construct in sync code).
        self._lock: Optional[asyncio.Lock] = None
        self._session: Optional[aiohttp.ClientSession] = None
        # Preconnected writer state for the next segment.
        self._next_state: Optional[_SegmentPostState] = None
        # Terminal failure for the whole publisher. Once set, no new segments
        # should be opened or written.
        self._terminal_error: Optional[TricklePublisherTerminalError] = None
        self._consecutive_failures: int = 0
        self._started_at = time.time()
        # Raw counters; snapshotted into TricklePublisherStats by get_stats().
        self._stats: dict[str, int] = {
            "segments_started": 0,
            "segments_completed": 0,
            "segments_failed": 0,
            "post_attempts": 0,
            "post_retries_no_body_consumed": 0,
            "post_success": 0,
            "post_http_failures": 0,
            "post_exceptions": 0,
            "post_404": 0,
            "segment_writer_put_timeouts": 0,
            "bytes_submitted_to_transport": 0,
            "terminal_failures": 0,
        }
    async def __aenter__(self) -> "TricklePublisher":
        return self
    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        # Always best-effort close on context exit, even when exiting on error.
        await self.close()
    async def _ensure_runtime(self) -> None:
        """Lazily create the lock and HTTP session on first use in async context."""
        if self._lock is None:
            self._lock = asyncio.Lock()
        if self._session is None:
            # Ignore TLS validation (matches the rest of this repo).
            connector = aiohttp.TCPConnector(ssl=False)
            self._session = aiohttp.ClientSession(connector=connector)
    def _stream_url(self, seq: int) -> str:
        """Return the per-segment POST URL: {base_url}/{seq}."""
        return f"{self.url}/{seq}"
    async def preconnect(self, seq: int, *, send_reset: bool = False) -> _SegmentPostState:
        """
        Start the POST for `seq` in the background and return mutable segment state.
        """
        await self._ensure_runtime()
        assert self._session is not None
        url = self._stream_url(seq)
        _LOG.debug("Trickle preconnect: %s", url)
        state = _SegmentPostState(seq, send_reset=send_reset)
        # NOTE(review): the Task returned here is not stored anywhere; the event
        # loop keeps only weak references to tasks, so this relies on the task
        # not being garbage-collected before it completes — confirm.
        asyncio.create_task(self._run_post(url, state))
        return state
    async def _run_post(self, url: str, seg_state: _SegmentPostState) -> None:
        """Drive one segment POST, retrying at most once.

        Retry policy: a second attempt is made only when the first attempt
        failed before any request-body bytes were consumed (safe to replay).
        A 404 response fast-fails the segment and marks the whole publisher
        terminal (the channel does not exist server-side). All failures are
        recorded on ``seg_state`` and in the stats counters.
        """
        await self._ensure_runtime()
        assert self._session is not None
        seq = seg_state.seq
        final_exc: Optional[TrickleSegmentWriteError] = None
        final_status: Optional[int] = None
        final_body: Optional[str] = None
        for attempt in range(2):
            self._stats["post_attempts"] += 1
            # Reset before each attempt so "retry safe?" reflects this attempt only.
            seg_state.data_consumed = False
            headers = {"Content-Type": self.mime_type}
            if seg_state.send_reset:
                # Unblocks any hanging subscribers from a previous publish
                headers["Lp-Trickle-Reset"] = "1"
            if self.connection_close:
                headers["Connection"] = "close"
            try:
                # Intentionally do not set an overall aiohttp request timeout here.
                # A trickle segment may stay open for an arbitrary amount of time while
                # the producer is still streaming bytes, so a wall-clock POST timeout
                # would incorrectly fail healthy long-lived uploads. The bounded failure
                # path for stalled delivery is the per-chunk queue.put timeout in
                # SegmentWriter.write(), which detects when the HTTP client stops
                # consuming request-body data fast enough.
                resp = await self._session.post(
                    url,
                    headers=headers,
                    data=self._stream_data(seg_state),
                )
                final_status = resp.status
                final_body = await resp.text() if resp.status != 200 else None
                resp.release()
                if resp.status == 200:
                    # Success clears the consecutive-failure streak.
                    self._consecutive_failures = 0
                    self._stats["post_success"] += 1
                    self._stats["segments_completed"] += 1
                    return
                self._stats["post_http_failures"] += 1
                final_exc = TrickleSegmentWriteError(
                    f"Trickle POST failed url={url} status={resp.status} body={final_body!r}",
                    seq=seq,
                    url=url,
                    status=resp.status,
                )
            except Exception as e:
                # Transport-level failure (connect error, reset, etc.); wrap it
                # and chain the original exception as the cause.
                self._stats["post_exceptions"] += 1
                err = TrickleSegmentWriteError(
                    f"Trickle POST exception url={url}",
                    seq=seq,
                    url=url,
                )
                err.__cause__ = e
                final_exc = err
                final_status = None
                final_body = None
            if final_status == 404:
                # Stream doesn't exist on the server; fail fast and do not retry.
                self._stats["post_404"] += 1
                break
            if not seg_state.data_consumed and attempt == 0:
                # No body bytes were drained, so replaying the segment is safe.
                self._stats["post_retries_no_body_consumed"] += 1
                _LOG.warning(
                    "Trickle POST retrying same segment url=%s (no request body consumed)",
                    url,
                )
                continue
            break
        if final_status is not None:
            _LOG.error("Trickle POST failed url=%s status=%s body=%r", url, final_status, final_body)
        else:
            _LOG.error("Trickle POST exception url=%s error=%s", url, final_exc)
        assert final_exc is not None
        self._record_segment_failure(final_exc, seg_state)
        if final_status == 404 and self._terminal_error is None:
            # 404 is terminal regardless of the consecutive-failure count.
            _LOG.error("Trickle publisher channel does not exist url=%s", self.url)
            terminal_exc = TricklePublisherTerminalError(
                "Trickle publisher channel does not exist",
                consecutive_failures=self._consecutive_failures,
                url=self.url,
            )
            terminal_exc.__cause__ = final_exc
            self._terminal_error = terminal_exc
            self._stats["terminal_failures"] += 1
    def _record_segment_failure(
        self,
        exc: TrickleSegmentWriteError,
        seg_state: _SegmentPostState,
    ) -> None:
        """Attach the failure to the segment state and update failure counters.

        Promotes the publisher to the terminal state once the consecutive
        failure count reaches the configured maximum.
        """
        seg_state.error = exc
        self._stats["segments_failed"] += 1
        self._consecutive_failures += 1
        # check whether failure limit has been hit
        if self._terminal_error is None and self._consecutive_failures >= self._max_consecutive_failures:
            _LOG.error(
                "Trickle publisher reached terminal failure state after %s consecutive failures",
                self._consecutive_failures,
            )
            terminal_exc = TricklePublisherTerminalError(
                "Trickle publisher reached terminal failure state",
                consecutive_failures=self._consecutive_failures,
                url=self.url,
            )
            terminal_exc.__cause__ = exc
            self._terminal_error = terminal_exc
            self._stats["terminal_failures"] += 1
    async def _run_delete(self) -> None:
        """Best-effort DELETE of the channel; never raises."""
        await self._ensure_runtime()
        assert self._session is not None
        try:
            resp = await self._session.delete(self.url)
            resp.release()
        except aiohttp.ClientConnectorError as exc:
            # Orchestrator already unreachable — suppress, no need to log at ERROR.
            _LOG.debug("Trickle DELETE: orchestrator unreachable (suppressed) url=%s: %s", self.url, exc)
        # Suppress any other shutdown-time exceptions, including cancellation.
        except BaseException:
            _LOG.error("Trickle DELETE exception url=%s", self.url, exc_info=True)
    async def _stream_data(self, seg_state: _SegmentPostState) -> AsyncIterator[bytes]:
        """Yield queued chunks as the streaming POST body until the None sentinel."""
        while True:
            chunk = await seg_state.queue.get()
            if chunk is None:
                break
            # Mark that the server consumed body bytes: the retry-on-failure
            # path in _run_post only replays segments with no bytes consumed.
            seg_state.data_consumed = True
            yield chunk
    async def create(self) -> None:
        """Create the channel with an empty POST to the base URL.

        Raises:
            ValueError: if the server responds with a non-200 status.
        """
        await self._ensure_runtime()
        assert self._session is not None
        resp = await self._session.post(
            self.url,
            headers={"Expect-Content": self.mime_type},
            data={},
        )
        if resp.status != 200:
            body = await resp.text()
            resp.release()
            raise ValueError(f"Trickle create failed: status={resp.status} body={body!r}")
        resp.release()
    async def _resolve_next_seq(self) -> int:
        """Resolve seq via /next, or return -1 on failure."""
        assert self._session is not None
        url = f"{self.url}/next"
        try:
            resp = await self._session.get(url)
            latest = resp.headers.get("Lp-Trickle-Latest")
            resp.release()
            if latest is not None:
                resolved_seq = int(latest)
                _LOG.debug("Trickle resolved seq from %s: %s", url, resolved_seq)
                return resolved_seq
            else:
                _LOG.warning("Trickle /next missing Lp-Trickle-Latest header")
        except Exception:
            _LOG.warning("Trickle /next request failed", exc_info=True)
        return -1
    async def next(self) -> "SegmentWriter":
        """Return a SegmentWriter for the current segment and advance the sequence.

        Reuses the preconnected segment state when present (and for the right
        seq), then kicks off a background preconnect for the following segment.

        Raises:
            TricklePublisherTerminalError: if the publisher already failed terminally.
        """
        await self._ensure_runtime()
        assert self._lock is not None
        async with self._lock:
            if self._terminal_error is not None:
                raise self._terminal_error
            send_reset = False
            if self.seq < 0:
                # Negative seq means "unknown": ask the server, and reset any
                # subscribers still hanging on a previous publish.
                send_reset = True
                self.seq = await self._resolve_next_seq()
            if self._next_state is None or self._next_state.seq != self.seq:
                # don't have queue, or a queue for the wrong seq
                self._next_state = await self.preconnect(self.seq, send_reset=send_reset)
            seg_state = self._next_state
            assert seg_state is not None
            self._next_state = None
            self._stats["segments_started"] += 1
            # Preconnect the next segment in the background
            self.seq += 1
            asyncio.create_task(self._preconnect_task(self.seq))
            return SegmentWriter(
                seg_state,
                error_getter=lambda: self._terminal_error,
                on_write_bytes=self._record_write_bytes,
                on_write_timeout=self._record_write_timeout,
            )
    async def _preconnect_task(self, seq: int) -> None:
        """Background task: preconnect `seq` and stash it in the next-state slot."""
        await self._ensure_runtime()
        assert self._lock is not None
        # Hold the lock across preconnect so only one task can reserve and
        # publish the next-state slot for this sequence at a time.
        async with self._lock:
            if self._terminal_error is not None:
                return
            if self._next_state is not None:
                return
            if self.seq != seq:
                # seq is stale
                return
            self._next_state = await self.preconnect(seq)
    async def close(self) -> None:
        """Best-effort shutdown: close pending segment, DELETE channel, close session.

        Never raises; every failure (including cancellation) is logged and
        suppressed so teardown always completes.
        """
        # If the publisher was never used, avoid creating a session just to close it.
        if self._session is None and self._lock is None and self._next_state is None:
            return
        try:
            await self._ensure_runtime()
        # Close is best-effort; suppress cancellation/runtime-init failures.
        except BaseException:
            _LOG.warning("Trickle close suppressed runtime init failure url=%s", self.url, exc_info=True)
            return
        assert self._lock is not None
        _LOG.debug("Trickle close: %s", self.url)
        try:
            async with self._lock:
                if self._next_state is not None:
                    await SegmentWriter(self._next_state).close()
                    self._next_state = None
                if self._session is not None:
                    try:
                        await self._run_delete()
                    # Best-effort shutdown: do not abort close on delete failures,
                    # including cancellation.
                    except BaseException:
                        _LOG.warning("Trickle close suppressed delete failure url=%s", self.url, exc_info=True)
                    try:
                        await self._session.close()
                    # Session close should not block the rest of teardown.
                    except BaseException:
                        _LOG.warning("Trickle close suppressed session close failure url=%s", self.url, exc_info=True)
                    self._session = None
        # Close should not raise; preserve best-effort semantics even on cancellation.
        except BaseException:
            _LOG.warning("Trickle close suppressed failure url=%s", self.url, exc_info=True)
            # Fallback path: still try to close the session if the locked
            # section failed before reaching it.
            if self._session is not None:
                try:
                    await self._session.close()
                # Final fallback close must remain non-throwing.
                except BaseException:
                    _LOG.warning("Trickle close suppressed fallback session close failure url=%s", self.url, exc_info=True)
                self._session = None
    def _record_write_bytes(self, byte_count: int) -> None:
        # Negative counts are clamped to zero before accumulating.
        self._stats["bytes_submitted_to_transport"] += max(0, byte_count)
    def _record_write_timeout(self) -> None:
        self._stats["segment_writer_put_timeouts"] += 1
    def get_stats(self) -> TricklePublisherStats:
        """Return an immutable snapshot of the current counters."""
        return TricklePublisherStats(
            elapsed_s=max(0.0, time.time() - self._started_at),
            segments_started=self._stats["segments_started"],
            segments_completed=self._stats["segments_completed"],
            segments_failed=self._stats["segments_failed"],
            post_attempts=self._stats["post_attempts"],
            post_retries_no_body_consumed=self._stats["post_retries_no_body_consumed"],
            post_success=self._stats["post_success"],
            post_http_failures=self._stats["post_http_failures"],
            post_exceptions=self._stats["post_exceptions"],
            post_404=self._stats["post_404"],
            segment_writer_put_timeouts=self._stats["segment_writer_put_timeouts"],
            bytes_submitted_to_transport=self._stats["bytes_submitted_to_transport"],
            terminal_failures=self._stats["terminal_failures"],
            seq=self.seq,
            consecutive_failures=self._consecutive_failures,
            terminal_error=self._terminal_error is not None,
        )
class _SegmentPostState:
__slots__ = ("seq", "queue", "error", "data_consumed", "send_reset")
def __init__(self, seq: int, *, send_reset: bool = False) -> None:
self.seq = seq
self.queue: asyncio.Queue[Optional[bytes]] = asyncio.Queue(maxsize=1)
# Failure for this one segment only, not necessarily terminal
self.error: Optional[TrickleSegmentWriteError] = None
self.data_consumed: bool = False
self.send_reset: bool = send_reset
# Bound on a single queue.put while feeding/ending a segment body; exceeding it
# signals that the HTTP client has stopped draining request-body data.
_SEGMENT_QUEUE_PUT_TIMEOUT_S = 5.0
class SegmentWriter:
    """Feeds the body of one trickle segment into its in-flight POST.

    Chunks written here are drained by the publisher's streaming request body.
    Supports ``async with`` so the end-of-segment sentinel is enqueued on exit.
    """

    def __init__(
        self,
        seg_state: _SegmentPostState,
        *,
        error_getter: Optional[Callable[[], Optional[TricklePublisherTerminalError]]] = None,
        on_write_bytes: Optional[Callable[[int], None]] = None,
        on_write_timeout: Optional[Callable[[], None]] = None,
    ):
        self._seg_state = seg_state
        self._seq = seg_state.seq
        self.queue = seg_state.queue
        self._error_getter = error_getter
        self._on_write_bytes = on_write_bytes
        self._on_write_timeout = on_write_timeout

    def _terminal_error(self) -> Optional[TricklePublisherTerminalError]:
        # Ask the owning publisher (when attached) for a terminal failure.
        return self._error_getter() if self._error_getter is not None else None

    async def write(self, data: bytes) -> None:
        """Enqueue one chunk of the segment body.

        Raises the publisher's terminal error or this segment's write error if
        one is already pending; raises TrickleSegmentWriteError when the
        transport stops draining the queue within the put timeout.
        """
        pending = self._terminal_error()
        if pending is not None:
            raise pending
        if self._seg_state.error is not None:
            raise self._seg_state.error
        try:
            # The timeout bounds local backpressure while the body is fed; it
            # does not cap the total lifetime of the HTTP POST itself.
            await asyncio.wait_for(self.queue.put(data), timeout=_SEGMENT_QUEUE_PUT_TIMEOUT_S)
            if self._on_write_bytes is not None:
                self._on_write_bytes(len(data))
        except asyncio.TimeoutError as e:
            if self._on_write_timeout is not None:
                self._on_write_timeout()
            # Prefer surfacing a terminal publisher failure over the local timeout.
            pending = self._terminal_error()
            if pending is not None:
                raise pending
            raise TrickleSegmentWriteError(
                f"Trickle segment writer timed out after {_SEGMENT_QUEUE_PUT_TIMEOUT_S:.1f}s",
                seq=self._seq,
            ) from e

    async def close(self) -> None:
        """Best-effort end-of-segment: enqueue the None sentinel, never raise."""
        if self._seg_state.error is not None:
            # The segment already failed; nothing left to terminate.
            return
        try:
            await asyncio.wait_for(self.queue.put(None), timeout=_SEGMENT_QUEUE_PUT_TIMEOUT_S)
        # BaseException so cancellation/timeouts during shutdown are swallowed too.
        except BaseException:
            _LOG.warning("Trickle segment close suppressed seq=%s", self._seq, exc_info=True)

    async def __aenter__(self) -> "SegmentWriter":
        return self

    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        await self.close()

    def seq(self) -> int:
        """Return the sequence number of the segment this writer feeds."""
        return self._seq