2025-10-05 19:48:07 +00:00
|
|
|
mod account_data;
|
|
|
|
|
mod e2ee;
|
2025-10-07 21:35:42 +00:00
|
|
|
mod filter;
|
2025-10-05 19:48:07 +00:00
|
|
|
mod receipts;
|
|
|
|
|
mod room;
|
2025-10-07 21:35:42 +00:00
|
|
|
mod selector;
|
2025-10-05 19:48:07 +00:00
|
|
|
mod to_device;
|
|
|
|
|
mod typing;
|
|
|
|
|
|
2025-10-07 21:35:42 +00:00
|
|
|
use std::{collections::BTreeMap, fmt::Debug, time::Duration};
|
2025-01-03 08:32:54 +01:00
|
|
|
|
|
|
|
|
use axum::extract::State;
|
2025-04-06 21:59:18 +00:00
|
|
|
use futures::{
|
2025-10-07 21:35:42 +00:00
|
|
|
FutureExt, TryFutureExt, TryStreamExt,
|
|
|
|
|
future::{OptionFuture, join, join5, try_join},
|
2025-04-06 21:59:18 +00:00
|
|
|
};
|
2025-01-03 08:32:54 +01:00
|
|
|
use ruma::{
|
2025-10-07 21:35:42 +00:00
|
|
|
DeviceId, OwnedRoomId, RoomId, UserId,
|
2025-10-05 19:48:07 +00:00
|
|
|
api::client::sync::sync_events::v5::{
|
2025-10-07 21:35:42 +00:00
|
|
|
ListId, Request, Response, request::ExtensionRoomConfig, response,
|
2025-06-29 03:33:29 +00:00
|
|
|
},
|
2025-10-06 01:12:01 +00:00
|
|
|
events::room::member::MembershipState,
|
2025-01-03 08:32:54 +01:00
|
|
|
};
|
2025-09-04 22:33:42 +00:00
|
|
|
use tokio::time::{Instant, timeout_at};
|
2025-04-22 01:41:02 +00:00
|
|
|
use tuwunel_core::{
|
2025-10-22 20:30:43 +00:00
|
|
|
Err, Result, apply, at, debug,
|
2025-10-07 21:35:42 +00:00
|
|
|
debug::INFO_SPAN_LEVEL,
|
2025-10-25 13:14:23 +00:00
|
|
|
debug_warn,
|
2025-10-04 08:50:21 +00:00
|
|
|
error::inspect_log,
|
2025-10-07 21:35:42 +00:00
|
|
|
extract_variant,
|
|
|
|
|
smallvec::SmallVec,
|
2025-10-05 19:48:07 +00:00
|
|
|
trace,
|
2025-04-22 01:41:02 +00:00
|
|
|
utils::{
|
2025-10-07 21:35:42 +00:00
|
|
|
BoolExt, IterStream, TryFutureExtExt,
|
2025-09-04 22:33:42 +00:00
|
|
|
result::FlatOk,
|
2025-10-07 21:35:42 +00:00
|
|
|
stream::{TryBroadbandExt, TryReadyExt},
|
2025-04-22 01:41:02 +00:00
|
|
|
},
|
|
|
|
|
};
|
2025-09-04 22:33:42 +00:00
|
|
|
use tuwunel_service::{
|
|
|
|
|
Services,
|
2025-10-07 21:35:42 +00:00
|
|
|
sync::{Connection, into_connection_key},
|
2025-09-04 22:33:42 +00:00
|
|
|
};
|
2025-01-03 08:32:54 +01:00
|
|
|
|
2025-10-07 21:35:42 +00:00
|
|
|
use self::{
|
|
|
|
|
filter::{filter_room, filter_room_meta},
|
|
|
|
|
selector::selector,
|
|
|
|
|
};
|
2025-04-06 21:59:18 +00:00
|
|
|
use super::share_encrypted_room;
|
2025-10-07 21:35:42 +00:00
|
|
|
use crate::Ruma;
|
2025-01-03 08:32:54 +01:00
|
|
|
|
2025-10-06 01:12:01 +00:00
|
|
|
#[derive(Copy, Clone)]
|
|
|
|
|
struct SyncInfo<'a> {
|
2025-10-07 21:35:42 +00:00
|
|
|
services: &'a Services,
|
2025-10-06 01:12:01 +00:00
|
|
|
sender_user: &'a UserId,
|
|
|
|
|
sender_device: &'a DeviceId,
|
|
|
|
|
}
|
|
|
|
|
|
2025-10-07 21:35:42 +00:00
|
|
|
#[derive(Clone, Debug)]
|
|
|
|
|
struct WindowRoom {
|
|
|
|
|
room_id: OwnedRoomId,
|
|
|
|
|
membership: Option<MembershipState>,
|
|
|
|
|
lists: ListIds,
|
|
|
|
|
ranked: usize,
|
|
|
|
|
last_count: u64,
|
2025-10-06 01:12:01 +00:00
|
|
|
}
|
|
|
|
|
|
2025-10-07 21:35:42 +00:00
|
|
|
type Window = BTreeMap<OwnedRoomId, WindowRoom>;
|
2025-10-06 01:12:01 +00:00
|
|
|
type ResponseLists = BTreeMap<ListId, response::List>;
|
2025-10-07 21:35:42 +00:00
|
|
|
type ListIds = SmallVec<[ListId; 1]>;
|
2025-01-03 08:32:54 +01:00
|
|
|
|
|
|
|
|
/// `POST /_matrix/client/unstable/org.matrix.simplified_msc3575/sync`
|
|
|
|
|
/// ([MSC4186])
|
|
|
|
|
///
|
|
|
|
|
/// A simplified version of sliding sync ([MSC3575]).
|
|
|
|
|
///
|
|
|
|
|
/// Get all new events in a sliding window of rooms since the last sync or a
|
|
|
|
|
/// given point in time.
|
|
|
|
|
///
|
|
|
|
|
/// [MSC3575]: https://github.com/matrix-org/matrix-spec-proposals/pull/3575
|
|
|
|
|
/// [MSC4186]: https://github.com/matrix-org/matrix-spec-proposals/pull/4186
|
2025-09-04 22:33:42 +00:00
|
|
|
#[tracing::instrument(
|
|
|
|
|
name = "sync",
|
2025-10-07 21:35:42 +00:00
|
|
|
level = INFO_SPAN_LEVEL,
|
2025-09-04 22:33:42 +00:00
|
|
|
skip_all,
|
|
|
|
|
fields(
|
2025-10-07 21:35:42 +00:00
|
|
|
user_id = %body.sender_user().localpart(),
|
2025-09-04 22:33:42 +00:00
|
|
|
device_id = %body.sender_device(),
|
2025-10-07 21:35:42 +00:00
|
|
|
conn_id = ?body.body.conn_id.clone().unwrap_or_default(),
|
2025-10-22 07:06:00 +00:00
|
|
|
since = ?body.body.pos.clone().unwrap_or_default(),
|
2025-09-04 22:33:42 +00:00
|
|
|
)
|
|
|
|
|
)]
|
2025-01-03 08:32:54 +01:00
|
|
|
pub(crate) async fn sync_events_v5_route(
|
2025-04-06 21:59:18 +00:00
|
|
|
State(ref services): State<crate::State>,
|
2025-10-07 21:35:42 +00:00
|
|
|
body: Ruma<Request>,
|
2025-09-04 22:33:42 +00:00
|
|
|
) -> Result<Response> {
|
2025-10-21 05:25:27 +00:00
|
|
|
let (sender_user, sender_device) = body.sender();
|
2025-10-07 21:35:42 +00:00
|
|
|
let request = &body.body;
|
|
|
|
|
let since = request
|
2025-01-03 08:32:54 +01:00
|
|
|
.pos
|
|
|
|
|
.as_ref()
|
|
|
|
|
.and_then(|string| string.parse().ok())
|
|
|
|
|
.unwrap_or(0);
|
|
|
|
|
|
2025-10-21 05:25:27 +00:00
|
|
|
let timeout = request
|
|
|
|
|
.timeout
|
|
|
|
|
.as_ref()
|
|
|
|
|
.map(Duration::as_millis)
|
|
|
|
|
.map(TryInto::try_into)
|
|
|
|
|
.flat_ok()
|
2025-10-22 20:53:04 +00:00
|
|
|
.map(|timeout: u64| {
|
|
|
|
|
timeout
|
|
|
|
|
.max(services.config.client_sync_timeout_min)
|
|
|
|
|
.min(services.config.client_sync_timeout_max)
|
|
|
|
|
})
|
|
|
|
|
.unwrap_or(0);
|
2025-10-21 05:25:27 +00:00
|
|
|
|
2025-10-06 01:12:01 +00:00
|
|
|
let conn_key = into_connection_key(sender_user, sender_device, request.conn_id.as_deref());
|
2025-10-25 13:14:23 +00:00
|
|
|
let conn_val = services
|
|
|
|
|
.sync
|
|
|
|
|
.load_or_init_connection(&conn_key)
|
|
|
|
|
.await;
|
2025-10-06 01:12:01 +00:00
|
|
|
|
2025-10-07 21:35:42 +00:00
|
|
|
let conn = conn_val.lock();
|
|
|
|
|
let ping_presence = services
|
|
|
|
|
.presence
|
|
|
|
|
.maybe_ping_presence(sender_user, &request.set_presence)
|
|
|
|
|
.inspect_err(inspect_log)
|
|
|
|
|
.ok();
|
2025-01-03 08:32:54 +01:00
|
|
|
|
2025-10-07 21:35:42 +00:00
|
|
|
let (mut conn, _) = join(conn, ping_presence).await;
|
2025-01-03 08:32:54 +01:00
|
|
|
|
2025-10-25 13:14:23 +00:00
|
|
|
if since != 0 && conn.next_batch == 0 {
|
|
|
|
|
return Err!(Request(UnknownPos(warn!("Connection lost; restarting sync stream."))));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if since == 0 {
|
|
|
|
|
*conn = Connection::default();
|
|
|
|
|
conn.store(&services.sync, &conn_key);
|
|
|
|
|
debug_warn!(?conn_key, "Client cleared cache and reloaded.");
|
|
|
|
|
}
|
|
|
|
|
|
2025-10-21 05:25:27 +00:00
|
|
|
let advancing = since == conn.next_batch;
|
2025-10-25 13:14:23 +00:00
|
|
|
let retarding = since != 0 && since <= conn.globalsince;
|
2025-10-24 18:30:53 +00:00
|
|
|
if !advancing && !retarding {
|
2025-10-25 13:14:23 +00:00
|
|
|
return Err!(Request(UnknownPos(warn!(
|
|
|
|
|
"Requesting unknown or invalid stream position."
|
|
|
|
|
))));
|
2025-10-15 21:59:01 +00:00
|
|
|
}
|
|
|
|
|
|
2025-10-21 05:25:27 +00:00
|
|
|
debug_assert!(
|
2025-10-24 18:30:53 +00:00
|
|
|
advancing || retarding,
|
|
|
|
|
"Request should either be advancing or replaying the since token."
|
2025-10-21 05:25:27 +00:00
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// Update parameters regardless of replay or advance
|
2025-10-22 07:06:00 +00:00
|
|
|
conn.next_batch = services.globals.wait_pending().await?;
|
2025-10-22 20:53:04 +00:00
|
|
|
conn.globalsince = since.min(conn.next_batch);
|
2025-10-07 21:35:42 +00:00
|
|
|
conn.update_cache(request);
|
2025-10-24 18:30:53 +00:00
|
|
|
conn.update_rooms_prologue(retarding.then_some(since));
|
2025-10-06 01:12:01 +00:00
|
|
|
|
2025-09-04 22:33:42 +00:00
|
|
|
let mut response = Response {
|
|
|
|
|
txn_id: request.txn_id.clone(),
|
2025-10-07 21:35:42 +00:00
|
|
|
lists: Default::default(),
|
2025-10-06 01:12:01 +00:00
|
|
|
pos: Default::default(),
|
2025-09-04 22:33:42 +00:00
|
|
|
rooms: Default::default(),
|
|
|
|
|
extensions: Default::default(),
|
|
|
|
|
};
|
2025-01-03 08:32:54 +01:00
|
|
|
|
2025-10-22 20:53:04 +00:00
|
|
|
let stop_at = Instant::now()
|
|
|
|
|
.checked_add(Duration::from_millis(timeout))
|
|
|
|
|
.expect("configuration must limit maximum timeout");
|
|
|
|
|
|
2025-10-22 20:30:43 +00:00
|
|
|
let sync_info = SyncInfo { services, sender_user, sender_device };
|
2025-09-04 22:33:42 +00:00
|
|
|
loop {
|
2025-10-21 05:25:27 +00:00
|
|
|
debug_assert!(
|
|
|
|
|
conn.globalsince <= conn.next_batch,
|
2025-10-21 15:48:05 +00:00
|
|
|
"since should not be greater than next_batch."
|
2025-10-21 05:25:27 +00:00
|
|
|
);
|
|
|
|
|
|
2025-10-07 21:35:42 +00:00
|
|
|
let window;
|
2025-10-22 20:53:04 +00:00
|
|
|
let watchers = services.sync.watch(
|
|
|
|
|
sender_user,
|
|
|
|
|
sender_device,
|
|
|
|
|
services.state_cache.rooms_joined(sender_user),
|
|
|
|
|
);
|
2025-01-03 08:32:54 +01:00
|
|
|
|
2025-10-07 21:35:42 +00:00
|
|
|
conn.next_batch = services.globals.wait_pending().await?;
|
2025-10-22 07:06:00 +00:00
|
|
|
(window, response.lists) = selector(&mut conn, sync_info).boxed().await;
|
2025-10-07 21:35:42 +00:00
|
|
|
if conn.globalsince < conn.next_batch {
|
2025-10-22 20:53:04 +00:00
|
|
|
let rooms = handle_rooms(sync_info, &conn, &window)
|
|
|
|
|
.map_ok(|response_rooms| response.rooms = response_rooms);
|
2025-04-06 21:59:18 +00:00
|
|
|
|
2025-10-07 21:35:42 +00:00
|
|
|
let extensions = handle_extensions(sync_info, &conn, &window)
|
2025-10-22 20:53:04 +00:00
|
|
|
.map_ok(|response_extensions| response.extensions = response_extensions);
|
2025-04-06 21:59:18 +00:00
|
|
|
|
2025-09-04 22:33:42 +00:00
|
|
|
try_join(rooms, extensions).boxed().await?;
|
2025-04-06 21:59:18 +00:00
|
|
|
|
2025-10-21 05:25:27 +00:00
|
|
|
conn.update_rooms_epilogue(window.keys().map(AsRef::as_ref));
|
2025-10-07 21:35:42 +00:00
|
|
|
|
2025-09-04 22:33:42 +00:00
|
|
|
if !is_empty_response(&response) {
|
2025-10-07 21:35:42 +00:00
|
|
|
response.pos = conn.next_batch.to_string().into();
|
|
|
|
|
trace!(conn.globalsince, conn.next_batch, "response {response:?}");
|
2025-10-25 13:14:23 +00:00
|
|
|
conn.store(&services.sync, &conn_key);
|
2025-09-04 22:33:42 +00:00
|
|
|
return Ok(response);
|
|
|
|
|
}
|
2025-01-03 08:32:54 +01:00
|
|
|
}
|
2025-04-06 21:59:18 +00:00
|
|
|
|
2025-10-22 20:53:04 +00:00
|
|
|
if timeout == 0
|
|
|
|
|
|| services.server.is_stopping()
|
|
|
|
|
|| timeout_at(stop_at, watchers)
|
|
|
|
|
.boxed()
|
|
|
|
|
.await
|
|
|
|
|
.is_err()
|
|
|
|
|
{
|
2025-10-07 21:35:42 +00:00
|
|
|
response.pos = conn.next_batch.to_string().into();
|
2025-10-22 20:30:43 +00:00
|
|
|
trace!(conn.globalsince, conn.next_batch, "timeout; empty response {response:?}");
|
2025-10-25 13:14:23 +00:00
|
|
|
conn.store(&services.sync, &conn_key);
|
2025-09-04 22:33:42 +00:00
|
|
|
return Ok(response);
|
|
|
|
|
}
|
2025-01-03 08:32:54 +01:00
|
|
|
|
2025-10-22 20:30:43 +00:00
|
|
|
debug!(
|
|
|
|
|
?timeout,
|
|
|
|
|
last_since = conn.globalsince,
|
|
|
|
|
last_batch = conn.next_batch,
|
|
|
|
|
pend_count = ?services.globals.pending_count(),
|
2025-09-04 22:33:42 +00:00
|
|
|
"notified by watcher"
|
2025-01-03 08:32:54 +01:00
|
|
|
);
|
2025-09-04 22:33:42 +00:00
|
|
|
|
2025-10-07 21:35:42 +00:00
|
|
|
conn.globalsince = conn.next_batch;
|
2025-01-03 08:32:54 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2025-09-04 22:33:42 +00:00
|
|
|
fn is_empty_response(response: &Response) -> bool {
|
2025-10-22 07:06:00 +00:00
|
|
|
response.extensions.is_empty() && response.rooms.is_empty()
|
2025-09-04 22:33:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tracing::instrument(
|
2025-10-07 21:35:42 +00:00
|
|
|
name = "rooms",
|
2025-09-04 22:33:42 +00:00
|
|
|
level = "debug",
|
|
|
|
|
skip_all,
|
|
|
|
|
fields(
|
2025-10-07 21:35:42 +00:00
|
|
|
next_batch = conn.next_batch,
|
|
|
|
|
window = window.len(),
|
2025-09-04 22:33:42 +00:00
|
|
|
)
|
|
|
|
|
)]
|
2025-10-06 01:12:01 +00:00
|
|
|
async fn handle_rooms(
|
|
|
|
|
sync_info: SyncInfo<'_>,
|
2025-10-07 21:35:42 +00:00
|
|
|
conn: &Connection,
|
|
|
|
|
window: &Window,
|
2025-10-06 01:12:01 +00:00
|
|
|
) -> Result<BTreeMap<OwnedRoomId, response::Room>> {
|
2025-10-07 21:35:42 +00:00
|
|
|
window
|
2025-09-04 22:33:42 +00:00
|
|
|
.iter()
|
|
|
|
|
.try_stream()
|
2025-10-07 21:35:42 +00:00
|
|
|
.broad_and_then(async |(room_id, room)| {
|
|
|
|
|
room::handle(sync_info, conn, room)
|
|
|
|
|
.map_ok(|room| (room_id, room))
|
|
|
|
|
.await
|
2025-09-04 22:33:42 +00:00
|
|
|
})
|
|
|
|
|
.ready_try_filter_map(|(room_id, room)| Ok(room.map(|room| (room_id, room))))
|
|
|
|
|
.map_ok(|(room_id, room)| (room_id.to_owned(), room))
|
|
|
|
|
.try_collect()
|
2025-10-07 21:35:42 +00:00
|
|
|
.await
|
2025-09-04 22:33:42 +00:00
|
|
|
}
|
2025-01-03 08:32:54 +01:00
|
|
|
|
2025-09-04 22:33:42 +00:00
|
|
|
#[tracing::instrument(
|
2025-10-07 21:35:42 +00:00
|
|
|
name = "extensions",
|
2025-09-04 22:33:42 +00:00
|
|
|
level = "debug",
|
|
|
|
|
skip_all,
|
|
|
|
|
fields(
|
2025-10-07 21:35:42 +00:00
|
|
|
next_batch = conn.next_batch,
|
|
|
|
|
window = window.len(),
|
|
|
|
|
rooms = conn.rooms.len(),
|
|
|
|
|
subs = conn.subscriptions.len(),
|
2025-09-04 22:33:42 +00:00
|
|
|
)
|
|
|
|
|
)]
|
2025-10-04 02:36:17 +00:00
|
|
|
async fn handle_extensions(
|
2025-09-04 22:33:42 +00:00
|
|
|
sync_info: SyncInfo<'_>,
|
2025-10-07 21:35:42 +00:00
|
|
|
conn: &Connection,
|
|
|
|
|
window: &Window,
|
2025-10-04 02:36:17 +00:00
|
|
|
) -> Result<response::Extensions> {
|
2025-10-07 21:35:42 +00:00
|
|
|
let SyncInfo { .. } = sync_info;
|
2025-01-03 08:32:54 +01:00
|
|
|
|
2025-10-07 21:35:42 +00:00
|
|
|
let account_data: OptionFuture<_> = conn
|
2025-04-22 04:42:26 +00:00
|
|
|
.extensions
|
|
|
|
|
.account_data
|
|
|
|
|
.enabled
|
|
|
|
|
.unwrap_or(false)
|
2025-10-07 21:35:42 +00:00
|
|
|
.then(|| account_data::collect(sync_info, conn, window))
|
2025-09-04 22:33:42 +00:00
|
|
|
.into();
|
|
|
|
|
|
2025-10-07 21:35:42 +00:00
|
|
|
let receipts: OptionFuture<_> = conn
|
2025-09-04 22:33:42 +00:00
|
|
|
.extensions
|
|
|
|
|
.receipts
|
|
|
|
|
.enabled
|
|
|
|
|
.unwrap_or(false)
|
2025-10-07 21:35:42 +00:00
|
|
|
.then(|| receipts::collect(sync_info, conn, window))
|
2025-09-04 22:33:42 +00:00
|
|
|
.into();
|
|
|
|
|
|
2025-10-07 21:35:42 +00:00
|
|
|
let typing: OptionFuture<_> = conn
|
2025-09-04 22:33:42 +00:00
|
|
|
.extensions
|
|
|
|
|
.typing
|
|
|
|
|
.enabled
|
|
|
|
|
.unwrap_or(false)
|
2025-10-07 21:35:42 +00:00
|
|
|
.then(|| typing::collect(sync_info, conn, window))
|
2025-09-04 22:33:42 +00:00
|
|
|
.into();
|
|
|
|
|
|
2025-10-07 21:35:42 +00:00
|
|
|
let to_device: OptionFuture<_> = conn
|
2025-09-04 22:33:42 +00:00
|
|
|
.extensions
|
|
|
|
|
.to_device
|
|
|
|
|
.enabled
|
|
|
|
|
.unwrap_or(false)
|
2025-10-07 21:35:42 +00:00
|
|
|
.then(|| to_device::collect(sync_info, conn))
|
2025-09-04 22:33:42 +00:00
|
|
|
.into();
|
|
|
|
|
|
2025-10-07 21:35:42 +00:00
|
|
|
let e2ee: OptionFuture<_> = conn
|
2025-09-04 22:33:42 +00:00
|
|
|
.extensions
|
|
|
|
|
.e2ee
|
|
|
|
|
.enabled
|
|
|
|
|
.unwrap_or(false)
|
2025-10-07 21:35:42 +00:00
|
|
|
.then(|| e2ee::collect(sync_info, conn))
|
2025-09-04 22:33:42 +00:00
|
|
|
.into();
|
|
|
|
|
|
|
|
|
|
let (account_data, receipts, typing, to_device, e2ee) =
|
|
|
|
|
join5(account_data, receipts, typing, to_device, e2ee)
|
|
|
|
|
.map(apply!(5, |t: Option<_>| t.unwrap_or(Ok(Default::default()))))
|
|
|
|
|
.await;
|
|
|
|
|
|
|
|
|
|
Ok(response::Extensions {
|
|
|
|
|
account_data: account_data?,
|
|
|
|
|
receipts: receipts?,
|
|
|
|
|
typing: typing?,
|
|
|
|
|
to_device: to_device?,
|
|
|
|
|
e2ee: e2ee?,
|
|
|
|
|
})
|
|
|
|
|
}
|
2025-01-03 08:32:54 +01:00
|
|
|
|
2025-10-07 21:35:42 +00:00
|
|
|
#[tracing::instrument(
|
|
|
|
|
name = "selector",
|
|
|
|
|
level = "trace",
|
|
|
|
|
skip_all,
|
|
|
|
|
fields(?implicit, ?explicit),
|
|
|
|
|
)]
|
|
|
|
|
fn extension_rooms_selector<'a, ListIter, SubsIter>(
|
|
|
|
|
SyncInfo { .. }: SyncInfo<'a>,
|
|
|
|
|
conn: &'a Connection,
|
|
|
|
|
window: &'a Window,
|
|
|
|
|
implicit: Option<ListIter>,
|
|
|
|
|
explicit: Option<SubsIter>,
|
2025-10-06 01:12:01 +00:00
|
|
|
) -> impl Iterator<Item = &'a RoomId> + Send + Sync + 'a
|
|
|
|
|
where
|
2025-10-07 21:35:42 +00:00
|
|
|
ListIter: Iterator<Item = &'a ListId> + Clone + Debug + Send + Sync + 'a,
|
|
|
|
|
SubsIter: Iterator<Item = &'a ExtensionRoomConfig> + Clone + Debug + Send + Sync + 'a,
|
2025-10-06 01:12:01 +00:00
|
|
|
{
|
2025-10-07 21:35:42 +00:00
|
|
|
let has_all_subscribed = explicit
|
2025-10-06 01:12:01 +00:00
|
|
|
.clone()
|
2025-09-04 22:33:42 +00:00
|
|
|
.into_iter()
|
2025-10-06 01:12:01 +00:00
|
|
|
.flatten()
|
2025-10-07 21:35:42 +00:00
|
|
|
.any(|erc| matches!(erc, ExtensionRoomConfig::AllSubscribed));
|
|
|
|
|
|
|
|
|
|
let all_subscribed = has_all_subscribed
|
|
|
|
|
.then(|| conn.subscriptions.keys())
|
|
|
|
|
.into_iter()
|
|
|
|
|
.flatten()
|
|
|
|
|
.map(AsRef::as_ref);
|
|
|
|
|
|
|
|
|
|
let rooms_explicit = has_all_subscribed
|
|
|
|
|
.is_false()
|
|
|
|
|
.then(move || {
|
|
|
|
|
explicit
|
2025-09-04 22:33:42 +00:00
|
|
|
.into_iter()
|
2025-10-07 21:35:42 +00:00
|
|
|
.flatten()
|
|
|
|
|
.filter_map(|erc| extract_variant!(erc, ExtensionRoomConfig::Room))
|
|
|
|
|
.map(AsRef::as_ref)
|
2025-07-24 05:34:22 +00:00
|
|
|
})
|
2025-10-07 21:35:42 +00:00
|
|
|
.into_iter()
|
|
|
|
|
.flatten();
|
|
|
|
|
|
|
|
|
|
let rooms_selected = window
|
|
|
|
|
.iter()
|
|
|
|
|
.filter(move |(_, room)| {
|
|
|
|
|
implicit.as_ref().is_none_or(|lists| {
|
|
|
|
|
lists
|
|
|
|
|
.clone()
|
|
|
|
|
.any(|list| room.lists.contains(list))
|
|
|
|
|
})
|
|
|
|
|
})
|
|
|
|
|
.map(at!(0))
|
|
|
|
|
.map(AsRef::as_ref);
|
|
|
|
|
|
|
|
|
|
all_subscribed
|
2025-09-04 22:33:42 +00:00
|
|
|
.chain(rooms_explicit)
|
2025-10-07 21:35:42 +00:00
|
|
|
.chain(rooms_selected)
|
2025-04-06 21:59:18 +00:00
|
|
|
}
|