2024-01-09 15:30:36 +01:00
|
|
|
"""
|
2024-03-07 19:46:46 +01:00
|
|
|
Declare and configure the models for the impress core application
|
2024-01-09 15:30:36 +01:00
|
|
|
"""
|
2026-01-20 12:03:54 +01:00
|
|
|
|
2025-01-10 09:50:48 +01:00
|
|
|
# pylint: disable=too-many-lines
|
2024-08-20 16:42:27 +02:00
|
|
|
|
2024-04-06 09:09:46 +02:00
|
|
|
import hashlib
|
2024-09-25 12:43:02 +02:00
|
|
|
import smtplib
|
2024-01-09 15:30:36 +01:00
|
|
|
import uuid
|
2024-05-13 23:31:00 +02:00
|
|
|
from datetime import timedelta
|
|
|
|
|
from logging import getLogger
|
2024-01-09 15:30:36 +01:00
|
|
|
|
|
|
|
|
from django.conf import settings
|
|
|
|
|
from django.contrib.auth import models as auth_models
|
|
|
|
|
from django.contrib.auth.base_user import AbstractBaseUser
|
2025-01-20 10:23:18 +01:00
|
|
|
from django.contrib.postgres.fields import ArrayField
|
2024-09-25 12:43:02 +02:00
|
|
|
from django.contrib.sites.models import Site
|
2025-01-02 17:20:09 +01:00
|
|
|
from django.core.cache import cache
|
2024-04-06 09:09:46 +02:00
|
|
|
from django.core.files.base import ContentFile
|
|
|
|
|
from django.core.files.storage import default_storage
|
2024-09-25 12:43:02 +02:00
|
|
|
from django.core.mail import send_mail
|
2025-01-02 17:20:09 +01:00
|
|
|
from django.db import models, transaction
|
2024-12-17 07:47:23 +01:00
|
|
|
from django.db.models.functions import Left, Length
|
2024-09-25 12:43:02 +02:00
|
|
|
from django.template.loader import render_to_string
|
2025-01-06 16:20:57 +01:00
|
|
|
from django.utils import timezone
|
2025-01-27 23:27:23 +01:00
|
|
|
from django.utils.functional import cached_property
|
2024-12-01 11:25:01 +01:00
|
|
|
from django.utils.translation import get_language, override
|
2024-02-09 19:32:12 +01:00
|
|
|
from django.utils.translation import gettext_lazy as _
|
2024-01-09 15:30:36 +01:00
|
|
|
|
2024-04-08 23:37:15 +02:00
|
|
|
from botocore.exceptions import ClientError
|
2025-01-02 17:20:09 +01:00
|
|
|
from rest_framework.exceptions import ValidationError
|
2024-01-09 15:30:36 +01:00
|
|
|
from timezone_field import TimeZoneField
|
2025-02-16 17:26:51 +01:00
|
|
|
from treebeard.mp_tree import MP_Node, MP_NodeManager, MP_NodeQuerySet
|
2024-01-09 15:30:36 +01:00
|
|
|
|
✨(backend) manage reconciliation requests for user accounts (#1878)
For now, the reconciliation requests are imported through CSV in the
Django admin, which sends confirmation email to both addresses. When
both are checked, the actual reconciliation is processed, and all
user-related content is updated.
## Purpose
Fix #1616 // Replaces #1708
For now, the reconciliation requests are imported through CSV in the
Django admin, which sends confirmation email to both addresses. When
both are checked, the actual reconciliation is processed, and all
user-related content is updated.
## Proposal
- [x] New `UserReconciliationCsvImport` model to manage the import of
reconciliation requests through a task
(`user_reconciliation_csv_import_job`)
- [x] New `UserReconciliation` model to store the user reconciliation
requests themselves (a row = a `active_user`/`inactive_user` pair)
- [x] On save, a confirmation email is sent to the users
- [x] A `process_reconciliation` admin action process the action on the
requested entries, if both emails have been checked.
- [x] Bulk update the `DocumentAccess` items, while managing the case
where both users have access to the document (keeping the higher role)
- [x] Bulk update the `LinkTrace` items, while managing the case where
both users have link traces to the document
- [x] Bulk update the `DocumentFavorite` items, while managing the case
where both users have put the document in their favorites
- [x] Bulk update the comment system items (`Thread`, `Comment` and
`Reaction` items)
- [x] Bulk update the `is_active` status on both users
- [x] New `USER_RECONCILIATION_FORM_URL` env variable for the "make a
new request" URL in an email.
- [x] Write unit tests
- [x] Remove the unused `email_user()` method on `User`, replaced with
`send_email()` similar to the one on the `Document` model
## Demo page reconciliation success
<img width="1149" height="746" alt="image"
src="https://github.com/user-attachments/assets/09ba2b38-7af3-41fa-a64f-ce3c4fd8548d"
/>
---------
Co-authored-by: Anthony LC <anthony.le-courric@mail.numerique.gouv.fr>
2026-02-11 19:09:20 +01:00
|
|
|
from core.choices import (
|
2025-04-25 08:03:12 +02:00
|
|
|
PRIVILEGED_ROLES,
|
|
|
|
|
LinkReachChoices,
|
|
|
|
|
LinkRoleChoices,
|
|
|
|
|
RoleChoices,
|
|
|
|
|
get_equivalent_link_definition,
|
|
|
|
|
)
|
✨(backend) manage reconciliation requests for user accounts (#1878)
For now, the reconciliation requests are imported through CSV in the
Django admin, which sends confirmation email to both addresses. When
both are checked, the actual reconciliation is processed, and all
user-related content is updated.
## Purpose
Fix #1616 // Replaces #1708
For now, the reconciliation requests are imported through CSV in the
Django admin, which sends confirmation email to both addresses. When
both are checked, the actual reconciliation is processed, and all
user-related content is updated.
## Proposal
- [x] New `UserReconciliationCsvImport` model to manage the import of
reconciliation requests through a task
(`user_reconciliation_csv_import_job`)
- [x] New `UserReconciliation` model to store the user reconciliation
requests themselves (a row = a `active_user`/`inactive_user` pair)
- [x] On save, a confirmation email is sent to the users
- [x] A `process_reconciliation` admin action process the action on the
requested entries, if both emails have been checked.
- [x] Bulk update the `DocumentAccess` items, while managing the case
where both users have access to the document (keeping the higher role)
- [x] Bulk update the `LinkTrace` items, while managing the case where
both users have link traces to the document
- [x] Bulk update the `DocumentFavorite` items, while managing the case
where both users have put the document in their favorites
- [x] Bulk update the comment system items (`Thread`, `Comment` and
`Reaction` items)
- [x] Bulk update the `is_active` status on both users
- [x] New `USER_RECONCILIATION_FORM_URL` env variable for the "make a
new request" URL in an email.
- [x] Write unit tests
- [x] Remove the unused `email_user()` method on `User`, replaced with
`send_email()` similar to the one on the `Document` model
## Demo page reconciliation success
<img width="1149" height="746" alt="image"
src="https://github.com/user-attachments/assets/09ba2b38-7af3-41fa-a64f-ce3c4fd8548d"
/>
---------
Co-authored-by: Anthony LC <anthony.le-courric@mail.numerique.gouv.fr>
2026-02-11 19:09:20 +01:00
|
|
|
from core.validators import sub_validator
|
2025-04-23 22:47:24 +02:00
|
|
|
|
2024-05-13 23:31:00 +02:00
|
|
|
logger = getLogger(__name__)
|
|
|
|
|
|
2024-01-09 15:30:36 +01:00
|
|
|
|
2025-01-02 17:20:09 +01:00
|
|
|
def get_trashbin_cutoff():
    """
    Return the datetime before which soft-deleted items fall out of the trash bin.

    The value is the current time minus ``settings.TRASHBIN_CUTOFF_DAYS`` days,
    i.e. the oldest deletion date an item may have while still being kept in
    the trash bin.

    Returns:
        datetime: The cutoff datetime for soft-deleted items.
    """
    now = timezone.now()
    retention_period = timedelta(days=settings.TRASHBIN_CUTOFF_DAYS)
    return now - retention_period
|
|
|
|
|
|
|
|
|
|
|
2025-01-10 09:50:48 +01:00
|
|
|
class DuplicateEmailError(Exception):
    """Signal that an email address already belongs to a pre-existing user."""

    def __init__(self, message=None, email=None):
        """
        Store the description and the offending email on the exception.

        Args:
            message: Human readable description, also passed to ``Exception``.
            email: The email address that is already in use, if known.
        """
        super().__init__(message)
        self.message = message
        self.email = email
|
|
|
|
|
|
|
|
|
|
|
2024-01-09 15:30:36 +01:00
|
|
|
class BaseModel(models.Model):
    """
    Abstract parent for the application's models.

    Saving an instance always runs `full_clean` first, because Django does not
    validate records on save by default.

    Provides the fields shared by every model: a UUID primary key and the
    creation/update timestamps.
    """

    # UUID primary key, generated client-side and never editable
    id = models.UUIDField(
        primary_key=True,
        default=uuid.uuid4,
        editable=False,
        verbose_name=_("id"),
        help_text=_("primary key for the record as UUID"),
    )
    # Set once, when the row is first created
    created_at = models.DateTimeField(
        auto_now_add=True,
        editable=False,
        verbose_name=_("created on"),
        help_text=_("date and time at which a record was created"),
    )
    # Refreshed on every save
    updated_at = models.DateTimeField(
        auto_now=True,
        editable=False,
        verbose_name=_("updated on"),
        help_text=_("date and time at which a record was last updated"),
    )

    class Meta:
        abstract = True

    def save(self, *args, **kwargs):
        """Validate the instance with `full_clean`, then persist it."""
        self.full_clean()
        super().save(*args, **kwargs)
|
|
|
|
|
|
|
|
|
|
|
2025-01-10 09:50:48 +01:00
|
|
|
class UserManager(auth_models.UserManager):
    """Manager of the User model, extended with an OIDC oriented lookup."""

    def get_user_by_sub_or_email(self, sub, email):
        """
        Look up an existing user by `sub`, optionally falling back to the email.

        Args:
            sub: The OIDC subject identifier to match exactly.
            email: The email to fall back on (compared case-insensitively).

        Returns:
            The matching user, or None when nobody matches.

        Raises:
            DuplicateEmailError: when no user has this sub, the email fallback
                is disabled, a user already holds the email and duplicate
                emails are not allowed.
        """
        try:
            return self.get(sub=sub)
        except self.model.DoesNotExist as err:
            # Without an email there is nothing left to match on
            if not email:
                return None

            if settings.OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION:
                try:
                    return self.get(email__iexact=email)
                except self.model.DoesNotExist:
                    return None

            # Fallback disabled: a known email with an unknown sub is a conflict
            # unless duplicates are explicitly tolerated
            email_is_taken = self.filter(email__iexact=email).exists()
            if email_is_taken and not settings.OIDC_ALLOW_DUPLICATE_EMAILS:
                raise DuplicateEmailError(
                    _(
                        "We couldn't find a user with this sub but the email is already "
                        "associated with a registered user."
                    )
                ) from err

            return None
|
|
|
|
|
|
|
|
|
|
|
2024-01-09 15:30:36 +01:00
|
|
|
class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
    """User model to work with OIDC only authentication."""

    # OIDC subject identifier; unique but optional
    sub = models.CharField(
        _("sub"),
        help_text=_("Required. 255 characters or fewer. ASCII characters only."),
        max_length=255,
        validators=[sub_validator],
        unique=True,
        blank=True,
        null=True,
    )

    full_name = models.CharField(_("full name"), max_length=100, null=True, blank=True)
    short_name = models.CharField(
        _("short name"), max_length=100, null=True, blank=True
    )

    email = models.EmailField(_("identity email address"), blank=True, null=True)

    # Unlike the "email" field which stores the email coming from the OIDC token, this field
    # stores the email used by staff users to login to the admin site
    admin_email = models.EmailField(
        _("admin email address"), unique=True, blank=True, null=True
    )

    language = models.CharField(
        max_length=10,
        choices=settings.LANGUAGES,
        default=None,
        verbose_name=_("language"),
        help_text=_("The language in which the user wants to see the interface."),
        null=True,
        blank=True,
    )
    timezone = TimeZoneField(
        choices_display="WITH_GMT_OFFSET",
        use_pytz=False,
        default=settings.TIME_ZONE,
        help_text=_("The timezone in which the user wants to see times."),
    )
    is_device = models.BooleanField(
        _("device"),
        default=False,
        help_text=_("Whether the user is a device or a real user."),
    )
    is_staff = models.BooleanField(
        _("staff status"),
        default=False,
        help_text=_("Whether the user can log into this admin site."),
    )
    is_active = models.BooleanField(
        _("active"),
        default=True,
        help_text=_(
            "Whether this user should be treated as active. "
            "Unselect this instead of deleting accounts."
        ),
    )

    objects = UserManager()

    USERNAME_FIELD = "admin_email"
    REQUIRED_FIELDS = []

    class Meta:
        db_table = "impress_user"
        verbose_name = _("user")
        verbose_name_plural = _("users")

    def __str__(self):
        """Return the identity email, else the admin email, else the id."""
        return self.email or self.admin_email or str(self.id)

    def save(self, *args, **kwargs):
        """
        If it's a new user, give its user access to the documents to which s.he was invited.
        """
        # Read the flag before saving: it is no longer "adding" afterwards
        is_adding = self._state.adding
        super().save(*args, **kwargs)

        if is_adding:
            self._convert_valid_invitations()

    def _convert_valid_invitations(self):
        """
        Convert valid invitations to document accesses.
        Expired invitations are ignored.

        An invitation is valid when its email matches the user's email
        (case-insensitively) and it was created less than
        ``settings.INVITATION_VALIDITY_DURATION`` seconds ago. Converted
        invitations are deleted.
        """
        valid_invitations = Invitation.objects.filter(
            email__iexact=self.email,
            created_at__gte=(
                timezone.now()
                - timedelta(seconds=settings.INVITATION_VALIDITY_DURATION)
            ),
        ).select_related("document")

        if not valid_invitations.exists():
            return

        DocumentAccess.objects.bulk_create(
            [
                DocumentAccess(
                    user=self, document=invitation.document, role=invitation.role
                )
                for invitation in valid_invitations
            ]
        )

        # Set creator of documents if not yet set (e.g. documents created via server-to-server API)
        document_ids = [invitation.document_id for invitation in valid_invitations]
        Document.objects.filter(id__in=document_ids, creator__isnull=True).update(
            creator=self
        )

        valid_invitations.delete()

    def send_email(self, subject, context=None, language=None):
        """
        Generate and send email to the user from a template.

        Args:
            subject: Email subject (may be a lazy translation); it is resolved
                in the target language and capitalized before sending.
            context: Optional template context. It is copied, never mutated;
                the "brandname", "domain" and "logo_img" keys are always
                overridden with the values from the settings.
            language: Language used to render the email; defaults to the
                currently active language.

        SMTP failures are logged and not raised.
        """
        emails = [self.email]
        domain = settings.EMAIL_URL_APP or Site.objects.get_current().domain

        language = language or get_language()
        # Build a new dict so that the caller's context is left untouched
        context = {
            **(context or {}),
            "brandname": settings.EMAIL_BRAND_NAME,
            "domain": domain,
            "logo_img": settings.EMAIL_LOGO_IMG,
        }

        with override(language):
            msg_html = render_to_string("mail/html/template.html", context)
            msg_plain = render_to_string("mail/text/template.txt", context)
            subject = str(subject)  # Force translation

            try:
                send_mail(
                    subject.capitalize(),
                    msg_plain,
                    settings.EMAIL_FROM,
                    emails,
                    html_message=msg_html,
                    fail_silently=False,
                )
            except smtplib.SMTPException as exception:
                # This method sends any kind of email, not only invitations
                logger.error("email to %s was not sent: %s", emails, exception)

    @cached_property
    def teams(self):
        """
        Get list of teams in which the user is, as a list of strings.
        Must be cached if retrieved remotely.
        """
        return []
|
2024-01-09 15:30:36 +01:00
|
|
|
|
|
|
|
|
|
✨(backend) manage reconciliation requests for user accounts (#1878)
For now, the reconciliation requests are imported through CSV in the
Django admin, which sends confirmation email to both addresses. When
both are checked, the actual reconciliation is processed, and all
user-related content is updated.
## Purpose
Fix #1616 // Replaces #1708
For now, the reconciliation requests are imported through CSV in the
Django admin, which sends confirmation email to both addresses. When
both are checked, the actual reconciliation is processed, and all
user-related content is updated.
## Proposal
- [x] New `UserReconciliationCsvImport` model to manage the import of
reconciliation requests through a task
(`user_reconciliation_csv_import_job`)
- [x] New `UserReconciliation` model to store the user reconciliation
requests themselves (a row = a `active_user`/`inactive_user` pair)
- [x] On save, a confirmation email is sent to the users
- [x] A `process_reconciliation` admin action process the action on the
requested entries, if both emails have been checked.
- [x] Bulk update the `DocumentAccess` items, while managing the case
where both users have access to the document (keeping the higher role)
- [x] Bulk update the `LinkTrace` items, while managing the case where
both users have link traces to the document
- [x] Bulk update the `DocumentFavorite` items, while managing the case
where both users have put the document in their favorites
- [x] Bulk update the comment system items (`Thread`, `Comment` and
`Reaction` items)
- [x] Bulk update the `is_active` status on both users
- [x] New `USER_RECONCILIATION_FORM_URL` env variable for the "make a
new request" URL in an email.
- [x] Write unit tests
- [x] Remove the unused `email_user()` method on `User`, replaced with
`send_email()` similar to the one on the `Document` model
## Demo page reconciliation success
<img width="1149" height="746" alt="image"
src="https://github.com/user-attachments/assets/09ba2b38-7af3-41fa-a64f-ce3c4fd8548d"
/>
---------
Co-authored-by: Anthony LC <anthony.le-courric@mail.numerique.gouv.fr>
2026-02-11 19:09:20 +01:00
|
|
|
class UserReconciliation(BaseModel):
|
|
|
|
|
"""Model to run batch jobs to replace an active user by another one"""
|
|
|
|
|
|
|
|
|
|
active_email = models.EmailField(_("Active email address"))
|
|
|
|
|
inactive_email = models.EmailField(_("Email address to deactivate"))
|
|
|
|
|
active_email_checked = models.BooleanField(default=False)
|
|
|
|
|
inactive_email_checked = models.BooleanField(default=False)
|
|
|
|
|
active_user = models.ForeignKey(
|
|
|
|
|
User,
|
|
|
|
|
on_delete=models.CASCADE,
|
|
|
|
|
null=True,
|
|
|
|
|
blank=True,
|
|
|
|
|
related_name="active_user",
|
|
|
|
|
)
|
|
|
|
|
inactive_user = models.ForeignKey(
|
|
|
|
|
User,
|
|
|
|
|
on_delete=models.CASCADE,
|
|
|
|
|
null=True,
|
|
|
|
|
blank=True,
|
|
|
|
|
related_name="inactive_user",
|
|
|
|
|
)
|
|
|
|
|
active_email_confirmation_id = models.UUIDField(
|
|
|
|
|
default=uuid.uuid4, unique=True, editable=False, null=True
|
|
|
|
|
)
|
|
|
|
|
inactive_email_confirmation_id = models.UUIDField(
|
|
|
|
|
default=uuid.uuid4, unique=True, editable=False, null=True
|
|
|
|
|
)
|
|
|
|
|
source_unique_id = models.CharField(
|
|
|
|
|
max_length=100,
|
|
|
|
|
blank=True,
|
|
|
|
|
null=True,
|
|
|
|
|
verbose_name=_("Unique ID in the source file"),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
status = models.CharField(
|
|
|
|
|
max_length=20,
|
|
|
|
|
choices=[
|
|
|
|
|
("pending", _("Pending")),
|
|
|
|
|
("ready", _("Ready")),
|
|
|
|
|
("done", _("Done")),
|
|
|
|
|
("error", _("Error")),
|
|
|
|
|
],
|
|
|
|
|
default="pending",
|
|
|
|
|
)
|
|
|
|
|
logs = models.TextField(blank=True)
|
|
|
|
|
|
|
|
|
|
class Meta:
|
|
|
|
|
db_table = "impress_user_reconciliation"
|
|
|
|
|
verbose_name = _("user reconciliation")
|
|
|
|
|
verbose_name_plural = _("user reconciliations")
|
|
|
|
|
ordering = ["-created_at"]
|
|
|
|
|
|
|
|
|
|
def __str__(self):
|
|
|
|
|
return f"Reconciliation from {self.inactive_email} to {self.active_email}"
|
|
|
|
|
|
|
|
|
|
def save(self, *args, **kwargs):
|
|
|
|
|
"""
|
|
|
|
|
For pending queries, identify the actual users and send validation emails
|
|
|
|
|
"""
|
|
|
|
|
if self.status == "pending":
|
|
|
|
|
self.active_user = User.objects.filter(email=self.active_email).first()
|
|
|
|
|
self.inactive_user = User.objects.filter(email=self.inactive_email).first()
|
|
|
|
|
|
|
|
|
|
if self.active_user and self.inactive_user:
|
|
|
|
|
if not self.active_email_checked:
|
|
|
|
|
self.send_reconciliation_confirm_email(
|
|
|
|
|
self.active_user, "active", self.active_email_confirmation_id
|
|
|
|
|
)
|
|
|
|
|
if not self.inactive_email_checked:
|
|
|
|
|
self.send_reconciliation_confirm_email(
|
|
|
|
|
self.inactive_user,
|
|
|
|
|
"inactive",
|
|
|
|
|
self.inactive_email_confirmation_id,
|
|
|
|
|
)
|
|
|
|
|
self.status = "ready"
|
|
|
|
|
else:
|
|
|
|
|
self.status = "error"
|
|
|
|
|
self.logs = "Error: Both active and inactive users need to exist."
|
|
|
|
|
|
|
|
|
|
super().save(*args, **kwargs)
|
|
|
|
|
|
|
|
|
|
@transaction.atomic
|
|
|
|
|
def process_reconciliation_request(self):
|
|
|
|
|
"""
|
|
|
|
|
Process the reconciliation request as a transaction.
|
|
|
|
|
|
|
|
|
|
- Transfer document accesses from inactive to active user, updating roles as needed.
|
|
|
|
|
- Transfer document favorites from inactive to active user.
|
|
|
|
|
- Transfer link traces from inactive to active user.
|
|
|
|
|
- Transfer comment-related content from inactive to active user
|
|
|
|
|
(threads, comments and reactions)
|
|
|
|
|
- Activate the active user and deactivate the inactive user.
|
|
|
|
|
- Update the reconciliation entry itself.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
# Prepare the data to perform the reconciliation on
|
|
|
|
|
updated_accesses, removed_accesses = (
|
|
|
|
|
self.prepare_documentaccess_reconciliation()
|
|
|
|
|
)
|
|
|
|
|
updated_linktraces, removed_linktraces = self.prepare_linktrace_reconciliation()
|
|
|
|
|
update_favorites, removed_favorites = (
|
|
|
|
|
self.prepare_document_favorite_reconciliation()
|
|
|
|
|
)
|
|
|
|
|
updated_threads = self.prepare_thread_reconciliation()
|
|
|
|
|
updated_comments = self.prepare_comment_reconciliation()
|
|
|
|
|
updated_reactions, removed_reactions = self.prepare_reaction_reconciliation()
|
|
|
|
|
|
|
|
|
|
self.active_user.is_active = True
|
|
|
|
|
self.inactive_user.is_active = False
|
|
|
|
|
|
|
|
|
|
# Actually perform the bulk operations
|
|
|
|
|
DocumentAccess.objects.bulk_update(updated_accesses, ["user", "role"])
|
|
|
|
|
|
|
|
|
|
if removed_accesses:
|
|
|
|
|
ids_to_delete = [entry.id for entry in removed_accesses]
|
|
|
|
|
DocumentAccess.objects.filter(id__in=ids_to_delete).delete()
|
|
|
|
|
|
|
|
|
|
DocumentFavorite.objects.bulk_update(update_favorites, ["user"])
|
|
|
|
|
if removed_favorites:
|
|
|
|
|
ids_to_delete = [entry.id for entry in removed_favorites]
|
|
|
|
|
DocumentFavorite.objects.filter(id__in=ids_to_delete).delete()
|
|
|
|
|
|
|
|
|
|
LinkTrace.objects.bulk_update(updated_linktraces, ["user"])
|
|
|
|
|
if removed_linktraces:
|
|
|
|
|
ids_to_delete = [entry.id for entry in removed_linktraces]
|
|
|
|
|
LinkTrace.objects.filter(id__in=ids_to_delete).delete()
|
|
|
|
|
|
|
|
|
|
Thread.objects.bulk_update(updated_threads, ["creator"])
|
|
|
|
|
Comment.objects.bulk_update(updated_comments, ["user"])
|
|
|
|
|
|
|
|
|
|
# pylint: disable=C0103
|
|
|
|
|
ReactionThroughModel = Reaction.users.through
|
|
|
|
|
reactions_to_create = []
|
|
|
|
|
for updated_reaction in updated_reactions:
|
|
|
|
|
reactions_to_create.append(
|
|
|
|
|
ReactionThroughModel(
|
|
|
|
|
user_id=self.active_user.pk, reaction_id=updated_reaction.pk
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if reactions_to_create:
|
|
|
|
|
ReactionThroughModel.objects.bulk_create(reactions_to_create)
|
|
|
|
|
|
|
|
|
|
if removed_reactions:
|
|
|
|
|
ids_to_delete = [entry.id for entry in removed_reactions]
|
|
|
|
|
ReactionThroughModel.objects.filter(
|
|
|
|
|
reaction_id__in=ids_to_delete, user_id=self.inactive_user.pk
|
|
|
|
|
).delete()
|
|
|
|
|
|
|
|
|
|
User.objects.bulk_update([self.active_user, self.inactive_user], ["is_active"])
|
|
|
|
|
|
|
|
|
|
# Wrap up the reconciliation entry
|
|
|
|
|
self.logs += f"""Requested update for {len(updated_accesses)} DocumentAccess items
|
|
|
|
|
and deletion for {len(removed_accesses)} DocumentAccess items.\n"""
|
|
|
|
|
self.status = "done"
|
|
|
|
|
self.save()
|
|
|
|
|
|
|
|
|
|
self.send_reconciliation_done_email()
|
|
|
|
|
|
|
|
|
|
def prepare_documentaccess_reconciliation(self):
|
|
|
|
|
"""
|
|
|
|
|
Prepare the reconciliation by transferring document accesses from the inactive user
|
|
|
|
|
to the active user.
|
|
|
|
|
"""
|
|
|
|
|
updated_accesses = []
|
|
|
|
|
removed_accesses = []
|
|
|
|
|
inactive_accesses = DocumentAccess.objects.filter(user=self.inactive_user)
|
|
|
|
|
|
|
|
|
|
# Check documents where the active user already has access
|
|
|
|
|
inactive_accesses_documents = inactive_accesses.values_list(
|
|
|
|
|
"document", flat=True
|
|
|
|
|
)
|
|
|
|
|
existing_accesses = DocumentAccess.objects.filter(user=self.active_user).filter(
|
|
|
|
|
document__in=inactive_accesses_documents
|
|
|
|
|
)
|
|
|
|
|
existing_roles_per_doc = dict(existing_accesses.values_list("document", "role"))
|
|
|
|
|
|
|
|
|
|
for entry in inactive_accesses:
|
|
|
|
|
if entry.document_id in existing_roles_per_doc:
|
|
|
|
|
# Update role if needed
|
|
|
|
|
existing_role = existing_roles_per_doc[entry.document_id]
|
|
|
|
|
max_role = RoleChoices.max(entry.role, existing_role)
|
|
|
|
|
if existing_role != max_role:
|
|
|
|
|
existing_access = existing_accesses.get(document=entry.document)
|
|
|
|
|
existing_access.role = max_role
|
|
|
|
|
updated_accesses.append(existing_access)
|
|
|
|
|
removed_accesses.append(entry)
|
|
|
|
|
else:
|
|
|
|
|
entry.user = self.active_user
|
|
|
|
|
updated_accesses.append(entry)
|
|
|
|
|
|
|
|
|
|
return updated_accesses, removed_accesses
|
|
|
|
|
|
|
|
|
|
def prepare_document_favorite_reconciliation(self):
|
|
|
|
|
"""
|
|
|
|
|
Prepare the reconciliation by transferring document favorites from the inactive user
|
|
|
|
|
to the active user.
|
|
|
|
|
"""
|
|
|
|
|
updated_favorites = []
|
|
|
|
|
removed_favorites = []
|
|
|
|
|
|
|
|
|
|
existing_favorites = DocumentFavorite.objects.filter(user=self.active_user)
|
|
|
|
|
existing_favorite_doc_ids = set(
|
|
|
|
|
existing_favorites.values_list("document_id", flat=True)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
inactive_favorites = DocumentFavorite.objects.filter(user=self.inactive_user)
|
|
|
|
|
|
|
|
|
|
for entry in inactive_favorites:
|
|
|
|
|
if entry.document_id in existing_favorite_doc_ids:
|
|
|
|
|
removed_favorites.append(entry)
|
|
|
|
|
else:
|
|
|
|
|
entry.user = self.active_user
|
|
|
|
|
updated_favorites.append(entry)
|
|
|
|
|
|
|
|
|
|
return updated_favorites, removed_favorites
|
|
|
|
|
|
|
|
|
|
def prepare_linktrace_reconciliation(self):
|
|
|
|
|
"""
|
|
|
|
|
Prepare the reconciliation by transferring link traces from the inactive user
|
|
|
|
|
to the active user.
|
|
|
|
|
"""
|
|
|
|
|
updated_linktraces = []
|
|
|
|
|
removed_linktraces = []
|
|
|
|
|
|
|
|
|
|
existing_linktraces = LinkTrace.objects.filter(user=self.active_user)
|
|
|
|
|
inactive_linktraces = LinkTrace.objects.filter(user=self.inactive_user)
|
|
|
|
|
|
|
|
|
|
for entry in inactive_linktraces:
|
|
|
|
|
if existing_linktraces.filter(document=entry.document).exists():
|
|
|
|
|
removed_linktraces.append(entry)
|
|
|
|
|
else:
|
|
|
|
|
entry.user = self.active_user
|
|
|
|
|
updated_linktraces.append(entry)
|
|
|
|
|
|
|
|
|
|
return updated_linktraces, removed_linktraces
|
|
|
|
|
|
|
|
|
|
def prepare_thread_reconciliation(self):
|
|
|
|
|
"""
|
|
|
|
|
Prepare the reconciliation by transferring threads from the inactive user
|
|
|
|
|
to the active user.
|
|
|
|
|
"""
|
|
|
|
|
updated_threads = []
|
|
|
|
|
|
|
|
|
|
inactive_threads = Thread.objects.filter(creator=self.inactive_user)
|
|
|
|
|
|
|
|
|
|
for entry in inactive_threads:
|
|
|
|
|
entry.creator = self.active_user
|
|
|
|
|
updated_threads.append(entry)
|
|
|
|
|
|
|
|
|
|
return updated_threads
|
|
|
|
|
|
|
|
|
|
def prepare_comment_reconciliation(self):
|
|
|
|
|
"""
|
|
|
|
|
Prepare the reconciliation by transferring comments from the inactive user
|
|
|
|
|
to the active user.
|
|
|
|
|
"""
|
|
|
|
|
updated_comments = []
|
|
|
|
|
|
|
|
|
|
inactive_comments = Comment.objects.filter(user=self.inactive_user)
|
|
|
|
|
|
|
|
|
|
for entry in inactive_comments:
|
|
|
|
|
entry.user = self.active_user
|
|
|
|
|
updated_comments.append(entry)
|
|
|
|
|
|
|
|
|
|
return updated_comments
|
|
|
|
|
|
|
|
|
|
def prepare_reaction_reconciliation(self):
|
|
|
|
|
"""
|
|
|
|
|
Prepare the reconciliation by creating missing reactions for the active user
|
|
|
|
|
(ie, the ones that exist for the inactive user but not the active user)
|
|
|
|
|
and then deleting all reactions of the inactive user.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
inactive_reactions = Reaction.objects.filter(users=self.inactive_user)
|
|
|
|
|
updated_reactions = inactive_reactions.exclude(users=self.active_user)
|
|
|
|
|
|
|
|
|
|
return updated_reactions, inactive_reactions
|
|
|
|
|
|
|
|
|
|
def send_reconciliation_confirm_email(
    self, user, user_type, confirmation_id, language=None
):
    """
    Email ``user`` the link confirming a reconciliation request.

    :param user: recipient; its ``send_email`` method is used to send the mail.
    :param user_type: path segment inserted in the confirmation link.
    :param confirmation_id: identifier inserted in the confirmation link.
    :param language: language of the email, defaults to the active language.
    """
    language = language or get_language()
    base_url = settings.EMAIL_URL_APP or Site.objects.get_current().domain

    # Lazy string: it is only translated when the email is rendered.
    message = _(
        """You have requested a reconciliation of your user accounts on Docs.
To confirm that you are the one who initiated the request
and that this email belongs to you:"""
    )

    with override(language):
        subject = _("Confirm by clicking the link to start the reconciliation")
        confirmation_link = (
            f"{base_url}/user-reconciliations/{user_type}/{confirmation_id}/"
        )
        # str() forces these labels to be translated in the requested language.
        link_label = str(_("Click here"))
        button_label = str(_("Confirm"))
        context = {
            "title": subject,
            "message": message,
            "link": confirmation_link,
            "link_label": link_label,
            "button_label": button_label,
        }

    user.send_email(subject, context, language)
|
|
|
|
|
|
|
|
|
|
def send_reconciliation_done_email(self, language=None):
    """
    Email the active user that the reconciliation has been processed.

    :param language: language of the email, defaults to the active language.
    """
    language = language or get_language()
    base_url = settings.EMAIL_URL_APP or Site.objects.get_current().domain

    # Lazy string: it is only translated when the email is rendered.
    message = _(
        """Your reconciliation request has been processed.
New documents are likely associated with your account:"""
    )

    with override(language):
        subject = _("Your accounts have been merged")
        home_link = f"{base_url}/"
        # str() forces these labels to be translated in the requested language.
        link_label = str(_("Click here to see"))
        button_label = str(_("See my documents"))
        context = {
            "title": subject,
            "message": message,
            "link": home_link,
            "link_label": link_label,
            "button_label": button_label,
        }

    self.active_user.send_email(subject, context, language)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class UserReconciliationCsvImport(BaseModel):
    """Model to import reconciliation requests from an external source
    provided as an uploaded CSV file."""

    file = models.FileField(upload_to="imports/", verbose_name=_("CSV file"))
    status = models.CharField(
        max_length=20,
        choices=[
            ("pending", _("Pending")),
            ("running", _("Running")),
            ("done", _("Done")),
            ("error", _("Error")),
        ],
        default="pending",
    )
    logs = models.TextField(blank=True)

    class Meta:
        db_table = "impress_user_reconciliation_csv_import"
        verbose_name = _("user reconciliation CSV import")
        verbose_name_plural = _("user reconciliation CSV imports")

    def __str__(self):
        return f"User reconciliation CSV import {self.id}"

    def send_email(self, subject, emails, context=None, language=None):
        """
        Generate and send an email from the generic mail templates.

        :param subject: subject of the email (may be a lazy translation).
        :param emails: list of recipient addresses.
        :param context: optional template context; branding entries are added to it.
        :param language: language used to render the email, defaults to the
            active language.

        SMTP errors are logged and not re-raised.
        """
        context = context or {}
        domain = settings.EMAIL_URL_APP or Site.objects.get_current().domain
        language = language or get_language()
        context.update(
            {
                "brandname": settings.EMAIL_BRAND_NAME,
                "domain": domain,
                "logo_img": settings.EMAIL_LOGO_IMG,
            }
        )

        with override(language):
            msg_html = render_to_string("mail/html/template.html", context)
            msg_plain = render_to_string("mail/text/template.txt", context)
            subject = str(subject)  # Force translation

        try:
            send_mail(
                subject.capitalize(),
                msg_plain,
                settings.EMAIL_FROM,
                emails,
                html_message=msg_html,
                fail_silently=False,
            )
        except smtplib.SMTPException as exception:
            logger.error("invitation to %s was not sent: %s", emails, exception)

    def send_reconciliation_error_email(
        self, recipient_email, other_email, language=None
    ):
        """
        Send the email telling a requester that the reconciliation failed.

        :param recipient_email: address the email is sent to.
        :param other_email: the other address of the failed request, quoted in
            the message.
        :param language: language of the email, defaults to the active language.
        """
        language = language or get_language()

        emails = [recipient_email]

        with override(language):
            # `.format()` evaluates the lazy translation immediately, so the
            # message must be built while the target language is active.
            message = _(
                """Your request for reconciliation was unsuccessful.
Reconciliation failed for the following email addresses:
{recipient_email}, {other_email}.
Please check for typos.
You can submit another request with the valid email addresses."""
            ).format(recipient_email=recipient_email, other_email=other_email)

            subject = _("Reconciliation of your Docs accounts not completed")
            context = {
                "title": subject,
                "message": message,
                "link": settings.USER_RECONCILIATION_FORM_URL,
                "link_label": str(_("Click here")),
                "button_label": str(_("Make a new request")),
            }

        self.send_email(subject, emails, context, language)
|
|
|
|
|
|
|
|
|
|
|
2024-04-03 18:50:28 +02:00
|
|
|
class BaseAccess(BaseModel):
    """Abstract base model describing an access (user or team, plus a role) on a resource."""

    # An access targets a user (nullable) and/or a team name (may be blank).
    user = models.ForeignKey(
        User,
        null=True,
        blank=True,
        on_delete=models.CASCADE,
    )
    team = models.CharField(blank=True, max_length=100)
    role = models.CharField(
        choices=RoleChoices.choices,
        default=RoleChoices.READER,
        max_length=20,
    )

    class Meta:
        abstract = True
|
|
|
|
|
|
|
|
|
|
|
2025-02-16 17:26:51 +01:00
|
|
|
class DocumentQuerySet(MP_NodeQuerySet):
    """
    Queryset of the Document model, with helpers to filter and annotate
    documents according to a given user.
    """

    def readable_per_se(self, user):
        """
        Keep the documents the user can read through the document itself:
        a direct access, a team access, or the document's own link reach.

        Documents that are only readable through an ancestor are not returned.

        :param user: The user for whom readable documents are to be fetched.
        :return: A queryset of documents for which the user has direct access,
            team access or link access.
        """
        if not user.is_authenticated:
            # Anonymous users only reach documents with a public link.
            return self.filter(link_reach=LinkReachChoices.PUBLIC)

        has_user_access = models.Q(accesses__user=user)
        has_team_access = models.Q(accesses__team__in=user.teams)
        is_reachable_by_link = ~models.Q(link_reach=LinkReachChoices.RESTRICTED)
        return self.filter(has_user_access | has_team_access | is_reachable_by_link)

    def annotate_is_favorite(self, user):
        """
        Add an ``is_favorite`` annotation telling whether the user marked
        each document as favorite (always False for anonymous users).
        """
        if not user.is_authenticated:
            return self.annotate(is_favorite=models.Value(False))

        favorites_of_user = DocumentFavorite.objects.filter(
            document_id=models.OuterRef("pk"), user=user
        )
        return self.annotate(is_favorite=models.Exists(favorites_of_user))

    def annotate_user_roles(self, user):
        """
        Add a ``user_roles`` array annotation with the roles the user holds
        on each document or on one of its ancestors (empty for anonymous users).
        """
        output_field = ArrayField(base_field=models.CharField())

        if not user.is_authenticated:
            return self.annotate(
                user_roles=models.Value([], output_field=output_field),
            )

        # An access applies when its document path is a prefix of the outer path.
        roles_of_user = DocumentAccess.objects.filter(
            models.Q(user=user) | models.Q(team__in=user.teams),
            document__path=Left(models.OuterRef("path"), Length("document__path")),
        ).values_list("role", flat=True)
        roles_as_array = models.Func(
            roles_of_user, function="ARRAY", output_field=output_field
        )
        return self.annotate(user_roles=roles_as_array)
|
|
|
|
|
|
2025-02-16 17:26:51 +01:00
|
|
|
|
2025-03-24 12:22:08 +01:00
|
|
|
class DocumentManager(MP_NodeManager.from_queryset(DocumentQuerySet)):
    """
    Manager of the Document model exposing the DocumentQuerySet methods
    directly on ``Document.objects``.
    """

    def get_queryset(self):
        """Return the custom queryset, ordered by tree path."""
        queryset = self._queryset_class(self.model)
        return queryset.order_by("path")
|
2025-02-16 17:26:51 +01:00
|
|
|
|
|
|
|
|
|
2025-04-25 08:03:12 +02:00
|
|
|
# pylint: disable=too-many-public-methods
|
2024-12-16 16:58:14 +01:00
|
|
|
class Document(MP_Node, BaseModel):
|
2024-04-03 18:50:28 +02:00
|
|
|
"""Pad document carrying the content."""
|
|
|
|
|
|
2024-09-09 20:01:27 +02:00
|
|
|
title = models.CharField(_("title"), max_length=255, null=True, blank=True)
|
2024-12-18 11:37:01 +01:00
|
|
|
excerpt = models.TextField(_("excerpt"), max_length=300, null=True, blank=True)
|
2024-09-08 23:37:49 +02:00
|
|
|
link_reach = models.CharField(
|
|
|
|
|
max_length=20,
|
|
|
|
|
choices=LinkReachChoices.choices,
|
2024-10-25 14:16:29 +02:00
|
|
|
default=LinkReachChoices.RESTRICTED,
|
2024-09-08 23:37:49 +02:00
|
|
|
)
|
|
|
|
|
link_role = models.CharField(
|
|
|
|
|
max_length=20, choices=LinkRoleChoices.choices, default=LinkRoleChoices.READER
|
2024-04-03 18:50:28 +02:00
|
|
|
)
|
2024-11-12 16:28:34 +01:00
|
|
|
creator = models.ForeignKey(
|
2024-12-01 11:25:01 +01:00
|
|
|
User,
|
|
|
|
|
on_delete=models.RESTRICT,
|
|
|
|
|
related_name="documents_created",
|
|
|
|
|
blank=True,
|
|
|
|
|
null=True,
|
2024-11-12 16:28:34 +01:00
|
|
|
)
|
2025-01-02 17:20:09 +01:00
|
|
|
deleted_at = models.DateTimeField(null=True, blank=True)
|
|
|
|
|
ancestors_deleted_at = models.DateTimeField(null=True, blank=True)
|
2025-03-17 14:46:59 +01:00
|
|
|
has_deleted_children = models.BooleanField(default=False)
|
2025-01-20 10:23:18 +01:00
|
|
|
duplicated_from = models.ForeignKey(
|
|
|
|
|
"self",
|
|
|
|
|
on_delete=models.SET_NULL,
|
|
|
|
|
related_name="duplicates",
|
|
|
|
|
editable=False,
|
|
|
|
|
blank=True,
|
|
|
|
|
null=True,
|
|
|
|
|
)
|
|
|
|
|
attachments = ArrayField(
|
|
|
|
|
models.CharField(max_length=255),
|
|
|
|
|
default=list,
|
|
|
|
|
editable=False,
|
|
|
|
|
blank=True,
|
|
|
|
|
null=True,
|
|
|
|
|
)
|
2024-04-03 18:50:28 +02:00
|
|
|
|
2024-04-06 09:09:46 +02:00
|
|
|
_content = None
|
|
|
|
|
|
2024-12-16 16:58:14 +01:00
|
|
|
# Tree structure
|
|
|
|
|
alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
|
|
|
|
|
steplen = 7 # nb siblings max: 3,521,614,606,208
|
|
|
|
|
node_order_by = [] # Manual ordering
|
|
|
|
|
|
|
|
|
|
path = models.CharField(max_length=7 * 36, unique=True, db_collation="C")
|
|
|
|
|
|
2025-02-16 17:26:51 +01:00
|
|
|
objects = DocumentManager()
|
|
|
|
|
|
2024-04-03 18:50:28 +02:00
|
|
|
class Meta:
|
|
|
|
|
db_table = "impress_document"
|
2024-12-16 16:58:14 +01:00
|
|
|
ordering = ("path",)
|
2024-04-03 18:50:28 +02:00
|
|
|
verbose_name = _("Document")
|
|
|
|
|
verbose_name_plural = _("Documents")
|
2025-01-02 17:20:09 +01:00
|
|
|
constraints = [
|
|
|
|
|
models.CheckConstraint(
|
2025-06-16 15:05:25 +02:00
|
|
|
condition=(
|
2025-01-02 17:20:09 +01:00
|
|
|
models.Q(deleted_at__isnull=True)
|
|
|
|
|
| models.Q(deleted_at=models.F("ancestors_deleted_at"))
|
|
|
|
|
),
|
|
|
|
|
name="check_deleted_at_matches_ancestors_deleted_at_when_set",
|
|
|
|
|
),
|
|
|
|
|
]
|
2024-04-03 18:50:28 +02:00
|
|
|
|
|
|
|
|
def __str__(self):
    """Return the title, or a translated placeholder when it is empty."""
    if self.title:
        return str(self.title)
    return str(_("Untitled Document"))
|
2024-04-03 18:50:28 +02:00
|
|
|
|
2025-04-25 08:03:12 +02:00
|
|
|
def __init__(self, *args, **kwargs):
    """Build the instance and reset the per-instance link definition caches."""
    super().__init__(*args, **kwargs)
    # None means "not computed yet" for both cached definitions.
    self._computed_link_definition = None
    self._ancestors_link_definition = None
|
2025-04-25 08:03:12 +02:00
|
|
|
|
2024-08-20 16:42:27 +02:00
|
|
|
def save(self, *args, **kwargs):
    """
    Save the model, then hand any non-empty cached content to
    ``save_content`` for storage.
    """
    super().save(*args, **kwargs)

    pending_content = self._content
    if pending_content:
        self.save_content(pending_content)
|
2024-08-20 16:42:27 +02:00
|
|
|
|
2025-08-06 07:32:27 +02:00
|
|
|
def save_content(self, content):
    """
    Write ``content`` to object storage unless an identical file is already there.

    :param content: the document content as a string; it is stored UTF-8 encoded.
    :raises ClientError: when checking the stored object fails with anything
        other than a 404.
    """
    storage_key = self.file_key
    payload = content.encode("utf-8")

    # Ask the storage client for the current object's metadata.
    try:
        head = default_storage.connection.meta.client.head_object(
            Bucket=default_storage.bucket_name, Key=storage_key
        )
    except ClientError as error:
        # Only a missing object (404) is expected: it simply has to be created.
        if error.response["Error"]["Code"] != "404":
            raise
        needs_upload = True
    else:
        # The stored ETag is compared with the MD5 hash of the new content.
        stored_etag = head["ETag"].strip('"')
        new_digest = hashlib.md5(payload).hexdigest()  # noqa: S324
        needs_upload = stored_etag != new_digest

    if not needs_upload:
        return

    default_storage.save(storage_key, ContentFile(payload))
|
2024-08-20 16:42:27 +02:00
|
|
|
|
2025-03-17 14:46:59 +01:00
|
|
|
def is_leaf(self):
    """
    :returns: True if the node has no children, soft-deleted ones included
    """
    if self.has_deleted_children:
        return False
    return self.numchild == 0
|
|
|
|
|
|
2024-04-08 23:37:15 +02:00
|
|
|
@property
def key_base(self):
    """
    Base of the object storage keys used for this document (its primary key).

    :raises RuntimeError: when the instance has no primary key yet.
    """
    if self.pk:
        return str(self.pk)

    raise RuntimeError(
        "The document instance must be saved before requesting a storage key."
    )
|
|
|
|
|
|
2024-07-01 15:41:13 +02:00
|
|
|
@property
def file_key(self):
    """Object storage key under which the document content is stored."""
    base = self.key_base
    return f"{base}/file"
|
|
|
|
|
|
2024-04-06 09:09:46 +02:00
|
|
|
@property
def content(self):
    """
    Return the document content, loading it from object storage on first access.

    Returns None when nothing is cached and the instance has no id or the
    stored file cannot be fetched.
    """
    if self._content is not None or not self.id:
        return self._content

    try:
        response = self.get_content_response()
    except (FileNotFoundError, ClientError):
        # Nothing could be fetched: the cached value (None) is returned.
        return self._content

    self._content = response["Body"].read().decode("utf-8")
    return self._content

@content.setter
def content(self, content):
    """Cache the content on the instance without writing to object storage."""
    if isinstance(content, str):
        self._content = content
        return

    raise ValueError("content should be a string.")
|
|
|
|
|
|
2024-04-08 23:37:15 +02:00
|
|
|
def get_content_response(self, version_id=""):
    """
    Fetch the stored file of the document from object storage.

    :param version_id: optional version to fetch; when empty, no version is
        specified in the request.
    :return: the raw ``get_object`` response of the storage client.
    """
    version_filter = {"VersionId": version_id} if version_id else {}
    request = {
        "Bucket": default_storage.bucket_name,
        "Key": self.file_key,
        **version_filter,
    }
    return default_storage.connection.meta.client.get_object(**request)
|
2024-04-08 23:37:15 +02:00
|
|
|
|
2024-09-16 19:27:48 +02:00
|
|
|
def get_versions_slice(self, from_version_id="", min_datetime=None, page_size=None):
    """
    Return one page of the document's past versions from object storage.

    :param from_version_id: version id marker to start listing from.
    :param min_datetime: versions modified before it are ignored; defaults to
        the document creation date.
    :param page_size: requested page size, capped by
        ``settings.DOCUMENT_VERSIONS_PAGE_SIZE``.
    :return: a dict with ``next_version_id_marker``, ``is_truncated``,
        ``versions`` and ``count``.
    """
    # /!\ Trick here /!\
    # "KeyMarker" and "VersionIdMarker" must be sent together or not at all:
    # the error we get otherwise is not helpful at all.
    if from_version_id:
        pagination_markers = {
            "KeyMarker": self.file_key,
            "VersionIdMarker": from_version_id,
        }
    else:
        pagination_markers = {}

    if page_size:
        effective_page_size = min(page_size, settings.DOCUMENT_VERSIONS_PAGE_SIZE)
    else:
        effective_page_size = settings.DOCUMENT_VERSIONS_PAGE_SIZE

    listing = default_storage.connection.meta.client.list_object_versions(
        Bucket=default_storage.bucket_name,
        Prefix=self.file_key,
        # compensate the latest version that we exclude below and get one more to
        # know if there are more pages
        MaxKeys=effective_page_size + 2,
        **pagination_markers,
    )

    oldest_allowed = min_datetime or self.created_at
    exposed_fields = (
        ("etag", "ETag"),
        ("is_latest", "IsLatest"),
        ("last_modified", "LastModified"),
        ("version_id", "VersionId"),
    )

    past_versions = []
    for raw_version in listing.get("Versions", []):
        # Keep only versions that are recent enough and are not the latest one.
        if (
            raw_version["LastModified"] >= oldest_allowed
            and raw_version["IsLatest"] is False
        ):
            past_versions.append(
                {snake: raw_version[camel] for snake, camel in exposed_fields}
            )

    page = past_versions[:effective_page_size]
    count = len(page)

    # More versions than the page holds means another page can be requested.
    has_more = count != len(past_versions)
    next_marker = past_versions[count - 1]["version_id"] if has_more else ""

    return {
        "next_version_id_marker": next_marker,
        "is_truncated": has_more,
        "versions": page,
        "count": count,
    }
|
|
|
|
|
|
|
|
|
|
def delete_version(self, version_id):
    """
    Delete one version of the document file from object storage.

    :param version_id: id of the version to delete.
    :return: the raw ``delete_object`` response of the storage client.
    """
    storage_client = default_storage.connection.meta.client
    return storage_client.delete_object(
        Bucket=default_storage.bucket_name,
        Key=self.file_key,
        VersionId=version_id,
    )
|
|
|
|
|
|
2025-01-02 17:20:09 +01:00
|
|
|
def get_nb_accesses_cache_key(self):
    """Build the cache key, derived from the document id, of its access counts."""
    document_id = str(self.id)
    return "document_" + document_id + "_nb_accesses"
|
|
|
|
|
|
2025-02-26 09:27:00 +01:00
|
|
|
def get_nb_accesses(self):
    """
    Return the cached pair of access counts for the document:
    - accesses directly attached to the document
    - accesses attached to any non-deleted document whose path is a prefix of
      this document's path

    The counts are computed and stored in the cache on a cache miss.
    """
    cache_key = self.get_nb_accesses_cache_key()
    cached_counts = cache.get(cache_key)
    if cached_counts is not None:
        return cached_counts

    direct_count = DocumentAccess.objects.filter(document=self).count()
    path_prefix_count = DocumentAccess.objects.filter(
        document__path=Left(models.Value(self.path), Length("document__path")),
        document__ancestors_deleted_at__isnull=True,
    ).count()

    counts = (direct_count, path_prefix_count)
    cache.set(cache_key, counts)
    return counts
|
|
|
|
|
|
2025-02-26 09:27:00 +01:00
|
|
|
@property
def nb_accesses_direct(self):
    """Return the first count of ``get_nb_accesses``: accesses directly on the document."""
    counts = self.get_nb_accesses()
    return counts[0]
|
|
|
|
|
|
|
|
|
|
@property
def nb_accesses_ancestors(self):
    """
    Return the second count of ``get_nb_accesses``: accesses related to the
    document or one of its ancestors.
    """
    counts = self.get_nb_accesses()
    return counts[1]
|
|
|
|
|
|
2025-01-02 17:20:09 +01:00
|
|
|
def invalidate_nb_accesses_cache(self):
    """
    Invalidate the cache for number of accesses, including on affected descendants.

    Every document whose path starts with this document's path is affected:
    the document itself and its descendants.
    """
    # Only the id is needed to build each cache key.
    affected_documents = Document.objects.filter(
        path__startswith=self.path
    ).only("id")
    cache_keys = [
        document.get_nb_accesses_cache_key() for document in affected_documents
    ]
    # One bulk call instead of one cache round trip per document.
    cache.delete_many(cache_keys)
|
|
|
|
|
|
2025-04-25 08:03:12 +02:00
|
|
|
def get_role(self, user):
    """
    Return the result of ``RoleChoices.max`` over the roles the user holds on
    the document, or None for an anonymous user.

    The roles come from the ``user_roles`` attribute when the instance has one,
    otherwise they are queried from the accesses of the user and their teams
    on the document and its ancestors.
    """
    if not user.is_authenticated:
        return None

    not_annotated = object()
    annotated_roles = getattr(self, "user_roles", not_annotated)

    if annotated_roles is not_annotated:
        roles = DocumentAccess.objects.filter(
            models.Q(user=user) | models.Q(team__in=user.teams),
            document__path=Left(models.Value(self.path), Length("document__path")),
        ).values_list("role", flat=True)
    else:
        roles = annotated_roles or []

    return RoleChoices.max(*roles)
|
2025-01-02 17:20:09 +01:00
|
|
|
|
2025-04-25 08:03:12 +02:00
|
|
|
def compute_ancestors_links_paths_mapping(self):
    """
    Map the path of each non-deleted node of the document's lineage (its
    ancestors and itself) to the list of link definitions found from the top
    of the tree down to that node.
    """
    lineage = (
        (self.get_ancestors() | self._meta.model.objects.filter(pk=self.pk))
        .filter(ancestors_deleted_at__isnull=True)
        .order_by("path")
    )

    links_by_path = {}
    links_so_far = []
    for node in lineage:
        # A new list is built at each step so stored lists are never mutated.
        links_so_far = [
            *links_so_far,
            {"link_reach": node.link_reach, "link_role": node.link_role},
        ]
        links_by_path[node.path] = links_so_far

    return links_by_path
|
|
|
|
|
|
2025-04-28 21:43:59 +02:00
|
|
|
@property
def link_definition(self):
    """Return the document's own link reach and link role as a dictionary."""
    reach = self.link_reach
    role = self.link_role
    return {"link_reach": reach, "link_role": role}
|
|
|
|
|
|
2025-04-25 08:03:12 +02:00
|
|
|
@property
def ancestors_link_definition(self):
    """
    Link definition equivalent to the link definitions of all the document's
    ancestors. The value is computed once and cached on the instance.
    """
    cached_definition = getattr(self, "_ancestors_link_definition", None)
    if cached_definition is not None:
        return cached_definition

    if self.depth > 1:
        links_by_path = self.compute_ancestors_links_paths_mapping()
        # Dropping the last path step gives the path of the parent.
        parent_path = self.path[: -self.steplen]
        ancestors_links = links_by_path.get(parent_path, [])
    else:
        # A document at depth 1 or less has no ancestors.
        ancestors_links = []

    self._ancestors_link_definition = get_equivalent_link_definition(
        ancestors_links
    )
    return self._ancestors_link_definition

@ancestors_link_definition.setter
def ancestors_link_definition(self, definition):
    """Store an already computed ancestors link definition on the instance."""
    self._ancestors_link_definition = definition
|
2025-03-24 10:46:40 +01:00
|
|
|
|
2025-04-25 08:03:12 +02:00
|
|
|
@property
def ancestors_link_reach(self):
    """Link reach taken from the ancestors' equivalent link definition."""
    definition = self.ancestors_link_definition
    return definition["link_reach"]
|
|
|
|
|
|
|
|
|
|
@property
def ancestors_link_role(self):
    """Link role taken from the ancestors' equivalent link definition."""
    definition = self.ancestors_link_definition
    return definition["link_role"]
|
2025-03-24 10:46:40 +01:00
|
|
|
|
2025-04-28 21:43:59 +02:00
|
|
|
@property
def computed_link_definition(self):
    """
    Link definition equivalent to the combination of the ancestors' link
    definition and the document's own one. The value is computed once and
    cached on the instance.
    """
    cached_definition = getattr(self, "_computed_link_definition", None)
    if cached_definition is not None:
        return cached_definition

    definitions = [self.ancestors_link_definition, self.link_definition]
    self._computed_link_definition = get_equivalent_link_definition(definitions)
    return self._computed_link_definition
|
|
|
|
|
|
|
|
|
|
@property
def computed_link_reach(self):
    """Link reach taken from the computed link definition of the document."""
    definition = self.computed_link_definition
    return definition["link_reach"]
|
|
|
|
|
|
|
|
|
|
@property
def computed_link_role(self):
    """Link role taken from the computed link definition of the document."""
    definition = self.computed_link_definition
    return definition["link_role"]
|
|
|
|
|
|
2025-04-25 08:03:12 +02:00
|
|
|
def get_abilities(self, user):
    """
    Compute and return abilities for a given user on the document.

    The abilities combine the role given by specific accesses (see `get_role`)
    with the role granted by the link definition of the document and its
    ancestors. Every ability except "retrieve", "tree", "restore" and
    "link_select_options" is switched off when `ancestors_deleted_at` is set.

    :param user: the user (possibly anonymous) to compute abilities for.
    :return: a dict mapping each ability name to a boolean, plus the
        "link_select_options" entry returned by
        `LinkReachChoices.get_select_options`.
    """
    # First get the role based on specific access
    role = self.get_role(user)

    # Characteristics that are based only on specific access
    is_owner = role == RoleChoices.OWNER
    # Holds the deletion timestamp (or None); only its truthiness is used below.
    is_deleted = self.ancestors_deleted_at
    is_owner_or_admin = (is_owner or role == RoleChoices.ADMIN) and not is_deleted

    # Compute access roles before adding link roles because we don't
    # want anonymous users to access versions (we wouldn't know from
    # which date to allow them anyway)
    # Anonymous users should also not see document accesses
    has_access_role = bool(role) and not is_deleted
    can_update_from_access = (
        is_owner_or_admin or role == RoleChoices.EDITOR
    ) and not is_deleted

    link_select_options = LinkReachChoices.get_select_options(
        **self.ancestors_link_definition
    )
    link_definition = get_equivalent_link_definition(
        [
            self.ancestors_link_definition,
            {"link_reach": self.link_reach, "link_role": self.link_role},
        ]
    )

    # From here on, `role` may be raised by the link role when the link reach
    # applies to this user. The flags computed above keep the access-only role.
    link_reach = link_definition["link_reach"]
    if link_reach == LinkReachChoices.PUBLIC or (
        link_reach == LinkReachChoices.AUTHENTICATED and user.is_authenticated
    ):
        role = RoleChoices.max(role, link_definition["link_role"])

    can_get = bool(role) and not is_deleted
    # Owners keep "retrieve" (and "tree") even when the document is deleted.
    retrieve = can_get or is_owner
    can_update = (
        is_owner_or_admin or role == RoleChoices.EDITOR
    ) and not is_deleted
    can_comment = (can_update or role == RoleChoices.COMMENTER) and not is_deleted
    can_create_children = can_update and user.is_authenticated
    # A root document can only be destroyed by an owner; other documents can
    # also be destroyed by an admin or by their (authenticated) creator.
    can_destroy = (
        is_owner
        if self.is_root()
        else (is_owner_or_admin or (user.is_authenticated and self.creator == user))
    ) and not is_deleted

    # AI features are gated by the AI_ALLOW_REACH_FROM setting: the "restricted"
    # value relies on the update right obtained from specific accesses only.
    ai_allow_reach_from = settings.AI_ALLOW_REACH_FROM
    ai_access = any(
        [
            ai_allow_reach_from == LinkReachChoices.PUBLIC and can_update,
            ai_allow_reach_from == LinkReachChoices.AUTHENTICATED
            and user.is_authenticated
            and can_update,
            ai_allow_reach_from == LinkReachChoices.RESTRICTED
            and can_update_from_access,
        ]
    )

    return {
        "accesses_manage": is_owner_or_admin,
        "accesses_view": has_access_role,
        "ai_transform": ai_access,
        "ai_translate": ai_access,
        "attachment_upload": can_update,
        "media_check": can_get,
        "can_edit": can_update,
        "children_list": can_get,
        "children_create": can_create_children,
        "collaboration_auth": can_get,
        "comment": can_comment,
        "content": can_get,
        "cors_proxy": can_get,
        "descendants": can_get,
        "destroy": can_destroy,
        "duplicate": can_get and user.is_authenticated,
        "favorite": can_get and user.is_authenticated,
        "link_configuration": is_owner_or_admin,
        "invite_owner": is_owner and not is_deleted,
        "mask": can_get and user.is_authenticated,
        "move": is_owner_or_admin and not is_deleted,
        "partial_update": can_update,
        "restore": is_owner,
        "retrieve": retrieve,
        "media_auth": can_get,
        "link_select_options": link_select_options,
        "tree": retrieve,
        "update": can_update,
        "versions_destroy": is_owner_or_admin,
        "versions_list": has_access_role,
        "versions_retrieve": has_access_role,
    }
|
|
|
|
|
|
2024-12-01 11:25:01 +01:00
|
|
|
def send_email(self, subject, emails, context=None, language=None):
    """
    Generate and send an email about the document from the generic templates.

    :param subject: subject of the email (may be a lazy translation).
    :param emails: list of recipient addresses.
    :param context: optional template context; document and branding entries
        are added to it.
    :param language: language used to render the email, defaults to the
        active language.

    SMTP errors are logged and not re-raised.
    """
    context = context or {}
    domain = settings.EMAIL_URL_APP or Site.objects.get_current().domain
    language = language or get_language()

    with override(language):
        # Built inside the override so that the eagerly translated
        # "Untitled Document" fallback uses the requested language.
        context.update(
            {
                "brandname": settings.EMAIL_BRAND_NAME,
                "document": self,
                "domain": domain,
                "link": f"{domain}/docs/{self.id}/",
                "link_label": self.title or str(_("Untitled Document")),
                "button_label": _("Open"),
                "logo_img": settings.EMAIL_LOGO_IMG,
            }
        )
        msg_html = render_to_string("mail/html/template.html", context)
        msg_plain = render_to_string("mail/text/template.txt", context)
        subject = str(subject)  # Force translation

    try:
        send_mail(
            subject.capitalize(),
            msg_plain,
            settings.EMAIL_FROM,
            emails,
            html_message=msg_html,
            fail_silently=False,
        )
    except smtplib.SMTPException as exception:
        logger.error("invitation to %s was not sent: %s", emails, exception)
|
2024-09-25 12:43:02 +02:00
|
|
|
|
2024-12-01 11:25:01 +01:00
|
|
|
def send_invitation_email(self, email, role, sender, language=None):
    """
    Email an invitation to this document on behalf of ``sender``.

    Args:
        email: Address of the invited person.
        role: Role value granted by the invitation; it must be a valid
            ``RoleChoices`` value.
        sender: User sending the invitation; ``full_name`` and ``email``
            are read from it.
        language: Language of the email. Defaults to the active language.
    """
    target_language = language or get_language()
    role_label = RoleChoices(role).label

    # Fall back to the bare email address when the sender has no full name.
    if sender.full_name:
        display_name = sender.full_name
        display_name_with_email = f"{sender.full_name:s} ({sender.email})"
    else:
        display_name = sender.email
        display_name_with_email = sender.email

    with override(target_language):
        title = _("{name} shared a document with you!").format(name=display_name)
        message = _(
            '{name} invited you with the role "{role}" on the following document:'
        ).format(name=display_name_with_email, role=role_label.lower())
        context = {"title": title, "message": message}

        # Mention the document title in the subject when there is one.
        if self.title:
            subject = _("{name} shared a document with you: {title}").format(
                name=display_name, title=self.title
            )
        else:
            subject = context["title"]

    self.send_email(subject, [email], context, target_language)
|
2024-09-25 12:43:02 +02:00
|
|
|
|
2025-01-02 17:20:09 +01:00
|
|
|
@transaction.atomic
def soft_delete(self):
    """
    Flag this document as deleted and propagate the mark to its descendants.

    The regular ``.delete()`` method is left untouched for programmatic use.

    Raises:
        RuntimeError: if the stored row is already flagged as deleted (directly
            or through an ancestor), or if any ancestor is deleted.
    """
    document_model = self._meta.model

    # Check the database row rather than the in-memory instance.
    flagged_in_db = document_model.objects.filter(
        models.Q(deleted_at__isnull=False)
        | models.Q(ancestors_deleted_at__isnull=False),
        pk=self.pk,
    ).exists()
    if (
        flagged_in_db
        or self.get_ancestors().filter(deleted_at__isnull=False).exists()
    ):
        raise RuntimeError(
            "This document is already deleted or has deleted ancestors."
        )

    deletion_time = timezone.now()
    self.ancestors_deleted_at = deletion_time
    self.deleted_at = deletion_time
    self.save()
    self.invalidate_nb_accesses_cache()

    # A non-root document no longer counts among its parent's children.
    if self.depth > 1:
        parent_pk = self.get_parent().pk
        document_model.objects.filter(pk=parent_pk).update(
            numchild=models.F("numchild") - 1,
            has_deleted_children=True,
        )

    # Mark all descendants that are not flagged yet as soft deleted
    unflagged_descendants = self.get_descendants().filter(
        ancestors_deleted_at__isnull=True
    )
    unflagged_descendants.update(
        ancestors_deleted_at=self.ancestors_deleted_at,
        updated_at=self.updated_at,
    )
|
|
|
|
|
|
2025-01-05 14:43:15 +01:00
|
|
|
@transaction.atomic
def restore(self):
    """
    Cancel a soft delete after checking that it is still possible.

    Raises:
        RuntimeError: if the stored row is not deleted, or if it was deleted
            before the trashbin cutoff.
    """
    document_model = self._meta.model

    # This should not happen
    not_deleted_in_db = document_model.objects.filter(
        pk=self.pk, deleted_at__isnull=True
    ).exists()
    if not_deleted_in_db:
        raise RuntimeError("This document is not deleted.")

    if self.deleted_at < get_trashbin_cutoff():
        raise RuntimeError(
            "This document was permanently deleted and cannot be restored."
        )

    # Remember the deletion date: it is needed below to leave alone the
    # descendants that were flagged by an earlier deletion.
    previous_deleted_at = self.deleted_at

    # Restore the current document
    self.deleted_at = None

    # Earliest `deleted_at` among the ancestors (None if none is deleted)
    deleted_ancestors = self.get_ancestors().filter(deleted_at__isnull=False)
    self.ancestors_deleted_at = (
        deleted_ancestors.order_by("deleted_at")
        .values_list("deleted_at", flat=True)
        .first()
    )
    self.save(update_fields=["deleted_at", "ancestors_deleted_at"])
    self.invalidate_nb_accesses_cache()

    # Skip descendants that are deleted themselves or whose ancestor flag
    # predates the deletion being cancelled.
    untouched = models.Q(deleted_at__isnull=False) | models.Q(
        ancestors_deleted_at__lt=previous_deleted_at
    )
    self.get_descendants().exclude(untouched).update(
        ancestors_deleted_at=self.ancestors_deleted_at
    )

    # A non-root document counts again among its parent's children.
    if self.depth > 1:
        parent_pk = self.get_parent().pk
        document_model.objects.filter(pk=parent_pk).update(
            numchild=models.F("numchild") + 1
        )
|
|
|
|
|
|
2024-04-03 18:50:28 +02:00
|
|
|
|
2024-09-08 23:37:49 +02:00
|
|
|
class LinkTrace(BaseModel):
    """
    Trace of a logged-in user reaching a document through its link.

    The trace is what allows the document to show up in the user's list of
    documents even though the user holds no role on it.
    """

    document = models.ForeignKey(
        Document, on_delete=models.CASCADE, related_name="link_traces"
    )
    user = models.ForeignKey(
        User,
        on_delete=models.CASCADE,
        related_name="link_traces",
    )
    is_masked = models.BooleanField(default=False)

    class Meta:
        db_table = "impress_link_trace"
        constraints = [
            # At most one trace per (user, document) pair.
            models.UniqueConstraint(
                name="unique_link_trace_document_user",
                fields=["user", "document"],
                violation_error_message=_(
                    "A link trace already exists for this document/user."
                ),
            ),
        ]
        verbose_name = _("Document/user link trace")
        verbose_name_plural = _("Document/user link traces")

    def __str__(self):
        """Return a readable "<user> trace on document <document>" label."""
        user_label = str(self.user)
        document_label = str(self.document)
        return f"{user_label} trace on document {document_label}"
|
|
|
|
|
|
|
|
|
|
|
2024-11-09 10:45:38 +01:00
|
|
|
class DocumentFavorite(BaseModel):
    """Link between a user and a document that the user marked as favorite."""

    document = models.ForeignKey(
        Document, on_delete=models.CASCADE, related_name="favorited_by_users"
    )
    user = models.ForeignKey(
        User,
        on_delete=models.CASCADE,
        related_name="favorite_documents",
    )

    class Meta:
        db_table = "impress_document_favorite"
        constraints = [
            # A user can mark a given document as favorite only once.
            models.UniqueConstraint(
                name="unique_document_favorite_user",
                fields=["user", "document"],
                violation_error_message=_(
                    "This document is already targeted by a favorite relation instance "
                    "for the same user."
                ),
            ),
        ]
        verbose_name = _("Document favorite")
        verbose_name_plural = _("Document favorites")

    def __str__(self):
        """Return a readable "<user> favorite on document <document>" label."""
        user_label = str(self.user)
        document_label = str(self.document)
        return f"{user_label} favorite on document {document_label}"
|
|
|
|
|
|
|
|
|
|
|
2024-04-03 18:50:28 +02:00
|
|
|
class DocumentAccess(BaseAccess):
    """Relation model to give access to a document for a user or a team with a role."""

    document = models.ForeignKey(
        Document,
        on_delete=models.CASCADE,
        related_name="accesses",
    )

    class Meta:
        db_table = "impress_document_access"
        ordering = ("-created_at",)
        verbose_name = _("Document/user relation")
        verbose_name_plural = _("Document/user relations")
        constraints = [
            models.UniqueConstraint(
                fields=["user", "document"],
                condition=models.Q(user__isnull=False),  # Exclude null users
                name="unique_document_user",
                violation_error_message=_("This user is already in this document."),
            ),
            models.UniqueConstraint(
                fields=["team", "document"],
                condition=models.Q(team__gt=""),  # Exclude empty string teams
                name="unique_document_team",
                violation_error_message=_("This team is already in this document."),
            ),
            # Exactly one of `user` / `team` must be filled in.
            models.CheckConstraint(
                condition=models.Q(user__isnull=False, team="")
                | models.Q(user__isnull=True, team__gt=""),
                name="check_document_access_either_user_or_team",
                violation_error_message=_("Either user or team must be set, not both."),
            ),
        ]

    def __str__(self):
        return f"{self.user!s} is {self.role:s} in document {self.document!s}"

    def save(self, *args, **kwargs):
        """Override save to clear the document's cache for number of accesses."""
        super().save(*args, **kwargs)
        self.document.invalidate_nb_accesses_cache()

    @property
    def target_key(self):
        """Get a unique key for the actor targeted by the access, without possible conflict."""
        return f"user:{self.user_id!s}" if self.user_id else f"team:{self.team:s}"

    def delete(self, *args, **kwargs):
        """
        Override delete to clear the document's cache for number of accesses.

        Returns:
            Whatever the parent ``delete()`` returns, so that callers keep
            access to the deletion summary.
        """
        result = super().delete(*args, **kwargs)
        self.document.invalidate_nb_accesses_cache()
        return result

    def set_user_roles_tuple(self, ancestors_role, current_role):
        """
        Set a precomputed (ancestors_role, current_role) tuple for this instance.

        This avoids querying the database in `get_user_roles_tuple()` and is
        useful when roles are already known, such as in bulk serialization.

        Args:
            ancestors_role (str | None): Highest role on any ancestor document.
            current_role (str | None): Role on the current document.
        """
        # pylint: disable=attribute-defined-outside-init
        self._prefetched_user_roles_tuple = (ancestors_role, current_role)

    def get_user_roles_tuple(self, user):
        """
        Return a tuple of:
        - the highest role the user has on any ancestor of the document
        - the role the user has on the current document

        If roles have been explicitly set using `set_user_roles_tuple()`,
        those will be returned instead of querying the database.

        This allows viewsets or serializers to precompute roles for performance
        when handling multiple documents at once.

        Args:
            user (User): The user whose roles are being evaluated.

        Returns:
            tuple[str | None, str | None]: (max_ancestor_role, current_document_role)
        """
        if not user.is_authenticated:
            return None, None

        try:
            return self._prefetched_user_roles_tuple
        except AttributeError:
            pass

        # The document itself plus its ancestors, keeping only rows whose
        # `ancestors_deleted_at` is not set.
        ancestors = (
            self.document.get_ancestors() | Document.objects.filter(pk=self.document_id)
        ).filter(ancestors_deleted_at__isnull=True)

        access_tuples = DocumentAccess.objects.filter(
            models.Q(user=user) | models.Q(team__in=user.teams),
            document__in=ancestors,
        ).values_list("document_id", "role")

        ancestors_roles = []
        current_roles = []
        for doc_id, role in access_tuples:
            if doc_id == self.document_id:
                current_roles.append(role)
            else:
                ancestors_roles.append(role)

        return RoleChoices.max(*ancestors_roles), RoleChoices.max(*current_roles)

    def get_abilities(self, user):
        """
        Compute and return abilities for a given user on the document access.

        Returns:
            dict: the keys "destroy", "update", "partial_update", "retrieve"
            and "set_role_to" (the list of roles the user may assign to this
            access).
        """
        ancestors_role, current_role = self.get_user_roles_tuple(user)
        role = RoleChoices.max(ancestors_role, current_role)
        is_owner_or_admin = role in PRIVILEGED_ROLES

        if self.role == RoleChoices.OWNER:
            # An owner access can only be handled by an owner, and on a root
            # document only if another owner access remains.
            can_delete = role == RoleChoices.OWNER and (
                # check if document is not root trying to avoid an extra query
                self.document.depth > 1
                or DocumentAccess.objects.filter(
                    document_id=self.document_id, role=RoleChoices.OWNER
                ).count()
                > 1
            )
            set_role_to = RoleChoices.values if can_delete else []
        else:
            can_delete = is_owner_or_admin
            set_role_to = []
            if is_owner_or_admin:
                set_role_to.extend(
                    [
                        RoleChoices.READER,
                        RoleChoices.COMMENTER,
                        RoleChoices.EDITOR,
                        RoleChoices.ADMIN,
                    ]
                )
            if role == RoleChoices.OWNER:
                set_role_to.append(RoleChoices.OWNER)

        # Filter out roles that would be lower than the one the user already has
        ancestors_role_priority = RoleChoices.get_priority(
            getattr(self, "max_ancestors_role", None)
        )
        set_role_to = [
            candidate_role
            for candidate_role in set_role_to
            if RoleChoices.get_priority(candidate_role) >= ancestors_role_priority
        ]
        # A single remaining choice leaves nothing to change.
        if len(set_role_to) == 1:
            set_role_to = []

        return {
            "destroy": can_delete,
            "update": bool(set_role_to) and is_owner_or_admin,
            "partial_update": bool(set_role_to) and is_owner_or_admin,
            "retrieve": (self.user and self.user.id == user.id) or is_owner_or_admin,
            "set_role_to": set_role_to,
        }
|
2024-04-03 18:50:28 +02:00
|
|
|
|
|
|
|
|
|
2025-06-18 15:13:48 +02:00
|
|
|
class DocumentAskForAccess(BaseModel):
    """Request made by a user to obtain access to a document."""

    document = models.ForeignKey(
        Document,
        on_delete=models.CASCADE,
        related_name="ask_for_accesses",
    )
    user = models.ForeignKey(
        User,
        on_delete=models.CASCADE,
        related_name="ask_for_accesses",
    )

    # Requested role; used by `accept()` when no role is passed explicitly.
    role = models.CharField(
        max_length=20,
        choices=RoleChoices.choices,
        default=RoleChoices.READER,
    )

    class Meta:
        db_table = "impress_document_ask_for_access"
        constraints = [
            # A user can have a single pending request per document.
            models.UniqueConstraint(
                name="unique_document_ask_for_access_user",
                fields=["user", "document"],
                violation_error_message=_(
                    "This user has already asked for access to this document."
                ),
            ),
        ]
        verbose_name = _("Document ask for access")
        verbose_name_plural = _("Document ask for accesses")

    def __str__(self):
        """Return a readable "<user> asked for access to document <document>" label."""
        user_label = str(self.user)
        document_label = str(self.document)
        return f"{user_label} asked for access to document {document_label}"

    def get_abilities(self, user):
        """
        Return what the given user may do with this access request.

        Every action is reserved to users whose role on the document is in
        ``PRIVILEGED_ROLES``. "set_role_to" lists the roles whose priority
        does not exceed the priority of the user's own role.
        """
        user_role = self.document.get_role(user)
        is_privileged = user_role in PRIVILEGED_ROLES
        user_priority = RoleChoices.get_priority(user_role)

        assignable_roles = []
        for candidate in RoleChoices.values:
            if RoleChoices.get_priority(candidate) <= user_priority:
                assignable_roles.append(candidate)

        return {
            "destroy": is_privileged,
            "update": is_privileged,
            "partial_update": is_privileged,
            "retrieve": is_privileged,
            "accept": is_privileged,
            "set_role_to": assignable_roles,
        }

    def accept(self, role=None):
        """
        Grant the requested access, then remove the request.

        Args:
            role: Role to grant. Defaults to the role stored on the request.
        """
        granted_role = self.role if role is None else role

        # Create the user's access on the document or update the existing one.
        DocumentAccess.objects.update_or_create(
            document=self.document,
            user=self.user,
            defaults={"role": granted_role},
            create_defaults={"role": granted_role},
        )
        self.delete()

    def send_ask_for_access_email(self, email, language=None):
        """
        Notify ``email`` that the requesting user would like access to the document.

        Args:
            email: Address the notification is sent to.
            language: Language of the email. Defaults to the active language.
        """
        target_language = language or get_language()
        requester = self.user

        # Fall back to the bare email address when the requester has no full name.
        if requester.full_name:
            display_name = requester.full_name
            display_name_with_email = f"{requester.full_name:s} ({requester.email})"
        else:
            display_name = requester.email
            display_name_with_email = requester.email

        with override(target_language):
            title = _("{name} would like access to a document!").format(
                name=display_name
            )
            message = _("{name} would like access to the following document:").format(
                name=display_name_with_email
            )
            context = {"title": title, "message": message}

            # Mention the document title in the subject when there is one.
            if self.document.title:
                subject = _(
                    "{name} is asking for access to the document: {title}"
                ).format(name=display_name, title=self.document.title)
            else:
                subject = context["title"]

        self.document.send_email(subject, [email], context, target_language)
|
|
|
|
|
|
2025-06-18 15:13:48 +02:00
|
|
|
|
2025-09-12 15:28:25 +02:00
|
|
|
class Thread(BaseModel):
    """Discussion thread attached to a document.

    A thread groups one or many comments. For backward compatibility with the
    existing frontend (useComments hook), a flattened serializer is still
    exposed that returns a "content" field representing the body of the
    first comment.
    """

    document = models.ForeignKey(
        Document, on_delete=models.CASCADE, related_name="threads"
    )
    # The thread is kept, without creator, when the user is deleted.
    creator = models.ForeignKey(
        User,
        related_name="threads",
        on_delete=models.SET_NULL,
        blank=True,
        null=True,
    )
    resolved = models.BooleanField(default=False)
    resolved_at = models.DateTimeField(blank=True, null=True)
    resolved_by = models.ForeignKey(
        User,
        related_name="resolved_threads",
        on_delete=models.SET_NULL,
        blank=True,
        null=True,
    )
    metadata = models.JSONField(blank=True, default=dict)

    class Meta:
        db_table = "impress_thread"
        verbose_name = _("Thread")
        verbose_name_plural = _("Threads")
        ordering = ("-created_at",)

    def __str__(self):
        """Return a readable label naming the creator (or "Anonymous") and the document."""
        author = self.creator or _("Anonymous")
        return f"Thread by {author!s} on {self.document!s}"

    def get_abilities(self, user):
        """
        Compute and return abilities for a given user (mirrors comment logic).

        Reading follows the "comment" ability of the document; writing is
        granted to the thread creator and to owners/admins of the document.
        """
        role = self.document.get_role(user)
        document_abilities = self.document.get_abilities(user)
        can_read = document_abilities.get("comment", False)

        is_creator = self.creator == user
        can_write = is_creator or role in (RoleChoices.OWNER, RoleChoices.ADMIN)

        return {
            "destroy": can_write,
            "update": can_write,
            "partial_update": can_write,
            "resolve": can_write,
            "retrieve": can_read,
        }

    @property
    def first_comment(self):
        """Return the earliest created comment of the thread, or None if it has none."""
        comments_by_date = self.comments.order_by("created_at")
        return comments_by_date.first()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Comment(BaseModel):
    """Single comment posted in a thread."""

    thread = models.ForeignKey(
        Thread, on_delete=models.CASCADE, related_name="comments"
    )
    # The comment is kept, without author, when the user is deleted.
    user = models.ForeignKey(
        User,
        related_name="thread_comment",
        on_delete=models.SET_NULL,
        blank=True,
        null=True,
    )
    body = models.JSONField()
    metadata = models.JSONField(blank=True, default=dict)

    class Meta:
        db_table = "impress_comment"
        verbose_name = _("Comment")
        verbose_name_plural = _("Comments")
        ordering = ("created_at",)

    def __str__(self):
        """Return a readable label naming the author (or "Anonymous") and the thread id."""
        author = self.user or _("Anonymous")
        return f"Comment by {author!s} on thread {self.thread_id}"

    def get_abilities(self, user):
        """
        Compute and return abilities for a given user on the comment.

        Reading follows the "comment" ability of the thread's document,
        reacting additionally requires an authenticated user, and writing is
        granted to the comment author and to owners/admins of the document.
        """
        document = self.thread.document
        role = document.get_role(user)
        document_abilities = document.get_abilities(user)
        can_read = document_abilities.get("comment", False)
        can_react = can_read and user.is_authenticated

        is_author = self.user == user
        can_write = is_author or role in (RoleChoices.OWNER, RoleChoices.ADMIN)

        return {
            "destroy": can_write,
            "update": can_write,
            "partial_update": can_write,
            "reactions": can_react,
            "retrieve": can_read,
        }
|
|
|
|
|
|
|
|
|
|
|
2025-09-12 15:28:25 +02:00
|
|
|
class Reaction(BaseModel):
    """Aggregated reactions for a given emoji on a comment.

    We store one row per (comment, emoji); the users who reacted with that
    emoji are tracked through the `users` many-to-many relation. This matches
    the frontend interface where a reaction exposes: emoji, createdAt (first
    reaction date) and userIds.
    """

    comment = models.ForeignKey(
        Comment,
        on_delete=models.CASCADE,
        related_name="reactions",
    )
    emoji = models.CharField(max_length=32)
    users = models.ManyToManyField(User, related_name="reactions")

    class Meta:
        db_table = "impress_comment_reaction"
        constraints = [
            # One row per (comment, emoji) pair.
            models.UniqueConstraint(
                fields=["comment", "emoji"],
                name="unique_comment_emoji",
                violation_error_message=_(
                    "This emoji has already been reacted to this comment."
                ),
            ),
        ]
        verbose_name = _("Reaction")
        verbose_name_plural = _("Reactions")

    def __str__(self):
        """Return the string representation of the reaction."""
        # Use the local foreign key value: reading `self.comment.id` may have to
        # load the related comment from the database just to print its id.
        return f"Reaction {self.emoji} on comment {self.comment_id}"
|
|
|
|
|
|
|
|
|
|
|
2024-05-13 23:31:00 +02:00
|
|
|
class Invitation(BaseModel):
    """User invitation to a document."""

    email = models.EmailField(_("email address"), null=False, blank=False)
    document = models.ForeignKey(
        Document,
        on_delete=models.CASCADE,
        related_name="invitations",
    )
    role = models.CharField(
        max_length=20, choices=RoleChoices.choices, default=RoleChoices.READER
    )
    issuer = models.ForeignKey(
        User,
        on_delete=models.CASCADE,
        related_name="invitations",
        blank=True,
        null=True,
    )

    class Meta:
        db_table = "impress_invitation"
        verbose_name = _("Document invitation")
        verbose_name_plural = _("Document invitations")
        constraints = [
            models.UniqueConstraint(
                fields=["email", "document"], name="email_and_document_unique_together"
            )
        ]

    def __str__(self):
        return f"{self.email} invited to {self.document}"

    def clean(self):
        """
        Validate fields.

        Raises:
            ValidationError: if a user already exists with this email
                (case-insensitive) while duplicate emails are not allowed.
        """
        super().clean()

        # Check if an identity already exists for the provided email. The
        # setting is tested first so that no query is issued when duplicate
        # emails are allowed.
        if (
            not settings.OIDC_ALLOW_DUPLICATE_EMAILS
            and User.objects.filter(email__iexact=self.email).exists()
        ):
            raise ValidationError(
                {"email": [_("This email is already associated to a registered user.")]}
            )

    @property
    def is_expired(self):
        """
        Calculate if invitation is still valid or has expired.

        Returns None when the invitation has no creation date yet.
        """
        if not self.created_at:
            return None

        validity_duration = timedelta(seconds=settings.INVITATION_VALIDITY_DURATION)
        return timezone.now() > (self.created_at + validity_duration)

    def get_abilities(self, user):
        """
        Compute and return abilities for a given user.

        Every action requires the user to be owner or admin of the invitation's
        document. Roles are read from a `user_roles` attribute when the
        instance carries one, and from the document accesses otherwise.
        """
        roles = []

        if user.is_authenticated:
            try:
                roles = self.user_roles or []
            except AttributeError:
                # Building the queryset does not hit the database; it is
                # evaluated when turned into a set below.
                roles = self.document.accesses.filter(
                    models.Q(user=user) | models.Q(team__in=user.teams),
                ).values_list("role", flat=True)

        is_admin_or_owner = bool(
            set(roles).intersection({RoleChoices.OWNER, RoleChoices.ADMIN})
        )

        return {
            "destroy": is_admin_or_owner,
            "update": is_admin_or_owner,
            "partial_update": is_admin_or_owner,
            "retrieve": is_admin_or_owner,
        }
|