-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtranscribesegment
More file actions
213 lines (173 loc) · 7.17 KB
/
transcribesegment
File metadata and controls
213 lines (173 loc) · 7.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
#!/usr/bin/env python3
"""
Fix a specific transcription segment using faster-whisper.
Interactive flow:
1) Lists media files and lets you pick one
2) Asks for start/end times (HH:MM:SS,mmm)
3) Optional padding (pre/post seconds) to avoid cutting words at boundaries
4) Optional light denoise via FFmpeg (afftdn + high/low-pass + dynaudnorm)
5) Re-transcribes the extracted slice with beam_size=10, VAD, word timestamps
6) Writes an .srt with ABSOLUTE timestamps ready to merge
Requires:
- ffmpeg in PATH
- faster-whisper installed
- CUDA GPU recommended
"""
import os
import subprocess
import tempfile
import time
from faster_whisper import WhisperModel
# ---------- Helpers ----------
def list_media_files():
    """Print a numbered listing of media files in the current directory and return them."""
    allowed = ('.mkv', '.mp4', '.mp3', '.wav', '.m4a', '.aac', '.flac', '.mov')
    found = [name for name in sorted(os.listdir()) if name.lower().endswith(allowed)]
    for number, name in enumerate(found, start=1):
        print(f"{number}: {name}")
    return found
def pick_file(files):
    """Prompt until the user enters a valid 1-based index into *files*; return that entry."""
    while True:
        choice = input("\nEnter the number of the file to fix: ").strip()
        if choice.isdigit():
            index = int(choice)
            if 1 <= index <= len(files):
                return files[index - 1]
        print("β Invalid selection. Try again.")
def parse_srt_time(t):
    """Convert 'HH:MM:SS,mmm' -> seconds (float)."""
    clock, millis = t.split(",")
    hours, minutes, secs = (int(part) for part in clock.split(":"))
    return hours * 3600 + minutes * 60 + secs + int(millis) / 1000.0
def format_time(seconds):
    """Convert float seconds -> 'HH:MM:SS,mmm' (clamped at 0).

    Works in integer milliseconds with round() rather than truncating the
    fractional part: int((0.95) * 1000) on the nearest-float representation
    of .95 yields 949, so the original truncation formatted 933.95 s as
    ",949" instead of ",950". Rounding also carries over correctly
    (59.9996 s -> "00:01:00,000").
    """
    if seconds < 0:
        seconds = 0.0
    # Round once to whole milliseconds, then split with exact integer math.
    total_ms = round(seconds * 1000)
    hours, rem_ms = divmod(total_ms, 3_600_000)
    minutes, rem_ms = divmod(rem_ms, 60_000)
    secs, ms = divmod(rem_ms, 1000)
    return f"{hours:02}:{minutes:02}:{secs:02},{ms:03}"
def get_media_duration(input_file):
    """Return media duration (seconds) using ffprobe, or None if probing fails."""
    probe_commands = (
        # First try the first audio stream's container duration...
        ["ffprobe", "-v", "error", "-select_streams", "a:0",
         "-show_entries", "format=duration", "-of", "default=nw=1:nk=1", input_file],
        # ...then fall back to probing the container as a whole.
        ["ffprobe", "-v", "error",
         "-show_entries", "format=duration", "-of", "default=nw=1:nk=1", input_file],
    )
    for cmd in probe_commands:
        try:
            raw = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
            return float(raw.decode().strip())
        except Exception:
            continue
    return None
def extract_audio_segment(input_file, start, end, pad_seconds=0.0, denoise=False):
    """
    Use ffmpeg to extract the requested audio segment, with optional pre/post
    padding and denoise.

    Args:
        input_file: path to the source media file.
        start, end: segment bounds in seconds (absolute within the media).
        pad_seconds: context added before/after the segment; the start is
            clamped at 0 and the end at the media duration when known.
        denoise: when True, apply a light FFmpeg filter chain.

    Returns:
        (temp_wav_path, start_padded): path to a 16 kHz mono PCM WAV and the
        absolute start time (seconds) of the extracted slice — callers need
        it to convert the slice's relative timestamps back to absolute ones.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits non-zero.
    """
    media_dur = get_media_duration(input_file)
    start_padded = max(0.0, start - pad_seconds)
    end_padded = end + pad_seconds
    if media_dur is not None:
        end_padded = min(end_padded, media_dur)  # never read past the end
    duration = max(0.01, end_padded - start_padded)
    # mkstemp + close instead of NamedTemporaryFile(delete=False): the
    # original never closed the handle (fd leak), and Windows cannot let
    # ffmpeg reopen a file that is still held open.
    fd, temp_wav = tempfile.mkstemp(suffix=".wav")
    os.close(fd)
    # Light, model-free denoise chain (safe defaults):
    # - afftdn: frequency-domain noise reduction
    # - highpass/lowpass: remove rumble/hiss
    # - dynaudnorm: tame dynamics, lift quieter speech a bit
    af_chain = None
    if denoise:
        af_chain = "afftdn=nf=-25,highpass=f=80,lowpass=f=12000,dynaudnorm"
    cmd = [
        "ffmpeg", "-y",
        # -loglevel must precede the output file; the original placed it after
        # the output path, where ffmpeg treats it as a trailing (ignored) option.
        "-loglevel", "error",
        "-ss", f"{start_padded:.3f}",
        "-t", f"{duration:.3f}",
        "-i", input_file,
        "-vn", "-ac", "1", "-ar", "16000",
        "-acodec", "pcm_s16le",
    ]
    if af_chain:
        cmd += ["-af", af_chain]
    cmd += [temp_wav]
    subprocess.run(cmd, check=True)
    return temp_wav, start_padded
# ---------- Main ----------
def main():
    """Interactive flow: pick a media file and a time range, re-transcribe that slice,
    and write an .srt with absolute timestamps next to the source file.
    """
    print("\nπ― Fix transcription segment (GPU Whisper)")
    print("ββββββββββββββββββββββββββββββββββββββββ")
    files = list_media_files()
    if not files:
        print("No media files found here.")
        return
    input_file = pick_file(files)
    print("\nEnter the segment you want to re-transcribe (use exact SRT format HH:MM:SS,mmm):")
    start_str = input("Start time (e.g. 00:15:33,950): ").strip()
    end_str = input("End time (e.g. 00:20:33,950): ").strip()
    # Optional padding & denoise
    pad_in = input("Add context padding (seconds) [default 5]: ").strip()
    pad_seconds = 5.0 if pad_in == "" else max(0.0, float(pad_in))
    denoise_flag = input("Apply light denoise? (y/N): ").strip().lower().startswith("y")
    start_s = parse_srt_time(start_str)
    end_s = parse_srt_time(end_str)
    if end_s <= start_s:
        print("β End time must be greater than start time.")
        return
    print(f"\nπ§ Re-transcribing: {input_file}")
    print(f"π Segment: {start_str} β {end_str} (Β±{pad_seconds:.1f}s padding)")
    if denoise_flag:
        print("π Denoise: ON (afftdn + high/low-pass + dynaudnorm)")
    else:
        print("π Denoise: OFF")
    # --- Extract audio ---
    print("π Extracting audio slice...")
    segment_path, start_effective = extract_audio_segment(
        input_file, start_s, end_s, pad_seconds=pad_seconds, denoise=denoise_flag
    )
    # --- Load model --- (turbo preferred; fall back to plain large-v3)
    print("\nπ Loading Whisper model (large-v3-turbo)β¦")
    try:
        model = WhisperModel("large-v3-turbo", device="cuda", compute_type="float16")
    except Exception:
        print("β οΈ large-v3-turbo not found, using large-v3")
        model = WhisperModel("large-v3", device="cuda", compute_type="float16")
    # --- Choose language ---
    lang_input = input("\nAuto-detect language or force one? (e.g. en, es, fr) [Enter=auto]: ").strip().lower()
    language = lang_input if lang_input else None
    print("\nπ Transcribing segment with beam_size=10, VAD, word timestampsβ¦\n")
    start_time = time.time()
    segments, info = model.transcribe(
        segment_path,
        beam_size=10,
        vad_filter=True,
        word_timestamps=True,
        task="transcribe",
        language=language
    )
    detected = info.language if hasattr(info, "language") else "?"
    print(f"π Detected language: {detected.upper()}")
    base = os.path.splitext(input_file)[0]
    # Tag the output file with the requested start time so merges are traceable.
    tag = f"FIX_{start_str.replace(':','-').replace(',','_')}"
    srt_path = f"{base}.{tag}.srt"
    with open(srt_path, "w", encoding="utf-8") as srt_file:
        for i, seg in enumerate(segments, start=1):
            # Shift slice-relative timestamps back to absolute media time.
            abs_start = start_effective + seg.start
            abs_end = start_effective + seg.end
            start_time_str = format_time(abs_start)
            end_time_str = format_time(abs_end)
            text = seg.text.strip()
            print(f"[{start_time_str} - {end_time_str}] {text}")
            srt_file.write(f"{i}\n{start_time_str} --> {end_time_str}\n{text}\n\n")
    elapsed = time.time() - start_time
    # NOTE(review): the original had this literal broken across two source
    # lines (a syntax error, likely a lost emoji in a copy/paste) — rebuilt
    # as a single f-string.
    print(f"\nβ Fixed subtitles saved to: {srt_path}")
    print(f"π Transcription completed in {elapsed:.1f} seconds\n")
    # Best-effort cleanup of the temp WAV; ignore failures.
    try:
        os.remove(segment_path)
    except Exception:
        pass
# Run the interactive flow only when executed as a script, not on import.
if __name__ == "__main__":
    main()