♻️(backend) encapsulate token generation in a service

Encapsulate token generation logic for authenticating to the
external API in a well-scoped service.

This service can later be reused in other parts of the codebase,
especially for providing tokens required by calendar integrations.

Commit was cherry picked from #897
This commit is contained in:
lebaudantoine
2026-01-22 20:15:07 +01:00
committed by aleb_the_flash
parent 259b739160
commit 7cab46dc29
2 changed files with 123 additions and 23 deletions

View File

@@ -1,6 +1,5 @@
"""External API endpoints"""
from datetime import datetime, timedelta, timezone
from logging import getLogger
from django.conf import settings
@@ -8,7 +7,6 @@ from django.contrib.auth.hashers import check_password
from django.core.exceptions import SuspiciousOperation, ValidationError
from django.core.validators import validate_email
import jwt
from lasuite.oidc_resource_server.authentication import ResourceServerAuthentication
from rest_framework import decorators, mixins, viewsets
from rest_framework import (
@@ -22,6 +20,7 @@ from rest_framework import (
)
from core import api, models
from core.services.jwt_token import JwtTokenService
from . import authentication, permissions, serializers
@@ -128,33 +127,28 @@ class ApplicationViewSet(viewsets.ViewSet):
"Multiple user accounts share a common email."
) from e
now = datetime.now(timezone.utc)
scope = " ".join(application.scopes or [])
payload = {
"iss": settings.APPLICATION_JWT_ISSUER,
"aud": settings.APPLICATION_JWT_AUDIENCE,
"iat": now,
"exp": now + timedelta(seconds=settings.APPLICATION_JWT_EXPIRATION_SECONDS),
"client_id": client_id,
"scope": scope,
"user_id": str(user.id),
"delegated": True,
}
token = jwt.encode(
payload,
settings.APPLICATION_JWT_SECRET_KEY,
token_service = JwtTokenService(
secret_key=settings.APPLICATION_JWT_SECRET_KEY,
algorithm=settings.APPLICATION_JWT_ALG,
issuer=settings.APPLICATION_JWT_ISSUER,
audience=settings.APPLICATION_JWT_AUDIENCE,
expiration_seconds=settings.APPLICATION_JWT_EXPIRATION_SECONDS,
token_type=settings.APPLICATION_JWT_TOKEN_TYPE,
)
data = token_service.generate_jwt(
user,
scope,
{
"client_id": client_id,
"delegated": True,
},
)
return drf_response.Response(
{
"access_token": token,
"token_type": settings.APPLICATION_JWT_TOKEN_TYPE,
"expires_in": settings.APPLICATION_JWT_EXPIRATION_SECONDS,
"scope": scope,
},
data,
status=drf_status.HTTP_200_OK,
)

View File

@@ -0,0 +1,106 @@
"""JWT token service."""
# pylint: disable=R0913,R0917
# ruff: noqa: PLR0913
from datetime import datetime, timedelta, timezone
from typing import Optional
from django.core.exceptions import ImproperlyConfigured
import jwt
class JwtTokenService:
"""Generic JWT token service with configurable settings."""
def __init__(
self,
secret_key: str,
algorithm: str,
issuer: str,
audience: str,
expiration_seconds: int,
token_type: str,
):
"""
Initialize the token service with custom settings.
Args:
secret_key: Secret key for JWT encoding/decoding
algorithm: JWT algorithm
issuer: Token issuer identifier
audience: Token audience identifier
expiration_seconds: Token expiration time in seconds
token_type: Token type
Raises:
ImproperlyConfigured: If secret_key is None or empty
"""
if not secret_key:
raise ImproperlyConfigured("Secret key is required.")
if not algorithm:
raise ImproperlyConfigured("Algorithm is required.")
if not token_type:
raise ImproperlyConfigured("Token's type is required.")
if expiration_seconds is None:
raise ImproperlyConfigured("Expiration's seconds is required.")
self._key = secret_key
self._algorithm = algorithm
self._issuer = issuer
self._audience = audience
self._expiration_seconds = expiration_seconds
self._token_type = token_type
def generate_jwt(
self, user, scope: str, extra_payload: Optional[dict] = None
) -> dict:
"""
Generate an access token for the given user.
Note: any extra_payload variables named iat, exp, or user_id will
be overwritten by this service
Args:
user: User instance for whom to generate the token
scope: Space-separated scope string
Returns:
Dictionary containing access_token, token_type, expires_in, and scope optionally
"""
now = datetime.now(timezone.utc)
payload = extra_payload.copy() if extra_payload else {}
payload.update(
{
"iat": now,
"exp": now + timedelta(seconds=self._expiration_seconds),
"user_id": str(user.id),
}
)
if self._issuer:
payload["iss"] = self._issuer
if self._audience:
payload["aud"] = self._audience
if scope:
payload["scope"] = scope
token = jwt.encode(
payload,
self._key,
algorithm=self._algorithm,
)
response = {
"access_token": token,
"token_type": self._token_type,
"expires_in": self._expiration_seconds,
}
if scope:
response["scope"] = scope
return response