-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpublish_simdevice_data_to_quix.py
More file actions
135 lines (107 loc) · 4.36 KB
/
publish_simdevice_data_to_quix.py
File metadata and controls
135 lines (107 loc) · 4.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import os
import signal
import time
from pathlib import Path
from typing import Dict
import opendaq
import quixstreams as qx
from dotenv import load_dotenv
# Pull settings from the .env file that lives next to this script.
env_file = Path(__file__).parent / ".env"
load_dotenv(env_file)

# --- Configuration ---
# The Quix SDK token is mandatory: fail at import time when it is unset or empty.
quix_sdk_token = os.getenv("QUIX_SDK_TOKEN")
if not quix_sdk_token:
    raise RuntimeError("QUIX_SDK_TOKEN environment variable must be set.")
# --- Graceful Shutdown Handler ---
# Module-level flag polled by the streaming loop; cleared by the signal handler.
running = True


def shutdown_handler(signum, frame):
    """Signal callback that asks the streaming loop to stop.

    Args:
        signum: Number of the received signal (unused).
        frame: Stack frame that was interrupted (unused).
    """
    global running
    print("Shutdown signal received, stopping...")
    running = False


# Route both Ctrl+C (SIGINT) and SIGTERM through the same handler.
for _shutdown_signal in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_shutdown_signal, shutdown_handler)
def main():
    """Stream samples from the openDAQ device simulator to a Quix topic.

    Looks up the discovered device named "Reference device simulator",
    creates a stream reader for every signal that has a domain signal, and
    publishes each sample as a JSON message to the "opendaq-telemetry" topic
    for as long as the module-level ``running`` flag stays true.

    Returns early, after printing a message, when the simulator is not found
    or the device exposes no signals.
    """
    # 1. CONNECT TO OPENDAQ DEVICE
    # =============================
    print("Searching for openDAQ device simulator...")
    daq_instance = opendaq.Instance()

    # Pick the first discovered device whose name matches the simulator.
    simulator_info = next(
        (
            candidate
            for candidate in daq_instance.available_devices
            if candidate.name == "Reference device simulator"
        ),
        None,
    )
    device = None
    if simulator_info is not None:
        device = daq_instance.add_device(simulator_info.connection_string)
        print(f"Connected to '{device.info.name}'")

    if not device:
        print("Device simulator not found. Please ensure it is running and accessible.")
        return

    # 2. PREPARE FOR STREAMING
    # ========================
    all_signals = device.get_signals(
        opendaq.RecursiveSearchFilter(opendaq.AnySearchFilter())
    )
    if not all_signals:
        print("No signals found on the device.")
        return

    readers: Dict[str, opendaq.StreamReader] = {}
    domain_info: Dict[str, dict] = {}

    print("\nFound signals to stream:")
    for sig in all_signals:
        # Only signals that have a domain signal get a reader.
        if not sig.domain_signal:
            continue

        signal_key = sig.global_id
        print(f"- {signal_key}")
        readers[signal_key] = opendaq.StreamReader(sig)

        # Keep the domain metadata that is attached to every published sample.
        domain_descriptor = sig.domain_signal.descriptor
        tick_resolution = domain_descriptor.tick_resolution
        domain_info[signal_key] = {
            "origin": domain_descriptor.origin,
            "resolution_num": tick_resolution.numerator,
            "resolution_den": tick_resolution.denominator,
        }

    # 3. SETUP QUIX STREAMS PRODUCER
    # ==============================
    app = qx.Application(quix_sdk_token=quix_sdk_token)
    output_topic = app.topic("opendaq-telemetry", value_serializer="json")

    with app.get_producer() as producer:
        print(
            f"\nStreaming data to Quix topic '{output_topic.name}'. Press Ctrl+C to stop."
        )

        # 4. START THE STREAMING LOOP
        # ===========================
        while running:
            received_any = False

            for signal_id, reader in readers.items():
                values, domain_values = reader.read_with_domain(100)
                sample_count = len(values)
                if sample_count == 0:
                    continue

                received_any = True
                info = domain_info[signal_id]
                print(f"Read {sample_count} samples from {signal_id}")

                for index, raw_value in enumerate(values):
                    # Cast the NumPy scalars to plain Python int/float so the
                    # payload can be serialized.
                    payload = {
                        "timestamp_ticks": int(domain_values[index]),
                        "value": float(raw_value),
                        "signal_id": signal_id,
                        "origin": info["origin"],
                        "tick_resolution_num": info["resolution_num"],
                        "tick_resolution_den": info["resolution_den"],
                    }
                    kafka_message = output_topic.serialize(
                        key=signal_id, value=payload
                    )
                    producer.produce(
                        topic=output_topic.name,
                        key=kafka_message.key,
                        value=kafka_message.value,
                    )

            # Back off briefly when no reader returned samples in this pass.
            if not received_any:
                time.sleep(0.05)

    print("\nStreaming stopped. Producer closed.")
# Start streaming only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()