This repository has been archived on 2026-03-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
people/src/backend/plugins/la_suite/hooks_utils/communes.py
Quentin BEY 28fdee868d ♻️(plugins) rewrite plugin system as django app
This allows more flexibility around the installed plugins and will
make it possible to add models in plugins if needed.
2025-03-26 19:56:23 +01:00

244 lines
8.3 KiB
Python

"""Organization related plugins."""
import logging
import re
from django.conf import settings
from django.utils.text import slugify
import requests
from requests.adapters import HTTPAdapter, Retry
from mailbox_manager.enums import MailDomainRoleChoices
from mailbox_manager.models import MailDomain, MailDomainAccess
logger = logging.getLogger(__name__)
class ApiCall:
    """Encapsulates a call to an external API.

    An instance is created without arguments and configured by assigning
    its attributes before calling :meth:`execute`:

    - ``inputs``: free-form data carried along with the call (not sent).
    - ``method``: HTTP verb, ``"GET"`` by default.
    - ``base``: base URL of the API.
    - ``url``: path of the endpoint, relative to ``base``.
    - ``params``: payload, sent as JSON for POST/PATCH and as query string
      parameters otherwise.
    - ``headers``: HTTP headers to send.
    - ``response_data``: decoded JSON body, set by :meth:`execute`.
    """

    # Immutable defaults can safely live on the class.
    method: str = "GET"
    base: str = ""
    url: str = ""
    response_data = None

    def __init__(self):
        # Mutable containers are created per instance so that mutating one
        # call's params/headers/inputs can never leak into another call.
        self.inputs: dict = {}
        self.params: dict = {}
        self.headers: dict = {}

    def _build_url(self) -> str:
        """Join ``base`` and ``url`` with exactly one slash between them."""
        return f"{self.base.rstrip('/')}/{self.url.lstrip('/')}"

    def execute(self):
        """Call the specified API endpoint with supplied parameters and record response.

        The response status is not checked here; the body is decoded as JSON
        and stored in ``response_data``, so a non-JSON body makes
        ``response.json()`` raise.
        """
        # Verbs carrying a body send the params as JSON, the others as a
        # query string.
        payload_key = "json" if self.method in ("POST", "PATCH") else "params"
        response = requests.request(
            method=self.method,
            url=self._build_url(),
            headers=self.headers,
            timeout=20,
            **{payload_key: self.params},
        )
        self.response_data = response.json()
        logger.info(
            "API call: %s %s %s %s",
            self.method,
            self.url,
            self.params,
            self.response_data,
        )
class CommuneCreation:
    """
    This plugin handles setup tasks for French communes.

    After an organization is created from a SIRET, it renames the
    organization after the commune, registers a mail domain and provisions
    the matching DNS zone and mail domain through external APIs. After an
    organization access is granted, it mirrors that access on the mail
    domain when one exists.
    """

    _api_url = "https://recherche-entreprises.api.gouv.fr/search?q={siret}"
    # Value of "nature_juridique" that this plugin treats as a commune.
    _commune_legal_nature = "7210"
    _dns_api_base = "https://api.scaleway.com"
    _support_email = "support-regie@numerique.gouv.fr"

    @staticmethod
    def _dns_headers():
        """Return the authentication headers for the DNS provisioning API."""
        return {"X-Auth-Token": settings.DNS_PROVISIONING_API_CREDENTIALS}

    @staticmethod
    def _mail_headers():
        """Return the authentication headers for the mail provisioning API."""
        return {
            "Authorization": f"Basic {settings.MAIL_PROVISIONING_API_CREDENTIALS}"
        }

    def get_organization_name_from_results(self, data, siret):
        """Return the organization name from the results of a SIRET search.

        The first result flagged as a commune and carrying a commune label
        wins; its label is returned title-cased. Returns None (and logs a
        warning) when no such result exists, including when the payload is
        missing the expected keys.
        """
        for result in data.get("results") or []:
            if result.get("nature_juridique") != self._commune_legal_nature:
                continue
            # Tolerate partial payloads instead of raising KeyError.
            label = (result.get("siege") or {}).get("libelle_commune")
            if label:
                return label.title()

        logger.warning("Not a commune: SIRET %s", siret)
        return None

    def dns_call(self, spec):
        """Call to add a DNS record

        Builds (without executing) the PATCH call adding to the commune's
        DNS zone every record listed in ``spec.response_data``. Each item is
        expected to expose "target", "type" and "value" keys, and
        ``spec.inputs["name"]`` must hold the commune name.
        """
        zone_name = self.zone_name(spec.inputs["name"])
        records = [
            {
                "name": item["target"],
                "type": item["type"].upper(),
                "data": item["value"],
                "ttl": 3600,
            }
            for item in spec.response_data
        ]

        add_records = ApiCall()
        add_records.method = "PATCH"
        add_records.base = self._dns_api_base
        add_records.url = f"/domain/v2beta1/dns-zones/{zone_name}/records"
        add_records.params = {"changes": [{"add": {"records": records}}]}
        add_records.headers = self._dns_headers()
        return add_records

    def normalize_name(self, name: str) -> str:
        """Map the name to a standard form"""
        # Apostrophes become hyphens before slugifying so that they act as
        # word separators instead of being dropped.
        return slugify(name.replace("'", "-"))

    def zone_name(self, name: str) -> str:
        """Derive the zone name from the commune name"""
        normalized = self.normalize_name(name)
        return f"{normalized}.{settings.DNS_PROVISIONING_TARGET_ZONE}"

    def complete_commune_creation(self, name: str) -> "list[ApiCall]":
        """Specify the tasks to be completed after a commune is created.

        Returns, in execution order, the calls that create the DNS zone,
        create the mail domain and fetch the mail domain's spec. The calls
        are only built here, not executed. The last one carries the
        normalized commune name in its ``inputs`` for use by ``dns_call``.
        """
        inputs = {"name": self.normalize_name(name)}
        zone_name = self.zone_name(inputs["name"])

        create_zone = ApiCall()
        create_zone.method = "POST"
        create_zone.base = self._dns_api_base
        create_zone.url = "/domain/v2beta1/dns-zones"
        create_zone.params = {
            "project_id": settings.DNS_PROVISIONING_RESOURCE_ID,
            "domain": settings.DNS_PROVISIONING_TARGET_ZONE,
            "subdomain": inputs["name"],
        }
        create_zone.headers = self._dns_headers()

        create_domain = ApiCall()
        create_domain.method = "POST"
        create_domain.base = settings.MAIL_PROVISIONING_API_URL
        create_domain.url = "/domains/"
        create_domain.params = {
            "name": zone_name,
            "delivery": "virtual",
            "features": ["webmail", "mailbox"],
            "context_name": zone_name,
        }
        create_domain.headers = self._mail_headers()

        spec_domain = ApiCall()
        spec_domain.inputs = inputs
        spec_domain.base = settings.MAIL_PROVISIONING_API_URL
        spec_domain.url = f"/domains/{zone_name}/spec"
        spec_domain.headers = self._mail_headers()

        return [create_zone, create_domain, spec_domain]

    def complete_zone_creation(self, spec_call):
        """Specify the tasks to be performed to set up the zone."""
        return self.dns_call(spec_call)

    def run_after_create(self, organization):
        """After creating an organization, update the organization name.

        Looks up the organization's first registration ID (SIRET); when it
        designates a commune, renames the organization, registers the mail
        domain locally and runs the external provisioning calls. Does
        nothing when there is no registration ID, when the lookup fails or
        when the SIRET is not a commune.
        """
        logger.info("In CommuneCreation")

        if not organization.registration_id_list:
            # No registration ID to convert...
            return

        # In the nominal case, there is only one registration ID because
        # the organization has been created from it.
        siret = organization.registration_id_list[0]

        try:
            # The session is closed on exit so pooled connections are
            # released.
            with requests.Session() as session:
                # Retry logic as the API may be rate limited
                retries = Retry(total=5, backoff_factor=0.1, status_forcelist=[429])
                session.mount("https://", HTTPAdapter(max_retries=retries))
                response = session.get(self._api_url.format(siret=siret), timeout=10)
                response.raise_for_status()
                data = response.json()
        except requests.RequestException as exc:
            logger.exception("%s: Unable to fetch organization name from SIRET", exc)
            return

        name = self.get_organization_name_from_results(data, siret)
        # Not a commune ?
        if not name:
            return

        organization.name = name
        organization.save(update_fields=["name", "updated_at"])
        logger.info("Organization %s name updated to %s", organization, name)

        # Look the domain up by name only: the support email is a creation
        # default, not part of the domain's identity.
        MailDomain.objects.get_or_create(
            name=self.zone_name(name),
            defaults={"support_email": self._support_email},
        )

        # Compute and execute the rest of the process
        tasks = self.complete_commune_creation(name)
        for task in tasks:
            task.execute()

        # The DNS records depend on the response of the last call (the mail
        # domain spec), so this call can only be built once it has run.
        self.complete_zone_creation(tasks[-1]).execute()

    def complete_grant_access(self, sub, zone_name):
        """Specify the tasks to be completed after making a user admin

        Returns, in execution order, the calls that create the user on the
        mail provisioning API and allow that user on the domain. The calls
        are only built here, not executed.
        """
        create_user = ApiCall()
        create_user.method = "POST"
        create_user.base = settings.MAIL_PROVISIONING_API_URL
        create_user.url = "/users/"
        create_user.params = {
            "name": sub,
            "password": "no",
            "is_admin": False,
            "perms": [],
        }
        create_user.headers = self._mail_headers()

        grant_access = ApiCall()
        grant_access.method = "POST"
        grant_access.base = settings.MAIL_PROVISIONING_API_URL
        grant_access.url = "/allows/"
        grant_access.params = {
            "user": sub,
            "domain": zone_name,
        }
        grant_access.headers = self._mail_headers()

        return [create_user, grant_access]

    def run_after_grant_access(self, organization_access):
        """After granting an organization access, check for needed domain access grant.

        When a mail domain matching the organization's name exists, the user
        is made owner of that domain and the corresponding provisioning
        calls are executed. Otherwise nothing happens.
        """
        orga = organization_access.organization
        user = organization_access.user
        zone_name = self.zone_name(orga.name)

        try:
            domain = MailDomain.objects.get(name=zone_name)
        except MailDomain.DoesNotExist:
            # No mail domain for this organization: nothing to grant.
            return

        MailDomainAccess.objects.create(
            domain=domain, user=user, role=MailDomainRoleChoices.OWNER
        )

        for task in self.complete_grant_access(user.sub, zone_name):
            task.execute()