-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy path__init__.py
More file actions
398 lines (346 loc) · 14.9 KB
/
Copy path__init__.py
File metadata and controls
398 lines (346 loc) · 14.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
import os
import math
import librosa
import numpy as np
from compiam.exceptions import ModelNotTrainedError
from compiam.utils.pitch import normalisation, resampling
from compiam.utils.download import download_remote_model
from compiam.melody.pitch_extraction.ftanet_carnatic.pitch_processing import (
batchize_test,
get_est_arr,
)
from compiam.melody.pitch_extraction.ftanet_carnatic.cfp import cfp_process
from compiam.io import write_csv
from compiam.utils import get_logger, WORKDIR
logger = get_logger(__name__)
class FTANetCarnatic(object):
"""FTA-Net melody extraction tuned to Carnatic Music."""
def __init__(
self,
model_path=None,
download_link=None,
download_checksum=None,
sample_rate=8000,
gpu="-1",
):
"""FTA-Net melody extraction init method.
:param model_path: path to file to the model weights.
:param download_link: link to the remote pre-trained model.
:param download_checksum: checksum of the model file.
:param sample_rate: Sample rate to which the audio is sampled for extraction.
:param gpu: Id of the available GPU to use (-1 by default, to run on CPU), use string: '0', '1', etc.
"""
### IMPORTING OPTIONAL DEPENDENCIES
try:
global tf
import tensorflow as tf
except:
raise ImportError(
"In order to use this tool you need to have tensorflow installed. "
"Install compIAM with tensorflow support: pip install 'compiam[tensorflow]'"
)
###
## Setting up GPU if specified
os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu)
self.gpu = gpu
self.model = self._build_model()
self.sample_rate = sample_rate
self.trained = False
self.model_path = model_path
self.download_link = download_link
self.download_checksum = download_checksum
if self.model_path is not None:
self.load_model(self.model_path)
@staticmethod
def SF_Module(x_list, n_channel, reduction, limitation):
"""Selection and fusion module.
Implementation taken from https://github.com/yushuai/FTANet-melodic
:param x_list: list of tensor inputs.
:param n_channel: number of feature channels.
:param reduction: the rate to which the data is compressed.
:param limitation: setting a compressing limit.
:returns: a tensor with the fused and selected feature map.
"""
## Split
fused = None
for x_s in x_list:
if fused == None:
fused = x_s
else:
fused = tf.keras.layers.Add()([fused, x_s])
## Fuse
fused = tf.keras.layers.GlobalAveragePooling2D()(fused)
fused = tf.keras.layers.BatchNormalization()(fused)
fused = tf.keras.layers.Dense(
max(n_channel // reduction, limitation), activation="selu"
)(fused)
## Select
masks = []
for i in range(len(x_list)):
masks.append(tf.keras.layers.Dense(n_channel)(fused))
mask_stack = tf.keras.layers.Lambda(
tf.keras.backend.stack, arguments={"axis": -1}
)(masks)
# (n_channel, n_kernel)
mask_stack = tf.keras.layers.Softmax(axis=-2)(mask_stack)
selected = None
for i, x_s in enumerate(x_list):
mask = tf.keras.layers.Lambda(lambda z: z[:, :, i])(mask_stack)
mask = tf.keras.layers.Reshape((1, 1, n_channel))(mask)
x_s = tf.keras.layers.Multiply()([x_s, mask])
if selected == None:
selected = x_s
else:
selected = tf.keras.layers.Add()([selected, x_s])
return selected
@staticmethod
def FTA_Module(x, shape, kt, kf):
"""Selection and fusion module.
Implementation taken from https://github.com/yushuai/FTANet-melodic
:param x: input tensor.
:param shape: the shape of the input tensor.
:param kt: kernel size for time attention.
:param kf: kernel size for frequency attention.
:returns: the resized input, the time-attention map,
and the frequency-attention map.
"""
x = tf.keras.layers.BatchNormalization()(x)
## Residual
x_r = tf.keras.layers.Conv2D(
shape[2], (1, 1), padding="same", activation="relu"
)(x)
## Time Attention
# Attn Map (1, T, C), FC
a_t = tf.keras.layers.Lambda(tf.keras.backend.mean, arguments={"axis": -3})(x)
a_t = tf.keras.layers.Conv1D(shape[2], kt, padding="same", activation="selu")(
a_t
)
a_t = tf.keras.layers.Conv1D(shape[2], kt, padding="same", activation="selu")(
a_t
) # 2
a_t = tf.keras.layers.Softmax(axis=-2)(a_t)
a_t = tf.keras.layers.Reshape((1, shape[1], shape[2]))(a_t)
# Reweight
x_t = tf.keras.layers.Conv2D(
shape[2], (3, 3), padding="same", activation="selu"
)(x)
x_t = tf.keras.layers.Conv2D(
shape[2], (5, 5), padding="same", activation="selu"
)(x_t)
x_t = tf.keras.layers.Multiply()([x_t, a_t])
# Frequency Attention
# Attn Map (F, 1, C), Conv1D
a_f = tf.keras.layers.Lambda(tf.keras.backend.mean, arguments={"axis": -2})(x)
a_f = tf.keras.layers.Conv1D(shape[2], kf, padding="same", activation="selu")(
a_f
)
a_f = tf.keras.layers.Conv1D(shape[2], kf, padding="same", activation="selu")(
a_f
)
a_f = tf.keras.layers.Softmax(axis=-2)(a_f)
a_f = tf.keras.layers.Reshape((shape[0], 1, shape[2]))(a_f)
# Reweight
x_f = tf.keras.layers.Conv2D(
shape[2], (3, 3), padding="same", activation="selu"
)(x)
x_f = tf.keras.layers.Conv2D(
shape[2], (5, 5), padding="same", activation="selu"
)(x_f)
x_f = tf.keras.layers.Multiply()([x_f, a_f])
return x_r, x_t, x_f
def _build_model(self, input_shape=(320, 128, 3)):
"""Building the entire FTA-Net.
Implementation taken from https://github.com/yushuai/FTANet-melodic
:param input_shape: input shape.
:returns: a tensorflow Model instance of the FTA-Net.
"""
visible = tf.keras.layers.Input(shape=input_shape)
x = tf.keras.layers.BatchNormalization()(visible)
## Bottom
# bm = BatchNormalization()(x)
bm = x
bm = tf.keras.layers.Conv2D(
16, (4, 1), padding="valid", strides=(4, 1), activation="selu"
)(
bm
) # 80
bm = tf.keras.layers.Conv2D(
16, (4, 1), padding="valid", strides=(4, 1), activation="selu"
)(
bm
) # 20
bm = tf.keras.layers.Conv2D(
16, (4, 1), padding="valid", strides=(4, 1), activation="selu"
)(
bm
) # 5
bm = tf.keras.layers.Conv2D(
1, (5, 1), padding="valid", strides=(5, 1), activation="selu"
)(
bm
) # 1
shape = input_shape
x_r, x_t, x_f = self.FTA_Module(x, (shape[0], shape[1], 32), 3, 3)
x = self.SF_Module([x_r, x_t, x_f], 32, 4, 4)
x = tf.keras.layers.MaxPooling2D((2, 2))(x)
x_r, x_t, x_f = self.FTA_Module(x, (shape[0] // 2, shape[1] // 2, 64), 3, 3)
x = self.SF_Module([x_r, x_t, x_f], 64, 4, 4)
x = tf.keras.layers.MaxPooling2D((2, 2))(x)
x_r, x_t, x_f = self.FTA_Module(x, (shape[0] // 4, shape[1] // 4, 128), 3, 3)
x = self.SF_Module([x_r, x_t, x_f], 128, 4, 4)
x_r, x_t, x_f = self.FTA_Module(x, (shape[0] // 4, shape[1] // 4, 128), 3, 3)
x = self.SF_Module([x_r, x_t, x_f], 128, 4, 4)
x = tf.keras.layers.UpSampling2D((2, 2))(x)
x_r, x_t, x_f = self.FTA_Module(x, (shape[0] // 2, shape[1] // 2, 64), 3, 3)
x = self.SF_Module([x_r, x_t, x_f], 64, 4, 4)
x = tf.keras.layers.UpSampling2D((2, 2))(x)
x_r, x_t, x_f = self.FTA_Module(x, (shape[0], shape[1], 32), 3, 3)
x = self.SF_Module([x_r, x_t, x_f], 32, 4, 4)
x_r, x_t, x_f = self.FTA_Module(x, (shape[0], shape[1], 1), 3, 3)
x = self.SF_Module([x_r, x_t, x_f], 1, 4, 4)
x = tf.keras.layers.Concatenate(axis=1)([bm, x])
# Softmax
x = tf.keras.layers.Lambda(tf.keras.backend.squeeze, arguments={"axis": -1})(x)
x = tf.keras.layers.Softmax(axis=-2)(x)
return tf.keras.models.Model(inputs=visible, outputs=x)
def load_model(self, model_path):
if ".data-00000-of-00001" not in model_path:
path_to_check = model_path + ".data-00000-of-00001"
if not os.path.exists(path_to_check):
self.download_model(model_path) # Downloading model weights
self.model.load_weights(model_path).expect_partial()
self.model_path = model_path
self.trained = True
def download_model(self, model_path=None, force_overwrite=False):
"""Download pre-trained model."""
download_path = (
os.sep + os.path.join(*model_path.split(os.sep)[:-2])
if model_path is not None
else os.path.join(WORKDIR, "models", "melody", "ftanet-carnatic")
)
# Creating model folder to store the weights
if not os.path.exists(download_path):
os.makedirs(download_path)
download_remote_model(
self.download_link,
self.download_checksum,
download_path,
force_overwrite=force_overwrite,
)
def predict(
self,
input_data,
input_sr=44100,
hop_size=80,
batch_size=5,
out_step=None,
amplify_input=1.0,
gpu="-1",
):
"""Extract melody from input_data.
Implementation taken (and slightly adapted) from https://github.com/yushuai/FTANet-melodic.
:param input_data: path to audio file or numpy array like audio signal.
:param input_sr: sampling rate of the input array of data (if any). This variable is only
relevant if the input is an array of data instead of a filepath.
:param hop_size: hop size between frequency estimations.
:param batch_size: batches of seconds that are passed through the model
(defaulted to 5, increase if enough computational power, reduce if
needed).
:param out_step: particular time-step duration if needed at output
:param amplify_input: for low volume inputs, we've found that overlouding it may provide better voicing detection (e.g. x10, x50)
:param gpu: Id of the available GPU to use (-1 by default, to run on CPU), use string: '0', '1', etc.
:returns: a 2-D list with time-stamps and pitch values per timestamp.
"""
## Setting up GPU if any
if gpu != self.gpu:
os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu)
self.gpu = gpu
if self.trained is False:
raise ModelNotTrainedError(
"""Model is not trained. Please load model before running inference!
You can load the pre-trained instance with the load_model wrapper."""
)
# Loading and resampling audio
if isinstance(input_data, str):
if not os.path.exists(input_data):
raise FileNotFoundError("Target audio not found.")
audio, _ = librosa.load(input_data, sr=self.sample_rate)
elif isinstance(input_data, np.ndarray):
logger.warning(
f"Resampling... (input sampling rate is assumed {input_sr}Hz, \
make sure this is correct and change input_sr otherwise)"
)
audio = librosa.resample(
input_data, orig_sr=input_sr, target_sr=self.sample_rate
)
else:
raise ValueError("Input must be path to audio signal or an audio array")
audio_shape = audio.shape
if len(audio_shape) > 1:
audio_channels = min(audio_shape)
if audio_channels == 1:
audio = audio.flatten()
else:
audio = np.mean(audio, axis=np.argmin(audio_shape))
xlist = []
timestamps = []
# Applying loudness scaling
audio = audio / audio.max()
audio = audio * amplify_input
audio_len = len(audio)
batch_min = self.sample_rate * 60 * batch_size
freqs = []
if audio_len > batch_min:
iters = math.ceil(audio_len / batch_min)
for i in np.arange(iters):
if i < iters - 1:
audio_in = audio[batch_min * i : batch_min * (i + 1)]
if i == iters - 1:
audio_in = audio[batch_min * i :]
feature, _, time_arr = cfp_process(
audio_in, sr=self.sample_rate, hop=hop_size
)
data = batchize_test(feature, size=128)
xlist.append(data)
timestamps.append(time_arr)
estimation = get_est_arr(self.model, xlist, timestamps, batch_size=16)
if i == 0:
freqs = estimation[:, 1]
else:
freqs = np.concatenate((freqs, estimation[:, 1]))
else:
feature, _, time_arr = cfp_process(audio, sr=self.sample_rate, hop=hop_size)
data = batchize_test(feature, size=128)
xlist.append(data)
timestamps.append(time_arr)
# Getting estimatted pitch
estimation = get_est_arr(self.model, xlist, timestamps, batch_size=16)
freqs = estimation[:, 1]
TStamps = np.linspace(0, audio_len / self.sample_rate, len(freqs))
freqs[freqs < 50] = 0
output = np.array([TStamps, freqs]).transpose()
if out_step is not None:
new_len = int((audio_len / self.sample_rate) // out_step)
return resampling(output, new_len)
return output
@staticmethod
def normalise_pitch(pitch, tonic, bins_per_octave=120, max_value=4):
"""Normalise pitch given a tonic.
:param pitch: a 2-D list with time-stamps and pitch values per timestamp.
:param tonic: recording tonic to normalize the pitch to.
:param bins_per_octave: number of frequency bins per octave.
:param max_value: maximum value to clip the normalized pitch to.
:returns: a 2-D list with time-stamps and normalised to a given tonic
pitch values per timestamp.
"""
return normalisation(
pitch, tonic, bins_per_octave=bins_per_octave, max_value=max_value
)
@staticmethod
def save_pitch(data, output_path):
"""Calling the write_csv function in compiam.io to write the output pitch curve in a fle
:param data: the data to write
:param output_path: the path where the data is going to be stored
:returns: None
"""
return write_csv(data, output_path)