diff --git a/src/backend/core/tasks/search.py b/src/backend/core/tasks/search.py index c9d36b15..9f3a8cf1 100644 --- a/src/backend/core/tasks/search.py +++ b/src/backend/core/tasks/search.py @@ -5,56 +5,50 @@ from logging import getLogger from django.conf import settings from django.core.cache import cache +from django_redis.cache import RedisCache + +from core import models +from core.services.search_indexers import ( + get_batch_accesses_by_users_and_teams, + get_document_indexer, +) + from impress.celery_app import app logger = getLogger(__file__) -def indexer_debounce_lock(document_id): - """Increase or reset counter""" - key = f"doc-indexer-debounce-{document_id}" +def indexer_throttle_acquire(document_id, timeout=0, atomic=True): + """ + Enable the task throttle flag for a delay. + Uses redis locks if available to ensure atomic changes + """ + key = f"doc-indexer-throttle-{document_id}" - try: - return cache.incr(key) - except ValueError: - cache.set(key, 1) - return 1 + if isinstance(cache, RedisCache) and atomic: + with cache.locks(key): + return indexer_throttle_acquire(document_id, timeout, atomic=False) - -def indexer_debounce_release(document_id): - """Decrease or reset counter""" - key = f"doc-indexer-debounce-{document_id}" - - try: - return cache.decr(key) - except ValueError: - cache.set(key, 0) - return 0 + # Use add() here: + # - sets the flag and returns True if the key does not exist + # - does nothing and returns False if the key already exists + return cache.add(key, 1, timeout=timeout) @app.task def document_indexer_task(document_id): """Celery Task : Sends indexation query for a document.""" - # Prevents some circular imports - # pylint: disable=import-outside-toplevel - from core import models # noqa : PLC0415 - from core.services.search_indexers import ( # noqa : PLC0415 - get_batch_accesses_by_users_and_teams, - get_document_indexer, - ) - - # check if the counter : if still up, skip the task. only the last one - # within the countdown delay will do the query. 
- if indexer_debounce_release(document_id) > 0: - logger.info("Skip document %s indexation", document_id) - return - indexer = get_document_indexer() if indexer is None: return - doc = models.Document.objects.get(pk=document_id) + try: + doc = models.Document.objects.get(pk=document_id) + except models.Document.DoesNotExist: + # Skip the task if the document does not exist. + return + accesses = get_batch_accesses_by_users_and_teams((doc.path,)) data = indexer.serialize_document(document=doc, accesses=accesses) @@ -76,14 +70,15 @@ def trigger_document_indexer(document): if not settings.SEARCH_INDEXER_CLASS: return - logger.info( - "Add task for document %s indexation in %.2f seconds", - document.pk, - countdown, - ) - - # Each time this method is called during the countdown, we increment the - # counter and each task decrease it, so the index be run only once. - indexer_debounce_lock(document.pk) + # Throttle: only the first call within the countdown window acquires the + # flag and queues a task; later calls are skipped until the flag expires. + if indexer_throttle_acquire(document.pk, timeout=countdown): + logger.info( + "Add task for document %s indexation in %.2f seconds", + document.pk, + countdown, + ) - document_indexer_task.apply_async(args=[document.pk], countdown=countdown) + document_indexer_task.apply_async(args=[document.pk]) + else: + logger.info("Skip task for document %s indexation", document.pk) diff --git a/src/backend/core/tests/documents/test_api_documents_search.py b/src/backend/core/tests/documents/test_api_documents_search.py index 0c46eb2b..869a8d56 100644 --- a/src/backend/core/tests/documents/test_api_documents_search.py +++ b/src/backend/core/tests/documents/test_api_documents_search.py @@ -93,6 +93,7 @@ def test_api_documents_search_endpoint_is_none(indexer_settings): "path": document.path, "title": document.title, "updated_at": document.updated_at.isoformat().replace("+00:00", "Z"), + "deleted_at": None, "user_role": access.role, } @@ -184,6 +185,7 @@ def test_api_documents_search_format(indexer_settings): "path": 
document.path, "title": document.title, "updated_at": document.updated_at.isoformat().replace("+00:00", "Z"), + "deleted_at": None, "user_role": access.role, } diff --git a/src/backend/core/tests/test_models_documents.py b/src/backend/core/tests/test_models_documents.py index 9a97f51c..0d8f44dd 100644 --- a/src/backend/core/tests/test_models_documents.py +++ b/src/backend/core/tests/test_models_documents.py @@ -22,6 +22,7 @@ import pytest from core import factories, models from core.services.search_indexers import SearchIndexer +from core.tasks.search import document_indexer_task pytestmark = pytest.mark.django_db @@ -1645,10 +1646,10 @@ def test_models_documents_post_save_indexer(mock_push, indexer_settings): key=itemgetter("id"), ) - # The debounce counters should be reset - assert cache.get(f"doc-indexer-debounce-{doc1.pk}") == 0 - assert cache.get(f"doc-indexer-debounce-{doc2.pk}") == 0 - assert cache.get(f"doc-indexer-debounce-{doc3.pk}") == 0 + # No throttle flag should remain set + assert cache.get(f"doc-indexer-throttle-{doc1.pk}") is None + assert cache.get(f"doc-indexer-throttle-{doc2.pk}") is None + assert cache.get(f"doc-indexer-throttle-{doc3.pk}") is None @mock.patch.object(SearchIndexer, "push") @@ -1658,10 +1659,31 @@ def test_models_documents_post_save_indexer_not_configured(mock_push, indexer_se indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0 indexer_settings.SEARCH_INDEXER_CLASS = None - with transaction.atomic(): - factories.DocumentFactory() + user = factories.UserFactory() - assert mock_push.call_args_list == [] + with transaction.atomic(): + doc = factories.DocumentFactory() + factories.UserDocumentAccessFactory(document=doc, user=user) + + assert mock_push.assert_not_called + + +@mock.patch.object(SearchIndexer, "push") +@pytest.mark.django_db(transaction=True) +def test_models_documents_post_save_indexer_wrongly_configured( + mock_push, indexer_settings +): + """Task should not start an indexation when disabled""" 
indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0 + indexer_settings.SEARCH_INDEXER_URL = None + + user = factories.UserFactory() + + with transaction.atomic(): + doc = factories.DocumentFactory() + factories.UserDocumentAccessFactory(document=doc, user=user) + + assert mock_push.assert_not_called @mock.patch.object(SearchIndexer, "push") @@ -1698,10 +1720,10 @@ def test_models_documents_post_save_indexer_with_accesses(mock_push, indexer_set key=itemgetter("id"), ) - # The debounce counters should be reset - assert cache.get(f"doc-indexer-debounce-{doc1.pk}") == 0 - assert cache.get(f"doc-indexer-debounce-{doc2.pk}") == 0 - assert cache.get(f"doc-indexer-debounce-{doc3.pk}") == 0 + # No throttle flag should remain set + assert cache.get(f"doc-indexer-throttle-{doc1.pk}") is None + assert cache.get(f"doc-indexer-throttle-{doc2.pk}") is None + assert cache.get(f"doc-indexer-throttle-{doc3.pk}") is None @mock.patch.object(SearchIndexer, "push") @@ -1760,10 +1782,34 @@ def test_models_documents_post_save_indexer_deleted(mock_push, indexer_settings) key=itemgetter("id"), ) - # The debounce counters should be reset - assert cache.get(f"doc-indexer-debounce-{doc.pk}") == 0 - assert cache.get(f"doc-indexer-debounce-{doc_deleted.pk}") == 0 - assert cache.get(f"doc-indexer-debounce-{doc_ancestor_deleted.pk}") == 0 + # No throttle flag should remain set + assert cache.get(f"doc-indexer-throttle-{doc.pk}") is None + assert cache.get(f"doc-indexer-throttle-{doc_deleted.pk}") is None + assert cache.get(f"doc-indexer-throttle-{doc_ancestor_deleted.pk}") is None + + +@pytest.mark.django_db(transaction=True) +def test_models_documents_indexer_hard_deleted(indexer_settings): + """Indexation task on hard deleted document""" + indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0 + + user = factories.UserFactory() + + with transaction.atomic(): + doc = factories.DocumentFactory( + link_reach=models.LinkReachChoices.AUTHENTICATED + ) + factories.UserDocumentAccessFactory(document=doc, 
user=user) + + doc_id = doc.pk + doc.delete() + + # Call task on deleted document. + document_indexer_task.apply(args=[doc_id]) + + with mock.patch.object(SearchIndexer, "push") as mock_push: + # Hard-deleted documents are not re-indexed. + assert mock_push.assert_not_called @mock.patch.object(SearchIndexer, "push") @@ -1836,7 +1882,7 @@ def test_models_documents_post_save_indexer_restored(mock_push, indexer_settings @pytest.mark.django_db(transaction=True) -def test_models_documents_post_save_indexer_debounce(indexer_settings): +def test_models_documents_post_save_indexer_throttle(indexer_settings): """Test indexation task skipping on document update""" indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0 @@ -1853,11 +1899,11 @@ def test_models_documents_post_save_indexer_debounce(indexer_settings): } with mock.patch.object(SearchIndexer, "push") as mock_push: - # Simulate 1 waiting task - cache.set(f"doc-indexer-debounce-{doc.pk}", 1) + # Simulate an already-acquired throttle flag + cache.set(f"doc-indexer-throttle-{doc.pk}", 1) # save doc to trigger the indexer, but nothing should be done since - # the counter is over 0 + # the flag is up with transaction.atomic(): doc.save() @@ -1865,7 +1911,7 @@ def test_models_documents_post_save_indexer_debounce(indexer_settings): with mock.patch.object(SearchIndexer, "push") as mock_push: # No waiting task - cache.set(f"doc-indexer-debounce-{doc.pk}", 0) + cache.delete(f"doc-indexer-throttle-{doc.pk}") with transaction.atomic(): doc = models.Document.objects.get(pk=doc.pk)