diff --git a/src/backend/core/recording/worker/mediator.py b/src/backend/core/recording/worker/mediator.py index 12311c2b..69728334 100644 --- a/src/backend/core/recording/worker/mediator.py +++ b/src/backend/core/recording/worker/mediator.py @@ -2,6 +2,7 @@ import logging +from core import utils from core.models import Recording, RecordingStatusChoices from .exceptions import ( @@ -60,6 +61,15 @@ class WorkerServiceMediator: finally: recording.save() + mode = recording.options.get("original_mode", None) or recording.mode + + try: + utils.update_room_metadata( + room_name, {"recording_mode": mode, "recording_status": "starting"} + ) + except utils.MetadataUpdateException as e: + logger.exception("Failed to update room's metadata: %s", e) + logger.info( "Worker started for room %s (worker ID: %s)", recording.room, @@ -95,4 +105,10 @@ class WorkerServiceMediator: finally: recording.save() + try: + room_name = str(recording.room.id) + utils.update_room_metadata(room_name, {"recording_status": "saving"}) + except utils.MetadataUpdateException as e: + logger.exception("Failed to update room's metadata: %s", e) + logger.info("Worker stopped for room %s", recording.room) diff --git a/src/backend/core/services/livekit_events.py b/src/backend/core/services/livekit_events.py index ce9d2604..a526e07f 100644 --- a/src/backend/core/services/livekit_events.py +++ b/src/backend/core/services/livekit_events.py @@ -11,7 +11,7 @@ from django.conf import settings from livekit import api -from core import models +from core import models, utils from core.recording.services.recording_events import ( RecordingEventsError, RecordingEventsService, @@ -138,6 +138,24 @@ class LiveKitEventsService: # pylint: disable=not-callable handler(data) + def _handle_egress_started(self, data): + """Handle 'egress_started' event.""" + + try: + recording = models.Recording.objects.get( + worker_id=data.egress_info.egress_id + ) + except models.Recording.DoesNotExist as err: + raise ActionFailedError( + f"Recording with worker ID {data.egress_info.egress_id} does not exist" + ) from err + + try: + room_name = str(recording.room.id) + utils.update_room_metadata(room_name, {"recording_status": "started"}) + except utils.MetadataUpdateException as e: + logger.exception("Failed to update room's metadata: %s", e) + def _handle_egress_ended(self, data): """Handle 'egress_ended' event.""" @@ -150,6 +168,14 @@ class LiveKitEventsService: f"Recording with worker ID {data.egress_info.egress_id} does not exist" ) from err + try: + room_name = str(recording.room.id) + utils.update_room_metadata( + room_name, {}, ["recording_mode", "recording_status"] + ) + except utils.MetadataUpdateException as e: + logger.exception("Failed to update room's metadata: %s", e) + if ( data.egress_info.status == api.EgressStatus.EGRESS_LIMIT_REACHED and recording.status == models.RecordingStatusChoices.ACTIVE diff --git a/src/backend/core/tests/recording/worker/test_mediator.py b/src/backend/core/tests/recording/worker/test_mediator.py index 2bafb317..5ca32b11 100644 --- a/src/backend/core/tests/recording/worker/test_mediator.py +++ b/src/backend/core/tests/recording/worker/test_mediator.py @@ -2,6 +2,7 @@ # pylint: disable=redefined-outer-name,unused-argument +from unittest import mock from unittest.mock import Mock import pytest @@ -33,14 +34,18 @@ def mediator(mock_worker_service): return WorkerServiceMediator(mock_worker_service) -def test_start_recording_success(mediator, mock_worker_service): +@mock.patch("core.utils.update_room_metadata") +def test_start_recording_success( + mock_update_room_metadata, mediator, mock_worker_service +): """Test successful recording start""" # Setup worker_id = "test-worker-123" mock_worker_service.start.return_value = worker_id mock_recording = RecordingFactory( - status=RecordingStatusChoices.INITIATED, worker_id=None + status=RecordingStatusChoices.INITIATED, + worker_id=None, ) mediator.start(mock_recording) @@ -55,12 +60,18 @@ def test_start_recording_success(mediator, mock_worker_service): assert mock_recording.worker_id == worker_id assert mock_recording.status == RecordingStatusChoices.ACTIVE + mock_update_room_metadata.assert_called_once_with( + str(mock_recording.room.id), + {"recording_mode": mock_recording.mode, "recording_status": "starting"}, + ) + @pytest.mark.parametrize( "error_class", [WorkerRequestError, WorkerConnectionError, WorkerResponseError] ) +@mock.patch("core.utils.update_room_metadata") def test_mediator_start_recording_worker_errors( - mediator, mock_worker_service, error_class + mock_update_room_metadata, mediator, mock_worker_service, error_class ): """Test handling of various worker errors during start""" # Setup @@ -78,6 +89,8 @@ def test_mediator_start_recording_worker_errors( assert mock_recording.status == RecordingStatusChoices.FAILED_TO_START assert mock_recording.worker_id is None + mock_update_room_metadata.assert_not_called() + @pytest.mark.parametrize( "status", @@ -90,8 +103,9 @@ def test_mediator_start_recording_worker_errors( RecordingStatusChoices.ABORTED, ], ) +@mock.patch("core.utils.update_room_metadata") def test_mediator_start_recording_from_forbidden_status( - mediator, mock_worker_service, status + mock_update_room_metadata, mediator, mock_worker_service, status ): """Test handling of various worker errors during start""" # Setup @@ -105,8 +119,13 @@ def test_mediator_start_recording_from_forbidden_status( mock_recording.refresh_from_db() assert mock_recording.status == status + mock_update_room_metadata.assert_not_called() -def test_mediator_stop_recording_success(mediator, mock_worker_service): + +@mock.patch("core.utils.update_room_metadata") +def test_mediator_stop_recording_success( + mock_update_room_metadata, mediator, mock_worker_service +): """Test successful recording stop""" # Setup mock_recording = RecordingFactory( @@ -124,8 +143,15 @@ def test_mediator_stop_recording_success(mediator, mock_worker_service): mock_recording.refresh_from_db() assert mock_recording.status == RecordingStatusChoices.STOPPED + mock_update_room_metadata.assert_called_once_with( + str(mock_recording.room.id), {"recording_status": "saving"} + ) -def test_mediator_stop_recording_aborted(mediator, mock_worker_service): + +@mock.patch("core.utils.update_room_metadata") +def test_mediator_stop_recording_aborted( + mock_update_room_metadata, mediator, mock_worker_service +): """Test recording stop when worker returns ABORTED""" # Setup mock_recording = RecordingFactory( @@ -140,10 +166,15 @@ def test_mediator_stop_recording_aborted(mediator, mock_worker_service): mock_recording.refresh_from_db() assert mock_recording.status == RecordingStatusChoices.ABORTED + mock_update_room_metadata.assert_called_once_with( + str(mock_recording.room.id), {"recording_status": "saving"} + ) + @pytest.mark.parametrize("error_class", [WorkerConnectionError, WorkerResponseError]) +@mock.patch("core.utils.update_room_metadata") def test_mediator_stop_recording_worker_errors( - mediator, mock_worker_service, error_class + mock_update_room_metadata, mediator, mock_worker_service, error_class ): """Test handling of worker errors during stop""" # Setup @@ -159,3 +190,5 @@ def test_mediator_stop_recording_worker_errors( # Verify recording updates mock_recording.refresh_from_db() assert mock_recording.status == RecordingStatusChoices.FAILED_TO_STOP + + mock_update_room_metadata.assert_not_called() diff --git a/src/backend/core/tests/services/test_livekit_events.py b/src/backend/core/tests/services/test_livekit_events.py index 67ebeb0b..aaac8c6f 100644 --- a/src/backend/core/tests/services/test_livekit_events.py +++ b/src/backend/core/tests/services/test_livekit_events.py @@ -21,7 +21,7 @@ from core.services.livekit_events import ( ) from core.services.lobby import LobbyService from core.services.telephony import TelephonyException, TelephonyService -from core.utils import NotificationError +from core.utils import MetadataUpdateException, NotificationError pytestmark = pytest.mark.django_db @@ -70,7 +70,10 @@ def test_initialization( ), ) @mock.patch("core.utils.notify_participants") -def test_handle_egress_ended_success(mock_notify, mode, notification_type, service): +@mock.patch("core.utils.update_room_metadata") +def test_handle_egress_ended_success( + mock_update_room_metadata, mock_notify, mode, notification_type, service +): """Should successfully stop recording and notifies all participant.""" recording = RecordingFactory(worker_id="worker-1", mode=mode, status="active") @@ -83,13 +86,65 @@ def test_handle_egress_ended_success(mock_notify, mode, notification_type, servi mock_notify.assert_called_once_with( room_name=str(recording.room.id), notification_data={"type": notification_type} ) + mock_update_room_metadata.assert_called_once_with( + str(recording.room.id), {}, ["recording_mode", "recording_status"] + ) recording.refresh_from_db() assert recording.status == "stopped" +@mock.patch("core.utils.update_room_metadata") +def test_handle_egress_started_success(mock_update_room_metadata, service): + """Should successfully start recording and update room's metadata.""" + + recording = RecordingFactory(worker_id="worker-1", status="initiated") + mock_data = mock.MagicMock() + mock_data.egress_info.egress_id = recording.worker_id + mock_data.egress_info.status = EgressStatus.EGRESS_ACTIVE + + service._handle_egress_started(mock_data) + + mock_update_room_metadata.assert_called_once_with( + str(recording.room.id), {"recording_status": "started"} + ) + + +@pytest.mark.parametrize( + ("mode", "notification_type"), + ( + ("screen_recording", "screenRecordingLimitReached"), + ("transcript", "transcriptionLimitReached"), + ), +) @mock.patch("core.utils.notify_participants") -def test_handle_egress_ended_notification_fails(mock_notify, service): +@mock.patch("core.utils.update_room_metadata") +def test_handle_egress_ended_metadata_update_fails( + mock_update_room_metadata, mock_notify, mode, notification_type, service +): + """Should successfully stop recording when metadata's update fails.""" + + recording = RecordingFactory(worker_id="worker-1", mode=mode, status="active") + mock_data = mock.MagicMock() + mock_data.egress_info.egress_id = recording.worker_id + mock_data.egress_info.status = EgressStatus.EGRESS_LIMIT_REACHED + + mock_update_room_metadata.side_effect = MetadataUpdateException("Error notifying") + + service._handle_egress_ended(mock_data) + + mock_notify.assert_called_once_with( + room_name=str(recording.room.id), notification_data={"type": notification_type} + ) + recording.refresh_from_db() + assert recording.status == "stopped" + + +@mock.patch("core.utils.notify_participants") +@mock.patch("core.utils.update_room_metadata") +def test_handle_egress_ended_notification_fails( + mock_update_room_metadata, mock_notify, service +): """Should raise ActionFailedError when notification fails but still stop recording.""" recording = RecordingFactory(worker_id="worker-1", status="active") @@ -108,9 +163,16 @@ def test_handle_egress_ended_notification_fails(mock_notify, service): recording.refresh_from_db() assert recording.status == "stopped" + mock_update_room_metadata.assert_called_once_with( + str(recording.room.id), {}, ["recording_mode", "recording_status"] + ) + @mock.patch("core.utils.notify_participants") -def test_handle_egress_ended_recording_not_found(mock_notify, service): +@mock.patch("core.utils.update_room_metadata") +def test_handle_egress_ended_recording_not_found( + mock_update_room_metadata, mock_notify, service +): """Should raise ActionFailedError when recording doesn't exist.""" recording = RecordingFactory(worker_id="worker-1", status="active") @@ -124,13 +186,17 @@ def test_handle_egress_ended_recording_not_found(mock_notify, service): service._handle_egress_ended(mock_data) mock_notify.assert_not_called() + mock_update_room_metadata.assert_not_called() recording.refresh_from_db() assert recording.status == "active" @mock.patch("core.utils.notify_participants") -def test_handle_egress_ended_recording_not_active(mock_notify, service): +@mock.patch("core.utils.update_room_metadata") +def test_handle_egress_ended_recording_not_active( + mock_update_room_metadata, mock_notify, service +): """Should ignore non-active recordings.""" recording = RecordingFactory(worker_id="worker-1", status="failed_to_stop") @@ -141,13 +207,19 @@ def test_handle_egress_ended_recording_not_active(mock_notify, service): service._handle_egress_ended(mock_data) mock_notify.assert_not_called() + mock_update_room_metadata.assert_called_once_with( + str(recording.room.id), {}, ["recording_mode", "recording_status"] + ) recording.refresh_from_db() assert recording.status == "failed_to_stop" @mock.patch("core.utils.notify_participants") -def test_handle_egress_ended_recording_not_limit_reached(mock_notify, service): +@mock.patch("core.utils.update_room_metadata") +def test_handle_egress_ended_recording_not_limit_reached( + mock_update_room_metadata, mock_notify, service +): """Should ignore egress non-limit-reached statuses.""" recording = RecordingFactory(worker_id="worker-1", status="stopped") @@ -158,6 +230,9 @@ def test_handle_egress_ended_recording_not_limit_reached(mock_notify, service): service._handle_egress_ended(mock_data) mock_notify.assert_not_called() + mock_update_room_metadata.assert_called_once_with( + str(recording.room.id), {}, ["recording_mode", "recording_status"] + ) assert recording.status == "stopped" diff --git a/src/backend/core/utils.py b/src/backend/core/utils.py index 19919448..5079e2c1 100644 --- a/src/backend/core/utils.py +++ b/src/backend/core/utils.py @@ -25,6 +25,7 @@ from livekit.api import ( # pylint: disable=E0611 LiveKitAPI, SendDataRequest, TwirpError, + UpdateRoomMetadataRequest, VideoGrants, ) @@ -244,6 +245,57 @@ async def notify_participants(room_name: str, notification_data: dict): await lkapi.aclose() +class MetadataUpdateException(Exception): + """Room's metadata update fails.""" + + +@async_to_sync +async def update_room_metadata( + room_name: str, metadata: dict, remove_keys: Optional[list[str]] = None +): + """Update LiveKit room metadata by merging new values with existing metadata. + + Args: + room_name: Name of the room to update + metadata: Dictionary of metadata key-values to add/update + remove_keys: Optional list of keys to remove from existing metadata. + """ + + lkapi = create_livekit_client() + + try: + response = await lkapi.room.list_rooms( + ListRoomsRequest( + names=[room_name], + ) + ) + + if not response.rooms: + return + + room = response.rooms[0] + + existing_metadata = json.loads(room.metadata) if room.metadata else {} + + if remove_keys: + for key in remove_keys: + existing_metadata.pop(key, None) + + updated_metadata = {**existing_metadata, **metadata} + + await lkapi.room.update_room_metadata( + UpdateRoomMetadataRequest( + room=room_name, metadata=json.dumps(updated_metadata).encode("utf-8") + ) + ) + except TwirpError as e: + raise MetadataUpdateException( + f"Failed to update metadata for room {room_name}: {e}" + ) from e + finally: + await lkapi.aclose() + + ALPHANUMERIC_CHARSET = string.ascii_letters + string.digits