diff --git a/CHANGELOG.md b/CHANGELOG.md index 312896ea..6414a57f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ and this project adheres to ### Fixed - 🔐(backend) enforce object-level permission checks on room endpoint #959 +- 🔒️(backend) add application validation when consuming external JWT #963 ## [1.5.0] - 2026-01-28 diff --git a/src/backend/core/external_api/authentication.py b/src/backend/core/external_api/authentication.py index 03c4b253..2ff16e2c 100644 --- a/src/backend/core/external_api/authentication.py +++ b/src/backend/core/external_api/authentication.py @@ -10,6 +10,8 @@ import jwt as pyJwt from lasuite.oidc_resource_server.backend import ResourceServerBackend as LaSuiteBackend from rest_framework import authentication, exceptions +from core.models import Application + User = get_user_model() logger = logging.getLogger(__name__) @@ -94,6 +96,18 @@ class ApplicationJWTAuthentication(authentication.BaseAuthentication): logger.warning("Missing 'client_id' in JWT payload") raise exceptions.AuthenticationFailed("Invalid token claims.") + try: + application = Application.objects.get(client_id=client_id) + except Application.DoesNotExist as e: + logger.warning("Application not found: %s", client_id) + raise exceptions.AuthenticationFailed("Application not found.") from e + + if not application.active: + logger.warning( + "Inactive application attempted authentication: %s", client_id + ) + raise exceptions.AuthenticationFailed("Application is disabled.") + if not is_delegated: logger.warning("Token is not marked as delegated") raise exceptions.AuthenticationFailed("Invalid token type.") diff --git a/src/backend/core/tests/test_external_api_rooms.py b/src/backend/core/tests/test_external_api_rooms.py index 1fc8c276..7a6ae487 100644 --- a/src/backend/core/tests/test_external_api_rooms.py +++ b/src/backend/core/tests/test_external_api_rooms.py @@ -16,10 +16,7 @@ import responses from lasuite.oidc_resource_server.authentication import ResourceServerAuthentication from rest_framework.test import APIClient -from core.factories import ( - RoomFactory, - UserFactory, -) +from core.factories import ApplicationFactory, RoomFactory, UserFactory from core.models import ApplicationScope, RoleChoices, Room, RoomAccessLevel, User pytestmark = pytest.mark.django_db @@ -30,12 +27,14 @@ def generate_test_token(user, scopes): now = datetime.now(timezone.utc) scope_string = " ".join(scopes) + application = ApplicationFactory() + payload = { "iss": settings.APPLICATION_JWT_ISSUER, "aud": settings.APPLICATION_JWT_AUDIENCE, "iat": now, "exp": now + timedelta(seconds=settings.APPLICATION_JWT_EXPIRATION_SECONDS), - "client_id": "test-client-id", + "client_id": str(application.client_id), "scope": scope_string, "user_id": str(user.id), "delegated": True, @@ -664,6 +663,7 @@ def test_api_rooms_token_scope_case_insensitive(settings): """Token's scope should be case-insensitive.""" settings.APPLICATION_JWT_SECRET_KEY = "devKey" user = UserFactory() + application = ApplicationFactory() # Generate token with mixed-case scope "Rooms:List" to verify that scope # validation is case-insensitive (should match "rooms:list") @@ -673,7 +673,7 @@ def test_api_rooms_token_scope_case_insensitive(settings): "aud": settings.APPLICATION_JWT_AUDIENCE, "iat": now, "exp": now + timedelta(hours=1), - "client_id": "test-client", + "client_id": str(application.client_id), "scope": "Rooms:List", # Mixed case - should be accepted as "rooms:list" "user_id": str(user.id), "delegated": True, @@ -695,6 +695,7 @@ def test_api_rooms_token_without_delegated_flag(settings): """Token without delegated flag should be rejected.""" settings.APPLICATION_JWT_SECRET_KEY = "devKey" user = UserFactory() + application = ApplicationFactory() # Generate token without delegated flag now = datetime.now(timezone.utc) @@ -703,7 +704,7 @@ def test_api_rooms_token_without_delegated_flag(settings): "aud": settings.APPLICATION_JWT_AUDIENCE, "iat": now, "exp": now + timedelta(hours=1), - "client_id": "test-client", + "client_id": str(application.client_id), "scope": "rooms:list", "user_id": str(user.id), "delegated": False, # Not delegated @@ -727,6 +728,7 @@ def test_api_rooms_token_invalid_signature(mock_rs_authenticate, settings): """Token signed with an invalid key should defer to the next authentication.""" settings.APPLICATION_JWT_SECRET_KEY = "devKey" user = UserFactory() + application = ApplicationFactory() # Generate token without delegated flag now = datetime.now(timezone.utc) @@ -735,7 +737,7 @@ def test_api_rooms_token_invalid_signature(mock_rs_authenticate, settings): "aud": settings.APPLICATION_JWT_AUDIENCE, "iat": now, "exp": now + timedelta(hours=1), - "client_id": "test-client", + "client_id": str(application.client_id), "scope": "rooms:list", "user_id": str(user.id), "delegated": True, @@ -820,6 +822,7 @@ def test_api_rooms_token_missing_client_id(settings): def test_api_rooms_token_missing_user_id(settings): """Token without user_id should be rejected.""" settings.APPLICATION_JWT_SECRET_KEY = "devKey" + application = ApplicationFactory() now = datetime.now(timezone.utc) payload = { @@ -827,7 +830,7 @@ def test_api_rooms_token_missing_user_id(settings): "aud": settings.APPLICATION_JWT_AUDIENCE, "iat": now, "exp": now + timedelta(hours=1), - "client_id": "test-client", + "client_id": str(application.client_id), "scope": "rooms:list", "delegated": True, # Missing user_id @@ -850,6 +853,7 @@ def test_api_rooms_token_invalid_audience(settings): """Token with an invalid audience should be rejected.""" settings.APPLICATION_JWT_SECRET_KEY = "devKey" user = UserFactory() + application = ApplicationFactory() now = datetime.now(timezone.utc) payload = { @@ -857,7 +861,7 @@ def test_api_rooms_token_invalid_audience(settings): "aud": "invalid-audience", "iat": now, "exp": now + timedelta(hours=1), - "client_id": "test-client", + "client_id": str(application.client_id), "user_id": str(user.id), "scope": "rooms:list", "delegated": True, @@ -879,6 +883,7 @@ def test_api_rooms_token_invalid_audience(settings): def test_api_rooms_token_unknown_user(settings): """Token for unknown user should be rejected.""" settings.APPLICATION_JWT_SECRET_KEY = "devKey" + application = ApplicationFactory() now = datetime.now(timezone.utc) payload = { @@ -886,7 +891,7 @@ def test_api_rooms_token_unknown_user(settings): "aud": settings.APPLICATION_JWT_AUDIENCE, "iat": now, "exp": now + timedelta(hours=1), - "client_id": "test-client", + "client_id": str(application.client_id), "user_id": str(uuid.uuid4()), "scope": "rooms:list", "delegated": True, @@ -905,6 +910,65 @@ def test_api_rooms_token_unknown_user(settings): assert "user not found." in str(response.data).lower() +def test_api_rooms_token_unknown_application(settings): + """Token for unknown application should be rejected.""" + settings.APPLICATION_JWT_SECRET_KEY = "devKey" + + now = datetime.now(timezone.utc) + payload = { + "iss": settings.APPLICATION_JWT_ISSUER, + "aud": settings.APPLICATION_JWT_AUDIENCE, + "iat": now, + "exp": now + timedelta(hours=1), + "client_id": "unknown-client-id", + "user_id": str(uuid.uuid4()), + "scope": "rooms:list", + "delegated": True, + } + token = jwt.encode( + payload, + settings.APPLICATION_JWT_SECRET_KEY, + algorithm=settings.APPLICATION_JWT_ALG, + ) + + client = APIClient() + client.credentials(HTTP_AUTHORIZATION=f"Bearer {token}") + response = client.get("/external-api/v1.0/rooms/") + + assert response.status_code == 401 + assert "application not found." in str(response.data).lower() + + +def test_api_rooms_token_inactive_application(settings): + """Token for inactive application should be rejected.""" + settings.APPLICATION_JWT_SECRET_KEY = "devKey" + application = ApplicationFactory(active=False) + + now = datetime.now(timezone.utc) + payload = { + "iss": settings.APPLICATION_JWT_ISSUER, + "aud": settings.APPLICATION_JWT_AUDIENCE, + "iat": now, + "exp": now + timedelta(hours=1), + "client_id": str(application.client_id), + "user_id": str(uuid.uuid4()), + "scope": "rooms:list", + "delegated": True, + } + token = jwt.encode( + payload, + settings.APPLICATION_JWT_SECRET_KEY, + algorithm=settings.APPLICATION_JWT_ALG, + ) + + client = APIClient() + client.credentials(HTTP_AUTHORIZATION=f"Bearer {token}") + response = client.get("/external-api/v1.0/rooms/") + + assert response.status_code == 401 + assert "application is disabled." in str(response.data).lower() + + @responses.activate def test_resource_server_creates_user_on_first_authentication(settings): """New user should be created during first authentication.