diff --git a/CHANGELOG.md b/CHANGELOG.md index b49c15d5..309d56a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to ### Added - ✨(backend) enable user creation via email for external integrations +- ✨(summary) add Langfuse observability for LLM API calls ## [1.0.1] - 2025-12-17 diff --git a/src/summary/summary/core/celery_worker.py b/src/summary/summary/core/celery_worker.py index cd7a0619..e12110fa 100644 --- a/src/summary/summary/core/celery_worker.py +++ b/src/summary/summary/core/celery_worker.py @@ -21,7 +21,7 @@ from urllib3.util import Retry from summary.core.analytics import MetadataManager, get_analytics from summary.core.config import get_settings -from summary.core.llm_service import LLMException, LLMService +from summary.core.llm_service import LLMException, LLMObservability, LLMService from summary.core.prompt import ( FORMAT_NEXT_STEPS, FORMAT_PLAN, @@ -41,6 +41,7 @@ metadata_manager = MetadataManager() logger = get_task_logger(__name__) + celery = Celery( __name__, broker=settings.celery_broker_url, @@ -259,7 +260,7 @@ def process_audio_transcribe_summarize_v2( ): logger.info("Queuing summary generation task.") summarize_transcription.apply_async( - args=[content, email, sub, title], + args=[owner_id, content, email, sub, title], queue=settings.summarize_queue, ) else: @@ -291,7 +292,9 @@ def task_failure_handler(task_id, exception=None, **kwargs): max_retries=settings.celery_max_retries, queue=settings.summarize_queue, ) -def summarize_transcription(self, transcript: str, email: str, sub: str, title: str): +def summarize_transcription( + self, owner_id: str, transcript: str, email: str, sub: str, title: str +): """Generate a summary from the provided transcription text. This Celery task performs the following operations: @@ -301,16 +304,34 @@ def summarize_transcription(self, transcript: str, email: str, sub: str, title: 4. Generates next steps. 5. Sends the final summary via webhook. 
""" - logger.info("Starting summarization task") + logger.info( + "Starting summarization task | Owner: %s", + owner_id, + ) - llm_service = LLMService() + user_has_tracing_consent = analytics.is_feature_enabled( + "summary-tracing-consent", distinct_id=owner_id + ) - tldr = llm_service.call(PROMPT_SYSTEM_TLDR, transcript) + # NOTE: We must instantiate a new LLMObservability client for each task invocation + # because the masking function needs to be user-specific. The masking function is + # baked into the Langfuse client at initialization time, so we can't reuse + # a singleton client. This is a performance trade-off we accept to ensure per-user + # privacy controls in observability traces. + llm_observability = LLMObservability( + logger=logger, + user_has_tracing_consent=user_has_tracing_consent, + session_id=self.request.id, + user_id=owner_id, + ) + llm_service = LLMService(llm_observability=llm_observability, logger=logger) + + tldr = llm_service.call(PROMPT_SYSTEM_TLDR, transcript, name="tldr") logger.info("TLDR generated") parts = llm_service.call( - PROMPT_SYSTEM_PLAN, transcript, response_format=FORMAT_PLAN + PROMPT_SYSTEM_PLAN, transcript, name="parts", response_format=FORMAT_PLAN ) logger.info("Plan generated") @@ -321,21 +342,28 @@ def summarize_transcription(self, transcript: str, email: str, sub: str, title: for part in parts: prompt_user_part = PROMPT_USER_PART.format(part=part, transcript=transcript) logger.info("Summarizing part: %s", part) - parts_summarized.append(llm_service.call(PROMPT_SYSTEM_PART, prompt_user_part)) + parts_summarized.append( + llm_service.call(PROMPT_SYSTEM_PART, prompt_user_part, name="part") + ) logger.info("Parts summarized") raw_summary = "\n\n".join(parts_summarized) next_steps = llm_service.call( - PROMPT_SYSTEM_NEXT_STEP, transcript, response_format=FORMAT_NEXT_STEPS + PROMPT_SYSTEM_NEXT_STEP, + transcript, + name="next-steps", + response_format=FORMAT_NEXT_STEPS, ) next_steps = format_actions(json.loads(next_steps)) 
logger.info("Next steps generated") - cleaned_summary = llm_service.call(PROMPT_SYSTEM_CLEANING, raw_summary) + cleaned_summary = llm_service.call( + PROMPT_SYSTEM_CLEANING, raw_summary, name="cleaning" + ) logger.info("Summary cleaned") summary = tldr + "\n\n" + cleaned_summary + "\n\n" + next_steps @@ -355,3 +383,6 @@ def summarize_transcription(self, transcript: str, email: str, sub: str, title: logger.info("Webhook submitted successfully. Status: %s", response.status_code) logger.debug("Response body: %s", response.text) + + llm_observability.flush() + logger.debug("LLM observability flushed") diff --git a/src/summary/summary/core/config.py b/src/summary/summary/core/config.py index f7879606..b5b057d6 100644 --- a/src/summary/summary/core/config.py +++ b/src/summary/summary/core/config.py @@ -83,6 +83,7 @@ class Settings(BaseSettings): langfuse_host: Optional[str] = None langfuse_public_key: Optional[str] = None langfuse_secret_key: Optional[SecretStr] = None + langfuse_environment: Optional[str] = "development" # TaskTracker task_tracker_redis_url: str = "redis://redis/0" diff --git a/src/summary/summary/core/llm_service.py b/src/summary/summary/core/llm_service.py index 872ecf2e..a3b04f54 100644 --- a/src/summary/summary/core/llm_service.py +++ b/src/summary/summary/core/llm_service.py @@ -1,15 +1,95 @@ """LLM service to encapsulate LLM's calls.""" -import logging from typing import Any, Mapping, Optional import openai +from langfuse import Langfuse from summary.core.config import get_settings settings = get_settings() -logger = logging.getLogger(__name__) + +class LLMObservability: + """Manage observability and tracing for LLM calls using Langfuse. + + Handles the initialization and configuration of the Langfuse client with + per-user masking rules to enforce privacy controls based on tracing consent. + Also provides an OpenAI client wrapper that integrates with Langfuse tracing + when observability is enabled. 
+ """ + + def __init__( + self, + logger, + session_id: str, + user_id: str, + user_has_tracing_consent: bool = False, + ): + """Initialize the LLMObservability client.""" + self._logger = logger + self._observability_client: Optional[Langfuse] = None + self.session_id = session_id + self.user_id = user_id + + if settings.langfuse_enabled: + + def masking_function(data, **kwargs): + if ( + user_has_tracing_consent + or settings.langfuse_environment != "production" + ): + return data + + return "[REDACTED]" + + if not settings.langfuse_secret_key: + raise ValueError( + "langfuse_secret_key is not configured. " + "Please set the secret key or disable Langfuse." + ) + + self._observability_client = Langfuse( + secret_key=settings.langfuse_secret_key.get_secret_value(), + public_key=settings.langfuse_public_key, + host=settings.langfuse_host, + environment=settings.langfuse_environment, + mask=masking_function, + ) + + @property + def is_enabled(self): + """Check if observability is enabled.""" + return self._observability_client is not None + + def get_openai_client(self): + """Get an OpenAI client configured for observability. + + Returns a regular OpenAI client if observability is disabled, or a + Langfuse-wrapped OpenAI client that automatically traces all API calls + to Langfuse for observability when enabled. + """ + base_args = { + "base_url": settings.llm_base_url, + "api_key": settings.llm_api_key.get_secret_value(), + } + + if not self.is_enabled: + self._logger.debug("Using regular OpenAI client (observability disabled)") + return openai.OpenAI(**base_args) + + # Langfuse's OpenAI wrapper is imported here to avoid triggering client + # init at module load, which would log a warning if LANGFUSE_PUBLIC_KEY + # is missing. Conditional import ensures Langfuse only initializes when enabled. 
+        from langfuse.openai import openai as langfuse_openai  # noqa: PLC0415
+
+        self._logger.debug("Using LangfuseOpenAI client (observability enabled)")
+        return langfuse_openai.OpenAI(**base_args)
+
+    def flush(self):
+        """Flush pending observability traces to Langfuse."""
+        if self.is_enabled:
+            self._observability_client.flush()
 
 
 class LLMException(Exception):
@@ -19,17 +99,17 @@ class LLMException(Exception):
 class LLMService:
     """Service for performing calls to the LLM configured in the settings."""
 
-    def __init__(self):
+    def __init__(self, llm_observability, logger):
         """Init the LLMService once."""
-        self._client = openai.OpenAI(
-            base_url=settings.llm_base_url,
-            api_key=settings.llm_api_key.get_secret_value(),
-        )
+        self._client = llm_observability.get_openai_client()
+        self._observability = llm_observability
+        self._logger = logger
 
     def call(
         self,
         system_prompt: str,
         user_prompt: str,
+        name: str,
         response_format: Optional[Mapping[str, Any]] = None,
     ):
         """Call the LLM service.
@@ -48,10 +128,17 @@ class LLMService:
 
             if response_format is not None:
                 params["response_format"] = response_format
 
-            response = self._client.chat.completions.create(**params)
+            if self._observability.is_enabled:
+                params["name"] = name
+                params["metadata"] = {
+                    "user_id": self._observability.user_id,
+                    "langfuse_tags": ["summary"],
+                    "langfuse_session_id": self._observability.session_id,
+                }
+            response = self._client.chat.completions.create(**params)
             return response.choices[0].message.content
 
         except Exception as e:
-            logger.exception("LLM call failed: %s", e)
-            raise LLMException("LLM call failed: {e}") from e
+            self._logger.exception("LLM call failed: %s", e)
+            raise LLMException(f"LLM call failed: {e}") from e