diff --git a/CHANGELOG.md b/CHANGELOG.md index ace88363..0ef1780e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,7 @@ and this project adheres to ### Changed - ✨(frontend) add clickable settings general link in idle modal #974 +- ♻️(backend) refactor external API token-related items #1006 ## [1.6.0] - 2026-02-10 diff --git a/src/backend/core/external_api/authentication.py b/src/backend/core/external_api/authentication.py index 2ff16e2c..c16f6573 100644 --- a/src/backend/core/external_api/authentication.py +++ b/src/backend/core/external_api/authentication.py @@ -1,27 +1,51 @@ """Authentication Backends for external application to the Meet core app.""" +# pylint: disable=R0913,R0917 +# ruff: noqa: PLR0913 + import logging from django.conf import settings from django.contrib.auth import get_user_model from django.core.exceptions import SuspiciousOperation -import jwt as pyJwt from lasuite.oidc_resource_server.backend import ResourceServerBackend as LaSuiteBackend from rest_framework import authentication, exceptions from core.models import Application +from core.services import jwt_token User = get_user_model() logger = logging.getLogger(__name__) -class ApplicationJWTAuthentication(authentication.BaseAuthentication): - """JWT authentication for application-delegated API access. +class BaseJWTAuthentication(authentication.BaseAuthentication): + """Base JWT authentication class.""" - Validates JWT tokens issued to applications that are acting on behalf - of users. Tokens must include user_id, client_id, and delegation flag. - """ + def __init__( + self, secret_key, algorithm, issuer, audience, expiration_seconds, token_type + ): + """Initialize the JWT authentication backend with the given token service configuration. + + Args: + secret_key: Secret key for JWT encoding/decoding + algorithm: JWT algorithm (e.g. HS256) + issuer: Expected token issuer identifier + audience: Expected token audience identifier + expiration_seconds: Token expiration time in seconds + token_type: Token type (e.g. Bearer) + """ + + super().__init__() + + self._token_service = jwt_token.JwtTokenService( + secret_key=secret_key, + algorithm=algorithm, + issuer=issuer, + audience=audience, + expiration_seconds=expiration_seconds, + token_type=token_type, + ) def authenticate(self, request): """Extract and validate JWT from Authorization header. @@ -48,6 +72,78 @@ class ApplicationJWTAuthentication(authentication.BaseAuthentication): return self.authenticate_credentials(token) + def decode_jwt(self, token): + """Decode and validate JWT token. + + Args: + token: JWT token string + + Returns: + Decoded payload dict, or None if token is invalid + + Raises: + AuthenticationFailed: If token is expired or has invalid issuer/audience + """ + + try: + payload = self._token_service.decode_jwt(token) + return payload + except jwt_token.TokenExpiredError as e: + logger.warning("Token expired") + raise exceptions.AuthenticationFailed("Token expired.") from e + except jwt_token.TokenInvalidError as e: + logger.warning("Invalid JWT issuer or audience: %s", e) + raise exceptions.AuthenticationFailed("Invalid token.") from e + except jwt_token.TokenDecodeError: + # Invalid JWT token - defer to next authentication backend + return None + + def validate_payload(self, payload): + """Validate JWT payload claims. + + Override in subclasses to add custom validation. + + Args: + payload: Decoded JWT payload + + Raises: + AuthenticationFailed: If required claims are missing or invalid + """ + + def get_user(self, payload): + """Retrieve and validate user from payload. + + Args: + payload: Decoded JWT payload + + Returns: + User instance + + Raises: + AuthenticationFailed: If user not found or inactive + """ + user_id = payload.get("user_id") + + if not user_id: + logger.warning("Missing 'user_id' in JWT payload") + raise exceptions.AuthenticationFailed("Invalid token claims.") + + try: + user = User.objects.get(id=user_id) + except User.DoesNotExist as e: + logger.warning("User not found: %s", user_id) + raise exceptions.AuthenticationFailed("User not found.") from e + + if not user.is_active: + logger.warning("Inactive user attempted authentication: %s", user_id) + raise exceptions.AuthenticationFailed("User account is disabled.") + + return user + + def authenticate_header(self, request): + """Return authentication scheme for WWW-Authenticate header.""" + return "Bearer" + def authenticate_credentials(self, token): """Validate JWT token and return authenticated user. @@ -62,36 +158,41 @@ class ApplicationJWTAuthentication(authentication.BaseAuthentication): Raises: AuthenticationFailed: If token is expired, or user not found """ - # Decode and validate JWT - try: - payload = pyJwt.decode( - token, - settings.APPLICATION_JWT_SECRET_KEY, - algorithms=[settings.APPLICATION_JWT_ALG], - issuer=settings.APPLICATION_JWT_ISSUER, - audience=settings.APPLICATION_JWT_AUDIENCE, - ) - except pyJwt.ExpiredSignatureError as e: - logger.warning("Token expired") - raise exceptions.AuthenticationFailed("Token expired.") from e - except pyJwt.InvalidIssuerError as e: - logger.warning("Invalid JWT issuer: %s", e) - raise exceptions.AuthenticationFailed("Invalid token.") from e - except pyJwt.InvalidAudienceError as e: - logger.warning("Invalid JWT audience: %s", e) - raise exceptions.AuthenticationFailed("Invalid token.") from e - except pyJwt.InvalidTokenError: - # Invalid JWT token - defer to next authentication backend + + payload = self.decode_jwt(token) + + if payload is None: return None - user_id = payload.get("user_id") + self.validate_payload(payload) + user = self.get_user(payload) + + return (user, payload) + + +class ApplicationJWTAuthentication(BaseJWTAuthentication): + """JWT authentication for application-delegated API access. + + Validates JWT tokens issued to applications that are acting on behalf + of users. Tokens must include user_id, client_id, and delegation flag. + """ + + def __init__(self): + """Initialize authentication backend with application JWT settings from Django settings.""" + super().__init__( + secret_key=settings.APPLICATION_JWT_SECRET_KEY, + algorithm=settings.APPLICATION_JWT_ALG, + issuer=settings.APPLICATION_JWT_ISSUER, + audience=settings.APPLICATION_JWT_AUDIENCE, + expiration_seconds=settings.APPLICATION_JWT_EXPIRATION_SECONDS, + token_type=settings.APPLICATION_JWT_TOKEN_TYPE, + ) + + def validate_payload(self, payload): + """Validate application-specific claims.""" client_id = payload.get("client_id") is_delegated = payload.get("delegated", False) - if not user_id: - logger.warning("Missing 'user_id' in JWT payload") - raise exceptions.AuthenticationFailed("Invalid token claims.") - if not client_id: logger.warning("Missing 'client_id' in JWT payload") raise exceptions.AuthenticationFailed("Invalid token claims.") @@ -112,22 +213,6 @@ class ApplicationJWTAuthentication(authentication.BaseAuthentication): logger.warning("Token is not marked as delegated") raise exceptions.AuthenticationFailed("Invalid token type.") - try: - user = User.objects.get(id=user_id) - except User.DoesNotExist as e: - logger.warning("User not found: %s", user_id) - raise exceptions.AuthenticationFailed("User not found.") from e - - if not user.is_active: - logger.warning("Inactive user attempted authentication: %s", user_id) - raise exceptions.AuthenticationFailed("User account is disabled.") - - return (user, payload) - - def authenticate_header(self, request): - """Return authentication scheme for WWW-Authenticate header.""" - return "Bearer" - class ResourceServerBackend(LaSuiteBackend): """OIDC Resource Server backend for user creation and retrieval.""" diff --git a/src/backend/core/services/jwt_token.py b/src/backend/core/services/jwt_token.py index 40428e58..19178fc6 100644 --- a/src/backend/core/services/jwt_token.py +++ b/src/backend/core/services/jwt_token.py @@ -11,6 +11,22 @@ from django.core.exceptions import ImproperlyConfigured import jwt +class JWTError(Exception): + """Base exception for all JWT token errors.""" + + +class TokenExpiredError(JWTError): + """Raised when the JWT token has expired.""" + + +class TokenInvalidError(JWTError): + """Raised when the JWT token has an invalid issuer or audience.""" + + +class TokenDecodeError(JWTError): + """Raised for any other unrecoverable JWT decode failure.""" + + class JwtTokenService: """Generic JWT token service with configurable settings.""" @@ -104,3 +120,34 @@ class JwtTokenService: response["scope"] = scope return response + + def decode_jwt(self, token): + """Decode and validate JWT token. + + Args: + token: JWT token string + + Returns: + Decoded payload dict. + + Raises: + TokenExpiredError: If the token has expired. + TokenInvalidError: If the token has an invalid issuer or audience. + TokenDecodeError: If the token is malformed or cannot be decoded. + """ + + try: + payload = jwt.decode( + token, + self._key, + algorithms=[self._algorithm], + issuer=self._issuer, + audience=self._audience, + ) + return payload + except jwt.ExpiredSignatureError as e: + raise TokenExpiredError("Token expired.") from e + except (jwt.InvalidIssuerError, jwt.InvalidAudienceError) as e: + raise TokenInvalidError("Invalid token.") from e + except jwt.InvalidTokenError as e: + raise TokenDecodeError("Token decode error.") from e