-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathprediction_functions.py
More file actions
258 lines (206 loc) · 8.57 KB
/
prediction_functions.py
File metadata and controls
258 lines (206 loc) · 8.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
import os
import numpy as np
import soundfile as sf
import matplotlib.pyplot as plt
from scipy.io import wavfile, loadmat
from scipy.interpolate import interp1d
from hsp_utils.evaluation_functions import eval_sound_model, class_representations
def find_and_open_audio(db_folder, true_value=False):
'''Función que permite la apertura de archivos de audio en la base
de datos de la carpeta especificada.
Parameters
----------
db_folder : str
Carpeta de la base de datos.
true_value : bool, optional
Booleano que indica si es que se retorna la etiqueta original
del archivo de audio (en formato .mat). Por defecto es False.
Returns
-------
audio : ndarray
Señal de audio de interés.
samplerate : int or float
Tasa de muestreo de la señal.
'''
def _file_selection(filenames):
print('Seleccione el archivo que desea descomponer:')
for num, name in enumerate(filenames):
print(f'[{num + 1}] {name}')
# Definición de la selección
selection = int(input('Selección: '))
# Se retorna
try:
return filenames[selection-1].strip('.wav')
except:
raise Exception('No ha seleccionado un archivo válido.')
def _open_file(filename):
# Obtención del archivo de audio .wav
try:
samplerate, audio = wavfile.read(f'{filename}.wav')
except:
audio, samplerate = sf.read(f'{filename}.wav')
return audio, samplerate
# Definición del archivo a revisar
filenames = [i for i in os.listdir(db_folder) if i.endswith('.wav')]
# Definición de la ubicación del archivo
filename = f'{db_folder}/{_file_selection(filenames)}'
# Retornando
if not true_value:
return _open_file(filename)
else:
return _open_file(filename), \
loadmat(f'{filename}.mat')['PCG_states']
def hss_segmentation(signal_in, samplerate, model_name,
length_desired, lowpass_params=None,
plot_outputs=False):
'''Función que segmenta un sonido auscultado de entrada utilizando
uno de los modelos disponibles en la carpeta "models".
Parameters
----------
signal_in : ndarray
Señal de entrada.
samplerate : int or float
Tasa de muestreo de la señal de entrada.
model_name : str
Nombre del modelo de la red en la dirección
"heart_sound_segmentation/models".
length_desired : int or float
Largo deseado de la señal.
lowpass_params : dict or None
Diccionario que contiene la información del filtro pasa
bajos en la salida de la red. Si es None, no se utiliza.
Por defecto es None.
plot_outputs : bool
Booleano para indicar si se realizan gráficos. Por defecto
es False.
Returns
-------
y_hat : ndarray
Salidas de la red indicando la probabilidad de ocurrencia
de cada clase.
y_hat_to : ndarray
Salidas de la red indicando la probabilidad de ocurrencia
de cada clase, pero con la corrección de la cantidad de
puntos.
(y_out2, y_out3, y_out4) : list of ndarray
Salida de la red indicando las 2, 3 y 4 posibles clases.
'''
# Salida de la red
_audio, y_hat = eval_sound_model(signal_in, samplerate, model_name,
lowpass_params=lowpass_params)
# Definición del largo deseado ajustado a y_hat
length_desired_to = round(len(y_hat[0,:,0]) / len(_audio) * \
length_desired)
# Definición de las probabilidades resampleadas
y_hat_to = np.zeros((1, length_desired_to, 3))
# Para cada una de las salidas, se aplica un resample
for i in range(3):
y_hat_to[0, :, i] = \
segments_redimension(y_hat[0, :, i],
length_desired=length_desired_to,
kind='cubic')
# Definiendo la cantidad de puntos finales a añadir
q_times = length_desired - y_hat_to.shape[1]
# Generando los puntos a añadir
points_to_add = np.tile(y_hat_to[:,-1,:], (1, q_times, 1))
# Agregando los puntos a la señal
y_hat_to = np.concatenate((y_hat_to, points_to_add), axis=1)
# Representación en clases
y_out2, y_out3, y_out4 = \
class_representations(y_hat_to, plot_outputs=plot_outputs,
audio_data=None)
return y_hat, y_hat_to, (y_out2, y_out3, y_out4)
def segments_redimension(signal_in, length_desired, kind='linear'):
'''Función que redimensiona la salida y_hat de las redes para
dejarlo en función de un largo deseado.
Parameters
----------
signal_in : ndarray
Señal de entrada.
length_desired : int
Largo deseado de la señal.
kind : str
Opción kind de la función "scipy.interpolate.interp1d".
Por defecto es "linear".
Returns
-------
signal_out : ndarray
Señal resampleada.
'''
# Definición del eje temporal de la señal
x = np.linspace(0, length_desired - 1, len(signal_in))
# Función de interpolación en base a los datos de la señal
f = interp1d(x, signal_in, kind=kind)
# Definición de la nueva cantidad de puntos
x_new = np.arange(length_desired)
# Interpolando finalmente
return f(x_new)
def find_segments_limits(y_hat, segments_return='Non-Heart'):
'''Función que obtiene los límites de las posiciones de los sonidos
cardiacos a partir de la señal binaria indica su presencia.
Parameters
----------
y_hat : ndarray
Señal binaria que indica la presencia de sonidos cardiacos.
segments_return : {'Heart', 'Non-Heart'}, optional
Opción que decide si es que se retornan los intervalos de sonido
cardiaco o los intervalos libres de sonido cardiaco. Por defecto
es 'Non-Heart'.
Returns
-------
interval_list : list
Lista de intervalos en los que se encuentra el sonido cardiaco.
'''
# Encontrando los puntos de cada sonido
if segments_return == 'Non-Heart':
hss_pos = np.where(y_hat == 0)[0]
elif segments_return == 'Heart':
hss_pos = np.where(y_hat == 1)[0]
else:
raise Exception('Opción no válida para "segments_return".')
# Definición de la lista de intervalos
interval_list = list()
# Inicio del intervalo
beg_seg = hss_pos[0]
# Definiendo
for i in range(len(hss_pos) - 1):
if hss_pos[i + 1] - hss_pos[i] != 1:
interval_list.append([beg_seg, hss_pos[i]])
beg_seg = hss_pos[i + 1]
if hss_pos[-1] > beg_seg:
interval_list.append([beg_seg, hss_pos[-1]])
return interval_list
# Módulo de testeo
if __name__ == '__main__':
print("Testeo de función en utils\n")
# Definición de la función a testear
test_func = 'signal_segmentation'
if test_func == 'signal_segmentation':
# Definición de la frecuencia de muestreo deseada para separación de fuentes
samplerate_des = 11025 # Hz
# Cargando el archivo de audio
db_folder = 'samples_test'
audio, samplerate = find_and_open_audio(db_folder)
# Parámetros del filtro pasa bajos a la salida de la red
lowpass_params = {'freq_pass': 140, 'freq_stop': 150}
# Definición del modelo a utilizar
model_name = 'definitive_segnet_based'
# Obteniendo la salida de la red
y_hat, y_hat_to, (y_out2, y_out3, y_out4) = \
hss_segmentation(audio, samplerate, model_name,
length_desired=len(audio),
lowpass_params=lowpass_params,
plot_outputs=False)
plt.figure(figsize=(9,5))
audio_data_plot = 0.5 * audio / max(abs(audio))
plt.plot(audio_data_plot + 0.5, label=r'$s(n)$',
color='silver', zorder=0)
plt.plot(y_hat_to[0,:,0], label=r'$S_0$', color='limegreen', zorder=2)
plt.plot(y_hat_to[0,:,1], label=r'$S_1$', color='red', zorder=1)
plt.plot(y_hat_to[0,:,2], label=r'$S_2$', color='blue', zorder=1)
plt.legend(loc='lower right')
plt.yticks([0, 0.5, 1])
plt.xlabel('Muestras')
plt.ylabel(r'$P(y(n) = k | X)$')
plt.title('Predicción de sonidos cardiacos')
plt.show()