(organization) add metadata field

This allows to store custom values which can be reused along the
organization lifetime.
This commit is contained in:
Quentin BEY
2025-03-10 15:59:01 +01:00
committed by BEY Quentin
parent 3aaddc0493
commit 7ce5b28af4
8 changed files with 228 additions and 13 deletions

View File

@@ -10,6 +10,7 @@ and this project adheres to
### Added ### Added
- ✨(organization) add metadata field #790
- ⬆️(nginx) bump nginx-unprivileged to 1.27 #797 - ⬆️(nginx) bump nginx-unprivileged to 1.27 #797
- ✨(teams) allow broadly available teams #796 - ✨(teams) allow broadly available teams #796
- ✨(teams) update and enhance team invitation email - ✨(teams) update and enhance team invitation email

View File

@@ -0,0 +1,15 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"title": "Organization Metadata",
"properties": {
"is_public_service": {
"type": "boolean",
"title": "Is public service"
},
"is_commune": {
"type": "boolean",
"title": "Is commune"
}
}
}

View File

@@ -0,0 +1,18 @@
# Generated by Django 5.1.7 on 2025-03-10 14:08
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('core', '0011_team_is_visible_all_services'),
]
operations = [
migrations.AddField(
model_name='organization',
name='metadata',
field=models.JSONField(blank=True, default=dict, help_text='A JSON object containing the organization metadata', verbose_name='metadata'),
),
]

View File

@@ -9,8 +9,9 @@ import smtplib
import uuid import uuid
from contextlib import suppress from contextlib import suppress
from datetime import timedelta from datetime import timedelta
from functools import lru_cache
from logging import getLogger from logging import getLogger
from typing import Tuple from typing import Optional, Tuple
from django.conf import settings from django.conf import settings
from django.contrib.auth import models as auth_models from django.contrib.auth import models as auth_models
@@ -40,11 +41,37 @@ from core.validators import get_field_validators_from_setting
logger = getLogger(__name__) logger = getLogger(__name__)
current_dir = os.path.dirname(os.path.abspath(__file__)) current_dir = os.path.dirname(os.path.abspath(__file__))
contact_schema_path = os.path.join(current_dir, "jsonschema", "contact_data.json") contact_schema_path = os.path.join(current_dir, "jsonschema", "contact_data.json")
with open(contact_schema_path, "r", encoding="utf-8") as contact_schema_file: with open(contact_schema_path, "r", encoding="utf-8") as contact_schema_file:
contact_schema = json.load(contact_schema_file) contact_schema = json.load(contact_schema_file)
@lru_cache(maxsize=None)
def get_organization_metadata_schema() -> Optional[dict]:
"""Load the organization metadata schema from the settings."""
if not settings.ORGANIZATION_METADATA_SCHEMA:
logger.info("No organization metadata schema specified")
return None
organization_metadata_schema_path = os.path.join(
current_dir,
"jsonschema",
settings.ORGANIZATION_METADATA_SCHEMA,
)
with open(
organization_metadata_schema_path,
"r",
encoding="utf-8",
) as organization_metadata_schema_file:
organization_metadata_schema = json.load(organization_metadata_schema_file)
logger.info(
"Loaded organization metadata schema from %s", organization_metadata_schema_path
)
return organization_metadata_schema
class RoleChoices(models.TextChoices): # pylint: disable=too-many-ancestors class RoleChoices(models.TextChoices): # pylint: disable=too-many-ancestors
"""Defines the possible roles a user can have in a team.""" """Defines the possible roles a user can have in a team."""
@@ -357,6 +384,13 @@ class Organization(BaseModel):
# list overlap validation is done in the validate_unique method # list overlap validation is done in the validate_unique method
) )
metadata = models.JSONField(
_("metadata"),
help_text=_("A JSON object containing the organization metadata"),
blank=True,
default=dict,
)
service_providers = models.ManyToManyField( service_providers = models.ManyToManyField(
ServiceProvider, ServiceProvider,
related_name="organizations", related_name="organizations",
@@ -387,6 +421,22 @@ class Organization(BaseModel):
def __str__(self): def __str__(self):
return f"{self.name} (# {self.pk})" return f"{self.name} (# {self.pk})"
def clean(self):
"""Validate fields."""
super().clean()
organization_metadata_schema = get_organization_metadata_schema()
if not organization_metadata_schema:
return
try:
jsonschema.validate(self.metadata, organization_metadata_schema)
except jsonschema.ValidationError as e:
# Specify the property in the data in which the error occurred
field_path = ".".join(map(str, e.path))
error_message = f"Validation error in '{field_path:s}': {e.message}"
raise exceptions.ValidationError({"metadata": [error_message]}) from e
def validate_unique(self, exclude=None): def validate_unique(self, exclude=None):
""" """
Validate Registration/Domain values in an array field are unique Validate Registration/Domain values in an array field are unique

View File

@@ -125,3 +125,77 @@ def test_models_organization_registration_id_validators():
name="hi", name="hi",
registration_id_list=["a12345678912345"], registration_id_list=["a12345678912345"],
) )
def test_models_organization_metadata_schema_valid(settings):
"""When a schema is provided, valid metadata should pass validation."""
settings.ORGANIZATION_METADATA_SCHEMA = "fr/organization_metadata.json"
# Clear the cache to reload the schema
models.get_organization_metadata_schema.cache_clear()
organization = models.Organization(
name="Valid Metadata Org",
registration_id_list=["12345678901234"],
metadata={"is_public_service": True, "is_commune": False},
)
# This should not raise any validation errors
organization.full_clean()
organization.save()
# Verify the metadata was saved correctly
org = models.Organization.objects.get(pk=organization.pk)
assert org.metadata["is_public_service"] is True
assert org.metadata["is_commune"] is False
settings.ORGANIZATION_METADATA_SCHEMA = None
# Clear the cache to reload the schema
models.get_organization_metadata_schema.cache_clear()
def test_models_organization_metadata_schema_invalid(settings):
"""When a schema is provided, invalid metadata should fail validation."""
settings.ORGANIZATION_METADATA_SCHEMA = "fr/organization_metadata.json"
# Clear the cache to reload the schema
models.get_organization_metadata_schema.cache_clear()
# Integer instead of boolean for is_public_service
organization = models.Organization(
name="Invalid Metadata Org",
registration_id_list=["12345678901234"],
metadata={"is_public_service": 1, "is_commune": False},
)
with pytest.raises(ValidationError) as excinfo:
organization.full_clean()
assert "metadata" in str(excinfo.value)
assert "is_public_service" in str(excinfo.value)
assert "is_commune" not in str(excinfo.value)
settings.ORGANIZATION_METADATA_SCHEMA = None
# Clear the cache to reload the schema
models.get_organization_metadata_schema.cache_clear()
def test_models_organization_no_metadata_schema(settings):
"""When no schema is provided, any metadata should be allowed."""
settings.ORGANIZATION_METADATA_SCHEMA = None
# Clear the cache to reload the schema
models.get_organization_metadata_schema.cache_clear()
# Random metadata that wouldn't match the schema
organization = models.Organization(
name="No Schema Org",
registration_id_list=["12345678901234"],
metadata={"random_field": "anything", "numeric_value": 123},
)
# This should not raise any validation errors
organization.full_clean()
organization.save()
# Verify the metadata was saved correctly
org = models.Organization.objects.get(pk=organization.pk)
assert org.metadata["random_field"] == "anything"
assert org.metadata["numeric_value"] == 123

View File

@@ -550,6 +550,11 @@ class Base(Configuration):
environ_name="ORGANIZATION_PLUGINS", environ_name="ORGANIZATION_PLUGINS",
environ_prefix=None, environ_prefix=None,
) )
ORGANIZATION_METADATA_SCHEMA = values.Value(
default=None,
environ_name="ORGANIZATION_METADATA_SCHEMA",
environ_prefix=None,
)
OAUTH2_PROVIDER_APPLICATION_MODEL = "oauth2_provider.Application" OAUTH2_PROVIDER_APPLICATION_MODEL = "oauth2_provider.Application"
OAUTH2_PROVIDER_GRANT_MODEL = "mailbox_oauth2.Grant" OAUTH2_PROVIDER_GRANT_MODEL = "mailbox_oauth2.Grant"

View File

@@ -31,19 +31,27 @@ class NameFromSiretOrganizationPlugin(BaseOrganizationPlugin):
_api_url = "https://recherche-entreprises.api.gouv.fr/search?q={siret}" _api_url = "https://recherche-entreprises.api.gouv.fr/search?q={siret}"
@staticmethod @staticmethod
def get_organization_name_from_results(data, siret): def get_organization_name_and_metadata_from_results(data, siret):
"""Return the organization name from the results of a SIRET search.""" """Return the organization name and metadata from the results of a SIRET search."""
org_metadata = {}
for result in data["results"]: for result in data["results"]:
for organization in result["matching_etablissements"]: for organization in result["matching_etablissements"]:
if organization.get("siret") == siret: if organization.get("siret") == siret:
org_metadata["is_public_service"] = result.get(
"complements", {}
).get("est_service_public", False)
org_metadata["is_commune"] = (
str(result.get("nature_juridique", "")) == "7210"
)
store_signs = organization.get("liste_enseignes") or [] store_signs = organization.get("liste_enseignes") or []
if store_signs: if store_signs:
return store_signs[0].title() return store_signs[0].title(), org_metadata
if name := result.get("nom_raison_sociale"): if name := result.get("nom_raison_sociale"):
return name.title() return name.title(), org_metadata
logger.warning("No organization name found for SIRET %s", siret) logger.warning("No organization name found for SIRET %s", siret)
return None return None, org_metadata
def run_after_create(self, organization): def run_after_create(self, organization):
"""After creating an organization, update the organization name.""" """After creating an organization, update the organization name."""
@@ -67,15 +75,20 @@ class NameFromSiretOrganizationPlugin(BaseOrganizationPlugin):
response = s.get(self._api_url.format(siret=siret), timeout=10) response = s.get(self._api_url.format(siret=siret), timeout=10)
response.raise_for_status() response.raise_for_status()
data = response.json() data = response.json()
name = self.get_organization_name_from_results(data, siret)
if not name:
return
except requests.RequestException as exc: except requests.RequestException as exc:
logger.exception("%s: Unable to fetch organization name from SIRET", exc) logger.exception("%s: Unable to fetch organization name from SIRET", exc)
return return
name, metadata = self.get_organization_name_and_metadata_from_results(
data, siret
)
if not name: # don't consider metadata either
return
organization.name = name organization.name = name
organization.save(update_fields=["name", "updated_at"]) organization.metadata = (organization.metadata or {}) | metadata
organization.save(update_fields=["name", "metadata", "updated_at"])
logger.info("Organization %s name updated to %s", organization, name) logger.info("Organization %s name updated to %s", organization, name)
def run_after_grant_access(self, organization_access): def run_after_grant_access(self, organization_access):

View File

@@ -3,7 +3,7 @@
import pytest import pytest
import responses import responses
from core.models import Organization from core.models import Organization, get_organization_metadata_schema
from core.plugins.loader import get_organization_plugins from core.plugins.loader import get_organization_plugins
pytestmark = pytest.mark.django_db pytestmark = pytest.mark.django_db
@@ -30,6 +30,12 @@ def organization_plugins_settings_fixture(settings):
get_organization_plugins.cache_clear() get_organization_plugins.cache_clear()
get_organization_plugins() # call to populate the cache get_organization_plugins() # call to populate the cache
settings.ORGANIZATION_METADATA_SCHEMA = "fr/organization_metadata.json"
# Reset the model validation cache
get_organization_metadata_schema.cache_clear()
get_organization_metadata_schema()
yield yield
# reset get_organization_plugins cache # reset get_organization_plugins cache
@@ -37,9 +43,26 @@ def organization_plugins_settings_fixture(settings):
get_organization_plugins.cache_clear() get_organization_plugins.cache_clear()
get_organization_plugins() # call to populate the cache get_organization_plugins() # call to populate the cache
settings.ORGANIZATION_METADATA_SCHEMA = None
# Reset the model validation cache
get_organization_metadata_schema.cache_clear()
get_organization_metadata_schema()
@responses.activate @responses.activate
def test_organization_plugins_run_after_create(organization_plugins_settings): @pytest.mark.parametrize(
"nature_juridique,is_commune,is_public_service",
[
("123", False, False),
("7210", True, False),
("123", False, True),
("7210", True, True),
],
)
def test_organization_plugins_run_after_create(
organization_plugins_settings, nature_juridique, is_commune, is_public_service
):
"""Test the run_after_create method of the organization plugins for nominal case.""" """Test the run_after_create method of the organization plugins for nominal case."""
responses.add( responses.add(
responses.GET, responses.GET,
@@ -54,7 +77,11 @@ def test_organization_plugins_run_after_create(organization_plugins_settings):
"liste_enseignes": ["AMAZING ORGANIZATION"], "liste_enseignes": ["AMAZING ORGANIZATION"],
"siret": "12345678901234", "siret": "12345678901234",
} }
] ],
"nature_juridique": nature_juridique,
"complements": {
"est_service_public": is_public_service,
},
} }
], ],
"total_results": 1, "total_results": 1,
@@ -69,10 +96,14 @@ def test_organization_plugins_run_after_create(organization_plugins_settings):
name="12345678901234", registration_id_list=["12345678901234"] name="12345678901234", registration_id_list=["12345678901234"]
) )
assert organization.name == "Amazing Organization" assert organization.name == "Amazing Organization"
assert organization.metadata["is_commune"] == is_commune
assert organization.metadata["is_public_service"] == is_public_service
# Check that the organization has been updated in the database also # Check that the organization has been updated in the database also
organization.refresh_from_db() organization.refresh_from_db()
assert organization.name == "Amazing Organization" assert organization.name == "Amazing Organization"
assert organization.metadata["is_commune"] == is_commune
assert organization.metadata["is_public_service"] == is_public_service
@responses.activate @responses.activate
@@ -157,6 +188,10 @@ def test_organization_plugins_run_after_create_no_list_enseignes(
"siret": "12345678901234", "siret": "12345678901234",
} }
], ],
"nature_juridique": "123",
"complements": {
"est_service_public": True,
},
} }
], ],
"total_results": 1, "total_results": 1,
@@ -171,7 +206,11 @@ def test_organization_plugins_run_after_create_no_list_enseignes(
name="12345678901234", registration_id_list=["12345678901234"] name="12345678901234", registration_id_list=["12345678901234"]
) )
assert organization.name == "Amazing Organization" assert organization.name == "Amazing Organization"
assert organization.metadata["is_commune"] is False
assert organization.metadata["is_public_service"] is True
# Check that the organization has been updated in the database also # Check that the organization has been updated in the database also
organization.refresh_from_db() organization.refresh_from_db()
assert organization.name == "Amazing Organization" assert organization.name == "Amazing Organization"
assert organization.metadata["is_commune"] is False
assert organization.metadata["is_public_service"] is True