(backend) retrieve & update a document taking into account ancestors

A document should inherit the access rights a user has on any of its
ancestors.
This commit is contained in:
Samuel Paccoud - DINUM
2024-12-17 07:47:23 +01:00
committed by Anthony LC
parent 48662ceecb
commit d073a9c9b3
7 changed files with 544 additions and 124 deletions

View File

@@ -152,8 +152,8 @@ class ListDocumentSerializer(BaseResourceSerializer):
model = models.Document model = models.Document
fields = [ fields = [
"id", "id",
"abilities",
"content", "content",
"abilities",
"created_at", "created_at",
"creator", "creator",
"is_favorite", "is_favorite",

View File

@@ -13,9 +13,9 @@ from django.core.exceptions import ValidationError
from django.core.files.storage import default_storage from django.core.files.storage import default_storage
from django.db import models as db from django.db import models as db
from django.db.models import ( from django.db.models import (
Count,
Exists, Exists,
F, F,
Func,
OuterRef, OuterRef,
Q, Q,
Subquery, Subquery,
@@ -353,13 +353,22 @@ class DocumentViewSet(
return serializers.ListDocumentSerializer return serializers.ListDocumentSerializer
return self.serializer_class return self.serializer_class
def get_queryset(self): def annotate_queryset(self, queryset):
"""Optimize queryset to include favorite status for the current user.""" """Annotate document queryset with favorite and number of accesses."""
queryset = super().get_queryset()
user = self.request.user user = self.request.user
# Annotate the number of accesses associated with each document # Annotate the number of accesses taking into account ancestors
queryset = queryset.annotate(nb_accesses=Count("accesses", distinct=True)) ancestor_accesses_query = (
models.DocumentAccess.objects.filter(
document__path=Left(OuterRef("path"), Length("document__path")),
)
.order_by()
.annotate(total_accesses=Func(Value("id"), function="COUNT"))
.values("total_accesses")
)
# Annotate with the number of accesses, default to 0 if no accesses exist
queryset = queryset.annotate(nb_accesses=Subquery(ancestor_accesses_query))
if not user.is_authenticated: if not user.is_authenticated:
# If the user is not authenticated, annotate `is_favorite` as False # If the user is not authenticated, annotate `is_favorite` as False
@@ -369,19 +378,13 @@ class DocumentViewSet(
favorite_exists = models.DocumentFavorite.objects.filter( favorite_exists = models.DocumentFavorite.objects.filter(
document_id=OuterRef("pk"), user=user document_id=OuterRef("pk"), user=user
) )
queryset = queryset.annotate(is_favorite=Exists(favorite_exists)) return queryset.annotate(is_favorite=Exists(favorite_exists))
# Annotate the queryset with the logged-in user roles def get_queryset(self):
user_roles_query = ( """Optimize queryset to include favorite status for the current user."""
models.DocumentAccess.objects.filter( queryset = super().get_queryset()
Q(user=user) | Q(team__in=user.teams), queryset = self.annotate_queryset(queryset)
document_id=OuterRef("pk"), return queryset.distinct()
)
.values("document")
.annotate(roles_array=ArrayAgg("role"))
.values("roles_array")
)
return queryset.annotate(user_roles=Subquery(user_roles_query)).distinct()
def list(self, request, *args, **kwargs): def list(self, request, *args, **kwargs):
"""Restrict resources returned by the list endpoint""" """Restrict resources returned by the list endpoint"""
@@ -504,8 +507,9 @@ class DocumentViewSet(
# Users should not see version history dating from before they gained access to the # Users should not see version history dating from before they gained access to the
# document. Filter to get the minimum access date for the logged-in user # document. Filter to get the minimum access date for the logged-in user
access_queryset = document.accesses.filter( access_queryset = models.DocumentAccess.objects.filter(
db.Q(user=user) | db.Q(team__in=user.teams) db.Q(user=user) | db.Q(team__in=user.teams),
document__path=Left(Value(document.path), Length("document__path")),
).aggregate(min_date=db.Min("created_at")) ).aggregate(min_date=db.Min("created_at"))
# Handle the case where the user has no accesses # Handle the case where the user has no accesses
@@ -543,10 +547,12 @@ class DocumentViewSet(
user = request.user user = request.user
min_datetime = min( min_datetime = min(
access.created_at access.created_at
for access in document.accesses.filter( for access in models.DocumentAccess.objects.filter(
db.Q(user=user) | db.Q(team__in=user.teams), db.Q(user=user) | db.Q(team__in=user.teams),
document__path=Left(Value(document.path), Length("document__path")),
) )
) )
if response["LastModified"] < min_datetime: if response["LastModified"] < min_datetime:
raise Http404 raise Http404

View File

@@ -18,6 +18,10 @@ from django.core.files.base import ContentFile
from django.core.files.storage import default_storage from django.core.files.storage import default_storage
from django.core.mail import send_mail from django.core.mail import send_mail
from django.db import models from django.db import models
from django.db.models.functions import Left, Length
from django.http import FileResponse
from django.template.base import Template as DjangoTemplate
from django.template.context import Context
from django.template.loader import render_to_string from django.template.loader import render_to_string
from django.utils import timezone from django.utils import timezone
from django.utils.functional import cached_property, lazy from django.utils.functional import cached_property, lazy
@@ -31,23 +35,6 @@ from treebeard.mp_tree import MP_Node
logger = getLogger(__name__) logger = getLogger(__name__)
def get_resource_roles(resource, user):
"""Compute the roles a user has on a resource."""
if not user.is_authenticated:
return []
try:
roles = resource.user_roles or []
except AttributeError:
try:
roles = resource.accesses.filter(
models.Q(user=user) | models.Q(team__in=user.teams),
).values_list("role", flat=True)
except (models.ObjectDoesNotExist, IndexError):
roles = []
return roles
class LinkRoleChoices(models.TextChoices): class LinkRoleChoices(models.TextChoices):
"""Defines the possible roles a link can offer on a document.""" """Defines the possible roles a link can offer on a document."""
@@ -539,11 +526,30 @@ class Document(MP_Node, BaseModel):
Bucket=default_storage.bucket_name, Key=self.file_key, VersionId=version_id Bucket=default_storage.bucket_name, Key=self.file_key, VersionId=version_id
) )
def get_roles(self, user):
"""Return the roles a user has on a document."""
if not user.is_authenticated:
return []
try:
roles = self.user_roles or []
except AttributeError:
try:
roles = DocumentAccess.objects.filter(
models.Q(user=user) | models.Q(team__in=user.teams),
document__path=Left(
models.Value(self.path), Length("document__path")
),
).values_list("role", flat=True)
except (models.ObjectDoesNotExist, IndexError):
roles = []
return roles
def get_abilities(self, user): def get_abilities(self, user):
""" """
Compute and return abilities for a given user on the document. Compute and return abilities for a given user on the document.
""" """
roles = set(get_resource_roles(self, user)) roles = set(self.get_roles(user))
# Compute version roles before adding link roles because we don't # Compute version roles before adding link roles because we don't
# want anonymous users to access versions (we wouldn't know from # want anonymous users to access versions (we wouldn't know from
@@ -551,11 +557,20 @@ class Document(MP_Node, BaseModel):
# Anonymous users should also not see document accesses # Anonymous users should also not see document accesses
has_role = bool(roles) has_role = bool(roles)
# Add role provided by the document link # Add roles provided by the document link, taking into account its ancestors
if self.link_reach == LinkReachChoices.PUBLIC or ( link_reaches = list(self.get_ancestors().values("link_reach", "link_role"))
self.link_reach == LinkReachChoices.AUTHENTICATED and user.is_authenticated link_reaches.append(
): {"link_reach": self.link_reach, "link_role": self.link_role}
roles.add(self.link_role) )
for lr in link_reaches:
if lr["link_reach"] == LinkReachChoices.PUBLIC:
roles.add(lr["link_role"])
if user.is_authenticated:
for lr in link_reaches:
if lr["link_reach"] == LinkReachChoices.AUTHENTICATED:
roles.add(lr["link_role"])
is_owner_or_admin = bool( is_owner_or_admin = bool(
roles.intersection({RoleChoices.OWNER, RoleChoices.ADMIN}) roles.intersection({RoleChoices.OWNER, RoleChoices.ADMIN})
@@ -772,11 +787,27 @@ class Template(BaseModel):
def __str__(self): def __str__(self):
return self.title return self.title
def get_roles(self, user):
"""Return the roles a user has on a resource."""
if not user.is_authenticated:
return []
try:
roles = self.user_roles or []
except AttributeError:
try:
roles = self.accesses.filter(
models.Q(user=user) | models.Q(team__in=user.teams),
).values_list("role", flat=True)
except (models.ObjectDoesNotExist, IndexError):
roles = []
return roles
def get_abilities(self, user): def get_abilities(self, user):
""" """
Compute and return abilities for a given user on the template. Compute and return abilities for a given user on the template.
""" """
roles = get_resource_roles(self, user) roles = self.get_roles(user)
is_owner_or_admin = bool( is_owner_or_admin = bool(
set(roles).intersection({RoleChoices.OWNER, RoleChoices.ADMIN}) set(roles).intersection({RoleChoices.OWNER, RoleChoices.ADMIN})
) )

View File

@@ -185,6 +185,82 @@ def test_api_document_versions_list_authenticated_related_pagination(
assert content["versions"][0]["version_id"] == all_version_ids[2] assert content["versions"][0]["version_id"] == all_version_ids[2]
@pytest.mark.parametrize("via", VIA)
def test_api_document_versions_list_authenticated_related_pagination_parent(
via, mock_user_teams
):
"""
When a user gains access to a document's versions via an ancestor, the date of access
to the parent should be used to filter versions that were created prior to the
user gaining access to the document.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
grand_parent = factories.DocumentFactory()
parent = factories.DocumentFactory(parent=grand_parent)
document = factories.DocumentFactory(parent=parent)
for i in range(3):
document.content = f"before {i:d}"
document.save()
if via == USER:
models.DocumentAccess.objects.create(
document=grand_parent,
user=user,
role=random.choice(models.RoleChoices.choices)[0],
)
elif via == TEAM:
mock_user_teams.return_value = ["lasuite", "unknown"]
models.DocumentAccess.objects.create(
document=grand_parent,
team="lasuite",
role=random.choice(models.RoleChoices.choices)[0],
)
for i in range(4):
document.content = f"after {i:d}"
document.save()
response = client.get(
f"/api/v1.0/documents/{document.id!s}/versions/",
)
content = response.json()
assert content["is_truncated"] is False
# The current version is not listed
assert content["count"] == 3
assert content["next_version_id_marker"] == ""
all_version_ids = [version["version_id"] for version in content["versions"]]
# - set page size
response = client.get(
f"/api/v1.0/documents/{document.id!s}/versions/?page_size=2",
)
content = response.json()
assert content["count"] == 2
assert content["is_truncated"] is True
marker = content["next_version_id_marker"]
assert marker == all_version_ids[1]
assert [
version["version_id"] for version in content["versions"]
] == all_version_ids[:2]
# - get page 2
response = client.get(
f"/api/v1.0/documents/{document.id!s}/versions/?page_size=2&version_id={marker:s}",
)
content = response.json()
assert content["count"] == 1
assert content["is_truncated"] is False
assert content["next_version_id_marker"] == ""
assert content["versions"][0]["version_id"] == all_version_ids[2]
def test_api_document_versions_list_exceeds_max_page_size(): def test_api_document_versions_list_exceeds_max_page_size():
"""Page size should not exceed the limit set on the serializer""" """Page size should not exceed the limit set on the serializer"""
user = factories.UserFactory() user = factories.UserFactory()
@@ -314,6 +390,74 @@ def test_api_document_versions_retrieve_authenticated_related(via, mock_user_tea
assert response.json()["content"] == "new content 1" assert response.json()["content"] == "new content 1"
@pytest.mark.parametrize("via", VIA)
def test_api_document_versions_retrieve_authenticated_related_parent(
via, mock_user_teams
):
"""
A user who gains access to a document's versions via one of its ancestors, should be able to
retrieve the document versions. The date of access to the parent should be used to filter
versions that were created prior to the user gaining access to the document.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
grand_parent = factories.DocumentFactory()
parent = factories.DocumentFactory(parent=grand_parent)
document = factories.DocumentFactory(parent=parent)
document.content = "new content"
document.save()
assert len(document.get_versions_slice()["versions"]) == 1
version_id = document.get_versions_slice()["versions"][0]["version_id"]
if via == USER:
factories.UserDocumentAccessFactory(document=grand_parent, user=user)
elif via == TEAM:
mock_user_teams.return_value = ["lasuite", "unknown"]
factories.TeamDocumentAccessFactory(document=grand_parent, team="lasuite")
time.sleep(1) # minio stores datetimes with the precision of a second
# Versions created before the document was shared should not be seen by the user
response = client.get(
f"/api/v1.0/documents/{document.id!s}/versions/{version_id:s}/",
)
assert response.status_code == 404
# Create a new version should not make it available to the user because
# only the current version is available to the user but it is excluded
# from the list
document.content = "new content 1"
document.save()
assert len(document.get_versions_slice()["versions"]) == 2
version_id = document.get_versions_slice()["versions"][0]["version_id"]
response = client.get(
f"/api/v1.0/documents/{document.id!s}/versions/{version_id:s}/",
)
assert response.status_code == 404
# Adding one more version should make the previous version available to the user
document.content = "new content 2"
document.save()
assert len(document.get_versions_slice()["versions"]) == 3
version_id = document.get_versions_slice()["versions"][0]["version_id"]
response = client.get(
f"/api/v1.0/documents/{document.id!s}/versions/{version_id:s}/",
)
assert response.status_code == 200
assert response.json()["content"] == "new content 1"
def test_api_document_versions_create_anonymous(): def test_api_document_versions_create_anonymous():
"""Anonymous users should not be allowed to create document versions.""" """Anonymous users should not be allowed to create document versions."""
document = factories.DocumentFactory() document = factories.DocumentFactory()

View File

@@ -59,8 +59,8 @@ def test_api_documents_list_format():
assert len(results) == 1 assert len(results) == 1
assert results[0] == { assert results[0] == {
"id": str(document.id), "id": str(document.id),
"abilities": document.get_abilities(user),
"content": document.content, "content": document.content,
"abilities": document.get_abilities(user),
"created_at": document.created_at.isoformat().replace("+00:00", "Z"), "created_at": document.created_at.isoformat().replace("+00:00", "Z"),
"creator": str(document.creator.id), "creator": str(document.creator.id),
"is_favorite": True, "is_favorite": True,
@@ -79,7 +79,6 @@ def test_api_documents_list_authenticated_direct(django_assert_num_queries):
than restricted. than restricted.
""" """
user = factories.UserFactory() user = factories.UserFactory()
client = APIClient() client = APIClient()
client.force_login(user) client.force_login(user)
@@ -110,7 +109,7 @@ def test_api_documents_list_authenticated_direct(django_assert_num_queries):
expected_ids = {str(document1.id), str(document2.id), str(child3_with_access.id)} expected_ids = {str(document1.id), str(document2.id), str(child3_with_access.id)}
with django_assert_num_queries(3): with django_assert_num_queries(7):
response = client.get("/api/v1.0/documents/") response = client.get("/api/v1.0/documents/")
assert response.status_code == 200 assert response.status_code == 200
@@ -145,7 +144,7 @@ def test_api_documents_list_authenticated_via_team(
expected_ids = {str(document.id) for document in documents_team1 + documents_team2} expected_ids = {str(document.id) for document in documents_team1 + documents_team2}
with django_assert_num_queries(3): with django_assert_num_queries(8):
response = client.get("/api/v1.0/documents/") response = client.get("/api/v1.0/documents/")
assert response.status_code == 200 assert response.status_code == 200
@@ -174,7 +173,7 @@ def test_api_documents_list_authenticated_link_reach_restricted(
other_document = factories.DocumentFactory(link_reach="public") other_document = factories.DocumentFactory(link_reach="public")
models.LinkTrace.objects.create(document=other_document, user=user) models.LinkTrace.objects.create(document=other_document, user=user)
with django_assert_num_queries(3): with django_assert_num_queries(4):
response = client.get( response = client.get(
"/api/v1.0/documents/", "/api/v1.0/documents/",
) )
@@ -221,7 +220,7 @@ def test_api_documents_list_authenticated_link_reach_public_or_authenticated(
expected_ids = {str(document1.id), str(document2.id), str(visible_child.id)} expected_ids = {str(document1.id), str(document2.id), str(visible_child.id)}
with django_assert_num_queries(3): with django_assert_num_queries(7):
response = client.get( response = client.get(
"/api/v1.0/documents/", "/api/v1.0/documents/",
) )
@@ -315,7 +314,7 @@ def test_api_documents_list_favorites_no_extra_queries(django_assert_num_queries
factories.DocumentFactory.create_batch(2, users=[user]) factories.DocumentFactory.create_batch(2, users=[user])
url = "/api/v1.0/documents/" url = "/api/v1.0/documents/"
with django_assert_num_queries(3): with django_assert_num_queries(8):
response = client.get(url) response = client.get(url)
assert response.status_code == 200 assert response.status_code == 200
@@ -328,7 +327,7 @@ def test_api_documents_list_favorites_no_extra_queries(django_assert_num_queries
for document in special_documents: for document in special_documents:
models.DocumentFavorite.objects.create(document=document, user=user) models.DocumentFavorite.objects.create(document=document, user=user)
with django_assert_num_queries(3): with django_assert_num_queries(8):
response = client.get(url) response = client.get(url)
assert response.status_code == 200 assert response.status_code == 200

View File

@@ -2,16 +2,17 @@
Tests for Documents API endpoint in impress's core app: retrieve Tests for Documents API endpoint in impress's core app: retrieve
""" """
import random
import pytest import pytest
from rest_framework.test import APIClient from rest_framework.test import APIClient
from core import factories, models from core import factories, models
from core.api import serializers
pytestmark = pytest.mark.django_db pytestmark = pytest.mark.django_db
def test_api_documents_retrieve_anonymous_public(): def test_api_documents_retrieve_anonymous_public_standalone():
"""Anonymous users should be allowed to retrieve public documents.""" """Anonymous users should be allowed to retrieve public documents."""
document = factories.DocumentFactory(link_reach="public") document = factories.DocumentFactory(link_reach="public")
@@ -52,6 +53,70 @@ def test_api_documents_retrieve_anonymous_public():
} }
def test_api_documents_retrieve_anonymous_public_parent():
"""Anonymous users should be allowed to retrieve a document who has a public ancestor."""
grand_parent = factories.DocumentFactory(link_reach="public")
parent = factories.DocumentFactory(
parent=grand_parent, link_reach=random.choice(["authenticated", "restricted"])
)
document = factories.DocumentFactory(
link_reach=random.choice(["authenticated", "restricted"]), parent=parent
)
response = APIClient().get(f"/api/v1.0/documents/{document.id!s}/")
assert response.status_code == 200
assert response.json() == {
"id": str(document.id),
"abilities": {
"accesses_manage": False,
"accesses_view": False,
"ai_transform": grand_parent.link_role == "editor",
"ai_translate": grand_parent.link_role == "editor",
"attachment_upload": grand_parent.link_role == "editor",
"collaboration_auth": True,
"destroy": False,
# Anonymous user can't favorite a document even with read access
"favorite": False,
"invite_owner": False,
"link_configuration": False,
"media_auth": True,
"partial_update": grand_parent.link_role == "editor",
"retrieve": True,
"update": grand_parent.link_role == "editor",
"versions_destroy": False,
"versions_list": False,
"versions_retrieve": False,
},
"content": document.content,
"created_at": document.created_at.isoformat().replace("+00:00", "Z"),
"creator": str(document.creator.id),
"is_favorite": False,
"link_reach": document.link_reach,
"link_role": document.link_role,
"nb_accesses": 0,
"title": document.title,
"updated_at": document.updated_at.isoformat().replace("+00:00", "Z"),
}
def test_api_documents_retrieve_anonymous_public_child():
"""
Anonymous users having access to a document should not gain access to a parent document.
"""
document = factories.DocumentFactory(
link_reach=random.choice(["authenticated", "restricted"])
)
factories.DocumentFactory(link_reach="public", parent=document)
response = APIClient().get(f"/api/v1.0/documents/{document.id!s}/")
assert response.status_code == 401
assert response.json() == {
"detail": "Authentication credentials were not provided."
}
@pytest.mark.parametrize("reach", ["restricted", "authenticated"]) @pytest.mark.parametrize("reach", ["restricted", "authenticated"])
def test_api_documents_retrieve_anonymous_restricted_or_authenticated(reach): def test_api_documents_retrieve_anonymous_restricted_or_authenticated(reach):
"""Anonymous users should not be able to retrieve a document that is not public.""" """Anonymous users should not be able to retrieve a document that is not public."""
@@ -68,8 +133,8 @@ def test_api_documents_retrieve_anonymous_restricted_or_authenticated(reach):
@pytest.mark.parametrize("reach", ["public", "authenticated"]) @pytest.mark.parametrize("reach", ["public", "authenticated"])
def test_api_documents_retrieve_authenticated_unrelated_public_or_authenticated(reach): def test_api_documents_retrieve_authenticated_unrelated_public_or_authenticated(reach):
""" """
Authenticated users should be able to retrieve a public document to which they are Authenticated users should be able to retrieve a public/authenticated document to
not related. which they are not related.
""" """
user = factories.UserFactory() user = factories.UserFactory()
@@ -118,6 +183,78 @@ def test_api_documents_retrieve_authenticated_unrelated_public_or_authenticated(
) )
@pytest.mark.parametrize("reach", ["public", "authenticated"])
def test_api_documents_retrieve_authenticated_public_or_authenticated_parent(reach):
"""
Authenticated users should be allowed to retrieve a document who has a public or
authenticated ancestor.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
grand_parent = factories.DocumentFactory(link_reach=reach)
parent = factories.DocumentFactory(parent=grand_parent, link_reach="restricted")
document = factories.DocumentFactory(link_reach="restricted", parent=parent)
response = client.get(f"/api/v1.0/documents/{document.id!s}/")
assert response.status_code == 200
assert response.json() == {
"id": str(document.id),
"abilities": {
"accesses_manage": False,
"accesses_view": False,
"ai_transform": grand_parent.link_role == "editor",
"ai_translate": grand_parent.link_role == "editor",
"attachment_upload": grand_parent.link_role == "editor",
"collaboration_auth": True,
"destroy": False,
"favorite": True,
"invite_owner": False,
"link_configuration": False,
"media_auth": True,
"partial_update": grand_parent.link_role == "editor",
"retrieve": True,
"update": grand_parent.link_role == "editor",
"versions_destroy": False,
"versions_list": False,
"versions_retrieve": False,
},
"content": document.content,
"created_at": document.created_at.isoformat().replace("+00:00", "Z"),
"creator": str(document.creator.id),
"is_favorite": False,
"link_reach": document.link_reach,
"link_role": document.link_role,
"nb_accesses": 0,
"title": document.title,
"updated_at": document.updated_at.isoformat().replace("+00:00", "Z"),
}
@pytest.mark.parametrize("reach", ["public", "authenticated"])
def test_api_documents_retrieve_authenticated_public_or_authenticated_child(reach):
"""
Authenticated users having access to a document should not gain access to a parent document.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
document = factories.DocumentFactory(link_reach="restricted")
factories.DocumentFactory(link_reach=reach, parent=document)
response = client.get(f"/api/v1.0/documents/{document.id!s}/")
assert response.status_code == 403
assert response.json() == {
"detail": "You do not have permission to perform this action."
}
@pytest.mark.parametrize("reach", ["public", "authenticated"]) @pytest.mark.parametrize("reach", ["public", "authenticated"])
def test_api_documents_retrieve_authenticated_trace_twice(reach): def test_api_documents_retrieve_authenticated_trace_twice(reach):
""" """
@@ -180,9 +317,7 @@ def test_api_documents_retrieve_authenticated_related_direct():
document = factories.DocumentFactory() document = factories.DocumentFactory()
factories.UserDocumentAccessFactory(document=document, user=user) factories.UserDocumentAccessFactory(document=document, user=user)
access2 = factories.UserDocumentAccessFactory(document=document) factories.UserDocumentAccessFactory(document=document)
serializers.UserSerializer(instance=user)
serializers.UserSerializer(instance=access2.user)
response = client.get( response = client.get(
f"/api/v1.0/documents/{document.id!s}/", f"/api/v1.0/documents/{document.id!s}/",
@@ -203,6 +338,115 @@ def test_api_documents_retrieve_authenticated_related_direct():
} }
def test_api_documents_retrieve_authenticated_related_parent():
"""
Authenticated users should be allowed to retrieve a document if they are related
to one of its ancestors whatever the role.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
grand_parent = factories.DocumentFactory(link_reach="restricted")
parent = factories.DocumentFactory(parent=grand_parent, link_reach="restricted")
document = factories.DocumentFactory(parent=parent, link_reach="restricted")
access = factories.UserDocumentAccessFactory(document=grand_parent, user=user)
factories.UserDocumentAccessFactory(document=grand_parent)
response = client.get(
f"/api/v1.0/documents/{document.id!s}/",
)
assert response.status_code == 200
assert response.json() == {
"id": str(document.id),
"abilities": {
"accesses_manage": access.role in ["administrator", "owner"],
"accesses_view": True,
"ai_transform": access.role != "reader",
"ai_translate": access.role != "reader",
"attachment_upload": access.role != "reader",
"collaboration_auth": True,
"destroy": access.role == "owner",
"favorite": True,
"invite_owner": access.role == "owner",
"link_configuration": access.role in ["administrator", "owner"],
"media_auth": True,
"partial_update": access.role != "reader",
"retrieve": True,
"update": access.role != "reader",
"versions_destroy": access.role in ["administrator", "owner"],
"versions_list": True,
"versions_retrieve": True,
},
"content": document.content,
"creator": str(document.creator.id),
"created_at": document.created_at.isoformat().replace("+00:00", "Z"),
"is_favorite": False,
"link_reach": "restricted",
"link_role": document.link_role,
"nb_accesses": 2,
"title": document.title,
"updated_at": document.updated_at.isoformat().replace("+00:00", "Z"),
}
def test_api_documents_retrieve_authenticated_related_nb_accesses():
"""Validate computation of number of accesses."""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
grand_parent = factories.DocumentFactory(link_reach="restricted")
parent = factories.DocumentFactory(parent=grand_parent, link_reach="restricted")
document = factories.DocumentFactory(parent=parent, link_reach="restricted")
factories.UserDocumentAccessFactory(document=grand_parent, user=user)
factories.UserDocumentAccessFactory(document=parent)
factories.UserDocumentAccessFactory(document=document)
response = client.get(
f"/api/v1.0/documents/{document.id!s}/",
)
assert response.status_code == 200
assert response.json()["nb_accesses"] == 3
factories.UserDocumentAccessFactory(document=grand_parent)
response = client.get(
f"/api/v1.0/documents/{document.id!s}/",
)
assert response.status_code == 200
assert response.json()["nb_accesses"] == 4
def test_api_documents_retrieve_authenticated_related_child():
"""
Authenticated users should not be allowed to retrieve a document as a result of being
related to one of its children.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
document = factories.DocumentFactory(link_reach="restricted")
child = factories.DocumentFactory(parent=document)
factories.UserDocumentAccessFactory(document=child, user=user)
factories.UserDocumentAccessFactory(document=document)
response = client.get(
f"/api/v1.0/documents/{document.id!s}/",
)
assert response.status_code == 403
assert response.json() == {
"detail": "You do not have permission to perform this action."
}
def test_api_documents_retrieve_authenticated_related_team_none(mock_user_teams): def test_api_documents_retrieve_authenticated_related_team_none(mock_user_teams):
""" """
Authenticated users should not be able to retrieve a restricted document related to Authenticated users should not be able to retrieve a restricted document related to

View File

@@ -16,6 +16,7 @@ from core.tests.conftest import TEAM, USER, VIA
pytestmark = pytest.mark.django_db pytestmark = pytest.mark.django_db
@pytest.mark.parametrize("via_parent", [True, False])
@pytest.mark.parametrize( @pytest.mark.parametrize(
"reach, role", "reach, role",
[ [
@@ -26,12 +27,18 @@ pytestmark = pytest.mark.django_db
("public", "reader"), ("public", "reader"),
], ],
) )
def test_api_documents_update_anonymous_forbidden(reach, role): def test_api_documents_update_anonymous_forbidden(reach, role, via_parent):
""" """
Anonymous users should not be allowed to update a document when link Anonymous users should not be allowed to update a document when link
configuration does not allow it. configuration does not allow it.
""" """
document = factories.DocumentFactory(link_reach=reach, link_role=role) if via_parent:
grand_parent = factories.DocumentFactory(link_reach=reach, link_role=role)
parent = factories.DocumentFactory(parent=grand_parent, link_reach="restricted")
document = factories.DocumentFactory(parent=parent, link_reach="restricted")
else:
document = factories.DocumentFactory(link_reach=reach, link_role=role)
old_document_values = serializers.DocumentSerializer(instance=document).data old_document_values = serializers.DocumentSerializer(instance=document).data
new_document_values = serializers.DocumentSerializer( new_document_values = serializers.DocumentSerializer(
@@ -52,6 +59,7 @@ def test_api_documents_update_anonymous_forbidden(reach, role):
assert document_values == old_document_values assert document_values == old_document_values
@pytest.mark.parametrize("via_parent", [True, False])
@pytest.mark.parametrize( @pytest.mark.parametrize(
"reach,role", "reach,role",
[ [
@@ -61,7 +69,9 @@ def test_api_documents_update_anonymous_forbidden(reach, role):
("restricted", "editor"), ("restricted", "editor"),
], ],
) )
def test_api_documents_update_authenticated_unrelated_forbidden(reach, role): def test_api_documents_update_authenticated_unrelated_forbidden(
reach, role, via_parent
):
""" """
Authenticated users should not be allowed to update a document to which Authenticated users should not be allowed to update a document to which
they are not related if the link configuration does not allow it. they are not related if the link configuration does not allow it.
@@ -71,7 +81,12 @@ def test_api_documents_update_authenticated_unrelated_forbidden(reach, role):
client = APIClient() client = APIClient()
client.force_login(user) client.force_login(user)
document = factories.DocumentFactory(link_reach=reach, link_role=role) if via_parent:
grand_parent = factories.DocumentFactory(link_reach=reach, link_role=role)
parent = factories.DocumentFactory(parent=grand_parent, link_reach="restricted")
document = factories.DocumentFactory(parent=parent, link_reach="restricted")
else:
document = factories.DocumentFactory(link_reach=reach, link_role=role)
old_document_values = serializers.DocumentSerializer(instance=document).data old_document_values = serializers.DocumentSerializer(instance=document).data
new_document_values = serializers.DocumentSerializer( new_document_values = serializers.DocumentSerializer(
@@ -93,6 +108,7 @@ def test_api_documents_update_authenticated_unrelated_forbidden(reach, role):
assert document_values == old_document_values assert document_values == old_document_values
@pytest.mark.parametrize("via_parent", [True, False])
@pytest.mark.parametrize( @pytest.mark.parametrize(
"is_authenticated,reach,role", "is_authenticated,reach,role",
[ [
@@ -102,10 +118,10 @@ def test_api_documents_update_authenticated_unrelated_forbidden(reach, role):
], ],
) )
def test_api_documents_update_anonymous_or_authenticated_unrelated( def test_api_documents_update_anonymous_or_authenticated_unrelated(
is_authenticated, reach, role is_authenticated, reach, role, via_parent
): ):
""" """
Authenticated users should be able to update a document to which Anonymous and authenticated users should be able to update a document to which
they are not related if the link configuration allows it. they are not related if the link configuration allows it.
""" """
client = APIClient() client = APIClient()
@@ -116,7 +132,12 @@ def test_api_documents_update_anonymous_or_authenticated_unrelated(
else: else:
user = AnonymousUser() user = AnonymousUser()
document = factories.DocumentFactory(link_reach=reach, link_role=role) if via_parent:
grand_parent = factories.DocumentFactory(link_reach=reach, link_role=role)
parent = factories.DocumentFactory(parent=grand_parent, link_reach="restricted")
document = factories.DocumentFactory(parent=parent, link_reach="restricted")
else:
document = factories.DocumentFactory(link_reach=reach, link_role=role)
old_document_values = serializers.DocumentSerializer(instance=document).data old_document_values = serializers.DocumentSerializer(instance=document).data
new_document_values = serializers.DocumentSerializer( new_document_values = serializers.DocumentSerializer(
@@ -147,24 +168,34 @@ def test_api_documents_update_anonymous_or_authenticated_unrelated(
assert value == new_document_values[key] assert value == new_document_values[key]
@pytest.mark.parametrize("via_parent", [True, False])
@pytest.mark.parametrize("via", VIA) @pytest.mark.parametrize("via", VIA)
def test_api_documents_update_authenticated_reader(via, mock_user_teams): def test_api_documents_update_authenticated_reader(via, via_parent, mock_user_teams):
""" """
Users who are reader of a document but not administrators should Users who are reader of a document should not be allowed to update it.
not be allowed to update it.
""" """
user = factories.UserFactory(with_owned_document=True) user = factories.UserFactory(with_owned_document=True)
client = APIClient() client = APIClient()
client.force_login(user) client.force_login(user)
document = factories.DocumentFactory(link_role="reader") if via_parent:
grand_parent = factories.DocumentFactory(link_reach="restricted")
parent = factories.DocumentFactory(parent=grand_parent, link_reach="restricted")
document = factories.DocumentFactory(parent=parent, link_reach="restricted")
access_document = grand_parent
else:
document = factories.DocumentFactory(link_reach="restricted")
access_document = document
if via == USER: if via == USER:
factories.UserDocumentAccessFactory(document=document, user=user, role="reader") factories.UserDocumentAccessFactory(
document=access_document, user=user, role="reader"
)
elif via == TEAM: elif via == TEAM:
mock_user_teams.return_value = ["lasuite", "unknown"] mock_user_teams.return_value = ["lasuite", "unknown"]
factories.TeamDocumentAccessFactory( factories.TeamDocumentAccessFactory(
document=document, team="lasuite", role="reader" document=access_document, team="lasuite", role="reader"
) )
old_document_values = serializers.DocumentSerializer(instance=document).data old_document_values = serializers.DocumentSerializer(instance=document).data
@@ -188,10 +219,11 @@ def test_api_documents_update_authenticated_reader(via, mock_user_teams):
assert document_values == old_document_values assert document_values == old_document_values
@pytest.mark.parametrize("via_parent", [True, False])
@pytest.mark.parametrize("role", ["editor", "administrator", "owner"]) @pytest.mark.parametrize("role", ["editor", "administrator", "owner"])
@pytest.mark.parametrize("via", VIA) @pytest.mark.parametrize("via", VIA)
def test_api_documents_update_authenticated_editor_administrator_or_owner( def test_api_documents_update_authenticated_editor_administrator_or_owner(
via, role, mock_user_teams via, role, via_parent, mock_user_teams
): ):
"""A user who is editor, administrator or owner of a document should be allowed to update it.""" """A user who is editor, administrator or owner of a document should be allowed to update it."""
user = factories.UserFactory(with_owned_document=True) user = factories.UserFactory(with_owned_document=True)
@@ -199,13 +231,23 @@ def test_api_documents_update_authenticated_editor_administrator_or_owner(
client = APIClient() client = APIClient()
client.force_login(user) client.force_login(user)
document = factories.DocumentFactory() if via_parent:
grand_parent = factories.DocumentFactory(link_reach="restricted")
parent = factories.DocumentFactory(parent=grand_parent, link_reach="restricted")
document = factories.DocumentFactory(parent=parent, link_reach="restricted")
access_document = grand_parent
else:
document = factories.DocumentFactory(link_reach="restricted")
access_document = document
if via == USER: if via == USER:
factories.UserDocumentAccessFactory(document=document, user=user, role=role) factories.UserDocumentAccessFactory(
document=access_document, user=user, role=role
)
elif via == TEAM: elif via == TEAM:
mock_user_teams.return_value = ["lasuite", "unknown"] mock_user_teams.return_value = ["lasuite", "unknown"]
factories.TeamDocumentAccessFactory( factories.TeamDocumentAccessFactory(
document=document, team="lasuite", role=role document=access_document, team="lasuite", role=role
) )
old_document_values = serializers.DocumentSerializer(instance=document).data old_document_values = serializers.DocumentSerializer(instance=document).data
@@ -238,52 +280,6 @@ def test_api_documents_update_authenticated_editor_administrator_or_owner(
assert value == new_document_values[key] assert value == new_document_values[key]
@pytest.mark.parametrize("via", VIA)
def test_api_documents_update_authenticated_owners(via, mock_user_teams):
"""Administrators of a document should be allowed to update it."""
user = factories.UserFactory(with_owned_document=True)
client = APIClient()
client.force_login(user)
document = factories.DocumentFactory()
if via == USER:
factories.UserDocumentAccessFactory(document=document, user=user, role="owner")
elif via == TEAM:
mock_user_teams.return_value = ["lasuite", "unknown"]
factories.TeamDocumentAccessFactory(
document=document, team="lasuite", role="owner"
)
old_document_values = serializers.DocumentSerializer(instance=document).data
new_document_values = serializers.DocumentSerializer(
instance=factories.DocumentFactory()
).data
response = client.put(
f"/api/v1.0/documents/{document.id!s}/", new_document_values, format="json"
)
assert response.status_code == 200
document = models.Document.objects.get(pk=document.pk)
document_values = serializers.DocumentSerializer(instance=document).data
for key, value in document_values.items():
if key in [
"id",
"created_at",
"creator",
"link_reach",
"link_role",
"nb_accesses",
]:
assert value == old_document_values[key]
elif key == "updated_at":
assert value > old_document_values[key]
else:
assert value == new_document_values[key]
@pytest.mark.parametrize("via", VIA) @pytest.mark.parametrize("via", VIA)
def test_api_documents_update_administrator_or_owner_of_another(via, mock_user_teams): def test_api_documents_update_administrator_or_owner_of_another(via, mock_user_teams):
""" """