This repository has been archived on 2026-03-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
people/src/backend/core/api/viewsets.py

365 lines
12 KiB
Python
Raw Normal View History

"""API endpoints"""
import io
import uuid
from http import HTTPStatus
from django.contrib.postgres.search import TrigramSimilarity
from django.core.cache import cache
from django.core.exceptions import ValidationError
from django.db.models import (
CharField,
Count,
F,
Func,
OuterRef,
Prefetch,
Q,
Subquery,
TextField,
Value,
functions,
)
from rest_framework import decorators, mixins, pagination, response, viewsets
from rest_framework import permissions as drf_permissions
from rest_framework.exceptions import ValidationError as DRFValidationError
from core import enums, models
from . import permissions, serializers
class NestedGenericViewSet(viewsets.GenericViewSet):
"""
A generic Viewset aims to be used in a nested route context.
e.g: `/api/v1.0/resource_1/<resource_1_pk>/resource_2/<resource_2_pk>/`
It allows to define all url kwargs and lookup fields to perform the lookup.
"""
lookup_fields: list[str] = ["pk"]
lookup_url_kwargs: list[str] = []
def __getattribute__(self, item):
"""
This method is overridden to allow to get the last lookup field or lookup url kwarg
when accessing the `lookup_field` or `lookup_url_kwarg` attribute. This is useful
to keep compatibility with all methods used by the parent class `GenericViewSet`.
"""
if item in ["lookup_field", "lookup_url_kwarg"]:
return getattr(self, item + "s", [None])[-1]
return super().__getattribute__(item)
def get_queryset(self):
"""
Get the list of items for this view.
`lookup_fields` attribute is enumerated here to perform the nested lookup.
"""
queryset = super().get_queryset()
# The last lookup field is removed to perform the nested lookup as it corresponds
# to the object pk, it is used within get_object method.
lookup_url_kwargs = (
self.lookup_url_kwargs[:-1]
if self.lookup_url_kwargs
else self.lookup_fields[:-1]
)
filter_kwargs = {}
for index, lookup_url_kwarg in enumerate(lookup_url_kwargs):
if lookup_url_kwarg not in self.kwargs:
raise KeyError(
f"Expected view {self.__class__.__name__} to be called with a URL "
f'keyword argument named "{lookup_url_kwarg}". Fix your URL conf, or '
"set the `.lookup_fields` attribute on the view correctly."
)
filter_kwargs.update(
{self.lookup_fields[index]: self.kwargs[lookup_url_kwarg]}
)
return queryset.filter(**filter_kwargs)
class SerializerPerActionMixin:
"""
A mixin to allow to define serializer classes for each action.
This mixin is useful to avoid to define a serializer class for each action in the
`get_serializer_class` method.
"""
serializer_classes: dict[str, type] = {}
default_serializer_class: type = None
def get_serializer_class(self):
"""
Return the serializer class to use depending on the action.
"""
return self.serializer_classes.get(self.action, self.default_serializer_class)
class Pagination(pagination.PageNumberPagination):
"""Pagination to display no more than 100 objects per page sorted by creation date."""
ordering = "-created_on"
max_page_size = 100
page_size_query_param = "page_size"
class ContactViewSet(
mixins.CreateModelMixin,
mixins.DestroyModelMixin,
mixins.RetrieveModelMixin,
mixins.UpdateModelMixin,
viewsets.GenericViewSet,
):
"""Contact ViewSet"""
permission_classes = [permissions.IsOwnedOrPublic]
queryset = models.Contact.objects.all()
serializer_class = serializers.ContactSerializer
def list(self, request, *args, **kwargs):
"""Limit listed users by a query with throttle protection."""
user = self.request.user
queryset = self.filter_queryset(self.get_queryset())
if not user.is_authenticated:
return queryset.none()
# Exclude contacts that:
queryset = queryset.filter(
# - belong to another user (keep public and owned contacts)
Q(owner__isnull=True) | Q(owner=user),
# - are profile contacts for a user
user__isnull=True,
# - are overriden base contacts
overriding_contacts__isnull=True,
)
# Search by case-insensitive and accent-insensitive trigram similarity
if query := self.request.GET.get("q", ""):
query = Func(Value(query), function="unaccent")
similarity = TrigramSimilarity(
Func("full_name", function="unaccent"),
query,
) + TrigramSimilarity(Func("short_name", function="unaccent"), query)
queryset = (
queryset.annotate(similarity=similarity)
.filter(
similarity__gte=0.05
) # Value determined by testing (test_api_contacts.py)
.order_by("-similarity")
)
# Throttle protection
key_base = f"throttle-contact-list-{user.id!s}"
key_minute = f"{key_base:s}-minute"
key_hour = f"{key_base:s}-hour"
key_day = f"{key_base:s}-day"
try:
count_minute = cache.incr(key_minute)
except ValueError:
cache.set(key_minute, 1, 60)
count_minute = 1
try:
count_hour = cache.incr(key_hour)
except ValueError:
cache.set(key_hour, 1, 3600)
count_hour = 1
try:
count_day = cache.incr(key_day)
except ValueError:
cache.set(key_day, 1, 86400)
count_day = 1
if count_minute > 20 or count_hour > 150 or count_day > 500:
raise drf_exceptions.Throttled()
serializer = self.get_serializer(queryset, many=True)
return response.Response(serializer.data)
def perform_create(self, serializer):
"""Set the current user as owner of the newly created contact."""
user = self.request.user
serializer.validated_data["owner"] = user
return super().perform_create(serializer)
class UserViewSet(
mixins.UpdateModelMixin,
viewsets.GenericViewSet,
):
"""User ViewSet"""
permission_classes = [permissions.IsSelf]
queryset = models.User.objects.all().select_related("profile_contact")
serializer_class = serializers.UserSerializer
@decorators.action(
detail=False,
methods=["get"],
url_name="me",
url_path="me",
permission_classes=[permissions.IsAuthenticated],
)
def get_me(self, request):
"""
Return information on currently logged user
"""
context = {"request": request}
return response.Response(
self.serializer_class(request.user, context=context).data
)
class TeamViewSet(
mixins.CreateModelMixin,
mixins.DestroyModelMixin,
mixins.ListModelMixin,
mixins.RetrieveModelMixin,
mixins.UpdateModelMixin,
viewsets.GenericViewSet,
):
"""Team ViewSet"""
permission_classes = [permissions.AccessPermission]
serializer_class = serializers.TeamSerializer
queryset = models.Team.objects.all()
def get_queryset(self):
"""Custom queryset to get user related teams."""
user_role_query = models.TeamAccess.objects.filter(
user=self.request.user, team=OuterRef("pk")
).values("role")[:1]
return models.Team.objects.filter(accesses__user=self.request.user).annotate(
user_role=Subquery(user_role_query)
)
def perform_create(self, serializer):
"""Set the current user as owner of the newly created team."""
team = serializer.save()
models.TeamAccess.objects.create(
team=team,
user=self.request.user,
role=models.RoleChoices.OWNER,
)
class TeamAccessViewSet(
mixins.CreateModelMixin,
mixins.DestroyModelMixin,
mixins.ListModelMixin,
mixins.RetrieveModelMixin,
mixins.UpdateModelMixin,
viewsets.GenericViewSet,
):
"""
API ViewSet for all interactions with team accesses.
GET /api/v1.0/teams/<team_id>/accesses/:<team_access_id>
Return list of all team accesses related to the logged-in user or one
team access if an id is provided.
POST /api/v1.0/teams/<team_id>/accesses/ with expected data:
- user: str
- role: str [owner|admin|member]
Return newly created team access
PUT /api/v1.0/teams/<team_id>/accesses/<team_access_id>/ with expected data:
- role: str [owner|admin|member]
Return updated team access
PATCH /api/v1.0/teams/<team_id>/accesses/<team_access_id>/ with expected data:
- role: str [owner|admin|member]
Return partially updated team access
DELETE /api/v1.0/teams/<team_id>/accesses/<team_access_id>/
Delete targeted team access
"""
lookup_field = "pk"
pagination_class = Pagination
permission_classes = [permissions.AccessPermission]
queryset = models.TeamAccess.objects.all().select_related("user")
serializer_class = serializers.TeamAccessSerializer
def get_permissions(self):
"""User only needs to be authenticated to list team accesses"""
if self.action == "list":
permission_classes = [permissions.IsAuthenticated]
else:
return super().get_permissions()
return [permission() for permission in permission_classes]
def get_serializer_context(self):
"""Extra context provided to the serializer class."""
context = super().get_serializer_context()
context["team_id"] = self.kwargs["team_id"]
return context
def get_queryset(self):
"""Return the queryset according to the action."""
queryset = super().get_queryset()
queryset = queryset.filter(team=self.kwargs["team_id"])
if self.action == "list":
# Limit to team access instances related to a team THAT also has a team access
# instance for the logged-in user (we don't want to list only the team access
# instances pointing to the logged-in user)
user_role_query = models.TeamAccess.objects.filter(
team__accesses__user=self.request.user
).values("role")[:1]
queryset = (
queryset.filter(
team__accesses__user=self.request.user,
)
.annotate(user_role=Subquery(user_role_query))
.distinct()
)
return queryset
def destroy(self, request, *args, **kwargs):
"""Forbid deleting the last owner access"""
instance = self.get_object()
team = instance.team
# Check if the access being deleted is the last owner access for the team
if instance.role == "owner" and team.accesses.filter(role="owner").count() == 1:
return Response(
{"detail": "Cannot delete the last owner access for the team."},
status=400,
)
return super().destroy(request, *args, **kwargs)
def perform_update(self, serializer):
"""Check that we don't change the role if it leads to losing the last owner."""
instance = serializer.instance
# Check if the role is being updated and the new role is not "owner"
if (
"role" in self.request.data
and self.request.data["role"] != models.RoleChoices.OWNER
):
team = instance.team
# Check if the access being updated is the last owner access for the team
if (
instance.role == models.RoleChoices.OWNER
and team.accesses.filter(role=models.RoleChoices.OWNER).count() == 1
):
raise serializers.ValidationError(
{
"role": "Cannot change the role to a non-owner role for the last owner access."
}
)
serializer.save()