✨(backend) add application model with secure secret handling
We need to integrate with external applications. Objective: enable them to securely generate room links with proper ownership attribution. Proposed solution: Following the OAuth2 Machine-to-Machine specification, we expose an endpoint allowing external applications to exchange a client_id and client_secret pair for a JWT. This JWT is valid only within a well-scoped, isolated external API, served through a dedicated viewset. This commit introduces a model to persist application records in the database. The main challenge lies in generating a secure client_secret and ensuring it is properly stored. The restframework-apikey dependency was discarded, as its approach diverges significantly from OAuth2. Instead, inspiration was taken from oauthlib and django-oauth-toolkit. However, their implementations proved either too heavy or not entirely suitable for the intended use case. To avoid pulling in large dependencies for minimal utility, the necessary components were selectively copied, adapted, and improved. A generic SecretField was introduced, designed for reuse and potentially suitable for upstream contribution to Django. Secrets are exposed only once at object creation time in the Django admin. Once the object is saved, the secret is immediately hashed, ensuring it can never be retrieved again. One limitation remains: enforcing client_id and client_secret as read-only during edits. At object creation, marking them read-only excluded them from the Django form, which unintentionally regenerated new values. This area requires further refinement. The design prioritizes configurability while adhering to the principle of least privilege. By default, new applications are created without any assigned scopes, preventing them from performing actions on the API until explicitly configured. If no domain is specified, domain delegation is not applied, allowing tokens to be issued for any email domain.
This commit is contained in:
committed by
aleb_the_flash
parent
c07b8f920f
commit
3fd5a4404c
@@ -1,5 +1,6 @@
|
||||
"""Admin classes and registrations for core app."""
|
||||
|
||||
from django import forms
|
||||
from django.contrib import admin
|
||||
from django.contrib.auth import admin as auth_admin
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
@@ -150,3 +151,66 @@ class RecordingAdmin(admin.ModelAdmin):
|
||||
return _("Multiple owners")
|
||||
|
||||
return str(owners[0].user)
|
||||
|
||||
|
||||
class ApplicationDomainInline(admin.TabularInline):
|
||||
"""Inline admin for managing allowed domains per application."""
|
||||
|
||||
model = models.ApplicationDomain
|
||||
extra = 0
|
||||
|
||||
|
||||
class ApplicationAdminForm(forms.ModelForm):
|
||||
"""Custom form for Application admin with multi-select scopes."""
|
||||
|
||||
scopes = forms.MultipleChoiceField(
|
||||
choices=models.ApplicationScope.choices,
|
||||
widget=forms.CheckboxSelectMultiple,
|
||||
required=False,
|
||||
)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
if self.instance.pk and self.instance.scopes:
|
||||
self.fields["scopes"].initial = self.instance.scopes
|
||||
|
||||
|
||||
@admin.register(models.Application)
|
||||
class ApplicationAdmin(admin.ModelAdmin):
|
||||
"""Admin interface for managing applications and their permissions."""
|
||||
|
||||
form = ApplicationAdminForm
|
||||
|
||||
list_display = ("id", "name", "client_id", "get_scopes_display")
|
||||
fields = [
|
||||
"name",
|
||||
"id",
|
||||
"created_at",
|
||||
"updated_at",
|
||||
"scopes",
|
||||
"client_id",
|
||||
"client_secret",
|
||||
]
|
||||
readonly_fields = ["id", "created_at", "updated_at"]
|
||||
inlines = [ApplicationDomainInline]
|
||||
|
||||
def get_readonly_fields(self, request, obj=None):
|
||||
"""Make client_id and client_secret readonly after creation."""
|
||||
if obj: # Editing existing object
|
||||
return self.readonly_fields + ["client_id", "client_secret"]
|
||||
return self.readonly_fields
|
||||
|
||||
def get_fields(self, request, obj=None):
|
||||
"""Hide client_secret after creation."""
|
||||
fields = super().get_fields(request, obj)
|
||||
if obj:
|
||||
return [f for f in fields if f != "client_secret"]
|
||||
return fields
|
||||
|
||||
def get_scopes_display(self, obj):
|
||||
"""Display scopes in list view."""
|
||||
if obj.scopes:
|
||||
return ", ".join(obj.scopes)
|
||||
return _("No scopes")
|
||||
|
||||
get_scopes_display.short_description = _("Scopes")
|
||||
|
||||
@@ -9,7 +9,7 @@ from django.utils.text import slugify
|
||||
import factory.fuzzy
|
||||
from faker import Faker
|
||||
|
||||
from core import models
|
||||
from core import models, utils
|
||||
|
||||
fake = Faker()
|
||||
|
||||
@@ -117,3 +117,39 @@ class TeamRecordingAccessFactory(factory.django.DjangoModelFactory):
|
||||
recording = factory.SubFactory(RecordingFactory)
|
||||
team = factory.Sequence(lambda n: f"team{n}")
|
||||
role = factory.fuzzy.FuzzyChoice(models.RoleChoices.values)
|
||||
|
||||
|
||||
class ApplicationFactory(factory.django.DjangoModelFactory):
|
||||
"""Create fake applications for testing."""
|
||||
|
||||
class Meta:
|
||||
model = models.Application
|
||||
|
||||
name = factory.Faker("company")
|
||||
active = True
|
||||
client_id = factory.LazyFunction(utils.generate_client_id)
|
||||
client_secret = factory.LazyFunction(utils.generate_client_secret)
|
||||
scopes = []
|
||||
|
||||
class Params:
|
||||
"""Factory traits for common application configurations."""
|
||||
|
||||
with_all_scopes = factory.Trait(
|
||||
scopes=[
|
||||
models.ApplicationScope.ROOMS_LIST,
|
||||
models.ApplicationScope.ROOMS_RETRIEVE,
|
||||
models.ApplicationScope.ROOMS_CREATE,
|
||||
models.ApplicationScope.ROOMS_UPDATE,
|
||||
models.ApplicationScope.ROOMS_DELETE,
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
class ApplicationDomainFactory(factory.django.DjangoModelFactory):
|
||||
"""Create fake application domains for testing."""
|
||||
|
||||
class Meta:
|
||||
model = models.ApplicationDomain
|
||||
|
||||
domain = factory.Faker("domain_name")
|
||||
application = factory.SubFactory(ApplicationFactory)
|
||||
|
||||
43
src/backend/core/fields.py
Normal file
43
src/backend/core/fields.py
Normal file
@@ -0,0 +1,43 @@
|
||||
"""
|
||||
Core application fields
|
||||
"""
|
||||
|
||||
from logging import getLogger
|
||||
|
||||
from django.contrib.auth.hashers import identify_hasher, make_password
|
||||
from django.db import models
|
||||
|
||||
logger = getLogger(__name__)
|
||||
|
||||
|
||||
class SecretField(models.CharField):
|
||||
"""CharField that automatically hashes secrets before saving.
|
||||
|
||||
Use for API keys, client secrets, or tokens that should never be stored
|
||||
in plain text. Already-hashed values are preserved to prevent double-hashing.
|
||||
|
||||
Inspired by: https://github.com/django-oauth-toolkit/django-oauth-toolkit
|
||||
"""
|
||||
|
||||
def pre_save(self, model_instance, add):
|
||||
"""Hash the secret if not already hashed, otherwise preserve it."""
|
||||
|
||||
secret = getattr(model_instance, self.attname)
|
||||
|
||||
try:
|
||||
hasher = identify_hasher(secret)
|
||||
logger.debug(
|
||||
"%s: %s is already hashed with %s.",
|
||||
model_instance,
|
||||
self.attname,
|
||||
hasher,
|
||||
)
|
||||
except ValueError:
|
||||
logger.debug(
|
||||
"%s: %s is not hashed; hashing it now.", model_instance, self.attname
|
||||
)
|
||||
hashed_secret = make_password(secret)
|
||||
setattr(model_instance, self.attname, hashed_secret)
|
||||
return hashed_secret
|
||||
|
||||
return super().pre_save(model_instance, add)
|
||||
52
src/backend/core/migrations/0015_application_and_more.py
Normal file
52
src/backend/core/migrations/0015_application_and_more.py
Normal file
@@ -0,0 +1,52 @@
|
||||
# Generated by Django 5.2.6 on 2025-10-02 20:55
|
||||
|
||||
import core.utils
|
||||
import django.db.models.deletion
|
||||
import uuid
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('core', '0014_room_pin_code'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='Application',
|
||||
fields=[
|
||||
('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')),
|
||||
('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created on')),
|
||||
('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated on')),
|
||||
('name', models.CharField(help_text='Descriptive name for this application.', max_length=255, verbose_name='Application name')),
|
||||
('active', models.BooleanField(default=True)),
|
||||
('client_id', models.CharField(default=core.utils.generate_client_id, max_length=100, unique=True)),
|
||||
('client_secret', core.fields.SecretField(blank=True, default=core.utils.generate_client_secret, help_text='Hashed on Save. Copy it now if this is a new secret.', max_length=255)),
|
||||
('scopes', django.contrib.postgres.fields.ArrayField(base_field=models.CharField(choices=[('rooms:create', 'Create rooms'), ('rooms:list', 'List rooms'), ('rooms:retrieve', 'Retrieve room details'), ('rooms:update', 'Update rooms'), ('rooms:delete', 'Delete rooms')], max_length=50), blank=True, default=list, size=None)),
|
||||
],
|
||||
options={
|
||||
'verbose_name': 'Application',
|
||||
'verbose_name_plural': 'Applications',
|
||||
'db_table': 'meet_application',
|
||||
'ordering': ('-created_at',),
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='ApplicationDomain',
|
||||
fields=[
|
||||
('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')),
|
||||
('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created on')),
|
||||
('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated on')),
|
||||
('domain', models.CharField(help_text='Email domain this application can act on behalf of.', max_length=253, validators=[django.core.validators.DomainNameValidator(accept_idna=False, message='Enter a valid domain')], verbose_name='Domain')),
|
||||
('application', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='allowed_domains', to='core.application')),
|
||||
],
|
||||
options={
|
||||
'verbose_name': 'Application domain',
|
||||
'verbose_name_plural': 'Application domains',
|
||||
'db_table': 'meet_application_domain',
|
||||
'ordering': ('domain',),
|
||||
'unique_together': {('application', 'domain')},
|
||||
},
|
||||
),
|
||||
]
|
||||
@@ -11,6 +11,7 @@ from typing import List, Optional
|
||||
from django.conf import settings
|
||||
from django.contrib.auth import models as auth_models
|
||||
from django.contrib.auth.base_user import AbstractBaseUser
|
||||
from django.contrib.postgres.fields import ArrayField
|
||||
from django.core import mail, validators
|
||||
from django.core.exceptions import PermissionDenied, ValidationError
|
||||
from django.db import models
|
||||
@@ -18,8 +19,10 @@ from django.utils import timezone
|
||||
from django.utils.text import capfirst, slugify
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from lasuite.tools.email import get_domain_from_email
|
||||
from timezone_field import TimeZoneField
|
||||
|
||||
from . import fields, utils
|
||||
from .recording.enums import FileExtension
|
||||
|
||||
logger = getLogger(__name__)
|
||||
@@ -717,3 +720,101 @@ class RecordingAccess(BaseAccess):
|
||||
Compute and return abilities for a given user on the recording access.
|
||||
"""
|
||||
return self._get_abilities(self.recording, user)
|
||||
|
||||
|
||||
class ApplicationScope(models.TextChoices):
|
||||
"""Available permission scopes for application operations."""
|
||||
|
||||
ROOMS_CREATE = "rooms:create", _("Create rooms")
|
||||
ROOMS_LIST = "rooms:list", _("List rooms")
|
||||
ROOMS_RETRIEVE = "rooms:retrieve", _("Retrieve room details")
|
||||
ROOMS_UPDATE = "rooms:update", _("Update rooms")
|
||||
ROOMS_DELETE = "rooms:delete", _("Delete rooms")
|
||||
|
||||
|
||||
class Application(BaseModel):
|
||||
"""External application for API authentication and authorization.
|
||||
|
||||
Represents a third-party integration or automated system that accesses
|
||||
the API using OAuth2-style client credentials (client_id/client_secret).
|
||||
Supports scoped permissions and optional domain restrictions for delegation.
|
||||
"""
|
||||
|
||||
name = models.CharField(
|
||||
max_length=255,
|
||||
verbose_name=_("Application name"),
|
||||
help_text=_("Descriptive name for this application."),
|
||||
)
|
||||
active = models.BooleanField(default=True)
|
||||
client_id = models.CharField(
|
||||
max_length=100, unique=True, default=utils.generate_client_id
|
||||
)
|
||||
client_secret = fields.SecretField(
|
||||
max_length=255,
|
||||
blank=True,
|
||||
default=utils.generate_client_secret,
|
||||
help_text=_("Hashed on Save. Copy it now if this is a new secret."),
|
||||
)
|
||||
scopes = ArrayField(
|
||||
models.CharField(max_length=50, choices=ApplicationScope.choices),
|
||||
default=list,
|
||||
blank=True,
|
||||
)
|
||||
|
||||
class Meta:
|
||||
db_table = "meet_application"
|
||||
ordering = ("-created_at",)
|
||||
verbose_name = _("Application")
|
||||
verbose_name_plural = _("Applications")
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name!s}"
|
||||
|
||||
def can_delegate_email(self, email):
|
||||
"""Check if this application can delegate the given email."""
|
||||
|
||||
if not self.allowed_domains.exists():
|
||||
return True # No domain restrictions
|
||||
|
||||
domain = get_domain_from_email(email)
|
||||
return self.allowed_domains.filter(domain__iexact=domain).exists()
|
||||
|
||||
|
||||
class ApplicationDomain(BaseModel):
|
||||
"""Domain authorized for application delegation."""
|
||||
|
||||
domain = models.CharField(
|
||||
max_length=253, # Max domain length per RFC 1035
|
||||
validators=[
|
||||
validators.DomainNameValidator(
|
||||
accept_idna=False,
|
||||
message=_("Enter a valid domain"),
|
||||
)
|
||||
],
|
||||
verbose_name=_("Domain"),
|
||||
help_text=_("Email domain this application can act on behalf of."),
|
||||
)
|
||||
|
||||
application = models.ForeignKey(
|
||||
"Application",
|
||||
on_delete=models.CASCADE,
|
||||
related_name="allowed_domains",
|
||||
)
|
||||
|
||||
class Meta:
|
||||
db_table = "meet_application_domain"
|
||||
ordering = ("domain",)
|
||||
verbose_name = _("Application domain")
|
||||
verbose_name_plural = _("Application domains")
|
||||
unique_together = [("application", "domain")]
|
||||
|
||||
def __str__(self):
|
||||
"""Return string representation of the domain."""
|
||||
|
||||
return self.domain
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
"""Save the domain after normalizing to lowercase."""
|
||||
|
||||
self.domain = self.domain.lower().strip()
|
||||
super().save(*args, **kwargs)
|
||||
|
||||
331
src/backend/core/tests/test_models_applications.py
Normal file
331
src/backend/core/tests/test_models_applications.py
Normal file
@@ -0,0 +1,331 @@
|
||||
"""
|
||||
Unit tests for the Application and ApplicationDomain models
|
||||
"""
|
||||
|
||||
# pylint: disable=W0613
|
||||
|
||||
from unittest import mock
|
||||
|
||||
from django.contrib.auth.hashers import check_password
|
||||
from django.core.exceptions import ValidationError
|
||||
|
||||
import pytest
|
||||
|
||||
from core.factories import ApplicationDomainFactory, ApplicationFactory
|
||||
from core.models import Application, ApplicationDomain, ApplicationScope
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
|
||||
# Application Model Tests
|
||||
|
||||
|
||||
def test_models_application_str():
|
||||
"""The str representation should be the name."""
|
||||
application = ApplicationFactory(name="My Integration")
|
||||
assert str(application) == "My Integration"
|
||||
|
||||
|
||||
def test_models_application_name_maxlength():
|
||||
"""The name field should be at most 255 characters."""
|
||||
ApplicationFactory(name="a" * 255)
|
||||
|
||||
with pytest.raises(ValidationError) as excinfo:
|
||||
ApplicationFactory(name="a" * 256)
|
||||
|
||||
assert "Ensure this value has at most 255 characters (it has 256)." in str(
|
||||
excinfo.value
|
||||
)
|
||||
|
||||
|
||||
def test_models_application_active_default():
|
||||
"""An application should be active by default."""
|
||||
application = Application.objects.create(name="Test App")
|
||||
assert application.active is True
|
||||
|
||||
|
||||
def test_models_application_scopes_default():
|
||||
"""Scopes should default to empty list."""
|
||||
application = Application.objects.create(name="Test App")
|
||||
assert application.scopes == []
|
||||
|
||||
|
||||
def test_models_application_client_id_auto_generated():
|
||||
"""Client ID should be automatically generated on creation."""
|
||||
application = ApplicationFactory()
|
||||
assert application.client_id is not None
|
||||
assert len(application.client_id) > 0
|
||||
|
||||
|
||||
def test_models_application_client_id_unique():
|
||||
"""Client IDs should be unique."""
|
||||
app1 = ApplicationFactory()
|
||||
|
||||
with pytest.raises(ValidationError) as excinfo:
|
||||
ApplicationFactory(client_id=app1.client_id)
|
||||
|
||||
assert "Application with this Client id already exists." in str(excinfo.value)
|
||||
|
||||
|
||||
def test_models_application_client_id_length(settings):
|
||||
"""Client ID should match configured length."""
|
||||
|
||||
app1 = ApplicationFactory()
|
||||
assert len(app1.client_id) == 40 # default value
|
||||
|
||||
settings.APPLICATION_CLIENT_ID_LENGTH = 20
|
||||
|
||||
app2 = ApplicationFactory()
|
||||
assert len(app2.client_id) == 20
|
||||
|
||||
|
||||
def test_models_application_client_secret_auto_generated():
|
||||
"""Client secret should be automatically generated and hashed on creation."""
|
||||
application = ApplicationFactory()
|
||||
|
||||
assert application.client_secret is not None
|
||||
assert len(application.client_secret) > 0
|
||||
|
||||
|
||||
def test_models_application_client_secret_hashed_on_save():
|
||||
"""Client secret should be hashed when saved."""
|
||||
plain_secret = "my-plain-secret"
|
||||
|
||||
with mock.patch(
|
||||
"core.models.utils.generate_client_secret", return_value=plain_secret
|
||||
):
|
||||
application = ApplicationFactory(client_secret=plain_secret)
|
||||
|
||||
# Secret should be hashed, not plain
|
||||
assert application.client_secret != plain_secret
|
||||
# Should verify with check_password
|
||||
assert check_password(plain_secret, application.client_secret) is True
|
||||
|
||||
|
||||
def test_models_application_client_secret_preserves_existing_hash():
|
||||
"""Re-saving should not re-hash an already hashed secret."""
|
||||
application = ApplicationFactory()
|
||||
original_hash = application.client_secret
|
||||
|
||||
# Update another field and save
|
||||
application.name = "Updated Name"
|
||||
application.save()
|
||||
|
||||
# Hash should remain unchanged
|
||||
assert application.client_secret == original_hash
|
||||
|
||||
|
||||
def test_models_application_updates_preserve_client_id():
|
||||
"""Application updates should preserve existing client_id."""
|
||||
application = ApplicationFactory()
|
||||
original_client_id = application.client_id
|
||||
|
||||
application.name = "Updated Name"
|
||||
application.save()
|
||||
|
||||
assert application.client_id == original_client_id
|
||||
|
||||
|
||||
def test_models_application_scopes_valid_choices():
|
||||
"""Only valid scope choices should be accepted."""
|
||||
application = ApplicationFactory(
|
||||
scopes=[
|
||||
ApplicationScope.ROOMS_LIST,
|
||||
ApplicationScope.ROOMS_CREATE,
|
||||
ApplicationScope.ROOMS_RETRIEVE,
|
||||
]
|
||||
)
|
||||
|
||||
assert len(application.scopes) == 3
|
||||
assert ApplicationScope.ROOMS_LIST in application.scopes
|
||||
|
||||
|
||||
def test_models_application_scopes_invalid_choice():
|
||||
"""Invalid scope choices should raise validation error."""
|
||||
with pytest.raises(ValidationError) as excinfo:
|
||||
ApplicationFactory(scopes=["invalid:scope"])
|
||||
|
||||
assert "is not a valid choice" in str(excinfo.value)
|
||||
|
||||
|
||||
def test_models_application_can_delegate_email_no_restrictions():
|
||||
"""Application with no domain restrictions can delegate any email."""
|
||||
application = ApplicationFactory()
|
||||
|
||||
assert application.can_delegate_email("user@example.com") is True
|
||||
assert application.can_delegate_email("admin@anotherdomain.org") is True
|
||||
|
||||
|
||||
def test_models_application_can_delegate_email_allowed_domain():
|
||||
"""Application can delegate email from allowed domain."""
|
||||
application = ApplicationFactory()
|
||||
ApplicationDomainFactory(application=application, domain="example.com")
|
||||
|
||||
assert application.can_delegate_email("user@example.com") is True
|
||||
|
||||
|
||||
def test_models_application_can_delegate_email_denied_domain():
|
||||
"""Application cannot delegate email from non-allowed domain."""
|
||||
application = ApplicationFactory()
|
||||
ApplicationDomainFactory(application=application, domain="example.com")
|
||||
|
||||
assert application.can_delegate_email("user@other.com") is False
|
||||
|
||||
|
||||
def test_models_application_can_delegate_email_case_insensitive():
|
||||
"""Domain matching should be case-insensitive."""
|
||||
application = ApplicationFactory()
|
||||
ApplicationDomainFactory(application=application, domain="example.com")
|
||||
|
||||
assert application.can_delegate_email("user@EXAMPLE.COM") is True
|
||||
assert application.can_delegate_email("user@Example.Com") is True
|
||||
|
||||
|
||||
def test_models_application_can_delegate_email_multiple_domains():
|
||||
"""Application with multiple allowed domains should check all."""
|
||||
application = ApplicationFactory()
|
||||
ApplicationDomainFactory(application=application, domain="example.com")
|
||||
ApplicationDomainFactory(application=application, domain="other.org")
|
||||
|
||||
assert application.can_delegate_email("user@example.com") is True
|
||||
assert application.can_delegate_email("admin@other.org") is True
|
||||
assert application.can_delegate_email("test@denied.com") is False
|
||||
|
||||
|
||||
# ApplicationDomain Model Tests
|
||||
|
||||
|
||||
def test_models_application_domain_str():
|
||||
"""The str representation should be the domain."""
|
||||
domain = ApplicationDomainFactory(domain="example.com")
|
||||
assert str(domain) == "example.com"
|
||||
|
||||
|
||||
def test_models_application_domain_ordering():
|
||||
"""Domains should be returned ordered by domain name."""
|
||||
application = ApplicationFactory()
|
||||
ApplicationDomainFactory(application=application, domain="zulu.com")
|
||||
ApplicationDomainFactory(application=application, domain="alpha.com")
|
||||
ApplicationDomainFactory(application=application, domain="beta.com")
|
||||
|
||||
domains = ApplicationDomain.objects.all()
|
||||
assert domains[0].domain == "alpha.com"
|
||||
assert domains[1].domain == "beta.com"
|
||||
assert domains[2].domain == "zulu.com"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"valid_domain",
|
||||
[
|
||||
"example.com",
|
||||
"sub.example.com",
|
||||
"deep.sub.example.com",
|
||||
"example-with-dash.com",
|
||||
"123.example.com",
|
||||
],
|
||||
)
|
||||
def test_models_application_domain_valid_domain(valid_domain):
|
||||
"""Valid domain names should be accepted."""
|
||||
ApplicationDomainFactory(domain=valid_domain)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"invalid_domain",
|
||||
[
|
||||
"not a domain",
|
||||
"example..com",
|
||||
"-example.com",
|
||||
"example-.com",
|
||||
"example.com-",
|
||||
],
|
||||
)
|
||||
def test_models_application_domain_invalid_domain(invalid_domain):
|
||||
"""Invalid domain names should raise validation error."""
|
||||
|
||||
with pytest.raises(ValidationError):
|
||||
ApplicationDomainFactory(domain=invalid_domain)
|
||||
|
||||
|
||||
def test_models_application_domain_lowercase_on_save():
|
||||
"""Domain should be normalized to lowercase on save."""
|
||||
domain = ApplicationDomainFactory(domain="EXAMPLE.COM")
|
||||
|
||||
assert domain.domain == "example.com"
|
||||
|
||||
|
||||
def test_models_application_domain_strip_whitespace_on_save():
|
||||
"""Domain should strip whitespace on save."""
|
||||
domain = ApplicationDomainFactory(domain=" example.com ")
|
||||
|
||||
assert domain.domain == "example.com"
|
||||
|
||||
|
||||
def test_models_application_domain_combined_normalization():
|
||||
"""Domain should strip and lowercase in one operation."""
|
||||
domain = ApplicationDomainFactory(domain=" EXAMPLE.COM ")
|
||||
|
||||
assert domain.domain == "example.com"
|
||||
|
||||
|
||||
def test_models_application_domain_unique_together():
|
||||
"""Same domain cannot be added twice to same application."""
|
||||
application = ApplicationFactory()
|
||||
ApplicationDomainFactory(application=application, domain="example.com")
|
||||
|
||||
with pytest.raises(ValidationError) as excinfo:
|
||||
ApplicationDomainFactory(application=application, domain="example.com")
|
||||
|
||||
assert "Application domain with this Application and Domain already exists." in str(
|
||||
excinfo.value
|
||||
)
|
||||
|
||||
|
||||
def test_models_application_domain_same_domain_different_apps():
|
||||
"""Same domain can belong to different applications."""
|
||||
app1 = ApplicationFactory()
|
||||
app2 = ApplicationFactory()
|
||||
|
||||
ApplicationDomainFactory(application=app1, domain="example.com")
|
||||
ApplicationDomainFactory(application=app2, domain="example.com")
|
||||
|
||||
assert app1.allowed_domains.count() == 1
|
||||
assert app2.allowed_domains.count() == 1
|
||||
|
||||
|
||||
def test_models_application_domain_cascade_delete():
|
||||
"""Deleting application should delete its domains."""
|
||||
application = ApplicationFactory()
|
||||
ApplicationDomainFactory(application=application, domain="example.com")
|
||||
ApplicationDomainFactory(application=application, domain="other.com")
|
||||
|
||||
assert ApplicationDomain.objects.count() == 2
|
||||
|
||||
application.delete()
|
||||
|
||||
assert ApplicationDomain.objects.count() == 0
|
||||
|
||||
|
||||
def test_models_application_domain_related_name():
|
||||
"""Domains should be accessible via application.allowed_domains."""
|
||||
application = ApplicationFactory()
|
||||
domain1 = ApplicationDomainFactory(application=application, domain="example.com")
|
||||
domain2 = ApplicationDomainFactory(application=application, domain="other.com")
|
||||
|
||||
assert list(application.allowed_domains.all()) == [domain1, domain2]
|
||||
|
||||
|
||||
def test_models_application_domain_filters_delegation():
|
||||
"""Adding/removing domains should affect can_delegate_email."""
|
||||
application = ApplicationFactory()
|
||||
|
||||
# No restrictions initially
|
||||
assert application.can_delegate_email("user@example.com") is True
|
||||
|
||||
# Add domain restriction
|
||||
domain = ApplicationDomainFactory(application=application, domain="example.com")
|
||||
assert application.can_delegate_email("user@example.com") is True
|
||||
assert application.can_delegate_email("user@other.com") is False
|
||||
|
||||
# Remove domain restriction
|
||||
domain.delete()
|
||||
assert application.can_delegate_email("user@other.com") is True
|
||||
@@ -8,6 +8,7 @@ Utils functions used in the core app
|
||||
import hashlib
|
||||
import json
|
||||
import random
|
||||
import string
|
||||
from typing import List, Optional
|
||||
from uuid import uuid4
|
||||
|
||||
@@ -240,3 +241,42 @@ async def notify_participants(room_name: str, notification_data: dict):
|
||||
raise NotificationError("Failed to notify room participants") from e
|
||||
finally:
|
||||
await lkapi.aclose()
|
||||
|
||||
|
||||
ALPHANUMERIC_CHARSET = string.ascii_letters + string.digits
|
||||
|
||||
|
||||
def generate_secure_token(length: int = 30, charset: str = ALPHANUMERIC_CHARSET) -> str:
|
||||
"""Generate a cryptographically secure random token.
|
||||
|
||||
Uses SystemRandom for proper entropy, suitable for OAuth tokens
|
||||
and API credentials that must be non-guessable.
|
||||
|
||||
Inspired by: https://github.com/oauthlib/oauthlib/blob/master/oauthlib/common.py
|
||||
|
||||
Args:
|
||||
length: Token length in characters (default: 30)
|
||||
charset: Character set to use for generation
|
||||
|
||||
Returns:
|
||||
Cryptographically secure random token
|
||||
"""
|
||||
return "".join(secrets.choice(charset) for _ in range(length))
|
||||
|
||||
|
||||
def generate_client_id() -> str:
|
||||
"""Generate a unique client ID for application authentication.
|
||||
|
||||
Returns:
|
||||
Random client ID string
|
||||
"""
|
||||
return generate_secure_token(settings.APPLICATION_CLIENT_ID_LENGTH)
|
||||
|
||||
|
||||
def generate_client_secret() -> str:
|
||||
"""Generate a secure client secret for application authentication.
|
||||
|
||||
Returns:
|
||||
Cryptographically secure client secret
|
||||
"""
|
||||
return generate_secure_token(settings.APPLICATION_CLIENT_SECRET_LENGTH)
|
||||
|
||||
@@ -664,6 +664,18 @@ class Base(Configuration):
|
||||
environ_prefix=None,
|
||||
)
|
||||
|
||||
# External Applications
|
||||
APPLICATION_CLIENT_ID_LENGTH = values.PositiveIntegerValue(
|
||||
40,
|
||||
environ_name="APPLICATION_CLIENT_ID_LENGTH",
|
||||
environ_prefix=None,
|
||||
)
|
||||
APPLICATION_CLIENT_SECRET_LENGTH = values.PositiveIntegerValue(
|
||||
128,
|
||||
environ_name="APPLICATION_CLIENT_SECRET_LENGTH",
|
||||
environ_prefix=None,
|
||||
)
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
@property
|
||||
def ENVIRONMENT(self):
|
||||
|
||||
Reference in New Issue
Block a user