diff --git a/manufacturing-ai-suite/industrial-edge-insights-multimodal/docker-compose.yml b/manufacturing-ai-suite/industrial-edge-insights-multimodal/docker-compose.yml index 3151af772c..6ad93f50f1 100644 --- a/manufacturing-ai-suite/industrial-edge-insights-multimodal/docker-compose.yml +++ b/manufacturing-ai-suite/industrial-edge-insights-multimodal/docker-compose.yml @@ -219,15 +219,18 @@ services: MEDIAMTX_PORT: 8554 TS_TOPIC: "${TS_TOPIC}" RTSP_STREAM_NAME: live.stream - no_proxy: "ia-mqtt-broker,ia-time-series-analytics-microservice,mediamtx,${no_proxy}" - NO_PROXY: "ia-mqtt-broker,ia-time-series-analytics-microservice,mediamtx,${no_proxy}" + no_proxy: "ia-mqtt-broker,ia-time-series-analytics-microservice,mediamtx,${no_proxy},dlstreamer-pipeline-server" + NO_PROXY: "ia-mqtt-broker,ia-time-series-analytics-microservice,mediamtx,${no_proxy},dlstreamer-pipeline-server" TS_MS_SERVER: "ia-time-series-analytics-microservice" TS_MS_PORT: ${KAPACITOR_PORT} + DLSPS_SERVER_HOST: dlstreamer-pipeline-server + DLSPS_SERVER_PORT: 8080 CONTINUOUS_SIMULATOR_INGESTION: ${CONTINUOUS_SIMULATOR_INGESTION} SIMULATION_TARGET_FPS: ${SIMULATION_TARGET_FPS} LOG_LEVEL: ${LOG_LEVEL} volumes: - ./weld-data-simulator/simulation-data:/simulation-data + - ./weld-data-simulator/publisher.py:/publisher.py:ro networks: - timeseries_network @@ -316,6 +319,8 @@ services: depends_on: ia-weld-data-simulator: condition: service_started + ia-time-series-analytics-microservice: + condition: service_healthy seaweedfs-s3: condition: service_started environment: diff --git a/manufacturing-ai-suite/industrial-edge-insights-multimodal/weld-data-simulator/publisher.py b/manufacturing-ai-suite/industrial-edge-insights-multimodal/weld-data-simulator/publisher.py index a8c8a9ab77..0c07205fce 100644 --- a/manufacturing-ai-suite/industrial-edge-insights-multimodal/weld-data-simulator/publisher.py +++ b/manufacturing-ai-suite/industrial-edge-insights-multimodal/weld-data-simulator/publisher.py @@ -4,12 +4,15 @@ # SPDX-License-Identifier: Apache-2.0 # +import socket + import cv2 import pandas as pd import paho.mqtt.client as mqtt import time import base64 import subprocess +import requests import json import os import glob @@ -269,17 +272,85 @@ def check_and_load_simulation_files(target_fps): return continuous_ingestion = os.getenv("CONTINUOUS_SIMULATOR_INGESTION", "true").lower() == "true" - + streamed_once = False while True: - for i, filename in enumerate(available_files, 1): - logger.info(f" {i}. {filename}") - stream_video_and_csv(filename, target_fps=target_fps) + if not streamed_once: + for i, filename in enumerate(available_files, 1): + logger.info(f" {i}. {filename}") + stream_video_and_csv(filename, target_fps=target_fps) + if not continuous_ingestion: + streamed_once = True if not continuous_ingestion: - logger.info("Continuous ingestion disabled. Exiting...") - break + logger.info("Streaming completed once as CONTINUOUS_SIMULATOR_INGESTION is set to false. Sleeping indefinitely.") + time.sleep(60) # Sleep indefinitely after one complete streaming +def is_port_accessible(Host: str, Port: int): + """ + Check if a port is accessible on a given host. + + :param host: The hostname or IP address to check. + :param port: The port number to check. + :param timeout: The timeout in seconds for the connection attempt. + :return: True if the port is accessible, False otherwise. + """ + logger.info("Waiting for %s accessible...", Host) + while True: + try: + # Create a socket object + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + # Set the timeout for the connection attempt + sock.settimeout(5) + # Attempt to connect to the host and port + sock.connect((Host, Port)) + logger.info("%s is accessible...", Host) + return True + except (socket.timeout, socket.error): + pass + time.sleep(1) +def start_dlsps_pipeline_server(Host: str, Port: int): + """ + Start the DLStreamPipelineServer using a subprocess call to the dlstreamer-pipeline-server binary. + """ + logger.info("Starting DLStreamPipelineServer...") + try: + # Create the pipeline via HTTP request + payload = { + "source": { + "uri": "rtsp://mediamtx:8554/live.stream", + "type": "uri", + }, + "destination": { + "metadata": { + "type": "mqtt", + "topic": "vision_weld_defect_classification", + }, + "frame": [ + { + "type": "webrtc", + "peer-id": "samplestream", + } + ], + }, + "parameters": { + "classification-properties": { + "model": "/home/pipeline-server/resources/models/weld-defect-classification-f16-DeiT/deployment/Classification/model/model.xml", + "device": "CPU", + } + }, + } + response = requests.post( + f"http://{Host}:{Port}/pipelines/user_defined_pipelines/weld_defect_classification", + json=payload, + timeout=10, + ) + response.raise_for_status() + logger.info("DLStreamPipelineServer started successfully.") + return response + except Exception as e: + logger.error("Failed to start DLStreamPipelineServer: %s", e) + return None if __name__ == "__main__": # Uncomment the line below to see available files @@ -293,6 +364,19 @@ def check_and_load_simulation_files(target_fps): # Default behavior - process all available files client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) client.connect(MQTT_BROKER) + + ts_host = os.getenv("TS_MS_SERVER", "ia-time-series-analytics-microservice") + ts_port = int(os.getenv("TS_MS_PORT", "9092")) + + dlsps_host = os.getenv("DLSPS_MS_SERVER", "dlstreamer-pipeline-server") + dlsps_port = int(os.getenv("DLSPS_MS_PORT", "8080")) + + is_port_accessible(ts_host, ts_port) + time.sleep(10) + # is_port_accessible(dlsps_host, dlsps_port) + + + target_fps = int(os.getenv("SIMULATION_TARGET_FPS", "10")) ffmpeg_cmd = [ "ffmpeg", @@ -310,6 +394,12 @@ def check_and_load_simulation_files(target_fps): ] ffmpeg_proc = subprocess.Popen(ffmpeg_cmd, stdin=subprocess.PIPE) + + # dlsps_proc = start_dlsps_pipeline_server(dlsps_host, dlsps_port) + + # if dlsps_proc is None: + # logger.error("DLStreamPipelineServer failed to start. Exiting.") + # exit(1) check_and_load_simulation_files(target_fps) if 'ffmpeg_proc' in locals(): diff --git a/manufacturing-ai-suite/industrial-edge-insights-multimodal/weld-data-simulator/requirements.txt b/manufacturing-ai-suite/industrial-edge-insights-multimodal/weld-data-simulator/requirements.txt index 8af3fcfc2c..b5c7596cfd 100644 --- a/manufacturing-ai-suite/industrial-edge-insights-multimodal/weld-data-simulator/requirements.txt +++ b/manufacturing-ai-suite/industrial-edge-insights-multimodal/weld-data-simulator/requirements.txt @@ -1,3 +1,4 @@ opencv-python==4.12.0.88 paho-mqtt==2.1.0 -pandas==2.3.2 \ No newline at end of file +pandas==2.3.2 +requests==2.32.4 \ No newline at end of file