♻️(backend) extract livekit API client creation to reusable utility

Create dedicated utility function for livekit API client initialization.
Centralizes configuration logic including custom session handling for SSL
verification. Improves code reuse across backend components that interact
with LiveKit.
This commit is contained in:
lebaudantoine
2025-04-24 16:33:00 +02:00
committed by aleb_the_flash
parent 2ef95aa835
commit ae17fbdaa8
5 changed files with 90 additions and 33 deletions

View File

@@ -2,12 +2,10 @@
# pylint: disable=no-member
from django.conf import settings
import aiohttp
from asgiref.sync import async_to_sync
from livekit import api as livekit_api
from ... import utils
from ..enums import FileExtension
from .exceptions import WorkerConnectionError, WorkerResponseError
from .factories import WorkerServiceConfig
@@ -30,14 +28,7 @@ class BaseEgressService:
async def _handle_request(self, request, method_name: str):
"""Handle making a request to the LiveKit API and returns the response."""
custom_session = None
if not settings.LIVEKIT_VERIFY_SSL:
connector = aiohttp.TCPConnector(ssl=False)
custom_session = aiohttp.ClientSession(connector=connector)
lkapi = livekit_api.LiveKitAPI(
session=custom_session, **self._config.server_configurations
)
lkapi = utils.create_livekit_client(self._config.server_configurations)
# ruff: noqa: SLF001
# pylint: disable=protected-access

View File

@@ -14,7 +14,6 @@ from django.core.cache import cache
from asgiref.sync import async_to_sync
from livekit.api import ( # pylint: disable=E0611
ListRoomsRequest,
LiveKitAPI,
SendDataRequest,
TwirpError,
)
@@ -347,7 +346,7 @@ class LobbyService:
"type": settings.LOBBY_NOTIFICATION_TYPE,
}
lkapi = LiveKitAPI(**settings.LIVEKIT_CONFIGURATION)
lkapi = utils.create_livekit_client()
try:
room_response = await lkapi.room.list_rooms(

View File

@@ -776,9 +776,10 @@ def test_update_participant_status_success(mock_cache, lobby_service, participan
lobby_service._get_cache_key.assert_called_once_with(room.id, participant_id)
@mock.patch("core.services.lobby.LiveKitAPI")
def test_notify_participants_success_no_room(mock_livekit_api, lobby_service):
@mock.patch("core.utils.create_livekit_client")
def test_notify_participants_success_no_room(mock_create_livekit_client, lobby_service):
"""Test the notify_participants method when the LiveKit room doesn't exist yet."""
room = RoomFactory(access_level=RoomAccessLevel.RESTRICTED)
# Set up the mock LiveKitAPI and its behavior
@@ -794,15 +795,11 @@ def test_notify_participants_success_no_room(mock_livekit_api, lobby_service):
mock_api_instance.room.list_rooms = mock.AsyncMock(return_value=MockResponse())
mock_api_instance.aclose = mock.AsyncMock()
mock_livekit_api.return_value = mock_api_instance
mock_create_livekit_client.return_value = mock_api_instance
# Act
lobby_service.notify_participants(room.id)
# Assert
# Verify the API was initialized with correct configuration
mock_livekit_api.assert_called_once_with(**settings.LIVEKIT_CONFIGURATION)
# Verify that the service checked for existing rooms
mock_api_instance.room.list_rooms.assert_called_once()
@@ -813,8 +810,8 @@ def test_notify_participants_success_no_room(mock_livekit_api, lobby_service):
mock_api_instance.aclose.assert_called_once()
@mock.patch("core.services.lobby.LiveKitAPI")
def test_notify_participants_success(mock_livekit_api, lobby_service):
@mock.patch("core.utils.create_livekit_client")
def test_notify_participants_success(mock_create_livekit_client, lobby_service):
"""Test successful participant notification."""
room = RoomFactory(access_level=RoomAccessLevel.RESTRICTED)
# Set up the mock LiveKitAPI and its behavior
@@ -830,14 +827,11 @@ def test_notify_participants_success(mock_livekit_api, lobby_service):
mock_api_instance.room.list_rooms = mock.AsyncMock(return_value=MockResponse())
mock_api_instance.aclose = mock.AsyncMock()
mock_livekit_api.return_value = mock_api_instance
mock_create_livekit_client.return_value = mock_api_instance
# Call the function
lobby_service.notify_participants(room.id)
# Verify the API was called correctly
mock_livekit_api.assert_called_once_with(**settings.LIVEKIT_CONFIGURATION)
# Verify that the service checked for existing rooms
mock_api_instance.room.list_rooms.assert_called_once()
@@ -855,8 +849,8 @@ def test_notify_participants_success(mock_livekit_api, lobby_service):
mock_api_instance.aclose.assert_called_once()
@mock.patch("core.services.lobby.LiveKitAPI")
def test_notify_participants_error(mock_livekit_api, lobby_service):
@mock.patch("core.utils.create_livekit_client")
def test_notify_participants_error(mock_create_livekit_client, lobby_service):
"""Test participant notification with API error."""
room = RoomFactory(access_level=RoomAccessLevel.RESTRICTED)
# Set up the mock LiveKitAPI and its behavior
@@ -874,7 +868,7 @@ def test_notify_participants_error(mock_livekit_api, lobby_service):
mock_api_instance.room.list_rooms = mock.AsyncMock(return_value=MockResponse())
mock_api_instance.aclose = mock.AsyncMock()
mock_livekit_api.return_value = mock_api_instance
mock_create_livekit_client.return_value = mock_api_instance
# Call the function and expect an exception
with pytest.raises(
@@ -882,9 +876,6 @@ def test_notify_participants_error(mock_livekit_api, lobby_service):
):
lobby_service.notify_participants(room.id)
# Verify the API was called correctly
mock_livekit_api.assert_called_once_with(**settings.LIVEKIT_CONFIGURATION)
# Verify that the service checked for existing rooms
mock_api_instance.room.list_rooms.assert_called_once()

View File

@@ -0,0 +1,60 @@
"""
Test utils functions
"""
from unittest import mock
from core.utils import create_livekit_client
@mock.patch("asyncio.get_running_loop")
@mock.patch("core.utils.LiveKitAPI")
def test_create_livekit_client_ssl_enabled(
mock_livekit_api, mock_get_running_loop, settings
):
"""Test LiveKitAPI client creation with SSL verification enabled."""
mock_get_running_loop.return_value = mock.MagicMock()
settings.LIVEKIT_VERIFY_SSL = True
create_livekit_client()
mock_livekit_api.assert_called_once_with(
**settings.LIVEKIT_CONFIGURATION, session=None
)
@mock.patch("core.utils.aiohttp.ClientSession")
@mock.patch("asyncio.get_running_loop")
@mock.patch("core.utils.LiveKitAPI")
def test_create_livekit_client_ssl_disabled(
mock_livekit_api, mock_get_running_loop, mock_client_session, settings
):
"""Test LiveKitAPI client creation with SSL verification disabled."""
mock_get_running_loop.return_value = mock.MagicMock()
mock_session_instance = mock.MagicMock()
mock_client_session.return_value = mock_session_instance
settings.LIVEKIT_VERIFY_SSL = False
create_livekit_client()
mock_livekit_api.assert_called_once_with(
**settings.LIVEKIT_CONFIGURATION, session=mock_session_instance
)
@mock.patch("asyncio.get_running_loop")
@mock.patch("core.utils.LiveKitAPI")
def test_create_livekit_client_custom_configuration(
mock_livekit_api, mock_get_running_loop
):
"""Test LiveKitAPI client creation with custom configuration."""
mock_get_running_loop.return_value = mock.MagicMock()
custom_configuration = {
"api_key": "mock_key",
"api_secret": "mock_secret",
"url": "http://mock-url.com",
}
create_livekit_client(custom_configuration)
mock_livekit_api.assert_called_once_with(**custom_configuration, session=None)

View File

@@ -13,8 +13,9 @@ from uuid import uuid4
from django.conf import settings
from django.core.files.storage import default_storage
import aiohttp
import botocore
from livekit.api import AccessToken, VideoGrants
from livekit.api import AccessToken, LiveKitAPI, VideoGrants
def generate_color(identity: str) -> str:
@@ -142,3 +143,18 @@ def generate_s3_authorization_headers(key):
auth.add_auth(request)
return request
def create_livekit_client(custom_configuration=None):
"""Create and return a configured LiveKit API client."""
custom_session = None
if not settings.LIVEKIT_VERIFY_SSL:
connector = aiohttp.TCPConnector(ssl=False)
custom_session = aiohttp.ClientSession(connector=connector)
# Use default configuration if none provided
configuration = custom_configuration or settings.LIVEKIT_CONFIGURATION
return LiveKitAPI(session=custom_session, **configuration)