Merge branch 'voip-team/rebased-multiSFU' into valere/multi-sfu/connection_states

This commit is contained in:
Valere
2025-10-07 10:33:31 +02:00
18 changed files with 633 additions and 489 deletions

View File

@@ -72,6 +72,7 @@
"livekit_server_info": "LiveKit Server Info", "livekit_server_info": "LiveKit Server Info",
"livekit_sfu": "LiveKit SFU: {{url}}", "livekit_sfu": "LiveKit SFU: {{url}}",
"matrix_id": "Matrix ID: {{id}}", "matrix_id": "Matrix ID: {{id}}",
"multi_sfu": "Multi-SFU media transport",
"mute_all_audio": "Mute all audio (participants, reactions, join sounds)", "mute_all_audio": "Mute all audio (participants, reactions, join sounds)",
"show_connection_stats": "Show connection statistics", "show_connection_stats": "Show connection statistics",
"url_params": "URL parameters", "url_params": "URL parameters",
@@ -91,7 +92,7 @@
"generic_description": "Submitting debug logs will help us track down the problem.", "generic_description": "Submitting debug logs will help us track down the problem.",
"insufficient_capacity": "Insufficient capacity", "insufficient_capacity": "Insufficient capacity",
"insufficient_capacity_description": "The server has reached its maximum capacity and you cannot join the call at this time. Try again later, or contact your server admin if the problem persists.", "insufficient_capacity_description": "The server has reached its maximum capacity and you cannot join the call at this time. Try again later, or contact your server admin if the problem persists.",
"matrix_rtc_focus_missing": "The server is not configured to work with {{brand}}. Please contact your server admin (Domain: {{domain}}, Error Code: {{ errorCode }}).", "matrix_rtc_transport_missing": "The server is not configured to work with {{brand}}. Please contact your server admin (Domain: {{domain}}, Error Code: {{ errorCode }}).",
"open_elsewhere": "Opened in another tab", "open_elsewhere": "Opened in another tab",
"open_elsewhere_description": "{{brand}} has been opened in another tab. If that doesn't sound right, try reloading the page.", "open_elsewhere_description": "{{brand}} has been opened in another tab. If that doesn't sound right, try reloading the page.",
"room_creation_restricted": "Failed to create call", "room_creation_restricted": "Failed to create call",

View File

@@ -26,7 +26,7 @@ import {
E2EENotSupportedError, E2EENotSupportedError,
type ElementCallError, type ElementCallError,
InsufficientCapacityError, InsufficientCapacityError,
MatrixRTCFocusMissingError, MatrixRTCTransportMissingError,
UnknownCallError, UnknownCallError,
} from "../utils/errors.ts"; } from "../utils/errors.ts";
import { mockConfig } from "../utils/test.ts"; import { mockConfig } from "../utils/test.ts";
@@ -34,7 +34,7 @@ import { ElementWidgetActions, type WidgetHelpers } from "../widget.ts";
test.each([ test.each([
{ {
error: new MatrixRTCFocusMissingError("example.com"), error: new MatrixRTCTransportMissingError("example.com"),
expectedTitle: "Call is not supported", expectedTitle: "Call is not supported",
}, },
{ {
@@ -85,7 +85,7 @@ test.each([
); );
test("should render the error page with link back to home", async () => { test("should render the error page with link back to home", async () => {
const error = new MatrixRTCFocusMissingError("example.com"); const error = new MatrixRTCTransportMissingError("example.com");
const TestComponent = (): ReactNode => { const TestComponent = (): ReactNode => {
throw error; throw error;
}; };
@@ -213,7 +213,7 @@ describe("Rageshake button", () => {
}); });
test("should have a close button in widget mode", async () => { test("should have a close button in widget mode", async () => {
const error = new MatrixRTCFocusMissingError("example.com"); const error = new MatrixRTCTransportMissingError("example.com");
const TestComponent = (): ReactNode => { const TestComponent = (): ReactNode => {
throw error; throw error;
}; };

View File

@@ -42,7 +42,7 @@ import {
import { GroupCallView } from "./GroupCallView"; import { GroupCallView } from "./GroupCallView";
import { type WidgetHelpers } from "../widget"; import { type WidgetHelpers } from "../widget";
import { LazyEventEmitter } from "../LazyEventEmitter"; import { LazyEventEmitter } from "../LazyEventEmitter";
import { MatrixRTCFocusMissingError } from "../utils/errors"; import { MatrixRTCTransportMissingError } from "../utils/errors";
import { ProcessorProvider } from "../livekit/TrackProcessorContext"; import { ProcessorProvider } from "../livekit/TrackProcessorContext";
import { MediaDevicesContext } from "../MediaDevicesContext"; import { MediaDevicesContext } from "../MediaDevicesContext";
import { HeaderStyle } from "../UrlParams"; import { HeaderStyle } from "../UrlParams";
@@ -258,7 +258,7 @@ test("GroupCallView leaves the session when an error occurs", async () => {
test("GroupCallView shows errors that occur during joining", async () => { test("GroupCallView shows errors that occur during joining", async () => {
const user = userEvent.setup(); const user = userEvent.setup();
enterRTCSession.mockRejectedValue(new MatrixRTCFocusMissingError("")); enterRTCSession.mockRejectedValue(new MatrixRTCTransportMissingError(""));
onTestFinished(() => { onTestFinished(() => {
enterRTCSession.mockReset(); enterRTCSession.mockReset();
}); });

View File

@@ -1,45 +0,0 @@
/*
Copyright 2023, 2024 New Vector Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import {
type MatrixRTCSession,
MatrixRTCSessionEvent,
} from "matrix-js-sdk/lib/matrixrtc";
import { useCallback, useRef } from "react";
import { deepCompare } from "matrix-js-sdk/lib/utils";
import { logger } from "matrix-js-sdk/lib/logger";
import { type LivekitFocus, isLivekitFocus } from "matrix-js-sdk/lib/matrixrtc";
import { useTypedEventEmitterState } from "../useEvents";
/**
* Gets the currently active (livekit) focus for a MatrixRTC session
* This logic is specific to livekit foci where the whole call must use one
* and the same focus.
*/
export function useActiveLivekitFocus(
rtcSession: MatrixRTCSession,
): LivekitFocus | undefined {
const prevActiveFocus = useRef<LivekitFocus | undefined>(undefined);
return useTypedEventEmitterState(
rtcSession,
MatrixRTCSessionEvent.MembershipsChanged,
useCallback(() => {
const f = rtcSession.getActiveFocus();
// Only handle foci with type="livekit" for now.
if (f && isLivekitFocus(f) && !deepCompare(f, prevActiveFocus.current)) {
const oldestMembership = rtcSession.getOldestMembership();
logger.info(
`Got new active focus from membership: ${oldestMembership?.sender}/${oldestMembership?.deviceId}.
Updated focus (focus switch) from ${JSON.stringify(prevActiveFocus.current)} to ${JSON.stringify(f)}`,
);
prevActiveFocus.current = f;
}
return prevActiveFocus.current;
}, [rtcSession]),
);
}

View File

@@ -6,11 +6,10 @@ Please see LICENSE in the repository root for full details.
*/ */
import { import {
isLivekitFocusConfig,
type LivekitFocusConfig,
type LivekitFocus,
type LivekitFocusSelection,
type MatrixRTCSession, type MatrixRTCSession,
isLivekitTransportConfig,
type LivekitTransportConfig,
type LivekitTransport,
} from "matrix-js-sdk/lib/matrixrtc"; } from "matrix-js-sdk/lib/matrixrtc";
import { logger } from "matrix-js-sdk/lib/logger"; import { logger } from "matrix-js-sdk/lib/logger";
import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery";
@@ -18,53 +17,42 @@ import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery";
import { PosthogAnalytics } from "./analytics/PosthogAnalytics"; import { PosthogAnalytics } from "./analytics/PosthogAnalytics";
import { Config } from "./config/Config"; import { Config } from "./config/Config";
import { ElementWidgetActions, widget, type WidgetHelpers } from "./widget"; import { ElementWidgetActions, widget, type WidgetHelpers } from "./widget";
import { MatrixRTCFocusMissingError } from "./utils/errors"; import { MatrixRTCTransportMissingError } from "./utils/errors";
import { getUrlParams } from "./UrlParams"; import { getUrlParams } from "./UrlParams";
import { getSFUConfigWithOpenID } from "./livekit/openIDSFU.ts"; import { getSFUConfigWithOpenID } from "./livekit/openIDSFU.ts";
const FOCI_WK_KEY = "org.matrix.msc4143.rtc_foci"; const FOCI_WK_KEY = "org.matrix.msc4143.rtc_foci";
export function makeActiveFocus(): LivekitFocusSelection {
return {
type: "livekit",
focus_selection: "oldest_membership",
};
}
export function getLivekitAlias(rtcSession: MatrixRTCSession): string { export function getLivekitAlias(rtcSession: MatrixRTCSession): string {
// For now we assume everything is a room-scoped call // For now we assume everything is a room-scoped call
return rtcSession.room.roomId; return rtcSession.room.roomId;
} }
async function makeFocusInternal( async function makeTransportInternal(
rtcSession: MatrixRTCSession, rtcSession: MatrixRTCSession,
): Promise<LivekitFocus> { ): Promise<LivekitTransport> {
logger.log("Searching for a preferred focus"); logger.log("Searching for a preferred transport");
const livekitAlias = getLivekitAlias(rtcSession); const livekitAlias = getLivekitAlias(rtcSession);
const urlFromStorage = localStorage.getItem("robin-matrixrtc-auth"); // TODO-MULTI-SFU: Either remove this dev tool or make it more official
const urlFromStorage =
localStorage.getItem("robin-matrixrtc-auth") ??
localStorage.getItem("timo-focus-url");
if (urlFromStorage !== null) { if (urlFromStorage !== null) {
const focusFromStorage: LivekitFocus = { const transportFromStorage: LivekitTransport = {
type: "livekit", type: "livekit",
livekit_service_url: urlFromStorage, livekit_service_url: urlFromStorage,
livekit_alias: livekitAlias, livekit_alias: livekitAlias,
}; };
logger.log("Using LiveKit focus from local storage: ", focusFromStorage); logger.log(
return focusFromStorage; "Using LiveKit transport from local storage: ",
transportFromStorage,
);
return transportFromStorage;
} }
// Prioritize the .well-known/matrix/client, if available, over the configured SFU // Prioritize the .well-known/matrix/client, if available, over the configured SFU
const domain = rtcSession.room.client.getDomain(); const domain = rtcSession.room.client.getDomain();
if (localStorage.getItem("timo-focus-url")) {
const timoFocusUrl = localStorage.getItem("timo-focus-url")!;
const focusFromUrl: LivekitFocus = {
type: "livekit",
livekit_service_url: timoFocusUrl,
livekit_alias: livekitAlias,
};
logger.log("Using LiveKit focus from localStorage: ", timoFocusUrl);
return focusFromUrl;
}
if (domain) { if (domain) {
// we use AutoDiscovery instead of relying on the MatrixClient having already // we use AutoDiscovery instead of relying on the MatrixClient having already
// been fully configured and started // been fully configured and started
@@ -72,49 +60,50 @@ async function makeFocusInternal(
FOCI_WK_KEY FOCI_WK_KEY
]; ];
if (Array.isArray(wellKnownFoci)) { if (Array.isArray(wellKnownFoci)) {
const focus: LivekitFocusConfig | undefined = wellKnownFoci.find( const transport: LivekitTransportConfig | undefined = wellKnownFoci.find(
(f) => f && isLivekitFocusConfig(f), (f) => f && isLivekitTransportConfig(f),
); );
if (focus !== undefined) { if (transport !== undefined) {
logger.log("Using LiveKit focus from .well-known: ", focus); logger.log("Using LiveKit transport from .well-known: ", transport);
return { ...focus, livekit_alias: livekitAlias }; return { ...transport, livekit_alias: livekitAlias };
} }
} }
} }
const urlFromConf = Config.get().livekit?.livekit_service_url; const urlFromConf = Config.get().livekit?.livekit_service_url;
if (urlFromConf) { if (urlFromConf) {
const focusFromConf: LivekitFocus = { const transportFromConf: LivekitTransport = {
type: "livekit", type: "livekit",
livekit_service_url: urlFromConf, livekit_service_url: urlFromConf,
livekit_alias: livekitAlias, livekit_alias: livekitAlias,
}; };
logger.log("Using LiveKit focus from config: ", focusFromConf); logger.log("Using LiveKit transport from config: ", transportFromConf);
return focusFromConf; return transportFromConf;
} }
throw new MatrixRTCFocusMissingError(domain ?? ""); throw new MatrixRTCTransportMissingError(domain ?? "");
} }
export async function makeFocus( export async function makeTransport(
rtcSession: MatrixRTCSession, rtcSession: MatrixRTCSession,
): Promise<LivekitFocus> { ): Promise<LivekitTransport> {
const focus = await makeFocusInternal(rtcSession); const transport = await makeTransportInternal(rtcSession);
// this will call the jwt/sfu/get endpoint to pre create the livekit room. // this will call the jwt/sfu/get endpoint to pre create the livekit room.
await getSFUConfigWithOpenID( await getSFUConfigWithOpenID(
rtcSession.room.client, rtcSession.room.client,
focus.livekit_service_url, transport.livekit_service_url,
focus.livekit_alias, transport.livekit_alias,
); );
return focus; return transport;
} }
export async function enterRTCSession( export async function enterRTCSession(
rtcSession: MatrixRTCSession, rtcSession: MatrixRTCSession,
focus: LivekitFocus, transport: LivekitTransport,
encryptMedia: boolean, encryptMedia: boolean,
useNewMembershipManager = true, useNewMembershipManager = true,
useExperimentalToDeviceTransport = false, useExperimentalToDeviceTransport = false,
useMultiSfu = true,
): Promise<void> { ): Promise<void> {
PosthogAnalytics.instance.eventCallEnded.cacheStartCall(new Date()); PosthogAnalytics.instance.eventCallEnded.cacheStartCall(new Date());
PosthogAnalytics.instance.eventCallStarted.track(rtcSession.room.roomId); PosthogAnalytics.instance.eventCallStarted.track(rtcSession.room.roomId);
@@ -127,25 +116,31 @@ export async function enterRTCSession(
const useDeviceSessionMemberEvents = const useDeviceSessionMemberEvents =
features?.feature_use_device_session_member_events; features?.feature_use_device_session_member_events;
const { sendNotificationType: notificationType, callIntent } = getUrlParams(); const { sendNotificationType: notificationType, callIntent } = getUrlParams();
rtcSession.joinRoomSession([focus], focus, { // Multi-sfu does not need a preferred foci list. just the focus that is actually used.
notificationType, rtcSession.joinRoomSession(
callIntent, useMultiSfu ? [] : [transport],
useNewMembershipManager, useMultiSfu ? transport : undefined,
manageMediaKeys: encryptMedia, {
...(useDeviceSessionMemberEvents !== undefined && { notificationType,
useLegacyMemberEvents: !useDeviceSessionMemberEvents, callIntent,
}), useNewMembershipManager,
delayedLeaveEventRestartMs: manageMediaKeys: encryptMedia,
matrixRtcSessionConfig?.delayed_leave_event_restart_ms, ...(useDeviceSessionMemberEvents !== undefined && {
delayedLeaveEventDelayMs: useLegacyMemberEvents: !useDeviceSessionMemberEvents,
matrixRtcSessionConfig?.delayed_leave_event_delay_ms, }),
delayedLeaveEventRestartLocalTimeoutMs: delayedLeaveEventRestartMs:
matrixRtcSessionConfig?.delayed_leave_event_restart_local_timeout_ms, matrixRtcSessionConfig?.delayed_leave_event_restart_ms,
networkErrorRetryMs: matrixRtcSessionConfig?.network_error_retry_ms, delayedLeaveEventDelayMs:
makeKeyDelay: matrixRtcSessionConfig?.wait_for_key_rotation_ms, matrixRtcSessionConfig?.delayed_leave_event_delay_ms,
membershipEventExpiryMs: matrixRtcSessionConfig?.membership_event_expiry_ms, delayedLeaveEventRestartLocalTimeoutMs:
useExperimentalToDeviceTransport, matrixRtcSessionConfig?.delayed_leave_event_restart_local_timeout_ms,
}); networkErrorRetryMs: matrixRtcSessionConfig?.network_error_retry_ms,
makeKeyDelay: matrixRtcSessionConfig?.wait_for_key_rotation_ms,
membershipEventExpiryMs:
matrixRtcSessionConfig?.membership_event_expiry_ms,
useExperimentalToDeviceTransport,
},
);
if (widget) { if (widget) {
try { try {
await widget.api.transport.send(ElementWidgetActions.JoinCall, {}); await widget.api.transport.send(ElementWidgetActions.JoinCall, {});

View File

@@ -16,6 +16,7 @@ import {
showConnectionStats as showConnectionStatsSetting, showConnectionStats as showConnectionStatsSetting,
useNewMembershipManager as useNewMembershipManagerSetting, useNewMembershipManager as useNewMembershipManagerSetting,
useExperimentalToDeviceTransport as useExperimentalToDeviceTransportSetting, useExperimentalToDeviceTransport as useExperimentalToDeviceTransportSetting,
multiSfu as multiSfuSetting,
muteAllAudio as muteAllAudioSetting, muteAllAudio as muteAllAudioSetting,
alwaysShowIphoneEarpiece as alwaysShowIphoneEarpieceSetting, alwaysShowIphoneEarpiece as alwaysShowIphoneEarpieceSetting,
} from "./settings"; } from "./settings";
@@ -50,6 +51,7 @@ export const DeveloperSettingsTab: FC<Props> = ({ client, livekitRooms }) => {
useExperimentalToDeviceTransport, useExperimentalToDeviceTransport,
setUseExperimentalToDeviceTransport, setUseExperimentalToDeviceTransport,
] = useSetting(useExperimentalToDeviceTransportSetting); ] = useSetting(useExperimentalToDeviceTransportSetting);
const [multiSfu, setMultiSfu] = useSetting(multiSfuSetting);
const [muteAllAudio, setMuteAllAudio] = useSetting(muteAllAudioSetting); const [muteAllAudio, setMuteAllAudio] = useSetting(muteAllAudioSetting);
@@ -166,6 +168,20 @@ export const DeveloperSettingsTab: FC<Props> = ({ client, livekitRooms }) => {
)} )}
/> />
</FieldRow> </FieldRow>
<FieldRow>
<InputField
id="multiSfu"
type="checkbox"
label={t("developer_mode.multi_sfu")}
checked={multiSfu}
onChange={useCallback(
(event: ChangeEvent<HTMLInputElement>): void => {
setMultiSfu(event.target.checked);
},
[setMultiSfu],
)}
/>
</FieldRow>
<FieldRow> <FieldRow>
<InputField <InputField
id="muteAllAudio" id="muteAllAudio"

View File

@@ -125,6 +125,8 @@ export const useExperimentalToDeviceTransport = new Setting<boolean>(
true, true,
); );
export const multiSfu = new Setting<boolean>("multi-sfu", false);
export const muteAllAudio = new Setting<boolean>("mute-all-audio", false); export const muteAllAudio = new Setting<boolean>("mute-all-audio", false);
export const alwaysShowSelf = new Setting<boolean>("always-show-self", true); export const alwaysShowSelf = new Setting<boolean>("always-show-self", true);

44
src/state/Async.ts Normal file
View File

@@ -0,0 +1,44 @@
/*
Copyright 2025 New Vector Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import {
catchError,
from,
map,
Observable,
of,
startWith,
switchMap,
} from "rxjs";
export type Async<A> =
| { state: "loading" }
| { state: "error"; value: Error }
| { state: "ready"; value: A };
export const loading: Async<never> = { state: "loading" };
export function error(value: Error): Async<never> {
return { state: "error", value };
}
export function ready<A>(value: A): Async<A> {
return { state: "ready", value };
}
export function async<A>(promise: Promise<A>): Observable<Async<A>> {
return from(promise).pipe(
map(ready),
startWith(loading),
catchError((e) => of(error(e))),
);
}
export function mapAsync<A, B>(
async: Async<A>,
project: (value: A) => B,
): Async<B> {
return async.state === "ready" ? ready(project(async.value)) : async;
}

View File

@@ -28,6 +28,7 @@ import {
EventType, EventType,
RoomEvent, RoomEvent,
} from "matrix-js-sdk"; } from "matrix-js-sdk";
import { deepCompare } from "matrix-js-sdk/lib/utils";
import { import {
BehaviorSubject, BehaviorSubject,
EMPTY, EMPTY,
@@ -36,7 +37,6 @@ import {
Subject, Subject,
combineLatest, combineLatest,
concat, concat,
concatMap,
distinctUntilChanged, distinctUntilChanged,
endWith, endWith,
filter, filter,
@@ -48,6 +48,7 @@ import {
of, of,
pairwise, pairwise,
race, race,
repeat,
scan, scan,
skip, skip,
skipWhile, skipWhile,
@@ -57,6 +58,7 @@ import {
switchScan, switchScan,
take, take,
takeUntil, takeUntil,
takeWhile,
tap, tap,
throttleTime, throttleTime,
timer, timer,
@@ -64,7 +66,8 @@ import {
import { logger } from "matrix-js-sdk/lib/logger"; import { logger } from "matrix-js-sdk/lib/logger";
import { import {
type CallMembership, type CallMembership,
isLivekitFocus, isLivekitTransport,
type LivekitTransport,
type MatrixRTCSession, type MatrixRTCSession,
MatrixRTCSessionEvent, MatrixRTCSessionEvent,
type MatrixRTCSessionEventHandlerMap, type MatrixRTCSessionEventHandlerMap,
@@ -90,6 +93,7 @@ import {
import { ObservableScope } from "./ObservableScope"; import { ObservableScope } from "./ObservableScope";
import { import {
duplicateTiles, duplicateTiles,
multiSfu,
playReactionsSound, playReactionsSound,
showReactions, showReactions,
} from "../settings/settings"; } from "../settings/settings";
@@ -118,7 +122,7 @@ import { constant, type Behavior } from "./Behavior";
import { import {
enterRTCSession, enterRTCSession,
getLivekitAlias, getLivekitAlias,
makeFocus, makeTransport,
} from "../rtcSessionHelpers"; } from "../rtcSessionHelpers";
import { E2eeType } from "../e2ee/e2eeType"; import { E2eeType } from "../e2ee/e2eeType";
import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider"; import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider";
@@ -128,6 +132,7 @@ import { getUrlParams } from "../UrlParams";
import { type ProcessorState } from "../livekit/TrackProcessorContext"; import { type ProcessorState } from "../livekit/TrackProcessorContext";
import { ElementWidgetActions, widget } from "../widget"; import { ElementWidgetActions, widget } from "../widget";
import { PublishConnection } from "./PublishConnection.ts"; import { PublishConnection } from "./PublishConnection.ts";
import { type Async, async, mapAsync, ready } from "./Async";
export interface CallViewModelOptions { export interface CallViewModelOptions {
encryptionSystem: EncryptionSystem; encryptionSystem: EncryptionSystem;
@@ -299,7 +304,7 @@ class UserMedia {
public readonly presenter$: Behavior<boolean>; public readonly presenter$: Behavior<boolean>;
public constructor( public constructor(
public readonly id: string, public readonly id: string,
member: RoomMember | undefined, member: RoomMember,
participant: LocalParticipant | RemoteParticipant | undefined, participant: LocalParticipant | RemoteParticipant | undefined,
encryptionSystem: EncryptionSystem, encryptionSystem: EncryptionSystem,
livekitRoom: LivekitRoom, livekitRoom: LivekitRoom,
@@ -372,7 +377,7 @@ class ScreenShare {
public constructor( public constructor(
id: string, id: string,
member: RoomMember | undefined, member: RoomMember,
participant: LocalParticipant | RemoteParticipant, participant: LocalParticipant | RemoteParticipant,
encryptionSystem: EncryptionSystem, encryptionSystem: EncryptionSystem,
livekitRoom: LivekitRoom, livekitRoom: LivekitRoom,
@@ -450,42 +455,36 @@ export class CallViewModel extends ViewModel {
} }
: undefined; : undefined;
private readonly localFocus = makeFocus(this.matrixRTCSession); private readonly join$ = new Subject<void>();
private readonly localConnection = this.localFocus.then( public join(): void {
(focus) => { this.join$.next();
const args: ConnectionOpts = { }
focus,
client: this.matrixRTCSession.room.client, // This is functionally the same Observable as leave$, except here it's
scope: this.scope, // hoisted to the top of the class. This enables the cyclic dependency between
membershipsFocusMap$: this.membershipsAndFocusMap$, // leave$ -> autoLeave$ -> callPickupState$ -> livekitConnectionState$ ->
} // localConnection$ -> transports$ -> joined$ -> leave$.
return new PublishConnection( private readonly leaveHoisted$ = new Subject<
args, "user" | "timeout" | "decline" | "allOthersLeft"
this.mediaDevices, >();
this.muteStates,
this.e2eeLivekitOptions(), /**
this.scope.behavior(this.trackProcessorState$), * Whether we are joined to the call. This reflects our local state rather
) * than whether all connections are truly up and running.
} */
private readonly joined$ = this.scope.behavior(
this.join$.pipe(
map(() => true),
// Using takeUntil with the repeat operator is perfectly valid.
// eslint-disable-next-line rxjs/no-unsafe-takeuntil
takeUntil(this.leaveHoisted$),
endWith(false),
repeat(),
startWith(false),
),
); );
public readonly livekitConnectionState$ =
this.scope.behavior(
from(this.localConnection).pipe(
switchMap((c) =>
c.focusedConnectionState$.pipe(
map((s) => {
if (s.state === "ConnectedToLkRoom") return s.connectionState;
return ConnectionState.Disconnected
}),
distinctUntilChanged(),
),
),
startWith(ConnectionState.Disconnected),
),
)
/** /**
* The MatrixRTC session participants. * The MatrixRTC session participants.
@@ -502,122 +501,224 @@ export class CallViewModel extends ViewModel {
), ),
); );
private readonly membershipsAndFocusMap$ = this.scope.behavior( /**
this.memberships$.pipe( * The transport that we would personally prefer to publish on (if not for the
map((memberships) => * transport preferences of others, perhaps).
memberships.flatMap((m) => { */
const f = this.matrixRTCSession.resolveActiveFocus(m); private readonly preferredTransport = makeTransport(this.matrixRTCSession);
return f && isLivekitFocus(f) ? [{ membership: m, focus: f }] : [];
}),
),
),
);
private readonly livekitServiceUrls$ = this.membershipsAndFocusMap$.pipe( /**
map((v) => new Set(v.map(({ focus }) => focus.livekit_service_url))), * Lists the transports used by ourselves, plus all other MatrixRTC session
); * members.
*/
private readonly remoteConnections$ = this.scope.behavior( private readonly transports$: Behavior<{
combineLatest([this.localFocus, this.livekitServiceUrls$]).pipe( local: Async<LivekitTransport>;
accumulate( remote: { membership: CallMembership; transport: LivekitTransport }[];
new Map<string, Connection>(), } | null> = this.scope.behavior(
(prev, [localFocus, focusUrls]) => { this.joined$.pipe(
const stopped = new Map(prev); switchMap((joined) =>
const next = new Map<string, Connection>(); joined
for (const focusUrl of focusUrls) { ? combineLatest(
if (focusUrl !== localFocus.livekit_service_url) { [
stopped.delete(focusUrl); async(this.preferredTransport),
this.memberships$,
let nextConnection = prev.get(focusUrl); multiSfu.value$,
if (!nextConnection) { ],
logger.log( (preferred, memberships, multiSfu) => {
"SFU remoteConnections$ construct new connection: ", const remote = memberships.flatMap((m) => {
focusUrl, if (m.sender === this.userId && m.deviceId === this.deviceId)
); return [];
const args: ConnectionOpts = { const t = this.matrixRTCSession.resolveActiveFocus(m);
focus: { return t && isLivekitTransport(t)
type: "livekit", ? [{ membership: m, transport: t }]
livekit_service_url: focusUrl, : [];
livekit_alias: this.livekitAlias, });
}, let local = preferred;
client: this.matrixRTCSession.room.client, if (!multiSfu) {
scope: this.scope, const oldest = this.matrixRTCSession.getOldestMembership();
membershipsFocusMap$: this.membershipsAndFocusMap$, if (oldest !== undefined) {
const selection = oldest.getTransport(oldest);
if (isLivekitTransport(selection)) local = ready(selection);
}
} }
nextConnection = new RemoteConnection(args, this.e2eeLivekitOptions()); return { local, remote };
} else { },
logger.log( )
"SFU remoteConnections$ use prev connection: ", : of(null),
focusUrl,
);
}
next.set(focusUrl, nextConnection);
}
}
for (const connection of stopped.values()) connection.stop();
return next;
},
), ),
), ),
); );
private readonly join$ = new Subject<void>(); /**
* Lists the transports used by each MatrixRTC session member other than
* ourselves.
*/
private readonly remoteTransports$ = this.scope.behavior(
this.transports$.pipe(map((transports) => transports?.remote ?? [])),
);
public join(): void { /**
this.join$.next(); * The transport over which we should be actively publishing our media.
} */
private readonly localTransport$: Behavior<Async<LivekitTransport> | null> =
this.scope.behavior(
this.transports$.pipe(
map((transports) => transports?.local ?? null),
distinctUntilChanged(deepCompare),
),
);
private readonly connectionInstructions$ = this.join$.pipe( private readonly localConnectionAndTransport$ = this.scope.behavior(
switchMap(() => this.remoteConnections$), this.localTransport$.pipe(
startWith(new Map<string, Connection>()), map(
(transport) =>
transport &&
mapAsync(transport, (transport) => {
const opts: ConnectionOpts = {
transport,
client: this.matrixRTCSession.room.client,
scope: this.scope,
remoteTransports$: this.remoteTransports$,
};
return {
connection: new PublishConnection(
opts,
this.mediaDevices,
this.muteStates,
this.e2eeLivekitOptions(),
this.scope.behavior(this.trackProcessorState$),
),
transport,
}}),
),
),
);
private readonly localConnection$ = this.scope.behavior(
this.localConnectionAndTransport$.pipe(
map((value) => value && mapAsync(value, ({ connection }) => connection)),
),
);
public readonly livekitConnectionState$ = this.scope.behavior(
this.localConnection$.pipe(
switchMap((c) =>
c?.state === "ready"
// TODO mapping to ConnectionState for compatibility, but we should use the full state?
? c.value.focusedConnectionState$.pipe(
map((s) => {
if (s.state === "ConnectedToLkRoom") return s.connectionState;
return ConnectionState.Disconnected
}),
distinctUntilChanged(),
)
: of(ConnectionState.Disconnected),
),
),
);
/**
* Connections for each transport in use by one or more session members that
* is *distinct* from the local transport.
*/
private readonly remoteConnections$ = this.scope.behavior(
this.transports$.pipe(
accumulate(new Map<string, Connection>(), (prev, transports) => {
const next = new Map<string, Connection>();
// Until the local transport becomes ready we have no idea which
// transports will actually need a dedicated remote connection
if (transports?.local.state === "ready") {
const localServiceUrl = transports.local.value.livekit_service_url;
const remoteServiceUrls = new Set(
transports.remote.flatMap(({ membership, transport }) => {
const t = this.matrixRTCSession.resolveActiveFocus(membership);
return t &&
isLivekitTransport(t) &&
t.livekit_service_url !== localServiceUrl
? [t.livekit_service_url]
: [];
}),
);
for (const remoteServiceUrl of remoteServiceUrls) {
let nextConnection = prev.get(remoteServiceUrl);
if (!nextConnection) {
logger.log(
"SFU remoteConnections$ construct new connection: ",
remoteServiceUrl,
);
const args: ConnectionOpts = {
transport: {
type: "livekit",
livekit_service_url: remoteServiceUrl,
livekit_alias: this.livekitAlias,
},
client: this.matrixRTCSession.room.client,
scope: this.scope,
remoteTransports$: this.remoteTransports$,
}
nextConnection = new RemoteConnection(args, this.e2eeLivekitOptions());
} else {
logger.log(
"SFU remoteConnections$ use prev connection: ",
remoteServiceUrl,
);
}
next.set(remoteServiceUrl, nextConnection);
}
}
return next;
}),
map((transports) => [...transports.values()]),
),
);
/**
* A list of the connections that should be active at any given time.
*/
private readonly connections$ = this.scope.behavior<Connection[]>(
combineLatest(
[this.localConnection$, this.remoteConnections$],
(local, remote) => [
...(local?.state === "ready" ? [local.value] : []),
...remote.values(),
],
),
);
/**
* Emits with connections whenever they should be started or stopped.
*/
private readonly connectionInstructions$ = this.connections$.pipe(
pairwise(), pairwise(),
map(([prev, next]) => { map(([prev, next]) => {
const start = new Set(next.values()); const start = new Set(next.values());
for (const connection of prev.values()) start.delete(connection); for (const connection of prev) start.delete(connection);
const stop = new Set(prev.values()); const stop = new Set(prev.values());
for (const connection of next.values()) stop.delete(connection); for (const connection of next) stop.delete(connection);
return { start, stop }; return { start, stop };
}), }),
this.scope.share,
);
private readonly startConnection$ = this.connectionInstructions$.pipe(
concatMap(({ start }) => start),
);
private readonly stopConnection$ = this.connectionInstructions$.pipe(
concatMap(({ stop }) => stop),
); );
public readonly allLivekitRooms$ = this.scope.behavior( public readonly allLivekitRooms$ = this.scope.behavior(
combineLatest([ this.connections$.pipe(
this.remoteConnections$, map((connections) =>
this.localConnection, [...connections.values()].map((c) => ({
this.localFocus, room: c.livekitRoom,
]).pipe( url: c.localTransport.livekit_service_url,
map(([remoteConnections, localConnection, localFocus]) => isLocal: c instanceof PublishConnection,
Array.from(remoteConnections.entries()) })),
.map(
([index, c]) =>
({
room: c.livekitRoom,
url: index,
}) as { room: LivekitRoom; url: string; isLocal?: boolean },
)
.concat([
{
room: localConnection.livekitRoom,
url: localFocus.livekit_service_url,
isLocal: true,
},
]),
), ),
startWith([]),
), ),
); );
private readonly userId = this.matrixRoom.client.getUserId(); private readonly userId = this.matrixRoom.client.getUserId();
private readonly deviceId = this.matrixRoom.client.getDeviceId();
private readonly matrixConnected$ = this.scope.behavior( private readonly matrixConnected$ = this.scope.behavior(
// To consider ourselves connected to MatrixRTC, we check the following: // To consider ourselves connected to MatrixRTC, we check the following:
@@ -690,41 +791,52 @@ export class CallViewModel extends ViewModel {
// in a split-brained state. // in a split-brained state.
private readonly pretendToBeDisconnected$ = this.reconnecting$; private readonly pretendToBeDisconnected$ = this.reconnecting$;
/**
* Lists, for each LiveKit room, the LiveKit participants whose media should
* be presented.
*/
public readonly participantsByRoom$ = this.scope.behavior< public readonly participantsByRoom$ = this.scope.behavior<
{ {
livekitRoom: LivekitRoom; livekitRoom: LivekitRoom;
url: string; url: string;
participants: { participants: {
participant: LocalParticipant | RemoteParticipant; id: string;
participant: LocalParticipant | RemoteParticipant | undefined;
member: RoomMember; member: RoomMember;
}[]; }[];
}[] }[]
>( >(
combineLatest([this.localConnection, this.localFocus]) // TODO: Move this logic into Connection/PublishConnection if possible
this.localConnectionAndTransport$
.pipe( .pipe(
switchMap(([localConnection, localFocus]) => { switchMap((values) => {
if (values?.state !== "ready") return [];
const localConnection = values.value.connection;
const memberError = (): never => { const memberError = (): never => {
throw new Error("No room member for call membership"); throw new Error("No room member for call membership");
}; };
const localParticipant = { const localParticipant = {
id: "local",
participant: localConnection.livekitRoom.localParticipant, participant: localConnection.livekitRoom.localParticipant,
member: member:
this.matrixRoom.getMember(this.userId ?? "") ?? memberError(), this.matrixRoom.getMember(this.userId ?? "") ?? memberError(),
}; };
return this.remoteConnections$.pipe( return this.remoteConnections$.pipe(
switchMap((connections) => switchMap((remoteConnections) =>
combineLatest( combineLatest(
[ [localConnection, ...remoteConnections].map((c) =>
[localFocus.livekit_service_url, localConnection] as const,
...connections,
].map(([url, c]) =>
c.publishingParticipants$.pipe( c.publishingParticipants$.pipe(
map((ps) => { map((ps) => {
const participants: { const participants: {
participant: LocalParticipant | RemoteParticipant; id: string;
participant:
| LocalParticipant
| RemoteParticipant
| undefined;
member: RoomMember; member: RoomMember;
}[] = ps.map(({ participant, membership }) => ({ }[] = ps.map(({ participant, membership }) => ({
id: `${membership.sender}:${membership.deviceId}`,
participant, participant,
member: member:
getRoomMemberFromRtcMember( getRoomMemberFromRtcMember(
@@ -737,7 +849,7 @@ export class CallViewModel extends ViewModel {
return { return {
livekitRoom: c.livekitRoom, livekitRoom: c.livekitRoom,
url, url: c.localTransport.livekit_service_url,
participants, participants,
}; };
}), }),
@@ -820,34 +932,17 @@ export class CallViewModel extends ViewModel {
* List of MediaItems that we want to display * List of MediaItems that we want to display
*/ */
private readonly mediaItems$ = this.scope.behavior<MediaItem[]>( private readonly mediaItems$ = this.scope.behavior<MediaItem[]>(
combineLatest([ combineLatest([this.participantsByRoom$, duplicateTiles.value$]).pipe(
this.participantsByRoom$, scan((prevItems, [participantsByRoom, duplicateTiles]) => {
duplicateTiles.value$,
this.memberships$,
]).pipe(
scan((prevItems, [participantsByRoom, duplicateTiles, memberships]) => {
const newItems: Map<string, UserMedia | ScreenShare> = new Map( const newItems: Map<string, UserMedia | ScreenShare> = new Map(
function* (this: CallViewModel): Iterable<[string, MediaItem]> { function* (this: CallViewModel): Iterable<[string, MediaItem]> {
for (const { livekitRoom, participants } of participantsByRoom) { for (const { livekitRoom, participants } of participantsByRoom) {
for (const { participant, member } of participants) { for (const { id, participant, member } of participants) {
const matrixId = participant.isLocal
? "local"
: participant.identity;
for (let i = 0; i < 1 + duplicateTiles; i++) { for (let i = 0; i < 1 + duplicateTiles; i++) {
const mediaId = `${matrixId}:${i}`; const mediaId = `${id}:${i}`;
let prevMedia = prevItems.get(mediaId); const prevMedia = prevItems.get(mediaId);
if (prevMedia && prevMedia instanceof UserMedia) { if (prevMedia instanceof UserMedia)
prevMedia.updateParticipant(participant); prevMedia.updateParticipant(participant);
if (prevMedia.vm.member === undefined) {
// We have a previous media created because of the `debugShowNonMember` flag.
// In this case we actually replace the media item.
// This "hack" never occurs if we do not use the `debugShowNonMember` debugging
// option and if we always find a room member for each rtc member (which also
// only fails if we have a fundamental problem)
prevMedia = undefined;
}
}
yield [ yield [
mediaId, mediaId,
@@ -864,14 +959,10 @@ export class CallViewModel extends ViewModel {
this.mediaDevices, this.mediaDevices,
this.pretendToBeDisconnected$, this.pretendToBeDisconnected$,
this.memberDisplaynames$.pipe( this.memberDisplaynames$.pipe(
map((m) => m.get(matrixId) ?? "[👻]"), map((m) => m.get(id) ?? "[👻]"),
),
this.handsRaised$.pipe(
map((v) => v[matrixId]?.time ?? null),
),
this.reactions$.pipe(
map((v) => v[matrixId] ?? undefined),
), ),
this.handsRaised$.pipe(map((v) => v[id]?.time ?? null)),
this.reactions$.pipe(map((v) => v[id] ?? undefined)),
), ),
]; ];
@@ -888,7 +979,7 @@ export class CallViewModel extends ViewModel {
livekitRoom, livekitRoom,
this.pretendToBeDisconnected$, this.pretendToBeDisconnected$,
this.memberDisplaynames$.pipe( this.memberDisplaynames$.pipe(
map((m) => m.get(matrixId) ?? "[👻]"), map((m) => m.get(id) ?? "[👻]"),
), ),
), ),
]; ];
@@ -942,6 +1033,16 @@ export class CallViewModel extends ViewModel {
this.memberships$.pipe(map((ms) => ms.length)), this.memberships$.pipe(map((ms) => ms.length)),
); );
private readonly allOthersLeft$ = this.memberships$.pipe(
pairwise(),
filter(
([prev, current]) =>
current.every((m) => m.sender === this.userId) &&
prev.some((m) => m.sender !== this.userId),
),
map(() => {}),
);
private readonly didSendCallNotification$ = fromEvent( private readonly didSendCallNotification$ = fromEvent(
this.matrixRTCSession, this.matrixRTCSession,
MatrixRTCSessionEvent.DidSendCallNotification, MatrixRTCSessionEvent.DidSendCallNotification,
@@ -1066,56 +1167,12 @@ export class CallViewModel extends ViewModel {
map(() => {}), map(() => {}),
throttleTime(THROTTLE_SOUND_EFFECT_MS), throttleTime(THROTTLE_SOUND_EFFECT_MS),
); );
/**
* This observable tracks the matrix users that are currently in the call.
* There can be just one matrix user with multiple participants (see also participantChanges$)
*/
public readonly matrixUserChanges$ = this.userMedia$.pipe(
map(
(mediaItems) =>
new Set(
mediaItems
.map((m) => m.vm.member?.userId)
.filter((id) => id !== undefined),
),
),
scan<
Set<string>,
{
userIds: Set<string>;
joinedUserIds: Set<string>;
leftUserIds: Set<string>;
}
>(
(prevState, userIds) => {
const left = new Set(
[...prevState.userIds].filter((id) => !userIds.has(id)),
);
const joined = new Set(
[...userIds].filter((id) => !prevState.userIds.has(id)),
);
return { userIds: userIds, joinedUserIds: joined, leftUserIds: left };
},
{ userIds: new Set(), joinedUserIds: new Set(), leftUserIds: new Set() },
),
);
private readonly allOthersLeft$ = this.matrixUserChanges$.pipe(
filter(({ userIds, leftUserIds }) => {
if (!this.userId) {
logger.warn("Could not access user ID to compute allOthersLeft");
return false;
}
return (
userIds.size === 1 && userIds.has(this.userId) && leftUserIds.size > 0
);
}),
map(() => "allOthersLeft" as const),
);
// Public for testing // Public for testing
public readonly autoLeave$ = merge( public readonly autoLeave$ = merge(
this.options.autoLeaveWhenOthersLeft ? this.allOthersLeft$ : NEVER, this.options.autoLeaveWhenOthersLeft
? this.allOthersLeft$.pipe(map(() => "allOthersLeft" as const))
: NEVER,
this.callPickupState$.pipe( this.callPickupState$.pipe(
filter((state) => state === "timeout" || state === "decline"), filter((state) => state === "timeout" || state === "decline"),
), ),
@@ -1143,6 +1200,9 @@ export class CallViewModel extends ViewModel {
merge(this.userHangup$, this.widgetHangup$).pipe( merge(this.userHangup$, this.widgetHangup$).pipe(
map(() => "user" as const), map(() => "user" as const),
), ),
).pipe(
this.scope.share,
tap((reason) => this.leaveHoisted$.next(reason)),
); );
/** /**
@@ -1831,9 +1891,12 @@ export class CallViewModel extends ViewModel {
* Whether we are sharing our screen. * Whether we are sharing our screen.
*/ */
public readonly sharingScreen$ = this.scope.behavior( public readonly sharingScreen$ = this.scope.behavior(
from(this.localConnection).pipe( from(this.localConnection$).pipe(
switchMap((c) => sharingScreen$(c.livekitRoom.localParticipant)), switchMap((c) =>
startWith(false), c?.state === "ready"
? sharingScreen$(c.value.livekitRoom.localParticipant)
: of(false),
),
), ),
); );
@@ -1845,17 +1908,26 @@ export class CallViewModel extends ViewModel {
"getDisplayMedia" in (navigator.mediaDevices ?? {}) && "getDisplayMedia" in (navigator.mediaDevices ?? {}) &&
!this.urlParams.hideScreensharing !this.urlParams.hideScreensharing
? (): void => ? (): void =>
void this.localConnection.then( // Once a connection is ready...
(c) => void this.localConnection$
void c.livekitRoom.localParticipant .pipe(
.setScreenShareEnabled(!this.sharingScreen$.value, { takeWhile((c) => c !== null && c.state !== "error"),
audio: true, switchMap((c) => (c.state === "ready" ? of(c.value) : NEVER)),
selfBrowserSurface: "include", take(1),
surfaceSwitching: "include", this.scope.bind(),
systemAudio: "include", )
}) // ...toggle screen sharing.
.catch(logger.error), .subscribe(
) (c) =>
void c.livekitRoom.localParticipant
.setScreenShareEnabled(!this.sharingScreen$.value, {
audio: true,
selfBrowserSurface: "include",
surfaceSwitching: "include",
systemAudio: "include",
})
.catch(logger.error),
)
: null; : null;
public constructor( public constructor(
@@ -1875,61 +1947,72 @@ export class CallViewModel extends ViewModel {
) { ) {
super(); super();
void from(this.localConnection) // Start and stop local and remote connections as needed
this.connectionInstructions$
.pipe(this.scope.bind()) .pipe(this.scope.bind())
.subscribe( .subscribe(({ start, stop }) => {
(c) => for (const c of stop) {
void c logger.info(`Disconnecting from ${c.localTransport.livekit_service_url}`);
.start() c.stop();
// eslint-disable-next-line no-console }
.then(() => console.log("successfully started publishing")) for (const c of start) {
// eslint-disable-next-line no-console c.start().then(
.catch((e) => console.error("failed to start publishing", e)), () =>
); logger.info(`Connected to ${c.localTransport.livekit_service_url}`),
(e) =>
this.startConnection$ logger.error(
.pipe(this.scope.bind()) `Failed to start connection to ${c.localTransport.livekit_service_url}`,
.subscribe((c) => void c.start()); e,
this.stopConnection$.pipe(this.scope.bind()).subscribe((c) => void c.stop());
combineLatest([this.localFocus, this.join$])
.pipe(this.scope.bind())
.subscribe(([localFocus]) => {
void enterRTCSession(
this.matrixRTCSession,
localFocus,
this.options.encryptionSystem.kind !== E2eeType.NONE,
true,
true,
)
.catch((e) => logger.error("Error entering RTC session", e))
.then(() =>
// Update our member event when our mute state changes.
this.muteStates.video.enabled$
.pipe(this.scope.bind(), takeUntil(this.leave$))
// eslint-disable-next-line rxjs/no-nested-subscribe
.subscribe(
(videoEnabled) =>
// TODO: Ensure that these calls are serialized in case of
// fast video toggling
void this.matrixRTCSession.updateCallIntent(
videoEnabled ? "video" : "audio",
),
), ),
); );
}
}); });
this.leave$.pipe(this.scope.bind()).subscribe(() => { // Start and stop session membership as needed
// Only sends Matrix leave event. The LiveKit session will disconnect once, uh... this.scope.reconcile(this.localTransport$, async (localTransport) => {
// (TODO-MULTI-SFU does anything actually cause it to disconnect?) if (localTransport?.state === "ready") {
void this.matrixRTCSession try {
.leaveRoomSession() await enterRTCSession(
.catch((e) => logger.error("Error leaving RTC session", e)) this.matrixRTCSession,
.then(async () => localTransport.value,
widget?.api.transport this.options.encryptionSystem.kind !== E2eeType.NONE,
.send(ElementWidgetActions.HangupCall, {}) true,
.catch((e) => logger.error("Failed to send hangup action", e)), true,
multiSfu.value$.value,
);
} catch (e) {
logger.error("Error entering RTC session", e);
}
// Update our member event when our mute state changes.
const muteSubscription = this.muteStates.video.enabled$.subscribe(
(videoEnabled) =>
// TODO: Ensure that these calls are serialized in case of
// fast video toggling
void this.matrixRTCSession.updateCallIntent(
videoEnabled ? "video" : "audio",
),
); );
return async (): Promise<void> => {
muteSubscription.unsubscribe();
// Only sends Matrix leave event. The LiveKit session will disconnect
// as soon as either the stopConnection$ handler above gets to it or
// the view model is destroyed.
try {
await this.matrixRTCSession.leaveRoomSession();
} catch (e) {
logger.error("Error leaving RTC session", e);
}
try {
await widget?.api.transport.send(
ElementWidgetActions.HangupCall,
{},
);
} catch (e) {
logger.error("Failed to send hangup action", e);
}
};
}
}); });
// Pause upstream of all local media tracks when we're disconnected from // Pause upstream of all local media tracks when we're disconnected from
@@ -1938,10 +2021,12 @@ export class CallViewModel extends ViewModel {
// We use matrixConnected$ rather than reconnecting$ because we want to // We use matrixConnected$ rather than reconnecting$ because we want to
// pause tracks during the initial joining sequence too until we're sure // pause tracks during the initial joining sequence too until we're sure
// that our own media is displayed on screen. // that our own media is displayed on screen.
void this.localConnection.then((localConnection) => combineLatest([this.localConnection$, this.matrixConnected$])
this.matrixConnected$.pipe(this.scope.bind()).subscribe((connected) => { .pipe(this.scope.bind())
.subscribe(([connection, connected]) => {
if (connection?.state !== "ready") return;
const publications = const publications =
localConnection.livekitRoom.localParticipant.trackPublications.values(); connection.value.livekitRoom.localParticipant.trackPublications.values();
if (connected) { if (connected) {
for (const p of publications) { for (const p of publications) {
if (p.track?.isUpstreamPaused === true) { if (p.track?.isUpstreamPaused === true) {
@@ -1977,8 +2062,7 @@ export class CallViewModel extends ViewModel {
} }
} }
} }
}), });
);
// Join automatically // Join automatically
this.join(); // TODO-MULTI-SFU: Use this view model for the lobby as well, and only call this once 'join' is clicked? this.join(); // TODO-MULTI-SFU: Use this view model for the lobby as well, and only call this once 'join' is clicked?

View File

@@ -5,8 +5,8 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details. Please see LICENSE in the repository root for full details.
*/ */
import { afterEach, describe, expect, it, type Mock, Mocked, type MockedObject, vi } from "vitest"; import { afterEach, describe, expect, it, type Mock, type MockedObject, vi } from "vitest";
import { type CallMembership, type LivekitFocus } from "matrix-js-sdk/lib/matrixrtc"; import type { CallMembership, LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { BehaviorSubject, of } from "rxjs"; import { BehaviorSubject, of } from "rxjs";
import { import {
ConnectionState, ConnectionState,
@@ -28,7 +28,6 @@ import { PublishConnection } from "./PublishConnection.ts";
import { mockMediaDevices, mockMuteStates } from "../utils/test.ts"; import { mockMediaDevices, mockMuteStates } from "../utils/test.ts";
import type { ProcessorState } from "../livekit/TrackProcessorContext.tsx"; import type { ProcessorState } from "../livekit/TrackProcessorContext.tsx";
import { type MuteStates } from "./MuteStates.ts"; import { type MuteStates } from "./MuteStates.ts";
import { DeviceLabel, MediaDevice, SelectedDevice } from "./MediaDevices.ts";
let testScope: ObservableScope; let testScope: ObservableScope;
@@ -41,9 +40,9 @@ let localParticipantEventEmiter: EventEmitter;
let fakeLocalParticipant: MockedObject<LocalParticipant>; let fakeLocalParticipant: MockedObject<LocalParticipant>;
let fakeRoomEventEmiter: EventEmitter; let fakeRoomEventEmiter: EventEmitter;
let fakeMembershipsFocusMap$: BehaviorSubject<{ membership: CallMembership; focus: LivekitFocus }[]>; let fakeMembershipsFocusMap$: BehaviorSubject<{ membership: CallMembership; transport: LivekitTransport }[]>;
const livekitFocus: LivekitFocus = { const livekitFocus: LivekitTransport = {
livekit_alias: "!roomID:example.org", livekit_alias: "!roomID:example.org",
livekit_service_url: "https://matrix-rtc.example.org/livekit/jwt", livekit_service_url: "https://matrix-rtc.example.org/livekit/jwt",
type: "livekit" type: "livekit"
@@ -62,7 +61,7 @@ function setupTest(): void {
), ),
getDeviceId: vi.fn().mockReturnValue("ABCDEF") getDeviceId: vi.fn().mockReturnValue("ABCDEF")
} as unknown as OpenIDClientParts); } as unknown as OpenIDClientParts);
fakeMembershipsFocusMap$ = new BehaviorSubject<{ membership: CallMembership; focus: LivekitFocus }[]>([]); fakeMembershipsFocusMap$ = new BehaviorSubject<{ membership: CallMembership; transport: LivekitTransport }[]>([]);
localParticipantEventEmiter = new EventEmitter(); localParticipantEventEmiter = new EventEmitter();
@@ -98,8 +97,8 @@ function setupRemoteConnection(): RemoteConnection {
const opts: ConnectionOpts = { const opts: ConnectionOpts = {
client: client, client: client,
focus: livekitFocus, transport: livekitFocus,
membershipsFocusMap$: fakeMembershipsFocusMap$, remoteTransports$: fakeMembershipsFocusMap$,
scope: testScope, scope: testScope,
livekitRoomFactory: () => fakeLivekitRoom livekitRoomFactory: () => fakeLivekitRoom
}; };
@@ -142,8 +141,8 @@ describe("Start connection states", () => {
const opts: ConnectionOpts = { const opts: ConnectionOpts = {
client: client, client: client,
focus: livekitFocus, transport: livekitFocus,
membershipsFocusMap$: fakeMembershipsFocusMap$, remoteTransports$: fakeMembershipsFocusMap$,
scope: testScope, scope: testScope,
livekitRoomFactory: () => fakeLivekitRoom livekitRoomFactory: () => fakeLivekitRoom
}; };
@@ -162,8 +161,8 @@ describe("Start connection states", () => {
const opts: ConnectionOpts = { const opts: ConnectionOpts = {
client: client, client: client,
focus: livekitFocus, transport: livekitFocus,
membershipsFocusMap$: fakeMembershipsFocusMap$, remoteTransports$: fakeMembershipsFocusMap$,
scope: testScope, scope: testScope,
livekitRoomFactory: () => fakeLivekitRoom livekitRoomFactory: () => fakeLivekitRoom
}; };
@@ -215,8 +214,8 @@ describe("Start connection states", () => {
const opts: ConnectionOpts = { const opts: ConnectionOpts = {
client: client, client: client,
focus: livekitFocus, transport: livekitFocus,
membershipsFocusMap$: fakeMembershipsFocusMap$, remoteTransports$: fakeMembershipsFocusMap$,
scope: testScope, scope: testScope,
livekitRoomFactory: () => fakeLivekitRoom livekitRoomFactory: () => fakeLivekitRoom
}; };
@@ -274,8 +273,8 @@ describe("Start connection states", () => {
const opts: ConnectionOpts = { const opts: ConnectionOpts = {
client: client, client: client,
focus: livekitFocus, transport: livekitFocus,
membershipsFocusMap$: fakeMembershipsFocusMap$, remoteTransports$: fakeMembershipsFocusMap$,
scope: testScope, scope: testScope,
livekitRoomFactory: () => fakeLivekitRoom livekitRoomFactory: () => fakeLivekitRoom
}; };
@@ -502,7 +501,7 @@ describe("Publishing participants observations", () => {
expect(observedPublishers.pop()!.length).toEqual(0); expect(observedPublishers.pop()!.length).toEqual(0);
const otherFocus: LivekitFocus = { const otherFocus: LivekitTransport = {
livekit_alias: "!roomID:example.org", livekit_alias: "!roomID:example.org",
livekit_service_url: "https://other-matrix-rtc.example.org/livekit/jwt", livekit_service_url: "https://other-matrix-rtc.example.org/livekit/jwt",
type: "livekit" type: "livekit"
@@ -511,10 +510,10 @@ describe("Publishing participants observations", () => {
const rtcMemberships = [ const rtcMemberships = [
// Say bob is on the same focus // Say bob is on the same focus
{ membership: fakeRtcMemberShip("@bob:example.org", "DEV111"), focus: livekitFocus }, { membership: fakeRtcMemberShip("@bob:example.org", "DEV111"), transport: livekitFocus },
// Alice and carol is on a different focus // Alice and carol is on a different focus
{ membership: fakeRtcMemberShip("@alice:example.org", "DEV000"), focus: otherFocus }, { membership: fakeRtcMemberShip("@alice:example.org", "DEV000"), transport: otherFocus },
{ membership: fakeRtcMemberShip("@carol:example.org", "DEV222"), focus: otherFocus } { membership: fakeRtcMemberShip("@carol:example.org", "DEV222"), transport: otherFocus }
// NO DAVE YET // NO DAVE YET
]; ];
// signal this change in rtc memberships // signal this change in rtc memberships
@@ -528,7 +527,7 @@ describe("Publishing participants observations", () => {
// Now let's make dan join the rtc memberships // Now let's make dan join the rtc memberships
rtcMemberships rtcMemberships
.push({ membership: fakeRtcMemberShip("@dan:example.org", "DEV333"), focus: livekitFocus }); .push({ membership: fakeRtcMemberShip("@dan:example.org", "DEV333"), transport: livekitFocus });
fakeMembershipsFocusMap$.next(rtcMemberships); fakeMembershipsFocusMap$.next(rtcMemberships);
// We should have bob and dan has publishers now // We should have bob and dan has publishers now
@@ -581,7 +580,7 @@ describe("Publishing participants observations", () => {
const rtcMemberships = [ const rtcMemberships = [
// Say bob is on the same focus // Say bob is on the same focus
{ membership: fakeRtcMemberShip("@bob:example.org", "DEV111"), focus: livekitFocus } { membership: fakeRtcMemberShip("@bob:example.org", "DEV111"), transport: livekitFocus }
]; ];
// signal this change in rtc memberships // signal this change in rtc memberships
fakeMembershipsFocusMap$.next(rtcMemberships); fakeMembershipsFocusMap$.next(rtcMemberships);
@@ -610,7 +609,7 @@ describe("Publishing participants observations", () => {
describe("PublishConnection", () => { describe("PublishConnection", () => {
let fakeBlurProcessor: ProcessorWrapper<BackgroundOptions>; // let fakeBlurProcessor: ProcessorWrapper<BackgroundOptions>;
let roomFactoryMock: Mock<() => LivekitRoom>; let roomFactoryMock: Mock<() => LivekitRoom>;
let muteStates: MockedObject<MuteStates>; let muteStates: MockedObject<MuteStates>;
@@ -622,14 +621,13 @@ describe("PublishConnection", () => {
muteStates = mockMuteStates(); muteStates = mockMuteStates();
fakeBlurProcessor = vi.mocked<ProcessorWrapper<BackgroundOptions>>({ // fakeBlurProcessor = vi.mocked<ProcessorWrapper<BackgroundOptions>>({
name: "BackgroundBlur", // name: "BackgroundBlur",
start: vi.fn().mockResolvedValue(undefined), // restart: vi.fn().mockResolvedValue(undefined),
stop: vi.fn().mockResolvedValue(undefined), // setOptions: vi.fn().mockResolvedValue(undefined),
setOptions: vi.fn().mockResolvedValue(undefined), // getOptions: vi.fn().mockReturnValue({ strength: 0.5 }),
getOptions: vi.fn().mockReturnValue({ strength: 0.5 }), // isRunning: vi.fn().mockReturnValue(false)
isRunning: vi.fn().mockReturnValue(false) // });
});
} }
@@ -638,7 +636,7 @@ describe("PublishConnection", () => {
describe("Livekit room creation", () => { describe("Livekit room creation", () => {
function createSetup() { function createSetup(): void {
setUpPublishConnection(); setUpPublishConnection();
const fakeTrackProcessorSubject$ = new BehaviorSubject<ProcessorState>({ const fakeTrackProcessorSubject$ = new BehaviorSubject<ProcessorState>({
@@ -648,8 +646,8 @@ describe("PublishConnection", () => {
const opts: ConnectionOpts = { const opts: ConnectionOpts = {
client: client, client: client,
focus: livekitFocus, transport: livekitFocus,
membershipsFocusMap$: fakeMembershipsFocusMap$, remoteTransports$: fakeMembershipsFocusMap$,
scope: testScope, scope: testScope,
livekitRoomFactory: roomFactoryMock livekitRoomFactory: roomFactoryMock
}; };

View File

@@ -7,7 +7,7 @@ Please see LICENSE in the repository root for full details.
import { connectedParticipantsObserver, connectionStateObserver } from "@livekit/components-core"; import { connectedParticipantsObserver, connectionStateObserver } from "@livekit/components-core";
import { type ConnectionState, type E2EEOptions, Room as LivekitRoom, type RoomOptions } from "livekit-client"; import { type ConnectionState, type E2EEOptions, Room as LivekitRoom, type RoomOptions } from "livekit-client";
import { type CallMembership, type LivekitFocus } from "matrix-js-sdk/lib/matrixrtc"; import { type CallMembership, type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { BehaviorSubject, combineLatest } from "rxjs"; import { BehaviorSubject, combineLatest } from "rxjs";
import { getSFUConfigWithOpenID, type OpenIDClientParts, type SFUConfig } from "../livekit/openIDSFU"; import { getSFUConfigWithOpenID, type OpenIDClientParts, type SFUConfig } from "../livekit/openIDSFU";
@@ -17,13 +17,13 @@ import { defaultLiveKitOptions } from "../livekit/options";
export interface ConnectionOpts { export interface ConnectionOpts {
/** The focus server to connect to. */ /** The focus server to connect to. */
focus: LivekitFocus; transport: LivekitTransport;
/** The Matrix client to use for OpenID and SFU config requests. */ /** The Matrix client to use for OpenID and SFU config requests. */
client: OpenIDClientParts; client: OpenIDClientParts;
/** The observable scope to use for this connection. */ /** The observable scope to use for this connection. */
scope: ObservableScope; scope: ObservableScope;
/** An observable of the current RTC call memberships and their associated focus. */ /** An observable of the current RTC call memberships and their associated focus. */
membershipsFocusMap$: Behavior<{ membership: CallMembership; focus: LivekitFocus }[]>; remoteTransports$: Behavior<{ membership: CallMembership; transport: LivekitTransport }[]>;
/** Optional factory to create the Livekit room, mainly for testing purposes. */ /** Optional factory to create the Livekit room, mainly for testing purposes. */
livekitRoomFactory?: (options?: RoomOptions) => LivekitRoom; livekitRoomFactory?: (options?: RoomOptions) => LivekitRoom;
@@ -31,12 +31,12 @@ export interface ConnectionOpts {
export type FocusConnectionState = export type FocusConnectionState =
| { state: 'Initialized' } | { state: 'Initialized' }
| { state: 'FetchingConfig', focus: LivekitFocus } | { state: 'FetchingConfig', focus: LivekitTransport }
| { state: 'ConnectingToLkRoom', focus: LivekitFocus } | { state: 'ConnectingToLkRoom', focus: LivekitTransport }
| { state: 'PublishingTracks', focus: LivekitFocus } | { state: 'PublishingTracks', focus: LivekitTransport }
| { state: 'FailedToStart', error: Error, focus: LivekitFocus } | { state: 'FailedToStart', error: Error, focus: LivekitTransport }
| { state: 'ConnectedToLkRoom', connectionState: ConnectionState, focus: LivekitFocus } | { state: 'ConnectedToLkRoom', connectionState: ConnectionState, focus: LivekitTransport }
| { state: 'Stopped', focus: LivekitFocus }; | { state: 'Stopped', focus: LivekitTransport };
/** /**
* A connection to a Matrix RTC LiveKit backend. * A connection to a Matrix RTC LiveKit backend.
@@ -71,20 +71,20 @@ export class Connection {
public async start(): Promise<void> { public async start(): Promise<void> {
this.stopped = false; this.stopped = false;
try { try {
this._focusedConnectionState$.next({ state: 'FetchingConfig', focus: this.targetFocus }); this._focusedConnectionState$.next({ state: 'FetchingConfig', focus: this.localTransport });
// TODO could this be loaded earlier to save time? // TODO could this be loaded earlier to save time?
const { url, jwt } = await this.getSFUConfigWithOpenID(); const { url, jwt } = await this.getSFUConfigWithOpenID();
// If we were stopped while fetching the config, don't proceed to connect // If we were stopped while fetching the config, don't proceed to connect
if (this.stopped) return; if (this.stopped) return;
this._focusedConnectionState$.next({ state: 'ConnectingToLkRoom', focus: this.targetFocus }); this._focusedConnectionState$.next({ state: 'ConnectingToLkRoom', focus: this.localTransport });
await this.livekitRoom.connect(url, jwt); await this.livekitRoom.connect(url, jwt);
// If we were stopped while connecting, don't proceed to update state. // If we were stopped while connecting, don't proceed to update state.
if (this.stopped) return; if (this.stopped) return;
this._focusedConnectionState$.next({ state: 'ConnectedToLkRoom', focus: this.targetFocus, connectionState: this.livekitRoom.state }); this._focusedConnectionState$.next({ state: 'ConnectedToLkRoom', focus: this.localTransport, connectionState: this.livekitRoom.state });
} catch (error) { } catch (error) {
this._focusedConnectionState$.next({ state: 'FailedToStart', error: error instanceof Error ? error : new Error(`${error}`), focus: this.targetFocus }); this._focusedConnectionState$.next({ state: 'FailedToStart', error: error instanceof Error ? error : new Error(`${error}`), focus: this.localTransport });
throw error; throw error;
} }
} }
@@ -93,8 +93,8 @@ export class Connection {
protected async getSFUConfigWithOpenID(): Promise<SFUConfig> { protected async getSFUConfigWithOpenID(): Promise<SFUConfig> {
return await getSFUConfigWithOpenID( return await getSFUConfigWithOpenID(
this.client, this.client,
this.targetFocus.livekit_service_url, this.localTransport.livekit_service_url,
this.targetFocus.livekit_alias this.localTransport.livekit_alias
) )
} }
/** /**
@@ -106,7 +106,7 @@ export class Connection {
public async stop(): Promise<void> { public async stop(): Promise<void> {
if (this.stopped) return; if (this.stopped) return;
await this.livekitRoom.disconnect(); await this.livekitRoom.disconnect();
this._focusedConnectionState$.next({ state: 'Stopped', focus: this.targetFocus }); this._focusedConnectionState$.next({ state: 'Stopped', focus: this.localTransport });
this.stopped = true; this.stopped = true;
} }
@@ -121,7 +121,7 @@ export class Connection {
/** /**
* The focus server to connect to. * The focus server to connect to.
*/ */
protected readonly targetFocus: LivekitFocus; public readonly localTransport: LivekitTransport;
private readonly client: OpenIDClientParts; private readonly client: OpenIDClientParts;
/** /**
@@ -135,11 +135,11 @@ export class Connection {
public readonly livekitRoom: LivekitRoom, public readonly livekitRoom: LivekitRoom,
opts: ConnectionOpts, opts: ConnectionOpts,
) { ) {
const { focus, client, scope, membershipsFocusMap$ } = const { transport, client, scope, remoteTransports$ } =
opts; opts;
this.livekitRoom = livekitRoom this.livekitRoom = livekitRoom
this.targetFocus = focus; this.localTransport = transport;
this.client = client; this.client = client;
this.focusedConnectionState$ = scope.behavior( this.focusedConnectionState$ = scope.behavior(
@@ -153,23 +153,23 @@ export class Connection {
this.publishingParticipants$ = scope.behavior( this.publishingParticipants$ = scope.behavior(
combineLatest( combineLatest(
[participantsIncludingSubscribers$, membershipsFocusMap$], [participantsIncludingSubscribers$, remoteTransports$],
(participants, membershipsFocusMap) => (participants, remoteTransports) =>
membershipsFocusMap remoteTransports
// Find all members that claim to publish on this connection // Find all members that claim to publish on this connection
.flatMap(({ membership, focus }) => .flatMap(({ membership, transport }) =>
focus.livekit_service_url === this.targetFocus.livekit_service_url transport.livekit_service_url ===
this.localTransport.livekit_service_url
? [membership] ? [membership]
: [] : []
) )
// Find all associated publishing livekit participant objects // Pair with their associated LiveKit participant (if any)
// Uses flatMap to filter out memberships with no associated rtc participant ([])
.flatMap((membership) => { .flatMap((membership) => {
const participant = participants.find( const id = `${membership.sender}:${membership.deviceId}`;
(p) => const participant = participants.find((p) => p.identity === id);
p.identity === `${membership.sender}:${membership.deviceId}`
);
return participant ? [{ participant, membership }] : []; return participant ? [{ participant, membership }] : [];
}) }),
), ),
[] []
); );

View File

@@ -255,7 +255,7 @@ abstract class BaseMediaViewModel extends ViewModel {
*/ */
// TODO: Fully separate the data layer from the UI layer by keeping the // TODO: Fully separate the data layer from the UI layer by keeping the
// member object internal // member object internal
public readonly member: RoomMember | undefined, public readonly member: RoomMember,
// We don't necessarily have a participant if a user connects via MatrixRTC but not (yet) through // We don't necessarily have a participant if a user connects via MatrixRTC but not (yet) through
// livekit. // livekit.
protected readonly participant$: Observable< protected readonly participant$: Observable<
@@ -403,7 +403,7 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel {
public constructor( public constructor(
id: string, id: string,
member: RoomMember | undefined, member: RoomMember,
participant$: Observable<LocalParticipant | RemoteParticipant | undefined>, participant$: Observable<LocalParticipant | RemoteParticipant | undefined>,
encryptionSystem: EncryptionSystem, encryptionSystem: EncryptionSystem,
livekitRoom: LivekitRoom, livekitRoom: LivekitRoom,
@@ -535,7 +535,7 @@ export class LocalUserMediaViewModel extends BaseUserMediaViewModel {
public constructor( public constructor(
id: string, id: string,
member: RoomMember | undefined, member: RoomMember,
participant$: Behavior<LocalParticipant | undefined>, participant$: Behavior<LocalParticipant | undefined>,
encryptionSystem: EncryptionSystem, encryptionSystem: EncryptionSystem,
livekitRoom: LivekitRoom, livekitRoom: LivekitRoom,
@@ -641,7 +641,7 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel {
public constructor( public constructor(
id: string, id: string,
member: RoomMember | undefined, member: RoomMember,
participant$: Observable<RemoteParticipant | undefined>, participant$: Observable<RemoteParticipant | undefined>,
encryptionSystem: EncryptionSystem, encryptionSystem: EncryptionSystem,
livekitRoom: LivekitRoom, livekitRoom: LivekitRoom,
@@ -736,7 +736,7 @@ export class ScreenShareViewModel extends BaseMediaViewModel {
public constructor( public constructor(
id: string, id: string,
member: RoomMember | undefined, member: RoomMember,
participant$: Observable<LocalParticipant | RemoteParticipant>, participant$: Observable<LocalParticipant | RemoteParticipant>,
encryptionSystem: EncryptionSystem, encryptionSystem: EncryptionSystem,
livekitRoom: LivekitRoom, livekitRoom: LivekitRoom,

View File

@@ -137,7 +137,7 @@ export class MuteStates {
this.scope, this.scope,
this.mediaDevices.audioInput, this.mediaDevices.audioInput,
this.joined$, this.joined$,
Config.get().media_devices.enable_video, Config.get().media_devices.enable_audio,
); );
public readonly video = new MuteState( public readonly video = new MuteState(
this.scope, this.scope,

View File

@@ -7,10 +7,14 @@ Please see LICENSE in the repository root for full details.
import { import {
BehaviorSubject, BehaviorSubject,
catchError,
distinctUntilChanged, distinctUntilChanged,
EMPTY,
endWith,
filter,
type Observable, type Observable,
share, share,
Subject, take,
takeUntil, takeUntil,
} from "rxjs"; } from "rxjs";
@@ -24,9 +28,11 @@ const nothing = Symbol("nothing");
* A scope which limits the execution lifetime of its bound Observables. * A scope which limits the execution lifetime of its bound Observables.
*/ */
export class ObservableScope { export class ObservableScope {
private readonly ended$ = new Subject<void>(); private readonly ended$ = new BehaviorSubject(false);
private readonly bindImpl: MonoTypeOperator = takeUntil(this.ended$); private readonly bindImpl: MonoTypeOperator = takeUntil(
this.ended$.pipe(filter((ended) => ended)),
);
/** /**
* Binds an Observable to this scope, so that it completes when the scope * Binds an Observable to this scope, so that it completes when the scope
@@ -78,15 +84,54 @@ export class ObservableScope {
* Ends the scope, causing any bound Observables to complete. * Ends the scope, causing any bound Observables to complete.
*/ */
public end(): void { public end(): void {
this.ended$.next(); this.ended$.next(true);
this.ended$.complete();
} }
/** /**
* Register a callback to be executed when the scope is ended. * Register a callback to be executed when the scope is ended.
*/ */
public onEnd(callback: () => void): void { public onEnd(callback: () => void): void {
this.ended$.subscribe(callback); this.ended$
.pipe(
filter((ended) => ended),
take(1),
)
.subscribe(callback);
}
// TODO-MULTI-SFU Dear Future Robin, please document this. Love, Past Robin.
public reconcile<T>(
value$: Behavior<T>,
callback: (value: T) => Promise<(() => Promise<void>) | undefined>,
): void {
let latestValue: T | typeof nothing = nothing;
let reconciledValue: T | typeof nothing = nothing;
let cleanUp: (() => Promise<void>) | undefined = undefined;
let callbackPromise: Promise<(() => Promise<void>) | undefined>;
value$
.pipe(
catchError(() => EMPTY),
this.bind(),
endWith(nothing),
)
.subscribe((value) => {
void (async (): Promise<void> => {
if (latestValue === nothing) {
latestValue = value;
while (latestValue !== reconciledValue) {
await cleanUp?.();
reconciledValue = latestValue;
if (latestValue !== nothing) {
callbackPromise = callback(latestValue);
cleanUp = await callbackPromise;
}
}
latestValue = nothing;
} else {
latestValue = value;
}
})();
});
} }
} }

View File

@@ -32,7 +32,7 @@ interface Props extends ComponentProps<typeof animated.div> {
video: TrackReferenceOrPlaceholder | undefined; video: TrackReferenceOrPlaceholder | undefined;
videoFit: "cover" | "contain"; videoFit: "cover" | "contain";
mirror: boolean; mirror: boolean;
member: RoomMember | undefined; member: RoomMember;
videoEnabled: boolean; videoEnabled: boolean;
unencryptedWarning: boolean; unencryptedWarning: boolean;
encryptionStatus: EncryptionStatus; encryptionStatus: EncryptionStatus;

View File

@@ -55,7 +55,7 @@ interface SpotlightItemBaseProps {
targetHeight: number; targetHeight: number;
video: TrackReferenceOrPlaceholder | undefined; video: TrackReferenceOrPlaceholder | undefined;
videoEnabled: boolean; videoEnabled: boolean;
member: RoomMember | undefined; member: RoomMember;
unencryptedWarning: boolean; unencryptedWarning: boolean;
encryptionStatus: EncryptionStatus; encryptionStatus: EncryptionStatus;
displayName: string; displayName: string;

View File

@@ -11,7 +11,7 @@ export enum ErrorCode {
/** /**
* Configuration problem due to no MatrixRTC backend/SFU is exposed via .well-known and no fallback configured. * Configuration problem due to no MatrixRTC backend/SFU is exposed via .well-known and no fallback configured.
*/ */
MISSING_MATRIX_RTC_FOCUS = "MISSING_MATRIX_RTC_FOCUS", MISSING_MATRIX_RTC_TRANSPORT = "MISSING_MATRIX_RTC_TRANSPORT",
CONNECTION_LOST_ERROR = "CONNECTION_LOST_ERROR", CONNECTION_LOST_ERROR = "CONNECTION_LOST_ERROR",
/** LiveKit indicates that the server has hit its track limits */ /** LiveKit indicates that the server has hit its track limits */
INSUFFICIENT_CAPACITY_ERROR = "INSUFFICIENT_CAPACITY_ERROR", INSUFFICIENT_CAPACITY_ERROR = "INSUFFICIENT_CAPACITY_ERROR",
@@ -54,18 +54,18 @@ export class ElementCallError extends Error {
} }
} }
export class MatrixRTCFocusMissingError extends ElementCallError { export class MatrixRTCTransportMissingError extends ElementCallError {
public domain: string; public domain: string;
public constructor(domain: string) { public constructor(domain: string) {
super( super(
t("error.call_is_not_supported"), t("error.call_is_not_supported"),
ErrorCode.MISSING_MATRIX_RTC_FOCUS, ErrorCode.MISSING_MATRIX_RTC_TRANSPORT,
ErrorCategory.CONFIGURATION_ISSUE, ErrorCategory.CONFIGURATION_ISSUE,
t("error.matrix_rtc_focus_missing", { t("error.matrix_rtc_transport_missing", {
domain, domain,
brand: import.meta.env.VITE_PRODUCT_NAME || "Element Call", brand: import.meta.env.VITE_PRODUCT_NAME || "Element Call",
errorCode: ErrorCode.MISSING_MATRIX_RTC_FOCUS, errorCode: ErrorCode.MISSING_MATRIX_RTC_TRANSPORT,
}), }),
); );
this.domain = domain; this.domain = domain;

View File

@@ -16,12 +16,13 @@ import {
} from "matrix-js-sdk"; } from "matrix-js-sdk";
import { import {
CallMembership, CallMembership,
type Focus, type Transport,
MatrixRTCSessionEvent, MatrixRTCSessionEvent,
type MatrixRTCSessionEventHandlerMap, type MatrixRTCSessionEventHandlerMap,
MembershipManagerEvent, MembershipManagerEvent,
type SessionMembershipData, type SessionMembershipData,
Status, Status,
type LivekitFocusSelection,
} from "matrix-js-sdk/lib/matrixrtc"; } from "matrix-js-sdk/lib/matrixrtc";
import { type MembershipManagerEventHandlerMap } from "matrix-js-sdk/lib/matrixrtc/IMembershipManager"; import { type MembershipManagerEventHandlerMap } from "matrix-js-sdk/lib/matrixrtc/IMembershipManager";
import { import {
@@ -172,8 +173,11 @@ export function mockRtcMembership(
user: string | RoomMember, user: string | RoomMember,
deviceId: string, deviceId: string,
callId = "", callId = "",
fociPreferred: Focus[] = [], fociPreferred: Transport[] = [],
focusActive: Focus = { type: "oldest_membership" }, focusActive: LivekitFocusSelection = {
type: "livekit",
focus_selection: "oldest_membership",
},
membership: Partial<SessionMembershipData> = {}, membership: Partial<SessionMembershipData> = {},
): CallMembership { ): CallMembership {
const data: SessionMembershipData = { const data: SessionMembershipData = {