From 7cab46dc2961c8163c6f3140a4945cd27629c130 Mon Sep 17 00:00:00 2001 From: lebaudantoine Date: Thu, 22 Jan 2026 20:15:07 +0100 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F(backend)=20encapsulate=20tok?= =?UTF-8?q?en=20generation=20in=20a=20service?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Encapsulate token generation logic for authenticating to the external API in a well-scoped service. This service can later be reused in other parts of the codebase, especially for providing tokens required by calendar integrations. Commit was cherry picked from #897 --- src/backend/core/external_api/viewsets.py | 40 ++++---- src/backend/core/services/jwt_token.py | 106 ++++++++++++++++++++++ 2 files changed, 123 insertions(+), 23 deletions(-) create mode 100644 src/backend/core/services/jwt_token.py diff --git a/src/backend/core/external_api/viewsets.py b/src/backend/core/external_api/viewsets.py index 41f7d0cc..5a0e550c 100644 --- a/src/backend/core/external_api/viewsets.py +++ b/src/backend/core/external_api/viewsets.py @@ -1,6 +1,5 @@ """External API endpoints""" -from datetime import datetime, timedelta, timezone from logging import getLogger from django.conf import settings @@ -8,7 +7,6 @@ from django.contrib.auth.hashers import check_password from django.core.exceptions import SuspiciousOperation, ValidationError from django.core.validators import validate_email -import jwt from lasuite.oidc_resource_server.authentication import ResourceServerAuthentication from rest_framework import decorators, mixins, viewsets from rest_framework import ( @@ -22,6 +20,7 @@ from rest_framework import ( ) from core import api, models +from core.services.jwt_token import JwtTokenService from . import authentication, permissions, serializers @@ -128,33 +127,28 @@ class ApplicationViewSet(viewsets.ViewSet): "Multiple user accounts share a common email." ) from e - now = datetime.now(timezone.utc) scope = " ".join(application.scopes or []) - payload = { - "iss": settings.APPLICATION_JWT_ISSUER, - "aud": settings.APPLICATION_JWT_AUDIENCE, - "iat": now, - "exp": now + timedelta(seconds=settings.APPLICATION_JWT_EXPIRATION_SECONDS), - "client_id": client_id, - "scope": scope, - "user_id": str(user.id), - "delegated": True, - } - - token = jwt.encode( - payload, - settings.APPLICATION_JWT_SECRET_KEY, + token_service = JwtTokenService( + secret_key=settings.APPLICATION_JWT_SECRET_KEY, algorithm=settings.APPLICATION_JWT_ALG, + issuer=settings.APPLICATION_JWT_ISSUER, + audience=settings.APPLICATION_JWT_AUDIENCE, + expiration_seconds=settings.APPLICATION_JWT_EXPIRATION_SECONDS, + token_type=settings.APPLICATION_JWT_TOKEN_TYPE, + ) + + data = token_service.generate_jwt( + user, + scope, + { + "client_id": client_id, + "delegated": True, + }, ) return drf_response.Response( - { - "access_token": token, - "token_type": settings.APPLICATION_JWT_TOKEN_TYPE, - "expires_in": settings.APPLICATION_JWT_EXPIRATION_SECONDS, - "scope": scope, - }, + data, status=drf_status.HTTP_200_OK, ) diff --git a/src/backend/core/services/jwt_token.py b/src/backend/core/services/jwt_token.py new file mode 100644 index 00000000..40428e58 --- /dev/null +++ b/src/backend/core/services/jwt_token.py @@ -0,0 +1,106 @@ +"""JWT token service.""" + +# pylint: disable=R0913,R0917 +# ruff: noqa: PLR0913 + +from datetime import datetime, timedelta, timezone +from typing import Optional + +from django.core.exceptions import ImproperlyConfigured + +import jwt + + +class JwtTokenService: + """Generic JWT token service with configurable settings.""" + + def __init__( + self, + secret_key: str, + algorithm: str, + issuer: str, + audience: str, + expiration_seconds: int, + token_type: str, + ): + """ + Initialize the token service with custom settings. + + Args: + secret_key: Secret key for JWT encoding/decoding + algorithm: JWT algorithm + issuer: Token issuer identifier + audience: Token audience identifier + expiration_seconds: Token expiration time in seconds + token_type: Token type + + Raises: + ImproperlyConfigured: If secret_key is None or empty + """ + if not secret_key: + raise ImproperlyConfigured("Secret key is required.") + if not algorithm: + raise ImproperlyConfigured("Algorithm is required.") + if not token_type: + raise ImproperlyConfigured("Token's type is required.") + if expiration_seconds is None: + raise ImproperlyConfigured("Expiration's seconds is required.") + + self._key = secret_key + self._algorithm = algorithm + self._issuer = issuer + self._audience = audience + self._expiration_seconds = expiration_seconds + self._token_type = token_type + + def generate_jwt( + self, user, scope: str, extra_payload: Optional[dict] = None + ) -> dict: + """ + Generate an access token for the given user. + + Note: any extra_payload variables named iat, exp, or user_id will + be overwritten by this service + + Args: + user: User instance for whom to generate the token + scope: Space-separated scope string + + Returns: + Dictionary containing access_token, token_type, expires_in, and scope optionally + """ + now = datetime.now(timezone.utc) + + payload = extra_payload.copy() if extra_payload else {} + + payload.update( + { + "iat": now, + "exp": now + timedelta(seconds=self._expiration_seconds), + "user_id": str(user.id), + } + ) + + if self._issuer: + payload["iss"] = self._issuer + if self._audience: + payload["aud"] = self._audience + if scope: + payload["scope"] = scope + + token = jwt.encode( + payload, + self._key, + algorithm=self._algorithm, + ) + + response = { + "access_token": token, + "token_type": self._token_type, + "expires_in": self._expiration_seconds, + } + + if scope: + response["scope"] = scope + + return response