(summary) add Langfuse observability for LLM API calls

Implement Langfuse tracing integration for LLM service calls to capture
prompts, responses, latency, token usage, and errors, enabling
comprehensive monitoring and debugging of AI model interactions
for performance analysis and cost optimization.
This commit is contained in:
lebaudantoine
2025-12-11 22:51:32 +01:00
committed by aleb_the_flash
parent c81ef38005
commit aff87d4953
4 changed files with 139 additions and 19 deletions

View File

@@ -11,6 +11,7 @@ and this project adheres to
### Added
- ✨(backend) enable user creation via email for external integrations
- ✨(summary) add Langfuse observability for LLM API calls
## [1.0.1] - 2025-12-17

View File

@@ -21,7 +21,7 @@ from urllib3.util import Retry
from summary.core.analytics import MetadataManager, get_analytics
from summary.core.config import get_settings
from summary.core.llm_service import LLMException, LLMService
from summary.core.llm_service import LLMException, LLMObservability, LLMService
from summary.core.prompt import (
FORMAT_NEXT_STEPS,
FORMAT_PLAN,
@@ -41,6 +41,7 @@ metadata_manager = MetadataManager()
logger = get_task_logger(__name__)
celery = Celery(
__name__,
broker=settings.celery_broker_url,
@@ -259,7 +260,7 @@ def process_audio_transcribe_summarize_v2(
):
logger.info("Queuing summary generation task.")
summarize_transcription.apply_async(
args=[content, email, sub, title],
args=[owner_id, content, email, sub, title],
queue=settings.summarize_queue,
)
else:
@@ -291,7 +292,9 @@ def task_failure_handler(task_id, exception=None, **kwargs):
max_retries=settings.celery_max_retries,
queue=settings.summarize_queue,
)
def summarize_transcription(self, transcript: str, email: str, sub: str, title: str):
def summarize_transcription(
self, owner_id: str, transcript: str, email: str, sub: str, title: str
):
"""Generate a summary from the provided transcription text.
This Celery task performs the following operations:
@@ -301,16 +304,34 @@ def summarize_transcription(self, transcript: str, email: str, sub: str, title:
4. Generates next steps.
5. Sends the final summary via webhook.
"""
logger.info("Starting summarization task")
logger.info(
"Starting summarization task | Owner: %s",
owner_id,
)
llm_service = LLMService()
user_has_tracing_consent = analytics.is_feature_enabled(
"summary-tracing-consent", distinct_id=owner_id
)
tldr = llm_service.call(PROMPT_SYSTEM_TLDR, transcript)
# NOTE: We must instantiate a new LLMObservability client for each task invocation
# because the masking function needs to be user-specific. The masking function is
# baked into the Langfuse client at initialization time, so we can't reuse
# a singleton client. This is a performance trade-off we accept to ensure per-user
# privacy controls in observability traces.
llm_observability = LLMObservability(
logger=logger,
user_has_tracing_consent=user_has_tracing_consent,
session_id=self.request.id,
user_id=owner_id,
)
llm_service = LLMService(llm_observability=llm_observability, logger=logger)
tldr = llm_service.call(PROMPT_SYSTEM_TLDR, transcript, name="tldr")
logger.info("TLDR generated")
parts = llm_service.call(
PROMPT_SYSTEM_PLAN, transcript, response_format=FORMAT_PLAN
PROMPT_SYSTEM_PLAN, transcript, name="parts", response_format=FORMAT_PLAN
)
logger.info("Plan generated")
@@ -321,21 +342,28 @@ def summarize_transcription(self, transcript: str, email: str, sub: str, title:
for part in parts:
prompt_user_part = PROMPT_USER_PART.format(part=part, transcript=transcript)
logger.info("Summarizing part: %s", part)
parts_summarized.append(llm_service.call(PROMPT_SYSTEM_PART, prompt_user_part))
parts_summarized.append(
llm_service.call(PROMPT_SYSTEM_PART, prompt_user_part, name="part")
)
logger.info("Parts summarized")
raw_summary = "\n\n".join(parts_summarized)
next_steps = llm_service.call(
PROMPT_SYSTEM_NEXT_STEP, transcript, response_format=FORMAT_NEXT_STEPS
PROMPT_SYSTEM_NEXT_STEP,
transcript,
name="next-steps",
response_format=FORMAT_NEXT_STEPS,
)
next_steps = format_actions(json.loads(next_steps))
logger.info("Next steps generated")
cleaned_summary = llm_service.call(PROMPT_SYSTEM_CLEANING, raw_summary)
cleaned_summary = llm_service.call(
PROMPT_SYSTEM_CLEANING, raw_summary, name="cleaning"
)
logger.info("Summary cleaned")
summary = tldr + "\n\n" + cleaned_summary + "\n\n" + next_steps
@@ -355,3 +383,6 @@ def summarize_transcription(self, transcript: str, email: str, sub: str, title:
logger.info("Webhook submitted successfully. Status: %s", response.status_code)
logger.debug("Response body: %s", response.text)
llm_observability.flush()
logger.debug("LLM observability flushed")

View File

@@ -83,6 +83,7 @@ class Settings(BaseSettings):
langfuse_host: Optional[str] = None
langfuse_public_key: Optional[str] = None
langfuse_secret_key: Optional[SecretStr] = None
langfuse_environment: Optional[str] = "development"
# TaskTracker
task_tracker_redis_url: str = "redis://redis/0"

View File

@@ -1,15 +1,95 @@
"""LLM service to encapsulate LLM's calls."""
import logging
from typing import Any, Mapping, Optional
import openai
from langfuse import Langfuse
from summary.core.config import get_settings
settings = get_settings()
logger = logging.getLogger(__name__)
class LLMObservability:
"""Manage observability and tracing for LLM calls using Langfuse.
Handles the initialization and configuration of the Langfuse client with
per-user masking rules to enforce privacy controls based on tracing consent.
Also provides an OpenAI client wrapper that integrates with Langfuse tracing
when observability is enabled.
"""
def __init__(
self,
logger,
session_id: str,
user_id: str,
user_has_tracing_consent: bool = False,
):
"""Initialize the LLMObservability client."""
self._logger = logger
self._observability_client: Optional[Langfuse] = None
self.session_id = session_id
self.user_id = user_id
if settings.langfuse_enabled:
def masking_function(data, **kwargs):
if (
user_has_tracing_consent
or settings.langfuse_environment != "production"
):
return data
return "[REDACTED]"
if not settings.langfuse_secret_key:
raise ValueError(
"langfuse_secret_key is not configured. "
"Please set the secret key or disable Langfuse."
)
self._observability_client = Langfuse(
secret_key=settings.langfuse_secret_key.get_secret_value(),
public_key=settings.langfuse_public_key,
host=settings.langfuse_host,
environment=settings.langfuse_environment,
mask=masking_function,
)
@property
def is_enabled(self):
"""Check if observability is enabled."""
return self._observability_client is not None
def get_openai_client(self):
"""Get an OpenAI client configured for observability.
Returns a regular OpenAI client if observability is disabled, or a
Langfuse-wrapped OpenAI client that automatically traces all API calls
to Langfuse for observability when enabled.
"""
base_args = {
"base_url": settings.llm_base_url,
"api_key": settings.llm_api_key.get_secret_value(),
}
if not self.is_enabled:
self._logger.debug("Using regular OpenAI client (observability disabled)")
return openai.OpenAI(**base_args)
# Langfuse's OpenAI wrapper is imported here to avoid triggering client
# init at module load, which would log a warning if LANGFUSE_PUBLIC_KEY
# is missing. Conditional import ensures Langfuse only initializes when enabled.
from langfuse.openai import openai as langfuse_openai # noqa: PLC0415
self._logger.debug("Using LangfuseOpenAI client (observability enabled)")
return langfuse_openai.OpenAI(**base_args)
def flush(self):
"""Flush pending observability traces to Langfuse."""
if self.is_enabled:
self._observability_client.flush()
class LLMException(Exception):
@@ -19,17 +99,17 @@ class LLMException(Exception):
class LLMService:
"""Service for performing calls to the LLM configured in the settings."""
def __init__(self):
def __init__(self, llm_observability, logger):
"""Init the LLMService once."""
self._client = openai.OpenAI(
base_url=settings.llm_base_url,
api_key=settings.llm_api_key.get_secret_value(),
)
self._client = llm_observability.get_openai_client()
self._observability = llm_observability
self._logger = logger
def call(
self,
system_prompt: str,
user_prompt: str,
name: str,
response_format: Optional[Mapping[str, Any]] = None,
):
"""Call the LLM service.
@@ -48,10 +128,17 @@ class LLMService:
if response_format is not None:
params["response_format"] = response_format
response = self._client.chat.completions.create(**params)
if self._observability.is_enabled:
params["name"] = name
params["metadata"] = {
"user_id": self._observability.user_id,
"langfuse_tags": ["summary"],
"langfuse_session_id": self._observability.session_id,
}
response = self._client.chat.completions.create(**params)
return response.choices[0].message.content
except Exception as e:
logger.exception("LLM call failed: %s", e)
self._logger.exception("LLM call failed: %s", e)
raise LLMException("LLM call failed: {e}") from e