♻️(backend) simplify roles by returning only the max role
We were returning the list of roles a user has on a document (direct and inherited). Now that we introduced priority on roles, we are able to determine what is the max role and return only this one. This commit also changes the role that is returned for the restricted reach: we now return None because the role is not relevant in this case.
This commit is contained in:
committed by
Anthony LC
parent
0a9a583a67
commit
611ba496d2
@@ -6,7 +6,6 @@ Declare and configure the models for the impress core application
|
||||
import hashlib
|
||||
import smtplib
|
||||
import uuid
|
||||
from collections import defaultdict
|
||||
from datetime import timedelta
|
||||
from logging import getLogger
|
||||
|
||||
@@ -33,7 +32,13 @@ from rest_framework.exceptions import ValidationError
|
||||
from timezone_field import TimeZoneField
|
||||
from treebeard.mp_tree import MP_Node, MP_NodeManager, MP_NodeQuerySet
|
||||
|
||||
from .choices import PRIVILEGED_ROLES, LinkReachChoices, LinkRoleChoices, RoleChoices
|
||||
from .choices import (
|
||||
PRIVILEGED_ROLES,
|
||||
LinkReachChoices,
|
||||
LinkRoleChoices,
|
||||
RoleChoices,
|
||||
get_equivalent_link_definition,
|
||||
)
|
||||
|
||||
logger = getLogger(__name__)
|
||||
|
||||
@@ -284,9 +289,9 @@ class BaseAccess(BaseModel):
|
||||
class Meta:
|
||||
abstract = True
|
||||
|
||||
def _get_roles(self, resource, user):
|
||||
def _get_role(self, resource, user):
|
||||
"""
|
||||
Get the roles a user has on a resource.
|
||||
Get the role a user has on a resource.
|
||||
"""
|
||||
roles = []
|
||||
if user.is_authenticated:
|
||||
@@ -301,23 +306,20 @@ class BaseAccess(BaseModel):
|
||||
except (self._meta.model.DoesNotExist, IndexError):
|
||||
roles = []
|
||||
|
||||
return roles
|
||||
return RoleChoices.max(*roles)
|
||||
|
||||
def _get_abilities(self, resource, user):
|
||||
"""
|
||||
Compute and return abilities for a given user taking into account
|
||||
the current state of the object.
|
||||
"""
|
||||
roles = self._get_roles(resource, user)
|
||||
role = self._get_role(resource, user)
|
||||
is_owner_or_admin = role in (RoleChoices.OWNER, RoleChoices.ADMIN)
|
||||
|
||||
is_owner_or_admin = bool(
|
||||
set(roles).intersection({RoleChoices.OWNER, RoleChoices.ADMIN})
|
||||
)
|
||||
if self.role == RoleChoices.OWNER:
|
||||
can_delete = (
|
||||
RoleChoices.OWNER in roles
|
||||
and resource.accesses.filter(role=RoleChoices.OWNER).count() > 1
|
||||
)
|
||||
can_delete = (role == RoleChoices.OWNER) and resource.accesses.filter(
|
||||
role=RoleChoices.OWNER
|
||||
).count() > 1
|
||||
set_role_to = (
|
||||
[RoleChoices.ADMIN, RoleChoices.EDITOR, RoleChoices.READER]
|
||||
if can_delete
|
||||
@@ -326,7 +328,7 @@ class BaseAccess(BaseModel):
|
||||
else:
|
||||
can_delete = is_owner_or_admin
|
||||
set_role_to = []
|
||||
if RoleChoices.OWNER in roles:
|
||||
if role == RoleChoices.OWNER:
|
||||
set_role_to.append(RoleChoices.OWNER)
|
||||
if is_owner_or_admin:
|
||||
set_role_to.extend(
|
||||
@@ -343,7 +345,7 @@ class BaseAccess(BaseModel):
|
||||
"destroy": can_delete,
|
||||
"update": bool(set_role_to),
|
||||
"partial_update": bool(set_role_to),
|
||||
"retrieve": bool(roles),
|
||||
"retrieve": bool(role),
|
||||
"set_role_to": set_role_to,
|
||||
}
|
||||
|
||||
@@ -419,6 +421,7 @@ class DocumentManager(MP_NodeManager.from_queryset(DocumentQuerySet)):
|
||||
return self._queryset_class(self.model).order_by("path")
|
||||
|
||||
|
||||
# pylint: disable=too-many-public-methods
|
||||
class Document(MP_Node, BaseModel):
|
||||
"""Pad document carrying the content."""
|
||||
|
||||
@@ -486,6 +489,11 @@ class Document(MP_Node, BaseModel):
|
||||
def __str__(self):
|
||||
return str(self.title) if self.title else str(_("Untitled Document"))
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
"""Initialize cache property."""
|
||||
super().__init__(*args, **kwargs)
|
||||
self._ancestors_link_definition = None
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
"""Write content to object storage only if _content has changed."""
|
||||
super().save(*args, **kwargs)
|
||||
@@ -673,37 +681,22 @@ class Document(MP_Node, BaseModel):
|
||||
cache_key = document.get_nb_accesses_cache_key()
|
||||
cache.delete(cache_key)
|
||||
|
||||
def get_roles(self, user):
|
||||
def get_role(self, user):
|
||||
"""Return the roles a user has on a document."""
|
||||
if not user.is_authenticated:
|
||||
return []
|
||||
return None
|
||||
|
||||
try:
|
||||
roles = self.user_roles or []
|
||||
except AttributeError:
|
||||
try:
|
||||
roles = DocumentAccess.objects.filter(
|
||||
models.Q(user=user) | models.Q(team__in=user.teams),
|
||||
document__path=Left(
|
||||
models.Value(self.path), Length("document__path")
|
||||
),
|
||||
).values_list("role", flat=True)
|
||||
except (models.ObjectDoesNotExist, IndexError):
|
||||
roles = []
|
||||
return roles
|
||||
roles = DocumentAccess.objects.filter(
|
||||
models.Q(user=user) | models.Q(team__in=user.teams),
|
||||
document__path=Left(models.Value(self.path), Length("document__path")),
|
||||
).values_list("role", flat=True)
|
||||
|
||||
def get_ancestors_links_definitions(self, ancestors_links):
|
||||
"""Get links reach/role definitions for ancestors of the current document."""
|
||||
return RoleChoices.max(*roles)
|
||||
|
||||
ancestors_links_definitions = defaultdict(set)
|
||||
for ancestor in ancestors_links:
|
||||
ancestors_links_definitions[ancestor["link_reach"]].add(
|
||||
ancestor["link_role"]
|
||||
)
|
||||
|
||||
return ancestors_links_definitions
|
||||
|
||||
def compute_ancestors_links(self, user):
|
||||
def compute_ancestors_links_paths_mapping(self):
|
||||
"""
|
||||
Compute the ancestors links for the current document up to the highest readable ancestor.
|
||||
"""
|
||||
@@ -712,73 +705,87 @@ class Document(MP_Node, BaseModel):
|
||||
.filter(ancestors_deleted_at__isnull=True)
|
||||
.order_by("path")
|
||||
)
|
||||
highest_readable = ancestors.readable_per_se(user).only("depth").first()
|
||||
|
||||
if highest_readable is None:
|
||||
return []
|
||||
|
||||
ancestors_links = []
|
||||
paths_links_mapping = {}
|
||||
for ancestor in ancestors.filter(depth__gte=highest_readable.depth):
|
||||
|
||||
for ancestor in ancestors:
|
||||
ancestors_links.append(
|
||||
{"link_reach": ancestor.link_reach, "link_role": ancestor.link_role}
|
||||
)
|
||||
paths_links_mapping[ancestor.path] = ancestors_links.copy()
|
||||
|
||||
ancestors_links = paths_links_mapping.get(self.path[: -self.steplen], [])
|
||||
return paths_links_mapping
|
||||
|
||||
return ancestors_links
|
||||
@property
|
||||
def ancestors_link_definition(self):
|
||||
"""Link defintion equivalent to all document's ancestors."""
|
||||
if getattr(self, "_ancestors_link_definition", None) is None:
|
||||
if self.depth <= 1:
|
||||
ancestors_links = []
|
||||
else:
|
||||
mapping = self.compute_ancestors_links_paths_mapping()
|
||||
ancestors_links = mapping.get(self.path[: -self.steplen], [])
|
||||
self._ancestors_link_definition = get_equivalent_link_definition(
|
||||
ancestors_links
|
||||
)
|
||||
|
||||
def get_abilities(self, user, ancestors_links=None):
|
||||
return self._ancestors_link_definition
|
||||
|
||||
@ancestors_link_definition.setter
|
||||
def ancestors_link_definition(self, definition):
|
||||
"""Cache the ancestors_link_definition."""
|
||||
self._ancestors_link_definition = definition
|
||||
|
||||
@property
|
||||
def ancestors_link_reach(self):
|
||||
"""Link reach equivalent to all document's ancestors."""
|
||||
return self.ancestors_link_definition["link_reach"]
|
||||
|
||||
@property
|
||||
def ancestors_link_role(self):
|
||||
"""Link role equivalent to all document's ancestors."""
|
||||
return self.ancestors_link_definition["link_role"]
|
||||
|
||||
def get_abilities(self, user):
|
||||
"""
|
||||
Compute and return abilities for a given user on the document.
|
||||
"""
|
||||
if self.depth <= 1 or getattr(self, "is_highest_ancestor_for_user", False):
|
||||
ancestors_links = []
|
||||
elif ancestors_links is None:
|
||||
ancestors_links = self.compute_ancestors_links(user=user)
|
||||
|
||||
roles = set(
|
||||
self.get_roles(user)
|
||||
) # at this point only roles based on specific access
|
||||
# First get the role based on specific access
|
||||
role = self.get_role(user)
|
||||
|
||||
# Characteristics that are based only on specific access
|
||||
is_owner = RoleChoices.OWNER in roles
|
||||
is_owner = role == RoleChoices.OWNER
|
||||
is_deleted = self.ancestors_deleted_at and not is_owner
|
||||
is_owner_or_admin = (is_owner or RoleChoices.ADMIN in roles) and not is_deleted
|
||||
is_owner_or_admin = (is_owner or role == RoleChoices.ADMIN) and not is_deleted
|
||||
|
||||
# Compute access roles before adding link roles because we don't
|
||||
# want anonymous users to access versions (we wouldn't know from
|
||||
# which date to allow them anyway)
|
||||
# Anonymous users should also not see document accesses
|
||||
has_access_role = bool(roles) and not is_deleted
|
||||
has_access_role = bool(role) and not is_deleted
|
||||
can_update_from_access = (
|
||||
is_owner_or_admin or RoleChoices.EDITOR in roles
|
||||
is_owner_or_admin or role == RoleChoices.EDITOR
|
||||
) and not is_deleted
|
||||
|
||||
# Add roles provided by the document link, taking into account its ancestors
|
||||
ancestors_links_definitions = self.get_ancestors_links_definitions(
|
||||
ancestors_links
|
||||
link_select_options = LinkReachChoices.get_select_options(
|
||||
**self.ancestors_link_definition
|
||||
)
|
||||
link_definition = get_equivalent_link_definition(
|
||||
[
|
||||
self.ancestors_link_definition,
|
||||
{"link_reach": self.link_reach, "link_role": self.link_role},
|
||||
]
|
||||
)
|
||||
|
||||
public_roles = ancestors_links_definitions.get(
|
||||
LinkReachChoices.PUBLIC, set()
|
||||
) | ({self.link_role} if self.link_reach == LinkReachChoices.PUBLIC else set())
|
||||
authenticated_roles = (
|
||||
ancestors_links_definitions.get(LinkReachChoices.AUTHENTICATED, set())
|
||||
| (
|
||||
{self.link_role}
|
||||
if self.link_reach == LinkReachChoices.AUTHENTICATED
|
||||
else set()
|
||||
)
|
||||
if user.is_authenticated
|
||||
else set()
|
||||
)
|
||||
roles = roles | public_roles | authenticated_roles
|
||||
link_reach = link_definition["link_reach"]
|
||||
if link_reach == LinkReachChoices.PUBLIC or (
|
||||
link_reach == LinkReachChoices.AUTHENTICATED and user.is_authenticated
|
||||
):
|
||||
role = RoleChoices.max(role, link_definition["link_role"])
|
||||
|
||||
can_get = bool(roles) and not is_deleted
|
||||
can_get = bool(role) and not is_deleted
|
||||
can_update = (
|
||||
is_owner_or_admin or RoleChoices.EDITOR in roles
|
||||
is_owner_or_admin or role == RoleChoices.EDITOR
|
||||
) and not is_deleted
|
||||
|
||||
ai_allow_reach_from = settings.AI_ALLOW_REACH_FROM
|
||||
@@ -816,12 +823,7 @@ class Document(MP_Node, BaseModel):
|
||||
"restore": is_owner,
|
||||
"retrieve": can_get,
|
||||
"media_auth": can_get,
|
||||
"ancestors_links_definitions": {
|
||||
k: list(v) for k, v in ancestors_links_definitions.items()
|
||||
},
|
||||
"link_select_options": LinkReachChoices.get_select_options(
|
||||
ancestors_links_definitions
|
||||
),
|
||||
"link_select_options": link_select_options,
|
||||
"tree": can_get,
|
||||
"update": can_update,
|
||||
"versions_destroy": is_owner_or_admin,
|
||||
@@ -1082,11 +1084,11 @@ class DocumentAccess(BaseAccess):
|
||||
"""
|
||||
Compute and return abilities for a given user on the document access.
|
||||
"""
|
||||
roles = self._get_roles(self.document, user)
|
||||
is_owner_or_admin = bool(set(roles).intersection(set(PRIVILEGED_ROLES)))
|
||||
role = self._get_role(self.document, user)
|
||||
is_owner_or_admin = role in PRIVILEGED_ROLES
|
||||
if self.role == RoleChoices.OWNER:
|
||||
can_delete = (
|
||||
RoleChoices.OWNER in roles
|
||||
role == RoleChoices.OWNER
|
||||
and self.document.accesses.filter(role=RoleChoices.OWNER).count() > 1
|
||||
)
|
||||
set_role_to = (
|
||||
@@ -1097,7 +1099,7 @@ class DocumentAccess(BaseAccess):
|
||||
else:
|
||||
can_delete = is_owner_or_admin
|
||||
set_role_to = []
|
||||
if RoleChoices.OWNER in roles:
|
||||
if role == RoleChoices.OWNER:
|
||||
set_role_to.append(RoleChoices.OWNER)
|
||||
if is_owner_or_admin:
|
||||
set_role_to.extend(
|
||||
|
||||
Reference in New Issue
Block a user