"""
The functions to implement Dynamic Mixing for Speech Separation

Authors
    * Samuele Cornell 2021
    * Cem Subakan 2021
"""

import glob
import os
import random
import warnings

import numpy as np
import pyloudnorm
import torch
import torchaudio
from tqdm import tqdm

import speechbrain as sb
from speechbrain.dataio.batch import PaddedBatch


def build_spk_hashtable_librimix(hparams):
    """
    This function builds a dictionary of speaker-utterance pairs to be used in dynamic mixing
    """
    libri_utterances = glob.glob(
        os.path.join(hparams["base_folder_dm"], "**/*.wav"), recursive=True
    )

    spk_hashtable = {}

    # check the sample rate on just one file
    assert (
        torchaudio.info(libri_utterances[0]).sample_rate
        == hparams["sample_rate"]
    )

    for utt in tqdm(libri_utterances):
        path = os.path.normpath(utt)
        path_list = path.split(os.sep)
        spk_id = path_list[-3]
        # e.g. LibriSpeech/train-clean-100/441/128988/441-128988-0014.flac
        # the speaker id is 441 and the utterance id is 128988-0014
        if spk_id not in spk_hashtable.keys():
            spk_hashtable[spk_id] = [utt]
        else:
            spk_hashtable[spk_id].append(utt)

    # sampling weight for each speaker: the length of their utterance list
    spk_weights = [len(spk_hashtable[x]) for x in spk_hashtable.keys()]

    return spk_hashtable, spk_weights
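

# A minimal usage sketch (illustrative only: the path and the sample rate
# below are assumptions, not values taken from a recipe config):
#
#   hparams = {
#       "base_folder_dm": "/data/LibriSpeech/train-clean-100",
#       "sample_rate": 8000,
#   }
#   spk_hashtable, spk_weights = build_spk_hashtable_librimix(hparams)
#   # spk_hashtable maps each speaker id to the list of its wav files;
#   # spk_weights holds one utterance count per speaker, in the same order.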


def get_wham_noise_filenames(hparams):
    """This function lists the WHAM! noise files to be used in dynamic mixing"""
    if "Libri" in hparams["data_folder"]:
        # Data folder should point to Libri2Mix folder
        if hparams["sample_rate"] == 8000:
            noise_path = "wav8k/min/train-360/noise/"
        elif hparams["sample_rate"] == 16000:
            noise_path = "wav16k/min/train-360/noise/"
        else:
            raise ValueError("Unsupported Sampling Rate")
    else:
        if hparams["sample_rate"] == 8000:
            noise_path = "wav8k/min/tr/noise/"
        elif hparams["sample_rate"] == 16000:
            noise_path = "wav16k/min/tr/noise/"
        else:
            raise ValueError("Unsupported Sampling Rate")

    noise_files = glob.glob(
        os.path.join(hparams["data_folder"], noise_path, "*.wav")
    )
    return noise_files
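

# A minimal usage sketch (illustrative only: the path below is an assumption;
# with "Libri" in data_folder and an 8 kHz sample rate, the noise files are
# searched under wav8k/min/train-360/noise/):
#
#   hparams = {"data_folder": "/data/Libri2Mix", "sample_rate": 8000}
#   noise_files = get_wham_noise_filenames(hparams)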


def dynamic_mix_data_prep_librimix(hparams):
    """
    Dynamic mixing for LibriMix
    """

    # 1. Define datasets
    train_data = sb.dataio.dataset.DynamicItemDataset.from_csv(
        csv_path=hparams["train_data"],
        replacements={"data_root": hparams["data_folder"]},
    )

    # we build a dictionary where keys are speaker ids and entries are lists
    # of utterance files of that speaker
    print("Building the speaker hashtable for dynamic mixing")
    spk_hashtable, spk_weights = build_spk_hashtable_librimix(hparams)

    spk_list = [x for x in spk_hashtable.keys()]
    spk_weights = [x / sum(spk_weights) for x in spk_weights]

    if hparams["use_wham_noise"]:
        noise_files = get_wham_noise_filenames(hparams)
@sb.utils.data_pipeline.takes("mix_wav")
@sb.utils.data_pipeline.provides(
"mix_sig", "s1_sig", "s2_sig", "s3_sig", "noise_sig"
)
def audio_pipeline(
mix_wav,
): # this is dummy --> it means one epoch will be same as without dynamic mixing
"""
This audio pipeline defines the compute graph for dynamic mixing
"""
speakers = np.random.choice(
spk_list, hparams["num_spks"], replace=False, p=spk_weights
)
if hparams["use_wham_noise"]:
noise_file = np.random.choice(noise_files, 1, replace=False)
noise, fs_read = torchaudio.load(noise_file[0])
noise = noise.squeeze()
# select two speakers randomly
sources = []
spk_files = [
np.random.choice(spk_hashtable[spk], 1, False)[0]
for spk in speakers
]
minlen = min(
*[torchaudio.info(x).num_frames for x in spk_files],
hparams["training_signal_len"],
)
meter = pyloudnorm.Meter(hparams["sample_rate"])
MAX_AMP = 0.9
MIN_LOUDNESS = -33
MAX_LOUDNESS = -25

        def normalize(signal, is_noise=False):
            """
            This function normalizes the audio signals for loudness
            """
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                c_loudness = meter.integrated_loudness(signal)
                if is_noise:
                    target_loudness = random.uniform(
                        MIN_LOUDNESS - 5, MAX_LOUDNESS - 5
                    )
                else:
                    target_loudness = random.uniform(MIN_LOUDNESS, MAX_LOUDNESS)
                signal = pyloudnorm.normalize.loudness(
                    signal, c_loudness, target_loudness
                )

                # check for clipping
                if np.max(np.abs(signal)) >= 1:
                    signal = signal * MAX_AMP / np.max(np.abs(signal))
            return torch.from_numpy(signal)
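
        # Note on the constants above (an explanatory aside, not an original
        # comment): each source is rescaled to a target integrated loudness
        # drawn uniformly from [-33, -25] LUFS, while noise is drawn from
        # [-38, -30] LUFS, i.e. 5 LU quieter; for example, a source measured
        # at -20 LUFS that draws a -30 LUFS target is attenuated by 10 dB,
        # and any peak still at or beyond full scale is rescaled to MAX_AMP.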

        for i, spk_file in enumerate(spk_files):
            # select random offset
            length = torchaudio.info(spk_file).num_frames
            start = 0
            stop = length
            if length > minlen:  # take a random window
                start = np.random.randint(0, length - minlen)
                stop = start + minlen

            tmp, fs_read = torchaudio.load(
                spk_file,
                frame_offset=start,
                num_frames=stop - start,
            )

            tmp = tmp[0].numpy()
            tmp = normalize(tmp)
            sources.append(tmp)

        sources = torch.stack(sources)
        mixture = torch.sum(sources, 0)

        if hparams["use_wham_noise"]:
            len_noise = len(noise)
            len_mix = len(mixture)
            min_len = min(len_noise, len_mix)
            noise = normalize(noise.numpy(), is_noise=True)
            mixture = mixture[:min_len] + noise[:min_len]

        # check for clipping
        max_amp_insig = mixture.abs().max().item()
        if max_amp_insig > MAX_AMP:
            weight = MAX_AMP / max_amp_insig
        else:
            weight = 1

        sources = weight * sources
        mixture = weight * mixture

        yield mixture

        for i in range(hparams["num_spks"]):
            yield sources[i]

        # if the number of speakers is 2, yield None for the 3rd speaker
        if hparams["num_spks"] == 2:
            yield None

        if hparams["use_wham_noise"]:
            noise = noise * weight
            yield noise
        else:
            yield None
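
        # the yields above map, in order, onto the keys declared in @provides:
        # mix_sig, s1_sig, s2_sig, s3_sig, noise_sig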

    sb.dataio.dataset.add_dynamic_item([train_data], audio_pipeline)
    sb.dataio.dataset.set_output_keys(
        [train_data],
        ["id", "mix_sig", "s1_sig", "s2_sig", "s3_sig", "noise_sig"],
    )

    train_data = torch.utils.data.DataLoader(
        train_data,
        batch_size=hparams["dataloader_opts"]["batch_size"],
        num_workers=hparams["dataloader_opts"]["num_workers"],
        collate_fn=PaddedBatch,
        # re-seed numpy in each worker so the random mixes differ across workers
        worker_init_fn=lambda x: np.random.seed(
            int.from_bytes(os.urandom(4), "little") + x
        ),
    )
    return train_data
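

# A minimal end-to-end sketch (illustrative only: every hparams value below is
# an assumption standing in for a real recipe config, not a tested setup):
#
#   hparams = {
#       "train_data": "train.csv",
#       "data_folder": "/data/Libri2Mix",
#       "base_folder_dm": "/data/LibriSpeech/train-clean-100",
#       "sample_rate": 8000,
#       "num_spks": 2,
#       "training_signal_len": 32000,
#       "use_wham_noise": True,
#       "dataloader_opts": {"batch_size": 4, "num_workers": 2},
#   }
#   train_loader = dynamic_mix_data_prep_librimix(hparams)
#   for batch in train_loader:
#       mix, mix_lens = batch.mix_sig  # PaddedBatch yields (data, rel. lengths)
#       break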