This repository has been archived on 2026-03-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
people/src/backend/plugins/organizations.py

273 lines
9.8 KiB
Python
Raw Normal View History

"""Organization related plugins."""
import logging
from django.conf import settings
import requests
from requests.adapters import HTTPAdapter, Retry
from core.plugins.base import BaseOrganizationPlugin
from mailbox_manager.enums import MailDomainRoleChoices
from mailbox_manager.models import MailDomain, MailDomainAccess
logger = logging.getLogger(__name__)
class NameFromSiretOrganizationPlugin(BaseOrganizationPlugin):
    """
    This plugin is used to convert the organization registration ID
    to the proper name. For French organizations the registration ID
    is the SIRET.

    This is a very specific plugin for French organizations and this
    first implementation is very basic. It surely needs to be improved
    later.
    """

    # Search endpoint queried with the SIRET as free-text query.
    _api_url = "https://recherche-entreprises.api.gouv.fr/search?q={siret}"

    @staticmethod
    def get_organization_name_from_results(data, siret):
        """Return the organization name from the results of a SIRET search.

        The establishment whose ``siret`` equals the requested one is looked
        up in every result. Its first store sign (``liste_enseignes``) is
        preferred; otherwise the ``nom_raison_sociale`` of the enclosing
        result is used. The returned name is title-cased.

        Returns ``None`` (and logs a warning) when no name can be found,
        including when the payload lacks the expected keys.
        """
        for result in data.get("results") or []:
            for organization in result.get("matching_etablissements") or []:
                if organization.get("siret") != siret:
                    continue

                store_signs = organization.get("liste_enseignes") or []
                if store_signs:
                    return store_signs[0].title()
                if name := result.get("nom_raison_sociale"):
                    return name.title()

        logger.warning("No organization name found for SIRET %s", siret)
        return None

    def run_after_create(self, organization):
        """After creating an organization, update the organization name.

        Nothing happens when the organization has no registration ID, when
        its name is not one of its registration IDs (it was presumably set
        on purpose), when the lookup request fails, or when the lookup
        yields no name.
        """
        if not organization.registration_id_list:
            # No registration ID to convert...
            return

        if organization.name not in organization.registration_id_list:
            # The name has probably already been customized
            return

        # In the nominal case, there is only one registration ID because
        # the organization has been created from it.
        siret = organization.registration_id_list[0]

        try:
            # The context manager releases the pooled connections once the
            # lookup is done.
            with requests.Session() as session:
                # Retry logic as the API may be rate limited
                retries = Retry(total=5, backoff_factor=0.1, status_forcelist=[429])
                session.mount("https://", HTTPAdapter(max_retries=retries))

                response = session.get(self._api_url.format(siret=siret), timeout=10)
                response.raise_for_status()
                data = response.json()
        except requests.RequestException as exc:
            logger.exception("%s: Unable to fetch organization name from SIRET", exc)
            return

        name = self.get_organization_name_from_results(data, siret)
        if not name:
            return

        organization.name = name
        organization.save(update_fields=["name", "updated_at"])
        logger.info("Organization %s name updated to %s", organization, name)

    def run_after_grant_access(self, organization_access):
        """After granting an organization access, we don't need to do anything."""
class ApiCall:
    """Encapsulates a call to an external API.

    Attributes:
        inputs: free-form values kept alongside the call so that follow-up
            calls can be derived from it.
        method: HTTP verb used for the request.
        base: root of the API (scheme, host and optional path prefix).
        url: endpoint path, appended to ``base``.
        params: sent as the JSON body for POST/PATCH, as the query string
            for any other method.
        headers: HTTP headers sent with the request.
        response_data: decoded JSON body of the last response; ``None``
            until ``execute`` has run.
    """

    method: str = "GET"
    base: str = ""
    url: str = ""
    response_data = None

    def __init__(self):
        # Dicts are created per instance: class-level ``{}`` defaults would
        # be one shared object, so an in-place mutation on one call would
        # leak into every other call.
        self.inputs: dict = {}
        self.params: dict = {}
        self.headers: dict = {}

    def _endpoint(self) -> str:
        """Join ``base`` and ``url`` with exactly one slash between them."""
        return f"{self.base.rstrip('/')}/{self.url.lstrip('/')}"

    def execute(self):
        """Call the specified API endpoint with supplied parameters and record response.

        The decoded JSON body is stored in ``response_data``. Exceptions
        raised by ``requests`` (connection errors, timeouts, undecodable
        body) are not caught here.
        """
        # Methods carrying a body send the parameters as JSON, the others
        # as query string.
        payload_kwarg = "json" if self.method in ("POST", "PATCH") else "params"

        # NOTE(review): the HTTP status is not checked; an error response
        # with a JSON body is recorded like a successful one.
        response = requests.request(
            method=self.method,
            url=self._endpoint(),
            headers=self.headers,
            timeout=5,
            **{payload_kwarg: self.params},
        )
        self.response_data = response.json()

        logger.info(
            "API call: %s %s %s %s",
            self.method,
            self.url,
            self.params,
            self.response_data,
        )
class CommuneCreation(BaseOrganizationPlugin):
    """
    This plugin handles setup tasks for French communes.

    After an organization is created from the SIRET of a commune, the
    organization is renamed after the commune, a ``MailDomain`` named
    ``<commune>.collectivite.fr`` is registered, and the DNS zone and mail
    domain are requested from the external provisioning APIs. After access
    to an organization is granted, the user is made owner of the matching
    mail domain when one exists.
    """

    # Search endpoint queried with the SIRET as free-text query.
    _api_url = "https://recherche-entreprises.api.gouv.fr/search?q={siret}"

    @staticmethod
    def _domain_name(name):
        """Return the ``collectivite.fr`` domain derived from a commune name."""
        # NOTE(review): the name is only lower-cased; commune names holding
        # spaces, apostrophes or accents would not give a valid DNS label.
        # Confirm whether a slug step is needed.
        return f"{name.lower()}.collectivite.fr"

    @staticmethod
    def _mail_api_headers():
        """Return the authentication headers for the mail provisioning API."""
        return {
            "Authorization": f"Basic {settings.MAIL_PROVISIONING_API_CREDENTIALS}"
        }

    def get_organization_name_from_results(self, data, siret):
        """Return the organization name from the results of a SIRET search.

        The first result whose ``nature_juridique`` is ``"7210"`` is treated
        as a commune and the title-cased ``libelle_commune`` of its ``siege``
        is returned. Returns ``None`` (and logs a warning) when no result
        qualifies, including when the payload has no ``results`` key.
        """
        for result in data.get("results") or []:
            if result.get("nature_juridique") == "7210":
                return result["siege"]["libelle_commune"].title()

        logger.warning("Not a commune: SIRET %s", siret)
        return None

    def dns_call(self, spec):
        """Call to add a DNS record.

        Builds (without executing) the PATCH call that adds to the commune's
        DNS zone every record listed in ``spec.response_data``. Each item of
        that response is expected to expose ``target``, ``type`` and
        ``value``; the zone is derived from ``spec.inputs["name"]``.
        """
        records = [
            {
                "name": item["target"],
                "type": item["type"].upper(),
                "data": item["value"],
                "ttl": 3600,
            }
            for item in spec.response_data
        ]

        result = ApiCall()
        result.method = "PATCH"
        result.base = "https://api.scaleway.com"
        result.url = (
            f"/domain/v2beta1/dns-zones/{spec.inputs['name']}.collectivite.fr/records"
        )
        result.params = {"changes": [{"add": {"records": records}}]}
        result.headers = {"X-Auth-Token": settings.DNS_PROVISIONING_API_CREDENTIALS}
        return result

    def complete_commune_creation(self, name: str) -> "list[ApiCall]":
        """Specify the tasks to be completed after a commune is created.

        Returns three unexecuted calls, in the order they must run: create
        the DNS zone, create the mail domain, then fetch the mail domain
        spec. The last one carries ``inputs`` so that ``dns_call`` can build
        the DNS records from its response.
        """
        inputs = {"name": name.lower()}
        domain = self._domain_name(name)

        create_zone = ApiCall()
        create_zone.method = "POST"
        create_zone.base = "https://api.scaleway.com"
        create_zone.url = "/domain/v2beta1/dns-zones"
        create_zone.params = {
            "project_id": settings.DNS_PROVISIONING_RESOURCE_ID,
            "domain": "collectivite.fr",
            "subdomain": inputs["name"],
        }
        create_zone.headers = {
            "X-Auth-Token": settings.DNS_PROVISIONING_API_CREDENTIALS
        }

        create_domain = ApiCall()
        create_domain.method = "POST"
        create_domain.base = settings.MAIL_PROVISIONING_API_URL
        create_domain.url = "/domains"
        create_domain.params = {
            "name": domain,
            "delivery": "virtual",
            "features": ["webmail", "mailbox"],
            "context_name": domain,
        }
        create_domain.headers = self._mail_api_headers()

        spec_domain = ApiCall()
        spec_domain.inputs = inputs
        spec_domain.base = settings.MAIL_PROVISIONING_API_URL
        spec_domain.url = f"/domains/{domain}/spec"
        spec_domain.headers = self._mail_api_headers()

        return [create_zone, create_domain, spec_domain]

    def complete_zone_creation(self, spec_call):
        """Specify the tasks to be performed to set up the zone."""
        return self.dns_call(spec_call)

    def run_after_create(self, organization):
        """After creating an organization, update the organization name.

        When the first registration ID resolves to a commune, the
        organization is renamed, the mail domain is registered and the
        provisioning calls are executed. Nothing happens when there is no
        registration ID, when the lookup request fails, or when the
        organization is not a commune.
        """
        logger.info("In CommuneCreation")
        if not organization.registration_id_list:
            # No registration ID to convert...
            return

        # In the nominal case, there is only one registration ID because
        # the organization has been created from it.
        siret = organization.registration_id_list[0]

        try:
            # The context manager releases the pooled connections once the
            # lookup is done.
            with requests.Session() as session:
                # Retry logic as the API may be rate limited
                retries = Retry(total=5, backoff_factor=0.1, status_forcelist=[429])
                session.mount("https://", HTTPAdapter(max_retries=retries))

                response = session.get(self._api_url.format(siret=siret), timeout=10)
                response.raise_for_status()
                data = response.json()
        except requests.RequestException as exc:
            logger.exception("%s: Unable to fetch organization name from SIRET", exc)
            return

        name = self.get_organization_name_from_results(data, siret)
        # Not a commune ?
        if not name:
            return

        organization.name = name
        organization.save(update_fields=["name", "updated_at"])
        logger.info("Organization %s name updated to %s", organization, name)

        MailDomain.objects.get_or_create(name=self._domain_name(name))

        # Compute and execute the rest of the process
        tasks = self.complete_commune_creation(name)
        for task in tasks:
            task.execute()

        # The DNS records can only be computed once the spec call (last of
        # the tasks) has recorded its response.
        last_task = self.complete_zone_creation(tasks[-1])
        last_task.execute()

    def run_after_grant_access(self, organization_access):
        """After granting an organization access, check for needed domain access grant.

        When a mail domain matching the organization name exists, the user
        is made owner of it and is allowed on the domain through the mail
        provisioning API. Without such a domain nothing is done.
        """
        orga = organization_access.organization
        user = organization_access.user
        zone_name = self._domain_name(orga.name)

        try:
            domain = MailDomain.objects.get(name=zone_name)
        except MailDomain.DoesNotExist:
            # No mail domain was set up for this organization
            return

        MailDomainAccess.objects.create(
            domain=domain, user=user, role=MailDomainRoleChoices.OWNER
        )

        grant_access = ApiCall()
        grant_access.method = "POST"
        grant_access.base = settings.MAIL_PROVISIONING_API_URL
        grant_access.url = "/allows"
        grant_access.params = {
            "user": user.sub,
            "domain": zone_name,
        }
        grant_access.headers = self._mail_api_headers()
        grant_access.execute()