(resource-server) add team invitation endpoint

This allows a service provider to add new members to a team.
This commit is contained in:
Quentin BEY
2025-03-04 13:17:35 +01:00
committed by BEY Quentin
parent ae92ab5dd8
commit b063f690f6
9 changed files with 753 additions and 0 deletions

View File

@@ -4,6 +4,8 @@ from django.core import exceptions
from rest_framework import permissions
from core import models
class IsAuthenticated(permissions.BasePermission):
"""
@@ -68,3 +70,28 @@ class TeamPermission(IsAuthenticated):
abilities = request.user.get_abilities()
return abilities["teams"]["can_create"]
class TeamInvitationCreationPermission(IsAuthenticated):
"""Permission class that allows only team owners and admins to perform actions."""
def has_permission(self, request, view):
"""Check if user is authenticated and has required role for the team."""
if not super().has_permission(request, view):
return False
# Only check roles for edition operations
if request.method in permissions.SAFE_METHODS:
return True
team_id = view.kwargs.get("team_id")
if not team_id:
return False
team_access = models.TeamAccess.objects.filter(
team_id=team_id,
user=request.user,
role__in=[models.RoleChoices.OWNER, models.RoleChoices.ADMIN],
).exists()
return team_access

View File

@@ -50,3 +50,28 @@ class TeamSerializer(serializers.ModelSerializer):
"service_providers": [service_provider],
},
)
class InvitationSerializer(serializers.ModelSerializer):
"""Serialize invitations."""
class Meta:
model = models.Invitation
fields = ["id", "created_at", "email", "team", "role", "issuer", "is_expired"]
read_only_fields = ["id", "created_at", "team", "issuer", "is_expired"]
def validate(self, attrs):
"""Fill team and issuer from request."""
is_team_available_for_service_provider = models.Team.objects.filter(
id=self.context["team_id"],
service_providers__audience_id=self.context[
"from_service_provider_audience"
],
).exists()
if not is_team_available_for_service_provider:
raise serializers.ValidationError({"team": "Team not found."})
attrs["team_id"] = self.context["team_id"]
attrs["issuer"] = self.context["request"].user # User is authenticated
return attrs

View File

@@ -123,3 +123,79 @@ class TeamViewSet( # pylint: disable=too-many-ancestors
user=self.request.user,
role=models.RoleChoices.OWNER,
)
class InvitationViewset( # pylint: disable=too-many-ancestors
ResourceServerMixin,
mixins.CreateModelMixin,
mixins.ListModelMixin,
mixins.RetrieveModelMixin,
mixins.DestroyModelMixin,
viewsets.GenericViewSet,
):
"""API ViewSet for user invitations to team via resource server.
GET /resource-server/v1.0/teams/<team_id>/invitations/:<invitation_id>/
Return list of invitations related to that team or one
team access if an id is provided.
POST /resource-server/v1.0/teams/<team_id>/invitations/ with expected data:
- email: str
- role: str [owner|admin|member]
- issuer : User, automatically added from user making query, if allowed
- team : Team, automatically added from requested URI
Return newly created invitation
PUT / PATCH : Not permitted. Instead of updating your invitation,
delete and create a new one.
DELETE /resource-server/v1.0/teams/<team_id>/invitations/<invitation_id>/
Delete targeted invitation
"""
lookup_field = "id"
pagination_class = Pagination
permission_classes = [permissions.AccessPermission]
serializer_class = serializers.InvitationSerializer
def get_permissions(self):
"""Set specific permissions based on the action."""
if self.action == "list":
permission_classes = [permissions.IsAuthenticated]
elif self.action == "create":
permission_classes = [permissions.TeamInvitationCreationPermission]
else:
permission_classes = [permissions.AccessPermission]
return [permission() for permission in permission_classes]
def get_serializer_context(self):
"""Extra context provided to the serializer class."""
context = super().get_serializer_context()
context["team_id"] = self.kwargs["team_id"]
return context
def get_queryset(self):
"""Return the queryset according to the action."""
service_provider_audience = self._get_service_provider_audience()
# Determine which role the logged-in user has in the team
user_role_query = models.TeamAccess.objects.filter(
user=self.request.user, team=self.kwargs["team_id"]
).values("role")[:1]
queryset = (
models.Invitation.objects.select_related("team")
.filter(
team=self.kwargs["team_id"],
# The logged-in user should be part of a team to see its accesses
team__accesses__user=self.request.user,
# The team should be accessible by the service provider audience
team__service_providers__audience_id=service_provider_audience,
)
.annotate(user_role=Subquery(user_role_query))
.order_by("-created_at")
.distinct()
)
return queryset

View File

@@ -0,0 +1 @@
"""Tests for the resource server Team Invitation API endpoints."""

View File

@@ -0,0 +1,180 @@
"""
Tests for Teams API endpoint in People's core app: create
"""
import pytest
from rest_framework.status import (
HTTP_201_CREATED,
HTTP_400_BAD_REQUEST,
HTTP_401_UNAUTHORIZED,
HTTP_403_FORBIDDEN,
)
from rest_framework.test import APIClient
from core.factories import (
ServiceProviderFactory,
TeamAccessFactory,
TeamFactory,
UserFactory,
)
from core.models import Invitation
pytestmark = pytest.mark.django_db
def test_api_teams_invitations_create_anonymous():
"""Anonymous users should not be allowed to create team invitation."""
response = APIClient().post(
"/resource-server/v1.0/teams/ca143ed4-f83d-11ef-a8c5-af2e53ad69fb/invitations/",
{
"email": "toto@example.com",
"role": "member",
},
)
assert response.status_code == HTTP_401_UNAUTHORIZED
assert not Invitation.objects.exists()
def test_api_teams_invitations_create_authenticated_outsider(
client, force_login_via_resource_server
):
"""Users outside of team should not be permitted to invite to team."""
user = UserFactory()
team = TeamFactory()
service_provider = ServiceProviderFactory()
team.service_providers.add(service_provider)
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.post(
f"/resource-server/v1.0/teams/{team.id}/invitations/",
{
"email": "new@example.com",
"role": "member",
},
format="json",
)
assert response.status_code == HTTP_403_FORBIDDEN
assert not Invitation.objects.exists()
def test_api_teams_invitations_create_authenticated_member(
client, force_login_via_resource_server
):
"""Team members should not be permitted to create invitations."""
user = UserFactory()
team = TeamFactory()
service_provider = ServiceProviderFactory()
team.service_providers.add(service_provider)
TeamAccessFactory(team=team, user=user, role="member")
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.post(
f"/resource-server/v1.0/teams/{team.id}/invitations/",
{
"email": "new@example.com",
"role": "member",
},
format="json",
)
assert response.status_code == HTTP_403_FORBIDDEN
assert not Invitation.objects.exists()
@pytest.mark.parametrize("role", ["owner", "administrator"])
def test_api_teams_invitations_create_authenticated_privileged(
client, force_login_via_resource_server, role
):
"""Owners and administrators should be able to create invitations."""
user = UserFactory()
team = TeamFactory()
service_provider = ServiceProviderFactory()
team.service_providers.add(service_provider)
TeamAccessFactory(team=team, user=user, role=role)
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.post(
f"/resource-server/v1.0/teams/{team.id}/invitations/",
{
"email": "new@example.com",
"role": "member",
},
format="json",
)
assert response.status_code == HTTP_201_CREATED
invitation = Invitation.objects.get()
assert response.json() == {
"id": str(invitation.id),
"created_at": invitation.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
"email": "new@example.com",
"team": str(team.id),
"role": "member",
"issuer": str(user.id),
"is_expired": False,
}
def test_api_teams_invitations_create_duplicate_email(
client, force_login_via_resource_server
):
"""Should not be able to invite the same email twice."""
user = UserFactory()
team = TeamFactory()
service_provider = ServiceProviderFactory()
team.service_providers.add(service_provider)
TeamAccessFactory(team=team, user=user, role="administrator")
# Create first invitation
with force_login_via_resource_server(client, user, service_provider.audience_id):
client.post(
f"/resource-server/v1.0/teams/{team.id}/invitations/",
{
"email": "new@example.com",
"role": "member",
},
format="json",
)
# Try to create duplicate invitation
response = client.post(
f"/resource-server/v1.0/teams/{team.id}/invitations/",
{
"email": "new@example.com",
"role": "member",
},
format="json",
)
assert response.status_code == HTTP_400_BAD_REQUEST
assert Invitation.objects.count() == 1
def test_api_teams_invitations_create_wrong_service_provider(
client, force_login_via_resource_server
):
"""Should not create invitation when accessing with wrong service provider."""
user = UserFactory()
team = TeamFactory()
service_provider = ServiceProviderFactory()
wrong_service_provider = ServiceProviderFactory()
team.service_providers.add(service_provider)
TeamAccessFactory(team=team, user=user, role="administrator")
with force_login_via_resource_server(
client, user, wrong_service_provider.audience_id
):
response = client.post(
f"/resource-server/v1.0/teams/{team.id}/invitations/",
{
"email": "new@example.com",
"role": "member",
},
format="json",
)
assert response.status_code == HTTP_400_BAD_REQUEST
assert response.json() == {"team": ["Team not found."]}
assert not Invitation.objects.exists()

View File

@@ -0,0 +1,138 @@
"""
Tests for Teams Invitations API endpoint in People's core app: delete
"""
import pytest
from rest_framework.status import (
HTTP_204_NO_CONTENT,
HTTP_401_UNAUTHORIZED,
HTTP_403_FORBIDDEN,
HTTP_404_NOT_FOUND,
)
from rest_framework.test import APIClient
from core.factories import (
InvitationFactory,
ServiceProviderFactory,
TeamAccessFactory,
TeamFactory,
UserFactory,
)
from core.models import Invitation
pytestmark = pytest.mark.django_db
def test_api_teams_invitations_delete_anonymous():
"""Anonymous users should not be allowed to delete team invitations."""
invitation = InvitationFactory()
team = invitation.team
response = APIClient().delete(
f"/resource-server/v1.0/teams/{team.id}/invitations/{invitation.id}/",
)
assert response.status_code == HTTP_401_UNAUTHORIZED
assert Invitation.objects.filter(id=invitation.id).exists()
def test_api_teams_invitations_delete_authenticated_outsider(
client, force_login_via_resource_server
):
"""Users outside of team should not be permitted to delete team invitations."""
user = UserFactory()
invitation = InvitationFactory()
team = invitation.team
service_provider = ServiceProviderFactory()
team.service_providers.add(service_provider)
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.delete(
f"/resource-server/v1.0/teams/{team.id}/invitations/{invitation.id}/",
)
assert response.status_code == HTTP_404_NOT_FOUND
assert response.json() == {"detail": "No Invitation matches the given query."}
assert Invitation.objects.filter(id=invitation.id).exists()
def test_api_teams_invitations_delete_authenticated_member(
client, force_login_via_resource_server
):
"""Team members should not be permitted to delete invitations."""
user = UserFactory()
invitation = InvitationFactory()
team = invitation.team
service_provider = ServiceProviderFactory()
team.service_providers.add(service_provider)
TeamAccessFactory(team=team, user=user, role="member")
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.delete(
f"/resource-server/v1.0/teams/{team.id}/invitations/{invitation.id}/",
)
assert response.status_code == HTTP_403_FORBIDDEN
assert Invitation.objects.filter(id=invitation.id).exists()
@pytest.mark.parametrize("role", ["owner", "administrator"])
def test_api_teams_invitations_delete_authenticated_privileged(
client, force_login_via_resource_server, role
):
"""Owners and administrators should be able to delete invitations."""
user = UserFactory()
invitation = InvitationFactory()
team = invitation.team
service_provider = ServiceProviderFactory()
team.service_providers.add(service_provider)
TeamAccessFactory(team=team, user=user, role=role)
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.delete(
f"/resource-server/v1.0/teams/{team.id}/invitations/{invitation.id}/",
)
assert response.status_code == HTTP_204_NO_CONTENT
assert not Invitation.objects.filter(id=invitation.id).exists()
def test_api_teams_invitations_delete_nonexistent(
client, force_login_via_resource_server
):
"""Should return 404 when trying to delete non-existent invitation."""
user = UserFactory()
team = TeamFactory()
service_provider = ServiceProviderFactory()
team.service_providers.add(service_provider)
TeamAccessFactory(team=team, user=user, role="administrator")
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.delete(
f"/resource-server/v1.0/teams/{team.id}/invitations/nonexistent-uuid/",
)
assert response.status_code == HTTP_404_NOT_FOUND
def test_api_teams_invitations_delete_wrong_service_provider(
client, force_login_via_resource_server
):
"""Should not delete invitation when accessing with wrong service provider."""
user = UserFactory()
invitation = InvitationFactory()
team = invitation.team
service_provider = ServiceProviderFactory()
wrong_service_provider = ServiceProviderFactory()
team.service_providers.add(service_provider)
TeamAccessFactory(team=team, user=user, role="administrator")
with force_login_via_resource_server(
client, user, wrong_service_provider.audience_id
):
response = client.delete(
f"/resource-server/v1.0/teams/{team.id}/invitations/{invitation.id}/",
)
assert response.status_code == HTTP_404_NOT_FOUND
assert Invitation.objects.filter(id=invitation.id).exists()

View File

@@ -0,0 +1,157 @@
"""
Tests for Teams Invitations API endpoint in People's core app: list
"""
import pytest
from rest_framework.status import (
HTTP_200_OK,
HTTP_401_UNAUTHORIZED,
)
from rest_framework.test import APIClient
from core.factories import (
InvitationFactory,
ServiceProviderFactory,
TeamAccessFactory,
TeamFactory,
UserFactory,
)
pytestmark = pytest.mark.django_db
def test_api_teams_invitations_list_anonymous():
"""Anonymous users should not be allowed to list team invitations."""
team = TeamFactory()
InvitationFactory.create_batch(size=3, team=team)
response = APIClient().get(
f"/resource-server/v1.0/teams/{team.id}/invitations/",
)
assert response.status_code == HTTP_401_UNAUTHORIZED
def test_api_teams_invitations_list_authenticated_outsider(
client, force_login_via_resource_server
):
"""Users outside of team should not be permitted to list team invitations."""
user = UserFactory()
team = TeamFactory()
service_provider = ServiceProviderFactory()
team.service_providers.add(service_provider)
InvitationFactory.create_batch(size=3, team=team)
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.get(
f"/resource-server/v1.0/teams/{team.id}/invitations/",
)
assert response.status_code == HTTP_200_OK
assert response.json() == {
"count": 0,
"next": None,
"previous": None,
"results": [],
}
@pytest.mark.parametrize("role", ["member", "administrator", "owner"])
def test_api_teams_invitations_list_authenticated_team_member(
client, force_login_via_resource_server, role
):
"""Team members should be able to list invitations."""
user = UserFactory()
team = TeamFactory()
service_provider = ServiceProviderFactory()
team.service_providers.add(service_provider)
TeamAccessFactory(team=team, user=user, role=role)
invitations = InvitationFactory.create_batch(size=3, team=team)
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.get(
f"/resource-server/v1.0/teams/{team.id}/invitations/",
)
assert response.status_code == HTTP_200_OK
data = response.json()
assert data["count"] == 3
assert len(data["results"]) == 3
# Check invitations are ordered by creation date (most recent first)
assert [item["id"] for item in data["results"]] == [
str(invitation.id) for invitation in reversed(invitations)
]
def test_api_teams_invitations_list_pagination(client, force_login_via_resource_server):
"""Should properly paginate results."""
user = UserFactory()
team = TeamFactory()
service_provider = ServiceProviderFactory()
team.service_providers.add(service_provider)
TeamAccessFactory(team=team, user=user, role="member")
InvitationFactory.create_batch(size=15, team=team)
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.get(
f"/resource-server/v1.0/teams/{team.id}/invitations/",
{"page_size": 10},
)
assert response.status_code == HTTP_200_OK
data = response.json()
assert data["count"] == 15
assert len(data["results"]) == 10
assert data["next"] is not None
assert data["previous"] is None
def test_api_teams_invitations_list_empty(client, force_login_via_resource_server):
"""Should return empty list when no invitations exist."""
user = UserFactory()
team = TeamFactory()
service_provider = ServiceProviderFactory()
team.service_providers.add(service_provider)
TeamAccessFactory(team=team, user=user, role="member")
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.get(
f"/resource-server/v1.0/teams/{team.id}/invitations/",
)
assert response.status_code == HTTP_200_OK
data = response.json()
assert data["count"] == 0
assert len(data["results"]) == 0
def test_api_teams_invitations_list_wrong_service_provider(
client, force_login_via_resource_server
):
"""Should not list invitations when accessing with wrong service provider."""
user = UserFactory()
team = TeamFactory()
service_provider = ServiceProviderFactory()
wrong_service_provider = ServiceProviderFactory()
team.service_providers.add(service_provider)
TeamAccessFactory(team=team, user=user, role="member")
InvitationFactory.create_batch(size=3, team=team)
with force_login_via_resource_server(
client, user, wrong_service_provider.audience_id
):
response = client.get(
f"/resource-server/v1.0/teams/{team.id}/invitations/",
)
assert response.status_code == HTTP_200_OK
assert response.json() == {
"count": 0,
"next": None,
"previous": None,
"results": [],
}

View File

@@ -0,0 +1,140 @@
"""
Tests for Teams Invitations API endpoint in People's core app: retrieve
"""
import pytest
from rest_framework.status import (
HTTP_200_OK,
HTTP_401_UNAUTHORIZED,
HTTP_404_NOT_FOUND,
)
from rest_framework.test import APIClient
from core.factories import (
InvitationFactory,
ServiceProviderFactory,
TeamAccessFactory,
TeamFactory,
UserFactory,
)
pytestmark = pytest.mark.django_db
def test_api_teams_invitations_retrieve_anonymous():
"""Anonymous users should not be allowed to retrieve team invitations."""
invitation = InvitationFactory()
team = invitation.team
response = APIClient().get(
f"/resource-server/v1.0/teams/{team.id}/invitations/{invitation.id}/",
)
assert response.status_code == HTTP_401_UNAUTHORIZED
def test_api_teams_invitations_retrieve_authenticated_outsider(
client, force_login_via_resource_server
):
"""Users outside of team should not be permitted to retrieve team invitations."""
user = UserFactory()
invitation = InvitationFactory()
team = invitation.team
service_provider = ServiceProviderFactory()
team.service_providers.add(service_provider)
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.get(
f"/resource-server/v1.0/teams/{team.id}/invitations/{invitation.id}/",
)
assert response.status_code == HTTP_404_NOT_FOUND
assert response.json() == {"detail": "No Invitation matches the given query."}
@pytest.mark.parametrize("role", ["member", "administrator", "owner"])
def test_api_teams_invitations_retrieve_authenticated_team_member(
client, force_login_via_resource_server, role
):
"""Team members should be able to retrieve invitations."""
user = UserFactory()
invitation = InvitationFactory()
team = invitation.team
service_provider = ServiceProviderFactory()
team.service_providers.add(service_provider)
TeamAccessFactory(team=team, user=user, role=role)
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.get(
f"/resource-server/v1.0/teams/{team.id}/invitations/{invitation.id}/",
)
assert response.status_code == HTTP_200_OK
assert response.json() == {
"id": str(invitation.id),
"created_at": invitation.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
"email": invitation.email,
"team": str(team.id),
"role": invitation.role,
"issuer": str(invitation.issuer.id),
"is_expired": invitation.is_expired,
}
def test_api_teams_invitations_retrieve_nonexistent(
client, force_login_via_resource_server
):
"""Should return 404 when trying to retrieve non-existent invitation."""
user = UserFactory()
team = TeamFactory()
service_provider = ServiceProviderFactory()
team.service_providers.add(service_provider)
TeamAccessFactory(team=team, user=user, role="member")
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.get(
f"/resource-server/v1.0/teams/{team.id}/invitations/nonexistent-uuid/",
)
assert response.status_code == HTTP_404_NOT_FOUND
def test_api_teams_invitations_retrieve_wrong_team(
client, force_login_via_resource_server
):
"""Should return 404 when trying to retrieve invitation from wrong team."""
user = UserFactory()
invitation = InvitationFactory()
wrong_team = TeamFactory()
service_provider = ServiceProviderFactory()
wrong_team.service_providers.add(service_provider)
TeamAccessFactory(team=wrong_team, user=user, role="member")
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.get(
f"/resource-server/v1.0/teams/{wrong_team.id}/invitations/{invitation.id}/",
)
assert response.status_code == HTTP_404_NOT_FOUND
def test_api_teams_invitations_retrieve_wrong_service_provider(
client, force_login_via_resource_server
):
"""Should not retrieve invitation when accessing with wrong service provider."""
user = UserFactory()
invitation = InvitationFactory()
team = invitation.team
service_provider = ServiceProviderFactory()
wrong_service_provider = ServiceProviderFactory()
team.service_providers.add(service_provider)
TeamAccessFactory(team=team, user=user, role="member")
with force_login_via_resource_server(
client, user, wrong_service_provider.audience_id
):
response = client.get(
f"/resource-server/v1.0/teams/{team.id}/invitations/{invitation.id}/",
)
assert response.status_code == HTTP_404_NOT_FOUND

View File

@@ -13,6 +13,14 @@ from core.resource_server.urls import urlpatterns as resource_server_urls
router = DefaultRouter()
router.register("teams", viewsets.TeamViewSet, basename="teams")
# - Routes nested under a team
team_related_router = DefaultRouter()
team_related_router.register(
"invitations",
viewsets.InvitationViewset,
basename="invitations",
)
# - Routes nested under a team
# Invitations will be added later
@@ -24,6 +32,7 @@ urlpatterns = [
[
*router.urls,
*resource_server_urls,
path("teams/<uuid:team_id>/", include(team_related_router.urls)),
]
),
),