✨(project) add CRUD API endpoints for Rooms and ResourceAccess models
Introduce CRUD API endpoints for the Rooms and ResourceAccess models. The code follows the Magnify logic, with the exception that token generation has been removed and replaced by a TODO item with a mocked value. Proper integration of LiveKit will be added in future commits. With the removal of group logic, some complex query sets can be simplified. Previously, we checked for both direct and indirect access to a room. Indirect access meant a room was shared with a group, and the user was a member of that group. I haven’t simplified those query set, as I preferred isolate changes in dedicated commits. Additionally, all previous tests are still passing, although tests related to groups have been removed.
This commit is contained in:
@@ -1,6 +1,8 @@
|
||||
"""Permission handlers for the impress core app."""
|
||||
from rest_framework import permissions
|
||||
|
||||
from ..models import RoleChoices
|
||||
|
||||
ACTION_FOR_METHOD_TO_PERMISSION = {
|
||||
"versions_detail": {"DELETE": "versions_destroy", "GET": "versions_retrieve"}
|
||||
}
|
||||
@@ -34,3 +36,49 @@ class IsSelf(IsAuthenticated):
|
||||
def has_object_permission(self, request, view, obj):
|
||||
"""Write permissions are only allowed to the user itself."""
|
||||
return obj == request.user
|
||||
|
||||
|
||||
class RoomPermissions(permissions.BasePermission):
|
||||
"""
|
||||
Permissions applying to the room API endpoint.
|
||||
"""
|
||||
|
||||
def has_permission(self, request, view):
|
||||
"""Only allow authenticated users for unsafe methods."""
|
||||
if request.method in permissions.SAFE_METHODS:
|
||||
return True
|
||||
|
||||
return request.user.is_authenticated
|
||||
|
||||
def has_object_permission(self, request, view, obj):
|
||||
"""Object permissions are only given to administrators of the room."""
|
||||
|
||||
if request.method in permissions.SAFE_METHODS:
|
||||
return True
|
||||
|
||||
user = request.user
|
||||
|
||||
if request.method == "DELETE":
|
||||
return obj.is_owner(user)
|
||||
|
||||
return obj.is_administrator(user)
|
||||
|
||||
|
||||
class ResourceAccessPermission(permissions.BasePermission):
|
||||
"""
|
||||
Permissions for a room that can only be updated by room administrators.
|
||||
"""
|
||||
|
||||
def has_permission(self, request, view):
|
||||
"""Only allow authenticated users."""
|
||||
return request.user.is_authenticated
|
||||
|
||||
def has_object_permission(self, request, view, obj):
|
||||
"""
|
||||
Check that the logged-in user is administrator of the linked room.
|
||||
"""
|
||||
user = request.user
|
||||
if request.method == "DELETE" and obj.role == RoleChoices.OWNER:
|
||||
return obj.user == user
|
||||
|
||||
return obj.resource.is_administrator(user)
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
"""Client serializers for the impress core app."""
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from rest_framework import serializers
|
||||
from rest_framework.exceptions import PermissionDenied
|
||||
|
||||
from core import models
|
||||
|
||||
@@ -11,3 +14,120 @@ class UserSerializer(serializers.ModelSerializer):
|
||||
model = models.User
|
||||
fields = ["id", "email"]
|
||||
read_only_fields = ["id", "email"]
|
||||
|
||||
|
||||
class ResourceAccessSerializerMixin:
|
||||
"""
|
||||
A serializer mixin to share controlling that the logged-in user submitting a room access object
|
||||
is administrator on the targeted room.
|
||||
"""
|
||||
|
||||
# pylint: disable=too-many-boolean-expressions
|
||||
def validate(self, data):
|
||||
"""
|
||||
Check access rights specific to writing (create/update)
|
||||
"""
|
||||
request = self.context.get("request", None)
|
||||
user = getattr(request, "user", None)
|
||||
if (
|
||||
# Update
|
||||
self.instance
|
||||
and (
|
||||
data["role"] == models.RoleChoices.OWNER
|
||||
and not self.instance.resource.is_owner(user)
|
||||
or self.instance.role == models.RoleChoices.OWNER
|
||||
and not self.instance.user == user
|
||||
)
|
||||
) or (
|
||||
# Create
|
||||
not self.instance
|
||||
and data.get("role") == models.RoleChoices.OWNER
|
||||
and not data["resource"].is_owner(user)
|
||||
):
|
||||
raise PermissionDenied(
|
||||
"Only owners of a room can assign other users as owners."
|
||||
)
|
||||
return data
|
||||
|
||||
def validate_resource(self, resource):
|
||||
"""The logged-in user must be administrator of the resource."""
|
||||
request = self.context.get("request", None)
|
||||
user = getattr(request, "user", None)
|
||||
|
||||
if not (user and user.is_authenticated and resource.is_administrator(user)):
|
||||
raise PermissionDenied(
|
||||
_("You must be administrator or owner of a room to add accesses to it.")
|
||||
)
|
||||
|
||||
return resource
|
||||
|
||||
|
||||
class ResourceAccessSerializer(
|
||||
ResourceAccessSerializerMixin, serializers.ModelSerializer
|
||||
):
|
||||
"""Serialize Room to User accesses for the API."""
|
||||
|
||||
class Meta:
|
||||
model = models.ResourceAccess
|
||||
fields = ["id", "user", "resource", "role"]
|
||||
read_only_fields = ["id"]
|
||||
|
||||
def update(self, instance, validated_data):
|
||||
"""Make "user" and "resource" fields readonly but only on update."""
|
||||
validated_data.pop("resource", None)
|
||||
validated_data.pop("user", None)
|
||||
return super().update(instance, validated_data)
|
||||
|
||||
|
||||
class NestedResourceAccessSerializer(ResourceAccessSerializer):
|
||||
"""Serialize Room accesses for the API with full nested user."""
|
||||
|
||||
user = UserSerializer(read_only=True)
|
||||
|
||||
|
||||
class RoomSerializer(serializers.ModelSerializer):
|
||||
"""Serialize Room model for the API."""
|
||||
|
||||
class Meta:
|
||||
model = models.Room
|
||||
fields = ["id", "name", "slug", "configuration", "is_public"]
|
||||
read_only_fields = ["id", "slug"]
|
||||
|
||||
def to_representation(self, instance):
|
||||
"""
|
||||
Add users only for administrator users.
|
||||
Add LiveKit credentials for public instance or related users/groups
|
||||
"""
|
||||
output = super().to_representation(instance)
|
||||
request = self.context.get("request")
|
||||
|
||||
if not request:
|
||||
return output
|
||||
|
||||
role = instance.get_role(request.user)
|
||||
is_admin = models.RoleChoices.check_administrator_role(role)
|
||||
|
||||
if role is not None:
|
||||
access_serializer = NestedResourceAccessSerializer(
|
||||
instance.accesses.select_related("resource", "user").all(),
|
||||
context=self.context,
|
||||
many=True,
|
||||
)
|
||||
output["accesses"] = access_serializer.data
|
||||
|
||||
if not is_admin:
|
||||
del output["configuration"]
|
||||
|
||||
if role is not None or instance.is_public:
|
||||
output["livekit"] = {
|
||||
# todo - generate a proper livekit name
|
||||
"room": "foo",
|
||||
# todo - generate a proper token
|
||||
"token": "foo",
|
||||
}
|
||||
|
||||
output["is_administrable"] = is_admin
|
||||
|
||||
# todo - pass properly livekit configuration
|
||||
|
||||
return output
|
||||
|
||||
@@ -1,4 +1,12 @@
|
||||
"""API endpoints"""
|
||||
import uuid
|
||||
|
||||
from django.conf import settings
|
||||
from django.db.models import Q
|
||||
from django.http import Http404
|
||||
from django.shortcuts import get_object_or_404
|
||||
from django.utils.text import slugify
|
||||
|
||||
from rest_framework import (
|
||||
decorators,
|
||||
mixins,
|
||||
@@ -140,3 +148,131 @@ class UserViewSet(
|
||||
return drf_response.Response(
|
||||
self.serializer_class(request.user, context=context).data
|
||||
)
|
||||
|
||||
|
||||
class RoomViewSet(
|
||||
mixins.CreateModelMixin,
|
||||
mixins.DestroyModelMixin,
|
||||
mixins.UpdateModelMixin,
|
||||
viewsets.GenericViewSet,
|
||||
):
|
||||
"""
|
||||
API endpoints to access and perform actions on rooms.
|
||||
"""
|
||||
|
||||
permission_classes = [permissions.RoomPermissions]
|
||||
queryset = models.Room.objects.all()
|
||||
serializer_class = serializers.RoomSerializer
|
||||
|
||||
def get_object(self):
|
||||
"""Allow getting a room by its slug."""
|
||||
try:
|
||||
uuid.UUID(self.kwargs["pk"])
|
||||
filter_kwargs = {"pk": self.kwargs["pk"]}
|
||||
except ValueError:
|
||||
filter_kwargs = {"slug": slugify(self.kwargs["pk"])}
|
||||
queryset = self.filter_queryset(self.get_queryset())
|
||||
obj = get_object_or_404(queryset, **filter_kwargs)
|
||||
# May raise a permission denied
|
||||
self.check_object_permissions(self.request, obj)
|
||||
return obj
|
||||
|
||||
def retrieve(self, request, *args, **kwargs):
|
||||
"""
|
||||
Allow unregistered rooms when activated.
|
||||
For unregistered rooms we only return a null id and the livekit room and token.
|
||||
"""
|
||||
try:
|
||||
instance = self.get_object()
|
||||
except Http404:
|
||||
if not settings.ALLOW_UNREGISTERED_ROOMS:
|
||||
raise
|
||||
slug = slugify(self.kwargs["pk"])
|
||||
data = {
|
||||
"id": None,
|
||||
"livekit": {
|
||||
"room": slug,
|
||||
# todo - generate a proper token
|
||||
"token": "foo",
|
||||
},
|
||||
}
|
||||
else:
|
||||
data = self.get_serializer(instance).data
|
||||
|
||||
return drf_response.Response(data)
|
||||
|
||||
def list(self, request, *args, **kwargs):
|
||||
"""Limit listed rooms to the ones related to the authenticated user."""
|
||||
user = self.request.user
|
||||
|
||||
if user.is_authenticated:
|
||||
# todo - simplify this queryset
|
||||
queryset = (
|
||||
self.filter_queryset(self.get_queryset())
|
||||
.filter(Q(users=user))
|
||||
.distinct()
|
||||
)
|
||||
else:
|
||||
queryset = self.get_queryset().none()
|
||||
|
||||
page = self.paginate_queryset(queryset)
|
||||
if page is not None:
|
||||
serializer = self.get_serializer(page, many=True)
|
||||
return self.get_paginated_response(serializer.data)
|
||||
|
||||
serializer = self.get_serializer(queryset, many=True)
|
||||
return drf_response.Response(serializer.data)
|
||||
|
||||
def perform_create(self, serializer):
|
||||
"""Set the current user as owner of the newly created room."""
|
||||
room = serializer.save()
|
||||
models.ResourceAccess.objects.create(
|
||||
resource=room,
|
||||
user=self.request.user,
|
||||
role=models.RoleChoices.OWNER,
|
||||
)
|
||||
|
||||
|
||||
class ResourceAccessListModelMixin:
|
||||
"""List mixin for resource access API."""
|
||||
|
||||
def get_permissions(self):
|
||||
"""User only needs to be authenticated to list rooms access"""
|
||||
if self.action == "list":
|
||||
permission_classes = [permissions.IsAuthenticated]
|
||||
else:
|
||||
return super().get_permissions()
|
||||
|
||||
return [permission() for permission in permission_classes]
|
||||
|
||||
def get_queryset(self):
|
||||
"""Return the queryset according to the action."""
|
||||
queryset = super().get_queryset()
|
||||
if self.action == "list":
|
||||
user = self.request.user
|
||||
queryset = queryset.filter(
|
||||
Q(resource__accesses__user=user),
|
||||
resource__accesses__role__in=[
|
||||
models.RoleChoices.ADMIN,
|
||||
models.RoleChoices.OWNER,
|
||||
],
|
||||
).distinct()
|
||||
return queryset
|
||||
|
||||
|
||||
class ResourceAccessViewSet(
|
||||
ResourceAccessListModelMixin,
|
||||
mixins.CreateModelMixin,
|
||||
mixins.DestroyModelMixin,
|
||||
mixins.ListModelMixin,
|
||||
mixins.RetrieveModelMixin,
|
||||
mixins.UpdateModelMixin,
|
||||
viewsets.GenericViewSet,
|
||||
):
|
||||
"""
|
||||
API endpoints to access and perform actions on resource accesses.
|
||||
"""
|
||||
|
||||
permission_classes = [permissions.ResourceAccessPermission]
|
||||
queryset = models.ResourceAccess.objects.all()
|
||||
serializer_class = serializers.ResourceAccessSerializer
|
||||
|
||||
Reference in New Issue
Block a user