Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -219,15 +219,18 @@ services:
MEDIAMTX_PORT: 8554
TS_TOPIC: "${TS_TOPIC}"
RTSP_STREAM_NAME: live.stream
no_proxy: "ia-mqtt-broker,ia-time-series-analytics-microservice,mediamtx,${no_proxy}"
NO_PROXY: "ia-mqtt-broker,ia-time-series-analytics-microservice,mediamtx,${no_proxy}"
no_proxy: "ia-mqtt-broker,ia-time-series-analytics-microservice,mediamtx,${no_proxy},dlstreamer-pipeline-server"
NO_PROXY: "ia-mqtt-broker,ia-time-series-analytics-microservice,mediamtx,${no_proxy},dlstreamer-pipeline-server"
TS_MS_SERVER: "ia-time-series-analytics-microservice"
TS_MS_PORT: ${KAPACITOR_PORT}
DLSPS_SERVER_HOST: dlstreamer-pipeline-server
DLSPS_SERVER_PORT: 8080
CONTINUOUS_SIMULATOR_INGESTION: ${CONTINUOUS_SIMULATOR_INGESTION}
SIMULATION_TARGET_FPS: ${SIMULATION_TARGET_FPS}
LOG_LEVEL: ${LOG_LEVEL}
volumes:
- ./weld-data-simulator/simulation-data:/simulation-data
- ./weld-data-simulator/publisher.py:/publisher.py:ro
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove this, you may have added primarily for debug purpose this.

networks:
- timeseries_network

Expand Down Expand Up @@ -316,6 +319,8 @@ services:
depends_on:
ia-weld-data-simulator:
condition: service_started
ia-time-series-analytics-microservice:
condition: service_healthy
seaweedfs-s3:
condition: service_started
environment:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
# SPDX-License-Identifier: Apache-2.0
#

import socket

import cv2
import pandas as pd
import paho.mqtt.client as mqtt
import time
import base64
import subprocess
import requests
import json
import os
import glob
Expand Down Expand Up @@ -269,17 +272,85 @@ def check_and_load_simulation_files(target_fps):
return

continuous_ingestion = os.getenv("CONTINUOUS_SIMULATOR_INGESTION", "true").lower() == "true"

streamed_once = False
while True:
for i, filename in enumerate(available_files, 1):
logger.info(f" {i}. {filename}")
stream_video_and_csv(filename, target_fps=target_fps)
if not streamed_once:
for i, filename in enumerate(available_files, 1):
logger.info(f" {i}. {filename}")
stream_video_and_csv(filename, target_fps=target_fps)
if not continuous_ingestion:
streamed_once = True
if not continuous_ingestion:
logger.info("Continuous ingestion disabled. Exiting...")
break
logger.info("Streaming completed once as CONTINUOUS_SIMULATOR_INGESTION is set to false. Sleeping indefinitely.")
time.sleep(60) # Sleep indefinitely after one complete streaming

def is_port_accessible(Host: str, Port: int):
"""
Check if a port is accessible on a given host.

:param host: The hostname or IP address to check.
:param port: The port number to check.
:param timeout: The timeout in seconds for the connection attempt.
:return: True if the port is accessible, False otherwise.
"""

logger.info("Waiting for %s accessible...", Host)
while True:
try:
# Create a socket object
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
# Set the timeout for the connection attempt
sock.settimeout(5)
# Attempt to connect to the host and port
sock.connect((Host, Port))
logger.info("%s is accessible...", Host)
return True
except (socket.timeout, socket.error):
pass
time.sleep(1)

def start_dlsps_pipeline_server(Host: str, Port: int):
"""
Start the DLStreamPipelineServer using a subprocess call to the dlstreamer-pipeline-server binary.
"""
logger.info("Starting DLStreamPipelineServer...")
try:
# Create the pipeline via HTTP request
payload = {
"source": {
"uri": "rtsp://mediamtx:8554/live.stream",
"type": "uri",
},
"destination": {
"metadata": {
"type": "mqtt",
"topic": "vision_weld_defect_classification",
},
"frame": [
{
"type": "webrtc",
"peer-id": "samplestream",
}
],
},
"parameters": {
"classification-properties": {
"model": "/home/pipeline-server/resources/models/weld-defect-classification-f16-DeiT/deployment/Classification/model/model.xml",
"device": "CPU",
}
},
}
response = requests.post(
f"http://{Host}:{Port}/pipelines/user_defined_pipelines/weld_defect_classification",
json=payload,
timeout=10,
)
response.raise_for_status()
logger.info("DLStreamPipelineServer started successfully.")
return response
except Exception as e:
logger.error("Failed to start DLStreamPipelineServer: %s", e)
return None

if __name__ == "__main__":
# Uncomment the line below to see available files
Expand All @@ -293,6 +364,19 @@ def check_and_load_simulation_files(target_fps):
# Default behavior - process all available files
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client.connect(MQTT_BROKER)

ts_host = os.getenv("TS_MS_SERVER", "ia-time-series-analytics-microservice")
ts_port = int(os.getenv("TS_MS_PORT", "9092"))

dlsps_host = os.getenv("DLSPS_MS_SERVER", "dlstreamer-pipeline-server")
dlsps_port = int(os.getenv("DLSPS_MS_PORT", "8080"))

is_port_accessible(ts_host, ts_port)
time.sleep(10)
# is_port_accessible(dlsps_host, dlsps_port)



target_fps = int(os.getenv("SIMULATION_TARGET_FPS", "10"))
ffmpeg_cmd = [
"ffmpeg",
Expand All @@ -310,6 +394,12 @@ def check_and_load_simulation_files(target_fps):
]

ffmpeg_proc = subprocess.Popen(ffmpeg_cmd, stdin=subprocess.PIPE)

# dlsps_proc = start_dlsps_pipeline_server(dlsps_host, dlsps_port)

# if dlsps_proc is None:
# logger.error("DLStreamPipelineServer failed to start. Exiting.")
# exit(1)
Comment on lines +398 to +402
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this commented code and the function def, also line 376?

check_and_load_simulation_files(target_fps)

if 'ffmpeg_proc' in locals():
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
opencv-python==4.12.0.88
paho-mqtt==2.1.0
pandas==2.3.2
pandas==2.3.2
requests==2.32.4
Loading