🩹(backend) fix empty indexation batch
As we filter the empty documents from the batch during indexing some batches can be empty and cause an error. Now they are ignored. Add --batch-size argument to the index command. Signed-off-by: Fabre Florian <ffabre@hybird.org>
This commit is contained in:
committed by
Quentin BEY
parent
780bcb360a
commit
f5a9ef2643
@@ -17,6 +17,17 @@ class Command(BaseCommand):
|
|||||||
|
|
||||||
help = __doc__
|
help = __doc__
|
||||||
|
|
||||||
|
def add_arguments(self, parser):
|
||||||
|
"""Add argument to require forcing execution when not in debug mode."""
|
||||||
|
parser.add_argument(
|
||||||
|
"--batch-size",
|
||||||
|
action="store",
|
||||||
|
dest="batch_size",
|
||||||
|
type=int,
|
||||||
|
default=50,
|
||||||
|
help="Indexation query batch size",
|
||||||
|
)
|
||||||
|
|
||||||
def handle(self, *args, **options):
|
def handle(self, *args, **options):
|
||||||
"""Launch and log search index generation."""
|
"""Launch and log search index generation."""
|
||||||
indexer = get_document_indexer()
|
indexer = get_document_indexer()
|
||||||
@@ -26,9 +37,10 @@ class Command(BaseCommand):
|
|||||||
|
|
||||||
logger.info("Starting to regenerate Find index...")
|
logger.info("Starting to regenerate Find index...")
|
||||||
start = time.perf_counter()
|
start = time.perf_counter()
|
||||||
|
batch_size = options["batch_size"]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
count = indexer.index()
|
count = indexer.index(batch_size=batch_size)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
raise CommandError("Unable to regenerate index") from err
|
raise CommandError("Unable to regenerate index") from err
|
||||||
|
|
||||||
|
|||||||
@@ -102,15 +102,11 @@ class BaseDocumentIndexer(ABC):
|
|||||||
`serialize_document()` and `push()` to define backend-specific behavior.
|
`serialize_document()` and `push()` to define backend-specific behavior.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, batch_size=None):
|
def __init__(self):
|
||||||
"""
|
"""
|
||||||
Initialize the indexer.
|
Initialize the indexer.
|
||||||
|
|
||||||
Args:
|
|
||||||
batch_size (int, optional): Number of documents per batch.
|
|
||||||
Defaults to settings.SEARCH_INDEXER_BATCH_SIZE.
|
|
||||||
"""
|
"""
|
||||||
self.batch_size = batch_size or settings.SEARCH_INDEXER_BATCH_SIZE
|
self.batch_size = settings.SEARCH_INDEXER_BATCH_SIZE
|
||||||
self.indexer_url = settings.SEARCH_INDEXER_URL
|
self.indexer_url = settings.SEARCH_INDEXER_URL
|
||||||
self.indexer_secret = settings.SEARCH_INDEXER_SECRET
|
self.indexer_secret = settings.SEARCH_INDEXER_SECRET
|
||||||
self.search_url = settings.SEARCH_INDEXER_QUERY_URL
|
self.search_url = settings.SEARCH_INDEXER_QUERY_URL
|
||||||
@@ -130,19 +126,26 @@ class BaseDocumentIndexer(ABC):
|
|||||||
"SEARCH_INDEXER_QUERY_URL must be set in Django settings."
|
"SEARCH_INDEXER_QUERY_URL must be set in Django settings."
|
||||||
)
|
)
|
||||||
|
|
||||||
def index(self, queryset=None):
|
def index(self, queryset=None, batch_size=None):
|
||||||
"""
|
"""
|
||||||
Fetch documents in batches, serialize them, and push to the search backend.
|
Fetch documents in batches, serialize them, and push to the search backend.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
queryset (optional): Document queryset
|
||||||
|
Defaults to all documents without filter.
|
||||||
|
batch_size (int, optional): Number of documents per batch.
|
||||||
|
Defaults to settings.SEARCH_INDEXER_BATCH_SIZE.
|
||||||
"""
|
"""
|
||||||
last_id = 0
|
last_id = 0
|
||||||
count = 0
|
count = 0
|
||||||
queryset = queryset or models.Document.objects.all()
|
queryset = queryset or models.Document.objects.all()
|
||||||
|
batch_size = batch_size or self.batch_size
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
documents_batch = list(
|
documents_batch = list(
|
||||||
queryset.filter(
|
queryset.filter(
|
||||||
id__gt=last_id,
|
id__gt=last_id,
|
||||||
).order_by("id")[: self.batch_size]
|
).order_by("id")[:batch_size]
|
||||||
)
|
)
|
||||||
|
|
||||||
if not documents_batch:
|
if not documents_batch:
|
||||||
@@ -158,8 +161,9 @@ class BaseDocumentIndexer(ABC):
|
|||||||
if document.content or document.title
|
if document.content or document.title
|
||||||
]
|
]
|
||||||
|
|
||||||
self.push(serialized_batch)
|
if serialized_batch:
|
||||||
count += len(serialized_batch)
|
self.push(serialized_batch)
|
||||||
|
count += len(serialized_batch)
|
||||||
|
|
||||||
return count
|
return count
|
||||||
|
|
||||||
|
|||||||
@@ -299,6 +299,44 @@ def test_services_search_indexers_batches_pass_only_batch_accesses(
|
|||||||
assert seen_doc_ids == {str(d.id) for d in documents}
|
assert seen_doc_ids == {str(d.id) for d in documents}
|
||||||
|
|
||||||
|
|
||||||
|
@patch.object(SearchIndexer, "push")
|
||||||
|
@pytest.mark.usefixtures("indexer_settings")
|
||||||
|
def test_services_search_indexers_batch_size_argument(mock_push):
|
||||||
|
"""
|
||||||
|
Documents indexing should be processed in batches,
|
||||||
|
batch_size overrides SEARCH_INDEXER_BATCH_SIZE
|
||||||
|
"""
|
||||||
|
documents = factories.DocumentFactory.create_batch(5)
|
||||||
|
|
||||||
|
# Attach a single user access to each document
|
||||||
|
expected_user_subs = {}
|
||||||
|
for document in documents:
|
||||||
|
access = factories.UserDocumentAccessFactory(document=document)
|
||||||
|
expected_user_subs[str(document.id)] = str(access.user.sub)
|
||||||
|
|
||||||
|
assert SearchIndexer().index(batch_size=2) == 5
|
||||||
|
|
||||||
|
# Should be 3 batches: 2 + 2 + 1
|
||||||
|
assert mock_push.call_count == 3
|
||||||
|
|
||||||
|
seen_doc_ids = set()
|
||||||
|
|
||||||
|
for call in mock_push.call_args_list:
|
||||||
|
batch = call.args[0]
|
||||||
|
assert isinstance(batch, list)
|
||||||
|
|
||||||
|
for doc_json in batch:
|
||||||
|
doc_id = doc_json["id"]
|
||||||
|
seen_doc_ids.add(doc_id)
|
||||||
|
|
||||||
|
# Only one user expected per document
|
||||||
|
assert doc_json["users"] == [expected_user_subs[doc_id]]
|
||||||
|
assert doc_json["groups"] == []
|
||||||
|
|
||||||
|
# Make sure all 5 documents were indexed
|
||||||
|
assert seen_doc_ids == {str(d.id) for d in documents}
|
||||||
|
|
||||||
|
|
||||||
@patch.object(SearchIndexer, "push")
|
@patch.object(SearchIndexer, "push")
|
||||||
@pytest.mark.usefixtures("indexer_settings")
|
@pytest.mark.usefixtures("indexer_settings")
|
||||||
def test_services_search_indexers_ignore_empty_documents(mock_push):
|
def test_services_search_indexers_ignore_empty_documents(mock_push):
|
||||||
@@ -327,6 +365,25 @@ def test_services_search_indexers_ignore_empty_documents(mock_push):
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@patch.object(SearchIndexer, "push")
|
||||||
|
def test_services_search_indexers_skip_empty_batches(mock_push, indexer_settings):
|
||||||
|
"""
|
||||||
|
Documents indexing batch can be empty if all the docs are empty.
|
||||||
|
"""
|
||||||
|
indexer_settings.SEARCH_INDEXER_BATCH_SIZE = 2
|
||||||
|
|
||||||
|
document = factories.DocumentFactory()
|
||||||
|
|
||||||
|
# Only empty docs
|
||||||
|
factories.DocumentFactory.create_batch(5, content="", title="")
|
||||||
|
|
||||||
|
assert SearchIndexer().index() == 1
|
||||||
|
assert mock_push.call_count == 1
|
||||||
|
|
||||||
|
results = [doc["id"] for doc in mock_push.call_args[0][0]]
|
||||||
|
assert results == [str(document.id)]
|
||||||
|
|
||||||
|
|
||||||
@patch.object(SearchIndexer, "push")
|
@patch.object(SearchIndexer, "push")
|
||||||
@pytest.mark.usefixtures("indexer_settings")
|
@pytest.mark.usefixtures("indexer_settings")
|
||||||
def test_services_search_indexers_ancestors_link_reach(mock_push):
|
def test_services_search_indexers_ancestors_link_reach(mock_push):
|
||||||
|
|||||||
Reference in New Issue
Block a user