(backend) add django-lasuite dependency

Use the OIDC backends from the new library.
This commit is contained in:
Quentin BEY
2025-04-02 12:02:06 +02:00
parent 594d3af0d0
commit e2d362bc77
27 changed files with 51 additions and 2011 deletions

View File

@@ -10,6 +10,7 @@ and this project adheres to
### Added
- (backend) add django-lasuite dependency #858
- ✨(plugins) add endpoint to list siret of active organizations #771
- ✨(core) create AccountServiceAuthentication backend #771
- ✨(core) create AccountService model #771

View File

@@ -6,6 +6,7 @@ from functools import reduce
from django.db.models import OuterRef, Prefetch, Q, Subquery, Value
from django.db.models.functions import Coalesce
from lasuite.oidc_resource_server.mixins import ResourceServerMixin
from rest_framework import (
filters,
mixins,
@@ -15,7 +16,6 @@ from rest_framework import (
from core import models
from core.api import permissions
from core.api.client.viewsets import Pagination
from core.resource_server.mixins import ResourceServerMixin
from . import serializers

View File

@@ -1,18 +1,15 @@
"""Authentication Backends for the People core app."""
import logging
from email.headerregistry import Address
from typing import Optional
from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.exceptions import SuspiciousOperation
from django.utils.translation import gettext_lazy as _
import requests
from mozilla_django_oidc.auth import (
OIDCAuthenticationBackend as MozillaOIDCAuthenticationBackend,
from lasuite.oidc_login.backends import (
OIDCAuthenticationBackend as LaSuiteOIDCAuthenticationBackend,
)
from lasuite.tools.email import get_domain_from_email
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
@@ -26,100 +23,44 @@ from core.models import (
logger = logging.getLogger(__name__)
User = get_user_model()
def get_domain_from_email(email: Optional[str]) -> Optional[str]:
"""Extract domain from email."""
try:
return Address(addr_spec=email).domain
except (ValueError, AttributeError):
return None
class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
class OIDCAuthenticationBackend(LaSuiteOIDCAuthenticationBackend):
"""Custom OpenID Connect (OIDC) Authentication Backend.
This class overrides the default OIDC Authentication Backend to accommodate differences
in the User model, and handles signed and/or encrypted UserInfo response.
"""
def get_userinfo(self, access_token, id_token, payload):
"""Return user details dictionary.
def get_extra_claims(self, user_info):
"""
Return extra claims from user_info.
Parameters:
- access_token (str): The access token.
- id_token (str): The id token (unused).
- payload (dict): The token payload (unused).
Note: The id_token and payload parameters are unused in this implementation,
but were kept to preserve base method signature.
Note: It handles signed and/or encrypted UserInfo Response. It is required by
Agent Connect, which follows the OIDC standard. It forces us to override the
base method, which deal with 'application/json' response.
Args:
user_info (dict): The user information dictionary.
Returns:
- dict: User details dictionary obtained from the OpenID Connect user endpoint.
dict: A dictionary of extra claims.
"""
user_response = requests.get(
self.OIDC_OP_USER_ENDPOINT,
headers={"Authorization": f"Bearer {access_token}"},
verify=self.get_settings("OIDC_VERIFY_SSL", True),
timeout=self.get_settings("OIDC_TIMEOUT", None),
proxies=self.get_settings("OIDC_PROXY", None),
)
user_response.raise_for_status()
userinfo = self.verify_token(user_response.text)
return userinfo
def get_or_create_user(self, access_token, id_token, payload):
"""Return a User based on userinfo. Create a new user if no match is found.
Parameters:
- access_token (str): The access token.
- id_token (str): The ID token.
- payload (dict): The user payload.
Returns:
- User: An existing or newly created User instance.
Raises:
- Exception: Raised when user creation is not allowed and no existing user is found.
"""
user_info = self.get_userinfo(access_token, id_token, payload)
sub = user_info.get("sub")
if not sub:
raise SuspiciousOperation(
_("User info contained no recognizable user identification")
)
# Get user's full name from OIDC fields defined in settings
email = user_info.get("email")
claims = {
"sub": sub,
"email": email,
"name": self.compute_full_name(user_info),
}
extra_claims = super().get_extra_claims(user_info)
if settings.OIDC_ORGANIZATION_REGISTRATION_ID_FIELD:
claims[settings.OIDC_ORGANIZATION_REGISTRATION_ID_FIELD] = user_info.get(
settings.OIDC_ORGANIZATION_REGISTRATION_ID_FIELD
extra_claims[settings.OIDC_ORGANIZATION_REGISTRATION_ID_FIELD] = (
user_info.get(settings.OIDC_ORGANIZATION_REGISTRATION_ID_FIELD)
)
return extra_claims
# if sub is absent, try matching on email
user = self.get_existing_user(sub, email)
def post_get_or_create_user(self, user, claims):
"""
Post-processing after user creation or retrieval.
if user:
if not user.is_active:
raise SuspiciousOperation(_("User account is disabled"))
self.update_user_if_needed(user, claims)
elif self.get_settings("OIDC_CREATE_USER", True):
user = self.create_user(claims)
Args:
user (User): The user instance.
claims (dict): The claims dictionary.
Returns:
- None
"""
# Data cleaning, to be removed when user organization is null=False
# or all users have an organization.
# See https://github.com/suitenumerique/people/issues/504
@@ -127,7 +68,7 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
organization_registration_id = claims.get(
settings.OIDC_ORGANIZATION_REGISTRATION_ID_FIELD
)
domain = get_domain_from_email(email)
domain = get_domain_from_email(claims["email"])
try:
organization, organization_created = (
Organization.objects.get_or_create_from_user_claims(
@@ -154,8 +95,6 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
"User %s updated with organization %s", user.pk, organization
)
return user
def create_user(self, claims):
"""Return a newly created User instance."""
sub = claims.get("sub")
@@ -167,8 +106,9 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
name = claims.get("name")
# Extract or create the organization from the data
organization_registration_id = claims.get(
settings.OIDC_ORGANIZATION_REGISTRATION_ID_FIELD
organization_registration_id = claims.pop(
settings.OIDC_ORGANIZATION_REGISTRATION_ID_FIELD,
None,
)
domain = get_domain_from_email(email)
try:
@@ -188,13 +128,8 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
logger.info("Creating user %s / %s", sub, email)
user = self.UserModel.objects.create(
organization=organization,
password="!", # noqa: S106
sub=sub,
email=email,
name=name,
)
user = super().create_user(claims | {"organization": organization})
if organization_created:
# Warning: we may remove this behavior in the near future when we
# add a feature to claim the organization ownership.
@@ -218,37 +153,6 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
return user
def compute_full_name(self, user_info):
"""Compute user's full name based on OIDC fields in settings."""
name_fields = settings.USER_OIDC_FIELDS_TO_NAME
full_name = " ".join(
user_info[field] for field in name_fields if user_info.get(field)
)
return full_name or None
def get_existing_user(self, sub, email):
"""Fetch existing user by sub or email."""
try:
return User.objects.get(sub=sub)
except User.DoesNotExist:
if email and settings.OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION:
try:
return User.objects.get(email=email)
except User.DoesNotExist:
pass
return None
def update_user_if_needed(self, user, claims):
"""Update user claims if they have changed."""
updated_claims = {}
for key in ["email", "name"]:
claim_value = claims.get(key)
if claim_value and claim_value != getattr(user, key):
updated_claims[key] = claim_value
if updated_claims:
self.UserModel.objects.filter(sub=user.sub).update(**updated_claims)
class AccountServiceAuthentication(BaseAuthentication):
"""Authentication backend for account services using Authorization header.

View File

@@ -1,18 +0,0 @@
"""Authentication URLs for the People core app."""
from django.urls import path
from mozilla_django_oidc.urls import urlpatterns as mozilla_oidc_urls
from .views import OIDCLogoutCallbackView, OIDCLogoutView
urlpatterns = [
# Override the default 'logout/' path from Mozilla Django OIDC with our custom view.
path("logout/", OIDCLogoutView.as_view(), name="oidc_logout_custom"),
path(
"logout-callback/",
OIDCLogoutCallbackView.as_view(),
name="oidc_logout_callback",
),
*mozilla_oidc_urls,
]

View File

@@ -1,137 +0,0 @@
"""Authentication Views for the People core app."""
from urllib.parse import urlencode
from django.contrib import auth
from django.core.exceptions import SuspiciousOperation
from django.http import HttpResponseRedirect
from django.urls import reverse
from django.utils import crypto
from mozilla_django_oidc.utils import (
absolutify,
)
from mozilla_django_oidc.views import (
OIDCLogoutView as MozillaOIDCOIDCLogoutView,
)
class OIDCLogoutView(MozillaOIDCOIDCLogoutView):
"""Custom logout view for handling OpenID Connect (OIDC) logout flow.
Adds support for handling logout callbacks from the identity provider (OP)
by initiating the logout flow if the user has an active session.
The Django session is retained during the logout process to persist the 'state' OIDC parameter.
This parameter is crucial for maintaining the integrity of the logout flow between this call
and the subsequent callback.
"""
@staticmethod
def persist_state(request, state):
"""Persist the given 'state' parameter in the session's 'oidc_states' dictionary
This method is used to store the OIDC state parameter in the session, according to the
structure expected by Mozilla Django OIDC's 'add_state_and_verifier_and_nonce_to_session'
utility function.
"""
if "oidc_states" not in request.session or not isinstance(
request.session["oidc_states"], dict
):
request.session["oidc_states"] = {}
request.session["oidc_states"][state] = {}
request.session.save()
def construct_oidc_logout_url(self, request):
"""Create the redirect URL for interfacing with the OIDC provider.
Retrieves the necessary parameters from the session and constructs the URL
required to initiate logout with the OpenID Connect provider.
If no ID token is found in the session, the logout flow will not be initiated,
and the method will return the default redirect URL.
The 'state' parameter is generated randomly and persisted in the session to ensure
its integrity during the subsequent callback.
"""
oidc_logout_endpoint = self.get_settings("OIDC_OP_LOGOUT_ENDPOINT")
if not oidc_logout_endpoint:
return self.redirect_url
reverse_url = reverse("oidc_logout_callback")
id_token = request.session.get("oidc_id_token", None)
if not id_token:
return self.redirect_url
query = {
"id_token_hint": id_token,
"state": crypto.get_random_string(self.get_settings("OIDC_STATE_SIZE", 32)),
"post_logout_redirect_uri": absolutify(request, reverse_url),
}
self.persist_state(request, query["state"])
return f"{oidc_logout_endpoint}?{urlencode(query)}"
def post(self, request):
"""Handle user logout.
If the user is not authenticated, redirects to the default logout URL.
Otherwise, constructs the OIDC logout URL and redirects the user to start
the logout process.
If the user is redirected to the default logout URL, ensure her Django session
is terminated.
"""
logout_url = self.redirect_url
if request.user.is_authenticated:
logout_url = self.construct_oidc_logout_url(request)
# If the user is not redirected to the OIDC provider, ensure logout
if logout_url == self.redirect_url:
auth.logout(request)
return HttpResponseRedirect(logout_url)
class OIDCLogoutCallbackView(MozillaOIDCOIDCLogoutView):
"""Custom view for handling the logout callback from the OpenID Connect (OIDC) provider.
Handles the callback after logout from the identity provider (OP).
Verifies the state parameter and performs necessary logout actions.
The Django session is maintained during the logout process to ensure the integrity
of the logout flow initiated in the previous step.
"""
http_method_names = ["get"]
def get(self, request):
"""Handle the logout callback.
If the user is not authenticated, redirects to the default logout URL.
Otherwise, verifies the state parameter and performs necessary logout actions.
"""
if not request.user.is_authenticated:
return HttpResponseRedirect(self.redirect_url)
state = request.GET.get("state")
if state not in request.session.get("oidc_states", {}):
msg = "OIDC callback state not found in session `oidc_states`!"
raise SuspiciousOperation(msg)
del request.session["oidc_states"][state]
request.session.save()
auth.logout(request)
return HttpResponseRedirect(self.redirect_url)

View File

@@ -1 +0,0 @@
"""Backend resource server module."""

View File

@@ -1,80 +0,0 @@
"""Resource Server Authentication"""
import base64
import binascii
import logging
from functools import lru_cache
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.utils.module_loading import import_string
from mozilla_django_oidc.contrib.drf import OIDCAuthentication
from .backend import ResourceServerImproperlyConfiguredBackend
from .clients import AuthorizationServerClient
logger = logging.getLogger(__name__)
@lru_cache(maxsize=None)
def get_resource_server_backend():
"""Return the resource server backend class based on the settings."""
return import_string(settings.OIDC_RS_BACKEND_CLASS)
class ResourceServerAuthentication(OIDCAuthentication):
"""Authenticate clients using the token received from the authorization server."""
def __init__(self):
"""Require authentication to be configured in order to instantiate."""
super().__init__()
try:
authorization_server_client = AuthorizationServerClient(
url=settings.OIDC_OP_URL,
verify_ssl=settings.OIDC_VERIFY_SSL,
timeout=settings.OIDC_TIMEOUT,
proxy=settings.OIDC_PROXY,
url_jwks=settings.OIDC_OP_JWKS_ENDPOINT,
url_introspection=settings.OIDC_OP_INTROSPECTION_ENDPOINT,
)
self.backend = get_resource_server_backend()(authorization_server_client)
except ImproperlyConfigured as err:
message = "Resource Server authentication is disabled"
logger.debug("%s. Exception: %s", message, err)
self.backend = ResourceServerImproperlyConfiguredBackend()
def get_access_token(self, request):
"""Retrieve and decode the access token from the request.
This method overrides the 'get_access_token' method from the parent class,
to support service providers that would base64 encode the bearer token.
"""
access_token = super().get_access_token(request)
try:
access_token = base64.b64decode(access_token).decode("utf-8")
except (binascii.Error, TypeError):
pass
return access_token
def authenticate(self, request):
"""
Authenticate the request and return a tuple of (user, token) or None.
We override the 'authenticate' method from the parent class to store
the introspected token audience inside the request.
"""
result = super().authenticate(request) # Might raise AuthenticationFailed
if result is None: # Case when there is no access token
return None
# Note: at this stage, the request is a "drf_request" object
request.resource_server_token_audience = self.backend.token_origin_audience
return result

View File

@@ -1,310 +0,0 @@
"""Resource Server Backend"""
import json
import logging
from json import JSONDecodeError
from django.conf import settings
from django.contrib import auth
from django.core.exceptions import ImproperlyConfigured, SuspiciousOperation
from joserfc import jwe as jose_jwe
from joserfc import jwt as jose_jwt
from joserfc.errors import InvalidClaimError, InvalidTokenError
from joserfc.jwt import Token
from requests.exceptions import HTTPError
from rest_framework.exceptions import AuthenticationFailed
from . import utils
logger = logging.getLogger(__name__)
class ResourceServerBackend:
"""Backend of an OAuth 2.0 resource server.
This backend is designed to authenticate resource owners to our API using the access token
they received from the authorization server.
In the context of OAuth 2.0, a resource server is a server that hosts protected resources and
is capable of accepting and responding to protected resource requests using access tokens.
The resource server verifies the validity of the access tokens issued by the authorization
server to ensure secure access to the resources.
For more information, visit: https://www.oauth.com/oauth2-servers/the-resource-server/
"""
# pylint: disable=too-many-instance-attributes
def __init__(self, authorization_server_client):
"""Require client_id, client_secret set and authorization_server_client provided."""
# pylint: disable=invalid-name
self.UserModel = auth.get_user_model()
self._client_id = settings.OIDC_RS_CLIENT_ID
self._client_secret = settings.OIDC_RS_CLIENT_SECRET
self._encryption_encoding = settings.OIDC_RS_ENCRYPTION_ENCODING
self._encryption_algorithm = settings.OIDC_RS_ENCRYPTION_ALGO
self._signing_algorithm = settings.OIDC_RS_SIGNING_ALGO
self._scopes = settings.OIDC_RS_SCOPES
if (
not self._client_id
or not self._client_secret
or not authorization_server_client
):
raise ImproperlyConfigured(
f"Could not instantiate {self.__class__.__name__}: some parameters are missing.",
)
self._authorization_server_client = authorization_server_client
self._introspection_claims_registry = jose_jwt.JWTClaimsRegistry(
iss={"essential": True, "value": self._authorization_server_client.url},
active={"essential": True},
scope={"essential": True}, # content validated in _verify_user_info
# optional in RFC, but required here: "client_id" or "aud"
**{settings.OIDC_RS_AUDIENCE_CLAIM: {"essential": True}},
)
# Declare the token origin audience: to know where the token comes from
# and store it for further use in the application
self.token_origin_audience = None
# pylint: disable=unused-argument
def get_or_create_user(self, access_token, id_token, payload):
"""Maintain API compatibility with OIDCAuthentication class from mozilla-django-oidc
Params 'id_token', 'payload' won't be used, and our implementation will only
support 'get_user', not 'get_or_create_user'.
"""
return self.get_user(access_token)
def get_user(self, access_token):
"""Get user from an access token emitted by the authorization server.
This method will submit the access token to the authorization server for
introspection, to ensure its validity and obtain the associated metadata.
It follows the specifications outlined in RFC7662 https://www.rfc-editor.org/info/rfc7662,
https://datatracker.ietf.org/doc/html/draft-ietf-oauth-jwt-introspection-response-12.
In our eGovernment applications, the standard RFC 7662 doesn't provide sufficient security.
Its introspection response is a plain JSON object. Therefore, we use the draft RFC
that extends RFC 7662 by returning a signed and encrypted JWT for stronger assurance that
the authorization server issued the token introspection response.
"""
self.token_origin_audience = None # Reset the token origin audience
jwt = self._introspect(access_token)
claims = self._verify_claims(jwt)
user_info = self._verify_user_info(claims)
sub = user_info.get("sub")
if sub is None:
message = "User info contained no recognizable user identification"
logger.debug(message)
raise SuspiciousOperation(message)
try:
user = self.UserModel.objects.get(sub=sub)
except self.UserModel.DoesNotExist:
logger.debug("Login failed: No user with %s found", sub)
return None
self.token_origin_audience = str(user_info[settings.OIDC_RS_AUDIENCE_CLAIM])
return user
def _verify_user_info(self, introspection_response):
"""Verify the 'introspection_response' to get valid and relevant user info.
The 'introspection_response' should be still active, and while authenticating
the resource owner should have requested relevant scope to access her data in
our resource server.
Scope should be configured to match between the AS and the RS. The AS will filter
all the scopes the resource owner requested to expose only the relevant ones to
our resource server.
"""
active = introspection_response.get("active", None)
if not active:
message = "Introspection response is not active."
logger.debug(message)
raise SuspiciousOperation(message)
requested_scopes = introspection_response.get("scope", None).split(" ")
if set(self._scopes).isdisjoint(set(requested_scopes)):
message = "Introspection response contains any required scopes."
logger.debug(message)
raise SuspiciousOperation(message)
audience = introspection_response.get(settings.OIDC_RS_AUDIENCE_CLAIM, None)
if not audience:
raise SuspiciousOperation(
"Introspection response does not provide source audience."
)
return introspection_response
def _get_introspection(self, access_token):
"""Request introspection of an access token to the authorization server."""
try:
introspection_response = (
self._authorization_server_client.get_introspection(
self._client_id,
self._client_secret,
access_token,
)
)
except HTTPError as err:
message = "Could not fetch introspection"
logger.debug("%s. Exception:", message, exc_info=True)
raise SuspiciousOperation(message) from err
return introspection_response
def _introspect(self, access_token) -> Token:
"""
Introspect an access token to the authorization server.
Not implemented here:
- introspection_str might be a JWT, not a JSON
and therefore should be decoded
- introspection_str might be a JWS, not a JSON
and therefore should be verified (using self._decode)
- introspection_str might be a JWE, not a JSON
and therefore should be decrypted (using self._decrypt)
"""
introspection_str = self._get_introspection(access_token)
try:
introspection_data = json.loads(introspection_str)
except JSONDecodeError as exc:
raise SuspiciousOperation("Invalid JSON for introspection") from exc
return Token({}, introspection_data)
def _decrypt(self, encrypted_token, private_key):
"""Decrypt the token encrypted by the Authorization Server (AS).
Resource Server (RS)'s public key is used for encryption, and its private
key is used for decryption. The RS's public key is exposed to the AS via a JWKS endpoint.
Encryption Algorithm and Encoding should be configured to match between the AS
and the RS.
"""
try:
decrypted_token = jose_jwe.decrypt_compact(
encrypted_token,
private_key,
algorithms=[self._encryption_algorithm, self._encryption_encoding],
)
except Exception as err:
message = "Token decryption failed"
logger.debug("%s. Exception:", message, exc_info=True)
raise SuspiciousOperation(message) from err
return decrypted_token.plaintext
def _decode(self, encoded_token, public_key_set):
"""Decode the token signed by the Authorization Server (AS).
AS's private key is used for signing, and its public key is used for decoding.
The AS public key is exposed via a JWK endpoint.
Signing Algorithm should be configured to match between the AS and the RS.
"""
try:
token = jose_jwt.decode(
encoded_token,
public_key_set,
algorithms=[self._signing_algorithm],
)
except ValueError as err:
message = "Token decoding failed"
logger.debug("%s. Exception:", message, exc_info=True)
raise SuspiciousOperation(message) from err
return token
def _verify_claims(self, token):
"""Verify the claims of the token to ensure authentication security.
By verifying these claims, we ensure that the token was issued by a
trusted authorization server and is intended for this specific
resource server. This prevents various types of attacks, such as
token substitution or misuse of tokens issued for different clients.
"""
try:
self._introspection_claims_registry.validate(token.claims)
except (InvalidClaimError, InvalidTokenError) as err:
message = "Failed to validate token's claims"
logger.debug("%s. Exception:", message, exc_info=True)
raise SuspiciousOperation(message) from err
return token.claims
class JWTResourceServerBackend(ResourceServerBackend):
"""Backend of an OAuth 2.0 resource server.
Override the classic ResourceServerBackend to support JWT introspection
tokens as described in the RFC https://datatracker.ietf.org/doc/rfc9701/
For this implementation, we expect the introspection response to be
in JWT format, signed and encrypted.
"""
def _introspect(self, access_token):
"""
Introspect an access token to the authorization server.
We expect here the `token_introspection` claim to contain the
JWT information to be verified:
- iss
- aud
- iat
Not implemented here:
- introspection_str might be a JSON, not a JWE
- introspection_str might be a JWS, not a JWE
"""
introspection_str = self._get_introspection(access_token)
private_key = utils.import_private_key_from_settings()
jws = self._decrypt(introspection_str, private_key=private_key)
try:
public_key_set = self._authorization_server_client.import_public_keys()
except (TypeError, ValueError, AttributeError, HTTPError) as err:
message = "Could get authorization server JWKS"
logger.debug("%s. Exception:", message, exc_info=True)
raise SuspiciousOperation(message) from err
jwt = self._decode(jws, public_key_set)
token_registry = jose_jwt.JWTClaimsRegistry(
iss={"essential": True, "value": self._authorization_server_client.url},
aud={"essential": True, "value": self._client_id},
token_introspection={"essential": True},
)
try:
token_registry.validate(jwt.claims)
except (InvalidClaimError, InvalidTokenError) as err:
logger.exception("JWTResourceServerBackend: %s", err)
raise SuspiciousOperation("Failed to validate token's claims") from err
introspection_data = jwt.claims["token_introspection"]
return Token({}, introspection_data)
class ResourceServerImproperlyConfiguredBackend:
"""Fallback backend for improperly configured Resource Servers."""
token_origin_audience = None
def get_or_create_user(self, access_token, id_token, payload):
"""Indicate that the Resource Server is improperly configured."""
raise AuthenticationFailed("Resource Server is improperly configured")

View File

@@ -1,97 +0,0 @@
"""Resource Server Clients classes"""
from django.core.exceptions import ImproperlyConfigured
import requests
from joserfc.jwk import KeySet
class AuthorizationServerClient:
"""Client for interacting with an OAuth 2.0 authorization server.
An authorization server issues access tokens to client applications after authenticating
and obtaining authorization from the resource owner. It also provides endpoints for token
introspection and JSON Web Key Sets (JWKS) to validate and decode tokens.
This client facilitates communication with the authorization server, including:
- Fetching token introspection responses.
- Fetching JSON Web Key Sets (JWKS) for token validation.
- Setting appropriate headers for secure communication as recommended by RFC drafts.
"""
# ruff: noqa: PLR0913 PLR0917
# pylint: disable=too-many-positional-arguments
# pylint: disable=too-many-arguments
def __init__(
self,
url,
url_jwks,
url_introspection,
verify_ssl,
timeout,
proxy,
):
"""Require at a minimum url, url_jwks and url_introspection."""
if not url or not url_jwks or not url_introspection:
raise ImproperlyConfigured(
"Could not instantiate AuthorizationServerClient, some parameters are missing."
)
self.url = url
self._url_introspection = url_introspection
self._url_jwks = url_jwks
self._verify_ssl = verify_ssl
self._timeout = timeout
self._proxy = proxy
@property
def _introspection_headers(self):
"""Get HTTP header for the introspection request.
Notify the authorization server that we expect a signed and encrypted response
by setting the appropriate 'Accept' header.
This follows the recommendation from the draft RFC:
https://datatracker.ietf.org/doc/html/draft-ietf-oauth-jwt-introspection-response-12.
"""
return {
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/token-introspection+jwt",
}
def get_introspection(self, client_id, client_secret, token):
"""Retrieve introspection response about a token."""
response = requests.post(
self._url_introspection,
data={
"client_id": client_id,
"client_secret": client_secret,
"token": token,
},
headers=self._introspection_headers,
verify=self._verify_ssl,
timeout=self._timeout,
proxies=self._proxy,
)
response.raise_for_status()
return response.text
def get_jwks(self):
"""Retrieve Authorization Server JWKS."""
response = requests.get(
self._url_jwks,
verify=self._verify_ssl,
timeout=self._timeout,
proxies=self._proxy,
)
response.raise_for_status()
return response.json()
def import_public_keys(self):
"""Retrieve and import Authorization Server JWKS."""
jwks = self.get_jwks()
public_keys = KeySet.import_key_set(jwks)
return public_keys

View File

@@ -1,53 +0,0 @@
"""
Mixins for resource server views.
"""
from rest_framework import exceptions as drf_exceptions
from .authentication import ResourceServerAuthentication
class ResourceServerMixin:
"""
Mixin for resource server views:
- Restrict the authentication class to ResourceServerAuthentication.
- Adds the Service Provider ID to the serializer context.
- Fetch the Service Provider ID from the OIDC introspected token stored
in the request.
This Mixin *must* be used in every view that should act as a resource server.
"""
authentication_classes = [ResourceServerAuthentication]
def get_serializer_context(self):
"""Extra context provided to the serializer class."""
context = super().get_serializer_context()
# When used as a resource server, we need to know the audience to automatically:
# - add the Service Provider to the team "scope" on creation
context["from_service_provider_audience"] = (
self._get_service_provider_audience()
)
return context
def _get_service_provider_audience(self):
"""Return the audience of the Service Provider from the OIDC introspected token."""
if not isinstance(
self.request.successful_authenticator, ResourceServerAuthentication
):
# We could check request.resource_server_token_audience here, but it's
# more explicit to check the authenticator type and assert the attribute
# existence.
return None
# When used as a resource server, the request has a token audience
service_provider_audience = self.request.resource_server_token_audience
if not service_provider_audience: # should not happen
raise drf_exceptions.AuthenticationFailed(
"Resource server token audience not found in request"
)
return service_provider_audience

View File

@@ -1,9 +0,0 @@
"""Resource Server URL Configuration"""
from django.urls import path
from .views import JWKSView
urlpatterns = [
path("jwks", JWKSView.as_view(), name="resource_server_jwks"),
]

View File

@@ -1,48 +0,0 @@
"""Resource Server utils functions"""
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from joserfc.jwk import JWKRegistry
def import_private_key_from_settings():
"""Import the private key used by the resource server when interacting with the OIDC provider.
This private key is crucial; its public components are exposed in the JWK endpoints,
while its private component is used for decrypting the introspection token retrieved
from the OIDC provider.
By default, we recommend using RSAKey for asymmetric encryption,
known for its strong security features.
Note:
- The function requires the 'OIDC_RS_PRIVATE_KEY_STR' setting to be configured.
- The 'OIDC_RS_ENCRYPTION_KEY_TYPE' and 'OIDC_RS_ENCRYPTION_ALGO' settings can be customized
based on the chosen key type.
Raises:
ImproperlyConfigured: If the private key setting is missing, empty, or incorrect.
Returns:
joserfc.jwk.JWK: The imported private key as a JWK object.
"""
private_key_str = getattr(settings, "OIDC_RS_PRIVATE_KEY_STR", None)
if not private_key_str:
raise ImproperlyConfigured(
"OIDC_RS_PRIVATE_KEY_STR setting is missing or empty."
)
private_key_pem = private_key_str.encode()
try:
private_key = JWKRegistry.import_key(
private_key_pem,
key_type=settings.OIDC_RS_ENCRYPTION_KEY_TYPE,
parameters={"alg": settings.OIDC_RS_ENCRYPTION_ALGO, "use": "enc"},
)
except ValueError as err:
raise ImproperlyConfigured("OIDC_RS_PRIVATE_KEY_STR setting is wrong.") from err
return private_key

View File

@@ -1,40 +0,0 @@
"""Resource Server views"""
from django.core.exceptions import ImproperlyConfigured
from joserfc.jwk import KeySet
from rest_framework.response import Response
from rest_framework.views import APIView
from . import utils
class JWKSView(APIView):
"""
API endpoint for retrieving a JSON Web Keys Set (JWKS).
Returns:
Response: JSON response containing the JWKS data.
"""
authentication_classes = [] # disable authentication
permission_classes = [] # disable permission
def get(self, request):
"""Handle GET requests to retrieve JSON Web Keys Set (JWKS).
Returns:
Response: JSON response containing the JWKS data.
"""
try:
private_key = utils.import_private_key_from_settings()
except (ImproperlyConfigured, ValueError) as err:
return Response({"error": str(err)}, status=500)
try:
jwk = KeySet([private_key]).as_dict(private=False)
except (TypeError, ValueError, AttributeError):
return Response({"error": "Could not load key"}, status=500)
return Response(jwk)

View File

@@ -160,7 +160,7 @@ def test_authentication_getter_existing_user_via_email(
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
with django_assert_num_queries(2):
with django_assert_num_queries(3): # user by email + user by sub + update sub
user = klass.get_or_create_user(
access_token="test-token", id_token=None, payload=None
)
@@ -238,7 +238,7 @@ def test_authentication_getter_new_user_with_email(monkeypatch):
assert user.sub == "123"
assert user.email == email
assert user.name == "John Doe"
assert user.password == "!"
assert user.has_usable_password() is False
assert models.User.objects.count() == 1

View File

@@ -1,10 +0,0 @@
"""Unit tests for the Authentication URLs."""
from core.authentication.urls import urlpatterns
def test_urls_override_default_mozilla_django_oidc():
"""Custom URL patterns should override default ones from Mozilla Django OIDC."""
url_names = [u.name for u in urlpatterns]
assert url_names.index("oidc_logout_custom") < url_names.index("oidc_logout")

View File

@@ -1,231 +0,0 @@
"""Unit tests for the Authentication Views."""
from unittest import mock
from urllib.parse import parse_qs, urlparse
from django.contrib.auth.models import AnonymousUser
from django.contrib.sessions.middleware import SessionMiddleware
from django.core.exceptions import SuspiciousOperation
from django.test import RequestFactory
from django.test.utils import override_settings
from django.urls import reverse
from django.utils import crypto
import pytest
from rest_framework.test import APIClient
from core import factories
from core.authentication.views import OIDCLogoutCallbackView, OIDCLogoutView
pytestmark = pytest.mark.django_db
@override_settings(LOGOUT_REDIRECT_URL="/example-logout")
def test_view_logout_anonymous():
"""Anonymous users calling the logout url,
should be redirected to the specified LOGOUT_REDIRECT_URL."""
url = reverse("oidc_logout_custom")
response = APIClient().get(url)
assert response.status_code == 302
assert response.url == "/example-logout"
@mock.patch.object(
OIDCLogoutView, "construct_oidc_logout_url", return_value="/example-logout"
)
def test_view_logout(mocked_oidc_logout_url):
"""Authenticated users should be redirected to OIDC provider for logout."""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
url = reverse("oidc_logout_custom")
response = client.get(url)
mocked_oidc_logout_url.assert_called_once()
assert response.status_code == 302
assert response.url == "/example-logout"
@override_settings(LOGOUT_REDIRECT_URL="/default-redirect-logout")
@mock.patch.object(
OIDCLogoutView, "construct_oidc_logout_url", return_value="/default-redirect-logout"
)
def test_view_logout_no_oidc_provider(mocked_oidc_logout_url):
"""Authenticated users should be logged out when no OIDC provider is available."""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
url = reverse("oidc_logout_custom")
with mock.patch("mozilla_django_oidc.views.auth.logout") as mock_logout:
response = client.get(url)
mocked_oidc_logout_url.assert_called_once()
mock_logout.assert_called_once()
assert response.status_code == 302
assert response.url == "/default-redirect-logout"
@override_settings(LOGOUT_REDIRECT_URL="/example-logout")
def test_view_logout_callback_anonymous():
"""Anonymous users calling the logout callback url,
should be redirected to the specified LOGOUT_REDIRECT_URL."""
url = reverse("oidc_logout_callback")
response = APIClient().get(url)
assert response.status_code == 302
assert response.url == "/example-logout"
@pytest.mark.parametrize(
"initial_oidc_states",
[{}, {"other_state": "foo"}],
)
def test_view_logout_persist_state(initial_oidc_states):
"""State value should be persisted in session's data."""
user = factories.UserFactory()
request = RequestFactory().request()
request.user = user
middleware = SessionMiddleware(get_response=lambda x: x)
middleware.process_request(request)
if initial_oidc_states:
request.session["oidc_states"] = initial_oidc_states
request.session.save()
mocked_state = "mock_state"
OIDCLogoutView().persist_state(request, mocked_state)
assert "oidc_states" in request.session
assert request.session["oidc_states"] == {
"mock_state": {},
**initial_oidc_states,
}
@override_settings(OIDC_OP_LOGOUT_ENDPOINT="/example-logout")
@mock.patch.object(OIDCLogoutView, "persist_state")
@mock.patch.object(crypto, "get_random_string", return_value="mocked_state")
def test_view_logout_construct_oidc_logout_url(
mocked_get_random_string, mocked_persist_state
):
"""Should construct the logout URL to initiate the logout flow with the OIDC provider."""
user = factories.UserFactory()
request = RequestFactory().request()
request.user = user
middleware = SessionMiddleware(get_response=lambda x: x)
middleware.process_request(request)
request.session["oidc_id_token"] = "mocked_oidc_id_token"
request.session.save()
redirect_url = OIDCLogoutView().construct_oidc_logout_url(request)
mocked_persist_state.assert_called_once()
mocked_get_random_string.assert_called_once()
params = parse_qs(urlparse(redirect_url).query)
assert params["id_token_hint"][0] == "mocked_oidc_id_token"
assert params["state"][0] == "mocked_state"
url = reverse("oidc_logout_callback")
assert url in params["post_logout_redirect_uri"][0]
@override_settings(LOGOUT_REDIRECT_URL="/")
def test_view_logout_construct_oidc_logout_url_none_id_token():
"""If no ID token is available in the session,
the user should be redirected to the final URL."""
user = factories.UserFactory()
request = RequestFactory().request()
request.user = user
middleware = SessionMiddleware(get_response=lambda x: x)
middleware.process_request(request)
redirect_url = OIDCLogoutView().construct_oidc_logout_url(request)
assert redirect_url == "/"
@pytest.mark.parametrize(
"initial_state",
[None, {"other_state": "foo"}],
)
def test_view_logout_callback_wrong_state(initial_state):
"""Should raise an error if OIDC state doesn't match session data."""
user = factories.UserFactory()
request = RequestFactory().request()
request.user = user
middleware = SessionMiddleware(get_response=lambda x: x)
middleware.process_request(request)
if initial_state:
request.session["oidc_states"] = initial_state
request.session.save()
callback_view = OIDCLogoutCallbackView.as_view()
with pytest.raises(SuspiciousOperation) as excinfo:
callback_view(request)
assert (
str(excinfo.value) == "OIDC callback state not found in session `oidc_states`!"
)
@override_settings(LOGOUT_REDIRECT_URL="/example-logout")
def test_view_logout_callback():
"""If state matches, callback should clear OIDC state and redirects."""
user = factories.UserFactory()
request = RequestFactory().get("/logout-callback/", data={"state": "mocked_state"})
request.user = user
middleware = SessionMiddleware(get_response=lambda x: x)
middleware.process_request(request)
mocked_state = "mocked_state"
request.session["oidc_states"] = {mocked_state: {}}
request.session.save()
callback_view = OIDCLogoutCallbackView.as_view()
with mock.patch("mozilla_django_oidc.views.auth.logout") as mock_logout:
def clear_user(request):
# Assert state is cleared prior to logout
assert request.session["oidc_states"] == {}
request.user = AnonymousUser()
mock_logout.side_effect = clear_user
response = callback_view(request)
mock_logout.assert_called_once()
assert response.status_code == 302
assert response.url == "/example-logout"

View File

@@ -11,15 +11,15 @@ from joserfc import jwe as jose_jwe
from joserfc import jwt as jose_jwt
from joserfc.rfc7518.rsa_key import RSAKey
from jwt.utils import to_base64url_uint
from lasuite.oidc_resource_server.authentication import (
ResourceServerAuthentication,
get_resource_server_backend,
)
from rest_framework.request import Request as DRFRequest
from rest_framework.status import HTTP_200_OK, HTTP_401_UNAUTHORIZED
from core.factories import UserFactory
from core.models import ServiceProvider
from core.resource_server.authentication import (
ResourceServerAuthentication,
get_resource_server_backend,
)
pytestmark = pytest.mark.django_db
@@ -30,7 +30,7 @@ def jwt_resource_server_backend_fixture(settings):
_original_backend = str(settings.OIDC_RS_BACKEND_CLASS)
settings.OIDC_RS_BACKEND_CLASS = (
"core.resource_server.backend.JWTResourceServerBackend"
"lasuite.oidc_resource_server.backend.JWTResourceServerBackend"
)
get_resource_server_backend.cache_clear()
@@ -71,7 +71,7 @@ def test_resource_server_authentication_class(client, settings):
"""
assert (
settings.OIDC_RS_BACKEND_CLASS
== "core.resource_server.backend.ResourceServerBackend"
== "lasuite.oidc_resource_server.backend.ResourceServerBackend"
)
settings.OIDC_RS_CLIENT_ID = "some_client_id"

View File

@@ -1,486 +0,0 @@
"""
Test for the Resource Server (RS) Backend.
"""
import json
# pylint: disable=W0212
from logging import Logger
from unittest.mock import Mock, patch
from django.contrib import auth
from django.core.exceptions import SuspiciousOperation
from django.test.utils import override_settings
import pytest
from joserfc.errors import InvalidClaimError, InvalidTokenError
from joserfc.jwt import JWTClaimsRegistry, Token
from requests.exceptions import HTTPError
from core.resource_server.backend import JWTResourceServerBackend, ResourceServerBackend
@pytest.fixture(name="mock_authorization_server")
def fixture_mock_authorization_server():
"""Mock an Authorization Server client."""
mock_server = Mock()
mock_server.url = "https://auth.server.com"
return mock_server
@pytest.fixture(name="mock_token")
def fixture_mock_token():
"""Mock a token"""
mock_token = Mock()
mock_token.claims = {"sub": "user123", "iss": "https://auth.server.com"}
return mock_token
@pytest.fixture(name="resource_server_backend")
def fixture_resource_server_backend(settings, mock_authorization_server):
"""Generate a Resource Server backend."""
settings.OIDC_RS_CLIENT_ID = "client_id"
settings.OIDC_RS_CLIENT_SECRET = "client_secret"
settings.OIDC_RS_ENCRYPTION_ENCODING = "A256GCM"
settings.OIDC_RS_ENCRYPTION_ALGO = "RSA-OAEP"
settings.OIDC_RS_SIGNING_ALGO = "ES256"
settings.OIDC_RS_SCOPES = ["groups"]
return ResourceServerBackend(mock_authorization_server)
@pytest.fixture(name="jwt_resource_server_backend")
def fixture_jwt_resource_server_backend(settings, mock_authorization_server):
"""Generate a Resource Server backend."""
settings.OIDC_RS_CLIENT_ID = "client_id"
settings.OIDC_RS_CLIENT_SECRET = "client_secret"
settings.OIDC_RS_SCOPES = ["groups"]
return JWTResourceServerBackend(mock_authorization_server)
@override_settings(OIDC_RS_CLIENT_ID="client_id")
@override_settings(OIDC_RS_CLIENT_SECRET="client_secret")
@override_settings(OIDC_RS_ENCRYPTION_ENCODING="A256GCM")
@override_settings(OIDC_RS_ENCRYPTION_ALGO="RSA-OAEP")
@override_settings(OIDC_RS_SIGNING_ALGO="RS256")
@override_settings(OIDC_RS_SCOPES=["scopes"])
@patch.object(auth, "get_user_model", return_value="foo")
def test_backend_initialization(mock_get_user_model, mock_authorization_server):
"""Test the ResourceServerBackend initialization."""
backend = ResourceServerBackend(mock_authorization_server)
mock_get_user_model.assert_called_once()
assert backend.UserModel == "foo"
assert backend._client_id == "client_id"
assert backend._client_secret == "client_secret"
assert backend._encryption_encoding == "A256GCM"
assert backend._encryption_algorithm == "RSA-OAEP"
assert backend._signing_algorithm == "RS256"
assert backend._scopes == ["scopes"]
assert backend._authorization_server_client == mock_authorization_server
assert isinstance(backend._introspection_claims_registry, JWTClaimsRegistry)
assert backend._introspection_claims_registry.options == {
"active": {"essential": True},
"client_id": {"essential": True},
"iss": {"essential": True, "value": "https://auth.server.com"},
"scope": {"essential": True},
}
@patch.object(ResourceServerBackend, "get_user", return_value="user")
def test_get_or_create_user(mock_get_user, resource_server_backend):
"""Test 'get_or_create_user' method."""
access_token = "access_token"
res = resource_server_backend.get_or_create_user(access_token, None, None)
mock_get_user.assert_called_once_with(access_token)
assert res == "user"
def test_verify_claims_success(resource_server_backend, mock_token):
"""Test '_verify_claims' method with a successful response."""
with patch.object(
resource_server_backend._introspection_claims_registry, "validate"
) as mock_validate:
resource_server_backend._verify_claims(mock_token)
mock_validate.assert_called_once_with(mock_token.claims)
def test_verify_claims_invalid_claim_error(resource_server_backend, mock_token):
"""Test '_verify_claims' method with an invalid claim error."""
with patch.object(
resource_server_backend._introspection_claims_registry, "validate"
) as mock_validate:
mock_validate.side_effect = InvalidClaimError("claim_name")
expected_message = "Failed to validate token's claims"
with patch.object(Logger, "debug") as mock_logger_debug:
with pytest.raises(SuspiciousOperation, match=expected_message):
resource_server_backend._verify_claims(mock_token)
mock_logger_debug.assert_called_once_with(
"%s. Exception:", expected_message, exc_info=True
)
def test_verify_claims_invalid_token_error(resource_server_backend, mock_token):
"""Test '_verify_claims' method with an invalid token error."""
with patch.object(
resource_server_backend._introspection_claims_registry, "validate"
) as mock_validate:
mock_validate.side_effect = InvalidTokenError
expected_message = "Failed to validate token's claims"
with patch.object(Logger, "debug") as mock_logger_debug:
with pytest.raises(SuspiciousOperation, match=expected_message):
resource_server_backend._verify_claims(mock_token)
mock_logger_debug.assert_called_once_with(
"%s. Exception:", expected_message, exc_info=True
)
def test_decode_success(resource_server_backend):
"""Test '_decode' method with a successful response."""
encoded_token = "valid_encoded_token"
public_key_set = Mock()
expected_decoded_token = {"sub": "user123"}
with patch(
"joserfc.jwt.decode", return_value=expected_decoded_token
) as mock_decode:
decoded_token = resource_server_backend._decode(encoded_token, public_key_set)
mock_decode.assert_called_once_with(
"valid_encoded_token", public_key_set, algorithms=["ES256"]
)
assert decoded_token == expected_decoded_token
def test_decode_failure(resource_server_backend):
"""Test '_decode' method with a ValueError"""
encoded_token = "invalid_encoded_token"
public_key_set = Mock()
with patch("joserfc.jwt.decode", side_effect=ValueError):
with patch.object(Logger, "debug") as mock_logger_debug:
with pytest.raises(SuspiciousOperation, match="Token decoding failed"):
resource_server_backend._decode(encoded_token, public_key_set)
mock_logger_debug.assert_called_once_with(
"%s. Exception:", "Token decoding failed", exc_info=True
)
def test_decrypt_success(resource_server_backend):
"""Test '_decrypt' method with a successful response."""
encrypted_token = "valid_encrypted_token"
private_key = "private_key"
expected_decrypted_token = Mock()
expected_decrypted_token.plaintext = "blah"
with patch(
"joserfc.jwe.decrypt_compact", return_value=expected_decrypted_token
) as mock_decrypt:
decrypted_token = resource_server_backend._decrypt(encrypted_token, private_key)
mock_decrypt.assert_called_once_with(
encrypted_token, private_key, algorithms=["RSA-OAEP", "A256GCM"]
)
assert decrypted_token == "blah"
def test_decrypt_failure(resource_server_backend):
"""Test '_decrypt' method with an Exception."""
encrypted_token = "invalid_encrypted_token"
private_key = "private_key"
with patch(
"joserfc.jwe.decrypt_compact", side_effect=Exception("Decryption error")
):
expected_message = "Token decryption failed"
with patch.object(Logger, "debug") as mock_logger_debug:
with pytest.raises(SuspiciousOperation, match=expected_message):
resource_server_backend._decrypt(encrypted_token, private_key)
mock_logger_debug.assert_called_once_with(
"%s. Exception:", expected_message, exc_info=True
)
def test_resource_server_backend_introspect_success(resource_server_backend):
"""Test '_introspect' method with a successful response."""
token = "valid_token"
json_data = {"sub": "user123"}
resource_server_backend._authorization_server_client.get_introspection = Mock(
return_value=json.dumps(json_data)
)
result = resource_server_backend._introspect(token)
assert result.claims == json_data
resource_server_backend._authorization_server_client.get_introspection.assert_called_once_with(
"client_id", "client_secret", token
)
@patch(
"core.resource_server.utils.import_private_key_from_settings",
return_value="private_key",
)
# pylint: disable=unused-argument
def test_jwt_resource_server_backend_introspect_success(
mock_import_private_key_from_settings, jwt_resource_server_backend
):
"""Test '_introspect' method with a successful response."""
jwt_rs_backend = jwt_resource_server_backend # prevent line too long
token = "valid_token"
jwe = "valid_jwe"
jws = "valid_jws"
jwt = {
"aud": "client_id",
"iss": "https://auth.server.com",
"token_introspection": {
"sub": "user123",
"aud": "client_id",
"iss": "https://auth.server.com",
},
}
jwt_rs_backend._authorization_server_client.get_introspection = Mock(
return_value=jwe
)
jwt_rs_backend._decrypt = Mock(return_value=jws)
jwt_rs_backend._authorization_server_client.import_public_keys = Mock(
return_value="public_key_set"
)
jwt_rs_backend._decode = Mock(return_value=Token({}, jwt))
result = jwt_rs_backend._introspect(token)
assert result.claims == {
"sub": "user123",
"aud": "client_id",
"iss": "https://auth.server.com",
}
jwt_rs_backend._authorization_server_client.get_introspection.assert_called_once_with(
"client_id", "client_secret", token
)
jwt_rs_backend._decrypt.assert_called_once_with(jwe, private_key="private_key")
jwt_rs_backend._authorization_server_client.import_public_keys.assert_called_once()
jwt_rs_backend._decode.assert_called_once_with(jws, "public_key_set")
def test_introspect_introspection_failure(resource_server_backend):
"""Test '_introspect' method when introspection to the AS fails."""
token = "invalid_token"
resource_server_backend._authorization_server_client.get_introspection.side_effect = HTTPError(
"Introspection error"
)
with patch.object(Logger, "debug") as mock_logger_debug:
expected_message = "Could not fetch introspection"
with pytest.raises(SuspiciousOperation, match=expected_message):
resource_server_backend._introspect(token)
mock_logger_debug.assert_called_once_with(
"%s. Exception:", expected_message, exc_info=True
)
@patch(
"core.resource_server.utils.import_private_key_from_settings",
return_value="private_key",
)
# pylint: disable=unused-argument
def test_jwt_resource_server_backend_introspect_public_key_import_failure(
mock_import_private_key_from_settings, jwt_resource_server_backend
):
"""Test '_introspect' method when fetching AS's jwks fails."""
token = "valid_token"
jwe = "valid_jwe"
jws = "valid_jws"
jwt_resource_server_backend._authorization_server_client.get_introspection = Mock(
return_value=jwe
)
jwt_resource_server_backend._decrypt = Mock(return_value=jws)
(
jwt_resource_server_backend._authorization_server_client.import_public_keys.side_effect
) = HTTPError("Public key error")
with patch.object(Logger, "debug") as mock_logger_debug:
expected_message = "Could get authorization server JWKS"
with pytest.raises(SuspiciousOperation, match=expected_message):
jwt_resource_server_backend._introspect(token)
mock_logger_debug.assert_called_once_with(
"%s. Exception:", expected_message, exc_info=True
)
def test_verify_user_info_success(resource_server_backend, settings):
"""Test '_verify_user_info' with a successful response."""
# test default OIDC_RS_AUDIENCE_CLAIM = client_id
introspection_response = {"active": True, "scope": "groups", "client_id": "123"}
result = resource_server_backend._verify_user_info(introspection_response)
assert result == introspection_response
# test OIDC_RS_AUDIENCE_CLAIM = aud is taken into account
settings.OIDC_RS_AUDIENCE_CLAIM = "aud"
introspection_response = {"active": True, "scope": "groups", "aud": "123"}
result = resource_server_backend._verify_user_info(introspection_response)
assert result == introspection_response
def test_verify_user_info_inactive(resource_server_backend):
"""Test '_verify_user_info' with an inactive introspection response."""
introspection_response = {"active": False, "scope": "groups"}
expected_message = "Introspection response is not active."
with patch.object(Logger, "debug") as mock_logger_debug:
with pytest.raises(SuspiciousOperation, match=expected_message):
resource_server_backend._verify_user_info(introspection_response)
mock_logger_debug.assert_called_once_with(expected_message)
def test_verify_user_info_wrong_scopes(resource_server_backend):
"""Test '_verify_user_info' with wrong requested scopes."""
introspection_response = {"active": True, "scope": "wrong-scopes"}
expected_message = "Introspection response contains any required scopes."
with patch.object(Logger, "debug") as mock_logger_debug:
with pytest.raises(SuspiciousOperation, match=expected_message):
resource_server_backend._verify_user_info(introspection_response)
mock_logger_debug.assert_called_once_with(expected_message)
def test_resource_server_backend_get_user_success(resource_server_backend):
"""Test '_get_user' with a successful response."""
access_token = "valid_access_token"
mock_jwt = Mock()
mock_claims = {"sub": "user123", "client_id": "123"}
mock_user = Mock()
resource_server_backend._introspect = Mock(return_value=mock_jwt)
resource_server_backend._verify_claims = Mock(return_value=mock_claims)
resource_server_backend._verify_user_info = Mock(return_value=mock_claims)
resource_server_backend.UserModel.objects.get = Mock(return_value=mock_user)
user = resource_server_backend.get_user(access_token)
assert user == mock_user
resource_server_backend._introspect.assert_called_once_with(access_token)
resource_server_backend._verify_claims.assert_called_once_with(mock_jwt)
resource_server_backend._verify_user_info.assert_called_once_with(mock_claims)
resource_server_backend.UserModel.objects.get.assert_called_once_with(sub="user123")
def test_get_user_could_not_introspect(resource_server_backend):
"""Test '_get_user' with introspection failing."""
access_token = "valid_access_token"
resource_server_backend._introspect = Mock(
side_effect=SuspiciousOperation("Invalid jwt")
)
resource_server_backend._verify_claims = Mock()
resource_server_backend._verify_user_info = Mock()
with pytest.raises(SuspiciousOperation, match="Invalid jwt"):
resource_server_backend.get_user(access_token)
resource_server_backend._introspect.assert_called_once_with(access_token)
resource_server_backend._verify_claims.assert_not_called()
resource_server_backend._verify_user_info.assert_not_called()
def test_get_user_invalid_introspection_response(resource_server_backend):
"""Test '_get_user' with an invalid introspection response."""
access_token = "valid_access_token"
mock_jwt = Mock()
resource_server_backend._introspect = Mock(return_value=mock_jwt)
resource_server_backend._verify_claims = Mock(
side_effect=SuspiciousOperation("Invalid claims")
)
resource_server_backend._verify_user_info = Mock()
with pytest.raises(SuspiciousOperation, match="Invalid claims"):
resource_server_backend.get_user(access_token)
resource_server_backend._introspect.assert_called_once_with(access_token)
resource_server_backend._verify_claims.assert_called_once_with(mock_jwt)
resource_server_backend._verify_user_info.assert_not_called()
def test_resource_server_backend_get_user_user_not_found(resource_server_backend):
"""Test '_get_user' if the user is not found."""
access_token = "valid_access_token"
mock_jwt = Mock()
mock_claims = {"sub": "user123"}
resource_server_backend._introspect = Mock(return_value=mock_jwt)
resource_server_backend._verify_claims = Mock(return_value=mock_claims)
resource_server_backend._verify_user_info = Mock(return_value=mock_claims)
resource_server_backend.UserModel.objects.get = Mock(
side_effect=resource_server_backend.UserModel.DoesNotExist
)
with patch.object(Logger, "debug") as mock_logger_debug:
user = resource_server_backend.get_user(access_token)
assert user is None
resource_server_backend._introspect.assert_called_once_with(access_token)
resource_server_backend._verify_claims.assert_called_once_with(mock_jwt)
resource_server_backend._verify_user_info.assert_called_once_with(mock_claims)
resource_server_backend.UserModel.objects.get.assert_called_once_with(
sub="user123"
)
mock_logger_debug.assert_called_once_with(
"Login failed: No user with %s found", "user123"
)
def test_get_user_no_user_identification(resource_server_backend):
"""Test '_get_user' if the response miss a user identification."""
access_token = "valid_access_token"
mock_jwt = Mock()
mock_claims = {"token_introspection": {}}
resource_server_backend._introspect = Mock(return_value=mock_jwt)
resource_server_backend._verify_claims = Mock(return_value=mock_claims)
resource_server_backend._verify_user_info = Mock(
return_value=mock_claims["token_introspection"]
)
expected_message = "User info contained no recognizable user identification"
with patch.object(Logger, "debug") as mock_logger_debug:
with pytest.raises(SuspiciousOperation, match=expected_message):
resource_server_backend.get_user(access_token)
mock_logger_debug.assert_called_once_with(expected_message)

View File

@@ -1,187 +0,0 @@
"""
Test for the Resource Server (RS) clients classes.
"""
# pylint: disable=W0212
from unittest.mock import MagicMock, patch
import pytest
from joserfc.jwk import KeySet, RSAKey
from requests.exceptions import HTTPError
from core.resource_server.clients import AuthorizationServerClient
@pytest.fixture(name="client")
def fixture_client():
"""Generate an Authorization Server client."""
return AuthorizationServerClient(
url="https://auth.example.com/api/v2",
url_jwks="https://auth.example.com/api/v2/jwks",
url_introspection="https://auth.example.com/api/v2/introspect",
verify_ssl=True,
timeout=5,
proxy=None,
)
def test_authorization_server_client_initialization():
"""Test the AuthorizationServerClient initialization."""
new_client = AuthorizationServerClient(
url="https://auth.example.com/api/v2",
url_jwks="https://auth.example.com/api/v2/jwks",
url_introspection="https://auth.example.com/api/v2/checktoken/foo",
verify_ssl=True,
timeout=5,
proxy=None,
)
assert new_client.url == "https://auth.example.com/api/v2"
assert (
new_client._url_introspection
== "https://auth.example.com/api/v2/checktoken/foo"
)
assert new_client._url_jwks == "https://auth.example.com/api/v2/jwks"
assert new_client._verify_ssl is True
assert new_client._timeout == 5
assert new_client._proxy is None
def test_introspection_headers(client):
"""Test the introspection headers to ensure they match the expected values."""
assert client._introspection_headers == {
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/token-introspection+jwt",
}
@patch("requests.post")
def test_get_introspection_success(mock_post, client):
"""Test 'get_introspection' method with a successful response."""
mock_response = MagicMock()
mock_response.raise_for_status.return_value = None
mock_response.text = "introspection response"
mock_post.return_value = mock_response
result = client.get_introspection("client_id", "client_secret", "token")
assert result == "introspection response"
mock_post.assert_called_once_with(
"https://auth.example.com/api/v2/introspect",
data={
"client_id": "client_id",
"client_secret": "client_secret",
"token": "token",
},
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/token-introspection+jwt",
},
verify=True,
timeout=5,
proxies=None,
)
@patch("requests.post", side_effect=HTTPError())
# pylint: disable=(unused-argument
def test_get_introspection_error(mock_post, client):
"""Test 'get_introspection' method with an HTTPError."""
with pytest.raises(HTTPError):
client.get_introspection("client_id", "client_secret", "token")
@patch("requests.get")
def test_get_jwks_success(mock_get, client):
"""Test 'get_jwks' method with a successful response."""
mock_response = MagicMock()
mock_response.raise_for_status.return_value = None
mock_response.json.return_value = {"jwks": "foo"}
mock_get.return_value = mock_response
result = client.get_jwks()
assert result == {"jwks": "foo"}
mock_get.assert_called_once_with(
"https://auth.example.com/api/v2/jwks",
verify=client._verify_ssl,
timeout=client._timeout,
proxies=client._proxy,
)
@patch("requests.get")
def test_get_jwks_error(mock_get, client):
"""Test 'get_jwks' method with an HTTPError."""
mock_response = MagicMock()
mock_response.raise_for_status.side_effect = HTTPError(
response=MagicMock(status=500)
)
mock_get.return_value = mock_response
with pytest.raises(HTTPError):
client.get_jwks()
@patch("requests.get")
def test_import_public_keys_valid(mock_get, client):
"""Test 'import_public_keys' method with a successful response."""
mocked_key = RSAKey.generate_key(2048)
mock_response = MagicMock()
mock_response.raise_for_status.return_value = None
mock_response.json.return_value = {"keys": [mocked_key.as_dict()]}
mock_get.return_value = mock_response
response = client.import_public_keys()
assert isinstance(response, KeySet)
assert response.as_dict() == KeySet([mocked_key]).as_dict()
@patch("requests.get")
def test_import_public_keys_http_error(mock_get, client):
"""Test 'import_public_keys' method with an HTTPError."""
mock_response = MagicMock()
mock_response.raise_for_status.side_effect = HTTPError(
response=MagicMock(status=500)
)
mock_get.return_value = mock_response
with pytest.raises(HTTPError):
client.import_public_keys()
@patch("requests.get")
def test_import_public_keys_empty_jwks(mock_get, client):
"""Test 'import_public_keys' method with empty keys response."""
mock_response = MagicMock()
mock_response.raise_for_status.return_value = None
mock_response.json.return_value = {"keys": []}
mock_get.return_value = mock_response
response = client.import_public_keys()
assert isinstance(response, KeySet)
assert response.as_dict() == {"keys": []}
@patch("requests.get")
def test_import_public_keys_invalid_jwks(mock_get, client):
"""Test 'import_public_keys' method with invalid keys response."""
mock_response = MagicMock()
mock_response.raise_for_status.return_value = None
mock_response.json.return_value = {"keys": [{"foo": "foo"}]}
mock_get.return_value = mock_response
with pytest.raises(ValueError):
client.import_public_keys()

View File

@@ -1,88 +0,0 @@
"""
Test for the Resource Server (RS) utils functions.
"""
from django.core.exceptions import ImproperlyConfigured
from django.test.utils import override_settings
import pytest
from joserfc.jwk import ECKey, RSAKey
from core.resource_server.utils import import_private_key_from_settings
PRIVATE_KEY_STR_MOCKED = """-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC3boG1kwEGUYL+
U58RPrVToIsF9jHB64S6WJIIInPmAclBciXFb6BWG11mbRIgo8ha3WVnC/tGHbXb
ndiKdrH2vKHOsDhV9AmgHgNgWaUK9L0uuKEb/xMLePYWsYlgzcQJx8RZY7RQyWqE
20WfzFxeuCE7QMb6VXSOgwQMnJsKocguIh3VCI9RIBq3B1kdgW35AD63YKOygmGx
qjcWwbjhKLvkF7LpBdlyAEzOKqg4T5uCcHMfksMW2+foTJx70RrZM/KHU+Zysuw7
uhhVsgPBG+CsqBSjHQhs7jzymqxtQAfe1FkrCRxOq5Pv2Efr7kgtVSkJJiX3KutM
vnWuEypxAgMBAAECggEAGqKS9pbrN+vnmb7yMsqYgVVnQn0aggZNHlLkl4ZLLnuV
aemlhur7zO0JzajqUC+AFQOfaQxiFu8S/FoJ+qccFdATrcPEVmTKbgPVqSyzLKlX
fByGll5eOVT95NMwN8yBGgt2HSW/ZditXS/KxxahVgamGqjAC9MTSutGz/8Ae1U+
DNDBJCc6RAqu3T02tV9A2pSpVC1rSktDMpLUTscnsfxpaEQATd9DJUcHEvIwoX8q
GJpycPEhNhdPXqpln5SoMHcf/zS5ssF/Mce0lJJXYyE0LnEk9X12jMWyBqmLqXUY
cKLyynaFbis0DpQppwKx2y8GpL76k+Ci4dOHIvFknQKBgQDj/2WRMcWOvfBrggzj
FHpcme2gSo5A5c0CVyI+Xkf1Zab6UR6T7GiImEoj9tq0+o2WEix9rwoypgMBq8rz
/rrJAPSZjgv6z71k4EnO2FIB5R03vQmoBRCN8VlgvLM0xv52zyjV4Wx66Q4MDjyH
EgkpHyB0FzRZh0UzhnE/pYSetQKBgQDN9eLB1nA4CBSr1vMGNfQyfBQl3vpO9EP4
VSS3KnUqCIjJeLu682Ylu7SFxcJAfzUpy5S43hEvcuJsagsVKfmCAGcYZs9/xq3I
vzYyhaEOS5ezNxLSh4+yCNBPlmrmDyoazag0t8H8YQFBN6BVcxbATHqdWGUhIhYN
eEpEMOh2TQKBgGBr7kRNTENlyHtu8IxIaMcowfn8DdUcWmsW9oBx1vTNHKTYEZp1
bG/4F8LF7xCCtcY1wWMV17Y7xyG5yYcOv2eqY8dc72wO1wYGZLB5g5URlB2ycJcC
LVIaM7ZZl2BGl+8fBSIOx5XjYfFvQ+HLmtwtMchm19jVAEseHF7SXRfRAoGAK15j
aT2mU6Yf9C9G7T/fM+I8u9zACHAW/+ut14PxN/CkHQh3P16RW9CyqpiB1uLyZuKf
Zm4cYElotDuAKey0xVMgYlsDxnwni+X3m5vX1hLE1s/5/qrc7zg75QZfbCI1U3+K
s88d4e7rPLhh4pxhZgy0pP1ADkIHMr7ppIJH8OECgYEApNfbgsJVPAMzucUhJoJZ
OmZHbyCtJvs4b+zxnmhmSbopifNCgS4zjXH9qC7tsUph1WE6L2KXvtApHGD5H4GQ
IH5em4M/pHIcsqCi1qggBMbdvzHBUtC3R4sK0CpEFHlN+Y59aGazidcN2FPupNJv
MbyqKyC6DAzv4jEEhHaN7oY=
-----END PRIVATE KEY-----
"""
@override_settings(OIDC_RS_PRIVATE_KEY_STR=PRIVATE_KEY_STR_MOCKED)
@pytest.mark.parametrize("mocked_private_key", [None, ""])
def test_import_private_key_from_settings_missing_or_empty_key(
settings, mocked_private_key
):
"""Should raise an exception if the settings 'OIDC_RS_PRIVATE_KEY_STR' is missing or empty."""
settings.OIDC_RS_PRIVATE_KEY_STR = mocked_private_key
with pytest.raises(
ImproperlyConfigured,
match="OIDC_RS_PRIVATE_KEY_STR setting is missing or empty.",
):
import_private_key_from_settings()
@pytest.mark.parametrize("mocked_private_key", ["123", "foo", "invalid_key"])
@override_settings(OIDC_RS_PRIVATE_KEY_STR=PRIVATE_KEY_STR_MOCKED)
@override_settings(OIDC_RS_ENCRYPTION_KEY_TYPE="RSA")
@override_settings(OIDC_RS_ENCRYPTION_ALGO="RS256")
def test_import_private_key_from_settings_incorrect_key(settings, mocked_private_key):
"""Should raise an exception if the setting 'OIDC_RS_PRIVATE_KEY_STR' has an incorrect value."""
settings.OIDC_RS_PRIVATE_KEY_STR = mocked_private_key
with pytest.raises(
ImproperlyConfigured, match="OIDC_RS_PRIVATE_KEY_STR setting is wrong."
):
import_private_key_from_settings()
@override_settings(OIDC_RS_PRIVATE_KEY_STR=PRIVATE_KEY_STR_MOCKED)
@override_settings(OIDC_RS_ENCRYPTION_KEY_TYPE="RSA")
@override_settings(OIDC_RS_ENCRYPTION_ALGO="RS256")
def test_import_private_key_from_settings_success_rsa_key():
"""Should import private key string as an RSA key."""
private_key = import_private_key_from_settings()
assert isinstance(private_key, RSAKey)
@override_settings(OIDC_RS_PRIVATE_KEY_STR=PRIVATE_KEY_STR_MOCKED)
@override_settings(OIDC_RS_ENCRYPTION_KEY_TYPE="EC")
@override_settings(OIDC_RS_ENCRYPTION_ALGO="ES256")
def test_import_private_key_from_settings_success_ec_key():
"""Should import private key string as an EC key."""
private_key = import_private_key_from_settings()
assert isinstance(private_key, ECKey)

View File

@@ -1,70 +0,0 @@
"""
Tests for the Resource Server (RS) Views.
"""
from unittest import mock
from django.core.exceptions import ImproperlyConfigured
from django.urls import reverse
import pytest
from joserfc.jwk import RSAKey
from rest_framework.test import APIClient
pytestmark = pytest.mark.django_db
@mock.patch("core.resource_server.utils.import_private_key_from_settings")
def test_view_jwks_valid_public_key(mock_import_private_key_from_settings):
"""JWKs endpoint should return a set of valid Json Web Key"""
mocked_key = RSAKey.generate_key(2048)
mock_import_private_key_from_settings.return_value = mocked_key
url = reverse("resource_server_jwks")
response = APIClient().get(url)
mock_import_private_key_from_settings.assert_called_once()
assert response.status_code == 200
assert response["Content-Type"] == "application/json"
jwks = response.json()
assert jwks == {"keys": [mocked_key.as_dict(private=False)]}
# Security checks to make sure no details from the private key are exposed
private_details = ["d", "p", "q", "dp", "dq", "qi", "oth", "r", "t"]
assert all(
private_detail not in jwks["keys"][0].keys()
for private_detail in private_details
)
@mock.patch("core.resource_server.utils.import_private_key_from_settings")
def test_view_jwks_invalid_private_key(mock_import_private_key_from_settings):
"""JWKS endpoint should return a proper exception when loading keys fails."""
mock_import_private_key_from_settings.return_value = "wrong_key"
url = reverse("resource_server_jwks")
response = APIClient().get(url)
mock_import_private_key_from_settings.assert_called_once()
assert response.status_code == 500
assert response.json() == {"error": "Could not load key"}
@mock.patch("core.resource_server.utils.import_private_key_from_settings")
def test_view_jwks_missing_private_key(mock_import_private_key_from_settings):
"""JWKS endpoint should return a proper exception when private key is missing."""
mock_import_private_key_from_settings.side_effect = ImproperlyConfigured("foo.")
url = reverse("resource_server_jwks")
response = APIClient().get(url)
mock_import_private_key_from_settings.assert_called_once()
assert response.status_code == 500
assert response.json() == {"error": "foo."}

View File

@@ -9,8 +9,7 @@ from django.contrib.auth import get_user_model
import pytest
import responses
from faker import Faker
from core.resource_server.authentication import ResourceServerAuthentication
from lasuite.oidc_resource_server.authentication import ResourceServerAuthentication
User = get_user_model()
fake = Faker()
@@ -48,7 +47,7 @@ def _force_login_via_resource_server(
):
client_fixture.force_login(
user,
backend="core.resource_server.authentication.ResourceServerAuthentication",
backend="lasuite.oidc_resource_server.authentication.ResourceServerAuthentication",
)
yield

View File

@@ -3,11 +3,11 @@
from django.conf import settings
from django.urls import include, path, re_path
from lasuite.oidc_login.urls import urlpatterns as oidc_urls
from lasuite.oidc_resource_server.urls import urlpatterns as resource_server_urls
from rest_framework.routers import DefaultRouter
from core.api.client import viewsets
from core.authentication.urls import urlpatterns as oidc_urls
from core.resource_server.urls import urlpatterns as resource_server_urls
from mailbox_oauth2.urls import urlpatterns as mailbox_oauth2_urls

View File

@@ -2,10 +2,10 @@
from django.urls import include, path
from lasuite.oidc_resource_server.urls import urlpatterns as resource_server_urls
from rest_framework.routers import DefaultRouter
from core.api.resource_server import viewsets
from core.resource_server.urls import urlpatterns as resource_server_urls
# - Main endpoints
# Contacts will be added later

View File

@@ -474,9 +474,9 @@ class Base(Configuration):
environ_prefix=None,
)
USER_OIDC_FIELDS_TO_NAME = values.ListValue(
USER_OIDC_FIELDS_TO_FULLNAME = values.ListValue(
default=["first_name", "last_name"],
environ_name="USER_OIDC_FIELDS_TO_NAME",
environ_name="USER_OIDC_FIELDS_TO_FULLNAME",
environ_prefix=None,
)
OIDC_ORGANIZATION_REGISTRATION_ID_FIELD = values.Value(
@@ -490,7 +490,7 @@ class Base(Configuration):
)
OIDC_OP_URL = values.Value(None, environ_name="OIDC_OP_URL", environ_prefix=None)
OIDC_RS_BACKEND_CLASS = values.Value(
"core.resource_server.backend.ResourceServerBackend",
"lasuite.oidc_resource_server.backend.ResourceServerBackend",
environ_name="OIDC_RS_BACKEND_CLASS",
environ_prefix=None,
)

View File

@@ -34,6 +34,7 @@ dependencies = [
"django-configurations==2.5.1",
"django-cors-headers==4.7.0",
"django-countries==7.6.1",
"django-lasuite==0.0.1",
"django-oauth-toolkit==3.0.1",
"django-parler==2.3",
"django-redis==5.4.0",

View File

@@ -50,7 +50,7 @@ backend:
key: OIDC_RP_CLIENT_SECRET
OIDC_RP_SIGN_ALGO: RS256
OIDC_RP_SCOPES: "openid email siret given_name usual_name"
USER_OIDC_FIELDS_TO_NAME: "given_name,usual_name"
USER_OIDC_FIELDS_TO_FULLNAME: "given_name,usual_name"
OIDC_REDIRECT_ALLOWED_HOSTS: https://desk.127.0.0.1.nip.io
OIDC_AUTH_REQUEST_EXTRA_PARAMS: "{'acr_values': 'eidas1'}"
OAUTH2_PROVIDER_OIDC_ENABLED: True