-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathlog_streaming.py
More file actions
137 lines (112 loc) · 3.83 KB
/
log_streaming.py
File metadata and controls
137 lines (112 loc) · 3.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
"""Live log streaming from Kubernetes pods.
Provides utilities to stream pod logs to stdout in real-time using a
background daemon thread. Used by both GKE and Pathways backends during
job execution.
"""
import codecs
import sys
import threading
from collections import deque

import urllib3
from absl import logging
from kubernetes.client.rest import ApiException
from rich.console import Console
from rich.live import Live
from rich.panel import Panel
from rich.text import Text
# Maximum number of most-recent log lines retained for the Live panel view.
_MAX_DISPLAY_LINES = 25
def _stream_pod_logs(core_v1, pod_name, namespace):
    """Stream pod logs to stdout. Designed to run in a daemon thread.

    Uses the Kubernetes follow API to tail logs in real-time. The stream
    ends naturally when the container exits.

    In interactive terminals, logs are displayed in a Rich Live panel.
    In non-interactive contexts (piped output, CI), logs are streamed
    as raw lines with Rich Rule delimiters.

    This function is best-effort: it never raises. Expected failures
    (pod gone, connection dropped) are logged at debug level, anything
    else at warning level.

    Args:
        core_v1: Kubernetes CoreV1Api client.
        pod_name: Name of the pod to stream logs from.
        namespace: Kubernetes namespace.
    """
    console = Console()
    resp = None
    try:
        # _preload_content=False returns the raw streaming response instead
        # of buffering the (unbounded) followed log in memory.
        resp = core_v1.read_namespaced_pod_log(
            name=pod_name,
            namespace=namespace,
            follow=True,
            _preload_content=False,
        )
        if console.is_terminal:
            _render_live_panel(resp, pod_name, console)
        else:
            _render_plain(resp, pod_name, console)
    except ApiException as err:
        if err.status == 404:
            # Pod deleted or not found: a normal race with job teardown.
            logging.debug("Pod %s not found; skipping log streaming", pod_name)
        else:
            # Anything else (e.g. forbidden, bad request) would otherwise
            # leave the user with no logs and no explanation.
            logging.warning(
                "Could not stream logs from %s: %s %s",
                pod_name,
                err.status,
                err.reason,
            )
    except urllib3.exceptions.ProtocolError:
        # Connection broken mid-stream (pod terminated).
        logging.debug("Log stream from %s closed by the server", pod_name)
    except Exception:
        logging.warning(
            "Log streaming from %s failed unexpectedly", pod_name, exc_info=True
        )
    finally:
        if resp is not None:
            # Close first: a follow stream abandoned mid-read must not be
            # handed back to the pool as a reusable connection.
            try:
                resp.close()
            finally:
                resp.release_conn()
def _render_live_panel(resp, pod_name, console):
    """Render streaming logs inside a Rich Live panel.

    Only the most recent ``_MAX_DISPLAY_LINES`` complete lines are shown.
    Bytes are decoded incrementally so a multi-byte UTF-8 character that
    is split across two network chunks is reassembled instead of being
    turned into replacement characters.

    Args:
        resp: Streaming HTTP response exposing ``stream(decode_content=...)``
            that yields ``bytes`` chunks.
        pod_name: Name of the pod, shown in the panel title.
        console: Rich Console the Live display renders to.
    """
    lines = deque(maxlen=_MAX_DISPLAY_LINES)
    title = f"Remote logs \u2022 {pod_name}"
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
    pending = ""  # Text received after the last newline seen so far.
    with Live(
        _make_log_panel(lines, title),
        console=console,
        refresh_per_second=4,
    ) as live:
        for chunk in resp.stream(decode_content=True):
            pending += decoder.decode(chunk)
            # One split per chunk; the last piece is the unfinished line.
            *complete, pending = pending.split("\n")
            if complete:
                lines.extend(complete)
                live.update(_make_log_panel(lines, title))
        # Flush any bytes still held by the decoder plus the remaining
        # partial line.
        pending += decoder.decode(b"", final=True)
        if pending.strip():
            lines.append(pending)
            live.update(_make_log_panel(lines, title))
def _render_plain(resp, pod_name, console):
"""Render streaming logs as raw lines with Rule delimiters."""
console.rule(f"Remote logs ({pod_name})", style="blue")
for chunk in resp.stream(decode_content=True):
sys.stdout.write(chunk.decode("utf-8", errors="replace"))
sys.stdout.flush()
console.rule("End remote logs", style="blue")
def _make_log_panel(lines, title):
    """Build a Panel renderable from accumulated log lines.

    Args:
        lines: Iterable of log line strings to show, oldest first.
        title: Panel title.

    Returns:
        A Rich Panel containing the joined lines, or a placeholder message
        when there are no lines yet.
    """
    content = "\n".join(lines) if lines else "Waiting for output..."
    # Wrap in Text so log output is shown verbatim. A bare str would be
    # parsed as Rich console markup: "[/usr/bin]" raises MarkupError and
    # "[info]" is silently consumed as a style tag.
    return Panel(Text(content), title=title, border_style="blue")
class LogStreamer:
    """Context manager wrapping a single background log-streaming thread.

    The thread is created lazily by :meth:`start` and joined (with a short
    timeout) when the ``with`` block exits.

    Usage::

        with LogStreamer(core_v1, namespace) as streamer:
            while polling:
                ...
                if pod_is_running:
                    streamer.start(pod_name)  # idempotent
    """

    def __init__(self, core_v1, namespace):
        """Store the client and namespace; no thread exists until start()."""
        self._thread = None
        self._namespace = namespace
        self._core_v1 = core_v1

    def __enter__(self):
        """Return this streamer for use as the ``with`` target."""
        return self

    def __exit__(self, *exc):
        """Wait up to five seconds for the streaming thread, if one exists.

        Always returns False so exceptions from the ``with`` body propagate.
        """
        worker = self._thread
        if worker is None:
            return False
        worker.join(timeout=5)
        return False

    def start(self, pod_name):
        """Start streaming if not already active (idempotent)."""
        if self._thread is None:
            logging.info("Streaming logs from %s...", pod_name)
            worker = threading.Thread(
                target=_stream_pod_logs,
                args=(self._core_v1, pod_name, self._namespace),
                daemon=True,
            )
            # Record the thread before starting it so later calls are no-ops.
            self._thread = worker
            worker.start()