(backend) introduce general recording worker concepts

Introducing a new worker service architecture. Sorry for the long commit.
This design adheres to several key principles, primarily the  Single
Responsibility Principle. Dependency Injection and composition are
prioritized over inheritance, enhancing modularity and maintainability.

Interactions between the backend and external workers are encapsulated in
classes implementing a common `WorkerService` interface. I chose Protocol
over an abstract class for agility, aligning closely with static typing
without requiring inheritance. Each `WorkerService` implementation can
independently manage recordings according to its specific requirements.
This flexibility ensures that adding a new worker service, such as for
LiveKit, can be done without any change to existing components.

Configuration management is centralized in a single `WorkerServiceConfig`
class, which loads and provides all settings for different worker
implementations, keeping configurations organized and extensible. The worker
service class itself handles accessing relevant configurations as needed,
simplifying the configuration process.

A basic dictionary in Django settings acts as a factory, responsible for
instantiating the correct worker service based on the client's request mode.
This approach aligns with Django development conventions, emphasizing
simplicity. While a full factory class with a builder pattern could provide
future flexibility, YAGNI (You Aren't Gonna Need It) suggests deferring
such complexity until it’s necessary.

At the core of this design is the worker mediator, which decouples worker
service implementations from the Django ORM and manages database state
according to worker state. The mediator is purposefully limited in
responsibility, handling only what’s essential. It doesn’t instantiate
worker services directly; instead, services are injected via composition,
allowing the mediator to manage any object conforming to the `WorkerService`
interface. This setup preserves flexibility and maintains a clear
separation of responsibilities. The factory create worker services,
the mediator runs it.

(sorry for this long commit)
This commit is contained in:
lebaudantoine
2024-11-07 18:56:27 +01:00
committed by aleb_the_flash
parent 7278613b20
commit f6f1222f47
12 changed files with 1046 additions and 0 deletions

View File

View File

@@ -0,0 +1 @@
"""Meet worker services classes and exceptions."""

View File

@@ -0,0 +1,21 @@
"""Recording and worker services specific exceptions."""
class WorkerRequestError(Exception):
"""Raised when there is an issue with the worker request"""
class WorkerConnectionError(Exception):
"""Raised when there is an issue connecting to the worker."""
class WorkerResponseError(Exception):
"""Raised when the worker's response is not as expected."""
class RecordingStartError(Exception):
"""Raised when there is an error starting the recording."""
class RecordingStopError(Exception):
"""Raised when there is an error stopping the recording."""

View File

@@ -0,0 +1,75 @@
"""Factory, configurations and Protocol to create worker services"""
import logging
from dataclasses import dataclass
from functools import lru_cache
from typing import Any, ClassVar, Dict, Optional, Protocol, Type
from django.conf import settings
from django.utils.module_loading import import_string
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class WorkerServiceConfig:
"""Declare Worker Service common configurations"""
output_folder: str
server_configurations: Dict[str, Any]
verify_ssl: Optional[bool]
bucket_args: Optional[dict]
@classmethod
@lru_cache
def from_settings(cls) -> "WorkerServiceConfig":
"""Load configuration from Django settings with caching for efficiency."""
logger.debug("Loading WorkerServiceConfig from settings.")
return cls(
output_folder=settings.RECORDING_OUTPUT_FOLDER,
server_configurations=settings.LIVEKIT_CONFIGURATION,
verify_ssl=settings.RECORDING_VERIFY_SSL,
bucket_args={
"endpoint": settings.AWS_S3_ENDPOINT_URL,
"access_key": settings.AWS_S3_ACCESS_KEY_ID,
"secret": settings.AWS_S3_SECRET_ACCESS_KEY,
"region": settings.AWS_S3_REGION_NAME,
"bucket": settings.AWS_STORAGE_BUCKET_NAME,
"force_path_style": True,
},
)
class WorkerService(Protocol):
"""Define the interface for interacting with a worker service."""
hrid: ClassVar[str]
def __init__(self, config: WorkerServiceConfig):
"""Initialize the service with the given configuration."""
def start(self, room_id: str, recording_id: str) -> str:
"""Start a recording for a specified room."""
def stop(self, worker_id: str) -> str:
"""Stop recording for a specified worker."""
def get_worker_service(mode: str) -> WorkerService:
"""Instantiate a worker service by its mode."""
worker_registry: Dict[str, str] = settings.RECORDING_WORKER_CLASSES
try:
worker_class_path = worker_registry[mode]
except KeyError as e:
raise ValueError(
f"Recording mode '{mode}' not found in RECORDING_WORKER_CLASSES. "
f"Available modes: {list(worker_registry.keys())}"
) from e
worker_class: Type[WorkerService] = import_string(worker_class_path)
config = WorkerServiceConfig.from_settings()
return worker_class(config=config)

View File

@@ -0,0 +1,100 @@
"""Mediator between the worker service and recording instances in the Django ORM."""
import logging
from core.models import Recording, RecordingStatusChoices
from .exceptions import (
RecordingStartError,
RecordingStopError,
WorkerConnectionError,
WorkerRequestError,
WorkerResponseError,
)
from .factories import WorkerService
logger = logging.getLogger(__name__)
class WorkerServiceMediator:
"""Mediate interactions between a worker service and a recording instance.
A mediator class that decouples the worker from Django ORM, handles recording updates
based on worker status, and transforms worker errors into user-friendly exceptions.
Implements Mediator pattern.
"""
def __init__(self, worker_service: WorkerService):
"""Initialize the WorkerServiceMediator with the provided worker service."""
self._worker_service = worker_service
def start(self, recording: Recording):
"""Start the recording process using the worker service.
If the operation is successful, the recording's status will
transition from INITIATED to ACTIVE, else to FAILED_TO_START to keep track of errors.
Args:
recording (Recording): The recording instance to start.
Raises:
RecordingStartError: If there is an error starting the recording.
"""
# FIXME - no manipulations of room_name should be required
room_name = f"{recording.room.id!s}".replace("-", "")
if recording.status != RecordingStatusChoices.INITIATED:
logger.error("Cannot start recording in %s status.", recording.status)
raise RecordingStartError()
try:
worker_id = self._worker_service.start(room_name, recording.id)
except (WorkerRequestError, WorkerConnectionError, WorkerResponseError) as e:
logger.exception(
"Failed to start recording for room %s: %s", recording.room.slug, e
)
recording.status = RecordingStatusChoices.FAILED_TO_START
raise RecordingStartError() from e
else:
recording.worker_id = worker_id
recording.status = RecordingStatusChoices.ACTIVE
finally:
recording.save()
logger.info(
"Worker started for room %s (worker ID: %s)",
recording.room,
recording.worker_id,
)
def stop(self, recording: Recording):
"""Stop the recording process using the worker service.
If the operation is successful, the recording's status will transition
from ACTIVE to STOPPED, else to FAILED_TO_STOP to keep track of errors.
Args:
recording (Recording): The recording instance to stop.
Raises:
RecordingStopError: If there is an error stopping the recording.
"""
if recording.status != RecordingStatusChoices.ACTIVE:
logger.error("Cannot stop recording in %s status.", recording.status)
raise RecordingStopError()
try:
response = self._worker_service.stop(worker_id=recording.worker_id)
except (WorkerConnectionError, WorkerResponseError) as e:
logger.exception(
"Failed to stop recording for room %s: %s", recording.room.slug, e
)
recording.status = RecordingStatusChoices.FAILED_TO_STOP
raise RecordingStopError() from e
else:
recording.status = RecordingStatusChoices[response]
finally:
recording.save()
logger.info("Worker stopped for room %s", recording.room)

View File

@@ -0,0 +1,140 @@
"""Worker services in charge of recording a room."""
# pylint: disable=no-member
import aiohttp
from asgiref.sync import async_to_sync
from livekit import api as livekit_api
from livekit.api.egress_service import EgressService
from .exceptions import WorkerConnectionError, WorkerResponseError
from .factories import WorkerServiceConfig
class BaseEgressService:
"""Base egress defining common methods to manage and interact with LiveKit egress processes."""
def __init__(self, config: WorkerServiceConfig):
self._config = config
self._s3 = livekit_api.S3Upload(**config.bucket_args)
def _get_filepath(self, filename: str, extension: str) -> str:
"""Construct the file path for a given filename and extension.
Unsecure method, doesn't handle paths robustly and securely.
"""
return f"{self._config.output_folder}/{filename}.{extension}"
@async_to_sync
async def _handle_request(self, request, method_name: str):
"""Handle making a request to the LiveKit API and returns the response."""
# Use HTTP connector for local development with Tilt,
# where cluster communications are unsecure
connector = aiohttp.TCPConnector(ssl=self._config.verify_ssl)
async with aiohttp.ClientSession(connector=connector) as session:
client = EgressService(session, **self._config.server_configurations)
method = getattr(client, method_name)
try:
response = await method(request)
except livekit_api.TwirpError as e:
raise WorkerConnectionError(
f"LiveKit client connection error, {e.message}."
) from e
return response
def stop(self, worker_id: str) -> str:
"""Stop an ongoing egress worker.
The StopEgressRequest is shared among all types of egress,
so a single implementation in the base class should be sufficient.
"""
request = livekit_api.StopEgressRequest(
egress_id=worker_id,
)
response = self._handle_request(request, "stop_egress")
if not response.status:
raise WorkerResponseError(
"LiveKit response is missing the recording status."
)
# To avoid exposing EgressStatus values and coupling with LiveKit outside of this class,
# the response status is mapped to simpler "ABORTED", "STOPPED" or "FAILED_TO_STOP" strings.
if response.status == livekit_api.EgressStatus.EGRESS_ABORTED:
return "ABORTED"
if response.status == livekit_api.EgressStatus.EGRESS_ENDING:
return "STOPPED"
return "FAILED_TO_STOP"
def start(self, room_name, recording_id):
"""Start the egress process for a recording (not implemented in the base class).
Each derived class must implement this method, providing the necessary parameters for
its specific egress type (e.g. audio_only, streaming output).
"""
raise NotImplementedError("Subclass must implement this method.")
class VideoCompositeEgressService(BaseEgressService):
"""Record multiple participant video and audio tracks into a single output '.mp4' file."""
hrid = "video-recording-composite-livekit-egress"
def start(self, room_name, recording_id):
"""Start the video composite egress process for a recording."""
# Save room's recording as a mp4 video file.
file_type = livekit_api.EncodedFileType.MP4
filepath = self._get_filepath(filename=recording_id, extension="mp4")
file_output = livekit_api.EncodedFileOutput(
file_type=file_type,
filepath=filepath,
s3=self._s3,
)
request = livekit_api.RoomCompositeEgressRequest(
room_name=room_name,
file_outputs=[file_output],
)
response = self._handle_request(request, "start_room_composite_egress")
if not response.egress_id:
raise WorkerResponseError("Egress ID not found in the response.")
return response.egress_id
class AudioCompositeEgressService(BaseEgressService):
"""Record multiple participant audio tracks into a single output '.ogg' file."""
hrid = "audio-recording-composite-livekit-egress"
def start(self, room_name, recording_id):
"""Start the audio composite egress process for a recording."""
# Save room's recording as an ogg audio file.
file_type = livekit_api.EncodedFileType.OGG
filepath = self._get_filepath(filename=recording_id, extension="ogg")
file_output = livekit_api.EncodedFileOutput(
file_type=file_type,
filepath=filepath,
s3=self._s3,
)
request = livekit_api.RoomCompositeEgressRequest(
room_name=room_name, file_outputs=[file_output], audio_only=True
)
response = self._handle_request(request, "start_room_composite_egress")
if not response.egress_id:
raise WorkerResponseError("Egress ID not found in the response.")
return response.egress_id

View File

@@ -0,0 +1,171 @@
"""
Test worker service factories.
"""
# pylint: disable=W0212,W0621,W0613
from dataclasses import FrozenInstanceError
from unittest.mock import Mock
from django.test import override_settings
import pytest
from core.recording.worker.factories import (
WorkerService,
WorkerServiceConfig,
get_worker_service,
)
@pytest.fixture(autouse=True)
def clear_lru_cache():
"""Clear the lru_cache before and after each test"""
WorkerServiceConfig.from_settings.cache_clear()
yield
WorkerServiceConfig.from_settings.cache_clear()
@pytest.fixture
def test_settings():
"""Fixture to provide test Django settings"""
mocked_settings = {
"RECORDING_OUTPUT_FOLDER": "/test/output",
"LIVEKIT_CONFIGURATION": {"server": "test.example.com"},
"RECORDING_VERIFY_SSL": True,
"AWS_S3_ENDPOINT_URL": "https://s3.test.com",
"AWS_S3_ACCESS_KEY_ID": "test_key",
"AWS_S3_SECRET_ACCESS_KEY": "test_secret",
"AWS_S3_REGION_NAME": "test-region",
"AWS_STORAGE_BUCKET_NAME": "test-bucket",
}
# Use override_settings to properly patch Django settings
with override_settings(**mocked_settings):
yield test_settings
@pytest.fixture
def default_config(test_settings):
"""Fixture to provide a WorkerServiceConfig instance"""
return WorkerServiceConfig.from_settings()
# Tests
def test_config_initialization(default_config):
"""Test that WorkerServiceConfig is properly initialized from settings"""
assert default_config.output_folder == "/test/output"
assert default_config.server_configurations == {"server": "test.example.com"}
assert default_config.verify_ssl is True
assert default_config.bucket_args == {
"endpoint": "https://s3.test.com",
"access_key": "test_key",
"secret": "test_secret",
"region": "test-region",
"bucket": "test-bucket",
"force_path_style": True,
}
def test_config_immutability(default_config):
"""Test that config instances are immutable after creation"""
with pytest.raises(FrozenInstanceError):
default_config.output_folder = "new/path"
@override_settings(
RECORDING_OUTPUT_FOLDER="/test/output",
LIVEKIT_CONFIGURATION={"server": "test.example.com"},
RECORDING_VERIFY_SSL=True,
AWS_S3_ENDPOINT_URL="https://s3.test.com",
AWS_S3_ACCESS_KEY_ID="test_key",
AWS_S3_SECRET_ACCESS_KEY="test_secret",
AWS_S3_REGION_NAME="test-region",
AWS_STORAGE_BUCKET_NAME="test-bucket",
)
def test_config_caching():
"""Test that from_settings method caches its result"""
# Clear cache before testing caching behavior
WorkerServiceConfig.from_settings.cache_clear()
config1 = WorkerServiceConfig.from_settings()
config2 = WorkerServiceConfig.from_settings()
assert config1 is config2
class MockWorkerService(WorkerService):
"""Mock worker service for testing."""
def __init__(self, config):
self.config = config
@pytest.fixture
def mock_import_string(monkeypatch):
"""Fixture to mock import_string function."""
mock = Mock(return_value=MockWorkerService)
monkeypatch.setattr("core.recording.worker.factories.import_string", mock)
return mock
def test_factory_valid_mode(mock_import_string, settings, default_config):
"""Test getting worker service with valid mode."""
settings.RECORDING_WORKER_CLASSES = {
"test_mode": "path.to.MockWorkerService",
"another_mode": "path.to.AnotherWorkerService",
}
worker = get_worker_service("test_mode")
mock_import_string.assert_called_once_with("path.to.MockWorkerService")
assert isinstance(worker, MockWorkerService)
assert worker.config == default_config
def test_factory_invalid_mode(settings, mock_import_string, default_config):
"""Test getting worker service with invalid mode raises ValueError."""
settings.RECORDING_WORKER_CLASSES = {
"test_mode": "path.to.MockWorkerService",
"another_mode": "path.to.AnotherWorkerService",
}
worker = get_worker_service("test_mode")
mock_import_string.assert_called_once_with("path.to.MockWorkerService")
assert isinstance(worker, MockWorkerService)
with pytest.raises(ValueError) as exc_info:
get_worker_service("invalid_mode")
mock_import_string.assert_not_called()
assert "Recording mode 'invalid_mode' not found" in str(exc_info.value)
assert "Available modes: ['test_mode', 'another_mode']" in str(exc_info.value)
def test_factory_import_error(mock_import_string, settings):
"""Test handling of import errors."""
mock_import_string.side_effect = ImportError("Module not found")
settings.RECORDING_WORKER_CLASSES = {
"test_mode": "path.to.MockWorkerService",
"another_mode": "path.to.AnotherWorkerService",
}
with pytest.raises(ImportError) as exc_info:
get_worker_service("test_mode")
assert "Module not found" in str(exc_info.value)
def test_factory_empty_registry(settings):
"""Test behavior when worker registry is empty."""
settings.RECORDING_WORKER_CLASSES = {}
with pytest.raises(ValueError) as exc_info:
get_worker_service("any_mode")
assert "Available modes: []" in str(exc_info.value)

View File

@@ -0,0 +1,161 @@
"""Test WorkerServiceMediator class."""
# pylint: disable=W0621,W0613
from unittest.mock import Mock
import pytest
from core.factories import RecordingFactory
from core.models import RecordingStatusChoices
from core.recording.worker.exceptions import (
RecordingStartError,
RecordingStopError,
WorkerConnectionError,
WorkerRequestError,
WorkerResponseError,
)
from core.recording.worker.factories import WorkerService
from core.recording.worker.mediator import WorkerServiceMediator
pytestmark = pytest.mark.django_db
@pytest.fixture
def mock_worker_service():
"""Fixture for mock worker service"""
return Mock(spec=WorkerService)
@pytest.fixture
def mediator(mock_worker_service):
"""Fixture for WorkerServiceMediator"""
return WorkerServiceMediator(mock_worker_service)
def test_start_recording_success(mediator, mock_worker_service):
"""Test successful recording start"""
# Setup
worker_id = "test-worker-123"
mock_worker_service.start.return_value = worker_id
mock_recording = RecordingFactory(
status=RecordingStatusChoices.INITIATED, worker_id=None
)
mediator.start(mock_recording)
# Verify worker service call
expected_room_name = str(mock_recording.room.id).replace("-", "")
mock_worker_service.start.assert_called_once_with(
expected_room_name, mock_recording.id
)
# Verify recording updates
mock_recording.refresh_from_db()
assert mock_recording.worker_id == worker_id
assert mock_recording.status == RecordingStatusChoices.ACTIVE
@pytest.mark.parametrize(
"error_class", [WorkerRequestError, WorkerConnectionError, WorkerResponseError]
)
def test_mediator_start_recording_worker_errors(
mediator, mock_worker_service, error_class
):
"""Test handling of various worker errors during start"""
# Setup
mock_worker_service.start.side_effect = error_class("Test error")
mock_recording = RecordingFactory(
status=RecordingStatusChoices.INITIATED, worker_id=None
)
# Execute and verify
with pytest.raises(RecordingStartError):
mediator.start(mock_recording)
# Verify recording updates
mock_recording.refresh_from_db()
assert mock_recording.status == RecordingStatusChoices.FAILED_TO_START
assert mock_recording.worker_id is None
@pytest.mark.parametrize(
"status",
[
RecordingStatusChoices.ACTIVE,
RecordingStatusChoices.FAILED_TO_START,
RecordingStatusChoices.FAILED_TO_STOP,
RecordingStatusChoices.STOPPED,
RecordingStatusChoices.SAVED,
RecordingStatusChoices.ABORTED,
],
)
def test_mediator_start_recording_from_forbidden_status(
mediator, mock_worker_service, status
):
"""Test handling of various worker errors during start"""
# Setup
mock_recording = RecordingFactory(status=status)
# Execute and verify
with pytest.raises(RecordingStartError):
mediator.start(mock_recording)
# Verify recording was not updated
mock_recording.refresh_from_db()
assert mock_recording.status == status
def test_mediator_stop_recording_success(mediator, mock_worker_service):
"""Test successful recording stop"""
# Setup
mock_recording = RecordingFactory(
status=RecordingStatusChoices.ACTIVE, worker_id="test-worker-123"
)
mock_worker_service.stop.return_value = "STOPPED"
# Execute
mediator.stop(mock_recording)
# Verify worker service call
mock_worker_service.stop.assert_called_once_with(worker_id=mock_recording.worker_id)
# Verify recording updates
mock_recording.refresh_from_db()
assert mock_recording.status == RecordingStatusChoices.STOPPED
def test_mediator_stop_recording_aborted(mediator, mock_worker_service):
"""Test recording stop when worker returns ABORTED"""
# Setup
mock_recording = RecordingFactory(
status=RecordingStatusChoices.ACTIVE, worker_id="test-worker-123"
)
mock_worker_service.stop.return_value = "ABORTED"
# Execute
mediator.stop(mock_recording)
# Verify recording updates
mock_recording.refresh_from_db()
assert mock_recording.status == RecordingStatusChoices.ABORTED
@pytest.mark.parametrize("error_class", [WorkerConnectionError, WorkerResponseError])
def test_mediator_stop_recording_worker_errors(
mediator, mock_worker_service, error_class
):
"""Test handling of worker errors during stop"""
# Setup
mock_recording = RecordingFactory(
status=RecordingStatusChoices.ACTIVE, worker_id="test-worker-123"
)
mock_worker_service.stop.side_effect = error_class("Test error")
# Execute and verify
with pytest.raises(RecordingStopError):
mediator.stop(mock_recording)
# Verify recording updates
mock_recording.refresh_from_db()
assert mock_recording.status == RecordingStatusChoices.FAILED_TO_STOP

View File

@@ -0,0 +1,368 @@
"""
Test worker service classes.
"""
# pylint: disable=W0212,W0621,W0613,E1101
from unittest.mock import AsyncMock, Mock, patch
import aiohttp
import pytest
from core.recording.worker.exceptions import WorkerConnectionError, WorkerResponseError
from core.recording.worker.factories import WorkerServiceConfig
from core.recording.worker.services import (
AudioCompositeEgressService,
BaseEgressService,
VideoCompositeEgressService,
livekit_api,
)
@pytest.fixture
def config():
"""Fixture to provide a WorkerServiceConfig instance"""
return WorkerServiceConfig(
output_folder="/test/output",
server_configurations={
"host": "test.livekit.io",
"api_key": "test_key",
"api_secret": "test_secret",
},
verify_ssl=True,
bucket_args={
"endpoint": "https://s3.test.com",
"access_key": "test_key",
"secret": "test_secret",
"region": "test-region",
"bucket": "test-bucket",
"force_path_style": True,
},
)
@pytest.fixture
def mock_s3_upload():
"""Fixture for mocked S3Upload"""
with patch("core.recording.worker.services.livekit_api.S3Upload") as mock:
yield mock
@pytest.fixture
def mock_egress_service():
"""Fixture for mocked EgressService"""
with patch("core.recording.worker.services.EgressService") as mock:
yield mock
@pytest.fixture
def service(config, mock_s3_upload):
"""Fixture for BaseEgressService instance"""
return BaseEgressService(config)
@pytest.fixture
def mock_client_session():
"""Fixture for mocked aiohttp.ClientSession"""
with patch("aiohttp.ClientSession") as mock:
mock.return_value.__aenter__ = AsyncMock()
mock.return_value.__aexit__ = AsyncMock()
yield mock
@pytest.fixture
def mock_tcp_connector():
"""Fixture for TCPConnector"""
with patch("aiohttp.TCPConnector") as mock_connector:
mock_connector_instance = Mock()
mock_connector.return_value = mock_connector_instance
yield mock_connector
@pytest.fixture
def video_service(config):
"""Fixture for VideoCompositeEgressService"""
service = VideoCompositeEgressService(config)
service._handle_request = Mock() # Mock the request handler
return service
@pytest.fixture
def audio_service(config):
"""Fixture for AudioCompositeEgressService"""
service = AudioCompositeEgressService(config)
service._handle_request = Mock() # Mock the request handler
return service
def test_base_egress_initialization(config, mock_s3_upload):
"""Test service initialization"""
service = BaseEgressService(config)
assert service._config == config
mock_s3_upload.assert_called_once_with(
endpoint="https://s3.test.com",
access_key="test_key",
secret="test_secret",
region="test-region",
bucket="test-bucket",
force_path_style=True,
)
@pytest.mark.parametrize(
"filename,extension,expected",
[
("test", "mp4", "/test/output/test.mp4"),
("recording123", "ogg", "/test/output/recording123.ogg"),
("live_stream", "m3u8", "/test/output/live_stream.m3u8"),
],
)
def test_base_egress_filepath_construction(service, filename, extension, expected):
"""Test filepath construction with various inputs"""
result = service._get_filepath(filename, extension)
assert result == expected
assert result.startswith(service._config.output_folder)
assert result.endswith(f"{filename}.{extension}")
def test_base_egress_handle_request_success(
config, service, mock_client_session, mock_egress_service, mock_tcp_connector
):
"""Test successful request handling"""
# Setup mock response
mock_response = Mock()
mock_method = AsyncMock(return_value=mock_response)
mock_egress_instance = Mock()
mock_egress_instance.test_method = mock_method
mock_egress_service.return_value = mock_egress_instance
# Create test request
test_request = Mock()
response = service._handle_request(test_request, "test_method")
mock_client_session.assert_called_once_with(
connector=mock_tcp_connector.return_value
)
# Verify EgressService initialization
mock_egress_service.assert_called_once_with(
mock_client_session.return_value.__aenter__.return_value,
**service._config.server_configurations,
)
# Verify method call and response
mock_method.assert_called_once_with(test_request)
assert response == mock_response
def test_base_egress_handle_request_connection_error(service, mock_egress_service):
"""Test handling of connection errors"""
# Setup mock error
mock_method = AsyncMock(
side_effect=livekit_api.TwirpError(msg="Connection failed", code=500)
)
mock_egress_instance = Mock()
mock_egress_instance.test_method = mock_method
mock_egress_service.return_value = mock_egress_instance
# Create test request
test_request = Mock()
# Verify error handling
with pytest.raises(WorkerConnectionError) as exc:
service._handle_request(test_request, "test_method")
assert "LiveKit client connection error" in str(exc.value)
assert "Connection failed" in str(exc.value)
@pytest.mark.parametrize(
"response_status,expected_result",
[
(livekit_api.EgressStatus.EGRESS_ABORTED, "ABORTED"),
(livekit_api.EgressStatus.EGRESS_COMPLETE, "FAILED_TO_STOP"),
(livekit_api.EgressStatus.EGRESS_ENDING, "STOPPED"),
(livekit_api.EgressStatus.EGRESS_FAILED, "FAILED_TO_STOP"),
],
)
def test_base_egress_stop_with_status(service, response_status, expected_result):
"""Test stop method with different response statuses"""
# Mock _handle_request
mock_response = Mock(status=response_status)
service._handle_request = Mock(return_value=mock_response)
# Execute stop
result = service.stop("test_worker_id")
# Verify request and response handling
service._handle_request.assert_called_once_with(
livekit_api.StopEgressRequest(egress_id="test_worker_id"), "stop_egress"
)
assert result == expected_result
def test_base_egress_stop_missing_status(service):
"""Test stop method when response is missing status"""
# Mock _handle_request with missing status
mock_response = Mock(status=None)
service._handle_request = Mock(return_value=mock_response)
# Verify error handling
with pytest.raises(WorkerResponseError) as exc:
service.stop("test_worker_id")
assert "missing the recording status" in str(exc.value)
def test_base_egress_start_not_implemented(service):
"""Test that start method raises NotImplementedError"""
with pytest.raises(NotImplementedError) as exc:
service.start("test_room", "test_recording")
assert "Subclass must implement this method" in str(exc.value)
@pytest.mark.parametrize("verify_ssl", [True, False])
def test_base_egress_ssl_verification_config(verify_ssl):
"""Test SSL verification configuration"""
config = WorkerServiceConfig(
output_folder="/test/output",
server_configurations={
"host": "test.livekit.io",
"api_key": "test_key",
"api_secret": "test_secret",
},
verify_ssl=verify_ssl,
bucket_args={
"endpoint": "https://s3.test.com",
"access_key": "test_key",
"secret": "test_secret",
"region": "test-region",
"bucket": "test-bucket",
"force_path_style": True,
},
)
service = BaseEgressService(config)
# Mock ClientSession to capture connector configuration
with patch("aiohttp.ClientSession") as mock_session:
mock_session.return_value.__aenter__ = AsyncMock()
mock_session.return_value.__aexit__ = AsyncMock()
# Trigger request to verify connector configuration
service._handle_request(Mock(), "test_method")
# Verify SSL configuration
connector = mock_session.call_args[1]["connector"]
assert isinstance(connector, aiohttp.TCPConnector)
assert connector._ssl == verify_ssl
def test_video_composite_egress_hrid(video_service):
"""Test HRID is correct"""
assert video_service.hrid == "video-recording-composite-livekit-egress"
def test_video_composite_egress_start_success(video_service):
"""Test successful start of video composite egress"""
# Setup mock response
egress_id = "test-egress-123"
video_service._handle_request.return_value = Mock(egress_id=egress_id)
# Test parameters
room_name = "test-room"
recording_id = "rec-123"
# Call start
result = video_service.start(room_name, recording_id)
# Verify result
assert result == egress_id
# Verify request construction
video_service._handle_request.assert_called_once()
request = video_service._handle_request.call_args[0][0]
method = video_service._handle_request.call_args[0][1]
# Verify request properties
assert isinstance(request, livekit_api.RoomCompositeEgressRequest)
assert request.room_name == room_name
assert len(request.file_outputs) == 1
assert not request.audio_only # Video service shouldn't set audio_only
# Verify file output configuration
file_output = request.file_outputs[0]
assert file_output.file_type == livekit_api.EncodedFileType.MP4
assert file_output.filepath == f"/test/output/{recording_id}.mp4"
assert file_output.s3.bucket == "test-bucket"
# Verify method name
assert method == "start_room_composite_egress"
def test_video_composite_egress_start_missing_egress_id(video_service):
"""Test handling of missing egress ID in response"""
# Setup mock response without egress_id
video_service._handle_request.return_value = Mock(egress_id=None)
with pytest.raises(WorkerResponseError) as exc_info:
video_service.start("test-room", "rec-123")
assert "Egress ID not found" in str(exc_info.value)
def test_audio_composite_egress_hrid(audio_service):
"""Test HRID is correct"""
assert audio_service.hrid == "audio-recording-composite-livekit-egress"
def test_audio_composite_egress_start_success(audio_service):
"""Test successful start of audio composite egress"""
# Setup mock response
egress_id = "test-egress-123"
audio_service._handle_request.return_value = Mock(egress_id=egress_id)
# Test parameters
room_name = "test-room"
recording_id = "rec-123"
# Call start
result = audio_service.start(room_name, recording_id)
# Verify result
assert result == egress_id
# Verify request construction
audio_service._handle_request.assert_called_once()
request = audio_service._handle_request.call_args[0][0]
method = audio_service._handle_request.call_args[0][1]
# Verify request properties
assert isinstance(request, livekit_api.RoomCompositeEgressRequest)
assert request.room_name == room_name
assert len(request.file_outputs) == 1
assert request.audio_only is True # Audio service should set audio_only
# Verify file output configuration
file_output = request.file_outputs[0]
assert file_output.file_type == livekit_api.EncodedFileType.OGG
assert file_output.filepath == f"/test/output/{recording_id}.ogg"
assert file_output.s3.bucket == "test-bucket"
# Verify method name
assert method == "start_room_composite_egress"
def test_audio_composite_egress_start_missing_egress_id(audio_service):
"""Test handling of missing egress ID in response"""
# Setup mock response without egress_id
audio_service._handle_request.return_value = Mock(egress_id=None)
with pytest.raises(WorkerResponseError) as exc_info:
audio_service.start("test-room", "rec-123")
assert "Egress ID not found" in str(exc_info.value)

View File

@@ -406,6 +406,14 @@ class Base(Configuration):
True, environ_name="ALLOW_UNREGISTERED_ROOMS", environ_prefix=None
)
# Recording settings
RECORDING_OUTPUT_FOLDER = values.Value(
"recordings", environ_name="RECORDING_OUTPUT_FOLDER", environ_prefix=None
)
RECORDING_VERIFY_SSL = values.BooleanValue(
True, environ_name="RECORDING_VERIFY_SSL", environ_prefix=None
)
# pylint: disable=invalid-name
@property
def ENVIRONMENT(self):

View File

@@ -57,6 +57,7 @@ dependencies = [
"whitenoise==6.7.0",
"mozilla-django-oidc==4.0.1",
"livekit-api==0.7.0",
"aiohttp==3.10.10",
]
[project.urls]