♻️(backend) rename required claims to essential claims as per spec
It was pointed by @lebaudantoine that the OIDC specification uses the term "essential claims" for what we called required claims. Further more, the Mozilla OIDC library that we use, validates claims in a method called "verify_claims". Let's override this method.
This commit is contained in:
committed by
Samuel Paccoud
parent
f12c06e975
commit
9f83ea7111
@@ -11,8 +11,8 @@ and this project adheres to
|
|||||||
|
|
||||||
## Added
|
## Added
|
||||||
|
|
||||||
🔧(backend) add option to configure list of required OIDC claims #525
|
- 🔧(backend) add option to configure list of essential OIDC claims #525 & #531
|
||||||
🔧(helm) add option to disable default tls setting by @dominikkaminski #519
|
- 🔧(helm) add option to disable default tls setting by @dominikkaminski #519
|
||||||
|
|
||||||
## Changed
|
## Changed
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
"""Authentication Backends for the Impress core app."""
|
"""Authentication Backends for the Impress core app."""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.core.exceptions import SuspiciousOperation
|
from django.core.exceptions import SuspiciousOperation
|
||||||
from django.utils.translation import gettext_lazy as _
|
from django.utils.translation import gettext_lazy as _
|
||||||
@@ -11,6 +13,8 @@ from mozilla_django_oidc.auth import (
|
|||||||
|
|
||||||
from core.models import User
|
from core.models import User
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
|
class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
|
||||||
"""Custom OpenID Connect (OIDC) Authentication Backend.
|
"""Custom OpenID Connect (OIDC) Authentication Backend.
|
||||||
@@ -57,24 +61,31 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
|
|||||||
_("Invalid response format or token verification failed")
|
_("Invalid response format or token verification failed")
|
||||||
) from e
|
) from e
|
||||||
|
|
||||||
# Validate required claims
|
|
||||||
missing_claims = [
|
|
||||||
claim
|
|
||||||
for claim in settings.USER_OIDC_REQUIRED_CLAIMS
|
|
||||||
if claim not in userinfo
|
|
||||||
]
|
|
||||||
if missing_claims:
|
|
||||||
raise SuspiciousOperation(
|
|
||||||
_("Missing required claims in user info: %(claims)s")
|
|
||||||
% {"claims": ", ".join(missing_claims)}
|
|
||||||
)
|
|
||||||
|
|
||||||
return userinfo
|
return userinfo
|
||||||
|
|
||||||
|
def verify_claims(self, claims):
|
||||||
|
"""
|
||||||
|
Verify the presence of essential claims and the "sub" (which is mandatory as defined
|
||||||
|
by the OIDC specification) to decide if authentication should be allowed.
|
||||||
|
"""
|
||||||
|
essential_claims = settings.USER_OIDC_ESSENTIAL_CLAIMS
|
||||||
|
missing_claims = [claim for claim in essential_claims if claim not in claims]
|
||||||
|
|
||||||
|
if missing_claims:
|
||||||
|
logger.error("Missing essential claims: %s", missing_claims)
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
def get_or_create_user(self, access_token, id_token, payload):
|
def get_or_create_user(self, access_token, id_token, payload):
|
||||||
"""Return a User based on userinfo. Create a new user if no match is found."""
|
"""Return a User based on userinfo. Create a new user if no match is found."""
|
||||||
|
|
||||||
user_info = self.get_userinfo(access_token, id_token, payload)
|
user_info = self.get_userinfo(access_token, id_token, payload)
|
||||||
|
|
||||||
|
if not self.verify_claims(user_info):
|
||||||
|
raise SuspiciousOperation("Claims verification failed.")
|
||||||
|
|
||||||
|
sub = user_info["sub"]
|
||||||
email = user_info.get("email")
|
email = user_info.get("email")
|
||||||
|
|
||||||
# Get user's full name from OIDC fields defined in settings
|
# Get user's full name from OIDC fields defined in settings
|
||||||
@@ -87,12 +98,6 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
|
|||||||
"short_name": short_name,
|
"short_name": short_name,
|
||||||
}
|
}
|
||||||
|
|
||||||
sub = user_info.get("sub")
|
|
||||||
if not sub:
|
|
||||||
raise SuspiciousOperation(
|
|
||||||
_("User info contained no recognizable user identification")
|
|
||||||
)
|
|
||||||
|
|
||||||
user = self.get_existing_user(sub, email)
|
user = self.get_existing_user(sub, email)
|
||||||
|
|
||||||
if user:
|
if user:
|
||||||
@@ -113,15 +118,13 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
|
|||||||
return full_name or None
|
return full_name or None
|
||||||
|
|
||||||
def get_existing_user(self, sub, email):
|
def get_existing_user(self, sub, email):
|
||||||
"""Fetch existing user by sub or email."""
|
"""Fetch an existing user by sub (or email as a fallback respecting fallback setting."""
|
||||||
try:
|
try:
|
||||||
return User.objects.get(sub=sub)
|
return User.objects.get(sub=sub)
|
||||||
except User.DoesNotExist:
|
except User.DoesNotExist:
|
||||||
if email and settings.OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION:
|
if email and settings.OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION:
|
||||||
try:
|
return User.objects.filter(email=email).first()
|
||||||
return User.objects.get(email=email)
|
|
||||||
except User.DoesNotExist:
|
|
||||||
pass
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def update_user_if_needed(self, user, claims):
|
def update_user_if_needed(self, user, claims):
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
"""Unit tests for the Authentication Backends."""
|
"""Unit tests for the Authentication Backends."""
|
||||||
|
|
||||||
import re
|
import re
|
||||||
|
from logging import Logger
|
||||||
|
from unittest import mock
|
||||||
|
|
||||||
from django.core.exceptions import SuspiciousOperation
|
from django.core.exceptions import SuspiciousOperation
|
||||||
from django.test.utils import override_settings
|
from django.test.utils import override_settings
|
||||||
@@ -213,29 +215,6 @@ def test_authentication_getter_new_user_with_email(monkeypatch):
|
|||||||
assert models.User.objects.count() == 1
|
assert models.User.objects.count() == 1
|
||||||
|
|
||||||
|
|
||||||
def test_authentication_getter_invalid_token(django_assert_num_queries, monkeypatch):
|
|
||||||
"""The user's info doesn't contain a sub."""
|
|
||||||
klass = OIDCAuthenticationBackend()
|
|
||||||
|
|
||||||
def get_userinfo_mocked(*args):
|
|
||||||
return {
|
|
||||||
"test": "123",
|
|
||||||
}
|
|
||||||
|
|
||||||
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
|
|
||||||
|
|
||||||
with (
|
|
||||||
django_assert_num_queries(0),
|
|
||||||
pytest.raises(
|
|
||||||
SuspiciousOperation,
|
|
||||||
match="User info contained no recognizable user identification",
|
|
||||||
),
|
|
||||||
):
|
|
||||||
klass.get_or_create_user(access_token="test-token", id_token=None, payload=None)
|
|
||||||
|
|
||||||
assert models.User.objects.exists() is False
|
|
||||||
|
|
||||||
|
|
||||||
@override_settings(OIDC_OP_USER_ENDPOINT="http://oidc.endpoint.test/userinfo")
|
@override_settings(OIDC_OP_USER_ENDPOINT="http://oidc.endpoint.test/userinfo")
|
||||||
@responses.activate
|
@responses.activate
|
||||||
def test_authentication_get_userinfo_json_response():
|
def test_authentication_get_userinfo_json_response():
|
||||||
@@ -341,7 +320,7 @@ def test_authentication_getter_existing_disabled_user_via_email(
|
|||||||
django_assert_num_queries, monkeypatch
|
django_assert_num_queries, monkeypatch
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
If an existing user does not matches the sub but matches the email and is disabled,
|
If an existing user does not match the sub but matches the email and is disabled,
|
||||||
an error should be raised and a user should not be created.
|
an error should be raised and a user should not be created.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@@ -367,85 +346,100 @@ def test_authentication_getter_existing_disabled_user_via_email(
|
|||||||
assert models.User.objects.count() == 1
|
assert models.User.objects.count() == 1
|
||||||
|
|
||||||
|
|
||||||
# Required claims
|
# Essential claims
|
||||||
|
|
||||||
|
|
||||||
@override_settings(
|
def test_authentication_verify_claims_default(django_assert_num_queries, monkeypatch):
|
||||||
OIDC_OP_USER_ENDPOINT="http://oidc.endpoint.test/userinfo",
|
"""The sub claim should be mandatory by default."""
|
||||||
USER_OIDC_REQUIRED_CLAIMS=["email", "sub", "address"],
|
klass = OIDCAuthenticationBackend()
|
||||||
)
|
|
||||||
@responses.activate
|
|
||||||
def test_authentication_get_userinfo_required_claims_missing():
|
|
||||||
"""Ensure SuspiciousOperation is raised if required claims are missing."""
|
|
||||||
|
|
||||||
responses.add(
|
def get_userinfo_mocked(*args):
|
||||||
responses.GET,
|
return {
|
||||||
re.compile(r".*/userinfo"),
|
"test": "123",
|
||||||
json={
|
}
|
||||||
"last_name": "Doe",
|
|
||||||
"email": "john.doe@example.com",
|
|
||||||
},
|
|
||||||
status=200,
|
|
||||||
)
|
|
||||||
|
|
||||||
oidc_backend = OIDCAuthenticationBackend()
|
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
|
||||||
|
|
||||||
with pytest.raises(
|
with (
|
||||||
SuspiciousOperation, match="Missing required claims in user info: sub, address"
|
django_assert_num_queries(0),
|
||||||
|
pytest.raises(
|
||||||
|
KeyError,
|
||||||
|
match="sub",
|
||||||
|
),
|
||||||
):
|
):
|
||||||
oidc_backend.get_userinfo("fake_access_token", None, None)
|
klass.get_or_create_user(access_token="test-token", id_token=None, payload=None)
|
||||||
|
|
||||||
|
assert models.User.objects.exists() is False
|
||||||
|
|
||||||
|
|
||||||
@override_settings(
|
@pytest.mark.parametrize(
|
||||||
OIDC_OP_USER_ENDPOINT="http://oidc.endpoint.test/userinfo",
|
"essential_claims, missing_claims",
|
||||||
USER_OIDC_REQUIRED_CLAIMS=["email", "Sub"],
|
[
|
||||||
|
(["email", "sub"], ["email"]),
|
||||||
|
(["Email", "sub"], ["Email"]), # Case sensitivity
|
||||||
|
],
|
||||||
)
|
)
|
||||||
@responses.activate
|
@override_settings(OIDC_OP_USER_ENDPOINT="http://oidc.endpoint.test/userinfo")
|
||||||
def test_authentication_get_userinfo_required_claims_case_sensitivity():
|
@mock.patch.object(Logger, "error")
|
||||||
"""Ensure the system respects case sensitivity for required claims."""
|
def test_authentication_verify_claims_essential_missing(
|
||||||
|
mock_logger,
|
||||||
|
essential_claims,
|
||||||
|
missing_claims,
|
||||||
|
django_assert_num_queries,
|
||||||
|
monkeypatch,
|
||||||
|
):
|
||||||
|
"""Ensure SuspiciousOperation is raised if essential claims are missing."""
|
||||||
|
|
||||||
responses.add(
|
klass = OIDCAuthenticationBackend()
|
||||||
responses.GET,
|
|
||||||
re.compile(r".*/userinfo"),
|
def get_userinfo_mocked(*args):
|
||||||
json={
|
return {
|
||||||
"sub": "123",
|
"sub": "123",
|
||||||
"last_name": "Doe",
|
"last_name": "Doe",
|
||||||
"email": "john.doe@example.com",
|
}
|
||||||
},
|
|
||||||
status=200,
|
|
||||||
)
|
|
||||||
|
|
||||||
oidc_backend = OIDCAuthenticationBackend()
|
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
|
||||||
|
|
||||||
with pytest.raises(
|
with (
|
||||||
SuspiciousOperation, match="Missing required claims in user info: Sub"
|
django_assert_num_queries(0),
|
||||||
|
pytest.raises(
|
||||||
|
SuspiciousOperation,
|
||||||
|
match="Claims verification failed",
|
||||||
|
),
|
||||||
|
override_settings(USER_OIDC_ESSENTIAL_CLAIMS=essential_claims),
|
||||||
):
|
):
|
||||||
oidc_backend.get_userinfo("fake_access_token", None, None)
|
klass.get_or_create_user(access_token="test-token", id_token=None, payload=None)
|
||||||
|
|
||||||
|
assert models.User.objects.exists() is False
|
||||||
|
mock_logger.assert_called_once_with("Missing essential claims: %s", missing_claims)
|
||||||
|
|
||||||
|
|
||||||
@override_settings(
|
@override_settings(
|
||||||
OIDC_OP_USER_ENDPOINT="http://oidc.endpoint.test/userinfo",
|
OIDC_OP_USER_ENDPOINT="http://oidc.endpoint.test/userinfo",
|
||||||
USER_OIDC_REQUIRED_CLAIMS=["email", "sub"],
|
USER_OIDC_ESSENTIAL_CLAIMS=["email", "last_name"],
|
||||||
)
|
)
|
||||||
@responses.activate
|
def test_authentication_verify_claims_success(django_assert_num_queries, monkeypatch):
|
||||||
def test_authentication_get_userinfo_required_claims_success():
|
"""Ensure user is authenticated when all essential claims are present."""
|
||||||
"""Ensure user is authenticated when required claims are present."""
|
|
||||||
|
|
||||||
responses.add(
|
klass = OIDCAuthenticationBackend()
|
||||||
responses.GET,
|
|
||||||
re.compile(r".*/userinfo"),
|
def get_userinfo_mocked(*args):
|
||||||
json={
|
return {
|
||||||
"sub": "123",
|
|
||||||
"last_name": "Doe",
|
|
||||||
"email": "john.doe@example.com",
|
"email": "john.doe@example.com",
|
||||||
},
|
"last_name": "Doe",
|
||||||
status=200,
|
"sub": "123",
|
||||||
)
|
}
|
||||||
|
|
||||||
oidc_backend = OIDCAuthenticationBackend()
|
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
|
||||||
result = oidc_backend.get_userinfo("fake_access_token", None, None)
|
|
||||||
|
|
||||||
assert result["sub"] == "123"
|
with django_assert_num_queries(6):
|
||||||
assert result.get("first_name") is None
|
user = klass.get_or_create_user(
|
||||||
assert result["last_name"] == "Doe"
|
access_token="test-token", id_token=None, payload=None
|
||||||
assert result["email"] == "john.doe@example.com"
|
)
|
||||||
|
|
||||||
|
assert models.User.objects.filter(id=user.id).exists()
|
||||||
|
|
||||||
|
assert user.sub == "123"
|
||||||
|
assert user.full_name == "Doe"
|
||||||
|
assert user.short_name is None
|
||||||
|
assert user.email == "john.doe@example.com"
|
||||||
|
|||||||
@@ -474,8 +474,8 @@ class Base(Configuration):
|
|||||||
environ_prefix=None,
|
environ_prefix=None,
|
||||||
)
|
)
|
||||||
|
|
||||||
USER_OIDC_REQUIRED_CLAIMS = values.ListValue(
|
USER_OIDC_ESSENTIAL_CLAIMS = values.ListValue(
|
||||||
default=[], environ_name="USER_OIDC_REQUIRED_CLAIMS", environ_prefix=None
|
default=[], environ_name="USER_OIDC_ESSENTIAL_CLAIMS", environ_prefix=None
|
||||||
)
|
)
|
||||||
USER_OIDC_FIELDS_TO_FULLNAME = values.ListValue(
|
USER_OIDC_FIELDS_TO_FULLNAME = values.ListValue(
|
||||||
default=["first_name", "last_name"],
|
default=["first_name", "last_name"],
|
||||||
|
|||||||
Reference in New Issue
Block a user