33"""
44Сервис аннотации аудио для CallAnnotate
55
6+ Добавлено:
7+ - Дополнительная финальная нормализация сегментов после сборки (после маппинга слов и подсчёта текста):
8+ удаление сверхкоротких артефактов и слияние соседних сегментов одного спикера
9+ (порог min_segment_duration берём из конфига, merge_gap=0.05s).
10+ - Это гарантирует отсутствие микро-сегментов даже если они каким-то образом пройдут через предыдущие этапы.
11+
612Автор: akoodoy@capilot.ru
713Ссылка: https://github.com/momentics/CallAnnotate
814Лицензия: Apache-2.0
1319import shutil
1420from pathlib import Path
1521from datetime import datetime , timezone
16- from typing import Awaitable , Dict , Any , Callable , List , Optional
22+ from typing import Awaitable , Dict , Any , Callable , List , Optional , Tuple
1723
1824from .config import AppSettings
1925from .stages import PreprocessingStage , DiarizationStage , TranscriptionStage , RecognitionStage
@@ -36,24 +42,15 @@ def __init__(self, config: AppSettings):
3642
3743 self .logger = logging .getLogger (__name__ )
3844 self .stages = [
39- # дефектный препроцессинг. Оставлено на потом.
40- # PreprocessingStage(self.config, self.config.preprocess.dict(), models_registry),
45+ # PreprocessingStage(self.config, self.config.preprocess.dict(), models_registry),
4146 DiarizationStage (self .config , self .config .diarization .dict (), models_registry ),
4247 TranscriptionStage (self .config , self .config .transcription .dict (), models_registry ),
4348 RecognitionStage (self .config , self .config .recognition .dict (), models_registry ),
4449 CardDAVStage (self .config , self .config .carddav .dict (), models_registry )
4550 ]
4651 self .logger .info ("AnnotationService инициализирован с архитектурой этапов" )
47-
48- # DEBUG: отключаем все этапы кроме диаризации
49- # self.stages = [stage for stage in self.stages if isinstance(stage, DiarizationStage)
50- # or isinstance(stage, TranscriptionStage)
51- # #or isinstance(stage, PreprocessingStage)
52- # ]
53-
5452 self .logger .info (f"AnnotationService инициализирован len(stages)={ len (self .stages )} " )
5553
56-
5754 async def process_audio (
5855 self ,
5956 file_path : str ,
@@ -129,31 +126,136 @@ async def _update_progress(
129126 await result
130127 self .logger .info (f"Прогресс: { progress :3d} % – { message } " )
131128
    # ----------------------------- HELPER METHODS -----------------------------
130+ def _overlap_ratio (self , a_start : float , a_end : float , b_start : float , b_end : float ) -> float :
131+ """
132+ Возвращает долю перекрытия от длины меньшего интервала.
133+ Используется для устойчивой оценки соответствия.
134+ """
135+ inter_start = max (a_start , b_start )
136+ inter_end = min (a_end , b_end )
137+ inter = max (0.0 , inter_end - inter_start )
138+ denom = max (1e-9 , min (a_end - a_start , b_end - b_start ))
139+ return inter / denom
140+
141+ def _assign_speakers_to_transcription (
142+ self ,
143+ diar_segments : List [Dict [str , Any ]],
144+ diar_label_to_id : Dict [str , str ],
145+ trans_words : List [Dict [str , Any ]],
146+ trans_segments : List [Dict [str , Any ]],
147+ min_overlap : float
148+ ) -> Tuple [List [Dict [str , Any ]], List [Dict [str , Any ]]]:
149+ """
150+ Присваивает спикеров словам и сегментам транскрипции на основании диаризации.
151+ """
152+ for w in trans_words :
153+ ws = float (w .get ("start" , 0.0 ))
154+ we = float (w .get ("end" , ws ))
155+ best_ratio = 0.0
156+ best_label = None
157+ for d in diar_segments :
158+ ratio = self ._overlap_ratio (ws , we , d ["start" ], d ["end" ])
159+ if ratio > best_ratio :
160+ best_ratio = ratio
161+ best_label = d ["speaker_label" ] if "speaker_label" in d else d ["speaker" ]
162+ if best_label and best_ratio >= min_overlap :
163+ w ["speaker" ] = diar_label_to_id .get (best_label , diar_label_to_id .get (str (best_label ), "unknown" ))
164+
165+ for s in trans_segments :
166+ ss = float (s .get ("start" , 0.0 ))
167+ se = float (s .get ("end" , ss ))
168+ best_ratio = 0.0
169+ best_label = None
170+ for d in diar_segments :
171+ ratio = self ._overlap_ratio (ss , se , d ["start" ], d ["end" ])
172+ if ratio > best_ratio :
173+ best_ratio = ratio
174+ best_label = d ["speaker_label" ] if "speaker_label" in d else d ["speaker" ]
175+ if best_label and best_ratio >= min_overlap :
176+ s ["speaker" ] = diar_label_to_id .get (best_label , diar_label_to_id .get (str (best_label ), "unknown" ))
177+ s ["speaker_confidence" ] = round (best_ratio , 3 )
178+
179+ return trans_words , trans_segments
180+
181+ def _merge_and_filter_final_segments (
182+ self ,
183+ segments : List [FinalSegment ],
184+ speakers_map : Dict [str , FinalSpeaker ],
185+ min_duration : float ,
186+ merge_gap : float = 0.05
187+ ) -> List [FinalSegment ]:
188+ """
189+ Финальная нормализация итоговых сегментов:
190+ - Отбрасываем сверхкороткие (duration < min_duration).
191+ - Сливаем соседние сегменты одного и того же speaker, если они стыкуются внутри merge_gap.
192+ Обновляем текст и слова при слиянии.
193+ """
194+ # Фильтр по длительности
195+ seq = [s for s in segments if float (s .duration ) >= float (min_duration )]
196+ if not seq :
197+ return []
198+
199+ # Сортировка по времени
200+ seq .sort (key = lambda s : (s .speaker , s .start ))
201+ merged : List [FinalSegment ] = []
202+ cur = seq [0 ]
203+
204+ for s in seq [1 :]:
205+ same = (s .speaker == cur .speaker )
206+ gap = float (s .start ) - float (cur .end )
207+ if same and gap >= - 1e-6 and gap <= merge_gap :
208+ # Слияние: конкатенируем текст и слова, обновляем границы
209+ cur .end = max (cur .end , s .end )
210+ cur .duration = round (float (cur .end ) - float (cur .start ), 3 )
211+ # Конкатенируем слова по времени
212+ words = list (cur .words ) + list (s .words )
213+ words .sort (key = lambda w : w .start )
214+ cur .words = words
215+ # Текст пересобираем из слов (надёжнее)
216+ cur .text = " " .join ((w .word or "" ).strip () for w in cur .words ).strip ()
217+ # Уверенность как среднее по словам (если нужны уточнения, можно изменить)
218+ if cur .words :
219+ cur .confidence = round (sum (w .probability for w in cur .words ) / max (1 , len (cur .words )), 3 )
220+ else :
221+ merged .append (cur )
222+ cur = s
223+ merged .append (cur )
224+
225+ # Возвращаем к порядку по времени (независимо от спикера)
226+ merged .sort (key = lambda s : s .start )
227+ return merged
228+
132229 def _build_final_annotation (
133230 self ,
134231 task_id : str ,
135232 audio_metadata : AudioMetadata ,
136233 context : Dict [str , Any ]
137234 ) -> AnnotationResult :
138- """
139- Собирает финальный результат аннотации.
140- Соответствует структуре docs/callannotate-schema.json.
141- """
142235 diar_result = context .get ("diarization" )
143236 trans_result = context .get ("transcription" )
144237 recog_result = context .get ("recognition" )
145238 carddav_result = context .get ("carddav" )
146239
147- # Собираем processing_info согласно схеме
148240 diar_cfg = self .config .diarization
149241 trans_cfg = self .config .transcription
150242 recog_cfg = self .config .recognition
151243
244+ diar_model_name = None
245+ diar_framework = None
246+ if diar_result and hasattr (diar_result , "model_info" ):
247+ diar_model_name = diar_result .model_info .get ("model_name" )
248+ diar_framework = diar_result .model_info .get ("framework" )
249+ if not diar_model_name :
250+ diar_model_name = diar_cfg .model
251+ if not diar_framework :
252+ diar_framework = "pyannote.audio"
253+
152254 diar_model_info = {
153255 "stage" : "diarization" ,
154- "model_name" : diar_result . model_info . get ( "model_name" ) if diar_result else None ,
256+ "model_name" : diar_model_name ,
155257 "device" : diar_cfg .device ,
156- "framework" : diar_result . model_info . get ( "framework" ) if diar_result else None
258+ "framework" : diar_framework
157259 }
158260
159261 trans_model_info = {
@@ -168,7 +270,7 @@ def _build_final_annotation(
168270 "model_name" : recog_cfg .model ,
169271 "device" : recog_cfg .device ,
170272 "threshold" : recog_cfg .threshold ,
171- "database_size" : len (context .get ("recognition" ).payload .get ("speakers" , {})), # type: ignore
273+ "database_size" : len (context .get ("recognition" ).payload .get ("speakers" , {})) if recog_result else 0 , # type: ignore
172274 "framework" : "SpeechBrain + FAISS"
173275 }
174276
@@ -191,15 +293,15 @@ def _build_final_annotation(
191293 speakers_map : Dict [str , FinalSpeaker ] = {}
192294 idx_counter = 1
193295 for seg in raw_diar :
194- label = seg ["speaker" ]
296+ label = seg . get ( "speaker_label" ) or seg ["speaker" ]
195297 if label not in speakers_map :
196298 spk_id = f"speaker_{ idx_counter :02d} "
197299 idx_counter += 1
198300 rec_info = raw_recog .get (label , {})
199301 init_conf = float (rec_info .get ("confidence" , 0.0 ))
200302 speakers_map [label ] = FinalSpeaker (
201303 id = spk_id ,
202- label = label ,
304+ label = str ( label ) ,
203305 segments_count = 0 ,
204306 total_duration = 0.0 ,
205307 identified = bool (rec_info .get ("identified" , False )),
@@ -209,68 +311,130 @@ def _build_final_annotation(
209311 confidence = round (init_conf , 3 )
210312 )
211313
314+ diar_label_to_id : Dict [str , str ] = {lbl : fs .id for lbl , fs in speakers_map .items ()}
315+
212316 segments : List [FinalSegment ] = []
213317 full_text_parts : List [str ] = []
214318 total_words = 0
215319 total_speech = 0.0
216320
217- words_all = raw_trans .get ("words" , [])
321+ words_all : List [Dict [str , Any ]] = list (raw_trans .get ("words" , []))
322+ trans_segments_raw : List [Dict [str , Any ]] = list (raw_trans .get ("segments" , []))
323+
324+ diar_for_map : List [Dict [str , Any ]] = []
325+ for d in raw_diar :
326+ diar_for_map .append ({
327+ "start" : float (d ["start" ]),
328+ "end" : float (d ["end" ]),
329+ "speaker_label" : d .get ("speaker_label" ) or d ["speaker" ]
330+ })
331+
332+ min_overlap = max (0.0 , min (1.0 , float (self .config .transcription .min_overlap )))
333+
334+ words_all , trans_segments_raw = self ._assign_speakers_to_transcription (
335+ diar_segments = diar_for_map ,
336+ diar_label_to_id = diar_label_to_id ,
337+ trans_words = words_all ,
338+ trans_segments = trans_segments_raw ,
339+ min_overlap = min_overlap
340+ )
341+
342+ # Формируем финальные сегменты по диаризации
218343 for idx , d in enumerate (raw_diar , start = 1 ):
219- start , end = d ["start" ], d ["end" ]
344+ start , end = float ( d ["start" ]), float ( d ["end" ])
220345 duration = round (end - start , 3 )
221- spk = speakers_map [d ["speaker" ]]
346+ label = d .get ("speaker_label" ) or d ["speaker" ]
347+ spk = speakers_map [label ]
222348 spk .segments_count += 1
223349 spk .total_duration += duration
224350 total_speech += duration
225351
226352 words_in = [
227353 w for w in words_all
228- if w [ "start" ] < end and w [ "end" ] > start
354+ if float ( w . get ( "start" , 0.0 )) < end and float ( w . get ( "end" , 0.0 )) > start
229355 ]
230- text = " " . join ( w [ "word" ] for w in words_in ). strip ( )
356+ words_in . sort ( key = lambda w : float ( w . get ( "start" , 0.0 )) )
231357
232- seg_conf = d .get ("confidence" , 0.0 )
358+ text = " " .join ((w .get ("word" ) or "" ).strip () for w in words_in ).strip ()
359+
360+ seg_conf = float (d .get ("confidence" , 0.0 ))
233361 if seg_conf == 0.0 and words_in :
234- seg_conf = sum (w [ "probability" ] for w in words_in ) / len (words_in )
362+ seg_conf = sum (float ( w . get ( "probability" , 0.0 )) for w in words_in ) / max ( 1 , len (words_in ) )
235363
236364 segments .append (FinalSegment (
237365 id = idx ,
238366 start = round (start , 3 ),
239367 end = round (end , 3 ),
240368 duration = duration ,
241369 speaker = spk .id ,
242- speaker_label = d [ "speaker" ] ,
370+ speaker_label = str ( label ) ,
243371 text = text ,
244- words = [TranscriptionWord (** w ) for w in words_in ],
372+ words = [TranscriptionWord (** {
373+ "start" : float (w ["start" ]),
374+ "end" : float (w ["end" ]),
375+ "word" : str (w ["word" ]),
376+ "probability" : float (w .get ("probability" , 0.0 )),
377+ "speaker" : str (w .get ("speaker" , spk .id ))
378+ }) for w in words_in ],
245379 confidence = round (seg_conf , 3 )
246380 ))
247- full_text_parts .append (f"[{ spk .id } ]: { text } " )
381+ if text :
382+ full_text_parts .append (f"[{ spk .id } ]: { text } " )
383+
248384 total_words += len (words_in )
249385
250- overall_conf = raw_trans .get ("confidence" , 0.0 )
251- if overall_conf == 0.0 and words_all :
252- overall_conf = sum (w ["probability" ] for w in words_all ) / len (words_all )
386+ # Финальная нормализация итоговых сегментов: фильтр + слияние.
387+ min_seg = float (self .config .transcription .min_segment_duration )
388+ segments = self ._merge_and_filter_final_segments (
389+ segments = segments ,
390+ speakers_map = speakers_map ,
391+ min_duration = min_seg ,
392+ merge_gap = 0.05
393+ )
394+
395+ # Пересобираем full_text после нормализации
396+ full_text_parts = []
397+ for seg in segments :
398+ if seg .text :
399+ full_text_parts .append (f"[{ seg .speaker } ]: { seg .text } " )
400+
401+ overall_conf = float (raw_trans .get ("confidence" , 0.0 ))
402+ all_words_flat : List [TranscriptionWord ] = []
403+ for seg in segments :
404+ all_words_flat .extend (seg .words )
405+ if overall_conf == 0.0 and all_words_flat :
406+ overall_conf = sum (float (w .probability ) for w in all_words_flat ) / max (1 , len (all_words_flat ))
407+
408+ fixed_trans_segments : List [TranscriptionSegment ] = []
409+ for s in trans_segments_raw :
410+ s_start = float (s .get ("start" , 0.0 ))
411+ s_end = float (s .get ("end" , s_start ))
412+ s_text = s .get ("text" )
413+ if not s_text :
414+ ws = [w for w in all_words_flat if float (w .start ) < s_end and float (w .end ) > s_start ]
415+ ws .sort (key = lambda w : float (w .start ))
416+ s_text = " " .join ((w .word or "" ).strip () for w in ws ).strip ()
417+
418+ fixed_trans_segments .append (TranscriptionSegment (
419+ start = round (s_start , 3 ),
420+ end = round (s_end , 3 ),
421+ text = s_text or "" ,
422+ speaker = s .get ("speaker" ),
423+ speaker_confidence = s .get ("speaker_confidence" ),
424+ no_speech_prob = s .get ("no_speech_prob" ),
425+ avg_logprob = s .get ("avg_logprob" ),
426+ confidence = s .get ("confidence" )
427+ ))
253428
254429 transcription = FinalTranscription (
255430 full_text = "\n " .join (full_text_parts ),
256431 confidence = round (overall_conf , 3 ),
257432 language = raw_trans .get ("language" , "unknown" ),
258- segments = [
259- TranscriptionSegment (
260- start = s ["start" ],
261- end = s ["end" ],
262- text = s .get ("text" , "" ),
263- speaker = s .get ("speaker" ),
264- speaker_confidence = s .get ("speaker_confidence" ),
265- no_speech_prob = s .get ("no_speech_prob" ),
266- avg_logprob = s .get ("avg_logprob" ),
267- confidence = s .get ("confidence" )
268- )
269- for s in raw_trans .get ("segments" , [])
270- ],
271- #words=[TranscriptionWord(**w) for w in words_all]
433+ segments = fixed_trans_segments ,
434+ # Список слов возвращаем в составе fin-сегментов; при необходимости можно добавить общим списком
272435 )
273436
437+ # CardDAV сопоставление, если доступно
274438 if carddav_result :
275439 cd_speakers = carddav_result .payload .get ("speakers" , {})
276440 for label , spk in speakers_map .items ():
@@ -284,9 +448,9 @@ def _build_final_annotation(
284448 identified_speakers = sum (1 for s in final_speakers if s .identified ),
285449 unknown_speakers = sum (1 for s in final_speakers if not s .identified ),
286450 total_segments = len (segments ),
287- total_words = total_words ,
288- speech_duration = round (total_speech , 3 ),
289- silence_duration = round (max (0.0 , audio_metadata .duration - total_speech ), 3 )
451+ total_words = sum ( len ( s . words ) for s in segments ) ,
452+ speech_duration = round (sum ( s . duration for s in segments ) , 3 ),
453+ silence_duration = round (max (0.0 , audio_metadata .duration - sum ( s . duration for s in segments ) ), 3 )
290454 )
291455
292456 return AnnotationResult (
0 commit comments