(backend) add soft delete to documents and refactor db queryset

Now that we have introduced a document tree structure, it is not
possible to allow deleting documents anymore as it impacts the whole
subtree below the deleted document and the consequences are too big.

We introduce soft delete in order to give a second thought to the
document's owner (who is the only one to be allowed to delete a
document). After a document is soft deleted, the owner can still
see it in the trashbin (/api/v1.0/documents/trashbin).
After a grace period (30 days be default) the document disappears
from the trashbin and can't be restored anymore. Note that even
then it is still kept in database. Cleaning the database to erase
deleted documents after the grace period can be done as a maintenance
script.
This commit is contained in:
Samuel Paccoud - DINUM
2025-01-02 17:20:09 +01:00
committed by Anthony LC
parent 4de03d292a
commit 8ccfdb3c6a
21 changed files with 1373 additions and 252 deletions

View File

@@ -11,6 +11,7 @@ and this project adheres to
## Added
- ✨(backend) add soft delete API endpoint to documents #516
- ✨(backend) allow organizing documents in a tree structure #516
- ✨(backend) add "excerpt" field to document list serializer #516
- ✨(backend) add github actions to manage Crowdin workflow #559 & #563

View File

@@ -24,7 +24,7 @@ class DocumentFilter(django_filters.FilterSet):
class Meta:
model = models.Document
fields = ["is_creator_me", "is_favorite", "link_reach", "title"]
fields = ["is_creator_me", "is_favorite", "title"]
# pylint: disable=unused-argument
def filter_is_creator_me(self, queryset, name, value):
@@ -63,7 +63,4 @@ class DocumentFilter(django_filters.FilterSet):
if not user.is_authenticated:
return queryset
if value:
return queryset.filter(favorited_by_users__user=user)
return queryset.exclude(favorited_by_users__user=user)
return queryset.filter(is_favorite=bool(value))

View File

@@ -2,10 +2,11 @@
from django.core import exceptions
from django.db.models import Q
from django.http import Http404
from rest_framework import permissions
from core.models import DocumentAccess, RoleChoices
from core.models import DocumentAccess, RoleChoices, get_trashbin_cutoff
ACTION_FOR_METHOD_TO_PERMISSION = {
"versions_detail": {"DELETE": "versions_destroy", "GET": "versions_retrieve"},
@@ -110,3 +111,26 @@ class AccessPermission(permissions.BasePermission):
except KeyError:
pass
return abilities.get(action, False)
class DocumentAccessPermission(AccessPermission):
"""Subclass to handle soft deletion specificities."""
def has_object_permission(self, request, view, obj):
"""
Return a 404 on deleted documents
- for which the trashbin cutoff is past
- for which the current user is not owner of the document or one of its ancestors
"""
if (
deleted_at := obj.ancestors_deleted_at
) and deleted_at < get_trashbin_cutoff():
raise Http404
# Compute permission first to ensure the "user_roles" attribute is set
has_permission = super().has_object_permission(request, view, obj)
if obj.ancestors_deleted_at and not RoleChoices.OWNER in obj.user_roles:
raise Http404
return has_permission

View File

@@ -11,6 +11,29 @@ import botocore
from rest_framework.throttling import BaseThrottle
def filter_root_paths(paths, skip_sorting=False):
"""
Filters root paths from a list of paths representing a tree structure.
A root path is defined as a path that is not a prefix of any other path.
Args:
paths (list of str): The list of paths.
Returns:
list of str: The filtered list of root paths.
"""
if not skip_sorting:
paths.sort()
root_paths = []
for path in paths:
# If the current path is not a prefix of the last added root path, add it
if not root_paths or not path.startswith(root_paths[-1]):
root_paths.append(path)
return root_paths
def generate_s3_authorization_headers(key):
"""
Generate authorization headers for an s3 object.

View File

@@ -8,6 +8,7 @@ from urllib.parse import urlparse
from django.conf import settings
from django.contrib.postgres.aggregates import ArrayAgg
from django.contrib.postgres.fields import ArrayField
from django.contrib.postgres.search import TrigramSimilarity
from django.core.exceptions import ValidationError
from django.core.files.storage import default_storage
@@ -29,7 +30,7 @@ from django.http import Http404
import rest_framework as drf
from botocore.exceptions import ClientError
from django_filters import rest_framework as drf_filters
from rest_framework import filters, status
from rest_framework import filters, status, viewsets
from rest_framework import response as drf_response
from rest_framework.permissions import AllowAny
@@ -56,7 +57,7 @@ COLLABORATION_WS_URL_PATTERN = re.compile(rf"(?:^|&)room=(?P<pk>{UUID_REGEX})(?:
# pylint: disable=too-many-ancestors
class NestedGenericViewSet(drf.viewsets.GenericViewSet):
class NestedGenericViewSet(viewsets.GenericViewSet):
"""
A generic Viewset aims to be used in a nested route context.
e.g: `/api/v1.0/resource_1/<resource_1_pk>/resource_2/<resource_2_pk>/`
@@ -137,7 +138,7 @@ class Pagination(drf.pagination.PageNumberPagination):
class UserViewSet(
drf.mixins.UpdateModelMixin, drf.viewsets.GenericViewSet, drf.mixins.ListModelMixin
drf.mixins.UpdateModelMixin, viewsets.GenericViewSet, drf.mixins.ListModelMixin
):
"""User ViewSet"""
@@ -315,118 +316,242 @@ class DocumentMetadata(drf.metadata.SimpleMetadata):
class DocumentViewSet(
drf.mixins.CreateModelMixin,
drf.mixins.DestroyModelMixin,
drf.mixins.ListModelMixin,
drf.mixins.UpdateModelMixin,
drf.viewsets.GenericViewSet,
viewsets.GenericViewSet,
):
"""
Document ViewSet for managing documents.
DocumentViewSet API.
Provides endpoints for creating, updating, and deleting documents,
along with filtering options.
This view set provides CRUD operations and additional actions for managing documents.
Supports filtering, ordering, and annotations for enhanced querying capabilities.
Filtering:
### API Endpoints:
1. **List**: Retrieve a paginated list of documents.
Example: GET /documents/?page=2
2. **Retrieve**: Get a specific document by its ID.
Example: GET /documents/{id}/
3. **Create**: Create a new document.
Example: POST /documents/
4. **Update**: Update a document by its ID.
Example: PUT /documents/{id}/
5. **Delete**: Soft delete a document by its ID.
Example: DELETE /documents/{id}/
### Additional Actions:
1. **Trashbin**: List soft deleted documents for a document owner
Example: GET /documents/{id}/trashbin/
2. **Children**: List or create child documents.
Example: GET, POST /documents/{id}/children/
3. **Versions List**: Retrieve version history of a document.
Example: GET /documents/{id}/versions/
4. **Version Detail**: Get or delete a specific document version.
Example: GET, DELETE /documents/{id}/versions/{version_id}/
5. **Favorite**: Get list of favorite documents for a user. Mark or unmark
a document as favorite.
Examples:
- GET /documents/favorite/
- POST, DELETE /documents/{id}/favorite/
6. **Create for Owner**: Create a document via server-to-server on behalf of a user.
Example: POST /documents/create-for-owner/
7. **Link Configuration**: Update document link configuration.
Example: PUT /documents/{id}/link-configuration/
8. **Attachment Upload**: Upload a file attachment for the document.
Example: POST /documents/{id}/attachment-upload/
9. **Media Auth**: Authorize access to document media.
Example: GET /documents/media-auth/
10. **Collaboration Auth**: Authorize access to the collaboration server for a document.
Example: GET /documents/collaboration-auth/
11. **AI Transform**: Apply a transformation action on a piece of text with AI.
Example: POST /documents/{id}/ai-transform/
Expected data:
- text (str): The input text.
- action (str): The transformation type, one of [prompt, correct, rephrase, summarize].
Returns: JSON response with the processed text.
Throttled by: AIDocumentRateThrottle, AIUserRateThrottle.
12. **AI Translate**: Translate a piece of text with AI.
Example: POST /documents/{id}/ai-translate/
Expected data:
- text (str): The input text.
- language (str): The target language, chosen from settings.LANGUAGES.
Returns: JSON response with the translated text.
Throttled by: AIDocumentRateThrottle, AIUserRateThrottle.
### Ordering: created_at, updated_at, is_favorite, title
Example:
- Ascending: GET /api/v1.0/documents/?ordering=created_at
- Desceding: GET /api/v1.0/documents/?ordering=-title
### Filtering:
- `is_creator_me=true`: Returns documents created by the current user.
- `is_creator_me=false`: Returns documents created by other users.
- `is_favorite=true`: Returns documents marked as favorite by the current user
- `is_favorite=false`: Returns documents not marked as favorite by the current user
- `title=hello`: Returns documents which title contains the "hello" string
Example Usage:
Example:
- GET /api/v1.0/documents/?is_creator_me=true&is_favorite=true
- GET /api/v1.0/documents/?is_creator_me=false&title=hello
### Annotations:
1. **is_favorite**: Indicates whether the document is marked as favorite by the current user.
2. **user_roles**: Roles the current user has on the document or its ancestors.
### Notes:
- Only the highest ancestor in a document hierarchy is shown in list views.
- Implements soft delete logic to retain document tree structures.
"""
filter_backends = [drf_filters.DjangoFilterBackend, filters.OrderingFilter]
filter_backends = [drf_filters.DjangoFilterBackend]
filterset_class = DocumentFilter
metadata_class = DocumentMetadata
ordering = ["-updated_at"]
ordering_fields = ["created_at", "is_favorite", "updated_at", "title"]
ordering_fields = ["created_at", "updated_at", "title"]
permission_classes = [
permissions.AccessPermission,
permissions.DocumentAccessPermission,
]
queryset = models.Document.objects.all()
serializer_class = serializers.DocumentSerializer
def get_serializer_class(self):
"""
Use ListDocumentSerializer for list actions, otherwise use DocumentSerializer.
Use ListDocumentSerializer for list actions; otherwise, use DocumentSerializer.
"""
if self.action == "list":
return serializers.ListDocumentSerializer
return self.serializer_class
def annotate_queryset(self, queryset):
"""Annotate document queryset with favorite and number of accesses."""
user = self.request.user
# Annotate the number of accesses taking into account ancestors
ancestor_accesses_query = (
models.DocumentAccess.objects.filter(
document__path=Left(OuterRef("path"), Length("document__path")),
)
.order_by()
.annotate(total_accesses=Func(Value("id"), function="COUNT"))
.values("total_accesses")
return (
serializers.ListDocumentSerializer
if self.action == "list"
else self.serializer_class
)
# Annotate with the number of accesses, default to 0 if no accesses exist
queryset = queryset.annotate(nb_accesses=Subquery(ancestor_accesses_query))
if not user.is_authenticated:
# If the user is not authenticated, annotate `is_favorite` as False
return queryset.annotate(is_favorite=Value(False))
# Annotate the queryset to indicate if the document is favorited by the current user
favorite_exists = models.DocumentFavorite.objects.filter(
document_id=OuterRef("pk"), user=user
)
return queryset.annotate(is_favorite=Exists(favorite_exists))
def get_queryset(self):
"""Optimize queryset to include favorite status for the current user."""
queryset = super().get_queryset()
queryset = self.annotate_queryset(queryset)
return queryset.distinct()
def list(self, request, *args, **kwargs):
"""Restrict resources returned by the list endpoint"""
queryset = self.filter_queryset(self.get_queryset())
def annotate_is_favorite(self, queryset):
"""
Annotate document queryset with the favorite status for the current user.
"""
user = self.request.user
if user.is_authenticated:
queryset = queryset.filter(
db.Q(accesses__user=user)
| db.Q(accesses__team__in=user.teams)
| (
db.Q(link_traces__user=user)
& ~db.Q(link_reach=models.LinkReachChoices.RESTRICTED)
favorite_exists_subquery = models.DocumentFavorite.objects.filter(
document_id=db.OuterRef("pk"), user=user
)
return queryset.annotate(is_favorite=db.Exists(favorite_exists_subquery))
return queryset.annotate(is_favorite=db.Value(False))
def annotate_user_roles(self, queryset):
"""
Annotate document queryset with the roles of the current user
on the document or its ancestors.
"""
user = self.request.user
output_field = ArrayField(base_field=db.CharField())
if user.is_authenticated:
user_roles_subquery = models.DocumentAccess.objects.filter(
db.Q(user=user) | db.Q(team__in=user.teams),
document__path=Left(db.OuterRef("path"), Length("document__path")),
).values_list("role", flat=True)
return queryset.annotate(
user_roles=db.Func(
user_roles_subquery, function="ARRAY", output_field=output_field
)
)
# Among the results, we may have documents that are ancestors/children of each other
# In this case we want to keep only the highest ancestor. Let's annotate, each document
# with the path of its highest ancestor within results so we can use it to filter
shortest_path = Subquery(
queryset.filter(path=Left(OuterRef("path"), Length("path")))
.order_by("path") # Get the shortest (root) path
.values("path")[:1]
)
queryset = queryset.annotate(root_path=shortest_path)
return queryset.annotate(
user_roles=db.Value([], output_field=output_field),
)
# Filter documents based on their shortest path (root path)
queryset = queryset.filter(
root_path=F(
"path"
) # Keep only documents who are the annotated highest ancestor
def get_queryset(self):
"""Get queryset performing all annotation and filtering on the document tree structure."""
user = self.request.user
queryset = super().get_queryset()
# Only list views need filtering and annotation
if self.detail:
return queryset
if not user.is_authenticated:
return queryset.none()
queryset = queryset.filter(ancestors_deleted_at__isnull=True)
# Filter documents to which the current user has access...
access_documents_ids = models.DocumentAccess.objects.filter(
db.Q(user=user) | db.Q(team__in=user.teams)
).values_list("document_id", flat=True)
# ...or that were previously accessed and are not restricted
traced_documents_ids = models.LinkTrace.objects.filter(user=user).values_list(
"document_id", flat=True
)
return queryset.filter(
db.Q(id__in=access_documents_ids)
| (
db.Q(id__in=traced_documents_ids)
& ~db.Q(link_reach=models.LinkReachChoices.RESTRICTED)
)
)
def filter_queryset(self, queryset):
"""Apply annotations and filters sequentially."""
filterset = DocumentFilter(
self.request.GET, queryset=queryset, request=self.request
)
filterset.is_valid()
filter_data = filterset.form.cleaned_data
# Filter as early as possible on fields that are available on the model
for field in ["is_creator_me", "title"]:
queryset = filterset.filters[field].filter(queryset, filter_data[field])
queryset = self.annotate_user_roles(queryset)
if self.action == "list":
# Among the results, we may have documents that are ancestors/descendants
# of each other. In this case we want to keep only the highest ancestors.
root_paths = utils.filter_root_paths(
queryset.order_by("path").values_list("path", flat=True),
skip_sorting=True,
)
queryset = queryset.filter(path__in=root_paths)
# Annotate the queryset with an attribute marking instances as highest ancestor
# in order to save some time while computing abilities in the instance
queryset = queryset.annotate(
is_highest_ancestor_for_user=db.Value(
True, output_field=db.BooleanField()
)
)
else:
queryset = queryset.none()
# Annotate favorite status and filter if applicable as late as possible
queryset = self.annotate_is_favorite(queryset)
queryset = filterset.filters["is_favorite"].filter(
queryset, filter_data["is_favorite"]
)
# Apply ordering only now that everyting is filtered and annotated
return filters.OrderingFilter().filter_queryset(self.request, queryset, self)
def get_response_for_queryset(self, queryset):
"""Return paginated response for the queryset if requested."""
page = self.paginate_queryset(queryset)
if page is not None:
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data)
result = self.get_paginated_response(serializer.data)
return result
serializer = self.get_serializer(queryset, many=True)
return drf.response.Response(serializer.data)
@@ -437,20 +562,18 @@ class DocumentViewSet(
on a user's list view even though the user has no specific role in the document (link
access when the link reach configuration of the document allows it).
"""
user = self.request.user
instance = self.get_object()
serializer = self.get_serializer(instance)
if self.request.user.is_authenticated:
try:
# Add a trace that the user visited the document (this is needed to include
# the document in the user's list view)
models.LinkTrace.objects.create(
document=instance,
user=self.request.user,
)
except ValidationError:
# The trace already exists, so we just pass without doing anything
pass
# The `create` query generates 5 db queries which are much less efficient than an
# `exists` query. The user will visit the document many times after the first visit
# so that's what we should optimize for.
if (
user.is_authenticated
and not instance.link_traces.filter(user=user).exists()
):
models.LinkTrace.objects.create(document=instance, user=request.user)
return drf.response.Response(serializer.data)
@@ -467,6 +590,47 @@ class DocumentViewSet(
role=models.RoleChoices.OWNER,
)
def perform_destroy(self, instance):
"""Override to implement a soft delete instead of dumping the record in database."""
instance.soft_delete()
@drf.decorators.action(
detail=False,
methods=["get"],
)
def favorite_list(self, request, *args, **kwargs):
"""Get list of favorite documents for the current user."""
user = request.user
favorite_documents_ids = models.DocumentFavorite.objects.filter(
user=user
).values_list("document_id", flat=True)
queryset = self.get_queryset()
queryset = queryset.filter(id__in=favorite_documents_ids)
return self.get_response_for_queryset(queryset)
@drf.decorators.action(
detail=False,
methods=["get"],
serializer_class=serializers.ListDocumentSerializer,
)
def trashbin(self, request, *args, **kwargs):
"""
Retrieve soft-deleted documents for which the current user has the owner role.
The selected documents are those deleted within the cutoff period defined in the
settings (see TRASHBIN_CUTOFF_DAYS), before they are considered permanently deleted.
"""
queryset = self.queryset.filter(
deleted_at__isnull=False,
deleted_at__gte=models.get_trashbin_cutoff(),
)
queryset = self.annotate_user_roles(queryset)
queryset = queryset.filter(user_roles__contains=[models.RoleChoices.OWNER])
return self.get_response_for_queryset(queryset)
@drf.decorators.action(
authentication_classes=[authentication.ServerToServerAuthentication],
detail=False,
@@ -553,6 +717,7 @@ class DocumentViewSet(
@drf.decorators.action(
detail=True,
methods=["get", "post"],
ordering=["path"],
serializer_class=serializers.ListDocumentSerializer,
url_path="children",
)
@@ -586,16 +751,11 @@ class DocumentViewSet(
)
# GET: List children
queryset = document.get_children()
queryset = self.annotate_queryset(queryset)
page = self.paginate_queryset(queryset)
if page is not None:
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = self.get_serializer(queryset, many=True)
return drf.response.Response(serializer.data)
queryset = document.get_children().filter(deleted_at__isnull=True)
queryset = self.filter_queryset(queryset)
queryset = self.annotate_is_favorite(queryset)
queryset = self.annotate_user_roles(queryset)
return self.get_response_for_queryset(queryset)
@drf.decorators.action(detail=True, methods=["get"], url_path="versions")
def versions_list(self, request, *args, **kwargs):
@@ -617,7 +777,7 @@ class DocumentViewSet(
# document. Filter to get the minimum access date for the logged-in user
access_queryset = models.DocumentAccess.objects.filter(
db.Q(user=user) | db.Q(team__in=user.teams),
document__path=Left(Value(document.path), Length("document__path")),
document__path=Left(db.Value(document.path), Length("document__path")),
).aggregate(min_date=db.Min("created_at"))
# Handle the case where the user has no accesses
@@ -657,7 +817,7 @@ class DocumentViewSet(
access.created_at
for access in models.DocumentAccess.objects.filter(
db.Q(user=user) | db.Q(team__in=user.teams),
document__path=Left(Value(document.path), Length("document__path")),
document__path=Left(db.Value(document.path), Length("document__path")),
)
)
@@ -948,7 +1108,7 @@ class DocumentAccessViewSet(
drf.mixins.ListModelMixin,
drf.mixins.RetrieveModelMixin,
drf.mixins.UpdateModelMixin,
drf.viewsets.GenericViewSet,
viewsets.GenericViewSet,
):
"""
API ViewSet for all interactions with document accesses.
@@ -1021,7 +1181,7 @@ class TemplateViewSet(
drf.mixins.DestroyModelMixin,
drf.mixins.RetrieveModelMixin,
drf.mixins.UpdateModelMixin,
drf.viewsets.GenericViewSet,
viewsets.GenericViewSet,
):
"""Template ViewSet"""
@@ -1045,14 +1205,14 @@ class TemplateViewSet(
user_roles_query = (
models.TemplateAccess.objects.filter(
Q(user=user) | Q(team__in=user.teams),
template_id=OuterRef("pk"),
db.Q(user=user) | db.Q(team__in=user.teams),
template_id=db.OuterRef("pk"),
)
.values("template")
.annotate(roles_array=ArrayAgg("role"))
.values("roles_array")
)
return queryset.annotate(user_roles=Subquery(user_roles_query)).distinct()
return queryset.annotate(user_roles=db.Subquery(user_roles_query)).distinct()
def list(self, request, *args, **kwargs):
"""Restrict templates returned by the list endpoint"""
@@ -1092,7 +1252,7 @@ class TemplateAccessViewSet(
drf.mixins.ListModelMixin,
drf.mixins.RetrieveModelMixin,
drf.mixins.UpdateModelMixin,
drf.viewsets.GenericViewSet,
viewsets.GenericViewSet,
):
"""
API ViewSet for all interactions with template accesses.
@@ -1132,7 +1292,7 @@ class InvitationViewset(
drf.mixins.RetrieveModelMixin,
drf.mixins.DestroyModelMixin,
drf.mixins.UpdateModelMixin,
drf.viewsets.GenericViewSet,
viewsets.GenericViewSet,
):
"""API ViewSet for user invitations to document.

View File

@@ -77,6 +77,7 @@ class DocumentFactory(factory.django.DjangoModelFactory):
excerpt = factory.Sequence(lambda n: f"excerpt{n}")
content = factory.Sequence(lambda n: f"content{n}")
creator = factory.SubFactory(UserFactory)
deleted_at = None
link_reach = factory.fuzzy.FuzzyChoice(
[a[0] for a in models.LinkReachChoices.choices]
)
@@ -94,11 +95,19 @@ class DocumentFactory(factory.django.DjangoModelFactory):
if parent:
# Add as a child node
kwargs["ancestors_deleted_at"] = (
kwargs.get("ancestors_deleted_at") or parent.ancestors_deleted_at
)
return parent.add_child(instance=model_class(**kwargs))
# Add as a root node
return model_class.add_root(instance=model_class(**kwargs))
@factory.lazy_attribute
def ancestors_deleted_at(self):
"""Should always be set when "deleted_at" is set."""
return self.deleted_at
@factory.post_generation
def users(self, create, extracted, **kwargs):
"""Add users to document from a given list of users with or without roles."""
@@ -109,6 +118,16 @@ class DocumentFactory(factory.django.DjangoModelFactory):
else:
UserDocumentAccessFactory(document=self, user=item[0], role=item[1])
@factory.post_generation
def teams(self, create, extracted, **kwargs):
"""Add teams to document from a given list of teams with or without roles."""
if create and extracted:
for item in extracted:
if isinstance(item, str):
TeamDocumentAccessFactory(document=self, team=item)
else:
TeamDocumentAccessFactory(document=self, team=item[0], role=item[1])
@factory.post_generation
def link_traces(self, create, extracted, **kwargs):
"""Add link traces to document from a given list of users."""

View File

@@ -6,7 +6,7 @@ from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('core', '0014_set_path_on_existing_documents'),
('core', '0015_set_path_on_existing_documents'),
]
operations = [

View File

@@ -0,0 +1,36 @@
# Generated by Django 5.1.4 on 2025-01-12 14:27
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('core', '0016_add_document_excerpt'),
]
operations = [
migrations.AlterModelOptions(
name='document',
options={'ordering': ('path',), 'verbose_name': 'Document', 'verbose_name_plural': 'Documents'},
),
migrations.AddField(
model_name='document',
name='ancestors_deleted_at',
field=models.DateTimeField(blank=True, null=True),
),
migrations.AddField(
model_name='document',
name='deleted_at',
field=models.DateTimeField(blank=True, null=True),
),
migrations.AlterField(
model_name='user',
name='language',
field=models.CharField(choices="(('en-us', 'English'), ('fr-fr', 'French'), ('de-de', 'German'))", default='en-us', help_text='The language in which the user wants to see the interface.', max_length=10, verbose_name='language'),
),
migrations.AddConstraint(
model_name='document',
constraint=models.CheckConstraint(condition=models.Q(('deleted_at__isnull', True), ('deleted_at', models.F('ancestors_deleted_at')), _connector='OR'), name='check_deleted_at_matches_ancestors_deleted_at_when_set'),
),
]

View File

@@ -13,15 +13,13 @@ from django.conf import settings
from django.contrib.auth import models as auth_models
from django.contrib.auth.base_user import AbstractBaseUser
from django.contrib.sites.models import Site
from django.core import exceptions, mail, validators
from django.core import mail, validators
from django.core.cache import cache
from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
from django.core.mail import send_mail
from django.db import models
from django.db import models, transaction
from django.db.models.functions import Left, Length
from django.http import FileResponse
from django.template.base import Template as DjangoTemplate
from django.template.context import Context
from django.template.loader import render_to_string
from django.utils import timezone
from django.utils.functional import cached_property, lazy
@@ -29,12 +27,27 @@ from django.utils.translation import get_language, override
from django.utils.translation import gettext_lazy as _
from botocore.exceptions import ClientError
from rest_framework.exceptions import ValidationError
from timezone_field import TimeZoneField
from treebeard.mp_tree import MP_Node
logger = getLogger(__name__)
def get_trashbin_cutoff():
"""
Calculate the cutoff datetime for soft-deleted items based on the retention policy.
The function returns the current datetime minus the number of days specified in
the TRASHBIN_CUTOFF_DAYS setting, indicating the oldest date for items that can
remain in the trash bin.
Returns:
datetime: The cutoff datetime for soft-deleted items.
"""
return timezone.now() - timedelta(days=settings.TRASHBIN_CUTOFF_DAYS)
class LinkRoleChoices(models.TextChoices):
"""Defines the possible roles a link can offer on a document."""
@@ -374,6 +387,8 @@ class Document(MP_Node, BaseModel):
blank=True,
null=True,
)
deleted_at = models.DateTimeField(null=True, blank=True)
ancestors_deleted_at = models.DateTimeField(null=True, blank=True)
_content = None
@@ -389,6 +404,15 @@ class Document(MP_Node, BaseModel):
ordering = ("path",)
verbose_name = _("Document")
verbose_name_plural = _("Documents")
constraints = [
models.CheckConstraint(
check=(
models.Q(deleted_at__isnull=True)
| models.Q(deleted_at=models.F("ancestors_deleted_at"))
),
name="check_deleted_at_matches_ancestors_deleted_at_when_set",
),
]
def __str__(self):
return str(self.title) if self.title else str(_("Untitled Document"))
@@ -527,6 +551,32 @@ class Document(MP_Node, BaseModel):
Bucket=default_storage.bucket_name, Key=self.file_key, VersionId=version_id
)
def get_nb_accesses_cache_key(self):
"""Generate a unique cache key for each document."""
return f"document_{self.id!s}_nb_accesses"
@property
def nb_accesses(self):
"""Calculate the number of accesses."""
cache_key = self.get_nb_accesses_cache_key()
nb_accesses = cache.get(cache_key)
if nb_accesses is None:
nb_accesses = DocumentAccess.objects.filter(
document__path=Left(models.Value(self.path), Length("document__path")),
).count()
cache.set(cache_key, nb_accesses)
return nb_accesses
def invalidate_nb_accesses_cache(self):
"""
Invalidate the cache for number of accesses, including on affected descendants.
"""
for document in Document.objects.filter(path__startswith=self.path).only("id"):
cache_key = document.get_nb_accesses_cache_key()
cache.delete(cache_key)
def get_roles(self, user):
"""Return the roles a user has on a document."""
if not user.is_authenticated:
@@ -546,60 +596,78 @@ class Document(MP_Node, BaseModel):
roles = []
return roles
@cached_property
def links_definitions(self):
"""Get links reach/role definitions for the current document and its ancestors."""
links_definitions = {self.link_reach: {self.link_role}}
# Ancestors links definitions are only interesting if the document is not the highest
# ancestor to which the current user has access. Look for the annotation:
if self.depth > 1 and not getattr(self, "is_highest_ancestor_for_user", False):
for ancestor in self.get_ancestors().values("link_reach", "link_role"):
links_definitions.setdefault(ancestor["link_reach"], set()).add(
ancestor["link_role"]
)
return links_definitions
def get_abilities(self, user):
"""
Compute and return abilities for a given user on the document.
"""
roles = set(self.get_roles(user))
roles = set(
self.get_roles(user)
) # at this point only roles based on specific access
# Compute version roles before adding link roles because we don't
# Characteristics that are based only on specific access
is_owner = RoleChoices.OWNER in roles
is_deleted = self.ancestors_deleted_at and not is_owner
is_owner_or_admin = (is_owner or RoleChoices.ADMIN in roles) and not is_deleted
# Compute access roles before adding link roles because we don't
# want anonymous users to access versions (we wouldn't know from
# which date to allow them anyway)
# Anonymous users should also not see document accesses
has_role = bool(roles)
has_access_role = bool(roles) and not is_deleted
# Add roles provided by the document link, taking into account its ancestors
link_reaches = list(self.get_ancestors().values("link_reach", "link_role"))
link_reaches.append(
{"link_reach": self.link_reach, "link_role": self.link_role}
# Add roles provided by the document link
links_definitions = self.links_definitions
public_roles = links_definitions.get(LinkReachChoices.PUBLIC, set())
authenticated_roles = (
links_definitions.get(LinkReachChoices.AUTHENTICATED, set())
if user.is_authenticated
else set()
)
roles = roles | public_roles | authenticated_roles
for lr in link_reaches:
if lr["link_reach"] == LinkReachChoices.PUBLIC:
roles.add(lr["link_role"])
if user.is_authenticated:
for lr in link_reaches:
if lr["link_reach"] == LinkReachChoices.AUTHENTICATED:
roles.add(lr["link_role"])
is_owner_or_admin = bool(
roles.intersection({RoleChoices.OWNER, RoleChoices.ADMIN})
)
can_get = bool(roles)
can_update = is_owner_or_admin or RoleChoices.EDITOR in roles
can_get = bool(roles) and not is_deleted
can_update = (
is_owner_or_admin or RoleChoices.EDITOR in roles
) and not is_deleted
return {
"accesses_manage": is_owner_or_admin,
"accesses_view": has_role,
"accesses_view": has_access_role,
"ai_transform": can_update,
"ai_translate": can_update,
"attachment_upload": can_update,
"children_list": can_get,
"children_create": can_update and user.is_authenticated,
"collaboration_auth": can_get,
"destroy": RoleChoices.OWNER in roles,
"destroy": is_owner,
"favorite": can_get and user.is_authenticated,
"link_configuration": is_owner_or_admin,
"invite_owner": RoleChoices.OWNER in roles,
"move": is_owner_or_admin,
"invite_owner": is_owner,
"move": is_owner_or_admin and not self.ancestors_deleted_at,
"partial_update": can_update,
"retrieve": can_get,
"media_auth": can_get,
"update": can_update,
"versions_destroy": is_owner_or_admin,
"versions_list": has_role,
"versions_retrieve": has_role,
"versions_list": has_access_role,
"versions_retrieve": has_access_role,
}
def send_email(self, subject, emails, context=None, language=None):
@@ -660,6 +728,31 @@ class Document(MP_Node, BaseModel):
self.send_email(subject, [email], context, language)
@transaction.atomic
def soft_delete(self):
"""
Soft delete the document, marking the deletion on descendants.
We still keep the .delete() method untouched for programmatic purposes.
"""
if self.deleted_at or self.ancestors_deleted_at:
raise RuntimeError(
"This document is already deleted or has deleted ancestors."
)
# Check if any ancestors are deleted
if self.get_ancestors().filter(deleted_at__isnull=False).exists():
raise RuntimeError(
"Cannot delete this document because one or more ancestors are already deleted."
)
self.ancestors_deleted_at = self.deleted_at = timezone.now()
self.save()
# Mark all descendants as soft deleted
self.get_descendants().filter(ancestors_deleted_at__isnull=True).update(
ancestors_deleted_at=self.ancestors_deleted_at
)
class LinkTrace(BaseModel):
"""
@@ -762,6 +855,16 @@ class DocumentAccess(BaseAccess):
def __str__(self):
return f"{self.user!s} is {self.role:s} in document {self.document!s}"
def save(self, *args, **kwargs):
"""Override save to clear the document's cache for number of accesses."""
super().save(*args, **kwargs)
self.document.invalidate_nb_accesses_cache()
def delete(self, *args, **kwargs):
"""Override delete to clear the document's cache for number of accesses."""
super().delete(*args, **kwargs)
self.document.invalidate_nb_accesses_cache()
def get_abilities(self, user):
"""
Compute and return abilities for a given user on the document access.
@@ -792,7 +895,7 @@ class Template(BaseModel):
return self.title
def get_roles(self, user):
"""Return the roles a user has on a resource."""
"""Return the roles a user has on a resource as an iterable."""
if not user.is_authenticated:
return []
@@ -915,8 +1018,8 @@ class Invitation(BaseModel):
User.objects.filter(email=self.email).exists()
and not settings.OIDC_ALLOW_DUPLICATE_EMAILS
):
raise exceptions.ValidationError(
{"email": _("This email is already associated to a registered user.")}
raise ValidationError(
{"email": [_("This email is already associated to a registered user.")]}
)
@property

View File

@@ -624,7 +624,9 @@ def test_api_document_invitations_create_cannot_invite_existing_users():
)
assert response.status_code == 400
assert response.json() == ["This email is already associated to a registered user."]
assert response.json() == {
"email": ["This email is already associated to a registered user."]
}
# Update

View File

@@ -604,15 +604,19 @@ def test_api_document_versions_update_authenticated_related(via, mock_user_teams
# Delete
def test_api_document_versions_delete_anonymous():
@pytest.mark.parametrize("reach", models.LinkReachChoices.values)
def test_api_document_versions_delete_anonymous(reach):
"""Anonymous users should not be allowed to destroy a document version."""
access = factories.UserDocumentAccessFactory()
access = factories.UserDocumentAccessFactory(document__link_reach=reach)
response = APIClient().delete(
f"/api/v1.0/documents/{access.document_id!s}/versions/{access.id!s}/",
)
assert response.status_code == 401
assert response.json() == {
"detail": "Authentication credentials were not provided."
}
@pytest.mark.parametrize("reach", models.LinkReachChoices.values)

View File

@@ -31,8 +31,11 @@ def test_api_documents_children_create_anonymous(reach, role, depth):
},
)
assert response.status_code == 401
assert Document.objects.count() == depth
assert response.status_code == 401
assert response.json() == {
"detail": "Authentication credentials were not provided."
}
@pytest.mark.parametrize("depth", [1, 2, 3])

View File

@@ -305,7 +305,7 @@ def test_api_documents_children_list_authenticated_related_direct():
client.force_login(user)
document = factories.DocumentFactory()
factories.UserDocumentAccessFactory(document=document, user=user)
access = factories.UserDocumentAccessFactory(document=document, user=user)
factories.UserDocumentAccessFactory(document=document)
child1, child2 = factories.DocumentFactory.create_batch(2, parent=document)
@@ -378,7 +378,6 @@ def test_api_documents_children_list_authenticated_related_parent():
grand_parent_access = factories.UserDocumentAccessFactory(
document=grand_parent, user=user
)
factories.UserDocumentAccessFactory(document=grand_parent, user=user)
response = client.get(
f"/api/v1.0/documents/{document.id!s}/children/",
@@ -400,7 +399,7 @@ def test_api_documents_children_list_authenticated_related_parent():
"link_reach": child1.link_reach,
"link_role": child1.link_role,
"numchild": 0,
"nb_accesses": 3,
"nb_accesses": 2,
"path": child1.path,
"title": child1.title,
"updated_at": child1.updated_at.isoformat().replace("+00:00", "Z"),
@@ -417,7 +416,7 @@ def test_api_documents_children_list_authenticated_related_parent():
"link_reach": child2.link_reach,
"link_role": child2.link_role,
"numchild": 0,
"nb_accesses": 2,
"nb_accesses": 1,
"path": child2.path,
"title": child2.title,
"updated_at": child2.updated_at.isoformat().replace("+00:00", "Z"),

View File

@@ -77,6 +77,37 @@ def test_api_documents_delete_authenticated_not_owner(via, role, mock_user_teams
assert models.Document.objects.count() == 2
@pytest.mark.parametrize("depth", [1, 2, 3])
def test_api_documents_delete_authenticated_owner_of_ancestor(depth):
"""
Authenticated users should not be able to delete a document for which
they are only owner of an ancestor.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
documents = []
for i in range(depth):
documents.append(
factories.UserDocumentAccessFactory(role="owner", user=user).document
if i == 0
else factories.DocumentFactory(parent=documents[-1])
)
assert models.Document.objects.count() == depth
response = client.delete(
f"/api/v1.0/documents/{documents[-1].id}/",
)
assert response.status_code == 204
# Make sure it is only a soft delete
assert models.Document.objects.count() == depth
assert models.Document.objects.filter(deleted_at__isnull=True).count() == depth - 1
assert models.Document.objects.filter(deleted_at__isnull=False).count() == 1
@pytest.mark.parametrize("via", VIA)
def test_api_documents_delete_authenticated_owner(via, mock_user_teams):
"""
@@ -101,4 +132,8 @@ def test_api_documents_delete_authenticated_owner(via, mock_user_teams):
)
assert response.status_code == 204
assert models.Document.objects.exists() is False
# Make sure it is only a soft delete
assert models.Document.objects.count() == 1
assert models.Document.objects.filter(deleted_at__isnull=True).exists() is False
assert models.Document.objects.filter(deleted_at__isnull=False).count() == 1

View File

@@ -3,8 +3,11 @@ Tests for Documents API endpoint in impress's core app: list
"""
import random
from datetime import timedelta
from unittest import mock
from django.utils import timezone
import pytest
from faker import Faker
from rest_framework.pagination import PageNumberPagination
@@ -21,7 +24,7 @@ pytestmark = pytest.mark.django_db
def test_api_documents_list_anonymous(reach, role):
"""
Anonymous users should not be allowed to list documents whatever the
link reach and the role
link reach and link role
"""
factories.DocumentFactory(link_reach=reach, link_role=role)
@@ -76,6 +79,7 @@ def test_api_documents_list_format():
}
# pylint: disable=too-many-locals
def test_api_documents_list_authenticated_direct(django_assert_num_queries):
"""
Authenticated users should be able to list documents they are a direct
@@ -110,17 +114,50 @@ def test_api_documents_list_authenticated_direct(django_assert_num_queries):
hidden_root = factories.DocumentFactory()
child3_with_access = factories.DocumentFactory(parent=hidden_root)
factories.UserDocumentAccessFactory(user=user, document=child3_with_access)
child4_with_access = factories.DocumentFactory(parent=hidden_root)
factories.UserDocumentAccessFactory(user=user, document=child4_with_access)
expected_ids = {str(document1.id), str(document2.id), str(child3_with_access.id)}
# Documents that are soft deleted and children of a soft deleted document should not be listed
soft_deleted_document = factories.DocumentFactory(users=[user])
child_of_soft_deleted_document = factories.DocumentFactory(
users=[user],
parent=soft_deleted_document,
)
factories.DocumentFactory(users=[user], parent=child_of_soft_deleted_document)
soft_deleted_document.soft_delete()
with django_assert_num_queries(7):
# Documents that are permanently deleted and children of a permanently deleted
# document should not be listed
permanently_deleted_document = factories.DocumentFactory(users=[user])
child_of_permanently_deleted_document = factories.DocumentFactory(
users=[user], parent=permanently_deleted_document
)
factories.DocumentFactory(
users=[user], parent=child_of_permanently_deleted_document
)
fourty_days_ago = timezone.now() - timedelta(days=40)
with mock.patch("django.utils.timezone.now", return_value=fourty_days_ago):
permanently_deleted_document.soft_delete()
expected_ids = {
str(document1.id),
str(document2.id),
str(child3_with_access.id),
str(child4_with_access.id),
}
with django_assert_num_queries(8):
response = client.get("/api/v1.0/documents/")
# nb_accesses should now be cached
with django_assert_num_queries(4):
response = client.get("/api/v1.0/documents/")
assert response.status_code == 200
results = response.json()["results"]
assert len(results) == 3
results_id = {result["id"] for result in results}
assert expected_ids == results_id
results_ids = {result["id"] for result in results}
assert expected_ids == results_ids
def test_api_documents_list_authenticated_via_team(
@@ -148,7 +185,11 @@ def test_api_documents_list_authenticated_via_team(
expected_ids = {str(document.id) for document in documents_team1 + documents_team2}
with django_assert_num_queries(8):
with django_assert_num_queries(9):
response = client.get("/api/v1.0/documents/")
# nb_accesses should now be cached
with django_assert_num_queries(4):
response = client.get("/api/v1.0/documents/")
assert response.status_code == 200
@@ -177,10 +218,12 @@ def test_api_documents_list_authenticated_link_reach_restricted(
other_document = factories.DocumentFactory(link_reach="public")
models.LinkTrace.objects.create(document=other_document, user=user)
with django_assert_num_queries(5):
response = client.get("/api/v1.0/documents/")
# nb_accesses should now be cached
with django_assert_num_queries(4):
response = client.get(
"/api/v1.0/documents/",
)
response = client.get("/api/v1.0/documents/")
assert response.status_code == 200
results = response.json()["results"]
@@ -225,9 +268,11 @@ def test_api_documents_list_authenticated_link_reach_public_or_authenticated(
expected_ids = {str(document1.id), str(document2.id), str(visible_child.id)}
with django_assert_num_queries(7):
response = client.get(
"/api/v1.0/documents/",
)
response = client.get("/api/v1.0/documents/")
# nb_accesses should now be cached
with django_assert_num_queries(4):
response = client.get("/api/v1.0/documents/")
assert response.status_code == 200
results = response.json()["results"]
@@ -317,7 +362,11 @@ def test_api_documents_list_favorites_no_extra_queries(django_assert_num_queries
factories.DocumentFactory.create_batch(2, users=[user])
url = "/api/v1.0/documents/"
with django_assert_num_queries(8):
with django_assert_num_queries(9):
response = client.get(url)
# nb_accesses should now be cached
with django_assert_num_queries(4):
response = client.get(url)
assert response.status_code == 200
@@ -330,7 +379,7 @@ def test_api_documents_list_favorites_no_extra_queries(django_assert_num_queries
for document in special_documents:
models.DocumentFavorite.objects.create(document=document, user=user)
with django_assert_num_queries(8):
with django_assert_num_queries(4):
response = client.get(url)
assert response.status_code == 200

View File

@@ -86,8 +86,6 @@ def test_api_documents_list_filter_and_access_rights():
"-created_at",
"is_favorite",
"-is_favorite",
"nb_accesses",
"-nb_accesses",
"title",
"-title",
"updated_at",
@@ -143,8 +141,6 @@ def test_api_documents_list_ordering_by_fields():
"-created_at",
"is_favorite",
"-is_favorite",
"nb_accesses",
"-nb_accesses",
"title",
"-title",
"updated_at",
@@ -165,6 +161,31 @@ def test_api_documents_list_ordering_by_fields():
assert compare(results[i][field], results[i + 1][field])
# Filters: unknown field
def test_api_documents_list_filter_unknown_field():
"""
Trying to filter by an unknown field should raise a 400 error.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
factories.DocumentFactory()
expected_ids = {
str(document.id)
for document in factories.DocumentFactory.create_batch(2, users=[user])
}
response = client.get("/api/v1.0/documents/?unknown=true")
assert response.status_code == 200
results = response.json()["results"]
assert len(results) == 2
assert {result["id"] for result in results} == expected_ids
# Filters: is_creator_me
@@ -291,46 +312,6 @@ def test_api_documents_list_filter_is_favorite_invalid():
assert len(results) == 5
# Filters: link_reach
@pytest.mark.parametrize("reach", models.LinkReachChoices.values)
def test_api_documents_list_filter_link_reach(reach):
"""Authenticated users should be able to filter documents by link reach."""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
factories.DocumentFactory.create_batch(5, users=[user])
response = client.get(f"/api/v1.0/documents/?link_reach={reach:s}")
assert response.status_code == 200
results = response.json()["results"]
# Ensure all results have the chosen link reach
for result in results:
assert result["link_reach"] == reach
def test_api_documents_list_filter_link_reach_invalid():
"""Filtering with an invalid `link_reach` value should raise an error."""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
factories.DocumentFactory.create_batch(3, users=[user])
response = client.get("/api/v1.0/documents/?link_reach=invalid")
assert response.status_code == 400
assert response.json() == {
"link_reach": [
"Select a valid choice. invalid is not one of the available choices."
]
}
# Filters: title
@@ -360,7 +341,8 @@ def test_api_documents_list_filter_title(query, nb_results):
"Annual Review 2024",
]
for title in titles:
factories.DocumentFactory(title=title, users=[user])
parent = factories.DocumentFactory() if random.choice([True, False]) else None
factories.DocumentFactory(title=title, users=[user], parent=parent)
# Perform the search query
response = client.get(f"/api/v1.0/documents/?title={query:s}")

View File

@@ -3,6 +3,10 @@ Tests for Documents API endpoint in impress's core app: retrieve
"""
import random
from datetime import timedelta
from unittest import mock
from django.utils import timezone
import pytest
from rest_framework.test import APIClient
@@ -735,7 +739,7 @@ def test_api_documents_retrieve_user_roles(django_assert_num_queries):
)
expected_roles = {access.role for access in accesses}
with django_assert_num_queries(8):
with django_assert_num_queries(10):
response = client.get(f"/api/v1.0/documents/{document.id!s}/")
assert response.status_code == 200
@@ -752,9 +756,175 @@ def test_api_documents_retrieve_numqueries_with_link_trace(django_assert_num_que
document = factories.DocumentFactory(users=[user], link_traces=[user])
with django_assert_num_queries(2):
with django_assert_num_queries(4):
response = client.get(f"/api/v1.0/documents/{document.id!s}/")
with django_assert_num_queries(3):
response = client.get(f"/api/v1.0/documents/{document.id!s}/")
assert response.status_code == 200
assert response.json()["id"] == str(document.id)
# Soft/permanent delete
@pytest.mark.parametrize("depth", [1, 2, 3])
@pytest.mark.parametrize("reach", models.LinkReachChoices.values)
def test_api_documents_retrieve_soft_deleted_anonymous(reach, depth):
"""
A soft/permanently deleted public document should not be accessible via its
detail endpoint for anonymous users, and should return a 404.
"""
documents = []
for i in range(depth):
documents.append(
factories.DocumentFactory(link_reach=reach)
if i == 0
else factories.DocumentFactory(parent=documents[-1])
)
assert models.Document.objects.count() == depth
response = APIClient().get(f"/api/v1.0/documents/{documents[-1].id!s}/")
assert response.status_code == 200 if reach == "public" else 401
# Delete any one of the documents...
deleted_document = random.choice(documents)
deleted_document.soft_delete()
response = APIClient().get(f"/api/v1.0/documents/{documents[-1].id!s}/")
assert response.status_code == 404
assert response.json() == {"detail": "Not found."}
fourty_days_ago = timezone.now() - timedelta(days=40)
deleted_document.deleted_at = fourty_days_ago
deleted_document.ancestors_deleted_at = fourty_days_ago
deleted_document.save()
response = APIClient().get(f"/api/v1.0/documents/{documents[-1].id!s}/")
assert response.status_code == 404
assert response.json() == {"detail": "Not found."}
@pytest.mark.parametrize("depth", [1, 2, 3])
@pytest.mark.parametrize("reach", models.LinkReachChoices.values)
def test_api_documents_retrieve_soft_deleted_authenticated(reach, depth):
"""
A soft/permanently deleted document should not be accessible via its detail endpoint for
authenticated users not related to the document.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
documents = []
for i in range(depth):
documents.append(
factories.DocumentFactory(link_reach=reach)
if i == 0
else factories.DocumentFactory(parent=documents[-1])
)
assert models.Document.objects.count() == depth
response = client.get(f"/api/v1.0/documents/{documents[-1].id!s}/")
assert response.status_code == 200 if reach in ["public", "authenticated"] else 403
# Delete any one of the documents...
deleted_document = random.choice(documents)
deleted_document.soft_delete()
response = client.get(f"/api/v1.0/documents/{documents[-1].id!s}/")
assert response.status_code == 404
assert response.json() == {"detail": "Not found."}
fourty_days_ago = timezone.now() - timedelta(days=40)
deleted_document.deleted_at = fourty_days_ago
deleted_document.ancestors_deleted_at = fourty_days_ago
deleted_document.save()
response = client.get(f"/api/v1.0/documents/{documents[-1].id!s}/")
assert response.status_code == 404
assert response.json() == {"detail": "Not found."}
@pytest.mark.parametrize("depth", [1, 2, 3])
@pytest.mark.parametrize("role", models.RoleChoices.values)
def test_api_documents_retrieve_soft_deleted_related(role, depth):
"""
A soft deleted document should only be accessible via its detail endpoint by
users with specific "owner" access rights.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
documents = []
for i in range(depth):
documents.append(
factories.UserDocumentAccessFactory(role=role, user=user).document
if i == 0
else factories.DocumentFactory(parent=documents[-1])
)
assert models.Document.objects.count() == depth
document = documents[-1]
response = client.get(f"/api/v1.0/documents/{document.id!s}/")
assert response.status_code == 200
# Delete any one of the documents
deleted_document = random.choice(documents)
deleted_document.soft_delete()
response = client.get(f"/api/v1.0/documents/{document.id!s}/")
if role == "owner":
assert response.status_code == 200
assert response.json()["id"] == str(document.id)
else:
assert response.status_code == 404
assert response.json() == {"detail": "Not found."}
@pytest.mark.parametrize("depth", [1, 2, 3])
@pytest.mark.parametrize("role", models.RoleChoices.values)
def test_api_documents_retrieve_permanently_deleted_related(role, depth):
"""
A permanently deleted document should not be accessible via its detail endpoint for
authenticated users with specific access rights whatever their role.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
documents = []
for i in range(depth):
documents.append(
factories.UserDocumentAccessFactory(role=role, user=user).document
if i == 0
else factories.DocumentFactory(parent=documents[-1])
)
assert models.Document.objects.count() == depth
document = documents[-1]
response = client.get(f"/api/v1.0/documents/{document.id!s}/")
assert response.status_code == 200
# Delete any one of the documents
deleted_document = random.choice(documents)
fourty_days_ago = timezone.now() - timedelta(days=40)
with mock.patch("django.utils.timezone.now", return_value=fourty_days_ago):
deleted_document.soft_delete()
response = client.get(f"/api/v1.0/documents/{document.id!s}/")
assert response.status_code == 404
assert response.json() == {"detail": "Not found."}

View File

@@ -0,0 +1,275 @@
"""
Tests for Documents API endpoint in impress's core app: list
"""
from datetime import timedelta
from unittest import mock
from django.utils import timezone
import pytest
from faker import Faker
from rest_framework.pagination import PageNumberPagination
from rest_framework.test import APIClient
from core import factories, models
fake = Faker()
pytestmark = pytest.mark.django_db
@pytest.mark.parametrize("role", models.LinkRoleChoices.values)
@pytest.mark.parametrize("reach", models.LinkReachChoices.values)
def test_api_documents_trashbin_anonymous(reach, role):
"""
Anonymous users should not be allowed to list documents from the trashbin
whatever the link reach and link role
"""
factories.DocumentFactory(
link_reach=reach, link_role=role, deleted_at=timezone.now()
)
response = APIClient().get("/api/v1.0/documents/trashbin/")
assert response.status_code == 200
assert response.json() == {
"count": 0,
"next": None,
"previous": None,
"results": [],
}
def test_api_documents_trashbin_format():
"""Validate the format of documents as returned by the trashbin view."""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
other_users = factories.UserFactory.create_batch(3)
document = factories.DocumentFactory(
deleted_at=timezone.now(),
users=factories.UserFactory.create_batch(2),
favorited_by=[user, *other_users],
link_traces=other_users,
)
factories.UserDocumentAccessFactory(document=document, user=user, role="owner")
response = client.get("/api/v1.0/documents/trashbin/")
assert response.status_code == 200
content = response.json()
results = content.pop("results")
assert content == {
"count": 1,
"next": None,
"previous": None,
}
assert len(results) == 1
assert results[0] == {
"id": str(document.id),
"abilities": {
"accesses_manage": True,
"accesses_view": True,
"ai_transform": True,
"ai_translate": True,
"attachment_upload": True,
"children_create": True,
"children_list": True,
"collaboration_auth": True,
"destroy": True,
"favorite": True,
"invite_owner": True,
"link_configuration": True,
"media_auth": True,
"move": False, # Can't move a deleted document
"partial_update": True,
"retrieve": True,
"update": True,
"versions_destroy": True,
"versions_list": True,
"versions_retrieve": True,
},
"created_at": document.created_at.isoformat().replace("+00:00", "Z"),
"creator": str(document.creator.id),
"depth": 1,
"excerpt": document.excerpt,
"link_reach": document.link_reach,
"link_role": document.link_role,
"nb_accesses": 3,
"numchild": 0,
"path": document.path,
"title": document.title,
"updated_at": document.updated_at.isoformat().replace("+00:00", "Z"),
"user_roles": ["owner"],
}
def test_api_documents_trashbin_authenticated_direct(django_assert_num_queries):
"""
The trashbin should only list deleted documents for which the current user is owner.
"""
now = timezone.now()
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
document1, document2 = factories.DocumentFactory.create_batch(2, deleted_at=now)
models.DocumentAccess.objects.create(document=document1, user=user, role="owner")
models.DocumentAccess.objects.create(document=document2, user=user, role="owner")
# Unrelated documents
for reach in models.LinkReachChoices:
for role in models.LinkRoleChoices:
factories.DocumentFactory(link_reach=reach, link_role=role, deleted_at=now)
# Role other than "owner"
for role in models.RoleChoices.values:
if role == "owner":
continue
document_not_owner = factories.DocumentFactory(deleted_at=now)
models.DocumentAccess.objects.create(
document=document_not_owner, user=user, role=role
)
# Nested documents should also get listed
parent = factories.DocumentFactory(parent=document1)
document3 = factories.DocumentFactory(parent=parent, deleted_at=now)
models.DocumentAccess.objects.create(document=parent, user=user, role="owner")
# Permanently deleted documents should not be listed
fourty_days_ago = timezone.now() - timedelta(days=40)
permanently_deleted_document = factories.DocumentFactory(users=[(user, "owner")])
with mock.patch("django.utils.timezone.now", return_value=fourty_days_ago):
permanently_deleted_document.soft_delete()
expected_ids = {str(document1.id), str(document2.id), str(document3.id)}
with django_assert_num_queries(7):
response = client.get("/api/v1.0/documents/trashbin/")
with django_assert_num_queries(4):
response = client.get("/api/v1.0/documents/trashbin/")
assert response.status_code == 200
results = response.json()["results"]
results_ids = {result["id"] for result in results}
assert len(results) == 3
assert expected_ids == results_ids
def test_api_documents_trashbin_authenticated_via_team(
django_assert_num_queries, mock_user_teams
):
"""
Authenticated users should be able to list trashbin documents they own via a team.
"""
now = timezone.now()
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
mock_user_teams.return_value = ["team1", "team2", "unknown"]
deleted_document_team1 = factories.DocumentFactory(
teams=[("team1", "owner")], deleted_at=now
)
factories.DocumentFactory(teams=[("team1", "owner")])
factories.DocumentFactory(teams=[("team1", "administrator")], deleted_at=now)
factories.DocumentFactory(teams=[("team1", "administrator")])
deleted_document_team2 = factories.DocumentFactory(
teams=[("team2", "owner")], deleted_at=now
)
factories.DocumentFactory(teams=[("team2", "owner")])
factories.DocumentFactory(teams=[("team2", "administrator")], deleted_at=now)
factories.DocumentFactory(teams=[("team2", "administrator")])
expected_ids = {str(deleted_document_team1.id), str(deleted_document_team2.id)}
with django_assert_num_queries(5):
response = client.get("/api/v1.0/documents/trashbin/")
with django_assert_num_queries(3):
response = client.get("/api/v1.0/documents/trashbin/")
assert response.status_code == 200
results = response.json()["results"]
assert len(results) == 2
results_id = {result["id"] for result in results}
assert expected_ids == results_id
@mock.patch.object(PageNumberPagination, "get_page_size", return_value=2)
def test_api_documents_trashbin_pagination(
_mock_page_size,
):
"""Pagination should work as expected."""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
document_ids = [
str(document.id)
for document in factories.DocumentFactory.create_batch(
3, deleted_at=timezone.now()
)
]
for document_id in document_ids:
models.DocumentAccess.objects.create(
document_id=document_id, user=user, role="owner"
)
# Get page 1
response = client.get("/api/v1.0/documents/trashbin/")
assert response.status_code == 200
content = response.json()
assert content["count"] == 3
assert content["next"] == "http://testserver/api/v1.0/documents/trashbin/?page=2"
assert content["previous"] is None
assert len(content["results"]) == 2
for item in content["results"]:
document_ids.remove(item["id"])
# Get page 2
response = client.get(
"/api/v1.0/documents/trashbin/?page=2",
)
assert response.status_code == 200
content = response.json()
assert content["count"] == 3
assert content["next"] is None
assert content["previous"] == "http://testserver/api/v1.0/documents/trashbin/"
assert len(content["results"]) == 1
document_ids.remove(content["results"][0]["id"])
assert document_ids == []
def test_api_documents_trashbin_distinct():
"""A document with several related users should only be listed once."""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
other_user = factories.UserFactory()
document = factories.DocumentFactory(
users=[(user, "owner"), other_user], deleted_at=timezone.now()
)
response = client.get(
"/api/v1.0/documents/trashbin/",
)
assert response.status_code == 200
content = response.json()
assert len(content["results"]) == 1
assert content["results"][0]["id"] == str(document.id)

View File

@@ -0,0 +1,94 @@
"""
Unit tests for the filter_root_paths utility function.
"""
from core.api.utils import filter_root_paths
def test_api_utils_filter_root_paths_success():
"""
The `filter_root_paths` function should correctly identify root paths
from a given list of paths.
This test uses a list of paths with missing intermediate paths to ensure that
only the minimal set of root paths is returned.
"""
paths = [
"0001",
"00010001",
"000100010001",
"000100010002",
# missing 00010002
"000100020001",
"000100020002",
"0002",
"00020001",
"00020002",
# missing 0003
"00030001",
"000300010001",
"00030002",
# missing 0004
# missing 00040001
# missing 000400010001
# missing 000400010002
"000400010003",
"0004000100030001",
"000400010004",
]
filtered_paths = filter_root_paths(paths, skip_sorting=True)
assert filtered_paths == [
"0001",
"0002",
"00030001",
"00030002",
"000400010003",
"000400010004",
]
def test_api_utils_filter_root_paths_sorting():
"""
The `filter_root_paths` function should fail is sorting is skipped and paths are not sorted.
This test verifies that when sorting is skipped, the function respects the input order, and
when sorting is enabled, the result is correctly ordered and minimal.
"""
paths = [
"0001",
"00010001",
"000100010001",
"000100020002",
"000100010002",
"000100020001",
"00020001",
"0002",
"00020002",
"000300010001",
"00030001",
"00030002",
"0004000100030001",
"000400010003",
"000400010004",
]
filtered_paths = filter_root_paths(paths, skip_sorting=True)
assert filtered_paths == [
"0001",
"00020001",
"0002",
"000300010001",
"00030001",
"00030002",
"0004000100030001",
"000400010003",
"000400010004",
]
filtered_paths = filter_root_paths(paths)
assert filtered_paths == [
"0001",
"0002",
"00030001",
"00030002",
"000400010003",
"000400010004",
]

View File

@@ -2,12 +2,14 @@
Unit tests for the Document model
"""
import random
import smtplib
from logging import Logger
from unittest import mock
from django.contrib.auth.models import AnonymousUser
from django.core import mail
from django.core.cache import cache
from django.core.exceptions import ValidationError
from django.core.files.storage import default_storage
from django.utils import timezone
@@ -81,6 +83,44 @@ def test_models_documents_tree_alphabet():
assert models.Document.objects.count() == 124
@pytest.mark.parametrize("depth", range(5))
def test_models_documents_soft_delete(depth):
"""Trying to delete a document that is already deleted or is a descendant of
a deleted document should raise an error.
"""
documents = []
for i in range(depth + 1):
documents.append(
factories.DocumentFactory()
if i == 0
else factories.DocumentFactory(parent=documents[-1])
)
assert models.Document.objects.count() == depth + 1
# Delete any one of the documents...
deleted_document = random.choice(documents)
deleted_document.soft_delete()
with pytest.raises(RuntimeError):
documents[-1].soft_delete()
assert deleted_document.deleted_at is not None
assert deleted_document.ancestors_deleted_at == deleted_document.deleted_at
descendants = deleted_document.get_descendants()
for child in descendants:
assert child.deleted_at is None
assert child.ancestors_deleted_at is not None
assert child.ancestors_deleted_at == deleted_document.deleted_at
ancestors = deleted_document.get_ancestors()
for parent in ancestors:
assert parent.deleted_at is None
assert parent.ancestors_deleted_at is None
assert len(ancestors) + len(descendants) == depth
# get_abilities
@@ -95,15 +135,16 @@ def test_models_documents_tree_alphabet():
(False, "authenticated", "editor"),
],
)
def test_models_documents_get_abilities_forbidden(is_authenticated, reach, role):
def test_models_documents_get_abilities_forbidden(
is_authenticated, reach, role, django_assert_num_queries
):
"""
Check abilities returned for a document giving insufficient roles to link holders
i.e anonymous users or authenticated users who have no specific role on the document.
"""
document = factories.DocumentFactory(link_reach=reach, link_role=role)
user = factories.UserFactory() if is_authenticated else AnonymousUser()
abilities = document.get_abilities(user)
assert abilities == {
expected_abilities = {
"accesses_manage": False,
"accesses_view": False,
"ai_transform": False,
@@ -125,6 +166,12 @@ def test_models_documents_get_abilities_forbidden(is_authenticated, reach, role)
"versions_list": False,
"versions_retrieve": False,
}
nb_queries = 1 if is_authenticated else 0
with django_assert_num_queries(nb_queries):
assert document.get_abilities(user) == expected_abilities
document.soft_delete()
document.refresh_from_db()
assert document.get_abilities(user) == expected_abilities
@pytest.mark.parametrize(
@@ -135,15 +182,16 @@ def test_models_documents_get_abilities_forbidden(is_authenticated, reach, role)
(True, "authenticated"),
],
)
def test_models_documents_get_abilities_reader(is_authenticated, reach):
def test_models_documents_get_abilities_reader(
is_authenticated, reach, django_assert_num_queries
):
"""
Check abilities returned for a document giving reader role to link holders
i.e anonymous users or authenticated users who have no specific role on the document.
"""
document = factories.DocumentFactory(link_reach=reach, link_role="reader")
user = factories.UserFactory() if is_authenticated else AnonymousUser()
abilities = document.get_abilities(user)
assert abilities == {
expected_abilities = {
"accesses_manage": False,
"accesses_view": False,
"ai_transform": False,
@@ -165,6 +213,12 @@ def test_models_documents_get_abilities_reader(is_authenticated, reach):
"versions_list": False,
"versions_retrieve": False,
}
nb_queries = 1 if is_authenticated else 0
with django_assert_num_queries(nb_queries):
assert document.get_abilities(user) == expected_abilities
document.soft_delete()
document.refresh_from_db()
assert all(value is False for value in document.get_abilities(user).values())
@pytest.mark.parametrize(
@@ -175,15 +229,16 @@ def test_models_documents_get_abilities_reader(is_authenticated, reach):
(True, "authenticated"),
],
)
def test_models_documents_get_abilities_editor(is_authenticated, reach):
def test_models_documents_get_abilities_editor(
is_authenticated, reach, django_assert_num_queries
):
"""
Check abilities returned for a document giving editor role to link holders
i.e anonymous users or authenticated users who have no specific role on the document.
"""
document = factories.DocumentFactory(link_reach=reach, link_role="editor")
user = factories.UserFactory() if is_authenticated else AnonymousUser()
abilities = document.get_abilities(user)
assert abilities == {
expected_abilities = {
"accesses_manage": False,
"accesses_view": False,
"ai_transform": True,
@@ -205,14 +260,19 @@ def test_models_documents_get_abilities_editor(is_authenticated, reach):
"versions_list": False,
"versions_retrieve": False,
}
nb_queries = 1 if is_authenticated else 0
with django_assert_num_queries(nb_queries):
assert document.get_abilities(user) == expected_abilities
document.soft_delete()
document.refresh_from_db()
assert all(value is False for value in document.get_abilities(user).values())
def test_models_documents_get_abilities_owner():
def test_models_documents_get_abilities_owner(django_assert_num_queries):
"""Check abilities returned for the owner of a document."""
user = factories.UserFactory()
access = factories.UserDocumentAccessFactory(role="owner", user=user)
abilities = access.document.get_abilities(access.user)
assert abilities == {
document = factories.DocumentFactory(users=[(user, "owner")])
expected_abilities = {
"accesses_manage": True,
"accesses_view": True,
"ai_transform": True,
@@ -234,13 +294,19 @@ def test_models_documents_get_abilities_owner():
"versions_list": True,
"versions_retrieve": True,
}
with django_assert_num_queries(1):
assert document.get_abilities(user) == expected_abilities
document.soft_delete()
document.refresh_from_db()
expected_abilities["move"] = False
assert document.get_abilities(user) == expected_abilities
def test_models_documents_get_abilities_administrator():
def test_models_documents_get_abilities_administrator(django_assert_num_queries):
"""Check abilities returned for the administrator of a document."""
access = factories.UserDocumentAccessFactory(role="administrator")
abilities = access.document.get_abilities(access.user)
assert abilities == {
user = factories.UserFactory()
document = factories.DocumentFactory(users=[(user, "administrator")])
expected_abilities = {
"accesses_manage": True,
"accesses_view": True,
"ai_transform": True,
@@ -262,16 +328,18 @@ def test_models_documents_get_abilities_administrator():
"versions_list": True,
"versions_retrieve": True,
}
with django_assert_num_queries(1):
assert document.get_abilities(user) == expected_abilities
document.soft_delete()
document.refresh_from_db()
assert all(value is False for value in document.get_abilities(user).values())
def test_models_documents_get_abilities_editor_user(django_assert_num_queries):
"""Check abilities returned for the editor of a document."""
access = factories.UserDocumentAccessFactory(role="editor")
with django_assert_num_queries(1):
abilities = access.document.get_abilities(access.user)
assert abilities == {
user = factories.UserFactory()
document = factories.DocumentFactory(users=[(user, "editor")])
expected_abilities = {
"accesses_manage": False,
"accesses_view": True,
"ai_transform": True,
@@ -293,24 +361,27 @@ def test_models_documents_get_abilities_editor_user(django_assert_num_queries):
"versions_list": True,
"versions_retrieve": True,
}
with django_assert_num_queries(1):
assert document.get_abilities(user) == expected_abilities
document.soft_delete()
document.refresh_from_db()
assert all(value is False for value in document.get_abilities(user).values())
def test_models_documents_get_abilities_reader_user(django_assert_num_queries):
"""Check abilities returned for the reader of a document."""
access = factories.UserDocumentAccessFactory(
role="reader", document__link_role="reader"
user = factories.UserFactory()
document = factories.DocumentFactory(users=[(user, "reader")])
access_from_link = (
document.link_reach != "restricted" and document.link_role == "editor"
)
with django_assert_num_queries(1):
abilities = access.document.get_abilities(access.user)
assert abilities == {
expected_abilities = {
"accesses_manage": False,
"accesses_view": True,
"ai_transform": False,
"ai_translate": False,
"attachment_upload": False,
"children_create": False,
"ai_transform": access_from_link,
"ai_translate": access_from_link,
"attachment_upload": access_from_link,
"children_create": access_from_link,
"children_list": True,
"collaboration_auth": True,
"destroy": False,
@@ -319,13 +390,18 @@ def test_models_documents_get_abilities_reader_user(django_assert_num_queries):
"link_configuration": False,
"media_auth": True,
"move": False,
"partial_update": False,
"partial_update": access_from_link,
"retrieve": True,
"update": False,
"update": access_from_link,
"versions_destroy": False,
"versions_list": True,
"versions_retrieve": True,
}
with django_assert_num_queries(1):
assert document.get_abilities(user) == expected_abilities
document.soft_delete()
document.refresh_from_db()
assert all(value is False for value in document.get_abilities(user).values())
def test_models_documents_get_abilities_preset_role(django_assert_num_queries):
@@ -555,3 +631,62 @@ def test_models_documents__email_invitation__failed(mock_logger, _mock_send_mail
assert emails == ["guest3@example.com"]
assert isinstance(exception, smtplib.SMTPException)
# Document number of accesses
def test_models_documents_nb_accesses_cache_is_set_and_retrieved(
django_assert_num_queries,
):
"""Test that nb_accesses is cached after the first computation."""
document = factories.DocumentFactory()
key = f"document_{document.id!s}_nb_accesses"
nb_accesses = random.randint(1, 4)
factories.UserDocumentAccessFactory.create_batch(nb_accesses, document=document)
factories.UserDocumentAccessFactory() # An unrelated access should not be counted
# Initially, the nb_accesses should not be cached
assert cache.get(key) is None
# Compute the nb_accesses for the first time (this should set the cache)
with django_assert_num_queries(1):
assert document.nb_accesses == nb_accesses
# Ensure that the nb_accesses is now cached
with django_assert_num_queries(0):
assert document.nb_accesses == nb_accesses
assert cache.get(key) == nb_accesses
# The cache value should be invalidated when a document access is created
models.DocumentAccess.objects.create(
document=document, user=factories.UserFactory(), role="reader"
)
assert cache.get(key) is None # Cache should be invalidated
with django_assert_num_queries(1):
new_nb_accesses = document.nb_accesses
assert new_nb_accesses == nb_accesses + 1
assert cache.get(key) == new_nb_accesses # Cache should now contain the new value
def test_models_documents_nb_accesses_cache_is_invalidated_on_access_removal(
django_assert_num_queries,
):
"""Test that the cache is invalidated when a document access is deleted."""
document = factories.DocumentFactory()
key = f"document_{document.id!s}_nb_accesses"
access = factories.UserDocumentAccessFactory(document=document)
# Initially, the nb_accesses should be cached
assert document.nb_accesses == 1
assert cache.get(key) == 1
# Remove the access and check if cache is invalidated
access.delete()
assert cache.get(key) is None # Cache should be invalidated
# Recompute the nb_accesses (this should trigger a cache set)
with django_assert_num_queries(1):
new_nb_accesses = document.nb_accesses
assert new_nb_accesses == 0
assert cache.get(key) == 0 # Cache should now contain the new value

View File

@@ -296,6 +296,7 @@ class Base(Configuration):
"drf_spectacular",
# Third party apps
"corsheaders",
"django_filters",
"dockerflow.django",
"rest_framework",
"parler",
@@ -351,6 +352,10 @@ class Base(Configuration):
"REDOC_DIST": "SIDECAR",
}
TRASHBIN_CUTOFF_DAYS = values.Value(
30, environ_name="TRASHBIN_CUTOFF_DAYS", environ_prefix=None
)
# Mail
EMAIL_BACKEND = values.Value("django.core.mail.backends.smtp.EmailBackend")
EMAIL_BRAND_NAME = values.Value(None)
@@ -771,6 +776,11 @@ class Production(Base):
environ_name="REDIS_URL",
environ_prefix=None,
),
"TIMEOUT": values.IntegerValue(
30, # timeout in seconds
environ_name="CACHES_DEFAULT_TIMEOUT",
environ_prefix=None,
),
"OPTIONS": {
"CLIENT_CLASS": "django_redis.client.DefaultClient",
},