(teams) return parent teams in resource server

Also return the parent teams in the user's team endpoints.
This commit is contained in:
Quentin BEY
2024-12-17 18:21:24 +01:00
committed by BEY Quentin
parent 201864db3a
commit 45fd10fd2d
4 changed files with 337 additions and 5 deletions

View File

@@ -1,6 +1,10 @@
"""Resource server API endpoints"""
from django.db.models import OuterRef, Prefetch, Subquery
import operator
from functools import reduce
from django.db.models import OuterRef, Prefetch, Q, Subquery, Value
from django.db.models.functions import Coalesce
from rest_framework import (
filters,
@@ -56,6 +60,14 @@ class TeamViewSet( # pylint: disable=too-many-ancestors
def get_queryset(self):
"""Custom queryset to get user related teams."""
teams_queryset = models.Team.objects.filter(
accesses__user=self.request.user,
)
depth_path = teams_queryset.values("depth", "path")
if not depth_path:
return models.Team.objects.none()
user_role_query = models.TeamAccess.objects.filter(
user=self.request.user, team=OuterRef("pk")
).values("role")[:1]
@@ -74,10 +86,33 @@ class TeamViewSet( # pylint: disable=too-many-ancestors
service_provider_prefetch,
)
.filter(
accesses__user=self.request.user,
reduce(
operator.or_,
(
Q(
# The team the user has access to
depth=d["depth"],
path=d["path"],
)
| Q(
# The parent team the user has access to
depth__lt=d["depth"],
path__startswith=d["path"][: models.Team.steplen],
organization_id=self.request.user.organization_id,
)
for d in depth_path
),
),
service_providers__audience_id=service_provider_audience,
)
.annotate(user_role=Subquery(user_role_query))
# Abilities are computed based on logged-in user's role for the team
# and if the user does not have access, it's ok to consider them as a member
# because it's a parent team.
.annotate(
user_role=Coalesce(
Subquery(user_role_query), Value(models.RoleChoices.MEMBER.value)
)
)
)
def perform_create(self, serializer):

View File

@@ -64,7 +64,8 @@ def test_api_teams_list_authenticated( # pylint: disable=too-many-locals
# Authenticate using the resource server, ie via the Authorization header
with force_login_via_resource_server(client, user, service_provider.audience_id):
with django_assert_num_queries(4): # Count, Team, ServiceProvider, TeamAccess
with django_assert_num_queries(5):
# queries: Team path, Count, Team, ServiceProvider, TeamAccess
response = client.get(
"/resource-server/v1.0/teams/?ordering=created_at",
format="json",
@@ -182,3 +183,81 @@ def test_api_teams_order_param(client, force_login_via_resource_server):
assert (
response_team_ids == team_ids
), "created_at values are not sorted from oldest to newest"
def test_api_teams_list_with_parent_teams(client, force_login_via_resource_server):
"""
Authenticated users should be able to list teams including parent teams.
Parent teams should not be listed if they don't have the service provider.
"""
user = factories.UserFactory()
service_provider = factories.ServiceProviderFactory()
root_team = factories.TeamFactory(name="Root", service_providers=[service_provider])
first_team = factories.TeamFactory(name="First", parent_id=root_team.pk)
second_team = factories.TeamFactory(
name="Second", parent_id=first_team.pk, service_providers=[service_provider]
)
factories.TeamAccessFactory(user=user, team=second_team, role="member")
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.get(
"/resource-server/v1.0/teams/",
format="json",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)
assert response.status_code == HTTP_200_OK
response_data = response.json()
assert response_data["count"] == 2
team_ids = [team["id"] for team in response_data["results"]]
assert len(team_ids) == 2
assert set(team_ids) == {str(root_team.id), str(second_team.id)}
def test_api_teams_list_with_parent_teams_other_organization(
client, force_login_via_resource_server
):
"""
Authenticated users should be able to list teams including parent teams.
Parent teams should not be listed if they don't have the service provider
or if the user does not belong to the organization.
"""
organization = factories.OrganizationFactory(with_registration_id=True)
user = factories.UserFactory(organization=organization)
service_provider = factories.ServiceProviderFactory()
other_organization = factories.OrganizationFactory(with_registration_id=True)
root_team = factories.TeamFactory(
name="Root",
service_providers=[service_provider],
organization=other_organization,
)
first_team = factories.TeamFactory(
name="First", parent_id=root_team.pk, organization=other_organization
)
second_team = factories.TeamFactory(
name="Second",
parent_id=first_team.pk,
service_providers=[service_provider],
organization=other_organization,
)
factories.TeamAccessFactory(user=user, team=second_team, role="member")
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.get(
"/resource-server/v1.0/teams/",
format="json",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)
assert response.status_code == HTTP_200_OK
response_data = response.json()
assert response_data["count"] == 1
team_ids = [team["id"] for team in response_data["results"]]
assert len(team_ids) == 1
assert set(team_ids) == {str(second_team.id)}

View File

@@ -8,7 +8,7 @@ from rest_framework.status import HTTP_404_NOT_FOUND
from rest_framework.test import APIClient
from core import factories
from core.factories import UserFactory
from core.factories import TeamAccessFactory, UserFactory
pytestmark = pytest.mark.django_db
@@ -97,3 +97,129 @@ def test_api_teams_retrieve_authenticated_other_service_provider(
)
assert response.status_code == HTTP_404_NOT_FOUND
def test_api_teams_retrieve_authenticated_related_parent_same_organization(
client, force_login_via_resource_server
):
"""
Authenticated users should be allowed to retrieve a parent team
of a team to which they are related, only if they belong to the
same organization.
"""
organization = factories.OrganizationFactory(with_registration_id=True)
user = factories.UserFactory(organization=organization)
service_provider = factories.ServiceProviderFactory()
root_team = factories.TeamFactory(
name="Root",
organization=organization,
)
first_team = factories.TeamFactory(
name="First",
parent_id=root_team.pk,
organization=organization,
service_providers=[service_provider],
)
second_team = factories.TeamFactory(
name="Second",
parent_id=first_team.pk,
service_providers=[service_provider],
organization=organization,
)
TeamAccessFactory(user=user, team=second_team, role="member")
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.get(
f"/resource-server/v1.0/teams/{first_team.pk}/",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)
assert response.status_code == status.HTTP_200_OK
assert response.json() == {
"id": str(first_team.pk),
"name": first_team.name,
"created_at": first_team.created_at.isoformat().replace("+00:00", "Z"),
"updated_at": first_team.updated_at.isoformat().replace("+00:00", "Z"),
}
def test_api_teams_retrieve_authenticated_related_parent_other_organization(
client, force_login_via_resource_server
):
"""
Authenticated users should be allowed to retrieve a parent team
of a team to which they are related, only if they belong to the
same organization.
"""
organization = factories.OrganizationFactory(with_registration_id=True)
user = factories.UserFactory(organization=organization)
service_provider = factories.ServiceProviderFactory()
other_organization = factories.OrganizationFactory(with_registration_id=True)
root_team = factories.TeamFactory(
name="Root",
organization=other_organization,
)
first_team = factories.TeamFactory(
name="First",
parent_id=root_team.pk,
organization=other_organization,
service_providers=[service_provider],
)
second_team = factories.TeamFactory(
name="Second",
parent_id=first_team.pk,
service_providers=[service_provider],
organization=other_organization,
)
TeamAccessFactory(user=user, team=second_team, role="member")
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.get(
f"/resource-server/v1.0/teams/{first_team.pk}/",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json() == {"detail": "No Team matches the given query."}
def test_api_teams_retrieve_authenticated_related_child_same_organization(
client, force_login_via_resource_server
):
"""
Authenticated users should NOT be allowed to retrieve a child team
of a team to which they are related, even if they belong to the
same organization.
"""
organization = factories.OrganizationFactory(with_registration_id=True)
user = factories.UserFactory(organization=organization)
service_provider = factories.ServiceProviderFactory()
root_team = factories.TeamFactory(
name="Root",
organization=organization,
)
first_team = factories.TeamFactory(
name="First",
parent_id=root_team.pk,
organization=organization,
service_providers=[service_provider],
)
second_team = factories.TeamFactory(
name="Second",
parent_id=first_team.pk,
service_providers=[service_provider],
organization=organization,
)
TeamAccessFactory(user=user, team=first_team, role="member")
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.get(
f"/resource-server/v1.0/teams/{second_team.pk}/",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json() == {"detail": "No Team matches the given query."}

View File

@@ -244,3 +244,95 @@ def test_api_teams_update_administrator_or_owner_of_another(
team.refresh_from_db()
team_values = serializers.TeamSerializer(instance=team).data
assert team_values == old_team_values
@pytest.mark.parametrize("role", ["administrator", "member", "owner"])
def test_api_teams_update_parent_team(client, force_login_via_resource_server, role):
"""
Belonging to a team should NOT grant authorization to update
another parent team.
"""
organization = factories.OrganizationFactory(with_registration_id=True)
user = factories.UserFactory(organization=organization)
service_provider = factories.ServiceProviderFactory()
root_team = factories.TeamFactory(
name="Root",
organization=organization,
)
first_team = factories.TeamFactory(
name="First",
parent_id=root_team.pk,
organization=organization,
service_providers=[service_provider],
)
second_team = factories.TeamFactory(
name="Second",
parent_id=first_team.pk,
service_providers=[service_provider],
organization=organization,
)
factories.TeamAccessFactory(user=user, team=second_team, role=role)
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.patch(
f"/resource-server/v1.0/teams/{first_team.pk}/",
{
"name": "New name",
},
content_type="application/json",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)
assert response.status_code == HTTP_403_FORBIDDEN
assert response.json() == {
"detail": "You do not have permission to perform this action."
}
first_team.refresh_from_db()
assert first_team.name == "First"
@pytest.mark.parametrize("role", ["administrator", "member", "owner"])
def test_api_teams_update_child_team(client, force_login_via_resource_server, role):
"""
Belonging to a team should NOT grant authorization to update
another child team.
"""
organization = factories.OrganizationFactory(with_registration_id=True)
user = factories.UserFactory(organization=organization)
service_provider = factories.ServiceProviderFactory()
root_team = factories.TeamFactory(
name="Root",
organization=organization,
)
first_team = factories.TeamFactory(
name="First",
parent_id=root_team.pk,
organization=organization,
service_providers=[service_provider],
)
second_team = factories.TeamFactory(
name="Second",
parent_id=first_team.pk,
service_providers=[service_provider],
organization=organization,
)
factories.TeamAccessFactory(user=user, team=first_team, role=role)
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.patch(
f"/resource-server/v1.0/teams/{second_team.pk}/",
{
"name": "New name",
},
content_type="application/json",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)
assert response.status_code == HTTP_404_NOT_FOUND
assert response.json() == {"detail": "No Team matches the given query."}
second_team.refresh_from_db()
assert second_team.name == "Second"