✨(models) make user and authentication work with Keycloak and admin
The admin was broken as we did not worry about it up to now. On the frontend we want to use OIDC authentication only but for the admin, it is better if the default authentication works as well. To allow this, we propose to add an "email" field to the user model and make it the identifier in place of the usual username. Some changes are necessary to make the "createsuperuser" management command work. We also had to fix the "oidc_user_getter" method to make it work with Keycloak. Some tests were added to secure that everything works as expected.
This commit is contained in:
committed by
Anthony LC
parent
e1688b923e
commit
8b026078bc
2
Makefile
2
Makefile
@@ -177,7 +177,7 @@ migrate: ## run django migrations for the people project.
|
||||
|
||||
superuser: ## Create an admin superuser with password "admin"
|
||||
@echo "$(BOLD)Creating a Django superuser$(RESET)"
|
||||
@$(MANAGE) createsuperuser --username admin --email admin@example.com --no-input
|
||||
@$(MANAGE) createsuperuser --email admin@example.com --password admin
|
||||
.PHONY: superuser
|
||||
|
||||
back-i18n-compile: ## compile the gettext files
|
||||
|
||||
@@ -124,6 +124,7 @@ class UserFactory(factory.django.DjangoModelFactory):
|
||||
class Meta:
|
||||
model = models.User
|
||||
|
||||
email = factory.Faker("email")
|
||||
language = factory.fuzzy.FuzzyChoice([lang[0] for lang in settings.LANGUAGES])
|
||||
password = make_password("password")
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
# Generated by Django 5.0 on 2023-12-31 17:11
|
||||
# Generated by Django 5.0 on 2024-01-14 13:41
|
||||
|
||||
import core.models
|
||||
import django.core.validators
|
||||
@@ -44,6 +44,7 @@ class Migration(migrations.Migration):
|
||||
('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')),
|
||||
('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created on')),
|
||||
('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated on')),
|
||||
('email', models.EmailField(blank=True, max_length=254, null=True, unique=True, verbose_name='email address')),
|
||||
('language', models.CharField(choices="(('en-us', 'English'), ('fr-fr', 'French'))", default='en-us', help_text='The language in which the user wants to see the interface.', max_length=10, verbose_name='language')),
|
||||
('timezone', timezone_field.fields.TimeZoneField(choices_display='WITH_GMT_OFFSET', default='UTC', help_text='The timezone in which the user wants to see times.', use_pytz=False)),
|
||||
('is_device', models.BooleanField(default=False, help_text='Whether the user is a device or a real user.', verbose_name='device')),
|
||||
@@ -92,7 +93,7 @@ class Migration(migrations.Migration):
|
||||
('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created on')),
|
||||
('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated on')),
|
||||
('sub', models.CharField(help_text='Required. 255 characters or fewer. Letters, numbers, and @/./+/-/_ characters only.', max_length=255, unique=True, validators=[django.core.validators.RegexValidator(message='Enter a valid sub. This value may contain only letters, numbers, and @/./+/-/_ characters.', regex='^[\\w.@+-]+\\Z')], verbose_name='sub')),
|
||||
('email', models.EmailField(max_length=254, verbose_name='email address')),
|
||||
('email', models.EmailField(blank=True, max_length=254, null=True, verbose_name='email address')),
|
||||
('is_main', models.BooleanField(default=False, help_text='Designates whether the email is the main one.', verbose_name='main')),
|
||||
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='identities', to=settings.AUTH_USER_MODEL)),
|
||||
],
|
||||
|
||||
@@ -159,6 +159,7 @@ class UserManager(auth_models.UserManager):
|
||||
class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
|
||||
"""User model to work with OIDC only authentication."""
|
||||
|
||||
email = models.EmailField(_("email address"), unique=True, null=True, blank=True)
|
||||
profile_contact = models.OneToOneField(
|
||||
Contact,
|
||||
on_delete=models.SET_NULL,
|
||||
@@ -200,7 +201,7 @@ class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
|
||||
|
||||
objects = UserManager()
|
||||
|
||||
USERNAME_FIELD = "id"
|
||||
USERNAME_FIELD = "email"
|
||||
REQUIRED_FIELDS = []
|
||||
|
||||
class Meta:
|
||||
@@ -209,7 +210,11 @@ class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
|
||||
verbose_name_plural = _("users")
|
||||
|
||||
def __str__(self):
|
||||
return str(self.profile_contact) if self.profile_contact else str(self.id)
|
||||
return (
|
||||
str(self.profile_contact)
|
||||
if self.profile_contact
|
||||
else self.email or str(self.id)
|
||||
)
|
||||
|
||||
def clean(self):
|
||||
"""Validate fields."""
|
||||
@@ -257,7 +262,7 @@ class Identity(BaseModel):
|
||||
unique=True,
|
||||
validators=[sub_validator],
|
||||
)
|
||||
email = models.EmailField(_("email address"))
|
||||
email = models.EmailField(_("email address"), null=True, blank=True)
|
||||
is_main = models.BooleanField(
|
||||
_("main"),
|
||||
default=False,
|
||||
@@ -286,7 +291,8 @@ class Identity(BaseModel):
|
||||
|
||||
def clean(self):
|
||||
"""Normalize the email field and clean the 'is_main' field."""
|
||||
self.email = User.objects.normalize_email(self.email)
|
||||
if self.email:
|
||||
self.email = User.objects.normalize_email(self.email)
|
||||
if not self.user.identities.exclude(pk=self.pk).filter(is_main=True).exists():
|
||||
if not self.created_at:
|
||||
self.is_main = True
|
||||
@@ -452,20 +458,21 @@ def oidc_user_getter(validated_token):
|
||||
) from exc
|
||||
|
||||
try:
|
||||
user = User.objects.select_related("profile_contact").get(
|
||||
**{api_settings.USER_ID_FIELD: user_id}
|
||||
)
|
||||
except User.DoesNotExist:
|
||||
contact = Contact.objects.create()
|
||||
user = User.objects.create(
|
||||
**{api_settings.USER_ID_FIELD: user_id}, profile_contact=contact
|
||||
)
|
||||
email_param = {"email": validated_token["email"]}
|
||||
except KeyError:
|
||||
email_param = {}
|
||||
|
||||
# If the identity in the token is seen for the first time, make it the main email.
|
||||
# Otherwise, update the email and respect the main identity set by the user
|
||||
if email := validated_token["email"]:
|
||||
Identity.objects.update_or_create(
|
||||
user=user, email=email, create_defaults={"is_main": True}
|
||||
)
|
||||
user = (
|
||||
User.objects.filter(identities__sub=user_id)
|
||||
.annotate(identity_email=models.F("identities__email"))
|
||||
.distinct()
|
||||
.first()
|
||||
)
|
||||
|
||||
if user is None:
|
||||
user = User.objects.create(password="!", **email_param) # noqa: S106
|
||||
Identity.objects.create(user=user, sub=user_id, **email_param)
|
||||
elif email_param and validated_token["email"] != user.identity_email:
|
||||
Identity.objects.filter(sub=user_id).update(email=validated_token["email"])
|
||||
|
||||
return user
|
||||
|
||||
@@ -69,11 +69,10 @@ def test_models_identities_is_main_switch():
|
||||
assert first_identity.is_main is False
|
||||
|
||||
|
||||
def test_models_identities_email_required():
|
||||
"""The "email" field is required."""
|
||||
def test_models_identities_email_not_required():
|
||||
"""The "email" field can be blank."""
|
||||
user = factories.UserFactory()
|
||||
with pytest.raises(ValidationError, match="This field cannot be null."):
|
||||
models.Identity.objects.create(user=user, email=None)
|
||||
models.Identity.objects.create(user=user, sub="123", email=None)
|
||||
|
||||
|
||||
def test_models_identities_user_required():
|
||||
|
||||
107
src/backend/core/tests/test_models_oidc_user_getter.py
Normal file
107
src/backend/core/tests/test_models_oidc_user_getter.py
Normal file
@@ -0,0 +1,107 @@
|
||||
"""Unit tests for the `oidc_user_getter` function."""
|
||||
import pytest
|
||||
from rest_framework_simplejwt.exceptions import InvalidToken
|
||||
from rest_framework_simplejwt.tokens import AccessToken
|
||||
|
||||
from core import factories, models
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
|
||||
def test_models_oidc_user_getter_exsting_user_no_email(django_assert_num_queries):
|
||||
"""
|
||||
When a valid token is passed, an existing user matching the token's sub should be returned.
|
||||
"""
|
||||
identity = factories.IdentityFactory()
|
||||
factories.IdentityFactory(user=identity.user) # another identity for the user
|
||||
token = AccessToken()
|
||||
token["sub"] = str(identity.sub)
|
||||
|
||||
with django_assert_num_queries(1):
|
||||
user = models.oidc_user_getter(token)
|
||||
|
||||
identity.refresh_from_db()
|
||||
assert user == identity.user
|
||||
|
||||
|
||||
def test_models_oidc_user_getter_existing_user_with_email(django_assert_num_queries):
|
||||
"""
|
||||
When the valid token passed contains an email and targets an existing user,
|
||||
it should update the email on the identity but not on the user.
|
||||
"""
|
||||
identity = factories.IdentityFactory()
|
||||
factories.IdentityFactory(user=identity.user) # another identity for the user
|
||||
user_email = identity.user.email
|
||||
assert models.User.objects.count() == 1
|
||||
|
||||
token = AccessToken()
|
||||
token["sub"] = str(identity.sub)
|
||||
|
||||
# Only 1 query if the email has not changed
|
||||
token["email"] = identity.email
|
||||
with django_assert_num_queries(1):
|
||||
user = models.oidc_user_getter(token)
|
||||
|
||||
# Additional update query if the email has changed
|
||||
new_email = "people@example.com"
|
||||
token["email"] = new_email
|
||||
with django_assert_num_queries(2):
|
||||
user = models.oidc_user_getter(token)
|
||||
|
||||
identity.refresh_from_db()
|
||||
assert identity.email == new_email
|
||||
|
||||
assert models.User.objects.count() == 1
|
||||
assert user == identity.user
|
||||
assert user.email == user_email
|
||||
|
||||
|
||||
def test_models_oidc_user_getter_new_user_no_email():
|
||||
"""
|
||||
When a valid token is passed, a user should be created if the sub
|
||||
does not match any existing user.
|
||||
"""
|
||||
token = AccessToken()
|
||||
token["sub"] = "123"
|
||||
|
||||
user = models.oidc_user_getter(token)
|
||||
|
||||
identity = user.identities.get()
|
||||
assert identity.sub == "123"
|
||||
assert identity.email is None
|
||||
|
||||
assert user.email is None
|
||||
assert user.password == "!"
|
||||
assert models.User.objects.count() == 1
|
||||
|
||||
|
||||
def test_models_oidc_user_getter_new_user_with_email():
|
||||
"""
|
||||
When the valid token passed contains an email and a new user is created,
|
||||
the email should be set on the user and on the identity.
|
||||
"""
|
||||
email = "people@example.com"
|
||||
token = AccessToken()
|
||||
token["sub"] = "123"
|
||||
token["email"] = email
|
||||
|
||||
user = models.oidc_user_getter(token)
|
||||
|
||||
identity = user.identities.get()
|
||||
assert identity.sub == "123"
|
||||
assert identity.email == email
|
||||
|
||||
assert user.email == email
|
||||
assert models.User.objects.count() == 1
|
||||
|
||||
|
||||
def test_models_oidc_user_getter_invalid_token(django_assert_num_queries):
|
||||
"""The token passed in argument should contain the configured user id claim."""
|
||||
token = AccessToken()
|
||||
|
||||
with django_assert_num_queries(0), pytest.raises(
|
||||
InvalidToken, match="Token contained no recognizable user identification"
|
||||
):
|
||||
models.oidc_user_getter(token)
|
||||
|
||||
assert models.User.objects.exists() is False
|
||||
@@ -29,6 +29,23 @@ def test_models_users_id_unique():
|
||||
factories.UserFactory(id=user.id)
|
||||
|
||||
|
||||
def test_models_users_email_unique():
|
||||
"""The "email" field should be unique except for the null value."""
|
||||
user = factories.UserFactory()
|
||||
with pytest.raises(
|
||||
ValidationError, match="User with this Email address already exists."
|
||||
):
|
||||
factories.UserFactory(email=user.email)
|
||||
|
||||
|
||||
def test_models_users_email_several_null():
|
||||
"""Several users with a null value for the "email" field can co-exist."""
|
||||
factories.UserFactory(email=None)
|
||||
factories.UserFactory(email=None)
|
||||
|
||||
assert models.User.objects.filter(email__isnull=True).count() == 2
|
||||
|
||||
|
||||
def test_models_users_profile_not_owned():
|
||||
"""A user cannot declare as profile a contact that not is owned."""
|
||||
user = factories.UserFactory()
|
||||
|
||||
@@ -7,8 +7,10 @@ class OIDCToken(AccessToken):
|
||||
|
||||
@classmethod
|
||||
def for_user(cls, user):
|
||||
token = super().for_user(user)
|
||||
"""Returns an authorization token for the given user for testing."""
|
||||
identity = user.identities.filter(is_main=True).first()
|
||||
|
||||
token = cls()
|
||||
token["first_name"] = (
|
||||
user.profile_contact.short_name if user.profile_contact else "David"
|
||||
)
|
||||
@@ -17,5 +19,7 @@ class OIDCToken(AccessToken):
|
||||
if user.profile_contact
|
||||
else "Bowman"
|
||||
)
|
||||
token["email"] = identity.email
|
||||
token["sub"] = str(identity.sub)
|
||||
token["email"] = user.email
|
||||
|
||||
return token
|
||||
|
||||
10
src/backend/core/tokens.py
Normal file
10
src/backend/core/tokens.py
Normal file
@@ -0,0 +1,10 @@
|
||||
"""Tokens for Magnify's core app."""
|
||||
from rest_framework_simplejwt.settings import api_settings
|
||||
from rest_framework_simplejwt.tokens import Token
|
||||
|
||||
|
||||
class BearerToken(Token):
|
||||
"""Bearer token as emitted by Keycloak OIDC for example."""
|
||||
|
||||
token_type = "Bearer" # noqa: S105
|
||||
lifetime = api_settings.ACCESS_TOKEN_LIFETIME
|
||||
@@ -239,17 +239,28 @@ class Base(Configuration):
|
||||
"REDOC_DIST": "SIDECAR",
|
||||
}
|
||||
|
||||
# Simple JWT
|
||||
SIMPLE_JWT = {
|
||||
"ALGORITHM": values.Value("HS256", environ_name="JWT_ALGORITHM"),
|
||||
"SIGNING_KEY": values.SecretValue(
|
||||
environ_name="JWT_PRIVATE_SIGNING_KEY",
|
||||
"ALGORITHM": values.Value(
|
||||
"RS256", environ_name="SIMPLE_JWT_ALGORITHM", environ_prefix=None
|
||||
),
|
||||
"JWK_URL": values.Value(
|
||||
None, environ_name="SIMPLE_JWT_JWK_URL", environ_prefix=None
|
||||
),
|
||||
"SIGNING_KEY": values.Value(
|
||||
None, environ_name="SIMPLE_JWT_SIGNING_KEY", environ_prefix=None
|
||||
),
|
||||
"VERIFYING_KEY": values.Value(
|
||||
None, environ_name="SIMPLE_JWT_VERIFYING_KEY", environ_prefix=None
|
||||
),
|
||||
"AUTH_HEADER_TYPES": ("Bearer",),
|
||||
"AUTH_HEADER_NAME": "HTTP_AUTHORIZATION",
|
||||
"USER_ID_FIELD": "id",
|
||||
"TOKEN_TYPE_CLAIM": "typ",
|
||||
"USER_ID_FIELD": "sub",
|
||||
"USER_ID_CLAIM": "sub",
|
||||
"AUTH_TOKEN_CLASSES": ("rest_framework_simplejwt.tokens.AccessToken",),
|
||||
"AUTH_TOKEN_CLASSES": ("core.tokens.BearerToken",),
|
||||
}
|
||||
|
||||
JWT_USER_GETTER = values.Value(
|
||||
"core.models.oidc_user_getter",
|
||||
environ_name="PEOPLE_JWT_USER_GETTER",
|
||||
@@ -380,6 +391,10 @@ class Development(Base):
|
||||
class Test(Base):
|
||||
"""Test environment settings"""
|
||||
|
||||
SIMPLE_JWT = {
|
||||
"USER_ID_FIELD": "sub",
|
||||
"USER_ID_CLAIM": "sub",
|
||||
}
|
||||
LOGGING = values.DictValue(
|
||||
{
|
||||
"version": 1,
|
||||
@@ -413,10 +428,6 @@ class Test(Base):
|
||||
|
||||
CELERY_TASK_ALWAYS_EAGER = values.BooleanValue(True)
|
||||
|
||||
def __init__(self):
|
||||
# pylint: disable=invalid-name
|
||||
self.INSTALLED_APPS += ["drf_spectacular_sidecar"]
|
||||
|
||||
|
||||
class ContinuousIntegration(Test):
|
||||
"""
|
||||
|
||||
@@ -35,7 +35,7 @@ dependencies = [
|
||||
"django-storages==1.14.2",
|
||||
"django-timezone-field>=5.1",
|
||||
"django==5.0",
|
||||
"djangorestframework-simplejwt==5.3.0",
|
||||
"djangorestframework-simplejwt[crypto]==5.3.0",
|
||||
"djangorestframework==3.14.0",
|
||||
"drf_spectacular==0.26.5",
|
||||
"dockerflow==2022.8.0",
|
||||
|
||||
Reference in New Issue
Block a user