Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 96 additions & 59 deletions contourusv/detection.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,31 @@
import cv2
import pandas as pd
import os
# import remove_suffix # Removed because not needed


def draw_dashed_rect(img, pt1, pt2, color, thickness=2, dash_length=5):
x1, y1 = pt1
x2, y2 = pt2

# horizontal top
for x in range(x1, x2, dash_length*2):
cv2.line(img, (x, y1), (min(x+dash_length, x2), y1), color, thickness)
# horizontal bottom
for x in range(x1, x2, dash_length*2):
cv2.line(img, (x, y2), (min(x+dash_length, x2), y2), color, thickness)
# vertical left
for y in range(y1, y2, dash_length*2):
cv2.line(img, (x1, y), (x1, min(y+dash_length, y2)), color, thickness)
# vertical right
for y in range(y1, y2, dash_length*2):
cv2.line(img, (x2, y), (x2, min(y+dash_length, y2)), color, thickness)

def detect_contours(cleaned_image, start_time, end_time, freq_min, freq_max,
file_name, annotations, call_type_defs=None, processing="adaptive"):
file_name, annotations, call_type_defs=None, processing="adaptive",):
"""
Detect and classify USVs in cleaned spectrogram images.
Detect and classify USVs in cleaned spectrogram images, with optional
overlay of manual annotations for evaluation.

Parameters
----------
Expand All @@ -21,6 +43,10 @@ def detect_contours(cleaned_image, start_time, end_time, freq_min, freq_max,
Source audio file path
annotations : list
List to accumulate detection annotations
call_type_defs : dict
Call type definitions
processing : str
Thresholding method ("adaptive" or "Otsu")

Returns
-------
Expand All @@ -31,21 +57,20 @@ def detect_contours(cleaned_image, start_time, end_time, freq_min, freq_max,
"""
if call_type_defs is None:
call_type_defs = {
"22kHz": {"freq_min": 15,
"freq_max": 45,
"freq_span_max": 10,
"duration_min": 0.03, # .03 was original
"duration_max": 3.0},
"50kHz": {"freq_min": 40,
"freq_max": 80,
"freq_span_max": 10,
"duration_min": 0.01,
"duration_max": 0.3},

}

# Re-apply Otsu's Thresholding (Cant do if using adaptive)
if(processing == "Otsu"):
"22kHz": {"freq_min": 15, # 25kh works to eliminate false low call detections, but is incorrect logically
"freq_max": 45,
"freq_span_max": 10,
"duration_min": 0.03,
"duration_max": 3.0},
"50kHz": {"freq_min": 40,
"freq_max": 80,
"freq_span_max": 40,
"duration_min": 0.01,
"duration_max": 0.3},
}

# Re-apply Otsu's Thresholding if using Otsus
if processing == "Otsu":
ret, thresholded_image = cv2.threshold(
cleaned_image, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
else:
Expand All @@ -54,60 +79,72 @@ def detect_contours(cleaned_image, start_time, end_time, freq_min, freq_max,
contours, _ = cv2.findContours(
thresholded_image, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

# Initialize a list to hold the details of each detected USV
usv_details = []

# Process each contour
for contour in contours:
x, y, w, h = cv2.boundingRect(contour)
duration_start = start_time + \
(x / thresholded_image.shape[1]) * (end_time - start_time)
duration_end = start_time + \
((x + w) /
thresholded_image.shape[1]) * (end_time - start_time)
freq_start = freq_min + \
(y / thresholded_image.shape[0]) * (freq_max - freq_min)
freq_end = freq_min + \
((y + h) / thresholded_image.shape[0]) * (freq_max - freq_min)
duration_start = start_time + (x / thresholded_image.shape[1]) * (end_time - start_time)
duration_end = start_time + ((x + w) / thresholded_image.shape[1]) * (end_time - start_time)
freq_start = freq_min + (y / thresholded_image.shape[0]) * (freq_max - freq_min)
freq_end = freq_min + ((y + h) / thresholded_image.shape[0]) * (freq_max - freq_min)

duration = duration_end - duration_start
# Frequency span
freq_span = freq_end - freq_start

# For 22 kHz USVs
for call_type, call_def in call_type_defs.items():
if (freq_start > call_def["freq_min"] and
freq_end < call_def["freq_max"] and
freq_span < call_def["freq_span_max"] and
call_def["duration_max"] >= duration >= call_def["duration_min"]):

usv_details.append({
'bounding_box': (x, y, w, h),
'duration': (x, x + w),
'duration_start': duration_start,
'duration_end': duration_end,
'freq_start': freq_start,
'freq_end': freq_end
})

annotations.append({
'file': file_name.name,
'begin_time': duration_start,
'end_time': duration_end,
'low_freq': freq_start,
'high_freq': freq_end,
'duration': duration,
'USV_TYPE': call_type
})
# Annotate the image with the bounding boxes
image_with_annotations = cv2.cvtColor(
thresholded_image, cv2.COLOR_GRAY2BGR)
if (freq_start > call_def["freq_min"] and
freq_end < call_def["freq_max"] and
freq_span < call_def["freq_span_max"] and
call_def["duration_max"] >= duration >= call_def["duration_min"]):

usv_details.append({
'bounding_box': (x, y, w, h),
'duration': (x, x + w),
'duration_start': duration_start,
'duration_end': duration_end,
'freq_start': freq_start,
'freq_end': freq_end
})

annotations.append({
'file': file_name.name,
'begin_time': duration_start,
'end_time': duration_end,
'low_freq': freq_start,
'high_freq': freq_end,
'duration': duration,
'USV_TYPE': call_type
})

# Annotate detections
image_with_annotations = cv2.cvtColor(thresholded_image, cv2.COLOR_GRAY2BGR)
for usv in usv_details:
x, y, w, h = usv['bounding_box']
cv2.rectangle(image_with_annotations, (x, y),
(x + w, y + h), (0, 255, 0), 1)
(x + w, y + h), (0, 255, 0), 1) # green boxes

manual_csv = str(file_name).replace(".wav", ".csv")

colors = [(255, 0, 0), (0, 0, 255), (255, 255, 0), (255, 0, 255)]

# Overlay manual annotations
if os.path.exists(manual_csv):
manual_annots = pd.read_csv(manual_csv, header=None, names=["begin_time", "end_time"])
manual_subset = manual_annots[
(manual_annots["end_time"] >= start_time) &
(manual_annots["begin_time"] <= end_time)
].reset_index(drop=True)

for idx, row in manual_subset.iterrows():
# map times to x coordinates for markers
x1 = int(((row["begin_time"] - start_time) / (end_time - start_time)) * thresholded_image.shape[1])
x2 = int(((row["end_time"] - start_time) / (end_time - start_time)) * thresholded_image.shape[1])
y1, y2 = 0, thresholded_image.shape[0] # full height

color = colors[idx % len(colors)] # cycle through colors
draw_dashed_rect(image_with_annotations, (x1, y1), (x2, y2), color, thickness=2, dash_length=5)

final_image = cv2.cvtColor(
image_with_annotations, cv2.COLOR_BGR2RGB)
final_image = cv2.cvtColor(image_with_annotations, cv2.COLOR_BGR2RGB)

return final_image, annotations
return final_image, annotations
53 changes: 43 additions & 10 deletions contourusv/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
import matplotlib.pyplot as plt
from codecarbon import EmissionsTracker
from scipy.signal import spectrogram, butter, filtfilt
from scipy import ndimage
from sklearn.decomposition import NMF, FastICA
from preprocessing import clean_spec_imp, clean_spec_orig
from evaluation import run_evaluation
from detection import detect_contours
from generate_annotation import generate_annotations
from sklearn.exceptions import ConvergenceWarning
import warnings
import cv2
warnings.simplefilter("ignore", ConvergenceWarning)

from sklearn.decomposition import FastICA
Expand Down Expand Up @@ -78,14 +80,14 @@ def use_NMF_Small(Sxx, num_splits=120, n_components=25):

Sxx_part = Sxx[:, i:end_idx] # Extract segment

# Pad small segments with zeros or mean value to ensure correct dimensions
# Pad small segments with zeros to ensure correct dimensions
if Sxx_part.shape[1] < n_components:
print(f"Padding segment {i}-{end_idx} to meet {n_components} components")
# Padding with zeros
padding = np.zeros((Sxx_part.shape[0], n_components - Sxx_part.shape[1]))
Sxx_part = np.hstack((Sxx_part, padding)) # Add padding to the segment

# Use `nndsvd` if possible, otherwise fall back to `random`
# Use `nndsvd` if possible, otherwise fall back to `random` This depends on number of components, might not be enough
init_method = 'nndsvd' if Sxx_part.shape[1] >= n_components else 'random'
if init_method == 'nndsvd':
max_iter = 100
Expand All @@ -99,7 +101,7 @@ def use_NMF_Small(Sxx, num_splits=120, n_components=25):
W = model.fit_transform(Sxx_part)
H = model.components_

# Reconstruct the matrix segment
# Reconstruct an approximation the matrix segment
reconstructed_Sxx_part = np.dot(W, H)

transformed_parts.append(reconstructed_Sxx_part)
Expand Down Expand Up @@ -179,7 +181,7 @@ def low_pass_filter(data, cutoff, fs, order=5):


def run_detection(root_path, file_name, experiment, trial, overlap=3,
winlen=10, freq_min=15, freq_max=115, wsize=2500, th_perc=95, processing='none', overlapsize=.25):
winlen=10, freq_min=15, freq_max=115, wsize=2500, th_perc=85, processing='none', overlapsize=.25):
"""
Process audio file to detect ultrasonic vocalizations (USVs).

Expand Down Expand Up @@ -266,13 +268,44 @@ def run_detection(root_path, file_name, experiment, trial, overlap=3,
noise_floor = np.percentile(Sxx, th_perc)
Sxx[Sxx < noise_floor] = noise_floor

if (processing != "Otsu"):
Sxx = use_NMF_Small(Sxx)
# Current thought: Split the spectrogram into upper and lower frequency bands
# Lower band: use current process... might need to make AT a bit less aggressive?
# Upper band: need to make a less aggressive process... CLAHE maybe?

if(processing == "Otsu"):
cleaned_image = clean_spec_orig(Sxx)

# Find index where frequency >= 40 kHz (f is in kHz)

split = 40

split_idx = np.argmax(f >= split)

# Store original upper half from *raw* Sxx for later restoration
Sxx_high_original = Sxx[split_idx:, :]

# Process entire Sxx
if processing != "Otsu":
Sxx_processed = use_NMF_Small(Sxx)
else:
cleaned_image = clean_spec_imp(Sxx)
Sxx_processed = Sxx # no NMF for Otsu

# Clean entire spectrogram
if processing == "Otsu":
cleaned_image = clean_spec_orig(Sxx_processed)
else:
cleaned_image = clean_spec_imp(Sxx_processed)

# Apply mild median filter to reduce noise while keeping signals
Sxx_high_original = ndimage.median_filter(Sxx_high_original, size=3)

# Normalize spectrogram PERFORMS POORLY ON C57
Sxx_high_original = cv2.normalize(Sxx_high_original, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)

min_cols = min(cleaned_image.shape[1], Sxx_high_original.shape[1])
cleaned_image = cleaned_image[:, :min_cols]
Sxx_high_norm = Sxx_high_original[:, :min_cols]

# Combine the lower band (processed) and upper band (original) into cleaned_image
cleaned_image[split_idx:, :] = Sxx_high_norm

final_image, annotations = detect_contours(cleaned_image, start_time, end_time, freq_min, freq_max, file_name, annotations, processing=processing)

Expand Down Expand Up @@ -369,7 +402,7 @@ def run_detection(root_path, file_name, experiment, trial, overlap=3,

# Run detection for each audio file
for audio_file in tqdm(audio_files, desc=f"Running Detection on audio files for {experiment} {trial}"):
run_detection(root_path, audio_file, experiment, trial, **ac_kwargs)
run_detection(root_path, audio_file, experiment, trial,**ac_kwargs)

# Generate ground truth annotations
generate_annotations(experiment, trial, root_path, file_ext)
Expand Down
13 changes: 3 additions & 10 deletions contourusv/preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,6 @@
from scipy import ndimage
from sklearn.decomposition import NMF

# Lists for PR:
# 1. Combine preprocessing functions into a single function with a parameter to choose between them.
# 2. Generate f1 score graph using sb, add to poster
# 3. Fun tests on rat_pleasant using both methods
# 4. Push PR to main repo
# 5. Make sure ghpages is pushing correctly

def clean_spec_imp(Sxx):
"""
Improved preprocessing of spectrogram data for USV detection, using adaptive theshholding.
Expand Down Expand Up @@ -40,22 +33,22 @@ def clean_spec_imp(Sxx):
enhanced_img, 255,
cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY_INV,
15, 15
15, 15 # blockSize, C
)

# best results so far, 15, 15
# 15, 19 seems to do well

# Morphological processing to connect weakly detected signals
# kernel = np.ones((1, 1), np.uint8) # Moderate kernel size
# kernel = np.ones((1, 1), np.uint8) # Moderate kernel size THIS DOESNT WORK WELL

# final_img = cv2.morphologyEx(adaptive_img, cv2.MORPH_OPEN, kernel)

return adaptive_img




# older version of Clean_spec_imp
def clean_spec_orig(Sxx):
"""
Preprocess spectrogram data for USV detection using Otsu's threshholding and CLAHE contrast enhancement.
Expand Down
Loading