(backend) add egress limit notification handler to LiveKit service

Implement method to process egress limit reached events from LiveKit
webhooks for better recording duration management.

Livekit by default is not notifying the participant of a room when
an egress reached its limit. I needed to proxy it through the back.
This commit is contained in:
lebaudantoine
2025-07-15 20:01:02 +02:00
committed by aleb_the_flash
parent f0a17b1ce1
commit 59cd1f766a
2 changed files with 136 additions and 2 deletions

View File

@@ -1,5 +1,7 @@
"""LiveKit Events Service"""
# pylint: disable=E1101
import uuid
from enum import Enum
from logging import getLogger
@@ -9,6 +11,10 @@ from django.conf import settings
from livekit import api
from core import models
from core.recording.services.recording_events import (
RecordingEventsError,
RecordingEventsService,
)
from .lobby import LobbyService
from .telephony import TelephonyException, TelephonyService
@@ -84,6 +90,7 @@ class LiveKitEventsService:
self.webhook_receiver = api.WebhookReceiver(token_verifier)
self.lobby_service = LobbyService()
self.telephony_service = TelephonyService()
self.recording_events = RecordingEventsService()
def receive(self, request):
"""Process webhook and route to appropriate handler."""
@@ -115,6 +122,29 @@ class LiveKitEventsService:
# pylint: disable=not-callable
handler(data)
def _handle_egress_ended(self, data):
"""Handle 'egress_ended' event."""
try:
recording = models.Recording.objects.get(
worker_id=data.egress_info.egress_id
)
except models.Recording.DoesNotExist as err:
raise ActionFailedError(
f"Recording with worker ID {data.egress_info.egress_id} does not exist"
) from err
if (
data.egress_info.status == api.EgressStatus.EGRESS_LIMIT_REACHED
and recording.status == models.RecordingStatusChoices.ACTIVE
):
try:
self.recording_events.handle_limit_reached(recording)
except RecordingEventsError as e:
raise ActionFailedError(
f"Failed to process limit reached event for recording {recording}"
) from e
def _handle_room_started(self, data):
"""Handle 'room_started' event."""

View File

@@ -1,14 +1,16 @@
"""
Test LiveKitEvents service.
"""
# pylint: disable=W0621,W0613, W0212
# pylint: disable=W0621,W0613, W0212, E0611
import uuid
from unittest import mock
import pytest
from livekit.api import EgressStatus
from core.factories import RoomFactory
from core.factories import RecordingFactory, RoomFactory
from core.recording.services.recording_events import RecordingEventsService
from core.services.livekit_events import (
ActionFailedError,
AuthenticationError,
@@ -19,6 +21,7 @@ from core.services.livekit_events import (
)
from core.services.lobby import LobbyService
from core.services.telephony import TelephonyException, TelephonyService
from core.utils import NotificationError
pytestmark = pytest.mark.django_db
@@ -55,6 +58,107 @@ def test_initialization(
mock_token_verifier.assert_called_once_with(api_key, api_secret)
mock_webhook_receiver.assert_called_once_with(mock_token_verifier.return_value)
assert isinstance(service.lobby_service, LobbyService)
assert isinstance(service.telephony_service, TelephonyService)
assert isinstance(service.recording_events, RecordingEventsService)
@pytest.mark.parametrize(
("mode", "notification_type"),
(
("screen_recording", "screenRecordingLimitReached"),
("transcript", "transcriptionLimitReached"),
),
)
@mock.patch("core.utils.notify_participants")
def test_handle_egress_ended_success(mock_notify, mode, notification_type, service):
"""Should successfully stop recording and notifies all participant."""
recording = RecordingFactory(worker_id="worker-1", mode=mode, status="active")
mock_data = mock.MagicMock()
mock_data.egress_info.egress_id = recording.worker_id
mock_data.egress_info.status = EgressStatus.EGRESS_LIMIT_REACHED
service._handle_egress_ended(mock_data)
mock_notify.assert_called_once_with(
room_name=str(recording.room.id), notification_data={"type": notification_type}
)
recording.refresh_from_db()
assert recording.status == "stopped"
@mock.patch("core.utils.notify_participants")
def test_handle_egress_ended_notification_fails(mock_notify, service):
"""Should raise ActionFailedError when notification fails but still stop recording."""
recording = RecordingFactory(worker_id="worker-1", status="active")
mock_data = mock.MagicMock()
mock_data.egress_info.egress_id = recording.worker_id
mock_data.egress_info.status = EgressStatus.EGRESS_LIMIT_REACHED
mock_notify.side_effect = NotificationError("Error notifying")
with pytest.raises(
ActionFailedError,
match=r"Failed to process limit reached event for recording .+",
):
service._handle_egress_ended(mock_data)
recording.refresh_from_db()
assert recording.status == "stopped"
@mock.patch("core.utils.notify_participants")
def test_handle_egress_ended_recording_not_found(mock_notify, service):
"""Should raise ActionFailedError when recording doesn't exist."""
recording = RecordingFactory(worker_id="worker-1", status="active")
mock_data = mock.MagicMock()
mock_data.egress_info.egress_id = "worker-2"
mock_data.egress_info.status = EgressStatus.EGRESS_LIMIT_REACHED
with pytest.raises(
ActionFailedError, match=r"Recording with worker ID .+ does not exist"
):
service._handle_egress_ended(mock_data)
mock_notify.assert_not_called()
recording.refresh_from_db()
assert recording.status == "active"
@mock.patch("core.utils.notify_participants")
def test_handle_egress_ended_recording_not_active(mock_notify, service):
"""Should ignore non-active recordings."""
recording = RecordingFactory(worker_id="worker-1", status="failed_to_stop")
mock_data = mock.MagicMock()
mock_data.egress_info.egress_id = "worker-1"
mock_data.egress_info.status = EgressStatus.EGRESS_LIMIT_REACHED
service._handle_egress_ended(mock_data)
mock_notify.assert_not_called()
recording.refresh_from_db()
assert recording.status == "failed_to_stop"
@mock.patch("core.utils.notify_participants")
def test_handle_egress_ended_recording_not_limit_reached(mock_notify, service):
"""Should ignore egress non-limit-reached statuses."""
recording = RecordingFactory(worker_id="worker-1", status="stopped")
mock_data = mock.MagicMock()
mock_data.egress_info.egress_id = "worker-1"
mock_data.egress_info.status = EgressStatus.EGRESS_COMPLETE
service._handle_egress_ended(mock_data)
mock_notify.assert_not_called()
assert recording.status == "stopped"
@mock.patch.object(LobbyService, "clear_room_cache")