🚧(backend) update recording metadata alongside recording state changes
Previously, this was handled manually by the client, sending notifications to other participants and keeping the recording state only in memory. There was no shared or persisted state, so leaving and rejoining a meeting lost this information. Delegating this responsibility solely to the client was a poor choice. The backend now owns this responsibility and relies on LiveKit webhooks to keep room metadata in sync with the egress lifecycle. This also reveals that the room.isRecording attribute does not update as fast as the egress stop event, which is unexpected and should be investigated further. This will make state management working when several room’s owner will be in the same meeting, which is expected to arrive any time soon.
This commit is contained in:
committed by
aleb_the_flash
parent
57a7523cc4
commit
16badde82d
@@ -2,6 +2,7 @@
|
||||
|
||||
import logging
|
||||
|
||||
from core import utils
|
||||
from core.models import Recording, RecordingStatusChoices
|
||||
|
||||
from .exceptions import (
|
||||
@@ -60,6 +61,15 @@ class WorkerServiceMediator:
|
||||
finally:
|
||||
recording.save()
|
||||
|
||||
mode = recording.options.get("original_mode", None) or recording.mode
|
||||
|
||||
try:
|
||||
utils.update_room_metadata(
|
||||
room_name, {"recording_mode": mode, "recording_status": "starting"}
|
||||
)
|
||||
except utils.MetadataUpdateException as e:
|
||||
logger.exception("Failed to update room's metadata: %s", e)
|
||||
|
||||
logger.info(
|
||||
"Worker started for room %s (worker ID: %s)",
|
||||
recording.room,
|
||||
@@ -95,4 +105,10 @@ class WorkerServiceMediator:
|
||||
finally:
|
||||
recording.save()
|
||||
|
||||
try:
|
||||
room_name = str(recording.room.id)
|
||||
utils.update_room_metadata(room_name, {"recording_status": "saving"})
|
||||
except utils.MetadataUpdateException as e:
|
||||
logger.exception("Failed to update room's metadata: %s", e)
|
||||
|
||||
logger.info("Worker stopped for room %s", recording.room)
|
||||
|
||||
@@ -11,7 +11,7 @@ from django.conf import settings
|
||||
|
||||
from livekit import api
|
||||
|
||||
from core import models
|
||||
from core import models, utils
|
||||
from core.recording.services.recording_events import (
|
||||
RecordingEventsError,
|
||||
RecordingEventsService,
|
||||
@@ -138,6 +138,24 @@ class LiveKitEventsService:
|
||||
# pylint: disable=not-callable
|
||||
handler(data)
|
||||
|
||||
def _handle_egress_started(self, data):
|
||||
"""Handle 'egress_started' event."""
|
||||
|
||||
try:
|
||||
recording = models.Recording.objects.get(
|
||||
worker_id=data.egress_info.egress_id
|
||||
)
|
||||
except models.Recording.DoesNotExist as err:
|
||||
raise ActionFailedError(
|
||||
f"Recording with worker ID {data.egress_info.egress_id} does not exist"
|
||||
) from err
|
||||
|
||||
try:
|
||||
room_name = str(recording.room.id)
|
||||
utils.update_room_metadata(room_name, {"recording_status": "started"})
|
||||
except utils.MetadataUpdateException as e:
|
||||
logger.exception("Failed to update room's metadata: %s", e)
|
||||
|
||||
def _handle_egress_ended(self, data):
|
||||
"""Handle 'egress_ended' event."""
|
||||
|
||||
@@ -150,6 +168,14 @@ class LiveKitEventsService:
|
||||
f"Recording with worker ID {data.egress_info.egress_id} does not exist"
|
||||
) from err
|
||||
|
||||
try:
|
||||
room_name = str(recording.room.id)
|
||||
utils.update_room_metadata(
|
||||
room_name, {}, ["recording_mode", "recording_status"]
|
||||
)
|
||||
except utils.MetadataUpdateException as e:
|
||||
logger.exception("Failed to update room's metadata: %s", e)
|
||||
|
||||
if (
|
||||
data.egress_info.status == api.EgressStatus.EGRESS_LIMIT_REACHED
|
||||
and recording.status == models.RecordingStatusChoices.ACTIVE
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
# pylint: disable=redefined-outer-name,unused-argument
|
||||
|
||||
from unittest import mock
|
||||
from unittest.mock import Mock
|
||||
|
||||
import pytest
|
||||
@@ -33,14 +34,18 @@ def mediator(mock_worker_service):
|
||||
return WorkerServiceMediator(mock_worker_service)
|
||||
|
||||
|
||||
def test_start_recording_success(mediator, mock_worker_service):
|
||||
@mock.patch("core.utils.update_room_metadata")
|
||||
def test_start_recording_success(
|
||||
mock_update_room_metadata, mediator, mock_worker_service
|
||||
):
|
||||
"""Test successful recording start"""
|
||||
# Setup
|
||||
worker_id = "test-worker-123"
|
||||
mock_worker_service.start.return_value = worker_id
|
||||
|
||||
mock_recording = RecordingFactory(
|
||||
status=RecordingStatusChoices.INITIATED, worker_id=None
|
||||
status=RecordingStatusChoices.INITIATED,
|
||||
worker_id=None,
|
||||
)
|
||||
mediator.start(mock_recording)
|
||||
|
||||
@@ -55,12 +60,18 @@ def test_start_recording_success(mediator, mock_worker_service):
|
||||
assert mock_recording.worker_id == worker_id
|
||||
assert mock_recording.status == RecordingStatusChoices.ACTIVE
|
||||
|
||||
mock_update_room_metadata.assert_called_once_with(
|
||||
str(mock_recording.room.id),
|
||||
{"recording_mode": mock_recording.mode, "recording_status": "starting"},
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"error_class", [WorkerRequestError, WorkerConnectionError, WorkerResponseError]
|
||||
)
|
||||
@mock.patch("core.utils.update_room_metadata")
|
||||
def test_mediator_start_recording_worker_errors(
|
||||
mediator, mock_worker_service, error_class
|
||||
mock_update_room_metadata, mediator, mock_worker_service, error_class
|
||||
):
|
||||
"""Test handling of various worker errors during start"""
|
||||
# Setup
|
||||
@@ -78,6 +89,8 @@ def test_mediator_start_recording_worker_errors(
|
||||
assert mock_recording.status == RecordingStatusChoices.FAILED_TO_START
|
||||
assert mock_recording.worker_id is None
|
||||
|
||||
mock_update_room_metadata.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"status",
|
||||
@@ -90,8 +103,9 @@ def test_mediator_start_recording_worker_errors(
|
||||
RecordingStatusChoices.ABORTED,
|
||||
],
|
||||
)
|
||||
@mock.patch("core.utils.update_room_metadata")
|
||||
def test_mediator_start_recording_from_forbidden_status(
|
||||
mediator, mock_worker_service, status
|
||||
mock_update_room_metadata, mediator, mock_worker_service, status
|
||||
):
|
||||
"""Test handling of various worker errors during start"""
|
||||
# Setup
|
||||
@@ -105,8 +119,13 @@ def test_mediator_start_recording_from_forbidden_status(
|
||||
mock_recording.refresh_from_db()
|
||||
assert mock_recording.status == status
|
||||
|
||||
mock_update_room_metadata.assert_not_called()
|
||||
|
||||
def test_mediator_stop_recording_success(mediator, mock_worker_service):
|
||||
|
||||
@mock.patch("core.utils.update_room_metadata")
|
||||
def test_mediator_stop_recording_success(
|
||||
mock_update_room_metadata, mediator, mock_worker_service
|
||||
):
|
||||
"""Test successful recording stop"""
|
||||
# Setup
|
||||
mock_recording = RecordingFactory(
|
||||
@@ -124,8 +143,15 @@ def test_mediator_stop_recording_success(mediator, mock_worker_service):
|
||||
mock_recording.refresh_from_db()
|
||||
assert mock_recording.status == RecordingStatusChoices.STOPPED
|
||||
|
||||
mock_update_room_metadata.assert_called_once_with(
|
||||
str(mock_recording.room.id), {"recording_status": "saving"}
|
||||
)
|
||||
|
||||
def test_mediator_stop_recording_aborted(mediator, mock_worker_service):
|
||||
|
||||
@mock.patch("core.utils.update_room_metadata")
|
||||
def test_mediator_stop_recording_aborted(
|
||||
mock_update_room_metadata, mediator, mock_worker_service
|
||||
):
|
||||
"""Test recording stop when worker returns ABORTED"""
|
||||
# Setup
|
||||
mock_recording = RecordingFactory(
|
||||
@@ -140,10 +166,15 @@ def test_mediator_stop_recording_aborted(mediator, mock_worker_service):
|
||||
mock_recording.refresh_from_db()
|
||||
assert mock_recording.status == RecordingStatusChoices.ABORTED
|
||||
|
||||
mock_update_room_metadata.assert_called_once_with(
|
||||
str(mock_recording.room.id), {"recording_status": "saving"}
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("error_class", [WorkerConnectionError, WorkerResponseError])
|
||||
@mock.patch("core.utils.update_room_metadata")
|
||||
def test_mediator_stop_recording_worker_errors(
|
||||
mediator, mock_worker_service, error_class
|
||||
mock_update_room_metadata, mediator, mock_worker_service, error_class
|
||||
):
|
||||
"""Test handling of worker errors during stop"""
|
||||
# Setup
|
||||
@@ -159,3 +190,5 @@ def test_mediator_stop_recording_worker_errors(
|
||||
# Verify recording updates
|
||||
mock_recording.refresh_from_db()
|
||||
assert mock_recording.status == RecordingStatusChoices.FAILED_TO_STOP
|
||||
|
||||
mock_update_room_metadata.assert_not_called()
|
||||
|
||||
@@ -21,7 +21,7 @@ from core.services.livekit_events import (
|
||||
)
|
||||
from core.services.lobby import LobbyService
|
||||
from core.services.telephony import TelephonyException, TelephonyService
|
||||
from core.utils import NotificationError
|
||||
from core.utils import MetadataUpdateException, NotificationError
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
@@ -70,7 +70,10 @@ def test_initialization(
|
||||
),
|
||||
)
|
||||
@mock.patch("core.utils.notify_participants")
|
||||
def test_handle_egress_ended_success(mock_notify, mode, notification_type, service):
|
||||
@mock.patch("core.utils.update_room_metadata")
|
||||
def test_handle_egress_ended_success(
|
||||
mock_update_room_metadata, mock_notify, mode, notification_type, service
|
||||
):
|
||||
"""Should successfully stop recording and notifies all participant."""
|
||||
|
||||
recording = RecordingFactory(worker_id="worker-1", mode=mode, status="active")
|
||||
@@ -83,13 +86,65 @@ def test_handle_egress_ended_success(mock_notify, mode, notification_type, servi
|
||||
mock_notify.assert_called_once_with(
|
||||
room_name=str(recording.room.id), notification_data={"type": notification_type}
|
||||
)
|
||||
mock_update_room_metadata.assert_called_once_with(
|
||||
str(recording.room.id), {}, ["recording_mode", "recording_status"]
|
||||
)
|
||||
|
||||
recording.refresh_from_db()
|
||||
assert recording.status == "stopped"
|
||||
|
||||
|
||||
@mock.patch("core.utils.update_room_metadata")
|
||||
def test_handle_egress_started_success(mock_update_room_metadata, service):
|
||||
"""Should successfully start recording and update room's metadata."""
|
||||
|
||||
recording = RecordingFactory(worker_id="worker-1", status="initiated")
|
||||
mock_data = mock.MagicMock()
|
||||
mock_data.egress_info.egress_id = recording.worker_id
|
||||
mock_data.egress_info.status = EgressStatus.EGRESS_ACTIVE
|
||||
|
||||
service._handle_egress_started(mock_data)
|
||||
|
||||
mock_update_room_metadata.assert_called_once_with(
|
||||
str(recording.room.id), {"recording_status": "started"}
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("mode", "notification_type"),
|
||||
(
|
||||
("screen_recording", "screenRecordingLimitReached"),
|
||||
("transcript", "transcriptionLimitReached"),
|
||||
),
|
||||
)
|
||||
@mock.patch("core.utils.notify_participants")
|
||||
def test_handle_egress_ended_notification_fails(mock_notify, service):
|
||||
@mock.patch("core.utils.update_room_metadata")
|
||||
def test_handle_egress_ended_metadata_update_fails(
|
||||
mock_update_room_metadata, mock_notify, mode, notification_type, service
|
||||
):
|
||||
"""Should successfully stop recording when metadata's update fails."""
|
||||
|
||||
recording = RecordingFactory(worker_id="worker-1", mode=mode, status="active")
|
||||
mock_data = mock.MagicMock()
|
||||
mock_data.egress_info.egress_id = recording.worker_id
|
||||
mock_data.egress_info.status = EgressStatus.EGRESS_LIMIT_REACHED
|
||||
|
||||
mock_update_room_metadata.side_effect = MetadataUpdateException("Error notifying")
|
||||
|
||||
service._handle_egress_ended(mock_data)
|
||||
|
||||
mock_notify.assert_called_once_with(
|
||||
room_name=str(recording.room.id), notification_data={"type": notification_type}
|
||||
)
|
||||
recording.refresh_from_db()
|
||||
assert recording.status == "stopped"
|
||||
|
||||
|
||||
@mock.patch("core.utils.notify_participants")
|
||||
@mock.patch("core.utils.update_room_metadata")
|
||||
def test_handle_egress_ended_notification_fails(
|
||||
mock_update_room_metadata, mock_notify, service
|
||||
):
|
||||
"""Should raise ActionFailedError when notification fails but still stop recording."""
|
||||
|
||||
recording = RecordingFactory(worker_id="worker-1", status="active")
|
||||
@@ -108,9 +163,16 @@ def test_handle_egress_ended_notification_fails(mock_notify, service):
|
||||
recording.refresh_from_db()
|
||||
assert recording.status == "stopped"
|
||||
|
||||
mock_update_room_metadata.assert_called_once_with(
|
||||
str(recording.room.id), {}, ["recording_mode", "recording_status"]
|
||||
)
|
||||
|
||||
|
||||
@mock.patch("core.utils.notify_participants")
|
||||
def test_handle_egress_ended_recording_not_found(mock_notify, service):
|
||||
@mock.patch("core.utils.update_room_metadata")
|
||||
def test_handle_egress_ended_recording_not_found(
|
||||
mock_update_room_metadata, mock_notify, service
|
||||
):
|
||||
"""Should raise ActionFailedError when recording doesn't exist."""
|
||||
|
||||
recording = RecordingFactory(worker_id="worker-1", status="active")
|
||||
@@ -124,13 +186,17 @@ def test_handle_egress_ended_recording_not_found(mock_notify, service):
|
||||
service._handle_egress_ended(mock_data)
|
||||
|
||||
mock_notify.assert_not_called()
|
||||
mock_update_room_metadata.assert_not_called()
|
||||
|
||||
recording.refresh_from_db()
|
||||
assert recording.status == "active"
|
||||
|
||||
|
||||
@mock.patch("core.utils.notify_participants")
|
||||
def test_handle_egress_ended_recording_not_active(mock_notify, service):
|
||||
@mock.patch("core.utils.update_room_metadata")
|
||||
def test_handle_egress_ended_recording_not_active(
|
||||
mock_update_room_metadata, mock_notify, service
|
||||
):
|
||||
"""Should ignore non-active recordings."""
|
||||
|
||||
recording = RecordingFactory(worker_id="worker-1", status="failed_to_stop")
|
||||
@@ -141,13 +207,19 @@ def test_handle_egress_ended_recording_not_active(mock_notify, service):
|
||||
service._handle_egress_ended(mock_data)
|
||||
|
||||
mock_notify.assert_not_called()
|
||||
mock_update_room_metadata.assert_called_once_with(
|
||||
str(recording.room.id), {}, ["recording_mode", "recording_status"]
|
||||
)
|
||||
|
||||
recording.refresh_from_db()
|
||||
assert recording.status == "failed_to_stop"
|
||||
|
||||
|
||||
@mock.patch("core.utils.notify_participants")
|
||||
def test_handle_egress_ended_recording_not_limit_reached(mock_notify, service):
|
||||
@mock.patch("core.utils.update_room_metadata")
|
||||
def test_handle_egress_ended_recording_not_limit_reached(
|
||||
mock_update_room_metadata, mock_notify, service
|
||||
):
|
||||
"""Should ignore egress non-limit-reached statuses."""
|
||||
|
||||
recording = RecordingFactory(worker_id="worker-1", status="stopped")
|
||||
@@ -158,6 +230,9 @@ def test_handle_egress_ended_recording_not_limit_reached(mock_notify, service):
|
||||
service._handle_egress_ended(mock_data)
|
||||
|
||||
mock_notify.assert_not_called()
|
||||
mock_update_room_metadata.assert_called_once_with(
|
||||
str(recording.room.id), {}, ["recording_mode", "recording_status"]
|
||||
)
|
||||
assert recording.status == "stopped"
|
||||
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ from livekit.api import ( # pylint: disable=E0611
|
||||
LiveKitAPI,
|
||||
SendDataRequest,
|
||||
TwirpError,
|
||||
UpdateRoomMetadataRequest,
|
||||
VideoGrants,
|
||||
)
|
||||
|
||||
@@ -244,6 +245,57 @@ async def notify_participants(room_name: str, notification_data: dict):
|
||||
await lkapi.aclose()
|
||||
|
||||
|
||||
class MetadataUpdateException(Exception):
|
||||
"""Room's metadata update fails."""
|
||||
|
||||
|
||||
@async_to_sync
|
||||
async def update_room_metadata(
|
||||
room_name: str, metadata: dict, remove_keys: Optional[list[str]] = None
|
||||
):
|
||||
"""Update LiveKit room metadata by merging new values with existing metadata.
|
||||
|
||||
Args:
|
||||
room_name: Name of the room to update
|
||||
metadata: Dictionary of metadata key-values to add/update
|
||||
remove_keys: Optional list of keys to remove from existing metadata.
|
||||
"""
|
||||
|
||||
lkapi = create_livekit_client()
|
||||
|
||||
try:
|
||||
response = await lkapi.room.list_rooms(
|
||||
ListRoomsRequest(
|
||||
names=[room_name],
|
||||
)
|
||||
)
|
||||
|
||||
if not response.rooms:
|
||||
return
|
||||
|
||||
room = response.rooms[0]
|
||||
|
||||
existing_metadata = json.loads(room.metadata) if room.metadata else {}
|
||||
|
||||
if remove_keys:
|
||||
for key in remove_keys:
|
||||
existing_metadata.pop(key, None)
|
||||
|
||||
updated_metadata = {**existing_metadata, **metadata}
|
||||
|
||||
await lkapi.room.update_room_metadata(
|
||||
UpdateRoomMetadataRequest(
|
||||
room=room_name, metadata=json.dumps(updated_metadata).encode("utf-8")
|
||||
)
|
||||
)
|
||||
except TwirpError as e:
|
||||
raise MetadataUpdateException(
|
||||
f"Failed to update metadata for room {room_name}: {e}"
|
||||
) from e
|
||||
finally:
|
||||
await lkapi.aclose()
|
||||
|
||||
|
||||
ALPHANUMERIC_CHARSET = string.ascii_letters + string.digits
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user