💩(backend) notify the summary service when a new recording is available
Draft a piece of code to try the feature in staging. I'll consolidate this implementation ASAP, as soon we have a first implementation functional. What's missing? - when owners are multiple - retry when the backend cannot reach the summary service - factorize the key oneliner, duplicated from the egress service - optimize SQL query - unit tests
This commit is contained in:
committed by
aleb_the_flash
parent
300756b323
commit
4fe01ae2bf
@@ -32,6 +32,7 @@ from core.recording.event.exceptions import (
|
||||
InvalidFileTypeError,
|
||||
ParsingEventDataError,
|
||||
)
|
||||
from core.recording.event.notification import notification_service
|
||||
from core.recording.event.parsers import get_parser
|
||||
from core.recording.worker.exceptions import (
|
||||
RecordingStartError,
|
||||
@@ -448,7 +449,17 @@ class RecordingViewSet(
|
||||
" in an error state or has already been saved."
|
||||
)
|
||||
|
||||
recording.status = models.RecordingStatusChoices.SAVED
|
||||
# Attempt to notify external services about the recording
|
||||
# This is a non-blocking operation - failures are logged but don't interrupt the flow
|
||||
notification_succeeded = notification_service.notify_external_services(
|
||||
recording
|
||||
)
|
||||
|
||||
recording.status = (
|
||||
models.RecordingStatusChoices.NOTIFICATION_SUCCEEDED
|
||||
if notification_succeeded
|
||||
else models.RecordingStatusChoices.SAVED
|
||||
)
|
||||
recording.save()
|
||||
|
||||
return drf_response.Response(
|
||||
|
||||
18
src/backend/core/migrations/0009_alter_recording_status.py
Normal file
18
src/backend/core/migrations/0009_alter_recording_status.py
Normal file
@@ -0,0 +1,18 @@
|
||||
# Generated by Django 5.1.3 on 2024-12-02 13:23
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('core', '0008_user_full_name_user_short_name'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AlterField(
|
||||
model_name='recording',
|
||||
name='status',
|
||||
field=models.CharField(choices=[('initiated', 'Initiated'), ('active', 'Active'), ('stopped', 'Stopped'), ('saved', 'Saved'), ('aborted', 'Aborted'), ('failed_to_start', 'Failed to Start'), ('failed_to_stop', 'Failed to Stop'), ('notification_succeeded', 'Notification succeeded')], default='initiated', max_length=50),
|
||||
),
|
||||
]
|
||||
@@ -49,6 +49,7 @@ class RecordingStatusChoices(models.TextChoices):
|
||||
ABORTED = "aborted", _("Aborted")
|
||||
FAILED_TO_START = "failed_to_start", _("Failed to Start")
|
||||
FAILED_TO_STOP = "failed_to_stop", _("Failed to Stop")
|
||||
NOTIFICATION_SUCCEEDED = "notification_succeeded", _("Notification succeeded")
|
||||
|
||||
@classmethod
|
||||
def is_final(cls, status):
|
||||
@@ -462,7 +463,23 @@ class BaseAccess(BaseModel):
|
||||
|
||||
|
||||
class Recording(BaseModel):
|
||||
"""Model for recordings that take place in a room"""
|
||||
"""Model for recordings that take place in a room.
|
||||
|
||||
Recording Status Flow:
|
||||
1. INITIATED: Initial state when recording is requested
|
||||
2. ACTIVE: Recording is currently in progress
|
||||
3. STOPPED: Recording has been stopped by user/system
|
||||
4. SAVED: Recording has been successfully processed and stored
|
||||
4. NOTIFICATION_SUCCEEDED: External service has been notified of this recording
|
||||
|
||||
Error States:
|
||||
- FAILED_TO_START: Worker failed to initialize recording
|
||||
- FAILED_TO_STOP: Worker failed during stop operation
|
||||
- ABORTED: Recording was terminated before completion
|
||||
|
||||
Warning: Worker failures may lead to database inconsistency between the actual
|
||||
recording state and its status in the database.
|
||||
"""
|
||||
|
||||
room = models.ForeignKey(
|
||||
Room,
|
||||
@@ -471,7 +488,7 @@ class Recording(BaseModel):
|
||||
verbose_name=_("Room"),
|
||||
)
|
||||
status = models.CharField(
|
||||
max_length=20,
|
||||
max_length=50,
|
||||
choices=RecordingStatusChoices.choices,
|
||||
default=RecordingStatusChoices.INITIATED,
|
||||
)
|
||||
@@ -543,22 +560,7 @@ class Recording(BaseModel):
|
||||
|
||||
|
||||
class RecordingAccess(BaseAccess):
|
||||
"""Relation model to give access to a recording for a user or a team with a role.
|
||||
|
||||
Recording Status Flow:
|
||||
1. INITIATED: Initial state when recording is requested
|
||||
2. ACTIVE: Recording is currently in progress
|
||||
3. STOPPED: Recording has been stopped by user/system
|
||||
4. SAVED: Recording has been successfully processed and stored
|
||||
|
||||
Error States:
|
||||
- FAILED_TO_START: Worker failed to initialize recording
|
||||
- FAILED_TO_STOP: Worker failed during stop operation
|
||||
- ABORTED: Recording was terminated before completion
|
||||
|
||||
Warning: Worker failures may lead to database inconsistency between the actual
|
||||
recording state and its status in the database.
|
||||
"""
|
||||
"""Relation model to give access to a recording for a user or a team with a role."""
|
||||
|
||||
recording = models.ForeignKey(
|
||||
Recording,
|
||||
|
||||
93
src/backend/core/recording/event/notification.py
Normal file
93
src/backend/core/recording/event/notification.py
Normal file
@@ -0,0 +1,93 @@
|
||||
"""Service to notify external services when a new recording is ready."""
|
||||
|
||||
import logging
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
import requests
|
||||
|
||||
from core import models
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NotificationService:
|
||||
"""Service for processing recordings and notifying external services."""
|
||||
|
||||
def notify_external_services(self, recording):
|
||||
"""Process a recording based on its mode."""
|
||||
|
||||
if recording.mode == models.RecordingModeChoices.TRANSCRIPT:
|
||||
return self._notify_summary_service(recording)
|
||||
|
||||
if recording.mode == models.RecordingModeChoices.SCREEN_RECORDING:
|
||||
logger.warning(
|
||||
"Screen recording mode not implemented for recording %s", recording.id
|
||||
)
|
||||
return False
|
||||
|
||||
logger.error(
|
||||
"Unknown recording mode %s for recording %s",
|
||||
recording.mode,
|
||||
recording.id,
|
||||
)
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def _notify_summary_service(recording):
|
||||
"""Notify summary service about a new recording."""
|
||||
|
||||
if (
|
||||
not settings.SUMMARY_SERVICE_ENDPOINT
|
||||
or not settings.SUMMARY_SERVICE_API_TOKEN
|
||||
):
|
||||
logger.error("Summary service not configured")
|
||||
return False
|
||||
|
||||
owner_access = (
|
||||
models.RecordingAccess.objects.select_related("user")
|
||||
.filter(
|
||||
role=models.RoleChoices.OWNER,
|
||||
recording_id=recording.id,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
|
||||
if not owner_access:
|
||||
logger.error("No owner found for recording %s", recording.id)
|
||||
return False
|
||||
|
||||
key = f"{settings.RECORDING_OUTPUT_FOLDER}/{recording.id}.ogg"
|
||||
|
||||
payload = {
|
||||
"filename": key,
|
||||
"email": owner_access.user.email,
|
||||
"sub": owner_access.user.sub,
|
||||
}
|
||||
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": f"Bearer {settings.SUMMARY_SERVICE_API_TOKEN}",
|
||||
}
|
||||
|
||||
try:
|
||||
response = requests.post(
|
||||
settings.SUMMARY_SERVICE_ENDPOINT,
|
||||
json=payload,
|
||||
headers=headers,
|
||||
timeout=30,
|
||||
)
|
||||
response.raise_for_status()
|
||||
except requests.HTTPError as exc:
|
||||
logger.exception(
|
||||
"Summary service HTTP error for recording %s. URL: %s. Exception: %s",
|
||||
recording.id,
|
||||
settings.SUMMARY_SERVICE_ENDPOINT,
|
||||
exc,
|
||||
)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
notification_service = NotificationService()
|
||||
@@ -448,6 +448,12 @@ class Base(Configuration):
|
||||
RECORDING_STORAGE_EVENT_TOKEN = values.Value(
|
||||
None, environ_name="RECORDING_STORAGE_EVENT_TOKEN", environ_prefix=None
|
||||
)
|
||||
SUMMARY_SERVICE_ENDPOINT = values.Value(
|
||||
None, environ_name="SUMMARY_SERVICE_ENDPOINT", environ_prefix=None
|
||||
)
|
||||
SUMMARY_SERVICE_API_TOKEN = values.Value(
|
||||
None, environ_name="SUMMARY_SERVICE_API_TOKEN", environ_prefix=None
|
||||
)
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
@property
|
||||
|
||||
@@ -59,6 +59,8 @@ backend:
|
||||
RECORDING_VERIFY_SSL: False
|
||||
RECORDING_STORAGE_EVENT_ENABLE: True
|
||||
RECORDING_STORAGE_EVENT_TOKEN: password
|
||||
SUMMARY_SERVICE_ENDPOINT: http://meet-summary:80/api/v1/tasks/
|
||||
SUMMARY_SERVICE_API_TOKEN: password
|
||||
|
||||
|
||||
migrate:
|
||||
|
||||
Reference in New Issue
Block a user