(api) add invitations CRUD

Nest invitation router below team router and add create endpoints for
authenticated administrators/owners to invite new members to their team,
list valid and expired invitations or delete invite altogether.

Update will not be handled for now. Delete and recreate if needed.
This commit is contained in:
Marie PUPO JEAMMET
2024-02-12 19:07:11 +01:00
committed by aleb_the_flash
parent a15e46a2f9
commit 62758763df
8 changed files with 694 additions and 14 deletions

View File

@@ -229,5 +229,31 @@ class InvitationSerializer(serializers.ModelSerializer):
class Meta:
model = models.Invitation
fields = ["email", "team", "role", "issuer"]
read_only_fields = ["team", "issuer"]
fields = ["id", "created_at", "email", "team", "role", "issuer", "is_expired"]
read_only_fields = ["id", "created_at", "team", "issuer", "is_expired"]
def validate(self, attrs):
"""Validate and restrict invitation to new user based on email."""
request = self.context.get("request")
user = getattr(request, "user", None)
try:
team_id = self.context["team_id"]
except KeyError as exc:
raise exceptions.ValidationError(
"You must set a team ID in kwargs to create a new team invitation."
) from exc
if not models.TeamAccess.objects.filter(
team=team_id,
user=user,
role__in=[models.RoleChoices.OWNER, models.RoleChoices.ADMIN],
).exists():
raise exceptions.PermissionDenied(
"You are not allowed to manage invitation for this team."
)
attrs["team_id"] = team_id
attrs["issuer"] = user
return attrs

View File

@@ -10,9 +10,9 @@ from rest_framework import (
mixins,
pagination,
response,
throttling,
viewsets,
)
from rest_framework.throttling import UserRateThrottle
from core import models
@@ -103,7 +103,7 @@ class Pagination(pagination.PageNumberPagination):
page_size_query_param = "page_size"
class BurstRateThrottle(UserRateThrottle):
class BurstRateThrottle(throttling.UserRateThrottle):
"""
Throttle rate for minutes. See DRF section in settings for default value.
"""
@@ -111,7 +111,7 @@ class BurstRateThrottle(UserRateThrottle):
scope = "burst"
class SustainedRateThrottle(UserRateThrottle):
class SustainedRateThrottle(throttling.UserRateThrottle):
"""
Throttle rate for hours. See DRF section in settings for default value.
"""
@@ -401,3 +401,77 @@ class TeamAccessViewSet(
raise exceptions.ValidationError({"role": message})
serializer.save()
class InvitationViewset(
mixins.CreateModelMixin,
mixins.ListModelMixin,
mixins.RetrieveModelMixin,
mixins.DestroyModelMixin,
viewsets.GenericViewSet,
):
"""API ViewSet for user invitations to team.
GET /api/v1.0/teams/<team_id>/invitations/:<invitation_id>/
Return list of invitations related to that team or or one
team access if an id is provided.
POST /api/v1.0/teams/<team_id>/invitations/ with expected data:
- email: str
- role: str [owner|admin|member]
- issuer : User, automatically added from user making query, if allowed
- team : Team, automatically added from requested URI
Return newly created invitation
PUT / PATCH : Not permitted. Instead of updating your invitation,
delete and create a new one.
DELETE /api/v1.0/teams/<team_id>/invitations/<invitation_id>/
Delete targeted invitation
"""
lookup_field = "id"
pagination_class = Pagination
permission_classes = [permissions.AccessPermission]
queryset = (
models.Invitation.objects.all().select_related("team").order_by("-created_at")
)
serializer_class = serializers.InvitationSerializer
def get_permissions(self):
"""User only needs to be authenticated to list invitations"""
if self.action == "list":
permission_classes = [permissions.IsAuthenticated]
else:
return super().get_permissions()
return [permission() for permission in permission_classes]
def get_serializer_context(self):
"""Extra context provided to the serializer class."""
context = super().get_serializer_context()
context["team_id"] = self.kwargs["team_id"]
return context
def get_queryset(self):
"""Return the queryset according to the action."""
queryset = super().get_queryset()
queryset = queryset.filter(team=self.kwargs["team_id"])
if self.action == "list":
# Determine which role the logged-in user has in the team
user_role_query = models.TeamAccess.objects.filter(
user=self.request.user, team=self.kwargs["team_id"]
).values("role")[:1]
queryset = (
# The logged-in user should be part of a team to see its accesses
queryset.filter(
team__accesses__user=self.request.user,
)
# Abilities are computed based on logged-in user's role and
# the user role on each team access
.annotate(user_role=Subquery(user_role_query))
.distinct()
)
return queryset

View File

@@ -181,7 +181,7 @@ class InvitationFactory(factory.django.DjangoModelFactory):
class Meta:
model = models.Invitation
email = factory.Faker("email")
team = factory.SubFactory(TeamFactory)
email = factory.Faker("email")
role = factory.fuzzy.FuzzyChoice([role[0] for role in models.RoleChoices.choices])
issuer = factory.SubFactory(UserFactory)

View File

@@ -1,4 +1,4 @@
# Generated by Django 5.0.2 on 2024-02-23 06:46
# Generated by Django 5.0.2 on 2024-03-05 17:09
import django.contrib.auth.models
import django.core.validators
@@ -118,7 +118,9 @@ class Migration(migrations.Migration):
('team', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='invitations', to='core.team')),
],
options={
'abstract': False,
'verbose_name': 'Team invitation',
'verbose_name_plural': 'Team invitations',
'db_table': 'people_invitation',
},
),
migrations.CreateModel(
@@ -158,6 +160,10 @@ class Migration(migrations.Migration):
model_name='identity',
constraint=models.UniqueConstraint(fields=('user', 'email'), name='unique_user_email', violation_error_message='This email address is already declared for this user.'),
),
migrations.AddConstraint(
model_name='invitation',
constraint=models.UniqueConstraint(fields=('email', 'team'), name='email_and_team_unique_together'),
),
migrations.AddConstraint(
model_name='teamaccess',
constraint=models.UniqueConstraint(fields=('user', 'team'), name='unique_team_user', violation_error_message='This user is already in this team.'),

View File

@@ -463,11 +463,65 @@ class Invitation(BaseModel):
related_name="invitations",
)
class Meta:
db_table = "people_invitation"
verbose_name = _("Team invitation")
verbose_name_plural = _("Team invitations")
constraints = [
models.UniqueConstraint(
fields=["email", "team"], name="email_and_team_unique_together"
)
]
def __str__(self):
return f"{self.email} invited to {self.team}"
def save(self, *args, **kwargs):
"""Make invitations read-only."""
if self.created_at:
raise exceptions.PermissionDenied()
super().save(*args, **kwargs)
def clean(self):
"""Validate fields."""
super().clean()
# Check if an identity already exists for the provided email
if Identity.objects.filter(email=self.email).exists():
raise exceptions.ValidationError(
{"email": _("This email is already associated to a registered user.")}
)
@property
def is_expired(self):
"""Calculate if invitation is still valid or has expired."""
if not self.created_at:
return None
validity_duration = timedelta(seconds=settings.INVITATION_VALIDITY_DURATION)
return timezone.now() > (self.created_at + validity_duration)
def get_abilities(self, user):
"""Compute and return abilities for a given user."""
can_delete = False
role = None
if user.is_authenticated:
try:
role = self.user_role
except AttributeError:
try:
role = self.team.accesses.filter(user=user).values("role")[0][
"role"
]
except (self._meta.model.DoesNotExist, IndexError):
role = None
can_delete = role in [RoleChoices.OWNER, RoleChoices.ADMIN]
return {
"delete": can_delete,
"get": bool(role),
"patch": False,
"put": False,
}

View File

@@ -0,0 +1,421 @@
"""
Unit tests for the Invitation model
"""
import time
import pytest
from rest_framework import status
from rest_framework.test import APIClient
from core import factories
from core.api import serializers
pytestmark = pytest.mark.django_db
def test_api_team_invitations__create__anonymous():
"""Anonymous users should not be able to create invitations."""
team = factories.TeamFactory()
invitation_values = serializers.InvitationSerializer(
factories.InvitationFactory.build()
).data
response = APIClient().post(
f"/api/v1.0/teams/{team.id}/invitations/",
invitation_values,
format="json",
)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
assert response.json() == {
"detail": "Authentication credentials were not provided."
}
def test_api_team_invitations__create__authenticated_outsider():
"""Users outside of team should not be permitted to invite to team."""
identity = factories.IdentityFactory()
team = factories.TeamFactory()
invitation_values = serializers.InvitationSerializer(
factories.InvitationFactory.build()
).data
client = APIClient()
client.force_login(identity.user)
response = client.post(
f"/api/v1.0/teams/{team.id}/invitations/",
invitation_values,
format="json",
)
assert response.status_code == status.HTTP_403_FORBIDDEN
@pytest.mark.parametrize(
"role",
["owner", "administrator"],
)
def test_api_team_invitations__create__privileged_members(role):
"""Owners and administrators should be able to invite new members."""
identity = factories.IdentityFactory()
team = factories.TeamFactory(users=[(identity.user, role)])
invitation_values = serializers.InvitationSerializer(
factories.InvitationFactory.build()
).data
client = APIClient()
client.force_login(identity.user)
response = client.post(
f"/api/v1.0/teams/{team.id}/invitations/",
invitation_values,
format="json",
)
assert response.status_code == status.HTTP_201_CREATED
def test_api_team_invitations__create__members():
"""
Members should not be able to invite new members.
"""
identity = factories.IdentityFactory()
team = factories.TeamFactory(users=[(identity.user, "member")])
invitation_values = serializers.InvitationSerializer(
factories.InvitationFactory.build()
).data
client = APIClient()
client.force_login(identity.user)
response = client.post(
f"/api/v1.0/teams/{team.id}/invitations/",
invitation_values,
format="json",
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json() == {
"detail": "You are not allowed to manage invitation for this team."
}
def test_api_team_invitations__create__issuer_and_team_automatically_added():
"""Team and issuer fields should auto-complete."""
identity = factories.IdentityFactory()
team = factories.TeamFactory(users=[(identity.user, "owner")])
# Generate a random invitation
invitation = factories.InvitationFactory.build()
invitation_data = {"email": invitation.email, "role": invitation.role}
client = APIClient()
client.force_login(identity.user)
response = client.post(
f"/api/v1.0/teams/{team.id}/invitations/",
invitation_data,
format="json",
)
assert response.status_code == status.HTTP_201_CREATED
# team and issuer automatically set
assert response.json()["team"] == str(team.id)
assert response.json()["issuer"] == str(identity.user.id)
def test_api_team_invitations__create__cannot_duplicate_invitation():
"""An email should not be invited multiple times to the same team."""
existing_invitation = factories.InvitationFactory()
team = existing_invitation.team
# Grant privileged role on the Team to the user
identity = factories.IdentityFactory()
factories.TeamAccessFactory(team=team, user=identity.user, role="administrator")
# Create a new invitation to the same team with the exact same email address
duplicated_invitation = serializers.InvitationSerializer(
factories.InvitationFactory.build(email=existing_invitation.email)
).data
client = APIClient()
client.force_login(identity.user)
response = client.post(
f"/api/v1.0/teams/{team.id}/invitations/",
duplicated_invitation,
format="json",
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert response.json()["__all__"] == [
"Team invitation with this Email address and Team already exists."
]
def test_api_team_invitations__create__cannot_invite_existing_users():
"""
Should not be able to invite already existing users.
"""
user = factories.UserFactory()
team = factories.TeamFactory(users=[(user, "administrator")])
existing_user = factories.IdentityFactory(is_main=True)
# Build an invitation to the email of an exising identity in the db
invitation_values = serializers.InvitationSerializer(
factories.InvitationFactory.build(email=existing_user.email, team=team)
).data
client = APIClient()
client.force_login(user)
response = client.post(
f"/api/v1.0/teams/{team.id}/invitations/",
invitation_values,
format="json",
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert response.json()["email"] == [
"This email is already associated to a registered user."
]
def test_api_team_invitations__list__anonymous_user():
"""Anonymous users should not be able to list invitations."""
team = factories.TeamFactory()
response = APIClient().get(f"/api/v1.0/teams/{team.id}/invitations/")
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_api_team_invitations__list__authenticated():
"""
Authenticated user should be able to list invitations
in teams they belong to, including from other issuers.
"""
identity = factories.IdentityFactory()
other_user = factories.UserFactory()
team = factories.TeamFactory(
users=[(identity.user, "administrator"), (other_user, "owner")]
)
invitation = factories.InvitationFactory(
team=team, role="administrator", issuer=identity.user
)
other_invitations = factories.InvitationFactory.create_batch(
2, team=team, role="member", issuer=other_user
)
# invitations from other teams should not be listed
other_team = factories.TeamFactory()
factories.InvitationFactory.create_batch(2, team=other_team, role="member")
client = APIClient()
client.force_login(identity.user)
response = client.get(
f"/api/v1.0/teams/{team.id}/invitations/",
)
assert response.status_code == status.HTTP_200_OK
assert response.json()["count"] == 3
assert sorted(response.json()["results"], key=lambda x: x["created_at"]) == sorted(
[
{
"id": str(i.id),
"created_at": i.created_at.isoformat().replace("+00:00", "Z"),
"email": str(i.email),
"team": str(team.id),
"role": i.role,
"issuer": str(i.issuer.id),
"is_expired": False,
}
for i in [invitation, *other_invitations]
],
key=lambda x: x["created_at"],
)
def test_api_team_invitations__list__expired_invitations_still_listed(settings):
"""
Expired invitations are still listed.
"""
identity = factories.IdentityFactory()
other_user = factories.UserFactory()
team = factories.TeamFactory(
users=[(identity.user, "administrator"), (other_user, "owner")]
)
# override settings to accelerate validation expiration
settings.INVITATION_VALIDITY_DURATION = 1 # second
expired_invitation = factories.InvitationFactory(
team=team,
role="member",
issuer=identity.user,
)
time.sleep(1)
client = APIClient()
client.force_login(identity.user)
response = client.get(
f"/api/v1.0/teams/{team.id}/invitations/",
)
assert response.status_code == status.HTTP_200_OK
assert response.json()["count"] == 1
assert sorted(response.json()["results"], key=lambda x: x["created_at"]) == sorted(
[
{
"id": str(expired_invitation.id),
"created_at": expired_invitation.created_at.isoformat().replace(
"+00:00", "Z"
),
"email": str(expired_invitation.email),
"team": str(team.id),
"role": expired_invitation.role,
"issuer": str(expired_invitation.issuer.id),
"is_expired": True,
},
],
key=lambda x: x["created_at"],
)
def test_api_team_invitations__retrieve__anonymous_user():
"""
Anonymous user should not be able to retrieve invitations.
"""
invitation = factories.InvitationFactory()
response = APIClient().get(
f"/api/v1.0/teams/{invitation.team.id}/invitations/{invitation.id}/",
)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_api_team_invitations__retrieve__unrelated_user():
"""
Authenticated unrelated users should not be able to retrieve invitations.
"""
user = factories.IdentityFactory(user=factories.UserFactory()).user
invitation = factories.InvitationFactory()
client = APIClient()
client.force_login(user)
response = client.get(
f"/api/v1.0/teams/{invitation.team.id}/invitations/{invitation.id}/",
)
assert response.status_code == status.HTTP_403_FORBIDDEN
def test_api_team_invitations__retrieve__team_member():
"""
Authenticated team members should be able to retrieve invitations
whatever their role in the team.
"""
user = factories.IdentityFactory(user=factories.UserFactory()).user
invitation = factories.InvitationFactory()
factories.TeamAccessFactory(team=invitation.team, user=user, role="member")
client = APIClient()
client.force_login(user)
response = client.get(
f"/api/v1.0/teams/{invitation.team.id}/invitations/{invitation.id}/",
)
assert response.status_code == status.HTTP_200_OK
assert response.json() == {
"id": str(invitation.id),
"created_at": invitation.created_at.isoformat().replace("+00:00", "Z"),
"email": invitation.email,
"team": str(invitation.team.id),
"role": str(invitation.role),
"issuer": str(invitation.issuer.id),
"is_expired": False,
}
@pytest.mark.parametrize(
"method",
["put", "patch"],
)
def test_api_team_invitations__update__forbidden(method):
"""
Update of invitations is currently forbidden.
"""
user = factories.IdentityFactory(user=factories.UserFactory()).user
invitation = factories.InvitationFactory()
factories.TeamAccessFactory(team=invitation.team, user=user, role="owner")
client = APIClient()
client.force_login(user)
if method == "put":
response = client.put(
f"/api/v1.0/teams/{invitation.team.id}/invitations/{invitation.id}/",
)
if method == "patch":
response = client.patch(
f"/api/v1.0/teams/{invitation.team.id}/invitations/{invitation.id}/",
)
assert response.status_code == status.HTTP_405_METHOD_NOT_ALLOWED
assert response.json()["detail"] == f'Method "{method.upper()}" not allowed.'
def test_api_team_invitations__delete__anonymous():
"""Anonymous user should not be able to delete invitations."""
invitation = factories.InvitationFactory()
response = APIClient().delete(
f"/api/v1.0/teams/{invitation.team.id}/invitations/{invitation.id}/",
)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_api_team_invitations__delete__authenticated_outsider():
"""Members outside of team should not cancel invitations."""
identity = factories.IdentityFactory()
team = factories.TeamFactory()
invitation = factories.InvitationFactory(team=team)
client = APIClient()
client.force_login(identity.user)
response = client.delete(
f"/api/v1.0/teams/{team.id}/invitations/{invitation.id}/",
)
assert response.status_code == status.HTTP_403_FORBIDDEN
@pytest.mark.parametrize("role", ["owner", "administrator"])
def test_api_team_invitations__delete__privileged_members(role):
"""Privileged member should be able to cancel invitation."""
identity = factories.IdentityFactory()
team = factories.TeamFactory(users=[(identity.user, role)])
invitation = factories.InvitationFactory(team=team)
client = APIClient()
client.force_login(identity.user)
response = client.delete(
f"/api/v1.0/teams/{team.id}/invitations/{invitation.id}/",
)
assert response.status_code == status.HTTP_204_NO_CONTENT
def test_api_team_invitations__delete__members():
"""Member should not be able to cancel invitation."""
identity = factories.IdentityFactory()
team = factories.TeamFactory(users=[(identity.user, "member")])
invitation = factories.InvitationFactory(team=team)
client = APIClient()
client.force_login(identity.user)
response = client.delete(
f"/api/v1.0/teams/{team.id}/invitations/{invitation.id}/",
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert (
response.json()["detail"]
== "You do not have permission to perform this action."
)

View File

@@ -2,7 +2,10 @@
Unit tests for the Invitation model
"""
from django.core.exceptions import ValidationError
import time
from django.contrib.auth.models import AnonymousUser
from django.core import exceptions
import pytest
@@ -11,21 +14,28 @@ from core import factories
pytestmark = pytest.mark.django_db
def test_models_invitations_readonly_after_create():
"""Existing invitations should be readonly."""
invitation = factories.InvitationFactory()
with pytest.raises(exceptions.PermissionDenied):
invitation.save()
def test_models_invitations_email_no_empty_mail():
"""The "email" field should not be empty."""
with pytest.raises(ValidationError, match="This field cannot be blank"):
with pytest.raises(exceptions.ValidationError, match="This field cannot be blank"):
factories.InvitationFactory(email="")
def test_models_invitations_email_no_null_mail():
"""The "email" field is required."""
with pytest.raises(ValidationError, match="This field cannot be null"):
with pytest.raises(exceptions.ValidationError, match="This field cannot be null"):
factories.InvitationFactory(email=None)
def test_models_invitations_team_required():
"""The "team" field is required."""
with pytest.raises(ValidationError, match="This field cannot be null"):
with pytest.raises(exceptions.ValidationError, match="This field cannot be null"):
factories.InvitationFactory(team=None)
@@ -37,11 +47,94 @@ def test_models_invitations_team_should_be_team_instance():
def test_models_invitations_role_required():
"""The "role" field is required."""
with pytest.raises(ValidationError, match="This field cannot be blank"):
with pytest.raises(exceptions.ValidationError, match="This field cannot be blank"):
factories.InvitationFactory(role="")
def test_models_invitations_role_among_choices():
"""The "role" field should be a valid choice."""
with pytest.raises(ValidationError, match="Value 'boss' is not a valid choice"):
with pytest.raises(
exceptions.ValidationError, match="Value 'boss' is not a valid choice"
):
factories.InvitationFactory(role="boss")
def test_models_invitations__is_expired(settings):
"""
The 'is_expired' property should return False until validity duration
is exceeded and True afterwards.
"""
expired_invitation = factories.InvitationFactory()
assert expired_invitation.is_expired is False
settings.INVITATION_VALIDITY_DURATION = 1
time.sleep(1)
assert expired_invitation.is_expired is True
# get_abilities
def test_models_team_invitations_get_abilities_anonymous():
"""Check abilities returned for an anonymous user."""
access = factories.InvitationFactory()
abilities = access.get_abilities(AnonymousUser())
assert abilities == {
"delete": False,
"get": False,
"patch": False,
"put": False,
}
def test_models_team_invitations_get_abilities_authenticated():
"""Check abilities returned for an authenticated user."""
access = factories.InvitationFactory()
user = factories.UserFactory()
abilities = access.get_abilities(user)
assert abilities == {
"delete": False,
"get": False,
"patch": False,
"put": False,
}
@pytest.mark.parametrize("role", ["administrator", "owner"])
def test_models_team_invitations_get_abilities_privileged_member(role):
"""Check abilities for a team member with a privileged role."""
pivileged_access = factories.TeamAccessFactory(role=role)
team = pivileged_access.team
factories.TeamAccessFactory(team=team) # another one
invitation = factories.InvitationFactory(team=team)
abilities = invitation.get_abilities(pivileged_access.user)
assert abilities == {
"delete": True,
"get": True,
"patch": False,
"put": False,
}
def test_models_team_invitations_get_abilities_member():
"""Check abilities for a team member with 'member' role."""
member_access = factories.TeamAccessFactory(role="member")
team = member_access.team
factories.TeamAccessFactory(team=team) # another one
invitation = factories.InvitationFactory(team=team)
abilities = invitation.get_abilities(member_access.user)
assert abilities == {
"delete": False,
"get": True,
"patch": False,
"put": False,
}

View File

@@ -22,6 +22,12 @@ team_related_router.register(
basename="team_accesses",
)
team_related_router.register(
"invitations",
viewsets.InvitationViewset,
basename="invitations",
)
urlpatterns = [
path(
f"api/{settings.API_VERSION}/",