diff --git a/src/backend/core/recording/services/recording_events.py b/src/backend/core/recording/services/recording_events.py index f24a7b02..2577eceb 100644 --- a/src/backend/core/recording/services/recording_events.py +++ b/src/backend/core/recording/services/recording_events.py @@ -1,7 +1,11 @@ """Recording-related LiveKit Events Service""" +# pylint: disable=no-member + from logging import getLogger +from livekit import api + from core import models, utils logger = getLogger(__name__) @@ -14,6 +18,27 @@ class RecordingEventsError(Exception): class RecordingEventsService: """Handles recording-related Livekit webhook events.""" + @staticmethod + def handle_update(recording, egress_status): + """Handle egress status updates and sync recording state to room metadata.""" + + room_name = str(recording.room.id) + + status_mapping = { + api.EgressStatus.EGRESS_ACTIVE: "started", + api.EgressStatus.EGRESS_ENDING: "saving", + api.EgressStatus.EGRESS_ABORTED: "aborted", + } + + recording_status = status_mapping.get(egress_status) + if recording_status: + try: + utils.update_room_metadata( + room_name, {"recording_status": recording_status} + ) + except utils.MetadataUpdateException as e: + logger.exception("Failed to update room's metadata: %s", e) + @staticmethod def handle_limit_reached(recording): """Stop recording and notify participants when limit is reached.""" diff --git a/src/backend/core/recording/worker/mediator.py b/src/backend/core/recording/worker/mediator.py index 69728334..c73a96fe 100644 --- a/src/backend/core/recording/worker/mediator.py +++ b/src/backend/core/recording/worker/mediator.py @@ -105,10 +105,4 @@ class WorkerServiceMediator: finally: recording.save() - try: - room_name = str(recording.room.id) - utils.update_room_metadata(room_name, {"recording_status": "saving"}) - except utils.MetadataUpdateException as e: - logger.exception("Failed to update room's metadata: %s", e) - logger.info("Worker stopped for room %s", recording.room) diff --git a/src/backend/core/services/livekit_events.py b/src/backend/core/services/livekit_events.py index a526e07f..db7bf25a 100644 --- a/src/backend/core/services/livekit_events.py +++ b/src/backend/core/services/livekit_events.py @@ -138,23 +138,19 @@ class LiveKitEventsService: # pylint: disable=not-callable handler(data) - def _handle_egress_started(self, data): - """Handle 'egress_started' event.""" + def _handle_egress_updated(self, data): + """Handle 'egress_updated' event.""" + egress_id = data.egress_info.egress_id try: - recording = models.Recording.objects.get( - worker_id=data.egress_info.egress_id - ) + recording = models.Recording.objects.get(worker_id=egress_id) except models.Recording.DoesNotExist as err: raise ActionFailedError( - f"Recording with worker ID {data.egress_info.egress_id} does not exist" + f"Recording with worker ID {egress_id} does not exist" ) from err - try: - room_name = str(recording.room.id) - utils.update_room_metadata(room_name, {"recording_status": "started"}) - except utils.MetadataUpdateException as e: - logger.exception("Failed to update room's metadata: %s", e) + egress_status = data.egress_info.status + self.recording_events.handle_update(recording, egress_status) def _handle_egress_ended(self, data): """Handle 'egress_ended' event.""" diff --git a/src/backend/core/tests/recording/worker/test_mediator.py b/src/backend/core/tests/recording/worker/test_mediator.py index 5ca32b11..0b3fd4c3 100644 --- a/src/backend/core/tests/recording/worker/test_mediator.py +++ b/src/backend/core/tests/recording/worker/test_mediator.py @@ -122,10 +122,7 @@ def test_mediator_start_recording_from_forbidden_status( mock_update_room_metadata.assert_not_called() -@mock.patch("core.utils.update_room_metadata") -def test_mediator_stop_recording_success( - mock_update_room_metadata, mediator, mock_worker_service -): +def test_mediator_stop_recording_success(mediator, mock_worker_service): """Test successful recording stop""" # Setup mock_recording = RecordingFactory( @@ -143,15 +140,8 @@ def test_mediator_stop_recording_success( mock_recording.refresh_from_db() assert mock_recording.status == RecordingStatusChoices.STOPPED - mock_update_room_metadata.assert_called_once_with( - str(mock_recording.room.id), {"recording_status": "saving"} - ) - -@mock.patch("core.utils.update_room_metadata") -def test_mediator_stop_recording_aborted( - mock_update_room_metadata, mediator, mock_worker_service -): +def test_mediator_stop_recording_aborted(mediator, mock_worker_service): """Test recording stop when worker returns ABORTED""" # Setup mock_recording = RecordingFactory( @@ -166,15 +156,10 @@ def test_mediator_stop_recording_aborted( mock_recording.refresh_from_db() assert mock_recording.status == RecordingStatusChoices.ABORTED - mock_update_room_metadata.assert_called_once_with( - str(mock_recording.room.id), {"recording_status": "saving"} - ) - @pytest.mark.parametrize("error_class", [WorkerConnectionError, WorkerResponseError]) -@mock.patch("core.utils.update_room_metadata") def test_mediator_stop_recording_worker_errors( - mock_update_room_metadata, mediator, mock_worker_service, error_class + mediator, mock_worker_service, error_class ): """Test handling of worker errors during stop""" # Setup @@ -190,5 +175,3 @@ def test_mediator_stop_recording_worker_errors( # Verify recording updates mock_recording.refresh_from_db() assert mock_recording.status == RecordingStatusChoices.FAILED_TO_STOP - - mock_update_room_metadata.assert_not_called() diff --git a/src/backend/core/tests/services/test_livekit_events.py b/src/backend/core/tests/services/test_livekit_events.py index aaac8c6f..54455c47 100644 --- a/src/backend/core/tests/services/test_livekit_events.py +++ b/src/backend/core/tests/services/test_livekit_events.py @@ -94,22 +94,55 @@ def test_handle_egress_ended_success( assert recording.status == "stopped" +@pytest.mark.parametrize( + ("egress_status", "status"), + ( + (EgressStatus.EGRESS_ACTIVE, "started"), + (EgressStatus.EGRESS_ENDING, "saving"), + (EgressStatus.EGRESS_ABORTED, "aborted"), + ), +) @mock.patch("core.utils.update_room_metadata") -def test_handle_egress_started_success(mock_update_room_metadata, service): - """Should successfully start recording and update room's metadata.""" +def test_handle_egress_updated_success( + mock_update_room_metadata, egress_status, status, service +): + """Should successfully update room's metadata.""" recording = RecordingFactory(worker_id="worker-1", status="initiated") mock_data = mock.MagicMock() mock_data.egress_info.egress_id = recording.worker_id - mock_data.egress_info.status = EgressStatus.EGRESS_ACTIVE + mock_data.egress_info.status = egress_status - service._handle_egress_started(mock_data) + service._handle_egress_updated(mock_data) mock_update_room_metadata.assert_called_once_with( - str(recording.room.id), {"recording_status": "started"} + str(recording.room.id), {"recording_status": status} ) +@pytest.mark.parametrize( + "egress_status", + ( + EgressStatus.EGRESS_FAILED, + EgressStatus.EGRESS_LIMIT_REACHED, + ), +) +@mock.patch("core.utils.update_room_metadata") +def test_handle_egress_updated_non_handled( + mock_update_room_metadata, egress_status, service +): + """Should ignore certain egress status and don't trigger metadata updates.""" + + recording = RecordingFactory(worker_id="worker-1", status="initiated") + mock_data = mock.MagicMock() + mock_data.egress_info.egress_id = recording.worker_id + mock_data.egress_info.status = egress_status + + service._handle_egress_updated(mock_data) + + mock_update_room_metadata.assert_not_called() + + @pytest.mark.parametrize( ("mode", "notification_type"), (