test: fixup ConnectionManager tests

This commit is contained in:
Valere
2025-11-08 13:02:12 +01:00
parent b8635b52d8
commit fc842d4be7
2 changed files with 196 additions and 167 deletions

View File

@@ -10,15 +10,16 @@ import { BehaviorSubject } from "rxjs";
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { type Participant as LivekitParticipant } from "livekit-client"; import { type Participant as LivekitParticipant } from "livekit-client";
import { ObservableScope } from "../../ObservableScope.ts"; import { Epoch, ObservableScope } from "../../ObservableScope.ts";
import { import {
type IConnectionManager,
createConnectionManager$, createConnectionManager$,
type ConnectionManagerData,
} from "./ConnectionManager.ts"; } from "./ConnectionManager.ts";
import { type ConnectionFactory } from "./ConnectionFactory.ts"; import { type ConnectionFactory } from "./ConnectionFactory.ts";
import { type Connection } from "./Connection.ts"; import { type Connection } from "./Connection.ts";
import { flushPromises, withTestScheduler } from "../../../utils/test.ts"; import { withTestScheduler } from "../../../utils/test.ts";
import { areLivekitTransportsEqual } from "./MatrixLivekitMembers.ts"; import { areLivekitTransportsEqual } from "./MatrixLivekitMembers.ts";
import { type Behavior } from "../../Behavior.ts";
// Some test constants // Some test constants
@@ -34,23 +35,15 @@ const TRANSPORT_2: LivekitTransport = {
livekit_alias: "!alias:sample.com", livekit_alias: "!alias:sample.com",
}; };
// const TRANSPORT_3: LivekitTransport = {
// type: "livekit",
// livekit_service_url: "https://lk-other.sample.com",
// livekit_alias: "!alias:sample.com",
// };
let fakeConnectionFactory: ConnectionFactory; let fakeConnectionFactory: ConnectionFactory;
let testScope: ObservableScope; let testScope: ObservableScope;
let testTransportStream$: BehaviorSubject<LivekitTransport[]>;
let connectionManagerInputs: { // Can be useful to track all created connections in tests, even the disposed ones
scope: ObservableScope; let allCreatedConnections: Connection[];
connectionFactory: ConnectionFactory;
inputTransports$: BehaviorSubject<LivekitTransport[]>;
};
let manager: IConnectionManager;
beforeEach(() => { beforeEach(() => {
testScope = new ObservableScope(); testScope = new ObservableScope();
allCreatedConnections = [];
fakeConnectionFactory = {} as unknown as ConnectionFactory; fakeConnectionFactory = {} as unknown as ConnectionFactory;
vi.mocked(fakeConnectionFactory).createConnection = vi vi.mocked(fakeConnectionFactory).createConnection = vi
.fn() .fn()
@@ -58,6 +51,7 @@ beforeEach(() => {
(transport: LivekitTransport, scope: ObservableScope) => { (transport: LivekitTransport, scope: ObservableScope) => {
const mockConnection = { const mockConnection = {
transport, transport,
participantsWithTrack$: new BehaviorSubject([]),
} as unknown as Connection; } as unknown as Connection;
vi.mocked(mockConnection).start = vi.fn(); vi.mocked(mockConnection).start = vi.fn();
vi.mocked(mockConnection).stop = vi.fn(); vi.mocked(mockConnection).stop = vi.fn();
@@ -65,17 +59,10 @@ beforeEach(() => {
scope.onEnd(() => { scope.onEnd(() => {
void mockConnection.stop(); void mockConnection.stop();
}); });
allCreatedConnections.push(mockConnection);
return mockConnection; return mockConnection;
}, },
); );
testTransportStream$ = new BehaviorSubject<LivekitTransport[]>([]);
connectionManagerInputs = {
scope: testScope,
connectionFactory: fakeConnectionFactory,
inputTransports$: testTransportStream$,
};
manager = createConnectionManager$(connectionManagerInputs);
}); });
afterEach(() => { afterEach(() => {
@@ -83,93 +70,122 @@ afterEach(() => {
}); });
describe("connections$ stream", () => { describe("connections$ stream", () => {
test("Should create and start new connections for each transports", async () => { test("Should create and start new connections for each transports", () => {
const managedConnections = Promise.withResolvers<Connection[]>(); withTestScheduler(({ behavior, expectObservable }) => {
manager.connections$.subscribe((connections) => { const { connections$ } = createConnectionManager$({
if (connections.length > 0) managedConnections.resolve(connections); scope: testScope,
connectionFactory: fakeConnectionFactory,
inputTransports$: behavior("a", {
a: new Epoch([TRANSPORT_1, TRANSPORT_2], 0),
}),
});
expectObservable(connections$).toBe("a", {
a: expect.toSatisfy((e: Epoch<Connection[]>) => {
const connections = e.value;
expect(connections.length).toBe(2);
expect(
vi.mocked(fakeConnectionFactory).createConnection,
).toHaveBeenCalledTimes(2);
const conn1 = connections.find((c) =>
areLivekitTransportsEqual(c.transport, TRANSPORT_1),
);
expect(conn1).toBeDefined();
expect(conn1!.start).toHaveBeenCalled();
const conn2 = connections.find((c) =>
areLivekitTransportsEqual(c.transport, TRANSPORT_2),
);
expect(conn2).toBeDefined();
expect(conn2!.start).toHaveBeenCalled();
return true;
}),
});
}); });
connectionManagerInputs.inputTransports$.next([TRANSPORT_1, TRANSPORT_2]);
const connections = await managedConnections.promise;
expect(connections.length).toBe(2);
expect(
vi.mocked(fakeConnectionFactory).createConnection,
).toHaveBeenCalledTimes(2);
const conn1 = connections.find((c) =>
areLivekitTransportsEqual(c.transport, TRANSPORT_1),
);
expect(conn1).toBeDefined();
expect(conn1!.start).toHaveBeenCalled();
const conn2 = connections.find((c) =>
areLivekitTransportsEqual(c.transport, TRANSPORT_2),
);
expect(conn2).toBeDefined();
expect(conn2!.start).toHaveBeenCalled();
}); });
test("Should start connection only once", async () => { test("Should start connection only once", () => {
const observedConnections: Connection[][] = []; withTestScheduler(({ behavior, expectObservable }) => {
manager.connections$.subscribe((connections) => { const { connections$ } = createConnectionManager$({
observedConnections.push(connections); scope: testScope,
connectionFactory: fakeConnectionFactory,
inputTransports$: behavior("abcdef", {
a: new Epoch([TRANSPORT_1], 0),
b: new Epoch([TRANSPORT_1], 1),
c: new Epoch([TRANSPORT_1], 2),
d: new Epoch([TRANSPORT_1], 3),
e: new Epoch([TRANSPORT_1], 4),
f: new Epoch([TRANSPORT_1, TRANSPORT_2], 5),
}),
});
expectObservable(connections$).toBe("xxxxxa", {
x: expect.anything(),
a: expect.toSatisfy((e: Epoch<Connection[]>) => {
const connections = e.value;
expect(connections.length).toBe(2);
expect(
vi.mocked(fakeConnectionFactory).createConnection,
).toHaveBeenCalledTimes(2);
const conn2 = connections.find((c) =>
areLivekitTransportsEqual(c.transport, TRANSPORT_2),
);
expect(conn2).toBeDefined();
const conn1 = connections.find((c) =>
areLivekitTransportsEqual(c.transport, TRANSPORT_1),
);
expect(conn1).toBeDefined();
expect(conn1!.start).toHaveBeenCalledOnce();
return true;
}),
});
}); });
testTransportStream$.next([TRANSPORT_1]);
testTransportStream$.next([TRANSPORT_1]);
testTransportStream$.next([TRANSPORT_1]);
testTransportStream$.next([TRANSPORT_1]);
testTransportStream$.next([TRANSPORT_1]);
testTransportStream$.next([TRANSPORT_1, TRANSPORT_2]);
await flushPromises();
const connections = observedConnections.pop()!;
expect(connections.length).toBe(2);
expect(
vi.mocked(fakeConnectionFactory).createConnection,
).toHaveBeenCalledTimes(2);
const conn2 = connections.find((c) =>
areLivekitTransportsEqual(c.transport, TRANSPORT_2),
);
expect(conn2).toBeDefined();
const conn1 = connections.find((c) =>
areLivekitTransportsEqual(c.transport, TRANSPORT_1),
);
expect(conn1).toBeDefined();
expect(conn1!.start).toHaveBeenCalledOnce();
}); });
test("Should cleanup connections when not needed anymore", async () => { test("Should cleanup connections when not needed anymore", () => {
const observedConnections: Connection[][] = []; withTestScheduler(({ behavior, expectObservable }) => {
manager.connections$.subscribe((connections) => { const { connections$ } = createConnectionManager$({
observedConnections.push(connections); scope: testScope,
connectionFactory: fakeConnectionFactory,
inputTransports$: behavior("abc", {
a: new Epoch([TRANSPORT_1], 0),
b: new Epoch([TRANSPORT_1, TRANSPORT_2], 1),
c: new Epoch([TRANSPORT_1], 2),
}),
});
expectObservable(connections$).toBe("xab", {
x: expect.anything(),
a: expect.toSatisfy((e: Epoch<Connection[]>) => {
const connections = e.value;
expect(connections.length).toBe(2);
return true;
}),
b: expect.toSatisfy((e: Epoch<Connection[]>) => {
const connections = e.value;
expect(connections.length).toBe(1);
// The second connection should have been stopped has it is no longer needed.
const connection2 = allCreatedConnections.find((c) =>
areLivekitTransportsEqual(c.transport, TRANSPORT_2),
);
expect(connection2).toBeDefined();
expect(connection2!.stop).toHaveBeenCalled();
// The first connection should still be active
const conn1 = connections[0];
expect(conn1.stop).not.toHaveBeenCalledOnce();
return true;
}),
});
}); });
testTransportStream$.next([TRANSPORT_1]);
testTransportStream$.next([TRANSPORT_1, TRANSPORT_2]);
await flushPromises();
const conn2 = observedConnections
.pop()!
.find((c) => areLivekitTransportsEqual(c.transport, TRANSPORT_2))!;
testTransportStream$.next([TRANSPORT_1]);
await flushPromises();
// The second connection should have been stopped has it is no longer needed
expect(conn2.stop).toHaveBeenCalled();
// The first connection should still be active
const conn1 = observedConnections.pop()![0];
expect(conn1.stop).not.toHaveBeenCalledOnce();
}); });
}); });
@@ -177,7 +193,7 @@ describe("connectionManagerData$ stream", () => {
// Used in test to control fake connections' participantsWithTrack$ streams // Used in test to control fake connections' participantsWithTrack$ streams
let fakePublishingParticipantsStreams: Map< let fakePublishingParticipantsStreams: Map<
string, string,
BehaviorSubject<LivekitParticipant[]> Behavior<LivekitParticipant[]>
>; >;
function keyForTransport(transport: LivekitTransport): string { function keyForTransport(transport: LivekitTransport): string {
@@ -186,6 +202,16 @@ describe("connectionManagerData$ stream", () => {
beforeEach(() => { beforeEach(() => {
fakePublishingParticipantsStreams = new Map(); fakePublishingParticipantsStreams = new Map();
function getPublishingParticipantsFor(
transport: LivekitTransport,
): Behavior<LivekitParticipant[]> {
return (
fakePublishingParticipantsStreams.get(keyForTransport(transport)) ??
new BehaviorSubject([])
);
}
// need a more advanced fake connection factory // need a more advanced fake connection factory
vi.mocked(fakeConnectionFactory).createConnection = vi vi.mocked(fakeConnectionFactory).createConnection = vi
.fn() .fn()
@@ -196,7 +222,7 @@ describe("connectionManagerData$ stream", () => {
>([]); >([]);
const mockConnection = { const mockConnection = {
transport, transport,
participantsWithTrack$: fakePublishingParticipants$, participantsWithTrack$: getPublishingParticipantsFor(transport),
} as unknown as Connection; } as unknown as Connection;
vi.mocked(mockConnection).start = vi.fn(); vi.mocked(mockConnection).start = vi.fn();
vi.mocked(mockConnection).stop = vi.fn(); vi.mocked(mockConnection).stop = vi.fn();
@@ -216,80 +242,83 @@ describe("connectionManagerData$ stream", () => {
test("Should report connections with the publishing participants", () => { test("Should report connections with the publishing participants", () => {
withTestScheduler(({ expectObservable, schedule, behavior }) => { withTestScheduler(({ expectObservable, schedule, behavior }) => {
manager = createConnectionManager$({ // Setup the fake participants streams behavior
...connectionManagerInputs, // ==============================
inputTransports$: behavior("a", { fakePublishingParticipantsStreams.set(
a: [TRANSPORT_1, TRANSPORT_2],
}),
});
const conn1Participants$ = fakePublishingParticipantsStreams.get(
keyForTransport(TRANSPORT_1), keyForTransport(TRANSPORT_1),
)!; behavior("oa-b", {
o: [],
schedule("-a-b", { a: [{ identity: "user1A" } as LivekitParticipant],
a: () => { b: [
conn1Participants$.next([
{ identity: "user1A" } as LivekitParticipant,
]);
},
b: () => {
conn1Participants$.next([
{ identity: "user1A" } as LivekitParticipant, { identity: "user1A" } as LivekitParticipant,
{ identity: "user1B" } as LivekitParticipant, { identity: "user1B" } as LivekitParticipant,
]); ],
}, }),
}); );
const conn2Participants$ = fakePublishingParticipantsStreams.get( fakePublishingParticipantsStreams.set(
keyForTransport(TRANSPORT_2), keyForTransport(TRANSPORT_2),
)!; behavior("o-a", {
o: [],
a: [{ identity: "user2A" } as LivekitParticipant],
}),
);
// ==============================
schedule("--a", { const { connectionManagerData$ } = createConnectionManager$({
a: () => { scope: testScope,
conn2Participants$.next([ connectionFactory: fakeConnectionFactory,
{ identity: "user2A" } as LivekitParticipant, inputTransports$: behavior("a", {
]); a: new Epoch([TRANSPORT_1, TRANSPORT_2], 0),
}, }),
}); });
expectObservable(manager.connectionManagerData$).toBe("abcd", { expectObservable(connectionManagerData$).toBe("abcd", {
a: expect.toSatisfy((data) => { a: expect.toSatisfy((e) => {
return ( const data: ConnectionManagerData = e.value;
data.getConnections().length == 2 && expect(data.getConnections().length).toBe(2);
data.getParticipantForTransport(TRANSPORT_1).length == 0 && expect(data.getParticipantForTransport(TRANSPORT_1).length).toBe(0);
data.getParticipantForTransport(TRANSPORT_2).length == 0 expect(data.getParticipantForTransport(TRANSPORT_2).length).toBe(0);
); return true;
}), }),
b: expect.toSatisfy((data) => { b: expect.toSatisfy((e) => {
return ( const data: ConnectionManagerData = e.value;
data.getConnections().length == 2 && expect(data.getConnections().length).toBe(2);
data.getParticipantForTransport(TRANSPORT_1).length == 1 && expect(data.getParticipantForTransport(TRANSPORT_1).length).toBe(1);
data.getParticipantForTransport(TRANSPORT_2).length == 0 && expect(data.getParticipantForTransport(TRANSPORT_2).length).toBe(0);
data.getParticipantForTransport(TRANSPORT_1)[0].identity == "user1A" expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toBe(
"user1A",
); );
return true;
}), }),
c: expect.toSatisfy((data) => { c: expect.toSatisfy((e) => {
return ( const data: ConnectionManagerData = e.value;
data.getConnections().length == 2 && expect(data.getConnections().length).toBe(2);
data.getParticipantForTransport(TRANSPORT_1).length == 1 && expect(data.getParticipantForTransport(TRANSPORT_1).length).toBe(1);
data.getParticipantForTransport(TRANSPORT_2).length == 1 && expect(data.getParticipantForTransport(TRANSPORT_2).length).toBe(1);
data.getParticipantForTransport(TRANSPORT_1)[0].identity == expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toBe(
"user1A" && "user1A",
data.getParticipantForTransport(TRANSPORT_2)[0].identity == "user2A"
); );
expect(data.getParticipantForTransport(TRANSPORT_2)[0].identity).toBe(
"user2A",
);
return true;
}), }),
d: expect.toSatisfy((data) => { d: expect.toSatisfy((e) => {
return ( const data: ConnectionManagerData = e.value;
data.getConnections().length == 2 && expect(data.getConnections().length).toBe(2);
data.getParticipantForTransport(TRANSPORT_1).length == 2 && expect(data.getParticipantForTransport(TRANSPORT_1).length).toBe(2);
data.getParticipantForTransport(TRANSPORT_2).length == 1 && expect(data.getParticipantForTransport(TRANSPORT_2).length).toBe(1);
data.getParticipantForTransport(TRANSPORT_1)[0].identity == expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toBe(
"user1A" && "user1A",
data.getParticipantForTransport(TRANSPORT_1)[1].identity ==
"user1B" &&
data.getParticipantForTransport(TRANSPORT_2)[0].identity == "user2A"
); );
expect(data.getParticipantForTransport(TRANSPORT_1)[1].identity).toBe(
"user1B",
);
expect(data.getParticipantForTransport(TRANSPORT_2)[0].identity).toBe(
"user2A",
);
return true;
}), }),
}); });
}); });

View File

@@ -6,6 +6,7 @@ Please see LICENSE in the repository root for full details.
*/ */
import { describe, expect, it } from "vitest"; import { describe, expect, it } from "vitest";
import { BehaviorSubject, timer } from "rxjs";
import { import {
Epoch, Epoch,
@@ -14,7 +15,6 @@ import {
trackEpoch, trackEpoch,
} from "./ObservableScope"; } from "./ObservableScope";
import { withTestScheduler } from "../utils/test"; import { withTestScheduler } from "../utils/test";
import { BehaviorSubject, timer } from "rxjs";
describe("Epoch", () => { describe("Epoch", () => {
it("should map the value correctly", () => { it("should map the value correctly", () => {