Rebox future segmentations.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-07-08 11:48:45 +00:00
parent aac49b09c7
commit 18f8d6c65c
14 changed files with 43 additions and 32 deletions

View File

@@ -1,5 +1,6 @@
use std::collections::BTreeMap;
use futures::FutureExt;
use ruma::{
RoomId, UserId,
events::{
@@ -239,6 +240,7 @@ pub async fn revoke_admin(&self, user_id: &UserId) -> Result {
&room_id,
&state_lock,
)
.boxed()
.await
.map(|_| ())
}

View File

@@ -321,6 +321,7 @@ impl Service {
.await
{
self.handle_response_error(e, room_id, user_id, &state_lock)
.boxed()
.await
.unwrap_or_else(default_log);
}
@@ -344,6 +345,7 @@ impl Service {
self.services
.timeline
.build_and_append_pdu(PduBuilder::timeline(&content), user_id, room_id, state_lock)
.boxed()
.await?;
Ok(())

View File

@@ -7,7 +7,7 @@ use std::{
time::Instant,
};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, pin_mut};
use ruma::{EventId, OwnedEventId, RoomId};
use tuwunel_core::{
Err, Result, at, debug, debug_error, implement, trace,
@@ -80,13 +80,13 @@ where
const BUCKET: Bucket<'_> = BTreeSet::new();
let started = Instant::now();
let mut starting_ids = self
let starting_ids = self
.services
.short
.multi_get_or_create_shorteventid(starting_events.clone())
.zip(starting_events.clone().stream())
.boxed();
.zip(starting_events.clone().stream());
pin_mut!(starting_ids);
let mut buckets = [BUCKET; NUM_BUCKETS];
while let Some((short, starting_event)) = starting_ids.next().await {
let bucket: usize = short.try_into()?;

View File

@@ -112,6 +112,7 @@ pub async fn handle_incoming_pdu<'a>(
let (incoming_pdu, val) = self
.handle_outlier_pdu(origin, create_event, event_id, room_id, value, false)
.boxed()
.await?;
// 8. if not timeline event: stop

View File

@@ -61,6 +61,7 @@ where
.await?
} else {
self.state_at_incoming_resolved(&incoming_pdu, room_id, &room_version_id)
.boxed()
.await?
};
@@ -215,6 +216,7 @@ where
let new_room_state = self
.resolve_state(room_id, &room_version_id, state_after)
.boxed()
.await?;
// Set the new room state to the resolved state

View File

@@ -118,14 +118,11 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
},
)
.await;
match response {
| Ok(response) => {
for pdu in response.pdus {
if let Err(e) = self
.backfill_pdu(backfill_server, pdu)
.boxed()
.await
{
if let Err(e) = self.backfill_pdu(backfill_server, pdu).await {
debug_warn!("Failed to add backfilled pdu in room {room_id}: {e}");
}
}

View File

@@ -253,9 +253,11 @@ impl Service {
) {
let keep =
usize::try_from(self.server.config.startup_netburst_keep).unwrap_or(usize::MAX);
let mut txns = HashMap::<Destination, Vec<SendingEvent>>::new();
let mut active = self.db.active_requests().boxed();
let mut txns = HashMap::<Destination, Vec<SendingEvent>>::new();
let active = self.db.active_requests();
pin_mut!(active);
while let Some((key, event, dest)) = active.next().await {
if self.shard_id(&dest) != id {
continue;
@@ -505,6 +507,7 @@ impl Service {
.then_some((room_id, receipt_map))
})
.collect()
.boxed()
.await;
if receipts.is_empty() {