Compare commits
2 Commits
main
...
feat/searc
| Author | SHA1 | Date | |
|---|---|---|---|
|
cf21ffc452
|
|||
|
c9cddc80d9
|
13
.github/workflows/main.yml
vendored
13
.github/workflows/main.yml
vendored
@@ -332,6 +332,19 @@ jobs:
|
|||||||
{"sys_target": "x86_64-v4-linux-gnu", "feat_set": "default"},
|
{"sys_target": "x86_64-v4-linux-gnu", "feat_set": "default"},
|
||||||
]
|
]
|
||||||
|
|
||||||
|
opensearch-tests:
|
||||||
|
if: >
|
||||||
|
!failure() && !cancelled()
|
||||||
|
&& fromJSON(needs.init.outputs.enable_test)
|
||||||
|
&& !fromJSON(needs.init.outputs.is_release)
|
||||||
|
&& !contains(needs.init.outputs.pipeline, '[ci no test]')
|
||||||
|
|
||||||
|
name: OpenSearch
|
||||||
|
needs: [init, lint]
|
||||||
|
uses: ./.github/workflows/opensearch-tests.yml
|
||||||
|
with:
|
||||||
|
checkout: ${{needs.init.outputs.checkout}}
|
||||||
|
|
||||||
package:
|
package:
|
||||||
if: >
|
if: >
|
||||||
!failure() && !cancelled()
|
!failure() && !cancelled()
|
||||||
|
|||||||
114
.github/workflows/opensearch-tests.yml
vendored
Normal file
114
.github/workflows/opensearch-tests.yml
vendored
Normal file
@@ -0,0 +1,114 @@
|
|||||||
|
name: OpenSearch Integration Tests
|
||||||
|
|
||||||
|
on:
|
||||||
|
workflow_call:
|
||||||
|
inputs:
|
||||||
|
checkout:
|
||||||
|
type: string
|
||||||
|
default: 'HEAD'
|
||||||
|
pull_request:
|
||||||
|
paths:
|
||||||
|
- 'src/service/rooms/search/**'
|
||||||
|
- 'src/core/config/mod.rs'
|
||||||
|
- '.github/workflows/opensearch-tests.yml'
|
||||||
|
push:
|
||||||
|
branches: [main]
|
||||||
|
paths:
|
||||||
|
- 'src/service/rooms/search/**'
|
||||||
|
- 'src/core/config/mod.rs'
|
||||||
|
- '.github/workflows/opensearch-tests.yml'
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
opensearch-tests:
|
||||||
|
name: OpenSearch Integration
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
|
||||||
|
services:
|
||||||
|
opensearch:
|
||||||
|
image: opensearchproject/opensearch:3.5.0
|
||||||
|
ports:
|
||||||
|
- 9200:9200
|
||||||
|
env:
|
||||||
|
discovery.type: single-node
|
||||||
|
DISABLE_SECURITY_PLUGIN: "true"
|
||||||
|
OPENSEARCH_INITIAL_ADMIN_PASSWORD: "SuperSecret123!"
|
||||||
|
OPENSEARCH_JAVA_OPTS: "-Xms1g -Xmx2g"
|
||||||
|
options: >-
|
||||||
|
--health-cmd "curl -f http://localhost:9200/_cluster/health || exit 1"
|
||||||
|
--health-interval 10s
|
||||||
|
--health-timeout 5s
|
||||||
|
--health-retries 20
|
||||||
|
--health-start-period 60s
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v4
|
||||||
|
with:
|
||||||
|
ref: ${{ inputs.checkout || github.sha }}
|
||||||
|
|
||||||
|
- name: Install Rust toolchain
|
||||||
|
uses: dtolnay/rust-toolchain@nightly
|
||||||
|
|
||||||
|
- name: Cache cargo registry and build
|
||||||
|
uses: actions/cache@v4
|
||||||
|
with:
|
||||||
|
path: |
|
||||||
|
~/.cargo/registry
|
||||||
|
~/.cargo/git
|
||||||
|
target
|
||||||
|
key: opensearch-tests-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }}
|
||||||
|
restore-keys: |
|
||||||
|
opensearch-tests-${{ runner.os }}-
|
||||||
|
|
||||||
|
- name: Wait for OpenSearch
|
||||||
|
run: |
|
||||||
|
for i in $(seq 1 60); do
|
||||||
|
if curl -sf http://localhost:9200/_cluster/health; then
|
||||||
|
echo ""
|
||||||
|
echo "OpenSearch is ready"
|
||||||
|
exit 0
|
||||||
|
fi
|
||||||
|
echo "Waiting for OpenSearch... ($i/60)"
|
||||||
|
sleep 2
|
||||||
|
done
|
||||||
|
echo "OpenSearch did not start in time"
|
||||||
|
exit 1
|
||||||
|
|
||||||
|
- name: Configure OpenSearch ML plugin
|
||||||
|
run: |
|
||||||
|
curl -sf -X PUT http://localhost:9200/_cluster/settings \
|
||||||
|
-H 'Content-Type: application/json' \
|
||||||
|
-d '{
|
||||||
|
"persistent": {
|
||||||
|
"plugins.ml_commons.only_run_on_ml_node": "false",
|
||||||
|
"plugins.ml_commons.native_memory_threshold": "99",
|
||||||
|
"plugins.ml_commons.allow_registering_model_via_url": "true"
|
||||||
|
}
|
||||||
|
}'
|
||||||
|
|
||||||
|
- name: Run OpenSearch integration tests (BM25)
|
||||||
|
env:
|
||||||
|
OPENSEARCH_URL: http://localhost:9200
|
||||||
|
run: >
|
||||||
|
cargo test
|
||||||
|
-p tuwunel_service
|
||||||
|
--lib
|
||||||
|
rooms::search::opensearch::tests
|
||||||
|
--
|
||||||
|
--ignored
|
||||||
|
--test-threads=1
|
||||||
|
--skip test_neural
|
||||||
|
--skip test_hybrid
|
||||||
|
|
||||||
|
- name: Run OpenSearch neural/hybrid tests
|
||||||
|
env:
|
||||||
|
OPENSEARCH_URL: http://localhost:9200
|
||||||
|
timeout-minutes: 15
|
||||||
|
run: >
|
||||||
|
cargo test
|
||||||
|
-p tuwunel_service
|
||||||
|
--lib
|
||||||
|
rooms::search::opensearch::tests
|
||||||
|
--
|
||||||
|
--ignored
|
||||||
|
--test-threads=1
|
||||||
|
test_neural test_hybrid
|
||||||
37
Dockerfile
Normal file
37
Dockerfile
Normal file
@@ -0,0 +1,37 @@
|
|||||||
|
# Simple production Dockerfile for tuwunel
|
||||||
|
# Built by buildkitd on the x86_64 server — no cross-compilation needed.
|
||||||
|
|
||||||
|
FROM rust:slim-bookworm AS builder
|
||||||
|
|
||||||
|
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||||
|
libclang-dev clang cmake pkg-config make liburing-dev \
|
||||||
|
&& rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
|
WORKDIR /usr/src/tuwunel
|
||||||
|
|
||||||
|
# Copy manifests first for dependency caching
|
||||||
|
COPY Cargo.toml Cargo.lock rust-toolchain.toml ./
|
||||||
|
COPY .cargo/ .cargo/
|
||||||
|
COPY src/ src/
|
||||||
|
|
||||||
|
# Strip unnecessary cross-compilation targets from rust-toolchain.toml
|
||||||
|
# to avoid downloading toolchains we don't need
|
||||||
|
RUN sed -i '/x86_64-unknown-linux-musl/d; /aarch64-/d; /apple-/d; /rust-src/d; /rust-analyzer/d; /rustfmt/d; /clippy/d' rust-toolchain.toml
|
||||||
|
|
||||||
|
RUN --mount=type=cache,target=/usr/local/cargo/registry \
|
||||||
|
--mount=type=cache,target=/usr/local/cargo/git \
|
||||||
|
cargo build --release --locked \
|
||||||
|
&& cp target/release/tuwunel /usr/local/bin/tuwunel
|
||||||
|
|
||||||
|
FROM debian:bookworm-slim
|
||||||
|
|
||||||
|
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||||
|
ca-certificates tini liburing2 \
|
||||||
|
&& rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
|
COPY --from=builder /usr/local/bin/tuwunel /usr/local/bin/tuwunel
|
||||||
|
|
||||||
|
EXPOSE 6167
|
||||||
|
|
||||||
|
ENTRYPOINT ["tini", "--"]
|
||||||
|
CMD ["tuwunel"]
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
use std::collections::BTreeMap;
|
use std::collections::{BTreeMap, BTreeSet};
|
||||||
|
|
||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
|
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
|
||||||
@@ -92,37 +92,120 @@ async fn category_room_events(
|
|||||||
.boxed()
|
.boxed()
|
||||||
});
|
});
|
||||||
|
|
||||||
let results: Vec<_> = rooms
|
let accessible_rooms: Vec<OwnedRoomId> = rooms
|
||||||
.filter_map(async |room_id| {
|
.filter_map(async |room_id| {
|
||||||
check_room_visible(services, sender_user, &room_id, criteria)
|
check_room_visible(services, sender_user, &room_id, criteria)
|
||||||
.await
|
.await
|
||||||
.is_ok()
|
.is_ok()
|
||||||
.then_some(room_id)
|
.then_some(room_id)
|
||||||
})
|
})
|
||||||
.filter_map(async |room_id| {
|
|
||||||
let query = RoomQuery {
|
|
||||||
room_id: &room_id,
|
|
||||||
user_id: Some(sender_user),
|
|
||||||
criteria,
|
|
||||||
skip: next_batch,
|
|
||||||
limit,
|
|
||||||
};
|
|
||||||
|
|
||||||
let (count, results) = services.search.search_pdus(&query).await.ok()?;
|
|
||||||
|
|
||||||
results
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
.map(|results| (room_id.clone(), count, results))
|
|
||||||
.map(Some)
|
|
||||||
.await
|
|
||||||
})
|
|
||||||
.collect()
|
.collect()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let total: UInt = results
|
let (results, total) = if services.search.supports_cross_room_search() {
|
||||||
.iter()
|
let room_id_refs: Vec<&RoomId> = accessible_rooms
|
||||||
.fold(0, |a: usize, (_, count, _)| a.saturating_add(*count))
|
.iter()
|
||||||
.try_into()?;
|
.map(AsRef::as_ref)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let (count, pdus) = services
|
||||||
|
.search
|
||||||
|
.search_pdus_multi_room(&room_id_refs, sender_user, criteria, limit, next_batch)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let pdu_list: Vec<_> = pdus.collect().await;
|
||||||
|
let total: UInt = count.try_into()?;
|
||||||
|
|
||||||
|
// Group results by room for state collection
|
||||||
|
let room_ids_in_results: Vec<OwnedRoomId> = pdu_list
|
||||||
|
.iter()
|
||||||
|
.map(|pdu| pdu.room_id().to_owned())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let state: RoomStates = if criteria.include_state.is_some_and(is_true!()) {
|
||||||
|
let unique_rooms: BTreeSet<OwnedRoomId> =
|
||||||
|
room_ids_in_results.iter().cloned().collect();
|
||||||
|
futures::stream::iter(unique_rooms.iter())
|
||||||
|
.filter_map(async |room_id| {
|
||||||
|
procure_room_state(services, room_id)
|
||||||
|
.map_ok(|state| (room_id.clone(), state))
|
||||||
|
.await
|
||||||
|
.ok()
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
.await
|
||||||
|
} else {
|
||||||
|
BTreeMap::new()
|
||||||
|
};
|
||||||
|
|
||||||
|
let results: Vec<SearchResult> = pdu_list
|
||||||
|
.into_iter()
|
||||||
|
.stream()
|
||||||
|
.map(Event::into_format)
|
||||||
|
.map(|result| SearchResult {
|
||||||
|
rank: None,
|
||||||
|
result: Some(result),
|
||||||
|
context: EventContextResult {
|
||||||
|
profile_info: BTreeMap::new(), //TODO
|
||||||
|
events_after: Vec::new(), //TODO
|
||||||
|
events_before: Vec::new(), //TODO
|
||||||
|
start: None, //TODO
|
||||||
|
end: None, //TODO
|
||||||
|
},
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let highlights = criteria
|
||||||
|
.search_term
|
||||||
|
.split_terminator(|c: char| !c.is_alphanumeric())
|
||||||
|
.map(str::to_lowercase)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let next_batch = (results.len() >= limit)
|
||||||
|
.then_some(next_batch.saturating_add(results.len()))
|
||||||
|
.as_ref()
|
||||||
|
.map(ToString::to_string);
|
||||||
|
|
||||||
|
return Ok(ResultRoomEvents {
|
||||||
|
count: Some(total),
|
||||||
|
next_batch,
|
||||||
|
results,
|
||||||
|
state,
|
||||||
|
highlights,
|
||||||
|
groups: BTreeMap::new(), // TODO
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
let results: Vec<_> = accessible_rooms
|
||||||
|
.iter()
|
||||||
|
.stream()
|
||||||
|
.filter_map(async |room_id| {
|
||||||
|
let query = RoomQuery {
|
||||||
|
room_id,
|
||||||
|
user_id: Some(sender_user),
|
||||||
|
criteria,
|
||||||
|
skip: next_batch,
|
||||||
|
limit,
|
||||||
|
};
|
||||||
|
|
||||||
|
let (count, results) = services.search.search_pdus(&query).await.ok()?;
|
||||||
|
|
||||||
|
results
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.map(|results| (room_id.clone(), count, results))
|
||||||
|
.map(Some)
|
||||||
|
.await
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let total: UInt = results
|
||||||
|
.iter()
|
||||||
|
.fold(0, |a: usize, (_, count, _)| a.saturating_add(*count))
|
||||||
|
.try_into()?;
|
||||||
|
|
||||||
|
(results, total)
|
||||||
|
};
|
||||||
|
|
||||||
let state: RoomStates = results
|
let state: RoomStates = results
|
||||||
.iter()
|
.iter()
|
||||||
|
|||||||
@@ -116,6 +116,22 @@ pub fn check(config: &Config) -> Result {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if config.search_backend == super::SearchBackendConfig::OpenSearch
|
||||||
|
&& config.search_opensearch_url.is_none()
|
||||||
|
{
|
||||||
|
return Err!(Config(
|
||||||
|
"search_opensearch_url",
|
||||||
|
"OpenSearch URL must be set when search_backend is \"opensearch\""
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.search_opensearch_hybrid && config.search_opensearch_model_id.is_none() {
|
||||||
|
return Err!(Config(
|
||||||
|
"search_opensearch_model_id",
|
||||||
|
"Model ID must be set when search_opensearch_hybrid is enabled"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
// rocksdb does not allow max_log_files to be 0
|
// rocksdb does not allow max_log_files to be 0
|
||||||
if config.rocksdb_max_log_files == 0 {
|
if config.rocksdb_max_log_files == 0 {
|
||||||
return Err!(Config(
|
return Err!(Config(
|
||||||
|
|||||||
@@ -1100,6 +1100,85 @@ pub struct Config {
|
|||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub auto_deactivate_banned_room_attempts: bool,
|
pub auto_deactivate_banned_room_attempts: bool,
|
||||||
|
|
||||||
|
/// Search backend to use for full-text message search.
|
||||||
|
///
|
||||||
|
/// Available options: "rocksdb" (default) or "opensearch".
|
||||||
|
///
|
||||||
|
/// default: "rocksdb"
|
||||||
|
#[serde(default)]
|
||||||
|
pub search_backend: SearchBackendConfig,
|
||||||
|
|
||||||
|
/// URL of the OpenSearch instance. Required when search_backend is
|
||||||
|
/// "opensearch".
|
||||||
|
///
|
||||||
|
/// example: "http://localhost:9200"
|
||||||
|
pub search_opensearch_url: Option<Url>,
|
||||||
|
|
||||||
|
/// Name of the OpenSearch index for message search.
|
||||||
|
///
|
||||||
|
/// default: "tuwunel_messages"
|
||||||
|
#[serde(default = "default_search_opensearch_index")]
|
||||||
|
pub search_opensearch_index: String,
|
||||||
|
|
||||||
|
/// Authentication for OpenSearch in "user:pass" format.
|
||||||
|
///
|
||||||
|
/// display: sensitive
|
||||||
|
pub search_opensearch_auth: Option<String>,
|
||||||
|
|
||||||
|
/// Maximum number of documents to batch before flushing to OpenSearch.
|
||||||
|
///
|
||||||
|
/// default: 100
|
||||||
|
#[serde(default = "default_search_opensearch_batch_size")]
|
||||||
|
pub search_opensearch_batch_size: usize,
|
||||||
|
|
||||||
|
/// Maximum time in milliseconds to wait before flushing a partial batch
|
||||||
|
/// to OpenSearch.
|
||||||
|
///
|
||||||
|
/// default: 1000
|
||||||
|
#[serde(default = "default_search_opensearch_flush_interval_ms")]
|
||||||
|
pub search_opensearch_flush_interval_ms: u64,
|
||||||
|
|
||||||
|
/// Enable hybrid neural+BM25 search in OpenSearch. Requires an ML model
|
||||||
|
/// deployed in OpenSearch and an ingest pipeline that populates an
|
||||||
|
/// "embedding" field.
|
||||||
|
///
|
||||||
|
/// When enabled, tuwunel will:
|
||||||
|
/// - Create the index with a knn_vector "embedding" field
|
||||||
|
/// - Attach the ingest pipeline (search_opensearch_pipeline) to the index
|
||||||
|
/// - Use hybrid queries combining BM25 + neural kNN scoring
|
||||||
|
///
|
||||||
|
/// For a complete reference on configuring OpenSearch's ML plugin, model
|
||||||
|
/// registration, and ingest pipeline setup, see the test helpers in
|
||||||
|
/// `src/service/rooms/search/opensearch.rs` (the `ensure_neural_model`,
|
||||||
|
/// `ensure_ingest_pipeline`, etc. functions in the `tests` module).
|
||||||
|
///
|
||||||
|
/// See also: https://opensearch.org/docs/latest/search-plugins/neural-search/
|
||||||
|
///
|
||||||
|
/// default: false
|
||||||
|
#[serde(default)]
|
||||||
|
pub search_opensearch_hybrid: bool,
|
||||||
|
|
||||||
|
/// The model ID registered in OpenSearch for neural search. Required when
|
||||||
|
/// search_opensearch_hybrid is enabled.
|
||||||
|
///
|
||||||
|
/// example: "aKV84osBBHNT0StI3MBr"
|
||||||
|
pub search_opensearch_model_id: Option<String>,
|
||||||
|
|
||||||
|
/// Embedding dimension for the neural search model. Must match the output
|
||||||
|
/// dimension of the deployed model. Common values: 384
|
||||||
|
/// (all-MiniLM-L6-v2), 768 (msmarco-distilbert-base-tas-b).
|
||||||
|
///
|
||||||
|
/// default: 384
|
||||||
|
#[serde(default = "default_search_opensearch_embedding_dim")]
|
||||||
|
pub search_opensearch_embedding_dim: usize,
|
||||||
|
|
||||||
|
/// Name of the ingest pipeline that generates embeddings for the
|
||||||
|
/// "embedding" field. This pipeline must already exist in OpenSearch.
|
||||||
|
///
|
||||||
|
/// default: "tuwunel_embedding_pipeline"
|
||||||
|
#[serde(default = "default_search_opensearch_pipeline")]
|
||||||
|
pub search_opensearch_pipeline: String,
|
||||||
|
|
||||||
/// RocksDB log level. This is not the same as tuwunel's log level. This
|
/// RocksDB log level. This is not the same as tuwunel's log level. This
|
||||||
/// is the log level for the RocksDB engine/library which show up in your
|
/// is the log level for the RocksDB engine/library which show up in your
|
||||||
/// database folder/path as `LOG` files. tuwunel will log RocksDB errors
|
/// database folder/path as `LOG` files. tuwunel will log RocksDB errors
|
||||||
@@ -2304,6 +2383,14 @@ pub struct Config {
|
|||||||
catchall: BTreeMap<String, IgnoredAny>,
|
catchall: BTreeMap<String, IgnoredAny>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy, Debug, Default, Deserialize, PartialEq, Eq)]
|
||||||
|
#[serde(rename_all = "lowercase")]
|
||||||
|
pub enum SearchBackendConfig {
|
||||||
|
#[default]
|
||||||
|
RocksDb,
|
||||||
|
OpenSearch,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Deserialize, Default)]
|
#[derive(Clone, Debug, Deserialize, Default)]
|
||||||
#[config_example_generator(filename = "tuwunel-example.toml", section = "global.tls")]
|
#[config_example_generator(filename = "tuwunel-example.toml", section = "global.tls")]
|
||||||
pub struct TlsConfig {
|
pub struct TlsConfig {
|
||||||
@@ -3181,6 +3268,16 @@ impl Config {
|
|||||||
|
|
||||||
fn true_fn() -> bool { true }
|
fn true_fn() -> bool { true }
|
||||||
|
|
||||||
|
fn default_search_opensearch_index() -> String { "tuwunel_messages".to_owned() }
|
||||||
|
|
||||||
|
fn default_search_opensearch_batch_size() -> usize { 100 }
|
||||||
|
|
||||||
|
fn default_search_opensearch_flush_interval_ms() -> u64 { 1000 }
|
||||||
|
|
||||||
|
fn default_search_opensearch_embedding_dim() -> usize { 384 }
|
||||||
|
|
||||||
|
fn default_search_opensearch_pipeline() -> String { "tuwunel_embedding_pipeline".to_owned() }
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
fn default_server_name() -> OwnedServerName { ruma::owned_server_name!("localhost") }
|
fn default_server_name() -> OwnedServerName { ruma::owned_server_name!("localhost") }
|
||||||
|
|
||||||
|
|||||||
@@ -28,7 +28,12 @@ pub fn maximize_fd_limit() -> Result {
|
|||||||
#[cfg(not(unix))]
|
#[cfg(not(unix))]
|
||||||
pub fn maximize_fd_limit() -> Result { Ok(()) }
|
pub fn maximize_fd_limit() -> Result { Ok(()) }
|
||||||
|
|
||||||
#[cfg(unix)]
|
#[cfg(any(
|
||||||
|
linux_android,
|
||||||
|
netbsdlike,
|
||||||
|
target_os = "aix",
|
||||||
|
target_os = "freebsd",
|
||||||
|
))]
|
||||||
/// Some distributions ship with very low defaults for thread counts; similar to
|
/// Some distributions ship with very low defaults for thread counts; similar to
|
||||||
/// low default file descriptor limits. But unlike fd's, thread limit is rarely
|
/// low default file descriptor limits. But unlike fd's, thread limit is rarely
|
||||||
/// reached, though on large systems (32+ cores) shipping with defaults of
|
/// reached, though on large systems (32+ cores) shipping with defaults of
|
||||||
@@ -47,7 +52,12 @@ pub fn maximize_thread_limit() -> Result {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(not(unix))]
|
#[cfg(not(any(
|
||||||
|
linux_android,
|
||||||
|
netbsdlike,
|
||||||
|
target_os = "aix",
|
||||||
|
target_os = "freebsd",
|
||||||
|
)))]
|
||||||
pub fn maximize_thread_limit() -> Result { Ok(()) }
|
pub fn maximize_thread_limit() -> Result { Ok(()) }
|
||||||
|
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
use nix::sys::resource::Usage;
|
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
use nix::sys::resource::{UsageWho, getrusage};
|
use nix::sys::resource::{Usage, UsageWho, getrusage};
|
||||||
|
|
||||||
use crate::Result;
|
use crate::Result;
|
||||||
|
|
||||||
@@ -8,7 +7,7 @@ use crate::Result;
|
|||||||
pub fn usage() -> Result<Usage> { getrusage(UsageWho::RUSAGE_SELF).map_err(Into::into) }
|
pub fn usage() -> Result<Usage> { getrusage(UsageWho::RUSAGE_SELF).map_err(Into::into) }
|
||||||
|
|
||||||
#[cfg(not(unix))]
|
#[cfg(not(unix))]
|
||||||
pub fn usage() -> Result<Usage> { Ok(Usage::default()) }
|
pub fn usage() -> Result<()> { Ok(()) }
|
||||||
|
|
||||||
#[cfg(any(
|
#[cfg(any(
|
||||||
target_os = "linux",
|
target_os = "linux",
|
||||||
@@ -17,9 +16,15 @@ pub fn usage() -> Result<Usage> { Ok(Usage::default()) }
|
|||||||
))]
|
))]
|
||||||
pub fn thread_usage() -> Result<Usage> { getrusage(UsageWho::RUSAGE_THREAD).map_err(Into::into) }
|
pub fn thread_usage() -> Result<Usage> { getrusage(UsageWho::RUSAGE_THREAD).map_err(Into::into) }
|
||||||
|
|
||||||
#[cfg(not(any(
|
#[cfg(all(
|
||||||
target_os = "linux",
|
unix,
|
||||||
target_os = "freebsd",
|
not(any(
|
||||||
target_os = "openbsd"
|
target_os = "linux",
|
||||||
)))]
|
target_os = "freebsd",
|
||||||
pub fn thread_usage() -> Result<Usage> { Ok(Usage::default()) }
|
target_os = "openbsd"
|
||||||
|
))
|
||||||
|
))]
|
||||||
|
pub fn thread_usage() -> Result<Usage> { getrusage(UsageWho::RUSAGE_SELF).map_err(Into::into) }
|
||||||
|
|
||||||
|
#[cfg(not(unix))]
|
||||||
|
pub fn thread_usage() -> Result<()> { Ok(()) }
|
||||||
|
|||||||
50
src/service/rooms/search/backend.rs
Normal file
50
src/service/rooms/search/backend.rs
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
use async_trait::async_trait;
|
||||||
|
use ruma::RoomId;
|
||||||
|
use tuwunel_core::Result;
|
||||||
|
|
||||||
|
use crate::rooms::{short::ShortRoomId, timeline::RawPduId};
|
||||||
|
|
||||||
|
/// A search hit returned by a backend.
|
||||||
|
pub(super) struct SearchHit {
|
||||||
|
pub(super) pdu_id: RawPduId,
|
||||||
|
#[expect(dead_code)]
|
||||||
|
pub(super) score: Option<f64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Query parameters passed to the backend for searching.
|
||||||
|
pub(super) struct BackendQuery<'a> {
|
||||||
|
pub(super) search_term: &'a str,
|
||||||
|
pub(super) room_ids: Option<&'a [&'a RoomId]>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Trait abstracting the search index backend.
|
||||||
|
#[async_trait]
|
||||||
|
pub(super) trait SearchBackend: Send + Sync {
|
||||||
|
async fn index_pdu(
|
||||||
|
&self,
|
||||||
|
shortroomid: ShortRoomId,
|
||||||
|
pdu_id: &RawPduId,
|
||||||
|
room_id: &RoomId,
|
||||||
|
message_body: &str,
|
||||||
|
) -> Result;
|
||||||
|
|
||||||
|
async fn deindex_pdu(
|
||||||
|
&self,
|
||||||
|
shortroomid: ShortRoomId,
|
||||||
|
pdu_id: &RawPduId,
|
||||||
|
room_id: &RoomId,
|
||||||
|
message_body: &str,
|
||||||
|
) -> Result;
|
||||||
|
|
||||||
|
async fn search_pdu_ids(
|
||||||
|
&self,
|
||||||
|
query: &BackendQuery<'_>,
|
||||||
|
shortroomid: Option<ShortRoomId>,
|
||||||
|
limit: usize,
|
||||||
|
skip: usize,
|
||||||
|
) -> Result<(usize, Vec<SearchHit>)>;
|
||||||
|
|
||||||
|
async fn delete_room(&self, room_id: &RoomId, shortroomid: ShortRoomId) -> Result;
|
||||||
|
|
||||||
|
fn supports_cross_room_search(&self) -> bool { false }
|
||||||
|
}
|
||||||
@@ -1,34 +1,33 @@
|
|||||||
|
mod backend;
|
||||||
|
mod opensearch;
|
||||||
|
mod rocksdb;
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
use futures::{Stream, StreamExt};
|
use futures::{Stream, StreamExt};
|
||||||
use ruma::{RoomId, UserId, api::client::search::search_events::v3::Criteria};
|
use ruma::{RoomId, UserId, api::client::search::search_events::v3::Criteria};
|
||||||
|
use tokio::sync::mpsc;
|
||||||
use tuwunel_core::{
|
use tuwunel_core::{
|
||||||
PduCount, Result,
|
Result, implement,
|
||||||
arrayvec::ArrayVec,
|
|
||||||
implement,
|
|
||||||
matrix::event::{Event, Matches},
|
matrix::event::{Event, Matches},
|
||||||
trace,
|
utils::{IterStream, ReadyExt, stream::WidebandExt},
|
||||||
utils::{
|
|
||||||
ArrayVecExt, IterStream, ReadyExt, set,
|
|
||||||
stream::{TryIgnore, WidebandExt},
|
|
||||||
},
|
|
||||||
};
|
};
|
||||||
use tuwunel_database::{Interfix, Map, keyval::Val};
|
|
||||||
|
|
||||||
use crate::rooms::{
|
use self::{
|
||||||
short::ShortRoomId,
|
backend::{BackendQuery, SearchBackend, SearchHit},
|
||||||
timeline::{PduId, RawPduId},
|
opensearch::OpenSearchBackend,
|
||||||
|
rocksdb::RocksDbBackend,
|
||||||
};
|
};
|
||||||
|
use crate::rooms::{short::ShortRoomId, timeline::RawPduId};
|
||||||
|
|
||||||
pub struct Service {
|
pub struct Service {
|
||||||
db: Data,
|
backend: Box<dyn SearchBackend>,
|
||||||
|
bulk_rx: Option<tokio::sync::Mutex<mpsc::Receiver<opensearch::BulkAction>>>,
|
||||||
|
opensearch: Option<Arc<OpenSearchBackend>>,
|
||||||
services: Arc<crate::services::OnceServices>,
|
services: Arc<crate::services::OnceServices>,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct Data {
|
|
||||||
tokenids: Arc<Map>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct RoomQuery<'a> {
|
pub struct RoomQuery<'a> {
|
||||||
pub room_id: &'a RoomId,
|
pub room_id: &'a RoomId,
|
||||||
@@ -38,52 +37,133 @@ pub struct RoomQuery<'a> {
|
|||||||
pub skip: usize,
|
pub skip: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
type TokenId = ArrayVec<u8, TOKEN_ID_MAX_LEN>;
|
#[async_trait]
|
||||||
|
|
||||||
const TOKEN_ID_MAX_LEN: usize =
|
|
||||||
size_of::<ShortRoomId>() + WORD_MAX_LEN + 1 + size_of::<RawPduId>();
|
|
||||||
const WORD_MAX_LEN: usize = 50;
|
|
||||||
|
|
||||||
impl crate::Service for Service {
|
impl crate::Service for Service {
|
||||||
fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
|
fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
|
||||||
|
let config = &args.server.config;
|
||||||
|
let (backend, bulk_rx, opensearch): (Box<dyn SearchBackend>, _, _) = match config
|
||||||
|
.search_backend
|
||||||
|
{
|
||||||
|
| tuwunel_core::config::SearchBackendConfig::RocksDb => {
|
||||||
|
let b = RocksDbBackend::new(args.db["tokenids"].clone());
|
||||||
|
(Box::new(b), None, None)
|
||||||
|
},
|
||||||
|
| tuwunel_core::config::SearchBackendConfig::OpenSearch => {
|
||||||
|
let (b, rx) = OpenSearchBackend::new(config)?;
|
||||||
|
let b = Arc::new(b);
|
||||||
|
let backend: Box<dyn SearchBackend> = Box::new(OpenSearchWorkerRef(b.clone()));
|
||||||
|
(backend, Some(tokio::sync::Mutex::new(rx)), Some(b))
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
Ok(Arc::new(Self {
|
Ok(Arc::new(Self {
|
||||||
db: Data { tokenids: args.db["tokenids"].clone() },
|
backend,
|
||||||
|
bulk_rx,
|
||||||
|
opensearch,
|
||||||
services: args.services.clone(),
|
services: args.services.clone(),
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
|
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
|
||||||
|
|
||||||
|
async fn worker(self: Arc<Self>) -> Result {
|
||||||
|
if let (Some(os), Some(rx_mutex)) = (self.opensearch.as_ref(), self.bulk_rx.as_ref()) {
|
||||||
|
os.ensure_index().await?;
|
||||||
|
|
||||||
|
let services = &**self.services;
|
||||||
|
let config = &services.server.config;
|
||||||
|
let batch_size = config.search_opensearch_batch_size;
|
||||||
|
let flush_interval =
|
||||||
|
std::time::Duration::from_millis(config.search_opensearch_flush_interval_ms);
|
||||||
|
|
||||||
|
let mut rx = rx_mutex.lock().await;
|
||||||
|
os.process_bulk(&mut rx, batch_size, flush_interval)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wrapper so we can put the `Arc<OpenSearchBackend>` behind the trait.
|
||||||
|
struct OpenSearchWorkerRef(Arc<OpenSearchBackend>);
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl SearchBackend for OpenSearchWorkerRef {
|
||||||
|
async fn index_pdu(
|
||||||
|
&self,
|
||||||
|
shortroomid: ShortRoomId,
|
||||||
|
pdu_id: &RawPduId,
|
||||||
|
room_id: &RoomId,
|
||||||
|
message_body: &str,
|
||||||
|
) -> Result {
|
||||||
|
self.0
|
||||||
|
.index_pdu(shortroomid, pdu_id, room_id, message_body)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn deindex_pdu(
|
||||||
|
&self,
|
||||||
|
shortroomid: ShortRoomId,
|
||||||
|
pdu_id: &RawPduId,
|
||||||
|
room_id: &RoomId,
|
||||||
|
message_body: &str,
|
||||||
|
) -> Result {
|
||||||
|
self.0
|
||||||
|
.deindex_pdu(shortroomid, pdu_id, room_id, message_body)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn search_pdu_ids(
|
||||||
|
&self,
|
||||||
|
query: &BackendQuery<'_>,
|
||||||
|
shortroomid: Option<ShortRoomId>,
|
||||||
|
limit: usize,
|
||||||
|
skip: usize,
|
||||||
|
) -> Result<(usize, Vec<SearchHit>)> {
|
||||||
|
self.0
|
||||||
|
.search_pdu_ids(query, shortroomid, limit, skip)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn delete_room(&self, room_id: &RoomId, shortroomid: ShortRoomId) -> Result {
|
||||||
|
self.0.delete_room(room_id, shortroomid).await
|
||||||
|
}
|
||||||
|
|
||||||
|
fn supports_cross_room_search(&self) -> bool { self.0.supports_cross_room_search() }
|
||||||
}
|
}
|
||||||
|
|
||||||
#[implement(Service)]
|
#[implement(Service)]
|
||||||
pub fn index_pdu(&self, shortroomid: ShortRoomId, pdu_id: &RawPduId, message_body: &str) {
|
pub async fn index_pdu(
|
||||||
let batch = tokenize(message_body)
|
&self,
|
||||||
.map(|word| {
|
shortroomid: ShortRoomId,
|
||||||
let mut key = shortroomid.to_be_bytes().to_vec();
|
pdu_id: &RawPduId,
|
||||||
key.extend_from_slice(word.as_bytes());
|
room_id: &RoomId,
|
||||||
key.push(0xFF);
|
message_body: &str,
|
||||||
key.extend_from_slice(pdu_id.as_ref()); // TODO: currently we save the room id a second time here
|
) {
|
||||||
key
|
if let Err(e) = self
|
||||||
})
|
.backend
|
||||||
.collect::<Vec<_>>();
|
.index_pdu(shortroomid, pdu_id, room_id, message_body)
|
||||||
|
.await
|
||||||
self.db
|
{
|
||||||
.tokenids
|
tuwunel_core::warn!("Failed to index PDU: {e}");
|
||||||
.insert_batch(batch.iter().map(|k| (k.as_slice(), &[])));
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[implement(Service)]
|
#[implement(Service)]
|
||||||
pub fn deindex_pdu(&self, shortroomid: ShortRoomId, pdu_id: &RawPduId, message_body: &str) {
|
pub async fn deindex_pdu(
|
||||||
let batch = tokenize(message_body).map(|word| {
|
&self,
|
||||||
let mut key = shortroomid.to_be_bytes().to_vec();
|
shortroomid: ShortRoomId,
|
||||||
key.extend_from_slice(word.as_bytes());
|
pdu_id: &RawPduId,
|
||||||
key.push(0xFF);
|
room_id: &RoomId,
|
||||||
key.extend_from_slice(pdu_id.as_ref()); // TODO: currently we save the room id a second time here
|
message_body: &str,
|
||||||
key
|
) {
|
||||||
});
|
if let Err(e) = self
|
||||||
|
.backend
|
||||||
for token in batch {
|
.deindex_pdu(shortroomid, pdu_id, room_id, message_body)
|
||||||
self.db.tokenids.remove(&token);
|
.await
|
||||||
|
{
|
||||||
|
tuwunel_core::warn!("Failed to deindex PDU: {e}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -92,12 +172,31 @@ pub async fn search_pdus<'a>(
|
|||||||
&'a self,
|
&'a self,
|
||||||
query: &'a RoomQuery<'a>,
|
query: &'a RoomQuery<'a>,
|
||||||
) -> Result<(usize, impl Stream<Item = impl Event + use<>> + Send + '_)> {
|
) -> Result<(usize, impl Stream<Item = impl Event + use<>> + Send + '_)> {
|
||||||
let pdu_ids: Vec<_> = self.search_pdu_ids(query).await?.collect().await;
|
let shortroomid = self
|
||||||
|
.services
|
||||||
|
.short
|
||||||
|
.get_shortroomid(query.room_id)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let backend_query = BackendQuery {
|
||||||
|
search_term: &query.criteria.search_term,
|
||||||
|
room_ids: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let (count, hits) = self
|
||||||
|
.backend
|
||||||
|
.search_pdu_ids(
|
||||||
|
&backend_query,
|
||||||
|
Some(shortroomid),
|
||||||
|
query.limit.saturating_add(query.skip),
|
||||||
|
0,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
let filter = &query.criteria.filter;
|
let filter = &query.criteria.filter;
|
||||||
let count = pdu_ids.len();
|
let pdus = hits
|
||||||
let pdus = pdu_ids
|
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
.map(|h| h.pdu_id)
|
||||||
.stream()
|
.stream()
|
||||||
.wide_filter_map(async |result_pdu_id: RawPduId| {
|
.wide_filter_map(async |result_pdu_id: RawPduId| {
|
||||||
self.services
|
self.services
|
||||||
@@ -121,123 +220,64 @@ pub async fn search_pdus<'a>(
|
|||||||
Ok((count, pdus))
|
Ok((count, pdus))
|
||||||
}
|
}
|
||||||
|
|
||||||
// result is modeled as a stream such that callers don't have to be refactored
|
|
||||||
// though an additional async/wrap still exists for now
|
|
||||||
#[implement(Service)]
|
#[implement(Service)]
|
||||||
pub async fn search_pdu_ids(
|
pub async fn search_pdus_multi_room<'a>(
|
||||||
&self,
|
&'a self,
|
||||||
query: &RoomQuery<'_>,
|
room_ids: &'a [&'a RoomId],
|
||||||
) -> Result<impl Stream<Item = RawPduId> + Send + '_ + use<'_>> {
|
sender_user: &'a UserId,
|
||||||
let shortroomid = self
|
criteria: &'a Criteria,
|
||||||
.services
|
limit: usize,
|
||||||
.short
|
skip: usize,
|
||||||
.get_shortroomid(query.room_id)
|
) -> Result<(usize, impl Stream<Item = impl Event + use<>> + Send + '_)> {
|
||||||
|
let backend_query = BackendQuery {
|
||||||
|
search_term: &criteria.search_term,
|
||||||
|
room_ids: Some(room_ids),
|
||||||
|
};
|
||||||
|
|
||||||
|
let (count, hits) = self
|
||||||
|
.backend
|
||||||
|
.search_pdu_ids(&backend_query, None, limit.saturating_add(skip), 0)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let pdu_ids = self
|
let filter = &criteria.filter;
|
||||||
.search_pdu_ids_query_room(query, shortroomid)
|
let pdus = hits
|
||||||
.await;
|
.into_iter()
|
||||||
|
.map(|h| h.pdu_id)
|
||||||
let iters = pdu_ids.into_iter().map(IntoIterator::into_iter);
|
|
||||||
|
|
||||||
Ok(set::intersection(iters).stream())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[implement(Service)]
|
|
||||||
async fn search_pdu_ids_query_room(
|
|
||||||
&self,
|
|
||||||
query: &RoomQuery<'_>,
|
|
||||||
shortroomid: ShortRoomId,
|
|
||||||
) -> Vec<Vec<RawPduId>> {
|
|
||||||
tokenize(&query.criteria.search_term)
|
|
||||||
.stream()
|
.stream()
|
||||||
.wide_then(async |word| {
|
.wide_filter_map(async |result_pdu_id: RawPduId| {
|
||||||
self.search_pdu_ids_query_words(shortroomid, &word)
|
self.services
|
||||||
.collect::<Vec<_>>()
|
.timeline
|
||||||
|
.get_pdu_from_id(&result_pdu_id)
|
||||||
.await
|
.await
|
||||||
|
.ok()
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>()
|
.ready_filter(|pdu| !pdu.is_redacted())
|
||||||
.await
|
.ready_filter(move |pdu| filter.matches(pdu))
|
||||||
}
|
.wide_filter_map(async move |pdu| {
|
||||||
|
self.services
|
||||||
/// Iterate over PduId's containing a word
|
.state_accessor
|
||||||
#[implement(Service)]
|
.user_can_see_event(sender_user, pdu.room_id(), pdu.event_id())
|
||||||
fn search_pdu_ids_query_words<'a>(
|
.await
|
||||||
&'a self,
|
.then_some(pdu)
|
||||||
shortroomid: ShortRoomId,
|
|
||||||
word: &'a str,
|
|
||||||
) -> impl Stream<Item = RawPduId> + Send + '_ {
|
|
||||||
self.search_pdu_ids_query_word(shortroomid, word)
|
|
||||||
.map(move |key| -> RawPduId {
|
|
||||||
let key = &key[prefix_len(word)..];
|
|
||||||
key.into()
|
|
||||||
})
|
})
|
||||||
|
.skip(skip)
|
||||||
|
.take(limit);
|
||||||
|
|
||||||
|
Ok((count, pdus))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Iterate over raw database results for a word
|
|
||||||
#[implement(Service)]
|
#[implement(Service)]
|
||||||
fn search_pdu_ids_query_word(
|
pub fn supports_cross_room_search(&self) -> bool { self.backend.supports_cross_room_search() }
|
||||||
&self,
|
|
||||||
shortroomid: ShortRoomId,
|
|
||||||
word: &str,
|
|
||||||
) -> impl Stream<Item = Val<'_>> + Send + '_ + use<'_> {
|
|
||||||
// rustc says const'ing this not yet stable
|
|
||||||
let end_id: RawPduId = PduId { shortroomid, count: PduCount::max() }.into();
|
|
||||||
|
|
||||||
// Newest pdus first
|
|
||||||
let end = make_tokenid(shortroomid, word, &end_id);
|
|
||||||
let prefix = make_prefix(shortroomid, word);
|
|
||||||
self.db
|
|
||||||
.tokenids
|
|
||||||
.rev_raw_keys_from(&end)
|
|
||||||
.ignore_err()
|
|
||||||
.ready_take_while(move |key| key.starts_with(&prefix))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[implement(Service)]
|
#[implement(Service)]
|
||||||
pub async fn delete_all_search_tokenids_for_room(&self, room_id: &RoomId) -> Result {
|
pub async fn delete_all_search_tokenids_for_room(&self, room_id: &RoomId) -> Result {
|
||||||
let prefix = (room_id, Interfix);
|
let shortroomid = self
|
||||||
|
.services
|
||||||
|
.short
|
||||||
|
.get_shortroomid(room_id)
|
||||||
|
.await?;
|
||||||
|
|
||||||
self.db
|
self.backend
|
||||||
.tokenids
|
.delete_room(room_id, shortroomid)
|
||||||
.keys_prefix_raw(&prefix)
|
.await
|
||||||
.ignore_err()
|
|
||||||
.ready_for_each(|key| {
|
|
||||||
trace!("Removing key: {key:?}");
|
|
||||||
self.db.tokenids.remove(key);
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Splits a string into tokens used as keys in the search inverted index
|
|
||||||
///
|
|
||||||
/// This may be used to tokenize both message bodies (for indexing) or search
|
|
||||||
/// queries (for querying).
|
|
||||||
fn tokenize(body: &str) -> impl Iterator<Item = String> + Send + '_ {
|
|
||||||
body.split_terminator(|c: char| !c.is_alphanumeric())
|
|
||||||
.filter(|s| !s.is_empty())
|
|
||||||
.filter(|word| word.len() <= WORD_MAX_LEN)
|
|
||||||
.map(str::to_lowercase)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn make_tokenid(shortroomid: ShortRoomId, word: &str, pdu_id: &RawPduId) -> TokenId {
|
|
||||||
let mut key = make_prefix(shortroomid, word);
|
|
||||||
key.extend_from_slice(pdu_id.as_ref());
|
|
||||||
key
|
|
||||||
}
|
|
||||||
|
|
||||||
fn make_prefix(shortroomid: ShortRoomId, word: &str) -> TokenId {
|
|
||||||
let mut key = TokenId::new();
|
|
||||||
key.extend_from_slice(&shortroomid.to_be_bytes());
|
|
||||||
key.extend_from_slice(word.as_bytes());
|
|
||||||
key.push(tuwunel_database::SEP);
|
|
||||||
key
|
|
||||||
}
|
|
||||||
|
|
||||||
fn prefix_len(word: &str) -> usize {
|
|
||||||
size_of::<ShortRoomId>()
|
|
||||||
.saturating_add(word.len())
|
|
||||||
.saturating_add(1)
|
|
||||||
}
|
}
|
||||||
|
|||||||
1442
src/service/rooms/search/opensearch.rs
Normal file
1442
src/service/rooms/search/opensearch.rs
Normal file
File diff suppressed because it is too large
Load Diff
193
src/service/rooms/search/rocksdb.rs
Normal file
193
src/service/rooms/search/rocksdb.rs
Normal file
@@ -0,0 +1,193 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use futures::StreamExt;
|
||||||
|
use ruma::RoomId;
|
||||||
|
use tuwunel_core::{
|
||||||
|
PduCount, Result,
|
||||||
|
arrayvec::ArrayVec,
|
||||||
|
utils::{ArrayVecExt, ReadyExt, set, stream::TryIgnore},
|
||||||
|
};
|
||||||
|
use tuwunel_database::{Interfix, Map, keyval::Val};
|
||||||
|
|
||||||
|
use super::backend::{BackendQuery, SearchBackend, SearchHit};
|
||||||
|
use crate::rooms::{
|
||||||
|
short::ShortRoomId,
|
||||||
|
timeline::{PduId, RawPduId},
|
||||||
|
};
|
||||||
|
|
||||||
|
pub(super) struct RocksDbBackend {
|
||||||
|
tokenids: Arc<Map>,
|
||||||
|
}
|
||||||
|
|
||||||
|
type TokenId = ArrayVec<u8, TOKEN_ID_MAX_LEN>;
|
||||||
|
|
||||||
|
const TOKEN_ID_MAX_LEN: usize =
|
||||||
|
size_of::<ShortRoomId>() + WORD_MAX_LEN + 1 + size_of::<RawPduId>();
|
||||||
|
const WORD_MAX_LEN: usize = 50;
|
||||||
|
|
||||||
|
impl RocksDbBackend {
|
||||||
|
pub(super) fn new(tokenids: Arc<Map>) -> Self { Self { tokenids } }
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl SearchBackend for RocksDbBackend {
|
||||||
|
async fn index_pdu(
|
||||||
|
&self,
|
||||||
|
shortroomid: ShortRoomId,
|
||||||
|
pdu_id: &RawPduId,
|
||||||
|
_room_id: &RoomId,
|
||||||
|
message_body: &str,
|
||||||
|
) -> Result {
|
||||||
|
let batch = tokenize(message_body)
|
||||||
|
.map(|word| {
|
||||||
|
let mut key = shortroomid.to_be_bytes().to_vec();
|
||||||
|
key.extend_from_slice(word.as_bytes());
|
||||||
|
key.push(0xFF);
|
||||||
|
key.extend_from_slice(pdu_id.as_ref());
|
||||||
|
key
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
self.tokenids
|
||||||
|
.insert_batch(batch.iter().map(|k| (k.as_slice(), &[])));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn deindex_pdu(
|
||||||
|
&self,
|
||||||
|
shortroomid: ShortRoomId,
|
||||||
|
pdu_id: &RawPduId,
|
||||||
|
_room_id: &RoomId,
|
||||||
|
message_body: &str,
|
||||||
|
) -> Result {
|
||||||
|
let batch = tokenize(message_body).map(|word| {
|
||||||
|
let mut key = shortroomid.to_be_bytes().to_vec();
|
||||||
|
key.extend_from_slice(word.as_bytes());
|
||||||
|
key.push(0xFF);
|
||||||
|
key.extend_from_slice(pdu_id.as_ref());
|
||||||
|
key
|
||||||
|
});
|
||||||
|
|
||||||
|
for token in batch {
|
||||||
|
self.tokenids.remove(&token);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn search_pdu_ids(
|
||||||
|
&self,
|
||||||
|
query: &BackendQuery<'_>,
|
||||||
|
shortroomid: Option<ShortRoomId>,
|
||||||
|
_limit: usize,
|
||||||
|
_skip: usize,
|
||||||
|
) -> Result<(usize, Vec<SearchHit>)> {
|
||||||
|
let shortroomid = shortroomid.ok_or_else(|| {
|
||||||
|
tuwunel_core::err!(Request(InvalidParam(
|
||||||
|
"RocksDB backend requires a shortroomid for search"
|
||||||
|
)))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let pdu_ids =
|
||||||
|
search_pdu_ids_query_room(&self.tokenids, shortroomid, query.search_term).await;
|
||||||
|
|
||||||
|
let iters = pdu_ids.into_iter().map(IntoIterator::into_iter);
|
||||||
|
let results: Vec<_> = set::intersection(iters)
|
||||||
|
.map(|pdu_id| SearchHit { pdu_id, score: None })
|
||||||
|
.collect();
|
||||||
|
let count = results.len();
|
||||||
|
|
||||||
|
Ok((count, results))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn delete_room(&self, room_id: &RoomId, _shortroomid: ShortRoomId) -> Result {
|
||||||
|
let prefix = (room_id, Interfix);
|
||||||
|
|
||||||
|
self.tokenids
|
||||||
|
.keys_prefix_raw(&prefix)
|
||||||
|
.ignore_err()
|
||||||
|
.ready_for_each(|key| {
|
||||||
|
self.tokenids.remove(key);
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn search_pdu_ids_query_room(
|
||||||
|
tokenids: &Arc<Map>,
|
||||||
|
shortroomid: ShortRoomId,
|
||||||
|
search_term: &str,
|
||||||
|
) -> Vec<Vec<RawPduId>> {
|
||||||
|
tokenize(search_term)
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.into_iter()
|
||||||
|
.map(|word| {
|
||||||
|
let tokenids = tokenids.clone();
|
||||||
|
async move {
|
||||||
|
search_pdu_ids_query_words(&tokenids, shortroomid, &word)
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect::<futures::stream::FuturesUnordered<_>>()
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
fn search_pdu_ids_query_words<'a>(
|
||||||
|
tokenids: &'a Arc<Map>,
|
||||||
|
shortroomid: ShortRoomId,
|
||||||
|
word: &'a str,
|
||||||
|
) -> impl futures::Stream<Item = RawPduId> + Send + 'a {
|
||||||
|
search_pdu_ids_query_word(tokenids, shortroomid, word).map(move |key| -> RawPduId {
|
||||||
|
let key = &key[prefix_len(word)..];
|
||||||
|
key.into()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn search_pdu_ids_query_word<'a>(
|
||||||
|
tokenids: &'a Arc<Map>,
|
||||||
|
shortroomid: ShortRoomId,
|
||||||
|
word: &'a str,
|
||||||
|
) -> impl futures::Stream<Item = Val<'a>> + Send + 'a {
|
||||||
|
let end_id: RawPduId = PduId { shortroomid, count: PduCount::max() }.into();
|
||||||
|
|
||||||
|
let end = make_tokenid(shortroomid, word, &end_id);
|
||||||
|
let prefix = make_prefix(shortroomid, word);
|
||||||
|
tokenids
|
||||||
|
.rev_raw_keys_from(&end)
|
||||||
|
.ignore_err()
|
||||||
|
.ready_take_while(move |key| key.starts_with(&prefix))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Splits a string into tokens used as keys in the search inverted index
|
||||||
|
pub(super) fn tokenize(body: &str) -> impl Iterator<Item = String> + Send + '_ {
|
||||||
|
body.split_terminator(|c: char| !c.is_alphanumeric())
|
||||||
|
.filter(|s| !s.is_empty())
|
||||||
|
.filter(|word| word.len() <= WORD_MAX_LEN)
|
||||||
|
.map(str::to_lowercase)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn make_tokenid(shortroomid: ShortRoomId, word: &str, pdu_id: &RawPduId) -> TokenId {
|
||||||
|
let mut key = make_prefix(shortroomid, word);
|
||||||
|
key.extend_from_slice(pdu_id.as_ref());
|
||||||
|
key
|
||||||
|
}
|
||||||
|
|
||||||
|
fn make_prefix(shortroomid: ShortRoomId, word: &str) -> TokenId {
|
||||||
|
let mut key = TokenId::new();
|
||||||
|
key.extend_from_slice(&shortroomid.to_be_bytes());
|
||||||
|
key.extend_from_slice(word.as_bytes());
|
||||||
|
key.push(tuwunel_database::SEP);
|
||||||
|
key
|
||||||
|
}
|
||||||
|
|
||||||
|
fn prefix_len(word: &str) -> usize {
|
||||||
|
size_of::<ShortRoomId>()
|
||||||
|
.saturating_add(word.len())
|
||||||
|
.saturating_add(1)
|
||||||
|
}
|
||||||
@@ -281,7 +281,8 @@ async fn append_pdu_effects(
|
|||||||
if let Some(body) = content.body {
|
if let Some(body) = content.body {
|
||||||
self.services
|
self.services
|
||||||
.search
|
.search
|
||||||
.index_pdu(shortroomid, &pdu_id, &body);
|
.index_pdu(shortroomid, &pdu_id, pdu.room_id(), &body)
|
||||||
|
.await;
|
||||||
|
|
||||||
if self
|
if self
|
||||||
.services
|
.services
|
||||||
|
|||||||
@@ -240,7 +240,8 @@ pub async fn backfill_pdu(
|
|||||||
if let Some(body) = content.body {
|
if let Some(body) = content.body {
|
||||||
self.services
|
self.services
|
||||||
.search
|
.search
|
||||||
.index_pdu(shortroomid, &pdu_id, &body);
|
.index_pdu(shortroomid, &pdu_id, pdu.room_id(), &body)
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
drop(mutex_lock);
|
drop(mutex_lock);
|
||||||
|
|||||||
@@ -40,14 +40,15 @@ pub async fn redact_pdu<Pdu: Event + Send + Sync>(
|
|||||||
.get("body")
|
.get("body")
|
||||||
.and_then(|body| body.as_str());
|
.and_then(|body| body.as_str());
|
||||||
|
|
||||||
|
let room_id = RoomId::parse(pdu["room_id"].as_str().unwrap()).unwrap();
|
||||||
|
|
||||||
if let Some(body) = body {
|
if let Some(body) = body {
|
||||||
self.services
|
self.services
|
||||||
.search
|
.search
|
||||||
.deindex_pdu(shortroomid, &pdu_id, body);
|
.deindex_pdu(shortroomid, &pdu_id, room_id, body)
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
let room_id = RoomId::parse(pdu["room_id"].as_str().unwrap()).unwrap();
|
|
||||||
|
|
||||||
let room_version_id = self
|
let room_version_id = self
|
||||||
.services
|
.services
|
||||||
.state
|
.state
|
||||||
|
|||||||
@@ -907,6 +907,72 @@
|
|||||||
#
|
#
|
||||||
#auto_deactivate_banned_room_attempts = false
|
#auto_deactivate_banned_room_attempts = false
|
||||||
|
|
||||||
|
# Search backend to use for full-text message search.
|
||||||
|
#
|
||||||
|
# Available options: "rocksdb" (default) or "opensearch".
|
||||||
|
#
|
||||||
|
#search_backend = "rocksdb"
|
||||||
|
|
||||||
|
# URL of the OpenSearch instance. Required when search_backend is
|
||||||
|
# "opensearch".
|
||||||
|
#
|
||||||
|
# example: "http://localhost:9200"
|
||||||
|
#
|
||||||
|
#search_opensearch_url =
|
||||||
|
|
||||||
|
# Name of the OpenSearch index for message search.
|
||||||
|
#
|
||||||
|
#search_opensearch_index = "tuwunel_messages"
|
||||||
|
|
||||||
|
# Authentication for OpenSearch in "user:pass" format.
|
||||||
|
#
|
||||||
|
#search_opensearch_auth =
|
||||||
|
|
||||||
|
# Maximum number of documents to batch before flushing to OpenSearch.
|
||||||
|
#
|
||||||
|
#search_opensearch_batch_size = 100
|
||||||
|
|
||||||
|
# Maximum time in milliseconds to wait before flushing a partial batch
|
||||||
|
# to OpenSearch.
|
||||||
|
#
|
||||||
|
#search_opensearch_flush_interval_ms = 1000
|
||||||
|
|
||||||
|
# Enable hybrid neural+BM25 search in OpenSearch. Requires an ML model
|
||||||
|
# deployed in OpenSearch and an ingest pipeline that populates an
|
||||||
|
# "embedding" field.
|
||||||
|
#
|
||||||
|
# When enabled, tuwunel will:
|
||||||
|
# - Create the index with a knn_vector "embedding" field
|
||||||
|
# - Attach the ingest pipeline (search_opensearch_pipeline) to the index
|
||||||
|
# - Use hybrid queries combining BM25 + neural kNN scoring
|
||||||
|
#
|
||||||
|
# For a complete reference on configuring OpenSearch's ML plugin, model
|
||||||
|
# registration, and ingest pipeline setup, see the test helpers in
|
||||||
|
# `src/service/rooms/search/opensearch.rs` (the `ensure_neural_model`,
|
||||||
|
# `ensure_ingest_pipeline`, etc. functions in the `tests` module).
|
||||||
|
#
|
||||||
|
# See also: https://opensearch.org/docs/latest/search-plugins/neural-search/
|
||||||
|
#
|
||||||
|
#search_opensearch_hybrid = false
|
||||||
|
|
||||||
|
# The model ID registered in OpenSearch for neural search. Required when
|
||||||
|
# search_opensearch_hybrid is enabled.
|
||||||
|
#
|
||||||
|
# example: "aKV84osBBHNT0StI3MBr"
|
||||||
|
#
|
||||||
|
#search_opensearch_model_id =
|
||||||
|
|
||||||
|
# Embedding dimension for the neural search model. Must match the output
|
||||||
|
# dimension of the deployed model. Common values: 384
|
||||||
|
# (all-MiniLM-L6-v2), 768 (msmarco-distilbert-base-tas-b).
|
||||||
|
#
|
||||||
|
#search_opensearch_embedding_dim = 384
|
||||||
|
|
||||||
|
# Name of the ingest pipeline that generates embeddings for the
|
||||||
|
# "embedding" field. This pipeline must already exist in OpenSearch.
|
||||||
|
#
|
||||||
|
#search_opensearch_pipeline = "tuwunel_embedding_pipeline"
|
||||||
|
|
||||||
# RocksDB log level. This is not the same as tuwunel's log level. This
|
# RocksDB log level. This is not the same as tuwunel's log level. This
|
||||||
# is the log level for the RocksDB engine/library which show up in your
|
# is the log level for the RocksDB engine/library which show up in your
|
||||||
# database folder/path as `LOG` files. tuwunel will log RocksDB errors
|
# database folder/path as `LOG` files. tuwunel will log RocksDB errors
|
||||||
|
|||||||
Reference in New Issue
Block a user