diff --git a/amd64-usbtpu.Dockerfile b/amd64-usbtpu.Dockerfile index 065bd70d..87fd90dd 100644 --- a/amd64-usbtpu.Dockerfile +++ b/amd64-usbtpu.Dockerfile @@ -89,8 +89,11 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ python3-dev \ && apt-get autoremove -y -ENTRYPOINT ["python3", "neuralet-distancing.py"] -CMD ["--config", "config-skeleton.ini"] +#ENTRYPOINT ["python3", "neuralet-distancing.py"] +#CMD ["--config", "config-skeleton.ini"] +ENTRYPOINT ["bash", "start_services.bash"] +CMD ["config-skeleton.ini"] + WORKDIR /repo EXPOSE 8000 diff --git a/api.Dockerfile b/api.Dockerfile new file mode 100644 index 00000000..5dbbd80f --- /dev/null +++ b/api.Dockerfile @@ -0,0 +1,7 @@ +FROM python:3.8-slim + +RUN python3 -m pip install --upgrade pip setuptools==41.0.0 && pip install aiofiles fastapi uvicorn +ENV DEV_ALLOW_ALL_ORIGINS=true +WORKDIR /repo +ENTRYPOINT ["python3", "run_processor_api.py"] +CMD ["--config", "config-api.ini"] diff --git a/api/processor_api.py b/api/processor_api.py new file mode 100644 index 00000000..8c5b2738 --- /dev/null +++ b/api/processor_api.py @@ -0,0 +1,97 @@ +import time +from threading import Thread +from queue import Queue +from multiprocessing.managers import BaseManager +from fastapi import FastAPI +from fastapi.staticfiles import StaticFiles +from fastapi.responses import FileResponse, StreamingResponse +import uvicorn +import os +import logging + +from share.commands import Commands + +logger = logging.getLogger(__name__) + +class QueueManager(BaseManager): pass + +class ProcessorAPI: + """ + The Webgui object implements a fastapi application and acts as an interface for users. + Once it is created it will act as a central application for viewing outputs. + + :param config: Is a ConfigEngine instance which provides necessary parameters. + :param engine_instance: A ConfigEngine object which store all of the config parameters. Access to any parameter + is possible by calling get_section_dict method. + """ + + def __init__(self, config): + self.config = config + self._setup_queues() + self._host = self.config.get_section_dict("API")["Host"] + self._port = int(self.config.get_section_dict("API")["Port"]) + self.app = self.create_fastapi_app() + + def _setup_queues(self): + QueueManager.register('get_cmd_queue') + QueueManager.register('get_result_queue') + self._queue_host = self.config.get_section_dict("CORE")["Host"] + self._queue_port = int(self.config.get_section_dict("CORE")["QueuePort"]) + auth_key = self.config.get_section_dict("CORE")["QueueAuthKey"] + self._queue_manager = QueueManager(address=(self._queue_host, self._queue_port), authkey=auth_key.encode('ascii')) + + while True: + try: + self._queue_manager.connect() + break + except ConnectionRefusedError: + logger.warning("Waiting for core's queue to initiate ... ") + time.sleep(1) + + logger.info("Connection established to Core's queue") + self._cmd_queue = self._queue_manager.get_cmd_queue() + self._result_queue = self._queue_manager.get_result_queue() + + def create_fastapi_app(self): + # Create and return a fastapi instance + app = FastAPI() + + if os.environ.get('DEV_ALLOW_ALL_ORIGINS', False): + # This option allows React development server (which is served on another port, like 3000) to proxy requests + # to this server. + # WARNING: read this before enabling it in your environment: + # https://medium.com/@stestagg/stealing-secrets-from-developers-using-websockets-254f98d577a0 + from fastapi.middleware.cors import CORSMiddleware + app.add_middleware(CORSMiddleware, allow_origins='*', allow_credentials=True, allow_methods=['*'], + allow_headers=['*']) + app.mount("/static", StaticFiles(directory="/repo/data/web_gui/static"), name="static") + + @app.get("/restart-engine") + async def restart_engine(): + logger.info("restart-engine requests on api") + self._cmd_queue.put(Commands.RESTART) + logger.info("waiting for core's response...") + result = self._result_queue.get() + return result + + @app.get("/process-video-cfg") + async def restart_engine(): + logger.info("process-video-cfg requests on api") + self._cmd_queue.put(Commands.PROCESS_VIDEO_CFG) + logger.info("waiting for core's response...") + result = self._result_queue.get() + return result + + @app.get("/stop-process-video") + async def restart_engine(): + logger.info("stop-process-video requests on api") + self._cmd_queue.put(Commands.STOP_PROCESS_VIDEO) + logger.info("waiting for core's response...") + result = self._result_queue.get() + return result + + return app + + def start(self): + uvicorn.run(self.app, host=self._host, port=self._port, log_level='info', access_log=False) + diff --git a/build-dockers-openvino.sh b/build-dockers-openvino.sh new file mode 100755 index 00000000..a8c2c6d9 --- /dev/null +++ b/build-dockers-openvino.sh @@ -0,0 +1,5 @@ +#!/bin/bash +docker build -f x86-openvino.Dockerfile -t "neuralet/smart-social-distancing:latest-x86_64_openvino" +docker build -f api.Dockerfile -t "neuralet/smart-social-distancing:latest-api" . +docker build -f frontend.Dockerfile -t "neuralet/smart-social-distancing:latest-frontend" . +docker build -f run-frontend.Dockerfile -t "neuralet/smart-social-distancing:latest-run-frontend" . diff --git a/config-api.ini b/config-api.ini new file mode 100644 index 00000000..29c85205 --- /dev/null +++ b/config-api.ini @@ -0,0 +1,8 @@ +[API] +Host: 0.0.0.0 +Port: 8003 + +[CORE] +Host: 0.0.0.0 +QueuePort: 8010 +QueueAuthKey: shibalba diff --git a/config-frontend.ini b/config-frontend.ini new file mode 100644 index 00000000..cb26434e --- /dev/null +++ b/config-frontend.ini @@ -0,0 +1,7 @@ +[App] +Host: 0.0.0.0 +Port: 8002 + +[Processor] +Host: 0.0.0.0 +Port: 8001 diff --git a/config-x86-openvino.ini b/config-x86-openvino.ini index a40814d1..4f8589d0 100644 --- a/config-x86-openvino.ini +++ b/config-x86-openvino.ini @@ -1,7 +1,14 @@ +[API] +Host: 0.0.0.0 +Port: 8001 + +[CORE] +Host: 0.0.0.0 +QueuePort: 8010 +QueueAuthKey: shibalba + [App] VideoPath: /repo/data/TownCentreXVID.avi -Host: 0.0.0.0 -Port: 8000 Resolution: 640,480 Encoder: videoconvert ! video/x-raw,format=I420 ! x264enc speed-preset=ultrafast ; WIP https://github.com/neuralet/neuralet/issues/91 diff --git a/frontend.Dockerfile b/frontend.Dockerfile index 2016e262..4d3b577f 100644 --- a/frontend.Dockerfile +++ b/frontend.Dockerfile @@ -1,7 +1,7 @@ FROM node:14-alpine as builder WORKDIR /frontend COPY frontend/package.json frontend/yarn.lock /frontend/ -RUN yarn install --production +RUN yarn install --network-timeout 1000000 --production COPY frontend /frontend RUN yarn build diff --git a/frontend/src/Live.js b/frontend/src/Live.js index a81a99cc..89e47917 100644 --- a/frontend/src/Live.js +++ b/frontend/src/Live.js @@ -116,7 +116,7 @@ function Charts({cameras}) { if (!cameras) { return } - const url = `/static/data/${cameras[0]['id']}/objects_log/${new Date().toISOString().slice(0, 10)}.csv`; + const url = `${cameras[0]['storage_host']}/static/data/${cameras[0]['id']}/objects_log/${new Date().toISOString().slice(0, 10)}.csv`; axios.get(url, {headers}).then(response => { let records = Plotly.d3.csv.parse(response.data); let x1 = [], y1 = [], y2 = [], env_score = []; diff --git a/libs/core.py b/libs/distancing.py similarity index 99% rename from libs/core.py rename to libs/distancing.py index 438fc631..eb66e3c8 100755 --- a/libs/core.py +++ b/libs/distancing.py @@ -1,3 +1,4 @@ +import threading import cv2 as cv import numpy as np import math @@ -8,7 +9,7 @@ from libs.loggers.loggers import Logger from tools.environment_score import mx_environment_scoring_consider_crowd from tools.objects_post_process import extract_violating_objects -from ui.utils import visualization_utils +from libs.utils import visualization_utils import logging logger = logging.getLogger(__name__) @@ -21,6 +22,7 @@ def __init__(self, config): self.detector = None self.device = self.config.get_section_dict('Detector')['Device'] self.running_video = False + self.tracker = CentroidTracker( max_disappeared=int(self.config.get_section_dict("PostProcessor")["MaxTrackFrame"])) self.logger = Logger(self.config) @@ -48,6 +50,7 @@ def __init__(self, config): self.dist_threshold = self.config.get_section_dict("PostProcessor")["DistThreshold"] self.resolution = tuple([int(i) for i in self.config.get_section_dict('App')['Resolution'].split(',')]) self.birds_eye_resolution = (200, 300) + print('Distancing object created') def __process(self, cv_image): """ @@ -151,13 +154,13 @@ def process_video(self, video_uri): dist_threshold = float(self.config.get_section_dict("PostProcessor")["DistThreshold"]) class_id = int(self.config.get_section_dict('Detector')['ClassID']) frame_num = 0 + while input_cap.isOpened() and self.running_video: _, cv_image = input_cap.read() birds_eye_window = np.zeros(self.birds_eye_resolution[::-1] + (3,), dtype="uint8") if np.shape(cv_image) != (): cv_image, objects, distancings = self.__process(cv_image) output_dict = visualization_utils.visualization_preparation(objects, distancings, dist_threshold) - category_index = {class_id: { "id": class_id, "name": "Pedestrian", @@ -206,17 +209,26 @@ def process_video(self, video_uri): out.write(cv_image) out_birdseye.write(birds_eye_window) + frame_num += 1 if frame_num % 1000 == 1: logger.info(f'processed frame {frame_num} for {video_uri}') else: continue + self.logger.update(objects, distancings) + + input_cap.release() out.release() out_birdseye.release() + + self.running_video = False + + def stop_process_video(self): self.running_video = False + def calculate_distancing(self, objects_list): """ this function post-process the raw boxes of object detector and calculate a distance matrix diff --git a/libs/processor_core.py b/libs/processor_core.py new file mode 100644 index 00000000..f9e8edef --- /dev/null +++ b/libs/processor_core.py @@ -0,0 +1,93 @@ +from threading import Thread +from queue import Queue +from multiprocessing.managers import BaseManager +import logging +from share.commands import Commands +from libs.distancing import Distancing as CvEngine + +logger = logging.getLogger(__name__) + +class QueueManager(BaseManager): pass + + +class ProcessorCore: + + def __init__(self, config): + self.config = config + self._cmd_queue = Queue() + self._result_queue = Queue() + self._setup_queues() + self._engine = CvEngine(self.config) + self._tasks = {} + + + def _setup_queues(self): + QueueManager.register('get_cmd_queue', callable=lambda: self._cmd_queue) + QueueManager.register('get_result_queue', callable=lambda: self._result_queue) + self._host = self.config.get_section_dict("CORE")["Host"] + self._queue_port = int(self.config.get_section_dict("CORE")["QueuePort"]) + auth_key = self.config.get_section_dict("CORE")["QueueAuthKey"] + self._queue_manager = QueueManager(address=(self._host, self._queue_port), authkey=auth_key.encode('ascii')) + self._queue_manager.start() + self._cmd_queue = self._queue_manager.get_cmd_queue() + self._result_queue = self._queue_manager.get_result_queue() + logger.info("Core's queue has been initiated") + + + def start(self): + logging.info("Starting processor core") + self._serve() + logging.info("processor core has been terminated.") + + + def _serve(self): + while True: + logger.info("Core is listening for commands ... ") + cmd_code = self._cmd_queue.get() + logger.info("command received: " + str(cmd_code)) + + if cmd_code == Commands.RESTART: + # Do everything necessary ... make sure all threads in tasks are stopped + if Commands.PROCESS_VIDEO_CFG in self._tasks.keys(): + logger.warning("currently processing a video, stopping ...") + self._engine.stop_process_video() + + # TODO: Be sure you have done proper action before this so all threads are stopped + self._tasks = {} + + self._engine = CvEngine(self.config) + logger.info("engine restarted") + self._result_queue.put(True) + continue + + elif cmd_code == Commands.PROCESS_VIDEO_CFG: + if Commands.PROCESS_VIDEO_CFG in self._tasks.keys(): + logger.warning("Already processing a video! ...") + self._result_queue.put(False) + continue + + self._tasks[Commands.PROCESS_VIDEO_CFG] = Thread(target = self._engine.process_video, \ + args=(self.config.get_section_dict("App").get("VideoPath"),) ) + self._tasks[Commands.PROCESS_VIDEO_CFG].start() + logger.info("started to process video ... ") + self._result_queue.put(True) + continue + + elif cmd_code == Commands.STOP_PROCESS_VIDEO : + if Commands.PROCESS_VIDEO_CFG in self._tasks.keys(): + self._engine.stop_process_video() + del self._tasks[Commands.PROCESS_VIDEO_CFG] + logger.info("processing stopped") + self._result_queue.put(True) + else: + logger.warning("no video is being processed") + self._result_queue.put(False) + + continue + + else: + logger.warning("Invalid core command " + str(cmd_code)) + self._result_queue.put("invalid_cmd_code") + continue + + diff --git a/ui/utils/__init__.py b/libs/utils/__init__.py similarity index 100% rename from ui/utils/__init__.py rename to libs/utils/__init__.py diff --git a/ui/utils/visualization_utils.py b/libs/utils/visualization_utils.py similarity index 100% rename from ui/utils/visualization_utils.py rename to libs/utils/visualization_utils.py diff --git a/neuralet-distancing.py b/neuralet-distancing.py deleted file mode 100644 index 1f31b900..00000000 --- a/neuralet-distancing.py +++ /dev/null @@ -1,57 +0,0 @@ -#!/usr/bin/python3 -import argparse -from multiprocessing import Process -import threading -from libs.config_engine import ConfigEngine -import logging - -logger = logging.getLogger(__name__) - - -def start_engine(config, video_path): - if video_path: - from libs.core import Distancing as CvEngine - engine = CvEngine(config) - engine.process_video(video_path) - else: - logger.warning('Skipping CVEngine as video_path is not set in config file') - - -def start_web_gui(config): - from ui.web_gui import WebGUI - ui = WebGUI(config) - ui.start() - - -def main(config): - logging.basicConfig(level=logging.INFO) - if isinstance(config, str): - config = ConfigEngine(config) - - video_path = config.get_section_dict("App").get("VideoPath", None) - process_engine = Process(target=start_engine, args=(config, video_path,)) - process_api = Process(target=start_web_gui, args=(config,)) - - process_api.start() - process_engine.start() - logger.info("Services Started.") - - forever = threading.Event() - try: - forever.wait() - except KeyboardInterrupt: - logger.info("Received interrupt. Terminating...") - - process_engine.terminate() - process_engine.join() - logger.info("CV Engine terminated.") - process_api.terminate() - process_api.join() - logger.info("Web GUI terminated.") - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('--config', required=True) - args = parser.parse_args() - main(args.config) diff --git a/run-frontend.Dockerfile b/run-frontend.Dockerfile new file mode 100644 index 00000000..1a974841 --- /dev/null +++ b/run-frontend.Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.8-slim + +COPY --from=neuralet/smart-social-distancing:latest-frontend /frontend/build /srv/frontend + +COPY ui/requirements.txt /ui/ +WORKDIR /ui + +RUN python3 -m pip install --upgrade pip setuptools==41.0.0 && pip install -r requirements.txt +COPY ui/ /ui +COPY libs/config_engine.py /ui/ +COPY config-frontend.ini /ui/ + +#EXPOSE 8000 + +ENTRYPOINT ["python3", "web_gui.py"] +CMD ["--config", "config-frontend.ini"] diff --git a/run_dockers-openvino.sh b/run_dockers-openvino.sh new file mode 100755 index 00000000..ce660cbb --- /dev/null +++ b/run_dockers-openvino.sh @@ -0,0 +1,11 @@ +#!/bin/bash +API_PORT=$1 +DASHBOARD_PORT=$2 +QUEUE_PORT=$3 + +echo "running openvino docker ..." +docker run --rm -p $QUEUE_PORT:$QUEUE_PORT -v "$PWD/data":/repo/data neuralet/smart-social-distancing:latest-x86_64_openvino & +echo "running api docker... " +docker run --rm -p $QUEUE_PORT:$QUEUE_PORT -p $API_PORT:$API_PORT -v "$PWD":/repo neuralet/smart-social-distancing:latest-api & +echo "run dashboard ..." +docker run --rm -p $DASHBOARD_PORT:$DASHBOARD_PORT -v "$PWD/data":/repo/data neuralet/smart-social-distancing:latest-run-frontend diff --git a/run_processor_api.py b/run_processor_api.py new file mode 100644 index 00000000..ea06f0ef --- /dev/null +++ b/run_processor_api.py @@ -0,0 +1,30 @@ +#!/usr/bin/python3 +import argparse +from multiprocessing import Process +import threading +from libs.config_engine import ConfigEngine +import logging + +logger = logging.getLogger(__name__) + +def start_api(config): + from api.processor_api import ProcessorAPI + api = ProcessorAPI(config) + + logger.info("API Started.") + api.start() + logger.info("API Terminted.") + +def main(config): + logging.basicConfig(level=logging.INFO) + if isinstance(config, str): + config = ConfigEngine(config) + + start_api(config) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('--config', required=True) + args = parser.parse_args() + main(args.config) diff --git a/run_processor_core.py b/run_processor_core.py new file mode 100644 index 00000000..2b243b1e --- /dev/null +++ b/run_processor_core.py @@ -0,0 +1,30 @@ +#!/usr/bin/python3 +import argparse +from multiprocessing import Process +import threading +from libs.config_engine import ConfigEngine +import logging + +logger = logging.getLogger(__name__) + +def start_core(config): + from libs.processor_core import ProcessorCore + core = ProcessorCore(config) + + logger.info("Core Started.") + core.start() + logger.info("Core Terminted.") + +def main(config): + logging.basicConfig(level=logging.INFO) + if isinstance(config, str): + config = ConfigEngine(config) + + start_core(config) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('--config', required=True) + args = parser.parse_args() + main(args.config) diff --git a/share/commands.py b/share/commands.py new file mode 100644 index 00000000..e118744b --- /dev/null +++ b/share/commands.py @@ -0,0 +1,6 @@ +from enum import Enum + +class Commands(Enum): + RESTART = 1 + PROCESS_VIDEO_CFG = 2 + STOP_PROCESS_VIDEO = 3 diff --git a/start_services.bash b/start_services.bash new file mode 100644 index 00000000..b6d80b53 --- /dev/null +++ b/start_services.bash @@ -0,0 +1,4 @@ +#!/bin/bash +CONFIG="$1" +python3 run_processor_core.py --config $CONFIG & +python3 run_processor_api.py --config $CONFIG diff --git a/ui/requirements.txt b/ui/requirements.txt new file mode 100644 index 00000000..ec467f46 --- /dev/null +++ b/ui/requirements.txt @@ -0,0 +1,3 @@ +aiofiles +fastapi +uvicorn \ No newline at end of file diff --git a/ui/web_gui.py b/ui/web_gui.py index 8c05263a..10070318 100644 --- a/ui/web_gui.py +++ b/ui/web_gui.py @@ -4,6 +4,10 @@ from fastapi.responses import RedirectResponse, FileResponse, StreamingResponse import uvicorn import os +import argparse +import logging +from config_engine import ConfigEngine +logger = logging.getLogger(__name__) class WebGUI: @@ -20,6 +24,8 @@ def __init__(self, config): self.config = config self._host = self.config.get_section_dict("App")["Host"] self._port = int(self.config.get_section_dict("App")["Port"]) + self.processor_host = self.config.get_section_dict("Processor")["Host"] + self.processor_port = self.config.get_section_dict("Processor")["Port"] self.app = self.create_fastapi_app() def create_fastapi_app(self): @@ -36,7 +42,6 @@ def create_fastapi_app(self): allow_headers=['*']) app.mount("/panel/static", StaticFiles(directory="/srv/frontend/static"), name="panel") - app.mount("/static", StaticFiles(directory="/repo/data/web_gui/static"), name="static") @app.get("/panel/") async def panel(): @@ -52,17 +57,36 @@ async def index(): @app.get("/api/cameras/") async def api_cameras(): + processor_host = f'http://{self.processor_host}:{self.processor_port}' return [{ 'id': 'default', + 'storage_host': processor_host, 'streams': [ - {'src': '/static/gstreamer/default/playlist.m3u8', 'type': 'application/x-mpegURL', + {'src': processor_host + '/static/gstreamer/default/playlist.m3u8', 'type': 'application/x-mpegURL', 'birdseye': False}, - {'src': '/static/gstreamer/default-birdseye/playlist.m3u8', 'type': 'application/x-mpegURL', + {'src': processor_host + '/static/gstreamer/default-birdseye/playlist.m3u8', 'type': 'application/x-mpegURL', 'birdseye': True}, ], }] - + #@app.get("/static/") + #async def redirect(): + # response = RedirectResponse('127.0.0.1:8000/static/') + # return response return app def start(self): uvicorn.run(self.app, host=self._host, port=self._port, log_level='info', access_log=False) + + +def main(config): + logging.basicConfig(level=logging.INFO) + if isinstance(config, str): + config = ConfigEngine(config) + ui = WebGUI(config) + ui.start() + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('--config', required=True) + args = parser.parse_args() + main(args.config) diff --git a/x86-openvino.Dockerfile b/x86-openvino.Dockerfile index bfc4fbd3..549bf88e 100644 --- a/x86-openvino.Dockerfile +++ b/x86-openvino.Dockerfile @@ -79,11 +79,8 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ RUN rm -rf /opt/intel/openvino/opencv /opt/intel/openvino/python/cv2.* /opt/intel/openvino/python/python3/cv2.* ADD docker/x86-openvino/openvino_setupvars.py /opt/openvino_setupvars.py -CMD env `python3 /opt/openvino_setupvars.py` python3 neuralet-distancing.py --config=config-x86-openvino.ini - +ENV DEV_ALLOW_ALL_ORIGINS=true +COPY . /repo WORKDIR /repo -EXPOSE 8000 +CMD env `python3 /opt/openvino_setupvars.py` bash start_services.bash config-x86-openvino.ini -COPY --from=neuralet/smart-social-distancing:latest-frontend /frontend/build /srv/frontend - -COPY . /repo