(backend) introduce LiveKit event-handler matching service

Create new service that matches received events with their appropriate
handlers. Provides centralized system for event routing and processing
across the application.

If an event has no handler, it would be ignored.
This commit is contained in:
lebaudantoine
2025-03-06 01:59:23 +01:00
committed by aleb_the_flash
parent 2168643fd4
commit d2f79d4524
2 changed files with 249 additions and 0 deletions

View File

@@ -0,0 +1,117 @@
"""LiveKit Events Service"""
import uuid
from enum import Enum
from django.conf import settings
from livekit import api
from .lobby_service import LobbyService
class LiveKitWebhookError(Exception):
"""Base exception for LiveKit webhook processing errors."""
status_code = 500
class AuthenticationError(LiveKitWebhookError):
"""Authentication failed."""
status_code = 401
class InvalidPayloadError(LiveKitWebhookError):
"""Invalid webhook payload."""
status_code = 400
class UnsupportedEventTypeError(LiveKitWebhookError):
"""Unsupported event type."""
status_code = 422
class ActionFailedError(LiveKitWebhookError):
"""Webhook action fails to process or complete."""
status_code = 500
class LiveKitWebhookEventType(Enum):
"""LiveKit webhook event types."""
# Room events
ROOM_STARTED = "room_started"
ROOM_FINISHED = "room_finished"
# Participant events
PARTICIPANT_JOINED = "participant_joined"
PARTICIPANT_LEFT = "participant_left"
# Track events
TRACK_PUBLISHED = "track_published"
TRACK_UNPUBLISHED = "track_unpublished"
# Egress events
EGRESS_STARTED = "egress_started"
EGRESS_UPDATED = "egress_updated"
EGRESS_ENDED = "egress_ended"
# Ingress events
INGRESS_STARTED = "ingress_started"
INGRESS_ENDED = "ingress_ended"
class LiveKitEventsService:
"""Service for processing and handling LiveKit webhook events and notifications."""
def __init__(self):
"""Initialize with required services."""
token_verifier = api.TokenVerifier(
settings.LIVEKIT_CONFIGURATION["api_key"],
settings.LIVEKIT_CONFIGURATION["api_secret"],
)
self.webhook_receiver = api.WebhookReceiver(token_verifier)
self.lobby_service = LobbyService()
def receive(self, request):
"""Process webhook and route to appropriate handler."""
auth_token = request.headers.get("Authorization")
if not auth_token:
raise AuthenticationError("Authorization header missing")
try:
data = self.webhook_receiver.receive(
request.body.decode("utf-8"), auth_token
)
except Exception as e:
raise InvalidPayloadError("Invalid webhook payload") from e
try:
webhook_type = LiveKitWebhookEventType(data.event)
except ValueError as e:
raise UnsupportedEventTypeError(
f"Unknown webhook type: {data.event}"
) from e
handler_name = f"_handle_{webhook_type.value}"
handler = getattr(self, handler_name, None)
if not handler or not callable(handler):
return
# pylint: disable=not-callable
handler(data)
def _handle_room_finished(self, data):
"""Handle 'room_finished' event."""
try:
room_id = uuid.UUID(data.room.name)
self.lobby_service.clear_room_cache(room_id)
except Exception as e:
raise ActionFailedError("Failed to process room finished event") from e

View File

@@ -0,0 +1,132 @@
"""
Test LiveKitEvents service.
"""
# pylint: disable=W0621,W0613, W0212
import uuid
from unittest import mock
import pytest
from core.services.livekit_events_service import (
ActionFailedError,
AuthenticationError,
InvalidPayloadError,
LiveKitEventsService,
UnsupportedEventTypeError,
api,
)
from core.services.lobby_service import LobbyService
pytestmark = pytest.mark.django_db
@pytest.fixture
def mock_livekit_config(settings):
"""Mock LiveKit configuration."""
settings.LIVEKIT_CONFIGURATION = {
"api_key": "test_api_key",
"api_secret": "test_api_secret",
"url": "https://test-livekit.example.com/",
}
return settings.LIVEKIT_CONFIGURATION
@pytest.fixture
def service(mock_livekit_config):
"""Initialize LiveKitEventsService."""
return LiveKitEventsService()
@mock.patch("livekit.api.TokenVerifier")
@mock.patch("livekit.api.WebhookReceiver")
def test_initialization(
mock_webhook_receiver, mock_token_verifier, mock_livekit_config
):
"""Should correctly initialize the service with required dependencies."""
api_key = mock_livekit_config["api_key"]
api_secret = mock_livekit_config["api_secret"]
service = LiveKitEventsService()
mock_token_verifier.assert_called_once_with(api_key, api_secret)
mock_webhook_receiver.assert_called_once_with(mock_token_verifier.return_value)
assert isinstance(service.lobby_service, LobbyService)
@mock.patch.object(LobbyService, "clear_room_cache")
def test_handle_room_finished(mock_clear_cache, service):
"""Should clear lobby cache when room is finished."""
mock_room_name = uuid.uuid4()
mock_data = mock.MagicMock()
mock_data.room.name = str(mock_room_name)
service._handle_room_finished(mock_data)
mock_clear_cache.assert_called_once_with(mock_room_name)
@mock.patch.object(
LobbyService, "clear_room_cache", side_effect=Exception("Test error")
)
def test_handle_room_finished_error(mock_clear_cache, service):
"""Should raise ActionFailedError when processing fails."""
mock_data = mock.MagicMock()
mock_data.room.name = "00000000-0000-0000-0000-000000000000"
with pytest.raises(
ActionFailedError, match="Failed to process room finished event"
):
service._handle_room_finished(mock_data)
def test_handle_room_finished_invalid_room_name(service):
"""Should raise ActionFailedError when processing fails."""
mock_data = mock.MagicMock()
mock_data.room.name = "invalid"
with pytest.raises(
ActionFailedError, match="Failed to process room finished event"
):
service._handle_room_finished(mock_data)
@mock.patch.object(
api.WebhookReceiver, "receive", side_effect=Exception("Invalid payload")
)
def test_receive_invalid_payload(mock_receive, service):
"""Should raise InvalidPayloadError for invalid payloads."""
mock_request = mock.MagicMock()
mock_request.headers = {"Authorization": "test_token"}
mock_request.body = b"{}"
with pytest.raises(InvalidPayloadError, match="Invalid webhook payload"):
service.receive(mock_request)
def test_receive_missing_auth(service):
"""Should raise AuthenticationError when auth header is missing."""
mock_request = mock.MagicMock()
mock_request.headers = {}
with pytest.raises(AuthenticationError, match="Authorization header missing"):
service.receive(mock_request)
@mock.patch.object(api.WebhookReceiver, "receive")
def test_receive_unsupported_event(mock_receive, service):
"""Should raise LiveKitWebhookError for unsupported events."""
mock_request = mock.MagicMock()
mock_request.headers = {"Authorization": "test_token"}
mock_request.body = b"{}"
# Mock returned data with unsupported event type
mock_data = mock.MagicMock()
mock_data.event = "unsupported_event"
mock_receive.return_value = mock_data
with pytest.raises(
UnsupportedEventTypeError, match="Unknown webhook type: unsupported_event"
):
service.receive(mock_request)