(backend) add minio event parser

When a new file is uploaded to a Minio Bucket, a webhook can be
configured to notify third parties about the event. Basically,
it's a POST call with a payload providing information on the
event that just happened.

When a recording worker stops, it will upload its data to a Minio
bucket, which will trigger the webhook.

Try to introduce the minimal code needed to parse these events, discard
them whenever it's relevant, and extract the recording ID, so that we
know which recording was successfully saved to the Minio bucket.

In the longer run, it will trigger a callback.
This commit is contained in:
lebaudantoine
2024-11-08 11:27:37 +01:00
committed by aleb_the_flash
parent 840033fcbc
commit 8309545ec6
8 changed files with 721 additions and 0 deletions

View File

@@ -0,0 +1 @@
"""Meet event parser classes, authentication and exceptions."""

View File

@@ -0,0 +1,93 @@
"""Authentication class for storage event token validation."""
import logging
import secrets
from django.conf import settings
from django.utils.translation import gettext_lazy as _
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
logger = logging.getLogger(__name__)
class MachineUser:
    """A synthetic, always-authenticated user for automated storage events.

    Mimics the small subset of Django's user API that DRF touches, without
    ever hitting the database.
    """

    def __init__(self) -> None:
        """Set up the fixed identity used for storage event requests."""
        self.username = "storage_event_user"
        self.is_active = True
        self.pk = None  # never persisted, hence no primary key

    @property
    def is_authenticated(self) -> bool:
        """Machine users are always considered authenticated."""
        return True

    @property
    def is_anonymous(self) -> bool:
        """Machine users are never anonymous."""
        return False

    def get_username(self) -> str:
        """Return the fixed identifier of this machine user."""
        return self.username
class StorageEventAuthentication(BaseAuthentication):
    """Authenticate requests using a Bearer token for storage event integration.

    This class validates Bearer tokens for storage events that don't map to database users.
    It's designed for S3-compatible storage integrations and similar use cases.
    Events are submitted when a webhook is configured on some bucket's events.
    """

    AUTH_HEADER = "Authorization"
    TOKEN_TYPE = "Bearer"  # noqa S105

    def authenticate(self, request):
        """Validate the Bearer token from the Authorization header.

        Returns a ``(MachineUser, token)`` tuple on success. Raises
        AuthenticationFailed when the header is missing, malformed, or the
        token does not match. When authentication is disabled, every request
        is accepted without a token.
        """
        if not settings.RECORDING_ENABLE_STORAGE_EVENT_AUTH:
            return MachineUser(), None

        required_token = settings.RECORDING_STORAGE_EVENT_TOKEN
        if not required_token:
            # Auth is enabled (early return above handled the disabled case),
            # but no token is configured: fail closed rather than letting
            # unauthenticated events through.
            raise AuthenticationFailed(
                _("Authentication is enabled but token is not configured.")
            )

        auth_header = request.headers.get(self.AUTH_HEADER)
        if not auth_header:
            logger.warning(
                "Authentication failed: Missing Authorization header (ip: %s)",
                request.META.get("REMOTE_ADDR"),
            )
            raise AuthenticationFailed(_("Authorization header is required"))

        auth_parts = auth_header.split(" ")
        if len(auth_parts) != 2 or auth_parts[0] != self.TOKEN_TYPE:
            logger.warning(
                "Authentication failed: Invalid authorization header (ip: %s)",
                request.META.get("REMOTE_ADDR"),
            )
            raise AuthenticationFailed(_("Invalid authorization header."))

        token = auth_parts[1]
        # Use constant-time comparison to prevent timing attacks
        if not secrets.compare_digest(token.encode(), required_token.encode()):
            logger.warning(
                "Authentication failed: Invalid token (ip: %s)",
                request.META.get("REMOTE_ADDR"),
            )
            raise AuthenticationFailed(_("Invalid token"))

        return MachineUser(), token

    def authenticate_header(self, request):
        """Return the WWW-Authenticate header value."""
        return f"{self.TOKEN_TYPE} realm='Storage event API'"

View File

@@ -0,0 +1,17 @@
"""Storage parsers specific exceptions."""
class StorageEventError(Exception):
    """Base class for every storage event parsing/validation error.

    Catch this to handle any failure raised while parsing or validating a
    storage event, regardless of the specific cause.
    """


class ParsingEventDataError(StorageEventError):
    """Raised when the request data is malformed, incomplete, or missing."""


class InvalidBucketError(StorageEventError):
    """Raised when the bucket name in the request does not match the expected one."""


class InvalidFileTypeError(StorageEventError):
    """Raised when the file type in the request is not supported."""


class InvalidFilepathError(StorageEventError):
    """Raised when the filepath in the request is invalid."""

View File

@@ -0,0 +1,147 @@
"""Meet storage event parser classes."""
import logging
import re
from dataclasses import dataclass
from functools import lru_cache
from typing import Any, Dict, Optional, Protocol
from django.conf import settings
from django.utils.module_loading import import_string
from .exceptions import (
InvalidBucketError,
InvalidFilepathError,
InvalidFileTypeError,
ParsingEventDataError,
)
logger = logging.getLogger(__name__)
@dataclass
class StorageEvent:
    """Represents a storage event with relevant metadata.

    Attributes:
        filepath: Object key of the uploaded file within the bucket
        filetype: Content type (MIME) of the uploaded file
        bucket_name: Name of the bucket the event originated from
        metadata: Additional event data, if any
    """

    filepath: str
    filetype: str
    bucket_name: str
    metadata: Optional[Dict[str, Any]]

    def __post_init__(self):
        """Reject None for the fields required by downstream validation."""
        for required in ("filepath", "filetype", "bucket_name"):
            if getattr(self, required) is None:
                raise TypeError(f"{required} cannot be None")
class EventParser(Protocol):
    """Structural interface every storage event parser must satisfy."""

    def __init__(self, bucket_name, allowed_filetypes=None):
        """Initialize parser with bucket name and optional allowed filetypes."""

    def parse(self, data: Dict) -> StorageEvent:
        """Extract storage event data from raw dictionary input."""

    # NOTE: annotated -> str (not None) to match MinioParser.validate, which
    # returns the recording ID that get_recording_id relies on.
    def validate(self, data: StorageEvent) -> str:
        """Verify storage event data meets all requirements and return the recording ID."""

    def get_recording_id(self, data: Dict) -> str:
        """Extract recording ID from event dictionary."""
@lru_cache(maxsize=1)
def get_parser() -> EventParser:
    """Build (once) and return the event parser configured in settings.

    Memoizing a plain function is preferred over a factory class here: the
    only varying input is the parser class read from settings, so a cached
    function gives the same singleton behavior with far less machinery.
    """
    parser_class = import_string(settings.RECORDING_EVENT_PARSER_CLASS)
    parser = parser_class(bucket_name=settings.AWS_STORAGE_BUCKET_NAME)
    return parser
class MinioParser:
    """Handle parsing and validation of Minio storage events."""

    def __init__(self, bucket_name: str, allowed_filetypes=None):
        """Initialize parser with target bucket name and accepted filetypes.

        Raises ValueError when no bucket name is provided.
        """
        if not bucket_name:
            raise ValueError("Bucket name cannot be None or empty")

        self._bucket_name = bucket_name
        self._allowed_filetypes = allowed_filetypes or {"audio/ogg", "video/mp4"}
        # Matches an optional URL-encoded folder prefix ("%2F" separators),
        # then a 36-character UUID recording ID and a file extension.
        # pylint: disable=line-too-long
        self._filepath_regex = re.compile(
            r"(?P<url_encoded_folder_path>(?:[^%]+%2F)*)?(?P<recording_id>[0-9a-fA-F\-]{36})\.(?P<extension>[a-zA-Z0-9]+)"
        )

    @staticmethod
    def parse(data):
        """Convert raw Minio event dictionary to StorageEvent object.

        Raises ParsingEventDataError when the payload is empty or does not
        carry the expected S3 event structure.
        """
        if not data:
            raise ParsingEventDataError("Received empty data.")

        try:
            record = data["Records"][0]
            s3 = record["s3"]
            bucket_name = s3["bucket"]["name"]
            file_object = s3["object"]
            filepath = file_object["key"]
            filetype = file_object["contentType"]
        except (KeyError, IndexError) as e:
            raise ParsingEventDataError(f"Missing or malformed key: {e}.") from e

        try:
            return StorageEvent(
                filepath=filepath,
                filetype=filetype,
                bucket_name=bucket_name,
                metadata=None,
            )
        except TypeError as e:
            # StorageEvent rejects None fields in __post_init__.
            raise ParsingEventDataError(f"Missing essential data fields: {e}") from e

    def validate(self, event_data: StorageEvent) -> str:
        """Verify StorageEvent matches bucket, filetype and filepath requirements.

        Returns the recording ID extracted from the filepath.
        Raises InvalidBucketError, InvalidFileTypeError or InvalidFilepathError.
        """
        if event_data.bucket_name != self._bucket_name:
            raise InvalidBucketError(
                f"Invalid bucket: expected {self._bucket_name}, got {event_data.bucket_name}"
            )

        # "not in" instead of "not x in"; error message previously rendered
        # ",got" with the space missing between the two f-string fragments.
        if event_data.filetype not in self._allowed_filetypes:
            raise InvalidFileTypeError(
                f"Invalid file type, expected {self._allowed_filetypes}, "
                f"got '{event_data.filetype}'"
            )

        match = self._filepath_regex.match(event_data.filepath)
        if not match:
            raise InvalidFilepathError(
                f"Invalid filepath structure: {event_data.filepath}"
            )

        return match.group("recording_id")

    def get_recording_id(self, data):
        """Extract recording ID from Minio event through parsing and validation."""
        event_data = self.parse(data)
        return self.validate(event_data)