diff --git a/.github/workflows/calendars.yml b/.github/workflows/calendars.yml
index 11c9b23..1ab02ef 100644
--- a/.github/workflows/calendars.yml
+++ b/.github/workflows/calendars.yml
@@ -9,32 +9,6 @@ on:
- "*"
jobs:
- check-changelog:
- runs-on: ubuntu-latest
- if: |
- contains(github.event.pull_request.labels.*.name, 'noChangeLog') == false &&
- github.event_name == 'pull_request'
- steps:
- - name: Checkout repository
- uses: actions/checkout@v6
- with:
- fetch-depth: 50
- - name: Check that the CHANGELOG has been modified in the current branch
- run: git diff --name-only ${{ github.event.pull_request.base.sha }} ${{ github.event.after }} | grep 'CHANGELOG.md'
-
- lint-changelog:
- runs-on: ubuntu-latest
- steps:
- - name: Checkout repository
- uses: actions/checkout@v6
- - name: Check CHANGELOG max line length
- run: |
- max_line_length=$(cat CHANGELOG.md | grep -Ev "^\[.*\]: https://github.com" | wc -L)
- if [ $max_line_length -ge 80 ]; then
- echo "ERROR: CHANGELOG has lines longer than 80 characters."
- exit 1
- fi
-
lint-back:
runs-on: ubuntu-latest
defaults:
diff --git a/docker/sabredav/init-database.sh b/docker/sabredav/init-database.sh
old mode 100644
new mode 100755
diff --git a/docs/entitlements.md b/docs/entitlements.md
new file mode 100644
index 0000000..1a16500
--- /dev/null
+++ b/docs/entitlements.md
@@ -0,0 +1,246 @@
+# Entitlements System
+
+The entitlements system provides a pluggable backend architecture for
+checking whether a user is allowed to access the application. It
+integrates with the DeployCenter API in production and uses a local
+backend for development.
+
+Unlike La Suite Messages, Calendars only checks `can_access` — there
+is no admin permission sync.
+
+## Architecture
+
+```
+┌─────────────────────────────────────────────┐
+│ OIDC Authentication Backend │
+│ post_get_or_create_user() — warms cache │
+└──────────────┬──────────────────────────────┘
+ │
+┌──────────────▼──────────────────────────────┐
+│ UserMeSerializer │
+│ GET /users/me/ → { can_access: bool } │
+└──────────────┬──────────────────────────────┘
+ │
+┌──────────────▼──────────────────────────────┐
+│ Service Layer │
+│ get_user_entitlements() │
+└──────────────┬──────────────────────────────┘
+ │
+┌──────────────▼──────────────────────────────┐
+│ Backend Factory (singleton) │
+│ get_entitlements_backend() │
+└──────────────┬──────────────────────────────┘
+ │
+ ┌───────┴───────┐
+ │ │
+┌──────▼─────┐ ┌──────▼───────────────┐
+│ Local │ │ DeployCenter │
+│ Backend │ │ Backend │
+│ (dev/test) │ │ (production, cached) │
+└────────────┘ └──────────────────────┘
+```
+
+### Components
+
+- **Service layer** (`core/entitlements/__init__.py`): Public
+ `get_user_entitlements()` function and
+ `EntitlementsUnavailableError` exception.
+- **Backend factory** (`core/entitlements/factory.py`):
+ `@functools.cache` singleton that imports and instantiates the
+ configured backend class.
+- **Abstract base** (`core/entitlements/backends/base.py`): Defines
+ the `EntitlementsBackend` interface.
+- **Local backend** (`core/entitlements/backends/local.py`): Always
+ grants access. Used for local development.
+- **DeployCenter backend**
+ (`core/entitlements/backends/deploycenter.py`): Calls the
+ DeployCenter API with Django cache and stale fallback.
+
+### Integration points
+
+1. **OIDC login** (`core/authentication/backends.py`):
+ `post_get_or_create_user()` calls `get_user_entitlements()` with
+ `force_refresh=True` to warm the cache. Login always succeeds
+ regardless of `can_access` value — access is gated at API level
+ and in the frontend.
+2. **User API** (`core/api/serializers.py`): `UserMeSerializer`
+ exposes `can_access` as a field on `GET /users/me/`. Fail-open:
+ returns `True` when entitlements are unavailable.
+3. **Default calendar creation** (`core/signals.py`):
+ `provision_default_calendar` checks entitlements before creating a
+ calendar for a new user. Fail-closed: skips creation when
+ entitlements are unavailable.
+4. **CalDAV proxy** (`core/api/viewsets_caldav.py`): Blocks
+ `MKCALENDAR` and `MKCOL` methods for non-entitled users.
+ Other methods (PROPFIND, REPORT, GET, PUT, DELETE) are allowed
+ so that users invited to shared calendars can still use them.
+ Fail-closed: denies creation when entitlements are unavailable.
+5. **Import events** (`core/api/viewsets.py`): Blocks
+ `POST /calendars/import-events/` for non-entitled users.
+ Fail-closed: denies import when entitlements are unavailable.
+6. **Frontend** (`pages/index.tsx`, `pages/calendar.tsx`): Checks
+ `user.can_access` and redirects to `/no-access` when `false`.
+ Calendar creation uses MKCALENDAR via CalDAV proxy (no Django
+ endpoint).
+
+### Error handling
+
+- **Login is fail-open**: if the entitlements service is unavailable,
+ login succeeds and the cache warming is skipped.
+- **User API is fail-open**: if the entitlements service is
+ unavailable, `can_access` defaults to `True`.
+- **Calendar creation is fail-closed**: if the entitlements service
+ is unavailable, the default calendar is not created (avoids
+ provisioning resources for users who may not be entitled).
+- **CalDAV proxy MKCALENDAR/MKCOL is fail-closed**: if the
+ entitlements service is unavailable, calendar creation via CalDAV
+ is denied (returns 403).
+- **Import events is fail-closed**: if the entitlements service is
+ unavailable, ICS import is denied (returns 403).
+- The DeployCenter backend falls back to stale cached data when the
+ API is unavailable.
+- `EntitlementsUnavailableError` is only raised when the API fails
+ **and** no cache exists.
+
+## Configuration
+
+### Environment variables
+
+| Variable | Default | Description |
+|---|---|---|
+| `ENTITLEMENTS_BACKEND` | `core.entitlements.backends.local.LocalEntitlementsBackend` | Python import path of the backend class |
+| `ENTITLEMENTS_BACKEND_PARAMETERS` | `{}` | JSON object passed to the backend constructor |
+| `ENTITLEMENTS_CACHE_TIMEOUT` | `300` | Cache TTL in seconds |
+
+### DeployCenter backend parameters
+
+When using
+`core.entitlements.backends.deploycenter.DeployCenterEntitlementsBackend`,
+provide these in `ENTITLEMENTS_BACKEND_PARAMETERS`:
+
+```json
+{
+ "base_url": "https://deploycenter.example.com/api/v1.0/entitlements/",
+ "service_id": "calendar",
+ "api_key": "your-api-key",
+ "timeout": 10,
+ "oidc_claims": ["siret"]
+}
+```
+
+| Parameter | Required | Description |
+|---|---|---|
+| `base_url` | Yes | Full URL of the DeployCenter entitlements endpoint |
+| `service_id` | Yes | Service identifier in DeployCenter |
+| `api_key` | Yes | API key for `X-Service-Auth: Bearer` header |
+| `timeout` | No | HTTP timeout in seconds (default: 10) |
+| `oidc_claims` | No | OIDC claim names to forward as query params |
+
+### Example production configuration
+
+```bash
+ENTITLEMENTS_BACKEND=core.entitlements.backends.deploycenter.DeployCenterEntitlementsBackend
+ENTITLEMENTS_BACKEND_PARAMETERS='{"base_url":"https://deploycenter.example.com/api/v1.0/entitlements/","service_id":"calendar","api_key":"secret","timeout":10,"oidc_claims":["siret"]}'
+ENTITLEMENTS_CACHE_TIMEOUT=300
+```
+
+## Backend interface
+
+Custom backends must extend `EntitlementsBackend` and implement:
+
+```python
+class MyBackend(EntitlementsBackend):
+ def __init__(self, **kwargs):
+ # Receives ENTITLEMENTS_BACKEND_PARAMETERS as kwargs
+ pass
+
+ def get_user_entitlements(
+ self, user_sub, user_email, user_info=None, force_refresh=False
+ ):
+ # Return: {"can_access": bool}
+ # Raise EntitlementsUnavailableError on failure.
+ pass
+```
+
+## DeployCenter API
+
+The DeployCenter backend calls:
+
+```
+GET {base_url}?service_id=X&account_type=user&account_email=X
+```
+
+Headers: `X-Service-Auth: Bearer {api_key}`
+
+Query parameters include any configured `oidc_claims` extracted from
+the OIDC user_info response (e.g. `siret`).
+
+Expected response: `{"entitlements": {"can_access": true}}`
+
+## Access control flow
+
+The entitlements check follows a two-step approach: the backend
+exposes entitlements data, and the frontend gates access.
+
+### On login
+
+1. User authenticates via OIDC — login always succeeds
+2. `post_get_or_create_user` calls `get_user_entitlements()` with
+ `force_refresh=True` to warm the cache
+3. If entitlements are unavailable, a warning is logged but login
+ proceeds
+
+### On page load
+
+1. Frontend calls `GET /users/me/` which includes `can_access`
+2. If `can_access` is `false`, the frontend redirects to `/no-access`
+3. The user remains authenticated — they see the header, logo, and
+ their profile, but cannot use the app
+4. The `/no-access` page offers a logout button and a message to
+ contact support
+
+This approach ensures users always have a session (important for
+shared calendars and other interactions) while still gating access to
+the main application.
+
+### Caching behavior
+
+- The DeployCenter backend caches results in Django's cache framework
+ (key: `entitlements:user:{user_sub}`, TTL:
+ `ENTITLEMENTS_CACHE_TIMEOUT`).
+- On login, `force_refresh=True` bypasses the cache for fresh data.
+- If the API fails during a forced refresh, stale cached data is
+ returned as fallback.
+- Subsequent `GET /users/me/` calls use the cached value (no
+ `force_refresh`).
+
+## Frontend
+
+Users denied access see `/no-access` — a page using the main layout
+(header with logo and user profile visible) with:
+
+- A message explaining the app is not available for their account
+- A suggestion to contact support
+- A logout button
+
+The user is fully authenticated and can see their profile in the
+header, but cannot access calendars or events.
+
+## Key files
+
+| Area | Path |
+|------|------|
+| Service layer | `src/backend/core/entitlements/__init__.py` |
+| Backend factory | `src/backend/core/entitlements/factory.py` |
+| Abstract base | `src/backend/core/entitlements/backends/base.py` |
+| Local backend | `src/backend/core/entitlements/backends/local.py` |
+| DeployCenter backend | `src/backend/core/entitlements/backends/deploycenter.py` |
+| Auth integration | `src/backend/core/authentication/backends.py` |
+| User API serializer | `src/backend/core/api/serializers.py` |
+| Calendar gating (signal) | `src/backend/core/signals.py` |
+| CalDAV proxy gating | `src/backend/core/api/viewsets_caldav.py` |
+| Import events gating | `src/backend/core/api/viewsets.py` |
+| No-access page | `src/frontend/apps/calendars/src/pages/no-access.tsx` |
+| Homepage gate | `src/frontend/apps/calendars/src/pages/index.tsx` |
+| Calendar gate | `src/frontend/apps/calendars/src/pages/calendar.tsx` |
+| Tests | `src/backend/core/tests/test_entitlements.py` |
diff --git a/renovate.json b/renovate.json
deleted file mode 100644
index 7aa9e0f..0000000
--- a/renovate.json
+++ /dev/null
@@ -1,25 +0,0 @@
-{
- "extends": ["github>suitenumerique/renovate-configuration"],
- "dependencyDashboard": true,
- "labels": ["dependencies", "noChangeLog"],
- "packageRules": [
- {
- "groupName": "allowed redis versions",
- "matchManagers": ["pep621"],
- "matchPackageNames": ["redis"],
- "allowedVersions": "<6.0.0"
- },
- {
- "groupName": "allowed pylint versions",
- "matchManagers": ["pep621"],
- "matchPackageNames": ["pylint"],
- "allowedVersions": "<4.0.0"
- },
- {
- "description": "Disable requires-python updates - managed manually",
- "matchManagers": ["pep621"],
- "matchPackageNames": ["python"],
- "enabled": false
- }
- ]
-}
\ No newline at end of file
diff --git a/src/backend/calendars/settings.py b/src/backend/calendars/settings.py
index 2989718..0eebf70 100755
--- a/src/backend/calendars/settings.py
+++ b/src/backend/calendars/settings.py
@@ -133,6 +133,23 @@ class Base(Configuration):
environ_prefix=None,
)
+ # Entitlements
+ ENTITLEMENTS_BACKEND = values.Value(
+ "core.entitlements.backends.local.LocalEntitlementsBackend",
+ environ_name="ENTITLEMENTS_BACKEND",
+ environ_prefix=None,
+ )
+ ENTITLEMENTS_BACKEND_PARAMETERS = values.DictValue(
+ {},
+ environ_name="ENTITLEMENTS_BACKEND_PARAMETERS",
+ environ_prefix=None,
+ )
+ ENTITLEMENTS_CACHE_TIMEOUT = values.IntegerValue(
+ 300,
+ environ_name="ENTITLEMENTS_CACHE_TIMEOUT",
+ environ_prefix=None,
+ )
+
# Security
ALLOWED_HOSTS = values.ListValue([])
SECRET_KEY = SecretFileValue(None)
diff --git a/src/backend/core/api/permissions.py b/src/backend/core/api/permissions.py
index f94a19d..de57cda 100644
--- a/src/backend/core/api/permissions.py
+++ b/src/backend/core/api/permissions.py
@@ -1,9 +1,15 @@
"""Permission handlers for the calendars core app."""
+import logging
+
from django.core import exceptions
from rest_framework import permissions
+from core.entitlements import EntitlementsUnavailableError, get_user_entitlements
+
+logger = logging.getLogger(__name__)
+
ACTION_FOR_METHOD_TO_PERMISSION = {
"versions_detail": {"DELETE": "versions_destroy", "GET": "versions_retrieve"},
"children": {"GET": "children_list", "POST": "children_create"},
@@ -60,6 +66,23 @@ class IsOwnedOrPublic(IsAuthenticated):
return False
+class IsEntitled(IsAuthenticated):
+ """Allows access only to users with can_access entitlement.
+
+ Fail-closed: denies access when the entitlements service is
+ unavailable and no cached value exists.
+ """
+
+ def has_permission(self, request, view):
+ if not super().has_permission(request, view):
+ return False
+ try:
+ entitlements = get_user_entitlements(request.user.sub, request.user.email)
+ return entitlements.get("can_access", True)
+ except EntitlementsUnavailableError:
+ return False
+
+
class AccessPermission(permissions.BasePermission):
"""Permission class for access objects."""
diff --git a/src/backend/core/api/serializers.py b/src/backend/core/api/serializers.py
index 0441a3b..8575d00 100644
--- a/src/backend/core/api/serializers.py
+++ b/src/backend/core/api/serializers.py
@@ -7,6 +7,7 @@ from django.utils.translation import gettext_lazy as _
from rest_framework import exceptions, serializers
from core import models
+from core.entitlements import EntitlementsUnavailableError, get_user_entitlements
class UserLiteSerializer(serializers.ModelSerializer):
@@ -108,18 +109,20 @@ class UserSerializer(serializers.ModelSerializer):
class UserMeSerializer(UserSerializer):
"""Serialize users for me endpoint."""
+ can_access = serializers.SerializerMethodField(read_only=True)
+
class Meta:
model = models.User
- fields = UserSerializer.Meta.fields
- read_only_fields = UserSerializer.Meta.read_only_fields
+ fields = [*UserSerializer.Meta.fields, "can_access"]
+ read_only_fields = [*UserSerializer.Meta.read_only_fields, "can_access"]
-
-class CalendarCreateSerializer(serializers.Serializer): # pylint: disable=abstract-method
- """Serializer for creating a Calendar (CalDAV only, no Django model)."""
-
- name = serializers.CharField(max_length=255)
- color = serializers.CharField(max_length=7, required=False, default="")
- description = serializers.CharField(required=False, default="")
+ def get_can_access(self, user) -> bool:
+ """Check entitlements for the current user."""
+ try:
+ entitlements = get_user_entitlements(user.sub, user.email)
+ return entitlements.get("can_access", True)
+ except EntitlementsUnavailableError:
+ return True # fail-open
class CalendarSubscriptionTokenSerializer(serializers.ModelSerializer):
diff --git a/src/backend/core/api/viewsets.py b/src/backend/core/api/viewsets.py
index dd23790..a62d679 100644
--- a/src/backend/core/api/viewsets.py
+++ b/src/backend/core/api/viewsets.py
@@ -18,7 +18,6 @@ from rest_framework.throttling import UserRateThrottle
from core import models
from core.services.caldav_service import (
- CalendarService,
normalize_caldav_path,
verify_caldav_access,
)
@@ -266,34 +265,15 @@ class ConfigView(drf.views.APIView):
class CalendarViewSet(viewsets.GenericViewSet):
"""ViewSet for calendar operations.
- create: Create a new calendar (CalDAV only, no Django record).
import_events: Import events from an ICS file.
"""
permission_classes = [IsAuthenticated]
- serializer_class = serializers.CalendarCreateSerializer
- def create(self, request):
- """Create a new calendar via CalDAV.
-
- POST /api/v1.0/calendars/
- Body: { name, color?, description? }
- Returns: { caldav_path }
- """
- serializer = serializers.CalendarCreateSerializer(data=request.data)
- serializer.is_valid(raise_exception=True)
-
- service = CalendarService()
- caldav_path = service.create_calendar(
- user=request.user,
- name=serializer.validated_data["name"],
- color=serializer.validated_data.get("color", ""),
- )
-
- return drf_response.Response(
- {"caldav_path": caldav_path},
- status=status.HTTP_201_CREATED,
- )
+ def get_permissions(self):
+ if self.action == "import_events":
+ return [permissions.IsEntitled()]
+ return super().get_permissions()
@action(
detail=False,
diff --git a/src/backend/core/api/viewsets_caldav.py b/src/backend/core/api/viewsets_caldav.py
index 0549f0d..4e7bc81 100644
--- a/src/backend/core/api/viewsets_caldav.py
+++ b/src/backend/core/api/viewsets_caldav.py
@@ -12,6 +12,7 @@ from django.views.decorators.csrf import csrf_exempt
import requests
+from core.entitlements import EntitlementsUnavailableError, get_user_entitlements
from core.services.caldav_service import CalDAVHTTPClient, validate_caldav_proxy_path
from core.services.calendar_invitation_service import calendar_invitation_service
@@ -29,7 +30,28 @@ class CalDAVProxyView(View):
Authentication is handled via session cookies instead.
"""
- def dispatch(self, request, *args, **kwargs): # noqa: PLR0912 # pylint: disable=too-many-branches
+ @staticmethod
+ def _check_entitlements_for_creation(user):
+ """Check if user is entitled to create calendars.
+
+ Returns None if allowed, or an HttpResponse(403) if denied.
+ Fail-closed: denies if the entitlements service is unavailable.
+ """
+ try:
+ entitlements = get_user_entitlements(user.sub, user.email)
+ if not entitlements.get("can_access", True):
+ return HttpResponse(
+ status=403,
+ content="Calendar creation not allowed",
+ )
+ except EntitlementsUnavailableError:
+ return HttpResponse(
+ status=403,
+ content="Calendar creation not allowed",
+ )
+ return None
+
+ def dispatch(self, request, *args, **kwargs): # noqa: PLR0912, PLR0911, PLR0915 # pylint: disable=too-many-branches,too-many-return-statements,too-many-statements
"""Forward all HTTP methods to CalDAV server."""
# Handle CORS preflight requests
if request.method == "OPTIONS":
@@ -45,6 +67,13 @@ class CalDAVProxyView(View):
if not request.user.is_authenticated:
return HttpResponse(status=401)
+ # Block calendar creation (MKCALENDAR/MKCOL) for non-entitled users.
+ # Other methods (GET, PROPFIND, REPORT, PUT, DELETE, etc.) are allowed
+ # so that users invited to shared calendars can still use them.
+ if request.method in ("MKCALENDAR", "MKCOL"):
+ if denied := self._check_entitlements_for_creation(request.user):
+ return denied
+
# Build the CalDAV server URL
path = kwargs.get("path", "")
diff --git a/src/backend/core/authentication/backends.py b/src/backend/core/authentication/backends.py
index a8715de..1faa27c 100644
--- a/src/backend/core/authentication/backends.py
+++ b/src/backend/core/authentication/backends.py
@@ -4,12 +4,12 @@ import logging
from django.conf import settings
from django.core.exceptions import SuspiciousOperation
-from django.utils.translation import gettext_lazy as _
from lasuite.oidc_login.backends import (
OIDCAuthenticationBackend as LaSuiteOIDCAuthenticationBackend,
)
+from core.entitlements import EntitlementsUnavailableError, get_user_entitlements
from core.models import DuplicateEmailError
logger = logging.getLogger(__name__)
@@ -51,3 +51,18 @@ class OIDCAuthenticationBackend(LaSuiteOIDCAuthenticationBackend):
return self.UserModel.objects.get_user_by_sub_or_email(sub, email)
except DuplicateEmailError as err:
raise SuspiciousOperation(err.message) from err
+
+ def post_get_or_create_user(self, user, claims, is_new_user):
+ """Warm the entitlements cache on login (force_refresh)."""
+ try:
+ get_user_entitlements(
+ user_sub=user.sub,
+ user_email=user.email,
+ user_info=claims,
+ force_refresh=True,
+ )
+ except EntitlementsUnavailableError:
+ logger.warning(
+ "Entitlements unavailable for %s during login",
+ user.email,
+ )
diff --git a/src/backend/core/entitlements/__init__.py b/src/backend/core/entitlements/__init__.py
new file mode 100644
index 0000000..36cd45f
--- /dev/null
+++ b/src/backend/core/entitlements/__init__.py
@@ -0,0 +1,29 @@
+"""Entitlements service layer."""
+
+from core.entitlements.factory import get_entitlements_backend
+
+
+class EntitlementsUnavailableError(Exception):
+ """Raised when the entitlements backend cannot be reached or returns an error."""
+
+
+def get_user_entitlements(user_sub, user_email, user_info=None, force_refresh=False):
+ """Get user entitlements, delegating to the configured backend.
+
+ Args:
+ user_sub: The user's OIDC subject identifier.
+ user_email: The user's email address.
+ user_info: The full OIDC user_info dict (forwarded to backend).
+ force_refresh: If True, bypass backend cache and fetch fresh data.
+
+ Returns:
+ dict: {"can_access": bool}
+
+ Raises:
+ EntitlementsUnavailableError: If the backend cannot be reached
+ and no cache exists.
+ """
+ backend = get_entitlements_backend()
+ return backend.get_user_entitlements(
+ user_sub, user_email, user_info=user_info, force_refresh=force_refresh
+ )
diff --git a/src/backend/core/entitlements/backends/__init__.py b/src/backend/core/entitlements/backends/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/backend/core/entitlements/backends/base.py b/src/backend/core/entitlements/backends/base.py
new file mode 100644
index 0000000..fbeea6a
--- /dev/null
+++ b/src/backend/core/entitlements/backends/base.py
@@ -0,0 +1,27 @@
+"""Abstract base class for entitlements backends."""
+
+from abc import ABC, abstractmethod
+
+
+class EntitlementsBackend(ABC):
+ """Abstract base class that defines the interface for entitlements backends."""
+
+ @abstractmethod
+ def get_user_entitlements(
+ self, user_sub, user_email, user_info=None, force_refresh=False
+ ):
+ """Fetch user entitlements.
+
+ Args:
+ user_sub: The user's OIDC subject identifier.
+ user_email: The user's email address.
+ user_info: The full OIDC user_info dict (backends may
+ extract claims from it).
+ force_refresh: If True, bypass any cache and fetch fresh data.
+
+ Returns:
+ dict: {"can_access": bool}
+
+ Raises:
+ EntitlementsUnavailableError: If the backend cannot be reached.
+ """
diff --git a/src/backend/core/entitlements/backends/deploycenter.py b/src/backend/core/entitlements/backends/deploycenter.py
new file mode 100644
index 0000000..43f5720
--- /dev/null
+++ b/src/backend/core/entitlements/backends/deploycenter.py
@@ -0,0 +1,120 @@
+"""DeployCenter (Espace Operateur) entitlements backend."""
+
+import logging
+
+from django.conf import settings
+from django.core.cache import cache
+
+import requests
+
+from core.entitlements import EntitlementsUnavailableError
+from core.entitlements.backends.base import EntitlementsBackend
+
+logger = logging.getLogger(__name__)
+
+
+class DeployCenterEntitlementsBackend(EntitlementsBackend):
+ """Backend that fetches entitlements from the DeployCenter API.
+
+ Args:
+ base_url: Full URL of the entitlements endpoint
+ (e.g. "https://dc.example.com/api/v1.0/entitlements/").
+ service_id: The service identifier in DeployCenter.
+ api_key: API key for X-Service-Auth header.
+ timeout: HTTP request timeout in seconds.
+ oidc_claims: List of OIDC claim names to extract from user_info
+ and forward as query params (e.g. ["siret"]).
+ """
+
+ def __init__( # pylint: disable=too-many-arguments
+ self,
+ base_url,
+ service_id,
+ api_key,
+ *,
+ timeout=10,
+ oidc_claims=None,
+ ):
+ self.base_url = base_url
+ self.service_id = service_id
+ self.api_key = api_key
+ self.timeout = timeout
+ self.oidc_claims = oidc_claims or []
+
+ def _cache_key(self, user_sub):
+ return f"entitlements:user:{user_sub}"
+
+ def _make_request(self, user_email, user_info=None):
+ """Make a request to the DeployCenter entitlements API.
+
+ Returns:
+ dict | None: The response data, or None on failure.
+ """
+ params = {
+ "service_id": self.service_id,
+ "account_type": "user",
+ "account_email": user_email,
+ }
+
+ # Forward configured OIDC claims as query params
+ if user_info:
+ for claim in self.oidc_claims:
+ if claim in user_info:
+ params[claim] = user_info[claim]
+
+ headers = {
+ "X-Service-Auth": f"Bearer {self.api_key}",
+ }
+
+ try:
+ response = requests.get(
+ self.base_url,
+ params=params,
+ headers=headers,
+ timeout=self.timeout,
+ )
+ response.raise_for_status()
+ return response.json()
+ except (requests.RequestException, ValueError):
+ email_domain = user_email.split("@")[-1] if "@" in user_email else "?"
+ logger.warning(
+ "DeployCenter entitlements request failed for user@%s",
+ email_domain,
+ exc_info=True,
+ )
+ return None
+
+ def get_user_entitlements(
+ self, user_sub, user_email, user_info=None, force_refresh=False
+ ):
+ """Fetch user entitlements from DeployCenter with caching.
+
+ On cache miss or force_refresh: fetches from the API.
+ On API failure: falls back to stale cache if available,
+ otherwise raises EntitlementsUnavailableError.
+ """
+ cache_key = self._cache_key(user_sub)
+
+ if not force_refresh:
+ cached = cache.get(cache_key)
+ if cached is not None:
+ return cached
+
+ data = self._make_request(user_email, user_info=user_info)
+
+ if data is None:
+ # API failed — try stale cache as fallback
+ cached = cache.get(cache_key)
+ if cached is not None:
+ return cached
+ raise EntitlementsUnavailableError(
+ "Failed to fetch user entitlements from DeployCenter"
+ )
+
+ entitlements = data.get("entitlements", {})
+ result = {
+ "can_access": entitlements.get("can_access", False),
+ }
+
+ cache.set(cache_key, result, settings.ENTITLEMENTS_CACHE_TIMEOUT)
+ return result
diff --git a/src/backend/core/entitlements/backends/local.py b/src/backend/core/entitlements/backends/local.py
new file mode 100644
index 0000000..370a600
--- /dev/null
+++ b/src/backend/core/entitlements/backends/local.py
@@ -0,0 +1,12 @@
+"""Local entitlements backend for development and testing."""
+
+from core.entitlements.backends.base import EntitlementsBackend
+
+
+class LocalEntitlementsBackend(EntitlementsBackend):
+ """Local backend that always grants access."""
+
+ def get_user_entitlements(
+ self, user_sub, user_email, user_info=None, force_refresh=False
+ ):
+ return {"can_access": True}
diff --git a/src/backend/core/entitlements/factory.py b/src/backend/core/entitlements/factory.py
new file mode 100644
index 0000000..c5564f7
--- /dev/null
+++ b/src/backend/core/entitlements/factory.py
@@ -0,0 +1,13 @@
+"""Factory for creating entitlements backend instances."""
+
+import functools
+
+from django.conf import settings
+from django.utils.module_loading import import_string
+
+
+@functools.cache
+def get_entitlements_backend():
+ """Return a singleton instance of the configured entitlements backend."""
+ backend_class = import_string(settings.ENTITLEMENTS_BACKEND)
+ return backend_class(**settings.ENTITLEMENTS_BACKEND_PARAMETERS)
diff --git a/src/backend/core/signals.py b/src/backend/core/signals.py
index 02fe84a..e7008cd 100644
--- a/src/backend/core/signals.py
+++ b/src/backend/core/signals.py
@@ -9,6 +9,7 @@ from django.contrib.auth import get_user_model
from django.db.models.signals import post_save
from django.dispatch import receiver
+from core.entitlements import EntitlementsUnavailableError, get_user_entitlements
from core.services.caldav_service import CalendarService
logger = logging.getLogger(__name__)
@@ -27,6 +28,23 @@ def provision_default_calendar(sender, instance, created, **kwargs): # pylint:
if not settings.CALDAV_URL:
return
+ # Check entitlements before creating calendar — fail-closed:
+ # never create a calendar if we can't confirm access.
+ try:
+ entitlements = get_user_entitlements(instance.sub, instance.email)
+ if not entitlements.get("can_access", True):
+ logger.info(
+ "Skipped calendar creation for %s (not entitled)",
+ instance.email,
+ )
+ return
+ except EntitlementsUnavailableError:
+ logger.warning(
+ "Entitlements unavailable for %s, skipping calendar creation",
+ instance.email,
+ )
+ return
+
try:
service = CalendarService()
service.create_default_calendar(instance)
diff --git a/src/backend/core/tests/test_api_users.py b/src/backend/core/tests/test_api_users.py
index 642c663..a837808 100644
--- a/src/backend/core/tests/test_api_users.py
+++ b/src/backend/core/tests/test_api_users.py
@@ -262,6 +262,7 @@ def test_api_users_retrieve_me_authenticated():
"full_name": user.full_name,
"short_name": user.short_name,
"language": user.language,
+ "can_access": True,
}
diff --git a/src/backend/core/tests/test_entitlements.py b/src/backend/core/tests/test_entitlements.py
new file mode 100644
index 0000000..82a6da8
--- /dev/null
+++ b/src/backend/core/tests/test_entitlements.py
@@ -0,0 +1,670 @@
+"""Tests for the entitlements module."""
+
+from unittest import mock
+
+from django.conf import settings
+from django.test import override_settings
+
+import pytest
+import requests
+import responses
+from rest_framework.status import (
+ HTTP_200_OK,
+ HTTP_207_MULTI_STATUS,
+ HTTP_403_FORBIDDEN,
+)
+from rest_framework.test import APIClient
+
+from core import factories
+from core.api.serializers import UserMeSerializer
+from core.authentication.backends import OIDCAuthenticationBackend
+from core.entitlements import EntitlementsUnavailableError, get_user_entitlements
+from core.entitlements.backends.deploycenter import DeployCenterEntitlementsBackend
+from core.entitlements.backends.local import LocalEntitlementsBackend
+from core.entitlements.factory import get_entitlements_backend
+
+# -- LocalEntitlementsBackend --
+
+
+def test_local_backend_always_grants_access():
+ """The local backend should always return can_access=True."""
+ backend = LocalEntitlementsBackend()
+ result = backend.get_user_entitlements("sub-123", "user@example.com")
+ assert result == {"can_access": True}
+
+
+def test_local_backend_ignores_parameters():
+ """The local backend should work regardless of parameters passed."""
+ backend = LocalEntitlementsBackend()
+ result = backend.get_user_entitlements(
+ "sub-123",
+ "user@example.com",
+ user_info={"some": "claim"},
+ force_refresh=True,
+ )
+ assert result == {"can_access": True}
+
+
+# -- Factory --
+
+
+@override_settings(
+ ENTITLEMENTS_BACKEND="core.entitlements.backends.local.LocalEntitlementsBackend",
+ ENTITLEMENTS_BACKEND_PARAMETERS={},
+)
+def test_factory_returns_local_backend():
+ """The factory should instantiate the configured backend."""
+ get_entitlements_backend.cache_clear()
+ backend = get_entitlements_backend()
+ assert isinstance(backend, LocalEntitlementsBackend)
+ get_entitlements_backend.cache_clear()
+
+
+@override_settings(
+ ENTITLEMENTS_BACKEND="core.entitlements.backends.local.LocalEntitlementsBackend",
+ ENTITLEMENTS_BACKEND_PARAMETERS={},
+)
+def test_factory_singleton():
+ """The factory should return the same instance on repeated calls."""
+ get_entitlements_backend.cache_clear()
+ backend1 = get_entitlements_backend()
+ backend2 = get_entitlements_backend()
+ assert backend1 is backend2
+ get_entitlements_backend.cache_clear()
+
+
+# -- get_user_entitlements public API --
+
+
+@override_settings(
+ ENTITLEMENTS_BACKEND="core.entitlements.backends.local.LocalEntitlementsBackend",
+ ENTITLEMENTS_BACKEND_PARAMETERS={},
+)
+def test_get_user_entitlements_with_local_backend():
+ """The public API should delegate to the configured backend."""
+ get_entitlements_backend.cache_clear()
+ result = get_user_entitlements("sub-123", "user@example.com")
+ assert result["can_access"] is True
+ get_entitlements_backend.cache_clear()
+
+
+# -- DeployCenterEntitlementsBackend --
+
+DC_URL = "https://deploy.example.com/api/v1.0/entitlements/"
+
+
+@responses.activate
+def test_deploycenter_backend_grants_access():
+ """DeployCenter backend should return can_access from API response."""
+ responses.add(
+ responses.GET,
+ DC_URL,
+ json={"entitlements": {"can_access": True}},
+ status=200,
+ )
+
+ backend = DeployCenterEntitlementsBackend(
+ base_url=DC_URL,
+ service_id="calendar",
+ api_key="test-key",
+ )
+ result = backend.get_user_entitlements("sub-123", "user@example.com")
+ assert result == {"can_access": True}
+
+ # Verify request was made with correct params and header
+ assert len(responses.calls) == 1
+ request = responses.calls[0].request
+ assert "service_id=calendar" in request.url
+ assert "account_email=user%40example.com" in request.url
+ assert request.headers["X-Service-Auth"] == "Bearer test-key"
+
+
+@responses.activate
+def test_deploycenter_backend_denies_access():
+ """DeployCenter backend should return can_access=False when API says so."""
+ responses.add(
+ responses.GET,
+ DC_URL,
+ json={"entitlements": {"can_access": False}},
+ status=200,
+ )
+
+ backend = DeployCenterEntitlementsBackend(
+ base_url=DC_URL,
+ service_id="calendar",
+ api_key="test-key",
+ )
+ result = backend.get_user_entitlements("sub-123", "user@example.com")
+ assert result == {"can_access": False}
+
+
+@responses.activate
+@override_settings(ENTITLEMENTS_CACHE_TIMEOUT=300)
+def test_deploycenter_backend_uses_cache():
+ """DeployCenter should use cached results when not force_refresh."""
+ responses.add(
+ responses.GET,
+ DC_URL,
+ json={"entitlements": {"can_access": True}},
+ status=200,
+ )
+
+ backend = DeployCenterEntitlementsBackend(
+ base_url=DC_URL,
+ service_id="calendar",
+ api_key="test-key",
+ )
+
+ # First call hits the API
+ result1 = backend.get_user_entitlements("sub-123", "user@example.com")
+ assert result1 == {"can_access": True}
+ assert len(responses.calls) == 1
+
+ # Second call should use cache
+ result2 = backend.get_user_entitlements("sub-123", "user@example.com")
+ assert result2 == {"can_access": True}
+ assert len(responses.calls) == 1 # No additional API call
+
+
+@responses.activate
+@override_settings(ENTITLEMENTS_CACHE_TIMEOUT=300)
+def test_deploycenter_backend_force_refresh_bypasses_cache():
+ """force_refresh=True should bypass cache and hit the API."""
+ responses.add(
+ responses.GET,
+ DC_URL,
+ json={"entitlements": {"can_access": True}},
+ status=200,
+ )
+ responses.add(
+ responses.GET,
+ DC_URL,
+ json={"entitlements": {"can_access": False}},
+ status=200,
+ )
+
+ backend = DeployCenterEntitlementsBackend(
+ base_url=DC_URL,
+ service_id="calendar",
+ api_key="test-key",
+ )
+
+ result1 = backend.get_user_entitlements("sub-123", "user@example.com")
+ assert result1["can_access"] is True
+
+ result2 = backend.get_user_entitlements(
+ "sub-123", "user@example.com", force_refresh=True
+ )
+ assert result2["can_access"] is False
+ assert len(responses.calls) == 2
+
+
+@responses.activate
+@override_settings(ENTITLEMENTS_CACHE_TIMEOUT=300)
+def test_deploycenter_backend_fallback_to_stale_cache():
+ """When API fails, should return stale cached value if available."""
+ responses.add(
+ responses.GET,
+ DC_URL,
+ json={"entitlements": {"can_access": True}},
+ status=200,
+ )
+
+ backend = DeployCenterEntitlementsBackend(
+ base_url=DC_URL,
+ service_id="calendar",
+ api_key="test-key",
+ )
+
+ # Populate cache
+ backend.get_user_entitlements("sub-123", "user@example.com")
+
+ # Now API fails
+ responses.replace(
+ responses.GET,
+ DC_URL,
+ body=requests.ConnectionError("Connection error"),
+ )
+
+ # force_refresh to hit API, but should fall back to cache
+ result = backend.get_user_entitlements(
+ "sub-123", "user@example.com", force_refresh=True
+ )
+ assert result == {"can_access": True}
+
+
+@responses.activate
+def test_deploycenter_backend_raises_when_no_cache():
+ """When API fails and no cache exists, should raise."""
+ responses.add(
+ responses.GET,
+ DC_URL,
+ body=requests.ConnectionError("Connection error"),
+ )
+
+ backend = DeployCenterEntitlementsBackend(
+ base_url=DC_URL,
+ service_id="calendar",
+ api_key="test-key",
+ )
+
+ with pytest.raises(EntitlementsUnavailableError):
+ backend.get_user_entitlements("sub-123", "user@example.com")
+
+
+@responses.activate
+def test_deploycenter_backend_sends_oidc_claims():
+ """DeployCenter should forward configured OIDC claims."""
+ responses.add(
+ responses.GET,
+ DC_URL,
+ json={"entitlements": {"can_access": True}},
+ status=200,
+ )
+
+ backend = DeployCenterEntitlementsBackend(
+ base_url=DC_URL,
+ service_id="calendar",
+ api_key="test-key",
+ oidc_claims=["organization"],
+ )
+
+ backend.get_user_entitlements(
+ "sub-123",
+ "user@example.com",
+ user_info={"organization": "org-42", "other": "ignored"},
+ )
+
+ request = responses.calls[0].request
+ assert "organization=org-42" in request.url
+ assert "other" not in request.url
+
+
+# -- Auth backend integration --
+
+
+pytestmark = pytest.mark.django_db
+
+
+def test_auth_backend_warms_cache_on_login():
+ """post_get_or_create_user should call get_user_entitlements with force_refresh."""
+ user = factories.UserFactory()
+ backend = OIDCAuthenticationBackend()
+
+ with mock.patch(
+ "core.authentication.backends.get_user_entitlements",
+ return_value={"can_access": True},
+ ) as mock_ent:
+ backend.post_get_or_create_user(user, {"sub": "x"}, is_new_user=False)
+ mock_ent.assert_called_once_with(
+ user_sub=user.sub,
+ user_email=user.email,
+ user_info={"sub": "x"},
+ force_refresh=True,
+ )
+
+
+def test_auth_backend_login_succeeds_when_access_denied():
+ """Login should succeed even when can_access is False (gated in frontend)."""
+ user = factories.UserFactory()
+ backend = OIDCAuthenticationBackend()
+
+ with mock.patch(
+ "core.authentication.backends.get_user_entitlements",
+ return_value={"can_access": False},
+ ):
+ # Should not raise — user logs in, frontend gates access
+ backend.post_get_or_create_user(user, {}, is_new_user=False)
+
+
+def test_auth_backend_login_succeeds_when_entitlements_unavailable():
+ """Login should succeed when entitlements service is unavailable."""
+ user = factories.UserFactory()
+ backend = OIDCAuthenticationBackend()
+
+ with mock.patch(
+ "core.authentication.backends.get_user_entitlements",
+ side_effect=EntitlementsUnavailableError("unavailable"),
+ ):
+ # Should not raise
+ backend.post_get_or_create_user(user, {}, is_new_user=False)
+
+
+# -- UserMeSerializer (can_access field) --
+
+
+def test_user_me_serializer_includes_can_access_true():
+ """UserMeSerializer should include can_access=True when entitled."""
+ user = factories.UserFactory()
+ with mock.patch(
+ "core.api.serializers.get_user_entitlements",
+ return_value={"can_access": True},
+ ):
+ data = UserMeSerializer(user).data
+ assert data["can_access"] is True
+
+
+def test_user_me_serializer_includes_can_access_false():
+ """UserMeSerializer should include can_access=False when not entitled."""
+ user = factories.UserFactory()
+ with mock.patch(
+ "core.api.serializers.get_user_entitlements",
+ return_value={"can_access": False},
+ ):
+ data = UserMeSerializer(user).data
+ assert data["can_access"] is False
+
+
+def test_user_me_serializer_can_access_fail_open():
+ """UserMeSerializer should return can_access=True when entitlements unavailable."""
+ user = factories.UserFactory()
+ with mock.patch(
+ "core.api.serializers.get_user_entitlements",
+ side_effect=EntitlementsUnavailableError("unavailable"),
+ ):
+ data = UserMeSerializer(user).data
+ assert data["can_access"] is True
+
+
+# -- Signals integration --
+
+
+@override_settings(
+ CALDAV_URL="http://caldav:80",
+ ENTITLEMENTS_BACKEND="core.entitlements.backends.local.LocalEntitlementsBackend",
+ ENTITLEMENTS_BACKEND_PARAMETERS={},
+)
+def test_signal_skips_calendar_when_not_entitled():
+ """Calendar should NOT be created when entitlements deny access."""
+ get_entitlements_backend.cache_clear()
+
+ with (
+ mock.patch(
+ "core.signals.get_user_entitlements",
+ return_value={"can_access": False},
+ ) as mock_ent,
+ mock.patch("core.signals.CalendarService") as mock_svc,
+ ):
+ factories.UserFactory()
+ mock_ent.assert_called_once()
+ mock_svc.assert_not_called()
+
+ get_entitlements_backend.cache_clear()
+
+
+@override_settings(
+ CALDAV_URL="http://caldav:80",
+ ENTITLEMENTS_BACKEND="core.entitlements.backends.local.LocalEntitlementsBackend",
+ ENTITLEMENTS_BACKEND_PARAMETERS={},
+)
+def test_signal_skips_calendar_when_entitlements_unavailable():
+ """Calendar should NOT be created when entitlements are unavailable (fail-closed)."""
+ get_entitlements_backend.cache_clear()
+
+ with (
+ mock.patch(
+ "core.signals.get_user_entitlements",
+ side_effect=EntitlementsUnavailableError("unavailable"),
+ ),
+ mock.patch("core.signals.CalendarService") as mock_svc,
+ ):
+ factories.UserFactory()
+ mock_svc.assert_not_called()
+
+ get_entitlements_backend.cache_clear()
+
+
+@override_settings(
+ CALDAV_URL="http://caldav:80",
+ ENTITLEMENTS_BACKEND="core.entitlements.backends.local.LocalEntitlementsBackend",
+ ENTITLEMENTS_BACKEND_PARAMETERS={},
+)
+def test_signal_creates_calendar_when_entitled():
+ """Calendar should be created when entitlements grant access."""
+ get_entitlements_backend.cache_clear()
+
+ with (
+ mock.patch(
+ "core.signals.get_user_entitlements",
+ return_value={"can_access": True},
+ ),
+ mock.patch("core.signals.CalendarService") as mock_svc,
+ ):
+ factories.UserFactory()
+ mock_svc.return_value.create_default_calendar.assert_called_once()
+
+ get_entitlements_backend.cache_clear()
+
+
+# -- CalDAV proxy entitlements enforcement --
+
+
+@pytest.mark.django_db
+class TestCalDAVProxyEntitlements: # pylint: disable=no-member
+ """Tests for entitlements enforcement in the CalDAV proxy."""
+
+ @responses.activate
+ def test_mkcalendar_blocked_when_not_entitled(self):
+ """MKCALENDAR should return 403 when user has can_access=False."""
+ user = factories.UserFactory(email="test@example.com")
+ client = APIClient()
+ client.force_login(user)
+
+ with mock.patch(
+ "core.api.viewsets_caldav.get_user_entitlements",
+ return_value={"can_access": False},
+ ):
+ response = client.generic(
+ "MKCALENDAR",
+ "/api/v1.0/caldav/calendars/test@example.com/new-cal/",
+ )
+
+ assert response.status_code == HTTP_403_FORBIDDEN
+
+ @responses.activate
+ def test_mkcol_blocked_when_not_entitled(self):
+ """MKCOL should return 403 when user has can_access=False."""
+ user = factories.UserFactory(email="test@example.com")
+ client = APIClient()
+ client.force_login(user)
+
+ with mock.patch(
+ "core.api.viewsets_caldav.get_user_entitlements",
+ return_value={"can_access": False},
+ ):
+ response = client.generic(
+ "MKCOL",
+ "/api/v1.0/caldav/calendars/test@example.com/new-cal/",
+ )
+
+ assert response.status_code == HTTP_403_FORBIDDEN
+
+ @responses.activate
+ def test_mkcalendar_blocked_when_entitlements_unavailable(self):
+ """MKCALENDAR should return 403 when entitlements service
+ is unavailable (fail-closed)."""
+ user = factories.UserFactory(email="test@example.com")
+ client = APIClient()
+ client.force_login(user)
+
+ with mock.patch(
+ "core.api.viewsets_caldav.get_user_entitlements",
+ side_effect=EntitlementsUnavailableError("unavailable"),
+ ):
+ response = client.generic(
+ "MKCALENDAR",
+ "/api/v1.0/caldav/calendars/test@example.com/new-cal/",
+ )
+
+ assert response.status_code == HTTP_403_FORBIDDEN
+
+ @responses.activate
+ def test_mkcalendar_allowed_when_entitled(self):
+ """MKCALENDAR should be forwarded when user has can_access=True."""
+ user = factories.UserFactory(email="test@example.com")
+ client = APIClient()
+ client.force_login(user)
+
+ caldav_url = settings.CALDAV_URL
+ responses.add(
+ responses.Response(
+ method="MKCALENDAR",
+ url=f"{caldav_url}/api/v1.0/caldav/calendars/test@example.com/new-cal/",
+ status=201,
+ body="",
+ )
+ )
+
+ with mock.patch(
+ "core.api.viewsets_caldav.get_user_entitlements",
+ return_value={"can_access": True},
+ ):
+ response = client.generic(
+ "MKCALENDAR",
+ "/api/v1.0/caldav/calendars/test@example.com/new-cal/",
+ )
+
+ assert response.status_code == 201
+ assert len(responses.calls) == 1
+
+ @responses.activate
+ def test_propfind_allowed_when_not_entitled(self):
+ """PROPFIND should work for non-entitled users (shared calendars)."""
+ user = factories.UserFactory(email="test@example.com")
+ client = APIClient()
+ client.force_login(user)
+
+ caldav_url = settings.CALDAV_URL
+ responses.add(
+ responses.Response(
+ method="PROPFIND",
+ url=f"{caldav_url}/api/v1.0/caldav/",
+ status=HTTP_207_MULTI_STATUS,
+ body='
{t("no_access.description")}
+ +