Compare commits

2 Commits

Author SHA1 Message Date
cf21ffc452 Add production Dockerfile for container builds
Some checks failed
Main / Init (push) Has been cancelled
Main / Lint (push) Has been cancelled
Main / Test (push) Has been cancelled
Main / OpenSearch (push) Has been cancelled
Main / Package (push) Has been cancelled
Main / Publish (push) Has been cancelled
Multi-stage build using rust:slim-bookworm with io_uring support.
Built by buildkitd on the x86_64 server — no cross-compilation needed.
2026-03-10 18:48:44 +00:00
c9cddc80d9 Add OpenSearch search backend with hybrid neural+BM25 support
Extract a SearchBackend trait from the existing RocksDB search code and
add an OpenSearch implementation supporting cross-room search, relevance
ranking, fuzzy matching, English stemming, and optional hybrid
neural+BM25 semantic search using sentence-transformers.

Fix macOS build by gating RLIMIT_NPROC and getrusage to supported
platforms.
2026-03-08 17:41:20 +00:00
16 changed files with 2365 additions and 196 deletions

View File

@@ -332,6 +332,19 @@ jobs:
{"sys_target": "x86_64-v4-linux-gnu", "feat_set": "default"},
]
opensearch-tests:
if: >
!failure() && !cancelled()
&& fromJSON(needs.init.outputs.enable_test)
&& !fromJSON(needs.init.outputs.is_release)
&& !contains(needs.init.outputs.pipeline, '[ci no test]')
name: OpenSearch
needs: [init, lint]
uses: ./.github/workflows/opensearch-tests.yml
with:
checkout: ${{needs.init.outputs.checkout}}
package:
if: >
!failure() && !cancelled()

114
.github/workflows/opensearch-tests.yml vendored Normal file
View File

@@ -0,0 +1,114 @@
---
name: OpenSearch Integration Tests

on:
  # Reusable entry point: the main pipeline calls this with a checkout ref.
  workflow_call:
    inputs:
      checkout:
        type: string
        default: 'HEAD'
  # Also runs standalone when search-related sources or this workflow change.
  pull_request:
    paths:
      - 'src/service/rooms/search/**'
      - 'src/core/config/mod.rs'
      - '.github/workflows/opensearch-tests.yml'
  push:
    branches: [main]
    paths:
      - 'src/service/rooms/search/**'
      - 'src/core/config/mod.rs'
      - '.github/workflows/opensearch-tests.yml'

jobs:
  opensearch-tests:
    name: OpenSearch Integration
    runs-on: ubuntu-latest
    services:
      opensearch:
        image: opensearchproject/opensearch:3.5.0
        ports:
          # Quoted: digit-and-colon scalars parse as sexagesimal integers
          # under YAML 1.1 rules.
          - "9200:9200"
        env:
          discovery.type: single-node
          DISABLE_SECURITY_PLUGIN: "true"
          # The image's bootstrap requires an admin password even when the
          # security plugin is disabled.
          OPENSEARCH_INITIAL_ADMIN_PASSWORD: "SuperSecret123!"
          OPENSEARCH_JAVA_OPTS: "-Xms1g -Xmx2g"
        options: >-
          --health-cmd "curl -f http://localhost:9200/_cluster/health || exit 1"
          --health-interval 10s
          --health-timeout 5s
          --health-retries 20
          --health-start-period 60s
    steps:
      - uses: actions/checkout@v4
        with:
          # workflow_call supplies a ref; direct push/PR triggers fall back
          # to the triggering commit.
          ref: ${{ inputs.checkout || github.sha }}

      - name: Install Rust toolchain
        uses: dtolnay/rust-toolchain@nightly

      - name: Cache cargo registry and build
        uses: actions/cache@v4
        with:
          path: |
            ~/.cargo/registry
            ~/.cargo/git
            target
          key: opensearch-tests-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }}
          restore-keys: |
            opensearch-tests-${{ runner.os }}-

      # Belt-and-braces wait in addition to the service health check.
      - name: Wait for OpenSearch
        run: |
          for i in $(seq 1 60); do
            if curl -sf http://localhost:9200/_cluster/health; then
              echo ""
              echo "OpenSearch is ready"
              exit 0
            fi
            echo "Waiting for OpenSearch... ($i/60)"
            sleep 2
          done
          echo "OpenSearch did not start in time"
          exit 1

      - name: Configure OpenSearch ML plugin
        run: |
          curl -sf -X PUT http://localhost:9200/_cluster/settings \
            -H 'Content-Type: application/json' \
            -d '{
              "persistent": {
                "plugins.ml_commons.only_run_on_ml_node": "false",
                "plugins.ml_commons.native_memory_threshold": "99",
                "plugins.ml_commons.allow_registering_model_via_url": "true"
              }
            }'

      - name: Run OpenSearch integration tests (BM25)
        env:
          OPENSEARCH_URL: http://localhost:9200
        run: >
          cargo test
          -p tuwunel_service
          --lib
          rooms::search::opensearch::tests
          --
          --ignored
          --test-threads=1
          --skip test_neural
          --skip test_hybrid

      - name: Run OpenSearch neural/hybrid tests
        # Step-level key, deliberately NOT under `env:` — as an env entry it
        # would be an inert variable and the timeout silently lost.
        timeout-minutes: 15
        env:
          OPENSEARCH_URL: http://localhost:9200
        run: >
          cargo test
          -p tuwunel_service
          --lib
          rooms::search::opensearch::tests
          --
          --ignored
          --test-threads=1
          test_neural test_hybrid

37
Dockerfile Normal file
View File

@@ -0,0 +1,37 @@
# Production Dockerfile for tuwunel.
# Built by buildkitd on the x86_64 server — no cross-compilation needed.
# Stage 1 ("builder") compiles the release binary; stage 2 is a minimal
# Debian runtime carrying only what the binary needs at run time.
FROM rust:slim-bookworm AS builder
# Native build tooling; liburing-dev enables io_uring support.
RUN apt-get update && apt-get install -y --no-install-recommends \
    libclang-dev clang cmake pkg-config make liburing-dev \
    && rm -rf /var/lib/apt/lists/*
WORKDIR /usr/src/tuwunel
# Copy manifests and sources. NOTE(review): dependency caching here comes
# from the BuildKit cache mounts on the build step below, not from copy
# order — src/ is copied before any build runs, so there is no separate
# dependency layer.
COPY Cargo.toml Cargo.lock rust-toolchain.toml ./
COPY .cargo/ .cargo/
COPY src/ src/
# Strip unnecessary cross-compilation targets from rust-toolchain.toml
# to avoid downloading toolchains we don't need
RUN sed -i '/x86_64-unknown-linux-musl/d; /aarch64-/d; /apple-/d; /rust-src/d; /rust-analyzer/d; /rustfmt/d; /clippy/d' rust-toolchain.toml
# Persist the cargo registry/git caches across builds; the binary is copied
# out of target/ within the same layer.
RUN --mount=type=cache,target=/usr/local/cargo/registry \
    --mount=type=cache,target=/usr/local/cargo/git \
    cargo build --release --locked \
    && cp target/release/tuwunel /usr/local/bin/tuwunel

# Runtime stage: CA certificates for TLS, tini as PID 1, liburing2 runtime.
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
    ca-certificates tini liburing2 \
    && rm -rf /var/lib/apt/lists/*
COPY --from=builder /usr/local/bin/tuwunel /usr/local/bin/tuwunel
EXPOSE 6167
# tini reaps zombies and forwards signals to the server process.
ENTRYPOINT ["tini", "--"]
CMD ["tuwunel"]

View File

@@ -1,4 +1,4 @@
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};
use axum::extract::State;
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
@@ -92,16 +92,96 @@ async fn category_room_events(
.boxed()
});
let results: Vec<_> = rooms
let accessible_rooms: Vec<OwnedRoomId> = rooms
.filter_map(async |room_id| {
check_room_visible(services, sender_user, &room_id, criteria)
.await
.is_ok()
.then_some(room_id)
})
.collect()
.await;
let (results, total) = if services.search.supports_cross_room_search() {
let room_id_refs: Vec<&RoomId> = accessible_rooms
.iter()
.map(AsRef::as_ref)
.collect();
let (count, pdus) = services
.search
.search_pdus_multi_room(&room_id_refs, sender_user, criteria, limit, next_batch)
.await?;
let pdu_list: Vec<_> = pdus.collect().await;
let total: UInt = count.try_into()?;
// Group results by room for state collection
let room_ids_in_results: Vec<OwnedRoomId> = pdu_list
.iter()
.map(|pdu| pdu.room_id().to_owned())
.collect();
let state: RoomStates = if criteria.include_state.is_some_and(is_true!()) {
let unique_rooms: BTreeSet<OwnedRoomId> =
room_ids_in_results.iter().cloned().collect();
futures::stream::iter(unique_rooms.iter())
.filter_map(async |room_id| {
procure_room_state(services, room_id)
.map_ok(|state| (room_id.clone(), state))
.await
.ok()
})
.collect()
.await
} else {
BTreeMap::new()
};
let results: Vec<SearchResult> = pdu_list
.into_iter()
.stream()
.map(Event::into_format)
.map(|result| SearchResult {
rank: None,
result: Some(result),
context: EventContextResult {
profile_info: BTreeMap::new(), //TODO
events_after: Vec::new(), //TODO
events_before: Vec::new(), //TODO
start: None, //TODO
end: None, //TODO
},
})
.collect()
.await;
let highlights = criteria
.search_term
.split_terminator(|c: char| !c.is_alphanumeric())
.map(str::to_lowercase)
.collect();
let next_batch = (results.len() >= limit)
.then_some(next_batch.saturating_add(results.len()))
.as_ref()
.map(ToString::to_string);
return Ok(ResultRoomEvents {
count: Some(total),
next_batch,
results,
state,
highlights,
groups: BTreeMap::new(), // TODO
});
} else {
let results: Vec<_> = accessible_rooms
.iter()
.stream()
.filter_map(async |room_id| {
let query = RoomQuery {
room_id: &room_id,
room_id,
user_id: Some(sender_user),
criteria,
skip: next_batch,
@@ -124,6 +204,9 @@ async fn category_room_events(
.fold(0, |a: usize, (_, count, _)| a.saturating_add(*count))
.try_into()?;
(results, total)
};
let state: RoomStates = results
.iter()
.stream()

View File

@@ -116,6 +116,22 @@ pub fn check(config: &Config) -> Result {
});
}
if config.search_backend == super::SearchBackendConfig::OpenSearch
&& config.search_opensearch_url.is_none()
{
return Err!(Config(
"search_opensearch_url",
"OpenSearch URL must be set when search_backend is \"opensearch\""
));
}
if config.search_opensearch_hybrid && config.search_opensearch_model_id.is_none() {
return Err!(Config(
"search_opensearch_model_id",
"Model ID must be set when search_opensearch_hybrid is enabled"
));
}
// rocksdb does not allow max_log_files to be 0
if config.rocksdb_max_log_files == 0 {
return Err!(Config(

View File

@@ -1100,6 +1100,85 @@ pub struct Config {
#[serde(default)]
pub auto_deactivate_banned_room_attempts: bool,
/// Search backend to use for full-text message search.
///
/// Available options: "rocksdb" (default) or "opensearch".
///
/// default: "rocksdb"
#[serde(default)]
pub search_backend: SearchBackendConfig,
/// URL of the OpenSearch instance. Required when search_backend is
/// "opensearch".
///
/// example: "http://localhost:9200"
pub search_opensearch_url: Option<Url>,
/// Name of the OpenSearch index for message search.
///
/// default: "tuwunel_messages"
#[serde(default = "default_search_opensearch_index")]
pub search_opensearch_index: String,
/// Authentication for OpenSearch in "user:pass" format.
///
/// display: sensitive
pub search_opensearch_auth: Option<String>,
/// Maximum number of documents to batch before flushing to OpenSearch.
///
/// default: 100
#[serde(default = "default_search_opensearch_batch_size")]
pub search_opensearch_batch_size: usize,
/// Maximum time in milliseconds to wait before flushing a partial batch
/// to OpenSearch.
///
/// default: 1000
#[serde(default = "default_search_opensearch_flush_interval_ms")]
pub search_opensearch_flush_interval_ms: u64,
/// Enable hybrid neural+BM25 search in OpenSearch. Requires an ML model
/// deployed in OpenSearch and an ingest pipeline that populates an
/// "embedding" field.
///
/// When enabled, tuwunel will:
/// - Create the index with a knn_vector "embedding" field
/// - Attach the ingest pipeline (search_opensearch_pipeline) to the index
/// - Use hybrid queries combining BM25 + neural kNN scoring
///
/// For a complete reference on configuring OpenSearch's ML plugin, model
/// registration, and ingest pipeline setup, see the test helpers in
/// `src/service/rooms/search/opensearch.rs` (the `ensure_neural_model`,
/// `ensure_ingest_pipeline`, etc. functions in the `tests` module).
///
/// See also: https://opensearch.org/docs/latest/search-plugins/neural-search/
///
/// default: false
#[serde(default)]
pub search_opensearch_hybrid: bool,
/// The model ID registered in OpenSearch for neural search. Required when
/// search_opensearch_hybrid is enabled.
///
/// example: "aKV84osBBHNT0StI3MBr"
pub search_opensearch_model_id: Option<String>,
/// Embedding dimension for the neural search model. Must match the output
/// dimension of the deployed model. Common values: 384
/// (all-MiniLM-L6-v2), 768 (msmarco-distilbert-base-tas-b).
///
/// default: 384
#[serde(default = "default_search_opensearch_embedding_dim")]
pub search_opensearch_embedding_dim: usize,
/// Name of the ingest pipeline that generates embeddings for the
/// "embedding" field. This pipeline must already exist in OpenSearch.
///
/// default: "tuwunel_embedding_pipeline"
#[serde(default = "default_search_opensearch_pipeline")]
pub search_opensearch_pipeline: String,
/// RocksDB log level. This is not the same as tuwunel's log level. This
/// is the log level for the RocksDB engine/library which show up in your
/// database folder/path as `LOG` files. tuwunel will log RocksDB errors
@@ -2304,6 +2383,14 @@ pub struct Config {
catchall: BTreeMap<String, IgnoredAny>,
}
#[derive(Clone, Copy, Debug, Default, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum SearchBackendConfig {
#[default]
RocksDb,
OpenSearch,
}
#[derive(Clone, Debug, Deserialize, Default)]
#[config_example_generator(filename = "tuwunel-example.toml", section = "global.tls")]
pub struct TlsConfig {
@@ -3181,6 +3268,16 @@ impl Config {
fn true_fn() -> bool { true }
fn default_search_opensearch_index() -> String { "tuwunel_messages".to_owned() }
fn default_search_opensearch_batch_size() -> usize { 100 }
fn default_search_opensearch_flush_interval_ms() -> u64 { 1000 }
fn default_search_opensearch_embedding_dim() -> usize { 384 }
fn default_search_opensearch_pipeline() -> String { "tuwunel_embedding_pipeline".to_owned() }
#[cfg(test)]
fn default_server_name() -> OwnedServerName { ruma::owned_server_name!("localhost") }

View File

@@ -28,7 +28,12 @@ pub fn maximize_fd_limit() -> Result {
#[cfg(not(unix))]
pub fn maximize_fd_limit() -> Result { Ok(()) }
#[cfg(unix)]
#[cfg(any(
linux_android,
netbsdlike,
target_os = "aix",
target_os = "freebsd",
))]
/// Some distributions ship with very low defaults for thread counts; similar to
/// low default file descriptor limits. But unlike fd's, thread limit is rarely
/// reached, though on large systems (32+ cores) shipping with defaults of
@@ -47,7 +52,12 @@ pub fn maximize_thread_limit() -> Result {
Ok(())
}
#[cfg(not(unix))]
#[cfg(not(any(
linux_android,
netbsdlike,
target_os = "aix",
target_os = "freebsd",
)))]
pub fn maximize_thread_limit() -> Result { Ok(()) }
#[cfg(unix)]

View File

@@ -1,6 +1,5 @@
use nix::sys::resource::Usage;
#[cfg(unix)]
use nix::sys::resource::{UsageWho, getrusage};
use nix::sys::resource::{Usage, UsageWho, getrusage};
use crate::Result;
@@ -8,7 +7,7 @@ use crate::Result;
pub fn usage() -> Result<Usage> { getrusage(UsageWho::RUSAGE_SELF).map_err(Into::into) }
#[cfg(not(unix))]
pub fn usage() -> Result<Usage> { Ok(Usage::default()) }
pub fn usage() -> Result<()> { Ok(()) }
#[cfg(any(
target_os = "linux",
@@ -17,9 +16,15 @@ pub fn usage() -> Result<Usage> { Ok(Usage::default()) }
))]
pub fn thread_usage() -> Result<Usage> { getrusage(UsageWho::RUSAGE_THREAD).map_err(Into::into) }
#[cfg(not(any(
#[cfg(all(
unix,
not(any(
target_os = "linux",
target_os = "freebsd",
target_os = "openbsd"
)))]
pub fn thread_usage() -> Result<Usage> { Ok(Usage::default()) }
))
))]
pub fn thread_usage() -> Result<Usage> { getrusage(UsageWho::RUSAGE_SELF).map_err(Into::into) }
#[cfg(not(unix))]
pub fn thread_usage() -> Result<()> { Ok(()) }

View File

@@ -0,0 +1,50 @@
use async_trait::async_trait;
use ruma::RoomId;
use tuwunel_core::Result;
use crate::rooms::{short::ShortRoomId, timeline::RawPduId};
/// A search hit returned by a backend.
pub(super) struct SearchHit {
	/// Raw PDU id locating the matched event in the timeline store.
	pub(super) pdu_id: RawPduId,
	/// Relevance score, when the backend computes one; `None` for backends
	/// without scoring. Currently unread by callers, hence `dead_code`.
	#[expect(dead_code)]
	pub(super) score: Option<f64>,
}
/// Query parameters passed to the backend for searching.
pub(super) struct BackendQuery<'a> {
	/// The user-supplied search string; tokenization/parsing is up to the
	/// backend implementation.
	pub(super) search_term: &'a str,
	/// Restrict matches to these rooms, or `None` when no room list is
	/// supplied (single-room backends receive a `ShortRoomId` separately).
	pub(super) room_ids: Option<&'a [&'a RoomId]>,
}
/// Trait abstracting the search index backend.
///
/// Implemented by the RocksDB inverted-index backend and the OpenSearch
/// backend; the service dispatches through `Box<dyn SearchBackend>`.
#[async_trait]
pub(super) trait SearchBackend: Send + Sync {
	/// Index a message body under the given room/PDU.
	async fn index_pdu(
		&self,
		shortroomid: ShortRoomId,
		pdu_id: &RawPduId,
		room_id: &RoomId,
		message_body: &str,
	) -> Result;

	/// Remove a previously indexed message body (e.g. on redaction). The
	/// body is required so token-based backends can recompute the keys.
	async fn deindex_pdu(
		&self,
		shortroomid: ShortRoomId,
		pdu_id: &RawPduId,
		room_id: &RoomId,
		message_body: &str,
	) -> Result;

	/// Search for PDU ids matching `query`, returning the total hit count
	/// and the hits. NOTE(review): backends may ignore `limit`/`skip` and
	/// return the full result set (the RocksDB backend does) — callers
	/// should paginate defensively.
	async fn search_pdu_ids(
		&self,
		query: &BackendQuery<'_>,
		shortroomid: Option<ShortRoomId>,
		limit: usize,
		skip: usize,
	) -> Result<(usize, Vec<SearchHit>)>;

	/// Remove all index entries for a room.
	async fn delete_room(&self, room_id: &RoomId, shortroomid: ShortRoomId) -> Result;

	/// Whether this backend can search multiple rooms in one query.
	/// Defaults to `false`; single-room backends keep the default.
	fn supports_cross_room_search(&self) -> bool { false }
}

View File

@@ -1,34 +1,33 @@
mod backend;
mod opensearch;
mod rocksdb;
use std::sync::Arc;
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use ruma::{RoomId, UserId, api::client::search::search_events::v3::Criteria};
use tokio::sync::mpsc;
use tuwunel_core::{
PduCount, Result,
arrayvec::ArrayVec,
implement,
Result, implement,
matrix::event::{Event, Matches},
trace,
utils::{
ArrayVecExt, IterStream, ReadyExt, set,
stream::{TryIgnore, WidebandExt},
},
utils::{IterStream, ReadyExt, stream::WidebandExt},
};
use tuwunel_database::{Interfix, Map, keyval::Val};
use crate::rooms::{
short::ShortRoomId,
timeline::{PduId, RawPduId},
use self::{
backend::{BackendQuery, SearchBackend, SearchHit},
opensearch::OpenSearchBackend,
rocksdb::RocksDbBackend,
};
use crate::rooms::{short::ShortRoomId, timeline::RawPduId};
pub struct Service {
db: Data,
backend: Box<dyn SearchBackend>,
bulk_rx: Option<tokio::sync::Mutex<mpsc::Receiver<opensearch::BulkAction>>>,
opensearch: Option<Arc<OpenSearchBackend>>,
services: Arc<crate::services::OnceServices>,
}
struct Data {
tokenids: Arc<Map>,
}
#[derive(Clone, Debug)]
pub struct RoomQuery<'a> {
pub room_id: &'a RoomId,
@@ -38,52 +37,133 @@ pub struct RoomQuery<'a> {
pub skip: usize,
}
type TokenId = ArrayVec<u8, TOKEN_ID_MAX_LEN>;
const TOKEN_ID_MAX_LEN: usize =
size_of::<ShortRoomId>() + WORD_MAX_LEN + 1 + size_of::<RawPduId>();
const WORD_MAX_LEN: usize = 50;
#[async_trait]
impl crate::Service for Service {
fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
let config = &args.server.config;
let (backend, bulk_rx, opensearch): (Box<dyn SearchBackend>, _, _) = match config
.search_backend
{
| tuwunel_core::config::SearchBackendConfig::RocksDb => {
let b = RocksDbBackend::new(args.db["tokenids"].clone());
(Box::new(b), None, None)
},
| tuwunel_core::config::SearchBackendConfig::OpenSearch => {
let (b, rx) = OpenSearchBackend::new(config)?;
let b = Arc::new(b);
let backend: Box<dyn SearchBackend> = Box::new(OpenSearchWorkerRef(b.clone()));
(backend, Some(tokio::sync::Mutex::new(rx)), Some(b))
},
};
Ok(Arc::new(Self {
db: Data { tokenids: args.db["tokenids"].clone() },
backend,
bulk_rx,
opensearch,
services: args.services.clone(),
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
async fn worker(self: Arc<Self>) -> Result {
if let (Some(os), Some(rx_mutex)) = (self.opensearch.as_ref(), self.bulk_rx.as_ref()) {
os.ensure_index().await?;
let services = &**self.services;
let config = &services.server.config;
let batch_size = config.search_opensearch_batch_size;
let flush_interval =
std::time::Duration::from_millis(config.search_opensearch_flush_interval_ms);
let mut rx = rx_mutex.lock().await;
os.process_bulk(&mut rx, batch_size, flush_interval)
.await;
}
Ok(())
}
}
/// Wrapper so we can put the `Arc<OpenSearchBackend>` behind the trait.
///
/// The service holds the same backend both as a `Box<dyn SearchBackend>`
/// (for dispatch) and as an `Arc` (for the bulk worker); this newtype lets
/// one allocation serve both roles. Every method delegates verbatim.
struct OpenSearchWorkerRef(Arc<OpenSearchBackend>);

#[async_trait]
impl SearchBackend for OpenSearchWorkerRef {
	// Delegate: index a message body.
	async fn index_pdu(
		&self,
		shortroomid: ShortRoomId,
		pdu_id: &RawPduId,
		room_id: &RoomId,
		message_body: &str,
	) -> Result {
		self.0
			.index_pdu(shortroomid, pdu_id, room_id, message_body)
			.await
	}

	// Delegate: remove an indexed message body.
	async fn deindex_pdu(
		&self,
		shortroomid: ShortRoomId,
		pdu_id: &RawPduId,
		room_id: &RoomId,
		message_body: &str,
	) -> Result {
		self.0
			.deindex_pdu(shortroomid, pdu_id, room_id, message_body)
			.await
	}

	// Delegate: search for matching PDU ids.
	async fn search_pdu_ids(
		&self,
		query: &BackendQuery<'_>,
		shortroomid: Option<ShortRoomId>,
		limit: usize,
		skip: usize,
	) -> Result<(usize, Vec<SearchHit>)> {
		self.0
			.search_pdu_ids(query, shortroomid, limit, skip)
			.await
	}

	// Delegate: drop all index entries for a room.
	async fn delete_room(&self, room_id: &RoomId, shortroomid: ShortRoomId) -> Result {
		self.0.delete_room(room_id, shortroomid).await
	}

	// Delegate: capability flag from the inner backend.
	fn supports_cross_room_search(&self) -> bool { self.0.supports_cross_room_search() }
}
#[implement(Service)]
pub fn index_pdu(&self, shortroomid: ShortRoomId, pdu_id: &RawPduId, message_body: &str) {
let batch = tokenize(message_body)
.map(|word| {
let mut key = shortroomid.to_be_bytes().to_vec();
key.extend_from_slice(word.as_bytes());
key.push(0xFF);
key.extend_from_slice(pdu_id.as_ref()); // TODO: currently we save the room id a second time here
key
})
.collect::<Vec<_>>();
self.db
.tokenids
.insert_batch(batch.iter().map(|k| (k.as_slice(), &[])));
pub async fn index_pdu(
&self,
shortroomid: ShortRoomId,
pdu_id: &RawPduId,
room_id: &RoomId,
message_body: &str,
) {
if let Err(e) = self
.backend
.index_pdu(shortroomid, pdu_id, room_id, message_body)
.await
{
tuwunel_core::warn!("Failed to index PDU: {e}");
}
}
#[implement(Service)]
pub fn deindex_pdu(&self, shortroomid: ShortRoomId, pdu_id: &RawPduId, message_body: &str) {
let batch = tokenize(message_body).map(|word| {
let mut key = shortroomid.to_be_bytes().to_vec();
key.extend_from_slice(word.as_bytes());
key.push(0xFF);
key.extend_from_slice(pdu_id.as_ref()); // TODO: currently we save the room id a second time here
key
});
for token in batch {
self.db.tokenids.remove(&token);
pub async fn deindex_pdu(
&self,
shortroomid: ShortRoomId,
pdu_id: &RawPduId,
room_id: &RoomId,
message_body: &str,
) {
if let Err(e) = self
.backend
.deindex_pdu(shortroomid, pdu_id, room_id, message_body)
.await
{
tuwunel_core::warn!("Failed to deindex PDU: {e}");
}
}
@@ -92,12 +172,31 @@ pub async fn search_pdus<'a>(
&'a self,
query: &'a RoomQuery<'a>,
) -> Result<(usize, impl Stream<Item = impl Event + use<>> + Send + '_)> {
let pdu_ids: Vec<_> = self.search_pdu_ids(query).await?.collect().await;
let shortroomid = self
.services
.short
.get_shortroomid(query.room_id)
.await?;
let backend_query = BackendQuery {
search_term: &query.criteria.search_term,
room_ids: None,
};
let (count, hits) = self
.backend
.search_pdu_ids(
&backend_query,
Some(shortroomid),
query.limit.saturating_add(query.skip),
0,
)
.await?;
let filter = &query.criteria.filter;
let count = pdu_ids.len();
let pdus = pdu_ids
let pdus = hits
.into_iter()
.map(|h| h.pdu_id)
.stream()
.wide_filter_map(async |result_pdu_id: RawPduId| {
self.services
@@ -121,123 +220,64 @@ pub async fn search_pdus<'a>(
Ok((count, pdus))
}
// result is modeled as a stream such that callers don't have to be refactored
// though an additional async/wrap still exists for now
#[implement(Service)]
pub async fn search_pdu_ids(
&self,
query: &RoomQuery<'_>,
) -> Result<impl Stream<Item = RawPduId> + Send + '_ + use<'_>> {
let shortroomid = self
.services
.short
.get_shortroomid(query.room_id)
pub async fn search_pdus_multi_room<'a>(
&'a self,
room_ids: &'a [&'a RoomId],
sender_user: &'a UserId,
criteria: &'a Criteria,
limit: usize,
skip: usize,
) -> Result<(usize, impl Stream<Item = impl Event + use<>> + Send + '_)> {
let backend_query = BackendQuery {
search_term: &criteria.search_term,
room_ids: Some(room_ids),
};
let (count, hits) = self
.backend
.search_pdu_ids(&backend_query, None, limit.saturating_add(skip), 0)
.await?;
let pdu_ids = self
.search_pdu_ids_query_room(query, shortroomid)
.await;
let iters = pdu_ids.into_iter().map(IntoIterator::into_iter);
Ok(set::intersection(iters).stream())
}
#[implement(Service)]
async fn search_pdu_ids_query_room(
&self,
query: &RoomQuery<'_>,
shortroomid: ShortRoomId,
) -> Vec<Vec<RawPduId>> {
tokenize(&query.criteria.search_term)
let filter = &criteria.filter;
let pdus = hits
.into_iter()
.map(|h| h.pdu_id)
.stream()
.wide_then(async |word| {
self.search_pdu_ids_query_words(shortroomid, &word)
.collect::<Vec<_>>()
.wide_filter_map(async |result_pdu_id: RawPduId| {
self.services
.timeline
.get_pdu_from_id(&result_pdu_id)
.await
.ok()
})
.collect::<Vec<_>>()
.ready_filter(|pdu| !pdu.is_redacted())
.ready_filter(move |pdu| filter.matches(pdu))
.wide_filter_map(async move |pdu| {
self.services
.state_accessor
.user_can_see_event(sender_user, pdu.room_id(), pdu.event_id())
.await
}
/// Iterate over PduId's containing a word
#[implement(Service)]
fn search_pdu_ids_query_words<'a>(
&'a self,
shortroomid: ShortRoomId,
word: &'a str,
) -> impl Stream<Item = RawPduId> + Send + '_ {
self.search_pdu_ids_query_word(shortroomid, word)
.map(move |key| -> RawPduId {
let key = &key[prefix_len(word)..];
key.into()
.then_some(pdu)
})
.skip(skip)
.take(limit);
Ok((count, pdus))
}
/// Iterate over raw database results for a word
#[implement(Service)]
fn search_pdu_ids_query_word(
&self,
shortroomid: ShortRoomId,
word: &str,
) -> impl Stream<Item = Val<'_>> + Send + '_ + use<'_> {
// rustc says const'ing this not yet stable
let end_id: RawPduId = PduId { shortroomid, count: PduCount::max() }.into();
// Newest pdus first
let end = make_tokenid(shortroomid, word, &end_id);
let prefix = make_prefix(shortroomid, word);
self.db
.tokenids
.rev_raw_keys_from(&end)
.ignore_err()
.ready_take_while(move |key| key.starts_with(&prefix))
}
pub fn supports_cross_room_search(&self) -> bool { self.backend.supports_cross_room_search() }
#[implement(Service)]
pub async fn delete_all_search_tokenids_for_room(&self, room_id: &RoomId) -> Result {
let prefix = (room_id, Interfix);
let shortroomid = self
.services
.short
.get_shortroomid(room_id)
.await?;
self.db
.tokenids
.keys_prefix_raw(&prefix)
.ignore_err()
.ready_for_each(|key| {
trace!("Removing key: {key:?}");
self.db.tokenids.remove(key);
})
.await;
Ok(())
}
/// Splits a string into tokens used as keys in the search inverted index
///
/// This may be used to tokenize both message bodies (for indexing) or search
/// queries (for querying).
fn tokenize(body: &str) -> impl Iterator<Item = String> + Send + '_ {
body.split_terminator(|c: char| !c.is_alphanumeric())
.filter(|s| !s.is_empty())
.filter(|word| word.len() <= WORD_MAX_LEN)
.map(str::to_lowercase)
}
fn make_tokenid(shortroomid: ShortRoomId, word: &str, pdu_id: &RawPduId) -> TokenId {
let mut key = make_prefix(shortroomid, word);
key.extend_from_slice(pdu_id.as_ref());
key
}
fn make_prefix(shortroomid: ShortRoomId, word: &str) -> TokenId {
let mut key = TokenId::new();
key.extend_from_slice(&shortroomid.to_be_bytes());
key.extend_from_slice(word.as_bytes());
key.push(tuwunel_database::SEP);
key
}
fn prefix_len(word: &str) -> usize {
size_of::<ShortRoomId>()
.saturating_add(word.len())
.saturating_add(1)
self.backend
.delete_room(room_id, shortroomid)
.await
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,193 @@
use std::sync::Arc;
use async_trait::async_trait;
use futures::StreamExt;
use ruma::RoomId;
use tuwunel_core::{
PduCount, Result,
arrayvec::ArrayVec,
utils::{ArrayVecExt, ReadyExt, set, stream::TryIgnore},
};
use tuwunel_database::{Interfix, Map, keyval::Val};
use super::backend::{BackendQuery, SearchBackend, SearchHit};
use crate::rooms::{
short::ShortRoomId,
timeline::{PduId, RawPduId},
};
/// Inverted-index search backend over the RocksDB `tokenids` map.
pub(super) struct RocksDbBackend {
	// Keys are `shortroomid || token || separator || pdu_id`; values are empty.
	tokenids: Arc<Map>,
}

/// Fixed-capacity buffer for a token index key.
type TokenId = ArrayVec<u8, TOKEN_ID_MAX_LEN>;

// shortroomid bytes + longest indexable token + one separator byte + pdu id.
const TOKEN_ID_MAX_LEN: usize =
	size_of::<ShortRoomId>() + WORD_MAX_LEN + 1 + size_of::<RawPduId>();

// Tokens longer than this are skipped by `tokenize` and never indexed.
const WORD_MAX_LEN: usize = 50;

impl RocksDbBackend {
	/// Create a backend over the given `tokenids` map handle.
	pub(super) fn new(tokenids: Arc<Map>) -> Self { Self { tokenids } }
}
#[async_trait]
impl SearchBackend for RocksDbBackend {
	/// Index a message body: writes one `tokenids` key per token, shaped
	/// `shortroomid || token || 0xFF || pdu_id`, with an empty value.
	async fn index_pdu(
		&self,
		shortroomid: ShortRoomId,
		pdu_id: &RawPduId,
		_room_id: &RoomId,
		message_body: &str,
	) -> Result {
		let batch = tokenize(message_body)
			.map(|word| {
				let mut key = shortroomid.to_be_bytes().to_vec();
				key.extend_from_slice(word.as_bytes());
				// NOTE(review): literal 0xFF separator here, while
				// `make_prefix` uses `tuwunel_database::SEP` — presumably
				// the same byte; confirm, else reads won't match writes.
				key.push(0xFF);
				key.extend_from_slice(pdu_id.as_ref());
				key
			})
			.collect::<Vec<_>>();
		self.tokenids
			.insert_batch(batch.iter().map(|k| (k.as_slice(), &[])));
		Ok(())
	}

	/// Remove the entries written by `index_pdu` for the same body; keys
	/// are recomputed from the body's tokens, so the original body text is
	/// required.
	async fn deindex_pdu(
		&self,
		shortroomid: ShortRoomId,
		pdu_id: &RawPduId,
		_room_id: &RoomId,
		message_body: &str,
	) -> Result {
		let batch = tokenize(message_body).map(|word| {
			let mut key = shortroomid.to_be_bytes().to_vec();
			key.extend_from_slice(word.as_bytes());
			key.push(0xFF);
			key.extend_from_slice(pdu_id.as_ref());
			key
		});
		for token in batch {
			self.tokenids.remove(&token);
		}
		Ok(())
	}

	/// Intersect the per-token posting lists for the query's tokens within
	/// one room (AND semantics). `limit`/`skip` are ignored: the full
	/// intersection is returned and the caller paginates. Hits carry no
	/// score.
	async fn search_pdu_ids(
		&self,
		query: &BackendQuery<'_>,
		shortroomid: Option<ShortRoomId>,
		_limit: usize,
		_skip: usize,
	) -> Result<(usize, Vec<SearchHit>)> {
		// Cross-room search is unsupported by this backend; a room is required.
		let shortroomid = shortroomid.ok_or_else(|| {
			tuwunel_core::err!(Request(InvalidParam(
				"RocksDB backend requires a shortroomid for search"
			)))
		})?;
		let pdu_ids =
			search_pdu_ids_query_room(&self.tokenids, shortroomid, query.search_term).await;
		let iters = pdu_ids.into_iter().map(IntoIterator::into_iter);
		let results: Vec<_> = set::intersection(iters)
			.map(|pdu_id| SearchHit { pdu_id, score: None })
			.collect();
		let count = results.len();
		Ok((count, results))
	}

	/// Delete all index entries for a room.
	async fn delete_room(&self, room_id: &RoomId, _shortroomid: ShortRoomId) -> Result {
		// NOTE(review): index keys begin with big-endian `shortroomid`
		// bytes, yet this prefix-scans by `room_id` — verify that
		// `keys_prefix_raw(&(room_id, Interfix))` actually matches these
		// keys; otherwise nothing is deleted here.
		let prefix = (room_id, Interfix);
		self.tokenids
			.keys_prefix_raw(&prefix)
			.ignore_err()
			.ready_for_each(|key| {
				self.tokenids.remove(key);
			})
			.await;
		Ok(())
	}
}
/// For each token of `search_term`, collect that token's full posting list
/// for the room; returns one `Vec<RawPduId>` per token. Per-token scans run
/// concurrently; the caller intersects the lists, so result order between
/// tokens is not significant.
async fn search_pdu_ids_query_room(
	tokenids: &Arc<Map>,
	shortroomid: ShortRoomId,
	search_term: &str,
) -> Vec<Vec<RawPduId>> {
	tokenize(search_term)
		.collect::<Vec<_>>()
		.into_iter()
		.map(|word| {
			// Clone the map handle so each future owns its captures.
			let tokenids = tokenids.clone();
			async move {
				search_pdu_ids_query_words(&tokenids, shortroomid, &word)
					.collect::<Vec<_>>()
					.await
			}
		})
		// Drive all per-word scans concurrently.
		.collect::<futures::stream::FuturesUnordered<_>>()
		.collect::<Vec<_>>()
		.await
}
/// Stream of `RawPduId`s containing `word`, decoded by stripping the
/// `shortroomid || word || separator` prefix from each raw index key.
fn search_pdu_ids_query_words<'a>(
	tokenids: &'a Arc<Map>,
	shortroomid: ShortRoomId,
	word: &'a str,
) -> impl futures::Stream<Item = RawPduId> + Send + 'a {
	search_pdu_ids_query_word(tokenids, shortroomid, word).map(move |key| -> RawPduId {
		// The pdu id is the key's suffix past the fixed-length prefix.
		let key = &key[prefix_len(word)..];
		key.into()
	})
}
/// Stream of raw index keys for `word` in the given room, newest PDUs first.
fn search_pdu_ids_query_word<'a>(
	tokenids: &'a Arc<Map>,
	shortroomid: ShortRoomId,
	word: &'a str,
) -> impl futures::Stream<Item = Val<'a>> + Send + 'a {
	// Upper bound: the key for the maximum possible PduCount in this room.
	let end_id: RawPduId = PduId { shortroomid, count: PduCount::max() }.into();
	let end = make_tokenid(shortroomid, word, &end_id);
	let prefix = make_prefix(shortroomid, word);
	// Reverse-iterate from the upper bound (newest first), stopping once
	// keys leave this word's prefix range.
	tokenids
		.rev_raw_keys_from(&end)
		.ignore_err()
		.ready_take_while(move |key| key.starts_with(&prefix))
}
/// Splits a string into the lowercase alphanumeric tokens used as keys in
/// the search inverted index. Runs of non-alphanumeric characters act as
/// separators; empty tokens and tokens longer than `WORD_MAX_LEN` are
/// dropped.
pub(super) fn tokenize(body: &str) -> impl Iterator<Item = String> + Send + '_ {
	body.split_terminator(|c: char| !c.is_alphanumeric())
		.filter(|token| !token.is_empty() && token.len() <= WORD_MAX_LEN)
		.map(str::to_lowercase)
}
/// Build a full index key: `shortroomid || word || SEP || pdu_id`.
fn make_tokenid(shortroomid: ShortRoomId, word: &str, pdu_id: &RawPduId) -> TokenId {
	let mut key = make_prefix(shortroomid, word);
	key.extend_from_slice(pdu_id.as_ref());
	key
}
/// Build the key prefix shared by all entries for a word in a room:
/// `shortroomid (big-endian) || word || SEP`.
fn make_prefix(shortroomid: ShortRoomId, word: &str) -> TokenId {
	let mut key = TokenId::new();
	key.extend_from_slice(&shortroomid.to_be_bytes());
	key.extend_from_slice(word.as_bytes());
	key.push(tuwunel_database::SEP);
	key
}
/// Byte length of the key prefix produced by `make_prefix` for `word`:
/// shortroomid bytes + word bytes + one separator byte.
fn prefix_len(word: &str) -> usize {
	size_of::<ShortRoomId>()
		.saturating_add(word.len())
		.saturating_add(1)
}

View File

@@ -281,7 +281,8 @@ async fn append_pdu_effects(
if let Some(body) = content.body {
self.services
.search
.index_pdu(shortroomid, &pdu_id, &body);
.index_pdu(shortroomid, &pdu_id, pdu.room_id(), &body)
.await;
if self
.services

View File

@@ -240,7 +240,8 @@ pub async fn backfill_pdu(
if let Some(body) = content.body {
self.services
.search
.index_pdu(shortroomid, &pdu_id, &body);
.index_pdu(shortroomid, &pdu_id, pdu.room_id(), &body)
.await;
}
}
drop(mutex_lock);

View File

@@ -40,14 +40,15 @@ pub async fn redact_pdu<Pdu: Event + Send + Sync>(
.get("body")
.and_then(|body| body.as_str());
let room_id = RoomId::parse(pdu["room_id"].as_str().unwrap()).unwrap();
if let Some(body) = body {
self.services
.search
.deindex_pdu(shortroomid, &pdu_id, body);
.deindex_pdu(shortroomid, &pdu_id, room_id, body)
.await;
}
let room_id = RoomId::parse(pdu["room_id"].as_str().unwrap()).unwrap();
let room_version_id = self
.services
.state

View File

@@ -907,6 +907,72 @@
#
#auto_deactivate_banned_room_attempts = false
# Search backend to use for full-text message search.
#
# Available options: "rocksdb" (default) or "opensearch".
#
#search_backend = "rocksdb"
# URL of the OpenSearch instance. Required when search_backend is
# "opensearch".
#
# example: "http://localhost:9200"
#
#search_opensearch_url =
# Name of the OpenSearch index for message search.
#
#search_opensearch_index = "tuwunel_messages"
# Authentication for OpenSearch in "user:pass" format.
#
#search_opensearch_auth =
# Maximum number of documents to batch before flushing to OpenSearch.
#
#search_opensearch_batch_size = 100
# Maximum time in milliseconds to wait before flushing a partial batch
# to OpenSearch.
#
#search_opensearch_flush_interval_ms = 1000
# Enable hybrid neural+BM25 search in OpenSearch. Requires an ML model
# deployed in OpenSearch and an ingest pipeline that populates an
# "embedding" field.
#
# When enabled, tuwunel will:
# - Create the index with a knn_vector "embedding" field
# - Attach the ingest pipeline (search_opensearch_pipeline) to the index
# - Use hybrid queries combining BM25 + neural kNN scoring
#
# For a complete reference on configuring OpenSearch's ML plugin, model
# registration, and ingest pipeline setup, see the test helpers in
# `src/service/rooms/search/opensearch.rs` (the `ensure_neural_model`,
# `ensure_ingest_pipeline`, etc. functions in the `tests` module).
#
# See also: https://opensearch.org/docs/latest/search-plugins/neural-search/
#
#search_opensearch_hybrid = false
# The model ID registered in OpenSearch for neural search. Required when
# search_opensearch_hybrid is enabled.
#
# example: "aKV84osBBHNT0StI3MBr"
#
#search_opensearch_model_id =
# Embedding dimension for the neural search model. Must match the output
# dimension of the deployed model. Common values: 384
# (all-MiniLM-L6-v2), 768 (msmarco-distilbert-base-tas-b).
#
#search_opensearch_embedding_dim = 384
# Name of the ingest pipeline that generates embeddings for the
# "embedding" field. This pipeline must already exist in OpenSearch.
#
#search_opensearch_pipeline = "tuwunel_embedding_pipeline"
# RocksDB log level. This is not the same as tuwunel's log level. This
# is the log level for the RocksDB engine/library which show up in your
# database folder/path as `LOG` files. tuwunel will log RocksDB errors