Skip to content

Commit a8acd38

Browse files
committed
[vlm_alerts.py]: refine alert logic and improve processing flow
1 parent 0941c5c commit a8acd38

File tree

1 file changed

+158
-98
lines changed

1 file changed

+158
-98
lines changed

samples/gstreamer/python/vlm_alerts/vlm_alerts.py

Lines changed: 158 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,6 @@
66
#!/usr/bin/env python3
77
"""
88
Run a DLStreamer VLM pipeline on a video and export JSON and MP4 results.
9-
10-
The script can:
11-
1. Download or reuse a local video.
12-
2. Export or reuse an OpenVINO model.
13-
3. Build a GStreamer pipeline string.
14-
4. Execute the pipeline and store results.
159
"""
1610

1711
import argparse
@@ -22,66 +16,108 @@
2216
import urllib.request
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Tuple

import gi

# gi.require_version() must be called BEFORE any "from gi.repository import ...";
# importing first makes the version pin ineffective and PyGObject emits a
# warning (or raises on mismatch). The previous arrangement imported first.
gi.require_version("Gst", "1.0")
gi.require_version("GstPbutils", "1.0")
from gi.repository import Gst, GLib, GstPbutils  # pylint: disable=no-name-in-module, wrong-import-position

# Directory containing this script; used as the default root for the
# videos/models/results directories configured in parse_args().
BASE_DIR = Path(__file__).resolve().parent
27+
28+
class VLMAlertsError(Exception):
    """Raised when any stage of the VLM Alerts workflow fails."""
3531

3632
@dataclass
class PipelineConfig:
    """Configuration required to build and run the pipeline."""

    video: Path        # local video file fed to the pipeline
    model: Path        # OpenVINO model directory
    prompt: str        # text prompt passed to the VLM
    device: str        # inference device, e.g. "GPU" or "CPU"
    max_tokens: int    # max_new_tokens generation limit
    frame_rate: float  # inference frame rate
    results_dir: Path  # where the JSONL / MP4 outputs are written
4641

4742

48-
def ensure_video(path_or_url: str) -> Path:
43+
def download_video(url: str, target_path: Path) -> None:
    """Download *url* and write the payload to *target_path*.

    Raises:
        VLMAlertsError: on a non-200 HTTP status, an empty payload, or any
            network / filesystem error.
    """
    request = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            # Non-HTTP handlers (e.g. file://) report status=None; only a
            # genuine HTTP status other than 200 is a failure.
            status = getattr(response, "status", None)
            if status is not None and status != 200:
                raise VLMAlertsError(f"Video download failed: HTTP {status}")
            data = response.read()
        if not data:
            raise VLMAlertsError("Video download failed: empty response")
        target_path.write_bytes(data)
    except VLMAlertsError:
        # Already a domain error with a precise message; don't re-wrap it
        # (the previous code produced messages like
        # "Video download failed: Video download failed: empty response").
        raise
    except Exception as error:
        raise VLMAlertsError(f"Video download failed: {error}") from error
58+
59+
def validate_video(video_path: Path) -> None:
    """Check that *video_path* is a non-empty, decodable media file.

    Probes the container and streams with GstPbutils.Discoverer
    (5-second discovery timeout).

    Raises:
        VLMAlertsError: if the file is missing or empty, discovery fails,
            or no usable streams are found.
    """
    if not video_path.exists() or video_path.stat().st_size == 0:
        raise VLMAlertsError("Video file is missing or empty")

    # NOTE(review): Gst.init() appears safe to call repeatedly (it is also
    # called by the pipeline runner) — confirm against GStreamer docs.
    Gst.init(None)
    try:
        discoverer = GstPbutils.Discoverer.new(5 * Gst.SECOND)
        info = discoverer.discover_uri(video_path.as_uri())
    except GLib.Error as error:
        raise VLMAlertsError(f"GStreamer discovery failed: {error}") from error

    if info.get_result() != GstPbutils.DiscovererResult.OK:
        raise VLMAlertsError(f"Unsupported media: {info.get_result()}")

    if not info.get_stream_list():
        raise VLMAlertsError("No valid streams found in media file")
76+
77+
def resolve_video(
    video_path: Optional[str],
    video_url: Optional[str],
    videos_dir: Path,
) -> Path:
    """Return a validated local video path.

    Prefers *video_path*; otherwise downloads *video_url* into *videos_dir*,
    reusing a previously downloaded copy when present.

    Raises:
        VLMAlertsError: if neither source is given, the path does not exist,
            the download fails, or the media does not validate.
    """
    if video_path:
        path = Path(video_path).resolve()
        if not path.exists():
            raise VLMAlertsError("Provided --video-path does not exist")
        validate_video(path)
        return path

    if not video_url:
        # Guard: without this, video_url.rstrip() below raises a bare
        # AttributeError when callers bypass parse_args() validation.
        raise VLMAlertsError("Either a video path or a video URL is required")

    videos_dir.mkdir(parents=True, exist_ok=True)
    filename = video_url.rstrip("/").split("/")[-1]
    local_path = videos_dir / filename

    if not local_path.exists():
        print(f"[video] downloading {video_url}")
        download_video(video_url, local_path)

    validate_video(local_path)
    return local_path.resolve()
7299

73100

74-
def ensure_model(model_id: str) -> Path:
101+
def resolve_model(
102+
model_id: Optional[str],
103+
model_path: Optional[str],
104+
models_dir: Path,
105+
) -> Path:
75106
"""Return a local OpenVINO model directory, exporting it if needed."""
107+
if model_path:
108+
path = Path(model_path).resolve()
109+
if not path.exists():
110+
raise VLMAlertsError("Provided --model-path does not exist")
111+
return path
112+
113+
models_dir.mkdir(parents=True, exist_ok=True)
76114
model_name = model_id.split("/")[-1]
77-
output_dir = MODELS_DIR / model_name
115+
output_dir = models_dir / model_name
78116

79117
if output_dir.exists() and any(output_dir.glob("*.xml")):
80118
print(f"[model] using cached {output_dir}")
81119
return output_dir.resolve()
82120

83-
MODELS_DIR.mkdir(exist_ok=True)
84-
85121
command = [
86122
"optimum-cli",
87123
"export",
@@ -94,26 +130,31 @@ def ensure_model(model_id: str) -> Path:
94130
str(output_dir),
95131
]
96132

97-
print("[model] exporting:", " ".join(command))
98-
subprocess.run(command, check=True)
133+
try:
134+
subprocess.run(command, check=True)
135+
except subprocess.CalledProcessError as error:
136+
raise VLMAlertsError(
137+
f"OpenVINO export failed with return code {error.returncode}"
138+
) from error
99139

100140
if not any(output_dir.glob("*.xml")):
101-
raise RuntimeError("OpenVINO export failed, no XML files found")
141+
raise VLMAlertsError("OpenVINO export failed: no XML files found")
102142

103143
return output_dir.resolve()
104144

105145

106146
def build_pipeline_string(cfg: PipelineConfig) -> Tuple[str, Path, Path, Path]:
107147
"""Construct the GStreamer pipeline string and related output paths."""
108-
RESULTS_DIR.mkdir(exist_ok=True)
148+
cfg.results_dir.mkdir(parents=True, exist_ok=True)
109149

110-
output_json = RESULTS_DIR / f"{cfg.model.name}-{cfg.video.stem}.jsonl"
111-
output_video = RESULTS_DIR / f"{cfg.model.name}-{cfg.video.stem}.mp4"
150+
output_json = cfg.results_dir / f"{cfg.model.name}-{cfg.video.stem}.jsonl"
151+
output_video = cfg.results_dir / f"{cfg.model.name}-{cfg.video.stem}.mp4"
112152

113153
fd, prompt_path_str = tempfile.mkstemp(suffix=".txt")
114154
prompt_path = Path(prompt_path_str)
155+
115156
with os.fdopen(fd, "w") as file:
116-
file.write(cfg.question)
157+
file.write(cfg.prompt)
117158

118159
generation_cfg = f"max_new_tokens={cfg.max_tokens}"
119160

@@ -147,8 +188,15 @@ def build_pipeline_string(cfg: PipelineConfig) -> Tuple[str, Path, Path, Path]:
147188
return pipeline_str, output_json, output_video, prompt_path
148189

149190

150-
def run_pipeline_string(pipeline_str: str) -> int:
191+
192+
def run_pipeline(cfg: PipelineConfig) -> int:
151193
"""Execute a GStreamer pipeline string and block until completion."""
194+
pipeline_str, output_json, output_video, prompt_path = build_pipeline_string(cfg)
195+
196+
print("\nPipeline:\n")
197+
print(pipeline_str)
198+
print()
199+
152200
Gst.init(None)
153201

154202
try:
@@ -160,78 +208,90 @@ def run_pipeline_string(pipeline_str: str) -> int:
160208
bus = pipeline.get_bus()
161209
pipeline.set_state(Gst.State.PLAYING)
162210

163-
while True:
164-
message = bus.timed_pop_filtered(
165-
Gst.CLOCK_TIME_NONE,
166-
Gst.MessageType.ERROR | Gst.MessageType.EOS,
167-
)
168-
169-
if message.type == Gst.MessageType.ERROR:
170-
err, debug = message.parse_error()
171-
print("ERROR:", err.message)
172-
if debug:
173-
print("DEBUG:", debug)
174-
pipeline.set_state(Gst.State.NULL)
175-
return 1
176-
177-
if message.type == Gst.MessageType.EOS:
178-
pipeline.set_state(Gst.State.NULL)
179-
return 0
180-
181-
182-
def run_pipeline(cfg: PipelineConfig) -> int:
183-
"""Build and execute the pipeline from configuration."""
184-
pipeline_str, output_json, output_video, prompt_path = build_pipeline_string(cfg)
185-
186-
print("\nPipeline:\n")
187-
print(pipeline_str)
188-
print()
189-
190211
try:
191-
result = run_pipeline_string(pipeline_str)
212+
while True:
213+
message = bus.timed_pop_filtered(
214+
Gst.CLOCK_TIME_NONE,
215+
Gst.MessageType.ERROR | Gst.MessageType.EOS,
216+
)
217+
218+
if message.type == Gst.MessageType.ERROR:
219+
err, debug = message.parse_error()
220+
print("ERROR:", err.message)
221+
if debug:
222+
print("DEBUG:", debug)
223+
return 1
224+
225+
if message.type == Gst.MessageType.EOS:
226+
break
192227
finally:
228+
pipeline.set_state(Gst.State.NULL)
193229
if prompt_path.exists():
194230
prompt_path.unlink()
195231

196-
if result == 0:
197-
print(f"\nJSON output: {output_json}")
198-
print(f"Video output: {output_video}")
232+
print(f"\nJSON output: {output_json}")
233+
print(f"Video output: {output_video}")
199234

200-
return result
235+
return 0
201236

202237

203238
def parse_args() -> argparse.Namespace:
    """Parse and cross-validate command line arguments.

    At least one video source (--video-path / --video-url) and at least one
    model source (--model-id / --model-path) must be supplied; otherwise
    argparse exits with a usage error.
    """
    parser = argparse.ArgumentParser(
        description="DLStreamer VLM Alerts sample"
    )

    parser.add_argument("--video-path", help="Path to local video file")
    parser.add_argument("--video-url", help="URL to download video from")

    parser.add_argument("--model-id", help="HuggingFace model id")
    parser.add_argument("--model-path", help="Path to exported OpenVINO model")

    parser.add_argument("--prompt", required=True, help="Text prompt for VLM")

    parser.add_argument("--device", default="GPU",
                        help="Inference device, e.g. GPU or CPU")
    parser.add_argument("--max-tokens", type=int, default=20,
                        help="max_new_tokens generation limit")
    parser.add_argument("--frame-rate", type=float, default=1.0,
                        help="Inference frame rate")

    parser.add_argument("--videos-dir", type=Path, default=BASE_DIR / "videos")
    parser.add_argument("--models-dir", type=Path, default=BASE_DIR / "models")
    parser.add_argument("--results-dir", type=Path, default=BASE_DIR / "results")

    args = parser.parse_args()

    # "At least one of" across two independent pairs can't be expressed with
    # a single mutually-exclusive group, so validate after parsing.
    if not (args.video_path or args.video_url):
        parser.error("Either --video-path or --video-url must be provided")

    if not (args.model_id or args.model_path):
        parser.error("Either --model-id or --model-path must be provided")

    return args
216268

217269

218270
def main() -> int:
    """CLI entry point: resolve inputs, build the config, run the pipeline.

    Returns:
        Process exit code: 0 on success, 1 on any failure.
    """
    import sys  # local import: used only for stderr error reporting

    try:
        args = parse_args()

        video = resolve_video(args.video_path, args.video_url, args.videos_dir)
        model = resolve_model(args.model_id, args.model_path, args.models_dir)

        config = PipelineConfig(
            video=video,
            model=model,
            prompt=args.prompt,
            device=args.device,
            max_tokens=args.max_tokens,
            frame_rate=args.frame_rate,
            results_dir=args.results_dir,
        )

        return run_pipeline(config)

    except VLMAlertsError as error:
        # Expected, user-actionable failures: report on stderr, not stdout,
        # so they are not mixed into redirected pipeline output.
        print(f"Error: {error}", file=sys.stderr)
        return 1
    except Exception as error:  # pylint: disable=broad-except
        # Top-level boundary: keep the process from dying with a raw
        # traceback, but still surface the failure on stderr.
        print(f"Unexpected failure: {error}", file=sys.stderr)
        return 1
235295

236296

237297
if __name__ == "__main__":

0 commit comments

Comments
 (0)