diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index cac4322e..31a7e32d 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -1890,7 +1890,7 @@ export class CallViewModel extends ViewModel { this.startConnection$ .pipe(this.scope.bind()) .subscribe((c) => void c.start()); - this.stopConnection$.pipe(this.scope.bind()).subscribe((c) => c.stop()); + this.stopConnection$.pipe(this.scope.bind()).subscribe((c) => void c.stop()); combineLatest([this.localFocus, this.join$]) .pipe(this.scope.bind()) diff --git a/src/state/Connection.test.ts b/src/state/Connection.test.ts index 15c5d88e..8552ec24 100644 --- a/src/state/Connection.test.ts +++ b/src/state/Connection.test.ts @@ -28,9 +28,9 @@ describe("Start connection states", () => { let fakeRoomEventEmiter: EventEmitter; let fakeMembershipsFocusMap$: BehaviorSubject<{ membership: CallMembership; focus: LivekitFocus }[]>; - const livekitFocus : LivekitFocus = { - livekit_alias:"!roomID:example.org", - livekit_service_url : "https://matrix-rtc.example.org/livekit/jwt" + const livekitFocus: LivekitFocus = { + livekit_alias: "!roomID:example.org", + livekit_service_url: "https://matrix-rtc.example.org/livekit/jwt" } afterEach(() => { @@ -98,8 +98,8 @@ describe("Start connection states", () => { .mockResolvedValue(undefined); const connection = new RemoteConnection( - opts, - undefined, + opts, + undefined, ); return connection; } @@ -115,8 +115,8 @@ describe("Start connection states", () => { livekitRoomFactory: () => fakeLivekitRoom, } const connection = new RemoteConnection( - opts, - undefined, + opts, + undefined, ); expect(connection.focusedConnectionState$.getValue().state) @@ -254,7 +254,7 @@ describe("Start connection states", () => { const deferredSFU = Promise.withResolvers(); // mock the /sfu/get call fetchMock.post(`${livekitFocus.livekit_service_url}/sfu/get`, - () => { + () => { return { status: 200, body: @@ -318,15 +318,13 @@ describe("Start connection states", () => { }); it("should relay livekit events once connected", async () => { - vi.useFakeTimers(); setupTest() const connection = setupRemoteConnection(); await connection.start(); - await vi.runAllTimersAsync(); - const capturedState: FocusConnectionState[] = []; + let capturedState: FocusConnectionState[] = []; connection.focusedConnectionState$.subscribe((value) => { capturedState.push(value); }); @@ -342,11 +340,8 @@ describe("Start connection states", () => { ] for (const state of states) { fakeRoomEventEmiter.emit(RoomEvent.ConnectionStateChanged, state); - await vi.runAllTimersAsync(); } - await vi.runAllTimersAsync(); - for (const state of states) { const s = capturedState.shift(); expect(s?.state).toEqual("ConnectedToLkRoom"); @@ -357,7 +352,47 @@ describe("Start connection states", () => { expect(s?.focus.livekit_service_url).toEqual(livekitFocus.livekit_service_url); } + // If the state is not ConnectedToLkRoom, no events should be relayed anymore + await connection.stop(); + capturedState = []; + for (const state of states) { + fakeRoomEventEmiter.emit(RoomEvent.ConnectionStateChanged, state); + } + + expect(capturedState.length).toEqual(0); + }); -}) + it("shutting down the scope should stop the connection", async () => { + setupTest() + vi.useFakeTimers(); + + const connection = setupRemoteConnection(); + + let capturedState: FocusConnectionState[] = []; + connection.focusedConnectionState$.subscribe((value) => { + capturedState.push(value); + }); + + await connection.start(); + + const stopSpy = vi.spyOn(connection, "stop"); + testScope.end(); + + + expect(stopSpy).toHaveBeenCalled(); + expect(fakeLivekitRoom.disconnect).toHaveBeenCalled(); + + /// Ensures that focusedConnectionState$ is bound to the scope. + capturedState = []; + // the subscription should be closed, and no new state should be received + // @ts-expect-error: Accessing private field for testing purposes + connection._focusedConnectionState$.next({ state: "Initialized" }); + // @ts-expect-error: Accessing private field for testing purposes + connection._focusedConnectionState$.next({ state: "ConnectingToLkRoom" }); + + expect(capturedState.length).toEqual(0); + }); + +}); diff --git a/src/state/Connection.ts b/src/state/Connection.ts index 1e081b06..1b93b523 100644 --- a/src/state/Connection.ts +++ b/src/state/Connection.ts @@ -46,14 +46,14 @@ export type FocusConnectionState = export class Connection { // Private Behavior - private readonly _focusedConnectionState$ = new BehaviorSubject({ state: 'Initialized' }); + private readonly _focusedConnectionState$ + = new BehaviorSubject({ state: 'Initialized' }); /** * The current state of the connection to the focus server. */ - public get focusedConnectionState$(): Behavior { - return this._focusedConnectionState$; - } + public readonly focusedConnectionState$: Behavior; + /** * Whether the connection has been stopped. * @see Connection.stop @@ -103,9 +103,9 @@ export class Connection { * This will disconnect from the LiveKit room. * If the connection is already stopped, this is a no-op. */ - public stop(): void { + public async stop(): Promise { if (this.stopped) return; - void this.livekitRoom.disconnect(); + await this.livekitRoom.disconnect(); this._focusedConnectionState$.next({ state: 'Stopped', focus: this.targetFocus }); this.stopped = true; } @@ -142,6 +142,10 @@ export class Connection { this.targetFocus = focus; this.client = client; + this.focusedConnectionState$ = scope.behavior( + this._focusedConnectionState$, { state: 'Initialized' } + ); + const participantsIncludingSubscribers$ = scope.behavior( connectedParticipantsObserver(this.livekitRoom), [] @@ -180,7 +184,7 @@ export class Connection { } }); - scope.onEnd(() => this.stop()); + scope.onEnd(() => void this.stop()); } }