From d2f79d4524f35f992e84af055e58e056a40d63cc Mon Sep 17 00:00:00 2001 From: lebaudantoine Date: Thu, 6 Mar 2025 01:59:23 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8(backend)=20introduce=20LiveKit=20even?= =?UTF-8?q?t-handler=20matching=20service?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Create new service that matches received events with their appropriate handlers. Provides centralized system for event routing and processing across the application. If an event has no handler, it would be ignored. --- .../core/services/livekit_events_service.py | 117 ++++++++++++++++ .../services/test_livekit_events_service.py | 132 ++++++++++++++++++ 2 files changed, 249 insertions(+) create mode 100644 src/backend/core/services/livekit_events_service.py create mode 100644 src/backend/core/tests/services/test_livekit_events_service.py diff --git a/src/backend/core/services/livekit_events_service.py b/src/backend/core/services/livekit_events_service.py new file mode 100644 index 00000000..d40a42ed --- /dev/null +++ b/src/backend/core/services/livekit_events_service.py @@ -0,0 +1,117 @@ +"""LiveKit Events Service""" + +import uuid +from enum import Enum + +from django.conf import settings + +from livekit import api + +from .lobby_service import LobbyService + + +class LiveKitWebhookError(Exception): + """Base exception for LiveKit webhook processing errors.""" + + status_code = 500 + + +class AuthenticationError(LiveKitWebhookError): + """Authentication failed.""" + + status_code = 401 + + +class InvalidPayloadError(LiveKitWebhookError): + """Invalid webhook payload.""" + + status_code = 400 + + +class UnsupportedEventTypeError(LiveKitWebhookError): + """Unsupported event type.""" + + status_code = 422 + + +class ActionFailedError(LiveKitWebhookError): + """Webhook action fails to process or complete.""" + + status_code = 500 + + +class LiveKitWebhookEventType(Enum): + """LiveKit webhook event types.""" + + # Room events + ROOM_STARTED = "room_started" + ROOM_FINISHED = "room_finished" + + # Participant events + PARTICIPANT_JOINED = "participant_joined" + PARTICIPANT_LEFT = "participant_left" + + # Track events + TRACK_PUBLISHED = "track_published" + TRACK_UNPUBLISHED = "track_unpublished" + + # Egress events + EGRESS_STARTED = "egress_started" + EGRESS_UPDATED = "egress_updated" + EGRESS_ENDED = "egress_ended" + + # Ingress events + INGRESS_STARTED = "ingress_started" + INGRESS_ENDED = "ingress_ended" + + +class LiveKitEventsService: + """Service for processing and handling LiveKit webhook events and notifications.""" + + def __init__(self): + """Initialize with required services.""" + + token_verifier = api.TokenVerifier( + settings.LIVEKIT_CONFIGURATION["api_key"], + settings.LIVEKIT_CONFIGURATION["api_secret"], + ) + self.webhook_receiver = api.WebhookReceiver(token_verifier) + self.lobby_service = LobbyService() + + def receive(self, request): + """Process webhook and route to appropriate handler.""" + + auth_token = request.headers.get("Authorization") + if not auth_token: + raise AuthenticationError("Authorization header missing") + + try: + data = self.webhook_receiver.receive( + request.body.decode("utf-8"), auth_token + ) + except Exception as e: + raise InvalidPayloadError("Invalid webhook payload") from e + + try: + webhook_type = LiveKitWebhookEventType(data.event) + except ValueError as e: + raise UnsupportedEventTypeError( + f"Unknown webhook type: {data.event}" + ) from e + + handler_name = f"_handle_{webhook_type.value}" + handler = getattr(self, handler_name, None) + + if not handler or not callable(handler): + return + + # pylint: disable=not-callable + handler(data) + + def _handle_room_finished(self, data): + """Handle 'room_finished' event.""" + try: + room_id = uuid.UUID(data.room.name) + self.lobby_service.clear_room_cache(room_id) + except Exception as e: + raise ActionFailedError("Failed to process room finished event") from e diff --git a/src/backend/core/tests/services/test_livekit_events_service.py b/src/backend/core/tests/services/test_livekit_events_service.py new file mode 100644 index 00000000..c55762b1 --- /dev/null +++ b/src/backend/core/tests/services/test_livekit_events_service.py @@ -0,0 +1,132 @@ +""" +Test LiveKitEvents service. +""" +# pylint: disable=W0621,W0613, W0212 + +import uuid +from unittest import mock + +import pytest + +from core.services.livekit_events_service import ( + ActionFailedError, + AuthenticationError, + InvalidPayloadError, + LiveKitEventsService, + UnsupportedEventTypeError, + api, +) +from core.services.lobby_service import LobbyService + +pytestmark = pytest.mark.django_db + + +@pytest.fixture +def mock_livekit_config(settings): + """Mock LiveKit configuration.""" + settings.LIVEKIT_CONFIGURATION = { + "api_key": "test_api_key", + "api_secret": "test_api_secret", + "url": "https://test-livekit.example.com/", + } + return settings.LIVEKIT_CONFIGURATION + + +@pytest.fixture +def service(mock_livekit_config): + """Initialize LiveKitEventsService.""" + return LiveKitEventsService() + + +@mock.patch("livekit.api.TokenVerifier") +@mock.patch("livekit.api.WebhookReceiver") +def test_initialization( + mock_webhook_receiver, mock_token_verifier, mock_livekit_config +): + """Should correctly initialize the service with required dependencies.""" + + api_key = mock_livekit_config["api_key"] + api_secret = mock_livekit_config["api_secret"] + + service = LiveKitEventsService() + + mock_token_verifier.assert_called_once_with(api_key, api_secret) + mock_webhook_receiver.assert_called_once_with(mock_token_verifier.return_value) + assert isinstance(service.lobby_service, LobbyService) + + +@mock.patch.object(LobbyService, "clear_room_cache") +def test_handle_room_finished(mock_clear_cache, service): + """Should clear lobby cache when room is finished.""" + + mock_room_name = uuid.uuid4() + + mock_data = mock.MagicMock() + mock_data.room.name = str(mock_room_name) + + service._handle_room_finished(mock_data) + + mock_clear_cache.assert_called_once_with(mock_room_name) + + +@mock.patch.object( + LobbyService, "clear_room_cache", side_effect=Exception("Test error") +) +def test_handle_room_finished_error(mock_clear_cache, service): + """Should raise ActionFailedError when processing fails.""" + mock_data = mock.MagicMock() + mock_data.room.name = "00000000-0000-0000-0000-000000000000" + with pytest.raises( + ActionFailedError, match="Failed to process room finished event" + ): + service._handle_room_finished(mock_data) + + +def test_handle_room_finished_invalid_room_name(service): + """Should raise ActionFailedError when processing fails.""" + mock_data = mock.MagicMock() + mock_data.room.name = "invalid" + with pytest.raises( + ActionFailedError, match="Failed to process room finished event" + ): + service._handle_room_finished(mock_data) + + +@mock.patch.object( + api.WebhookReceiver, "receive", side_effect=Exception("Invalid payload") +) +def test_receive_invalid_payload(mock_receive, service): + """Should raise InvalidPayloadError for invalid payloads.""" + mock_request = mock.MagicMock() + mock_request.headers = {"Authorization": "test_token"} + mock_request.body = b"{}" + + with pytest.raises(InvalidPayloadError, match="Invalid webhook payload"): + service.receive(mock_request) + + +def test_receive_missing_auth(service): + """Should raise AuthenticationError when auth header is missing.""" + mock_request = mock.MagicMock() + mock_request.headers = {} + + with pytest.raises(AuthenticationError, match="Authorization header missing"): + service.receive(mock_request) + + +@mock.patch.object(api.WebhookReceiver, "receive") +def test_receive_unsupported_event(mock_receive, service): + """Should raise LiveKitWebhookError for unsupported events.""" + mock_request = mock.MagicMock() + mock_request.headers = {"Authorization": "test_token"} + mock_request.body = b"{}" + + # Mock returned data with unsupported event type + mock_data = mock.MagicMock() + mock_data.event = "unsupported_event" + mock_receive.return_value = mock_data + + with pytest.raises( + UnsupportedEventTypeError, match="Unknown webhook type: unsupported_event" + ): + service.receive(mock_request)