diff --git a/src/backend/core/models.py b/src/backend/core/models.py index 424d6b89..453e683f 100644 --- a/src/backend/core/models.py +++ b/src/backend/core/models.py @@ -906,7 +906,8 @@ class Document(MP_Node, BaseModel): # Mark all descendants as soft deleted self.get_descendants().filter(ancestors_deleted_at__isnull=True).update( - ancestors_deleted_at=self.ancestors_deleted_at + ancestors_deleted_at=self.ancestors_deleted_at, + updated_at=self.updated_at, ) @transaction.atomic diff --git a/src/backend/core/services/search_indexers.py b/src/backend/core/services/search_indexers.py index abaa4d45..45184f55 100644 --- a/src/backend/core/services/search_indexers.py +++ b/src/backend/core/services/search_indexers.py @@ -130,16 +130,17 @@ class BaseDocumentIndexer(ABC): "SEARCH_INDEXER_QUERY_URL must be set in Django settings." ) - def index(self): + def index(self, queryset=None): """ Fetch documents in batches, serialize them, and push to the search backend. """ last_id = 0 count = 0 + queryset = queryset or models.Document.objects.all() while True: documents_batch = list( - models.Document.objects.filter( + queryset.filter( id__gt=last_id, ).order_by("id")[: self.batch_size] ) diff --git a/src/backend/core/signals.py b/src/backend/core/signals.py index 1aa95f44..4bd96477 100644 --- a/src/backend/core/signals.py +++ b/src/backend/core/signals.py @@ -9,7 +9,7 @@ from django.db.models import signals from django.dispatch import receiver from . import models -from .tasks.search import trigger_document_indexer +from .tasks.search import trigger_batch_document_indexer @receiver(signals.post_save, sender=models.Document) @@ -19,7 +19,7 @@ def document_post_save(sender, instance, **kwargs): # pylint: disable=unused-ar Note : Within the transaction we can have an empty content and a serialization error. """ - transaction.on_commit(partial(trigger_document_indexer, instance)) + transaction.on_commit(partial(trigger_batch_document_indexer, instance)) @receiver(signals.post_save, sender=models.DocumentAccess) @@ -28,4 +28,6 @@ def document_access_post_save(sender, instance, created, **kwargs): # pylint: d Asynchronous call to the document indexer at the end of the transaction. """ if not created: - transaction.on_commit(partial(trigger_document_indexer, instance.document)) + transaction.on_commit( + partial(trigger_batch_document_indexer, instance.document) + ) diff --git a/src/backend/core/tasks/search.py b/src/backend/core/tasks/search.py index 9f3a8cf1..ebdf6c15 100644 --- a/src/backend/core/tasks/search.py +++ b/src/backend/core/tasks/search.py @@ -4,12 +4,12 @@ from logging import getLogger from django.conf import settings from django.core.cache import cache +from django.db.models import Q from django_redis.cache import RedisCache from core import models from core.services.search_indexers import ( - get_batch_accesses_by_users_and_teams, get_document_indexer, ) @@ -18,23 +18,6 @@ from impress.celery_app import app logger = getLogger(__file__) -def indexer_throttle_acquire(document_id, timeout=0, atomic=True): - """ - Enable the task throttle flag for a delay. - Uses redis locks if available to ensure atomic changes - """ - key = f"doc-indexer-throttle-{document_id}" - - if isinstance(cache, RedisCache) and atomic: - with cache.locks(key): - return indexer_throttle_acquire(document_id, timeout, atomic=False) - - # Use add() here : - # - set the flag and returns true if not exist - # - do nothing and return false if exist - return cache.add(key, 1, timeout=timeout) - - @app.task def document_indexer_task(document_id): """Celery Task : Sends indexation query for a document.""" @@ -43,42 +26,72 @@ def document_indexer_task(document_id): if indexer is None: return - try: - doc = models.Document.objects.get(pk=document_id) - except models.Document.DoesNotExist: - # Skip the task if the document does not exist. - return - - accesses = get_batch_accesses_by_users_and_teams((doc.path,)) - - data = indexer.serialize_document(document=doc, accesses=accesses) - logger.info("Start document %s indexation", document_id) - indexer.push(data) + indexer.index(models.Document.objects.filter(pk=document_id)) -def trigger_document_indexer(document): +def batch_indexer_throttle_acquire(timeout: int = 0, atomic: bool = True): + """ + Enable the task throttle flag for a delay. + Uses redis locks if available to ensure atomic changes + """ + key = "document-batch-indexer-throttle" + + # Redis is used as cache database (not in tests). Use the lock feature here + # to ensure atomicity of changes to the throttle flag. + if isinstance(cache, RedisCache) and atomic: + with cache.locks(key): + return batch_indexer_throttle_acquire(timeout, atomic=False) + + # Use add() here : + # - set the flag and returns true if not exist + # - do nothing and return false if exist + return cache.add(key, 1, timeout=timeout) + + +@app.task +def batch_document_indexer_task(timestamp): + """Celery Task : Sends indexation query for a batch of documents.""" + indexer = get_document_indexer() + + if indexer: + queryset = models.Document.objects.filter( + Q(updated_at__gte=timestamp) + | Q(deleted_at__gte=timestamp) + | Q(ancestors_deleted_at__gte=timestamp) + ) + + count = indexer.index(queryset) + logger.info("Indexed %d documents", count) + + +def trigger_batch_document_indexer(item): """ Trigger indexation task with debounce a delay set by the SEARCH_INDEXER_COUNTDOWN setting. Args: document (Document): The document instance. """ - countdown = settings.SEARCH_INDEXER_COUNTDOWN + countdown = int(settings.SEARCH_INDEXER_COUNTDOWN) # DO NOT create a task if indexation if disabled if not settings.SEARCH_INDEXER_CLASS: return - # Each time this method is called during a countdown, we increment the - # counter and each task decrease it, so the index be run only once. - if indexer_throttle_acquire(document.pk, timeout=countdown): - logger.info( - "Add task for document %s indexation in %.2f seconds", - document.pk, - countdown, - ) + if countdown > 0: + # Each time this method is called during a countdown, we increment the + # counter and each task decrease it, so the index be run only once. + if batch_indexer_throttle_acquire(timeout=countdown): + logger.info( + "Add task for batch document indexation from updated_at=%s in %d seconds", + item.updated_at.isoformat(), + countdown, + ) - document_indexer_task.apply_async(args=[document.pk]) + batch_document_indexer_task.apply_async( + args=[item.updated_at], countdown=countdown + ) + else: + logger.info("Skip task for batch document %s indexation", item.pk) else: - logger.info("Skip task for document %s indexation", document.pk) + document_indexer_task.apply(args=[item.pk]) diff --git a/src/backend/core/tests/conftest.py b/src/backend/core/tests/conftest.py index ba607b42..65e39269 100644 --- a/src/backend/core/tests/conftest.py +++ b/src/backend/core/tests/conftest.py @@ -45,6 +45,7 @@ def indexer_settings_fixture(settings): settings.SEARCH_INDEXER_QUERY_URL = ( "http://localhost:8081/api/v1.0/documents/search/" ) + settings.SEARCH_INDEXER_COUNTDOWN = 1 yield settings diff --git a/src/backend/core/tests/test_models_documents.py b/src/backend/core/tests/test_models_documents.py index 0d8f44dd..91b8abf9 100644 --- a/src/backend/core/tests/test_models_documents.py +++ b/src/backend/core/tests/test_models_documents.py @@ -6,7 +6,6 @@ Unit tests for the Document model import random import smtplib from logging import Logger -from operator import itemgetter from unittest import mock from django.contrib.auth.models import AnonymousUser @@ -14,15 +13,12 @@ from django.core import mail from django.core.cache import cache from django.core.exceptions import ValidationError from django.core.files.storage import default_storage -from django.db import transaction from django.test.utils import override_settings from django.utils import timezone import pytest from core import factories, models -from core.services.search_indexers import SearchIndexer -from core.tasks.search import document_indexer_task pytestmark = pytest.mark.django_db @@ -1621,330 +1617,3 @@ def test_models_documents_compute_ancestors_links_paths_mapping_structure( {"link_reach": sibling.link_reach, "link_role": sibling.link_role}, ], } - - -@mock.patch.object(SearchIndexer, "push") -@pytest.mark.django_db(transaction=True) -def test_models_documents_post_save_indexer(mock_push, indexer_settings): - """Test indexation task on document creation""" - indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0 - - with transaction.atomic(): - doc1, doc2, doc3 = factories.DocumentFactory.create_batch(3) - - accesses = {} - data = [call.args[0] for call in mock_push.call_args_list] - - indexer = SearchIndexer() - - assert sorted(data, key=itemgetter("id")) == sorted( - [ - indexer.serialize_document(doc1, accesses), - indexer.serialize_document(doc2, accesses), - indexer.serialize_document(doc3, accesses), - ], - key=itemgetter("id"), - ) - - # The throttle counters should be reset - assert cache.get(f"doc-indexer-throttle-{doc1.pk}") is None - assert cache.get(f"doc-indexer-throttle-{doc2.pk}") is None - assert cache.get(f"doc-indexer-throttle-{doc3.pk}") is None - - -@mock.patch.object(SearchIndexer, "push") -@pytest.mark.django_db(transaction=True) -def test_models_documents_post_save_indexer_not_configured(mock_push, indexer_settings): - """Task should not start an indexation when disabled""" - indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0 - indexer_settings.SEARCH_INDEXER_CLASS = None - - user = factories.UserFactory() - - with transaction.atomic(): - doc = factories.DocumentFactory() - factories.UserDocumentAccessFactory(document=doc, user=user) - - assert mock_push.assert_not_called - - -@mock.patch.object(SearchIndexer, "push") -@pytest.mark.django_db(transaction=True) -def test_models_documents_post_save_indexer_wrongly_configured( - mock_push, indexer_settings -): - """Task should not start an indexation when disabled""" - indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0 - indexer_settings.SEARCH_INDEXER_URL = None - - user = factories.UserFactory() - - with transaction.atomic(): - doc = factories.DocumentFactory() - factories.UserDocumentAccessFactory(document=doc, user=user) - - assert mock_push.assert_not_called - - -@mock.patch.object(SearchIndexer, "push") -@pytest.mark.django_db(transaction=True) -def test_models_documents_post_save_indexer_with_accesses(mock_push, indexer_settings): - """Test indexation task on document creation""" - indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0 - - user = factories.UserFactory() - - with transaction.atomic(): - doc1, doc2, doc3 = factories.DocumentFactory.create_batch(3) - - factories.UserDocumentAccessFactory(document=doc1, user=user) - factories.UserDocumentAccessFactory(document=doc2, user=user) - factories.UserDocumentAccessFactory(document=doc3, user=user) - - accesses = { - str(doc1.path): {"users": [user.sub]}, - str(doc2.path): {"users": [user.sub]}, - str(doc3.path): {"users": [user.sub]}, - } - - data = [call.args[0] for call in mock_push.call_args_list] - - indexer = SearchIndexer() - - assert sorted(data, key=itemgetter("id")) == sorted( - [ - indexer.serialize_document(doc1, accesses), - indexer.serialize_document(doc2, accesses), - indexer.serialize_document(doc3, accesses), - ], - key=itemgetter("id"), - ) - - # The throttle counters should be reset - assert cache.get(f"doc-indexer-throttle-{doc1.pk}") is None - assert cache.get(f"doc-indexer-throttle-{doc2.pk}") is None - assert cache.get(f"doc-indexer-throttle-{doc3.pk}") is None - - -@mock.patch.object(SearchIndexer, "push") -@pytest.mark.django_db(transaction=True) -def test_models_documents_post_save_indexer_deleted(mock_push, indexer_settings): - """Indexation task on deleted or ancestor_deleted documents""" - indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0 - - user = factories.UserFactory() - - with transaction.atomic(): - doc = factories.DocumentFactory( - link_reach=models.LinkReachChoices.AUTHENTICATED - ) - doc_deleted = factories.DocumentFactory( - link_reach=models.LinkReachChoices.AUTHENTICATED - ) - doc_ancestor_deleted = factories.DocumentFactory( - parent=doc_deleted, - link_reach=models.LinkReachChoices.AUTHENTICATED, - ) - doc_deleted.soft_delete() - doc_ancestor_deleted.ancestors_deleted_at = doc_deleted.deleted_at - - factories.UserDocumentAccessFactory(document=doc, user=user) - factories.UserDocumentAccessFactory(document=doc_deleted, user=user) - factories.UserDocumentAccessFactory(document=doc_ancestor_deleted, user=user) - - doc_deleted.refresh_from_db() - doc_ancestor_deleted.refresh_from_db() - - assert doc_deleted.deleted_at is not None - assert doc_deleted.ancestors_deleted_at is not None - - assert doc_ancestor_deleted.deleted_at is None - assert doc_ancestor_deleted.ancestors_deleted_at is not None - - accesses = { - str(doc.path): {"users": [user.sub]}, - str(doc_deleted.path): {"users": [user.sub]}, - str(doc_ancestor_deleted.path): {"users": [user.sub]}, - } - - data = [call.args[0] for call in mock_push.call_args_list] - - indexer = SearchIndexer() - - # Even deleted document are re-indexed : only update their status in the future ? - assert sorted(data, key=itemgetter("id")) == sorted( - [ - indexer.serialize_document(doc, accesses), - indexer.serialize_document(doc_deleted, accesses), - indexer.serialize_document(doc_ancestor_deleted, accesses), - indexer.serialize_document(doc_deleted, accesses), # soft_delete() - ], - key=itemgetter("id"), - ) - - # The throttle counters should be reset - assert cache.get(f"doc-indexer-throttle-{doc.pk}") is None - assert cache.get(f"doc-indexer-throttle-{doc_deleted.pk}") is None - assert cache.get(f"doc-indexer-throttle-{doc_ancestor_deleted.pk}") is None - - -@pytest.mark.django_db(transaction=True) -def test_models_documents_indexer_hard_deleted(indexer_settings): - """Indexation task on hard deleted document""" - indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0 - - user = factories.UserFactory() - - with transaction.atomic(): - doc = factories.DocumentFactory( - link_reach=models.LinkReachChoices.AUTHENTICATED - ) - factories.UserDocumentAccessFactory(document=doc, user=user) - - doc_id = doc.pk - doc.delete() - - # Call task on deleted document. - document_indexer_task.apply(args=[doc_id]) - - with mock.patch.object(SearchIndexer, "push") as mock_push: - # Hard delete document are not re-indexed. - assert mock_push.assert_not_called - - -@mock.patch.object(SearchIndexer, "push") -@pytest.mark.django_db(transaction=True) -def test_models_documents_post_save_indexer_restored(mock_push, indexer_settings): - """Restart indexation task on restored documents""" - indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0 - - user = factories.UserFactory() - - with transaction.atomic(): - doc = factories.DocumentFactory( - link_reach=models.LinkReachChoices.AUTHENTICATED - ) - doc_deleted = factories.DocumentFactory( - link_reach=models.LinkReachChoices.AUTHENTICATED - ) - doc_ancestor_deleted = factories.DocumentFactory( - parent=doc_deleted, - link_reach=models.LinkReachChoices.AUTHENTICATED, - ) - doc_deleted.soft_delete() - doc_ancestor_deleted.ancestors_deleted_at = doc_deleted.deleted_at - - factories.UserDocumentAccessFactory(document=doc, user=user) - factories.UserDocumentAccessFactory(document=doc_deleted, user=user) - factories.UserDocumentAccessFactory(document=doc_ancestor_deleted, user=user) - - doc_deleted.refresh_from_db() - doc_ancestor_deleted.refresh_from_db() - - assert doc_deleted.deleted_at is not None - assert doc_deleted.ancestors_deleted_at is not None - - assert doc_ancestor_deleted.deleted_at is None - assert doc_ancestor_deleted.ancestors_deleted_at is not None - - doc_restored = models.Document.objects.get(pk=doc_deleted.pk) - doc_restored.restore() - - doc_ancestor_restored = models.Document.objects.get(pk=doc_ancestor_deleted.pk) - - assert doc_restored.deleted_at is None - assert doc_restored.ancestors_deleted_at is None - - assert doc_ancestor_restored.deleted_at is None - assert doc_ancestor_restored.ancestors_deleted_at is None - - accesses = { - str(doc.path): {"users": [user.sub]}, - str(doc_deleted.path): {"users": [user.sub]}, - str(doc_ancestor_deleted.path): {"users": [user.sub]}, - } - - data = [call.args[0] for call in mock_push.call_args_list] - - indexer = SearchIndexer() - - # All docs are re-indexed - assert sorted(data, key=itemgetter("id")) == sorted( - [ - indexer.serialize_document(doc, accesses), - indexer.serialize_document(doc_deleted, accesses), - indexer.serialize_document(doc_deleted, accesses), # soft_delete() - indexer.serialize_document(doc_restored, accesses), # restore() - indexer.serialize_document(doc_ancestor_deleted, accesses), - ], - key=itemgetter("id"), - ) - - -@pytest.mark.django_db(transaction=True) -def test_models_documents_post_save_indexer_throttle(indexer_settings): - """Test indexation task skipping on document update""" - indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0 - - indexer = SearchIndexer() - user = factories.UserFactory() - - with mock.patch.object(SearchIndexer, "push"): - with transaction.atomic(): - doc = factories.DocumentFactory() - factories.UserDocumentAccessFactory(document=doc, user=user) - - accesses = { - str(doc.path): {"users": [user.sub]}, - } - - with mock.patch.object(SearchIndexer, "push") as mock_push: - # Simulate 1 running task - cache.set(f"doc-indexer-throttle-{doc.pk}", 1) - - # save doc to trigger the indexer, but nothing should be done since - # the flag is up - with transaction.atomic(): - doc.save() - - assert [call.args[0] for call in mock_push.call_args_list] == [] - - with mock.patch.object(SearchIndexer, "push") as mock_push: - # No waiting task - cache.delete(f"doc-indexer-throttle-{doc.pk}") - - with transaction.atomic(): - doc = models.Document.objects.get(pk=doc.pk) - doc.save() - - assert [call.args[0] for call in mock_push.call_args_list] == [ - indexer.serialize_document(doc, accesses), - ] - - -@pytest.mark.django_db(transaction=True) -def test_models_documents_access_post_save_indexer(indexer_settings): - """Test indexation task on DocumentAccess update""" - indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0 - - indexer = SearchIndexer() - user = factories.UserFactory() - - with mock.patch.object(SearchIndexer, "push"): - with transaction.atomic(): - doc = factories.DocumentFactory() - doc_access = factories.UserDocumentAccessFactory(document=doc, user=user) - - accesses = { - str(doc.path): {"users": [user.sub]}, - } - - indexer = SearchIndexer() - - with mock.patch.object(SearchIndexer, "push") as mock_push: - with transaction.atomic(): - doc_access.save() - - assert [call.args[0] for call in mock_push.call_args_list] == [ - indexer.serialize_document(doc, accesses), - ] diff --git a/src/backend/core/tests/test_models_documents_indexer.py b/src/backend/core/tests/test_models_documents_indexer.py new file mode 100644 index 00000000..9e171f72 --- /dev/null +++ b/src/backend/core/tests/test_models_documents_indexer.py @@ -0,0 +1,441 @@ +""" +Unit tests for the Document model +""" +# pylint: disable=too-many-lines + +from operator import itemgetter +from unittest import mock + +from django.core.cache import cache +from django.db import transaction + +import pytest + +from core import factories, models +from core.services.search_indexers import SearchIndexer + +pytestmark = pytest.mark.django_db + + +def reset_batch_indexer_throttle(): + """Reset throttle flag""" + cache.delete("document-batch-indexer-throttle") + + +@pytest.fixture(autouse=True) +def reset_throttle(): + """Reset throttle flag before each test""" + reset_batch_indexer_throttle() + yield + reset_batch_indexer_throttle() + + +@mock.patch.object(SearchIndexer, "push") +@pytest.mark.usefixtures("indexer_settings") +@pytest.mark.django_db(transaction=True) +def test_models_documents_post_save_indexer(mock_push): + """Test indexation task on document creation""" + with transaction.atomic(): + doc1, doc2, doc3 = factories.DocumentFactory.create_batch(3) + + accesses = {} + data = [call.args[0] for call in mock_push.call_args_list] + + indexer = SearchIndexer() + + assert len(data) == 1 + + # One call + assert sorted(data[0], key=itemgetter("id")) == sorted( + [ + indexer.serialize_document(doc1, accesses), + indexer.serialize_document(doc2, accesses), + indexer.serialize_document(doc3, accesses), + ], + key=itemgetter("id"), + ) + + # The throttle counters should be reset + assert cache.get("document-batch-indexer-throttle") == 1 + + +@pytest.mark.django_db(transaction=True) +def test_models_documents_post_save_indexer_no_batches(indexer_settings): + """Test indexation task on doculment creation, no throttle""" + indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0 + + with mock.patch.object(SearchIndexer, "push") as mock_push: + with transaction.atomic(): + doc1, doc2, doc3 = factories.DocumentFactory.create_batch(3) + + accesses = {} + data = [call.args[0] for call in mock_push.call_args_list] + + indexer = SearchIndexer() + + # 3 calls + assert len(data) == 3 + # one document per call + assert [len(d) for d in data] == [1] * 3 + # all documents are indexed + assert sorted([d[0] for d in data], key=itemgetter("id")) == sorted( + [ + indexer.serialize_document(doc1, accesses), + indexer.serialize_document(doc2, accesses), + indexer.serialize_document(doc3, accesses), + ], + key=itemgetter("id"), + ) + + # The throttle counters should be reset + assert cache.get("file-batch-indexer-throttle") is None + + +@mock.patch.object(SearchIndexer, "push") +@pytest.mark.django_db(transaction=True) +def test_models_documents_post_save_indexer_not_configured(mock_push, indexer_settings): + """Task should not start an indexation when disabled""" + indexer_settings.SEARCH_INDEXER_CLASS = None + + user = factories.UserFactory() + + with transaction.atomic(): + doc = factories.DocumentFactory() + factories.UserDocumentAccessFactory(document=doc, user=user) + + assert mock_push.assert_not_called + + +@mock.patch.object(SearchIndexer, "push") +@pytest.mark.django_db(transaction=True) +def test_models_documents_post_save_indexer_wrongly_configured( + mock_push, indexer_settings +): + """Task should not start an indexation when disabled""" + indexer_settings.SEARCH_INDEXER_URL = None + + user = factories.UserFactory() + + with transaction.atomic(): + doc = factories.DocumentFactory() + factories.UserDocumentAccessFactory(document=doc, user=user) + + assert mock_push.assert_not_called + + +@mock.patch.object(SearchIndexer, "push") +@pytest.mark.usefixtures("indexer_settings") +@pytest.mark.django_db(transaction=True) +def test_models_documents_post_save_indexer_with_accesses(mock_push): + """Test indexation task on document creation""" + user = factories.UserFactory() + + with transaction.atomic(): + doc1, doc2, doc3 = factories.DocumentFactory.create_batch(3) + + factories.UserDocumentAccessFactory(document=doc1, user=user) + factories.UserDocumentAccessFactory(document=doc2, user=user) + factories.UserDocumentAccessFactory(document=doc3, user=user) + + accesses = { + str(doc1.path): {"users": [user.sub]}, + str(doc2.path): {"users": [user.sub]}, + str(doc3.path): {"users": [user.sub]}, + } + + data = [call.args[0] for call in mock_push.call_args_list] + + indexer = SearchIndexer() + + assert len(data) == 1 + assert sorted(data[0], key=itemgetter("id")) == sorted( + [ + indexer.serialize_document(doc1, accesses), + indexer.serialize_document(doc2, accesses), + indexer.serialize_document(doc3, accesses), + ], + key=itemgetter("id"), + ) + + +@mock.patch.object(SearchIndexer, "push") +@pytest.mark.usefixtures("indexer_settings") +@pytest.mark.django_db(transaction=True) +def test_models_documents_post_save_indexer_deleted(mock_push): + """Indexation task on deleted or ancestor_deleted documents""" + user = factories.UserFactory() + + with transaction.atomic(): + doc = factories.DocumentFactory( + link_reach=models.LinkReachChoices.AUTHENTICATED + ) + main_doc = factories.DocumentFactory( + link_reach=models.LinkReachChoices.AUTHENTICATED + ) + child_doc = factories.DocumentFactory( + parent=main_doc, + link_reach=models.LinkReachChoices.AUTHENTICATED, + ) + + factories.UserDocumentAccessFactory(document=doc, user=user) + factories.UserDocumentAccessFactory(document=main_doc, user=user) + factories.UserDocumentAccessFactory(document=child_doc, user=user) + + # Manually reset the throttle flag here or the next indexation will be ignored for 1 second + reset_batch_indexer_throttle() + + with transaction.atomic(): + main_doc_deleted = models.Document.objects.get(pk=main_doc.pk) + main_doc_deleted.soft_delete() + + child_doc_deleted = models.Document.objects.get(pk=child_doc.pk) + + main_doc_deleted.refresh_from_db() + child_doc_deleted.refresh_from_db() + + assert main_doc_deleted.deleted_at is not None + assert child_doc_deleted.ancestors_deleted_at is not None + + assert child_doc_deleted.deleted_at is None + assert child_doc_deleted.ancestors_deleted_at is not None + + accesses = { + str(doc.path): {"users": [user.sub]}, + str(main_doc_deleted.path): {"users": [user.sub]}, + str(child_doc_deleted.path): {"users": [user.sub]}, + } + + data = [call.args[0] for call in mock_push.call_args_list] + + indexer = SearchIndexer() + + assert len(data) == 2 + + # First indexation on document creation + assert sorted(data[0], key=itemgetter("id")) == sorted( + [ + indexer.serialize_document(doc, accesses), + indexer.serialize_document(main_doc, accesses), + indexer.serialize_document(child_doc, accesses), + ], + key=itemgetter("id"), + ) + + # Even deleted items are re-indexed : only update their status in the future + assert sorted(data[1], key=itemgetter("id")) == sorted( + [ + indexer.serialize_document(main_doc_deleted, accesses), # soft_delete() + indexer.serialize_document(child_doc_deleted, accesses), + ], + key=itemgetter("id"), + ) + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.usefixtures("indexer_settings") +def test_models_documents_indexer_hard_deleted(): + """Indexation task on hard deleted document""" + user = factories.UserFactory() + + with transaction.atomic(): + doc = factories.DocumentFactory( + link_reach=models.LinkReachChoices.AUTHENTICATED + ) + factories.UserDocumentAccessFactory(document=doc, user=user) + + # Call task on deleted document. + with mock.patch.object(SearchIndexer, "push") as mock_push: + doc.delete() + + # Hard delete document are not re-indexed. + assert mock_push.assert_not_called + + +@mock.patch.object(SearchIndexer, "push") +@pytest.mark.usefixtures("indexer_settings") +@pytest.mark.django_db(transaction=True) +def test_models_documents_post_save_indexer_restored(mock_push): + """Restart indexation task on restored documents""" + user = factories.UserFactory() + + with transaction.atomic(): + doc = factories.DocumentFactory( + link_reach=models.LinkReachChoices.AUTHENTICATED + ) + doc_deleted = factories.DocumentFactory( + link_reach=models.LinkReachChoices.AUTHENTICATED + ) + doc_ancestor_deleted = factories.DocumentFactory( + parent=doc_deleted, + link_reach=models.LinkReachChoices.AUTHENTICATED, + ) + + factories.UserDocumentAccessFactory(document=doc, user=user) + factories.UserDocumentAccessFactory(document=doc_deleted, user=user) + factories.UserDocumentAccessFactory(document=doc_ancestor_deleted, user=user) + + doc_deleted.soft_delete() + + doc_deleted.refresh_from_db() + doc_ancestor_deleted.refresh_from_db() + + assert doc_deleted.deleted_at is not None + assert doc_deleted.ancestors_deleted_at is not None + + assert doc_ancestor_deleted.deleted_at is None + assert doc_ancestor_deleted.ancestors_deleted_at is not None + + # Manually reset the throttle flag here or the next indexation will be ignored for 1 second + reset_batch_indexer_throttle() + + with transaction.atomic(): + doc_restored = models.Document.objects.get(pk=doc_deleted.pk) + doc_restored.restore() + + doc_ancestor_restored = models.Document.objects.get(pk=doc_ancestor_deleted.pk) + + assert doc_restored.deleted_at is None + assert doc_restored.ancestors_deleted_at is None + + assert doc_ancestor_restored.deleted_at is None + assert doc_ancestor_restored.ancestors_deleted_at is None + + accesses = { + str(doc.path): {"users": [user.sub]}, + str(doc_deleted.path): {"users": [user.sub]}, + str(doc_ancestor_deleted.path): {"users": [user.sub]}, + } + + data = [call.args[0] for call in mock_push.call_args_list] + + indexer = SearchIndexer() + + # All docs are re-indexed + assert len(data) == 2 + + # First indexation on items creation & soft delete (in the same transaction) + assert sorted(data[0], key=itemgetter("id")) == sorted( + [ + indexer.serialize_document(doc, accesses), + indexer.serialize_document(doc_deleted, accesses), + indexer.serialize_document(doc_ancestor_deleted, accesses), + ], + key=itemgetter("id"), + ) + + # Restored items are re-indexed : only update their status in the future + assert sorted(data[1], key=itemgetter("id")) == sorted( + [ + indexer.serialize_document(doc_restored, accesses), # restore() + indexer.serialize_document(doc_ancestor_restored, accesses), + ], + key=itemgetter("id"), + ) + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.usefixtures("indexer_settings") +def test_models_documents_post_save_indexer_throttle(): + """Test indexation task skipping on document update""" + indexer = SearchIndexer() + user = factories.UserFactory() + + with mock.patch.object(SearchIndexer, "push"): + with transaction.atomic(): + docs = factories.DocumentFactory.create_batch(5, users=(user,)) + + accesses = {str(item.path): {"users": [user.sub]} for item in docs} + + with mock.patch.object(SearchIndexer, "push") as mock_push: + # Simulate 1 running task + cache.set("document-batch-indexer-throttle", 1) + + # save doc to trigger the indexer, but nothing should be done since + # the flag is up + with transaction.atomic(): + docs[0].save() + docs[2].save() + docs[3].save() + + assert [call.args[0] for call in mock_push.call_args_list] == [] + + with mock.patch.object(SearchIndexer, "push") as mock_push: + # No waiting task + cache.delete("document-batch-indexer-throttle") + + with transaction.atomic(): + docs[0].save() + docs[2].save() + docs[3].save() + + data = [call.args[0] for call in mock_push.call_args_list] + + # One call + assert len(data) == 1 + + assert sorted(data[0], key=itemgetter("id")) == sorted( + [ + indexer.serialize_document(docs[0], accesses), + indexer.serialize_document(docs[2], accesses), + indexer.serialize_document(docs[3], accesses), + ], + key=itemgetter("id"), + ) + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.usefixtures("indexer_settings") +def test_models_documents_access_post_save_indexer(): + """Test indexation task on DocumentAccess update""" + users = factories.UserFactory.create_batch(3) + + with mock.patch.object(SearchIndexer, "push"): + with transaction.atomic(): + doc = factories.DocumentFactory(users=users) + doc_accesses = models.DocumentAccess.objects.filter(document=doc).order_by( + "user__sub" + ) + + reset_batch_indexer_throttle() + + with mock.patch.object(SearchIndexer, "push") as mock_push: + with transaction.atomic(): + for doc_access in doc_accesses: + doc_access.save() + + data = [call.args[0] for call in mock_push.call_args_list] + + # One call + assert len(data) == 1 + + assert [d["id"] for d in data[0]] == [str(doc.pk)] + + +@pytest.mark.django_db(transaction=True) +def test_models_items_access_post_save_indexer_no_throttle(indexer_settings): + """Test indexation task on ItemAccess update, no throttle""" + indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0 + + users = factories.UserFactory.create_batch(3) + + with transaction.atomic(): + doc = factories.DocumentFactory(users=users) + doc_accesses = models.DocumentAccess.objects.filter(document=doc).order_by( + "user__sub" + ) + + reset_batch_indexer_throttle() + + with mock.patch.object(SearchIndexer, "push") as mock_push: + with transaction.atomic(): + for doc_access in doc_accesses: + doc_access.save() + + data = [call.args[0] for call in mock_push.call_args_list] + + # 3 calls + assert len(data) == 3 + # one document per call + assert [len(d) for d in data] == [1] * 3 + # the same document is indexed 3 times + assert [d[0]["id"] for d in data] == [str(doc.pk)] * 3