♻️(backend) optimize refactoring access abilities and fix inheritance
The latest refactoring in a445278 kept some factorizations that are
not legit anymore after the refactoring.
It is also cleaner to not make serializer choice in the list view if
the reason for this choice is related to something else b/c other
views would then use the wrong serializer and that would be a
security leak.
This commit also fixes a bug in the access rights inheritance: if a
user is allowed to see accesses on a document, he should see all
acesses related to ancestors, even the ancestors that he can not
read. This is because the access that was granted on all ancestors
also apply on the current document... so it must be displayed.
Lastly, we optimize database queries because the number of accesses
we fetch is going up with multi-pages and we were generating a lot
of useless queries.
This commit is contained in:
committed by
Anthony LC
parent
c1fc1bd52f
commit
f782a0236b
@@ -32,21 +32,10 @@ class UserSerializer(serializers.ModelSerializer):
|
||||
class UserLightSerializer(UserSerializer):
|
||||
"""Serialize users with limited fields."""
|
||||
|
||||
id = serializers.SerializerMethodField(read_only=True)
|
||||
email = serializers.SerializerMethodField(read_only=True)
|
||||
|
||||
def get_id(self, _user):
|
||||
"""Return always None. Here to have the same fields than in UserSerializer."""
|
||||
return None
|
||||
|
||||
def get_email(self, _user):
|
||||
"""Return always None. Here to have the same fields than in UserSerializer."""
|
||||
return None
|
||||
|
||||
class Meta:
|
||||
model = models.User
|
||||
fields = ["id", "email", "full_name", "short_name"]
|
||||
read_only_fields = ["id", "email", "full_name", "short_name"]
|
||||
fields = ["full_name", "short_name"]
|
||||
read_only_fields = ["full_name", "short_name"]
|
||||
|
||||
|
||||
class BaseAccessSerializer(serializers.ModelSerializer):
|
||||
@@ -59,11 +48,11 @@ class BaseAccessSerializer(serializers.ModelSerializer):
|
||||
validated_data.pop("user", None)
|
||||
return super().update(instance, validated_data)
|
||||
|
||||
def get_abilities(self, access) -> dict:
|
||||
def get_abilities(self, instance) -> dict:
|
||||
"""Return abilities of the logged-in user on the instance."""
|
||||
request = self.context.get("request")
|
||||
if request:
|
||||
return access.get_abilities(request.user)
|
||||
return instance.get_abilities(request.user)
|
||||
return {}
|
||||
|
||||
def validate(self, attrs):
|
||||
@@ -77,7 +66,6 @@ class BaseAccessSerializer(serializers.ModelSerializer):
|
||||
# Update
|
||||
if self.instance:
|
||||
can_set_role_to = self.instance.get_abilities(user)["set_role_to"]
|
||||
|
||||
if role and role not in can_set_role_to:
|
||||
message = (
|
||||
f"You are only allowed to set role to {', '.join(can_set_role_to)}"
|
||||
@@ -140,19 +128,41 @@ class DocumentAccessSerializer(BaseAccessSerializer):
|
||||
class Meta:
|
||||
model = models.DocumentAccess
|
||||
resource_field_name = "document"
|
||||
fields = ["id", "document_id", "user", "user_id", "team", "role", "abilities"]
|
||||
fields = [
|
||||
"id",
|
||||
"document_id",
|
||||
"user",
|
||||
"user_id",
|
||||
"team",
|
||||
"role",
|
||||
"abilities",
|
||||
]
|
||||
read_only_fields = ["id", "document_id", "abilities"]
|
||||
|
||||
|
||||
class DocumentAccessLightSerializer(BaseAccessSerializer):
|
||||
class DocumentAccessLightSerializer(DocumentAccessSerializer):
|
||||
"""Serialize document accesses with limited fields."""
|
||||
|
||||
user = UserLightSerializer(read_only=True)
|
||||
|
||||
class Meta:
|
||||
model = models.DocumentAccess
|
||||
fields = ["id", "user", "team", "role", "abilities"]
|
||||
read_only_fields = ["id", "team", "role", "abilities"]
|
||||
resource_field_name = "document"
|
||||
fields = [
|
||||
"id",
|
||||
"document_id",
|
||||
"user",
|
||||
"team",
|
||||
"role",
|
||||
"abilities",
|
||||
]
|
||||
read_only_fields = [
|
||||
"id",
|
||||
"document_id",
|
||||
"team",
|
||||
"role",
|
||||
"abilities",
|
||||
]
|
||||
|
||||
|
||||
class TemplateAccessSerializer(BaseAccessSerializer):
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from collections import defaultdict
|
||||
from urllib.parse import unquote, urlencode, urlparse
|
||||
|
||||
from django.conf import settings
|
||||
@@ -1500,49 +1501,88 @@ class DocumentAccessViewSet(
|
||||
permission_classes = [permissions.IsAuthenticated, permissions.AccessPermission]
|
||||
queryset = models.DocumentAccess.objects.select_related("user").all()
|
||||
resource_field_name = "document"
|
||||
serializer_class = serializers.DocumentAccessSerializer
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
"""Initialize the viewset and define default value for contextual document."""
|
||||
super().__init__(*args, **kwargs)
|
||||
self.document = None
|
||||
|
||||
def initial(self, request, *args, **kwargs):
|
||||
"""Retrieve self.document with annotated user roles."""
|
||||
super().initial(request, *args, **kwargs)
|
||||
|
||||
try:
|
||||
self.document = models.Document.objects.annotate_user_roles(
|
||||
self.request.user
|
||||
).get(pk=self.kwargs["resource_id"])
|
||||
except models.Document.DoesNotExist as excpt:
|
||||
raise Http404() from excpt
|
||||
|
||||
def get_serializer_class(self):
|
||||
"""Use light serializer for unprivileged users."""
|
||||
return (
|
||||
serializers.DocumentAccessSerializer
|
||||
if self.document.get_role(self.request.user) in choices.PRIVILEGED_ROLES
|
||||
else serializers.DocumentAccessLightSerializer
|
||||
)
|
||||
|
||||
def list(self, request, *args, **kwargs):
|
||||
"""Return accesses for the current document with filters and annotations."""
|
||||
user = self.request.user
|
||||
user = request.user
|
||||
|
||||
try:
|
||||
document = models.Document.objects.get(pk=self.kwargs["resource_id"])
|
||||
except models.Document.DoesNotExist:
|
||||
return drf.response.Response([])
|
||||
|
||||
role = document.get_role(user)
|
||||
if role is None:
|
||||
role = self.document.get_role(user)
|
||||
if not role:
|
||||
return drf.response.Response([])
|
||||
|
||||
ancestors = (
|
||||
(document.get_ancestors() | models.Document.objects.filter(pk=document.pk))
|
||||
.filter(ancestors_deleted_at__isnull=True)
|
||||
.order_by("path")
|
||||
)
|
||||
highest_readable = ancestors.readable_per_se(user).only("depth").first()
|
||||
self.document.get_ancestors()
|
||||
| models.Document.objects.filter(pk=self.document.pk)
|
||||
).filter(ancestors_deleted_at__isnull=True)
|
||||
|
||||
if highest_readable is None:
|
||||
return drf.response.Response([])
|
||||
queryset = self.get_queryset().filter(document__in=ancestors)
|
||||
|
||||
queryset = self.get_queryset()
|
||||
queryset = queryset.filter(
|
||||
document__in=ancestors.filter(depth__gte=highest_readable.depth)
|
||||
)
|
||||
|
||||
is_privileged = role in choices.PRIVILEGED_ROLES
|
||||
if is_privileged:
|
||||
serializer_class = serializers.DocumentAccessSerializer
|
||||
else:
|
||||
# Return only the document's privileged accesses
|
||||
if role not in choices.PRIVILEGED_ROLES:
|
||||
queryset = queryset.filter(role__in=choices.PRIVILEGED_ROLES)
|
||||
serializer_class = serializers.DocumentAccessLightSerializer
|
||||
|
||||
queryset = queryset.distinct()
|
||||
serializer = serializer_class(
|
||||
queryset, many=True, context=self.get_serializer_context()
|
||||
accesses = list(
|
||||
queryset.annotate(document_path=db.F("document__path")).order_by(
|
||||
"document_path"
|
||||
)
|
||||
)
|
||||
return drf.response.Response(serializer.data)
|
||||
|
||||
# Annotate more information on roles
|
||||
path_to_ancestors_roles = defaultdict(list)
|
||||
path_to_role = defaultdict(lambda: None)
|
||||
for access in accesses:
|
||||
if access.user_id == user.id or access.team in user.teams:
|
||||
parent_path = access.document_path[: -models.Document.steplen]
|
||||
if parent_path:
|
||||
path_to_ancestors_roles[access.document_path].extend(
|
||||
path_to_ancestors_roles[parent_path]
|
||||
)
|
||||
path_to_ancestors_roles[access.document_path].append(
|
||||
path_to_role[parent_path]
|
||||
)
|
||||
else:
|
||||
path_to_ancestors_roles[access.document_path] = []
|
||||
|
||||
path_to_role[access.document_path] = choices.RoleChoices.max(
|
||||
path_to_role[access.document_path], access.role
|
||||
)
|
||||
|
||||
# serialize and return the response
|
||||
context = self.get_serializer_context()
|
||||
serializer_class = self.get_serializer_class()
|
||||
serialized_data = []
|
||||
for access in accesses:
|
||||
access.set_user_roles_tuple(
|
||||
choices.RoleChoices.max(*path_to_ancestors_roles[access.document_path]),
|
||||
path_to_role.get(access.document_path),
|
||||
)
|
||||
serializer = serializer_class(access, context=context)
|
||||
serialized_data.append(serializer.data)
|
||||
|
||||
return drf.response.Response(serialized_data)
|
||||
|
||||
def perform_create(self, serializer):
|
||||
"""Add a new access to the document and send an email to the new added user."""
|
||||
|
||||
@@ -289,66 +289,6 @@ class BaseAccess(BaseModel):
|
||||
class Meta:
|
||||
abstract = True
|
||||
|
||||
def _get_role(self, resource, user):
|
||||
"""
|
||||
Get the role a user has on a resource.
|
||||
"""
|
||||
roles = []
|
||||
if user.is_authenticated:
|
||||
teams = user.teams
|
||||
try:
|
||||
roles = self.user_roles or []
|
||||
except AttributeError:
|
||||
try:
|
||||
roles = resource.accesses.filter(
|
||||
models.Q(user=user) | models.Q(team__in=teams),
|
||||
).values_list("role", flat=True)
|
||||
except (self._meta.model.DoesNotExist, IndexError):
|
||||
roles = []
|
||||
|
||||
return RoleChoices.max(*roles)
|
||||
|
||||
def _get_abilities(self, resource, user):
|
||||
"""
|
||||
Compute and return abilities for a given user taking into account
|
||||
the current state of the object.
|
||||
"""
|
||||
role = self._get_role(resource, user)
|
||||
is_owner_or_admin = role in (RoleChoices.OWNER, RoleChoices.ADMIN)
|
||||
|
||||
if self.role == RoleChoices.OWNER:
|
||||
can_delete = (role == RoleChoices.OWNER) and resource.accesses.filter(
|
||||
role=RoleChoices.OWNER
|
||||
).count() > 1
|
||||
set_role_to = (
|
||||
[RoleChoices.ADMIN, RoleChoices.EDITOR, RoleChoices.READER]
|
||||
if can_delete
|
||||
else []
|
||||
)
|
||||
else:
|
||||
can_delete = is_owner_or_admin
|
||||
set_role_to = []
|
||||
if role == RoleChoices.OWNER:
|
||||
set_role_to.append(RoleChoices.OWNER)
|
||||
if is_owner_or_admin:
|
||||
set_role_to.extend(
|
||||
[RoleChoices.ADMIN, RoleChoices.EDITOR, RoleChoices.READER]
|
||||
)
|
||||
|
||||
# Remove the current role as we don't want to propose it as an option
|
||||
try:
|
||||
set_role_to.remove(self.role)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
return {
|
||||
"destroy": can_delete,
|
||||
"update": bool(set_role_to),
|
||||
"partial_update": bool(set_role_to),
|
||||
"retrieve": bool(role),
|
||||
"set_role_to": set_role_to,
|
||||
}
|
||||
|
||||
|
||||
class DocumentQuerySet(MP_NodeQuerySet):
|
||||
"""
|
||||
@@ -1103,48 +1043,117 @@ class DocumentAccess(BaseAccess):
|
||||
super().save(*args, **kwargs)
|
||||
self.document.invalidate_nb_accesses_cache()
|
||||
|
||||
@property
|
||||
def target_key(self):
|
||||
"""Get a unique key for the actor targeted by the access, without possible conflict."""
|
||||
return f"user:{self.user_id!s}" if self.user_id else f"team:{self.team:s}"
|
||||
|
||||
def delete(self, *args, **kwargs):
|
||||
"""Override delete to clear the document's cache for number of accesses."""
|
||||
super().delete(*args, **kwargs)
|
||||
self.document.invalidate_nb_accesses_cache()
|
||||
|
||||
def set_user_roles_tuple(self, ancestors_role, current_role):
|
||||
"""
|
||||
Set a precomputed (ancestor_role, current_role) tuple for this instance.
|
||||
|
||||
This avoids querying the database in `get_roles_tuple()` and is useful
|
||||
when roles are already known, such as in bulk serialization.
|
||||
|
||||
Args:
|
||||
ancestor_role (str | None): Highest role on any ancestor document.
|
||||
current_role (str | None): Role on the current document.
|
||||
"""
|
||||
# pylint: disable=attribute-defined-outside-init
|
||||
self._prefetched_user_roles_tuple = (ancestors_role, current_role)
|
||||
|
||||
def get_user_roles_tuple(self, user):
|
||||
"""
|
||||
Return a tuple of:
|
||||
- the highest role the user has on any ancestor of the document
|
||||
- the role the user has on the current document
|
||||
|
||||
If roles have been explicitly set using `set_user_roles_tuple()`,
|
||||
those will be returned instead of querying the database.
|
||||
|
||||
This allows viewsets or serializers to precompute roles for performance
|
||||
when handling multiple documents at once.
|
||||
|
||||
Args:
|
||||
user (User): The user whose roles are being evaluated.
|
||||
|
||||
Returns:
|
||||
tuple[str | None, str | None]: (max_ancestor_role, current_document_role)
|
||||
"""
|
||||
if not user.is_authenticated:
|
||||
return None, None
|
||||
|
||||
try:
|
||||
return self._prefetched_user_roles_tuple
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
ancestors = (
|
||||
self.document.get_ancestors() | Document.objects.filter(pk=self.document_id)
|
||||
).filter(ancestors_deleted_at__isnull=True)
|
||||
|
||||
access_tuples = DocumentAccess.objects.filter(
|
||||
models.Q(user=user) | models.Q(team__in=user.teams),
|
||||
document__in=ancestors,
|
||||
).values_list("document_id", "role")
|
||||
|
||||
ancestors_roles = []
|
||||
current_roles = []
|
||||
for doc_id, role in access_tuples:
|
||||
if doc_id == self.document_id:
|
||||
current_roles.append(role)
|
||||
else:
|
||||
ancestors_roles.append(role)
|
||||
|
||||
return RoleChoices.max(*ancestors_roles), RoleChoices.max(*current_roles)
|
||||
|
||||
def get_abilities(self, user):
|
||||
"""
|
||||
Compute and return abilities for a given user on the document access.
|
||||
"""
|
||||
role = self._get_role(self.document, user)
|
||||
ancestors_role, current_role = self.get_user_roles_tuple(user)
|
||||
role = RoleChoices.max(ancestors_role, current_role)
|
||||
is_owner_or_admin = role in PRIVILEGED_ROLES
|
||||
|
||||
if self.role == RoleChoices.OWNER:
|
||||
can_delete = (
|
||||
role == RoleChoices.OWNER
|
||||
and self.document.accesses.filter(role=RoleChoices.OWNER).count() > 1
|
||||
)
|
||||
set_role_to = (
|
||||
[RoleChoices.ADMIN, RoleChoices.EDITOR, RoleChoices.READER]
|
||||
if can_delete
|
||||
else []
|
||||
and DocumentAccess.objects.filter(
|
||||
document_id=self.document_id, role=RoleChoices.OWNER
|
||||
).count()
|
||||
> 1
|
||||
)
|
||||
set_role_to = RoleChoices.values if can_delete else []
|
||||
else:
|
||||
can_delete = is_owner_or_admin
|
||||
set_role_to = []
|
||||
if role == RoleChoices.OWNER:
|
||||
set_role_to.append(RoleChoices.OWNER)
|
||||
if is_owner_or_admin:
|
||||
set_role_to.extend(
|
||||
[RoleChoices.ADMIN, RoleChoices.EDITOR, RoleChoices.READER]
|
||||
[RoleChoices.READER, RoleChoices.EDITOR, RoleChoices.ADMIN]
|
||||
)
|
||||
if role == RoleChoices.OWNER:
|
||||
set_role_to.append(RoleChoices.OWNER)
|
||||
|
||||
# Remove the current role as we don't want to propose it as an option
|
||||
try:
|
||||
set_role_to.remove(self.role)
|
||||
except ValueError:
|
||||
pass
|
||||
# Filter out roles that would be lower than the one the user already has
|
||||
ancestors_role_priority = RoleChoices.get_priority(ancestors_role)
|
||||
set_role_to = [
|
||||
candidate_role
|
||||
for candidate_role in set_role_to
|
||||
if RoleChoices.get_priority(candidate_role) >= ancestors_role_priority
|
||||
]
|
||||
if len(set_role_to) == 1:
|
||||
set_role_to = []
|
||||
|
||||
return {
|
||||
"destroy": can_delete,
|
||||
"update": bool(set_role_to) and is_owner_or_admin,
|
||||
"partial_update": bool(set_role_to) and is_owner_or_admin,
|
||||
"retrieve": self.user and self.user.id == user.id or is_owner_or_admin,
|
||||
"retrieve": (self.user and self.user.id == user.id) or is_owner_or_admin,
|
||||
"set_role_to": set_role_to,
|
||||
}
|
||||
|
||||
@@ -1352,11 +1361,65 @@ class TemplateAccess(BaseAccess):
|
||||
def __str__(self):
|
||||
return f"{self.user!s} is {self.role:s} in template {self.template!s}"
|
||||
|
||||
def get_role(self, user):
|
||||
"""
|
||||
Get the role a user has on a resource.
|
||||
"""
|
||||
if not user.is_authenticated:
|
||||
return None
|
||||
|
||||
try:
|
||||
roles = self.user_roles or []
|
||||
except AttributeError:
|
||||
teams = user.teams
|
||||
try:
|
||||
roles = self.template.accesses.filter(
|
||||
models.Q(user=user) | models.Q(team__in=teams),
|
||||
).values_list("role", flat=True)
|
||||
except (Template.DoesNotExist, IndexError):
|
||||
roles = []
|
||||
|
||||
return RoleChoices.max(*roles)
|
||||
|
||||
def get_abilities(self, user):
|
||||
"""
|
||||
Compute and return abilities for a given user on the template access.
|
||||
"""
|
||||
return self._get_abilities(self.template, user)
|
||||
role = self.get_role(user)
|
||||
is_owner_or_admin = role in PRIVILEGED_ROLES
|
||||
|
||||
if self.role == RoleChoices.OWNER:
|
||||
can_delete = (role == RoleChoices.OWNER) and self.template.accesses.filter(
|
||||
role=RoleChoices.OWNER
|
||||
).count() > 1
|
||||
set_role_to = (
|
||||
[RoleChoices.ADMIN, RoleChoices.EDITOR, RoleChoices.READER]
|
||||
if can_delete
|
||||
else []
|
||||
)
|
||||
else:
|
||||
can_delete = is_owner_or_admin
|
||||
set_role_to = []
|
||||
if role == RoleChoices.OWNER:
|
||||
set_role_to.append(RoleChoices.OWNER)
|
||||
if is_owner_or_admin:
|
||||
set_role_to.extend(
|
||||
[RoleChoices.ADMIN, RoleChoices.EDITOR, RoleChoices.READER]
|
||||
)
|
||||
|
||||
# Remove the current role as we don't want to propose it as an option
|
||||
try:
|
||||
set_role_to.remove(self.role)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
return {
|
||||
"destroy": can_delete,
|
||||
"update": bool(set_role_to),
|
||||
"partial_update": bool(set_role_to),
|
||||
"retrieve": bool(role),
|
||||
"set_role_to": set_role_to,
|
||||
}
|
||||
|
||||
|
||||
class Invitation(BaseModel):
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
"""
|
||||
Test document accesses API endpoints for users in impress's core app.
|
||||
"""
|
||||
# pylint: disable=too-many-lines
|
||||
|
||||
import random
|
||||
from uuid import uuid4
|
||||
@@ -64,8 +65,8 @@ def test_api_document_accesses_list_unexisting_document():
|
||||
client.force_login(user)
|
||||
|
||||
response = client.get(f"/api/v1.0/documents/{uuid4()!s}/accesses/")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == []
|
||||
assert response.status_code == 404
|
||||
assert response.json() == {"detail": "Not found."}
|
||||
|
||||
|
||||
@pytest.mark.parametrize("via", VIA)
|
||||
@@ -74,7 +75,7 @@ def test_api_document_accesses_list_unexisting_document():
|
||||
[role for role in choices.RoleChoices if role not in choices.PRIVILEGED_ROLES],
|
||||
)
|
||||
def test_api_document_accesses_list_authenticated_related_non_privileged(
|
||||
via, role, mock_user_teams
|
||||
via, role, mock_user_teams, django_assert_num_queries
|
||||
):
|
||||
"""
|
||||
Authenticated users with no privileged role should only be able to list document
|
||||
@@ -95,10 +96,13 @@ def test_api_document_accesses_list_authenticated_related_non_privileged(
|
||||
child = factories.DocumentFactory(parent=document)
|
||||
|
||||
# Create accesses related to each document
|
||||
factories.UserDocumentAccessFactory(document=unreadable_ancestor)
|
||||
grand_parent_access = factories.UserDocumentAccessFactory(document=grand_parent)
|
||||
parent_access = factories.UserDocumentAccessFactory(document=parent)
|
||||
document_access = factories.UserDocumentAccessFactory(document=document)
|
||||
accesses = (
|
||||
factories.UserDocumentAccessFactory(document=unreadable_ancestor),
|
||||
factories.UserDocumentAccessFactory(document=grand_parent),
|
||||
factories.UserDocumentAccessFactory(document=parent),
|
||||
factories.UserDocumentAccessFactory(document=document),
|
||||
factories.TeamDocumentAccessFactory(document=document),
|
||||
)
|
||||
factories.UserDocumentAccessFactory(document=child)
|
||||
|
||||
if via == USER:
|
||||
@@ -115,22 +119,17 @@ def test_api_document_accesses_list_authenticated_related_non_privileged(
|
||||
role=role,
|
||||
)
|
||||
|
||||
access1 = factories.TeamDocumentAccessFactory(document=document)
|
||||
access2 = factories.UserDocumentAccessFactory(document=document)
|
||||
|
||||
# Accesses for other documents to which the user is related should not be listed either
|
||||
other_access = factories.UserDocumentAccessFactory(user=user)
|
||||
factories.UserDocumentAccessFactory(document=other_access.document)
|
||||
|
||||
response = client.get(
|
||||
f"/api/v1.0/documents/{document.id!s}/accesses/",
|
||||
)
|
||||
with django_assert_num_queries(3):
|
||||
response = client.get(f"/api/v1.0/documents/{document.id!s}/accesses/")
|
||||
|
||||
assert response.status_code == 200
|
||||
content = response.json()
|
||||
|
||||
# Make sure only privileged roles are returned
|
||||
accesses = [grand_parent_access, parent_access, document_access, access1, access2]
|
||||
privileged_accesses = [
|
||||
acc for acc in accesses if acc.role in choices.PRIVILEGED_ROLES
|
||||
]
|
||||
@@ -140,9 +139,8 @@ def test_api_document_accesses_list_authenticated_related_non_privileged(
|
||||
[
|
||||
{
|
||||
"id": str(access.id),
|
||||
"document_id": str(access.document_id),
|
||||
"user": {
|
||||
"id": None,
|
||||
"email": None,
|
||||
"full_name": access.user.full_name,
|
||||
"short_name": access.user.short_name,
|
||||
}
|
||||
@@ -150,7 +148,13 @@ def test_api_document_accesses_list_authenticated_related_non_privileged(
|
||||
else None,
|
||||
"team": access.team,
|
||||
"role": access.role,
|
||||
"abilities": access.get_abilities(user),
|
||||
"abilities": {
|
||||
"destroy": False,
|
||||
"partial_update": False,
|
||||
"retrieve": False,
|
||||
"set_role_to": [],
|
||||
"update": False,
|
||||
},
|
||||
}
|
||||
for access in privileged_accesses
|
||||
],
|
||||
@@ -163,7 +167,7 @@ def test_api_document_accesses_list_authenticated_related_non_privileged(
|
||||
"role", [role for role in choices.RoleChoices if role in choices.PRIVILEGED_ROLES]
|
||||
)
|
||||
def test_api_document_accesses_list_authenticated_related_privileged(
|
||||
via, role, mock_user_teams
|
||||
via, role, mock_user_teams, django_assert_num_queries
|
||||
):
|
||||
"""
|
||||
Authenticated users with a privileged role should be able to list all
|
||||
@@ -183,13 +187,6 @@ def test_api_document_accesses_list_authenticated_related_privileged(
|
||||
document = factories.DocumentFactory(parent=parent)
|
||||
child = factories.DocumentFactory(parent=document)
|
||||
|
||||
# Create accesses related to each document
|
||||
factories.UserDocumentAccessFactory(document=unreadable_ancestor)
|
||||
grand_parent_access = factories.UserDocumentAccessFactory(document=grand_parent)
|
||||
parent_access = factories.UserDocumentAccessFactory(document=parent)
|
||||
document_access = factories.UserDocumentAccessFactory(document=document)
|
||||
factories.UserDocumentAccessFactory(document=child)
|
||||
|
||||
if via == USER:
|
||||
user_access = models.DocumentAccess.objects.create(
|
||||
document=document,
|
||||
@@ -206,31 +203,37 @@ def test_api_document_accesses_list_authenticated_related_privileged(
|
||||
else:
|
||||
raise RuntimeError()
|
||||
|
||||
access1 = factories.TeamDocumentAccessFactory(document=document)
|
||||
access2 = factories.UserDocumentAccessFactory(document=document)
|
||||
# Create accesses related to each document
|
||||
ancestors_accesses = [
|
||||
# Access on unreadable ancestor should still be listed
|
||||
# as the related user gains access to our document
|
||||
factories.UserDocumentAccessFactory(document=unreadable_ancestor),
|
||||
factories.UserDocumentAccessFactory(document=grand_parent),
|
||||
factories.UserDocumentAccessFactory(document=parent),
|
||||
]
|
||||
document_accesses = [
|
||||
factories.UserDocumentAccessFactory(document=document),
|
||||
factories.TeamDocumentAccessFactory(document=document),
|
||||
factories.UserDocumentAccessFactory(document=document),
|
||||
user_access,
|
||||
]
|
||||
factories.UserDocumentAccessFactory(document=child)
|
||||
|
||||
# Accesses for other documents to which the user is related should not be listed either
|
||||
other_access = factories.UserDocumentAccessFactory(user=user)
|
||||
factories.UserDocumentAccessFactory(document=other_access.document)
|
||||
|
||||
response = client.get(
|
||||
f"/api/v1.0/documents/{document.id!s}/accesses/",
|
||||
)
|
||||
nb_queries = 3
|
||||
if role == "owner":
|
||||
# Queries that secure the owner status
|
||||
nb_queries += sum(acc.role == "owner" for acc in document_accesses)
|
||||
|
||||
with django_assert_num_queries(nb_queries):
|
||||
response = client.get(f"/api/v1.0/documents/{document.id!s}/accesses/")
|
||||
|
||||
assert response.status_code == 200
|
||||
content = response.json()
|
||||
|
||||
# Make sure all expected accesses are returned
|
||||
accesses = [
|
||||
user_access,
|
||||
grand_parent_access,
|
||||
parent_access,
|
||||
document_access,
|
||||
access1,
|
||||
access2,
|
||||
]
|
||||
assert len(content) == 6
|
||||
|
||||
assert len(content) == 7
|
||||
assert sorted(content, key=lambda x: x["id"]) == sorted(
|
||||
[
|
||||
{
|
||||
@@ -249,7 +252,7 @@ def test_api_document_accesses_list_authenticated_related_privileged(
|
||||
"role": access.role,
|
||||
"abilities": access.get_abilities(user),
|
||||
}
|
||||
for access in accesses
|
||||
for access in ancestors_accesses + document_accesses
|
||||
],
|
||||
key=lambda x: x["id"],
|
||||
)
|
||||
@@ -310,7 +313,9 @@ def test_api_document_accesses_retrieve_authenticated_unrelated():
|
||||
@pytest.mark.parametrize("via", VIA)
|
||||
@pytest.mark.parametrize("role", models.RoleChoices)
|
||||
def test_api_document_accesses_retrieve_authenticated_related(
|
||||
via, role, mock_user_teams
|
||||
via,
|
||||
role,
|
||||
mock_user_teams,
|
||||
):
|
||||
"""
|
||||
A user who is related to a document should be allowed to retrieve the
|
||||
@@ -491,26 +496,21 @@ def test_api_document_accesses_update_administrator_except_owner(
|
||||
|
||||
for field, value in new_values.items():
|
||||
new_data = {**old_values, field: value}
|
||||
if new_data["role"] == old_values["role"]:
|
||||
with mock_reset_connections(document.id, str(access.user_id)):
|
||||
response = client.put(
|
||||
f"/api/v1.0/documents/{document.id!s}/accesses/{access.id!s}/",
|
||||
data=new_data,
|
||||
format="json",
|
||||
)
|
||||
assert response.status_code == 403
|
||||
else:
|
||||
with mock_reset_connections(document.id, str(access.user_id)):
|
||||
response = client.put(
|
||||
f"/api/v1.0/documents/{document.id!s}/accesses/{access.id!s}/",
|
||||
data=new_data,
|
||||
format="json",
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.status_code == 200
|
||||
|
||||
access.refresh_from_db()
|
||||
updated_values = serializers.DocumentAccessSerializer(instance=access).data
|
||||
if field == "role":
|
||||
assert updated_values == {**old_values, "role": new_values["role"]}
|
||||
assert updated_values == {
|
||||
**old_values,
|
||||
"role": new_values["role"],
|
||||
}
|
||||
else:
|
||||
assert updated_values == old_values
|
||||
|
||||
@@ -605,7 +605,7 @@ def test_api_document_accesses_update_administrator_to_owner(
|
||||
for field, value in new_values.items():
|
||||
new_data = {**old_values, field: value}
|
||||
# We are not allowed or not really updating the role
|
||||
if field == "role" or new_data["role"] == old_values["role"]:
|
||||
if field == "role":
|
||||
response = client.put(
|
||||
f"/api/v1.0/documents/{document.id!s}/accesses/{access.id!s}/",
|
||||
data=new_data,
|
||||
@@ -665,30 +665,23 @@ def test_api_document_accesses_update_owner(
|
||||
|
||||
for field, value in new_values.items():
|
||||
new_data = {**old_values, field: value}
|
||||
if (
|
||||
new_data["role"] == old_values["role"]
|
||||
): # we are not really updating the role
|
||||
with mock_reset_connections(document.id, str(access.user_id)):
|
||||
response = client.put(
|
||||
f"/api/v1.0/documents/{document.id!s}/accesses/{access.id!s}/",
|
||||
data=new_data,
|
||||
format="json",
|
||||
)
|
||||
assert response.status_code == 403
|
||||
else:
|
||||
with mock_reset_connections(document.id, str(access.user_id)):
|
||||
response = client.put(
|
||||
f"/api/v1.0/documents/{document.id!s}/accesses/{access.id!s}/",
|
||||
data=new_data,
|
||||
format="json",
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.status_code == 200
|
||||
|
||||
access.refresh_from_db()
|
||||
updated_values = serializers.DocumentAccessSerializer(instance=access).data
|
||||
|
||||
if field == "role":
|
||||
assert updated_values == {**old_values, "role": new_values["role"]}
|
||||
assert updated_values == {
|
||||
**old_values,
|
||||
"role": new_values["role"],
|
||||
}
|
||||
else:
|
||||
assert updated_values == old_values
|
||||
|
||||
|
||||
@@ -123,7 +123,7 @@ def test_models_document_access_get_abilities_for_owner_of_self_allowed():
|
||||
"retrieve": True,
|
||||
"update": True,
|
||||
"partial_update": True,
|
||||
"set_role_to": ["administrator", "editor", "reader"],
|
||||
"set_role_to": ["reader", "editor", "administrator", "owner"],
|
||||
}
|
||||
|
||||
|
||||
@@ -155,7 +155,7 @@ def test_models_document_access_get_abilities_for_owner_of_owner():
|
||||
"retrieve": True,
|
||||
"update": True,
|
||||
"partial_update": True,
|
||||
"set_role_to": ["administrator", "editor", "reader"],
|
||||
"set_role_to": ["reader", "editor", "administrator", "owner"],
|
||||
}
|
||||
|
||||
|
||||
@@ -172,7 +172,7 @@ def test_models_document_access_get_abilities_for_owner_of_administrator():
|
||||
"retrieve": True,
|
||||
"update": True,
|
||||
"partial_update": True,
|
||||
"set_role_to": ["owner", "editor", "reader"],
|
||||
"set_role_to": ["reader", "editor", "administrator", "owner"],
|
||||
}
|
||||
|
||||
|
||||
@@ -189,7 +189,7 @@ def test_models_document_access_get_abilities_for_owner_of_editor():
|
||||
"retrieve": True,
|
||||
"update": True,
|
||||
"partial_update": True,
|
||||
"set_role_to": ["owner", "administrator", "reader"],
|
||||
"set_role_to": ["reader", "editor", "administrator", "owner"],
|
||||
}
|
||||
|
||||
|
||||
@@ -206,7 +206,7 @@ def test_models_document_access_get_abilities_for_owner_of_reader():
|
||||
"retrieve": True,
|
||||
"update": True,
|
||||
"partial_update": True,
|
||||
"set_role_to": ["owner", "administrator", "editor"],
|
||||
"set_role_to": ["reader", "editor", "administrator", "owner"],
|
||||
}
|
||||
|
||||
|
||||
@@ -243,7 +243,7 @@ def test_models_document_access_get_abilities_for_administrator_of_administrator
|
||||
"retrieve": True,
|
||||
"update": True,
|
||||
"partial_update": True,
|
||||
"set_role_to": ["editor", "reader"],
|
||||
"set_role_to": ["reader", "editor", "administrator"],
|
||||
}
|
||||
|
||||
|
||||
@@ -260,7 +260,7 @@ def test_models_document_access_get_abilities_for_administrator_of_editor():
|
||||
"retrieve": True,
|
||||
"update": True,
|
||||
"partial_update": True,
|
||||
"set_role_to": ["administrator", "reader"],
|
||||
"set_role_to": ["reader", "editor", "administrator"],
|
||||
}
|
||||
|
||||
|
||||
@@ -277,7 +277,7 @@ def test_models_document_access_get_abilities_for_administrator_of_reader():
|
||||
"retrieve": True,
|
||||
"update": True,
|
||||
"partial_update": True,
|
||||
"set_role_to": ["administrator", "editor"],
|
||||
"set_role_to": ["reader", "editor", "administrator"],
|
||||
}
|
||||
|
||||
|
||||
@@ -400,12 +400,12 @@ def test_models_document_access_get_abilities_for_reader_of_reader_user(
|
||||
|
||||
|
||||
def test_models_document_access_get_abilities_preset_role(django_assert_num_queries):
|
||||
"""No query is done if the role is preset, e.g., with a query annotation."""
|
||||
"""No query is done if user roles are preset on the document, e.g., with a query annotation."""
|
||||
access = factories.UserDocumentAccessFactory(role="reader")
|
||||
user = factories.UserDocumentAccessFactory(
|
||||
document=access.document, role="reader"
|
||||
).user
|
||||
access.user_roles = ["reader"]
|
||||
access.set_user_roles_tuple(None, "reader")
|
||||
|
||||
with django_assert_num_queries(0):
|
||||
abilities = access.get_abilities(user)
|
||||
|
||||
Reference in New Issue
Block a user