✨(project) Django boilerplate
This commit introduces a boilerplate inspired by https://github.com/numerique-gouv/impress. The code has been cleaned to remove unnecessary Impress logic and dependencies. Changes made: - Removed Minio, WebRTC, and create bucket from the stack. - Removed the Next.js frontend (it will be replaced by Vite). - Cleaned up impress-specific backend logics. The whole stack remains functional: - All tests pass. - Linter checks pass. - Agent Connexion sources are already set-up. Why clear out the code? To adhere to the KISS principle, we aim to maintain a minimalist codebase. Cloning Impress allowed us to quickly inherit its code quality tools and deployment configurations for staging, pre-production, and production environments. What’s broken? - The tsclient is not functional anymore. - Some make commands need to be fixed. - Helm sources are outdated. - Naming across the project sources are inconsistent (impress, visio, etc.) - CI is not configured properly. This list might be incomplete. Let's grind it.
This commit is contained in:
committed by
lebaudantoine
parent
2d81979b0a
commit
5b1a2b20de
0
src/backend/core/__init__.py
Normal file
0
src/backend/core/__init__.py
Normal file
64
src/backend/core/admin.py
Normal file
64
src/backend/core/admin.py
Normal file
@@ -0,0 +1,64 @@
|
||||
"""Admin classes and registrations for core app."""
|
||||
from django.contrib import admin
|
||||
from django.contrib.auth import admin as auth_admin
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from . import models
|
||||
|
||||
|
||||
@admin.register(models.User)
|
||||
class UserAdmin(auth_admin.UserAdmin):
|
||||
"""Admin class for the User model"""
|
||||
|
||||
fieldsets = (
|
||||
(
|
||||
None,
|
||||
{
|
||||
"fields": (
|
||||
"id",
|
||||
"admin_email",
|
||||
"password",
|
||||
)
|
||||
},
|
||||
),
|
||||
(_("Personal info"), {"fields": ("sub", "email", "language", "timezone")}),
|
||||
(
|
||||
_("Permissions"),
|
||||
{
|
||||
"fields": (
|
||||
"is_active",
|
||||
"is_device",
|
||||
"is_staff",
|
||||
"is_superuser",
|
||||
"groups",
|
||||
"user_permissions",
|
||||
),
|
||||
},
|
||||
),
|
||||
(_("Important dates"), {"fields": ("created_at", "updated_at")}),
|
||||
)
|
||||
add_fieldsets = (
|
||||
(
|
||||
None,
|
||||
{
|
||||
"classes": ("wide",),
|
||||
"fields": ("email", "password1", "password2"),
|
||||
},
|
||||
),
|
||||
)
|
||||
list_display = (
|
||||
"id",
|
||||
"sub",
|
||||
"admin_email",
|
||||
"email",
|
||||
"is_active",
|
||||
"is_staff",
|
||||
"is_superuser",
|
||||
"is_device",
|
||||
"created_at",
|
||||
"updated_at",
|
||||
)
|
||||
list_filter = ("is_staff", "is_superuser", "is_device", "is_active")
|
||||
ordering = ("is_active", "-is_superuser", "-is_staff", "-is_device", "-updated_at")
|
||||
readonly_fields = ("id", "sub", "email", "created_at", "updated_at")
|
||||
search_fields = ("id", "sub", "admin_email", "email")
|
||||
39
src/backend/core/api/__init__.py
Normal file
39
src/backend/core/api/__init__.py
Normal file
@@ -0,0 +1,39 @@
|
||||
"""Impress core API endpoints"""
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import ValidationError
|
||||
|
||||
from rest_framework import exceptions as drf_exceptions
|
||||
from rest_framework import views as drf_views
|
||||
from rest_framework.decorators import api_view
|
||||
from rest_framework.response import Response
|
||||
|
||||
|
||||
def exception_handler(exc, context):
|
||||
"""Handle Django ValidationError as an accepted exception.
|
||||
|
||||
For the parameters, see ``exception_handler``
|
||||
This code comes from twidi's gist:
|
||||
https://gist.github.com/twidi/9d55486c36b6a51bdcb05ce3a763e79f
|
||||
"""
|
||||
if isinstance(exc, ValidationError):
|
||||
if hasattr(exc, "message_dict"):
|
||||
detail = exc.message_dict
|
||||
elif hasattr(exc, "message"):
|
||||
detail = exc.message
|
||||
elif hasattr(exc, "messages"):
|
||||
detail = exc.messages
|
||||
|
||||
exc = drf_exceptions.ValidationError(detail=detail)
|
||||
|
||||
return drf_views.exception_handler(exc, context)
|
||||
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
@api_view(["GET"])
|
||||
def get_frontend_configuration(request):
|
||||
"""Returns the frontend configuration dict as configured in settings."""
|
||||
frontend_configuration = {
|
||||
"LANGUAGE_CODE": settings.LANGUAGE_CODE,
|
||||
}
|
||||
frontend_configuration.update(settings.FRONTEND_CONFIGURATION)
|
||||
return Response(frontend_configuration)
|
||||
36
src/backend/core/api/permissions.py
Normal file
36
src/backend/core/api/permissions.py
Normal file
@@ -0,0 +1,36 @@
|
||||
"""Permission handlers for the impress core app."""
|
||||
from rest_framework import permissions
|
||||
|
||||
ACTION_FOR_METHOD_TO_PERMISSION = {
|
||||
"versions_detail": {"DELETE": "versions_destroy", "GET": "versions_retrieve"}
|
||||
}
|
||||
|
||||
|
||||
class IsAuthenticated(permissions.BasePermission):
|
||||
"""
|
||||
Allows access only to authenticated users. Alternative method checking the presence
|
||||
of the auth token to avoid hitting the database.
|
||||
"""
|
||||
|
||||
def has_permission(self, request, view):
|
||||
return bool(request.auth) or request.user.is_authenticated
|
||||
|
||||
|
||||
class IsAuthenticatedOrSafe(IsAuthenticated):
|
||||
"""Allows access to authenticated users (or anonymous users but only on safe methods)."""
|
||||
|
||||
def has_permission(self, request, view):
|
||||
if request.method in permissions.SAFE_METHODS:
|
||||
return True
|
||||
return super().has_permission(request, view)
|
||||
|
||||
|
||||
class IsSelf(IsAuthenticated):
|
||||
"""
|
||||
Allows access only to authenticated users. Alternative method checking the presence
|
||||
of the auth token to avoid hitting the database.
|
||||
"""
|
||||
|
||||
def has_object_permission(self, request, view, obj):
|
||||
"""Write permissions are only allowed to the user itself."""
|
||||
return obj == request.user
|
||||
13
src/backend/core/api/serializers.py
Normal file
13
src/backend/core/api/serializers.py
Normal file
@@ -0,0 +1,13 @@
|
||||
"""Client serializers for the impress core app."""
|
||||
from rest_framework import serializers
|
||||
|
||||
from core import models
|
||||
|
||||
|
||||
class UserSerializer(serializers.ModelSerializer):
|
||||
"""Serialize users."""
|
||||
|
||||
class Meta:
|
||||
model = models.User
|
||||
fields = ["id", "email"]
|
||||
read_only_fields = ["id", "email"]
|
||||
142
src/backend/core/api/viewsets.py
Normal file
142
src/backend/core/api/viewsets.py
Normal file
@@ -0,0 +1,142 @@
|
||||
"""API endpoints"""
|
||||
from rest_framework import (
|
||||
decorators,
|
||||
mixins,
|
||||
pagination,
|
||||
viewsets,
|
||||
)
|
||||
from rest_framework import (
|
||||
response as drf_response,
|
||||
)
|
||||
|
||||
from core import models
|
||||
|
||||
from . import permissions, serializers
|
||||
|
||||
# pylint: disable=too-many-ancestors
|
||||
|
||||
|
||||
class NestedGenericViewSet(viewsets.GenericViewSet):
|
||||
"""
|
||||
A generic Viewset aims to be used in a nested route context.
|
||||
e.g: `/api/v1.0/resource_1/<resource_1_pk>/resource_2/<resource_2_pk>/`
|
||||
|
||||
It allows to define all url kwargs and lookup fields to perform the lookup.
|
||||
"""
|
||||
|
||||
lookup_fields: list[str] = ["pk"]
|
||||
lookup_url_kwargs: list[str] = []
|
||||
|
||||
def __getattribute__(self, item):
|
||||
"""
|
||||
This method is overridden to allow to get the last lookup field or lookup url kwarg
|
||||
when accessing the `lookup_field` or `lookup_url_kwarg` attribute. This is useful
|
||||
to keep compatibility with all methods used by the parent class `GenericViewSet`.
|
||||
"""
|
||||
if item in ["lookup_field", "lookup_url_kwarg"]:
|
||||
return getattr(self, item + "s", [None])[-1]
|
||||
|
||||
return super().__getattribute__(item)
|
||||
|
||||
def get_queryset(self):
|
||||
"""
|
||||
Get the list of items for this view.
|
||||
|
||||
`lookup_fields` attribute is enumerated here to perform the nested lookup.
|
||||
"""
|
||||
queryset = super().get_queryset()
|
||||
|
||||
# The last lookup field is removed to perform the nested lookup as it corresponds
|
||||
# to the object pk, it is used within get_object method.
|
||||
lookup_url_kwargs = (
|
||||
self.lookup_url_kwargs[:-1]
|
||||
if self.lookup_url_kwargs
|
||||
else self.lookup_fields[:-1]
|
||||
)
|
||||
|
||||
filter_kwargs = {}
|
||||
for index, lookup_url_kwarg in enumerate(lookup_url_kwargs):
|
||||
if lookup_url_kwarg not in self.kwargs:
|
||||
raise KeyError(
|
||||
f"Expected view {self.__class__.__name__} to be called with a URL "
|
||||
f'keyword argument named "{lookup_url_kwarg}". Fix your URL conf, or '
|
||||
"set the `.lookup_fields` attribute on the view correctly."
|
||||
)
|
||||
|
||||
filter_kwargs.update(
|
||||
{self.lookup_fields[index]: self.kwargs[lookup_url_kwarg]}
|
||||
)
|
||||
|
||||
return queryset.filter(**filter_kwargs)
|
||||
|
||||
|
||||
class SerializerPerActionMixin:
|
||||
"""
|
||||
A mixin to allow to define serializer classes for each action.
|
||||
|
||||
This mixin is useful to avoid to define a serializer class for each action in the
|
||||
`get_serializer_class` method.
|
||||
"""
|
||||
|
||||
serializer_classes: dict[str, type] = {}
|
||||
default_serializer_class: type = None
|
||||
|
||||
def get_serializer_class(self):
|
||||
"""
|
||||
Return the serializer class to use depending on the action.
|
||||
"""
|
||||
return self.serializer_classes.get(self.action, self.default_serializer_class)
|
||||
|
||||
|
||||
class Pagination(pagination.PageNumberPagination):
|
||||
"""Pagination to display no more than 100 objects per page sorted by creation date."""
|
||||
|
||||
ordering = "-created_on"
|
||||
max_page_size = 100
|
||||
page_size_query_param = "page_size"
|
||||
|
||||
|
||||
class UserViewSet(
|
||||
mixins.UpdateModelMixin, viewsets.GenericViewSet, mixins.ListModelMixin
|
||||
):
|
||||
"""User ViewSet"""
|
||||
|
||||
permission_classes = [permissions.IsSelf]
|
||||
queryset = models.User.objects.all()
|
||||
serializer_class = serializers.UserSerializer
|
||||
|
||||
def get_queryset(self):
|
||||
"""
|
||||
Limit listed users by querying the email field with a trigram similarity
|
||||
search if a query is provided.
|
||||
Limit listed users by excluding users already in the document if a document_id
|
||||
is provided.
|
||||
"""
|
||||
queryset = self.queryset
|
||||
|
||||
if self.action == "list":
|
||||
# Exclude all users already in the given document
|
||||
if document_id := self.request.GET.get("document_id", ""):
|
||||
queryset = queryset.exclude(documentaccess__document_id=document_id)
|
||||
|
||||
# Filter users by email similarity
|
||||
if query := self.request.GET.get("q", ""):
|
||||
queryset = queryset.filter(email__trigram_word_similar=query)
|
||||
|
||||
return queryset
|
||||
|
||||
@decorators.action(
|
||||
detail=False,
|
||||
methods=["get"],
|
||||
url_name="me",
|
||||
url_path="me",
|
||||
permission_classes=[permissions.IsAuthenticated],
|
||||
)
|
||||
def get_me(self, request):
|
||||
"""
|
||||
Return information on currently logged user
|
||||
"""
|
||||
context = {"request": request}
|
||||
return drf_response.Response(
|
||||
self.serializer_class(request.user, context=context).data
|
||||
)
|
||||
11
src/backend/core/apps.py
Normal file
11
src/backend/core/apps.py
Normal file
@@ -0,0 +1,11 @@
|
||||
"""Impress Core application"""
|
||||
# from django.apps import AppConfig
|
||||
# from django.utils.translation import gettext_lazy as _
|
||||
|
||||
|
||||
# class CoreConfig(AppConfig):
|
||||
# """Configuration class for the impress core app."""
|
||||
|
||||
# name = "core"
|
||||
# app_label = "core"
|
||||
# verbose_name = _("impress core application")
|
||||
0
src/backend/core/authentication/__init__.py
Normal file
0
src/backend/core/authentication/__init__.py
Normal file
100
src/backend/core/authentication/backends.py
Normal file
100
src/backend/core/authentication/backends.py
Normal file
@@ -0,0 +1,100 @@
|
||||
"""Authentication Backends for the Impress core app."""
|
||||
|
||||
from django.core.exceptions import SuspiciousOperation
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
import requests
|
||||
from mozilla_django_oidc.auth import (
|
||||
OIDCAuthenticationBackend as MozillaOIDCAuthenticationBackend,
|
||||
)
|
||||
|
||||
from core.models import User
|
||||
|
||||
|
||||
class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
|
||||
"""Custom OpenID Connect (OIDC) Authentication Backend.
|
||||
|
||||
This class overrides the default OIDC Authentication Backend to accommodate differences
|
||||
in the User and Identity models, and handles signed and/or encrypted UserInfo response.
|
||||
"""
|
||||
|
||||
def get_userinfo(self, access_token, id_token, payload):
|
||||
"""Return user details dictionary.
|
||||
|
||||
Parameters:
|
||||
- access_token (str): The access token.
|
||||
- id_token (str): The id token (unused).
|
||||
- payload (dict): The token payload (unused).
|
||||
|
||||
Note: The id_token and payload parameters are unused in this implementation,
|
||||
but were kept to preserve base method signature.
|
||||
|
||||
Note: It handles signed and/or encrypted UserInfo Response. It is required by
|
||||
Agent Connect, which follows the OIDC standard. It forces us to override the
|
||||
base method, which deal with 'application/json' response.
|
||||
|
||||
Returns:
|
||||
- dict: User details dictionary obtained from the OpenID Connect user endpoint.
|
||||
"""
|
||||
|
||||
user_response = requests.get(
|
||||
self.OIDC_OP_USER_ENDPOINT,
|
||||
headers={"Authorization": f"Bearer {access_token}"},
|
||||
verify=self.get_settings("OIDC_VERIFY_SSL", True),
|
||||
timeout=self.get_settings("OIDC_TIMEOUT", None),
|
||||
proxies=self.get_settings("OIDC_PROXY", None),
|
||||
)
|
||||
user_response.raise_for_status()
|
||||
userinfo = self.verify_token(user_response.text)
|
||||
return userinfo
|
||||
|
||||
def get_or_create_user(self, access_token, id_token, payload):
|
||||
"""Return a User based on userinfo. Get or create a new user if no user matches the Sub.
|
||||
|
||||
Parameters:
|
||||
- access_token (str): The access token.
|
||||
- id_token (str): The ID token.
|
||||
- payload (dict): The user payload.
|
||||
|
||||
Returns:
|
||||
- User: An existing or newly created User instance.
|
||||
|
||||
Raises:
|
||||
- Exception: Raised when user creation is not allowed and no existing user is found.
|
||||
"""
|
||||
|
||||
user_info = self.get_userinfo(access_token, id_token, payload)
|
||||
sub = user_info.get("sub")
|
||||
|
||||
if sub is None:
|
||||
raise SuspiciousOperation(
|
||||
_("User info contained no recognizable user identification")
|
||||
)
|
||||
|
||||
try:
|
||||
user = User.objects.get(sub=sub)
|
||||
except User.DoesNotExist:
|
||||
if self.get_settings("OIDC_CREATE_USER", True):
|
||||
user = self.create_user(user_info)
|
||||
else:
|
||||
user = None
|
||||
|
||||
return user
|
||||
|
||||
def create_user(self, claims):
|
||||
"""Return a newly created User instance."""
|
||||
|
||||
sub = claims.get("sub")
|
||||
|
||||
if sub is None:
|
||||
raise SuspiciousOperation(
|
||||
_("Claims contained no recognizable user identification")
|
||||
)
|
||||
|
||||
user = User.objects.create(
|
||||
sub=sub,
|
||||
email=claims.get("email"),
|
||||
password="!", # noqa: S106
|
||||
)
|
||||
|
||||
return user
|
||||
18
src/backend/core/authentication/urls.py
Normal file
18
src/backend/core/authentication/urls.py
Normal file
@@ -0,0 +1,18 @@
|
||||
"""Authentication URLs for the People core app."""
|
||||
|
||||
from django.urls import path
|
||||
|
||||
from mozilla_django_oidc.urls import urlpatterns as mozzila_oidc_urls
|
||||
|
||||
from .views import OIDCLogoutCallbackView, OIDCLogoutView
|
||||
|
||||
urlpatterns = [
|
||||
# Override the default 'logout/' path from Mozilla Django OIDC with our custom view.
|
||||
path("logout/", OIDCLogoutView.as_view(), name="oidc_logout_custom"),
|
||||
path(
|
||||
"logout-callback/",
|
||||
OIDCLogoutCallbackView.as_view(),
|
||||
name="oidc_logout_callback",
|
||||
),
|
||||
*mozzila_oidc_urls,
|
||||
]
|
||||
137
src/backend/core/authentication/views.py
Normal file
137
src/backend/core/authentication/views.py
Normal file
@@ -0,0 +1,137 @@
|
||||
"""Authentication Views for the People core app."""
|
||||
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from django.contrib import auth
|
||||
from django.core.exceptions import SuspiciousOperation
|
||||
from django.http import HttpResponseRedirect
|
||||
from django.urls import reverse
|
||||
from django.utils import crypto
|
||||
|
||||
from mozilla_django_oidc.utils import (
|
||||
absolutify,
|
||||
)
|
||||
from mozilla_django_oidc.views import (
|
||||
OIDCLogoutView as MozillaOIDCOIDCLogoutView,
|
||||
)
|
||||
|
||||
|
||||
class OIDCLogoutView(MozillaOIDCOIDCLogoutView):
|
||||
"""Custom logout view for handling OpenID Connect (OIDC) logout flow.
|
||||
|
||||
Adds support for handling logout callbacks from the identity provider (OP)
|
||||
by initiating the logout flow if the user has an active session.
|
||||
|
||||
The Django session is retained during the logout process to persist the 'state' OIDC parameter.
|
||||
This parameter is crucial for maintaining the integrity of the logout flow between this call
|
||||
and the subsequent callback.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def persist_state(request, state):
|
||||
"""Persist the given 'state' parameter in the session's 'oidc_states' dictionary
|
||||
|
||||
This method is used to store the OIDC state parameter in the session, according to the
|
||||
structure expected by Mozilla Django OIDC's 'add_state_and_verifier_and_nonce_to_session'
|
||||
utility function.
|
||||
"""
|
||||
|
||||
if "oidc_states" not in request.session or not isinstance(
|
||||
request.session["oidc_states"], dict
|
||||
):
|
||||
request.session["oidc_states"] = {}
|
||||
|
||||
request.session["oidc_states"][state] = {}
|
||||
request.session.save()
|
||||
|
||||
def construct_oidc_logout_url(self, request):
|
||||
"""Create the redirect URL for interfacing with the OIDC provider.
|
||||
|
||||
Retrieves the necessary parameters from the session and constructs the URL
|
||||
required to initiate logout with the OpenID Connect provider.
|
||||
|
||||
If no ID token is found in the session, the logout flow will not be initiated,
|
||||
and the method will return the default redirect URL.
|
||||
|
||||
The 'state' parameter is generated randomly and persisted in the session to ensure
|
||||
its integrity during the subsequent callback.
|
||||
"""
|
||||
|
||||
oidc_logout_endpoint = self.get_settings("OIDC_OP_LOGOUT_ENDPOINT")
|
||||
|
||||
if not oidc_logout_endpoint:
|
||||
return self.redirect_url
|
||||
|
||||
reverse_url = reverse("oidc_logout_callback")
|
||||
id_token = request.session.get("oidc_id_token", None)
|
||||
|
||||
if not id_token:
|
||||
return self.redirect_url
|
||||
|
||||
query = {
|
||||
"id_token_hint": id_token,
|
||||
"state": crypto.get_random_string(self.get_settings("OIDC_STATE_SIZE", 32)),
|
||||
"post_logout_redirect_uri": absolutify(request, reverse_url),
|
||||
}
|
||||
|
||||
self.persist_state(request, query["state"])
|
||||
|
||||
return f"{oidc_logout_endpoint}?{urlencode(query)}"
|
||||
|
||||
def post(self, request):
|
||||
"""Handle user logout.
|
||||
|
||||
If the user is not authenticated, redirects to the default logout URL.
|
||||
Otherwise, constructs the OIDC logout URL and redirects the user to start
|
||||
the logout process.
|
||||
|
||||
If the user is redirected to the default logout URL, ensure her Django session
|
||||
is terminated.
|
||||
"""
|
||||
|
||||
logout_url = self.redirect_url
|
||||
|
||||
if request.user.is_authenticated:
|
||||
logout_url = self.construct_oidc_logout_url(request)
|
||||
|
||||
# If the user is not redirected to the OIDC provider, ensure logout
|
||||
if logout_url == self.redirect_url:
|
||||
auth.logout(request)
|
||||
|
||||
return HttpResponseRedirect(logout_url)
|
||||
|
||||
|
||||
class OIDCLogoutCallbackView(MozillaOIDCOIDCLogoutView):
|
||||
"""Custom view for handling the logout callback from the OpenID Connect (OIDC) provider.
|
||||
|
||||
Handles the callback after logout from the identity provider (OP).
|
||||
Verifies the state parameter and performs necessary logout actions.
|
||||
|
||||
The Django session is maintained during the logout process to ensure the integrity
|
||||
of the logout flow initiated in the previous step.
|
||||
"""
|
||||
|
||||
http_method_names = ["get"]
|
||||
|
||||
def get(self, request):
|
||||
"""Handle the logout callback.
|
||||
|
||||
If the user is not authenticated, redirects to the default logout URL.
|
||||
Otherwise, verifies the state parameter and performs necessary logout actions.
|
||||
"""
|
||||
|
||||
if not request.user.is_authenticated:
|
||||
return HttpResponseRedirect(self.redirect_url)
|
||||
|
||||
state = request.GET.get("state")
|
||||
|
||||
if state not in request.session.get("oidc_states", {}):
|
||||
msg = "OIDC callback state not found in session `oidc_states`!"
|
||||
raise SuspiciousOperation(msg)
|
||||
|
||||
del request.session["oidc_states"][state]
|
||||
request.session.save()
|
||||
|
||||
auth.logout(request)
|
||||
|
||||
return HttpResponseRedirect(self.redirect_url)
|
||||
15
src/backend/core/enums.py
Normal file
15
src/backend/core/enums.py
Normal file
@@ -0,0 +1,15 @@
|
||||
"""
|
||||
Core application enums declaration
|
||||
"""
|
||||
from django.conf import global_settings, settings
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
# Django sets `LANGUAGES` by default with all supported languages. We can use it for
|
||||
# the choice of languages which should not be limited to the few languages active in
|
||||
# the app.
|
||||
# pylint: disable=no-member
|
||||
ALL_LANGUAGES = getattr(
|
||||
settings,
|
||||
"ALL_LANGUAGES",
|
||||
[(language, _(name)) for language, name in global_settings.LANGUAGES],
|
||||
)
|
||||
25
src/backend/core/factories.py
Normal file
25
src/backend/core/factories.py
Normal file
@@ -0,0 +1,25 @@
|
||||
# ruff: noqa: S311
|
||||
"""
|
||||
Core application factories
|
||||
"""
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.hashers import make_password
|
||||
|
||||
import factory.fuzzy
|
||||
from faker import Faker
|
||||
|
||||
from core import models
|
||||
|
||||
fake = Faker()
|
||||
|
||||
|
||||
class UserFactory(factory.django.DjangoModelFactory):
|
||||
"""A factory to random users for testing purposes."""
|
||||
|
||||
class Meta:
|
||||
model = models.User
|
||||
|
||||
sub = factory.Sequence(lambda n: f"user{n!s}")
|
||||
email = factory.Faker("email")
|
||||
language = factory.fuzzy.FuzzyChoice([lang[0] for lang in settings.LANGUAGES])
|
||||
password = make_password("password")
|
||||
166
src/backend/core/migrations/0001_initial.py
Normal file
166
src/backend/core/migrations/0001_initial.py
Normal file
@@ -0,0 +1,166 @@
|
||||
# Generated by Django 5.0.3 on 2024-05-28 20:29
|
||||
|
||||
import django.contrib.auth.models
|
||||
import django.core.validators
|
||||
import django.db.models.deletion
|
||||
import timezone_field.fields
|
||||
import uuid
|
||||
from django.conf import settings
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
initial = True
|
||||
|
||||
dependencies = [
|
||||
('auth', '0012_alter_user_first_name_max_length'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='Document',
|
||||
fields=[
|
||||
('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')),
|
||||
('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created on')),
|
||||
('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated on')),
|
||||
('title', models.CharField(max_length=255, verbose_name='title')),
|
||||
('is_public', models.BooleanField(default=False, help_text='Whether this document is public for anyone to use.', verbose_name='public')),
|
||||
],
|
||||
options={
|
||||
'verbose_name': 'Document',
|
||||
'verbose_name_plural': 'Documents',
|
||||
'db_table': 'impress_document',
|
||||
'ordering': ('title',),
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='Template',
|
||||
fields=[
|
||||
('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')),
|
||||
('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created on')),
|
||||
('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated on')),
|
||||
('title', models.CharField(max_length=255, verbose_name='title')),
|
||||
('description', models.TextField(blank=True, verbose_name='description')),
|
||||
('code', models.TextField(blank=True, verbose_name='code')),
|
||||
('css', models.TextField(blank=True, verbose_name='css')),
|
||||
('is_public', models.BooleanField(default=False, help_text='Whether this template is public for anyone to use.', verbose_name='public')),
|
||||
],
|
||||
options={
|
||||
'verbose_name': 'Template',
|
||||
'verbose_name_plural': 'Templates',
|
||||
'db_table': 'impress_template',
|
||||
'ordering': ('title',),
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='User',
|
||||
fields=[
|
||||
('password', models.CharField(max_length=128, verbose_name='password')),
|
||||
('last_login', models.DateTimeField(blank=True, null=True, verbose_name='last login')),
|
||||
('is_superuser', models.BooleanField(default=False, help_text='Designates that this user has all permissions without explicitly assigning them.', verbose_name='superuser status')),
|
||||
('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')),
|
||||
('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created on')),
|
||||
('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated on')),
|
||||
('sub', models.CharField(blank=True, help_text='Required. 255 characters or fewer. Letters, numbers, and @/./+/-/_ characters only.', max_length=255, null=True, unique=True, validators=[django.core.validators.RegexValidator(message='Enter a valid sub. This value may contain only letters, numbers, and @/./+/-/_ characters.', regex='^[\\w.@+-]+\\Z')], verbose_name='sub')),
|
||||
('email', models.EmailField(blank=True, max_length=254, null=True, verbose_name='identity email address')),
|
||||
('admin_email', models.EmailField(blank=True, max_length=254, null=True, unique=True, verbose_name='admin email address')),
|
||||
('language', models.CharField(choices="(('en-us', 'English'), ('fr-fr', 'French'))", default='en-us', help_text='The language in which the user wants to see the interface.', max_length=10, verbose_name='language')),
|
||||
('timezone', timezone_field.fields.TimeZoneField(choices_display='WITH_GMT_OFFSET', default='UTC', help_text='The timezone in which the user wants to see times.', use_pytz=False)),
|
||||
('is_device', models.BooleanField(default=False, help_text='Whether the user is a device or a real user.', verbose_name='device')),
|
||||
('is_staff', models.BooleanField(default=False, help_text='Whether the user can log into this admin site.', verbose_name='staff status')),
|
||||
('is_active', models.BooleanField(default=True, help_text='Whether this user should be treated as active. Unselect this instead of deleting accounts.', verbose_name='active')),
|
||||
('groups', models.ManyToManyField(blank=True, help_text='The groups this user belongs to. A user will get all permissions granted to each of their groups.', related_name='user_set', related_query_name='user', to='auth.group', verbose_name='groups')),
|
||||
('user_permissions', models.ManyToManyField(blank=True, help_text='Specific permissions for this user.', related_name='user_set', related_query_name='user', to='auth.permission', verbose_name='user permissions')),
|
||||
],
|
||||
options={
|
||||
'verbose_name': 'user',
|
||||
'verbose_name_plural': 'users',
|
||||
'db_table': 'impress_user',
|
||||
},
|
||||
managers=[
|
||||
('objects', django.contrib.auth.models.UserManager()),
|
||||
],
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='DocumentAccess',
|
||||
fields=[
|
||||
('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')),
|
||||
('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created on')),
|
||||
('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated on')),
|
||||
('team', models.CharField(blank=True, max_length=100)),
|
||||
('role', models.CharField(choices=[('reader', 'Reader'), ('editor', 'Editor'), ('administrator', 'Administrator'), ('owner', 'Owner')], default='reader', max_length=20)),
|
||||
('document', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='accesses', to='core.document')),
|
||||
('user', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)),
|
||||
],
|
||||
options={
|
||||
'verbose_name': 'Document/user relation',
|
||||
'verbose_name_plural': 'Document/user relations',
|
||||
'db_table': 'impress_document_access',
|
||||
'ordering': ('-created_at',),
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='Invitation',
|
||||
fields=[
|
||||
('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')),
|
||||
('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created on')),
|
||||
('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated on')),
|
||||
('email', models.EmailField(max_length=254, verbose_name='email address')),
|
||||
('role', models.CharField(choices=[('reader', 'Reader'), ('editor', 'Editor'), ('administrator', 'Administrator'), ('owner', 'Owner')], default='reader', max_length=20)),
|
||||
('document', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='invitations', to='core.document')),
|
||||
('issuer', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='invitations', to=settings.AUTH_USER_MODEL)),
|
||||
],
|
||||
options={
|
||||
'verbose_name': 'Document invitation',
|
||||
'verbose_name_plural': 'Document invitations',
|
||||
'db_table': 'impress_invitation',
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='TemplateAccess',
|
||||
fields=[
|
||||
('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')),
|
||||
('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created on')),
|
||||
('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated on')),
|
||||
('team', models.CharField(blank=True, max_length=100)),
|
||||
('role', models.CharField(choices=[('reader', 'Reader'), ('editor', 'Editor'), ('administrator', 'Administrator'), ('owner', 'Owner')], default='reader', max_length=20)),
|
||||
('template', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='accesses', to='core.template')),
|
||||
('user', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)),
|
||||
],
|
||||
options={
|
||||
'verbose_name': 'Template/user relation',
|
||||
'verbose_name_plural': 'Template/user relations',
|
||||
'db_table': 'impress_template_access',
|
||||
'ordering': ('-created_at',),
|
||||
},
|
||||
),
|
||||
migrations.AddConstraint(
|
||||
model_name='documentaccess',
|
||||
constraint=models.UniqueConstraint(condition=models.Q(('user__isnull', False)), fields=('user', 'document'), name='unique_document_user', violation_error_message='This user is already in this document.'),
|
||||
),
|
||||
migrations.AddConstraint(
|
||||
model_name='documentaccess',
|
||||
constraint=models.UniqueConstraint(condition=models.Q(('team__gt', '')), fields=('team', 'document'), name='unique_document_team', violation_error_message='This team is already in this document.'),
|
||||
),
|
||||
migrations.AddConstraint(
|
||||
model_name='documentaccess',
|
||||
constraint=models.CheckConstraint(check=models.Q(models.Q(('team', ''), ('user__isnull', False)), models.Q(('team__gt', ''), ('user__isnull', True)), _connector='OR'), name='check_document_access_either_user_or_team', violation_error_message='Either user or team must be set, not both.'),
|
||||
),
|
||||
migrations.AddConstraint(
|
||||
model_name='invitation',
|
||||
constraint=models.UniqueConstraint(fields=('email', 'document'), name='email_and_document_unique_together'),
|
||||
),
|
||||
migrations.AddConstraint(
|
||||
model_name='templateaccess',
|
||||
constraint=models.UniqueConstraint(condition=models.Q(('user__isnull', False)), fields=('user', 'template'), name='unique_template_user', violation_error_message='This user is already in this template.'),
|
||||
),
|
||||
migrations.AddConstraint(
|
||||
model_name='templateaccess',
|
||||
constraint=models.UniqueConstraint(condition=models.Q(('team__gt', '')), fields=('team', 'template'), name='unique_template_team', violation_error_message='This team is already in this template.'),
|
||||
),
|
||||
migrations.AddConstraint(
|
||||
model_name='templateaccess',
|
||||
constraint=models.CheckConstraint(check=models.Q(models.Q(('team', ''), ('user__isnull', False)), models.Q(('team__gt', ''), ('user__isnull', True)), _connector='OR'), name='check_template_access_either_user_or_team', violation_error_message='Either user or team must be set, not both.'),
|
||||
),
|
||||
]
|
||||
14
src/backend/core/migrations/0002_create_pg_trgm_extension.py
Normal file
14
src/backend/core/migrations/0002_create_pg_trgm_extension.py
Normal file
@@ -0,0 +1,14 @@
|
||||
from django.db import migrations
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('core', '0001_initial'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.RunSQL(
|
||||
"CREATE EXTENSION IF NOT EXISTS pg_trgm;",
|
||||
reverse_sql="DROP EXTENSION IF EXISTS pg_trgm;",
|
||||
),
|
||||
]
|
||||
0
src/backend/core/migrations/__init__.py
Normal file
0
src/backend/core/migrations/__init__.py
Normal file
143
src/backend/core/models.py
Normal file
143
src/backend/core/models.py
Normal file
@@ -0,0 +1,143 @@
|
||||
"""
|
||||
Declare and configure the models for the impress core application
|
||||
"""
|
||||
import uuid
|
||||
from logging import getLogger
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.auth import models as auth_models
|
||||
from django.contrib.auth.base_user import AbstractBaseUser
|
||||
from django.core import mail, validators
|
||||
from django.db import models
|
||||
from django.utils.functional import lazy
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from timezone_field import TimeZoneField
|
||||
|
||||
logger = getLogger(__name__)
|
||||
|
||||
|
||||
class BaseModel(models.Model):
|
||||
"""
|
||||
Serves as an abstract base model for other models, ensuring that records are validated
|
||||
before saving as Django doesn't do it by default.
|
||||
|
||||
Includes fields common to all models: a UUID primary key and creation/update timestamps.
|
||||
"""
|
||||
|
||||
id = models.UUIDField(
|
||||
verbose_name=_("id"),
|
||||
help_text=_("primary key for the record as UUID"),
|
||||
primary_key=True,
|
||||
default=uuid.uuid4,
|
||||
editable=False,
|
||||
)
|
||||
created_at = models.DateTimeField(
|
||||
verbose_name=_("created on"),
|
||||
help_text=_("date and time at which a record was created"),
|
||||
auto_now_add=True,
|
||||
editable=False,
|
||||
)
|
||||
updated_at = models.DateTimeField(
|
||||
verbose_name=_("updated on"),
|
||||
help_text=_("date and time at which a record was last updated"),
|
||||
auto_now=True,
|
||||
editable=False,
|
||||
)
|
||||
|
||||
class Meta:
|
||||
abstract = True
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
"""Call `full_clean` before saving."""
|
||||
self.full_clean()
|
||||
super().save(*args, **kwargs)
|
||||
|
||||
|
||||
class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
|
||||
"""User model to work with OIDC only authentication."""
|
||||
|
||||
sub_validator = validators.RegexValidator(
|
||||
regex=r"^[\w.@+-]+\Z",
|
||||
message=_(
|
||||
"Enter a valid sub. This value may contain only letters, "
|
||||
"numbers, and @/./+/-/_ characters."
|
||||
),
|
||||
)
|
||||
|
||||
sub = models.CharField(
|
||||
_("sub"),
|
||||
help_text=_(
|
||||
"Required. 255 characters or fewer. Letters, numbers, and @/./+/-/_ characters only."
|
||||
),
|
||||
max_length=255,
|
||||
unique=True,
|
||||
validators=[sub_validator],
|
||||
blank=True,
|
||||
null=True,
|
||||
)
|
||||
email = models.EmailField(_("identity email address"), blank=True, null=True)
|
||||
|
||||
# Unlike the "email" field which stores the email coming from the OIDC token, this field
|
||||
# stores the email used by staff users to login to the admin site
|
||||
admin_email = models.EmailField(
|
||||
_("admin email address"), unique=True, blank=True, null=True
|
||||
)
|
||||
|
||||
language = models.CharField(
|
||||
max_length=10,
|
||||
choices=lazy(lambda: settings.LANGUAGES, tuple)(),
|
||||
default=settings.LANGUAGE_CODE,
|
||||
verbose_name=_("language"),
|
||||
help_text=_("The language in which the user wants to see the interface."),
|
||||
)
|
||||
timezone = TimeZoneField(
|
||||
choices_display="WITH_GMT_OFFSET",
|
||||
use_pytz=False,
|
||||
default=settings.TIME_ZONE,
|
||||
help_text=_("The timezone in which the user wants to see times."),
|
||||
)
|
||||
is_device = models.BooleanField(
|
||||
_("device"),
|
||||
default=False,
|
||||
help_text=_("Whether the user is a device or a real user."),
|
||||
)
|
||||
is_staff = models.BooleanField(
|
||||
_("staff status"),
|
||||
default=False,
|
||||
help_text=_("Whether the user can log into this admin site."),
|
||||
)
|
||||
is_active = models.BooleanField(
|
||||
_("active"),
|
||||
default=True,
|
||||
help_text=_(
|
||||
"Whether this user should be treated as active. "
|
||||
"Unselect this instead of deleting accounts."
|
||||
),
|
||||
)
|
||||
|
||||
objects = auth_models.UserManager()
|
||||
|
||||
USERNAME_FIELD = "admin_email"
|
||||
REQUIRED_FIELDS = []
|
||||
|
||||
class Meta:
|
||||
db_table = "impress_user"
|
||||
verbose_name = _("user")
|
||||
verbose_name_plural = _("users")
|
||||
|
||||
def __str__(self):
|
||||
return self.email or self.admin_email or str(self.id)
|
||||
|
||||
def email_user(self, subject, message, from_email=None, **kwargs):
|
||||
"""Email this user."""
|
||||
if not self.email:
|
||||
raise ValueError("User has no email address.")
|
||||
mail.send_mail(subject, message, from_email, [self.email], **kwargs)
|
||||
|
||||
def get_teams(self):
|
||||
"""
|
||||
Get list of teams in which the user is, as a list of strings.
|
||||
Must be cached if retrieved remotely.
|
||||
"""
|
||||
return []
|
||||
BIN
src/backend/core/static/images/logo-suite-numerique.png
Normal file
BIN
src/backend/core/static/images/logo-suite-numerique.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 13 KiB |
BIN
src/backend/core/static/images/logo.png
Normal file
BIN
src/backend/core/static/images/logo.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 1.2 KiB |
BIN
src/backend/core/static/images/mail-header-background.png
Normal file
BIN
src/backend/core/static/images/mail-header-background.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 40 KiB |
14
src/backend/core/templates/core/generate_document.html
Normal file
14
src/backend/core/templates/core/generate_document.html
Normal file
@@ -0,0 +1,14 @@
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<title>Generate Document</title>
|
||||
</head>
|
||||
<body>
|
||||
<h2>Generate Document</h2>
|
||||
<form method="post" enctype="multipart/form-data">
|
||||
{% csrf_token %}
|
||||
{{ form.as_p }}
|
||||
<button type="submit">Generate PDF</button>
|
||||
</form>
|
||||
</body>
|
||||
</html>
|
||||
0
src/backend/core/templatetags/__init__.py
Normal file
0
src/backend/core/templatetags/__init__.py
Normal file
58
src/backend/core/templatetags/extra_tags.py
Normal file
58
src/backend/core/templatetags/extra_tags.py
Normal file
@@ -0,0 +1,58 @@
|
||||
"""Custom template tags for the core application of People."""
|
||||
|
||||
import base64
|
||||
|
||||
from django import template
|
||||
from django.contrib.staticfiles import finders
|
||||
|
||||
from PIL import ImageFile as PillowImageFile
|
||||
|
||||
register = template.Library()
|
||||
|
||||
|
||||
def image_to_base64(file_or_path, close=False):
|
||||
"""
|
||||
Return the src string of the base64 encoding of an image represented by its path
|
||||
or file opened or not.
|
||||
|
||||
Inspired by Django's "get_image_dimensions"
|
||||
"""
|
||||
pil_parser = PillowImageFile.Parser()
|
||||
if hasattr(file_or_path, "read"):
|
||||
file = file_or_path
|
||||
if file.closed and hasattr(file, "open"):
|
||||
file_or_path.open()
|
||||
file_pos = file.tell()
|
||||
file.seek(0)
|
||||
else:
|
||||
try:
|
||||
# pylint: disable=consider-using-with
|
||||
file = open(file_or_path, "rb")
|
||||
except OSError:
|
||||
return ""
|
||||
close = True
|
||||
|
||||
try:
|
||||
image_data = file.read()
|
||||
if not image_data:
|
||||
return ""
|
||||
pil_parser.feed(image_data)
|
||||
if pil_parser.image:
|
||||
mime_type = pil_parser.image.get_format_mimetype()
|
||||
encoded_string = base64.b64encode(image_data)
|
||||
return f"data:{mime_type:s};base64, {encoded_string.decode('utf-8'):s}"
|
||||
return ""
|
||||
finally:
|
||||
if close:
|
||||
file.close()
|
||||
else:
|
||||
file.seek(file_pos)
|
||||
|
||||
|
||||
@register.simple_tag
|
||||
def base64_static(path):
|
||||
"""Return a static file into a base64."""
|
||||
full_path = finders.find(path)
|
||||
if full_path:
|
||||
return image_to_base64(full_path, True)
|
||||
return ""
|
||||
0
src/backend/core/tests/__init__.py
Normal file
0
src/backend/core/tests/__init__.py
Normal file
0
src/backend/core/tests/authentication/__init__.py
Normal file
0
src/backend/core/tests/authentication/__init__.py
Normal file
101
src/backend/core/tests/authentication/test_backends.py
Normal file
101
src/backend/core/tests/authentication/test_backends.py
Normal file
@@ -0,0 +1,101 @@
|
||||
"""Unit tests for the Authentication Backends."""
|
||||
|
||||
from django.core.exceptions import SuspiciousOperation
|
||||
|
||||
import pytest
|
||||
|
||||
from core import models
|
||||
from core.authentication.backends import OIDCAuthenticationBackend
|
||||
from core.factories import UserFactory
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
|
||||
def test_authentication_getter_existing_user_no_email(
|
||||
django_assert_num_queries, monkeypatch
|
||||
):
|
||||
"""
|
||||
If an existing user matches the user's info sub, the user should be returned.
|
||||
"""
|
||||
|
||||
klass = OIDCAuthenticationBackend()
|
||||
db_user = UserFactory()
|
||||
|
||||
def get_userinfo_mocked(*args):
|
||||
return {"sub": db_user.sub}
|
||||
|
||||
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
|
||||
|
||||
with django_assert_num_queries(1):
|
||||
user = klass.get_or_create_user(
|
||||
access_token="test-token", id_token=None, payload=None
|
||||
)
|
||||
|
||||
assert user == db_user
|
||||
|
||||
|
||||
def test_authentication_getter_new_user_no_email(monkeypatch):
|
||||
"""
|
||||
If no user matches the user's info sub, a user should be created.
|
||||
User's info doesn't contain an email, created user's email should be empty.
|
||||
"""
|
||||
klass = OIDCAuthenticationBackend()
|
||||
|
||||
def get_userinfo_mocked(*args):
|
||||
return {"sub": "123"}
|
||||
|
||||
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
|
||||
|
||||
user = klass.get_or_create_user(
|
||||
access_token="test-token", id_token=None, payload=None
|
||||
)
|
||||
|
||||
assert user.sub == "123"
|
||||
assert user.email is None
|
||||
assert user.password == "!"
|
||||
assert models.User.objects.count() == 1
|
||||
|
||||
|
||||
def test_authentication_getter_new_user_with_email(monkeypatch):
|
||||
"""
|
||||
If no user matches the user's info sub, a user should be created.
|
||||
User's email and name should be set on the identity.
|
||||
The "email" field on the User model should not be set as it is reserved for staff users.
|
||||
"""
|
||||
klass = OIDCAuthenticationBackend()
|
||||
|
||||
email = "impress@example.com"
|
||||
|
||||
def get_userinfo_mocked(*args):
|
||||
return {"sub": "123", "email": email, "first_name": "John", "last_name": "Doe"}
|
||||
|
||||
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
|
||||
|
||||
user = klass.get_or_create_user(
|
||||
access_token="test-token", id_token=None, payload=None
|
||||
)
|
||||
|
||||
assert user.sub == "123"
|
||||
assert user.email == email
|
||||
assert user.password == "!"
|
||||
assert models.User.objects.count() == 1
|
||||
|
||||
|
||||
def test_models_oidc_user_getter_invalid_token(django_assert_num_queries, monkeypatch):
|
||||
"""The user's info doesn't contain a sub."""
|
||||
klass = OIDCAuthenticationBackend()
|
||||
|
||||
def get_userinfo_mocked(*args):
|
||||
return {
|
||||
"test": "123",
|
||||
}
|
||||
|
||||
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
|
||||
|
||||
with django_assert_num_queries(0), pytest.raises(
|
||||
SuspiciousOperation,
|
||||
match="User info contained no recognizable user identification",
|
||||
):
|
||||
klass.get_or_create_user(access_token="test-token", id_token=None, payload=None)
|
||||
|
||||
assert models.User.objects.exists() is False
|
||||
10
src/backend/core/tests/authentication/test_urls.py
Normal file
10
src/backend/core/tests/authentication/test_urls.py
Normal file
@@ -0,0 +1,10 @@
|
||||
"""Unit tests for the Authentication URLs."""
|
||||
|
||||
from core.authentication.urls import urlpatterns
|
||||
|
||||
|
||||
def test_urls_override_default_mozilla_django_oidc():
|
||||
"""Custom URL patterns should override default ones from Mozilla Django OIDC."""
|
||||
|
||||
url_names = [u.name for u in urlpatterns]
|
||||
assert url_names.index("oidc_logout_custom") < url_names.index("oidc_logout")
|
||||
231
src/backend/core/tests/authentication/test_views.py
Normal file
231
src/backend/core/tests/authentication/test_views.py
Normal file
@@ -0,0 +1,231 @@
|
||||
"""Unit tests for the Authentication Views."""
|
||||
|
||||
from unittest import mock
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
|
||||
from django.contrib.auth.models import AnonymousUser
|
||||
from django.contrib.sessions.middleware import SessionMiddleware
|
||||
from django.core.exceptions import SuspiciousOperation
|
||||
from django.test import RequestFactory
|
||||
from django.test.utils import override_settings
|
||||
from django.urls import reverse
|
||||
from django.utils import crypto
|
||||
|
||||
import pytest
|
||||
from rest_framework.test import APIClient
|
||||
|
||||
from core import factories
|
||||
from core.authentication.views import OIDCLogoutCallbackView, OIDCLogoutView
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
|
||||
@override_settings(LOGOUT_REDIRECT_URL="/example-logout")
|
||||
def test_view_logout_anonymous():
|
||||
"""Anonymous users calling the logout url,
|
||||
should be redirected to the specified LOGOUT_REDIRECT_URL."""
|
||||
|
||||
url = reverse("oidc_logout_custom")
|
||||
response = APIClient().get(url)
|
||||
|
||||
assert response.status_code == 302
|
||||
assert response.url == "/example-logout"
|
||||
|
||||
|
||||
@mock.patch.object(
|
||||
OIDCLogoutView, "construct_oidc_logout_url", return_value="/example-logout"
|
||||
)
|
||||
def test_view_logout(mocked_oidc_logout_url):
|
||||
"""Authenticated users should be redirected to OIDC provider for logout."""
|
||||
|
||||
user = factories.UserFactory()
|
||||
|
||||
client = APIClient()
|
||||
client.force_login(user)
|
||||
|
||||
url = reverse("oidc_logout_custom")
|
||||
response = client.get(url)
|
||||
|
||||
mocked_oidc_logout_url.assert_called_once()
|
||||
|
||||
assert response.status_code == 302
|
||||
assert response.url == "/example-logout"
|
||||
|
||||
|
||||
@override_settings(LOGOUT_REDIRECT_URL="/default-redirect-logout")
|
||||
@mock.patch.object(
|
||||
OIDCLogoutView, "construct_oidc_logout_url", return_value="/default-redirect-logout"
|
||||
)
|
||||
def test_view_logout_no_oidc_provider(mocked_oidc_logout_url):
|
||||
"""Authenticated users should be logged out when no OIDC provider is available."""
|
||||
|
||||
user = factories.UserFactory()
|
||||
|
||||
client = APIClient()
|
||||
client.force_login(user)
|
||||
|
||||
url = reverse("oidc_logout_custom")
|
||||
|
||||
with mock.patch("mozilla_django_oidc.views.auth.logout") as mock_logout:
|
||||
response = client.get(url)
|
||||
mocked_oidc_logout_url.assert_called_once()
|
||||
mock_logout.assert_called_once()
|
||||
|
||||
assert response.status_code == 302
|
||||
assert response.url == "/default-redirect-logout"
|
||||
|
||||
|
||||
@override_settings(LOGOUT_REDIRECT_URL="/example-logout")
|
||||
def test_view_logout_callback_anonymous():
|
||||
"""Anonymous users calling the logout callback url,
|
||||
should be redirected to the specified LOGOUT_REDIRECT_URL."""
|
||||
|
||||
url = reverse("oidc_logout_callback")
|
||||
response = APIClient().get(url)
|
||||
|
||||
assert response.status_code == 302
|
||||
assert response.url == "/example-logout"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"initial_oidc_states",
|
||||
[{}, {"other_state": "foo"}],
|
||||
)
|
||||
def test_view_logout_persist_state(initial_oidc_states):
|
||||
"""State value should be persisted in session's data."""
|
||||
|
||||
user = factories.UserFactory()
|
||||
|
||||
request = RequestFactory().request()
|
||||
request.user = user
|
||||
|
||||
middleware = SessionMiddleware(get_response=lambda x: x)
|
||||
middleware.process_request(request)
|
||||
|
||||
if initial_oidc_states:
|
||||
request.session["oidc_states"] = initial_oidc_states
|
||||
request.session.save()
|
||||
|
||||
mocked_state = "mock_state"
|
||||
|
||||
OIDCLogoutView().persist_state(request, mocked_state)
|
||||
|
||||
assert "oidc_states" in request.session
|
||||
assert request.session["oidc_states"] == {
|
||||
"mock_state": {},
|
||||
**initial_oidc_states,
|
||||
}
|
||||
|
||||
|
||||
@override_settings(OIDC_OP_LOGOUT_ENDPOINT="/example-logout")
|
||||
@mock.patch.object(OIDCLogoutView, "persist_state")
|
||||
@mock.patch.object(crypto, "get_random_string", return_value="mocked_state")
|
||||
def test_view_logout_construct_oidc_logout_url(
|
||||
mocked_get_random_string, mocked_persist_state
|
||||
):
|
||||
"""Should construct the logout URL to initiate the logout flow with the OIDC provider."""
|
||||
|
||||
user = factories.UserFactory()
|
||||
|
||||
request = RequestFactory().request()
|
||||
request.user = user
|
||||
|
||||
middleware = SessionMiddleware(get_response=lambda x: x)
|
||||
middleware.process_request(request)
|
||||
|
||||
request.session["oidc_id_token"] = "mocked_oidc_id_token"
|
||||
request.session.save()
|
||||
|
||||
redirect_url = OIDCLogoutView().construct_oidc_logout_url(request)
|
||||
|
||||
mocked_persist_state.assert_called_once()
|
||||
mocked_get_random_string.assert_called_once()
|
||||
|
||||
params = parse_qs(urlparse(redirect_url).query)
|
||||
|
||||
assert params["id_token_hint"][0] == "mocked_oidc_id_token"
|
||||
assert params["state"][0] == "mocked_state"
|
||||
|
||||
url = reverse("oidc_logout_callback")
|
||||
assert url in params["post_logout_redirect_uri"][0]
|
||||
|
||||
|
||||
@override_settings(LOGOUT_REDIRECT_URL="/")
|
||||
def test_view_logout_construct_oidc_logout_url_none_id_token():
|
||||
"""If no ID token is available in the session,
|
||||
the user should be redirected to the final URL."""
|
||||
|
||||
user = factories.UserFactory()
|
||||
|
||||
request = RequestFactory().request()
|
||||
request.user = user
|
||||
|
||||
middleware = SessionMiddleware(get_response=lambda x: x)
|
||||
middleware.process_request(request)
|
||||
|
||||
redirect_url = OIDCLogoutView().construct_oidc_logout_url(request)
|
||||
|
||||
assert redirect_url == "/"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"initial_state",
|
||||
[None, {"other_state": "foo"}],
|
||||
)
|
||||
def test_view_logout_callback_wrong_state(initial_state):
|
||||
"""Should raise an error if OIDC state doesn't match session data."""
|
||||
|
||||
user = factories.UserFactory()
|
||||
|
||||
request = RequestFactory().request()
|
||||
request.user = user
|
||||
|
||||
middleware = SessionMiddleware(get_response=lambda x: x)
|
||||
middleware.process_request(request)
|
||||
|
||||
if initial_state:
|
||||
request.session["oidc_states"] = initial_state
|
||||
request.session.save()
|
||||
|
||||
callback_view = OIDCLogoutCallbackView.as_view()
|
||||
|
||||
with pytest.raises(SuspiciousOperation) as excinfo:
|
||||
callback_view(request)
|
||||
|
||||
assert (
|
||||
str(excinfo.value) == "OIDC callback state not found in session `oidc_states`!"
|
||||
)
|
||||
|
||||
|
||||
@override_settings(LOGOUT_REDIRECT_URL="/example-logout")
|
||||
def test_view_logout_callback():
|
||||
"""If state matches, callback should clear OIDC state and redirects."""
|
||||
|
||||
user = factories.UserFactory()
|
||||
|
||||
request = RequestFactory().get("/logout-callback/", data={"state": "mocked_state"})
|
||||
request.user = user
|
||||
|
||||
middleware = SessionMiddleware(get_response=lambda x: x)
|
||||
middleware.process_request(request)
|
||||
|
||||
mocked_state = "mocked_state"
|
||||
|
||||
request.session["oidc_states"] = {mocked_state: {}}
|
||||
request.session.save()
|
||||
|
||||
callback_view = OIDCLogoutCallbackView.as_view()
|
||||
|
||||
with mock.patch("mozilla_django_oidc.views.auth.logout") as mock_logout:
|
||||
|
||||
def clear_user(request):
|
||||
# Assert state is cleared prior to logout
|
||||
assert request.session["oidc_states"] == {}
|
||||
request.user = AnonymousUser()
|
||||
|
||||
mock_logout.side_effect = clear_user
|
||||
response = callback_view(request)
|
||||
mock_logout.assert_called_once()
|
||||
|
||||
assert response.status_code == 302
|
||||
assert response.url == "/example-logout"
|
||||
15
src/backend/core/tests/conftest.py
Normal file
15
src/backend/core/tests/conftest.py
Normal file
@@ -0,0 +1,15 @@
|
||||
"""Fixtures for tests in the impress core application"""
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
|
||||
USER = "user"
|
||||
TEAM = "team"
|
||||
VIA = [USER, TEAM]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_user_get_teams():
|
||||
"""Mock for the "get_teams" method on the User model."""
|
||||
with mock.patch("core.models.User.get_teams") as mock_get_teams:
|
||||
yield mock_get_teams
|
||||
0
src/backend/core/tests/swagger/__init__.py
Normal file
0
src/backend/core/tests/swagger/__init__.py
Normal file
41
src/backend/core/tests/swagger/test_openapi_schema.py
Normal file
41
src/backend/core/tests/swagger/test_openapi_schema.py
Normal file
@@ -0,0 +1,41 @@
|
||||
"""
|
||||
Test suite for generated openapi schema.
|
||||
"""
|
||||
import json
|
||||
from io import StringIO
|
||||
|
||||
from django.core.management import call_command
|
||||
from django.test import Client
|
||||
|
||||
import pytest
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
|
||||
def test_openapi_client_schema():
|
||||
"""
|
||||
Generated and served OpenAPI client schema should be correct.
|
||||
"""
|
||||
# Start by generating the swagger.json file
|
||||
output = StringIO()
|
||||
call_command(
|
||||
"spectacular",
|
||||
"--api-version",
|
||||
"v1.0",
|
||||
"--urlconf",
|
||||
"core.urls",
|
||||
"--format",
|
||||
"openapi-json",
|
||||
"--file",
|
||||
"core/tests/swagger/swagger.json",
|
||||
stdout=output,
|
||||
)
|
||||
assert output.getvalue() == ""
|
||||
|
||||
response = Client().get("/v1.0/swagger.json")
|
||||
|
||||
assert response.status_code == 200
|
||||
with open(
|
||||
"core/tests/swagger/swagger.json", "r", encoding="utf-8"
|
||||
) as expected_schema:
|
||||
assert response.json() == json.load(expected_schema)
|
||||
417
src/backend/core/tests/test_api_users.py
Normal file
417
src/backend/core/tests/test_api_users.py
Normal file
@@ -0,0 +1,417 @@
|
||||
"""
|
||||
Test users API endpoints in the impress core app.
|
||||
"""
|
||||
import pytest
|
||||
from rest_framework.test import APIClient
|
||||
|
||||
from core import factories, models
|
||||
from core.api import serializers
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
|
||||
def test_api_users_list_anonymous():
|
||||
"""Anonymous users should not be allowed to list users."""
|
||||
factories.UserFactory()
|
||||
client = APIClient()
|
||||
response = client.get("/api/v1.0/users/")
|
||||
assert response.status_code == 401
|
||||
assert response.json() == {
|
||||
"detail": "Authentication credentials were not provided."
|
||||
}
|
||||
|
||||
|
||||
def test_api_users_list_authenticated():
|
||||
"""
|
||||
Authenticated users should be able to list users.
|
||||
"""
|
||||
user = factories.UserFactory()
|
||||
|
||||
client = APIClient()
|
||||
client.force_login(user)
|
||||
|
||||
factories.UserFactory.create_batch(2)
|
||||
response = client.get(
|
||||
"/api/v1.0/users/",
|
||||
)
|
||||
assert response.status_code == 200
|
||||
content = response.json()
|
||||
assert len(content["results"]) == 3
|
||||
|
||||
|
||||
def test_api_users_list_query_email():
|
||||
"""
|
||||
Authenticated users should be able to list users
|
||||
and filter by email.
|
||||
"""
|
||||
user = factories.UserFactory()
|
||||
|
||||
client = APIClient()
|
||||
client.force_login(user)
|
||||
|
||||
dave = factories.UserFactory(email="david.bowman@work.com")
|
||||
nicole = factories.UserFactory(email="nicole_foole@work.com")
|
||||
frank = factories.UserFactory(email="frank_poole@work.com")
|
||||
factories.UserFactory(email="heywood_floyd@work.com")
|
||||
|
||||
response = client.get(
|
||||
"/api/v1.0/users/?q=david.bowman@work.com",
|
||||
)
|
||||
assert response.status_code == 200
|
||||
user_ids = [user["id"] for user in response.json()["results"]]
|
||||
assert user_ids == [str(dave.id)]
|
||||
|
||||
response = client.get("/api/v1.0/users/?q=oole")
|
||||
|
||||
assert response.status_code == 200
|
||||
user_ids = [user["id"] for user in response.json()["results"]]
|
||||
assert user_ids == [str(nicole.id), str(frank.id)]
|
||||
|
||||
|
||||
def test_api_users_retrieve_me_anonymous():
|
||||
"""Anonymous users should not be allowed to list users."""
|
||||
factories.UserFactory.create_batch(2)
|
||||
client = APIClient()
|
||||
response = client.get("/api/v1.0/users/me/")
|
||||
assert response.status_code == 401
|
||||
assert response.json() == {
|
||||
"detail": "Authentication credentials were not provided."
|
||||
}
|
||||
|
||||
|
||||
def test_api_users_retrieve_me_authenticated():
|
||||
"""Authenticated users should be able to retrieve their own user via the "/users/me" path."""
|
||||
user = factories.UserFactory()
|
||||
|
||||
client = APIClient()
|
||||
client.force_login(user)
|
||||
|
||||
factories.UserFactory.create_batch(2)
|
||||
response = client.get(
|
||||
"/api/v1.0/users/me/",
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {
|
||||
"id": str(user.id),
|
||||
"email": user.email,
|
||||
}
|
||||
|
||||
|
||||
def test_api_users_retrieve_anonymous():
|
||||
"""Anonymous users should not be allowed to retrieve a user."""
|
||||
client = APIClient()
|
||||
user = factories.UserFactory()
|
||||
response = client.get(f"/api/v1.0/users/{user.id!s}/")
|
||||
|
||||
assert response.status_code == 401
|
||||
assert response.json() == {
|
||||
"detail": "Authentication credentials were not provided."
|
||||
}
|
||||
|
||||
|
||||
def test_api_users_retrieve_authenticated_self():
|
||||
"""
|
||||
Authenticated users should be allowed to retrieve their own user.
|
||||
The returned object should not contain the password.
|
||||
"""
|
||||
user = factories.UserFactory()
|
||||
|
||||
client = APIClient()
|
||||
client.force_login(user)
|
||||
|
||||
response = client.get(
|
||||
f"/api/v1.0/users/{user.id!s}/",
|
||||
)
|
||||
assert response.status_code == 405
|
||||
assert response.json() == {"detail": 'Method "GET" not allowed.'}
|
||||
|
||||
|
||||
def test_api_users_retrieve_authenticated_other():
|
||||
"""
|
||||
Authenticated users should be able to retrieve another user's detail view with
|
||||
limited information.
|
||||
"""
|
||||
user = factories.UserFactory()
|
||||
|
||||
client = APIClient()
|
||||
client.force_login(user)
|
||||
|
||||
other_user = factories.UserFactory()
|
||||
|
||||
response = client.get(
|
||||
f"/api/v1.0/users/{other_user.id!s}/",
|
||||
)
|
||||
assert response.status_code == 405
|
||||
assert response.json() == {"detail": 'Method "GET" not allowed.'}
|
||||
|
||||
|
||||
def test_api_users_create_anonymous():
|
||||
"""Anonymous users should not be able to create users via the API."""
|
||||
response = APIClient().post(
|
||||
"/api/v1.0/users/",
|
||||
{
|
||||
"language": "fr-fr",
|
||||
"password": "mypassword",
|
||||
},
|
||||
)
|
||||
assert response.status_code == 401
|
||||
assert response.json() == {
|
||||
"detail": "Authentication credentials were not provided."
|
||||
}
|
||||
assert models.User.objects.exists() is False
|
||||
|
||||
|
||||
def test_api_users_create_authenticated():
|
||||
"""Authenticated users should not be able to create users via the API."""
|
||||
user = factories.UserFactory()
|
||||
|
||||
client = APIClient()
|
||||
client.force_login(user)
|
||||
|
||||
response = client.post(
|
||||
"/api/v1.0/users/",
|
||||
{
|
||||
"language": "fr-fr",
|
||||
"password": "mypassword",
|
||||
},
|
||||
format="json",
|
||||
)
|
||||
assert response.status_code == 405
|
||||
assert response.json() == {"detail": 'Method "POST" not allowed.'}
|
||||
assert models.User.objects.exclude(id=user.id).exists() is False
|
||||
|
||||
|
||||
def test_api_users_update_anonymous():
|
||||
"""Anonymous users should not be able to update users via the API."""
|
||||
user = factories.UserFactory()
|
||||
|
||||
old_user_values = dict(serializers.UserSerializer(instance=user).data)
|
||||
new_user_values = serializers.UserSerializer(instance=factories.UserFactory()).data
|
||||
|
||||
response = APIClient().put(
|
||||
f"/api/v1.0/users/{user.id!s}/",
|
||||
new_user_values,
|
||||
format="json",
|
||||
)
|
||||
|
||||
assert response.status_code == 401
|
||||
assert response.json() == {
|
||||
"detail": "Authentication credentials were not provided."
|
||||
}
|
||||
|
||||
user.refresh_from_db()
|
||||
user_values = dict(serializers.UserSerializer(instance=user).data)
|
||||
for key, value in user_values.items():
|
||||
assert value == old_user_values[key]
|
||||
|
||||
|
||||
def test_api_users_update_authenticated_self():
|
||||
"""
|
||||
Authenticated users should be able to update their own user but only "language"
|
||||
and "timezone" fields.
|
||||
"""
|
||||
user = factories.UserFactory()
|
||||
|
||||
client = APIClient()
|
||||
client.force_login(user)
|
||||
|
||||
old_user_values = dict(serializers.UserSerializer(instance=user).data)
|
||||
new_user_values = dict(
|
||||
serializers.UserSerializer(instance=factories.UserFactory()).data
|
||||
)
|
||||
|
||||
response = client.put(
|
||||
f"/api/v1.0/users/{user.id!s}/",
|
||||
new_user_values,
|
||||
format="json",
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
user.refresh_from_db()
|
||||
user_values = dict(serializers.UserSerializer(instance=user).data)
|
||||
for key, value in user_values.items():
|
||||
if key in ["language", "timezone"]:
|
||||
assert value == new_user_values[key]
|
||||
else:
|
||||
assert value == old_user_values[key]
|
||||
|
||||
|
||||
def test_api_users_update_authenticated_other():
|
||||
"""Authenticated users should not be allowed to update other users."""
|
||||
user = factories.UserFactory()
|
||||
|
||||
client = APIClient()
|
||||
client.force_login(user)
|
||||
|
||||
user = factories.UserFactory()
|
||||
old_user_values = dict(serializers.UserSerializer(instance=user).data)
|
||||
new_user_values = serializers.UserSerializer(instance=factories.UserFactory()).data
|
||||
|
||||
response = client.put(
|
||||
f"/api/v1.0/users/{user.id!s}/",
|
||||
new_user_values,
|
||||
format="json",
|
||||
)
|
||||
|
||||
assert response.status_code == 403
|
||||
user.refresh_from_db()
|
||||
user_values = dict(serializers.UserSerializer(instance=user).data)
|
||||
for key, value in user_values.items():
|
||||
assert value == old_user_values[key]
|
||||
|
||||
|
||||
def test_api_users_patch_anonymous():
|
||||
"""Anonymous users should not be able to patch users via the API."""
|
||||
user = factories.UserFactory()
|
||||
|
||||
old_user_values = dict(serializers.UserSerializer(instance=user).data)
|
||||
new_user_values = dict(
|
||||
serializers.UserSerializer(instance=factories.UserFactory()).data
|
||||
)
|
||||
|
||||
for key, new_value in new_user_values.items():
|
||||
response = APIClient().patch(
|
||||
f"/api/v1.0/users/{user.id!s}/",
|
||||
{key: new_value},
|
||||
format="json",
|
||||
)
|
||||
assert response.status_code == 401
|
||||
assert response.json() == {
|
||||
"detail": "Authentication credentials were not provided."
|
||||
}
|
||||
|
||||
user.refresh_from_db()
|
||||
user_values = dict(serializers.UserSerializer(instance=user).data)
|
||||
for key, value in user_values.items():
|
||||
assert value == old_user_values[key]
|
||||
|
||||
|
||||
def test_api_users_patch_authenticated_self():
|
||||
"""
|
||||
Authenticated users should be able to patch their own user but only "language"
|
||||
and "timezone" fields.
|
||||
"""
|
||||
user = factories.UserFactory()
|
||||
|
||||
client = APIClient()
|
||||
client.force_login(user)
|
||||
|
||||
old_user_values = dict(serializers.UserSerializer(instance=user).data)
|
||||
new_user_values = dict(
|
||||
serializers.UserSerializer(instance=factories.UserFactory()).data
|
||||
)
|
||||
|
||||
for key, new_value in new_user_values.items():
|
||||
response = client.patch(
|
||||
f"/api/v1.0/users/{user.id!s}/",
|
||||
{key: new_value},
|
||||
format="json",
|
||||
)
|
||||
assert response.status_code == 200
|
||||
|
||||
user.refresh_from_db()
|
||||
user_values = dict(serializers.UserSerializer(instance=user).data)
|
||||
for key, value in user_values.items():
|
||||
if key in ["language", "timezone"]:
|
||||
assert value == new_user_values[key]
|
||||
else:
|
||||
assert value == old_user_values[key]
|
||||
|
||||
|
||||
def test_api_users_patch_authenticated_other():
|
||||
"""Authenticated users should not be allowed to patch other users."""
|
||||
user = factories.UserFactory()
|
||||
|
||||
client = APIClient()
|
||||
client.force_login(user)
|
||||
|
||||
user = factories.UserFactory()
|
||||
old_user_values = dict(serializers.UserSerializer(instance=user).data)
|
||||
new_user_values = dict(
|
||||
serializers.UserSerializer(instance=factories.UserFactory()).data
|
||||
)
|
||||
|
||||
for key, new_value in new_user_values.items():
|
||||
response = client.put(
|
||||
f"/api/v1.0/users/{user.id!s}/",
|
||||
{key: new_value},
|
||||
format="json",
|
||||
)
|
||||
assert response.status_code == 403
|
||||
|
||||
user.refresh_from_db()
|
||||
user_values = dict(serializers.UserSerializer(instance=user).data)
|
||||
for key, value in user_values.items():
|
||||
assert value == old_user_values[key]
|
||||
|
||||
|
||||
def test_api_users_delete_list_anonymous():
|
||||
"""Anonymous users should not be allowed to delete a list of users."""
|
||||
factories.UserFactory.create_batch(2)
|
||||
|
||||
client = APIClient()
|
||||
response = client.delete("/api/v1.0/users/")
|
||||
|
||||
assert response.status_code == 401
|
||||
assert models.User.objects.count() == 2
|
||||
|
||||
|
||||
def test_api_users_delete_list_authenticated():
|
||||
"""Authenticated users should not be allowed to delete a list of users."""
|
||||
factories.UserFactory.create_batch(2)
|
||||
user = factories.UserFactory()
|
||||
|
||||
client = APIClient()
|
||||
client.force_login(user)
|
||||
|
||||
response = client.delete(
|
||||
"/api/v1.0/users/",
|
||||
)
|
||||
|
||||
assert response.status_code == 405
|
||||
assert models.User.objects.count() == 3
|
||||
|
||||
|
||||
def test_api_users_delete_anonymous():
|
||||
"""Anonymous users should not be allowed to delete a user."""
|
||||
user = factories.UserFactory()
|
||||
|
||||
response = APIClient().delete(f"/api/v1.0/users/{user.id!s}/")
|
||||
|
||||
assert response.status_code == 401
|
||||
assert models.User.objects.count() == 1
|
||||
|
||||
|
||||
def test_api_users_delete_authenticated():
|
||||
"""
|
||||
Authenticated users should not be allowed to delete a user other than themselves.
|
||||
"""
|
||||
user = factories.UserFactory()
|
||||
|
||||
client = APIClient()
|
||||
client.force_login(user)
|
||||
|
||||
other_user = factories.UserFactory()
|
||||
|
||||
response = client.delete(
|
||||
f"/api/v1.0/users/{other_user.id!s}/",
|
||||
)
|
||||
|
||||
assert response.status_code == 405
|
||||
assert models.User.objects.count() == 2
|
||||
|
||||
|
||||
def test_api_users_delete_self():
|
||||
"""Authenticated users should not be able to delete their own user."""
|
||||
user = factories.UserFactory()
|
||||
|
||||
client = APIClient()
|
||||
client.force_login(user)
|
||||
|
||||
response = client.delete(
|
||||
f"/api/v1.0/users/{user.id!s}/",
|
||||
)
|
||||
|
||||
assert response.status_code == 405
|
||||
assert models.User.objects.count() == 1
|
||||
45
src/backend/core/tests/test_models_users.py
Normal file
45
src/backend/core/tests/test_models_users.py
Normal file
@@ -0,0 +1,45 @@
|
||||
"""
|
||||
Unit tests for the User model
|
||||
"""
|
||||
from unittest import mock
|
||||
|
||||
from django.core.exceptions import ValidationError
|
||||
|
||||
import pytest
|
||||
|
||||
from core import factories
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
|
||||
def test_models_users_str():
|
||||
"""The str representation should be the email."""
|
||||
user = factories.UserFactory()
|
||||
assert str(user) == user.email
|
||||
|
||||
|
||||
def test_models_users_id_unique():
|
||||
"""The "id" field should be unique."""
|
||||
user = factories.UserFactory()
|
||||
with pytest.raises(ValidationError, match="User with this Id already exists."):
|
||||
factories.UserFactory(id=user.id)
|
||||
|
||||
|
||||
def test_models_users_send_mail_main_existing():
|
||||
"""The "email_user' method should send mail to the user's email address."""
|
||||
user = factories.UserFactory()
|
||||
|
||||
with mock.patch("django.core.mail.send_mail") as mock_send:
|
||||
user.email_user("my subject", "my message")
|
||||
|
||||
mock_send.assert_called_once_with("my subject", "my message", None, [user.email])
|
||||
|
||||
|
||||
def test_models_users_send_mail_main_missing():
|
||||
"""The "email_user' method should fail if the user has no email address."""
|
||||
user = factories.UserFactory(email=None)
|
||||
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
user.email_user("my subject", "my message")
|
||||
|
||||
assert str(excinfo.value) == "User has no email address."
|
||||
24
src/backend/core/urls.py
Normal file
24
src/backend/core/urls.py
Normal file
@@ -0,0 +1,24 @@
|
||||
"""URL configuration for the core app."""
|
||||
from django.conf import settings
|
||||
from django.urls import include, path
|
||||
|
||||
from rest_framework.routers import DefaultRouter
|
||||
|
||||
from core.api import viewsets
|
||||
from core.authentication.urls import urlpatterns as oidc_urls
|
||||
|
||||
# - Main endpoints
|
||||
router = DefaultRouter()
|
||||
router.register("users", viewsets.UserViewSet, basename="users")
|
||||
|
||||
urlpatterns = [
|
||||
path(
|
||||
f"api/{settings.API_VERSION}/",
|
||||
include(
|
||||
[
|
||||
*router.urls,
|
||||
*oidc_urls,
|
||||
]
|
||||
),
|
||||
),
|
||||
]
|
||||
Reference in New Issue
Block a user