➕(backend) add django-lasuite dependency
Use the OIDC backend from the new library and add settings to setup OIDC token storage required for later calls to OIDC Resource Servers.
This commit is contained in:
@@ -1,130 +1,59 @@
|
||||
"""Authentication Backends for the Impress core app."""
|
||||
|
||||
import logging
|
||||
import os
|
||||
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import SuspiciousOperation
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
import requests
|
||||
from mozilla_django_oidc.auth import (
|
||||
OIDCAuthenticationBackend as MozillaOIDCAuthenticationBackend,
|
||||
from lasuite.oidc_login.backends import (
|
||||
OIDCAuthenticationBackend as LaSuiteOIDCAuthenticationBackend,
|
||||
)
|
||||
|
||||
from core.models import DuplicateEmailError, User
|
||||
from core.models import DuplicateEmailError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Settings renamed warnings
|
||||
if os.environ.get("USER_OIDC_FIELDS_TO_FULLNAME"):
|
||||
logger.warning(
|
||||
"USER_OIDC_FIELDS_TO_FULLNAME has been renamed to "
|
||||
"OIDC_USERINFO_FULLNAME_FIELDS please update your settings."
|
||||
)
|
||||
|
||||
class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
|
||||
if os.environ.get("USER_OIDC_FIELD_TO_SHORTNAME"):
|
||||
logger.warning(
|
||||
"USER_OIDC_FIELD_TO_SHORTNAME has been renamed to "
|
||||
"OIDC_USERINFO_SHORTNAME_FIELD please update your settings."
|
||||
)
|
||||
|
||||
|
||||
class OIDCAuthenticationBackend(LaSuiteOIDCAuthenticationBackend):
|
||||
"""Custom OpenID Connect (OIDC) Authentication Backend.
|
||||
|
||||
This class overrides the default OIDC Authentication Backend to accommodate differences
|
||||
in the User and Identity models, and handles signed and/or encrypted UserInfo response.
|
||||
"""
|
||||
|
||||
def get_userinfo(self, access_token, id_token, payload):
|
||||
"""Return user details dictionary.
|
||||
def get_extra_claims(self, user_info):
|
||||
"""
|
||||
Return extra claims from user_info.
|
||||
|
||||
Parameters:
|
||||
- access_token (str): The access token.
|
||||
- id_token (str): The id token (unused).
|
||||
- payload (dict): The token payload (unused).
|
||||
|
||||
Note: The id_token and payload parameters are unused in this implementation,
|
||||
but were kept to preserve base method signature.
|
||||
|
||||
Note: It handles signed and/or encrypted UserInfo Response. It is required by
|
||||
Agent Connect, which follows the OIDC standard. It forces us to override the
|
||||
base method, which deal with 'application/json' response.
|
||||
Args:
|
||||
user_info (dict): The user information dictionary.
|
||||
|
||||
Returns:
|
||||
- dict: User details dictionary obtained from the OpenID Connect user endpoint.
|
||||
dict: A dictionary of extra claims.
|
||||
"""
|
||||
|
||||
user_response = requests.get(
|
||||
self.OIDC_OP_USER_ENDPOINT,
|
||||
headers={"Authorization": f"Bearer {access_token}"},
|
||||
verify=self.get_settings("OIDC_VERIFY_SSL", True),
|
||||
timeout=self.get_settings("OIDC_TIMEOUT", None),
|
||||
proxies=self.get_settings("OIDC_PROXY", None),
|
||||
)
|
||||
user_response.raise_for_status()
|
||||
|
||||
try:
|
||||
userinfo = user_response.json()
|
||||
except ValueError:
|
||||
try:
|
||||
userinfo = self.verify_token(user_response.text)
|
||||
except Exception as e:
|
||||
raise SuspiciousOperation(
|
||||
_("Invalid response format or token verification failed")
|
||||
) from e
|
||||
|
||||
return userinfo
|
||||
|
||||
def verify_claims(self, claims):
|
||||
"""
|
||||
Verify the presence of essential claims and the "sub" (which is mandatory as defined
|
||||
by the OIDC specification) to decide if authentication should be allowed.
|
||||
"""
|
||||
essential_claims = settings.USER_OIDC_ESSENTIAL_CLAIMS
|
||||
missing_claims = [claim for claim in essential_claims if claim not in claims]
|
||||
|
||||
if missing_claims:
|
||||
logger.error("Missing essential claims: %s", missing_claims)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def get_or_create_user(self, access_token, id_token, payload):
|
||||
"""Return a User based on userinfo. Create a new user if no match is found."""
|
||||
|
||||
user_info = self.get_userinfo(access_token, id_token, payload)
|
||||
|
||||
if not self.verify_claims(user_info):
|
||||
raise SuspiciousOperation("Claims verification failed.")
|
||||
|
||||
sub = user_info["sub"]
|
||||
email = user_info.get("email")
|
||||
|
||||
# Get user's full name from OIDC fields defined in settings
|
||||
full_name = self.compute_full_name(user_info)
|
||||
short_name = user_info.get(settings.USER_OIDC_FIELD_TO_SHORTNAME)
|
||||
|
||||
claims = {
|
||||
"email": email,
|
||||
"full_name": full_name,
|
||||
"short_name": short_name,
|
||||
return {
|
||||
"full_name": self.compute_full_name(user_info),
|
||||
"short_name": user_info.get(settings.OIDC_USERINFO_SHORTNAME_FIELD),
|
||||
}
|
||||
|
||||
def get_existing_user(self, sub, email):
|
||||
"""Fetch existing user by sub or email."""
|
||||
|
||||
try:
|
||||
user = User.objects.get_user_by_sub_or_email(sub, email)
|
||||
return self.UserModel.objects.get_user_by_sub_or_email(sub, email)
|
||||
except DuplicateEmailError as err:
|
||||
raise SuspiciousOperation(err.message) from err
|
||||
|
||||
if user:
|
||||
if not user.is_active:
|
||||
raise SuspiciousOperation(_("User account is disabled"))
|
||||
self.update_user_if_needed(user, claims)
|
||||
elif self.get_settings("OIDC_CREATE_USER", True):
|
||||
user = User.objects.create(sub=sub, password="!", **claims) # noqa: S106
|
||||
|
||||
return user
|
||||
|
||||
def compute_full_name(self, user_info):
|
||||
"""Compute user's full name based on OIDC fields in settings."""
|
||||
name_fields = settings.USER_OIDC_FIELDS_TO_FULLNAME
|
||||
full_name = " ".join(
|
||||
user_info[field] for field in name_fields if user_info.get(field)
|
||||
)
|
||||
return full_name or None
|
||||
|
||||
def update_user_if_needed(self, user, claims):
|
||||
"""Update user claims if they have changed."""
|
||||
has_changed = any(
|
||||
value and value != getattr(user, key) for key, value in claims.items()
|
||||
)
|
||||
if has_changed:
|
||||
updated_claims = {key: value for key, value in claims.items() if value}
|
||||
self.UserModel.objects.filter(id=user.id).update(**updated_claims)
|
||||
|
||||
@@ -1,18 +0,0 @@
|
||||
"""Authentication URLs for the People core app."""
|
||||
|
||||
from django.urls import path
|
||||
|
||||
from mozilla_django_oidc.urls import urlpatterns as mozzila_oidc_urls
|
||||
|
||||
from .views import OIDCLogoutCallbackView, OIDCLogoutView
|
||||
|
||||
urlpatterns = [
|
||||
# Override the default 'logout/' path from Mozilla Django OIDC with our custom view.
|
||||
path("logout/", OIDCLogoutView.as_view(), name="oidc_logout_custom"),
|
||||
path(
|
||||
"logout-callback/",
|
||||
OIDCLogoutCallbackView.as_view(),
|
||||
name="oidc_logout_callback",
|
||||
),
|
||||
*mozzila_oidc_urls,
|
||||
]
|
||||
@@ -1,137 +0,0 @@
|
||||
"""Authentication Views for the People core app."""
|
||||
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from django.contrib import auth
|
||||
from django.core.exceptions import SuspiciousOperation
|
||||
from django.http import HttpResponseRedirect
|
||||
from django.urls import reverse
|
||||
from django.utils import crypto
|
||||
|
||||
from mozilla_django_oidc.utils import (
|
||||
absolutify,
|
||||
)
|
||||
from mozilla_django_oidc.views import (
|
||||
OIDCLogoutView as MozillaOIDCOIDCLogoutView,
|
||||
)
|
||||
|
||||
|
||||
class OIDCLogoutView(MozillaOIDCOIDCLogoutView):
|
||||
"""Custom logout view for handling OpenID Connect (OIDC) logout flow.
|
||||
|
||||
Adds support for handling logout callbacks from the identity provider (OP)
|
||||
by initiating the logout flow if the user has an active session.
|
||||
|
||||
The Django session is retained during the logout process to persist the 'state' OIDC parameter.
|
||||
This parameter is crucial for maintaining the integrity of the logout flow between this call
|
||||
and the subsequent callback.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def persist_state(request, state):
|
||||
"""Persist the given 'state' parameter in the session's 'oidc_states' dictionary
|
||||
|
||||
This method is used to store the OIDC state parameter in the session, according to the
|
||||
structure expected by Mozilla Django OIDC's 'add_state_and_verifier_and_nonce_to_session'
|
||||
utility function.
|
||||
"""
|
||||
|
||||
if "oidc_states" not in request.session or not isinstance(
|
||||
request.session["oidc_states"], dict
|
||||
):
|
||||
request.session["oidc_states"] = {}
|
||||
|
||||
request.session["oidc_states"][state] = {}
|
||||
request.session.save()
|
||||
|
||||
def construct_oidc_logout_url(self, request):
|
||||
"""Create the redirect URL for interfacing with the OIDC provider.
|
||||
|
||||
Retrieves the necessary parameters from the session and constructs the URL
|
||||
required to initiate logout with the OpenID Connect provider.
|
||||
|
||||
If no ID token is found in the session, the logout flow will not be initiated,
|
||||
and the method will return the default redirect URL.
|
||||
|
||||
The 'state' parameter is generated randomly and persisted in the session to ensure
|
||||
its integrity during the subsequent callback.
|
||||
"""
|
||||
|
||||
oidc_logout_endpoint = self.get_settings("OIDC_OP_LOGOUT_ENDPOINT")
|
||||
|
||||
if not oidc_logout_endpoint:
|
||||
return self.redirect_url
|
||||
|
||||
reverse_url = reverse("oidc_logout_callback")
|
||||
id_token = request.session.get("oidc_id_token", None)
|
||||
|
||||
if not id_token:
|
||||
return self.redirect_url
|
||||
|
||||
query = {
|
||||
"id_token_hint": id_token,
|
||||
"state": crypto.get_random_string(self.get_settings("OIDC_STATE_SIZE", 32)),
|
||||
"post_logout_redirect_uri": absolutify(request, reverse_url),
|
||||
}
|
||||
|
||||
self.persist_state(request, query["state"])
|
||||
|
||||
return f"{oidc_logout_endpoint}?{urlencode(query)}"
|
||||
|
||||
def post(self, request):
|
||||
"""Handle user logout.
|
||||
|
||||
If the user is not authenticated, redirects to the default logout URL.
|
||||
Otherwise, constructs the OIDC logout URL and redirects the user to start
|
||||
the logout process.
|
||||
|
||||
If the user is redirected to the default logout URL, ensure her Django session
|
||||
is terminated.
|
||||
"""
|
||||
|
||||
logout_url = self.redirect_url
|
||||
|
||||
if request.user.is_authenticated:
|
||||
logout_url = self.construct_oidc_logout_url(request)
|
||||
|
||||
# If the user is not redirected to the OIDC provider, ensure logout
|
||||
if logout_url == self.redirect_url:
|
||||
auth.logout(request)
|
||||
|
||||
return HttpResponseRedirect(logout_url)
|
||||
|
||||
|
||||
class OIDCLogoutCallbackView(MozillaOIDCOIDCLogoutView):
|
||||
"""Custom view for handling the logout callback from the OpenID Connect (OIDC) provider.
|
||||
|
||||
Handles the callback after logout from the identity provider (OP).
|
||||
Verifies the state parameter and performs necessary logout actions.
|
||||
|
||||
The Django session is maintained during the logout process to ensure the integrity
|
||||
of the logout flow initiated in the previous step.
|
||||
"""
|
||||
|
||||
http_method_names = ["get"]
|
||||
|
||||
def get(self, request):
|
||||
"""Handle the logout callback.
|
||||
|
||||
If the user is not authenticated, redirects to the default logout URL.
|
||||
Otherwise, verifies the state parameter and performs necessary logout actions.
|
||||
"""
|
||||
|
||||
if not request.user.is_authenticated:
|
||||
return HttpResponseRedirect(self.redirect_url)
|
||||
|
||||
state = request.GET.get("state")
|
||||
|
||||
if state not in request.session.get("oidc_states", {}):
|
||||
msg = "OIDC callback state not found in session `oidc_states`!"
|
||||
raise SuspiciousOperation(msg)
|
||||
|
||||
del request.session["oidc_states"][state]
|
||||
request.session.save()
|
||||
|
||||
auth.logout(request)
|
||||
|
||||
return HttpResponseRedirect(self.redirect_url)
|
||||
Reference in New Issue
Block a user