(api) allow invitations for domain management

add an endpoint to allow domain managers to invite someone on people,
using their email address
This commit is contained in:
Marie PUPO JEAMMET
2025-02-07 19:01:57 +01:00
committed by Sabrina Demagny
parent 9ee1ef5ba0
commit 2224acf12d
12 changed files with 592 additions and 5 deletions

View File

@@ -8,6 +8,10 @@ and this project adheres to
## [Unreleased]
### Added
- ✨(api) allow invitations for domain management #708
## [1.13.1] - 2025-03-04
### Fixed

View File

@@ -1,6 +1,7 @@
"""
Declare and configure the models for the People core application
"""
# pylint: disable=too-many-lines import-outside-toplevel
import json
import os
@@ -36,6 +37,8 @@ from core.plugins.loader import (
from core.utils.webhooks import scim_synchronizer
from core.validators import get_field_validators_from_setting
from mailbox_manager import enums
logger = getLogger(__name__)
current_dir = os.path.dirname(os.path.abspath(__file__))
@@ -512,11 +515,12 @@ class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
def save(self, *args, **kwargs):
"""
If it's a new user, give her access to the relevant teams.
If it's a new user, give them access to the relevant teams.
"""
if self._state.adding:
self._convert_valid_invitations()
self._convert_valid_team_invitations()
self._convert_valid_domain_invitations()
super().save(*args, **kwargs)
@@ -526,7 +530,7 @@ class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
if self.email:
self.email = User.objects.normalize_email(self.email)
def _convert_valid_invitations(self):
def _convert_valid_team_invitations(self):
"""
Convert valid invitations to team accesses.
Expired invitations are ignored.
@@ -551,6 +555,53 @@ class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
)
valid_invitations.delete()
def _convert_valid_domain_invitations(self):
"""
Convert valid domain invitations to domain accesses.
Expired invitations are ignored.
"""
from mailbox_manager.models import DomainInvitation, MailDomainAccess
from mailbox_manager.utils.dimail import DimailAPIClient
valid_domain_invitations = DomainInvitation.objects.filter(
email=self.email,
created_at__gte=(
timezone.now()
- timedelta(seconds=settings.INVITATION_VALIDITY_DURATION)
),
)
if not valid_domain_invitations.exists():
return
MailDomainAccess.objects.bulk_create(
[
MailDomainAccess(
user=self, domain=invitation.domain, role=invitation.role
)
for invitation in valid_domain_invitations
]
)
management_role = set(valid_domain_invitations.values_list("role", flat="True"))
if (
enums.MailDomainRoleChoices.OWNER in management_role
or enums.MailDomainRoleChoices.ADMIN in management_role
):
# Sync with dimail
dimail = DimailAPIClient()
dimail.create_user(self.sub)
for invitation in valid_domain_invitations:
if invitation.role in [
enums.MailDomainRoleChoices.OWNER,
enums.MailDomainRoleChoices.ADMIN,
]:
dimail.create_allow(self.sub, invitation.domain.name)
valid_domain_invitations.delete()
def email_user(self, subject, message, from_email=None, **kwargs):
"""Email this user."""
if not self.email:

View File

@@ -144,7 +144,7 @@ def test_models_invitation__new_user__filter_expired_invitations():
).exists()
@pytest.mark.parametrize("num_invitations, num_queries", [(0, 4), (1, 7), (20, 7)])
@pytest.mark.parametrize("num_invitations, num_queries", [(0, 5), (1, 8), (20, 8)])
def test_models_invitation__new_user__user_creation_constant_num_queries(
django_assert_num_queries, num_invitations, num_queries
):

View File

@@ -267,3 +267,35 @@ class MailDomainAccessReadOnlySerializer(MailDomainAccessSerializer):
"role",
"can_set_role_to",
]
class DomainInvitationSerializer(serializers.ModelSerializer):
"""Serialize invitations."""
class Meta:
model = models.DomainInvitation
fields = ["id", "created_at", "email", "domain", "role", "issuer", "is_expired"]
read_only_fields = ["id", "created_at", "domain", "issuer", "is_expired"]
def validate(self, attrs):
"""Validate and restrict invitation to new user based on email."""
request = self.context.get("request")
user = getattr(request, "user", None)
try:
domain_slug = self.context["domain_slug"]
except KeyError as exc:
raise exceptions.ValidationError(
"You must set a domain slug in kwargs to create a new domain management invitation."
) from exc
domain = models.MailDomain.objects.get(slug=domain_slug)
if not domain.get_abilities(user)["manage_accesses"]:
raise exceptions.PermissionDenied(
"You are not allowed to manage invitations for this domain."
)
attrs["domain"] = domain
attrs["issuer"] = user
return attrs

View File

@@ -49,7 +49,7 @@ class MailDomainViewSet(
queryset = models.MailDomain.objects.all()
def get_queryset(self):
"""Restrict results to the current user's team."""
"""Restrict results to the current user's domain."""
return self.queryset.filter(accesses__user=self.request.user)
def perform_create(self, serializer):
@@ -292,3 +292,66 @@ class MailBoxViewSet(
mailbox.status = enums.MailboxStatusChoices.ENABLED
mailbox.save()
return Response(serializers.MailboxSerializer(mailbox).data)
class DomainInvitationViewset(
mixins.CreateModelMixin,
mixins.ListModelMixin,
mixins.RetrieveModelMixin,
viewsets.GenericViewSet,
):
"""API ViewSet for user invitations to domain management.
GET /api/<version>/mail-domains/<domain_slug>/invitations/:<invitation_id>/
Return list of invitations related to that domain or one
domain access if an id is provided.
POST /api/<version>/mail-domains/<domain_slug>/invitations/ with expected data:
- email: str
- role: str [owner|admin|member]
- issuer : User, automatically added from user making query, if allowed
- domain : Domain, automatically added from requested URI
Return a newly created invitation
PUT / PATCH : Not permitted. Instead of updating your invitation,
delete and create a new one.
"""
lookup_field = "id"
permission_classes = [permissions.AccessPermission]
queryset = (
models.DomainInvitation.objects.all()
.select_related("domain")
.order_by("-created_at")
)
serializer_class = serializers.DomainInvitationSerializer
def get_serializer_context(self):
"""Extra context provided to the serializer class."""
context = super().get_serializer_context()
context["domain_slug"] = self.kwargs["domain_slug"]
return context
def get_queryset(self):
"""Return the queryset according to the action."""
queryset = super().get_queryset()
queryset = queryset.filter(domain__slug=self.kwargs["domain_slug"])
if self.action == "list":
# Determine which role the logged-in user has in the domain
user_role_query = models.MailDomainAccess.objects.filter(
user=self.request.user, domain__slug=self.kwargs["domain_slug"]
).values("role")
queryset = (
# The logged-in user should be part of a domain to see its accesses
queryset.filter(
domain__accesses__user=self.request.user,
)
# Abilities are computed based on logged-in user's role and
# the user role on each domain access
.annotate(user_role=Subquery(user_role_query))
.distinct()
)
return queryset

View File

@@ -84,3 +84,17 @@ class MailboxEnabledFactory(MailboxFactory):
"""A factory to create mailbox enabled."""
status = enums.MailboxStatusChoices.ENABLED
class DomainInvitationFactory(factory.django.DjangoModelFactory):
"""A factory to create invitations for a user"""
class Meta:
model = models.DomainInvitation
domain = factory.SubFactory(MailDomainEnabledFactory)
email = factory.Faker("email")
role = factory.fuzzy.FuzzyChoice(
[role[0] for role in enums.MailDomainRoleChoices.choices]
)
issuer = factory.SubFactory(core_factories.UserFactory)

View File

@@ -0,0 +1,134 @@
"""
Tests for DomainInvitations API endpoint in People's app mailbox_manager. Focus on "create" action.
"""
import pytest
from rest_framework import status
from rest_framework.test import APIClient
from core import factories as core_factories
from mailbox_manager import enums, factories
from mailbox_manager.api.client import serializers
pytestmark = pytest.mark.django_db
def test_api_domain_invitations__create__anonymous():
"""Anonymous users should not be able to create invitations."""
domain = factories.MailDomainEnabledFactory()
invitation_values = serializers.DomainInvitationSerializer(
factories.DomainInvitationFactory.build()
).data
response = APIClient().post(
f"/api/v1.0/mail-domains/{domain.slug}/invitations/",
invitation_values,
format="json",
)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
assert response.json() == {
"detail": "Authentication credentials were not provided."
}
def test_api_domain_invitations__create__authenticated_outsider():
"""Users should not be permitted to send domain management invitations
for a domain they don't manage."""
user = core_factories.UserFactory()
domain = factories.MailDomainEnabledFactory()
invitation_values = serializers.DomainInvitationSerializer(
factories.DomainInvitationFactory.build()
).data
client = APIClient()
client.force_login(user)
response = client.post(
f"/api/v1.0/mail-domains/{domain.slug}/invitations/",
invitation_values,
format="json",
)
assert response.status_code == status.HTTP_403_FORBIDDEN
@pytest.mark.parametrize(
"role",
["owner", "administrator"],
)
def test_api_domain_invitations__admin_should_create_invites(role):
"""Owners and administrators should be able to invite new domain managers."""
user = core_factories.UserFactory()
domain = factories.MailDomainEnabledFactory()
factories.MailDomainAccessFactory(domain=domain, user=user, role=role)
invitation_values = serializers.DomainInvitationSerializer(
factories.DomainInvitationFactory.build()
).data
client = APIClient()
client.force_login(user)
response = client.post(
f"/api/v1.0/mail-domains/{domain.slug}/invitations/",
invitation_values,
format="json",
)
assert response.status_code == status.HTTP_201_CREATED
def test_api_domain_invitations__viewers_should_not_invite_to_manage_domains():
"""
Domain viewers should not be able to invite new domain managers.
"""
user = core_factories.UserFactory()
domain = factories.MailDomainEnabledFactory()
factories.MailDomainAccessFactory(
domain=domain, user=user, role=enums.MailDomainRoleChoices.VIEWER
)
invitation_values = serializers.DomainInvitationSerializer(
factories.DomainInvitationFactory.build()
).data
client = APIClient()
client.force_login(user)
response = client.post(
f"/api/v1.0/mail-domains/{domain.slug}/invitations/",
invitation_values,
format="json",
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json() == {
"detail": "You are not allowed to manage invitations for this domain."
}
def test_api_domain_invitations__should_not_create_duplicate_invitations():
"""An email should not be invited multiple times to the same domain."""
existing_invitation = factories.DomainInvitationFactory()
domain = existing_invitation.domain
# Grant privileged role on the domain to the user
user = core_factories.UserFactory()
factories.MailDomainAccessFactory(
domain=domain, user=user, role=enums.MailDomainRoleChoices.OWNER
)
# Create a new invitation to the same domain with the exact same email address
duplicated_invitation = serializers.DomainInvitationSerializer(
factories.DomainInvitationFactory.build(email=existing_invitation.email)
).data
client = APIClient()
client.force_login(user)
response = client.post(
f"/api/v1.0/mail-domains/{domain.slug}/invitations/",
duplicated_invitation,
format="json",
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert response.json()["__all__"] == [
"Domain invitation with this Email address and Domain already exists."
]

View File

@@ -0,0 +1,83 @@
"""
Tests for DomainInvitations API endpoint in People's app mailbox_manager. Focus on "list" action.
"""
import time
from django.conf import settings
import pytest
from rest_framework import status
from rest_framework.test import APIClient
from core import factories as core_factories
from mailbox_manager import enums, factories
pytestmark = pytest.mark.django_db
def test_api_domain_invitations__anonymous_user_should_not_list_invitations():
"""Anonymous users should not be able to list invitations."""
domain = factories.MailDomainEnabledFactory()
response = APIClient().get(f"/api/v1.0/mail-domains/{domain.slug}/invitations/")
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_api_domain_invitations__domain_managers_should_list_invitations():
"""
Authenticated user should be able to list invitations
in domains they manage, including from other issuers.
"""
auth_user, other_user = core_factories.UserFactory.create_batch(2)
domain = factories.MailDomainEnabledFactory()
factories.MailDomainAccessFactory(
domain=domain, user=auth_user, role=enums.MailDomainRoleChoices.ADMIN
)
factories.MailDomainAccessFactory(
domain=domain, user=other_user, role=enums.MailDomainRoleChoices.OWNER
)
invitation = factories.DomainInvitationFactory(
domain=domain, role=enums.MailDomainRoleChoices.ADMIN, issuer=auth_user
)
other_invitations = factories.DomainInvitationFactory.create_batch(
2, domain=domain, role=enums.MailDomainRoleChoices.VIEWER, issuer=other_user
)
# expired invitations should be listed too
# override settings to accelerate validation expiration
settings.INVITATION_VALIDITY_DURATION = 1 # second
expired_invitation = factories.DomainInvitationFactory(
domain=domain, role=enums.MailDomainRoleChoices.VIEWER, issuer=auth_user
)
time.sleep(1)
# invitations from other teams should not be listed
other_domain = factories.MailDomainEnabledFactory()
factories.DomainInvitationFactory.create_batch(
2, domain=other_domain, role=enums.MailDomainRoleChoices.OWNER
)
client = APIClient()
client.force_login(auth_user)
response = client.get(
f"/api/v1.0/mail-domains/{domain.slug}/invitations/",
)
assert response.status_code == status.HTTP_200_OK
assert response.json()["count"] == 4
assert sorted(response.json()["results"], key=lambda x: x["created_at"]) == sorted(
[
{
"id": str(i.id),
"created_at": i.created_at.isoformat().replace("+00:00", "Z"),
"email": str(i.email),
"domain": str(domain.id),
"role": str(i.role),
"issuer": str(i.issuer.id),
"is_expired": i.is_expired,
}
for i in [invitation, *other_invitations, expired_invitation]
],
key=lambda x: x["created_at"],
)

View File

@@ -0,0 +1,73 @@
"""
Tests for DomainInvitations API endpoint. Focus on "retrieve" action.
"""
import pytest
from rest_framework import status
from rest_framework.test import APIClient
from core import factories as core_factories
from mailbox_manager import enums, factories
pytestmark = pytest.mark.django_db
def test_api_domain_invitations__anonymous_user_should_not_retrieve_invitations():
"""
Anonymous user should not be able to retrieve invitations.
"""
invitation = factories.DomainInvitationFactory()
response = APIClient().get(
f"/api/v1.0/mail-domains/{invitation.domain.slug}/invitations/",
)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_api_domain_invitations__unrelated_user_should_not_retrieve_invitations():
"""
Authenticated unrelated users should not be able to retrieve invitations.
"""
auth_user = core_factories.UserFactory()
invitation = factories.DomainInvitationFactory()
client = APIClient()
client.force_login(auth_user)
response = client.get(
f"/api/v1.0/mail-domains/{invitation.domain.slug}/invitations/",
)
assert response.status_code == status.HTTP_200_OK
assert response.json()["count"] == 0
def test_api_domain_invitations__domain_managers_should_list_invitations():
"""
Authenticated domain managers should be able to retrieve invitations
whatever their role in the domain.
"""
auth_user = core_factories.UserFactory()
invitation = factories.DomainInvitationFactory()
factories.MailDomainAccessFactory(
domain=invitation.domain, user=auth_user, role=enums.MailDomainRoleChoices.ADMIN
)
client = APIClient()
client.force_login(auth_user)
response = client.get(
f"/api/v1.0/mail-domains/{invitation.domain.slug}/invitations/{invitation.id}/",
)
assert response.status_code == status.HTTP_200_OK
assert response.json() == {
"id": str(invitation.id),
"created_at": invitation.created_at.isoformat().replace("+00:00", "Z"),
"email": invitation.email,
"domain": str(invitation.domain.id),
"role": str(invitation.role),
"issuer": str(invitation.issuer.id),
"is_expired": False,
}

View File

@@ -3,6 +3,21 @@
import json
## USERS
def response_user_created(user_sub):
"""mimic dimail response upon succesfull user creation."""
return json.dumps(
{
"name": user_sub,
"is_admin": "false",
"uuid": "user-uuid-on-dimail",
"perms": [],
}
)
## DOMAINS
CHECK_DOMAIN_BROKEN = {
@@ -278,6 +293,15 @@ DOMAIN_SPEC = [
TOKEN_OK = json.dumps({"access_token": "token", "token_type": "bearer"})
## ALLOWS
def response_allows_created(user_name, domain_name):
"""mimic dimail response upon succesfull allows creation.
Dimail expects a name but our names are ProConnect's uuids."""
return json.dumps({"user": user_name, "domain": domain_name})
## MAILBOXES

View File

@@ -0,0 +1,103 @@
"""
Unit tests for the Domain Invitation model
"""
import re
import time
from django.conf import settings
from django.core import exceptions
import pytest
import responses
from freezegun import freeze_time
from rest_framework import status
from core import factories as core_factories
from mailbox_manager import enums, factories, models
from mailbox_manager.tests.fixtures import dimail
pytestmark = pytest.mark.django_db
def test_models_domain_invitations_readonly_after_create():
"""Existing invitations should be readonly."""
invitation = factories.DomainInvitationFactory()
with pytest.raises(exceptions.PermissionDenied):
invitation.save()
def test_models_domain_invitations__is_expired():
"""
The 'is_expired' property should return False until validity duration
is exceeded and True afterwards.
"""
expired_invitation = factories.DomainInvitationFactory()
assert expired_invitation.is_expired is False
settings.INVITATION_VALIDITY_DURATION = 1
time.sleep(1)
assert expired_invitation.is_expired is True
def test_models_domain_invitation__should_convert_invitations_to_accesses_upon_joining():
"""
Upon creating a new user, domain invitations linked to that email
should be converted to accesses and then deleted.
"""
# Two invitations to the same mail but to different domains
email = "future_admin@example.com"
invitation_to_domain1 = factories.DomainInvitationFactory(
email=email, role=enums.MailDomainRoleChoices.OWNER
)
invitation_to_domain2 = factories.DomainInvitationFactory(email=email)
# an expired invitation that should not be converted
with freeze_time("1985-10-30"):
expired_invitation = factories.DomainInvitationFactory(email=email)
# another person invited to domain2
other_invitation = factories.DomainInvitationFactory(
domain=invitation_to_domain2.domain
)
new_user = core_factories.UserFactory.build(email=email)
with responses.RequestsMock() as rsps:
rsps.add(
rsps.POST,
re.compile(r".*/users/"),
body=dimail.response_user_created("sub"),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
rsps.add(
rsps.POST,
re.compile(r".*/allows/"),
body=dimail.response_allows_created(
"sub", invitation_to_domain1.domain.name
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
new_user = core_factories.UserFactory(email=email)
assert models.MailDomainAccess.objects.filter(
domain=invitation_to_domain1.domain, user=new_user
).exists()
assert models.MailDomainAccess.objects.filter(
domain=invitation_to_domain2.domain, user=new_user
).exists()
assert not models.DomainInvitation.objects.filter(
domain=invitation_to_domain1.domain, email=email
).exists() # invitation "consumed"
assert not models.DomainInvitation.objects.filter(
domain=invitation_to_domain2.domain, email=email
).exists() # invitation "consumed"
assert models.DomainInvitation.objects.filter(
domain=expired_invitation.domain, email=email
).exists() # expired invitation remains
assert models.DomainInvitation.objects.filter(
domain=invitation_to_domain2.domain, email=other_invitation.email
).exists() # the other invitation remains

View File

@@ -24,6 +24,12 @@ maildomain_related_router.register(
basename="mailboxes",
)
maildomain_related_router.register(
"invitations",
viewsets.DomainInvitationViewset,
basename="invitations",
)
urlpatterns = [
path(