(backend) add ServiceProvider

This adds the ServiceProvider notion to allow to better
manage which teams is available for each service provider.
This commit is contained in:
Quentin BEY
2024-11-04 11:32:41 +01:00
committed by BEY Quentin
parent 512d9fe82c
commit a041296f8a
27 changed files with 1392 additions and 10 deletions

View File

@@ -25,6 +25,7 @@ and this project adheres to
- ✨(domains) allow creation of "pending" mailboxes
- ✨(teams) allow team management for team admins/owners #509
- ✨(backend) add ServiceProvider #522
### Fixed

View File

@@ -108,11 +108,20 @@ class UserAdmin(auth_admin.UserAdmin):
get_user.short_description = _("User")
class TeamServiceProviderInline(admin.TabularInline):
"""Inline admin class for service providers."""
can_delete = False
model = models.Team.service_providers.through
extra = 0
@admin.register(models.Team)
class TeamAdmin(admin.ModelAdmin):
"""Team admin interface declaration."""
inlines = (TeamAccessInline, TeamWebhookInline)
inlines = (TeamAccessInline, TeamWebhookInline, TeamServiceProviderInline)
exclude = ("service_providers",) # Handled by the inline
list_display = (
"name",
"created_at",
@@ -188,6 +197,14 @@ class ContactAdmin(admin.ModelAdmin):
)
class OrganizationServiceProviderInline(admin.TabularInline):
"""Inline admin class for service providers."""
can_delete = False
model = models.Organization.service_providers.through
extra = 0
@admin.register(models.Organization)
class OrganizationAdmin(admin.ModelAdmin):
"""Admin interface for organizations."""
@@ -198,7 +215,8 @@ class OrganizationAdmin(admin.ModelAdmin):
"updated_at",
)
search_fields = ("name",)
inlines = (OrganizationAccessInline,)
inlines = (OrganizationAccessInline, OrganizationServiceProviderInline)
exclude = ("service_providers",) # Handled by the inline
@admin.register(models.OrganizationAccess)
@@ -213,3 +231,17 @@ class OrganizationAccessAdmin(admin.ModelAdmin):
"created_at",
"updated_at",
)
@admin.register(models.ServiceProvider)
class ServiceProviderAdmin(admin.ModelAdmin):
"""Admin interface for service providers."""
list_display = (
"name",
"audience_id",
"created_at",
"updated_at",
)
search_fields = ("name", "audience_id")
readonly_fields = ("created_at", "updated_at")

View File

@@ -4,6 +4,7 @@ from rest_framework import exceptions, serializers
from timezone_field.rest_framework import TimeZoneSerializerField
from core import models
from core.models import ServiceProvider
class ContactSerializer(serializers.ModelSerializer):
@@ -205,6 +206,9 @@ class TeamSerializer(serializers.ModelSerializer):
"""Serialize teams."""
abilities = serializers.SerializerMethodField(read_only=True)
service_providers = serializers.PrimaryKeyRelatedField(
queryset=ServiceProvider.objects.all(), many=True, required=False
)
class Meta:
model = models.Team
@@ -215,6 +219,7 @@ class TeamSerializer(serializers.ModelSerializer):
"abilities",
"created_at",
"updated_at",
"service_providers",
]
read_only_fields = [
"id",
@@ -226,6 +231,13 @@ class TeamSerializer(serializers.ModelSerializer):
def create(self, validated_data):
"""Create a new team with organization enforcement."""
# When called as a resource server, we enforce the team service provider
if sp_audience := self.context.get("from_service_provider_audience", None):
service_provider, _created = models.ServiceProvider.objects.get_or_create(
audience_id=sp_audience
)
validated_data["service_providers"] = [service_provider]
# Note: this is not the purpose of this API to check the user has an organization
return super().create(
validated_data=validated_data

View File

@@ -18,6 +18,7 @@ from rest_framework.permissions import AllowAny
from core import models
from ..resource_server.authentication import ResourceServerAuthentication
from . import permissions, serializers
SIMILARITY_THRESHOLD = 0.04
@@ -247,15 +248,52 @@ class TeamViewSet(
queryset = models.Team.objects.all()
pagination_class = None
def _get_service_provider_audience(self):
"""Return the audience of the Service Provider from the OIDC introspected token."""
if not isinstance(
self.request.successful_authenticator, ResourceServerAuthentication
):
# We could check request.resource_server_token_audience here, but it's
# more explicit to check the authenticator type and assert the attribute
# existence.
return None
# When used as a resource server, the request has a token audience
service_provider_audience = self.request.resource_server_token_audience
if not service_provider_audience: # should not happen
raise exceptions.AuthenticationFailed(
"Resource server token audience not found in request"
)
return service_provider_audience
def get_queryset(self):
"""Custom queryset to get user related teams."""
user_role_query = models.TeamAccess.objects.filter(
user=self.request.user, team=OuterRef("pk")
).values("role")[:1]
return models.Team.objects.filter(accesses__user=self.request.user).annotate(
user_role=Subquery(user_role_query)
return (
models.Team.objects.prefetch_related("accesses", "service_providers")
.filter(
accesses__user=self.request.user,
)
.annotate(user_role=Subquery(user_role_query))
)
def get_serializer_context(self):
"""Extra context provided to the serializer class."""
context = super().get_serializer_context()
# When used as a resource server, we need to know the audience to automatically:
# - add the Service Provider to the team "scope" on creation
context["from_service_provider_audience"] = (
self._get_service_provider_audience()
)
return context
def perform_create(self, serializer):
"""Set the current user as owner of the newly created team."""
team = serializer.save()

View File

@@ -180,6 +180,13 @@ class TeamFactory(factory.django.DjangoModelFactory):
else:
TeamAccessFactory(team=self, user=user_entry[0], role=user_entry[1])
@factory.post_generation
def service_providers(self, create, extracted, **kwargs):
"""Add service providers to team from a given list of service providers."""
if not create or not extracted:
return
self.service_providers.set(extracted)
class TeamAccessFactory(factory.django.DjangoModelFactory):
"""Create fake team user accesses for testing."""
@@ -212,3 +219,12 @@ class InvitationFactory(factory.django.DjangoModelFactory):
email = factory.Faker("email")
role = factory.fuzzy.FuzzyChoice([role[0] for role in models.RoleChoices.choices])
issuer = factory.SubFactory(UserFactory)
class ServiceProviderFactory(factory.django.DjangoModelFactory):
"""A factory to create service providers for testing purposes."""
class Meta:
model = models.ServiceProvider
audience_id = factory.Faker("uuid4")

View File

@@ -0,0 +1,39 @@
# Generated by Django 5.1.2 on 2024-11-07 16:24
import uuid
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('core', '0004_remove_team_slug'),
]
operations = [
migrations.CreateModel(
name='ServiceProvider',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')),
('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created at')),
('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated at')),
('name', models.CharField(max_length=256, unique=True, verbose_name='name')),
('audience_id', models.CharField(db_index=True, max_length=256, unique=True, verbose_name='audience id')),
],
options={
'verbose_name': 'service provider',
'verbose_name_plural': 'service providers',
'db_table': 'people_service_provider',
},
),
migrations.AddField(
model_name='organization',
name='service_providers',
field=models.ManyToManyField(blank=True, related_name='organizations', to='core.serviceprovider'),
),
migrations.AddField(
model_name='team',
name='service_providers',
field=models.ManyToManyField(blank=True, related_name='teams', to='core.serviceprovider'),
),
]

View File

@@ -173,6 +173,34 @@ class Contact(BaseModel):
raise exceptions.ValidationError({"data": [error_message]}) from e
class ServiceProvider(BaseModel):
"""
Represents a service provider that will consume our information.
Organization uses this model to define the list of SP available to their users.
Team uses this model to define their visibility to the various SP.
"""
name = models.CharField(_("name"), max_length=256, unique=True)
audience_id = models.CharField(
_("audience id"), max_length=256, unique=True, db_index=True
)
class Meta:
db_table = "people_service_provider"
verbose_name = _("service provider")
verbose_name_plural = _("service providers")
def __str__(self):
return self.name
def save(self, *args, **kwargs):
"""Enforce name (even if ugly) from the `audience_id` field."""
if not self.name:
self.name = self.audience_id # ok, same length
return super().save(*args, **kwargs)
class OrganizationManager(models.Manager):
"""
Custom manager for the Organization model, to manage complexity/automation.
@@ -282,6 +310,12 @@ class Organization(BaseModel):
validators=[validate_unique_domain],
)
service_providers = models.ManyToManyField(
ServiceProvider,
related_name="organizations",
blank=True,
)
objects = OrganizationManager()
class Meta:
@@ -539,6 +573,11 @@ class OrganizationAccess(BaseModel):
class Team(BaseModel):
"""
Represents the link between teams and users, specifying the role a user has in a team.
When a team is created from here, the user have to choose which Service Providers
can see it.
When a team is created from a Service Provider this one is automatically set in the
Team `service_providers`.
"""
name = models.CharField(max_length=100)
@@ -556,6 +595,11 @@ class Team(BaseModel):
null=True, # Need to be set to False when everything is migrated
blank=True, # Need to be set to False when everything is migrated
)
service_providers = models.ManyToManyField(
ServiceProvider,
related_name="teams",
blank=True,
)
class Meta:
db_table = "people_team"

View File

@@ -53,3 +53,20 @@ class ResourceServerAuthentication(OIDCAuthentication):
pass
return access_token
def authenticate(self, request):
"""
Authenticate the request and return a tuple of (user, token) or None.
We override the 'authenticate' method from the parent class to store
the introspected token audience inside the request.
"""
result = super().authenticate(request) # Might raise AuthenticationFailed
if result is None: # Case when there is no access token
return None
# Note: at this stage, the request is a "drf_request" object
request.resource_server_token_audience = self.backend.token_origin_audience
return result

View File

@@ -61,6 +61,10 @@ class ResourceServerBackend:
token_introspection={"essential": True},
)
# Declare the token origin audience: to know where the token comes from
# and store it for further use in the application
self.token_origin_audience = None
# pylint: disable=unused-argument
def get_or_create_user(self, access_token, id_token, payload):
"""Maintain API compatibility with OIDCAuthentication class from mozilla-django-oidc
@@ -85,6 +89,8 @@ class ResourceServerBackend:
that extends RFC 7662 by returning a signed and encrypted JWT for stronger assurance that
the authorization server issued the token introspection response.
"""
self.token_origin_audience = None # Reset the token origin audience
jwt = self._introspect(access_token)
claims = self._verify_claims(jwt)
user_info = self._verify_user_info(claims["token_introspection"])
@@ -100,6 +106,8 @@ class ResourceServerBackend:
logger.debug("Login failed: No user with %s found", sub)
return None
self.token_origin_audience = str(user_info["aud"])
return user
def _verify_user_info(self, introspection_response):
@@ -127,6 +135,12 @@ class ResourceServerBackend:
logger.debug(message)
raise SuspiciousOperation(message)
audience = introspection_response.get("aud", None)
if not audience:
raise SuspiciousOperation(
"Introspection response does not provide source audience."
)
return introspection_response
def _introspect(self, token):
@@ -219,6 +233,8 @@ class ResourceServerBackend:
class ResourceServerImproperlyConfiguredBackend:
"""Fallback backend for improperly configured Resource Servers."""
token_origin_audience = None
def get_or_create_user(self, access_token, id_token, payload):
"""Indicate that the Resource Server is improperly configured."""
raise AuthenticationFailed("Resource Server is improperly configured")

View File

@@ -0,0 +1 @@
"""People core resource server API endpoints"""

View File

@@ -0,0 +1,46 @@
"""Client serializers for the People core app resource server API."""
from rest_framework import serializers
from core import models
class TeamSerializer(serializers.ModelSerializer):
"""Serialize teams."""
class Meta:
model = models.Team
fields = [
"id",
"name",
"created_at",
"updated_at",
]
read_only_fields = [
"id",
"created_at",
"updated_at",
]
def create(self, validated_data):
"""
Create a new team with organization enforcement.
In this context, called as a resource server,
the team service provider is enforced.
When the service provider audience is unknown it is created on the fly.
"""
sp_audience = self.context["from_service_provider_audience"]
service_provider, _created = models.ServiceProvider.objects.get_or_create(
audience_id=sp_audience
)
# Note: this is not the purpose of this API to check the user has an organization
return super().create(
validated_data=validated_data
| {
"organization_id": self.context["request"].user.organization_id,
"service_providers": [service_provider],
},
)

View File

@@ -0,0 +1,90 @@
"""Resource server API endpoints"""
from django.db.models import OuterRef, Prefetch, Subquery
from rest_framework import (
filters,
mixins,
viewsets,
)
from core import models
from core.api import permissions
from core.resource_server.mixins import ResourceServerMixin
from ..api.viewsets import Pagination
from . import serializers
class TeamViewSet( # pylint: disable=too-many-ancestors
ResourceServerMixin,
mixins.CreateModelMixin,
mixins.ListModelMixin,
mixins.RetrieveModelMixin,
mixins.UpdateModelMixin,
viewsets.GenericViewSet,
):
"""
Team ViewSet dedicated to the resource server.
The DELETE method is not allowed for now, because the use case is
not clear yet and it comes with complexity to know if we can delete
a team or not (eg. if a team has other SP, it might not be deleted
but what do we do then, only remove the current SP?).
GET /resource-server/v1.0/teams/
Return list of Teams of the user and available for the audience.
POST /resource-server/v1.0/teams/
Create a new Team for the user for the audience.
GET /resource-server/v1.0/teams/{team_id}/
Return the Team details if available for the audience.
PUT /resource-server/v1.0/teams/{team_id}/
Update the Team details (only name for now).
"""
permission_classes = [permissions.AccessPermission]
serializer_class = serializers.TeamSerializer
filter_backends = [filters.OrderingFilter]
ordering_fields = ["created_at"]
ordering = ["-created_at"]
queryset = models.Team.objects.all()
pagination_class = Pagination
def get_queryset(self):
"""Custom queryset to get user related teams."""
user_role_query = models.TeamAccess.objects.filter(
user=self.request.user, team=OuterRef("pk")
).values("role")[:1]
service_provider_audience = self._get_service_provider_audience()
service_provider_prefetch = Prefetch(
"service_providers",
queryset=models.ServiceProvider.objects.filter(
audience_id=service_provider_audience
),
)
return (
models.Team.objects.prefetch_related(
"accesses",
service_provider_prefetch,
)
.filter(
accesses__user=self.request.user,
service_providers__audience_id=service_provider_audience,
)
.annotate(user_role=Subquery(user_role_query))
)
def perform_create(self, serializer):
"""Set the current user as owner of the newly created team."""
team = serializer.save()
models.TeamAccess.objects.create(
team=team,
user=self.request.user,
role=models.RoleChoices.OWNER,
)

View File

@@ -0,0 +1,79 @@
"""Defines fixtures for the resource server API tests."""
from contextlib import contextmanager
from typing import Optional
from unittest import mock
from django.contrib.auth import get_user_model
import pytest
import responses
from faker import Faker
from core.resource_server.authentication import ResourceServerAuthentication
User = get_user_model()
fake = Faker()
@contextmanager
def _force_login_via_resource_server(
client_fixture,
user: User,
service_provider_audience: Optional[str],
):
"""
Context manager to authenticate a user with a service provider via
a resource server call.
This allows to authenticate a user with a service provider without doing
all the introspection process.
This is a private function, use the `force_login_via_resource_server`
fixture instead.
The `service_provider_audience` parameter might not match any existing
service provider audience, doing so allow to check the behavior when
the service provider is not yet known.
"""
def mock_authenticate(self, request): # pylint: disable=unused-argument
request.resource_server_token_audience = (
service_provider_audience or fake.pystr(min_chars=10, max_chars=10)
)
return user, "unused-token"
with mock.patch.object(
ResourceServerAuthentication, "authenticate", mock_authenticate
):
client_fixture.force_login(
user,
backend="core.resource_server.authentication.ResourceServerAuthentication",
)
yield
@pytest.fixture(name="force_login_via_resource_server")
@responses.activate
def force_login_via_resource_server_fixture():
"""
Fixture to authenticate a user with a service provider via a resource server call.
Usage:
```
def test_login_with_resource_server(
client, force_login_via_resource_server,
):
user = UserFactory()
service_provider = ServiceProviderFactory()
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.get(
"/resource-server/v1.0/<whatever>/",
format="json",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)
# response is authenticated
```
"""
return _force_login_via_resource_server

View File

@@ -0,0 +1,178 @@
"""Tests for the authentication process of the resource server."""
import base64
import json
import pytest
import responses
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from joserfc import jwe as jose_jwe
from joserfc import jwt as jose_jwt
from joserfc.rfc7518.rsa_key import RSAKey
from jwt.utils import to_base64url_uint
from rest_framework.request import Request as DRFRequest
from rest_framework.status import HTTP_200_OK, HTTP_401_UNAUTHORIZED
from core.factories import UserFactory
from core.models import ServiceProvider
from core.resource_server.authentication import ResourceServerAuthentication
pytestmark = pytest.mark.django_db
def build_authorization_bearer(token):
"""
Build an Authorization Bearer header value from a token.
This can be used like this:
client.post(
...
HTTP_AUTHORIZATION=f"Bearer {build_authorization_bearer('some_token')}",
)
"""
return base64.b64encode(token.encode("utf-8")).decode("utf-8")
@responses.activate
def test_resource_server_authentication_class(client, settings):
"""
Defines the settings for the resource server
for a full authentication with introspection process.
This is an integration test that checks the authentication process
when using the ResourceServerAuthentication class.
This test asserts the DRF request object contains the
`resource_server_token_audience` attribute which is used in
the resource server views.
This test uses the `/resource-server/v1.0/teams/` URL as an example
because we don't want to create a new URL just for this test.
"""
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
unencrypted_pem_private_key = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
pem_public_key = private_key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
settings.OIDC_RS_PRIVATE_KEY_STR = unencrypted_pem_private_key.decode("utf-8")
settings.OIDC_RS_ENCRYPTION_KEY_TYPE = "RSA"
settings.OIDC_RS_ENCRYPTION_ENCODING = "A256GCM"
settings.OIDC_RS_ENCRYPTION_ALGO = "RSA-OAEP"
settings.OIDC_RS_SIGNING_ALGO = "RS256"
settings.OIDC_RS_CLIENT_ID = "some_client_id"
settings.OIDC_RS_CLIENT_SECRET = "some_client_secret"
settings.OIDC_OP_URL = "https://oidc.example.com"
settings.OIDC_VERIFY_SSL = False
settings.OIDC_TIMEOUT = 5
settings.OIDC_PROXY = None
settings.OIDC_OP_JWKS_ENDPOINT = "https://oidc.example.com/jwks"
settings.OIDC_OP_INTROSPECTION_ENDPOINT = "https://oidc.example.com/introspect"
# Mock the JWKS endpoint
public_numbers = private_key.public_key().public_numbers()
responses.add(
responses.GET,
settings.OIDC_OP_JWKS_ENDPOINT,
body=json.dumps(
{
"keys": [
{
"kty": settings.OIDC_RS_ENCRYPTION_KEY_TYPE,
"alg": settings.OIDC_RS_SIGNING_ALGO,
"use": "sig",
"kid": "1234567890",
"n": to_base64url_uint(public_numbers.n).decode("ascii"),
"e": to_base64url_uint(public_numbers.e).decode("ascii"),
}
]
}
),
)
def encrypt_jwt(json_data):
"""Encrypt the JWT token for the backend to decrypt."""
token = jose_jwt.encode(
{
"kid": "1234567890",
"alg": settings.OIDC_RS_SIGNING_ALGO,
},
json_data,
RSAKey.import_key(unencrypted_pem_private_key),
algorithms=[settings.OIDC_RS_SIGNING_ALGO],
)
return jose_jwe.encrypt_compact(
protected={
"alg": settings.OIDC_RS_ENCRYPTION_ALGO,
"enc": settings.OIDC_RS_ENCRYPTION_ENCODING,
},
plaintext=token,
public_key=RSAKey.import_key(pem_public_key),
algorithms=[
settings.OIDC_RS_ENCRYPTION_ALGO,
settings.OIDC_RS_ENCRYPTION_ENCODING,
],
)
responses.add(
responses.POST,
"https://oidc.example.com/introspect",
body=encrypt_jwt(
{
"iss": "https://oidc.example.com",
"aud": "some_client_id", # settings.OIDC_RS_CLIENT_ID
"token_introspection": {
"sub": "very-specific-sub",
"iss": "https://oidc.example.com",
"aud": "some_service_provider",
"scope": "openid groups",
"active": True,
},
}
),
)
# Try to authenticate while the user does not exist => 401
response = client.get(
"/resource-server/v1.0/teams/", # use an exising URL here
format="json",
HTTP_AUTHORIZATION=f"Bearer {build_authorization_bearer('some_token')}",
)
assert response.status_code == HTTP_401_UNAUTHORIZED
assert ServiceProvider.objects.count() == 0
# Create a user with the specific sub, the access is authorized
UserFactory(sub="very-specific-sub")
response = client.get(
"/resource-server/v1.0/teams/", # use an exising URL here
format="json",
HTTP_AUTHORIZATION=f"Bearer {build_authorization_bearer('some_token')}",
)
assert response.status_code == HTTP_200_OK
response_request = response.renderer_context.get("request")
assert isinstance(response_request, DRFRequest)
assert isinstance(
response_request.successful_authenticator, ResourceServerAuthentication
)
# Check that the user is authenticated
assert response_request.user.is_authenticated
# Check the request contains the resource server token audience
assert response_request.resource_server_token_audience == "some_service_provider"
# Check that no service provider is created here
assert ServiceProvider.objects.count() == 0

View File

@@ -296,7 +296,7 @@ def test_introspect_public_key_import_failure(
def test_verify_user_info_success(resource_server_backend):
"""Test '_verify_user_info' with a successful response."""
introspection_response = {"active": True, "scope": "groups"}
introspection_response = {"active": True, "scope": "groups", "aud": "123"}
result = resource_server_backend._verify_user_info(introspection_response)
assert result == introspection_response
@@ -333,7 +333,7 @@ def test_get_user_success(resource_server_backend):
access_token = "valid_access_token"
mock_jwt = Mock()
mock_claims = {"token_introspection": {"sub": "user123"}}
mock_claims = {"token_introspection": {"sub": "user123", "aud": "123"}}
mock_user = Mock()
resource_server_backend._introspect = Mock(return_value=mock_jwt)

View File

@@ -0,0 +1 @@
"""Tests for the resource server API endpoints."""

View File

@@ -0,0 +1 @@
"""Tests for the resource server Team API endpoints."""

View File

@@ -0,0 +1,163 @@
"""
Tests for Teams API endpoint in People's core app: create
"""
import pytest
from rest_framework.status import (
HTTP_201_CREATED,
HTTP_401_UNAUTHORIZED,
)
from rest_framework.test import APIClient
from core.factories import OrganizationFactory, ServiceProviderFactory, UserFactory
from core.models import ServiceProvider, Team
pytestmark = pytest.mark.django_db
def test_api_teams_create_anonymous():
"""Anonymous users should not be allowed to create teams."""
response = APIClient().post(
"/resource-server/v1.0/teams/",
{
"name": "my team",
},
)
assert response.status_code == HTTP_401_UNAUTHORIZED
assert not Team.objects.exists()
def test_api_teams_create_authenticated_new_service_provider(
client, force_login_via_resource_server
):
"""
Authenticated users should be able to create teams and should automatically be declared
as the owner of the newly created team and a new service provider should be created and
associated to the team.
"""
organization = OrganizationFactory(with_registration_id=True)
user = UserFactory(organization=organization)
assert ServiceProvider.objects.count() == 0
with force_login_via_resource_server(client, user, "some_service_provider"):
response = client.post(
"/resource-server/v1.0/teams/",
{
"name": "my team",
},
format="json",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)
assert response.status_code == HTTP_201_CREATED
team = Team.objects.get()
team_access = team.accesses.get()
service_provider = ServiceProvider.objects.get() # service provider created
assert response.json() == {
"created_at": team.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
"id": str(team.pk),
"name": "my team",
"updated_at": team.updated_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
}
# check team data
assert team.name == "my team"
assert team.organization == organization
# check team access data
assert team_access.role == "owner"
assert team_access.user == user
# check service provider data
assert service_provider.audience_id == "some_service_provider"
def test_api_teams_create_authenticated_existing_service_provider(
client,
force_login_via_resource_server,
):
"""
Authenticated users should be able to create teams and should automatically be declared
as the owner of the newly created team and an existing service provider should be associated
to the team.
"""
user = UserFactory()
service_provider = ServiceProviderFactory()
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.post(
"/resource-server/v1.0/teams/",
{
"name": "my team",
},
format="json",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)
assert response.status_code == HTTP_201_CREATED
assert ServiceProvider.objects.count() == 1 # no object created
team = Team.objects.get() # team created
assert team.service_providers.get().audience_id == service_provider.audience_id
assert team.name == "my team"
assert team.accesses.filter(role="owner", user=user).exists()
def test_api_teams_create_cannot_override_organization(
client, force_login_via_resource_server
):
"""
Authenticated users should be able to create teams and not
be able to set the organization manually (for now).
"""
organization = OrganizationFactory(with_registration_id=True)
user = UserFactory(organization=organization)
service_provider = ServiceProviderFactory()
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.post(
"/resource-server/v1.0/teams/",
{
"name": "my team",
"organization": OrganizationFactory(
with_registration_id=True
).pk, # ignored
},
format="json",
)
assert response.status_code == HTTP_201_CREATED
team = Team.objects.get()
assert team.name == "my team"
assert team.organization == organization
assert team.accesses.filter(role="owner", user=user).exists()
def test_api_teams_create_cannot_override_service_provider(
client, force_login_via_resource_server
):
"""
Authenticated users should be able to create teams and not
be able to set the team service provider manually.
"""
user = UserFactory(with_organization=True)
service_provider = ServiceProviderFactory()
other_service_provider = ServiceProviderFactory()
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.post(
"/resource-server/v1.0/teams/",
{
"name": "my team",
"service_providers": [str(other_service_provider.pk)], # ignored
},
format="json",
)
assert response.status_code == HTTP_201_CREATED
team = Team.objects.get()
assert team.name == "my team"
assert team.service_providers.get().audience_id == service_provider.audience_id

View File

@@ -0,0 +1,49 @@
"""
Tests for Teams API endpoint in People's core app: delete
"""
import pytest
from rest_framework.status import (
HTTP_401_UNAUTHORIZED,
HTTP_405_METHOD_NOT_ALLOWED,
)
from rest_framework.test import APIClient
from core import factories, models
pytestmark = pytest.mark.django_db
def test_api_teams_delete_anonymous():
"""Anonymous users should not be allowed to destroy a team."""
team = factories.TeamFactory()
response = APIClient().delete(
f"/resource-server/v1.0/teams/{team.id!s}/",
)
assert response.status_code == HTTP_401_UNAUTHORIZED
assert models.Team.objects.count() == 1
def test_api_teams_delete_not_allowed(client, force_login_via_resource_server):
"""
Authenticated users should not be allowed to delete a team from a resource
server, even if they have the right permissions.
This may be implemented in the future, but for now, it is not allowed.
"""
user = factories.UserFactory()
service_provider = factories.ServiceProviderFactory()
team = factories.TeamFactory(
users=[(user, "owner")], service_providers=[service_provider]
)
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.delete(
f"/resource-server/v1.0/teams/{team.id!s}/",
)
assert response.status_code == HTTP_405_METHOD_NOT_ALLOWED
assert response.json() == {"detail": 'Method "DELETE" not allowed.'}
assert models.Team.objects.count() == 1

View File

@@ -0,0 +1,184 @@
"""
Tests for Teams API endpoint in People's core app: list
"""
import pytest
from rest_framework.status import HTTP_200_OK, HTTP_401_UNAUTHORIZED
from rest_framework.test import APIClient
from core import factories
pytestmark = pytest.mark.django_db
def test_api_teams_list_anonymous():
"""Anonymous users should not be allowed to list teams."""
factories.TeamFactory.create_batch(2)
response = APIClient().get("/resource-server/v1.0/teams/")
assert response.status_code == HTTP_401_UNAUTHORIZED
assert response.json() == {
"detail": "Authentication credentials were not provided."
}
def test_api_teams_list_authenticated( # pylint: disable=too-many-locals
client, django_assert_num_queries, force_login_via_resource_server
):
"""
Authenticated users should be able to list teams
they are an owner/administrator/member of, and only list from the
requesting service provider should appear.
"""
user = factories.UserFactory()
service_provider = factories.ServiceProviderFactory()
hidden_service_provider = factories.ServiceProviderFactory()
team_access_1 = factories.TeamAccessFactory(
user=user, team__service_providers=[service_provider], role="member"
)
team_1 = team_access_1.team
team_access_2 = factories.TeamAccessFactory(
user=user,
team__service_providers=[hidden_service_provider, service_provider],
role="member",
)
team_2 = team_access_2.team
team_access_3 = factories.TeamAccessFactory(
user=user, team__service_providers=[service_provider], role="administrator"
)
team_3 = team_access_3.team
team_access_4 = factories.TeamAccessFactory(
user=user, team__service_providers=[service_provider], role="owner"
)
team_4 = team_access_4.team
# Team filtered out because of the service provider
_not_included_team_access = factories.TeamAccessFactory(
user=user, team__service_providers=[hidden_service_provider]
)
# Authenticate using the resource server, ie via the Authorization header
with force_login_via_resource_server(client, user, service_provider.audience_id):
with django_assert_num_queries(4): # Count, Team, ServiceProvider, TeamAccess
response = client.get(
"/resource-server/v1.0/teams/?ordering=created_at",
format="json",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)
assert response.status_code == HTTP_200_OK
assert response.json() == {
"count": 4,
"next": None,
"previous": None,
"results": [
{
"created_at": team_1.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
"id": str(team_1.pk),
"name": team_1.name,
"updated_at": team_1.updated_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
},
{
"created_at": team_2.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
"id": str(team_2.pk),
"name": team_2.name,
"updated_at": team_2.updated_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
},
{
"created_at": team_3.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
"id": str(team_3.pk),
"name": team_3.name,
"updated_at": team_3.updated_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
},
{
"created_at": team_4.created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
"id": str(team_4.pk),
"name": team_4.name,
"updated_at": team_4.updated_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
},
],
}
def test_api_teams_list_authenticated_new_service_provider(
client, force_login_via_resource_server
):
"""
Team list from not yet known service provider should be empty (not fail).
Teams without service providers should not be listed.
"""
user = factories.UserFactory()
_team = factories.TeamFactory(users=[user])
with force_login_via_resource_server(client, user, "some_service_provider"):
response = client.get(
"/resource-server/v1.0/teams/",
)
assert response.status_code == HTTP_200_OK
assert response.json() == {
"count": 0,
"next": None,
"previous": None,
"results": [],
}
def test_api_teams_list_authenticated_distinct(client, force_login_via_resource_server):
"""A team with several related users should only be listed once."""
user = factories.UserFactory()
service_provider = factories.ServiceProviderFactory()
other_user = factories.UserFactory()
team = factories.TeamFactory(
users=[user, other_user], service_providers=[service_provider]
)
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.get(
"/resource-server/v1.0/teams/",
)
assert response.status_code == HTTP_200_OK
content = response.json()
assert content["count"] == 1
results = content["results"]
assert len(results) == 1
assert results[0]["id"] == str(team.id)
def test_api_teams_order_param(client, force_login_via_resource_server):
"""
Test that the 'created_at' field is sorted in ascending order
when the 'ordering' query parameter is set.
"""
user = factories.UserFactory()
service_provider = factories.ServiceProviderFactory()
team_ids = [
str(team.id)
for team in factories.TeamFactory.create_batch(
5, users=[user], service_providers=[service_provider]
)
]
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.get(
"/resource-server/v1.0/teams/?ordering=created_at",
)
assert response.status_code == 200
response_data = response.json()
response_team_ids = [team["id"] for team in response_data["results"]]
assert (
response_team_ids == team_ids
), "created_at values are not sorted from oldest to newest"

View File

@@ -0,0 +1,99 @@
"""
Tests for Teams API endpoint in People's core app: retrieve
"""
import pytest
from rest_framework import status
from rest_framework.status import HTTP_404_NOT_FOUND
from rest_framework.test import APIClient
from core import factories
from core.factories import UserFactory
pytestmark = pytest.mark.django_db
def test_api_teams_retrieve_anonymous():
"""Anonymous users should not be allowed to retrieve a team."""
team = factories.TeamFactory()
response = APIClient().get(f"/resource-server/v1.0/teams/{team.id}/")
assert response.status_code == status.HTTP_401_UNAUTHORIZED
assert response.json() == {
"detail": "Authentication credentials were not provided."
}
def test_api_teams_retrieve_authenticated_unrelated(
client, force_login_via_resource_server
):
"""
Authenticated users should not be allowed to retrieve a team to which they are
not related.
"""
user = factories.UserFactory()
service_provider = factories.ServiceProviderFactory()
team = factories.TeamFactory(service_providers=[service_provider])
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.get(
f"/resource-server/v1.0/teams/{team.id!s}/",
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json() == {"detail": "No Team matches the given query."}
def test_api_teams_retrieve_authenticated_related(
client, force_login_via_resource_server
):
"""
Authenticated users should be allowed to retrieve a team to which they
are related whatever the role even if the request is authenticated via
a resource server.
"""
service_provider = factories.ServiceProviderFactory(
audience_id="some_service_provider"
)
user = factories.UserFactory()
team = factories.TeamFactory(users=[user], service_providers=[service_provider])
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.get(
f"/resource-server/v1.0/teams/{team.id!s}/",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)
assert response.status_code == status.HTTP_200_OK
assert response.json() == {
"id": str(team.id),
"name": team.name,
"created_at": team.created_at.isoformat().replace("+00:00", "Z"),
"updated_at": team.updated_at.isoformat().replace("+00:00", "Z"),
}
def test_api_teams_retrieve_authenticated_other_service_provider(
client, force_login_via_resource_server
):
"""
Authenticated users should not be able to retrieve a team
if the request is authenticated via a different resource server.
"""
user = UserFactory()
service_provider = factories.ServiceProviderFactory()
other_service_provider = factories.ServiceProviderFactory(
audience_id="some_service_provider"
)
team = factories.TeamFactory(
users=[user], service_providers=[other_service_provider]
)
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.get(
f"/resource-server/v1.0/teams/{team.id!s}/",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)
assert response.status_code == HTTP_404_NOT_FOUND

View File

@@ -0,0 +1,246 @@
"""
Tests for Teams API endpoint in People's core app: update
"""
import pytest
from rest_framework.status import (
HTTP_200_OK,
HTTP_401_UNAUTHORIZED,
HTTP_403_FORBIDDEN,
HTTP_404_NOT_FOUND,
)
from rest_framework.test import APIClient
from core import factories
from core.resource_server_api import serializers
pytestmark = pytest.mark.django_db
def test_api_teams_update_anonymous():
"""Anonymous users should not be allowed to update a team."""
team = factories.TeamFactory()
old_team_values = serializers.TeamSerializer(instance=team).data
new_team_values = serializers.TeamSerializer(instance=factories.TeamFactory()).data
response = APIClient().put(
f"/resource-server/v1.0/teams/{team.id!s}/",
new_team_values,
content_type="application/json",
)
assert response.status_code == HTTP_401_UNAUTHORIZED
assert response.json() == {
"detail": "Authentication credentials were not provided."
}
team.refresh_from_db()
team_values = serializers.TeamSerializer(instance=team).data
assert team_values == old_team_values
def test_api_teams_update_authenticated_unrelated(
client, force_login_via_resource_server
):
"""
Authenticated users should not be allowed to update a team to which they are not related.
"""
user = factories.UserFactory()
service_provider = factories.ServiceProviderFactory()
team = factories.TeamFactory(service_providers=[service_provider])
old_team_values = serializers.TeamSerializer(instance=team).data
new_team_values = serializers.TeamSerializer(instance=factories.TeamFactory()).data
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.put(
f"/resource-server/v1.0/teams/{team.id!s}/",
new_team_values,
content_type="application/json",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)
assert response.status_code == HTTP_404_NOT_FOUND
assert response.json() == {"detail": "No Team matches the given query."}
team.refresh_from_db()
team_values = serializers.TeamSerializer(instance=team).data
assert team_values == old_team_values
def test_api_teams_update_authenticated(
client,
force_login_via_resource_server,
):
"""
Authenticated users should be allowed to update a team to which they
are related whatever the role even if the request is authenticated via
a resource server.
"""
service_provider = factories.ServiceProviderFactory(
audience_id="some_service_provider"
)
user = factories.UserFactory()
team = factories.TeamFactory(
name="Old name",
users=[(user, "owner")],
service_providers=[service_provider],
)
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.put(
f"/resource-server/v1.0/teams/{team.id!s}/",
data=serializers.TeamSerializer(instance=team).data
| {
"name": "New name",
},
content_type="application/json",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)
assert response.status_code == HTTP_200_OK
team.refresh_from_db()
assert team.name == "New name"
def test_api_teams_update_authenticated_other_resource_server(
client, force_login_via_resource_server
):
"""
Authenticated users should not be able to update a team for which they are directly
owner, if the request is authenticated via a different service provider.
"""
user = factories.UserFactory()
service_provider = factories.ServiceProviderFactory()
other_service_provider = factories.ServiceProviderFactory(
audience_id="some_service_provider"
)
team = factories.TeamFactory(
name="Old name",
users=[(user, "owner")],
service_providers=[other_service_provider],
)
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.put(
f"/resource-server/v1.0/teams/{team.id!s}/",
data=serializers.TeamSerializer(instance=team).data
| {
"name": "New name",
},
content_type="application/json",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)
assert response.status_code == HTTP_404_NOT_FOUND
assert response.json() == {"detail": "No Team matches the given query."}
team.refresh_from_db()
assert team.name == "Old name"
def test_api_teams_update_authenticated_members(
client, force_login_via_resource_server
):
"""
Users who are members of a team but not administrators should
not be allowed to update it.
"""
user = factories.UserFactory()
service_provider = factories.ServiceProviderFactory()
team = factories.TeamFactory(
users=[(user, "member")], service_providers=[service_provider]
)
old_team_values = serializers.TeamSerializer(instance=team).data
new_team_values = serializers.TeamSerializer(instance=factories.TeamFactory()).data
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.put(
f"/resource-server/v1.0/teams/{team.id!s}/",
new_team_values,
content_type="application/json",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)
assert response.status_code == HTTP_403_FORBIDDEN
assert response.json() == {
"detail": "You do not have permission to perform this action."
}
team.refresh_from_db()
team_values = serializers.TeamSerializer(instance=team).data
assert team_values == old_team_values
@pytest.mark.parametrize("role", ["owner", "administrator"])
def test_api_teams_update_authenticated_administrators(
client, force_login_via_resource_server, role
):
"""Administrators or owners of a team should be allowed to update it."""
user = factories.UserFactory()
service_provider = factories.ServiceProviderFactory()
team = factories.TeamFactory(
users=[(user, role)],
service_providers=[service_provider],
name="old name",
)
initial_created_at = team.created_at
initial_updated_at = team.updated_at
initial_pk = team.pk
# generate new random values
new_values = serializers.TeamSerializer(instance=factories.TeamFactory.build()).data
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.put(
f"/resource-server/v1.0/teams/{team.id!s}/",
new_values,
content_type="application/json",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)
assert response.status_code == HTTP_200_OK
team.refresh_from_db()
assert team.pk == initial_pk
assert team.name == new_values["name"]
assert team.created_at == initial_created_at
assert team.updated_at > initial_updated_at
@pytest.mark.parametrize("role", ["owner", "administrator"])
def test_api_teams_update_administrator_or_owner_of_another(
client, force_login_via_resource_server, role
):
"""
Being administrator or owner of a team should not grant authorization to update
another team.
"""
user = factories.UserFactory()
service_provider = factories.ServiceProviderFactory()
factories.TeamFactory(users=[(user, role)], service_providers=[service_provider])
team = factories.TeamFactory(name="Old name", service_providers=[service_provider])
old_team_values = serializers.TeamSerializer(instance=team).data
new_team_values = serializers.TeamSerializer(instance=factories.TeamFactory()).data
with force_login_via_resource_server(client, user, service_provider.audience_id):
response = client.put(
f"/resource-server/v1.0/teams/{team.id!s}/",
new_team_values,
content_type="application/json",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)
assert response.status_code == HTTP_404_NOT_FOUND
assert response.json() == {"detail": "No Team matches the given query."}
team.refresh_from_db()
team_values = serializers.TeamSerializer(instance=team).data
assert team_values == old_team_values

View File

@@ -0,0 +1 @@
"""Team accesses tests package."""

View File

@@ -0,0 +1 @@
"""Teams tests package."""

View File

@@ -2,10 +2,7 @@
Tests for Teams API endpoint in People's core app: list
"""
from unittest import mock
import pytest
from rest_framework.pagination import PageNumberPagination
from rest_framework.status import HTTP_200_OK, HTTP_401_UNAUTHORIZED
from rest_framework.test import APIClient
@@ -123,7 +120,6 @@ def test_api_teams_order_param():
assert response.status_code == 200
response_data = response.json()
response_team_ids = [team["id"] for team in response_data]
assert (

View File

@@ -72,4 +72,5 @@ def test_api_teams_retrieve_authenticated_related():
"abilities": team.get_abilities(user),
"created_at": team.created_at.isoformat().replace("+00:00", "Z"),
"updated_at": team.updated_at.isoformat().replace("+00:00", "Z"),
"service_providers": [],
}

View File

@@ -188,3 +188,34 @@ def test_api_teams_update_administrator_or_owner_of_another():
team.refresh_from_db()
team_values = serializers.TeamSerializer(instance=team).data
assert team_values == old_team_values
def test_api_teams_update_authenticated_owners_add_service_providers():
"""
Owners of a team should be allowed to update its service providers.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
team = factories.TeamFactory(users=[(user, "owner")])
new_team_values = serializers.TeamSerializer(instance=team).data
service_provider_1 = factories.ServiceProviderFactory()
service_provider_2 = factories.ServiceProviderFactory()
new_team_values["service_providers"] = [
service_provider_1.pk,
service_provider_2.pk,
]
response = client.put(
f"/api/v1.0/teams/{team.id!s}/",
new_team_values,
format="json",
)
assert response.status_code == HTTP_200_OK
team.refresh_from_db()
assert team.service_providers.count() == 2
assert set(team.service_providers.all()) == {service_provider_1, service_provider_2}