♻️(backend) move LiveKit participant management to server-side API

Refactor client-side LiveKit API calls to server-side endpoints
following LiveKit documentation recommendations for participant
management operations.

Replaces hacky direct client calls with proper backend-mediated
requests, improving security and following official LiveKit
This commit is contained in:
lebaudantoine
2025-08-26 22:31:54 +02:00
committed by aleb_the_flash
parent 84e62246b7
commit 5f70840398
4 changed files with 715 additions and 1 deletions

View File

@@ -1,9 +1,10 @@
"""Client serializers for the Meet core app."""
# pylint: disable=W0223
# pylint: disable=W0223,E0611
from django.utils.translation import gettext_lazy as _
from livekit.api import ParticipantPermission
from rest_framework import serializers
from rest_framework.exceptions import PermissionDenied
from timezone_field.rest_framework import TimeZoneSerializerField
@@ -234,3 +235,69 @@ class RoomInviteSerializer(serializers.Serializer):
"""Validate room invite creation data."""
emails = serializers.ListField(child=serializers.EmailField(), allow_empty=False)
class BaseParticipantsManagementSerializer(BaseValidationOnlySerializer):
"""Base serializer for participant management operations."""
participant_identity = serializers.UUIDField(
help_text="LiveKit participant identity (UUID format)"
)
class MuteParticipantSerializer(BaseParticipantsManagementSerializer):
"""Validate participant muting data."""
track_sid = serializers.CharField(
max_length=255, help_text="LiveKit track SID to mute"
)
class UpdateParticipantSerializer(BaseParticipantsManagementSerializer):
"""Validate participant update data."""
metadata = serializers.DictField(
required=False, allow_null=True, help_text="Participant metadata as JSON object"
)
attributes = serializers.DictField(
required=False,
allow_null=True,
help_text="Participant attributes as JSON object",
)
permission = serializers.DictField(
required=False,
allow_null=True,
help_text="Participant permission as JSON object",
)
name = serializers.CharField(
max_length=255,
required=False,
allow_blank=True,
allow_null=True,
help_text="Display name for the participant",
)
def validate(self, attrs):
"""Ensure at least one update field is provided."""
update_fields = ["metadata", "attributes", "permission", "name"]
has_update = any(
field in attrs and attrs[field] is not None and attrs[field] != ""
for field in update_fields
)
if not has_update:
raise serializers.ValidationError(
f"At least one of the following fields must be provided: "
f"{', '.join(update_fields)}."
)
if "permission" in attrs:
try:
ParticipantPermission(**attrs["permission"])
except ValueError as e:
raise serializers.ValidationError(
{"permission": f"Invalid permission: {str(e)}"}
) from e
return attrs

View File

@@ -50,6 +50,10 @@ from core.services.lobby import (
LobbyParticipantNotFound,
LobbyService,
)
from core.services.participants_management import (
ParticipantsManagement,
ParticipantsManagementException,
)
from core.services.room_creation import RoomCreation
from core.services.subtitle import SubtitleException, SubtitleService
@@ -564,6 +568,104 @@ class RoomViewSet(
{"status": "success"}, status=drf_status.HTTP_200_OK
)
@decorators.action(
detail=True,
methods=["post"],
url_path="mute-participant",
url_name="mute-participant",
permission_classes=[permissions.HasPrivilegesOnRoom],
)
def mute_participant(self, request, pk=None): # pylint: disable=unused-argument
"""Mute a specific track for a participant in the room."""
room = self.get_object()
serializer = serializers.MuteParticipantSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
try:
ParticipantsManagement().mute(
room_name=str(room.id),
identity=str(serializer.validated_data["participant_identity"]),
track_sid=serializer.validated_data["track_sid"],
)
except ParticipantsManagementException:
return drf_response.Response(
{"error": "Failed to mute participant"},
status=drf_status.HTTP_500_INTERNAL_SERVER_ERROR,
)
return drf_response.Response(
{
"status": "success",
},
status=drf_status.HTTP_200_OK,
)
@decorators.action(
detail=True,
methods=["post"],
url_path="update-participant",
url_name="update-participant",
permission_classes=[permissions.HasPrivilegesOnRoom],
)
def update_participant(self, request, pk=None): # pylint: disable=unused-argument
"""Update participant attributes, permissions, or metadata."""
room = self.get_object()
serializer = serializers.UpdateParticipantSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
try:
ParticipantsManagement().update(
room_name=str(room.id),
identity=str(serializer.validated_data["participant_identity"]),
metadata=serializer.validated_data.get("metadata"),
attributes=serializer.validated_data.get("attributes"),
permission=serializer.validated_data.get("permission"),
name=serializer.validated_data.get("name"),
)
except ParticipantsManagementException:
return drf_response.Response(
{"error": "Failed to update participant"},
status=drf_status.HTTP_500_INTERNAL_SERVER_ERROR,
)
return drf_response.Response(
{
"status": "success",
},
status=drf_status.HTTP_200_OK,
)
@decorators.action(
detail=True,
methods=["post"],
url_path="remove-participant",
url_name="remove-participant",
permission_classes=[permissions.HasPrivilegesOnRoom],
)
def remove_participant(self, request, pk=None): # pylint: disable=unused-argument
"""Remove a participant from the room."""
room = self.get_object()
serializer = serializers.BaseParticipantsManagementSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
try:
ParticipantsManagement().remove(
room_name=str(room.id),
identity=str(serializer.validated_data["participant_identity"]),
)
except ParticipantsManagementException:
return drf_response.Response(
{"error": "Failed to remove participant"},
status=drf_status.HTTP_500_INTERNAL_SERVER_ERROR,
)
return drf_response.Response(
{"status": "success"}, status=drf_status.HTTP_200_OK
)
class ResourceAccessViewSet(
mixins.CreateModelMixin,

View File

@@ -0,0 +1,128 @@
"""Participants management service for LiveKit rooms."""
# pylint: disable=R0913,E0611,R0917
# ruff: noqa:PLR0913
import json
import uuid
from logging import getLogger
from typing import Dict, Optional
from asgiref.sync import async_to_sync
from livekit.api import (
MuteRoomTrackRequest,
RoomParticipantIdentity,
TwirpError,
UpdateParticipantRequest,
)
from core import utils
from .lobby import LobbyService
logger = getLogger(__name__)
class ParticipantsManagementException(Exception):
"""Exception raised when a participant management operations fail."""
class ParticipantsManagement:
"""Service for managing participants."""
@async_to_sync
async def mute(self, room_name: str, identity: str, track_sid: str):
"""Mute a specific audio or video track for a participant in a room."""
lkapi = utils.create_livekit_client()
try:
await lkapi.room.mute_published_track(
MuteRoomTrackRequest(
room=room_name,
identity=identity,
track_sid=track_sid,
muted=True,
)
)
except TwirpError as e:
logger.exception(
"Unexpected error muting participant %s for room %s",
identity,
room_name,
)
raise ParticipantsManagementException("Could not mute participant") from e
finally:
await lkapi.aclose()
@async_to_sync
async def remove(self, room_name: str, identity: str):
"""Remove a participant from a room and clear their lobby cache."""
try:
LobbyService().clear_participant_cache(
room_id=uuid.UUID(room_name), participant_id=identity
)
except (ValueError, TypeError) as exc:
logger.warning(
"participants_management.remove: room_name '%s' is not a UUID; "
"skipping lobby cache clear",
room_name,
exc_info=exc,
)
lkapi = utils.create_livekit_client()
try:
await lkapi.room.remove_participant(
RoomParticipantIdentity(room=room_name, identity=identity)
)
except TwirpError as e:
logger.exception(
"Unexpected error removing participant %s for room %s",
identity,
room_name,
)
raise ParticipantsManagementException("Could not remove participant") from e
finally:
await lkapi.aclose()
@async_to_sync
async def update(
self,
room_name: str,
identity: str,
metadata: Optional[Dict] = None,
attributes: Optional[Dict] = None,
permission: Optional[Dict] = None,
name: Optional[str] = None,
):
"""Update participant properties such as metadata, attributes, permissions, or name."""
lkapi = utils.create_livekit_client()
try:
await lkapi.room.update_participant(
UpdateParticipantRequest(
room=room_name,
identity=identity,
metadata=json.dumps(metadata),
permission=permission,
attributes=attributes,
name=name,
)
)
except TwirpError as e:
logger.exception(
"Unexpected error updating participant %s for room %s",
identity,
room_name,
)
raise ParticipantsManagementException("Could not update participant") from e
finally:
await lkapi.aclose()

View File

@@ -0,0 +1,417 @@
"""
Test rooms API endpoints in the Meet core app: participants management.
"""
# pylint: disable=W0621,W0613, W0212
import random
from unittest import mock
from uuid import uuid4
from django.urls import reverse
import pytest
from livekit.api import TwirpError
from rest_framework import status
from rest_framework.test import APIClient
from core.factories import RoomFactory, UserFactory, UserResourceAccessFactory
from core.services.lobby import LobbyService
pytestmark = pytest.mark.django_db
@pytest.fixture
def mock_livekit_client():
"""Mock LiveKit API client."""
with mock.patch("core.utils.create_livekit_client") as mock_create:
mock_client = mock.AsyncMock()
mock_create.return_value = mock_client
yield mock_client
def test_mute_participant_success(mock_livekit_client):
"""Test successful participant muting."""
client = APIClient()
room = RoomFactory()
user = UserFactory()
UserResourceAccessFactory(
resource=room, user=user, role=random.choice(["administrator", "owner"])
)
client.force_authenticate(user=user)
payload = {"participant_identity": str(uuid4()), "track_sid": "test-track-sid"}
url = reverse("rooms-mute-participant", kwargs={"pk": room.id})
response = client.post(url, payload, format="json")
assert response.status_code == status.HTTP_200_OK
assert response.data == {"status": "success"}
mock_livekit_client.room.mute_published_track.assert_called_once()
mock_livekit_client.aclose.assert_called_once()
def test_mute_participant_forbidden_without_access():
"""Test mute participant returns 403 when user lacks room privileges."""
client = APIClient()
room = RoomFactory()
user = UserFactory() # User without UserResourceAccess
client.force_authenticate(user=user)
payload = {"participant_identity": str(uuid4()), "track_sid": "test-track-sid"}
url = reverse("rooms-mute-participant", kwargs={"pk": room.id})
response = client.post(url, payload, format="json")
assert response.status_code == status.HTTP_403_FORBIDDEN
def test_mute_participant_invalid_payload():
"""Test mute participant with invalid payload."""
client = APIClient()
room = RoomFactory()
user = UserFactory()
UserResourceAccessFactory(
resource=room, user=user, role=random.choice(["administrator", "owner"])
)
client.force_authenticate(user=user)
payload = {"participant_identity": "invalid-uuid", "track_sid": ""}
url = reverse("rooms-mute-participant", kwargs={"pk": room.id})
response = client.post(url, payload, format="json")
assert response.status_code == status.HTTP_400_BAD_REQUEST
def test_mute_participant_unexpected_twirp_error(mock_livekit_client):
"""Test mute participant when LiveKit API raises TwirpError."""
client = APIClient()
mock_livekit_client.room.mute_published_track.side_effect = TwirpError(
msg="Internal server error", code=500, status=500
)
room = RoomFactory()
user = UserFactory()
UserResourceAccessFactory(
resource=room, user=user, role=random.choice(["administrator", "owner"])
)
client.force_authenticate(user=user)
payload = {"participant_identity": str(uuid4()), "track_sid": "test-track-sid"}
url = reverse("rooms-mute-participant", kwargs={"pk": room.id})
response = client.post(url, payload, format="json")
assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
assert response.data == {"error": "Failed to mute participant"}
mock_livekit_client.aclose.assert_called_once()
def test_update_participant_success(mock_livekit_client):
"""Test successful participant update."""
client = APIClient()
room = RoomFactory()
user = UserFactory()
UserResourceAccessFactory(
resource=room, user=user, role=random.choice(["administrator", "owner"])
)
client.force_authenticate(user=user)
payload = {
"participant_identity": str(uuid4()),
"metadata": {"role": "presenter"},
"permission": {
"can_subscribe": True,
"can_publish": True,
"can_publish_data": True,
"can_publish_sources": [
1,
2,
], # [TrackSource.CAMERA, TrackSource.MICROPHONE]
"hidden": False,
"recorder": False,
"can_update_metadata": True,
"agent": False,
"can_subscribe_metrics": False,
},
"name": "John Doe",
}
url = reverse("rooms-update-participant", kwargs={"pk": room.id})
response = client.post(url, payload, format="json")
assert response.status_code == status.HTTP_200_OK
assert response.data == {"status": "success"}
mock_livekit_client.room.update_participant.assert_called_once()
mock_livekit_client.aclose.assert_called_once()
def test_update_participant_forbidden_without_access():
"""Test update participant returns 403 when user lacks room privileges."""
client = APIClient()
room = RoomFactory()
user = UserFactory() # User without UserResourceAccess
client.force_authenticate(user=user)
payload = {"participant_identity": str(uuid4()), "name": "Test User"}
url = reverse("rooms-update-participant", kwargs={"pk": room.id})
response = client.post(url, payload, format="json")
assert response.status_code == status.HTTP_403_FORBIDDEN
def test_update_participant_invalid_payload():
"""Test update participant with invalid payload."""
client = APIClient()
room = RoomFactory()
user = UserFactory()
UserResourceAccessFactory(
resource=room, user=user, role=random.choice(["administrator", "owner"])
)
client.force_authenticate(user=user)
payload = {"participant_identity": "invalid-uuid"}
url = reverse("rooms-update-participant", kwargs={"pk": room.id})
response = client.post(url, payload, format="json")
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert "Must be a valid UUID." in str(response.data)
def test_update_participant_no_update_fields():
"""Test update participant with no update fields provided."""
client = APIClient()
room = RoomFactory()
user = UserFactory()
UserResourceAccessFactory(
resource=room, user=user, role=random.choice(["administrator", "owner"])
)
client.force_authenticate(user=user)
payload = {
"participant_identity": str(uuid4())
# No metadata, attributes, permission, or name
}
url = reverse("rooms-update-participant", kwargs={"pk": room.id})
response = client.post(url, payload, format="json")
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert "At least one of the following fields must be provided" in str(response.data)
def test_update_participant_invalid_permission():
"""Test update participant with wrong permission object."""
client = APIClient()
room = RoomFactory()
user = UserFactory()
UserResourceAccessFactory(
resource=room, user=user, role=random.choice(["administrator", "owner"])
)
client.force_authenticate(user=user)
payload = {
"participant_identity": str(uuid4()),
"permission": {"invalid-attributes": True},
}
url = reverse("rooms-update-participant", kwargs={"pk": room.id})
response = client.post(url, payload, format="json")
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert "Invalid permission" in str(response.data)
def test_update_participant_wrong_metadata_attributes():
"""Test update participant with wrong metadata or attributes provided."""
client = APIClient()
room = RoomFactory()
user = UserFactory()
UserResourceAccessFactory(
resource=room, user=user, role=random.choice(["administrator", "owner"])
)
client.force_authenticate(user=user)
payload = {
"participant_identity": str(uuid4()),
"metadata": "wrong string",
"attributes": "wrong string",
}
url = reverse("rooms-update-participant", kwargs={"pk": room.id})
response = client.post(url, payload, format="json")
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert "metadata" in response.data or "attributes" in response.data
def test_update_participant_unexpected_twirp_error(mock_livekit_client):
"""Test update participant when LiveKit API raises TwirpError."""
client = APIClient()
mock_livekit_client.room.update_participant.side_effect = TwirpError(
msg="Internal server error", code=500, status=500
)
room = RoomFactory()
user = UserFactory()
UserResourceAccessFactory(
resource=room, user=user, role=random.choice(["administrator", "owner"])
)
client.force_authenticate(user=user)
payload = {"participant_identity": str(uuid4()), "name": "Test User"}
url = reverse("rooms-update-participant", kwargs={"pk": room.id})
response = client.post(url, payload, format="json")
assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
assert response.data == {"error": "Failed to update participant"}
mock_livekit_client.aclose.assert_called_once()
def test_remove_participant_success_lobby_cache(mock_livekit_client):
"""Test successful participant removal.
The lobby cache cleanup is crucial for security - without it, removed
participants could potentially re-enter the room using their cached
lobby session.
"""
client = APIClient()
room = RoomFactory()
user = UserFactory()
UserResourceAccessFactory(
resource=room, user=user, role=random.choice(["administrator", "owner"])
)
client.force_authenticate(user=user)
participant_identity = str(uuid4())
# Create participant in lobby cache first
LobbyService().enter(room.id, participant_identity, "John doe")
# Accept participant
LobbyService().handle_participant_entry(room.id, participant_identity, True)
payload = {"participant_identity": participant_identity}
url = reverse("rooms-remove-participant", kwargs={"pk": room.id})
response = client.post(url, payload, format="json")
assert response.status_code == status.HTTP_200_OK
assert response.data == {"status": "success"}
mock_livekit_client.room.remove_participant.assert_called_once()
# called twice: once for Lobby, once for ParticipantManagement
mock_livekit_client.aclose.assert_called()
# Verify lobby cache was cleared - participant should no longer exist
participant = LobbyService()._get_participant(room.id, participant_identity)
assert participant is None
def test_remove_participant_success(mock_livekit_client):
"""Test successful participant removal."""
client = APIClient()
room = RoomFactory()
user = UserFactory()
UserResourceAccessFactory(
resource=room, user=user, role=random.choice(["administrator", "owner"])
)
client.force_authenticate(user=user)
payload = {"participant_identity": str(uuid4())}
url = reverse("rooms-remove-participant", kwargs={"pk": room.id})
response = client.post(url, payload, format="json")
assert response.status_code == status.HTTP_200_OK
assert response.data == {"status": "success"}
mock_livekit_client.room.remove_participant.assert_called_once()
mock_livekit_client.aclose.assert_called_once()
def test_remove_participant_forbidden_without_access():
"""Test remove participant returns 403 when user lacks room privileges."""
client = APIClient()
room = RoomFactory()
user = UserFactory() # User without UserResourceAccess
client.force_authenticate(user=user)
payload = {"participant_identity": str(uuid4())}
url = reverse("rooms-remove-participant", kwargs={"pk": room.id})
response = client.post(url, payload, format="json")
assert response.status_code == status.HTTP_403_FORBIDDEN
def test_remove_participant_invalid_payload():
"""Test remove participant with invalid payload."""
client = APIClient()
room = RoomFactory()
user = UserFactory()
UserResourceAccessFactory(
resource=room, user=user, role=random.choice(["administrator", "owner"])
)
client.force_authenticate(user=user)
payload = {"participant_identity": "invalid-uuid"}
url = reverse("rooms-remove-participant", kwargs={"pk": room.id})
response = client.post(url, payload, format="json")
assert response.status_code == status.HTTP_400_BAD_REQUEST
def test_remove_participant_missing_identity():
"""Test remove participant with missing participant_identity."""
client = APIClient()
room = RoomFactory()
user = UserFactory()
UserResourceAccessFactory(
resource=room, user=user, role=random.choice(["administrator", "owner"])
)
client.force_authenticate(user=user)
payload = {} # Missing participant_identity
url = reverse("rooms-remove-participant", kwargs={"pk": room.id})
response = client.post(url, payload, format="json")
assert response.status_code == status.HTTP_400_BAD_REQUEST
def test_remove_participant_unexpected_twirp_error(mock_livekit_client):
"""Test remove participant when LiveKit API raises TwirpError."""
client = APIClient()
mock_livekit_client.room.remove_participant.side_effect = TwirpError(
msg="Internal server error", code=500, status=500
)
room = RoomFactory()
user = UserFactory()
UserResourceAccessFactory(
resource=room, user=user, role=random.choice(["administrator", "owner"])
)
client.force_authenticate(user=user)
payload = {"participant_identity": str(uuid4())}
url = reverse("rooms-remove-participant", kwargs={"pk": room.id})
response = client.post(url, payload, format="json")
assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
assert response.data == {"error": "Failed to remove participant"}
mock_livekit_client.aclose.assert_called_once()