chain_width to 50

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-04-22 04:42:26 +00:00
parent 9b658d86b2
commit 76509830e6
190 changed files with 3469 additions and 930 deletions

View File

@@ -87,7 +87,9 @@ impl Service {
let mut aliasid = room_id.as_bytes().to_vec();
aliasid.push(0xFF);
aliasid.extend_from_slice(&self.services.globals.next_count()?.to_be_bytes());
self.db.aliasid_alias.insert(&aliasid, alias.as_bytes());
self.db
.aliasid_alias
.insert(&aliasid, alias.as_bytes());
Ok(())
}
@@ -171,7 +173,11 @@ impl Service {
#[tracing::instrument(skip(self), level = "debug")]
pub async fn resolve_local_alias(&self, alias: &RoomAliasId) -> Result<OwnedRoomId> {
self.db.alias_roomid.get(alias.alias()).await.deserialized()
self.db
.alias_roomid
.get(alias.alias())
.await
.deserialized()
}
#[tracing::instrument(skip(self), level = "debug")]
@@ -248,7 +254,11 @@ impl Service {
}
async fn who_created_alias(&self, alias: &RoomAliasId) -> Result<OwnedUserId> {
self.db.alias_userid.get(alias.alias()).await.deserialized()
self.db
.alias_userid
.get(alias.alias())
.await
.deserialized()
}
async fn resolve_appservice_alias(

View File

@@ -136,7 +136,10 @@ async fn get_auth_chain_outer(
return Ok(Vec::new());
}
if let Ok(cached) = self.get_cached_eventid_authchain(&chunk_key).await {
if let Ok(cached) = self
.get_cached_eventid_authchain(&chunk_key)
.await
{
return Ok(cached.to_vec());
}
@@ -144,11 +147,16 @@ async fn get_auth_chain_outer(
.into_iter()
.try_stream()
.broad_and_then(|(shortid, event_id)| async move {
if let Ok(cached) = self.get_cached_eventid_authchain(&[shortid]).await {
if let Ok(cached) = self
.get_cached_eventid_authchain(&[shortid])
.await
{
return Ok(cached.to_vec());
}
let auth_chain = self.get_auth_chain_inner(room_id, event_id).await?;
let auth_chain = self
.get_auth_chain_inner(room_id, event_id)
.await?;
self.cache_auth_chain_vec(vec![shortid], auth_chain.as_slice());
debug!(
?event_id,
@@ -254,4 +262,10 @@ pub fn get_cache_usage(&self) -> (usize, usize) {
}
#[implement(Service)]
pub fn clear_cache(&self) { self.db.auth_chain_cache.lock().expect("locked").clear(); }
pub fn clear_cache(&self) {
self.db
.auth_chain_cache
.lock()
.expect("locked")
.clear();
}

View File

@@ -25,8 +25,12 @@ pub async fn acl_check(&self, server_name: &ServerName, room_id: &RoomId) -> Res
return Ok(());
}
if acl_event_content.deny.contains(&String::from("*"))
&& acl_event_content.allow.contains(&String::from("*"))
if acl_event_content
.deny
.contains(&String::from("*"))
&& acl_event_content
.allow
.contains(&String::from("*"))
{
warn!(%room_id, "Ignoring broken ACL event (allow key and deny key both contain wildcard \"*\"");
return Ok(());

View File

@@ -66,7 +66,11 @@ pub async fn handle_incoming_pdu<'a>(
let meta_exists = self.services.metadata.exists(room_id).map(Ok);
// 1.2 Check if the room is disabled
let is_disabled = self.services.metadata.is_disabled(room_id).map(Ok);
let is_disabled = self
.services
.metadata
.is_disabled(room_id)
.map(Ok);
// 1.3.1 Check room ACL on origin field/server
let origin_acl_check = self.acl_check(origin, room_id);

View File

@@ -100,7 +100,11 @@ impl Service {
}
async fn event_fetch(&self, event_id: OwnedEventId) -> Option<PduEvent> {
self.services.timeline.get_pdu(&event_id).await.ok()
self.services
.timeline
.get_pdu(&event_id)
.await
.ok()
}
}

View File

@@ -92,7 +92,11 @@ pub async fn resolve_state(
let new_room_state: CompressedState = self
.services
.state_compressor
.compress_state_events(state_events.iter().map(|(ssk, eid)| (ssk, (*eid).borrow())))
.compress_state_events(
state_events
.iter()
.map(|(ssk, eid)| (ssk, (*eid).borrow())),
)
.collect()
.await;

View File

@@ -54,7 +54,8 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
debug!("Resolving state at event");
let mut state_at_incoming_event = if incoming_pdu.prev_events.len() == 1 {
self.state_at_incoming_degree_one(&incoming_pdu).await?
self.state_at_incoming_degree_one(&incoming_pdu)
.await?
} else {
self.state_at_incoming_resolved(&incoming_pdu, room_id, &room_version_id)
.await?
@@ -74,10 +75,19 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
// 11. Check the auth of the event passes based on the state of the event
let state_fetch_state = &state_at_incoming_event;
let state_fetch = |k: StateEventType, s: StateKey| async move {
let shortstatekey = self.services.short.get_shortstatekey(&k, &s).await.ok()?;
let shortstatekey = self
.services
.short
.get_shortstatekey(&k, &s)
.await
.ok()?;
let event_id = state_fetch_state.get(&shortstatekey)?;
self.services.timeline.get_pdu(event_id).await.ok()
self.services
.timeline
.get_pdu(event_id)
.await
.ok()
};
let auth_check = state_res::event_auth::auth_check(

View File

@@ -76,7 +76,9 @@ pub async fn witness_retain(&self, senders: Witness, ctx: &Context<'_>) -> Witne
);
let include_redundant = cfg!(feature = "element_hacks")
|| ctx.options.is_some_and(Options::include_redundant_members);
|| ctx
.options
.is_some_and(Options::include_redundant_members);
let witness = self
.witness(ctx, senders.iter().map(AsRef::as_ref))

View File

@@ -48,5 +48,7 @@ pub async fn get_pdu_outlier(&self, event_id: &EventId) -> Result<PduEvent> {
#[implement(Service)]
#[tracing::instrument(skip(self, pdu), level = "debug")]
pub fn add_pdu_outlier(&self, event_id: &EventId, pdu: &CanonicalJsonObject) {
self.db.eventid_outlierpdu.raw_put(event_id, Json(pdu));
self.db
.eventid_outlierpdu
.raw_put(event_id, Json(pdu));
}

View File

@@ -52,7 +52,8 @@ impl Data {
const BUFSIZE: usize = size_of::<u64>() * 2;
let key: &[u64] = &[to, from];
self.tofrom_relation.aput_raw::<BUFSIZE, _, _>(key, []);
self.tofrom_relation
.aput_raw::<BUFSIZE, _, _>(key, []);
}
pub(super) fn get_relations<'a>(
@@ -65,11 +66,21 @@ impl Data {
) -> impl Stream<Item = PdusIterItem> + Send + '_ {
let mut current = ArrayVec::<u8, 16>::new();
current.extend(target.to_be_bytes());
current.extend(from.saturating_inc(dir).into_unsigned().to_be_bytes());
current.extend(
from.saturating_inc(dir)
.into_unsigned()
.to_be_bytes(),
);
let current = current.as_slice();
match dir {
| Direction::Forward => self.tofrom_relation.raw_keys_from(current).boxed(),
| Direction::Backward => self.tofrom_relation.rev_raw_keys_from(current).boxed(),
| Direction::Forward => self
.tofrom_relation
.raw_keys_from(current)
.boxed(),
| Direction::Backward => self
.tofrom_relation
.rev_raw_keys_from(current)
.boxed(),
}
.ignore_err()
.ready_take_while(move |key| key.starts_with(&target.to_be_bytes()))
@@ -78,7 +89,12 @@ impl Data {
.wide_filter_map(move |shorteventid| async move {
let pdu_id: RawPduId = PduId { shortroomid, shorteventid }.into();
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
let mut pdu = self
.services
.timeline
.get_pdu_from_id(&pdu_id)
.await
.ok()?;
if pdu.sender != user_id {
pdu.remove_transaction_id().log_err().ok();
@@ -109,6 +125,9 @@ impl Data {
}
pub(super) async fn is_event_soft_failed(&self, event_id: &EventId) -> bool {
self.softfailedeventids.get(event_id).await.is_ok()
self.softfailedeventids
.get(event_id)
.await
.is_ok()
}
}

View File

@@ -119,7 +119,9 @@ impl Service {
#[inline]
#[tracing::instrument(skip(self), level = "debug")]
pub async fn is_event_referenced(&self, room_id: &RoomId, event_id: &EventId) -> bool {
self.db.is_event_referenced(room_id, event_id).await
self.db
.is_event_referenced(room_id, event_id)
.await
}
#[inline]

View File

@@ -58,7 +58,8 @@ impl Data {
let count = self.services.globals.next_count().unwrap();
let latest_id = (room_id, count, user_id);
self.readreceiptid_readreceipt.put(latest_id, Json(event));
self.readreceiptid_readreceipt
.put(latest_id, Json(event));
}
pub(super) fn readreceipts_since<'a>(
@@ -91,7 +92,8 @@ impl Data {
let next_count = self.services.globals.next_count().unwrap();
self.roomuserid_privateread.put(key, pdu_count);
self.roomuserid_lastprivatereadupdate.put(key, next_count);
self.roomuserid_lastprivatereadupdate
.put(key, next_count);
}
pub(super) async fn private_read_get_count(
@@ -100,7 +102,10 @@ impl Data {
user_id: &UserId,
) -> Result<u64> {
let key = (room_id, user_id);
self.roomuserid_privateread.qry(&key).await.deserialized()
self.roomuserid_privateread
.qry(&key)
.await
.deserialized()
}
pub(super) async fn last_privateread_update(

View File

@@ -54,7 +54,9 @@ impl Service {
room_id: &RoomId,
event: &ReceiptEvent,
) {
self.db.readreceipt_update(user_id, room_id, event).await;
self.db
.readreceipt_update(user_id, room_id, event)
.await;
self.services
.sending
.flush_room(room_id)
@@ -68,18 +70,30 @@ impl Service {
room_id: &RoomId,
user_id: &UserId,
) -> Result<Raw<AnySyncEphemeralRoomEvent>> {
let pdu_count = self.private_read_get_count(room_id, user_id).map_err(|e| {
err!(Database(warn!("No private read receipt was set in {room_id}: {e}")))
});
let shortroomid = self.services.short.get_shortroomid(room_id).map_err(|e| {
err!(Database(warn!("Short room ID does not exist in database for {room_id}: {e}")))
});
let pdu_count = self
.private_read_get_count(room_id, user_id)
.map_err(|e| {
err!(Database(warn!("No private read receipt was set in {room_id}: {e}")))
});
let shortroomid = self
.services
.short
.get_shortroomid(room_id)
.map_err(|e| {
err!(Database(warn!(
"Short room ID does not exist in database for {room_id}: {e}"
)))
});
let (pdu_count, shortroomid) = try_join!(pdu_count, shortroomid)?;
let shorteventid = PduCount::Normal(pdu_count);
let pdu_id: RawPduId = PduId { shortroomid, shorteventid }.into();
let pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await?;
let pdu = self
.services
.timeline
.get_pdu_from_id(&pdu_id)
.await?;
let event_id: OwnedEventId = pdu.event_id;
let user_id: OwnedUserId = user_id.to_owned();
@@ -129,13 +143,17 @@ impl Service {
room_id: &RoomId,
user_id: &UserId,
) -> Result<u64> {
self.db.private_read_get_count(room_id, user_id).await
self.db
.private_read_get_count(room_id, user_id)
.await
}
/// Returns the PDU count of the last typing update in this room.
#[inline]
pub async fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> u64 {
self.db.last_privateread_update(user_id, room_id).await
self.db
.last_privateread_update(user_id, room_id)
.await
}
}

View File

@@ -139,9 +139,15 @@ pub async fn search_pdu_ids(
&self,
query: &RoomQuery<'_>,
) -> Result<impl Stream<Item = RawPduId> + Send + '_ + use<'_>> {
let shortroomid = self.services.short.get_shortroomid(query.room_id).await?;
let shortroomid = self
.services
.short
.get_shortroomid(query.room_id)
.await?;
let pdu_ids = self.search_pdu_ids_query_room(query, shortroomid).await;
let pdu_ids = self
.search_pdu_ids_query_room(query, shortroomid)
.await;
let iters = pdu_ids.into_iter().map(IntoIterator::into_iter);

View File

@@ -112,7 +112,10 @@ pub async fn get_or_create_shortstatekey(
) -> ShortStateKey {
const BUFSIZE: usize = size_of::<ShortStateKey>();
if let Ok(shortstatekey) = self.get_shortstatekey(event_type, state_key).await {
if let Ok(shortstatekey) = self
.get_shortstatekey(event_type, state_key)
.await
{
return shortstatekey;
}
@@ -235,7 +238,11 @@ pub async fn get_or_create_shortstatehash(&self, state_hash: &[u8]) -> (ShortSta
#[implement(Service)]
pub async fn get_shortroomid(&self, room_id: &RoomId) -> Result<ShortRoomId> {
self.db.roomid_shortroomid.get(room_id).await.deserialized()
self.db
.roomid_shortroomid
.get(room_id)
.await
.deserialized()
}
#[implement(Service)]

View File

@@ -92,14 +92,23 @@ impl crate::Service for Service {
}
async fn memory_usage(&self, out: &mut (dyn Write + Send)) -> Result {
let roomid_spacehierarchy_cache = self.roomid_spacehierarchy_cache.lock().await.len();
let roomid_spacehierarchy_cache = self
.roomid_spacehierarchy_cache
.lock()
.await
.len();
writeln!(out, "roomid_spacehierarchy_cache: {roomid_spacehierarchy_cache}")?;
Ok(())
}
async fn clear_cache(&self) { self.roomid_spacehierarchy_cache.lock().await.clear(); }
async fn clear_cache(&self) {
self.roomid_spacehierarchy_cache
.lock()
.await
.clear();
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
@@ -121,7 +130,11 @@ pub async fn get_summary_and_children_local(
| None => (), // cache miss
| Some(None) => return Ok(None),
| Some(Some(cached)) => {
let allowed_rooms = cached.summary.allowed_room_ids.iter().map(AsRef::as_ref);
let allowed_rooms = cached
.summary
.allowed_room_ids
.iter()
.map(AsRef::as_ref);
let is_accessible_child = self.is_accessible_child(
current_room,
@@ -154,10 +167,13 @@ pub async fn get_summary_and_children_local(
return Ok(None);
};
self.roomid_spacehierarchy_cache.lock().await.insert(
current_room.to_owned(),
Some(CachedSpaceHierarchySummary { summary: summary.clone() }),
);
self.roomid_spacehierarchy_cache
.lock()
.await
.insert(
current_room.to_owned(),
Some(CachedSpaceHierarchySummary { summary: summary.clone() }),
);
Ok(Some(SummaryAccessibility::Accessible(summary)))
}
@@ -196,10 +212,13 @@ async fn get_summary_and_children_federation(
};
let summary = response.room;
self.roomid_spacehierarchy_cache.lock().await.insert(
current_room.to_owned(),
Some(CachedSpaceHierarchySummary { summary: summary.clone() }),
);
self.roomid_spacehierarchy_cache
.lock()
.await
.insert(
current_room.to_owned(),
Some(CachedSpaceHierarchySummary { summary: summary.clone() }),
);
response
.children
@@ -304,7 +323,11 @@ async fn get_room_summary(
children_state: Vec<Raw<HierarchySpaceChildEvent>>,
identifier: &Identifier<'_>,
) -> Result<SpaceHierarchyParentSummary, Error> {
let join_rule = self.services.state_accessor.get_join_rules(room_id).await;
let join_rule = self
.services
.state_accessor
.get_join_rules(room_id)
.await;
let is_accessible_child = self
.is_accessible_child(
@@ -319,15 +342,33 @@ async fn get_room_summary(
return Err!(Request(Forbidden("User is not allowed to see the room")));
}
let name = self.services.state_accessor.get_name(room_id).ok();
let name = self
.services
.state_accessor
.get_name(room_id)
.ok();
let topic = self.services.state_accessor.get_room_topic(room_id).ok();
let topic = self
.services
.state_accessor
.get_room_topic(room_id)
.ok();
let room_type = self.services.state_accessor.get_room_type(room_id).ok();
let room_type = self
.services
.state_accessor
.get_room_type(room_id)
.ok();
let world_readable = self.services.state_accessor.is_world_readable(room_id);
let world_readable = self
.services
.state_accessor
.is_world_readable(room_id);
let guest_can_join = self.services.state_accessor.guest_can_join(room_id);
let guest_can_join = self
.services
.state_accessor
.guest_can_join(room_id);
let num_joined_members = self
.services
@@ -392,7 +433,10 @@ async fn get_room_summary(
room_version,
room_id: room_id.to_owned(),
num_joined_members: num_joined_members.try_into().unwrap_or_default(),
allowed_room_ids: join_rule.allowed_rooms().map(Into::into).collect(),
allowed_room_ids: join_rule
.allowed_rooms()
.map(Into::into)
.collect(),
join_rule: join_rule.clone().into(),
};
@@ -425,9 +469,15 @@ where
}
if let Identifier::UserId(user_id) = identifier {
let is_joined = self.services.state_cache.is_joined(user_id, current_room);
let is_joined = self
.services
.state_cache
.is_joined(user_id, current_room);
let is_invited = self.services.state_cache.is_invited(user_id, current_room);
let is_invited = self
.services
.state_cache
.is_invited(user_id, current_room);
pin_mut!(is_joined, is_invited);
if is_joined.or(is_invited).await {
@@ -444,9 +494,15 @@ where
.stream()
.any(async |room| match identifier {
| Identifier::UserId(user) =>
self.services.state_cache.is_joined(user, room).await,
self.services
.state_cache
.is_joined(user, room)
.await,
| Identifier::ServerName(server) =>
self.services.state_cache.server_in_room(server, room).await,
self.services
.state_cache
.server_in_room(server, room)
.await,
})
.await,

View File

@@ -120,7 +120,11 @@ impl Service {
match pdu.kind {
| TimelineEventType::RoomMember => {
let Some(user_id) = pdu.state_key.as_ref().map(UserId::parse).flat_ok()
let Some(user_id) = pdu
.state_key
.as_ref()
.map(UserId::parse)
.flat_ok()
else {
continue;
};
@@ -154,7 +158,10 @@ impl Service {
}
}
self.services.state_cache.update_joined_count(room_id).await;
self.services
.state_cache
.update_joined_count(room_id)
.await;
self.set_room_state(room_id, shortstatehash, state_lock);
@@ -218,13 +225,15 @@ impl Service {
} else {
(state_ids_compressed, Arc::new(CompressedState::new()))
};
self.services.state_compressor.save_state_from_diff(
shortstatehash,
statediffnew,
statediffremoved,
1_000_000, // high number because no state will be based on this one
states_parents,
)?;
self.services
.state_compressor
.save_state_from_diff(
shortstatehash,
statediffnew,
statediffremoved,
1_000_000, // high number because no state will be based on this one
states_parents,
)?;
}
self.db
@@ -248,7 +257,9 @@ impl Service {
.get_or_create_shorteventid(&new_pdu.event_id)
.await;
let previous_shortstatehash = self.get_room_shortstatehash(&new_pdu.room_id).await;
let previous_shortstatehash = self
.get_room_shortstatehash(&new_pdu.room_id)
.await;
if let Ok(p) = previous_shortstatehash {
self.db
@@ -303,13 +314,15 @@ impl Service {
statediffremoved.insert(*replaces);
}
self.services.state_compressor.save_state_from_diff(
shortstatehash,
Arc::new(statediffnew),
Arc::new(statediffremoved),
2,
states_parents,
)?;
self.services
.state_compressor
.save_state_from_diff(
shortstatehash,
Arc::new(statediffnew),
Arc::new(statediffremoved),
2,
states_parents,
)?;
Ok(shortstatehash)
},

View File

@@ -49,7 +49,11 @@ pub fn room_state_full_pdus<'a>(
self.services
.state
.get_room_shortstatehash(room_id)
.map_ok(|shortstatehash| self.state_full_pdus(shortstatehash).map(Ok).boxed())
.map_ok(|shortstatehash| {
self.state_full_pdus(shortstatehash)
.map(Ok)
.boxed()
})
.map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
.try_flatten_stream()
}

View File

@@ -29,7 +29,8 @@ use crate::rooms::{
#[implement(super::Service)]
#[inline]
pub async fn user_was_joined(&self, shortstatehash: ShortStateHash, user_id: &UserId) -> bool {
self.user_membership(shortstatehash, user_id).await == MembershipState::Join
self.user_membership(shortstatehash, user_id)
.await == MembershipState::Join
}
/// The user was an invited or joined room member at this state (potentially
@@ -37,7 +38,9 @@ pub async fn user_was_joined(&self, shortstatehash: ShortStateHash, user_id: &Us
#[implement(super::Service)]
#[inline]
pub async fn user_was_invited(&self, shortstatehash: ShortStateHash, user_id: &UserId) -> bool {
let s = self.user_membership(shortstatehash, user_id).await;
let s = self
.user_membership(shortstatehash, user_id)
.await;
s == MembershipState::Join || s == MembershipState::Invite
}
@@ -259,7 +262,9 @@ pub fn state_keys_with_shortids<'a>(
.zip(shorteventids)
.ready_filter_map(|(res, id)| res.map(|res| (res, id)).ok())
.ready_filter_map(move |((event_type_, state_key), event_id)| {
event_type_.eq(event_type).then_some((state_key, event_id))
event_type_
.eq(event_type)
.then_some((state_key, event_id))
})
}
@@ -338,7 +343,11 @@ pub fn state_full_pdus(
.multi_get_eventid_from_short(short_ids)
.ready_filter_map(Result::ok)
.broad_filter_map(move |event_id: OwnedEventId| async move {
self.services.timeline.get_pdu(&event_id).await.ok()
self.services
.timeline
.get_pdu(&event_id)
.await
.ok()
})
}
@@ -406,7 +415,12 @@ async fn load_full_state(&self, shortstatehash: ShortStateHash) -> Result<Arc<Co
.state_compressor
.load_shortstatehash_info(shortstatehash)
.map_err(|e| err!(Database("Missing state IDs: {e}")))
.map_ok(|vec| vec.last().expect("at least one layer").full_state.clone())
.map_ok(|vec| {
vec.last()
.expect("at least one layer")
.full_state
.clone()
})
.await
}

View File

@@ -98,7 +98,11 @@ pub async fn user_can_see_event(
return true;
};
let currently_member = self.services.state_cache.is_joined(user_id, room_id).await;
let currently_member = self
.services
.state_cache
.is_joined(user_id, room_id)
.await;
let history_visibility = self
.state_get_content(shortstatehash, &StateEventType::RoomHistoryVisibility, "")
@@ -110,11 +114,13 @@ pub async fn user_can_see_event(
match history_visibility {
| HistoryVisibility::Invited => {
// Allow if any member on requesting server was AT LEAST invited, else deny
self.user_was_invited(shortstatehash, user_id).await
self.user_was_invited(shortstatehash, user_id)
.await
},
| HistoryVisibility::Joined => {
// Allow if any member on requested server was joined, else deny
self.user_was_joined(shortstatehash, user_id).await
self.user_was_joined(shortstatehash, user_id)
.await
},
| HistoryVisibility::WorldReadable => true,
| HistoryVisibility::Shared | _ => currently_member,
@@ -126,7 +132,12 @@ pub async fn user_can_see_event(
#[implement(super::Service)]
#[tracing::instrument(skip_all, level = "trace")]
pub async fn user_can_see_state_events(&self, user_id: &UserId, room_id: &RoomId) -> bool {
if self.services.state_cache.is_joined(user_id, room_id).await {
if self
.services
.state_cache
.is_joined(user_id, room_id)
.await
{
return true;
}
@@ -139,7 +150,10 @@ pub async fn user_can_see_state_events(&self, user_id: &UserId, room_id: &RoomId
match history_visibility {
| HistoryVisibility::Invited =>
self.services.state_cache.is_invited(user_id, room_id).await,
self.services
.state_cache
.is_invited(user_id, room_id)
.await,
| HistoryVisibility::WorldReadable => true,
| _ => false,
}

View File

@@ -248,7 +248,9 @@ impl Service {
.update(
None,
user_id,
GlobalAccountDataEventType::Direct.to_string().into(),
GlobalAccountDataEventType::Direct
.to_string()
.into(),
&serde_json::to_value(&direct_event)
.expect("to json always works"),
)
@@ -262,7 +264,12 @@ impl Service {
},
| MembershipState::Invite => {
// We want to know if the sender is ignored by the receiver
if self.services.users.user_is_ignored(sender, user_id).await {
if self
.services
.users
.user_is_ignored(sender, user_id)
.await
{
return Ok(());
}
@@ -346,14 +353,22 @@ impl Service {
self.db.userroomid_joined.insert(&userroom_id, []);
self.db.roomuserid_joined.insert(&roomuser_id, []);
self.db.userroomid_invitestate.remove(&userroom_id);
self.db.roomuserid_invitecount.remove(&roomuser_id);
self.db
.userroomid_invitestate
.remove(&userroom_id);
self.db
.roomuserid_invitecount
.remove(&roomuser_id);
self.db.userroomid_leftstate.remove(&userroom_id);
self.db.roomuserid_leftcount.remove(&roomuser_id);
self.db.userroomid_knockedstate.remove(&userroom_id);
self.db.roomuserid_knockedcount.remove(&roomuser_id);
self.db
.userroomid_knockedstate
.remove(&userroom_id);
self.db
.roomuserid_knockedcount
.remove(&roomuser_id);
self.db.roomid_inviteviaservers.remove(room_id);
}
@@ -382,11 +397,19 @@ impl Service {
self.db.userroomid_joined.remove(&userroom_id);
self.db.roomuserid_joined.remove(&roomuser_id);
self.db.userroomid_invitestate.remove(&userroom_id);
self.db.roomuserid_invitecount.remove(&roomuser_id);
self.db
.userroomid_invitestate
.remove(&userroom_id);
self.db
.roomuserid_invitecount
.remove(&roomuser_id);
self.db.userroomid_knockedstate.remove(&userroom_id);
self.db.roomuserid_knockedcount.remove(&roomuser_id);
self.db
.userroomid_knockedstate
.remove(&userroom_id);
self.db
.roomuserid_knockedcount
.remove(&roomuser_id);
self.db.roomid_inviteviaservers.remove(room_id);
}
@@ -417,8 +440,12 @@ impl Service {
self.db.userroomid_joined.remove(&userroom_id);
self.db.roomuserid_joined.remove(&roomuser_id);
self.db.userroomid_invitestate.remove(&userroom_id);
self.db.roomuserid_invitecount.remove(&roomuser_id);
self.db
.userroomid_invitestate
.remove(&userroom_id);
self.db
.roomuserid_invitecount
.remove(&roomuser_id);
self.db.userroomid_leftstate.remove(&userroom_id);
self.db.roomuserid_leftcount.remove(&roomuser_id);
@@ -523,7 +550,11 @@ impl Service {
/// Returns the number of users which are currently in a room
#[tracing::instrument(skip(self), level = "trace")]
pub async fn room_joined_count(&self, room_id: &RoomId) -> Result<u64> {
self.db.roomid_joinedcount.get(room_id).await.deserialized()
self.db
.roomid_joinedcount
.get(room_id)
.await
.deserialized()
}
#[tracing::instrument(skip(self), level = "debug")]
@@ -623,7 +654,11 @@ impl Service {
#[tracing::instrument(skip(self), level = "trace")]
pub async fn get_left_count(&self, room_id: &RoomId, user_id: &UserId) -> Result<u64> {
let key = (room_id, user_id);
self.db.roomuserid_leftcount.qry(&key).await.deserialized()
self.db
.roomuserid_leftcount
.qry(&key)
.await
.deserialized()
}
/// Returns an iterator over all rooms this user joined.
@@ -750,7 +785,11 @@ impl Service {
#[tracing::instrument(skip(self), level = "debug")]
pub async fn once_joined(&self, user_id: &UserId, room_id: &RoomId) -> bool {
let key = (user_id, room_id);
self.db.roomuseroncejoinedids.qry(&key).await.is_ok()
self.db
.roomuseroncejoinedids
.qry(&key)
.await
.is_ok()
}
#[tracing::instrument(skip(self), level = "trace")]
@@ -762,19 +801,31 @@ impl Service {
#[tracing::instrument(skip(self), level = "trace")]
pub async fn is_knocked<'a>(&'a self, user_id: &'a UserId, room_id: &'a RoomId) -> bool {
let key = (user_id, room_id);
self.db.userroomid_knockedstate.qry(&key).await.is_ok()
self.db
.userroomid_knockedstate
.qry(&key)
.await
.is_ok()
}
#[tracing::instrument(skip(self), level = "trace")]
pub async fn is_invited(&self, user_id: &UserId, room_id: &RoomId) -> bool {
let key = (user_id, room_id);
self.db.userroomid_invitestate.qry(&key).await.is_ok()
self.db
.userroomid_invitestate
.qry(&key)
.await
.is_ok()
}
#[tracing::instrument(skip(self), level = "trace")]
pub async fn is_left(&self, user_id: &UserId, room_id: &RoomId) -> bool {
let key = (user_id, room_id);
self.db.userroomid_leftstate.qry(&key).await.is_ok()
self.db
.userroomid_leftstate
.qry(&key)
.await
.is_ok()
}
#[tracing::instrument(skip(self), level = "trace")]
@@ -856,7 +907,10 @@ impl Service {
}
pub fn get_appservice_in_room_cache_usage(&self) -> (usize, usize) {
let cache = self.appservice_in_room_cache.read().expect("locked");
let cache = self
.appservice_in_room_cache
.read()
.expect("locked");
(cache.len(), cache.capacity())
}
@@ -899,8 +953,12 @@ impl Service {
.unwrap_or(0),
);
self.db.roomid_joinedcount.raw_put(room_id, joinedcount);
self.db.roomid_invitedcount.raw_put(room_id, invitedcount);
self.db
.roomid_joinedcount
.raw_put(room_id, joinedcount);
self.db
.roomid_invitedcount
.raw_put(room_id, invitedcount);
self.db
.roomuserid_knockedcount
.raw_put(room_id, knockedcount);
@@ -968,11 +1026,16 @@ impl Service {
self.db.userroomid_leftstate.remove(&userroom_id);
self.db.roomuserid_leftcount.remove(&roomuser_id);
self.db.userroomid_knockedstate.remove(&userroom_id);
self.db.roomuserid_knockedcount.remove(&roomuser_id);
self.db
.userroomid_knockedstate
.remove(&userroom_id);
self.db
.roomuserid_knockedcount
.remove(&roomuser_id);
if let Some(servers) = invite_via.filter(is_not_empty!()) {
self.add_servers_invite_via(room_id, servers).await;
self.add_servers_invite_via(room_id, servers)
.await;
}
}

View File

@@ -87,22 +87,26 @@ impl crate::Service for Service {
async fn memory_usage(&self, out: &mut (dyn Write + Send)) -> Result {
let (cache_len, ents) = {
let cache = self.stateinfo_cache.lock().expect("locked");
let ents = cache.iter().map(at!(1)).flat_map(|vec| vec.iter()).fold(
HashMap::new(),
|mut ents, ssi| {
let ents = cache
.iter()
.map(at!(1))
.flat_map(|vec| vec.iter())
.fold(HashMap::new(), |mut ents, ssi| {
for cs in &[&ssi.added, &ssi.removed, &ssi.full_state] {
ents.insert(Arc::as_ptr(cs), compressed_state_size(cs));
}
ents
},
);
});
(cache.len(), ents)
};
let ents_len = ents.len();
let bytes = ents.values().copied().fold(0_usize, usize::saturating_add);
let bytes = ents
.values()
.copied()
.fold(0_usize, usize::saturating_add);
let bytes = bytes::pretty(bytes);
writeln!(out, "stateinfo_cache: {cache_len} {ents_len} ({bytes})")?;
@@ -110,7 +114,12 @@ impl crate::Service for Service {
Ok(())
}
async fn clear_cache(&self) { self.stateinfo_cache.lock().expect("locked").clear(); }
async fn clear_cache(&self) {
self.stateinfo_cache
.lock()
.expect("locked")
.clear();
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
@@ -123,11 +132,17 @@ impl Service {
&self,
shortstatehash: ShortStateHash,
) -> Result<ShortStateInfoVec> {
if let Some(r) = self.stateinfo_cache.lock()?.get_mut(&shortstatehash) {
if let Some(r) = self
.stateinfo_cache
.lock()?
.get_mut(&shortstatehash)
{
return Ok(r.clone());
}
let stack = self.new_shortstatehash_info(shortstatehash).await?;
let stack = self
.new_shortstatehash_info(shortstatehash)
.await?;
self.cache_shortstatehash_info(shortstatehash, stack.clone())
.await?;
@@ -151,7 +166,9 @@ impl Service {
shortstatehash: ShortStateHash,
stack: ShortStateInfoVec,
) -> Result {
self.stateinfo_cache.lock()?.insert(shortstatehash, stack);
self.stateinfo_cache
.lock()?
.insert(shortstatehash, stack);
Ok(())
}
@@ -262,7 +279,9 @@ impl Service {
if parent_states.len() > 3 {
// Number of layers
// To many layers, we have to go deeper
let parent = parent_states.pop().expect("parent must have a state");
let parent = parent_states
.pop()
.expect("parent must have a state");
let mut parent_new = (*parent.added).clone();
let mut parent_removed = (*parent.removed).clone();
@@ -311,7 +330,9 @@ impl Service {
// 1. We add the current diff on top of the parent layer.
// 2. We replace a layer above
let parent = parent_states.pop().expect("parent must have a state");
let parent = parent_states
.pop()
.expect("parent must have a state");
let parent_added_len = parent.added.len();
let parent_removed_len = parent.removed.len();
let parent_diff = checked!(parent_added_len + parent_removed_len)?;
@@ -373,8 +394,11 @@ impl Service {
.await
.ok();
let state_hash =
utils::calculate_hash(new_state_ids_compressed.iter().map(|bytes| &bytes[..]));
let state_hash = utils::calculate_hash(
new_state_ids_compressed
.iter()
.map(|bytes| &bytes[..]),
);
let (new_shortstatehash, already_existed) = self
.services
@@ -390,7 +414,9 @@ impl Service {
}
let states_parents = if let Some(p) = previous_shortstatehash {
self.load_shortstatehash_info(p).await.unwrap_or_default()
self.load_shortstatehash_info(p)
.await
.unwrap_or_default()
} else {
ShortStateInfoVec::new()
};

View File

@@ -141,7 +141,11 @@ impl Service {
shorteventid: PduCount,
_inc: &'a IncludeThreads,
) -> Result<impl Stream<Item = (PduCount, PduEvent)> + Send + 'a> {
let shortroomid: ShortRoomId = self.services.short.get_shortroomid(room_id).await?;
let shortroomid: ShortRoomId = self
.services
.short
.get_shortroomid(room_id)
.await?;
let current: RawPduId = PduId {
shortroomid,
@@ -157,7 +161,12 @@ impl Service {
.map(RawPduId::from)
.ready_take_while(move |pdu_id| pdu_id.shortroomid() == shortroomid.to_be_bytes())
.wide_filter_map(move |pdu_id| async move {
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
let mut pdu = self
.services
.timeline
.get_pdu_from_id(&pdu_id)
.await
.ok()?;
let pdu_id: PduId = pdu_id.into();
if pdu.sender != user_id {
@@ -187,6 +196,10 @@ impl Service {
}
pub(super) async fn get_participants(&self, root_id: &RawPduId) -> Result<Vec<OwnedUserId>> {
self.db.threadid_userids.get(root_id).await.deserialized()
self.db
.threadid_userids
.get(root_id)
.await
.deserialized()
}
}

View File

@@ -159,7 +159,9 @@ impl Data {
let non_outlier = self.non_outlier_pdu_exists(event_id).boxed();
let outlier = self.outlier_pdu_exists(event_id).boxed();
select_ok([non_outlier, outlier]).await.map(at!(0))
select_ok([non_outlier, outlier])
.await
.map(at!(0))
}
/// Returns the pdu.
@@ -187,8 +189,10 @@ impl Data {
debug_assert!(matches!(count, PduCount::Normal(_)), "PduCount not Normal");
self.pduid_pdu.raw_put(pdu_id, Json(json));
self.eventid_pduid.insert(pdu.event_id.as_bytes(), pdu_id);
self.eventid_outlierpdu.remove(pdu.event_id.as_bytes());
self.eventid_pduid
.insert(pdu.event_id.as_bytes(), pdu_id);
self.eventid_outlierpdu
.remove(pdu.event_id.as_bytes());
}
pub(super) fn prepend_backfill_pdu(

View File

@@ -184,7 +184,9 @@ impl Service {
sender_user: Option<&UserId>,
room_id: &RoomId,
) -> Result<PduCount> {
self.db.last_timeline_count(sender_user, room_id).await
self.db
.last_timeline_count(sender_user, room_id)
.await
}
/// Returns the `count` of this pdu's id.
@@ -363,7 +365,9 @@ impl Service {
let pdu_id: RawPduId = PduId { shortroomid, shorteventid: count2 }.into();
// Insert pdu
self.db.append_pdu(&pdu_id, pdu, &pdu_json, count2).await;
self.db
.append_pdu(&pdu_id, pdu, &pdu_json, count2)
.await;
drop(insert_lock);
@@ -395,7 +399,12 @@ impl Service {
if let Some(state_key) = &pdu.state_key {
let target_user_id = UserId::parse(state_key)?;
if self.services.users.is_active_local(target_user_id).await {
if self
.services
.users
.is_active_local(target_user_id)
.await
{
push_target.insert(target_user_id.to_owned());
}
}
@@ -462,7 +471,11 @@ impl Service {
| TimelineEventType::RoomRedaction => {
use RoomVersionId::*;
let room_version_id = self.services.state.get_room_version(&pdu.room_id).await?;
let room_version_id = self
.services
.state
.get_room_version(&pdu.room_id)
.await?;
match room_version_id {
| V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | V10 => {
if let Some(redact_id) = &pdu.redacts {
@@ -472,7 +485,8 @@ impl Service {
.user_can_redact(redact_id, &pdu.sender, &pdu.room_id, false)
.await?
{
self.redact_pdu(redact_id, pdu, shortroomid).await?;
self.redact_pdu(redact_id, pdu, shortroomid)
.await?;
}
}
},
@@ -485,7 +499,8 @@ impl Service {
.user_can_redact(redact_id, &pdu.sender, &pdu.room_id, false)
.await?
{
self.redact_pdu(redact_id, pdu, shortroomid).await?;
self.redact_pdu(redact_id, pdu, shortroomid)
.await?;
}
}
},
@@ -508,8 +523,12 @@ impl Service {
let content: RoomMemberEventContent = pdu.get_content()?;
let stripped_state = match content.membership {
| MembershipState::Invite | MembershipState::Knock =>
self.services.state.summary_stripped(pdu).await.into(),
| MembershipState::Invite | MembershipState::Knock => self
.services
.state
.summary_stripped(pdu)
.await
.into(),
| _ => None,
};
@@ -533,9 +552,16 @@ impl Service {
| TimelineEventType::RoomMessage => {
let content: ExtractBody = pdu.get_content()?;
if let Some(body) = content.body {
self.services.search.index_pdu(shortroomid, &pdu_id, &body);
self.services
.search
.index_pdu(shortroomid, &pdu_id, &body);
if self.services.admin.is_admin_command(pdu, &body).await {
if self
.services
.admin
.is_admin_command(pdu, &body)
.await
{
self.services
.admin
.command(body, Some((*pdu.event_id).into()))?;
@@ -546,7 +572,10 @@ impl Service {
}
if let Ok(content) = pdu.get_content::<ExtractRelatesToEventId>() {
if let Ok(related_pducount) = self.get_pdu_count(&content.relates_to.event_id).await {
if let Ok(related_pducount) = self
.get_pdu_count(&content.relates_to.event_id)
.await
{
self.services
.pdu_metadata
.add_relation(count2, related_pducount);
@@ -834,14 +863,26 @@ impl Service {
.create_hash_and_sign_event(pdu_builder, sender, room_id, state_lock)
.await?;
if self.services.admin.is_admin_room(&pdu.room_id).await {
self.check_pdu_for_admin_room(&pdu, sender).boxed().await?;
if self
.services
.admin
.is_admin_room(&pdu.room_id)
.await
{
self.check_pdu_for_admin_room(&pdu, sender)
.boxed()
.await?;
}
// If redaction event is not authorized, do not append it to the timeline
if pdu.kind == TimelineEventType::RoomRedaction {
use RoomVersionId::*;
match self.services.state.get_room_version(&pdu.room_id).await? {
match self
.services
.state
.get_room_version(&pdu.room_id)
.await?
{
| V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | V10 => {
if let Some(redact_id) = &pdu.redacts {
if !self
@@ -885,7 +926,10 @@ impl Service {
.join_authorized_via_users_server
.as_ref()
.is_some_and(|authorising_user| {
!self.services.globals.user_is_local(authorising_user)
!self
.services
.globals
.user_is_local(authorising_user)
}) {
return Err!(Request(InvalidParam(
"Authorising user does not belong to this homeserver"
@@ -999,7 +1043,8 @@ impl Service {
user_id: &'a UserId,
room_id: &'a RoomId,
) -> impl Stream<Item = PdusIterItem> + Send + 'a {
self.pdus(Some(user_id), room_id, None).ignore_err()
self.pdus(Some(user_id), room_id, None)
.ignore_err()
}
/// Reverse iteration starting at from.
@@ -1052,7 +1097,11 @@ impl Service {
}
}
let room_version_id = self.services.state.get_room_version(&pdu.room_id).await?;
let room_version_id = self
.services
.state
.get_room_version(&pdu.room_id)
.await?;
pdu.redact(&room_version_id, reason)?;
@@ -1098,15 +1147,18 @@ impl Service {
.await
.unwrap_or_default();
let room_mods = power_levels.users.iter().filter_map(|(user_id, level)| {
if level > &power_levels.users_default
&& !self.services.globals.user_is_local(user_id)
{
Some(user_id.server_name())
} else {
None
}
});
let room_mods = power_levels
.users
.iter()
.filter_map(|(user_id, level)| {
if level > &power_levels.users_default
&& !self.services.globals.user_is_local(user_id)
{
Some(user_id.server_name())
} else {
None
}
});
let canonical_room_alias_server = once(
self.services
@@ -1158,7 +1210,11 @@ impl Service {
match response {
| Ok(response) => {
for pdu in response.pdus {
if let Err(e) = self.backfill_pdu(backfill_server, pdu).boxed().await {
if let Err(e) = self
.backfill_pdu(backfill_server, pdu)
.boxed()
.await
{
debug_warn!("Failed to add backfilled pdu in room {room_id}: {e}");
}
}
@@ -1176,8 +1232,11 @@ impl Service {
#[tracing::instrument(skip(self, pdu), level = "debug")]
pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box<RawJsonValue>) -> Result<()> {
let (room_id, event_id, value) =
self.services.event_handler.parse_incoming_pdu(&pdu).await?;
let (room_id, event_id, value) = self
.services
.event_handler
.parse_incoming_pdu(&pdu)
.await?;
// Lock so we cannot backfill the same pdu twice at the same time
let mutex_lock = self
@@ -1203,11 +1262,20 @@ impl Service {
let pdu = self.get_pdu(&event_id).await?;
let shortroomid = self.services.short.get_shortroomid(&room_id).await?;
let shortroomid = self
.services
.short
.get_shortroomid(&room_id)
.await?;
let insert_lock = self.mutex_insert.lock(&room_id).await;
let count: i64 = self.services.globals.next_count().unwrap().try_into()?;
let count: i64 = self
.services
.globals
.next_count()
.unwrap()
.try_into()?;
let pdu_id: RawPduId = PduId {
shortroomid,
@@ -1216,14 +1284,17 @@ impl Service {
.into();
// Insert pdu
self.db.prepend_backfill_pdu(&pdu_id, &event_id, &value);
self.db
.prepend_backfill_pdu(&pdu_id, &event_id, &value);
drop(insert_lock);
if pdu.kind == TimelineEventType::RoomMessage {
let content: ExtractBody = pdu.get_content()?;
if let Some(body) = content.body {
self.services.search.index_pdu(shortroomid, &pdu_id, &body);
self.services
.search
.index_pdu(shortroomid, &pdu_id, &body);
}
}
drop(mutex_lock);

View File

@@ -71,13 +71,18 @@ impl Service {
.await
.insert(room_id.to_owned(), self.services.globals.next_count()?);
if self.typing_update_sender.send(room_id.to_owned()).is_err() {
if self
.typing_update_sender
.send(room_id.to_owned())
.is_err()
{
trace!("receiver found what it was looking for and is no longer interested");
}
// update federation
if self.services.globals.user_is_local(user_id) {
self.federation_send(room_id, user_id, true).await?;
self.federation_send(room_id, user_id, true)
.await?;
}
Ok(())
@@ -99,13 +104,18 @@ impl Service {
.await
.insert(room_id.to_owned(), self.services.globals.next_count()?);
if self.typing_update_sender.send(room_id.to_owned()).is_err() {
if self
.typing_update_sender
.send(room_id.to_owned())
.is_err()
{
trace!("receiver found what it was looking for and is no longer interested");
}
// update federation
if self.services.globals.user_is_local(user_id) {
self.federation_send(room_id, user_id, false).await?;
self.federation_send(room_id, user_id, false)
.await?;
}
Ok(())
@@ -152,7 +162,11 @@ impl Service {
.await
.insert(room_id.to_owned(), self.services.globals.next_count()?);
if self.typing_update_sender.send(room_id.to_owned()).is_err() {
if self
.typing_update_sender
.send(room_id.to_owned())
.is_err()
{
trace!("receiver found what it was looking for and is no longer interested");
}
@@ -233,7 +247,10 @@ impl Service {
let mut buf = EduBuf::new();
serde_json::to_writer(&mut buf, &edu).expect("Serialized Edu::Typing");
self.services.sending.send_edu_room(room_id, buf).await?;
self.services
.sending
.send_edu_room(room_id, buf)
.await?;
Ok(())
}

View File

@@ -48,8 +48,12 @@ impl crate::Service for Service {
#[implement(Service)]
pub fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) {
let userroom_id = (user_id, room_id);
self.db.userroomid_highlightcount.put(userroom_id, 0_u64);
self.db.userroomid_notificationcount.put(userroom_id, 0_u64);
self.db
.userroomid_highlightcount
.put(userroom_id, 0_u64);
self.db
.userroomid_notificationcount
.put(userroom_id, 0_u64);
let roomuser_id = (room_id, user_id);
let count = self.services.globals.next_count().unwrap();
@@ -118,7 +122,11 @@ pub async fn get_token_shortstatehash(
room_id: &RoomId,
token: u64,
) -> Result<ShortStateHash> {
let shortroomid = self.services.short.get_shortroomid(room_id).await?;
let shortroomid = self
.services
.short
.get_shortroomid(room_id)
.await?;
let key: &[u64] = &[shortroomid, token];
self.db