♻️(backend) refactor media_auth and collaboration_auth for flexibility

These 2 actions had factorized code but a few iterations lead to
spaghetti code where factorized code includes "if" clauses.

Refactor abstractions so that code factorization really works.
This commit is contained in:
Samuel Paccoud - DINUM
2024-12-27 17:19:16 +01:00
committed by Manuel Raynaud
parent 710bbf512c
commit 54f9b3963e

View File

@@ -1088,10 +1088,10 @@ class DocumentViewSet(
status=drf.status.HTTP_201_CREATED,
)
def _authorize_subrequest(self, request, pattern):
def _auth_get_original_url(self, request):
"""
Shared method to authorize access based on the original URL of an Nginx subrequest
and user permissions. Returns a dictionary of URL parameters if authorized.
Extracts and parses the original URL from the "HTTP_X_ORIGINAL_URL" header.
Raises PermissionDenied if the header is missing.
The original url is passed by nginx in the "HTTP_X_ORIGINAL_URL" header.
See corresponding ingress configuration in Helm chart and read about the
@@ -1102,14 +1102,6 @@ class DocumentViewSet(
to let this request go through (by returning a 200 code) or if we block it (by returning
a 403 error). Note that we return 403 errors without any further details for security
reasons.
Parameters:
- pattern: The regex pattern to extract identifiers from the URL.
Returns:
- A dictionary of URL parameters if the request is authorized.
Raises:
- PermissionDenied if authorization fails.
"""
# Extract the original URL from the request header
original_url = request.META.get("HTTP_X_ORIGINAL_URL")
@@ -1117,52 +1109,32 @@ class DocumentViewSet(
logger.debug("Missing HTTP_X_ORIGINAL_URL header in subrequest")
raise drf.exceptions.PermissionDenied()
parsed_url = urlparse(original_url)
match = pattern.search(parsed_url.path)
# If the path does not match the pattern, try to extract the parameters from the query
if not match:
match = pattern.search(parsed_url.query)
if not match:
logger.debug(
"Subrequest URL '%s' did not match pattern '%s'",
parsed_url.path,
pattern,
)
raise drf.exceptions.PermissionDenied()
logger.debug("Original url: '%s'", original_url)
return urlparse(original_url)
def _auth_get_url_params(self, pattern, fragment):
"""
Extracts URL parameters from the given fragment using the specified regex pattern.
Raises PermissionDenied if parameters cannot be extracted.
"""
match = pattern.search(fragment)
try:
url_params = match.groupdict()
return match.groupdict()
except (ValueError, AttributeError) as exc:
logger.debug("Failed to extract parameters from subrequest URL: %s", exc)
raise drf.exceptions.PermissionDenied() from exc
pk = url_params.get("pk")
if not pk:
logger.debug("Document ID (pk) not found in URL parameters: %s", url_params)
raise drf.exceptions.PermissionDenied()
# Fetch the document and check if the user has access
def _auth_get_document(self, pk):
"""
Retrieves the document corresponding to the given primary key (pk).
Raises PermissionDenied if the document is not found.
"""
try:
document = models.Document.objects.get(pk=pk)
return models.Document.objects.get(pk=pk)
except models.Document.DoesNotExist as exc:
logger.debug("Document with ID '%s' does not exist", pk)
raise drf.exceptions.PermissionDenied() from exc
user_abilities = document.get_abilities(request.user)
if not user_abilities.get(self.action, False):
logger.debug(
"User '%s' lacks permission for document '%s'", request.user, pk
)
raise drf.exceptions.PermissionDenied()
logger.debug(
"Subrequest authorization successful. Extracted parameters: %s", url_params
)
return url_params, user_abilities, request.user.id
@drf.decorators.action(detail=False, methods=["get"], url_path="media-auth")
def media_auth(self, request, *args, **kwargs):
"""
@@ -1174,13 +1146,24 @@ class DocumentViewSet(
annotation. The request will then be proxied to the object storage backend who will
respond with the file after checking the signature included in headers.
"""
url_params, _, _ = self._authorize_subrequest(
request, MEDIA_STORAGE_URL_PATTERN
parsed_url = self._auth_get_original_url(request)
url_params = self._auth_get_url_params(
enums.MEDIA_STORAGE_URL_PATTERN, parsed_url.path
)
pk, key = url_params.values()
document = self._auth_get_document(url_params["pk"])
if not document.get_abilities(request.user).get(self.action, False):
logger.debug(
"User '%s' lacks permission for document '%s'",
request.user,
document.pk,
)
raise drf.exceptions.PermissionDenied()
# Generate S3 authorization headers using the extracted URL parameters
request = utils.generate_s3_authorization_headers(f"{pk:s}/{key:s}")
request = utils.generate_s3_authorization_headers(
f"{url_params['pk']:s}/{url_params['key']:s}"
)
return drf.response.Response("authorized", headers=request.headers, status=200)
@@ -1190,18 +1173,34 @@ class DocumentViewSet(
This view is used by an Nginx subrequest to control access to a document's
collaboration server.
"""
_, user_abilities, user_id = self._authorize_subrequest(
request, COLLABORATION_WS_URL_PATTERN
parsed_url = self._auth_get_original_url(request)
url_params = self._auth_get_url_params(
enums.COLLABORATION_WS_URL_PATTERN, parsed_url.query
)
can_edit = user_abilities["partial_update"]
document = self._auth_get_document(url_params["pk"])
abilities = document.get_abilities(request.user)
if not abilities.get(self.action, False):
logger.debug(
"User '%s' lacks permission for document '%s'",
request.user,
document.pk,
)
raise drf.exceptions.PermissionDenied()
if not settings.COLLABORATION_SERVER_SECRET:
logger.debug("Collaboration server secret is not defined")
raise drf.exceptions.PermissionDenied()
# Add the collaboration server secret token to the headers
headers = {
"Authorization": settings.COLLABORATION_SERVER_SECRET,
"X-Can-Edit": str(can_edit),
"X-User-Id": str(user_id),
"X-Can-Edit": str(abilities["partial_update"]),
}
if request.user.is_authenticated:
headers["X-User-Id"] = str(request.user.id)
return drf.response.Response("authorized", headers=headers, status=200)
@drf.decorators.action(