diff --git a/src/backend/core/admin.py b/src/backend/core/admin.py index d16e90ad..9c7272e4 100644 --- a/src/backend/core/admin.py +++ b/src/backend/core/admin.py @@ -1,5 +1,6 @@ """Admin classes and registrations for core app.""" +from django import forms from django.contrib import admin from django.contrib.auth import admin as auth_admin from django.utils.translation import gettext_lazy as _ @@ -150,3 +151,66 @@ class RecordingAdmin(admin.ModelAdmin): return _("Multiple owners") return str(owners[0].user) + + +class ApplicationDomainInline(admin.TabularInline): + """Inline admin for managing allowed domains per application.""" + + model = models.ApplicationDomain + extra = 0 + + +class ApplicationAdminForm(forms.ModelForm): + """Custom form for Application admin with multi-select scopes.""" + + scopes = forms.MultipleChoiceField( + choices=models.ApplicationScope.choices, + widget=forms.CheckboxSelectMultiple, + required=False, + ) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + if self.instance.pk and self.instance.scopes: + self.fields["scopes"].initial = self.instance.scopes + + +@admin.register(models.Application) +class ApplicationAdmin(admin.ModelAdmin): + """Admin interface for managing applications and their permissions.""" + + form = ApplicationAdminForm + + list_display = ("id", "name", "client_id", "get_scopes_display") + fields = [ + "name", + "id", + "created_at", + "updated_at", + "scopes", + "client_id", + "client_secret", + ] + readonly_fields = ["id", "created_at", "updated_at"] + inlines = [ApplicationDomainInline] + + def get_readonly_fields(self, request, obj=None): + """Make client_id and client_secret readonly after creation.""" + if obj: # Editing existing object + return self.readonly_fields + ["client_id", "client_secret"] + return self.readonly_fields + + def get_fields(self, request, obj=None): + """Hide client_secret after creation.""" + fields = super().get_fields(request, obj) + if obj: + return [f for f in fields if f != "client_secret"] + return fields + + def get_scopes_display(self, obj): + """Display scopes in list view.""" + if obj.scopes: + return ", ".join(obj.scopes) + return _("No scopes") + + get_scopes_display.short_description = _("Scopes") diff --git a/src/backend/core/factories.py b/src/backend/core/factories.py index 8f019f80..43f4fdfc 100644 --- a/src/backend/core/factories.py +++ b/src/backend/core/factories.py @@ -9,7 +9,7 @@ from django.utils.text import slugify import factory.fuzzy from faker import Faker -from core import models +from core import models, utils fake = Faker() @@ -117,3 +117,39 @@ class TeamRecordingAccessFactory(factory.django.DjangoModelFactory): recording = factory.SubFactory(RecordingFactory) team = factory.Sequence(lambda n: f"team{n}") role = factory.fuzzy.FuzzyChoice(models.RoleChoices.values) + + +class ApplicationFactory(factory.django.DjangoModelFactory): + """Create fake applications for testing.""" + + class Meta: + model = models.Application + + name = factory.Faker("company") + active = True + client_id = factory.LazyFunction(utils.generate_client_id) + client_secret = factory.LazyFunction(utils.generate_client_secret) + scopes = [] + + class Params: + """Factory traits for common application configurations.""" + + with_all_scopes = factory.Trait( + scopes=[ + models.ApplicationScope.ROOMS_LIST, + models.ApplicationScope.ROOMS_RETRIEVE, + models.ApplicationScope.ROOMS_CREATE, + models.ApplicationScope.ROOMS_UPDATE, + models.ApplicationScope.ROOMS_DELETE, + ] + ) + + +class ApplicationDomainFactory(factory.django.DjangoModelFactory): + """Create fake application domains for testing.""" + + class Meta: + model = models.ApplicationDomain + + domain = factory.Faker("domain_name") + application = factory.SubFactory(ApplicationFactory) diff --git a/src/backend/core/fields.py b/src/backend/core/fields.py new file mode 100644 index 00000000..2b3d6ddc --- /dev/null +++ b/src/backend/core/fields.py @@ -0,0 +1,43 @@ +""" +Core application fields +""" + +from logging import getLogger + +from django.contrib.auth.hashers import identify_hasher, make_password +from django.db import models + +logger = getLogger(__name__) + + +class SecretField(models.CharField): + """CharField that automatically hashes secrets before saving. + + Use for API keys, client secrets, or tokens that should never be stored + in plain text. Already-hashed values are preserved to prevent double-hashing. + + Inspired by: https://github.com/django-oauth-toolkit/django-oauth-toolkit + """ + + def pre_save(self, model_instance, add): + """Hash the secret if not already hashed, otherwise preserve it.""" + + secret = getattr(model_instance, self.attname) + + try: + hasher = identify_hasher(secret) + logger.debug( + "%s: %s is already hashed with %s.", + model_instance, + self.attname, + hasher, + ) + except ValueError: + logger.debug( + "%s: %s is not hashed; hashing it now.", model_instance, self.attname + ) + hashed_secret = make_password(secret) + setattr(model_instance, self.attname, hashed_secret) + return hashed_secret + + return super().pre_save(model_instance, add) diff --git a/src/backend/core/migrations/0015_application_and_more.py b/src/backend/core/migrations/0015_application_and_more.py new file mode 100644 index 00000000..260b18d4 --- /dev/null +++ b/src/backend/core/migrations/0015_application_and_more.py @@ -0,0 +1,52 @@ +# Generated by Django 5.2.6 on 2025-10-02 20:55 + +import core.utils +import django.db.models.deletion +import uuid +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0014_room_pin_code'), + ] + + operations = [ + migrations.CreateModel( + name='Application', + fields=[ + ('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')), + ('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created on')), + ('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated on')), + ('name', models.CharField(help_text='Descriptive name for this application.', max_length=255, verbose_name='Application name')), + ('active', models.BooleanField(default=True)), + ('client_id', models.CharField(default=core.utils.generate_client_id, max_length=100, unique=True)), + ('client_secret', core.fields.SecretField(blank=True, default=core.utils.generate_client_secret, help_text='Hashed on Save. Copy it now if this is a new secret.', max_length=255)), + ('scopes', django.contrib.postgres.fields.ArrayField(base_field=models.CharField(choices=[('rooms:create', 'Create rooms'), ('rooms:list', 'List rooms'), ('rooms:retrieve', 'Retrieve room details'), ('rooms:update', 'Update rooms'), ('rooms:delete', 'Delete rooms')], max_length=50), blank=True, default=list, size=None)), + ], + options={ + 'verbose_name': 'Application', + 'verbose_name_plural': 'Applications', + 'db_table': 'meet_application', + 'ordering': ('-created_at',), + }, + ), + migrations.CreateModel( + name='ApplicationDomain', + fields=[ + ('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')), + ('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created on')), + ('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated on')), + ('domain', models.CharField(help_text='Email domain this application can act on behalf of.', max_length=253, validators=[django.core.validators.DomainNameValidator(accept_idna=False, message='Enter a valid domain')], verbose_name='Domain')), + ('application', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='allowed_domains', to='core.application')), + ], + options={ + 'verbose_name': 'Application domain', + 'verbose_name_plural': 'Application domains', + 'db_table': 'meet_application_domain', + 'ordering': ('domain',), + 'unique_together': {('application', 'domain')}, + }, + ), + ] diff --git a/src/backend/core/models.py b/src/backend/core/models.py index b92b9291..89ff11cd 100644 --- a/src/backend/core/models.py +++ b/src/backend/core/models.py @@ -11,6 +11,7 @@ from typing import List, Optional from django.conf import settings from django.contrib.auth import models as auth_models from django.contrib.auth.base_user import AbstractBaseUser +from django.contrib.postgres.fields import ArrayField from django.core import mail, validators from django.core.exceptions import PermissionDenied, ValidationError from django.db import models @@ -18,8 +19,10 @@ from django.utils import timezone from django.utils.text import capfirst, slugify from django.utils.translation import gettext_lazy as _ +from lasuite.tools.email import get_domain_from_email from timezone_field import TimeZoneField +from . import fields, utils from .recording.enums import FileExtension logger = getLogger(__name__) @@ -717,3 +720,101 @@ class RecordingAccess(BaseAccess): Compute and return abilities for a given user on the recording access. """ return self._get_abilities(self.recording, user) + + +class ApplicationScope(models.TextChoices): + """Available permission scopes for application operations.""" + + ROOMS_CREATE = "rooms:create", _("Create rooms") + ROOMS_LIST = "rooms:list", _("List rooms") + ROOMS_RETRIEVE = "rooms:retrieve", _("Retrieve room details") + ROOMS_UPDATE = "rooms:update", _("Update rooms") + ROOMS_DELETE = "rooms:delete", _("Delete rooms") + + +class Application(BaseModel): + """External application for API authentication and authorization. + + Represents a third-party integration or automated system that accesses + the API using OAuth2-style client credentials (client_id/client_secret). + Supports scoped permissions and optional domain restrictions for delegation. + """ + + name = models.CharField( + max_length=255, + verbose_name=_("Application name"), + help_text=_("Descriptive name for this application."), + ) + active = models.BooleanField(default=True) + client_id = models.CharField( + max_length=100, unique=True, default=utils.generate_client_id + ) + client_secret = fields.SecretField( + max_length=255, + blank=True, + default=utils.generate_client_secret, + help_text=_("Hashed on Save. Copy it now if this is a new secret."), + ) + scopes = ArrayField( + models.CharField(max_length=50, choices=ApplicationScope.choices), + default=list, + blank=True, + ) + + class Meta: + db_table = "meet_application" + ordering = ("-created_at",) + verbose_name = _("Application") + verbose_name_plural = _("Applications") + + def __str__(self): + return f"{self.name!s}" + + def can_delegate_email(self, email): + """Check if this application can delegate the given email.""" + + if not self.allowed_domains.exists(): + return True # No domain restrictions + + domain = get_domain_from_email(email) + return self.allowed_domains.filter(domain__iexact=domain).exists() + + +class ApplicationDomain(BaseModel): + """Domain authorized for application delegation.""" + + domain = models.CharField( + max_length=253, # Max domain length per RFC 1035 + validators=[ + validators.DomainNameValidator( + accept_idna=False, + message=_("Enter a valid domain"), + ) + ], + verbose_name=_("Domain"), + help_text=_("Email domain this application can act on behalf of."), + ) + + application = models.ForeignKey( + "Application", + on_delete=models.CASCADE, + related_name="allowed_domains", + ) + + class Meta: + db_table = "meet_application_domain" + ordering = ("domain",) + verbose_name = _("Application domain") + verbose_name_plural = _("Application domains") + unique_together = [("application", "domain")] + + def __str__(self): + """Return string representation of the domain.""" + + return self.domain + + def save(self, *args, **kwargs): + """Save the domain after normalizing to lowercase.""" + + self.domain = self.domain.lower().strip() + super().save(*args, **kwargs) diff --git a/src/backend/core/tests/test_models_applications.py b/src/backend/core/tests/test_models_applications.py new file mode 100644 index 00000000..035738c0 --- /dev/null +++ b/src/backend/core/tests/test_models_applications.py @@ -0,0 +1,331 @@ +""" +Unit tests for the Application and ApplicationDomain models +""" + +# pylint: disable=W0613 + +from unittest import mock + +from django.contrib.auth.hashers import check_password +from django.core.exceptions import ValidationError + +import pytest + +from core.factories import ApplicationDomainFactory, ApplicationFactory +from core.models import Application, ApplicationDomain, ApplicationScope + +pytestmark = pytest.mark.django_db + + +# Application Model Tests + + +def test_models_application_str(): + """The str representation should be the name.""" + application = ApplicationFactory(name="My Integration") + assert str(application) == "My Integration" + + +def test_models_application_name_maxlength(): + """The name field should be at most 255 characters.""" + ApplicationFactory(name="a" * 255) + + with pytest.raises(ValidationError) as excinfo: + ApplicationFactory(name="a" * 256) + + assert "Ensure this value has at most 255 characters (it has 256)." in str( + excinfo.value + ) + + +def test_models_application_active_default(): + """An application should be active by default.""" + application = Application.objects.create(name="Test App") + assert application.active is True + + +def test_models_application_scopes_default(): + """Scopes should default to empty list.""" + application = Application.objects.create(name="Test App") + assert application.scopes == [] + + +def test_models_application_client_id_auto_generated(): + """Client ID should be automatically generated on creation.""" + application = ApplicationFactory() + assert application.client_id is not None + assert len(application.client_id) > 0 + + +def test_models_application_client_id_unique(): + """Client IDs should be unique.""" + app1 = ApplicationFactory() + + with pytest.raises(ValidationError) as excinfo: + ApplicationFactory(client_id=app1.client_id) + + assert "Application with this Client id already exists." in str(excinfo.value) + + +def test_models_application_client_id_length(settings): + """Client ID should match configured length.""" + + app1 = ApplicationFactory() + assert len(app1.client_id) == 40 # default value + + settings.APPLICATION_CLIENT_ID_LENGTH = 20 + + app2 = ApplicationFactory() + assert len(app2.client_id) == 20 + + +def test_models_application_client_secret_auto_generated(): + """Client secret should be automatically generated and hashed on creation.""" + application = ApplicationFactory() + + assert application.client_secret is not None + assert len(application.client_secret) > 0 + + +def test_models_application_client_secret_hashed_on_save(): + """Client secret should be hashed when saved.""" + plain_secret = "my-plain-secret" + + with mock.patch( + "core.models.utils.generate_client_secret", return_value=plain_secret + ): + application = ApplicationFactory(client_secret=plain_secret) + + # Secret should be hashed, not plain + assert application.client_secret != plain_secret + # Should verify with check_password + assert check_password(plain_secret, application.client_secret) is True + + +def test_models_application_client_secret_preserves_existing_hash(): + """Re-saving should not re-hash an already hashed secret.""" + application = ApplicationFactory() + original_hash = application.client_secret + + # Update another field and save + application.name = "Updated Name" + application.save() + + # Hash should remain unchanged + assert application.client_secret == original_hash + + +def test_models_application_updates_preserve_client_id(): + """Application updates should preserve existing client_id.""" + application = ApplicationFactory() + original_client_id = application.client_id + + application.name = "Updated Name" + application.save() + + assert application.client_id == original_client_id + + +def test_models_application_scopes_valid_choices(): + """Only valid scope choices should be accepted.""" + application = ApplicationFactory( + scopes=[ + ApplicationScope.ROOMS_LIST, + ApplicationScope.ROOMS_CREATE, + ApplicationScope.ROOMS_RETRIEVE, + ] + ) + + assert len(application.scopes) == 3 + assert ApplicationScope.ROOMS_LIST in application.scopes + + +def test_models_application_scopes_invalid_choice(): + """Invalid scope choices should raise validation error.""" + with pytest.raises(ValidationError) as excinfo: + ApplicationFactory(scopes=["invalid:scope"]) + + assert "is not a valid choice" in str(excinfo.value) + + +def test_models_application_can_delegate_email_no_restrictions(): + """Application with no domain restrictions can delegate any email.""" + application = ApplicationFactory() + + assert application.can_delegate_email("user@example.com") is True + assert application.can_delegate_email("admin@anotherdomain.org") is True + + +def test_models_application_can_delegate_email_allowed_domain(): + """Application can delegate email from allowed domain.""" + application = ApplicationFactory() + ApplicationDomainFactory(application=application, domain="example.com") + + assert application.can_delegate_email("user@example.com") is True + + +def test_models_application_can_delegate_email_denied_domain(): + """Application cannot delegate email from non-allowed domain.""" + application = ApplicationFactory() + ApplicationDomainFactory(application=application, domain="example.com") + + assert application.can_delegate_email("user@other.com") is False + + +def test_models_application_can_delegate_email_case_insensitive(): + """Domain matching should be case-insensitive.""" + application = ApplicationFactory() + ApplicationDomainFactory(application=application, domain="example.com") + + assert application.can_delegate_email("user@EXAMPLE.COM") is True + assert application.can_delegate_email("user@Example.Com") is True + + +def test_models_application_can_delegate_email_multiple_domains(): + """Application with multiple allowed domains should check all.""" + application = ApplicationFactory() + ApplicationDomainFactory(application=application, domain="example.com") + ApplicationDomainFactory(application=application, domain="other.org") + + assert application.can_delegate_email("user@example.com") is True + assert application.can_delegate_email("admin@other.org") is True + assert application.can_delegate_email("test@denied.com") is False + + +# ApplicationDomain Model Tests + + +def test_models_application_domain_str(): + """The str representation should be the domain.""" + domain = ApplicationDomainFactory(domain="example.com") + assert str(domain) == "example.com" + + +def test_models_application_domain_ordering(): + """Domains should be returned ordered by domain name.""" + application = ApplicationFactory() + ApplicationDomainFactory(application=application, domain="zulu.com") + ApplicationDomainFactory(application=application, domain="alpha.com") + ApplicationDomainFactory(application=application, domain="beta.com") + + domains = ApplicationDomain.objects.all() + assert domains[0].domain == "alpha.com" + assert domains[1].domain == "beta.com" + assert domains[2].domain == "zulu.com" + + +@pytest.mark.parametrize( + "valid_domain", + [ + "example.com", + "sub.example.com", + "deep.sub.example.com", + "example-with-dash.com", + "123.example.com", + ], +) +def test_models_application_domain_valid_domain(valid_domain): + """Valid domain names should be accepted.""" + ApplicationDomainFactory(domain=valid_domain) + + +@pytest.mark.parametrize( + "invalid_domain", + [ + "not a domain", + "example..com", + "-example.com", + "example-.com", + "example.com-", + ], +) +def test_models_application_domain_invalid_domain(invalid_domain): + """Invalid domain names should raise validation error.""" + + with pytest.raises(ValidationError): + ApplicationDomainFactory(domain=invalid_domain) + + +def test_models_application_domain_lowercase_on_save(): + """Domain should be normalized to lowercase on save.""" + domain = ApplicationDomainFactory(domain="EXAMPLE.COM") + + assert domain.domain == "example.com" + + +def test_models_application_domain_strip_whitespace_on_save(): + """Domain should strip whitespace on save.""" + domain = ApplicationDomainFactory(domain=" example.com ") + + assert domain.domain == "example.com" + + +def test_models_application_domain_combined_normalization(): + """Domain should strip and lowercase in one operation.""" + domain = ApplicationDomainFactory(domain=" EXAMPLE.COM ") + + assert domain.domain == "example.com" + + +def test_models_application_domain_unique_together(): + """Same domain cannot be added twice to same application.""" + application = ApplicationFactory() + ApplicationDomainFactory(application=application, domain="example.com") + + with pytest.raises(ValidationError) as excinfo: + ApplicationDomainFactory(application=application, domain="example.com") + + assert "Application domain with this Application and Domain already exists." in str( + excinfo.value + ) + + +def test_models_application_domain_same_domain_different_apps(): + """Same domain can belong to different applications.""" + app1 = ApplicationFactory() + app2 = ApplicationFactory() + + ApplicationDomainFactory(application=app1, domain="example.com") + ApplicationDomainFactory(application=app2, domain="example.com") + + assert app1.allowed_domains.count() == 1 + assert app2.allowed_domains.count() == 1 + + +def test_models_application_domain_cascade_delete(): + """Deleting application should delete its domains.""" + application = ApplicationFactory() + ApplicationDomainFactory(application=application, domain="example.com") + ApplicationDomainFactory(application=application, domain="other.com") + + assert ApplicationDomain.objects.count() == 2 + + application.delete() + + assert ApplicationDomain.objects.count() == 0 + + +def test_models_application_domain_related_name(): + """Domains should be accessible via application.allowed_domains.""" + application = ApplicationFactory() + domain1 = ApplicationDomainFactory(application=application, domain="example.com") + domain2 = ApplicationDomainFactory(application=application, domain="other.com") + + assert list(application.allowed_domains.all()) == [domain1, domain2] + + +def test_models_application_domain_filters_delegation(): + """Adding/removing domains should affect can_delegate_email.""" + application = ApplicationFactory() + + # No restrictions initially + assert application.can_delegate_email("user@example.com") is True + + # Add domain restriction + domain = ApplicationDomainFactory(application=application, domain="example.com") + assert application.can_delegate_email("user@example.com") is True + assert application.can_delegate_email("user@other.com") is False + + # Remove domain restriction + domain.delete() + assert application.can_delegate_email("user@other.com") is True diff --git a/src/backend/core/utils.py b/src/backend/core/utils.py index 92f16eaa..8f62bd56 100644 --- a/src/backend/core/utils.py +++ b/src/backend/core/utils.py @@ -8,6 +8,7 @@ Utils functions used in the core app import hashlib import json import random +import string from typing import List, Optional from uuid import uuid4 @@ -240,3 +241,42 @@ async def notify_participants(room_name: str, notification_data: dict): raise NotificationError("Failed to notify room participants") from e finally: await lkapi.aclose() + + +ALPHANUMERIC_CHARSET = string.ascii_letters + string.digits + + +def generate_secure_token(length: int = 30, charset: str = ALPHANUMERIC_CHARSET) -> str: + """Generate a cryptographically secure random token. + + Uses SystemRandom for proper entropy, suitable for OAuth tokens + and API credentials that must be non-guessable. + + Inspired by: https://github.com/oauthlib/oauthlib/blob/master/oauthlib/common.py + + Args: + length: Token length in characters (default: 30) + charset: Character set to use for generation + + Returns: + Cryptographically secure random token + """ + return "".join(secrets.choice(charset) for _ in range(length)) + + +def generate_client_id() -> str: + """Generate a unique client ID for application authentication. + + Returns: + Random client ID string + """ + return generate_secure_token(settings.APPLICATION_CLIENT_ID_LENGTH) + + +def generate_client_secret() -> str: + """Generate a secure client secret for application authentication. + + Returns: + Cryptographically secure client secret + """ + return generate_secure_token(settings.APPLICATION_CLIENT_SECRET_LENGTH) diff --git a/src/backend/meet/settings.py b/src/backend/meet/settings.py index 51d10e88..31b64ec2 100755 --- a/src/backend/meet/settings.py +++ b/src/backend/meet/settings.py @@ -664,6 +664,18 @@ class Base(Configuration): environ_prefix=None, ) + # External Applications + APPLICATION_CLIENT_ID_LENGTH = values.PositiveIntegerValue( + 40, + environ_name="APPLICATION_CLIENT_ID_LENGTH", + environ_prefix=None, + ) + APPLICATION_CLIENT_SECRET_LENGTH = values.PositiveIntegerValue( + 128, + environ_name="APPLICATION_CLIENT_SECRET_LENGTH", + environ_prefix=None, + ) + # pylint: disable=invalid-name @property def ENVIRONMENT(self):