(resource-server) add SCIM /Me endpoint

This provide a "self-care" SCIM endpoint, authenticated with OIDC token
introspection. This endpoint will be use by services to fetch the user's
team list.

We chose to use the SCIM format (even if this is not a SCIM context) to
make it easier to understand/maintain/plug.
This commit is contained in:
Quentin BEY
2025-04-30 14:14:27 +02:00
parent 160c45bf35
commit 4dfd682cb6
11 changed files with 438 additions and 3 deletions

View File

@@ -8,6 +8,10 @@ and this project adheres to
## [Unreleased]
### Added
- ✨(resource-server) add SCIM /Me endpoint #895
## [1.17.0] - 2025-06-11
### Added

View File

@@ -0,0 +1 @@
"""SCIM compliant API for resource server"""

View File

@@ -0,0 +1,16 @@
"""Exceptions for SCIM API."""
from django.conf import settings
from core.api.resource_server.scim.response import ScimJsonResponse
def scim_exception_handler(exc, _context):
"""Handle SCIM exceptions and return them in the correct format."""
data = {
"schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
"status": str(exc.status_code),
"detail": str(exc.detail) if settings.DEBUG else "",
}
return ScimJsonResponse(data, status=exc.status_code)

View File

@@ -0,0 +1,16 @@
"""SCIM API response classes."""
from rest_framework.response import Response
class ScimJsonResponse(Response):
"""
Custom JSON response class for SCIM API.
This class sets the content type to "application/json+scim" for SCIM
responses.
"""
def __init__(self, *args, **kwargs):
"""JSON response with enforced SCIM content type."""
super().__init__(*args, content_type="application/json+scim", **kwargs)

View File

@@ -0,0 +1,90 @@
"""SCIM serializers for resource server API."""
from django.urls import reverse
from rest_framework import serializers
from core import models
class SCIMUserSerializer(serializers.ModelSerializer):
"""Serialize users in SCIM format."""
schemas = serializers.SerializerMethodField()
userName = serializers.CharField(source="sub")
displayName = serializers.CharField(source="name")
emails = serializers.SerializerMethodField()
meta = serializers.SerializerMethodField()
groups = serializers.SerializerMethodField()
active = serializers.BooleanField(source="is_active")
class Meta:
model = models.User
fields = [
"id",
"schemas",
"active",
"userName",
"displayName",
"emails",
"groups",
"meta",
]
read_only_fields = [
"id",
"schemas",
"active",
"userName",
"displayName",
"emails",
"groups",
"meta",
]
def get_schemas(self, _obj):
"""Return the SCIM schemas for the user."""
return ["urn:ietf:params:scim:schemas:core:2.0:User"]
def get_emails(self, obj):
"""Return the user's email as a list of email objects."""
if obj.email:
return [
{
"value": obj.email,
"primary": True,
"type": "work",
}
]
return []
def get_groups(self, obj):
"""
Return the groups the user belongs to.
WARNING: you need to prefetch the team accesses in the
viewset to avoid N+1 queries.
"""
return [
{
"value": str(team_access.team.external_id),
"display": team_access.team.name,
"type": "direct",
}
for team_access in obj.accesses.all()
]
def get_meta(self, obj):
"""Return metadata about the user."""
request = self.context.get("request")
location = (
f"{request.build_absolute_uri('/').rstrip('/')}{reverse('scim-me-list')}"
if request
else None
)
return {
"resourceType": "User",
"created": obj.created_at.isoformat(),
"lastModified": obj.updated_at.isoformat(),
"location": location,
}

View File

@@ -0,0 +1,62 @@
"""Resource server SCIM API endpoints"""
from django.contrib.auth import get_user_model
from django.db.models import Prefetch, Q
from lasuite.oidc_resource_server.mixins import ResourceServerMixin
from rest_framework import (
viewsets,
)
from core.api import permissions
from core.models import TeamAccess
from . import serializers
from .exceptions import scim_exception_handler
from .response import ScimJsonResponse
User = get_user_model()
class MeViewSet(ResourceServerMixin, viewsets.ViewSet):
"""
SCIM-compliant ViewSet for the /Me endpoint.
This endpoint provides information about the currently authenticated user
in SCIM (System for Cross-domain Identity Management) format.
Features:
- Returns user details in SCIM format.
- Includes the user's teams, restricted to the audience.
Limitations:
- Does not currently support managing Team hierarchies.
Endpoint:
GET /resource-server/v1.0/scim/Me/
Retrieves the authenticated user's details and associated teams.
"""
permission_classes = [permissions.IsAuthenticated]
serializer_class = serializers.SCIMUserSerializer
def get_exception_handler(self):
"""Override the default exception handler to use SCIM-specific handling."""
return scim_exception_handler
def list(self, request, *args, **kwargs):
"""Return the current user's details in SCIM format."""
service_provider_audience = self._get_service_provider_audience()
user = User.objects.prefetch_related(
Prefetch(
"accesses",
queryset=TeamAccess.objects.select_related("team").filter(
Q(team__service_providers__audience_id=service_provider_audience)
| Q(team__is_visible_all_services=True)
),
)
).get(pk=request.user.pk)
serializer = self.serializer_class(user, context={"request": request})
return ScimJsonResponse(serializer.data)

View File

@@ -0,0 +1,40 @@
# Generated by Django 5.2 on 2025-06-12 09:58
import uuid
from django.conf import settings
from django.db import migrations, models
def gen_team_uuid(apps, schema_editor):
Team = apps.get_model("core", "Team")
for row in Team.objects.all():
row.external_id = uuid.uuid4()
# Don't need to batch update here, as the table is likely small or empty
row.save(update_fields=["external_id"])
class Migration(migrations.Migration):
dependencies = [
('core', '0015_alter_accountservice_api_key'),
]
operations = [
migrations.AddField(
model_name='team',
name='external_id',
field=models.UUIDField(default=uuid.uuid4, editable=False, help_text='Team external UUID for synchronization with external systems', unique=False, verbose_name='external_id'),
),
migrations.RunPython(gen_team_uuid, reverse_code=migrations.RunPython.noop),
migrations.AlterField(
model_name='team',
name='external_id',
field=models.UUIDField(default=uuid.uuid4, editable=False, help_text='Team external UUID for synchronization with external systems', unique=True, verbose_name='external_id'),
),
#
migrations.AlterField(
model_name='team',
name='users',
field=models.ManyToManyField(related_name='teams', through='core.TeamAccess', through_fields=('team', 'user'), to=settings.AUTH_USER_MODEL),
),
]

View File

@@ -742,6 +742,11 @@ class Team(MP_Node, BaseModel):
can see it.
When a team is created from a Service Provider this one is automatically set in the
Team `service_providers`.
The team `external_id` is used to synchronize the team with external systems, this
is the equivalent of the User `sub` field but for teams (note: `sub` is highly related
to the OIDC standard, while `external_id` is not). The `external_id` is NOT the same as
the `externalId` field in the SCIM standard when importing teams from SCIM.
"""
# Allow up to 80 nested teams with 62^5 (916_132_832) root nodes
@@ -752,6 +757,14 @@ class Team(MP_Node, BaseModel):
name = models.CharField(max_length=100)
external_id = models.UUIDField(
verbose_name=_("external_id"),
help_text=_("Team external UUID for synchronization with external systems"),
unique=True,
default=uuid.uuid4,
editable=False,
)
users = models.ManyToManyField(
User,
through="TeamAccess",

View File

@@ -0,0 +1 @@
"""Tests for the resource server SCIM API endpoints."""

View File

@@ -0,0 +1,186 @@
"""
Tests for the SCIM Me API endpoint in People's core app
"""
import pytest
from rest_framework.status import (
HTTP_200_OK,
HTTP_401_UNAUTHORIZED,
HTTP_405_METHOD_NOT_ALLOWED,
)
from core import factories
from core.models import RoleChoices
pytestmark = pytest.mark.django_db
def test_api_me_anonymous(client):
"""Anonymous users should not be allowed to access the Me endpoint."""
response = client.get("/resource-server/v1.0/scim/Me/")
assert response.status_code == HTTP_401_UNAUTHORIZED
assert response.headers["Content-Type"] == "application/json+scim"
# Check the full response with the expected structure
assert response.json() == {
"schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
"status": "401",
"detail": "",
}
def test_api_me_authenticated(
client, force_login_via_resource_server, django_assert_num_queries
):
"""
Authenticated users should be able to access their own information
in SCIM format.
"""
user = factories.UserFactory(name="Test User", email="test@example.com")
service_provider = factories.ServiceProviderFactory()
# Authenticate using the resource server, ie via the Authorization header
with force_login_via_resource_server(client, user, service_provider.audience_id):
with django_assert_num_queries(2):
response = client.get(
"/resource-server/v1.0/scim/Me/",
format="json",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)
assert response.status_code == HTTP_200_OK
assert response.headers["Content-Type"] == "application/json+scim"
# Check the full response with the expected structure
assert response.json() == {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"],
"id": str(user.pk),
"active": True,
"userName": user.sub,
"displayName": user.name,
"emails": [
{
"value": user.email,
"primary": True,
"type": "work",
}
],
"groups": [],
"meta": {
"resourceType": "User",
"created": user.created_at.isoformat(),
"lastModified": user.updated_at.isoformat(),
"location": "http://testserver/resource-server/v1.0/scim/Me/",
},
}
def test_api_me_authenticated_with_team_access(
client, force_login_via_resource_server, django_assert_num_queries
):
"""
Authenticated users with TeamAccess should see their team information
in SCIM format, but only for teams visible to the service provider.
"""
user = factories.UserFactory(name="Test User", email="test@example.com")
service_provider = factories.ServiceProviderFactory()
# Create teams with different visibility settings
team_visible = factories.TeamFactory(name="Visible Team")
team_visible.service_providers.add(service_provider)
team_all_services = factories.TeamFactory(
name="All Services Team", is_visible_all_services=True
)
team_not_visible = factories.TeamFactory(name="Not Visible Team")
# This team is not associated with the service provider
# Add user to all teams
factories.TeamAccessFactory(user=user, team=team_visible, role=RoleChoices.MEMBER)
factories.TeamAccessFactory(
user=user, team=team_all_services, role=RoleChoices.ADMIN
)
factories.TeamAccessFactory(
user=user, team=team_not_visible, role=RoleChoices.OWNER
)
# Authenticate using the resource server, ie via the Authorization header
with force_login_via_resource_server(client, user, service_provider.audience_id):
with django_assert_num_queries(
2
): # User + TeamAccess (with select_related teams)
response = client.get(
"/resource-server/v1.0/scim/Me/",
format="json",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)
assert response.status_code == HTTP_200_OK
assert response.headers["Content-Type"] == "application/json+scim"
# Check the full response with the expected structure
assert response.json() == {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"],
"id": str(user.pk),
"active": True,
"userName": user.sub,
"displayName": user.name,
"emails": [
{
"value": user.email,
"primary": True,
"type": "work",
}
],
"groups": [
{
"value": str(team_visible.external_id),
"display": team_visible.name,
"type": "direct",
},
{
"value": str(team_all_services.external_id),
"display": team_all_services.name,
"type": "direct",
},
],
"meta": {
"resourceType": "User",
"created": user.created_at.isoformat(),
"lastModified": user.updated_at.isoformat(),
"location": "http://testserver/resource-server/v1.0/scim/Me/",
},
}
@pytest.mark.parametrize(
"http_method",
["post", "put", "patch", "delete"],
ids=["POST", "PUT", "PATCH", "DELETE"],
)
def test_api_me_method_not_allowed(
client, force_login_via_resource_server, http_method
):
"""Test that methods other than GET are not allowed for the Me endpoint."""
user = factories.UserFactory()
service_provider = factories.ServiceProviderFactory()
# Authenticate using the resource server, ie via the Authorization header
with force_login_via_resource_server(client, user, service_provider.audience_id):
client_method = getattr(client, http_method)
response = client_method(
"/resource-server/v1.0/scim/Me/",
format="json",
HTTP_AUTHORIZATION="Bearer b64untestedbearertoken",
)
assert response.status_code == HTTP_405_METHOD_NOT_ALLOWED
assert response.headers["Content-Type"] == "application/json+scim"
# Check the full response with the expected structure
assert response.json() == {
"schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
"status": str(HTTP_405_METHOD_NOT_ALLOWED),
"detail": "",
}

View File

@@ -3,18 +3,23 @@
from django.urls import include, path
from lasuite.oidc_resource_server.urls import urlpatterns as resource_server_urls
from rest_framework.routers import DefaultRouter
from rest_framework.routers import SimpleRouter
from core.api.resource_server import viewsets
from core.api.resource_server.scim import viewsets as scim_viewsets
# - Main endpoints
# Contacts will be added later
# Users will be added later
router = DefaultRouter()
router = SimpleRouter()
router.register("teams", viewsets.TeamViewSet, basename="teams")
# - SCIM endpoints
scim_router = SimpleRouter()
scim_router.register("Me", scim_viewsets.MeViewSet, basename="scim-me")
# - Routes nested under a team
team_related_router = DefaultRouter()
team_related_router = SimpleRouter()
team_related_router.register(
"invitations",
viewsets.InvitationViewset,
@@ -33,6 +38,7 @@ urlpatterns = [
*router.urls,
*resource_server_urls,
path("teams/<uuid:team_id>/", include(team_related_router.urls)),
path("scim/", include(scim_router.urls)),
]
),
),