Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"name": "weld_defect_classification",
"source": "gstreamer",
"queue_maxsize": 50,
"pipeline": "rtspsrc location=\"rtsp://mediamtx:8554/live.stream\" latency=100 name=source ! rtph264depay ! h264parse ! decodebin ! videoconvert ! video/x-raw,format=BGR ! gvaclassify inference-region=full-frame name=classification ! gvawatermark ! gvametaconvert add-empty-results=true name=metaconvert ! queue ! gvafpscounter ! appsink name=destination",
"pipeline": "rtspsrc add-reference-timestamp-meta=true location=\"rtsp://mediamtx:8554/live.stream\" latency=100 name=source ! rtph264depay ! h264parse ! decodebin ! videoconvert ! video/x-raw,format=BGR ! gvaclassify inference-region=full-frame name=classification ! gvawatermark ! gvametaconvert add-empty-results=true add-rtp-timestamp=true name=metaconvert ! queue ! gvafpscounter ! appsink name=destination",
"parameters": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -45,4 +45,4 @@
}
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ def find_nearest(buf, ts, type):
# Find the message with minimum timestamp difference
if type == "vision":
# Vision messages have timestamp in metadata.time
nearest_index, nearest_item = min(enumerate(buf), key=lambda x: abs(x[1]["metadata"]["time"] - ts))
diff = abs(nearest_item["metadata"]["time"] - ts)
nearest_index, nearest_item = min(enumerate(buf), key=lambda x: abs(x[1]["metadata"]["rtp"]["sender_ntp_unix_timestamp_ns"] - ts))
diff = abs(nearest_item["metadata"]["rtp"]["sender_ntp_unix_timestamp_ns"] - ts)
elif type == "timeseries":
# Time-series messages have timestamp in time field
nearest_index, nearest_item = min(enumerate(buf), key=lambda x: abs(x[1]["time"] - ts))
Expand Down Expand Up @@ -177,13 +177,18 @@ def on_message(client, userdata, msg):

elif msg.topic == VISION_TOPIC:
# Process vision-based defect detection message
system_time = payload["metadata"]["rtp"]["sender_ntp_unix_timestamp_ns"]
time = payload["metadata"]["time"]

logger.debug(f"Received vision message with system timestamp: ({system_time} ns epoch and metadata timestamp: {time} ({time} ns epoch \
time diff: {diff_timestamps_ns(time, system_time)['ms']:.3f} ms)")

queues["vision"].append(payload)

# Debug: uncomment to see incoming messages
# logger.info(f"Received from Vision: {payload}")

# Write vision weld classification results to InfluxDB
time = payload["metadata"]["time"]
json_body = [{
"measurement": "vision-weld-classification-results",
"time": pd.to_datetime(time, unit="ns").isoformat(),
Expand Down Expand Up @@ -245,7 +250,7 @@ def fuse_firstcome(mode: Literal["AND", "OR"] = "AND") -> Optional[Dict[str, Any
front_vision = queues["vision"][0]

# Determine which message came first based on timestamps
if front_ts["time"] <= front_vision["metadata"]["time"]:
if front_ts["time"] <= front_vision["metadata"]["rtp"]["sender_ntp_unix_timestamp_ns"]:
# Time-series message is older, process it first
source_queue = "ts"
target_queue = "vision"
Expand All @@ -256,7 +261,7 @@ def fuse_firstcome(mode: Literal["AND", "OR"] = "AND") -> Optional[Dict[str, Any
source_queue = "vision"
target_queue = "ts"
source_entry = queues[source_queue].popleft()
target_index = find_nearest(queues[target_queue], source_entry["metadata"]["time"], "timeseries")
target_index = find_nearest(queues[target_queue], source_entry["metadata"]["rtp"]["sender_ntp_unix_timestamp_ns"], "timeseries")

# Check if a matching message was found within tolerance
if target_index is None:
Expand All @@ -282,19 +287,31 @@ def fuse_firstcome(mode: Literal["AND", "OR"] = "AND") -> Optional[Dict[str, Any
vision_classification = "No Label"

data_dict = {}

vision_time = None
ts_time = None
vision_rstp_time = None
# Extract anomaly decisions from both messages
if source_queue == "vision":
# Vision message processed first
vision_confidence = source_entry["metadata"]["objects"][0]["classification_layer_name:output1"]["confidence"]
vision_time = source_entry["metadata"]["time"]
vision_rstp_time = source_entry["metadata"].get("rtp", {}).get("sender_ntp_unix_timestamp_ns")
ts_time = target_entry["time"]
timeseries_anomaly = target_entry["anomaly_status"]
data_dict = source_entry
else:
# Time-series message processed first
vision_confidence = target_entry["metadata"]["objects"][0]["classification_layer_name:output1"]["confidence"]
vision_time = target_entry["metadata"]["time"]
vision_rstp_time = target_entry["metadata"].get("rtp", {}).get("sender_ntp_unix_timestamp_ns")
ts_time = source_entry["time"]
timeseries_anomaly = source_entry["anomaly_status"]
data_dict = target_entry




logger.info(f"Vision timestamp: {vision_time} ns epoch, RSTP timestamp: {vision_rstp_time} ns epoch, Time-series timestamp: {ts_time} ns epoch \
time diff between vision and ts: {diff_timestamps_ns(vision_time, ts_time)['ms']:.3f} ms, \time diff between vision RSTP and ts: {diff_timestamps_ns(vision_rstp_time, ts_time)['ms']:.3f} ms")
if "metadata" in data_dict and "label" in data_dict["metadata"]["objects"][0]["classification_layer_name:output1"]:
vision_classification = str(data_dict["metadata"]["objects"][0]["classification_layer_name:output1"]["label"])

Expand All @@ -321,7 +338,9 @@ def fuse_firstcome(mode: Literal["AND", "OR"] = "AND") -> Optional[Dict[str, Any
"target_queue": target_queue,
"vision_anomaly": vision_anomaly,
"timeseries_anomaly": timeseries_anomaly,
"vision_classification": vision_classification
"vision_classification": vision_classification,
"time_diff_ms": diff_timestamps_ns(vision_time, ts_time)['ms'],
"rtsp_time_diff_ms": diff_timestamps_ns(vision_rstp_time, ts_time)['ms'] if vision_rstp_time is not None else None
}


Expand Down Expand Up @@ -369,7 +388,7 @@ def main():
# Write fused result to InfluxDB (InfluxDB v1.11.8)

if result["fused_decision"] is not None:
ts = result["from"]["time"] if "time" in result["from"] else result["from"]["metadata"]["time"]
ts = result["from"]["time"] if "time" in result["from"] else result["from"]["metadata"]["rtp"]["sender_ntp_unix_timestamp_ns"]

json_body = [{
"measurement": "fusion_result",
Expand All @@ -384,7 +403,9 @@ def main():
else str(result["from"]["anomaly_status"])
),
"vision_anomaly": int(result["vision_anomaly"]),
"timeseries_anomaly": int(result["timeseries_anomaly"])
"timeseries_anomaly": int(result["timeseries_anomaly"]),
"vision_ts_diff_ms": float(result["time_diff_ms"]) if result["time_diff_ms"] is not None else None,
"vision_rtsp_ts_diff_ms": float(result["rtsp_time_diff_ms"]) if result["rtsp_time_diff_ms"] is not None else None
}
}]
influx_client.write_points(json_body)
Expand All @@ -401,4 +422,4 @@ def main():
logger.info("Disconnected from MQTT broker.")

if __name__ == "__main__":
main()
main()
Original file line number Diff line number Diff line change
Expand Up @@ -269,14 +269,16 @@ def check_and_load_simulation_files(target_fps):
return

continuous_ingestion = os.getenv("CONTINUOUS_SIMULATOR_INGESTION", "true").lower() == "true"

first_iteration = True
while True:
for i, filename in enumerate(available_files, 1):
logger.info(f" {i}. {filename}")
stream_video_and_csv(filename, target_fps=target_fps)
if first_iteration:
for i, filename in enumerate(available_files, 1):
logger.info(f" {i}. {filename}")
stream_video_and_csv(filename, target_fps=target_fps)
if not continuous_ingestion:
logger.info("Continuous ingestion disabled. Exiting...")
break
logger.info("Continuous ingestion disabled. Sleeping...")
first_iteration = False
time.sleep(5) # Short delay before exiting to ensure all data is published



Expand Down