#!/usr/bin/env python
# coding: utf-8
# Author: Richard Hopkins
# Date: 30 October 2022
#
# This program uses the OAK-D lite camera from Luxonis
# to support three key functions:
# 1. to identify the vector to the person in front of K9
# 2. to identify the vector to the nearest vertical obstacle
# that is near K9 (that may not be recognisable as a person)
# 3. to generate a point cloud to help avoid forward and
#    straight-line collisions
#
import time
print("Time started...")
import argparse
print("Ready for arguments...")
import math
print("Counting on fingers... yep")
import depthai as dai
print("Depthai looking forward...")
import numpy as np
print("Numpy is running...")
import pandas as pd
print("Pandas are frolicking...")
import warnings
from memory import Memory
print("All imports done!")
def main():
global testing
parser = argparse.ArgumentParser(description='Runs OAK pipe to find people in view.')
'''parser.add_argument('command',
choices=['arc','fd','bk','lt','rt','stop'],
help='movement command')
parser.add_argument('parameter',
type=float,
default=0.0,
nargs='?',
help='distance in metres or angle in radians')
'''
parser.add_argument('-t', '--test',
action='store_true',
help='execute in visual testing mode')
args = parser.parse_args()
testing = args.test
if testing:
print("Visual test mode active")
class Point_Cloud():
'''
    Calculates a point cloud of a requested size in mm,
    expressed as a 2D numpy array of bins (160 x 120 mm each with
    the default parameters). Each bin contains the median depth
    reading of all the pixels that the point cloud projects into it.
'''
def __init__(self, width:int=4000, height:int=1560, min_depth:float = 200.0, max_depth:float = 10000.0):
self.width = width
self.height = height
self.width_elem = int(self.width/pc_width)
self.height_elem = int(self.height/pc_height)
self.pc_min_range = min_depth
self.pc_max_range = max_depth
self.x_bins = pd.interval_range(start = -int(self.width/2), end = int(self.width/2), periods = int(self.width/pc_width))
self.y_bins = pd.interval_range(start = 0, end = self.height, periods = int(self.height/pc_height))
self.column, self.row = np.meshgrid(np.arange(pc_width), np.arange(pc_height), sparse=True)
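    # With the defaults (width=4000 mm, height=1560 mm) and the decimated
    # 160 x 120 depth frame, this yields int(4000/160) = 25 horizontal bins
    # of 160 mm and int(1560/120) = 13 vertical bins of 120 mm. Note that
    # pc_width/pc_height are the depth-frame dimensions in pixels, so the
    # bin counts come from a millimetres-by-pixels ratio that happens to
    # produce a sensible grid for these defaults.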
def populate_bins(self, depth_image):
'''
Work out which points are valid, project them into a point cloud and then
group them into bins. The median value of each bin is then reported back
as a two dimensional numpy array (height x width)
'''
# Ignore points too close or too far away
valid = (depth_image >= self.pc_min_range) & (depth_image <= self.pc_max_range)
# Calculate the point cloud using simple extrapolation from depth
z = np.where(valid, depth_image, 0.0)
x = np.where(valid, (z * (self.column - cx) / cx / fx) + 120.0, self.pc_max_range)
y = np.where(valid, cam_height - (z * (self.row - cy) / cy / fy), self.pc_max_range)
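        # x and y are back-projected with a pinhole-style model: the pixel
        # offset from the optical centre (cx, cy) is scaled by the depth z
        # and the focal factors fx, fy, e.g. x = z*(u - cx)/(cx*fx).
        # The +120.0 term appears to be a fixed lateral offset in mm (an
        # assumption from reading the code, not a documented value);
        # cam_height converts y into a height above the floor.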
z2 = z.flatten()
x2 = x.flatten()
y2 = y.flatten()
cloud = np.column_stack((x2,y2,z2))
# Remove points that are projected to fall outside the field of view
# points below floor level, above 1.6m or those more than 2m to the
# sides of the robot are ignored
in_scope = (cloud[:,1] < self.height) & (cloud[:,1] > 0) & (cloud[:,0] < self.width/2) & (cloud[:,0] > -self.width/2)
in_scope = np.repeat(in_scope, 3)
in_scope = in_scope.reshape(-1, 3)
scope = np.where(in_scope, cloud, np.nan)
scope = scope[~np.isnan(scope).any(axis=1)]
        # place the points into the 2D grid of rectangular bins
x_index = pd.cut(scope[:,0], self.x_bins)
y_index = pd.cut(scope[:,1], self.y_bins)
binned_depths = pd.Series(scope[:,2])
# simplify each bin to a single median value
totals = binned_depths.groupby([y_index, x_index]).median()
# shape the simplified bins into a 2D array
totals = totals.values.reshape(self.height_elem,self.width_elem)
return totals
class Big_Point_Cloud():
'''
Creates a point cloud that determines any obstacles in front of the robot
'''
def __init__(self):
self.bpc = Point_Cloud(4000,1560)
# Pre-calculate the angles in the point cloud
self.angles_array = np.arange(-36.5, 39.42, 3.04)
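        # np.arange(-36.5, 39.42, 3.04) gives 25 bearings from -36.5 to
        # roughly +36.5 degrees in ~3.04 degree steps (73/24), one per
        # point-cloud column, matching the 73 degree horizontal FoV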
def calc_cartesian(self, angle, depth):
x = depth * math.sin(math.radians(angle))
y = depth * math.cos(math.radians(angle))
return x,y
def record_point_cloud(self,depth_image):
'''
Distills the point cloud down to a single value that is the distance to the
nearest obstacle that is directly in front of the robot
'''
totals = self.bpc.populate_bins(depth_image)
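        # With the default 13 x 25 bin grid, totals[0:7, 10:15] covers
        # heights from the floor up to about 0.84 m and roughly +/-0.4 m
        # either side of centre, i.e. the block directly ahead that the
        # robot could actually collide with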
focus = totals[0:7,10:15]
min_dist = float(np.nanmin(focus))
if np.isnan(min_dist):
min_dist = 0.0
else:
min_dist = float(min_dist/1000.0)
mem.storeState("forward", min_dist)
# for each column in the array, find out the closest
# bin; as the robot cannot duck or jump, the
# y values are irrelevant
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
point_cloud = np.nanmin(totals, axis = 0)
        # inject the resulting sensor points (one per point-cloud
        # column) into the short term memory of the robot
if testing:
colour_green = (0, 255, 0)
points = np.zeros((27,2))
for index, point in enumerate(point_cloud):
angle = self.angles_array[index]
depth = point/10.0
x,y = self.calc_cartesian(angle, depth)
if not (np.isnan(x) or np.isnan(y)):
points[index+1,0] = x
points[index+1,1] = y
x_min = np.nanmin(points[:,0])
x_max = np.nanmax(points[:,0])
y_min = np.nanmin(points[:,1])
y_max = np.nanmax(points[:,1])
win_width = int(abs(x_max - x_min))
if np.isnan(win_width):
win_width = 300
win_height = int(abs(y_max - y_min))
if np.isnan(win_height):
win_height = 300
pc_image = np.zeros((win_height, win_width, 3), np.uint8)
for index in range(26):
from_p = (int(points[index,0] - x_min), int(points[index,1] - y_min))
to_p = (int(points[index+1,0] - x_min), int(points[index+1,1] - y_min))
cv2.line(pc_image, from_p, to_p, color=(0,255,0), thickness = 3)
x_text = "X: {:.1f}m, {:.1f}m".format(x_min/100, x_max/100)
y_text = "Y: {:.1f}m, {:.1f}m".format(y_min/100, y_max/100)
pc_image = cv2.putText(pc_image, x_text, (10, 20), cv2.FONT_HERSHEY_PLAIN, 1, colour_green)
pc_image = cv2.putText(pc_image, y_text, (10, 40), cv2.FONT_HERSHEY_PLAIN, 1, colour_green)
cv2.imshow("Point cloud", pc_image)
for index, point in enumerate(point_cloud):
mem.storeSensorReading("oak",float(point/1000.0), math.radians(float(self.angles_array[index])))
class Legs_Detector():
    '''
    Detects legs and records the vector to them. This is done by
    detecting vertical slices of the image that include an object
    and then averaging the distance for each slice. One or two
    contiguous rectangles of a minimum width are used to determine
    the target.
'''
def __init__ (self):
        self.keep_top = 0.7 # bottom 30% of image tends to include floor
        self.certainty = 0.5 # likelihood that a person is in the column
        # max_depth_diff is the largest allowable difference between the column
# depth readings in mm to allow both columns to be taken into account in the
# direction and depth.
self.max_depth_diff = 200
# Min_col_size is the minimum width of a consecutive column
# to allow it to be taken into account.
self.min_col_size = 5
# max_gap_dist_prod is a measure of how close two rectangles
        # are in reality by multiplying the gap between their centres
# by the sensed depth. Too large a number means the two rectangles
# can't be legs from the same person
self.max_gap_dist_prod = 100000.0
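        # Worked example: with max_gap_dist_prod = 100000.0, two rectangles
        # sensed at 1000 mm may have centres up to 100 columns apart, while
        # at the 1750 mm maximum range they must be within about 57 columns
        # to be treated as two legs of the same person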
def record_legs_vector(self,depth_image) -> dict:
'''
Analyse image and record vector to legs, returns a dict of
columns that contain legs and target data
'''
        # the depth image arrives already decimated by the pipeline
        # (decimation factors between 3 and 20 seem to work best);
        # remove the bottom of the image, as the valid readings
        # there are mostly floor
pix_height, pix_width = depth_image.shape
pix_height = int(self.keep_top * pix_height)
# just use the depth data within valid ranges
valid_frame = (depth_image >= min_range) & (depth_image <= max_range)
# invalid cells will be set to max_range
valid_image = np.where(valid_frame, depth_image, max_range)
        # remove the bottom 30% of the image
valid_image = valid_image[0:pix_height,:]
# work out which columns are likely to contain a vertical object
columns = np.sum(valid_image < max_range, axis = 0) >= (self.certainty*pix_height)
columns = columns.reshape(1,-1)
# work out the average distance per column for valid readings
distance = np.average(valid_image, axis = 0)
# eliminate any readings not in a column with a consistent vertical object
useful_distances = distance * columns
# narrow down the array to just those 'vertical' columns
subset = useful_distances[np.where((useful_distances < max_range) & (useful_distances > 0.0))]
# determine the middle of the depth image
mid_point = int((pix_width - 1.0 ) / 2.0)
# collate a list of all the column numbers that have the
# 'vertical' data in them
indices = columns.nonzero()[1]
        # if there are sufficient non-zero columns left, then
        # determine the average angle of these columns as a fraction of the h_fov
num_cols = int(len(indices))
# Create a column_set list of each group of indices
column_set = np.split(indices, np.where(np.diff(indices) != 1)[0]+1)
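        # e.g. indices [3, 4, 5, 9, 10] -> np.diff = [1, 1, 4, 1]; the split
        # point lands after the jump, giving groups [3, 4, 5] and [9, 10]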
        # Strip out all the groups of consecutive columns that are not wide enough
        # and place the remaining columns in big_cols
big_cols = []
for col in column_set:
if col.size >= self.min_col_size:
big_cols.append(col)
# Calculate the mean distances (big_dists) for the big columns and
# store their column ranges (big_col_ind).
big_dists = []
big_col_ind = []
indices = {}
if len(big_cols) > 0:
for big_col in big_cols:
bc_min = np.amin(big_col)
bc_max = np.amax(big_col)
big_col_ind.append([bc_min, bc_max])
dists = []
for col in big_col:
                    dists.append(np.mean(valid_image[:,col]))
dist_mean = float(np.mean(dists))
big_dists.append(dist_mean)
else:
mem.storeSensorReading("follow",0,0)
return None
# record the result for the nearest column
result = {}
index_min = np.argmin(big_dists)
result = {
"index" : int(index_min),
"dist" : float(dist_mean),
"min_col" : int(big_col_ind[index_min][0]),
"max_col" : int(big_col_ind[index_min][1])
}
'''
        Refine the result by deciding whether to combine the second-
        nearest contiguous rectangle in terms of depth. This will only happen
        if the depth distances are within a fixed tolerance and the rectangles
        themselves are within a range determined by their distance.
        If the depth is small then the rectangles can be further apart;
        if the depth is larger, then the rectangles must be closer.
        This measure is calculated from the product of the distance between
        the centres of the rectangles and their minimum distance.
'''
result["combined"] = 0
if np.size(big_dists) > 1:
store = big_dists[index_min]
big_dists[index_min] = max_range
index_next_min = np.argmin(big_dists)
big_dists[index_min] = store
cen1 = float((big_col_ind[index_min][0] + big_col_ind[index_min][1]) / 2)
cen2 = float((big_col_ind[index_next_min][0] + big_col_ind[index_next_min][1]) / 2)
dist_cen = float(abs(cen1 - cen2))
if ((big_dists[index_next_min] - big_dists[index_min]) < self.max_depth_diff) and \
((dist_cen * float(big_dists[index_min])) < self.max_gap_dist_prod):
result["min_col"] = int(min(big_col_ind[index_min][0], big_col_ind[index_next_min][0]))
result["max_col"] = int(max(big_col_ind[index_min][1],big_col_ind[index_next_min][1]))
result["combined"] = 1.0
# prepare the result object
direction = float((((float(result["max_col"]) + float(result["min_col"])) / 2) - mid_point) / pix_width)
angle = float(direction * math.radians(cam_h_fov))
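        # Illustrative figures (not from a real run): with the decimated
        # 160-column image, a target spanning columns 100-120 has its centre
        # at 110, direction = (110 - 79)/160 ~ 0.19, so the bearing is about
        # 0.19 * 73 degrees ~ 14 degrees off the camera axis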
my_angle = math.degrees(angle)
move = float(result["dist"] - sweet_spot)
move = move / 1000.0 # convert to m
# print("Follow:", move, angle)
mem.storeSensorReading("follow", move, angle)
result["top"] = self.keep_top
result["angle"] = float(my_angle)
result["dist"] = float(result["dist"]) / 1000.0
result["max_cols"] = int(pix_width)
result["num_cols"] = float(result["max_col"]) - float(result["min_col"])
return result
class Person_Detector():
'''
This part of the code will identify the nearest
person in front of K9 (up to about 5m away). As they
move, they should be tracked
'''
def __init__(self):
# Initially there is no identified target, so dictionary is empty
self.target = {
"id" : None,
"status": None,
"x" : None,
"z" : None
}
def record_person_vector (self, trackletsData) -> dict:
'''
        If a target has been identified, then look through the trackletsData,
        retrieve the latest tracklet information for that id
        and store it in the target object.
        If there is no active matching id, then drop them as a target.
        If there is NO target identified yet, then scan the trackletsData,
        find the closest NEW or TRACKED tracklet instance and make them the
        target. Record the tracklet vector in the robot dog's memory.
'''
heel_range = heel_upper # reset range to max
if self.target["id"] is not None:
# extract the tracklet id that matches the existing id
candidate = [tracklet for tracklet in trackletsData
if tracklet.id == self.target["id"]
if tracklet.status.name == "TRACKED"
]
if candidate:
                # refresh the data if identified
                self.target["id"] = candidate[0].id
                self.target["status"] = candidate[0].status.name
                self.target["x"] = candidate[0].spatialCoordinates.x
                self.target["z"] = candidate[0].spatialCoordinates.z
                self.target["tracklet"] = candidate[0]  # keep the tracklet ROI current for the preview overlay
else:
# drop the target otherwise
self.target["id"] = None
else:
# look for any new or tracked tracklets
candidates = [tracklet for tracklet in trackletsData
if (tracklet.status.name == "NEW"
or tracklet.status.name == "TRACKED")]
for candidate in candidates:
# identify the closest tracklet
if candidate.spatialCoordinates.z < heel_range:
heel_range = candidate.spatialCoordinates.z
self.target["id"] = candidate.id
self.target["status"] = candidate.status.name
self.target["x"] = candidate.spatialCoordinates.x
self.target["z"] = candidate.spatialCoordinates.z
self.target["tracklet"] = candidate
        # store the nearest tracklet (if there is one) in
# the short term memory
if self.target["id"] is not None:
z = float(self.target["z"])
x = float(self.target["x"])
angle = ( math.pi / 2 ) - math.atan2(z, x)
trk_distance = max((math.sqrt(z ** 2 + x ** 2 )) - sweet_spot, 0)
trk_distance = trk_distance / 1000.0
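            # angle is measured from the camera axis: a target dead ahead
            # (x = 0) gives atan2(z, 0) = pi/2 and therefore an angle of 0,
            # while a lateral offset x produces a proportionately larger
            # bearing. Subtracting sweet_spot means the robot aims to stop
            # short of the person rather than walking right up to them.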
mem.storeSensorReading("person",trk_distance,angle)
my_angle = math.degrees(angle)
target_dict = {
"angle" : my_angle,
"dist" : trk_distance,
"tracklet" : self.target["tracklet"]
}
return target_dict
else:
mem.storeSensorReading("person",0,0)
return {}
testing = False
mem = Memory()
# OAK-D Lite Horizontal FoV
cam_h_fov = 73.0
# Point cloud variables
cam_height = 268.0 # mm distance from floor
fx = 1.3514 # values derived mathematically due to new resolution and fov
fy = 1.7985 # values derived mathematically due to new resolution and fov
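# fx and fy look like reciprocal tangents of the half field-of-view:
# 1/tan(36.5 deg) ~ 1.351 for the 73 degree horizontal FoV, and
# 1/tan(29 deg) ~ 1.80, consistent with a vertical FoV of roughly 58
# degrees (an inference from the values above, not a calibrated figure)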
pc_width = 160
cx = pc_width / 2
pc_height = 120
cy = pc_height / 2
# Shared constraints
min_range = 750.0 # default for device is mm
max_range = 1750.0 # default for device is mm
sweet_spot = min_range + ((max_range - min_range) / 2.0)
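# With the ranges above, sweet_spot = 750 + (1750 - 750)/2 = 1250 mm,
# i.e. the robot tries to hold station about 1.25 m from its target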
# Heel constants
heel_confidence = 0.7 # NN certainty that it's a person
heel_lower = 200 # minimum depth for person detection
heel_upper = 5000 # maximum depth for person detection
# Path to NN model
nnBlob = "/home/pi/depthai-python/examples/models/mobilenet-ssd_openvino_2021.4_5shave.blob"
# Create OAK-D Lite pipeline
print("Creating Oak pipeline...")
pipeline = dai.Pipeline()
# Create nodes within pipeline
stereo = pipeline.create(dai.node.StereoDepth)
right = pipeline.create(dai.node.MonoCamera)
left = pipeline.create(dai.node.MonoCamera)
camRgb = pipeline.create(dai.node.ColorCamera)
spatialDetectionNetwork = pipeline.create(dai.node.MobileNetSpatialDetectionNetwork)
objectTracker = pipeline.create(dai.node.ObjectTracker)
# Configure stereo vision node
stereo.setLeftRightCheck(True)
stereo.setExtendedDisparity(False)
stereo.setSubpixel(True)
stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.HIGH_ACCURACY)
stereo.initialConfig.setMedianFilter(dai.StereoDepthProperties.MedianFilter.KERNEL_7x7)
stereo.setRectifyEdgeFillColor(0)
# Configure mono camera nodes
right.setBoardSocket(dai.CameraBoardSocket.RIGHT)
left.setBoardSocket(dai.CameraBoardSocket.LEFT)
right.setResolution(dai.MonoCameraProperties.SensorResolution.THE_480_P)
left.setResolution(dai.MonoCameraProperties.SensorResolution.THE_480_P)
# Configure colour camera node
camRgb.setPreviewSize(300, 300)
camRgb.setPreviewKeepAspectRatio(False)
camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
camRgb.setInterleaved(False)
camRgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
# Configure Spatial Detection Neural Network Node
spatialDetectionNetwork.setBlobPath(nnBlob)
spatialDetectionNetwork.setConfidenceThreshold(heel_confidence)
spatialDetectionNetwork.input.setBlocking(False)
spatialDetectionNetwork.setBoundingBoxScaleFactor(0.5)
spatialDetectionNetwork.setDepthLowerThreshold(heel_lower)
spatialDetectionNetwork.setDepthUpperThreshold(heel_upper)
# Configure Object Tracker Node
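# Label 15 is 'person' in the 21-class PASCAL VOC label map used by
# this MobileNet-SSD model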
objectTracker.setDetectionLabelsToTrack([15])
objectTracker.setTrackerType(dai.TrackerType.ZERO_TERM_COLOR_HISTOGRAM)
objectTracker.setTrackerIdAssignmentPolicy(dai.TrackerIdAssignmentPolicy.SMALLEST_ID)
objectTracker.inputTrackerFrame.setBlocking(False)
objectTracker.inputTrackerFrame.setQueueSize(2)
# Linking nodes together in pipeline
# Connect mono cameras to stereo pipe
left.out.link(stereo.left)
right.out.link(stereo.right)
# Connect colour camera preview to Spatial Detection Network pipeline
camRgb.preview.link(spatialDetectionNetwork.input)
# Connect colour camera video frame to Object Tracker pipeline
camRgb.video.link(objectTracker.inputTrackerFrame)
# Pass the video received by the Spatial Detection Network to the Object Tracker input
spatialDetectionNetwork.passthrough.link(objectTracker.inputDetectionFrame)
# Connect the output of the Spatial Detection Network to the input of the object tracker
spatialDetectionNetwork.out.link(objectTracker.inputDetections)
# Connect the depth output to the Spatial Detection Network depth input
stereo.depth.link(spatialDetectionNetwork.inputDepth)
# Define OAK-lite output streams
# Create depth output stream
xOut = pipeline.create(dai.node.XLinkOut)
xOut.setStreamName("depth")
stereo.depth.link(xOut.input)
# Create tracker output stream
trackerOut = pipeline.create(dai.node.XLinkOut)
trackerOut.setStreamName("tracklets")
objectTracker.out.link(trackerOut.input)
# Decimate the depth image by a factor of 4
config = stereo.initialConfig.get()
config.postProcessing.decimationFilter.decimationMode = config.postProcessing.decimationFilter.decimationMode.NON_ZERO_MEDIAN
config.postProcessing.decimationFilter.decimationFactor = 4
stereo.initialConfig.set(config)
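# The mono cameras run at 480P (640 x 480); decimating the depth output by
# a factor of 4 gives 160 x 120 frames, matching the pc_width and pc_height
# used by the point cloud above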
# if executed from the command line, parse the command line arguments
if __name__ == '__main__':
main()
if testing:
import cv2
print("Windows are open...")
xOutRgb = pipeline.create(dai.node.XLinkOut)
xOutRgb.setStreamName("rgb")
camRgb.video.link(xOutRgb.input)
# Declare the device
with dai.Device(pipeline) as device:
# declare buffer queues for the streams
FPS = 0.0
qDep = device.getOutputQueue(name="depth", maxSize=3, blocking=False)
if testing:
qRgb = device.getOutputQueue(name="rgb", maxSize=1, blocking=False)
qTrack = device.getOutputQueue("tracklets", maxSize=1, blocking=False)
print("Oak pipeline running...")
f_pc = Big_Point_Cloud()
f_ld = Legs_Detector()
f_pd = Person_Detector()
if testing:
cv2.namedWindow("OAK Perception Preview", cv2.WINDOW_NORMAL)
FPS_counter = 0
last_reading = time.time()
# Main loop starts here
while True:
start_time = time.time() # start time of the loop
depth_image = qDep.get().getCvFrame() # get latest information from queue
# Retrieve latest tracklets
track = qTrack.get()
trackletsData = track.tracklets
f_pc.record_point_cloud(depth_image)
legs_dict = f_ld.record_legs_vector(depth_image=depth_image)
target_dict = f_pd.record_person_vector(trackletsData=trackletsData)
if testing:
frame_min = np.amin(depth_image)
depth_image = depth_image - frame_min
mean = np.mean(depth_image)
disp = (depth_image / mean * 128.0).astype(np.uint8)
dim = (640, 480)
resized = cv2.resize(disp, dim, interpolation = cv2.INTER_AREA)
im_frame = cv2.applyColorMap(resized, cv2.COLORMAP_HOT)
cv2.imshow("False Depth Image", im_frame)
in_rgb = qRgb.get()
preview = in_rgb.getCvFrame() # get RGB frame
# Resize frame to fit Pi VNC viewer
scale = 0.5
width = int(preview.shape[1] * scale)
height = int(preview.shape[0] * scale)
dsize = (width, height)
colour_red = (0, 0, 255)
col_white = (255,255,255)
colour_green = (0, 255, 0)
thickness = 3
output = cv2.resize(preview, dsize)
# Include safe distance text and
# frames per second on preview window
min_dist = mem.retrieveState("forward")
if min_dist:
dist_txt = "safe dist = {:.2f}m".format(min_dist)
output = cv2.putText(output, dist_txt, (10, height - 40), cv2.FONT_HERSHEY_PLAIN, 1, col_white)
FPS_txt = "FPS = {:.2f}".format(FPS)
output = cv2.putText(output, FPS_txt, (10, height - 20), cv2.FONT_HERSHEY_PLAIN, 1, col_white)
# Draw a green bounding box
# around the nearest person
if target_dict:
t = target_dict["tracklet"]
bearing_txt = "t0 = {:.0f} degrees".format(target_dict['angle'])
dist_txt = "td = {:.2f}m".format(target_dict['dist'])
roi = t.roi.denormalize(width, height)
x1 = int(roi.topLeft().x)
y1 = int(roi.topLeft().y)
x2 = int(roi.bottomRight().x)
y2 = int(roi.bottomRight().y)
output = cv2.rectangle(output, (x1, y1), (x2, y2), colour_green, thickness)
output = cv2.putText(output, bearing_txt, (x1 + 10, y2 - 40), cv2.FONT_HERSHEY_PLAIN, 1, colour_green)
output = cv2.putText(output, dist_txt, (x1 + 10, y2 - 20), cv2.FONT_HERSHEY_PLAIN, 1, colour_green)
# If legs have been spotted, draw red
# bounding boxes
if legs_dict:
box_min = legs_dict["min_col"]
box_max = legs_dict["max_col"]
cols = legs_dict['max_cols']
x_min = int(box_min /cols * width)
x_max = int(box_max / cols * width)
y_max = int(height * legs_dict['top'])
output = cv2.rectangle(output, (x_min, 0), (x_max, y_max), colour_red, thickness)
mean_col = (box_min + box_max) / 2
x_dir = int(mean_col/cols * width)
output = cv2.circle(output, (x_dir, int(y_max/2)), 10, colour_red, thickness)
bearing_txt = "0 = {:.0f} degrees".format(legs_dict['angle'])
dist_txt = "d = {:.2f}m".format(legs_dict['dist'])
com_txt = "comb: " + str(legs_dict["combined"])
output = cv2.putText(output, bearing_txt, (x_dir + 15, int(y_max/2)), cv2.FONT_HERSHEY_PLAIN, 1, colour_red)
output = cv2.putText(output, dist_txt, (x_dir + 15, int(y_max/2) + 20), cv2.FONT_HERSHEY_PLAIN, 1, colour_red)
output = cv2.putText(output, com_txt, (x_dir + 15, int(y_max/2) + 40), cv2.FONT_HERSHEY_PLAIN, 1, colour_red)
cv2.imshow("OAK Perception Preview", output)
if cv2.waitKey(250) == ord("q"):
break
# print out the FPS achieved
FPS_counter +=1
now_time = time.time()
if not testing:
time.sleep(0.05)
# Every 10 seconds print out the short term memory
if (now_time - last_reading) > 10:
FPS = FPS_counter / 10.0
print("FPS: {:.1f}".format(FPS))
last_reading = now_time
FPS_counter = 0
person = mem.retrieveLastSensorReading("person")
try:
print("Person at: {:.2f}m and at {:.2f} degrees.".format(person['distance'], math.degrees(person['angle'])))
except KeyError:
print("No Person currently detected")
follow = mem.retrieveLastSensorReading("follow")
try:
print("Move towards: {:.2f}m and at {:.2f} degrees.".format((follow['distance']), math.degrees(follow['angle'])))
except KeyError:
print("Nothing to follow")
min_dist = mem.retrieveState("forward")
print("Can't move more than {:.1f}m forward".format(min_dist))
print("*** OAK PIPE OUTPUT ENDS ***")