diff --git a/src/summary/summary/core/analytics.py b/src/summary/summary/core/analytics.py index e945e8cc..dce046d4 100644 --- a/src/summary/summary/core/analytics.py +++ b/src/summary/summary/core/analytics.py @@ -55,8 +55,8 @@ def get_analytics(): return Analytics() -class TasksTracker: - """Tracks task execution metadata and analytics for background tasks.""" +class MetadataManager: + """A Redis-based metadata manager for storing and retrieving task metadata.""" def __init__(self): """Initialize the task tracker with analytics client.""" diff --git a/src/summary/summary/core/celery_worker.py b/src/summary/summary/core/celery_worker.py index 2c18e721..a464a352 100644 --- a/src/summary/summary/core/celery_worker.py +++ b/src/summary/summary/core/celery_worker.py @@ -16,14 +16,14 @@ from requests import Session, exceptions from requests.adapters import HTTPAdapter from urllib3.util import Retry -from summary.core.analytics import TasksTracker, get_analytics +from summary.core.analytics import MetadataManager, get_analytics from summary.core.config import get_settings from summary.core.prompt import get_instructions settings = get_settings() analytics = get_analytics() -tasks_tracker = TasksTracker() +metadata_manager = MetadataManager() logger = get_task_logger(__name__) @@ -136,19 +136,19 @@ def post_with_retries(url, data): def task_started(task_id=None, task=None, args=None, **kwargs): """Signal handler called before task execution begins.""" task_args = args or [] - tasks_tracker.create(task_id, task_args) + metadata_manager.create(task_id, task_args) @signals.task_retry.connect def task_retry_handler(request=None, reason=None, einfo=None, **kwargs): """Signal handler called when task execution retries.""" - tasks_tracker.retry(request.id) + metadata_manager.retry(request.id) @signals.task_failure.connect def task_failure_handler(task_id, exception=None, **kwargs): """Signal handler called when task execution fails permanently.""" - tasks_tracker.capture(task_id, settings.posthog_event_failure) + metadata_manager.capture(task_id, settings.posthog_event_failure) @celery.task(max_retries=settings.celery_max_retries) @@ -267,7 +267,7 @@ def process_audio_transcribe_summarize_v2( logger.debug("Recording filepath: %s", temp_file_path) audio_file = File(temp_file_path) - tasks_tracker.track(task_id, {"audio_length": audio_file.info.length}) + metadata_manager.track(task_id, {"audio_length": audio_file.info.length}) if audio_file.info.length > settings.recording_max_duration: error_msg = "Recording too long: %.2fs > %.2fs limit" % ( @@ -291,7 +291,7 @@ def process_audio_transcribe_summarize_v2( transcription = openai_client.audio.transcriptions.create( model=settings.openai_asr_model, file=audio_file ) - tasks_tracker.track( + metadata_manager.track( task_id, { "transcription_time": round( @@ -312,7 +312,7 @@ def process_audio_transcribe_summarize_v2( else format_segments(transcription) ) - tasks_tracker.track_transcription_metadata(task_id, transcription) + metadata_manager.track_transcription_metadata(task_id, transcription) data = { "title": "Transcription", @@ -329,6 +329,6 @@ def process_audio_transcribe_summarize_v2( logger.info("Webhook submitted successfully. Status: %s", response.status_code) logger.debug("Response body: %s", response.text) - tasks_tracker.capture(task_id, settings.posthog_event_success) + metadata_manager.capture(task_id, settings.posthog_event_success) # TODO - integrate summarize the transcript and create a new document.