(backend) manage uploaded file status and call to malware detection

In the attachment_upload method, the status in the file metadata to
processing and the malware_detection backend is called. We check in the
media_auth if the status is ready in order to accept the request.
This commit is contained in:
Manuel Raynaud
2025-05-05 16:01:12 +02:00
parent a070e1dd87
commit 25abd964de
6 changed files with 144 additions and 20 deletions

View File

@@ -12,6 +12,7 @@ and this project adheres to
- ✨ Add a custom callout block to the editor #892
- 🚩(frontend) version MIT only #911
- ✨(backend) integrate maleware_detection from django-lasuite #936
## Changed

View File

@@ -24,6 +24,7 @@ from django.views.decorators.cache import cache_page
import requests
import rest_framework as drf
from botocore.exceptions import ClientError
from lasuite.malware_detection import malware_detection
from rest_framework import filters, status, viewsets
from rest_framework import response as drf_response
from rest_framework.permissions import AllowAny
@@ -1156,7 +1157,10 @@ class DocumentViewSet(
# Prepare metadata for storage
extra_args = {
"Metadata": {"owner": str(request.user.id)},
"Metadata": {
"owner": str(request.user.id),
"status": enums.DocumentAttachmentStatus.PROCESSING,
},
"ContentType": serializer.validated_data["content_type"],
}
file_unsafe = ""
@@ -1188,6 +1192,8 @@ class DocumentViewSet(
document.attachments.append(key)
document.save()
malware_detection.analyse_file(key, document_id=document.id)
return drf.response.Response(
{"file": f"{settings.MEDIA_URL:s}{key:s}"},
status=drf.status.HTTP_201_CREATED,
@@ -1271,6 +1277,19 @@ class DocumentViewSet(
logger.debug("User '%s' lacks permission for attachment", user)
raise drf.exceptions.PermissionDenied()
# Check if the attachment is ready
s3_client = default_storage.connection.meta.client
bucket_name = default_storage.bucket_name
head_resp = s3_client.head_object(Bucket=bucket_name, Key=key)
metadata = head_resp.get("Metadata", {})
# In order to be compatible with existing upload without `status` metadata,
# we consider them as ready.
if (
metadata.get("status", enums.DocumentAttachmentStatus.READY)
!= enums.DocumentAttachmentStatus.READY
):
raise drf.exceptions.PermissionDenied()
# Generate S3 authorization headers using the extracted URL parameters
request = utils.generate_s3_authorization_headers(key)

View File

@@ -10,6 +10,7 @@ from core.enums import DocumentAttachmentStatus
from core.models import Document
logger = logging.getLogger(__name__)
security_logger = logging.getLogger("docs.security")
def malware_detection_callback(file_path, status, error_info, **kwargs):
@@ -35,7 +36,7 @@ def malware_detection_callback(file_path, status, error_info, **kwargs):
return
document_id = kwargs.get("document_id")
logger.error(
security_logger.warning(
"File %s for document %s is infected with malware. Error info: %s",
file_path,
document_id,

View File

@@ -4,6 +4,7 @@ Test file uploads API endpoint for users in impress's core app.
import re
import uuid
from unittest import mock
from django.core.files.storage import default_storage
from django.core.files.uploadedfile import SimpleUploadedFile
@@ -12,6 +13,7 @@ import pytest
from rest_framework.test import APIClient
from core import factories
from core.api.viewsets import malware_detection
from core.tests.conftest import TEAM, USER, VIA
pytestmark = pytest.mark.django_db
@@ -59,7 +61,8 @@ def test_api_documents_attachment_upload_anonymous_success():
file = SimpleUploadedFile(name="test.png", content=PIXEL, content_type="image/png")
url = f"/api/v1.0/documents/{document.id!s}/attachment-upload/"
response = APIClient().post(url, {"file": file}, format="multipart")
with mock.patch.object(malware_detection, "analyse_file") as mock_analyse_file:
response = APIClient().post(url, {"file": file}, format="multipart")
assert response.status_code == 201
@@ -74,12 +77,13 @@ def test_api_documents_attachment_upload_anonymous_success():
assert document.attachments == [f"{document.id!s}/attachments/{file_id!s}.png"]
# Now, check the metadata of the uploaded file
key = file_path.replace("/media", "")
key = file_path.replace("/media/", "")
mock_analyse_file.assert_called_once_with(key, document_id=document.id)
file_head = default_storage.connection.meta.client.head_object(
Bucket=default_storage.bucket_name, Key=key
)
assert file_head["Metadata"] == {"owner": "None"}
assert file_head["Metadata"] == {"owner": "None", "status": "processing"}
assert file_head["ContentType"] == "image/png"
assert file_head["ContentDisposition"] == 'inline; filename="test.png"'
@@ -139,7 +143,8 @@ def test_api_documents_attachment_upload_authenticated_success(reach, role):
file = SimpleUploadedFile(name="test.png", content=PIXEL, content_type="image/png")
url = f"/api/v1.0/documents/{document.id!s}/attachment-upload/"
response = client.post(url, {"file": file}, format="multipart")
with mock.patch.object(malware_detection, "analyse_file") as mock_analyse_file:
response = client.post(url, {"file": file}, format="multipart")
assert response.status_code == 201
@@ -147,6 +152,10 @@ def test_api_documents_attachment_upload_authenticated_success(reach, role):
match = pattern.search(response.json()["file"])
file_id = match.group(1)
mock_analyse_file.assert_called_once_with(
f"{document.id!s}/attachments/{file_id!s}.png", document_id=document.id
)
# Validate that file_id is a valid UUID
uuid.UUID(file_id)
@@ -210,7 +219,8 @@ def test_api_documents_attachment_upload_success(via, role, mock_user_teams):
file = SimpleUploadedFile(name="test.png", content=PIXEL, content_type="image/png")
url = f"/api/v1.0/documents/{document.id!s}/attachment-upload/"
response = client.post(url, {"file": file}, format="multipart")
with mock.patch.object(malware_detection, "analyse_file") as mock_analyse_file:
response = client.post(url, {"file": file}, format="multipart")
assert response.status_code == 201
@@ -226,11 +236,12 @@ def test_api_documents_attachment_upload_success(via, role, mock_user_teams):
assert document.attachments == [f"{document.id!s}/attachments/{file_id!s}.png"]
# Now, check the metadata of the uploaded file
key = file_path.replace("/media", "")
key = file_path.replace("/media/", "")
mock_analyse_file.assert_called_once_with(key, document_id=document.id)
file_head = default_storage.connection.meta.client.head_object(
Bucket=default_storage.bucket_name, Key=key
)
assert file_head["Metadata"] == {"owner": str(user.id)}
assert file_head["Metadata"] == {"owner": str(user.id), "status": "processing"}
assert file_head["ContentType"] == "image/png"
assert file_head["ContentDisposition"] == 'inline; filename="test.png"'
@@ -304,7 +315,8 @@ def test_api_documents_attachment_upload_fix_extension(
url = f"/api/v1.0/documents/{document.id!s}/attachment-upload/"
file = SimpleUploadedFile(name=name, content=content)
response = client.post(url, {"file": file}, format="multipart")
with mock.patch.object(malware_detection, "analyse_file") as mock_analyse_file:
response = client.post(url, {"file": file}, format="multipart")
assert response.status_code == 201
@@ -324,11 +336,16 @@ def test_api_documents_attachment_upload_fix_extension(
uuid.UUID(file_id)
# Now, check the metadata of the uploaded file
key = file_path.replace("/media", "")
key = file_path.replace("/media/", "")
mock_analyse_file.assert_called_once_with(key, document_id=document.id)
file_head = default_storage.connection.meta.client.head_object(
Bucket=default_storage.bucket_name, Key=key
)
assert file_head["Metadata"] == {"owner": str(user.id), "is_unsafe": "true"}
assert file_head["Metadata"] == {
"owner": str(user.id),
"is_unsafe": "true",
"status": "processing",
}
assert file_head["ContentType"] == content_type
assert file_head["ContentDisposition"] == f'attachment; filename="{name:s}"'
@@ -364,7 +381,8 @@ def test_api_documents_attachment_upload_unsafe():
file = SimpleUploadedFile(
name="script.exe", content=b"\x4d\x5a\x90\x00\x03\x00\x00\x00"
)
response = client.post(url, {"file": file}, format="multipart")
with mock.patch.object(malware_detection, "analyse_file") as mock_analyse_file:
response = client.post(url, {"file": file}, format="multipart")
assert response.status_code == 201
@@ -381,11 +399,16 @@ def test_api_documents_attachment_upload_unsafe():
file_id = file_id.replace("-unsafe", "")
uuid.UUID(file_id)
key = file_path.replace("/media/", "")
mock_analyse_file.assert_called_once_with(key, document_id=document.id)
# Now, check the metadata of the uploaded file
key = file_path.replace("/media", "")
file_head = default_storage.connection.meta.client.head_object(
Bucket=default_storage.bucket_name, Key=key
)
assert file_head["Metadata"] == {"owner": str(user.id), "is_unsafe": "true"}
assert file_head["Metadata"] == {
"owner": str(user.id),
"is_unsafe": "true",
"status": "processing",
}
assert file_head["ContentType"] == "application/octet-stream"
assert file_head["ContentDisposition"] == 'attachment; filename="script.exe"'

View File

@@ -15,6 +15,7 @@ import requests
from rest_framework.test import APIClient
from core import factories, models
from core.enums import DocumentAttachmentStatus
from core.tests.conftest import TEAM, USER, VIA
pytestmark = pytest.mark.django_db
@@ -45,6 +46,7 @@ def test_api_documents_media_auth_anonymous_public():
Key=key,
Body=BytesIO(b"my prose"),
ContentType="text/plain",
Metadata={"status": DocumentAttachmentStatus.READY},
)
factories.DocumentFactory(id=document_id, link_reach="public", attachments=[key])
@@ -93,7 +95,15 @@ def test_api_documents_media_auth_extensions():
keys = []
for ext in extensions:
filename = f"{uuid4()!s}.{ext:s}"
keys.append(f"{document_id!s}/attachments/{filename:s}")
key = f"{document_id!s}/attachments/{filename:s}"
default_storage.connection.meta.client.put_object(
Bucket=default_storage.bucket_name,
Key=key,
Body=BytesIO(b"my prose"),
ContentType="text/plain",
Metadata={"status": DocumentAttachmentStatus.READY},
)
keys.append(key)
factories.DocumentFactory(link_reach="public", attachments=keys)
@@ -142,6 +152,7 @@ def test_api_documents_media_auth_anonymous_attachments():
Key=key,
Body=BytesIO(b"my prose"),
ContentType="text/plain",
Metadata={"status": DocumentAttachmentStatus.READY},
)
factories.DocumentFactory(id=document_id, link_reach="restricted")
@@ -205,6 +216,7 @@ def test_api_documents_media_auth_authenticated_public_or_authenticated(reach):
Key=key,
Body=BytesIO(b"my prose"),
ContentType="text/plain",
Metadata={"status": DocumentAttachmentStatus.READY},
)
factories.DocumentFactory(id=document_id, link_reach=reach, attachments=[key])
@@ -283,6 +295,7 @@ def test_api_documents_media_auth_related(via, mock_user_teams):
Key=key,
Body=BytesIO(b"my prose"),
ContentType="text/plain",
Metadata={"status": DocumentAttachmentStatus.READY},
)
document = factories.DocumentFactory(
@@ -321,3 +334,70 @@ def test_api_documents_media_auth_related(via, mock_user_teams):
timeout=1,
)
assert response.content.decode("utf-8") == "my prose"
def test_api_documents_media_auth_not_ready_status():
"""Attachments with status not ready should not be accessible"""
document_id = uuid4()
filename = f"{uuid4()!s}.jpg"
key = f"{document_id!s}/attachments/{filename:s}"
default_storage.connection.meta.client.put_object(
Bucket=default_storage.bucket_name,
Key=key,
Body=BytesIO(b"my prose"),
ContentType="text/plain",
Metadata={"status": DocumentAttachmentStatus.PROCESSING},
)
factories.DocumentFactory(id=document_id, link_reach="public", attachments=[key])
original_url = f"http://localhost/media/{key:s}"
response = APIClient().get(
"/api/v1.0/documents/media-auth/", HTTP_X_ORIGINAL_URL=original_url
)
assert response.status_code == 403
def test_api_documents_media_auth_missing_status_metadata():
"""Attachments without status metadata should be considered as ready"""
document_id = uuid4()
filename = f"{uuid4()!s}.jpg"
key = f"{document_id!s}/attachments/{filename:s}"
default_storage.connection.meta.client.put_object(
Bucket=default_storage.bucket_name,
Key=key,
Body=BytesIO(b"my prose"),
ContentType="text/plain",
)
factories.DocumentFactory(id=document_id, link_reach="public", attachments=[key])
original_url = f"http://localhost/media/{key:s}"
response = APIClient().get(
"/api/v1.0/documents/media-auth/", HTTP_X_ORIGINAL_URL=original_url
)
assert response.status_code == 200
authorization = response["Authorization"]
assert "AWS4-HMAC-SHA256 Credential=" in authorization
assert (
"SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature="
in authorization
)
assert response["X-Amz-Date"] == timezone.now().strftime("%Y%m%dT%H%M%SZ")
s3_url = urlparse(settings.AWS_S3_ENDPOINT_URL)
file_url = f"{settings.AWS_S3_ENDPOINT_URL:s}/impress-media-storage/{key:s}"
response = requests.get(
file_url,
headers={
"authorization": authorization,
"x-amz-date": response["x-amz-date"],
"x-amz-content-sha256": response["x-amz-content-sha256"],
"Host": f"{s3_url.hostname:s}:{s3_url.port:d}",
},
timeout=1,
)
assert response.content.decode("utf-8") == "my prose"

View File

@@ -15,8 +15,8 @@ from core.malware_detection import malware_detection_callback
pytestmark = pytest.mark.django_db
@pytest.fixture
def safe_file():
@pytest.fixture(name="safe_file")
def fixture_safe_file():
"""Create a safe file."""
file_path = "test.txt"
default_storage.save(file_path, ContentFile("test"))
@@ -24,8 +24,8 @@ def safe_file():
default_storage.delete(file_path)
@pytest.fixture
def unsafe_file():
@pytest.fixture(name="unsafe_file")
def fixture_unsafe_file():
"""Create an unsafe file."""
file_path = "unsafe.txt"
default_storage.save(file_path, ContentFile("test"))