From 6e55013b15c454b374f88fd0195d330b3fb710e5 Mon Sep 17 00:00:00 2001 From: lebaudantoine Date: Tue, 8 Jul 2025 23:13:04 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=93=88(summary)=20kickstart=20analytics?= =?UTF-8?q?=20tracking=20in=20summary=20microservice?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add product analytics to understand summary feature usage and behavior. Track transcript and task metadata for insights without exposing sensitive content or speaker data. Hacky but functional PostHog usage - fully optional for self-hosting. Extensive tracking approach works for current needs despite not being PostHog's typical use case. --- .../dev-keycloak/values.meet.yaml.gotmpl | 2 + src/summary/summary/api/route/tasks.py | 3 +- src/summary/summary/core/analytics.py | 197 ++++++++++++++++++ src/summary/summary/core/celery_worker.py | 50 ++++- src/summary/summary/core/config.py | 9 + 5 files changed, 257 insertions(+), 4 deletions(-) create mode 100644 src/summary/summary/core/analytics.py diff --git a/src/helm/env.d/dev-keycloak/values.meet.yaml.gotmpl b/src/helm/env.d/dev-keycloak/values.meet.yaml.gotmpl index 42dc0ff0..2b0ec523 100644 --- a/src/helm/env.d/dev-keycloak/values.meet.yaml.gotmpl +++ b/src/helm/env.d/dev-keycloak/values.meet.yaml.gotmpl @@ -173,6 +173,7 @@ summary: WEBHOOK_URL: https://www.mock-impress.com/webhook/ CELERY_BROKER_URL: redis://default:pass@redis-master:6379/1 CELERY_RESULT_BACKEND: redis://default:pass@redis-master:6379/1 + TASK_TRACKER_REDIS_URL: redis://default:pass@redis-master:6379/1 image: repository: localhost:5001/meet-summary @@ -206,6 +207,7 @@ celery: WEBHOOK_URL: https://www.mock-impress.com/webhook/ CELERY_BROKER_URL: redis://default:pass@redis-master:6379/1 CELERY_RESULT_BACKEND: redis://default:pass@redis-master:6379/1 + TASK_TRACKER_REDIS_URL: redis://default:pass@redis-master:6379/1 image: repository: localhost:5001/meet-summary diff --git 
a/src/summary/summary/api/route/tasks.py b/src/summary/summary/api/route/tasks.py index 0c5bc3b7..56ba8608 100644 --- a/src/summary/summary/api/route/tasks.py +++ b/src/summary/summary/api/route/tasks.py @@ -1,5 +1,6 @@ """API routes related to application tasks.""" +import time from typing import Optional from celery.result import AsyncResult @@ -33,7 +34,7 @@ async def create_task(request: TaskCreation): ) else: task = process_audio_transcribe_summarize_v2.delay( - request.filename, request.email, request.sub + request.filename, request.email, request.sub, time.time() ) return {"id": task.id, "message": "Task created"} diff --git a/src/summary/summary/core/analytics.py b/src/summary/summary/core/analytics.py new file mode 100644 index 00000000..8c904fee --- /dev/null +++ b/src/summary/summary/core/analytics.py @@ -0,0 +1,197 @@ +"""Analytics classes.""" + +import json +import time +from collections import Counter +from functools import lru_cache + +import redis +from celery.utils.log import get_task_logger +from posthog import Posthog + +from summary.core.config import get_settings + +logger = get_task_logger(__name__) +settings = get_settings() + + +class AnalyticsException(Exception): + """Exception raised when analytics operations fail.""" + + pass + + +class Analytics: + """Analytics client wrapper for PostHog integration.""" + + def __init__(self): + """Initialize a client if settings are configured.""" + self._client = None + if settings.posthog_api_key and settings.posthog_enabled: + logger.info("Initialize analytics client") + self._client = Posthog(settings.posthog_api_key, settings.posthog_api_host) + + @property + def is_disabled(self): + """Check if analytics client is disabled or not configured.""" + return not self._client + + def capture(self, event_name, distinct_id, properties=None): + """Track an event if analytics is enabled.""" + if self.is_disabled: + return + + try: + self._client.capture( + event_name, distinct_id=distinct_id, 
properties=properties + ) + except Exception as e: + raise AnalyticsException("Failed to capture analytics event") from e + + +@lru_cache +def get_analytics(): + """Init Analytics client once.""" + return Analytics() + + +class TasksTracker: + """Tracks task execution metadata and analytics for background tasks.""" + + def __init__(self): + """Initialize the task tracker with analytics client.""" + self._redis = redis.from_url(settings.task_tracker_redis_url) + self._key_prefix = settings.task_tracker_prefix + self._analytics = get_analytics() + self._is_disabled = self._analytics.is_disabled + + def _get_redis_key(self, task_id): + """Generate Redis key for task metadata.""" + return f"{self._key_prefix}{task_id}" + + def _save_metadata(self, task_id, metadata): + """Persist task metadata as a Redis hash.""" + self._redis.hset(self._get_redis_key(task_id), mapping=metadata) + + def _get_metadata(self, task_id): + """Load task metadata from Redis, decoding bytes to str.""" + raw_metadata = self._redis.hgetall(self._get_redis_key(task_id)) + return {k.decode("utf-8"): v.decode("utf-8") for k, v in raw_metadata.items()} + + def has_task_id(self, task_id): + """Check if task_id exists in tasks metadata cache.""" + return self._redis.exists(self._get_redis_key(task_id)) + + def create(self, task_id, task_args): + """Create initial metadata entry for a new task.""" + if self._is_disabled or self.has_task_id(task_id): + return + + initial_metadata = { + "start_time": time.time(), + "asr_model": settings.openai_asr_model, + "retries": 0, + } + + _required_args_count = 4 + if len(task_args) != _required_args_count: + logger.error("Invalid number of arguments.") + return + + filename, email, _, received_at = task_args + initial_metadata = { + **initial_metadata, + "filename": filename, + "email": email, + "queuing_time": round(initial_metadata["start_time"] - received_at, 2), + } + + self._save_metadata(task_id, initial_metadata) + + def retry(self, task_id): + """Increment retry counter for a task.""" + if self._is_disabled or not 
self.has_task_id(task_id): + return + + metadata = self._get_metadata(task_id) + + if "retries" in metadata: + metadata["retries"] = int(metadata["retries"]) + 1 + else: + metadata["retries"] = 1 + + self._save_metadata(task_id, metadata) + + def clear(self, task_id): + """Remove task metadata from cache.""" + if self._is_disabled or not self.has_task_id(task_id): + return + self._redis.delete(self._get_redis_key(task_id)) + + def track(self, task_id, data): + """Update task metadata with additional data.""" + if self._is_disabled or not self.has_task_id(task_id): + return + + metadata = self._get_metadata(task_id) + self._save_metadata(task_id, {**metadata, **data}) + + def track_transcription_metadata(self, task_id, transcription): + """Extract and track metadata from transcription results.""" + if self._is_disabled or not self.has_task_id(task_id): + return + + if not transcription or not transcription.segments: + self.track(task_id, {"transcription_empty": "true", "number_speakers": 0}) + return + + speakers = [ + segment.get("speaker", "UNKNOWN_SPEAKER") + for segment in transcription.segments + ] + + speaker_counts = Counter(speakers) + segments_count = len(transcription.segments) + + speaker_percentages = { + speaker: round((count / segments_count) * 100, 2) + for speaker, count in speaker_counts.items() + } + + text_length = 0 + + for segment in transcription.segments: + text_length += len(segment.get("text", "")) + + self.track( + task_id, + { + "transcription_empty": "false", + "speakers_count": len(set(speakers)), + "segments_count": segments_count, + "speakers_distribution": json.dumps(speaker_percentages), + "text_length": text_length, + }, + ) + + def capture(self, task_id, event_name): + """Capture analytics event with task metadata and clean up.""" + if self._is_disabled or not self.has_task_id(task_id): + return + + metadata = self._get_metadata(task_id) + + if "start_time" in metadata: + metadata["execution_time"] = str(round( + time.time() - 
float(metadata["start_time"]), 2 + )) + del metadata["start_time"] + + metadata["task_id"] = task_id + + self.clear(task_id) + + try: + self._analytics.capture(event_name, metadata.get("email"), metadata) + except AnalyticsException: + logger.exception("Failed to capture analytics event") diff --git a/src/summary/summary/core/celery_worker.py b/src/summary/summary/core/celery_worker.py index e0b08a3e..b77931d2 100644 --- a/src/summary/summary/core/celery_worker.py +++ b/src/summary/summary/core/celery_worker.py @@ -3,6 +3,7 @@ import json import os import tempfile +import time from pathlib import Path import openai @@ -10,14 +11,19 @@ import sentry_sdk from celery import Celery, signals from celery.utils.log import get_task_logger from minio import Minio +from mutagen import File from requests import Session from requests.adapters import HTTPAdapter from urllib3.util import Retry +from summary.core.analytics import TasksTracker, get_analytics from summary.core.config import get_settings from summary.core.prompt import get_instructions settings = get_settings() +analytics = get_analytics() + +tasks_tracker = TasksTracker() logger = get_task_logger(__name__) @@ -120,6 +126,25 @@ def post_with_retries(url, data): session.close() +@signals.task_prerun.connect +def task_started(task_id=None, task=None, args=None, **kwargs): + """Signal handler called before task execution begins.""" + task_args = args or [] + tasks_tracker.create(task_id, task_args) + + +@signals.task_retry.connect +def task_retry_handler(request=None, reason=None, einfo=None, **kwargs): + """Signal handler called when task execution retries.""" + tasks_tracker.retry(request.id) + + +@signals.task_failure.connect +def task_failure_handler(task_id, exception=None, **kwargs): + """Signal handler called when task execution fails permanently.""" + tasks_tracker.capture(task_id, "task_failed") + + @celery.task(max_retries=settings.celery_max_retries) def process_audio_transcribe_summarize(filename: str, 
email: str, sub: str): """Process an audio file by transcribing it and generating a summary. @@ -194,8 +219,10 @@ def process_audio_transcribe_summarize(filename: str, email: str, sub: str): logger.debug("Response body: %s", response.text) -@celery.task(max_retries=settings.celery_max_retries) -def process_audio_transcribe_summarize_v2(filename: str, email: str, sub: str): +@celery.task(bind=True, max_retries=settings.celery_max_retries) +def process_audio_transcribe_summarize_v2( + self, filename: str, email: str, sub: str, received_at: float +): """Process an audio file by transcribing it and generating a summary. This Celery task performs the following operations: @@ -207,6 +234,8 @@ def process_audio_transcribe_summarize_v2(filename: str, email: str, sub: str): logger.info("Notification received") logger.debug("filename: %s", filename) + task_id = self.request.id + minio_client = Minio( settings.aws_s3_endpoint_url, access_key=settings.aws_s3_access_key_id, @@ -223,6 +252,9 @@ def process_audio_transcribe_summarize_v2(filename: str, email: str, sub: str): temp_file_path = save_audio_stream(audio_file_stream) logger.debug("Recording successfully downloaded, filepath: %s", temp_file_path) + audio_file = File(temp_file_path) + tasks_tracker.track(task_id, {"audio_length": audio_file.info.length}) + logger.debug("Initiating OpenAI client") openai_client = openai.OpenAI( api_key=settings.openai_api_key, base_url=settings.openai_base_url @@ -230,11 +262,19 @@ def process_audio_transcribe_summarize_v2(filename: str, email: str, sub: str): try: logger.debug("Querying transcription …") + transcription_start_time = time.time() with open(temp_file_path, "rb") as audio_file: transcription = openai_client.audio.transcriptions.create( model=settings.openai_asr_model, file=audio_file ) - + tasks_tracker.track( + task_id, + { + "transcription_time": round( + time.time() - transcription_start_time, 2 + ) + }, + ) logger.debug("Transcription: \n %s", transcription) finally: if 
os.path.exists(temp_file_path): @@ -247,6 +287,8 @@ def process_audio_transcribe_summarize_v2(filename: str, email: str, sub: str): else format_segments(transcription) ) + tasks_tracker.track_transcription_metadata(task_id, transcription) + data = { "title": "Transcription", "content": formatted_transcription, @@ -262,4 +304,6 @@ def process_audio_transcribe_summarize_v2(filename: str, email: str, sub: str): logger.info("Webhook submitted successfully. Status: %s", response.status_code) logger.debug("Response body: %s", response.text) + tasks_tracker.capture(task_id, "task_succeeded") + # TODO - integrate summarize the transcript and create a new document. diff --git a/src/summary/summary/core/config.py b/src/summary/summary/core/config.py index 7ffacd70..ddcdc032 100644 --- a/src/summary/summary/core/config.py +++ b/src/summary/summary/core/config.py @@ -45,6 +45,15 @@ class Settings(BaseSettings): sentry_is_enabled: bool = False sentry_dsn: Optional[str] = None + # Posthog (analytics) + posthog_enabled: bool = False + posthog_api_key: Optional[str] = None + posthog_api_host: Optional[str] = "https://eu.i.posthog.com" + + # TaskTracker + task_tracker_redis_url: str = "redis://redis/0" + task_tracker_prefix: str = "task_metadata:" + @lru_cache def get_settings():