✨(models/api) allow inviting external users to a document by their email
We want to be able to share a document with a person even if this person does not have an account in impress yet. This code is ported from https://github.com/numerique-gouv/people.
This commit is contained in:
committed by
Samuel Paccoud
parent
125284456f
commit
515b686795
@@ -3,21 +3,27 @@ Declare and configure the models for the impress core application
|
||||
"""
|
||||
import hashlib
|
||||
import json
|
||||
import smtplib
|
||||
import textwrap
|
||||
import uuid
|
||||
from datetime import timedelta
|
||||
from logging import getLogger
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.auth import models as auth_models
|
||||
from django.contrib.auth.base_user import AbstractBaseUser
|
||||
from django.core import mail, validators
|
||||
from django.contrib.sites.models import Site
|
||||
from django.core import exceptions, mail, validators
|
||||
from django.core.files.base import ContentFile
|
||||
from django.core.files.storage import default_storage
|
||||
from django.db import models
|
||||
from django.template.base import Template as DjangoTemplate
|
||||
from django.template.context import Context
|
||||
from django.template.loader import render_to_string
|
||||
from django.utils import html, timezone
|
||||
from django.utils.functional import lazy
|
||||
from django.utils.html import format_html
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from django.utils.translation import override
|
||||
|
||||
import frontmatter
|
||||
import markdown
|
||||
@@ -26,6 +32,8 @@ from timezone_field import TimeZoneField
|
||||
from weasyprint import CSS, HTML
|
||||
from weasyprint.text.fonts import FontConfiguration
|
||||
|
||||
logger = getLogger(__name__)
|
||||
|
||||
|
||||
def get_resource_roles(resource, user):
|
||||
"""Compute the roles a user has on a resource."""
|
||||
@@ -164,6 +172,42 @@ class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
|
||||
def __str__(self):
|
||||
return self.email or self.admin_email or str(self.id)
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
"""
|
||||
If it's a new user, give its user access to the documents to which s.he was invited.
|
||||
"""
|
||||
is_adding = self._state.adding
|
||||
super().save(*args, **kwargs)
|
||||
|
||||
if is_adding:
|
||||
self._convert_valid_invitations()
|
||||
|
||||
def _convert_valid_invitations(self):
|
||||
"""
|
||||
Convert valid invitations to document accesses.
|
||||
Expired invitations are ignored.
|
||||
"""
|
||||
valid_invitations = Invitation.objects.filter(
|
||||
email=self.email,
|
||||
created_at__gte=(
|
||||
timezone.now()
|
||||
- timedelta(seconds=settings.INVITATION_VALIDITY_DURATION)
|
||||
),
|
||||
).select_related("document")
|
||||
|
||||
if not valid_invitations.exists():
|
||||
return
|
||||
|
||||
DocumentAccess.objects.bulk_create(
|
||||
[
|
||||
DocumentAccess(
|
||||
user=self, document=invitation.document, role=invitation.role
|
||||
)
|
||||
for invitation in valid_invitations
|
||||
]
|
||||
)
|
||||
valid_invitations.delete()
|
||||
|
||||
def email_user(self, subject, message, from_email=None, **kwargs):
|
||||
"""Email this user."""
|
||||
if not self.email:
|
||||
@@ -523,7 +567,7 @@ class Template(BaseModel):
|
||||
|
||||
document_html = HTML(
|
||||
string=DjangoTemplate(self.code).render(
|
||||
Context({"body": format_html(body_html), **metadata})
|
||||
Context({"body": html.format_html(body_html), **metadata})
|
||||
)
|
||||
)
|
||||
css = CSS(
|
||||
@@ -576,3 +620,110 @@ class TemplateAccess(BaseAccess):
|
||||
Compute and return abilities for a given user on the template access.
|
||||
"""
|
||||
return self._get_abilities(self.template, user)
|
||||
|
||||
|
||||
class Invitation(BaseModel):
|
||||
"""User invitation to a document."""
|
||||
|
||||
email = models.EmailField(_("email address"), null=False, blank=False)
|
||||
document = models.ForeignKey(
|
||||
Document,
|
||||
on_delete=models.CASCADE,
|
||||
related_name="invitations",
|
||||
)
|
||||
role = models.CharField(
|
||||
max_length=20, choices=RoleChoices.choices, default=RoleChoices.MEMBER
|
||||
)
|
||||
issuer = models.ForeignKey(
|
||||
User,
|
||||
on_delete=models.CASCADE,
|
||||
related_name="invitations",
|
||||
)
|
||||
|
||||
class Meta:
|
||||
db_table = "impress_invitation"
|
||||
verbose_name = _("Document invitation")
|
||||
verbose_name_plural = _("Document invitations")
|
||||
constraints = [
|
||||
models.UniqueConstraint(
|
||||
fields=["email", "document"], name="email_and_document_unique_together"
|
||||
)
|
||||
]
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.email} invited to {self.document}"
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
"""Make invitations read-only."""
|
||||
if self.created_at:
|
||||
raise exceptions.PermissionDenied()
|
||||
|
||||
super().save(*args, **kwargs)
|
||||
self.email_invitation()
|
||||
|
||||
def clean(self):
|
||||
"""Validate fields."""
|
||||
super().clean()
|
||||
|
||||
# Check if an identity already exists for the provided email
|
||||
if User.objects.filter(email=self.email).exists():
|
||||
raise exceptions.ValidationError(
|
||||
{"email": _("This email is already associated to a registered user.")}
|
||||
)
|
||||
|
||||
@property
|
||||
def is_expired(self):
|
||||
"""Calculate if invitation is still valid or has expired."""
|
||||
if not self.created_at:
|
||||
return None
|
||||
|
||||
validity_duration = timedelta(seconds=settings.INVITATION_VALIDITY_DURATION)
|
||||
return timezone.now() > (self.created_at + validity_duration)
|
||||
|
||||
def get_abilities(self, user):
|
||||
"""Compute and return abilities for a given user."""
|
||||
can_delete = False
|
||||
roles = []
|
||||
|
||||
if user.is_authenticated:
|
||||
teams = user.get_teams()
|
||||
try:
|
||||
roles = self.user_roles or []
|
||||
except AttributeError:
|
||||
try:
|
||||
roles = self.document.accesses.filter(
|
||||
models.Q(user=user) | models.Q(team__in=teams),
|
||||
).values_list("role", flat=True)
|
||||
except (self._meta.model.DoesNotExist, IndexError):
|
||||
roles = []
|
||||
|
||||
can_delete = bool(
|
||||
set(roles).intersection({RoleChoices.OWNER, RoleChoices.ADMIN})
|
||||
)
|
||||
|
||||
return {
|
||||
"destroy": can_delete,
|
||||
"update": False,
|
||||
"partial_update": False,
|
||||
"retrieve": bool(roles),
|
||||
}
|
||||
|
||||
def email_invitation(self):
|
||||
"""Email invitation to the user."""
|
||||
try:
|
||||
with override(self.issuer.language):
|
||||
title = _("Invitation to join Impress!")
|
||||
template_vars = {"title": title, "site": Site.objects.get_current()}
|
||||
msg_html = render_to_string("mail/html/invitation.html", template_vars)
|
||||
msg_plain = render_to_string("mail/text/invitation.txt", template_vars)
|
||||
mail.send_mail(
|
||||
title,
|
||||
msg_plain,
|
||||
settings.EMAIL_FROM,
|
||||
[self.email],
|
||||
html_message=msg_html,
|
||||
fail_silently=False,
|
||||
)
|
||||
|
||||
except smtplib.SMTPException as exception:
|
||||
logger.error("invitation to %s was not sent: %s", self.email, exception)
|
||||
|
||||
Reference in New Issue
Block a user