From a041296f8aa1a9fdff46851cb425f18220447d1a Mon Sep 17 00:00:00 2001 From: Quentin BEY Date: Mon, 4 Nov 2024 11:32:41 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8(backend)=20add=20ServiceProvider?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This adds the ServiceProvider notion to allow to better manage which teams is available for each service provider. --- CHANGELOG.md | 1 + src/backend/core/admin.py | 36 ++- src/backend/core/api/serializers.py | 12 + src/backend/core/api/viewsets.py | 42 ++- src/backend/core/factories.py | 16 ++ .../migrations/0005_add_serviceprovider.py | 39 +++ src/backend/core/models.py | 44 ++++ .../core/resource_server/authentication.py | 17 ++ src/backend/core/resource_server/backend.py | 16 ++ .../core/resource_server_api/__init__.py | 1 + .../core/resource_server_api/serializers.py | 46 ++++ .../core/resource_server_api/viewsets.py | 90 +++++++ src/backend/core/tests/conftest.py | 79 ++++++ .../resource_server/test_authentication.py | 178 +++++++++++++ .../tests/resource_server/test_backend.py | 4 +- .../tests/resource_server_api/__init__.py | 1 + .../resource_server_api/teams/__init__.py | 1 + .../resource_server_api/teams/test_create.py | 163 ++++++++++++ .../resource_server_api/teams/test_delete.py | 49 ++++ .../resource_server_api/teams/test_list.py | 184 +++++++++++++ .../teams/test_retrieve.py | 99 +++++++ .../resource_server_api/teams/test_update.py | 246 ++++++++++++++++++ .../core/tests/team_accesses/__init__.py | 1 + src/backend/core/tests/teams/__init__.py | 1 + .../tests/teams/test_core_api_teams_list.py | 4 - .../teams/test_core_api_teams_retrieve.py | 1 + .../tests/teams/test_core_api_teams_update.py | 31 +++ 27 files changed, 1392 insertions(+), 10 deletions(-) create mode 100644 src/backend/core/migrations/0005_add_serviceprovider.py create mode 100644 src/backend/core/resource_server_api/__init__.py create mode 100644 src/backend/core/resource_server_api/serializers.py create mode 100644 src/backend/core/resource_server_api/viewsets.py create mode 100644 src/backend/core/tests/conftest.py create mode 100644 src/backend/core/tests/resource_server/test_authentication.py create mode 100644 src/backend/core/tests/resource_server_api/__init__.py create mode 100644 src/backend/core/tests/resource_server_api/teams/__init__.py create mode 100644 src/backend/core/tests/resource_server_api/teams/test_create.py create mode 100644 src/backend/core/tests/resource_server_api/teams/test_delete.py create mode 100644 src/backend/core/tests/resource_server_api/teams/test_list.py create mode 100644 src/backend/core/tests/resource_server_api/teams/test_retrieve.py create mode 100644 src/backend/core/tests/resource_server_api/teams/test_update.py create mode 100644 src/backend/core/tests/team_accesses/__init__.py create mode 100644 src/backend/core/tests/teams/__init__.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 4d46382..661b8da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ and this project adheres to - ✨(domains) allow creation of "pending" mailboxes - ✨(teams) allow team management for team admins/owners #509 +- ✨(backend) add ServiceProvider #522 ### Fixed diff --git a/src/backend/core/admin.py b/src/backend/core/admin.py index c0b3d4a..adfe05d 100644 --- a/src/backend/core/admin.py +++ b/src/backend/core/admin.py @@ -108,11 +108,20 @@ class UserAdmin(auth_admin.UserAdmin): get_user.short_description = _("User") +class TeamServiceProviderInline(admin.TabularInline): + """Inline admin class for service providers.""" + + can_delete = False + model = models.Team.service_providers.through + extra = 0 + + @admin.register(models.Team) class TeamAdmin(admin.ModelAdmin): """Team admin interface declaration.""" - inlines = (TeamAccessInline, TeamWebhookInline) + inlines = (TeamAccessInline, TeamWebhookInline, TeamServiceProviderInline) + exclude = ("service_providers",) # Handled by the inline list_display = ( "name", "created_at", @@ -188,6 +197,14 @@ class ContactAdmin(admin.ModelAdmin): ) +class OrganizationServiceProviderInline(admin.TabularInline): + """Inline admin class for service providers.""" + + can_delete = False + model = models.Organization.service_providers.through + extra = 0 + + @admin.register(models.Organization) class OrganizationAdmin(admin.ModelAdmin): """Admin interface for organizations.""" @@ -198,7 +215,8 @@ class OrganizationAdmin(admin.ModelAdmin): "updated_at", ) search_fields = ("name",) - inlines = (OrganizationAccessInline,) + inlines = (OrganizationAccessInline, OrganizationServiceProviderInline) + exclude = ("service_providers",) # Handled by the inline @admin.register(models.OrganizationAccess) @@ -213,3 +231,17 @@ class OrganizationAccessAdmin(admin.ModelAdmin): "created_at", "updated_at", ) + + +@admin.register(models.ServiceProvider) +class ServiceProviderAdmin(admin.ModelAdmin): + """Admin interface for service providers.""" + + list_display = ( + "name", + "audience_id", + "created_at", + "updated_at", + ) + search_fields = ("name", "audience_id") + readonly_fields = ("created_at", "updated_at") diff --git a/src/backend/core/api/serializers.py b/src/backend/core/api/serializers.py index 4e73651..f847ddf 100644 --- a/src/backend/core/api/serializers.py +++ b/src/backend/core/api/serializers.py @@ -4,6 +4,7 @@ from rest_framework import exceptions, serializers from timezone_field.rest_framework import TimeZoneSerializerField from core import models +from core.models import ServiceProvider class ContactSerializer(serializers.ModelSerializer): @@ -205,6 +206,9 @@ class TeamSerializer(serializers.ModelSerializer): """Serialize teams.""" abilities = serializers.SerializerMethodField(read_only=True) + service_providers = serializers.PrimaryKeyRelatedField( + queryset=ServiceProvider.objects.all(), many=True, required=False + ) class Meta: model = models.Team @@ -215,6 +219,7 @@ class TeamSerializer(serializers.ModelSerializer): "abilities", "created_at", "updated_at", + "service_providers", ] read_only_fields = [ "id", @@ -226,6 +231,13 @@ class TeamSerializer(serializers.ModelSerializer): def create(self, validated_data): """Create a new team with organization enforcement.""" + # When called as a resource server, we enforce the team service provider + if sp_audience := self.context.get("from_service_provider_audience", None): + service_provider, _created = models.ServiceProvider.objects.get_or_create( + audience_id=sp_audience + ) + validated_data["service_providers"] = [service_provider] + # Note: this is not the purpose of this API to check the user has an organization return super().create( validated_data=validated_data diff --git a/src/backend/core/api/viewsets.py b/src/backend/core/api/viewsets.py index 7713cd8..1eda7a3 100644 --- a/src/backend/core/api/viewsets.py +++ b/src/backend/core/api/viewsets.py @@ -18,6 +18,7 @@ from rest_framework.permissions import AllowAny from core import models +from ..resource_server.authentication import ResourceServerAuthentication from . import permissions, serializers SIMILARITY_THRESHOLD = 0.04 @@ -247,15 +248,52 @@ class TeamViewSet( queryset = models.Team.objects.all() pagination_class = None + def _get_service_provider_audience(self): + """Return the audience of the Service Provider from the OIDC introspected token.""" + if not isinstance( + self.request.successful_authenticator, ResourceServerAuthentication + ): + # We could check request.resource_server_token_audience here, but it's + # more explicit to check the authenticator type and assert the attribute + # existence. + return None + + # When used as a resource server, the request has a token audience + service_provider_audience = self.request.resource_server_token_audience + + if not service_provider_audience: # should not happen + raise exceptions.AuthenticationFailed( + "Resource server token audience not found in request" + ) + + return service_provider_audience + def get_queryset(self): """Custom queryset to get user related teams.""" user_role_query = models.TeamAccess.objects.filter( user=self.request.user, team=OuterRef("pk") ).values("role")[:1] - return models.Team.objects.filter(accesses__user=self.request.user).annotate( - user_role=Subquery(user_role_query) + + return ( + models.Team.objects.prefetch_related("accesses", "service_providers") + .filter( + accesses__user=self.request.user, + ) + .annotate(user_role=Subquery(user_role_query)) ) + def get_serializer_context(self): + """Extra context provided to the serializer class.""" + context = super().get_serializer_context() + + # When used as a resource server, we need to know the audience to automatically: + # - add the Service Provider to the team "scope" on creation + context["from_service_provider_audience"] = ( + self._get_service_provider_audience() + ) + + return context + def perform_create(self, serializer): """Set the current user as owner of the newly created team.""" team = serializer.save() diff --git a/src/backend/core/factories.py b/src/backend/core/factories.py index 4f8f19d..8fea20e 100644 --- a/src/backend/core/factories.py +++ b/src/backend/core/factories.py @@ -180,6 +180,13 @@ class TeamFactory(factory.django.DjangoModelFactory): else: TeamAccessFactory(team=self, user=user_entry[0], role=user_entry[1]) + @factory.post_generation + def service_providers(self, create, extracted, **kwargs): + """Add service providers to team from a given list of service providers.""" + if not create or not extracted: + return + self.service_providers.set(extracted) + class TeamAccessFactory(factory.django.DjangoModelFactory): """Create fake team user accesses for testing.""" @@ -212,3 +219,12 @@ class InvitationFactory(factory.django.DjangoModelFactory): email = factory.Faker("email") role = factory.fuzzy.FuzzyChoice([role[0] for role in models.RoleChoices.choices]) issuer = factory.SubFactory(UserFactory) + + +class ServiceProviderFactory(factory.django.DjangoModelFactory): + """A factory to create service providers for testing purposes.""" + + class Meta: + model = models.ServiceProvider + + audience_id = factory.Faker("uuid4") diff --git a/src/backend/core/migrations/0005_add_serviceprovider.py b/src/backend/core/migrations/0005_add_serviceprovider.py new file mode 100644 index 0000000..61677c3 --- /dev/null +++ b/src/backend/core/migrations/0005_add_serviceprovider.py @@ -0,0 +1,39 @@ +# Generated by Django 5.1.2 on 2024-11-07 16:24 + +import uuid +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0004_remove_team_slug'), + ] + + operations = [ + migrations.CreateModel( + name='ServiceProvider', + fields=[ + ('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')), + ('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created at')), + ('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated at')), + ('name', models.CharField(max_length=256, unique=True, verbose_name='name')), + ('audience_id', models.CharField(db_index=True, max_length=256, unique=True, verbose_name='audience id')), + ], + options={ + 'verbose_name': 'service provider', + 'verbose_name_plural': 'service providers', + 'db_table': 'people_service_provider', + }, + ), + migrations.AddField( + model_name='organization', + name='service_providers', + field=models.ManyToManyField(blank=True, related_name='organizations', to='core.serviceprovider'), + ), + migrations.AddField( + model_name='team', + name='service_providers', + field=models.ManyToManyField(blank=True, related_name='teams', to='core.serviceprovider'), + ), + ] diff --git a/src/backend/core/models.py b/src/backend/core/models.py index 547c2dd..4fc5230 100644 --- a/src/backend/core/models.py +++ b/src/backend/core/models.py @@ -173,6 +173,34 @@ class Contact(BaseModel): raise exceptions.ValidationError({"data": [error_message]}) from e +class ServiceProvider(BaseModel): + """ + Represents a service provider that will consume our information. + + Organization uses this model to define the list of SP available to their users. + Team uses this model to define their visibility to the various SP. + """ + + name = models.CharField(_("name"), max_length=256, unique=True) + audience_id = models.CharField( + _("audience id"), max_length=256, unique=True, db_index=True + ) + + class Meta: + db_table = "people_service_provider" + verbose_name = _("service provider") + verbose_name_plural = _("service providers") + + def __str__(self): + return self.name + + def save(self, *args, **kwargs): + """Enforce name (even if ugly) from the `audience_id` field.""" + if not self.name: + self.name = self.audience_id # ok, same length + return super().save(*args, **kwargs) + + class OrganizationManager(models.Manager): """ Custom manager for the Organization model, to manage complexity/automation. @@ -282,6 +310,12 @@ class Organization(BaseModel): validators=[validate_unique_domain], ) + service_providers = models.ManyToManyField( + ServiceProvider, + related_name="organizations", + blank=True, + ) + objects = OrganizationManager() class Meta: @@ -539,6 +573,11 @@ class OrganizationAccess(BaseModel): class Team(BaseModel): """ Represents the link between teams and users, specifying the role a user has in a team. + + When a team is created from here, the user have to choose which Service Providers + can see it. + When a team is created from a Service Provider this one is automatically set in the + Team `service_providers`. """ name = models.CharField(max_length=100) @@ -556,6 +595,11 @@ class Team(BaseModel): null=True, # Need to be set to False when everything is migrated blank=True, # Need to be set to False when everything is migrated ) + service_providers = models.ManyToManyField( + ServiceProvider, + related_name="teams", + blank=True, + ) class Meta: db_table = "people_team" diff --git a/src/backend/core/resource_server/authentication.py b/src/backend/core/resource_server/authentication.py index 1455214..f6ecb2a 100644 --- a/src/backend/core/resource_server/authentication.py +++ b/src/backend/core/resource_server/authentication.py @@ -53,3 +53,20 @@ class ResourceServerAuthentication(OIDCAuthentication): pass return access_token + + def authenticate(self, request): + """ + Authenticate the request and return a tuple of (user, token) or None. + + We override the 'authenticate' method from the parent class to store + the introspected token audience inside the request. + """ + result = super().authenticate(request) # Might raise AuthenticationFailed + + if result is None: # Case when there is no access token + return None + + # Note: at this stage, the request is a "drf_request" object + request.resource_server_token_audience = self.backend.token_origin_audience + + return result diff --git a/src/backend/core/resource_server/backend.py b/src/backend/core/resource_server/backend.py index b7ce773..1f71017 100644 --- a/src/backend/core/resource_server/backend.py +++ b/src/backend/core/resource_server/backend.py @@ -61,6 +61,10 @@ class ResourceServerBackend: token_introspection={"essential": True}, ) + # Declare the token origin audience: to know where the token comes from + # and store it for further use in the application + self.token_origin_audience = None + # pylint: disable=unused-argument def get_or_create_user(self, access_token, id_token, payload): """Maintain API compatibility with OIDCAuthentication class from mozilla-django-oidc @@ -85,6 +89,8 @@ class ResourceServerBackend: that extends RFC 7662 by returning a signed and encrypted JWT for stronger assurance that the authorization server issued the token introspection response. """ + self.token_origin_audience = None # Reset the token origin audience + jwt = self._introspect(access_token) claims = self._verify_claims(jwt) user_info = self._verify_user_info(claims["token_introspection"]) @@ -100,6 +106,8 @@ class ResourceServerBackend: logger.debug("Login failed: No user with %s found", sub) return None + self.token_origin_audience = str(user_info["aud"]) + return user def _verify_user_info(self, introspection_response): @@ -127,6 +135,12 @@ class ResourceServerBackend: logger.debug(message) raise SuspiciousOperation(message) + audience = introspection_response.get("aud", None) + if not audience: + raise SuspiciousOperation( + "Introspection response does not provide source audience." + ) + return introspection_response def _introspect(self, token): @@ -219,6 +233,8 @@ class ResourceServerBackend: class ResourceServerImproperlyConfiguredBackend: """Fallback backend for improperly configured Resource Servers.""" + token_origin_audience = None + def get_or_create_user(self, access_token, id_token, payload): """Indicate that the Resource Server is improperly configured.""" raise AuthenticationFailed("Resource Server is improperly configured") diff --git a/src/backend/core/resource_server_api/__init__.py b/src/backend/core/resource_server_api/__init__.py new file mode 100644 index 0000000..c763d2d --- /dev/null +++ b/src/backend/core/resource_server_api/__init__.py @@ -0,0 +1 @@ +"""People core resource server API endpoints""" diff --git a/src/backend/core/resource_server_api/serializers.py b/src/backend/core/resource_server_api/serializers.py new file mode 100644 index 0000000..01029bf --- /dev/null +++ b/src/backend/core/resource_server_api/serializers.py @@ -0,0 +1,46 @@ +"""Client serializers for the People core app resource server API.""" + +from rest_framework import serializers + +from core import models + + +class TeamSerializer(serializers.ModelSerializer): + """Serialize teams.""" + + class Meta: + model = models.Team + fields = [ + "id", + "name", + "created_at", + "updated_at", + ] + read_only_fields = [ + "id", + "created_at", + "updated_at", + ] + + def create(self, validated_data): + """ + Create a new team with organization enforcement. + + In this context, called as a resource server, + the team service provider is enforced. + + When the service provider audience is unknown it is created on the fly. + """ + sp_audience = self.context["from_service_provider_audience"] + service_provider, _created = models.ServiceProvider.objects.get_or_create( + audience_id=sp_audience + ) + + # Note: this is not the purpose of this API to check the user has an organization + return super().create( + validated_data=validated_data + | { + "organization_id": self.context["request"].user.organization_id, + "service_providers": [service_provider], + }, + ) diff --git a/src/backend/core/resource_server_api/viewsets.py b/src/backend/core/resource_server_api/viewsets.py new file mode 100644 index 0000000..7df6b13 --- /dev/null +++ b/src/backend/core/resource_server_api/viewsets.py @@ -0,0 +1,90 @@ +"""Resource server API endpoints""" + +from django.db.models import OuterRef, Prefetch, Subquery + +from rest_framework import ( + filters, + mixins, + viewsets, +) + +from core import models +from core.api import permissions +from core.resource_server.mixins import ResourceServerMixin + +from ..api.viewsets import Pagination +from . import serializers + + +class TeamViewSet( # pylint: disable=too-many-ancestors + ResourceServerMixin, + mixins.CreateModelMixin, + mixins.ListModelMixin, + mixins.RetrieveModelMixin, + mixins.UpdateModelMixin, + viewsets.GenericViewSet, +): + """ + Team ViewSet dedicated to the resource server. + + The DELETE method is not allowed for now, because the use case is + not clear yet and it comes with complexity to know if we can delete + a team or not (eg. if a team has other SP, it might not be deleted + but what do we do then, only remove the current SP?). + + GET /resource-server/v1.0/teams/ + Return list of Teams of the user and available for the audience. + + POST /resource-server/v1.0/teams/ + Create a new Team for the user for the audience. + + GET /resource-server/v1.0/teams/{team_id}/ + Return the Team details if available for the audience. + + PUT /resource-server/v1.0/teams/{team_id}/ + Update the Team details (only name for now). + + """ + + permission_classes = [permissions.AccessPermission] + serializer_class = serializers.TeamSerializer + filter_backends = [filters.OrderingFilter] + ordering_fields = ["created_at"] + ordering = ["-created_at"] + queryset = models.Team.objects.all() + pagination_class = Pagination + + def get_queryset(self): + """Custom queryset to get user related teams.""" + user_role_query = models.TeamAccess.objects.filter( + user=self.request.user, team=OuterRef("pk") + ).values("role")[:1] + + service_provider_audience = self._get_service_provider_audience() + service_provider_prefetch = Prefetch( + "service_providers", + queryset=models.ServiceProvider.objects.filter( + audience_id=service_provider_audience + ), + ) + + return ( + models.Team.objects.prefetch_related( + "accesses", + service_provider_prefetch, + ) + .filter( + accesses__user=self.request.user, + service_providers__audience_id=service_provider_audience, + ) + .annotate(user_role=Subquery(user_role_query)) + ) + + def perform_create(self, serializer): + """Set the current user as owner of the newly created team.""" + team = serializer.save() + models.TeamAccess.objects.create( + team=team, + user=self.request.user, + role=models.RoleChoices.OWNER, + ) diff --git a/src/backend/core/tests/conftest.py b/src/backend/core/tests/conftest.py new file mode 100644 index 0000000..b0eaaee --- /dev/null +++ b/src/backend/core/tests/conftest.py @@ -0,0 +1,79 @@ +"""Defines fixtures for the resource server API tests.""" + +from contextlib import contextmanager +from typing import Optional +from unittest import mock + +from django.contrib.auth import get_user_model + +import pytest +import responses +from faker import Faker + +from core.resource_server.authentication import ResourceServerAuthentication + +User = get_user_model() +fake = Faker() + + +@contextmanager +def _force_login_via_resource_server( + client_fixture, + user: User, + service_provider_audience: Optional[str], +): + """ + Context manager to authenticate a user with a service provider via + a resource server call. + + This allows to authenticate a user with a service provider without doing + all the introspection process. + + This is a private function, use the `force_login_via_resource_server` + fixture instead. + + The `service_provider_audience` parameter might not match any existing + service provider audience, doing so allow to check the behavior when + the service provider is not yet known. + """ + + def mock_authenticate(self, request): # pylint: disable=unused-argument + request.resource_server_token_audience = ( + service_provider_audience or fake.pystr(min_chars=10, max_chars=10) + ) + return user, "unused-token" + + with mock.patch.object( + ResourceServerAuthentication, "authenticate", mock_authenticate + ): + client_fixture.force_login( + user, + backend="core.resource_server.authentication.ResourceServerAuthentication", + ) + yield + + +@pytest.fixture(name="force_login_via_resource_server") +@responses.activate +def force_login_via_resource_server_fixture(): + """ + Fixture to authenticate a user with a service provider via a resource server call. + + Usage: + ``` + def test_login_with_resource_server( + client, force_login_via_resource_server, + ): + user = UserFactory() + service_provider = ServiceProviderFactory() + with force_login_via_resource_server(client, user, service_provider.audience_id): + response = client.get( + "/resource-server/v1.0//", + format="json", + HTTP_AUTHORIZATION="Bearer b64untestedbearertoken", + ) + + # response is authenticated + ``` + """ + return _force_login_via_resource_server diff --git a/src/backend/core/tests/resource_server/test_authentication.py b/src/backend/core/tests/resource_server/test_authentication.py new file mode 100644 index 0000000..f324207 --- /dev/null +++ b/src/backend/core/tests/resource_server/test_authentication.py @@ -0,0 +1,178 @@ +"""Tests for the authentication process of the resource server.""" + +import base64 +import json + +import pytest +import responses +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from joserfc import jwe as jose_jwe +from joserfc import jwt as jose_jwt +from joserfc.rfc7518.rsa_key import RSAKey +from jwt.utils import to_base64url_uint +from rest_framework.request import Request as DRFRequest +from rest_framework.status import HTTP_200_OK, HTTP_401_UNAUTHORIZED + +from core.factories import UserFactory +from core.models import ServiceProvider +from core.resource_server.authentication import ResourceServerAuthentication + +pytestmark = pytest.mark.django_db + + +def build_authorization_bearer(token): + """ + Build an Authorization Bearer header value from a token. + + This can be used like this: + client.post( + ... + HTTP_AUTHORIZATION=f"Bearer {build_authorization_bearer('some_token')}", + ) + """ + return base64.b64encode(token.encode("utf-8")).decode("utf-8") + + +@responses.activate +def test_resource_server_authentication_class(client, settings): + """ + Defines the settings for the resource server + for a full authentication with introspection process. + + This is an integration test that checks the authentication process + when using the ResourceServerAuthentication class. + + This test asserts the DRF request object contains the + `resource_server_token_audience` attribute which is used in + the resource server views. + + This test uses the `/resource-server/v1.0/teams/` URL as an example + because we don't want to create a new URL just for this test. + """ + private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + + unencrypted_pem_private_key = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + + pem_public_key = private_key.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + + settings.OIDC_RS_PRIVATE_KEY_STR = unencrypted_pem_private_key.decode("utf-8") + settings.OIDC_RS_ENCRYPTION_KEY_TYPE = "RSA" + settings.OIDC_RS_ENCRYPTION_ENCODING = "A256GCM" + settings.OIDC_RS_ENCRYPTION_ALGO = "RSA-OAEP" + settings.OIDC_RS_SIGNING_ALGO = "RS256" + settings.OIDC_RS_CLIENT_ID = "some_client_id" + settings.OIDC_RS_CLIENT_SECRET = "some_client_secret" + + settings.OIDC_OP_URL = "https://oidc.example.com" + settings.OIDC_VERIFY_SSL = False + settings.OIDC_TIMEOUT = 5 + settings.OIDC_PROXY = None + settings.OIDC_OP_JWKS_ENDPOINT = "https://oidc.example.com/jwks" + settings.OIDC_OP_INTROSPECTION_ENDPOINT = "https://oidc.example.com/introspect" + + # Mock the JWKS endpoint + public_numbers = private_key.public_key().public_numbers() + responses.add( + responses.GET, + settings.OIDC_OP_JWKS_ENDPOINT, + body=json.dumps( + { + "keys": [ + { + "kty": settings.OIDC_RS_ENCRYPTION_KEY_TYPE, + "alg": settings.OIDC_RS_SIGNING_ALGO, + "use": "sig", + "kid": "1234567890", + "n": to_base64url_uint(public_numbers.n).decode("ascii"), + "e": to_base64url_uint(public_numbers.e).decode("ascii"), + } + ] + } + ), + ) + + def encrypt_jwt(json_data): + """Encrypt the JWT token for the backend to decrypt.""" + token = jose_jwt.encode( + { + "kid": "1234567890", + "alg": settings.OIDC_RS_SIGNING_ALGO, + }, + json_data, + RSAKey.import_key(unencrypted_pem_private_key), + algorithms=[settings.OIDC_RS_SIGNING_ALGO], + ) + + return jose_jwe.encrypt_compact( + protected={ + "alg": settings.OIDC_RS_ENCRYPTION_ALGO, + "enc": settings.OIDC_RS_ENCRYPTION_ENCODING, + }, + plaintext=token, + public_key=RSAKey.import_key(pem_public_key), + algorithms=[ + settings.OIDC_RS_ENCRYPTION_ALGO, + settings.OIDC_RS_ENCRYPTION_ENCODING, + ], + ) + + responses.add( + responses.POST, + "https://oidc.example.com/introspect", + body=encrypt_jwt( + { + "iss": "https://oidc.example.com", + "aud": "some_client_id", # settings.OIDC_RS_CLIENT_ID + "token_introspection": { + "sub": "very-specific-sub", + "iss": "https://oidc.example.com", + "aud": "some_service_provider", + "scope": "openid groups", + "active": True, + }, + } + ), + ) + + # Try to authenticate while the user does not exist => 401 + response = client.get( + "/resource-server/v1.0/teams/", # use an exising URL here + format="json", + HTTP_AUTHORIZATION=f"Bearer {build_authorization_bearer('some_token')}", + ) + assert response.status_code == HTTP_401_UNAUTHORIZED + assert ServiceProvider.objects.count() == 0 + + # Create a user with the specific sub, the access is authorized + UserFactory(sub="very-specific-sub") + + response = client.get( + "/resource-server/v1.0/teams/", # use an exising URL here + format="json", + HTTP_AUTHORIZATION=f"Bearer {build_authorization_bearer('some_token')}", + ) + + assert response.status_code == HTTP_200_OK + + response_request = response.renderer_context.get("request") + assert isinstance(response_request, DRFRequest) + assert isinstance( + response_request.successful_authenticator, ResourceServerAuthentication + ) + + # Check that the user is authenticated + assert response_request.user.is_authenticated + + # Check the request contains the resource server token audience + assert response_request.resource_server_token_audience == "some_service_provider" + + # Check that no service provider is created here + assert ServiceProvider.objects.count() == 0 diff --git a/src/backend/core/tests/resource_server/test_backend.py b/src/backend/core/tests/resource_server/test_backend.py index 73b808c..1acd82a 100644 --- a/src/backend/core/tests/resource_server/test_backend.py +++ b/src/backend/core/tests/resource_server/test_backend.py @@ -296,7 +296,7 @@ def test_introspect_public_key_import_failure( def test_verify_user_info_success(resource_server_backend): """Test '_verify_user_info' with a successful response.""" - introspection_response = {"active": True, "scope": "groups"} + introspection_response = {"active": True, "scope": "groups", "aud": "123"} result = resource_server_backend._verify_user_info(introspection_response) assert result == introspection_response @@ -333,7 +333,7 @@ def test_get_user_success(resource_server_backend): access_token = "valid_access_token" mock_jwt = Mock() - mock_claims = {"token_introspection": {"sub": "user123"}} + mock_claims = {"token_introspection": {"sub": "user123", "aud": "123"}} mock_user = Mock() resource_server_backend._introspect = Mock(return_value=mock_jwt) diff --git a/src/backend/core/tests/resource_server_api/__init__.py b/src/backend/core/tests/resource_server_api/__init__.py new file mode 100644 index 0000000..68dbc03 --- /dev/null +++ b/src/backend/core/tests/resource_server_api/__init__.py @@ -0,0 +1 @@ +"""Tests for the resource server API endpoints.""" diff --git a/src/backend/core/tests/resource_server_api/teams/__init__.py b/src/backend/core/tests/resource_server_api/teams/__init__.py new file mode 100644 index 0000000..03d9988 --- /dev/null +++ b/src/backend/core/tests/resource_server_api/teams/__init__.py @@ -0,0 +1 @@ +"""Tests for the resource server Team API endpoints.""" diff --git a/src/backend/core/tests/resource_server_api/teams/test_create.py b/src/backend/core/tests/resource_server_api/teams/test_create.py new file mode 100644 index 0000000..68ab4ca --- /dev/null +++ b/src/backend/core/tests/resource_server_api/teams/test_create.py @@ -0,0 +1,163 @@ +""" +Tests for Teams API endpoint in People's core app: create +""" + +import pytest +from rest_framework.status import ( + HTTP_201_CREATED, + HTTP_401_UNAUTHORIZED, +) +from rest_framework.test import APIClient + +from core.factories import OrganizationFactory, ServiceProviderFactory, UserFactory +from core.models import ServiceProvider, Team + +pytestmark = pytest.mark.django_db + + +def test_api_teams_create_anonymous(): + """Anonymous users should not be allowed to create teams.""" + response = APIClient().post( + "/resource-server/v1.0/teams/", + { + "name": "my team", + }, + ) + + assert response.status_code == HTTP_401_UNAUTHORIZED + assert not Team.objects.exists() + + +def test_api_teams_create_authenticated_new_service_provider( + client, force_login_via_resource_server +): + """ + Authenticated users should be able to create teams and should automatically be declared + as the owner of the newly created team and a new service provider should be created and + associated to the team. + """ + organization = OrganizationFactory(with_registration_id=True) + user = UserFactory(organization=organization) + assert ServiceProvider.objects.count() == 0 + + with force_login_via_resource_server(client, user, "some_service_provider"): + response = client.post( + "/resource-server/v1.0/teams/", + { + "name": "my team", + }, + format="json", + HTTP_AUTHORIZATION="Bearer b64untestedbearertoken", + ) + + assert response.status_code == HTTP_201_CREATED + + team = Team.objects.get() + team_access = team.accesses.get() + service_provider = ServiceProvider.objects.get() # service provider created + + assert response.json() == { + "created_at": team.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + "id": str(team.pk), + "name": "my team", + "updated_at": team.updated_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + } + + # check team data + assert team.name == "my team" + assert team.organization == organization + + # check team access data + assert team_access.role == "owner" + assert team_access.user == user + + # check service provider data + assert service_provider.audience_id == "some_service_provider" + + +def test_api_teams_create_authenticated_existing_service_provider( + client, + force_login_via_resource_server, +): + """ + Authenticated users should be able to create teams and should automatically be declared + as the owner of the newly created team and an existing service provider should be associated + to the team. + """ + user = UserFactory() + service_provider = ServiceProviderFactory() + with force_login_via_resource_server(client, user, service_provider.audience_id): + response = client.post( + "/resource-server/v1.0/teams/", + { + "name": "my team", + }, + format="json", + HTTP_AUTHORIZATION="Bearer b64untestedbearertoken", + ) + + assert response.status_code == HTTP_201_CREATED + + assert ServiceProvider.objects.count() == 1 # no object created + team = Team.objects.get() # team created + assert team.service_providers.get().audience_id == service_provider.audience_id + assert team.name == "my team" + assert team.accesses.filter(role="owner", user=user).exists() + + +def test_api_teams_create_cannot_override_organization( + client, force_login_via_resource_server +): + """ + Authenticated users should be able to create teams and not + be able to set the organization manually (for now). + """ + organization = OrganizationFactory(with_registration_id=True) + user = UserFactory(organization=organization) + service_provider = ServiceProviderFactory() + + with force_login_via_resource_server(client, user, service_provider.audience_id): + response = client.post( + "/resource-server/v1.0/teams/", + { + "name": "my team", + "organization": OrganizationFactory( + with_registration_id=True + ).pk, # ignored + }, + format="json", + ) + + assert response.status_code == HTTP_201_CREATED + team = Team.objects.get() + assert team.name == "my team" + assert team.organization == organization + assert team.accesses.filter(role="owner", user=user).exists() + + +def test_api_teams_create_cannot_override_service_provider( + client, force_login_via_resource_server +): + """ + Authenticated users should be able to create teams and not + be able to set the team service provider manually. + """ + user = UserFactory(with_organization=True) + service_provider = ServiceProviderFactory() + + other_service_provider = ServiceProviderFactory() + with force_login_via_resource_server(client, user, service_provider.audience_id): + response = client.post( + "/resource-server/v1.0/teams/", + { + "name": "my team", + "service_providers": [str(other_service_provider.pk)], # ignored + }, + format="json", + ) + + assert response.status_code == HTTP_201_CREATED + + team = Team.objects.get() + assert team.name == "my team" + assert team.service_providers.get().audience_id == service_provider.audience_id diff --git a/src/backend/core/tests/resource_server_api/teams/test_delete.py b/src/backend/core/tests/resource_server_api/teams/test_delete.py new file mode 100644 index 0000000..2185ad1 --- /dev/null +++ b/src/backend/core/tests/resource_server_api/teams/test_delete.py @@ -0,0 +1,49 @@ +""" +Tests for Teams API endpoint in People's core app: delete +""" + +import pytest +from rest_framework.status import ( + HTTP_401_UNAUTHORIZED, + HTTP_405_METHOD_NOT_ALLOWED, +) +from rest_framework.test import APIClient + +from core import factories, models + +pytestmark = pytest.mark.django_db + + +def test_api_teams_delete_anonymous(): + """Anonymous users should not be allowed to destroy a team.""" + team = factories.TeamFactory() + + response = APIClient().delete( + f"/resource-server/v1.0/teams/{team.id!s}/", + ) + + assert response.status_code == HTTP_401_UNAUTHORIZED + assert models.Team.objects.count() == 1 + + +def test_api_teams_delete_not_allowed(client, force_login_via_resource_server): + """ + Authenticated users should not be allowed to delete a team from a resource + server, even if they have the right permissions. + + This may be implemented in the future, but for now, it is not allowed. + """ + user = factories.UserFactory() + service_provider = factories.ServiceProviderFactory() + team = factories.TeamFactory( + users=[(user, "owner")], service_providers=[service_provider] + ) + + with force_login_via_resource_server(client, user, service_provider.audience_id): + response = client.delete( + f"/resource-server/v1.0/teams/{team.id!s}/", + ) + + assert response.status_code == HTTP_405_METHOD_NOT_ALLOWED + assert response.json() == {"detail": 'Method "DELETE" not allowed.'} + assert models.Team.objects.count() == 1 diff --git a/src/backend/core/tests/resource_server_api/teams/test_list.py b/src/backend/core/tests/resource_server_api/teams/test_list.py new file mode 100644 index 0000000..121b334 --- /dev/null +++ b/src/backend/core/tests/resource_server_api/teams/test_list.py @@ -0,0 +1,184 @@ +""" +Tests for Teams API endpoint in People's core app: list +""" + +import pytest +from rest_framework.status import HTTP_200_OK, HTTP_401_UNAUTHORIZED +from rest_framework.test import APIClient + +from core import factories + +pytestmark = pytest.mark.django_db + + +def test_api_teams_list_anonymous(): + """Anonymous users should not be allowed to list teams.""" + factories.TeamFactory.create_batch(2) + + response = APIClient().get("/resource-server/v1.0/teams/") + + assert response.status_code == HTTP_401_UNAUTHORIZED + assert response.json() == { + "detail": "Authentication credentials were not provided." + } + + +def test_api_teams_list_authenticated( # pylint: disable=too-many-locals + client, django_assert_num_queries, force_login_via_resource_server +): + """ + Authenticated users should be able to list teams + they are an owner/administrator/member of, and only list from the + requesting service provider should appear. + """ + user = factories.UserFactory() + service_provider = factories.ServiceProviderFactory() + hidden_service_provider = factories.ServiceProviderFactory() + + team_access_1 = factories.TeamAccessFactory( + user=user, team__service_providers=[service_provider], role="member" + ) + team_1 = team_access_1.team + + team_access_2 = factories.TeamAccessFactory( + user=user, + team__service_providers=[hidden_service_provider, service_provider], + role="member", + ) + team_2 = team_access_2.team + + team_access_3 = factories.TeamAccessFactory( + user=user, team__service_providers=[service_provider], role="administrator" + ) + team_3 = team_access_3.team + + team_access_4 = factories.TeamAccessFactory( + user=user, team__service_providers=[service_provider], role="owner" + ) + team_4 = team_access_4.team + + # Team filtered out because of the service provider + _not_included_team_access = factories.TeamAccessFactory( + user=user, team__service_providers=[hidden_service_provider] + ) + + # Authenticate using the resource server, ie via the Authorization header + with force_login_via_resource_server(client, user, service_provider.audience_id): + with django_assert_num_queries(4): # Count, Team, ServiceProvider, TeamAccess + response = client.get( + "/resource-server/v1.0/teams/?ordering=created_at", + format="json", + HTTP_AUTHORIZATION="Bearer b64untestedbearertoken", + ) + + assert response.status_code == HTTP_200_OK + assert response.json() == { + "count": 4, + "next": None, + "previous": None, + "results": [ + { + "created_at": team_1.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + "id": str(team_1.pk), + "name": team_1.name, + "updated_at": team_1.updated_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + }, + { + "created_at": team_2.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + "id": str(team_2.pk), + "name": team_2.name, + "updated_at": team_2.updated_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + }, + { + "created_at": team_3.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + "id": str(team_3.pk), + "name": team_3.name, + "updated_at": team_3.updated_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + }, + { + "created_at": team_4.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + "id": str(team_4.pk), + "name": team_4.name, + "updated_at": team_4.updated_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + }, + ], + } + + +def test_api_teams_list_authenticated_new_service_provider( + client, force_login_via_resource_server +): + """ + Team list from not yet known service provider should be empty (not fail). + + Teams without service providers should not be listed. + """ + user = factories.UserFactory() + _team = factories.TeamFactory(users=[user]) + + with force_login_via_resource_server(client, user, "some_service_provider"): + response = client.get( + "/resource-server/v1.0/teams/", + ) + + assert response.status_code == HTTP_200_OK + assert response.json() == { + "count": 0, + "next": None, + "previous": None, + "results": [], + } + + +def test_api_teams_list_authenticated_distinct(client, force_login_via_resource_server): + """A team with several related users should only be listed once.""" + user = factories.UserFactory() + service_provider = factories.ServiceProviderFactory() + + other_user = factories.UserFactory() + + team = factories.TeamFactory( + users=[user, other_user], service_providers=[service_provider] + ) + + with force_login_via_resource_server(client, user, service_provider.audience_id): + response = client.get( + "/resource-server/v1.0/teams/", + ) + + assert response.status_code == HTTP_200_OK + content = response.json() + assert content["count"] == 1 + + results = content["results"] + assert len(results) == 1 + assert results[0]["id"] == str(team.id) + + +def test_api_teams_order_param(client, force_login_via_resource_server): + """ + Test that the 'created_at' field is sorted in ascending order + when the 'ordering' query parameter is set. + """ + user = factories.UserFactory() + service_provider = factories.ServiceProviderFactory() + + team_ids = [ + str(team.id) + for team in factories.TeamFactory.create_batch( + 5, users=[user], service_providers=[service_provider] + ) + ] + + with force_login_via_resource_server(client, user, service_provider.audience_id): + response = client.get( + "/resource-server/v1.0/teams/?ordering=created_at", + ) + assert response.status_code == 200 + + response_data = response.json() + response_team_ids = [team["id"] for team in response_data["results"]] + + assert ( + response_team_ids == team_ids + ), "created_at values are not sorted from oldest to newest" diff --git a/src/backend/core/tests/resource_server_api/teams/test_retrieve.py b/src/backend/core/tests/resource_server_api/teams/test_retrieve.py new file mode 100644 index 0000000..0589360 --- /dev/null +++ b/src/backend/core/tests/resource_server_api/teams/test_retrieve.py @@ -0,0 +1,99 @@ +""" +Tests for Teams API endpoint in People's core app: retrieve +""" + +import pytest +from rest_framework import status +from rest_framework.status import HTTP_404_NOT_FOUND +from rest_framework.test import APIClient + +from core import factories +from core.factories import UserFactory + +pytestmark = pytest.mark.django_db + + +def test_api_teams_retrieve_anonymous(): + """Anonymous users should not be allowed to retrieve a team.""" + team = factories.TeamFactory() + response = APIClient().get(f"/resource-server/v1.0/teams/{team.id}/") + + assert response.status_code == status.HTTP_401_UNAUTHORIZED + assert response.json() == { + "detail": "Authentication credentials were not provided." + } + + +def test_api_teams_retrieve_authenticated_unrelated( + client, force_login_via_resource_server +): + """ + Authenticated users should not be allowed to retrieve a team to which they are + not related. + """ + user = factories.UserFactory() + service_provider = factories.ServiceProviderFactory() + + team = factories.TeamFactory(service_providers=[service_provider]) + + with force_login_via_resource_server(client, user, service_provider.audience_id): + response = client.get( + f"/resource-server/v1.0/teams/{team.id!s}/", + ) + assert response.status_code == status.HTTP_404_NOT_FOUND + assert response.json() == {"detail": "No Team matches the given query."} + + +def test_api_teams_retrieve_authenticated_related( + client, force_login_via_resource_server +): + """ + Authenticated users should be allowed to retrieve a team to which they + are related whatever the role even if the request is authenticated via + a resource server. + """ + service_provider = factories.ServiceProviderFactory( + audience_id="some_service_provider" + ) + user = factories.UserFactory() + team = factories.TeamFactory(users=[user], service_providers=[service_provider]) + + with force_login_via_resource_server(client, user, service_provider.audience_id): + response = client.get( + f"/resource-server/v1.0/teams/{team.id!s}/", + HTTP_AUTHORIZATION="Bearer b64untestedbearertoken", + ) + + assert response.status_code == status.HTTP_200_OK + assert response.json() == { + "id": str(team.id), + "name": team.name, + "created_at": team.created_at.isoformat().replace("+00:00", "Z"), + "updated_at": team.updated_at.isoformat().replace("+00:00", "Z"), + } + + +def test_api_teams_retrieve_authenticated_other_service_provider( + client, force_login_via_resource_server +): + """ + Authenticated users should not be able to retrieve a team + if the request is authenticated via a different resource server. + """ + user = UserFactory() + service_provider = factories.ServiceProviderFactory() + + other_service_provider = factories.ServiceProviderFactory( + audience_id="some_service_provider" + ) + team = factories.TeamFactory( + users=[user], service_providers=[other_service_provider] + ) + + with force_login_via_resource_server(client, user, service_provider.audience_id): + response = client.get( + f"/resource-server/v1.0/teams/{team.id!s}/", + HTTP_AUTHORIZATION="Bearer b64untestedbearertoken", + ) + + assert response.status_code == HTTP_404_NOT_FOUND diff --git a/src/backend/core/tests/resource_server_api/teams/test_update.py b/src/backend/core/tests/resource_server_api/teams/test_update.py new file mode 100644 index 0000000..94e1993 --- /dev/null +++ b/src/backend/core/tests/resource_server_api/teams/test_update.py @@ -0,0 +1,246 @@ +""" +Tests for Teams API endpoint in People's core app: update +""" + +import pytest +from rest_framework.status import ( + HTTP_200_OK, + HTTP_401_UNAUTHORIZED, + HTTP_403_FORBIDDEN, + HTTP_404_NOT_FOUND, +) +from rest_framework.test import APIClient + +from core import factories +from core.resource_server_api import serializers + +pytestmark = pytest.mark.django_db + + +def test_api_teams_update_anonymous(): + """Anonymous users should not be allowed to update a team.""" + team = factories.TeamFactory() + old_team_values = serializers.TeamSerializer(instance=team).data + + new_team_values = serializers.TeamSerializer(instance=factories.TeamFactory()).data + response = APIClient().put( + f"/resource-server/v1.0/teams/{team.id!s}/", + new_team_values, + content_type="application/json", + ) + assert response.status_code == HTTP_401_UNAUTHORIZED + assert response.json() == { + "detail": "Authentication credentials were not provided." + } + + team.refresh_from_db() + team_values = serializers.TeamSerializer(instance=team).data + assert team_values == old_team_values + + +def test_api_teams_update_authenticated_unrelated( + client, force_login_via_resource_server +): + """ + Authenticated users should not be allowed to update a team to which they are not related. + """ + user = factories.UserFactory() + service_provider = factories.ServiceProviderFactory() + team = factories.TeamFactory(service_providers=[service_provider]) + + old_team_values = serializers.TeamSerializer(instance=team).data + + new_team_values = serializers.TeamSerializer(instance=factories.TeamFactory()).data + + with force_login_via_resource_server(client, user, service_provider.audience_id): + response = client.put( + f"/resource-server/v1.0/teams/{team.id!s}/", + new_team_values, + content_type="application/json", + HTTP_AUTHORIZATION="Bearer b64untestedbearertoken", + ) + + assert response.status_code == HTTP_404_NOT_FOUND + assert response.json() == {"detail": "No Team matches the given query."} + + team.refresh_from_db() + team_values = serializers.TeamSerializer(instance=team).data + assert team_values == old_team_values + + +def test_api_teams_update_authenticated( + client, + force_login_via_resource_server, +): + """ + Authenticated users should be allowed to update a team to which they + are related whatever the role even if the request is authenticated via + a resource server. + """ + service_provider = factories.ServiceProviderFactory( + audience_id="some_service_provider" + ) + user = factories.UserFactory() + team = factories.TeamFactory( + name="Old name", + users=[(user, "owner")], + service_providers=[service_provider], + ) + + with force_login_via_resource_server(client, user, service_provider.audience_id): + response = client.put( + f"/resource-server/v1.0/teams/{team.id!s}/", + data=serializers.TeamSerializer(instance=team).data + | { + "name": "New name", + }, + content_type="application/json", + HTTP_AUTHORIZATION="Bearer b64untestedbearertoken", + ) + + assert response.status_code == HTTP_200_OK + + team.refresh_from_db() + assert team.name == "New name" + + +def test_api_teams_update_authenticated_other_resource_server( + client, force_login_via_resource_server +): + """ + Authenticated users should not be able to update a team for which they are directly + owner, if the request is authenticated via a different service provider. + """ + user = factories.UserFactory() + service_provider = factories.ServiceProviderFactory() + + other_service_provider = factories.ServiceProviderFactory( + audience_id="some_service_provider" + ) + team = factories.TeamFactory( + name="Old name", + users=[(user, "owner")], + service_providers=[other_service_provider], + ) + + with force_login_via_resource_server(client, user, service_provider.audience_id): + response = client.put( + f"/resource-server/v1.0/teams/{team.id!s}/", + data=serializers.TeamSerializer(instance=team).data + | { + "name": "New name", + }, + content_type="application/json", + HTTP_AUTHORIZATION="Bearer b64untestedbearertoken", + ) + + assert response.status_code == HTTP_404_NOT_FOUND + assert response.json() == {"detail": "No Team matches the given query."} + + team.refresh_from_db() + assert team.name == "Old name" + + +def test_api_teams_update_authenticated_members( + client, force_login_via_resource_server +): + """ + Users who are members of a team but not administrators should + not be allowed to update it. + """ + user = factories.UserFactory() + service_provider = factories.ServiceProviderFactory() + + team = factories.TeamFactory( + users=[(user, "member")], service_providers=[service_provider] + ) + old_team_values = serializers.TeamSerializer(instance=team).data + + new_team_values = serializers.TeamSerializer(instance=factories.TeamFactory()).data + + with force_login_via_resource_server(client, user, service_provider.audience_id): + response = client.put( + f"/resource-server/v1.0/teams/{team.id!s}/", + new_team_values, + content_type="application/json", + HTTP_AUTHORIZATION="Bearer b64untestedbearertoken", + ) + + assert response.status_code == HTTP_403_FORBIDDEN + assert response.json() == { + "detail": "You do not have permission to perform this action." + } + + team.refresh_from_db() + team_values = serializers.TeamSerializer(instance=team).data + assert team_values == old_team_values + + +@pytest.mark.parametrize("role", ["owner", "administrator"]) +def test_api_teams_update_authenticated_administrators( + client, force_login_via_resource_server, role +): + """Administrators or owners of a team should be allowed to update it.""" + user = factories.UserFactory() + service_provider = factories.ServiceProviderFactory() + + team = factories.TeamFactory( + users=[(user, role)], + service_providers=[service_provider], + name="old name", + ) + initial_created_at = team.created_at + initial_updated_at = team.updated_at + initial_pk = team.pk + + # generate new random values + new_values = serializers.TeamSerializer(instance=factories.TeamFactory.build()).data + + with force_login_via_resource_server(client, user, service_provider.audience_id): + response = client.put( + f"/resource-server/v1.0/teams/{team.id!s}/", + new_values, + content_type="application/json", + HTTP_AUTHORIZATION="Bearer b64untestedbearertoken", + ) + assert response.status_code == HTTP_200_OK + + team.refresh_from_db() + assert team.pk == initial_pk + assert team.name == new_values["name"] + assert team.created_at == initial_created_at + assert team.updated_at > initial_updated_at + + +@pytest.mark.parametrize("role", ["owner", "administrator"]) +def test_api_teams_update_administrator_or_owner_of_another( + client, force_login_via_resource_server, role +): + """ + Being administrator or owner of a team should not grant authorization to update + another team. + """ + user = factories.UserFactory() + service_provider = factories.ServiceProviderFactory() + + factories.TeamFactory(users=[(user, role)], service_providers=[service_provider]) + + team = factories.TeamFactory(name="Old name", service_providers=[service_provider]) + old_team_values = serializers.TeamSerializer(instance=team).data + + new_team_values = serializers.TeamSerializer(instance=factories.TeamFactory()).data + + with force_login_via_resource_server(client, user, service_provider.audience_id): + response = client.put( + f"/resource-server/v1.0/teams/{team.id!s}/", + new_team_values, + content_type="application/json", + HTTP_AUTHORIZATION="Bearer b64untestedbearertoken", + ) + + assert response.status_code == HTTP_404_NOT_FOUND + assert response.json() == {"detail": "No Team matches the given query."} + + team.refresh_from_db() + team_values = serializers.TeamSerializer(instance=team).data + assert team_values == old_team_values diff --git a/src/backend/core/tests/team_accesses/__init__.py b/src/backend/core/tests/team_accesses/__init__.py new file mode 100644 index 0000000..bff1c18 --- /dev/null +++ b/src/backend/core/tests/team_accesses/__init__.py @@ -0,0 +1 @@ +"""Team accesses tests package.""" diff --git a/src/backend/core/tests/teams/__init__.py b/src/backend/core/tests/teams/__init__.py new file mode 100644 index 0000000..5630f31 --- /dev/null +++ b/src/backend/core/tests/teams/__init__.py @@ -0,0 +1 @@ +"""Teams tests package.""" diff --git a/src/backend/core/tests/teams/test_core_api_teams_list.py b/src/backend/core/tests/teams/test_core_api_teams_list.py index 420dd59..24ef55a 100644 --- a/src/backend/core/tests/teams/test_core_api_teams_list.py +++ b/src/backend/core/tests/teams/test_core_api_teams_list.py @@ -2,10 +2,7 @@ Tests for Teams API endpoint in People's core app: list """ -from unittest import mock - import pytest -from rest_framework.pagination import PageNumberPagination from rest_framework.status import HTTP_200_OK, HTTP_401_UNAUTHORIZED from rest_framework.test import APIClient @@ -123,7 +120,6 @@ def test_api_teams_order_param(): assert response.status_code == 200 response_data = response.json() - response_team_ids = [team["id"] for team in response_data] assert ( diff --git a/src/backend/core/tests/teams/test_core_api_teams_retrieve.py b/src/backend/core/tests/teams/test_core_api_teams_retrieve.py index 2edd1fa..607dec7 100644 --- a/src/backend/core/tests/teams/test_core_api_teams_retrieve.py +++ b/src/backend/core/tests/teams/test_core_api_teams_retrieve.py @@ -72,4 +72,5 @@ def test_api_teams_retrieve_authenticated_related(): "abilities": team.get_abilities(user), "created_at": team.created_at.isoformat().replace("+00:00", "Z"), "updated_at": team.updated_at.isoformat().replace("+00:00", "Z"), + "service_providers": [], } diff --git a/src/backend/core/tests/teams/test_core_api_teams_update.py b/src/backend/core/tests/teams/test_core_api_teams_update.py index ace2b53..11d6e8b 100644 --- a/src/backend/core/tests/teams/test_core_api_teams_update.py +++ b/src/backend/core/tests/teams/test_core_api_teams_update.py @@ -188,3 +188,34 @@ def test_api_teams_update_administrator_or_owner_of_another(): team.refresh_from_db() team_values = serializers.TeamSerializer(instance=team).data assert team_values == old_team_values + + +def test_api_teams_update_authenticated_owners_add_service_providers(): + """ + Owners of a team should be allowed to update its service providers. + """ + user = factories.UserFactory() + + client = APIClient() + client.force_login(user) + + team = factories.TeamFactory(users=[(user, "owner")]) + new_team_values = serializers.TeamSerializer(instance=team).data + + service_provider_1 = factories.ServiceProviderFactory() + service_provider_2 = factories.ServiceProviderFactory() + new_team_values["service_providers"] = [ + service_provider_1.pk, + service_provider_2.pk, + ] + + response = client.put( + f"/api/v1.0/teams/{team.id!s}/", + new_team_values, + format="json", + ) + assert response.status_code == HTTP_200_OK + + team.refresh_from_db() + assert team.service_providers.count() == 2 + assert set(team.service_providers.all()) == {service_provider_1, service_provider_2}