(dimail) automate allows requests to dimail

Automatically send user creation and allow creation requests to dimail
upon creating domain access.
This commit is contained in:
Marie PUPO JEAMMET
2024-11-20 19:14:48 +01:00
committed by Marie
parent cf155dc033
commit 54120eb179
6 changed files with 384 additions and 42 deletions

View File

@@ -10,6 +10,7 @@ and this project adheres to
### Added
- ✨(dimail) automate allows requests to dimail
- ✨(contacts) add notes & force full_name #565
### Changed

View File

@@ -99,7 +99,7 @@ class MailDomainSerializer(serializers.ModelSerializer):
client.create_domain(validated_data["name"], self.context["request"].user.sub)
# no exception raised ? Then actually save domain on our database
return models.MailDomain.objects.create(**validated_data)
return super().create(validated_data)
class MailDomainAccessSerializer(serializers.ModelSerializer):
@@ -169,6 +169,7 @@ class MailDomainAccessSerializer(serializers.ModelSerializer):
raise exceptions.PermissionDenied(
"You are not allowed to manage accesses for this domain."
)
# only an owner can set an owner role to another user
if (
role == enums.MailDomainRoleChoices.OWNER
@@ -183,6 +184,23 @@ class MailDomainAccessSerializer(serializers.ModelSerializer):
)
return attrs
def create(self, validated_data):
"""
Override create function to fire requests to dimail on access creation.
"""
dimail = DimailAPIClient()
if validated_data["role"] in [
enums.MailDomainRoleChoices.ADMIN,
enums.MailDomainRoleChoices.OWNER,
]:
dimail.create_user(validated_data["user"].sub)
dimail.create_allow(
validated_data["user"].sub, validated_data["domain"].name
)
return super().create(validated_data)
class MailDomainAccessReadOnlySerializer(MailDomainAccessSerializer):
"""Serialize mail domain access for list and retrieve actions."""

View File

@@ -51,10 +51,12 @@ class MailDomainViewSet(
"""Set the current user as owner of the newly created mail domain."""
domain = serializer.save()
models.MailDomainAccess.objects.create(
user=self.request.user,
domain=domain,
role=core_models.RoleChoices.OWNER,
serializers.MailDomainAccessSerializer().create(
validated_data={
"user": self.request.user,
"domain": domain,
"role": str(core_models.RoleChoices.OWNER),
}
)

View File

@@ -2,9 +2,8 @@
Tests for MailDomains API endpoint in People's app mailbox_manager. Focus on "create" action.
"""
import logging
import re
from logging import Logger
from unittest import mock
import pytest
import responses
@@ -77,6 +76,27 @@ def test_api_mail_domains__create_authenticated():
status=status.HTTP_201_CREATED,
content_type="application/json",
)
rsps.add(
rsps.POST,
re.compile(r".*/users/"),
body=str(
{
"name": "request-user-sub",
"is_admin": "false",
"uuid": "user-uuid-on-dimail",
"perms": [],
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
rsps.add(
rsps.POST,
re.compile(r".*/allows/"),
body=str({"user": "request-user-sub", "domain": str(domain_name)}),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
response = client.post(
"/api/v1.0/mail-domains/",
{"name": domain_name, "context": "null", "features": ["webmail"]},
@@ -103,13 +123,13 @@ def test_api_mail_domains__create_authenticated():
## SYNC TO DIMAIL
@mock.patch.object(Logger, "info")
def test_api_mail_domains__create_dimail_domain(mock_info):
def test_api_mail_domains__create_dimail_domain(caplog):
"""
Creating a domain should trigger a call to create a domain on dimail too.
"""
user = core_factories.UserFactory()
caplog.set_level(logging.INFO)
user = core_factories.UserFactory()
client = APIClient()
client.force_login(user)
domain_name = "test.fr"
@@ -126,6 +146,27 @@ def test_api_mail_domains__create_dimail_domain(mock_info):
status=status.HTTP_201_CREATED,
content_type="application/json",
)
rsps.add(
rsps.POST,
re.compile(r".*/users/"),
body=str(
{
"name": "request-user-sub",
"is_admin": "false",
"uuid": "user-uuid-on-dimail",
"perms": [],
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
rsps.add(
rsps.POST,
re.compile(r".*/allows/"),
body=str({"user": "request-user-sub", "domain": str(domain_name)}),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
response = client.post(
"/api/v1.0/mail-domains/",
{
@@ -138,13 +179,18 @@ def test_api_mail_domains__create_dimail_domain(mock_info):
assert rsp.call_count == 1 # endpoint was called
# Logger
assert len(caplog.records) == 4 # should be 3. Last empty info still here.
assert (
mock_info.call_count == 2
) # should be 1. A new empty info has been added. To be investigated
assert mock_info.call_args_list[0][0] == (
"Domain %s successfully created on dimail by user %s",
domain_name,
user.sub,
caplog.records[0].message
== f"Domain {domain_name} successfully created on dimail by user {user.sub}"
)
assert (
caplog.records[1].message
== f'[DIMAIL] User "{user.sub}" successfully created on dimail'
)
assert (
caplog.records[2].message
== f'[DIMAIL] Permissions granted for user "{user.sub}" on domain {domain_name}.'
)

View File

@@ -2,9 +2,12 @@
Test for mail domain accesses API endpoints in People's core app : create
"""
import logging
import random
import re
import pytest
import responses
from rest_framework import status
from rest_framework.test import APIClient
@@ -105,15 +108,16 @@ def test_api_mail_domain__accesses_create_authenticated_administrator():
client = APIClient()
client.force_login(authenticated_user)
# It should not be allowed to create an owner access
response = client.post(
f"/api/v1.0/mail-domains/{mail_domain.slug}/accesses/",
{
"user": str(other_user.id),
"role": enums.MailDomainRoleChoices.OWNER,
},
format="json",
)
with responses.RequestsMock() as rsps:
# It should not be allowed to create an owner access
response = client.post(
f"/api/v1.0/mail-domains/{mail_domain.slug}/accesses/",
{
"user": str(other_user.id),
"role": enums.MailDomainRoleChoices.OWNER,
},
format="json",
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json() == {
@@ -123,14 +127,38 @@ def test_api_mail_domain__accesses_create_authenticated_administrator():
# It should be allowed to create a lower access
for role in [enums.MailDomainRoleChoices.ADMIN, enums.MailDomainRoleChoices.VIEWER]:
other_user = core_factories.UserFactory()
response = client.post(
f"/api/v1.0/mail-domains/{mail_domain.slug}/accesses/",
{
"user": str(other_user.id),
"role": role,
},
format="json",
)
with responses.RequestsMock() as rsps:
if role != enums.MailDomainRoleChoices.VIEWER:
# viewers don't have allows in dimail
rsps.add(
rsps.POST,
re.compile(r".*/users/"),
body=str(
{
"name": str(other_user.sub),
"is_admin": "false",
"uuid": "71f60d74-a3ad-46bc-bc2b-20d79a2e36fb",
"perms": [],
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
rsps.add(
rsps.POST,
re.compile(r".*/allows/"),
body=str({"user": other_user.sub, "domain": str(mail_domain.name)}),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
response = client.post(
f"/api/v1.0/mail-domains/{mail_domain.slug}/accesses/",
{
"user": str(other_user.id),
"role": role,
},
format="json",
)
assert response.status_code == status.HTTP_201_CREATED
new_mail_domain_access = models.MailDomainAccess.objects.filter(
user=other_user
@@ -150,19 +178,41 @@ def test_api_mail_domain__accesses_create_authenticated_owner():
users=[(authenticated_user, enums.MailDomainRoleChoices.OWNER)]
)
other_user = core_factories.UserFactory()
role = random.choice([role[0] for role in enums.MailDomainRoleChoices.choices])
client = APIClient()
client.force_login(authenticated_user)
response = client.post(
f"/api/v1.0/mail-domains/{mail_domain.slug}/accesses/",
{
"user": str(other_user.id),
"role": role,
},
format="json",
)
with responses.RequestsMock() as rsps:
if role != enums.MailDomainRoleChoices.VIEWER:
rsps.add(
rsps.POST,
re.compile(r".*/users/"),
body=str(
{
"name": str(other_user.sub),
"is_admin": "false",
"uuid": "71f60d74-a3ad-46bc-bc2b-20d79a2e36fb",
"perms": [],
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
rsps.add(
rsps.POST,
re.compile(r".*/allows/"),
body=str({"user": other_user.sub, "domain": str(mail_domain.name)}),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
response = client.post(
f"/api/v1.0/mail-domains/{mail_domain.slug}/accesses/",
{
"user": str(other_user.id),
"role": role,
},
format="json",
)
assert response.status_code == status.HTTP_201_CREATED
assert models.MailDomainAccess.objects.filter(user=other_user).count() == 1
@@ -171,3 +221,149 @@ def test_api_mail_domain__accesses_create_authenticated_owner():
).get()
assert response.json()["id"] == str(new_mail_domain_access.id)
assert response.json()["role"] == role
## INTEROP WITH DIMAIL
def test_api_mail_domains_accesses__create_dimail_allows(caplog):
"""
Creating a domain access on our API should trigger a request to create an access on dimail too.
"""
caplog.set_level(logging.INFO)
authenticated_user = core_factories.UserFactory()
domain = factories.MailDomainFactory(status="enabled")
factories.MailDomainAccessFactory(
domain=domain, user=authenticated_user, role=enums.MailDomainRoleChoices.OWNER
)
client = APIClient()
client.force_login(authenticated_user)
allowed_user = core_factories.UserFactory()
with responses.RequestsMock() as rsps:
rsps.add(
rsps.POST,
re.compile(r".*/users/"),
body=str(
{
"name": str(allowed_user.sub),
"is_admin": "false",
"uuid": "71f60d74-a3ad-46bc-bc2b-20d79a2e36fb",
"perms": [],
}
),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
rsps.add(
rsps.POST,
re.compile(r".*/allows/"),
body=str({"user": allowed_user.sub, "domain": str(domain.name)}),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
response = client.post(
f"/api/v1.0/mail-domains/{domain.slug}/accesses/",
{
"user": str(allowed_user.id),
"role": enums.MailDomainRoleChoices.ADMIN,
},
format="json",
)
assert response.status_code == status.HTTP_201_CREATED
# check logs
assert (
caplog.records[0].message
== f'[DIMAIL] User "{allowed_user.sub}" successfully created on dimail'
)
assert (
caplog.records[1].message
== f'[DIMAIL] Permissions granted for user "{allowed_user.sub}" on domain {domain.name}.'
)
def test_api_mail_domains_accesses__dont_create_dimail_allows_for_viewer(caplog):
"""Dimail should not be called when creating an access to a simple viewer."""
caplog.set_level(logging.INFO)
authenticated_user = core_factories.UserFactory()
domain = factories.MailDomainFactory(status="enabled")
factories.MailDomainAccessFactory(
domain=domain, user=authenticated_user, role=enums.MailDomainRoleChoices.OWNER
)
client = APIClient()
client.force_login(authenticated_user)
allowed_user = core_factories.UserFactory()
with responses.RequestsMock():
# No call expected
response = client.post(
f"/api/v1.0/mail-domains/{domain.slug}/accesses/",
{
"user": str(allowed_user.id),
"role": enums.MailDomainRoleChoices.VIEWER,
},
format="json",
)
assert response.status_code == status.HTTP_201_CREATED
# check logs
assert len(caplog.records) == 1 # should be 0, investigate this damn empty message
def test_api_mail_domains_accesses__user_already_on_dimail(caplog):
"""The expected allow should be created when an user already exists on dimail
(i.e. previous admin/owner of same domain or current on another domain)."""
caplog.set_level(logging.INFO)
authenticated_user = core_factories.UserFactory()
domain = factories.MailDomainFactory()
factories.MailDomainAccessFactory(
domain=domain, user=authenticated_user, role=enums.MailDomainRoleChoices.OWNER
)
client = APIClient()
client.force_login(authenticated_user)
allowed_user = core_factories.UserFactory()
with responses.RequestsMock() as rsps:
# No call expected
rsps.add(
rsps.POST,
re.compile(r".*/users/"),
body=str(
{"detail": "User already exists"}
), # the user is already on dimail
status=status.HTTP_409_CONFLICT,
content_type="application/json",
)
rsps.add(
rsps.POST,
re.compile(r".*/allows/"),
body=str({"user": allowed_user.sub, "domain": str(domain.name)}),
status=status.HTTP_201_CREATED,
content_type="application/json",
)
response = client.post(
f"/api/v1.0/mail-domains/{domain.slug}/accesses/",
{
"user": str(allowed_user.id),
"role": enums.MailDomainRoleChoices.ADMIN,
},
format="json",
)
assert response.status_code == status.HTTP_201_CREATED
# check logs
assert len(caplog.records) == 3 # should be 2, investigate this damn empty message
assert (
caplog.records[0].message
== f'[DIMAIL] Attempt to create user "{allowed_user.sub}" which already exists.'
)
assert (
caplog.records[1].message
== f'[DIMAIL] Permissions granted for user "{allowed_user.sub}" on domain {domain.name}.'
)

View File

@@ -157,6 +157,85 @@ class DimailAPIClient:
return self.raise_exception_for_unexpected_response(response)
def create_user(self, user_sub):
"""Send a request to dimail, to create a new user there."""
payload = {"name": user_sub, "password": "no", "is_admin": "false", "perms": []}
try:
response = session.post(
f"{self.API_URL}/users/",
headers={"Authorization": f"Basic {self.API_CREDENTIALS}"},
json=payload,
verify=True,
timeout=10,
)
except requests.exceptions.ConnectionError as error:
logger.error(
"Connection error while trying to reach %s.",
self.API_URL,
exc_info=error,
)
raise error
if response.status_code == status.HTTP_201_CREATED:
logger.info(
'[DIMAIL] User "%s" successfully created on dimail',
user_sub,
)
return response
if response.status_code == status.HTTP_409_CONFLICT:
logger.info(
'[DIMAIL] Attempt to create user "%s" which already exists.',
user_sub,
)
return response
return self.raise_exception_for_unexpected_response(response)
def create_allow(self, user_sub, domain_name):
"""Send a request to dimail for a new 'allow' between user and the domain."""
payload = {
"user": user_sub,
"domain": domain_name,
}
try:
response = session.post(
f"{self.API_URL}/allows/",
headers={"Authorization": f"Basic {self.API_CREDENTIALS}"},
json=payload,
verify=True,
timeout=10,
)
except requests.exceptions.ConnectionError as error:
logger.error(
"Connection error while trying to reach %s.",
self.API_URL,
exc_info=error,
)
raise error
if response.status_code == status.HTTP_201_CREATED:
logger.info(
'[DIMAIL] Permissions granted for user "%s" on domain %s.',
user_sub,
domain_name,
)
return response
if response.status_code == status.HTTP_409_CONFLICT:
logger.info(
'[DIMAIL] Attempt to create already existing permission between "%s" and "%s".',
user_sub,
domain_name,
)
return response
return self.raise_exception_for_unexpected_response(response)
def raise_exception_for_unexpected_response(self, response):
"""Raise error when encountering an unexpected error in dimail."""
try: