🛂(backend) match email if no existing user matches the sub
Some OIDC identity providers may provide a random value in the "sub" field instead of an identifying ID. In this case, it may be a good idea to fallback to matching the user on its email field.
This commit is contained in:
committed by
Samuel Paccoud
parent
647e6c1cf5
commit
ff7914f6d3
@@ -25,6 +25,7 @@ and this project adheres to
|
|||||||
|
|
||||||
## Fixed
|
## Fixed
|
||||||
|
|
||||||
|
- 🛂(frontend) match email if no existing user matches the sub
|
||||||
- 🐛(backend) gitlab oicd userinfo endpoint #232
|
- 🐛(backend) gitlab oicd userinfo endpoint #232
|
||||||
- 🛂(frontend) redirect to the OIDC when private doc and unauthentified #292
|
- 🛂(frontend) redirect to the OIDC when private doc and unauthentified #292
|
||||||
- ♻️(backend) getting list of document versions available for a user #258
|
- ♻️(backend) getting list of document versions available for a user #258
|
||||||
|
|||||||
@@ -60,57 +60,61 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
|
|||||||
return userinfo
|
return userinfo
|
||||||
|
|
||||||
def get_or_create_user(self, access_token, id_token, payload):
|
def get_or_create_user(self, access_token, id_token, payload):
|
||||||
"""Return a User based on userinfo. Get or create a new user if no user matches the Sub.
|
"""Return a User based on userinfo. Create a new user if no match is found."""
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- access_token (str): The access token.
|
|
||||||
- id_token (str): The ID token.
|
|
||||||
- payload (dict): The user payload.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
- User: An existing or newly created User instance.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
- Exception: Raised when user creation is not allowed and no existing user is found.
|
|
||||||
"""
|
|
||||||
|
|
||||||
user_info = self.get_userinfo(access_token, id_token, payload)
|
user_info = self.get_userinfo(access_token, id_token, payload)
|
||||||
|
email = user_info.get("email")
|
||||||
|
|
||||||
|
# Get user's full name from OIDC fields defined in settings
|
||||||
|
full_name = self.compute_full_name(user_info)
|
||||||
|
short_name = user_info.get(settings.USER_OIDC_FIELD_TO_SHORTNAME)
|
||||||
|
|
||||||
# Compute user full name from OIDC name fields as defined in settings
|
|
||||||
names_list = [
|
|
||||||
user_info[field]
|
|
||||||
for field in settings.USER_OIDC_FIELDS_TO_FULLNAME
|
|
||||||
if user_info.get(field)
|
|
||||||
]
|
|
||||||
claims = {
|
claims = {
|
||||||
"email": user_info.get("email"),
|
"email": email,
|
||||||
"full_name": " ".join(names_list) or None,
|
"full_name": full_name,
|
||||||
"short_name": user_info.get(settings.USER_OIDC_FIELD_TO_SHORTNAME),
|
"short_name": short_name,
|
||||||
}
|
}
|
||||||
|
|
||||||
sub = user_info.get("sub")
|
sub = user_info.get("sub")
|
||||||
if sub is None:
|
if not sub:
|
||||||
raise SuspiciousOperation(
|
raise SuspiciousOperation(
|
||||||
_("User info contained no recognizable user identification")
|
_("User info contained no recognizable user identification")
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
user = self.get_existing_user(sub, email)
|
||||||
user = User.objects.get(sub=sub, is_active=True)
|
|
||||||
except User.DoesNotExist:
|
if user:
|
||||||
if self.get_settings("OIDC_CREATE_USER", True):
|
self.update_user_if_needed(user, claims)
|
||||||
user = User.objects.create(
|
elif self.get_settings("OIDC_CREATE_USER", True):
|
||||||
sub=sub,
|
user = User.objects.create(sub=sub, password="!", **claims) # noqa: S106
|
||||||
password="!", # noqa: S106
|
|
||||||
**claims,
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
user = None
|
|
||||||
else:
|
|
||||||
has_changed = any(
|
|
||||||
value and value != getattr(user, key) for key, value in claims.items()
|
|
||||||
)
|
|
||||||
if has_changed:
|
|
||||||
updated_claims = {key: value for key, value in claims.items() if value}
|
|
||||||
self.UserModel.objects.filter(sub=sub).update(**updated_claims)
|
|
||||||
|
|
||||||
return user
|
return user
|
||||||
|
|
||||||
|
def compute_full_name(self, user_info):
|
||||||
|
"""Compute user's full name based on OIDC fields in settings."""
|
||||||
|
name_fields = settings.USER_OIDC_FIELDS_TO_FULLNAME
|
||||||
|
full_name = " ".join(
|
||||||
|
user_info[field] for field in name_fields if user_info.get(field)
|
||||||
|
)
|
||||||
|
return full_name or None
|
||||||
|
|
||||||
|
def get_existing_user(self, sub, email):
|
||||||
|
"""Fetch existing user by sub or email."""
|
||||||
|
try:
|
||||||
|
return User.objects.get(sub=sub, is_active=True)
|
||||||
|
except User.DoesNotExist:
|
||||||
|
if email and settings.OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION:
|
||||||
|
try:
|
||||||
|
return User.objects.get(email=email, is_active=True)
|
||||||
|
except User.DoesNotExist:
|
||||||
|
pass
|
||||||
|
return None
|
||||||
|
|
||||||
|
def update_user_if_needed(self, user, claims):
|
||||||
|
"""Update user claims if they have changed."""
|
||||||
|
has_changed = any(
|
||||||
|
value and value != getattr(user, key) for key, value in claims.items()
|
||||||
|
)
|
||||||
|
if has_changed:
|
||||||
|
updated_claims = {key: value for key, value in claims.items() if value}
|
||||||
|
self.UserModel.objects.filter(sub=user.sub).update(**updated_claims)
|
||||||
|
|||||||
@@ -38,6 +38,59 @@ def test_authentication_getter_existing_user_no_email(
|
|||||||
assert user == db_user
|
assert user == db_user
|
||||||
|
|
||||||
|
|
||||||
|
def test_authentication_getter_existing_user_via_email(
|
||||||
|
django_assert_num_queries, monkeypatch
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
If an existing user doesn't match the sub but matches the email,
|
||||||
|
the user should be returned.
|
||||||
|
"""
|
||||||
|
|
||||||
|
klass = OIDCAuthenticationBackend()
|
||||||
|
db_user = UserFactory()
|
||||||
|
|
||||||
|
def get_userinfo_mocked(*args):
|
||||||
|
return {"sub": "123", "email": db_user.email}
|
||||||
|
|
||||||
|
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
|
||||||
|
|
||||||
|
with django_assert_num_queries(2):
|
||||||
|
user = klass.get_or_create_user(
|
||||||
|
access_token="test-token", id_token=None, payload=None
|
||||||
|
)
|
||||||
|
|
||||||
|
assert user == db_user
|
||||||
|
|
||||||
|
|
||||||
|
def test_authentication_getter_existing_user_no_fallback_to_email(
|
||||||
|
settings, monkeypatch
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
When the "OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION" setting is set to False,
|
||||||
|
the system should not match users by email, even if the email matches.
|
||||||
|
"""
|
||||||
|
|
||||||
|
klass = OIDCAuthenticationBackend()
|
||||||
|
db_user = UserFactory()
|
||||||
|
|
||||||
|
# Set the setting to False
|
||||||
|
settings.OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION = False
|
||||||
|
|
||||||
|
def get_userinfo_mocked(*args):
|
||||||
|
return {"sub": "123", "email": db_user.email}
|
||||||
|
|
||||||
|
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
|
||||||
|
|
||||||
|
user = klass.get_or_create_user(
|
||||||
|
access_token="test-token", id_token=None, payload=None
|
||||||
|
)
|
||||||
|
|
||||||
|
# Since the sub doesn't match, it should create a new user
|
||||||
|
assert models.User.objects.count() == 2
|
||||||
|
assert user != db_user
|
||||||
|
assert user.sub == "123"
|
||||||
|
|
||||||
|
|
||||||
def test_authentication_getter_existing_user_with_email(
|
def test_authentication_getter_existing_user_with_email(
|
||||||
django_assert_num_queries, monkeypatch
|
django_assert_num_queries, monkeypatch
|
||||||
):
|
):
|
||||||
|
|||||||
@@ -384,6 +384,12 @@ class Base(Configuration):
|
|||||||
OIDC_STORE_ID_TOKEN = values.BooleanValue(
|
OIDC_STORE_ID_TOKEN = values.BooleanValue(
|
||||||
default=True, environ_name="OIDC_STORE_ID_TOKEN", environ_prefix=None
|
default=True, environ_name="OIDC_STORE_ID_TOKEN", environ_prefix=None
|
||||||
)
|
)
|
||||||
|
OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION = values.BooleanValue(
|
||||||
|
default=True,
|
||||||
|
environ_name="OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION",
|
||||||
|
environ_prefix=None,
|
||||||
|
)
|
||||||
|
|
||||||
ALLOW_LOGOUT_GET_METHOD = values.BooleanValue(
|
ALLOW_LOGOUT_GET_METHOD = values.BooleanValue(
|
||||||
default=True, environ_name="ALLOW_LOGOUT_GET_METHOD", environ_prefix=None
|
default=True, environ_name="ALLOW_LOGOUT_GET_METHOD", environ_prefix=None
|
||||||
)
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user