🔒️(backend) add application validation when consuming external JWT

Token generation already verifies that the application is active, but this
guarantee was not enforced when the token was used. This change adds a
runtime check to ensure the client_id claim matches an existing and active
application when evaluating permissions.

This also introduces an emergency revocation mechanism, allowing all previously
issued tokens for a given application to be invalidated if the application is
disabled.
This commit is contained in:
lebaudantoine
2026-02-09 13:48:55 +01:00
committed by aleb_the_flash
parent 6742f5d19d
commit 69c6e58017
3 changed files with 90 additions and 11 deletions

View File

@@ -24,6 +24,7 @@ and this project adheres to
### Fixed
- 🔐(backend) enforce object-level permission checks on room endpoint #959
- 🔒️(backend) add application validation when consuming external JWT #963
## [1.5.0] - 2026-01-28

View File

@@ -10,6 +10,8 @@ import jwt as pyJwt
from lasuite.oidc_resource_server.backend import ResourceServerBackend as LaSuiteBackend
from rest_framework import authentication, exceptions
from core.models import Application
User = get_user_model()
logger = logging.getLogger(__name__)
@@ -94,6 +96,18 @@ class ApplicationJWTAuthentication(authentication.BaseAuthentication):
logger.warning("Missing 'client_id' in JWT payload")
raise exceptions.AuthenticationFailed("Invalid token claims.")
try:
application = Application.objects.get(client_id=client_id)
except Application.DoesNotExist as e:
logger.warning("Application not found: %s", client_id)
raise exceptions.AuthenticationFailed("Application not found.") from e
if not application.active:
logger.warning(
"Inactive application attempted authentication: %s", client_id
)
raise exceptions.AuthenticationFailed("Application is disabled.")
if not is_delegated:
logger.warning("Token is not marked as delegated")
raise exceptions.AuthenticationFailed("Invalid token type.")

View File

@@ -16,10 +16,7 @@ import responses
from lasuite.oidc_resource_server.authentication import ResourceServerAuthentication
from rest_framework.test import APIClient
from core.factories import (
RoomFactory,
UserFactory,
)
from core.factories import ApplicationFactory, RoomFactory, UserFactory
from core.models import ApplicationScope, RoleChoices, Room, RoomAccessLevel, User
pytestmark = pytest.mark.django_db
@@ -30,12 +27,14 @@ def generate_test_token(user, scopes):
now = datetime.now(timezone.utc)
scope_string = " ".join(scopes)
application = ApplicationFactory()
payload = {
"iss": settings.APPLICATION_JWT_ISSUER,
"aud": settings.APPLICATION_JWT_AUDIENCE,
"iat": now,
"exp": now + timedelta(seconds=settings.APPLICATION_JWT_EXPIRATION_SECONDS),
"client_id": "test-client-id",
"client_id": str(application.client_id),
"scope": scope_string,
"user_id": str(user.id),
"delegated": True,
@@ -664,6 +663,7 @@ def test_api_rooms_token_scope_case_insensitive(settings):
"""Token's scope should be case-insensitive."""
settings.APPLICATION_JWT_SECRET_KEY = "devKey"
user = UserFactory()
application = ApplicationFactory()
# Generate token with mixed-case scope "Rooms:List" to verify that scope
# validation is case-insensitive (should match "rooms:list")
@@ -673,7 +673,7 @@ def test_api_rooms_token_scope_case_insensitive(settings):
"aud": settings.APPLICATION_JWT_AUDIENCE,
"iat": now,
"exp": now + timedelta(hours=1),
"client_id": "test-client",
"client_id": str(application.client_id),
"scope": "Rooms:List", # Mixed case - should be accepted as "rooms:list"
"user_id": str(user.id),
"delegated": True,
@@ -695,6 +695,7 @@ def test_api_rooms_token_without_delegated_flag(settings):
"""Token without delegated flag should be rejected."""
settings.APPLICATION_JWT_SECRET_KEY = "devKey"
user = UserFactory()
application = ApplicationFactory()
# Generate token without delegated flag
now = datetime.now(timezone.utc)
@@ -703,7 +704,7 @@ def test_api_rooms_token_without_delegated_flag(settings):
"aud": settings.APPLICATION_JWT_AUDIENCE,
"iat": now,
"exp": now + timedelta(hours=1),
"client_id": "test-client",
"client_id": str(application.client_id),
"scope": "rooms:list",
"user_id": str(user.id),
"delegated": False, # Not delegated
@@ -727,6 +728,7 @@ def test_api_rooms_token_invalid_signature(mock_rs_authenticate, settings):
"""Token signed with an invalid key should defer to the next authentication."""
settings.APPLICATION_JWT_SECRET_KEY = "devKey"
user = UserFactory()
application = ApplicationFactory()
# Generate token without delegated flag
now = datetime.now(timezone.utc)
@@ -735,7 +737,7 @@ def test_api_rooms_token_invalid_signature(mock_rs_authenticate, settings):
"aud": settings.APPLICATION_JWT_AUDIENCE,
"iat": now,
"exp": now + timedelta(hours=1),
"client_id": "test-client",
"client_id": str(application.client_id),
"scope": "rooms:list",
"user_id": str(user.id),
"delegated": True,
@@ -820,6 +822,7 @@ def test_api_rooms_token_missing_client_id(settings):
def test_api_rooms_token_missing_user_id(settings):
"""Token without user_id should be rejected."""
settings.APPLICATION_JWT_SECRET_KEY = "devKey"
application = ApplicationFactory()
now = datetime.now(timezone.utc)
payload = {
@@ -827,7 +830,7 @@ def test_api_rooms_token_missing_user_id(settings):
"aud": settings.APPLICATION_JWT_AUDIENCE,
"iat": now,
"exp": now + timedelta(hours=1),
"client_id": "test-client",
"client_id": str(application.client_id),
"scope": "rooms:list",
"delegated": True,
# Missing user_id
@@ -850,6 +853,7 @@ def test_api_rooms_token_invalid_audience(settings):
"""Token with an invalid audience should be rejected."""
settings.APPLICATION_JWT_SECRET_KEY = "devKey"
user = UserFactory()
application = ApplicationFactory()
now = datetime.now(timezone.utc)
payload = {
@@ -857,7 +861,7 @@ def test_api_rooms_token_invalid_audience(settings):
"aud": "invalid-audience",
"iat": now,
"exp": now + timedelta(hours=1),
"client_id": "test-client",
"client_id": str(application.client_id),
"user_id": str(user.id),
"scope": "rooms:list",
"delegated": True,
@@ -879,6 +883,7 @@ def test_api_rooms_token_invalid_audience(settings):
def test_api_rooms_token_unknown_user(settings):
"""Token for unknown user should be rejected."""
settings.APPLICATION_JWT_SECRET_KEY = "devKey"
application = ApplicationFactory()
now = datetime.now(timezone.utc)
payload = {
@@ -886,7 +891,7 @@ def test_api_rooms_token_unknown_user(settings):
"aud": settings.APPLICATION_JWT_AUDIENCE,
"iat": now,
"exp": now + timedelta(hours=1),
"client_id": "test-client",
"client_id": str(application.client_id),
"user_id": str(uuid.uuid4()),
"scope": "rooms:list",
"delegated": True,
@@ -905,6 +910,65 @@ def test_api_rooms_token_unknown_user(settings):
assert "user not found." in str(response.data).lower()
def test_api_rooms_token_unknown_application(settings):
"""Token for unknown application should be rejected."""
settings.APPLICATION_JWT_SECRET_KEY = "devKey"
now = datetime.now(timezone.utc)
payload = {
"iss": settings.APPLICATION_JWT_ISSUER,
"aud": settings.APPLICATION_JWT_AUDIENCE,
"iat": now,
"exp": now + timedelta(hours=1),
"client_id": "unknown-client-id",
"user_id": str(uuid.uuid4()),
"scope": "rooms:list",
"delegated": True,
}
token = jwt.encode(
payload,
settings.APPLICATION_JWT_SECRET_KEY,
algorithm=settings.APPLICATION_JWT_ALG,
)
client = APIClient()
client.credentials(HTTP_AUTHORIZATION=f"Bearer {token}")
response = client.get("/external-api/v1.0/rooms/")
assert response.status_code == 401
assert "application not found." in str(response.data).lower()
def test_api_rooms_token_inactive_application(settings):
"""Token for inactive application should be rejected."""
settings.APPLICATION_JWT_SECRET_KEY = "devKey"
application = ApplicationFactory(active=False)
now = datetime.now(timezone.utc)
payload = {
"iss": settings.APPLICATION_JWT_ISSUER,
"aud": settings.APPLICATION_JWT_AUDIENCE,
"iat": now,
"exp": now + timedelta(hours=1),
"client_id": str(application.client_id),
"user_id": str(uuid.uuid4()),
"scope": "rooms:list",
"delegated": True,
}
token = jwt.encode(
payload,
settings.APPLICATION_JWT_SECRET_KEY,
algorithm=settings.APPLICATION_JWT_ALG,
)
client = APIClient()
client.credentials(HTTP_AUTHORIZATION=f"Bearer {token}")
response = client.get("/external-api/v1.0/rooms/")
assert response.status_code == 401
assert "application is disabled." in str(response.data).lower()
@responses.activate
def test_resource_server_creates_user_on_first_authentication(settings):
"""New user should be created during first authentication.