Compare commits
2 Commits
main
...
feat/searc
| Author | SHA1 | Date | |
|---|---|---|---|
|
cf21ffc452
|
|||
|
c9cddc80d9
|
13
.github/workflows/main.yml
vendored
13
.github/workflows/main.yml
vendored
@@ -332,6 +332,19 @@ jobs:
|
||||
{"sys_target": "x86_64-v4-linux-gnu", "feat_set": "default"},
|
||||
]
|
||||
|
||||
opensearch-tests:
|
||||
if: >
|
||||
!failure() && !cancelled()
|
||||
&& fromJSON(needs.init.outputs.enable_test)
|
||||
&& !fromJSON(needs.init.outputs.is_release)
|
||||
&& !contains(needs.init.outputs.pipeline, '[ci no test]')
|
||||
|
||||
name: OpenSearch
|
||||
needs: [init, lint]
|
||||
uses: ./.github/workflows/opensearch-tests.yml
|
||||
with:
|
||||
checkout: ${{needs.init.outputs.checkout}}
|
||||
|
||||
package:
|
||||
if: >
|
||||
!failure() && !cancelled()
|
||||
|
||||
114
.github/workflows/opensearch-tests.yml
vendored
Normal file
114
.github/workflows/opensearch-tests.yml
vendored
Normal file
@@ -0,0 +1,114 @@
|
||||
# OpenSearch integration tests.
#
# Runs the `#[ignore]`d OpenSearch tests of `tuwunel_service` against a
# single-node OpenSearch service container. Can be called from another
# workflow (`workflow_call`) or triggered directly by changes to the search
# service, the config module or this file.
name: OpenSearch Integration Tests

on:
  workflow_call:
    inputs:
      checkout:
        type: string
        default: 'HEAD'
  pull_request:
    paths:
      - 'src/service/rooms/search/**'
      - 'src/core/config/mod.rs'
      - '.github/workflows/opensearch-tests.yml'
  push:
    branches: [main]
    paths:
      - 'src/service/rooms/search/**'
      - 'src/core/config/mod.rs'
      - '.github/workflows/opensearch-tests.yml'

jobs:
  opensearch-tests:
    name: OpenSearch Integration
    runs-on: ubuntu-latest

    services:
      opensearch:
        image: opensearchproject/opensearch:3.5.0
        ports:
          - "9200:9200"
        env:
          discovery.type: single-node
          # Security plugin is disabled so the tests can talk plain HTTP
          # without credentials; the admin password is still set explicitly.
          DISABLE_SECURITY_PLUGIN: "true"
          OPENSEARCH_INITIAL_ADMIN_PASSWORD: "SuperSecret123!"
          OPENSEARCH_JAVA_OPTS: "-Xms1g -Xmx2g"
        options: >-
          --health-cmd "curl -f http://localhost:9200/_cluster/health || exit 1"
          --health-interval 10s
          --health-timeout 5s
          --health-retries 20
          --health-start-period 60s

    steps:
      - uses: actions/checkout@v4
        with:
          # `inputs.checkout` is only set for `workflow_call`; direct
          # pull_request/push triggers fall back to the triggering commit.
          ref: ${{ inputs.checkout || github.sha }}

      - name: Install Rust toolchain
        uses: dtolnay/rust-toolchain@nightly

      - name: Cache cargo registry and build
        uses: actions/cache@v4
        with:
          path: |
            ~/.cargo/registry
            ~/.cargo/git
            target
          key: opensearch-tests-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }}
          restore-keys: |
            opensearch-tests-${{ runner.os }}-

      # Polls the published port from the runner side (up to 60 x 2s) before
      # any request is sent to the cluster.
      - name: Wait for OpenSearch
        run: |
          for i in $(seq 1 60); do
            if curl -sf http://localhost:9200/_cluster/health; then
              echo ""
              echo "OpenSearch is ready"
              exit 0
            fi
            echo "Waiting for OpenSearch... ($i/60)"
            sleep 2
          done
          echo "OpenSearch did not start in time"
          exit 1

      # Cluster settings used by the neural/hybrid tests on a single,
      # non-dedicated node.
      - name: Configure OpenSearch ML plugin
        run: |
          curl -sf -X PUT http://localhost:9200/_cluster/settings \
            -H 'Content-Type: application/json' \
            -d '{
              "persistent": {
                "plugins.ml_commons.only_run_on_ml_node": "false",
                "plugins.ml_commons.native_memory_threshold": "99",
                "plugins.ml_commons.allow_registering_model_via_url": "true"
              }
            }'

      - name: Run OpenSearch integration tests (BM25)
        env:
          OPENSEARCH_URL: http://localhost:9200
        run: >
          cargo test
          -p tuwunel_service
          --lib
          rooms::search::opensearch::tests
          --
          --ignored
          --test-threads=1
          --skip test_neural
          --skip test_hybrid

      # libtest ORs every filter it receives, including the TESTNAME cargo
      # forwards from before `--`. Passing the module path here as well would
      # select the whole module again (BM25 tests included), so this step is
      # narrowed by the neural/hybrid name filters only.
      # NOTE(review): these are substring filters over the whole lib; confirm
      # no ignored test outside the opensearch module matches them.
      - name: Run OpenSearch neural/hybrid tests
        env:
          OPENSEARCH_URL: http://localhost:9200
        timeout-minutes: 15
        run: >
          cargo test
          -p tuwunel_service
          --lib
          --
          --ignored
          --test-threads=1
          test_neural test_hybrid
|
||||
37
Dockerfile
Normal file
37
Dockerfile
Normal file
@@ -0,0 +1,37 @@
|
||||
# Simple production Dockerfile for tuwunel
# Built by buildkitd on the x86_64 server — no cross-compilation needed.

FROM rust:slim-bookworm AS builder

# Native build dependencies (bindgen/clang, cmake, io_uring headers).
RUN apt-get update && apt-get install -y --no-install-recommends \
    libclang-dev clang cmake pkg-config make liburing-dev \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /usr/src/tuwunel

# Copy manifests, cargo configuration and sources. Any source change
# invalidates the layers below, so incremental rebuilds rely on the BuildKit
# cache mounts of the build step rather than on layer caching.
COPY Cargo.toml Cargo.lock rust-toolchain.toml ./
COPY .cargo/ .cargo/
COPY src/ src/

# Strip unnecessary cross-compilation targets from rust-toolchain.toml
# to avoid downloading toolchains we don't need
RUN sed -i '/x86_64-unknown-linux-musl/d; /aarch64-/d; /apple-/d; /rust-src/d; /rust-analyzer/d; /rustfmt/d; /clippy/d' rust-toolchain.toml

# The registry, git and target directories are cache mounts: they persist
# between builds but are not part of the image layer, so the binary is copied
# out to /usr/local/bin inside the same RUN.
RUN --mount=type=cache,target=/usr/local/cargo/registry \
    --mount=type=cache,target=/usr/local/cargo/git \
    --mount=type=cache,target=/usr/src/tuwunel/target \
    cargo build --release --locked \
    && cp target/release/tuwunel /usr/local/bin/tuwunel

FROM debian:bookworm-slim

# Runtime-only dependencies: CA roots, tini as PID 1, io_uring runtime library.
RUN apt-get update && apt-get install -y --no-install-recommends \
    ca-certificates tini liburing2 \
    && rm -rf /var/lib/apt/lists/*

COPY --from=builder /usr/local/bin/tuwunel /usr/local/bin/tuwunel

EXPOSE 6167

ENTRYPOINT ["tini", "--"]
CMD ["tuwunel"]
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
|
||||
use axum::extract::State;
|
||||
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
|
||||
@@ -92,37 +92,120 @@ async fn category_room_events(
|
||||
.boxed()
|
||||
});
|
||||
|
||||
let results: Vec<_> = rooms
|
||||
let accessible_rooms: Vec<OwnedRoomId> = rooms
|
||||
.filter_map(async |room_id| {
|
||||
check_room_visible(services, sender_user, &room_id, criteria)
|
||||
.await
|
||||
.is_ok()
|
||||
.then_some(room_id)
|
||||
})
|
||||
.filter_map(async |room_id| {
|
||||
let query = RoomQuery {
|
||||
room_id: &room_id,
|
||||
user_id: Some(sender_user),
|
||||
criteria,
|
||||
skip: next_batch,
|
||||
limit,
|
||||
};
|
||||
|
||||
let (count, results) = services.search.search_pdus(&query).await.ok()?;
|
||||
|
||||
results
|
||||
.collect::<Vec<_>>()
|
||||
.map(|results| (room_id.clone(), count, results))
|
||||
.map(Some)
|
||||
.await
|
||||
})
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
let total: UInt = results
|
||||
.iter()
|
||||
.fold(0, |a: usize, (_, count, _)| a.saturating_add(*count))
|
||||
.try_into()?;
|
||||
let (results, total) = if services.search.supports_cross_room_search() {
|
||||
let room_id_refs: Vec<&RoomId> = accessible_rooms
|
||||
.iter()
|
||||
.map(AsRef::as_ref)
|
||||
.collect();
|
||||
|
||||
let (count, pdus) = services
|
||||
.search
|
||||
.search_pdus_multi_room(&room_id_refs, sender_user, criteria, limit, next_batch)
|
||||
.await?;
|
||||
|
||||
let pdu_list: Vec<_> = pdus.collect().await;
|
||||
let total: UInt = count.try_into()?;
|
||||
|
||||
// Group results by room for state collection
|
||||
let room_ids_in_results: Vec<OwnedRoomId> = pdu_list
|
||||
.iter()
|
||||
.map(|pdu| pdu.room_id().to_owned())
|
||||
.collect();
|
||||
|
||||
let state: RoomStates = if criteria.include_state.is_some_and(is_true!()) {
|
||||
let unique_rooms: BTreeSet<OwnedRoomId> =
|
||||
room_ids_in_results.iter().cloned().collect();
|
||||
futures::stream::iter(unique_rooms.iter())
|
||||
.filter_map(async |room_id| {
|
||||
procure_room_state(services, room_id)
|
||||
.map_ok(|state| (room_id.clone(), state))
|
||||
.await
|
||||
.ok()
|
||||
})
|
||||
.collect()
|
||||
.await
|
||||
} else {
|
||||
BTreeMap::new()
|
||||
};
|
||||
|
||||
let results: Vec<SearchResult> = pdu_list
|
||||
.into_iter()
|
||||
.stream()
|
||||
.map(Event::into_format)
|
||||
.map(|result| SearchResult {
|
||||
rank: None,
|
||||
result: Some(result),
|
||||
context: EventContextResult {
|
||||
profile_info: BTreeMap::new(), //TODO
|
||||
events_after: Vec::new(), //TODO
|
||||
events_before: Vec::new(), //TODO
|
||||
start: None, //TODO
|
||||
end: None, //TODO
|
||||
},
|
||||
})
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
let highlights = criteria
|
||||
.search_term
|
||||
.split_terminator(|c: char| !c.is_alphanumeric())
|
||||
.map(str::to_lowercase)
|
||||
.collect();
|
||||
|
||||
let next_batch = (results.len() >= limit)
|
||||
.then_some(next_batch.saturating_add(results.len()))
|
||||
.as_ref()
|
||||
.map(ToString::to_string);
|
||||
|
||||
return Ok(ResultRoomEvents {
|
||||
count: Some(total),
|
||||
next_batch,
|
||||
results,
|
||||
state,
|
||||
highlights,
|
||||
groups: BTreeMap::new(), // TODO
|
||||
});
|
||||
} else {
|
||||
let results: Vec<_> = accessible_rooms
|
||||
.iter()
|
||||
.stream()
|
||||
.filter_map(async |room_id| {
|
||||
let query = RoomQuery {
|
||||
room_id,
|
||||
user_id: Some(sender_user),
|
||||
criteria,
|
||||
skip: next_batch,
|
||||
limit,
|
||||
};
|
||||
|
||||
let (count, results) = services.search.search_pdus(&query).await.ok()?;
|
||||
|
||||
results
|
||||
.collect::<Vec<_>>()
|
||||
.map(|results| (room_id.clone(), count, results))
|
||||
.map(Some)
|
||||
.await
|
||||
})
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
let total: UInt = results
|
||||
.iter()
|
||||
.fold(0, |a: usize, (_, count, _)| a.saturating_add(*count))
|
||||
.try_into()?;
|
||||
|
||||
(results, total)
|
||||
};
|
||||
|
||||
let state: RoomStates = results
|
||||
.iter()
|
||||
|
||||
@@ -116,6 +116,22 @@ pub fn check(config: &Config) -> Result {
|
||||
});
|
||||
}
|
||||
|
||||
if config.search_backend == super::SearchBackendConfig::OpenSearch
|
||||
&& config.search_opensearch_url.is_none()
|
||||
{
|
||||
return Err!(Config(
|
||||
"search_opensearch_url",
|
||||
"OpenSearch URL must be set when search_backend is \"opensearch\""
|
||||
));
|
||||
}
|
||||
|
||||
if config.search_opensearch_hybrid && config.search_opensearch_model_id.is_none() {
|
||||
return Err!(Config(
|
||||
"search_opensearch_model_id",
|
||||
"Model ID must be set when search_opensearch_hybrid is enabled"
|
||||
));
|
||||
}
|
||||
|
||||
// rocksdb does not allow max_log_files to be 0
|
||||
if config.rocksdb_max_log_files == 0 {
|
||||
return Err!(Config(
|
||||
|
||||
@@ -1100,6 +1100,85 @@ pub struct Config {
|
||||
#[serde(default)]
|
||||
pub auto_deactivate_banned_room_attempts: bool,
|
||||
|
||||
/// Search backend to use for full-text message search.
|
||||
///
|
||||
/// Available options: "rocksdb" (default) or "opensearch".
|
||||
///
|
||||
/// default: "rocksdb"
|
||||
#[serde(default)]
|
||||
pub search_backend: SearchBackendConfig,
|
||||
|
||||
/// URL of the OpenSearch instance. Required when search_backend is
|
||||
/// "opensearch".
|
||||
///
|
||||
/// example: "http://localhost:9200"
|
||||
pub search_opensearch_url: Option<Url>,
|
||||
|
||||
/// Name of the OpenSearch index for message search.
|
||||
///
|
||||
/// default: "tuwunel_messages"
|
||||
#[serde(default = "default_search_opensearch_index")]
|
||||
pub search_opensearch_index: String,
|
||||
|
||||
/// Authentication for OpenSearch in "user:pass" format.
|
||||
///
|
||||
/// display: sensitive
|
||||
pub search_opensearch_auth: Option<String>,
|
||||
|
||||
/// Maximum number of documents to batch before flushing to OpenSearch.
|
||||
///
|
||||
/// default: 100
|
||||
#[serde(default = "default_search_opensearch_batch_size")]
|
||||
pub search_opensearch_batch_size: usize,
|
||||
|
||||
/// Maximum time in milliseconds to wait before flushing a partial batch
|
||||
/// to OpenSearch.
|
||||
///
|
||||
/// default: 1000
|
||||
#[serde(default = "default_search_opensearch_flush_interval_ms")]
|
||||
pub search_opensearch_flush_interval_ms: u64,
|
||||
|
||||
/// Enable hybrid neural+BM25 search in OpenSearch. Requires an ML model
|
||||
/// deployed in OpenSearch and an ingest pipeline that populates an
|
||||
/// "embedding" field.
|
||||
///
|
||||
/// When enabled, tuwunel will:
|
||||
/// - Create the index with a knn_vector "embedding" field
|
||||
/// - Attach the ingest pipeline (search_opensearch_pipeline) to the index
|
||||
/// - Use hybrid queries combining BM25 + neural kNN scoring
|
||||
///
|
||||
/// For a complete reference on configuring OpenSearch's ML plugin, model
|
||||
/// registration, and ingest pipeline setup, see the test helpers in
|
||||
/// `src/service/rooms/search/opensearch.rs` (the `ensure_neural_model`,
|
||||
/// `ensure_ingest_pipeline`, etc. functions in the `tests` module).
|
||||
///
|
||||
/// See also: https://opensearch.org/docs/latest/search-plugins/neural-search/
|
||||
///
|
||||
/// default: false
|
||||
#[serde(default)]
|
||||
pub search_opensearch_hybrid: bool,
|
||||
|
||||
/// The model ID registered in OpenSearch for neural search. Required when
|
||||
/// search_opensearch_hybrid is enabled.
|
||||
///
|
||||
/// example: "aKV84osBBHNT0StI3MBr"
|
||||
pub search_opensearch_model_id: Option<String>,
|
||||
|
||||
/// Embedding dimension for the neural search model. Must match the output
|
||||
/// dimension of the deployed model. Common values: 384
|
||||
/// (all-MiniLM-L6-v2), 768 (msmarco-distilbert-base-tas-b).
|
||||
///
|
||||
/// default: 384
|
||||
#[serde(default = "default_search_opensearch_embedding_dim")]
|
||||
pub search_opensearch_embedding_dim: usize,
|
||||
|
||||
/// Name of the ingest pipeline that generates embeddings for the
|
||||
/// "embedding" field. This pipeline must already exist in OpenSearch.
|
||||
///
|
||||
/// default: "tuwunel_embedding_pipeline"
|
||||
#[serde(default = "default_search_opensearch_pipeline")]
|
||||
pub search_opensearch_pipeline: String,
|
||||
|
||||
/// RocksDB log level. This is not the same as tuwunel's log level. This
|
||||
/// is the log level for the RocksDB engine/library which show up in your
|
||||
/// database folder/path as `LOG` files. tuwunel will log RocksDB errors
|
||||
@@ -2304,6 +2383,14 @@ pub struct Config {
|
||||
catchall: BTreeMap<String, IgnoredAny>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Default, Deserialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum SearchBackendConfig {
|
||||
#[default]
|
||||
RocksDb,
|
||||
OpenSearch,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Default)]
|
||||
#[config_example_generator(filename = "tuwunel-example.toml", section = "global.tls")]
|
||||
pub struct TlsConfig {
|
||||
@@ -3181,6 +3268,16 @@ impl Config {
|
||||
|
||||
fn true_fn() -> bool { true }
|
||||
|
||||
fn default_search_opensearch_index() -> String { "tuwunel_messages".to_owned() }
|
||||
|
||||
fn default_search_opensearch_batch_size() -> usize { 100 }
|
||||
|
||||
fn default_search_opensearch_flush_interval_ms() -> u64 { 1000 }
|
||||
|
||||
fn default_search_opensearch_embedding_dim() -> usize { 384 }
|
||||
|
||||
fn default_search_opensearch_pipeline() -> String { "tuwunel_embedding_pipeline".to_owned() }
|
||||
|
||||
#[cfg(test)]
|
||||
fn default_server_name() -> OwnedServerName { ruma::owned_server_name!("localhost") }
|
||||
|
||||
|
||||
@@ -28,7 +28,12 @@ pub fn maximize_fd_limit() -> Result {
|
||||
#[cfg(not(unix))]
|
||||
pub fn maximize_fd_limit() -> Result { Ok(()) }
|
||||
|
||||
#[cfg(unix)]
|
||||
#[cfg(any(
|
||||
linux_android,
|
||||
netbsdlike,
|
||||
target_os = "aix",
|
||||
target_os = "freebsd",
|
||||
))]
|
||||
/// Some distributions ship with very low defaults for thread counts; similar to
|
||||
/// low default file descriptor limits. But unlike fd's, thread limit is rarely
|
||||
/// reached, though on large systems (32+ cores) shipping with defaults of
|
||||
@@ -47,7 +52,12 @@ pub fn maximize_thread_limit() -> Result {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(not(unix))]
|
||||
#[cfg(not(any(
|
||||
linux_android,
|
||||
netbsdlike,
|
||||
target_os = "aix",
|
||||
target_os = "freebsd",
|
||||
)))]
|
||||
pub fn maximize_thread_limit() -> Result { Ok(()) }
|
||||
|
||||
#[cfg(unix)]
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use nix::sys::resource::Usage;
|
||||
#[cfg(unix)]
|
||||
use nix::sys::resource::{UsageWho, getrusage};
|
||||
use nix::sys::resource::{Usage, UsageWho, getrusage};
|
||||
|
||||
use crate::Result;
|
||||
|
||||
@@ -8,7 +7,7 @@ use crate::Result;
|
||||
pub fn usage() -> Result<Usage> { getrusage(UsageWho::RUSAGE_SELF).map_err(Into::into) }
|
||||
|
||||
#[cfg(not(unix))]
|
||||
pub fn usage() -> Result<Usage> { Ok(Usage::default()) }
|
||||
pub fn usage() -> Result<()> { Ok(()) }
|
||||
|
||||
#[cfg(any(
|
||||
target_os = "linux",
|
||||
@@ -17,9 +16,15 @@ pub fn usage() -> Result<Usage> { Ok(Usage::default()) }
|
||||
))]
|
||||
pub fn thread_usage() -> Result<Usage> { getrusage(UsageWho::RUSAGE_THREAD).map_err(Into::into) }
|
||||
|
||||
#[cfg(not(any(
|
||||
target_os = "linux",
|
||||
target_os = "freebsd",
|
||||
target_os = "openbsd"
|
||||
)))]
|
||||
pub fn thread_usage() -> Result<Usage> { Ok(Usage::default()) }
|
||||
#[cfg(all(
|
||||
unix,
|
||||
not(any(
|
||||
target_os = "linux",
|
||||
target_os = "freebsd",
|
||||
target_os = "openbsd"
|
||||
))
|
||||
))]
|
||||
pub fn thread_usage() -> Result<Usage> { getrusage(UsageWho::RUSAGE_SELF).map_err(Into::into) }
|
||||
|
||||
#[cfg(not(unix))]
|
||||
pub fn thread_usage() -> Result<()> { Ok(()) }
|
||||
|
||||
50
src/service/rooms/search/backend.rs
Normal file
50
src/service/rooms/search/backend.rs
Normal file
@@ -0,0 +1,50 @@
|
||||
use async_trait::async_trait;
|
||||
use ruma::RoomId;
|
||||
use tuwunel_core::Result;
|
||||
|
||||
use crate::rooms::{short::ShortRoomId, timeline::RawPduId};
|
||||
|
||||
/// A search hit returned by a backend.
///
/// Produced by `SearchBackend::search_pdu_ids`.
|
||||
pub(super) struct SearchHit {
|
||||
/// Raw id of the matching PDU.
pub(super) pdu_id: RawPduId,
|
||||
/// Score attached to the hit by the backend, if it supplies one. Nothing
/// reads it yet, hence the `dead_code` expectation.
#[expect(dead_code)]
|
||||
pub(super) score: Option<f64>,
|
||||
}
|
||||
|
||||
/// Query parameters passed to the backend for searching.
|
||||
pub(super) struct BackendQuery<'a> {
|
||||
/// The search term to look up, borrowed from the caller.
pub(super) search_term: &'a str,
|
||||
/// Rooms to search across in one query, or `None` when the room is
/// identified another way (e.g. the `shortroomid` argument of
/// `SearchBackend::search_pdu_ids`).
pub(super) room_ids: Option<&'a [&'a RoomId]>,
|
||||
}
|
||||
|
||||
/// Trait abstracting the search index backend.
///
/// Implementors must be `Send + Sync`; every operation except
/// `supports_cross_room_search` is async and fallible.
|
||||
#[async_trait]
|
||||
pub(super) trait SearchBackend: Send + Sync {
|
||||
/// Adds the message body of a PDU to the index.
///
/// The room is passed both as `shortroomid` and as `room_id`; which one an
/// implementation uses is up to that implementation.
async fn index_pdu(
|
||||
&self,
|
||||
shortroomid: ShortRoomId,
|
||||
pdu_id: &RawPduId,
|
||||
room_id: &RoomId,
|
||||
message_body: &str,
|
||||
) -> Result;
|
||||
|
||||
/// Removes a PDU from the index. Takes the same arguments as
/// `index_pdu`, including the message body.
async fn deindex_pdu(
|
||||
&self,
|
||||
shortroomid: ShortRoomId,
|
||||
pdu_id: &RawPduId,
|
||||
room_id: &RoomId,
|
||||
message_body: &str,
|
||||
) -> Result;
|
||||
|
||||
/// Searches the index for `query`.
///
/// Returns a count together with the list of hits.
/// NOTE(review): whether the count is the total number of matches or just
/// the number of returned hits is not visible here — confirm per backend.
///
/// `shortroomid` optionally identifies a single room; `limit` and `skip`
/// are paging parameters whose exact application is left to the
/// implementation.
async fn search_pdu_ids(
|
||||
&self,
|
||||
query: &BackendQuery<'_>,
|
||||
shortroomid: Option<ShortRoomId>,
|
||||
limit: usize,
|
||||
skip: usize,
|
||||
) -> Result<(usize, Vec<SearchHit>)>;
|
||||
|
||||
/// Removes the index data belonging to a room, identified by both its
/// full and its short id.
async fn delete_room(&self, room_id: &RoomId, shortroomid: ShortRoomId) -> Result;
|
||||
|
||||
/// Whether the backend can search several rooms in a single query.
/// Defaults to `false`; backends that can must override it.
fn supports_cross_room_search(&self) -> bool { false }
|
||||
}
|
||||
@@ -1,34 +1,33 @@
|
||||
mod backend;
|
||||
mod opensearch;
|
||||
mod rocksdb;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use futures::{Stream, StreamExt};
|
||||
use ruma::{RoomId, UserId, api::client::search::search_events::v3::Criteria};
|
||||
use tokio::sync::mpsc;
|
||||
use tuwunel_core::{
|
||||
PduCount, Result,
|
||||
arrayvec::ArrayVec,
|
||||
implement,
|
||||
Result, implement,
|
||||
matrix::event::{Event, Matches},
|
||||
trace,
|
||||
utils::{
|
||||
ArrayVecExt, IterStream, ReadyExt, set,
|
||||
stream::{TryIgnore, WidebandExt},
|
||||
},
|
||||
utils::{IterStream, ReadyExt, stream::WidebandExt},
|
||||
};
|
||||
use tuwunel_database::{Interfix, Map, keyval::Val};
|
||||
|
||||
use crate::rooms::{
|
||||
short::ShortRoomId,
|
||||
timeline::{PduId, RawPduId},
|
||||
use self::{
|
||||
backend::{BackendQuery, SearchBackend, SearchHit},
|
||||
opensearch::OpenSearchBackend,
|
||||
rocksdb::RocksDbBackend,
|
||||
};
|
||||
use crate::rooms::{short::ShortRoomId, timeline::RawPduId};
|
||||
|
||||
pub struct Service {
|
||||
db: Data,
|
||||
backend: Box<dyn SearchBackend>,
|
||||
bulk_rx: Option<tokio::sync::Mutex<mpsc::Receiver<opensearch::BulkAction>>>,
|
||||
opensearch: Option<Arc<OpenSearchBackend>>,
|
||||
services: Arc<crate::services::OnceServices>,
|
||||
}
|
||||
|
||||
struct Data {
|
||||
tokenids: Arc<Map>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct RoomQuery<'a> {
|
||||
pub room_id: &'a RoomId,
|
||||
@@ -38,52 +37,133 @@ pub struct RoomQuery<'a> {
|
||||
pub skip: usize,
|
||||
}
|
||||
|
||||
type TokenId = ArrayVec<u8, TOKEN_ID_MAX_LEN>;
|
||||
|
||||
const TOKEN_ID_MAX_LEN: usize =
|
||||
size_of::<ShortRoomId>() + WORD_MAX_LEN + 1 + size_of::<RawPduId>();
|
||||
const WORD_MAX_LEN: usize = 50;
|
||||
|
||||
#[async_trait]
|
||||
impl crate::Service for Service {
|
||||
fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
|
||||
let config = &args.server.config;
|
||||
let (backend, bulk_rx, opensearch): (Box<dyn SearchBackend>, _, _) = match config
|
||||
.search_backend
|
||||
{
|
||||
| tuwunel_core::config::SearchBackendConfig::RocksDb => {
|
||||
let b = RocksDbBackend::new(args.db["tokenids"].clone());
|
||||
(Box::new(b), None, None)
|
||||
},
|
||||
| tuwunel_core::config::SearchBackendConfig::OpenSearch => {
|
||||
let (b, rx) = OpenSearchBackend::new(config)?;
|
||||
let b = Arc::new(b);
|
||||
let backend: Box<dyn SearchBackend> = Box::new(OpenSearchWorkerRef(b.clone()));
|
||||
(backend, Some(tokio::sync::Mutex::new(rx)), Some(b))
|
||||
},
|
||||
};
|
||||
|
||||
Ok(Arc::new(Self {
|
||||
db: Data { tokenids: args.db["tokenids"].clone() },
|
||||
backend,
|
||||
bulk_rx,
|
||||
opensearch,
|
||||
services: args.services.clone(),
|
||||
}))
|
||||
}
|
||||
|
||||
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
|
||||
|
||||
async fn worker(self: Arc<Self>) -> Result {
|
||||
if let (Some(os), Some(rx_mutex)) = (self.opensearch.as_ref(), self.bulk_rx.as_ref()) {
|
||||
os.ensure_index().await?;
|
||||
|
||||
let services = &**self.services;
|
||||
let config = &services.server.config;
|
||||
let batch_size = config.search_opensearch_batch_size;
|
||||
let flush_interval =
|
||||
std::time::Duration::from_millis(config.search_opensearch_flush_interval_ms);
|
||||
|
||||
let mut rx = rx_mutex.lock().await;
|
||||
os.process_bulk(&mut rx, batch_size, flush_interval)
|
||||
.await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrapper so we can put the `Arc<OpenSearchBackend>` behind the trait.
|
||||
struct OpenSearchWorkerRef(Arc<OpenSearchBackend>);
|
||||
|
||||
#[async_trait]
|
||||
impl SearchBackend for OpenSearchWorkerRef {
|
||||
async fn index_pdu(
|
||||
&self,
|
||||
shortroomid: ShortRoomId,
|
||||
pdu_id: &RawPduId,
|
||||
room_id: &RoomId,
|
||||
message_body: &str,
|
||||
) -> Result {
|
||||
self.0
|
||||
.index_pdu(shortroomid, pdu_id, room_id, message_body)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn deindex_pdu(
|
||||
&self,
|
||||
shortroomid: ShortRoomId,
|
||||
pdu_id: &RawPduId,
|
||||
room_id: &RoomId,
|
||||
message_body: &str,
|
||||
) -> Result {
|
||||
self.0
|
||||
.deindex_pdu(shortroomid, pdu_id, room_id, message_body)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn search_pdu_ids(
|
||||
&self,
|
||||
query: &BackendQuery<'_>,
|
||||
shortroomid: Option<ShortRoomId>,
|
||||
limit: usize,
|
||||
skip: usize,
|
||||
) -> Result<(usize, Vec<SearchHit>)> {
|
||||
self.0
|
||||
.search_pdu_ids(query, shortroomid, limit, skip)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn delete_room(&self, room_id: &RoomId, shortroomid: ShortRoomId) -> Result {
|
||||
self.0.delete_room(room_id, shortroomid).await
|
||||
}
|
||||
|
||||
fn supports_cross_room_search(&self) -> bool { self.0.supports_cross_room_search() }
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
pub fn index_pdu(&self, shortroomid: ShortRoomId, pdu_id: &RawPduId, message_body: &str) {
|
||||
let batch = tokenize(message_body)
|
||||
.map(|word| {
|
||||
let mut key = shortroomid.to_be_bytes().to_vec();
|
||||
key.extend_from_slice(word.as_bytes());
|
||||
key.push(0xFF);
|
||||
key.extend_from_slice(pdu_id.as_ref()); // TODO: currently we save the room id a second time here
|
||||
key
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
self.db
|
||||
.tokenids
|
||||
.insert_batch(batch.iter().map(|k| (k.as_slice(), &[])));
|
||||
pub async fn index_pdu(
|
||||
&self,
|
||||
shortroomid: ShortRoomId,
|
||||
pdu_id: &RawPduId,
|
||||
room_id: &RoomId,
|
||||
message_body: &str,
|
||||
) {
|
||||
if let Err(e) = self
|
||||
.backend
|
||||
.index_pdu(shortroomid, pdu_id, room_id, message_body)
|
||||
.await
|
||||
{
|
||||
tuwunel_core::warn!("Failed to index PDU: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
pub fn deindex_pdu(&self, shortroomid: ShortRoomId, pdu_id: &RawPduId, message_body: &str) {
|
||||
let batch = tokenize(message_body).map(|word| {
|
||||
let mut key = shortroomid.to_be_bytes().to_vec();
|
||||
key.extend_from_slice(word.as_bytes());
|
||||
key.push(0xFF);
|
||||
key.extend_from_slice(pdu_id.as_ref()); // TODO: currently we save the room id a second time here
|
||||
key
|
||||
});
|
||||
|
||||
for token in batch {
|
||||
self.db.tokenids.remove(&token);
|
||||
pub async fn deindex_pdu(
|
||||
&self,
|
||||
shortroomid: ShortRoomId,
|
||||
pdu_id: &RawPduId,
|
||||
room_id: &RoomId,
|
||||
message_body: &str,
|
||||
) {
|
||||
if let Err(e) = self
|
||||
.backend
|
||||
.deindex_pdu(shortroomid, pdu_id, room_id, message_body)
|
||||
.await
|
||||
{
|
||||
tuwunel_core::warn!("Failed to deindex PDU: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -92,12 +172,31 @@ pub async fn search_pdus<'a>(
|
||||
&'a self,
|
||||
query: &'a RoomQuery<'a>,
|
||||
) -> Result<(usize, impl Stream<Item = impl Event + use<>> + Send + '_)> {
|
||||
let pdu_ids: Vec<_> = self.search_pdu_ids(query).await?.collect().await;
|
||||
let shortroomid = self
|
||||
.services
|
||||
.short
|
||||
.get_shortroomid(query.room_id)
|
||||
.await?;
|
||||
|
||||
let backend_query = BackendQuery {
|
||||
search_term: &query.criteria.search_term,
|
||||
room_ids: None,
|
||||
};
|
||||
|
||||
let (count, hits) = self
|
||||
.backend
|
||||
.search_pdu_ids(
|
||||
&backend_query,
|
||||
Some(shortroomid),
|
||||
query.limit.saturating_add(query.skip),
|
||||
0,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let filter = &query.criteria.filter;
|
||||
let count = pdu_ids.len();
|
||||
let pdus = pdu_ids
|
||||
let pdus = hits
|
||||
.into_iter()
|
||||
.map(|h| h.pdu_id)
|
||||
.stream()
|
||||
.wide_filter_map(async |result_pdu_id: RawPduId| {
|
||||
self.services
|
||||
@@ -121,123 +220,64 @@ pub async fn search_pdus<'a>(
|
||||
Ok((count, pdus))
|
||||
}
|
||||
|
||||
// result is modeled as a stream such that callers don't have to be refactored
|
||||
// though an additional async/wrap still exists for now
|
||||
#[implement(Service)]
|
||||
pub async fn search_pdu_ids(
|
||||
&self,
|
||||
query: &RoomQuery<'_>,
|
||||
) -> Result<impl Stream<Item = RawPduId> + Send + '_ + use<'_>> {
|
||||
let shortroomid = self
|
||||
.services
|
||||
.short
|
||||
.get_shortroomid(query.room_id)
|
||||
pub async fn search_pdus_multi_room<'a>(
|
||||
&'a self,
|
||||
room_ids: &'a [&'a RoomId],
|
||||
sender_user: &'a UserId,
|
||||
criteria: &'a Criteria,
|
||||
limit: usize,
|
||||
skip: usize,
|
||||
) -> Result<(usize, impl Stream<Item = impl Event + use<>> + Send + '_)> {
|
||||
let backend_query = BackendQuery {
|
||||
search_term: &criteria.search_term,
|
||||
room_ids: Some(room_ids),
|
||||
};
|
||||
|
||||
let (count, hits) = self
|
||||
.backend
|
||||
.search_pdu_ids(&backend_query, None, limit.saturating_add(skip), 0)
|
||||
.await?;
|
||||
|
||||
let pdu_ids = self
|
||||
.search_pdu_ids_query_room(query, shortroomid)
|
||||
.await;
|
||||
|
||||
let iters = pdu_ids.into_iter().map(IntoIterator::into_iter);
|
||||
|
||||
Ok(set::intersection(iters).stream())
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
async fn search_pdu_ids_query_room(
|
||||
&self,
|
||||
query: &RoomQuery<'_>,
|
||||
shortroomid: ShortRoomId,
|
||||
) -> Vec<Vec<RawPduId>> {
|
||||
tokenize(&query.criteria.search_term)
|
||||
let filter = &criteria.filter;
|
||||
let pdus = hits
|
||||
.into_iter()
|
||||
.map(|h| h.pdu_id)
|
||||
.stream()
|
||||
.wide_then(async |word| {
|
||||
self.search_pdu_ids_query_words(shortroomid, &word)
|
||||
.collect::<Vec<_>>()
|
||||
.wide_filter_map(async |result_pdu_id: RawPduId| {
|
||||
self.services
|
||||
.timeline
|
||||
.get_pdu_from_id(&result_pdu_id)
|
||||
.await
|
||||
.ok()
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.await
|
||||
}
|
||||
|
||||
/// Iterate over PduId's containing a word
|
||||
#[implement(Service)]
|
||||
fn search_pdu_ids_query_words<'a>(
|
||||
&'a self,
|
||||
shortroomid: ShortRoomId,
|
||||
word: &'a str,
|
||||
) -> impl Stream<Item = RawPduId> + Send + '_ {
|
||||
self.search_pdu_ids_query_word(shortroomid, word)
|
||||
.map(move |key| -> RawPduId {
|
||||
let key = &key[prefix_len(word)..];
|
||||
key.into()
|
||||
.ready_filter(|pdu| !pdu.is_redacted())
|
||||
.ready_filter(move |pdu| filter.matches(pdu))
|
||||
.wide_filter_map(async move |pdu| {
|
||||
self.services
|
||||
.state_accessor
|
||||
.user_can_see_event(sender_user, pdu.room_id(), pdu.event_id())
|
||||
.await
|
||||
.then_some(pdu)
|
||||
})
|
||||
.skip(skip)
|
||||
.take(limit);
|
||||
|
||||
Ok((count, pdus))
|
||||
}
|
||||
|
||||
/// Iterate over raw database results for a word
|
||||
#[implement(Service)]
|
||||
fn search_pdu_ids_query_word(
|
||||
&self,
|
||||
shortroomid: ShortRoomId,
|
||||
word: &str,
|
||||
) -> impl Stream<Item = Val<'_>> + Send + '_ + use<'_> {
|
||||
// rustc says const'ing this not yet stable
|
||||
let end_id: RawPduId = PduId { shortroomid, count: PduCount::max() }.into();
|
||||
|
||||
// Newest pdus first
|
||||
let end = make_tokenid(shortroomid, word, &end_id);
|
||||
let prefix = make_prefix(shortroomid, word);
|
||||
self.db
|
||||
.tokenids
|
||||
.rev_raw_keys_from(&end)
|
||||
.ignore_err()
|
||||
.ready_take_while(move |key| key.starts_with(&prefix))
|
||||
}
|
||||
pub fn supports_cross_room_search(&self) -> bool { self.backend.supports_cross_room_search() }
|
||||
|
||||
#[implement(Service)]
|
||||
pub async fn delete_all_search_tokenids_for_room(&self, room_id: &RoomId) -> Result {
|
||||
let prefix = (room_id, Interfix);
|
||||
let shortroomid = self
|
||||
.services
|
||||
.short
|
||||
.get_shortroomid(room_id)
|
||||
.await?;
|
||||
|
||||
self.db
|
||||
.tokenids
|
||||
.keys_prefix_raw(&prefix)
|
||||
.ignore_err()
|
||||
.ready_for_each(|key| {
|
||||
trace!("Removing key: {key:?}");
|
||||
self.db.tokenids.remove(key);
|
||||
})
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Splits a string into tokens used as keys in the search inverted index
|
||||
///
|
||||
/// This may be used to tokenize both message bodies (for indexing) or search
|
||||
/// queries (for querying).
|
||||
fn tokenize(body: &str) -> impl Iterator<Item = String> + Send + '_ {
|
||||
body.split_terminator(|c: char| !c.is_alphanumeric())
|
||||
.filter(|s| !s.is_empty())
|
||||
.filter(|word| word.len() <= WORD_MAX_LEN)
|
||||
.map(str::to_lowercase)
|
||||
}
|
||||
|
||||
fn make_tokenid(shortroomid: ShortRoomId, word: &str, pdu_id: &RawPduId) -> TokenId {
|
||||
let mut key = make_prefix(shortroomid, word);
|
||||
key.extend_from_slice(pdu_id.as_ref());
|
||||
key
|
||||
}
|
||||
|
||||
fn make_prefix(shortroomid: ShortRoomId, word: &str) -> TokenId {
|
||||
let mut key = TokenId::new();
|
||||
key.extend_from_slice(&shortroomid.to_be_bytes());
|
||||
key.extend_from_slice(word.as_bytes());
|
||||
key.push(tuwunel_database::SEP);
|
||||
key
|
||||
}
|
||||
|
||||
fn prefix_len(word: &str) -> usize {
|
||||
size_of::<ShortRoomId>()
|
||||
.saturating_add(word.len())
|
||||
.saturating_add(1)
|
||||
self.backend
|
||||
.delete_room(room_id, shortroomid)
|
||||
.await
|
||||
}
|
||||
|
||||
1442
src/service/rooms/search/opensearch.rs
Normal file
1442
src/service/rooms/search/opensearch.rs
Normal file
File diff suppressed because it is too large
Load Diff
193
src/service/rooms/search/rocksdb.rs
Normal file
193
src/service/rooms/search/rocksdb.rs
Normal file
@@ -0,0 +1,193 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use futures::StreamExt;
|
||||
use ruma::RoomId;
|
||||
use tuwunel_core::{
|
||||
PduCount, Result,
|
||||
arrayvec::ArrayVec,
|
||||
utils::{ArrayVecExt, ReadyExt, set, stream::TryIgnore},
|
||||
};
|
||||
use tuwunel_database::{Interfix, Map, keyval::Val};
|
||||
|
||||
use super::backend::{BackendQuery, SearchBackend, SearchHit};
|
||||
use crate::rooms::{
|
||||
short::ShortRoomId,
|
||||
timeline::{PduId, RawPduId},
|
||||
};
|
||||
|
||||
pub(super) struct RocksDbBackend {
|
||||
tokenids: Arc<Map>,
|
||||
}
|
||||
|
||||
type TokenId = ArrayVec<u8, TOKEN_ID_MAX_LEN>;
|
||||
|
||||
const TOKEN_ID_MAX_LEN: usize =
|
||||
size_of::<ShortRoomId>() + WORD_MAX_LEN + 1 + size_of::<RawPduId>();
|
||||
const WORD_MAX_LEN: usize = 50;
|
||||
|
||||
impl RocksDbBackend {
|
||||
pub(super) fn new(tokenids: Arc<Map>) -> Self { Self { tokenids } }
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl SearchBackend for RocksDbBackend {
|
||||
async fn index_pdu(
|
||||
&self,
|
||||
shortroomid: ShortRoomId,
|
||||
pdu_id: &RawPduId,
|
||||
_room_id: &RoomId,
|
||||
message_body: &str,
|
||||
) -> Result {
|
||||
let batch = tokenize(message_body)
|
||||
.map(|word| {
|
||||
let mut key = shortroomid.to_be_bytes().to_vec();
|
||||
key.extend_from_slice(word.as_bytes());
|
||||
key.push(0xFF);
|
||||
key.extend_from_slice(pdu_id.as_ref());
|
||||
key
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
self.tokenids
|
||||
.insert_batch(batch.iter().map(|k| (k.as_slice(), &[])));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn deindex_pdu(
|
||||
&self,
|
||||
shortroomid: ShortRoomId,
|
||||
pdu_id: &RawPduId,
|
||||
_room_id: &RoomId,
|
||||
message_body: &str,
|
||||
) -> Result {
|
||||
let batch = tokenize(message_body).map(|word| {
|
||||
let mut key = shortroomid.to_be_bytes().to_vec();
|
||||
key.extend_from_slice(word.as_bytes());
|
||||
key.push(0xFF);
|
||||
key.extend_from_slice(pdu_id.as_ref());
|
||||
key
|
||||
});
|
||||
|
||||
for token in batch {
|
||||
self.tokenids.remove(&token);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn search_pdu_ids(
|
||||
&self,
|
||||
query: &BackendQuery<'_>,
|
||||
shortroomid: Option<ShortRoomId>,
|
||||
_limit: usize,
|
||||
_skip: usize,
|
||||
) -> Result<(usize, Vec<SearchHit>)> {
|
||||
let shortroomid = shortroomid.ok_or_else(|| {
|
||||
tuwunel_core::err!(Request(InvalidParam(
|
||||
"RocksDB backend requires a shortroomid for search"
|
||||
)))
|
||||
})?;
|
||||
|
||||
let pdu_ids =
|
||||
search_pdu_ids_query_room(&self.tokenids, shortroomid, query.search_term).await;
|
||||
|
||||
let iters = pdu_ids.into_iter().map(IntoIterator::into_iter);
|
||||
let results: Vec<_> = set::intersection(iters)
|
||||
.map(|pdu_id| SearchHit { pdu_id, score: None })
|
||||
.collect();
|
||||
let count = results.len();
|
||||
|
||||
Ok((count, results))
|
||||
}
|
||||
|
||||
async fn delete_room(&self, room_id: &RoomId, _shortroomid: ShortRoomId) -> Result {
|
||||
let prefix = (room_id, Interfix);
|
||||
|
||||
self.tokenids
|
||||
.keys_prefix_raw(&prefix)
|
||||
.ignore_err()
|
||||
.ready_for_each(|key| {
|
||||
self.tokenids.remove(key);
|
||||
})
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn search_pdu_ids_query_room(
|
||||
tokenids: &Arc<Map>,
|
||||
shortroomid: ShortRoomId,
|
||||
search_term: &str,
|
||||
) -> Vec<Vec<RawPduId>> {
|
||||
tokenize(search_term)
|
||||
.collect::<Vec<_>>()
|
||||
.into_iter()
|
||||
.map(|word| {
|
||||
let tokenids = tokenids.clone();
|
||||
async move {
|
||||
search_pdu_ids_query_words(&tokenids, shortroomid, &word)
|
||||
.collect::<Vec<_>>()
|
||||
.await
|
||||
}
|
||||
})
|
||||
.collect::<futures::stream::FuturesUnordered<_>>()
|
||||
.collect::<Vec<_>>()
|
||||
.await
|
||||
}
|
||||
|
||||
fn search_pdu_ids_query_words<'a>(
|
||||
tokenids: &'a Arc<Map>,
|
||||
shortroomid: ShortRoomId,
|
||||
word: &'a str,
|
||||
) -> impl futures::Stream<Item = RawPduId> + Send + 'a {
|
||||
search_pdu_ids_query_word(tokenids, shortroomid, word).map(move |key| -> RawPduId {
|
||||
let key = &key[prefix_len(word)..];
|
||||
key.into()
|
||||
})
|
||||
}
|
||||
|
||||
fn search_pdu_ids_query_word<'a>(
|
||||
tokenids: &'a Arc<Map>,
|
||||
shortroomid: ShortRoomId,
|
||||
word: &'a str,
|
||||
) -> impl futures::Stream<Item = Val<'a>> + Send + 'a {
|
||||
let end_id: RawPduId = PduId { shortroomid, count: PduCount::max() }.into();
|
||||
|
||||
let end = make_tokenid(shortroomid, word, &end_id);
|
||||
let prefix = make_prefix(shortroomid, word);
|
||||
tokenids
|
||||
.rev_raw_keys_from(&end)
|
||||
.ignore_err()
|
||||
.ready_take_while(move |key| key.starts_with(&prefix))
|
||||
}
|
||||
|
||||
/// Splits a string into tokens used as keys in the search inverted index
|
||||
pub(super) fn tokenize(body: &str) -> impl Iterator<Item = String> + Send + '_ {
|
||||
body.split_terminator(|c: char| !c.is_alphanumeric())
|
||||
.filter(|s| !s.is_empty())
|
||||
.filter(|word| word.len() <= WORD_MAX_LEN)
|
||||
.map(str::to_lowercase)
|
||||
}
|
||||
|
||||
fn make_tokenid(shortroomid: ShortRoomId, word: &str, pdu_id: &RawPduId) -> TokenId {
|
||||
let mut key = make_prefix(shortroomid, word);
|
||||
key.extend_from_slice(pdu_id.as_ref());
|
||||
key
|
||||
}
|
||||
|
||||
fn make_prefix(shortroomid: ShortRoomId, word: &str) -> TokenId {
|
||||
let mut key = TokenId::new();
|
||||
key.extend_from_slice(&shortroomid.to_be_bytes());
|
||||
key.extend_from_slice(word.as_bytes());
|
||||
key.push(tuwunel_database::SEP);
|
||||
key
|
||||
}
|
||||
|
||||
fn prefix_len(word: &str) -> usize {
|
||||
size_of::<ShortRoomId>()
|
||||
.saturating_add(word.len())
|
||||
.saturating_add(1)
|
||||
}
|
||||
@@ -281,7 +281,8 @@ async fn append_pdu_effects(
|
||||
if let Some(body) = content.body {
|
||||
self.services
|
||||
.search
|
||||
.index_pdu(shortroomid, &pdu_id, &body);
|
||||
.index_pdu(shortroomid, &pdu_id, pdu.room_id(), &body)
|
||||
.await;
|
||||
|
||||
if self
|
||||
.services
|
||||
|
||||
@@ -240,7 +240,8 @@ pub async fn backfill_pdu(
|
||||
if let Some(body) = content.body {
|
||||
self.services
|
||||
.search
|
||||
.index_pdu(shortroomid, &pdu_id, &body);
|
||||
.index_pdu(shortroomid, &pdu_id, pdu.room_id(), &body)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
drop(mutex_lock);
|
||||
|
||||
@@ -40,14 +40,15 @@ pub async fn redact_pdu<Pdu: Event + Send + Sync>(
|
||||
.get("body")
|
||||
.and_then(|body| body.as_str());
|
||||
|
||||
let room_id = RoomId::parse(pdu["room_id"].as_str().unwrap()).unwrap();
|
||||
|
||||
if let Some(body) = body {
|
||||
self.services
|
||||
.search
|
||||
.deindex_pdu(shortroomid, &pdu_id, body);
|
||||
.deindex_pdu(shortroomid, &pdu_id, room_id, body)
|
||||
.await;
|
||||
}
|
||||
|
||||
let room_id = RoomId::parse(pdu["room_id"].as_str().unwrap()).unwrap();
|
||||
|
||||
let room_version_id = self
|
||||
.services
|
||||
.state
|
||||
|
||||
@@ -907,6 +907,72 @@
|
||||
#
|
||||
#auto_deactivate_banned_room_attempts = false
|
||||
|
||||
# Search backend to use for full-text message search.
|
||||
#
|
||||
# Available options: "rocksdb" (default) or "opensearch".
|
||||
#
|
||||
#search_backend = "rocksdb"
|
||||
|
||||
# URL of the OpenSearch instance. Required when search_backend is
|
||||
# "opensearch".
|
||||
#
|
||||
# example: "http://localhost:9200"
|
||||
#
|
||||
#search_opensearch_url =
|
||||
|
||||
# Name of the OpenSearch index for message search.
|
||||
#
|
||||
#search_opensearch_index = "tuwunel_messages"
|
||||
|
||||
# Authentication for OpenSearch in "user:pass" format.
|
||||
#
|
||||
#search_opensearch_auth =
|
||||
|
||||
# Maximum number of documents to batch before flushing to OpenSearch.
|
||||
#
|
||||
#search_opensearch_batch_size = 100
|
||||
|
||||
# Maximum time in milliseconds to wait before flushing a partial batch
|
||||
# to OpenSearch.
|
||||
#
|
||||
#search_opensearch_flush_interval_ms = 1000
|
||||
|
||||
# Enable hybrid neural+BM25 search in OpenSearch. Requires an ML model
|
||||
# deployed in OpenSearch and an ingest pipeline that populates an
|
||||
# "embedding" field.
|
||||
#
|
||||
# When enabled, tuwunel will:
|
||||
# - Create the index with a knn_vector "embedding" field
|
||||
# - Attach the ingest pipeline (search_opensearch_pipeline) to the index
|
||||
# - Use hybrid queries combining BM25 + neural kNN scoring
|
||||
#
|
||||
# For a complete reference on configuring OpenSearch's ML plugin, model
|
||||
# registration, and ingest pipeline setup, see the test helpers in
|
||||
# `src/service/rooms/search/opensearch.rs` (the `ensure_neural_model`,
|
||||
# `ensure_ingest_pipeline`, etc. functions in the `tests` module).
|
||||
#
|
||||
# See also: https://opensearch.org/docs/latest/search-plugins/neural-search/
|
||||
#
|
||||
#search_opensearch_hybrid = false
|
||||
|
||||
# The model ID registered in OpenSearch for neural search. Required when
|
||||
# search_opensearch_hybrid is enabled.
|
||||
#
|
||||
# example: "aKV84osBBHNT0StI3MBr"
|
||||
#
|
||||
#search_opensearch_model_id =
|
||||
|
||||
# Embedding dimension for the neural search model. Must match the output
|
||||
# dimension of the deployed model. Common values: 384
|
||||
# (all-MiniLM-L6-v2), 768 (msmarco-distilbert-base-tas-b).
|
||||
#
|
||||
#search_opensearch_embedding_dim = 384
|
||||
|
||||
# Name of the ingest pipeline that generates embeddings for the
|
||||
# "embedding" field. This pipeline must already exist in OpenSearch.
|
||||
#
|
||||
#search_opensearch_pipeline = "tuwunel_embedding_pipeline"
|
||||
|
||||
# RocksDB log level. This is not the same as tuwunel's log level. This
|
||||
# is the log level for the RocksDB engine/library which show up in your
|
||||
# database folder/path as `LOG` files. tuwunel will log RocksDB errors
|
||||
|
||||
Reference in New Issue
Block a user