(models/api) add document model and API

We do this by making copies of existing Template and TemplateAccess
models and API. A little refactoring is done to try to limit duplicate
code.
This commit is contained in:
Samuel Paccoud - DINUM
2024-04-03 18:50:28 +02:00
committed by Anthony LC
parent 0024cc5814
commit 3e0739cd0a
17 changed files with 2978 additions and 247 deletions

View File

@@ -25,16 +25,11 @@ class UserSerializer(serializers.ModelSerializer):
read_only_fields = ["id", "is_device", "is_staff"]
class TemplateAccessSerializer(serializers.ModelSerializer):
class BaseAccessSerializer(serializers.ModelSerializer):
"""Serialize template accesses."""
abilities = serializers.SerializerMethodField(read_only=True)
class Meta:
model = models.TemplateAccess
fields = ["id", "user", "team", "role", "abilities"]
read_only_fields = ["id", "abilities"]
def update(self, instance, validated_data):
"""Make "user" field is readonly but only on update."""
validated_data.pop("user", None)
@@ -71,55 +66,88 @@ class TemplateAccessSerializer(serializers.ModelSerializer):
else:
teams = user.get_teams()
try:
template_id = self.context["template_id"]
resource_id = self.context["resource_id"]
except KeyError as exc:
raise exceptions.ValidationError(
"You must set a template ID in kwargs to create a new template access."
"You must set a resource ID in kwargs to create a new access."
) from exc
if not models.TemplateAccess.objects.filter(
if not self.Meta.model.objects.filter( # pylint: disable=no-member
Q(user=user) | Q(team__in=teams),
template=template_id,
role__in=[models.RoleChoices.OWNER, models.RoleChoices.ADMIN],
).exists():
raise exceptions.PermissionDenied(
"You are not allowed to manage accesses for this template."
"You are not allowed to manage accesses for this resource."
)
if (
role == models.RoleChoices.OWNER
and not models.TemplateAccess.objects.filter(
and not self.Meta.model.objects.filter( # pylint: disable=no-member
Q(user=user) | Q(team__in=teams),
template=template_id,
role=models.RoleChoices.OWNER,
**{self.Meta.resource_field_name: resource_id}, # pylint: disable=no-member
).exists()
):
raise exceptions.PermissionDenied(
"Only owners of a template can assign other users as owners."
"Only owners of a resource can assign other users as owners."
)
attrs["template_id"] = self.context["template_id"]
# pylint: disable=no-member
attrs[f"{self.Meta.resource_field_name}_id"] = self.context["resource_id"]
return attrs
class TemplateSerializer(serializers.ModelSerializer):
"""Serialize templates."""
class DocumentAccessSerializer(BaseAccessSerializer):
"""Serialize document accesses."""
class Meta:
model = models.DocumentAccess
resource_field_name = "document"
fields = ["id", "user", "team", "role", "abilities"]
read_only_fields = ["id", "abilities"]
class TemplateAccessSerializer(BaseAccessSerializer):
"""Serialize template accesses."""
class Meta:
model = models.TemplateAccess
resource_field_name = "template"
fields = ["id", "user", "team", "role", "abilities"]
read_only_fields = ["id", "abilities"]
class BaseResourceSerializer(serializers.ModelSerializer):
"""Serialize documents."""
abilities = serializers.SerializerMethodField(read_only=True)
accesses = TemplateAccessSerializer(many=True, read_only=True)
def get_abilities(self, document) -> dict:
"""Return abilities of the logged-in user on the instance."""
request = self.context.get("request")
if request:
return document.get_abilities(request.user)
return {}
class DocumentSerializer(BaseResourceSerializer):
"""Serialize documents."""
class Meta:
model = models.Document
fields = ["id", "title", "accesses", "abilities"]
read_only_fields = ["id", "accesses", "abilities"]
class TemplateSerializer(BaseResourceSerializer):
"""Serialize templates."""
class Meta:
model = models.Template
fields = ["id", "title", "accesses", "abilities"]
read_only_fields = ["id", "accesses", "abilities"]
def get_abilities(self, template) -> dict:
"""Return abilities of the logged-in user on the instance."""
request = self.context.get("request")
if request:
return template.get_abilities(request.user)
return {}
# pylint: disable=abstract-method
class DocumentGenerationSerializer(serializers.Serializer):

View File

@@ -135,7 +135,201 @@ class UserViewSet(
)
class ResourceViewsetMixin:
"""Mixin with methods common to all resource viewsets that are managed with accesses."""
def get_queryset(self):
"""Custom queryset to get user related resources."""
queryset = super().get_queryset()
if not self.request.user.is_authenticated:
return queryset.filter(is_public=True)
user = self.request.user
teams = user.get_teams()
user_roles_query = (
self.access_model_class.objects.filter(
Q(user=user) | Q(team__in=teams),
**{self.resource_field_name: OuterRef("pk")},
)
.values(self.resource_field_name)
.annotate(roles_array=ArrayAgg("role"))
.values("roles_array")
)
return (
queryset.filter(
Q(accesses__user=user) | Q(accesses__team__in=teams) | Q(is_public=True)
)
.annotate(user_roles=Subquery(user_roles_query))
.distinct()
)
def perform_create(self, serializer):
"""Set the current user as owner of the newly created object."""
obj = serializer.save()
self.access_model_class.objects.create(
user=self.request.user,
role=models.RoleChoices.OWNER,
**{self.resource_field_name: obj},
)
class ResourceAccessViewsetMixin:
"""Mixin with methods common to all access viewsets."""
def get_permissions(self):
"""User only needs to be authenticated to list resource accesses"""
if self.action == "list":
permission_classes = [permissions.IsAuthenticated]
else:
return super().get_permissions()
return [permission() for permission in permission_classes]
def get_serializer_context(self):
"""Extra context provided to the serializer class."""
context = super().get_serializer_context()
context["resource_id"] = self.kwargs["resource_id"]
return context
def get_queryset(self):
"""Return the queryset according to the action."""
queryset = super().get_queryset()
queryset = queryset.filter(
**{self.resource_field_name: self.kwargs["resource_id"]}
)
if self.action == "list":
user = self.request.user
teams = user.get_teams()
user_roles_query = (
queryset.filter(
Q(user=user) | Q(team__in=teams),
**{self.resource_field_name: self.kwargs["resource_id"]},
)
.values(self.resource_field_name)
.annotate(roles_array=ArrayAgg("role"))
.values("roles_array")
)
# Limit to resource access instances related to a resource THAT also has
# a resource access
# instance for the logged-in user (we don't want to list only the resource
# access instances pointing to the logged-in user)
queryset = (
queryset.filter(
Q(**{f"{self.resource_field_name}__accesses__user": user})
| Q(**{f"{self.resource_field_name}__accesses__team__in": teams}),
**{self.resource_field_name: self.kwargs["resource_id"]},
)
.annotate(user_roles=Subquery(user_roles_query))
.distinct()
)
return queryset
def destroy(self, request, *args, **kwargs):
"""Forbid deleting the last owner access"""
instance = self.get_object()
resource = getattr(instance, self.resource_field_name)
# Check if the access being deleted is the last owner access for the resource
if (
instance.role == "owner"
and resource.accesses.filter(role="owner").count() == 1
):
return drf_response.Response(
{"detail": "Cannot delete the last owner access for the resource."},
status=403,
)
return super().destroy(request, *args, **kwargs)
def perform_update(self, serializer):
"""Check that we don't change the role if it leads to losing the last owner."""
instance = serializer.instance
# Check if the role is being updated and the new role is not "owner"
if (
"role" in self.request.data
and self.request.data["role"] != models.RoleChoices.OWNER
):
resource = getattr(instance, self.resource_field_name)
# Check if the access being updated is the last owner access for the resource
if (
instance.role == models.RoleChoices.OWNER
and resource.accesses.filter(role=models.RoleChoices.OWNER).count() == 1
):
message = "Cannot change the role to a non-owner role for the last owner access."
raise exceptions.PermissionDenied({"detail": message})
serializer.save()
class DocumentViewSet(
ResourceViewsetMixin,
mixins.CreateModelMixin,
mixins.DestroyModelMixin,
mixins.ListModelMixin,
mixins.RetrieveModelMixin,
mixins.UpdateModelMixin,
viewsets.GenericViewSet,
):
"""Document ViewSet"""
permission_classes = [
permissions.IsAuthenticatedOrSafe,
permissions.AccessPermission,
]
serializer_class = serializers.DocumentSerializer
access_model_class = models.DocumentAccess
resource_field_name = "document"
queryset = models.Document.objects.all()
class DocumentAccessViewSet(
ResourceAccessViewsetMixin,
mixins.CreateModelMixin,
mixins.DestroyModelMixin,
mixins.ListModelMixin,
mixins.RetrieveModelMixin,
mixins.UpdateModelMixin,
viewsets.GenericViewSet,
):
"""
API ViewSet for all interactions with document accesses.
GET /api/v1.0/documents/<resource_id>/accesses/:<document_access_id>
Return list of all document accesses related to the logged-in user or one
document access if an id is provided.
POST /api/v1.0/documents/<resource_id>/accesses/ with expected data:
- user: str
- role: str [owner|admin|member]
Return newly created document access
PUT /api/v1.0/documents/<resource_id>/accesses/<document_access_id>/ with expected data:
- role: str [owner|admin|member]
Return updated document access
PATCH /api/v1.0/documents/<resource_id>/accesses/<document_access_id>/ with expected data:
- role: str [owner|admin|member]
Return partially updated document access
DELETE /api/v1.0/documents/<resource_id>/accesses/<document_access_id>/
Delete targeted document access
"""
lookup_field = "pk"
pagination_class = Pagination
permission_classes = [permissions.IsAuthenticated, permissions.AccessPermission]
queryset = models.DocumentAccess.objects.select_related("user").all()
resource_field_name = "document"
serializer_class = serializers.DocumentAccessSerializer
class TemplateViewSet(
ResourceViewsetMixin,
mixins.CreateModelMixin,
mixins.DestroyModelMixin,
mixins.ListModelMixin,
@@ -150,41 +344,10 @@ class TemplateViewSet(
permissions.AccessPermission,
]
serializer_class = serializers.TemplateSerializer
access_model_class = models.TemplateAccess
resource_field_name = "template"
queryset = models.Template.objects.all()
def get_queryset(self):
"""Custom queryset to get user related templates."""
if not self.request.user.is_authenticated:
return models.Template.objects.filter(is_public=True)
user = self.request.user
teams = user.get_teams()
user_roles_query = (
models.TemplateAccess.objects.filter(
Q(user=user) | Q(team__in=teams), template=OuterRef("pk")
)
.values("template")
.annotate(roles_array=ArrayAgg("role"))
.values("roles_array")
)
return (
models.Template.objects.filter(
Q(accesses__user=user) | Q(accesses__team__in=teams) | Q(is_public=True)
)
.annotate(user_roles=Subquery(user_roles_query))
.distinct()
)
def perform_create(self, serializer):
"""Set the current user as owner of the newly created template."""
template = serializer.save()
models.TemplateAccess.objects.create(
template=template,
user=self.request.user,
role=models.RoleChoices.OWNER,
)
@decorators.action(
detail=True,
methods=["post"],
@@ -214,6 +377,7 @@ class TemplateViewSet(
class TemplateAccessViewSet(
ResourceAccessViewsetMixin,
mixins.CreateModelMixin,
mixins.DestroyModelMixin,
mixins.ListModelMixin,
@@ -248,91 +412,6 @@ class TemplateAccessViewSet(
lookup_field = "pk"
pagination_class = Pagination
permission_classes = [permissions.IsAuthenticated, permissions.AccessPermission]
queryset = models.TemplateAccess.objects.all().select_related("user")
queryset = models.TemplateAccess.objects.select_related("user").all()
resource_field_name = "template"
serializer_class = serializers.TemplateAccessSerializer
def get_permissions(self):
"""User only needs to be authenticated to list template accesses"""
if self.action == "list":
permission_classes = [permissions.IsAuthenticated]
else:
return super().get_permissions()
return [permission() for permission in permission_classes]
def get_serializer_context(self):
"""Extra context provided to the serializer class."""
context = super().get_serializer_context()
context["template_id"] = self.kwargs["template_id"]
return context
def get_queryset(self):
"""Return the queryset according to the action."""
queryset = super().get_queryset()
queryset = queryset.filter(template=self.kwargs["template_id"])
if self.action == "list":
user = self.request.user
teams = user.get_teams()
user_roles_query = (
models.TemplateAccess.objects.filter(
Q(user=user) | Q(team__in=teams),
template=self.kwargs["template_id"],
)
.values("template")
.annotate(roles_array=ArrayAgg("role"))
.values("roles_array")
)
# Limit to template access instances related to a template THAT also has
# a template access
# instance for the logged-in user (we don't want to list only the template
# access instances pointing to the logged-in user)
queryset = (
queryset.filter(
Q(template__accesses__user=user)
| Q(template__accesses__team__in=teams),
template=self.kwargs["template_id"],
)
.annotate(user_roles=Subquery(user_roles_query))
.distinct()
)
return queryset
def destroy(self, request, *args, **kwargs):
"""Forbid deleting the last owner access"""
instance = self.get_object()
template = instance.template
# Check if the access being deleted is the last owner access for the template
if (
instance.role == "owner"
and template.accesses.filter(role="owner").count() == 1
):
return drf_response.Response(
{"detail": "Cannot delete the last owner access for the template."},
status=403,
)
return super().destroy(request, *args, **kwargs)
def perform_update(self, serializer):
"""Check that we don't change the role if it leads to losing the last owner."""
instance = serializer.instance
# Check if the role is being updated and the new role is not "owner"
if (
"role" in self.request.data
and self.request.data["role"] != models.RoleChoices.OWNER
):
template = instance.template
# Check if the access being updated is the last owner access for the template
if (
instance.role == models.RoleChoices.OWNER
and template.accesses.filter(role=models.RoleChoices.OWNER).count() == 1
):
message = "Cannot change the role to a non-owner role for the last owner access."
raise exceptions.PermissionDenied({"detail": message})
serializer.save()