♻️(summary) refactor transcript formatting into unified handler class

Consolidate scattered transcript formatting functions into single
cohesive class encapsulating all transcript processing logic
for better maintainability and clearer separation of concerns.

Add transcript cleaning step to remove spurious recognition artifacts
like randomly predicted "Vap'n'Roll Thierry" phrases that appear
without corresponding audio, improving transcript quality
by filtering model hallucinations.
This commit is contained in:
Martin Guitteny
2025-10-22 15:15:33 +02:00
committed by aleb_the_flash
parent fba879e739
commit ad494f5de5
3 changed files with 137 additions and 68 deletions

View File

@@ -31,6 +31,7 @@ from summary.core.prompt import (
PROMPT_SYSTEM_TLDR,
PROMPT_USER_PART,
)
from summary.core.transcript_formatter import TranscriptFormatter
settings = get_settings()
analytics = get_analytics()
@@ -56,28 +57,6 @@ if settings.sentry_dsn and settings.sentry_is_enabled:
sentry_sdk.init(dsn=settings.sentry_dsn, enable_tracing=True)
# Fallback document body used when the transcription contains no segments.
# NOTE(review): apostrophes look stripped in this user-facing French text
# ("na", "quil sagit dune", "nhésitez", "Lenregistrement") — likely an
# encoding/extraction artifact; confirm against the rendered document
# before normalizing, since this literal is emitted verbatim to users.
DEFAULT_EMPTY_TRANSCRIPTION = """
**Aucun contenu audio na été détecté dans votre transcription.**
*Si vous pensez quil sagit dune erreur, nhésitez pas à contacter
notre support technique : visio@numerique.gouv.fr*
.
.
.
Quelques points que nous vous conseillons de vérifier :
- Un micro était-il activé ?
- Étiez-vous suffisamment proche ?
- Le micro est-il de bonne qualité ?
- Lenregistrement dure-t-il plus de 30 secondes ?
"""
class AudioValidationError(Exception):
"""Custom exception for audio validation errors."""
@@ -166,36 +145,6 @@ def format_actions(llm_output: dict) -> str:
return ""
def format_segments(transcription_data):
    """Render WhisperX transcription segments as a readable conversation.

    Accepts either an object exposing a ``segments`` attribute or a dict
    containing a ``"segments"`` key. Consecutive segments from the same
    speaker are merged into one turn; each speaker change starts a new
    ``**SPEAKER**: text`` paragraph. Any other input yields an error string.
    """
    # Resolve the segment list from either supported container shape.
    if not transcription_data or not hasattr(transcription_data, "segments"):
        if isinstance(transcription_data, dict) and "segments" in transcription_data:
            segments = transcription_data["segments"]
        else:
            return "Error: Invalid transcription data format"
    else:
        segments = transcription_data.segments

    pieces = []
    last_speaker = None
    for entry in segments:
        spoken = entry.get("text", "")
        if not spoken:
            # Empty text neither emits output nor counts as a speaker turn.
            continue
        current = entry.get("speaker", "UNKNOWN_SPEAKER")
        if current == last_speaker:
            pieces.append(f" {spoken}")
        else:
            pieces.append(f"\n\n **{current}**: {spoken}")
        last_speaker = current
    return "".join(pieces)
def post_with_retries(url, data):
"""Send POST request with automatic retries."""
session = create_retry_session()
@@ -306,25 +255,20 @@ def process_audio_transcribe_summarize_v2(
os.remove(temp_file_path)
logger.debug("Temporary file removed: %s", temp_file_path)
formatted_transcription = (
DEFAULT_EMPTY_TRANSCRIPTION
if not transcription.segments
else format_segments(transcription)
)
metadata_manager.track_transcription_metadata(task_id, transcription)
if not room or not recording_date or not recording_time:
title = settings.document_default_title
else:
title = settings.document_title_template.format(
room=room,
room_recording_date=recording_date,
room_recording_time=recording_time,
)
formatter = TranscriptFormatter()
content, title = formatter.format(
transcription,
room=room,
recording_date=recording_date,
recording_time=recording_time,
)
data = {
"title": title,
"content": formatted_transcription,
"content": content,
"email": email,
"sub": sub,
}
@@ -356,7 +300,7 @@ def process_audio_transcribe_summarize_v2(
):
logger.info("Queuing summary generation task.")
summarize_transcription.apply_async(
args=[formatted_transcription, email, sub, title],
args=[content, email, sub, title],
queue=settings.summarize_queue,
)
else:

View File

@@ -45,6 +45,10 @@ class Settings(BaseSettings):
llm_api_key: str
llm_model: str
# Transcription processing
# Phrases the ASR model is known to emit without matching audio; every
# occurrence is replaced wholesale in the formatted transcript.
hallucination_patterns: List[str] = ["Vap'n'Roll Thierry"]
# Marker inserted in place of a removed hallucinated phrase (user-facing).
hallucination_replacement_text: str = "[Texte impossible à transcrire]"
# Webhook-related settings
webhook_max_retries: int = 2
webhook_status_forcelist: List[int] = [502, 503, 504]

View File

@@ -0,0 +1,121 @@
"""Transcript formatting into readable conversation format with speaker labels."""
import logging
from typing import Optional, Tuple
from summary.core.config import get_settings
settings = get_settings()
logger = logging.getLogger(__name__)
# Fallback document body returned by TranscriptFormatter.format when the
# transcription contains no segments.
# NOTE(review): apostrophes look stripped in this user-facing French text
# ("na", "quil sagit dune", "nhésitez", "Lenregistrement") — likely an
# encoding/extraction artifact; confirm against the rendered document
# before normalizing, since this literal is emitted verbatim to users.
DEFAULT_EMPTY_TRANSCRIPTION = """
**Aucun contenu audio na été détecté dans votre transcription.**
*Si vous pensez quil sagit dune erreur, nhésitez pas à contacter
notre support technique : visio@numerique.gouv.fr*
.
.
.
Quelques points que nous vous conseillons de vérifier :
- Un micro était-il activé ?
- Étiez-vous suffisamment proche ?
- Le micro est-il de bonne qualité ?
- Lenregistrement dure-t-il plus de 30 secondes ?
"""
class TranscriptFormatter:
    """Turns WhisperX transcription output into a readable conversation.

    Responsibilities:
    - extract the segment list from a transcription object or a plain dict,
    - merge consecutive segments spoken by the same speaker,
    - strip configured hallucination patterns from the rendered text,
    - build the document title from the recording context.
    """

    def __init__(self):
        """Load formatting configuration from application settings."""
        self.hallucination_patterns = settings.hallucination_patterns
        self.hallucination_replacement_text = settings.hallucination_replacement_text
        self.default_title = settings.document_default_title
        self.default_empty_transcription = DEFAULT_EMPTY_TRANSCRIPTION

    def _get_segments(self, transcription):
        """Return the segment list, or None when none can be located."""
        if hasattr(transcription, "segments"):
            return transcription.segments
        if isinstance(transcription, dict):
            return transcription.get("segments", None)
        return None

    def format(
        self,
        transcription,
        room: Optional[str] = None,
        recording_date: Optional[str] = None,
        recording_time: Optional[str] = None,
    ) -> Tuple[str, str]:
        """Produce the final ``(content, title)`` pair for *transcription*.

        Falls back to the default "no audio" document when no segments
        are present; otherwise renders speaker turns and scrubs
        hallucinated phrases from the result.
        """
        segments = self._get_segments(transcription)
        if segments:
            content = self._remove_hallucinations(self._format_speaker(segments))
        else:
            content = self.default_empty_transcription
        return content, self._generate_title(room, recording_date, recording_time)

    def _remove_hallucinations(self, content: str) -> str:
        """Replace every configured hallucination pattern in *content*."""
        # A None replacement setting degrades to plain deletion.
        substitute = self.hallucination_replacement_text or ""
        for phrase in self.hallucination_patterns:
            content = content.replace(phrase, substitute)
        return content

    def _format_speaker(self, segments) -> str:
        """Render segments as speaker-labelled turns, merging repeats."""
        pieces = []
        last_speaker = None
        for segment in segments:
            spoken = segment.get("text", "")
            if not spoken:
                # Empty text neither emits output nor counts as a turn.
                continue
            current = segment.get("speaker", "UNKNOWN_SPEAKER")
            if current == last_speaker:
                pieces.append(f" {spoken}")
            else:
                pieces.append(f"\n\n **{current}**: {spoken}")
            last_speaker = current
        return "".join(pieces)

    def _generate_title(
        self,
        room: Optional[str] = None,
        recording_date: Optional[str] = None,
        recording_time: Optional[str] = None,
    ) -> str:
        """Build the document title, falling back to the configured default."""
        if room and recording_date and recording_time:
            return settings.document_title_template.format(
                room=room,
                room_recording_date=recording_date,
                room_recording_time=recording_time,
            )
        return self.default_title