Outdent state_compressor service.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-04-29 06:55:54 +00:00
parent a479382974
commit 94f74c66f2

View File

@@ -12,7 +12,7 @@ use ruma::{EventId, RoomId};
use tuwunel_core::{ use tuwunel_core::{
Result, Result,
arrayvec::ArrayVec, arrayvec::ArrayVec,
at, checked, err, expected, utils, at, checked, err, expected, implement, utils,
utils::{bytes, math::usize_from_f64, stream::IterStream}, utils::{bytes, math::usize_from_f64, stream::IterStream},
}; };
use tuwunel_database::Map; use tuwunel_database::Map;
@@ -124,9 +124,9 @@ impl crate::Service for Service {
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
} }
impl Service {
/// Returns a stack with info on shortstatehash, full state, added diff and /// Returns a stack with info on shortstatehash, full state, added diff and
/// removed diff for the selected shortstatehash and each parent layer. /// removed diff for the selected shortstatehash and each parent layer.
#[implement(Service)]
#[tracing::instrument(name = "load", level = "debug", skip(self))] #[tracing::instrument(name = "load", level = "debug", skip(self))]
pub async fn load_shortstatehash_info( pub async fn load_shortstatehash_info(
&self, &self,
@@ -152,6 +152,7 @@ impl Service {
/// Returns a stack with info on shortstatehash, full state, added diff and /// Returns a stack with info on shortstatehash, full state, added diff and
/// removed diff for the selected shortstatehash and each parent layer. /// removed diff for the selected shortstatehash and each parent layer.
#[implement(Service)]
#[tracing::instrument( #[tracing::instrument(
name = "cache", name = "cache",
level = "debug", level = "debug",
@@ -173,6 +174,7 @@ impl Service {
Ok(()) Ok(())
} }
#[implement(Service)]
async fn new_shortstatehash_info( async fn new_shortstatehash_info(
&self, &self,
shortstatehash: ShortStateHash, shortstatehash: ShortStateHash,
@@ -209,6 +211,7 @@ impl Service {
Ok(stack) Ok(stack)
} }
#[implement(Service)]
pub fn compress_state_events<'a, I>( pub fn compress_state_events<'a, I>(
&'a self, &'a self,
state: I, state: I,
@@ -227,11 +230,10 @@ impl Service {
.stream() .stream()
.map(at!(0)) .map(at!(0))
.zip(short_event_ids) .zip(short_event_ids)
.map(|(shortstatekey, shorteventid)| { .map(|(shortstatekey, shorteventid)| compress_state_event(*shortstatekey, shorteventid))
compress_state_event(*shortstatekey, shorteventid)
})
} }
#[implement(Service)]
pub async fn compress_state_event( pub async fn compress_state_event(
&self, &self,
shortstatekey: ShortStateKey, shortstatekey: ShortStateKey,
@@ -260,10 +262,11 @@ impl Service {
/// * `statediffnew` - Added to base. Each vec is shortstatekey+shorteventid /// * `statediffnew` - Added to base. Each vec is shortstatekey+shorteventid
/// * `statediffremoved` - Removed from base. Each vec is /// * `statediffremoved` - Removed from base. Each vec is
/// shortstatekey+shorteventid /// shortstatekey+shorteventid
/// * `diff_to_sibling` - Approximately how much the diff grows each time /// * `diff_to_sibling` - Approximately how much the diff grows each time for
/// for this layer /// this layer
/// * `parent_states` - A stack with info on shortstatehash, full state, /// * `parent_states` - A stack with info on shortstatehash, full state, added
/// added diff and removed diff for each parent layer /// diff and removed diff for each parent layer
#[implement(Service)]
pub fn save_state_from_diff( pub fn save_state_from_diff(
&self, &self,
shortstatehash: ShortStateHash, shortstatehash: ShortStateHash,
@@ -381,6 +384,7 @@ impl Service {
/// Returns the new shortstatehash, and the state diff from the previous /// Returns the new shortstatehash, and the state diff from the previous
/// room state /// room state
#[implement(Service)]
#[tracing::instrument(skip(self, new_state_ids_compressed), level = "debug")] #[tracing::instrument(skip(self, new_state_ids_compressed), level = "debug")]
pub async fn save_state( pub async fn save_state(
&self, &self,
@@ -421,8 +425,7 @@ impl Service {
ShortStateInfoVec::new() ShortStateInfoVec::new()
}; };
let (statediffnew, statediffremoved) = let (statediffnew, statediffremoved) = if let Some(parent_stateinfo) = states_parents.last() {
if let Some(parent_stateinfo) = states_parents.last() {
let statediffnew: CompressedState = new_state_ids_compressed let statediffnew: CompressedState = new_state_ids_compressed
.difference(&parent_stateinfo.full_state) .difference(&parent_stateinfo.full_state)
.copied() .copied()
@@ -456,6 +459,7 @@ impl Service {
}) })
} }
#[implement(Service)]
#[tracing::instrument(skip(self), level = "debug", name = "get")] #[tracing::instrument(skip(self), level = "debug", name = "get")]
async fn get_statediff(&self, shortstatehash: ShortStateHash) -> Result<StateDiff> { async fn get_statediff(&self, shortstatehash: ShortStateHash) -> Result<StateDiff> {
const BUFSIZE: usize = size_of::<ShortStateHash>(); const BUFSIZE: usize = size_of::<ShortStateHash>();
@@ -503,6 +507,7 @@ impl Service {
}) })
} }
#[implement(Service)]
fn save_statediff(&self, shortstatehash: ShortStateHash, diff: &StateDiff) { fn save_statediff(&self, shortstatehash: ShortStateHash, diff: &StateDiff) {
let mut value = Vec::<u8>::with_capacity( let mut value = Vec::<u8>::with_capacity(
2_usize 2_usize
@@ -528,7 +533,6 @@ impl Service {
.shortstatehash_statediff .shortstatehash_statediff
.insert(&shortstatehash.to_be_bytes(), &value); .insert(&shortstatehash.to_be_bytes(), &value);
} }
}
#[inline] #[inline]
#[must_use] #[must_use]