(backend) use batches in indexing task

Reduce the number of Find API calls by grouping all the latest changes
for indexation : send all the documents updated or deleted since the
triggering of the task.

Signed-off-by: Fabre Florian <ffabre@hybird.org>
This commit is contained in:
Fabre Florian
2025-10-31 17:06:46 +01:00
committed by Quentin BEY
parent 65d572ccd6
commit 780bcb360a
7 changed files with 506 additions and 378 deletions

View File

@@ -906,7 +906,8 @@ class Document(MP_Node, BaseModel):
# Mark all descendants as soft deleted # Mark all descendants as soft deleted
self.get_descendants().filter(ancestors_deleted_at__isnull=True).update( self.get_descendants().filter(ancestors_deleted_at__isnull=True).update(
ancestors_deleted_at=self.ancestors_deleted_at ancestors_deleted_at=self.ancestors_deleted_at,
updated_at=self.updated_at,
) )
@transaction.atomic @transaction.atomic

View File

@@ -130,16 +130,17 @@ class BaseDocumentIndexer(ABC):
"SEARCH_INDEXER_QUERY_URL must be set in Django settings." "SEARCH_INDEXER_QUERY_URL must be set in Django settings."
) )
def index(self): def index(self, queryset=None):
""" """
Fetch documents in batches, serialize them, and push to the search backend. Fetch documents in batches, serialize them, and push to the search backend.
""" """
last_id = 0 last_id = 0
count = 0 count = 0
queryset = queryset or models.Document.objects.all()
while True: while True:
documents_batch = list( documents_batch = list(
models.Document.objects.filter( queryset.filter(
id__gt=last_id, id__gt=last_id,
).order_by("id")[: self.batch_size] ).order_by("id")[: self.batch_size]
) )

View File

@@ -9,7 +9,7 @@ from django.db.models import signals
from django.dispatch import receiver from django.dispatch import receiver
from . import models from . import models
from .tasks.search import trigger_document_indexer from .tasks.search import trigger_batch_document_indexer
@receiver(signals.post_save, sender=models.Document) @receiver(signals.post_save, sender=models.Document)
@@ -19,7 +19,7 @@ def document_post_save(sender, instance, **kwargs): # pylint: disable=unused-ar
Note : Within the transaction we can have an empty content and a serialization Note : Within the transaction we can have an empty content and a serialization
error. error.
""" """
transaction.on_commit(partial(trigger_document_indexer, instance)) transaction.on_commit(partial(trigger_batch_document_indexer, instance))
@receiver(signals.post_save, sender=models.DocumentAccess) @receiver(signals.post_save, sender=models.DocumentAccess)
@@ -28,4 +28,6 @@ def document_access_post_save(sender, instance, created, **kwargs): # pylint: d
Asynchronous call to the document indexer at the end of the transaction. Asynchronous call to the document indexer at the end of the transaction.
""" """
if not created: if not created:
transaction.on_commit(partial(trigger_document_indexer, instance.document)) transaction.on_commit(
partial(trigger_batch_document_indexer, instance.document)
)

View File

@@ -4,12 +4,12 @@ from logging import getLogger
from django.conf import settings from django.conf import settings
from django.core.cache import cache from django.core.cache import cache
from django.db.models import Q
from django_redis.cache import RedisCache from django_redis.cache import RedisCache
from core import models from core import models
from core.services.search_indexers import ( from core.services.search_indexers import (
get_batch_accesses_by_users_and_teams,
get_document_indexer, get_document_indexer,
) )
@@ -18,23 +18,6 @@ from impress.celery_app import app
logger = getLogger(__file__) logger = getLogger(__file__)
def indexer_throttle_acquire(document_id, timeout=0, atomic=True):
"""
Enable the task throttle flag for a delay.
Uses redis locks if available to ensure atomic changes
"""
key = f"doc-indexer-throttle-{document_id}"
if isinstance(cache, RedisCache) and atomic:
with cache.locks(key):
return indexer_throttle_acquire(document_id, timeout, atomic=False)
# Use add() here :
# - set the flag and returns true if not exist
# - do nothing and return false if exist
return cache.add(key, 1, timeout=timeout)
@app.task @app.task
def document_indexer_task(document_id): def document_indexer_task(document_id):
"""Celery Task : Sends indexation query for a document.""" """Celery Task : Sends indexation query for a document."""
@@ -43,42 +26,72 @@ def document_indexer_task(document_id):
if indexer is None: if indexer is None:
return return
try:
doc = models.Document.objects.get(pk=document_id)
except models.Document.DoesNotExist:
# Skip the task if the document does not exist.
return
accesses = get_batch_accesses_by_users_and_teams((doc.path,))
data = indexer.serialize_document(document=doc, accesses=accesses)
logger.info("Start document %s indexation", document_id) logger.info("Start document %s indexation", document_id)
indexer.push(data) indexer.index(models.Document.objects.filter(pk=document_id))
def trigger_document_indexer(document): def batch_indexer_throttle_acquire(timeout: int = 0, atomic: bool = True):
"""
Enable the task throttle flag for a delay.
Uses redis locks if available to ensure atomic changes
"""
key = "document-batch-indexer-throttle"
# Redis is used as cache database (not in tests). Use the lock feature here
# to ensure atomicity of changes to the throttle flag.
if isinstance(cache, RedisCache) and atomic:
with cache.locks(key):
return batch_indexer_throttle_acquire(timeout, atomic=False)
# Use add() here :
# - set the flag and returns true if not exist
# - do nothing and return false if exist
return cache.add(key, 1, timeout=timeout)
@app.task
def batch_document_indexer_task(timestamp):
"""Celery Task : Sends indexation query for a batch of documents."""
indexer = get_document_indexer()
if indexer:
queryset = models.Document.objects.filter(
Q(updated_at__gte=timestamp)
| Q(deleted_at__gte=timestamp)
| Q(ancestors_deleted_at__gte=timestamp)
)
count = indexer.index(queryset)
logger.info("Indexed %d documents", count)
def trigger_batch_document_indexer(item):
""" """
Trigger indexation task with debounce a delay set by the SEARCH_INDEXER_COUNTDOWN setting. Trigger indexation task with debounce a delay set by the SEARCH_INDEXER_COUNTDOWN setting.
Args: Args:
document (Document): The document instance. document (Document): The document instance.
""" """
countdown = settings.SEARCH_INDEXER_COUNTDOWN countdown = int(settings.SEARCH_INDEXER_COUNTDOWN)
# DO NOT create a task if indexation if disabled # DO NOT create a task if indexation if disabled
if not settings.SEARCH_INDEXER_CLASS: if not settings.SEARCH_INDEXER_CLASS:
return return
# Each time this method is called during a countdown, we increment the if countdown > 0:
# counter and each task decrease it, so the index be run only once. # Each time this method is called during a countdown, we increment the
if indexer_throttle_acquire(document.pk, timeout=countdown): # counter and each task decrease it, so the index be run only once.
logger.info( if batch_indexer_throttle_acquire(timeout=countdown):
"Add task for document %s indexation in %.2f seconds", logger.info(
document.pk, "Add task for batch document indexation from updated_at=%s in %d seconds",
countdown, item.updated_at.isoformat(),
) countdown,
)
document_indexer_task.apply_async(args=[document.pk]) batch_document_indexer_task.apply_async(
args=[item.updated_at], countdown=countdown
)
else:
logger.info("Skip task for batch document %s indexation", item.pk)
else: else:
logger.info("Skip task for document %s indexation", document.pk) document_indexer_task.apply(args=[item.pk])

View File

@@ -45,6 +45,7 @@ def indexer_settings_fixture(settings):
settings.SEARCH_INDEXER_QUERY_URL = ( settings.SEARCH_INDEXER_QUERY_URL = (
"http://localhost:8081/api/v1.0/documents/search/" "http://localhost:8081/api/v1.0/documents/search/"
) )
settings.SEARCH_INDEXER_COUNTDOWN = 1
yield settings yield settings

View File

@@ -6,7 +6,6 @@ Unit tests for the Document model
import random import random
import smtplib import smtplib
from logging import Logger from logging import Logger
from operator import itemgetter
from unittest import mock from unittest import mock
from django.contrib.auth.models import AnonymousUser from django.contrib.auth.models import AnonymousUser
@@ -14,15 +13,12 @@ from django.core import mail
from django.core.cache import cache from django.core.cache import cache
from django.core.exceptions import ValidationError from django.core.exceptions import ValidationError
from django.core.files.storage import default_storage from django.core.files.storage import default_storage
from django.db import transaction
from django.test.utils import override_settings from django.test.utils import override_settings
from django.utils import timezone from django.utils import timezone
import pytest import pytest
from core import factories, models from core import factories, models
from core.services.search_indexers import SearchIndexer
from core.tasks.search import document_indexer_task
pytestmark = pytest.mark.django_db pytestmark = pytest.mark.django_db
@@ -1621,330 +1617,3 @@ def test_models_documents_compute_ancestors_links_paths_mapping_structure(
{"link_reach": sibling.link_reach, "link_role": sibling.link_role}, {"link_reach": sibling.link_reach, "link_role": sibling.link_role},
], ],
} }
@mock.patch.object(SearchIndexer, "push")
@pytest.mark.django_db(transaction=True)
def test_models_documents_post_save_indexer(mock_push, indexer_settings):
"""Test indexation task on document creation"""
indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0
with transaction.atomic():
doc1, doc2, doc3 = factories.DocumentFactory.create_batch(3)
accesses = {}
data = [call.args[0] for call in mock_push.call_args_list]
indexer = SearchIndexer()
assert sorted(data, key=itemgetter("id")) == sorted(
[
indexer.serialize_document(doc1, accesses),
indexer.serialize_document(doc2, accesses),
indexer.serialize_document(doc3, accesses),
],
key=itemgetter("id"),
)
# The throttle counters should be reset
assert cache.get(f"doc-indexer-throttle-{doc1.pk}") is None
assert cache.get(f"doc-indexer-throttle-{doc2.pk}") is None
assert cache.get(f"doc-indexer-throttle-{doc3.pk}") is None
@mock.patch.object(SearchIndexer, "push")
@pytest.mark.django_db(transaction=True)
def test_models_documents_post_save_indexer_not_configured(mock_push, indexer_settings):
"""Task should not start an indexation when disabled"""
indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0
indexer_settings.SEARCH_INDEXER_CLASS = None
user = factories.UserFactory()
with transaction.atomic():
doc = factories.DocumentFactory()
factories.UserDocumentAccessFactory(document=doc, user=user)
assert mock_push.assert_not_called
@mock.patch.object(SearchIndexer, "push")
@pytest.mark.django_db(transaction=True)
def test_models_documents_post_save_indexer_wrongly_configured(
mock_push, indexer_settings
):
"""Task should not start an indexation when disabled"""
indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0
indexer_settings.SEARCH_INDEXER_URL = None
user = factories.UserFactory()
with transaction.atomic():
doc = factories.DocumentFactory()
factories.UserDocumentAccessFactory(document=doc, user=user)
assert mock_push.assert_not_called
@mock.patch.object(SearchIndexer, "push")
@pytest.mark.django_db(transaction=True)
def test_models_documents_post_save_indexer_with_accesses(mock_push, indexer_settings):
"""Test indexation task on document creation"""
indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0
user = factories.UserFactory()
with transaction.atomic():
doc1, doc2, doc3 = factories.DocumentFactory.create_batch(3)
factories.UserDocumentAccessFactory(document=doc1, user=user)
factories.UserDocumentAccessFactory(document=doc2, user=user)
factories.UserDocumentAccessFactory(document=doc3, user=user)
accesses = {
str(doc1.path): {"users": [user.sub]},
str(doc2.path): {"users": [user.sub]},
str(doc3.path): {"users": [user.sub]},
}
data = [call.args[0] for call in mock_push.call_args_list]
indexer = SearchIndexer()
assert sorted(data, key=itemgetter("id")) == sorted(
[
indexer.serialize_document(doc1, accesses),
indexer.serialize_document(doc2, accesses),
indexer.serialize_document(doc3, accesses),
],
key=itemgetter("id"),
)
# The throttle counters should be reset
assert cache.get(f"doc-indexer-throttle-{doc1.pk}") is None
assert cache.get(f"doc-indexer-throttle-{doc2.pk}") is None
assert cache.get(f"doc-indexer-throttle-{doc3.pk}") is None
@mock.patch.object(SearchIndexer, "push")
@pytest.mark.django_db(transaction=True)
def test_models_documents_post_save_indexer_deleted(mock_push, indexer_settings):
"""Indexation task on deleted or ancestor_deleted documents"""
indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0
user = factories.UserFactory()
with transaction.atomic():
doc = factories.DocumentFactory(
link_reach=models.LinkReachChoices.AUTHENTICATED
)
doc_deleted = factories.DocumentFactory(
link_reach=models.LinkReachChoices.AUTHENTICATED
)
doc_ancestor_deleted = factories.DocumentFactory(
parent=doc_deleted,
link_reach=models.LinkReachChoices.AUTHENTICATED,
)
doc_deleted.soft_delete()
doc_ancestor_deleted.ancestors_deleted_at = doc_deleted.deleted_at
factories.UserDocumentAccessFactory(document=doc, user=user)
factories.UserDocumentAccessFactory(document=doc_deleted, user=user)
factories.UserDocumentAccessFactory(document=doc_ancestor_deleted, user=user)
doc_deleted.refresh_from_db()
doc_ancestor_deleted.refresh_from_db()
assert doc_deleted.deleted_at is not None
assert doc_deleted.ancestors_deleted_at is not None
assert doc_ancestor_deleted.deleted_at is None
assert doc_ancestor_deleted.ancestors_deleted_at is not None
accesses = {
str(doc.path): {"users": [user.sub]},
str(doc_deleted.path): {"users": [user.sub]},
str(doc_ancestor_deleted.path): {"users": [user.sub]},
}
data = [call.args[0] for call in mock_push.call_args_list]
indexer = SearchIndexer()
# Even deleted document are re-indexed : only update their status in the future ?
assert sorted(data, key=itemgetter("id")) == sorted(
[
indexer.serialize_document(doc, accesses),
indexer.serialize_document(doc_deleted, accesses),
indexer.serialize_document(doc_ancestor_deleted, accesses),
indexer.serialize_document(doc_deleted, accesses), # soft_delete()
],
key=itemgetter("id"),
)
# The throttle counters should be reset
assert cache.get(f"doc-indexer-throttle-{doc.pk}") is None
assert cache.get(f"doc-indexer-throttle-{doc_deleted.pk}") is None
assert cache.get(f"doc-indexer-throttle-{doc_ancestor_deleted.pk}") is None
@pytest.mark.django_db(transaction=True)
def test_models_documents_indexer_hard_deleted(indexer_settings):
"""Indexation task on hard deleted document"""
indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0
user = factories.UserFactory()
with transaction.atomic():
doc = factories.DocumentFactory(
link_reach=models.LinkReachChoices.AUTHENTICATED
)
factories.UserDocumentAccessFactory(document=doc, user=user)
doc_id = doc.pk
doc.delete()
# Call task on deleted document.
document_indexer_task.apply(args=[doc_id])
with mock.patch.object(SearchIndexer, "push") as mock_push:
# Hard delete document are not re-indexed.
assert mock_push.assert_not_called
@mock.patch.object(SearchIndexer, "push")
@pytest.mark.django_db(transaction=True)
def test_models_documents_post_save_indexer_restored(mock_push, indexer_settings):
"""Restart indexation task on restored documents"""
indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0
user = factories.UserFactory()
with transaction.atomic():
doc = factories.DocumentFactory(
link_reach=models.LinkReachChoices.AUTHENTICATED
)
doc_deleted = factories.DocumentFactory(
link_reach=models.LinkReachChoices.AUTHENTICATED
)
doc_ancestor_deleted = factories.DocumentFactory(
parent=doc_deleted,
link_reach=models.LinkReachChoices.AUTHENTICATED,
)
doc_deleted.soft_delete()
doc_ancestor_deleted.ancestors_deleted_at = doc_deleted.deleted_at
factories.UserDocumentAccessFactory(document=doc, user=user)
factories.UserDocumentAccessFactory(document=doc_deleted, user=user)
factories.UserDocumentAccessFactory(document=doc_ancestor_deleted, user=user)
doc_deleted.refresh_from_db()
doc_ancestor_deleted.refresh_from_db()
assert doc_deleted.deleted_at is not None
assert doc_deleted.ancestors_deleted_at is not None
assert doc_ancestor_deleted.deleted_at is None
assert doc_ancestor_deleted.ancestors_deleted_at is not None
doc_restored = models.Document.objects.get(pk=doc_deleted.pk)
doc_restored.restore()
doc_ancestor_restored = models.Document.objects.get(pk=doc_ancestor_deleted.pk)
assert doc_restored.deleted_at is None
assert doc_restored.ancestors_deleted_at is None
assert doc_ancestor_restored.deleted_at is None
assert doc_ancestor_restored.ancestors_deleted_at is None
accesses = {
str(doc.path): {"users": [user.sub]},
str(doc_deleted.path): {"users": [user.sub]},
str(doc_ancestor_deleted.path): {"users": [user.sub]},
}
data = [call.args[0] for call in mock_push.call_args_list]
indexer = SearchIndexer()
# All docs are re-indexed
assert sorted(data, key=itemgetter("id")) == sorted(
[
indexer.serialize_document(doc, accesses),
indexer.serialize_document(doc_deleted, accesses),
indexer.serialize_document(doc_deleted, accesses), # soft_delete()
indexer.serialize_document(doc_restored, accesses), # restore()
indexer.serialize_document(doc_ancestor_deleted, accesses),
],
key=itemgetter("id"),
)
@pytest.mark.django_db(transaction=True)
def test_models_documents_post_save_indexer_throttle(indexer_settings):
"""Test indexation task skipping on document update"""
indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0
indexer = SearchIndexer()
user = factories.UserFactory()
with mock.patch.object(SearchIndexer, "push"):
with transaction.atomic():
doc = factories.DocumentFactory()
factories.UserDocumentAccessFactory(document=doc, user=user)
accesses = {
str(doc.path): {"users": [user.sub]},
}
with mock.patch.object(SearchIndexer, "push") as mock_push:
# Simulate 1 running task
cache.set(f"doc-indexer-throttle-{doc.pk}", 1)
# save doc to trigger the indexer, but nothing should be done since
# the flag is up
with transaction.atomic():
doc.save()
assert [call.args[0] for call in mock_push.call_args_list] == []
with mock.patch.object(SearchIndexer, "push") as mock_push:
# No waiting task
cache.delete(f"doc-indexer-throttle-{doc.pk}")
with transaction.atomic():
doc = models.Document.objects.get(pk=doc.pk)
doc.save()
assert [call.args[0] for call in mock_push.call_args_list] == [
indexer.serialize_document(doc, accesses),
]
@pytest.mark.django_db(transaction=True)
def test_models_documents_access_post_save_indexer(indexer_settings):
"""Test indexation task on DocumentAccess update"""
indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0
indexer = SearchIndexer()
user = factories.UserFactory()
with mock.patch.object(SearchIndexer, "push"):
with transaction.atomic():
doc = factories.DocumentFactory()
doc_access = factories.UserDocumentAccessFactory(document=doc, user=user)
accesses = {
str(doc.path): {"users": [user.sub]},
}
indexer = SearchIndexer()
with mock.patch.object(SearchIndexer, "push") as mock_push:
with transaction.atomic():
doc_access.save()
assert [call.args[0] for call in mock_push.call_args_list] == [
indexer.serialize_document(doc, accesses),
]

View File

@@ -0,0 +1,441 @@
"""
Unit tests for the Document model
"""
# pylint: disable=too-many-lines
from operator import itemgetter
from unittest import mock
from django.core.cache import cache
from django.db import transaction
import pytest
from core import factories, models
from core.services.search_indexers import SearchIndexer
pytestmark = pytest.mark.django_db
def reset_batch_indexer_throttle():
"""Reset throttle flag"""
cache.delete("document-batch-indexer-throttle")
@pytest.fixture(autouse=True)
def reset_throttle():
"""Reset throttle flag before each test"""
reset_batch_indexer_throttle()
yield
reset_batch_indexer_throttle()
@mock.patch.object(SearchIndexer, "push")
@pytest.mark.usefixtures("indexer_settings")
@pytest.mark.django_db(transaction=True)
def test_models_documents_post_save_indexer(mock_push):
"""Test indexation task on document creation"""
with transaction.atomic():
doc1, doc2, doc3 = factories.DocumentFactory.create_batch(3)
accesses = {}
data = [call.args[0] for call in mock_push.call_args_list]
indexer = SearchIndexer()
assert len(data) == 1
# One call
assert sorted(data[0], key=itemgetter("id")) == sorted(
[
indexer.serialize_document(doc1, accesses),
indexer.serialize_document(doc2, accesses),
indexer.serialize_document(doc3, accesses),
],
key=itemgetter("id"),
)
# The throttle counters should be reset
assert cache.get("document-batch-indexer-throttle") == 1
@pytest.mark.django_db(transaction=True)
def test_models_documents_post_save_indexer_no_batches(indexer_settings):
"""Test indexation task on doculment creation, no throttle"""
indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0
with mock.patch.object(SearchIndexer, "push") as mock_push:
with transaction.atomic():
doc1, doc2, doc3 = factories.DocumentFactory.create_batch(3)
accesses = {}
data = [call.args[0] for call in mock_push.call_args_list]
indexer = SearchIndexer()
# 3 calls
assert len(data) == 3
# one document per call
assert [len(d) for d in data] == [1] * 3
# all documents are indexed
assert sorted([d[0] for d in data], key=itemgetter("id")) == sorted(
[
indexer.serialize_document(doc1, accesses),
indexer.serialize_document(doc2, accesses),
indexer.serialize_document(doc3, accesses),
],
key=itemgetter("id"),
)
# The throttle counters should be reset
assert cache.get("file-batch-indexer-throttle") is None
@mock.patch.object(SearchIndexer, "push")
@pytest.mark.django_db(transaction=True)
def test_models_documents_post_save_indexer_not_configured(mock_push, indexer_settings):
"""Task should not start an indexation when disabled"""
indexer_settings.SEARCH_INDEXER_CLASS = None
user = factories.UserFactory()
with transaction.atomic():
doc = factories.DocumentFactory()
factories.UserDocumentAccessFactory(document=doc, user=user)
assert mock_push.assert_not_called
@mock.patch.object(SearchIndexer, "push")
@pytest.mark.django_db(transaction=True)
def test_models_documents_post_save_indexer_wrongly_configured(
mock_push, indexer_settings
):
"""Task should not start an indexation when disabled"""
indexer_settings.SEARCH_INDEXER_URL = None
user = factories.UserFactory()
with transaction.atomic():
doc = factories.DocumentFactory()
factories.UserDocumentAccessFactory(document=doc, user=user)
assert mock_push.assert_not_called
@mock.patch.object(SearchIndexer, "push")
@pytest.mark.usefixtures("indexer_settings")
@pytest.mark.django_db(transaction=True)
def test_models_documents_post_save_indexer_with_accesses(mock_push):
"""Test indexation task on document creation"""
user = factories.UserFactory()
with transaction.atomic():
doc1, doc2, doc3 = factories.DocumentFactory.create_batch(3)
factories.UserDocumentAccessFactory(document=doc1, user=user)
factories.UserDocumentAccessFactory(document=doc2, user=user)
factories.UserDocumentAccessFactory(document=doc3, user=user)
accesses = {
str(doc1.path): {"users": [user.sub]},
str(doc2.path): {"users": [user.sub]},
str(doc3.path): {"users": [user.sub]},
}
data = [call.args[0] for call in mock_push.call_args_list]
indexer = SearchIndexer()
assert len(data) == 1
assert sorted(data[0], key=itemgetter("id")) == sorted(
[
indexer.serialize_document(doc1, accesses),
indexer.serialize_document(doc2, accesses),
indexer.serialize_document(doc3, accesses),
],
key=itemgetter("id"),
)
@mock.patch.object(SearchIndexer, "push")
@pytest.mark.usefixtures("indexer_settings")
@pytest.mark.django_db(transaction=True)
def test_models_documents_post_save_indexer_deleted(mock_push):
"""Indexation task on deleted or ancestor_deleted documents"""
user = factories.UserFactory()
with transaction.atomic():
doc = factories.DocumentFactory(
link_reach=models.LinkReachChoices.AUTHENTICATED
)
main_doc = factories.DocumentFactory(
link_reach=models.LinkReachChoices.AUTHENTICATED
)
child_doc = factories.DocumentFactory(
parent=main_doc,
link_reach=models.LinkReachChoices.AUTHENTICATED,
)
factories.UserDocumentAccessFactory(document=doc, user=user)
factories.UserDocumentAccessFactory(document=main_doc, user=user)
factories.UserDocumentAccessFactory(document=child_doc, user=user)
# Manually reset the throttle flag here or the next indexation will be ignored for 1 second
reset_batch_indexer_throttle()
with transaction.atomic():
main_doc_deleted = models.Document.objects.get(pk=main_doc.pk)
main_doc_deleted.soft_delete()
child_doc_deleted = models.Document.objects.get(pk=child_doc.pk)
main_doc_deleted.refresh_from_db()
child_doc_deleted.refresh_from_db()
assert main_doc_deleted.deleted_at is not None
assert child_doc_deleted.ancestors_deleted_at is not None
assert child_doc_deleted.deleted_at is None
assert child_doc_deleted.ancestors_deleted_at is not None
accesses = {
str(doc.path): {"users": [user.sub]},
str(main_doc_deleted.path): {"users": [user.sub]},
str(child_doc_deleted.path): {"users": [user.sub]},
}
data = [call.args[0] for call in mock_push.call_args_list]
indexer = SearchIndexer()
assert len(data) == 2
# First indexation on document creation
assert sorted(data[0], key=itemgetter("id")) == sorted(
[
indexer.serialize_document(doc, accesses),
indexer.serialize_document(main_doc, accesses),
indexer.serialize_document(child_doc, accesses),
],
key=itemgetter("id"),
)
# Even deleted items are re-indexed : only update their status in the future
assert sorted(data[1], key=itemgetter("id")) == sorted(
[
indexer.serialize_document(main_doc_deleted, accesses), # soft_delete()
indexer.serialize_document(child_doc_deleted, accesses),
],
key=itemgetter("id"),
)
@pytest.mark.django_db(transaction=True)
@pytest.mark.usefixtures("indexer_settings")
def test_models_documents_indexer_hard_deleted():
"""Indexation task on hard deleted document"""
user = factories.UserFactory()
with transaction.atomic():
doc = factories.DocumentFactory(
link_reach=models.LinkReachChoices.AUTHENTICATED
)
factories.UserDocumentAccessFactory(document=doc, user=user)
# Call task on deleted document.
with mock.patch.object(SearchIndexer, "push") as mock_push:
doc.delete()
# Hard delete document are not re-indexed.
assert mock_push.assert_not_called
@mock.patch.object(SearchIndexer, "push")
@pytest.mark.usefixtures("indexer_settings")
@pytest.mark.django_db(transaction=True)
def test_models_documents_post_save_indexer_restored(mock_push):
"""Restart indexation task on restored documents"""
user = factories.UserFactory()
with transaction.atomic():
doc = factories.DocumentFactory(
link_reach=models.LinkReachChoices.AUTHENTICATED
)
doc_deleted = factories.DocumentFactory(
link_reach=models.LinkReachChoices.AUTHENTICATED
)
doc_ancestor_deleted = factories.DocumentFactory(
parent=doc_deleted,
link_reach=models.LinkReachChoices.AUTHENTICATED,
)
factories.UserDocumentAccessFactory(document=doc, user=user)
factories.UserDocumentAccessFactory(document=doc_deleted, user=user)
factories.UserDocumentAccessFactory(document=doc_ancestor_deleted, user=user)
doc_deleted.soft_delete()
doc_deleted.refresh_from_db()
doc_ancestor_deleted.refresh_from_db()
assert doc_deleted.deleted_at is not None
assert doc_deleted.ancestors_deleted_at is not None
assert doc_ancestor_deleted.deleted_at is None
assert doc_ancestor_deleted.ancestors_deleted_at is not None
# Manually reset the throttle flag here or the next indexation will be ignored for 1 second
reset_batch_indexer_throttle()
with transaction.atomic():
doc_restored = models.Document.objects.get(pk=doc_deleted.pk)
doc_restored.restore()
doc_ancestor_restored = models.Document.objects.get(pk=doc_ancestor_deleted.pk)
assert doc_restored.deleted_at is None
assert doc_restored.ancestors_deleted_at is None
assert doc_ancestor_restored.deleted_at is None
assert doc_ancestor_restored.ancestors_deleted_at is None
accesses = {
str(doc.path): {"users": [user.sub]},
str(doc_deleted.path): {"users": [user.sub]},
str(doc_ancestor_deleted.path): {"users": [user.sub]},
}
data = [call.args[0] for call in mock_push.call_args_list]
indexer = SearchIndexer()
# All docs are re-indexed
assert len(data) == 2
# First indexation on items creation & soft delete (in the same transaction)
assert sorted(data[0], key=itemgetter("id")) == sorted(
[
indexer.serialize_document(doc, accesses),
indexer.serialize_document(doc_deleted, accesses),
indexer.serialize_document(doc_ancestor_deleted, accesses),
],
key=itemgetter("id"),
)
# Restored items are re-indexed : only update their status in the future
assert sorted(data[1], key=itemgetter("id")) == sorted(
[
indexer.serialize_document(doc_restored, accesses), # restore()
indexer.serialize_document(doc_ancestor_restored, accesses),
],
key=itemgetter("id"),
)
@pytest.mark.django_db(transaction=True)
@pytest.mark.usefixtures("indexer_settings")
def test_models_documents_post_save_indexer_throttle():
"""Test indexation task skipping on document update"""
indexer = SearchIndexer()
user = factories.UserFactory()
with mock.patch.object(SearchIndexer, "push"):
with transaction.atomic():
docs = factories.DocumentFactory.create_batch(5, users=(user,))
accesses = {str(item.path): {"users": [user.sub]} for item in docs}
with mock.patch.object(SearchIndexer, "push") as mock_push:
# Simulate 1 running task
cache.set("document-batch-indexer-throttle", 1)
# save doc to trigger the indexer, but nothing should be done since
# the flag is up
with transaction.atomic():
docs[0].save()
docs[2].save()
docs[3].save()
assert [call.args[0] for call in mock_push.call_args_list] == []
with mock.patch.object(SearchIndexer, "push") as mock_push:
# No waiting task
cache.delete("document-batch-indexer-throttle")
with transaction.atomic():
docs[0].save()
docs[2].save()
docs[3].save()
data = [call.args[0] for call in mock_push.call_args_list]
# One call
assert len(data) == 1
assert sorted(data[0], key=itemgetter("id")) == sorted(
[
indexer.serialize_document(docs[0], accesses),
indexer.serialize_document(docs[2], accesses),
indexer.serialize_document(docs[3], accesses),
],
key=itemgetter("id"),
)
@pytest.mark.django_db(transaction=True)
@pytest.mark.usefixtures("indexer_settings")
def test_models_documents_access_post_save_indexer():
"""Test indexation task on DocumentAccess update"""
users = factories.UserFactory.create_batch(3)
with mock.patch.object(SearchIndexer, "push"):
with transaction.atomic():
doc = factories.DocumentFactory(users=users)
doc_accesses = models.DocumentAccess.objects.filter(document=doc).order_by(
"user__sub"
)
reset_batch_indexer_throttle()
with mock.patch.object(SearchIndexer, "push") as mock_push:
with transaction.atomic():
for doc_access in doc_accesses:
doc_access.save()
data = [call.args[0] for call in mock_push.call_args_list]
# One call
assert len(data) == 1
assert [d["id"] for d in data[0]] == [str(doc.pk)]
@pytest.mark.django_db(transaction=True)
def test_models_items_access_post_save_indexer_no_throttle(indexer_settings):
"""Test indexation task on ItemAccess update, no throttle"""
indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0
users = factories.UserFactory.create_batch(3)
with transaction.atomic():
doc = factories.DocumentFactory(users=users)
doc_accesses = models.DocumentAccess.objects.filter(document=doc).order_by(
"user__sub"
)
reset_batch_indexer_throttle()
with mock.patch.object(SearchIndexer, "push") as mock_push:
with transaction.atomic():
for doc_access in doc_accesses:
doc_access.save()
data = [call.args[0] for call in mock_push.call_args_list]
# 3 calls
assert len(data) == 3
# one document per call
assert [len(d) for d in data] == [1] * 3
# the same document is indexed 3 times
assert [d[0]["id"] for d in data] == [str(doc.pk)] * 3