♻️(backend) refactor backend recording state management

Instead of relying on the egress_started event—which fires when egress is
starting, not actually started—I now rely on egress_updated for more accurate
status updates. This is especially important for the active status, which
triggers after egress has truly joined the room. Using this avoids prematurely
stopping client-side listening to room.isRecording updates. A further
refactoring may remove reliance on room updates entirely.

The goal is to minimize handling metadata in the mediator class. egress_starting
is still used for simplicity, but egress_started could be considered in the
future.

Note: if the API to start egress hasn’t responded yet, the webhook may fail to
find the recording because it currently matches by worker ID. This is unstable.
A better approach would be to pass the database ID in the egress metadata and
recover the recording from it in the webhook.
This commit is contained in:
lebaudantoine
2026-01-04 23:27:33 +01:00
committed by aleb_the_flash
parent 2863aa832d
commit f6cdb1125b
5 changed files with 73 additions and 42 deletions

View File

@@ -1,7 +1,11 @@
"""Recording-related LiveKit Events Service""" """Recording-related LiveKit Events Service"""
# pylint: disable=no-member
from logging import getLogger from logging import getLogger
from livekit import api
from core import models, utils from core import models, utils
logger = getLogger(__name__) logger = getLogger(__name__)
@@ -14,6 +18,27 @@ class RecordingEventsError(Exception):
class RecordingEventsService: class RecordingEventsService:
"""Handles recording-related Livekit webhook events.""" """Handles recording-related Livekit webhook events."""
@staticmethod
def handle_update(recording, egress_status):
"""Handle egress status updates and sync recording state to room metadata."""
room_name = str(recording.room.id)
status_mapping = {
api.EgressStatus.EGRESS_ACTIVE: "started",
api.EgressStatus.EGRESS_ENDING: "saving",
api.EgressStatus.EGRESS_ABORTED: "aborted",
}
recording_status = status_mapping.get(egress_status)
if recording_status:
try:
utils.update_room_metadata(
room_name, {"recording_status": recording_status}
)
except utils.MetadataUpdateException as e:
logger.exception("Failed to update room's metadata: %s", e)
@staticmethod @staticmethod
def handle_limit_reached(recording): def handle_limit_reached(recording):
"""Stop recording and notify participants when limit is reached.""" """Stop recording and notify participants when limit is reached."""

View File

@@ -105,10 +105,4 @@ class WorkerServiceMediator:
finally: finally:
recording.save() recording.save()
try:
room_name = str(recording.room.id)
utils.update_room_metadata(room_name, {"recording_status": "saving"})
except utils.MetadataUpdateException as e:
logger.exception("Failed to update room's metadata: %s", e)
logger.info("Worker stopped for room %s", recording.room) logger.info("Worker stopped for room %s", recording.room)

View File

@@ -138,23 +138,19 @@ class LiveKitEventsService:
# pylint: disable=not-callable # pylint: disable=not-callable
handler(data) handler(data)
def _handle_egress_started(self, data): def _handle_egress_updated(self, data):
"""Handle 'egress_started' event.""" """Handle 'egress_updated' event."""
egress_id = data.egress_info.egress_id
try: try:
recording = models.Recording.objects.get( recording = models.Recording.objects.get(worker_id=egress_id)
worker_id=data.egress_info.egress_id
)
except models.Recording.DoesNotExist as err: except models.Recording.DoesNotExist as err:
raise ActionFailedError( raise ActionFailedError(
f"Recording with worker ID {data.egress_info.egress_id} does not exist" f"Recording with worker ID {egress_id} does not exist"
) from err ) from err
try: egress_status = data.egress_info.status
room_name = str(recording.room.id) self.recording_events.handle_update(recording, egress_status)
utils.update_room_metadata(room_name, {"recording_status": "started"})
except utils.MetadataUpdateException as e:
logger.exception("Failed to update room's metadata: %s", e)
def _handle_egress_ended(self, data): def _handle_egress_ended(self, data):
"""Handle 'egress_ended' event.""" """Handle 'egress_ended' event."""

View File

@@ -122,10 +122,7 @@ def test_mediator_start_recording_from_forbidden_status(
mock_update_room_metadata.assert_not_called() mock_update_room_metadata.assert_not_called()
@mock.patch("core.utils.update_room_metadata") def test_mediator_stop_recording_success(mediator, mock_worker_service):
def test_mediator_stop_recording_success(
mock_update_room_metadata, mediator, mock_worker_service
):
"""Test successful recording stop""" """Test successful recording stop"""
# Setup # Setup
mock_recording = RecordingFactory( mock_recording = RecordingFactory(
@@ -143,15 +140,8 @@ def test_mediator_stop_recording_success(
mock_recording.refresh_from_db() mock_recording.refresh_from_db()
assert mock_recording.status == RecordingStatusChoices.STOPPED assert mock_recording.status == RecordingStatusChoices.STOPPED
mock_update_room_metadata.assert_called_once_with(
str(mock_recording.room.id), {"recording_status": "saving"}
)
def test_mediator_stop_recording_aborted(mediator, mock_worker_service):
@mock.patch("core.utils.update_room_metadata")
def test_mediator_stop_recording_aborted(
mock_update_room_metadata, mediator, mock_worker_service
):
"""Test recording stop when worker returns ABORTED""" """Test recording stop when worker returns ABORTED"""
# Setup # Setup
mock_recording = RecordingFactory( mock_recording = RecordingFactory(
@@ -166,15 +156,10 @@ def test_mediator_stop_recording_aborted(
mock_recording.refresh_from_db() mock_recording.refresh_from_db()
assert mock_recording.status == RecordingStatusChoices.ABORTED assert mock_recording.status == RecordingStatusChoices.ABORTED
mock_update_room_metadata.assert_called_once_with(
str(mock_recording.room.id), {"recording_status": "saving"}
)
@pytest.mark.parametrize("error_class", [WorkerConnectionError, WorkerResponseError]) @pytest.mark.parametrize("error_class", [WorkerConnectionError, WorkerResponseError])
@mock.patch("core.utils.update_room_metadata")
def test_mediator_stop_recording_worker_errors( def test_mediator_stop_recording_worker_errors(
mock_update_room_metadata, mediator, mock_worker_service, error_class mediator, mock_worker_service, error_class
): ):
"""Test handling of worker errors during stop""" """Test handling of worker errors during stop"""
# Setup # Setup
@@ -190,5 +175,3 @@ def test_mediator_stop_recording_worker_errors(
# Verify recording updates # Verify recording updates
mock_recording.refresh_from_db() mock_recording.refresh_from_db()
assert mock_recording.status == RecordingStatusChoices.FAILED_TO_STOP assert mock_recording.status == RecordingStatusChoices.FAILED_TO_STOP
mock_update_room_metadata.assert_not_called()

View File

@@ -94,22 +94,55 @@ def test_handle_egress_ended_success(
assert recording.status == "stopped" assert recording.status == "stopped"
@pytest.mark.parametrize(
("egress_status", "status"),
(
(EgressStatus.EGRESS_ACTIVE, "started"),
(EgressStatus.EGRESS_ENDING, "saving"),
(EgressStatus.EGRESS_ABORTED, "aborted"),
),
)
@mock.patch("core.utils.update_room_metadata") @mock.patch("core.utils.update_room_metadata")
def test_handle_egress_started_success(mock_update_room_metadata, service): def test_handle_egress_updated_success(
"""Should successfully start recording and update room's metadata.""" mock_update_room_metadata, egress_status, status, service
):
"""Should successfully update room's metadata."""
recording = RecordingFactory(worker_id="worker-1", status="initiated") recording = RecordingFactory(worker_id="worker-1", status="initiated")
mock_data = mock.MagicMock() mock_data = mock.MagicMock()
mock_data.egress_info.egress_id = recording.worker_id mock_data.egress_info.egress_id = recording.worker_id
mock_data.egress_info.status = EgressStatus.EGRESS_ACTIVE mock_data.egress_info.status = egress_status
service._handle_egress_started(mock_data) service._handle_egress_updated(mock_data)
mock_update_room_metadata.assert_called_once_with( mock_update_room_metadata.assert_called_once_with(
str(recording.room.id), {"recording_status": "started"} str(recording.room.id), {"recording_status": status}
) )
@pytest.mark.parametrize(
"egress_status",
(
EgressStatus.EGRESS_FAILED,
EgressStatus.EGRESS_LIMIT_REACHED,
),
)
@mock.patch("core.utils.update_room_metadata")
def test_handle_egress_updated_non_handled(
mock_update_room_metadata, egress_status, service
):
"""Should ignore certain egress status and don't trigger metadata updates."""
recording = RecordingFactory(worker_id="worker-1", status="initiated")
mock_data = mock.MagicMock()
mock_data.egress_info.egress_id = recording.worker_id
mock_data.egress_info.status = egress_status
service._handle_egress_updated(mock_data)
mock_update_room_metadata.assert_not_called()
@pytest.mark.parametrize( @pytest.mark.parametrize(
("mode", "notification_type"), ("mode", "notification_type"),
( (