From 59cd1f766a8eeeb54e000fe76f3566112aac35ce Mon Sep 17 00:00:00 2001 From: lebaudantoine Date: Tue, 15 Jul 2025 20:01:02 +0200 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8(backend)=20add=20egress=20limit=20not?= =?UTF-8?q?ification=20handler=20to=20LiveKit=20service?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement method to process egress limit reached events from LiveKit webhooks for better recording duration management. Livekit by default is not notifying the participant of a room when an egress reached its limit. I needed to proxy it through the back. --- src/backend/core/services/livekit_events.py | 30 +++++ .../tests/services/test_livekit_events.py | 108 +++++++++++++++++- 2 files changed, 136 insertions(+), 2 deletions(-) diff --git a/src/backend/core/services/livekit_events.py b/src/backend/core/services/livekit_events.py index ece436cd..9c15b741 100644 --- a/src/backend/core/services/livekit_events.py +++ b/src/backend/core/services/livekit_events.py @@ -1,5 +1,7 @@ """LiveKit Events Service""" +# pylint: disable=E1101 + import uuid from enum import Enum from logging import getLogger @@ -9,6 +11,10 @@ from django.conf import settings from livekit import api from core import models +from core.recording.services.recording_events import ( + RecordingEventsError, + RecordingEventsService, +) from .lobby import LobbyService from .telephony import TelephonyException, TelephonyService @@ -84,6 +90,7 @@ class LiveKitEventsService: self.webhook_receiver = api.WebhookReceiver(token_verifier) self.lobby_service = LobbyService() self.telephony_service = TelephonyService() + self.recording_events = RecordingEventsService() def receive(self, request): """Process webhook and route to appropriate handler.""" @@ -115,6 +122,29 @@ class LiveKitEventsService: # pylint: disable=not-callable handler(data) + def _handle_egress_ended(self, data): + """Handle 'egress_ended' event.""" + + try: + recording = models.Recording.objects.get( + worker_id=data.egress_info.egress_id + ) + except models.Recording.DoesNotExist as err: + raise ActionFailedError( + f"Recording with worker ID {data.egress_info.egress_id} does not exist" + ) from err + + if ( + data.egress_info.status == api.EgressStatus.EGRESS_LIMIT_REACHED + and recording.status == models.RecordingStatusChoices.ACTIVE + ): + try: + self.recording_events.handle_limit_reached(recording) + except RecordingEventsError as e: + raise ActionFailedError( + f"Failed to process limit reached event for recording {recording}" + ) from e + def _handle_room_started(self, data): """Handle 'room_started' event.""" diff --git a/src/backend/core/tests/services/test_livekit_events.py b/src/backend/core/tests/services/test_livekit_events.py index 91b2dea2..24c2c82f 100644 --- a/src/backend/core/tests/services/test_livekit_events.py +++ b/src/backend/core/tests/services/test_livekit_events.py @@ -1,14 +1,16 @@ """ Test LiveKitEvents service. """ -# pylint: disable=W0621,W0613, W0212 +# pylint: disable=W0621,W0613, W0212, E0611 import uuid from unittest import mock import pytest +from livekit.api import EgressStatus -from core.factories import RoomFactory +from core.factories import RecordingFactory, RoomFactory +from core.recording.services.recording_events import RecordingEventsService from core.services.livekit_events import ( ActionFailedError, AuthenticationError, @@ -19,6 +21,7 @@ from core.services.livekit_events import ( ) from core.services.lobby import LobbyService from core.services.telephony import TelephonyException, TelephonyService +from core.utils import NotificationError pytestmark = pytest.mark.django_db @@ -55,6 +58,107 @@ def test_initialization( mock_token_verifier.assert_called_once_with(api_key, api_secret) mock_webhook_receiver.assert_called_once_with(mock_token_verifier.return_value) assert isinstance(service.lobby_service, LobbyService) + assert isinstance(service.telephony_service, TelephonyService) + assert isinstance(service.recording_events, RecordingEventsService) + + +@pytest.mark.parametrize( + ("mode", "notification_type"), + ( + ("screen_recording", "screenRecordingLimitReached"), + ("transcript", "transcriptionLimitReached"), + ), +) +@mock.patch("core.utils.notify_participants") +def test_handle_egress_ended_success(mock_notify, mode, notification_type, service): + """Should successfully stop recording and notifies all participant.""" + + recording = RecordingFactory(worker_id="worker-1", mode=mode, status="active") + mock_data = mock.MagicMock() + mock_data.egress_info.egress_id = recording.worker_id + mock_data.egress_info.status = EgressStatus.EGRESS_LIMIT_REACHED + + service._handle_egress_ended(mock_data) + + mock_notify.assert_called_once_with( + room_name=str(recording.room.id), notification_data={"type": notification_type} + ) + + recording.refresh_from_db() + assert recording.status == "stopped" + + +@mock.patch("core.utils.notify_participants") +def test_handle_egress_ended_notification_fails(mock_notify, service): + """Should raise ActionFailedError when notification fails but still stop recording.""" + + recording = RecordingFactory(worker_id="worker-1", status="active") + mock_data = mock.MagicMock() + mock_data.egress_info.egress_id = recording.worker_id + mock_data.egress_info.status = EgressStatus.EGRESS_LIMIT_REACHED + + mock_notify.side_effect = NotificationError("Error notifying") + + with pytest.raises( + ActionFailedError, + match=r"Failed to process limit reached event for recording .+", + ): + service._handle_egress_ended(mock_data) + + recording.refresh_from_db() + assert recording.status == "stopped" + + +@mock.patch("core.utils.notify_participants") +def test_handle_egress_ended_recording_not_found(mock_notify, service): + """Should raise ActionFailedError when recording doesn't exist.""" + + recording = RecordingFactory(worker_id="worker-1", status="active") + mock_data = mock.MagicMock() + mock_data.egress_info.egress_id = "worker-2" + mock_data.egress_info.status = EgressStatus.EGRESS_LIMIT_REACHED + + with pytest.raises( + ActionFailedError, match=r"Recording with worker ID .+ does not exist" + ): + service._handle_egress_ended(mock_data) + + mock_notify.assert_not_called() + + recording.refresh_from_db() + assert recording.status == "active" + + +@mock.patch("core.utils.notify_participants") +def test_handle_egress_ended_recording_not_active(mock_notify, service): + """Should ignore non-active recordings.""" + + recording = RecordingFactory(worker_id="worker-1", status="failed_to_stop") + mock_data = mock.MagicMock() + mock_data.egress_info.egress_id = "worker-1" + mock_data.egress_info.status = EgressStatus.EGRESS_LIMIT_REACHED + + service._handle_egress_ended(mock_data) + + mock_notify.assert_not_called() + + recording.refresh_from_db() + assert recording.status == "failed_to_stop" + + +@mock.patch("core.utils.notify_participants") +def test_handle_egress_ended_recording_not_limit_reached(mock_notify, service): + """Should ignore egress non-limit-reached statuses.""" + + recording = RecordingFactory(worker_id="worker-1", status="stopped") + mock_data = mock.MagicMock() + mock_data.egress_info.egress_id = "worker-1" + mock_data.egress_info.status = EgressStatus.EGRESS_COMPLETE + + service._handle_egress_ended(mock_data) + + mock_notify.assert_not_called() + assert recording.status == "stopped" @mock.patch.object(LobbyService, "clear_room_cache")