🛂(dimail) simplify interop with dimail

In this commit, we stop creating /users and /allows in dimail
for our dbs to be in sync. People with stop impersonating users
in dimail and will create mailboxes using its own credentials.
This commit is contained in:
Marie PUPO JEAMMET
2025-04-14 16:05:31 +02:00
committed by Marie
parent 6721328b2d
commit 056a4bd7ac
10 changed files with 75 additions and 496 deletions

View File

@@ -82,27 +82,6 @@ def test_api_mail_domains__create_authenticated():
status=status.HTTP_201_CREATED,
content_type="application/json",
)
responses.add(
responses.POST,
re.compile(r".*/users/"),
body=str(
{
"name": "request-user-sub",
"is_admin": "false",
"uuid": "user-uuid-on-dimail",
"perms": [],
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
responses.add(
responses.POST,
re.compile(r".*/allows/"),
body=str({"user": "request-user-sub", "domain": str(domain_name)}),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
body_content_domain1 = CHECK_DOMAIN_BROKEN.copy()
body_content_domain1["name"] = domain_name
responses.add(
@@ -166,101 +145,6 @@ def test_api_mail_domains__create_authenticated():
assert domain.accesses.filter(role="owner", user=user).exists()
@responses.activate
def test_api_mail_domains__create_authenticated__dimail_failure(caplog):
"""
Despite a dimail failure for user and/or allow creation,
an authenticated user should be able to create a mail domain
and should automatically be added as owner of the newly created domain.
"""
caplog.set_level(logging.ERROR)
user = core_factories.UserFactory()
client = APIClient()
client.force_login(user)
domain_name = "test.domain.fr"
responses.add(
responses.POST,
re.compile(r".*/domains/"),
body=str(
{
"name": domain_name,
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
responses.add(
responses.POST,
re.compile(r".*/users/"),
body=str(
{
"name": "request-user-sub",
"is_admin": "false",
"uuid": "user-uuid-on-dimail",
"perms": [],
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
responses.add(
responses.POST,
re.compile(r".*/allows/"),
body=str({"user": "request-user-sub", "domain": str(domain_name)}),
status=status.HTTP_403_FORBIDDEN,
content_type="application/json",
)
dimail_error = {
"status_code": status.HTTP_401_UNAUTHORIZED,
"detail": "Not authorized",
}
responses.add(
responses.GET,
re.compile(rf".*/domains/{domain_name}/check/"),
body=json.dumps(dimail_error),
status=dimail_error["status_code"],
content_type="application/json",
)
response = client.post(
"/api/v1.0/mail-domains/",
{
"name": domain_name,
"context": "null",
"features": ["webmail"],
"support_email": f"support@{domain_name}",
},
format="json",
)
domain = models.MailDomain.objects.get()
# response is as expected
assert response.json() == {
"id": str(domain.id),
"name": domain.name,
"slug": domain.slug,
"status": enums.MailDomainStatusChoices.FAILED,
"created_at": domain.created_at.isoformat().replace("+00:00", "Z"),
"updated_at": domain.updated_at.isoformat().replace("+00:00", "Z"),
"abilities": domain.get_abilities(user),
"count_mailboxes": 0,
"support_email": domain.support_email,
"last_check_details": None,
"action_required_details": {},
"expected_config": None,
}
# a new domain with status "failed" is created and authenticated user is the owner
assert domain.status == enums.MailDomainStatusChoices.FAILED
assert domain.name == domain_name
assert domain.accesses.filter(role="owner", user=user).exists()
assert caplog.records[0].levelname == "ERROR"
assert "Not authorized" in caplog.records[0].message
## SYNC TO DIMAIL
@responses.activate
def test_api_mail_domains__create_dimail_domain(caplog):
@@ -285,28 +169,6 @@ def test_api_mail_domains__create_dimail_domain(caplog):
status=status.HTTP_201_CREATED,
content_type="application/json",
)
responses.add(
responses.POST,
re.compile(r".*/users/"),
body=str(
{
"name": "request-user-sub",
"is_admin": "false",
"uuid": "user-uuid-on-dimail",
"perms": [],
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
responses.add(
responses.POST,
re.compile(r".*/allows/"),
body=str({"user": "request-user-sub", "domain": str(domain_name)}),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
body_content_domain1 = CHECK_DOMAIN_OK.copy()
body_content_domain1["name"] = domain_name
responses.add(
@@ -340,11 +202,6 @@ def test_api_mail_domains__create_dimail_domain(caplog):
f"Domain {domain_name} successfully created on dimail by user {user.sub}"
in log_messages
)
assert f'[DIMAIL] User "{user.sub}" successfully created on dimail' in log_messages
assert (
f'[DIMAIL] Permissions granted for user "{user.sub}" on domain {domain_name}.'
in log_messages
)
@responses.activate
@@ -359,27 +216,6 @@ def test_api_mail_domains__no_creation_when_dimail_duplicate(caplog):
"status_code": status.HTTP_409_CONFLICT,
"detail": "Domain already exists",
}
responses.add(
responses.POST,
re.compile(r".*/users/"),
body=str(
{
"name": "request-user-sub",
"is_admin": "false",
"uuid": "user-uuid-on-dimail",
"perms": [],
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
responses.add(
responses.POST,
re.compile(r".*/allows/"),
body=str({"user": "request-user-sub", "domain": str(domain_name)}),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
responses.add(
responses.POST,
re.compile(r".*/domains/"),

View File

@@ -2,12 +2,9 @@
Test for mail domain accesses API endpoints in People's core app : create
"""
import logging
import random
import re
import pytest
import responses
from rest_framework import status
from rest_framework.test import APIClient
@@ -108,16 +105,15 @@ def test_api_mail_domain__accesses_create_authenticated_administrator():
client = APIClient()
client.force_login(authenticated_user)
with responses.RequestsMock() as rsps:
# It should not be allowed to create an owner access
response = client.post(
f"/api/v1.0/mail-domains/{mail_domain.slug}/accesses/",
{
"user": str(other_user.id),
"role": enums.MailDomainRoleChoices.OWNER,
},
format="json",
)
# It should not be allowed to create an owner access
response = client.post(
f"/api/v1.0/mail-domains/{mail_domain.slug}/accesses/",
{
"user": str(other_user.id),
"role": enums.MailDomainRoleChoices.OWNER,
},
format="json",
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json() == {
@@ -127,38 +123,14 @@ def test_api_mail_domain__accesses_create_authenticated_administrator():
# It should be allowed to create a lower access
for role in [enums.MailDomainRoleChoices.ADMIN, enums.MailDomainRoleChoices.VIEWER]:
other_user = core_factories.UserFactory()
with responses.RequestsMock() as rsps:
if role != enums.MailDomainRoleChoices.VIEWER:
# viewers don't have allows in dimail
rsps.add(
rsps.POST,
re.compile(r".*/users/"),
body=str(
{
"name": str(other_user.sub),
"is_admin": "false",
"uuid": "71f60d74-a3ad-46bc-bc2b-20d79a2e36fb",
"perms": [],
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
rsps.add(
rsps.POST,
re.compile(r".*/allows/"),
body=str({"user": other_user.sub, "domain": str(mail_domain.name)}),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
response = client.post(
f"/api/v1.0/mail-domains/{mail_domain.slug}/accesses/",
{
"user": str(other_user.id),
"role": role,
},
format="json",
)
response = client.post(
f"/api/v1.0/mail-domains/{mail_domain.slug}/accesses/",
{
"user": str(other_user.id),
"role": role,
},
format="json",
)
assert response.status_code == status.HTTP_201_CREATED
new_mail_domain_access = models.MailDomainAccess.objects.filter(
user=other_user
@@ -182,37 +154,15 @@ def test_api_mail_domain__accesses_create_authenticated_owner():
client = APIClient()
client.force_login(authenticated_user)
with responses.RequestsMock() as rsps:
if role != enums.MailDomainRoleChoices.VIEWER:
rsps.add(
rsps.POST,
re.compile(r".*/users/"),
body=str(
{
"name": str(other_user.sub),
"is_admin": "false",
"uuid": "71f60d74-a3ad-46bc-bc2b-20d79a2e36fb",
"perms": [],
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
rsps.add(
rsps.POST,
re.compile(r".*/allows/"),
body=str({"user": other_user.sub, "domain": str(mail_domain.name)}),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
response = client.post(
f"/api/v1.0/mail-domains/{mail_domain.slug}/accesses/",
{
"user": str(other_user.id),
"role": role,
},
format="json",
)
response = client.post(
f"/api/v1.0/mail-domains/{mail_domain.slug}/accesses/",
{
"user": str(other_user.id),
"role": role,
},
format="json",
)
assert response.status_code == status.HTTP_201_CREATED
assert models.MailDomainAccess.objects.filter(user=other_user).count() == 1
@@ -221,146 +171,3 @@ def test_api_mail_domain__accesses_create_authenticated_owner():
).get()
assert response.json()["id"] == str(new_mail_domain_access.id)
assert response.json()["role"] == role
## INTEROP WITH DIMAIL
def test_api_mail_domains_accesses__create_dimail_allows(caplog):
"""
Creating a domain access on our API should trigger a request to create an access on dimail too.
"""
caplog.set_level(logging.INFO)
authenticated_user = core_factories.UserFactory()
domain = factories.MailDomainFactory(status="enabled")
factories.MailDomainAccessFactory(
domain=domain, user=authenticated_user, role=enums.MailDomainRoleChoices.OWNER
)
client = APIClient()
client.force_login(authenticated_user)
allowed_user = core_factories.UserFactory()
with responses.RequestsMock() as rsps:
rsps.add(
rsps.POST,
re.compile(r".*/users/"),
body=str(
{
"name": str(allowed_user.sub),
"is_admin": "false",
"uuid": "71f60d74-a3ad-46bc-bc2b-20d79a2e36fb",
"perms": [],
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
rsps.add(
rsps.POST,
re.compile(r".*/allows/"),
body=str({"user": allowed_user.sub, "domain": str(domain.name)}),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
response = client.post(
f"/api/v1.0/mail-domains/{domain.slug}/accesses/",
{
"user": str(allowed_user.id),
"role": enums.MailDomainRoleChoices.ADMIN,
},
format="json",
)
assert response.status_code == status.HTTP_201_CREATED
log_messages = [msg.message for msg in caplog.records]
# check logs
assert (
f'[DIMAIL] User "{allowed_user.sub}" successfully created on dimail'
in log_messages
)
assert (
f'[DIMAIL] Permissions granted for user "{allowed_user.sub}" on domain {domain.name}.'
in log_messages
)
def test_api_mail_domains_accesses__dont_create_dimail_allows_for_viewer():
"""Dimail should not be called when creating an access to a simple viewer."""
authenticated_user = core_factories.UserFactory()
domain = factories.MailDomainFactory(status="enabled")
factories.MailDomainAccessFactory(
domain=domain, user=authenticated_user, role=enums.MailDomainRoleChoices.OWNER
)
client = APIClient()
client.force_login(authenticated_user)
allowed_user = core_factories.UserFactory()
with responses.RequestsMock():
# No call expected
response = client.post(
f"/api/v1.0/mail-domains/{domain.slug}/accesses/",
{
"user": str(allowed_user.id),
"role": enums.MailDomainRoleChoices.VIEWER,
},
format="json",
)
assert response.status_code == status.HTTP_201_CREATED
def test_api_mail_domains_accesses__user_already_on_dimail(caplog):
"""The expected allow should be created when an user already exists on dimail
(i.e. previous admin/owner of same domain or current on another domain)."""
caplog.set_level(logging.INFO)
authenticated_user = core_factories.UserFactory()
domain = factories.MailDomainFactory()
factories.MailDomainAccessFactory(
domain=domain, user=authenticated_user, role=enums.MailDomainRoleChoices.OWNER
)
client = APIClient()
client.force_login(authenticated_user)
allowed_user = core_factories.UserFactory()
with responses.RequestsMock() as rsps:
# No call expected
rsps.add(
rsps.POST,
re.compile(r".*/users/"),
body=str(
{"detail": "User already exists"}
), # the user is already on dimail
status=status.HTTP_409_CONFLICT,
content_type="application/json",
)
rsps.add(
rsps.POST,
re.compile(r".*/allows/"),
body=str({"user": allowed_user.sub, "domain": str(domain.name)}),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
response = client.post(
f"/api/v1.0/mail-domains/{domain.slug}/accesses/",
{
"user": str(allowed_user.id),
"role": enums.MailDomainRoleChoices.ADMIN,
},
format="json",
)
assert response.status_code == status.HTTP_201_CREATED
# check logs
log_messages = [msg.message for msg in caplog.records]
assert (
f'[DIMAIL] Attempt to create user "{allowed_user.sub}" which already exists.'
in log_messages
)
assert (
f'[DIMAIL] Permissions granted for user "{allowed_user.sub}" on domain {domain.name}.'
in log_messages
)

View File

@@ -765,8 +765,7 @@ def test_api_mailboxes__handling_dimail_unexpected_error(caplog):
@mock.patch.object(Logger, "info")
def test_api_mailboxes__send_correct_logger_infos(mock_info, mock_error):
"""
Upon requesting mailbox creation, la régie should impersonate
querying user in dimail and log things correctly.
Upon requesting mailbox creation, logs should report request user.
"""
access = factories.MailDomainAccessFactory(role=enums.MailDomainRoleChoices.OWNER)
@@ -802,9 +801,6 @@ def test_api_mailboxes__send_correct_logger_infos(mock_info, mock_error):
)
assert response.status_code == status.HTTP_201_CREATED
# user sub is sent to payload as a parameter
assert rsps.calls[0].request.params == {"username": access.user.sub}
# Logger
assert not mock_error.called
# Check all expected log messages are present, order doesn't matter

View File

@@ -2,21 +2,17 @@
Unit tests for the Mail Domain Invitation model
"""
import re
import time
from django.conf import settings
from django.core import exceptions
import pytest
import responses
from freezegun import freeze_time
from rest_framework import status
from core import factories as core_factories
from mailbox_manager import enums, factories, models
from mailbox_manager.tests.fixtures import dimail
pytestmark = pytest.mark.django_db
@@ -63,38 +59,22 @@ def test_models_domain_invitation__should_convert_invitations_to_accesses_upon_j
domain=invitation_to_domain2.domain
)
new_user = core_factories.UserFactory.build(email=email)
with responses.RequestsMock() as rsps:
rsps.add(
rsps.POST,
re.compile(r".*/users/"),
body=dimail.response_user_created("sub"),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
rsps.add(
rsps.POST,
re.compile(r".*/allows/"),
body=dimail.response_allows_created(
"sub", invitation_to_domain1.domain.name
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
new_user = core_factories.UserFactory(email=email)
new_user = core_factories.UserFactory(email=email)
assert models.MailDomainAccess.objects.filter(
domain=invitation_to_domain1.domain, user=new_user
).exists()
assert not models.MailDomainInvitation.objects.filter(
domain=invitation_to_domain1.domain, email=email
).exists() # invitation "consumed"
assert models.MailDomainAccess.objects.filter(
domain=invitation_to_domain2.domain, user=new_user
).exists()
assert not models.MailDomainInvitation.objects.filter(
domain=invitation_to_domain1.domain, email=email
).exists() # invitation "consumed"
assert not models.MailDomainInvitation.objects.filter(
domain=invitation_to_domain2.domain, email=email
).exists() # invitation "consumed"
assert models.MailDomainInvitation.objects.filter(
domain=expired_invitation.domain, email=email
).exists() # expired invitation remains