👔(backend) add Organization model

We introduce the Organization model has a "hat" for all
users and team.

Each User must have a "default" organization.
Each Team must have an organization.

When a User creates a new Team, the team is linked to their
default Organization.

For now the Organization should not be visible to end users
this is a purely technical aspect as it.

The models are also adding a permission to allow User to edit
an Organization, but for now there are no endpoints for that.

Next steps:
- Add an Organization to each User and Team on all environments
  to mark Organization as mandatory in database.
- Add scope to Organization to list the Service Provider list
  allowed for a User in an Organization.
- Add endpoints + frontend to manage Organization's scopes
This commit is contained in:
Quentin BEY
2024-10-17 15:30:00 +02:00
committed by BEY Quentin
parent b602478406
commit ca886c19b0
14 changed files with 844 additions and 32 deletions

View File

@@ -53,6 +53,7 @@ and this project adheres to
- ✨(api) add RELEASE version on config endpoint #459
- ✨(backend) manage roles on domain admin view
- ✨(frontend) show version number in footer #369
- 👔(backend) add Organization model
### Changed

View File

@@ -363,7 +363,8 @@ name-group=
# Regular expression which should only match function or class names that do
# not require a docstring.
no-docstring-rgx=^_
# Ignore: private stuff and `Params` class from FactoryBoy
no-docstring-rgx=(^_|^Params$)
# List of decorators that produce properties, such as abc.abstractproperty. Add
# to this list to register other decorators that produce valid properties.

View File

@@ -18,6 +18,15 @@ class TeamAccessInline(admin.TabularInline):
readonly_fields = ("created_at", "updated_at")
class OrganizationAccessInline(admin.TabularInline):
"""Inline admin class for organization accesses."""
autocomplete_fields = ["user", "organization"]
extra = 0
model = models.OrganizationAccess
readonly_fields = ("created_at", "updated_at")
class TeamWebhookInline(admin.TabularInline):
"""Inline admin class for team webhooks."""
@@ -31,6 +40,7 @@ class TeamWebhookInline(admin.TabularInline):
class UserAdmin(auth_admin.UserAdmin):
"""Admin class for the User model"""
autocomplete_fields = ["organization"]
fieldsets = (
(
None,
@@ -67,9 +77,10 @@ class UserAdmin(auth_admin.UserAdmin):
},
),
)
inlines = (TeamAccessInline, MailDomainAccessInline)
inlines = (TeamAccessInline, MailDomainAccessInline, OrganizationAccessInline)
list_display = (
"get_user",
"organization",
"created_at",
"updated_at",
"is_active",
@@ -176,3 +187,30 @@ class ContactAdmin(admin.ModelAdmin):
"owner",
"base",
)
@admin.register(models.Organization)
class OrganizationAdmin(admin.ModelAdmin):
"""Admin interface for organizations."""
list_display = (
"name",
"created_at",
"updated_at",
)
search_fields = ("name",)
inlines = (OrganizationAccessInline,)
@admin.register(models.OrganizationAccess)
class OrganizationAccessAdmin(admin.ModelAdmin):
"""Organization access admin interface declaration."""
autocomplete_fields = ("user", "organization")
list_display = (
"user",
"organization",
"role",
"created_at",
"updated_at",
)

View File

@@ -194,6 +194,14 @@ class TeamSerializer(serializers.ModelSerializer):
"updated_at",
]
def create(self, validated_data):
"""Create a new team with organization enforcement."""
# Note: this is not the purpose of this API to check the user has an organization
return super().create(
validated_data=validated_data
| {"organization_id": self.context["request"].user.organization_id}
)
def get_abilities(self, team) -> dict:
"""Return abilities of the logged-in user on the instance."""
request = self.context.get("request")

View File

@@ -1,5 +1,9 @@
"""Authentication Backends for the People core app."""
import logging
from email.headerregistry import Address
from typing import Optional
from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.exceptions import SuspiciousOperation
@@ -10,9 +14,21 @@ from mozilla_django_oidc.auth import (
OIDCAuthenticationBackend as MozillaOIDCAuthenticationBackend,
)
from core.models import Organization, OrganizationAccess, OrganizationRoleChoices
logger = logging.getLogger(__name__)
User = get_user_model()
def get_domain_from_email(email: Optional[str]) -> Optional[str]:
"""Extract domain from email."""
try:
return Address(addr_spec=email).domain
except (ValueError, AttributeError):
return None
class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
"""Custom OpenID Connect (OIDC) Authentication Backend.
@@ -67,19 +83,24 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
user_info = self.get_userinfo(access_token, id_token, payload)
sub = user_info.get("sub")
if not sub:
raise SuspiciousOperation(
_("User info contained no recognizable user identification")
)
# Get user's full name from OIDC fields defined in settings
full_name = self.compute_full_name(user_info)
email = user_info.get("email")
claims = {
"sub": sub,
"email": email,
"name": full_name,
}
sub = user_info.get("sub")
if not sub:
raise SuspiciousOperation(
_("User info contained no recognizable user identification")
if settings.OIDC_ORGANIZATION_REGISTRATION_ID_FIELD:
claims[settings.OIDC_ORGANIZATION_REGISTRATION_ID_FIELD] = user_info.get(
settings.OIDC_ORGANIZATION_REGISTRATION_ID_FIELD
)
# if sub is absent, try matching on email
@@ -90,7 +111,41 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
raise SuspiciousOperation(_("User account is disabled"))
self.update_user_if_needed(user, claims)
elif self.get_settings("OIDC_CREATE_USER", True):
user = User.objects.create(sub=sub, password="!", **claims) # noqa: S106
user = self.create_user(claims)
# Data cleaning, to be removed when user organization is null=False
# or all users have an organization.
# See https://github.com/numerique-gouv/people/issues/504
if not user.organization_id:
organization_registration_id = claims.get(
settings.OIDC_ORGANIZATION_REGISTRATION_ID_FIELD
)
domain = get_domain_from_email(email)
try:
organization, organization_created = (
Organization.objects.get_or_create_from_user_claims(
registration_id=organization_registration_id,
domain=domain,
)
)
if organization_created:
logger.info("Organization %s created", organization)
# For this case, we don't create an OrganizationAccess we will
# manage this manually later, because we don't want the first
# user who log in after the release to be the admin of their
# organization. We will keep organization without admin, and
# we will have to manually clean things up (while there is
# not that much organization in the database).
except ValueError as exc:
# Raised when there is no recognizable organization
# identifier (domain or registration_id)
logger.warning("Unable to update user organization: %s", exc)
else:
user.organization = organization
user.save()
logger.info(
"User %s updated with organization %s", user.pk, organization
)
return user
@@ -101,13 +156,47 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
raise SuspiciousOperation(
_("Claims contained no recognizable user identification")
)
email = claims.get("email")
name = claims.get("name")
return self.UserModel.objects.create(
# Extract or create the organization from the data
organization_registration_id = claims.get(
settings.OIDC_ORGANIZATION_REGISTRATION_ID_FIELD
)
domain = get_domain_from_email(email)
try:
organization, organization_created = (
Organization.objects.get_or_create_from_user_claims(
registration_id=organization_registration_id,
domain=domain,
)
)
except ValueError as exc:
raise SuspiciousOperation(
_("Claims contained no recognizable organization identification")
) from exc
if organization_created:
logger.info("Organization %s created", organization)
logger.info("Creating user %s / %s", sub, email)
user = self.UserModel.objects.create(
organization=organization,
password="!", # noqa: S106
sub=sub,
email=claims.get("email"),
name=claims.get("name"),
email=email,
name=name,
)
if organization_created:
# Warning: we may remove this behavior in the near future when we
# add a feature to claim the organization ownership.
OrganizationAccess.objects.create(
organization=organization,
user=user,
role=OrganizationRoleChoices.ADMIN,
)
return user
def compute_full_name(self, user_info):
"""Compute user's full name based on OIDC fields in settings."""
@@ -132,8 +221,12 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
def update_user_if_needed(self, user, claims):
"""Update user claims if they have changed."""
has_changed = any(
value and value != getattr(user, key) for key, value in claims.items()
value and value != getattr(user, key)
for key, value in claims.items()
if key != "sub"
)
if has_changed:
updated_claims = {key: value for key, value in claims.items() if value}
updated_claims = {
key: value for key, value in claims.items() if value and key != "sub"
}
self.UserModel.objects.filter(sub=user.sub).update(**updated_claims)

View File

@@ -119,6 +119,25 @@ class ContactFactory(BaseContactFactory):
owner = factory.SubFactory("core.factories.UserFactory", profile_contact=None)
class OrganizationFactory(factory.django.DjangoModelFactory):
"""Factory to create organizations for testing purposes."""
name = factory.Faker("company")
class Meta:
model = models.Organization
class Params: # pylint: disable=missing-class-docstring
with_registration_id = factory.Trait(
registration_id_list=factory.List(
[factory.Sequence(lambda n: f"{n:014d}")]
),
)
with_domain = factory.Trait(
domain_list=factory.List([factory.Faker("domain_name")]),
)
class UserFactory(factory.django.DjangoModelFactory):
"""A factory to create random users for testing purposes."""
@@ -126,6 +145,13 @@ class UserFactory(factory.django.DjangoModelFactory):
model = models.User
django_get_or_create = ("sub",)
class Params:
with_organization = factory.Trait(
organization=factory.SubFactory(
OrganizationFactory, with_registration_id=True
),
)
sub = factory.Sequence(lambda n: f"user{n!s}")
email = factory.Faker("email")
name = factory.Faker("name")

View File

@@ -0,0 +1,62 @@
# Generated by Django 5.1.1 on 2024-10-22 10:07
import core.models
import django.contrib.postgres.fields
import django.db.models.deletion
import uuid
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('core', '0001_initial'),
]
operations = [
migrations.CreateModel(
name='Organization',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')),
('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created at')),
('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated at')),
('name', models.CharField(max_length=100, verbose_name='name')),
('registration_id_list', django.contrib.postgres.fields.ArrayField(base_field=models.CharField(max_length=128), blank=True, default=list, size=None, validators=[core.models.validate_unique_registration_id], verbose_name='registration ID list')),
('domain_list', django.contrib.postgres.fields.ArrayField(base_field=models.CharField(max_length=256), blank=True, default=list, size=None, validators=[core.models.validate_unique_domain], verbose_name='domain list')),
],
options={
'verbose_name': 'organization',
'verbose_name_plural': 'organizations',
'db_table': 'people_organization',
'constraints': [models.CheckConstraint(condition=models.Q(('registration_id_list__len__gt', 0), ('domain_list__len__gt', 0), _connector='OR'), name='registration_id_or_domain', violation_error_message='An organization must have at least a registration ID or a domain.')],
},
),
migrations.AddField(
model_name='team',
name='organization',
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.PROTECT, related_name='teams', to='core.organization'),
),
migrations.AddField(
model_name='user',
name='organization',
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.PROTECT, related_name='users', to='core.organization'),
),
migrations.CreateModel(
name='OrganizationAccess',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')),
('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created at')),
('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated at')),
('role', models.CharField(choices=[('administrator', 'Administrator')], default='administrator', max_length=20)),
('organization', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='organization_accesses', to='core.organization')),
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='organization_accesses', to=settings.AUTH_USER_MODEL)),
],
options={
'verbose_name': 'Organization/user relation',
'verbose_name_plural': 'Organization/user relations',
'db_table': 'people_organization_access',
'constraints': [models.UniqueConstraint(fields=('user', 'organization'), name='unique_organization_user', violation_error_message='This user is already in this organization.')],
},
),
]

View File

@@ -6,14 +6,18 @@ import json
import os
import smtplib
import uuid
from contextlib import suppress
from datetime import timedelta
from logging import getLogger
from typing import Tuple
from django.conf import settings
from django.contrib.auth import models as auth_models
from django.contrib.auth.base_user import AbstractBaseUser
from django.contrib.postgres.fields import ArrayField
from django.contrib.sites.models import Site
from django.core import exceptions, mail, validators
from django.core.exceptions import ValidationError
from django.db import models, transaction
from django.template.loader import render_to_string
from django.utils import timezone
@@ -27,6 +31,7 @@ from timezone_field import TimeZoneField
from core.enums import WebhookStatusChoices
from core.utils.webhooks import scim_synchronizer
from core.validators import get_field_validators_from_setting
logger = getLogger(__name__)
@@ -44,6 +49,17 @@ class RoleChoices(models.TextChoices):
OWNER = "owner", _("Owner")
class OrganizationRoleChoices(models.TextChoices):
"""
Defines the possible roles a user can have in an organization.
For now, we only have one role, but we might add more in the future.
administrator: The user can manage the organization: change name, add/remove users.
"""
ADMIN = "administrator", _("Administrator")
class BaseModel(models.Model):
"""
Serves as an abstract base model for other models, ensuring that records are validated
@@ -158,6 +174,140 @@ class Contact(BaseModel):
raise exceptions.ValidationError({"data": [error_message]}) from e
class OrganizationManager(models.Manager):
"""
Custom manager for the Organization model, to manage complexity/automation.
"""
def get_or_create_from_user_claims(
self, registration_id: str = None, domain: str = None, **kwargs
) -> Tuple["Organization", bool]:
"""
Get or create an organization using the most fitting information from the user's claims.
We expect to have only one organization per registration_id, but
the registration_id might not be provided.
When the registration_id is not provided, we use the domain to identify the organization.
If both are provided, we use the registration_id first to create missing organization.
Dev note: When a registration_id is provided by the Identity Provider, we don't want
to use the domain to create the organization, because it is less reliable: for example,
a professional user, may have a personal email address, and the domain would be gmail.com
which is not a good identifier for an organization. The domain email is just a fallback
when the registration_id is not provided by the Identity Provider. We can use the domain
to create the organization manually when we are sure about the "safety" of it.
"""
if not any([registration_id, domain]):
raise ValueError("You must provide either a registration_id or a domain.")
filters = models.Q()
if registration_id:
filters |= models.Q(registration_id_list__icontains=registration_id)
if domain:
filters |= models.Q(domain_list__icontains=domain)
with suppress(self.model.DoesNotExist):
# If there are several organizations, we must raise an error and fix the data
# If there is an organization, we return it
return self.get(filters, **kwargs), False
# Manage the case where the organization does not exist: we create one
if registration_id:
return self.create(
name=registration_id, registration_id_list=[registration_id], **kwargs
), True
if domain:
return self.create(name=domain, domain_list=[domain], **kwargs), True
raise ValueError("Should never reach this point.")
def validate_unique_registration_id(value):
"""
Validate that the registration ID values in an array field are unique across all instances.
"""
if Organization.objects.filter(registration_id_list__overlap=value).exists():
raise ValidationError(
"registration_id_list value must be unique across all instances."
)
def validate_unique_domain(value):
"""
Validate that the domain values in an array field are unique across all instances.
"""
if Organization.objects.filter(domain_list__overlap=value).exists():
raise ValidationError("domain_list value must be unique across all instances.")
class Organization(BaseModel):
"""
Organization model used to regroup Teams.
Each User have an Organization, which corresponds actually to a default organization
because a user can belong to a Team from another organization.
Each Team have an Organization, which is the Organization from the User who created
the Team.
Organization is managed automatically, the User should never choose their Organization.
When creating a User, you must use the `get_or_create` method from the
OrganizationManager to find the proper Organization.
An Organization can have several registration IDs and domains but during automatic
creation process, only one will be used. We may want to allow (manual) organization merge
later, to regroup several registration IDs or domain in the same Organization.
"""
name = models.CharField(_("name"), max_length=100)
registration_id_list = ArrayField(
models.CharField(
max_length=128,
validators=get_field_validators_from_setting(
"ORGANIZATION_REGISTRATION_ID_VALIDATORS"
),
),
verbose_name=_("registration ID list"),
default=list,
blank=True,
validators=[
validate_unique_registration_id,
],
)
domain_list = ArrayField(
models.CharField(max_length=256),
verbose_name=_("domain list"),
default=list,
blank=True,
validators=[validate_unique_domain],
)
objects = OrganizationManager()
class Meta:
db_table = "people_organization"
verbose_name = _("organization")
verbose_name_plural = _("organizations")
constraints = [
models.CheckConstraint(
name="registration_id_or_domain",
condition=models.Q(registration_id_list__len__gt=0)
| models.Q(domain_list__len__gt=0),
violation_error_message=_(
"An organization must have at least a registration ID or a domain."
),
),
# Check a registration ID str can only be present in one
# organization registration ID list
# Check a domain str can only be present in one organization domain list
# Those checks cannot be done with Django constraints
]
def __str__(self):
return f"{self.name} (# {self.pk})"
class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
"""User model to work with OIDC only authentication."""
@@ -218,6 +368,13 @@ class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
"Unselect this instead of deleting accounts."
),
)
organization = models.ForeignKey(
Organization,
on_delete=models.PROTECT,
related_name="users",
null=True, # Need to be set to False when everything is migrated
blank=True, # Need to be set to False when everything is migrated
)
objects = auth_models.UserManager()
@@ -285,6 +442,44 @@ class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
mail.send_mail(subject, message, from_email, [self.email], **kwargs)
class OrganizationAccess(BaseModel):
"""
Link table between organization and users,
only for user with specific rights on Organization.
"""
organization = models.ForeignKey(
Organization,
on_delete=models.CASCADE,
related_name="organization_accesses",
)
user = models.ForeignKey(
User,
on_delete=models.CASCADE,
related_name="organization_accesses",
)
role = models.CharField(
max_length=20,
choices=OrganizationRoleChoices.choices,
default=OrganizationRoleChoices.ADMIN,
)
class Meta:
db_table = "people_organization_access"
verbose_name = _("Organization/user relation")
verbose_name_plural = _("Organization/user relations")
constraints = [
models.UniqueConstraint(
fields=["user", "organization"],
name="unique_organization_user",
violation_error_message=_("This user is already in this organization."),
),
]
def __str__(self):
return f"{self.user!s} is {self.role:s} in organization {self.organization!s}"
class Team(BaseModel):
"""
Represents the link between teams and users, specifying the role a user has in a team.
@@ -299,6 +494,13 @@ class Team(BaseModel):
through_fields=("team", "user"),
related_name="teams",
)
organization = models.ForeignKey(
Organization,
on_delete=models.PROTECT,
related_name="teams",
null=True, # Need to be set to False when everything is migrated
blank=True, # Need to be set to False when everything is migrated
)
class Meta:
db_table = "people_team"

View File

@@ -42,7 +42,8 @@ def test_authentication_getter_existing_user_with_email(
When the user's info contains an email and targets an existing user,
"""
klass = OIDCAuthenticationBackend()
user = factories.UserFactory(name="John Doe")
user = factories.UserFactory(name="John Doe", with_organization=True)
def get_userinfo_mocked(*args):
return {
@@ -79,7 +80,9 @@ def test_authentication_getter_existing_user_change_fields(
It should update the email or name fields on the user when they change.
"""
klass = OIDCAuthenticationBackend()
user = factories.UserFactory(name="John Doe", email="john.doe@example.com")
user = factories.UserFactory(
name="John Doe", email="john.doe@example.com", with_organization=True
)
def get_userinfo_mocked(*args):
return {
@@ -112,7 +115,7 @@ def test_authentication_getter_existing_user_via_email(
"""
klass = OIDCAuthenticationBackend()
db_user = factories.UserFactory()
db_user = factories.UserFactory(with_organization=True)
def get_userinfo_mocked(*args):
return {"sub": "123", "email": db_user.email}
@@ -158,25 +161,23 @@ def test_authentication_getter_existing_user_no_fallback_to_email(
def test_authentication_getter_new_user_no_email(monkeypatch):
"""
If no user matches the user's info sub, a user should be created.
User's info doesn't contain an email/name, created user's email/name should be empty.
If no user matches the user's info sub, a user should not be created without email
nor organization registration ID.
"""
klass = OIDCAuthenticationBackend()
def get_userinfo_mocked(*args):
return {"sub": "123"}
return {"sub": "123"} # No email, no organization registration ID
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
user = klass.get_or_create_user(
access_token="test-token", id_token=None, payload=None
)
assert user.sub == "123"
assert user.email is None
assert user.name is None
assert user.password == "!"
assert models.User.objects.count() == 1
with (
pytest.raises(
SuspiciousOperation,
match="Claims contained no recognizable organization identification",
),
):
klass.get_or_create_user(access_token="test-token", id_token=None, payload=None)
def test_authentication_getter_new_user_with_email(monkeypatch):
@@ -284,3 +285,99 @@ def test_authentication_getter_existing_disabled_user_via_email(
klass.get_or_create_user(access_token="test-token", id_token=None, payload=None)
assert models.User.objects.count() == 1
def test_authentication_getter_new_user_with_email_new_organization(monkeypatch):
"""
If no user matches the user's info sub, a user should be created.
If the corresponding organization doesn't exist, it should be created.
"""
klass = OIDCAuthenticationBackend()
email = "people@example.com"
def get_userinfo_mocked(*args):
return {"sub": "123", "email": email, "first_name": "John", "last_name": "Doe"}
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
user = klass.get_or_create_user(
access_token="test-token", id_token=None, payload=None
)
assert user.organization is not None
assert user.organization.domain_list == ["example.com"]
assert user.organization.registration_id_list == []
@pytest.mark.parametrize(
"registration_id_setting,expected_registration_id_list,expected_domain_list",
[
(None, [], ["example.com"]),
("missing-claim", [], ["example.com"]),
("registration_number", ["12345678901234"], []),
],
)
def test_authentication_getter_new_user_with_registration_id_new_organization(
monkeypatch,
settings,
registration_id_setting,
expected_registration_id_list,
expected_domain_list,
):
"""
If no user matches the user's info sub, a user should be created.
If the corresponding organization doesn't exist, it should be created.
"""
settings.OIDC_ORGANIZATION_REGISTRATION_ID_FIELD = registration_id_setting
klass = OIDCAuthenticationBackend()
email = "people@example.com"
def get_userinfo_mocked(*args):
return {
"sub": "123",
"email": email,
"first_name": "John",
"last_name": "Doe",
"registration_number": "12345678901234",
}
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
user = klass.get_or_create_user(
access_token="test-token", id_token=None, payload=None
)
assert user.organization is not None
assert user.organization.domain_list == expected_domain_list
assert user.organization.registration_id_list == expected_registration_id_list
def test_authentication_getter_existing_user_via_email_update_organization(
django_assert_num_queries, monkeypatch
):
"""
If an existing user already exists without organization, the organization must be updated.
"""
klass = OIDCAuthenticationBackend()
db_user = factories.UserFactory(name="John Doe", email="toto@my-domain.com")
def get_userinfo_mocked(*args):
return {
"sub": db_user.sub,
"email": db_user.email,
"first_name": "John",
"last_name": "Doe",
}
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
with django_assert_num_queries(9):
user = klass.get_or_create_user(
access_token="test-token", id_token=None, payload=None
)
assert user == db_user
assert user.organization is not None
assert user.organization.domain_list == ["my-domain.com"]

View File

@@ -10,7 +10,7 @@ from rest_framework.status import (
)
from rest_framework.test import APIClient
from core.factories import TeamFactory, UserFactory
from core.factories import OrganizationFactory, TeamFactory, UserFactory
from core.models import Team
pytestmark = pytest.mark.django_db
@@ -34,7 +34,8 @@ def test_api_teams_create_authenticated():
Authenticated users should be able to create teams and should automatically be declared
as the owner of the newly created team.
"""
user = UserFactory()
organization = OrganizationFactory(with_registration_id=True)
user = UserFactory(organization=organization)
client = APIClient()
client.force_login(user)
@@ -50,6 +51,7 @@ def test_api_teams_create_authenticated():
assert response.status_code == HTTP_201_CREATED
team = Team.objects.get()
assert team.name == "my team"
assert team.organization == organization
assert team.accesses.filter(role="owner", user=user).exists()
@@ -57,7 +59,8 @@ def test_api_teams_create_authenticated_slugify_name():
"""
Creating teams should automatically generate a slug.
"""
user = UserFactory()
organization = OrganizationFactory(with_registration_id=True)
user = UserFactory(organization=organization)
client = APIClient()
client.force_login(user)
@@ -70,6 +73,7 @@ def test_api_teams_create_authenticated_slugify_name():
team = Team.objects.get()
assert team.name == "my team"
assert team.slug == "my-team"
assert team.organization == organization
@pytest.mark.parametrize(
@@ -86,7 +90,8 @@ def test_api_teams_create_authenticated_expected_slug(param):
"""
Creating teams should automatically create unaccented, no unicode, lower-case slug.
"""
user = UserFactory()
organization = OrganizationFactory(with_registration_id=True)
user = UserFactory(organization=organization)
client = APIClient()
client.force_login(user)
@@ -102,6 +107,7 @@ def test_api_teams_create_authenticated_expected_slug(param):
team = Team.objects.get()
assert team.name == param[0]
assert team.slug == param[1]
assert team.organization == organization
def test_api_teams_create_authenticated_unique_slugs():
@@ -123,3 +129,32 @@ def test_api_teams_create_authenticated_unique_slugs():
assert response.status_code == HTTP_400_BAD_REQUEST
assert response.json()["slug"] == ["Team with this Slug already exists."]
def test_api_teams_create_cannot_override_organization():
"""
Authenticated users should be able to create teams and not
be able to set the organization manually (for now).
"""
organization = OrganizationFactory(with_registration_id=True)
user = UserFactory(organization=organization)
client = APIClient()
client.force_login(user)
response = client.post(
"/api/v1.0/teams/",
{
"name": "my team",
"organization": OrganizationFactory(
with_registration_id=True
).pk, # ignored
},
format="json",
)
assert response.status_code == HTTP_201_CREATED
team = Team.objects.get()
assert team.name == "my team"
assert team.organization == organization
assert team.accesses.filter(role="owner", user=user).exists()

View File

@@ -0,0 +1,127 @@
"""
Unit tests for the Organization model
"""
from django.core.exceptions import ValidationError
import pytest
from core import factories, models
pytestmark = pytest.mark.django_db
def test_models_organization_str():
"""The str representation should be the organization's name."""
organization = factories.OrganizationFactory(
name="HAL 9000", registration_id_list=["12345678901234"]
)
assert str(organization) == f"HAL 9000 (# {organization.pk})"
def test_models_organization_constraints():
"""It should not be possible to create an organization."""
organization = factories.OrganizationFactory(
registration_id_list=["12345678901234"], domain_list=["hal9000.com"]
)
with pytest.raises(ValidationError):
models.Organization.objects.create(name="HAL 9000")
with pytest.raises(ValidationError):
models.Organization.objects.create(
name="HAL 9000",
registration_id_list=[
organization.registration_id_list[0],
"12345678901235",
],
)
with pytest.raises(ValidationError):
models.Organization.objects.create(
name="HAL 9000", domain_list=[organization.domain_list[0], "hal9001.com"]
)
def test_models_organization_get_or_create_from_user_claims_no_kwargs():
"""It should fail."""
with pytest.raises(ValueError):
models.Organization.objects.get_or_create_from_user_claims()
def test_models_organization_get_or_create_from_user_claims_with_registration_id():
"""It should create an organization with a registration ID number."""
organization, created = models.Organization.objects.get_or_create_from_user_claims(
registration_id="12345678901234"
)
assert created is True
assert organization.registration_id_list == ["12345678901234"]
assert organization.domain_list == []
same_organization, created = (
models.Organization.objects.get_or_create_from_user_claims(
registration_id="12345678901234"
)
)
assert created is False
assert organization == same_organization
assert same_organization.registration_id_list == ["12345678901234"]
assert same_organization.domain_list == []
def test_models_organization_get_or_create_from_user_claims_with_domain():
"""It should create an organization with a domain."""
organization, created = models.Organization.objects.get_or_create_from_user_claims(
domain="hal9000.com"
)
assert created is True
assert organization.registration_id_list == []
assert organization.domain_list == ["hal9000.com"]
same_organization, created = (
models.Organization.objects.get_or_create_from_user_claims(domain="hal9000.com")
)
assert created is False
assert organization == same_organization
assert same_organization.registration_id_list == []
assert same_organization.domain_list == ["hal9000.com"]
def test_models_organization_get_or_create_from_user_claims_with_registration_id_and_domain():
"""It should create an organization with a registration ID number."""
organization, created = models.Organization.objects.get_or_create_from_user_claims(
registration_id="12345678901234", domain="hal9000.com"
)
assert created is True
assert organization.registration_id_list == ["12345678901234"]
assert organization.domain_list == []
same_organization, created = (
models.Organization.objects.get_or_create_from_user_claims(
registration_id="12345678901234", domain="hal9000.com"
)
)
assert created is False
assert organization == same_organization
assert same_organization.registration_id_list == ["12345678901234"]
assert same_organization.domain_list == []
def test_models_organization_registration_id_validators():
"""
Test the registration ID validators.
This cannot be tested dynamically because the validators are set at model loading
and this is not possible to reload the models on the fly. We therefore enforce the
setting in Test environment.
"""
models.Organization.objects.create(
name="hu",
registration_id_list=["12345678901234"],
)
with pytest.raises(ValidationError):
models.Organization.objects.create(
name="hi",
registration_id_list=["a12345678912345"],
)

View File

@@ -0,0 +1,58 @@
"""
Test cases for core.validators module.
"""
from django.core.exceptions import ImproperlyConfigured
from django.core.validators import EmailValidator, RegexValidator
import pytest
from core.validators import get_field_validators_from_setting
def test_get_field_validators_from_setting_without_option(settings):
"""Test get_field_validators_from_setting without options."""
settings.VALIDATOR_NO_OPTION = [
{
"NAME": "django.core.validators.EmailValidator",
},
]
validators = get_field_validators_from_setting("VALIDATOR_NO_OPTION")
assert len(validators) == 1
assert isinstance(validators[0], EmailValidator)
def test_get_field_validators_from_setting_with_option(settings):
"""Test get_field_validators_from_setting with options."""
settings.REGEX_WITH_OPTIONS = [
{
"NAME": "django.core.validators.RegexValidator",
"OPTIONS": {
"regex": "[a-z][0-9]{14}",
},
},
]
validators = get_field_validators_from_setting("REGEX_WITH_OPTIONS")
assert len(validators) == 1
assert isinstance(validators[0], RegexValidator)
assert validators[0].regex.pattern == "[a-z][0-9]{14}"
def test_get_field_validators_from_setting_invalid_class_name(settings):
"""Test get_field_validators_from_setting with an invalid class name."""
settings.INVALID_VALIDATORS = [
{
"NAME": "non.existent.Validator",
},
]
with pytest.raises(ImproperlyConfigured):
get_field_validators_from_setting("INVALID_VALIDATORS")
def test_get_field_validators_from_setting_empty_setting(settings):
"""Test get_field_validators_from_setting with an empty setting."""
settings.EMPTY_VALIDATORS = []
validators = get_field_validators_from_setting("EMPTY_VALIDATORS")
assert not validators

View File

@@ -0,0 +1,41 @@
"""
Declare validators that can be used in our Django models.
"""
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.utils.module_loading import import_string
def get_field_validators_from_setting(setting_name: str) -> list:
"""
Get field validators from a setting.
Highly inspired by Django's `get_password_validators` function.
The setting should be a list of dictionaries, where each dictionary
should have a NAME key that points to the validator class and an
optional OPTIONS key that points to the validator options.
Example:
```
ORGANIZATION_REGISTRATION_ID_VALIDATORS = [
{
"NAME": "django.core.validators.RegexValidator",
"OPTIONS": {
"regex": "[a-z][0-9]{14}",
},
},
]
```
"""
validators = []
for validator in getattr(settings, setting_name):
try:
klass = import_string(validator["NAME"])
except ImportError as exc:
msg = "The module in NAME could not be imported: %s. Check your %s setting."
raise ImproperlyConfigured(msg % (validator["NAME"], setting_name)) from exc
validators.append(klass(**validator.get("OPTIONS", {})))
return validators

View File

@@ -388,6 +388,11 @@ class Base(Configuration):
environ_name="USER_OIDC_FIELDS_TO_NAME",
environ_prefix=None,
)
OIDC_ORGANIZATION_REGISTRATION_ID_FIELD = values.Value(
default=None,
environ_name="OIDC_ORGANIZATION_REGISTRATION_ID_FIELD",
environ_prefix=None,
)
OIDC_OP_TOKEN_INTROSPECTION_ENDPOINT = values.Value(
None, environ_name="OIDC_OP_TOKEN_INTROSPECTION_ENDPOINT", environ_prefix=None
@@ -437,6 +442,15 @@ class Base(Configuration):
environ_prefix=None,
)
# Organizations
ORGANIZATION_REGISTRATION_ID_VALIDATORS = json.loads(
values.Value(
default="[]",
environ_name="ORGANIZATION_REGISTRATION_ID_VALIDATORS",
environ_prefix=None,
)
)
FEATURES = {
"TEAMS": values.BooleanValue(
default=True, environ_name="FEATURE_TEAMS", environ_prefix=None
@@ -599,6 +613,15 @@ class Test(Base):
# this is a dev credentials for mail provisioning API
MAIL_PROVISIONING_API_CREDENTIALS = "bGFfcmVnaWU6cGFzc3dvcmQ="
ORGANIZATION_REGISTRATION_ID_VALIDATORS = [
{
"NAME": "django.core.validators.RegexValidator",
"OPTIONS": {
"regex": "^[0-9]{14}$",
},
},
]
class ContinuousIntegration(Test):
"""