-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdog_detect.py
More file actions
403 lines (350 loc) · 14 KB
/
dog_detect.py
File metadata and controls
403 lines (350 loc) · 14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
#!/usr/bin/env python3
"""
Dog detection with YOLOv8n on ESP32-CAM MJPEG stream.
Sends 'buzz_on'/'buzz_off' over serial to an Arduino/ESP32 that drives a 23 kHz buzzer.
Usage:
    pip install -r requirements.txt
    python dog_detect.py --url http://<esp32-ip>:81/stream --port AUTO

Press 'q' to quit.
"""
import argparse
import json
import os
import sys
import time
import urllib.request
from urllib.parse import urlparse
from typing import Generator, Iterable, List, Optional, Tuple
import cv2
import numpy as np
import serial
import serial.tools.list_ports
import requests
# Ultralytics YOLO: print an install hint on stderr if the import fails,
# then re-raise so the script still stops with the original traceback.
try:
    from ultralytics import YOLO
except Exception:
    print("Error: ultralytics not installed. Run: pip install ultralytics", file=sys.stderr)
    raise
# Auto device selection (cpu/cuda/mps).
# Start from "cpu"; upgrade to "cuda" or "mps" only when torch imports and
# reports the backend as available. Any failure keeps the "cpu" default.
_DEVICE_DEFAULT = "cpu"
try:
    import torch  # type: ignore

    # Looked up defensively: the attribute chain may be missing entirely.
    _mps_backend = getattr(getattr(torch, "backends", None), "mps", None)
    if torch.cuda.is_available():
        _DEVICE_DEFAULT = "cuda"
    elif _mps_backend and _mps_backend.is_available():
        _DEVICE_DEFAULT = "mps"
except Exception:
    pass
# -------------------- Config --------------------
# Baseline settings; load_config() layers environment variables and an
# optional JSON file on top of a copy of this mapping.
DEFAULTS = dict(
    camera_url="http://192.168.1.123:81/stream",
    serial_port="AUTO",
    baudrate=115200,
    buzzer_mode="serial",  # serial|http
    control_base="",  # http://<ip> for /buzz endpoint (optional; derived from camera_url if empty)
    on_delay_s=2.0,
    off_delay_s=1.0,
    conf_threshold=0.5,
    imgsz=480,
    device="auto",  # auto|cpu|cuda|mps
    show_window=True,
    draw_overlays=True,
    mirror=False,
)
def load_config(path: Optional[str]) -> dict:
    """Assemble the runtime configuration dictionary.

    Sources are applied in increasing priority: ``DEFAULTS``, then
    environment variables, then a JSON file. The JSON file is ``path`` when
    that file exists, otherwise ``config.json`` in the current working
    directory when present. Finally a ``device`` value of ``"auto"`` is
    replaced by the device detected at import time.

    Args:
        path: Optional path to a JSON config file. If given but missing, a
            warning is printed to stderr and the ``config.json`` fallback is
            still tried.

    Returns:
        A new dict; ``DEFAULTS`` itself is not modified.

    Raises:
        ValueError: If a numeric environment variable cannot be parsed, or
            the JSON file does not contain an object.
    """
    cfg = DEFAULTS.copy()
    env = os.environ

    def env_number(name: str, key: str, cast):
        # Name the offending variable instead of surfacing a bare int()/float() error.
        raw = env.get(name)
        if raw is None:
            return cast(cfg[key])
        try:
            return cast(raw)
        except ValueError:
            raise ValueError(
                f"Environment variable {name}={raw!r} is not a valid {cast.__name__}"
            ) from None

    # env overrides
    cfg.update({
        "camera_url": env.get("ESP32_CAM_URL", cfg["camera_url"]),
        "serial_port": env.get("DOG_BUZZ_PORT", cfg["serial_port"]),
        "baudrate": env_number("DOG_BUZZ_BAUD", "baudrate", int),
        "buzzer_mode": env.get("DOG_BUZZ_MODE", cfg["buzzer_mode"]),
        "control_base": env.get("DOG_CONTROL_BASE", cfg["control_base"]),
        "on_delay_s": env_number("DOG_ON_DELAY", "on_delay_s", float),
        "off_delay_s": env_number("DOG_OFF_DELAY", "off_delay_s", float),
        "conf_threshold": env_number("DOG_CONF", "conf_threshold", float),
        "imgsz": env_number("DOG_IMGSZ", "imgsz", int),
        "device": env.get("DOG_DEVICE", cfg["device"]),
        # Flags are matched case-insensitively so "FALSE"/"TRUE" behave as expected.
        "show_window": env.get("DOG_SHOW", "1").strip().lower() not in ("0", "false"),
        "draw_overlays": env.get("DOG_DRAW", "1").strip().lower() not in ("0", "false"),
        "mirror": env.get("DOG_MIRROR", "0").strip().lower() in ("1", "true"),
    })

    # file overrides
    config_file = None
    if path and os.path.isfile(path):
        config_file = path
    else:
        if path:
            # Previously a mistyped --config path was ignored without any hint.
            print(f"[Config] Warning: config file not found: {path}", file=sys.stderr)
        if os.path.isfile("config.json"):
            config_file = "config.json"
    if config_file is not None:
        with open(config_file, "r", encoding="utf-8") as f:
            overrides = json.load(f)
        if not isinstance(overrides, dict):
            raise ValueError(f"{config_file} must contain a JSON object at top level")
        cfg.update(overrides)

    # device resolve
    if str(cfg["device"]).lower() == "auto":
        cfg["device"] = _DEVICE_DEFAULT
    return cfg
# -------------------- Serial control --------------------
# Substrings that mark a serial port as a preferred candidate; compared
# case-insensitively against each port's device, description and manufacturer.
PREF_PORT_KEYWORDS = (
    "Arduino",
    "wchusbserial",
    "usbmodem",
    "usbserial",
    "Silicon Labs",
    "CP210",
    "CH340",
    "FTDI",
    "ESP32",
)
def autodetect_serial_port() -> Optional[str]:
    """Pick a serial port to use for the buzzer controller.

    Returns:
        The device name of the first listed port whose device, description
        or manufacturer text contains one of ``PREF_PORT_KEYWORDS``
        (case-insensitive); otherwise the first listed port; ``None`` when
        no ports are listed at all.
    """
    candidates = list(serial.tools.list_ports.comports())
    if not candidates:
        return None

    needles = [keyword.lower() for keyword in PREF_PORT_KEYWORDS]
    for info in candidates:
        text = f"{info.device} {info.description} {info.manufacturer}".lower()
        for needle in needles:
            if needle in text:
                return info.device

    # Nothing looked familiar: fall back to whatever is listed first.
    return candidates[0].device
class SerialBuzz:
    """Drive the buzzer by writing ``buzz_on`` / ``buzz_off`` lines to a serial port.

    When no port is found or the port cannot be opened, the object works in
    "dry mode": commands are printed instead of being sent.
    """

    def __init__(self, port: str, baud: int = 115200):
        """Store the settings and immediately try to connect.

        Args:
            port: Serial device name, or ``"AUTO"`` (any case) to autodetect.
            baud: Baud rate used when opening the port.
        """
        self.port_cfg = port
        self.baud = baud
        self.ser: Optional[serial.Serial] = None
        # Last command known to be delivered. None means "unknown" (the last
        # write failed), which disables de-duplication until a write succeeds.
        self.state_on: Optional[bool] = False
        self.connect()

    def connect(self):
        """Open the configured (or autodetected) port and force the buzzer off."""
        port = self.port_cfg
        if str(port).upper() == "AUTO":
            port = autodetect_serial_port()
        if not port:
            print("[Serial] No port found. Dry mode (no buzzer).")
            return
        try:
            self.ser = serial.Serial(port=port, baudrate=self.baud, timeout=0.1)
            time.sleep(0.2)
            self.send(False, force=True)  # ensure off
            print(f"[Serial] Connected: {port} @ {self.baud}")
        except Exception as e:
            print(f"[Serial] Could not open {port}: {e}. Dry mode.")
            self.ser = None

    def send(self, on: bool, force: bool = False):
        """Send ``buzz_on`` or ``buzz_off``.

        Args:
            on: Desired buzzer state.
            force: Send even if ``on`` equals the last delivered state.
        """
        if not force and on == self.state_on:
            return
        cmd = b"buzz_on\n" if on else b"buzz_off\n"
        if self.ser and self.ser.is_open:
            try:
                self.ser.write(cmd)
                self.ser.flush()
            except Exception as e:
                # The device may or may not have seen the command; forget the
                # cached state so the next command of either kind is sent.
                self.state_on = None
                print(f"[Serial] send failed: {e}")
                return
        else:
            print("[DRY]", cmd.decode().strip())
        # Only record the state once the command actually went out.
        self.state_on = on

    def close(self):
        """Switch the buzzer off and close the port (best effort, never raises)."""
        try:
            if self.ser and self.ser.is_open:
                self.send(False, force=True)
                self.ser.close()
        except Exception as e:
            print(f"[Serial] close failed: {e}")
def derive_control_base(camera_url: str) -> str:
    """Given http://host:81/stream return http://host (camera server on port 80).

    The port, path, query and any credentials are dropped. A URL without a
    scheme (``host:81/stream``) is treated as a network location and gets
    ``http``. IPv6 literals keep their square brackets so the result is
    still a valid URL.

    Args:
        camera_url: Stream URL of the camera.

    Returns:
        ``"<scheme>://<host>"``. If the URL cannot be parsed at all, the
        input with everything after its last ``:`` removed.
    """
    url = (camera_url or "").strip()
    try:
        parts = urlparse(url)
        if not parts.hostname and "://" not in url:
            # "host:81/stream" is parsed as a scheme or a bare path, never as
            # a host; re-parse it as a network location instead.
            parts = urlparse("//" + url.lstrip("/"))
        host = parts.hostname or ""
    except ValueError:
        return url.rsplit(":", 1)[0]
    if ":" in host:
        # .hostname strips the brackets of an IPv6 literal; put them back.
        host = f"[{host}]"
    scheme = parts.scheme or "http"
    return f"{scheme}://{host}"
class HTTPBuzz:
    """Controls buzzer on ESP32-CAM via /buzz?on=1|0 endpoint on the camera HTTP server (port 80)."""

    def __init__(self, base_url: str, timeout: float = 0.5):
        """
        Args:
            base_url: Base URL of the control server; trailing slashes are removed.
            timeout: Timeout in seconds passed to each HTTP request.
        """
        self.base = base_url.rstrip("/")
        self.timeout = timeout
        # Last command known to be accepted. None means "unknown" (the last
        # request failed), which disables de-duplication until one succeeds.
        self.state_on: Optional[bool] = False

    def send(self, on: bool, force: bool = False):
        """Request the buzzer state with a GET to ``<base>/buzz?on=1`` or ``on=0``.

        Args:
            on: Desired buzzer state.
            force: Send even if ``on`` equals the last accepted state.
        """
        if not force and on == self.state_on:
            return
        url = f"{self.base}/buzz?on={'1' if on else '0'}"
        try:
            resp = requests.get(url, timeout=self.timeout)
            # A 4xx/5xx answer means the command was not accepted.
            resp.raise_for_status()
        except Exception as e:
            # Non-fatal; keep running. The device may or may not have acted
            # (e.g. on a timeout), so forget the cached state: the next
            # command of either kind is then sent instead of being skipped.
            self.state_on = None
            print(f"[HTTP] buzz request failed: {e}")
            return
        self.state_on = on

    def close(self):
        """Nothing to release; present so both buzzer classes share an interface."""
        # No persistent resources
        pass
# -------------------- Video input --------------------
def try_opencv_stream(url: str) -> Optional[cv2.VideoCapture]:
    """Try to open ``url`` with OpenCV's own capture backend.

    Args:
        url: Stream URL handed to ``cv2.VideoCapture``.

    Returns:
        The opened capture if it both opens and delivers one frame (that
        probe frame is discarded); otherwise ``None``, with the capture
        released.
    """
    capture = cv2.VideoCapture(url)
    capture.set(cv2.CAP_PROP_BUFFERSIZE, 1)
    # Short-circuit keeps the original order: read() is only attempted on an
    # opened capture.
    if capture.isOpened() and capture.read()[0]:
        return capture
    capture.release()
    return None
def _pop_jpeg(buf: bytearray) -> Optional[bytes]:
    """Remove and return the first complete JPEG in ``buf``, or ``None`` if there is none yet."""
    start = buf.find(b"\xff\xd8")
    if start == -1:
        # No start marker yet: everything but the last byte is junk (the last
        # byte could be the first half of a marker split across two chunks).
        del buf[:-1]
        return None
    if start:
        # Drop headers/garbage before the frame so the buffer cannot grow
        # without bound and a stray end marker in it cannot confuse the search.
        del buf[:start]
    end = buf.find(b"\xff\xd9", 2)
    if end == -1:
        return None
    jpg = bytes(buf[:end + 2])
    del buf[:end + 2]
    return jpg


def mjpeg_frames(url: str, timeout: float = 10.0) -> Generator[np.ndarray, None, None]:
    """Yield decoded frames from an MJPEG HTTP stream, reconnecting forever.

    The response is scanned for JPEG start (FF D8) and end (FF D9) markers;
    each complete image is decoded with OpenCV and yielded. Images that fail
    to decode are skipped. On any error, or when the server ends the stream,
    the generator waits one second and reconnects.

    Args:
        url: Stream URL.
        timeout: Timeout in seconds passed to ``urlopen``.

    Yields:
        Frames decoded with ``cv2.IMREAD_COLOR``.
    """
    while True:
        try:
            req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                buf = bytearray()
                while True:
                    chunk = resp.read(4096)
                    if not chunk:
                        break
                    buf.extend(chunk)
                    # Drain every complete frame; a chunk may finish more than one.
                    while True:
                        jpg = _pop_jpeg(buf)
                        if jpg is None:
                            break
                        frame = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR)
                        if frame is not None:
                            yield frame
            print("[Stream] Ended. Reconnect in 1s")
        except Exception as e:
            print(f"[Stream] Reconnect in 1s: {e}")
        # Sleep on clean end-of-stream too, so a server that keeps closing
        # the connection is not hammered in a tight loop.
        time.sleep(1.0)
# -------------------- Detection --------------------
def resize_short_side(img: np.ndarray, short: int) -> np.ndarray:
    """Scale ``img`` so its shorter side equals ``short``, keeping the aspect ratio.

    Args:
        img: Image array whose first two dimensions are height and width.
        short: Target length of the shorter side.

    Returns:
        ``img`` itself when the shorter side already equals ``short``;
        otherwise a resized copy. The longer side is truncated to an int.
    """
    height, width = img.shape[:2]
    if short == min(height, width):
        return img

    # cv2.resize takes the target size as (width, height).
    if height < width:
        target = (int(width * (short / height)), short)
    else:
        target = (short, int(height * (short / width)))
    return cv2.resize(img, target, interpolation=cv2.INTER_AREA)
def dog_boxes_from_result(result, names: dict, want: str = "dog", conf_thr: float = 0.5) -> List[Tuple[int,int,int,int,float]]:
    """Collect the boxes in ``result`` whose class label equals ``want``.

    Args:
        result: A detection result exposing ``boxes.cls``, ``boxes.xyxy`` and
            ``boxes.conf``, each convertible via ``.cpu().numpy()``; may be
            ``None``.
        names: Mapping from class id to label. Ids missing from it fall back
            to the id rendered as a string.
        want: Label to keep.
        conf_thr: Minimum confidence (inclusive).

    Returns:
        A list of ``(x1, y1, x2, y2, confidence)`` tuples with integer
        corners; empty when ``result`` or its ``boxes`` is ``None``.
    """
    found = []
    detections = None if result is None else getattr(result, "boxes", None)
    if detections is None:
        return found

    class_ids = detections.cls.cpu().numpy().astype(int)
    corners = detections.xyxy.cpu().numpy().astype(int)
    scores = detections.conf.cpu().numpy()

    for class_id, corner, score in zip(class_ids, corners, scores):
        label = names.get(int(class_id), str(class_id))
        if label != want or score < conf_thr:
            continue
        left, top, right, bottom = (int(v) for v in corner)
        found.append((left, top, right, bottom, float(score)))
    return found
def main():
    """Run the dog-deterrent loop.

    Parses the command line, layers it over ``load_config``, loads the YOLO
    model, opens the camera stream (OpenCV first, MJPEG parser as fallback)
    and then, per frame: runs detection, applies on/off hysteresis to the
    buzzer, draws overlays and shows the window. The buzzer is switched on
    once a dog has been present in every frame for ``on_delay_s`` seconds
    and off once no dog has been seen for ``off_delay_s`` seconds. Pressing
    'q' in the window or Ctrl-C stops the loop; the buzzer is always
    switched off on exit.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--config", type=str, default=None)
    ap.add_argument("--url", type=str, help="ESP32-CAM stream URL")
    ap.add_argument("--port", type=str, help="Serial port or AUTO")
    ap.add_argument("--baud", type=int, help="Serial baudrate")
    ap.add_argument("--buzzer", type=str, choices=["serial", "http"], help="Buzzer control mode")
    ap.add_argument("--control-base", type=str, help="Base URL for ESP32-CAM control (e.g., http://192.168.4.1)")
    ap.add_argument("--device", type=str, help="cpu|cuda|mps|auto")
    ap.add_argument("--conf", type=float, help="Confidence threshold")
    ap.add_argument("--imgsz", type=int, help="Input short side size")
    ap.add_argument("--show", action="store_true", help="Force show window")
    ap.add_argument("--no-show", action="store_true", help="Disable window")
    args = ap.parse_args()

    cfg = load_config(args.config)
    if args.url:
        cfg["camera_url"] = args.url
    if args.port:
        cfg["serial_port"] = args.port
    if args.baud:
        cfg["baudrate"] = args.baud
    if args.buzzer:
        cfg["buzzer_mode"] = args.buzzer
    if args.control_base:
        cfg["control_base"] = args.control_base
    if args.device:
        cfg["device"] = args.device
    if args.conf is not None:
        cfg["conf_threshold"] = args.conf
    if args.imgsz:
        cfg["imgsz"] = args.imgsz
    if args.show:
        cfg["show_window"] = True
    if args.no_show:
        cfg["show_window"] = False
    # load_config() resolved "auto" before the CLI override was applied, so
    # "--device auto" would otherwise reach the model as the literal "auto".
    if str(cfg["device"]).lower() == "auto":
        cfg["device"] = _DEVICE_DEFAULT
    print("[Config]", cfg)

    # Model
    model = YOLO("yolov8n.pt")  # COCO pretrained
    names = model.model.names if hasattr(model, "model") else model.names

    # Buzzer control
    if str(cfg.get("buzzer_mode", "serial")).lower() == "http":
        base = cfg.get("control_base") or derive_control_base(cfg["camera_url"])
        buzzer = HTTPBuzz(base)
        print(f"[Buzzer] HTTP mode -> {base}/buzz?on=1|0")
    else:
        buzzer = SerialBuzz(cfg["serial_port"], cfg["baudrate"])

    # Video
    cap = try_opencv_stream(cfg["camera_url"])  # try native first
    frame_gen: Optional[Generator[np.ndarray, None, None]] = None
    if cap is None:
        frame_gen = mjpeg_frames(cfg["camera_url"])  # manual MJPEG

    # Hysteresis state; 0.0 in either timestamp means "not currently timing".
    buzzing = False
    present_since = 0.0
    absent_since = time.time()
    det_frames = 0
    last_t = time.time()
    fps = 0.0

    try:
        while True:
            if cap is not None:
                ok, frame = cap.read()
                if not ok:
                    print("[Stream] Lost. Switching to MJPEG parser...")
                    cap.release()
                    cap = None
                    frame_gen = mjpeg_frames(cfg["camera_url"])  # fallback
                    continue
            else:
                if frame_gen is None:
                    frame_gen = mjpeg_frames(cfg["camera_url"])  # ensure
                try:
                    frame = next(frame_gen)
                except StopIteration:
                    # An exhausted generator would raise again on every pass
                    # and spin the loop; drop it so a fresh one is created.
                    frame_gen = None
                    continue

            if cfg["mirror"]:
                frame = cv2.flip(frame, 1)

            # Resize for speed while keeping aspect
            frame_in = resize_short_side(frame, cfg["imgsz"]) if cfg["imgsz"] else frame

            # Inference
            results = model.predict(source=frame_in, conf=cfg["conf_threshold"], device=cfg["device"], verbose=False)
            result = results[0]
            boxes = dog_boxes_from_result(result, names, "dog", cfg["conf_threshold"])
            dog_present = len(boxes) > 0
            if dog_present:
                det_frames += 1

            # Hysteresis
            now = time.time()
            if dog_present:
                if present_since == 0.0:
                    present_since = now
                absent_since = 0.0
                if (not buzzing) and (now - present_since >= cfg["on_delay_s"]):
                    buzzing = True
                    buzzer.send(True)
            else:
                if absent_since == 0.0:
                    absent_since = now
                present_since = 0.0
                if buzzing and (now - absent_since >= cfg["off_delay_s"]):
                    buzzing = False
                    buzzer.send(False)

            # FPS (exponential moving average, seeded with the first sample)
            dt = now - last_t
            if dt > 0:
                fps = 0.9 * fps + 0.1 * (1.0 / dt) if fps > 0 else 1.0 / dt
            last_t = now

            # Overlays
            if cfg["draw_overlays"]:
                for (x1, y1, x2, y2, conf) in boxes:
                    cv2.rectangle(frame_in, (x1, y1), (x2, y2), (0, 255, 0), 2)
                    cv2.putText(frame_in, f"dog {conf:.2f}", (x1, max(15, y1 - 6)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1, cv2.LINE_AA)
                hud = f"FPS {fps:.1f} DET {det_frames} BUZZ {'ON' if buzzing else 'OFF'}"
                if present_since:
                    hud += f" present {now - present_since:.1f}s"
                if absent_since:
                    hud += f" absent {now - absent_since:.1f}s"
                # Drawn twice: thick white first, thin black on top.
                cv2.putText(frame_in, hud, (8, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2, cv2.LINE_AA)
                cv2.putText(frame_in, hud, (8, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 1, cv2.LINE_AA)

            if cfg["show_window"]:
                cv2.imshow("Dog Deterrent", frame_in)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
    except KeyboardInterrupt:
        pass
    finally:
        # Buzzer off first, so it never stays on if later cleanup fails.
        buzzer.send(False, force=True)
        buzzer.close()
        if cap is not None:
            cap.release()
        if frame_gen is not None:
            # Closing the generator releases its open HTTP response.
            frame_gen.close()
        cv2.destroyAllWindows()
# Run the detector only when executed as a script, not when imported.
if __name__ == "__main__":
    main()