Merge pull request #3726 from element-hq/robin/generate-items-logs

Add debug logs to generateItems
This commit is contained in:
Robin
2026-02-06 13:45:13 +01:00
committed by GitHub
6 changed files with 54 additions and 27 deletions

View File

@@ -715,6 +715,7 @@ export function createCallViewModel$(
// Generate a collection of MediaItems from the list of expected (whether // Generate a collection of MediaItems from the list of expected (whether
// present or missing) LiveKit participants. // present or missing) LiveKit participants.
generateItems( generateItems(
"CallViewModel userMedia$",
function* ([ function* ([
localMatrixLivekitMember, localMatrixLivekitMember,
matrixLivekitMembers, matrixLivekitMembers,

View File

@@ -162,6 +162,7 @@ export function createConnectionManager$({
const connections$ = scope.behavior( const connections$ = scope.behavior(
localAndRemoteTransports$.pipe( localAndRemoteTransports$.pipe(
generateItemsWithEpoch( generateItemsWithEpoch(
"ConnectionManager connections$",
function* (transports) { function* (transports) {
for (const transportWithOrWithoutSfuConfig of transports) { for (const transportWithOrWithoutSfuConfig of transports) {
if ( if (

View File

@@ -11,7 +11,6 @@ import {
type LivekitTransportConfig, type LivekitTransportConfig,
} from "matrix-js-sdk/lib/matrixrtc"; } from "matrix-js-sdk/lib/matrixrtc";
import { combineLatest, filter, map } from "rxjs"; import { combineLatest, filter, map } from "rxjs";
import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
import { type Behavior } from "../../Behavior"; import { type Behavior } from "../../Behavior";
import { type IConnectionManager } from "./ConnectionManager"; import { type IConnectionManager } from "./ConnectionManager";
@@ -19,8 +18,6 @@ import { Epoch, type ObservableScope } from "../../ObservableScope";
import { type Connection } from "./Connection"; import { type Connection } from "./Connection";
import { generateItemsWithEpoch } from "../../../utils/observable"; import { generateItemsWithEpoch } from "../../../utils/observable";
const logger = rootLogger.getChild("[MatrixLivekitMembers]");
interface LocalTaggedParticipant { interface LocalTaggedParticipant {
type: "local"; type: "local";
value$: Behavior<LocalParticipant | null>; value$: Behavior<LocalParticipant | null>;
@@ -94,9 +91,10 @@ export function createMatrixLivekitMembers$({
), ),
map(([ms, data]) => new Epoch([ms.value, data.value] as const, ms.epoch)), map(([ms, data]) => new Epoch([ms.value, data.value] as const, ms.epoch)),
generateItemsWithEpoch( generateItemsWithEpoch(
"MatrixLivekitMembers",
// Generator function. // Generator function.
// creates an array of `{key, data}[]` // creates an array of `{key, data}[]`
// Each change in the keys (new key, missing key) will result in a call to the factory function. // Each change in the keys (new key) will result in a call to the factory function.
function* ([membershipsWithTransport, managerData]) { function* ([membershipsWithTransport, managerData]) {
for (const { membership, transport } of membershipsWithTransport) { for (const { membership, transport } of membershipsWithTransport) {
const participants = transport const participants = transport
@@ -111,26 +109,23 @@ export function createMatrixLivekitMembers$({
: null; : null;
yield { yield {
// This could also just be the memberId without the other fields. // This could just be the backend identity without the other keys.
// In theory we should never have the same memberId for different userIds (they are UUIDs) // The user ID, device ID, and member ID are included however so
// This still makes us resilient agains someone who intentionally tries to use the same memberId. // they show up in debug logs.
// If they want to do this they would now need to also use the same sender which is impossible.
keys: [ keys: [
membership.userId, membership.userId,
membership.deviceId, membership.deviceId,
membership.memberId, membership.memberId,
membership.rtcBackendIdentity,
], ],
data: { membership, participant, connection }, data: { membership, participant, connection },
}; };
} }
}, },
// Each update where the key of the generator array do not change will result in updates to the `data$` observable in the factory. // Each update where the key of the generator array do not change will result in updates to the `data$` behavior.
(scope, data$, userId, deviceId, memberId) => { (scope, data$, userId, _deviceId, _memberId, _rtcBackendIdentity) => {
logger.debug(
`Generating member for livekitIdentity: ${data$.value.membership.rtcBackendIdentity},keys userId:deviceId:memberId ${userId}:${deviceId}:${memberId}`,
);
const { participant$, ...rest } = scope.splitBehavior(data$); const { participant$, ...rest } = scope.splitBehavior(data$);
// will only get called once per `participantId, userId` pair. // will only get called once per backend identity.
// updates to data$ and as a result to displayName$ and mxcAvatarUrl$ are more frequent. // updates to data$ and as a result to displayName$ and mxcAvatarUrl$ are more frequent.
return { return {
userId, userId,

View File

@@ -130,6 +130,7 @@ export class UserMedia {
// MediaViewModels don't support it though since they look for a unique // MediaViewModels don't support it though since they look for a unique
// track for the given source. So generateItems here is a bit overkill. // track for the given source. So generateItems here is a bit overkill.
generateItems( generateItems(
`${this.id} screenShares$`,
function* (p) { function* (p) {
if (p.isScreenShareEnabled) if (p.isScreenShareEnabled)
yield { yield {

View File

@@ -47,6 +47,7 @@ test("generateItems", () => {
expectObservable( expectObservable(
hot<string>(inputMarbles).pipe( hot<string>(inputMarbles).pipe(
generateItems( generateItems(
"test items",
function* (input) { function* (input) {
for (let i = 1; i <= +input; i++) { for (let i = 1; i <= +input; i++) {
yield { keys: [i], data: undefined }; yield { keys: [i], data: undefined };

View File

@@ -24,6 +24,7 @@ import {
type OperatorFunction, type OperatorFunction,
distinctUntilChanged, distinctUntilChanged,
} from "rxjs"; } from "rxjs";
import { logger } from "matrix-js-sdk/lib/logger";
import { type Behavior } from "../state/Behavior"; import { type Behavior } from "../state/Behavior";
import { Epoch, ObservableScope } from "../state/ObservableScope"; import { Epoch, ObservableScope } from "../state/ObservableScope";
@@ -122,8 +123,9 @@ export function pauseWhen<T>(pause$: Behavior<boolean>) {
); );
} }
interface ItemHandle<Data, Item> { interface ItemHandle<Keys extends unknown[], Data, Item> {
scope: ObservableScope; scope: ObservableScope;
keys: readonly [...Keys];
data$: BehaviorSubject<Data>; data$: BehaviorSubject<Data>;
item: Item; item: Item;
} }
@@ -135,6 +137,7 @@ interface ItemHandle<Data, Item> {
* requested at a later time, and destroyed (have their scope ended) when the * requested at a later time, and destroyed (have their scope ended) when the
* key is no longer requested. * key is no longer requested.
* *
* @param name A name for this collection to use in debug logs.
* @param generator A generator function yielding a tuple of keys and the * @param generator A generator function yielding a tuple of keys and the
* currently associated data for each item that it wants to exist. * currently associated data for each item that it wants to exist.
* @param factory A function constructing an individual item, given the item's key, * @param factory A function constructing an individual item, given the item's key,
@@ -146,16 +149,17 @@ export function generateItems<
Data, Data,
Item, Item,
>( >(
name: string,
generator: ( generator: (
input: Input, input: Input,
) => Generator<{ keys: readonly [...Keys]; data: Data }, void, void>, ) => Iterable<{ keys: readonly [...Keys]; data: Data }, void, void>,
factory: ( factory: (
scope: ObservableScope, scope: ObservableScope,
data$: Behavior<Data>, data$: Behavior<Data>,
...keys: Keys ...keys: Keys
) => Item, ) => Item,
): OperatorFunction<Input, Item[]> { ): OperatorFunction<Input, Item[]> {
return generateItemsInternal(generator, factory, (items) => items); return generateItemsInternal(name, generator, factory, (items) => items);
} }
/** /**
@@ -167,9 +171,10 @@ export function generateItemsWithEpoch<
Data, Data,
Item, Item,
>( >(
name: string,
generator: ( generator: (
input: Input, input: Input,
) => Generator<{ keys: readonly [...Keys]; data: Data }, void, void>, ) => Iterable<{ keys: readonly [...Keys]; data: Data }, void, void>,
factory: ( factory: (
scope: ObservableScope, scope: ObservableScope,
data$: Behavior<Data>, data$: Behavior<Data>,
@@ -177,6 +182,7 @@ export function generateItemsWithEpoch<
) => Item, ) => Item,
): OperatorFunction<Epoch<Input>, Epoch<Item[]>> { ): OperatorFunction<Epoch<Input>, Epoch<Item[]>> {
return generateItemsInternal( return generateItemsInternal(
name,
function* (input) { function* (input) {
yield* generator(input.value); yield* generator(input.value);
}, },
@@ -214,9 +220,10 @@ function generateItemsInternal<
Item, Item,
Output, Output,
>( >(
name: string,
generator: ( generator: (
input: Input, input: Input,
) => Generator<{ keys: readonly [...Keys]; data: Data }, void, void>, ) => Iterable<{ keys: readonly [...Keys]; data: Data }, void, void>,
factory: ( factory: (
scope: ObservableScope, scope: ObservableScope,
data$: Behavior<Data>, data$: Behavior<Data>,
@@ -232,26 +239,34 @@ function generateItemsInternal<
Input, Input,
{ {
map: Map<any, any>; map: Map<any, any>;
items: Set<ItemHandle<Data, Item>>; items: Set<ItemHandle<Keys, Data, Item>>;
input: Input; input: Input;
}, },
{ map: Map<any, any>; items: Set<ItemHandle<Data, Item>> } { map: Map<any, any>; items: Set<ItemHandle<Keys, Data, Item>> }
>( >(
({ map: prevMap, items: prevItems }, input) => { ({ map: prevMap, items: prevItems }, input) => {
const nextMap = new Map(); const nextMap = new Map();
const nextItems = new Set<ItemHandle<Data, Item>>(); const nextItems = new Set<ItemHandle<Keys, Data, Item>>();
for (const { keys, data } of generator(input)) { for (const { keys, data } of generator(input)) {
// Disable type checks for a second to grab the item out of a nested map // Disable type checks for a second to grab the item out of a nested map
let i: any = prevMap; let i: any = prevMap;
for (const key of keys) i = i?.get(key); for (const key of keys) i = i?.get(key);
let item = i as ItemHandle<Data, Item> | undefined; let item = i as ItemHandle<Keys, Data, Item> | undefined;
if (item === undefined) { if (item === undefined) {
// First time requesting the key; create the item // First time requesting the key; create the item
const scope = new ObservableScope(); const scope = new ObservableScope();
const data$ = new BehaviorSubject(data); const data$ = new BehaviorSubject(data);
item = { scope, data$, item: factory(scope, data$, ...keys) }; logger.debug(
`[${name}] Creating item with keys ${keys.join(", ")}`,
);
item = {
scope,
keys,
data$,
item: factory(scope, data$, ...keys),
};
} else { } else {
item.data$.next(data); item.data$.next(data);
} }
@@ -269,7 +284,7 @@ function generateItemsInternal<
const finalKey = keys[keys.length - 1]; const finalKey = keys[keys.length - 1];
if (m.has(finalKey)) if (m.has(finalKey))
throw new Error( throw new Error(
`Keys must be unique (tried to generate multiple items for key ${keys})`, `Keys must be unique (tried to generate multiple items for key ${keys.join(", ")})`,
); );
m.set(keys[keys.length - 1], item); m.set(keys[keys.length - 1], item);
nextItems.add(item); nextItems.add(item);
@@ -277,7 +292,12 @@ function generateItemsInternal<
// Destroy all items that are no longer being requested // Destroy all items that are no longer being requested
for (const item of prevItems) for (const item of prevItems)
if (!nextItems.has(item)) item.scope.end(); if (!nextItems.has(item)) {
logger.debug(
`[${name}] Destroying item with keys ${item.keys.join(", ")}`,
);
item.scope.end();
}
return { map: nextMap, items: nextItems, input }; return { map: nextMap, items: nextItems, input };
}, },
@@ -285,7 +305,15 @@ function generateItemsInternal<
), ),
finalizeValue(({ items }) => { finalizeValue(({ items }) => {
// Destroy all remaining items when no longer subscribed // Destroy all remaining items when no longer subscribed
for (const { scope } of items) scope.end(); logger.debug(
`[${name}] End of scope, destroying all ${items.size} items…`,
);
for (const item of items) {
logger.debug(
`[${name}] Destroying item with keys ${item.keys.join(", ")}`,
);
item.scope.end();
}
}), }),
map(({ items, input }) => map(({ items, input }) =>
project( project(