Move shortstatehash queries from state_accessor to state service.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -134,7 +134,7 @@ pub(crate) async fn get_context_route(
|
||||
.map_or_else(|| body.event_id.as_ref(), |pdu| pdu.event_id.as_ref());
|
||||
|
||||
let state_ids = services
|
||||
.state_accessor
|
||||
.state
|
||||
.pdu_shortstatehash(state_at)
|
||||
.or_else(|_| services.state.get_room_shortstatehash(room_id))
|
||||
.map_ok(|shortstatehash| {
|
||||
|
||||
@@ -580,7 +580,7 @@ async fn handle_left_room(
|
||||
};
|
||||
|
||||
let Ok(left_shortstatehash) = services
|
||||
.state_accessor
|
||||
.state
|
||||
.pdu_shortstatehash(&left_event_id)
|
||||
.await
|
||||
else {
|
||||
@@ -703,16 +703,12 @@ async fn load_joined_room(
|
||||
.iter()
|
||||
.map(at!(0))
|
||||
.map(PduCount::into_unsigned)
|
||||
.map(|shorteventid| {
|
||||
services
|
||||
.state_accessor
|
||||
.get_shortstatehash(shorteventid)
|
||||
})
|
||||
.map(|shorteventid| services.state.get_shortstatehash(shorteventid))
|
||||
.next()
|
||||
.into();
|
||||
|
||||
let current_shortstatehash = services
|
||||
.state_accessor
|
||||
.state
|
||||
.get_shortstatehash(last_timeline_count.into_unsigned())
|
||||
.or_else(|_| services.state.get_room_shortstatehash(room_id));
|
||||
|
||||
|
||||
@@ -34,7 +34,7 @@ pub(crate) async fn get_room_state_route(
|
||||
.await?;
|
||||
|
||||
let shortstatehash = services
|
||||
.state_accessor
|
||||
.state
|
||||
.pdu_shortstatehash(&body.event_id)
|
||||
.await
|
||||
.map_err(|_| err!(Request(NotFound("PDU state not found."))))?;
|
||||
|
||||
@@ -26,7 +26,7 @@ pub(crate) async fn get_room_state_ids_route(
|
||||
.await?;
|
||||
|
||||
let shortstatehash = services
|
||||
.state_accessor
|
||||
.state
|
||||
.pdu_shortstatehash(&body.event_id)
|
||||
.await
|
||||
.map_err(|_| err!(Request(NotFound("Pdu state not found."))))?;
|
||||
|
||||
@@ -40,7 +40,7 @@ where
|
||||
|
||||
let Ok(prev_event_sstatehash) = self
|
||||
.services
|
||||
.state_accessor
|
||||
.state
|
||||
.pdu_shortstatehash(prev_event_id)
|
||||
.await
|
||||
else {
|
||||
@@ -105,7 +105,7 @@ where
|
||||
|
||||
let sstatehash = self
|
||||
.services
|
||||
.state_accessor
|
||||
.state
|
||||
.pdu_shortstatehash(prev_event_id);
|
||||
|
||||
try_join(sstatehash, prev_event)
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
use std::{collections::HashMap, fmt::Write, iter::once, sync::Arc};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use futures::{FutureExt, Stream, StreamExt, TryStreamExt, future::join_all, pin_mut};
|
||||
use futures::{
|
||||
FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::join_all, pin_mut,
|
||||
};
|
||||
use ruma::{
|
||||
EventId, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId, UserId,
|
||||
events::{
|
||||
@@ -490,6 +492,44 @@ pub async fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result<ShortSta
|
||||
.deserialized()
|
||||
}
|
||||
|
||||
/// Returns the state hash at this event.
|
||||
#[implement(Service)]
|
||||
pub async fn pdu_shortstatehash(&self, event_id: &EventId) -> Result<ShortStateHash> {
|
||||
self.services
|
||||
.short
|
||||
.get_shorteventid(event_id)
|
||||
.and_then(|shorteventid| self.get_shortstatehash(shorteventid))
|
||||
.await
|
||||
}
|
||||
|
||||
/// Returns the state hash at this event.
|
||||
#[implement(Service)]
|
||||
#[tracing::instrument(
|
||||
level = "debug"
|
||||
skip(self),
|
||||
ret,
|
||||
)]
|
||||
pub async fn get_shortstatehash(&self, shorteventid: ShortEventId) -> Result<ShortStateHash> {
|
||||
const BUFSIZE: usize = size_of::<ShortEventId>();
|
||||
|
||||
self.db
|
||||
.shorteventid_shortstatehash
|
||||
.aqry::<BUFSIZE, _>(&shorteventid)
|
||||
.await
|
||||
.deserialized()
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
pub(super) async fn delete_room_shortstatehash(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
_mutex_lock: &Guard<OwnedRoomId, ()>,
|
||||
) -> Result {
|
||||
self.db.roomid_shortstatehash.remove(room_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
#[tracing::instrument(
|
||||
level = "trace"
|
||||
@@ -552,14 +592,3 @@ pub(super) async fn delete_all_rooms_forward_extremities(&self, room_id: &RoomId
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
pub(super) async fn delete_room_shortstatehash(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
_mutex_lock: &Guard<OwnedRoomId, ()>,
|
||||
) -> Result {
|
||||
self.db.roomid_shortstatehash.remove(room_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -31,26 +31,15 @@ use tuwunel_core::{
|
||||
Result, err,
|
||||
matrix::{Event, room_version, state_res::events::RoomCreateEvent},
|
||||
};
|
||||
use tuwunel_database::Map;
|
||||
|
||||
pub struct Service {
|
||||
services: Arc<crate::services::OnceServices>,
|
||||
db: Data,
|
||||
}
|
||||
|
||||
struct Data {
|
||||
shorteventid_shortstatehash: Arc<Map>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl crate::Service for Service {
|
||||
fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
|
||||
Ok(Arc::new(Self {
|
||||
services: args.services.clone(),
|
||||
db: Data {
|
||||
shorteventid_shortstatehash: args.db["shorteventid_shortstatehash"].clone(),
|
||||
},
|
||||
}))
|
||||
Ok(Arc::new(Self { services: args.services.clone() }))
|
||||
}
|
||||
|
||||
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
|
||||
|
||||
@@ -18,7 +18,12 @@ pub async fn server_can_see_event(
|
||||
room_id: &RoomId,
|
||||
event_id: &EventId,
|
||||
) -> bool {
|
||||
let Ok(shortstatehash) = self.pdu_shortstatehash(event_id).await else {
|
||||
let Ok(shortstatehash) = self
|
||||
.services
|
||||
.state
|
||||
.pdu_shortstatehash(event_id)
|
||||
.await
|
||||
else {
|
||||
return true;
|
||||
};
|
||||
|
||||
|
||||
@@ -18,7 +18,6 @@ use tuwunel_core::{
|
||||
stream::{BroadbandExt, IterStream, ReadyExt, TryIgnore},
|
||||
},
|
||||
};
|
||||
use tuwunel_database::Deserialized;
|
||||
|
||||
use crate::rooms::{
|
||||
short::{ShortEventId, ShortStateHash, ShortStateKey},
|
||||
@@ -421,25 +420,3 @@ async fn load_full_state(&self, shortstatehash: ShortStateHash) -> Result<Arc<Co
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Returns the state hash at this event.
|
||||
#[implement(super::Service)]
|
||||
pub async fn pdu_shortstatehash(&self, event_id: &EventId) -> Result<ShortStateHash> {
|
||||
self.services
|
||||
.short
|
||||
.get_shorteventid(event_id)
|
||||
.and_then(|shorteventid| self.get_shortstatehash(shorteventid))
|
||||
.await
|
||||
}
|
||||
|
||||
/// Returns the state hash at this event.
|
||||
#[implement(super::Service)]
|
||||
pub async fn get_shortstatehash(&self, shorteventid: ShortEventId) -> Result<ShortStateHash> {
|
||||
const BUFSIZE: usize = size_of::<ShortEventId>();
|
||||
|
||||
self.db
|
||||
.shorteventid_shortstatehash
|
||||
.aqry::<BUFSIZE, _>(&shorteventid)
|
||||
.await
|
||||
.deserialized()
|
||||
}
|
||||
|
||||
@@ -83,7 +83,12 @@ pub async fn user_can_see_event(
|
||||
room_id: &RoomId,
|
||||
event_id: &EventId,
|
||||
) -> bool {
|
||||
let Ok(shortstatehash) = self.pdu_shortstatehash(event_id).await else {
|
||||
let Ok(shortstatehash) = self
|
||||
.services
|
||||
.state
|
||||
.pdu_shortstatehash(event_id)
|
||||
.await
|
||||
else {
|
||||
return true;
|
||||
};
|
||||
|
||||
|
||||
@@ -117,7 +117,7 @@ where
|
||||
{
|
||||
if let Ok(shortstatehash) = self
|
||||
.services
|
||||
.state_accessor
|
||||
.state
|
||||
.pdu_shortstatehash(pdu.event_id())
|
||||
.await
|
||||
{
|
||||
|
||||
@@ -179,7 +179,7 @@ pub async fn prev_shortstatehash(
|
||||
let prev = self.prev_timeline_count(room_id, before).await?;
|
||||
|
||||
self.services
|
||||
.state_accessor
|
||||
.state
|
||||
.get_shortstatehash(prev.into_unsigned())
|
||||
.await
|
||||
}
|
||||
@@ -197,7 +197,7 @@ pub async fn next_shortstatehash(
|
||||
let next = self.next_timeline_count(room_id, after).await?;
|
||||
|
||||
self.services
|
||||
.state_accessor
|
||||
.state
|
||||
.get_shortstatehash(next.into_unsigned())
|
||||
.await
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user