✨(backend) throttle indexation tasks instead of debounce (simpler)
Replace the indexer_debounce_lock|release functions with indexer_throttle_acquire(). Instead of a mutex-like mechanism, simply set a flag in the cache for a given amount of time; while the flag is set, no other task is created. Signed-off-by: Fabre Florian <ffabre@hybird.org>
This commit is contained in:
committed by
Quentin BEY
parent
044c1495a9
commit
b0e7a511cb
@@ -5,56 +5,50 @@ from logging import getLogger
|
|||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.core.cache import cache
|
from django.core.cache import cache
|
||||||
|
|
||||||
|
from django_redis.cache import RedisCache
|
||||||
|
|
||||||
|
from core import models
|
||||||
|
from core.services.search_indexers import (
|
||||||
|
get_batch_accesses_by_users_and_teams,
|
||||||
|
get_document_indexer,
|
||||||
|
)
|
||||||
|
|
||||||
from impress.celery_app import app
|
from impress.celery_app import app
|
||||||
|
|
||||||
logger = getLogger(__file__)
|
logger = getLogger(__file__)
|
||||||
|
|
||||||
|
|
||||||
def indexer_debounce_lock(document_id):
|
def indexer_throttle_acquire(document_id, timeout=0, atomic=True):
|
||||||
"""Increase or reset counter"""
|
"""
|
||||||
key = f"doc-indexer-debounce-{document_id}"
|
Enable the task throttle flag for a delay.
|
||||||
|
Uses redis locks if available to ensure atomic changes
|
||||||
|
"""
|
||||||
|
key = f"doc-indexer-throttle-{document_id}"
|
||||||
|
|
||||||
try:
|
if isinstance(cache, RedisCache) and atomic:
|
||||||
return cache.incr(key)
|
with cache.locks(key):
|
||||||
except ValueError:
|
return indexer_throttle_acquire(document_id, timeout, atomic=False)
|
||||||
cache.set(key, 1)
|
|
||||||
return 1
|
|
||||||
|
|
||||||
|
# Use add() here :
|
||||||
def indexer_debounce_release(document_id):
|
# - set the flag and returns true if not exist
|
||||||
"""Decrease or reset counter"""
|
# - do nothing and return false if exist
|
||||||
key = f"doc-indexer-debounce-{document_id}"
|
return cache.add(key, 1, timeout=timeout)
|
||||||
|
|
||||||
try:
|
|
||||||
return cache.decr(key)
|
|
||||||
except ValueError:
|
|
||||||
cache.set(key, 0)
|
|
||||||
return 0
|
|
||||||
|
|
||||||
|
|
||||||
@app.task
|
@app.task
|
||||||
def document_indexer_task(document_id):
|
def document_indexer_task(document_id):
|
||||||
"""Celery Task : Sends indexation query for a document."""
|
"""Celery Task : Sends indexation query for a document."""
|
||||||
# Prevents some circular imports
|
|
||||||
# pylint: disable=import-outside-toplevel
|
|
||||||
from core import models # noqa : PLC0415
|
|
||||||
from core.services.search_indexers import ( # noqa : PLC0415
|
|
||||||
get_batch_accesses_by_users_and_teams,
|
|
||||||
get_document_indexer,
|
|
||||||
)
|
|
||||||
|
|
||||||
# check if the counter : if still up, skip the task. only the last one
|
|
||||||
# within the countdown delay will do the query.
|
|
||||||
if indexer_debounce_release(document_id) > 0:
|
|
||||||
logger.info("Skip document %s indexation", document_id)
|
|
||||||
return
|
|
||||||
|
|
||||||
indexer = get_document_indexer()
|
indexer = get_document_indexer()
|
||||||
|
|
||||||
if indexer is None:
|
if indexer is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
doc = models.Document.objects.get(pk=document_id)
|
try:
|
||||||
|
doc = models.Document.objects.get(pk=document_id)
|
||||||
|
except models.Document.DoesNotExist:
|
||||||
|
# Skip the task if the document does not exist.
|
||||||
|
return
|
||||||
|
|
||||||
accesses = get_batch_accesses_by_users_and_teams((doc.path,))
|
accesses = get_batch_accesses_by_users_and_teams((doc.path,))
|
||||||
|
|
||||||
data = indexer.serialize_document(document=doc, accesses=accesses)
|
data = indexer.serialize_document(document=doc, accesses=accesses)
|
||||||
@@ -76,14 +70,15 @@ def trigger_document_indexer(document):
|
|||||||
if not settings.SEARCH_INDEXER_CLASS:
|
if not settings.SEARCH_INDEXER_CLASS:
|
||||||
return
|
return
|
||||||
|
|
||||||
logger.info(
|
# Each time this method is called during a countdown, we increment the
|
||||||
"Add task for document %s indexation in %.2f seconds",
|
|
||||||
document.pk,
|
|
||||||
countdown,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Each time this method is called during the countdown, we increment the
|
|
||||||
# counter and each task decrease it, so the index be run only once.
|
# counter and each task decrease it, so the index be run only once.
|
||||||
indexer_debounce_lock(document.pk)
|
if indexer_throttle_acquire(document.pk, timeout=countdown):
|
||||||
|
logger.info(
|
||||||
|
"Add task for document %s indexation in %.2f seconds",
|
||||||
|
document.pk,
|
||||||
|
countdown,
|
||||||
|
)
|
||||||
|
|
||||||
document_indexer_task.apply_async(args=[document.pk], countdown=countdown)
|
document_indexer_task.apply_async(args=[document.pk])
|
||||||
|
else:
|
||||||
|
logger.info("Skip task for document %s indexation", document.pk)
|
||||||
|
|||||||
@@ -93,6 +93,7 @@ def test_api_documents_search_endpoint_is_none(indexer_settings):
|
|||||||
"path": document.path,
|
"path": document.path,
|
||||||
"title": document.title,
|
"title": document.title,
|
||||||
"updated_at": document.updated_at.isoformat().replace("+00:00", "Z"),
|
"updated_at": document.updated_at.isoformat().replace("+00:00", "Z"),
|
||||||
|
"deleted_at": None,
|
||||||
"user_role": access.role,
|
"user_role": access.role,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -184,6 +185,7 @@ def test_api_documents_search_format(indexer_settings):
|
|||||||
"path": document.path,
|
"path": document.path,
|
||||||
"title": document.title,
|
"title": document.title,
|
||||||
"updated_at": document.updated_at.isoformat().replace("+00:00", "Z"),
|
"updated_at": document.updated_at.isoformat().replace("+00:00", "Z"),
|
||||||
|
"deleted_at": None,
|
||||||
"user_role": access.role,
|
"user_role": access.role,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ import pytest
|
|||||||
|
|
||||||
from core import factories, models
|
from core import factories, models
|
||||||
from core.services.search_indexers import SearchIndexer
|
from core.services.search_indexers import SearchIndexer
|
||||||
|
from core.tasks.search import document_indexer_task
|
||||||
|
|
||||||
pytestmark = pytest.mark.django_db
|
pytestmark = pytest.mark.django_db
|
||||||
|
|
||||||
@@ -1645,10 +1646,10 @@ def test_models_documents_post_save_indexer(mock_push, indexer_settings):
|
|||||||
key=itemgetter("id"),
|
key=itemgetter("id"),
|
||||||
)
|
)
|
||||||
|
|
||||||
# The debounce counters should be reset
|
# The throttle counters should be reset
|
||||||
assert cache.get(f"doc-indexer-debounce-{doc1.pk}") == 0
|
assert cache.get(f"doc-indexer-throttle-{doc1.pk}") is None
|
||||||
assert cache.get(f"doc-indexer-debounce-{doc2.pk}") == 0
|
assert cache.get(f"doc-indexer-throttle-{doc2.pk}") is None
|
||||||
assert cache.get(f"doc-indexer-debounce-{doc3.pk}") == 0
|
assert cache.get(f"doc-indexer-throttle-{doc3.pk}") is None
|
||||||
|
|
||||||
|
|
||||||
@mock.patch.object(SearchIndexer, "push")
|
@mock.patch.object(SearchIndexer, "push")
|
||||||
@@ -1658,10 +1659,31 @@ def test_models_documents_post_save_indexer_not_configured(mock_push, indexer_se
|
|||||||
indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0
|
indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0
|
||||||
indexer_settings.SEARCH_INDEXER_CLASS = None
|
indexer_settings.SEARCH_INDEXER_CLASS = None
|
||||||
|
|
||||||
with transaction.atomic():
|
user = factories.UserFactory()
|
||||||
factories.DocumentFactory()
|
|
||||||
|
|
||||||
assert mock_push.call_args_list == []
|
with transaction.atomic():
|
||||||
|
doc = factories.DocumentFactory()
|
||||||
|
factories.UserDocumentAccessFactory(document=doc, user=user)
|
||||||
|
|
||||||
|
assert mock_push.assert_not_called
|
||||||
|
|
||||||
|
|
||||||
|
@mock.patch.object(SearchIndexer, "push")
|
||||||
|
@pytest.mark.django_db(transaction=True)
|
||||||
|
def test_models_documents_post_save_indexer_wrongly_configured(
|
||||||
|
mock_push, indexer_settings
|
||||||
|
):
|
||||||
|
"""Task should not start an indexation when disabled"""
|
||||||
|
indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0
|
||||||
|
indexer_settings.SEARCH_INDEXER_URL = None
|
||||||
|
|
||||||
|
user = factories.UserFactory()
|
||||||
|
|
||||||
|
with transaction.atomic():
|
||||||
|
doc = factories.DocumentFactory()
|
||||||
|
factories.UserDocumentAccessFactory(document=doc, user=user)
|
||||||
|
|
||||||
|
assert mock_push.assert_not_called
|
||||||
|
|
||||||
|
|
||||||
@mock.patch.object(SearchIndexer, "push")
|
@mock.patch.object(SearchIndexer, "push")
|
||||||
@@ -1698,10 +1720,10 @@ def test_models_documents_post_save_indexer_with_accesses(mock_push, indexer_set
|
|||||||
key=itemgetter("id"),
|
key=itemgetter("id"),
|
||||||
)
|
)
|
||||||
|
|
||||||
# The debounce counters should be reset
|
# The throttle counters should be reset
|
||||||
assert cache.get(f"doc-indexer-debounce-{doc1.pk}") == 0
|
assert cache.get(f"doc-indexer-throttle-{doc1.pk}") is None
|
||||||
assert cache.get(f"doc-indexer-debounce-{doc2.pk}") == 0
|
assert cache.get(f"doc-indexer-throttle-{doc2.pk}") is None
|
||||||
assert cache.get(f"doc-indexer-debounce-{doc3.pk}") == 0
|
assert cache.get(f"doc-indexer-throttle-{doc3.pk}") is None
|
||||||
|
|
||||||
|
|
||||||
@mock.patch.object(SearchIndexer, "push")
|
@mock.patch.object(SearchIndexer, "push")
|
||||||
@@ -1760,10 +1782,34 @@ def test_models_documents_post_save_indexer_deleted(mock_push, indexer_settings)
|
|||||||
key=itemgetter("id"),
|
key=itemgetter("id"),
|
||||||
)
|
)
|
||||||
|
|
||||||
# The debounce counters should be reset
|
# The throttle counters should be reset
|
||||||
assert cache.get(f"doc-indexer-debounce-{doc.pk}") == 0
|
assert cache.get(f"doc-indexer-throttle-{doc.pk}") is None
|
||||||
assert cache.get(f"doc-indexer-debounce-{doc_deleted.pk}") == 0
|
assert cache.get(f"doc-indexer-throttle-{doc_deleted.pk}") is None
|
||||||
assert cache.get(f"doc-indexer-debounce-{doc_ancestor_deleted.pk}") == 0
|
assert cache.get(f"doc-indexer-throttle-{doc_ancestor_deleted.pk}") is None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db(transaction=True)
|
||||||
|
def test_models_documents_indexer_hard_deleted(indexer_settings):
|
||||||
|
"""Indexation task on hard deleted document"""
|
||||||
|
indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0
|
||||||
|
|
||||||
|
user = factories.UserFactory()
|
||||||
|
|
||||||
|
with transaction.atomic():
|
||||||
|
doc = factories.DocumentFactory(
|
||||||
|
link_reach=models.LinkReachChoices.AUTHENTICATED
|
||||||
|
)
|
||||||
|
factories.UserDocumentAccessFactory(document=doc, user=user)
|
||||||
|
|
||||||
|
doc_id = doc.pk
|
||||||
|
doc.delete()
|
||||||
|
|
||||||
|
# Call task on deleted document.
|
||||||
|
document_indexer_task.apply(args=[doc_id])
|
||||||
|
|
||||||
|
with mock.patch.object(SearchIndexer, "push") as mock_push:
|
||||||
|
# Hard delete document are not re-indexed.
|
||||||
|
assert mock_push.assert_not_called
|
||||||
|
|
||||||
|
|
||||||
@mock.patch.object(SearchIndexer, "push")
|
@mock.patch.object(SearchIndexer, "push")
|
||||||
@@ -1836,7 +1882,7 @@ def test_models_documents_post_save_indexer_restored(mock_push, indexer_settings
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db(transaction=True)
|
@pytest.mark.django_db(transaction=True)
|
||||||
def test_models_documents_post_save_indexer_debounce(indexer_settings):
|
def test_models_documents_post_save_indexer_throttle(indexer_settings):
|
||||||
"""Test indexation task skipping on document update"""
|
"""Test indexation task skipping on document update"""
|
||||||
indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0
|
indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0
|
||||||
|
|
||||||
@@ -1853,11 +1899,11 @@ def test_models_documents_post_save_indexer_debounce(indexer_settings):
|
|||||||
}
|
}
|
||||||
|
|
||||||
with mock.patch.object(SearchIndexer, "push") as mock_push:
|
with mock.patch.object(SearchIndexer, "push") as mock_push:
|
||||||
# Simulate 1 waiting task
|
# Simulate 1 running task
|
||||||
cache.set(f"doc-indexer-debounce-{doc.pk}", 1)
|
cache.set(f"doc-indexer-throttle-{doc.pk}", 1)
|
||||||
|
|
||||||
# save doc to trigger the indexer, but nothing should be done since
|
# save doc to trigger the indexer, but nothing should be done since
|
||||||
# the counter is over 0
|
# the flag is up
|
||||||
with transaction.atomic():
|
with transaction.atomic():
|
||||||
doc.save()
|
doc.save()
|
||||||
|
|
||||||
@@ -1865,7 +1911,7 @@ def test_models_documents_post_save_indexer_debounce(indexer_settings):
|
|||||||
|
|
||||||
with mock.patch.object(SearchIndexer, "push") as mock_push:
|
with mock.patch.object(SearchIndexer, "push") as mock_push:
|
||||||
# No waiting task
|
# No waiting task
|
||||||
cache.set(f"doc-indexer-debounce-{doc.pk}", 0)
|
cache.delete(f"doc-indexer-throttle-{doc.pk}")
|
||||||
|
|
||||||
with transaction.atomic():
|
with transaction.atomic():
|
||||||
doc = models.Document.objects.get(pk=doc.pk)
|
doc = models.Document.objects.get(pk=doc.pk)
|
||||||
|
|||||||
Reference in New Issue
Block a user