From e2d362bc7775284cdd118efc1dd4eacb10faf3cf Mon Sep 17 00:00:00 2001 From: Quentin BEY Date: Wed, 2 Apr 2025 12:02:06 +0200 Subject: [PATCH] =?UTF-8?q?=E2=9E=95(backend)=20add=20`django-lasuite`=20d?= =?UTF-8?q?ependency?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Use the OIDC backends from the new library. --- CHANGELOG.md | 1 + .../core/api/resource_server/viewsets.py | 2 +- src/backend/core/authentication/backends.py | 158 ++---- src/backend/core/authentication/urls.py | 18 - src/backend/core/authentication/views.py | 137 ----- src/backend/core/resource_server/__init__.py | 1 - .../core/resource_server/authentication.py | 80 --- src/backend/core/resource_server/backend.py | 310 ----------- src/backend/core/resource_server/clients.py | 97 ---- src/backend/core/resource_server/mixins.py | 53 -- src/backend/core/resource_server/urls.py | 9 - src/backend/core/resource_server/utils.py | 48 -- src/backend/core/resource_server/views.py | 40 -- .../tests/authentication/test_backends.py | 4 +- .../core/tests/authentication/test_urls.py | 10 - .../core/tests/authentication/test_views.py | 231 --------- .../resource_server/test_authentication.py | 12 +- .../tests/resource_server/test_backend.py | 486 ------------------ .../tests/resource_server/test_clients.py | 187 ------- .../core/tests/resource_server/test_utils.py | 88 ---- .../core/tests/resource_server/test_views.py | 70 --- .../tests/resource_server_api/conftest.py | 5 +- src/backend/people/api_urls.py | 4 +- src/backend/people/resource_server_urls.py | 2 +- src/backend/people/settings.py | 6 +- src/backend/pyproject.toml | 1 + src/helm/env.d/dev/values.desk.yaml.gotmpl | 2 +- 27 files changed, 51 insertions(+), 2011 deletions(-) delete mode 100644 src/backend/core/authentication/urls.py delete mode 100644 src/backend/core/authentication/views.py delete mode 100644 src/backend/core/resource_server/__init__.py delete mode 100644 src/backend/core/resource_server/authentication.py delete mode 100644 src/backend/core/resource_server/backend.py delete mode 100644 src/backend/core/resource_server/clients.py delete mode 100644 src/backend/core/resource_server/mixins.py delete mode 100644 src/backend/core/resource_server/urls.py delete mode 100644 src/backend/core/resource_server/utils.py delete mode 100644 src/backend/core/resource_server/views.py delete mode 100644 src/backend/core/tests/authentication/test_urls.py delete mode 100644 src/backend/core/tests/authentication/test_views.py delete mode 100644 src/backend/core/tests/resource_server/test_backend.py delete mode 100644 src/backend/core/tests/resource_server/test_clients.py delete mode 100644 src/backend/core/tests/resource_server/test_utils.py delete mode 100644 src/backend/core/tests/resource_server/test_views.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 5575bfd..447f74f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to ### Added +- ➕(backend) add django-lasuite dependency #858 - ✨(plugins) add endpoint to list siret of active organizations #771 - ✨(core) create AccountServiceAuthentication backend #771 - ✨(core) create AccountService model #771 diff --git a/src/backend/core/api/resource_server/viewsets.py b/src/backend/core/api/resource_server/viewsets.py index b20ac31..c1c509f 100644 --- a/src/backend/core/api/resource_server/viewsets.py +++ b/src/backend/core/api/resource_server/viewsets.py @@ -6,6 +6,7 @@ from functools import reduce from django.db.models import OuterRef, Prefetch, Q, Subquery, Value from django.db.models.functions import Coalesce +from lasuite.oidc_resource_server.mixins import ResourceServerMixin from rest_framework import ( filters, mixins, @@ -15,7 +16,6 @@ from rest_framework import ( from core import models from core.api import permissions from core.api.client.viewsets import Pagination -from core.resource_server.mixins import ResourceServerMixin from . import serializers diff --git a/src/backend/core/authentication/backends.py b/src/backend/core/authentication/backends.py index b7c311a..4df8074 100644 --- a/src/backend/core/authentication/backends.py +++ b/src/backend/core/authentication/backends.py @@ -1,18 +1,15 @@ """Authentication Backends for the People core app.""" import logging -from email.headerregistry import Address -from typing import Optional from django.conf import settings -from django.contrib.auth import get_user_model from django.core.exceptions import SuspiciousOperation from django.utils.translation import gettext_lazy as _ -import requests -from mozilla_django_oidc.auth import ( - OIDCAuthenticationBackend as MozillaOIDCAuthenticationBackend, +from lasuite.oidc_login.backends import ( + OIDCAuthenticationBackend as LaSuiteOIDCAuthenticationBackend, ) +from lasuite.tools.email import get_domain_from_email from rest_framework.authentication import BaseAuthentication from rest_framework.exceptions import AuthenticationFailed @@ -26,100 +23,44 @@ from core.models import ( logger = logging.getLogger(__name__) -User = get_user_model() - -def get_domain_from_email(email: Optional[str]) -> Optional[str]: - """Extract domain from email.""" - try: - return Address(addr_spec=email).domain - except (ValueError, AttributeError): - return None - - -class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend): +class OIDCAuthenticationBackend(LaSuiteOIDCAuthenticationBackend): """Custom OpenID Connect (OIDC) Authentication Backend. This class overrides the default OIDC Authentication Backend to accommodate differences in the User model, and handles signed and/or encrypted UserInfo response. """ - def get_userinfo(self, access_token, id_token, payload): - """Return user details dictionary. + def get_extra_claims(self, user_info): + """ + Return extra claims from user_info. - Parameters: - - access_token (str): The access token. - - id_token (str): The id token (unused). - - payload (dict): The token payload (unused). - - Note: The id_token and payload parameters are unused in this implementation, - but were kept to preserve base method signature. - - Note: It handles signed and/or encrypted UserInfo Response. It is required by - Agent Connect, which follows the OIDC standard. It forces us to override the - base method, which deal with 'application/json' response. + Args: + user_info (dict): The user information dictionary. Returns: - - dict: User details dictionary obtained from the OpenID Connect user endpoint. + dict: A dictionary of extra claims. + """ - - user_response = requests.get( - self.OIDC_OP_USER_ENDPOINT, - headers={"Authorization": f"Bearer {access_token}"}, - verify=self.get_settings("OIDC_VERIFY_SSL", True), - timeout=self.get_settings("OIDC_TIMEOUT", None), - proxies=self.get_settings("OIDC_PROXY", None), - ) - user_response.raise_for_status() - userinfo = self.verify_token(user_response.text) - return userinfo - - def get_or_create_user(self, access_token, id_token, payload): - """Return a User based on userinfo. Create a new user if no match is found. - - Parameters: - - access_token (str): The access token. - - id_token (str): The ID token. - - payload (dict): The user payload. - - Returns: - - User: An existing or newly created User instance. - - Raises: - - Exception: Raised when user creation is not allowed and no existing user is found. - """ - - user_info = self.get_userinfo(access_token, id_token, payload) - - sub = user_info.get("sub") - if not sub: - raise SuspiciousOperation( - _("User info contained no recognizable user identification") - ) - - # Get user's full name from OIDC fields defined in settings - email = user_info.get("email") - - claims = { - "sub": sub, - "email": email, - "name": self.compute_full_name(user_info), - } + extra_claims = super().get_extra_claims(user_info) if settings.OIDC_ORGANIZATION_REGISTRATION_ID_FIELD: - claims[settings.OIDC_ORGANIZATION_REGISTRATION_ID_FIELD] = user_info.get( - settings.OIDC_ORGANIZATION_REGISTRATION_ID_FIELD + extra_claims[settings.OIDC_ORGANIZATION_REGISTRATION_ID_FIELD] = ( + user_info.get(settings.OIDC_ORGANIZATION_REGISTRATION_ID_FIELD) ) + return extra_claims - # if sub is absent, try matching on email - user = self.get_existing_user(sub, email) + def post_get_or_create_user(self, user, claims): + """ + Post-processing after user creation or retrieval. - if user: - if not user.is_active: - raise SuspiciousOperation(_("User account is disabled")) - self.update_user_if_needed(user, claims) - elif self.get_settings("OIDC_CREATE_USER", True): - user = self.create_user(claims) + Args: + user (User): The user instance. + claims (dict): The claims dictionary. + Returns: + - None + + """ # Data cleaning, to be removed when user organization is null=False # or all users have an organization. # See https://github.com/suitenumerique/people/issues/504 @@ -127,7 +68,7 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend): organization_registration_id = claims.get( settings.OIDC_ORGANIZATION_REGISTRATION_ID_FIELD ) - domain = get_domain_from_email(email) + domain = get_domain_from_email(claims["email"]) try: organization, organization_created = ( Organization.objects.get_or_create_from_user_claims( @@ -154,8 +95,6 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend): "User %s updated with organization %s", user.pk, organization ) - return user - def create_user(self, claims): """Return a newly created User instance.""" sub = claims.get("sub") @@ -167,8 +106,9 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend): name = claims.get("name") # Extract or create the organization from the data - organization_registration_id = claims.get( - settings.OIDC_ORGANIZATION_REGISTRATION_ID_FIELD + organization_registration_id = claims.pop( + settings.OIDC_ORGANIZATION_REGISTRATION_ID_FIELD, + None, ) domain = get_domain_from_email(email) try: @@ -188,13 +128,8 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend): logger.info("Creating user %s / %s", sub, email) - user = self.UserModel.objects.create( - organization=organization, - password="!", # noqa: S106 - sub=sub, - email=email, - name=name, - ) + user = super().create_user(claims | {"organization": organization}) + if organization_created: # Warning: we may remove this behavior in the near future when we # add a feature to claim the organization ownership. @@ -218,37 +153,6 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend): return user - def compute_full_name(self, user_info): - """Compute user's full name based on OIDC fields in settings.""" - name_fields = settings.USER_OIDC_FIELDS_TO_NAME - full_name = " ".join( - user_info[field] for field in name_fields if user_info.get(field) - ) - return full_name or None - - def get_existing_user(self, sub, email): - """Fetch existing user by sub or email.""" - try: - return User.objects.get(sub=sub) - except User.DoesNotExist: - if email and settings.OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION: - try: - return User.objects.get(email=email) - except User.DoesNotExist: - pass - return None - - def update_user_if_needed(self, user, claims): - """Update user claims if they have changed.""" - updated_claims = {} - for key in ["email", "name"]: - claim_value = claims.get(key) - if claim_value and claim_value != getattr(user, key): - updated_claims[key] = claim_value - - if updated_claims: - self.UserModel.objects.filter(sub=user.sub).update(**updated_claims) - class AccountServiceAuthentication(BaseAuthentication): """Authentication backend for account services using Authorization header. diff --git a/src/backend/core/authentication/urls.py b/src/backend/core/authentication/urls.py deleted file mode 100644 index a8b44cf..0000000 --- a/src/backend/core/authentication/urls.py +++ /dev/null @@ -1,18 +0,0 @@ -"""Authentication URLs for the People core app.""" - -from django.urls import path - -from mozilla_django_oidc.urls import urlpatterns as mozilla_oidc_urls - -from .views import OIDCLogoutCallbackView, OIDCLogoutView - -urlpatterns = [ - # Override the default 'logout/' path from Mozilla Django OIDC with our custom view. - path("logout/", OIDCLogoutView.as_view(), name="oidc_logout_custom"), - path( - "logout-callback/", - OIDCLogoutCallbackView.as_view(), - name="oidc_logout_callback", - ), - *mozilla_oidc_urls, -] diff --git a/src/backend/core/authentication/views.py b/src/backend/core/authentication/views.py deleted file mode 100644 index 61fe0ac..0000000 --- a/src/backend/core/authentication/views.py +++ /dev/null @@ -1,137 +0,0 @@ -"""Authentication Views for the People core app.""" - -from urllib.parse import urlencode - -from django.contrib import auth -from django.core.exceptions import SuspiciousOperation -from django.http import HttpResponseRedirect -from django.urls import reverse -from django.utils import crypto - -from mozilla_django_oidc.utils import ( - absolutify, -) -from mozilla_django_oidc.views import ( - OIDCLogoutView as MozillaOIDCOIDCLogoutView, -) - - -class OIDCLogoutView(MozillaOIDCOIDCLogoutView): - """Custom logout view for handling OpenID Connect (OIDC) logout flow. - - Adds support for handling logout callbacks from the identity provider (OP) - by initiating the logout flow if the user has an active session. - - The Django session is retained during the logout process to persist the 'state' OIDC parameter. - This parameter is crucial for maintaining the integrity of the logout flow between this call - and the subsequent callback. - """ - - @staticmethod - def persist_state(request, state): - """Persist the given 'state' parameter in the session's 'oidc_states' dictionary - - This method is used to store the OIDC state parameter in the session, according to the - structure expected by Mozilla Django OIDC's 'add_state_and_verifier_and_nonce_to_session' - utility function. - """ - - if "oidc_states" not in request.session or not isinstance( - request.session["oidc_states"], dict - ): - request.session["oidc_states"] = {} - - request.session["oidc_states"][state] = {} - request.session.save() - - def construct_oidc_logout_url(self, request): - """Create the redirect URL for interfacing with the OIDC provider. - - Retrieves the necessary parameters from the session and constructs the URL - required to initiate logout with the OpenID Connect provider. - - If no ID token is found in the session, the logout flow will not be initiated, - and the method will return the default redirect URL. - - The 'state' parameter is generated randomly and persisted in the session to ensure - its integrity during the subsequent callback. - """ - - oidc_logout_endpoint = self.get_settings("OIDC_OP_LOGOUT_ENDPOINT") - - if not oidc_logout_endpoint: - return self.redirect_url - - reverse_url = reverse("oidc_logout_callback") - id_token = request.session.get("oidc_id_token", None) - - if not id_token: - return self.redirect_url - - query = { - "id_token_hint": id_token, - "state": crypto.get_random_string(self.get_settings("OIDC_STATE_SIZE", 32)), - "post_logout_redirect_uri": absolutify(request, reverse_url), - } - - self.persist_state(request, query["state"]) - - return f"{oidc_logout_endpoint}?{urlencode(query)}" - - def post(self, request): - """Handle user logout. - - If the user is not authenticated, redirects to the default logout URL. - Otherwise, constructs the OIDC logout URL and redirects the user to start - the logout process. - - If the user is redirected to the default logout URL, ensure her Django session - is terminated. - """ - - logout_url = self.redirect_url - - if request.user.is_authenticated: - logout_url = self.construct_oidc_logout_url(request) - - # If the user is not redirected to the OIDC provider, ensure logout - if logout_url == self.redirect_url: - auth.logout(request) - - return HttpResponseRedirect(logout_url) - - -class OIDCLogoutCallbackView(MozillaOIDCOIDCLogoutView): - """Custom view for handling the logout callback from the OpenID Connect (OIDC) provider. - - Handles the callback after logout from the identity provider (OP). - Verifies the state parameter and performs necessary logout actions. - - The Django session is maintained during the logout process to ensure the integrity - of the logout flow initiated in the previous step. - """ - - http_method_names = ["get"] - - def get(self, request): - """Handle the logout callback. - - If the user is not authenticated, redirects to the default logout URL. - Otherwise, verifies the state parameter and performs necessary logout actions. - """ - - if not request.user.is_authenticated: - return HttpResponseRedirect(self.redirect_url) - - state = request.GET.get("state") - - if state not in request.session.get("oidc_states", {}): - msg = "OIDC callback state not found in session `oidc_states`!" - raise SuspiciousOperation(msg) - - del request.session["oidc_states"][state] - request.session.save() - - auth.logout(request) - - return HttpResponseRedirect(self.redirect_url) diff --git a/src/backend/core/resource_server/__init__.py b/src/backend/core/resource_server/__init__.py deleted file mode 100644 index 7cc02e3..0000000 --- a/src/backend/core/resource_server/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Backend resource server module.""" diff --git a/src/backend/core/resource_server/authentication.py b/src/backend/core/resource_server/authentication.py deleted file mode 100644 index baafe91..0000000 --- a/src/backend/core/resource_server/authentication.py +++ /dev/null @@ -1,80 +0,0 @@ -"""Resource Server Authentication""" - -import base64 -import binascii -import logging -from functools import lru_cache - -from django.conf import settings -from django.core.exceptions import ImproperlyConfigured -from django.utils.module_loading import import_string - -from mozilla_django_oidc.contrib.drf import OIDCAuthentication - -from .backend import ResourceServerImproperlyConfiguredBackend -from .clients import AuthorizationServerClient - -logger = logging.getLogger(__name__) - - -@lru_cache(maxsize=None) -def get_resource_server_backend(): - """Return the resource server backend class based on the settings.""" - return import_string(settings.OIDC_RS_BACKEND_CLASS) - - -class ResourceServerAuthentication(OIDCAuthentication): - """Authenticate clients using the token received from the authorization server.""" - - def __init__(self): - """Require authentication to be configured in order to instantiate.""" - super().__init__() - - try: - authorization_server_client = AuthorizationServerClient( - url=settings.OIDC_OP_URL, - verify_ssl=settings.OIDC_VERIFY_SSL, - timeout=settings.OIDC_TIMEOUT, - proxy=settings.OIDC_PROXY, - url_jwks=settings.OIDC_OP_JWKS_ENDPOINT, - url_introspection=settings.OIDC_OP_INTROSPECTION_ENDPOINT, - ) - self.backend = get_resource_server_backend()(authorization_server_client) - - except ImproperlyConfigured as err: - message = "Resource Server authentication is disabled" - logger.debug("%s. Exception: %s", message, err) - self.backend = ResourceServerImproperlyConfiguredBackend() - - def get_access_token(self, request): - """Retrieve and decode the access token from the request. - - This method overrides the 'get_access_token' method from the parent class, - to support service providers that would base64 encode the bearer token. - """ - - access_token = super().get_access_token(request) - - try: - access_token = base64.b64decode(access_token).decode("utf-8") - except (binascii.Error, TypeError): - pass - - return access_token - - def authenticate(self, request): - """ - Authenticate the request and return a tuple of (user, token) or None. - - We override the 'authenticate' method from the parent class to store - the introspected token audience inside the request. - """ - result = super().authenticate(request) # Might raise AuthenticationFailed - - if result is None: # Case when there is no access token - return None - - # Note: at this stage, the request is a "drf_request" object - request.resource_server_token_audience = self.backend.token_origin_audience - - return result diff --git a/src/backend/core/resource_server/backend.py b/src/backend/core/resource_server/backend.py deleted file mode 100644 index 6877773..0000000 --- a/src/backend/core/resource_server/backend.py +++ /dev/null @@ -1,310 +0,0 @@ -"""Resource Server Backend""" - -import json -import logging -from json import JSONDecodeError - -from django.conf import settings -from django.contrib import auth -from django.core.exceptions import ImproperlyConfigured, SuspiciousOperation - -from joserfc import jwe as jose_jwe -from joserfc import jwt as jose_jwt -from joserfc.errors import InvalidClaimError, InvalidTokenError -from joserfc.jwt import Token -from requests.exceptions import HTTPError -from rest_framework.exceptions import AuthenticationFailed - -from . import utils - -logger = logging.getLogger(__name__) - - -class ResourceServerBackend: - """Backend of an OAuth 2.0 resource server. - - This backend is designed to authenticate resource owners to our API using the access token - they received from the authorization server. - - In the context of OAuth 2.0, a resource server is a server that hosts protected resources and - is capable of accepting and responding to protected resource requests using access tokens. - The resource server verifies the validity of the access tokens issued by the authorization - server to ensure secure access to the resources. - - For more information, visit: https://www.oauth.com/oauth2-servers/the-resource-server/ - """ - - # pylint: disable=too-many-instance-attributes - def __init__(self, authorization_server_client): - """Require client_id, client_secret set and authorization_server_client provided.""" - # pylint: disable=invalid-name - self.UserModel = auth.get_user_model() - - self._client_id = settings.OIDC_RS_CLIENT_ID - self._client_secret = settings.OIDC_RS_CLIENT_SECRET - self._encryption_encoding = settings.OIDC_RS_ENCRYPTION_ENCODING - self._encryption_algorithm = settings.OIDC_RS_ENCRYPTION_ALGO - self._signing_algorithm = settings.OIDC_RS_SIGNING_ALGO - self._scopes = settings.OIDC_RS_SCOPES - - if ( - not self._client_id - or not self._client_secret - or not authorization_server_client - ): - raise ImproperlyConfigured( - f"Could not instantiate {self.__class__.__name__}: some parameters are missing.", - ) - - self._authorization_server_client = authorization_server_client - - self._introspection_claims_registry = jose_jwt.JWTClaimsRegistry( - iss={"essential": True, "value": self._authorization_server_client.url}, - active={"essential": True}, - scope={"essential": True}, # content validated in _verify_user_info - # optional in RFC, but required here: "client_id" or "aud" - **{settings.OIDC_RS_AUDIENCE_CLAIM: {"essential": True}}, - ) - - # Declare the token origin audience: to know where the token comes from - # and store it for further use in the application - self.token_origin_audience = None - - # pylint: disable=unused-argument - def get_or_create_user(self, access_token, id_token, payload): - """Maintain API compatibility with OIDCAuthentication class from mozilla-django-oidc - - Params 'id_token', 'payload' won't be used, and our implementation will only - support 'get_user', not 'get_or_create_user'. - """ - - return self.get_user(access_token) - - def get_user(self, access_token): - """Get user from an access token emitted by the authorization server. - - This method will submit the access token to the authorization server for - introspection, to ensure its validity and obtain the associated metadata. - - It follows the specifications outlined in RFC7662 https://www.rfc-editor.org/info/rfc7662, - https://datatracker.ietf.org/doc/html/draft-ietf-oauth-jwt-introspection-response-12. - - In our eGovernment applications, the standard RFC 7662 doesn't provide sufficient security. - Its introspection response is a plain JSON object. Therefore, we use the draft RFC - that extends RFC 7662 by returning a signed and encrypted JWT for stronger assurance that - the authorization server issued the token introspection response. - """ - self.token_origin_audience = None # Reset the token origin audience - - jwt = self._introspect(access_token) - claims = self._verify_claims(jwt) - user_info = self._verify_user_info(claims) - - sub = user_info.get("sub") - if sub is None: - message = "User info contained no recognizable user identification" - logger.debug(message) - raise SuspiciousOperation(message) - try: - user = self.UserModel.objects.get(sub=sub) - except self.UserModel.DoesNotExist: - logger.debug("Login failed: No user with %s found", sub) - return None - - self.token_origin_audience = str(user_info[settings.OIDC_RS_AUDIENCE_CLAIM]) - - return user - - def _verify_user_info(self, introspection_response): - """Verify the 'introspection_response' to get valid and relevant user info. - - The 'introspection_response' should be still active, and while authenticating - the resource owner should have requested relevant scope to access her data in - our resource server. - - Scope should be configured to match between the AS and the RS. The AS will filter - all the scopes the resource owner requested to expose only the relevant ones to - our resource server. - """ - - active = introspection_response.get("active", None) - - if not active: - message = "Introspection response is not active." - logger.debug(message) - raise SuspiciousOperation(message) - - requested_scopes = introspection_response.get("scope", None).split(" ") - if set(self._scopes).isdisjoint(set(requested_scopes)): - message = "Introspection response contains any required scopes." - logger.debug(message) - raise SuspiciousOperation(message) - - audience = introspection_response.get(settings.OIDC_RS_AUDIENCE_CLAIM, None) - if not audience: - raise SuspiciousOperation( - "Introspection response does not provide source audience." - ) - - return introspection_response - - def _get_introspection(self, access_token): - """Request introspection of an access token to the authorization server.""" - try: - introspection_response = ( - self._authorization_server_client.get_introspection( - self._client_id, - self._client_secret, - access_token, - ) - ) - except HTTPError as err: - message = "Could not fetch introspection" - logger.debug("%s. Exception:", message, exc_info=True) - raise SuspiciousOperation(message) from err - - return introspection_response - - def _introspect(self, access_token) -> Token: - """ - Introspect an access token to the authorization server. - - Not implemented here: - - introspection_str might be a JWT, not a JSON - and therefore should be decoded - - introspection_str might be a JWS, not a JSON - and therefore should be verified (using self._decode) - - introspection_str might be a JWE, not a JSON - and therefore should be decrypted (using self._decrypt) - """ - introspection_str = self._get_introspection(access_token) - try: - introspection_data = json.loads(introspection_str) - except JSONDecodeError as exc: - raise SuspiciousOperation("Invalid JSON for introspection") from exc - - return Token({}, introspection_data) - - def _decrypt(self, encrypted_token, private_key): - """Decrypt the token encrypted by the Authorization Server (AS). - - Resource Server (RS)'s public key is used for encryption, and its private - key is used for decryption. The RS's public key is exposed to the AS via a JWKS endpoint. - Encryption Algorithm and Encoding should be configured to match between the AS - and the RS. - """ - - try: - decrypted_token = jose_jwe.decrypt_compact( - encrypted_token, - private_key, - algorithms=[self._encryption_algorithm, self._encryption_encoding], - ) - except Exception as err: - message = "Token decryption failed" - logger.debug("%s. Exception:", message, exc_info=True) - raise SuspiciousOperation(message) from err - - return decrypted_token.plaintext - - def _decode(self, encoded_token, public_key_set): - """Decode the token signed by the Authorization Server (AS). - - AS's private key is used for signing, and its public key is used for decoding. - The AS public key is exposed via a JWK endpoint. - Signing Algorithm should be configured to match between the AS and the RS. - """ - try: - token = jose_jwt.decode( - encoded_token, - public_key_set, - algorithms=[self._signing_algorithm], - ) - except ValueError as err: - message = "Token decoding failed" - logger.debug("%s. Exception:", message, exc_info=True) - raise SuspiciousOperation(message) from err - - return token - - def _verify_claims(self, token): - """Verify the claims of the token to ensure authentication security. - - By verifying these claims, we ensure that the token was issued by a - trusted authorization server and is intended for this specific - resource server. This prevents various types of attacks, such as - token substitution or misuse of tokens issued for different clients. - """ - try: - self._introspection_claims_registry.validate(token.claims) - except (InvalidClaimError, InvalidTokenError) as err: - message = "Failed to validate token's claims" - logger.debug("%s. Exception:", message, exc_info=True) - raise SuspiciousOperation(message) from err - - return token.claims - - -class JWTResourceServerBackend(ResourceServerBackend): - """Backend of an OAuth 2.0 resource server. - - Override the classic ResourceServerBackend to support JWT introspection - tokens as described in the RFC https://datatracker.ietf.org/doc/rfc9701/ - - For this implementation, we expect the introspection response to be - in JWT format, signed and encrypted. - """ - - def _introspect(self, access_token): - """ - Introspect an access token to the authorization server. - - We expect here the `token_introspection` claim to contain the - JWT information to be verified: - - iss - - aud - - iat - - Not implemented here: - - introspection_str might be a JSON, not a JWE - - introspection_str might be a JWS, not a JWE - """ - introspection_str = self._get_introspection(access_token) - - private_key = utils.import_private_key_from_settings() - jws = self._decrypt(introspection_str, private_key=private_key) - - try: - public_key_set = self._authorization_server_client.import_public_keys() - except (TypeError, ValueError, AttributeError, HTTPError) as err: - message = "Could get authorization server JWKS" - logger.debug("%s. Exception:", message, exc_info=True) - raise SuspiciousOperation(message) from err - - jwt = self._decode(jws, public_key_set) - - token_registry = jose_jwt.JWTClaimsRegistry( - iss={"essential": True, "value": self._authorization_server_client.url}, - aud={"essential": True, "value": self._client_id}, - token_introspection={"essential": True}, - ) - - try: - token_registry.validate(jwt.claims) - except (InvalidClaimError, InvalidTokenError) as err: - logger.exception("JWTResourceServerBackend: %s", err) - raise SuspiciousOperation("Failed to validate token's claims") from err - - introspection_data = jwt.claims["token_introspection"] - - return Token({}, introspection_data) - - -class ResourceServerImproperlyConfiguredBackend: - """Fallback backend for improperly configured Resource Servers.""" - - token_origin_audience = None - - def get_or_create_user(self, access_token, id_token, payload): - """Indicate that the Resource Server is improperly configured.""" - raise AuthenticationFailed("Resource Server is improperly configured") diff --git a/src/backend/core/resource_server/clients.py b/src/backend/core/resource_server/clients.py deleted file mode 100644 index f2ca5aa..0000000 --- a/src/backend/core/resource_server/clients.py +++ /dev/null @@ -1,97 +0,0 @@ -"""Resource Server Clients classes""" - -from django.core.exceptions import ImproperlyConfigured - -import requests -from joserfc.jwk import KeySet - - -class AuthorizationServerClient: - """Client for interacting with an OAuth 2.0 authorization server. - - An authorization server issues access tokens to client applications after authenticating - and obtaining authorization from the resource owner. It also provides endpoints for token - introspection and JSON Web Key Sets (JWKS) to validate and decode tokens. - - This client facilitates communication with the authorization server, including: - - Fetching token introspection responses. - - Fetching JSON Web Key Sets (JWKS) for token validation. - - Setting appropriate headers for secure communication as recommended by RFC drafts. - """ - - # ruff: noqa: PLR0913 PLR0917 - # pylint: disable=too-many-positional-arguments - # pylint: disable=too-many-arguments - def __init__( - self, - url, - url_jwks, - url_introspection, - verify_ssl, - timeout, - proxy, - ): - """Require at a minimum url, url_jwks and url_introspection.""" - - if not url or not url_jwks or not url_introspection: - raise ImproperlyConfigured( - "Could not instantiate AuthorizationServerClient, some parameters are missing." - ) - - self.url = url - self._url_introspection = url_introspection - self._url_jwks = url_jwks - self._verify_ssl = verify_ssl - self._timeout = timeout - self._proxy = proxy - - @property - def _introspection_headers(self): - """Get HTTP header for the introspection request. - - Notify the authorization server that we expect a signed and encrypted response - by setting the appropriate 'Accept' header. - - This follows the recommendation from the draft RFC: - https://datatracker.ietf.org/doc/html/draft-ietf-oauth-jwt-introspection-response-12. - """ - return { - "Content-Type": "application/x-www-form-urlencoded", - "Accept": "application/token-introspection+jwt", - } - - def get_introspection(self, client_id, client_secret, token): - """Retrieve introspection response about a token.""" - response = requests.post( - self._url_introspection, - data={ - "client_id": client_id, - "client_secret": client_secret, - "token": token, - }, - headers=self._introspection_headers, - verify=self._verify_ssl, - timeout=self._timeout, - proxies=self._proxy, - ) - response.raise_for_status() - return response.text - - def get_jwks(self): - """Retrieve Authorization Server JWKS.""" - response = requests.get( - self._url_jwks, - verify=self._verify_ssl, - timeout=self._timeout, - proxies=self._proxy, - ) - response.raise_for_status() - return response.json() - - def import_public_keys(self): - """Retrieve and import Authorization Server JWKS.""" - - jwks = self.get_jwks() - public_keys = KeySet.import_key_set(jwks) - - return public_keys diff --git a/src/backend/core/resource_server/mixins.py b/src/backend/core/resource_server/mixins.py deleted file mode 100644 index ae3b03a..0000000 --- a/src/backend/core/resource_server/mixins.py +++ /dev/null @@ -1,53 +0,0 @@ -""" -Mixins for resource server views. -""" - -from rest_framework import exceptions as drf_exceptions - -from .authentication import ResourceServerAuthentication - - -class ResourceServerMixin: - """ - Mixin for resource server views: - - Restrict the authentication class to ResourceServerAuthentication. - - Adds the Service Provider ID to the serializer context. - - Fetch the Service Provider ID from the OIDC introspected token stored - in the request. - - This Mixin *must* be used in every view that should act as a resource server. - """ - - authentication_classes = [ResourceServerAuthentication] - - def get_serializer_context(self): - """Extra context provided to the serializer class.""" - context = super().get_serializer_context() - - # When used as a resource server, we need to know the audience to automatically: - # - add the Service Provider to the team "scope" on creation - context["from_service_provider_audience"] = ( - self._get_service_provider_audience() - ) - - return context - - def _get_service_provider_audience(self): - """Return the audience of the Service Provider from the OIDC introspected token.""" - if not isinstance( - self.request.successful_authenticator, ResourceServerAuthentication - ): - # We could check request.resource_server_token_audience here, but it's - # more explicit to check the authenticator type and assert the attribute - # existence. - return None - - # When used as a resource server, the request has a token audience - service_provider_audience = self.request.resource_server_token_audience - - if not service_provider_audience: # should not happen - raise drf_exceptions.AuthenticationFailed( - "Resource server token audience not found in request" - ) - - return service_provider_audience diff --git a/src/backend/core/resource_server/urls.py b/src/backend/core/resource_server/urls.py deleted file mode 100644 index 41c79e6..0000000 --- a/src/backend/core/resource_server/urls.py +++ /dev/null @@ -1,9 +0,0 @@ -"""Resource Server URL Configuration""" - -from django.urls import path - -from .views import JWKSView - -urlpatterns = [ - path("jwks", JWKSView.as_view(), name="resource_server_jwks"), -] diff --git a/src/backend/core/resource_server/utils.py b/src/backend/core/resource_server/utils.py deleted file mode 100644 index cbd514b..0000000 --- a/src/backend/core/resource_server/utils.py +++ /dev/null @@ -1,48 +0,0 @@ -"""Resource Server utils functions""" - -from django.conf import settings -from django.core.exceptions import ImproperlyConfigured - -from joserfc.jwk import JWKRegistry - - -def import_private_key_from_settings(): - """Import the private key used by the resource server when interacting with the OIDC provider. - - This private key is crucial; its public components are exposed in the JWK endpoints, - while its private component is used for decrypting the introspection token retrieved - from the OIDC provider. - - By default, we recommend using RSAKey for asymmetric encryption, - known for its strong security features. - - Note: - - The function requires the 'OIDC_RS_PRIVATE_KEY_STR' setting to be configured. - - The 'OIDC_RS_ENCRYPTION_KEY_TYPE' and 'OIDC_RS_ENCRYPTION_ALGO' settings can be customized - based on the chosen key type. - - Raises: - ImproperlyConfigured: If the private key setting is missing, empty, or incorrect. - - Returns: - joserfc.jwk.JWK: The imported private key as a JWK object. - """ - - private_key_str = getattr(settings, "OIDC_RS_PRIVATE_KEY_STR", None) - if not private_key_str: - raise ImproperlyConfigured( - "OIDC_RS_PRIVATE_KEY_STR setting is missing or empty." - ) - - private_key_pem = private_key_str.encode() - - try: - private_key = JWKRegistry.import_key( - private_key_pem, - key_type=settings.OIDC_RS_ENCRYPTION_KEY_TYPE, - parameters={"alg": settings.OIDC_RS_ENCRYPTION_ALGO, "use": "enc"}, - ) - except ValueError as err: - raise ImproperlyConfigured("OIDC_RS_PRIVATE_KEY_STR setting is wrong.") from err - - return private_key diff --git a/src/backend/core/resource_server/views.py b/src/backend/core/resource_server/views.py deleted file mode 100644 index 244afd9..0000000 --- a/src/backend/core/resource_server/views.py +++ /dev/null @@ -1,40 +0,0 @@ -"""Resource Server views""" - -from django.core.exceptions import ImproperlyConfigured - -from joserfc.jwk import KeySet -from rest_framework.response import Response -from rest_framework.views import APIView - -from . import utils - - -class JWKSView(APIView): - """ - API endpoint for retrieving a JSON Web Keys Set (JWKS). - - Returns: - Response: JSON response containing the JWKS data. - """ - - authentication_classes = [] # disable authentication - permission_classes = [] # disable permission - - def get(self, request): - """Handle GET requests to retrieve JSON Web Keys Set (JWKS). - - Returns: - Response: JSON response containing the JWKS data. - """ - - try: - private_key = utils.import_private_key_from_settings() - except (ImproperlyConfigured, ValueError) as err: - return Response({"error": str(err)}, status=500) - - try: - jwk = KeySet([private_key]).as_dict(private=False) - except (TypeError, ValueError, AttributeError): - return Response({"error": "Could not load key"}, status=500) - - return Response(jwk) diff --git a/src/backend/core/tests/authentication/test_backends.py b/src/backend/core/tests/authentication/test_backends.py index 0699cb1..32b98de 100644 --- a/src/backend/core/tests/authentication/test_backends.py +++ b/src/backend/core/tests/authentication/test_backends.py @@ -160,7 +160,7 @@ def test_authentication_getter_existing_user_via_email( monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked) - with django_assert_num_queries(2): + with django_assert_num_queries(3): # user by email + user by sub + update sub user = klass.get_or_create_user( access_token="test-token", id_token=None, payload=None ) @@ -238,7 +238,7 @@ def test_authentication_getter_new_user_with_email(monkeypatch): assert user.sub == "123" assert user.email == email assert user.name == "John Doe" - assert user.password == "!" + assert user.has_usable_password() is False assert models.User.objects.count() == 1 diff --git a/src/backend/core/tests/authentication/test_urls.py b/src/backend/core/tests/authentication/test_urls.py deleted file mode 100644 index 0e20aac..0000000 --- a/src/backend/core/tests/authentication/test_urls.py +++ /dev/null @@ -1,10 +0,0 @@ -"""Unit tests for the Authentication URLs.""" - -from core.authentication.urls import urlpatterns - - -def test_urls_override_default_mozilla_django_oidc(): - """Custom URL patterns should override default ones from Mozilla Django OIDC.""" - - url_names = [u.name for u in urlpatterns] - assert url_names.index("oidc_logout_custom") < url_names.index("oidc_logout") diff --git a/src/backend/core/tests/authentication/test_views.py b/src/backend/core/tests/authentication/test_views.py deleted file mode 100644 index b06cc8c..0000000 --- a/src/backend/core/tests/authentication/test_views.py +++ /dev/null @@ -1,231 +0,0 @@ -"""Unit tests for the Authentication Views.""" - -from unittest import mock -from urllib.parse import parse_qs, urlparse - -from django.contrib.auth.models import AnonymousUser -from django.contrib.sessions.middleware import SessionMiddleware -from django.core.exceptions import SuspiciousOperation -from django.test import RequestFactory -from django.test.utils import override_settings -from django.urls import reverse -from django.utils import crypto - -import pytest -from rest_framework.test import APIClient - -from core import factories -from core.authentication.views import OIDCLogoutCallbackView, OIDCLogoutView - -pytestmark = pytest.mark.django_db - - -@override_settings(LOGOUT_REDIRECT_URL="/example-logout") -def test_view_logout_anonymous(): - """Anonymous users calling the logout url, - should be redirected to the specified LOGOUT_REDIRECT_URL.""" - - url = reverse("oidc_logout_custom") - response = APIClient().get(url) - - assert response.status_code == 302 - assert response.url == "/example-logout" - - -@mock.patch.object( - OIDCLogoutView, "construct_oidc_logout_url", return_value="/example-logout" -) -def test_view_logout(mocked_oidc_logout_url): - """Authenticated users should be redirected to OIDC provider for logout.""" - - user = factories.UserFactory() - - client = APIClient() - client.force_login(user) - - url = reverse("oidc_logout_custom") - response = client.get(url) - - mocked_oidc_logout_url.assert_called_once() - - assert response.status_code == 302 - assert response.url == "/example-logout" - - -@override_settings(LOGOUT_REDIRECT_URL="/default-redirect-logout") -@mock.patch.object( - OIDCLogoutView, "construct_oidc_logout_url", return_value="/default-redirect-logout" -) -def test_view_logout_no_oidc_provider(mocked_oidc_logout_url): - """Authenticated users should be logged out when no OIDC provider is available.""" - - user = factories.UserFactory() - - client = APIClient() - client.force_login(user) - - url = reverse("oidc_logout_custom") - - with mock.patch("mozilla_django_oidc.views.auth.logout") as mock_logout: - response = client.get(url) - mocked_oidc_logout_url.assert_called_once() - mock_logout.assert_called_once() - - assert response.status_code == 302 - assert response.url == "/default-redirect-logout" - - -@override_settings(LOGOUT_REDIRECT_URL="/example-logout") -def test_view_logout_callback_anonymous(): - """Anonymous users calling the logout callback url, - should be redirected to the specified LOGOUT_REDIRECT_URL.""" - - url = reverse("oidc_logout_callback") - response = APIClient().get(url) - - assert response.status_code == 302 - assert response.url == "/example-logout" - - -@pytest.mark.parametrize( - "initial_oidc_states", - [{}, {"other_state": "foo"}], -) -def test_view_logout_persist_state(initial_oidc_states): - """State value should be persisted in session's data.""" - - user = factories.UserFactory() - - request = RequestFactory().request() - request.user = user - - middleware = SessionMiddleware(get_response=lambda x: x) - middleware.process_request(request) - - if initial_oidc_states: - request.session["oidc_states"] = initial_oidc_states - request.session.save() - - mocked_state = "mock_state" - - OIDCLogoutView().persist_state(request, mocked_state) - - assert "oidc_states" in request.session - assert request.session["oidc_states"] == { - "mock_state": {}, - **initial_oidc_states, - } - - -@override_settings(OIDC_OP_LOGOUT_ENDPOINT="/example-logout") -@mock.patch.object(OIDCLogoutView, "persist_state") -@mock.patch.object(crypto, "get_random_string", return_value="mocked_state") -def test_view_logout_construct_oidc_logout_url( - mocked_get_random_string, mocked_persist_state -): - """Should construct the logout URL to initiate the logout flow with the OIDC provider.""" - - user = factories.UserFactory() - - request = RequestFactory().request() - request.user = user - - middleware = SessionMiddleware(get_response=lambda x: x) - middleware.process_request(request) - - request.session["oidc_id_token"] = "mocked_oidc_id_token" - request.session.save() - - redirect_url = OIDCLogoutView().construct_oidc_logout_url(request) - - mocked_persist_state.assert_called_once() - mocked_get_random_string.assert_called_once() - - params = parse_qs(urlparse(redirect_url).query) - - assert params["id_token_hint"][0] == "mocked_oidc_id_token" - assert params["state"][0] == "mocked_state" - - url = reverse("oidc_logout_callback") - assert url in params["post_logout_redirect_uri"][0] - - -@override_settings(LOGOUT_REDIRECT_URL="/") -def test_view_logout_construct_oidc_logout_url_none_id_token(): - """If no ID token is available in the session, - the user should be redirected to the final URL.""" - - user = factories.UserFactory() - - request = RequestFactory().request() - request.user = user - - middleware = SessionMiddleware(get_response=lambda x: x) - middleware.process_request(request) - - redirect_url = OIDCLogoutView().construct_oidc_logout_url(request) - - assert redirect_url == "/" - - -@pytest.mark.parametrize( - "initial_state", - [None, {"other_state": "foo"}], -) -def test_view_logout_callback_wrong_state(initial_state): - """Should raise an error if OIDC state doesn't match session data.""" - - user = factories.UserFactory() - - request = RequestFactory().request() - request.user = user - - middleware = SessionMiddleware(get_response=lambda x: x) - middleware.process_request(request) - - if initial_state: - request.session["oidc_states"] = initial_state - request.session.save() - - callback_view = OIDCLogoutCallbackView.as_view() - - with pytest.raises(SuspiciousOperation) as excinfo: - callback_view(request) - - assert ( - str(excinfo.value) == "OIDC callback state not found in session `oidc_states`!" - ) - - -@override_settings(LOGOUT_REDIRECT_URL="/example-logout") -def test_view_logout_callback(): - """If state matches, callback should clear OIDC state and redirects.""" - - user = factories.UserFactory() - - request = RequestFactory().get("/logout-callback/", data={"state": "mocked_state"}) - request.user = user - - middleware = SessionMiddleware(get_response=lambda x: x) - middleware.process_request(request) - - mocked_state = "mocked_state" - - request.session["oidc_states"] = {mocked_state: {}} - request.session.save() - - callback_view = OIDCLogoutCallbackView.as_view() - - with mock.patch("mozilla_django_oidc.views.auth.logout") as mock_logout: - - def clear_user(request): - # Assert state is cleared prior to logout - assert request.session["oidc_states"] == {} - request.user = AnonymousUser() - - mock_logout.side_effect = clear_user - response = callback_view(request) - mock_logout.assert_called_once() - - assert response.status_code == 302 - assert response.url == "/example-logout" diff --git a/src/backend/core/tests/resource_server/test_authentication.py b/src/backend/core/tests/resource_server/test_authentication.py index f870564..5a2da00 100644 --- a/src/backend/core/tests/resource_server/test_authentication.py +++ b/src/backend/core/tests/resource_server/test_authentication.py @@ -11,15 +11,15 @@ from joserfc import jwe as jose_jwe from joserfc import jwt as jose_jwt from joserfc.rfc7518.rsa_key import RSAKey from jwt.utils import to_base64url_uint +from lasuite.oidc_resource_server.authentication import ( + ResourceServerAuthentication, + get_resource_server_backend, +) from rest_framework.request import Request as DRFRequest from rest_framework.status import HTTP_200_OK, HTTP_401_UNAUTHORIZED from core.factories import UserFactory from core.models import ServiceProvider -from core.resource_server.authentication import ( - ResourceServerAuthentication, - get_resource_server_backend, -) pytestmark = pytest.mark.django_db @@ -30,7 +30,7 @@ def jwt_resource_server_backend_fixture(settings): _original_backend = str(settings.OIDC_RS_BACKEND_CLASS) settings.OIDC_RS_BACKEND_CLASS = ( - "core.resource_server.backend.JWTResourceServerBackend" + "lasuite.oidc_resource_server.backend.JWTResourceServerBackend" ) get_resource_server_backend.cache_clear() @@ -71,7 +71,7 @@ def test_resource_server_authentication_class(client, settings): """ assert ( settings.OIDC_RS_BACKEND_CLASS - == "core.resource_server.backend.ResourceServerBackend" + == "lasuite.oidc_resource_server.backend.ResourceServerBackend" ) settings.OIDC_RS_CLIENT_ID = "some_client_id" diff --git a/src/backend/core/tests/resource_server/test_backend.py b/src/backend/core/tests/resource_server/test_backend.py deleted file mode 100644 index fc7cdf1..0000000 --- a/src/backend/core/tests/resource_server/test_backend.py +++ /dev/null @@ -1,486 +0,0 @@ -""" -Test for the Resource Server (RS) Backend. -""" - -import json - -# pylint: disable=W0212 -from logging import Logger -from unittest.mock import Mock, patch - -from django.contrib import auth -from django.core.exceptions import SuspiciousOperation -from django.test.utils import override_settings - -import pytest -from joserfc.errors import InvalidClaimError, InvalidTokenError -from joserfc.jwt import JWTClaimsRegistry, Token -from requests.exceptions import HTTPError - -from core.resource_server.backend import JWTResourceServerBackend, ResourceServerBackend - - -@pytest.fixture(name="mock_authorization_server") -def fixture_mock_authorization_server(): - """Mock an Authorization Server client.""" - mock_server = Mock() - mock_server.url = "https://auth.server.com" - return mock_server - - -@pytest.fixture(name="mock_token") -def fixture_mock_token(): - """Mock a token""" - mock_token = Mock() - mock_token.claims = {"sub": "user123", "iss": "https://auth.server.com"} - return mock_token - - -@pytest.fixture(name="resource_server_backend") -def fixture_resource_server_backend(settings, mock_authorization_server): - """Generate a Resource Server backend.""" - - settings.OIDC_RS_CLIENT_ID = "client_id" - settings.OIDC_RS_CLIENT_SECRET = "client_secret" - settings.OIDC_RS_ENCRYPTION_ENCODING = "A256GCM" - settings.OIDC_RS_ENCRYPTION_ALGO = "RSA-OAEP" - settings.OIDC_RS_SIGNING_ALGO = "ES256" - settings.OIDC_RS_SCOPES = ["groups"] - - return ResourceServerBackend(mock_authorization_server) - - -@pytest.fixture(name="jwt_resource_server_backend") -def fixture_jwt_resource_server_backend(settings, mock_authorization_server): - """Generate a Resource Server backend.""" - - settings.OIDC_RS_CLIENT_ID = "client_id" - settings.OIDC_RS_CLIENT_SECRET = "client_secret" - settings.OIDC_RS_SCOPES = ["groups"] - - return JWTResourceServerBackend(mock_authorization_server) - - -@override_settings(OIDC_RS_CLIENT_ID="client_id") -@override_settings(OIDC_RS_CLIENT_SECRET="client_secret") -@override_settings(OIDC_RS_ENCRYPTION_ENCODING="A256GCM") -@override_settings(OIDC_RS_ENCRYPTION_ALGO="RSA-OAEP") -@override_settings(OIDC_RS_SIGNING_ALGO="RS256") -@override_settings(OIDC_RS_SCOPES=["scopes"]) -@patch.object(auth, "get_user_model", return_value="foo") -def test_backend_initialization(mock_get_user_model, mock_authorization_server): - """Test the ResourceServerBackend initialization.""" - - backend = ResourceServerBackend(mock_authorization_server) - - mock_get_user_model.assert_called_once() - assert backend.UserModel == "foo" - - assert backend._client_id == "client_id" - assert backend._client_secret == "client_secret" - assert backend._encryption_encoding == "A256GCM" - assert backend._encryption_algorithm == "RSA-OAEP" - assert backend._signing_algorithm == "RS256" - assert backend._scopes == ["scopes"] - - assert backend._authorization_server_client == mock_authorization_server - assert isinstance(backend._introspection_claims_registry, JWTClaimsRegistry) - - assert backend._introspection_claims_registry.options == { - "active": {"essential": True}, - "client_id": {"essential": True}, - "iss": {"essential": True, "value": "https://auth.server.com"}, - "scope": {"essential": True}, - } - - -@patch.object(ResourceServerBackend, "get_user", return_value="user") -def test_get_or_create_user(mock_get_user, resource_server_backend): - """Test 'get_or_create_user' method.""" - - access_token = "access_token" - res = resource_server_backend.get_or_create_user(access_token, None, None) - - mock_get_user.assert_called_once_with(access_token) - assert res == "user" - - -def test_verify_claims_success(resource_server_backend, mock_token): - """Test '_verify_claims' method with a successful response.""" - - with patch.object( - resource_server_backend._introspection_claims_registry, "validate" - ) as mock_validate: - resource_server_backend._verify_claims(mock_token) - mock_validate.assert_called_once_with(mock_token.claims) - - -def test_verify_claims_invalid_claim_error(resource_server_backend, mock_token): - """Test '_verify_claims' method with an invalid claim error.""" - - with patch.object( - resource_server_backend._introspection_claims_registry, "validate" - ) as mock_validate: - mock_validate.side_effect = InvalidClaimError("claim_name") - - expected_message = "Failed to validate token's claims" - with patch.object(Logger, "debug") as mock_logger_debug: - with pytest.raises(SuspiciousOperation, match=expected_message): - resource_server_backend._verify_claims(mock_token) - mock_logger_debug.assert_called_once_with( - "%s. Exception:", expected_message, exc_info=True - ) - - -def test_verify_claims_invalid_token_error(resource_server_backend, mock_token): - """Test '_verify_claims' method with an invalid token error.""" - - with patch.object( - resource_server_backend._introspection_claims_registry, "validate" - ) as mock_validate: - mock_validate.side_effect = InvalidTokenError - - expected_message = "Failed to validate token's claims" - with patch.object(Logger, "debug") as mock_logger_debug: - with pytest.raises(SuspiciousOperation, match=expected_message): - resource_server_backend._verify_claims(mock_token) - mock_logger_debug.assert_called_once_with( - "%s. Exception:", expected_message, exc_info=True - ) - - -def test_decode_success(resource_server_backend): - """Test '_decode' method with a successful response.""" - - encoded_token = "valid_encoded_token" - public_key_set = Mock() - - expected_decoded_token = {"sub": "user123"} - - with patch( - "joserfc.jwt.decode", return_value=expected_decoded_token - ) as mock_decode: - decoded_token = resource_server_backend._decode(encoded_token, public_key_set) - - mock_decode.assert_called_once_with( - "valid_encoded_token", public_key_set, algorithms=["ES256"] - ) - - assert decoded_token == expected_decoded_token - - -def test_decode_failure(resource_server_backend): - """Test '_decode' method with a ValueError""" - encoded_token = "invalid_encoded_token" - public_key_set = Mock() - - with patch("joserfc.jwt.decode", side_effect=ValueError): - with patch.object(Logger, "debug") as mock_logger_debug: - with pytest.raises(SuspiciousOperation, match="Token decoding failed"): - resource_server_backend._decode(encoded_token, public_key_set) - - mock_logger_debug.assert_called_once_with( - "%s. Exception:", "Token decoding failed", exc_info=True - ) - - -def test_decrypt_success(resource_server_backend): - """Test '_decrypt' method with a successful response.""" - encrypted_token = "valid_encrypted_token" - private_key = "private_key" - - expected_decrypted_token = Mock() - expected_decrypted_token.plaintext = "blah" - - with patch( - "joserfc.jwe.decrypt_compact", return_value=expected_decrypted_token - ) as mock_decrypt: - decrypted_token = resource_server_backend._decrypt(encrypted_token, private_key) - mock_decrypt.assert_called_once_with( - encrypted_token, private_key, algorithms=["RSA-OAEP", "A256GCM"] - ) - - assert decrypted_token == "blah" - - -def test_decrypt_failure(resource_server_backend): - """Test '_decrypt' method with an Exception.""" - encrypted_token = "invalid_encrypted_token" - private_key = "private_key" - - with patch( - "joserfc.jwe.decrypt_compact", side_effect=Exception("Decryption error") - ): - expected_message = "Token decryption failed" - with patch.object(Logger, "debug") as mock_logger_debug: - with pytest.raises(SuspiciousOperation, match=expected_message): - resource_server_backend._decrypt(encrypted_token, private_key) - mock_logger_debug.assert_called_once_with( - "%s. Exception:", expected_message, exc_info=True - ) - - -def test_resource_server_backend_introspect_success(resource_server_backend): - """Test '_introspect' method with a successful response.""" - token = "valid_token" - json_data = {"sub": "user123"} - - resource_server_backend._authorization_server_client.get_introspection = Mock( - return_value=json.dumps(json_data) - ) - - result = resource_server_backend._introspect(token) - - assert result.claims == json_data - resource_server_backend._authorization_server_client.get_introspection.assert_called_once_with( - "client_id", "client_secret", token - ) - - -@patch( - "core.resource_server.utils.import_private_key_from_settings", - return_value="private_key", -) -# pylint: disable=unused-argument -def test_jwt_resource_server_backend_introspect_success( - mock_import_private_key_from_settings, jwt_resource_server_backend -): - """Test '_introspect' method with a successful response.""" - jwt_rs_backend = jwt_resource_server_backend # prevent line too long - - token = "valid_token" - jwe = "valid_jwe" - jws = "valid_jws" - jwt = { - "aud": "client_id", - "iss": "https://auth.server.com", - "token_introspection": { - "sub": "user123", - "aud": "client_id", - "iss": "https://auth.server.com", - }, - } - - jwt_rs_backend._authorization_server_client.get_introspection = Mock( - return_value=jwe - ) - jwt_rs_backend._decrypt = Mock(return_value=jws) - jwt_rs_backend._authorization_server_client.import_public_keys = Mock( - return_value="public_key_set" - ) - jwt_rs_backend._decode = Mock(return_value=Token({}, jwt)) - - result = jwt_rs_backend._introspect(token) - - assert result.claims == { - "sub": "user123", - "aud": "client_id", - "iss": "https://auth.server.com", - } - - jwt_rs_backend._authorization_server_client.get_introspection.assert_called_once_with( - "client_id", "client_secret", token - ) - jwt_rs_backend._decrypt.assert_called_once_with(jwe, private_key="private_key") - jwt_rs_backend._authorization_server_client.import_public_keys.assert_called_once() - jwt_rs_backend._decode.assert_called_once_with(jws, "public_key_set") - - -def test_introspect_introspection_failure(resource_server_backend): - """Test '_introspect' method when introspection to the AS fails.""" - token = "invalid_token" - resource_server_backend._authorization_server_client.get_introspection.side_effect = HTTPError( - "Introspection error" - ) - - with patch.object(Logger, "debug") as mock_logger_debug: - expected_message = "Could not fetch introspection" - with pytest.raises(SuspiciousOperation, match=expected_message): - resource_server_backend._introspect(token) - - mock_logger_debug.assert_called_once_with( - "%s. Exception:", expected_message, exc_info=True - ) - - -@patch( - "core.resource_server.utils.import_private_key_from_settings", - return_value="private_key", -) -# pylint: disable=unused-argument -def test_jwt_resource_server_backend_introspect_public_key_import_failure( - mock_import_private_key_from_settings, jwt_resource_server_backend -): - """Test '_introspect' method when fetching AS's jwks fails.""" - token = "valid_token" - jwe = "valid_jwe" - jws = "valid_jws" - - jwt_resource_server_backend._authorization_server_client.get_introspection = Mock( - return_value=jwe - ) - jwt_resource_server_backend._decrypt = Mock(return_value=jws) - - ( - jwt_resource_server_backend._authorization_server_client.import_public_keys.side_effect - ) = HTTPError("Public key error") - - with patch.object(Logger, "debug") as mock_logger_debug: - expected_message = "Could get authorization server JWKS" - with pytest.raises(SuspiciousOperation, match=expected_message): - jwt_resource_server_backend._introspect(token) - - mock_logger_debug.assert_called_once_with( - "%s. Exception:", expected_message, exc_info=True - ) - - -def test_verify_user_info_success(resource_server_backend, settings): - """Test '_verify_user_info' with a successful response.""" - # test default OIDC_RS_AUDIENCE_CLAIM = client_id - introspection_response = {"active": True, "scope": "groups", "client_id": "123"} - result = resource_server_backend._verify_user_info(introspection_response) - assert result == introspection_response - - # test OIDC_RS_AUDIENCE_CLAIM = aud is taken into account - settings.OIDC_RS_AUDIENCE_CLAIM = "aud" - introspection_response = {"active": True, "scope": "groups", "aud": "123"} - result = resource_server_backend._verify_user_info(introspection_response) - assert result == introspection_response - - -def test_verify_user_info_inactive(resource_server_backend): - """Test '_verify_user_info' with an inactive introspection response.""" - - introspection_response = {"active": False, "scope": "groups"} - - expected_message = "Introspection response is not active." - with patch.object(Logger, "debug") as mock_logger_debug: - with pytest.raises(SuspiciousOperation, match=expected_message): - resource_server_backend._verify_user_info(introspection_response) - - mock_logger_debug.assert_called_once_with(expected_message) - - -def test_verify_user_info_wrong_scopes(resource_server_backend): - """Test '_verify_user_info' with wrong requested scopes.""" - - introspection_response = {"active": True, "scope": "wrong-scopes"} - - expected_message = "Introspection response contains any required scopes." - with patch.object(Logger, "debug") as mock_logger_debug: - with pytest.raises(SuspiciousOperation, match=expected_message): - resource_server_backend._verify_user_info(introspection_response) - - mock_logger_debug.assert_called_once_with(expected_message) - - -def test_resource_server_backend_get_user_success(resource_server_backend): - """Test '_get_user' with a successful response.""" - - access_token = "valid_access_token" - mock_jwt = Mock() - mock_claims = {"sub": "user123", "client_id": "123"} - mock_user = Mock() - - resource_server_backend._introspect = Mock(return_value=mock_jwt) - resource_server_backend._verify_claims = Mock(return_value=mock_claims) - resource_server_backend._verify_user_info = Mock(return_value=mock_claims) - resource_server_backend.UserModel.objects.get = Mock(return_value=mock_user) - - user = resource_server_backend.get_user(access_token) - - assert user == mock_user - resource_server_backend._introspect.assert_called_once_with(access_token) - resource_server_backend._verify_claims.assert_called_once_with(mock_jwt) - resource_server_backend._verify_user_info.assert_called_once_with(mock_claims) - resource_server_backend.UserModel.objects.get.assert_called_once_with(sub="user123") - - -def test_get_user_could_not_introspect(resource_server_backend): - """Test '_get_user' with introspection failing.""" - - access_token = "valid_access_token" - - resource_server_backend._introspect = Mock( - side_effect=SuspiciousOperation("Invalid jwt") - ) - resource_server_backend._verify_claims = Mock() - resource_server_backend._verify_user_info = Mock() - - with pytest.raises(SuspiciousOperation, match="Invalid jwt"): - resource_server_backend.get_user(access_token) - - resource_server_backend._introspect.assert_called_once_with(access_token) - resource_server_backend._verify_claims.assert_not_called() - resource_server_backend._verify_user_info.assert_not_called() - - -def test_get_user_invalid_introspection_response(resource_server_backend): - """Test '_get_user' with an invalid introspection response.""" - - access_token = "valid_access_token" - mock_jwt = Mock() - - resource_server_backend._introspect = Mock(return_value=mock_jwt) - resource_server_backend._verify_claims = Mock( - side_effect=SuspiciousOperation("Invalid claims") - ) - resource_server_backend._verify_user_info = Mock() - - with pytest.raises(SuspiciousOperation, match="Invalid claims"): - resource_server_backend.get_user(access_token) - - resource_server_backend._introspect.assert_called_once_with(access_token) - resource_server_backend._verify_claims.assert_called_once_with(mock_jwt) - resource_server_backend._verify_user_info.assert_not_called() - - -def test_resource_server_backend_get_user_user_not_found(resource_server_backend): - """Test '_get_user' if the user is not found.""" - - access_token = "valid_access_token" - mock_jwt = Mock() - mock_claims = {"sub": "user123"} - - resource_server_backend._introspect = Mock(return_value=mock_jwt) - resource_server_backend._verify_claims = Mock(return_value=mock_claims) - resource_server_backend._verify_user_info = Mock(return_value=mock_claims) - resource_server_backend.UserModel.objects.get = Mock( - side_effect=resource_server_backend.UserModel.DoesNotExist - ) - - with patch.object(Logger, "debug") as mock_logger_debug: - user = resource_server_backend.get_user(access_token) - assert user is None - resource_server_backend._introspect.assert_called_once_with(access_token) - resource_server_backend._verify_claims.assert_called_once_with(mock_jwt) - resource_server_backend._verify_user_info.assert_called_once_with(mock_claims) - resource_server_backend.UserModel.objects.get.assert_called_once_with( - sub="user123" - ) - - mock_logger_debug.assert_called_once_with( - "Login failed: No user with %s found", "user123" - ) - - -def test_get_user_no_user_identification(resource_server_backend): - """Test '_get_user' if the response miss a user identification.""" - - access_token = "valid_access_token" - mock_jwt = Mock() - mock_claims = {"token_introspection": {}} - - resource_server_backend._introspect = Mock(return_value=mock_jwt) - resource_server_backend._verify_claims = Mock(return_value=mock_claims) - resource_server_backend._verify_user_info = Mock( - return_value=mock_claims["token_introspection"] - ) - - expected_message = "User info contained no recognizable user identification" - with patch.object(Logger, "debug") as mock_logger_debug: - with pytest.raises(SuspiciousOperation, match=expected_message): - resource_server_backend.get_user(access_token) - - mock_logger_debug.assert_called_once_with(expected_message) diff --git a/src/backend/core/tests/resource_server/test_clients.py b/src/backend/core/tests/resource_server/test_clients.py deleted file mode 100644 index aa59178..0000000 --- a/src/backend/core/tests/resource_server/test_clients.py +++ /dev/null @@ -1,187 +0,0 @@ -""" -Test for the Resource Server (RS) clients classes. -""" - -# pylint: disable=W0212 - -from unittest.mock import MagicMock, patch - -import pytest -from joserfc.jwk import KeySet, RSAKey -from requests.exceptions import HTTPError - -from core.resource_server.clients import AuthorizationServerClient - - -@pytest.fixture(name="client") -def fixture_client(): - """Generate an Authorization Server client.""" - return AuthorizationServerClient( - url="https://auth.example.com/api/v2", - url_jwks="https://auth.example.com/api/v2/jwks", - url_introspection="https://auth.example.com/api/v2/introspect", - verify_ssl=True, - timeout=5, - proxy=None, - ) - - -def test_authorization_server_client_initialization(): - """Test the AuthorizationServerClient initialization.""" - - new_client = AuthorizationServerClient( - url="https://auth.example.com/api/v2", - url_jwks="https://auth.example.com/api/v2/jwks", - url_introspection="https://auth.example.com/api/v2/checktoken/foo", - verify_ssl=True, - timeout=5, - proxy=None, - ) - - assert new_client.url == "https://auth.example.com/api/v2" - assert ( - new_client._url_introspection - == "https://auth.example.com/api/v2/checktoken/foo" - ) - assert new_client._url_jwks == "https://auth.example.com/api/v2/jwks" - assert new_client._verify_ssl is True - assert new_client._timeout == 5 - assert new_client._proxy is None - - -def test_introspection_headers(client): - """Test the introspection headers to ensure they match the expected values.""" - assert client._introspection_headers == { - "Content-Type": "application/x-www-form-urlencoded", - "Accept": "application/token-introspection+jwt", - } - - -@patch("requests.post") -def test_get_introspection_success(mock_post, client): - """Test 'get_introspection' method with a successful response.""" - - mock_response = MagicMock() - mock_response.raise_for_status.return_value = None - mock_response.text = "introspection response" - mock_post.return_value = mock_response - - result = client.get_introspection("client_id", "client_secret", "token") - assert result == "introspection response" - - mock_post.assert_called_once_with( - "https://auth.example.com/api/v2/introspect", - data={ - "client_id": "client_id", - "client_secret": "client_secret", - "token": "token", - }, - headers={ - "Content-Type": "application/x-www-form-urlencoded", - "Accept": "application/token-introspection+jwt", - }, - verify=True, - timeout=5, - proxies=None, - ) - - -@patch("requests.post", side_effect=HTTPError()) -# pylint: disable=(unused-argument -def test_get_introspection_error(mock_post, client): - """Test 'get_introspection' method with an HTTPError.""" - with pytest.raises(HTTPError): - client.get_introspection("client_id", "client_secret", "token") - - -@patch("requests.get") -def test_get_jwks_success(mock_get, client): - """Test 'get_jwks' method with a successful response.""" - - mock_response = MagicMock() - mock_response.raise_for_status.return_value = None - mock_response.json.return_value = {"jwks": "foo"} - mock_get.return_value = mock_response - - result = client.get_jwks() - assert result == {"jwks": "foo"} - - mock_get.assert_called_once_with( - "https://auth.example.com/api/v2/jwks", - verify=client._verify_ssl, - timeout=client._timeout, - proxies=client._proxy, - ) - - -@patch("requests.get") -def test_get_jwks_error(mock_get, client): - """Test 'get_jwks' method with an HTTPError.""" - - mock_response = MagicMock() - mock_response.raise_for_status.side_effect = HTTPError( - response=MagicMock(status=500) - ) - mock_get.return_value = mock_response - - with pytest.raises(HTTPError): - client.get_jwks() - - -@patch("requests.get") -def test_import_public_keys_valid(mock_get, client): - """Test 'import_public_keys' method with a successful response.""" - - mocked_key = RSAKey.generate_key(2048) - - mock_response = MagicMock() - mock_response.raise_for_status.return_value = None - mock_response.json.return_value = {"keys": [mocked_key.as_dict()]} - mock_get.return_value = mock_response - - response = client.import_public_keys() - - assert isinstance(response, KeySet) - assert response.as_dict() == KeySet([mocked_key]).as_dict() - - -@patch("requests.get") -def test_import_public_keys_http_error(mock_get, client): - """Test 'import_public_keys' method with an HTTPError.""" - - mock_response = MagicMock() - mock_response.raise_for_status.side_effect = HTTPError( - response=MagicMock(status=500) - ) - mock_get.return_value = mock_response - - with pytest.raises(HTTPError): - client.import_public_keys() - - -@patch("requests.get") -def test_import_public_keys_empty_jwks(mock_get, client): - """Test 'import_public_keys' method with empty keys response.""" - - mock_response = MagicMock() - mock_response.raise_for_status.return_value = None - mock_response.json.return_value = {"keys": []} - mock_get.return_value = mock_response - - response = client.import_public_keys() - - assert isinstance(response, KeySet) - assert response.as_dict() == {"keys": []} - - -@patch("requests.get") -def test_import_public_keys_invalid_jwks(mock_get, client): - """Test 'import_public_keys' method with invalid keys response.""" - - mock_response = MagicMock() - mock_response.raise_for_status.return_value = None - mock_response.json.return_value = {"keys": [{"foo": "foo"}]} - mock_get.return_value = mock_response - - with pytest.raises(ValueError): - client.import_public_keys() diff --git a/src/backend/core/tests/resource_server/test_utils.py b/src/backend/core/tests/resource_server/test_utils.py deleted file mode 100644 index c251072..0000000 --- a/src/backend/core/tests/resource_server/test_utils.py +++ /dev/null @@ -1,88 +0,0 @@ -""" -Test for the Resource Server (RS) utils functions. -""" - -from django.core.exceptions import ImproperlyConfigured -from django.test.utils import override_settings - -import pytest -from joserfc.jwk import ECKey, RSAKey - -from core.resource_server.utils import import_private_key_from_settings - -PRIVATE_KEY_STR_MOCKED = """-----BEGIN PRIVATE KEY----- -MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC3boG1kwEGUYL+ -U58RPrVToIsF9jHB64S6WJIIInPmAclBciXFb6BWG11mbRIgo8ha3WVnC/tGHbXb -ndiKdrH2vKHOsDhV9AmgHgNgWaUK9L0uuKEb/xMLePYWsYlgzcQJx8RZY7RQyWqE -20WfzFxeuCE7QMb6VXSOgwQMnJsKocguIh3VCI9RIBq3B1kdgW35AD63YKOygmGx -qjcWwbjhKLvkF7LpBdlyAEzOKqg4T5uCcHMfksMW2+foTJx70RrZM/KHU+Zysuw7 -uhhVsgPBG+CsqBSjHQhs7jzymqxtQAfe1FkrCRxOq5Pv2Efr7kgtVSkJJiX3KutM -vnWuEypxAgMBAAECggEAGqKS9pbrN+vnmb7yMsqYgVVnQn0aggZNHlLkl4ZLLnuV -aemlhur7zO0JzajqUC+AFQOfaQxiFu8S/FoJ+qccFdATrcPEVmTKbgPVqSyzLKlX -fByGll5eOVT95NMwN8yBGgt2HSW/ZditXS/KxxahVgamGqjAC9MTSutGz/8Ae1U+ -DNDBJCc6RAqu3T02tV9A2pSpVC1rSktDMpLUTscnsfxpaEQATd9DJUcHEvIwoX8q -GJpycPEhNhdPXqpln5SoMHcf/zS5ssF/Mce0lJJXYyE0LnEk9X12jMWyBqmLqXUY -cKLyynaFbis0DpQppwKx2y8GpL76k+Ci4dOHIvFknQKBgQDj/2WRMcWOvfBrggzj -FHpcme2gSo5A5c0CVyI+Xkf1Zab6UR6T7GiImEoj9tq0+o2WEix9rwoypgMBq8rz -/rrJAPSZjgv6z71k4EnO2FIB5R03vQmoBRCN8VlgvLM0xv52zyjV4Wx66Q4MDjyH -EgkpHyB0FzRZh0UzhnE/pYSetQKBgQDN9eLB1nA4CBSr1vMGNfQyfBQl3vpO9EP4 -VSS3KnUqCIjJeLu682Ylu7SFxcJAfzUpy5S43hEvcuJsagsVKfmCAGcYZs9/xq3I -vzYyhaEOS5ezNxLSh4+yCNBPlmrmDyoazag0t8H8YQFBN6BVcxbATHqdWGUhIhYN -eEpEMOh2TQKBgGBr7kRNTENlyHtu8IxIaMcowfn8DdUcWmsW9oBx1vTNHKTYEZp1 -bG/4F8LF7xCCtcY1wWMV17Y7xyG5yYcOv2eqY8dc72wO1wYGZLB5g5URlB2ycJcC -LVIaM7ZZl2BGl+8fBSIOx5XjYfFvQ+HLmtwtMchm19jVAEseHF7SXRfRAoGAK15j -aT2mU6Yf9C9G7T/fM+I8u9zACHAW/+ut14PxN/CkHQh3P16RW9CyqpiB1uLyZuKf -Zm4cYElotDuAKey0xVMgYlsDxnwni+X3m5vX1hLE1s/5/qrc7zg75QZfbCI1U3+K -s88d4e7rPLhh4pxhZgy0pP1ADkIHMr7ppIJH8OECgYEApNfbgsJVPAMzucUhJoJZ -OmZHbyCtJvs4b+zxnmhmSbopifNCgS4zjXH9qC7tsUph1WE6L2KXvtApHGD5H4GQ -IH5em4M/pHIcsqCi1qggBMbdvzHBUtC3R4sK0CpEFHlN+Y59aGazidcN2FPupNJv -MbyqKyC6DAzv4jEEhHaN7oY= ------END PRIVATE KEY----- -""" - - -@override_settings(OIDC_RS_PRIVATE_KEY_STR=PRIVATE_KEY_STR_MOCKED) -@pytest.mark.parametrize("mocked_private_key", [None, ""]) -def test_import_private_key_from_settings_missing_or_empty_key( - settings, mocked_private_key -): - """Should raise an exception if the settings 'OIDC_RS_PRIVATE_KEY_STR' is missing or empty.""" - settings.OIDC_RS_PRIVATE_KEY_STR = mocked_private_key - - with pytest.raises( - ImproperlyConfigured, - match="OIDC_RS_PRIVATE_KEY_STR setting is missing or empty.", - ): - import_private_key_from_settings() - - -@pytest.mark.parametrize("mocked_private_key", ["123", "foo", "invalid_key"]) -@override_settings(OIDC_RS_PRIVATE_KEY_STR=PRIVATE_KEY_STR_MOCKED) -@override_settings(OIDC_RS_ENCRYPTION_KEY_TYPE="RSA") -@override_settings(OIDC_RS_ENCRYPTION_ALGO="RS256") -def test_import_private_key_from_settings_incorrect_key(settings, mocked_private_key): - """Should raise an exception if the setting 'OIDC_RS_PRIVATE_KEY_STR' has an incorrect value.""" - settings.OIDC_RS_PRIVATE_KEY_STR = mocked_private_key - - with pytest.raises( - ImproperlyConfigured, match="OIDC_RS_PRIVATE_KEY_STR setting is wrong." - ): - import_private_key_from_settings() - - -@override_settings(OIDC_RS_PRIVATE_KEY_STR=PRIVATE_KEY_STR_MOCKED) -@override_settings(OIDC_RS_ENCRYPTION_KEY_TYPE="RSA") -@override_settings(OIDC_RS_ENCRYPTION_ALGO="RS256") -def test_import_private_key_from_settings_success_rsa_key(): - """Should import private key string as an RSA key.""" - private_key = import_private_key_from_settings() - assert isinstance(private_key, RSAKey) - - -@override_settings(OIDC_RS_PRIVATE_KEY_STR=PRIVATE_KEY_STR_MOCKED) -@override_settings(OIDC_RS_ENCRYPTION_KEY_TYPE="EC") -@override_settings(OIDC_RS_ENCRYPTION_ALGO="ES256") -def test_import_private_key_from_settings_success_ec_key(): - """Should import private key string as an EC key.""" - private_key = import_private_key_from_settings() - assert isinstance(private_key, ECKey) diff --git a/src/backend/core/tests/resource_server/test_views.py b/src/backend/core/tests/resource_server/test_views.py deleted file mode 100644 index 505bab2..0000000 --- a/src/backend/core/tests/resource_server/test_views.py +++ /dev/null @@ -1,70 +0,0 @@ -""" -Tests for the Resource Server (RS) Views. -""" - -from unittest import mock - -from django.core.exceptions import ImproperlyConfigured -from django.urls import reverse - -import pytest -from joserfc.jwk import RSAKey -from rest_framework.test import APIClient - -pytestmark = pytest.mark.django_db - - -@mock.patch("core.resource_server.utils.import_private_key_from_settings") -def test_view_jwks_valid_public_key(mock_import_private_key_from_settings): - """JWKs endpoint should return a set of valid Json Web Key""" - - mocked_key = RSAKey.generate_key(2048) - mock_import_private_key_from_settings.return_value = mocked_key - - url = reverse("resource_server_jwks") - response = APIClient().get(url) - - mock_import_private_key_from_settings.assert_called_once() - - assert response.status_code == 200 - assert response["Content-Type"] == "application/json" - - jwks = response.json() - assert jwks == {"keys": [mocked_key.as_dict(private=False)]} - - # Security checks to make sure no details from the private key are exposed - private_details = ["d", "p", "q", "dp", "dq", "qi", "oth", "r", "t"] - assert all( - private_detail not in jwks["keys"][0].keys() - for private_detail in private_details - ) - - -@mock.patch("core.resource_server.utils.import_private_key_from_settings") -def test_view_jwks_invalid_private_key(mock_import_private_key_from_settings): - """JWKS endpoint should return a proper exception when loading keys fails.""" - - mock_import_private_key_from_settings.return_value = "wrong_key" - - url = reverse("resource_server_jwks") - response = APIClient().get(url) - - mock_import_private_key_from_settings.assert_called_once() - - assert response.status_code == 500 - assert response.json() == {"error": "Could not load key"} - - -@mock.patch("core.resource_server.utils.import_private_key_from_settings") -def test_view_jwks_missing_private_key(mock_import_private_key_from_settings): - """JWKS endpoint should return a proper exception when private key is missing.""" - - mock_import_private_key_from_settings.side_effect = ImproperlyConfigured("foo.") - - url = reverse("resource_server_jwks") - response = APIClient().get(url) - - mock_import_private_key_from_settings.assert_called_once() - - assert response.status_code == 500 - assert response.json() == {"error": "foo."} diff --git a/src/backend/core/tests/resource_server_api/conftest.py b/src/backend/core/tests/resource_server_api/conftest.py index b0eaaee..0803b4d 100644 --- a/src/backend/core/tests/resource_server_api/conftest.py +++ b/src/backend/core/tests/resource_server_api/conftest.py @@ -9,8 +9,7 @@ from django.contrib.auth import get_user_model import pytest import responses from faker import Faker - -from core.resource_server.authentication import ResourceServerAuthentication +from lasuite.oidc_resource_server.authentication import ResourceServerAuthentication User = get_user_model() fake = Faker() @@ -48,7 +47,7 @@ def _force_login_via_resource_server( ): client_fixture.force_login( user, - backend="core.resource_server.authentication.ResourceServerAuthentication", + backend="lasuite.oidc_resource_server.authentication.ResourceServerAuthentication", ) yield diff --git a/src/backend/people/api_urls.py b/src/backend/people/api_urls.py index ee8927a..d21eda2 100644 --- a/src/backend/people/api_urls.py +++ b/src/backend/people/api_urls.py @@ -3,11 +3,11 @@ from django.conf import settings from django.urls import include, path, re_path +from lasuite.oidc_login.urls import urlpatterns as oidc_urls +from lasuite.oidc_resource_server.urls import urlpatterns as resource_server_urls from rest_framework.routers import DefaultRouter from core.api.client import viewsets -from core.authentication.urls import urlpatterns as oidc_urls -from core.resource_server.urls import urlpatterns as resource_server_urls from mailbox_oauth2.urls import urlpatterns as mailbox_oauth2_urls diff --git a/src/backend/people/resource_server_urls.py b/src/backend/people/resource_server_urls.py index 67a9987..fc67c64 100644 --- a/src/backend/people/resource_server_urls.py +++ b/src/backend/people/resource_server_urls.py @@ -2,10 +2,10 @@ from django.urls import include, path +from lasuite.oidc_resource_server.urls import urlpatterns as resource_server_urls from rest_framework.routers import DefaultRouter from core.api.resource_server import viewsets -from core.resource_server.urls import urlpatterns as resource_server_urls # - Main endpoints # Contacts will be added later diff --git a/src/backend/people/settings.py b/src/backend/people/settings.py index a3affcb..38d49f1 100755 --- a/src/backend/people/settings.py +++ b/src/backend/people/settings.py @@ -474,9 +474,9 @@ class Base(Configuration): environ_prefix=None, ) - USER_OIDC_FIELDS_TO_NAME = values.ListValue( + USER_OIDC_FIELDS_TO_FULLNAME = values.ListValue( default=["first_name", "last_name"], - environ_name="USER_OIDC_FIELDS_TO_NAME", + environ_name="USER_OIDC_FIELDS_TO_FULLNAME", environ_prefix=None, ) OIDC_ORGANIZATION_REGISTRATION_ID_FIELD = values.Value( @@ -490,7 +490,7 @@ class Base(Configuration): ) OIDC_OP_URL = values.Value(None, environ_name="OIDC_OP_URL", environ_prefix=None) OIDC_RS_BACKEND_CLASS = values.Value( - "core.resource_server.backend.ResourceServerBackend", + "lasuite.oidc_resource_server.backend.ResourceServerBackend", environ_name="OIDC_RS_BACKEND_CLASS", environ_prefix=None, ) diff --git a/src/backend/pyproject.toml b/src/backend/pyproject.toml index bff546d..7785357 100644 --- a/src/backend/pyproject.toml +++ b/src/backend/pyproject.toml @@ -34,6 +34,7 @@ dependencies = [ "django-configurations==2.5.1", "django-cors-headers==4.7.0", "django-countries==7.6.1", + "django-lasuite==0.0.1", "django-oauth-toolkit==3.0.1", "django-parler==2.3", "django-redis==5.4.0", diff --git a/src/helm/env.d/dev/values.desk.yaml.gotmpl b/src/helm/env.d/dev/values.desk.yaml.gotmpl index 0e29122..f306be4 100644 --- a/src/helm/env.d/dev/values.desk.yaml.gotmpl +++ b/src/helm/env.d/dev/values.desk.yaml.gotmpl @@ -50,7 +50,7 @@ backend: key: OIDC_RP_CLIENT_SECRET OIDC_RP_SIGN_ALGO: RS256 OIDC_RP_SCOPES: "openid email siret given_name usual_name" - USER_OIDC_FIELDS_TO_NAME: "given_name,usual_name" + USER_OIDC_FIELDS_TO_FULLNAME: "given_name,usual_name" OIDC_REDIRECT_ALLOWED_HOSTS: https://desk.127.0.0.1.nip.io OIDC_AUTH_REQUEST_EXTRA_PARAMS: "{'acr_values': 'eidas1'}" OAUTH2_PROVIDER_OIDC_ENABLED: True