(project) first proof of concept printing pdf from markdown

This is a boilerplate inspired from https://github.com/openfun/joanie
This commit is contained in:
Samuel Paccoud - DINUM
2024-01-09 15:30:36 +01:00
parent 2d81979b0a
commit 62df0524ac
95 changed files with 8252 additions and 1 deletions

View File

37
src/backend/core/admin.py Normal file
View File

@@ -0,0 +1,37 @@
"""Admin classes and registrations for Magnify's core app."""
from django.contrib import admin
from . import models
class IdentityInline(admin.TabularInline):
"""Inline admin class for user identities."""
model = models.Identity
extra = 0
@admin.register(models.User)
class UserAdmin(admin.ModelAdmin):
"""User admin interface declaration."""
inlines = (IdentityInline,)
class TemplateAccessInline(admin.TabularInline):
"""Inline admin class for template accesses."""
model = models.TemplateAccess
extra = 0
@admin.register(models.Template)
class TemplateAdmin(admin.ModelAdmin):
"""Template admin interface declaration."""
inlines = (TemplateAccessInline,)
@admin.register(models.Team)
class TeamAdmin(admin.ModelAdmin):
"""Team admin interface declaration."""

View File

@@ -0,0 +1,39 @@
"""publish core API endpoints"""
from django.conf import settings
from django.core.exceptions import ValidationError
from rest_framework import exceptions as drf_exceptions
from rest_framework import views as drf_views
from rest_framework.decorators import api_view
from rest_framework.response import Response
def exception_handler(exc, context):
"""Handle Django ValidationError as an accepted exception.
For the parameters, see ``exception_handler``
This code comes from twidi's gist:
https://gist.github.com/twidi/9d55486c36b6a51bdcb05ce3a763e79f
"""
if isinstance(exc, ValidationError):
if hasattr(exc, "message_dict"):
detail = exc.message_dict
elif hasattr(exc, "message"):
detail = exc.message
elif hasattr(exc, "messages"):
detail = exc.messages
exc = drf_exceptions.ValidationError(detail=detail)
return drf_views.exception_handler(exc, context)
# pylint: disable=unused-argument
@api_view(["GET"])
def get_frontend_configuration(request):
"""Returns the frontend configuration dict as configured in settings."""
frontend_configuration = {
"LANGUAGE_CODE": settings.LANGUAGE_CODE,
}
frontend_configuration.update(settings.FRONTEND_CONFIGURATION)
return Response(frontend_configuration)

View File

@@ -0,0 +1,54 @@
"""Permission handlers for the publish core app."""
from django.core import exceptions
from rest_framework import permissions
class IsAuthenticated(permissions.BasePermission):
"""
Allows access only to authenticated users. Alternative method checking the presence
of the auth token to avoid hitting the database.
"""
def has_permission(self, request, view):
return bool(request.auth) if request.auth else request.user.is_authenticated
class IsSelf(IsAuthenticated):
"""
Allows access only to authenticated users. Alternative method checking the presence
of the auth token to avoid hitting the database.
"""
def has_object_permission(self, request, view, obj):
"""Write permissions are only allowed to the user itself."""
return obj == request.user
class IsOwnedOrPublic(IsAuthenticated):
"""
Allows access to authenticated users only for objects that are owned or not related
to any user via the "owner" field.
"""
def has_object_permission(self, request, view, obj):
"""Unsafe permissions are only allowed for the owner of the object."""
if obj.owner == request.user:
return True
if request.method in permissions.SAFE_METHODS and obj.owner is None:
return True
try:
return obj.user == request.user
except exceptions.ObjectDoesNotExist:
return False
class AccessPermission(IsAuthenticated):
"""Permission class for access objects."""
def has_object_permission(self, request, view, obj):
"""Check permission for a given object."""
abilities = obj.get_abilities(request.user)
return abilities.get(request.method.lower(), False)

View File

@@ -0,0 +1,137 @@
"""Client serializers for the publish core app."""
from rest_framework import exceptions, serializers
from timezone_field.rest_framework import TimeZoneSerializerField
from core import models
class ContactSerializer(serializers.ModelSerializer):
"""Serialize contacts."""
class Meta:
model = models.Contact
fields = [
"id",
"base",
"data",
"full_name",
"owner",
"short_name",
]
read_only_fields = ["id", "owner"]
def update(self, instance, validated_data):
"""Make "base" field readonly but only for update/patch."""
validated_data.pop("base", None)
return super().update(instance, validated_data)
class UserSerializer(serializers.ModelSerializer):
"""Serialize users."""
timezone = TimeZoneSerializerField(use_pytz=False, required=True)
class Meta:
model = models.User
fields = [
"id",
"language",
"timezone",
"is_device",
"is_staff",
]
read_only_fields = ["id", "is_device", "is_staff"]
class TeamAccessSerializer(serializers.ModelSerializer):
"""Serialize team accesses."""
abilities = serializers.SerializerMethodField(read_only=True)
class Meta:
model = models.TeamAccess
fields = ["id", "user", "role", "abilities"]
read_only_fields = ["id", "abilities"]
def update(self, instance, validated_data):
"""Make "user" field is readonly but only on update."""
validated_data.pop("user", None)
return super().update(instance, validated_data)
def get_abilities(self, access) -> dict:
"""Return abilities of the logged-in user on the instance."""
request = self.context.get("request")
if request:
return access.get_abilities(request.user)
return {}
def validate(self, attrs):
"""
Check access rights specific to writing (create/update)
"""
request = self.context.get("request")
user = getattr(request, "user", None)
role = attrs.get("role")
# Update
if self.instance:
can_set_role_to = self.instance.get_abilities(user)["set_role_to"]
if role and role not in can_set_role_to:
message = (
f"You are only allowed to set role to {', '.join(can_set_role_to)}"
if can_set_role_to
else "You are not allowed to set this role for this team."
)
raise exceptions.PermissionDenied(message)
# Create
else:
try:
team_id = self.context["team_id"]
except KeyError as exc:
raise exceptions.ValidationError(
"You must set a team ID in kwargs to create a new team access."
) from exc
if not models.TeamAccess.objects.filter(
team=team_id,
user=user,
role__in=[models.RoleChoices.OWNER, models.RoleChoices.ADMIN],
).exists():
raise exceptions.PermissionDenied(
"You are not allowed to manage accesses for this team."
)
if (
role == models.RoleChoices.OWNER
and not models.TeamAccess.objects.filter(
team=team_id,
user=user,
role=models.RoleChoices.OWNER,
).exists()
):
raise exceptions.PermissionDenied(
"Only owners of a team can assign other users as owners."
)
attrs["team_id"] = self.context["team_id"]
return attrs
class TeamSerializer(serializers.ModelSerializer):
"""Serialize teams."""
abilities = serializers.SerializerMethodField(read_only=True)
accesses = TeamAccessSerializer(many=True, read_only=True)
class Meta:
model = models.Team
fields = ["id", "name", "accesses", "abilities"]
read_only_fields = ["id", "accesses", "abilities"]
def get_abilities(self, team) -> dict:
"""Return abilities of the logged-in user on the instance."""
request = self.context.get("request")
if request:
return team.get_abilities(request.user)
return {}

View File

@@ -0,0 +1,14 @@
"""
Utils that can be useful throughout the publish core app
"""
from rest_framework_simplejwt.tokens import RefreshToken
def get_tokens_for_user(user):
"""Get JWT tokens for user authentication."""
refresh = RefreshToken.for_user(user)
return {
"refresh": str(refresh),
"access": str(refresh.access_token),
}

View File

@@ -0,0 +1,349 @@
"""API endpoints"""
from django.contrib.postgres.search import TrigramSimilarity
from django.core.cache import cache
from django.db.models import (
Func,
OuterRef,
Q,
Subquery,
Value,
)
from rest_framework import (
decorators,
exceptions,
mixins,
pagination,
response,
viewsets,
)
from core import models
from . import permissions, serializers
class NestedGenericViewSet(viewsets.GenericViewSet):
"""
A generic Viewset aims to be used in a nested route context.
e.g: `/api/v1.0/resource_1/<resource_1_pk>/resource_2/<resource_2_pk>/`
It allows to define all url kwargs and lookup fields to perform the lookup.
"""
lookup_fields: list[str] = ["pk"]
lookup_url_kwargs: list[str] = []
def __getattribute__(self, item):
"""
This method is overridden to allow to get the last lookup field or lookup url kwarg
when accessing the `lookup_field` or `lookup_url_kwarg` attribute. This is useful
to keep compatibility with all methods used by the parent class `GenericViewSet`.
"""
if item in ["lookup_field", "lookup_url_kwarg"]:
return getattr(self, item + "s", [None])[-1]
return super().__getattribute__(item)
def get_queryset(self):
"""
Get the list of items for this view.
`lookup_fields` attribute is enumerated here to perform the nested lookup.
"""
queryset = super().get_queryset()
# The last lookup field is removed to perform the nested lookup as it corresponds
# to the object pk, it is used within get_object method.
lookup_url_kwargs = (
self.lookup_url_kwargs[:-1]
if self.lookup_url_kwargs
else self.lookup_fields[:-1]
)
filter_kwargs = {}
for index, lookup_url_kwarg in enumerate(lookup_url_kwargs):
if lookup_url_kwarg not in self.kwargs:
raise KeyError(
f"Expected view {self.__class__.__name__} to be called with a URL "
f'keyword argument named "{lookup_url_kwarg}". Fix your URL conf, or '
"set the `.lookup_fields` attribute on the view correctly."
)
filter_kwargs.update(
{self.lookup_fields[index]: self.kwargs[lookup_url_kwarg]}
)
return queryset.filter(**filter_kwargs)
class SerializerPerActionMixin:
"""
A mixin to allow to define serializer classes for each action.
This mixin is useful to avoid to define a serializer class for each action in the
`get_serializer_class` method.
"""
serializer_classes: dict[str, type] = {}
default_serializer_class: type = None
def get_serializer_class(self):
"""
Return the serializer class to use depending on the action.
"""
return self.serializer_classes.get(self.action, self.default_serializer_class)
class Pagination(pagination.PageNumberPagination):
"""Pagination to display no more than 100 objects per page sorted by creation date."""
ordering = "-created_on"
max_page_size = 100
page_size_query_param = "page_size"
# pylint: disable=too-many-ancestors
class ContactViewSet(
mixins.CreateModelMixin,
mixins.DestroyModelMixin,
mixins.RetrieveModelMixin,
mixins.UpdateModelMixin,
viewsets.GenericViewSet,
):
"""Contact ViewSet"""
permission_classes = [permissions.IsOwnedOrPublic]
queryset = models.Contact.objects.all()
serializer_class = serializers.ContactSerializer
def list(self, request, *args, **kwargs):
"""Limit listed users by a query with throttle protection."""
user = self.request.user
queryset = self.filter_queryset(self.get_queryset())
if not user.is_authenticated:
return queryset.none()
# Exclude contacts that:
queryset = queryset.filter(
# - belong to another user (keep public and owned contacts)
Q(owner__isnull=True) | Q(owner=user),
# - are profile contacts for a user
user__isnull=True,
# - are overriden base contacts
overriding_contacts__isnull=True,
)
# Search by case-insensitive and accent-insensitive trigram similarity
if query := self.request.GET.get("q", ""):
query = Func(Value(query), function="unaccent")
similarity = TrigramSimilarity(
Func("full_name", function="unaccent"),
query,
) + TrigramSimilarity(Func("short_name", function="unaccent"), query)
queryset = (
queryset.annotate(similarity=similarity)
.filter(
similarity__gte=0.05
) # Value determined by testing (test_api_contacts.py)
.order_by("-similarity")
)
# Throttle protection
key_base = f"throttle-contact-list-{user.id!s}"
key_minute = f"{key_base:s}-minute"
key_hour = f"{key_base:s}-hour"
try:
count_minute = cache.incr(key_minute)
except ValueError:
cache.set(key_minute, 1, 60)
count_minute = 1
try:
count_hour = cache.incr(key_hour)
except ValueError:
cache.set(key_hour, 1, 3600)
count_hour = 1
if count_minute > 20 or count_hour > 150:
raise exceptions.Throttled()
serializer = self.get_serializer(queryset, many=True)
return response.Response(serializer.data)
def perform_create(self, serializer):
"""Set the current user as owner of the newly created contact."""
user = self.request.user
serializer.validated_data["owner"] = user
return super().perform_create(serializer)
class UserViewSet(
mixins.UpdateModelMixin,
viewsets.GenericViewSet,
):
"""User ViewSet"""
permission_classes = [permissions.IsSelf]
queryset = models.User.objects.all()
serializer_class = serializers.UserSerializer
@decorators.action(
detail=False,
methods=["get"],
url_name="me",
url_path="me",
permission_classes=[permissions.IsAuthenticated],
)
def get_me(self, request):
"""
Return information on currently logged user
"""
context = {"request": request}
return response.Response(
self.serializer_class(request.user, context=context).data
)
class TeamViewSet(
mixins.CreateModelMixin,
mixins.DestroyModelMixin,
mixins.ListModelMixin,
mixins.RetrieveModelMixin,
mixins.UpdateModelMixin,
viewsets.GenericViewSet,
):
"""Team ViewSet"""
permission_classes = [permissions.AccessPermission]
serializer_class = serializers.TeamSerializer
queryset = models.Team.objects.all()
def get_queryset(self):
"""Custom queryset to get user related teams."""
user_role_query = models.TeamAccess.objects.filter(
user=self.request.user, team=OuterRef("pk")
).values("role")[:1]
return models.Team.objects.filter(accesses__user=self.request.user).annotate(
user_role=Subquery(user_role_query)
)
def perform_create(self, serializer):
"""Set the current user as owner of the newly created team."""
team = serializer.save()
models.TeamAccess.objects.create(
team=team,
user=self.request.user,
role=models.RoleChoices.OWNER,
)
class TeamAccessViewSet(
mixins.CreateModelMixin,
mixins.DestroyModelMixin,
mixins.ListModelMixin,
mixins.RetrieveModelMixin,
mixins.UpdateModelMixin,
viewsets.GenericViewSet,
):
"""
API ViewSet for all interactions with team accesses.
GET /api/v1.0/teams/<team_id>/accesses/:<team_access_id>
Return list of all team accesses related to the logged-in user or one
team access if an id is provided.
POST /api/v1.0/teams/<team_id>/accesses/ with expected data:
- user: str
- role: str [owner|admin|member]
Return newly created team access
PUT /api/v1.0/teams/<team_id>/accesses/<team_access_id>/ with expected data:
- role: str [owner|admin|member]
Return updated team access
PATCH /api/v1.0/teams/<team_id>/accesses/<team_access_id>/ with expected data:
- role: str [owner|admin|member]
Return partially updated team access
DELETE /api/v1.0/teams/<team_id>/accesses/<team_access_id>/
Delete targeted team access
"""
lookup_field = "pk"
pagination_class = Pagination
permission_classes = [permissions.AccessPermission]
queryset = models.TeamAccess.objects.all().select_related("user")
serializer_class = serializers.TeamAccessSerializer
def get_permissions(self):
"""User only needs to be authenticated to list team accesses"""
if self.action == "list":
permission_classes = [permissions.IsAuthenticated]
else:
return super().get_permissions()
return [permission() for permission in permission_classes]
def get_serializer_context(self):
"""Extra context provided to the serializer class."""
context = super().get_serializer_context()
context["team_id"] = self.kwargs["team_id"]
return context
def get_queryset(self):
"""Return the queryset according to the action."""
queryset = super().get_queryset()
queryset = queryset.filter(team=self.kwargs["team_id"])
if self.action == "list":
# Limit to team access instances related to a team THAT also has a team access
# instance for the logged-in user (we don't want to list only the team access
# instances pointing to the logged-in user)
user_role_query = models.TeamAccess.objects.filter(
team__accesses__user=self.request.user
).values("role")[:1]
queryset = (
queryset.filter(
team__accesses__user=self.request.user,
)
.annotate(user_role=Subquery(user_role_query))
.distinct()
)
return queryset
def destroy(self, request, *args, **kwargs):
"""Forbid deleting the last owner access"""
instance = self.get_object()
team = instance.team
# Check if the access being deleted is the last owner access for the team
if instance.role == "owner" and team.accesses.filter(role="owner").count() == 1:
return response.Response(
{"detail": "Cannot delete the last owner access for the team."},
status=400,
)
return super().destroy(request, *args, **kwargs)
def perform_update(self, serializer):
"""Check that we don't change the role if it leads to losing the last owner."""
instance = serializer.instance
# Check if the role is being updated and the new role is not "owner"
if (
"role" in self.request.data
and self.request.data["role"] != models.RoleChoices.OWNER
):
team = instance.team
# Check if the access being updated is the last owner access for the team
if (
instance.role == models.RoleChoices.OWNER
and team.accesses.filter(role=models.RoleChoices.OWNER).count() == 1
):
message = "Cannot change the role to a non-owner role for the last owner access."
raise exceptions.ValidationError({"role": message})
serializer.save()

11
src/backend/core/apps.py Normal file
View File

@@ -0,0 +1,11 @@
"""publish Core application"""
# from django.apps import AppConfig
# from django.utils.translation import gettext_lazy as _
# class CoreConfig(AppConfig):
# """Configuration class for the publish core app."""
# name = "core"
# app_label = "core"
# verbose_name = _("publish core application")

View File

@@ -0,0 +1,59 @@
"""Authentication for the publish core app."""
from django.conf import settings
from django.utils.functional import SimpleLazyObject
from django.utils.module_loading import import_string
from django.utils.translation import gettext_lazy as _
from drf_spectacular.authentication import SessionScheme, TokenScheme
from drf_spectacular.plumbing import build_bearer_security_scheme_object
from rest_framework import authentication
from rest_framework_simplejwt.authentication import JWTAuthentication
class DelegatedJWTAuthentication(JWTAuthentication):
"""Override JWTAuthentication to create missing users on the fly."""
def get_user(self, validated_token):
"""
Return the user related to the given validated token, creating or updating it if necessary.
"""
get_user = import_string(settings.JWT_USER_GETTER)
return SimpleLazyObject(lambda: get_user(validated_token))
class OpenApiJWTAuthenticationExtension(TokenScheme):
"""Extension for specifying JWT authentication schemes."""
target_class = "core.authentication.DelegatedJWTAuthentication"
name = "DelegatedJWTAuthentication"
def get_security_definition(self, auto_schema):
"""Return the security definition for JWT authentication."""
return build_bearer_security_scheme_object(
header_name="Authorization",
token_prefix="Bearer", # noqa S106
)
class SessionAuthenticationWithAuthenticateHeader(authentication.SessionAuthentication):
"""
This class is needed, because REST Framework's default SessionAuthentication does
never return 401's, because they cannot fill the WWW-Authenticate header with a
valid value in the 401 response. As a result, we cannot distinguish calls that are
not unauthorized (401 unauthorized) and calls for which the user does not have
permission (403 forbidden).
See https://github.com/encode/django-rest-framework/issues/5968
We do set authenticate_header function in SessionAuthentication, so that a value
for the WWW-Authenticate header can be retrieved and the response code is
automatically set to 401 in case of unauthenticated requests.
"""
def authenticate_header(self, request):
return "Session"
class OpenApiSessionAuthenticationExtension(SessionScheme):
"""Extension for specifying session authentication schemes."""
target_class = "core.api.authentication.SessionAuthenticationWithAuthenticateHeader"

15
src/backend/core/enums.py Normal file
View File

@@ -0,0 +1,15 @@
"""
Core application enums declaration
"""
from django.conf import global_settings, settings
from django.utils.translation import gettext_lazy as _
# Django sets `LANGUAGES` by default with all supported languages. We can use it for
# the choice of languages which should not be limited to the few languages active in
# the app.
# pylint: disable=no-member
ALL_LANGUAGES = getattr(
settings,
"ALL_LANGUAGES",
[(language, _(name)) for language, name in global_settings.LANGUAGES],
)

View File

@@ -0,0 +1,66 @@
# ruff: noqa: S311
"""
Core application factories
"""
from django.conf import settings
from django.contrib.auth.hashers import make_password
import factory.fuzzy
from faker import Faker
from core import models
fake = Faker()
class UserFactory(factory.django.DjangoModelFactory):
"""A factory to random users for testing purposes."""
class Meta:
model = models.User
language = factory.fuzzy.FuzzyChoice([lang[0] for lang in settings.LANGUAGES])
password = make_password("password")
class IdentityFactory(factory.django.DjangoModelFactory):
"""A factory to create identities for a user"""
class Meta:
model = models.Identity
django_get_or_create = ("sub",)
user = factory.SubFactory(UserFactory)
sub = factory.Sequence(lambda n: f"user{n!s}")
email = factory.Faker("email")
class TeamFactory(factory.django.DjangoModelFactory):
"""A factory to create teams"""
class Meta:
model = models.Team
django_get_or_create = ("name",)
name = factory.Sequence(lambda n: f"team{n}")
@factory.post_generation
def users(self, create, extracted, **kwargs):
"""Add users to team from a given list of users with or without roles."""
if create and extracted:
for item in extracted:
if isinstance(item, models.User):
TeamAccessFactory(team=self, user=item)
else:
TeamAccessFactory(team=self, user=item[0], role=item[1])
class TeamAccessFactory(factory.django.DjangoModelFactory):
"""Create fake team user accesses for testing."""
class Meta:
model = models.TeamAccess
team = factory.SubFactory(TeamFactory)
user = factory.SubFactory(UserFactory)
role = factory.fuzzy.FuzzyChoice([r[0] for r in models.RoleChoices.choices])

View File

@@ -0,0 +1,8 @@
"""Forms for the core app of Publish"""
from django import forms
from .models import Template
class DocumentGenerationForm(forms.Form):
body = forms.CharField(widget=forms.Textarea, label="Markdown Body")
template = forms.ModelChoiceField(queryset=Template.objects.all(), label="Choose Template")

View File

@@ -0,0 +1,129 @@
# Generated by Django 5.0 on 2024-01-06 18:05
import django.contrib.auth.models
import django.core.validators
import django.db.models.deletion
import timezone_field.fields
import uuid
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
('auth', '0012_alter_user_first_name_max_length'),
]
operations = [
migrations.CreateModel(
name='Team',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')),
('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created on')),
('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated on')),
('name', models.CharField(max_length=100)),
],
options={
'verbose_name': 'Team',
'verbose_name_plural': 'Teams',
'db_table': 'publish_role',
'ordering': ('name',),
},
),
migrations.CreateModel(
name='Template',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')),
('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created on')),
('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated on')),
('title', models.CharField(max_length=255, verbose_name='title')),
('description', models.TextField(blank=True, verbose_name='description')),
('code', models.TextField(blank=True, verbose_name='code')),
('css', models.TextField(blank=True, verbose_name='css')),
('is_public', models.BooleanField(default=False, help_text='Whether this template is public for anyone to use.', verbose_name='public')),
],
options={
'verbose_name': 'Template',
'verbose_name_plural': 'Templates',
'db_table': 'publish_template',
'ordering': ('title',),
},
),
migrations.CreateModel(
name='User',
fields=[
('password', models.CharField(max_length=128, verbose_name='password')),
('last_login', models.DateTimeField(blank=True, null=True, verbose_name='last login')),
('is_superuser', models.BooleanField(default=False, help_text='Designates that this user has all permissions without explicitly assigning them.', verbose_name='superuser status')),
('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')),
('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created on')),
('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated on')),
('email', models.EmailField(max_length=254, unique=True, verbose_name='email address')),
('language', models.CharField(choices="(('en-us', 'English'), ('fr-fr', 'French'))", default='en-us', help_text='The language in which the user wants to see the interface.', max_length=10, verbose_name='language')),
('timezone', timezone_field.fields.TimeZoneField(choices_display='WITH_GMT_OFFSET', default='UTC', help_text='The timezone in which the user wants to see times.', use_pytz=False)),
('is_device', models.BooleanField(default=False, help_text='Whether the user is a device or a real user.', verbose_name='device')),
('is_staff', models.BooleanField(default=False, help_text='Whether the user can log into this admin site.', verbose_name='staff status')),
('is_active', models.BooleanField(default=True, help_text='Whether this user should be treated as active. Unselect this instead of deleting accounts.', verbose_name='active')),
('groups', models.ManyToManyField(blank=True, help_text='The groups this user belongs to. A user will get all permissions granted to each of their groups.', related_name='user_set', related_query_name='user', to='auth.group', verbose_name='groups')),
('user_permissions', models.ManyToManyField(blank=True, help_text='Specific permissions for this user.', related_name='user_set', related_query_name='user', to='auth.permission', verbose_name='user permissions')),
],
options={
'verbose_name': 'user',
'verbose_name_plural': 'users',
'db_table': 'publish_user',
},
managers=[
('objects', django.contrib.auth.models.UserManager()),
],
),
migrations.CreateModel(
name='Identity',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')),
('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created on')),
('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated on')),
('sub', models.CharField(blank=True, help_text='Required. 255 characters or fewer. Letters, numbers, and @/./+/-/_ characters only.', max_length=255, null=True, unique=True, validators=[django.core.validators.RegexValidator(message='Enter a valid sub. This value may contain only letters, numbers, and @/./+/-/_ characters.', regex='^[\\w.@+-]+\\Z')], verbose_name='sub')),
('email', models.EmailField(max_length=254, verbose_name='email address')),
('is_main', models.BooleanField(default=False, help_text='Designates whether the email is the main one.', verbose_name='main')),
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='identities', to=settings.AUTH_USER_MODEL)),
],
options={
'verbose_name': 'identity',
'verbose_name_plural': 'identities',
'db_table': 'publish_identity',
'ordering': ('-is_main', 'email'),
},
),
migrations.CreateModel(
name='TemplateAccess',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')),
('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created on')),
('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated on')),
('role', models.CharField(choices=[('member', 'Member'), ('administrator', 'Administrator'), ('owner', 'Owner')], default='member', max_length=20)),
('team', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='accesses', to='core.team')),
('template', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='accesses', to='core.template')),
('user', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='accesses', to=settings.AUTH_USER_MODEL)),
],
options={
'verbose_name': 'Template/user relation',
'verbose_name_plural': 'Template/user relations',
'db_table': 'publish_template_access',
},
),
migrations.AddConstraint(
model_name='identity',
constraint=models.UniqueConstraint(fields=('user', 'email'), name='unique_user_email', violation_error_message='This email address is already declared for this user.'),
),
migrations.AddConstraint(
model_name='templateaccess',
constraint=models.UniqueConstraint(fields=('user', 'template'), name='unique_template_user', violation_error_message='This user is already in this template.'),
),
migrations.AddConstraint(
model_name='templateaccess',
constraint=models.UniqueConstraint(fields=('team', 'template'), name='unique_template_team', violation_error_message='This team is already in this template.'),
),
]

View File

393
src/backend/core/models.py Normal file
View File

@@ -0,0 +1,393 @@
"""
Declare and configure the models for the publish core application
"""
import json
import os
import textwrap
import uuid
from django.conf import settings
from django.contrib.auth import models as auth_models
from django.contrib.auth.base_user import AbstractBaseUser
from django.core import exceptions, mail, validators
from django.db import models
from django.utils.functional import lazy
from django.utils.translation import gettext_lazy as _
from django.template.base import Template as DjangoTemplate
from django.template.context import Context
from django.template.engine import Engine
import markdown
from weasyprint import CSS, HTML
from weasyprint.text.fonts import FontConfiguration
import jsonschema
from rest_framework_simplejwt.exceptions import InvalidToken
from rest_framework_simplejwt.settings import api_settings
from timezone_field import TimeZoneField
class RoleChoices(models.TextChoices):
"""Defines the possible roles a user can have in a template."""
MEMBER = "member", _("Member")
ADMIN = "administrator", _("Administrator")
OWNER = "owner", _("Owner")
class BaseModel(models.Model):
"""
Serves as an abstract base model for other models, ensuring that records are validated
before saving as Django doesn't do it by default.
Includes fields common to all models: a UUID primary key and creation/update timestamps.
"""
id = models.UUIDField(
verbose_name=_("id"),
help_text=_("primary key for the record as UUID"),
primary_key=True,
default=uuid.uuid4,
editable=False,
)
created_at = models.DateTimeField(
verbose_name=_("created on"),
help_text=_("date and time at which a record was created"),
auto_now_add=True,
editable=False,
)
updated_at = models.DateTimeField(
verbose_name=_("updated on"),
help_text=_("date and time at which a record was last updated"),
auto_now=True,
editable=False,
)
class Meta:
abstract = True
def save(self, *args, **kwargs):
"""Call `full_clean` before saving."""
self.full_clean()
super().save(*args, **kwargs)
class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
"""User model to work with OIDC only authentication."""
email = models.EmailField(_("email address"), unique=True)
language = models.CharField(
max_length=10,
choices=lazy(lambda: settings.LANGUAGES, tuple)(),
default=settings.LANGUAGE_CODE,
verbose_name=_("language"),
help_text=_("The language in which the user wants to see the interface."),
)
timezone = TimeZoneField(
choices_display="WITH_GMT_OFFSET",
use_pytz=False,
default=settings.TIME_ZONE,
help_text=_("The timezone in which the user wants to see times."),
)
is_device = models.BooleanField(
_("device"),
default=False,
help_text=_("Whether the user is a device or a real user."),
)
is_staff = models.BooleanField(
_("staff status"),
default=False,
help_text=_("Whether the user can log into this admin site."),
)
is_active = models.BooleanField(
_("active"),
default=True,
help_text=_(
"Whether this user should be treated as active. "
"Unselect this instead of deleting accounts."
),
)
objects = auth_models.UserManager()
USERNAME_FIELD = "email"
REQUIRED_FIELDS = []
class Meta:
db_table = "publish_user"
verbose_name = _("user")
verbose_name_plural = _("users")
class Identity(BaseModel):
"""User identity"""
sub_validator = validators.RegexValidator(
regex=r"^[\w.@+-]+\Z",
message=_(
"Enter a valid sub. This value may contain only letters, "
"numbers, and @/./+/-/_ characters."
),
)
user = models.ForeignKey(User, related_name="identities", on_delete=models.CASCADE)
sub = models.CharField(
_("sub"),
help_text=_(
"Required. 255 characters or fewer. Letters, numbers, and @/./+/-/_ characters only."
),
max_length=255,
unique=True,
validators=[sub_validator],
blank=True,
null=True,
)
email = models.EmailField(_("email address"))
is_main = models.BooleanField(
_("main"),
default=False,
help_text=_("Designates whether the email is the main one."),
)
class Meta:
db_table = "publish_identity"
ordering = ("-is_main", "email")
verbose_name = _("identity")
verbose_name_plural = _("identities")
constraints = [
# Uniqueness
models.UniqueConstraint(
fields=["user", "email"],
name="unique_user_email",
violation_error_message=_(
"This email address is already declared for this user."
),
),
]
def __str__(self):
main_str = "[main]" if self.is_main else ""
return f"{self.email:s}{main_str:s}"
def clean(self):
"""Normalize the email field and clean the 'is_main' field."""
self.email = User.objects.normalize_email(self.email)
if not self.user.identities.exclude(pk=self.pk).filter(is_main=True).exists():
if not self.created_at:
self.is_main = True
elif not self.is_main:
raise exceptions.ValidationError(
{"is_main": "A user should have one and only one main identity."}
)
super().clean()
def save(self, *args, **kwargs):
"""Ensure users always have one and only one main identity."""
super().save(*args, **kwargs)
if self.is_main is True:
self.user.identities.exclude(id=self.id).update(is_main=False)
class Team(BaseModel):
"""Team used for role based access control when matched with teams in OIDC tokens."""
name = models.CharField(max_length=100)
class Meta:
db_table = "publish_role"
ordering = ("name",)
verbose_name = _("Team")
verbose_name_plural = _("Teams")
def __str__(self):
return self.name
class Template(BaseModel):
"""HTML and CSS code used for formatting the print around the MarkDown body."""
title = models.CharField(_("title"), max_length=255)
description = models.TextField(_("description"), blank=True)
code = models.TextField(_("code"), blank=True)
css = models.TextField(_("css"), blank=True)
is_public = models.BooleanField(
_("public"),
default=False,
help_text=_("Whether this template is public for anyone to use."),
)
class Meta:
db_table = "publish_template"
ordering = ("title",)
verbose_name = _("Template")
verbose_name_plural = _("Templates")
def __str__(self):
return self.title
if not self.body:
return ""
return markdown.markdown(textwrap.dedent(self.body))
def generate_document(self, body):
"""
Generate and return a PDF document for this template around the
markdown body passed as argument.
"""
body_html = markdown.markdown(textwrap.dedent(body)) if body else ""
document_html = HTML(string=DjangoTemplate(self.code).render(Context({"body": body_html})))
css = CSS(
string=self.css,
font_config=FontConfiguration(),
)
return document_html.write_pdf(stylesheets=[css], zoom=1)
def get_abilities(self, user):
"""
Compute and return abilities for a given user on the template.
"""
is_owner_or_admin = False
role = None
if user.is_authenticated:
try:
role = self.user_role
except AttributeError:
try:
role = self.accesses.filter(user=user).values("role")[0]["role"]
except (TemplateAccess.DoesNotExist, IndexError):
role = None
is_owner_or_admin = role in [RoleChoices.OWNER, RoleChoices.ADMIN]
return {
"get": True,
"patch": is_owner_or_admin,
"put": is_owner_or_admin,
"delete": role == RoleChoices.OWNER,
"manage_accesses": is_owner_or_admin,
}
class TemplateAccess(BaseModel):
"""Link table between templates and contacts."""
template = models.ForeignKey(
Template,
on_delete=models.CASCADE,
related_name="accesses",
)
user = models.ForeignKey(
User,
on_delete=models.CASCADE,
related_name="accesses",
null=True,
blank=True,
)
team = models.ForeignKey(
Team,
on_delete=models.CASCADE,
related_name="accesses",
null=True,
blank=True,
)
role = models.CharField(
max_length=20, choices=RoleChoices.choices, default=RoleChoices.MEMBER
)
class Meta:
db_table = "publish_template_access"
verbose_name = _("Template/user relation")
verbose_name_plural = _("Template/user relations")
constraints = [
models.UniqueConstraint(
fields=["user", "template"],
name="unique_template_user",
violation_error_message=_("This user is already in this template."),
),
models.UniqueConstraint(
fields=["team", "template"],
name="unique_template_team",
violation_error_message=_("This team is already in this template."),
),
]
def __str__(self):
return f"{self.user!s} is {self.role:s} in template {self.template!s}"
def get_abilities(self, user):
"""
Compute and return abilities for a given user taking into account
the current state of the object.
"""
is_template_owner_or_admin = False
role = None
if user.is_authenticated:
try:
role = self.user_role
except AttributeError:
try:
role = self._meta.model.objects.filter(
template=self.template_id, user=user
).values("role")[0]["role"]
except (self._meta.model.DoesNotExist, IndexError):
role = None
is_template_owner_or_admin = role in [RoleChoices.OWNER, RoleChoices.ADMIN]
if self.role == RoleChoices.OWNER:
can_delete = (
user.id == self.user_id
and self.template.accesses.filter(role=RoleChoices.OWNER).count() > 1
)
set_role_to = [RoleChoices.ADMIN, RoleChoices.MEMBER] if can_delete else []
else:
can_delete = is_template_owner_or_admin
set_role_to = []
if role == RoleChoices.OWNER:
set_role_to.append(RoleChoices.OWNER)
if is_template_owner_or_admin:
set_role_to.extend([RoleChoices.ADMIN, RoleChoices.MEMBER])
# Remove the current role as we don't want to propose it as an option
try:
set_role_to.remove(self.role)
except ValueError:
pass
return {
"delete": can_delete,
"get": bool(role),
"patch": bool(set_role_to),
"put": bool(set_role_to),
"set_role_to": set_role_to,
}
def oidc_user_getter(validated_token):
"""
Given a valid OIDC token , retrieve, create or update corresponding user/contact/email from db.
The token is expected to have the following fields in payload:
- sub
- email
- ...
"""
try:
user_id = validated_token[api_settings.USER_ID_CLAIM]
except KeyError as exc:
raise InvalidToken(
_("Token contained no recognizable user identification")
) from exc
try:
user = User.objects.get(identities__sub=user_id)
except User.DoesNotExist:
user = User.objects.create()
Identities.objects.create(user=user, sub=user_id, email=validated_token["email"])
return user

View File

@@ -0,0 +1,14 @@
<!DOCTYPE html>
<html>
<head>
<title>Generate Document</title>
</head>
<body>
<h2>Generate Document</h2>
<form method="post" enctype="multipart/form-data">
{% csrf_token %}
{{ form.as_p }}
<button type="submit">Generate PDF</button>
</form>
</body>
</html>

View File

View File

@@ -0,0 +1,41 @@
"""
Test suite for generated openapi schema.
"""
import json
from io import StringIO
from django.core.management import call_command
from django.test import Client
import pytest
pytestmark = pytest.mark.django_db
def test_openapi_client_schema():
"""
Generated and served OpenAPI client schema should be correct.
"""
# Start by generating the swagger.json file
output = StringIO()
call_command(
"spectacular",
"--api-version",
"v1.0",
"--urlconf",
"publish.api_urls",
"--format",
"openapi-json",
"--file",
"core/tests/swagger/swagger.json",
stdout=output,
)
assert output.getvalue() == ""
response = Client().get("/v1.0/swagger.json")
assert response.status_code == 200
with open(
"core/tests/swagger/swagger.json", "r", encoding="utf-8"
) as expected_schema:
assert response.json() == json.load(expected_schema)

View File

@@ -0,0 +1,50 @@
"""
Tests for Teams API endpoint in publish's core app: create
"""
import pytest
from rest_framework.test import APIClient
from rest_framework_simplejwt.tokens import AccessToken
from core.factories import IdentityFactory, TeamFactory, UserFactory
from core.models import Team
from ..utils import OIDCToken
pytestmark = pytest.mark.django_db
def test_api_teams_create_anonymous():
"""Anonymous users should not be allowed to create teams."""
response = APIClient().post(
"/api/v1.0/teams/",
{
"name": "my team",
},
)
assert response.status_code == 401
assert not Team.objects.exists()
def test_api_teams_create_authenticated():
"""
Authenticated users should be able to create teams and should automatically be declared
as the owner of the newly created team.
"""
identity = IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
response = APIClient().post(
"/api/v1.0/teams/",
{
"name": "my team",
},
format="json",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 201
team = Team.objects.get()
assert team.name == "my team"
assert team.accesses.filter(role="owner", user=user).exists()

View File

@@ -0,0 +1,107 @@
"""
Tests for Teams API endpoint in publish's core app: delete
"""
import pytest
from rest_framework.test import APIClient
from rest_framework_simplejwt.tokens import AccessToken
from core import factories, models
from ..utils import OIDCToken
pytestmark = pytest.mark.django_db
def test_api_teams_delete_anonymous():
"""Anonymous users should not be allowed to destroy a team."""
team = factories.TeamFactory()
response = APIClient().delete(
f"/api/v1.0/teams/{team.id!s}/",
)
assert response.status_code == 401
assert models.Team.objects.count() == 1
def test_api_teams_delete_authenticated_unrelated():
"""
Authenticated users should not be allowed to delete a team to which they are not
related.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory()
response = APIClient().delete(
f"/api/v1.0/teams/{team.id!s}/", HTTP_AUTHORIZATION=f"Bearer {jwt_token}"
)
assert response.status_code == 404
assert response.json() == {"detail": "Not found."}
assert models.Team.objects.count() == 1
def test_api_teams_delete_authenticated_member():
"""
Authenticated users should not be allowed to delete a team for which they are
only a member.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory(users=[(user, "member")])
response = APIClient().delete(
f"/api/v1.0/teams/{team.id}/", HTTP_AUTHORIZATION=f"Bearer {jwt_token}"
)
assert response.status_code == 403
assert response.json() == {
"detail": "You do not have permission to perform this action."
}
assert models.Team.objects.count() == 1
def test_api_teams_delete_authenticated_administrator():
"""
Authenticated users should not be allowed to delete a team for which they are
administrator.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory(users=[(user, "administrator")])
response = APIClient().delete(
f"/api/v1.0/teams/{team.id}/", HTTP_AUTHORIZATION=f"Bearer {jwt_token}"
)
assert response.status_code == 403
assert response.json() == {
"detail": "You do not have permission to perform this action."
}
assert models.Team.objects.count() == 1
def test_api_teams_delete_authenticated_owner():
"""
Authenticated users should be able to delete a team for which they are directly
owner.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory(users=[(user, "owner")])
response = APIClient().delete(
f"/api/v1.0/teams/{team.id}/", HTTP_AUTHORIZATION=f"Bearer {jwt_token}"
)
assert response.status_code == 204
assert models.Team.objects.exists() is False

View File

@@ -0,0 +1,118 @@
"""
Tests for Teams API endpoint in publish's core app: list
"""
from unittest import mock
import pytest
from rest_framework.pagination import PageNumberPagination
from rest_framework.status import HTTP_200_OK, HTTP_401_UNAUTHORIZED
from rest_framework.test import APIClient
from core import factories, models
from core.api import serializers
from ..utils import OIDCToken
pytestmark = pytest.mark.django_db
def test_api_teams_list_anonymous():
"""Anonymous users should not be allowed to list teams."""
factories.TeamFactory.create_batch(2)
response = APIClient().get("/api/v1.0/teams/")
assert response.status_code == HTTP_401_UNAUTHORIZED
assert response.json() == {
"detail": "Authentication credentials were not provided."
}
def test_api_teams_list_authenticated():
"""Authenticated users should be able to list teams they are an owner/administrator/member of."""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
expected_ids = {
str(access.team.id)
for access in factories.TeamAccessFactory.create_batch(5, user=user)
}
factories.TeamFactory.create_batch(2) # Other teams
response = APIClient().get(
"/api/v1.0/teams/", HTTP_AUTHORIZATION=f"Bearer {jwt_token}"
)
assert response.status_code == HTTP_200_OK
results = response.json()["results"]
assert len(results) == 5
results_id = {result["id"] for result in results}
assert expected_ids == results_id
@mock.patch.object(PageNumberPagination, "get_page_size", return_value=2)
def test_api_teams_list_pagination(
_mock_page_size,
):
"""Pagination should work as expected."""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team_ids = [
str(access.team.id)
for access in factories.TeamAccessFactory.create_batch(3, user=user)
]
# Get page 1
response = APIClient().get(
"/api/v1.0/teams/", HTTP_AUTHORIZATION=f"Bearer {jwt_token}"
)
assert response.status_code == HTTP_200_OK
content = response.json()
assert content["count"] == 3
assert content["next"] == "http://testserver/api/v1.0/teams/?page=2"
assert content["previous"] is None
assert len(content["results"]) == 2
for item in content["results"]:
team_ids.remove(item["id"])
# Get page 2
response = APIClient().get(
"/api/v1.0/teams/?page=2", HTTP_AUTHORIZATION=f"Bearer {jwt_token}"
)
assert response.status_code == HTTP_200_OK
content = response.json()
assert content["count"] == 3
assert content["next"] is None
assert content["previous"] == "http://testserver/api/v1.0/teams/"
assert len(content["results"]) == 1
team_ids.remove(content["results"][0]["id"])
assert team_ids == []
def test_api_teams_list_authenticated_distinct():
"""A team with several related users should only be listed once."""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
other_user = factories.UserFactory()
team = factories.TeamFactory(users=[user, other_user])
response = APIClient().get(
"/api/v1.0/teams/", HTTP_AUTHORIZATION=f"Bearer {jwt_token}"
)
assert response.status_code == HTTP_200_OK
content = response.json()
assert len(content["results"]) == 1
assert content["results"][0]["id"] == str(team.id)

View File

@@ -0,0 +1,86 @@
"""
Tests for Teams API endpoint in publish's core app: retrieve
"""
import random
from collections import Counter
from unittest import mock
import pytest
from rest_framework.test import APIClient
from core import factories
from ..utils import OIDCToken
pytestmark = pytest.mark.django_db
def test_api_teams_retrieve_anonymous():
"""Anonymous users should not be allowed to retrieve a team."""
team = factories.TeamFactory()
response = APIClient().get(f"/api/v1.0/teams/{team.id}/")
assert response.status_code == 401
assert response.json() == {
"detail": "Authentication credentials were not provided."
}
def test_api_teams_retrieve_authenticated_unrelated():
"""
Authenticated users should not be allowed to retrieve a team to which they are
not related.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory()
response = APIClient().get(
f"/api/v1.0/teams/{team.id!s}/", HTTP_AUTHORIZATION=f"Bearer {jwt_token}"
)
assert response.status_code == 404
assert response.json() == {"detail": "Not found."}
def test_api_teams_retrieve_authenticated_related():
"""
Authenticated users should be allowed to retrieve a team to which they
are related whatever the role.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory()
access1 = factories.TeamAccessFactory(team=team, user=user)
access2 = factories.TeamAccessFactory(team=team)
response = APIClient().get(
f"/api/v1.0/teams/{team.id!s}/", HTTP_AUTHORIZATION=f"Bearer {jwt_token}"
)
assert response.status_code == 200
content = response.json()
assert sorted(content.pop("accesses"), key=lambda x: x["user"]) == sorted(
[
{
"id": str(access1.id),
"user": str(user.id),
"role": access1.role,
"abilities": access1.get_abilities(user),
},
{
"id": str(access2.id),
"user": str(access2.user.id),
"role": access2.role,
"abilities": access2.get_abilities(user),
},
],
key=lambda x: x["user"],
)
assert response.json() == {
"id": str(team.id),
"name": team.name,
"abilities": team.get_abilities(user),
}

View File

@@ -0,0 +1,176 @@
"""
Tests for Teams API endpoint in publish's core app: update
"""
import random
import pytest
from rest_framework.test import APIClient
from rest_framework_simplejwt.tokens import AccessToken
from core import factories, models
from core.api import serializers
from ..utils import OIDCToken
pytestmark = pytest.mark.django_db
def test_api_teams_update_anonymous():
"""Anonymous users should not be allowed to update a team."""
team = factories.TeamFactory()
old_team_values = serializers.TeamSerializer(instance=team).data
new_team_values = serializers.TeamSerializer(instance=factories.TeamFactory()).data
response = APIClient().put(
f"/api/v1.0/teams/{team.id!s}/",
new_team_values,
format="json",
)
assert response.status_code == 401
assert response.json() == {
"detail": "Authentication credentials were not provided."
}
team.refresh_from_db()
team_values = serializers.TeamSerializer(instance=team).data
assert team_values == old_team_values
def test_api_teams_update_authenticated_unrelated():
"""
Authenticated users should not be allowed to update a team to which they are not related.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory()
old_team_values = serializers.TeamSerializer(instance=team).data
new_team_values = serializers.TeamSerializer(instance=factories.TeamFactory()).data
response = APIClient().put(
f"/api/v1.0/teams/{team.id!s}/",
new_team_values,
format="json",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 404
assert response.json() == {"detail": "Not found."}
team.refresh_from_db()
team_values = serializers.TeamSerializer(instance=team).data
assert team_values == old_team_values
def test_api_teams_update_authenticated_members():
"""
Users who are members of a team but not administrators should
not be allowed to update it.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory(users=[(user, "member")])
old_team_values = serializers.TeamSerializer(instance=team).data
new_team_values = serializers.TeamSerializer(instance=factories.TeamFactory()).data
response = APIClient().put(
f"/api/v1.0/teams/{team.id!s}/",
new_team_values,
format="json",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 403
assert response.json() == {
"detail": "You do not have permission to perform this action."
}
team.refresh_from_db()
team_values = serializers.TeamSerializer(instance=team).data
assert team_values == old_team_values
def test_api_teams_update_authenticated_administrators():
"""Administrators of a team should be allowed to update it."""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory(users=[(user, "administrator")])
old_team_values = serializers.TeamSerializer(instance=team).data
new_team_values = serializers.TeamSerializer(instance=factories.TeamFactory()).data
response = APIClient().put(
f"/api/v1.0/teams/{team.id!s}/",
new_team_values,
format="json",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 200
team.refresh_from_db()
team_values = serializers.TeamSerializer(instance=team).data
for key, value in team_values.items():
if key in ["id", "accesses"]:
assert value == old_team_values[key]
else:
assert value == new_team_values[key]
def test_api_teams_update_authenticated_owners():
"""Administrators of a team should be allowed to update it."""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory(users=[(user, "owner")])
old_team_values = serializers.TeamSerializer(instance=team).data
new_team_values = serializers.TeamSerializer(instance=factories.TeamFactory()).data
response = APIClient().put(
f"/api/v1.0/teams/{team.id!s}/",
new_team_values,
format="json",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 200
team.refresh_from_db()
team_values = serializers.TeamSerializer(instance=team).data
for key, value in team_values.items():
if key in ["id", "accesses"]:
assert value == old_team_values[key]
else:
assert value == new_team_values[key]
def test_api_teams_update_administrator_or_owner_of_another():
"""
Being administrator or owner of a team should not grant authorization to update
another team.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
factories.TeamFactory(users=[(user, random.choice(["administrator", "owner"]))])
team = factories.TeamFactory(name="Old name")
old_team_values = serializers.TeamSerializer(instance=team).data
new_team_values = serializers.TeamSerializer(instance=factories.TeamFactory()).data
response = APIClient().put(
f"/api/v1.0/teams/{team.id!s}/",
new_team_values,
format="json",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 404
assert response.json() == {"detail": "Not found."}
team.refresh_from_db()
team_values = serializers.TeamSerializer(instance=team).data
assert team_values == old_team_values

View File

@@ -0,0 +1,843 @@
"""
Test team accesses API endpoints for users in publish's core app.
"""
import random
from uuid import uuid4
import pytest
from rest_framework.test import APIClient
from core import factories, models
from core.api import serializers
from .utils import OIDCToken
pytestmark = pytest.mark.django_db
def test_api_team_accesses_list_anonymous():
"""Anonymous users should not be allowed to list team accesses."""
team = factories.TeamFactory()
factories.TeamAccessFactory.create_batch(2, team=team)
response = APIClient().get(f"/api/v1.0/teams/{team.id!s}/accesses/")
assert response.status_code == 401
assert response.json() == {
"detail": "Authentication credentials were not provided."
}
def test_api_team_accesses_list_authenticated_unrelated():
"""
Authenticated users should not be allowed to list team accesses for a team
to which they are not related.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory()
factories.TeamAccessFactory.create_batch(3, team=team)
# Accesses for other teams to which the user is related should not be listed either
other_access = factories.TeamAccessFactory(user=user)
factories.TeamAccessFactory(team=other_access.team)
response = APIClient().get(
f"/api/v1.0/teams/{team.id!s}/accesses/",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 200
assert response.json() == {
"count": 0,
"next": None,
"previous": None,
"results": [],
}
def test_api_team_accesses_list_authenticated_related():
"""
Authenticated users should be able to list team accesses for a team
to which they are related, whatever their role in the team.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory()
user_access = models.TeamAccess.objects.create(team=team, user=user) # random role
access1, access2 = factories.TeamAccessFactory.create_batch(2, team=team)
# Accesses for other teams to which the user is related should not be listed either
other_access = factories.TeamAccessFactory(user=user)
factories.TeamAccessFactory(team=other_access.team)
response = APIClient().get(
f"/api/v1.0/teams/{team.id!s}/accesses/",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 200
content = response.json()
assert len(content["results"]) == 3
assert sorted(content["results"], key=lambda x: x["id"]) == sorted(
[
{
"id": str(user_access.id),
"user": str(user.id),
"role": user_access.role,
"abilities": user_access.get_abilities(user),
},
{
"id": str(access1.id),
"user": str(access1.user.id),
"role": access1.role,
"abilities": access1.get_abilities(user),
},
{
"id": str(access2.id),
"user": str(access2.user.id),
"role": access2.role,
"abilities": access2.get_abilities(user),
},
],
key=lambda x: x["id"],
)
def test_api_team_accesses_retrieve_anonymous():
"""
Anonymous users should not be allowed to retrieve a team access.
"""
access = factories.TeamAccessFactory()
response = APIClient().get(
f"/api/v1.0/teams/{access.team.id!s}/accesses/{access.id!s}/",
)
assert response.status_code == 401
assert response.json() == {
"detail": "Authentication credentials were not provided."
}
def test_api_team_accesses_retrieve_authenticated_unrelated():
"""
Authenticated users should not be allowed to retrieve a team access for
a team to which they are not related.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory()
access = factories.TeamAccessFactory(team=team)
response = APIClient().get(
f"/api/v1.0/teams/{team.id!s}/accesses/{access.id!s}/",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 403
assert response.json() == {
"detail": "You do not have permission to perform this action."
}
# Accesses related to another team should be excluded even if the user is related to it
for access in [
factories.TeamAccessFactory(),
factories.TeamAccessFactory(user=user),
]:
response = APIClient().get(
f"/api/v1.0/teams/{team.id!s}/accesses/{access.id!s}/",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 404
assert response.json() == {"detail": "Not found."}
def test_api_team_accesses_retrieve_authenticated_related():
"""
A user who is related to a team should be allowed to retrieve the
associated team user accesses.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory(users=[user])
access = factories.TeamAccessFactory(team=team)
response = APIClient().get(
f"/api/v1.0/teams/{team.id!s}/accesses/{access.id!s}/",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 200
assert response.json() == {
"id": str(access.id),
"user": str(access.user.id),
"role": access.role,
"abilities": access.get_abilities(user),
}
def test_api_team_accesses_create_anonymous():
"""Anonymous users should not be allowed to create team accesses."""
user = factories.UserFactory()
team = factories.TeamFactory()
response = APIClient().post(
f"/api/v1.0/teams/{team.id!s}/accesses/",
{
"user": str(user.id),
"team": str(team.id),
"role": random.choice(models.RoleChoices.choices)[0],
},
format="json",
)
assert response.status_code == 401
assert response.json() == {
"detail": "Authentication credentials were not provided."
}
assert models.TeamAccess.objects.exists() is False
def test_api_team_accesses_create_authenticated_unrelated():
"""
Authenticated users should not be allowed to create team accesses for a team to
which they are not related.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
other_user = factories.UserFactory()
team = factories.TeamFactory()
response = APIClient().post(
f"/api/v1.0/teams/{team.id!s}/accesses/",
{
"user": str(other_user.id),
},
format="json",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 403
assert response.json() == {
"detail": "You are not allowed to manage accesses for this team."
}
assert not models.TeamAccess.objects.filter(user=other_user).exists()
def test_api_team_accesses_create_authenticated_member():
"""Members of a team should not be allowed to create team accesses."""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory(users=[(user, "member")])
other_user = factories.UserFactory()
api_client = APIClient()
for role in [role[0] for role in models.RoleChoices.choices]:
response = api_client.post(
f"/api/v1.0/teams/{team.id!s}/accesses/",
{
"user": str(other_user.id),
"role": role,
},
format="json",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 403
assert response.json() == {
"detail": "You are not allowed to manage accesses for this team."
}
assert not models.TeamAccess.objects.filter(user=other_user).exists()
def test_api_team_accesses_create_authenticated_administrator():
"""
Administrators of a team should be able to create team accesses except for the "owner" role.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory(users=[(user, "administrator")])
other_user = factories.UserFactory()
api_client = APIClient()
# It should not be allowed to create an owner access
response = api_client.post(
f"/api/v1.0/teams/{team.id!s}/accesses/",
{
"user": str(other_user.id),
"role": "owner",
},
format="json",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 403
assert response.json() == {
"detail": "Only owners of a team can assign other users as owners."
}
# It should be allowed to create a lower access
role = random.choice(
[role[0] for role in models.RoleChoices.choices if role[0] != "owner"]
)
response = api_client.post(
f"/api/v1.0/teams/{team.id!s}/accesses/",
{
"user": str(other_user.id),
"role": role,
},
format="json",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 201
assert models.TeamAccess.objects.filter(user=other_user).count() == 1
new_team_access = models.TeamAccess.objects.filter(user=other_user).get()
assert response.json() == {
"abilities": new_team_access.get_abilities(user),
"id": str(new_team_access.id),
"role": role,
"user": str(other_user.id),
}
def test_api_team_accesses_create_authenticated_owner():
"""
Owners of a team should be able to create team accesses whatever the role.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory(users=[(user, "owner")])
other_user = factories.UserFactory()
role = random.choice([role[0] for role in models.RoleChoices.choices])
response = APIClient().post(
f"/api/v1.0/teams/{team.id!s}/accesses/",
{
"user": str(other_user.id),
"role": role,
},
format="json",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 201
assert models.TeamAccess.objects.filter(user=other_user).count() == 1
new_team_access = models.TeamAccess.objects.filter(user=other_user).get()
assert response.json() == {
"abilities": new_team_access.get_abilities(user),
"id": str(new_team_access.id),
"role": role,
"user": str(other_user.id),
}
def test_api_team_accesses_update_anonymous():
"""Anonymous users should not be allowed to update a team access."""
access = factories.TeamAccessFactory()
old_values = serializers.TeamAccessSerializer(instance=access).data
new_values = {
"id": uuid4(),
"user": factories.UserFactory().id,
"role": random.choice(models.RoleChoices.choices)[0],
}
api_client = APIClient()
for field, value in new_values.items():
response = api_client.put(
f"/api/v1.0/teams/{access.team.id!s}/accesses/{access.id!s}/",
{**old_values, field: value},
format="json",
)
assert response.status_code == 401
access.refresh_from_db()
updated_values = serializers.TeamAccessSerializer(instance=access).data
assert updated_values == old_values
def test_api_team_accesses_update_authenticated_unrelated():
"""
Authenticated users should not be allowed to update a team access for a team to which
they are not related.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
access = factories.TeamAccessFactory()
old_values = serializers.TeamAccessSerializer(instance=access).data
new_values = {
"id": uuid4(),
"user": factories.UserFactory().id,
"role": random.choice(models.RoleChoices.choices)[0],
}
api_client = APIClient()
for field, value in new_values.items():
response = api_client.put(
f"/api/v1.0/teams/{access.team.id!s}/accesses/{access.id!s}/",
{**old_values, field: value},
format="json",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 403
access.refresh_from_db()
updated_values = serializers.TeamAccessSerializer(instance=access).data
assert updated_values == old_values
def test_api_team_accesses_update_authenticated_member():
"""Members of a team should not be allowed to update its accesses."""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory(users=[(user, "member")])
access = factories.TeamAccessFactory(team=team)
old_values = serializers.TeamAccessSerializer(instance=access).data
new_values = {
"id": uuid4(),
"user": factories.UserFactory().id,
"role": random.choice(models.RoleChoices.choices)[0],
}
api_client = APIClient()
for field, value in new_values.items():
response = api_client.put(
f"/api/v1.0/teams/{access.team.id!s}/accesses/{access.id!s}/",
{**old_values, field: value},
format="json",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 403
access.refresh_from_db()
updated_values = serializers.TeamAccessSerializer(instance=access).data
assert updated_values == old_values
def test_api_team_accesses_update_administrator_except_owner():
"""
A user who is an administrator in a team should be allowed to update a user
access for this team, as long as they don't try to set the role to owner.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory(users=[(user, "administrator")])
access = factories.TeamAccessFactory(
team=team,
role=random.choice(["administrator", "member"]),
)
old_values = serializers.TeamAccessSerializer(instance=access).data
new_values = {
"id": uuid4(),
"user_id": factories.UserFactory().id,
"role": random.choice(["administrator", "member"]),
}
api_client = APIClient()
for field, value in new_values.items():
new_data = {**old_values, field: value}
response = api_client.put(
f"/api/v1.0/teams/{team.id!s}/accesses/{access.id!s}/",
data=new_data,
format="json",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
if (
new_data["role"] == old_values["role"]
): # we are not really updating the role
assert response.status_code == 403
else:
assert response.status_code == 200
access.refresh_from_db()
updated_values = serializers.TeamAccessSerializer(instance=access).data
if field == "role":
assert updated_values == {**old_values, "role": new_values["role"]}
else:
assert updated_values == old_values
def test_api_team_accesses_update_administrator_from_owner():
"""
A user who is an administrator in a team, should not be allowed to update
the user access of an "owner" for this team.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory(users=[(user, "administrator")])
other_user = factories.UserFactory()
access = factories.TeamAccessFactory(team=team, user=other_user, role="owner")
old_values = serializers.TeamAccessSerializer(instance=access).data
new_values = {
"id": uuid4(),
"user_id": factories.UserFactory().id,
"role": random.choice(models.RoleChoices.choices)[0],
}
api_client = APIClient()
for field, value in new_values.items():
response = api_client.put(
f"/api/v1.0/teams/{team.id!s}/accesses/{access.id!s}/",
data={**old_values, field: value},
format="json",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 403
access.refresh_from_db()
updated_values = serializers.TeamAccessSerializer(instance=access).data
assert updated_values == old_values
def test_api_team_accesses_update_administrator_to_owner():
"""
A user who is an administrator in a team, should not be allowed to update
the user access of another user to grant team ownership.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory(users=[(user, "administrator")])
other_user = factories.UserFactory()
access = factories.TeamAccessFactory(
team=team,
user=other_user,
role=random.choice(["administrator", "member"]),
)
old_values = serializers.TeamAccessSerializer(instance=access).data
new_values = {
"id": uuid4(),
"user_id": factories.UserFactory().id,
"role": "owner",
}
api_client = APIClient()
for field, value in new_values.items():
new_data = {**old_values, field: value}
response = api_client.put(
f"/api/v1.0/teams/{team.id!s}/accesses/{access.id!s}/",
data=new_data,
format="json",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
# We are not allowed or not really updating the role
if field == "role" or new_data["role"] == old_values["role"]:
assert response.status_code == 403
else:
assert response.status_code == 200
access.refresh_from_db()
updated_values = serializers.TeamAccessSerializer(instance=access).data
assert updated_values == old_values
def test_api_team_accesses_update_owner_except_owner():
"""
A user who is an owner in a team should be allowed to update
a user access for this team except for existing "owner" accesses.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory(users=[(user, "owner")])
factories.UserFactory()
access = factories.TeamAccessFactory(
team=team,
role=random.choice(["administrator", "member"]),
)
old_values = serializers.TeamAccessSerializer(instance=access).data
new_values = {
"id": uuid4(),
"user_id": factories.UserFactory().id,
"role": random.choice(models.RoleChoices.choices)[0],
}
api_client = APIClient()
for field, value in new_values.items():
new_data = {**old_values, field: value}
response = api_client.put(
f"/api/v1.0/teams/{team.id!s}/accesses/{access.id!s}/",
data=new_data,
format="json",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
if (
new_data["role"] == old_values["role"]
): # we are not really updating the role
assert response.status_code == 403
else:
assert response.status_code == 200
access.refresh_from_db()
updated_values = serializers.TeamAccessSerializer(instance=access).data
if field == "role":
assert updated_values == {**old_values, "role": new_values["role"]}
else:
assert updated_values == old_values
def test_api_team_accesses_update_owner_for_owners():
"""
A user who is "owner" of a team should not be allowed to update
an existing owner access for this team.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory(users=[(user, "owner")])
access = factories.TeamAccessFactory(team=team, role="owner")
old_values = serializers.TeamAccessSerializer(instance=access).data
new_values = {
"id": uuid4(),
"user_id": factories.UserFactory().id,
"role": random.choice(models.RoleChoices.choices)[0],
}
api_client = APIClient()
for field, value in new_values.items():
response = api_client.put(
f"/api/v1.0/teams/{team.id!s}/accesses/{access.id!s}/",
data={**old_values, field: value},
content_type="application/json",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 403
access.refresh_from_db()
updated_values = serializers.TeamAccessSerializer(instance=access).data
assert updated_values == old_values
def test_api_team_accesses_update_owner_self():
"""
A user who is owner of a team should be allowed to update
their own user access provided there are other owners in the team.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory()
access = factories.TeamAccessFactory(team=team, user=user, role="owner")
old_values = serializers.TeamAccessSerializer(instance=access).data
new_role = random.choice(["administrator", "member"])
api_client = APIClient()
response = api_client.put(
f"/api/v1.0/teams/{team.id!s}/accesses/{access.id!s}/",
data={**old_values, "role": new_role},
format="json",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 403
access.refresh_from_db()
assert access.role == "owner"
# Add another owner and it should now work
factories.TeamAccessFactory(team=team, role="owner")
response = api_client.put(
f"/api/v1.0/teams/{team.id!s}/accesses/{access.id!s}/",
data={**old_values, "role": new_role},
format="json",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 200
access.refresh_from_db()
assert access.role == new_role
# Delete
def test_api_team_accesses_delete_anonymous():
"""Anonymous users should not be allowed to destroy a team access."""
access = factories.TeamAccessFactory()
response = APIClient().delete(
f"/api/v1.0/teams/{access.team.id!s}/accesses/{access.id!s}/",
)
assert response.status_code == 401
assert models.TeamAccess.objects.count() == 1
def test_api_team_accesses_delete_authenticated():
"""
Authenticated users should not be allowed to delete a team access for a
team to which they are not related.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
access = factories.TeamAccessFactory()
response = APIClient().delete(
f"/api/v1.0/teams/{access.team.id!s}/accesses/{access.id!s}/",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 403
assert models.TeamAccess.objects.count() == 1
def test_api_team_accesses_delete_member():
"""
Authenticated users should not be allowed to delete a team access for a
team in which they are a simple member.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory(users=[(user, "member")])
access = factories.TeamAccessFactory(team=team)
assert models.TeamAccess.objects.count() == 2
assert models.TeamAccess.objects.filter(user=access.user).exists()
response = APIClient().delete(
f"/api/v1.0/teams/{team.id!s}/accesses/{access.id!s}/",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 403
assert models.TeamAccess.objects.count() == 2
def test_api_team_accesses_delete_administrators():
"""
Users who are administrators in a team should be allowed to delete an access
from the team provided it is not ownership.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory(users=[(user, "administrator")])
access = factories.TeamAccessFactory(
team=team, role=random.choice(["member", "administrator"])
)
assert models.TeamAccess.objects.count() == 2
assert models.TeamAccess.objects.filter(user=access.user).exists()
response = APIClient().delete(
f"/api/v1.0/teams/{team.id!s}/accesses/{access.id!s}/",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 204
assert models.TeamAccess.objects.count() == 1
def test_api_team_accesses_delete_owners_except_owners():
"""
Users should be able to delete the team access of another user
for a team of which they are owner provided it is not an owner access.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory(users=[(user, "owner")])
access = factories.TeamAccessFactory(
team=team, role=random.choice(["member", "administrator"])
)
assert models.TeamAccess.objects.count() == 2
assert models.TeamAccess.objects.filter(user=access.user).exists()
response = APIClient().delete(
f"/api/v1.0/teams/{team.id!s}/accesses/{access.id!s}/",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 204
assert models.TeamAccess.objects.count() == 1
def test_api_team_accesses_delete_owners_for_owners():
"""
Users should not be allowed to delete the team access of another owner
even for a team in which they are direct owner.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory(users=[(user, "owner")])
access = factories.TeamAccessFactory(team=team, role="owner")
assert models.TeamAccess.objects.count() == 2
assert models.TeamAccess.objects.filter(user=access.user).exists()
response = APIClient().delete(
f"/api/v1.0/teams/{team.id!s}/accesses/{access.id!s}/",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 403
assert models.TeamAccess.objects.count() == 2
def test_api_team_accesses_delete_owners_last_owner():
"""
It should not be possible to delete the last owner access from a team
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
team = factories.TeamFactory()
access = factories.TeamAccessFactory(team=team, user=user, role="owner")
assert models.TeamAccess.objects.count() == 1
response = APIClient().delete(
f"/api/v1.0/teams/{team.id!s}/accesses/{access.id!s}/",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 403
assert models.TeamAccess.objects.count() == 1

View File

@@ -0,0 +1,381 @@
"""
Test users API endpoints in the publish core app.
"""
import pytest
from rest_framework.test import APIClient
from core import factories, models
from core.api import serializers
from .utils import OIDCToken
pytestmark = pytest.mark.django_db
def test_api_users_list_anonymous():
"""Anonymous users should not be allowed to list users."""
factories.UserFactory()
client = APIClient()
response = client.get("/api/v1.0/users/")
assert response.status_code == 404
assert "Not Found" in response.content.decode("utf-8")
def test_api_users_list_authenticated():
"""
Authenticated users should not be able to list users.
"""
identity = factories.IdentityFactory()
jwt_token = OIDCToken.for_user(identity.user)
factories.UserFactory.create_batch(2)
response = APIClient().get(
"/api/v1.0/users/", HTTP_AUTHORIZATION=f"Bearer {jwt_token}"
)
assert response.status_code == 404
assert "Not Found" in response.content.decode("utf-8")
def test_api_users_retrieve_me_anonymous():
"""Anonymous users should not be allowed to list users."""
factories.UserFactory.create_batch(2)
client = APIClient()
response = client.get("/api/v1.0/users/me/")
assert response.status_code == 401
assert response.json() == {
"detail": "Authentication credentials were not provided."
}
def test_api_users_retrieve_me_authenticated():
"""Authenticated users should be able to retrieve their own user via the "/users/me" path."""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
# Define profile contact
contact = factories.ContactFactory(owner=user)
user.profile_contact = contact
user.save()
factories.UserFactory.create_batch(2)
response = APIClient().get(
"/api/v1.0/users/me/", HTTP_AUTHORIZATION=f"Bearer {jwt_token}"
)
assert response.status_code == 200
assert response.json() == {
"id": str(user.id),
"language": user.language,
"timezone": str(user.timezone),
"is_device": False,
"is_staff": False,
"data": user.profile_contact.data,
}
def test_api_users_retrieve_anonymous():
"""Anonymous users should not be allowed to retrieve a user."""
client = APIClient()
user = factories.UserFactory()
response = client.get(f"/api/v1.0/users/{user.id!s}/")
assert response.status_code == 401
assert response.json() == {
"detail": "Authentication credentials were not provided."
}
def test_api_users_retrieve_authenticated_self():
"""
Authenticated users should be allowed to retrieve their own user.
The returned object should not contain the password.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
response = APIClient().get(
f"/api/v1.0/users/{user.id!s}/", HTTP_AUTHORIZATION=f"Bearer {jwt_token}"
)
assert response.status_code == 405
assert response.json() == {"detail": 'Method "GET" not allowed.'}
def test_api_users_retrieve_authenticated_other():
"""
Authenticated users should be able to retrieve another user's detail view with
limited information.
"""
identity = factories.IdentityFactory()
jwt_token = OIDCToken.for_user(identity.user)
other_user = factories.UserFactory()
response = APIClient().get(
f"/api/v1.0/users/{other_user.id!s}/", HTTP_AUTHORIZATION=f"Bearer {jwt_token}"
)
assert response.status_code == 405
assert response.json() == {"detail": 'Method "GET" not allowed.'}
def test_api_users_create_anonymous():
"""Anonymous users should not be able to create users via the API."""
response = APIClient().post(
"/api/v1.0/users/",
{
"language": "fr-fr",
"password": "mypassword",
},
)
assert response.status_code == 404
assert "Not Found" in response.content.decode("utf-8")
assert models.User.objects.exists() is False
def test_api_users_create_authenticated():
"""Authenticated users should not be able to create users via the API."""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
response = APIClient().post(
"/api/v1.0/users/",
{
"language": "fr-fr",
"password": "mypassword",
},
format="json",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 404
assert "Not Found" in response.content.decode("utf-8")
assert models.User.objects.exclude(id=user.id).exists() is False
def test_api_users_update_anonymous():
"""Anonymous users should not be able to update users via the API."""
user = factories.UserFactory()
old_user_values = dict(serializers.UserSerializer(instance=user).data)
new_user_values = serializers.UserSerializer(instance=factories.UserFactory()).data
response = APIClient().put(
f"/api/v1.0/users/{user.id!s}/",
new_user_values,
format="json",
)
assert response.status_code == 401
assert response.json() == {
"detail": "Authentication credentials were not provided."
}
user.refresh_from_db()
user_values = dict(serializers.UserSerializer(instance=user).data)
for key, value in user_values.items():
assert value == old_user_values[key]
def test_api_users_update_authenticated_self():
"""
Authenticated users should be able to update their own user but only "language"
and "timezone" fields.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
old_user_values = dict(serializers.UserSerializer(instance=user).data)
new_user_values = dict(
serializers.UserSerializer(instance=factories.UserFactory()).data
)
response = APIClient().put(
f"/api/v1.0/users/{user.id!s}/",
new_user_values,
format="json",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 200
user.refresh_from_db()
user_values = dict(serializers.UserSerializer(instance=user).data)
for key, value in user_values.items():
if key in ["language", "timezone"]:
assert value == new_user_values[key]
else:
assert value == old_user_values[key]
def test_api_users_update_authenticated_other():
"""Authenticated users should not be allowed to update other users."""
identity = factories.IdentityFactory()
jwt_token = OIDCToken.for_user(identity.user)
user = factories.UserFactory()
old_user_values = dict(serializers.UserSerializer(instance=user).data)
new_user_values = serializers.UserSerializer(instance=factories.UserFactory()).data
response = APIClient().put(
f"/api/v1.0/users/{user.id!s}/",
new_user_values,
format="json",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 403
user.refresh_from_db()
user_values = dict(serializers.UserSerializer(instance=user).data)
for key, value in user_values.items():
assert value == old_user_values[key]
def test_api_users_patch_anonymous():
"""Anonymous users should not be able to patch users via the API."""
user = factories.UserFactory()
old_user_values = dict(serializers.UserSerializer(instance=user).data)
new_user_values = dict(
serializers.UserSerializer(instance=factories.UserFactory()).data
)
for key, new_value in new_user_values.items():
response = APIClient().patch(
f"/api/v1.0/users/{user.id!s}/",
{key: new_value},
format="json",
)
assert response.status_code == 401
assert response.json() == {
"detail": "Authentication credentials were not provided."
}
user.refresh_from_db()
user_values = dict(serializers.UserSerializer(instance=user).data)
for key, value in user_values.items():
assert value == old_user_values[key]
def test_api_users_patch_authenticated_self():
"""
Authenticated users should be able to patch their own user but only "language"
and "timezone" fields.
"""
identity = factories.IdentityFactory()
user = identity.user
jwt_token = OIDCToken.for_user(user)
old_user_values = dict(serializers.UserSerializer(instance=user).data)
new_user_values = dict(
serializers.UserSerializer(instance=factories.UserFactory()).data
)
for key, new_value in new_user_values.items():
response = APIClient().patch(
f"/api/v1.0/users/{user.id!s}/",
{key: new_value},
format="json",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 200
user.refresh_from_db()
user_values = dict(serializers.UserSerializer(instance=user).data)
for key, value in user_values.items():
if key in ["language", "timezone"]:
assert value == new_user_values[key]
else:
assert value == old_user_values[key]
def test_api_users_patch_authenticated_other():
"""Authenticated users should not be allowed to patch other users."""
identity = factories.IdentityFactory()
jwt_token = OIDCToken.for_user(identity.user)
user = factories.UserFactory()
old_user_values = dict(serializers.UserSerializer(instance=user).data)
new_user_values = dict(
serializers.UserSerializer(instance=factories.UserFactory()).data
)
for key, new_value in new_user_values.items():
response = APIClient().put(
f"/api/v1.0/users/{user.id!s}/",
{key: new_value},
format="json",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 403
user.refresh_from_db()
user_values = dict(serializers.UserSerializer(instance=user).data)
for key, value in user_values.items():
assert value == old_user_values[key]
def test_api_users_delete_list_anonymous():
"""Anonymous users should not be allowed to delete a list of users."""
factories.UserFactory.create_batch(2)
client = APIClient()
response = client.delete("/api/v1.0/users/")
assert response.status_code == 404
assert models.User.objects.count() == 2
def test_api_users_delete_list_authenticated():
"""Authenticated users should not be allowed to delete a list of users."""
factories.UserFactory.create_batch(2)
identity = factories.IdentityFactory()
jwt_token = OIDCToken.for_user(identity.user)
client = APIClient()
response = client.delete(
"/api/v1.0/users/", HTTP_AUTHORIZATION=f"Bearer {jwt_token}"
)
assert response.status_code == 404
assert models.User.objects.count() == 3
def test_api_users_delete_anonymous():
"""Anonymous users should not be allowed to delete a user."""
user = factories.UserFactory()
response = APIClient().delete(f"/api/v1.0/users/{user.id!s}/")
assert response.status_code == 401
assert models.User.objects.count() == 1
def test_api_users_delete_authenticated():
"""
Authenticated users should not be allowed to delete a user other than themselves.
"""
identity = factories.IdentityFactory()
jwt_token = OIDCToken.for_user(identity.user)
other_user = factories.UserFactory()
response = APIClient().delete(
f"/api/v1.0/users/{other_user.id!s}/", HTTP_AUTHORIZATION=f"Bearer {jwt_token}"
)
assert response.status_code == 405
assert models.User.objects.count() == 2
def test_api_users_delete_self():
"""Authenticated users should not be able to delete their own user."""
identity = factories.IdentityFactory()
jwt_token = OIDCToken.for_user(identity.user)
response = APIClient().delete(
f"/api/v1.0/users/{identity.user.id!s}/",
HTTP_AUTHORIZATION=f"Bearer {jwt_token}",
)
assert response.status_code == 405
assert models.User.objects.count() == 1

View File

@@ -0,0 +1,183 @@
"""
Unit tests for the Identity model
"""
from django.core.exceptions import ValidationError
import pytest
from core import factories, models
pytestmark = pytest.mark.django_db
def test_models_identities_str_main():
"""The str representation should be the email address with indication that it is main."""
identity = factories.IdentityFactory(email="david@example.com")
assert str(identity) == "david@example.com[main]"
def test_models_identities_str_secondary():
"""The str representation of a secondary email should be the email address."""
main_identity = factories.IdentityFactory()
secondary_identity = factories.IdentityFactory(
user=main_identity.user, email="david@example.com"
)
assert str(secondary_identity) == "david@example.com"
def test_models_identities_is_main_automatic():
"""The first identity created for a user should automatically be set as main."""
user = factories.UserFactory()
identity = models.Identity.objects.create(
user=user, sub="123", email="david@example.com"
)
assert identity.is_main is True
def test_models_identities_is_main_exists():
"""A user should always keep one and only one of its identities as main."""
user = factories.UserFactory()
main_identity, _secondary_identity = factories.IdentityFactory.create_batch(
2, user=user
)
assert main_identity.is_main is True
main_identity.is_main = False
with pytest.raises(
ValidationError, match="A user should have one and only one main identity."
):
main_identity.save()
def test_models_identities_is_main_switch():
"""Setting a secondary identity as main should reset the existing main identity."""
user = factories.UserFactory()
first_identity, second_identity = factories.IdentityFactory.create_batch(
2, user=user
)
assert first_identity.is_main is True
second_identity.is_main = True
second_identity.save()
second_identity.refresh_from_db()
assert second_identity.is_main is True
first_identity.refresh_from_db()
assert first_identity.is_main is False
def test_models_identities_email_required():
"""The "email" field is required."""
user = factories.UserFactory()
with pytest.raises(ValidationError, match="This field cannot be null."):
models.Identity.objects.create(user=user, email=None)
def test_models_identities_user_required():
"""The "user" field is required."""
with pytest.raises(models.User.DoesNotExist, match="Identity has no user."):
models.Identity.objects.create(user=None, email="david@example.com")
def test_models_identities_email_unique_same_user():
"""The "email" field should be unique for a given user."""
email = factories.IdentityFactory()
with pytest.raises(
ValidationError,
match="Identity with this User and Email address already exists.",
):
factories.IdentityFactory(user=email.user, email=email.email)
def test_models_identities_email_unique_different_users():
"""The "email" field should not be unique among users."""
email = factories.IdentityFactory()
factories.IdentityFactory(email=email.email)
def test_models_identities_email_normalization():
"""The email field should be automatically normalized upon saving."""
email = factories.IdentityFactory()
email.email = "Thomas.Jefferson@Example.com"
email.save()
assert email.email == "Thomas.Jefferson@example.com"
def test_models_identities_ordering():
"""Identitys should be returned ordered by main status then by their email address."""
user = factories.UserFactory()
factories.IdentityFactory.create_batch(5, user=user)
emails = models.Identity.objects.all()
assert emails[0].is_main is True
for i in range(3):
assert emails[i + 1].is_main is False
assert emails[i + 2].email >= emails[i + 1].email
def test_models_identities_sub_null():
"""The "sub" field should not be null."""
user = factories.UserFactory()
with pytest.raises(ValidationError, match="This field cannot be null."):
models.Identity.objects.create(user=user, sub=None)
def test_models_identities_sub_blank():
"""The "sub" field should not be blank."""
user = factories.UserFactory()
with pytest.raises(ValidationError, match="This field cannot be blank."):
models.Identity.objects.create(user=user, email="david@example.com", sub="")
def test_models_identities_sub_unique():
"""The "sub" field should be unique."""
user = factories.UserFactory()
identity = factories.IdentityFactory()
with pytest.raises(ValidationError, match="Identity with this Sub already exists."):
models.Identity.objects.create(user=user, sub=identity.sub)
def test_models_identities_sub_max_length():
"""The sub field should be 255 characters maximum."""
factories.IdentityFactory(sub="a" * 255)
with pytest.raises(ValidationError) as excinfo:
factories.IdentityFactory(sub="a" * 256)
assert (
str(excinfo.value)
== "{'sub': ['Ensure this value has at most 255 characters (it has 256).']}"
)
def test_models_identities_sub_special_characters():
"""The sub field should accept periods, dashes, +, @ and underscores."""
identity = factories.IdentityFactory(sub="dave.bowman-1+2@hal_9000")
assert identity.sub == "dave.bowman-1+2@hal_9000"
def test_models_identities_sub_spaces():
"""The sub field should not accept spaces."""
with pytest.raises(ValidationError) as excinfo:
factories.IdentityFactory(sub="a b")
assert str(excinfo.value) == (
"{'sub': ['Enter a valid sub. This value may contain only letters, numbers, "
"and @/./+/-/_ characters.']}"
)
def test_models_identities_sub_upper_case():
"""The sub field should accept upper case characters."""
identity = factories.IdentityFactory(sub="John")
assert identity.sub == "John"
def test_models_identities_sub_ascii():
"""The sub field should accept non ASCII letters."""
identity = factories.IdentityFactory(sub="rené")
assert identity.sub == "rené"

View File

@@ -0,0 +1,264 @@
"""
Unit tests for the TeamAccess model
"""
from django.contrib.auth.models import AnonymousUser
from django.core.exceptions import ValidationError
import pytest
from core import factories
pytestmark = pytest.mark.django_db
def test_models_team_accesses_str():
"""
The str representation should include user name, team full name and role.
"""
contact = factories.ContactFactory(full_name="David Bowman")
user = contact.owner
user.profile_contact = contact
user.save()
access = factories.TeamAccessFactory(
role="member",
user=user,
team__name="admins",
)
assert str(access) == "David Bowman is member in team admins"
def test_models_team_accesses_unique():
"""Team accesses should be unique for a given couple of user and team."""
access = factories.TeamAccessFactory()
with pytest.raises(
ValidationError,
match="Team/user relation with this User and Team already exists.",
):
factories.TeamAccessFactory(user=access.user, team=access.team)
# get_abilities
def test_models_team_access_get_abilities_anonymous():
"""Check abilities returned for an anonymous user."""
access = factories.TeamAccessFactory()
abilities = access.get_abilities(AnonymousUser())
assert abilities == {
"delete": False,
"get": False,
"patch": False,
"put": False,
"set_role_to": [],
}
def test_models_team_access_get_abilities_authenticated():
"""Check abilities returned for an authenticated user."""
access = factories.TeamAccessFactory()
user = factories.UserFactory()
abilities = access.get_abilities(user)
assert abilities == {
"delete": False,
"get": False,
"patch": False,
"put": False,
"set_role_to": [],
}
# - for owner
def test_models_team_access_get_abilities_for_owner_of_self_allowed():
"""
Check abilities of self access for the owner of a team when there is more than one user left.
"""
access = factories.TeamAccessFactory(role="owner")
factories.TeamAccessFactory(team=access.team, role="owner")
abilities = access.get_abilities(access.user)
assert abilities == {
"delete": True,
"get": True,
"patch": True,
"put": True,
"set_role_to": ["administrator", "member"],
}
def test_models_team_access_get_abilities_for_owner_of_self_last():
"""Check abilities of self access for the owner of a team when there is only one owner left."""
access = factories.TeamAccessFactory(role="owner")
abilities = access.get_abilities(access.user)
assert abilities == {
"delete": False,
"get": True,
"patch": False,
"put": False,
"set_role_to": [],
}
def test_models_team_access_get_abilities_for_owner_of_owner():
"""Check abilities of owner access for the owner of a team."""
access = factories.TeamAccessFactory(role="owner")
factories.TeamAccessFactory(team=access.team) # another one
user = factories.TeamAccessFactory(team=access.team, role="owner").user
abilities = access.get_abilities(user)
assert abilities == {
"delete": False,
"get": True,
"patch": False,
"put": False,
"set_role_to": [],
}
def test_models_team_access_get_abilities_for_owner_of_administrator():
"""Check abilities of administrator access for the owner of a team."""
access = factories.TeamAccessFactory(role="administrator")
factories.TeamAccessFactory(team=access.team) # another one
user = factories.TeamAccessFactory(team=access.team, role="owner").user
abilities = access.get_abilities(user)
assert abilities == {
"delete": True,
"get": True,
"patch": True,
"put": True,
"set_role_to": ["owner", "member"],
}
def test_models_team_access_get_abilities_for_owner_of_member():
"""Check abilities of member access for the owner of a team."""
access = factories.TeamAccessFactory(role="member")
factories.TeamAccessFactory(team=access.team) # another one
user = factories.TeamAccessFactory(team=access.team, role="owner").user
abilities = access.get_abilities(user)
assert abilities == {
"delete": True,
"get": True,
"patch": True,
"put": True,
"set_role_to": ["owner", "administrator"],
}
# - for administrator
def test_models_team_access_get_abilities_for_administrator_of_owner():
"""Check abilities of owner access for the administrator of a team."""
access = factories.TeamAccessFactory(role="owner")
factories.TeamAccessFactory(team=access.team) # another one
user = factories.TeamAccessFactory(team=access.team, role="administrator").user
abilities = access.get_abilities(user)
assert abilities == {
"delete": False,
"get": True,
"patch": False,
"put": False,
"set_role_to": [],
}
def test_models_team_access_get_abilities_for_administrator_of_administrator():
"""Check abilities of administrator access for the administrator of a team."""
access = factories.TeamAccessFactory(role="administrator")
factories.TeamAccessFactory(team=access.team) # another one
user = factories.TeamAccessFactory(team=access.team, role="administrator").user
abilities = access.get_abilities(user)
assert abilities == {
"delete": True,
"get": True,
"patch": True,
"put": True,
"set_role_to": ["member"],
}
def test_models_team_access_get_abilities_for_administrator_of_member():
"""Check abilities of member access for the administrator of a team."""
access = factories.TeamAccessFactory(role="member")
factories.TeamAccessFactory(team=access.team) # another one
user = factories.TeamAccessFactory(team=access.team, role="administrator").user
abilities = access.get_abilities(user)
assert abilities == {
"delete": True,
"get": True,
"patch": True,
"put": True,
"set_role_to": ["administrator"],
}
# - for member
def test_models_team_access_get_abilities_for_member_of_owner():
"""Check abilities of owner access for the member of a team."""
access = factories.TeamAccessFactory(role="owner")
factories.TeamAccessFactory(team=access.team) # another one
user = factories.TeamAccessFactory(team=access.team, role="member").user
abilities = access.get_abilities(user)
assert abilities == {
"delete": False,
"get": True,
"patch": False,
"put": False,
"set_role_to": [],
}
def test_models_team_access_get_abilities_for_member_of_administrator():
"""Check abilities of administrator access for the member of a team."""
access = factories.TeamAccessFactory(role="administrator")
factories.TeamAccessFactory(team=access.team) # another one
user = factories.TeamAccessFactory(team=access.team, role="member").user
abilities = access.get_abilities(user)
assert abilities == {
"delete": False,
"get": True,
"patch": False,
"put": False,
"set_role_to": [],
}
def test_models_team_access_get_abilities_for_member_of_member_user(
django_assert_num_queries
):
"""Check abilities of member access for the member of a team."""
access = factories.TeamAccessFactory(role="member")
factories.TeamAccessFactory(team=access.team) # another one
user = factories.TeamAccessFactory(team=access.team, role="member").user
with django_assert_num_queries(1):
abilities = access.get_abilities(user)
assert abilities == {
"delete": False,
"get": True,
"patch": False,
"put": False,
"set_role_to": [],
}
def test_models_team_access_get_abilities_preset_role(django_assert_num_queries):
"""No query is done if the role is preset, e.g., with a query annotation."""
access = factories.TeamAccessFactory(role="member")
user = factories.TeamAccessFactory(team=access.team, role="member").user
access.user_role = "member"
with django_assert_num_queries(0):
abilities = access.get_abilities(user)
assert abilities == {
"delete": False,
"get": True,
"patch": False,
"put": False,
"set_role_to": [],
}

View File

@@ -0,0 +1,135 @@
"""
Unit tests for the Team model
"""
from django.contrib.auth.models import AnonymousUser
from django.core.exceptions import ValidationError
import pytest
from core import factories, models
pytestmark = pytest.mark.django_db
def test_models_teams_str():
"""The str representation should be the name of the team."""
team = factories.TeamFactory(name="admins")
assert str(team) == "admins"
def test_models_teams_id_unique():
"""The "id" field should be unique."""
team = factories.TeamFactory()
with pytest.raises(ValidationError, match="Team with this Id already exists."):
factories.TeamFactory(id=team.id)
def test_models_teams_name_null():
"""The "name" field should not be null."""
with pytest.raises(ValidationError, match="This field cannot be null."):
models.Team.objects.create(name=None)
def test_models_teams_name_empty():
"""The "name" field should not be empty."""
with pytest.raises(ValidationError, match="This field cannot be blank."):
models.Team.objects.create(name="")
def test_models_teams_name_max_length():
"""The "name" field should be 100 characters maximum."""
factories.TeamFactory(name="a " * 50)
with pytest.raises(
ValidationError,
match=r"Ensure this value has at most 100 characters \(it has 102\)\.",
):
factories.TeamFactory(name="a " * 51)
# get_abilities
def test_models_teams_get_abilities_anonymous():
"""Check abilities returned for an anonymous user."""
team = factories.TeamFactory()
abilities = team.get_abilities(AnonymousUser())
assert abilities == {
"delete": False,
"get": True,
"patch": False,
"put": False,
"manage_accesses": False,
}
def test_models_teams_get_abilities_authenticated():
"""Check abilities returned for an authenticated user."""
team = factories.TeamFactory()
abilities = team.get_abilities(factories.UserFactory())
assert abilities == {
"delete": False,
"get": True,
"patch": False,
"put": False,
"manage_accesses": False,
}
def test_models_teams_get_abilities_owner():
"""Check abilities returned for the owner of a team."""
user = factories.UserFactory()
access = factories.TeamAccessFactory(role="owner", user=user)
abilities = access.team.get_abilities(access.user)
assert abilities == {
"delete": True,
"get": True,
"patch": True,
"put": True,
"manage_accesses": True,
}
def test_models_teams_get_abilities_administrator():
"""Check abilities returned for the administrator of a team."""
access = factories.TeamAccessFactory(role="administrator")
abilities = access.team.get_abilities(access.user)
assert abilities == {
"delete": False,
"get": True,
"patch": True,
"put": True,
"manage_accesses": True,
}
def test_models_teams_get_abilities_member_user(django_assert_num_queries):
"""Check abilities returned for the member of a team."""
access = factories.TeamAccessFactory(role="member")
with django_assert_num_queries(1):
abilities = access.team.get_abilities(access.user)
assert abilities == {
"delete": False,
"get": True,
"patch": False,
"put": False,
"manage_accesses": False,
}
def test_models_teams_get_abilities_preset_role(django_assert_num_queries):
"""No query is done if the role is preset e.g. with query annotation."""
access = factories.TeamAccessFactory(role="member")
access.team.user_role = "member"
with django_assert_num_queries(0):
abilities = access.team.get_abilities(access.user)
assert abilities == {
"delete": False,
"get": True,
"patch": False,
"put": False,
"manage_accesses": False,
}

View File

@@ -0,0 +1,83 @@
"""
Unit tests for the User model
"""
from unittest import mock
from django.core.exceptions import ValidationError
import pytest
from core import factories, models
pytestmark = pytest.mark.django_db
def test_models_users_str():
"""The str representation should be the full name."""
user = factories.UserFactory()
contact = factories.ContactFactory(full_name="david bowman", owner=user)
user.profile_contact = contact
user.save()
assert str(user) == "david bowman"
def test_models_users_id_unique():
"""The "id" field should be unique."""
user = factories.UserFactory()
with pytest.raises(ValidationError, match="User with this Id already exists."):
factories.UserFactory(id=user.id)
def test_models_users_profile_not_owned():
"""A user cannot declare as profile a contact that not is owned."""
user = factories.UserFactory()
contact = factories.ContactFactory(base=None, owner=None)
user.profile_contact = contact
with pytest.raises(ValidationError) as excinfo:
user.save()
assert (
str(excinfo.value)
== "{'__all__': ['Users can only declare as profile a contact they own.']}"
)
def test_models_users_profile_owned_by_other():
"""A user cannot declare as profile a contact that is owned by another user."""
user = factories.UserFactory()
contact = factories.ContactFactory()
user.profile_contact = contact
with pytest.raises(ValidationError) as excinfo:
user.save()
assert (
str(excinfo.value)
== "{'__all__': ['Users can only declare as profile a contact they own.']}"
)
def test_models_users_send_mail_main_existing():
"""The "email_user' method should send mail to the user's main email address."""
main_email = factories.IdentityFactory(email="dave@example.com")
user = main_email.user
factories.IdentityFactory.create_batch(2, user=user)
with mock.patch("django.core.mail.send_mail") as mock_send:
user.email_user("my subject", "my message")
mock_send.assert_called_once_with(
"my subject", "my message", None, ["dave@example.com"]
)
def test_models_users_send_mail_main_missing():
"""The "email_user' method should fail if the user has no email address."""
user = factories.UserFactory()
with pytest.raises(models.Identity.DoesNotExist) as excinfo:
user.email_user("my subject", "my message")
assert str(excinfo.value) == "Identity matching query does not exist."

View File

@@ -0,0 +1,21 @@
"""Utils for tests in the publish core application"""
from rest_framework_simplejwt.tokens import AccessToken
class OIDCToken(AccessToken):
"""Set payload on token from user/contact/email"""
@classmethod
def for_user(cls, user):
token = super().for_user(user)
identity = user.identities.filter(is_main=True).first()
token["first_name"] = (
user.profile_contact.short_name if user.profile_contact else "David"
)
token["last_name"] = (
" ".join(user.profile_contact.full_name.split()[1:])
if user.profile_contact
else "Bowman"
)
token["email"] = identity.email
return token

8
src/backend/core/urls.py Normal file
View File

@@ -0,0 +1,8 @@
"""URL configuration for the core app."""
from django.urls import path
from core.views import generate_document
urlpatterns = [
path('generate-document/', generate_document, name='generate_document'),
]

27
src/backend/core/views.py Normal file
View File

@@ -0,0 +1,27 @@
from django.shortcuts import render, HttpResponse
from .forms import DocumentGenerationForm
from .models import Template
def generate_document(request):
if request.method == 'POST':
form = DocumentGenerationForm(request.POST)
if form.is_valid():
# Get the selected template from the form
template = form.cleaned_data['template']
# Get the body content from the form
body = form.cleaned_data['body']
# Call the generate_document method
pdf_content = template.generate_document(body)
# Return the generated PDF as a response for download
response = HttpResponse(pdf_content, content_type='application/pdf')
response['Content-Disposition'] = f'attachment; filename={template.title}.pdf'
return response
else:
form = DocumentGenerationForm()
return render(request, 'core/generate_document.html', {'form': form})