♻️(summary) clean up code and unify logging in preparation for testing

Refactor the summary service to better separate concerns, making components
easier to isolate and test. Unify logging logic to ensure consistent
behavior and reduce duplication across the service layer. These changes
set up the codebase for granular testing.
This commit is contained in:
leo
2026-03-03 15:08:35 +01:00
committed by aleb_the_flash
parent 25167495cc
commit 14526808ab
5 changed files with 214 additions and 170 deletions

View File

@@ -112,19 +112,16 @@ class MetadataManager:
if self._is_disabled or self.has_task_id(task_id):
return
initial_metadata = {
"start_time": time.time(),
"asr_model": settings.whisperx_asr_model,
"retries": 0,
}
_, filename, email, _, received_at, *_ = task_args
start_time = time.time()
initial_metadata = {
**initial_metadata,
"start_time": start_time,
"asr_model": settings.whisperx_asr_model,
"retries": 0,
"filename": filename,
"email": email,
"queuing_time": round(initial_metadata["start_time"] - received_at, 2),
"queuing_time": round(start_time - received_at, 2),
}
self._save_metadata(task_id, initial_metadata)

View File

@@ -10,9 +10,7 @@ import openai
import sentry_sdk
from celery import Celery, signals
from celery.utils.log import get_task_logger
from requests import Session, exceptions
from requests.adapters import HTTPAdapter
from urllib3.util import Retry
from requests import exceptions
from summary.core.analytics import MetadataManager, get_analytics
from summary.core.config import get_settings
@@ -30,6 +28,7 @@ from summary.core.prompt import (
PROMPT_USER_PART,
)
from summary.core.transcript_formatter import TranscriptFormatter
from summary.core.webhook_service import submit_content
settings = get_settings()
analytics = get_analytics()
@@ -56,103 +55,17 @@ if settings.sentry_dsn and settings.sentry_is_enabled:
sentry_sdk.init(dsn=settings.sentry_dsn, enable_tracing=True)
file_service = FileService(logger=logger)
file_service = FileService()
def create_retry_session():
"""Create an HTTP session configured with retry logic."""
session = Session()
retries = Retry(
total=settings.webhook_max_retries,
backoff_factor=settings.webhook_backoff_factor,
status_forcelist=settings.webhook_status_forcelist,
allowed_methods={"POST"},
)
session.mount("https://", HTTPAdapter(max_retries=retries))
return session
def transcribe_audio(task_id, filename, language):
"""Transcribe an audio file using WhisperX.
Downloads the audio from MinIO, sends it to WhisperX for transcription,
and tracks metadata throughout the process.
def format_actions(llm_output: dict) -> str:
"""Format the actions from the LLM output into a markdown list.
format:
- [ ] Action title Assignée à : assignee1, assignee2, Échéance : due_date
Returns the transcription object, or None if the file could not be retrieved.
"""
lines = []
for action in llm_output.get("actions", []):
title = action.get("title", "").strip()
assignees = ", ".join(action.get("assignees", [])) or "-"
due_date = action.get("due_date") or "-"
line = f"- [ ] {title} Assignée à : {assignees}, Échéance : {due_date}"
lines.append(line)
if lines:
return "### Prochaines étapes\n\n" + "\n".join(lines)
return ""
def post_with_retries(url, data):
"""Send POST request with automatic retries."""
session = create_retry_session()
session.headers.update(
{"Authorization": f"Bearer {settings.webhook_api_token.get_secret_value()}"}
)
try:
response = session.post(url, json=data)
response.raise_for_status()
return response
finally:
session.close()
@celery.task(
bind=True,
autoretry_for=[exceptions.HTTPError],
max_retries=settings.celery_max_retries,
queue=settings.transcribe_queue,
)
def process_audio_transcribe_summarize_v2(
self,
owner_id: str,
filename: str,
email: str,
sub: str,
received_at: float,
room: Optional[str],
recording_date: Optional[str],
recording_time: Optional[str],
language: Optional[str],
download_link: Optional[str],
context_language: Optional[str] = None,
):
"""Process an audio file by transcribing it and generating a summary.
This Celery task performs the following operations:
1. Retrieves the audio file from MinIO storage
2. Transcribes the audio using WhisperX model
3. Sends the results via webhook
Args:
self: Celery task instance (passed on with bind=True)
owner_id: Unique identifier of the recording owner.
filename: Name of the audio file in MinIO storage.
email: Email address of the recording owner.
sub: OIDC subject identifier of the recording owner.
received_at: Unix timestamp when the recording was received.
room: room name where the recording took place.
recording_date: Date of the recording (localized display string).
recording_time: Time of the recording (localized display string).
language: ISO 639-1 language code for transcription.
download_link: URL to download the original recording.
context_language: ISO 639-1 language code of the meeting summary context text.
"""
logger.info(
"Notification received | Owner: %s | Room: %s",
owner_id,
room,
)
task_id = self.request.id
logger.info("Initiating WhisperX client")
whisperx_client = openai.OpenAI(
api_key=settings.whisperx_api_key.get_secret_value(),
@@ -162,9 +75,7 @@ def process_audio_transcribe_summarize_v2(
# Transcription
try:
with (
file_service.prepare_audio_file(filename) as (audio_file, metadata),
):
with file_service.prepare_audio_file(filename) as (audio_file, metadata):
metadata_manager.track(task_id, {"audio_length": metadata["duration"]})
if language is None:
@@ -195,16 +106,32 @@ def process_audio_transcribe_summarize_v2(
except FileServiceException:
logger.exception("Unexpected error for filename: %s", filename)
return
return None
metadata_manager.track_transcription_metadata(task_id, transcription)
return transcription
# For locale of context, use in decreasing priority context_language,
# language (of meeting), default context language
def format_transcript(
transcription,
context_language,
language,
room,
recording_date,
recording_time,
download_link,
):
"""Format a transcription into readable content with a title.
Resolves the locale from context_language / language, then uses
TranscriptFormatter to produce markdown content and a title.
Returns a (content, title) tuple.
"""
locale = get_locale(context_language, language)
formatter = TranscriptFormatter(locale)
content, title = formatter.format(
return formatter.format(
transcription,
room=room,
recording_date=recording_date,
@@ -212,32 +139,90 @@ def process_audio_transcribe_summarize_v2(
download_link=download_link,
)
data = {
"title": title,
"content": content,
"email": email,
"sub": sub,
}
logger.debug("Submitting webhook to %s", settings.webhook_url)
logger.debug("Request payload: %s", json.dumps(data, indent=2))
def format_actions(llm_output: dict) -> str:
"""Format the actions from the LLM output into a markdown list.
response = post_with_retries(settings.webhook_url, data)
format:
- [ ] Action title Assignée à : assignee1, assignee2, Échéance : due_date
"""
lines = []
for action in llm_output.get("actions", []):
title = action.get("title", "").strip()
assignees = ", ".join(action.get("assignees", [])) or "-"
due_date = action.get("due_date") or "-"
line = f"- [ ] {title} Assignée à : {assignees}, Échéance : {due_date}"
lines.append(line)
if lines:
return "### Prochaines étapes\n\n" + "\n".join(lines)
return ""
try:
response_data = response.json()
document_id = response_data.get("id", "N/A")
except (json.JSONDecodeError, AttributeError):
document_id = "Unable to parse response"
response_data = response.text
@celery.task(
bind=True,
autoretry_for=[exceptions.HTTPError],
max_retries=settings.celery_max_retries,
queue=settings.transcribe_queue,
)
def process_audio_transcribe_summarize_v2(
self,
owner_id: str,
filename: str,
email: str,
sub: str,
received_at: float,
room: Optional[str],
recording_date: Optional[str],
recording_time: Optional[str],
language: Optional[str],
download_link: Optional[str],
context_language: Optional[str] = None,
):
"""Process an audio file by transcribing it and generating a summary.
This Celery task orchestrates:
1. Audio transcription via WhisperX
2. Transcript formatting
3. Webhook submission
4. Conditional summarization queuing
Args:
self: Celery task instance (passed on with bind=True)
owner_id: Unique identifier of the recording owner.
filename: Name of the audio file in MinIO storage.
email: Email address of the recording owner.
sub: OIDC subject identifier of the recording owner.
received_at: Unix timestamp when the recording was received.
room: room name where the recording took place.
recording_date: Date of the recording (localized display string).
recording_time: Time of the recording (localized display string).
language: ISO 639-1 language code for transcription.
download_link: URL to download the original recording.
context_language: ISO 639-1 language code of the meeting summary context text.
"""
logger.info(
"Webhook success | Document %s submitted (HTTP %s)",
document_id,
response.status_code,
"Notification received | Owner: %s | Room: %s",
owner_id,
room,
)
logger.debug("Full response: %s", response_data)
task_id = self.request.id
transcription = transcribe_audio(task_id, filename, language)
if transcription is None:
return
content, title = format_transcript(
transcription,
context_language,
language,
room,
recording_date,
recording_time,
download_link,
)
submit_content(content, title, email, sub)
metadata_manager.capture(task_id, settings.posthog_event_success)
# LLM Summarization
@@ -306,12 +291,11 @@ def summarize_transcription(
# a singleton client. This is a performance trade-off we accept to ensure per-user
# privacy controls in observability traces.
llm_observability = LLMObservability(
logger=logger,
user_has_tracing_consent=user_has_tracing_consent,
session_id=self.request.id,
user_id=owner_id,
)
llm_service = LLMService(llm_observability=llm_observability, logger=logger)
llm_service = LLMService(llm_observability=llm_observability)
tldr = llm_service.call(PROMPT_SYSTEM_TLDR, transcript, name="tldr")
@@ -354,20 +338,9 @@ def summarize_transcription(
logger.info("Summary cleaned")
summary = tldr + "\n\n" + cleaned_summary + "\n\n" + next_steps
summary_title = settings.summary_title_template.format(title=title)
data = {
"title": settings.summary_title_template.format(title=title),
"content": summary,
"email": email,
"sub": sub,
}
logger.debug("Submitting webhook to %s", settings.webhook_url)
response = post_with_retries(settings.webhook_url, data)
logger.info("Webhook submitted successfully. Status: %s", response.status_code)
logger.debug("Response body: %s", response.text)
submit_content(summary, summary_title, email, sub)
llm_observability.flush()
logger.debug("LLM observability flushed")

View File

@@ -1,5 +1,6 @@
"""File service to encapsulate files' manipulations."""
import logging
import os
import subprocess
import tempfile
@@ -15,6 +16,9 @@ from summary.core.config import get_settings
settings = get_settings()
logger = logging.getLogger(__name__)
class FileServiceException(Exception):
"""Base exception for file service operations."""
@@ -24,10 +28,8 @@ class FileServiceException(Exception):
class FileService:
"""Service for downloading and preparing files from MinIO storage."""
def __init__(self, logger):
def __init__(self):
"""Initialize FileService with MinIO client and configuration."""
self._logger = logger
endpoint = (
settings.aws_s3_endpoint_url.removeprefix("https://")
.removeprefix("http://")
@@ -53,16 +55,16 @@ class FileService:
The file is downloaded to a temporary location for local manipulation
such as validation, conversion, or processing before being used.
"""
self._logger.info("Download recording | object_key: %s", remote_object_key)
logger.info("Download recording | object_key: %s", remote_object_key)
if not remote_object_key:
self._logger.warning("Invalid object_key '%s'", remote_object_key)
logger.warning("Invalid object_key '%s'", remote_object_key)
raise ValueError("Invalid object_key")
extension = Path(remote_object_key).suffix.lower()
if extension not in self._allowed_extensions:
self._logger.warning("Invalid file extension '%s'", extension)
logger.warning("Invalid file extension '%s'", extension)
raise ValueError(f"Invalid file extension '{extension}'")
response = None
@@ -81,8 +83,8 @@ class FileService:
tmp.flush()
local_path = Path(tmp.name)
self._logger.info("Recording successfully downloaded")
self._logger.debug("Recording local file path: %s", local_path)
logger.info("Recording successfully downloaded")
logger.debug("Recording local file path: %s", local_path)
return local_path
@@ -100,7 +102,7 @@ class FileService:
file_metadata = mutagen.File(local_path).info
duration = file_metadata.length
self._logger.info(
logger.info(
"Recording file duration: %.2f seconds",
duration,
)
@@ -109,14 +111,14 @@ class FileService:
error_msg = "Recording too long. Limit is %.2fs seconds" % (
self._max_duration,
)
self._logger.error(error_msg)
logger.error(error_msg)
raise ValueError(error_msg)
return duration
def _extract_audio_from_video(self, video_path: Path) -> Path:
"""Extract audio from video file (e.g., MP4) and save as audio file."""
self._logger.info("Extracting audio from video file: %s", video_path)
logger.info("Extracting audio from video file: %s", video_path)
with tempfile.NamedTemporaryFile(
suffix=".m4a", delete=False, prefix="audio_extract_"
@@ -140,16 +142,16 @@ class FileService:
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True
)
self._logger.info("Audio successfully extracted to: %s", output_path)
logger.info("Audio successfully extracted to: %s", output_path)
return output_path
except FileNotFoundError as e:
self._logger.error("ffmpeg not found. Please install ffmpeg.")
logger.error("ffmpeg not found. Please install ffmpeg.")
if output_path.exists():
os.remove(output_path)
raise RuntimeError("ffmpeg is not installed or not in PATH") from e
except subprocess.CalledProcessError as e:
self._logger.error("Audio extraction failed: %s", e.stderr.decode())
logger.error("Audio extraction failed: %s", e.stderr.decode())
if output_path.exists():
os.remove(output_path)
raise RuntimeError("Failed to extract audio.") from e
@@ -173,7 +175,7 @@ class FileService:
extension = downloaded_path.suffix.lower()
if extension in settings.recording_video_extensions:
self._logger.info("Video file detected, extracting audio...")
logger.info("Video file detected, extracting audio...")
extracted_audio_path = self._extract_audio_from_video(downloaded_path)
processed_path = extracted_audio_path
else:
@@ -194,8 +196,6 @@ class FileService:
try:
os.remove(path)
self._logger.debug("Temporary file removed: %s", path)
logger.debug("Temporary file removed: %s", path)
except OSError as e:
self._logger.warning(
"Failed to remove temporary file %s: %s", path, e
)
logger.warning("Failed to remove temporary file %s: %s", path, e)

View File

@@ -1,5 +1,6 @@
"""LLM service to encapsulate LLM's calls."""
import logging
from typing import Any, Mapping, Optional
import openai
@@ -10,6 +11,9 @@ from summary.core.config import get_settings
settings = get_settings()
logger = logging.getLogger(__name__)
class LLMObservability:
"""Manage observability and tracing for LLM calls using Langfuse.
@@ -21,13 +25,11 @@ class LLMObservability:
def __init__(
self,
logger,
session_id: str,
user_id: str,
user_has_tracing_consent: bool = False,
):
"""Initialize the LLMObservability client."""
self._logger = logger
self._observability_client: Optional[Langfuse] = None
self.session_id = session_id
self.user_id = user_id
@@ -75,7 +77,7 @@ class LLMObservability:
}
if not self.is_enabled:
self._logger.debug("Using regular OpenAI client (observability disabled)")
logger.debug("Using regular OpenAI client (observability disabled)")
return openai.OpenAI(**base_args)
# Langfuse's OpenAI wrapper is imported here to avoid triggering client
@@ -83,7 +85,7 @@ class LLMObservability:
# is missing. Conditional import ensures Langfuse only initializes when enabled.
from langfuse.openai import openai as langfuse_openai # noqa: PLC0415
self._logger.debug("Using LangfuseOpenAI client (observability enabled)")
logger.debug("Using LangfuseOpenAI client (observability enabled)")
return langfuse_openai.OpenAI(**base_args)
def flush(self):
@@ -99,11 +101,10 @@ class LLMException(Exception):
class LLMService:
"""Service for performing calls to the LLM configured in the settings."""
def __init__(self, llm_observability, logger):
def __init__(self, llm_observability):
"""Init the LLMService once."""
self._client = llm_observability.get_openai_client()
self._observability = llm_observability
self._logger = logger
def call(
self,
@@ -140,5 +141,5 @@ class LLMService:
return response.choices[0].message.content
except Exception as e:
self._logger.exception("LLM call failed: %s", e)
logger.exception("LLM call failed: %s", e)
raise LLMException(f"LLM call failed: {e}") from e

View File

@@ -0,0 +1,73 @@
"""Service for delivering content to external destinations."""
import json
import logging
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util import Retry
from summary.core.config import get_settings
settings = get_settings()
logger = logging.getLogger(__name__)
def _create_retry_session():
    """Build a requests ``Session`` whose HTTPS POSTs retry per webhook settings."""
    # Retry policy is driven entirely by configuration so operators can tune it.
    retry_policy = Retry(
        total=settings.webhook_max_retries,
        backoff_factor=settings.webhook_backoff_factor,
        status_forcelist=settings.webhook_status_forcelist,
        allowed_methods={"POST"},
    )
    retry_session = Session()
    # Only the https scheme is mounted; plain http requests get no retries.
    retry_session.mount("https://", HTTPAdapter(max_retries=retry_policy))
    return retry_session
def _post_with_retries(url, data):
    """Send POST request with automatic retries."""
    # Bearer token comes from settings; the secret is only unwrapped here.
    auth_header = {
        "Authorization": f"Bearer {settings.webhook_api_token.get_secret_value()}"
    }
    session = _create_retry_session()
    session.headers.update(auth_header)
    try:
        response = session.post(url, json=data)
        # Surface HTTP 4xx/5xx as exceptions for the caller to handle.
        response.raise_for_status()
        return response
    finally:
        # Always release the session's connection pool, even on failure.
        session.close()
def submit_content(content, title, email, sub):
    """Submit content to the configured webhook destination.

    Builds the payload, sends it with retries, and logs the outcome.
    """
    payload = {
        "title": title,
        "content": content,
        "email": email,
        "sub": sub,
    }
    logger.debug("Submitting to %s", settings.webhook_url)
    logger.debug("Request payload: %s", json.dumps(payload, indent=2))

    response = _post_with_retries(settings.webhook_url, payload)

    # Best-effort extraction of the created document id from the response body;
    # a non-JSON or non-dict body falls back to the raw text for logging.
    try:
        response_body = response.json()
        document_id = response_body.get("id", "N/A")
    except (json.JSONDecodeError, AttributeError):
        document_id = "Unable to parse response"
        response_body = response.text

    logger.info(
        "Delivery success | Document %s submitted (HTTP %s)",
        document_id,
        response.status_code,
    )
    logger.debug("Full response: %s", response_body)