diff --git a/src/backend/core/api/serializers.py b/src/backend/core/api/serializers.py index 7653728f..abb02459 100644 --- a/src/backend/core/api/serializers.py +++ b/src/backend/core/api/serializers.py @@ -1,9 +1,10 @@ """Client serializers for the Meet core app.""" -# pylint: disable=W0223 +# pylint: disable=W0223,E0611 from django.utils.translation import gettext_lazy as _ +from livekit.api import ParticipantPermission from rest_framework import serializers from rest_framework.exceptions import PermissionDenied from timezone_field.rest_framework import TimeZoneSerializerField @@ -234,3 +235,69 @@ class RoomInviteSerializer(serializers.Serializer): """Validate room invite creation data.""" emails = serializers.ListField(child=serializers.EmailField(), allow_empty=False) + + +class BaseParticipantsManagementSerializer(BaseValidationOnlySerializer): + """Base serializer for participant management operations.""" + + participant_identity = serializers.UUIDField( + help_text="LiveKit participant identity (UUID format)" + ) + + +class MuteParticipantSerializer(BaseParticipantsManagementSerializer): + """Validate participant muting data.""" + + track_sid = serializers.CharField( + max_length=255, help_text="LiveKit track SID to mute" + ) + + +class UpdateParticipantSerializer(BaseParticipantsManagementSerializer): + """Validate participant update data.""" + + metadata = serializers.DictField( + required=False, allow_null=True, help_text="Participant metadata as JSON object" + ) + attributes = serializers.DictField( + required=False, + allow_null=True, + help_text="Participant attributes as JSON object", + ) + permission = serializers.DictField( + required=False, + allow_null=True, + help_text="Participant permission as JSON object", + ) + name = serializers.CharField( + max_length=255, + required=False, + allow_blank=True, + allow_null=True, + help_text="Display name for the participant", + ) + + def validate(self, attrs): + """Ensure at least one update field is provided.""" + update_fields = ["metadata", "attributes", "permission", "name"] + + has_update = any( + field in attrs and attrs[field] is not None and attrs[field] != "" + for field in update_fields + ) + + if not has_update: + raise serializers.ValidationError( + f"At least one of the following fields must be provided: " + f"{', '.join(update_fields)}." + ) + + if "permission" in attrs: + try: + ParticipantPermission(**attrs["permission"]) + except ValueError as e: + raise serializers.ValidationError( + {"permission": f"Invalid permission: {str(e)}"} + ) from e + + return attrs diff --git a/src/backend/core/api/viewsets.py b/src/backend/core/api/viewsets.py index 6e5b6a14..0bf90c27 100644 --- a/src/backend/core/api/viewsets.py +++ b/src/backend/core/api/viewsets.py @@ -50,6 +50,10 @@ from core.services.lobby import ( LobbyParticipantNotFound, LobbyService, ) +from core.services.participants_management import ( + ParticipantsManagement, + ParticipantsManagementException, +) from core.services.room_creation import RoomCreation from core.services.subtitle import SubtitleException, SubtitleService @@ -564,6 +568,104 @@ class RoomViewSet( {"status": "success"}, status=drf_status.HTTP_200_OK ) + @decorators.action( + detail=True, + methods=["post"], + url_path="mute-participant", + url_name="mute-participant", + permission_classes=[permissions.HasPrivilegesOnRoom], + ) + def mute_participant(self, request, pk=None): # pylint: disable=unused-argument + """Mute a specific track for a participant in the room.""" + room = self.get_object() + + serializer = serializers.MuteParticipantSerializer(data=request.data) + serializer.is_valid(raise_exception=True) + + try: + ParticipantsManagement().mute( + room_name=str(room.id), + identity=str(serializer.validated_data["participant_identity"]), + track_sid=serializer.validated_data["track_sid"], + ) + except ParticipantsManagementException: + return drf_response.Response( + {"error": "Failed to mute participant"}, + status=drf_status.HTTP_500_INTERNAL_SERVER_ERROR, + ) + + return drf_response.Response( + { + "status": "success", + }, + status=drf_status.HTTP_200_OK, + ) + + @decorators.action( + detail=True, + methods=["post"], + url_path="update-participant", + url_name="update-participant", + permission_classes=[permissions.HasPrivilegesOnRoom], + ) + def update_participant(self, request, pk=None): # pylint: disable=unused-argument + """Update participant attributes, permissions, or metadata.""" + room = self.get_object() + + serializer = serializers.UpdateParticipantSerializer(data=request.data) + serializer.is_valid(raise_exception=True) + + try: + ParticipantsManagement().update( + room_name=str(room.id), + identity=str(serializer.validated_data["participant_identity"]), + metadata=serializer.validated_data.get("metadata"), + attributes=serializer.validated_data.get("attributes"), + permission=serializer.validated_data.get("permission"), + name=serializer.validated_data.get("name"), + ) + except ParticipantsManagementException: + return drf_response.Response( + {"error": "Failed to update participant"}, + status=drf_status.HTTP_500_INTERNAL_SERVER_ERROR, + ) + + return drf_response.Response( + { + "status": "success", + }, + status=drf_status.HTTP_200_OK, + ) + + @decorators.action( + detail=True, + methods=["post"], + url_path="remove-participant", + url_name="remove-participant", + permission_classes=[permissions.HasPrivilegesOnRoom], + ) + def remove_participant(self, request, pk=None): # pylint: disable=unused-argument + """Remove a participant from the room.""" + room = self.get_object() + + serializer = serializers.BaseParticipantsManagementSerializer(data=request.data) + serializer.is_valid(raise_exception=True) + + try: + ParticipantsManagement().remove( + room_name=str(room.id), + identity=str(serializer.validated_data["participant_identity"]), + ) + except ParticipantsManagementException: + return drf_response.Response( + {"error": "Failed to remove participant"}, + status=drf_status.HTTP_500_INTERNAL_SERVER_ERROR, + ) + + return drf_response.Response( + {"status": "success"}, status=drf_status.HTTP_200_OK + ) + class ResourceAccessViewSet( mixins.CreateModelMixin, diff --git a/src/backend/core/services/participants_management.py b/src/backend/core/services/participants_management.py new file mode 100644 index 00000000..4e07af25 --- /dev/null +++ b/src/backend/core/services/participants_management.py @@ -0,0 +1,128 @@ +"""Participants management service for LiveKit rooms.""" + +# pylint: disable=R0913,E0611,R0917 +# ruff: noqa:PLR0913 + +import json +import uuid +from logging import getLogger +from typing import Dict, Optional + +from asgiref.sync import async_to_sync +from livekit.api import ( + MuteRoomTrackRequest, + RoomParticipantIdentity, + TwirpError, + UpdateParticipantRequest, +) + +from core import utils + +from .lobby import LobbyService + +logger = getLogger(__name__) + + +class ParticipantsManagementException(Exception): + """Exception raised when a participant management operations fail.""" + + +class ParticipantsManagement: + """Service for managing participants.""" + + @async_to_sync + async def mute(self, room_name: str, identity: str, track_sid: str): + """Mute a specific audio or video track for a participant in a room.""" + + lkapi = utils.create_livekit_client() + + try: + await lkapi.room.mute_published_track( + MuteRoomTrackRequest( + room=room_name, + identity=identity, + track_sid=track_sid, + muted=True, + ) + ) + + except TwirpError as e: + logger.exception( + "Unexpected error muting participant %s for room %s", + identity, + room_name, + ) + raise ParticipantsManagementException("Could not mute participant") from e + + finally: + await lkapi.aclose() + + @async_to_sync + async def remove(self, room_name: str, identity: str): + """Remove a participant from a room and clear their lobby cache.""" + + try: + LobbyService().clear_participant_cache( + room_id=uuid.UUID(room_name), participant_id=identity + ) + except (ValueError, TypeError) as exc: + logger.warning( + "participants_management.remove: room_name '%s' is not a UUID; " + "skipping lobby cache clear", + room_name, + exc_info=exc, + ) + + lkapi = utils.create_livekit_client() + + try: + await lkapi.room.remove_participant( + RoomParticipantIdentity(room=room_name, identity=identity) + ) + except TwirpError as e: + logger.exception( + "Unexpected error removing participant %s for room %s", + identity, + room_name, + ) + raise ParticipantsManagementException("Could not remove participant") from e + + finally: + await lkapi.aclose() + + @async_to_sync + async def update( + self, + room_name: str, + identity: str, + metadata: Optional[Dict] = None, + attributes: Optional[Dict] = None, + permission: Optional[Dict] = None, + name: Optional[str] = None, + ): + """Update participant properties such as metadata, attributes, permissions, or name.""" + + lkapi = utils.create_livekit_client() + + try: + await lkapi.room.update_participant( + UpdateParticipantRequest( + room=room_name, + identity=identity, + metadata=json.dumps(metadata), + permission=permission, + attributes=attributes, + name=name, + ) + ) + + except TwirpError as e: + logger.exception( + "Unexpected error updating participant %s for room %s", + identity, + room_name, + ) + raise ParticipantsManagementException("Could not update participant") from e + + finally: + await lkapi.aclose() diff --git a/src/backend/core/tests/rooms/test_api_rooms_participants_management.py b/src/backend/core/tests/rooms/test_api_rooms_participants_management.py new file mode 100644 index 00000000..d363460e --- /dev/null +++ b/src/backend/core/tests/rooms/test_api_rooms_participants_management.py @@ -0,0 +1,417 @@ +""" +Test rooms API endpoints in the Meet core app: participants management. +""" + +# pylint: disable=W0621,W0613, W0212 + +import random +from unittest import mock +from uuid import uuid4 + +from django.urls import reverse + +import pytest +from livekit.api import TwirpError +from rest_framework import status +from rest_framework.test import APIClient + +from core.factories import RoomFactory, UserFactory, UserResourceAccessFactory +from core.services.lobby import LobbyService + +pytestmark = pytest.mark.django_db + + +@pytest.fixture +def mock_livekit_client(): + """Mock LiveKit API client.""" + with mock.patch("core.utils.create_livekit_client") as mock_create: + mock_client = mock.AsyncMock() + mock_create.return_value = mock_client + yield mock_client + + +def test_mute_participant_success(mock_livekit_client): + """Test successful participant muting.""" + client = APIClient() + room = RoomFactory() + user = UserFactory() + UserResourceAccessFactory( + resource=room, user=user, role=random.choice(["administrator", "owner"]) + ) + client.force_authenticate(user=user) + + payload = {"participant_identity": str(uuid4()), "track_sid": "test-track-sid"} + + url = reverse("rooms-mute-participant", kwargs={"pk": room.id}) + response = client.post(url, payload, format="json") + + assert response.status_code == status.HTTP_200_OK + assert response.data == {"status": "success"} + + mock_livekit_client.room.mute_published_track.assert_called_once() + mock_livekit_client.aclose.assert_called_once() + + +def test_mute_participant_forbidden_without_access(): + """Test mute participant returns 403 when user lacks room privileges.""" + client = APIClient() + room = RoomFactory() + user = UserFactory() # User without UserResourceAccess + client.force_authenticate(user=user) + + payload = {"participant_identity": str(uuid4()), "track_sid": "test-track-sid"} + + url = reverse("rooms-mute-participant", kwargs={"pk": room.id}) + response = client.post(url, payload, format="json") + + assert response.status_code == status.HTTP_403_FORBIDDEN + + +def test_mute_participant_invalid_payload(): + """Test mute participant with invalid payload.""" + client = APIClient() + room = RoomFactory() + user = UserFactory() + UserResourceAccessFactory( + resource=room, user=user, role=random.choice(["administrator", "owner"]) + ) + client.force_authenticate(user=user) + + payload = {"participant_identity": "invalid-uuid", "track_sid": ""} + + url = reverse("rooms-mute-participant", kwargs={"pk": room.id}) + response = client.post(url, payload, format="json") + + assert response.status_code == status.HTTP_400_BAD_REQUEST + + +def test_mute_participant_unexpected_twirp_error(mock_livekit_client): + """Test mute participant when LiveKit API raises TwirpError.""" + client = APIClient() + + mock_livekit_client.room.mute_published_track.side_effect = TwirpError( + msg="Internal server error", code=500, status=500 + ) + + room = RoomFactory() + user = UserFactory() + UserResourceAccessFactory( + resource=room, user=user, role=random.choice(["administrator", "owner"]) + ) + client.force_authenticate(user=user) + + payload = {"participant_identity": str(uuid4()), "track_sid": "test-track-sid"} + + url = reverse("rooms-mute-participant", kwargs={"pk": room.id}) + response = client.post(url, payload, format="json") + + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert response.data == {"error": "Failed to mute participant"} + + mock_livekit_client.aclose.assert_called_once() + + +def test_update_participant_success(mock_livekit_client): + """Test successful participant update.""" + client = APIClient() + room = RoomFactory() + user = UserFactory() + UserResourceAccessFactory( + resource=room, user=user, role=random.choice(["administrator", "owner"]) + ) + client.force_authenticate(user=user) + + payload = { + "participant_identity": str(uuid4()), + "metadata": {"role": "presenter"}, + "permission": { + "can_subscribe": True, + "can_publish": True, + "can_publish_data": True, + "can_publish_sources": [ + 1, + 2, + ], # [TrackSource.CAMERA, TrackSource.MICROPHONE] + "hidden": False, + "recorder": False, + "can_update_metadata": True, + "agent": False, + "can_subscribe_metrics": False, + }, + "name": "John Doe", + } + + url = reverse("rooms-update-participant", kwargs={"pk": room.id}) + response = client.post(url, payload, format="json") + + assert response.status_code == status.HTTP_200_OK + assert response.data == {"status": "success"} + + mock_livekit_client.room.update_participant.assert_called_once() + mock_livekit_client.aclose.assert_called_once() + + +def test_update_participant_forbidden_without_access(): + """Test update participant returns 403 when user lacks room privileges.""" + client = APIClient() + room = RoomFactory() + user = UserFactory() # User without UserResourceAccess + client.force_authenticate(user=user) + + payload = {"participant_identity": str(uuid4()), "name": "Test User"} + + url = reverse("rooms-update-participant", kwargs={"pk": room.id}) + response = client.post(url, payload, format="json") + + assert response.status_code == status.HTTP_403_FORBIDDEN + + +def test_update_participant_invalid_payload(): + """Test update participant with invalid payload.""" + client = APIClient() + room = RoomFactory() + user = UserFactory() + UserResourceAccessFactory( + resource=room, user=user, role=random.choice(["administrator", "owner"]) + ) + client.force_authenticate(user=user) + + payload = {"participant_identity": "invalid-uuid"} + + url = reverse("rooms-update-participant", kwargs={"pk": room.id}) + response = client.post(url, payload, format="json") + + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert "Must be a valid UUID." in str(response.data) + + +def test_update_participant_no_update_fields(): + """Test update participant with no update fields provided.""" + client = APIClient() + room = RoomFactory() + user = UserFactory() + UserResourceAccessFactory( + resource=room, user=user, role=random.choice(["administrator", "owner"]) + ) + client.force_authenticate(user=user) + + payload = { + "participant_identity": str(uuid4()) + # No metadata, attributes, permission, or name + } + + url = reverse("rooms-update-participant", kwargs={"pk": room.id}) + response = client.post(url, payload, format="json") + + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert "At least one of the following fields must be provided" in str(response.data) + + +def test_update_participant_invalid_permission(): + """Test update participant with wrong permission object.""" + client = APIClient() + room = RoomFactory() + user = UserFactory() + UserResourceAccessFactory( + resource=room, user=user, role=random.choice(["administrator", "owner"]) + ) + client.force_authenticate(user=user) + + payload = { + "participant_identity": str(uuid4()), + "permission": {"invalid-attributes": True}, + } + + url = reverse("rooms-update-participant", kwargs={"pk": room.id}) + response = client.post(url, payload, format="json") + + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert "Invalid permission" in str(response.data) + + +def test_update_participant_wrong_metadata_attributes(): + """Test update participant with wrong metadata or attributes provided.""" + client = APIClient() + room = RoomFactory() + user = UserFactory() + UserResourceAccessFactory( + resource=room, user=user, role=random.choice(["administrator", "owner"]) + ) + client.force_authenticate(user=user) + + payload = { + "participant_identity": str(uuid4()), + "metadata": "wrong string", + "attributes": "wrong string", + } + + url = reverse("rooms-update-participant", kwargs={"pk": room.id}) + response = client.post(url, payload, format="json") + + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert "metadata" in response.data or "attributes" in response.data + + +def test_update_participant_unexpected_twirp_error(mock_livekit_client): + """Test update participant when LiveKit API raises TwirpError.""" + client = APIClient() + + mock_livekit_client.room.update_participant.side_effect = TwirpError( + msg="Internal server error", code=500, status=500 + ) + + room = RoomFactory() + user = UserFactory() + UserResourceAccessFactory( + resource=room, user=user, role=random.choice(["administrator", "owner"]) + ) + client.force_authenticate(user=user) + + payload = {"participant_identity": str(uuid4()), "name": "Test User"} + + url = reverse("rooms-update-participant", kwargs={"pk": room.id}) + response = client.post(url, payload, format="json") + + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert response.data == {"error": "Failed to update participant"} + + mock_livekit_client.aclose.assert_called_once() + + +def test_remove_participant_success_lobby_cache(mock_livekit_client): + """Test successful participant removal. + + The lobby cache cleanup is crucial for security - without it, removed + participants could potentially re-enter the room using their cached + lobby session. + """ + client = APIClient() + room = RoomFactory() + user = UserFactory() + UserResourceAccessFactory( + resource=room, user=user, role=random.choice(["administrator", "owner"]) + ) + client.force_authenticate(user=user) + + participant_identity = str(uuid4()) + + # Create participant in lobby cache first + LobbyService().enter(room.id, participant_identity, "John doe") + + # Accept participant + LobbyService().handle_participant_entry(room.id, participant_identity, True) + + payload = {"participant_identity": participant_identity} + + url = reverse("rooms-remove-participant", kwargs={"pk": room.id}) + response = client.post(url, payload, format="json") + + assert response.status_code == status.HTTP_200_OK + assert response.data == {"status": "success"} + + mock_livekit_client.room.remove_participant.assert_called_once() + # called twice: once for Lobby, once for ParticipantManagement + mock_livekit_client.aclose.assert_called() + + # Verify lobby cache was cleared - participant should no longer exist + participant = LobbyService()._get_participant(room.id, participant_identity) + assert participant is None + + +def test_remove_participant_success(mock_livekit_client): + """Test successful participant removal.""" + client = APIClient() + room = RoomFactory() + user = UserFactory() + UserResourceAccessFactory( + resource=room, user=user, role=random.choice(["administrator", "owner"]) + ) + client.force_authenticate(user=user) + + payload = {"participant_identity": str(uuid4())} + + url = reverse("rooms-remove-participant", kwargs={"pk": room.id}) + response = client.post(url, payload, format="json") + + assert response.status_code == status.HTTP_200_OK + assert response.data == {"status": "success"} + + mock_livekit_client.room.remove_participant.assert_called_once() + mock_livekit_client.aclose.assert_called_once() + + +def test_remove_participant_forbidden_without_access(): + """Test remove participant returns 403 when user lacks room privileges.""" + client = APIClient() + room = RoomFactory() + user = UserFactory() # User without UserResourceAccess + client.force_authenticate(user=user) + + payload = {"participant_identity": str(uuid4())} + + url = reverse("rooms-remove-participant", kwargs={"pk": room.id}) + response = client.post(url, payload, format="json") + + assert response.status_code == status.HTTP_403_FORBIDDEN + + +def test_remove_participant_invalid_payload(): + """Test remove participant with invalid payload.""" + client = APIClient() + room = RoomFactory() + user = UserFactory() + UserResourceAccessFactory( + resource=room, user=user, role=random.choice(["administrator", "owner"]) + ) + client.force_authenticate(user=user) + + payload = {"participant_identity": "invalid-uuid"} + + url = reverse("rooms-remove-participant", kwargs={"pk": room.id}) + response = client.post(url, payload, format="json") + + assert response.status_code == status.HTTP_400_BAD_REQUEST + + +def test_remove_participant_missing_identity(): + """Test remove participant with missing participant_identity.""" + client = APIClient() + room = RoomFactory() + user = UserFactory() + UserResourceAccessFactory( + resource=room, user=user, role=random.choice(["administrator", "owner"]) + ) + client.force_authenticate(user=user) + + payload = {} # Missing participant_identity + + url = reverse("rooms-remove-participant", kwargs={"pk": room.id}) + response = client.post(url, payload, format="json") + + assert response.status_code == status.HTTP_400_BAD_REQUEST + + +def test_remove_participant_unexpected_twirp_error(mock_livekit_client): + """Test remove participant when LiveKit API raises TwirpError.""" + client = APIClient() + + mock_livekit_client.room.remove_participant.side_effect = TwirpError( + msg="Internal server error", code=500, status=500 + ) + + room = RoomFactory() + user = UserFactory() + UserResourceAccessFactory( + resource=room, user=user, role=random.choice(["administrator", "owner"]) + ) + client.force_authenticate(user=user) + + payload = {"participant_identity": str(uuid4())} + + url = reverse("rooms-remove-participant", kwargs={"pk": room.id}) + response = client.post(url, payload, format="json") + + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert response.data == {"error": "Failed to remove participant"} + + mock_livekit_client.aclose.assert_called_once()