Merge pull request #3537 from robintown/connection-leaks

Fix resource leaks when we stop using a connection
This commit is contained in:
Robin
2025-10-21 09:56:31 -04:00
committed by GitHub
23 changed files with 533 additions and 455 deletions

View File

@@ -24,6 +24,7 @@ import { App } from "./App";
import { init as initRageshake } from "./settings/rageshake"; import { init as initRageshake } from "./settings/rageshake";
import { Initializer } from "./initializer"; import { Initializer } from "./initializer";
import { AppViewModel } from "./state/AppViewModel"; import { AppViewModel } from "./state/AppViewModel";
import { globalScope } from "./state/ObservableScope";
window.setLKLogLevel = setLKLogLevel; window.setLKLogLevel = setLKLogLevel;
@@ -61,7 +62,7 @@ Initializer.initBeforeReact()
.then(() => { .then(() => {
root.render( root.render(
<StrictMode> <StrictMode>
<App vm={new AppViewModel()} />, <App vm={new AppViewModel(globalScope)} />,
</StrictMode>, </StrictMode>,
); );
}) })

View File

@@ -23,7 +23,7 @@ import {
localRtcMember, localRtcMember,
} from "../utils/test-fixtures"; } from "../utils/test-fixtures";
import { getBasicRTCSession } from "../utils/test-viewmodel"; import { getBasicRTCSession } from "../utils/test-viewmodel";
import { withTestScheduler } from "../utils/test"; import { testScope, withTestScheduler } from "../utils/test";
import { ElementCallReactionEventType, ReactionSet } from "."; import { ElementCallReactionEventType, ReactionSet } from ".";
afterEach(() => { afterEach(() => {
@@ -37,6 +37,7 @@ test("handles a hand raised reaction", () => {
withTestScheduler(({ schedule, expectObservable }) => { withTestScheduler(({ schedule, expectObservable }) => {
renderHook(() => { renderHook(() => {
const { raisedHands$ } = new ReactionsReader( const { raisedHands$ } = new ReactionsReader(
testScope(),
rtcSession.asMockedSession(), rtcSession.asMockedSession(),
); );
schedule("ab", { schedule("ab", {
@@ -85,6 +86,7 @@ test("handles a redaction", () => {
withTestScheduler(({ schedule, expectObservable }) => { withTestScheduler(({ schedule, expectObservable }) => {
renderHook(() => { renderHook(() => {
const { raisedHands$ } = new ReactionsReader( const { raisedHands$ } = new ReactionsReader(
testScope(),
rtcSession.asMockedSession(), rtcSession.asMockedSession(),
); );
schedule("abc", { schedule("abc", {
@@ -148,6 +150,7 @@ test("handles waiting for event decryption", () => {
withTestScheduler(({ schedule, expectObservable }) => { withTestScheduler(({ schedule, expectObservable }) => {
renderHook(() => { renderHook(() => {
const { raisedHands$ } = new ReactionsReader( const { raisedHands$ } = new ReactionsReader(
testScope(),
rtcSession.asMockedSession(), rtcSession.asMockedSession(),
); );
schedule("abc", { schedule("abc", {
@@ -217,6 +220,7 @@ test("hands rejecting events without a proper membership", () => {
withTestScheduler(({ schedule, expectObservable }) => { withTestScheduler(({ schedule, expectObservable }) => {
renderHook(() => { renderHook(() => {
const { raisedHands$ } = new ReactionsReader( const { raisedHands$ } = new ReactionsReader(
testScope(),
rtcSession.asMockedSession(), rtcSession.asMockedSession(),
); );
schedule("ab", { schedule("ab", {
@@ -261,7 +265,10 @@ test("handles a reaction", () => {
withTestScheduler(({ schedule, time, expectObservable }) => { withTestScheduler(({ schedule, time, expectObservable }) => {
renderHook(() => { renderHook(() => {
const { reactions$ } = new ReactionsReader(rtcSession.asMockedSession()); const { reactions$ } = new ReactionsReader(
testScope(),
rtcSession.asMockedSession(),
);
schedule(`abc`, { schedule(`abc`, {
a: () => {}, a: () => {},
b: () => { b: () => {
@@ -317,7 +324,10 @@ test("ignores bad reaction events", () => {
withTestScheduler(({ schedule, expectObservable }) => { withTestScheduler(({ schedule, expectObservable }) => {
renderHook(() => { renderHook(() => {
const { reactions$ } = new ReactionsReader(rtcSession.asMockedSession()); const { reactions$ } = new ReactionsReader(
testScope(),
rtcSession.asMockedSession(),
);
schedule("ab", { schedule("ab", {
a: () => {}, a: () => {},
b: () => { b: () => {
@@ -439,7 +449,10 @@ test("that reactions cannot be spammed", () => {
withTestScheduler(({ schedule, expectObservable }) => { withTestScheduler(({ schedule, expectObservable }) => {
renderHook(() => { renderHook(() => {
const { reactions$ } = new ReactionsReader(rtcSession.asMockedSession()); const { reactions$ } = new ReactionsReader(
testScope(),
rtcSession.asMockedSession(),
);
schedule("abcd", { schedule("abcd", {
a: () => {}, a: () => {},
b: () => { b: () => {

View File

@@ -18,7 +18,7 @@ import {
EventType, EventType,
RoomEvent as MatrixRoomEvent, RoomEvent as MatrixRoomEvent,
} from "matrix-js-sdk"; } from "matrix-js-sdk";
import { BehaviorSubject, delay, type Subscription } from "rxjs"; import { BehaviorSubject, delay } from "rxjs";
import { import {
ElementCallReactionEventType, ElementCallReactionEventType,
@@ -28,6 +28,7 @@ import {
type RaisedHandInfo, type RaisedHandInfo,
type ReactionInfo, type ReactionInfo,
} from "."; } from ".";
import { type ObservableScope } from "../state/ObservableScope";
export const REACTION_ACTIVE_TIME_MS = 3000; export const REACTION_ACTIVE_TIME_MS = 3000;
@@ -54,12 +55,13 @@ export class ReactionsReader {
*/ */
public readonly reactions$ = this.reactionsSubject$.asObservable(); public readonly reactions$ = this.reactionsSubject$.asObservable();
private readonly reactionsSub: Subscription; public constructor(
private readonly scope: ObservableScope,
public constructor(private readonly rtcSession: MatrixRTCSession) { private readonly rtcSession: MatrixRTCSession,
) {
// Hide reactions after a given time. // Hide reactions after a given time.
this.reactionsSub = this.reactionsSubject$ this.reactionsSubject$
.pipe(delay(REACTION_ACTIVE_TIME_MS)) .pipe(delay(REACTION_ACTIVE_TIME_MS), this.scope.bind())
.subscribe((reactions) => { .subscribe((reactions) => {
const date = new Date(); const date = new Date();
const nextEntries = Object.fromEntries( const nextEntries = Object.fromEntries(
@@ -71,15 +73,38 @@ export class ReactionsReader {
this.reactionsSubject$.next(nextEntries); this.reactionsSubject$.next(nextEntries);
}); });
// TODO: Convert this class to the functional reactive style and get rid of
// all this manual setup and teardown for event listeners
this.rtcSession.room.on(MatrixRoomEvent.Timeline, this.handleReactionEvent); this.rtcSession.room.on(MatrixRoomEvent.Timeline, this.handleReactionEvent);
this.scope.onEnd(() =>
this.rtcSession.room.off(
MatrixRoomEvent.Timeline,
this.handleReactionEvent,
),
);
this.rtcSession.room.on( this.rtcSession.room.on(
MatrixRoomEvent.Redaction, MatrixRoomEvent.Redaction,
this.handleReactionEvent, this.handleReactionEvent,
); );
this.scope.onEnd(() =>
this.rtcSession.room.off(
MatrixRoomEvent.Redaction,
this.handleReactionEvent,
),
);
this.rtcSession.room.client.on( this.rtcSession.room.client.on(
MatrixEventEvent.Decrypted, MatrixEventEvent.Decrypted,
this.handleReactionEvent, this.handleReactionEvent,
); );
this.scope.onEnd(() =>
this.rtcSession.room.client.off(
MatrixEventEvent.Decrypted,
this.handleReactionEvent,
),
);
// We listen for a local echo to get the real event ID, as timeline events // We listen for a local echo to get the real event ID, as timeline events
// may still be sending. // may still be sending.
@@ -87,11 +112,23 @@ export class ReactionsReader {
MatrixRoomEvent.LocalEchoUpdated, MatrixRoomEvent.LocalEchoUpdated,
this.handleReactionEvent, this.handleReactionEvent,
); );
this.scope.onEnd(() =>
this.rtcSession.room.off(
MatrixRoomEvent.LocalEchoUpdated,
this.handleReactionEvent,
),
);
rtcSession.on( this.rtcSession.on(
MatrixRTCSessionEvent.MembershipsChanged, MatrixRTCSessionEvent.MembershipsChanged,
this.onMembershipsChanged, this.onMembershipsChanged,
); );
this.scope.onEnd(() =>
this.rtcSession.off(
MatrixRTCSessionEvent.MembershipsChanged,
this.onMembershipsChanged,
),
);
// Run this once to ensure we have fetched the state from the call. // Run this once to ensure we have fetched the state from the call.
this.onMembershipsChanged([]); this.onMembershipsChanged([]);
@@ -309,31 +346,4 @@ export class ReactionsReader {
this.removeRaisedHand(targetUser); this.removeRaisedHand(targetUser);
} }
}; };
/**
* Stop listening for events.
*/
public destroy(): void {
this.rtcSession.off(
MatrixRTCSessionEvent.MembershipsChanged,
this.onMembershipsChanged,
);
this.rtcSession.room.off(
MatrixRoomEvent.Timeline,
this.handleReactionEvent,
);
this.rtcSession.room.off(
MatrixRoomEvent.Redaction,
this.handleReactionEvent,
);
this.rtcSession.room.client.off(
MatrixEventEvent.Decrypted,
this.handleReactionEvent,
);
this.rtcSession.room.off(
MatrixRoomEvent.LocalEchoUpdated,
this.handleReactionEvent,
);
this.reactionsSub.unsubscribe();
}
} }

View File

@@ -110,6 +110,7 @@ import ringtoneMp3 from "../sound/ringtone.mp3?url";
import ringtoneOgg from "../sound/ringtone.ogg?url"; import ringtoneOgg from "../sound/ringtone.ogg?url";
import { useTrackProcessorObservable$ } from "../livekit/TrackProcessorContext.tsx"; import { useTrackProcessorObservable$ } from "../livekit/TrackProcessorContext.tsx";
import { type Layout } from "../state/layout-types.ts"; import { type Layout } from "../state/layout-types.ts";
import { ObservableScope } from "../state/ObservableScope.ts";
const maxTapDurationMs = 400; const maxTapDurationMs = 400;
@@ -129,8 +130,10 @@ export const ActiveCall: FC<ActiveCallProps> = (props) => {
const trackProcessorState$ = useTrackProcessorObservable$(); const trackProcessorState$ = useTrackProcessorObservable$();
useEffect(() => { useEffect(() => {
const reactionsReader = new ReactionsReader(props.rtcSession); const scope = new ObservableScope();
const reactionsReader = new ReactionsReader(scope, props.rtcSession);
const vm = new CallViewModel( const vm = new CallViewModel(
scope,
props.rtcSession, props.rtcSession,
props.matrixRoom, props.matrixRoom,
mediaDevices, mediaDevices,
@@ -146,11 +149,9 @@ export const ActiveCall: FC<ActiveCallProps> = (props) => {
); );
setVm(vm); setVm(vm);
const sub = vm.leave$.subscribe(props.onLeft); vm.leave$.pipe(scope.bind()).subscribe(props.onLeft);
return (): void => { return (): void => {
vm.destroy(); scope.end();
sub.unsubscribe();
reactionsReader.destroy();
}; };
}, [ }, [
props.rtcSession, props.rtcSession,

View File

@@ -108,9 +108,7 @@ function mockMediaDevices(
throw new Error("Unimplemented"); throw new Error("Unimplemented");
} }
}); });
const scope = new ObservableScope(); return new MediaDevices(testScope());
onTestFinished(() => scope.end());
return new MediaDevices(scope);
} }
describe("useMuteStates VITE_PACKAGE='full' (SPA) mode", () => { describe("useMuteStates VITE_PACKAGE='full' (SPA) mode", () => {

View File

@@ -6,14 +6,16 @@ Please see LICENSE in the repository root for full details.
*/ */
import { MediaDevices } from "./MediaDevices"; import { MediaDevices } from "./MediaDevices";
import { ViewModel } from "./ViewModel"; import { type ObservableScope } from "./ObservableScope";
/** /**
* The top-level state holder for the application. * The top-level state holder for the application.
*/ */
export class AppViewModel extends ViewModel { export class AppViewModel {
public readonly mediaDevices = new MediaDevices(this.scope); public readonly mediaDevices = new MediaDevices(this.scope);
// TODO: Move more application logic here. The CallViewModel, at the very // TODO: Move more application logic here. The CallViewModel, at the very
// least, ought to be accessible from this object. // least, ought to be accessible from this object.
public constructor(private readonly scope: ObservableScope) {}
} }

View File

@@ -60,6 +60,7 @@ import {
mockMediaDevices, mockMediaDevices,
mockMuteStates, mockMuteStates,
mockConfig, mockConfig,
testScope,
} from "../utils/test"; } from "../utils/test";
import { import {
ECAddonConnectionState, ECAddonConnectionState,
@@ -89,7 +90,6 @@ import {
localRtcMember, localRtcMember,
localRtcMemberDevice2, localRtcMemberDevice2,
} from "../utils/test-fixtures"; } from "../utils/test-fixtures";
import { ObservableScope } from "./ObservableScope";
import { MediaDevices } from "./MediaDevices"; import { MediaDevices } from "./MediaDevices";
import { getValue } from "../utils/observable"; import { getValue } from "../utils/observable";
import { type Behavior, constant } from "./Behavior"; import { type Behavior, constant } from "./Behavior";
@@ -347,6 +347,7 @@ function withCallViewModel(
const reactions$ = new BehaviorSubject<Record<string, ReactionInfo>>({}); const reactions$ = new BehaviorSubject<Record<string, ReactionInfo>>({});
const vm = new CallViewModel( const vm = new CallViewModel(
testScope(),
rtcSession.asMockedSession(), rtcSession.asMockedSession(),
room, room,
mediaDevices, mediaDevices,
@@ -361,7 +362,6 @@ function withCallViewModel(
); );
onTestFinished(() => { onTestFinished(() => {
vm!.destroy();
participantsSpy!.mockRestore(); participantsSpy!.mockRestore();
mediaSpy!.mockRestore(); mediaSpy!.mockRestore();
eventsSpy!.mockRestore(); eventsSpy!.mockRestore();
@@ -402,6 +402,7 @@ test("test missing RTC config error", async () => {
vi.spyOn(AutoDiscovery, "getRawClientConfig").mockResolvedValue({}); vi.spyOn(AutoDiscovery, "getRawClientConfig").mockResolvedValue({});
const callVM = new CallViewModel( const callVM = new CallViewModel(
testScope(),
fakeRtcSession.asMockedSession(), fakeRtcSession.asMockedSession(),
matrixRoom, matrixRoom,
mockMediaDevices({}), mockMediaDevices({}),
@@ -1630,9 +1631,7 @@ test("audio output changes when toggling earpiece mode", () => {
getUrlParams.mockReturnValue({ controlledAudioDevices: true }); getUrlParams.mockReturnValue({ controlledAudioDevices: true });
vi.mocked(ComponentsCore.createMediaDeviceObserver).mockReturnValue(of([])); vi.mocked(ComponentsCore.createMediaDeviceObserver).mockReturnValue(of([]));
const scope = new ObservableScope(); const devices = new MediaDevices(testScope());
onTestFinished(() => scope.end());
const devices = new MediaDevices(scope);
window.controls.setAvailableAudioDevices([ window.controls.setAvailableAudioDevices([
{ id: "speaker", name: "Speaker", isSpeaker: true }, { id: "speaker", name: "Speaker", isSpeaker: true },

View File

@@ -73,7 +73,6 @@ import {
} from "matrix-js-sdk/lib/matrixrtc"; } from "matrix-js-sdk/lib/matrixrtc";
import { type IWidgetApiRequest } from "matrix-widget-api"; import { type IWidgetApiRequest } from "matrix-widget-api";
import { ViewModel } from "./ViewModel";
import { import {
LocalUserMediaViewModel, LocalUserMediaViewModel,
type MediaViewModel, type MediaViewModel,
@@ -84,7 +83,7 @@ import {
import { import {
accumulate, accumulate,
and$, and$,
finalizeValue, generateKeyed$,
pauseWhen, pauseWhen,
} from "../utils/observable"; } from "../utils/observable";
import { import {
@@ -117,11 +116,7 @@ import {
} from "../rtcSessionHelpers"; } from "../rtcSessionHelpers";
import { E2eeType } from "../e2ee/e2eeType"; import { E2eeType } from "../e2ee/e2eeType";
import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider"; import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider";
import { import { type Connection, RemoteConnection } from "./Connection";
type Connection,
type ConnectionOpts,
RemoteConnection,
} from "./Connection";
import { type MuteStates } from "./MuteStates"; import { type MuteStates } from "./MuteStates";
import { getUrlParams } from "../UrlParams"; import { getUrlParams } from "../UrlParams";
import { type ProcessorState } from "../livekit/TrackProcessorContext"; import { type ProcessorState } from "../livekit/TrackProcessorContext";
@@ -176,7 +171,15 @@ interface LayoutScanState {
type MediaItem = UserMedia | ScreenShare; type MediaItem = UserMedia | ScreenShare;
export class CallViewModel extends ViewModel { /**
* A view model providing all the application logic needed to show the in-call
* UI (may eventually be expanded to cover the lobby and feedback screens in the
* future).
*/
// Throughout this class and related code we must distinguish between MatrixRTC
// state and LiveKit state. We use the common terminology of room "members", RTC
// "memberships", and LiveKit "participants".
export class CallViewModel {
private readonly urlParams = getUrlParams(); private readonly urlParams = getUrlParams();
private readonly livekitAlias = getLivekitAlias(this.matrixRTCSession); private readonly livekitAlias = getLivekitAlias(this.matrixRTCSession);
@@ -370,26 +373,36 @@ export class CallViewModel extends ViewModel {
*/ */
private readonly localConnection$: Behavior<Async<PublishConnection> | null> = private readonly localConnection$: Behavior<Async<PublishConnection> | null> =
this.scope.behavior( this.scope.behavior(
this.localTransport$.pipe( generateKeyed$<
map( Async<LivekitTransport> | null,
(transport) => PublishConnection,
transport && Async<PublishConnection> | null
mapAsync(transport, (transport) => { >(
const opts: ConnectionOpts = { this.localTransport$,
transport, (transport, createOrGet) =>
client: this.matrixRTCSession.room.client, transport &&
scope: this.scope, mapAsync(transport, (transport) =>
remoteTransports$: this.remoteTransports$, createOrGet(
}; // Stable key that uniquely idenifies the transport
return new PublishConnection( JSON.stringify({
opts, url: transport.livekit_service_url,
this.mediaDevices, alias: transport.livekit_alias,
this.muteStates, }),
this.e2eeLivekitOptions(), (scope) =>
this.scope.behavior(this.trackProcessorState$), new PublishConnection(
); {
}), transport,
), client: this.matrixRoom.client,
scope,
remoteTransports$: this.remoteTransports$,
},
this.mediaDevices,
this.muteStates,
this.e2eeLivekitOptions(),
this.scope.behavior(this.trackProcessorState$),
),
),
),
), ),
); );
@@ -416,61 +429,47 @@ export class CallViewModel extends ViewModel {
* is *distinct* from the local transport. * is *distinct* from the local transport.
*/ */
private readonly remoteConnections$ = this.scope.behavior( private readonly remoteConnections$ = this.scope.behavior(
this.transports$.pipe( generateKeyed$<typeof this.transports$.value, Connection, Connection[]>(
accumulate(new Map<string, Connection>(), (prev, transports) => { this.transports$,
const next = new Map<string, Connection>(); (transports, createOrGet) => {
const connections: Connection[] = [];
// Until the local transport becomes ready we have no idea which // Until the local transport becomes ready we have no idea which
// transports will actually need a dedicated remote connection // transports will actually need a dedicated remote connection
if (transports?.local.state === "ready") { if (transports?.local.state === "ready") {
const oldestMembership = this.matrixRTCSession.getOldestMembership(); // TODO: Handle custom transport.livekit_alias values here
const localServiceUrl = transports.local.value.livekit_service_url; const localServiceUrl = transports.local.value.livekit_service_url;
const remoteServiceUrls = new Set( const remoteServiceUrls = new Set(
transports.remote.flatMap(({ membership, transport }) => { transports.remote.map(
const t = membership.getTransport(oldestMembership ?? membership); ({ transport }) => transport.livekit_service_url,
return t && ),
isLivekitTransport(t) &&
t.livekit_service_url !== localServiceUrl
? [t.livekit_service_url]
: [];
}),
); );
remoteServiceUrls.delete(localServiceUrl);
for (const remoteServiceUrl of remoteServiceUrls) { for (const remoteServiceUrl of remoteServiceUrls)
let nextConnection = prev.get(remoteServiceUrl); connections.push(
if (!nextConnection) { createOrGet(
logger.log(
"SFU remoteConnections$ construct new connection: ",
remoteServiceUrl, remoteServiceUrl,
); (scope) =>
new RemoteConnection(
const args: ConnectionOpts = { {
transport: { transport: {
type: "livekit", type: "livekit",
livekit_service_url: remoteServiceUrl, livekit_service_url: remoteServiceUrl,
livekit_alias: this.livekitAlias, livekit_alias: this.livekitAlias,
}, },
client: this.matrixRTCSession.room.client, client: this.matrixRoom.client,
scope: this.scope, scope,
remoteTransports$: this.remoteTransports$, remoteTransports$: this.remoteTransports$,
}; },
nextConnection = new RemoteConnection( this.e2eeLivekitOptions(),
args, ),
this.e2eeLivekitOptions(), ),
); );
} else {
logger.log(
"SFU remoteConnections$ use prev connection: ",
remoteServiceUrl,
);
}
next.set(remoteServiceUrl, nextConnection);
}
} }
return next; return connections;
}), },
map((transports) => [...transports.values()]),
), ),
); );
@@ -755,80 +754,78 @@ export class CallViewModel extends ViewModel {
); );
/** /**
* List of MediaItems that we want to display * List of MediaItems that we want to have tiles for.
*/ */
private readonly mediaItems$ = this.scope.behavior<MediaItem[]>( private readonly mediaItems$ = this.scope.behavior<MediaItem[]>(
combineLatest([this.participantsByRoom$, duplicateTiles.value$]).pipe( generateKeyed$<
scan((prevItems, [participantsByRoom, duplicateTiles]) => { [typeof this.participantsByRoom$.value, number],
const newItems: Map<string, UserMedia | ScreenShare> = new Map( MediaItem,
function* (this: CallViewModel): Iterable<[string, MediaItem]> { MediaItem[]
for (const { >(
livekitRoom, // Generate a collection of MediaItems from the list of expected (whether
participants, // present or missing) LiveKit participants.
url, combineLatest([this.participantsByRoom$, duplicateTiles.value$]),
} of participantsByRoom) { ([participantsByRoom, duplicateTiles], createOrGet) => {
for (const { id, participant, member } of participants) { const items: MediaItem[] = [];
for (let i = 0; i < 1 + duplicateTiles; i++) {
const mediaId = `${id}:${i}`;
const prevMedia = prevItems.get(mediaId);
if (prevMedia instanceof UserMedia)
prevMedia.updateParticipant(participant);
yield [ for (const { livekitRoom, participants, url } of participantsByRoom) {
for (const { id, participant, member } of participants) {
for (let i = 0; i < 1 + duplicateTiles; i++) {
const mediaId = `${id}:${i}`;
const item = createOrGet(
mediaId,
(scope) =>
// We create UserMedia with or without a participant.
// This will be the initial value of a BehaviourSubject.
// Once a participant appears we will update the BehaviourSubject. (see below)
new UserMedia(
scope,
mediaId, mediaId,
// We create UserMedia with or without a participant. member,
// This will be the initial value of a BehaviourSubject. participant,
// Once a participant appears we will update the BehaviourSubject. (see above) this.options.encryptionSystem,
prevMedia ?? livekitRoom,
new UserMedia( url,
mediaId, this.mediaDevices,
this.pretendToBeDisconnected$,
this.memberDisplaynames$.pipe(
map((m) => m.get(id) ?? "[👻]"),
),
this.handsRaised$.pipe(map((v) => v[id]?.time ?? null)),
this.reactions$.pipe(map((v) => v[id] ?? undefined)),
),
);
items.push(item);
(item as UserMedia).updateParticipant(participant);
if (participant?.isScreenShareEnabled) {
const screenShareId = `${mediaId}:screen-share`;
items.push(
createOrGet(
screenShareId,
(scope) =>
new ScreenShare(
scope,
screenShareId,
member, member,
participant, participant,
this.options.encryptionSystem, this.options.encryptionSystem,
livekitRoom, livekitRoom,
url, url,
this.mediaDevices,
this.pretendToBeDisconnected$, this.pretendToBeDisconnected$,
this.memberDisplaynames$.pipe( this.memberDisplaynames$.pipe(
map((m) => m.get(id) ?? "[👻]"), map((m) => m.get(id) ?? "[👻]"),
), ),
this.handsRaised$.pipe(map((v) => v[id]?.time ?? null)),
this.reactions$.pipe(map((v) => v[id] ?? undefined)),
), ),
]; ),
);
if (participant?.isScreenShareEnabled) {
const screenShareId = `${mediaId}:screen-share`;
yield [
screenShareId,
prevItems.get(screenShareId) ??
new ScreenShare(
screenShareId,
member,
participant,
this.options.encryptionSystem,
livekitRoom,
url,
this.pretendToBeDisconnected$,
this.memberDisplaynames$.pipe(
map((m) => m.get(id) ?? "[👻]"),
),
),
];
}
}
} }
} }
}.bind(this)(), }
); }
for (const [id, t] of prevItems) if (!newItems.has(id)) t.destroy(); return items;
return newItems; },
}, new Map<string, MediaItem>()),
map((mediaItems) => [...mediaItems.values()]),
finalizeValue((ts) => {
for (const t of ts) t.destroy();
}),
), ),
); );
@@ -1739,6 +1736,7 @@ export class CallViewModel extends ViewModel {
: null; : null;
public constructor( public constructor(
private readonly scope: ObservableScope,
// A call is permanently tied to a single Matrix room // A call is permanently tied to a single Matrix room
private readonly matrixRTCSession: MatrixRTCSession, private readonly matrixRTCSession: MatrixRTCSession,
private readonly matrixRoom: MatrixRoom, private readonly matrixRoom: MatrixRoom,
@@ -1753,8 +1751,6 @@ export class CallViewModel extends ViewModel {
>, >,
private readonly trackProcessorState$: Observable<ProcessorState>, private readonly trackProcessorState$: Observable<ProcessorState>,
) { ) {
super();
// Start and stop local and remote connections as needed // Start and stop local and remote connections as needed
this.connectionInstructions$ this.connectionInstructions$
.pipe(this.scope.bind()) .pipe(this.scope.bind())

View File

@@ -21,6 +21,7 @@ import {
type CallMembership, type CallMembership,
type LivekitTransport, type LivekitTransport,
} from "matrix-js-sdk/lib/matrixrtc"; } from "matrix-js-sdk/lib/matrixrtc";
import { logger } from "matrix-js-sdk/lib/logger";
import { BehaviorSubject, combineLatest, type Observable } from "rxjs"; import { BehaviorSubject, combineLatest, type Observable } from "rxjs";
import { import {
@@ -218,6 +219,9 @@ export class Connection {
public readonly livekitRoom: LivekitRoom, public readonly livekitRoom: LivekitRoom,
opts: ConnectionOpts, opts: ConnectionOpts,
) { ) {
logger.log(
`[Connection] Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias}`,
);
const { transport, client, scope, remoteTransports$ } = opts; const { transport, client, scope, remoteTransports$ } = opts;
this.transport = transport; this.transport = transport;

View File

@@ -17,8 +17,8 @@ import {
mockLocalParticipant, mockLocalParticipant,
mockMediaDevices, mockMediaDevices,
mockRtcMembership, mockRtcMembership,
withLocalMedia, createLocalMedia,
withRemoteMedia, createRemoteMedia,
withTestScheduler, withTestScheduler,
} from "../utils/test"; } from "../utils/test";
import { getValue } from "../utils/observable"; import { getValue } from "../utils/observable";
@@ -42,92 +42,89 @@ vi.mock("../Platform", () => ({
const rtcMembership = mockRtcMembership("@alice:example.org", "AAAA"); const rtcMembership = mockRtcMembership("@alice:example.org", "AAAA");
test("control a participant's volume", async () => { test("control a participant's volume", () => {
const setVolumeSpy = vi.fn(); const setVolumeSpy = vi.fn();
await withRemoteMedia(rtcMembership, {}, { setVolume: setVolumeSpy }, (vm) => const vm = createRemoteMedia(rtcMembership, {}, { setVolume: setVolumeSpy });
withTestScheduler(({ expectObservable, schedule }) => { withTestScheduler(({ expectObservable, schedule }) => {
schedule("-ab---c---d|", { schedule("-ab---c---d|", {
a() { a() {
// Try muting by toggling // Try muting by toggling
vm.toggleLocallyMuted(); vm.toggleLocallyMuted();
expect(setVolumeSpy).toHaveBeenLastCalledWith(0); expect(setVolumeSpy).toHaveBeenLastCalledWith(0);
}, },
b() { b() {
// Try unmuting by dragging the slider back up // Try unmuting by dragging the slider back up
vm.setLocalVolume(0.6); vm.setLocalVolume(0.6);
vm.setLocalVolume(0.8); vm.setLocalVolume(0.8);
vm.commitLocalVolume(); vm.commitLocalVolume();
expect(setVolumeSpy).toHaveBeenCalledWith(0.6); expect(setVolumeSpy).toHaveBeenCalledWith(0.6);
expect(setVolumeSpy).toHaveBeenLastCalledWith(0.8); expect(setVolumeSpy).toHaveBeenLastCalledWith(0.8);
}, },
c() { c() {
// Try muting by dragging the slider back down // Try muting by dragging the slider back down
vm.setLocalVolume(0.2); vm.setLocalVolume(0.2);
vm.setLocalVolume(0); vm.setLocalVolume(0);
vm.commitLocalVolume(); vm.commitLocalVolume();
expect(setVolumeSpy).toHaveBeenCalledWith(0.2); expect(setVolumeSpy).toHaveBeenCalledWith(0.2);
expect(setVolumeSpy).toHaveBeenLastCalledWith(0); expect(setVolumeSpy).toHaveBeenLastCalledWith(0);
}, },
d() { d() {
// Try unmuting by toggling // Try unmuting by toggling
vm.toggleLocallyMuted(); vm.toggleLocallyMuted();
// The volume should return to the last non-zero committed volume // The volume should return to the last non-zero committed volume
expect(setVolumeSpy).toHaveBeenLastCalledWith(0.8); expect(setVolumeSpy).toHaveBeenLastCalledWith(0.8);
}, },
}); });
expectObservable(vm.localVolume$).toBe("ab(cd)(ef)g", { expectObservable(vm.localVolume$).toBe("ab(cd)(ef)g", {
a: 1, a: 1,
b: 0, b: 0,
c: 0.6, c: 0.6,
d: 0.8, d: 0.8,
e: 0.2, e: 0.2,
f: 0, f: 0,
g: 0.8, g: 0.8,
}); });
}), });
);
}); });
test("toggle fit/contain for a participant's video", async () => { test("toggle fit/contain for a participant's video", () => {
await withRemoteMedia(rtcMembership, {}, {}, (vm) => const vm = createRemoteMedia(rtcMembership, {}, {});
withTestScheduler(({ expectObservable, schedule }) => { withTestScheduler(({ expectObservable, schedule }) => {
schedule("-ab|", { schedule("-ab|", {
a: () => vm.toggleFitContain(), a: () => vm.toggleFitContain(),
b: () => vm.toggleFitContain(), b: () => vm.toggleFitContain(),
}); });
expectObservable(vm.cropVideo$).toBe("abc", { expectObservable(vm.cropVideo$).toBe("abc", {
a: true, a: true,
b: false, b: false,
c: true, c: true,
}); });
}), });
);
}); });
test("local media remembers whether it should always be shown", async () => { test("local media remembers whether it should always be shown", () => {
await withLocalMedia( const vm1 = createLocalMedia(
rtcMembership, rtcMembership,
{}, {},
mockLocalParticipant({}), mockLocalParticipant({}),
mockMediaDevices({}), mockMediaDevices({}),
(vm) =>
withTestScheduler(({ expectObservable, schedule }) => {
schedule("-a|", { a: () => vm.setAlwaysShow(false) });
expectObservable(vm.alwaysShow$).toBe("ab", { a: true, b: false });
}),
); );
withTestScheduler(({ expectObservable, schedule }) => {
schedule("-a|", { a: () => vm1.setAlwaysShow(false) });
expectObservable(vm1.alwaysShow$).toBe("ab", { a: true, b: false });
});
// Next local media should start out *not* always shown // Next local media should start out *not* always shown
await withLocalMedia( const vm2 = createLocalMedia(
rtcMembership, rtcMembership,
{}, {},
mockLocalParticipant({}), mockLocalParticipant({}),
mockMediaDevices({}), mockMediaDevices({}),
(vm) =>
withTestScheduler(({ expectObservable, schedule }) => {
schedule("-a|", { a: () => vm.setAlwaysShow(true) });
expectObservable(vm.alwaysShow$).toBe("ab", { a: false, b: true });
}),
); );
withTestScheduler(({ expectObservable, schedule }) => {
schedule("-a|", { a: () => vm2.setAlwaysShow(true) });
expectObservable(vm2.alwaysShow$).toBe("ab", { a: false, b: true });
});
}); });
test("switch cameras", async () => { test("switch cameras", async () => {
@@ -164,7 +161,7 @@ test("switch cameras", async () => {
const selectVideoInput = vi.fn(); const selectVideoInput = vi.fn();
await withLocalMedia( const vm = createLocalMedia(
rtcMembership, rtcMembership,
{}, {},
mockLocalParticipant({ mockLocalParticipant({
@@ -179,27 +176,26 @@ test("switch cameras", async () => {
select: selectVideoInput, select: selectVideoInput,
}, },
}), }),
async (vm) => {
// Switch to back camera
getValue(vm.switchCamera$)!();
expect(restartTrack).toHaveBeenCalledExactlyOnceWith({
facingMode: "environment",
});
await waitFor(() => {
expect(selectVideoInput).toHaveBeenCalledTimes(1);
expect(selectVideoInput).toHaveBeenCalledWith("back camera");
});
expect(deviceId).toBe("back camera");
// Switch to front camera
getValue(vm.switchCamera$)!();
expect(restartTrack).toHaveBeenCalledTimes(2);
expect(restartTrack).toHaveBeenLastCalledWith({ facingMode: "user" });
await waitFor(() => {
expect(selectVideoInput).toHaveBeenCalledTimes(2);
expect(selectVideoInput).toHaveBeenLastCalledWith("front camera");
});
expect(deviceId).toBe("front camera");
},
); );
// Switch to back camera
getValue(vm.switchCamera$)!();
expect(restartTrack).toHaveBeenCalledExactlyOnceWith({
facingMode: "environment",
});
await waitFor(() => {
expect(selectVideoInput).toHaveBeenCalledTimes(1);
expect(selectVideoInput).toHaveBeenCalledWith("back camera");
});
expect(deviceId).toBe("back camera");
// Switch to front camera
getValue(vm.switchCamera$)!();
expect(restartTrack).toHaveBeenCalledTimes(2);
expect(restartTrack).toHaveBeenLastCalledWith({ facingMode: "user" });
await waitFor(() => {
expect(selectVideoInput).toHaveBeenCalledTimes(2);
expect(selectVideoInput).toHaveBeenLastCalledWith("front camera");
});
expect(deviceId).toBe("front camera");
}); });

View File

@@ -46,7 +46,6 @@ import {
throttleTime, throttleTime,
} from "rxjs"; } from "rxjs";
import { ViewModel } from "./ViewModel";
import { alwaysShowSelf } from "../settings/settings"; import { alwaysShowSelf } from "../settings/settings";
import { showConnectionStats } from "../settings/settings"; import { showConnectionStats } from "../settings/settings";
import { accumulate } from "../utils/observable"; import { accumulate } from "../utils/observable";
@@ -56,6 +55,7 @@ import { type ReactionOption } from "../reactions";
import { platform } from "../Platform"; import { platform } from "../Platform";
import { type MediaDevices } from "./MediaDevices"; import { type MediaDevices } from "./MediaDevices";
import { type Behavior } from "./Behavior"; import { type Behavior } from "./Behavior";
import { type ObservableScope } from "./ObservableScope";
export function observeTrackReference$( export function observeTrackReference$(
participant: Participant, participant: Participant,
@@ -216,7 +216,7 @@ export enum EncryptionStatus {
PasswordInvalid, PasswordInvalid,
} }
abstract class BaseMediaViewModel extends ViewModel { abstract class BaseMediaViewModel {
/** /**
* The LiveKit video track for this media. * The LiveKit video track for this media.
*/ */
@@ -246,6 +246,7 @@ abstract class BaseMediaViewModel extends ViewModel {
} }
public constructor( public constructor(
protected readonly scope: ObservableScope,
/** /**
* An opaque identifier for this media. * An opaque identifier for this media.
*/ */
@@ -269,8 +270,6 @@ abstract class BaseMediaViewModel extends ViewModel {
public readonly focusURL: string, public readonly focusURL: string,
public readonly displayName$: Behavior<string>, public readonly displayName$: Behavior<string>,
) { ) {
super();
const audio$ = this.observeTrackReference$(audioSource); const audio$ = this.observeTrackReference$(audioSource);
this.video$ = this.observeTrackReference$(videoSource); this.video$ = this.observeTrackReference$(videoSource);
@@ -403,6 +402,7 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel {
public readonly cropVideo$: Behavior<boolean> = this._cropVideo$; public readonly cropVideo$: Behavior<boolean> = this._cropVideo$;
public constructor( public constructor(
scope: ObservableScope,
id: string, id: string,
member: RoomMember, member: RoomMember,
participant$: Observable<LocalParticipant | RemoteParticipant | undefined>, participant$: Observable<LocalParticipant | RemoteParticipant | undefined>,
@@ -414,6 +414,7 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel {
public readonly reaction$: Behavior<ReactionOption | null>, public readonly reaction$: Behavior<ReactionOption | null>,
) { ) {
super( super(
scope,
id, id,
member, member,
participant$, participant$,
@@ -537,6 +538,7 @@ export class LocalUserMediaViewModel extends BaseUserMediaViewModel {
); );
public constructor( public constructor(
scope: ObservableScope,
id: string, id: string,
member: RoomMember, member: RoomMember,
participant$: Behavior<LocalParticipant | undefined>, participant$: Behavior<LocalParticipant | undefined>,
@@ -549,6 +551,7 @@ export class LocalUserMediaViewModel extends BaseUserMediaViewModel {
reaction$: Behavior<ReactionOption | null>, reaction$: Behavior<ReactionOption | null>,
) { ) {
super( super(
scope,
id, id,
member, member,
participant$, participant$,
@@ -645,6 +648,7 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel {
); );
public constructor( public constructor(
scope: ObservableScope,
id: string, id: string,
member: RoomMember, member: RoomMember,
participant$: Observable<RemoteParticipant | undefined>, participant$: Observable<RemoteParticipant | undefined>,
@@ -657,6 +661,7 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel {
reaction$: Behavior<ReactionOption | null>, reaction$: Behavior<ReactionOption | null>,
) { ) {
super( super(
scope,
id, id,
member, member,
participant$, participant$,
@@ -742,6 +747,7 @@ export class ScreenShareViewModel extends BaseMediaViewModel {
); );
public constructor( public constructor(
scope: ObservableScope,
id: string, id: string,
member: RoomMember, member: RoomMember,
participant$: Observable<LocalParticipant | RemoteParticipant>, participant$: Observable<LocalParticipant | RemoteParticipant>,
@@ -753,6 +759,7 @@ export class ScreenShareViewModel extends BaseMediaViewModel {
public readonly local: boolean, public readonly local: boolean,
) { ) {
super( super(
scope,
id, id,
member, member,
participant$, participant$,

View File

@@ -58,7 +58,7 @@ export class PublishConnection extends Connection {
trackerProcessorState$: Behavior<ProcessorState>, trackerProcessorState$: Behavior<ProcessorState>,
) { ) {
const { scope } = args; const { scope } = args;
logger.info("[LivekitRoom] Create LiveKit room"); logger.info("[PublishConnection] Create LiveKit room");
const { controlledAudioDevices } = getUrlParams(); const { controlledAudioDevices } = getUrlParams();
const factory = const factory =

View File

@@ -11,7 +11,7 @@ import {
type Room as LivekitRoom, type Room as LivekitRoom,
} from "livekit-client"; } from "livekit-client";
import { ObservableScope } from "./ObservableScope.ts"; import { type ObservableScope } from "./ObservableScope.ts";
import { ScreenShareViewModel } from "./MediaViewModel.ts"; import { ScreenShareViewModel } from "./MediaViewModel.ts";
import type { RoomMember } from "matrix-js-sdk"; import type { RoomMember } from "matrix-js-sdk";
import type { EncryptionSystem } from "../e2ee/sharedKeyManagement.ts"; import type { EncryptionSystem } from "../e2ee/sharedKeyManagement.ts";
@@ -23,10 +23,10 @@ import type { Behavior } from "./Behavior.ts";
* ObservableScope for behaviors that the view model depends on. * ObservableScope for behaviors that the view model depends on.
*/ */
export class ScreenShare { export class ScreenShare {
private readonly scope = new ObservableScope();
public readonly vm: ScreenShareViewModel; public readonly vm: ScreenShareViewModel;
public constructor( public constructor(
private readonly scope: ObservableScope,
id: string, id: string,
member: RoomMember, member: RoomMember,
participant: LocalParticipant | RemoteParticipant, participant: LocalParticipant | RemoteParticipant,
@@ -37,6 +37,7 @@ export class ScreenShare {
displayName$: Observable<string>, displayName$: Observable<string>,
) { ) {
this.vm = new ScreenShareViewModel( this.vm = new ScreenShareViewModel(
this.scope,
id, id,
member, member,
of(participant), of(participant),
@@ -48,9 +49,4 @@ export class ScreenShare {
participant.isLocal, participant.isLocal,
); );
} }
public destroy(): void {
this.scope.end();
this.vm.destroy();
}
} }

View File

@@ -44,10 +44,6 @@ class SpotlightTileData {
this.maximised$ = new BehaviorSubject(maximised); this.maximised$ = new BehaviorSubject(maximised);
this.vm = new SpotlightTileViewModel(this.media$, this.maximised$); this.vm = new SpotlightTileViewModel(this.media$, this.maximised$);
} }
public destroy(): void {
this.vm.destroy();
}
} }
class GridTileData { class GridTileData {
@@ -65,14 +61,10 @@ class GridTileData {
this.media$ = new BehaviorSubject(media); this.media$ = new BehaviorSubject(media);
this.vm = new GridTileViewModel(this.media$); this.vm = new GridTileViewModel(this.media$);
} }
public destroy(): void {
this.vm.destroy();
}
} }
/** /**
* A collection of tiles to be mapped to a layout. * An immutable collection of tiles to be mapped to a layout.
*/ */
export class TileStore { export class TileStore {
private constructor( private constructor(
@@ -288,13 +280,6 @@ export class TileStoreBuilder {
); );
} }
// Destroy unused tiles
if (this.spotlight === null && this.prevSpotlight !== null)
this.prevSpotlight.destroy();
const gridEntries = new Set(grid);
for (const entry of this.prevGrid)
if (!gridEntries.has(entry)) entry.destroy();
return this.construct(this.spotlight, grid); return this.construct(this.spotlight, grid);
} }
} }

View File

@@ -5,7 +5,6 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details. Please see LICENSE in the repository root for full details.
*/ */
import { ViewModel } from "./ViewModel";
import { type MediaViewModel, type UserMediaViewModel } from "./MediaViewModel"; import { type MediaViewModel, type UserMediaViewModel } from "./MediaViewModel";
import { type Behavior } from "./Behavior"; import { type Behavior } from "./Behavior";
@@ -14,21 +13,17 @@ function createId(): string {
return (nextId++).toString(); return (nextId++).toString();
} }
export class GridTileViewModel extends ViewModel { export class GridTileViewModel {
public readonly id = createId(); public readonly id = createId();
public constructor(public readonly media$: Behavior<UserMediaViewModel>) { public constructor(public readonly media$: Behavior<UserMediaViewModel>) {}
super();
}
} }
export class SpotlightTileViewModel extends ViewModel { export class SpotlightTileViewModel {
public constructor( public constructor(
public readonly media$: Behavior<MediaViewModel[]>, public readonly media$: Behavior<MediaViewModel[]>,
public readonly maximised$: Behavior<boolean>, public readonly maximised$: Behavior<boolean>,
) { ) {}
super();
}
} }
export type TileViewModel = GridTileViewModel | SpotlightTileViewModel; export type TileViewModel = GridTileViewModel | SpotlightTileViewModel;

View File

@@ -22,7 +22,7 @@ import {
} from "livekit-client"; } from "livekit-client";
import { observeParticipantEvents } from "@livekit/components-core"; import { observeParticipantEvents } from "@livekit/components-core";
import { ObservableScope } from "./ObservableScope.ts"; import { type ObservableScope } from "./ObservableScope.ts";
import { import {
LocalUserMediaViewModel, LocalUserMediaViewModel,
RemoteUserMediaViewModel, RemoteUserMediaViewModel,
@@ -75,11 +75,11 @@ enum SortingBin {
* for inclusion in the call layout. * for inclusion in the call layout.
*/ */
export class UserMedia { export class UserMedia {
private readonly scope = new ObservableScope();
private readonly participant$ = new BehaviorSubject(this.initialParticipant); private readonly participant$ = new BehaviorSubject(this.initialParticipant);
public readonly vm: UserMediaViewModel = this.participant$.value?.isLocal public readonly vm: UserMediaViewModel = this.participant$.value?.isLocal
? new LocalUserMediaViewModel( ? new LocalUserMediaViewModel(
this.scope,
this.id, this.id,
this.member, this.member,
this.participant$ as Behavior<LocalParticipant>, this.participant$ as Behavior<LocalParticipant>,
@@ -92,6 +92,7 @@ export class UserMedia {
this.scope.behavior(this.reaction$), this.scope.behavior(this.reaction$),
) )
: new RemoteUserMediaViewModel( : new RemoteUserMediaViewModel(
this.scope,
this.id, this.id,
this.member, this.member,
this.participant$ as Observable<RemoteParticipant | undefined>, this.participant$ as Observable<RemoteParticipant | undefined>,
@@ -144,6 +145,7 @@ export class UserMedia {
); );
public constructor( public constructor(
private readonly scope: ObservableScope,
public readonly id: string, public readonly id: string,
private readonly member: RoomMember, private readonly member: RoomMember,
private readonly initialParticipant: private readonly initialParticipant:
@@ -168,11 +170,6 @@ export class UserMedia {
this.participant$.next(newParticipant); this.participant$.next(newParticipant);
} }
} }
public destroy(): void {
this.scope.end();
this.vm.destroy();
}
} }
export function sharingScreen$(p: Participant): Observable<boolean> { export function sharingScreen$(p: Participant): Observable<boolean> {

View File

@@ -1,23 +0,0 @@
/*
Copyright 2023, 2024 New Vector Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { ObservableScope } from "./ObservableScope";
/**
* An MVVM view model.
*/
export abstract class ViewModel {
protected readonly scope = new ObservableScope();
/**
* Instructs the ViewModel to clean up its resources. If you forget to call
* this, there may be memory leaks!
*/
public destroy(): void {
this.scope.end();
}
}

View File

@@ -12,7 +12,7 @@ import { axe } from "vitest-axe";
import { type MatrixRTCSession } from "matrix-js-sdk/lib/matrixrtc"; import { type MatrixRTCSession } from "matrix-js-sdk/lib/matrixrtc";
import { GridTile } from "./GridTile"; import { GridTile } from "./GridTile";
import { mockRtcMembership, withRemoteMedia } from "../utils/test"; import { mockRtcMembership, createRemoteMedia } from "../utils/test";
import { GridTileViewModel } from "../state/TileViewModel"; import { GridTileViewModel } from "../state/TileViewModel";
import { ReactionsSenderProvider } from "../reactions/useReactionsSender"; import { ReactionsSenderProvider } from "../reactions/useReactionsSender";
import type { CallViewModel } from "../state/CallViewModel"; import type { CallViewModel } from "../state/CallViewModel";
@@ -25,7 +25,7 @@ global.IntersectionObserver = class MockIntersectionObserver {
} as unknown as typeof IntersectionObserver; } as unknown as typeof IntersectionObserver;
test("GridTile is accessible", async () => { test("GridTile is accessible", async () => {
await withRemoteMedia( const vm = createRemoteMedia(
mockRtcMembership("@alice:example.org", "AAAA"), mockRtcMembership("@alice:example.org", "AAAA"),
{ {
rawDisplayName: "Alice", rawDisplayName: "Alice",
@@ -36,41 +36,40 @@ test("GridTile is accessible", async () => {
getTrackPublication: () => getTrackPublication: () =>
({}) as Partial<RemoteTrackPublication> as RemoteTrackPublication, ({}) as Partial<RemoteTrackPublication> as RemoteTrackPublication,
}, },
async (vm) => { );
const fakeRtcSession = {
const fakeRtcSession = {
on: () => {},
off: () => {},
room: {
on: () => {},
off: () => {},
client: {
getUserId: () => null,
getDeviceId: () => null,
on: () => {}, on: () => {},
off: () => {}, off: () => {},
room: { },
on: () => {},
off: () => {},
client: {
getUserId: () => null,
getDeviceId: () => null,
on: () => {},
off: () => {},
},
},
memberships: [],
} as unknown as MatrixRTCSession;
const cVm = {
reactions$: constant({}),
handsRaised$: constant({}),
} as Partial<CallViewModel> as CallViewModel;
const { container } = render(
<ReactionsSenderProvider vm={cVm} rtcSession={fakeRtcSession}>
<GridTile
vm={new GridTileViewModel(constant(vm))}
onOpenProfile={() => {}}
targetWidth={300}
targetHeight={200}
showSpeakingIndicators
focusable={true}
/>
</ReactionsSenderProvider>,
);
expect(await axe(container)).toHaveNoViolations();
// Name should be visible
screen.getByText("Alice");
}, },
memberships: [],
} as unknown as MatrixRTCSession;
const cVm = {
reactions$: constant({}),
handsRaised$: constant({}),
} as Partial<CallViewModel> as CallViewModel;
const { container } = render(
<ReactionsSenderProvider vm={cVm} rtcSession={fakeRtcSession}>
<GridTile
vm={new GridTileViewModel(constant(vm))}
onOpenProfile={() => {}}
targetWidth={300}
targetHeight={200}
showSpeakingIndicators
focusable={true}
/>
</ReactionsSenderProvider>,
); );
expect(await axe(container)).toHaveNoViolations();
// Name should be visible
screen.getByText("Alice");
}); });

View File

@@ -15,8 +15,8 @@ import {
mockLocalParticipant, mockLocalParticipant,
mockMediaDevices, mockMediaDevices,
mockRtcMembership, mockRtcMembership,
withLocalMedia, createLocalMedia,
withRemoteMedia, createRemoteMedia,
} from "../utils/test"; } from "../utils/test";
import { SpotlightTileViewModel } from "../state/TileViewModel"; import { SpotlightTileViewModel } from "../state/TileViewModel";
import { constant } from "../state/Behavior"; import { constant } from "../state/Behavior";
@@ -27,62 +27,53 @@ global.IntersectionObserver = class MockIntersectionObserver {
} as unknown as typeof IntersectionObserver; } as unknown as typeof IntersectionObserver;
test("SpotlightTile is accessible", async () => { test("SpotlightTile is accessible", async () => {
await withRemoteMedia( const vm1 = createRemoteMedia(
mockRtcMembership("@alice:example.org", "AAAA"), mockRtcMembership("@alice:example.org", "AAAA"),
{ {
rawDisplayName: "Alice", rawDisplayName: "Alice",
getMxcAvatarUrl: () => "mxc://adfsg", getMxcAvatarUrl: () => "mxc://adfsg",
}, },
{}, {},
async (vm1) => {
await withLocalMedia(
mockRtcMembership("@bob:example.org", "BBBB"),
{
rawDisplayName: "Bob",
getMxcAvatarUrl: () => "mxc://dlskf",
},
mockLocalParticipant({}),
mockMediaDevices({}),
async (vm2) => {
const user = userEvent.setup();
const toggleExpanded = vi.fn();
const { container } = render(
<SpotlightTile
vm={
new SpotlightTileViewModel(
constant([vm1, vm2]),
constant(false),
)
}
targetWidth={300}
targetHeight={200}
expanded={false}
onToggleExpanded={toggleExpanded}
showIndicators
focusable={true}
/>,
);
expect(await axe(container)).toHaveNoViolations();
// Alice should be in the spotlight, with her name and avatar on the
// first page
screen.getByText("Alice");
const aliceAvatar = screen.getByRole("img");
expect(screen.queryByRole("button", { name: "common.back" })).toBe(
null,
);
// Bob should be out of the spotlight, and therefore invisible
expect(isInaccessible(screen.getByText("Bob"))).toBe(true);
// Now navigate to Bob
await user.click(screen.getByRole("button", { name: "Next" }));
screen.getByText("Bob");
expect(screen.getByRole("img")).not.toBe(aliceAvatar);
expect(isInaccessible(screen.getByText("Alice"))).toBe(true);
// Can toggle whether the tile is expanded
await user.click(screen.getByRole("button", { name: "Expand" }));
expect(toggleExpanded).toHaveBeenCalled();
},
);
},
); );
const vm2 = createLocalMedia(
mockRtcMembership("@bob:example.org", "BBBB"),
{
rawDisplayName: "Bob",
getMxcAvatarUrl: () => "mxc://dlskf",
},
mockLocalParticipant({}),
mockMediaDevices({}),
);
const user = userEvent.setup();
const toggleExpanded = vi.fn();
const { container } = render(
<SpotlightTile
vm={new SpotlightTileViewModel(constant([vm1, vm2]), constant(false))}
targetWidth={300}
targetHeight={200}
expanded={false}
onToggleExpanded={toggleExpanded}
showIndicators
focusable={true}
/>,
);
expect(await axe(container)).toHaveNoViolations();
// Alice should be in the spotlight, with her name and avatar on the
// first page
screen.getByText("Alice");
const aliceAvatar = screen.getByRole("img");
expect(screen.queryByRole("button", { name: "common.back" })).toBe(null);
// Bob should be out of the spotlight, and therefore invisible
expect(isInaccessible(screen.getByText("Bob"))).toBe(true);
// Now navigate to Bob
await user.click(screen.getByRole("button", { name: "Next" }));
screen.getByText("Bob");
expect(screen.getByRole("img")).not.toBe(aliceAvatar);
expect(isInaccessible(screen.getByText("Alice"))).toBe(true);
// Can toggle whether the tile is expanded
await user.click(screen.getByRole("button", { name: "Expand" }));
expect(toggleExpanded).toHaveBeenCalled();
}); });

View File

@@ -6,9 +6,10 @@ Please see LICENSE in the repository root for full details.
*/ */
import { test } from "vitest"; import { test } from "vitest";
import { Subject } from "rxjs";
import { withTestScheduler } from "./test"; import { withTestScheduler } from "./test";
import { pauseWhen } from "./observable"; import { generateKeyed$, pauseWhen } from "./observable";
test("pauseWhen", () => { test("pauseWhen", () => {
withTestScheduler(({ behavior, expectObservable }) => { withTestScheduler(({ behavior, expectObservable }) => {
@@ -22,3 +23,43 @@ test("pauseWhen", () => {
).toBe(outputMarbles); ).toBe(outputMarbles);
}); });
}); });
test("generateKeyed$ has the right output and ends scopes at the right times", () => {
const scope1$ = new Subject<string>();
const scope2$ = new Subject<string>();
const scope3$ = new Subject<string>();
const scope4$ = new Subject<string>();
const scopeSubjects = [scope1$, scope2$, scope3$, scope4$];
withTestScheduler(({ hot, expectObservable }) => {
// Each scope should start when the input number reaches or surpasses their
// number and end when the input number drops back below their number.
// At the very end, unsubscribing should end all remaining scopes.
const inputMarbles = " 123242";
const outputMarbles = " abcbdb";
const subscriptionMarbles = "^-----!";
const scope1Marbles = " y-----n";
const scope2Marbles = " -y----n";
const scope3Marbles = " --ynyn";
const scope4Marbles = " ----yn";
expectObservable(
generateKeyed$(hot<string>(inputMarbles), (input, createOrGet) => {
for (let i = 1; i <= +input; i++) {
createOrGet(i.toString(), (scope) => {
scopeSubjects[i - 1].next("y");
scope.onEnd(() => scopeSubjects[i - 1].next("n"));
return i.toString();
});
}
return "abcd"[+input - 1];
}),
subscriptionMarbles,
).toBe(outputMarbles);
expectObservable(scope1$).toBe(scope1Marbles);
expectObservable(scope2$).toBe(scope2Marbles);
expectObservable(scope3$).toBe(scope3Marbles);
expectObservable(scope4$).toBe(scope4Marbles);
});
});

View File

@@ -23,6 +23,7 @@ import {
} from "rxjs"; } from "rxjs";
import { type Behavior } from "../state/Behavior"; import { type Behavior } from "../state/Behavior";
import { ObservableScope } from "../state/ObservableScope";
const nothing = Symbol("nothing"); const nothing = Symbol("nothing");
@@ -117,3 +118,71 @@ export function pauseWhen<T>(pause$: Behavior<boolean>) {
map(([value]) => value), map(([value]) => value),
); );
} }
/**
* Maps a changing input value to an output value consisting of items that have
* automatically generated ObservableScopes tied to a key. Items will be
* automatically created when their key is requested for the first time, reused
* when the same key is requested at a later time, and destroyed (have their
* scope ended) when the key is no longer requested.
*
* @param input$ The input value to be mapped.
* @param project A function mapping input values to output values. This
* function receives an additional callback `createOrGet` which can be used
* within the function body to request that an item be generated for a certain
* key. The caller provides a factory which will be used to create the item if
* it is being requested for the first time. Otherwise, the item previously
* existing under that key will be returned.
*/
export function generateKeyed$<In, Item, Out>(
input$: Observable<In>,
project: (
input: In,
createOrGet: (
key: string,
factory: (scope: ObservableScope) => Item,
) => Item,
) => Out,
): Observable<Out> {
return input$.pipe(
// Keep track of the existing items over time, so we can reuse them
scan<
In,
{
items: Map<string, { item: Item; scope: ObservableScope }>;
output: Out;
},
{ items: Map<string, { item: Item; scope: ObservableScope }> }
>(
(state, data) => {
const nextItems = new Map<
string,
{ item: Item; scope: ObservableScope }
>();
const output = project(data, (key, factory) => {
let item = state.items.get(key);
if (item === undefined) {
// First time requesting the key; create the item
const scope = new ObservableScope();
item = { item: factory(scope), scope };
}
nextItems.set(key, item);
return item.item;
});
// Destroy all items that are no longer being requested
for (const [key, { scope }] of state.items)
if (!nextItems.has(key)) scope.end();
return { items: nextItems, output };
},
{ items: new Map() },
),
finalizeValue((state) => {
// Destroy all remaining items when no longer subscribed
for (const { scope } of state.items.values()) scope.end();
}),
map(({ output }) => output),
);
}

View File

@@ -27,6 +27,7 @@ import {
mockMediaDevices, mockMediaDevices,
mockMuteStates, mockMuteStates,
MockRTCSession, MockRTCSession,
testScope,
} from "./test"; } from "./test";
import { aliceRtcMember, localRtcMember } from "./test-fixtures"; import { aliceRtcMember, localRtcMember } from "./test-fixtures";
import { type RaisedHandInfo, type ReactionInfo } from "../reactions"; import { type RaisedHandInfo, type ReactionInfo } from "../reactions";
@@ -134,6 +135,7 @@ export function getBasicCallViewModelEnvironment(
// const remoteParticipants$ = of([aliceParticipant]); // const remoteParticipants$ = of([aliceParticipant]);
const vm = new CallViewModel( const vm = new CallViewModel(
testScope(),
rtcSession.asMockedSession(), rtcSession.asMockedSession(),
matrixRoom, matrixRoom,
mockMediaDevices({}), mockMediaDevices({}),

View File

@@ -6,7 +6,7 @@ Please see LICENSE in the repository root for full details.
*/ */
import { map, type Observable, of, type SchedulerLike } from "rxjs"; import { map, type Observable, of, type SchedulerLike } from "rxjs";
import { type RunHelpers, TestScheduler } from "rxjs/testing"; import { type RunHelpers, TestScheduler } from "rxjs/testing";
import { expect, type MockedObject, vi, vitest } from "vitest"; import { expect, type MockedObject, onTestFinished, vi, vitest } from "vitest";
import { import {
type RoomMember, type RoomMember,
type Room as MatrixRoom, type Room as MatrixRoom,
@@ -89,6 +89,15 @@ interface TestRunnerGlobal {
rxjsTestScheduler?: SchedulerLike; rxjsTestScheduler?: SchedulerLike;
} }
/**
* Create a new ObservableScope which ends when the current test ends.
*/
export function testScope(): ObservableScope {
const scope = new ObservableScope();
onTestFinished(() => scope.end());
return scope;
}
/** /**
* Run Observables with a scheduler that virtualizes time, for testing purposes. * Run Observables with a scheduler that virtualizes time, for testing purposes.
*/ */
@@ -259,14 +268,14 @@ export function mockLocalParticipant(
} as Partial<LocalParticipant> as LocalParticipant; } as Partial<LocalParticipant> as LocalParticipant;
} }
export async function withLocalMedia( export function createLocalMedia(
localRtcMember: CallMembership, localRtcMember: CallMembership,
roomMember: Partial<RoomMember>, roomMember: Partial<RoomMember>,
localParticipant: LocalParticipant, localParticipant: LocalParticipant,
mediaDevices: MediaDevices, mediaDevices: MediaDevices,
continuation: (vm: LocalUserMediaViewModel) => void | Promise<void>, ): LocalUserMediaViewModel {
): Promise<void> { return new LocalUserMediaViewModel(
const vm = new LocalUserMediaViewModel( testScope(),
"local", "local",
mockMatrixRoomMember(localRtcMember, roomMember), mockMatrixRoomMember(localRtcMember, roomMember),
constant(localParticipant), constant(localParticipant),
@@ -280,11 +289,6 @@ export async function withLocalMedia(
constant(null), constant(null),
constant(null), constant(null),
); );
try {
await continuation(vm);
} finally {
vm.destroy();
}
} }
export function mockRemoteParticipant( export function mockRemoteParticipant(
@@ -300,14 +304,14 @@ export function mockRemoteParticipant(
} as RemoteParticipant; } as RemoteParticipant;
} }
export async function withRemoteMedia( export function createRemoteMedia(
localRtcMember: CallMembership, localRtcMember: CallMembership,
roomMember: Partial<RoomMember>, roomMember: Partial<RoomMember>,
participant: Partial<RemoteParticipant>, participant: Partial<RemoteParticipant>,
continuation: (vm: RemoteUserMediaViewModel) => void | Promise<void>, ): RemoteUserMediaViewModel {
): Promise<void> {
const remoteParticipant = mockRemoteParticipant(participant); const remoteParticipant = mockRemoteParticipant(participant);
const vm = new RemoteUserMediaViewModel( return new RemoteUserMediaViewModel(
testScope(),
"remote", "remote",
mockMatrixRoomMember(localRtcMember, roomMember), mockMatrixRoomMember(localRtcMember, roomMember),
of(remoteParticipant), of(remoteParticipant),
@@ -321,11 +325,6 @@ export async function withRemoteMedia(
constant(null), constant(null),
constant(null), constant(null),
); );
try {
await continuation(vm);
} finally {
vm.destroy();
}
} }
export function mockConfig(config: Partial<ResolvedConfigOptions> = {}): void { export function mockConfig(config: Partial<ResolvedConfigOptions> = {}): void {