Merge branch 'livekit' into valere/double_pub_problems_highlevel

This commit is contained in:
Valere
2025-12-12 11:54:35 +01:00
16 changed files with 643 additions and 491 deletions

View File

@@ -54,7 +54,7 @@
"@opentelemetry/sdk-trace-base": "^2.0.0", "@opentelemetry/sdk-trace-base": "^2.0.0",
"@opentelemetry/sdk-trace-web": "^2.0.0", "@opentelemetry/sdk-trace-web": "^2.0.0",
"@opentelemetry/semantic-conventions": "^1.25.1", "@opentelemetry/semantic-conventions": "^1.25.1",
"@playwright/test": "^1.56.1", "@playwright/test": "^1.57.0",
"@radix-ui/react-dialog": "^1.0.4", "@radix-ui/react-dialog": "^1.0.4",
"@radix-ui/react-slider": "^1.1.2", "@radix-ui/react-slider": "^1.1.2",
"@radix-ui/react-visually-hidden": "^1.0.3", "@radix-ui/react-visually-hidden": "^1.0.3",

View File

@@ -111,19 +111,27 @@ async function registerUser(
await page.getByRole("textbox", { name: "Confirm password" }).click(); await page.getByRole("textbox", { name: "Confirm password" }).click();
await page.getByRole("textbox", { name: "Confirm password" }).fill(PASSWORD); await page.getByRole("textbox", { name: "Confirm password" }).fill(PASSWORD);
await page.getByRole("button", { name: "Register" }).click(); await page.getByRole("button", { name: "Register" }).click();
const continueButton = page.getByRole("button", { name: "Continue" });
try {
await expect(continueButton).toBeVisible({ timeout: 5000 });
await page
.getByRole("textbox", { name: "Password", exact: true })
.fill(PASSWORD);
await continueButton.click();
} catch {
// continueButton not visible, continue as normal
}
await expect( await expect(
page.getByRole("heading", { name: `Welcome ${username}` }), page.getByRole("heading", { name: `Welcome ${username}` }),
).toBeVisible(); ).toBeVisible();
const browserUnsupportedToast = page
.getByText("Element does not support this browser")
.locator("..")
.locator("..");
// Dismiss incompatible browser toast
const dismissButton = browserUnsupportedToast.getByRole("button", {
name: "Dismiss",
});
try {
await expect(dismissButton).toBeVisible({ timeout: 700 });
await dismissButton.click();
} catch {
// dismissButton not visible, continue as normal
}
await setDevToolElementCallDevUrl(page); await setDevToolElementCallDevUrl(page);
const clientHandle = await page.evaluateHandle(() => const clientHandle = await page.evaluateHandle(() =>

View File

@@ -160,6 +160,7 @@ export const GroupCallView: FC<Props> = ({
}, [rtcSession]); }, [rtcSession]);
// TODO move this into the callViewModel LocalMembership.ts // TODO move this into the callViewModel LocalMembership.ts
// We might actually not need this at all. Since we get into fatalError on those errors already?
useTypedEventEmitter( useTypedEventEmitter(
rtcSession, rtcSession,
MatrixRTCSessionEvent.MembershipManagerError, MatrixRTCSessionEvent.MembershipManagerError,
@@ -313,6 +314,7 @@ export const GroupCallView: FC<Props> = ({
const navigate = useNavigate(); const navigate = useNavigate();
// TODO split this into leave and onDisconnect
const onLeft = useCallback( const onLeft = useCallback(
( (
reason: "timeout" | "user" | "allOthersLeft" | "decline" | "error", reason: "timeout" | "user" | "allOthersLeft" | "decline" | "error",

View File

@@ -24,7 +24,7 @@ import { type MatrixRTCSession } from "matrix-js-sdk/lib/matrixrtc";
import classNames from "classnames"; import classNames from "classnames";
import { BehaviorSubject, map } from "rxjs"; import { BehaviorSubject, map } from "rxjs";
import { useObservable } from "observable-hooks"; import { useObservable } from "observable-hooks";
import { logger } from "matrix-js-sdk/lib/logger"; import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
import { import {
VoiceCallSolidIcon, VoiceCallSolidIcon,
VolumeOnSolidIcon, VolumeOnSolidIcon,
@@ -109,6 +109,8 @@ import { useTrackProcessorObservable$ } from "../livekit/TrackProcessorContext.t
import { type Layout } from "../state/layout-types.ts"; import { type Layout } from "../state/layout-types.ts";
import { ObservableScope } from "../state/ObservableScope.ts"; import { ObservableScope } from "../state/ObservableScope.ts";
const logger = rootLogger.getChild("[InCallView]");
const maxTapDurationMs = 400; const maxTapDurationMs = 400;
export interface ActiveCallProps export interface ActiveCallProps
@@ -127,6 +129,7 @@ export const ActiveCall: FC<ActiveCallProps> = (props) => {
const mediaDevices = useMediaDevices(); const mediaDevices = useMediaDevices();
const trackProcessorState$ = useTrackProcessorObservable$(); const trackProcessorState$ = useTrackProcessorObservable$();
useEffect(() => { useEffect(() => {
logger.info("START CALL VIEW SCOPE");
const scope = new ObservableScope(); const scope = new ObservableScope();
const reactionsReader = new ReactionsReader(scope, props.rtcSession); const reactionsReader = new ReactionsReader(scope, props.rtcSession);
const { autoLeaveWhenOthersLeft, waitForCallPickup, sendNotificationType } = const { autoLeaveWhenOthersLeft, waitForCallPickup, sendNotificationType } =
@@ -151,7 +154,9 @@ export const ActiveCall: FC<ActiveCallProps> = (props) => {
setVm(vm); setVm(vm);
vm.leave$.pipe(scope.bind()).subscribe(props.onLeft); vm.leave$.pipe(scope.bind()).subscribe(props.onLeft);
return (): void => { return (): void => {
logger.info("END CALL VIEW SCOPE");
scope.end(); scope.end();
}; };
}, [ }, [
@@ -270,7 +275,10 @@ export const InCallView: FC<InCallViewProps> = ({
const ringOverlay = useBehavior(vm.ringOverlay$); const ringOverlay = useBehavior(vm.ringOverlay$);
const fatalCallError = useBehavior(vm.fatalError$); const fatalCallError = useBehavior(vm.fatalError$);
// Stop the rendering and throw for the error boundary // Stop the rendering and throw for the error boundary
if (fatalCallError) throw fatalCallError; if (fatalCallError) {
logger.debug("fatalCallError stop rendering", fatalCallError);
throw fatalCallError;
}
// We need to set the proper timings on the animation based upon the sound length. // We need to set the proper timings on the animation based upon the sound length.
const ringDuration = pickupPhaseAudio?.soundDuration["waiting"] ?? 1; const ringDuration = pickupPhaseAudio?.soundDuration["waiting"] ?? 1;

View File

@@ -79,9 +79,9 @@ export const LobbyView: FC<Props> = ({
waitingForInvite, waitingForInvite,
}) => { }) => {
useEffect(() => { useEffect(() => {
logger.info("[Lifecycle] GroupCallView Component mounted"); logger.info("[Lifecycle] LobbyView Component mounted");
return (): void => { return (): void => {
logger.info("[Lifecycle] GroupCallView Component unmounted"); logger.info("[Lifecycle] LobbyView Component unmounted");
}; };
}, []); }, []);

View File

@@ -15,6 +15,7 @@ import {
} from "livekit-client"; } from "livekit-client";
import { type Room as MatrixRoom } from "matrix-js-sdk"; import { type Room as MatrixRoom } from "matrix-js-sdk";
import { import {
catchError,
combineLatest, combineLatest,
distinctUntilChanged, distinctUntilChanged,
filter, filter,
@@ -93,14 +94,14 @@ import {
type SpotlightLandscapeLayoutMedia, type SpotlightLandscapeLayoutMedia,
type SpotlightPortraitLayoutMedia, type SpotlightPortraitLayoutMedia,
} from "../layout-types.ts"; } from "../layout-types.ts";
import { type ElementCallError } from "../../utils/errors.ts"; import { ElementCallError } from "../../utils/errors.ts";
import { type ObservableScope } from "../ObservableScope.ts"; import { type ObservableScope } from "../ObservableScope.ts";
import { createHomeserverConnected$ } from "./localMember/HomeserverConnected.ts"; import { createHomeserverConnected$ } from "./localMember/HomeserverConnected.ts";
import { import {
createLocalMembership$, createLocalMembership$,
enterRTCSession, enterRTCSession,
RTCBackendState, TransportState,
} from "./localMember/LocalMembership.ts"; } from "./localMember/LocalMember.ts";
import { createLocalTransport$ } from "./localMember/LocalTransport.ts"; import { createLocalTransport$ } from "./localMember/LocalTransport.ts";
import { import {
createMemberships$, createMemberships$,
@@ -425,7 +426,18 @@ export function createCallViewModel$(
connectionFactory: connectionFactory, connectionFactory: connectionFactory,
inputTransports$: scope.behavior( inputTransports$: scope.behavior(
combineLatest( combineLatest(
[localTransport$, membershipsAndTransports.transports$], [
localTransport$.pipe(
catchError((e: unknown) => {
logger.info(
"dont pass local transport to createConnectionManager$. localTransport$ threw an error",
e,
);
return of(null);
}),
),
membershipsAndTransports.transports$,
],
(localTransport, transports) => { (localTransport, transports) => {
const localTransportAsArray = localTransport ? [localTransport] : []; const localTransportAsArray = localTransport ? [localTransport] : [];
return transports.mapInner((transports) => [ return transports.mapInner((transports) => [
@@ -457,13 +469,13 @@ export function createCallViewModel$(
const localMembership = createLocalMembership$({ const localMembership = createLocalMembership$({
scope: scope, scope: scope,
homeserverConnected$: createHomeserverConnected$( homeserverConnected: createHomeserverConnected$(
scope, scope,
client, client,
matrixRTCSession, matrixRTCSession,
), ),
muteStates: muteStates, muteStates: muteStates,
joinMatrixRTC: async (transport: LivekitTransport) => { joinMatrixRTC: (transport: LivekitTransport) => {
return enterRTCSession( return enterRTCSession(
matrixRTCSession, matrixRTCSession,
transport, transport,
@@ -578,17 +590,6 @@ export function createCallViewModel$(
), ),
); );
/**
* Whether various media/event sources should pretend to be disconnected from
* all network input, even if their connection still technically works.
*/
// We do this when the app is in the 'reconnecting' state, because it might be
// that the LiveKit connection is still functional while the homeserver is
// down, for example, and we want to avoid making people worry that the app is
// in a split-brained state.
// DISCUSSION own membership manager ALSO this probably can be simplifis
const reconnecting$ = localMembership.reconnecting$;
const audioParticipants$ = scope.behavior( const audioParticipants$ = scope.behavior(
matrixLivekitMembers$.pipe( matrixLivekitMembers$.pipe(
switchMap((membersWithEpoch) => { switchMap((membersWithEpoch) => {
@@ -636,7 +637,7 @@ export function createCallViewModel$(
); );
const handsRaised$ = scope.behavior( const handsRaised$ = scope.behavior(
handsRaisedSubject$.pipe(pauseWhen(reconnecting$)), handsRaisedSubject$.pipe(pauseWhen(localMembership.reconnecting$)),
); );
const reactions$ = scope.behavior( const reactions$ = scope.behavior(
@@ -649,7 +650,7 @@ export function createCallViewModel$(
]), ]),
), ),
), ),
pauseWhen(reconnecting$), pauseWhen(localMembership.reconnecting$),
), ),
); );
@@ -740,7 +741,7 @@ export function createCallViewModel$(
livekitRoom$, livekitRoom$,
focusUrl$, focusUrl$,
mediaDevices, mediaDevices,
reconnecting$, localMembership.reconnecting$,
displayName$, displayName$,
matrixMemberMetadataStore.createAvatarUrlBehavior$(userId), matrixMemberMetadataStore.createAvatarUrlBehavior$(userId),
handsRaised$.pipe(map((v) => v[participantId]?.time ?? null)), handsRaised$.pipe(map((v) => v[participantId]?.time ?? null)),
@@ -1423,13 +1424,44 @@ export function createCallViewModel$(
// reassigned here to make it publicly accessible // reassigned here to make it publicly accessible
const toggleScreenSharing = localMembership.toggleScreenSharing; const toggleScreenSharing = localMembership.toggleScreenSharing;
const errors$ = scope.behavior<{
transportError?: ElementCallError;
matrixError?: ElementCallError;
connectionError?: ElementCallError;
publishError?: ElementCallError;
} | null>(
localMembership.localMemberState$.pipe(
map((value) => {
const returnObject: {
transportError?: ElementCallError;
matrixError?: ElementCallError;
connectionError?: ElementCallError;
publishError?: ElementCallError;
} = {};
if (value instanceof ElementCallError) return { transportError: value };
if (value === TransportState.Waiting) return null;
if (value.matrix instanceof ElementCallError)
returnObject.matrixError = value.matrix;
if (value.media instanceof ElementCallError)
returnObject.publishError = value.media;
else if (
typeof value.media === "object" &&
value.media.connection instanceof ElementCallError
)
returnObject.connectionError = value.media.connection;
return returnObject;
}),
),
null,
);
return { return {
autoLeave$: autoLeave$, autoLeave$: autoLeave$,
callPickupState$: callPickupState$, callPickupState$: callPickupState$,
ringOverlay$: ringOverlay$, ringOverlay$: ringOverlay$,
leave$: leave$, leave$: leave$,
hangup: (): void => userHangup$.next(), hangup: (): void => userHangup$.next(),
join: localMembership.requestConnect, join: localMembership.requestJoinAndPublish,
toggleScreenSharing: toggleScreenSharing, toggleScreenSharing: toggleScreenSharing,
sharingScreen$: sharingScreen$, sharingScreen$: sharingScreen$,
@@ -1439,9 +1471,17 @@ export function createCallViewModel$(
unhoverScreen: (): void => screenUnhover$.next(), unhoverScreen: (): void => screenUnhover$.next(),
fatalError$: scope.behavior( fatalError$: scope.behavior(
localMembership.connectionState.livekit$.pipe( errors$.pipe(
filter((v) => v.state === RTCBackendState.Error), map((errors) => {
map((s) => s.error), logger.debug("errors$ to compute any fatal errors:", errors);
return (
errors?.transportError ??
errors?.matrixError ??
errors?.connectionError ??
null
);
}),
filter((error) => error !== null),
), ),
null, null,
), ),
@@ -1474,7 +1514,7 @@ export function createCallViewModel$(
showFooter$: showFooter$, showFooter$: showFooter$,
earpieceMode$: earpieceMode$, earpieceMode$: earpieceMode$,
audioOutputSwitcher$: audioOutputSwitcher$, audioOutputSwitcher$: audioOutputSwitcher$,
reconnecting$: reconnecting$, reconnecting$: localMembership.reconnecting$,
}; };
} }

View File

@@ -97,106 +97,106 @@ describe("createHomeserverConnected$", () => {
// LLM generated test cases. They are a bit overkill but I improved the mocking so it is // LLM generated test cases. They are a bit overkill but I improved the mocking so it is
// easy enough to read them so I think they can stay. // easy enough to read them so I think they can stay.
it("is false when sync state is not Syncing", () => { it("is false when sync state is not Syncing", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session); const hsConnected = createHomeserverConnected$(scope, client, session);
expect(hsConnected$.value).toBe(false); expect(hsConnected.combined$.value).toBe(false);
}); });
it("remains false while membership status is not Connected even if sync is Syncing", () => { it("remains false while membership status is not Connected even if sync is Syncing", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session); const hsConnected = createHomeserverConnected$(scope, client, session);
client.setSyncState(SyncState.Syncing); client.setSyncState(SyncState.Syncing);
expect(hsConnected$.value).toBe(false); // membership still disconnected expect(hsConnected.combined$.value).toBe(false); // membership still disconnected
}); });
it("is false when membership status transitions to Connected but ProbablyLeft is true", () => { it("is false when membership status transitions to Connected but ProbablyLeft is true", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session); const hsConnected = createHomeserverConnected$(scope, client, session);
// Make sync loop OK // Make sync loop OK
client.setSyncState(SyncState.Syncing); client.setSyncState(SyncState.Syncing);
// Indicate probable leave before connection // Indicate probable leave before connection
session.setProbablyLeft(true); session.setProbablyLeft(true);
session.setMembershipStatus(Status.Connected); session.setMembershipStatus(Status.Connected);
expect(hsConnected$.value).toBe(false); expect(hsConnected.combined$.value).toBe(false);
}); });
it("becomes true only when all three conditions are satisfied", () => { it("becomes true only when all three conditions are satisfied", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session); const hsConnected = createHomeserverConnected$(scope, client, session);
// 1. Sync loop connected // 1. Sync loop connected
client.setSyncState(SyncState.Syncing); client.setSyncState(SyncState.Syncing);
expect(hsConnected$.value).toBe(false); // not yet membership connected expect(hsConnected.combined$.value).toBe(false); // not yet membership connected
// 2. Membership connected // 2. Membership connected
session.setMembershipStatus(Status.Connected); session.setMembershipStatus(Status.Connected);
expect(hsConnected$.value).toBe(true); // probablyLeft is false expect(hsConnected.combined$.value).toBe(true); // probablyLeft is false
}); });
it("drops back to false when sync loop leaves Syncing", () => { it("drops back to false when sync loop leaves Syncing", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session); const hsConnected = createHomeserverConnected$(scope, client, session);
// Reach connected state // Reach connected state
client.setSyncState(SyncState.Syncing); client.setSyncState(SyncState.Syncing);
session.setMembershipStatus(Status.Connected); session.setMembershipStatus(Status.Connected);
expect(hsConnected$.value).toBe(true); expect(hsConnected.combined$.value).toBe(true);
// Sync loop error => should flip false // Sync loop error => should flip false
client.setSyncState(SyncState.Error); client.setSyncState(SyncState.Error);
expect(hsConnected$.value).toBe(false); expect(hsConnected.combined$.value).toBe(false);
}); });
it("drops back to false when membership status becomes disconnected", () => { it("drops back to false when membership status becomes disconnected", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session); const hsConnected = createHomeserverConnected$(scope, client, session);
client.setSyncState(SyncState.Syncing); client.setSyncState(SyncState.Syncing);
session.setMembershipStatus(Status.Connected); session.setMembershipStatus(Status.Connected);
expect(hsConnected$.value).toBe(true); expect(hsConnected.combined$.value).toBe(true);
session.setMembershipStatus(Status.Disconnected); session.setMembershipStatus(Status.Disconnected);
expect(hsConnected$.value).toBe(false); expect(hsConnected.combined$.value).toBe(false);
}); });
it("drops to false when ProbablyLeft is emitted after being true", () => { it("drops to false when ProbablyLeft is emitted after being true", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session); const hsConnected = createHomeserverConnected$(scope, client, session);
client.setSyncState(SyncState.Syncing); client.setSyncState(SyncState.Syncing);
session.setMembershipStatus(Status.Connected); session.setMembershipStatus(Status.Connected);
expect(hsConnected$.value).toBe(true); expect(hsConnected.combined$.value).toBe(true);
session.setProbablyLeft(true); session.setProbablyLeft(true);
expect(hsConnected$.value).toBe(false); expect(hsConnected.combined$.value).toBe(false);
}); });
it("recovers to true if ProbablyLeft becomes false again while other conditions remain true", () => { it("recovers to true if ProbablyLeft becomes false again while other conditions remain true", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session); const hsConnected = createHomeserverConnected$(scope, client, session);
client.setSyncState(SyncState.Syncing); client.setSyncState(SyncState.Syncing);
session.setMembershipStatus(Status.Connected); session.setMembershipStatus(Status.Connected);
expect(hsConnected$.value).toBe(true); expect(hsConnected.combined$.value).toBe(true);
session.setProbablyLeft(true); session.setProbablyLeft(true);
expect(hsConnected$.value).toBe(false); expect(hsConnected.combined$.value).toBe(false);
// Simulate clearing the flag (in realistic scenario membership manager would update) // Simulate clearing the flag (in realistic scenario membership manager would update)
session.setProbablyLeft(false); session.setProbablyLeft(false);
expect(hsConnected$.value).toBe(true); expect(hsConnected.combined$.value).toBe(true);
}); });
it("composite sequence reflects each individual failure reason", () => { it("composite sequence reflects each individual failure reason", () => {
const hsConnected$ = createHomeserverConnected$(scope, client, session); const hsConnected = createHomeserverConnected$(scope, client, session);
// Initially false (sync error + disconnected + not probably left) // Initially false (sync error + disconnected + not probably left)
expect(hsConnected$.value).toBe(false); expect(hsConnected.combined$.value).toBe(false);
// Fix sync only // Fix sync only
client.setSyncState(SyncState.Syncing); client.setSyncState(SyncState.Syncing);
expect(hsConnected$.value).toBe(false); expect(hsConnected.combined$.value).toBe(false);
// Fix membership // Fix membership
session.setMembershipStatus(Status.Connected); session.setMembershipStatus(Status.Connected);
expect(hsConnected$.value).toBe(true); expect(hsConnected.combined$.value).toBe(true);
// Introduce probablyLeft -> false // Introduce probablyLeft -> false
session.setProbablyLeft(true); session.setProbablyLeft(true);
expect(hsConnected$.value).toBe(false); expect(hsConnected.combined$.value).toBe(false);
// Restore notProbablyLeft -> true again // Restore notProbablyLeft -> true again
session.setProbablyLeft(false); session.setProbablyLeft(false);
expect(hsConnected$.value).toBe(true); expect(hsConnected.combined$.value).toBe(true);
// Drop sync -> false // Drop sync -> false
client.setSyncState(SyncState.Error); client.setSyncState(SyncState.Error);
expect(hsConnected$.value).toBe(false); expect(hsConnected.combined$.value).toBe(false);
}); });
}); });

View File

@@ -25,6 +25,11 @@ import { type NodeStyleEventEmitter } from "../../../utils/test";
*/ */
const logger = rootLogger.getChild("[HomeserverConnected]"); const logger = rootLogger.getChild("[HomeserverConnected]");
export interface HomeserverConnected {
combined$: Behavior<boolean>;
rtsSession$: Behavior<Status>;
}
/** /**
* Behavior representing whether we consider ourselves connected to the Matrix homeserver * Behavior representing whether we consider ourselves connected to the Matrix homeserver
* for the purposes of a MatrixRTC session. * for the purposes of a MatrixRTC session.
@@ -39,7 +44,7 @@ export function createHomeserverConnected$(
client: NodeStyleEventEmitter & Pick<MatrixClient, "getSyncState">, client: NodeStyleEventEmitter & Pick<MatrixClient, "getSyncState">,
matrixRTCSession: NodeStyleEventEmitter & matrixRTCSession: NodeStyleEventEmitter &
Pick<MatrixRTCSession, "membershipStatus" | "probablyLeft">, Pick<MatrixRTCSession, "membershipStatus" | "probablyLeft">,
): Behavior<boolean> { ): HomeserverConnected {
const syncing$ = ( const syncing$ = (
fromEvent(client, ClientEvent.Sync) as Observable<[SyncState]> fromEvent(client, ClientEvent.Sync) as Observable<[SyncState]>
).pipe( ).pipe(
@@ -47,12 +52,15 @@ export function createHomeserverConnected$(
map(([state]) => state === SyncState.Syncing), map(([state]) => state === SyncState.Syncing),
); );
const membershipConnected$ = fromEvent( const rtsSession$ = scope.behavior<Status>(
matrixRTCSession, fromEvent(matrixRTCSession, MembershipManagerEvent.StatusChanged).pipe(
MembershipManagerEvent.StatusChanged, map(() => matrixRTCSession.membershipStatus ?? Status.Unknown),
).pipe( ),
startWith(null), Status.Unknown,
map(() => matrixRTCSession.membershipStatus === Status.Connected), );
const membershipConnected$ = rtsSession$.pipe(
map((status) => status === Status.Connected),
); );
// This is basically notProbablyLeft$ // This is basically notProbablyLeft$
@@ -71,15 +79,13 @@ export function createHomeserverConnected$(
map(() => matrixRTCSession.probablyLeft !== true), map(() => matrixRTCSession.probablyLeft !== true),
); );
const connectedCombined$ = and$( const combined$ = scope.behavior(
syncing$, and$(syncing$, membershipConnected$, certainlyConnected$).pipe(
membershipConnected$,
certainlyConnected$,
).pipe(
tap((connected) => { tap((connected) => {
logger.info(`Homeserver connected update: ${connected}`); logger.info(`Homeserver connected update: ${connected}`);
}), }),
),
); );
return scope.behavior(connectedCombined$); return { combined$, rtsSession$ };
} }

View File

@@ -7,6 +7,7 @@ Please see LICENSE in the repository root for full details.
*/ */
import { import {
Status as RTCMemberStatus,
type LivekitTransport, type LivekitTransport,
type MatrixRTCSession, type MatrixRTCSession,
} from "matrix-js-sdk/lib/matrixrtc"; } from "matrix-js-sdk/lib/matrixrtc";
@@ -14,11 +15,7 @@ import { describe, expect, it, vi } from "vitest";
import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery";
import { BehaviorSubject, map, of } from "rxjs"; import { BehaviorSubject, map, of } from "rxjs";
import { logger } from "matrix-js-sdk/lib/logger"; import { logger } from "matrix-js-sdk/lib/logger";
import { import { type LocalParticipant, type LocalTrack } from "livekit-client";
ConnectionState as LivekitConnectionState,
type LocalParticipant,
type LocalTrack,
} from "livekit-client";
import { MatrixRTCMode } from "../../../settings/settings"; import { MatrixRTCMode } from "../../../settings/settings";
import { import {
@@ -29,15 +26,17 @@ import {
withTestScheduler, withTestScheduler,
} from "../../../utils/test"; } from "../../../utils/test";
import { import {
TransportState,
createLocalMembership$, createLocalMembership$,
enterRTCSession, enterRTCSession,
RTCBackendState, PublishState,
} from "./LocalMembership"; TrackState,
} from "./LocalMember";
import { MatrixRTCTransportMissingError } from "../../../utils/errors"; import { MatrixRTCTransportMissingError } from "../../../utils/errors";
import { Epoch, ObservableScope } from "../../ObservableScope"; import { Epoch, ObservableScope } from "../../ObservableScope";
import { constant } from "../../Behavior"; import { constant } from "../../Behavior";
import { ConnectionManagerData } from "../remoteMembers/ConnectionManager"; import { ConnectionManagerData } from "../remoteMembers/ConnectionManager";
import { type Connection } from "../remoteMembers/Connection"; import { ConnectionState, type Connection } from "../remoteMembers/Connection";
import { type Publisher } from "./Publisher"; import { type Publisher } from "./Publisher";
const MATRIX_RTC_MODE = MatrixRTCMode.Legacy; const MATRIX_RTC_MODE = MatrixRTCMode.Legacy;
@@ -51,7 +50,7 @@ vi.mock("@livekit/components-core", () => ({
describe("LocalMembership", () => { describe("LocalMembership", () => {
describe("enterRTCSession", () => { describe("enterRTCSession", () => {
it("It joins the correct Session", async () => { it("It joins the correct Session", () => {
const focusFromOlderMembership = { const focusFromOlderMembership = {
type: "livekit", type: "livekit",
livekit_service_url: "http://my-oldest-member-service-url.com", livekit_service_url: "http://my-oldest-member-service-url.com",
@@ -107,7 +106,7 @@ describe("LocalMembership", () => {
joinRoomSession: vi.fn(), joinRoomSession: vi.fn(),
}) as unknown as MatrixRTCSession; }) as unknown as MatrixRTCSession;
await enterRTCSession( enterRTCSession(
mockedSession, mockedSession,
{ {
livekit_alias: "roomId", livekit_alias: "roomId",
@@ -136,7 +135,7 @@ describe("LocalMembership", () => {
); );
}); });
it("It should not fail with configuration error if homeserver config has livekit url but not fallback", async () => { it("It should not fail with configuration error if homeserver config has livekit url but not fallback", () => {
mockConfig({}); mockConfig({});
vi.spyOn(AutoDiscovery, "getRawClientConfig").mockResolvedValue({ vi.spyOn(AutoDiscovery, "getRawClientConfig").mockResolvedValue({
"org.matrix.msc4143.rtc_foci": [ "org.matrix.msc4143.rtc_foci": [
@@ -165,7 +164,7 @@ describe("LocalMembership", () => {
joinRoomSession: vi.fn(), joinRoomSession: vi.fn(),
}) as unknown as MatrixRTCSession; }) as unknown as MatrixRTCSession;
await enterRTCSession( enterRTCSession(
mockedSession, mockedSession,
{ {
livekit_alias: "roomId", livekit_alias: "roomId",
@@ -190,7 +189,6 @@ describe("LocalMembership", () => {
leaveRoomSession: () => {}, leaveRoomSession: () => {},
} as unknown as MatrixRTCSession, } as unknown as MatrixRTCSession,
muteStates: mockMuteStates(), muteStates: mockMuteStates(),
isHomeserverConnected: constant(true),
trackProcessorState$: constant({ trackProcessorState$: constant({
supported: false, supported: false,
processor: undefined, processor: undefined,
@@ -198,20 +196,20 @@ describe("LocalMembership", () => {
logger: logger, logger: logger,
createPublisherFactory: vi.fn(), createPublisherFactory: vi.fn(),
joinMatrixRTC: async (): Promise<void> => {}, joinMatrixRTC: async (): Promise<void> => {},
homeserverConnected$: constant(true), homeserverConnected: {
combined$: constant(true),
rtsSession$: constant(RTCMemberStatus.Connected),
},
}; };
it("throws error on missing RTC config error", () => { it("throws error on missing RTC config error", () => {
withTestScheduler(({ scope, hot, expectObservable }) => { withTestScheduler(({ scope, hot, expectObservable }) => {
const goodTransport = { const localTransport$ = scope.behavior<null | LivekitTransport>(
livekit_service_url: "other",
} as LivekitTransport;
const localTransport$ = scope.behavior<LivekitTransport>(
hot("1ms #", {}, new MatrixRTCTransportMissingError("domain.com")), hot("1ms #", {}, new MatrixRTCTransportMissingError("domain.com")),
goodTransport, null,
); );
// we do not need any connection data since we want to fail before reaching that.
const mockConnectionManager = { const mockConnectionManager = {
transports$: scope.behavior( transports$: scope.behavior(
localTransport$.pipe(map((t) => new Epoch([t]))), localTransport$.pipe(map((t) => new Epoch([t]))),
@@ -227,15 +225,11 @@ describe("LocalMembership", () => {
connectionManager: mockConnectionManager, connectionManager: mockConnectionManager,
localTransport$, localTransport$,
}); });
localMembership.requestJoinAndPublish();
expectObservable(localMembership.connectionState.livekit$).toBe("ne", { expectObservable(localMembership.localMemberState$).toBe("ne", {
n: { state: RTCBackendState.WaitingForConnection }, n: TransportState.Waiting,
e: { e: expect.toSatisfy((e) => e instanceof MatrixRTCTransportMissingError),
state: RTCBackendState.Error,
error: expect.toSatisfy(
(e) => e instanceof MatrixRTCTransportMissingError,
),
},
}); });
}); });
}); });
@@ -247,34 +241,24 @@ describe("LocalMembership", () => {
livekit_service_url: "b", livekit_service_url: "b",
} as LivekitTransport; } as LivekitTransport;
const connectionManagerData = new ConnectionManagerData(); const connectionTransportAConnected = {
connectionManagerData.add(
{
livekitRoom: mockLivekitRoom({ livekitRoom: mockLivekitRoom({
localParticipant: { localParticipant: {
isScreenShareEnabled: false, isScreenShareEnabled: false,
trackPublications: [], trackPublications: [],
} as unknown as LocalParticipant, } as unknown as LocalParticipant,
}), }),
state$: constant({ state$: constant(ConnectionState.LivekitConnected),
state: "ConnectedToLkRoom",
livekitConnectionState$: constant(LivekitConnectionState.Connected),
}),
transport: aTransport, transport: aTransport,
} as unknown as Connection, } as unknown as Connection;
[], const connectionTransportAConnecting = {
); ...connectionTransportAConnected,
connectionManagerData.add( state$: constant(ConnectionState.LivekitConnecting),
{ } as unknown as Connection;
state$: constant({ const connectionTransportBConnected = {
state: "ConnectedToLkRoom", state$: constant(ConnectionState.LivekitConnected),
}),
transport: bTransport, transport: bTransport,
livekitRoom: mockLivekitRoom({}), } as unknown as Connection;
} as unknown as Connection,
[],
);
it("recreates publisher if new connection is used and ENDS always unpublish and end tracks", async () => { it("recreates publisher if new connection is used and ENDS always unpublish and end tracks", async () => {
const scope = new ObservableScope(); const scope = new ObservableScope();
@@ -303,6 +287,9 @@ describe("LocalMembership", () => {
typeof vi.fn typeof vi.fn
>; >;
const connectionManagerData = new ConnectionManagerData();
connectionManagerData.add(connectionTransportAConnected, []);
connectionManagerData.add(connectionTransportBConnected, []);
createLocalMembership$({ createLocalMembership$({
scope, scope,
...defaultCreateLocalMemberValues, ...defaultCreateLocalMemberValues,
@@ -362,6 +349,9 @@ describe("LocalMembership", () => {
typeof vi.fn typeof vi.fn
>; >;
const connectionManagerData = new ConnectionManagerData();
connectionManagerData.add(connectionTransportAConnected, []);
// connectionManagerData.add(connectionTransportB, []);
const localMembership = createLocalMembership$({ const localMembership = createLocalMembership$({
scope, scope,
...defaultCreateLocalMemberValues, ...defaultCreateLocalMemberValues,
@@ -390,10 +380,11 @@ describe("LocalMembership", () => {
it("tracks livekit state correctly", async () => { it("tracks livekit state correctly", async () => {
const scope = new ObservableScope(); const scope = new ObservableScope();
const connectionManagerData = new ConnectionManagerData();
const localTransport$ = new BehaviorSubject<null | LivekitTransport>(null); const localTransport$ = new BehaviorSubject<null | LivekitTransport>(null);
const connectionManagerData$ = new BehaviorSubject< const connectionManagerData$ = new BehaviorSubject(
Epoch<ConnectionManagerData> new Epoch(connectionManagerData),
>(new Epoch(new ConnectionManagerData())); );
const publishers: Publisher[] = []; const publishers: Publisher[] = [];
const tracks$ = new BehaviorSubject<LocalTrack[]>([]); const tracks$ = new BehaviorSubject<LocalTrack[]>([]);
@@ -439,19 +430,45 @@ describe("LocalMembership", () => {
}); });
await flushPromises(); await flushPromises();
expect(localMembership.connectionState.livekit$.value).toStrictEqual({ expect(localMembership.localMemberState$.value).toStrictEqual(
state: RTCBackendState.WaitingForTransport, TransportState.Waiting,
}); );
localTransport$.next(aTransport); localTransport$.next(aTransport);
await flushPromises(); await flushPromises();
expect(localMembership.connectionState.livekit$.value).toStrictEqual({ expect(localMembership.localMemberState$.value).toStrictEqual({
state: RTCBackendState.WaitingForConnection, matrix: RTCMemberStatus.Connected,
media: { connection: null, tracks: TrackState.WaitingForUser },
}); });
connectionManagerData$.next(new Epoch(connectionManagerData));
const connectionManagerData2 = new ConnectionManagerData();
connectionManagerData2.add(
// clone because we will mutate this later.
{ ...connectionTransportAConnecting } as unknown as Connection,
[],
);
connectionManagerData$.next(new Epoch(connectionManagerData2));
await flushPromises(); await flushPromises();
expect(localMembership.connectionState.livekit$.value).toStrictEqual({ expect(localMembership.localMemberState$.value).toStrictEqual({
state: RTCBackendState.Initialized, matrix: RTCMemberStatus.Connected,
media: {
connection: ConnectionState.LivekitConnecting,
tracks: TrackState.WaitingForUser,
},
}); });
(
connectionManagerData2.getConnectionForTransport(aTransport)!
.state$ as BehaviorSubject<ConnectionState>
).next(ConnectionState.LivekitConnected);
expect(localMembership.localMemberState$.value).toStrictEqual({
matrix: RTCMemberStatus.Connected,
media: {
connection: ConnectionState.LivekitConnected,
tracks: TrackState.WaitingForUser,
},
});
expect(publisherFactory).toHaveBeenCalledOnce(); expect(publisherFactory).toHaveBeenCalledOnce();
// expect(localMembership.tracks$.value.length).toBe(0); // expect(localMembership.tracks$.value.length).toBe(0);
@@ -460,37 +477,46 @@ describe("LocalMembership", () => {
// ------- // -------
await flushPromises(); await flushPromises();
// expect(localMembership.connectionState.livekit$.value).toStrictEqual({ expect(localMembership.localMemberState$.value).toStrictEqual({
// state: RTCBackendState.CreatingTracks, matrix: RTCMemberStatus.Connected,
// }); media: {
tracks: TrackState.Creating,
connection: ConnectionState.LivekitConnected,
},
});
createTrackResolver.resolve(); createTrackResolver.resolve();
await flushPromises(); await flushPromises();
expect(localMembership.connectionState.livekit$.value).toStrictEqual({ expect(
state: RTCBackendState.ReadyToPublish, // eslint-disable-next-line @typescript-eslint/no-explicit-any
}); (localMembership.localMemberState$.value as any).media,
).toStrictEqual(PublishState.WaitingForUser);
// ------- // -------
localMembership.requestConnect(); localMembership.requestJoinAndPublish();
// ------- // -------
// expect(localMembership.connectionState.livekit$.value).toStrictEqual({ expect(
// state: RTCBackendState.WaitingToPublish, // eslint-disable-next-line @typescript-eslint/no-explicit-any
// }); (localMembership.localMemberState$.value as any).media,
).toStrictEqual(PublishState.Starting);
publishResolver.resolve(); publishResolver.resolve();
await flushPromises(); await flushPromises();
expect(localMembership.connectionState.livekit$.value).toStrictEqual({ expect(
state: RTCBackendState.Connected, // eslint-disable-next-line @typescript-eslint/no-explicit-any
}); (localMembership.localMemberState$.value as any).media,
).toStrictEqual(PublishState.Publishing);
expect(publishers[0].stopPublishing).not.toHaveBeenCalled(); expect(publishers[0].stopPublishing).not.toHaveBeenCalled();
expect(localMembership.connectionState.livekit$.isStopped).toBe(false); expect(localMembership.localMemberState$.isStopped).toBe(false);
scope.end(); scope.end();
await flushPromises(); await flushPromises();
// stays in connected state because it is stopped before the update to tracks update the state. // stays in connected state because it is stopped before the update to tracks update the state.
expect(localMembership.connectionState.livekit$.value).toStrictEqual({ expect(
state: RTCBackendState.Connected, // eslint-disable-next-line @typescript-eslint/no-explicit-any
}); (localMembership.localMemberState$.value as any).media,
).toStrictEqual(PublishState.Publishing);
// stop all tracks after ending scopes // stop all tracks after ending scopes
expect(publishers[0].stopPublishing).toHaveBeenCalled(); expect(publishers[0].stopPublishing).toHaveBeenCalled();
// expect(publishers[0].stopTracks).toHaveBeenCalled(); // expect(publishers[0].stopTracks).toHaveBeenCalled();

View File

@@ -16,6 +16,7 @@ import {
} from "livekit-client"; } from "livekit-client";
import { observeParticipantEvents } from "@livekit/components-core"; import { observeParticipantEvents } from "@livekit/components-core";
import { import {
Status as RTCSessionStatus,
type LivekitTransport, type LivekitTransport,
type MatrixRTCSession, type MatrixRTCSession,
} from "matrix-js-sdk/lib/matrixrtc"; } from "matrix-js-sdk/lib/matrixrtc";
@@ -29,7 +30,7 @@ import {
map, map,
type Observable, type Observable,
of, of,
scan, pairwise,
startWith, startWith,
switchMap, switchMap,
tap, tap,
@@ -37,74 +38,67 @@ import {
import { type Logger } from "matrix-js-sdk/lib/logger"; import { type Logger } from "matrix-js-sdk/lib/logger";
import { deepCompare } from "matrix-js-sdk/lib/utils"; import { deepCompare } from "matrix-js-sdk/lib/utils";
import { constant, type Behavior } from "../../Behavior"; import { constant, type Behavior } from "../../Behavior.ts";
import { type IConnectionManager } from "../remoteMembers/ConnectionManager"; import { type IConnectionManager } from "../remoteMembers/ConnectionManager.ts";
import { ObservableScope } from "../../ObservableScope"; import { type ObservableScope } from "../../ObservableScope.ts";
import { type Publisher } from "./Publisher"; import { type Publisher } from "./Publisher.ts";
import { type MuteStates } from "../../MuteStates"; import { type MuteStates } from "../../MuteStates.ts";
import { and$ } from "../../../utils/observable";
import { import {
ElementCallError, ElementCallError,
FailToStartLivekitConnection,
MembershipManagerError, MembershipManagerError,
UnknownCallError, UnknownCallError,
} from "../../../utils/errors"; } from "../../../utils/errors.ts";
import { ElementWidgetActions, widget } from "../../../widget"; import { ElementWidgetActions, widget } from "../../../widget.ts";
import { getUrlParams } from "../../../UrlParams.ts"; import { getUrlParams } from "../../../UrlParams.ts";
import { PosthogAnalytics } from "../../../analytics/PosthogAnalytics.ts"; import { PosthogAnalytics } from "../../../analytics/PosthogAnalytics.ts";
import { MatrixRTCMode } from "../../../settings/settings.ts"; import { MatrixRTCMode } from "../../../settings/settings.ts";
import { Config } from "../../../config/Config.ts"; import { Config } from "../../../config/Config.ts";
import { type Connection } from "../remoteMembers/Connection.ts"; import {
ConnectionState,
type Connection,
type FailedToStartError,
} from "../remoteMembers/Connection.ts";
import { type HomeserverConnected } from "./HomeserverConnected.ts";
import { and$ } from "../../../utils/observable.ts";
export enum RTCBackendState { export enum TransportState {
Error = "error",
/** Not even a transport is available to the LocalMembership */ /** Not even a transport is available to the LocalMembership */
WaitingForTransport = "waiting_for_transport", Waiting = "transport_waiting",
/** A connection appeared so we can initialise the publisher */
WaitingForConnection = "waiting_for_connection",
/** Connection and transport arrived, publisher Initialized */
Initialized = "Initialized",
// CreatingTracks = "creating_tracks",
ReadyToPublish = "ready_to_publish",
// WaitingToPublish = "waiting_to_publish",
Connected = "connected",
Disconnected = "disconnected",
Disconnecting = "disconnecting",
} }
type LocalMemberRtcBackendState = export enum PublishState {
| { state: RTCBackendState.Error; error: ElementCallError } WaitingForUser = "publish_waiting_for_user",
| { state: RTCBackendState.WaitingForTransport } /** Implies lk connection is connected */
| { state: RTCBackendState.WaitingForConnection } Starting = "publish_start_publishing",
| { state: RTCBackendState.Initialized } /** Implies lk connection is connected */
// | { state: RTCBackendState.CreatingTracks } Publishing = "publish_publishing",
| { state: RTCBackendState.ReadyToPublish }
// | { state: RTCBackendState.WaitingToPublish }
| { state: RTCBackendState.Connected }
| { state: RTCBackendState.Disconnected }
| { state: RTCBackendState.Disconnecting };
export enum MatrixState {
WaitingForTransport = "waiting_for_transport",
Ready = "ready",
Connecting = "connecting",
Connected = "connected",
Disconnected = "disconnected",
Error = "Error",
} }
type LocalMemberMatrixState = export enum TrackState {
| { state: MatrixState.Connected } /** The track is waiting for user input to create tracks (waiting to call `startTracks()`) */
| { state: MatrixState.WaitingForTransport } WaitingForUser = "tracks_waiting_for_user",
| { state: MatrixState.Ready } /** Implies lk connection is connected */
| { state: MatrixState.Connecting } Creating = "tracks_creating",
| { state: MatrixState.Disconnected } /** Implies lk connection is connected */
| { state: MatrixState.Error; error: Error }; Ready = "tracks_ready",
export interface LocalMemberConnectionState {
livekit$: Behavior<LocalMemberRtcBackendState>;
matrix$: Behavior<LocalMemberMatrixState>;
} }
export type LocalMemberMediaState =
| {
tracks: TrackState;
connection: ConnectionState | FailedToStartError;
}
| PublishState
| ElementCallError;
export type LocalMemberState =
| ElementCallError
| TransportState.Waiting
| {
media: LocalMemberMediaState;
matrix: ElementCallError | RTCSessionStatus;
};
/* /*
* - get well known * - get well known
* - get oldest membership * - get oldest membership
@@ -124,8 +118,8 @@ interface Props {
muteStates: MuteStates; muteStates: MuteStates;
connectionManager: IConnectionManager; connectionManager: IConnectionManager;
createPublisherFactory: (connection: Connection) => Publisher; createPublisherFactory: (connection: Connection) => Publisher;
joinMatrixRTC: (transport: LivekitTransport) => Promise<void>; joinMatrixRTC: (transport: LivekitTransport) => void;
homeserverConnected$: Behavior<boolean>; homeserverConnected: HomeserverConnected;
localTransport$: Behavior<LivekitTransport | null>; localTransport$: Behavior<LivekitTransport | null>;
matrixRTCSession: Pick< matrixRTCSession: Pick<
MatrixRTCSession, MatrixRTCSession,
@@ -151,7 +145,7 @@ export const createLocalMembership$ = ({
scope, scope,
connectionManager, connectionManager,
localTransport$: localTransportCanThrow$, localTransport$: localTransportCanThrow$,
homeserverConnected$, homeserverConnected,
createPublisherFactory, createPublisherFactory,
joinMatrixRTC, joinMatrixRTC,
logger: parentLogger, logger: parentLogger,
@@ -159,16 +153,16 @@ export const createLocalMembership$ = ({
matrixRTCSession, matrixRTCSession,
}: Props): { }: Props): {
/** /**
* This starts audio and video tracks. They will be reused when calling `requestConnect`. * This starts audio and video tracks. They will be reused when calling `requestPublish`.
*/ */
startTracks: () => Behavior<void>; startTracks: () => Behavior<void>;
/** /**
* This sets a inner state (shouldConnect) to true and instructs the js-sdk and livekit to keep the user * This sets a inner state (shouldPublish) to true and instructs the js-sdk and livekit to keep the user
* connected to matrix and livekit. * connected to matrix and livekit.
*/ */
requestConnect: () => void; requestJoinAndPublish: () => void;
requestDisconnect: () => void; requestDisconnect: () => void;
connectionState: LocalMemberConnectionState; localMemberState$: Behavior<LocalMemberState>;
sharingScreen$: Behavior<boolean>; sharingScreen$: Behavior<boolean>;
/** /**
* Callback to toggle screen sharing. If null, screen sharing is not possible. * Callback to toggle screen sharing. If null, screen sharing is not possible.
@@ -177,10 +171,14 @@ export const createLocalMembership$ = ({
// tracks$: Behavior<LocalTrack[]>; // tracks$: Behavior<LocalTrack[]>;
participant$: Behavior<LocalParticipant | null>; participant$: Behavior<LocalParticipant | null>;
connection$: Behavior<Connection | null>; connection$: Behavior<Connection | null>;
homeserverConnected$: Behavior<boolean>; /** Shorthand for homeserverConnected.rtcSession === Status.Reconnecting
// this needs to be discussed * Direct translation to the js-sdk membership manager connection `Status`.
/** @deprecated use state instead*/ */
reconnecting$: Behavior<boolean>; reconnecting$: Behavior<boolean>;
/** Shorthand for homeserverConnected.rtcSession === Status.Disconnected
* Direct translation to the js-sdk membership manager connection `Status`.
*/
disconnected$: Behavior<boolean>;
} => { } => {
const logger = parentLogger.getChild("[LocalMembership]"); const logger = parentLogger.getChild("[LocalMembership]");
logger.debug(`Creating local membership..`); logger.debug(`Creating local membership..`);
@@ -199,7 +197,7 @@ export const createLocalMembership$ = ({
: new Error("Unknown error from localTransport"), : new Error("Unknown error from localTransport"),
); );
} }
setLivekitError(error); setTransportError(error);
return of(null); return of(null);
}), }),
), ),
@@ -246,74 +244,21 @@ export const createLocalMembership$ = ({
mediaErrors$.pipe(scope.bind()).subscribe((error) => { mediaErrors$.pipe(scope.bind()).subscribe((error) => {
if (error) { if (error) {
logger.error(`Failed to create local tracks:`, error); logger.error(`Failed to create local tracks:`, error);
// TODO is it fatal? Do we need to create a new Specialized Error?
setMatrixError( setMatrixError(
// TODO is it fatal? Do we need to create a new Specialized Error?
new UnknownCallError(new Error(`Media device error: ${error}`)), new UnknownCallError(new Error(`Media device error: ${error}`)),
); );
} }
}); });
const localConnectionState$ = localConnection$.pipe(
switchMap((connection) => (connection ? connection.state$ : of(null))),
);
// /**
// * Whether we are "fully" connected to the call. Accounts for both the
// * connection to the MatrixRTC session and the LiveKit publish connection.
// */
const connected$ = scope.behavior(
and$(
homeserverConnected$.pipe(
tap((v) => logger.debug("matrix: Connected state changed", v)),
),
localConnectionState$.pipe(
switchMap((state) => {
logger.debug("livekit: Connected state changed", state);
if (!state) return of(false);
if (state.state === "ConnectedToLkRoom") {
logger.debug(
"livekit: Connected state changed (inner livekitConnectionState$)",
state.livekitConnectionState$.value,
);
return state.livekitConnectionState$.pipe(
map((lkState) => lkState === ConnectionState.Connected),
);
}
return of(false);
}),
),
).pipe(tap((v) => logger.debug("combined: Connected state changed", v))),
);
// MATRIX RELATED // MATRIX RELATED
// /**
// * Whether we should tell the user that we're reconnecting to the call.
// */
// DISCUSSION is there a better way to do this?
// sth that is more deriectly implied from the membership manager of the js sdk. (fromEvent(matrixRTCSession, Reconnecting)) ??? or similar
const reconnecting$ = scope.behavior(
connected$.pipe(
// We are reconnecting if we previously had some successful initial
// connection but are now disconnected
scan(
({ connectedPreviously }, connectedNow) => ({
connectedPreviously: connectedPreviously || connectedNow,
reconnecting: connectedPreviously && !connectedNow,
}),
{ connectedPreviously: false, reconnecting: false },
),
map(({ reconnecting }) => reconnecting),
),
);
// This should be used in a combineLatest with publisher$ to connect. // This should be used in a combineLatest with publisher$ to connect.
// to make it possible to call startTracks before the preferredTransport$ has resolved. // to make it possible to call startTracks before the preferredTransport$ has resolved.
const trackStartRequested = Promise.withResolvers<void>(); const trackStartRequested = Promise.withResolvers<void>();
// This should be used in a combineLatest with publisher$ to connect. // This should be used in a combineLatest with publisher$ to connect.
// to make it possible to call startTracks before the preferredTransport$ has resolved. // to make it possible to call startTracks before the preferredTransport$ has resolved.
const connectRequested$ = new BehaviorSubject(false); const joinAndPublishRequested$ = new BehaviorSubject(false);
/** /**
* The publisher is stored in here an abstracts creating and publishing tracks. * The publisher is stored in here an abstracts creating and publishing tracks.
@@ -334,13 +279,13 @@ export const createLocalMembership$ = ({
return constant(undefined); return constant(undefined);
}; };
const requestConnect = (): void => { const requestJoinAndPublish = (): void => {
trackStartRequested.resolve(); trackStartRequested.resolve();
connectRequested$.next(true); joinAndPublishRequested$.next(true);
}; };
const requestDisconnect = (): void => { const requestDisconnect = (): void => {
connectRequested$.next(false); joinAndPublishRequested$.next(false);
}; };
// Take care of the publisher$ // Take care of the publisher$
@@ -352,12 +297,14 @@ export const createLocalMembership$ = ({
// - overwrite current publisher // - overwrite current publisher
scope.reconcile(localConnection$, async (connection) => { scope.reconcile(localConnection$, async (connection) => {
if (connection !== null) { if (connection !== null) {
publisher$.next(createPublisherFactory(connection)); const publisher = createPublisherFactory(connection);
} publisher$.next(publisher);
// Clean-up callback
return Promise.resolve(async (): Promise<void> => { return Promise.resolve(async (): Promise<void> => {
await publisher$?.value?.stopPublishing(); await publisher.stopPublishing();
await publisher$?.value?.stopTracks(); publisher.stopTracks();
}); });
}
}); });
// Use reconcile here to not run concurrent createAndSetupTracks calls // Use reconcile here to not run concurrent createAndSetupTracks calls
@@ -381,13 +328,24 @@ export const createLocalMembership$ = ({
// Based on `connectRequested$` we start publishing tracks. (once they are there!) // Based on `connectRequested$` we start publishing tracks. (once they are there!)
scope.reconcile( scope.reconcile(
scope.behavior(combineLatest([publisher$, connectRequested$])), scope.behavior(
async ([publisher, shouldConnect]) => { combineLatest([publisher$, tracks$, joinAndPublishRequested$]),
if (shouldConnect) { ),
async ([publisher, tracks, shouldJoinAndPublish]) => {
if (shouldJoinAndPublish === publisher?.publishing$.value) return;
if (tracks.length !== 0 && shouldJoinAndPublish) {
try { try {
await publisher?.startPublishing(); await publisher?.startPublishing();
} catch (error) { } catch (error) {
setLivekitError(error as ElementCallError); const message =
error instanceof Error ? error.message : String(error);
setPublishError(new FailToStartLivekitConnection(message));
}
} else if (tracks.length !== 0 && !shouldJoinAndPublish) {
try {
await publisher?.stopPublishing();
} catch (error) {
setPublishError(new UnknownCallError(error as Error));
} }
} }
// XXX Why is that? // XXX Why is that?
@@ -401,15 +359,37 @@ export const createLocalMembership$ = ({
}, },
); );
const fatalLivekitError$ = new BehaviorSubject<ElementCallError | null>(null); // STATE COMPUTATION
const setLivekitError = (e: ElementCallError): void => {
if (fatalLivekitError$.value !== null) // These are non fatal since we can join a room and concume media even though publishing failed.
logger.error("Multiple Livkit Errors:", e); const publishError$ = new BehaviorSubject<ElementCallError | null>(null);
else fatalLivekitError$.next(e); const setPublishError = (e: ElementCallError): void => {
if (publishError$.value !== null) {
logger.error("Multiple Media Errors:", e);
} else {
publishError$.next(e);
}
}; };
const livekitState$: Behavior<LocalMemberRtcBackendState> = scope.behavior(
const fatalTransportError$ = new BehaviorSubject<ElementCallError | null>(
null,
);
const setTransportError = (e: ElementCallError): void => {
if (fatalTransportError$.value !== null) {
logger.error("Multiple Transport Errors:", e);
} else {
fatalTransportError$.next(e);
}
};
const localConnectionState$ = localConnection$.pipe(
switchMap((connection) => (connection ? connection.state$ : of(null))),
);
const mediaState$: Behavior<LocalMemberMediaState> = scope.behavior(
combineLatest([ combineLatest([
publisher$, localConnectionState$,
localTransport$, localTransport$,
// tracks$.pipe( // tracks$.pipe(
// tap((t) => { // tap((t) => {
@@ -418,104 +398,166 @@ export const createLocalMembership$ = ({
// ), // ),
// publishing$, // publishing$,
connectRequested$, connectRequested$,
tracks$,
publishing$,
joinAndPublishRequested$,
from(trackStartRequested.promise).pipe( from(trackStartRequested.promise).pipe(
map(() => true), map(() => true),
startWith(false), startWith(false),
), ),
fatalLivekitError$,
]).pipe( ]).pipe(
map( map(
([ ([
publisher, localConnectionState,
localTransport, localTransport,
// tracks, tracks,
// publishing, publishing,
shouldConnect, shouldPublish,
shouldStartTracks, shouldStartTracks,
error,
]) => { ]) => {
// read this: if (!localTransport) return null;
// if(!<A>) return {state: ...} const hasTracks = tracks.length > 0;
// if(!<B>) return {state: <MyState>} let trackState: TrackState = TrackState.WaitingForUser;
// if (hasTracks && shouldStartTracks) trackState = TrackState.Ready;
// as: if (!hasTracks && shouldStartTracks) trackState = TrackState.Creating;
// We do have <A> but not yet <B> so we are in <MyState>
if (error !== null) return { state: RTCBackendState.Error, error }; if (
// const hasTracks = tracks.length > 0; localConnectionState !== ConnectionState.LivekitConnected ||
if (!localTransport) trackState !== TrackState.Ready
return { state: RTCBackendState.WaitingForTransport }; )
if (!publisher) return {
return { state: RTCBackendState.WaitingForConnection }; connection: localConnectionState,
if (!shouldStartTracks) return { state: RTCBackendState.Initialized }; tracks: trackState,
// if (!hasTracks) return { state: RTCBackendState.CreatingTracks }; };
if (!shouldConnect) return { state: RTCBackendState.ReadyToPublish }; if (!shouldPublish) return PublishState.WaitingForUser;
// if (!publishing) return { state: RTCBackendState.WaitingToPublish }; if (!publishing) return PublishState.Starting;
return { state: RTCBackendState.Connected }; return PublishState.Publishing;
}, },
), ),
distinctUntilChanged(deepCompare), distinctUntilChanged(deepCompare),
), ),
); );
const fatalMatrixError$ = new BehaviorSubject<ElementCallError | null>(null); const fatalMatrixError$ = new BehaviorSubject<ElementCallError | null>(null);
const setMatrixError = (e: ElementCallError): void => { const setMatrixError = (e: ElementCallError): void => {
if (fatalMatrixError$.value !== null) if (fatalMatrixError$.value !== null) {
logger.error("Multiple Matrix Errors:", e); logger.error("Multiple Matrix Errors:", e);
else fatalMatrixError$.next(e); } else {
fatalMatrixError$.next(e);
}
}; };
const matrixState$: Behavior<LocalMemberMatrixState> = scope.behavior(
const localMemberState$ = scope.behavior<LocalMemberState>(
combineLatest([ combineLatest([
localTransport$, mediaState$,
connectRequested$, homeserverConnected.rtsSession$,
homeserverConnected$, fatalMatrixError$,
fatalTransportError$,
publishError$,
]).pipe( ]).pipe(
map(([localTransport, connectRequested, homeserverConnected]) => { map(
if (!localTransport) return { state: MatrixState.WaitingForTransport }; ([
if (!connectRequested) return { state: MatrixState.Ready }; mediaState,
if (!homeserverConnected) return { state: MatrixState.Connecting }; rtcSessionStatus,
return { state: MatrixState.Connected }; fatalMatrixError,
}), fatalTransportError,
publishError,
]) => {
if (fatalTransportError !== null) return fatalTransportError;
// `mediaState` will be 'null' until the transport/connection appears.
if (mediaState && rtcSessionStatus)
return {
matrix: fatalMatrixError ?? rtcSessionStatus,
media: publishError ?? mediaState,
};
return TransportState.Waiting;
},
),
), ),
); );
// Keep matrix rtc session in sync with localTransport$, connectRequested$ and muteStates.video.enabled$ /**
* Whether we are "fully" connected to the call. Accounts for both the
* connection to the MatrixRTC session and the LiveKit publish connection.
*/
const matrixAndLivekitConnected$ = scope.behavior(
and$(
homeserverConnected.combined$,
localConnectionState$.pipe(
map((state) => state === ConnectionState.LivekitConnected),
),
).pipe(
tap((v) => logger.debug("livekit+matrix: Connected state changed", v)),
),
);
/**
* Whether we should tell the user that we're reconnecting to the call.
*/
const reconnecting$ = scope.behavior(
matrixAndLivekitConnected$.pipe(
pairwise(),
map(([prev, current]) => prev === true && current === false),
),
false,
);
// inform the widget about the connect and disconnect intent from the user.
scope
.behavior(joinAndPublishRequested$.pipe(pairwise(), scope.bind()), [
undefined,
joinAndPublishRequested$.value,
])
.subscribe(([prev, current]) => {
if (!widget) return;
// JOIN prev=false (was left) => current-true (now joiend)
if (!prev && current) {
widget.api.transport
.send(ElementWidgetActions.JoinCall, {})
.catch((e) => {
logger.error("Failed to send join action", e);
});
}
// LEAVE prev=false (was joined) => current-true (now left)
if (prev && !current) {
widget.api.transport
.send(ElementWidgetActions.HangupCall, {})
.catch((e) => {
logger.error("Failed to send hangup action", e);
});
}
});
combineLatest([muteStates.video.enabled$, homeserverConnected.combined$])
.pipe(scope.bind())
.subscribe(([videoEnabled, connected]) => {
if (!connected) return;
void matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio");
});
// Keep matrix rtc session in sync with localTransport$, connectRequested$
scope.reconcile( scope.reconcile(
scope.behavior(combineLatest([localTransport$, connectRequested$])), scope.behavior(combineLatest([localTransport$, joinAndPublishRequested$])),
async ([transport, shouldConnect]) => { async ([transport, shouldConnect]) => {
if (!transport) return;
// if shouldConnect=false we will do the disconnect as the cleanup from the previous reconcile iteration.
if (!shouldConnect) return; if (!shouldConnect) return;
if (!transport) return;
try { try {
await joinMatrixRTC(transport); joinMatrixRTC(transport);
} catch (error) { } catch (error) {
logger.error("Error entering RTC session", error); logger.error("Error entering RTC session", error);
if (error instanceof Error) if (error instanceof Error)
setMatrixError(new MembershipManagerError(error)); setMatrixError(new MembershipManagerError(error));
} }
// Update our member event when our mute state changes. return Promise.resolve(async (): Promise<void> => {
const callIntentScope = new ObservableScope();
// because this uses its own scope, we can start another reconciliation for the duration of one connection.
callIntentScope.reconcile(
muteStates.video.enabled$,
async (videoEnabled) =>
matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"),
);
return async (): Promise<void> => {
callIntentScope.end();
try { try {
// Update matrixRTCSession to allow udpating the transport without leaving the session! // TODO Update matrixRTCSession to allow udpating the transport without leaving the session!
await matrixRTCSession.leaveRoomSession(); await matrixRTCSession.leaveRoomSession(1000);
} catch (e) { } catch (e) {
logger.error("Error leaving RTC session", e); logger.error("Error leaving RTC session", e);
} }
try { });
await widget?.api.transport.send(ElementWidgetActions.HangupCall, {});
} catch (e) {
logger.error("Failed to send hangup action", e);
}
};
}, },
); );
@@ -530,7 +572,7 @@ export const createLocalMembership$ = ({
// pause tracks during the initial joining sequence too until we're sure // pause tracks during the initial joining sequence too until we're sure
// that our own media is displayed on screen. // that our own media is displayed on screen.
// TODO refactor this based no livekitState$ // TODO refactor this based no livekitState$
combineLatest([participant$, homeserverConnected$]) combineLatest([participant$, homeserverConnected.combined$])
.pipe(scope.bind()) .pipe(scope.bind())
.subscribe(([participant, connected]) => { .subscribe(([participant, connected]) => {
if (!participant) return; if (!participant) return;
@@ -615,16 +657,17 @@ export const createLocalMembership$ = ({
return { return {
startTracks, startTracks,
requestConnect, requestJoinAndPublish,
requestDisconnect, requestDisconnect,
connectionState: { localMemberState$,
livekit$: livekitState$, tracks$,
matrix$: matrixState$,
},
// tracks$,
participant$, participant$,
homeserverConnected$,
reconnecting$, reconnecting$,
disconnected$: scope.behavior(
homeserverConnected.rtsSession$.pipe(
map((state) => state === RTCSessionStatus.Disconnected),
),
),
sharingScreen$, sharingScreen$,
toggleScreenSharing, toggleScreenSharing,
connection$: localConnection$, connection$: localConnection$,
@@ -659,11 +702,11 @@ interface EnterRTCSessionOptions {
* @throws If the widget could not send ElementWidgetActions.JoinCall action. * @throws If the widget could not send ElementWidgetActions.JoinCall action.
*/ */
// Exported for unit testing // Exported for unit testing
export async function enterRTCSession( export function enterRTCSession(
rtcSession: MatrixRTCSession, rtcSession: MatrixRTCSession,
transport: LivekitTransport, transport: LivekitTransport,
{ encryptMedia, matrixRTCMode }: EnterRTCSessionOptions, { encryptMedia, matrixRTCMode }: EnterRTCSessionOptions,
): Promise<void> { ): void {
PosthogAnalytics.instance.eventCallEnded.cacheStartCall(new Date()); PosthogAnalytics.instance.eventCallEnded.cacheStartCall(new Date());
PosthogAnalytics.instance.eventCallStarted.track(rtcSession.room.roomId); PosthogAnalytics.instance.eventCallStarted.track(rtcSession.room.roomId);
@@ -702,7 +745,4 @@ export async function enterRTCSession(
unstableSendStickyEvents: matrixRTCMode === MatrixRTCMode.Matrix_2_0, unstableSendStickyEvents: matrixRTCMode === MatrixRTCMode.Matrix_2_0,
}, },
); );
if (widget) {
await widget.api.transport.send(ElementWidgetActions.JoinCall, {});
}
} }

View File

@@ -85,7 +85,7 @@ export const createLocalTransport$ = ({
* The transport that we would personally prefer to publish on (if not for the * The transport that we would personally prefer to publish on (if not for the
* transport preferences of others, perhaps). * transport preferences of others, perhaps).
* *
* @throws * @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken
*/ */
const preferredTransport$: Behavior<LivekitTransport | null> = scope.behavior( const preferredTransport$: Behavior<LivekitTransport | null> = scope.behavior(
customLivekitUrl.value$.pipe( customLivekitUrl.value$.pipe(

View File

@@ -30,13 +30,16 @@ import { logger } from "matrix-js-sdk/lib/logger";
import type { LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; import type { LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { import {
Connection, Connection,
ConnectionState,
type ConnectionOpts, type ConnectionOpts,
type ConnectionState,
type PublishingParticipant, type PublishingParticipant,
} from "./Connection.ts"; } from "./Connection.ts";
import { ObservableScope } from "../../ObservableScope.ts"; import { ObservableScope } from "../../ObservableScope.ts";
import { type OpenIDClientParts } from "../../../livekit/openIDSFU.ts"; import { type OpenIDClientParts } from "../../../livekit/openIDSFU.ts";
import { FailToGetOpenIdToken } from "../../../utils/errors.ts"; import {
ElementCallError,
FailToGetOpenIdToken,
} from "../../../utils/errors.ts";
let testScope: ObservableScope; let testScope: ObservableScope;
@@ -47,11 +50,6 @@ let fakeLivekitRoom: MockedObject<LivekitRoom>;
let localParticipantEventEmiter: EventEmitter; let localParticipantEventEmiter: EventEmitter;
let fakeLocalParticipant: MockedObject<LocalParticipant>; let fakeLocalParticipant: MockedObject<LocalParticipant>;
let fakeRoomEventEmiter: EventEmitter;
// let fakeMembershipsFocusMap$: BehaviorSubject<
// { membership: CallMembership; transport: LivekitTransport }[]
// >;
const livekitFocus: LivekitTransport = { const livekitFocus: LivekitTransport = {
livekit_alias: "!roomID:example.org", livekit_alias: "!roomID:example.org",
livekit_service_url: "https://matrix-rtc.example.org/livekit/jwt", livekit_service_url: "https://matrix-rtc.example.org/livekit/jwt",
@@ -88,22 +86,25 @@ function setupTest(): void {
localParticipantEventEmiter, localParticipantEventEmiter,
), ),
} as unknown as LocalParticipant); } as unknown as LocalParticipant);
fakeRoomEventEmiter = new EventEmitter();
const fakeRoomEventEmitter = new EventEmitter();
fakeLivekitRoom = vi.mocked<LivekitRoom>({ fakeLivekitRoom = vi.mocked<LivekitRoom>({
connect: vi.fn(), connect: vi.fn(),
disconnect: vi.fn(), disconnect: vi.fn(),
remoteParticipants: new Map(), remoteParticipants: new Map(),
localParticipant: fakeLocalParticipant, localParticipant: fakeLocalParticipant,
state: LivekitConnectionState.Disconnected, state: LivekitConnectionState.Disconnected,
on: fakeRoomEventEmiter.on.bind(fakeRoomEventEmiter), on: fakeRoomEventEmitter.on.bind(fakeRoomEventEmitter),
off: fakeRoomEventEmiter.off.bind(fakeRoomEventEmiter), off: fakeRoomEventEmitter.off.bind(fakeRoomEventEmitter),
addListener: fakeRoomEventEmiter.addListener.bind(fakeRoomEventEmiter), addListener: fakeRoomEventEmitter.addListener.bind(fakeRoomEventEmitter),
removeListener: removeListener:
fakeRoomEventEmiter.removeListener.bind(fakeRoomEventEmiter), fakeRoomEventEmitter.removeListener.bind(fakeRoomEventEmitter),
removeAllListeners: removeAllListeners:
fakeRoomEventEmiter.removeAllListeners.bind(fakeRoomEventEmiter), fakeRoomEventEmitter.removeAllListeners.bind(fakeRoomEventEmitter),
setE2EEEnabled: vi.fn().mockResolvedValue(undefined), setE2EEEnabled: vi.fn().mockResolvedValue(undefined),
emit: (eventName: string | symbol, ...args: unknown[]) => {
fakeRoomEventEmitter.emit(eventName, ...args);
},
} as unknown as LivekitRoom); } as unknown as LivekitRoom);
} }
@@ -125,7 +126,16 @@ function setupRemoteConnection(): Connection {
}; };
}); });
fakeLivekitRoom.connect.mockResolvedValue(undefined); fakeLivekitRoom.connect.mockImplementation(async (): Promise<void> => {
const changeEv = RoomEvent.ConnectionStateChanged;
fakeLivekitRoom.state = LivekitConnectionState.Connecting;
fakeLivekitRoom.emit(changeEv, fakeLivekitRoom.state);
fakeLivekitRoom.state = LivekitConnectionState.Connected;
fakeLivekitRoom.emit(changeEv, fakeLivekitRoom.state);
return Promise.resolve();
});
return new Connection(opts, logger); return new Connection(opts, logger);
} }
@@ -148,7 +158,7 @@ describe("Start connection states", () => {
}; };
const connection = new Connection(opts, logger); const connection = new Connection(opts, logger);
expect(connection.state$.getValue().state).toEqual("Initialized"); expect(connection.state$.getValue()).toEqual("Initialized");
}); });
it("fail to getOpenId token then error state", async () => { it("fail to getOpenId token then error state", async () => {
@@ -164,7 +174,7 @@ describe("Start connection states", () => {
const connection = new Connection(opts, logger); const connection = new Connection(opts, logger);
const capturedStates: ConnectionState[] = []; const capturedStates: (ConnectionState | Error)[] = [];
const s = connection.state$.subscribe((value) => { const s = connection.state$.subscribe((value) => {
capturedStates.push(value); capturedStates.push(value);
}); });
@@ -184,22 +194,20 @@ describe("Start connection states", () => {
let capturedState = capturedStates.pop(); let capturedState = capturedStates.pop();
expect(capturedState).toBeDefined(); expect(capturedState).toBeDefined();
expect(capturedState!.state).toEqual("FetchingConfig"); expect(capturedState!).toEqual("FetchingConfig");
deferred.reject(new FailToGetOpenIdToken(new Error("Failed to get token"))); deferred.reject(new FailToGetOpenIdToken(new Error("Failed to get token")));
await vi.runAllTimersAsync(); await vi.runAllTimersAsync();
capturedState = capturedStates.pop(); capturedState = capturedStates.pop();
if (capturedState!.state === "FailedToStart") { if (capturedState instanceof Error) {
expect(capturedState!.error.message).toEqual("Something went wrong"); expect(capturedState.message).toEqual("Something went wrong");
expect(connection.transport.livekit_alias).toEqual( expect(connection.transport.livekit_alias).toEqual(
livekitFocus.livekit_alias, livekitFocus.livekit_alias,
); );
} else { } else {
expect.fail( expect.fail("Expected FailedToStart state but got " + capturedState);
"Expected FailedToStart state but got " + capturedState?.state,
);
} }
}); });
@@ -216,7 +224,7 @@ describe("Start connection states", () => {
const connection = new Connection(opts, logger); const connection = new Connection(opts, logger);
const capturedStates: ConnectionState[] = []; const capturedStates: (ConnectionState | Error)[] = [];
const s = connection.state$.subscribe((value) => { const s = connection.state$.subscribe((value) => {
capturedStates.push(value); capturedStates.push(value);
}); });
@@ -238,24 +246,25 @@ describe("Start connection states", () => {
let capturedState = capturedStates.pop(); let capturedState = capturedStates.pop();
expect(capturedState).toBeDefined(); expect(capturedState).toBeDefined();
expect(capturedState?.state).toEqual("FetchingConfig"); expect(capturedState).toEqual(ConnectionState.FetchingConfig);
deferredSFU.resolve(); deferredSFU.resolve();
await vi.runAllTimersAsync(); await vi.runAllTimersAsync();
capturedState = capturedStates.pop(); capturedState = capturedStates.pop();
if (capturedState?.state === "FailedToStart") { if (
expect(capturedState?.error.message).toContain( capturedState instanceof ElementCallError &&
capturedState.cause instanceof Error
) {
expect(capturedState.cause.message).toContain(
"SFU Config fetch failed with exception Error", "SFU Config fetch failed with exception Error",
); );
expect(connection.transport.livekit_alias).toEqual( expect(connection.transport.livekit_alias).toEqual(
livekitFocus.livekit_alias, livekitFocus.livekit_alias,
); );
} else { } else {
expect.fail( expect.fail("Expected FailedToStart state but got " + capturedState);
"Expected FailedToStart state but got " + capturedState?.state,
);
} }
}); });
@@ -272,7 +281,7 @@ describe("Start connection states", () => {
const connection = new Connection(opts, logger); const connection = new Connection(opts, logger);
const capturedStates: ConnectionState[] = []; const capturedStates: (ConnectionState | Error)[] = [];
const s = connection.state$.subscribe((value) => { const s = connection.state$.subscribe((value) => {
capturedStates.push(value); capturedStates.push(value);
}); });
@@ -302,15 +311,18 @@ describe("Start connection states", () => {
let capturedState = capturedStates.pop(); let capturedState = capturedStates.pop();
expect(capturedState).toBeDefined(); expect(capturedState).toBeDefined();
expect(capturedState?.state).toEqual("FetchingConfig"); expect(capturedState).toEqual(ConnectionState.FetchingConfig);
deferredSFU.resolve(); deferredSFU.resolve();
await vi.runAllTimersAsync(); await vi.runAllTimersAsync();
capturedState = capturedStates.pop(); capturedState = capturedStates.pop();
if (capturedState && capturedState?.state === "FailedToStart") { if (
expect(capturedState.error.message).toContain( capturedState instanceof ElementCallError &&
capturedState.cause instanceof Error
) {
expect(capturedState.cause.message).toContain(
"Failed to connect to livekit", "Failed to connect to livekit",
); );
expect(connection.transport.livekit_alias).toEqual( expect(connection.transport.livekit_alias).toEqual(
@@ -329,7 +341,7 @@ describe("Start connection states", () => {
const connection = setupRemoteConnection(); const connection = setupRemoteConnection();
const capturedStates: ConnectionState[] = []; const capturedStates: (ConnectionState | Error)[] = [];
const s = connection.state$.subscribe((value) => { const s = connection.state$.subscribe((value) => {
capturedStates.push(value); capturedStates.push(value);
}); });
@@ -339,13 +351,15 @@ describe("Start connection states", () => {
await vi.runAllTimersAsync(); await vi.runAllTimersAsync();
const initialState = capturedStates.shift(); const initialState = capturedStates.shift();
expect(initialState?.state).toEqual("Initialized"); expect(initialState).toEqual(ConnectionState.Initialized);
const fetchingState = capturedStates.shift(); const fetchingState = capturedStates.shift();
expect(fetchingState?.state).toEqual("FetchingConfig"); expect(fetchingState).toEqual(ConnectionState.FetchingConfig);
const disconnectedState = capturedStates.shift();
expect(disconnectedState).toEqual(ConnectionState.LivekitDisconnected);
const connectingState = capturedStates.shift(); const connectingState = capturedStates.shift();
expect(connectingState?.state).toEqual("ConnectingToLkRoom"); expect(connectingState).toEqual(ConnectionState.LivekitConnecting);
const connectedState = capturedStates.shift(); const connectedState = capturedStates.shift();
expect(connectedState?.state).toEqual("ConnectedToLkRoom"); expect(connectedState).toEqual(ConnectionState.LivekitConnected);
}); });
it("shutting down the scope should stop the connection", async () => { it("shutting down the scope should stop the connection", async () => {
@@ -411,7 +425,7 @@ describe("Publishing participants observations", () => {
); );
participants.forEach((p) => participants.forEach((p) =>
fakeRoomEventEmiter.emit(RoomEvent.ParticipantConnected, p), fakeLivekitRoom.emit(RoomEvent.ParticipantConnected, p),
); );
// At this point there should be no publishers // At this point there should be no publishers
@@ -424,7 +438,7 @@ describe("Publishing participants observations", () => {
fakeRemoteLivekitParticipant("@dan:example.org:DEV333", 2), fakeRemoteLivekitParticipant("@dan:example.org:DEV333", 2),
]; ];
participants.forEach((p) => participants.forEach((p) =>
fakeRoomEventEmiter.emit(RoomEvent.ParticipantConnected, p), fakeLivekitRoom.emit(RoomEvent.ParticipantConnected, p),
); );
// At this point there should be no publishers // At this point there should be no publishers
@@ -454,7 +468,7 @@ describe("Publishing participants observations", () => {
); );
for (const participant of participants) { for (const participant of participants) {
fakeRoomEventEmiter.emit(RoomEvent.ParticipantConnected, participant); fakeLivekitRoom.emit(RoomEvent.ParticipantConnected, participant);
} }
// At this point there should be no publishers // At this point there should be no publishers
@@ -463,7 +477,7 @@ describe("Publishing participants observations", () => {
participants = [fakeRemoteLivekitParticipant("@bob:example.org:DEV111", 1)]; participants = [fakeRemoteLivekitParticipant("@bob:example.org:DEV111", 1)];
for (const participant of participants) { for (const participant of participants) {
fakeRoomEventEmiter.emit(RoomEvent.ParticipantConnected, participant); fakeLivekitRoom.emit(RoomEvent.ParticipantConnected, participant);
} }
// We should have bob has a publisher now // We should have bob has a publisher now
@@ -480,7 +494,7 @@ describe("Publishing participants observations", () => {
(p) => p.identity !== "@bob:example.org:DEV111", (p) => p.identity !== "@bob:example.org:DEV111",
); );
fakeRoomEventEmiter.emit( fakeLivekitRoom.emit(
RoomEvent.ParticipantDisconnected, RoomEvent.ParticipantDisconnected,
fakeRemoteLivekitParticipant("@bob:example.org:DEV111"), fakeRemoteLivekitParticipant("@bob:example.org:DEV111"),
); );

View File

@@ -12,7 +12,6 @@ import {
} from "@livekit/components-core"; } from "@livekit/components-core";
import { import {
ConnectionError, ConnectionError,
type ConnectionState as LivekitConenctionState,
type Room as LivekitRoom, type Room as LivekitRoom,
type LocalParticipant, type LocalParticipant,
type RemoteParticipant, type RemoteParticipant,
@@ -30,8 +29,10 @@ import {
import { type Behavior } from "../../Behavior.ts"; import { type Behavior } from "../../Behavior.ts";
import { type ObservableScope } from "../../ObservableScope.ts"; import { type ObservableScope } from "../../ObservableScope.ts";
import { import {
ElementCallError,
InsufficientCapacityError, InsufficientCapacityError,
SFURoomCreationRestrictedError, SFURoomCreationRestrictedError,
UnknownCallError,
} from "../../../utils/errors.ts"; } from "../../../utils/errors.ts";
export type PublishingParticipant = LocalParticipant | RemoteParticipant; export type PublishingParticipant = LocalParticipant | RemoteParticipant;
@@ -47,17 +48,30 @@ export interface ConnectionOpts {
/** Optional factory to create the LiveKit room, mainly for testing purposes. */ /** Optional factory to create the LiveKit room, mainly for testing purposes. */
livekitRoomFactory: () => LivekitRoom; livekitRoomFactory: () => LivekitRoom;
} }
export class FailedToStartError extends Error {
export type ConnectionState = public constructor(message: string) {
| { state: "Initialized" } super(message);
| { state: "FetchingConfig" } this.name = "FailedToStartError";
| { state: "ConnectingToLkRoom" } }
| { }
state: "ConnectedToLkRoom";
livekitConnectionState$: Behavior<LivekitConenctionState>; export enum ConnectionState {
/** The start state of a connection. It has been created but nothing has loaded yet. */
Initialized = "Initialized",
/** `start` has been called on the connection. It aquires the jwt info to conenct to the LK Room */
FetchingConfig = "FetchingConfig",
Stopped = "Stopped",
/** The same as ConnectionState.Disconnected from `livekit-client` */
LivekitDisconnected = "disconnected",
/** The same as ConnectionState.Connecting from `livekit-client` */
LivekitConnecting = "connecting",
/** The same as ConnectionState.Connected from `livekit-client` */
LivekitConnected = "connected",
/** The same as ConnectionState.Reconnecting from `livekit-client` */
LivekitReconnecting = "reconnecting",
/** The same as ConnectionState.SignalReconnecting from `livekit-client` */
LivekitSignalReconnecting = "signalReconnecting",
} }
| { state: "FailedToStart"; error: Error }
| { state: "Stopped" };
/** /**
* A connection to a Matrix RTC LiveKit backend. * A connection to a Matrix RTC LiveKit backend.
@@ -66,14 +80,14 @@ export type ConnectionState =
*/ */
export class Connection { export class Connection {
// Private Behavior // Private Behavior
private readonly _state$ = new BehaviorSubject<ConnectionState>({ private readonly _state$ = new BehaviorSubject<
state: "Initialized", ConnectionState | ElementCallError
}); >(ConnectionState.Initialized);
/** /**
* The current state of the connection to the media transport. * The current state of the connection to the media transport.
*/ */
public readonly state$: Behavior<ConnectionState> = this._state$; public readonly state$: Behavior<ConnectionState | Error> = this._state$;
/** /**
* The media transport to connect to. * The media transport to connect to.
@@ -117,16 +131,24 @@ export class Connection {
this.logger.debug("Starting Connection"); this.logger.debug("Starting Connection");
this.stopped = false; this.stopped = false;
try { try {
this._state$.next({ this._state$.next(ConnectionState.FetchingConfig);
state: "FetchingConfig", // We should already have this information after creating the localTransport.
}); // It would probably be better to forward this here.
const { url, jwt } = await this.getSFUConfigWithOpenID(); const { url, jwt } = await this.getSFUConfigWithOpenID();
// If we were stopped while fetching the config, don't proceed to connect // If we were stopped while fetching the config, don't proceed to connect
if (this.stopped) return; if (this.stopped) return;
this._state$.next({ // Setup observer once we are done with getSFUConfigWithOpenID
state: "ConnectingToLkRoom", connectionStateObserver(this.livekitRoom)
.pipe(
this.scope.bind(),
map((s) => s as unknown as ConnectionState),
)
.subscribe((lkState) => {
// It is save to cast lkState to ConnectionState as they are fully overlapping.
this._state$.next(lkState);
}); });
try { try {
await this.livekitRoom.connect(url, jwt); await this.livekitRoom.connect(url, jwt);
} catch (e) { } catch (e) {
@@ -141,7 +163,8 @@ export class Connection {
throw new InsufficientCapacityError(); throw new InsufficientCapacityError();
} }
if (e.status === 404) { if (e.status === 404) {
// error msg is "Could not establish signal connection: requested room does not exist" // error msg is "Failed to create call"
// error description is "Call creation might be restricted to authorized users only. Try again later, or contact your server admin if the problem persists."
// The room does not exist. There are two different modes of operation for the SFU: // The room does not exist. There are two different modes of operation for the SFU:
// - the room is created on the fly when connecting (livekit `auto_create` option) // - the room is created on the fly when connecting (livekit `auto_create` option)
// - Only authorized users can create rooms, so the room must exist before connecting (done by the auth jwt service) // - Only authorized users can create rooms, so the room must exist before connecting (done by the auth jwt service)
@@ -153,19 +176,16 @@ export class Connection {
} }
// If we were stopped while connecting, don't proceed to update state. // If we were stopped while connecting, don't proceed to update state.
if (this.stopped) return; if (this.stopped) return;
this._state$.next({
state: "ConnectedToLkRoom",
livekitConnectionState$: this.scope.behavior(
connectionStateObserver(this.livekitRoom),
),
});
} catch (error) { } catch (error) {
this.logger.debug(`Failed to connect to LiveKit room: ${error}`); this.logger.debug(`Failed to connect to LiveKit room: ${error}`);
this._state$.next({ this._state$.next(
state: "FailedToStart", error instanceof ElementCallError
error: error instanceof Error ? error : new Error(`${error}`), ? error
}); : error instanceof Error
? new UnknownCallError(error)
: new UnknownCallError(new Error(`${error}`)),
);
// Its okay to ignore the throw. The error is part of the state.
throw error; throw error;
} }
} }
@@ -190,9 +210,7 @@ export class Connection {
); );
if (this.stopped) return; if (this.stopped) return;
await this.livekitRoom.disconnect(); await this.livekitRoom.disconnect();
this._state$.next({ this._state$.next(ConnectionState.Stopped);
state: "Stopped",
});
this.stopped = true; this.stopped = true;
} }

View File

@@ -10,7 +10,7 @@ import {
type LivekitTransport, type LivekitTransport,
type ParticipantId, type ParticipantId,
} from "matrix-js-sdk/lib/matrixrtc"; } from "matrix-js-sdk/lib/matrixrtc";
import { BehaviorSubject, combineLatest, map, of, switchMap, tap } from "rxjs"; import { combineLatest, map, of, switchMap, tap } from "rxjs";
import { type Logger } from "matrix-js-sdk/lib/logger"; import { type Logger } from "matrix-js-sdk/lib/logger";
import { type LocalParticipant, type RemoteParticipant } from "livekit-client"; import { type LocalParticipant, type RemoteParticipant } from "livekit-client";
@@ -60,11 +60,7 @@ export class ConnectionManagerData {
transport: LivekitTransport, transport: LivekitTransport,
): (LocalParticipant | RemoteParticipant)[] { ): (LocalParticipant | RemoteParticipant)[] {
const key = transport.livekit_service_url + "|" + transport.livekit_alias; const key = transport.livekit_service_url + "|" + transport.livekit_alias;
const existing = this.store.get(key); return this.store.get(key)?.[1] ?? [];
if (existing) {
return existing[1];
}
return [];
} }
/** /**
* Get all connections where the given participant is publishing. * Get all connections where the given participant is publishing.
@@ -115,9 +111,6 @@ export function createConnectionManager$({
logger: parentLogger, logger: parentLogger,
}: Props): IConnectionManager { }: Props): IConnectionManager {
const logger = parentLogger.getChild("[ConnectionManager]"); const logger = parentLogger.getChild("[ConnectionManager]");
const running$ = new BehaviorSubject(true);
scope.onEnd(() => running$.next(false));
// TODO logger: only construct one logger from the client and make it compatible via a EC specific sing // TODO logger: only construct one logger from the client and make it compatible via a EC specific sing
/** /**
@@ -129,10 +122,7 @@ export function createConnectionManager$({
* externally this is modified via `registerTransports()`. * externally this is modified via `registerTransports()`.
*/ */
const transports$ = scope.behavior( const transports$ = scope.behavior(
combineLatest([running$, inputTransports$]).pipe( inputTransports$.pipe(
map(([running, transports]) =>
transports.mapInner((transport) => (running ? transport : [])),
),
map((transports) => transports.mapInner(removeDuplicateTransports)), map((transports) => transports.mapInner(removeDuplicateTransports)),
tap(({ value: transports }) => { tap(({ value: transports }) => {
logger.trace( logger.trace(

View File

@@ -108,7 +108,7 @@ export function createMatrixLivekitMembers$({
// Each update where the key of the generator array do not change will result in updates to the `data$` observable in the factory. // Each update where the key of the generator array do not change will result in updates to the `data$` observable in the factory.
(scope, data$, participantId, userId) => { (scope, data$, participantId, userId) => {
logger.debug( logger.debug(
`Updating data$ for participantId: ${participantId}, userId: ${userId}`, `Generating member for participantId: ${participantId}, userId: ${userId}`,
); );
// will only get called once per `participantId, userId` pair. // will only get called once per `participantId, userId` pair.
// updates to data$ and as a result to displayName$ and mxcAvatarUrl$ are more frequent. // updates to data$ and as a result to displayName$ and mxcAvatarUrl$ are more frequent.

View File

@@ -3373,14 +3373,14 @@ __metadata:
languageName: node languageName: node
linkType: hard linkType: hard
"@playwright/test@npm:^1.56.1": "@playwright/test@npm:^1.57.0":
version: 1.56.1 version: 1.57.0
resolution: "@playwright/test@npm:1.56.1" resolution: "@playwright/test@npm:1.57.0"
dependencies: dependencies:
playwright: "npm:1.56.1" playwright: "npm:1.57.0"
bin: bin:
playwright: cli.js playwright: cli.js
checksum: 10c0/2b5b0e1f2e6a18f6e5ce6897c7440ca78f64e0b004834e9808e93ad2b78b96366b562ae4366602669cf8ad793a43d85481b58541e74be71e905e732d833dd691 checksum: 10c0/35ba4b28be72bf0a53e33dbb11c6cff848fb9a37f49e893ce63a90675b5291ec29a1ba82c8a3b043abaead129400f0589623e9ace2e6a1c8eaa409721ecc3774
languageName: node languageName: node
linkType: hard linkType: hard
@@ -7492,7 +7492,7 @@ __metadata:
"@opentelemetry/sdk-trace-base": "npm:^2.0.0" "@opentelemetry/sdk-trace-base": "npm:^2.0.0"
"@opentelemetry/sdk-trace-web": "npm:^2.0.0" "@opentelemetry/sdk-trace-web": "npm:^2.0.0"
"@opentelemetry/semantic-conventions": "npm:^1.25.1" "@opentelemetry/semantic-conventions": "npm:^1.25.1"
"@playwright/test": "npm:^1.56.1" "@playwright/test": "npm:^1.57.0"
"@radix-ui/react-dialog": "npm:^1.0.4" "@radix-ui/react-dialog": "npm:^1.0.4"
"@radix-ui/react-slider": "npm:^1.1.2" "@radix-ui/react-slider": "npm:^1.1.2"
"@radix-ui/react-visually-hidden": "npm:^1.0.3" "@radix-ui/react-visually-hidden": "npm:^1.0.3"
@@ -10353,8 +10353,8 @@ __metadata:
linkType: hard linkType: hard
"matrix-js-sdk@npm:^39.2.0": "matrix-js-sdk@npm:^39.2.0":
version: 39.2.0 version: 39.3.0
resolution: "matrix-js-sdk@npm:39.2.0" resolution: "matrix-js-sdk@npm:39.3.0"
dependencies: dependencies:
"@babel/runtime": "npm:^7.12.5" "@babel/runtime": "npm:^7.12.5"
"@matrix-org/matrix-sdk-crypto-wasm": "npm:^15.3.0" "@matrix-org/matrix-sdk-crypto-wasm": "npm:^15.3.0"
@@ -10370,7 +10370,7 @@ __metadata:
sdp-transform: "npm:^3.0.0" sdp-transform: "npm:^3.0.0"
unhomoglyph: "npm:^1.0.6" unhomoglyph: "npm:^1.0.6"
uuid: "npm:13" uuid: "npm:13"
checksum: 10c0/f8b5261de2744305330ba3952821ca9303698170bfd3a0ff8a767b9286d4e8d4ed5aaf6fbaf8a1e8ff9dbd859102a2a47d882787e2da3b3078965bec00157959 checksum: 10c0/031c9ec042e00c32dc531f82fc59c64cc25fb665abfc642b1f0765c530d60684f8bd63daf0cdd0dbe96b4f87ea3f4148f9d3f024a59d57eceaec1ce5d0164755
languageName: node languageName: node
linkType: hard linkType: hard
@@ -11177,27 +11177,27 @@ __metadata:
languageName: node languageName: node
linkType: hard linkType: hard
"playwright-core@npm:1.56.1": "playwright-core@npm:1.57.0":
version: 1.56.1 version: 1.57.0
resolution: "playwright-core@npm:1.56.1" resolution: "playwright-core@npm:1.57.0"
bin: bin:
playwright-core: cli.js playwright-core: cli.js
checksum: 10c0/ffd40142b99c68678b387445d5b42f1fee4ab0b65d983058c37f342e5629f9cdbdac0506ea80a0dfd41a8f9f13345bad54e9a8c35826ef66dc765f4eb3db8da7 checksum: 10c0/798e35d83bf48419a8c73de20bb94d68be5dde68de23f95d80a0ebe401e3b83e29e3e84aea7894d67fa6c79d2d3d40cc5bcde3e166f657ce50987aaa2421b6a9
languageName: node languageName: node
linkType: hard linkType: hard
"playwright@npm:1.56.1": "playwright@npm:1.57.0":
version: 1.56.1 version: 1.57.0
resolution: "playwright@npm:1.56.1" resolution: "playwright@npm:1.57.0"
dependencies: dependencies:
fsevents: "npm:2.3.2" fsevents: "npm:2.3.2"
playwright-core: "npm:1.56.1" playwright-core: "npm:1.57.0"
dependenciesMeta: dependenciesMeta:
fsevents: fsevents:
optional: true optional: true
bin: bin:
playwright: cli.js playwright: cli.js
checksum: 10c0/8e9965aede86df0f4722063385748498977b219630a40a10d1b82b8bd8d4d4e9b6b65ecbfa024331a30800163161aca292fb6dd7446c531a1ad25f4155625ab4 checksum: 10c0/ab03c99a67b835bdea9059f516ad3b6e42c21025f9adaa161a4ef6bc7ca716dcba476d287140bb240d06126eb23f889a8933b8f5f1f1a56b80659d92d1358899
languageName: node languageName: node
linkType: hard linkType: hard