🐛(backend) fix create document for user when sub does not match

When creating a document on behalf of a user via the server-to-server
API, a special edge case was broken that should should never happen
but happens in our OIDC federation because one of the provider modifies
the users "sub" each time they login.

We end-up with existing users for who the email matches but not the sub.
They were not correctly handled.

I made a few additional fixes and improvements to the endpoint.
This commit is contained in:
Samuel Paccoud - DINUM
2025-01-10 09:50:48 +01:00
committed by Samuel Paccoud
parent 96bb99d6ec
commit 610948cd16
8 changed files with 409 additions and 42 deletions

View File

@@ -29,6 +29,9 @@ and this project adheres to
- 💄(frontend) update doc versioning ui #463
- 💄(frontend) update doc summary ui #473
## Fixed
- 🐛(backend) fix create document via s2s if sub unknown but email found #543
## [1.10.0] - 2024-12-17

View File

@@ -264,13 +264,17 @@ class ServerCreateDocumentSerializer(serializers.Serializer):
"""Create the document and associate it with the user or send an invitation."""
language = validated_data.get("language", settings.LANGUAGE_CODE)
# Get the user based on the sub (unique identifier)
# Get the user on its sub (unique identifier). Default on email if allowed in settings
email = validated_data["email"]
try:
user = models.User.objects.get(sub=validated_data["sub"])
except (models.User.DoesNotExist, KeyError):
user = None
email = validated_data["email"]
else:
user = models.User.objects.get_user_by_sub_or_email(
validated_data["sub"], email
)
except models.DuplicateEmailError as err:
raise serializers.ValidationError({"email": [err.message]}) from err
if user:
email = user.email
language = user.language or language
@@ -279,7 +283,9 @@ class ServerCreateDocumentSerializer(serializers.Serializer):
validated_data["content"]
)
except ConversionError as err:
raise exceptions.APIException(detail="could not convert content") from err
raise serializers.ValidationError(
{"content": ["Could not convert content"]}
) from err
document = models.Document.objects.create(
title=validated_data["title"],
@@ -302,7 +308,11 @@ class ServerCreateDocumentSerializer(serializers.Serializer):
role=models.RoleChoices.OWNER,
)
# Notify the user about the newly created document
self._send_email_notification(document, validated_data, email, language)
return document
def _send_email_notification(self, document, validated_data, email, language):
"""Notify the user about the newly created document."""
subject = validated_data.get("subject") or _(
"A new document was created on your behalf!"
)
@@ -313,8 +323,6 @@ class ServerCreateDocumentSerializer(serializers.Serializer):
}
document.send_email(subject, [email], context, language)
return document
def update(self, instance, validated_data):
"""
This serializer does not support updates.

View File

@@ -11,7 +11,7 @@ from mozilla_django_oidc.auth import (
OIDCAuthenticationBackend as MozillaOIDCAuthenticationBackend,
)
from core.models import User
from core.models import DuplicateEmailError, User
logger = logging.getLogger(__name__)
@@ -98,7 +98,10 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
"short_name": short_name,
}
user = self.get_existing_user(sub, email)
try:
user = User.objects.get_user_by_sub_or_email(sub, email)
except DuplicateEmailError as err:
raise SuspiciousOperation(err.message) from err
if user:
if not user.is_active:
@@ -117,16 +120,6 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
)
return full_name or None
def get_existing_user(self, sub, email):
"""Fetch an existing user by sub (or email as a fallback respecting fallback setting."""
try:
return User.objects.get(sub=sub)
except User.DoesNotExist:
if email and settings.OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION:
return User.objects.filter(email=email).first()
return None
def update_user_if_needed(self, user, claims):
"""Update user claims if they have changed."""
has_changed = any(

View File

@@ -1,6 +1,7 @@
"""
Declare and configure the models for the impress core application
"""
# pylint: disable=too-many-lines
import hashlib
import smtplib
@@ -89,6 +90,16 @@ class LinkReachChoices(models.TextChoices):
PUBLIC = "public", _("Public") # Even anonymous users can access the document
class DuplicateEmailError(Exception):
"""Raised when an email is already associated with a pre-existing user."""
def __init__(self, message=None, email=None):
"""Set message and email to describe the exception."""
self.message = message
self.email = email
super().__init__(self.message)
class BaseModel(models.Model):
"""
Serves as an abstract base model for other models, ensuring that records are validated
@@ -126,6 +137,35 @@ class BaseModel(models.Model):
super().save(*args, **kwargs)
class UserManager(auth_models.UserManager):
"""Custom manager for User model with additional methods."""
def get_user_by_sub_or_email(self, sub, email):
"""Fetch existing user by sub or email."""
try:
return self.get(sub=sub)
except self.model.DoesNotExist as err:
if not email:
return None
if settings.OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION:
try:
return self.get(email=email)
except self.model.DoesNotExist:
pass
elif (
self.filter(email=email).exists()
and not settings.OIDC_ALLOW_DUPLICATE_EMAILS
):
raise DuplicateEmailError(
_(
"We couldn't find a user with this sub but the email is already "
"associated with a registered user."
)
) from err
return None
class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
"""User model to work with OIDC only authentication."""
@@ -192,7 +232,7 @@ class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
),
)
objects = auth_models.UserManager()
objects = UserManager()
USERNAME_FIELD = "admin_email"
REQUIRED_FIELDS = []
@@ -939,7 +979,10 @@ class Invitation(BaseModel):
super().clean()
# Check if an identity already exists for the provided email
if User.objects.filter(email=self.email).exists():
if (
User.objects.filter(email=self.email).exists()
and not settings.OIDC_ALLOW_DUPLICATE_EMAILS
):
raise exceptions.ValidationError(
{"email": _("This email is already associated to a registered user.")}
)

View File

@@ -1,5 +1,6 @@
"""Unit tests for the Authentication Backends."""
import random
import re
from logging import Logger
from unittest import mock
@@ -64,7 +65,33 @@ def test_authentication_getter_existing_user_via_email(
assert user == db_user
def test_authentication_getter_existing_user_no_fallback_to_email(
def test_authentication_getter_email_none(monkeypatch):
"""
If no user is found with the sub and no email is provided, a new user should be created.
"""
klass = OIDCAuthenticationBackend()
db_user = UserFactory(email=None)
def get_userinfo_mocked(*args):
user_info = {"sub": "123"}
if random.choice([True, False]):
user_info["email"] = None
return user_info
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
user = klass.get_or_create_user(
access_token="test-token", id_token=None, payload=None
)
# Since the sub and email didn't match, it should create a new user
assert models.User.objects.count() == 2
assert user != db_user
assert user.sub == "123"
def test_authentication_getter_existing_user_no_fallback_to_email_allow_duplicate(
settings, monkeypatch
):
"""
@@ -77,6 +104,7 @@ def test_authentication_getter_existing_user_no_fallback_to_email(
# Set the setting to False
settings.OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION = False
settings.OIDC_ALLOW_DUPLICATE_EMAILS = True
def get_userinfo_mocked(*args):
return {"sub": "123", "email": db_user.email}
@@ -93,6 +121,39 @@ def test_authentication_getter_existing_user_no_fallback_to_email(
assert user.sub == "123"
def test_authentication_getter_existing_user_no_fallback_to_email_no_duplicate(
settings, monkeypatch
):
"""
When the "OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION" setting is set to False,
the system should not match users by email, even if the email matches.
"""
klass = OIDCAuthenticationBackend()
db_user = UserFactory()
# Set the setting to False
settings.OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION = False
settings.OIDC_ALLOW_DUPLICATE_EMAILS = False
def get_userinfo_mocked(*args):
return {"sub": "123", "email": db_user.email}
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
with pytest.raises(
SuspiciousOperation,
match=(
"We couldn't find a user with this sub but the email is already associated "
"with a registered user."
),
):
klass.get_or_create_user(access_token="test-token", id_token=None, payload=None)
# Since the sub doesn't match, it should not create a new user
assert models.User.objects.count() == 1
def test_authentication_getter_existing_user_with_email(
django_assert_num_queries, monkeypatch
):

View File

@@ -13,6 +13,7 @@ import pytest
from rest_framework.test import APIClient
from core import factories
from core.api.serializers import ServerCreateDocumentSerializer
from core.models import Document, Invitation, User
from core.services.converter_services import ConversionError, YdocConverter
@@ -20,7 +21,7 @@ pytestmark = pytest.mark.django_db
@pytest.fixture
def mock_convert_markdown():
def mock_convert_md():
"""Mock YdocConverter.convert_markdown to return a converted content."""
with patch.object(
YdocConverter,
@@ -169,8 +170,11 @@ def test_api_documents_create_for_owner_invalid_sub():
@override_settings(SERVER_TO_SERVER_API_TOKENS=["DummyToken"])
def test_api_documents_create_for_owner_existing(mock_convert_markdown):
"""It should be possible to create a document on behalf of a pre-existing user."""
def test_api_documents_create_for_owner_existing(mock_convert_md):
"""
It should be possible to create a document on behalf of a pre-existing user
by passing their sub and email.
"""
user = factories.UserFactory(language="en-us")
data = {
@@ -189,7 +193,7 @@ def test_api_documents_create_for_owner_existing(mock_convert_markdown):
assert response.status_code == 201
mock_convert_markdown.assert_called_once_with("Document content")
mock_convert_md.assert_called_once_with("Document content")
document = Document.objects.get()
assert response.json() == {"id": str(document.id)}
@@ -213,10 +217,10 @@ def test_api_documents_create_for_owner_existing(mock_convert_markdown):
@override_settings(SERVER_TO_SERVER_API_TOKENS=["DummyToken"])
def test_api_documents_create_for_owner_new_user(mock_convert_markdown):
def test_api_documents_create_for_owner_new_user(mock_convert_md):
"""
It should be possible to create a document on behalf of new users by
passing only their email address.
passing their unknown sub and email address.
"""
data = {
"title": "My Document",
@@ -234,7 +238,7 @@ def test_api_documents_create_for_owner_new_user(mock_convert_markdown):
assert response.status_code == 201
mock_convert_markdown.assert_called_once_with("Document content")
mock_convert_md.assert_called_once_with("Document content")
document = Document.objects.get()
assert response.json() == {"id": str(document.id)}
@@ -264,8 +268,190 @@ def test_api_documents_create_for_owner_new_user(mock_convert_markdown):
assert document.creator == user
@override_settings(
SERVER_TO_SERVER_API_TOKENS=["DummyToken"],
OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION=True,
)
def test_api_documents_create_for_owner_existing_user_email_no_sub_with_fallback(
mock_convert_md,
):
"""
It should be possible to create a document on behalf of a pre-existing user for
who the sub was not found if the settings allow it. This edge case should not
happen in a healthy OIDC federation but can be usefull if an OIDC provider modifies
users sub on each login for example...
"""
user = factories.UserFactory(language="en-us")
data = {
"title": "My Document",
"content": "Document content",
"sub": "123",
"email": user.email,
}
response = APIClient().post(
"/api/v1.0/documents/create-for-owner/",
data,
format="json",
HTTP_AUTHORIZATION="Bearer DummyToken",
)
assert response.status_code == 201
mock_convert_md.assert_called_once_with("Document content")
document = Document.objects.get()
assert response.json() == {"id": str(document.id)}
assert document.title == "My Document"
assert document.content == "Converted document content"
assert document.creator == user
assert document.accesses.filter(user=user, role="owner").exists()
assert Invitation.objects.exists() is False
assert len(mail.outbox) == 1
email = mail.outbox[0]
assert email.to == [user.email]
assert email.subject == "A new document was created on your behalf!"
email_content = " ".join(email.body.split())
assert "A new document was created on your behalf!" in email_content
assert (
"You have been granted ownership of a new document: My Document"
) in email_content
@override_settings(
SERVER_TO_SERVER_API_TOKENS=["DummyToken"],
OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION=False,
OIDC_ALLOW_DUPLICATE_EMAILS=False,
)
def test_api_documents_create_for_owner_existing_user_email_no_sub_no_fallback(
mock_convert_md,
):
"""
When a user does not match an existing sub and fallback to matching on email is
not allowed in settings, it should raise an error if the email is already used by
a registered user and duplicate emails are not allowed.
"""
user = factories.UserFactory()
data = {
"title": "My Document",
"content": "Document content",
"sub": "123",
"email": user.email,
}
response = APIClient().post(
"/api/v1.0/documents/create-for-owner/",
data,
format="json",
HTTP_AUTHORIZATION="Bearer DummyToken",
)
assert response.status_code == 400
assert response.json() == {
"email": [
(
"We couldn't find a user with this sub but the email is already "
"associated with a registered user."
)
]
}
assert mock_convert_md.called is False
assert Document.objects.exists() is False
assert Invitation.objects.exists() is False
assert len(mail.outbox) == 0
@override_settings(
SERVER_TO_SERVER_API_TOKENS=["DummyToken"],
OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION=False,
OIDC_ALLOW_DUPLICATE_EMAILS=True,
)
def test_api_documents_create_for_owner_new_user_no_sub_no_fallback_allow_duplicate(
mock_convert_md,
):
"""
When a user does not match an existing sub and fallback to matching on email is
not allowed in settings, it should be possible to create a new user with the same
email as an existing user if the settings allow it (identification is still done
via the sub in this case).
"""
user = factories.UserFactory()
data = {
"title": "My Document",
"content": "Document content",
"sub": "123",
"email": user.email,
}
response = APIClient().post(
"/api/v1.0/documents/create-for-owner/",
data,
format="json",
HTTP_AUTHORIZATION="Bearer DummyToken",
)
assert response.status_code == 201
mock_convert_md.assert_called_once_with("Document content")
document = Document.objects.get()
assert response.json() == {"id": str(document.id)}
assert document.title == "My Document"
assert document.content == "Converted document content"
assert document.creator is None
assert document.accesses.exists() is False
invitation = Invitation.objects.get()
assert invitation.email == user.email
assert invitation.role == "owner"
assert len(mail.outbox) == 1
email = mail.outbox[0]
assert email.to == [user.email]
assert email.subject == "A new document was created on your behalf!"
email_content = " ".join(email.body.split())
assert "A new document was created on your behalf!" in email_content
assert (
"You have been granted ownership of a new document: My Document"
) in email_content
# The creator field on the document should be set when the user is created
user = User.objects.create(email=user.email, password="!")
document.refresh_from_db()
assert document.creator == user
@patch.object(ServerCreateDocumentSerializer, "_send_email_notification")
@override_settings(SERVER_TO_SERVER_API_TOKENS=["DummyToken"], LANGUAGE_CODE="de-de")
def test_api_documents_create_for_owner_with_default_language(
mock_send, mock_convert_md
):
"""The default language from settings should apply by default."""
data = {
"title": "My Document",
"content": "Document content",
"sub": "123",
"email": "john.doe@example.com",
}
response = APIClient().post(
"/api/v1.0/documents/create-for-owner/",
data,
format="json",
HTTP_AUTHORIZATION="Bearer DummyToken",
)
assert response.status_code == 201
mock_convert_md.assert_called_once_with("Document content")
assert mock_send.call_args[0][3] == "de-de"
@override_settings(SERVER_TO_SERVER_API_TOKENS=["DummyToken"])
def test_api_documents_create_for_owner_with_custom_language(mock_convert_markdown):
def test_api_documents_create_for_owner_with_custom_language(mock_convert_md):
"""
Test creating a document with a specific language.
Useful if the remote server knows the user's language.
@@ -287,7 +473,7 @@ def test_api_documents_create_for_owner_with_custom_language(mock_convert_markdo
assert response.status_code == 201
mock_convert_markdown.assert_called_once_with("Document content")
mock_convert_md.assert_called_once_with("Document content")
assert len(mail.outbox) == 1
email = mail.outbox[0]
@@ -302,7 +488,7 @@ def test_api_documents_create_for_owner_with_custom_language(mock_convert_markdo
@override_settings(SERVER_TO_SERVER_API_TOKENS=["DummyToken"])
def test_api_documents_create_for_owner_with_custom_subject_and_message(
mock_convert_markdown,
mock_convert_md,
):
"""It should be possible to customize the subject and message of the invitation email."""
data = {
@@ -323,7 +509,7 @@ def test_api_documents_create_for_owner_with_custom_subject_and_message(
assert response.status_code == 201
mock_convert_markdown.assert_called_once_with("Document content")
mock_convert_md.assert_called_once_with("Document content")
assert len(mail.outbox) == 1
email = mail.outbox[0]
@@ -336,11 +522,11 @@ def test_api_documents_create_for_owner_with_custom_subject_and_message(
@override_settings(SERVER_TO_SERVER_API_TOKENS=["DummyToken"])
def test_api_documents_create_for_owner_with_converter_exception(
mock_convert_markdown,
mock_convert_md,
):
"""It should be possible to customize the subject and message of the invitation email."""
"""In case of converter error, a 400 error should be raised."""
mock_convert_markdown.side_effect = ConversionError("Conversion failed")
mock_convert_md.side_effect = ConversionError("Conversion failed")
data = {
"title": "My Document",
@@ -357,8 +543,33 @@ def test_api_documents_create_for_owner_with_converter_exception(
format="json",
HTTP_AUTHORIZATION="Bearer DummyToken",
)
mock_convert_md.assert_called_once_with("Document content")
mock_convert_markdown.assert_called_once_with("Document content")
assert response.status_code == 400
assert response.json() == {"content": ["Could not convert content"]}
assert response.status_code == 500
assert response.json() == {"detail": "could not convert content"}
@override_settings(SERVER_TO_SERVER_API_TOKENS=["DummyToken"])
def test_api_documents_create_for_owner_with_empty_content():
"""The content should not be empty or a 400 error should be raised."""
data = {
"title": "My Document",
"content": " ",
"sub": "123",
"email": "john.doe@example.com",
}
response = APIClient().post(
"/api/v1.0/documents/create-for-owner/",
data,
format="json",
HTTP_AUTHORIZATION="Bearer DummyToken",
)
assert response.status_code == 400
assert response.json() == {
"content": [
"This field may not be blank.",
],
}

View File

@@ -0,0 +1,30 @@
"""
Unit tests for the User model
"""
import pytest
from impress.settings import Base
def test_invalid_settings_oidc_email_configuration():
"""
The OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION and OIDC_ALLOW_DUPLICATE_EMAILS settings
should not be both set to True simultaneously.
"""
class TestSettings(Base):
"""Fake test settings."""
OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION = True
OIDC_ALLOW_DUPLICATE_EMAILS = True
# The validation is performed during post_setup
with pytest.raises(ValueError) as excinfo:
TestSettings().post_setup()
# Check the exception message
assert str(excinfo.value) == (
"Both OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION and "
"OIDC_ALLOW_DUPLICATE_EMAILS cannot be set to True simultaneously. "
)

View File

@@ -474,6 +474,15 @@ class Base(Configuration):
environ_prefix=None,
)
# WARNING: Enabling this setting allows multiple user accounts to share the same email
# address. This may cause security issues and is not recommended for production use when
# email is activated as fallback for identification (see previous setting).
OIDC_ALLOW_DUPLICATE_EMAILS = values.BooleanValue(
default=False,
environ_name="OIDC_ALLOW_DUPLICATE_EMAILS",
environ_prefix=None,
)
USER_OIDC_ESSENTIAL_CLAIMS = values.ListValue(
default=[], environ_name="USER_OIDC_ESSENTIAL_CLAIMS", environ_prefix=None
)
@@ -625,6 +634,15 @@ class Base(Configuration):
with sentry_sdk.configure_scope() as scope:
scope.set_extra("application", "backend")
if (
cls.OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION
and cls.OIDC_ALLOW_DUPLICATE_EMAILS
):
raise ValueError(
"Both OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION and "
"OIDC_ALLOW_DUPLICATE_EMAILS cannot be set to True simultaneously. "
)
class Build(Base):
"""Settings used when the application is built.