diff --git a/src/summary/summary/core/analytics.py b/src/summary/summary/core/analytics.py index 407f2d39..d88b8e98 100644 --- a/src/summary/summary/core/analytics.py +++ b/src/summary/summary/core/analytics.py @@ -112,19 +112,16 @@ class MetadataManager: if self._is_disabled or self.has_task_id(task_id): return - initial_metadata = { - "start_time": time.time(), - "asr_model": settings.whisperx_asr_model, - "retries": 0, - } - _, filename, email, _, received_at, *_ = task_args + start_time = time.time() initial_metadata = { - **initial_metadata, + "start_time": start_time, + "asr_model": settings.whisperx_asr_model, + "retries": 0, "filename": filename, "email": email, - "queuing_time": round(initial_metadata["start_time"] - received_at, 2), + "queuing_time": round(start_time - received_at, 2), } self._save_metadata(task_id, initial_metadata) diff --git a/src/summary/summary/core/celery_worker.py b/src/summary/summary/core/celery_worker.py index 0f1131a9..8ba10c30 100644 --- a/src/summary/summary/core/celery_worker.py +++ b/src/summary/summary/core/celery_worker.py @@ -10,9 +10,7 @@ import openai import sentry_sdk from celery import Celery, signals from celery.utils.log import get_task_logger -from requests import Session, exceptions -from requests.adapters import HTTPAdapter -from urllib3.util import Retry +from requests import exceptions from summary.core.analytics import MetadataManager, get_analytics from summary.core.config import get_settings @@ -30,6 +28,7 @@ from summary.core.prompt import ( PROMPT_USER_PART, ) from summary.core.transcript_formatter import TranscriptFormatter +from summary.core.webhook_service import submit_content settings = get_settings() analytics = get_analytics() @@ -56,103 +55,17 @@ if settings.sentry_dsn and settings.sentry_is_enabled: sentry_sdk.init(dsn=settings.sentry_dsn, enable_tracing=True) -file_service = FileService(logger=logger) +file_service = FileService() -def create_retry_session(): - """Create an HTTP session 
configured with retry logic.""" - session = Session() - retries = Retry( - total=settings.webhook_max_retries, - backoff_factor=settings.webhook_backoff_factor, - status_forcelist=settings.webhook_status_forcelist, - allowed_methods={"POST"}, - ) - session.mount("https://", HTTPAdapter(max_retries=retries)) - return session +def transcribe_audio(task_id, filename, language): + """Transcribe an audio file using WhisperX. + Downloads the audio from MinIO, sends it to WhisperX for transcription, + and tracks metadata throughout the process. -def format_actions(llm_output: dict) -> str: - """Format the actions from the LLM output into a markdown list. - - fomat: - - [ ] Action title Assignée à : assignee1, assignee2, Échéance : due_date + Returns the transcription object, or None if the file could not be retrieved. """ - lines = [] - for action in llm_output.get("actions", []): - title = action.get("title", "").strip() - assignees = ", ".join(action.get("assignees", [])) or "-" - due_date = action.get("due_date") or "-" - line = f"- [ ] {title} Assignée à : {assignees}, Échéance : {due_date}" - lines.append(line) - if lines: - return "### Prochaines étapes\n\n" + "\n".join(lines) - return "" - - -def post_with_retries(url, data): - """Send POST request with automatic retries.""" - session = create_retry_session() - session.headers.update( - {"Authorization": f"Bearer {settings.webhook_api_token.get_secret_value()}"} - ) - try: - response = session.post(url, json=data) - response.raise_for_status() - return response - finally: - session.close() - - -@celery.task( - bind=True, - autoretry_for=[exceptions.HTTPError], - max_retries=settings.celery_max_retries, - queue=settings.transcribe_queue, -) -def process_audio_transcribe_summarize_v2( - self, - owner_id: str, - filename: str, - email: str, - sub: str, - received_at: float, - room: Optional[str], - recording_date: Optional[str], - recording_time: Optional[str], - language: Optional[str], - download_link: 
Optional[str], - context_language: Optional[str] = None, -): - """Process an audio file by transcribing it and generating a summary. - - This Celery task performs the following operations: - 1. Retrieves the audio file from MinIO storage - 2. Transcribes the audio using WhisperX model - 3. Sends the results via webhook - - Args: - self: Celery task instance (passed on with bind=True) - owner_id: Unique identifier of the recording owner. - filename: Name of the audio file in MinIO storage. - email: Email address of the recording owner. - sub: OIDC subject identifier of the recording owner. - received_at: Unix timestamp when the recording was received. - room: room name where the recording took place. - recording_date: Date of the recording (localized display string). - recording_time: Time of the recording (localized display string). - language: ISO 639-1 language code for transcription. - download_link: URL to download the original recording. - context_language: ISO 639-1 language code of the meeting summary context text. 
- """ - logger.info( - "Notification received | Owner: %s | Room: %s", - owner_id, - room, - ) - - task_id = self.request.id - logger.info("Initiating WhisperX client") whisperx_client = openai.OpenAI( api_key=settings.whisperx_api_key.get_secret_value(), @@ -162,9 +75,7 @@ def process_audio_transcribe_summarize_v2( # Transcription try: - with ( - file_service.prepare_audio_file(filename) as (audio_file, metadata), - ): + with file_service.prepare_audio_file(filename) as (audio_file, metadata): metadata_manager.track(task_id, {"audio_length": metadata["duration"]}) if language is None: @@ -195,16 +106,32 @@ def process_audio_transcribe_summarize_v2( except FileServiceException: logger.exception("Unexpected error for filename: %s", filename) - return + return None metadata_manager.track_transcription_metadata(task_id, transcription) + return transcription - # For locale of context, use in decreasing priority context_language, - # language (of meeting), default context language + +def format_transcript( + transcription, + context_language, + language, + room, + recording_date, + recording_time, + download_link, +): + """Format a transcription into readable content with a title. + + Resolves the locale from context_language / language, then uses + TranscriptFormatter to produce markdown content and a title. + + Returns a (content, title) tuple. 
+ """ locale = get_locale(context_language, language) formatter = TranscriptFormatter(locale) - content, title = formatter.format( + return formatter.format( transcription, room=room, recording_date=recording_date, @@ -212,32 +139,90 @@ def process_audio_transcribe_summarize_v2( download_link=download_link, ) - data = { - "title": title, - "content": content, - "email": email, - "sub": sub, - } - logger.debug("Submitting webhook to %s", settings.webhook_url) - logger.debug("Request payload: %s", json.dumps(data, indent=2)) +def format_actions(llm_output: dict) -> str: + """Format the actions from the LLM output into a markdown list. - response = post_with_retries(settings.webhook_url, data) + format: + - [ ] Action title Assignée à : assignee1, assignee2, Échéance : due_date + """ + lines = [] + for action in llm_output.get("actions", []): + title = action.get("title", "").strip() + assignees = ", ".join(action.get("assignees", [])) or "-" + due_date = action.get("due_date") or "-" + line = f"- [ ] {title} Assignée à : {assignees}, Échéance : {due_date}" + lines.append(line) + if lines: + return "### Prochaines étapes\n\n" + "\n".join(lines) + return "" - try: - response_data = response.json() - document_id = response_data.get("id", "N/A") - except (json.JSONDecodeError, AttributeError): - document_id = "Unable to parse response" - response_data = response.text +@celery.task( + bind=True, + autoretry_for=[exceptions.HTTPError], + max_retries=settings.celery_max_retries, + queue=settings.transcribe_queue, +) +def process_audio_transcribe_summarize_v2( + self, + owner_id: str, + filename: str, + email: str, + sub: str, + received_at: float, + room: Optional[str], + recording_date: Optional[str], + recording_time: Optional[str], + language: Optional[str], + download_link: Optional[str], + context_language: Optional[str] = None, +): + """Process an audio file by transcribing it and generating a summary. + + This Celery task orchestrates: + 1. 
Audio transcription via WhisperX + 2. Transcript formatting + 3. Webhook submission + 4. Conditional summarization queuing + + Args: + self: Celery task instance (passed on with bind=True) + owner_id: Unique identifier of the recording owner. + filename: Name of the audio file in MinIO storage. + email: Email address of the recording owner. + sub: OIDC subject identifier of the recording owner. + received_at: Unix timestamp when the recording was received. + room: room name where the recording took place. + recording_date: Date of the recording (localized display string). + recording_time: Time of the recording (localized display string). + language: ISO 639-1 language code for transcription. + download_link: URL to download the original recording. + context_language: ISO 639-1 language code of the meeting summary context text. + """ logger.info( - "Webhook success | Document %s submitted (HTTP %s)", - document_id, - response.status_code, + "Notification received | Owner: %s | Room: %s", + owner_id, + room, ) - logger.debug("Full response: %s", response_data) + task_id = self.request.id + + transcription = transcribe_audio(task_id, filename, language) + if transcription is None: + return + + content, title = format_transcript( + transcription, + context_language, + language, + room, + recording_date, + recording_time, + download_link, + ) + + submit_content(content, title, email, sub) metadata_manager.capture(task_id, settings.posthog_event_success) # LLM Summarization @@ -306,12 +291,11 @@ def summarize_transcription( # a singleton client. This is a performance trade-off we accept to ensure per-user # privacy controls in observability traces. 
llm_observability = LLMObservability( - logger=logger, user_has_tracing_consent=user_has_tracing_consent, session_id=self.request.id, user_id=owner_id, ) - llm_service = LLMService(llm_observability=llm_observability, logger=logger) + llm_service = LLMService(llm_observability=llm_observability) tldr = llm_service.call(PROMPT_SYSTEM_TLDR, transcript, name="tldr") @@ -354,20 +338,9 @@ def summarize_transcription( logger.info("Summary cleaned") summary = tldr + "\n\n" + cleaned_summary + "\n\n" + next_steps + summary_title = settings.summary_title_template.format(title=title) - data = { - "title": settings.summary_title_template.format(title=title), - "content": summary, - "email": email, - "sub": sub, - } - - logger.debug("Submitting webhook to %s", settings.webhook_url) - - response = post_with_retries(settings.webhook_url, data) - - logger.info("Webhook submitted successfully. Status: %s", response.status_code) - logger.debug("Response body: %s", response.text) + submit_content(summary, summary_title, email, sub) llm_observability.flush() logger.debug("LLM observability flushed") diff --git a/src/summary/summary/core/file_service.py b/src/summary/summary/core/file_service.py index e42e193a..80c63200 100644 --- a/src/summary/summary/core/file_service.py +++ b/src/summary/summary/core/file_service.py @@ -1,5 +1,6 @@ """File service to encapsulate files' manipulations.""" +import logging import os import subprocess import tempfile @@ -15,6 +16,9 @@ from summary.core.config import get_settings settings = get_settings() +logger = logging.getLogger(__name__) + + class FileServiceException(Exception): """Base exception for file service operations.""" @@ -24,10 +28,8 @@ class FileServiceException(Exception): class FileService: """Service for downloading and preparing files from MinIO storage.""" - def __init__(self, logger): + def __init__(self): """Initialize FileService with MinIO client and configuration.""" - self._logger = logger - endpoint = ( 
settings.aws_s3_endpoint_url.removeprefix("https://") .removeprefix("http://") @@ -53,16 +55,16 @@ class FileService: The file is downloaded to a temporary location for local manipulation such as validation, conversion, or processing before being used. """ - self._logger.info("Download recording | object_key: %s", remote_object_key) + logger.info("Download recording | object_key: %s", remote_object_key) if not remote_object_key: - self._logger.warning("Invalid object_key '%s'", remote_object_key) + logger.warning("Invalid object_key '%s'", remote_object_key) raise ValueError("Invalid object_key") extension = Path(remote_object_key).suffix.lower() if extension not in self._allowed_extensions: - self._logger.warning("Invalid file extension '%s'", extension) + logger.warning("Invalid file extension '%s'", extension) raise ValueError(f"Invalid file extension '{extension}'") response = None @@ -81,8 +83,8 @@ class FileService: tmp.flush() local_path = Path(tmp.name) - self._logger.info("Recording successfully downloaded") - self._logger.debug("Recording local file path: %s", local_path) + logger.info("Recording successfully downloaded") + logger.debug("Recording local file path: %s", local_path) return local_path @@ -100,7 +102,7 @@ class FileService: file_metadata = mutagen.File(local_path).info duration = file_metadata.length - self._logger.info( + logger.info( "Recording file duration: %.2f seconds", duration, ) @@ -109,14 +111,14 @@ class FileService: error_msg = "Recording too long. 
Limit is %.2fs seconds" % ( self._max_duration, ) - self._logger.error(error_msg) + logger.error(error_msg) raise ValueError(error_msg) return duration def _extract_audio_from_video(self, video_path: Path) -> Path: """Extract audio from video file (e.g., MP4) and save as audio file.""" - self._logger.info("Extracting audio from video file: %s", video_path) + logger.info("Extracting audio from video file: %s", video_path) with tempfile.NamedTemporaryFile( suffix=".m4a", delete=False, prefix="audio_extract_" @@ -140,16 +142,16 @@ class FileService: command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True ) - self._logger.info("Audio successfully extracted to: %s", output_path) + logger.info("Audio successfully extracted to: %s", output_path) return output_path except FileNotFoundError as e: - self._logger.error("ffmpeg not found. Please install ffmpeg.") + logger.error("ffmpeg not found. Please install ffmpeg.") if output_path.exists(): os.remove(output_path) raise RuntimeError("ffmpeg is not installed or not in PATH") from e except subprocess.CalledProcessError as e: - self._logger.error("Audio extraction failed: %s", e.stderr.decode()) + logger.error("Audio extraction failed: %s", e.stderr.decode()) if output_path.exists(): os.remove(output_path) raise RuntimeError("Failed to extract audio.") from e @@ -173,7 +175,7 @@ class FileService: extension = downloaded_path.suffix.lower() if extension in settings.recording_video_extensions: - self._logger.info("Video file detected, extracting audio...") + logger.info("Video file detected, extracting audio...") extracted_audio_path = self._extract_audio_from_video(downloaded_path) processed_path = extracted_audio_path else: @@ -194,8 +196,6 @@ class FileService: try: os.remove(path) - self._logger.debug("Temporary file removed: %s", path) + logger.debug("Temporary file removed: %s", path) except OSError as e: - self._logger.warning( - "Failed to remove temporary file %s: %s", path, e - ) + logger.warning("Failed 
to remove temporary file %s: %s", path, e) diff --git a/src/summary/summary/core/llm_service.py b/src/summary/summary/core/llm_service.py index d26d7ad8..e8b52b0c 100644 --- a/src/summary/summary/core/llm_service.py +++ b/src/summary/summary/core/llm_service.py @@ -1,5 +1,6 @@ """LLM service to encapsulate LLM's calls.""" +import logging from typing import Any, Mapping, Optional import openai @@ -10,6 +11,9 @@ from summary.core.config import get_settings settings = get_settings() +logger = logging.getLogger(__name__) + + class LLMObservability: """Manage observability and tracing for LLM calls using Langfuse. @@ -21,13 +25,11 @@ class LLMObservability: def __init__( self, - logger, session_id: str, user_id: str, user_has_tracing_consent: bool = False, ): """Initialize the LLMObservability client.""" - self._logger = logger self._observability_client: Optional[Langfuse] = None self.session_id = session_id self.user_id = user_id @@ -75,7 +77,7 @@ class LLMObservability: } if not self.is_enabled: - self._logger.debug("Using regular OpenAI client (observability disabled)") + logger.debug("Using regular OpenAI client (observability disabled)") return openai.OpenAI(**base_args) # Langfuse's OpenAI wrapper is imported here to avoid triggering client @@ -83,7 +85,7 @@ class LLMObservability: # is missing. Conditional import ensures Langfuse only initializes when enabled. 
from langfuse.openai import openai as langfuse_openai # noqa: PLC0415 - self._logger.debug("Using LangfuseOpenAI client (observability enabled)") + logger.debug("Using LangfuseOpenAI client (observability enabled)") return langfuse_openai.OpenAI(**base_args) def flush(self): @@ -99,11 +101,10 @@ class LLMException(Exception): class LLMService: """Service for performing calls to the LLM configured in the settings.""" - def __init__(self, llm_observability, logger): + def __init__(self, llm_observability): """Init the LLMService once.""" self._client = llm_observability.get_openai_client() self._observability = llm_observability - self._logger = logger def call( self, @@ -140,5 +141,5 @@ class LLMService: return response.choices[0].message.content except Exception as e: - self._logger.exception("LLM call failed: %s", e) + logger.exception("LLM call failed: %s", e) raise LLMException(f"LLM call failed: {e}") from e diff --git a/src/summary/summary/core/webhook_service.py b/src/summary/summary/core/webhook_service.py new file mode 100644 index 00000000..a5ff1cf6 --- /dev/null +++ b/src/summary/summary/core/webhook_service.py @@ -0,0 +1,73 @@ +"""Service for delivering content to external destinations.""" + +import json +import logging + +from requests import Session +from requests.adapters import HTTPAdapter +from urllib3.util import Retry + +from summary.core.config import get_settings + +settings = get_settings() + +logger = logging.getLogger(__name__) + + +def _create_retry_session(): + """Create an HTTP session configured with retry logic.""" + session = Session() + retries = Retry( + total=settings.webhook_max_retries, + backoff_factor=settings.webhook_backoff_factor, + status_forcelist=settings.webhook_status_forcelist, + allowed_methods={"POST"}, + ) + session.mount("https://", HTTPAdapter(max_retries=retries)) + return session + + +def _post_with_retries(url, data): + """Send POST request with automatic retries.""" + session = _create_retry_session() + 
session.headers.update( + {"Authorization": f"Bearer {settings.webhook_api_token.get_secret_value()}"} + ) + try: + response = session.post(url, json=data) + response.raise_for_status() + return response + finally: + session.close() + + +def submit_content(content, title, email, sub): + """Submit content to the configured webhook destination. + + Builds the payload, sends it with retries, and logs the outcome. + """ + data = { + "title": title, + "content": content, + "email": email, + "sub": sub, + } + + logger.debug("Submitting to %s", settings.webhook_url) + logger.debug("Request payload: %s", json.dumps(data, indent=2)) + + response = _post_with_retries(settings.webhook_url, data) + + try: + response_data = response.json() + document_id = response_data.get("id", "N/A") + except (json.JSONDecodeError, AttributeError): + document_id = "Unable to parse response" + response_data = response.text + + logger.info( + "Delivery success | Document %s submitted (HTTP %s)", + document_id, + response.status_code, + ) + logger.debug("Full response: %s", response_data)