➕(backend) add django-lasuite dependency
Use the OIDC backend from the `django-lasuite` library
This commit is contained in:
committed by
aleb_the_flash
parent
51f1f0ebbf
commit
10d759bdbb
@@ -1,12 +1,13 @@
|
||||
"""Authentication Backends for the Meet core app."""
|
||||
|
||||
import contextlib
|
||||
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import ImproperlyConfigured, SuspiciousOperation
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
import requests
|
||||
from mozilla_django_oidc.auth import (
|
||||
OIDCAuthenticationBackend as MozillaOIDCAuthenticationBackend,
|
||||
from lasuite.oidc_login.backends import (
|
||||
OIDCAuthenticationBackend as LaSuiteOIDCAuthenticationBackend,
|
||||
)
|
||||
|
||||
from core.models import User
|
||||
@@ -17,93 +18,46 @@ from core.services.marketing import (
|
||||
)
|
||||
|
||||
|
||||
class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
|
||||
class OIDCAuthenticationBackend(LaSuiteOIDCAuthenticationBackend):
|
||||
"""Custom OpenID Connect (OIDC) Authentication Backend.
|
||||
|
||||
This class overrides the default OIDC Authentication Backend to accommodate differences
|
||||
in the User and Identity models, and handles signed and/or encrypted UserInfo response.
|
||||
"""
|
||||
|
||||
def get_userinfo(self, access_token, id_token, payload):
|
||||
"""Return user details dictionary.
|
||||
def get_extra_claims(self, user_info):
|
||||
"""
|
||||
Return extra claims from user_info.
|
||||
|
||||
Parameters:
|
||||
- access_token (str): The access token.
|
||||
- id_token (str): The id token (unused).
|
||||
- payload (dict): The token payload (unused).
|
||||
|
||||
Note: The id_token and payload parameters are unused in this implementation,
|
||||
but were kept to preserve base method signature.
|
||||
|
||||
Note: It handles signed and/or encrypted UserInfo Response. It is required by
|
||||
Agent Connect, which follows the OIDC standard. It forces us to override the
|
||||
base method, which deal with 'application/json' response.
|
||||
Args:
|
||||
user_info (dict): The user information dictionary.
|
||||
|
||||
Returns:
|
||||
- dict: User details dictionary obtained from the OpenID Connect user endpoint.
|
||||
dict: A dictionary of extra claims.
|
||||
|
||||
"""
|
||||
|
||||
user_response = requests.get(
|
||||
self.OIDC_OP_USER_ENDPOINT,
|
||||
headers={"Authorization": f"Bearer {access_token}"},
|
||||
verify=self.get_settings("OIDC_VERIFY_SSL", True),
|
||||
timeout=self.get_settings("OIDC_TIMEOUT", None),
|
||||
proxies=self.get_settings("OIDC_PROXY", None),
|
||||
)
|
||||
user_response.raise_for_status()
|
||||
userinfo = self.verify_token(user_response.text)
|
||||
return userinfo
|
||||
|
||||
def get_or_create_user(self, access_token, id_token, payload):
|
||||
"""Return a User based on userinfo. Get or create a new user if no user matches the Sub.
|
||||
|
||||
Parameters:
|
||||
- access_token (str): The access token.
|
||||
- id_token (str): The ID token.
|
||||
- payload (dict): The user payload.
|
||||
|
||||
Returns:
|
||||
- User: An existing or newly created User instance.
|
||||
|
||||
Raises:
|
||||
- Exception: Raised when user creation is not allowed and no existing user is found.
|
||||
"""
|
||||
|
||||
user_info = self.get_userinfo(access_token, id_token, payload)
|
||||
sub = user_info.get("sub")
|
||||
|
||||
if not sub:
|
||||
raise SuspiciousOperation(
|
||||
_("User info contained no recognizable user identification")
|
||||
)
|
||||
|
||||
email = user_info.get("email")
|
||||
user = self.get_existing_user(sub, email)
|
||||
|
||||
claims = {
|
||||
"email": email,
|
||||
return {
|
||||
# Get user's full name from OIDC fields defined in settings
|
||||
"full_name": self.compute_full_name(user_info),
|
||||
"short_name": user_info.get(settings.OIDC_USERINFO_SHORTNAME_FIELD),
|
||||
}
|
||||
if not user and self.get_settings("OIDC_CREATE_USER", True):
|
||||
user = User.objects.create(
|
||||
sub=sub,
|
||||
password="!", # noqa: S106
|
||||
**claims,
|
||||
)
|
||||
|
||||
if settings.SIGNUP_NEW_USER_TO_MARKETING_EMAIL:
|
||||
self.signup_to_marketing_email(email)
|
||||
def post_get_or_create_user(self, user, claims, is_new_user):
|
||||
"""
|
||||
Post-processing after user creation or retrieval.
|
||||
|
||||
elif not user:
|
||||
return None
|
||||
Args:
|
||||
user (User): The user instance.
|
||||
claims (dict): The claims dictionary.
|
||||
is_new_user (bool): Indicates if the user was newly created.
|
||||
|
||||
if not user.is_active:
|
||||
raise SuspiciousOperation(_("User account is disabled"))
|
||||
Returns:
|
||||
- None
|
||||
|
||||
self.update_user_if_needed(user, claims)
|
||||
|
||||
return user
|
||||
"""
|
||||
email = claims["email"]
|
||||
if is_new_user and email and settings.SIGNUP_NEW_USER_TO_MARKETING_EMAIL:
|
||||
self.signup_to_marketing_email(email)
|
||||
|
||||
@staticmethod
|
||||
def signup_to_marketing_email(email):
|
||||
@@ -116,7 +70,9 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
|
||||
|
||||
Note: For a more robust solution, consider using Async task processing (Celery/Django-Q)
|
||||
"""
|
||||
try:
|
||||
with contextlib.suppress(
|
||||
ContactCreationError, ImproperlyConfigured, ImportError
|
||||
):
|
||||
marketing_service = get_marketing_service()
|
||||
contact_data = ContactData(
|
||||
email=email, attributes={"VISIO_SOURCE": ["SIGNIN"]}
|
||||
@@ -124,8 +80,6 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
|
||||
marketing_service.create_contact(
|
||||
contact_data, timeout=settings.BREVO_API_TIMEOUT
|
||||
)
|
||||
except (ContactCreationError, ImproperlyConfigured, ImportError):
|
||||
pass
|
||||
|
||||
def get_existing_user(self, sub, email):
|
||||
"""Fetch existing user by sub or email."""
|
||||
@@ -142,32 +96,3 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
|
||||
_("Multiple user accounts share a common email.")
|
||||
) from e
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def compute_full_name(user_info):
|
||||
"""Compute user's full name based on OIDC fields in settings."""
|
||||
full_name = " ".join(
|
||||
filter(
|
||||
None,
|
||||
(
|
||||
user_info.get(field)
|
||||
for field in settings.OIDC_USERINFO_FULLNAME_FIELDS
|
||||
),
|
||||
)
|
||||
)
|
||||
return full_name or None
|
||||
|
||||
@staticmethod
|
||||
def update_user_if_needed(user, claims):
|
||||
"""Update user claims if they have changed."""
|
||||
user_fields = vars(user.__class__) # Get available model fields
|
||||
updated_claims = {
|
||||
key: value
|
||||
for key, value in claims.items()
|
||||
if value and key in user_fields and value != getattr(user, key)
|
||||
}
|
||||
|
||||
if not updated_claims:
|
||||
return
|
||||
|
||||
User.objects.filter(sub=user.sub).update(**updated_claims)
|
||||
|
||||
Reference in New Issue
Block a user