From ac87980a27a7b078b129bf79592da6da3b271897 Mon Sep 17 00:00:00 2001 From: lebaudantoine Date: Mon, 26 Jan 2026 16:23:46 +0100 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F(backend)=20refactor=20extern?= =?UTF-8?q?al=20API=20authentication=20classes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refactor external API authentication classes to inherit from a common base authentication backend. Prepare the introduction of a new authentication class responsible for verifying tokens provided to calendar integrations. Move token decoding responsibility to the new token service so it can both generate and validate tokens. Encapsulate external exceptions and expose a clear interface by defining custom Python exceptions raised during token validation. Taken from #897. --- CHANGELOG.md | 1 + .../core/external_api/authentication.py | 179 +++++++++++++----- src/backend/core/services/jwt_token.py | 47 +++++ 3 files changed, 180 insertions(+), 47 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ace88363..0ef1780e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,7 @@ and this project adheres to ### Changed - ✨(frontend) add clickable settings general link in idle modal #974 +- ♻️(backend) refactor external API token-related items #1006 ## [1.6.0] - 2026-02-10 diff --git a/src/backend/core/external_api/authentication.py b/src/backend/core/external_api/authentication.py index 2ff16e2c..c16f6573 100644 --- a/src/backend/core/external_api/authentication.py +++ b/src/backend/core/external_api/authentication.py @@ -1,27 +1,51 @@ """Authentication Backends for external application to the Meet core app.""" +# pylint: disable=R0913,R0917 +# ruff: noqa: PLR0913 + import logging from django.conf import settings from django.contrib.auth import get_user_model from django.core.exceptions import SuspiciousOperation -import jwt as pyJwt from lasuite.oidc_resource_server.backend import ResourceServerBackend as LaSuiteBackend from rest_framework import authentication, exceptions from core.models import Application +from core.services import jwt_token User = get_user_model() logger = logging.getLogger(__name__) -class ApplicationJWTAuthentication(authentication.BaseAuthentication): - """JWT authentication for application-delegated API access. +class BaseJWTAuthentication(authentication.BaseAuthentication): + """Base JWT authentication class.""" - Validates JWT tokens issued to applications that are acting on behalf - of users. Tokens must include user_id, client_id, and delegation flag. - """ + def __init__( + self, secret_key, algorithm, issuer, audience, expiration_seconds, token_type + ): + """Initialize the JWT authentication backend with the given token service configuration. + + Args: + secret_key: Secret key for JWT encoding/decoding + algorithm: JWT algorithm (e.g. HS256) + issuer: Expected token issuer identifier + audience: Expected token audience identifier + expiration_seconds: Token expiration time in seconds + token_type: Token type (e.g. Bearer) + """ + + super().__init__() + + self._token_service = jwt_token.JwtTokenService( + secret_key=secret_key, + algorithm=algorithm, + issuer=issuer, + audience=audience, + expiration_seconds=expiration_seconds, + token_type=token_type, + ) def authenticate(self, request): """Extract and validate JWT from Authorization header. @@ -48,6 +72,78 @@ class ApplicationJWTAuthentication(authentication.BaseAuthentication): return self.authenticate_credentials(token) + def decode_jwt(self, token): + """Decode and validate JWT token. + + Args: + token: JWT token string + + Returns: + Decoded payload dict, or None if token is invalid + + Raises: + AuthenticationFailed: If token is expired or has invalid issuer/audience + """ + + try: + payload = self._token_service.decode_jwt(token) + return payload + except jwt_token.TokenExpiredError as e: + logger.warning("Token expired") + raise exceptions.AuthenticationFailed("Token expired.") from e + except jwt_token.TokenInvalidError as e: + logger.warning("Invalid JWT issuer or audience: %s", e) + raise exceptions.AuthenticationFailed("Invalid token.") from e + except jwt_token.TokenDecodeError: + # Invalid JWT token - defer to next authentication backend + return None + + def validate_payload(self, payload): + """Validate JWT payload claims. + + Override in subclasses to add custom validation. + + Args: + payload: Decoded JWT payload + + Raises: + AuthenticationFailed: If required claims are missing or invalid + """ + + def get_user(self, payload): + """Retrieve and validate user from payload. + + Args: + payload: Decoded JWT payload + + Returns: + User instance + + Raises: + AuthenticationFailed: If user not found or inactive + """ + user_id = payload.get("user_id") + + if not user_id: + logger.warning("Missing 'user_id' in JWT payload") + raise exceptions.AuthenticationFailed("Invalid token claims.") + + try: + user = User.objects.get(id=user_id) + except User.DoesNotExist as e: + logger.warning("User not found: %s", user_id) + raise exceptions.AuthenticationFailed("User not found.") from e + + if not user.is_active: + logger.warning("Inactive user attempted authentication: %s", user_id) + raise exceptions.AuthenticationFailed("User account is disabled.") + + return user + + def authenticate_header(self, request): + """Return authentication scheme for WWW-Authenticate header.""" + return "Bearer" + def authenticate_credentials(self, token): """Validate JWT token and return authenticated user. @@ -62,36 +158,41 @@ class ApplicationJWTAuthentication(authentication.BaseAuthentication): Raises: AuthenticationFailed: If token is expired, or user not found """ - # Decode and validate JWT - try: - payload = pyJwt.decode( - token, - settings.APPLICATION_JWT_SECRET_KEY, - algorithms=[settings.APPLICATION_JWT_ALG], - issuer=settings.APPLICATION_JWT_ISSUER, - audience=settings.APPLICATION_JWT_AUDIENCE, - ) - except pyJwt.ExpiredSignatureError as e: - logger.warning("Token expired") - raise exceptions.AuthenticationFailed("Token expired.") from e - except pyJwt.InvalidIssuerError as e: - logger.warning("Invalid JWT issuer: %s", e) - raise exceptions.AuthenticationFailed("Invalid token.") from e - except pyJwt.InvalidAudienceError as e: - logger.warning("Invalid JWT audience: %s", e) - raise exceptions.AuthenticationFailed("Invalid token.") from e - except pyJwt.InvalidTokenError: - # Invalid JWT token - defer to next authentication backend + + payload = self.decode_jwt(token) + + if payload is None: return None - user_id = payload.get("user_id") + self.validate_payload(payload) + user = self.get_user(payload) + + return (user, payload) + + +class ApplicationJWTAuthentication(BaseJWTAuthentication): + """JWT authentication for application-delegated API access. + + Validates JWT tokens issued to applications that are acting on behalf + of users. Tokens must include user_id, client_id, and delegation flag. + """ + + def __init__(self): + """Initialize authentication backend with application JWT settings from Django settings.""" + super().__init__( + secret_key=settings.APPLICATION_JWT_SECRET_KEY, + algorithm=settings.APPLICATION_JWT_ALG, + issuer=settings.APPLICATION_JWT_ISSUER, + audience=settings.APPLICATION_JWT_AUDIENCE, + expiration_seconds=settings.APPLICATION_JWT_EXPIRATION_SECONDS, + token_type=settings.APPLICATION_JWT_TOKEN_TYPE, + ) + + def validate_payload(self, payload): + """Validate application-specific claims.""" client_id = payload.get("client_id") is_delegated = payload.get("delegated", False) - if not user_id: - logger.warning("Missing 'user_id' in JWT payload") - raise exceptions.AuthenticationFailed("Invalid token claims.") - if not client_id: logger.warning("Missing 'client_id' in JWT payload") raise exceptions.AuthenticationFailed("Invalid token claims.") @@ -112,22 +213,6 @@ class ApplicationJWTAuthentication(authentication.BaseAuthentication): logger.warning("Token is not marked as delegated") raise exceptions.AuthenticationFailed("Invalid token type.") - try: - user = User.objects.get(id=user_id) - except User.DoesNotExist as e: - logger.warning("User not found: %s", user_id) - raise exceptions.AuthenticationFailed("User not found.") from e - - if not user.is_active: - logger.warning("Inactive user attempted authentication: %s", user_id) - raise exceptions.AuthenticationFailed("User account is disabled.") - - return (user, payload) - - def authenticate_header(self, request): - """Return authentication scheme for WWW-Authenticate header.""" - return "Bearer" - class ResourceServerBackend(LaSuiteBackend): """OIDC Resource Server backend for user creation and retrieval.""" diff --git a/src/backend/core/services/jwt_token.py b/src/backend/core/services/jwt_token.py index 40428e58..19178fc6 100644 --- a/src/backend/core/services/jwt_token.py +++ b/src/backend/core/services/jwt_token.py @@ -11,6 +11,22 @@ from django.core.exceptions import ImproperlyConfigured import jwt +class JWTError(Exception): + """Base exception for all JWT token errors.""" + + +class TokenExpiredError(JWTError): + """Raised when the JWT token has expired.""" + + +class TokenInvalidError(JWTError): + """Raised when the JWT token has an invalid issuer or audience.""" + + +class TokenDecodeError(JWTError): + """Raised for any other unrecoverable JWT decode failure.""" + + class JwtTokenService: """Generic JWT token service with configurable settings.""" @@ -104,3 +120,34 @@ class JwtTokenService: response["scope"] = scope return response + + def decode_jwt(self, token): + """Decode and validate JWT token. + + Args: + token: JWT token string + + Returns: + Decoded payload dict. + + Raises: + TokenExpiredError: If the token has expired. + TokenInvalidError: If the token has an invalid issuer or audience. + TokenDecodeError: If the token is malformed or cannot be decoded. + """ + + try: + payload = jwt.decode( + token, + self._key, + algorithms=[self._algorithm], + issuer=self._issuer, + audience=self._audience, + ) + return payload + except jwt.ExpiredSignatureError as e: + raise TokenExpiredError("Token expired.") from e + except (jwt.InvalidIssuerError, jwt.InvalidAudienceError) as e: + raise TokenInvalidError("Invalid token.") from e + except jwt.InvalidTokenError as e: + raise TokenDecodeError("Token decode error.") from e