♻️(backend) refactor post hackathon to a first working version
This project was copied and hacked to make a POC in a 2-day hackathon. We need to clean and refactor things in order to get a first version of the product we want.
This commit is contained in:
committed by
Samuel Paccoud
parent
0a3e26486e
commit
0f9327a1de
@@ -1,32 +1,26 @@
|
||||
"""
|
||||
Declare and configure the models for the publish core application
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
import textwrap
|
||||
import uuid
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.auth import models as auth_models
|
||||
from django.contrib.auth.base_user import AbstractBaseUser
|
||||
from django.core import exceptions, mail, validators
|
||||
from django.core import mail, validators
|
||||
from django.db import models
|
||||
from django.template.base import Template as DjangoTemplate
|
||||
from django.template.context import Context
|
||||
from django.utils.functional import lazy
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from django.template.base import Template as DjangoTemplate
|
||||
from django.template.context import Context
|
||||
from django.template.engine import Engine
|
||||
|
||||
import frontmatter
|
||||
import markdown
|
||||
from weasyprint import CSS, HTML
|
||||
from weasyprint.text.fonts import FontConfiguration
|
||||
|
||||
import jsonschema
|
||||
from rest_framework_simplejwt.exceptions import InvalidToken
|
||||
from rest_framework_simplejwt.settings import api_settings
|
||||
from timezone_field import TimeZoneField
|
||||
from weasyprint import CSS, HTML
|
||||
from weasyprint.text.fonts import FontConfiguration
|
||||
|
||||
|
||||
class RoleChoices(models.TextChoices):
|
||||
@@ -77,7 +71,33 @@ class BaseModel(models.Model):
|
||||
class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
|
||||
"""User model to work with OIDC only authentication."""
|
||||
|
||||
email = models.EmailField(_("email address"), unique=True)
|
||||
sub_validator = validators.RegexValidator(
|
||||
regex=r"^[\w.@+-]+\Z",
|
||||
message=_(
|
||||
"Enter a valid sub. This value may contain only letters, "
|
||||
"numbers, and @/./+/-/_ characters."
|
||||
),
|
||||
)
|
||||
|
||||
sub = models.CharField(
|
||||
_("sub"),
|
||||
help_text=_(
|
||||
"Required. 255 characters or fewer. Letters, numbers, and @/./+/-/_ characters only."
|
||||
),
|
||||
max_length=255,
|
||||
unique=True,
|
||||
validators=[sub_validator],
|
||||
blank=True,
|
||||
null=True,
|
||||
)
|
||||
email = models.EmailField(_("identity email address"), blank=True, null=True)
|
||||
|
||||
# Unlike the "email" field which stores the email coming from the OIDC token, this field
|
||||
# stores the email used by staff users to login to the admin site
|
||||
admin_email = models.EmailField(
|
||||
_("admin email address"), unique=True, blank=True, null=True
|
||||
)
|
||||
|
||||
language = models.CharField(
|
||||
max_length=10,
|
||||
choices=lazy(lambda: settings.LANGUAGES, tuple)(),
|
||||
@@ -112,7 +132,7 @@ class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
|
||||
|
||||
objects = auth_models.UserManager()
|
||||
|
||||
USERNAME_FIELD = "email"
|
||||
USERNAME_FIELD = "admin_email"
|
||||
REQUIRED_FIELDS = []
|
||||
|
||||
class Meta:
|
||||
@@ -120,80 +140,20 @@ class User(AbstractBaseUser, BaseModel, auth_models.PermissionsMixin):
|
||||
verbose_name = _("user")
|
||||
verbose_name_plural = _("users")
|
||||
|
||||
|
||||
class Identity(BaseModel):
|
||||
"""User identity"""
|
||||
|
||||
sub_validator = validators.RegexValidator(
|
||||
regex=r"^[\w.@+-]+\Z",
|
||||
message=_(
|
||||
"Enter a valid sub. This value may contain only letters, "
|
||||
"numbers, and @/./+/-/_ characters."
|
||||
),
|
||||
)
|
||||
|
||||
user = models.ForeignKey(User, related_name="identities", on_delete=models.CASCADE)
|
||||
sub = models.CharField(
|
||||
_("sub"),
|
||||
help_text=_(
|
||||
"Required. 255 characters or fewer. Letters, numbers, and @/./+/-/_ characters only."
|
||||
),
|
||||
max_length=255,
|
||||
unique=True,
|
||||
validators=[sub_validator],
|
||||
blank=True,
|
||||
null=True,
|
||||
)
|
||||
email = models.EmailField(_("email address"))
|
||||
is_main = models.BooleanField(
|
||||
_("main"),
|
||||
default=False,
|
||||
help_text=_("Designates whether the email is the main one."),
|
||||
)
|
||||
|
||||
class Meta:
|
||||
db_table = "publish_identity"
|
||||
ordering = ("-is_main", "email")
|
||||
verbose_name = _("identity")
|
||||
verbose_name_plural = _("identities")
|
||||
constraints = [
|
||||
# Uniqueness
|
||||
models.UniqueConstraint(
|
||||
fields=["user", "email"],
|
||||
name="unique_user_email",
|
||||
violation_error_message=_(
|
||||
"This email address is already declared for this user."
|
||||
),
|
||||
),
|
||||
]
|
||||
|
||||
def __str__(self):
|
||||
main_str = "[main]" if self.is_main else ""
|
||||
return f"{self.email:s}{main_str:s}"
|
||||
return self.email or self.admin_email or str(self.id)
|
||||
|
||||
def clean(self):
|
||||
"""Normalize the email field and clean the 'is_main' field."""
|
||||
self.email = User.objects.normalize_email(self.email)
|
||||
if not self.user.identities.exclude(pk=self.pk).filter(is_main=True).exists():
|
||||
if not self.created_at:
|
||||
self.is_main = True
|
||||
elif not self.is_main:
|
||||
raise exceptions.ValidationError(
|
||||
{"is_main": "A user should have one and only one main identity."}
|
||||
)
|
||||
super().clean()
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
"""Ensure users always have one and only one main identity."""
|
||||
super().save(*args, **kwargs)
|
||||
if self.is_main is True:
|
||||
self.user.identities.exclude(id=self.id).update(is_main=False)
|
||||
def email_user(self, subject, message, from_email=None, **kwargs):
|
||||
"""Email this user."""
|
||||
if not self.email:
|
||||
raise ValueError("User has no email address.")
|
||||
mail.send_mail(subject, message, from_email, [self.email], **kwargs)
|
||||
|
||||
|
||||
class Team(BaseModel):
|
||||
"""Team used for role based access control when matched with teams in OIDC tokens."""
|
||||
"""Team used for role based access control when matched with templates in OIDC tokens."""
|
||||
|
||||
name = models.CharField(max_length=100)
|
||||
name = models.CharField(max_length=100, unique=True)
|
||||
|
||||
class Meta:
|
||||
db_table = "publish_role"
|
||||
@@ -227,11 +187,6 @@ class Template(BaseModel):
|
||||
def __str__(self):
|
||||
return self.title
|
||||
|
||||
|
||||
if not self.body:
|
||||
return ""
|
||||
return markdown.markdown(textwrap.dedent(self.body))
|
||||
|
||||
def generate_document(self, body):
|
||||
"""
|
||||
Generate and return a PDF document for this template around the
|
||||
@@ -240,23 +195,27 @@ class Template(BaseModel):
|
||||
document = frontmatter.loads(body)
|
||||
metadata = document.metadata
|
||||
markdown_body = document.content.strip()
|
||||
body_html = markdown.markdown(textwrap.dedent(markdown_body)) if markdown_body else ""
|
||||
body_html = (
|
||||
markdown.markdown(textwrap.dedent(markdown_body)) if markdown_body else ""
|
||||
)
|
||||
|
||||
document_html = HTML(string=DjangoTemplate(self.code).render(Context({"body": body_html, **metadata})))
|
||||
document_html = HTML(
|
||||
string=DjangoTemplate(self.code).render(
|
||||
Context({"body": body_html, **metadata})
|
||||
)
|
||||
)
|
||||
css = CSS(
|
||||
string=self.css,
|
||||
font_config=FontConfiguration(),
|
||||
)
|
||||
return document_html.write_pdf(stylesheets=[css], zoom=1)
|
||||
|
||||
|
||||
def get_abilities(self, user):
|
||||
"""
|
||||
Compute and return abilities for a given user on the template.
|
||||
"""
|
||||
is_owner_or_admin = False
|
||||
# Compute user role
|
||||
role = None
|
||||
|
||||
if user.is_authenticated:
|
||||
try:
|
||||
role = self.user_role
|
||||
@@ -266,19 +225,20 @@ class Template(BaseModel):
|
||||
except (TemplateAccess.DoesNotExist, IndexError):
|
||||
role = None
|
||||
|
||||
is_owner_or_admin = role in [RoleChoices.OWNER, RoleChoices.ADMIN]
|
||||
is_owner_or_admin = role in [RoleChoices.OWNER, RoleChoices.ADMIN]
|
||||
can_get = self.is_public or role is not None
|
||||
|
||||
return {
|
||||
"get": True,
|
||||
"patch": is_owner_or_admin,
|
||||
"put": is_owner_or_admin,
|
||||
"delete": role == RoleChoices.OWNER,
|
||||
"destroy": role == RoleChoices.OWNER,
|
||||
"generate_document": can_get,
|
||||
"manage_accesses": is_owner_or_admin,
|
||||
"update": is_owner_or_admin,
|
||||
"retrieve": can_get,
|
||||
}
|
||||
|
||||
|
||||
class TemplateAccess(BaseModel):
|
||||
"""Link table between templates and contacts."""
|
||||
"""Relation model to give access to a template for a user or a team with a role."""
|
||||
|
||||
template = models.ForeignKey(
|
||||
Template,
|
||||
@@ -337,16 +297,17 @@ class TemplateAccess(BaseModel):
|
||||
except AttributeError:
|
||||
try:
|
||||
role = self._meta.model.objects.filter(
|
||||
template=self.template_id, user=user
|
||||
template=self.template_id,
|
||||
user=user,
|
||||
).values("role")[0]["role"]
|
||||
except (self._meta.model.DoesNotExist, IndexError):
|
||||
role = None
|
||||
|
||||
is_template_owner_or_admin = role in [RoleChoices.OWNER, RoleChoices.ADMIN]
|
||||
is_template_owner_or_admin = role in [RoleChoices.OWNER, RoleChoices.ADMIN]
|
||||
|
||||
if self.role == RoleChoices.OWNER:
|
||||
can_delete = (
|
||||
user.id == self.user_id
|
||||
role == RoleChoices.OWNER
|
||||
and self.template.accesses.filter(role=RoleChoices.OWNER).count() > 1
|
||||
)
|
||||
set_role_to = [RoleChoices.ADMIN, RoleChoices.MEMBER] if can_delete else []
|
||||
@@ -365,10 +326,9 @@ class TemplateAccess(BaseModel):
|
||||
pass
|
||||
|
||||
return {
|
||||
"delete": can_delete,
|
||||
"get": bool(role),
|
||||
"patch": bool(set_role_to),
|
||||
"put": bool(set_role_to),
|
||||
"destroy": can_delete,
|
||||
"update": bool(set_role_to),
|
||||
"retrieve": bool(role),
|
||||
"set_role_to": set_role_to,
|
||||
}
|
||||
|
||||
@@ -390,9 +350,8 @@ def oidc_user_getter(validated_token):
|
||||
) from exc
|
||||
|
||||
try:
|
||||
user = User.objects.get(identities__sub=user_id)
|
||||
user = User.objects.get(sub=user_id)
|
||||
except User.DoesNotExist:
|
||||
user = User.objects.create()
|
||||
Identities.objects.create(user=user, sub=user_id, email=validated_token["email"])
|
||||
user = User.objects.create(sub=user_id, email=validated_token.get("email"))
|
||||
|
||||
return user
|
||||
|
||||
Reference in New Issue
Block a user