(backend) add dedicated service for LiveKit recording webhook events

Create new service to handle recording-related webhooks, starting with
limit reached events. Will expand to enhance UX by notifying backend
of other LiveKit events.

Doesn't fit cleanly with existing recording package - may need broader
redesign. Chose dedicated service over mixing responsibilities.
This commit is contained in:
lebaudantoine
2025-07-15 17:20:52 +02:00
committed by aleb_the_flash
parent 17c486f7bf
commit f0a17b1ce1
4 changed files with 121 additions and 0 deletions

View File

@@ -0,0 +1,49 @@
"""Recording-related LiveKit Events Service"""
from logging import getLogger
from core import models, utils
logger = getLogger(__name__)
class RecordingEventsError(Exception):
"""Recording event handling fails."""
class RecordingEventsService:
"""Handles recording-related Livekit webhook events."""
@staticmethod
def handle_limit_reached(recording):
"""Stop recording and notify participants when limit is reached."""
recording.status = models.RecordingStatusChoices.STOPPED
recording.save()
notification_mapping = {
models.RecordingModeChoices.SCREEN_RECORDING: "screenRecordingLimitReached",
models.RecordingModeChoices.TRANSCRIPT: "transcriptionLimitReached",
}
notification_type = notification_mapping.get(recording.mode)
if not notification_type:
return
try:
utils.notify_participants(
room_name=str(recording.room.id),
notification_data={"type": notification_type},
)
except utils.NotificationError as e:
logger.exception(
"Failed to notify participants about recording limit reached: "
"room=%s, recording_id=%s, mode=%s",
recording.room.id,
recording.id,
recording.mode,
)
raise RecordingEventsError(
f"Failed to notify participants in room '{recording.room.id}' about "
f"recording limit reached (recording_id={recording.id})"
) from e

View File

@@ -0,0 +1,72 @@
"""
Test RecordingEventsService service.
"""
# pylint: disable=W0621
from unittest import mock
import pytest
from core.factories import RecordingFactory
from core.recording.services.recording_events import (
RecordingEventsError,
RecordingEventsService,
)
from core.utils import NotificationError
pytestmark = pytest.mark.django_db
@pytest.fixture
def service():
"""Initialize RecordingEventsService."""
return RecordingEventsService()
@pytest.mark.parametrize(
("mode", "notification_type"),
(
("screen_recording", "screenRecordingLimitReached"),
("transcript", "transcriptionLimitReached"),
),
)
@mock.patch("core.utils.notify_participants")
def test_handle_limit_reached_success(mock_notify, mode, notification_type, service):
"""Test handle_limit_reached stops recording and notifies participants."""
recording = RecordingFactory(status="active", mode=mode)
service.handle_limit_reached(recording)
assert recording.status == "stopped"
mock_notify.assert_called_once_with(
room_name=str(recording.room.id), notification_data={"type": notification_type}
)
@pytest.mark.parametrize(
("mode", "notification_type"),
(
("screen_recording", "screenRecordingLimitReached"),
("transcript", "transcriptionLimitReached"),
),
)
@mock.patch("core.utils.notify_participants")
def test_handle_limit_reached_error(mock_notify, mode, notification_type, service):
"""Test handle_limit_reached raises RecordingEventsError when notification fails."""
mock_notify.side_effect = NotificationError("Error notifying")
recording = RecordingFactory(status="active", mode=mode)
with pytest.raises(
RecordingEventsError,
match=r"Failed to notify participants in room '.+' "
r"about recording limit reached \(recording_id=.+\)",
):
service.handle_limit_reached(recording)
assert recording.status == "stopped"
mock_notify.assert_called_once_with(
room_name=str(recording.room.id), notification_data={"type": notification_type}
)