📈(summary) kickstart analytics tracking in summary microservice

Add product analytics to understand summary feature usage and behavior.
Track transcript and task metadata for insights without exposing sensitive
content or speaker data.

The PostHog integration is hacky but functional, and fully optional for
self-hosting. Extensive per-task tracking is not PostHog's typical use
case, but this approach works well for our current needs.
This commit is contained in:
lebaudantoine
2025-07-08 23:13:04 +02:00
committed by aleb_the_flash
parent 21bed40484
commit 6e55013b15
5 changed files with 257 additions and 4 deletions

View File

@@ -173,6 +173,7 @@ summary:
WEBHOOK_URL: https://www.mock-impress.com/webhook/
CELERY_BROKER_URL: redis://default:pass@redis-master:6379/1
CELERY_RESULT_BACKEND: redis://default:pass@redis-master:6379/1
TASK_TRACKER_REDIS_URL: redis://default:pass@redis-master:6379/1
image:
repository: localhost:5001/meet-summary
@@ -206,6 +207,7 @@ celery:
WEBHOOK_URL: https://www.mock-impress.com/webhook/
CELERY_BROKER_URL: redis://default:pass@redis-master:6379/1
CELERY_RESULT_BACKEND: redis://default:pass@redis-master:6379/1
TASK_TRACKER_REDIS_URL: redis://default:pass@redis-master:6379/1
image:
repository: localhost:5001/meet-summary

View File

@@ -1,5 +1,6 @@
"""API routes related to application tasks."""
import time
from typing import Optional
from celery.result import AsyncResult
@@ -33,7 +34,7 @@ async def create_task(request: TaskCreation):
)
else:
task = process_audio_transcribe_summarize_v2.delay(
request.filename, request.email, request.sub
request.filename, request.email, request.sub, time.time()
)
return {"id": task.id, "message": "Task created"}

View File

@@ -0,0 +1,197 @@
"""Analytics classes."""
import json
import time
from collections import Counter
from functools import lru_cache
import redis
from celery.utils.log import get_task_logger
from posthog import Posthog
from summary.core.config import get_settings
logger = get_task_logger(__name__)
settings = get_settings()
class AnalyticsException(Exception):
    """Raised when an analytics operation (e.g. a PostHog capture) fails."""
class Analytics:
    """Thin wrapper around the PostHog client.

    The client is only created when analytics is both enabled and configured;
    otherwise every call is a silent no-op, keeping the feature fully optional
    for self-hosted deployments.
    """

    def __init__(self):
        """Create the PostHog client when analytics is configured and enabled."""
        self._client = None
        if settings.posthog_api_key and settings.posthog_enabled:
            logger.info("Initialize analytics client")
            self._client = Posthog(settings.posthog_api_key, settings.posthog_api_host)

    @property
    def is_disabled(self):
        """Whether analytics is off (no API key, feature disabled, or both)."""
        return not self._client

    def capture(self, event_name, distinct_id, properties=None):
        """Send an event to PostHog; a no-op when analytics is disabled.

        Raises AnalyticsException (chained to the original error) when the
        underlying PostHog call fails.
        """
        if self.is_disabled:
            return
        try:
            self._client.capture(
                event_name, distinct_id=distinct_id, properties=properties
            )
        except Exception as exc:
            raise AnalyticsException("Failed to capture analytics event") from exc
@lru_cache
def get_analytics():
    """Return the process-wide Analytics singleton, built on first call."""
    return Analytics()
class TasksTracker:
    """Tracks task execution metadata and analytics for background tasks.

    Per-task metadata is staged in a Redis hash (one per task id) while the
    task runs, then flushed to PostHog as a single event via ``capture``.
    Every public method is a cheap no-op when analytics is disabled, so the
    feature stays fully optional for self-hosted deployments.
    """

    def __init__(self):
        """Initialize the Redis connection and the analytics client."""
        self._redis = redis.from_url(settings.task_tracker_redis_url)
        self._key_prefix = settings.task_tracker_prefix
        self._analytics = get_analytics()
        # Cache the flag so tracking calls can bail out without attribute chains.
        self._is_disabled = self._analytics.is_disabled

    def _get_redis_key(self, task_id):
        """Generate the Redis key holding this task's metadata hash."""
        return f"{self._key_prefix}{task_id}"

    def _save_metadata(self, task_id, metadata):
        """Merge metadata fields into the task's Redis hash (HSET merges)."""
        self._redis.hset(self._get_redis_key(task_id), mapping=metadata)

    def _get_metadata(self, task_id):
        """Load the task's metadata hash, decoding bytes keys/values to str."""
        raw_metadata = self._redis.hgetall(self._get_redis_key(task_id))
        return {k.decode("utf-8"): v.decode("utf-8") for k, v in raw_metadata.items()}

    def has_task_id(self, task_id):
        """Check if task_id exists in tasks metadata cache."""
        return self._redis.exists(self._get_redis_key(task_id))

    def create(self, task_id, task_args):
        """Create the initial metadata entry for a new task.

        Expects ``task_args`` to be ``(filename, email, sub, received_at)``;
        any other shape is logged and ignored. ``sub`` is deliberately not
        stored — no speaker/subject identifiers are tracked.
        """
        if self._is_disabled or self.has_task_id(task_id):
            return

        _required_args_count = 4
        if len(task_args) != _required_args_count:
            logger.error("Invalid number of arguments.")
            return

        filename, email, _, received_at = task_args
        start_time = time.time()
        self._save_metadata(
            task_id,
            {
                "start_time": start_time,
                "asr_model": settings.openai_asr_model,
                "retries": 0,
                "filename": filename,
                "email": email,
                # Time spent waiting in the broker queue before a worker picked it up.
                "queuing_time": round(start_time - received_at, 2),
            },
        )

    def retry(self, task_id):
        """Increment retry counter for a task."""
        if self._is_disabled or not self.has_task_id(task_id):
            return
        # HINCRBY is atomic (safe against concurrent signal handlers) and
        # treats a missing field as 0, matching the previous fallback to 1.
        self._redis.hincrby(self._get_redis_key(task_id), "retries", 1)

    def clear(self, task_id):
        """Remove task metadata from cache."""
        if self._is_disabled or not self.has_task_id(task_id):
            return
        self._redis.delete(self._get_redis_key(task_id))

    def track(self, task_id, data):
        """Merge additional fields into the task's metadata."""
        if self._is_disabled or not self.has_task_id(task_id):
            return
        # Guard: redis-py raises DataError on an empty mapping.
        if not data:
            return
        # HSET merges fields into the existing hash, so there is no need to
        # read-modify-write the whole metadata dict.
        self._save_metadata(task_id, data)

    def track_transcription_metadata(self, task_id, transcription):
        """Extract and track aggregate metadata from transcription results.

        Only aggregate counts and percentages are stored — never transcript
        text or speaker identities.
        """
        if self._is_disabled or not self.has_task_id(task_id):
            return

        if not transcription or not transcription.segments:
            # Key unified with the non-empty branch (was "number_speakers",
            # which split the metric into two properties in analytics).
            self.track(task_id, {"transcription_empty": "true", "speakers_count": 0})
            return

        segments = transcription.segments
        segments_count = len(segments)
        speaker_counts = Counter(
            segment.get("speaker", "UNKNOWN_SPEAKER") for segment in segments
        )
        speaker_percentages = {
            speaker: round((count / segments_count) * 100, 2)
            for speaker, count in speaker_counts.items()
        }
        text_length = sum(len(segment.get("text", "")) for segment in segments)

        self.track(
            task_id,
            {
                "transcription_empty": "false",
                "speakers_count": len(speaker_counts),
                "segments_count": segments_count,
                "speakers_distribution": json.dumps(speaker_percentages),
                "text_length": text_length,
            },
        )

    def capture(self, task_id, event_name):
        """Capture an analytics event with the task's metadata, then clean up."""
        if self._is_disabled or not self.has_task_id(task_id):
            return
        metadata = self._get_metadata(task_id)
        if "start_time" in metadata:
            # Replace the raw timestamp with the derived duration.
            metadata["execution_time"] = str(
                round(time.time() - float(metadata["start_time"]), 2)
            )
            del metadata["start_time"]
        metadata["task_id"] = task_id
        # Drop the Redis entry before sending so a failed capture cannot be
        # re-sent later with stale timing.
        self.clear(task_id)
        try:
            self._analytics.capture(event_name, metadata.get("email"), metadata)
        except AnalyticsException:
            logger.exception("Failed to capture analytics event")

View File

@@ -3,6 +3,7 @@
import json
import os
import tempfile
import time
from pathlib import Path
import openai
@@ -10,14 +11,19 @@ import sentry_sdk
from celery import Celery, signals
from celery.utils.log import get_task_logger
from minio import Minio
from mutagen import File
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util import Retry
from summary.core.analytics import TasksTracker, get_analytics
from summary.core.config import get_settings
from summary.core.prompt import get_instructions
settings = get_settings()
analytics = get_analytics()
tasks_tracker = TasksTracker()
logger = get_task_logger(__name__)
@@ -120,6 +126,25 @@ def post_with_retries(url, data):
session.close()
@signals.task_prerun.connect
def task_started(task_id=None, task=None, args=None, **kwargs):
    """Open a tracking entry for the task just before it starts executing."""
    # ``args`` may be None depending on how the task was dispatched.
    tasks_tracker.create(task_id, args or [])
@signals.task_retry.connect
def task_retry_handler(request=None, reason=None, einfo=None, **kwargs):
    """Bump the tracked retry counter each time Celery retries a task."""
    tasks_tracker.retry(request.id)
@signals.task_failure.connect
def task_failure_handler(task_id=None, exception=None, **kwargs):
    """Flush tracked metadata as a ``task_failed`` event on permanent failure.

    ``task_id`` now defaults to None like every other signal kwarg here —
    Celery dispatches signal arguments by keyword, and the sibling handlers
    (``task_started``, ``task_retry_handler``) already follow this pattern.
    """
    tasks_tracker.capture(task_id, "task_failed")
@celery.task(max_retries=settings.celery_max_retries)
def process_audio_transcribe_summarize(filename: str, email: str, sub: str):
"""Process an audio file by transcribing it and generating a summary.
@@ -194,8 +219,10 @@ def process_audio_transcribe_summarize(filename: str, email: str, sub: str):
logger.debug("Response body: %s", response.text)
@celery.task(max_retries=settings.celery_max_retries)
def process_audio_transcribe_summarize_v2(filename: str, email: str, sub: str):
@celery.task(bind=True, max_retries=settings.celery_max_retries)
def process_audio_transcribe_summarize_v2(
self, filename: str, email: str, sub: str, received_at: float
):
"""Process an audio file by transcribing it and generating a summary.
This Celery task performs the following operations:
@@ -207,6 +234,8 @@ def process_audio_transcribe_summarize_v2(filename: str, email: str, sub: str):
logger.info("Notification received")
logger.debug("filename: %s", filename)
task_id = self.request.id
minio_client = Minio(
settings.aws_s3_endpoint_url,
access_key=settings.aws_s3_access_key_id,
@@ -223,6 +252,9 @@ def process_audio_transcribe_summarize_v2(filename: str, email: str, sub: str):
temp_file_path = save_audio_stream(audio_file_stream)
logger.debug("Recording successfully downloaded, filepath: %s", temp_file_path)
audio_file = File(temp_file_path)
tasks_tracker.track(task_id, {"audio_length": audio_file.info.length})
logger.debug("Initiating OpenAI client")
openai_client = openai.OpenAI(
api_key=settings.openai_api_key, base_url=settings.openai_base_url
@@ -230,11 +262,19 @@ def process_audio_transcribe_summarize_v2(filename: str, email: str, sub: str):
try:
logger.debug("Querying transcription …")
transcription_start_time = time.time()
with open(temp_file_path, "rb") as audio_file:
transcription = openai_client.audio.transcriptions.create(
model=settings.openai_asr_model, file=audio_file
)
tasks_tracker.track(
task_id,
{
"transcription_time": round(
time.time() - transcription_start_time, 2
)
},
)
logger.debug("Transcription: \n %s", transcription)
finally:
if os.path.exists(temp_file_path):
@@ -247,6 +287,8 @@ def process_audio_transcribe_summarize_v2(filename: str, email: str, sub: str):
else format_segments(transcription)
)
tasks_tracker.track_transcription_metadata(task_id, transcription)
data = {
"title": "Transcription",
"content": formatted_transcription,
@@ -262,4 +304,6 @@ def process_audio_transcribe_summarize_v2(filename: str, email: str, sub: str):
logger.info("Webhook submitted successfully. Status: %s", response.status_code)
logger.debug("Response body: %s", response.text)
tasks_tracker.capture(task_id, "task_succeeded")
# TODO - integrate summarize the transcript and create a new document.

View File

@@ -45,6 +45,15 @@ class Settings(BaseSettings):
sentry_is_enabled: bool = False
sentry_dsn: Optional[str] = None
# Posthog (analytics)
posthog_enabled: bool = False
posthog_api_key: Optional[str] = None
posthog_api_host: Optional[str] = "https://eu.i.posthog.com"
# TaskTracker
task_tracker_redis_url: str = "redis://redis/0"
task_tracker_prefix: str = "task_metadata:"
@lru_cache
def get_settings():