(backend) add duplicate action to the document API endpoint

We took this opportunity to refactor the way access is controlled on
media attachments. We now add the media key to a list on the document
instance each time a media is uploaded to a document. This list is
passed along when a document is duplicated, allowing us to grant
access to readers on the new document, even if they don't have or
lost access to the original document.

We also propose an option to reproduce the same access rights on the
duplicate document as what was in place on the original document.
This can be requested by passing the "with_accesses=true" option in
the query string.

The tricky point is that we need to extract attachment keys from the
existing documents and set them on the new "attachments" field that is
now used to track access rights on media files.
This commit is contained in:
Samuel Paccoud - DINUM
2025-01-20 10:23:18 +01:00
committed by Manuel Raynaud
parent 6976bb7c78
commit 34a208a80d
19 changed files with 1066 additions and 79 deletions

View File

@@ -132,6 +132,7 @@ and this project adheres to
## Added
- ✨(backend) add duplicate action to the document API endpoint
- ⚗️(backend) add util to extract text from base64 yjs document
- ✨(backend) add soft delete and restore API endpoints to documents #516
- ✨(backend) allow organizing documents in a tree structure #516

View File

@@ -151,6 +151,8 @@ class DocumentAdmin(TreeAdmin):
"path",
"depth",
"numchild",
"duplicated_from",
"attachments",
)
},
),
@@ -166,8 +168,10 @@ class DocumentAdmin(TreeAdmin):
"updated_at",
)
readonly_fields = (
"attachments",
"creator",
"depth",
"duplicated_from",
"id",
"numchild",
"path",

View File

@@ -381,6 +381,27 @@ class LinkDocumentSerializer(serializers.ModelSerializer):
]
class DocumentDuplicationSerializer(serializers.Serializer):
"""
Serializer for duplicating a document.
Allows specifying whether to keep access permissions.
"""
with_accesses = serializers.BooleanField(default=False)
def create(self, validated_data):
"""
This serializer is not intended to create objects.
"""
raise NotImplementedError("This serializer does not support creation.")
def update(self, instance, validated_data):
"""
This serializer is not intended to update objects.
"""
raise NotImplementedError("This serializer does not support updating.")
# Suppress the warning about not implementing `create` and `update` methods
# since we don't use a model and only rely on the serializer for validation
# pylint: disable=abstract-method

View File

@@ -16,6 +16,8 @@ from django.db import transaction
from django.db.models.expressions import RawSQL
from django.db.models.functions import Left, Length
from django.http import Http404, StreamingHttpResponse
from django.utils.text import capfirst
from django.utils.translation import gettext_lazy as _
import requests
import rest_framework as drf
@@ -28,26 +30,13 @@ from rest_framework.throttling import UserRateThrottle
from core import authentication, enums, models
from core.services.ai_services import AIService
from core.services.collaboration_services import CollaborationService
from core.utils import extract_attachments, filter_descendants
from . import permissions, serializers, utils
from .filters import DocumentFilter, ListDocumentFilter
logger = logging.getLogger(__name__)
<<<<<<< HEAD
ATTACHMENTS_FOLDER = "attachments"
UUID_REGEX = (
r"[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}"
)
FILE_EXT_REGEX = r"\.[a-zA-Z0-9]{1,10}"
MEDIA_STORAGE_URL_PATTERN = re.compile(
f"{settings.MEDIA_URL:s}(?P<pk>{UUID_REGEX:s})/"
f"(?P<key>{ATTACHMENTS_FOLDER:s}/{UUID_REGEX:s}(?:-unsafe)?{FILE_EXT_REGEX:s})$"
)
COLLABORATION_WS_URL_PATTERN = re.compile(rf"(?:^|&)room=(?P<pk>{UUID_REGEX})(?:&|$)")
=======
>>>>>>> 8076486a ((backend) add missing test on media-auth and collaboration-auth)
# pylint: disable=too-many-ancestors
@@ -904,6 +893,82 @@ class DocumentViewSet(
utils.nest_tree(serializer.data, self.queryset.model.steplen)
)
@drf.decorators.action(
detail=True,
methods=["post"],
permission_classes=[permissions.IsAuthenticated, permissions.AccessPermission],
url_path="duplicate",
)
@transaction.atomic
def duplicate(self, request, *args, **kwargs):
"""
Duplicate a document and store the links to attached files in the duplicated
document to allow cross-access.
Optionally duplicates accesses if `with_accesses` is set to true
in the payload.
"""
# Get document while checking permissions
document = self.get_object()
serializer = serializers.DocumentDuplicationSerializer(
data=request.data, partial=True
)
serializer.is_valid(raise_exception=True)
with_accesses = serializer.validated_data.get("with_accesses", False)
base64_yjs_content = document.content
# Duplicate the document instance
link_kwargs = (
{"link_reach": document.link_reach, "link_role": document.link_role}
if with_accesses
else {}
)
extracted_attachments = set(extract_attachments(document.content))
attachments = list(extracted_attachments & set(document.attachments))
duplicated_document = document.add_sibling(
"right",
title=capfirst(_("copy of {title}").format(title=document.title)),
content=base64_yjs_content,
attachments=attachments,
duplicated_from=document,
creator=request.user,
**link_kwargs,
)
# Always add the logged-in user as OWNER
accesses_to_create = [
models.DocumentAccess(
document=duplicated_document,
user=request.user,
role=models.RoleChoices.OWNER,
)
]
# If accesses should be duplicated, add other users' accesses as per original document
if with_accesses:
original_accesses = models.DocumentAccess.objects.filter(
document=document
).exclude(user=request.user)
accesses_to_create.extend(
models.DocumentAccess(
document=duplicated_document,
user_id=access.user_id,
team=access.team,
role=access.role,
)
for access in original_accesses
)
# Bulk create all the duplicated accesses
models.DocumentAccess.objects.bulk_create(accesses_to_create)
return drf_response.Response(
{"id": str(duplicated_document.id)}, status=status.HTTP_201_CREATED
)
@drf.decorators.action(detail=True, methods=["get"], url_path="versions")
def versions_list(self, request, *args, **kwargs):
"""
@@ -1053,7 +1118,7 @@ class DocumentViewSet(
# Generate a generic yet unique filename to store the image in object storage
file_id = uuid.uuid4()
extension = serializer.validated_data["expected_extension"]
ext = serializer.validated_data["expected_extension"]
# Prepare metadata for storage
extra_args = {
@@ -1065,7 +1130,7 @@ class DocumentViewSet(
extra_args["Metadata"]["is_unsafe"] = "true"
file_unsafe = "-unsafe"
key = f"{document.key_base}/{ATTACHMENTS_FOLDER:s}/{file_id!s}{file_unsafe}.{extension:s}"
key = f"{document.key_base}/{enums.ATTACHMENTS_FOLDER:s}/{file_id!s}{file_unsafe}.{ext:s}"
file_name = serializer.validated_data["file_name"]
if (
@@ -1085,6 +1150,10 @@ class DocumentViewSet(
file, default_storage.bucket_name, key, ExtraArgs=extra_args
)
# Make the attachment readable by document readers
document.attachments.append(key)
document.save()
return drf.response.Response(
{"file": f"{settings.MEDIA_URL:s}{key:s}"},
status=drf.status.HTTP_201_CREATED,
@@ -1152,20 +1221,35 @@ class DocumentViewSet(
url_params = self._auth_get_url_params(
enums.MEDIA_STORAGE_URL_PATTERN, parsed_url.path
)
document = self._auth_get_document(url_params["pk"])
if not document.get_abilities(request.user).get(self.action, False):
logger.debug(
"User '%s' lacks permission for document '%s'",
request.user,
document.pk,
)
user = request.user
key = f"{url_params['pk']:s}/{url_params['attachment']:s}"
# Look for a document to which the user has access and that includes this attachment
# We must look into all descendants of any document to which the user has access per se
readable_per_se_paths = (
self.queryset.readable_per_se(user)
.order_by("path")
.values_list("path", flat=True)
)
attachments_documents = (
self.queryset.filter(attachments__contains=[key])
.only("path")
.order_by("path")
)
readable_attachments_paths = filter_descendants(
[doc.path for doc in attachments_documents],
readable_per_se_paths,
skip_sorting=True,
)
if not readable_attachments_paths:
logger.debug("User '%s' lacks permission for attachment", user)
raise drf.exceptions.PermissionDenied()
# Generate S3 authorization headers using the extracted URL parameters
request = utils.generate_s3_authorization_headers(
f"{url_params['pk']:s}/{url_params['key']:s}"
)
request = utils.generate_s3_authorization_headers(key)
return drf.response.Response("authorized", headers=request.headers, status=200)

View File

@@ -12,10 +12,13 @@ ATTACHMENTS_FOLDER = "attachments"
UUID_REGEX = (
r"[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}"
)
FILE_EXT_REGEX = r"\.[a-zA-Z]{3,4}"
FILE_EXT_REGEX = r"\.[a-zA-Z0-9]{1,10}"
MEDIA_STORAGE_URL_PATTERN = re.compile(
f"{settings.MEDIA_URL:s}(?P<pk>{UUID_REGEX:s})/"
f"(?P<key>{ATTACHMENTS_FOLDER:s}/{UUID_REGEX:s}{FILE_EXT_REGEX:s})$"
f"(?P<attachment>{ATTACHMENTS_FOLDER:s}/{UUID_REGEX:s}(?:-unsafe)?{FILE_EXT_REGEX:s})$"
)
MEDIA_STORAGE_URL_EXTRACT = re.compile(
f"{settings.MEDIA_URL:s}({UUID_REGEX}/{ATTACHMENTS_FOLDER}/{UUID_REGEX}{FILE_EXT_REGEX})"
)
COLLABORATION_WS_URL_PATTERN = re.compile(rf"(?:^|&)room=(?P<pk>{UUID_REGEX})(?:&|$)")

View File

@@ -0,0 +1,77 @@
# Generated by Django 5.1.4 on 2025-01-18 11:53
import re
import django.contrib.postgres.fields
import django.db.models.deletion
from django.core.files.storage import default_storage
from django.db import migrations, models
from botocore.exceptions import ClientError
import core.models
from core.utils import extract_attachments
def populate_attachments_on_all_documents(apps, schema_editor):
"""Populate "attachments" field on all existing documents in the database."""
Document = apps.get_model("core", "Document")
for document in Document.objects.all():
try:
response = default_storage.connection.meta.client.get_object(
Bucket=default_storage.bucket_name, Key=f"{document.pk!s}/file"
)
except (FileNotFoundError, ClientError):
pass
else:
content = response["Body"].read().decode("utf-8")
document.attachments = extract_attachments(content)
document.save(update_fields=["attachments"])
class Migration(migrations.Migration):
dependencies = [
("core", "0019_alter_user_language_default_to_null"),
]
operations = [
# v2.0.0 was released so we can now remove BC field "is_public"
migrations.RemoveField(
model_name="document",
name="is_public",
),
migrations.AlterModelManagers(
name="user",
managers=[
("objects", core.models.UserManager()),
],
),
migrations.AddField(
model_name="document",
name="attachments",
field=django.contrib.postgres.fields.ArrayField(
base_field=models.CharField(max_length=255),
blank=True,
default=list,
editable=False,
null=True,
size=None,
),
),
migrations.AddField(
model_name="document",
name="duplicated_from",
field=models.ForeignKey(
blank=True,
editable=False,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="duplicates",
to="core.document",
),
),
migrations.RunPython(
populate_attachments_on_all_documents,
reverse_code=migrations.RunPython.noop,
),
]

View File

@@ -13,6 +13,7 @@ from logging import getLogger
from django.conf import settings
from django.contrib.auth import models as auth_models
from django.contrib.auth.base_user import AbstractBaseUser
from django.contrib.postgres.fields import ArrayField
from django.contrib.sites.models import Site
from django.core import mail, validators
from django.core.cache import cache
@@ -427,10 +428,12 @@ class DocumentQuerySet(MP_NodeQuerySet):
def readable_per_se(self, user):
"""
Filters the queryset to return documents that the given user has
permission to read.
Filters the queryset to return documents on which the given user has
direct access, team access or link access. This will not return all the
documents that a user can read because it can be obtained via an ancestor.
:param user: The user for whom readable documents are to be fetched.
:return: A queryset of documents readable by the user.
:return: A queryset of documents for which the user has direct access,
team access or link access.
"""
if user.is_authenticated:
return self.filter(
@@ -459,7 +462,9 @@ class DocumentManager(MP_NodeManager):
"""
Filters documents based on user permissions using the custom queryset.
:param user: The user for whom readable documents are to be fetched.
:return: A queryset of documents readable by the user.
:return: A queryset of documents for which the user has direct access,
team access or link access. This will not return all the documents
that a user can read because it can be obtained via an ancestor.
"""
return self.get_queryset().readable_per_se(user)
@@ -486,6 +491,21 @@ class Document(MP_Node, BaseModel):
)
deleted_at = models.DateTimeField(null=True, blank=True)
ancestors_deleted_at = models.DateTimeField(null=True, blank=True)
duplicated_from = models.ForeignKey(
"self",
on_delete=models.SET_NULL,
related_name="duplicates",
editable=False,
blank=True,
null=True,
)
attachments = ArrayField(
models.CharField(max_length=255),
default=list,
editable=False,
blank=True,
null=True,
)
_content = None
@@ -800,6 +820,7 @@ class Document(MP_Node, BaseModel):
"cors_proxy": can_get,
"descendants": can_get,
"destroy": is_owner,
"duplicate": can_get,
"favorite": can_get and user.is_authenticated,
"link_configuration": is_owner_or_admin,
"invite_owner": is_owner,

View File

@@ -67,10 +67,12 @@ def test_api_documents_attachment_upload_anonymous_success():
file_path = response.json()["file"]
match = pattern.search(file_path)
file_id = match.group(1)
# Validate that file_id is a valid UUID
uuid.UUID(file_id)
document.refresh_from_db()
assert document.attachments == [f"{document.id!s}/attachments/{file_id!s}.png"]
# Now, check the metadata of the uploaded file
key = file_path.replace("/media", "")
file_head = default_storage.connection.meta.client.head_object(
@@ -112,6 +114,9 @@ def test_api_documents_attachment_upload_authenticated_forbidden(reach, role):
"detail": "You do not have permission to perform this action."
}
document.refresh_from_db()
assert document.attachments == []
@pytest.mark.parametrize(
"reach, role",
@@ -122,8 +127,8 @@ def test_api_documents_attachment_upload_authenticated_forbidden(reach, role):
)
def test_api_documents_attachment_upload_authenticated_success(reach, role):
"""
Autenticated who are not related to a document should be able to upload a file
if the link reach and role permit it.
Autenticated users who are not related to a document should be able to upload
a file when the link reach and role permit it.
"""
user = factories.UserFactory()
@@ -145,6 +150,9 @@ def test_api_documents_attachment_upload_authenticated_success(reach, role):
# Validate that file_id is a valid UUID
uuid.UUID(file_id)
document.refresh_from_db()
assert document.attachments == [f"{document.id!s}/attachments/{file_id!s}.png"]
@pytest.mark.parametrize("via", VIA)
def test_api_documents_attachment_upload_reader(via, mock_user_teams):
@@ -175,6 +183,9 @@ def test_api_documents_attachment_upload_reader(via, mock_user_teams):
"detail": "You do not have permission to perform this action."
}
document.refresh_from_db()
assert document.attachments == []
@pytest.mark.parametrize("role", ["editor", "administrator", "owner"])
@pytest.mark.parametrize("via", VIA)
@@ -211,6 +222,9 @@ def test_api_documents_attachment_upload_success(via, role, mock_user_teams):
# Validate that file_id is a valid UUID
uuid.UUID(file_id)
document.refresh_from_db()
assert document.attachments == [f"{document.id!s}/attachments/{file_id!s}.png"]
# Now, check the metadata of the uploaded file
key = file_path.replace("/media", "")
file_head = default_storage.connection.meta.client.head_object(
@@ -236,6 +250,9 @@ def test_api_documents_attachment_upload_invalid(client):
assert response.status_code == 400
assert response.json() == {"file": ["No file was submitted."]}
document.refresh_from_db()
assert document.attachments == []
def test_api_documents_attachment_upload_size_limit_exceeded(settings):
"""The uploaded file should not exceeed the maximum size in settings."""
@@ -258,6 +275,9 @@ def test_api_documents_attachment_upload_size_limit_exceeded(settings):
assert response.status_code == 400
assert response.json() == {"file": ["File size exceeds the maximum limit of 1 MB."]}
document.refresh_from_db()
assert document.attachments == []
@pytest.mark.parametrize(
"name,content,extension,content_type",
@@ -293,6 +313,11 @@ def test_api_documents_attachment_upload_fix_extension(
match = pattern.search(file_path)
file_id = match.group(1)
document.refresh_from_db()
assert document.attachments == [
f"{document.id!s}/attachments/{file_id!s}.{extension:s}"
]
assert "-unsafe" in file_id
# Validate that file_id is a valid UUID
file_id = file_id.replace("-unsafe", "")
@@ -323,6 +348,9 @@ def test_api_documents_attachment_upload_empty_file():
assert response.status_code == 400
assert response.json() == {"file": ["The submitted file is empty."]}
document.refresh_from_db()
assert document.attachments == []
def test_api_documents_attachment_upload_unsafe():
"""A file with an unsafe mime type should be tagged as such."""
@@ -345,6 +373,9 @@ def test_api_documents_attachment_upload_unsafe():
match = pattern.search(file_path)
file_id = match.group(1)
document.refresh_from_db()
assert document.attachments == [f"{document.id!s}/attachments/{file_id!s}.exe"]
assert "-unsafe" in file_id
# Validate that file_id is a valid UUID
file_id = file_id.replace("-unsafe", "")

View File

@@ -0,0 +1,206 @@
"""
Test file uploads API endpoint for users in impress's core app.
"""
import base64
import uuid
from io import BytesIO
from urllib.parse import urlparse
from django.conf import settings
from django.core.files.storage import default_storage
from django.utils import timezone
import pytest
import requests
import y_py
from rest_framework.test import APIClient
from core import factories, models
pytestmark = pytest.mark.django_db
PIXEL = (
b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x06\x00"
b"\x00\x00\x1f\x15\xc4\x89\x00\x00\x00\nIDATx\x9cc\xf8\xff\xff?\x00\x05\xfe\x02\xfe"
b"\xa7V\xbd\xfa\x00\x00\x00\x00IEND\xaeB`\x82"
)
def get_image_refs(document_id):
"""Generate an image key for testing."""
image_key = f"{document_id!s}/attachments/{uuid.uuid4()!s}.png"
default_storage.connection.meta.client.put_object(
Bucket=default_storage.bucket_name,
Key=image_key,
Body=BytesIO(PIXEL),
ContentType="image/png",
)
return image_key, f"http://localhost/media/{image_key:s}"
def test_api_documents_duplicate_forbidden():
"""A user who doesn't have read access to a document should not be allowed to duplicate it."""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
document = factories.DocumentFactory(
link_reach="restricted",
users=[factories.UserFactory()],
title="my document",
)
response = client.post(f"/api/v1.0/documents/{document.id!s}/duplicate/")
assert response.status_code == 403
assert models.Document.objects.count() == 1
def test_api_documents_duplicate_anonymous():
"""Anonymous users should not be able to duplicate documents even with read access."""
document = factories.DocumentFactory(link_reach="public")
response = APIClient().post(f"/api/v1.0/documents/{document.id!s}/duplicate/")
assert response.status_code == 401
assert models.Document.objects.count() == 1
@pytest.mark.parametrize("index", range(3))
def test_api_documents_duplicate_success(index):
"""
Anonymous users should be able to retrieve attachments linked to a public document.
Accesses should not be duplicated if the user does not request it specifically.
Attachments that are not in the content should not be passed for access in the
duplicated document's "attachments" list.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
document_ids = [uuid.uuid4() for _ in range(3)]
image_refs = [get_image_refs(doc_id) for doc_id in document_ids]
# Create document content with the first image only
ydoc = y_py.YDoc() # pylint: disable=no-member
with ydoc.begin_transaction() as txn:
xml_fragment = ydoc.get_xml_element("document-store")
xml_fragment.push_xml_element(txn, "image").set_attribute(
txn, "src", image_refs[0][1]
)
update = y_py.encode_state_as_update(ydoc) # pylint: disable=no-member
base64_content = base64.b64encode(update).decode("utf-8")
# Create documents
document = factories.DocumentFactory(
id=document_ids[index],
content=base64_content,
link_reach="restricted",
users=[user, factories.UserFactory()],
title="document with an image",
attachments=[key for key, _ in image_refs],
)
factories.DocumentFactory(id=document_ids[(index + 1) % 3])
# Don't create document for third ID to check that it doesn't impact access to attachments
# Duplicate the document via the API endpoint
response = client.post(f"/api/v1.0/documents/{document.id}/duplicate/")
assert response.status_code == 201
duplicated_document = models.Document.objects.get(id=response.json()["id"])
assert duplicated_document.title == "Copy of document with an image"
assert duplicated_document.content == document.content
assert duplicated_document.creator == user
assert duplicated_document.link_reach == "restricted"
assert duplicated_document.link_role == "reader"
assert duplicated_document.duplicated_from == document
assert duplicated_document.attachments == [
image_refs[0][0]
] # Only the first image key
assert duplicated_document.get_parent() == document.get_parent()
assert duplicated_document.path == document.get_next_sibling().path
# Check that accesses were not duplicated.
# The user who did the duplicate is forced as owner
assert duplicated_document.accesses.count() == 1
access = duplicated_document.accesses.first()
assert access.user == user
assert access.role == "owner"
# Ensure access persists after the owner loses access to the original document
models.DocumentAccess.objects.filter(document=document).delete()
response = client.get(
"/api/v1.0/documents/media-auth/", HTTP_X_ORIGINAL_URL=image_refs[0][1]
)
assert response.status_code == 200
authorization = response["Authorization"]
assert "AWS4-HMAC-SHA256 Credential=" in authorization
assert (
"SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature="
in authorization
)
assert response["X-Amz-Date"] == timezone.now().strftime("%Y%m%dT%H%M%SZ")
s3_url = urlparse(settings.AWS_S3_ENDPOINT_URL)
response = requests.get(
f"{settings.AWS_S3_ENDPOINT_URL:s}/impress-media-storage/{image_refs[0][0]:s}",
headers={
"authorization": authorization,
"x-amz-date": response["x-amz-date"],
"x-amz-content-sha256": response["x-amz-content-sha256"],
"Host": f"{s3_url.hostname:s}:{s3_url.port:d}",
},
timeout=1,
)
assert response.content == PIXEL
# Ensure the other images are not accessible
for _, url in image_refs[1:]:
response = client.get(
"/api/v1.0/documents/media-auth/", HTTP_X_ORIGINAL_URL=url
)
assert response.status_code == 403
def test_api_documents_duplicate_with_accesses():
"""Accesses should be duplicated if the user requests it specifically."""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
document = factories.DocumentFactory(
users=[user],
title="document with accesses",
)
user_access = factories.UserDocumentAccessFactory(document=document)
team_access = factories.TeamDocumentAccessFactory(document=document)
# Duplicate the document via the API endpoint requesting to duplicate accesses
response = client.post(
f"/api/v1.0/documents/{document.id!s}/duplicate/",
{"with_accesses": True},
format="json",
)
assert response.status_code == 201
duplicated_document = models.Document.objects.get(id=response.json()["id"])
assert duplicated_document.title == "Copy of document with accesses"
assert duplicated_document.content == document.content
assert duplicated_document.link_reach == document.link_reach
assert duplicated_document.link_role == document.link_role
assert duplicated_document.creator == user
assert duplicated_document.duplicated_from == document
assert duplicated_document.attachments == []
# Check that accesses were duplicated and the user who did the duplicate is forced as owner
duplicated_accesses = duplicated_document.accesses
assert duplicated_accesses.count() == 3
assert duplicated_accesses.get(user=user).role == "owner"
assert duplicated_accesses.get(user=user_access.user).role == user_access.role
assert duplicated_accesses.get(team=team_access.team).role == team_access.role

View File

@@ -37,11 +37,9 @@ def test_api_documents_media_auth_unkown_document():
def test_api_documents_media_auth_anonymous_public():
"""Anonymous users should be able to retrieve attachments linked to a public document"""
document = factories.DocumentFactory(link_reach="public")
document_id = uuid4()
filename = f"{uuid4()!s}.jpg"
key = f"{document.pk!s}/attachments/{filename:s}"
key = f"{document_id!s}/attachments/{filename:s}"
default_storage.connection.meta.client.put_object(
Bucket=default_storage.bucket_name,
Key=key,
@@ -49,6 +47,8 @@ def test_api_documents_media_auth_anonymous_public():
ContentType="text/plain",
)
factories.DocumentFactory(id=document_id, link_reach="public", attachments=[key])
original_url = f"http://localhost/media/{key:s}"
response = APIClient().get(
"/api/v1.0/documents/media-auth/", HTTP_X_ORIGINAL_URL=original_url
@@ -81,8 +81,6 @@ def test_api_documents_media_auth_anonymous_public():
def test_api_documents_media_auth_extensions():
"""Files with extensions of any format should work."""
document = factories.DocumentFactory(link_reach="public")
extensions = [
"c",
"go",
@@ -91,10 +89,15 @@ def test_api_documents_media_auth_extensions():
"woff2",
"appimage",
]
document_id = uuid4()
keys = []
for ext in extensions:
filename = f"{uuid.uuid4()!s}.{ext:s}"
key = f"{document.pk!s}/attachments/{filename:s}"
filename = f"{uuid4()!s}.{ext:s}"
keys.append(f"{document_id!s}/attachments/{filename:s}")
factories.DocumentFactory(link_reach="public", attachments=keys)
for key in keys:
original_url = f"http://localhost/media/{key:s}"
response = APIClient().get(
"/api/v1.0/documents/media-auth/", HTTP_X_ORIGINAL_URL=original_url
@@ -109,10 +112,11 @@ def test_api_documents_media_auth_anonymous_authenticated_or_restricted(reach):
Anonymous users should not be allowed to retrieve attachments linked to a document
with link reach set to authenticated or restricted.
"""
document = factories.DocumentFactory(link_reach=reach)
document_id = uuid4()
filename = f"{uuid4()!s}.jpg"
media_url = f"http://localhost/media/{document.pk!s}/attachments/{filename:s}"
media_url = f"http://localhost/media/{document_id!s}/attachments/{filename:s}"
factories.DocumentFactory(id=document_id, link_reach=reach)
response = APIClient().get(
"/api/v1.0/documents/media-auth/", HTTP_X_ORIGINAL_URL=media_url
@@ -122,20 +126,16 @@ def test_api_documents_media_auth_anonymous_authenticated_or_restricted(reach):
assert "Authorization" not in response
@pytest.mark.parametrize("reach", ["public", "authenticated"])
def test_api_documents_media_auth_authenticated_public_or_authenticated(reach):
def test_api_documents_media_auth_anonymous_attachments():
"""
Authenticated users who are not related to a document should be able to retrieve
attachments related to a document with public or authenticated link reach.
Declaring a media key as original attachment on a document to which
a user has access should give them access to the attachment file
regardless of their access rights on the original document.
"""
document = factories.DocumentFactory(link_reach=reach)
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
document_id = uuid4()
filename = f"{uuid4()!s}.jpg"
key = f"{document.pk!s}/attachments/{filename:s}"
key = f"{document_id!s}/attachments/{filename:s}"
media_url = f"http://localhost/media/{key:s}"
default_storage.connection.meta.client.put_object(
Bucket=default_storage.bucket_name,
@@ -144,9 +144,73 @@ def test_api_documents_media_auth_authenticated_public_or_authenticated(reach):
ContentType="text/plain",
)
original_url = f"http://localhost/media/{key:s}"
factories.DocumentFactory(id=document_id, link_reach="restricted")
response = APIClient().get(
"/api/v1.0/documents/media-auth/", HTTP_X_ORIGINAL_URL=media_url
)
assert response.status_code == 403
# Let's now add a document to which the anonymous user has access and
# pointing to the attachment
parent = factories.DocumentFactory(link_reach="public")
factories.DocumentFactory(parent=parent, link_reach="restricted", attachments=[key])
response = APIClient().get(
"/api/v1.0/documents/media-auth/", HTTP_X_ORIGINAL_URL=media_url
)
assert response.status_code == 200
authorization = response["Authorization"]
assert "AWS4-HMAC-SHA256 Credential=" in authorization
assert (
"SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature="
in authorization
)
assert response["X-Amz-Date"] == timezone.now().strftime("%Y%m%dT%H%M%SZ")
s3_url = urlparse(settings.AWS_S3_ENDPOINT_URL)
file_url = f"{settings.AWS_S3_ENDPOINT_URL:s}/impress-media-storage/{key:s}"
response = requests.get(
file_url,
headers={
"authorization": authorization,
"x-amz-date": response["x-amz-date"],
"x-amz-content-sha256": response["x-amz-content-sha256"],
"Host": f"{s3_url.hostname:s}:{s3_url.port:d}",
},
timeout=1,
)
assert response.content.decode("utf-8") == "my prose"
@pytest.mark.parametrize("reach", ["public", "authenticated"])
def test_api_documents_media_auth_authenticated_public_or_authenticated(reach):
"""
Authenticated users who are not related to a document should be able to retrieve
attachments related to a document with public or authenticated link reach.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
document_id = uuid4()
filename = f"{uuid4()!s}.jpg"
key = f"{document_id!s}/attachments/{filename:s}"
media_url = f"http://localhost/media/{key:s}"
default_storage.connection.meta.client.put_object(
Bucket=default_storage.bucket_name,
Key=key,
Body=BytesIO(b"my prose"),
ContentType="text/plain",
)
factories.DocumentFactory(id=document_id, link_reach=reach, attachments=[key])
response = client.get(
"/api/v1.0/documents/media-auth/", HTTP_X_ORIGINAL_URL=original_url
"/api/v1.0/documents/media-auth/", HTTP_X_ORIGINAL_URL=media_url
)
assert response.status_code == 200
@@ -179,14 +243,18 @@ def test_api_documents_media_auth_authenticated_restricted():
Authenticated users who are not related to a document should not be allowed to
retrieve attachments linked to a document that is restricted.
"""
document = factories.DocumentFactory(link_reach="restricted")
user = factories.UserFactory(with_owned_document=True)
client = APIClient()
client.force_login(user)
document_id = uuid4()
filename = f"{uuid4()!s}.jpg"
media_url = f"http://localhost/media/{document.pk!s}/attachments/{filename:s}"
key = f"{document_id!s}/attachments/{filename:s}"
media_url = f"http://localhost/media/{key:s}"
factories.DocumentFactory(
id=document_id, link_reach="restricted", attachments=[key]
)
response = client.get(
"/api/v1.0/documents/media-auth/", HTTP_X_ORIGINAL_URL=media_url
@@ -206,16 +274,10 @@ def test_api_documents_media_auth_related(via, mock_user_teams):
client = APIClient()
client.force_login(user)
document = factories.DocumentFactory()
if via == USER:
factories.UserDocumentAccessFactory(document=document, user=user)
elif via == TEAM:
mock_user_teams.return_value = ["lasuite", "unknown"]
factories.TeamDocumentAccessFactory(document=document, team="lasuite")
document_id = uuid4()
filename = f"{uuid4()!s}.jpg"
key = f"{document.pk!s}/attachments/{filename:s}"
key = f"{document_id!s}/attachments/{filename:s}"
media_url = f"http://localhost/media/{key:s}"
default_storage.connection.meta.client.put_object(
Bucket=default_storage.bucket_name,
Key=key,
@@ -223,9 +285,17 @@ def test_api_documents_media_auth_related(via, mock_user_teams):
ContentType="text/plain",
)
original_url = f"http://localhost/media/{key:s}"
document = factories.DocumentFactory(
id=document_id, link_reach="restricted", attachments=[key]
)
if via == USER:
factories.UserDocumentAccessFactory(document=document, user=user)
elif via == TEAM:
mock_user_teams.return_value = ["lasuite", "unknown"]
factories.TeamDocumentAccessFactory(document=document, team="lasuite")
response = client.get(
"/api/v1.0/documents/media-auth/", HTTP_X_ORIGINAL_URL=original_url
"/api/v1.0/documents/media-auth/", HTTP_X_ORIGINAL_URL=media_url
)
assert response.status_code == 200

View File

@@ -37,6 +37,7 @@ def test_api_documents_retrieve_anonymous_public_standalone():
"cors_proxy": True,
"descendants": True,
"destroy": False,
"duplicate": True,
# Anonymous user can't favorite a document even with read access
"favorite": False,
"invite_owner": False,
@@ -103,6 +104,7 @@ def test_api_documents_retrieve_anonymous_public_parent():
"descendants": True,
"cors_proxy": True,
"destroy": False,
"duplicate": True,
# Anonymous user can't favorite a document even with read access
"favorite": False,
"invite_owner": False,
@@ -198,6 +200,7 @@ def test_api_documents_retrieve_authenticated_unrelated_public_or_authenticated(
"descendants": True,
"cors_proxy": True,
"destroy": False,
"duplicate": True,
"favorite": True,
"invite_owner": False,
"link_configuration": False,
@@ -271,6 +274,7 @@ def test_api_documents_retrieve_authenticated_public_or_authenticated_parent(rea
"descendants": True,
"cors_proxy": True,
"destroy": False,
"duplicate": True,
"favorite": True,
"invite_owner": False,
"link_configuration": False,
@@ -450,6 +454,7 @@ def test_api_documents_retrieve_authenticated_related_parent():
"descendants": True,
"cors_proxy": True,
"destroy": access.role == "owner",
"duplicate": True,
"favorite": True,
"invite_owner": access.role == "owner",
"link_configuration": access.role in ["administrator", "owner"],

View File

@@ -81,6 +81,7 @@ def test_api_documents_trashbin_format():
"descendants": True,
"cors_proxy": True,
"destroy": True,
"duplicate": True,
"favorite": True,
"invite_owner": True,
"link_configuration": True,

View File

@@ -0,0 +1,153 @@
"""
Test extract-attachments on document update in docs core app.
"""
import base64
from uuid import uuid4
import pytest
import y_py
from rest_framework.test import APIClient
from core import factories
pytestmark = pytest.mark.django_db
def get_ydoc_with_mages(image_keys):
"""Return a ydoc from text for testing purposes."""
ydoc = y_py.YDoc() # pylint: disable=no-member
with ydoc.begin_transaction() as txn:
xml_fragment = ydoc.get_xml_element("document-store")
for key in image_keys:
xml_image = xml_fragment.push_xml_element(txn, "image")
xml_image.set_attribute(txn, "src", f"http://localhost/media/{key:s}")
update = y_py.encode_state_as_update(ydoc) # pylint: disable=no-member
return base64.b64encode(update).decode("utf-8")
def test_api_documents_update_new_attachment_keys_anonymous(django_assert_num_queries):
"""
When an anonymous user updates a document, the attachment keys extracted from the
updated content should be added to the list of "attachments" ot the document if these
attachments are already readable by anonymous users.
"""
image_keys = [f"{uuid4()!s}/attachments/{uuid4()!s}.png" for _ in range(4)]
document = factories.DocumentFactory(
content=get_ydoc_with_mages(image_keys[:1]),
attachments=[image_keys[0]],
link_reach="public",
link_role="editor",
)
factories.DocumentFactory(attachments=[image_keys[1]], link_reach="public")
factories.DocumentFactory(attachments=[image_keys[2]], link_reach="authenticated")
factories.DocumentFactory(attachments=[image_keys[3]], link_reach="restricted")
expected_keys = {image_keys[i] for i in [0, 1]}
with django_assert_num_queries(9):
response = APIClient().put(
f"/api/v1.0/documents/{document.id!s}/",
{"content": get_ydoc_with_mages(image_keys)},
format="json",
)
assert response.status_code == 200
document.refresh_from_db()
assert set(document.attachments) == expected_keys
# Check that the db query to check attachments readability for extracted
# keys is not done if the content changes but no new keys are found
with django_assert_num_queries(5):
response = APIClient().put(
f"/api/v1.0/documents/{document.id!s}/",
{"content": get_ydoc_with_mages(image_keys[:2])},
format="json",
)
assert response.status_code == 200
document.refresh_from_db()
assert len(document.attachments) == 2
assert set(document.attachments) == expected_keys
def test_api_documents_update_new_attachment_keys_authenticated(
django_assert_num_queries,
):
"""
When an authenticated user updates a document, the attachment keys extracted from the
updated content should be added to the list of "attachments" ot the document if these
attachments are already readable by the editing user.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
image_keys = [f"{uuid4()!s}/attachments/{uuid4()!s}.png" for _ in range(5)]
document = factories.DocumentFactory(
content=get_ydoc_with_mages(image_keys[:1]),
attachments=[image_keys[0]],
users=[(user, "editor")],
)
factories.DocumentFactory(attachments=[image_keys[1]], link_reach="public")
factories.DocumentFactory(attachments=[image_keys[2]], link_reach="authenticated")
factories.DocumentFactory(attachments=[image_keys[3]], link_reach="restricted")
factories.DocumentFactory(attachments=[image_keys[4]], users=[user])
expected_keys = {image_keys[i] for i in [0, 1, 2, 4]}
with django_assert_num_queries(10):
response = client.put(
f"/api/v1.0/documents/{document.id!s}/",
{"content": get_ydoc_with_mages(image_keys)},
format="json",
)
assert response.status_code == 200
document.refresh_from_db()
assert set(document.attachments) == expected_keys
# Check that the db query to check attachments readability for extracted
# keys is not done if the content changes but no new keys are found
with django_assert_num_queries(6):
response = client.put(
f"/api/v1.0/documents/{document.id!s}/",
{"content": get_ydoc_with_mages(image_keys[:2])},
format="json",
)
assert response.status_code == 200
document.refresh_from_db()
assert len(document.attachments) == 4
assert set(document.attachments) == expected_keys
def test_api_documents_update_new_attachment_keys_duplicate():
"""
Duplicate keys in the content should not result in duplicates in the document's attachments.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
image_key1 = f"{uuid4()!s}/attachments/{uuid4()!s}.png"
image_key2 = f"{uuid4()!s}/attachments/{uuid4()!s}.png"
document = factories.DocumentFactory(
content=get_ydoc_with_mages([image_key1]),
attachments=[image_key1],
users=[(user, "editor")],
)
factories.DocumentFactory(attachments=[image_key2], users=[user])
response = client.put(
f"/api/v1.0/documents/{document.id!s}/",
{"content": get_ydoc_with_mages([image_key1, image_key2, image_key2])},
format="json",
)
assert response.status_code == 200
document.refresh_from_db()
assert len(document.attachments) == 2
assert set(document.attachments) == {image_key1, image_key2}

View File

@@ -0,0 +1,55 @@
import base64
import uuid
from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
import pytest
import y_py
from core import models
@pytest.mark.django_db
def test_populate_attachments_on_all_documents(migrator):
"""Test that the migration populates attachments on existing documents."""
old_state = migrator.apply_initial_migration(
("core", "0019_alter_user_language_default_to_null")
)
OldDocument = old_state.apps.get_model("core", "Document")
old_doc_without_attachments = OldDocument.objects.create(
title="Doc without attachments", depth=1, path="0000002"
)
old_doc_with_attachments = OldDocument.objects.create(
title="Doc with attachments", depth=1, path="0000001"
)
# Create document content with an image
file_key = f"{old_doc_with_attachments.id!s}/file"
ydoc = y_py.YDoc() # pylint: disable=no-member
image_key = f"{old_doc_with_attachments.id!s}/attachments/{uuid.uuid4()!s}.png"
with ydoc.begin_transaction() as txn:
xml_fragment = ydoc.get_xml_element("document-store")
xml_fragment.push_xml_element(txn, "image").set_attribute(
txn, "src", f"http://localhost/media/{image_key:s}"
)
update = y_py.encode_state_as_update(ydoc) # pylint: disable=no-member
base64_content = base64.b64encode(update).decode("utf-8")
bytes_content = base64_content.encode("utf-8")
content_file = ContentFile(bytes_content)
default_storage.save(file_key, content_file)
# Apply the migration
new_state = migrator.apply_tested_migration(
("core", "0020_remove_is_public_add_field_attachments_and_duplicated_from")
)
NewDocument = new_state.apps.get_model("core", "Document")
new_doc_with_attachments = NewDocument.objects.get(pk=old_doc_with_attachments.pk)
new_doc_without_attachments = NewDocument.objects.get(
pk=old_doc_without_attachments.pk
)
assert new_doc_without_attachments.attachments == []
assert new_doc_with_attachments.attachments == [image_key]

View File

@@ -161,6 +161,7 @@ def test_models_documents_get_abilities_forbidden(
"descendants": False,
"cors_proxy": False,
"destroy": False,
"duplicate": False,
"favorite": False,
"invite_owner": False,
"media_auth": False,
@@ -220,6 +221,7 @@ def test_models_documents_get_abilities_reader(
"descendants": True,
"cors_proxy": True,
"destroy": False,
"duplicate": True,
"favorite": is_authenticated,
"invite_owner": False,
"link_configuration": False,
@@ -281,6 +283,7 @@ def test_models_documents_get_abilities_editor(
"descendants": True,
"cors_proxy": True,
"destroy": False,
"duplicate": True,
"favorite": is_authenticated,
"invite_owner": False,
"link_configuration": False,
@@ -331,6 +334,7 @@ def test_models_documents_get_abilities_owner(django_assert_num_queries):
"descendants": True,
"cors_proxy": True,
"destroy": True,
"duplicate": True,
"favorite": True,
"invite_owner": True,
"link_configuration": True,
@@ -378,6 +382,7 @@ def test_models_documents_get_abilities_administrator(django_assert_num_queries)
"descendants": True,
"cors_proxy": True,
"destroy": False,
"duplicate": True,
"favorite": True,
"invite_owner": False,
"link_configuration": True,
@@ -428,6 +433,7 @@ def test_models_documents_get_abilities_editor_user(django_assert_num_queries):
"descendants": True,
"cors_proxy": True,
"destroy": False,
"duplicate": True,
"favorite": True,
"invite_owner": False,
"link_configuration": False,
@@ -485,6 +491,7 @@ def test_models_documents_get_abilities_reader_user(
"descendants": True,
"cors_proxy": True,
"destroy": False,
"duplicate": True,
"favorite": True,
"invite_owner": False,
"link_configuration": False,
@@ -540,6 +547,7 @@ def test_models_documents_get_abilities_preset_role(django_assert_num_queries):
"descendants": True,
"cors_proxy": True,
"destroy": False,
"duplicate": True,
"favorite": True,
"invite_owner": False,
"link_configuration": False,

View File

@@ -1,9 +1,15 @@
"""Test util base64_yjs_to_text."""
import base64
import uuid
import y_py
from core import utils
from core.utils import base64_yjs_to_text
def test_base64_yjs_to_text():
def test_utils_base64_yjs_to_text():
"""
Test extract_text_from_saved_yjs_document
This base64 string is an example of what is saved in the database.
@@ -27,3 +33,38 @@ def test_base64_yjs_to_text():
)
assert base64_yjs_to_text(base64_string) == "Hello world"
def test_utils_extract_attachments():
"""
All attachment keys in the document content should be extracted.
"""
document_id = uuid.uuid4()
image_key1 = f"{document_id!s}/attachments/{uuid.uuid4()!s}.png"
image_url1 = f"http://localhost/media/{image_key1:s}"
image_key2 = f"{uuid.uuid4()!s}/attachments/{uuid.uuid4()!s}.png"
image_url2 = f"http://localhost/{image_key2:s}"
image_key3 = f"{uuid.uuid4()!s}/attachments/{uuid.uuid4()!s}.png"
image_url3 = f"http://localhost/media/{image_key3:s}"
ydoc = y_py.YDoc() # pylint: disable=no-member
with ydoc.begin_transaction() as txn:
xml_fragment = ydoc.get_xml_element("document-store")
xml_image = xml_fragment.push_xml_element(txn, "image")
xml_image.set_attribute(txn, "src", image_url1)
xml_image = xml_fragment.push_xml_element(txn, "image")
xml_image.set_attribute(txn, "src", image_url2)
xml_paragraph = xml_fragment.push_xml_element(txn, "paragraph")
xml_text = xml_paragraph.push_xml_text(txn)
xml_text.push(txn, image_url3)
update = y_py.encode_state_as_update(ydoc) # pylint: disable=no-member
base64_string = base64.b64encode(update).decode("utf-8")
# image_url3 is missing the "/media/" part and shouldn't get extracted
assert utils.extract_attachments(base64_string) == [image_key1, image_key3]

View File

@@ -0,0 +1,163 @@
"""
Unit tests for the filter_root_paths utility function.
"""
from core.utils import filter_descendants
def test_utils_filter_descendants_success():
"""
The `filter_descendants` function should correctly identify descendant paths
from a given list of paths and root paths.
This test verifies that the function returns only the paths that have a prefix
matching one of the root paths.
"""
paths = [
"0001",
"00010001",
"000100010001",
"000100010002",
"000100020001",
"000100020002",
"0002",
"00020001",
"00020002",
"00030001",
"000300010001",
"00030002",
"0004",
"000400010003",
"0004000100030001",
"000400010004",
]
root_paths = [
"0001",
"0002",
"000400010003",
]
filtered_paths = filter_descendants(paths, root_paths, skip_sorting=True)
assert filtered_paths == [
"0001",
"00010001",
"000100010001",
"000100010002",
"000100020001",
"000100020002",
"0002",
"00020001",
"00020002",
"000400010003",
"0004000100030001",
]
def test_utils_filter_descendants_sorting():
"""
The `filter_descendants` function should handle unsorted input when sorting is enabled.
This test verifies that the function sorts the input if sorting is not skipped
and still correctly identifies accessible descendant paths.
"""
paths = [
"000300010001",
"000100010002",
"0001",
"00010001",
"000100010001",
"000100020002",
"000100020001",
"0002",
"00020001",
"00020002",
"00030001",
"00030002",
"0004000100030001",
"0004",
"000400010003",
"000400010004",
]
root_paths = [
"0002",
"000400010003",
"0001",
]
filtered_paths = filter_descendants(paths, root_paths)
assert filtered_paths == [
"0001",
"00010001",
"000100010001",
"000100010002",
"000100020001",
"000100020002",
"0002",
"00020001",
"00020002",
"000400010003",
"0004000100030001",
]
filtered_paths = filter_descendants(paths, root_paths, skip_sorting=True)
assert filtered_paths == [
"0001",
"00010001",
"000100010001",
"000100010002",
"000100020001",
"000100020002",
"0002",
"00020001",
"00020002",
"000400010003",
"0004000100030001",
]
def test_utils_filter_descendants_empty():
"""
The function should return an empty list if one or both inputs are empty.
"""
assert not filter_descendants([], ["0001"])
assert not filter_descendants(["0001"], [])
assert not filter_descendants([], [])
def test_utils_filter_descendants_no_match():
"""
The function should return an empty list if no path starts with any root path.
"""
paths = ["0001", "0002", "0003"]
root_paths = ["0004", "0005"]
assert not filter_descendants(paths, root_paths, skip_sorting=True)
def test_utils_filter_descendants_exact_match():
"""
The function should include paths that exactly match a root path.
"""
paths = ["0001", "0002", "0003"]
root_paths = ["0001", "0002"]
assert filter_descendants(paths, root_paths, skip_sorting=True) == ["0001", "0002"]
def test_utils_filter_descendants_single_root_matches_all():
"""
A single root path should match all its descendants.
"""
paths = ["0001", "00010001", "000100010001", "00010002"]
root_paths = ["0001"]
assert filter_descendants(paths, root_paths) == [
"0001",
"00010001",
"000100010001",
"00010002",
]
def test_utils_filter_descendants_path_shorter_than_root():
"""
A path shorter than any root path should not match.
"""
paths = ["0001", "0002"]
root_paths = ["00010001"]
assert not filter_descendants(paths, root_paths)

View File

@@ -1,10 +1,52 @@
"""Utils for the core app."""
import base64
import re
import y_py as Y
from bs4 import BeautifulSoup
from core import enums
def filter_descendants(paths, root_paths, skip_sorting=False):
"""
Filters paths to keep only those that are descendants of any path in root_paths.
A path is considered a descendant of a root path if it starts with the root path.
If `skip_sorting` is not set to True, the function will sort both lists before
processing because both `paths` and `root_paths` need to be in lexicographic order
before going through the algorithm.
Args:
paths (iterable of str): List of paths to be filtered.
root_paths (iterable of str): List of paths to check as potential prefixes.
skip_sorting (bool): If True, assumes both `paths` and `root_paths` are already sorted.
Returns:
list of str: A list of sorted paths that are descendants of any path in `root_paths`.
"""
results = []
i = 0
n = len(root_paths)
if not skip_sorting:
paths.sort()
root_paths.sort()
for path in paths:
# Try to find a matching prefix in the sorted accessible paths
while i < n:
if path.startswith(root_paths[i]):
results.append(path)
break
if root_paths[i] < path:
i += 1
else:
# If paths[i] > path, no need to keep searching
break
return results
def base64_yjs_to_xml(base64_string):
"""Extract xml from base64 yjs document."""
@@ -23,3 +65,9 @@ def base64_yjs_to_text(base64_string):
blocknote_structure = base64_yjs_to_xml(base64_string)
soup = BeautifulSoup(blocknote_structure, "html.parser")
return soup.get_text(separator=" ").strip()
def extract_attachments(content):
"""Helper method to extract media paths from a document's content."""
xml_content = base64_yjs_to_xml(content)
return re.findall(enums.MEDIA_STORAGE_URL_EXTRACT, xml_content)

View File

@@ -57,13 +57,8 @@ dependencies = [
"requests==2.32.3",
"sentry-sdk==2.24.0",
"url-normalize==1.4.3",
<<<<<<< HEAD
"whitenoise==6.9.0",
"mozilla-django-oidc==4.0.1",
=======
"whitenoise==6.8.2",
"y-py==0.6.2",
>>>>>>> f087cd70 ((backend) add util to extract text from Ydoc content)
]
[project.urls]