♻️(models) remove multiple identities

Multiple identities were complicating this project's code.
We moved the management of multiple identities to our
OIDC provider.
This commit is contained in:
Samuel Paccoud - DINUM
2024-06-09 22:43:42 +02:00
committed by Samuel Paccoud
parent 79330015e0
commit 2ec292bb91
37 changed files with 509 additions and 1149 deletions

View File

@@ -189,7 +189,8 @@ showmigrations: ## run django showmigrations for the people project.
superuser: ## Create an admin superuser with password "admin"
@echo "$(BOLD)Creating a Django superuser$(RESET)"
@$(MANAGE) createsuperuser --admin_email admin@example.com --password admin
$(MANAGE) createsuperuser --username admin --password admin
.PHONY: superuser
back-i18n-compile: ## compile the gettext files

View File

@@ -1,6 +1,5 @@
"""Admin classes and registrations for People's core app."""
from django import forms
from django.contrib import admin
from django.contrib.auth import admin as auth_admin
from django.utils.translation import gettext_lazy as _
@@ -8,42 +7,6 @@ from django.utils.translation import gettext_lazy as _
from . import models
class IdentityFormSet(forms.BaseInlineFormSet):
"""
Make the "is_main" field readonly when it is True so that declaring another identity
works in the admin.
"""
def add_fields(self, form, index):
"""Disable the "is_main" field when it is set to True"""
super().add_fields(form, index)
is_main_value = form.instance.is_main if form.instance else False
form.fields["is_main"].disabled = is_main_value
class IdentityInline(admin.TabularInline):
"""Inline admin class for user identities."""
fields = (
"sub",
"email",
"is_main",
"created_at",
"updated_at",
)
formset = IdentityFormSet
model = models.Identity
extra = 0
readonly_fields = ("email", "created_at", "sub", "updated_at")
def has_add_permission(self, request, obj):
"""
Identities are automatically created on successful OIDC logins.
Disable creating identities via the admin.
"""
return False
class TeamAccessInline(admin.TabularInline):
"""Inline admin class for team accesses."""
@@ -72,11 +35,12 @@ class UserAdmin(auth_admin.UserAdmin):
{
"fields": (
"id",
"sub",
"password",
)
},
),
(_("Personal info"), {"fields": ("admin_email", "language", "timezone")}),
(_("Personal info"), {"fields": ("email", "language", "timezone")}),
(
_("Permissions"),
{
@@ -97,13 +61,14 @@ class UserAdmin(auth_admin.UserAdmin):
None,
{
"classes": ("wide",),
"fields": ("admin_email", "password1", "password2"),
"fields": ("sub", "email", "password1", "password2"),
},
),
)
inlines = (IdentityInline, TeamAccessInline)
inlines = (TeamAccessInline,)
list_display = (
"admin_email",
"sub",
"email",
"created_at",
"updated_at",
"is_active",
@@ -113,8 +78,14 @@ class UserAdmin(auth_admin.UserAdmin):
)
list_filter = ("is_staff", "is_superuser", "is_device", "is_active")
ordering = ("is_active", "-is_superuser", "-is_staff", "-is_device", "-updated_at")
readonly_fields = ("id", "created_at", "updated_at")
search_fields = ("id", "admin_email", "identities__sub", "identities__email")
readonly_fields = ["id", "created_at", "updated_at"]
search_fields = ("id", "email", "sub")
def get_readonly_fields(self, request, obj=None):
"""The sub should only be editable for a create, not for updates."""
if obj:
return self.readonly_fields + ["sub"]
return self.readonly_fields
@admin.register(models.Team)

View File

@@ -1,7 +1,7 @@
"""API endpoints"""
from django.contrib.postgres.search import TrigramSimilarity
from django.db.models import Func, Max, OuterRef, Prefetch, Q, Subquery, Value
from django.db.models import Func, Max, OuterRef, Q, Subquery, Value
from django.db.models.functions import Coalesce
from rest_framework import (
@@ -198,12 +198,6 @@ class UserViewSet(
# Exclude inactive contacts
queryset = queryset.filter(
is_active=True,
).prefetch_related(
Prefetch(
"identities",
queryset=models.Identity.objects.filter(is_main=True),
to_attr="_identities_main",
)
)
# Exclude all users already in the given team
@@ -214,15 +208,11 @@ class UserViewSet(
if query := self.request.GET.get("q", ""):
similarity = Max(
TrigramSimilarity(
Coalesce(
Func("identities__email", function="unaccent"), Value("")
),
Coalesce(Func("email", function="unaccent"), Value("")),
Func(Value(query), function="unaccent"),
)
+ TrigramSimilarity(
Coalesce(
Func("identities__name", function="unaccent"), Value("")
),
Coalesce(Func("name", function="unaccent"), Value("")),
Func(Value(query), function="unaccent"),
)
)
@@ -329,7 +319,7 @@ class TeamAccessViewSet(
filter_backends = [filters.OrderingFilter]
ordering = ["role"]
ordering_fields = ["role", "email", "name"]
ordering_fields = ["role", "user__email", "user__name"]
def get_permissions(self):
"""User only needs to be authenticated to list team accesses"""
@@ -362,29 +352,17 @@ class TeamAccessViewSet(
user=self.request.user, team=self.kwargs["team_id"]
).values("role")[:1]
user_main_identity_query = models.Identity.objects.filter(
user=OuterRef("user_id"), is_main=True
)
queryset = (
# The logged-in user should be part of a team to see its accesses
queryset.filter(
team__accesses__user=self.request.user,
)
.prefetch_related(
Prefetch(
"user__identities",
queryset=models.Identity.objects.filter(is_main=True),
to_attr="_identities_main",
)
)
# Abilities are computed based on logged-in user's role and
# the user role on each team access
.annotate(
user_role=Subquery(user_role_query),
email=Subquery(user_main_identity_query.values("email")[:1]),
name=Subquery(user_main_identity_query.values("name")[:1]),
)
.select_related("user")
.distinct()
)
return queryset

View File

@@ -2,7 +2,6 @@
from django.conf import settings
from django.core.exceptions import SuspiciousOperation
from django.db import models
from django.utils.translation import gettext_lazy as _
import requests
@@ -10,14 +9,12 @@ from mozilla_django_oidc.auth import (
OIDCAuthenticationBackend as MozillaOIDCAuthenticationBackend,
)
from core.models import Identity
class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
"""Custom OpenID Connect (OIDC) Authentication Backend.
This class overrides the default OIDC Authentication Backend to accommodate differences
in the User and Identity models, and handles signed and/or encrypted UserInfo response.
in the User model, and handles signed and/or encrypted UserInfo response.
"""
def get_userinfo(self, access_token, id_token, payload):
@@ -81,28 +78,16 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
_("User info contained no recognizable user identification")
)
user = (
self.UserModel.objects.filter(identities__sub=sub)
.annotate(
identity_email=models.F("identities__email"),
identity_name=models.F("identities__name"),
)
.distinct()
.first()
)
if user:
try:
user = self.UserModel.objects.get(sub=sub)
except self.UserModel.DoesNotExist:
if self.get_settings("OIDC_CREATE_USER", True):
user = self.create_user(user_info)
else:
email = user_info.get("email")
name = user_info.get("name")
if (
email
and email != user.identity_email
or name
and name != user.identity_name
):
Identity.objects.filter(sub=sub).update(email=email, name=name)
elif self.get_settings("OIDC_CREATE_USER", True):
user = self.create_user(user_info)
if email and email != user.email or name and name != user.name:
self.UserModel.objects.filter(sub=sub).update(email=email, name=name)
return user
@@ -114,9 +99,9 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
_("Claims contained no recognizable user identification")
)
user = self.UserModel.objects.create(password="!") # noqa: S106
Identity.objects.create(
user=user, sub=sub, email=claims.get("email"), name=claims.get("name")
return self.UserModel.objects.create(
password="!", # noqa: S106
sub=sub,
email=claims.get("email"),
name=claims.get("name"),
)
return user

View File

@@ -124,24 +124,13 @@ class UserFactory(factory.django.DjangoModelFactory):
class Meta:
model = models.User
django_get_or_create = ("admin_email",)
admin_email = factory.Faker("email")
language = factory.fuzzy.FuzzyChoice([lang[0] for lang in settings.LANGUAGES])
password = make_password("password")
class IdentityFactory(factory.django.DjangoModelFactory):
"""A factory to create identities for a user"""
class Meta:
model = models.Identity
django_get_or_create = ("sub",)
user = factory.SubFactory(UserFactory)
sub = factory.Sequence(lambda n: f"user{n!s}")
email = factory.Faker("email")
name = factory.Faker("name")
language = factory.fuzzy.FuzzyChoice([lang[0] for lang in settings.LANGUAGES])
password = make_password("password")
class TeamFactory(factory.django.DjangoModelFactory):

View File

@@ -1,4 +1,4 @@
# Generated by Django 5.0.3 on 2024-03-25 22:58
# Generated by Django 5.0.3 on 2024-06-08 09:25
import django.contrib.auth.models
import django.core.validators
@@ -45,7 +45,9 @@ class Migration(migrations.Migration):
('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')),
('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created at')),
('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated at')),
('admin_email', models.EmailField(blank=True, max_length=254, null=True, unique=True, verbose_name='admin email address')),
('sub', models.CharField(help_text='Required. 255 characters or fewer. Letters, numbers, and @/./+/-/_ characters only.', max_length=255, unique=True, validators=[django.core.validators.RegexValidator(message='Enter a valid sub. This value may contain only letters, numbers, and @/./+/-/_ characters.', regex='^[\\w.@+-]+\\Z')], verbose_name='sub')),
('email', models.EmailField(blank=True, max_length=254, null=True, verbose_name='email address')),
('name', models.CharField(blank=True, max_length=100, null=True, verbose_name='name')),
('language', models.CharField(choices="(('en-us', 'English'), ('fr-fr', 'French'))", default='en-us', help_text='The language in which the user wants to see the interface.', max_length=10, verbose_name='language')),
('timezone', timezone_field.fields.TimeZoneField(choices_display='WITH_GMT_OFFSET', default='UTC', help_text='The timezone in which the user wants to see times.', use_pytz=False)),
('is_device', models.BooleanField(default=False, help_text='Whether the user is a device or a real user.', verbose_name='device')),
@@ -87,25 +89,6 @@ class Migration(migrations.Migration):
name='profile_contact',
field=models.OneToOneField(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='user', to='core.contact'),
),
migrations.CreateModel(
name='Identity',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')),
('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created at')),
('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated at')),
('sub', models.CharField(help_text='Required. 255 characters or fewer. Letters, numbers, and @/./+/-/_ characters only.', max_length=255, unique=True, validators=[django.core.validators.RegexValidator(message='Enter a valid sub. This value may contain only letters, numbers, and @/./+/-/_ characters.', regex='^[\\w.@+-]+\\Z')], verbose_name='sub')),
('email', models.EmailField(blank=True, max_length=254, null=True, verbose_name='email address')),
('name', models.CharField(blank=True, max_length=100, null=True, verbose_name='name')),
('is_main', models.BooleanField(default=False, help_text='Designates whether the email is the main one.', verbose_name='main')),
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='identities', to=settings.AUTH_USER_MODEL)),
],
options={
'verbose_name': 'identity',
'verbose_name_plural': 'identities',
'db_table': 'people_identity',
'ordering': ('-is_main', 'email'),
},
),
migrations.CreateModel(
name='Invitation',
fields=[
@@ -173,10 +156,6 @@ class Migration(migrations.Migration):
name='contact',
unique_together={('owner', 'base')},
),
migrations.AddConstraint(
model_name='identity',
constraint=models.UniqueConstraint(fields=('user', 'email'), name='unique_user_email', violation_error_message='This email address is already declared for this user.'),
),
migrations.AddConstraint(
model_name='invitation',
constraint=models.UniqueConstraint(fields=('email', 'team'), name='email_and_team_unique_together'),

View File

@@ -161,9 +161,25 @@ class Contact(BaseModel):
class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
"""User model to work with OIDC only authentication."""
admin_email = models.EmailField(
_("admin email address"), unique=True, null=True, blank=True
sub_validator = validators.RegexValidator(
regex=r"^[\w.@+-]+\Z",
message=_(
"Enter a valid sub. This value may contain only letters, "
"numbers, and @/./+/-/_ characters."
),
)
sub = models.CharField(
_("sub"),
help_text=_(
"Required. 255 characters or fewer. Letters, numbers, and @/./+/-/_ characters only."
),
max_length=255,
unique=True,
validators=[sub_validator],
)
email = models.EmailField(_("email address"), null=True, blank=True)
name = models.CharField(_("name"), max_length=100, null=True, blank=True)
profile_contact = models.OneToOneField(
Contact,
on_delete=models.SET_NULL,
@@ -205,7 +221,7 @@ class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
objects = auth_models.UserManager()
USERNAME_FIELD = "admin_email"
USERNAME_FIELD = "sub"
REQUIRED_FIELDS = []
class Meta:
@@ -217,113 +233,12 @@ class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
return (
str(self.profile_contact)
if self.profile_contact
else self.admin_email or str(self.id)
else self.email or str(self.sub)
)
def _get_identities_main(self):
"""Return a list with the main identity or an empty list."""
try:
return self._identities_main
except AttributeError:
return self.identities.filter(is_main=True)
@property
def name(self):
"""Return main identity's name."""
try:
return self._get_identities_main()[0].name
except IndexError:
return None
@property
def email(self):
"""Return main identity's email."""
try:
return self._get_identities_main()[0].email
except IndexError:
return None
def clean(self):
"""Validate fields."""
super().clean()
if self.profile_contact_id and not self.profile_contact.owner == self:
raise exceptions.ValidationError(
"Users can only declare as profile a contact they own."
)
def email_user(self, subject, message, from_email=None, **kwargs):
"""Email this user."""
email = self.email or self.admin_email
if not email:
raise ValueError("You must first set an email for the user.")
mail.send_mail(subject, message, from_email, [email], **kwargs)
@classmethod
def get_email_field_name(cls):
"""
Raise error when trying to get email field name from the user as we are using
a separate Email model to allow several emails per user.
"""
raise NotImplementedError(
"This feature is deactivated to allow several emails per user."
)
class Identity(BaseModel):
"""User identity"""
sub_validator = validators.RegexValidator(
regex=r"^[\w.@+-]+\Z",
message=_(
"Enter a valid sub. This value may contain only letters, "
"numbers, and @/./+/-/_ characters."
),
)
user = models.ForeignKey(User, related_name="identities", on_delete=models.CASCADE)
sub = models.CharField(
_("sub"),
help_text=_(
"Required. 255 characters or fewer. Letters, numbers, and @/./+/-/_ characters only."
),
max_length=255,
unique=True,
validators=[sub_validator],
)
email = models.EmailField(_("email address"), null=True, blank=True)
name = models.CharField(_("name"), max_length=100, null=True, blank=True)
is_main = models.BooleanField(
_("main"),
default=False,
help_text=_("Designates whether the email is the main one."),
)
class Meta:
db_table = "people_identity"
ordering = ("-is_main", "email")
verbose_name = _("identity")
verbose_name_plural = _("identities")
constraints = [
# Uniqueness
models.UniqueConstraint(
fields=["user", "email"],
name="unique_user_email",
violation_error_message=_(
"This email address is already declared for this user."
),
),
]
def __str__(self):
main_str = "[main]" if self.is_main else ""
id_str = self.email or self.sub
return f"{id_str:s}{main_str:s}"
def save(self, *args, **kwargs):
"""
Saves identity, ensuring users always have exactly one main identity.
If it's a new identity, give its user access to the relevant teams.
If it's a new user, give her access to the relevant teams.
"""
if self._state.adding:
@@ -331,9 +246,16 @@ class Identity(BaseModel):
super().save(*args, **kwargs)
# Ensure users always have one and only one main identity.
if self.is_main is True:
self.user.identities.exclude(id=self.id).update(is_main=False)
def clean(self):
"""Validate fields."""
super().clean()
if self.email:
self.email = User.objects.normalize_email(self.email)
if self.profile_contact_id and not self.profile_contact.owner == self:
raise exceptions.ValidationError(
"Users can only declare as profile a contact they own."
)
def _convert_valid_invitations(self):
"""
@@ -354,24 +276,17 @@ class Identity(BaseModel):
TeamAccess.objects.bulk_create(
[
TeamAccess(user=self.user, team=invitation.team, role=invitation.role)
TeamAccess(user=self, team=invitation.team, role=invitation.role)
for invitation in valid_invitations
]
)
valid_invitations.delete()
def clean(self):
"""Normalize the email field and clean the 'is_main' field."""
if self.email:
self.email = User.objects.normalize_email(self.email)
if not self.user.identities.exclude(pk=self.pk).filter(is_main=True).exists():
if not self.created_at:
self.is_main = True
elif not self.is_main:
raise exceptions.ValidationError(
{"is_main": "A user should have one and only one main identity."}
)
super().clean()
def email_user(self, subject, message, from_email=None, **kwargs):
"""Email this user."""
if not self.email:
raise ValueError("You must first set an email for the user.")
mail.send_mail(subject, message, from_email, [self.email], **kwargs)
class Team(BaseModel):
@@ -615,8 +530,8 @@ class Invitation(BaseModel):
"""Validate fields."""
super().clean()
# Check if an identity already exists for the provided email
if Identity.objects.filter(email=self.email).exists():
# Check if a user already exists for the provided email
if User.objects.filter(email=self.email).exists():
raise exceptions.ValidationError(
{"email": _("This email is already associated to a registered user.")}
)

View File

@@ -4,9 +4,8 @@ from django.core.exceptions import SuspiciousOperation
import pytest
from core import models
from core import factories, models
from core.authentication.backends import OIDCAuthenticationBackend
from core.factories import IdentityFactory
pytestmark = pytest.mark.django_db
@@ -19,26 +18,19 @@ def test_authentication_getter_existing_user_no_email(
"""
klass = OIDCAuthenticationBackend()
# Create a user and its identity
identity = IdentityFactory(name=None)
# Create multiple identities for a user
for _ in range(5):
IdentityFactory(user=identity.user)
user = factories.UserFactory()
def get_userinfo_mocked(*args):
return {"sub": identity.sub}
return {"sub": user.sub}
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
with django_assert_num_queries(1):
user = klass.get_or_create_user(
authenticated_user = klass.get_or_create_user(
access_token="test-token", id_token=None, payload=None
)
identity.refresh_from_db()
assert user == identity.user
assert user == authenticated_user
def test_authentication_getter_existing_user_with_email(
@@ -48,19 +40,12 @@ def test_authentication_getter_existing_user_with_email(
When the user's info contains an email and targets an existing user,
"""
klass = OIDCAuthenticationBackend()
identity = IdentityFactory(name="John Doe")
# Create multiple identities for a user
for _ in range(5):
IdentityFactory(user=identity.user)
assert models.User.objects.count() == 1
user = factories.UserFactory(name="John Doe")
def get_userinfo_mocked(*args):
return {
"sub": identity.sub,
"email": identity.email,
"sub": user.sub,
"email": user.email,
"first_name": "John",
"last_name": "Doe",
}
@@ -69,11 +54,11 @@ def test_authentication_getter_existing_user_with_email(
# Only 1 query because email and names have not changed
with django_assert_num_queries(1):
user = klass.get_or_create_user(
authenticated_user = klass.get_or_create_user(
access_token="test-token", id_token=None, payload=None
)
assert models.User.objects.get() == user
assert user == authenticated_user
@pytest.mark.parametrize(
@@ -89,23 +74,14 @@ def test_authentication_getter_existing_user_change_fields(
first_name, last_name, email, django_assert_num_queries, monkeypatch
):
"""
It should update the email or name fields on the identity when they change.
The email on the user should not be changed.
It should update the email or name fields on the user when they change.
"""
klass = OIDCAuthenticationBackend()
identity = IdentityFactory(name="John Doe", email="john.doe@example.com")
user_email = identity.user.admin_email
# Create multiple identities for a user
for _ in range(5):
IdentityFactory(user=identity.user)
assert models.User.objects.count() == 1
user = factories.UserFactory(name="John Doe", email="john.doe@example.com")
def get_userinfo_mocked(*args):
return {
"sub": identity.sub,
"sub": user.sub,
"email": email,
"first_name": first_name,
"last_name": last_name,
@@ -115,23 +91,20 @@ def test_authentication_getter_existing_user_change_fields(
# One and only one additional update query when a field has changed
with django_assert_num_queries(2):
user = klass.get_or_create_user(
authenticated_user = klass.get_or_create_user(
access_token="test-token", id_token=None, payload=None
)
identity.refresh_from_db()
assert identity.email == email
assert identity.name == f"{first_name:s} {last_name:s}"
assert models.User.objects.count() == 1
assert user == identity.user
assert user.admin_email == user_email
assert user == authenticated_user
user.refresh_from_db()
assert user.email == email
assert user.name == f"{first_name:s} {last_name:s}"
def test_authentication_getter_new_user_no_email(monkeypatch):
"""
If no user matches the user's info sub, a user should be created.
User's info doesn't contain an email, created user's email should be empty.
User's info doesn't contain an email/name, created user's email/name should be empty.
"""
klass = OIDCAuthenticationBackend()
@@ -144,11 +117,9 @@ def test_authentication_getter_new_user_no_email(monkeypatch):
access_token="test-token", id_token=None, payload=None
)
identity = user.identities.get()
assert identity.sub == "123"
assert identity.email is None
assert user.admin_email is None
assert user.sub == "123"
assert user.email is None
assert user.name is None
assert user.password == "!"
assert models.User.objects.count() == 1
@@ -156,11 +127,9 @@ def test_authentication_getter_new_user_no_email(monkeypatch):
def test_authentication_getter_new_user_with_email(monkeypatch):
"""
If no user matches the user's info sub, a user should be created.
User's email and name should be set on the identity.
The "email" field on the User model should not be set as it is reserved for staff users.
User's email and name should be set on the user.
"""
klass = OIDCAuthenticationBackend()
email = "people@example.com"
def get_userinfo_mocked(*args):
@@ -172,12 +141,10 @@ def test_authentication_getter_new_user_with_email(monkeypatch):
access_token="test-token", id_token=None, payload=None
)
identity = user.identities.get()
assert identity.sub == "123"
assert identity.email == email
assert identity.name == "John Doe"
assert user.admin_email is None
assert user.sub == "123"
assert user.email == email
assert user.name == "John Doe"
assert user.password == "!"
assert models.User.objects.count() == 1

View File

@@ -38,8 +38,7 @@ def test_view_logout_anonymous():
def test_view_logout(mocked_oidc_logout_url):
"""Authenticated users should be redirected to OIDC provider for logout."""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
@@ -60,8 +59,7 @@ def test_view_logout(mocked_oidc_logout_url):
def test_view_logout_no_oidc_provider(mocked_oidc_logout_url):
"""Authenticated users should be logged out when no OIDC provider is available."""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
@@ -96,8 +94,7 @@ def test_view_logout_callback_anonymous():
def test_view_logout_persist_state(initial_oidc_states):
"""State value should be persisted in session's data."""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
request = RequestFactory().request()
request.user = user
@@ -128,8 +125,7 @@ def test_view_logout_construct_oidc_logout_url(
):
"""Should construct the logout URL to initiate the logout flow with the OIDC provider."""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
request = RequestFactory().request()
request.user = user
@@ -159,8 +155,7 @@ def test_view_logout_construct_oidc_logout_url_none_id_token():
"""If no ID token is available in the session,
the user should be redirected to the final URL."""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
request = RequestFactory().request()
request.user = user
@@ -180,8 +175,7 @@ def test_view_logout_construct_oidc_logout_url_none_id_token():
def test_view_logout_callback_wrong_state(initial_state):
"""Should raise an error if OIDC state doesn't match session data."""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
request = RequestFactory().request()
request.user = user
@@ -207,8 +201,7 @@ def test_view_logout_callback_wrong_state(initial_state):
def test_view_logout_callback():
"""If state matches, callback should clear OIDC state and redirects."""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
request = RequestFactory().get("/logout-callback/", data={"state": "mocked_state"})
request.user = user

View File

@@ -41,9 +41,7 @@ def test_api_team_accesses_create_authenticated_unrelated():
Authenticated users should not be allowed to create team accesses for a team to
which they are not related.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
other_user = factories.UserFactory()
team = factories.TeamFactory()
@@ -66,9 +64,7 @@ def test_api_team_accesses_create_authenticated_unrelated():
def test_api_team_accesses_create_authenticated_member():
"""Members of a team should not be allowed to create team accesses."""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
team = factories.TeamFactory(users=[(user, "member")])
other_user = factories.UserFactory()
@@ -96,9 +92,7 @@ def test_api_team_accesses_create_authenticated_administrator():
"""
Administrators of a team should be able to create team accesses except for the "owner" role.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
team = factories.TeamFactory(users=[(user, "administrator")])
other_user = factories.UserFactory()
@@ -149,9 +143,7 @@ def test_api_team_accesses_create_authenticated_owner():
"""
Owners of a team should be able to create team accesses whatever the role.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
team = factories.TeamFactory(users=[(user, "owner")])
other_user = factories.UserFactory()
@@ -227,7 +219,7 @@ def test_api_team_accesses_create_webhook():
"value": [
{
"value": str(other_user.id),
"email": None,
"email": other_user.email,
"type": "User",
}
],

View File

@@ -32,9 +32,7 @@ def test_api_team_accesses_delete_authenticated():
Authenticated users should not be allowed to delete a team access for a
team to which they are not related.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
access = factories.TeamAccessFactory()
client = APIClient()
@@ -52,9 +50,7 @@ def test_api_team_accesses_delete_member():
Authenticated users should not be allowed to delete a team access for a
team in which they are a simple member.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
team = factories.TeamFactory(users=[(user, "member")])
access = factories.TeamAccessFactory(team=team)
@@ -76,9 +72,7 @@ def test_api_team_accesses_delete_administrators():
Users who are administrators in a team should be allowed to delete an access
from the team provided it is not ownership.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
team = factories.TeamFactory(users=[(user, "administrator")])
access = factories.TeamAccessFactory(
team=team, role=random.choice(["member", "administrator"])
@@ -102,9 +96,7 @@ def test_api_team_accesses_delete_owners_except_owners():
Users should be able to delete the team access of another user
for a team of which they are owner provided it is not an owner access.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
team = factories.TeamFactory(users=[(user, "owner")])
access = factories.TeamAccessFactory(
team=team, role=random.choice(["member", "administrator"])
@@ -128,9 +120,7 @@ def test_api_team_accesses_delete_owners_for_owners():
Users should not be allowed to delete the team access of another owner
even for a team in which they are direct owner.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
team = factories.TeamFactory(users=[(user, "owner")])
access = factories.TeamAccessFactory(team=team, role="owner")
@@ -152,7 +142,6 @@ def test_api_team_accesses_delete_owners_last_owner():
It should not be possible to delete the last owner access from a team
"""
user = factories.UserFactory()
team = factories.TeamFactory()
access = factories.TeamAccessFactory(team=team, user=user, role="owner")
assert models.TeamAccess.objects.count() == 1
@@ -173,7 +162,6 @@ def test_api_team_accesses_delete_webhook():
When the team has a webhook, deleting a team access should fire a call.
"""
user = factories.UserFactory()
team = factories.TeamFactory(users=[(user, "administrator")])
webhook = factories.TeamWebhookFactory(team=team)
access = factories.TeamAccessFactory(
@@ -215,7 +203,7 @@ def test_api_team_accesses_delete_webhook():
"value": [
{
"value": str(access.user.id),
"email": None,
"email": access.user.email,
"type": "User",
}
],

View File

@@ -28,9 +28,7 @@ def test_api_team_accesses_list_authenticated_unrelated():
Authenticated users should not be allowed to list team accesses for a team
to which they are not related.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
team = factories.TeamFactory()
factories.TeamAccessFactory.create_batch(3, team=team)
@@ -57,19 +55,12 @@ def test_api_team_accesses_list_authenticated_related():
Authenticated users should be able to list team accesses for a team
to which they are related, with a given role.
"""
identity = factories.IdentityFactory(is_main=True)
user = identity.user
user, administrator, owner = factories.UserFactory.create_batch(3)
team = factories.TeamFactory()
owner = factories.IdentityFactory(is_main=True)
access1 = factories.TeamAccessFactory.create(
team=team, user=owner.user, role="owner"
)
administrator = factories.IdentityFactory(is_main=True)
access1 = factories.TeamAccessFactory.create(team=team, user=owner, role="owner")
access2 = factories.TeamAccessFactory.create(
team=team, user=administrator.user, role="administrator"
team=team, user=administrator, role="administrator"
)
# Ensure this user's role is different from other team members to test abilities' computation
@@ -93,8 +84,8 @@ def test_api_team_accesses_list_authenticated_related():
"id": str(user_access.id),
"user": {
"id": str(user_access.user.id),
"email": str(identity.email),
"name": str(identity.name),
"email": str(user.email),
"name": str(user.name),
},
"role": str(user_access.role),
"abilities": user_access.get_abilities(user),
@@ -124,48 +115,6 @@ def test_api_team_accesses_list_authenticated_related():
)
def test_api_team_accesses_list_authenticated_main_identity():
"""
Name and email should be returned from main identity only
"""
user = factories.UserFactory()
identity = factories.IdentityFactory(user=user, is_main=True)
factories.IdentityFactory(user=user) # additional non-main identity
team = factories.TeamFactory()
models.TeamAccess.objects.create(team=team, user=user) # random role
# other team members should appear, with correct identity
other_user = factories.UserFactory()
other_main_identity = factories.IdentityFactory(is_main=True, user=other_user)
factories.IdentityFactory(user=other_user)
factories.TeamAccessFactory.create(team=team, user=other_user)
# Accesses for other teams to which the user is related should not be listed either
other_access = factories.TeamAccessFactory(user=user)
factories.TeamAccessFactory(team=other_access.team)
client = APIClient()
client.force_login(user)
response = client.get(
f"/api/v1.0/teams/{team.id!s}/accesses/",
)
assert response.status_code == 200
assert response.json()["count"] == 2
users_info = [
(access["user"]["email"], access["user"]["name"])
for access in response.json()["results"]
]
# user information should be returned from main identity
assert sorted(users_info) == sorted(
[
(str(identity.email), str(identity.name)),
(str(other_main_identity.email), str(other_main_identity.name)),
]
)
def test_api_team_accesses_list_authenticated_constant_numqueries(
django_assert_num_queries,
):
@@ -173,31 +122,28 @@ def test_api_team_accesses_list_authenticated_constant_numqueries(
The number of queries should not depend on the amount of fetched accesses.
"""
user = factories.UserFactory()
factories.IdentityFactory(user=user, is_main=True)
team = factories.TeamFactory()
models.TeamAccess.objects.create(team=team, user=user) # random role
client = APIClient()
client.force_login(user)
# Only 4 queries are needed to efficiently fetch team accesses,
# Only 3 queries are needed to efficiently fetch team accesses,
# related users and identities :
# - query retrieving logged-in user for user_role annotation
# - count from pagination
# - query prefetching users' main identity
# - distinct from viewset
with django_assert_num_queries(4):
with django_assert_num_queries(3):
response = client.get(
f"/api/v1.0/teams/{team.id!s}/accesses/",
)
# create 20 new team members
for _ in range(20):
extra_user = factories.IdentityFactory(is_main=True).user
extra_user = factories.UserFactory()
factories.TeamAccessFactory(team=team, user=extra_user)
# num queries should still be 4
with django_assert_num_queries(4):
# num queries should still be the same
with django_assert_num_queries(3):
response = client.get(
f"/api/v1.0/teams/{team.id!s}/accesses/",
)
@@ -210,14 +156,12 @@ def test_api_team_accesses_list_authenticated_ordering():
"""Team accesses can be ordered by "role"."""
user = factories.UserFactory()
factories.IdentityFactory(user=user, is_main=True)
team = factories.TeamFactory()
models.TeamAccess.objects.create(team=team, user=user)
# create 20 new team members
for _ in range(20):
extra_user = factories.IdentityFactory(is_main=True).user
extra_user = factories.UserFactory()
factories.TeamAccessFactory(team=team, user=extra_user)
client = APIClient()
@@ -242,26 +186,24 @@ def test_api_team_accesses_list_authenticated_ordering():
assert sorted(results, reverse=True) == results
@pytest.mark.parametrize("ordering_fields", ["name", "email"])
def test_api_team_accesses_list_authenticated_ordering_user(ordering_fields):
"""Team accesses can be ordered by user's fields "email" or "name"."""
@pytest.mark.parametrize("ordering_field", ["email", "name"])
def test_api_team_accesses_list_authenticated_ordering_user(ordering_field):
"""Team accesses can be ordered by user's fields."""
user = factories.UserFactory()
factories.IdentityFactory(user=user, is_main=True)
team = factories.TeamFactory()
models.TeamAccess.objects.create(team=team, user=user)
# create 20 new team members
for _ in range(20):
extra_user = factories.IdentityFactory(is_main=True).user
extra_user = factories.UserFactory()
factories.TeamAccessFactory(team=team, user=extra_user)
client = APIClient()
client.force_login(user)
response = client.get(
f"/api/v1.0/teams/{team.id!s}/accesses/?ordering={ordering_fields}",
f"/api/v1.0/teams/{team.id!s}/accesses/?ordering=user__{ordering_field}",
)
assert response.status_code == HTTP_200_OK
assert response.json()["count"] == 21
@@ -271,19 +213,7 @@ def test_api_team_accesses_list_authenticated_ordering_user(ordering_fields):
return x.casefold().replace(" ", "")
results = [
team_access["user"][ordering_fields]
team_access["user"][ordering_field]
for team_access in response.json()["results"]
]
assert sorted(results, key=normalize) == results
response = client.get(
f"/api/v1.0/teams/{team.id!s}/accesses/?ordering=-{ordering_fields}",
)
assert response.status_code == HTTP_200_OK
assert response.json()["count"] == 21
results = [
team_access["user"][ordering_fields]
for team_access in response.json()["results"]
]
assert sorted(results, reverse=True, key=normalize) == results

View File

@@ -31,9 +31,7 @@ def test_api_team_accesses_retrieve_authenticated_unrelated():
Authenticated users should not be allowed to retrieve a team access for
a team to which they are not related.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
access = factories.TeamAccessFactory(team=factories.TeamFactory())
client = APIClient()
@@ -62,9 +60,7 @@ def test_api_team_accesses_retrieve_authenticated_related():
A user who is related to a team should be allowed to retrieve the
associated team user accesses.
"""
identity = factories.IdentityFactory(is_main=True)
user = identity.user
user = factories.UserFactory()
team = factories.TeamFactory()
access = factories.TeamAccessFactory(team=team, user=user)
@@ -79,8 +75,8 @@ def test_api_team_accesses_retrieve_authenticated_related():
"id": str(access.id),
"user": {
"id": str(access.user.id),
"email": str(identity.email),
"name": str(identity.name),
"email": str(user.email),
"name": str(user.name),
},
"role": str(access.role),
"abilities": access.get_abilities(user),

View File

@@ -43,9 +43,7 @@ def test_api_team_accesses_update_authenticated_unrelated():
Authenticated users should not be allowed to update a team access for a team to which
they are not related.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
access = factories.TeamAccessFactory()
old_values = serializers.TeamAccessSerializer(instance=access).data
@@ -72,9 +70,7 @@ def test_api_team_accesses_update_authenticated_unrelated():
def test_api_team_accesses_update_authenticated_member():
"""Members of a team should not be allowed to update its accesses."""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
team = factories.TeamFactory(users=[(user, "member")])
access = factories.TeamAccessFactory(team=team)
old_values = serializers.TeamAccessSerializer(instance=access).data
@@ -105,9 +101,7 @@ def test_api_team_accesses_update_administrator_except_owner():
A user who is an administrator in a team should be allowed to update a user
access for this team, as long as they don't try to set the role to owner.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
team = factories.TeamFactory(users=[(user, "administrator")])
access = factories.TeamAccessFactory(
team=team,
@@ -151,9 +145,7 @@ def test_api_team_accesses_update_administrator_from_owner():
A user who is an administrator in a team, should not be allowed to update
the user access of an "owner" for this team.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
team = factories.TeamFactory(users=[(user, "administrator")])
other_user = factories.UserFactory()
access = factories.TeamAccessFactory(team=team, user=other_user, role="owner")
@@ -184,9 +176,7 @@ def test_api_team_accesses_update_administrator_to_owner():
A user who is an administrator in a team, should not be allowed to update
the user access of another user to grant team ownership.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
team = factories.TeamFactory(users=[(user, "administrator")])
other_user = factories.UserFactory()
access = factories.TeamAccessFactory(
@@ -227,9 +217,7 @@ def test_api_team_accesses_update_owner_except_owner():
A user who is an owner in a team should be allowed to update
a user access for this team except for existing "owner" accesses.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
team = factories.TeamFactory(users=[(user, "owner")])
factories.UserFactory()
access = factories.TeamAccessFactory(
@@ -275,9 +263,7 @@ def test_api_team_accesses_update_owner_for_owners():
A user who is "owner" of a team should not be allowed to update
an existing owner access for this team.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
team = factories.TeamFactory(users=[(user, "owner")])
access = factories.TeamAccessFactory(team=team, role="owner")
old_values = serializers.TeamAccessSerializer(instance=access).data
@@ -307,9 +293,7 @@ def test_api_team_accesses_update_owner_self():
A user who is owner of a team should be allowed to update
their own user access provided there are other owners in the team.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
team = factories.TeamFactory()
access = factories.TeamAccessFactory(team=team, user=user, role="owner")
old_values = serializers.TeamAccessSerializer(instance=access).data

View File

@@ -10,7 +10,7 @@ from rest_framework.status import (
)
from rest_framework.test import APIClient
from core.factories import IdentityFactory, TeamFactory
from core.factories import TeamFactory, UserFactory
from core.models import Team
pytestmark = pytest.mark.django_db
@@ -34,11 +34,10 @@ def test_api_teams_create_authenticated():
Authenticated users should be able to create teams and should automatically be declared
as the owner of the newly created team.
"""
identity = IdentityFactory()
user = identity.user
user = UserFactory()
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
response = client.post(
"/api/v1.0/teams/",
@@ -58,9 +57,9 @@ def test_api_teams_create_authenticated_slugify_name():
"""
Creating teams should automatically generate a slug.
"""
identity = IdentityFactory()
user = UserFactory()
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
response = client.post(
"/api/v1.0/teams/",
@@ -87,10 +86,10 @@ def test_api_teams_create_authenticated_expected_slug(param):
"""
Creating teams should automatically create unaccented, no unicode, lower-case slug.
"""
identity = IdentityFactory()
user = UserFactory()
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
response = client.post(
"/api/v1.0/teams/",
@@ -110,10 +109,10 @@ def test_api_teams_create_authenticated_unique_slugs():
Creating teams should raise an error if already existing slug.
"""
TeamFactory(name="existing team")
identity = IdentityFactory()
user = UserFactory()
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
response = client.post(
"/api/v1.0/teams/",

View File

@@ -33,10 +33,10 @@ def test_api_teams_delete_authenticated_unrelated():
Authenticated users should not be allowed to delete a team to which they are not
related.
"""
identity = factories.IdentityFactory()
user = factories.UserFactory()
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
team = factories.TeamFactory()
@@ -54,8 +54,7 @@ def test_api_teams_delete_authenticated_member():
Authenticated users should not be allowed to delete a team for which they are
only a member.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
@@ -78,8 +77,7 @@ def test_api_teams_delete_authenticated_administrator():
Authenticated users should not be allowed to delete a team for which they are
administrator.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
@@ -102,8 +100,7 @@ def test_api_teams_delete_authenticated_owner():
Authenticated users should be able to delete a team for which they are directly
owner.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)

View File

@@ -31,8 +31,7 @@ def test_api_teams_list_authenticated():
Authenticated users should be able to list teams
they are an owner/administrator/member of.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
@@ -59,8 +58,7 @@ def test_api_teams_list_pagination(
_mock_page_size,
):
"""Pagination should work as expected."""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
@@ -105,8 +103,7 @@ def test_api_teams_list_pagination(
def test_api_teams_list_authenticated_distinct():
"""A team with several related users should only be listed once."""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
@@ -129,8 +126,7 @@ def test_api_teams_order():
"""
Test that the endpoint GET teams is sorted in 'created_at' descending order by default.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
@@ -159,8 +155,7 @@ def test_api_teams_order_param():
Test that the 'created_at' field is sorted in ascending order
when the 'ordering' query parameter is set.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)

View File

@@ -27,10 +27,10 @@ def test_api_teams_retrieve_authenticated_unrelated():
Authenticated users should not be allowed to retrieve a team to which they are
not related.
"""
identity = factories.IdentityFactory()
user = factories.UserFactory()
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
team = factories.TeamFactory()
@@ -46,8 +46,7 @@ def test_api_teams_retrieve_authenticated_related():
Authenticated users should be allowed to retrieve a team to which they
are related whatever the role.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)

View File

@@ -45,10 +45,10 @@ def test_api_teams_update_authenticated_unrelated():
"""
Authenticated users should not be allowed to update a team to which they are not related.
"""
identity = factories.IdentityFactory()
user = factories.UserFactory()
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
team = factories.TeamFactory()
old_team_values = serializers.TeamSerializer(instance=team).data
@@ -73,8 +73,7 @@ def test_api_teams_update_authenticated_members():
Users who are members of a team but not administrators should
not be allowed to update it.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
@@ -101,8 +100,7 @@ def test_api_teams_update_authenticated_members():
def test_api_teams_update_authenticated_administrators():
"""Administrators of a team should be allowed to update it."""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
@@ -134,8 +132,7 @@ def test_api_teams_update_authenticated_administrators():
def test_api_teams_update_authenticated_owners():
"""Administrators of a team should be allowed to update it,
apart from read-only fields."""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
@@ -170,8 +167,7 @@ def test_api_teams_update_administrator_or_owner_of_another():
Being administrator or owner of a team should not grant authorization to update
another team.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
@@ -200,8 +196,7 @@ def test_api_teams_update_existing_slug_should_return_error():
Updating a team's name to an existing slug should return a bad request,
instead of creating a duplicate.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)

View File

@@ -69,8 +69,7 @@ def test_api_contacts_list_authenticated_no_query():
Authenticated users should be able to list contacts without applying a query.
Profile and base contacts should be excluded.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
contact = factories.ContactFactory(owner=user)
user.profile_contact = contact
user.save()
@@ -108,8 +107,7 @@ def test_api_contacts_list_authenticated_by_full_name():
Authenticated users should be able to search users with a case insensitive and
partial query on the full name.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
dave = factories.BaseContactFactory(full_name="David Bowman")
nicole = factories.BaseContactFactory(full_name="Nicole Foole")
@@ -150,8 +148,7 @@ def test_api_contacts_list_authenticated_by_full_name():
def test_api_contacts_list_authenticated_uppercase_content():
"""Upper case content should be found by lower case query."""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
dave = factories.BaseContactFactory(full_name="EEE", short_name="AAA")
@@ -175,8 +172,7 @@ def test_api_contacts_list_authenticated_uppercase_content():
def test_api_contacts_list_authenticated_capital_query():
"""Upper case query should find lower case content."""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
dave = factories.BaseContactFactory(full_name="eee", short_name="aaa")
@@ -200,8 +196,7 @@ def test_api_contacts_list_authenticated_capital_query():
def test_api_contacts_list_authenticated_accented_content():
"""Accented content should be found by unaccented query."""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
dave = factories.BaseContactFactory(full_name="ééé", short_name="ààà")
@@ -225,8 +220,7 @@ def test_api_contacts_list_authenticated_accented_content():
def test_api_contacts_list_authenticated_accented_query():
"""Accented query should find unaccented content."""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
dave = factories.BaseContactFactory(full_name="eee", short_name="aaa")
@@ -264,9 +258,7 @@ def test_api_contacts_retrieve_authenticated_owned():
"""
Authenticated users should be allowed to retrieve a contact they own.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
contact = factories.ContactFactory(owner=user)
client = APIClient()
@@ -289,11 +281,11 @@ def test_api_contacts_retrieve_authenticated_public():
"""
Authenticated users should be able to retrieve public contacts.
"""
identity = factories.IdentityFactory()
user = factories.UserFactory()
contact = factories.BaseContactFactory()
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
response = client.get(f"/api/v1.0/contacts/{contact.id!s}/")
assert response.status_code == 200
@@ -311,11 +303,11 @@ def test_api_contacts_retrieve_authenticated_other():
"""
Authenticated users should not be allowed to retrieve another user's contacts.
"""
identity = factories.IdentityFactory()
user = factories.UserFactory()
contact = factories.ContactFactory()
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
response = client.get(f"/api/v1.0/contacts/{contact.id!s}/")
assert response.status_code == 403
@@ -339,10 +331,10 @@ def test_api_contacts_create_anonymous_forbidden():
def test_api_contacts_create_authenticated_missing_base():
"""Anonymous users should be able to create users."""
identity = factories.IdentityFactory(user__profile_contact=None)
user = factories.UserFactory(profile_contact=None)
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
response = client.post(
"/api/v1.0/contacts/",
@@ -360,8 +352,7 @@ def test_api_contacts_create_authenticated_missing_base():
def test_api_contacts_create_authenticated_successful():
"""Authenticated users should be able to create contacts."""
identity = factories.IdentityFactory(user__profile_contact=None)
user = identity.user
user = factories.UserFactory(profile_contact=None)
base_contact = factories.BaseContactFactory()
client = APIClient()
@@ -407,9 +398,7 @@ def test_api_contacts_create_authenticated_existing_override():
Trying to create a contact for base contact that is already overriden by the user
should receive a 400 error.
"""
identity = factories.IdentityFactory(user__profile_contact=None)
user = identity.user
user = factories.UserFactory(profile_contact=None)
base_contact = factories.BaseContactFactory()
factories.ContactFactory(base=base_contact, owner=user)
@@ -463,8 +452,7 @@ def test_api_contacts_update_authenticated_owned():
"""
Authenticated users should be allowed to update their own contacts.
"""
identity = factories.IdentityFactory(user__profile_contact=None)
user = identity.user
user = factories.UserFactory(profile_contact=None)
client = APIClient()
client.force_login(user)
@@ -498,8 +486,7 @@ def test_api_contacts_update_authenticated_profile():
"""
Authenticated users should be allowed to update their profile contact.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
@@ -534,8 +521,7 @@ def test_api_contacts_update_authenticated_other():
"""
Authenticated users should not be allowed to update contacts owned by other users.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
@@ -573,8 +559,7 @@ def test_api_contacts_delete_list_anonymous():
def test_api_contacts_delete_list_authenticated():
"""Authenticated users should not be allowed to delete a list of contacts."""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
@@ -602,8 +587,7 @@ def test_api_contacts_delete_authenticated_public():
"""
Authenticated users should not be allowed to delete a public contact.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
@@ -622,8 +606,7 @@ def test_api_contacts_delete_authenticated_owner():
"""
Authenticated users should be allowed to delete a contact they own.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
contact = factories.ContactFactory(owner=user)
client = APIClient()
@@ -642,8 +625,7 @@ def test_api_contacts_delete_authenticated_profile():
"""
Authenticated users should be allowed to delete their profile contact.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
contact = factories.ContactFactory(owner=user, base=None)
user.profile_contact = contact
user.save()
@@ -663,8 +645,7 @@ def test_api_contacts_delete_authenticated_other():
"""
Authenticated users should not be allowed to delete a contact they don't own.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
contact = factories.ContactFactory()
client = APIClient()

View File

@@ -34,15 +34,15 @@ def test_api_team_invitations__create__anonymous():
def test_api_team_invitations__create__authenticated_outsider():
"""Users outside of team should not be permitted to invite to team."""
identity = factories.IdentityFactory()
user = factories.UserFactory()
team = factories.TeamFactory()
invitation_values = serializers.InvitationSerializer(
factories.InvitationFactory.build()
).data
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
response = client.post(
f"/api/v1.0/teams/{team.id}/invitations/",
invitation_values,
@@ -57,16 +57,16 @@ def test_api_team_invitations__create__authenticated_outsider():
)
def test_api_team_invitations__create__privileged_members(role):
"""Owners and administrators should be able to invite new members."""
identity = factories.IdentityFactory()
team = factories.TeamFactory(users=[(identity.user, role)])
user = factories.UserFactory()
team = factories.TeamFactory(users=[(user, role)])
invitation_values = serializers.InvitationSerializer(
factories.InvitationFactory.build()
).data
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
response = client.post(
f"/api/v1.0/teams/{team.id}/invitations/",
invitation_values,
@@ -79,16 +79,16 @@ def test_api_team_invitations__create__members():
"""
Members should not be able to invite new members.
"""
identity = factories.IdentityFactory()
team = factories.TeamFactory(users=[(identity.user, "member")])
user = factories.UserFactory()
team = factories.TeamFactory(users=[(user, "member")])
invitation_values = serializers.InvitationSerializer(
factories.InvitationFactory.build()
).data
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
response = client.post(
f"/api/v1.0/teams/{team.id}/invitations/",
invitation_values,
@@ -102,16 +102,16 @@ def test_api_team_invitations__create__members():
def test_api_team_invitations__create__issuer_and_team_automatically_added():
"""Team and issuer fields should auto-complete."""
identity = factories.IdentityFactory()
team = factories.TeamFactory(users=[(identity.user, "owner")])
user = factories.UserFactory()
team = factories.TeamFactory(users=[(user, "owner")])
# Generate a random invitation
invitation = factories.InvitationFactory.build()
invitation_data = {"email": invitation.email, "role": invitation.role}
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
response = client.post(
f"/api/v1.0/teams/{team.id}/invitations/",
invitation_data,
@@ -120,7 +120,7 @@ def test_api_team_invitations__create__issuer_and_team_automatically_added():
assert response.status_code == status.HTTP_201_CREATED
# team and issuer automatically set
assert response.json()["team"] == str(team.id)
assert response.json()["issuer"] == str(identity.user.id)
assert response.json()["issuer"] == str(user.id)
def test_api_team_invitations__create__cannot_duplicate_invitation():
@@ -129,8 +129,8 @@ def test_api_team_invitations__create__cannot_duplicate_invitation():
team = existing_invitation.team
# Grant privileged role on the Team to the user
identity = factories.IdentityFactory()
factories.TeamAccessFactory(team=team, user=identity.user, role="administrator")
user = factories.UserFactory()
factories.TeamAccessFactory(team=team, user=user, role="administrator")
# Create a new invitation to the same team with the exact same email address
duplicated_invitation = serializers.InvitationSerializer(
@@ -138,7 +138,7 @@ def test_api_team_invitations__create__cannot_duplicate_invitation():
).data
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
response = client.post(
f"/api/v1.0/teams/{team.id}/invitations/",
duplicated_invitation,
@@ -154,11 +154,9 @@ def test_api_team_invitations__create__cannot_invite_existing_users():
"""
Should not be able to invite already existing users.
"""
user = factories.UserFactory()
user, existing_user = factories.UserFactory.create_batch(2)
team = factories.TeamFactory(users=[(user, "administrator")])
existing_user = factories.IdentityFactory(is_main=True)
# Build an invitation to the email of an exising identity in the db
invitation_values = serializers.InvitationSerializer(
factories.InvitationFactory.build(email=existing_user.email, team=team)
@@ -166,6 +164,7 @@ def test_api_team_invitations__create__cannot_invite_existing_users():
client = APIClient()
client.force_login(user)
response = client.post(
f"/api/v1.0/teams/{team.id}/invitations/",
invitation_values,
@@ -190,14 +189,10 @@ def test_api_team_invitations__list__authenticated():
Authenticated user should be able to list invitations
in teams they belong to, including from other issuers.
"""
identity = factories.IdentityFactory()
other_user = factories.UserFactory()
team = factories.TeamFactory(
users=[(identity.user, "administrator"), (other_user, "owner")]
)
user, other_user = factories.UserFactory.create_batch(2)
team = factories.TeamFactory(users=[(user, "administrator"), (other_user, "owner")])
invitation = factories.InvitationFactory(
team=team, role="administrator", issuer=identity.user
team=team, role="administrator", issuer=user
)
other_invitations = factories.InvitationFactory.create_batch(
2, team=team, role="member", issuer=other_user
@@ -208,7 +203,7 @@ def test_api_team_invitations__list__authenticated():
factories.InvitationFactory.create_batch(2, team=other_team, role="member")
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
response = client.get(
f"/api/v1.0/teams/{team.id}/invitations/",
)
@@ -235,24 +230,22 @@ def test_api_team_invitations__list__expired_invitations_still_listed(settings):
"""
Expired invitations are still listed.
"""
identity = factories.IdentityFactory()
user = factories.UserFactory()
other_user = factories.UserFactory()
team = factories.TeamFactory(
users=[(identity.user, "administrator"), (other_user, "owner")]
)
team = factories.TeamFactory(users=[(user, "administrator"), (other_user, "owner")])
# override settings to accelerate validation expiration
settings.INVITATION_VALIDITY_DURATION = 1 # second
expired_invitation = factories.InvitationFactory(
team=team,
role="member",
issuer=identity.user,
issuer=user,
)
time.sleep(1)
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
response = client.get(
f"/api/v1.0/teams/{team.id}/invitations/",
)
@@ -293,12 +286,12 @@ def test_api_team_invitations__retrieve__unrelated_user():
"""
Authenticated unrelated users should not be able to retrieve invitations.
"""
user = factories.IdentityFactory(user=factories.UserFactory()).user
user = factories.UserFactory()
invitation = factories.InvitationFactory()
client = APIClient()
client.force_login(user)
response = client.get(
f"/api/v1.0/teams/{invitation.team.id}/invitations/{invitation.id}/",
)
@@ -311,8 +304,7 @@ def test_api_team_invitations__retrieve__team_member():
Authenticated team members should be able to retrieve invitations
whatever their role in the team.
"""
user = factories.IdentityFactory(user=factories.UserFactory()).user
user = factories.UserFactory()
invitation = factories.InvitationFactory()
factories.TeamAccessFactory(team=invitation.team, user=user, role="member")
@@ -342,8 +334,7 @@ def test_api_team_invitations__update__forbidden(method):
"""
Update of invitations is currently forbidden.
"""
user = factories.IdentityFactory(user=factories.UserFactory()).user
user = factories.UserFactory()
invitation = factories.InvitationFactory()
factories.TeamAccessFactory(team=invitation.team, user=user, role="owner")
@@ -376,13 +367,12 @@ def test_api_team_invitations__delete__anonymous():
def test_api_team_invitations__delete__authenticated_outsider():
"""Members outside of team should not cancel invitations."""
identity = factories.IdentityFactory()
user = factories.UserFactory()
team = factories.TeamFactory()
invitation = factories.InvitationFactory(team=team)
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
response = client.delete(
f"/api/v1.0/teams/{team.id}/invitations/{invitation.id}/",
)
@@ -392,13 +382,12 @@ def test_api_team_invitations__delete__authenticated_outsider():
@pytest.mark.parametrize("role", ["owner", "administrator"])
def test_api_team_invitations__delete__privileged_members(role):
"""Privileged member should be able to cancel invitation."""
identity = factories.IdentityFactory()
team = factories.TeamFactory(users=[(identity.user, role)])
user = factories.UserFactory()
team = factories.TeamFactory(users=[(user, role)])
invitation = factories.InvitationFactory(team=team)
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
response = client.delete(
f"/api/v1.0/teams/{team.id}/invitations/{invitation.id}/",
)
@@ -407,13 +396,12 @@ def test_api_team_invitations__delete__privileged_members(role):
def test_api_team_invitations__delete__members():
"""Member should not be able to cancel invitation."""
identity = factories.IdentityFactory()
team = factories.TeamFactory(users=[(identity.user, "member")])
user = factories.UserFactory()
team = factories.TeamFactory(users=[(user, "member")])
invitation = factories.InvitationFactory(team=team)
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
response = client.delete(
f"/api/v1.0/teams/{team.id}/invitations/{invitation.id}/",
)

View File

@@ -35,10 +35,10 @@ def test_api_users_list_authenticated():
"""
Authenticated users should be able to list all users.
"""
identity = factories.IdentityFactory()
user = factories.UserFactory()
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
factories.UserFactory.create_batch(2)
response = client.get(
@@ -53,23 +53,15 @@ def test_api_users_authenticated_list_by_email():
Authenticated users should be able to search users with a case-insensitive and
partial query on the email.
"""
user = factories.UserFactory(admin_email="tester@ministry.fr")
factories.IdentityFactory(user=user, email=user.admin_email, name="john doe")
user = factories.UserFactory(email="tester@ministry.fr", name="john doe")
dave = factories.UserFactory(email="david.bowman@work.com", name=None)
nicole = factories.UserFactory(email="nicole_foole@work.com", name=None)
frank = factories.UserFactory(email="frank_poole@work.com", name=None)
factories.UserFactory(email="heywood_floyd@work.com", name=None)
client = APIClient()
client.force_login(user)
dave = factories.IdentityFactory(
email="david.bowman@work.com", name=None, is_main=True
)
nicole = factories.IdentityFactory(
email="nicole_foole@work.com", name=None, is_main=True
)
frank = factories.IdentityFactory(
email="frank_poole@work.com", name=None, is_main=True
)
factories.IdentityFactory(email="heywood_floyd@work.com", name=None, is_main=True)
# Full query should work
response = client.get(
"/api/v1.0/users/?q=david.bowman@work.com",
@@ -77,14 +69,14 @@ def test_api_users_authenticated_list_by_email():
assert response.status_code == HTTP_200_OK
user_ids = [user["id"] for user in response.json()["results"]]
assert user_ids[0] == str(dave.user.id)
assert user_ids[0] == str(dave.id)
# Partial query should work
response = client.get("/api/v1.0/users/?q=fran")
assert response.status_code == HTTP_200_OK
user_ids = [user["id"] for user in response.json()["results"]]
assert user_ids[0] == str(frank.user.id)
assert user_ids[0] == str(frank.id)
# Result that matches a trigram twice ranks better than result that matches once
response = client.get("/api/v1.0/users/?q=ole")
@@ -92,7 +84,7 @@ def test_api_users_authenticated_list_by_email():
assert response.status_code == HTTP_200_OK
user_ids = [user["id"] for user in response.json()["results"]]
# "Nicole Foole" matches twice on "ole"
assert user_ids == [str(nicole.user.id), str(frank.user.id)]
assert user_ids == [str(nicole.id), str(frank.id)]
# Even with a low similarity threshold, query should match expected emails
response = client.get("/api/v1.0/users/?q=ool")
@@ -100,22 +92,22 @@ def test_api_users_authenticated_list_by_email():
assert response.status_code == HTTP_200_OK
assert response.json()["results"] == [
{
"id": str(nicole.user.id),
"id": str(nicole.id),
"email": nicole.email,
"name": nicole.name,
"is_device": nicole.user.is_device,
"is_staff": nicole.user.is_staff,
"language": nicole.user.language,
"timezone": str(nicole.user.timezone),
"is_device": nicole.is_device,
"is_staff": nicole.is_staff,
"language": nicole.language,
"timezone": str(nicole.timezone),
},
{
"id": str(frank.user.id),
"id": str(frank.id),
"email": frank.email,
"name": frank.name,
"is_device": frank.user.is_device,
"is_staff": frank.user.is_staff,
"language": frank.user.language,
"timezone": str(frank.user.timezone),
"is_device": frank.is_device,
"is_staff": frank.is_staff,
"language": frank.language,
"timezone": str(frank.timezone),
},
]
@@ -125,17 +117,15 @@ def test_api_users_authenticated_list_by_name():
Authenticated users should be able to search users with a case-insensitive and
partial query on the name.
"""
user = factories.UserFactory(admin_email="tester@ministry.fr")
factories.IdentityFactory(user=user, email=user.admin_email, name="john doe")
user = factories.UserFactory(email="tester@ministry.fr", name="john doe")
dave = factories.UserFactory(name="dave bowman", email=None)
nicole = factories.UserFactory(name="nicole foole", email=None)
frank = factories.UserFactory(name="frank poole", email=None)
factories.UserFactory(name="heywood floyd", email=None)
client = APIClient()
client.force_login(user)
dave = factories.IdentityFactory(name="dave bowman", email=None, is_main=True)
nicole = factories.IdentityFactory(name="nicole foole", email=None, is_main=True)
frank = factories.IdentityFactory(name="frank poole", email=None, is_main=True)
factories.IdentityFactory(name="heywood floyd", email=None, is_main=True)
# Full query should work
response = client.get(
"/api/v1.0/users/?q=david.bowman@work.com",
@@ -143,14 +133,14 @@ def test_api_users_authenticated_list_by_name():
assert response.status_code == HTTP_200_OK
user_ids = [user["id"] for user in response.json()["results"]]
assert user_ids[0] == str(dave.user.id)
assert user_ids[0] == str(dave.id)
# Partial query should work
response = client.get("/api/v1.0/users/?q=fran")
assert response.status_code == HTTP_200_OK
user_ids = [user["id"] for user in response.json()["results"]]
assert user_ids[0] == str(frank.user.id)
assert user_ids[0] == str(frank.id)
# Result that matches a trigram twice ranks better than result that matches once
response = client.get("/api/v1.0/users/?q=ole")
@@ -158,7 +148,7 @@ def test_api_users_authenticated_list_by_name():
assert response.status_code == HTTP_200_OK
user_ids = [user["id"] for user in response.json()["results"]]
# "Nicole Foole" matches twice on "ole"
assert user_ids == [str(nicole.user.id), str(frank.user.id)]
assert user_ids == [str(nicole.id), str(frank.id)]
# Even with a low similarity threshold, query should match expected user
response = client.get("/api/v1.0/users/?q=ool")
@@ -166,22 +156,22 @@ def test_api_users_authenticated_list_by_name():
assert response.status_code == HTTP_200_OK
assert response.json()["results"] == [
{
"id": str(nicole.user.id),
"id": str(nicole.id),
"email": nicole.email,
"name": nicole.name,
"is_device": nicole.user.is_device,
"is_staff": nicole.user.is_staff,
"language": nicole.user.language,
"timezone": str(nicole.user.timezone),
"is_device": nicole.is_device,
"is_staff": nicole.is_staff,
"language": nicole.language,
"timezone": str(nicole.timezone),
},
{
"id": str(frank.user.id),
"id": str(frank.id),
"email": frank.email,
"name": frank.name,
"is_device": frank.user.is_device,
"is_staff": frank.user.is_staff,
"language": frank.user.language,
"timezone": str(frank.user.timezone),
"is_device": frank.is_device,
"is_staff": frank.is_staff,
"language": frank.language,
"timezone": str(frank.timezone),
},
]
@@ -192,20 +182,14 @@ def test_api_users_authenticated_list_by_name_and_email():
partial query on the name and email.
"""
user = factories.UserFactory(admin_email="tester@ministry.fr")
factories.IdentityFactory(user=user, email=user.admin_email, name="john doe")
user = factories.UserFactory(email="tester@ministry.fr", name="john doe")
nicole = factories.UserFactory(email="nicole_foole@work.com", name="nicole foole")
frank = factories.UserFactory(email="oleg_poole@work.com", name=None)
david = factories.UserFactory(email=None, name="david role")
client = APIClient()
client.force_login(user)
nicole = factories.IdentityFactory(
email="nicole_foole@work.com", name="nicole foole", is_main=True
)
frank = factories.IdentityFactory(
email="oleg_poole@work.com", name=None, is_main=True
)
david = factories.IdentityFactory(email=None, name="david role", is_main=True)
# Result that matches a trigram in name and email ranks better than result that matches once
response = client.get("/api/v1.0/users/?q=ole")
@@ -215,7 +199,7 @@ def test_api_users_authenticated_list_by_name_and_email():
# "Nicole Foole" matches twice on "ole" in her name and twice on "ole" in her email
# "Oleg poole" matches twice on "ole" in her email
# "David role" matches once on "ole" in his name
assert user_ids == [str(nicole.user.id), str(frank.user.id), str(david.user.id)]
assert user_ids == [str(nicole.id), str(frank.id), str(david.id)]
def test_api_users_authenticated_list_exclude_users_already_in_team(
@@ -225,27 +209,26 @@ def test_api_users_authenticated_list_exclude_users_already_in_team(
Authenticated users should be able to search users
but the result should exclude all users already in the given team.
"""
user = factories.UserFactory(admin_email="tester@ministry.fr")
factories.IdentityFactory(user=user, email=user.email, name="john doe")
user = factories.UserFactory(email="tester@ministry.fr", name="john doe")
dave = factories.UserFactory(name="dave bowman", email=None)
nicole = factories.UserFactory(name="nicole foole", email=None)
frank = factories.UserFactory(name="frank poole", email=None)
mary = factories.UserFactory(name="mary poole", email=None)
factories.UserFactory(name="heywood floyd", email=None)
factories.UserFactory(name="Andrei Smyslov", email=None)
factories.TeamFactory.create_batch(10)
client = APIClient()
client.force_login(user)
dave = factories.IdentityFactory(name="dave bowman", email=None, is_main=True)
nicole = factories.IdentityFactory(name="nicole foole", email=None, is_main=True)
frank = factories.IdentityFactory(name="frank poole", email=None, is_main=True)
mary = factories.IdentityFactory(name="mary poole", email=None, is_main=True)
factories.IdentityFactory(name="heywood floyd", email=None, is_main=True)
factories.IdentityFactory(name="Andrei Smyslov", email=None, is_main=True)
factories.TeamFactory.create_batch(10)
# Add Dave and Frank in the same team
team = factories.TeamFactory(
users=[
(dave.user, models.RoleChoices.MEMBER),
(frank.user, models.RoleChoices.MEMBER),
(dave, models.RoleChoices.MEMBER),
(frank, models.RoleChoices.MEMBER),
]
)
factories.TeamFactory(users=[(nicole.user, models.RoleChoices.MEMBER)])
factories.TeamFactory(users=[(nicole, models.RoleChoices.MEMBER)])
# Search user to add him/her to a team, we give a team id to the request
# to exclude all users already in the team
@@ -266,118 +249,25 @@ def test_api_users_authenticated_list_exclude_users_already_in_team(
# - user authenticated
# - search user query
# - User
# - Identity
with django_assert_num_queries(4):
with django_assert_num_queries(3):
response = client.get(
f"/api/v1.0/users/?q=ool&team_id={team.id}",
)
assert response.status_code == HTTP_200_OK
user_ids = sorted([user["id"] for user in response.json()["results"]])
assert user_ids == sorted([str(mary.user.id), str(nicole.user.id)])
def test_api_users_authenticated_list_multiple_identities_single_user():
"""
User with multiple identities should appear only once in results.
"""
user = factories.UserFactory(admin_email="tester@ministry.fr")
factories.IdentityFactory(user=user, email=user.admin_email, name="eva karl")
client = APIClient()
client.force_login(user)
dave = factories.UserFactory()
factories.IdentityFactory(
user=dave, email="dave.bowman@work.com", name="dave bowman"
)
factories.IdentityFactory(user=dave, email="dave.bowman@fun.fr", name="dave bowman")
# Full query should work
response = client.get(
"/api/v1.0/users/?q=david.bowman@work.com",
)
assert response.status_code == HTTP_200_OK
# A single user is returned, despite similarity matching both emails
assert response.json()["count"] == 1
assert response.json()["results"][0]["id"] == str(dave.id)
def test_api_users_authenticated_list_multiple_identities_multiple_users():
"""
User with multiple identities should be ranked
on their best matching identity.
"""
user = factories.UserFactory(admin_email="tester@ministry.fr")
factories.IdentityFactory(user=user, email=user.admin_email, name="john doe")
client = APIClient()
client.force_login(user)
dave = factories.UserFactory()
dave_identity = factories.IdentityFactory(
user=dave, email="dave.bowman@work.com", is_main=True, name="dave bowman"
)
factories.IdentityFactory(user=dave, email="babibou@ehehe.com", name="babihou")
davina_identity = factories.IdentityFactory(
user=factories.UserFactory(), email="davina.bowan@work.com", name="davina"
)
prue_identity = factories.IdentityFactory(
user=factories.UserFactory(),
email="prudence.crandall@work.com",
name="prudence",
)
# Full query should work
response = client.get(
"/api/v1.0/users/?q=david.bowman@work.com",
)
assert response.status_code == HTTP_200_OK
assert response.json()["count"] == 3
assert response.json()["results"] == [
{
"id": str(dave.id),
"email": dave_identity.email,
"name": dave_identity.name,
"is_device": dave_identity.user.is_device,
"is_staff": dave_identity.user.is_staff,
"language": dave_identity.user.language,
"timezone": str(dave_identity.user.timezone),
},
{
"id": str(davina_identity.user.id),
"email": davina_identity.email,
"name": davina_identity.name,
"is_device": davina_identity.user.is_device,
"is_staff": davina_identity.user.is_staff,
"language": davina_identity.user.language,
"timezone": str(davina_identity.user.timezone),
},
{
"id": str(prue_identity.user.id),
"email": prue_identity.email,
"name": prue_identity.name,
"is_device": prue_identity.user.is_device,
"is_staff": prue_identity.user.is_staff,
"language": prue_identity.user.language,
"timezone": str(prue_identity.user.timezone),
},
]
assert user_ids == sorted([str(mary.id), str(nicole.id)])
def test_api_users_authenticated_list_uppercase_content():
"""Upper case content should be found by lower case query."""
user = factories.UserFactory(admin_email="tester@ministry.fr")
factories.IdentityFactory(user=user, email=user.admin_email, name="eva karl")
user = factories.UserFactory(email="tester@ministry.fr", name="eva karl")
dave = factories.UserFactory(
email="DAVID.BOWMAN@INTENSEWORK.COM", name="DAVID BOWMAN"
)
client = APIClient()
client.force_login(user)
dave = factories.IdentityFactory(
email="DAVID.BOWMAN@INTENSEWORK.COM", name="DAVID BOWMAN"
)
# Unaccented full address
response = client.get(
"/api/v1.0/users/?q=david.bowman@intensework.com",
@@ -385,7 +275,7 @@ def test_api_users_authenticated_list_uppercase_content():
assert response.status_code == HTTP_200_OK
user_ids = [user["id"] for user in response.json()["results"]]
assert user_ids == [str(dave.user.id)]
assert user_ids == [str(dave.id)]
# Partial query
response = client.get(
@@ -394,19 +284,17 @@ def test_api_users_authenticated_list_uppercase_content():
assert response.status_code == HTTP_200_OK
user_ids = [user["id"] for user in response.json()["results"]]
assert user_ids == [str(dave.user.id)]
assert user_ids == [str(dave.id)]
def test_api_users_list_authenticated_capital_query():
"""Upper case query should find lower case content."""
user = factories.UserFactory(admin_email="tester@ministry.fr")
factories.IdentityFactory(user=user, email=user.admin_email, name="eva karl")
user = factories.UserFactory(email="tester@ministry.fr", name="eva karl")
dave = factories.UserFactory(email="david.bowman@work.com", name="david bowman")
client = APIClient()
client.force_login(user)
dave = factories.IdentityFactory(email="david.bowman@work.com", name="david bowman")
# Full uppercase query
response = client.get(
"/api/v1.0/users/?q=DAVID.BOWMAN@WORK.COM",
@@ -414,7 +302,7 @@ def test_api_users_list_authenticated_capital_query():
assert response.status_code == HTTP_200_OK
user_ids = [user["id"] for user in response.json()["results"]]
assert user_ids == [str(dave.user.id)]
assert user_ids == [str(dave.id)]
# Partial uppercase email
response = client.get(
@@ -423,21 +311,17 @@ def test_api_users_list_authenticated_capital_query():
assert response.status_code == HTTP_200_OK
user_ids = [user["id"] for user in response.json()["results"]]
assert user_ids == [str(dave.user.id)]
assert user_ids == [str(dave.id)]
def test_api_contacts_list_authenticated_accented_query():
"""Accented content should be found by unaccented query."""
user = factories.UserFactory(admin_email="tester@ministry.fr")
factories.IdentityFactory(user=user, email=user.admin_email, name="john doe")
user = factories.UserFactory(email="tester@ministry.fr", name="john doe")
helene = factories.UserFactory(email="helene.bowman@work.com", name="helene bowman")
client = APIClient()
client.force_login(user)
helene = factories.IdentityFactory(
email="helene.bowman@work.com", name="helene bowman"
)
# Accented full query
response = client.get(
"/api/v1.0/users/?q=hélène.bowman@work.com",
@@ -445,7 +329,7 @@ def test_api_contacts_list_authenticated_accented_query():
assert response.status_code == HTTP_200_OK
user_ids = [user["id"] for user in response.json()["results"]]
assert user_ids == [str(helene.user.id)]
assert user_ids == [str(helene.id)]
# Unaccented partial email
response = client.get(
@@ -454,7 +338,7 @@ def test_api_contacts_list_authenticated_accented_query():
assert response.status_code == HTTP_200_OK
user_ids = [user["id"] for user in response.json()["results"]]
assert user_ids == [str(helene.user.id)]
assert user_ids == [str(helene.id)]
@mock.patch.object(Pagination, "get_page_size", return_value=3)
@@ -462,8 +346,7 @@ def test_api_users_list_pagination(
_mock_page_size,
):
"""Pagination should work as expected."""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
@@ -503,14 +386,13 @@ def test_api_users_list_pagination_page_size(
page_size,
):
"""Page's size on pagination should work as expected."""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
for i in range(page_size):
factories.UserFactory.create(admin_email=f"user-{i}@people.com")
factories.UserFactory.create(email=f"user-{i}@people.com")
response = client.get(
f"/api/v1.0/users/?page_size={page_size}",
@@ -528,14 +410,13 @@ def test_api_users_list_pagination_wrong_page_size(
page_size,
):
"""Page's size on pagination should be limited to "max_page_size"."""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
for i in range(page_size):
factories.UserFactory.create(admin_email=f"user-{i}@people.com")
factories.UserFactory.create(email=f"user-{i}@people.com")
response = client.get(
f"/api/v1.0/users/?page_size={page_size}",
@@ -563,8 +444,7 @@ def test_api_users_retrieve_me_anonymous():
def test_api_users_retrieve_me_authenticated():
"""Authenticated users should be able to retrieve their own user via the "/users/me" path."""
identity = factories.IdentityFactory(is_main=True)
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
@@ -582,8 +462,8 @@ def test_api_users_retrieve_me_authenticated():
assert response.status_code == HTTP_200_OK
assert response.json() == {
"id": str(user.id),
"name": str(identity.name),
"email": str(identity.email),
"name": str(user.name),
"email": str(user.email),
"language": user.language,
"timezone": str(user.timezone),
"is_device": False,
@@ -608,8 +488,7 @@ def test_api_users_retrieve_authenticated_self():
Authenticated users should be allowed to retrieve their own user.
The returned object should not contain the password.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
@@ -626,12 +505,10 @@ def test_api_users_retrieve_authenticated_other():
Authenticated users should be able to retrieve another user's detail view with
limited information.
"""
identity = factories.IdentityFactory()
user, other_user = factories.UserFactory.create_batch(2)
client = APIClient()
client.force_login(identity.user)
other_user = factories.UserFactory()
client.force_login(user)
response = client.get(
f"/api/v1.0/users/{other_user.id!s}/",
@@ -658,8 +535,7 @@ def test_api_users_create_anonymous():
def test_api_users_create_authenticated():
"""Authenticated users should not be able to create users via the API."""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
@@ -706,8 +582,7 @@ def test_api_users_update_authenticated_self():
Authenticated users should be able to update their own user but only "language"
and "timezone" fields.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
@@ -735,10 +610,10 @@ def test_api_users_update_authenticated_self():
def test_api_users_update_authenticated_other():
"""Authenticated users should not be allowed to update other users."""
identity = factories.IdentityFactory()
user = factories.UserFactory()
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
user = factories.UserFactory()
old_user_values = dict(serializers.UserSerializer(instance=user).data)
@@ -788,8 +663,7 @@ def test_api_users_patch_authenticated_self():
Authenticated users should be able to patch their own user but only "language"
and "timezone" fields.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
@@ -818,27 +692,27 @@ def test_api_users_patch_authenticated_self():
def test_api_users_patch_authenticated_other():
"""Authenticated users should not be allowed to patch other users."""
identity = factories.IdentityFactory()
user = factories.UserFactory()
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
user = factories.UserFactory()
old_user_values = dict(serializers.UserSerializer(instance=user).data)
other_user = factories.UserFactory()
old_user_values = dict(serializers.UserSerializer(instance=other_user).data)
new_user_values = dict(
serializers.UserSerializer(instance=factories.UserFactory()).data
)
for key, new_value in new_user_values.items():
response = client.put(
f"/api/v1.0/users/{user.id!s}/",
f"/api/v1.0/users/{other_user.id!s}/",
{key: new_value},
format="json",
)
assert response.status_code == HTTP_403_FORBIDDEN
user.refresh_from_db()
user_values = dict(serializers.UserSerializer(instance=user).data)
other_user.refresh_from_db()
user_values = dict(serializers.UserSerializer(instance=other_user).data)
for key, value in user_values.items():
assert value == old_user_values[key]
@@ -856,11 +730,11 @@ def test_api_users_delete_list_anonymous():
def test_api_users_delete_list_authenticated():
"""Authenticated users should not be allowed to delete a list of users."""
user = factories.UserFactory()
factories.UserFactory.create_batch(2)
identity = factories.IdentityFactory()
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
response = client.delete(
"/api/v1.0/users/",
@@ -884,11 +758,10 @@ def test_api_users_delete_authenticated():
"""
Authenticated users should not be allowed to delete a user other than themselves.
"""
identity = factories.IdentityFactory()
other_user = factories.UserFactory()
user, other_user = factories.UserFactory.create_batch(2)
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
response = client.delete(f"/api/v1.0/users/{other_user.id!s}/")
@@ -898,13 +771,13 @@ def test_api_users_delete_authenticated():
def test_api_users_delete_self():
"""Authenticated users should not be able to delete their own user."""
identity = factories.IdentityFactory()
user = factories.UserFactory()
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
response = client.delete(
f"/api/v1.0/users/{identity.user.id!s}/",
f"/api/v1.0/users/{user.id!s}/",
)
assert response.status_code == HTTP_405_METHOD_NOT_ALLOWED

View File

@@ -1,183 +0,0 @@
"""
Unit tests for the Identity model
"""
from django.core.exceptions import ValidationError
import pytest
from core import factories, models
pytestmark = pytest.mark.django_db
def test_models_identities_str_main():
"""The str representation should be the email address with indication that it is main."""
identity = factories.IdentityFactory(email="david@example.com")
assert str(identity) == "david@example.com[main]"
def test_models_identities_str_secondary():
"""The str representation of a secondary email should be the email address."""
main_identity = factories.IdentityFactory()
secondary_identity = factories.IdentityFactory(
user=main_identity.user, email="david@example.com"
)
assert str(secondary_identity) == "david@example.com"
def test_models_identities_is_main_automatic():
"""The first identity created for a user should automatically be set as main."""
user = factories.UserFactory()
identity = models.Identity.objects.create(
user=user, sub="123", email="david@example.com"
)
assert identity.is_main is True
def test_models_identities_is_main_exists():
"""A user should always keep one and only one of its identities as main."""
user = factories.UserFactory()
main_identity, _secondary_identity = factories.IdentityFactory.create_batch(
2, user=user
)
assert main_identity.is_main is True
main_identity.is_main = False
with pytest.raises(
ValidationError, match="A user should have one and only one main identity."
):
main_identity.save()
def test_models_identities_is_main_switch():
"""Setting a secondary identity as main should reset the existing main identity."""
user = factories.UserFactory()
first_identity, second_identity = factories.IdentityFactory.create_batch(
2, user=user
)
assert first_identity.is_main is True
second_identity.is_main = True
second_identity.save()
second_identity.refresh_from_db()
assert second_identity.is_main is True
first_identity.refresh_from_db()
assert first_identity.is_main is False
def test_models_identities_email_not_required():
"""The 'email' field can be blank."""
user = factories.UserFactory()
models.Identity.objects.create(user=user, sub="123", email=None)
def test_models_identities_user_required():
"""The 'user' field is required."""
with pytest.raises(models.User.DoesNotExist, match="Identity has no user."):
models.Identity.objects.create(user=None, email="david@example.com")
def test_models_identities_email_unique_same_user():
"""The 'email' field should be unique for a given user."""
email = factories.IdentityFactory()
with pytest.raises(
ValidationError,
match="Identity with this User and Email address already exists.",
):
factories.IdentityFactory(user=email.user, email=email.email)
def test_models_identities_email_unique_different_users():
"""The 'email' field should not be unique among users."""
email = factories.IdentityFactory()
factories.IdentityFactory(email=email.email)
def test_models_identities_email_normalization():
"""The 'email' field should be automatically normalized upon saving."""
email = factories.IdentityFactory()
email.email = "Thomas.Jefferson@Example.com"
email.save()
assert email.email == "Thomas.Jefferson@example.com"
def test_models_identities_ordering():
"""Identities should be returned ordered by main status then by their email address."""
user = factories.UserFactory()
factories.IdentityFactory.create_batch(5, user=user)
emails = models.Identity.objects.all()
assert emails[0].is_main is True
for i in range(3):
assert emails[i + 1].is_main is False
assert emails[i + 2].email >= emails[i + 1].email
def test_models_identities_sub_null():
"""The 'sub' field should not be null."""
user = factories.UserFactory()
with pytest.raises(ValidationError, match="This field cannot be null."):
models.Identity.objects.create(user=user, sub=None)
def test_models_identities_sub_blank():
"""The 'sub' field should not be blank."""
user = factories.UserFactory()
with pytest.raises(ValidationError, match="This field cannot be blank."):
models.Identity.objects.create(user=user, email="david@example.com", sub="")
def test_models_identities_sub_unique():
"""The 'sub' field should be unique."""
user = factories.UserFactory()
identity = factories.IdentityFactory()
with pytest.raises(ValidationError, match="Identity with this Sub already exists."):
models.Identity.objects.create(user=user, sub=identity.sub)
def test_models_identities_sub_max_length():
"""The 'sub' field should be 255 characters maximum."""
factories.IdentityFactory(sub="a" * 255)
with pytest.raises(ValidationError) as excinfo:
factories.IdentityFactory(sub="a" * 256)
assert (
str(excinfo.value)
== "{'sub': ['Ensure this value has at most 255 characters (it has 256).']}"
)
def test_models_identities_sub_special_characters():
"""The 'sub' field should accept periods, dashes, +, @ and underscores."""
identity = factories.IdentityFactory(sub="dave.bowman-1+2@hal_9000")
assert identity.sub == "dave.bowman-1+2@hal_9000"
def test_models_identities_sub_spaces():
"""The 'sub' field should not accept spaces."""
with pytest.raises(ValidationError) as excinfo:
factories.IdentityFactory(sub="a b")
assert str(excinfo.value) == (
"{'sub': ['Enter a valid sub. This value may contain only letters, numbers, "
"and @/./+/-/_ characters.']}"
)
def test_models_identities_sub_upper_case():
"""The 'sub' field should accept upper case characters."""
identity = factories.IdentityFactory(sub="John")
assert identity.sub == "John"
def test_models_identities_sub_ascii():
"""The 'sub' field should accept non ASCII letters."""
identity = factories.IdentityFactory(sub="rené")
assert identity.sub == "rené"

View File

@@ -84,7 +84,7 @@ def test_models_invitations__is_expired(settings):
def test_models_invitation__new_user__convert_invitations_to_accesses():
"""
Upon creating a new identity, invitations linked to that email
Upon creating a new user, invitations linked to that email
should be converted to accesses and then deleted.
"""
# Two invitations to the same mail but to different teams
@@ -95,16 +95,14 @@ def test_models_invitation__new_user__convert_invitations_to_accesses():
team=invitation_to_team2.team
) # another person invited to team2
new_identity = factories.IdentityFactory(
is_main=True, email=invitation_to_team1.email
)
new_user = factories.UserFactory(email=invitation_to_team1.email)
# The invitation regarding
assert models.TeamAccess.objects.filter(
team=invitation_to_team1.team, user=new_identity.user
team=invitation_to_team1.team, user=new_user
).exists()
assert models.TeamAccess.objects.filter(
team=invitation_to_team2.team, user=new_identity.user
team=invitation_to_team2.team, user=new_user
).exists()
assert not models.Invitation.objects.filter(
team=invitation_to_team1.team, email=invitation_to_team1.email
@@ -119,7 +117,7 @@ def test_models_invitation__new_user__convert_invitations_to_accesses():
def test_models_invitation__new_user__filter_expired_invitations():
"""
Upon creating a new identity, valid invitations should be converted into accesses
Upon creating a new user, valid invitations should be converted into accesses
and expired invitations should remain unchanged.
"""
with freeze_time("2020-01-01"):
@@ -127,11 +125,11 @@ def test_models_invitation__new_user__filter_expired_invitations():
user_email = expired_invitation.email
valid_invitation = factories.InvitationFactory(email=user_email)
new_identity = factories.IdentityFactory(is_main=True, email=user_email)
new_user = factories.UserFactory(email=user_email)
# valid invitation should have granted access to the related team
assert models.TeamAccess.objects.filter(
team=valid_invitation.team, user=new_identity.user
team=valid_invitation.team, user=new_user
).exists()
assert not models.Invitation.objects.filter(
team=valid_invitation.team, email=user_email
@@ -139,14 +137,14 @@ def test_models_invitation__new_user__filter_expired_invitations():
# expired invitation should not have been consumed
assert not models.TeamAccess.objects.filter(
team=expired_invitation.team, user=new_identity.user
team=expired_invitation.team, user=new_user
).exists()
assert models.Invitation.objects.filter(
team=expired_invitation.team, email=user_email
).exists()
@pytest.mark.parametrize("num_invitations, num_queries", [(0, 8), (1, 11), (20, 11)])
@pytest.mark.parametrize("num_invitations, num_queries", [(0, 4), (1, 7), (20, 7)])
def test_models_invitation__new_user__user_creation_constant_num_queries(
django_assert_num_queries, num_invitations, num_queries
):
@@ -160,15 +158,14 @@ def test_models_invitation__new_user__user_creation_constant_num_queries(
for _ in range(0, num_invitations):
factories.InvitationFactory(email=user_email, team=factories.TeamFactory())
user = factories.UserFactory()
factories.UserFactory()
# with no invitation, we skip an "if", resulting in 8 requests
# otherwise, we should have 11 queries with any number of invitations
with django_assert_num_queries(num_queries):
models.Identity.objects.create(
is_main=True,
models.User.objects.create(
email=user_email,
user=user,
password="!",
name="Prudence C.",
sub=uuid.uuid4(),
)

View File

@@ -77,7 +77,7 @@ def test_models_team_accesses_create_webhook():
"value": [
{
"value": str(user.id),
"email": None,
"email": user.email,
"type": "User",
}
],
@@ -120,7 +120,7 @@ def test_models_team_accesses_delete_webhook():
"value": [
{
"value": str(access.user.id),
"email": None,
"email": access.user.email,
"type": "User",
}
],

View File

@@ -30,21 +30,99 @@ def test_models_users_id_unique():
factories.UserFactory(id=user.id)
def test_models_users_email_unique():
"""The "admin_email" field should be unique except for the null value."""
def test_models_users_sub_null():
"""The 'sub' field should not be null."""
with pytest.raises(ValidationError, match="This field cannot be null.") as excinfo:
models.User.objects.create(sub=None, password="password")
assert str(excinfo.value) == "{'sub': ['This field cannot be null.']}"
def test_models_users_sub_blank():
"""The 'sub' field should not be blank."""
with pytest.raises(ValidationError, match="This field cannot be blank.") as excinfo:
models.User.objects.create(sub="", password="password")
assert str(excinfo.value) == "{'sub': ['This field cannot be blank.']}"
def test_models_users_sub_unique():
"""The 'sub' field should be unique."""
user = factories.UserFactory()
with pytest.raises(
ValidationError, match="User with this Admin email address already exists."
):
models.User.objects.create(admin_email=user.admin_email, password="password")
with pytest.raises(ValidationError, match="User with this Sub already exists."):
models.User.objects.create(sub=user.sub, password="password")
def test_models_users_sub_max_length():
"""The 'sub' field should be 255 characters maximum."""
factories.UserFactory(sub="a" * 255)
with pytest.raises(ValidationError) as excinfo:
factories.UserFactory(sub="a" * 256)
assert (
str(excinfo.value)
== "{'sub': ['Ensure this value has at most 255 characters (it has 256).']}"
)
def test_models_users_sub_special_characters():
"""The 'sub' field should accept periods, dashes, +, @ and underscores."""
user = factories.UserFactory(sub="dave.bowman-1+2@hal_9000")
assert user.sub == "dave.bowman-1+2@hal_9000"
def test_models_users_sub_spaces():
"""The 'sub' field should not accept spaces."""
with pytest.raises(ValidationError) as excinfo:
factories.UserFactory(sub="a b")
assert str(excinfo.value) == (
"{'sub': ['Enter a valid sub. This value may contain only letters, numbers, "
"and @/./+/-/_ characters.']}"
)
def test_models_users_sub_upper_case():
"""The 'sub' field should accept upper case characters."""
user = factories.UserFactory(sub="John")
assert user.sub == "John"
def test_models_users_sub_ascii():
"""The 'sub' field should accept non ASCII letters."""
user = factories.UserFactory(sub="rené")
assert user.sub == "rené"
def test_models_users_email_not_required():
"""The 'email' field can be blank."""
user = factories.UserFactory(email=None)
assert user.email is None
def test_models_users_email_normalization():
"""The 'email' field should be automatically normalized upon saving."""
user = factories.UserFactory()
user.email = "Thomas.Jefferson@Example.com"
user.save()
assert user.email == "Thomas.Jefferson@example.com"
def test_models_users_email_several_null():
"""Several users with a null value for the "email" field can co-exist."""
factories.UserFactory(admin_email=None)
models.User.objects.create(admin_email=None, password="foo.")
factories.UserFactory(email=None)
models.User.objects.create(email=None, sub="123", password="foo.")
assert models.User.objects.filter(admin_email__isnull=True).count() == 2
assert models.User.objects.filter(email__isnull=True).count() == 2
def test_models_users_email_not_unique():
"""The 'email' field should not necessarily be unique among users."""
user = factories.UserFactory()
other_user = factories.UserFactory(email=user.email)
assert user.email == other_user.email
def test_models_users_profile_not_owned():
@@ -78,10 +156,9 @@ def test_models_users_profile_owned_by_other():
def test_models_users_send_mail_main_existing():
"""The 'email_user' method should send mail to the user's main email address."""
main_email = factories.IdentityFactory(email="dave@example.com")
user = main_email.user
factories.IdentityFactory.create_batch(2, user=user)
"""The 'email_user' method should send mail to the user's email address."""
user = factories.UserFactory(email="dave@example.com")
factories.UserFactory.create_batch(2)
with mock.patch("django.core.mail.send_mail") as mock_send:
user.email_user("my subject", "my message")
@@ -93,22 +170,19 @@ def test_models_users_send_mail_main_existing():
def test_models_users_send_mail_main_admin():
"""
The 'email_user' method should send mail to the user's admin email address if the
user has no related identities.
The 'email_user' method should send mail to the user's email address.
"""
user = factories.UserFactory()
with mock.patch("django.core.mail.send_mail") as mock_send:
user.email_user("my subject", "my message")
mock_send.assert_called_once_with(
"my subject", "my message", None, [user.admin_email]
)
mock_send.assert_called_once_with("my subject", "my message", None, [user.email])
def test_models_users_send_mail_main_missing():
"""The 'email_user' method should fail if the user has no email address."""
user = factories.UserFactory(admin_email=None)
user = factories.UserFactory(email=None)
with pytest.raises(ValueError) as excinfo:
user.email_user("my subject", "my message")

View File

@@ -16,8 +16,7 @@ def test_throttle():
"""
Throttle protection should block requests if too many.
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
client = APIClient()
client.force_login(user)

View File

@@ -28,8 +28,8 @@ def test_utils_webhooks_add_user_to_group_no_webhooks():
@mock.patch.object(Logger, "info")
def test_utils_webhooks_add_user_to_group_success(mock_info):
"""The user passed to the function should get added."""
identity = factories.IdentityFactory()
access = factories.TeamAccessFactory(user=identity.user)
user = factories.UserFactory()
access = factories.TeamAccessFactory(user=user)
webhooks = factories.TeamWebhookFactory.create_batch(2, team=access.team)
with responses.RequestsMock() as rsps:
@@ -64,7 +64,7 @@ def test_utils_webhooks_add_user_to_group_success(mock_info):
"value": [
{
"value": str(access.user.id),
"email": identity.email,
"email": user.email,
"type": "User",
}
],
@@ -90,8 +90,8 @@ def test_utils_webhooks_add_user_to_group_success(mock_info):
@mock.patch.object(Logger, "info")
def test_utils_webhooks_remove_user_from_group_success(mock_info):
"""The user passed to the function should get removed."""
identity = factories.IdentityFactory()
access = factories.TeamAccessFactory(user=identity.user)
user = factories.UserFactory()
access = factories.TeamAccessFactory(user=user)
webhooks = factories.TeamWebhookFactory.create_batch(2, team=access.team)
with responses.RequestsMock() as rsps:
@@ -121,7 +121,7 @@ def test_utils_webhooks_remove_user_from_group_success(mock_info):
"value": [
{
"value": str(access.user.id),
"email": identity.email,
"email": user.email,
"type": "User",
}
],
@@ -148,8 +148,8 @@ def test_utils_webhooks_remove_user_from_group_success(mock_info):
@mock.patch.object(Logger, "info")
def test_utils_webhooks_add_user_to_group_failure(mock_info, mock_error):
"""The logger should be called on webhook call failure."""
identity = factories.IdentityFactory()
access = factories.TeamAccessFactory(user=identity.user)
user = factories.UserFactory()
access = factories.TeamAccessFactory(user=user)
webhooks = factories.TeamWebhookFactory.create_batch(2, team=access.team)
with responses.RequestsMock() as rsps:
@@ -179,7 +179,7 @@ def test_utils_webhooks_add_user_to_group_failure(mock_info, mock_error):
"value": [
{
"value": str(access.user.id),
"email": identity.email,
"email": user.email,
"type": "User",
}
],
@@ -207,8 +207,8 @@ def test_utils_webhooks_add_user_to_group_failure(mock_info, mock_error):
@mock.patch.object(Logger, "info")
def test_utils_webhooks_add_user_to_group_retries(mock_info, mock_error):
"""webhooks synchronization supports retries."""
identity = factories.IdentityFactory()
access = factories.TeamAccessFactory(user=identity.user)
user = factories.UserFactory()
access = factories.TeamAccessFactory(user=user)
webhook = factories.TeamWebhookFactory(team=access.team)
url = re.compile(r".*/Groups/.*")
@@ -236,7 +236,7 @@ def test_utils_webhooks_add_user_to_group_retries(mock_info, mock_error):
"value": [
{
"value": str(access.user.id),
"email": identity.email,
"email": user.email,
"type": "User",
}
],
@@ -262,8 +262,8 @@ def test_utils_webhooks_add_user_to_group_retries(mock_info, mock_error):
@mock.patch.object(Logger, "info")
def test_utils_synchronize_course_runs_max_retries_exceeded(mock_info, mock_error):
"""Webhooks synchronization has exceeded max retries and should get logged."""
identity = factories.IdentityFactory()
access = factories.TeamAccessFactory(user=identity.user)
user = factories.UserFactory()
access = factories.TeamAccessFactory(user=user)
webhook = factories.TeamWebhookFactory(team=access.team)
with responses.RequestsMock() as rsps:
@@ -290,7 +290,7 @@ def test_utils_synchronize_course_runs_max_retries_exceeded(mock_info, mock_erro
"value": [
{
"value": str(access.user.id),
"email": identity.email,
"email": user.email,
"type": "User",
}
],
@@ -314,8 +314,8 @@ def test_utils_synchronize_course_runs_max_retries_exceeded(mock_info, mock_erro
def test_utils_webhooks_add_user_to_group_authorization():
"""Secret token should be passed in authorization header when set."""
identity = factories.IdentityFactory()
access = factories.TeamAccessFactory(user=identity.user)
user = factories.UserFactory()
access = factories.TeamAccessFactory(user=user)
webhook = factories.TeamWebhookFactory(team=access.team, secret="123")
with responses.RequestsMock() as rsps:

View File

@@ -115,7 +115,9 @@ def create_demo(stdout):
for i in range(defaults.NB_OBJECTS["users"]):
queue.push(
models.User(
admin_email=f"user{i:d}@example.com",
sub=uuid4(),
email=f"user{i:d}@example.com" if random.random() < 0.97 else None,
name=fake.name() if random.random() < 0.97 else None,
password="!",
is_superuser=False,
is_active=True,
@@ -125,27 +127,6 @@ def create_demo(stdout):
)
queue.flush()
with Timeit(stdout, "Creating identities"):
users_values = list(models.User.objects.values("id", "admin_email"))
for user_dict in users_values:
for i in range(
random.choices(range(5), weights=[5, 50, 30, 10, 5], k=1)[0]
):
user_email = user_dict["admin_email"]
queue.push(
models.Identity(
user_id=user_dict["id"],
sub=uuid4(),
is_main=(i == 0),
# Leave 3% of emails and names empty
email=f"identity{i:d}{user_email:s}"
if random.random() < 0.97
else None,
name=fake.name() if random.random() < 0.97 else None,
)
)
queue.flush()
with Timeit(stdout, "Creating teams"):
for i in range(defaults.NB_OBJECTS["teams"]):
queue.push(

View File

@@ -10,15 +10,17 @@ User = get_user_model()
class Command(BaseCommand):
"""Management command to create superusers from an email and a password"""
"""
Management command to create superusers from a username and a password for demo purposes.
"""
help = "Create a superuser with an email and a password"
help = "Create a superuser with a username and a password for demo purposes"
def add_arguments(self, parser):
"""Define required arguments "email" and "password"."""
"""Define required arguments "username" and "password"."""
parser.add_argument(
"--admin_email",
help=("Email for the user."),
"--username",
help=("Username for the user."),
)
parser.add_argument(
"--password",
@@ -30,11 +32,11 @@ class Command(BaseCommand):
Given an email and a password, create a superuser or upgrade the existing
user to superuser status.
"""
email = options.get("admin_email")
username = options.get("username")
try:
user = User.objects.get(admin_email=email)
user = User.objects.get(sub=username)
except User.DoesNotExist:
user = User(admin_email=email)
user = User(sub=username)
message = "Superuser created successfully."
else:
if user.is_superuser and user.is_staff:

View File

@@ -28,7 +28,6 @@ def test_commands_create_demo():
call_command("create_demo")
assert models.User.objects.count() == 5
assert models.Identity.objects.exists()
assert models.Team.objects.count() == 3
assert models.TeamAccess.objects.count() >= 3
@@ -39,6 +38,8 @@ def test_commands_createsuperuser():
with superuser permissions.
"""
call_command("createsuperuser")
call_command("createsuperuser", username="admin", password="admin")
assert models.User.objects.count() == 1
user = models.User.objects.get()
assert user.sub == "admin"

View File

@@ -31,10 +31,10 @@ def test_api_mail_domains__create_name_unique():
Creating domain should raise an error if already existing name.
"""
factories.MailDomainFactory(name="existing_domain.com")
identity = core_factories.IdentityFactory()
user = core_factories.UserFactory()
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
response = client.post(
"/api/v1.0/mail-domains/",
@@ -53,11 +53,10 @@ def test_api_mail_domains__create_authenticated():
and should automatically be added as owner of the newly created domain.
"""
identity = core_factories.IdentityFactory()
user = identity.user
user = core_factories.UserFactory()
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
response = client.post(
"/api/v1.0/mail-domains/",

View File

@@ -30,11 +30,11 @@ def test_api_mail_domains__delete_authenticated_unrelated():
Authenticated users should not be allowed to delete a domain to which they are not
related.
"""
identity = core_factories.IdentityFactory()
user = core_factories.UserFactory()
domain = factories.MailDomainFactory()
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
response = client.delete(
f"/api/v1.0/mail-domains/{domain.slug}/",
)
@@ -49,12 +49,12 @@ def test_api_mail_domains__delete_authenticated_member():
Authenticated users should not be allowed to delete a domain
to which they are only a member.
"""
identity = core_factories.IdentityFactory()
user = identity.user
user = core_factories.UserFactory()
domain = factories.MailDomainFactory(users=[(user, "member")])
client = APIClient()
client.force_login(user)
response = client.delete(
f"/api/v1.0/mail-domains/{domain.slug}/",
)
@@ -71,12 +71,12 @@ def test_api_mail_domains__delete_authenticated_administrator():
Authenticated users should not be allowed to delete a domain
for which they are administrator.
"""
identity = core_factories.IdentityFactory()
user = identity.user
user = core_factories.UserFactory()
domain = factories.MailDomainFactory(users=[(user, "administrator")])
client = APIClient()
client.force_login(user)
response = client.delete(
f"/api/v1.0/mail-domains/{domain.slug}/",
)
@@ -93,12 +93,12 @@ def test_api_mail_domains__delete_authenticated_owner():
Authenticated users should be able to delete a domain
for which they are directly owner.
"""
identity = core_factories.IdentityFactory()
user = identity.user
user = core_factories.UserFactory()
domain = factories.MailDomainFactory(users=[(user, "owner")])
client = APIClient()
client.force_login(user)
response = client.delete(
f"/api/v1.0/mail-domains/{domain.slug}/",
)

View File

@@ -32,8 +32,7 @@ def test_api_mail_domains__list_authenticated():
to which they have access.
"""
identity = core_factories.IdentityFactory()
user = identity.user
user = core_factories.UserFactory()
client = APIClient()
client.force_login(user)

View File

@@ -30,10 +30,10 @@ def test_api_mail_domains__retrieve_authenticated_unrelated():
Authenticated users should not be allowed to retrieve a domain
to which they have access.
"""
identity = core_factories.IdentityFactory()
user = core_factories.UserFactory()
client = APIClient()
client.force_login(identity.user)
client.force_login(user)
domain = factories.MailDomainFactory()
@@ -49,8 +49,7 @@ def test_api_mail_domains__retrieve_authenticated_related():
Authenticated users should be allowed to retrieve a domain
to which they have access.
"""
identity = core_factories.IdentityFactory()
user = identity.user
user = core_factories.UserFactory()
client = APIClient()
client.force_login(user)

View File

@@ -36,8 +36,7 @@ def test_api_mailboxes__create_authenticated_missing_fields():
Authenticated users should not be able to create mailboxes
without local part or secondary mail.
"""
user = core_factories.UserFactory(admin_email="tester@ministry.fr")
core_factories.IdentityFactory(user=user, email=user.admin_email, name="john doe")
user = core_factories.UserFactory(email="tester@ministry.fr", name="john doe")
client = APIClient()
client.force_login(user)
@@ -74,8 +73,7 @@ def test_api_mailboxes__create_authenticated_missing_fields():
def test_api_mailboxes__create_authenticated_successful():
"""Authenticated users should be able to create mailbox."""
user = core_factories.UserFactory(admin_email="tester@ministry.fr")
core_factories.IdentityFactory(user=user, email=user.admin_email, name="john doe")
user = core_factories.UserFactory(email="tester@ministry.fr", name="john doe")
client = APIClient()
client.force_login(user)

View File

@@ -27,8 +27,7 @@ def test_api_mailboxes__list_anonymous():
def test_api_mailboxes__list_authenticated_no_query():
"""Authenticated users should be able to list mailboxes without applying a query."""
user = core_factories.UserFactory(admin_email="tester@ministry.fr")
core_factories.IdentityFactory(user=user, email=user.admin_email, name="john doe")
user = core_factories.UserFactory(email="tester@ministry.fr", name="john doe")
client = APIClient()
client.force_login(user)