diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 15db43c4..436255eb 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -121,7 +121,7 @@ import { type MuteStates } from "./MuteStates"; import { getUrlParams } from "../UrlParams"; import { type ProcessorState } from "../livekit/TrackProcessorContext"; import { ElementWidgetActions, widget } from "../widget"; -import { PublishConnection } from "./ownMember/Publisher.ts"; +import { PublishConnection } from "./localMember/Publisher.ts"; import { type Async, async$, mapAsync, ready } from "./Async"; import { sharingScreen$, UserMedia } from "./UserMedia.ts"; import { ScreenShare } from "./ScreenShare.ts"; @@ -139,7 +139,10 @@ import { ObservableScope } from "./ObservableScope.ts"; import { memberDisplaynames$ } from "./remoteMembers/displayname.ts"; import { ConnectionManager } from "./remoteMembers/ConnectionManager.ts"; import { MatrixLivekitMerger } from "./remoteMembers/matrixLivekitMerger.ts"; -import { ownMembership$ } from "./ownMember/OwnMembership.ts"; +import { ownMembership$ } from "./localMember/LocalMembership.ts"; +import { localTransport$ as computeLocalTransport$ } from "./localMember/LocalTransport.ts"; +import { sessionBehaviors$ } from "./SessionBehaviors.ts"; +import { ECConnectionFactory } from "./remoteMembers/ConnectionFactory.ts"; //TODO // Larger rename @@ -197,6 +200,8 @@ type MediaItem = UserMedia | ScreenShare; export class CallViewModel { private readonly urlParams = getUrlParams(); + private readonly userId = this.matrixRoom.client.getUserId()!; + private readonly deviceId = this.matrixRoom.client.getDeviceId()!; private readonly livekitAlias = getLivekitAlias(this.matrixRTCSession); private readonly livekitE2EEKeyProvider = getE2eeKeyProvider( @@ -214,31 +219,52 @@ export class CallViewModel { private readonly _configError$ = new BehaviorSubject( null, ); + private sessionBehaviors = sessionBehaviors$( + this.scope, + this.matrixRTCSession, + ); + private memberships$ = this.sessionBehaviors.memberships$; - private memberships$ = this.scope.behavior( - fromEvent( - this.matrixRTCSession, - MatrixRTCSessionEvent.MembershipsChanged, - (_, memberships: CallMembership[]) => memberships, + private localTransport$ = computeLocalTransport$({ + scope: this.scope, + memberships$: this.memberships$, + client: this.matrixRoom.client, + roomId: this.matrixRoom.roomId, + useOldestMember$: multiSfu.value$, + }); + + private connectionFactory = new ECConnectionFactory( + this.matrixRoom.client, + this.mediaDevices, + this.trackProcessorState$, + this.e2eeLivekitOptions(), + getUrlParams().controlledAudioDevices, + ); + + private allTransports$ = this.scope.behavior( + combineLatest( + [this.localTransport$, this.sessionBehaviors.transports$], + (l, t) => [...(l ? [l] : []), ...t], ), ); private connectionManager = new ConnectionManager( this.scope, - this.matrixRoom.client, - this.mediaDevices, - this.trackProcessorState$, - this.e2eeLivekitOptions(), + this.connectionFactory, + this.allTransports$, + logger, ); private matrixLivekitMerger = new MatrixLivekitMerger( this.scope, - this.memberships$, + this.sessionBehaviors.membershipsWithTransport$, this.connectionManager, this.matrixRoom, + this.userId, + this.deviceId, ); - private ownMembership = ownMembership$({ + private localMembership = this.localMembership$({ scope: this.scope, muteStates: this.muteStates, multiSfu: this.multiSfu, @@ -247,6 +273,7 @@ export class CallViewModel { e2eeLivekitOptions: this.e2eeLivekitOptions, }); + private matrixLivekitItems$ = this.matrixLivekitMerger.matrixLivekitItems$; /** * If there is a configuration error with the call (e.g. misconfigured E2EE). * This is a fatal error that prevents the call from being created/joined. @@ -289,60 +316,27 @@ export class CallViewModel { ), ); - /** - * The transport that we would personally prefer to publish on (if not for the - * transport preferences of others, perhaps). - */ - // DISCUSS move to ownMembership - private readonly preferredTransport$ = this.scope.behavior( - async$(makeTransport(this.matrixRTCSession)), - ); + // /** + // * The transport that we would personally prefer to publish on (if not for the + // * transport preferences of others, perhaps). + // */ + // // DISCUSS move to ownMembership + // private readonly preferredTransport$ = this.scope.behavior( + // async$(makeTransport(this.matrixRTCSession)), + // ); - /** - * The transport over which we should be actively publishing our media. - * null when not joined. - */ - // DISCUSSION ownMembershipManager - private readonly localTransport$: Behavior | null> = - this.scope.behavior( - this.transports$.pipe( - map((transports) => transports?.local ?? null), - distinctUntilChanged | null>(deepCompare), - ), - ); - - /** - * The transport we should advertise in our MatrixRTC membership (plus whether - * it is a multi-SFU transport and whether we should use sticky events). - */ - // DISCUSSION ownMembershipManager - private readonly advertisedTransport$: Behavior<{ - multiSfu: boolean; - preferStickyEvents: boolean; - transport: LivekitTransport; - } | null> = this.scope.behavior( - this.transports$.pipe( - map((transports) => - transports?.local.state === "ready" && - transports.preferred.state === "ready" - ? { - multiSfu: transports.multiSfu, - preferStickyEvents: transports.preferStickyEvents, - // In non-multi-SFU mode we should always advertise the preferred - // SFU to minimize the number of membership updates - transport: transports.multiSfu - ? transports.local.value - : transports.preferred.value, - } - : null, - ), - distinctUntilChanged<{ - multiSfu: boolean; - preferStickyEvents: boolean; - transport: LivekitTransport; - } | null>(deepCompare), - ), - ); + // /** + // * The transport over which we should be actively publishing our media. + // * null when not joined. + // */ + // // DISCUSSION ownMembershipManager + // private readonly localTransport$: Behavior | null> = + // this.scope.behavior( + // this.transports$.pipe( + // map((transports) => transports?.local ?? null), + // distinctUntilChanged | null>(deepCompare), + // ), + // ); // // DISCUSSION move to ConnectionManager // public readonly livekitConnectionState$ = @@ -367,8 +361,6 @@ export class CallViewModel { // ), // ); - private readonly userId = this.matrixRoom.client.getUserId()!; - /** * Whether various media/event sources should pretend to be disconnected from * all network input, even if their connection still technically works. diff --git a/src/state/SessionBehaviors.ts b/src/state/SessionBehaviors.ts new file mode 100644 index 00000000..6c16ace4 --- /dev/null +++ b/src/state/SessionBehaviors.ts @@ -0,0 +1,72 @@ +/* +Copyright 2025 New Vector Ltd. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { + type CallMembership, + isLivekitTransport, + type LivekitTransport, + type MatrixRTCSession, + MatrixRTCSessionEvent, +} from "matrix-js-sdk/lib/matrixrtc"; +import { fromEvent, map } from "rxjs"; + +import { type ObservableScope } from "./ObservableScope"; +import { type Behavior } from "./Behavior"; + +export const sessionBehaviors$ = ( + scope: ObservableScope, + matrixRTCSession: MatrixRTCSession, +): { + memberships$: Behavior; + membershipsWithTransport$: Behavior< + { membership: CallMembership; transport?: LivekitTransport }[] + >; + transports$: Behavior; +} => { + const memberships$ = scope.behavior( + fromEvent( + matrixRTCSession, + MatrixRTCSessionEvent.MembershipsChanged, + (_, memberships: CallMembership[]) => memberships, + ), + ); + /** + * Lists the transports used by ourselves, plus all other MatrixRTC session + * members. For completeness this also lists the preferred transport and + * whether we are in multi-SFU mode or sticky events mode (because + * advertisedTransport$ wants to read them at the same time, and bundling data + * together when it might change together is what you have to do in RxJS to + * avoid reading inconsistent state or observing too many changes.) + */ + const membershipsWithTransport$: Behavior< + { membership: CallMembership; transport?: LivekitTransport }[] + > = scope.behavior( + memberships$.pipe( + map((memberships) => { + return memberships.map((membership) => { + const oldestMembership = memberships[0] ?? membership; + const transport = membership.getTransport(oldestMembership); + return { + membership, + transport: isLivekitTransport(transport) ? transport : undefined, + }; + }); + }), + ), + ); + + const transports$ = scope.behavior( + membershipsWithTransport$.pipe( + map((mts) => mts.flatMap(({ transport: t }) => (t ? [t] : []))), + ), + ); + return { + memberships$, + membershipsWithTransport$, + transports$, + }; +}; diff --git a/src/state/localMember/LocalMembership.ts b/src/state/localMember/LocalMembership.ts new file mode 100644 index 00000000..7448c2ee --- /dev/null +++ b/src/state/localMember/LocalMembership.ts @@ -0,0 +1,328 @@ +/* +Copyright 2025 New Vector Ltd. + +SPDX-License-IdFentifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { type LocalTrack, type E2EEOptions } from "livekit-client"; +import { + type LivekitTransport, + type MatrixRTCSession, + MembershipManagerEvent, + Status, +} from "matrix-js-sdk/lib/matrixrtc"; +import { + ClientEvent, + type MatrixClient, + SyncState, + type Room as MatrixRoom, +} from "matrix-js-sdk"; +import { + BehaviorSubject, + combineLatest, + fromEvent, + map, + type Observable, + of, + startWith, + switchMap, + tap, +} from "rxjs"; +import { logger } from "matrix-js-sdk/lib/logger"; + +import { type Behavior } from "../Behavior"; +import { type ConnectionManager } from "../remoteMembers/ConnectionManager"; +import { type ObservableScope } from "../ObservableScope"; +import { Publisher } from "./Publisher"; +import { type MuteStates } from "../MuteStates"; +import { type ProcessorState } from "../../livekit/TrackProcessorContext"; +import { type MediaDevices } from "../MediaDevices"; +import { and$ } from "../../utils/observable"; +import { areLivekitTransportsEqual } from "../remoteMembers/matrixLivekitMerger"; +import { + enterRTCSession, + type EnterRTCSessionOptions, +} from "../../rtcSessionHelpers"; + +/* + * - get well known + * - get oldest membership + * - get transport to use + * - get openId + jwt token + * - wait for createTrack() call + * - create tracks + * - wait for join() call + * - Publisher.publishTracks() + * - send join state/sticky event + */ +interface Props { + scope: ObservableScope; + mediaDevices: MediaDevices; + muteStates: MuteStates; + connectionManager: ConnectionManager; + matrixRTCSession: MatrixRTCSession; + matrixRoom: MatrixRoom; + localTransport$: Behavior; + client: MatrixClient; + roomId: string; + e2eeLivekitOptions: E2EEOptions | undefined; + trackerProcessorState$: Behavior; +} +enum LivekitState { + UNINITIALIZED = "uninitialized", + CONNECTING = "connecting", + CONNECTED = "connected", + ERROR = "error", + DISCONNECTED = "disconnected", + DISCONNECTING = "disconnecting", +} +type LocalMemberLivekitState = + | { state: LivekitState.ERROR; error: string } + | { state: LivekitState.CONNECTED } + | { state: LivekitState.CONNECTING } + | { state: LivekitState.UNINITIALIZED } + | { state: LivekitState.DISCONNECTED } + | { state: LivekitState.DISCONNECTING }; + +enum MatrixState { + CONNECTED = "connected", + DISCONNECTED = "disconnected", + CONNECTING = "connecting", +} +type LocalMemberMatrixState = + | { state: MatrixState.CONNECTED } + | { state: MatrixState.CONNECTING } + | { state: MatrixState.DISCONNECTED }; + +interface LocalMemberState { + livekit$: BehaviorSubject; + matrix$: BehaviorSubject; +} +/** + * This class is responsible for managing the own membership in a room. + * We want + * - a publisher + * - + * @param param0 + * @returns + * - publisher: The handle to create tracks and publish them to the room. + * - connected$: the current connection state. Including matrix server and livekit server connection. (only the livekit server relevant for our own participation) + * - transport$: the transport object the ownMembership$ ended up using. + * + */ +export const localMembership$ = ({ + scope, + muteStates, + mediaDevices, + connectionManager, + matrixRTCSession, + localTransport$, + matrixRoom, + e2eeLivekitOptions, + trackerProcessorState$, +}: Props): { + // publisher: Publisher + requestConnect: (options: EnterRTCSessionOptions) => LocalMemberState; + startTracks: () => Behavior; + requestDisconnect: () => Observable | null; + state: LocalMemberState; // TODO this is probably superseeded by joinState$ + homeserverConnected$: Behavior; + connected$: Behavior; +} => { + const state = { + livekit$: new BehaviorSubject({ + state: LivekitState.UNINITIALIZED, + }), + matrix$: new BehaviorSubject({ + state: MatrixState.DISCONNECTED, + }), + }; + + // This should be used in a combineLatest with publisher$ to connect. + // to make it possible to call startTracks before the preferredTransport$ has resolved. + const shouldStartTracks$ = new BehaviorSubject(false); + + // This should be used in a combineLatest with publisher$ to connect. + const tracks$ = new BehaviorSubject([]); + + const connection$ = scope.behavior( + combineLatest([connectionManager.connections$, localTransport$]).pipe( + map(([connections, transport]) => + connections.find((connection) => + areLivekitTransportsEqual(connection.transport, transport), + ), + ), + ), + ); + /** + * Whether we are connected to the MatrixRTC session. + */ + const homeserverConnected$ = scope.behavior( + // To consider ourselves connected to MatrixRTC, we check the following: + and$( + // The client is connected to the sync loop + ( + fromEvent(matrixRoom.client, ClientEvent.Sync) as Observable< + [SyncState] + > + ).pipe( + startWith([matrixRoom.client.getSyncState()]), + map(([state]) => state === SyncState.Syncing), + ), + // Room state observed by session says we're connected + fromEvent(matrixRTCSession, MembershipManagerEvent.StatusChanged).pipe( + startWith(null), + map(() => matrixRTCSession.membershipStatus === Status.Connected), + ), + // Also watch out for warnings that we've likely hit a timeout and our + // delayed leave event is being sent (this condition is here because it + // provides an earlier warning than the sync loop timeout, and we wouldn't + // see the actual leave event until we reconnect to the sync loop) + fromEvent(matrixRTCSession, MembershipManagerEvent.ProbablyLeft).pipe( + startWith(null), + map(() => matrixRTCSession.probablyLeft !== true), + ), + ), + ); + + // /** + // * Whether we are "fully" connected to the call. Accounts for both the + // * connection to the MatrixRTC session and the LiveKit publish connection. + // */ + // // TODO use this in combination with the MemberState. + const connected$ = scope.behavior( + and$( + homeserverConnected$, + connection$.pipe( + switchMap((c) => + c + ? c.state$.pipe(map((state) => state.state === "ConnectedToLkRoom")) + : of(false), + ), + ), + ), + ); + + const publisher$ = scope.behavior( + connection$.pipe( + map((connection) => + connection + ? new Publisher( + scope, + connection, + mediaDevices, + muteStates, + e2eeLivekitOptions, + trackerProcessorState$, + ) + : null, + ), + ), + ); + + combineLatest( + [publisher$, shouldStartTracks$], + (publisher, shouldStartTracks) => { + if (publisher && shouldStartTracks) { + publisher + .createAndSetupTracks() + .then((tracks) => { + tracks$.next(tracks); + }) + .catch((error) => { + logger.error("Error creating tracks:", error); + }); + } + }, + ); + + // MATRIX RELATED + + // /** + // * Whether we should tell the user that we're reconnecting to the call. + // */ + // // DISCUSSION own membership manager + // const reconnecting$ = scope.behavior( + // connected$.pipe( + // // We are reconnecting if we previously had some successful initial + // // connection but are now disconnected + // scan( + // ({ connectedPreviously }, connectedNow) => ({ + // connectedPreviously: connectedPreviously || connectedNow, + // reconnecting: connectedPreviously && !connectedNow, + // }), + // { connectedPreviously: false, reconnecting: false }, + // ), + // map(({ reconnecting }) => reconnecting), + // ), + // ); + + const startTracks = (): Behavior => { + shouldStartTracks$.next(true); + return tracks$; + }; + + // const joinState$ = new BehaviorSubject({ + // state: LivekitState.UNINITIALIZED, + // }); + + const requestConnect = ( + options: EnterRTCSessionOptions, + ): LocalMemberState => { + if (state.livekit$.value === null) { + startTracks(); + state.livekit$.next({ state: LivekitState.CONNECTING }); + combineLatest([publisher$, tracks$], (publisher, tracks) => { + publisher + ?.startPublishing() + .then(() => { + state.livekit$.next({ state: LivekitState.CONNECTED }); + }) + .catch((error) => { + state.livekit$.next({ state: LivekitState.ERROR, error }); + }); + }); + } + if (state.matrix$.value.state !== MatrixState.DISCONNECTED) { + state.matrix$.next({ state: MatrixState.CONNECTING }); + localTransport$.pipe( + tap((transport) => { + enterRTCSession(matrixRTCSession, transport, options).catch( + (error) => { + logger.error(error); + }, + ); + }), + ); + } + return state; + }; + + const requestDisconnect = (): Behavior | null => { + if (state.livekit$.value.state !== LivekitState.CONNECTED) return null; + state.livekit$.next({ state: LivekitState.DISCONNECTING }); + combineLatest([publisher$, tracks$], (publisher, tracks) => { + publisher + ?.stopPublishing() + .then(() => { + tracks.forEach((track) => track.stop()); + state.livekit$.next({ state: LivekitState.DISCONNECTED }); + }) + .catch((error) => { + state.livekit$.next({ state: LivekitState.ERROR, error }); + }); + }); + + return state.livekit$; + }; + + return { + startTracks, + requestConnect, + requestDisconnect, + state, + homeserverConnected$, + connected$, + }; +}; diff --git a/src/state/localMember/LocalTransport.ts b/src/state/localMember/LocalTransport.ts new file mode 100644 index 00000000..7a5202a9 --- /dev/null +++ b/src/state/localMember/LocalTransport.ts @@ -0,0 +1,166 @@ +/* +Copyright 2025 New Vector Ltd. + +SPDX-License-IdFentifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { + type CallMembership, + isLivekitTransport, + type LivekitTransportConfig, + type LivekitTransport, + isLivekitTransportConfig, +} from "matrix-js-sdk/lib/matrixrtc"; +import { type MatrixClient } from "matrix-js-sdk"; +import { combineLatest, distinctUntilChanged, first, from, map } from "rxjs"; +import { logger } from "matrix-js-sdk/lib/logger"; +import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; +import { deepCompare } from "matrix-js-sdk/lib/utils"; + +import { type Behavior } from "../Behavior.ts"; +import { type ObservableScope } from "../ObservableScope.ts"; +import { Config } from "../../config/Config.ts"; +import { MatrixRTCTransportMissingError } from "../../utils/errors.ts"; +import { getSFUConfigWithOpenID } from "../../livekit/openIDSFU.ts"; + +/* + * - get well known + * - get oldest membership + * - get transport to use + * - get openId + jwt token + * - wait for createTrack() call + * - create tracks + * - wait for join() call + * - Publisher.publishTracks() + * - send join state/sticky event + */ +interface Props { + scope: ObservableScope; + memberships$: Behavior; + client: MatrixClient; + roomId: string; + useOldestMember$: Behavior; +} + +/** + * This class is responsible for managing the local transport. + * "Which transport is the local member going to use" + * + * @prop useOldestMember Whether to use the same transport as the oldest member. + * This will only update once the first oldest member appears. Will not recompute if the oldest member leaves. + */ +export const localTransport$ = ({ + scope, + memberships$, + client, + roomId, + useOldestMember$, +}: Props): Behavior => { + /** + * The transport over which we should be actively publishing our media. + * undefined when not joined. + */ + const oldestMemberTransport$ = scope.behavior( + memberships$.pipe( + map((memberships) => memberships[0].getTransport(memberships[0])), + first((t) => t != undefined && isLivekitTransport(t)), + ), + undefined, + ); + + /** + * The transport that we would personally prefer to publish on (if not for the + * transport preferences of others, perhaps). + */ + const preferredTransport$: Behavior = + scope.behavior(from(makeTransport(client, roomId)), undefined); + + /** + * The transport we should advertise in our MatrixRTC membership (plus whether + * it is a multi-SFU transport and whether we should use sticky events). + */ + const advertisedTransport$ = scope.behavior( + combineLatest( + [useOldestMember$, preferredTransport$, oldestMemberTransport$], + (useOldestMember, preferredTransport, oldestMemberTransport) => + useOldestMember ? oldestMemberTransport : preferredTransport, + ).pipe(distinctUntilChanged(deepCompare)), + undefined, + ); + return advertisedTransport$; +}; + +const FOCI_WK_KEY = "org.matrix.msc4143.rtc_foci"; + +async function makeTransportInternal( + client: MatrixClient, + roomId: string, +): Promise { + logger.log("Searching for a preferred transport"); + //TODO refactor this to use the jwt service returned alias. + const livekitAlias = roomId; + + // TODO-MULTI-SFU: Either remove this dev tool or make it more official + const urlFromStorage = + localStorage.getItem("robin-matrixrtc-auth") ?? + localStorage.getItem("timo-focus-url"); + if (urlFromStorage !== null) { + const transportFromStorage: LivekitTransport = { + type: "livekit", + livekit_service_url: urlFromStorage, + livekit_alias: livekitAlias, + }; + logger.log( + "Using LiveKit transport from local storage: ", + transportFromStorage, + ); + return transportFromStorage; + } + + // Prioritize the .well-known/matrix/client, if available, over the configured SFU + const domain = client.getDomain(); + if (domain) { + // we use AutoDiscovery instead of relying on the MatrixClient having already + // been fully configured and started + const wellKnownFoci = (await AutoDiscovery.getRawClientConfig(domain))?.[ + FOCI_WK_KEY + ]; + if (Array.isArray(wellKnownFoci)) { + const transport: LivekitTransportConfig | undefined = wellKnownFoci.find( + (f) => f && isLivekitTransportConfig(f), + ); + if (transport !== undefined) { + logger.log("Using LiveKit transport from .well-known: ", transport); + return { ...transport, livekit_alias: livekitAlias }; + } + } + } + + const urlFromConf = Config.get().livekit?.livekit_service_url; + if (urlFromConf) { + const transportFromConf: LivekitTransport = { + type: "livekit", + livekit_service_url: urlFromConf, + livekit_alias: livekitAlias, + }; + logger.log("Using LiveKit transport from config: ", transportFromConf); + return transportFromConf; + } + + throw new MatrixRTCTransportMissingError(domain ?? ""); +} + +async function makeTransport( + client: MatrixClient, + roomId: string, +): Promise { + const transport = await makeTransportInternal(client, roomId); + // this will call the jwt/sfu/get endpoint to pre create the livekit room. + await getSFUConfigWithOpenID( + client, + transport.livekit_service_url, + transport.livekit_alias, + ); + return transport; +} diff --git a/src/state/ownMember/Publisher.ts b/src/state/localMember/Publisher.ts similarity index 99% rename from src/state/ownMember/Publisher.ts rename to src/state/localMember/Publisher.ts index c37445b0..6a1079fd 100644 --- a/src/state/ownMember/Publisher.ts +++ b/src/state/localMember/Publisher.ts @@ -89,7 +89,7 @@ export class Publisher { * @throws {InsufficientCapacityError} if the LiveKit server indicates that it has insufficient capacity to accept the connection. * @throws {SFURoomCreationRestrictedError} if the LiveKit server indicates that the room does not exist and cannot be created. */ - public async createAndSetupTracks(): Promise { + public async createAndSetupTracks(): Promise { const lkRoom = this.connection.livekitRoom; // Observe mute state changes and update LiveKit microphone/camera states accordingly this.observeMuteStates(this.scope); @@ -125,6 +125,7 @@ export class Publisher { video, }); } + return this.tracks; } public async startPublishing(): Promise { diff --git a/src/state/ownMember/OwnMembership.ts b/src/state/ownMember/OwnMembership.ts deleted file mode 100644 index c096c2da..00000000 --- a/src/state/ownMember/OwnMembership.ts +++ /dev/null @@ -1,287 +0,0 @@ -/* -Copyright 2025 New Vector Ltd. - -SPDX-License-IdFentifier: AGPL-3.0-only OR LicenseRef-Element-Commercial -Please see LICENSE in the repository root for full details. -*/ - -import { type E2EEOptions, type Track } from "livekit-client"; -import { - type LivekitTransport, - type MatrixRTCSession, - MembershipManagerEvent, - Status, -} from "matrix-js-sdk/lib/matrixrtc"; -import { - ClientEvent, - type MatrixClient, - SyncState, - type Room as MatrixRoom, -} from "matrix-js-sdk"; -import { - BehaviorSubject, - combineLatest, - distinctUntilChanged, - from, - fromEvent, - map, - type Observable, - of, - scan, - startWith, - switchMap, -} from "rxjs"; -import { deepCompare } from "matrix-js-sdk/lib/utils"; - -import { multiSfu } from "../../settings/settings"; -import { type Behavior } from "../Behavior"; -import { type ConnectionManager } from "../remoteMembers/ConnectionManager"; -import { makeTransport } from "../../rtcSessionHelpers"; -import { type ObservableScope } from "../ObservableScope"; -import { Publisher } from "./Publisher"; -import { type MuteStates } from "../MuteStates"; -import { type ProcessorState } from "../../livekit/TrackProcessorContext"; -import { type MediaDevices } from "../../state/MediaDevices"; -import { and$ } from "../../utils/observable"; -import { areLivekitTransportsEqual } from "../remoteMembers/matrixLivekitMerger"; -import { type ElementCallError } from "../../utils/errors.ts"; - -/* - * - get well known - * - get oldest membership - * - get transport to use - * - get openId + jwt token - * - wait for createTrack() call - * - create tracks - * - wait for join() call - * - Publisher.publishTracks() - * - send join state/sticky event - */ -interface Props { - scope: ObservableScope; - mediaDevices: MediaDevices; - muteStates: MuteStates; - connectionManager: ConnectionManager; - matrixRTCSession: MatrixRTCSession; - matrixRoom: MatrixRoom; - client: MatrixClient; - preferStickyEvents: boolean; - roomId: string; - e2eeLivekitOptions: E2EEOptions | undefined; - trackerProcessorState$: Behavior; -} - -export type JoinedState = - | { state: "Initialized" } - | { state: "Error"; error: ElementCallError }; - -/** - * This class is responsible for managing the own membership in a room. - * We want - * - a publisher - * - - * @param param0 - * @returns - * - publisher: The handle to create tracks and publish them to the room. - * - connected$: the current connection state. Including matrix server and livekit server connection. (only the livekit server relevant for our own participation) - * - transport$: the transport object the ownMembership$ ended up using. - * - */ -export const ownMembership$ = ({ - scope, - muteStates, - mediaDevices, - preferStickyEvents, - connectionManager, - matrixRTCSession, - matrixRoom, - e2eeLivekitOptions, - client, - roomId, - trackerProcessorState$, -}: Props): { - // publisher: Publisher - requestJoin$(): Observable; - startTracks(): Track[]; -} => { - // This should be used in a combineLatest with publisher$ to connect. - const shouldStartTracks$ = new BehaviorSubject(false); - - // to make it possible to call startTracks before the preferredTransport$ has resolved. - const startTracks = () => { - shouldStartTracks$.next(true); - }; - - const userId = client.getUserId()!; - const deviceId = client.getDeviceId()!; - const multiSfu$ = multiSfu.value$; - /** - * The transport that we would personally prefer to publish on (if not for the - * transport preferences of others, perhaps). - */ - const preferredTransport$: Behavior = scope.behavior( - from(makeTransport(client, roomId)), - ); - - connectionManager.registerTransports( - scope.behavior(preferredTransport$.pipe(map((t) => (t ? [t] : [])))), - ); - - const connection$ = scope.behavior( - combineLatest([connectionManager.connections$, preferredTransport$]).pipe( - map(([connections, transport]) => - connections.find((connection) => - areLivekitTransportsEqual(connection.transport, transport), - ), - ), - ), - ); - /** - * Whether we are connected to the MatrixRTC session. - */ - // DISCUSSION own membership manager - const matrixConnected$ = scope.behavior( - // To consider ourselves connected to MatrixRTC, we check the following: - and$( - // The client is connected to the sync loop - ( - fromEvent(matrixRoom.client, ClientEvent.Sync) as Observable< - [SyncState] - > - ).pipe( - startWith([matrixRoom.client.getSyncState()]), - map(([state]) => state === SyncState.Syncing), - ), - // Room state observed by session says we're connected - fromEvent(matrixRTCSession, MembershipManagerEvent.StatusChanged).pipe( - startWith(null), - map(() => matrixRTCSession.membershipStatus === Status.Connected), - ), - // Also watch out for warnings that we've likely hit a timeout and our - // delayed leave event is being sent (this condition is here because it - // provides an earlier warning than the sync loop timeout, and we wouldn't - // see the actual leave event until we reconnect to the sync loop) - fromEvent(matrixRTCSession, MembershipManagerEvent.ProbablyLeft).pipe( - startWith(null), - map(() => matrixRTCSession.probablyLeft !== true), - ), - ), - ); - - /** - * Whether we are "fully" connected to the call. Accounts for both the - * connection to the MatrixRTC session and the LiveKit publish connection. - */ - const connected$ = scope.behavior( - and$( - matrixConnected$, - connection$.pipe( - switchMap((c) => - c - ? c.state$.pipe(map((state) => state.state === "ConnectedToLkRoom")) - : of(false), - ), - ), - ), - ); - - const publisher = scope.behavior( - connection$.pipe( - map((c) => - c - ? new Publisher( - scope, - c, - mediaDevices, - muteStates, - e2eeLivekitOptions, - trackerProcessorState$, - ) - : null, - ), - ), - ); - - // HOW IT WAS PREVIEOUSLY CREATED - // new PublishConnection( - // { - // transport, - // client: this.matrixRoom.client, - // scope, - // remoteTransports$: this.remoteTransports$, - // livekitRoomFactory: this.options.livekitRoomFactory, - // }, - // this.mediaDevices, - // this.muteStates, - // this.e2eeLivekitOptions(), - // this.scope.behavior(this.trackProcessorState$), - // ), - - /** - * The transport over which we should be actively publishing our media. - * null when not joined. - */ - // DISCUSSION ownMembershipManager - const localTransport$: Behavior = - this.scope.behavior( - this.transports$.pipe( - map((transports) => transports?.local ?? null), - distinctUntilChanged(deepCompare), - ), - ); - - /** - * The transport we should advertise in our MatrixRTC membership (plus whether - * it is a multi-SFU transport and whether we should use sticky events). - */ - // DISCUSSION ownMembershipManager - const advertisedTransport$: Behavior<{ - multiSfu: boolean; - preferStickyEvents: boolean; - transport: LivekitTransport; - } | null> = this.scope.behavior( - this.transports$.pipe( - map((transports) => - transports?.local.state === "ready" && - transports.preferred.state === "ready" - ? { - multiSfu: transports.multiSfu, - preferStickyEvents: transports.preferStickyEvents, - // In non-multi-SFU mode we should always advertise the preferred - // SFU to minimize the number of membership updates - transport: transports.multiSfu - ? transports.local.value - : transports.preferred.value, - } - : null, - ), - distinctUntilChanged<{ - multiSfu: boolean; - preferStickyEvents: boolean; - transport: LivekitTransport; - } | null>(deepCompare), - ), - ); - - // MATRIX RELATED - - /** - * Whether we should tell the user that we're reconnecting to the call. - */ - // DISCUSSION own membership manager - const reconnecting$ = scope.behavior( - connected$.pipe( - // We are reconnecting if we previously had some successful initial - // connection but are now disconnected - scan( - ({ connectedPreviously }, connectedNow) => ({ - connectedPreviously: connectedPreviously || connectedNow, - reconnecting: connectedPreviously && !connectedNow, - }), - { connectedPreviously: false, reconnecting: false }, - ), - map(({ reconnecting }) => reconnecting), - ), - ); - return { connected$, transport$: preferredTransport$, publisher }; -}; diff --git a/src/state/remoteMembers/ConnectionManager.ts b/src/state/remoteMembers/ConnectionManager.ts index 239cf3c9..e333173e 100644 --- a/src/state/remoteMembers/ConnectionManager.ts +++ b/src/state/remoteMembers/ConnectionManager.ts @@ -87,29 +87,31 @@ export class ConnectionManagerData { export class ConnectionManager { private readonly logger: Logger; + private running$ = new BehaviorSubject(true); + /** + * Crete a `ConnectionManager` + * @param scope the observable scope used by this object. + * @param connectionFactory used to create new connections. + * @param _transportsSubscriptions$ A list of Behaviors each containing a LIST of LivekitTransport. + * Each of these behaviors can be interpreted as subscribed list of transports. + * + * Using `registerTransports` independent external modules can control what connections + * are created by the ConnectionManager. + * + * The connection manager will remove all duplicate transports in each subscibed list. + * + * See `unregisterAllTransports` and `unregisterTransport` for details on how to unsubscribe. + */ public constructor( private readonly scope: ObservableScope, private readonly connectionFactory: ConnectionFactory, + private readonly inputTransports$: Behavior, logger: Logger, ) { this.logger = logger.getChild("ConnectionManager"); + scope.onEnd(() => this.running$.next(false)); } - /** - * A list of Behaviors each containing a LIST of LivekitTransport. - * Each of these behaviors can be interpreted as subscribed list of transports. - * - * Using `registerTransports` independent external modules can control what connections - * are created by the ConnectionManager. - * - * The connection manager will remove all duplicate transports in each subscibed list. - * - * See `unregisterAllTransports` and `unregisterTransport` for details on how to unsubscribe. - */ - private readonly transportsSubscriptions$ = new BehaviorSubject< - Behavior[] - >([]); - /** * All transports currently managed by the ConnectionManager. * @@ -119,15 +121,10 @@ export class ConnectionManager { * externally this is modified via `registerTransports()`. */ private readonly transports$ = this.scope.behavior( - this.transportsSubscriptions$.pipe( - switchMap((subscriptions) => - combineLatest(subscriptions).pipe( - map((transportsNested) => transportsNested.flat()), - map(removeDuplicateTransports), - ), - ), + combineLatest([this.running$, this.inputTransports$]).pipe( + map(([running, transports]) => (running ? transports : [])), + map(removeDuplicateTransports), ), - [], ); /** @@ -163,60 +160,6 @@ export class ConnectionManager { ), ); - /** - * Add an a Behavior containing a list of transports to this ConnectionManager. - * - * The intended usage is: - * - create a ConnectionManager - * - register one `transports$` behavior using registerTransports - * - add new connections to the `ConnectionManager` by updating the `transports$` behavior - * - remove a single connection by removing the transport. - * - remove this subscription by calling `unregisterTransports` and passing - * the same `transports$` behavior reference. - * @param transports$ The Behavior containing a list of transports to subscribe to. - */ - public registerTransports(transports$: Behavior): void { - if (!this.transportsSubscriptions$.value.some((t$) => t$ === transports$)) { - this.transportsSubscriptions$.next( - this.transportsSubscriptions$.value.concat(transports$), - ); - } - // // After updating the subscriptions our connection list is also updated. - // return transports$.value - // .map((transport) => { - // const isConnectionForTransport = (connection: Connection): boolean => - // areLivekitTransportsEqual(connection.transport, transport); - // return this.connections$.value.find(isConnectionForTransport); - // }) - // .filter((c) => c !== undefined); - } - - /** - * Unsubscribe from the given transports. - * @param transports$ The behavior to unsubscribe from - * @returns - */ - public unregisterTransports( - transports$: Behavior, - ): boolean { - const subscriptions = this.transportsSubscriptions$.value; - const subscriptionsUnregistered = subscriptions.filter( - (t$) => t$ !== transports$, - ); - const canUnregister = - subscriptions.length !== subscriptionsUnregistered.length; - if (canUnregister) - this.transportsSubscriptions$.next(subscriptionsUnregistered); - return canUnregister; - } - - /** - * Unsubscribe from all transports. - */ - public unregisterAllTransports(): void { - this.transportsSubscriptions$.next([]); - } - public connectionManagerData$: Behavior = this.scope.behavior( this.connections$.pipe( diff --git a/src/state/remoteMembers/displayname.ts b/src/state/remoteMembers/displayname.ts index 825ad5a1..a5d1ae3d 100644 --- a/src/state/remoteMembers/displayname.ts +++ b/src/state/remoteMembers/displayname.ts @@ -6,18 +6,12 @@ Please see LICENSE in the repository root for full details. */ import { type RoomMember, RoomStateEvent } from "matrix-js-sdk"; -import { - combineLatest, - fromEvent, - map, - type Observable, - startWith, -} from "rxjs"; +import { combineLatest, fromEvent, type Observable, startWith } from "rxjs"; import { type CallMembership } from "matrix-js-sdk/lib/matrixrtc"; import { logger } from "matrix-js-sdk/lib/logger"; import { type Room as MatrixRoom } from "matrix-js-sdk/lib/matrix"; // eslint-disable-next-line rxjs/no-internal -import { type HasEventTargetAddRemove } from "rxjs/internal/observable/fromEvent"; +import { type NodeStyleEventEmitter } from "rxjs/internal/observable/fromEvent"; import { type ObservableScope } from "../ObservableScope"; import { @@ -36,20 +30,21 @@ import { type Behavior } from "../Behavior"; // don't do this work more times than we need to. This is achieved by converting to a behavior: export const memberDisplaynames$ = ( scope: ObservableScope, - matrixRoom: Pick & HasEventTargetAddRemove, + matrixRoom: Pick & NodeStyleEventEmitter, memberships$: Observable, userId: string, deviceId: string, ): Behavior> => scope.behavior( - combineLatest([ - // Handle call membership changes - memberships$, - // Additionally handle display name changes (implicitly reacting to them) - fromEvent(matrixRoom, RoomStateEvent.Members).pipe(startWith(null)), - // TODO: do we need: pauseWhen(this.pretendToBeDisconnected$), - ]).pipe( - map((memberships, _displaynames) => { + combineLatest( + [ + // Handle call membership changes + memberships$, + // Additionally handle display name changes (implicitly reacting to them) + fromEvent(matrixRoom, RoomStateEvent.Members).pipe(startWith(null)), + // TODO: do we need: pauseWhen(this.pretendToBeDisconnected$), + ], + (memberships, _displaynames) => { const displaynameMap = new Map([ [ `${userId}:${deviceId}`, @@ -76,7 +71,7 @@ export const memberDisplaynames$ = ( ); } return displaynameMap; - }), + }, ), new Map(), ); diff --git a/src/state/remoteMembers/matrixLivekitMerger.ts b/src/state/remoteMembers/matrixLivekitMerger.ts index 1487636c..39acc65b 100644 --- a/src/state/remoteMembers/matrixLivekitMerger.ts +++ b/src/state/remoteMembers/matrixLivekitMerger.ts @@ -7,13 +7,12 @@ Please see LICENSE in the repository root for full details. import { type Participant as LivekitParticipant } from "livekit-client"; import { - isLivekitTransport, type LivekitTransport, type CallMembership, } from "matrix-js-sdk/lib/matrixrtc"; import { combineLatest, map, startWith, type Observable } from "rxjs"; // eslint-disable-next-line rxjs/no-internal -import { type NodeStyleEventEmitter } from "rxjs/src/internal/observable/fromEvent.ts"; +import { type NodeStyleEventEmitter } from "rxjs/internal/observable/fromEvent"; import type { Room as MatrixRoom, RoomMember } from "matrix-js-sdk"; // import type { Logger } from "matrix-js-sdk/lib/logger"; @@ -65,7 +64,9 @@ export class MatrixLivekitMerger { public constructor( private scope: ObservableScope, - private memberships$: Observable, + private membershipsWithTransport$: Behavior< + { membership: CallMembership; transport?: LivekitTransport }[] + >, private connectionManager: ConnectionManager, // TODO this is too much information for that class, // apparently needed to get a room member to later get the Avatar @@ -90,14 +91,13 @@ export class MatrixLivekitMerger { const displaynameMap$ = memberDisplaynames$( this.scope, this.matrixRoom, - this.memberships$, + this.membershipsWithTransport$.pipe( + map((v) => v.map((v) => v.membership)), + ), this.userId, this.deviceId, ); - const membershipsWithTransport$ = - this.mapMembershipsToMembershipWithTransport$(); - - this.startFeedingConnectionManager(membershipsWithTransport$); + const membershipsWithTransport$ = this.membershipsWithTransport$; return combineLatest([ membershipsWithTransport$, @@ -138,48 +138,6 @@ export class MatrixLivekitMerger { }), ); } - - private startFeedingConnectionManager( - membershipsWithTransport$: Behavior< - { membership: CallMembership; transport?: LivekitTransport }[] - >, - ): void { - const transports$ = this.scope.behavior( - membershipsWithTransport$.pipe( - map((mts) => mts.flatMap(({ transport: t }) => (t ? [t] : []))), - ), - ); - // duplicated transports will be elimiated by the connection manager - this.connectionManager.registerTransports(transports$); - } - - /** - * Lists the transports used by ourselves, plus all other MatrixRTC session - * members. For completeness this also lists the preferred transport and - * whether we are in multi-SFU mode or sticky events mode (because - * advertisedTransport$ wants to read them at the same time, and bundling data - * together when it might change together is what you have to do in RxJS to - * avoid reading inconsistent state or observing too many changes.) - */ - private mapMembershipsToMembershipWithTransport$(): Behavior< - { membership: CallMembership; transport?: LivekitTransport }[] - > { - return this.scope.behavior( - this.memberships$.pipe( - map((memberships) => { - return memberships.map((membership) => { - const oldestMembership = memberships[0] ?? membership; - const transport = membership.getTransport(oldestMembership); - return { - membership, - transport: isLivekitTransport(transport) ? transport : undefined, - }; - }); - }), - ), - [], - ); - } } // TODO add back in the callviewmodel pauseWhen(this.pretendToBeDisconnected$)