Clean up subscriptions in Connection tests

This commit is contained in:
Robin
2025-10-08 18:58:03 -04:00
parent 64c2e5911c
commit 2c576a7477

View File

@@ -12,6 +12,7 @@ import {
it, it,
type Mock, type Mock,
type MockedObject, type MockedObject,
onTestFinished,
vi, vi,
} from "vitest"; } from "vitest";
import { BehaviorSubject, of } from "rxjs"; import { BehaviorSubject, of } from "rxjs";
@@ -179,9 +180,10 @@ describe("Start connection states", () => {
const connection = new RemoteConnection(opts, undefined); const connection = new RemoteConnection(opts, undefined);
const capturedStates: FocusConnectionState[] = []; const capturedStates: FocusConnectionState[] = [];
connection.focusConnectionState$.subscribe((value) => { const s = connection.focusConnectionState$.subscribe((value) => {
capturedStates.push(value); capturedStates.push(value);
}); });
onTestFinished(() => s.unsubscribe());
const deferred = Promise.withResolvers<IOpenIDToken>(); const deferred = Promise.withResolvers<IOpenIDToken>();
@@ -231,9 +233,10 @@ describe("Start connection states", () => {
const connection = new RemoteConnection(opts, undefined); const connection = new RemoteConnection(opts, undefined);
const capturedStates: FocusConnectionState[] = []; const capturedStates: FocusConnectionState[] = [];
connection.focusConnectionState$.subscribe((value) => { const s = connection.focusConnectionState$.subscribe((value) => {
capturedStates.push(value); capturedStates.push(value);
}); });
onTestFinished(() => s.unsubscribe());
const deferredSFU = Promise.withResolvers<void>(); const deferredSFU = Promise.withResolvers<void>();
// mock the /sfu/get call // mock the /sfu/get call
@@ -287,9 +290,10 @@ describe("Start connection states", () => {
const connection = new RemoteConnection(opts, undefined); const connection = new RemoteConnection(opts, undefined);
const capturedStates: FocusConnectionState[] = []; const capturedStates: FocusConnectionState[] = [];
connection.focusConnectionState$.subscribe((value) => { const s = connection.focusConnectionState$.subscribe((value) => {
capturedStates.push(value); capturedStates.push(value);
}); });
onTestFinished(() => s.unsubscribe());
const deferredSFU = Promise.withResolvers<void>(); const deferredSFU = Promise.withResolvers<void>();
// mock the /sfu/get call // mock the /sfu/get call
@@ -342,21 +346,22 @@ describe("Start connection states", () => {
const connection = setupRemoteConnection(); const connection = setupRemoteConnection();
const capturedState: FocusConnectionState[] = []; const capturedStates: FocusConnectionState[] = [];
connection.focusConnectionState$.subscribe((value) => { const s = connection.focusConnectionState$.subscribe((value) => {
capturedState.push(value); capturedStates.push(value);
}); });
onTestFinished(() => s.unsubscribe());
await connection.start(); await connection.start();
await vi.runAllTimersAsync(); await vi.runAllTimersAsync();
const initialState = capturedState.shift(); const initialState = capturedStates.shift();
expect(initialState?.state).toEqual("Initialized"); expect(initialState?.state).toEqual("Initialized");
const fetchingState = capturedState.shift(); const fetchingState = capturedStates.shift();
expect(fetchingState?.state).toEqual("FetchingConfig"); expect(fetchingState?.state).toEqual("FetchingConfig");
const connectingState = capturedState.shift(); const connectingState = capturedStates.shift();
expect(connectingState?.state).toEqual("ConnectingToLkRoom"); expect(connectingState?.state).toEqual("ConnectingToLkRoom");
const connectedState = capturedState.shift(); const connectedState = capturedStates.shift();
expect(connectedState?.state).toEqual("ConnectedToLkRoom"); expect(connectedState?.state).toEqual("ConnectedToLkRoom");
}); });
@@ -367,10 +372,11 @@ describe("Start connection states", () => {
await connection.start(); await connection.start();
let capturedState: FocusConnectionState[] = []; let capturedStates: FocusConnectionState[] = [];
connection.focusConnectionState$.subscribe((value) => { const s = connection.focusConnectionState$.subscribe((value) => {
capturedState.push(value); capturedStates.push(value);
}); });
onTestFinished(() => s.unsubscribe());
const states = [ const states = [
ConnectionState.Disconnected, ConnectionState.Disconnected,
@@ -386,7 +392,7 @@ describe("Start connection states", () => {
} }
for (const state of states) { for (const state of states) {
const s = capturedState.shift(); const s = capturedStates.shift();
expect(s?.state).toEqual("ConnectedToLkRoom"); expect(s?.state).toEqual("ConnectedToLkRoom");
const connectedState = s as FocusConnectionState & { const connectedState = s as FocusConnectionState & {
state: "ConnectedToLkRoom"; state: "ConnectedToLkRoom";
@@ -404,12 +410,12 @@ describe("Start connection states", () => {
// If the state is not ConnectedToLkRoom, no events should be relayed anymore // If the state is not ConnectedToLkRoom, no events should be relayed anymore
await connection.stop(); await connection.stop();
capturedState = []; capturedStates = [];
for (const state of states) { for (const state of states) {
fakeRoomEventEmiter.emit(RoomEvent.ConnectionStateChanged, state); fakeRoomEventEmiter.emit(RoomEvent.ConnectionStateChanged, state);
} }
expect(capturedState.length).toEqual(0); expect(capturedStates.length).toEqual(0);
}); });
it("shutting down the scope should stop the connection", async () => { it("shutting down the scope should stop the connection", async () => {
@@ -452,7 +458,7 @@ describe("Publishing participants observations", () => {
participant: RemoteParticipant; participant: RemoteParticipant;
membership: CallMembership; membership: CallMembership;
}[][] = []; }[][] = [];
connection.publishingParticipants$.subscribe((publishers) => { const s = connection.publishingParticipants$.subscribe((publishers) => {
observedPublishers.push(publishers); observedPublishers.push(publishers);
if ( if (
publishers.some( publishers.some(
@@ -469,6 +475,7 @@ describe("Publishing participants observations", () => {
danIsAPublisher.resolve(); danIsAPublisher.resolve();
} }
}); });
onTestFinished(() => s.unsubscribe());
// The publishingParticipants$ observable is derived from the current members of the // The publishingParticipants$ observable is derived from the current members of the
// livekitRoom and the rtc membership in order to publish the members that are publishing // livekitRoom and the rtc membership in order to publish the members that are publishing
// on this connection. // on this connection.
@@ -578,9 +585,10 @@ describe("Publishing participants observations", () => {
participant: RemoteParticipant; participant: RemoteParticipant;
membership: CallMembership; membership: CallMembership;
}[][] = []; }[][] = [];
connection.publishingParticipants$.subscribe((publishers) => { const s = connection.publishingParticipants$.subscribe((publishers) => {
observedPublishers.push(publishers); observedPublishers.push(publishers);
}); });
onTestFinished(() => s.unsubscribe());
let participants: RemoteParticipant[] = [ let participants: RemoteParticipant[] = [
fakeRemoteLivekitParticipant("@bob:example.org:DEV111"), fakeRemoteLivekitParticipant("@bob:example.org:DEV111"),