diff --git a/src/backend/core/services/livekit_events_service.py b/src/backend/core/services/livekit_events_service.py new file mode 100644 index 00000000..d40a42ed --- /dev/null +++ b/src/backend/core/services/livekit_events_service.py @@ -0,0 +1,117 @@ +"""LiveKit Events Service""" + +import uuid +from enum import Enum + +from django.conf import settings + +from livekit import api + +from .lobby_service import LobbyService + + +class LiveKitWebhookError(Exception): + """Base exception for LiveKit webhook processing errors.""" + + status_code = 500 + + +class AuthenticationError(LiveKitWebhookError): + """Authentication failed.""" + + status_code = 401 + + +class InvalidPayloadError(LiveKitWebhookError): + """Invalid webhook payload.""" + + status_code = 400 + + +class UnsupportedEventTypeError(LiveKitWebhookError): + """Unsupported event type.""" + + status_code = 422 + + +class ActionFailedError(LiveKitWebhookError): + """Webhook action fails to process or complete.""" + + status_code = 500 + + +class LiveKitWebhookEventType(Enum): + """LiveKit webhook event types.""" + + # Room events + ROOM_STARTED = "room_started" + ROOM_FINISHED = "room_finished" + + # Participant events + PARTICIPANT_JOINED = "participant_joined" + PARTICIPANT_LEFT = "participant_left" + + # Track events + TRACK_PUBLISHED = "track_published" + TRACK_UNPUBLISHED = "track_unpublished" + + # Egress events + EGRESS_STARTED = "egress_started" + EGRESS_UPDATED = "egress_updated" + EGRESS_ENDED = "egress_ended" + + # Ingress events + INGRESS_STARTED = "ingress_started" + INGRESS_ENDED = "ingress_ended" + + +class LiveKitEventsService: + """Service for processing and handling LiveKit webhook events and notifications.""" + + def __init__(self): + """Initialize with required services.""" + + token_verifier = api.TokenVerifier( + settings.LIVEKIT_CONFIGURATION["api_key"], + settings.LIVEKIT_CONFIGURATION["api_secret"], + ) + self.webhook_receiver = api.WebhookReceiver(token_verifier) + self.lobby_service = LobbyService() + + def receive(self, request): + """Process webhook and route to appropriate handler.""" + + auth_token = request.headers.get("Authorization") + if not auth_token: + raise AuthenticationError("Authorization header missing") + + try: + data = self.webhook_receiver.receive( + request.body.decode("utf-8"), auth_token + ) + except Exception as e: + raise InvalidPayloadError("Invalid webhook payload") from e + + try: + webhook_type = LiveKitWebhookEventType(data.event) + except ValueError as e: + raise UnsupportedEventTypeError( + f"Unknown webhook type: {data.event}" + ) from e + + handler_name = f"_handle_{webhook_type.value}" + handler = getattr(self, handler_name, None) + + if not handler or not callable(handler): + return + + # pylint: disable=not-callable + handler(data) + + def _handle_room_finished(self, data): + """Handle 'room_finished' event.""" + try: + room_id = uuid.UUID(data.room.name) + self.lobby_service.clear_room_cache(room_id) + except Exception as e: + raise ActionFailedError("Failed to process room finished event") from e diff --git a/src/backend/core/tests/services/test_livekit_events_service.py b/src/backend/core/tests/services/test_livekit_events_service.py new file mode 100644 index 00000000..c55762b1 --- /dev/null +++ b/src/backend/core/tests/services/test_livekit_events_service.py @@ -0,0 +1,132 @@ +""" +Test LiveKitEvents service. +""" +# pylint: disable=W0621,W0613, W0212 + +import uuid +from unittest import mock + +import pytest + +from core.services.livekit_events_service import ( + ActionFailedError, + AuthenticationError, + InvalidPayloadError, + LiveKitEventsService, + UnsupportedEventTypeError, + api, +) +from core.services.lobby_service import LobbyService + +pytestmark = pytest.mark.django_db + + +@pytest.fixture +def mock_livekit_config(settings): + """Mock LiveKit configuration.""" + settings.LIVEKIT_CONFIGURATION = { + "api_key": "test_api_key", + "api_secret": "test_api_secret", + "url": "https://test-livekit.example.com/", + } + return settings.LIVEKIT_CONFIGURATION + + +@pytest.fixture +def service(mock_livekit_config): + """Initialize LiveKitEventsService.""" + return LiveKitEventsService() + + +@mock.patch("livekit.api.TokenVerifier") +@mock.patch("livekit.api.WebhookReceiver") +def test_initialization( + mock_webhook_receiver, mock_token_verifier, mock_livekit_config +): + """Should correctly initialize the service with required dependencies.""" + + api_key = mock_livekit_config["api_key"] + api_secret = mock_livekit_config["api_secret"] + + service = LiveKitEventsService() + + mock_token_verifier.assert_called_once_with(api_key, api_secret) + mock_webhook_receiver.assert_called_once_with(mock_token_verifier.return_value) + assert isinstance(service.lobby_service, LobbyService) + + +@mock.patch.object(LobbyService, "clear_room_cache") +def test_handle_room_finished(mock_clear_cache, service): + """Should clear lobby cache when room is finished.""" + + mock_room_name = uuid.uuid4() + + mock_data = mock.MagicMock() + mock_data.room.name = str(mock_room_name) + + service._handle_room_finished(mock_data) + + mock_clear_cache.assert_called_once_with(mock_room_name) + + +@mock.patch.object( + LobbyService, "clear_room_cache", side_effect=Exception("Test error") +) +def test_handle_room_finished_error(mock_clear_cache, service): + """Should raise ActionFailedError when processing fails.""" + mock_data = mock.MagicMock() + mock_data.room.name = "00000000-0000-0000-0000-000000000000" + with pytest.raises( + ActionFailedError, match="Failed to process room finished event" + ): + service._handle_room_finished(mock_data) + + +def test_handle_room_finished_invalid_room_name(service): + """Should raise ActionFailedError when processing fails.""" + mock_data = mock.MagicMock() + mock_data.room.name = "invalid" + with pytest.raises( + ActionFailedError, match="Failed to process room finished event" + ): + service._handle_room_finished(mock_data) + + +@mock.patch.object( + api.WebhookReceiver, "receive", side_effect=Exception("Invalid payload") +) +def test_receive_invalid_payload(mock_receive, service): + """Should raise InvalidPayloadError for invalid payloads.""" + mock_request = mock.MagicMock() + mock_request.headers = {"Authorization": "test_token"} + mock_request.body = b"{}" + + with pytest.raises(InvalidPayloadError, match="Invalid webhook payload"): + service.receive(mock_request) + + +def test_receive_missing_auth(service): + """Should raise AuthenticationError when auth header is missing.""" + mock_request = mock.MagicMock() + mock_request.headers = {} + + with pytest.raises(AuthenticationError, match="Authorization header missing"): + service.receive(mock_request) + + +@mock.patch.object(api.WebhookReceiver, "receive") +def test_receive_unsupported_event(mock_receive, service): + """Should raise LiveKitWebhookError for unsupported events.""" + mock_request = mock.MagicMock() + mock_request.headers = {"Authorization": "test_token"} + mock_request.body = b"{}" + + # Mock returned data with unsupported event type + mock_data = mock.MagicMock() + mock_data.event = "unsupported_event" + mock_receive.return_value = mock_data + + with pytest.raises( + UnsupportedEventTypeError, match="Unknown webhook type: unsupported_event" + ): + service.receive(mock_request)