Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions amd64-usbtpu.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,11 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
python3-dev \
&& apt-get autoremove -y

ENTRYPOINT ["python3", "neuralet-distancing.py"]
CMD ["--config", "config-skeleton.ini"]
#ENTRYPOINT ["python3", "neuralet-distancing.py"]
#CMD ["--config", "config-skeleton.ini"]
ENTRYPOINT ["bash", "start_services.bash"]
CMD ["config-skeleton.ini"]

WORKDIR /repo
EXPOSE 8000

Expand Down
7 changes: 7 additions & 0 deletions api.Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
FROM python:3.8-slim

RUN python3 -m pip install --upgrade pip setuptools==41.0.0 && pip install aiofiles fastapi uvicorn
ENV DEV_ALLOW_ALL_ORIGINS=true
WORKDIR /repo
ENTRYPOINT ["python3", "run_processor_api.py"]
CMD ["--config", "config-api.ini"]
97 changes: 97 additions & 0 deletions api/processor_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import time
from threading import Thread
from queue import Queue
from multiprocessing.managers import BaseManager
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, StreamingResponse
import uvicorn
import os
import logging

from share.commands import Commands

logger = logging.getLogger(__name__)

class QueueManager(BaseManager): pass

class ProcessorAPI:
"""
The Webgui object implements a fastapi application and acts as an interface for users.
Once it is created it will act as a central application for viewing outputs.

:param config: Is a ConfigEngine instance which provides necessary parameters.
:param engine_instance: A ConfigEngine object which store all of the config parameters. Access to any parameter
is possible by calling get_section_dict method.
"""

def __init__(self, config):
self.config = config
self._setup_queues()
self._host = self.config.get_section_dict("API")["Host"]
self._port = int(self.config.get_section_dict("API")["Port"])
self.app = self.create_fastapi_app()

def _setup_queues(self):
QueueManager.register('get_cmd_queue')
QueueManager.register('get_result_queue')
self._queue_host = self.config.get_section_dict("CORE")["Host"]
self._queue_port = int(self.config.get_section_dict("CORE")["QueuePort"])
auth_key = self.config.get_section_dict("CORE")["QueueAuthKey"]
self._queue_manager = QueueManager(address=(self._queue_host, self._queue_port), authkey=auth_key.encode('ascii'))

while True:
try:
self._queue_manager.connect()
break
except ConnectionRefusedError:
logger.warning("Waiting for core's queue to initiate ... ")
time.sleep(1)

logger.info("Connection established to Core's queue")
self._cmd_queue = self._queue_manager.get_cmd_queue()
self._result_queue = self._queue_manager.get_result_queue()

def create_fastapi_app(self):
# Create and return a fastapi instance
app = FastAPI()

if os.environ.get('DEV_ALLOW_ALL_ORIGINS', False):
# This option allows React development server (which is served on another port, like 3000) to proxy requests
# to this server.
# WARNING: read this before enabling it in your environment:
# https://medium.com/@stestagg/stealing-secrets-from-developers-using-websockets-254f98d577a0
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(CORSMiddleware, allow_origins='*', allow_credentials=True, allow_methods=['*'],
allow_headers=['*'])
app.mount("/static", StaticFiles(directory="/repo/data/web_gui/static"), name="static")

@app.get("/restart-engine")
async def restart_engine():
logger.info("restart-engine requests on api")
self._cmd_queue.put(Commands.RESTART)
logger.info("waiting for core's response...")
result = self._result_queue.get()
return result

@app.get("/process-video-cfg")
async def restart_engine():
logger.info("process-video-cfg requests on api")
self._cmd_queue.put(Commands.PROCESS_VIDEO_CFG)
logger.info("waiting for core's response...")
result = self._result_queue.get()
return result

@app.get("/stop-process-video")
async def restart_engine():
logger.info("stop-process-video requests on api")
self._cmd_queue.put(Commands.STOP_PROCESS_VIDEO)
logger.info("waiting for core's response...")
result = self._result_queue.get()
return result

return app

def start(self):
uvicorn.run(self.app, host=self._host, port=self._port, log_level='info', access_log=False)

5 changes: 5 additions & 0 deletions build-dockers-openvino.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/bash
docker build -f x86-openvino.Dockerfile -t "neuralet/smart-social-distancing:latest-x86_64_openvino"
docker build -f api.Dockerfile -t "neuralet/smart-social-distancing:latest-api" .
docker build -f frontend.Dockerfile -t "neuralet/smart-social-distancing:latest-frontend" .
docker build -f run-frontend.Dockerfile -t "neuralet/smart-social-distancing:latest-run-frontend" .
8 changes: 8 additions & 0 deletions config-api.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[API]
Host: 0.0.0.0
Port: 8003

[CORE]
Host: 0.0.0.0
QueuePort: 8010
QueueAuthKey: shibalba
7 changes: 7 additions & 0 deletions config-frontend.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[App]
Host: 0.0.0.0
Port: 8002

[Processor]
Host: 0.0.0.0
Port: 8001
11 changes: 9 additions & 2 deletions config-x86-openvino.ini
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
[API]
Host: 0.0.0.0
Port: 8001

[CORE]
Host: 0.0.0.0
QueuePort: 8010
QueueAuthKey: shibalba

[App]
VideoPath: /repo/data/TownCentreXVID.avi
Host: 0.0.0.0
Port: 8000
Resolution: 640,480
Encoder: videoconvert ! video/x-raw,format=I420 ! x264enc speed-preset=ultrafast
; WIP https://github.com/neuralet/neuralet/issues/91
Expand Down
2 changes: 1 addition & 1 deletion frontend.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
FROM node:14-alpine as builder
WORKDIR /frontend
COPY frontend/package.json frontend/yarn.lock /frontend/
RUN yarn install --production
RUN yarn install --network-timeout 1000000 --production
COPY frontend /frontend
RUN yarn build

Expand Down
2 changes: 1 addition & 1 deletion frontend/src/Live.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ function Charts({cameras}) {
if (!cameras) {
return
}
const url = `/static/data/${cameras[0]['id']}/objects_log/${new Date().toISOString().slice(0, 10)}.csv`;
const url = `${cameras[0]['storage_host']}/static/data/${cameras[0]['id']}/objects_log/${new Date().toISOString().slice(0, 10)}.csv`;
axios.get(url, {headers}).then(response => {
let records = Plotly.d3.csv.parse(response.data);
let x1 = [], y1 = [], y2 = [], env_score = [];
Expand Down
16 changes: 14 additions & 2 deletions libs/core.py → libs/distancing.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import threading
import cv2 as cv
import numpy as np
import math
Expand All @@ -8,7 +9,7 @@
from libs.loggers.loggers import Logger
from tools.environment_score import mx_environment_scoring_consider_crowd
from tools.objects_post_process import extract_violating_objects
from ui.utils import visualization_utils
from libs.utils import visualization_utils
import logging

logger = logging.getLogger(__name__)
Expand All @@ -21,6 +22,7 @@ def __init__(self, config):
self.detector = None
self.device = self.config.get_section_dict('Detector')['Device']
self.running_video = False

self.tracker = CentroidTracker(
max_disappeared=int(self.config.get_section_dict("PostProcessor")["MaxTrackFrame"]))
self.logger = Logger(self.config)
Expand Down Expand Up @@ -48,6 +50,7 @@ def __init__(self, config):
self.dist_threshold = self.config.get_section_dict("PostProcessor")["DistThreshold"]
self.resolution = tuple([int(i) for i in self.config.get_section_dict('App')['Resolution'].split(',')])
self.birds_eye_resolution = (200, 300)
print('Distancing object created')

def __process(self, cv_image):
"""
Expand Down Expand Up @@ -151,13 +154,13 @@ def process_video(self, video_uri):
dist_threshold = float(self.config.get_section_dict("PostProcessor")["DistThreshold"])
class_id = int(self.config.get_section_dict('Detector')['ClassID'])
frame_num = 0

while input_cap.isOpened() and self.running_video:
_, cv_image = input_cap.read()
birds_eye_window = np.zeros(self.birds_eye_resolution[::-1] + (3,), dtype="uint8")
if np.shape(cv_image) != ():
cv_image, objects, distancings = self.__process(cv_image)
output_dict = visualization_utils.visualization_preparation(objects, distancings, dist_threshold)

category_index = {class_id: {
"id": class_id,
"name": "Pedestrian",
Expand Down Expand Up @@ -206,17 +209,26 @@ def process_video(self, video_uri):

out.write(cv_image)
out_birdseye.write(birds_eye_window)

frame_num += 1
if frame_num % 1000 == 1:
logger.info(f'processed frame {frame_num} for {video_uri}')
else:
continue

self.logger.update(objects, distancings)


input_cap.release()
out.release()
out_birdseye.release()

self.running_video = False

def stop_process_video(self):
self.running_video = False


def calculate_distancing(self, objects_list):
"""
this function post-process the raw boxes of object detector and calculate a distance matrix
Expand Down
93 changes: 93 additions & 0 deletions libs/processor_core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from threading import Thread
from queue import Queue
from multiprocessing.managers import BaseManager
import logging
from share.commands import Commands
from libs.distancing import Distancing as CvEngine

logger = logging.getLogger(__name__)

class QueueManager(BaseManager): pass


class ProcessorCore:

def __init__(self, config):
self.config = config
self._cmd_queue = Queue()
self._result_queue = Queue()
self._setup_queues()
self._engine = CvEngine(self.config)
self._tasks = {}


def _setup_queues(self):
QueueManager.register('get_cmd_queue', callable=lambda: self._cmd_queue)
QueueManager.register('get_result_queue', callable=lambda: self._result_queue)
self._host = self.config.get_section_dict("CORE")["Host"]
self._queue_port = int(self.config.get_section_dict("CORE")["QueuePort"])
auth_key = self.config.get_section_dict("CORE")["QueueAuthKey"]
self._queue_manager = QueueManager(address=(self._host, self._queue_port), authkey=auth_key.encode('ascii'))
self._queue_manager.start()
self._cmd_queue = self._queue_manager.get_cmd_queue()
self._result_queue = self._queue_manager.get_result_queue()
logger.info("Core's queue has been initiated")


def start(self):
logging.info("Starting processor core")
self._serve()
logging.info("processor core has been terminated.")


def _serve(self):
while True:
logger.info("Core is listening for commands ... ")
cmd_code = self._cmd_queue.get()
logger.info("command received: " + str(cmd_code))

if cmd_code == Commands.RESTART:
# Do everything necessary ... make sure all threads in tasks are stopped
if Commands.PROCESS_VIDEO_CFG in self._tasks.keys():
logger.warning("currently processing a video, stopping ...")
self._engine.stop_process_video()

# TODO: Be sure you have done proper action before this so all threads are stopped
self._tasks = {}

self._engine = CvEngine(self.config)
logger.info("engine restarted")
self._result_queue.put(True)
continue

elif cmd_code == Commands.PROCESS_VIDEO_CFG:
if Commands.PROCESS_VIDEO_CFG in self._tasks.keys():
logger.warning("Already processing a video! ...")
self._result_queue.put(False)
continue

self._tasks[Commands.PROCESS_VIDEO_CFG] = Thread(target = self._engine.process_video, \
args=(self.config.get_section_dict("App").get("VideoPath"),) )
self._tasks[Commands.PROCESS_VIDEO_CFG].start()
logger.info("started to process video ... ")
self._result_queue.put(True)
continue

elif cmd_code == Commands.STOP_PROCESS_VIDEO :
if Commands.PROCESS_VIDEO_CFG in self._tasks.keys():
self._engine.stop_process_video()
del self._tasks[Commands.PROCESS_VIDEO_CFG]
logger.info("processing stopped")
self._result_queue.put(True)
else:
logger.warning("no video is being processed")
self._result_queue.put(False)

continue

else:
logger.warning("Invalid core command " + str(cmd_code))
self._result_queue.put("invalid_cmd_code")
continue


File renamed without changes.
File renamed without changes.
Loading