✨(oidc) add django-oauth-toolkit w/ configuration
This allows to use `people` as an identity provider using OIDC and local users. This commit is partial, because it does not manage a way to create "local" users and the login page is the admin one, which can't be used for non staff users or login with email.
This commit is contained in:
1
src/backend/mailbox_oauth2/__init__.py
Normal file
1
src/backend/mailbox_oauth2/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""People application to allow OAuth2 authentication with OIDC provider using mailbox."""
|
||||
1
src/backend/mailbox_oauth2/apps.py
Normal file
1
src/backend/mailbox_oauth2/apps.py
Normal file
@@ -0,0 +1 @@
|
||||
"""People application to allow OAuth2 authentication with OIDC provider using mailbox."""
|
||||
129
src/backend/mailbox_oauth2/backends.py
Normal file
129
src/backend/mailbox_oauth2/backends.py
Normal file
@@ -0,0 +1,129 @@
|
||||
"""Authentication backend for OIDC provider"""
|
||||
|
||||
import logging
|
||||
from email.errors import HeaderParseError
|
||||
from email.headerregistry import Address
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.backends import ModelBackend
|
||||
from django.core.cache import cache
|
||||
from django.utils.text import slugify
|
||||
|
||||
from mailbox_manager.models import Mailbox
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_username_domain_from_email(email: str):
|
||||
"""Extract local part and domain from email."""
|
||||
try:
|
||||
address = Address(addr_spec=email)
|
||||
if len(address.username) > 64 or len(address.domain) > 255:
|
||||
# Simple length validation using the RFC 5321 limits
|
||||
return None, None
|
||||
return address.username, address.domain
|
||||
except (TypeError, ValueError, AttributeError, IndexError, HeaderParseError) as exc:
|
||||
logger.exception(exc)
|
||||
return None, None
|
||||
|
||||
|
||||
class MailboxModelBackend(ModelBackend):
|
||||
"""
|
||||
Custom authentication backend for OIDC provider, enforce the use of email as the username.
|
||||
|
||||
Warning: This authentication backend is not suitable for general use, it is
|
||||
tailored for the OIDC provider and will only authenticate user and allow
|
||||
them to access the /o/authorize endpoint **only**.
|
||||
"""
|
||||
|
||||
def _get_cache_key(self, email):
|
||||
"""Generate a cache key for tracking login attempts."""
|
||||
stringified_email = email.replace("@", "_at_").replace(".", "_dot_")
|
||||
return f"login_attempts_{slugify(stringified_email)}"
|
||||
|
||||
def _increment_login_attempts(self, email):
|
||||
"""Increment the number of failed login attempts."""
|
||||
cache_key = self._get_cache_key(email)
|
||||
attempts = cache.get(cache_key, 0) + 1
|
||||
cache.set(cache_key, attempts, settings.ACCOUNT_LOCKOUT_TIME)
|
||||
|
||||
def _reset_login_attempts(self, email):
|
||||
"""Reset the number of failed login attempts."""
|
||||
cache_key = self._get_cache_key(email)
|
||||
cache.delete(cache_key)
|
||||
|
||||
def _is_login_attempts_exceeded(self, email) -> bool:
|
||||
"""Check if the account is locked due to too many failed attempts."""
|
||||
cache_key = self._get_cache_key(email)
|
||||
attempts = cache.get(cache_key, 0)
|
||||
return attempts >= settings.MAX_LOGIN_ATTEMPTS
|
||||
|
||||
def get_user(self, user_id):
|
||||
"""Retrieve a user, here a mailbox, by its unique identifier."""
|
||||
try:
|
||||
mailbox = Mailbox.objects.get(pk=user_id)
|
||||
except Mailbox.DoesNotExist:
|
||||
return None
|
||||
|
||||
if self.user_can_authenticate(mailbox):
|
||||
return mailbox
|
||||
|
||||
return None
|
||||
|
||||
def authenticate(self, request, username=None, password=None, email=None, **kwargs):
|
||||
"""Authenticate a user based on email and password"""
|
||||
if username or email is None: # ignore if username is provided
|
||||
return None
|
||||
|
||||
# Disable this backend if the corresponding middleware is not defined.
|
||||
if (
|
||||
"mailbox_oauth2.middleware.one_time_email_authenticated_session"
|
||||
not in settings.MIDDLEWARE
|
||||
):
|
||||
logger.error(
|
||||
"EmailModelBackend was triggered but the `one_time_email_authenticated_session` "
|
||||
"is not set: ignoring authentication."
|
||||
)
|
||||
return None
|
||||
|
||||
# Check if the account is locked
|
||||
if self._is_login_attempts_exceeded(email):
|
||||
logger.warning("Account locked due to too many failed attempts: %s", email)
|
||||
# Run the default password hasher once to reduce the timing
|
||||
# difference between a locked account and valid one (django issue #20760)
|
||||
Mailbox().set_password(password)
|
||||
return None
|
||||
|
||||
local_part, domain = get_username_domain_from_email(email)
|
||||
if local_part is None or domain is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
user = Mailbox.objects.select_related("domain").get(
|
||||
local_part__iexact=local_part, domain__name__iexact=domain
|
||||
)
|
||||
except Mailbox.DoesNotExist:
|
||||
# Run the default password hasher once to reduce the timing
|
||||
# difference between an existing and a nonexistent user (django issue #20760).
|
||||
Mailbox().set_password(password)
|
||||
else:
|
||||
if not self.user_can_authenticate(user):
|
||||
# Run the default password hasher once to reduce the timing
|
||||
# difference between a user who can authenticate and another one.
|
||||
Mailbox().set_password(password)
|
||||
|
||||
elif user.check_password(password):
|
||||
# Reset attempts on successful login
|
||||
self._reset_login_attempts(email)
|
||||
return user
|
||||
|
||||
else:
|
||||
# Track failed attempt
|
||||
self._increment_login_attempts(email)
|
||||
|
||||
return None
|
||||
|
||||
def user_can_authenticate(self, user):
|
||||
"""Verify the user can authenticate."""
|
||||
user_can_authenticate = super().user_can_authenticate(user)
|
||||
return user_can_authenticate and user.domain.is_identity_provider_ready()
|
||||
50
src/backend/mailbox_oauth2/middleware.py
Normal file
50
src/backend/mailbox_oauth2/middleware.py
Normal file
@@ -0,0 +1,50 @@
|
||||
"""Middleware to allow a user to authenticate against "/o/authorize" only once."""
|
||||
|
||||
from django.contrib.auth import BACKEND_SESSION_KEY
|
||||
from django.contrib.sessions.exceptions import SuspiciousSession
|
||||
from django.urls import reverse
|
||||
|
||||
|
||||
def one_time_email_authenticated_session(get_response):
|
||||
"""Middleware to allow a user to authenticate against "/o/authorize" only once."""
|
||||
|
||||
def middleware(request):
|
||||
# Code executed for each request before the view (and later middleware) are called.
|
||||
if not request.user.is_authenticated:
|
||||
# If user is not authenticated, proceed:
|
||||
# this is not this middleware's concern
|
||||
return get_response(request)
|
||||
|
||||
# Check if the auth backend is stored in session
|
||||
auth_backend = request.session.get(BACKEND_SESSION_KEY)
|
||||
|
||||
if auth_backend != "mailbox_oauth2.backends.MailboxModelBackend":
|
||||
# If the backend is not MailboxModelBackend, proceed:
|
||||
# this is not this middleware's concern
|
||||
return get_response(request)
|
||||
|
||||
# Allow access only to /o/authorize path
|
||||
if request.path != reverse("oauth2_provider:authorize"):
|
||||
# Kill the session immediately
|
||||
request.session.flush()
|
||||
raise SuspiciousSession(
|
||||
"Session was killed because user tried to access unauthorized path"
|
||||
)
|
||||
|
||||
response = get_response(request)
|
||||
|
||||
# Code executed for each request/response after the view is called.
|
||||
# When the response is a 200, the user might still be in the authentication flow
|
||||
# otherwise, we can kill the session as the current login process is done,
|
||||
# the user will be authenticated again when coming back from the OIDC federation.
|
||||
# We preserve the oidc_states for the case when the user wants to access people
|
||||
# in the first place (people -> keycloak -> people login -> keycloak -> people)
|
||||
if response.status_code != 200:
|
||||
state = request.session.get("oidc_states")
|
||||
request.session.flush()
|
||||
if state:
|
||||
request.session["oidc_states"] = state
|
||||
|
||||
return response
|
||||
|
||||
return middleware
|
||||
98
src/backend/mailbox_oauth2/migrations/0001_initial.py
Normal file
98
src/backend/mailbox_oauth2/migrations/0001_initial.py
Normal file
@@ -0,0 +1,98 @@
|
||||
# Generated by Django 5.1.5 on 2025-02-07 10:49
|
||||
|
||||
import django.db.models.deletion
|
||||
import oauth2_provider.models
|
||||
import uuid
|
||||
from django.conf import settings
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
initial = True
|
||||
|
||||
dependencies = [
|
||||
('mailbox_manager', '0023_mailbox_email_mailbox_last_login_mailbox_password'),
|
||||
migrations.swappable_dependency(settings.OAUTH2_PROVIDER_APPLICATION_MODEL),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='Grant',
|
||||
fields=[
|
||||
('id', models.BigAutoField(primary_key=True, serialize=False)),
|
||||
('code', models.CharField(max_length=255, unique=True)),
|
||||
('expires', models.DateTimeField()),
|
||||
('redirect_uri', models.TextField()),
|
||||
('scope', models.TextField(blank=True)),
|
||||
('created', models.DateTimeField(auto_now_add=True)),
|
||||
('updated', models.DateTimeField(auto_now=True)),
|
||||
('code_challenge', models.CharField(blank=True, default='', max_length=128)),
|
||||
('code_challenge_method', models.CharField(blank=True, choices=[('plain', 'plain'), ('S256', 'S256')], default='', max_length=10)),
|
||||
('nonce', models.CharField(blank=True, default='', max_length=255)),
|
||||
('claims', models.TextField(blank=True)),
|
||||
('application', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.OAUTH2_PROVIDER_APPLICATION_MODEL)),
|
||||
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='%(app_label)s_%(class)s', to='mailbox_manager.mailbox')),
|
||||
],
|
||||
options={
|
||||
'abstract': False,
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='IDToken',
|
||||
fields=[
|
||||
('id', models.BigAutoField(primary_key=True, serialize=False)),
|
||||
('jti', models.UUIDField(default=uuid.uuid4, editable=False, unique=True, verbose_name='JWT Token ID')),
|
||||
('expires', models.DateTimeField()),
|
||||
('scope', models.TextField(blank=True)),
|
||||
('created', models.DateTimeField(auto_now_add=True)),
|
||||
('updated', models.DateTimeField(auto_now=True)),
|
||||
('application', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.OAUTH2_PROVIDER_APPLICATION_MODEL)),
|
||||
('user', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='%(app_label)s_%(class)s', to='mailbox_manager.mailbox')),
|
||||
],
|
||||
options={
|
||||
'abstract': False,
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='AccessToken',
|
||||
fields=[
|
||||
('id', models.BigAutoField(primary_key=True, serialize=False)),
|
||||
('token', models.TextField()),
|
||||
('token_checksum', oauth2_provider.models.TokenChecksumField(db_index=True, max_length=64, unique=True)),
|
||||
('expires', models.DateTimeField()),
|
||||
('scope', models.TextField(blank=True)),
|
||||
('created', models.DateTimeField(auto_now_add=True)),
|
||||
('updated', models.DateTimeField(auto_now=True)),
|
||||
('application', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.OAUTH2_PROVIDER_APPLICATION_MODEL)),
|
||||
('user', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='%(app_label)s_%(class)s', to='mailbox_manager.mailbox')),
|
||||
('id_token', models.OneToOneField(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='access_token', to=settings.OAUTH2_PROVIDER_ID_TOKEN_MODEL)),
|
||||
],
|
||||
options={
|
||||
'abstract': False,
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='RefreshToken',
|
||||
fields=[
|
||||
('id', models.BigAutoField(primary_key=True, serialize=False)),
|
||||
('token', models.CharField(max_length=255)),
|
||||
('token_family', models.UUIDField(blank=True, editable=False, null=True)),
|
||||
('created', models.DateTimeField(auto_now_add=True)),
|
||||
('updated', models.DateTimeField(auto_now=True)),
|
||||
('revoked', models.DateTimeField(null=True)),
|
||||
('access_token', models.OneToOneField(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='refresh_token', to=settings.OAUTH2_PROVIDER_ACCESS_TOKEN_MODEL)),
|
||||
('application', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.OAUTH2_PROVIDER_APPLICATION_MODEL)),
|
||||
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='%(app_label)s_%(class)s', to='mailbox_manager.mailbox')),
|
||||
],
|
||||
options={
|
||||
'abstract': False,
|
||||
'unique_together': {('token', 'revoked')},
|
||||
},
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='accesstoken',
|
||||
name='source_refresh_token',
|
||||
field=models.OneToOneField(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='refreshed_access_token', to=settings.OAUTH2_PROVIDER_REFRESH_TOKEN_MODEL),
|
||||
),
|
||||
]
|
||||
0
src/backend/mailbox_oauth2/migrations/__init__.py
Normal file
0
src/backend/mailbox_oauth2/migrations/__init__.py
Normal file
78
src/backend/mailbox_oauth2/models.py
Normal file
78
src/backend/mailbox_oauth2/models.py
Normal file
@@ -0,0 +1,78 @@
|
||||
"""
|
||||
Project custom models for the OAuth2 provider.
|
||||
|
||||
We replace the former user model with a mailbox model.
|
||||
"""
|
||||
|
||||
from django.db import models
|
||||
|
||||
from oauth2_provider.models import (
|
||||
AbstractAccessToken,
|
||||
AbstractGrant,
|
||||
AbstractIDToken,
|
||||
AbstractRefreshToken,
|
||||
)
|
||||
|
||||
|
||||
class Grant(AbstractGrant):
|
||||
"""
|
||||
A Grant instance represents a token with a short lifetime that can
|
||||
be swapped for an access token, as described in :rfc:`4.1.2`
|
||||
|
||||
Replaces the user with a mailbox instance.
|
||||
"""
|
||||
|
||||
user = models.ForeignKey(
|
||||
"mailbox_manager.Mailbox",
|
||||
on_delete=models.CASCADE,
|
||||
related_name="%(app_label)s_%(class)s",
|
||||
)
|
||||
|
||||
|
||||
class IDToken(AbstractIDToken):
|
||||
"""
|
||||
An IDToken instance represents the actual token to
|
||||
access user's resources, as in :openid:`2`.
|
||||
|
||||
Replaces the user with a mailbox instance.
|
||||
"""
|
||||
|
||||
user = models.ForeignKey(
|
||||
"mailbox_manager.Mailbox",
|
||||
on_delete=models.CASCADE,
|
||||
blank=True,
|
||||
null=True,
|
||||
related_name="%(app_label)s_%(class)s",
|
||||
)
|
||||
|
||||
|
||||
class AccessToken(AbstractAccessToken):
|
||||
"""
|
||||
An AccessToken instance represents the actual access token to
|
||||
access user's resources, as in :rfc:`5`.
|
||||
|
||||
Replaces the user with a mailbox instance.
|
||||
"""
|
||||
|
||||
user = models.ForeignKey(
|
||||
"mailbox_manager.Mailbox",
|
||||
on_delete=models.CASCADE,
|
||||
blank=True,
|
||||
null=True,
|
||||
related_name="%(app_label)s_%(class)s",
|
||||
)
|
||||
|
||||
|
||||
class RefreshToken(AbstractRefreshToken):
|
||||
"""
|
||||
A RefreshToken instance represents a token that can be swapped for a new
|
||||
access token when it expires.
|
||||
|
||||
Replaces the user with a mailbox instance.
|
||||
"""
|
||||
|
||||
user = models.ForeignKey(
|
||||
"mailbox_manager.Mailbox",
|
||||
on_delete=models.CASCADE,
|
||||
related_name="%(app_label)s_%(class)s",
|
||||
)
|
||||
48
src/backend/mailbox_oauth2/serializers.py
Normal file
48
src/backend/mailbox_oauth2/serializers.py
Normal file
@@ -0,0 +1,48 @@
|
||||
"""
|
||||
Serializers for the mailbox_oauth2 app.
|
||||
"""
|
||||
|
||||
from django.contrib.auth import authenticate
|
||||
|
||||
from rest_framework import serializers
|
||||
|
||||
|
||||
class LoginSerializer(serializers.Serializer):
|
||||
"""
|
||||
Serializer for user login authentication.
|
||||
|
||||
Validates the email and password fields required for user authentication.
|
||||
"""
|
||||
|
||||
email = serializers.EmailField(
|
||||
help_text="User's email address for authentication",
|
||||
required=True,
|
||||
)
|
||||
password = serializers.CharField(
|
||||
write_only=True,
|
||||
help_text="User's password for authentication",
|
||||
required=True,
|
||||
)
|
||||
|
||||
def create(self, validated_data):
|
||||
"""Do not allow creating instances from this serializer."""
|
||||
raise RuntimeError("LoginSerializer does not support create method")
|
||||
|
||||
def update(self, instance, validated_data):
|
||||
"""Do not allow updating instances from this serializer."""
|
||||
raise RuntimeError("LoginSerializer does not support update method")
|
||||
|
||||
def validate(self, attrs):
|
||||
"""
|
||||
Validate the email and password fields.
|
||||
"""
|
||||
email = attrs.get("email")
|
||||
password = attrs.get("password")
|
||||
|
||||
if not (email and password):
|
||||
raise serializers.ValidationError('Must include "email" and "password"')
|
||||
|
||||
attrs["user"] = authenticate(
|
||||
self.context["request"], email=email, password=password
|
||||
)
|
||||
return attrs
|
||||
1
src/backend/mailbox_oauth2/tests/__init__.py
Normal file
1
src/backend/mailbox_oauth2/tests/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Tests for OAuth2 authentication with OIDC provider using mailbox."""
|
||||
270
src/backend/mailbox_oauth2/tests/test_backends.py
Normal file
270
src/backend/mailbox_oauth2/tests/test_backends.py
Normal file
@@ -0,0 +1,270 @@
|
||||
"""Test authentication backend for OIDC provider."""
|
||||
|
||||
from django.contrib.auth.hashers import make_password
|
||||
|
||||
import pytest
|
||||
|
||||
from core import factories as core_factories
|
||||
|
||||
from mailbox_manager import factories
|
||||
from mailbox_oauth2.backends import MailboxModelBackend, get_username_domain_from_email
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
|
||||
def test_authenticate_valid_credentials():
|
||||
"""Test authentication with valid credentials."""
|
||||
organization = core_factories.OrganizationFactory(with_registration_id=True)
|
||||
domain = factories.MailDomainEnabledFactory(organization=organization)
|
||||
mailbox = factories.MailboxEnabledFactory(domain=domain)
|
||||
|
||||
assert domain.is_identity_provider_ready()
|
||||
|
||||
authenticated_user = MailboxModelBackend().authenticate(
|
||||
None, email=f"{mailbox.local_part}@{domain.name}", password="password"
|
||||
)
|
||||
|
||||
assert authenticated_user == mailbox
|
||||
|
||||
# Is case insensitive
|
||||
authenticated_user = MailboxModelBackend().authenticate(
|
||||
None, email=f"{mailbox.local_part.upper()}@{domain.name}", password="password"
|
||||
)
|
||||
|
||||
assert authenticated_user == mailbox
|
||||
|
||||
|
||||
def test_authenticate_no_organization():
|
||||
"""Test authentication when domain don't have organization."""
|
||||
domain = factories.MailDomainEnabledFactory()
|
||||
mailbox = factories.MailboxEnabledFactory(domain=domain)
|
||||
|
||||
assert not domain.is_identity_provider_ready()
|
||||
|
||||
authenticated_user = MailboxModelBackend().authenticate(
|
||||
None, email=f"{mailbox.local_part}@{domain.name}", password="password"
|
||||
)
|
||||
|
||||
assert authenticated_user is None
|
||||
|
||||
|
||||
def test_authenticate_invalid_email_format():
|
||||
"""Test authentication with invalid email format."""
|
||||
|
||||
authenticated_user = MailboxModelBackend().authenticate(
|
||||
None, email="invalid-email", password="any-password"
|
||||
)
|
||||
|
||||
assert authenticated_user is None
|
||||
|
||||
|
||||
def test_authenticate_nonexistent_user():
|
||||
"""Test authentication with non-existent user."""
|
||||
authenticated_user = MailboxModelBackend().authenticate(
|
||||
None, email="nonexistent@domain.com", password="any-password"
|
||||
)
|
||||
|
||||
assert authenticated_user is None
|
||||
|
||||
|
||||
def test_authenticate_wrong_password():
|
||||
"""Test authentication with wrong password."""
|
||||
organization = core_factories.OrganizationFactory(with_registration_id=True)
|
||||
domain = factories.MailDomainEnabledFactory(organization=organization)
|
||||
mailbox = factories.MailboxEnabledFactory(domain=domain)
|
||||
|
||||
assert domain.is_identity_provider_ready()
|
||||
|
||||
authenticated_user = MailboxModelBackend().authenticate(
|
||||
None, email=f"{mailbox.local_part}@{domain.name}", password="wrong-password"
|
||||
)
|
||||
|
||||
assert authenticated_user is None
|
||||
|
||||
|
||||
def test_authenticate_without_middleware():
|
||||
"""Test authentication without required middleware."""
|
||||
authenticated_user = MailboxModelBackend().authenticate(
|
||||
None, email="any@domain.com", password="any-password"
|
||||
)
|
||||
|
||||
assert authenticated_user is None
|
||||
|
||||
|
||||
def test_authenticate_inactive_domain():
|
||||
"""Test authentication with inactive identity provider domain."""
|
||||
organization = core_factories.OrganizationFactory(with_registration_id=True)
|
||||
domain = factories.MailDomainFactory(organization=organization)
|
||||
mailbox = factories.MailboxEnabledFactory(domain=domain)
|
||||
|
||||
assert not domain.is_identity_provider_ready()
|
||||
|
||||
authenticated_user = MailboxModelBackend().authenticate(
|
||||
None, email=f"{mailbox.local_part}@{domain.name}", password="password"
|
||||
)
|
||||
|
||||
assert authenticated_user is None
|
||||
|
||||
|
||||
def test_authenticate_unusable_password():
|
||||
"""
|
||||
Test authentication with valid mailbox. but with unusable password.
|
||||
This test is important because we use "make_password(None)" to generate
|
||||
an unusable password for the mailbox (instead of set_unusable_password()
|
||||
which would require an extra query).
|
||||
"""
|
||||
organization = core_factories.OrganizationFactory(with_registration_id=True)
|
||||
domain = factories.MailDomainEnabledFactory(organization=organization)
|
||||
mailbox = factories.MailboxEnabledFactory(domain=domain)
|
||||
mailbox.password = make_password(None)
|
||||
mailbox.save()
|
||||
|
||||
assert domain.is_identity_provider_ready()
|
||||
|
||||
authenticated_user = MailboxModelBackend().authenticate(
|
||||
None, email=f"{mailbox.local_part}@{domain.name}", password=mailbox.password
|
||||
)
|
||||
|
||||
assert authenticated_user is None
|
||||
|
||||
|
||||
def test_get_user_exists():
|
||||
"""Test get_user with existing user."""
|
||||
organization = core_factories.OrganizationFactory(with_registration_id=True)
|
||||
domain = factories.MailDomainEnabledFactory(organization=organization)
|
||||
mailbox = factories.MailboxEnabledFactory(domain=domain)
|
||||
|
||||
user = MailboxModelBackend().get_user(mailbox.pk)
|
||||
|
||||
assert user == mailbox
|
||||
|
||||
|
||||
def test_get_user_does_not_exist():
|
||||
"""Test get_user with non-existent user."""
|
||||
user = MailboxModelBackend().get_user(999999)
|
||||
|
||||
assert user is None
|
||||
|
||||
|
||||
def test_authenticate_with_username_only():
|
||||
"""Test authentication fails when only username is provided (no email)."""
|
||||
authenticated_user = MailboxModelBackend().authenticate(
|
||||
None, username="test", password="password"
|
||||
)
|
||||
assert authenticated_user is None
|
||||
|
||||
|
||||
def test_authenticate_sql_injection():
|
||||
"""Test authentication is safe against SQL injection attempts."""
|
||||
organization = core_factories.OrganizationFactory(with_registration_id=True)
|
||||
domain = factories.MailDomainEnabledFactory(organization=organization)
|
||||
mailbox = factories.MailboxEnabledFactory(domain=domain)
|
||||
|
||||
# Test SQL injection in email field
|
||||
authenticated_user = MailboxModelBackend().authenticate(
|
||||
None, email="' OR '1'='1", password="password"
|
||||
)
|
||||
assert authenticated_user is None
|
||||
|
||||
# Test SQL injection in password field
|
||||
authenticated_user = MailboxModelBackend().authenticate(
|
||||
None, email=f"{mailbox.local_part}@{domain.name}", password="' OR '1'='1"
|
||||
)
|
||||
assert authenticated_user is None
|
||||
|
||||
|
||||
def test_get_inactive_mailbox():
|
||||
"""Test get_user with inactive user."""
|
||||
organization = core_factories.OrganizationFactory(with_registration_id=True)
|
||||
domain = factories.MailDomainEnabledFactory(organization=organization)
|
||||
mailbox = factories.MailboxFactory(domain=domain)
|
||||
|
||||
user = MailboxModelBackend().get_user(mailbox.pk)
|
||||
assert user is None
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"email, expected_username, expected_domain",
|
||||
[
|
||||
("test@example.com", "test", "example.com"),
|
||||
("test+label@example.com", "test+label", "example.com"),
|
||||
("invalid-email", None, None),
|
||||
("", None, None),
|
||||
("üser@exämple.com", None, None),
|
||||
("user@admin@example.com", None, None),
|
||||
("unicodeonly@üser.com", "unicodeonly", "üser.com"),
|
||||
("spaces in@domain.com", None, None),
|
||||
("verylong" + "a" * 1000 + "@example.com", None, None),
|
||||
("verylong@ex" + "a" * 1000 + "mple.com", None, None),
|
||||
("weird\0char@domain.com", None, None),
|
||||
("@domain.com", None, None),
|
||||
("username@", None, None),
|
||||
("quotes'and,commas@example.com", None, None),
|
||||
],
|
||||
)
|
||||
def test_get_username_domain_from_email(email, expected_username, expected_domain):
|
||||
"""Test extracting username and domain from email."""
|
||||
username, domain = get_username_domain_from_email(email)
|
||||
assert username == expected_username
|
||||
assert domain == expected_domain
|
||||
|
||||
|
||||
def test_login_attempts_tracking(settings):
|
||||
"""Test tracking of failed login attempts."""
|
||||
organization = core_factories.OrganizationFactory(with_registration_id=True)
|
||||
domain = factories.MailDomainEnabledFactory(organization=organization)
|
||||
mailbox = factories.MailboxEnabledFactory(domain=domain)
|
||||
email = f"{mailbox.local_part}@{domain.name}"
|
||||
backend = MailboxModelBackend()
|
||||
|
||||
# First attempts should allow authentication (but fail due to wrong password)
|
||||
for _ in range(settings.MAX_LOGIN_ATTEMPTS - 1):
|
||||
authenticated_user = backend.authenticate(
|
||||
None, email=email, password="wrong-password"
|
||||
)
|
||||
assert authenticated_user is None
|
||||
|
||||
# Last attempt before lockout
|
||||
authenticated_user = backend.authenticate(
|
||||
None, email=email, password="wrong-password"
|
||||
)
|
||||
assert authenticated_user is None
|
||||
|
||||
# Account should now be locked
|
||||
authenticated_user = backend.authenticate(None, email=email, password="password")
|
||||
assert authenticated_user is None
|
||||
|
||||
|
||||
def test_login_attempts_reset_on_success(settings):
|
||||
"""Test that successful login resets the failed attempts counter."""
|
||||
organization = core_factories.OrganizationFactory(with_registration_id=True)
|
||||
domain = factories.MailDomainEnabledFactory(organization=organization)
|
||||
mailbox = factories.MailboxEnabledFactory(domain=domain)
|
||||
email = f"{mailbox.local_part}@{domain.name}"
|
||||
backend = MailboxModelBackend()
|
||||
|
||||
# Make some failed attempts
|
||||
for _ in range(settings.MAX_LOGIN_ATTEMPTS - 1):
|
||||
authenticated_user = backend.authenticate(
|
||||
None, email=email, password="wrong-password"
|
||||
)
|
||||
assert authenticated_user is None
|
||||
|
||||
# Successful login should reset counter
|
||||
authenticated_user = backend.authenticate(None, email=email, password="password")
|
||||
assert authenticated_user == mailbox
|
||||
|
||||
# Should be able to attempt again after reset
|
||||
authenticated_user = backend.authenticate(
|
||||
None, email=email, password="wrong-password"
|
||||
)
|
||||
assert authenticated_user is None
|
||||
|
||||
|
||||
def test_login_attempts_cache_key():
|
||||
"""Test the cache key generation for login attempts."""
|
||||
backend = MailboxModelBackend()
|
||||
email = "test@example.com"
|
||||
|
||||
cache_key = backend._get_cache_key(email) # pylint: disable=protected-access
|
||||
assert cache_key == "login_attempts_test_at_example_dot_com"
|
||||
150
src/backend/mailbox_oauth2/tests/test_middleware.py
Normal file
150
src/backend/mailbox_oauth2/tests/test_middleware.py
Normal file
@@ -0,0 +1,150 @@
|
||||
"""
|
||||
Tests for the mailbox_oauth2.middleware.one_time_email_authenticated_session
|
||||
middleware.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from oauth2_provider.models import Application
|
||||
|
||||
from core import factories as core_factories
|
||||
|
||||
from mailbox_manager import factories
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
|
||||
@pytest.fixture(name="authorize_data")
|
||||
def authorize_data_fixture():
|
||||
"""Return the authorize data for the OIDC IdP process."""
|
||||
yield {
|
||||
"scope": "openid",
|
||||
"state": (
|
||||
"3C0yk2i25wrx6fNf9zn9287idFqFHGsGIu7UhuJaP0I"
|
||||
".xYBWCFWCFmQ.hpSB0Fd0TmS8MP7cfFiVjw"
|
||||
".eyJydSI6Imh0dHBzOi8vZGVzay4xMjcuMC4wLjEubml"
|
||||
"wLmlvL2FwaS92MS4wL2NhbGxiYWNrLyIsInJ0IjoiY29"
|
||||
"kZSIsInN0IjoiZ2MzazVSdzREZ0tySERBbHlYaW9vaXg"
|
||||
"wa2IzVkMyMTMifQ"
|
||||
),
|
||||
"response_type": "code",
|
||||
"client_id": "people-idp",
|
||||
"redirect_uri": "https://test",
|
||||
"acr_values": "eidas1",
|
||||
"code_challenge": "36Tcgz62tUu7XvNj_g_jYu6IBi-j7BL-5ZwkW-rI9qc",
|
||||
"code_challenge_method": "S256",
|
||||
"nonce": "9CTyx0RNzP6kkywLyK6pwQ",
|
||||
}
|
||||
|
||||
|
||||
def test_one_time_email_authenticated_session_flow_unallowed_url(client):
|
||||
"""
|
||||
Test the middleware with a user that is authenticated during the
|
||||
OIDC IdP process cannot access random pages.
|
||||
"""
|
||||
organization = core_factories.OrganizationFactory(with_registration_id=True)
|
||||
domain = factories.MailDomainEnabledFactory(
|
||||
organization=organization, name="example.com"
|
||||
)
|
||||
factories.MailboxEnabledFactory(domain=domain, local_part="user")
|
||||
|
||||
response = client.post(
|
||||
"/api/v1.0/login/", {"email": "user@example.com", "password": "password"}
|
||||
)
|
||||
assert response.status_code == 200
|
||||
|
||||
# assert the user has a session
|
||||
assert not client.session.is_empty()
|
||||
|
||||
response = client.get("/api/v1.0/users/me/")
|
||||
assert response.status_code == 400
|
||||
|
||||
# assert the user has no more session
|
||||
assert client.session.is_empty()
|
||||
|
||||
|
||||
def test_one_time_email_authenticated_session_flow_allowed_url(client, authorize_data):
|
||||
"""
|
||||
Test the middleware with a user that is authenticated during the
|
||||
OIDC IdP process can access the OIDC authorize page.
|
||||
"""
|
||||
organization = core_factories.OrganizationFactory(with_registration_id=True)
|
||||
domain = factories.MailDomainEnabledFactory(
|
||||
organization=organization, name="example.com"
|
||||
)
|
||||
mailbox = factories.MailboxEnabledFactory(domain=domain, local_part="user")
|
||||
|
||||
response = client.post(
|
||||
"/api/v1.0/login/", {"email": "user@example.com", "password": "password"}
|
||||
)
|
||||
assert response.status_code == 200
|
||||
|
||||
# assert the user has a session
|
||||
assert not client.session.is_empty()
|
||||
|
||||
# to properly test the /o/authorize/ we need to setup OIDC identity provider
|
||||
Application.objects.create(
|
||||
name="people-idp",
|
||||
client_id="people-idp",
|
||||
redirect_uris="https://test",
|
||||
client_type=Application.CLIENT_CONFIDENTIAL,
|
||||
authorization_grant_type=Application.GRANT_AUTHORIZATION_CODE,
|
||||
)
|
||||
|
||||
response = client.get(
|
||||
f"/o/authorize/?{'&'.join(f'{k}={v}' for k, v in authorize_data.items())}"
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.context["user"] == mailbox
|
||||
|
||||
# assert the user has a session
|
||||
assert not client.session.is_empty()
|
||||
|
||||
response = client.post(
|
||||
"/o/authorize/",
|
||||
authorize_data,
|
||||
)
|
||||
assert response.status_code == 302
|
||||
|
||||
# assert the user has no more session
|
||||
assert client.session.is_empty()
|
||||
|
||||
|
||||
def test_one_time_email_authenticated_session_flow_allowed_url_skip_authorization(
|
||||
client,
|
||||
authorize_data,
|
||||
):
|
||||
"""
|
||||
Test the middleware with a user that is authenticated during the
|
||||
OIDC IdP process can access the OIDC authorize page.
|
||||
"""
|
||||
organization = core_factories.OrganizationFactory(with_registration_id=True)
|
||||
domain = factories.MailDomainEnabledFactory(
|
||||
organization=organization, name="example.com"
|
||||
)
|
||||
factories.MailboxEnabledFactory(domain=domain, local_part="user")
|
||||
|
||||
response = client.post(
|
||||
"/api/v1.0/login/", {"email": "user@example.com", "password": "password"}
|
||||
)
|
||||
assert response.status_code == 200
|
||||
|
||||
# assert the user has a session
|
||||
assert not client.session.is_empty()
|
||||
|
||||
# to properly test the /o/authorize/ we need to setup OIDC identity provider
|
||||
Application.objects.create(
|
||||
name="people-idp",
|
||||
client_id="people-idp",
|
||||
redirect_uris="https://test",
|
||||
client_type=Application.CLIENT_CONFIDENTIAL,
|
||||
authorization_grant_type=Application.GRANT_AUTHORIZATION_CODE,
|
||||
skip_authorization=True,
|
||||
)
|
||||
|
||||
response = client.get(
|
||||
f"/o/authorize/?{'&'.join(f'{k}={v}' for k, v in authorize_data.items())}"
|
||||
)
|
||||
assert response.status_code == 302
|
||||
|
||||
# assert the user has no more session
|
||||
assert client.session.is_empty()
|
||||
194
src/backend/mailbox_oauth2/tests/test_validators.py
Normal file
194
src/backend/mailbox_oauth2/tests/test_validators.py
Normal file
@@ -0,0 +1,194 @@
|
||||
"""Tests for OAuth2 validators."""
|
||||
|
||||
from django.contrib.auth.models import AnonymousUser
|
||||
|
||||
import pytest
|
||||
from oauth2_provider.models import Application
|
||||
|
||||
from core import factories as core_factories
|
||||
|
||||
from mailbox_manager import factories
|
||||
from mailbox_oauth2.validators import BaseValidator, ProConnectValidator
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
|
||||
@pytest.fixture(name="mailbox")
|
||||
def mailbox_fixture():
|
||||
"""Create a mailbox for testing."""
|
||||
organization = core_factories.OrganizationFactory(with_registration_id=True)
|
||||
domain = factories.MailDomainEnabledFactory(
|
||||
organization=organization, name="example.com"
|
||||
)
|
||||
return factories.MailboxEnabledFactory(
|
||||
domain=domain, local_part="user", first_name="John", last_name="Doe"
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(name="oauth_request_authenticated")
|
||||
def oauth_request_authenticated_fixture(mailbox):
|
||||
"""Create a mock OAuth request object with authenticated user."""
|
||||
|
||||
class MockRequest: # pylint: disable=missing-class-docstring
|
||||
def __init__(self):
|
||||
self.user = mailbox
|
||||
self.scopes = set()
|
||||
self.claims = None
|
||||
self.acr_values = None
|
||||
|
||||
return MockRequest()
|
||||
|
||||
|
||||
@pytest.fixture(name="oauth_request_anonymous")
|
||||
def oauth_request_anonymous_fixture():
|
||||
"""Create a mock OAuth request object with anonymous user."""
|
||||
|
||||
class MockRequest: # pylint: disable=missing-class-docstring
|
||||
def __init__(self):
|
||||
self.user = AnonymousUser()
|
||||
self.scopes = set()
|
||||
self.claims = None
|
||||
self.acr_values = None
|
||||
|
||||
return MockRequest()
|
||||
|
||||
|
||||
@pytest.fixture(name="oauth_request_for_auth_code")
|
||||
def oauth_request_for_auth_code_fixture(oauth_request_authenticated):
|
||||
"""Create a mock OAuth request object with full authorization code attributes."""
|
||||
|
||||
class MockRequestWithClient: # pylint: disable=missing-class-docstring,too-many-instance-attributes
|
||||
def __init__(self, base_request):
|
||||
self.user = base_request.user
|
||||
self.scopes = {"openid"}
|
||||
self.claims = None
|
||||
self.acr_values = None
|
||||
# Required OAuth2 attributes for authorization code
|
||||
self.client = Application.objects.create(
|
||||
name="test_app",
|
||||
client_type=Application.CLIENT_CONFIDENTIAL,
|
||||
authorization_grant_type=Application.GRANT_AUTHORIZATION_CODE,
|
||||
skip_authorization=True,
|
||||
)
|
||||
self.redirect_uri = "https://example.com/callback"
|
||||
self.code_challenge = "test_challenge"
|
||||
self.code_challenge_method = "S256"
|
||||
self.nonce = "test_nonce"
|
||||
|
||||
return MockRequestWithClient(oauth_request_authenticated)
|
||||
|
||||
|
||||
# Base Validator Tests
|
||||
def test_get_additional_claims_basic(oauth_request_authenticated):
|
||||
"""Test basic additional claims without scopes."""
|
||||
validator = BaseValidator()
|
||||
claims = validator.get_additional_claims(oauth_request_authenticated)
|
||||
|
||||
assert claims["sub"] == str(oauth_request_authenticated.user.pk)
|
||||
assert claims["amr"] == "pwd"
|
||||
assert "email" not in claims
|
||||
|
||||
|
||||
def test_get_additional_claims_with_email_scope(oauth_request_authenticated):
|
||||
"""Test additional claims with email scope."""
|
||||
validator = BaseValidator()
|
||||
oauth_request_authenticated.scopes = {"email"}
|
||||
claims = validator.get_additional_claims(oauth_request_authenticated)
|
||||
|
||||
assert claims["email"] == oauth_request_authenticated.user.get_email()
|
||||
|
||||
|
||||
def test_validate_silent_authorization_authenticated(oauth_request_authenticated):
|
||||
"""Test silent authorization with authenticated user."""
|
||||
validator = BaseValidator()
|
||||
assert validator.validate_silent_authorization(oauth_request_authenticated) is True
|
||||
|
||||
|
||||
def test_validate_silent_authorization_unauthenticated(oauth_request_anonymous):
|
||||
"""Test silent authorization with unauthenticated user."""
|
||||
validator = BaseValidator()
|
||||
assert validator.validate_silent_authorization(oauth_request_anonymous) is False
|
||||
|
||||
|
||||
def test_validate_silent_login_authenticated(oauth_request_authenticated):
|
||||
"""Test silent login with authenticated user."""
|
||||
validator = BaseValidator()
|
||||
assert validator.validate_silent_login(oauth_request_authenticated) is True
|
||||
|
||||
|
||||
def test_validate_silent_login_unauthenticated(oauth_request_anonymous):
|
||||
"""Test silent login with unauthenticated user."""
|
||||
validator = BaseValidator()
|
||||
assert validator.validate_silent_login(oauth_request_anonymous) is False
|
||||
|
||||
|
||||
def test_introspect_token_not_implemented():
|
||||
"""Test that introspect_token raises RuntimeError."""
|
||||
validator = BaseValidator()
|
||||
with pytest.raises(RuntimeError, match="Introspection not implemented"):
|
||||
validator.introspect_token(None, None, None)
|
||||
|
||||
|
||||
# ProConnect Validator Tests
|
||||
def test_proconnect_get_additional_claims_given_name(oauth_request_authenticated):
|
||||
"""Test getting given_name claim."""
|
||||
validator = ProConnectValidator()
|
||||
oauth_request_authenticated.scopes = {"given_name"}
|
||||
claims = validator.get_additional_claims(oauth_request_authenticated)
|
||||
assert claims["given_name"] == "John"
|
||||
|
||||
|
||||
def test_proconnect_get_additional_claims_usual_name(oauth_request_authenticated):
|
||||
"""Test getting usual_name claim."""
|
||||
validator = ProConnectValidator()
|
||||
oauth_request_authenticated.scopes = {"usual_name"}
|
||||
claims = validator.get_additional_claims(oauth_request_authenticated)
|
||||
assert claims["usual_name"] == "Doe"
|
||||
|
||||
|
||||
def test_proconnect_get_additional_claims_siret(oauth_request_authenticated):
|
||||
"""Test getting siret claim."""
|
||||
validator = ProConnectValidator()
|
||||
oauth_request_authenticated.scopes = {"siret"}
|
||||
claims = validator.get_additional_claims(oauth_request_authenticated)
|
||||
assert (
|
||||
claims["siret"]
|
||||
== oauth_request_authenticated.user.domain.organization.registration_id_list[0]
|
||||
)
|
||||
|
||||
|
||||
def test_proconnect_get_additional_claims_with_acr_claim(oauth_request_authenticated):
|
||||
"""Test getting acr claim when eidas1 is requested."""
|
||||
validator = ProConnectValidator()
|
||||
oauth_request_authenticated.claims = {"acr": "eidas1"}
|
||||
claims = validator.get_additional_claims(oauth_request_authenticated)
|
||||
assert claims["acr"] == "eidas1"
|
||||
|
||||
|
||||
def test_proconnect_get_additional_claims_without_acr_claim(
|
||||
oauth_request_authenticated,
|
||||
):
|
||||
"""Test no acr claim when not requested."""
|
||||
validator = ProConnectValidator()
|
||||
claims = validator.get_additional_claims(oauth_request_authenticated)
|
||||
assert "acr" not in claims
|
||||
|
||||
|
||||
def test_proconnect_create_authorization_code_with_eidas1(oauth_request_for_auth_code):
|
||||
"""Test creating authorization code with eidas1 acr value."""
|
||||
validator = ProConnectValidator()
|
||||
oauth_request_for_auth_code.acr_values = "eidas1"
|
||||
code = {"code": "test_code"} # OAuth2 provider expects a dict with 'code' key
|
||||
validator._create_authorization_code(oauth_request_for_auth_code, code) # pylint: disable=protected-access
|
||||
assert oauth_request_for_auth_code.claims == {"acr": "eidas1"}
|
||||
|
||||
|
||||
def test_proconnect_create_authorization_code_without_eidas1(
|
||||
oauth_request_for_auth_code,
|
||||
):
|
||||
"""Test creating authorization code without eidas1 acr value."""
|
||||
validator = ProConnectValidator()
|
||||
oauth_request_for_auth_code.acr_values = "other_value"
|
||||
code = {"code": "test_code"} # OAuth2 provider expects a dict with 'code' key
|
||||
validator._create_authorization_code(oauth_request_for_auth_code, code) # pylint: disable=protected-access
|
||||
assert not oauth_request_for_auth_code.claims
|
||||
83
src/backend/mailbox_oauth2/tests/test_views.py
Normal file
83
src/backend/mailbox_oauth2/tests/test_views.py
Normal file
@@ -0,0 +1,83 @@
|
||||
"""Tests for the mailbox_oauth2.views module."""
|
||||
|
||||
import datetime
|
||||
|
||||
from django.utils import timezone
|
||||
|
||||
import pytest
|
||||
|
||||
from core import factories as core_factories
|
||||
|
||||
from mailbox_manager import factories
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
|
||||
def test_login_view_options(client):
|
||||
"""Test the OPTIONS method on the login view."""
|
||||
response = client.options("/api/v1.0/login/")
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.headers == {
|
||||
"Content-Type": "application/json",
|
||||
"Vary": "Accept, Authorization, origin, Accept-Language, Cookie",
|
||||
"Allow": "POST, OPTIONS",
|
||||
"Content-Length": "209",
|
||||
"X-Frame-Options": "DENY",
|
||||
"Content-Language": "en-us",
|
||||
"X-Content-Type-Options": "nosniff",
|
||||
"Referrer-Policy": "same-origin",
|
||||
"Cross-Origin-Opener-Policy": "same-origin",
|
||||
}
|
||||
|
||||
|
||||
def test_login_view_authorize(client):
|
||||
"""Test the login view with valid data."""
|
||||
organization = core_factories.OrganizationFactory(with_registration_id=True)
|
||||
domain = factories.MailDomainEnabledFactory(
|
||||
organization=organization, name="example.com"
|
||||
)
|
||||
factories.MailboxEnabledFactory(domain=domain, local_part="user")
|
||||
|
||||
response = client.post(
|
||||
"/api/v1.0/login/", {"email": "user@example.com", "password": "password"}
|
||||
)
|
||||
assert response.status_code == 200
|
||||
|
||||
# assert the user has a session
|
||||
assert not client.session.is_empty()
|
||||
assert client.session.get_expiry_date() < timezone.now() + datetime.timedelta(
|
||||
minutes=1
|
||||
)
|
||||
|
||||
assert response.headers == {
|
||||
"Content-Type": "application/json",
|
||||
"Vary": "Accept, Authorization, Cookie, origin, Accept-Language",
|
||||
"Allow": "POST, OPTIONS",
|
||||
"Content-Length": "36",
|
||||
"X-Frame-Options": "DENY",
|
||||
"Content-Language": "en-us",
|
||||
"X-Content-Type-Options": "nosniff",
|
||||
"Referrer-Policy": "same-origin",
|
||||
"Cross-Origin-Opener-Policy": "same-origin",
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"email, password, status_code, response_json",
|
||||
[
|
||||
("", "password", 400, {"email": ["This field may not be blank."]}),
|
||||
(
|
||||
"email@test.com",
|
||||
"",
|
||||
400,
|
||||
{"password": ["This field may not be blank."]},
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_login_view_invalid_data(client, email, password, status_code, response_json):
|
||||
"""Test the login view with invalid data."""
|
||||
response = client.post("/api/v1.0/login/", {"email": email, "password": password})
|
||||
|
||||
assert response.status_code == status_code
|
||||
assert response.json() == response_json
|
||||
9
src/backend/mailbox_oauth2/urls.py
Normal file
9
src/backend/mailbox_oauth2/urls.py
Normal file
@@ -0,0 +1,9 @@
|
||||
"""URLs for the mailbox_oauth2 app."""
|
||||
|
||||
from django.urls import path
|
||||
|
||||
from .views import LoginView
|
||||
|
||||
urlpatterns = [
|
||||
path("login/", LoginView.as_view(), name="api_login"),
|
||||
]
|
||||
180
src/backend/mailbox_oauth2/validators.py
Normal file
180
src/backend/mailbox_oauth2/validators.py
Normal file
@@ -0,0 +1,180 @@
|
||||
"""
|
||||
Module for OIDC authentication.
|
||||
|
||||
Contains all related code for OIDC authentication using
|
||||
people as an Identity Provider.
|
||||
"""
|
||||
|
||||
from oauth2_provider.oauth2_validators import OAuth2Validator
|
||||
|
||||
|
||||
class BaseValidator(OAuth2Validator):
|
||||
"""This validator adds additional claims to the token based on the requested scopes."""
|
||||
|
||||
def get_additional_claims(self, request):
|
||||
"""
|
||||
Generate additional claims to be included in the token.
|
||||
Warning, here the request.user is a Mailbox object.
|
||||
|
||||
Args:
|
||||
request: The OAuth2 request object containing user and scope information.
|
||||
|
||||
Returns:
|
||||
dict: A dictionary of additional claims to be included in the token.
|
||||
"""
|
||||
additional_claims = super().get_additional_claims(request)
|
||||
|
||||
# Enforce the use of the sub instead of the user pk as sub
|
||||
additional_claims["sub"] = str(request.user.pk)
|
||||
|
||||
# Authentication method reference
|
||||
additional_claims["amr"] = "pwd"
|
||||
|
||||
# Include the user's email if 'email' scope is requested
|
||||
if "email" in request.scopes:
|
||||
additional_claims["email"] = request.user.get_email()
|
||||
|
||||
return additional_claims
|
||||
|
||||
def introspect_token(self, token, token_type_hint, request, *args, **kwargs):
|
||||
"""Introspect an access or refresh token.
|
||||
|
||||
Called once the introspect request is validated. This method should
|
||||
verify the *token* and either return a dictionary with the list of
|
||||
claims associated, or `None` in case the token is unknown.
|
||||
|
||||
Below the list of registered claims you should be interested in:
|
||||
|
||||
- scope : space-separated list of scopes
|
||||
- client_id : client identifier
|
||||
- username : human-readable identifier for the resource owner
|
||||
- token_type : type of the token
|
||||
- exp : integer timestamp indicating when this token will expire
|
||||
- iat : integer timestamp indicating when this token was issued
|
||||
- nbf : integer timestamp indicating when it can be "not-before" used
|
||||
- sub : subject of the token - identifier of the resource owner
|
||||
- aud : list of string identifiers representing the intended audience
|
||||
- iss : string representing issuer of this token
|
||||
- jti : string identifier for the token
|
||||
|
||||
Note that most of them are coming directly from JWT RFC. More details
|
||||
can be found in `Introspect Claims`_ or `JWT Claims`_.
|
||||
|
||||
The implementation can use *token_type_hint* to improve lookup
|
||||
efficiency, but must fallback to other types to be compliant with RFC.
|
||||
|
||||
The dict of claims is added to request.token after this method.
|
||||
"""
|
||||
raise RuntimeError("Introspection not implemented")
|
||||
|
||||
def validate_silent_authorization(self, request):
|
||||
"""Ensure the logged in user has authorized silent OpenID authorization.
|
||||
|
||||
Silent OpenID authorization allows access tokens and id tokens to be
|
||||
granted to clients without any user prompt or interaction.
|
||||
|
||||
:param request: OAuthlib request.
|
||||
:type request: oauthlib.common.Request
|
||||
:rtype: True or False
|
||||
|
||||
Method is used by:
|
||||
- OpenIDConnectAuthCode
|
||||
- OpenIDConnectImplicit
|
||||
- OpenIDConnectHybrid
|
||||
"""
|
||||
return request.user.is_authenticated
|
||||
|
||||
def validate_silent_login(self, request):
|
||||
"""Ensure session user has authorized silent OpenID login.
|
||||
|
||||
If no user is logged in or has not authorized silent login, this
|
||||
method should return False.
|
||||
|
||||
If the user is logged in but associated with multiple accounts and
|
||||
not selected which one to link to the token then this method should
|
||||
raise an oauthlib.oauth2.AccountSelectionRequired error.
|
||||
|
||||
:param request: OAuthlib request.
|
||||
:type request: oauthlib.common.Request
|
||||
:rtype: True or False
|
||||
|
||||
Method is used by:
|
||||
- OpenIDConnectAuthCode
|
||||
- OpenIDConnectImplicit
|
||||
- OpenIDConnectHybrid
|
||||
"""
|
||||
return request.user.is_authenticated
|
||||
|
||||
|
||||
class ProConnectValidator(BaseValidator):
|
||||
"""
|
||||
This validator adds additional claims to be compatible with
|
||||
the french ProConnect API, but not only.
|
||||
"""
|
||||
|
||||
oidc_claim_scope = OAuth2Validator.oidc_claim_scope | {
|
||||
"given_name": "given_name",
|
||||
"usual_name": "usual_name",
|
||||
"siret": "profile",
|
||||
}
|
||||
|
||||
def get_additional_claims(self, request):
|
||||
"""
|
||||
Generate additional claims to be included in the token.
|
||||
|
||||
Args:
|
||||
request: The OAuth2 request object containing user and scope information.
|
||||
|
||||
Returns:
|
||||
dict: A dictionary of additional claims to be included in the token.
|
||||
"""
|
||||
additional_claims = super().get_additional_claims(request)
|
||||
|
||||
# Include the user's name if 'profile' scope is requested
|
||||
if "given_name" in request.scopes:
|
||||
additional_claims["given_name"] = request.user.first_name
|
||||
|
||||
if "usual_name" in request.scopes:
|
||||
additional_claims["usual_name"] = request.user.last_name
|
||||
|
||||
if "siret" in request.scopes:
|
||||
# The following line will fail on purpose if we don't have the proper information
|
||||
additional_claims["siret"] = (
|
||||
request.user.domain.organization.registration_id_list[0]
|
||||
)
|
||||
|
||||
# Include 'acr' claim if it is present in the request claims and equals 'eidas1'
|
||||
# see _create_authorization_code method for more details
|
||||
if request.claims and request.claims.get("acr") == "eidas1":
|
||||
additional_claims["acr"] = "eidas1"
|
||||
|
||||
return additional_claims
|
||||
|
||||
def _create_authorization_code(self, request, code, expires=None):
|
||||
"""
|
||||
Create an authorization code and handle 'acr_values' in the request.
|
||||
|
||||
Args:
|
||||
request: The OAuth2 request object containing user and scope information.
|
||||
code: The authorization code to be created.
|
||||
expires: The expiration time of the authorization code.
|
||||
|
||||
Returns:
|
||||
The created authorization code.
|
||||
"""
|
||||
# Split and strip 'acr_values' from the request, if present
|
||||
acr_values = (
|
||||
[value.strip() for value in request.acr_values.split(",")]
|
||||
if request.acr_values
|
||||
else []
|
||||
)
|
||||
|
||||
# If 'eidas1' is in 'acr_values', add 'acr' claim to the request claims
|
||||
# This allows the token to have this information and pass it to the /token
|
||||
# endpoint and return it in the token response
|
||||
if "eidas1" in acr_values:
|
||||
request.claims = request.claims or {}
|
||||
request.claims["acr"] = "eidas1"
|
||||
|
||||
# Call the superclass method to create the authorization code
|
||||
return super()._create_authorization_code(request, code, expires)
|
||||
43
src/backend/mailbox_oauth2/views.py
Normal file
43
src/backend/mailbox_oauth2/views.py
Normal file
@@ -0,0 +1,43 @@
|
||||
"""Views for handling OAuth2 authentication via API."""
|
||||
|
||||
import datetime
|
||||
|
||||
from django.contrib.auth import login
|
||||
|
||||
from rest_framework.permissions import AllowAny
|
||||
from rest_framework.response import Response
|
||||
from rest_framework.status import HTTP_401_UNAUTHORIZED
|
||||
from rest_framework.views import APIView
|
||||
|
||||
from .serializers import LoginSerializer
|
||||
|
||||
|
||||
class LoginView(APIView):
|
||||
"""Login view to allow users to authenticate and create a session from the frontend."""
|
||||
|
||||
permission_classes = [AllowAny]
|
||||
|
||||
def post(self, request):
|
||||
"""
|
||||
Authenticate user and create session.
|
||||
"""
|
||||
serializer = LoginSerializer(data=request.data, context={"request": request})
|
||||
serializer.is_valid(raise_exception=True)
|
||||
|
||||
# User is None if the credentials are invalid
|
||||
user = serializer.validated_data["user"]
|
||||
|
||||
if user is not None:
|
||||
login(request, user)
|
||||
# In this context we need a session long enough to allow the user to
|
||||
# authenticate to make the OIDC loop. A minute should be enough.
|
||||
# Even if the session is longer, the one_time_email_authenticated_session
|
||||
# middleware will kill the session as soon as the user tries to access
|
||||
# paths outside the OIDC process or when the OIDC process is done here.
|
||||
request.session.set_expiry(datetime.timedelta(minutes=1))
|
||||
return Response({"message": "Successfully logged in"})
|
||||
|
||||
return Response(
|
||||
{"error": "Invalid credentials"},
|
||||
status=HTTP_401_UNAUTHORIZED,
|
||||
)
|
||||
Reference in New Issue
Block a user