(oidc) add simple introspection backend

This provides a configurable OIDC introspection backend to be able to
call introspection endpoints which returns JSON data instead of an
encrypted JWT.

Two backends are currently defined:

 - ResourceServerBackend` which expect a JSON response
 - JWTResourceServerBackend which implements RFC 9701 and expects
   JWE reponse.

There might be other cases (eg: ResourceServerBackend with JWT, JWS or
JWE, etc. but for now we don't use it, so we follow YAGNI).

This also allow to configure the claim to determine the "audience":

 - client_id: for our Keycloak implementation
 - aud: used by ProConnect
This commit is contained in:
Quentin BEY
2025-03-19 16:28:35 +01:00
committed by BEY Quentin
parent b771f614e2
commit 6b2ca88ff2
6 changed files with 320 additions and 88 deletions

View File

@@ -10,6 +10,7 @@ and this project adheres to
### Added
- ✨(oidc) add simple introspection backend #832
- 🧑‍💻(tasks) run management commands #814
## [1.14.1] - 2025-03-17

View File

@@ -3,18 +3,26 @@
import base64
import binascii
import logging
from functools import lru_cache
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.utils.module_loading import import_string
from mozilla_django_oidc.contrib.drf import OIDCAuthentication
from .backend import ResourceServerBackend, ResourceServerImproperlyConfiguredBackend
from .backend import ResourceServerImproperlyConfiguredBackend
from .clients import AuthorizationServerClient
logger = logging.getLogger(__name__)
@lru_cache(maxsize=None)
def get_resource_server_backend():
"""Return the resource server backend class based on the settings."""
return import_string(settings.OIDC_RS_BACKEND_CLASS)
class ResourceServerAuthentication(OIDCAuthentication):
"""Authenticate clients using the token received from the authorization server."""
@@ -31,7 +39,7 @@ class ResourceServerAuthentication(OIDCAuthentication):
url_jwks=settings.OIDC_OP_JWKS_ENDPOINT,
url_introspection=settings.OIDC_OP_INTROSPECTION_ENDPOINT,
)
self.backend = ResourceServerBackend(authorization_server_client)
self.backend = get_resource_server_backend()(authorization_server_client)
except ImproperlyConfigured as err:
message = "Resource Server authentication is disabled"

View File

@@ -1,6 +1,8 @@
"""Resource Server Backend"""
import json
import logging
from json import JSONDecodeError
from django.conf import settings
from django.contrib import auth
@@ -9,6 +11,7 @@ from django.core.exceptions import ImproperlyConfigured, SuspiciousOperation
from joserfc import jwe as jose_jwe
from joserfc import jwt as jose_jwt
from joserfc.errors import InvalidClaimError, InvalidTokenError
from joserfc.jwt import Token
from requests.exceptions import HTTPError
from rest_framework.exceptions import AuthenticationFailed
@@ -50,15 +53,17 @@ class ResourceServerBackend:
or not authorization_server_client
):
raise ImproperlyConfigured(
"Could not instantiate ResourceServerBackend, some parameters are missing."
f"Could not instantiate {self.__class__.__name__}: some parameters are missing.",
)
self._authorization_server_client = authorization_server_client
self._claims_registry = jose_jwt.JWTClaimsRegistry(
self._introspection_claims_registry = jose_jwt.JWTClaimsRegistry(
iss={"essential": True, "value": self._authorization_server_client.url},
aud={"essential": True, "value": self._client_id},
token_introspection={"essential": True},
active={"essential": True},
scope={"essential": True}, # content validated in _verify_user_info
# optional in RFC, but required here: "client_id" or "aud"
**{settings.OIDC_RS_AUDIENCE_CLAIM: {"essential": True}},
)
# Declare the token origin audience: to know where the token comes from
@@ -93,7 +98,7 @@ class ResourceServerBackend:
jwt = self._introspect(access_token)
claims = self._verify_claims(jwt)
user_info = self._verify_user_info(claims["token_introspection"])
user_info = self._verify_user_info(claims)
sub = user_info.get("sub")
if sub is None:
@@ -106,7 +111,7 @@ class ResourceServerBackend:
logger.debug("Login failed: No user with %s found", sub)
return None
self.token_origin_audience = str(user_info["aud"])
self.token_origin_audience = str(user_info[settings.OIDC_RS_AUDIENCE_CLAIM])
return user
@@ -135,7 +140,7 @@ class ResourceServerBackend:
logger.debug(message)
raise SuspiciousOperation(message)
audience = introspection_response.get("aud", None)
audience = introspection_response.get(settings.OIDC_RS_AUDIENCE_CLAIM, None)
if not audience:
raise SuspiciousOperation(
"Introspection response does not provide source audience."
@@ -143,32 +148,42 @@ class ResourceServerBackend:
return introspection_response
def _introspect(self, token):
"""Introspect an access token to the authorization server."""
def _get_introspection(self, access_token):
"""Request introspection of an access token to the authorization server."""
try:
jwe = self._authorization_server_client.get_introspection(
self._client_id,
self._client_secret,
token,
introspection_response = (
self._authorization_server_client.get_introspection(
self._client_id,
self._client_secret,
access_token,
)
)
except HTTPError as err:
message = "Could not fetch introspection"
logger.debug("%s. Exception:", message, exc_info=True)
raise SuspiciousOperation(message) from err
private_key = utils.import_private_key_from_settings()
jws = self._decrypt(jwe, private_key=private_key)
return introspection_response
def _introspect(self, access_token) -> Token:
"""
Introspect an access token to the authorization server.
Not implemented here:
- introspection_str might be a JWT, not a JSON
and therefore should be decoded
- introspection_str might be a JWS, not a JSON
and therefore should be verified (using self._decode)
- introspection_str might be a JWE, not a JSON
and therefore should be decrypted (using self._decrypt)
"""
introspection_str = self._get_introspection(access_token)
try:
public_key_set = self._authorization_server_client.import_public_keys()
except (TypeError, ValueError, AttributeError, HTTPError) as err:
message = "Could get authorization server JWKS"
logger.debug("%s. Exception:", message, exc_info=True)
raise SuspiciousOperation(message) from err
introspection_data = json.loads(introspection_str)
except JSONDecodeError as exc:
raise SuspiciousOperation("Invalid JSON for introspection") from exc
jwt = self._decode(jws, public_key_set)
return jwt
return Token({}, introspection_data)
def _decrypt(self, encrypted_token, private_key):
"""Decrypt the token encrypted by the Authorization Server (AS).
@@ -190,7 +205,7 @@ class ResourceServerBackend:
logger.debug("%s. Exception:", message, exc_info=True)
raise SuspiciousOperation(message) from err
return decrypted_token
return decrypted_token.plaintext
def _decode(self, encoded_token, public_key_set):
"""Decode the token signed by the Authorization Server (AS).
@@ -201,7 +216,7 @@ class ResourceServerBackend:
"""
try:
token = jose_jwt.decode(
encoded_token.plaintext,
encoded_token,
public_key_set,
algorithms=[self._signing_algorithm],
)
@@ -221,7 +236,7 @@ class ResourceServerBackend:
token substitution or misuse of tokens issued for different clients.
"""
try:
self._claims_registry.validate(token.claims)
self._introspection_claims_registry.validate(token.claims)
except (InvalidClaimError, InvalidTokenError) as err:
message = "Failed to validate token's claims"
logger.debug("%s. Exception:", message, exc_info=True)
@@ -230,6 +245,61 @@ class ResourceServerBackend:
return token.claims
class JWTResourceServerBackend(ResourceServerBackend):
"""Backend of an OAuth 2.0 resource server.
Override the classic ResourceServerBackend to support JWT introspection
tokens as described in the RFC https://datatracker.ietf.org/doc/rfc9701/
For this implementation, we expect the introspection response to be
in JWT format, signed and encrypted.
"""
def _introspect(self, access_token):
"""
Introspect an access token to the authorization server.
We expect here the `token_introspection` claim to contain the
JWT information to be verified:
- iss
- aud
- iat
Not implemented here:
- introspection_str might be a JSON, not a JWE
- introspection_str might be a JWS, not a JWE
"""
introspection_str = self._get_introspection(access_token)
private_key = utils.import_private_key_from_settings()
jws = self._decrypt(introspection_str, private_key=private_key)
try:
public_key_set = self._authorization_server_client.import_public_keys()
except (TypeError, ValueError, AttributeError, HTTPError) as err:
message = "Could get authorization server JWKS"
logger.debug("%s. Exception:", message, exc_info=True)
raise SuspiciousOperation(message) from err
jwt = self._decode(jws, public_key_set)
token_registry = jose_jwt.JWTClaimsRegistry(
iss={"essential": True, "value": self._authorization_server_client.url},
aud={"essential": True, "value": self._client_id},
token_introspection={"essential": True},
)
try:
token_registry.validate(jwt.claims)
except (InvalidClaimError, InvalidTokenError) as err:
logger.exception("JWTResourceServerBackend: %s", err)
raise SuspiciousOperation("Failed to validate token's claims") from err
introspection_data = jwt.claims["token_introspection"]
return Token({}, introspection_data)
class ResourceServerImproperlyConfiguredBackend:
"""Fallback backend for improperly configured Resource Servers."""

View File

@@ -16,11 +16,30 @@ from rest_framework.status import HTTP_200_OK, HTTP_401_UNAUTHORIZED
from core.factories import UserFactory
from core.models import ServiceProvider
from core.resource_server.authentication import ResourceServerAuthentication
from core.resource_server.authentication import (
ResourceServerAuthentication,
get_resource_server_backend,
)
pytestmark = pytest.mark.django_db
@pytest.fixture(name="jwt_resource_server_backend")
def jwt_resource_server_backend_fixture(settings):
"""Fixture to switch the backend to the JWTResourceServerBackend."""
_original_backend = str(settings.OIDC_RS_BACKEND_CLASS)
settings.OIDC_RS_BACKEND_CLASS = (
"core.resource_server.backend.JWTResourceServerBackend"
)
get_resource_server_backend.cache_clear()
yield
settings.OIDC_RS_BACKEND_CLASS = _original_backend
get_resource_server_backend.cache_clear()
def build_authorization_bearer(token):
"""
Build an Authorization Bearer header value from a token.
@@ -47,6 +66,88 @@ def test_resource_server_authentication_class(client, settings):
`resource_server_token_audience` attribute which is used in
the resource server views.
This test uses the `/resource-server/v1.0/teams/` URL as an example
because we don't want to create a new URL just for this test.
"""
assert (
settings.OIDC_RS_BACKEND_CLASS
== "core.resource_server.backend.ResourceServerBackend"
)
settings.OIDC_RS_CLIENT_ID = "some_client_id"
settings.OIDC_RS_CLIENT_SECRET = "some_client_secret"
settings.OIDC_OP_URL = "https://oidc.example.com"
settings.OIDC_VERIFY_SSL = False
settings.OIDC_TIMEOUT = 5
settings.OIDC_PROXY = None
settings.OIDC_OP_JWKS_ENDPOINT = "https://oidc.example.com/jwks"
settings.OIDC_OP_INTROSPECTION_ENDPOINT = "https://oidc.example.com/introspect"
responses.add(
responses.POST,
"https://oidc.example.com/introspect",
json={
"iss": "https://oidc.example.com",
"aud": "some_client_id", # settings.OIDC_RS_CLIENT_ID
"sub": "very-specific-sub",
"client_id": "some_service_provider",
"scope": "openid groups",
"active": True,
},
)
# Try to authenticate while the user does not exist => 401
response = client.get(
"/resource-server/v1.0/teams/", # use an exising URL here
format="json",
HTTP_AUTHORIZATION=f"Bearer {build_authorization_bearer('some_token')}",
)
assert response.status_code == HTTP_401_UNAUTHORIZED
assert ServiceProvider.objects.count() == 0
# Create a user with the specific sub, the access is authorized
UserFactory(sub="very-specific-sub")
response = client.get(
"/resource-server/v1.0/teams/", # use an exising URL here
format="json",
HTTP_AUTHORIZATION=f"Bearer {build_authorization_bearer('some_token')}",
)
assert response.status_code == HTTP_200_OK
response_request = response.renderer_context.get("request")
assert isinstance(response_request, DRFRequest)
assert isinstance(
response_request.successful_authenticator, ResourceServerAuthentication
)
# Check that the user is authenticated
assert response_request.user.is_authenticated
# Check the request contains the resource server token audience
assert response_request.resource_server_token_audience == "some_service_provider"
# Check that no service provider is created here
assert ServiceProvider.objects.count() == 0
@responses.activate
def test_jwt_resource_server_authentication_class( # pylint: disable=unused-argument
client, jwt_resource_server_backend, settings
):
"""
Defines the settings for the resource server
for a full authentication with introspection process.
This is an integration test that checks the authentication process
when using the ResourceServerAuthentication class.
This test asserts the DRF request object contains the
`resource_server_token_audience` attribute which is used in
the resource server views.
This test uses the `/resource-server/v1.0/teams/` URL as an example
because we don't want to create a new URL just for this test.
"""
@@ -134,7 +235,8 @@ def test_resource_server_authentication_class(client, settings):
"token_introspection": {
"sub": "very-specific-sub",
"iss": "https://oidc.example.com",
"aud": "some_service_provider",
"aud": "some_client_id",
"client_id": "some_service_provider",
"scope": "openid groups",
"active": True,
},

View File

@@ -2,9 +2,9 @@
Test for the Resource Server (RS) Backend.
"""
import json
# pylint: disable=W0212
from logging import Logger
from unittest.mock import Mock, patch
@@ -14,10 +14,10 @@ from django.test.utils import override_settings
import pytest
from joserfc.errors import InvalidClaimError, InvalidTokenError
from joserfc.jwt import JWTClaimsRegistry
from joserfc.jwt import JWTClaimsRegistry, Token
from requests.exceptions import HTTPError
from core.resource_server.backend import ResourceServerBackend
from core.resource_server.backend import JWTResourceServerBackend, ResourceServerBackend
@pytest.fixture(name="mock_authorization_server")
@@ -50,6 +50,17 @@ def fixture_resource_server_backend(settings, mock_authorization_server):
return ResourceServerBackend(mock_authorization_server)
@pytest.fixture(name="jwt_resource_server_backend")
def fixture_jwt_resource_server_backend(settings, mock_authorization_server):
"""Generate a Resource Server backend."""
settings.OIDC_RS_CLIENT_ID = "client_id"
settings.OIDC_RS_CLIENT_SECRET = "client_secret"
settings.OIDC_RS_SCOPES = ["groups"]
return JWTResourceServerBackend(mock_authorization_server)
@override_settings(OIDC_RS_CLIENT_ID="client_id")
@override_settings(OIDC_RS_CLIENT_SECRET="client_secret")
@override_settings(OIDC_RS_ENCRYPTION_ENCODING="A256GCM")
@@ -73,12 +84,13 @@ def test_backend_initialization(mock_get_user_model, mock_authorization_server):
assert backend._scopes == ["scopes"]
assert backend._authorization_server_client == mock_authorization_server
assert isinstance(backend._claims_registry, JWTClaimsRegistry)
assert isinstance(backend._introspection_claims_registry, JWTClaimsRegistry)
assert backend._claims_registry.options == {
assert backend._introspection_claims_registry.options == {
"active": {"essential": True},
"client_id": {"essential": True},
"iss": {"essential": True, "value": "https://auth.server.com"},
"aud": {"essential": True, "value": "client_id"},
"token_introspection": {"essential": True},
"scope": {"essential": True},
}
@@ -97,7 +109,7 @@ def test_verify_claims_success(resource_server_backend, mock_token):
"""Test '_verify_claims' method with a successful response."""
with patch.object(
resource_server_backend._claims_registry, "validate"
resource_server_backend._introspection_claims_registry, "validate"
) as mock_validate:
resource_server_backend._verify_claims(mock_token)
mock_validate.assert_called_once_with(mock_token.claims)
@@ -107,7 +119,7 @@ def test_verify_claims_invalid_claim_error(resource_server_backend, mock_token):
"""Test '_verify_claims' method with an invalid claim error."""
with patch.object(
resource_server_backend._claims_registry, "validate"
resource_server_backend._introspection_claims_registry, "validate"
) as mock_validate:
mock_validate.side_effect = InvalidClaimError("claim_name")
@@ -124,7 +136,7 @@ def test_verify_claims_invalid_token_error(resource_server_backend, mock_token):
"""Test '_verify_claims' method with an invalid token error."""
with patch.object(
resource_server_backend._claims_registry, "validate"
resource_server_backend._introspection_claims_registry, "validate"
) as mock_validate:
mock_validate.side_effect = InvalidTokenError
@@ -140,8 +152,7 @@ def test_verify_claims_invalid_token_error(resource_server_backend, mock_token):
def test_decode_success(resource_server_backend):
"""Test '_decode' method with a successful response."""
encoded_token = Mock()
encoded_token.plaintext = "valid_encoded_token"
encoded_token = "valid_encoded_token"
public_key_set = Mock()
expected_decoded_token = {"sub": "user123"}
@@ -160,8 +171,7 @@ def test_decode_success(resource_server_backend):
def test_decode_failure(resource_server_backend):
"""Test '_decode' method with a ValueError"""
encoded_token = Mock()
encoded_token.plaintext = "invalid_encoded_token"
encoded_token = "invalid_encoded_token"
public_key_set = Mock()
with patch("joserfc.jwt.decode", side_effect=ValueError):
@@ -179,7 +189,8 @@ def test_decrypt_success(resource_server_backend):
encrypted_token = "valid_encrypted_token"
private_key = "private_key"
expected_decrypted_token = {"sub": "user123"}
expected_decrypted_token = Mock()
expected_decrypted_token.plaintext = "blah"
with patch(
"joserfc.jwe.decrypt_compact", return_value=expected_decrypted_token
@@ -189,7 +200,7 @@ def test_decrypt_success(resource_server_backend):
encrypted_token, private_key, algorithms=["RSA-OAEP", "A256GCM"]
)
assert decrypted_token == expected_decrypted_token
assert decrypted_token == "blah"
def test_decrypt_failure(resource_server_backend):
@@ -209,40 +220,70 @@ def test_decrypt_failure(resource_server_backend):
)
def test_resource_server_backend_introspect_success(resource_server_backend):
"""Test '_introspect' method with a successful response."""
token = "valid_token"
json_data = {"sub": "user123"}
resource_server_backend._authorization_server_client.get_introspection = Mock(
return_value=json.dumps(json_data)
)
result = resource_server_backend._introspect(token)
assert result.claims == json_data
resource_server_backend._authorization_server_client.get_introspection.assert_called_once_with(
"client_id", "client_secret", token
)
@patch(
"core.resource_server.utils.import_private_key_from_settings",
return_value="private_key",
)
# pylint: disable=unused-argument
def test_introspect_success(
mock_import_private_key_from_settings, resource_server_backend
def test_jwt_resource_server_backend_introspect_success(
mock_import_private_key_from_settings, jwt_resource_server_backend
):
"""Test '_introspect' method with a successful response."""
jwt_rs_backend = jwt_resource_server_backend # prevent line too long
token = "valid_token"
jwe = "valid_jwe"
jws = "valid_jws"
jwt = {"sub": "user123"}
jwt = {
"aud": "client_id",
"iss": "https://auth.server.com",
"token_introspection": {
"sub": "user123",
"aud": "client_id",
"iss": "https://auth.server.com",
},
}
resource_server_backend._authorization_server_client.get_introspection = Mock(
jwt_rs_backend._authorization_server_client.get_introspection = Mock(
return_value=jwe
)
resource_server_backend._decrypt = Mock(return_value=jws)
resource_server_backend._authorization_server_client.import_public_keys = Mock(
jwt_rs_backend._decrypt = Mock(return_value=jws)
jwt_rs_backend._authorization_server_client.import_public_keys = Mock(
return_value="public_key_set"
)
resource_server_backend._decode = Mock(return_value=jwt)
jwt_rs_backend._decode = Mock(return_value=Token({}, jwt))
result = resource_server_backend._introspect(token)
result = jwt_rs_backend._introspect(token)
assert result == jwt
resource_server_backend._authorization_server_client.get_introspection.assert_called_once_with(
assert result.claims == {
"sub": "user123",
"aud": "client_id",
"iss": "https://auth.server.com",
}
jwt_rs_backend._authorization_server_client.get_introspection.assert_called_once_with(
"client_id", "client_secret", token
)
resource_server_backend._decrypt.assert_called_once_with(
jwe, private_key="private_key"
)
resource_server_backend._authorization_server_client.import_public_keys.assert_called_once()
resource_server_backend._decode.assert_called_once_with(jws, "public_key_set")
jwt_rs_backend._decrypt.assert_called_once_with(jwe, private_key="private_key")
jwt_rs_backend._authorization_server_client.import_public_keys.assert_called_once()
jwt_rs_backend._decode.assert_called_once_with(jws, "public_key_set")
def test_introspect_introspection_failure(resource_server_backend):
@@ -267,37 +308,43 @@ def test_introspect_introspection_failure(resource_server_backend):
return_value="private_key",
)
# pylint: disable=unused-argument
def test_introspect_public_key_import_failure(
mock_import_private_key_from_settings, resource_server_backend
def test_jwt_resource_server_backend_introspect_public_key_import_failure(
mock_import_private_key_from_settings, jwt_resource_server_backend
):
"""Test '_introspect' method when fetching AS's jwks fails."""
token = "valid_token"
jwe = "valid_jwe"
jws = "valid_jws"
resource_server_backend._authorization_server_client.get_introspection = Mock(
jwt_resource_server_backend._authorization_server_client.get_introspection = Mock(
return_value=jwe
)
resource_server_backend._decrypt = Mock(return_value=jws)
jwt_resource_server_backend._decrypt = Mock(return_value=jws)
resource_server_backend._authorization_server_client.import_public_keys.side_effect = HTTPError(
"Public key error"
)
(
jwt_resource_server_backend._authorization_server_client.import_public_keys.side_effect
) = HTTPError("Public key error")
with patch.object(Logger, "debug") as mock_logger_debug:
expected_message = "Could get authorization server JWKS"
with pytest.raises(SuspiciousOperation, match=expected_message):
resource_server_backend._introspect(token)
jwt_resource_server_backend._introspect(token)
mock_logger_debug.assert_called_once_with(
"%s. Exception:", expected_message, exc_info=True
)
def test_verify_user_info_success(resource_server_backend):
def test_verify_user_info_success(resource_server_backend, settings):
"""Test '_verify_user_info' with a successful response."""
introspection_response = {"active": True, "scope": "groups", "aud": "123"}
# test default OIDC_RS_AUDIENCE_CLAIM = client_id
introspection_response = {"active": True, "scope": "groups", "client_id": "123"}
result = resource_server_backend._verify_user_info(introspection_response)
assert result == introspection_response
# test OIDC_RS_AUDIENCE_CLAIM = aud is taken into account
settings.OIDC_RS_AUDIENCE_CLAIM = "aud"
introspection_response = {"active": True, "scope": "groups", "aud": "123"}
result = resource_server_backend._verify_user_info(introspection_response)
assert result == introspection_response
@@ -328,19 +375,17 @@ def test_verify_user_info_wrong_scopes(resource_server_backend):
mock_logger_debug.assert_called_once_with(expected_message)
def test_get_user_success(resource_server_backend):
def test_resource_server_backend_get_user_success(resource_server_backend):
"""Test '_get_user' with a successful response."""
access_token = "valid_access_token"
mock_jwt = Mock()
mock_claims = {"token_introspection": {"sub": "user123", "aud": "123"}}
mock_claims = {"sub": "user123", "client_id": "123"}
mock_user = Mock()
resource_server_backend._introspect = Mock(return_value=mock_jwt)
resource_server_backend._verify_claims = Mock(return_value=mock_claims)
resource_server_backend._verify_user_info = Mock(
return_value=mock_claims["token_introspection"]
)
resource_server_backend._verify_user_info = Mock(return_value=mock_claims)
resource_server_backend.UserModel.objects.get = Mock(return_value=mock_user)
user = resource_server_backend.get_user(access_token)
@@ -348,9 +393,7 @@ def test_get_user_success(resource_server_backend):
assert user == mock_user
resource_server_backend._introspect.assert_called_once_with(access_token)
resource_server_backend._verify_claims.assert_called_once_with(mock_jwt)
resource_server_backend._verify_user_info.assert_called_once_with(
mock_claims["token_introspection"]
)
resource_server_backend._verify_user_info.assert_called_once_with(mock_claims)
resource_server_backend.UserModel.objects.get.assert_called_once_with(sub="user123")
@@ -393,18 +436,16 @@ def test_get_user_invalid_introspection_response(resource_server_backend):
resource_server_backend._verify_user_info.assert_not_called()
def test_get_user_user_not_found(resource_server_backend):
def test_resource_server_backend_get_user_user_not_found(resource_server_backend):
"""Test '_get_user' if the user is not found."""
access_token = "valid_access_token"
mock_jwt = Mock()
mock_claims = {"token_introspection": {"sub": "user123"}}
mock_claims = {"sub": "user123"}
resource_server_backend._introspect = Mock(return_value=mock_jwt)
resource_server_backend._verify_claims = Mock(return_value=mock_claims)
resource_server_backend._verify_user_info = Mock(
return_value=mock_claims["token_introspection"]
)
resource_server_backend._verify_user_info = Mock(return_value=mock_claims)
resource_server_backend.UserModel.objects.get = Mock(
side_effect=resource_server_backend.UserModel.DoesNotExist
)
@@ -414,9 +455,7 @@ def test_get_user_user_not_found(resource_server_backend):
assert user is None
resource_server_backend._introspect.assert_called_once_with(access_token)
resource_server_backend._verify_claims.assert_called_once_with(mock_jwt)
resource_server_backend._verify_user_info.assert_called_once_with(
mock_claims["token_introspection"]
)
resource_server_backend._verify_user_info.assert_called_once_with(mock_claims)
resource_server_backend.UserModel.objects.get.assert_called_once_with(
sub="user123"
)

View File

@@ -10,6 +10,8 @@ For the full list of settings and their values, see
https://docs.djangoproject.com/en/3.1/ref/settings/
"""
# pylint: disable=too-many-lines
import json
import os
@@ -480,6 +482,16 @@ class Base(Configuration):
None, environ_name="OIDC_OP_TOKEN_INTROSPECTION_ENDPOINT", environ_prefix=None
)
OIDC_OP_URL = values.Value(None, environ_name="OIDC_OP_URL", environ_prefix=None)
OIDC_RS_BACKEND_CLASS = values.Value(
"core.resource_server.backend.ResourceServerBackend",
environ_name="OIDC_RS_BACKEND_CLASS",
environ_prefix=None,
)
OIDC_RS_AUDIENCE_CLAIM = values.Value(
"client_id",
environ_name="OIDC_RS_AUDIENCE_CLAIM",
environ_prefix=None,
)
OIDC_RS_CLIENT_ID = values.Value(
None, environ_name="OIDC_RS_CLIENT_ID", environ_prefix=None
)
@@ -489,7 +501,7 @@ class Base(Configuration):
environ_prefix=None,
)
OIDC_RS_SIGNING_ALGO = values.Value(
default="ES256", environ_name="OIDC_RS_SIGNING_ALG0", environ_prefix=None
default="ES256", environ_name="OIDC_RS_SIGNING_ALGO", environ_prefix=None
)
OIDC_RS_SCOPES = values.ListValue(
["groups"], environ_name="OIDC_RS_SCOPES", environ_prefix=None