-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpyPitchCom.py
193 lines (160 loc) · 6.04 KB
/
pyPitchCom.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
#!/usr/bin/python2
import sys
import wave
from math import sqrt
import numpy as np
def ReadWaveFile(filename):
    """
    Open the wave file specified by filename and return its data.

    Arguments:
        filename:
            the wave file to read, passed unchanged to wave.open.
    Return:
        (waveInfo, wave_data)
        waveInfo is a dict which contains the following items:
            "nchannels", "framerate", "nframes", "samplewidth"
        wave_data is a 2-dimension numpy array of np.float16, scaled by
        1/32768 so that samples lie in [-1, 1]. The n-th channel's data
        can be accessed by wave_data[:, n].
    Warn: This function assumes that the wave file stores its samples
        as 16-bit integers (short); "samplewidth" is reported but not
        checked.
    """
    f = wave.open(filename, 'rb')
    try:
        waveInfo = {
            "nchannels": f.getnchannels(),
            "framerate": f.getframerate(),
            "nframes": f.getnframes(),
            "samplewidth": f.getsampwidth(),
        }
        raw_frames = f.readframes(waveInfo["nframes"])
    finally:
        # Always release the file handle, even if reading fails.
        f.close()

    # WAV PCM samples are little-endian 16-bit integers. np.frombuffer
    # replaces the deprecated binary mode of np.fromstring; astype()
    # below makes a writable copy of the read-only buffer view.
    samples = np.frombuffer(raw_frames, dtype=np.dtype("<i2"))
    wave_data = samples.astype(np.float16)
    wave_data /= 32768.0
    # Frames are interleaved, so each row holds one sample per channel.
    wave_data.shape = -1, waveInfo["nchannels"]
    return waveInfo, wave_data
def EnergyNorm(dataMatrix):
    """
    Normalize the energy of signals in dataMatrix.

    Each sequence is cut into overlapping Hanning-windowed frames
    (400 samples long, shifted by 100 samples). Every frame is divided
    by its own energy (the square root of the sum of squares of the
    windowed samples), windowed a second time and overlap-added into
    the result.

    Arguments:
        dataMatrix:
            this is a numpy array which contains several sequences,
            dataMatrix[:, n] is the n-th sequence. The frame positions
            are computed with an offset of D = 2048 samples at both
            ends of the sequences.
    Return:
        newDataMatrix:
            a 2-dimension np.float16 numpy array with the same shape as
            dataMatrix after energy normalization. Samples not covered
            by any frame stay zero.
    """
    nSeq = dataMatrix.shape[1]
    nLength = dataMatrix.shape[0]

    FRAME_LEN = 400
    FRAME_SHIFT = 100
    D = 2048
    WINDOW = np.hanning(FRAME_LEN)

    # Floor division keeps the frame count and the slice offsets
    # integral on both Python 2 and Python 3.
    nFrame = (nLength - 2 * D) // FRAME_SHIFT + 1
    newDataMatrix = np.zeros_like(dataMatrix, np.float16)

    for nthData in range(nSeq):
        for i in range(nFrame):
            framePos = i * FRAME_SHIFT - FRAME_LEN // 2 + D
            frameEnd = framePos + FRAME_LEN
            windowedWaveBuffer = dataMatrix[framePos:frameEnd, nthData] * WINDOW
            frameEnergy = sqrt(np.sum(windowedWaveBuffer * windowedWaveBuffer))
            if frameEnergy == 0:
                # A silent frame has nothing to normalize; dividing by
                # its zero energy would only write NaN into the output.
                continue
            newDataMatrix[framePos:frameEnd, nthData] += (
                windowedWaveBuffer * WINDOW / frameEnergy)
    return newDataMatrix
#@profile
def AutoCorr(dataMatrix):
    """
    Compute auto-correlation function of one signal in dataMatrix

    Every frame (400 samples, shifted by 160 samples) is multiplied by
    a Hanning window and correlated with the un-windowed signal
    shifted by delta samples, for delta in 0 .. CORR_N - 1.

    Arguments:
        dataMatrix:
            this is a numpy array which contains several sequences,
            dataMatrix[:, n] is the n-th sequence. The frame positions
            are computed with an offset of D = 2048 samples at both
            ends of the sequences.
    Return:
        (corr, score):
            corr is the result directly computed by the auto-correlation
            function for each delta (0 to CORR_N - 1). It is a
            2-dimension np.float16 numpy array, of which the 1st
            dimension refers to each frame, and the 2nd refers to the
            value of delta. It has zero rows when the input is too
            short to hold a frame.
            score is currently the very same array object as corr: no
            weighting coefficients are applied.
    Note: This function can only process one signal in input dataMatrix.
        If several sequences are given, each one overwrites the result
        of the previous one, so the returned values belong to the last
        sequence.
    """
    FRAME_SHIFT = 160
    FRAME_LEN = 400
    CORR_N = 300
    D = 2048

    nSeq = dataMatrix.shape[1]
    nLength = dataMatrix.shape[0]
    WINDOW = np.hanning(FRAME_LEN)

    # Floor division keeps the frame count integral on Python 2 and 3;
    # clamp to zero so a too-short input yields an empty result.
    nFrame = max((nLength - 2 * D) // FRAME_SHIFT + 1, 0)
    corr = np.zeros((nFrame, CORR_N), np.float16)

    for nthData in range(nSeq):
        signal = dataMatrix[:, nthData]
        for i in range(nFrame):
            # NOTE(review): the (i - 1) offset places frame 0 one shift
            # earlier than EnergyNorm does; kept as in the original.
            framePos = (i - 1) * FRAME_SHIFT - FRAME_LEN // 2 + D
            windowedFrameBuffer = signal[framePos: framePos + FRAME_LEN] * WINDOW
            for delta in range(CORR_N):
                lagPos = framePos + delta
                # Correlate against the same sequence (column nthData),
                # not against all columns flattened together.
                corr[i, delta] = windowedFrameBuffer.dot(
                    signal[lagPos: lagPos + FRAME_LEN])

    # Use no coeffs: earlier variants weighted positive correlation
    # values by (2 * CORR_N - delta) / CORR_N for delta >= 16; that
    # weighting is disabled, so the score is the raw correlation.
    score = corr
    return (corr, score)
def ZeroAdding(D, dataMatrix):
    """
    Add zeros to data.

    Arguments:
        D:
            number of zero rows added before the beginning and again
            after the ending of the data sequences, so every sequence
            grows by 2*D samples in total.
        dataMatrix:
            this is a numpy array which contains several sequences,
            dataMatrix[:, n] is the n-th sequence.
    Return:
        newDataMatrix:
            also a numpy array after adding zeros to dataMatrix; its
            shape is (dataMatrix.shape[0] + 2*D, dataMatrix.shape[1]).
            dataMatrix itself is not modified.
    """
    nSeq = dataMatrix.shape[1]
    # The same zero block is used for both ends; np.concatenate copies
    # it into the result, so sharing it is safe.
    data2Add = np.zeros((D, nSeq), np.float16)
    return np.concatenate((data2Add, dataMatrix, data2Add), axis=0)
def RandAdding(D, dataMatrix):
    """
    Add random values to data.

    Arguments:
        D:
            number of random rows added before the beginning and again
            after the ending of the data sequences, so every sequence
            grows by 2*D samples in total. The same random block is
            used at both ends.
        dataMatrix:
            this is a numpy array which contains several sequences,
            dataMatrix[:, n] is the n-th sequence. It is left
            unmodified.
    Return:
        newDataMatrix:
            also a numpy array after adding random values to
            dataMatrix. The original samples additionally carry a small
            uniform noise in [-0.00005, 0.00005).
    """
    nSeq = dataMatrix.shape[1]
    nLength = dataMatrix.shape[0]
    # random value's range is [-1, 1), a uniform distribution.
    data2Add = np.random.rand(D, nSeq) * 2 - 1
    # Build a new array instead of "+=" so the caller's data is not
    # modified behind its back.
    noise = (np.random.rand(nLength, nSeq) - 0.5) * 0.0001
    noisyData = dataMatrix + noise
    return np.concatenate((data2Add, noisyData, data2Add), axis=0)
if __name__ == "__main__":
    # Demo: read the wave file named on the command line and display
    # the auto-correlation of every frame as a colour mesh.
    import pylab as pl
    from pprint import pprint

    if len(sys.argv) < 2:
        # Fail with a readable message instead of an IndexError.
        sys.exit("usage: %s <wave file>" % sys.argv[0])

    waveInfo, wave_data = ReadWaveFile(sys.argv[1])
    pprint(waveInfo)

    # Processing chain: pad with 2048 random samples per side,
    # normalize the frame energy, then compute the auto-correlation.
    sig0 = RandAdding(2048, wave_data)
    sig1 = EnergyNorm(sig0)
    corr, score = AutoCorr(sig1)

    # Grid coordinates: frame index on x, delta axis numbered from 1 on
    # y. The extent follows the shape of score rather than a hard-coded
    # 301.
    x, y = np.mgrid[:score.shape[0], 1:score.shape[1] + 1]
    fig, ax = pl.subplots()
    mesh = ax.pcolormesh(x, y, score)
    pl.colorbar(mesh)
    pl.xlabel("Time(nframe)")
    pl.ylabel("Delta")
    pl.title("ACF-gram")
    pl.show()