(domains) check status after creation

Fetch domain status from dimail just after domain creation.
This commit is contained in:
Sabrina Demagny
2025-02-12 18:48:22 +01:00
parent a811431070
commit ab03cd9db9
4 changed files with 278 additions and 188 deletions

View File

@@ -10,6 +10,7 @@ and this project adheres to
### Added
- ✨(domains) check status after creation
- ✨(domains) display required actions to do on domain
- ✨(plugin) add CommuneCreation plugin with domain provisioning #658
- ✨(frontend) display action required status on domain

View File

@@ -118,9 +118,17 @@ class MailDomainSerializer(serializers.ModelSerializer):
# send new domain request to dimail
client = DimailAPIClient()
client.create_domain(validated_data["name"], self.context["request"].user.sub)
# no exception raised ? Then actually save domain on our database
return super().create(validated_data)
domain = super().create(validated_data)
# check domain status and update it
try:
client.fetch_domain_status(domain)
except HTTPError as e:
logger.exception(
"[DIMAIL] domain status fetch after creation failed %s with error %s",
domain.name,
e,
)
return domain
class MailDomainAccessSerializer(serializers.ModelSerializer):

View File

@@ -2,6 +2,7 @@
Tests for MailDomains API endpoint in People's app mailbox_manager. Focus on "create" action.
"""
import json
import logging
import re
@@ -14,6 +15,7 @@ from rest_framework.test import APIClient
from core import factories as core_factories
from mailbox_manager import enums, factories, models
from mailbox_manager.tests.fixtures.dimail import CHECK_DOMAIN_BROKEN, CHECK_DOMAIN_OK
pytestmark = pytest.mark.django_db
@@ -52,6 +54,7 @@ def test_api_mail_domains__create_name_unique():
assert response.json()["name"] == ["Mail domain with this name already exists."]
@responses.activate
def test_api_mail_domains__create_authenticated():
"""
Authenticated users should be able to create mail domains
@@ -64,49 +67,57 @@ def test_api_mail_domains__create_authenticated():
domain_name = "test.domain.fr"
with responses.RequestsMock() as rsps:
rsps.add(
rsps.POST,
re.compile(r".*/domains/"),
body=str(
{
"name": domain_name,
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
rsps.add(
rsps.POST,
re.compile(r".*/users/"),
body=str(
{
"name": "request-user-sub",
"is_admin": "false",
"uuid": "user-uuid-on-dimail",
"perms": [],
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
rsps.add(
rsps.POST,
re.compile(r".*/allows/"),
body=str({"user": "request-user-sub", "domain": str(domain_name)}),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
response = client.post(
"/api/v1.0/mail-domains/",
responses.add(
responses.POST,
re.compile(r".*/domains/"),
body=str(
{
"name": domain_name,
"context": "null",
"features": ["webmail"],
"support_email": f"support@{domain_name}",
},
format="json",
)
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
responses.add(
responses.POST,
re.compile(r".*/users/"),
body=str(
{
"name": "request-user-sub",
"is_admin": "false",
"uuid": "user-uuid-on-dimail",
"perms": [],
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
responses.add(
responses.POST,
re.compile(r".*/allows/"),
body=str({"user": "request-user-sub", "domain": str(domain_name)}),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
body_content_domain1 = CHECK_DOMAIN_BROKEN.copy()
body_content_domain1["name"] = domain_name
responses.add(
responses.GET,
re.compile(rf".*/domains/{domain_name}/check/"),
body=json.dumps(body_content_domain1),
status=status.HTTP_200_OK,
content_type="application/json",
)
response = client.post(
"/api/v1.0/mail-domains/",
{
"name": domain_name,
"context": "null",
"features": ["webmail"],
"support_email": f"support@{domain_name}",
},
format="json",
)
assert response.status_code == status.HTTP_201_CREATED
domain = models.MailDomain.objects.get()
@@ -115,28 +126,42 @@ def test_api_mail_domains__create_authenticated():
"id": str(domain.id),
"name": domain.name,
"slug": domain.slug,
"status": enums.MailDomainStatusChoices.PENDING,
"status": enums.MailDomainStatusChoices.ACTION_REQUIRED,
"created_at": domain.created_at.isoformat().replace("+00:00", "Z"),
"updated_at": domain.updated_at.isoformat().replace("+00:00", "Z"),
"abilities": domain.get_abilities(user),
"count_mailboxes": 0,
"support_email": domain.support_email,
"last_check_details": None,
"action_required_details": {},
"last_check_details": body_content_domain1,
"action_required_details": {
"cname_imap": "Il faut un CNAME 'imap.example.fr' qui renvoie vers "
"'imap.ox.numerique.gouv.fr.'",
"cname_smtp": "Le CNAME pour 'smtp.example.fr' n'est pas bon, "
"il renvoie vers 'ns0.ovh.net.' et je veux 'smtp.ox.numerique.gouv.fr.'",
"cname_webmail": "Il faut un CNAME 'webmail.example.fr' qui "
"renvoie vers 'webmail.ox.numerique.gouv.fr.'",
"dkim": "Il faut un DKIM record, avec la bonne clef",
"mx": "Je veux que le MX du domaine soit mx.ox.numerique.gouv.fr., "
"or je trouve example-fr.mail.protection.outlook.com.",
"spf": "Le SPF record ne contient pas include:_spf.ox.numerique.gouv.fr",
},
}
# a new domain with status "pending" is created and authenticated user is the owner
assert domain.status == enums.MailDomainStatusChoices.PENDING
# a new domain with status "action required" is created and authenticated user is the owner
assert domain.status == enums.MailDomainStatusChoices.ACTION_REQUIRED
assert domain.last_check_details == body_content_domain1
assert domain.name == domain_name
assert domain.accesses.filter(role="owner", user=user).exists()
def test_api_mail_domains__create_authenticated__dimail_failure():
@responses.activate
def test_api_mail_domains__create_authenticated__dimail_failure(caplog):
"""
Despite a dimail failure for user and/or allow creation,
an authenticated user should be able to create a mail domain
and should automatically be added as owner of the newly created domain.
"""
caplog.set_level(logging.ERROR)
user = core_factories.UserFactory()
client = APIClient()
@@ -144,49 +169,59 @@ def test_api_mail_domains__create_authenticated__dimail_failure():
domain_name = "test.domain.fr"
with responses.RequestsMock() as rsps:
rsps.add(
rsps.POST,
re.compile(r".*/domains/"),
body=str(
{
"name": domain_name,
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
rsps.add(
rsps.POST,
re.compile(r".*/users/"),
body=str(
{
"name": "request-user-sub",
"is_admin": "false",
"uuid": "user-uuid-on-dimail",
"perms": [],
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
rsps.add(
rsps.POST,
re.compile(r".*/allows/"),
body=str({"user": "request-user-sub", "domain": str(domain_name)}),
status=status.HTTP_403_FORBIDDEN,
content_type="application/json",
)
response = client.post(
"/api/v1.0/mail-domains/",
responses.add(
responses.POST,
re.compile(r".*/domains/"),
body=str(
{
"name": domain_name,
"context": "null",
"features": ["webmail"],
"support_email": f"support@{domain_name}",
},
format="json",
)
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
responses.add(
responses.POST,
re.compile(r".*/users/"),
body=str(
{
"name": "request-user-sub",
"is_admin": "false",
"uuid": "user-uuid-on-dimail",
"perms": [],
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
responses.add(
responses.POST,
re.compile(r".*/allows/"),
body=str({"user": "request-user-sub", "domain": str(domain_name)}),
status=status.HTTP_403_FORBIDDEN,
content_type="application/json",
)
dimail_error = {
"status_code": status.HTTP_401_UNAUTHORIZED,
"detail": "Not authorized",
}
responses.add(
responses.GET,
re.compile(rf".*/domains/{domain_name}/check/"),
body=json.dumps(dimail_error),
status=dimail_error["status_code"],
content_type="application/json",
)
response = client.post(
"/api/v1.0/mail-domains/",
{
"name": domain_name,
"context": "null",
"features": ["webmail"],
"support_email": f"support@{domain_name}",
},
format="json",
)
domain = models.MailDomain.objects.get()
# response is as expected
@@ -208,9 +243,12 @@ def test_api_mail_domains__create_authenticated__dimail_failure():
assert domain.status == enums.MailDomainStatusChoices.FAILED
assert domain.name == domain_name
assert domain.accesses.filter(role="owner", user=user).exists()
assert caplog.records[0].levelname == "ERROR"
assert "Not authorized" in caplog.records[0].message
## SYNC TO DIMAIL
@responses.activate
def test_api_mail_domains__create_dimail_domain(caplog):
"""
Creating a domain should trigger a call to create a domain on dimail too.
@@ -222,50 +260,58 @@ def test_api_mail_domains__create_dimail_domain(caplog):
client.force_login(user)
domain_name = "test.fr"
with responses.RequestsMock() as rsps:
rsp = rsps.add(
rsps.POST,
re.compile(r".*/domains/"),
body=str(
{
"name": domain_name,
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
rsps.add(
rsps.POST,
re.compile(r".*/users/"),
body=str(
{
"name": "request-user-sub",
"is_admin": "false",
"uuid": "user-uuid-on-dimail",
"perms": [],
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
rsps.add(
rsps.POST,
re.compile(r".*/allows/"),
body=str({"user": "request-user-sub", "domain": str(domain_name)}),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
response = client.post(
"/api/v1.0/mail-domains/",
responses.add(
responses.POST,
re.compile(r".*/domains/"),
body=str(
{
"name": domain_name,
"support_email": f"support@{domain_name}",
},
format="json",
)
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
responses.add(
responses.POST,
re.compile(r".*/users/"),
body=str(
{
"name": "request-user-sub",
"is_admin": "false",
"uuid": "user-uuid-on-dimail",
"perms": [],
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
responses.add(
responses.POST,
re.compile(r".*/allows/"),
body=str({"user": "request-user-sub", "domain": str(domain_name)}),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
body_content_domain1 = CHECK_DOMAIN_OK.copy()
body_content_domain1["name"] = domain_name
responses.add(
responses.GET,
re.compile(rf".*/domains/{domain_name}/check/"),
body=json.dumps(body_content_domain1),
status=status.HTTP_200_OK,
content_type="application/json",
)
response = client.post(
"/api/v1.0/mail-domains/",
{
"name": domain_name,
"support_email": f"support@{domain_name}",
},
format="json",
)
assert response.status_code == status.HTTP_201_CREATED
assert rsp.call_count == 1 # endpoint was called
# Logger
assert len(caplog.records) == 4 # should be 3. Last empty info still here.
@@ -283,6 +329,7 @@ def test_api_mail_domains__create_dimail_domain(caplog):
)
@responses.activate
def test_api_mail_domains__no_creation_when_dimail_duplicate(caplog):
"""No domain should be created when dimail returns a 409 conflict."""
user = core_factories.UserFactory()
@@ -294,28 +341,47 @@ def test_api_mail_domains__no_creation_when_dimail_duplicate(caplog):
"status_code": status.HTTP_409_CONFLICT,
"detail": "Domain already exists",
}
responses.add(
responses.POST,
re.compile(r".*/users/"),
body=str(
{
"name": "request-user-sub",
"is_admin": "false",
"uuid": "user-uuid-on-dimail",
"perms": [],
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
responses.add(
responses.POST,
re.compile(r".*/allows/"),
body=str({"user": "request-user-sub", "domain": str(domain_name)}),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
responses.add(
responses.POST,
re.compile(r".*/domains/"),
body=str({"detail": dimail_error["detail"]}),
status=dimail_error["status_code"],
content_type="application/json",
)
with responses.RequestsMock() as rsps:
rsp = rsps.add(
rsps.POST,
re.compile(r".*/domains/"),
body=str({"detail": dimail_error["detail"]}),
status=dimail_error["status_code"],
content_type="application/json",
with pytest.raises(HTTPError):
response = client.post(
"/api/v1.0/mail-domains/",
{
"name": domain_name,
"support_email": f"support@{domain_name}",
},
format="json",
)
with pytest.raises(HTTPError):
response = client.post(
"/api/v1.0/mail-domains/",
{
"name": domain_name,
"support_email": f"support@{domain_name}",
},
format="json",
)
assert rsp.call_count == 1 # endpoint was called
assert response.status_code == dimail_error["status_code"]
assert response.json() == {"detail": dimail_error["detail"]}
assert response.status_code == dimail_error["status_code"]
assert response.json() == {"detail": dimail_error["detail"]}
# check logs
record = caplog.records[0]

View File

@@ -418,12 +418,19 @@ class DimailAPIClient:
def check_domain(self, domain):
"""Send a request to dimail to check domain health."""
response = session.get(
f"{self.API_URL}/domains/{domain.name}/check/",
headers={"Authorization": f"Basic {self.API_CREDENTIALS}"},
verify=True,
timeout=10,
)
try:
response = session.get(
f"{self.API_URL}/domains/{domain.name}/check/",
headers={"Authorization": f"Basic {self.API_CREDENTIALS}"},
verify=True,
timeout=10,
)
except requests.exceptions.ConnectionError as error:
logger.error(
"Connection error while trying to reach %s.",
self.API_URL,
exc_info=error,
)
if response.status_code == status.HTTP_200_OK:
return response.json()
return self.raise_exception_for_unexpected_response(response)
@@ -448,52 +455,60 @@ class DimailAPIClient:
def fetch_domain_status(self, domain):
"""Send a request to check and update status of a domain."""
dimail_response = self.check_domain(domain)
dimail_state = dimail_response["state"]
# if domain is not enabled and dimail returns ok status, enable it
if (
domain.status != enums.MailDomainStatusChoices.ENABLED
and dimail_state == "ok"
):
self.enable_pending_mailboxes(domain)
domain.status = enums.MailDomainStatusChoices.ENABLED
domain.last_check_details = dimail_response
domain.save()
# if dimail returns broken status, we need to extract external and internal checks
# and manage the case where the domain has to be fixed by support
elif dimail_state == "broken":
external_checks = self._get_dimail_checks(dimail_response, internal=False)
internal_checks = self._get_dimail_checks(dimail_response, internal=True)
# manage the case where the domain has to be fixed by support
if not all(external_checks.values()):
domain.status = enums.MailDomainStatusChoices.ACTION_REQUIRED
if dimail_response:
dimail_state = dimail_response["state"]
# if domain is not enabled and dimail returns ok status, enable it
if (
domain.status != enums.MailDomainStatusChoices.ENABLED
and dimail_state == "ok"
):
self.enable_pending_mailboxes(domain)
domain.status = enums.MailDomainStatusChoices.ENABLED
domain.last_check_details = dimail_response
domain.save()
# if all external checks are ok but not internal checks, we need to fix internal checks
elif all(external_checks.values()) and not all(internal_checks.values()):
# a call to fix endpoint is needed because all external checks are ok
dimail_response = self.fix_domain(domain)
# we need to check again if all internal and external checks are ok
# if dimail returns broken status, we need to extract external and internal checks
# and manage the case where the domain has to be fixed by support
elif dimail_state == "broken":
external_checks = self._get_dimail_checks(
dimail_response, internal=False
)
internal_checks = self._get_dimail_checks(
dimail_response, internal=True
)
if all(external_checks.values()) and all(internal_checks.values()):
domain.status = enums.MailDomainStatusChoices.ENABLED
# manage the case where the domain has to be fixed by support
if not all(external_checks.values()):
domain.status = enums.MailDomainStatusChoices.ACTION_REQUIRED
domain.last_check_details = dimail_response
domain.save()
# if all external checks are ok but not internal checks, we need to fix
# internal checks
elif all(external_checks.values()) and not all(
internal_checks.values()
):
domain.status = enums.MailDomainStatusChoices.FAILED
domain.last_check_details = dimail_response
domain.save()
# a call to fix endpoint is needed because all external checks are ok
dimail_response = self.fix_domain(domain)
# we need to check again if all internal and external checks are ok
external_checks = self._get_dimail_checks(
dimail_response, internal=False
)
internal_checks = self._get_dimail_checks(
dimail_response, internal=True
)
if all(external_checks.values()) and all(internal_checks.values()):
domain.status = enums.MailDomainStatusChoices.ENABLED
domain.last_check_details = dimail_response
domain.save()
elif all(external_checks.values()) and not all(
internal_checks.values()
):
domain.status = enums.MailDomainStatusChoices.FAILED
domain.last_check_details = dimail_response
domain.save()
# if no health check data is stored on the domain, we store it now
if not domain.last_check_details:
domain.last_check_details = dimail_response
domain.save()
# if no health check data is stored on the domain, we store it now
if not domain.last_check_details:
domain.last_check_details = dimail_response
domain.save()
return dimail_response