🐛(backend) allow creating accesses when privileged by heritage

We took the opportunity of this bug to refactor serializers and
permissions as advised one day by @qbey: no permission checks in
serializers.
This commit is contained in:
Samuel Paccoud - DINUM
2025-05-06 09:41:16 +02:00
committed by Anthony LC
parent d12c637dad
commit 433cead0ac
6 changed files with 192 additions and 134 deletions

View File

@@ -6,6 +6,7 @@ from django.http import Http404
from rest_framework import permissions
from core import choices
from core.models import DocumentAccess, RoleChoices, get_trashbin_cutoff
ACTION_FOR_METHOD_TO_PERMISSION = {
@@ -96,26 +97,27 @@ class CanCreateInvitationPermission(permissions.BasePermission):
).exists()
class AccessPermission(permissions.BasePermission):
"""Permission class for access objects."""
class ResourceWithAccessPermission(permissions.BasePermission):
"""A permission class for templates and invitations."""
def has_permission(self, request, view):
"""check create permission for templates."""
return request.user.is_authenticated or view.action != "create"
def has_object_permission(self, request, view, obj):
"""Check permission for a given object."""
abilities = obj.get_abilities(request.user)
action = view.action
try:
action = ACTION_FOR_METHOD_TO_PERMISSION[view.action][request.method]
except KeyError:
pass
return abilities.get(action, False)
class DocumentAccessPermission(AccessPermission):
class DocumentPermission(permissions.BasePermission):
"""Subclass to handle soft deletion specificities."""
def has_permission(self, request, view):
"""check create permission for documents."""
return request.user.is_authenticated or view.action != "create"
def has_object_permission(self, request, view, obj):
"""
Return a 404 on deleted documents
@@ -127,10 +129,45 @@ class DocumentAccessPermission(AccessPermission):
) and deleted_at < get_trashbin_cutoff():
raise Http404
# Compute permission first to ensure the "user_roles" attribute is set
has_permission = super().has_object_permission(request, view, obj)
abilities = obj.get_abilities(request.user)
action = view.action
try:
action = ACTION_FOR_METHOD_TO_PERMISSION[view.action][request.method]
except KeyError:
pass
has_permission = abilities.get(action, False)
if obj.ancestors_deleted_at and not RoleChoices.OWNER in obj.user_roles:
raise Http404
return has_permission
class ResourceAccessPermission(IsAuthenticated):
"""Permission class for document access objects."""
def has_permission(self, request, view):
"""check create permission for accesses in documents tree."""
if super().has_permission(request, view) is False:
return False
if view.action == "create":
role = getattr(view, view.resource_field_name).get_role(request.user)
if role not in choices.PRIVILEGED_ROLES:
raise exceptions.PermissionDenied(
"You are not allowed to manage accesses for this resource."
)
return True
def has_object_permission(self, request, view, obj):
"""Check permission for a given object."""
abilities = obj.get_abilities(request.user)
requested_role = request.data.get("role")
if requested_role and requested_role not in abilities.get("set_role_to", []):
return False
action = view.action
return abilities.get(action, False)

View File

@@ -10,7 +10,7 @@ from django.utils.functional import lazy
from django.utils.translation import gettext_lazy as _
import magic
from rest_framework import exceptions, serializers
from rest_framework import serializers
from core import choices, enums, models, utils
from core.services.ai_services import AI_ACTIONS
@@ -38,78 +38,7 @@ class UserLightSerializer(UserSerializer):
read_only_fields = ["full_name", "short_name"]
class BaseAccessSerializer(serializers.ModelSerializer):
"""Serialize template accesses."""
abilities = serializers.SerializerMethodField(read_only=True)
def update(self, instance, validated_data):
"""Make "user" field is readonly but only on update."""
validated_data.pop("user", None)
return super().update(instance, validated_data)
def get_abilities(self, instance) -> dict:
"""Return abilities of the logged-in user on the instance."""
request = self.context.get("request")
if request:
return instance.get_abilities(request.user)
return {}
def validate(self, attrs):
"""
Check access rights specific to writing (create/update)
"""
request = self.context.get("request")
user = getattr(request, "user", None)
role = attrs.get("role")
# Update
if self.instance:
can_set_role_to = self.instance.get_abilities(user)["set_role_to"]
if role and role not in can_set_role_to:
message = (
f"You are only allowed to set role to {', '.join(can_set_role_to)}"
if can_set_role_to
else "You are not allowed to set this role for this template."
)
raise exceptions.PermissionDenied(message)
# Create
else:
try:
resource_id = self.context["resource_id"]
except KeyError as exc:
raise exceptions.ValidationError(
"You must set a resource ID in kwargs to create a new access."
) from exc
if not self.Meta.model.objects.filter( # pylint: disable=no-member
Q(user=user) | Q(team__in=user.teams),
role__in=choices.PRIVILEGED_ROLES,
**{self.Meta.resource_field_name: resource_id}, # pylint: disable=no-member
).exists():
raise exceptions.PermissionDenied(
"You are not allowed to manage accesses for this resource."
)
if (
role == models.RoleChoices.OWNER
and not self.Meta.model.objects.filter( # pylint: disable=no-member
Q(user=user) | Q(team__in=user.teams),
role=models.RoleChoices.OWNER,
**{self.Meta.resource_field_name: resource_id}, # pylint: disable=no-member
).exists()
):
raise exceptions.PermissionDenied(
"Only owners of a resource can assign other users as owners."
)
# pylint: disable=no-member
attrs[f"{self.Meta.resource_field_name}_id"] = self.context["resource_id"]
return attrs
class DocumentAccessSerializer(BaseAccessSerializer):
class DocumentAccessSerializer(serializers.ModelSerializer):
"""Serialize document accesses."""
document_id = serializers.PrimaryKeyRelatedField(
@@ -124,6 +53,7 @@ class DocumentAccessSerializer(BaseAccessSerializer):
allow_null=True,
)
user = UserSerializer(read_only=True)
abilities = serializers.SerializerMethodField(read_only=True)
max_ancestors_role = serializers.SerializerMethodField(read_only=True)
class Meta:
@@ -141,10 +71,22 @@ class DocumentAccessSerializer(BaseAccessSerializer):
]
read_only_fields = ["id", "document_id", "abilities", "max_ancestors_role"]
def get_abilities(self, instance) -> dict:
"""Return abilities of the logged-in user on the instance."""
request = self.context.get("request")
if request:
return instance.get_abilities(request.user)
return {}
def get_max_ancestors_role(self, instance):
"""Return max_ancestors_role if annotated; else None."""
return getattr(instance, "max_ancestors_role", None)
def update(self, instance, validated_data):
"""Make "user" field is readonly but only on update."""
validated_data.pop("user", None)
return super().update(instance, validated_data)
class DocumentAccessLightSerializer(DocumentAccessSerializer):
"""Serialize document accesses with limited fields."""
@@ -173,15 +115,29 @@ class DocumentAccessLightSerializer(DocumentAccessSerializer):
]
class TemplateAccessSerializer(BaseAccessSerializer):
class TemplateAccessSerializer(serializers.ModelSerializer):
"""Serialize template accesses."""
abilities = serializers.SerializerMethodField(read_only=True)
class Meta:
model = models.TemplateAccess
resource_field_name = "template"
fields = ["id", "user", "team", "role", "abilities"]
read_only_fields = ["id", "abilities"]
def get_abilities(self, instance) -> dict:
"""Return abilities of the logged-in user on the instance."""
request = self.context.get("request")
if request:
return instance.get_abilities(request.user)
return {}
def update(self, instance, validated_data):
"""Make "user" field is readonly but only on update."""
validated_data.pop("user", None)
return super().update(instance, validated_data)
class ListDocumentSerializer(serializers.ModelSerializer):
"""Serialize documents with limited fields for display in lists."""

View File

@@ -19,6 +19,7 @@ from django.db.models.expressions import RawSQL
from django.db.models.functions import Left, Length
from django.http import Http404, StreamingHttpResponse
from django.urls import reverse
from django.utils.functional import cached_property
from django.utils.text import capfirst, slugify
from django.utils.translation import gettext_lazy as _
@@ -356,7 +357,7 @@ class DocumentViewSet(
ordering_fields = ["created_at", "updated_at", "title"]
pagination_class = Pagination
permission_classes = [
permissions.DocumentAccessPermission,
permissions.DocumentPermission,
]
queryset = models.Document.objects.all()
serializer_class = serializers.DocumentSerializer
@@ -842,7 +843,7 @@ class DocumentViewSet(
try:
current_document = self.queryset.only("depth", "path").get(pk=pk)
except models.Document.DoesNotExist as excpt:
raise drf.exceptions.NotFound from excpt
raise drf.exceptions.NotFound() from excpt
ancestors = (
(current_document.get_ancestors() | self.queryset.filter(pk=pk))
@@ -902,7 +903,10 @@ class DocumentViewSet(
@drf.decorators.action(
detail=True,
methods=["post"],
permission_classes=[permissions.IsAuthenticated, permissions.AccessPermission],
permission_classes=[
permissions.IsAuthenticated,
permissions.DocumentPermission,
],
url_path="duplicate",
)
@transaction.atomic
@@ -1473,25 +1477,32 @@ class DocumentAccessViewSet(
"""
lookup_field = "pk"
permission_classes = [permissions.IsAuthenticated, permissions.AccessPermission]
queryset = models.DocumentAccess.objects.select_related("user").all()
permission_classes = [permissions.ResourceAccessPermission]
queryset = models.DocumentAccess.objects.select_related("user", "document").only(
"id",
"created_at",
"role",
"team",
"user__id",
"user__short_name",
"user__full_name",
"user__email",
"user__language",
"document__id",
"document__path",
"document__depth",
)
resource_field_name = "document"
def __init__(self, *args, **kwargs):
"""Initialize the viewset and define default value for contextual document."""
super().__init__(*args, **kwargs)
self.document = None
def initial(self, request, *args, **kwargs):
"""Retrieve self.document with annotated user roles."""
super().initial(request, *args, **kwargs)
@cached_property
def document(self):
"""Get related document from resource ID in url and annotate user roles."""
try:
self.document = models.Document.objects.annotate_user_roles(
self.request.user
).get(pk=self.kwargs["resource_id"])
return models.Document.objects.annotate_user_roles(self.request.user).get(
pk=self.kwargs["resource_id"]
)
except models.Document.DoesNotExist as excpt:
raise Http404() from excpt
raise drf.exceptions.NotFound() from excpt
def get_serializer_class(self):
"""Use light serializer for unprivileged users."""
@@ -1579,8 +1590,24 @@ class DocumentAccessViewSet(
return drf.response.Response(serialized_data)
def perform_create(self, serializer):
"""Add a new access to the document and send an email to the new added user."""
access = serializer.save()
"""
Actually create the new document access:
- Ensures the `document_id` is explicitly set from the URL
- If the assigned role is `OWNER`, checks that the requesting user is an owner
of the document. This is the only permission check deferred until this step;
all other access checks are handled earlier in the permission lifecycle.
- Sends an invitation email to the newly added user after saving the access.
"""
role = serializer.validated_data.get("role")
if (
role == choices.RoleChoices.OWNER
and self.document.get_role(self.request.user) != choices.RoleChoices.OWNER
):
raise drf.exceptions.PermissionDenied(
"Only owners of a document can assign other users as owners."
)
access = serializer.save(document_id=self.kwargs["resource_id"])
access.document.send_invitation_email(
access.user.email,
@@ -1626,7 +1653,7 @@ class TemplateViewSet(
filter_backends = [drf.filters.OrderingFilter]
permission_classes = [
permissions.IsAuthenticatedOrSafe,
permissions.AccessPermission,
permissions.ResourceWithAccessPermission,
]
ordering = ["-created_at"]
ordering_fields = ["created_at", "updated_at", "title"]
@@ -1717,11 +1744,19 @@ class TemplateAccessViewSet(
"""
lookup_field = "pk"
permission_classes = [permissions.IsAuthenticated, permissions.AccessPermission]
permission_classes = [permissions.ResourceAccessPermission]
queryset = models.TemplateAccess.objects.select_related("user").all()
resource_field_name = "template"
serializer_class = serializers.TemplateAccessSerializer
@cached_property
def template(self):
"""Get related template from resource ID in url."""
try:
return models.Template.objects.get(pk=self.kwargs["resource_id"])
except models.Template.DoesNotExist as excpt:
raise drf.exceptions.NotFound() from excpt
def list(self, request, *args, **kwargs):
"""Restrict templates returned by the list endpoint"""
user = self.request.user
@@ -1739,6 +1774,25 @@ class TemplateAccessViewSet(
serializer = self.get_serializer(queryset, many=True)
return drf.response.Response(serializer.data)
def perform_create(self, serializer):
"""
Actually create the new template access:
- Ensures the `template_id` is explicitly set from the URL.
- If the assigned role is `OWNER`, checks that the requesting user is an owner
of the document. This is the only permission check deferred until this step;
all other access checks are handled earlier in the permission lifecycle.
"""
role = serializer.validated_data.get("role")
if (
role == choices.RoleChoices.OWNER
and self.template.get_role(self.request.user) != choices.RoleChoices.OWNER
):
raise drf.exceptions.PermissionDenied(
"Only owners of a template can assign other users as owners."
)
serializer.save(template_id=self.kwargs["resource_id"])
class InvitationViewset(
drf.mixins.CreateModelMixin,
@@ -1771,7 +1825,7 @@ class InvitationViewset(
pagination_class = Pagination
permission_classes = [
permissions.CanCreateInvitationPermission,
permissions.AccessPermission,
permissions.ResourceWithAccessPermission,
]
queryset = (
models.Invitation.objects.all()

View File

@@ -1291,10 +1291,10 @@ class Template(BaseModel):
def __str__(self):
return self.title
def get_roles(self, user):
def get_role(self, user):
"""Return the roles a user has on a resource as an iterable."""
if not user.is_authenticated:
return []
return None
try:
roles = self.user_roles or []
@@ -1305,21 +1305,20 @@ class Template(BaseModel):
).values_list("role", flat=True)
except (models.ObjectDoesNotExist, IndexError):
roles = []
return roles
return RoleChoices.max(*roles)
def get_abilities(self, user):
"""
Compute and return abilities for a given user on the template.
"""
roles = self.get_roles(user)
is_owner_or_admin = bool(
set(roles).intersection({RoleChoices.OWNER, RoleChoices.ADMIN})
)
can_get = self.is_public or bool(roles)
can_update = is_owner_or_admin or RoleChoices.EDITOR in roles
role = self.get_role(user)
is_owner_or_admin = role in PRIVILEGED_ROLES
can_get = self.is_public or bool(role)
can_update = is_owner_or_admin or role == RoleChoices.EDITOR
return {
"destroy": RoleChoices.OWNER in roles,
"destroy": role == RoleChoices.OWNER,
"generate_document": can_get,
"accesses_manage": is_owner_or_admin,
"update": can_update,

View File

@@ -103,32 +103,37 @@ def test_api_document_accesses_create_authenticated_reader_or_editor(
assert not models.DocumentAccess.objects.filter(user=other_user).exists()
@pytest.mark.parametrize("depth", [1, 2, 3])
@pytest.mark.parametrize("via", VIA)
def test_api_document_accesses_create_authenticated_administrator(via, mock_user_teams):
def test_api_document_accesses_create_authenticated_administrator(
via, depth, mock_user_teams
):
"""
Administrators of a document should be able to create document accesses
except for the "owner" role.
Administrators of a document (direct or by heritage) should be able to create
document accesses except for the "owner" role.
An email should be sent to the accesses to notify them of the adding.
"""
user = factories.UserFactory(with_owned_document=True)
client = APIClient()
client.force_login(user)
document = factories.DocumentFactory()
documents = []
for i in range(depth):
parent = documents[i - 1] if i > 0 else None
documents.append(factories.DocumentFactory(parent=parent))
if via == USER:
factories.UserDocumentAccessFactory(
document=document, user=user, role="administrator"
document=documents[0], user=user, role="administrator"
)
elif via == TEAM:
mock_user_teams.return_value = ["lasuite", "unknown"]
factories.TeamDocumentAccessFactory(
document=document, team="lasuite", role="administrator"
document=documents[0], team="lasuite", role="administrator"
)
other_user = factories.UserFactory(language="en-us")
# It should not be allowed to create an owner access
document = documents[-1]
response = client.post(
f"/api/v1.0/documents/{document.id!s}/accesses/",
{
@@ -140,7 +145,7 @@ def test_api_document_accesses_create_authenticated_administrator(via, mock_user
assert response.status_code == 403
assert response.json() == {
"detail": "Only owners of a resource can assign other users as owners."
"detail": "Only owners of a document can assign other users as owners."
}
# It should be allowed to create a lower access
@@ -184,28 +189,35 @@ def test_api_document_accesses_create_authenticated_administrator(via, mock_user
assert "docs/" + str(document.id) + "/" in email_content
@pytest.mark.parametrize("depth", [1, 2, 3])
@pytest.mark.parametrize("via", VIA)
def test_api_document_accesses_create_authenticated_owner(via, mock_user_teams):
def test_api_document_accesses_create_authenticated_owner(via, depth, mock_user_teams):
"""
Owners of a document should be able to create document accesses whatever the role.
An email should be sent to the accesses to notify them of the adding.
Owners of a document (direct or by heritage) should be able to create document accesses
whatever the role. An email should be sent to the accesses to notify them of the adding.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
document = factories.DocumentFactory()
documents = []
for i in range(depth):
parent = documents[i - 1] if i > 0 else None
documents.append(factories.DocumentFactory(parent=parent))
if via == USER:
factories.UserDocumentAccessFactory(document=document, user=user, role="owner")
factories.UserDocumentAccessFactory(
document=documents[0], user=user, role="owner"
)
elif via == TEAM:
mock_user_teams.return_value = ["lasuite", "unknown"]
factories.TeamDocumentAccessFactory(
document=document, team="lasuite", role="owner"
document=documents[0], team="lasuite", role="owner"
)
other_user = factories.UserFactory(language="en-us")
document = documents[-1]
role = random.choice([role[0] for role in models.RoleChoices.choices])
assert len(mail.outbox) == 0

View File

@@ -133,7 +133,7 @@ def test_api_template_accesses_create_authenticated_administrator(via, mock_user
assert response.status_code == 403
assert response.json() == {
"detail": "Only owners of a resource can assign other users as owners."
"detail": "Only owners of a template can assign other users as owners."
}
# It should be allowed to create a lower access