Skip to content

Commit 3acc0fc

Browse files
Added NATS publishing for test results
Added NATS publishing functionality to speedtest script.
1 parent 1c0f8f3 commit 3acc0fc

File tree

1 file changed

+54
-2
lines changed

1 file changed

+54
-2
lines changed

speedtest.py

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
PROTOCOL - 'tcp' or 'udp' (default: tcp)
1616
BANDWIDTH - UDP bandwidth limit (default: 100M)
1717
BIND_INTERFACE - Network interface to bind to (e.g., eth0, wlan0)
18+
NATS_URL - NATS server URL (default: nats://deltax.speedtest:4222)
19+
NATS_TOPIC - NATS topic to publish to (default: speedtest)
1820
1921
Usage:
2022
podman run --env-file options.env -v ./results:/data:z image_name
@@ -26,6 +28,38 @@
2628
import subprocess
2729
import os
2830
from pathlib import Path
31+
import asyncio
32+
from nats.aio.client import Client as NATS
33+
34+
35+
async def publish_to_nats(nats_url, topic, data):
36+
"""Publish JSON data to NATS topic."""
37+
nc = NATS()
38+
39+
try:
40+
await nc.connect(nats_url)
41+
42+
# Convert data to JSON bytes
43+
message = json.dumps(data).encode()
44+
45+
# Publish to topic
46+
await nc.publish(topic, message)
47+
await nc.flush()
48+
49+
print(f"Published to NATS topic '{topic}'")
50+
51+
except Exception as e:
52+
print(f"Error publishing to NATS: {e}")
53+
finally:
54+
await nc.close()
55+
56+
57+
def send_to_nats(nats_url, topic, data):
58+
"""Synchronous wrapper for NATS publishing."""
59+
try:
60+
asyncio.run(publish_to_nats(nats_url, topic, data))
61+
except Exception as e:
62+
print(f"Failed to send to NATS: {e}")
2963

3064

3165
def run_server(port, bind_interface=None):
@@ -45,6 +79,7 @@ def run_server(port, bind_interface=None):
4579
def run_tests(server_ip, port, duration, output_dir, test_count, protocol, bandwidth, bind_interface=None):
4680
"""Run download and upload iperf3 tests."""
4781
results = {}
82+
raw_results = {'download': None, 'upload': None}
4883

4984
# Download test
5085
print("Running download test...")
@@ -56,6 +91,7 @@ def run_tests(server_ip, port, duration, output_dir, test_count, protocol, bandw
5691

5792
result = subprocess.run(cmd, capture_output=True, text=True)
5893
data = json.loads(result.stdout)
94+
raw_results['download'] = data
5995

6096
if protocol == 'udp':
6197
results['download_mbps'] = data['end']['sum']['bits_per_second'] / 1_000_000
@@ -81,6 +117,7 @@ def run_tests(server_ip, port, duration, output_dir, test_count, protocol, bandw
81117

82118
result = subprocess.run(cmd, capture_output=True, text=True)
83119
data = json.loads(result.stdout)
120+
raw_results['upload'] = data
84121

85122
if protocol == 'udp':
86123
results['upload_mbps'] = data['end']['sum']['bits_per_second'] / 1_000_000
@@ -94,7 +131,7 @@ def run_tests(server_ip, port, duration, output_dir, test_count, protocol, bandw
94131
with open(output_dir / f"raw_upload_{test_count:03d}.json", 'w') as f:
95132
f.write(result.stdout)
96133

97-
return results
134+
return results, raw_results
98135

99136

100137
def save_to_csv(results, csv_file, test_number, protocol):
@@ -147,6 +184,8 @@ def main():
147184
protocol = os.environ.get('PROTOCOL', 'tcp').lower()
148185
bandwidth = os.environ.get('BANDWIDTH', '100M')
149186
bind_interface = os.environ.get('BIND_INTERFACE')
187+
nats_url = os.environ.get('NATS_URL', 'nats://deltax.speedtest:4222')
188+
nats_topic = os.environ.get('NATS_TOPIC', 'speedtest')
150189

151190
# Validate mode
152191
if mode not in ['client', 'server']:
@@ -174,14 +213,27 @@ def main():
174213
print(f"UDP bandwidth: {bandwidth}")
175214
if bind_interface:
176215
print(f"Bound to interface: {bind_interface}")
216+
print(f"NATS publishing to: {nats_url} on topic '{nats_topic}'")
177217
print(f"Results: {csv_file}\n")
178218

179219
# Run continuous tests
180220
test_count = 0
181221
while True:
182222
test_count += 1
183-
results = run_tests(server_ip, port, duration, output_dir, test_count, protocol, bandwidth, bind_interface)
223+
results, raw_results = run_tests(server_ip, port, duration, output_dir, test_count, protocol, bandwidth, bind_interface)
184224
save_to_csv(results, csv_file, test_count, protocol)
225+
226+
# Publish to NATS
227+
nats_payload = {
228+
'timestamp': datetime.datetime.now().isoformat(),
229+
'test_number': test_count,
230+
'protocol': protocol,
231+
'results': results,
232+
'raw_download': raw_results['download'],
233+
'raw_upload': raw_results['upload']
234+
}
235+
send_to_nats(nats_url, nats_topic, nats_payload)
236+
185237
time.sleep(interval)
186238

187239

0 commit comments

Comments
 (0)