Skip to content

Commit e20c095

Browse files
authored
Merge pull request #70 from xmartlabs/Occupancy-SendNotifications
Occupancy Notifications
2 parents 4596c4e + 65645e1 commit e20c095

13 files changed

+240
-10
lines changed

config-coral.ini

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ DashboardURL = http://0.0.0.0:8000
99
ScreenshotsDirectory = /repo/data/processor/static/screenshots
1010
EnableSlackNotifications = no
1111
SlackChannel = lanthorn-notifications
12+
; OccupancyAlertsMinInterval time is measured in seconds (if interval < 0 then no occupancy alerts are triggered)
13+
OccupancyAlertsMinInterval = 180
1214

1315
[API]
1416
Host = 0.0.0.0

config-jetson.ini

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ DashboardURL = http://0.0.0.0:8000
1212
ScreenshotsDirectory = /repo/data/processor/static/screenshots
1313
EnableSlackNotifications = no
1414
SlackChannel = lanthorn-notifications
15+
; OccupancyAlertsMinInterval time is measured in seconds (if interval < 0 then no occupancy alerts are triggered)
16+
OccupancyAlertsMinInterval = 180
1517

1618
[API]
1719
Host = 0.0.0.0

config-x86-openvino.ini

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ DashboardURL = http://0.0.0.0:8000
2323
ScreenshotsDirectory = /repo/data/processor/static/screenshots
2424
EnableSlackNotifications = no
2525
SlackChannel = lanthorn-notifications
26+
; OccupancyAlertsMinInterval time is measured in seconds (if interval < 0 then no occupancy alerts are triggered)
27+
OccupancyAlertsMinInterval = 180
2628

2729
[Area_0]
2830
Id = area0

config-x86.ini

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ DashboardURL = http://0.0.0.0:8000
2323
ScreenshotsDirectory = /repo/data/processor/static/screenshots
2424
EnableSlackNotifications = no
2525
SlackChannel = lanthorn-notifications
26+
; OccupancyAlertsMinInterval time is measured in seconds (if interval < 0 then no occupancy alerts are triggered)
27+
OccupancyAlertsMinInterval = 180
2628

2729
[Area_0]
2830
Id = area0

libs/area_reporting.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import os
2+
import time
3+
import logging
4+
import csv
5+
from datetime import date, datetime
6+
from collections import deque
7+
from .utils.mailing import MailService
8+
from .notifications.slack_notifications import SlackService
9+
10+
logger = logging.getLogger(__name__)
11+
12+
13+
class AreaReporting:
14+
15+
def __init__(self, config, area):
16+
self.processing_alerts = False
17+
self.config = config
18+
self.area = area
19+
20+
self.occupancy_sleep_time_interval = float(self.config.get_section_dict("App")["OccupancyAlertsMinInterval"])
21+
self.log_dir = self.config.get_section_dict("Logger")["LogDirectory"]
22+
self.idle_time = float(self.config.get_section_dict('Logger')['TimeInterval'])
23+
self.area_id = self.area['id']
24+
self.area_name = self.area['name']
25+
self.occupancy_threshold = self.area['occupancy_threshold']
26+
self.should_send_email_notifications = self.area['should_send_email_notifications']
27+
self.should_send_slack_notifications = self.area['should_send_slack_notifications']
28+
self.cameras = [camera for camera in self.config.get_video_sources() if camera['id'] in self.area['cameras']]
29+
for camera in self.cameras:
30+
camera['file_path'] = os.path.join(self.log_dir, camera['id'], "objects_log")
31+
camera['last_processed_time'] = time.time()
32+
33+
self.mail_service = MailService(config)
34+
self.slack_service = SlackService(config)
35+
36+
def process_area(self):
37+
# Sleep for a while so cameras start processing
38+
time.sleep(30)
39+
40+
self.processing_alerts = True
41+
logger.info(f'Enabled processing alerts for - {self.area_id}: {self.area_name} with {len(self.cameras)} cameras')
42+
while self.processing_alerts:
43+
camera_file_paths = [os.path.join(camera['file_path'], str(date.today()) + ".csv") for camera in self.cameras]
44+
if not all(list(map(os.path.isfile, camera_file_paths))):
45+
# Wait before csv for this day are created
46+
logger.info(f'Area reporting on - {self.area_id}: {self.area_name} is waiting for reports to be created')
47+
time.sleep(5)
48+
49+
occupancy = 0
50+
for camera in self.cameras:
51+
with open(os.path.join(camera['file_path'], str(date.today()) + ".csv"), 'r') as log:
52+
last_log = deque(csv.DictReader(log), 1)[0]
53+
log_time = datetime.strptime(last_log['Timestamp'], "%Y-%m-%d %H:%M:%S")
54+
# TODO: If the TimeInterval of the Logger is more than 30 seconds this would have to be revised.
55+
if (datetime.now() - log_time).total_seconds() < 30:
56+
occupancy += int(last_log['DetectedObjects'])
57+
else:
58+
logger.warn(f"Logs aren't being updated for camera {camera['id']} - {camera['name']}")
59+
60+
if occupancy > self.occupancy_threshold:
61+
# Trigger alerts
62+
if self.should_send_email_notifications:
63+
self.mail_service.send_occupancy_notification(self.area, occupancy)
64+
if self.should_send_slack_notifications:
65+
self.slack_service.occupancy_alert(self.area, occupancy)
66+
# Sleep until the cooldown of the alert
67+
time.sleep(self.occupancy_sleep_time_interval)
68+
else:
69+
# Sleep until new data is logged
70+
time.sleep(self.idle_time)
71+
72+
self.stop_process_area()
73+
74+
def stop_process_area(self):
75+
logger.info(f'Disabled processing alerts for area - {self.area_id}: {self.area_name}')
76+
self.processing_alerts = False

libs/area_threading.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import os
2+
from threading import Thread
3+
from libs.area_reporting import AreaReporting as AreaEngine
4+
import logging
5+
6+
logger = logging.getLogger(__name__)
7+
8+
9+
def run_area_processing(config, pipe, areas):
10+
pid = os.getpid()
11+
logger.info(f"[{pid}] taking on notifications for {len(areas)} areas")
12+
threads = []
13+
for area in areas:
14+
engine = AreaThread(config, area)
15+
engine.start()
16+
threads.append(engine)
17+
18+
# Wait for a signal to die
19+
pipe.recv()
20+
logger.info(f"[{pid}] will stop area alerts and die")
21+
for t in threads:
22+
t.stop()
23+
24+
logger.info(f"[{pid}] Goodbye!")
25+
26+
27+
class AreaThread(Thread):
28+
def __init__(self, config, area):
29+
Thread.__init__(self)
30+
self.engine = None
31+
self.config = config
32+
self.area = area
33+
34+
def run(self):
35+
self.engine = AreaEngine(self.config, self.area)
36+
self.engine.process_area()
37+
38+
def stop(self):
39+
self.engine.stop_process_area()
40+
self.join()

libs/config_engine.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ def get_areas(self):
178178
if 'Cameras' in section and section['Cameras'].strip() != "":
179179
area['cameras'] = section['Cameras'].split(',')
180180

181-
if area['notify_every_minutes'] > 0 and (area['violation_threshold'] > 0 or area['occupancy_threshold'] > 0):
181+
if (area['notify_every_minutes'] > 0 and area['violation_threshold'] > 0) or area['occupancy_threshold'] > 0:
182182
area['should_send_email_notifications'] = 'emails' in area
183183
area['should_send_slack_notifications'] = bool(self.config['App']['SlackChannel'] and
184184
self.config.getboolean('App', 'EnableSlackNotifications'))

libs/distancing.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ def __init__(self, config, source, live_feed_enabled=True):
6060
logger.info(f"Falling back using {self.default_dist_method}")
6161
self.dist_method = self.default_dist_method
6262

63-
self.screenshot_period = float(
64-
self.config.get_section_dict("App")["ScreenshotPeriod"]) * 60 # config.ini uses minutes as unit
63+
# config.ini uses minutes as unit
64+
self.screenshot_period = float(self.config.get_section_dict("App")["ScreenshotPeriod"]) * 60
6565
self.bucket_screenshots = config.get_section_dict("App")["ScreenshotS3Bucket"]
6666
self.uploader = S3Uploader(self.config)
6767
self.screenshot_path = os.path.join(self.config.get_section_dict("App")["ScreenshotsDirectory"], self.camera_id)

libs/notifications/slack_notifications.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,14 @@ def violation_report(self, entity_info, number):
4949
msg = f"We found {number} violations in {entity_id}: {entity_name} ({entity_type})"
5050
self.post_message_to_channel(msg, self.channel)
5151

52-
def daily_report(self, entity_info, number,):
52+
def daily_report(self, entity_info, number):
5353
entity_id, entity_type, entity_name = entity_info['id'], entity_info['type'], entity_info['name']
5454
msg = f"Yesterday we found {number} violations in {entity_id}: {entity_name} ({entity_type})."
5555
self.post_message_to_channel(msg, self.channel)
56+
57+
def occupancy_alert(self, entity_info, number):
58+
entity_id, entity_type = entity_info['id'], entity_info['type']
59+
entity_name, entity_threshold = entity_info['name'], entity_info['occupancy_threshold']
60+
msg = f"Occupancy threshold was exceeded in {entity_type} {entity_id}: {entity_name}." \
61+
f"We found {number} people out of a capacity of {entity_threshold}."
62+
self.post_message_to_channel(msg, self.channel)

libs/processor_core.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from queue import Empty
77
import schedule
88
from libs.engine_threading import run_video_processing
9+
from libs.area_threading import run_area_processing
910
from libs.utils.notifications import run_check_violations
1011

1112
logger = logging.getLogger(__name__)
@@ -63,11 +64,12 @@ def _setup_scheduled_tasks(self):
6364
should_send_slack_notifications = area['should_send_slack_notifications']
6465
if should_send_email_notifications or should_send_slack_notifications:
6566
interval = area['notify_every_minutes']
66-
threshold = area['violation_threshold']
67-
schedule.every(interval).minutes.do(
68-
run_check_violations, threshold, self.config, area, interval,
69-
should_send_email_notifications, should_send_slack_notifications
70-
).tag("notification-task")
67+
violation_threshold = area['violation_threshold']
68+
if violation_threshold > 0:
69+
schedule.every(interval).minutes.do(
70+
run_check_violations, violation_threshold, self.config, area, interval,
71+
should_send_email_notifications, should_send_slack_notifications
72+
).tag("notification-task")
7173
else:
7274
logger.info(f"should not send notification for camera {area['id']}")
7375

@@ -131,6 +133,22 @@ def _start_processing(self):
131133
p = mp.Process(target=run_video_processing, args=(self.config, recv_conn, p_src))
132134
p.start()
133135
engines.append((send_conn, p))
136+
137+
# Set up occupancy alerts
138+
areas_to_notify = [
139+
area for area in self.config.get_areas() if area['occupancy_threshold'] > 0 and area['cameras'] and (
140+
area['should_send_email_notifications'] or area['should_send_slack_notifications']
141+
)
142+
]
143+
if areas_to_notify and float(self.config.get_section_dict('App')['OccupancyAlertsMinInterval']) >= 0:
144+
logger.info(f'Spinning up area alert threads for {len(areas_to_notify)} areas')
145+
recv_conn, send_conn = mp.Pipe(False)
146+
p = mp.Process(target=run_area_processing, args=(self.config, recv_conn, areas_to_notify))
147+
p.start()
148+
engines.append((send_conn, p))
149+
else:
150+
logger.info('Area occupancy alerts are disabled for all areas')
151+
134152
self._engines = engines
135153

136154
def _stop_processing(self):

0 commit comments

Comments
 (0)