✨(backend) add resource server backend
Why: Many services in La Suite rely on Agent Connect to authenticate their users. Delegating authentication to Agent Connect is highly beneficial. With a central party (Agent Connect) handling user authentication, our services can seamlessly communicate with each other. Our backend must be able to receive and verify access tokens issued by Agent Connect. Additionally, it should ensure that the resource owner has granted permission for our data to the service provider transmitting the access token. How: Our backend needs to verify access tokens by introspecting them. This involves requesting the Authorization Server to validate the access token received in the authentication header. The Authorization Server validates the token's integrity, provides authentication and authorization information about the user currently logged into the service provider requesting data from the resource server. The data returned by the Authorization Server to the resource server is encrypted and signed. To encrypt the introspection token, the Authorization Server retrieves the resource server's public key from the new ‘/jwks’ endpoint. Encryption parameters, such as algorithm and encoding, are configured on the resource server. Ensure that these parameters match between the Authorization Server and the resource server. The resource server verifies the token signature using the Authorization Server's public key, exposed through its `/jwks` endpoint. Make sure the signature algorithms match between both servers. Finally, introspection token claims are verified to adhere to good practices for handling JWTs, including checks on issuer, audience, and expiration time. The introspection token contains a subject (`sub`). The resource server uses this subject to retrieve the requested database user, compatible with both pairwise and public subjects. Important: Agent Connect does not follow RFC 7662 but uses a draft RFC that adds security (signing/encryption) to the initial specification. Refer to the "References" section for more information. References: The initial RFC describing token introspection is RFC 7662 "OAuth 2.0 Token Introspection". However, this RFC specifies that the introspection response is a plain JSON object. In eGovernment applications, our resource server requires stronger assurance that the Authorization Server issued the token introspection response. France Connect's team implemented a stronger version of the spec, returning a signed and encrypted token introspection response. This version is still a draft, available under: "draft-ietf-oauth-jwt-introspection-response".
This commit is contained in:
committed by
aleb_the_flash
parent
9c05167d80
commit
5634a7f390
@@ -33,6 +33,9 @@ LOGOUT_REDIRECT_URL=http://localhost:3000
|
||||
|
||||
OIDC_REDIRECT_ALLOWED_HOSTS=["http://localhost:8083", "http://localhost:3000"]
|
||||
OIDC_AUTH_REQUEST_EXTRA_PARAMS={"acr_values": "eidas1"}
|
||||
|
||||
OIDC_RS_CLIENT_ID=people
|
||||
OIDC_RS_CLIENT_SECRET=ThisIsAnExampleKeyForDevPurposeOnly
|
||||
OIDC_RS_PRIVATE_KEY_STR="-----BEGIN PRIVATE KEY-----
|
||||
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC3boG1kwEGUYL+
|
||||
U58RPrVToIsF9jHB64S6WJIIInPmAclBciXFb6BWG11mbRIgo8ha3WVnC/tGHbXb
|
||||
|
||||
@@ -79,7 +79,7 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
|
||||
)
|
||||
|
||||
try:
|
||||
user = self.UserModel.objects.get(sub=sub)
|
||||
user = self.UserModel.objects.get(sub=sub, is_active=True)
|
||||
except self.UserModel.DoesNotExist:
|
||||
if self.get_settings("OIDC_CREATE_USER", True):
|
||||
user = self.create_user(user_info)
|
||||
|
||||
@@ -0,0 +1,203 @@
|
||||
"""Resource Server Backend"""
|
||||
|
||||
import logging
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib import auth
|
||||
from django.core.exceptions import SuspiciousOperation
|
||||
|
||||
from joserfc import jwe as jose_jwe
|
||||
from joserfc import jwt as jose_jwt
|
||||
from joserfc.errors import InvalidClaimError, InvalidTokenError
|
||||
from requests.exceptions import HTTPError
|
||||
|
||||
from . import utils
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ResourceServerBackend:
|
||||
"""Backend of an OAuth 2.0 resource server.
|
||||
|
||||
This backend is designed to authenticate resource owners to our API using the access token
|
||||
they received from the authorization server.
|
||||
|
||||
In the context of OAuth 2.0, a resource server is a server that hosts protected resources and
|
||||
is capable of accepting and responding to protected resource requests using access tokens.
|
||||
The resource server verifies the validity of the access tokens issued by the authorization
|
||||
server to ensure secure access to the resources.
|
||||
|
||||
For more information, visit: https://www.oauth.com/oauth2-servers/the-resource-server/
|
||||
"""
|
||||
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
def __init__(self, authorization_server_client):
|
||||
# pylint: disable=invalid-name
|
||||
self.UserModel = auth.get_user_model()
|
||||
|
||||
self._client_id = settings.OIDC_RS_CLIENT_ID
|
||||
self._client_secret = settings.OIDC_RS_CLIENT_SECRET
|
||||
self._encryption_encoding = settings.OIDC_RS_ENCRYPTION_ENCODING
|
||||
self._encryption_algorithm = settings.OIDC_RS_ENCRYPTION_ALGO
|
||||
self._signing_algorithm = settings.OIDC_RS_SIGNING_ALGO
|
||||
self._scopes = settings.OIDC_RS_SCOPES
|
||||
self._authorization_server_client = authorization_server_client
|
||||
|
||||
self._claims_registry = jose_jwt.JWTClaimsRegistry(
|
||||
iss={"essential": True, "value": self._authorization_server_client.url},
|
||||
aud={"essential": True, "value": self._client_id},
|
||||
token_introspection={"essential": True},
|
||||
)
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def get_or_create_user(self, access_token, id_token, payload):
|
||||
"""Maintain API compatibility with OIDCAuthentication class from mozilla-django-oidc
|
||||
|
||||
Params 'id_token', 'payload' won't be used, and our implementation will only
|
||||
support 'get_user', not 'get_or_create_user'.
|
||||
"""
|
||||
return self.get_user(access_token)
|
||||
|
||||
def get_user(self, access_token):
|
||||
"""Get user from an access token emitted by the authorization server.
|
||||
|
||||
This method will submit the access token to the authorization server for
|
||||
introspection, to ensure its validity and obtain the associated metadata.
|
||||
|
||||
It follows the specifications outlined in RFC7662 https://www.rfc-editor.org/info/rfc7662,
|
||||
https://datatracker.ietf.org/doc/html/draft-ietf-oauth-jwt-introspection-response-12.
|
||||
|
||||
In our eGovernment applications, the standard RFC 7662 doesn't provide sufficient security.
|
||||
Its introspection response is a plain JSON object. Therefore, we use the draft RFC
|
||||
that extends RFC 7662 by returning a signed and encrypted JWT for stronger assurance that
|
||||
the authorization server issued the token introspection response.
|
||||
"""
|
||||
jwt = self._introspect(access_token)
|
||||
claims = self._verify_claims(jwt)
|
||||
user_info = self._verify_user_info(claims["token_introspection"])
|
||||
|
||||
sub = user_info.get("sub")
|
||||
if sub is None:
|
||||
message = "User info contained no recognizable user identification"
|
||||
logger.debug(message)
|
||||
raise SuspiciousOperation(message)
|
||||
try:
|
||||
user = self.UserModel.objects.get(sub=sub)
|
||||
except self.UserModel.DoesNotExist:
|
||||
logger.debug("Login failed: No user with %s found", sub)
|
||||
return None
|
||||
|
||||
return user
|
||||
|
||||
def _verify_user_info(self, introspection_response):
|
||||
"""Verify the 'introspection_response' to get valid and relevant user info.
|
||||
|
||||
The 'introspection_response' should be still active, and while authenticating
|
||||
the resource owner should have requested relevant scope to access her data in
|
||||
our resource server.
|
||||
|
||||
Scope should be configured to match between the AS and the RS. The AS will filter
|
||||
all the scopes the resource owner requested to expose only the relevant ones to
|
||||
our resource server.
|
||||
"""
|
||||
|
||||
active = introspection_response.get("active", None)
|
||||
|
||||
if not active:
|
||||
message = "Introspection response is not active."
|
||||
logger.debug(message)
|
||||
raise SuspiciousOperation(message)
|
||||
|
||||
requested_scopes = introspection_response.get("scope", None).split(" ")
|
||||
if set(self._scopes).isdisjoint(set(requested_scopes)):
|
||||
message = "Introspection response contains any required scopes."
|
||||
logger.debug(message)
|
||||
raise SuspiciousOperation(message)
|
||||
|
||||
return introspection_response
|
||||
|
||||
def _introspect(self, token):
|
||||
"""Introspect an access token to the authorization server."""
|
||||
try:
|
||||
jwe = self._authorization_server_client.get_introspection(
|
||||
self._client_id,
|
||||
self._client_secret,
|
||||
token,
|
||||
)
|
||||
except HTTPError as err:
|
||||
message = "Could not fetch introspection"
|
||||
logger.debug("%s. Exception:", message, exc_info=True)
|
||||
raise SuspiciousOperation(message) from err
|
||||
|
||||
private_key = utils.import_private_key_from_settings()
|
||||
jws = self._decrypt(jwe, private_key=private_key)
|
||||
|
||||
try:
|
||||
public_key_set = self._authorization_server_client.import_public_keys()
|
||||
except (TypeError, ValueError, AttributeError, HTTPError) as err:
|
||||
message = "Could get authorization server JWKS"
|
||||
logger.debug("%s. Exception:", message, exc_info=True)
|
||||
raise SuspiciousOperation(message) from err
|
||||
|
||||
jwt = self._decode(jws, public_key_set)
|
||||
|
||||
return jwt
|
||||
|
||||
def _decrypt(self, encrypted_token, private_key):
|
||||
"""Decrypt the token encrypted by the Authorization Server (AS).
|
||||
|
||||
Resource Server (RS)'s public key is used for encryption, and its private
|
||||
key is used for decryption. The RS's public key is exposed to the AS via a JWKS endpoint.
|
||||
Encryption Algorithm and Encoding should be configured to match between the AS
|
||||
and the RS.
|
||||
"""
|
||||
|
||||
try:
|
||||
decrypted_token = jose_jwe.decrypt_compact(
|
||||
encrypted_token,
|
||||
private_key,
|
||||
algorithms=[self._encryption_algorithm, self._encryption_encoding],
|
||||
)
|
||||
except Exception as err:
|
||||
message = "Token decryption failed"
|
||||
logger.debug("%s. Exception:", message, exc_info=True)
|
||||
raise SuspiciousOperation(message) from err
|
||||
|
||||
return decrypted_token
|
||||
|
||||
def _decode(self, encoded_token, public_key_set):
|
||||
"""Decode the token signed by the Authorization Server (AS).
|
||||
|
||||
AS's private key is used for signing, and its public key is used for decoding.
|
||||
The AS public key is exposed via a JWK endpoint.
|
||||
Signing Algorithm should be configured to match between the AS and the RS.
|
||||
"""
|
||||
try:
|
||||
token = jose_jwt.decode(
|
||||
encoded_token.plaintext,
|
||||
public_key_set,
|
||||
algorithms=[self._signing_algorithm],
|
||||
)
|
||||
except ValueError as err:
|
||||
message = "Token decoding failed"
|
||||
logger.debug("%s. Exception:", message, exc_info=True)
|
||||
raise SuspiciousOperation(message) from err
|
||||
|
||||
return token
|
||||
|
||||
def _verify_claims(self, token):
|
||||
"""Verify the claims of the token to ensure authentication security.
|
||||
|
||||
By verifying these claims, we ensure that the token was issued by a
|
||||
trusted authorization server and is intended for this specific
|
||||
resource server. This prevents various types of attacks, such as
|
||||
token substitution or misuse of tokens issued for different clients.
|
||||
"""
|
||||
try:
|
||||
self._claims_registry.validate(token.claims)
|
||||
except (InvalidClaimError, InvalidTokenError) as err:
|
||||
message = "Failed to validate token's claims"
|
||||
logger.debug("%s. Exception:", message, exc_info=True)
|
||||
raise SuspiciousOperation(message) from err
|
||||
|
||||
return token.claims
|
||||
|
||||
447
src/backend/core/tests/resource_server/test_backend.py
Normal file
447
src/backend/core/tests/resource_server/test_backend.py
Normal file
@@ -0,0 +1,447 @@
|
||||
"""
|
||||
Test for the Resource Server (RS) Backend.
|
||||
"""
|
||||
|
||||
|
||||
# pylint: disable=W0212
|
||||
|
||||
from logging import Logger
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from django.contrib import auth
|
||||
from django.core.exceptions import SuspiciousOperation
|
||||
from django.test.utils import override_settings
|
||||
|
||||
import pytest
|
||||
from joserfc.errors import InvalidClaimError, InvalidTokenError
|
||||
from joserfc.jwt import JWTClaimsRegistry
|
||||
from requests.exceptions import HTTPError
|
||||
|
||||
from core.resource_server.backend import ResourceServerBackend
|
||||
|
||||
|
||||
@pytest.fixture(name="mock_authorization_server")
|
||||
def fixture_mock_authorization_server():
|
||||
"""Mock an Authorization Server client."""
|
||||
mock_server = Mock()
|
||||
mock_server.url = "https://auth.server.com"
|
||||
return mock_server
|
||||
|
||||
|
||||
@pytest.fixture(name="mock_token")
|
||||
def fixture_mock_token():
|
||||
"""Mock a token"""
|
||||
mock_token = Mock()
|
||||
mock_token.claims = {"sub": "user123", "iss": "https://auth.server.com"}
|
||||
return mock_token
|
||||
|
||||
|
||||
@pytest.fixture(name="resource_server_backend")
|
||||
def fixture_resource_server_backend(settings, mock_authorization_server):
|
||||
"""Generate a Resource Server backend."""
|
||||
|
||||
settings.OIDC_RS_CLIENT_ID = "client_id"
|
||||
settings.OIDC_RS_CLIENT_SECRET = "client_secret"
|
||||
settings.OIDC_RS_ENCRYPTION_ENCODING = "A256GCM"
|
||||
settings.OIDC_RS_ENCRYPTION_ALGO = "RSA-OAEP"
|
||||
settings.OIDC_RS_SIGNING_ALGO = "ES256"
|
||||
settings.OIDC_RS_SCOPES = ["groups"]
|
||||
|
||||
return ResourceServerBackend(mock_authorization_server)
|
||||
|
||||
|
||||
@override_settings(OIDC_RS_CLIENT_ID="client_id")
|
||||
@override_settings(OIDC_RS_CLIENT_SECRET="client_secret")
|
||||
@override_settings(OIDC_RS_ENCRYPTION_ENCODING="A256GCM")
|
||||
@override_settings(OIDC_RS_ENCRYPTION_ALGO="RSA-OAEP")
|
||||
@override_settings(OIDC_RS_SIGNING_ALGO="RS256")
|
||||
@override_settings(OIDC_RS_SCOPES=["scopes"])
|
||||
@patch.object(auth, "get_user_model", return_value="foo")
|
||||
def test_backend_initialization(mock_get_user_model, mock_authorization_server):
|
||||
"""Test the ResourceServerBackend initialization."""
|
||||
|
||||
backend = ResourceServerBackend(mock_authorization_server)
|
||||
|
||||
mock_get_user_model.assert_called_once()
|
||||
assert backend.UserModel == "foo"
|
||||
|
||||
assert backend._client_id == "client_id"
|
||||
assert backend._client_secret == "client_secret"
|
||||
assert backend._encryption_encoding == "A256GCM"
|
||||
assert backend._encryption_algorithm == "RSA-OAEP"
|
||||
assert backend._signing_algorithm == "RS256"
|
||||
assert backend._scopes == ["scopes"]
|
||||
|
||||
assert backend._authorization_server_client == mock_authorization_server
|
||||
assert isinstance(backend._claims_registry, JWTClaimsRegistry)
|
||||
|
||||
assert backend._claims_registry.options == {
|
||||
"iss": {"essential": True, "value": "https://auth.server.com"},
|
||||
"aud": {"essential": True, "value": "client_id"},
|
||||
"token_introspection": {"essential": True},
|
||||
}
|
||||
|
||||
|
||||
@patch.object(ResourceServerBackend, "get_user", return_value="user")
|
||||
def test_get_or_create_user(mock_get_user, resource_server_backend):
|
||||
"""Test 'get_or_create_user' method."""
|
||||
|
||||
access_token = "access_token"
|
||||
res = resource_server_backend.get_or_create_user(access_token, None, None)
|
||||
|
||||
mock_get_user.assert_called_once_with(access_token)
|
||||
assert res == "user"
|
||||
|
||||
|
||||
def test_verify_claims_success(resource_server_backend, mock_token):
|
||||
"""Test '_verify_claims' method with a successful response."""
|
||||
|
||||
with patch.object(
|
||||
resource_server_backend._claims_registry, "validate"
|
||||
) as mock_validate:
|
||||
resource_server_backend._verify_claims(mock_token)
|
||||
mock_validate.assert_called_once_with(mock_token.claims)
|
||||
|
||||
|
||||
def test_verify_claims_invalid_claim_error(resource_server_backend, mock_token):
|
||||
"""Test '_verify_claims' method with an invalid claim error."""
|
||||
|
||||
with patch.object(
|
||||
resource_server_backend._claims_registry, "validate"
|
||||
) as mock_validate:
|
||||
mock_validate.side_effect = InvalidClaimError("claim_name")
|
||||
|
||||
expected_message = "Failed to validate token's claims"
|
||||
with patch.object(Logger, "debug") as mock_logger_debug:
|
||||
with pytest.raises(SuspiciousOperation, match=expected_message):
|
||||
resource_server_backend._verify_claims(mock_token)
|
||||
mock_logger_debug.assert_called_once_with(
|
||||
"%s. Exception:", expected_message, exc_info=True
|
||||
)
|
||||
|
||||
|
||||
def test_verify_claims_invalid_token_error(resource_server_backend, mock_token):
|
||||
"""Test '_verify_claims' method with an invalid token error."""
|
||||
|
||||
with patch.object(
|
||||
resource_server_backend._claims_registry, "validate"
|
||||
) as mock_validate:
|
||||
mock_validate.side_effect = InvalidTokenError
|
||||
|
||||
expected_message = "Failed to validate token's claims"
|
||||
with patch.object(Logger, "debug") as mock_logger_debug:
|
||||
with pytest.raises(SuspiciousOperation, match=expected_message):
|
||||
resource_server_backend._verify_claims(mock_token)
|
||||
mock_logger_debug.assert_called_once_with(
|
||||
"%s. Exception:", expected_message, exc_info=True
|
||||
)
|
||||
|
||||
|
||||
def test_decode_success(resource_server_backend):
|
||||
"""Test '_decode' method with a successful response."""
|
||||
|
||||
encoded_token = Mock()
|
||||
encoded_token.plaintext = "valid_encoded_token"
|
||||
public_key_set = Mock()
|
||||
|
||||
expected_decoded_token = {"sub": "user123"}
|
||||
|
||||
with patch(
|
||||
"joserfc.jwt.decode", return_value=expected_decoded_token
|
||||
) as mock_decode:
|
||||
decoded_token = resource_server_backend._decode(encoded_token, public_key_set)
|
||||
|
||||
mock_decode.assert_called_once_with(
|
||||
"valid_encoded_token", public_key_set, algorithms=["ES256"]
|
||||
)
|
||||
|
||||
assert decoded_token == expected_decoded_token
|
||||
|
||||
|
||||
def test_decode_failure(resource_server_backend):
|
||||
"""Test '_decode' method with a ValueError"""
|
||||
encoded_token = Mock()
|
||||
encoded_token.plaintext = "invalid_encoded_token"
|
||||
public_key_set = Mock()
|
||||
|
||||
with patch("joserfc.jwt.decode", side_effect=ValueError):
|
||||
with patch.object(Logger, "debug") as mock_logger_debug:
|
||||
with pytest.raises(SuspiciousOperation, match="Token decoding failed"):
|
||||
resource_server_backend._decode(encoded_token, public_key_set)
|
||||
|
||||
mock_logger_debug.assert_called_once_with(
|
||||
"%s. Exception:", "Token decoding failed", exc_info=True
|
||||
)
|
||||
|
||||
|
||||
def test_decrypt_success(resource_server_backend):
|
||||
"""Test '_decrypt' method with a successful response."""
|
||||
encrypted_token = "valid_encrypted_token"
|
||||
private_key = "private_key"
|
||||
|
||||
expected_decrypted_token = {"sub": "user123"}
|
||||
|
||||
with patch(
|
||||
"joserfc.jwe.decrypt_compact", return_value=expected_decrypted_token
|
||||
) as mock_decrypt:
|
||||
decrypted_token = resource_server_backend._decrypt(encrypted_token, private_key)
|
||||
mock_decrypt.assert_called_once_with(
|
||||
encrypted_token, private_key, algorithms=["RSA-OAEP", "A256GCM"]
|
||||
)
|
||||
|
||||
assert decrypted_token == expected_decrypted_token
|
||||
|
||||
|
||||
def test_decrypt_failure(resource_server_backend):
|
||||
"""Test '_decrypt' method with an Exception."""
|
||||
encrypted_token = "invalid_encrypted_token"
|
||||
private_key = "private_key"
|
||||
|
||||
with patch(
|
||||
"joserfc.jwe.decrypt_compact", side_effect=Exception("Decryption error")
|
||||
):
|
||||
expected_message = "Token decryption failed"
|
||||
with patch.object(Logger, "debug") as mock_logger_debug:
|
||||
with pytest.raises(SuspiciousOperation, match=expected_message):
|
||||
resource_server_backend._decrypt(encrypted_token, private_key)
|
||||
mock_logger_debug.assert_called_once_with(
|
||||
"%s. Exception:", expected_message, exc_info=True
|
||||
)
|
||||
|
||||
|
||||
@patch(
|
||||
"core.resource_server.utils.import_private_key_from_settings",
|
||||
return_value="private_key",
|
||||
)
|
||||
# pylint: disable=unused-argument
|
||||
def test_introspect_success(
|
||||
mock_import_private_key_from_settings, resource_server_backend
|
||||
):
|
||||
"""Test '_introspect' method with a successful response."""
|
||||
token = "valid_token"
|
||||
jwe = "valid_jwe"
|
||||
jws = "valid_jws"
|
||||
jwt = {"sub": "user123"}
|
||||
|
||||
resource_server_backend._authorization_server_client.get_introspection = Mock(
|
||||
return_value=jwe
|
||||
)
|
||||
resource_server_backend._decrypt = Mock(return_value=jws)
|
||||
resource_server_backend._authorization_server_client.import_public_keys = Mock(
|
||||
return_value="public_key_set"
|
||||
)
|
||||
resource_server_backend._decode = Mock(return_value=jwt)
|
||||
|
||||
result = resource_server_backend._introspect(token)
|
||||
|
||||
assert result == jwt
|
||||
resource_server_backend._authorization_server_client.get_introspection.assert_called_once_with(
|
||||
"client_id", "client_secret", token
|
||||
)
|
||||
resource_server_backend._decrypt.assert_called_once_with(
|
||||
jwe, private_key="private_key"
|
||||
)
|
||||
resource_server_backend._authorization_server_client.import_public_keys.assert_called_once()
|
||||
resource_server_backend._decode.assert_called_once_with(jws, "public_key_set")
|
||||
|
||||
|
||||
def test_introspect_introspection_failure(resource_server_backend):
|
||||
"""Test '_introspect' method when introspection to the AS fails."""
|
||||
token = "invalid_token"
|
||||
resource_server_backend._authorization_server_client.get_introspection.side_effect = HTTPError(
|
||||
"Introspection error"
|
||||
)
|
||||
|
||||
with patch.object(Logger, "debug") as mock_logger_debug:
|
||||
expected_message = "Could not fetch introspection"
|
||||
with pytest.raises(SuspiciousOperation, match=expected_message):
|
||||
resource_server_backend._introspect(token)
|
||||
|
||||
mock_logger_debug.assert_called_once_with(
|
||||
"%s. Exception:", expected_message, exc_info=True
|
||||
)
|
||||
|
||||
|
||||
@patch(
|
||||
"core.resource_server.utils.import_private_key_from_settings",
|
||||
return_value="private_key",
|
||||
)
|
||||
# pylint: disable=unused-argument
|
||||
def test_introspect_public_key_import_failure(
|
||||
mock_import_private_key_from_settings, resource_server_backend
|
||||
):
|
||||
"""Test '_introspect' method when fetching AS's jwks fails."""
|
||||
token = "valid_token"
|
||||
jwe = "valid_jwe"
|
||||
jws = "valid_jws"
|
||||
|
||||
resource_server_backend._authorization_server_client.get_introspection = Mock(
|
||||
return_value=jwe
|
||||
)
|
||||
resource_server_backend._decrypt = Mock(return_value=jws)
|
||||
|
||||
resource_server_backend._authorization_server_client.import_public_keys.side_effect = HTTPError(
|
||||
"Public key error"
|
||||
)
|
||||
|
||||
with patch.object(Logger, "debug") as mock_logger_debug:
|
||||
expected_message = "Could get authorization server JWKS"
|
||||
with pytest.raises(SuspiciousOperation, match=expected_message):
|
||||
resource_server_backend._introspect(token)
|
||||
|
||||
mock_logger_debug.assert_called_once_with(
|
||||
"%s. Exception:", expected_message, exc_info=True
|
||||
)
|
||||
|
||||
|
||||
def test_verify_user_info_success(resource_server_backend):
|
||||
"""Test '_verify_user_info' with a successful response."""
|
||||
introspection_response = {"active": True, "scope": "groups"}
|
||||
|
||||
result = resource_server_backend._verify_user_info(introspection_response)
|
||||
assert result == introspection_response
|
||||
|
||||
|
||||
def test_verify_user_info_inactive(resource_server_backend):
|
||||
"""Test '_verify_user_info' with an inactive introspection response."""
|
||||
|
||||
introspection_response = {"active": False, "scope": "groups"}
|
||||
|
||||
expected_message = "Introspection response is not active."
|
||||
with patch.object(Logger, "debug") as mock_logger_debug:
|
||||
with pytest.raises(SuspiciousOperation, match=expected_message):
|
||||
resource_server_backend._verify_user_info(introspection_response)
|
||||
|
||||
mock_logger_debug.assert_called_once_with(expected_message)
|
||||
|
||||
|
||||
def test_verify_user_info_wrong_scopes(resource_server_backend):
|
||||
"""Test '_verify_user_info' with wrong requested scopes."""
|
||||
|
||||
introspection_response = {"active": True, "scope": "wrong-scopes"}
|
||||
|
||||
expected_message = "Introspection response contains any required scopes."
|
||||
with patch.object(Logger, "debug") as mock_logger_debug:
|
||||
with pytest.raises(SuspiciousOperation, match=expected_message):
|
||||
resource_server_backend._verify_user_info(introspection_response)
|
||||
|
||||
mock_logger_debug.assert_called_once_with(expected_message)
|
||||
|
||||
|
||||
def test_get_user_success(resource_server_backend):
|
||||
"""Test '_get_user' with a successful response."""
|
||||
|
||||
access_token = "valid_access_token"
|
||||
mock_jwt = Mock()
|
||||
mock_claims = {"token_introspection": {"sub": "user123"}}
|
||||
mock_user = Mock()
|
||||
|
||||
resource_server_backend._introspect = Mock(return_value=mock_jwt)
|
||||
resource_server_backend._verify_claims = Mock(return_value=mock_claims)
|
||||
resource_server_backend._verify_user_info = Mock(
|
||||
return_value=mock_claims["token_introspection"]
|
||||
)
|
||||
resource_server_backend.UserModel.objects.get = Mock(return_value=mock_user)
|
||||
|
||||
user = resource_server_backend.get_user(access_token)
|
||||
|
||||
assert user == mock_user
|
||||
resource_server_backend._introspect.assert_called_once_with(access_token)
|
||||
resource_server_backend._verify_claims.assert_called_once_with(mock_jwt)
|
||||
resource_server_backend._verify_user_info.assert_called_once_with(
|
||||
mock_claims["token_introspection"]
|
||||
)
|
||||
resource_server_backend.UserModel.objects.get.assert_called_once_with(sub="user123")
|
||||
|
||||
|
||||
def test_get_user_could_not_introspect(resource_server_backend):
|
||||
"""Test '_get_user' with introspection failing."""
|
||||
|
||||
access_token = "valid_access_token"
|
||||
|
||||
resource_server_backend._introspect = Mock(
|
||||
side_effect=SuspiciousOperation("Invalid jwt")
|
||||
)
|
||||
resource_server_backend._verify_claims = Mock()
|
||||
resource_server_backend._verify_user_info = Mock()
|
||||
|
||||
with pytest.raises(SuspiciousOperation, match="Invalid jwt"):
|
||||
resource_server_backend.get_user(access_token)
|
||||
|
||||
resource_server_backend._introspect.assert_called_once_with(access_token)
|
||||
resource_server_backend._verify_claims.assert_not_called()
|
||||
resource_server_backend._verify_user_info.assert_not_called()
|
||||
|
||||
|
||||
def test_get_user_invalid_introspection_response(resource_server_backend):
|
||||
"""Test '_get_user' with an invalid introspection response."""
|
||||
|
||||
access_token = "valid_access_token"
|
||||
mock_jwt = Mock()
|
||||
|
||||
resource_server_backend._introspect = Mock(return_value=mock_jwt)
|
||||
resource_server_backend._verify_claims = Mock(
|
||||
side_effect=SuspiciousOperation("Invalid claims")
|
||||
)
|
||||
resource_server_backend._verify_user_info = Mock()
|
||||
|
||||
with pytest.raises(SuspiciousOperation, match="Invalid claims"):
|
||||
resource_server_backend.get_user(access_token)
|
||||
|
||||
resource_server_backend._introspect.assert_called_once_with(access_token)
|
||||
resource_server_backend._verify_claims.assert_called_once_with(mock_jwt)
|
||||
resource_server_backend._verify_user_info.assert_not_called()
|
||||
|
||||
|
||||
def test_get_user_user_not_found(resource_server_backend):
|
||||
"""Test '_get_user' if the user is not found."""
|
||||
|
||||
access_token = "valid_access_token"
|
||||
mock_jwt = Mock()
|
||||
mock_claims = {"token_introspection": {"sub": "user123"}}
|
||||
|
||||
resource_server_backend._introspect = Mock(return_value=mock_jwt)
|
||||
resource_server_backend._verify_claims = Mock(return_value=mock_claims)
|
||||
resource_server_backend._verify_user_info = Mock(
|
||||
return_value=mock_claims["token_introspection"]
|
||||
)
|
||||
resource_server_backend.UserModel.objects.get = Mock(
|
||||
side_effect=resource_server_backend.UserModel.DoesNotExist
|
||||
)
|
||||
|
||||
with patch.object(Logger, "debug") as mock_logger_debug:
|
||||
user = resource_server_backend.get_user(access_token)
|
||||
assert user is None
|
||||
resource_server_backend._introspect.assert_called_once_with(access_token)
|
||||
resource_server_backend._verify_claims.assert_called_once_with(mock_jwt)
|
||||
resource_server_backend._verify_user_info.assert_called_once_with(
|
||||
mock_claims["token_introspection"]
|
||||
)
|
||||
resource_server_backend.UserModel.objects.get.assert_called_once_with(
|
||||
sub="user123"
|
||||
)
|
||||
|
||||
mock_logger_debug.assert_called_once_with(
|
||||
"Login failed: No user with %s found", "user123"
|
||||
)
|
||||
|
||||
|
||||
def test_get_user_no_user_identification(resource_server_backend):
|
||||
"""Test '_get_user' if the response miss a user identification."""
|
||||
|
||||
access_token = "valid_access_token"
|
||||
mock_jwt = Mock()
|
||||
mock_claims = {"token_introspection": {}}
|
||||
|
||||
resource_server_backend._introspect = Mock(return_value=mock_jwt)
|
||||
resource_server_backend._verify_claims = Mock(return_value=mock_claims)
|
||||
resource_server_backend._verify_user_info = Mock(
|
||||
return_value=mock_claims["token_introspection"]
|
||||
)
|
||||
|
||||
expected_message = "User info contained no recognizable user identification"
|
||||
with patch.object(Logger, "debug") as mock_logger_debug:
|
||||
with pytest.raises(SuspiciousOperation, match=expected_message):
|
||||
resource_server_backend.get_user(access_token)
|
||||
|
||||
mock_logger_debug.assert_called_once_with(expected_message)
|
||||
@@ -388,6 +388,32 @@ class Base(Configuration):
|
||||
environ_prefix=None,
|
||||
)
|
||||
|
||||
OIDC_OP_TOKEN_INTROSPECTION_ENDPOINT = values.Value(
|
||||
None, environ_name="OIDC_OP_TOKEN_INTROSPECTION_ENDPOINT", environ_prefix=None
|
||||
)
|
||||
OIDC_OP_URL = values.Value(None, environ_name="OIDC_OP_URL", environ_prefix=None)
|
||||
OIDC_RS_CLIENT_ID = values.Value(
|
||||
None, environ_name="OIDC_RS_CLIENT_ID", environ_prefix=None
|
||||
)
|
||||
OIDC_RS_CLIENT_SECRET = values.Value(
|
||||
None,
|
||||
environ_name="OIDC_RS_CLIENT_SECRET",
|
||||
environ_prefix=None,
|
||||
)
|
||||
OIDC_RS_SIGNING_ALGO = values.Value(
|
||||
default="ES256", environ_name="OIDC_RS_SIGNING_ALG0", environ_prefix=None
|
||||
)
|
||||
OIDC_RS_SCOPES = values.ListValue(
|
||||
["groups"], environ_name="OIDC_RS_SCOPES", environ_prefix=None
|
||||
)
|
||||
OIDC_PROXY = values.Value(None, environ_name="OIDC_PROXY", environ_prefix=None)
|
||||
|
||||
OIDC_VERIFY_SSL = values.BooleanValue(
|
||||
True, environ_name="OIDC_VERIFY_SSL", environ_prefix=None
|
||||
)
|
||||
|
||||
OIDC_TIMEOUT = values.Value(None, environ_name="OIDC_TIMEOUT", environ_prefix=None)
|
||||
|
||||
# mailboxes provisioning API
|
||||
MAIL_PROVISIONING_API_URL = values.Value(
|
||||
default="https://api.dev.ox.numerique.gouv.fr",
|
||||
|
||||
Reference in New Issue
Block a user