(backend) support Authorization code flow

Integrate 'mozilla-django-oidc' dependency, to support
Authorization Code flow, which is required by Agent Connect.

Thus, we provide a secure back channel OIDC flow, and return
to the client only a session cookie.

Done:
- Replace JWT authentication by Session based authentication in DRF
- Update Django settings to make OIDC configurations easily editable
- Add 'mozilla-django-oidc' routes to our router
- Implement a custom Django Authentication class to adapt
'mozilla-django-oidc' to our needs

'mozilla-django-oidc' routes added are:
- /authenticate
- /callback (the redirect_uri called back by the Idp)
- /logout
This commit is contained in:
Lebaud Antoine
2024-02-15 11:00:30 +01:00
committed by aleb_the_flash
parent ec28c28d47
commit 38c4d33791
11 changed files with 335 additions and 250 deletions

View File

@@ -1,14 +0,0 @@
"""
Utils that can be useful throughout the People core app
"""
from rest_framework_simplejwt.tokens import RefreshToken
def get_tokens_for_user(user):
"""Get JWT tokens for user authentication."""
refresh = RefreshToken.for_user(user)
return {
"refresh": str(refresh),
"access": str(refresh.access_token),
}

View File

@@ -1,59 +1,107 @@
"""Authentication for the People core app."""
from django.conf import settings
from django.utils.functional import SimpleLazyObject
from django.utils.module_loading import import_string
from django.core.exceptions import SuspiciousOperation
from django.db import models
from django.utils.translation import gettext_lazy as _
from drf_spectacular.authentication import SessionScheme, TokenScheme
from drf_spectacular.plumbing import build_bearer_security_scheme_object
from rest_framework import authentication
from rest_framework_simplejwt.authentication import JWTAuthentication
import requests
from mozilla_django_oidc.auth import (
OIDCAuthenticationBackend as MozillaOIDCAuthenticationBackend,
)
from .models import Identity
class DelegatedJWTAuthentication(JWTAuthentication):
"""Override JWTAuthentication to create missing users on the fly."""
class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
"""Custom OpenID Connect (OIDC) Authentication Backend.
def get_user(self, validated_token):
This class overrides the default OIDC Authentication Backend to accommodate differences
in the User and Identity models, and handles signed and/or encrypted UserInfo response.
"""
def get_userinfo(self, access_token, id_token, payload):
"""Return user details dictionary.
Parameters:
- access_token (str): The access token.
- id_token (str): The id token (unused).
- payload (dict): The token payload (unused).
Note: The id_token and payload parameters are unused in this implementation,
but were kept to preserve base method signature.
Note: It handles signed and/or encrypted UserInfo Response. It is required by
Agent Connect, which follows the OIDC standard. It forces us to override the
base method, which deal with 'application/json' response.
Returns:
- dict: User details dictionary obtained from the OpenID Connect user endpoint.
"""
Return the user related to the given validated token, creating or updating it if necessary.
user_response = requests.get(
self.OIDC_OP_USER_ENDPOINT,
headers={"Authorization": f"Bearer {access_token}"},
verify=self.get_settings("OIDC_VERIFY_SSL", True),
timeout=self.get_settings("OIDC_TIMEOUT", None),
proxies=self.get_settings("OIDC_PROXY", None),
)
user_response.raise_for_status()
userinfo = self.verify_token(user_response.text)
return userinfo
def get_or_create_user(self, access_token, id_token, payload):
"""Return a User based on userinfo. Get or create a new user if no user matches the Sub.
Parameters:
- access_token (str): The access token.
- id_token (str): The ID token.
- payload (dict): The user payload.
Returns:
- User: An existing or newly created User instance.
Raises:
- Exception: Raised when user creation is not allowed and no existing user is found.
"""
get_user = import_string(settings.JWT_USER_GETTER)
return SimpleLazyObject(lambda: get_user(validated_token))
user_info = self.get_userinfo(access_token, id_token, payload)
class OpenApiJWTAuthenticationExtension(TokenScheme):
"""Extension for specifying JWT authentication schemes."""
email = user_info.get("email")
sub = user_info.get("sub")
target_class = "core.authentication.DelegatedJWTAuthentication"
name = "DelegatedJWTAuthentication"
if sub is None:
raise SuspiciousOperation(
_("User info contained no recognizable user identification")
)
def get_security_definition(self, auto_schema):
"""Return the security definition for JWT authentication."""
return build_bearer_security_scheme_object(
header_name="Authorization",
token_prefix="Bearer", # noqa S106
user = (
self.UserModel.objects.filter(identities__sub=sub)
.annotate(identity_email=models.F("identities__email"))
.distinct()
.first()
)
if user:
if email and email != user.identity_email:
Identity.objects.filter(sub=sub).update(email=email)
class SessionAuthenticationWithAuthenticateHeader(authentication.SessionAuthentication):
"""
This class is needed, because REST Framework's default SessionAuthentication does
never return 401's, because they cannot fill the WWW-Authenticate header with a
valid value in the 401 response. As a result, we cannot distinguish calls that are
not unauthorized (401 unauthorized) and calls for which the user does not have
permission (403 forbidden).
See https://github.com/encode/django-rest-framework/issues/5968
elif self.get_settings("OIDC_CREATE_USER", True):
user = self.create_user(user_info)
We do set authenticate_header function in SessionAuthentication, so that a value
for the WWW-Authenticate header can be retrieved and the response code is
automatically set to 401 in case of unauthenticated requests.
"""
return user
def authenticate_header(self, request):
return "Session"
def create_user(self, claims):
"""Return a newly created User instance."""
email = claims.get("email")
sub = claims.get("sub")
class OpenApiSessionAuthenticationExtension(SessionScheme):
"""Extension for specifying session authentication schemes."""
if sub is None:
raise SuspiciousOperation(
_("Claims contained no recognizable user identification")
)
target_class = "core.api.authentication.SessionAuthenticationWithAuthenticateHeader"
user = self.UserModel.objects.create(password="!", email=email) # noqa: S106
Identity.objects.create(user=user, sub=sub, email=email)
return user

View File

@@ -17,8 +17,6 @@ from django.utils.text import slugify
from django.utils.translation import gettext_lazy as _
import jsonschema
from rest_framework_simplejwt.exceptions import InvalidToken
from rest_framework_simplejwt.settings import api_settings
from timezone_field import TimeZoneField
current_dir = os.path.dirname(os.path.abspath(__file__))
@@ -471,40 +469,3 @@ class Invitation(BaseModel):
"""Calculate if invitation is still valid or has expired."""
validity_duration = timedelta(seconds=settings.INVITATION_VALIDITY_DURATION)
return timezone.now() > (self.created_at + validity_duration)
def oidc_user_getter(validated_token):
"""
Given a valid OIDC token , retrieve, create or update corresponding user/contact/email from db.
The token is expected to have the following fields in payload:
- sub
- email
- ...
"""
try:
user_id = validated_token[api_settings.USER_ID_CLAIM]
except KeyError as exc:
raise InvalidToken(
_("Token contained no recognizable user identification")
) from exc
try:
email_param = {"email": validated_token["email"]}
except KeyError:
email_param = {}
user = (
User.objects.filter(identities__sub=user_id)
.annotate(identity_email=models.F("identities__email"))
.distinct()
.first()
)
if user is None:
user = User.objects.create(password="!", **email_param) # noqa: S106
Identity.objects.create(user=user, sub=user_id, **email_param)
elif email_param and validated_token["email"] != user.identity_email:
Identity.objects.filter(sub=user_id).update(email=validated_token["email"])
return user

View File

@@ -0,0 +1,164 @@
"""Unit tests for the `get_or_create_user` function."""
from django.core.exceptions import SuspiciousOperation
import pytest
from core import models
from core.authentication import OIDCAuthenticationBackend
from core.factories import IdentityFactory
pytestmark = pytest.mark.django_db
def test_authentication_getter_existing_user_no_email(
django_assert_num_queries, monkeypatch
):
"""
If an existing user matches the user's info sub, the user should be returned.
"""
klass = OIDCAuthenticationBackend()
# Create a user and its identity
identity = IdentityFactory()
# Create multiple identities for a user
for _ in range(5):
IdentityFactory(user=identity.user)
def get_userinfo_mocked(*args):
return {"sub": identity.sub}
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
with django_assert_num_queries(1):
user = klass.get_or_create_user(
access_token="test-token", id_token=None, payload=None
)
identity.refresh_from_db()
assert user == identity.user
def test_authentication_getter_existing_user_with_email(
django_assert_num_queries, monkeypatch
):
"""
When the user's info contains an email and targets an existing user,
it should update the email on the identity but not on the user.
"""
klass = OIDCAuthenticationBackend()
identity = IdentityFactory()
# Create multiple identities for a user
for _ in range(5):
IdentityFactory(user=identity.user)
user_email = identity.user.email
assert models.User.objects.count() == 1
def get_userinfo_mocked(*args):
return {"sub": identity.sub, "email": identity.email}
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
# Only 1 query if the email has not changed
with django_assert_num_queries(1):
user = klass.get_or_create_user(
access_token="test-token", id_token=None, payload=None
)
new_email = "test@fooo.com"
def get_userinfo_mocked_new_email(*args):
return {"sub": identity.sub, "email": new_email}
monkeypatch.setattr(
OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked_new_email
)
# Additional update query if the email has changed
with django_assert_num_queries(2):
user = klass.get_or_create_user(
access_token="test-token", id_token=None, payload=None
)
identity.refresh_from_db()
assert identity.email == new_email
assert models.User.objects.count() == 1
assert user == identity.user
assert user.email == user_email
def test_authentication_getter_new_user_no_email(monkeypatch):
"""
If no user matches the user's info sub, a user should be created.
User's info doesn't contain an email, created user's email should be empty.
"""
klass = OIDCAuthenticationBackend()
def get_userinfo_mocked(*args):
return {"sub": "123"}
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
user = klass.get_or_create_user(
access_token="test-token", id_token=None, payload=None
)
identity = user.identities.get()
assert identity.sub == "123"
assert identity.email is None
assert user.email is None
assert user.password == "!"
assert models.User.objects.count() == 1
def test_authentication_getter_new_user_with_email(monkeypatch):
"""
If no user matches the user's info sub, a user should be created.
User's info contains an email, created user's email should be set.
"""
klass = OIDCAuthenticationBackend()
email = "people@example.com"
def get_userinfo_mocked(*args):
return {"sub": "123", "email": email}
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
user = klass.get_or_create_user(
access_token="test-token", id_token=None, payload=None
)
identity = user.identities.get()
assert identity.sub == "123"
assert identity.email == email
assert user.email == email
assert models.User.objects.count() == 1
def test_models_oidc_user_getter_invalid_token(django_assert_num_queries, monkeypatch):
"""The user's info doesn't contain a sub."""
klass = OIDCAuthenticationBackend()
def get_userinfo_mocked(*args):
return {
"test": "123",
}
monkeypatch.setattr(OIDCAuthenticationBackend, "get_userinfo", get_userinfo_mocked)
with django_assert_num_queries(0), pytest.raises(
SuspiciousOperation,
match="User info contained no recognizable user identification",
):
klass.get_or_create_user(access_token="test-token", id_token=None, payload=None)
assert models.User.objects.exists() is False

View File

@@ -1,107 +0,0 @@
"""Unit tests for the `oidc_user_getter` function."""
import pytest
from rest_framework_simplejwt.exceptions import InvalidToken
from rest_framework_simplejwt.tokens import AccessToken
from core import factories, models
pytestmark = pytest.mark.django_db
def test_models_oidc_user_getter_existing_user_no_email(django_assert_num_queries):
"""
When a valid token is passed, an existing user matching the token's sub should be returned.
"""
identity = factories.IdentityFactory()
factories.IdentityFactory(user=identity.user) # another identity for the user
token = AccessToken()
token["sub"] = str(identity.sub)
with django_assert_num_queries(1):
user = models.oidc_user_getter(token)
identity.refresh_from_db()
assert user == identity.user
def test_models_oidc_user_getter_existing_user_with_email(django_assert_num_queries):
"""
When the valid token passed contains an email and targets an existing user,
it should update the email on the identity but not on the user.
"""
identity = factories.IdentityFactory()
factories.IdentityFactory(user=identity.user) # another identity for the user
user_email = identity.user.email
assert models.User.objects.count() == 1
token = AccessToken()
token["sub"] = str(identity.sub)
# Only 1 query if the email has not changed
token["email"] = identity.email
with django_assert_num_queries(1):
user = models.oidc_user_getter(token)
# Additional update query if the email has changed
new_email = "people@example.com"
token["email"] = new_email
with django_assert_num_queries(2):
user = models.oidc_user_getter(token)
identity.refresh_from_db()
assert identity.email == new_email
assert models.User.objects.count() == 1
assert user == identity.user
assert user.email == user_email
def test_models_oidc_user_getter_new_user_no_email():
"""
When a valid token is passed, a user should be created if the sub
does not match any existing user.
"""
token = AccessToken()
token["sub"] = "123"
user = models.oidc_user_getter(token)
identity = user.identities.get()
assert identity.sub == "123"
assert identity.email is None
assert user.email is None
assert user.password == "!"
assert models.User.objects.count() == 1
def test_models_oidc_user_getter_new_user_with_email():
"""
When the valid token passed contains an email and a new user is created,
the email should be set on the user and on the identity.
"""
email = "people@example.com"
token = AccessToken()
token["sub"] = "123"
token["email"] = email
user = models.oidc_user_getter(token)
identity = user.identities.get()
assert identity.sub == "123"
assert identity.email == email
assert user.email == email
assert models.User.objects.count() == 1
def test_models_oidc_user_getter_invalid_token(django_assert_num_queries):
"""The token passed in argument should contain the configured user id claim."""
token = AccessToken()
with django_assert_num_queries(0), pytest.raises(
InvalidToken, match="Token contained no recognizable user identification"
):
models.oidc_user_getter(token)
assert models.User.objects.exists() is False

View File

@@ -1,10 +0,0 @@
"""Tokens for People's core app."""
from rest_framework_simplejwt.settings import api_settings
from rest_framework_simplejwt.tokens import Token
class BearerToken(Token):
"""Bearer token as emitted by Keycloak OIDC for example."""
token_type = "Bearer" # noqa: S105
lifetime = api_settings.ACCESS_TOKEN_LIFETIME