(aliases) import existing aliases from dimail

import existing aliases from dimail, making sure usernames don't clash with
existing mailboxes. Convenient when people fall out of sync
with dimail or for domains partially operated outside people.
This commit is contained in:
Marie PUPO JEAMMET
2025-11-03 19:06:51 +01:00
committed by Marie
parent befcecf33c
commit 040f949e5e
4 changed files with 277 additions and 155 deletions

View File

@@ -8,6 +8,7 @@ and this project adheres to
## [Unreleased]
- ✨(aliases) import existing aliases from dimail
- 🛂(permissions) return 404 to users with no access to domain #985
- ✨(aliases) can create, list and delete aliases #974

View File

@@ -10,8 +10,6 @@ from requests import exceptions
from mailbox_manager import enums, models
from mailbox_manager.utils.dimail import DimailAPIClient
# Prevent Ruff complaining about mark_safe below
@admin.action(description=_("Import emails from dimail"))
def sync_mailboxes_from_dimail(modeladmin, request, queryset): # pylint: disable=unused-argument
@@ -50,6 +48,50 @@ def sync_mailboxes_from_dimail(modeladmin, request, queryset): # pylint: disabl
)
@admin.action(description=_("Import aliases from dimail"))
def sync_aliases_from_dimail(modeladmin, request, queryset): # pylint: disable=unused-argument
"""
Admin action to import existing aliases from dimail.
Checks alias is not a duplicate and that usernames don't clash with existing mailboxes.
"""
excluded_domains = []
client = DimailAPIClient()
for domain in queryset:
if domain.status != enums.MailDomainStatusChoices.ENABLED:
excluded_domains.append(domain.name)
continue
try:
imported_aliases = client.import_aliases(domain)
except exceptions.HTTPError as err:
messages.error(
request,
_("Synchronisation failed for %(domain)s with message: %(err)s")
% {"domain": domain.name, "err": err},
)
else:
messages.success(
request,
_(
"Synchronisation succeed for %(domain)s. %(imported_aliases)\
imported aliases: %(mailboxes)s"
)
% {
"domain": domain.name,
"number_imported": len(imported_aliases),
"mailboxes": ", ".join(imported_aliases),
},
)
if excluded_domains:
messages.warning(
request,
_("Sync require enabled domains. Excluded domains: %(domains)s")
% {"domains": ", ".join(excluded_domains)},
)
@admin.action(description=_("Check and update status from dimail"))
def fetch_domain_status_from_dimail(modeladmin, request, queryset): # pylint: disable=unused-argument
"""Admin action to check domain health with dimail and update domain status."""

View File

@@ -2,7 +2,8 @@
Unit tests for dimail client
"""
import json
# pylint: disable=W0613
import logging
import re
from email.errors import HeaderParseError, NonASCIILocalPartDefect
@@ -21,14 +22,14 @@ from .fixtures.dimail import (
CHECK_DOMAIN_BROKEN_EXTERNAL,
CHECK_DOMAIN_BROKEN_INTERNAL,
CHECK_DOMAIN_OK,
TOKEN_OK,
response_mailbox_created,
)
pytestmark = pytest.mark.django_db
def test_dimail_synchronization__already_sync():
@responses.activate
def test_dimail_synchronization__already_sync(dimail_token_ok):
"""
No mailbox should be created when everything is already synced.
"""
@@ -39,35 +40,26 @@ def test_dimail_synchronization__already_sync():
assert pre_sync_mailboxes.count() == 3
dimail_client = DimailAPIClient()
with responses.RequestsMock() as rsps:
# Ensure successful response using "responses":
rsps.add(
rsps.GET,
re.compile(r".*/token/"),
body='{"access_token": "dimail_people_token"}',
status=status.HTTP_200_OK,
content_type="application/json",
)
rsps.add(
rsps.GET,
re.compile(rf".*/domains/{domain.name}/mailboxes/"),
body=json.dumps(
[
{
"type": "mailbox",
"status": "broken",
"email": f"{mailbox.local_part}@{domain.name}",
"givenName": mailbox.first_name,
"surName": mailbox.last_name,
"displayName": f"{mailbox.first_name} {mailbox.last_name}",
}
for mailbox in pre_sync_mailboxes
]
),
status=status.HTTP_200_OK,
content_type="application/json",
)
imported_mailboxes = dimail_client.import_mailboxes(domain)
# Ensure successful response using "responses":
# token response in fixtures
responses.get(
re.compile(rf".*/domains/{domain.name}/mailboxes/"),
json=[
{
"type": "mailbox",
"status": "broken",
"email": f"{mailbox.local_part}@{domain.name}",
"givenName": mailbox.first_name,
"surName": mailbox.last_name,
"displayName": f"{mailbox.first_name} {mailbox.last_name}",
}
for mailbox in pre_sync_mailboxes
],
status=status.HTTP_200_OK,
content_type="application/json",
)
imported_mailboxes = dimail_client.import_mailboxes(domain)
post_sync_mailboxes = models.Mailbox.objects.filter(domain=domain)
assert post_sync_mailboxes.count() == 3
@@ -75,97 +67,138 @@ def test_dimail_synchronization__already_sync():
assert set(models.Mailbox.objects.filter(domain=domain)) == set(pre_sync_mailboxes)
@responses.activate
@mock.patch.object(Logger, "warning")
def test_dimail_synchronization__synchronize_mailboxes(mock_warning):
def test_dimail_synchronization__synchronize_mailboxes(mock_warning, dimail_token_ok):
"""A mailbox existing solely on dimail should be synchronized
upon calling sync function on its domain"""
domain = factories.MailDomainEnabledFactory()
assert not models.Mailbox.objects.exists()
dimail_client = DimailAPIClient()
with responses.RequestsMock() as rsps:
# Ensure successful response using "responses":
rsps.add(
rsps.GET,
re.compile(r".*/token/"),
body='{"access_token": "dimail_people_token"}',
status=status.HTTP_200_OK,
content_type="application/json",
)
# Ensure successful response using "responses":
# token response in fixtures
mailbox_valid = {
"type": "mailbox",
"status": "broken",
"email": f"oxadmin@{domain.name}",
"givenName": "Admin",
"surName": "Context",
"displayName": "Context Admin",
}
mailbox_with_wrong_domain = {
"type": "mailbox",
"status": "broken",
"email": "johndoe@wrongdomain.com",
"givenName": "John",
"surName": "Doe",
"displayName": "John Doe",
}
mailbox_with_invalid_domain = {
"type": "mailbox",
"status": "broken",
"email": f"naw@ake@{domain.name}",
"givenName": "Joe",
"surName": "Doe",
"displayName": "Joe Doe",
}
mailbox_with_invalid_local_part = {
"type": "mailbox",
"status": "broken",
"email": f"obalmaské@{domain.name}",
"givenName": "Jean",
"surName": "Vang",
"displayName": "Jean Vang",
}
mailbox_valid = {
"type": "mailbox",
"status": "broken",
"email": f"oxadmin@{domain.name}",
"givenName": "Admin",
"surName": "Context",
"displayName": "Context Admin",
}
mailbox_with_wrong_domain = {
"type": "mailbox",
"status": "broken",
"email": "johndoe@wrongdomain.com",
"givenName": "John",
"surName": "Doe",
"displayName": "John Doe",
}
mailbox_with_invalid_domain = {
"type": "mailbox",
"status": "broken",
"email": f"naw@ake@{domain.name}",
"givenName": "Joe",
"surName": "Doe",
"displayName": "Joe Doe",
}
mailbox_with_invalid_local_part = {
"type": "mailbox",
"status": "broken",
"email": f"obalmaské@{domain.name}",
"givenName": "Jean",
"surName": "Vang",
"displayName": "Jean Vang",
}
responses.get(
re.compile(rf".*/domains/{domain.name}/mailboxes/"),
json=[
mailbox_valid,
mailbox_with_wrong_domain,
mailbox_with_invalid_domain,
mailbox_with_invalid_local_part,
],
status=status.HTTP_200_OK,
content_type="application/json",
)
rsps.add(
rsps.GET,
re.compile(rf".*/domains/{domain.name}/mailboxes/"),
body=json.dumps(
[
mailbox_valid,
mailbox_with_wrong_domain,
mailbox_with_invalid_domain,
mailbox_with_invalid_local_part,
]
),
status=status.HTTP_200_OK,
content_type="application/json",
)
imported_mailboxes = dimail_client.import_mailboxes(domain)
imported_mailboxes = dimail_client.import_mailboxes(domain)
# 3 imports failed: wrong domain, HeaderParseError, NonASCIILocalPartDefect
assert mock_warning.call_count == 3
# 3 imports failed: wrong domain, HeaderParseError, NonASCIILocalPartDefect
assert mock_warning.call_count == 3
# first we try to import email with a wrong domain
assert mock_warning.call_args_list[0][0] == (
"Import of email %s failed because of a wrong domain",
mailbox_with_wrong_domain["email"],
)
# first we try to import email with a wrong domain
assert mock_warning.call_args_list[0][0] == (
"Import of email %s failed because of a wrong domain",
mailbox_with_wrong_domain["email"],
)
# then we try to import email with invalid domain
invalid_mailbox_log = mock_warning.call_args_list[1][0]
assert invalid_mailbox_log[1] == mailbox_with_invalid_domain["email"]
assert isinstance(invalid_mailbox_log[2], HeaderParseError)
# then we try to import email with invalid domain
invalid_mailbox_log = mock_warning.call_args_list[1][0]
assert invalid_mailbox_log[1] == mailbox_with_invalid_domain["email"]
assert isinstance(invalid_mailbox_log[2], HeaderParseError)
# finally we try to import email with non ascii local part
non_ascii_mailbox_log = mock_warning.call_args_list[2][0]
assert non_ascii_mailbox_log[1] == mailbox_with_invalid_local_part["email"]
assert isinstance(non_ascii_mailbox_log[2], NonASCIILocalPartDefect)
# finally we try to import email with non ascii local part
non_ascii_mailbox_log = mock_warning.call_args_list[2][0]
assert non_ascii_mailbox_log[1] == mailbox_with_invalid_local_part["email"]
assert isinstance(non_ascii_mailbox_log[2], NonASCIILocalPartDefect)
mailbox = models.Mailbox.objects.get()
assert mailbox.local_part == "oxadmin"
assert mailbox.status == enums.MailboxStatusChoices.ENABLED
assert imported_mailboxes == [mailbox_valid["email"]]
mailbox = models.Mailbox.objects.get()
assert mailbox.local_part == "oxadmin"
assert mailbox.status == enums.MailboxStatusChoices.ENABLED
assert imported_mailboxes == [mailbox_valid["email"]]
@responses.activate
def test_dimail_synchronization__synchronize_aliases(dimail_token_ok): # pylint: disable=unused-argument
"""Should import aliases from dimail if they don't already exist
and if username is not already used for mailbox"""
alias = factories.AliasFactory()
dimail_client = DimailAPIClient()
existing_mailbox = factories.MailboxFactory(domain=alias.domain)
# Ensure successful response using "responses":
# token response in fixtures
incoming_aliases = [
{
"username": "contact",
"domain": alias.domain.name,
"destination": alias.destination, # same destination
"allow_to_send": False,
},
{
"username": alias.local_part, # same username
"domain": alias.domain.name,
"destination": "maheius.endorecles@somethingelse.com",
"allow_to_send": False,
},
{ # same username + same destination = big nono
"username": alias.local_part,
"domain": alias.domain.name,
"destination": alias.destination,
"allow_to_send": False,
},
{ # username already used for a mailbox
"username": existing_mailbox.local_part,
"domain": alias.domain.name,
"destination": existing_mailbox.secondary_email,
"allow_to_send": False,
},
]
responses.get(
re.compile(rf".*/domains/{alias.domain.name}/aliases/"),
json=incoming_aliases,
status=status.HTTP_200_OK,
content_type="application/json",
)
imported_aliases = dimail_client.import_aliases(alias.domain)
assert len(imported_aliases) == 2
assert models.Alias.objects.count() == 3
@pytest.mark.parametrize(
@@ -183,10 +216,9 @@ def test_dimail__fetch_domain_status__switch_to_enabled(domain_status):
domain = factories.MailDomainFactory(status=domain_status)
body_content = CHECK_DOMAIN_OK.copy()
body_content["name"] = domain.name
responses.add(
responses.GET,
responses.get(
re.compile(rf".*/domains/{domain.name}/check/"),
body=json.dumps(body_content),
json=body_content,
status=status.HTTP_200_OK,
content_type="application/json",
)
@@ -221,10 +253,9 @@ def test_dimail__fetch_domain_status__switch_to_action_required(
domain = factories.MailDomainFactory(status=domain_status)
body_domain_broken = CHECK_DOMAIN_BROKEN_EXTERNAL.copy()
body_domain_broken["name"] = domain.name
responses.add(
responses.GET,
responses.get(
re.compile(rf".*/domains/{domain.name}/check/"),
body=json.dumps(body_domain_broken),
json=body_domain_broken,
status=status.HTTP_200_OK,
content_type="application/json",
)
@@ -238,10 +269,9 @@ def test_dimail__fetch_domain_status__switch_to_action_required(
# Now domain is OK again
body_domain_ok = CHECK_DOMAIN_OK.copy()
body_domain_ok["name"] = domain.name
responses.add(
responses.GET,
responses.get(
re.compile(rf".*/domains/{domain.name}/check/"),
body=json.dumps(body_domain_ok),
json=body_domain_ok,
status=status.HTTP_200_OK,
content_type="application/json",
)
@@ -267,18 +297,16 @@ def test_dimail__fetch_domain_status__switch_to_failed(domain_status):
# nothing can be done by support team, domain should be in failed
body_domain_broken = CHECK_DOMAIN_BROKEN_INTERNAL.copy()
body_domain_broken["name"] = domain.name
responses.add(
responses.GET,
responses.get(
re.compile(rf".*/domains/{domain.name}/check/"),
body=json.dumps(body_domain_broken),
json=body_domain_broken,
status=status.HTTP_200_OK,
content_type="application/json",
)
# the endpoint fix is called and still returns KO for internal checks
responses.add(
responses.GET,
responses.get(
re.compile(rf".*/domains/{domain.name}/fix/"),
body=json.dumps(body_domain_broken),
json=body_domain_broken,
status=status.HTTP_200_OK,
content_type="application/json",
)
@@ -305,10 +333,9 @@ def test_dimail__fetch_domain_status__full_fix_scenario(domain_status):
# with all checks KO, domain should be in action required
body_domain_broken = CHECK_DOMAIN_BROKEN.copy()
body_domain_broken["name"] = domain.name
responses.add(
responses.GET,
responses.get(
re.compile(rf".*/domains/{domain.name}/check/"),
body=json.dumps(body_domain_broken),
json=body_domain_broken,
status=status.HTTP_200_OK,
content_type="application/json",
)
@@ -324,20 +351,18 @@ def test_dimail__fetch_domain_status__full_fix_scenario(domain_status):
# the fetch_domain_status call
body_domain_broken_internal = CHECK_DOMAIN_BROKEN_INTERNAL.copy()
body_domain_broken_internal["name"] = domain.name
responses.add(
responses.GET,
responses.get(
re.compile(rf".*/domains/{domain.name}/check/"),
body=json.dumps(body_domain_broken_internal),
json=body_domain_broken_internal,
status=status.HTTP_200_OK,
content_type="application/json",
)
# the endpoint fix is called and returns OK. Hooray!
body_domain_ok = CHECK_DOMAIN_OK.copy()
body_domain_ok["name"] = domain.name
responses.add(
responses.GET,
responses.get(
re.compile(rf".*/domains/{domain.name}/fix/"),
body=json.dumps(body_domain_ok),
json=body_domain_ok,
status=status.HTTP_200_OK,
content_type="application/json",
)
@@ -348,7 +373,8 @@ def test_dimail__fetch_domain_status__full_fix_scenario(domain_status):
assert domain.last_check_details == body_domain_ok
def test_dimail__send_pending_mailboxes(caplog):
@responses.activate
def test_dimail__send_pending_mailboxes(caplog, dimail_token_ok):
"""Status of pending mailboxes should switch to "enabled"
when calling send_pending_mailboxes."""
caplog.set_level(logging.INFO)
@@ -365,22 +391,16 @@ def test_dimail__send_pending_mailboxes(caplog):
)
dimail_client = DimailAPIClient()
with responses.RequestsMock() as rsps:
rsps.add(
rsps.GET,
re.compile(r".*/token/"),
body=TOKEN_OK,
status=status.HTTP_200_OK,
content_type="application/json",
)
rsps.add(
rsps.POST,
re.compile(rf".*/domains/{domain.name}/mailboxes/"),
body=response_mailbox_created(f"mock@{domain.name}"),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
dimail_client.send_pending_mailboxes(domain=domain)
# Ensure successful response using "responses":
# token response in fixtures
responses.post(
re.compile(rf".*/domains/{domain.name}/mailboxes/"),
body=response_mailbox_created(f"mock@{domain.name}"),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
dimail_client.send_pending_mailboxes(domain=domain)
mailbox1.refresh_from_db()
mailbox2.refresh_from_db()

View File

@@ -375,12 +375,16 @@ class DimailAPIClient:
return self.raise_exception_for_unexpected_response(response)
dimail_mailboxes = response.json()
people_mailboxes = models.Mailbox.objects.filter(domain=domain)
known_mailboxes = models.Mailbox.objects.filter(domain=domain)
known_aliases = [
known_alias.local_part
for known_alias in models.Alias.objects.filter(domain=domain)
]
imported_mailboxes = []
for dimail_mailbox in dimail_mailboxes:
if not dimail_mailbox["email"] in [
str(people_mailbox) for people_mailbox in people_mailboxes
]:
if dimail_mailbox["email"] not in [
str(known_mailboxes) for known_mailboxes in known_mailboxes
] and dimail_mailbox['email'].split('@')[0] not in known_aliases:
try:
# sometimes dimail api returns email from another domain,
# so we decide to exclude this kind of email
@@ -776,3 +780,58 @@ class DimailAPIClient:
return response
return self.raise_exception_for_unexpected_response(response)
def import_aliases(self, domain):
"""Import aliases from dimail. Useful if people fall out of sync with dimail."""
try:
response = session.get(
f"{self.API_URL}/domains/{domain.name}/aliases/",
headers=self.get_headers(),
verify=True,
timeout=self.API_TIMEOUT,
)
except requests.exceptions.ConnectionError as error:
logger.error(
"Connection error while trying to reach %s.",
self.API_URL,
exc_info=error,
)
raise error
if response.status_code != status.HTTP_200_OK:
return self.raise_exception_for_unexpected_response(response)
incoming_aliases = response.json()
known_aliases = [
(known_alias.local_part, known_alias.destination)
for known_alias in models.Alias.objects.filter(domain=domain)
]
known_mailboxes = [
known_mailbox.local_part
for known_mailbox in models.Mailbox.objects.filter(domain=domain)
]
imported_aliases = []
for incoming_alias in incoming_aliases:
if (
incoming_alias["username"],
incoming_alias["destination"],
) not in known_aliases and incoming_alias[
"username"
] not in known_mailboxes:
try:
new_alias = models.Alias.objects.create(
local_part=incoming_alias["username"],
destination=incoming_alias["destination"],
domain=domain,
)
except (HeaderParseError, NonASCIILocalPartDefect) as err:
logger.warning(
"Import of alias %s to %s failed with error %s",
incoming_alias["username"],
incoming_alias["destination"],
err,
)
else:
imported_aliases.append(str(new_alias))
return imported_aliases