♻️(models) refactor user email fields

The email field on the user is renamed to "admin_email" for clarity. The
"email" and "name" fields of user's main identity are made available on
the user model so it is easier to access it.
This commit is contained in:
Samuel Paccoud - DINUM
2024-03-25 23:56:32 +01:00
committed by Marie
parent 6d807113bc
commit 7ea6342a01
12 changed files with 113 additions and 83 deletions

View File

@@ -183,7 +183,7 @@ migrate: ## run django migrations for the people project.
superuser: ## Create an admin superuser with password "admin" superuser: ## Create an admin superuser with password "admin"
@echo "$(BOLD)Creating a Django superuser$(RESET)" @echo "$(BOLD)Creating a Django superuser$(RESET)"
@$(MANAGE) createsuperuser --email admin@example.com --password admin @$(MANAGE) createsuperuser --admin_email admin@example.com --password admin
.PHONY: superuser .PHONY: superuser
back-i18n-compile: ## compile the gettext files back-i18n-compile: ## compile the gettext files

View File

@@ -67,7 +67,7 @@ class UserAdmin(auth_admin.UserAdmin):
) )
}, },
), ),
(_("Personal info"), {"fields": ("email", "language", "timezone")}), (_("Personal info"), {"fields": ("admin_email", "language", "timezone")}),
( (
_("Permissions"), _("Permissions"),
{ {
@@ -88,13 +88,13 @@ class UserAdmin(auth_admin.UserAdmin):
None, None,
{ {
"classes": ("wide",), "classes": ("wide",),
"fields": ("email", "password1", "password2"), "fields": ("admin_email", "password1", "password2"),
}, },
), ),
) )
inlines = (IdentityInline, TeamAccessInline) inlines = (IdentityInline, TeamAccessInline)
list_display = ( list_display = (
"email", "admin_email",
"created_at", "created_at",
"updated_at", "updated_at",
"is_active", "is_active",
@@ -105,7 +105,7 @@ class UserAdmin(auth_admin.UserAdmin):
list_filter = ("is_staff", "is_superuser", "is_device", "is_active") list_filter = ("is_staff", "is_superuser", "is_device", "is_active")
ordering = ("is_active", "-is_superuser", "-is_staff", "-is_device", "-updated_at") ordering = ("is_active", "-is_superuser", "-is_staff", "-is_device", "-updated_at")
readonly_fields = ("id", "created_at", "updated_at") readonly_fields = ("id", "created_at", "updated_at")
search_fields = ("id", "email", "identities__sub", "identities__email") search_fields = ("id", "admin_email", "identities__sub", "identities__email")
@admin.register(models.Team) @admin.register(models.Team)

View File

@@ -52,42 +52,22 @@ class UserSerializer(DynamicFieldsModelSerializer):
"""Serialize users.""" """Serialize users."""
timezone = TimeZoneSerializerField(use_pytz=False, required=True) timezone = TimeZoneSerializerField(use_pytz=False, required=True)
name = serializers.SerializerMethodField(read_only=True) email = serializers.ReadOnlyField()
email = serializers.SerializerMethodField(read_only=True) name = serializers.ReadOnlyField()
class Meta: class Meta:
model = models.User model = models.User
fields = [ fields = [
"id", "id",
"name",
"email", "email",
"language", "language",
"name",
"timezone", "timezone",
"is_device", "is_device",
"is_staff", "is_staff",
] ]
read_only_fields = ["id", "name", "email", "is_device", "is_staff"] read_only_fields = ["id", "name", "email", "is_device", "is_staff"]
def _get_main_identity_attr(self, obj, attribute_name):
"""Return the specified attribute of the main identity."""
try:
return getattr(obj.main_identity[0], attribute_name)
except TypeError:
return getattr(obj.main_identity, attribute_name)
except IndexError:
main_identity = obj.identities.filter(is_main=True).first()
return getattr(obj.main_identity, attribute_name) if main_identity else None
except AttributeError:
return None
def get_name(self, obj):
"""Return main identity's name."""
return self._get_main_identity_attr(obj, "name")
def get_email(self, obj):
"""Return main identity's email."""
return self._get_main_identity_attr(obj, "email")
class TeamAccessSerializer(serializers.ModelSerializer): class TeamAccessSerializer(serializers.ModelSerializer):
"""Serialize team accesses.""" """Serialize team accesses."""

View File

@@ -202,7 +202,7 @@ class UserViewSet(
Prefetch( Prefetch(
"identities", "identities",
queryset=models.Identity.objects.filter(is_main=True), queryset=models.Identity.objects.filter(is_main=True),
to_attr="main_identity", to_attr="_identities_main",
) )
) )
@@ -245,9 +245,6 @@ class UserViewSet(
Return information on currently logged user Return information on currently logged user
""" """
user = request.user user = request.user
user.main_identity = models.Identity.objects.filter(
user=user, is_main=True
).first()
return response.Response( return response.Response(
self.serializer_class(user, context={"request": request}).data self.serializer_class(user, context={"request": request}).data
) )
@@ -378,7 +375,7 @@ class TeamAccessViewSet(
Prefetch( Prefetch(
"user__identities", "user__identities",
queryset=models.Identity.objects.filter(is_main=True), queryset=models.Identity.objects.filter(is_main=True),
to_attr="main_identity", to_attr="_identities_main",
) )
) )
# Abilities are computed based on logged-in user's role and # Abilities are computed based on logged-in user's role and

View File

@@ -120,13 +120,13 @@ class ContactFactory(BaseContactFactory):
class UserFactory(factory.django.DjangoModelFactory): class UserFactory(factory.django.DjangoModelFactory):
"""A factory to random users for testing purposes.""" """A factory to create random users for testing purposes."""
class Meta: class Meta:
model = models.User model = models.User
django_get_or_create = ("email",) django_get_or_create = ("admin_email",)
email = factory.Faker("email") admin_email = factory.Faker("email")
language = factory.fuzzy.FuzzyChoice([lang[0] for lang in settings.LANGUAGES]) language = factory.fuzzy.FuzzyChoice([lang[0] for lang in settings.LANGUAGES])
password = make_password("password") password = make_password("password")

View File

@@ -74,7 +74,7 @@ class BaseModel(models.Model):
def save(self, *args, **kwargs): def save(self, *args, **kwargs):
"""Call `full_clean` before saving.""" """Call `full_clean` before saving."""
self.full_clean() self.full_clean()
super().save(*args, **kwargs) return super().save(*args, **kwargs)
class Contact(BaseModel): class Contact(BaseModel):
@@ -157,7 +157,9 @@ class Contact(BaseModel):
class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin): class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
"""User model to work with OIDC only authentication.""" """User model to work with OIDC only authentication."""
email = models.EmailField(_("email address"), unique=True, null=True, blank=True) admin_email = models.EmailField(
_("admin email address"), unique=True, null=True, blank=True
)
profile_contact = models.OneToOneField( profile_contact = models.OneToOneField(
Contact, Contact,
on_delete=models.SET_NULL, on_delete=models.SET_NULL,
@@ -199,7 +201,7 @@ class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
objects = auth_models.UserManager() objects = auth_models.UserManager()
USERNAME_FIELD = "email" USERNAME_FIELD = "admin_email"
REQUIRED_FIELDS = [] REQUIRED_FIELDS = []
class Meta: class Meta:
@@ -211,9 +213,32 @@ class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
return ( return (
str(self.profile_contact) str(self.profile_contact)
if self.profile_contact if self.profile_contact
else self.email or str(self.id) else self.admin_email or str(self.id)
) )
def _get_identities_main(self):
"""Return a list with the main identity or an empty list."""
try:
return self._identities_main
except AttributeError:
return self.identities.filter(is_main=True)
@property
def name(self):
"""Return main identity's name."""
try:
return self._get_identities_main()[0].name
except IndexError:
return None
@property
def email(self):
"""Return main identity's email."""
try:
return self._get_identities_main()[0].email
except IndexError:
return None
def clean(self): def clean(self):
"""Validate fields.""" """Validate fields."""
super().clean() super().clean()
@@ -225,8 +250,10 @@ class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
def email_user(self, subject, message, from_email=None, **kwargs): def email_user(self, subject, message, from_email=None, **kwargs):
"""Email this user.""" """Email this user."""
main_identity = self.identities.get(is_main=True) email = self.email or self.admin_email
mail.send_mail(subject, message, from_email, [main_identity.email], **kwargs) if not email:
raise ValueError("You must first set an email for the user.")
mail.send_mail(subject, message, from_email, [email], **kwargs)
@classmethod @classmethod
def get_email_field_name(cls): def get_email_field_name(cls):
@@ -368,7 +395,7 @@ class Team(BaseModel):
return self.name return self.name
def save(self, *args, **kwargs): def save(self, *args, **kwargs):
"""Overriding save function to compute the slug.""" """Override save function to compute the slug."""
self.slug = self.get_slug() self.slug = self.get_slug()
return super().save(*args, **kwargs) return super().save(*args, **kwargs)

View File

@@ -53,8 +53,8 @@ def test_api_users_authenticated_list_by_email():
Authenticated users should be able to search users with a case-insensitive and Authenticated users should be able to search users with a case-insensitive and
partial query on the email. partial query on the email.
""" """
user = factories.UserFactory(email="tester@ministry.fr") user = factories.UserFactory(admin_email="tester@ministry.fr")
factories.IdentityFactory(user=user, email=user.email, name="john doe") factories.IdentityFactory(user=user, email=user.admin_email, name="john doe")
client = APIClient() client = APIClient()
client.force_login(user) client.force_login(user)
@@ -125,8 +125,8 @@ def test_api_users_authenticated_list_by_name():
Authenticated users should be able to search users with a case-insensitive and Authenticated users should be able to search users with a case-insensitive and
partial query on the name. partial query on the name.
""" """
user = factories.UserFactory(email="tester@ministry.fr") user = factories.UserFactory(admin_email="tester@ministry.fr")
factories.IdentityFactory(user=user, email=user.email, name="john doe") factories.IdentityFactory(user=user, email=user.admin_email, name="john doe")
client = APIClient() client = APIClient()
client.force_login(user) client.force_login(user)
@@ -192,8 +192,8 @@ def test_api_users_authenticated_list_by_name_and_email():
partial query on the name and email. partial query on the name and email.
""" """
user = factories.UserFactory(email="tester@ministry.fr") user = factories.UserFactory(admin_email="tester@ministry.fr")
factories.IdentityFactory(user=user, email=user.email, name="john doe") factories.IdentityFactory(user=user, email=user.admin_email, name="john doe")
client = APIClient() client = APIClient()
client.force_login(user) client.force_login(user)
@@ -225,7 +225,7 @@ def test_api_users_authenticated_list_exclude_users_already_in_team(
Authenticated users should be able to search users Authenticated users should be able to search users
but the result should exclude all users already in the given team. but the result should exclude all users already in the given team.
""" """
user = factories.UserFactory(email="tester@ministry.fr") user = factories.UserFactory(admin_email="tester@ministry.fr")
factories.IdentityFactory(user=user, email=user.email, name="john doe") factories.IdentityFactory(user=user, email=user.email, name="john doe")
client = APIClient() client = APIClient()
client.force_login(user) client.force_login(user)
@@ -280,8 +280,8 @@ def test_api_users_authenticated_list_multiple_identities_single_user():
""" """
User with multiple identities should appear only once in results. User with multiple identities should appear only once in results.
""" """
user = factories.UserFactory(email="tester@ministry.fr") user = factories.UserFactory(admin_email="tester@ministry.fr")
factories.IdentityFactory(user=user, email=user.email, name="eva karl") factories.IdentityFactory(user=user, email=user.admin_email, name="eva karl")
client = APIClient() client = APIClient()
client.force_login(user) client.force_login(user)
@@ -308,8 +308,8 @@ def test_api_users_authenticated_list_multiple_identities_multiple_users():
User with multiple identities should be ranked User with multiple identities should be ranked
on their best matching identity. on their best matching identity.
""" """
user = factories.UserFactory(email="tester@ministry.fr") user = factories.UserFactory(admin_email="tester@ministry.fr")
factories.IdentityFactory(user=user, email=user.email, name="john doe") factories.IdentityFactory(user=user, email=user.admin_email, name="john doe")
client = APIClient() client = APIClient()
client.force_login(user) client.force_login(user)
@@ -368,8 +368,8 @@ def test_api_users_authenticated_list_multiple_identities_multiple_users():
def test_api_users_authenticated_list_uppercase_content(): def test_api_users_authenticated_list_uppercase_content():
"""Upper case content should be found by lower case query.""" """Upper case content should be found by lower case query."""
user = factories.UserFactory(email="tester@ministry.fr") user = factories.UserFactory(admin_email="tester@ministry.fr")
factories.IdentityFactory(user=user, email=user.email, name="eva karl") factories.IdentityFactory(user=user, email=user.admin_email, name="eva karl")
client = APIClient() client = APIClient()
client.force_login(user) client.force_login(user)
@@ -399,8 +399,8 @@ def test_api_users_authenticated_list_uppercase_content():
def test_api_users_list_authenticated_capital_query(): def test_api_users_list_authenticated_capital_query():
"""Upper case query should find lower case content.""" """Upper case query should find lower case content."""
user = factories.UserFactory(email="tester@ministry.fr") user = factories.UserFactory(admin_email="tester@ministry.fr")
factories.IdentityFactory(user=user, email=user.email, name="eva karl") factories.IdentityFactory(user=user, email=user.admin_email, name="eva karl")
client = APIClient() client = APIClient()
client.force_login(user) client.force_login(user)
@@ -428,8 +428,8 @@ def test_api_users_list_authenticated_capital_query():
def test_api_contacts_list_authenticated_accented_query(): def test_api_contacts_list_authenticated_accented_query():
"""Accented content should be found by unaccented query.""" """Accented content should be found by unaccented query."""
user = factories.UserFactory(email="tester@ministry.fr") user = factories.UserFactory(admin_email="tester@ministry.fr")
factories.IdentityFactory(user=user, email=user.email, name="john doe") factories.IdentityFactory(user=user, email=user.admin_email, name="john doe")
client = APIClient() client = APIClient()
client.force_login(user) client.force_login(user)
@@ -510,7 +510,7 @@ def test_api_users_list_pagination_page_size(
client.force_login(user) client.force_login(user)
for i in range(page_size): for i in range(page_size):
factories.UserFactory.create(email=f"user-{i}@people.com") factories.UserFactory.create(admin_email=f"user-{i}@people.com")
response = client.get( response = client.get(
f"/api/v1.0/users/?page_size={page_size}", f"/api/v1.0/users/?page_size={page_size}",
@@ -535,7 +535,7 @@ def test_api_users_list_pagination_wrong_page_size(
client.force_login(user) client.force_login(user)
for i in range(page_size): for i in range(page_size):
factories.UserFactory.create(email=f"user-{i}@people.com") factories.UserFactory.create(admin_email=f"user-{i}@people.com")
response = client.get( response = client.get(
f"/api/v1.0/users/?page_size={page_size}", f"/api/v1.0/users/?page_size={page_size}",

View File

@@ -95,7 +95,7 @@ def test_authentication_getter_existing_user_change_fields(
klass = OIDCAuthenticationBackend() klass = OIDCAuthenticationBackend()
identity = IdentityFactory(name="John Doe", email="john.doe@example.com") identity = IdentityFactory(name="John Doe", email="john.doe@example.com")
user_email = identity.user.email user_email = identity.user.admin_email
# Create multiple identities for a user # Create multiple identities for a user
for _ in range(5): for _ in range(5):
@@ -125,7 +125,7 @@ def test_authentication_getter_existing_user_change_fields(
assert models.User.objects.count() == 1 assert models.User.objects.count() == 1
assert user == identity.user assert user == identity.user
assert user.email == user_email assert user.admin_email == user_email
def test_authentication_getter_new_user_no_email(monkeypatch): def test_authentication_getter_new_user_no_email(monkeypatch):
@@ -148,7 +148,7 @@ def test_authentication_getter_new_user_no_email(monkeypatch):
assert identity.sub == "123" assert identity.sub == "123"
assert identity.email is None assert identity.email is None
assert user.email is None assert user.admin_email is None
assert user.password == "!" assert user.password == "!"
assert models.User.objects.count() == 1 assert models.User.objects.count() == 1
@@ -177,7 +177,7 @@ def test_authentication_getter_new_user_with_email(monkeypatch):
assert identity.email == email assert identity.email == email
assert identity.name == "John Doe" assert identity.name == "John Doe"
assert user.email is None assert user.admin_email is None
assert models.User.objects.count() == 1 assert models.User.objects.count() == 1

View File

@@ -31,20 +31,20 @@ def test_models_users_id_unique():
def test_models_users_email_unique(): def test_models_users_email_unique():
"""The "email" field should be unique except for the null value.""" """The "admin_email" field should be unique except for the null value."""
user = factories.UserFactory() user = factories.UserFactory()
with pytest.raises( with pytest.raises(
ValidationError, match="User with this Email address already exists." ValidationError, match="User with this Admin email address already exists."
): ):
models.User.objects.create(email=user.email) models.User.objects.create(admin_email=user.admin_email, password="password")
def test_models_users_email_several_null(): def test_models_users_email_several_null():
"""Several users with a null value for the "email" field can co-exist.""" """Several users with a null value for the "email" field can co-exist."""
factories.UserFactory(email=None) factories.UserFactory(admin_email=None)
models.User.objects.create(email=None, password="foo.") models.User.objects.create(admin_email=None, password="foo.")
assert models.User.objects.filter(email__isnull=True).count() == 2 assert models.User.objects.filter(admin_email__isnull=True).count() == 2
def test_models_users_profile_not_owned(): def test_models_users_profile_not_owned():
@@ -91,11 +91,26 @@ def test_models_users_send_mail_main_existing():
) )
def test_models_users_send_mail_main_missing(): def test_models_users_send_mail_main_admin():
"""The 'email_user' method should fail if the user has no email address.""" """
The 'email_user' method should send mail to the user's admin email address if the
user has no related identities.
"""
user = factories.UserFactory() user = factories.UserFactory()
with pytest.raises(models.Identity.DoesNotExist) as excinfo: with mock.patch("django.core.mail.send_mail") as mock_send:
user.email_user("my subject", "my message") user.email_user("my subject", "my message")
assert str(excinfo.value) == "Identity matching query does not exist." mock_send.assert_called_once_with(
"my subject", "my message", None, [user.admin_email]
)
def test_models_users_send_mail_main_missing():
"""The 'email_user' method should fail if the user has no email address."""
user = factories.UserFactory(admin_email=None)
with pytest.raises(ValueError) as excinfo:
user.email_user("my subject", "my message")
assert str(excinfo.value) == "You must first set an email for the user."

View File

@@ -115,7 +115,7 @@ def create_demo(stdout):
for i in range(defaults.NB_OBJECTS["users"]): for i in range(defaults.NB_OBJECTS["users"]):
queue.push( queue.push(
models.User( models.User(
email=f"user{i:d}@example.com", admin_email=f"user{i:d}@example.com",
password="!", password="!",
is_superuser=False, is_superuser=False,
is_active=True, is_active=True,
@@ -126,12 +126,12 @@ def create_demo(stdout):
queue.flush() queue.flush()
with Timeit(stdout, "Creating identities"): with Timeit(stdout, "Creating identities"):
users_values = list(models.User.objects.values("id", "email")) users_values = list(models.User.objects.values("id", "admin_email"))
for user_dict in users_values: for user_dict in users_values:
for i in range( for i in range(
random.choices(range(5), weights=[5, 50, 30, 10, 5], k=1)[0] random.choices(range(5), weights=[5, 50, 30, 10, 5], k=1)[0]
): ):
user_email = user_dict["email"] user_email = user_dict["admin_email"]
queue.push( queue.push(
models.Identity( models.Identity(
user_id=user_dict["id"], user_id=user_dict["id"],

View File

@@ -17,7 +17,7 @@ class Command(BaseCommand):
def add_arguments(self, parser): def add_arguments(self, parser):
"""Define required arguments "email" and "password".""" """Define required arguments "email" and "password"."""
parser.add_argument( parser.add_argument(
"--email", "--admin_email",
help=("Email for the user."), help=("Email for the user."),
) )
parser.add_argument( parser.add_argument(
@@ -30,11 +30,11 @@ class Command(BaseCommand):
Given an email and a password, create a superuser or upgrade the existing Given an email and a password, create a superuser or upgrade the existing
user to superuser status. user to superuser status.
""" """
email = options.get("email") email = options.get("admin_email")
try: try:
user = User.objects.get(email=email) user = User.objects.get(admin_email=email)
except User.DoesNotExist: except User.DoesNotExist:
user = User(email=email) user = User(admin_email=email)
message = "Superuser created successfully." message = "Superuser created successfully."
else: else:
if user.is_superuser and user.is_staff: if user.is_superuser and user.is_staff:

View File

@@ -31,3 +31,14 @@ def test_commands_create_demo():
assert models.Identity.objects.exists() assert models.Identity.objects.exists()
assert models.Team.objects.count() == 3 assert models.Team.objects.count() == 3
assert models.TeamAccess.objects.count() >= 3 assert models.TeamAccess.objects.count() >= 3
def test_commands_createsuperuser():
"""
The createsuperuser management command should create an use
with superuser permissions.
"""
call_command("createsuperuser")
assert models.User.objects.count() == 1