✨(domains) add periodic task to fetch domains status
Add celery crontab to check and update domains status. This task calls dimail API.
This commit is contained in:
@@ -10,6 +10,7 @@ and this project adheres to
|
|||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
|
||||||
|
- ✨(domains) add periodic tasks to fetch domain status
|
||||||
- 🧑💻(docker) add celery beat to manage periodic tasks
|
- 🧑💻(docker) add celery beat to manage periodic tasks
|
||||||
- ✨(organization) add metadata field #790
|
- ✨(organization) add metadata field #790
|
||||||
- ⬆️(nginx) bump nginx-unprivileged to 1.27 #797
|
- ⬆️(nginx) bump nginx-unprivileged to 1.27 #797
|
||||||
|
|||||||
65
src/backend/mailbox_manager/tasks.py
Normal file
65
src/backend/mailbox_manager/tasks.py
Normal file
@@ -0,0 +1,65 @@
|
|||||||
|
"""Mailbox manager tasks."""
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
|
import requests
|
||||||
|
from celery import Celery
|
||||||
|
from celery.schedules import crontab
|
||||||
|
from celery.utils.log import get_task_logger
|
||||||
|
|
||||||
|
from mailbox_manager import enums
|
||||||
|
from mailbox_manager.models import MailDomain
|
||||||
|
from mailbox_manager.utils.dimail import DimailAPIClient
|
||||||
|
from people.celery_app import app as celery_app
|
||||||
|
|
||||||
|
logger = get_task_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@celery_app.on_after_finalize.connect
|
||||||
|
def setup_periodic_tasks(sender: Celery, **kwargs):
|
||||||
|
"""Setup periodic tasks."""
|
||||||
|
sender.add_periodic_task(
|
||||||
|
crontab(hour="3", minute="45", day_of_week="1"),
|
||||||
|
fetch_domains_status_task.s(status=enums.MailDomainStatusChoices.ENABLED),
|
||||||
|
name="fetch_enabled_domains_every_monday_at_3_45",
|
||||||
|
serializer="json",
|
||||||
|
)
|
||||||
|
sender.add_periodic_task(
|
||||||
|
crontab(minute="0"), # Run at the start of every hour
|
||||||
|
fetch_domains_status_task.s(status=enums.MailDomainStatusChoices.PENDING),
|
||||||
|
name="fetch_pending_domains_every_hour",
|
||||||
|
serializer="json",
|
||||||
|
)
|
||||||
|
sender.add_periodic_task(
|
||||||
|
crontab(minute="30"), # Run at the 30th minute of every hour
|
||||||
|
fetch_domains_status_task.s(
|
||||||
|
status=enums.MailDomainStatusChoices.ACTION_REQUIRED
|
||||||
|
),
|
||||||
|
name="fetch_action_required_domains_every_hour",
|
||||||
|
serializer="json",
|
||||||
|
)
|
||||||
|
sender.add_periodic_task(
|
||||||
|
crontab(minute="45"), # Run at the 45th minute of every hour
|
||||||
|
fetch_domains_status_task.s(status=enums.MailDomainStatusChoices.FAILED),
|
||||||
|
name="fetch_failed_domains_every_hour",
|
||||||
|
serializer="json",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@celery_app.task
|
||||||
|
def fetch_domains_status_task(status: str):
|
||||||
|
"""Celery task to call dimail to check and update domains status."""
|
||||||
|
client = DimailAPIClient()
|
||||||
|
changed_domains = []
|
||||||
|
for domain in MailDomain.objects.filter(status=status):
|
||||||
|
old_status = domain.status
|
||||||
|
# wait 10 seconds between each domain treatment to avoid overloading dimail
|
||||||
|
time.sleep(10)
|
||||||
|
try:
|
||||||
|
client.fetch_domain_status(domain)
|
||||||
|
except requests.exceptions.HTTPError as err:
|
||||||
|
logger.error("Failed to fetch status for domain %s: %s", domain.name, err)
|
||||||
|
else:
|
||||||
|
if old_status != domain.status:
|
||||||
|
changed_domains.append(f"{domain.name} ({domain.status})")
|
||||||
|
return changed_domains
|
||||||
103
src/backend/mailbox_manager/tests/test_tasks.py
Normal file
103
src/backend/mailbox_manager/tests/test_tasks.py
Normal file
@@ -0,0 +1,103 @@
|
|||||||
|
"""
|
||||||
|
Unit tests for mailbox manager tasks.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import re
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import responses
|
||||||
|
|
||||||
|
from mailbox_manager import enums, factories, tasks
|
||||||
|
|
||||||
|
from .fixtures.dimail import CHECK_DOMAIN_BROKEN_INTERNAL, CHECK_DOMAIN_OK
|
||||||
|
|
||||||
|
pytestmark = pytest.mark.django_db
|
||||||
|
|
||||||
|
|
||||||
|
@responses.activate
|
||||||
|
def test_fetch_domain_status_task_success(): # pylint: disable=too-many-locals
|
||||||
|
"""Test fetch domain status from dimail task"""
|
||||||
|
|
||||||
|
domain_enabled1 = factories.MailDomainEnabledFactory()
|
||||||
|
domain_enabled2 = factories.MailDomainEnabledFactory()
|
||||||
|
domain_disabled = factories.MailDomainFactory(
|
||||||
|
status=enums.MailDomainStatusChoices.DISABLED
|
||||||
|
)
|
||||||
|
domain_failed = factories.MailDomainFactory(
|
||||||
|
status=enums.MailDomainStatusChoices.FAILED
|
||||||
|
)
|
||||||
|
|
||||||
|
body_content_ok1 = CHECK_DOMAIN_OK.copy()
|
||||||
|
body_content_ok1["name"] = domain_enabled1.name
|
||||||
|
|
||||||
|
body_content_broken = CHECK_DOMAIN_BROKEN_INTERNAL.copy()
|
||||||
|
body_content_broken["name"] = domain_enabled2.name
|
||||||
|
|
||||||
|
body_content_ok2 = CHECK_DOMAIN_OK.copy()
|
||||||
|
body_content_ok2["name"] = domain_disabled.name
|
||||||
|
|
||||||
|
body_content_ok3 = CHECK_DOMAIN_OK.copy()
|
||||||
|
body_content_ok3["name"] = domain_failed.name
|
||||||
|
for domain, body_content in [
|
||||||
|
(domain_enabled1, body_content_ok1),
|
||||||
|
(domain_enabled2, body_content_broken),
|
||||||
|
(domain_failed, body_content_ok3),
|
||||||
|
]:
|
||||||
|
# Mock dimail API with success response
|
||||||
|
responses.add(
|
||||||
|
responses.GET,
|
||||||
|
re.compile(rf".*/domains/{domain.name}/check/"),
|
||||||
|
body=json.dumps(body_content),
|
||||||
|
status=200,
|
||||||
|
content_type="application/json",
|
||||||
|
)
|
||||||
|
# domain_enabled2 is broken with internal error, we try to fix it
|
||||||
|
responses.add(
|
||||||
|
responses.GET,
|
||||||
|
re.compile(rf".*/domains/{domain_enabled2.name}/fix/"),
|
||||||
|
body=json.dumps(body_content_broken),
|
||||||
|
status=200,
|
||||||
|
content_type="application/json",
|
||||||
|
)
|
||||||
|
tasks.fetch_domains_status_task(enums.MailDomainStatusChoices.ENABLED)
|
||||||
|
tasks.fetch_domains_status_task(enums.MailDomainStatusChoices.FAILED)
|
||||||
|
tasks.fetch_domains_status_task(enums.MailDomainStatusChoices.ACTION_REQUIRED)
|
||||||
|
tasks.fetch_domains_status_task(enums.MailDomainStatusChoices.PENDING)
|
||||||
|
domain_enabled1.refresh_from_db()
|
||||||
|
domain_enabled2.refresh_from_db()
|
||||||
|
domain_disabled.refresh_from_db()
|
||||||
|
domain_failed.refresh_from_db()
|
||||||
|
# Nothing change for the first domain enable
|
||||||
|
assert domain_enabled1.status == enums.MailDomainStatusChoices.ENABLED
|
||||||
|
# Status of the second activated domain has changed to failure
|
||||||
|
assert domain_enabled2.status == enums.MailDomainStatusChoices.FAILED
|
||||||
|
# Status of the failed domain has changed to enabled
|
||||||
|
assert domain_failed.status == enums.MailDomainStatusChoices.ENABLED
|
||||||
|
# Disabled domain was excluded
|
||||||
|
assert domain_disabled.status == enums.MailDomainStatusChoices.DISABLED
|
||||||
|
|
||||||
|
|
||||||
|
@responses.activate
|
||||||
|
def test_fetch_domains_status_error_handling(caplog):
|
||||||
|
"""Test fetch domain status from dimail task with error"""
|
||||||
|
caplog.set_level("ERROR")
|
||||||
|
|
||||||
|
domain = factories.MailDomainEnabledFactory()
|
||||||
|
|
||||||
|
# Mock dimail API with error response
|
||||||
|
responses.add(
|
||||||
|
responses.GET,
|
||||||
|
re.compile(rf".*/domains/{domain.name}/check/"),
|
||||||
|
body=json.dumps({"error": "Internal Server Error"}),
|
||||||
|
status=500,
|
||||||
|
content_type="application/json",
|
||||||
|
)
|
||||||
|
|
||||||
|
tasks.fetch_domains_status_task(enums.MailDomainStatusChoices.ENABLED)
|
||||||
|
domain.refresh_from_db()
|
||||||
|
|
||||||
|
# Domain status should remain unchanged
|
||||||
|
assert domain.status == enums.MailDomainStatusChoices.ENABLED
|
||||||
|
# Check that error was logged
|
||||||
|
assert f"Failed to fetch status for domain {domain.name}" in caplog.text
|
||||||
@@ -1 +1,5 @@
|
|||||||
"""People module."""
|
"""People module."""
|
||||||
|
|
||||||
|
from .celery_app import app as celery_app
|
||||||
|
|
||||||
|
__all__ = ("celery_app",)
|
||||||
|
|||||||
@@ -220,12 +220,14 @@ class Base(Configuration):
|
|||||||
"admin.apps.PeopleAdminConfig", # replaces 'django.contrib.admin'
|
"admin.apps.PeopleAdminConfig", # replaces 'django.contrib.admin'
|
||||||
"core",
|
"core",
|
||||||
"demo",
|
"demo",
|
||||||
"mailbox_manager",
|
"mailbox_manager.apps.MailboxManagerConfig",
|
||||||
"mailbox_oauth2",
|
"mailbox_oauth2",
|
||||||
"drf_spectacular",
|
"drf_spectacular",
|
||||||
"drf_spectacular_sidecar", # required for Django collectstatic discovery
|
"drf_spectacular_sidecar", # required for Django collectstatic discovery
|
||||||
# Third party apps
|
# Third party apps
|
||||||
"corsheaders",
|
"corsheaders",
|
||||||
|
"django_celery_beat",
|
||||||
|
"django_celery_results",
|
||||||
"dockerflow.django",
|
"dockerflow.django",
|
||||||
"easy_thumbnails",
|
"easy_thumbnails",
|
||||||
"oauth2_provider",
|
"oauth2_provider",
|
||||||
@@ -331,7 +333,12 @@ class Base(Configuration):
|
|||||||
|
|
||||||
# Celery
|
# Celery
|
||||||
CELERY_BROKER_URL = values.Value("redis://redis:6379/0")
|
CELERY_BROKER_URL = values.Value("redis://redis:6379/0")
|
||||||
|
CELERY_RESULT_BACKEND = "django-db"
|
||||||
|
CELERY_CACHE_BACKEND = "django-cache"
|
||||||
CELERY_BROKER_TRANSPORT_OPTIONS = values.DictValue({})
|
CELERY_BROKER_TRANSPORT_OPTIONS = values.DictValue({})
|
||||||
|
CELERY_RESULT_EXTENDED = True
|
||||||
|
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 * 30 # 30 days
|
||||||
|
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
|
||||||
|
|
||||||
# Session
|
# Session
|
||||||
SESSION_ENGINE = "django.contrib.sessions.backends.cache"
|
SESSION_ENGINE = "django.contrib.sessions.backends.cache"
|
||||||
|
|||||||
@@ -29,6 +29,8 @@ dependencies = [
|
|||||||
"PyJWT==2.10.1",
|
"PyJWT==2.10.1",
|
||||||
"boto3==1.37.10",
|
"boto3==1.37.10",
|
||||||
"celery[redis]==5.4.0",
|
"celery[redis]==5.4.0",
|
||||||
|
"django-celery-beat==2.7.0",
|
||||||
|
"django-celery-results==2.5.1",
|
||||||
"django-configurations==2.5.1",
|
"django-configurations==2.5.1",
|
||||||
"django-cors-headers==4.7.0",
|
"django-cors-headers==4.7.0",
|
||||||
"django-countries==7.6.1",
|
"django-countries==7.6.1",
|
||||||
|
|||||||
Reference in New Issue
Block a user