Add Epoch<T> and use it in for the memberships$ behavior and its

derivatives.
This commit is contained in:
Timo K
2025-11-06 21:54:34 +01:00
parent 2e6b1767b9
commit 7c41aef801
11 changed files with 322 additions and 135 deletions

View File

@@ -18,6 +18,10 @@ import { BehaviorSubject } from "rxjs";
*/ */
export type Behavior<T> = Omit<BehaviorSubject<T>, "next" | "observers">; export type Behavior<T> = Omit<BehaviorSubject<T>, "next" | "observers">;
export type BehaviorWithEpoch<T> = Behavior<T> & {
pipeEpoch(): Behavior<{ value: T; epoch: number }>;
};
/** /**
* Creates a Behavior which never changes in value. * Creates a Behavior which never changes in value.
*/ */

View File

@@ -226,7 +226,10 @@ export class CallViewModel {
[this.localTransport$, this.membershipsAndTransports.transports$], [this.localTransport$, this.membershipsAndTransports.transports$],
(localTransport, transports) => { (localTransport, transports) => {
const localTransportAsArray = localTransport ? [localTransport] : []; const localTransportAsArray = localTransport ? [localTransport] : [];
return [...localTransportAsArray, ...transports]; return transports.mapInner((transports) => [
...localTransportAsArray,
...transports,
]);
}, },
), ),
); );

View File

@@ -0,0 +1,56 @@
/*
Copyright 2025 New Vector Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { describe, expect, it } from "vitest";
import { Epoch, mapEpoch, trackEpoch } from "./ObservableScope";
import { withTestScheduler } from "../utils/test";
describe("Epoch", () => {
it("should map the value correctly", () => {
const epoch = new Epoch(1);
const mappedEpoch = epoch.mapInner((v) => v + 1);
expect(mappedEpoch.value).toBe(2);
expect(mappedEpoch.epoch).toBe(0);
});
it("should be tracked from an observable", () => {
withTestScheduler(({ expectObservable, behavior }) => {
const observable$ = behavior("abc", {
a: 1,
b: 2,
c: 3,
});
const epochObservable$ = observable$.pipe(trackEpoch());
expectObservable(epochObservable$).toBe("abc", {
a: expect.toSatisfy((e) => e.epoch === 0 && e.value === 1),
b: expect.toSatisfy((e) => e.epoch === 1 && e.value === 2),
c: expect.toSatisfy((e) => e.epoch === 2 && e.value === 3),
});
});
});
it("can be mapped without loosing epoch information", () => {
withTestScheduler(({ expectObservable, behavior }) => {
const observable$ = behavior("abc", {
a: "A",
b: "B",
c: "C",
});
const epochObservable$ = observable$.pipe(trackEpoch());
const derivedEpoch$ = epochObservable$.pipe(
mapEpoch((e) => e + "-mapped"),
);
expectObservable(derivedEpoch$).toBe("abc", {
a: new Epoch("A-mapped", 0),
b: new Epoch("B-mapped", 1),
c: new Epoch("C-mapped", 2),
});
});
});
});

View File

@@ -12,7 +12,9 @@ import {
EMPTY, EMPTY,
endWith, endWith,
filter, filter,
map,
type Observable, type Observable,
type OperatorFunction,
share, share,
take, take,
takeUntil, takeUntil,
@@ -151,3 +153,107 @@ export class ObservableScope {
* The global scope, a scope which never ends. * The global scope, a scope which never ends.
*/ */
export const globalScope = new ObservableScope(); export const globalScope = new ObservableScope();
/**
* `Epoch`'s can be used to create `Behavior`s and `Observable`s which derivitives can be merged
* with `combinedLatest` without duplicated emissions.
*
* This is useful in the following example:
* ```
* const rootObs$ = of("red","green","blue");
* const derivedObs$ = rootObs$.pipe(
* map((v)=> {red:"fire", green:"grass", blue:"water"}[v])
* );
* const otherDerivedObs$ = rootObs$.pipe(
* map((v)=> {red:"tomatoes", green:"leaves", blue:"sky"}[v])
* );
* const mergedObs$ = combineLatest([rootObs$, derivedObs$, otherDerivedObs$]).pipe(
* map(([color, a,b]) => color + " like " + a + " and " + b)
* );
*
* ```
* will result in 6 emissions with mismatching items like "red like fire and leaves"
*
* # Use Epoch
* ```
* const rootObs$ = of(1,2,3).pipe(trackEpoch());
* const derivedObs$ = rootObs$.pipe(
* mapEpoch((v)=> "this number: " + v)
* );
* const otherDerivedObs$ = rootObs$.pipe(
* mapEpoch((v)=> "multiplied by: " + v)
* );
* const mergedObs$ = combineLatest([derivedObs$, otherDerivedObs$]).pipe(
* filter((values) => values.every((v) => v.epoch === values[0].v)),
* map(([color, a, b]) => color + " like " + a + " and " + b)
* );
*
* ```
* will result in 3 emissions all matching (e.g. "blue like water and sky")
*/
export class Epoch<T> {
public readonly epoch: number;
public readonly value: T;
public constructor(value: T, epoch?: number) {
this.value = value;
this.epoch = epoch ?? 0;
}
/**
* Maps the value inside the epoch to a new value while keeping the epoch number.
* # usage
* ```
* const myEpoch$ = myObservable$.pipe(
* map(trackEpoch()),
* // this is the preferred way using mapEpoch
* mapEpoch((v)=> v+1)
* // This is how inner map can be used:
* map((epoch) => epoch.innerMap((v)=> v+1))
* // It is equivalent to:
* map((epoch) => new Epoch(epoch.value + 1, epoch.epoch))
* )
* ```
* See also `Epoch<T>`
*/
public mapInner<U>(map: (value: T) => U): Epoch<U> {
return new Epoch<U>(map(this.value), this.epoch);
}
}
/**
* A `pipe` compatible map oparator that keeps the epoch in tact but allows mapping the value.
* # usage
* ```
* const myEpoch$ = myObservable$.pipe(
* map(trackEpoch()),
* // this is the preferred way using mapEpoch
* mapEpoch((v)=> v+1)
* // This is how inner map can be used:
* map((epoch) => epoch.innerMap((v)=> v+1))
* // It is equivalent to:
* map((epoch) => new Epoch(epoch.value + 1, epoch.epoch))
* )
* ```
* See also `Epoch<T>`
*/
export function mapEpoch<T, U>(
mapFn: (value: T) => U,
): OperatorFunction<Epoch<T>, Epoch<U>> {
return map((e) => e.mapInner(mapFn));
}
/**
* # usage
* ```
* const myEpoch$ = myObservable$.pipe(
* map(trackEpoch()),
* map((epoch) => epoch.innerMap((v)=> v+1))
* )
* const derived = myEpoch$.pipe(
* mapEpoch((v)=>v^2)
* )
* ```
* See also `Epoch<T>`
*/
export function trackEpoch<T>(): OperatorFunction<T, Epoch<T>> {
return map<T, Epoch<T>>((value, number) => new Epoch(value, number));
}

View File

@@ -12,19 +12,24 @@ import {
type MatrixRTCSession, type MatrixRTCSession,
MatrixRTCSessionEvent, MatrixRTCSessionEvent,
} from "matrix-js-sdk/lib/matrixrtc"; } from "matrix-js-sdk/lib/matrixrtc";
import { fromEvent, map } from "rxjs"; import { fromEvent } from "rxjs";
import { type ObservableScope } from "./ObservableScope"; import {
type Epoch,
mapEpoch,
trackEpoch,
type ObservableScope,
} from "./ObservableScope";
import { type Behavior } from "./Behavior"; import { type Behavior } from "./Behavior";
export const membershipsAndTransports$ = ( export const membershipsAndTransports$ = (
scope: ObservableScope, scope: ObservableScope,
memberships$: Behavior<CallMembership[]>, memberships$: Behavior<Epoch<CallMembership[]>>,
): { ): {
membershipsWithTransport$: Behavior< membershipsWithTransport$: Behavior<
{ membership: CallMembership; transport?: LivekitTransport }[] Epoch<{ membership: CallMembership; transport?: LivekitTransport }[]>
>; >;
transports$: Behavior<LivekitTransport[]>; transports$: Behavior<Epoch<LivekitTransport[]>>;
} => { } => {
/** /**
* Lists the transports used by ourselves, plus all other MatrixRTC session * Lists the transports used by ourselves, plus all other MatrixRTC session
@@ -36,7 +41,7 @@ export const membershipsAndTransports$ = (
*/ */
const membershipsWithTransport$ = scope.behavior( const membershipsWithTransport$ = scope.behavior(
memberships$.pipe( memberships$.pipe(
map((memberships) => { mapEpoch((memberships) => {
return memberships.map((membership) => { return memberships.map((membership) => {
const oldestMembership = memberships[0] ?? membership; const oldestMembership = memberships[0] ?? membership;
const transport = membership.getTransport(oldestMembership); const transport = membership.getTransport(oldestMembership);
@@ -51,7 +56,7 @@ export const membershipsAndTransports$ = (
const transports$ = scope.behavior( const transports$ = scope.behavior(
membershipsWithTransport$.pipe( membershipsWithTransport$.pipe(
map((mts) => mts.flatMap(({ transport: t }) => (t ? [t] : []))), mapEpoch((mts) => mts.flatMap(({ transport: t }) => (t ? [t] : []))),
), ),
); );
@@ -64,12 +69,12 @@ export const membershipsAndTransports$ = (
export const createMemberships$ = ( export const createMemberships$ = (
scope: ObservableScope, scope: ObservableScope,
matrixRTCSession: MatrixRTCSession, matrixRTCSession: MatrixRTCSession,
): Behavior<CallMembership[]> => { ): Behavior<Epoch<CallMembership[]>> => {
return scope.behavior( return scope.behavior(
fromEvent( fromEvent(
matrixRTCSession, matrixRTCSession,
MatrixRTCSessionEvent.MembershipsChanged, MatrixRTCSessionEvent.MembershipsChanged,
(_, memberships: CallMembership[]) => memberships, (_, memberships: CallMembership[]) => memberships,
), ).pipe(trackEpoch()),
); );
}; };

View File

@@ -28,23 +28,20 @@ import {
import { logger } from "matrix-js-sdk/lib/logger"; import { logger } from "matrix-js-sdk/lib/logger";
import { type Behavior } from "../Behavior"; import { type Behavior } from "../Behavior";
import { import { type IConnectionManager } from "../remoteMembers/ConnectionManager";
type ConnectionManagerReturn,
type createConnectionManager$,
} from "../remoteMembers/ConnectionManager";
import { ObservableScope } from "../ObservableScope"; import { ObservableScope } from "../ObservableScope";
import { Publisher } from "./Publisher"; import { Publisher } from "./Publisher";
import { type MuteStates } from "../MuteStates"; import { type MuteStates } from "../MuteStates";
import { type ProcessorState } from "../../livekit/TrackProcessorContext"; import { type ProcessorState } from "../../livekit/TrackProcessorContext";
import { type MediaDevices } from "../MediaDevices"; import { type MediaDevices } from "../MediaDevices";
import { and$ } from "../../utils/observable"; import { and$ } from "../../utils/observable";
import { areLivekitTransportsEqual } from "../remoteMembers/matrixLivekitMerger";
import { import {
enterRTCSession, enterRTCSession,
type EnterRTCSessionOptions, type EnterRTCSessionOptions,
} from "../../rtcSessionHelpers"; } from "../../rtcSessionHelpers";
import { type ElementCallError } from "../../utils/errors"; import { type ElementCallError } from "../../utils/errors";
import { ElementWidgetActions, type WidgetHelpers } from "../../widget"; import { ElementWidgetActions, type WidgetHelpers } from "../../widget";
import { areLivekitTransportsEqual } from "../remoteMembers/MatrixLivekitMembers";
enum LivekitState { enum LivekitState {
UNINITIALIZED = "uninitialized", UNINITIALIZED = "uninitialized",
@@ -93,7 +90,7 @@ interface Props {
scope: ObservableScope; scope: ObservableScope;
mediaDevices: MediaDevices; mediaDevices: MediaDevices;
muteStates: MuteStates; muteStates: MuteStates;
connectionManager: ConnectionManagerReturn; connectionManager: IConnectionManager;
matrixRTCSession: MatrixRTCSession; matrixRTCSession: MatrixRTCSession;
matrixRoom: MatrixRoom; matrixRoom: MatrixRoom;
localTransport$: Behavior<LivekitTransport | undefined>; localTransport$: Behavior<LivekitTransport | undefined>;
@@ -153,12 +150,13 @@ export const createLocalMembership$ = ({
// This should be used in a combineLatest with publisher$ to connect. // This should be used in a combineLatest with publisher$ to connect.
const tracks$ = new BehaviorSubject<LocalTrack[]>([]); const tracks$ = new BehaviorSubject<LocalTrack[]>([]);
// Drop Epoch data here since we will not combine this anymore
const connection$ = scope.behavior( const connection$ = scope.behavior(
combineLatest( combineLatest(
[connectionManager.connections$, localTransport$], [connectionManager.connections$, localTransport$],
(connections, transport) => { (connections, transport) => {
if (transport === undefined) return undefined; if (transport === undefined) return undefined;
return connections.find((connection) => return connections.value.find((connection) =>
areLivekitTransportsEqual(connection.transport, transport), areLivekitTransportsEqual(connection.transport, transport),
); );
}, },

View File

@@ -13,13 +13,17 @@ import {
isLivekitTransportConfig, isLivekitTransportConfig,
} from "matrix-js-sdk/lib/matrixrtc"; } from "matrix-js-sdk/lib/matrixrtc";
import { type MatrixClient } from "matrix-js-sdk"; import { type MatrixClient } from "matrix-js-sdk";
import { combineLatest, distinctUntilChanged, first, from, map } from "rxjs"; import { combineLatest, distinctUntilChanged, first, from } from "rxjs";
import { logger } from "matrix-js-sdk/lib/logger"; import { logger } from "matrix-js-sdk/lib/logger";
import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery";
import { deepCompare } from "matrix-js-sdk/lib/utils"; import { deepCompare } from "matrix-js-sdk/lib/utils";
import { type Behavior } from "../Behavior.ts"; import { type Behavior } from "../Behavior.ts";
import { type ObservableScope } from "../ObservableScope.ts"; import {
type Epoch,
mapEpoch,
type ObservableScope,
} from "../ObservableScope.ts";
import { Config } from "../../config/Config.ts"; import { Config } from "../../config/Config.ts";
import { MatrixRTCTransportMissingError } from "../../utils/errors.ts"; import { MatrixRTCTransportMissingError } from "../../utils/errors.ts";
import { getSFUConfigWithOpenID } from "../../livekit/openIDSFU.ts"; import { getSFUConfigWithOpenID } from "../../livekit/openIDSFU.ts";
@@ -37,7 +41,7 @@ import { getSFUConfigWithOpenID } from "../../livekit/openIDSFU.ts";
*/ */
interface Props { interface Props {
scope: ObservableScope; scope: ObservableScope;
memberships$: Behavior<CallMembership[]>; memberships$: Behavior<Epoch<CallMembership[]>>;
client: MatrixClient; client: MatrixClient;
roomId: string; roomId: string;
useOldestMember$: Behavior<boolean>; useOldestMember$: Behavior<boolean>;
@@ -63,7 +67,7 @@ export const createLocalTransport$ = ({
*/ */
const oldestMemberTransport$ = scope.behavior( const oldestMemberTransport$ = scope.behavior(
memberships$.pipe( memberships$.pipe(
map((memberships) => memberships[0].getTransport(memberships[0])), mapEpoch((memberships) => memberships[0].getTransport(memberships[0])),
first((t) => t != undefined && isLivekitTransport(t)), first((t) => t != undefined && isLivekitTransport(t)),
), ),
undefined, undefined,

View File

@@ -19,7 +19,7 @@ import { type LocalParticipant, type RemoteParticipant } from "livekit-client";
import { type Behavior } from "../Behavior"; import { type Behavior } from "../Behavior";
import { type Connection } from "./Connection"; import { type Connection } from "./Connection";
import { type ObservableScope } from "../ObservableScope"; import { Epoch, type ObservableScope } from "../ObservableScope";
import { generateKeyed$ } from "../../utils/observable"; import { generateKeyed$ } from "../../utils/observable";
import { areLivekitTransportsEqual } from "./MatrixLivekitMembers.ts"; import { areLivekitTransportsEqual } from "./MatrixLivekitMembers.ts";
import { type ConnectionFactory } from "./ConnectionFactory.ts"; import { type ConnectionFactory } from "./ConnectionFactory.ts";
@@ -90,13 +90,13 @@ export class ConnectionManagerData {
interface Props { interface Props {
scope: ObservableScope; scope: ObservableScope;
connectionFactory: ConnectionFactory; connectionFactory: ConnectionFactory;
inputTransports$: Behavior<LivekitTransport[]>; inputTransports$: Behavior<Epoch<LivekitTransport[]>>;
} }
// TODO - write test for scopes (do we really need to bind scope) // TODO - write test for scopes (do we really need to bind scope)
export interface IConnectionManager { export interface IConnectionManager {
transports$: Behavior<LivekitTransport[]>; transports$: Behavior<Epoch<LivekitTransport[]>>;
connectionManagerData$: Behavior<ConnectionManagerData>; connectionManagerData$: Behavior<Epoch<ConnectionManagerData>>;
connections$: Behavior<Connection[]>; connections$: Behavior<Epoch<Connection[]>>;
} }
/** /**
* Crete a `ConnectionManager` * Crete a `ConnectionManager`
@@ -133,8 +133,10 @@ export function createConnectionManager$({
*/ */
const transports$ = scope.behavior( const transports$ = scope.behavior(
combineLatest([running$, inputTransports$]).pipe( combineLatest([running$, inputTransports$]).pipe(
map(([running, transports]) => (running ? transports : [])), map(([running, transports]) =>
map((transports) => removeDuplicateTransports(transports)), transports.mapInner((transport) => (running ? transport : [])),
),
map((transports) => transports.mapInner(removeDuplicateTransports)),
), ),
); );
@@ -142,7 +144,7 @@ export function createConnectionManager$({
* Connections for each transport in use by one or more session members. * Connections for each transport in use by one or more session members.
*/ */
const connections$ = scope.behavior( const connections$ = scope.behavior(
generateKeyed$<LivekitTransport[], Connection, Connection[]>( generateKeyed$<Epoch<LivekitTransport[]>, Connection, Epoch<Connection[]>>(
transports$, transports$,
(transports, createOrGet) => { (transports, createOrGet) => {
const createConnection = const createConnection =
@@ -162,46 +164,50 @@ export function createConnectionManager$({
return connection; return connection;
}; };
return transports.map((transport) => { return transports.mapInner((transports) => {
const key = return transports.map((transport) => {
transport.livekit_service_url + "|" + transport.livekit_alias; const key =
return createOrGet(key, createConnection(transport)); transport.livekit_service_url + "|" + transport.livekit_alias;
return createOrGet(key, createConnection(transport));
});
}); });
}, },
), ),
); );
const connectionManagerData$: Behavior<ConnectionManagerData> = const connectionManagerData$ = scope.behavior(
scope.behavior( connections$.pipe(
connections$.pipe( switchMap((connections) => {
switchMap((connections) => { const epoch = connections.epoch;
// Map the connections to list of {connection, participants}[]
const listOfConnectionsWithPublishingParticipants = connections.map( // Map the connections to list of {connection, participants}[]
(connection) => { const listOfConnectionsWithPublishingParticipants =
return connection.participantsWithTrack$.pipe( connections.value.map((connection) => {
map((participants) => ({ return connection.participantsWithTrack$.pipe(
connection, map((participants) => ({
participants, connection,
})), participants,
); })),
}, );
); });
// combineLatest the several streams into a single stream with the ConnectionManagerData
return combineLatest( // combineLatest the several streams into a single stream with the ConnectionManagerData
listOfConnectionsWithPublishingParticipants, return combineLatest(listOfConnectionsWithPublishingParticipants).pipe(
).pipe( map(
map((lists) => (lists) =>
lists.reduce((data, { connection, participants }) => { new Epoch(
data.add(connection, participants); lists.reduce((data, { connection, participants }) => {
return data; data.add(connection, participants);
}, new ConnectionManagerData()), return data;
), }, new ConnectionManagerData()),
); epoch,
}), ),
), ),
// start empty );
new ConnectionManagerData(), }),
); ),
);
return { transports$, connectionManagerData$, connections$ }; return { transports$, connectionManagerData$, connections$ };
} }

View File

@@ -13,14 +13,15 @@ import {
type LivekitTransport, type LivekitTransport,
type CallMembership, type CallMembership,
} from "matrix-js-sdk/lib/matrixrtc"; } from "matrix-js-sdk/lib/matrixrtc";
import { combineLatest, map } from "rxjs"; import { combineLatest, filter, map } from "rxjs";
// eslint-disable-next-line rxjs/no-internal // eslint-disable-next-line rxjs/no-internal
import { type NodeStyleEventEmitter } from "rxjs/internal/observable/fromEvent"; import { type NodeStyleEventEmitter } from "rxjs/internal/observable/fromEvent";
import { type Room as MatrixRoom, type RoomMember } from "matrix-js-sdk"; import { type Room as MatrixRoom, type RoomMember } from "matrix-js-sdk";
import { logger } from "matrix-js-sdk/lib/logger";
import { type Behavior } from "../Behavior"; import { type Behavior } from "../Behavior";
import { type IConnectionManager } from "./ConnectionManager"; import { type IConnectionManager } from "./ConnectionManager";
import { type ObservableScope } from "../ObservableScope"; import { Epoch, mapEpoch, type ObservableScope } from "../ObservableScope";
import { getRoomMemberFromRtcMember, memberDisplaynames$ } from "./displayname"; import { getRoomMemberFromRtcMember, memberDisplaynames$ } from "./displayname";
import { type Connection } from "./Connection"; import { type Connection } from "./Connection";
@@ -47,7 +48,7 @@ export interface MatrixLivekitMember {
interface Props { interface Props {
scope: ObservableScope; scope: ObservableScope;
membershipsWithTransport$: Behavior< membershipsWithTransport$: Behavior<
{ membership: CallMembership; transport?: LivekitTransport }[] Epoch<{ membership: CallMembership; transport?: LivekitTransport }[]>
>; >;
connectionManager: IConnectionManager; connectionManager: IConnectionManager;
// TODO this is too much information for that class, // TODO this is too much information for that class,
@@ -74,7 +75,7 @@ export function createMatrixLivekitMembers$({
membershipsWithTransport$, membershipsWithTransport$,
connectionManager, connectionManager,
matrixRoom, matrixRoom,
}: Props): Behavior<MatrixLivekitMember[]> { }: Props): Behavior<Epoch<MatrixLivekitMember[]>> {
/** /**
* Stream of all the call members and their associated livekit data (if available). * Stream of all the call members and their associated livekit data (if available).
*/ */
@@ -82,7 +83,7 @@ export function createMatrixLivekitMembers$({
const displaynameMap$ = memberDisplaynames$( const displaynameMap$ = memberDisplaynames$(
scope, scope,
matrixRoom, matrixRoom,
membershipsWithTransport$.pipe(map((v) => v.map((v) => v.membership))), membershipsWithTransport$.pipe(mapEpoch((v) => v.map((v) => v.membership))),
); );
return scope.behavior( return scope.behavior(
@@ -91,48 +92,52 @@ export function createMatrixLivekitMembers$({
connectionManager.connectionManagerData$, connectionManager.connectionManagerData$,
displaynameMap$, displaynameMap$,
]).pipe( ]).pipe(
// filter( filter((values) =>
// ([membershipsWithTransports, managerData, displaynames]) => values.every((value) => value.epoch === values[0].epoch),
// // for each change in ),
// displaynames.size === membershipsWithTransports.length && map(
// displaynames.size === managerData.getConnections().length, ([
// ), { value: membershipsWithTransports, epoch },
map(([memberships, managerData, displaynames]) => { { value: managerData },
const items: MatrixLivekitMember[] = memberships.map( { value: displaynames },
({ membership, transport }) => { ]) => {
// TODO! cannot use membership.membershipID yet, Currently its hardcoded by the jwt service to const items: MatrixLivekitMember[] = membershipsWithTransports.map(
const participantId = /*membership.membershipID*/ `${membership.userId}:${membership.deviceId}`; ({ membership, transport }) => {
// TODO! cannot use membership.membershipID yet, Currently its hardcoded by the jwt service to
const participantId = /*membership.membershipID*/ `${membership.userId}:${membership.deviceId}`;
const participants = transport const participants = transport
? managerData.getParticipantForTransport(transport) ? managerData.getParticipantForTransport(transport)
: []; : [];
const participant = participants.find( const participant = participants.find(
(p) => p.identity == participantId, (p) => p.identity == participantId,
); );
const member = getRoomMemberFromRtcMember( const member = getRoomMemberFromRtcMember(
membership, membership,
matrixRoom, matrixRoom,
)?.member; )?.member;
const connection = transport const connection = transport
? managerData.getConnectionForTransport(transport) ? managerData.getConnectionForTransport(transport)
: undefined; : undefined;
const displayName = displaynames.get(participantId); const displayName = displaynames.get(participantId);
return { return {
participant, participant,
membership, membership,
connection, connection,
// This makes sense to add to the js-sdk callMembership (we only need the avatar so probably the call memberhsip just should aquire the avatar) // This makes sense to add to the js-sdk callMembership (we only need the avatar so probably the call memberhsip just should aquire the avatar)
// TODO Ugh this is hidign that it might be undefined!! best we remove the member entirely. // TODO Ugh this is hidign that it might be undefined!! best we remove the member entirely.
member: member as RoomMember, member: member as RoomMember,
displayName, displayName,
mxcAvatarUrl: member?.getMxcAvatarUrl(), mxcAvatarUrl: member?.getMxcAvatarUrl(),
participantId, participantId,
}; };
}, },
); );
return items; return new Epoch(items, epoch);
}), },
),
), ),
// new Epoch([]),
); );
} }

View File

@@ -16,14 +16,15 @@ import {
import { type CallMembership } from "matrix-js-sdk/lib/matrixrtc"; import { type CallMembership } from "matrix-js-sdk/lib/matrixrtc";
import { logger } from "matrix-js-sdk/lib/logger"; import { logger } from "matrix-js-sdk/lib/logger";
import { type Room as MatrixRoom } from "matrix-js-sdk/lib/matrix"; import { type Room as MatrixRoom } from "matrix-js-sdk/lib/matrix";
// eslint-disable-next-line rxjs/no-internal
import { type NodeStyleEventEmitter } from "rxjs/internal/observable/fromEvent";
import { type ObservableScope } from "../ObservableScope"; import { Epoch, type ObservableScope } from "../ObservableScope";
import { import {
calculateDisplayName, calculateDisplayName,
shouldDisambiguate, shouldDisambiguate,
} from "../../utils/displayname"; } from "../../utils/displayname";
import { type Behavior } from "../Behavior"; import { type Behavior } from "../Behavior";
import type { NodeStyleEventEmitter } from "rxjs/src/internal/observable/fromEvent.ts";
/** /**
* Displayname for each member of the call. This will disambiguate * Displayname for each member of the call. This will disambiguate
@@ -36,8 +37,8 @@ import type { NodeStyleEventEmitter } from "rxjs/src/internal/observable/fromEve
export const memberDisplaynames$ = ( export const memberDisplaynames$ = (
scope: ObservableScope, scope: ObservableScope,
matrixRoom: Pick<MatrixRoom, "getMember"> & NodeStyleEventEmitter, matrixRoom: Pick<MatrixRoom, "getMember"> & NodeStyleEventEmitter,
memberships$: Observable<CallMembership[]>, memberships$: Observable<Epoch<CallMembership[]>>,
): Behavior<Map<string, string>> => ): Behavior<Epoch<Map<string, string>>> =>
scope.behavior( scope.behavior(
combineLatest([ combineLatest([
// Handle call membership changes // Handle call membership changes
@@ -46,7 +47,8 @@ export const memberDisplaynames$ = (
fromEvent(matrixRoom, RoomStateEvent.Members).pipe(startWith(null)), fromEvent(matrixRoom, RoomStateEvent.Members).pipe(startWith(null)),
// TODO: do we need: pauseWhen(this.pretendToBeDisconnected$), // TODO: do we need: pauseWhen(this.pretendToBeDisconnected$),
]).pipe( ]).pipe(
map(([memberships, _displayNames]) => { map(([epochMemberships, _displayNames]) => {
const { epoch, value: memberships } = epochMemberships;
const displaynameMap = new Map<string, string>(); const displaynameMap = new Map<string, string>();
const room = matrixRoom; const room = matrixRoom;
@@ -68,10 +70,10 @@ export const memberDisplaynames$ = (
calculateDisplayName(member, disambiguate), calculateDisplayName(member, disambiguate),
); );
} }
return displaynameMap; return new Epoch(displaynameMap, epoch);
}), }),
), ),
new Map<string, string>(), new Epoch(new Map<string, string>()),
); );
export function getRoomMemberFromRtcMember( export function getRoomMemberFromRtcMember(

View File

@@ -14,7 +14,7 @@ import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { type Room as MatrixRoom, type RoomMember } from "matrix-js-sdk"; import { type Room as MatrixRoom, type RoomMember } from "matrix-js-sdk";
import { logger } from "matrix-js-sdk/lib/logger"; import { logger } from "matrix-js-sdk/lib/logger";
import { ObservableScope } from "../ObservableScope.ts"; import { type Epoch, ObservableScope, trackEpoch } from "../ObservableScope.ts";
import { ECConnectionFactory } from "./ConnectionFactory.ts"; import { ECConnectionFactory } from "./ConnectionFactory.ts";
import { type OpenIDClientParts } from "../../livekit/openIDSFU.ts"; import { type OpenIDClientParts } from "../../livekit/openIDSFU.ts";
import { import {
@@ -107,25 +107,20 @@ afterEach(() => {
}); });
test("bob, carl, then bob joining no tracks yet", () => { test("bob, carl, then bob joining no tracks yet", () => {
withTestScheduler(({ expectObservable, behavior }) => { withTestScheduler(({ expectObservable, behavior, scope }) => {
const bobMembership = mockCallMembership("@bob:example.com", "BDEV000"); const bobMembership = mockCallMembership("@bob:example.com", "BDEV000");
const carlMembership = mockCallMembership("@carl:example.com", "CDEV000"); const carlMembership = mockCallMembership("@carl:example.com", "CDEV000");
const daveMembership = mockCallMembership("@dave:foo.bar", "DDEV000"); const daveMembership = mockCallMembership("@dave:foo.bar", "DDEV000");
// We add the `---` because there is a limitation in rxjs marbles https://github.com/ReactiveX/rxjs/issues/5677 const eMarble = "abc";
// Because we several values emitted at the same frame, so we use the grouping format const vMarble = "abc";
// e.g. a(bc) to indicate that b and c are emitted at the same time. But rxjs marbles advance the const memberships$ = scope.behavior(
// time by the number of characters in the marble diagram, so we need to add some padding to avoid so that behavior(eMarble, {
// the next emission is testable a: [bobMembership],
// ab---c--- b: [bobMembership, carlMembership],
// a(bc)(de) c: [bobMembership, carlMembership, daveMembership],
const eMarble = "ab----c----"; }).pipe(trackEpoch()),
const vMarble = "a(xxb)(xxc)"; );
const memberships$ = behavior(eMarble, {
a: [bobMembership],
b: [bobMembership, carlMembership],
c: [bobMembership, carlMembership, daveMembership],
});
const membershipsAndTransports = membershipsAndTransports$( const membershipsAndTransports = membershipsAndTransports$(
testScope, testScope,
@@ -147,7 +142,8 @@ test("bob, carl, then bob joining no tracks yet", () => {
}); });
expectObservable(matrixLivekitItems$).toBe(vMarble, { expectObservable(matrixLivekitItems$).toBe(vMarble, {
a: expect.toSatisfy((items: MatrixLivekitMember[]) => { a: expect.toSatisfy((e: Epoch<MatrixLivekitMember[]>) => {
const items = e.value;
expect(items.length).toBe(1); expect(items.length).toBe(1);
const item = items[0]!; const item = items[0]!;
expect(item.membership).toStrictEqual(bobMembership); expect(item.membership).toStrictEqual(bobMembership);
@@ -160,7 +156,8 @@ test("bob, carl, then bob joining no tracks yet", () => {
expect(item.participant).toBeUndefined(); expect(item.participant).toBeUndefined();
return true; return true;
}), }),
b: expect.toSatisfy((items: MatrixLivekitMember[]) => { b: expect.toSatisfy((e: Epoch<MatrixLivekitMember[]>) => {
const items = e.value;
expect(items.length).toBe(2); expect(items.length).toBe(2);
{ {
@@ -185,7 +182,8 @@ test("bob, carl, then bob joining no tracks yet", () => {
} }
return true; return true;
}), }),
c: expect.toSatisfy((items: MatrixLivekitMember[]) => { c: expect.toSatisfy((e: Epoch<MatrixLivekitMember[]>) => {
const items = e.value;
logger.info(`E Items length: ${items.length}`); logger.info(`E Items length: ${items.length}`);
expect(items.length).toBe(3); expect(items.length).toBe(3);
{ {