Merge branch 'voip-team/rebased-multiSFU' of github.com:element-hq/element-call into voip-team/rebased-multiSFU

This commit is contained in:
Robin
2025-10-14 09:22:08 -04:00
51 changed files with 2340 additions and 1067 deletions

View File

@@ -5,39 +5,33 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { observeParticipantEvents } from "@livekit/components-core";
import {
ConnectionState,
type BaseKeyProvider,
ConnectionState,
type E2EEOptions,
ExternalE2EEKeyProvider,
type Room as LivekitRoom,
type LocalParticipant,
ParticipantEvent,
type RemoteParticipant,
type Participant,
RemoteParticipant,
type Room as LivekitRoom,
} from "livekit-client";
import E2EEWorker from "livekit-client/e2ee-worker?worker";
import {
ClientEvent,
type EventTimelineSetHandlerMap,
EventType,
type Room as MatrixRoom,
RoomEvent,
type RoomMember,
RoomStateEvent,
SyncState,
type Room as MatrixRoom,
type EventTimelineSetHandlerMap,
EventType,
RoomEvent,
} from "matrix-js-sdk";
import { deepCompare } from "matrix-js-sdk/lib/utils";
import {
BehaviorSubject,
EMPTY,
NEVER,
type Observable,
Subject,
combineLatest,
concat,
distinctUntilChanged,
EMPTY,
endWith,
filter,
from,
@@ -45,6 +39,8 @@ import {
ignoreElements,
map,
merge,
NEVER,
type Observable,
of,
pairwise,
race,
@@ -53,6 +49,7 @@ import {
skip,
skipWhile,
startWith,
Subject,
switchAll,
switchMap,
switchScan,
@@ -80,7 +77,7 @@ import { ViewModel } from "./ViewModel";
import {
LocalUserMediaViewModel,
type MediaViewModel,
RemoteUserMediaViewModel,
type RemoteUserMediaViewModel,
ScreenShareViewModel,
type UserMediaViewModel,
} from "./MediaViewModel";
@@ -90,7 +87,6 @@ import {
finalizeValue,
pauseWhen,
} from "../utils/observable";
import { ObservableScope } from "./ObservableScope";
import {
duplicateTiles,
multiSfu,
@@ -99,10 +95,6 @@ import {
} from "../settings/settings";
import { isFirefox } from "../Platform";
import { setPipEnabled$ } from "../controls";
import {
type GridTileViewModel,
type SpotlightTileViewModel,
} from "./TileViewModel";
import { TileStore } from "./TileStore";
import { gridLikeLayout } from "./GridLikeLayout";
import { spotlightExpandedLayout } from "./SpotlightExpandedLayout";
@@ -114,11 +106,10 @@ import {
type ReactionInfo,
type ReactionOption,
} from "../reactions";
import { observeSpeaker$ } from "./observeSpeaker";
import { shallowEquals } from "../utils/array";
import { calculateDisplayName, shouldDisambiguate } from "../utils/displayname";
import { type MediaDevices } from "./MediaDevices";
import { constant, type Behavior } from "./Behavior";
import { type Behavior, constant } from "./Behavior";
import {
enterRTCSession,
getLivekitAlias,
@@ -126,12 +117,29 @@ import {
} from "../rtcSessionHelpers";
import { E2eeType } from "../e2ee/e2eeType";
import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider";
import { Connection, PublishConnection } from "./Connection";
import {
type Connection,
type ConnectionOpts,
RemoteConnection,
} from "./Connection";
import { type MuteStates } from "./MuteStates";
import { getUrlParams } from "../UrlParams";
import { type ProcessorState } from "../livekit/TrackProcessorContext";
import { ElementWidgetActions, widget } from "../widget";
import { type Async, async, mapAsync, ready } from "./Async";
import { PublishConnection } from "./PublishConnection.ts";
import { type Async, async$, mapAsync, ready } from "./Async";
import { sharingScreen$, UserMedia } from "./UserMedia.ts";
import { ScreenShare } from "./ScreenShare.ts";
import {
type GridLayoutMedia,
type Layout,
type LayoutMedia,
type OneOnOneLayoutMedia,
type SpotlightExpandedLayoutMedia,
type SpotlightLandscapeLayoutMedia,
type SpotlightPortraitLayoutMedia,
} from "./layout-types.ts";
import { ElementCallError, UnknownCallError } from "../utils/errors.ts";
export interface CallViewModelOptions {
encryptionSystem: EncryptionSystem;
@@ -156,99 +164,6 @@ const smallMobileCallThreshold = 3;
// with the interface
const showFooterMs = 4000;
/**
 * Media for the layout tagged "grid": a list of user media for the grid and,
 * optionally, a list of media for the spotlight.
 */
export interface GridLayoutMedia {
type: "grid";
spotlight?: MediaViewModel[];
grid: UserMediaViewModel[];
}
/**
 * Media for the layout tagged "spotlight-landscape": spotlight media plus a
 * list of user media for the grid. Unlike GridLayoutMedia, the spotlight is
 * required.
 */
export interface SpotlightLandscapeLayoutMedia {
type: "spotlight-landscape";
spotlight: MediaViewModel[];
grid: UserMediaViewModel[];
}
/**
 * Media for the layout tagged "spotlight-portrait". Carries the same fields as
 * SpotlightLandscapeLayoutMedia; only the tag differs.
 */
export interface SpotlightPortraitLayoutMedia {
type: "spotlight-portrait";
spotlight: MediaViewModel[];
grid: UserMediaViewModel[];
}
/**
 * Media for the layout tagged "spotlight-expanded": spotlight media and an
 * optional single user media for the `pip` slot (presumably a
 * picture-in-picture tile - confirm against the layout implementation).
 */
export interface SpotlightExpandedLayoutMedia {
type: "spotlight-expanded";
spotlight: MediaViewModel[];
pip?: UserMediaViewModel;
}
/**
 * Media for the layout tagged "one-on-one": exactly one `local` and one
 * `remote` user media.
 */
export interface OneOnOneLayoutMedia {
type: "one-on-one";
local: UserMediaViewModel;
remote: UserMediaViewModel;
}
/**
 * Media for the layout tagged "pip": spotlight media only.
 */
export interface PipLayoutMedia {
type: "pip";
spotlight: MediaViewModel[];
}
/**
 * Union of all layout media shapes, discriminated by the `type` field.
 */
export type LayoutMedia =
| GridLayoutMedia
| SpotlightLandscapeLayoutMedia
| SpotlightPortraitLayoutMedia
| SpotlightExpandedLayoutMedia
| OneOnOneLayoutMedia
| PipLayoutMedia;
/**
 * Layout tagged "grid": a list of grid tiles and, optionally, a spotlight tile.
 * `setVisibleTiles` is a callback taking a number (presumably the count of
 * grid tiles currently visible - confirm with the caller).
 */
export interface GridLayout {
type: "grid";
spotlight?: SpotlightTileViewModel;
grid: GridTileViewModel[];
setVisibleTiles: (value: number) => void;
}
/**
 * Layout tagged "spotlight-landscape": a required spotlight tile, a list of
 * grid tiles, and the same `setVisibleTiles` callback as GridLayout.
 */
export interface SpotlightLandscapeLayout {
type: "spotlight-landscape";
spotlight: SpotlightTileViewModel;
grid: GridTileViewModel[];
setVisibleTiles: (value: number) => void;
}
/**
 * Layout tagged "spotlight-portrait". Carries the same fields as
 * SpotlightLandscapeLayout; only the tag differs.
 */
export interface SpotlightPortraitLayout {
type: "spotlight-portrait";
spotlight: SpotlightTileViewModel;
grid: GridTileViewModel[];
setVisibleTiles: (value: number) => void;
}
/**
 * Layout tagged "spotlight-expanded": a spotlight tile and an optional single
 * grid tile in the `pip` slot.
 */
export interface SpotlightExpandedLayout {
type: "spotlight-expanded";
spotlight: SpotlightTileViewModel;
pip?: GridTileViewModel;
}
/**
 * Layout tagged "one-on-one": exactly one `local` and one `remote` grid tile.
 */
export interface OneOnOneLayout {
type: "one-on-one";
local: GridTileViewModel;
remote: GridTileViewModel;
}
/**
 * Layout tagged "pip": a single spotlight tile and nothing else.
 */
export interface PipLayout {
type: "pip";
spotlight: SpotlightTileViewModel;
}
/**
 * A layout defining the media tiles present on screen and their visual
 * arrangement. Discriminated by the `type` field.
 */
export type Layout =
| GridLayout
| SpotlightLandscapeLayout
| SpotlightPortraitLayout
| SpotlightExpandedLayout
| OneOnOneLayout
| PipLayout;
/** Name of a grid mode: either "grid" or "spotlight". */
export type GridMode = "grid" | "spotlight";
/** Name of a window mode: one of "normal", "narrow", "flat" or "pip". */
export type WindowMode = "normal" | "narrow" | "flat" | "pip";
@@ -292,151 +207,8 @@ interface LayoutScanState {
tiles: TileStore;
}
/**
 * Pairs a user media view model with the observable scope that owns its
 * subscriptions and with the subject holding the LiveKit participant that
 * currently backs it.
 */
class UserMedia {
  private readonly scope = new ObservableScope();
  /** The view model for this media: local or remote, chosen at construction. */
  public readonly vm: UserMediaViewModel;
  private readonly participant$: BehaviorSubject<
    LocalParticipant | RemoteParticipant | undefined
  >;
  /** Derived from the view model's `speaking$` through `observeSpeaker$`. */
  public readonly speaker$: Behavior<boolean>;
  /**
   * Follows `sharingScreen$` of the current participant; false while there is
   * no participant.
   */
  public readonly presenter$: Behavior<boolean>;

  /**
   * @param id Identifier handed on to the view model.
   * @param member The room member this media belongs to.
   * @param participant The initial participant, if there is one. When it is
   *   local a LocalUserMediaViewModel is built, otherwise (including when it is
   *   undefined) a RemoteUserMediaViewModel.
   * @param mediaDevices Only passed to the local view model.
   * @param pretendToBeDisconnected$ Only passed to the remote view model.
   */
  public constructor(
    public readonly id: string,
    member: RoomMember,
    participant: LocalParticipant | RemoteParticipant | undefined,
    encryptionSystem: EncryptionSystem,
    livekitRoom: LivekitRoom,
    mediaDevices: MediaDevices,
    pretendToBeDisconnected$: Behavior<boolean>,
    displayname$: Observable<string>,
    handRaised$: Observable<Date | null>,
    reaction$: Observable<ReactionOption | null>,
  ) {
    this.participant$ = new BehaviorSubject(participant);

    // Bind the per-member inputs to our scope once, in the order the view
    // model constructors take them.
    const displayname = this.scope.behavior(displayname$);
    const handRaised = this.scope.behavior(handRaised$);
    const reaction = this.scope.behavior(reaction$);

    this.vm = participant?.isLocal
      ? new LocalUserMediaViewModel(
          this.id,
          member,
          this.participant$ as Behavior<LocalParticipant>,
          encryptionSystem,
          livekitRoom,
          mediaDevices,
          displayname,
          handRaised,
          reaction,
        )
      : new RemoteUserMediaViewModel(
          id,
          member,
          this.participant$.asObservable() as Observable<
            RemoteParticipant | undefined
          >,
          encryptionSystem,
          livekitRoom,
          pretendToBeDisconnected$,
          displayname,
          handRaised,
          reaction,
        );

    const speaker$ = observeSpeaker$(this.vm.speaking$);
    this.speaker$ = this.scope.behavior(speaker$);

    const presenter$ = this.participant$.pipe(
      switchMap((current) => {
        if (current === undefined) return of(false);
        return sharingScreen$(current);
      }),
    );
    this.presenter$ = this.scope.behavior(presenter$);
  }

  /**
   * Replaces the participant backing this media. Does nothing when given the
   * participant that is already current.
   */
  public updateParticipant(
    newParticipant: LocalParticipant | RemoteParticipant | undefined,
  ): void {
    if (this.participant$.value === newParticipant) return;
    // Push the change into the BehaviorSubject so dependants pick it up.
    this.participant$.next(newParticipant);
  }

  /** Ends the scope, then destroys the view model. */
  public destroy(): void {
    this.scope.end();
    this.vm.destroy();
  }
}
/**
 * Pairs a screen share view model with the observable scope that owns its
 * subscriptions and with the subject holding its participant.
 */
class ScreenShare {
  private readonly scope = new ObservableScope();
  /** The view model for this screen share. */
  public readonly vm: ScreenShareViewModel;
  private readonly participant$: BehaviorSubject<
    LocalParticipant | RemoteParticipant
  >;

  /**
   * @param id Identifier handed on to the view model.
   * @param member The room member the screen share belongs to.
   * @param participant The participant sharing their screen; its `isLocal`
   *   flag is forwarded to the view model.
   * @param displayName$ Display name source, bound to this object's scope.
   */
  public constructor(
    id: string,
    member: RoomMember,
    participant: LocalParticipant | RemoteParticipant,
    encryptionSystem: EncryptionSystem,
    livekitRoom: LivekitRoom,
    pretendToBeDisconnected$: Behavior<boolean>,
    displayName$: Observable<string>,
  ) {
    const participantSubject = new BehaviorSubject(participant);
    this.participant$ = participantSubject;

    // The view model only gets a read-only view of the participant.
    const participantView$ = participantSubject.asObservable();
    const displayName = this.scope.behavior(displayName$);

    this.vm = new ScreenShareViewModel(
      id,
      member,
      participantView$,
      encryptionSystem,
      livekitRoom,
      pretendToBeDisconnected$,
      displayName,
      participant.isLocal,
    );
  }

  /** Ends the scope, then destroys the view model. */
  public destroy(): void {
    this.scope.end();
    this.vm.destroy();
  }
}
/** Either kind of media item: a UserMedia or a ScreenShare. */
type MediaItem = UserMedia | ScreenShare;
/**
 * Resolves the media id and the Matrix room member for a call membership.
 *
 * The id is `<sender>:<deviceId>`, except for the membership of our own user
 * on our own device, which gets the fixed id "local".
 *
 * @param rtcMember The call membership to resolve.
 * @param room The Matrix room used to look up the member and our own identity.
 * @returns The id together with the room member, or with `undefined` when the
 *   membership has no sender or the room does not know the member.
 */
function getRoomMemberFromRtcMember(
  rtcMember: CallMembership,
  room: MatrixRoom,
): { id: string; member: RoomMember | undefined } {
  // WARN! This is not exactly the sender but the user defined in the state key.
  // This will be available once we change to the new "member as object" format in the MatrixRTC object.
  const { sender, deviceId } = rtcMember;
  const remoteId = sender + ":" + deviceId;

  // Without a sender there is nobody to look up.
  if (!sender) return { id: remoteId, member: undefined };

  const isOwnDevice =
    sender === room.client.getUserId() &&
    deviceId === room.client.getDeviceId();

  return {
    id: isOwnDevice ? "local" : remoteId,
    member: room.getMember(sender) ?? undefined,
  };
}
/**
 * Observes whether a participant has screen sharing enabled.
 *
 * @param p The participant to watch.
 * @returns An observable built from the participant's track published and
 *   unpublished events (remote and local variants), mapped to the
 *   participant's `isScreenShareEnabled` flag.
 */
function sharingScreen$(p: Participant): Observable<boolean> {
  const publicationChanges$ = observeParticipantEvents(
    p,
    ParticipantEvent.TrackPublished,
    ParticipantEvent.TrackUnpublished,
    ParticipantEvent.LocalTrackPublished,
    ParticipantEvent.LocalTrackUnpublished,
  );
  return publicationChanges$.pipe(
    map((participant) => participant.isScreenShareEnabled),
  );
}
export class CallViewModel extends ViewModel {
private readonly urlParams = getUrlParams();
@@ -454,6 +226,19 @@ export class CallViewModel extends ViewModel {
}
: undefined;
private readonly _configError$ = new BehaviorSubject<ElementCallError | null>(
null,
);
/**
* If there is a configuration error with the call (e.g. misconfigured E2EE).
* This is a fatal error that prevents the call from being created/joined.
* Should render a blocking error screen.
*/
public get configError$(): Behavior<ElementCallError | null> {
return this._configError$;
}
private readonly join$ = new Subject<void>();
public join(): void {
@@ -503,7 +288,7 @@ export class CallViewModel extends ViewModel {
* The transport that we would personally prefer to publish on (if not for the
* transport preferences of others, perhaps).
*/
private readonly preferredTransport = makeTransport(this.matrixRTCSession);
private readonly preferredTransport$: Observable<Async<LivekitTransport>>;
/**
* Lists the transports used by ourselves, plus all other MatrixRTC session
@@ -523,11 +308,7 @@ export class CallViewModel extends ViewModel {
switchMap((joined) =>
joined
? combineLatest(
[
async(this.preferredTransport),
this.memberships$,
multiSfu.value$,
],
[this.preferredTransport$, this.memberships$, multiSfu.value$],
(preferred, memberships, multiSfu) => {
const oldestMembership =
this.matrixRTCSession.getOldestMembership();
@@ -544,9 +325,18 @@ export class CallViewModel extends ViewModel {
const oldest = this.matrixRTCSession.getOldestMembership();
if (oldest !== undefined) {
const selection = oldest.getTransport(oldest);
if (isLivekitTransport(selection)) local = ready(selection);
// TODO: selection can be null if no transport is configured; should we report an error?
if (selection && isLivekitTransport(selection))
local = ready(selection);
}
}
if (local.state === "error") {
this._configError$.next(
local.value instanceof ElementCallError
? local.value
: new UnknownCallError(local.value),
);
}
return { local, remote, preferred, multiSfu };
},
)
@@ -565,12 +355,13 @@ export class CallViewModel extends ViewModel {
/**
* The transport over which we should be actively publishing our media.
* null when not joined.
*/
private readonly localTransport$: Behavior<Async<LivekitTransport> | null> =
this.scope.behavior(
this.transports$.pipe(
map((transports) => transports?.local ?? null),
distinctUntilChanged(deepCompare),
distinctUntilChanged<Async<LivekitTransport> | null>(deepCompare),
),
);
@@ -603,40 +394,48 @@ export class CallViewModel extends ViewModel {
),
);
private readonly localConnectionAndTransport$ = this.scope.behavior(
this.localTransport$.pipe(
map(
(transport) =>
transport &&
mapAsync(transport, (transport) => ({
connection: new PublishConnection(
transport,
this.livekitAlias,
this.matrixRTCSession.room.client,
this.scope,
this.remoteTransports$,
this.mediaDevices,
this.muteStates,
this.e2eeLivekitOptions(),
this.scope.behavior(this.trackProcessorState$),
),
transport,
})),
/**
* The local connection over which we will publish our media. It could
* possibly also have some remote users' media available on it.
* null when not joined.
*/
private readonly localConnection$: Behavior<Async<PublishConnection> | null> =
this.scope.behavior(
this.localTransport$.pipe(
map(
(transport) =>
transport &&
mapAsync(transport, (transport) => {
const opts: ConnectionOpts = {
transport,
client: this.matrixRTCSession.room.client,
scope: this.scope,
remoteTransports$: this.remoteTransports$,
};
return new PublishConnection(
opts,
this.mediaDevices,
this.muteStates,
this.e2eeLivekitOptions(),
this.scope.behavior(this.trackProcessorState$),
);
}),
),
),
),
);
private readonly localConnection$ = this.scope.behavior(
this.localConnectionAndTransport$.pipe(
map((value) => value && mapAsync(value, ({ connection }) => connection)),
),
);
);
public readonly livekitConnectionState$ = this.scope.behavior(
this.localConnection$.pipe(
switchMap((c) =>
c?.state === "ready"
? c.value.connectionState$
? // TODO mapping to ConnectionState for compatibility, but we should use the full state?
c.value.focusConnectionState$.pipe(
map((s) => {
if (s.state === "ConnectedToLkRoom") return s.connectionState;
return ConnectionState.Disconnected;
}),
distinctUntilChanged(),
)
: of(ConnectionState.Disconnected),
),
),
@@ -674,16 +473,19 @@ export class CallViewModel extends ViewModel {
"SFU remoteConnections$ construct new connection: ",
remoteServiceUrl,
);
nextConnection = new Connection(
{
const args: ConnectionOpts = {
transport: {
type: "livekit",
livekit_service_url: remoteServiceUrl,
livekit_alias: this.livekitAlias,
type: "livekit",
},
this.livekitAlias,
this.matrixRTCSession.room.client,
this.scope,
this.remoteTransports$,
client: this.matrixRTCSession.room.client,
scope: this.scope,
remoteTransports$: this.remoteTransports$,
};
nextConnection = new RemoteConnection(
args,
this.e2eeLivekitOptions(),
);
} else {
@@ -735,15 +537,15 @@ export class CallViewModel extends ViewModel {
map((connections) =>
[...connections.values()].map((c) => ({
room: c.livekitRoom,
url: c.transport.livekit_service_url,
url: c.localTransport.livekit_service_url,
isLocal: c instanceof PublishConnection,
})),
),
),
);
private readonly userId = this.matrixRoom.client.getUserId();
private readonly deviceId = this.matrixRoom.client.getDeviceId();
private readonly userId = this.matrixRoom.client.getUserId()!;
private readonly deviceId = this.matrixRoom.client.getDeviceId()!;
private readonly matrixConnected$ = this.scope.behavior(
// To consider ourselves connected to MatrixRTC, we check the following:
@@ -820,7 +622,7 @@ export class CallViewModel extends ViewModel {
* Lists, for each LiveKit room, the LiveKit participants whose media should
* be presented.
*/
public readonly participantsByRoom$ = this.scope.behavior<
private readonly participantsByRoom$ = this.scope.behavior<
{
livekitRoom: LivekitRoom;
url: string;
@@ -832,17 +634,16 @@ export class CallViewModel extends ViewModel {
}[]
>(
// TODO: Move this logic into Connection/PublishConnection if possible
this.localConnectionAndTransport$
this.localConnection$
.pipe(
switchMap((values) => {
if (values?.state !== "ready") return [];
const localConnection = values.value.connection;
switchMap((localConnection) => {
if (localConnection?.state !== "ready") return [];
const memberError = (): never => {
throw new Error("No room member for call membership");
};
const localParticipant = {
id: "local",
participant: localConnection.livekitRoom.localParticipant,
participant: localConnection.value.livekitRoom.localParticipant,
member:
this.matrixRoom.getMember(this.userId ?? "") ?? memberError(),
};
@@ -850,7 +651,7 @@ export class CallViewModel extends ViewModel {
return this.remoteConnections$.pipe(
switchMap((remoteConnections) =>
combineLatest(
[localConnection, ...remoteConnections].map((c) =>
[localConnection.value, ...remoteConnections].map((c) =>
c.publishingParticipants$.pipe(
map((ps) => {
const participants: {
@@ -869,12 +670,12 @@ export class CallViewModel extends ViewModel {
this.matrixRoom,
)?.member ?? memberError(),
}));
if (c === localConnection)
if (c === localConnection.value)
participants.push(localParticipant);
return {
livekitRoom: c.livekitRoom,
url: c.transport.livekit_service_url,
url: c.localTransport.livekit_service_url,
participants,
};
}),
@@ -888,6 +689,25 @@ export class CallViewModel extends ViewModel {
.pipe(startWith([]), pauseWhen(this.pretendToBeDisconnected$)),
);
/**
* Lists, for each LiveKit room, the LiveKit participants whose audio should
* be rendered.
*/
// (This is effectively just participantsByRoom$ with a stricter type)
public readonly audioParticipants$ = this.scope.behavior(
this.participantsByRoom$.pipe(
map((data) =>
data.map(({ livekitRoom, url, participants }) => ({
livekitRoom,
url,
participants: participants.flatMap(({ participant }) =>
participant instanceof RemoteParticipant ? [participant] : [],
),
})),
),
),
);
/**
* Displaynames for each member of the call. This will disambiguate
* any displaynames that clashes with another member. Only members
@@ -909,7 +729,11 @@ export class CallViewModel extends ViewModel {
],
(memberships, _displaynames) => {
const displaynameMap = new Map<string, string>([
["local", this.matrixRoom.getMember(this.userId!)!.rawDisplayName],
[
"local",
this.matrixRoom.getMember(this.userId)?.rawDisplayName ??
this.userId,
],
]);
const room = this.matrixRoom;
@@ -961,7 +785,11 @@ export class CallViewModel extends ViewModel {
scan((prevItems, [participantsByRoom, duplicateTiles]) => {
const newItems: Map<string, UserMedia | ScreenShare> = new Map(
function* (this: CallViewModel): Iterable<[string, MediaItem]> {
for (const { livekitRoom, participants } of participantsByRoom) {
for (const {
livekitRoom,
participants,
url,
} of participantsByRoom) {
for (const { id, participant, member } of participants) {
for (let i = 0; i < 1 + duplicateTiles; i++) {
const mediaId = `${id}:${i}`;
@@ -981,6 +809,7 @@ export class CallViewModel extends ViewModel {
participant,
this.options.encryptionSystem,
livekitRoom,
url,
this.mediaDevices,
this.pretendToBeDisconnected$,
this.memberDisplaynames$.pipe(
@@ -1002,6 +831,7 @@ export class CallViewModel extends ViewModel {
participant,
this.options.encryptionSystem,
livekitRoom,
url,
this.pretendToBeDisconnected$,
this.memberDisplaynames$.pipe(
map((m) => m.get(id) ?? "[👻]"),
@@ -1972,23 +1802,47 @@ export class CallViewModel extends ViewModel {
) {
super();
this.preferredTransport$ = async$(
makeTransport(this.matrixRTCSession),
).pipe(this.scope.bind());
// Start and stop local and remote connections as needed
this.connectionInstructions$
.pipe(this.scope.bind())
.subscribe(({ start, stop }) => {
for (const c of stop) {
logger.info(`Disconnecting from ${c.transport.livekit_service_url}`);
c.stop();
logger.info(
`Disconnecting from ${c.localTransport.livekit_service_url}`,
);
c.stop().catch((err) => {
// TODO: better error handling
logger.error(
`Fail to stop connection to ${c.localTransport.livekit_service_url}`,
err,
);
});
}
for (const c of start) {
c.start().then(
() =>
logger.info(`Connected to ${c.transport.livekit_service_url}`),
(e) =>
logger.error(
`Failed to start connection to ${c.transport.livekit_service_url}`,
e,
logger.info(
`Connected to ${c.localTransport.livekit_service_url}`,
),
(e) => {
// We only want to report fatal errors `_configError$` for the publish connection.
// If there is an error with another connection, it will not terminate the call and will be displayed
// on each tile.
if (
c instanceof PublishConnection &&
e instanceof ElementCallError
) {
this._configError$.next(e);
}
logger.error(
`Failed to start connection to ${c.localTransport.livekit_service_url}`,
e,
);
},
);
}
});
@@ -1997,14 +1851,13 @@ export class CallViewModel extends ViewModel {
this.scope.reconcile(this.advertisedTransport$, async (advertised) => {
if (advertised !== null) {
try {
await enterRTCSession(
this.matrixRTCSession,
advertised.transport,
this.options.encryptionSystem.kind !== E2eeType.NONE,
true,
true,
advertised.multiSfu,
);
this._configError$.next(null);
await enterRTCSession(this.matrixRTCSession, advertised.transport, {
encryptMedia: this.options.encryptionSystem.kind !== E2eeType.NONE,
useExperimentalToDeviceTransport: true,
useNewMembershipManager: true,
useMultiSfu: advertised.multiSfu,
});
} catch (e) {
logger.error("Error entering RTC session", e);
}
@@ -2115,3 +1968,25 @@ function getE2eeKeyProvider(
return keyProvider;
}
}
/**
 * Resolves the media id and the Matrix room member for a call membership.
 *
 * The id is `<sender>:<deviceId>`, except for the membership of our own user
 * on our own device, which gets the fixed id "local".
 *
 * @param rtcMember The call membership to resolve.
 * @param room The Matrix room used to look up the member and our own identity.
 * @returns The id together with the room member, or with `undefined` when the
 *   membership has no sender or the room does not know the member.
 */
function getRoomMemberFromRtcMember(
  rtcMember: CallMembership,
  room: MatrixRoom,
): { id: string; member: RoomMember | undefined } {
  // WARN! This is not exactly the sender but the user defined in the state key.
  // This will be available once we change to the new "member as object" format in the MatrixRTC object.
  const { sender, deviceId } = rtcMember;
  const remoteId = sender + ":" + deviceId;

  // Without a sender there is nobody to look up.
  if (!sender) return { id: remoteId, member: undefined };

  const isOwnDevice =
    sender === room.client.getUserId() &&
    deviceId === room.client.getDeviceId();

  return {
    id: isOwnDevice ? "local" : remoteId,
    member: room.getMember(sender) ?? undefined,
  };
}