2025-10-03 00:28:56 +02:00
|
|
|
"""External API endpoints"""
|
|
|
|
|
|
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
|
from logging import getLogger
|
|
|
|
|
|
|
|
|
|
from django.conf import settings
|
|
|
|
|
from django.contrib.auth.hashers import check_password
|
|
|
|
|
from django.core.exceptions import ValidationError
|
|
|
|
|
from django.core.validators import validate_email
|
|
|
|
|
|
|
|
|
|
import jwt
|
2025-10-03 01:43:59 +02:00
|
|
|
from rest_framework import decorators, mixins, viewsets
|
2025-10-03 00:28:56 +02:00
|
|
|
from rest_framework import (
|
|
|
|
|
exceptions as drf_exceptions,
|
|
|
|
|
)
|
|
|
|
|
from rest_framework import (
|
|
|
|
|
response as drf_response,
|
|
|
|
|
)
|
|
|
|
|
from rest_framework import (
|
|
|
|
|
status as drf_status,
|
|
|
|
|
)
|
|
|
|
|
|
2025-10-03 01:43:59 +02:00
|
|
|
from core import api, models
|
2025-10-03 00:28:56 +02:00
|
|
|
|
2025-10-03 01:43:59 +02:00
|
|
|
from . import authentication, permissions, serializers
|
2025-10-03 00:28:56 +02:00
|
|
|
|
|
|
|
|
logger = getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ApplicationViewSet(viewsets.GenericViewSet):
|
|
|
|
|
"""API endpoints for application authentication and token generation."""
|
|
|
|
|
|
|
|
|
|
@decorators.action(
|
|
|
|
|
detail=False,
|
|
|
|
|
methods=["post"],
|
|
|
|
|
url_path="token",
|
|
|
|
|
url_name="token",
|
|
|
|
|
)
|
|
|
|
|
def generate_jwt_access_token(self, request, *args, **kwargs):
|
|
|
|
|
"""Generate JWT access token for application delegation.
|
|
|
|
|
|
|
|
|
|
Validates application credentials and generates a JWT token scoped
|
|
|
|
|
to a specific user email, allowing the application to act on behalf
|
|
|
|
|
of that user.
|
|
|
|
|
|
|
|
|
|
Note: The 'scope' parameter accepts an email address to identify the user
|
|
|
|
|
being delegated. This design allows applications to obtain user-scoped tokens
|
|
|
|
|
for delegation purposes. The scope field is intentionally generic and can be
|
|
|
|
|
extended to support other values in the future.
|
|
|
|
|
|
|
|
|
|
Reference: https://stackoverflow.com/a/27711422
|
|
|
|
|
"""
|
|
|
|
|
serializer = serializers.ApplicationJwtSerializer(data=request.data)
|
|
|
|
|
serializer.is_valid(raise_exception=True)
|
|
|
|
|
|
|
|
|
|
client_id = serializer.validated_data["client_id"]
|
|
|
|
|
client_secret = serializer.validated_data["client_secret"]
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
application = models.Application.objects.get(client_id=client_id)
|
|
|
|
|
except models.Application.DoesNotExist as e:
|
|
|
|
|
raise drf_exceptions.AuthenticationFailed("Invalid credentials") from e
|
|
|
|
|
|
|
|
|
|
if not application.active:
|
|
|
|
|
raise drf_exceptions.AuthenticationFailed("Application is inactive")
|
|
|
|
|
|
|
|
|
|
if not check_password(client_secret, application.client_secret):
|
|
|
|
|
raise drf_exceptions.AuthenticationFailed("Invalid credentials")
|
|
|
|
|
|
|
|
|
|
email = serializer.validated_data["scope"]
|
|
|
|
|
try:
|
|
|
|
|
validate_email(email)
|
|
|
|
|
except ValidationError:
|
|
|
|
|
return drf_response.Response(
|
|
|
|
|
{
|
|
|
|
|
"error": "Scope should be a valid email address.",
|
|
|
|
|
},
|
|
|
|
|
status=drf_status.HTTP_400_BAD_REQUEST,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if not application.can_delegate_email(email):
|
|
|
|
|
logger.warning(
|
|
|
|
|
"Application %s denied delegation for %s",
|
|
|
|
|
application.client_id,
|
|
|
|
|
email,
|
|
|
|
|
)
|
|
|
|
|
return drf_response.Response(
|
|
|
|
|
{
|
|
|
|
|
"error": "This application is not authorized for this email domain.",
|
|
|
|
|
},
|
|
|
|
|
status=drf_status.HTTP_403_FORBIDDEN,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
user = models.User.objects.get(email=email)
|
|
|
|
|
except models.User.DoesNotExist as e:
|
|
|
|
|
raise drf_exceptions.NotFound(
|
|
|
|
|
{
|
|
|
|
|
"error": "User not found.",
|
|
|
|
|
}
|
|
|
|
|
) from e
|
|
|
|
|
|
|
|
|
|
now = datetime.now(timezone.utc)
|
|
|
|
|
scope = " ".join(application.scopes or [])
|
|
|
|
|
|
|
|
|
|
payload = {
|
|
|
|
|
"iss": settings.APPLICATION_JWT_ISSUER,
|
|
|
|
|
"aud": settings.APPLICATION_JWT_AUDIENCE,
|
|
|
|
|
"iat": now,
|
|
|
|
|
"exp": now + timedelta(seconds=settings.APPLICATION_JWT_EXPIRATION_SECONDS),
|
|
|
|
|
"client_id": client_id,
|
|
|
|
|
"scope": scope,
|
|
|
|
|
"user_id": str(user.id),
|
|
|
|
|
"delegated": True,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
token = jwt.encode(
|
|
|
|
|
payload,
|
|
|
|
|
settings.APPLICATION_JWT_SECRET_KEY,
|
|
|
|
|
algorithm=settings.APPLICATION_JWT_ALG,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
return drf_response.Response(
|
|
|
|
|
{
|
|
|
|
|
"access_token": token,
|
|
|
|
|
"token_type": settings.APPLICATION_JWT_TOKEN_TYPE,
|
|
|
|
|
"expires_in": settings.APPLICATION_JWT_EXPIRATION_SECONDS,
|
|
|
|
|
"scope": scope,
|
|
|
|
|
},
|
|
|
|
|
status=drf_status.HTTP_200_OK,
|
|
|
|
|
)
|
2025-10-03 01:43:59 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
class RoomViewSet(
|
|
|
|
|
mixins.CreateModelMixin,
|
|
|
|
|
mixins.RetrieveModelMixin,
|
|
|
|
|
mixins.ListModelMixin,
|
|
|
|
|
viewsets.GenericViewSet,
|
|
|
|
|
):
|
|
|
|
|
"""Application-delegated API for room management.
|
|
|
|
|
|
|
|
|
|
Provides JWT-authenticated access to room operations for external applications
|
|
|
|
|
acting on behalf of users. All operations are scope-based and filtered to the
|
|
|
|
|
authenticated user's accessible rooms.
|
|
|
|
|
|
|
|
|
|
Supported operations:
|
|
|
|
|
- list: List rooms the user has access to (requires 'rooms:list' scope)
|
|
|
|
|
- retrieve: Get room details (requires 'rooms:retrieve' scope)
|
|
|
|
|
- create: Create a new room owned by the user (requires 'rooms:create' scope)
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
authentication_classes = [authentication.ApplicationJWTAuthentication]
|
|
|
|
|
permission_classes = [
|
|
|
|
|
api.permissions.IsAuthenticated & permissions.HasRequiredRoomScope
|
|
|
|
|
]
|
|
|
|
|
queryset = models.Room.objects.all()
|
|
|
|
|
serializer_class = serializers.RoomSerializer
|
|
|
|
|
|
|
|
|
|
def list(self, request, *args, **kwargs):
|
|
|
|
|
"""Limit listed rooms to the ones related to the authenticated user."""
|
|
|
|
|
|
|
|
|
|
user = self.request.user
|
|
|
|
|
|
|
|
|
|
if user.is_authenticated:
|
|
|
|
|
queryset = (
|
|
|
|
|
self.filter_queryset(self.get_queryset()).filter(users=user).distinct()
|
|
|
|
|
)
|
|
|
|
|
else:
|
|
|
|
|
queryset = self.get_queryset().none()
|
|
|
|
|
|
|
|
|
|
page = self.paginate_queryset(queryset)
|
|
|
|
|
if page is not None:
|
|
|
|
|
serializer = self.get_serializer(page, many=True)
|
|
|
|
|
return self.get_paginated_response(serializer.data)
|
|
|
|
|
|
|
|
|
|
serializer = self.get_serializer(queryset, many=True)
|
|
|
|
|
return drf_response.Response(serializer.data)
|
|
|
|
|
|
|
|
|
|
def perform_create(self, serializer):
|
|
|
|
|
"""Set the current user as owner of the newly created room."""
|
|
|
|
|
room = serializer.save()
|
|
|
|
|
models.ResourceAccess.objects.create(
|
|
|
|
|
resource=room,
|
|
|
|
|
user=self.request.user,
|
|
|
|
|
role=models.RoleChoices.OWNER,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# Log for auditing
|
|
|
|
|
logger.info(
|
|
|
|
|
"Room created via application: room_id=%s, user_id=%s, client_id=%s",
|
|
|
|
|
room.id,
|
|
|
|
|
self.request.user.id,
|
|
|
|
|
getattr(self.request.auth, "client_id", "unknown"),
|
|
|
|
|
)
|