Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"name": "weld_defect_classification",
"source": "gstreamer",
"queue_maxsize": 50,
"pipeline": "rtspsrc location=\"rtsp://mediamtx:8554/live.stream\" latency=100 name=source ! rtph264depay ! h264parse ! decodebin ! videoconvert ! video/x-raw,format=BGR ! gvaclassify inference-region=full-frame name=classification ! gvawatermark ! gvametaconvert add-empty-results=true name=metaconvert ! queue ! gvafpscounter ! appsink name=destination",
"pipeline": "rtspsrc add-reference-timestamp-meta=true location=\"rtsp://mediamtx:8554/live.stream\" latency=100 name=source ! rtph264depay ! h264parse ! decodebin ! videoconvert ! video/x-raw,format=BGR ! gvaclassify inference-region=full-frame name=classification ! gvawatermark ! gvametaconvert add-empty-results=true add-rtp-timestamp=true name=metaconvert ! queue ! gvafpscounter ! appsink name=destination",
"parameters": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -45,4 +45,4 @@
}
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ cd edge-ai-suites/manufacturing-ai-suite/industrial-edge-insights-multimodal
> Grafana’s minimum refresh interval is 5 seconds.
> - The graph and table may initially display "No Data" because the Time Series Analytics Microservice requires some time to
> install its dependency packages before it can start running.
> - Fusion Analytics starts once the RTP sender timestamp is available in the metadata packet from the DL Streamer Pipeline Server.
> - **Known issue:** DL Streamer Pipeline Server may not send RTP sender timestamps for the first ~300 packets.
> This may result in a delay before Fusion Analytics becomes fully operational.

```bash
cd <PATH_TO_REPO>/edge-ai-suites/manufacturing-ai-suite/industrial-edge-insights-multimodal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ This release introduces **S3-based frame storage**, **deployment hardening**, an

## What's New

- **RTP Timestamp Alignment** — Fusion Analytics now uses the RTP sender timestamp
(`metadata.rtp.sender_ntp_unix_timestamp_ns`) to match vision frames with the nearest time-series records for improved synchronization.
- **SeaweedFS S3 Integration** — DL Streamer now stores output frames and images
in an S3-compatible SeaweedFS backend, with full Helm chart support.
- **Vision Metadata Persistence** — DL pipeline vision metadata is now persisted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ defect classification model, publishes the frame metadata results over MQTT, sto
| `name` | The name of the pipeline configuration. | `"weld_defect_classification"` |
| `source` | The source type for video ingestion. | `"gstreamer"` |
| `queue_maxsize`| Maximum size of the queue for processing frames. | `50` |
| `pipeline` | GStreamer pipeline string defining the video processing flow from RTSP source through classification to output. | `"rtspsrc location=\"rtsp://mediamtx:8554/live.stream\" latency=100 name=source ! rtph264depay ! h264parse ! decodebin ! videoconvert ! gvaclassify inference-region=full-frame name=classification ! gvametaconvert add-empty-results=true name=metaconvert ! queue ! gvafpscounter ! appsink name=destination"` |
| `pipeline` | GStreamer pipeline string defining the video processing flow from RTSP source through classification to output. | `"rtspsrc add-reference-timestamp-meta=true location=\"rtsp://mediamtx:8554/live.stream\" latency=100 name=source ! rtph264depay ! h264parse ! decodebin ! videoconvert ! video/x-raw,format=BGR ! gvaclassify inference-region=full-frame name=classification ! gvawatermark ! gvametaconvert add-empty-results=true add-rtp-timestamp=true name=metaconvert ! queue ! gvafpscounter ! appsink name=destination"` |
| `parameters` | Configuration parameters for pipeline elements, specifically for the classification element properties. | See below for nested structure |

**Parameters Properties**:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ def find_nearest(buf, ts, type):

# Find the message with minimum timestamp difference
if type == "vision":
# Vision messages have timestamp in metadata.time
nearest_index, nearest_item = min(enumerate(buf), key=lambda x: abs(x[1]["metadata"]["time"] - ts))
diff = abs(nearest_item["metadata"]["time"] - ts)
# Vision messages have timestamp in metadata.rtp.sender_ntp_unix_timestamp_ns
nearest_index, nearest_item = min(enumerate(buf), key=lambda x: abs(x[1]["metadata"]["rtp"]["sender_ntp_unix_timestamp_ns"] - ts))
diff = abs(nearest_item["metadata"]["rtp"]["sender_ntp_unix_timestamp_ns"] - ts)
elif type == "timeseries":
# Time-series messages have timestamp in time field
nearest_index, nearest_item = min(enumerate(buf), key=lambda x: abs(x[1]["time"] - ts))
Expand Down Expand Up @@ -177,13 +177,17 @@ def on_message(client, userdata, msg):

elif msg.topic == VISION_TOPIC:
# Process vision-based defect detection message
queues["vision"].append(payload)
if "metadata" not in payload or "rtp" not in payload["metadata"] or "sender_ntp_unix_timestamp_ns" not in payload["metadata"]["rtp"]:
logger.warning(f"missing RTP timestamp metadata in vision message. Skipping timestamp-based fusion for frame_id: {payload['metadata'].get('frame_id', 'unknown')}")
time = payload["metadata"]["time"]
else:
time = payload["metadata"]["rtp"]["sender_ntp_unix_timestamp_ns"]
queues["vision"].append(payload)

# Debug: uncomment to see incoming messages
# logger.info(f"Received from Vision: {payload}")

# Write vision weld classification results to InfluxDB
time = payload["metadata"]["time"]
json_body = [{
"measurement": "vision-weld-classification-results",
"time": pd.to_datetime(time, unit="ns").isoformat(),
Expand Down Expand Up @@ -245,7 +249,7 @@ def fuse_firstcome(mode: Literal["AND", "OR"] = "AND") -> Optional[Dict[str, Any
front_vision = queues["vision"][0]

# Determine which message came first based on timestamps
if front_ts["time"] <= front_vision["metadata"]["time"]:
if front_ts["time"] <= front_vision["metadata"]["rtp"]["sender_ntp_unix_timestamp_ns"]:
# Time-series message is older, process it first
source_queue = "ts"
target_queue = "vision"
Expand All @@ -256,7 +260,7 @@ def fuse_firstcome(mode: Literal["AND", "OR"] = "AND") -> Optional[Dict[str, Any
source_queue = "vision"
target_queue = "ts"
source_entry = queues[source_queue].popleft()
target_index = find_nearest(queues[target_queue], source_entry["metadata"]["time"], "timeseries")
target_index = find_nearest(queues[target_queue], source_entry["metadata"]["rtp"]["sender_ntp_unix_timestamp_ns"], "timeseries")

# Check if a matching message was found within tolerance
if target_index is None:
Expand All @@ -282,19 +286,24 @@ def fuse_firstcome(mode: Literal["AND", "OR"] = "AND") -> Optional[Dict[str, Any
vision_classification = "No Label"

data_dict = {}

ts_time = None
vision_rstp_time = None
# Extract anomaly decisions from both messages
if source_queue == "vision":
# Vision message processed first
vision_confidence = source_entry["metadata"]["objects"][0]["classification_layer_name:output1"]["confidence"]
vision_rstp_time = source_entry["metadata"].get("rtp", {}).get("sender_ntp_unix_timestamp_ns")
ts_time = target_entry["time"]
timeseries_anomaly = target_entry["anomaly_status"]
data_dict = source_entry
else:
# Time-series message processed first
vision_confidence = target_entry["metadata"]["objects"][0]["classification_layer_name:output1"]["confidence"]
vision_rstp_time = target_entry["metadata"].get("rtp", {}).get("sender_ntp_unix_timestamp_ns")
ts_time = source_entry["time"]
timeseries_anomaly = source_entry["anomaly_status"]
data_dict = target_entry

if "metadata" in data_dict and "label" in data_dict["metadata"]["objects"][0]["classification_layer_name:output1"]:
vision_classification = str(data_dict["metadata"]["objects"][0]["classification_layer_name:output1"]["label"])

Expand All @@ -311,7 +320,7 @@ def fuse_firstcome(mode: Literal["AND", "OR"] = "AND") -> Optional[Dict[str, Any
else: # mode == "OR"
# Either system detecting anomaly triggers alert
fused_decision = vision_anomaly | timeseries_anomaly
logger.info(f"Vision_Anomaly Type: {vision_classification}, Vision anomaly: {vision_anomaly}, TS anomaly: {timeseries_anomaly} fused decision: {fused_decision}")
logger.info(f"Vision_Anomaly Type: {vision_classification}, Vision anomaly: {vision_anomaly}, TS anomaly: {timeseries_anomaly} fused decision: {fused_decision} time diff between RSTP and ts: {diff_timestamps_ns(vision_rstp_time, ts_time)['ms']:.3f} ms")
return {
"from": source_entry,
"nearest": target_entry,
Expand All @@ -321,7 +330,8 @@ def fuse_firstcome(mode: Literal["AND", "OR"] = "AND") -> Optional[Dict[str, Any
"target_queue": target_queue,
"vision_anomaly": vision_anomaly,
"timeseries_anomaly": timeseries_anomaly,
"vision_classification": vision_classification
"vision_classification": vision_classification,
"src_time_diff_ms": diff_timestamps_ns(vision_rstp_time, ts_time)['ms'] if vision_rstp_time is not None else None
}


Expand Down Expand Up @@ -369,7 +379,7 @@ def main():
# Write fused result to InfluxDB (InfluxDB v1.11.8)

if result["fused_decision"] is not None:
ts = result["from"]["time"] if "time" in result["from"] else result["from"]["metadata"]["time"]
ts = result["from"]["time"] if "time" in result["from"] else result["from"]["metadata"]["rtp"]["sender_ntp_unix_timestamp_ns"]

json_body = [{
"measurement": "fusion_result",
Expand All @@ -384,7 +394,8 @@ def main():
else str(result["from"]["anomaly_status"])
),
"vision_anomaly": int(result["vision_anomaly"]),
"timeseries_anomaly": int(result["timeseries_anomaly"])
"timeseries_anomaly": int(result["timeseries_anomaly"]),
"vision_rtsp_ts_diff_ms": float(result["src_time_diff_ms"]) if result["src_time_diff_ms"] is not None else None
}
}]
influx_client.write_points(json_body)
Expand All @@ -401,4 +412,4 @@ def main():
logger.info("Disconnected from MQTT broker.")

if __name__ == "__main__":
main()
main()
Original file line number Diff line number Diff line change
Expand Up @@ -269,14 +269,16 @@ def check_and_load_simulation_files(target_fps):
return

continuous_ingestion = os.getenv("CONTINUOUS_SIMULATOR_INGESTION", "true").lower() == "true"

first_iteration = True
while True:
for i, filename in enumerate(available_files, 1):
logger.info(f" {i}. {filename}")
stream_video_and_csv(filename, target_fps=target_fps)
if first_iteration:
for i, filename in enumerate(available_files, 1):
logger.info(f" {i}. {filename}")
stream_video_and_csv(filename, target_fps=target_fps)
if not continuous_ingestion:
logger.info("Continuous ingestion disabled. Exiting...")
break
logger.info("Continuous ingestion disabled. Sleeping...")
first_iteration = False
time.sleep(5) # Short delay before exiting to ensure all data is published



Expand Down