In the attachment_upload method, the status in the file metadata is set to processing and the malware_detection backend is called. In media_auth, we check that the status is ready before accepting the request.
53 lines
1.7 KiB
Python
53 lines
1.7 KiB
Python
"""Malware detection callbacks"""
|
|
|
|
import logging
|
|
|
|
from django.core.files.storage import default_storage
|
|
|
|
from lasuite.malware_detection.enums import ReportStatus
|
|
|
|
from core.enums import DocumentAttachmentStatus
|
|
from core.models import Document
|
|
|
|
# Module-level logger for routine callback events (named after this module).
logger = logging.getLogger(__name__)
|
|
# Separate logger, named "docs.security", used for malware findings.
security_logger = logging.getLogger("docs.security")
|
|
|
|
|
|
def malware_detection_callback(file_path, status, error_info, **kwargs):
    """Handle the malware detection verdict for an uploaded attachment.

    When the file is reported safe, its storage metadata is rewritten so that
    ``status`` becomes ``DocumentAttachmentStatus.READY``. For any other
    status the file is treated as unsafe: it is detached from its document
    (when that document still exists and still references the file) and then
    deleted from storage.

    Args:
        file_path: Storage key of the analysed file.
        status: Verdict reported by the malware detection backend; compared
            against ``ReportStatus.SAFE``.
        error_info: Details supplied with the verdict; only used for logging.
        **kwargs: Extra context; ``document_id`` is read to locate the
            document that owns the file.
    """
    if status == ReportStatus.SAFE:
        logger.info("File %s is safe", file_path)
        # Get existing metadata
        s3_client = default_storage.connection.meta.client
        bucket_name = default_storage.bucket_name
        head_resp = s3_client.head_object(Bucket=bucket_name, Key=file_path)
        metadata = head_resp.get("Metadata", {})
        metadata.update({"status": DocumentAttachmentStatus.READY})
        # Update status in metadata: the object is copied onto itself with
        # MetadataDirective="REPLACE" so the new metadata is stored; the
        # content type is passed again alongside the replaced metadata.
        s3_client.copy_object(
            Bucket=bucket_name,
            CopySource={"Bucket": bucket_name, "Key": file_path},
            Key=file_path,
            ContentType=head_resp.get("ContentType"),
            Metadata=metadata,
            MetadataDirective="REPLACE",
        )
        return

    document_id = kwargs.get("document_id")
    security_logger.warning(
        "File %s for document %s is infected with malware. Error info: %s",
        file_path,
        document_id,
        error_info,
    )

    # Remove the file from the document. A missing document or an attachment
    # that was already detached (e.g. the callback ran twice) must not stop
    # the unsafe file from being deleted from storage below.
    try:
        document = Document.objects.get(pk=document_id)
    except Document.DoesNotExist:
        logger.warning(
            "Document %s not found while removing unsafe file %s",
            document_id,
            file_path,
        )
    else:
        if file_path in (document.attachments or ()):
            document.attachments.remove(file_path)
            document.save(update_fields=["attachments"])
        else:
            logger.warning(
                "File %s is not attached to document %s",
                file_path,
                document_id,
            )

    # Delete the file from the storage
    default_storage.delete(file_path)
|