diff --git a/src/backend/core/recording/__init__.py b/src/backend/core/recording/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/backend/core/recording/worker/__init__.py b/src/backend/core/recording/worker/__init__.py new file mode 100644 index 00000000..f3cc9ffa --- /dev/null +++ b/src/backend/core/recording/worker/__init__.py @@ -0,0 +1 @@ +"""Meet worker services classes and exceptions.""" diff --git a/src/backend/core/recording/worker/exceptions.py b/src/backend/core/recording/worker/exceptions.py new file mode 100644 index 00000000..011cc5d3 --- /dev/null +++ b/src/backend/core/recording/worker/exceptions.py @@ -0,0 +1,21 @@ +"""Recording and worker services specific exceptions.""" + + +class WorkerRequestError(Exception): + """Raised when there is an issue with the worker request""" + + +class WorkerConnectionError(Exception): + """Raised when there is an issue connecting to the worker.""" + + +class WorkerResponseError(Exception): + """Raised when the worker's response is not as expected.""" + + +class RecordingStartError(Exception): + """Raised when there is an error starting the recording.""" + + +class RecordingStopError(Exception): + """Raised when there is an error stopping the recording.""" diff --git a/src/backend/core/recording/worker/factories.py b/src/backend/core/recording/worker/factories.py new file mode 100644 index 00000000..73bebba9 --- /dev/null +++ b/src/backend/core/recording/worker/factories.py @@ -0,0 +1,75 @@ +"""Factory, configurations and Protocol to create worker services""" + +import logging +from dataclasses import dataclass +from functools import lru_cache +from typing import Any, ClassVar, Dict, Optional, Protocol, Type + +from django.conf import settings +from django.utils.module_loading import import_string + +logger = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class WorkerServiceConfig: + """Declare Worker Service common configurations""" + + output_folder: str + server_configurations: Dict[str, Any] + verify_ssl: Optional[bool] + bucket_args: Optional[dict] + + @classmethod + @lru_cache + def from_settings(cls) -> "WorkerServiceConfig": + """Load configuration from Django settings with caching for efficiency.""" + + logger.debug("Loading WorkerServiceConfig from settings.") + return cls( + output_folder=settings.RECORDING_OUTPUT_FOLDER, + server_configurations=settings.LIVEKIT_CONFIGURATION, + verify_ssl=settings.RECORDING_VERIFY_SSL, + bucket_args={ + "endpoint": settings.AWS_S3_ENDPOINT_URL, + "access_key": settings.AWS_S3_ACCESS_KEY_ID, + "secret": settings.AWS_S3_SECRET_ACCESS_KEY, + "region": settings.AWS_S3_REGION_NAME, + "bucket": settings.AWS_STORAGE_BUCKET_NAME, + "force_path_style": True, + }, + ) + + +class WorkerService(Protocol): + """Define the interface for interacting with a worker service.""" + + hrid: ClassVar[str] + + def __init__(self, config: WorkerServiceConfig): + """Initialize the service with the given configuration.""" + + def start(self, room_id: str, recording_id: str) -> str: + """Start a recording for a specified room.""" + + def stop(self, worker_id: str) -> str: + """Stop recording for a specified worker.""" + + +def get_worker_service(mode: str) -> WorkerService: + """Instantiate a worker service by its mode.""" + + worker_registry: Dict[str, str] = settings.RECORDING_WORKER_CLASSES + + try: + worker_class_path = worker_registry[mode] + except KeyError as e: + raise ValueError( + f"Recording mode '{mode}' not found in RECORDING_WORKER_CLASSES. " + f"Available modes: {list(worker_registry.keys())}" + ) from e + + worker_class: Type[WorkerService] = import_string(worker_class_path) + + config = WorkerServiceConfig.from_settings() + return worker_class(config=config) diff --git a/src/backend/core/recording/worker/mediator.py b/src/backend/core/recording/worker/mediator.py new file mode 100644 index 00000000..e2829699 --- /dev/null +++ b/src/backend/core/recording/worker/mediator.py @@ -0,0 +1,100 @@ +"""Mediator between the worker service and recording instances in the Django ORM.""" + +import logging + +from core.models import Recording, RecordingStatusChoices + +from .exceptions import ( + RecordingStartError, + RecordingStopError, + WorkerConnectionError, + WorkerRequestError, + WorkerResponseError, +) +from .factories import WorkerService + +logger = logging.getLogger(__name__) + + +class WorkerServiceMediator: + """Mediate interactions between a worker service and a recording instance. + + A mediator class that decouples the worker from Django ORM, handles recording updates + based on worker status, and transforms worker errors into user-friendly exceptions. + Implements Mediator pattern. + """ + + def __init__(self, worker_service: WorkerService): + """Initialize the WorkerServiceMediator with the provided worker service.""" + + self._worker_service = worker_service + + def start(self, recording: Recording): + """Start the recording process using the worker service. + + If the operation is successful, the recording's status will + transition from INITIATED to ACTIVE, else to FAILED_TO_START to keep track of errors. + + Args: + recording (Recording): The recording instance to start. + Raises: + RecordingStartError: If there is an error starting the recording. + """ + + # FIXME - no manipulations of room_name should be required + room_name = f"{recording.room.id!s}".replace("-", "") + + if recording.status != RecordingStatusChoices.INITIATED: + logger.error("Cannot start recording in %s status.", recording.status) + raise RecordingStartError() + + try: + worker_id = self._worker_service.start(room_name, recording.id) + except (WorkerRequestError, WorkerConnectionError, WorkerResponseError) as e: + logger.exception( + "Failed to start recording for room %s: %s", recording.room.slug, e + ) + recording.status = RecordingStatusChoices.FAILED_TO_START + raise RecordingStartError() from e + else: + recording.worker_id = worker_id + recording.status = RecordingStatusChoices.ACTIVE + finally: + recording.save() + + logger.info( + "Worker started for room %s (worker ID: %s)", + recording.room, + recording.worker_id, + ) + + def stop(self, recording: Recording): + """Stop the recording process using the worker service. + + If the operation is successful, the recording's status will transition + from ACTIVE to STOPPED, else to FAILED_TO_STOP to keep track of errors. + + Args: + recording (Recording): The recording instance to stop. + Raises: + RecordingStopError: If there is an error stopping the recording. + """ + + if recording.status != RecordingStatusChoices.ACTIVE: + logger.error("Cannot stop recording in %s status.", recording.status) + raise RecordingStopError() + + try: + response = self._worker_service.stop(worker_id=recording.worker_id) + except (WorkerConnectionError, WorkerResponseError) as e: + logger.exception( + "Failed to stop recording for room %s: %s", recording.room.slug, e + ) + recording.status = RecordingStatusChoices.FAILED_TO_STOP + raise RecordingStopError() from e + else: + recording.status = RecordingStatusChoices[response] + finally: + recording.save() + + logger.info("Worker stopped for room %s", recording.room) diff --git a/src/backend/core/recording/worker/services.py b/src/backend/core/recording/worker/services.py new file mode 100644 index 00000000..2fb229e5 --- /dev/null +++ b/src/backend/core/recording/worker/services.py @@ -0,0 +1,140 @@ +"""Worker services in charge of recording a room.""" + +# pylint: disable=no-member + +import aiohttp +from asgiref.sync import async_to_sync +from livekit import api as livekit_api +from livekit.api.egress_service import EgressService + +from .exceptions import WorkerConnectionError, WorkerResponseError +from .factories import WorkerServiceConfig + + +class BaseEgressService: + """Base egress defining common methods to manage and interact with LiveKit egress processes.""" + + def __init__(self, config: WorkerServiceConfig): + self._config = config + self._s3 = livekit_api.S3Upload(**config.bucket_args) + + def _get_filepath(self, filename: str, extension: str) -> str: + """Construct the file path for a given filename and extension. + Unsecure method, doesn't handle paths robustly and securely. + """ + return f"{self._config.output_folder}/{filename}.{extension}" + + @async_to_sync + async def _handle_request(self, request, method_name: str): + """Handle making a request to the LiveKit API and returns the response.""" + + # Use HTTP connector for local development with Tilt, + # where cluster communications are unsecure + connector = aiohttp.TCPConnector(ssl=self._config.verify_ssl) + + async with aiohttp.ClientSession(connector=connector) as session: + client = EgressService(session, **self._config.server_configurations) + method = getattr(client, method_name) + try: + response = await method(request) + except livekit_api.TwirpError as e: + raise WorkerConnectionError( + f"LiveKit client connection error, {e.message}." + ) from e + + return response + + def stop(self, worker_id: str) -> str: + """Stop an ongoing egress worker. + The StopEgressRequest is shared among all types of egress, + so a single implementation in the base class should be sufficient. + """ + + request = livekit_api.StopEgressRequest( + egress_id=worker_id, + ) + + response = self._handle_request(request, "stop_egress") + + if not response.status: + raise WorkerResponseError( + "LiveKit response is missing the recording status." + ) + + # To avoid exposing EgressStatus values and coupling with LiveKit outside of this class, + # the response status is mapped to simpler "ABORTED", "STOPPED" or "FAILED_TO_STOP" strings. + if response.status == livekit_api.EgressStatus.EGRESS_ABORTED: + return "ABORTED" + + if response.status == livekit_api.EgressStatus.EGRESS_ENDING: + return "STOPPED" + + return "FAILED_TO_STOP" + + def start(self, room_name, recording_id): + """Start the egress process for a recording (not implemented in the base class). + Each derived class must implement this method, providing the necessary parameters for + its specific egress type (e.g. audio_only, streaming output). + """ + raise NotImplementedError("Subclass must implement this method.") + + +class VideoCompositeEgressService(BaseEgressService): + """Record multiple participant video and audio tracks into a single output '.mp4' file.""" + + hrid = "video-recording-composite-livekit-egress" + + def start(self, room_name, recording_id): + """Start the video composite egress process for a recording.""" + + # Save room's recording as a mp4 video file. + file_type = livekit_api.EncodedFileType.MP4 + filepath = self._get_filepath(filename=recording_id, extension="mp4") + + file_output = livekit_api.EncodedFileOutput( + file_type=file_type, + filepath=filepath, + s3=self._s3, + ) + + request = livekit_api.RoomCompositeEgressRequest( + room_name=room_name, + file_outputs=[file_output], + ) + + response = self._handle_request(request, "start_room_composite_egress") + + if not response.egress_id: + raise WorkerResponseError("Egress ID not found in the response.") + + return response.egress_id + + +class AudioCompositeEgressService(BaseEgressService): + """Record multiple participant audio tracks into a single output '.ogg' file.""" + + hrid = "audio-recording-composite-livekit-egress" + + def start(self, room_name, recording_id): + """Start the audio composite egress process for a recording.""" + + # Save room's recording as an ogg audio file. + file_type = livekit_api.EncodedFileType.OGG + filepath = self._get_filepath(filename=recording_id, extension="ogg") + + file_output = livekit_api.EncodedFileOutput( + file_type=file_type, + filepath=filepath, + s3=self._s3, + ) + + request = livekit_api.RoomCompositeEgressRequest( + room_name=room_name, file_outputs=[file_output], audio_only=True + ) + + response = self._handle_request(request, "start_room_composite_egress") + + if not response.egress_id: + raise WorkerResponseError("Egress ID not found in the response.") + + return response.egress_id diff --git a/src/backend/core/tests/recording/worker/__init__.py b/src/backend/core/tests/recording/worker/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/backend/core/tests/recording/worker/test_factories.py b/src/backend/core/tests/recording/worker/test_factories.py new file mode 100644 index 00000000..f2c7537b --- /dev/null +++ b/src/backend/core/tests/recording/worker/test_factories.py @@ -0,0 +1,171 @@ +""" +Test worker service factories. +""" + +# pylint: disable=W0212,W0621,W0613 + +from dataclasses import FrozenInstanceError +from unittest.mock import Mock + +from django.test import override_settings + +import pytest + +from core.recording.worker.factories import ( + WorkerService, + WorkerServiceConfig, + get_worker_service, +) + + +@pytest.fixture(autouse=True) +def clear_lru_cache(): + """Clear the lru_cache before and after each test""" + WorkerServiceConfig.from_settings.cache_clear() + yield + WorkerServiceConfig.from_settings.cache_clear() + + +@pytest.fixture +def test_settings(): + """Fixture to provide test Django settings""" + mocked_settings = { + "RECORDING_OUTPUT_FOLDER": "/test/output", + "LIVEKIT_CONFIGURATION": {"server": "test.example.com"}, + "RECORDING_VERIFY_SSL": True, + "AWS_S3_ENDPOINT_URL": "https://s3.test.com", + "AWS_S3_ACCESS_KEY_ID": "test_key", + "AWS_S3_SECRET_ACCESS_KEY": "test_secret", + "AWS_S3_REGION_NAME": "test-region", + "AWS_STORAGE_BUCKET_NAME": "test-bucket", + } + + # Use override_settings to properly patch Django settings + with override_settings(**mocked_settings): + yield test_settings + + +@pytest.fixture +def default_config(test_settings): + """Fixture to provide a WorkerServiceConfig instance""" + return WorkerServiceConfig.from_settings() + + +# Tests +def test_config_initialization(default_config): + """Test that WorkerServiceConfig is properly initialized from settings""" + assert default_config.output_folder == "/test/output" + assert default_config.server_configurations == {"server": "test.example.com"} + assert default_config.verify_ssl is True + assert default_config.bucket_args == { + "endpoint": "https://s3.test.com", + "access_key": "test_key", + "secret": "test_secret", + "region": "test-region", + "bucket": "test-bucket", + "force_path_style": True, + } + + +def test_config_immutability(default_config): + """Test that config instances are immutable after creation""" + with pytest.raises(FrozenInstanceError): + default_config.output_folder = "new/path" + + +@override_settings( + RECORDING_OUTPUT_FOLDER="/test/output", + LIVEKIT_CONFIGURATION={"server": "test.example.com"}, + RECORDING_VERIFY_SSL=True, + AWS_S3_ENDPOINT_URL="https://s3.test.com", + AWS_S3_ACCESS_KEY_ID="test_key", + AWS_S3_SECRET_ACCESS_KEY="test_secret", + AWS_S3_REGION_NAME="test-region", + AWS_STORAGE_BUCKET_NAME="test-bucket", +) +def test_config_caching(): + """Test that from_settings method caches its result""" + # Clear cache before testing caching behavior + WorkerServiceConfig.from_settings.cache_clear() + + config1 = WorkerServiceConfig.from_settings() + config2 = WorkerServiceConfig.from_settings() + assert config1 is config2 + + +class MockWorkerService(WorkerService): + """Mock worker service for testing.""" + + def __init__(self, config): + self.config = config + + +@pytest.fixture +def mock_import_string(monkeypatch): + """Fixture to mock import_string function.""" + mock = Mock(return_value=MockWorkerService) + monkeypatch.setattr("core.recording.worker.factories.import_string", mock) + return mock + + +def test_factory_valid_mode(mock_import_string, settings, default_config): + """Test getting worker service with valid mode.""" + + settings.RECORDING_WORKER_CLASSES = { + "test_mode": "path.to.MockWorkerService", + "another_mode": "path.to.AnotherWorkerService", + } + + worker = get_worker_service("test_mode") + + mock_import_string.assert_called_once_with("path.to.MockWorkerService") + assert isinstance(worker, MockWorkerService) + assert worker.config == default_config + + +def test_factory_invalid_mode(settings, mock_import_string, default_config): + """Test getting worker service with invalid mode raises ValueError.""" + + settings.RECORDING_WORKER_CLASSES = { + "test_mode": "path.to.MockWorkerService", + "another_mode": "path.to.AnotherWorkerService", + } + + worker = get_worker_service("test_mode") + + mock_import_string.assert_called_once_with("path.to.MockWorkerService") + assert isinstance(worker, MockWorkerService) + + with pytest.raises(ValueError) as exc_info: + get_worker_service("invalid_mode") + mock_import_string.assert_not_called() + + assert "Recording mode 'invalid_mode' not found" in str(exc_info.value) + assert "Available modes: ['test_mode', 'another_mode']" in str(exc_info.value) + + +def test_factory_import_error(mock_import_string, settings): + """Test handling of import errors.""" + + mock_import_string.side_effect = ImportError("Module not found") + + settings.RECORDING_WORKER_CLASSES = { + "test_mode": "path.to.MockWorkerService", + "another_mode": "path.to.AnotherWorkerService", + } + + with pytest.raises(ImportError) as exc_info: + get_worker_service("test_mode") + + assert "Module not found" in str(exc_info.value) + + +def test_factory_empty_registry(settings): + """Test behavior when worker registry is empty.""" + + settings.RECORDING_WORKER_CLASSES = {} + + with pytest.raises(ValueError) as exc_info: + get_worker_service("any_mode") + + assert "Available modes: []" in str(exc_info.value) diff --git a/src/backend/core/tests/recording/worker/test_mediator.py b/src/backend/core/tests/recording/worker/test_mediator.py new file mode 100644 index 00000000..f84dc260 --- /dev/null +++ b/src/backend/core/tests/recording/worker/test_mediator.py @@ -0,0 +1,161 @@ +"""Test WorkerServiceMediator class.""" + +# pylint: disable=W0621,W0613 + +from unittest.mock import Mock + +import pytest + +from core.factories import RecordingFactory +from core.models import RecordingStatusChoices +from core.recording.worker.exceptions import ( + RecordingStartError, + RecordingStopError, + WorkerConnectionError, + WorkerRequestError, + WorkerResponseError, +) +from core.recording.worker.factories import WorkerService +from core.recording.worker.mediator import WorkerServiceMediator + +pytestmark = pytest.mark.django_db + + +@pytest.fixture +def mock_worker_service(): + """Fixture for mock worker service""" + return Mock(spec=WorkerService) + + +@pytest.fixture +def mediator(mock_worker_service): + """Fixture for WorkerServiceMediator""" + return WorkerServiceMediator(mock_worker_service) + + +def test_start_recording_success(mediator, mock_worker_service): + """Test successful recording start""" + # Setup + worker_id = "test-worker-123" + mock_worker_service.start.return_value = worker_id + + mock_recording = RecordingFactory( + status=RecordingStatusChoices.INITIATED, worker_id=None + ) + mediator.start(mock_recording) + + # Verify worker service call + expected_room_name = str(mock_recording.room.id).replace("-", "") + mock_worker_service.start.assert_called_once_with( + expected_room_name, mock_recording.id + ) + + # Verify recording updates + mock_recording.refresh_from_db() + assert mock_recording.worker_id == worker_id + assert mock_recording.status == RecordingStatusChoices.ACTIVE + + +@pytest.mark.parametrize( + "error_class", [WorkerRequestError, WorkerConnectionError, WorkerResponseError] +) +def test_mediator_start_recording_worker_errors( + mediator, mock_worker_service, error_class +): + """Test handling of various worker errors during start""" + # Setup + mock_worker_service.start.side_effect = error_class("Test error") + mock_recording = RecordingFactory( + status=RecordingStatusChoices.INITIATED, worker_id=None + ) + + # Execute and verify + with pytest.raises(RecordingStartError): + mediator.start(mock_recording) + + # Verify recording updates + mock_recording.refresh_from_db() + assert mock_recording.status == RecordingStatusChoices.FAILED_TO_START + assert mock_recording.worker_id is None + + +@pytest.mark.parametrize( + "status", + [ + RecordingStatusChoices.ACTIVE, + RecordingStatusChoices.FAILED_TO_START, + RecordingStatusChoices.FAILED_TO_STOP, + RecordingStatusChoices.STOPPED, + RecordingStatusChoices.SAVED, + RecordingStatusChoices.ABORTED, + ], +) +def test_mediator_start_recording_from_forbidden_status( + mediator, mock_worker_service, status +): + """Test handling of various worker errors during start""" + # Setup + mock_recording = RecordingFactory(status=status) + + # Execute and verify + with pytest.raises(RecordingStartError): + mediator.start(mock_recording) + + # Verify recording was not updated + mock_recording.refresh_from_db() + assert mock_recording.status == status + + +def test_mediator_stop_recording_success(mediator, mock_worker_service): + """Test successful recording stop""" + # Setup + mock_recording = RecordingFactory( + status=RecordingStatusChoices.ACTIVE, worker_id="test-worker-123" + ) + mock_worker_service.stop.return_value = "STOPPED" + + # Execute + mediator.stop(mock_recording) + + # Verify worker service call + mock_worker_service.stop.assert_called_once_with(worker_id=mock_recording.worker_id) + + # Verify recording updates + mock_recording.refresh_from_db() + assert mock_recording.status == RecordingStatusChoices.STOPPED + + +def test_mediator_stop_recording_aborted(mediator, mock_worker_service): + """Test recording stop when worker returns ABORTED""" + # Setup + mock_recording = RecordingFactory( + status=RecordingStatusChoices.ACTIVE, worker_id="test-worker-123" + ) + mock_worker_service.stop.return_value = "ABORTED" + + # Execute + mediator.stop(mock_recording) + + # Verify recording updates + mock_recording.refresh_from_db() + assert mock_recording.status == RecordingStatusChoices.ABORTED + + +@pytest.mark.parametrize("error_class", [WorkerConnectionError, WorkerResponseError]) +def test_mediator_stop_recording_worker_errors( + mediator, mock_worker_service, error_class +): + """Test handling of worker errors during stop""" + # Setup + mock_recording = RecordingFactory( + status=RecordingStatusChoices.ACTIVE, worker_id="test-worker-123" + ) + mock_worker_service.stop.side_effect = error_class("Test error") + + # Execute and verify + with pytest.raises(RecordingStopError): + mediator.stop(mock_recording) + + # Verify recording updates + mock_recording.refresh_from_db() + assert mock_recording.status == RecordingStatusChoices.FAILED_TO_STOP diff --git a/src/backend/core/tests/recording/worker/test_services.py b/src/backend/core/tests/recording/worker/test_services.py new file mode 100644 index 00000000..30997b23 --- /dev/null +++ b/src/backend/core/tests/recording/worker/test_services.py @@ -0,0 +1,368 @@ +""" +Test worker service classes. +""" + +# pylint: disable=W0212,W0621,W0613,E1101 + +from unittest.mock import AsyncMock, Mock, patch + +import aiohttp +import pytest + +from core.recording.worker.exceptions import WorkerConnectionError, WorkerResponseError +from core.recording.worker.factories import WorkerServiceConfig +from core.recording.worker.services import ( + AudioCompositeEgressService, + BaseEgressService, + VideoCompositeEgressService, + livekit_api, +) + + +@pytest.fixture +def config(): + """Fixture to provide a WorkerServiceConfig instance""" + return WorkerServiceConfig( + output_folder="/test/output", + server_configurations={ + "host": "test.livekit.io", + "api_key": "test_key", + "api_secret": "test_secret", + }, + verify_ssl=True, + bucket_args={ + "endpoint": "https://s3.test.com", + "access_key": "test_key", + "secret": "test_secret", + "region": "test-region", + "bucket": "test-bucket", + "force_path_style": True, + }, + ) + + +@pytest.fixture +def mock_s3_upload(): + """Fixture for mocked S3Upload""" + with patch("core.recording.worker.services.livekit_api.S3Upload") as mock: + yield mock + + +@pytest.fixture +def mock_egress_service(): + """Fixture for mocked EgressService""" + with patch("core.recording.worker.services.EgressService") as mock: + yield mock + + +@pytest.fixture +def service(config, mock_s3_upload): + """Fixture for BaseEgressService instance""" + return BaseEgressService(config) + + +@pytest.fixture +def mock_client_session(): + """Fixture for mocked aiohttp.ClientSession""" + with patch("aiohttp.ClientSession") as mock: + mock.return_value.__aenter__ = AsyncMock() + mock.return_value.__aexit__ = AsyncMock() + yield mock + + +@pytest.fixture +def mock_tcp_connector(): + """Fixture for TCPConnector""" + with patch("aiohttp.TCPConnector") as mock_connector: + mock_connector_instance = Mock() + mock_connector.return_value = mock_connector_instance + yield mock_connector + + +@pytest.fixture +def video_service(config): + """Fixture for VideoCompositeEgressService""" + service = VideoCompositeEgressService(config) + service._handle_request = Mock() # Mock the request handler + return service + + +@pytest.fixture +def audio_service(config): + """Fixture for AudioCompositeEgressService""" + service = AudioCompositeEgressService(config) + service._handle_request = Mock() # Mock the request handler + return service + + +def test_base_egress_initialization(config, mock_s3_upload): + """Test service initialization""" + + service = BaseEgressService(config) + assert service._config == config + + mock_s3_upload.assert_called_once_with( + endpoint="https://s3.test.com", + access_key="test_key", + secret="test_secret", + region="test-region", + bucket="test-bucket", + force_path_style=True, + ) + + +@pytest.mark.parametrize( + "filename,extension,expected", + [ + ("test", "mp4", "/test/output/test.mp4"), + ("recording123", "ogg", "/test/output/recording123.ogg"), + ("live_stream", "m3u8", "/test/output/live_stream.m3u8"), + ], +) +def test_base_egress_filepath_construction(service, filename, extension, expected): + """Test filepath construction with various inputs""" + result = service._get_filepath(filename, extension) + assert result == expected + assert result.startswith(service._config.output_folder) + assert result.endswith(f"{filename}.{extension}") + + +def test_base_egress_handle_request_success( + config, service, mock_client_session, mock_egress_service, mock_tcp_connector +): + """Test successful request handling""" + # Setup mock response + mock_response = Mock() + mock_method = AsyncMock(return_value=mock_response) + mock_egress_instance = Mock() + mock_egress_instance.test_method = mock_method + mock_egress_service.return_value = mock_egress_instance + + # Create test request + test_request = Mock() + + response = service._handle_request(test_request, "test_method") + + mock_client_session.assert_called_once_with( + connector=mock_tcp_connector.return_value + ) + + # Verify EgressService initialization + mock_egress_service.assert_called_once_with( + mock_client_session.return_value.__aenter__.return_value, + **service._config.server_configurations, + ) + + # Verify method call and response + mock_method.assert_called_once_with(test_request) + assert response == mock_response + + +def test_base_egress_handle_request_connection_error(service, mock_egress_service): + """Test handling of connection errors""" + # Setup mock error + mock_method = AsyncMock( + side_effect=livekit_api.TwirpError(msg="Connection failed", code=500) + ) + mock_egress_instance = Mock() + mock_egress_instance.test_method = mock_method + mock_egress_service.return_value = mock_egress_instance + + # Create test request + test_request = Mock() + + # Verify error handling + with pytest.raises(WorkerConnectionError) as exc: + service._handle_request(test_request, "test_method") + + assert "LiveKit client connection error" in str(exc.value) + assert "Connection failed" in str(exc.value) + + +@pytest.mark.parametrize( + "response_status,expected_result", + [ + (livekit_api.EgressStatus.EGRESS_ABORTED, "ABORTED"), + (livekit_api.EgressStatus.EGRESS_COMPLETE, "FAILED_TO_STOP"), + (livekit_api.EgressStatus.EGRESS_ENDING, "STOPPED"), + (livekit_api.EgressStatus.EGRESS_FAILED, "FAILED_TO_STOP"), + ], +) +def test_base_egress_stop_with_status(service, response_status, expected_result): + """Test stop method with different response statuses""" + # Mock _handle_request + mock_response = Mock(status=response_status) + service._handle_request = Mock(return_value=mock_response) + + # Execute stop + result = service.stop("test_worker_id") + + # Verify request and response handling + service._handle_request.assert_called_once_with( + livekit_api.StopEgressRequest(egress_id="test_worker_id"), "stop_egress" + ) + assert result == expected_result + + +def test_base_egress_stop_missing_status(service): + """Test stop method when response is missing status""" + # Mock _handle_request with missing status + mock_response = Mock(status=None) + service._handle_request = Mock(return_value=mock_response) + + # Verify error handling + with pytest.raises(WorkerResponseError) as exc: + service.stop("test_worker_id") + + assert "missing the recording status" in str(exc.value) + + +def test_base_egress_start_not_implemented(service): + """Test that start method raises NotImplementedError""" + with pytest.raises(NotImplementedError) as exc: + service.start("test_room", "test_recording") + assert "Subclass must implement this method" in str(exc.value) + + +@pytest.mark.parametrize("verify_ssl", [True, False]) +def test_base_egress_ssl_verification_config(verify_ssl): + """Test SSL verification configuration""" + config = WorkerServiceConfig( + output_folder="/test/output", + server_configurations={ + "host": "test.livekit.io", + "api_key": "test_key", + "api_secret": "test_secret", + }, + verify_ssl=verify_ssl, + bucket_args={ + "endpoint": "https://s3.test.com", + "access_key": "test_key", + "secret": "test_secret", + "region": "test-region", + "bucket": "test-bucket", + "force_path_style": True, + }, + ) + + service = BaseEgressService(config) + + # Mock ClientSession to capture connector configuration + with patch("aiohttp.ClientSession") as mock_session: + mock_session.return_value.__aenter__ = AsyncMock() + mock_session.return_value.__aexit__ = AsyncMock() + + # Trigger request to verify connector configuration + service._handle_request(Mock(), "test_method") + + # Verify SSL configuration + connector = mock_session.call_args[1]["connector"] + assert isinstance(connector, aiohttp.TCPConnector) + assert connector._ssl == verify_ssl + + +def test_video_composite_egress_hrid(video_service): + """Test HRID is correct""" + assert video_service.hrid == "video-recording-composite-livekit-egress" + + +def test_video_composite_egress_start_success(video_service): + """Test successful start of video composite egress""" + # Setup mock response + egress_id = "test-egress-123" + video_service._handle_request.return_value = Mock(egress_id=egress_id) + + # Test parameters + room_name = "test-room" + recording_id = "rec-123" + + # Call start + result = video_service.start(room_name, recording_id) + + # Verify result + assert result == egress_id + + # Verify request construction + video_service._handle_request.assert_called_once() + request = video_service._handle_request.call_args[0][0] + method = video_service._handle_request.call_args[0][1] + + # Verify request properties + assert isinstance(request, livekit_api.RoomCompositeEgressRequest) + assert request.room_name == room_name + assert len(request.file_outputs) == 1 + + assert not request.audio_only # Video service shouldn't set audio_only + + # Verify file output configuration + file_output = request.file_outputs[0] + assert file_output.file_type == livekit_api.EncodedFileType.MP4 + assert file_output.filepath == f"/test/output/{recording_id}.mp4" + assert file_output.s3.bucket == "test-bucket" + + # Verify method name + assert method == "start_room_composite_egress" + + +def test_video_composite_egress_start_missing_egress_id(video_service): + """Test handling of missing egress ID in response""" + # Setup mock response without egress_id + video_service._handle_request.return_value = Mock(egress_id=None) + + with pytest.raises(WorkerResponseError) as exc_info: + video_service.start("test-room", "rec-123") + + assert "Egress ID not found" in str(exc_info.value) + + +def test_audio_composite_egress_hrid(audio_service): + """Test HRID is correct""" + assert audio_service.hrid == "audio-recording-composite-livekit-egress" + + +def test_audio_composite_egress_start_success(audio_service): + """Test successful start of audio composite egress""" + # Setup mock response + egress_id = "test-egress-123" + audio_service._handle_request.return_value = Mock(egress_id=egress_id) + + # Test parameters + room_name = "test-room" + recording_id = "rec-123" + + # Call start + result = audio_service.start(room_name, recording_id) + + # Verify result + assert result == egress_id + + # Verify request construction + audio_service._handle_request.assert_called_once() + request = audio_service._handle_request.call_args[0][0] + method = audio_service._handle_request.call_args[0][1] + + # Verify request properties + assert isinstance(request, livekit_api.RoomCompositeEgressRequest) + assert request.room_name == room_name + assert len(request.file_outputs) == 1 + assert request.audio_only is True # Audio service should set audio_only + + # Verify file output configuration + file_output = request.file_outputs[0] + assert file_output.file_type == livekit_api.EncodedFileType.OGG + assert file_output.filepath == f"/test/output/{recording_id}.ogg" + assert file_output.s3.bucket == "test-bucket" + + # Verify method name + assert method == "start_room_composite_egress" + + +def test_audio_composite_egress_start_missing_egress_id(audio_service): + """Test handling of missing egress ID in response""" + # Setup mock response without egress_id + audio_service._handle_request.return_value = Mock(egress_id=None) + + with pytest.raises(WorkerResponseError) as exc_info: + audio_service.start("test-room", "rec-123") + + assert "Egress ID not found" in str(exc_info.value) diff --git a/src/backend/meet/settings.py b/src/backend/meet/settings.py index 7bb9116d..b49b5079 100755 --- a/src/backend/meet/settings.py +++ b/src/backend/meet/settings.py @@ -406,6 +406,14 @@ class Base(Configuration): True, environ_name="ALLOW_UNREGISTERED_ROOMS", environ_prefix=None ) + # Recording settings + RECORDING_OUTPUT_FOLDER = values.Value( + "recordings", environ_name="RECORDING_OUTPUT_FOLDER", environ_prefix=None + ) + RECORDING_VERIFY_SSL = values.BooleanValue( + True, environ_name="RECORDING_VERIFY_SSL", environ_prefix=None + ) + # pylint: disable=invalid-name @property def ENVIRONMENT(self): diff --git a/src/backend/pyproject.toml b/src/backend/pyproject.toml index 464fcbd1..30f5a9c7 100644 --- a/src/backend/pyproject.toml +++ b/src/backend/pyproject.toml @@ -57,6 +57,7 @@ dependencies = [ "whitenoise==6.7.0", "mozilla-django-oidc==4.0.1", "livekit-api==0.7.0", + "aiohttp==3.10.10", ] [project.urls]