/*
Copyright 2025 Element Creations Ltd.

SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/

import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
import { logger } from "matrix-js-sdk/lib/logger";
import { BehaviorSubject } from "rxjs";
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { type Participant as LivekitParticipant } from "livekit-client";

import { ObservableScope } from "../ObservableScope.ts";
import {
  ConnectionManager,
  type ConnectionManagerData,
} from "./ConnectionManager.ts";
import { type ConnectionFactory } from "./ConnectionFactory.ts";
import { type Connection } from "./Connection.ts";
import { areLivekitTransportsEqual } from "./matrixLivekitMerger.ts";
import { flushPromises } from "../../utils/test.ts";

// Some test constants. TRANSPORT_2 and TRANSPORT_3 deliberately share the same
// alias and only differ by their service URL.
const TRANSPORT_1: LivekitTransport = {
  type: "livekit",
  livekit_service_url: "https://lk.example.org",
  livekit_alias: "!alias:example.org",
};

const TRANSPORT_2: LivekitTransport = {
  type: "livekit",
  livekit_service_url: "https://lk.sample.com",
  livekit_alias: "!alias:sample.com",
};

const TRANSPORT_3: LivekitTransport = {
  type: "livekit",
  livekit_service_url: "https://lk-other.sample.com",
  livekit_alias: "!alias:sample.com",
};

// Scope handed to the manager; ended after each test.
let testScope: ObservableScope;
// Factory stub whose `createConnection` is replaced by a vitest mock.
let fakeConnectionFactory: ConnectionFactory;
// Stream of transports registered with the manager; tests push lists into it.
let testTransportStream$: BehaviorSubject<LivekitTransport[]>;

// The connection manager under test
let manager: ConnectionManager;

beforeEach(() => {
  testScope = new ObservableScope();

  fakeConnectionFactory = {} as unknown as ConnectionFactory;
  vi.mocked(fakeConnectionFactory).createConnection = vi
    .fn()
    .mockImplementation(
      (transport: LivekitTransport, scope: ObservableScope) => {
        const mockConnection = {
          transport,
        } as unknown as Connection;
        vi.mocked(mockConnection).start = vi.fn();
        vi.mocked(mockConnection).stop = vi.fn();
        // Tie the connection's lifecycle to the scope to test scope lifecycle management
        scope.onEnd(() => {
          void mockConnection.stop();
        });
        return mockConnection;
      },
    );

  testTransportStream$ = new BehaviorSubject<LivekitTransport[]>([]);

  manager = new ConnectionManager(testScope, fakeConnectionFactory, logger);
  manager.registerTransports(testTransportStream$);
});

afterEach(() => {
  testScope.end();
});

describe("connections$ stream", () => {
  test("Should create and start new connections for each transports", async () => {
    // Resolved with the first non-empty list of connections emitted.
    const managedConnections = Promise.withResolvers<Connection[]>();
    manager.connections$.subscribe((connections) => {
      if (connections.length > 0) managedConnections.resolve(connections);
    });

    testTransportStream$.next([TRANSPORT_1, TRANSPORT_2]);

    const connections = await managedConnections.promise;

    expect(connections.length).toBe(2);

    expect(
      vi.mocked(fakeConnectionFactory).createConnection,
    ).toHaveBeenCalledTimes(2);

    const conn1 = connections.find((c) =>
      areLivekitTransportsEqual(c.transport, TRANSPORT_1),
    );
    expect(conn1).toBeDefined();
    expect(conn1!.start).toHaveBeenCalled();

    const conn2 = connections.find((c) =>
      areLivekitTransportsEqual(c.transport, TRANSPORT_2),
    );
    expect(conn2).toBeDefined();
    expect(conn2!.start).toHaveBeenCalled();
  });

  test("Should start connection only once", async () => {
    const observedConnections: Connection[][] = [];
    manager.connections$.subscribe((connections) => {
      observedConnections.push(connections);
    });

    // Repeat the same transport several times: it must not lead to additional
    // connections being created or started.
    testTransportStream$.next([TRANSPORT_1]);
    testTransportStream$.next([TRANSPORT_1]);
    testTransportStream$.next([TRANSPORT_1]);
    testTransportStream$.next([TRANSPORT_1]);
    testTransportStream$.next([TRANSPORT_1]);
    testTransportStream$.next([TRANSPORT_1, TRANSPORT_2]);

    await flushPromises();
    const connections = observedConnections.pop()!;

    expect(connections.length).toBe(2);
    expect(
      vi.mocked(fakeConnectionFactory).createConnection,
    ).toHaveBeenCalledTimes(2);

    const conn2 = connections.find((c) =>
      areLivekitTransportsEqual(c.transport, TRANSPORT_2),
    );
    expect(conn2).toBeDefined();

    const conn1 = connections.find((c) =>
      areLivekitTransportsEqual(c.transport, TRANSPORT_1),
    );
    expect(conn1).toBeDefined();
    expect(conn1!.start).toHaveBeenCalledOnce();
  });

  test("Should cleanup connections when not needed anymore", async () => {
    const observedConnections: Connection[][] = [];
    manager.connections$.subscribe((connections) => {
      observedConnections.push(connections);
    });

    testTransportStream$.next([TRANSPORT_1]);
    testTransportStream$.next([TRANSPORT_1, TRANSPORT_2]);

    await flushPromises();

    const conn2 = observedConnections
      .pop()!
      .find((c) => areLivekitTransportsEqual(c.transport, TRANSPORT_2))!;

    // Drop TRANSPORT_2 from the list of wanted transports.
    testTransportStream$.next([TRANSPORT_1]);

    await flushPromises();

    // The second connection should have been stopped as it is no longer needed
    expect(conn2.stop).toHaveBeenCalled();

    // The first connection should still be active, i.e. never stopped.
    // (`not.toHaveBeenCalledOnce()` would also pass if it was stopped twice.)
    const conn1 = observedConnections.pop()![0];
    expect(conn1.stop).not.toHaveBeenCalled();
  });
});

describe("connectionManagerData$ stream", () => {
  // Used in test to control fake connections' participantsWithTrack$ streams
  let fakePublishingParticipantsStreams: Map<
    string,
    BehaviorSubject<LivekitParticipant[]>
  >;

  // Builds the map key under which a transport's participants stream is stored.
  function keyForTransport(transport: LivekitTransport): string {
    return `${transport.livekit_service_url}|${transport.livekit_alias}`;
  }

  beforeEach(() => {
    fakePublishingParticipantsStreams = new Map();
    // need a more advanced fake connection factory
    vi.mocked(fakeConnectionFactory).createConnection = vi
      .fn()
      .mockImplementation(
        (transport: LivekitTransport, scope: ObservableScope) => {
          const fakePublishingParticipants$ = new BehaviorSubject<
            LivekitParticipant[]
          >([]);
          const mockConnection = {
            transport,
            participantsWithTrack$: fakePublishingParticipants$,
          } as unknown as Connection;
          vi.mocked(mockConnection).start = vi.fn();
          vi.mocked(mockConnection).stop = vi.fn();
          // Tie the connection's lifecycle to the scope to test scope lifecycle management
          scope.onEnd(() => {
            void mockConnection.stop();
          });

          // Expose the stream so that tests can publish participants on it.
          fakePublishingParticipantsStreams.set(
            keyForTransport(transport),
            fakePublishingParticipants$,
          );
          return mockConnection;
        },
      );
  });

  test("Should report connections with the publishing participants", async () => {
    const managerDataUpdates: ConnectionManagerData[] = [];
    manager.connectionManagerData$.subscribe((data) => {
      managerDataUpdates.push(data);
    });

    testTransportStream$.next([TRANSPORT_1, TRANSPORT_2]);
    await flushPromises();

    const conn1Participants$ = fakePublishingParticipantsStreams.get(
      keyForTransport(TRANSPORT_1),
    )!;

    conn1Participants$.next([{ identity: "user1A" } as LivekitParticipant]);

    const conn2Participants$ = fakePublishingParticipantsStreams.get(
      keyForTransport(TRANSPORT_2),
    )!;
    conn2Participants$.next([{ identity: "user2A" } as LivekitParticipant]);

    conn1Participants$.next([
      { identity: "user1A" } as LivekitParticipant,
      { identity: "user1B" } as LivekitParticipant,
    ]);

    testTransportStream$.next([TRANSPORT_1, TRANSPORT_2, TRANSPORT_3]);
    // Wait before asserting, as done after the other transport updates in
    // this file.
    await flushPromises();

    // Update 0: initial state, no transport registered yet.
    expect(managerDataUpdates[0].getConnections().length).toEqual(0);

    {
      // Update 1: both connections exist, nobody is publishing yet.
      const data = managerDataUpdates[1];
      expect(data.getConnections().length).toEqual(2);
      expect(data.getParticipantForTransport(TRANSPORT_1).length).toEqual(0);
      expect(data.getParticipantForTransport(TRANSPORT_2).length).toEqual(0);
    }

    {
      // Update 2: user1A is publishing on the first connection.
      const data = managerDataUpdates[2];
      expect(data.getConnections().length).toEqual(2);
      expect(data.getParticipantForTransport(TRANSPORT_1).length).toEqual(1);
      expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toEqual(
        "user1A",
      );
      expect(data.getParticipantForTransport(TRANSPORT_2).length).toEqual(0);
    }

    {
      // Update 3: user2A is publishing on the second connection.
      const data = managerDataUpdates[3];
      expect(data.getConnections().length).toEqual(2);
      expect(data.getParticipantForTransport(TRANSPORT_1).length).toEqual(1);
      expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toEqual(
        "user1A",
      );
      expect(data.getParticipantForTransport(TRANSPORT_2).length).toEqual(1);
      expect(data.getParticipantForTransport(TRANSPORT_2)[0].identity).toEqual(
        "user2A",
      );
    }

    {
      // Update 4: user1B joins user1A on the first connection.
      const data = managerDataUpdates[4];
      expect(data.getConnections().length).toEqual(2);
      expect(data.getParticipantForTransport(TRANSPORT_1).length).toEqual(2);
      expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toEqual(
        "user1A",
      );
      expect(data.getParticipantForTransport(TRANSPORT_1)[1].identity).toEqual(
        "user1B",
      );
      expect(data.getParticipantForTransport(TRANSPORT_2).length).toEqual(1);
      expect(data.getParticipantForTransport(TRANSPORT_2)[0].identity).toEqual(
        "user2A",
      );
    }

    {
      // Update 5: a third connection is added, existing participants are kept.
      const data = managerDataUpdates[5];
      expect(data.getConnections().length).toEqual(3);
      expect(data.getParticipantForTransport(TRANSPORT_1).length).toEqual(2);
      expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toEqual(
        "user1A",
      );
      expect(data.getParticipantForTransport(TRANSPORT_1)[1].identity).toEqual(
        "user1B",
      );
      expect(data.getParticipantForTransport(TRANSPORT_2).length).toEqual(1);
      expect(data.getParticipantForTransport(TRANSPORT_2)[0].identity).toEqual(
        "user2A",
      );

      expect(data.getParticipantForTransport(TRANSPORT_3).length).toEqual(0);
    }
  });
});