(backend) add document search indexer

Add indexer that loops across documents in the database, formats them
as json objects and indexes them in the remote "Find" mico-service.
This commit is contained in:
Samuel Paccoud - DINUM
2025-07-24 12:31:20 +02:00
committed by Quentin BEY
parent f4bdde7e59
commit 1d9c2a8118
7 changed files with 503 additions and 0 deletions

View File

@@ -247,6 +247,10 @@ demo: ## flush db then create a demo for load testing purpose
@$(MANAGE) create_demo
.PHONY: demo
index: ## index all documents to remote search
@$(MANAGE) index
.PHONY: index
# Nota bene: Black should come after isort just in case they don't agree...
lint: ## lint back-end python sources
lint: \

View File

@@ -0,0 +1,28 @@
"""
Handle search setup that needs to be done at bootstrap time.
"""
import logging
import time
from django.core.management.base import BaseCommand
from ...services.search_indexers import FindDocumentIndexer
logger = logging.getLogger("docs.search.bootstrap_search")
class Command(BaseCommand):
"""Index all documents to remote search service"""
help = __doc__
def handle(self, *args, **options):
"""Launch and log search index generation."""
logger.info("Starting to regenerate Find index...")
start = time.perf_counter()
FindDocumentIndexer().index()
duration = time.perf_counter() - start
logger.info(f"Search index regenerated in {duration:.2f} seconds.")

View File

@@ -0,0 +1,170 @@
"""Document search index management utilities and indexers"""
import logging
from abc import ABC, abstractmethod
from collections import defaultdict
from django.conf import settings
import requests
from core import models, utils
logger = logging.getLogger(__name__)
def get_batch_accesses_by_users_and_teams(paths):
"""
Get accesses related to a list of document paths,
grouped by users and teams, including all ancestor paths.
"""
print("paths: ", paths)
ancestor_map = utils.get_ancestor_to_descendants_map(paths, steplen=models.Document.steplen)
ancestor_paths = list(ancestor_map.keys())
print("ancestor map: ", ancestor_map)
print("ancestor paths: ", ancestor_paths)
access_qs = models.DocumentAccess.objects.filter(
document__path__in=ancestor_paths
).values("document__path", "user__sub", "team")
access_by_document_path = defaultdict(lambda: {"users": set(), "teams": set()})
for access in access_qs:
ancestor_path = access["document__path"]
user_sub = access["user__sub"]
team = access["team"]
for descendant_path in ancestor_map.get(ancestor_path, []):
if user_sub:
access_by_document_path[descendant_path]["users"].add(str(user_sub))
if team:
access_by_document_path[descendant_path]["teams"].add(team)
return dict(access_by_document_path)
class BaseDocumentIndexer(ABC):
"""
Base class for document indexers.
Handles batching and access resolution. Subclasses must implement both
`serialize_document()` and `push()` to define backend-specific behavior.
"""
def __init__(self, batch_size=None):
"""
Initialize the indexer.
Args:
batch_size (int, optional): Number of documents per batch.
Defaults to settings.SEARCH_INDEXER_BATCH_SIZE.
"""
self.batch_size = batch_size or settings.SEARCH_INDEXER_BATCH_SIZE
def index(self):
"""
Fetch documents in batches, serialize them, and push to the search backend.
"""
last_id = 0
while True:
documents_batch = list(
models.Document.objects.filter(
id__gt=last_id,
).order_by("id")[: self.batch_size]
)
if not documents_batch:
break
doc_paths = [doc.path for doc in documents_batch]
last_id = documents_batch[-1].id
accesses_by_document_path = get_batch_accesses_by_users_and_teams(doc_paths)
serialized_batch = [
self.serialize_document(document, accesses_by_document_path)
for document in documents_batch
]
self.push(serialized_batch)
@abstractmethod
def serialize_document(self, document, accesses):
"""
Convert a Document instance to a JSON-serializable format for indexing.
Must be implemented by subclasses.
"""
@abstractmethod
def push(self, data):
"""
Push a batch of serialized documents to the backend.
Must be implemented by subclasses.
"""
class FindDocumentIndexer(BaseDocumentIndexer):
"""
Document indexer that pushes documents to La Suite Find app.
"""
def serialize_document(self, document, accesses):
"""
Convert a Document to the JSON format expected by La Suite Find.
Args:
document (Document): The document instance.
accesses (dict): Mapping of document ID to user/team access.
Returns:
dict: A JSON-serializable dictionary.
"""
doc_path = document.path
text_content = utils.base64_yjs_to_text(document.content)
return {
"id": str(document.id),
"title": document.title,
"content": text_content,
"depth": document.depth,
"path": document.path,
"numchild": document.numchild,
"created_at": document.created_at.isoformat(),
"updated_at": document.updated_at.isoformat(),
"users": list(accesses.get(doc_path, {}).get("users", set())),
"groups": list(accesses.get(doc_path, {}).get("teams", set())),
"reach": document.computed_link_reach,
"size": len(text_content.encode("utf-8")),
"is_active": not bool(document.ancestors_deleted_at),
}
def push(self, data):
"""
Push a batch of documents to the Find backend.
Args:
data (list): List of document dictionaries.
"""
url = getattr(settings, "SEARCH_INDEXER_URL", None)
if not url:
raise RuntimeError(
"SEARCH_INDEXER_URL must be set in Django settings before indexing."
)
secret = getattr(settings, "SEARCH_INDEXER_SECRET", None)
if not secret:
raise RuntimeError(
"SEARCH_INDEXER_SECRET must be set in Django settings before indexing."
)
try:
response = requests.post(
url,
json=data,
headers={"Authorization": f"Bearer {secret}"},
timeout=10,
)
response.raise_for_status()
except requests.exceptions.HTTPError as e:
logger.error("HTTPError: %s", e)
logger.error("Response content: %s", response.text)
raise

View File

@@ -0,0 +1,243 @@
"""Tests for Documents search indexers"""
from unittest.mock import patch
import pytest
from core import factories, utils
from core.services.search_indexers import FindDocumentIndexer
pytestmark = pytest.mark.django_db
def test_push_raises_error_if_search_indexer_url_is_none(settings):
"""
Indexer should raise RuntimeError if SEARCH_INDEXER_URL is None or empty.
"""
settings.SEARCH_INDEXER_URL = None
indexer = FindDocumentIndexer()
with pytest.raises(RuntimeError) as exc_info:
indexer.push([])
assert "SEARCH_INDEXER_URL must be set in Django settings before indexing." in str(
exc_info.value
)
def test_push_raises_error_if_search_indexer_url_is_empty(settings):
"""
Indexer should raise RuntimeError if SEARCH_INDEXER_URL is empty string.
"""
settings.SEARCH_INDEXER_URL = ""
indexer = FindDocumentIndexer()
with pytest.raises(RuntimeError) as exc_info:
indexer.push([])
assert "SEARCH_INDEXER_URL must be set in Django settings before indexing." in str(
exc_info.value
)
def test_push_raises_error_if_search_indexer_secret_is_none(settings):
"""
Indexer should raise RuntimeError if SEARCH_INDEXER_SECRET is None or empty.
"""
settings.SEARCH_INDEXER_SECRET = None
indexer = FindDocumentIndexer()
with pytest.raises(RuntimeError) as exc_info:
indexer.push([])
assert (
"SEARCH_INDEXER_SECRET must be set in Django settings before indexing."
in str(exc_info.value)
)
def test_push_raises_error_if_search_indexer_secret_is_empty(settings):
"""
Indexer should raise RuntimeError if SEARCH_INDEXER_SECRET is empty string.
"""
settings.SEARCH_INDEXER_SECRET = ""
indexer = FindDocumentIndexer()
with pytest.raises(RuntimeError) as exc_info:
indexer.push([])
assert (
"SEARCH_INDEXER_SECRET must be set in Django settings before indexing."
in str(exc_info.value)
)
def test_services_search_indexers_serialize_document_returns_expected_json():
"""
It should serialize documents with correct metadata and access control.
"""
user_a, user_b = factories.UserFactory.create_batch(2)
document = factories.DocumentFactory()
factories.DocumentFactory(parent=document)
factories.UserDocumentAccessFactory(document=document, user=user_a)
factories.UserDocumentAccessFactory(document=document, user=user_b)
factories.TeamDocumentAccessFactory(document=document, team="team1")
factories.TeamDocumentAccessFactory(document=document, team="team2")
accesses = {
document.path: {
"users": {str(user_a.sub), str(user_b.sub)},
"teams": {"team1", "team2"},
}
}
indexer = FindDocumentIndexer()
result = indexer.serialize_document(document, accesses)
assert set(result.pop("users")) == {str(user_a.sub), str(user_b.sub)}
assert set(result.pop("groups")) == {"team1", "team2"}
assert result == {
"id": str(document.id),
"title": document.title,
"depth": 1,
"path": document.path,
"numchild": 1,
"content": utils.base64_yjs_to_text(document.content),
"created_at": document.created_at.isoformat(),
"updated_at": document.updated_at.isoformat(),
"reach": document.link_reach,
'size': 13,
"is_active": True,
}
def test_services_search_indexers_serialize_document_deleted():
"""Deleted documents are marked as just in the serialized json."""
parent = factories.DocumentFactory()
document = factories.DocumentFactory(parent=parent)
parent.soft_delete()
document.refresh_from_db()
indexer = FindDocumentIndexer()
result = indexer.serialize_document(document, {})
assert result["is_active"] is False
@patch.object(FindDocumentIndexer, "push")
def test_services_search_indexers_batches_pass_only_batch_accesses(mock_push, settings):
"""
Documents indexing should be processed in batches,
and only the access data relevant to each batch should be used.
"""
settings.SEARCH_INDEXER_BATCH_SIZE = 2
documents = factories.DocumentFactory.create_batch(5)
# Attach a single user access to each document
expected_user_subs = {}
for document in documents:
access = factories.UserDocumentAccessFactory(document=document)
expected_user_subs[str(document.id)] = str(access.user.sub)
FindDocumentIndexer().index()
# Should be 3 batches: 2 + 2 + 1
assert mock_push.call_count == 3
seen_doc_ids = set()
for call in mock_push.call_args_list:
batch = call.args[0]
assert isinstance(batch, list)
for doc_json in batch:
doc_id = doc_json["id"]
seen_doc_ids.add(doc_id)
# Only one user expected per document
assert doc_json["users"] == [expected_user_subs[doc_id]]
assert doc_json["groups"] == []
# Make sure all 5 documents were indexed
assert seen_doc_ids == {str(d.id) for d in documents}
@patch.object(FindDocumentIndexer, "push")
def test_services_search_indexers_ancestors_link_reach(mock_push):
"""Document accesses and reach should take into account ancestors link reaches."""
great_grand_parent = factories.DocumentFactory(link_reach="restricted")
grand_parent = factories.DocumentFactory(parent=great_grand_parent, link_reach="authenticated")
parent = factories.DocumentFactory(parent=grand_parent, link_reach="public")
document = factories.DocumentFactory(parent=parent, link_reach="restricted")
FindDocumentIndexer().index()
seen_doc_ids = set()
results = {doc["id"]: doc for doc in mock_push.call_args[0][0]}
assert len(results) == 4
assert results[str(great_grand_parent.id)]["reach"] == "restricted"
assert results[str(grand_parent.id)]["reach"] == "authenticated"
assert results[str(parent.id)]["reach"] == "public"
assert results[str(document.id)]["reach"] == "public"
@patch.object(FindDocumentIndexer, "push")
def test_services_search_indexers_ancestors_users(mock_push):
"""Document accesses and reach should include users from ancestors."""
user_gp, user_p, user_d = factories.UserFactory.create_batch(3)
grand_parent = factories.DocumentFactory(users=[user_gp])
parent = factories.DocumentFactory(parent=grand_parent, users=[user_p])
document = factories.DocumentFactory(parent=parent, users=[user_d])
FindDocumentIndexer().index()
seen_doc_ids = set()
results = {doc["id"]: doc for doc in mock_push.call_args[0][0]}
assert len(results) == 3
assert results[str(grand_parent.id)]["users"] == [str(user_gp.sub)]
assert set(results[str(parent.id)]["users"]) == {str(user_gp.sub), str(user_p.sub)}
assert set(results[str(document.id)]["users"]) == {str(user_gp.sub), str(user_p.sub), str(user_d.sub)}
@patch.object(FindDocumentIndexer, "push")
def test_services_search_indexers_ancestors_teams(mock_push):
"""Document accesses and reach should include teams from ancestors."""
grand_parent = factories.DocumentFactory(teams=["team_gp"])
parent = factories.DocumentFactory(parent=grand_parent, teams=["team_p"])
document = factories.DocumentFactory(parent=parent, teams=["team_d"])
FindDocumentIndexer().index()
seen_doc_ids = set()
results = {doc["id"]: doc for doc in mock_push.call_args[0][0]}
assert len(results) == 3
assert results[str(grand_parent.id)]["groups"] == ["team_gp"]
assert set(results[str(parent.id)]["groups"]) == {"team_gp", "team_p"}
assert set(results[str(document.id)]["groups"]) == {"team_gp", "team_p", "team_d"}
@patch("requests.post")
def test_push_uses_correct_url_and_data(mock_post, settings):
"""
push() should call requests.post with the correct URL from settings
the timeout set to 10 seconds and the data as JSON.
"""
settings.SEARCH_INDEXER_URL = "http://example.com/index"
indexer = FindDocumentIndexer()
sample_data = [{"id": "123", "title": "Test"}]
mock_response = mock_post.return_value
mock_response.raise_for_status.return_value = None # No error
indexer.push(sample_data)
mock_post.assert_called_once()
args, kwargs = mock_post.call_args
assert args[0] == settings.SEARCH_INDEXER_URL
assert kwargs.get("json") == sample_data
assert kwargs.get("timeout") == 10

View File

@@ -75,3 +75,28 @@ def test_utils_extract_attachments():
base64_string = base64.b64encode(update).decode("utf-8")
# image_key2 is missing the "/media/" part and shouldn't get extracted
assert utils.extract_attachments(base64_string) == [image_key1, image_key3]
def test_utils_get_ancestor_to_descendants_map_single_path():
"""Test ancestor mapping of a single path."""
paths = ['000100020005']
result = utils.get_ancestor_to_descendants_map(paths, steplen=4)
assert result == {
'0001': {'000100020005'},
'00010002': {'000100020005'},
'000100020005': {'000100020005'},
}
def test_utils_get_ancestor_to_descendants_map_multiple_paths():
"""Test ancestor mapping of multiple paths with shared prefixes."""
paths = ['000100020005', '00010003']
result = utils.get_ancestor_to_descendants_map(paths, steplen=4)
assert result == {
'0001': {'000100020005', '00010003'},
'00010002': {'000100020005'},
'000100020005': {'000100020005'},
'00010003': {'00010003'},
}

View File

@@ -1,6 +1,7 @@
"""Utils for the core app."""
import base64
from collections import defaultdict
import re
import pycrdt
@@ -9,6 +10,27 @@ from bs4 import BeautifulSoup
from core import enums
def get_ancestor_to_descendants_map(paths, steplen):
"""
Given a list of document paths, return a mapping of ancestor_path -> set of descendant_paths.
Each path is assumed to use materialized path format with fixed-length segments.
Args:
paths (list of str): List of full document paths.
steplen (int): Length of each path segment.
Returns:
dict[str, set[str]]: Mapping from ancestor path to its descendant paths (including itself).
"""
ancestor_map = defaultdict(set)
for path in paths:
for i in range(steplen, len(path) + 1, steplen):
ancestor = path[:i]
ancestor_map[ancestor].add(path)
return ancestor_map
def filter_descendants(paths, root_paths, skip_sorting=False):
"""
Filters paths to keep only those that are descendants of any path in root_paths.

View File

@@ -99,6 +99,17 @@ class Base(Configuration):
}
DEFAULT_AUTO_FIELD = "django.db.models.AutoField"
# Search
SEARCH_INDEXER_BATCH_SIZE = values.IntegerValue(
default=100_000, environ_name="SEARCH_INDEXER_BATCH_SIZE", environ_prefix=None
)
SEARCH_INDEXER_URL = values.Value(
default=None, environ_name="SEARCH_INDEXER_URL", environ_prefix=None
)
SEARCH_INDEXER_SECRET = values.Value(
default=None, environ_name="SEARCH_INDEXER_SECRET", environ_prefix=None
)
# Static files (CSS, JavaScript, Images)
STATIC_URL = "/static/"
STATIC_ROOT = os.path.join(DATA_DIR, "static")