From aff87d49532108e04a8d20d08691cb36f6d6e3c3 Mon Sep 17 00:00:00 2001 From: lebaudantoine Date: Thu, 11 Dec 2025 22:51:32 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8(summary)=20add=20Langfuse=20observabi?= =?UTF-8?q?lity=20for=20LLM=20API=20calls?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement Langfuse tracing integration for LLM service calls to capture prompts, responses, latency, token usage, and errors, enabling comprehensive monitoring and debugging of AI model interactions for performance analysis and cost optimization. --- CHANGELOG.md | 1 + src/summary/summary/core/celery_worker.py | 51 ++++++++--- src/summary/summary/core/config.py | 1 + src/summary/summary/core/llm_service.py | 105 ++++++++++++++++++++-- 4 files changed, 139 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b49c15d5..309d56a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to ### Added - ✨(backend) enable user creation via email for external integrations +- ✨(summary) add Langfuse observability for LLM API calls ## [1.0.1] - 2025-12-17 diff --git a/src/summary/summary/core/celery_worker.py b/src/summary/summary/core/celery_worker.py index cd7a0619..e12110fa 100644 --- a/src/summary/summary/core/celery_worker.py +++ b/src/summary/summary/core/celery_worker.py @@ -21,7 +21,7 @@ from urllib3.util import Retry from summary.core.analytics import MetadataManager, get_analytics from summary.core.config import get_settings -from summary.core.llm_service import LLMException, LLMService +from summary.core.llm_service import LLMException, LLMObservability, LLMService from summary.core.prompt import ( FORMAT_NEXT_STEPS, FORMAT_PLAN, @@ -41,6 +41,7 @@ metadata_manager = MetadataManager() logger = get_task_logger(__name__) + celery = Celery( __name__, broker=settings.celery_broker_url, @@ -259,7 +260,7 @@ def process_audio_transcribe_summarize_v2( ): logger.info("Queuing summary generation task.") summarize_transcription.apply_async( - args=[content, email, sub, title], + args=[owner_id, content, email, sub, title], queue=settings.summarize_queue, ) else: @@ -291,7 +292,9 @@ def task_failure_handler(task_id, exception=None, **kwargs): max_retries=settings.celery_max_retries, queue=settings.summarize_queue, ) -def summarize_transcription(self, transcript: str, email: str, sub: str, title: str): +def summarize_transcription( + self, owner_id: str, transcript: str, email: str, sub: str, title: str +): """Generate a summary from the provided transcription text. This Celery task performs the following operations: @@ -301,16 +304,34 @@ def summarize_transcription(self, transcript: str, email: str, sub: str, title: 4. Generates next steps. 5. Sends the final summary via webhook. """ - logger.info("Starting summarization task") + logger.info( + "Starting summarization task | Owner: %s", + owner_id, + ) - llm_service = LLMService() + user_has_tracing_consent = analytics.is_feature_enabled( + "summary-tracing-consent", distinct_id=owner_id + ) - tldr = llm_service.call(PROMPT_SYSTEM_TLDR, transcript) + # NOTE: We must instantiate a new LLMObservability client for each task invocation + # because the masking function needs to be user-specific. The masking function is + # baked into the Langfuse client at initialization time, so we can't reuse + # a singleton client. This is a performance trade-off we accept to ensure per-user + # privacy controls in observability traces. + llm_observability = LLMObservability( + logger=logger, + user_has_tracing_consent=user_has_tracing_consent, + session_id=self.request.id, + user_id=owner_id, + ) + llm_service = LLMService(llm_observability=llm_observability, logger=logger) + + tldr = llm_service.call(PROMPT_SYSTEM_TLDR, transcript, name="tldr") logger.info("TLDR generated") parts = llm_service.call( - PROMPT_SYSTEM_PLAN, transcript, response_format=FORMAT_PLAN + PROMPT_SYSTEM_PLAN, transcript, name="parts", response_format=FORMAT_PLAN ) logger.info("Plan generated") @@ -321,21 +342,28 @@ def summarize_transcription(self, transcript: str, email: str, sub: str, title: for part in parts: prompt_user_part = PROMPT_USER_PART.format(part=part, transcript=transcript) logger.info("Summarizing part: %s", part) - parts_summarized.append(llm_service.call(PROMPT_SYSTEM_PART, prompt_user_part)) + parts_summarized.append( + llm_service.call(PROMPT_SYSTEM_PART, prompt_user_part, name="part") + ) logger.info("Parts summarized") raw_summary = "\n\n".join(parts_summarized) next_steps = llm_service.call( - PROMPT_SYSTEM_NEXT_STEP, transcript, response_format=FORMAT_NEXT_STEPS + PROMPT_SYSTEM_NEXT_STEP, + transcript, + name="next-steps", + response_format=FORMAT_NEXT_STEPS, ) next_steps = format_actions(json.loads(next_steps)) logger.info("Next steps generated") - cleaned_summary = llm_service.call(PROMPT_SYSTEM_CLEANING, raw_summary) + cleaned_summary = llm_service.call( + PROMPT_SYSTEM_CLEANING, raw_summary, name="cleaning" + ) logger.info("Summary cleaned") summary = tldr + "\n\n" + cleaned_summary + "\n\n" + next_steps @@ -355,3 +383,6 @@ def summarize_transcription(self, transcript: str, email: str, sub: str, title: logger.info("Webhook submitted successfully. Status: %s", response.status_code) logger.debug("Response body: %s", response.text) + + llm_observability.flush() + logger.debug("LLM observability flushed") diff --git a/src/summary/summary/core/config.py b/src/summary/summary/core/config.py index f7879606..b5b057d6 100644 --- a/src/summary/summary/core/config.py +++ b/src/summary/summary/core/config.py @@ -83,6 +83,7 @@ class Settings(BaseSettings): langfuse_host: Optional[str] = None langfuse_public_key: Optional[str] = None langfuse_secret_key: Optional[SecretStr] = None + langfuse_environment: Optional[str] = "development" # TaskTracker task_tracker_redis_url: str = "redis://redis/0" diff --git a/src/summary/summary/core/llm_service.py b/src/summary/summary/core/llm_service.py index 872ecf2e..a3b04f54 100644 --- a/src/summary/summary/core/llm_service.py +++ b/src/summary/summary/core/llm_service.py @@ -1,15 +1,95 @@ """LLM service to encapsulate LLM's calls.""" -import logging from typing import Any, Mapping, Optional import openai +from langfuse import Langfuse from summary.core.config import get_settings settings = get_settings() -logger = logging.getLogger(__name__) + +class LLMObservability: + """Manage observability and tracing for LLM calls using Langfuse. + + Handles the initialization and configuration of the Langfuse client with + per-user masking rules to enforce privacy controls based on tracing consent. + Also provides an OpenAI client wrapper that integrates with Langfuse tracing + when observability is enabled. + """ + + def __init__( + self, + logger, + session_id: str, + user_id: str, + user_has_tracing_consent: bool = False, + ): + """Initialize the LLMObservability client.""" + self._logger = logger + self._observability_client: Optional[Langfuse] = None + self.session_id = session_id + self.user_id = user_id + + if settings.langfuse_enabled: + + def masking_function(data, **kwargs): + if ( + user_has_tracing_consent + or settings.langfuse_environment != "production" + ): + return data + + return "[REDACTED]" + + if not settings.langfuse_secret_key: + raise ValueError( + "langfuse_secret_key is not configured. " + "Please set the secret key or disable Langfuse." + ) + + self._observability_client = Langfuse( + secret_key=settings.langfuse_secret_key.get_secret_value(), + public_key=settings.langfuse_public_key, + host=settings.langfuse_host, + environment=settings.langfuse_environment, + mask=masking_function, + ) + + @property + def is_enabled(self): + """Check if observability is enabled.""" + return self._observability_client is not None + + def get_openai_client(self): + """Get an OpenAI client configured for observability. + + Returns a regular OpenAI client if observability is disabled, or a + Langfuse-wrapped OpenAI client that automatically traces all API calls + to Langfuse for observability when enabled. + """ + base_args = { + "base_url": settings.llm_base_url, + "api_key": settings.llm_api_key.get_secret_value(), + } + + if not self.is_enabled: + self._logger.debug("Using regular OpenAI client (observability disabled)") + return openai.OpenAI(**base_args) + + # Langfuse's OpenAI wrapper is imported here to avoid triggering client + # init at module load, which would log a warning if LANGFUSE_PUBLIC_KEY + # is missing. Conditional import ensures Langfuse only initializes when enabled. + from langfuse.openai import openai as langfuse_openai # noqa: PLC0415 + + self._logger.debug("Using LangfuseOpenAI client (observability enabled)") + return langfuse_openai.OpenAI(**base_args) + + def flush(self): + """Flush pending observability traces to Langfuse.""" + if self.is_enabled: + self._observability_client.flush() class LLMException(Exception): @@ -19,17 +99,17 @@ class LLMException(Exception): class LLMService: """Service for performing calls to the LLM configured in the settings.""" - def __init__(self): + def __init__(self, llm_observability, logger): """Init the LLMService once.""" - self._client = openai.OpenAI( - base_url=settings.llm_base_url, - api_key=settings.llm_api_key.get_secret_value(), - ) + self._client = llm_observability.get_openai_client() + self._observability = llm_observability + self._logger = logger def call( self, system_prompt: str, user_prompt: str, + name: str, response_format: Optional[Mapping[str, Any]] = None, ): """Call the LLM service. @@ -48,10 +128,17 @@ class LLMService: if response_format is not None: params["response_format"] = response_format - response = self._client.chat.completions.create(**params) + if self._observability.is_enabled: + params["name"] = name + params["metadata"] = { + "user_id": self._observability.user_id, + "langfuse_tags": ["summary"], + "langfuse_session_id": self._observability.session_id, + } + response = self._client.chat.completions.create(**params) return response.choices[0].message.content except Exception as e: - logger.exception("LLM call failed: %s", e) + self._logger.exception("LLM call failed: %s", e) raise LLMException("LLM call failed: {e}") from e