-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathone_ring.py
More file actions
218 lines (183 loc) · 9.51 KB
/
one_ring.py
File metadata and controls
218 lines (183 loc) · 9.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
#!/usr/bin/env python3
import logging
import threading
import sys
from pathlib import Path
import gi
import queue
import time
gi.require_version("Gst", "1.0")
gi.require_version('GdkPixbuf', '2.0')
from gi.repository import Gst, GLib, GdkPixbuf
# Import our custom modules
from pymumble_py3 import Mumble
import config
from overlay.renderer import OverlayRenderer
from telemetry.simulator import TelemetrySimulator
# --- Shared state handed to the worker threads ---

# Initial telemetry values. This dict (with telemetry_lock) is passed to both
# the telemetry data source and the overlay renderer created in the main block.
telemetry_data = dict(
    driver_hr=80,
    codriver_hr=70,
    rpm=0,
    gear="N",
    speed_mph=0.0,
    throttle_pct=0,
    brake_pct=100,
    g_force_x=0.0,
    g_force_y=0.0,
    g_force_z=1.0,
    boost_psi=0.0,
)
telemetry_lock = threading.Lock()

# Two-slot buffer pair for rendered overlay frames. Both slots start empty.
graphics_buffers = [None] * 2
# One-element list holding the index of the slot currently used as the front
# buffer; wrapped in a list so the index can be updated in place by whoever
# shares this object.
active_buffer_idx = [0]
graphics_lock = threading.Lock()

# Bounded hand-off queue (10 entries) carrying audio chunks from the GStreamer
# appsink callback to the Mumble client thread.
audio_queue = queue.Queue(10)

# Process-wide logging: INFO and above, tagged with timestamp and thread name.
logging.basicConfig(
    format='%(asctime)s - %(threadName)s - %(message)s',
    level=logging.INFO,
)
class MumbleClient(threading.Thread):
    """Daemon thread that relays audio between GStreamer and a Mumble server.

    Outgoing audio: chunks are taken from ``audio_q`` and handed to
    ``mumble.sound_output.add_sound``.
    Incoming audio: PCM delivered to the ``sound_received`` callback is pushed
    into an ``appsrc``-fed playback pipeline ending in ``alsasink``.

    Args:
        settings: Mapping providing the ``'host'``, ``'user'``, ``'port'`` and
            ``'password'`` entries used to build the Mumble connection.
        audio_q: Queue of outgoing audio chunks (consumed in ``run``).
        audio_device: ALSA device string used for the playback ``alsasink``.
    """

    def __init__(self, settings, audio_q, audio_device):
        super().__init__(name="Mumble-Client", daemon=True)
        # This class builds a GStreamer pipeline below, and it is constructed
        # before TelemetryStreamer (the only other place that initialises
        # GStreamer). Initialise here so parse_launch works regardless of
        # construction order; repeated Gst.init calls are harmless.
        Gst.init(None)

        self.audio_queue = audio_q
        self.mumble = Mumble(
            settings['host'],
            settings['user'],
            port=settings['port'],
            password=settings['password'],
            debug=False,
        )
        self.mumble.set_application_string("Phares Motors Racecam")
        self.mumble.callbacks.set_callback("sound_received", self._sound_received_handler)
        # Timestamp of the last "sending audio" log line (see run()).
        self.last_log_time = 0

        # GStreamer pipeline for Mumble audio playback
        pipeline_str = f"appsrc name=mumble_audio_src ! audio/x-raw,format=S16LE,layout=interleaved,rate=48000,channels=1 ! audioconvert ! audioresample ! alsasink device={audio_device}"
        self.playback_pipeline = Gst.parse_launch(pipeline_str)
        self.appsrc = self.playback_pipeline.get_by_name("mumble_audio_src")
        self.appsrc.set_property("format", Gst.Format.TIME)
        self.playback_pipeline.set_state(Gst.State.PLAYING)

    def _sound_received_handler(self, user, sound_chunk):
        """Called by pymumble when audio is received from the server."""
        # Push the raw PCM data into our GStreamer playback pipeline
        buf = Gst.Buffer.new_wrapped(sound_chunk.pcm)
        self.appsrc.emit("push-buffer", buf)

    def run(self):
        """Connect to the server, then forward queued audio until the client dies.

        Any exception is logged (with traceback) rather than propagated, and
        the playback pipeline is always set to NULL on the way out.
        """
        logging.info("Mumble client thread started.")
        try:
            self.mumble.start()
            # Presumably blocks until the connection is established -- confirm
            # against the pymumble version in use.
            self.mumble.is_ready()
            logging.info("Mumble client connected to server.")

            # Loop to send audio from the queue
            while self.mumble.is_alive():
                try:
                    # The 1 s timeout lets the loop re-check is_alive() even
                    # when no audio is arriving.
                    chunk = self.audio_queue.get(block=True, timeout=1)
                except queue.Empty:
                    continue

                self.mumble.sound_output.add_sound(chunk)
                # Log every 5 seconds to avoid spamming the logs
                if time.time() - self.last_log_time > 5:
                    logging.info("Mumble client is successfully sending audio data to the server.")
                    self.last_log_time = time.time()
        except Exception as e:
            # logging.exception keeps the traceback, which a bare message loses.
            logging.exception("Mumble client error: %s", e)
        finally:
            logging.info("Mumble client shutting down.")
            if self.playback_pipeline:
                self.playback_pipeline.set_state(Gst.State.NULL)
class TelemetryStreamer:
    """Build and run the capture -> overlay -> encode -> RTMP GStreamer pipeline.

    Video comes from ``v4l2src``, passes through a ``gdkpixbufoverlay`` whose
    pixbuf is refreshed from the shared graphics buffers, and is muxed with
    ALSA audio into an FLV stream sent to ``rtmp_url``. When Mumble is
    enabled, the captured audio is additionally teed into an ``appsink`` whose
    samples are placed on the module-level ``audio_queue``.

    Args:
        audio_device: ALSA device string used for ``alsasrc``.
        rtmp_url: Destination passed to ``rtmp2sink``.
        mumble_enabled: Truthy to add the Mumble appsink branch.

    The constructor calls ``sys.exit(1)`` if the pipeline cannot be parsed.
    """

    # Minimum seconds between "appsink received a sample" diagnostics.
    _SAMPLE_LOG_INTERVAL_S = 5

    def __init__(self, audio_device, rtmp_url, mumble_enabled):
        Gst.init(None)
        self.loop = GLib.MainLoop()
        self.mumble_enabled = mumble_enabled
        # Timestamp of the last throttled appsink diagnostic.
        self._last_sample_log_time = 0

        self.pipeline = self._build_pipeline(audio_device, rtmp_url)
        if not self.pipeline:
            logging.error("Pipeline construction failed. Exiting.")
            sys.exit(1)

        self.graphics_overlay = self.pipeline.get_by_name("overlay_graphics")

        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message", self._on_bus_message)

        if self.mumble_enabled:
            mumble_sink = self.pipeline.get_by_name("mumble_sink")
            mumble_sink.set_property("emit-signals", True)
            mumble_sink.connect("new-sample", self._on_new_mumble_audio_sample)

    def _build_pipeline(self, audio_device, rtmp_url):
        """Assemble the pipeline description and parse it.

        Returns:
            The parsed pipeline, or ``None`` if parsing raised ``GLib.Error``.
        """
        # Base pipeline for video and overlay
        pipeline_str = (
            f"v4l2src device={config.VIDEO_DEVICE} ! image/jpeg,width={config.SCREEN_WIDTH},height={config.SCREEN_HEIGHT},framerate={config.FRAME_RATE} ! jpegparse ! jpegdec ! "
            f"gdkpixbufoverlay name=overlay_graphics ! "
            f"videoconvert ! v4l2h264enc extra-controls=\"controls,video_bitrate=8000000\" ! h264parse ! flvmux name=mux streamable=true ! rtmp2sink location={rtmp_url} "
        )

        # Audio pipeline part
        if self.mumble_enabled:
            # If mumble is on, we tee the audio to the stream and to an appsink for the mumble client.
            # The appsink branch pins rate=48000, so it needs its own
            # audioresample: without one, a capture device running at any
            # other rate cannot satisfy the caps filter.
            pipeline_str += (
                f"alsasrc device={audio_device} ! tee name=audio_tee ! "
                f"queue ! audioconvert ! audioresample ! lamemp3enc ! mpegaudioparse ! mux. "
                f"audio_tee. ! queue ! audioconvert ! audioresample ! audio/x-raw,format=S16LE,rate=48000,channels=1 ! appsink name=mumble_sink"
            )
        else:
            # Original audio pipeline
            pipeline_str += f"alsasrc device={audio_device} ! audioconvert ! audioresample ! lamemp3enc ! mpegaudioparse ! mux."

        # NOTE(review): this prints the full RTMP location, which may embed
        # the stream key -- confirm that is acceptable for this deployment.
        print(f"Using pipeline:\n{pipeline_str}")
        try:
            return Gst.parse_launch(pipeline_str)
        except GLib.Error as e:
            logging.error(f"Failed to parse pipeline: {e}")
            return None

    def _on_bus_message(self, bus, message):
        """Stop the streamer on a pipeline ERROR or End-Of-Stream message."""
        t = message.type
        if t == Gst.MessageType.ERROR:
            err, dbg = message.parse_error()
            logging.error(f"GStreamer error: {err} - {dbg}")
            self.stop()
        elif t == Gst.MessageType.EOS:
            logging.info("End-Of-Stream reached.")
            self.stop()

    def _update_overlays(self):
        """Called at video framerate (e.g., 60fps). Does minimal work.

        Returns ``True`` so the GLib timeout that drives it keeps firing.
        """
        # Only the lookup of the current front buffer happens under the lock.
        with graphics_lock:
            front_buffer_data = graphics_buffers[active_buffer_idx[0]]

        if front_buffer_data:
            # NOTE(review): assumes the buffer is 4-bytes-per-pixel RGBA of
            # SCREEN_WIDTH x SCREEN_HEIGHT (rowstride = width * 4) -- confirm
            # against the renderer that fills graphics_buffers.
            pixbuf = GdkPixbuf.Pixbuf.new_from_data(
                front_buffer_data,
                GdkPixbuf.Colorspace.RGB,
                True,
                8,
                config.SCREEN_WIDTH,
                config.SCREEN_HEIGHT,
                config.SCREEN_WIDTH * 4,
            )
            self.graphics_overlay.set_property("pixbuf", pixbuf)
        return True

    def _on_new_mumble_audio_sample(self, sink):
        """Called by appsink when a new audio buffer is ready for Mumble.

        Copies the sample's bytes onto ``audio_queue``; if the queue is full
        the frame is dropped with a warning. Always returns
        ``Gst.FlowReturn.OK``.
        """
        # This is a critical log. If you don't see this, GStreamer isn't sending audio here.
        # It fires for every audio buffer, so it is throttled to one line per
        # interval to keep the log readable.
        now = time.time()
        if now - self._last_sample_log_time > self._SAMPLE_LOG_INTERVAL_S:
            logging.info("Mumble appsink received an audio sample from GStreamer.")
            self._last_sample_log_time = now

        sample = sink.emit("pull-sample")
        if sample:
            buf = sample.get_buffer()
            data = buf.extract_dup(0, buf.get_size())
            try:
                # Never block the calling GStreamer thread on a full queue.
                audio_queue.put_nowait(data)
            except queue.Full:
                logging.warning("Mumble audio queue is full. Dropping audio frame.")
        return Gst.FlowReturn.OK

    def run(self):
        """Start the pipeline and block in the GLib main loop until stopped."""
        self.pipeline.set_state(Gst.State.PLAYING)
        GLib.timeout_add(int(1000/60), self._update_overlays)
        print("Streamer is running. Press Ctrl+C to stop.")
        try:
            self.loop.run()
        except KeyboardInterrupt:
            self.stop()

    def stop(self):
        """Tear down the pipeline and quit the main loop if it is still running.

        The pipeline is set to NULL unconditionally: if the loop has already
        exited by the time this is called (for example when reached from the
        KeyboardInterrupt handler in ``run``), the pipeline must still be
        released.
        """
        print("\nStopping streamer...")
        if self.pipeline:
            self.pipeline.set_state(Gst.State.NULL)
        if self.loop.is_running():
            self.loop.quit()
if __name__ == "__main__":
    # --- Validate configuration before starting any threads ---
    if not config.check_assets():
        logging.error("FATAL: One or more assets not found. Run create_assets.py or check paths in config.py.")
        sys.exit(1)

    stream_settings = config.get_stream_settings()
    if not stream_settings:
        logging.error("FATAL: Could not read stream settings from config file.")
        sys.exit(1)

    service = stream_settings.get('service')
    stream_key = stream_settings.get('stream_key')
    if not stream_key:
        logging.error("FATAL: Stream key is not configured in the web interface.")
        sys.exit(1)

    # Resolve the RTMP destination: either a user-supplied URL, or a known
    # service base URL with the stream key appended.
    if service == 'custom':
        rtmp_url = stream_settings.get('custom_url')
        if not rtmp_url:
            logging.error("FATAL: 'Custom' service selected but no custom URL provided.")
            sys.exit(1)
    elif service in config.STREAM_URLS:
        rtmp_url = f"{config.STREAM_URLS[service]}/{stream_key}"
    else:
        logging.error(f"FATAL: Unknown streaming service '{service}' configured.")
        sys.exit(1)

    audio_device_name = config.probe_audio_device()
    if audio_device_name is None:
        sys.exit(1)

    # MumbleClient builds a GStreamer pipeline in its constructor, which runs
    # before TelemetryStreamer is created, so GStreamer has to be initialised
    # here first. Calling Gst.init again later is harmless.
    Gst.init(None)

    # Treat missing Mumble settings as "Mumble disabled" instead of crashing
    # on None.get(...).
    mumble_settings = config.get_mumble_settings() or {}
    mumble_enabled = mumble_settings.get('enabled')
    if mumble_enabled:
        logging.info("Mumble client is enabled. Starting Mumble thread.")
        mumble_client = MumbleClient(mumble_settings, audio_queue, audio_device_name)
        mumble_client.start()

    # Create and start the data source thread
    data_source = TelemetrySimulator(telemetry_lock, telemetry_data)
    data_source.start()

    # Create and start the dedicated graphics rendering thread
    graphics_renderer = OverlayRenderer(telemetry_data, telemetry_lock, graphics_buffers, graphics_lock, active_buffer_idx)
    graphics_renderer.start()

    # Give the graphics thread a moment to produce the first frame
    time.sleep(1.0)

    streamer = TelemetryStreamer(audio_device_name, rtmp_url, mumble_enabled)
    streamer.run()