Fix connection leaks: Ensure that any pending connection open are cancelled/undo when ActiveCall is unmounted (#3255)

* Better logs for connection/component lifecycle

* fix: `AudioCaptureOptions` was causing un-necessary effect render

AudioCaptureOptions was a different object but with same internal values, use directly deviceId so that Object.is works properly

* fix: Livekit openned connection leaks

* review: rename to AbortHandles

* review: rename variable

---------

Co-authored-by: Timo <toger5@hotmail.de>
This commit is contained in:
Valere Fedronic
2025-05-14 18:41:22 +02:00
committed by GitHub
parent 4569d01353
commit 86d80630c1
7 changed files with 233 additions and 18 deletions

View File

@@ -6,7 +6,7 @@ Please see LICENSE in the repository root for full details.
*/ */
import { type FC, useCallback, useState } from "react"; import { type FC, useCallback, useState } from "react";
import { test, vi } from "vitest"; import { describe, expect, test, vi, vitest } from "vitest";
import { import {
ConnectionError, ConnectionError,
ConnectionErrorReason, ConnectionErrorReason,
@@ -15,6 +15,7 @@ import {
import userEvent from "@testing-library/user-event"; import userEvent from "@testing-library/user-event";
import { render, screen } from "@testing-library/react"; import { render, screen } from "@testing-library/react";
import { MemoryRouter } from "react-router-dom"; import { MemoryRouter } from "react-router-dom";
import { defer, sleep } from "matrix-js-sdk/lib/utils";
import { useECConnectionState } from "./useECConnectionState"; import { useECConnectionState } from "./useECConnectionState";
import { type SFUConfig } from "./openIDSFU"; import { type SFUConfig } from "./openIDSFU";
@@ -57,7 +58,7 @@ test.each<[string, ConnectionError]>([
() => setSfuConfig({ url: "URL", jwt: "JWT token" }), () => setSfuConfig({ url: "URL", jwt: "JWT token" }),
[], [],
); );
useECConnectionState({}, false, mockRoom, sfuConfig); useECConnectionState("default", false, mockRoom, sfuConfig);
return <button onClick={connect}>Connect</button>; return <button onClick={connect}>Connect</button>;
}; };
@@ -73,3 +74,111 @@ test.each<[string, ConnectionError]>([
screen.getByText("Insufficient capacity"); screen.getByText("Insufficient capacity");
}, },
); );
describe("Leaking connection prevention", () => {
function createTestComponent(mockRoom: Room): FC {
const TestComponent: FC = () => {
const [sfuConfig, setSfuConfig] = useState<SFUConfig | undefined>(
undefined,
);
const connect = useCallback(
() => setSfuConfig({ url: "URL", jwt: "JWT token" }),
[],
);
useECConnectionState("default", false, mockRoom, sfuConfig);
return <button onClick={connect}>Connect</button>;
};
return TestComponent;
}
test("Should cancel pending connections when the component is unmounted", async () => {
const connectCall = vi.fn();
const pendingConnection = defer<void>();
// let pendingDisconnection = defer<void>()
const disconnectMock = vi.fn();
const mockRoom = {
on: () => {},
off: () => {},
once: () => {},
connect: async () => {
connectCall.call(undefined);
return await pendingConnection.promise;
},
disconnect: disconnectMock,
localParticipant: {
getTrackPublication: () => {},
createTracks: () => [],
},
} as unknown as Room;
const TestComponent = createTestComponent(mockRoom);
const { unmount } = render(<TestComponent />);
const user = userEvent.setup();
await user.click(screen.getByRole("button", { name: "Connect" }));
expect(connectCall).toHaveBeenCalled();
// unmount while the connection is pending
unmount();
// resolve the pending connection
pendingConnection.resolve();
await vitest.waitUntil(
() => {
return disconnectMock.mock.calls.length > 0;
},
{
timeout: 1000,
interval: 100,
},
);
// There should be some cleaning up to avoid leaking an open connection
expect(disconnectMock).toHaveBeenCalledTimes(1);
});
test("Should cancel about to open but not yet opened connection", async () => {
const createTracksCall = vi.fn();
const pendingCreateTrack = defer<void>();
// let pendingDisconnection = defer<void>()
const disconnectMock = vi.fn();
const connectMock = vi.fn();
const mockRoom = {
on: () => {},
off: () => {},
once: () => {},
connect: connectMock,
disconnect: disconnectMock,
localParticipant: {
getTrackPublication: () => {},
createTracks: async () => {
createTracksCall.call(undefined);
await pendingCreateTrack.promise;
return [];
},
},
} as unknown as Room;
const TestComponent = createTestComponent(mockRoom);
const { unmount } = render(<TestComponent />);
const user = userEvent.setup();
await user.click(screen.getByRole("button", { name: "Connect" }));
expect(createTracksCall).toHaveBeenCalled();
// unmount while createTracks is pending
unmount();
// resolve createTracks
pendingCreateTrack.resolve();
// Yield to the event loop to let the connection attempt finish
await sleep(100);
// The operation should have been aborted before even calling connect.
expect(connectMock).not.toHaveBeenCalled();
});
});

View File

@@ -6,7 +6,6 @@ Please see LICENSE in the repository root for full details.
*/ */
import { import {
type AudioCaptureOptions,
ConnectionError, ConnectionError,
ConnectionState, ConnectionState,
type LocalTrack, type LocalTrack,
@@ -25,6 +24,7 @@ import {
InsufficientCapacityError, InsufficientCapacityError,
UnknownCallError, UnknownCallError,
} from "../utils/errors.ts"; } from "../utils/errors.ts";
import { AbortHandle } from "../utils/abortHandle.ts";
declare global { declare global {
interface Window { interface Window {
@@ -59,7 +59,8 @@ async function doConnect(
livekitRoom: Room, livekitRoom: Room,
sfuConfig: SFUConfig, sfuConfig: SFUConfig,
audioEnabled: boolean, audioEnabled: boolean,
audioOptions: AudioCaptureOptions, initialDeviceId: string | undefined,
abortHandle: AbortHandle,
): Promise<void> { ): Promise<void> {
// Always create an audio track manually. // Always create an audio track manually.
// livekit (by default) keeps the mic track open when you mute, but if you start muted, // livekit (by default) keeps the mic track open when you mute, but if you start muted,
@@ -82,19 +83,40 @@ async function doConnect(
let preCreatedAudioTrack: LocalTrack | undefined; let preCreatedAudioTrack: LocalTrack | undefined;
try { try {
const audioTracks = await livekitRoom!.localParticipant.createTracks({ const audioTracks = await livekitRoom!.localParticipant.createTracks({
audio: audioOptions, audio: { deviceId: initialDeviceId },
}); });
if (audioTracks.length < 1) { if (audioTracks.length < 1) {
logger.info("Tried to pre-create local audio track but got no tracks"); logger.info("Tried to pre-create local audio track but got no tracks");
} else { } else {
preCreatedAudioTrack = audioTracks[0]; preCreatedAudioTrack = audioTracks[0];
} }
// There was a yield point previously (awaiting for the track to be created) so we need to check
// if the operation was cancelled and stop connecting if needed.
if (abortHandle.isAborted()) {
logger.info(
"[Lifecycle] Signal Aborted: Pre-created audio track but connection aborted",
);
preCreatedAudioTrack?.stop();
return;
}
logger.info("Pre-created microphone track"); logger.info("Pre-created microphone track");
} catch (e) { } catch (e) {
logger.error("Failed to pre-create microphone track", e); logger.error("Failed to pre-create microphone track", e);
} }
if (!audioEnabled) await preCreatedAudioTrack?.mute(); if (!audioEnabled) {
await preCreatedAudioTrack?.mute();
// There was a yield point. Check if the operation was cancelled and stop connecting.
if (abortHandle.isAborted()) {
logger.info(
"[Lifecycle] Signal Aborted: Pre-created audio track but connection aborted",
);
preCreatedAudioTrack?.stop();
return;
}
}
// check again having awaited for the track to create // check again having awaited for the track to create
if ( if (
@@ -107,9 +129,18 @@ async function doConnect(
return; return;
} }
logger.info("Connecting & publishing"); logger.info("[Lifecycle] Connecting & publishing");
try { try {
await connectAndPublish(livekitRoom, sfuConfig, preCreatedAudioTrack, []); await connectAndPublish(livekitRoom, sfuConfig, preCreatedAudioTrack, []);
if (abortHandle.isAborted()) {
logger.info(
"[Lifecycle] Signal Aborted: Connected but operation was cancelled. Force disconnect",
);
livekitRoom?.disconnect().catch((err) => {
logger.error("Failed to disconnect from SFU", err);
});
return;
}
} catch (e) { } catch (e) {
preCreatedAudioTrack?.stop(); preCreatedAudioTrack?.stop();
logger.debug("Stopped precreated audio tracks."); logger.debug("Stopped precreated audio tracks.");
@@ -137,13 +168,16 @@ async function connectAndPublish(
livekitRoom.once(RoomEvent.SignalConnected, tracker.cacheWsConnect); livekitRoom.once(RoomEvent.SignalConnected, tracker.cacheWsConnect);
try { try {
logger.info(`[Lifecycle] Connecting to livekit room ${sfuConfig!.url} ...`);
await livekitRoom!.connect(sfuConfig!.url, sfuConfig!.jwt, { await livekitRoom!.connect(sfuConfig!.url, sfuConfig!.jwt, {
// Due to stability issues on Firefox we are testing the effect of different // Due to stability issues on Firefox we are testing the effect of different
// timeouts, and allow these values to be set through the console // timeouts, and allow these values to be set through the console
peerConnectionTimeout: window.peerConnectionTimeout ?? 45000, peerConnectionTimeout: window.peerConnectionTimeout ?? 45000,
websocketTimeout: window.websocketTimeout ?? 45000, websocketTimeout: window.websocketTimeout ?? 45000,
}); });
logger.info(`[Lifecycle] ... connected to livekit room`);
} catch (e) { } catch (e) {
logger.error("[Lifecycle] Failed to connect", e);
// LiveKit uses 503 to indicate that the server has hit its track limits. // LiveKit uses 503 to indicate that the server has hit its track limits.
// https://github.com/livekit/livekit/blob/fcb05e97c5a31812ecf0ca6f7efa57c485cea9fb/pkg/service/rtcservice.go#L171 // https://github.com/livekit/livekit/blob/fcb05e97c5a31812ecf0ca6f7efa57c485cea9fb/pkg/service/rtcservice.go#L171
// It also errors with a status code of 200 (yes, really) for room // It also errors with a status code of 200 (yes, really) for room
@@ -184,7 +218,7 @@ async function connectAndPublish(
} }
export function useECConnectionState( export function useECConnectionState(
initialAudioOptions: AudioCaptureOptions, initialDeviceId: string | undefined,
initialAudioEnabled: boolean, initialAudioEnabled: boolean,
livekitRoom?: Room, livekitRoom?: Room,
sfuConfig?: SFUConfig, sfuConfig?: SFUConfig,
@@ -247,6 +281,22 @@ export function useECConnectionState(
const currentSFUConfig = useRef(Object.assign({}, sfuConfig)); const currentSFUConfig = useRef(Object.assign({}, sfuConfig));
// Protection against potential leaks, where the component to be unmounted and there is
// still a pending doConnect promise. This would lead the user to still be in the call even
// if the component is unmounted.
const abortHandlesBag = useRef(new Set<AbortHandle>());
// This is a cleanup function that will be called when the component is about to be unmounted.
// It will cancel all abortHandles in the bag
useEffect(() => {
const bag = abortHandlesBag.current;
return (): void => {
bag.forEach((handle) => {
handle.abort();
});
};
}, []);
// Id we are transitioning from a valid config to another valid one, we need // Id we are transitioning from a valid config to another valid one, we need
// to explicitly switch focus // to explicitly switch focus
useEffect(() => { useEffect(() => {
@@ -273,11 +323,14 @@ export function useECConnectionState(
// always capturing audio: it helps keep bluetooth headsets in the right mode and // always capturing audio: it helps keep bluetooth headsets in the right mode and
// mobile browsers to know we're doing a call. // mobile browsers to know we're doing a call.
setIsInDoConnect(true); setIsInDoConnect(true);
const abortHandle = new AbortHandle();
abortHandlesBag.current.add(abortHandle);
doConnect( doConnect(
livekitRoom!, livekitRoom!,
sfuConfig!, sfuConfig!,
initialAudioEnabled, initialAudioEnabled,
initialAudioOptions, initialDeviceId,
abortHandle,
) )
.catch((e) => { .catch((e) => {
if (e instanceof ElementCallError) { if (e instanceof ElementCallError) {
@@ -286,14 +339,17 @@ export function useECConnectionState(
setError(new UnknownCallError(e)); setError(new UnknownCallError(e));
} else logger.error("Failed to connect to SFU", e); } else logger.error("Failed to connect to SFU", e);
}) })
.finally(() => setIsInDoConnect(false)); .finally(() => {
abortHandlesBag.current.delete(abortHandle);
setIsInDoConnect(false);
});
} }
currentSFUConfig.current = Object.assign({}, sfuConfig); currentSFUConfig.current = Object.assign({}, sfuConfig);
}, [ }, [
sfuConfig, sfuConfig,
livekitRoom, livekitRoom,
initialAudioOptions, initialDeviceId,
initialAudioEnabled, initialAudioEnabled,
doFocusSwitch, doFocusSwitch,
]); ]);

View File

@@ -155,9 +155,7 @@ export function useLivekit(
); );
const connectionState = useECConnectionState( const connectionState = useECConnectionState(
{ initialDevices.current.audioInput.selectedId,
deviceId: initialDevices.current.audioInput.selectedId,
},
initialMuteStates.current.audio.enabled, initialMuteStates.current.audio.enabled,
room, room,
sfuConfig, sfuConfig,

View File

@@ -121,6 +121,13 @@ export const GroupCallView: FC<Props> = ({
// eslint-disable-next-line react-hooks/exhaustive-deps // eslint-disable-next-line react-hooks/exhaustive-deps
}, []); }, []);
useEffect(() => {
logger.info("[Lifecycle] GroupCallView Component mounted");
return (): void => {
logger.info("[Lifecycle] GroupCallView Component unmounted");
};
}, []);
useEffect(() => { useEffect(() => {
window.rtcSession = rtcSession; window.rtcSession = rtcSession;
return (): void => { return (): void => {

View File

@@ -132,10 +132,23 @@ export const ActiveCall: FC<ActiveCallProps> = (props) => {
const [vm, setVm] = useState<CallViewModel | null>(null); const [vm, setVm] = useState<CallViewModel | null>(null);
useEffect(() => { useEffect(() => {
logger.info(
`[Lifecycle] InCallView Component mounted, livekitroom state ${livekitRoom?.state}`,
);
return (): void => { return (): void => {
livekitRoom?.disconnect().catch((e) => { logger.info(
logger.error("Failed to disconnect from livekit room", e); `[Lifecycle] InCallView Component unmounted, livekitroom state ${livekitRoom?.state}`,
}); );
livekitRoom
?.disconnect()
.then(() => {
logger.info(
`[Lifecycle] Disconnected from livekite room, state:${livekitRoom?.state}`,
);
})
.catch((e) => {
logger.error("[Lifecycle] Failed to disconnect from livekit room", e);
});
}; };
// eslint-disable-next-line react-hooks/exhaustive-deps // eslint-disable-next-line react-hooks/exhaustive-deps
}, []); }, []);

View File

@@ -5,7 +5,14 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details. Please see LICENSE in the repository root for full details.
*/ */
import { type FC, useCallback, useMemo, useState, type JSX } from "react"; import {
type FC,
useCallback,
useMemo,
useState,
type JSX,
useEffect,
} from "react";
import { useTranslation } from "react-i18next"; import { useTranslation } from "react-i18next";
import { type MatrixClient } from "matrix-js-sdk"; import { type MatrixClient } from "matrix-js-sdk";
import { Button } from "@vector-im/compound-web"; import { Button } from "@vector-im/compound-web";
@@ -72,6 +79,13 @@ export const LobbyView: FC<Props> = ({
onShareClick, onShareClick,
waitingForInvite, waitingForInvite,
}) => { }) => {
useEffect(() => {
logger.info("[Lifecycle] GroupCallView Component mounted");
return (): void => {
logger.info("[Lifecycle] GroupCallView Component unmounted");
};
}, []);
const { t } = useTranslation(); const { t } = useTranslation();
usePageTitle(matrixInfo.roomName); usePageTitle(matrixInfo.roomName);

18
src/utils/abortHandle.ts Normal file
View File

@@ -0,0 +1,18 @@
/*
Copyright 2025 New Vector Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
export class AbortHandle {
public constructor(private aborted = false) {}
public abort(): void {
this.aborted = true;
}
public isAborted(): boolean {
return this.aborted;
}
}