diff --git a/src/backend/core/api/permissions.py b/src/backend/core/api/permissions.py index 43a0465f..09007847 100644 --- a/src/backend/core/api/permissions.py +++ b/src/backend/core/api/permissions.py @@ -6,6 +6,7 @@ from django.http import Http404 from rest_framework import permissions +from core import choices from core.models import DocumentAccess, RoleChoices, get_trashbin_cutoff ACTION_FOR_METHOD_TO_PERMISSION = { @@ -96,26 +97,27 @@ class CanCreateInvitationPermission(permissions.BasePermission): ).exists() -class AccessPermission(permissions.BasePermission): - """Permission class for access objects.""" +class ResourceWithAccessPermission(permissions.BasePermission): + """A permission class for templates and invitations.""" def has_permission(self, request, view): + """check create permission for templates.""" return request.user.is_authenticated or view.action != "create" def has_object_permission(self, request, view, obj): """Check permission for a given object.""" abilities = obj.get_abilities(request.user) action = view.action - try: - action = ACTION_FOR_METHOD_TO_PERMISSION[view.action][request.method] - except KeyError: - pass return abilities.get(action, False) -class DocumentAccessPermission(AccessPermission): +class DocumentPermission(permissions.BasePermission): """Subclass to handle soft deletion specificities.""" + def has_permission(self, request, view): + """check create permission for documents.""" + return request.user.is_authenticated or view.action != "create" + def has_object_permission(self, request, view, obj): """ Return a 404 on deleted documents @@ -127,10 +129,45 @@ class DocumentAccessPermission(AccessPermission): ) and deleted_at < get_trashbin_cutoff(): raise Http404 - # Compute permission first to ensure the "user_roles" attribute is set - has_permission = super().has_object_permission(request, view, obj) + abilities = obj.get_abilities(request.user) + action = view.action + try: + action = ACTION_FOR_METHOD_TO_PERMISSION[view.action][request.method] + except KeyError: + pass + + has_permission = abilities.get(action, False) if obj.ancestors_deleted_at and not RoleChoices.OWNER in obj.user_roles: raise Http404 return has_permission + + +class ResourceAccessPermission(IsAuthenticated): + """Permission class for document access objects.""" + + def has_permission(self, request, view): + """check create permission for accesses in documents tree.""" + if super().has_permission(request, view) is False: + return False + + if view.action == "create": + role = getattr(view, view.resource_field_name).get_role(request.user) + if role not in choices.PRIVILEGED_ROLES: + raise exceptions.PermissionDenied( + "You are not allowed to manage accesses for this resource." + ) + + return True + + def has_object_permission(self, request, view, obj): + """Check permission for a given object.""" + abilities = obj.get_abilities(request.user) + + requested_role = request.data.get("role") + if requested_role and requested_role not in abilities.get("set_role_to", []): + return False + + action = view.action + return abilities.get(action, False) diff --git a/src/backend/core/api/serializers.py b/src/backend/core/api/serializers.py index fcf2a1ea..6bd9da30 100644 --- a/src/backend/core/api/serializers.py +++ b/src/backend/core/api/serializers.py @@ -10,7 +10,7 @@ from django.utils.functional import lazy from django.utils.translation import gettext_lazy as _ import magic -from rest_framework import exceptions, serializers +from rest_framework import serializers from core import choices, enums, models, utils from core.services.ai_services import AI_ACTIONS @@ -38,78 +38,7 @@ class UserLightSerializer(UserSerializer): read_only_fields = ["full_name", "short_name"] -class BaseAccessSerializer(serializers.ModelSerializer): - """Serialize template accesses.""" - - abilities = serializers.SerializerMethodField(read_only=True) - - def update(self, instance, validated_data): - """Make "user" field is readonly but only on update.""" - validated_data.pop("user", None) - return super().update(instance, validated_data) - - def get_abilities(self, instance) -> dict: - """Return abilities of the logged-in user on the instance.""" - request = self.context.get("request") - if request: - return instance.get_abilities(request.user) - return {} - - def validate(self, attrs): - """ - Check access rights specific to writing (create/update) - """ - request = self.context.get("request") - user = getattr(request, "user", None) - role = attrs.get("role") - - # Update - if self.instance: - can_set_role_to = self.instance.get_abilities(user)["set_role_to"] - if role and role not in can_set_role_to: - message = ( - f"You are only allowed to set role to {', '.join(can_set_role_to)}" - if can_set_role_to - else "You are not allowed to set this role for this template." - ) - raise exceptions.PermissionDenied(message) - - # Create - else: - try: - resource_id = self.context["resource_id"] - except KeyError as exc: - raise exceptions.ValidationError( - "You must set a resource ID in kwargs to create a new access." - ) from exc - - if not self.Meta.model.objects.filter( # pylint: disable=no-member - Q(user=user) | Q(team__in=user.teams), - role__in=choices.PRIVILEGED_ROLES, - **{self.Meta.resource_field_name: resource_id}, # pylint: disable=no-member - ).exists(): - raise exceptions.PermissionDenied( - "You are not allowed to manage accesses for this resource." - ) - - if ( - role == models.RoleChoices.OWNER - and not self.Meta.model.objects.filter( # pylint: disable=no-member - Q(user=user) | Q(team__in=user.teams), - role=models.RoleChoices.OWNER, - **{self.Meta.resource_field_name: resource_id}, # pylint: disable=no-member - ).exists() - ): - raise exceptions.PermissionDenied( - "Only owners of a resource can assign other users as owners." - ) - - # pylint: disable=no-member - attrs[f"{self.Meta.resource_field_name}_id"] = self.context["resource_id"] - return attrs - - -class DocumentAccessSerializer(BaseAccessSerializer): +class DocumentAccessSerializer(serializers.ModelSerializer): """Serialize document accesses.""" document_id = serializers.PrimaryKeyRelatedField( @@ -124,6 +53,7 @@ class DocumentAccessSerializer(BaseAccessSerializer): allow_null=True, ) user = UserSerializer(read_only=True) + abilities = serializers.SerializerMethodField(read_only=True) max_ancestors_role = serializers.SerializerMethodField(read_only=True) class Meta: @@ -141,10 +71,22 @@ class DocumentAccessSerializer(BaseAccessSerializer): ] read_only_fields = ["id", "document_id", "abilities", "max_ancestors_role"] + def get_abilities(self, instance) -> dict: + """Return abilities of the logged-in user on the instance.""" + request = self.context.get("request") + if request: + return instance.get_abilities(request.user) + return {} + def get_max_ancestors_role(self, instance): """Return max_ancestors_role if annotated; else None.""" return getattr(instance, "max_ancestors_role", None) + def update(self, instance, validated_data): + """Make "user" field is readonly but only on update.""" + validated_data.pop("user", None) + return super().update(instance, validated_data) + class DocumentAccessLightSerializer(DocumentAccessSerializer): """Serialize document accesses with limited fields.""" @@ -173,15 +115,29 @@ class DocumentAccessLightSerializer(DocumentAccessSerializer): ] -class TemplateAccessSerializer(BaseAccessSerializer): +class TemplateAccessSerializer(serializers.ModelSerializer): """Serialize template accesses.""" + abilities = serializers.SerializerMethodField(read_only=True) + class Meta: model = models.TemplateAccess resource_field_name = "template" fields = ["id", "user", "team", "role", "abilities"] read_only_fields = ["id", "abilities"] + def get_abilities(self, instance) -> dict: + """Return abilities of the logged-in user on the instance.""" + request = self.context.get("request") + if request: + return instance.get_abilities(request.user) + return {} + + def update(self, instance, validated_data): + """Make "user" field is readonly but only on update.""" + validated_data.pop("user", None) + return super().update(instance, validated_data) + class ListDocumentSerializer(serializers.ModelSerializer): """Serialize documents with limited fields for display in lists.""" diff --git a/src/backend/core/api/viewsets.py b/src/backend/core/api/viewsets.py index f5422da1..5f766293 100644 --- a/src/backend/core/api/viewsets.py +++ b/src/backend/core/api/viewsets.py @@ -19,6 +19,7 @@ from django.db.models.expressions import RawSQL from django.db.models.functions import Left, Length from django.http import Http404, StreamingHttpResponse from django.urls import reverse +from django.utils.functional import cached_property from django.utils.text import capfirst, slugify from django.utils.translation import gettext_lazy as _ @@ -356,7 +357,7 @@ class DocumentViewSet( ordering_fields = ["created_at", "updated_at", "title"] pagination_class = Pagination permission_classes = [ - permissions.DocumentAccessPermission, + permissions.DocumentPermission, ] queryset = models.Document.objects.all() serializer_class = serializers.DocumentSerializer @@ -842,7 +843,7 @@ class DocumentViewSet( try: current_document = self.queryset.only("depth", "path").get(pk=pk) except models.Document.DoesNotExist as excpt: - raise drf.exceptions.NotFound from excpt + raise drf.exceptions.NotFound() from excpt ancestors = ( (current_document.get_ancestors() | self.queryset.filter(pk=pk)) @@ -902,7 +903,10 @@ class DocumentViewSet( @drf.decorators.action( detail=True, methods=["post"], - permission_classes=[permissions.IsAuthenticated, permissions.AccessPermission], + permission_classes=[ + permissions.IsAuthenticated, + permissions.DocumentPermission, + ], url_path="duplicate", ) @transaction.atomic @@ -1473,25 +1477,32 @@ class DocumentAccessViewSet( """ lookup_field = "pk" - permission_classes = [permissions.IsAuthenticated, permissions.AccessPermission] - queryset = models.DocumentAccess.objects.select_related("user").all() + permission_classes = [permissions.ResourceAccessPermission] + queryset = models.DocumentAccess.objects.select_related("user", "document").only( + "id", + "created_at", + "role", + "team", + "user__id", + "user__short_name", + "user__full_name", + "user__email", + "user__language", + "document__id", + "document__path", + "document__depth", + ) resource_field_name = "document" - def __init__(self, *args, **kwargs): - """Initialize the viewset and define default value for contextual document.""" - super().__init__(*args, **kwargs) - self.document = None - - def initial(self, request, *args, **kwargs): - """Retrieve self.document with annotated user roles.""" - super().initial(request, *args, **kwargs) - + @cached_property + def document(self): + """Get related document from resource ID in url and annotate user roles.""" try: - self.document = models.Document.objects.annotate_user_roles( - self.request.user - ).get(pk=self.kwargs["resource_id"]) + return models.Document.objects.annotate_user_roles(self.request.user).get( + pk=self.kwargs["resource_id"] + ) except models.Document.DoesNotExist as excpt: - raise Http404() from excpt + raise drf.exceptions.NotFound() from excpt def get_serializer_class(self): """Use light serializer for unprivileged users.""" @@ -1579,8 +1590,24 @@ class DocumentAccessViewSet( return drf.response.Response(serialized_data) def perform_create(self, serializer): - """Add a new access to the document and send an email to the new added user.""" - access = serializer.save() + """ + Actually create the new document access: + - Ensures the `document_id` is explicitly set from the URL + - If the assigned role is `OWNER`, checks that the requesting user is an owner + of the document. This is the only permission check deferred until this step; + all other access checks are handled earlier in the permission lifecycle. + - Sends an invitation email to the newly added user after saving the access. + """ + role = serializer.validated_data.get("role") + if ( + role == choices.RoleChoices.OWNER + and self.document.get_role(self.request.user) != choices.RoleChoices.OWNER + ): + raise drf.exceptions.PermissionDenied( + "Only owners of a document can assign other users as owners." + ) + + access = serializer.save(document_id=self.kwargs["resource_id"]) access.document.send_invitation_email( access.user.email, @@ -1626,7 +1653,7 @@ class TemplateViewSet( filter_backends = [drf.filters.OrderingFilter] permission_classes = [ permissions.IsAuthenticatedOrSafe, - permissions.AccessPermission, + permissions.ResourceWithAccessPermission, ] ordering = ["-created_at"] ordering_fields = ["created_at", "updated_at", "title"] @@ -1717,11 +1744,19 @@ class TemplateAccessViewSet( """ lookup_field = "pk" - permission_classes = [permissions.IsAuthenticated, permissions.AccessPermission] + permission_classes = [permissions.ResourceAccessPermission] queryset = models.TemplateAccess.objects.select_related("user").all() resource_field_name = "template" serializer_class = serializers.TemplateAccessSerializer + @cached_property + def template(self): + """Get related template from resource ID in url.""" + try: + return models.Template.objects.get(pk=self.kwargs["resource_id"]) + except models.Template.DoesNotExist as excpt: + raise drf.exceptions.NotFound() from excpt + def list(self, request, *args, **kwargs): """Restrict templates returned by the list endpoint""" user = self.request.user @@ -1739,6 +1774,25 @@ class TemplateAccessViewSet( serializer = self.get_serializer(queryset, many=True) return drf.response.Response(serializer.data) + def perform_create(self, serializer): + """ + Actually create the new template access: + - Ensures the `template_id` is explicitly set from the URL. + - If the assigned role is `OWNER`, checks that the requesting user is an owner + of the document. This is the only permission check deferred until this step; + all other access checks are handled earlier in the permission lifecycle. + """ + role = serializer.validated_data.get("role") + if ( + role == choices.RoleChoices.OWNER + and self.template.get_role(self.request.user) != choices.RoleChoices.OWNER + ): + raise drf.exceptions.PermissionDenied( + "Only owners of a template can assign other users as owners." + ) + + serializer.save(template_id=self.kwargs["resource_id"]) + class InvitationViewset( drf.mixins.CreateModelMixin, @@ -1771,7 +1825,7 @@ class InvitationViewset( pagination_class = Pagination permission_classes = [ permissions.CanCreateInvitationPermission, - permissions.AccessPermission, + permissions.ResourceWithAccessPermission, ] queryset = ( models.Invitation.objects.all() diff --git a/src/backend/core/models.py b/src/backend/core/models.py index 8c4cd35b..5d16dacf 100644 --- a/src/backend/core/models.py +++ b/src/backend/core/models.py @@ -1291,10 +1291,10 @@ class Template(BaseModel): def __str__(self): return self.title - def get_roles(self, user): + def get_role(self, user): """Return the roles a user has on a resource as an iterable.""" if not user.is_authenticated: - return [] + return None try: roles = self.user_roles or [] @@ -1305,21 +1305,20 @@ class Template(BaseModel): ).values_list("role", flat=True) except (models.ObjectDoesNotExist, IndexError): roles = [] - return roles + + return RoleChoices.max(*roles) def get_abilities(self, user): """ Compute and return abilities for a given user on the template. """ - roles = self.get_roles(user) - is_owner_or_admin = bool( - set(roles).intersection({RoleChoices.OWNER, RoleChoices.ADMIN}) - ) - can_get = self.is_public or bool(roles) - can_update = is_owner_or_admin or RoleChoices.EDITOR in roles + role = self.get_role(user) + is_owner_or_admin = role in PRIVILEGED_ROLES + can_get = self.is_public or bool(role) + can_update = is_owner_or_admin or role == RoleChoices.EDITOR return { - "destroy": RoleChoices.OWNER in roles, + "destroy": role == RoleChoices.OWNER, "generate_document": can_get, "accesses_manage": is_owner_or_admin, "update": can_update, diff --git a/src/backend/core/tests/documents/test_api_document_accesses_create.py b/src/backend/core/tests/documents/test_api_document_accesses_create.py index 8b831980..5af0da91 100644 --- a/src/backend/core/tests/documents/test_api_document_accesses_create.py +++ b/src/backend/core/tests/documents/test_api_document_accesses_create.py @@ -103,32 +103,37 @@ def test_api_document_accesses_create_authenticated_reader_or_editor( assert not models.DocumentAccess.objects.filter(user=other_user).exists() +@pytest.mark.parametrize("depth", [1, 2, 3]) @pytest.mark.parametrize("via", VIA) -def test_api_document_accesses_create_authenticated_administrator(via, mock_user_teams): +def test_api_document_accesses_create_authenticated_administrator( + via, depth, mock_user_teams +): """ - Administrators of a document should be able to create document accesses - except for the "owner" role. + Administrators of a document (direct or by heritage) should be able to create + document accesses except for the "owner" role. An email should be sent to the accesses to notify them of the adding. """ user = factories.UserFactory(with_owned_document=True) - client = APIClient() client.force_login(user) - document = factories.DocumentFactory() + documents = [] + for i in range(depth): + parent = documents[i - 1] if i > 0 else None + documents.append(factories.DocumentFactory(parent=parent)) + if via == USER: factories.UserDocumentAccessFactory( - document=document, user=user, role="administrator" + document=documents[0], user=user, role="administrator" ) elif via == TEAM: mock_user_teams.return_value = ["lasuite", "unknown"] factories.TeamDocumentAccessFactory( - document=document, team="lasuite", role="administrator" + document=documents[0], team="lasuite", role="administrator" ) other_user = factories.UserFactory(language="en-us") - - # It should not be allowed to create an owner access + document = documents[-1] response = client.post( f"/api/v1.0/documents/{document.id!s}/accesses/", { @@ -140,7 +145,7 @@ def test_api_document_accesses_create_authenticated_administrator(via, mock_user assert response.status_code == 403 assert response.json() == { - "detail": "Only owners of a resource can assign other users as owners." + "detail": "Only owners of a document can assign other users as owners." } # It should be allowed to create a lower access @@ -184,28 +189,35 @@ def test_api_document_accesses_create_authenticated_administrator(via, mock_user assert "docs/" + str(document.id) + "/" in email_content +@pytest.mark.parametrize("depth", [1, 2, 3]) @pytest.mark.parametrize("via", VIA) -def test_api_document_accesses_create_authenticated_owner(via, mock_user_teams): +def test_api_document_accesses_create_authenticated_owner(via, depth, mock_user_teams): """ - Owners of a document should be able to create document accesses whatever the role. - An email should be sent to the accesses to notify them of the adding. + Owners of a document (direct or by heritage) should be able to create document accesses + whatever the role. An email should be sent to the accesses to notify them of the adding. """ user = factories.UserFactory() client = APIClient() client.force_login(user) - document = factories.DocumentFactory() + documents = [] + for i in range(depth): + parent = documents[i - 1] if i > 0 else None + documents.append(factories.DocumentFactory(parent=parent)) + if via == USER: - factories.UserDocumentAccessFactory(document=document, user=user, role="owner") + factories.UserDocumentAccessFactory( + document=documents[0], user=user, role="owner" + ) elif via == TEAM: mock_user_teams.return_value = ["lasuite", "unknown"] factories.TeamDocumentAccessFactory( - document=document, team="lasuite", role="owner" + document=documents[0], team="lasuite", role="owner" ) other_user = factories.UserFactory(language="en-us") - + document = documents[-1] role = random.choice([role[0] for role in models.RoleChoices.choices]) assert len(mail.outbox) == 0 diff --git a/src/backend/core/tests/templates/test_api_template_accesses_create.py b/src/backend/core/tests/templates/test_api_template_accesses_create.py index f52a5344..a33cd470 100644 --- a/src/backend/core/tests/templates/test_api_template_accesses_create.py +++ b/src/backend/core/tests/templates/test_api_template_accesses_create.py @@ -133,7 +133,7 @@ def test_api_template_accesses_create_authenticated_administrator(via, mock_user assert response.status_code == 403 assert response.json() == { - "detail": "Only owners of a resource can assign other users as owners." + "detail": "Only owners of a template can assign other users as owners." } # It should be allowed to create a lower access