🎉(all) bootstrap the Calendars project

This repository was forked from Drive in late December 2025 and
boostraped as a minimal demo of backend+caldav server+frontend
integration. There is much left to do and to fix!
This commit is contained in:
Sylvain Zimmer
2026-01-09 00:51:25 +01:00
commit a36348ead1
298 changed files with 41036 additions and 0 deletions

View File

93
src/backend/core/admin.py Normal file
View File

@@ -0,0 +1,93 @@
"""Admin classes and registrations for core app."""
from django.contrib import admin
from django.contrib.auth import admin as auth_admin
from django.utils.translation import gettext_lazy as _
from . import models
@admin.register(models.User)
class UserAdmin(auth_admin.UserAdmin):
"""Admin class for the User model"""
fieldsets = (
(
None,
{
"fields": (
"id",
"admin_email",
"password",
)
},
),
(
_("Personal info"),
{
"fields": (
"sub",
"email",
"full_name",
"short_name",
"language",
"timezone",
)
},
),
(
_("Permissions"),
{
"fields": (
"is_active",
"is_device",
"is_staff",
"is_superuser",
"groups",
"user_permissions",
),
},
),
(_("Important dates"), {"fields": ("created_at", "updated_at")}),
)
add_fieldsets = (
(
None,
{
"classes": ("wide",),
"fields": ("email", "password1", "password2"),
},
),
)
list_display = (
"id",
"sub",
"full_name",
"admin_email",
"email",
"is_active",
"is_staff",
"is_superuser",
"is_device",
"created_at",
"updated_at",
)
list_filter = ("is_staff", "is_superuser", "is_device", "is_active")
ordering = (
"is_active",
"-is_superuser",
"-is_staff",
"-is_device",
"-updated_at",
"full_name",
)
readonly_fields = (
"id",
"sub",
"email",
"full_name",
"short_name",
"created_at",
"updated_at",
)
search_fields = ("id", "sub", "admin_email", "email", "full_name")

View File

@@ -0,0 +1,34 @@
"""Calendars core API endpoints"""
from django.conf import settings
from django.core.exceptions import ValidationError as DjangoValidationError
from drf_standardized_errors.handler import exception_handler as drf_exception_handler
from rest_framework import exceptions as drf_exceptions
from rest_framework.decorators import api_view
from rest_framework.response import Response
from rest_framework.serializers import as_serializer_error
def exception_handler(exc, context):
"""Handle Django ValidationError as an accepted exception.
For the parameters, see ``exception_handler``
This code comes from twidi's gist:
https://gist.github.com/twidi/9d55486c36b6a51bdcb05ce3a763e79f
"""
if isinstance(exc, DjangoValidationError):
exc = drf_exceptions.ValidationError(as_serializer_error(exc))
return drf_exception_handler(exc, context)
# pylint: disable=unused-argument
@api_view(["GET"])
def get_frontend_configuration(request):
"""Returns the frontend configuration dict as configured in settings."""
frontend_configuration = {
"LANGUAGE_CODE": settings.LANGUAGE_CODE,
}
frontend_configuration.update(settings.FRONTEND_CONFIGURATION)
return Response(frontend_configuration)

View File

@@ -0,0 +1,25 @@
"""A JSONField for DRF to handle serialization/deserialization."""
import json
from rest_framework import serializers
class JSONField(serializers.Field):
"""
A custom field for handling JSON data.
"""
def to_representation(self, value):
"""
Convert the JSON string to a Python dictionary for serialization.
"""
return value
def to_internal_value(self, data):
"""
Convert the Python dictionary to a JSON string for deserialization.
"""
if data is None:
return None
return json.dumps(data)

View File

@@ -0,0 +1,79 @@
"""Permission handlers for the calendars core app."""
from django.core import exceptions
from rest_framework import permissions
ACTION_FOR_METHOD_TO_PERMISSION = {
"versions_detail": {"DELETE": "versions_destroy", "GET": "versions_retrieve"},
"children": {"GET": "children_list", "POST": "children_create"},
}
class IsAuthenticated(permissions.BasePermission):
"""
Allows access only to authenticated users. Alternative method checking the presence
of the auth token to avoid hitting the database.
"""
def has_permission(self, request, view):
return bool(request.auth) or request.user.is_authenticated
class IsAuthenticatedOrSafe(IsAuthenticated):
"""Allows access to authenticated users (or anonymous users but only on safe methods)."""
def has_permission(self, request, view):
if request.method in permissions.SAFE_METHODS:
return True
return super().has_permission(request, view)
class IsSelf(IsAuthenticated):
"""
Allows access only to authenticated users. Alternative method checking the presence
of the auth token to avoid hitting the database.
"""
def has_object_permission(self, request, view, obj):
"""Write permissions are only allowed to the user itself."""
return obj == request.user
class IsOwnedOrPublic(IsAuthenticated):
"""
Allows access to authenticated users only for objects that are owned or not related
to any user via the "owner" field.
"""
def has_object_permission(self, request, view, obj):
"""Unsafe permissions are only allowed for the owner of the object."""
if obj.owner == request.user:
return True
if request.method in permissions.SAFE_METHODS and obj.owner is None:
return True
try:
return obj.user == request.user
except exceptions.ObjectDoesNotExist:
return False
class AccessPermission(permissions.BasePermission):
"""Permission class for access objects."""
def has_permission(self, request, view):
return request.user.is_authenticated or view.action not in [
"create",
]
def has_object_permission(self, request, view, obj):
"""Check permission for a given object."""
abilities = obj.get_abilities(request.user)
action = view.action
try:
action = ACTION_FOR_METHOD_TO_PERMISSION[view.action][request.method]
except KeyError:
pass
return abilities.get(action, False)

View File

@@ -0,0 +1,155 @@
"""Client serializers for the calendars core app."""
import json
from datetime import timedelta
from django.conf import settings
from django.db.models import Q
from django.utils.translation import gettext_lazy as _
from rest_framework import exceptions, serializers
from core import models
class UserLiteSerializer(serializers.ModelSerializer):
"""Serialize users with limited fields."""
class Meta:
model = models.User
fields = ["id", "full_name", "short_name"]
read_only_fields = ["id", "full_name", "short_name"]
class BaseAccessSerializer(serializers.ModelSerializer):
"""Serialize template accesses."""
abilities = serializers.SerializerMethodField(read_only=True)
def update(self, instance, validated_data):
"""Make "user" field is readonly but only on update."""
validated_data.pop("user", None)
return super().update(instance, validated_data)
def get_abilities(self, access) -> dict:
"""Return abilities of the logged-in user on the instance."""
request = self.context.get("request")
if request:
return access.get_abilities(request.user)
return {}
def validate(self, attrs):
"""
Check access rights specific to writing (create/update)
"""
request = self.context.get("request")
user = getattr(request, "user", None)
role = attrs.get("role")
# Update
if self.instance:
can_set_role_to = self.instance.get_abilities(user)["set_role_to"]
if role and role not in can_set_role_to:
message = (
f"You are only allowed to set role to {', '.join(can_set_role_to)}"
if can_set_role_to
else "You are not allowed to set this role for this template."
)
raise exceptions.PermissionDenied(message)
# Create
else:
try:
resource_id = self.context["resource_id"]
except KeyError as exc:
raise exceptions.ValidationError(
"You must set a resource ID in kwargs to create a new access."
) from exc
if not self.Meta.model.objects.filter( # pylint: disable=no-member
Q(user=user) | Q(team__in=user.teams),
role__in=[models.RoleChoices.OWNER, models.RoleChoices.ADMIN],
**{self.Meta.resource_field_name: resource_id}, # pylint: disable=no-member
).exists():
raise exceptions.PermissionDenied(
"You are not allowed to manage accesses for this resource."
)
if (
role == models.RoleChoices.OWNER
and not self.Meta.model.objects.filter( # pylint: disable=no-member
Q(user=user) | Q(team__in=user.teams),
role=models.RoleChoices.OWNER,
**{self.Meta.resource_field_name: resource_id}, # pylint: disable=no-member
).exists()
):
raise exceptions.PermissionDenied(
"Only owners of a resource can assign other users as owners."
)
# pylint: disable=no-member
attrs[f"{self.Meta.resource_field_name}_id"] = self.context["resource_id"]
return attrs
class UserSerializer(serializers.ModelSerializer):
"""Serialize users."""
class Meta:
model = models.User
fields = [
"id",
"email",
"full_name",
"short_name",
"language",
]
read_only_fields = ["id", "email", "full_name", "short_name"]
class UserMeSerializer(UserSerializer):
"""Serialize users for me endpoint."""
class Meta:
model = models.User
fields = UserSerializer.Meta.fields
read_only_fields = UserSerializer.Meta.read_only_fields
# CalDAV serializers
class CalendarSerializer(serializers.ModelSerializer):
"""Serializer for Calendar model."""
class Meta:
model = models.Calendar
fields = [
"id",
"name",
"color",
"description",
"is_default",
"is_visible",
"created_at",
"updated_at",
]
read_only_fields = ["id", "is_default", "created_at", "updated_at"]
class CalendarCreateSerializer(serializers.ModelSerializer):
"""Serializer for creating a Calendar."""
class Meta:
model = models.Calendar
fields = ["name", "color", "description"]
class CalendarShareSerializer(serializers.ModelSerializer):
"""Serializer for CalendarShare model."""
shared_with_email = serializers.EmailField(write_only=True)
class Meta:
model = models.CalendarShare
fields = ["id", "shared_with_email", "permission", "is_visible", "created_at"]
read_only_fields = ["id", "created_at"]

View File

@@ -0,0 +1,397 @@
"""API endpoints"""
# pylint: disable=too-many-lines
import json
import logging
import re
from urllib.parse import unquote, urlparse
from django.conf import settings
from django.core.cache import cache
from django.core.exceptions import ValidationError
from django.db import models as db
from django.db import transaction
from django.urls import reverse
from django.utils.decorators import method_decorator
from django.utils.text import slugify
import rest_framework as drf
from corsheaders.middleware import (
ACCESS_CONTROL_ALLOW_METHODS,
ACCESS_CONTROL_ALLOW_ORIGIN,
)
from lasuite.oidc_login.decorators import refresh_oidc_access_token
from rest_framework import filters, mixins, status, viewsets
from rest_framework import response as drf_response
from rest_framework.decorators import action
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.throttling import UserRateThrottle
from rest_framework_api_key.permissions import HasAPIKey
from core import enums, models
from core.services.caldav_service import CalendarService
from . import permissions, serializers
logger = logging.getLogger(__name__)
# pylint: disable=too-many-ancestors
class NestedGenericViewSet(viewsets.GenericViewSet):
"""
A generic Viewset aims to be used in a nested route context.
e.g: `/api/v1.0/resource_1/<resource_1_pk>/resource_2/<resource_2_pk>/`
It allows to define all url kwargs and lookup fields to perform the lookup.
"""
lookup_fields: list[str] = ["pk"]
lookup_url_kwargs: list[str] = []
def __getattribute__(self, item):
"""
This method is overridden to allow to get the last lookup field or lookup url kwarg
when accessing the `lookup_field` or `lookup_url_kwarg` attribute. This is useful
to keep compatibility with all methods used by the parent class `GenericViewSet`.
"""
if item in ["lookup_field", "lookup_url_kwarg"]:
return getattr(self, item + "s", [None])[-1]
return super().__getattribute__(item)
def get_queryset(self):
"""
Get the list of items for this view.
`lookup_fields` attribute is enumerated here to perform the nested lookup.
"""
queryset = super().get_queryset()
# The last lookup field is removed to perform the nested lookup as it corresponds
# to the object pk, it is used within get_object method.
lookup_url_kwargs = (
self.lookup_url_kwargs[:-1]
if self.lookup_url_kwargs
else self.lookup_fields[:-1]
)
filter_kwargs = {}
for index, lookup_url_kwarg in enumerate(lookup_url_kwargs):
if lookup_url_kwarg not in self.kwargs:
raise KeyError(
f"Expected view {self.__class__.__name__} to be called with a URL "
f'keyword argument named "{lookup_url_kwarg}". Fix your URL conf, or '
"set the `.lookup_fields` attribute on the view correctly."
)
filter_kwargs.update(
{self.lookup_fields[index]: self.kwargs[lookup_url_kwarg]}
)
return queryset.filter(**filter_kwargs)
class SerializerPerActionMixin:
"""
A mixin to allow to define serializer classes for each action.
This mixin is useful to avoid to define a serializer class for each action in the
`get_serializer_class` method.
Example:
```
class MyViewSet(SerializerPerActionMixin, viewsets.GenericViewSet):
serializer_class = MySerializer
list_serializer_class = MyListSerializer
retrieve_serializer_class = MyRetrieveSerializer
```
"""
def get_serializer_class(self):
"""
Return the serializer class to use depending on the action.
"""
if serializer_class := getattr(self, f"{self.action}_serializer_class", None):
return serializer_class
return super().get_serializer_class()
class Pagination(drf.pagination.PageNumberPagination):
"""Pagination to display no more than 100 objects per page sorted by creation date."""
ordering = "-created_on"
max_page_size = settings.MAX_PAGE_SIZE
page_size_query_param = "page_size"
class UserListThrottleBurst(UserRateThrottle):
"""Throttle for the user list endpoint."""
scope = "user_list_burst"
class UserListThrottleSustained(UserRateThrottle):
"""Throttle for the user list endpoint."""
scope = "user_list_sustained"
class UserViewSet(
SerializerPerActionMixin,
drf.mixins.UpdateModelMixin,
viewsets.GenericViewSet,
drf.mixins.ListModelMixin,
):
"""User ViewSet"""
permission_classes = [permissions.IsSelf]
queryset = models.User.objects.all().filter(is_active=True)
serializer_class = serializers.UserSerializer
get_me_serializer_class = serializers.UserMeSerializer
pagination_class = None
throttle_classes = []
def get_throttles(self):
self.throttle_classes = []
if self.action == "list":
self.throttle_classes = [UserListThrottleBurst, UserListThrottleSustained]
return super().get_throttles()
def get_queryset(self):
"""
Limit listed users by querying the email field.
If query contains "@", search exactly. Otherwise return empty.
"""
queryset = self.queryset
if self.action != "list":
return queryset
if not (query := self.request.query_params.get("q", "")) or len(query) < 5:
return queryset.none()
# For emails, match exactly
if "@" in query:
return queryset.filter(email__iexact=query).order_by("email")[
: settings.API_USERS_LIST_LIMIT
]
# For non-email queries, return empty (no fuzzy search)
return queryset.none()
@drf.decorators.action(
detail=False,
methods=["get"],
url_name="me",
url_path="me",
)
def get_me(self, request):
"""
Return information on currently logged user
"""
context = {"request": request}
return drf.response.Response(
self.get_serializer(request.user, context=context).data
)
class ConfigView(drf.views.APIView):
"""API ViewSet for sharing some public settings."""
permission_classes = [AllowAny]
def get(self, request):
"""
GET /api/v1.0/config/
Return a dictionary of public settings.
"""
array_settings = [
"ENVIRONMENT",
"FRONTEND_THEME",
"FRONTEND_MORE_LINK",
"FRONTEND_FEEDBACK_BUTTON_SHOW",
"FRONTEND_FEEDBACK_BUTTON_IDLE",
"FRONTEND_FEEDBACK_ITEMS",
"FRONTEND_FEEDBACK_MESSAGES_WIDGET_ENABLED",
"FRONTEND_FEEDBACK_MESSAGES_WIDGET_API_URL",
"FRONTEND_FEEDBACK_MESSAGES_WIDGET_CHANNEL",
"FRONTEND_FEEDBACK_MESSAGES_WIDGET_PATH",
"FRONTEND_HIDE_GAUFRE",
"MEDIA_BASE_URL",
"LANGUAGES",
"LANGUAGE_CODE",
"SENTRY_DSN",
]
dict_settings = {}
for setting in array_settings:
if hasattr(settings, setting):
dict_settings[setting] = getattr(settings, setting)
dict_settings["theme_customization"] = self._load_theme_customization()
return drf.response.Response(dict_settings)
def _load_theme_customization(self):
if not settings.THEME_CUSTOMIZATION_FILE_PATH:
return {}
cache_key = (
f"theme_customization_{slugify(settings.THEME_CUSTOMIZATION_FILE_PATH)}"
)
theme_customization = cache.get(cache_key, {})
if theme_customization:
return theme_customization
try:
with open(
settings.THEME_CUSTOMIZATION_FILE_PATH, "r", encoding="utf-8"
) as f:
theme_customization = json.load(f)
except FileNotFoundError:
logger.error(
"Configuration file not found: %s",
settings.THEME_CUSTOMIZATION_FILE_PATH,
)
except json.JSONDecodeError:
logger.error(
"Configuration file is not a valid JSON: %s",
settings.THEME_CUSTOMIZATION_FILE_PATH,
)
else:
cache.set(
cache_key,
theme_customization,
settings.THEME_CUSTOMIZATION_CACHE_TIMEOUT,
)
return theme_customization
# CalDAV ViewSets
class CalendarViewSet(
mixins.ListModelMixin,
mixins.RetrieveModelMixin,
mixins.CreateModelMixin,
mixins.UpdateModelMixin,
mixins.DestroyModelMixin,
viewsets.GenericViewSet,
):
"""
ViewSet for managing user calendars.
list: Get all calendars accessible by the user (owned + shared)
retrieve: Get a specific calendar
create: Create a new calendar
update: Update calendar properties
destroy: Delete a calendar
"""
permission_classes = [IsAuthenticated]
serializer_class = serializers.CalendarSerializer
def get_queryset(self):
"""Return calendars owned by or shared with the current user."""
user = self.request.user
owned = models.Calendar.objects.filter(owner=user)
shared_ids = models.CalendarShare.objects.filter(shared_with=user).values_list(
"calendar_id", flat=True
)
shared = models.Calendar.objects.filter(id__in=shared_ids)
return owned.union(shared).order_by("-is_default", "name")
def get_serializer_class(self):
if self.action == "create":
return serializers.CalendarCreateSerializer
return serializers.CalendarSerializer
def perform_create(self, serializer):
"""Create a new calendar via CalendarService."""
service = CalendarService()
calendar = service.create_calendar(
user=self.request.user,
name=serializer.validated_data["name"],
color=serializer.validated_data.get("color", "#3174ad"),
)
# Update the serializer instance with the created calendar
serializer.instance = calendar
def perform_destroy(self, instance):
"""Delete calendar. Prevent deletion of default calendar."""
if instance.is_default:
raise ValueError("Cannot delete the default calendar.")
if instance.owner != self.request.user:
raise PermissionError("You can only delete your own calendars.")
instance.delete()
@action(detail=True, methods=["patch"])
def toggle_visibility(self, request, pk=None):
"""Toggle calendar visibility."""
calendar = self.get_object()
# Check if it's a shared calendar
share = models.CalendarShare.objects.filter(
calendar=calendar, shared_with=request.user
).first()
if share:
share.is_visible = not share.is_visible
share.save()
is_visible = share.is_visible
elif calendar.owner == request.user:
calendar.is_visible = not calendar.is_visible
calendar.save()
is_visible = calendar.is_visible
else:
return drf_response.Response(
{"error": "Permission denied"}, status=status.HTTP_403_FORBIDDEN
)
return drf_response.Response({"is_visible": is_visible})
@action(
detail=True,
methods=["post"],
serializer_class=serializers.CalendarShareSerializer,
)
def share(self, request, pk=None):
"""Share calendar with another user."""
calendar = self.get_object()
if calendar.owner != request.user:
return drf_response.Response(
{"error": "Only the owner can share this calendar"},
status=status.HTTP_403_FORBIDDEN,
)
serializer = serializers.CalendarShareSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
email = serializer.validated_data["shared_with_email"]
try:
user_to_share = models.User.objects.get(email=email)
except models.User.DoesNotExist:
return drf_response.Response(
{"error": "User not found"}, status=status.HTTP_404_NOT_FOUND
)
share, created = models.CalendarShare.objects.get_or_create(
calendar=calendar,
shared_with=user_to_share,
defaults={
"permission": serializer.validated_data.get("permission", "read")
},
)
if not created:
share.permission = serializer.validated_data.get(
"permission", share.permission
)
share.save()
return drf_response.Response(
serializers.CalendarShareSerializer(share).data,
status=status.HTTP_201_CREATED if created else status.HTTP_200_OK,
)

View File

@@ -0,0 +1,220 @@
"""CalDAV proxy views for forwarding requests to DAViCal."""
import logging
from django.conf import settings
from django.http import HttpResponse
from django.utils.decorators import method_decorator
from django.views import View
from django.views.decorators.csrf import csrf_exempt
import requests
from core.services.caldav_service import DAViCalClient
logger = logging.getLogger(__name__)
@method_decorator(csrf_exempt, name="dispatch")
class CalDAVProxyView(View):
"""
Proxy view that forwards all CalDAV requests to DAViCal.
Handles authentication and adds appropriate headers.
CSRF protection is disabled because CalDAV uses non-standard HTTP methods
(PROPFIND, REPORT, etc.) that don't work with Django's CSRF middleware.
Authentication is handled via session cookies instead.
"""
def dispatch(self, request, *args, **kwargs):
"""Forward all HTTP methods to DAViCal."""
# Handle CORS preflight requests
if request.method == "OPTIONS":
response = HttpResponse(status=200)
response["Access-Control-Allow-Methods"] = (
"GET, OPTIONS, PROPFIND, REPORT, MKCOL, MKCALENDAR, PUT, DELETE"
)
response["Access-Control-Allow-Headers"] = (
"Content-Type, depth, authorization, if-match, if-none-match, prefer"
)
return response
if not request.user.is_authenticated:
return HttpResponse(status=401)
# Ensure user exists in DAViCal before making requests
try:
davical_client = DAViCalClient()
davical_client.ensure_user_exists(request.user)
except Exception as e:
logger.warning("Failed to ensure user exists in DAViCal: %s", str(e))
# Continue anyway - user might already exist
# Build the DAViCal URL
davical_url = getattr(settings, "DAVICAL_URL", "http://davical:80")
path = kwargs.get("path", "")
# Use user email as the principal (DAViCal uses email as username)
user_principal = request.user.email
# Handle root CalDAV requests - return principal collection
if not path or path == user_principal:
# For PROPFIND on root, return the user's principal collection
if request.method == "PROPFIND":
# Get the request path to match the href in response
request_path = request.path
if not request_path.endswith("/"):
request_path += "/"
# Return multistatus with href matching request URL and calendar-home-set
multistatus = f"""<?xml version="1.0" encoding="utf-8"?>
<D:multistatus xmlns:D="DAV:" xmlns:C="urn:ietf:params:xml:ns:caldav">
<D:response>
<D:href>{request_path}</D:href>
<D:propstat>
<D:prop>
<D:displayname>{user_principal}</D:displayname>
<C:calendar-home-set>
<D:href>/api/v1.0/caldav/{user_principal}/</D:href>
</C:calendar-home-set>
</D:prop>
<D:status>HTTP/1.1 200 OK</D:status>
</D:propstat>
</D:response>
</D:multistatus>"""
response = HttpResponse(
content=multistatus,
status=207,
content_type="application/xml; charset=utf-8",
)
return response
# For other methods, redirect to principal URL
target_url = f"{davical_url}/caldav.php/{user_principal}/"
else:
# Build target URL with path
# Remove leading slash if present
clean_path = path.lstrip("/")
if clean_path.startswith(user_principal):
# Path already includes principal
target_url = f"{davical_url}/caldav.php/{clean_path}"
else:
# Path is relative to principal
target_url = f"{davical_url}/caldav.php/{user_principal}/{clean_path}"
# Prepare headers for DAViCal
# Set headers to tell DAViCal it's behind a proxy so it generates correct URLs
script_name = "/api/v1.0/caldav"
headers = {
"Content-Type": request.content_type or "application/xml",
"X-Forwarded-User": user_principal,
"X-Forwarded-For": request.META.get("REMOTE_ADDR", ""),
"X-Forwarded-Prefix": script_name,
"X-Forwarded-Host": request.get_host(),
"X-Forwarded-Proto": request.scheme,
"X-Script-Name": script_name, # Tell DAViCal the base path
}
# DAViCal authentication: users with password '*' use external auth
# We send the username via X-Forwarded-User header
# For HTTP Basic Auth, we use the email as username with empty password
# This works with DAViCal's external authentication when trust_x_forwarded is true
auth = (user_principal, "")
# Copy relevant headers from the original request
if "HTTP_DEPTH" in request.META:
headers["Depth"] = request.META["HTTP_DEPTH"]
if "HTTP_IF_MATCH" in request.META:
headers["If-Match"] = request.META["HTTP_IF_MATCH"]
if "HTTP_IF_NONE_MATCH" in request.META:
headers["If-None-Match"] = request.META["HTTP_IF_NONE_MATCH"]
if "HTTP_PREFER" in request.META:
headers["Prefer"] = request.META["HTTP_PREFER"]
# Get request body
body = request.body if request.body else None
try:
# Forward the request to DAViCal
# Use HTTP Basic Auth with username (email) and empty password
# DAViCal will authenticate based on X-Forwarded-User header when trust_x_forwarded is true
logger.debug(
"Forwarding %s request to DAViCal: %s (user: %s)",
request.method,
target_url,
user_principal,
)
response = requests.request(
method=request.method,
url=target_url,
headers=headers,
data=body,
auth=auth,
timeout=30,
allow_redirects=False,
)
# Log authentication failures for debugging
if response.status_code == 401:
logger.warning(
"DAViCal returned 401 for user %s at %s. Headers sent: %s",
user_principal,
target_url,
headers,
)
# Build Django response
django_response = HttpResponse(
content=response.content,
status=response.status_code,
content_type=response.headers.get("Content-Type", "application/xml"),
)
# Copy relevant headers from DAViCal response
for header in ["ETag", "DAV", "Allow", "Location"]:
if header in response.headers:
django_response[header] = response.headers[header]
return django_response
except requests.exceptions.RequestException as e:
logger.error("DAViCal proxy error: %s", str(e))
return HttpResponse(
content=f"CalDAV server error: {str(e)}",
status=502,
content_type="text/plain",
)
@method_decorator(csrf_exempt, name="dispatch")
class CalDAVDiscoveryView(View):
"""
Handle CalDAV discovery requests (well-known URLs).
Per RFC 6764, this endpoint should redirect to the CalDAV server base URL,
not to a user-specific principal. Clients will then perform PROPFIND on
the base URL to discover their principal.
CSRF protection is disabled because CalDAV uses non-standard HTTP methods
and this endpoint should be accessible without authentication.
"""
def dispatch(self, request, *args, **kwargs):
"""Handle discovery requests."""
# Handle CORS preflight requests
if request.method == "OPTIONS":
response = HttpResponse(status=200)
response["Access-Control-Allow-Methods"] = "GET, OPTIONS, PROPFIND"
response["Access-Control-Allow-Headers"] = (
"Content-Type, depth, authorization"
)
return response
# Note: Authentication is not required for discovery per RFC 6764
# Clients need to discover the CalDAV URL before authenticating
# Return redirect to CalDAV server base URL
caldav_base_url = f"/api/v1.0/caldav/"
response = HttpResponse(status=301)
response["Location"] = caldav_base_url
return response

19
src/backend/core/apps.py Normal file
View File

@@ -0,0 +1,19 @@
"""Calendars Core application"""
from django.apps import AppConfig
from django.utils.translation import gettext_lazy as _
class CoreConfig(AppConfig):
"""Configuration class for the calendars core app."""
name = "core"
app_label = "core"
verbose_name = _("calendars core application")
def ready(self):
"""
Import signals when the app is ready.
"""
# pylint: disable=import-outside-toplevel, unused-import
from . import signals # noqa: PLC0415

View File

@@ -0,0 +1,52 @@
"""Custom authentication classes for the calendars core app"""
from django.conf import settings
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
class ServerToServerAuthentication(BaseAuthentication):
"""
Custom authentication class for server-to-server requests.
Validates the presence and correctness of the Authorization header.
"""
AUTH_HEADER = "Authorization"
TOKEN_TYPE = "Bearer" # noqa S105
def authenticate(self, request):
"""
Authenticate the server-to-server request by validating the Authorization header.
This method checks if the Authorization header is present in the request, ensures it
contains a valid token with the correct format, and verifies the token against the
list of allowed server-to-server tokens. If the header is missing, improperly formatted,
or contains an invalid token, an AuthenticationFailed exception is raised.
Returns:
None: If authentication is successful
(no user is authenticated for server-to-server requests).
Raises:
AuthenticationFailed: If the Authorization header is missing, malformed,
or contains an invalid token.
"""
auth_header = request.headers.get(self.AUTH_HEADER)
if not auth_header:
raise AuthenticationFailed("Authorization header is missing.")
# Validate token format and existence
auth_parts = auth_header.split(" ")
if len(auth_parts) != 2 or auth_parts[0] != self.TOKEN_TYPE:
raise AuthenticationFailed("Invalid authorization header.")
token = auth_parts[1]
if token not in settings.SERVER_TO_SERVER_API_TOKENS:
raise AuthenticationFailed("Invalid server-to-server token.")
# Authentication is successful, but no user is authenticated
def authenticate_header(self, request):
"""Return the WWW-Authenticate header value."""
return f"{self.TOKEN_TYPE} realm='Create item server to server'"

View File

@@ -0,0 +1,54 @@
"""Authentication Backends for the Calendars core app."""
import logging
from django.conf import settings
from django.core.exceptions import SuspiciousOperation
from django.utils.translation import gettext_lazy as _
from lasuite.oidc_login.backends import (
OIDCAuthenticationBackend as LaSuiteOIDCAuthenticationBackend,
)
from core.authentication.exceptions import UserCannotAccessApp
from core.models import DuplicateEmailError
logger = logging.getLogger(__name__)
class OIDCAuthenticationBackend(LaSuiteOIDCAuthenticationBackend):
"""Custom OpenID Connect (OIDC) Authentication Backend.
This class overrides the default OIDC Authentication Backend to accommodate differences
in the User and Identity models, and handles signed and/or encrypted UserInfo response.
"""
def get_extra_claims(self, user_info):
"""
Return extra claims from user_info.
Args:
user_info (dict): The user information dictionary.
Returns:
dict: A dictionary of extra claims.
"""
# We need to add the claims that we want to store so that they are
# available in the post_get_or_create_user method.
claims_to_store = {
claim: user_info.get(claim) for claim in settings.OIDC_STORE_CLAIMS
}
return {
"full_name": self.compute_full_name(user_info),
"short_name": user_info.get(settings.OIDC_USERINFO_SHORTNAME_FIELD),
"claims": claims_to_store,
}
def get_existing_user(self, sub, email):
"""Fetch existing user by sub or email."""
try:
return self.UserModel.objects.get_user_by_sub_or_email(sub, email)
except DuplicateEmailError as err:
raise SuspiciousOperation(err.message) from err

View File

@@ -0,0 +1,5 @@
"""Exceptions for the authentication module."""
class UserCannotAccessApp(Exception):
"""Exception raised when a user cannot access the app."""

View File

@@ -0,0 +1,25 @@
"""Calendars core authentication views."""
from django.http import HttpResponseRedirect
from lasuite.oidc_login.views import (
OIDCAuthenticationCallbackView as LaSuiteOIDCAuthenticationCallbackView,
)
from core.authentication.exceptions import UserCannotAccessApp
class OIDCAuthenticationCallbackView(LaSuiteOIDCAuthenticationCallbackView):
"""
Custom view for handling the authentication callback from the OpenID Connect (OIDC) provider.
Handles the callback after authentication from the identity provider (OP).
Verifies the state parameter and performs necessary authentication actions.
"""
def get(self, request):
try:
return super().get(request)
except UserCannotAccessApp:
return HttpResponseRedirect(
self.failure_url + "?auth_error=user_cannot_access_app"
)

12
src/backend/core/enums.py Normal file
View File

@@ -0,0 +1,12 @@
"""
Core application enums declaration
"""
from django.conf import global_settings
from django.utils.translation import gettext_lazy as _
# In Django's code base, `LANGUAGES` is set by default with all supported languages.
# We can use it for the choice of languages which should not be limited to the few languages
# active in the app.
# pylint: disable=no-member
ALL_LANGUAGES = {language: _(name) for language, name in global_settings.LANGUAGES}

View File

@@ -0,0 +1,41 @@
"""Resource Server Permissions for the Calendars app."""
from django.conf import settings
from lasuite.oidc_resource_server.authentication import ResourceServerAuthentication
from rest_framework import permissions
class ResourceServerClientPermission(permissions.BasePermission):
"""
Permission class for resource server views.
This provides a way to open the resource server views to a limited set of
Service Providers.
Note: we might add a more complex permission system in the future, based on
the Service Provider ID and the requested scopes.
"""
def has_permission(self, request, view):
"""
Check if the user is authenticated and the token introspection
provides an authorized Service Provider.
"""
if not isinstance(
request.successful_authenticator, ResourceServerAuthentication
):
# Not a resource server request
return False
# Check if the user is authenticated
if not request.user.is_authenticated:
return False
if (
hasattr(view, "resource_server_actions")
and view.action not in view.resource_server_actions
):
return False
# When used as a resource server, the request has a token audience
return (
request.resource_server_token_audience in settings.OIDC_RS_ALLOWED_AUDIENCES
)

View File

@@ -0,0 +1,36 @@
"""Resource Server Viewsets for the Calendars app."""
from django.conf import settings
from lasuite.oidc_resource_server.authentication import ResourceServerAuthentication
from core.api.permissions import AccessPermission, IsSelf
from core.api.viewsets import UserViewSet
from core.external_api.permissions import ResourceServerClientPermission
# pylint: disable=too-many-ancestors
class ResourceServerRestrictionMixin:
"""
Mixin for Resource Server Viewsets to provide shortcut to get
configured actions for a given resource.
"""
def _get_resource_server_actions(self, resource_name):
"""Get resource_server_actions from settings."""
external_api_config = settings.EXTERNAL_API.get(resource_name, {})
return list(external_api_config.get("actions", []))
class ResourceServerUserViewSet(ResourceServerRestrictionMixin, UserViewSet):
"""Resource Server Viewset for the Calendars app."""
authentication_classes = [ResourceServerAuthentication]
permission_classes = [ResourceServerClientPermission & IsSelf]
@property
def resource_server_actions(self):
"""Get resource_server_actions from settings."""
return self._get_resource_server_actions("users")

View File

@@ -0,0 +1,28 @@
"""
Core application factories
"""
from django.conf import settings
from django.contrib.auth.hashers import make_password
import factory.fuzzy
from faker import Faker
from core import models
fake = Faker()
class UserFactory(factory.django.DjangoModelFactory):
"""A factory to random users for testing purposes."""
class Meta:
model = models.User
skip_postgeneration_save = True
sub = factory.Sequence(lambda n: f"user{n!s}")
email = factory.Faker("email")
full_name = factory.Faker("name")
short_name = factory.Faker("first_name")
language = factory.fuzzy.FuzzyChoice([lang[0] for lang in settings.LANGUAGES])
password = make_password("password")

View File

@@ -0,0 +1,47 @@
"""Management user to create a superuser."""
from django.contrib.auth import get_user_model
from django.core.management.base import BaseCommand
UserModel = get_user_model()
class Command(BaseCommand):
"""Management command to create a superuser from an email and password."""
help = "Create a superuser with an email and a password"
def add_arguments(self, parser):
"""Define required arguments "email" and "password"."""
parser.add_argument(
"--email",
help=("Email for the user."),
)
parser.add_argument(
"--password",
help="Password for the user.",
)
def handle(self, *args, **options):
"""
Given an email and a password, create a superuser or upgrade the existing
user to superuser status.
"""
email = options.get("email")
try:
user = UserModel.objects.get(admin_email=email)
except UserModel.DoesNotExist:
user = UserModel(admin_email=email)
message = "Superuser created successfully."
else:
if user.is_superuser and user.is_staff:
message = "Superuser already exists."
else:
message = "User already existed and was upgraded to superuser."
user.is_superuser = True
user.is_staff = True
user.set_password(options["password"])
user.save()
self.stdout.write(self.style.SUCCESS(message))

View File

@@ -0,0 +1,90 @@
# Generated by Django 5.2.9 on 2026-01-08 23:49
import core.models
import django.core.validators
import django.db.models.deletion
import timezone_field.fields
import uuid
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
('auth', '0012_alter_user_first_name_max_length'),
]
operations = [
migrations.CreateModel(
name='User',
fields=[
('password', models.CharField(max_length=128, verbose_name='password')),
('last_login', models.DateTimeField(blank=True, null=True, verbose_name='last login')),
('is_superuser', models.BooleanField(default=False, help_text='Designates that this user has all permissions without explicitly assigning them.', verbose_name='superuser status')),
('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')),
('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created on')),
('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated on')),
('sub', models.CharField(blank=True, help_text='Required. 255 characters or fewer. Letters, numbers, and @/./+/-/_/: characters only.', max_length=255, null=True, unique=True, validators=[django.core.validators.RegexValidator(message='Enter a valid sub. This value may contain only letters, numbers, and @/./+/-/_/: characters.', regex='^[\\w.@+-:]+\\Z')], verbose_name='sub')),
('full_name', models.CharField(blank=True, max_length=100, null=True, verbose_name='full name')),
('short_name', models.CharField(blank=True, max_length=20, null=True, verbose_name='short name')),
('email', models.EmailField(blank=True, max_length=254, null=True, verbose_name='identity email address')),
('admin_email', models.EmailField(blank=True, max_length=254, null=True, unique=True, verbose_name='admin email address')),
('language', models.CharField(blank=True, choices=[('en-us', 'English'), ('fr-fr', 'French'), ('de-de', 'German'), ('nl-nl', 'Dutch')], default=None, help_text='The language in which the user wants to see the interface.', max_length=10, null=True, verbose_name='language')),
('timezone', timezone_field.fields.TimeZoneField(choices_display='WITH_GMT_OFFSET', default='UTC', help_text='The timezone in which the user wants to see times.', use_pytz=False)),
('is_device', models.BooleanField(default=False, help_text='Whether the user is a device or a real user.', verbose_name='device')),
('is_staff', models.BooleanField(default=False, help_text='Whether the user can log into this admin site.', verbose_name='staff status')),
('is_active', models.BooleanField(default=True, help_text='Whether this user should be treated as active. Unselect this instead of deleting accounts.', verbose_name='active')),
('claims', models.JSONField(blank=True, default=dict, help_text='Claims from the OIDC token.')),
('groups', models.ManyToManyField(blank=True, help_text='The groups this user belongs to. A user will get all permissions granted to each of their groups.', related_name='user_set', related_query_name='user', to='auth.group', verbose_name='groups')),
('user_permissions', models.ManyToManyField(blank=True, help_text='Specific permissions for this user.', related_name='user_set', related_query_name='user', to='auth.permission', verbose_name='user permissions')),
],
options={
'verbose_name': 'user',
'verbose_name_plural': 'users',
'db_table': 'calendars_user',
},
managers=[
('objects', core.models.UserManager()),
],
),
migrations.CreateModel(
name='Calendar',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('name', models.CharField(max_length=255)),
('color', models.CharField(default='#3174ad', max_length=7)),
('description', models.TextField(blank=True, default='')),
('is_default', models.BooleanField(default=False)),
('is_visible', models.BooleanField(default=True)),
('davical_path', models.CharField(max_length=512, unique=True)),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
('owner', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='calendars', to=settings.AUTH_USER_MODEL)),
],
options={
'ordering': ['-is_default', 'name'],
},
),
migrations.CreateModel(
name='CalendarShare',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('permission', models.CharField(choices=[('read', 'Read only'), ('write', 'Read and write')], default='read', max_length=10)),
('is_visible', models.BooleanField(default=True)),
('created_at', models.DateTimeField(auto_now_add=True)),
('calendar', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='shares', to='core.calendar')),
('shared_with', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='shared_calendars', to=settings.AUTH_USER_MODEL)),
],
),
migrations.AddConstraint(
model_name='calendar',
constraint=models.UniqueConstraint(condition=models.Q(('is_default', True)), fields=('owner',), name='unique_default_calendar_per_user'),
),
migrations.AlterUniqueTogether(
name='calendarshare',
unique_together={('calendar', 'shared_with')},
),
]

View File

402
src/backend/core/models.py Normal file
View File

@@ -0,0 +1,402 @@
"""
Declare and configure the models for the calendars core application
"""
# pylint: disable=too-many-lines
import uuid
from datetime import timedelta
from logging import getLogger
from django.conf import settings
from django.contrib.auth import models as auth_models
from django.contrib.auth.base_user import AbstractBaseUser
from django.contrib.postgres.indexes import GistIndex
from django.contrib.sites.models import Site
from django.core import mail, validators
from django.core.cache import cache
from django.core.exceptions import ValidationError
from django.core.mail import send_mail
from django.db import models, transaction
from django.db.models.expressions import RawSQL
from django.template.loader import render_to_string
from django.utils import timezone
from django.utils.functional import cached_property
from django.utils.translation import get_language, override
from django.utils.translation import gettext_lazy as _
from timezone_field import TimeZoneField
logger = getLogger(__name__)
class LinkRoleChoices(models.TextChoices):
"""Defines the possible roles a link can offer on a item."""
READER = "reader", _("Reader") # Can read
EDITOR = "editor", _("Editor") # Can read and edit
class RoleChoices(models.TextChoices):
"""Defines the possible roles a user can have in a resource."""
READER = "reader", _("Reader") # Can read
EDITOR = "editor", _("Editor") # Can read and edit
ADMIN = "administrator", _("Administrator") # Can read, edit, delete and share
OWNER = "owner", _("Owner")
PRIVILEGED_ROLES = [RoleChoices.ADMIN, RoleChoices.OWNER]
class LinkReachChoices(models.TextChoices):
"""Defines types of access for links"""
RESTRICTED = (
"restricted",
_("Restricted"),
) # Only users with a specific access can read/edit the item
AUTHENTICATED = (
"authenticated",
_("Authenticated"),
) # Any authenticated user can access the item
PUBLIC = "public", _("Public") # Even anonymous users can access the item
class DuplicateEmailError(Exception):
"""Raised when an email is already associated with a pre-existing user."""
def __init__(self, message=None, email=None):
"""Set message and email to describe the exception."""
self.message = message
self.email = email
super().__init__(self.message)
class BaseModel(models.Model):
"""
Serves as an abstract base model for other models, ensuring that records are validated
before saving as Django doesn't do it by default.
Includes fields common to all models: a UUID primary key and creation/update timestamps.
"""
id = models.UUIDField(
verbose_name=_("id"),
help_text=_("primary key for the record as UUID"),
primary_key=True,
default=uuid.uuid4,
editable=False,
)
created_at = models.DateTimeField(
verbose_name=_("created on"),
help_text=_("date and time at which a record was created"),
auto_now_add=True,
editable=False,
)
updated_at = models.DateTimeField(
verbose_name=_("updated on"),
help_text=_("date and time at which a record was last updated"),
auto_now=True,
editable=False,
)
class Meta:
abstract = True
def save(self, *args, **kwargs):
"""Call `full_clean` before saving."""
self.full_clean()
super().save(*args, **kwargs)
class UserManager(auth_models.UserManager):
"""Custom manager for User model with additional methods."""
def get_user_by_sub_or_email(self, sub, email):
"""Fetch existing user by sub or email."""
try:
return self.get(sub=sub)
except self.model.DoesNotExist as err:
if not email:
return None
if settings.OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION:
try:
return self.get(email=email)
except self.model.DoesNotExist:
pass
elif (
self.filter(email=email).exists()
and not settings.OIDC_ALLOW_DUPLICATE_EMAILS
):
raise DuplicateEmailError(
_(
"We couldn't find a user with this sub but the email is already "
"associated with a registered user."
)
) from err
return None
class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
"""User model to work with OIDC only authentication."""
sub_validator = validators.RegexValidator(
regex=r"^[\w.@+-:]+\Z",
message=_(
"Enter a valid sub. This value may contain only letters, "
"numbers, and @/./+/-/_/: characters."
),
)
sub = models.CharField(
_("sub"),
help_text=_(
"Required. 255 characters or fewer. Letters, numbers, and @/./+/-/_/: characters only."
),
max_length=255,
unique=True,
validators=[sub_validator],
blank=True,
null=True,
)
full_name = models.CharField(_("full name"), max_length=100, null=True, blank=True)
short_name = models.CharField(_("short name"), max_length=20, null=True, blank=True)
email = models.EmailField(_("identity email address"), blank=True, null=True)
# Unlike the "email" field which stores the email coming from the OIDC token, this field
# stores the email used by staff users to login to the admin site
admin_email = models.EmailField(
_("admin email address"), unique=True, blank=True, null=True
)
language = models.CharField(
max_length=10,
choices=settings.LANGUAGES,
default=None,
verbose_name=_("language"),
help_text=_("The language in which the user wants to see the interface."),
null=True,
blank=True,
)
timezone = TimeZoneField(
choices_display="WITH_GMT_OFFSET",
use_pytz=False,
default=settings.TIME_ZONE,
help_text=_("The timezone in which the user wants to see times."),
)
is_device = models.BooleanField(
_("device"),
default=False,
help_text=_("Whether the user is a device or a real user."),
)
is_staff = models.BooleanField(
_("staff status"),
default=False,
help_text=_("Whether the user can log into this admin site."),
)
is_active = models.BooleanField(
_("active"),
default=True,
help_text=_(
"Whether this user should be treated as active. "
"Unselect this instead of deleting accounts."
),
)
claims = models.JSONField(
blank=True,
default=dict,
help_text=_("Claims from the OIDC token."),
)
objects = UserManager()
USERNAME_FIELD = "admin_email"
REQUIRED_FIELDS = []
class Meta:
db_table = "calendars_user"
verbose_name = _("user")
verbose_name_plural = _("users")
def __str__(self):
return self.email or self.admin_email or str(self.id)
def save(self, *args, **kwargs):
"""
If it's a new user, give its user access to the items to which s.he was invited.
"""
is_adding = self._state.adding
super().save(*args, **kwargs)
def email_user(self, subject, message, from_email=None, **kwargs):
"""Email this user."""
if not self.email:
raise ValueError("User has no email address.")
mail.send_mail(subject, message, from_email, [self.email], **kwargs)
@cached_property
def teams(self):
"""
Get list of teams in which the user is, as a list of strings.
Must be cached if retrieved remotely.
"""
return []
class BaseAccess(BaseModel):
"""Base model for accesses to handle resources."""
user = models.ForeignKey(
User,
on_delete=models.CASCADE,
null=True,
blank=True,
)
team = models.CharField(max_length=100, blank=True)
role = models.CharField(
max_length=20, choices=RoleChoices.choices, default=RoleChoices.READER
)
class Meta:
abstract = True
def _get_abilities(self, resource, user):
"""
Compute and return abilities for a given user taking into account
the current state of the object.
"""
roles = []
if user.is_authenticated:
teams = user.teams
try:
roles = self.user_roles or []
except AttributeError:
try:
roles = resource.accesses.filter(
models.Q(user=user) | models.Q(team__in=teams),
).values_list("role", flat=True)
except (self._meta.model.DoesNotExist, IndexError):
roles = []
is_owner_or_admin = bool(
set(roles).intersection({RoleChoices.OWNER, RoleChoices.ADMIN})
)
if self.role == RoleChoices.OWNER:
can_delete = (
RoleChoices.OWNER in roles
and resource.accesses.filter(role=RoleChoices.OWNER).count() > 1
)
set_role_to = (
[RoleChoices.ADMIN, RoleChoices.EDITOR, RoleChoices.READER]
if can_delete
else []
)
else:
can_delete = is_owner_or_admin
set_role_to = []
if RoleChoices.OWNER in roles:
set_role_to.append(RoleChoices.OWNER)
if is_owner_or_admin:
set_role_to.extend(
[RoleChoices.ADMIN, RoleChoices.EDITOR, RoleChoices.READER]
)
# Remove the current role as we don't want to propose it as an option
try:
set_role_to.remove(self.role)
except ValueError:
pass
return {
"destroy": can_delete,
"update": bool(set_role_to),
"partial_update": bool(set_role_to),
"retrieve": bool(roles),
"set_role_to": set_role_to,
}
class Calendar(models.Model):
"""
Represents a calendar owned by a user.
This model tracks calendars stored in DAViCal and links them to Django users.
"""
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
owner = models.ForeignKey(
settings.AUTH_USER_MODEL,
on_delete=models.CASCADE,
related_name="calendars",
)
name = models.CharField(max_length=255)
color = models.CharField(max_length=7, default="#3174ad") # Hex color
description = models.TextField(blank=True, default="")
is_default = models.BooleanField(default=False)
is_visible = models.BooleanField(default=True)
# DAViCal reference - the calendar path in DAViCal
davical_path = models.CharField(max_length=512, unique=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
"""Meta options for Calendar model."""
ordering = ["-is_default", "name"]
constraints = [
models.UniqueConstraint(
fields=["owner"],
condition=models.Q(is_default=True),
name="unique_default_calendar_per_user",
)
]
def __str__(self):
return f"{self.name} ({self.owner.email})"
class CalendarShare(models.Model):
"""
Represents a calendar shared with another user.
"""
PERMISSION_READ = "read"
PERMISSION_WRITE = "write"
PERMISSION_CHOICES = [
(PERMISSION_READ, "Read only"),
(PERMISSION_WRITE, "Read and write"),
]
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
calendar = models.ForeignKey(
Calendar,
on_delete=models.CASCADE,
related_name="shares",
)
shared_with = models.ForeignKey(
settings.AUTH_USER_MODEL,
on_delete=models.CASCADE,
related_name="shared_calendars",
)
permission = models.CharField(
max_length=10,
choices=PERMISSION_CHOICES,
default=PERMISSION_READ,
)
is_visible = models.BooleanField(default=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
"""Meta options for CalendarShare model."""
unique_together = ["calendar", "shared_with"]
def __str__(self):
return f"{self.calendar.name} shared with {self.shared_with.email}"

View File

View File

@@ -0,0 +1,477 @@
"""Services for CalDAV integration with DAViCal."""
import logging
from datetime import date, datetime, timedelta
from typing import Optional
from uuid import uuid4
from django.conf import settings
from django.utils import timezone
import psycopg
from caldav import DAVClient
from caldav.lib.error import NotFoundError
from core.models import Calendar
logger = logging.getLogger(__name__)
class DAViCalClient:
"""
Client for communicating with DAViCal CalDAV server using the caldav library.
"""
def __init__(self):
self.base_url = getattr(settings, "DAVICAL_URL", "http://davical:80")
self.timeout = 30
def _get_client(self, user) -> DAVClient:
"""
Get a CalDAV client for the given user.
DAViCal uses X-Forwarded-User header for authentication. The caldav
library requires username/password for Basic Auth, but DAViCal users have
password '*' (external auth). We pass the X-Forwarded-User header directly
to the DAVClient constructor.
"""
# DAViCal base URL - the caldav library will discover the principal
caldav_url = f"{self.base_url}/caldav.php/"
return DAVClient(
url=caldav_url,
username=user.email,
password="", # Empty password - DAViCal uses X-Forwarded-User header
timeout=self.timeout,
headers={
"X-Forwarded-User": user.email,
},
)
def ensure_user_exists(self, user) -> None:
"""
Ensure the user exists in DAViCal's database.
Creates the user if they don't exist.
"""
# Connect to shared calendars database (public schema)
default_db = settings.DATABASES["default"]
db_name = default_db.get("NAME", "calendars")
# Get password - handle SecretValue objects
password = default_db.get("PASSWORD", "pass")
if hasattr(password, "value"):
password = password.value
# Connect to calendars database
conn = psycopg.connect(
host=default_db.get("HOST", "localhost"),
port=default_db.get("PORT", 5432),
dbname=db_name,
user=default_db.get("USER", "pgroot"),
password=password,
)
try:
with conn.cursor() as cursor:
# Check if user exists (in public schema)
cursor.execute(
"SELECT user_no FROM usr WHERE lower(username) = lower(%s)",
[user.email],
)
if cursor.fetchone():
# User already exists
return
# Create user in DAViCal (public schema)
# Use email as username, password '*' means external auth
# Get user's full name or use email prefix
fullname = (
getattr(user, "full_name", None)
or getattr(user, "get_full_name", lambda: None)()
or user.email.split("@")[0]
)
cursor.execute(
"""
INSERT INTO usr (username, email, fullname, active, password)
VALUES (%s, %s, %s, true, '*')
ON CONFLICT (lower(username)) DO NOTHING
RETURNING user_no
""",
[user.email, user.email, fullname],
)
result = cursor.fetchone()
if result:
user_no = result[0]
logger.info(
"Created DAViCal user: %s (user_no: %s)", user.email, user_no
)
# Also create a principal record for the user (public schema)
# DAViCal needs both usr and principal records
# Principal type 1 is for users
type_id = 1
cursor.execute(
"""
INSERT INTO principal (type_id, user_no, displayname)
SELECT %s, %s, %s
WHERE NOT EXISTS (SELECT 1 FROM principal WHERE user_no = %s)
RETURNING principal_id
""",
[type_id, user_no, fullname, user_no],
)
principal_result = cursor.fetchone()
if principal_result:
logger.info(
"Created DAViCal principal: %s (principal_id: %s)",
user.email,
principal_result[0],
)
else:
logger.warning("User %s already exists in DAViCal", user.email)
conn.commit()
finally:
conn.close()
def create_calendar(self, user, calendar_name: str, calendar_id: str) -> str:
"""
Create a new calendar in DAViCal for the given user.
Returns the DAViCal path for the calendar.
"""
# Ensure user exists first
self.ensure_user_exists(user)
client = self._get_client(user)
principal = client.principal()
try:
# Create calendar using caldav library
calendar = principal.make_calendar(name=calendar_name)
# DAViCal calendar path format: /caldav.php/{username}/{calendar_id}/
# The caldav library returns a URL object, convert to string and extract path
calendar_url = str(calendar.url)
# Extract path from full URL
if calendar_url.startswith(self.base_url):
path = calendar_url[len(self.base_url) :]
else:
# Fallback: construct path manually based on DAViCal's structure
# DAViCal creates calendars with a specific path structure
path = f"/caldav.php/{user.email}/{calendar_id}/"
logger.info("Created calendar in DAViCal: %s at %s", calendar_name, path)
return path
except Exception as e:
logger.error("Failed to create calendar in DAViCal: %s", str(e))
raise
def get_events(
self,
user,
calendar_path: str,
start: Optional[datetime] = None,
end: Optional[datetime] = None,
) -> list:
"""
Get events from a calendar within a time range.
Returns list of event dictionaries with parsed data.
"""
# Ensure user exists first
self.ensure_user_exists(user)
# Default to current month if no range specified
if start is None:
start = timezone.now().replace(day=1, hour=0, minute=0, second=0)
if end is None:
end = start + timedelta(days=31)
client = self._get_client(user)
# Get calendar by URL
calendar_url = f"{self.base_url}{calendar_path}"
calendar = client.calendar(url=calendar_url)
try:
# Search for events in the date range
# Convert datetime to date for search if needed
start_date = start.date() if isinstance(start, datetime) else start
end_date = end.date() if isinstance(end, datetime) else end
events = calendar.search(
event=True,
start=start_date,
end=end_date,
expand=True, # Expand recurring events
)
# Parse events into dictionaries
parsed_events = []
for event in events:
event_data = self._parse_event(event)
if event_data:
parsed_events.append(event_data)
return parsed_events
except NotFoundError:
logger.warning("Calendar not found at path: %s", calendar_path)
return []
except Exception as e:
logger.error("Failed to get events from DAViCal: %s", str(e))
raise
def create_event(self, user, calendar_path: str, event_data: dict) -> str:
"""
Create a new event in DAViCal.
Returns the event UID.
"""
# Ensure user exists first
self.ensure_user_exists(user)
client = self._get_client(user)
calendar_url = f"{self.base_url}{calendar_path}"
calendar = client.calendar(url=calendar_url)
# Extract event data
dtstart = event_data.get("start", timezone.now())
dtend = event_data.get("end", dtstart + timedelta(hours=1))
summary = event_data.get("title", "New Event")
description = event_data.get("description", "")
location = event_data.get("location", "")
# Generate UID if not provided
event_uid = event_data.get("uid", str(uuid4()))
try:
# Create event using caldav library
event = calendar.save_event(
dtstart=dtstart,
dtend=dtend,
uid=event_uid,
summary=summary,
description=description,
location=location,
)
# Extract UID from created event
# The caldav library returns an Event object
if hasattr(event, "icalendar_component"):
event_uid = str(event.icalendar_component.get("uid", event_uid))
elif hasattr(event, "vobject_instance"):
event_uid = event.vobject_instance.vevent.uid.value
logger.info("Created event in DAViCal: %s", event_uid)
return event_uid
except Exception as e:
logger.error("Failed to create event in DAViCal: %s", str(e))
raise
def update_event(
self, user, calendar_path: str, event_uid: str, event_data: dict
) -> None:
"""Update an existing event in DAViCal."""
# Ensure user exists first
self.ensure_user_exists(user)
client = self._get_client(user)
calendar_url = f"{self.base_url}{calendar_path}"
calendar = client.calendar(url=calendar_url)
try:
# Search for the event by UID
events = calendar.search(event=True)
target_event = None
for event in events:
event_uid_value = None
if hasattr(event, "icalendar_component"):
event_uid_value = str(event.icalendar_component.get("uid", ""))
elif hasattr(event, "vobject_instance"):
event_uid_value = event.vobject_instance.vevent.uid.value
if event_uid_value == event_uid:
target_event = event
break
if not target_event:
raise ValueError(f"Event with UID {event_uid} not found")
# Update event properties
dtstart = event_data.get("start")
dtend = event_data.get("end")
summary = event_data.get("title")
description = event_data.get("description")
location = event_data.get("location")
# Update using icalendar component
component = target_event.icalendar_component
if dtstart:
component["dtstart"] = dtstart
if dtend:
component["dtend"] = dtend
if summary:
component["summary"] = summary
if description is not None:
component["description"] = description
if location is not None:
component["location"] = location
# Save the updated event
target_event.save()
logger.info("Updated event in DAViCal: %s", event_uid)
except Exception as e:
logger.error("Failed to update event in DAViCal: %s", str(e))
raise
def delete_event(self, user, calendar_path: str, event_uid: str) -> None:
"""Delete an event from DAViCal."""
# Ensure user exists first
self.ensure_user_exists(user)
client = self._get_client(user)
calendar_url = f"{self.base_url}{calendar_path}"
calendar = client.calendar(url=calendar_url)
try:
# Search for the event by UID
events = calendar.search(event=True)
target_event = None
for event in events:
event_uid_value = None
if hasattr(event, "icalendar_component"):
event_uid_value = str(event.icalendar_component.get("uid", ""))
elif hasattr(event, "vobject_instance"):
event_uid_value = event.vobject_instance.vevent.uid.value
if event_uid_value == event_uid:
target_event = event
break
if not target_event:
raise ValueError(f"Event with UID {event_uid} not found")
# Delete the event
target_event.delete()
logger.info("Deleted event from DAViCal: %s", event_uid)
except Exception as e:
logger.error("Failed to delete event from DAViCal: %s", str(e))
raise
def _parse_event(self, event) -> Optional[dict]:
"""
Parse a caldav Event object and return event data as dictionary.
"""
try:
component = event.icalendar_component
event_data = {
"uid": str(component.get("uid", "")),
"title": str(component.get("summary", "")),
"start": component.get("dtstart").dt
if component.get("dtstart")
else None,
"end": component.get("dtend").dt if component.get("dtend") else None,
"description": str(component.get("description", "")),
"location": str(component.get("location", "")),
}
# Convert datetime to string format for consistency
if event_data["start"]:
if isinstance(event_data["start"], datetime):
event_data["start"] = event_data["start"].strftime("%Y%m%dT%H%M%SZ")
elif isinstance(event_data["start"], date):
event_data["start"] = event_data["start"].strftime("%Y%m%d")
if event_data["end"]:
if isinstance(event_data["end"], datetime):
event_data["end"] = event_data["end"].strftime("%Y%m%dT%H%M%SZ")
elif isinstance(event_data["end"], date):
event_data["end"] = event_data["end"].strftime("%Y%m%d")
return event_data if event_data.get("uid") else None
except Exception as e:
logger.warning("Failed to parse event: %s", str(e))
return None
class CalendarService:
"""
High-level service for managing calendars and events.
"""
def __init__(self):
self.davical = DAViCalClient()
def create_default_calendar(self, user) -> Calendar:
"""
Create a default calendar for a user.
"""
calendar_id = str(uuid4())
calendar_name = "Mon calendrier"
# Create calendar in DAViCal
davical_path = self.davical.create_calendar(user, calendar_name, calendar_id)
# Create local Calendar record
calendar = Calendar.objects.create(
owner=user,
name=calendar_name,
davical_path=davical_path,
is_default=True,
color="#3174ad",
)
return calendar
def create_calendar(self, user, name: str, color: str = "#3174ad") -> Calendar:
"""
Create a new calendar for a user.
"""
calendar_id = str(uuid4())
# Create calendar in DAViCal
davical_path = self.davical.create_calendar(user, name, calendar_id)
# Create local Calendar record
calendar = Calendar.objects.create(
owner=user,
name=name,
davical_path=davical_path,
is_default=False,
color=color,
)
return calendar
def get_user_calendars(self, user):
"""
Get all calendars accessible by a user (owned + shared).
"""
owned = Calendar.objects.filter(owner=user)
shared = Calendar.objects.filter(shares__shared_with=user)
return owned.union(shared)
def get_events(self, user, calendar: Calendar, start=None, end=None) -> list:
"""
Get events from a calendar.
Returns parsed event data.
"""
return self.davical.get_events(user, calendar.davical_path, start, end)
def create_event(self, user, calendar: Calendar, event_data: dict) -> str:
"""Create a new event."""
return self.davical.create_event(user, calendar.davical_path, event_data)
def update_event(
self, user, calendar: Calendar, event_uid: str, event_data: dict
) -> None:
"""Update an existing event."""
self.davical.update_event(user, calendar.davical_path, event_uid, event_data)
def delete_event(self, user, calendar: Calendar, event_uid: str) -> None:
"""Delete an event."""
self.davical.delete_event(user, calendar.davical_path, event_uid)

View File

@@ -0,0 +1,55 @@
"""
Declare and configure the signals for the calendars core application
"""
import logging
from django.conf import settings
from django.contrib.auth import get_user_model
from django.db.models.signals import post_save
from django.dispatch import receiver
from core.services.caldav_service import CalendarService
logger = logging.getLogger(__name__)
User = get_user_model()
@receiver(post_save, sender=User)
def provision_default_calendar(sender, instance, created, **kwargs):
"""
Auto-provision a default calendar when a new user is created.
"""
if not created:
return
# Check if user already has a default calendar
if instance.calendars.filter(is_default=True).exists():
return
# Skip calendar creation if DAViCal is not configured
if not getattr(settings, "DAVICAL_URL", None):
return
try:
service = CalendarService()
service.create_default_calendar(instance)
logger.info("Created default calendar for user %s", instance.email)
except Exception as e:
# In tests, DAViCal tables don't exist, so fail silently
# Check if it's a database error that suggests we're in tests
error_str = str(e).lower()
if "does not exist" in error_str or "relation" in error_str:
# Likely in test environment, fail silently
logger.debug(
"Skipped calendar creation for user %s (likely test environment): %s",
instance.email,
str(e),
)
else:
# Real error, log it
logger.error(
"Failed to create default calendar for user %s: %s",
instance.email,
str(e),
)

View File

@@ -0,0 +1,58 @@
"""Custom template tags for the calendars core application."""
import base64
from django import template
from django.contrib.staticfiles import finders
from PIL import ImageFile as PillowImageFile
register = template.Library()
def image_to_base64(file_or_path, close=False):
"""
Return the src string of the base64 encoding of an image represented by its path
or file opened or not.
Inspired by Django's "get_image_dimensions"
"""
pil_parser = PillowImageFile.Parser()
if hasattr(file_or_path, "read"):
file = file_or_path
if file.closed and hasattr(file, "open"):
file_or_path.open()
file_pos = file.tell()
file.seek(0)
else:
try:
# pylint: disable=consider-using-with
file = open(file_or_path, "rb")
except OSError:
return ""
close = True
try:
image_data = file.read()
if not image_data:
return ""
pil_parser.feed(image_data)
if pil_parser.image:
mime_type = pil_parser.image.get_format_mimetype()
encoded_string = base64.b64encode(image_data)
return f"data:{mime_type:s};base64, {encoded_string.decode('utf-8'):s}"
return ""
finally:
if close:
file.close()
else:
file.seek(file_pos)
@register.simple_tag
def base64_static(path):
"""Return a static file into a base64."""
full_path = finders.find(path)
if full_path:
return image_to_base64(full_path, True)
return ""

View File

View File

@@ -0,0 +1,579 @@
"""Unit tests for the Authentication Backends."""
import random
import re
from unittest import mock
from django.core.exceptions import SuspiciousOperation
from django.test.utils import override_settings
import pytest
import responses
from cryptography.fernet import Fernet
from lasuite.oidc_login.backends import get_oidc_refresh_token
from core import models
from core.authentication.backends import OIDCAuthenticationBackend
from core.authentication.exceptions import UserCannotAccessApp
from core.factories import UserFactory
pytestmark = pytest.mark.django_db
def test_authentication_getter_existing_user_no_email(
django_assert_num_queries, monkeypatch
):
"""
If an existing user matches the user's info sub, the user should be returned.
"""
klass = OIDCAuthenticationBackend()
db_user = UserFactory()
def get_userinfo_mocked(*args):
return {"sub": db_user.sub}
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
with django_assert_num_queries(1):
user = klass.get_or_create_user(
access_token="test-token", id_token=None, payload=None
)
assert user == db_user
def test_authentication_getter_existing_user_via_email(
django_assert_num_queries, monkeypatch
):
"""
If an existing user doesn't match the sub but matches the email,
the user should be returned.
"""
klass = OIDCAuthenticationBackend()
db_user = UserFactory()
def get_userinfo_mocked(*args):
return {"sub": "123", "email": db_user.email}
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
with django_assert_num_queries(4): # user by sub, user by mail, update sub
user = klass.get_or_create_user(
access_token="test-token", id_token=None, payload=None
)
assert user == db_user
def test_authentication_getter_email_none(monkeypatch):
"""
If no user is found with the sub and no email is provided, a new user should be created.
"""
klass = OIDCAuthenticationBackend()
db_user = UserFactory(email=None)
def get_userinfo_mocked(*args):
user_info = {"sub": "123"}
if random.choice([True, False]):
user_info["email"] = None
return user_info
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
user = klass.get_or_create_user(
access_token="test-token", id_token=None, payload=None
)
# Since the sub and email didn't match, it should create a new user
assert models.User.objects.count() == 2
assert user != db_user
assert user.sub == "123"
def test_authentication_getter_existing_user_no_fallback_to_email_allow_duplicate(
settings, monkeypatch
):
"""
When the "OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION" setting is set to False,
the system should not match users by email, even if the email matches.
"""
klass = OIDCAuthenticationBackend()
db_user = UserFactory()
# Set the setting to False
settings.OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION = False
settings.OIDC_ALLOW_DUPLICATE_EMAILS = True
def get_userinfo_mocked(*args):
return {"sub": "123", "email": db_user.email}
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
user = klass.get_or_create_user(
access_token="test-token", id_token=None, payload=None
)
# Since the sub doesn't match, it should create a new user
assert models.User.objects.count() == 2
assert user != db_user
assert user.sub == "123"
def test_authentication_getter_existing_user_no_fallback_to_email_no_duplicate(
settings, monkeypatch
):
"""
When the "OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION" setting is set to False,
the system should not match users by email, even if the email matches.
"""
klass = OIDCAuthenticationBackend()
db_user = UserFactory()
# Set the setting to False
settings.OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION = False
settings.OIDC_ALLOW_DUPLICATE_EMAILS = False
def get_userinfo_mocked(*args):
return {"sub": "123", "email": db_user.email}
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
with pytest.raises(
SuspiciousOperation,
match=(
"We couldn't find a user with this sub but the email is already associated "
"with a registered user."
),
):
klass.get_or_create_user(access_token="test-token", id_token=None, payload=None)
# Since the sub doesn't match, it should not create a new user
assert models.User.objects.count() == 1
def test_authentication_getter_existing_user_with_email(
django_assert_num_queries, monkeypatch
):
"""
When the user's info contains an email and targets an existing user,
"""
klass = OIDCAuthenticationBackend()
user = UserFactory(full_name="John Doe", short_name="John")
def get_userinfo_mocked(*args):
return {
"sub": user.sub,
"email": user.email,
"first_name": "John",
"last_name": "Doe",
}
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
# Only 1 query because email and names have not changed
with django_assert_num_queries(1):
authenticated_user = klass.get_or_create_user(
access_token="test-token", id_token=None, payload=None
)
assert user == authenticated_user
@pytest.mark.parametrize(
"first_name, last_name, email",
[
("Jack", "Doe", "john.doe@example.com"),
("John", "Duy", "john.doe@example.com"),
("John", "Doe", "jack.duy@example.com"),
("Jack", "Duy", "jack.duy@example.com"),
],
)
def test_authentication_getter_existing_user_change_fields_sub(
first_name, last_name, email, django_assert_num_queries, monkeypatch
):
"""
It should update the email or name fields on the user when they change
and the user was identified by its "sub".
"""
klass = OIDCAuthenticationBackend()
user = UserFactory(
full_name="John Doe", short_name="John", email="john.doe@example.com"
)
def get_userinfo_mocked(*args):
return {
"sub": user.sub,
"email": email,
"first_name": first_name,
"last_name": last_name,
}
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
# One and only one additional update query when a field has changed
with django_assert_num_queries(3):
authenticated_user = klass.get_or_create_user(
access_token="test-token", id_token=None, payload=None
)
assert user == authenticated_user
user.refresh_from_db()
assert user.email == email
assert user.full_name == f"{first_name:s} {last_name:s}"
assert user.short_name == first_name
@pytest.mark.parametrize(
"first_name, last_name, email",
[
("Jack", "Doe", "john.doe@example.com"),
("John", "Duy", "john.doe@example.com"),
],
)
def test_authentication_getter_existing_user_change_fields_email(
first_name, last_name, email, django_assert_num_queries, monkeypatch
):
"""
It should update the name fields on the user when they change
and the user was identified by its "email" as fallback.
"""
klass = OIDCAuthenticationBackend()
user = UserFactory(
full_name="John Doe", short_name="John", email="john.doe@example.com"
)
def get_userinfo_mocked(*args):
return {
"sub": "123",
"email": user.email,
"first_name": first_name,
"last_name": last_name,
}
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
# One and only one additional update query when a field has changed
with django_assert_num_queries(4):
authenticated_user = klass.get_or_create_user(
access_token="test-token", id_token=None, payload=None
)
assert user == authenticated_user
user.refresh_from_db()
assert user.email == email
assert user.full_name == f"{first_name:s} {last_name:s}"
assert user.short_name == first_name
def test_authentication_getter_new_user_no_email(monkeypatch):
"""
If no user matches the user's info sub, a user should be created.
User's info doesn't contain an email, created user's email should be empty.
"""
klass = OIDCAuthenticationBackend()
def get_userinfo_mocked(*args):
return {"sub": "123"}
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
user = klass.get_or_create_user(
access_token="test-token", id_token=None, payload=None
)
assert user.sub == "123"
assert user.email is None
assert user.full_name is None
assert user.short_name is None
assert user.has_usable_password() is False
assert models.User.objects.count() == 1
def test_authentication_getter_new_user_with_email(monkeypatch):
"""
If no user matches the user's info sub, a user should be created.
User's email and name should be set on the identity.
The "email" field on the User model should not be set as it is reserved for staff users.
"""
klass = OIDCAuthenticationBackend()
email = "calendars@example.com"
def get_userinfo_mocked(*args):
return {"sub": "123", "email": email, "first_name": "John", "last_name": "Doe"}
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
user = klass.get_or_create_user(
access_token="test-token", id_token=None, payload=None
)
assert user.sub == "123"
assert user.email == email
assert user.full_name == "John Doe"
assert user.short_name == "John"
assert user.has_usable_password() is False
assert models.User.objects.count() == 1
@override_settings(OIDC_OP_USER_ENDPOINT="http://oidc.endpoint.test/userinfo")
@responses.activate
def test_authentication_get_userinfo_json_response():
"""Test get_userinfo method with a JSON response."""
responses.add(
responses.GET,
re.compile(r".*/userinfo"),
json={
"first_name": "John",
"last_name": "Doe",
"email": "john.doe@example.com",
},
status=200,
)
oidc_backend = OIDCAuthenticationBackend()
result = oidc_backend.get_userinfo("fake_access_token", None, None)
assert result["first_name"] == "John"
assert result["last_name"] == "Doe"
assert result["email"] == "john.doe@example.com"
@override_settings(OIDC_OP_USER_ENDPOINT="http://oidc.endpoint.test/userinfo")
@responses.activate
def test_authentication_get_userinfo_token_response(monkeypatch, settings):
"""Test get_userinfo method with a token response."""
settings.OIDC_RP_SIGN_ALGO = "HS256" # disable JWKS URL call
responses.add(
responses.GET,
re.compile(r".*/userinfo"),
body="fake.jwt.token",
status=200,
content_type="application/jwt",
)
def mock_verify_token(self, token): # pylint: disable=unused-argument
return {
"first_name": "Jane",
"last_name": "Doe",
"email": "jane.doe@example.com",
}
monkeypatch.setattr(OIDCAuthenticationBackend, "verify_token", mock_verify_token)
oidc_backend = OIDCAuthenticationBackend()
result = oidc_backend.get_userinfo("fake_access_token", None, None)
assert result["first_name"] == "Jane"
assert result["last_name"] == "Doe"
assert result["email"] == "jane.doe@example.com"
@override_settings(OIDC_OP_USER_ENDPOINT="http://oidc.endpoint.test/userinfo")
@responses.activate
def test_authentication_get_userinfo_invalid_response(settings):
"""
Test get_userinfo method with an invalid JWT response that
causes verify_token to raise an error.
"""
settings.OIDC_RP_SIGN_ALGO = "HS256" # disable JWKS URL call
responses.add(
responses.GET,
re.compile(r".*/userinfo"),
body="fake.jwt.token",
status=200,
content_type="application/jwt",
)
oidc_backend = OIDCAuthenticationBackend()
with pytest.raises(
SuspiciousOperation,
match="User info response was not valid JWT",
):
oidc_backend.get_userinfo("fake_access_token", None, None)
def test_authentication_getter_existing_disabled_user_via_sub(
django_assert_num_queries, monkeypatch
):
"""
If an existing user matches the sub but is disabled,
an error should be raised and a user should not be created.
"""
klass = OIDCAuthenticationBackend()
db_user = UserFactory(is_active=False)
def get_userinfo_mocked(*args):
return {
"sub": db_user.sub,
"email": db_user.email,
"first_name": "John",
"last_name": "Doe",
}
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
with (
django_assert_num_queries(1),
pytest.raises(SuspiciousOperation, match="User account is disabled"),
):
klass.get_or_create_user(access_token="test-token", id_token=None, payload=None)
assert models.User.objects.count() == 1
def test_authentication_getter_existing_disabled_user_via_email(
django_assert_num_queries, monkeypatch
):
"""
If an existing user does not match the sub but matches the email and is disabled,
an error should be raised and a user should not be created.
"""
klass = OIDCAuthenticationBackend()
db_user = UserFactory(is_active=False)
def get_userinfo_mocked(*args):
return {
"sub": "random",
"email": db_user.email,
"first_name": "John",
"last_name": "Doe",
}
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
with (
django_assert_num_queries(2),
pytest.raises(SuspiciousOperation, match="User account is disabled"),
):
klass.get_or_create_user(access_token="test-token", id_token=None, payload=None)
assert models.User.objects.count() == 1
@responses.activate
def test_authentication_session_tokens(
django_assert_num_queries, monkeypatch, rf, settings
):
"""
Test that the session contains oidc_refresh_token and oidc_access_token after authentication.
"""
settings.OIDC_OP_TOKEN_ENDPOINT = "http://oidc.endpoint.test/token"
settings.OIDC_OP_USER_ENDPOINT = "http://oidc.endpoint.test/userinfo"
settings.OIDC_OP_JWKS_ENDPOINT = "http://oidc.endpoint.test/jwks"
settings.OIDC_STORE_ACCESS_TOKEN = True
settings.OIDC_STORE_REFRESH_TOKEN = True
settings.OIDC_STORE_REFRESH_TOKEN_KEY = Fernet.generate_key()
klass = OIDCAuthenticationBackend()
request = rf.get("/some-url", {"state": "test-state", "code": "test-code"})
request.session = {}
def verify_token_mocked(*args, **kwargs):
return {"sub": "123", "email": "test@example.com"}
monkeypatch.setattr(OIDCAuthenticationBackend, "verify_token", verify_token_mocked)
responses.add(
responses.POST,
re.compile(settings.OIDC_OP_TOKEN_ENDPOINT),
json={
"access_token": "test-access-token",
"refresh_token": "test-refresh-token",
},
status=200,
)
responses.add(
responses.GET,
re.compile(settings.OIDC_OP_USER_ENDPOINT),
json={"sub": "123", "email": "test@example.com"},
status=200,
)
with django_assert_num_queries(27):
user = klass.authenticate(
request,
code="test-code",
nonce="test-nonce",
code_verifier="test-code-verifier",
)
assert user is not None
assert request.session["oidc_access_token"] == "test-access-token"
assert get_oidc_refresh_token(request.session) == "test-refresh-token"
@override_settings(OIDC_STORE_CLAIMS=["iss"])
def test_authentication_store_claims_new_user(monkeypatch):
"""
Test that the claims are stored on the user when a new user is created.
"""
klass = OIDCAuthenticationBackend()
email = "calendars@example.com"
def get_userinfo_mocked(*args):
return {
"sub": "123",
"email": email,
"first_name": "John",
"last_name": "Doe",
"iss": "https://example.com",
}
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
user = klass.get_or_create_user(
access_token="test-token", id_token=None, payload=None
)
assert user.sub == "123"
assert user.email == email
assert user.full_name == "John Doe"
assert user.short_name == "John"
assert user.has_usable_password() is False
assert user.claims == {"iss": "https://example.com"}
assert models.User.objects.count() == 1
@override_settings(OIDC_STORE_CLAIMS=["iss"])
def test_authentication_store_claims_existing_user(monkeypatch):
"""
Test that the claims are stored on the user when an existing user is authenticated.
"""
klass = OIDCAuthenticationBackend()
user = UserFactory(
email="calendars@example.com", sub="123", claims={"iss": "https://obsolete.com"}
)
email = "calendars@example.com"
def get_userinfo_mocked(*args):
return {
"sub": "123",
"email": email,
"first_name": "John",
"last_name": "Doe",
"iss": "https://example.com",
}
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
user = klass.get_or_create_user(
access_token="test-token", id_token=None, payload=None
)
user.refresh_from_db()
assert user.sub == "123"
assert user.email == email
assert user.claims == {"iss": "https://example.com"}
assert models.User.objects.count() == 1

View File

@@ -0,0 +1,66 @@
"""Unit tests for the Authentication Views."""
from unittest import mock
from django.contrib.sessions.middleware import SessionMiddleware
from django.test import RequestFactory
from django.test.utils import override_settings
import pytest
from mozilla_django_oidc.auth import (
OIDCAuthenticationBackend as MozillaOIDCAuthenticationBackend,
)
from core import factories
from core.authentication.backends import OIDCAuthenticationBackend
from core.authentication.views import (
OIDCAuthenticationCallbackView,
)
pytestmark = pytest.mark.django_db
@override_settings(
LOGIN_REDIRECT_URL_FAILURE="/auth/failure",
LOGIN_REDIRECT_URL="/auth/success",
)
@mock.patch.object(
MozillaOIDCAuthenticationBackend,
"get_token",
return_value={"id_token": "mocked_id_token", "access_token": "mocked_access_token"},
)
@mock.patch.object(
MozillaOIDCAuthenticationBackend, "verify_token", return_value={"not": "needed"}
)
@mock.patch.object(
OIDCAuthenticationBackend,
"get_userinfo",
return_value={"sub": "mocked_sub", "email": "allowed@example.com"},
)
def test_view_login_callback_authorized_by_default(
mocked_get_userinfo, mocked_verify_token, mocked_get_token
):
"""By default, all users are authorized to login."""
user = factories.UserFactory(email="allowed@example.com")
request = RequestFactory().get(
"/callback/", data={"state": "mocked_state", "code": "mocked_code"}
)
request.user = user
middleware = SessionMiddleware(get_response=lambda x: x)
middleware.process_request(request)
mocked_state = "mocked_state"
request.session["oidc_states"] = {mocked_state: {"nonce": "mocked_nonce"}}
request.session.save()
callback_view = OIDCAuthenticationCallbackView.as_view()
response = callback_view(request)
mocked_get_token.assert_called_once()
mocked_verify_token.assert_called_once()
mocked_get_userinfo.assert_called_once()
assert response.status_code == 302
assert response.url == "/auth/success"

View File

@@ -0,0 +1,148 @@
"""Fixtures for tests in the calendars core application"""
import base64
from unittest import mock
from django.core.cache import cache
from django.db import connection
import pytest
import responses
from cryptography.fernet import Fernet
from core import factories
from core.tests.utils.urls import reload_urls
USER = "user"
TEAM = "team"
VIA = [USER, TEAM]
@pytest.fixture(autouse=True)
def truncate_davical_tables(django_db_setup, django_db_blocker):
"""Fixture to truncate DAViCal tables at the start of each test.
DAViCal tables are created by the DAViCal container migrations, not Django.
We just truncate them to ensure clean state for each test.
"""
with django_db_blocker.unblock():
with connection.cursor() as cursor:
# Truncate DAViCal tables if they exist (created by DAViCal container)
cursor.execute("""
DO $$
BEGIN
IF EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'principal') THEN
TRUNCATE TABLE principal CASCADE;
END IF;
IF EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'usr') THEN
TRUNCATE TABLE usr CASCADE;
END IF;
END $$;
""")
yield
@pytest.fixture(autouse=True)
def clear_cache():
"""Fixture to clear the cache after each test."""
yield
cache.clear()
# Clear functools.cache for functions decorated with @functools.cache
@pytest.fixture
def mock_user_teams():
"""Mock for the "teams" property on the User model."""
with mock.patch(
"core.models.User.teams", new_callable=mock.PropertyMock
) as mock_teams:
yield mock_teams
def resource_server_backend_setup(settings):
"""
A fixture to create a user token for testing.
"""
assert (
settings.OIDC_RS_BACKEND_CLASS
== "lasuite.oidc_resource_server.backend.ResourceServerBackend"
)
settings.OIDC_RESOURCE_SERVER_ENABLED = True
settings.OIDC_RS_CLIENT_ID = "some_client_id"
settings.OIDC_RS_CLIENT_SECRET = "some_client_secret"
settings.OIDC_OP_URL = "https://oidc.example.com"
settings.OIDC_VERIFY_SSL = False
settings.OIDC_TIMEOUT = 5
settings.OIDC_PROXY = None
settings.OIDC_OP_JWKS_ENDPOINT = "https://oidc.example.com/jwks"
settings.OIDC_OP_INTROSPECTION_ENDPOINT = "https://oidc.example.com/introspect"
settings.OIDC_RS_SCOPES = ["openid", "groups"]
settings.OIDC_RS_ALLOWED_AUDIENCES = ["some_service_provider"]
@pytest.fixture
def resource_server_backend_conf(settings):
"""
A fixture to create a user token for testing.
"""
resource_server_backend_setup(settings)
reload_urls()
@pytest.fixture
def resource_server_backend(settings):
"""
A fixture to create a user token for testing.
Including a mocked introspection endpoint.
"""
resource_server_backend_setup(settings)
reload_urls()
with responses.RequestsMock() as rsps:
rsps.add(
responses.POST,
"https://oidc.example.com/introspect",
json={
"iss": "https://oidc.example.com",
"aud": "some_client_id", # settings.OIDC_RS_CLIENT_ID
"sub": "very-specific-sub",
"client_id": "some_service_provider",
"scope": "openid groups",
"active": True,
},
)
yield rsps
@pytest.fixture
def user_specific_sub():
"""
A fixture to create a user token for testing.
"""
user = factories.UserFactory(sub="very-specific-sub")
yield user
def build_authorization_bearer(token):
"""
Build an Authorization Bearer header value from a token.
This can be used like this:
client.post(
...
HTTP_AUTHORIZATION=f"Bearer {build_authorization_bearer('some_token')}",
)
"""
return base64.b64encode(token.encode("utf-8")).decode("utf-8")
@pytest.fixture
def user_token():
"""
A fixture to create a user token for testing.
"""
return build_authorization_bearer("some_token")

View File

@@ -0,0 +1,152 @@
"""
Tests for the Resource Server API for users.
Not testing external API endpoints that are already tested in the /api
because the resource server viewsets inherit from the api viewsets.
"""
import pytest
from rest_framework.test import APIClient
from core import factories
from core.api import serializers
from core.tests.utils.urls import reload_urls
pytestmark = pytest.mark.django_db
# pylint: disable=unused-argument
def test_api_users_me_anonymous_public_standalone():
"""
Anonymous users should not be allowed to retrieve their own user information from external
API if resource server is not enabled.
"""
reload_urls()
response = APIClient().get("/external_api/v1.0/users/me/")
assert response.status_code == 404
def test_api_users_me_connected_not_resource_server():
"""
Connected users should not be allowed to retrieve their own user information from external
API if resource server is not enabled.
"""
reload_urls()
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
response = client.get("/external_api/v1.0/users/me/")
assert response.status_code == 404
def test_api_users_me_connected_resource_server(
user_token, resource_server_backend, user_specific_sub
):
"""
Connected users should be allowed to retrieve their own user information from external API
if resource server is enabled.
"""
client = APIClient()
client.credentials(HTTP_AUTHORIZATION=f"Bearer {user_token}")
response = client.get("/external_api/v1.0/users/me/")
assert response.status_code == 200
data = response.json()
assert data["id"] == str(user_specific_sub.id)
assert data["email"] == user_specific_sub.email
def test_api_users_me_connected_resource_server_with_invalid_token(
user_token, resource_server_backend
):
"""
Connected users should not be allowed to retrieve their own user information from external API
if resource server is enabled with an invalid token.
"""
client = APIClient()
client.credentials(HTTP_AUTHORIZATION=f"Bearer {user_token}")
response = client.get("/external_api/v1.0/users/me/")
assert response.status_code == 401
# Non allowed actions on resource server.
def test_api_users_list_resource_server_not_allowed(
user_token, resource_server_backend, user_specific_sub
):
"""
Connected users should notbe allowed to list users from a resource server.
"""
client = APIClient()
client.credentials(HTTP_AUTHORIZATION=f"Bearer {user_token}")
response = client.get("/external_api/v1.0/users/")
assert response.status_code == 403
def test_api_users_retrieve_resource_server_not_allowed(
user_token, resource_server_backend, user_specific_sub
):
"""
Connected users should notbe allowed to list users from a resource server.
"""
client = APIClient()
client.credentials(HTTP_AUTHORIZATION=f"Bearer {user_token}")
other_user = factories.UserFactory()
response = client.get(f"/external_api/v1.0/users/{other_user.id!s}/")
assert response.status_code == 403
def test_api_users_put_patch_resource_server_not_allowed(
user_token, resource_server_backend, user_specific_sub
):
"""
Connected users should notbe allowed to list users from a resource server.
"""
client = APIClient()
client.credentials(HTTP_AUTHORIZATION=f"Bearer {user_token}")
other_user = factories.UserFactory()
new_user_values = serializers.UserSerializer(instance=factories.UserFactory()).data
response = client.put(
f"/external_api/v1.0/users/{other_user.id!s}/", new_user_values
)
assert response.status_code == 403
response = client.patch(
f"/external_api/v1.0/users/{other_user.id!s}/",
{"email": "new_email@example.com"},
)
assert response.status_code == 403
def test_api_users_delete_resource_server_not_allowed(
user_token, resource_server_backend, user_specific_sub
):
"""
Connected users should notbe allowed to list users from a resource server.
"""
client = APIClient()
client.credentials(HTTP_AUTHORIZATION=f"Bearer {user_token}")
other_user = factories.UserFactory()
response = client.delete(f"/external_api/v1.0/users/{other_user.id!s}/")
assert response.status_code == 403

View File

@@ -0,0 +1,42 @@
"""
Test suite for generated openapi schema.
"""
import json
from io import StringIO
from django.core.management import call_command
from django.test import Client
import pytest
pytestmark = pytest.mark.django_db
def test_openapi_client_schema():
"""
Generated and served OpenAPI client schema should be correct.
"""
# Start by generating the swagger.json file
output = StringIO()
call_command(
"spectacular",
"--api-version",
"v1.0",
"--urlconf",
"core.urls",
"--format",
"openapi-json",
"--file",
"core/tests/swagger/swagger.json",
stdout=output,
)
assert output.getvalue() == ""
response = Client().get("/api/v1.0/swagger.json")
assert response.status_code == 200
with open(
"core/tests/swagger/swagger.json", "r", encoding="utf-8"
) as expected_schema:
assert response.json() == json.load(expected_schema)

View File

@@ -0,0 +1,161 @@
"""
Test config API endpoints in the calendars core app.
"""
import json
from django.test import override_settings
import pytest
from rest_framework.status import (
HTTP_200_OK,
)
from rest_framework.test import APIClient
from core import factories
pytestmark = pytest.mark.django_db
@override_settings(
FRONTEND_THEME="test-theme",
FRONTEND_MORE_LINK="https://test.com",
FRONTEND_FEEDBACK_BUTTON_SHOW=True,
FRONTEND_FEEDBACK_BUTTON_IDLE=False,
FRONTEND_FEEDBACK_ITEMS={"form": {"url": "https://test.com"}},
FRONTEND_FEEDBACK_MESSAGES_WIDGET_ENABLED=True,
FRONTEND_FEEDBACK_MESSAGES_WIDGET_API_URL="https://test.com",
FRONTEND_FEEDBACK_MESSAGES_WIDGET_CHANNEL="test",
FRONTEND_FEEDBACK_MESSAGES_WIDGET_PATH="https://test.com",
FRONTEND_HIDE_GAUFRE=True,
MEDIA_BASE_URL="http://testserver/",
SENTRY_DSN="https://sentry.test/123",
THEME_CUSTOMIZATION_FILE_PATH="",
)
@pytest.mark.parametrize("is_authenticated", [False, True])
def test_api_config(is_authenticated):
"""Anonymous users should be allowed to get the configuration."""
client = APIClient()
if is_authenticated:
user = factories.UserFactory()
client.force_login(user)
response = client.get("/api/v1.0/config/")
assert response.status_code == HTTP_200_OK
assert response.json() == {
"ENVIRONMENT": "test",
"FRONTEND_THEME": "test-theme",
"FRONTEND_MORE_LINK": "https://test.com",
"FRONTEND_FEEDBACK_BUTTON_SHOW": True,
"FRONTEND_FEEDBACK_BUTTON_IDLE": False,
"FRONTEND_FEEDBACK_ITEMS": {"form": {"url": "https://test.com"}},
"FRONTEND_FEEDBACK_MESSAGES_WIDGET_ENABLED": True,
"FRONTEND_FEEDBACK_MESSAGES_WIDGET_API_URL": "https://test.com",
"FRONTEND_FEEDBACK_MESSAGES_WIDGET_CHANNEL": "test",
"FRONTEND_FEEDBACK_MESSAGES_WIDGET_PATH": "https://test.com",
"FRONTEND_HIDE_GAUFRE": True,
"LANGUAGES": [
["en-us", "English"],
["fr-fr", "French"],
["de-de", "German"],
["nl-nl", "Dutch"],
],
"LANGUAGE_CODE": "en-us",
"MEDIA_BASE_URL": "http://testserver/",
"SENTRY_DSN": "https://sentry.test/123",
"theme_customization": {},
}
@override_settings(
THEME_CUSTOMIZATION_FILE_PATH="/not/existing/file.json",
)
@pytest.mark.parametrize("is_authenticated", [False, True])
def test_api_config_with_invalid_theme_customization_file(is_authenticated):
"""Anonymous users should be allowed to get the configuration."""
client = APIClient()
if is_authenticated:
user = factories.UserFactory()
client.force_login(user)
response = client.get("/api/v1.0/config/")
assert response.status_code == HTTP_200_OK
content = response.json()
assert content["theme_customization"] == {}
@override_settings(
THEME_CUSTOMIZATION_FILE_PATH="/configuration/theme/invalid.json",
)
@pytest.mark.parametrize("is_authenticated", [False, True])
def test_api_config_with_invalid_json_theme_customization_file(is_authenticated, fs):
"""Anonymous users should be allowed to get the configuration."""
fs.create_file(
"/configuration/theme/invalid.json",
contents="invalid json",
)
client = APIClient()
if is_authenticated:
user = factories.UserFactory()
client.force_login(user)
response = client.get("/api/v1.0/config/")
assert response.status_code == HTTP_200_OK
content = response.json()
assert content["theme_customization"] == {}
@override_settings(
THEME_CUSTOMIZATION_FILE_PATH="/configuration/theme/default.json",
)
@pytest.mark.parametrize("is_authenticated", [False, True])
def test_api_config_with_theme_customization(is_authenticated, fs):
"""Anonymous users should be allowed to get the configuration."""
fs.create_file(
"/configuration/theme/default.json",
contents=json.dumps(
{
"colors": {
"primary": "#000000",
"secondary": "#000000",
},
}
),
)
client = APIClient()
if is_authenticated:
user = factories.UserFactory()
client.force_login(user)
response = client.get("/api/v1.0/config/")
assert response.status_code == HTTP_200_OK
content = response.json()
assert content["theme_customization"] == {
"colors": {
"primary": "#000000",
"secondary": "#000000",
},
}
@pytest.mark.parametrize("is_authenticated", [False, True])
def test_api_config_with_original_theme_customization(is_authenticated, settings):
"""Anonymous users should be allowed to get the configuration."""
client = APIClient()
if is_authenticated:
user = factories.UserFactory()
client.force_login(user)
response = client.get("/api/v1.0/config/")
assert response.status_code == HTTP_200_OK
content = response.json()
with open(settings.THEME_CUSTOMIZATION_FILE_PATH, "r", encoding="utf-8") as f:
theme_customization = json.load(f)
assert content["theme_customization"] == theme_customization

View File

@@ -0,0 +1,638 @@
"""
Test users API endpoints in the calendars core app.
"""
import pytest
from rest_framework.test import APIClient
from core import factories, models
from core.api import serializers
pytestmark = pytest.mark.django_db
def test_api_users_list_anonymous():
"""Anonymous users should not be allowed to list users."""
factories.UserFactory()
client = APIClient()
response = client.get("/api/v1.0/users/")
assert response.status_code == 401
assert response.json() == {
"errors": [
{
"attr": None,
"code": "not_authenticated",
"detail": "Authentication credentials were not provided.",
},
],
"type": "client_error",
}
def test_api_users_list_authenticated():
"""
Authenticated users should not be able to list users without a query.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
factories.UserFactory.create_batch(2)
response = client.get(
"/api/v1.0/users/",
)
assert response.status_code == 200
assert response.json() == []
def test_api_users_list_query_inactive():
"""Inactive users should not be listed."""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
factories.UserFactory(email="john.doe@example.com", is_active=False)
lennon = factories.UserFactory(email="john.lennon@example.com")
# Use email query to get exact match
response = client.get("/api/v1.0/users/?q=john.lennon@example.com")
assert response.status_code == 200
user_ids = [user["id"] for user in response.json()]
assert user_ids == [str(lennon.id)]
# Inactive user should not be returned even with exact match
response = client.get("/api/v1.0/users/?q=john.doe@example.com")
assert response.status_code == 200
user_ids = [user["id"] for user in response.json()]
assert user_ids == []
def test_api_users_list_query_short_queries():
"""
Queries shorter than 5 characters should return an empty result set.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
factories.UserFactory(email="john.doe@example.com")
factories.UserFactory(email="john.lennon@example.com")
response = client.get("/api/v1.0/users/?q=jo")
assert response.status_code == 200
assert response.json() == []
response = client.get("/api/v1.0/users/?q=john")
assert response.status_code == 200
assert response.json() == []
# Non-email queries (without @) return empty
response = client.get("/api/v1.0/users/?q=john.")
assert response.status_code == 200
assert response.json() == []
def test_api_users_list_limit(settings):
"""
Authenticated users should be able to list users and the number of results
should be limited to 10.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
# Use a base name with a length equal 5 to test that the limit is applied
base_name = "alice"
for i in range(15):
factories.UserFactory(email=f"{base_name}.{i}@example.com")
# Non-email queries (without @) return empty
response = client.get(
"/api/v1.0/users/?q=alice",
)
assert response.status_code == 200
assert response.json() == []
# Email queries require exact match
settings.API_USERS_LIST_LIMIT = 100
response = client.get(
"/api/v1.0/users/?q=alice.0@example.com",
)
assert response.status_code == 200
assert len(response.json()) == 1
def test_api_users_list_throttling_authenticated(settings):
"""
Authenticated users should be throttled.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
settings.REST_FRAMEWORK["DEFAULT_THROTTLE_RATES"]["user_list_burst"] = "3/minute"
for _i in range(3):
response = client.get(
"/api/v1.0/users/?q=alice",
)
assert response.status_code == 200
response = client.get(
"/api/v1.0/users/?q=alice",
)
assert response.status_code == 429
def test_api_users_list_query_email():
"""
Authenticated users should be able to list users and filter by email.
Only exact email matches are returned (case-insensitive).
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
dave = factories.UserFactory(email="david.bowman@work.com")
factories.UserFactory(email="nicole.bowman@work.com")
# Exact match works
response = client.get(
"/api/v1.0/users/?q=david.bowman@work.com",
)
assert response.status_code == 200
user_ids = [user["id"] for user in response.json()]
assert user_ids == [str(dave.id)]
# Case-insensitive match works
response = client.get(
"/api/v1.0/users/?q=David.Bowman@Work.COM",
)
assert response.status_code == 200
user_ids = [user["id"] for user in response.json()]
assert user_ids == [str(dave.id)]
# Typos don't match (exact match only)
response = client.get(
"/api/v1.0/users/?q=davig.bovman@worm.com",
)
assert response.status_code == 200
user_ids = [user["id"] for user in response.json()]
assert user_ids == []
response = client.get(
"/api/v1.0/users/?q=davig.bovman@worm.cop",
)
assert response.status_code == 200
user_ids = [user["id"] for user in response.json()]
assert user_ids == []
def test_api_users_list_query_email_matching():
"""Email queries return exact matches only (case-insensitive)."""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
user1 = factories.UserFactory(email="alice.johnson@example.gouv.fr")
user2 = factories.UserFactory(email="alice.johnnson@example.gouv.fr")
user3 = factories.UserFactory(email="alice.kohlson@example.gouv.fr")
user4 = factories.UserFactory(email="alicia.johnnson@example.gouv.fr")
user5 = factories.UserFactory(email="alicia.johnnson@example.gov.uk")
factories.UserFactory(email="alice.thomson@example.gouv.fr")
# Exact match returns only that user
response = client.get(
"/api/v1.0/users/?q=alice.johnson@example.gouv.fr",
)
assert response.status_code == 200
user_ids = [user["id"] for user in response.json()]
assert user_ids == [str(user1.id)]
# Different email returns different user
response = client.get("/api/v1.0/users/?q=alicia.johnnson@example.gouv.fr")
assert response.status_code == 200
user_ids = [user["id"] for user in response.json()]
assert user_ids == [str(user4.id)]
def test_api_users_retrieve_me_anonymous():
"""Anonymous users should not be allowed to list users."""
factories.UserFactory.create_batch(2)
client = APIClient()
response = client.get("/api/v1.0/users/me/")
assert response.status_code == 401
assert response.json() == {
"errors": [
{
"attr": None,
"code": "not_authenticated",
"detail": "Authentication credentials were not provided.",
},
],
"type": "client_error",
}
def test_api_users_retrieve_me_authenticated():
"""Authenticated users should be able to retrieve their own user via the "/users/me" path."""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
factories.UserFactory.create_batch(2)
response = client.get(
"/api/v1.0/users/me/",
)
assert response.status_code == 200
assert response.json() == {
"id": str(user.id),
"email": user.email,
"full_name": user.full_name,
"short_name": user.short_name,
"language": user.language,
}
def test_api_users_retrieve_anonymous():
"""Anonymous users should not be allowed to retrieve a user."""
client = APIClient()
user = factories.UserFactory()
response = client.get(f"/api/v1.0/users/{user.id!s}/")
assert response.status_code == 401
assert response.json() == {
"errors": [
{
"attr": None,
"code": "not_authenticated",
"detail": "Authentication credentials were not provided.",
},
],
"type": "client_error",
}
def test_api_users_retrieve_authenticated_self():
"""
Authenticated users should be allowed to retrieve their own user.
The returned object should not contain the password.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
response = client.get(
f"/api/v1.0/users/{user.id!s}/",
)
assert response.status_code == 405
assert response.json() == {
"errors": [
{
"attr": None,
"code": "method_not_allowed",
"detail": 'Method "GET" not allowed.',
},
],
"type": "client_error",
}
def test_api_users_retrieve_authenticated_other():
"""
Authenticated users should be able to retrieve another user's detail view with
limited information.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
other_user = factories.UserFactory()
response = client.get(
f"/api/v1.0/users/{other_user.id!s}/",
)
assert response.status_code == 405
assert response.json() == {
"errors": [
{
"attr": None,
"code": "method_not_allowed",
"detail": 'Method "GET" not allowed.',
},
],
"type": "client_error",
}
def test_api_users_create_anonymous():
"""Anonymous users should not be able to create users via the API."""
response = APIClient().post(
"/api/v1.0/users/",
{
"language": "fr-fr",
"password": "mypassword",
},
)
assert response.status_code == 401
assert response.json() == {
"errors": [
{
"attr": None,
"code": "not_authenticated",
"detail": "Authentication credentials were not provided.",
},
],
"type": "client_error",
}
assert models.User.objects.exists() is False
def test_api_users_create_authenticated():
"""Authenticated users should not be able to create users via the API."""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
response = client.post(
"/api/v1.0/users/",
{
"language": "fr-fr",
"password": "mypassword",
},
format="json",
)
assert response.status_code == 405
assert response.json() == {
"errors": [
{
"attr": None,
"code": "method_not_allowed",
"detail": 'Method "POST" not allowed.',
},
],
"type": "client_error",
}
assert models.User.objects.exclude(id=user.id).exists() is False
def test_api_users_update_anonymous():
"""Anonymous users should not be able to update users via the API."""
user = factories.UserFactory()
old_user_values = dict(serializers.UserSerializer(instance=user).data)
new_user_values = serializers.UserSerializer(instance=factories.UserFactory()).data
response = APIClient().put(
f"/api/v1.0/users/{user.id!s}/",
new_user_values,
format="json",
)
assert response.status_code == 401
assert response.json() == {
"errors": [
{
"attr": None,
"code": "not_authenticated",
"detail": "Authentication credentials were not provided.",
},
],
"type": "client_error",
}
user.refresh_from_db()
user_values = dict(serializers.UserSerializer(instance=user).data)
for key, value in user_values.items():
assert value == old_user_values[key]
def test_api_users_update_authenticated_self():
"""
Authenticated users should be able to update their own user but only "language"
and "timezone" fields.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
old_user_values = dict(serializers.UserSerializer(instance=user).data)
new_user_values = dict(
serializers.UserSerializer(instance=factories.UserFactory()).data
)
response = client.put(
f"/api/v1.0/users/{user.id!s}/",
new_user_values,
format="json",
)
assert response.status_code == 200
user.refresh_from_db()
user_values = dict(serializers.UserSerializer(instance=user).data)
for key, value in user_values.items():
if key in ["language", "timezone"]:
assert value == new_user_values[key]
else:
assert value == old_user_values[key]
def test_api_users_update_authenticated_other():
"""Authenticated users should not be allowed to update other users."""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
user = factories.UserFactory()
old_user_values = dict(serializers.UserSerializer(instance=user).data)
new_user_values = serializers.UserSerializer(instance=factories.UserFactory()).data
response = client.put(
f"/api/v1.0/users/{user.id!s}/",
new_user_values,
format="json",
)
assert response.status_code == 403
user.refresh_from_db()
user_values = dict(serializers.UserSerializer(instance=user).data)
for key, value in user_values.items():
assert value == old_user_values[key]
def test_api_users_patch_anonymous():
"""Anonymous users should not be able to patch users via the API."""
user = factories.UserFactory()
old_user_values = dict(serializers.UserSerializer(instance=user).data)
new_user_values = dict(
serializers.UserSerializer(instance=factories.UserFactory()).data
)
for key, new_value in new_user_values.items():
response = APIClient().patch(
f"/api/v1.0/users/{user.id!s}/",
{key: new_value},
format="json",
)
assert response.status_code == 401
assert response.json() == {
"errors": [
{
"attr": None,
"code": "not_authenticated",
"detail": "Authentication credentials were not provided.",
},
],
"type": "client_error",
}
user.refresh_from_db()
user_values = dict(serializers.UserSerializer(instance=user).data)
for key, value in user_values.items():
assert value == old_user_values[key]
def test_api_users_patch_authenticated_self():
"""
Authenticated users should be able to patch their own user but only "language"
and "timezone" fields.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
old_user_values = dict(serializers.UserSerializer(instance=user).data)
new_user_values = dict(
serializers.UserSerializer(instance=factories.UserFactory()).data
)
for key, new_value in new_user_values.items():
response = client.patch(
f"/api/v1.0/users/{user.id!s}/",
{key: new_value},
format="json",
)
assert response.status_code == 200
user.refresh_from_db()
user_values = dict(serializers.UserSerializer(instance=user).data)
for key, value in user_values.items():
if key in ["language", "timezone"]:
assert value == new_user_values[key]
else:
assert value == old_user_values[key]
def test_api_users_patch_authenticated_other():
"""Authenticated users should not be allowed to patch other users."""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
user = factories.UserFactory()
old_user_values = dict(serializers.UserSerializer(instance=user).data)
new_user_values = dict(
serializers.UserSerializer(instance=factories.UserFactory()).data
)
for key, new_value in new_user_values.items():
response = client.put(
f"/api/v1.0/users/{user.id!s}/",
{key: new_value},
format="json",
)
assert response.status_code == 403
user.refresh_from_db()
user_values = dict(serializers.UserSerializer(instance=user).data)
for key, value in user_values.items():
assert value == old_user_values[key]
def test_api_users_delete_list_anonymous():
"""Anonymous users should not be allowed to delete a list of users."""
factories.UserFactory.create_batch(2)
client = APIClient()
response = client.delete("/api/v1.0/users/")
assert response.status_code == 401
assert models.User.objects.count() == 2
def test_api_users_delete_list_authenticated():
"""Authenticated users should not be allowed to delete a list of users."""
factories.UserFactory.create_batch(2)
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
response = client.delete(
"/api/v1.0/users/",
)
assert response.status_code == 405
assert models.User.objects.count() == 3
def test_api_users_delete_anonymous():
"""Anonymous users should not be allowed to delete a user."""
user = factories.UserFactory()
response = APIClient().delete(f"/api/v1.0/users/{user.id!s}/")
assert response.status_code == 401
assert models.User.objects.count() == 1
def test_api_users_delete_authenticated():
"""
Authenticated users should not be allowed to delete a user other than themselves.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
other_user = factories.UserFactory()
response = client.delete(
f"/api/v1.0/users/{other_user.id!s}/",
)
assert response.status_code == 405
assert models.User.objects.count() == 2
def test_api_users_delete_self():
"""Authenticated users should not be able to delete their own user."""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
response = client.delete(
f"/api/v1.0/users/{user.id!s}/",
)
assert response.status_code == 405
assert models.User.objects.count() == 1

View File

@@ -0,0 +1,71 @@
"""Tests for CalDAV service integration with DAViCal."""
from unittest.mock import Mock, patch
from django.conf import settings
import pytest
from core import factories
from core.services.caldav_service import CalendarService, DAViCalClient
@pytest.mark.django_db
class TestDAViCalClient:
"""Tests for DAViCalClient authentication and communication."""
def test_get_client_sends_x_forwarded_user_header(self):
"""Test that DAVClient is configured with X-Forwarded-User header."""
user = factories.UserFactory(email="test@example.com")
client = DAViCalClient()
dav_client = client._get_client(user)
# Verify the client is configured correctly
assert dav_client.username == user.email
# Password should be empty (None or empty string) for external auth
assert not dav_client.password or dav_client.password == ""
# Verify the X-Forwarded-User header is set
# The caldav library stores headers as a CaseInsensitiveDict
assert hasattr(dav_client, "headers")
assert "X-Forwarded-User" in dav_client.headers
assert dav_client.headers["X-Forwarded-User"] == user.email
@pytest.mark.skipif(
not getattr(settings, "DAVICAL_URL", None),
reason="DAViCal URL not configured",
)
def test_create_calendar_authenticates_with_davical(self):
"""Test that calendar creation authenticates successfully with DAViCal."""
user = factories.UserFactory(email="test@example.com")
client = DAViCalClient()
# Ensure user exists in DAViCal
client.ensure_user_exists(user)
# Try to create a calendar - this should authenticate successfully
calendar_path = client.create_calendar(
user, calendar_name="Test Calendar", calendar_id="test-calendar-id"
)
# Verify calendar path was returned
assert calendar_path is not None
assert calendar_path.startswith("/caldav.php/")
assert user.email in calendar_path
def test_calendar_service_creates_calendar(self):
"""Test that CalendarService can create a calendar through DAViCal."""
user = factories.UserFactory(email="test@example.com")
service = CalendarService()
# Create a calendar
calendar = service.create_calendar(user, name="My Calendar", color="#ff0000")
# Verify calendar was created
assert calendar is not None
assert calendar.owner == user
assert calendar.name == "My Calendar"
assert calendar.color == "#ff0000"
assert calendar.davical_path is not None
assert calendar.davical_path.startswith("/caldav.php/")

View File

@@ -0,0 +1,46 @@
"""
Unit tests for the User model
"""
from unittest import mock
from django.core.exceptions import ValidationError
import pytest
from core import factories, models
pytestmark = pytest.mark.django_db
def test_models_users_str():
"""The str representation should be the email."""
user = factories.UserFactory()
assert str(user) == user.email
def test_models_users_id_unique():
"""The "id" field should be unique."""
user = factories.UserFactory()
with pytest.raises(ValidationError, match="User with this Id already exists."):
factories.UserFactory(id=user.id)
def test_models_users_send_mail_main_existing():
"""The "email_user' method should send mail to the user's email address."""
user = factories.UserFactory()
with mock.patch("django.core.mail.send_mail") as mock_send:
user.email_user("my subject", "my message")
mock_send.assert_called_once_with("my subject", "my message", None, [user.email])
def test_models_users_send_mail_main_missing():
"""The "email_user' method should fail if the user has no email address."""
user = factories.UserFactory(email=None)
with pytest.raises(ValueError) as excinfo:
user.email_user("my subject", "my message")
assert str(excinfo.value) == "User has no email address."

View File

@@ -0,0 +1,30 @@
"""
Unit tests for the User model
"""
import pytest
from calendars.settings import Base
def test_invalid_settings_oidc_email_configuration():
"""
The OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION and OIDC_ALLOW_DUPLICATE_EMAILS settings
should not be both set to True simultaneously.
"""
class TestSettings(Base):
"""Fake test settings."""
OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION = True
OIDC_ALLOW_DUPLICATE_EMAILS = True
# The validation is performed during post_setup
with pytest.raises(ValueError) as excinfo:
TestSettings().post_setup()
# Check the exception message
assert str(excinfo.value) == (
"Both OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION and "
"OIDC_ALLOW_DUPLICATE_EMAILS cannot be set to True simultaneously. "
)

View File

@@ -0,0 +1,20 @@
"""Utils for testing URLs."""
import importlib
from django.urls import clear_url_caches
def reload_urls():
"""
Reload the URLs. Since the url are loaded based on a
settings value, we need to reload the urls to make the
URL settings based condition effective.
"""
import core.urls # pylint:disable=import-outside-toplevel # noqa: PLC0415
import calendars.urls # pylint:disable=import-outside-toplevel # noqa: PLC0415
importlib.reload(core.urls)
importlib.reload(calendars.urls)
clear_url_caches()

61
src/backend/core/urls.py Normal file
View File

@@ -0,0 +1,61 @@
"""URL configuration for the core app."""
from django.conf import settings
from django.urls import include, path, re_path
from lasuite.oidc_login.urls import urlpatterns as oidc_urls
from rest_framework.routers import DefaultRouter
from core.api import viewsets
from core.api.viewsets_caldav import CalDAVProxyView
from core.external_api import viewsets as external_api_viewsets
# - Main endpoints
router = DefaultRouter()
router.register("users", viewsets.UserViewSet, basename="users")
router.register("calendars", viewsets.CalendarViewSet, basename="calendars")
urlpatterns = [
path(
f"api/{settings.API_VERSION}/",
include(
[
*router.urls,
*oidc_urls,
# CalDAV proxy - root path (must come before catch-all to match /caldav exactly)
path("caldav", CalDAVProxyView.as_view(), name="caldav-root"),
path("caldav/", CalDAVProxyView.as_view(), name="caldav-root-slash"),
# CalDAV proxy - catch all paths with content
re_path(
r"^caldav/(?P<path>.+)$",
CalDAVProxyView.as_view(),
name="caldav-proxy",
),
]
),
),
path(f"api/{settings.API_VERSION}/config/", viewsets.ConfigView.as_view()),
]
if settings.OIDC_RESOURCE_SERVER_ENABLED:
# - Resource server routes
external_api_router = DefaultRouter()
users_access_config = settings.EXTERNAL_API.get("users", {})
if users_access_config.get("enabled", False):
external_api_router.register(
"users",
external_api_viewsets.ResourceServerUserViewSet,
basename="resource_server_users",
)
external_api_urls = [*external_api_router.urls]
if external_api_urls:
urlpatterns.append(
path(
f"external_api/{settings.API_VERSION}/",
include(external_api_urls),
)
)