-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
165 lines (149 loc) · 7.72 KB
/
Copy pathmain.py
File metadata and controls
165 lines (149 loc) · 7.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
"""
Author: Giorgio
Date: July 2023
Main
__________
Main script to invoke the biological measure classes and compute them from a given dataset. The dataset can
be the lab recordings. As conditions, it must have the Green, Red and IR LEDs. From there, the main script displays the
raw signal, pre-processed signal and the biological measures in a single dyanmic plot that
iterates every new epoch.
"""
import numpy as np
from models.filter_zero_offset import FilterZeroOffset
from notebooks.data_exploration import DataExploration
from src.prototype.data_visualization import DataPlotter
from src.biological_measures.pulse_oxyemeter import Spo2
from src.biological_measures.heart_rate_variability import HeartRate
from src.biological_measures.respiration_rate_periodogram import RespirationRatePeriodogram
from configurations.params_20230714_lab_rec import params_lab
import pandas as pd
from tqdm import tqdm
if __name__ == "__main__":
# %% initialize variables
root = params_lab['root_path']
epoch_duration = 10 # epoch duration in seconds
# %% Files we will work with
csv_files = list(root.joinpath(params_lab['data_path']).glob('*.csv'))
num_files = len(csv_files)
# %% Evaluate how many epochs
# time duration of the dataset with a random subject
total_samples = pd.read_csv(csv_files[0]).shape[0]
# Calculate the number of samples per epoch
samples_per_epoch = int(params_lab['fs'] * epoch_duration) + 1
epoch_starts = [int(start_index_ / params_lab['fs']) for start_index_ in range(0, total_samples, samples_per_epoch)]
# %% data plotter
data_plotter = DataPlotter(config=params_lab,
segment_duration_sec=epoch_duration,
fs=params_lab['fs']
)
# %% initialize SPO2 class
spo2_class = Spo2(fs=params_lab['fs'], epoch_length=samples_per_epoch, )
# %% respiration rate measure
resp_rate_estimation = RespirationRatePeriodogram(config=params_lab,
epoch_duration_sec=epoch_duration,
sampling_rate=params_lab['fs'],
welch_overlap=50)
# %% initialize HeartRate class
hr_class = HeartRate(fs=params_lab['fs'], epoch_length=samples_per_epoch, )
# %% pre-processing class
filter_zero_offset = FilterZeroOffset(fs=params_lab['fs'], samples_segment=epoch_duration,
signal_duration_sec=epoch_duration,
filter_order=2, lowcut=.5, highcut=10)
# %% Loop across the epochs and compute the measures
for file_ in tqdm(csv_files, # skip the subject info file (last one)
desc=f"Processing files",
leave=True):
subject_data = pd.read_csv(file_)
# initialize the class that selects the epochs from the complete time series dataset
data_exploration = DataExploration(config=params_lab,
frame=subject_data,
fs=params_lab['fs'])
# Iterate over the time series in non-overlapping epochs and store each metric as a time series
for epoch_ in tqdm(epoch_starts[0:-1],
desc="Processing epochs:",
leave=False,
colour='green'):
# print(f'epoch: {epoch_}')
_, data_window = data_exploration.apply_window(start_sec=epoch_,
duration_sec=epoch_duration,
re_factor=True)
# set the signal
ir_signal: list = data_window.loc[:, 'Infrared'].values
red_signal: list = data_window.loc[:, 'Red'].values
green_signal: list = data_window.loc[:, 'Green'].values
# preprocess
filter_zero_offset.set_signal(in_signal=ir_signal.copy())
ir_denoised = filter_zero_offset.process_signal()
filter_zero_offset.set_signal(in_signal=red_signal.copy())
red_denoised = filter_zero_offset.process_signal()
filter_zero_offset.set_signal(in_signal=green_signal.copy())
green_denoised = filter_zero_offset.process_signal()
# %% compute biological measures
resp_rate_estimation.set_ppg_epoch(epoch=green_signal)
resp_rate = resp_rate_estimation.calculate_respiration_rate()
pred_spo2 = spo2_class.compute_spo2(
epoch_ppg={'red': red_signal, 'ir': ir_signal, 'green': green_signal},
denoised_ppg={'red': red_denoised, 'ir': ir_denoised, 'green': green_denoised},
spo2_by_beat=None)
bpm = hr_class.compute_heart_measures(
epoch_ppg={'red': red_signal, 'ir': ir_signal, 'green': green_signal},
denoised_ppg={'red': red_denoised, 'ir': ir_denoised, 'green': green_denoised},
plot_waveforms=False)
# %% update the frames of the plot
data_plotter.set_frame_data(raw_signal=np.asarray(green_signal),
pre_proc_signal=green_denoised,
bpm_txt=str(bpm["bpm_time"]),
heart_rate_txt=str(np.round(np.mean(bpm["hrv"]), 2)),
spo2_txt=str(np.round(np.mean(pred_spo2['spo2s_timed']), 2)),
resprate_txt=str(np.round(resp_rate, 2))
)
data_plotter.plot_boi()
data_plotter.finish_cycle()
# if __name__ == "__main__":
# signal_duration_sec = 10
# n_samples_to_show = signal_duration_sec * params_imp_ver_one['fs'] # Number of samples per segment
#
# data_plotter = DataPlotter(segment_duration_sec=signal_duration_sec, fs=params_imp_ver_one['fs'])
#
# ceemandwt = CeemandWT(
# signal_duration_sec=signal_duration_sec,
# fs=params_imp_ver_one['fs'],
# n_trials=params_ceemand_wt['n_trials'],
# epsilon=params_ceemand_wt['epsilon'],
# seed=params_ceemand_wt['seed'],
# max_imf=params_ceemand_wt['max_imf'],
# z_thr=params_ceemand_wt['z_thr'],
# wavelet_levels=params_ceemand_wt['wavelet_levels'],
# wavelet=params_ceemand_wt['wavelet'])
#
# device = ReadPPgSerial(baudrate=115200, port="COM4")
#
# start_time = time.time() # Store the start time
# while True:
# start_pipeline = time.time()
# elapsed_time = start_pipeline - start_time # Calculate the elapsed time
#
# print(f'Requesting data at {elapsed_time} seconds\n')
# ppg = device.request_data(number_of_samples=signal_duration_sec * params_imp_ver_one['fs'])
#
# start_time_ceemdan_wt = time.time()
# denoised_signal = ceemandwt.run_ceemand_wt(in_signal=ppg)
# ceemandwt_execution_time = time.time() - start_time_ceemdan_wt # Calculate the execution time
# print(f'CEEMDAN WT computed in {ceemandwt_execution_time} seconds\n')
#
# data_plotter.set_frame_data(segment_one=np.array(ppg),
# segment_two=denoised_signal,
# upper_txt='12', lower_txt='30')
#
# data_plotter.plot_boi()
#
# execution_time = time.time() - start_pipeline
# # request data every 15 seconds
# if execution_time < 15:
# print(f'sleeping {execution_time}')
# time.sleep(15 - execution_time)
# else:
# print(f'Execution time of {execution_time} seconds\n')
# continue
#
# data_plotter.finish_cycle()