diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 1977bf4a..90a1f682 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -23,7 +23,6 @@ import { type Room as MatrixRoom, RoomEvent, type RoomMember, - RoomStateEvent, SyncState, } from "matrix-js-sdk"; import { deepCompare } from "matrix-js-sdk/lib/utils"; @@ -108,7 +107,6 @@ import { type ReactionOption, } from "../reactions"; import { shallowEquals } from "../utils/array"; -import { calculateDisplayName, shouldDisambiguate } from "../utils/displayname"; import { type MediaDevices } from "./MediaDevices"; import { type Behavior, constant } from "./Behavior"; import { @@ -118,12 +116,12 @@ import { } from "../rtcSessionHelpers"; import { E2eeType } from "../e2ee/e2eeType"; import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider"; -import { type Connection, RemoteConnection } from "./remoteMembers/Connection.ts"; +import { type Connection } from "./remoteMembers/Connection.ts"; import { type MuteStates } from "./MuteStates"; import { getUrlParams } from "../UrlParams"; import { type ProcessorState } from "../livekit/TrackProcessorContext"; import { ElementWidgetActions, widget } from "../widget"; -import { PublishConnection } from "./ownMember/PublishConnection.ts"; +import { PublishConnection } from "./ownMember/Publisher.ts"; import { type Async, async$, mapAsync, ready } from "./Async"; import { sharingScreen$, UserMedia } from "./UserMedia.ts"; import { ScreenShare } from "./ScreenShare.ts"; @@ -369,57 +367,6 @@ export class CallViewModel { ), ); - /** - * Connections for each transport in use by one or more session members that - * is *distinct* from the local transport. - */ - // DISCUSSION move to ConnectionManager - private readonly remoteConnections$ = this.scope.behavior( - generateKeyed$( - this.transports$, - (transports, createOrGet) => { - const connections: Connection[] = []; - - // Until the local transport becomes ready we have no idea which - // transports will actually need a dedicated remote connection - if (transports?.local.state === "ready") { - // TODO: Handle custom transport.livekit_alias values here - const localServiceUrl = transports.local.value.livekit_service_url; - const remoteServiceUrls = new Set( - transports.remote.map( - ({ transport }) => transport.livekit_service_url, - ), - ); - remoteServiceUrls.delete(localServiceUrl); - - for (const remoteServiceUrl of remoteServiceUrls) - connections.push( - createOrGet( - remoteServiceUrl, - (scope) => - new RemoteConnection( - { - transport: { - type: "livekit", - livekit_service_url: remoteServiceUrl, - livekit_alias: this.livekitAlias, - }, - client: this.matrixRoom.client, - scope, - remoteTransports$: this.remoteTransports$, - livekitRoomFactory: this.options.livekitRoomFactory, - }, - this.e2eeLivekitOptions(), - ), - ), - ); - } - - return connections; - }, - ), - ); - /** * A list of the connections that should be active at any given time. */ diff --git a/src/state/ownMember/OwnMembership.ts b/src/state/ownMember/OwnMembership.ts index 1a5c1b24..4ba4c380 100644 --- a/src/state/ownMember/OwnMembership.ts +++ b/src/state/ownMember/OwnMembership.ts @@ -1,12 +1,29 @@ -import { Behavior } from "../Behavior"; +/* +Copyright 2025 New Vector Ltd. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { LiveKitReactNativeInfo } from "livekit-client"; +import { Behavior, constant } from "../Behavior"; +import { LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; +import { ConnectionManager } from "../remoteMembers/ConnectionManager"; const ownMembership$ = ( multiSfu: boolean, preferStickyEvents: boolean, + connectionManager: ConnectionManager, + transport: LivekitTransport, ): { connected: Behavior; transport: Behavior; } => { + const connection = connectionManager.registerTransports( + constant([transport]), + ); + const publisher = new Publisher(connection); + /** * Lists the transports used by ourselves, plus all other MatrixRTC session * members. For completeness this also lists the preferred transport and diff --git a/src/state/ownMember/PublishConnection.ts b/src/state/ownMember/Publisher.ts similarity index 100% rename from src/state/ownMember/PublishConnection.ts rename to src/state/ownMember/Publisher.ts diff --git a/src/state/remoteMembers/Connection.ts b/src/state/remoteMembers/Connection.ts index 97127a48..e815ea55 100644 --- a/src/state/remoteMembers/Connection.ts +++ b/src/state/remoteMembers/Connection.ts @@ -13,16 +13,12 @@ import { ConnectionError, type ConnectionState as LivekitConenctionState, type E2EEOptions, - type RemoteParticipant, Room as LivekitRoom, type RoomOptions, - Participant, + type Participant, + RoomEvent, } from "livekit-client"; -import { - ParticipantId, - type CallMembership, - type LivekitTransport, -} from "matrix-js-sdk/lib/matrixrtc"; +import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; import { BehaviorSubject, combineLatest, type Observable } from "rxjs"; import { type Logger } from "matrix-js-sdk/lib/logger"; @@ -55,9 +51,9 @@ export interface ConnectionOpts { * The livekit room gives access to all the users subscribing to this connection, we need * to filter out the ones that are uploading to this connection. */ - membershipsWithTransport$: Behavior< - { membership: CallMembership; transport: LivekitTransport }[] - >; + // membershipsWithTransport$: Behavior< + // { membership: CallMembership; transport: LivekitTransport }[] + // >; /** Optional factory to create the LiveKit room, mainly for testing purposes. */ livekitRoomFactory?: (options?: RoomOptions) => LivekitRoom; @@ -106,9 +102,13 @@ export class Connection { * 2. Use this token to request the SFU config to the MatrixRtc authentication service. * 3. Connect to the configured LiveKit room. * + * The errors are also represented as a state in the `state$` observable. + * It is safe to ignore those errors and handle them accordingly via the `state$` observable. * @throws {InsufficientCapacityError} if the LiveKit server indicates that it has insufficient capacity to accept the connection. * @throws {SFURoomCreationRestrictedError} if the LiveKit server indicates that the room does not exist and cannot be created. */ + // TODO dont make this throw and instead store a connection error state in this class? + // TODO consider an autostart pattern... public async start(): Promise { this.stopped = false; try { @@ -221,35 +221,21 @@ export class Connection { logger?.info( `[Connection] Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias}`, ); - const { transport, client, scope, membershipsWithTransport$ } = opts; + const { transport, client, scope } = opts; this.transport = transport; this.client = client; this.participantsWithPublishTrack$ = scope.behavior( - connectedParticipantsObserver(this.livekitRoom), - [], - ); - - // Legacy using callMemberships - this.publishingParticipants$ = scope.behavior( - combineLatest( - [this.participantsIncludingSubscribers$, membershipsWithTransport$], - (participants, remoteTransports) => - remoteTransports - // Find all members that claim to publish on this connection - .flatMap(({ membership, transport }) => - transport.livekit_service_url === - this.transport.livekit_service_url - ? [membership] - : [], - ) - // Pair with their associated LiveKit participant (if any) - .map((membership) => { - const id = `${membership.userId}:${membership.deviceId}`; - const participant = participants.find((p) => p.identity === id); - return { participant, membership }; - }), + connectedParticipantsObserver( + this.livekitRoom, + // VALR: added that while I think about it + { + additionalRoomEvents: [ + RoomEvent.TrackPublished, + RoomEvent.TrackUnpublished, + ], + }, ), [], ); diff --git a/src/state/remoteMembers/ConnectionManager.ts b/src/state/remoteMembers/ConnectionManager.ts new file mode 100644 index 00000000..311e621e --- /dev/null +++ b/src/state/remoteMembers/ConnectionManager.ts @@ -0,0 +1,219 @@ +// TODOs: +// - make ConnectionManager its own actual class + +/* +Copyright 2025 New Vector Ltd. + +SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +Please see LICENSE in the repository root for full details. +*/ + +import { + type LivekitTransport, + type ParticipantId, +} from "matrix-js-sdk/lib/matrixrtc"; +import { BehaviorSubject, combineLatest, map, switchMap } from "rxjs"; +import { type Logger } from "matrix-js-sdk/lib/logger"; +import { + type E2EEOptions, + type Room as LivekitRoom, + type Participant as LivekitParticipant, +} from "livekit-client"; +import { type MatrixClient } from "matrix-js-sdk"; + +import { type Behavior } from "../Behavior"; +import { type Connection, RemoteConnection } from "./Connection"; +import { type ObservableScope } from "../ObservableScope"; +import { generateKeyed$ } from "../../utils/observable"; +import { areLivekitTransportsEqual } from "./matrixLivekitMerger"; + +export type ParticipantByMemberIdMap = Map< + ParticipantId, + // It can be an array because a bad behaving client could be publishingParticipants$ + // multiple times to several livekit rooms. + { participant: LivekitParticipant; connection: Connection }[] +>; + +// - write test for scopes (do we really need to bind scope) +export class ConnectionManager { + /** + * The transport to use for publishing. + * This extends the list of tranports + */ + private publishTransport$ = new BehaviorSubject( + undefined, + ); + + private transportSubscriptions$ = new BehaviorSubject< + Behavior[] + >([]); + + private transports$ = this.scope.behavior( + this.transportSubscriptions$.pipe( + switchMap((subscriptions) => + combineLatest(subscriptions.map((s) => s.transports)).pipe( + map((transportsNested) => transportsNested.flat()), + map(removeDuplicateTransports), + ), + ), + ), + ); + + public constructor( + private client: MatrixClient, + private e2eeLivekitOptions: () => E2EEOptions | undefined, + private scope: ObservableScope, + private logger?: Logger, + private livekitRoomFactory?: () => LivekitRoom, + ) { + this.scope = scope; + } + + public getOrCreatePublishConnection( + transport: LivekitTransport, + ): Connection | undefined { + this.publishTransport$.next(transport); + const equalsRequestedTransport = (c: Connection): boolean => + areLivekitTransportsEqual(c.transport, transport); + return this.connections$.value.find(equalsRequestedTransport); + } + /** + * Connections for each transport in use by one or more session members. + */ + private readonly connections$ = this.scope.behavior( + generateKeyed$( + this.transports$, + (transports, createOrGet) => { + const createConnection = + ( + transport: LivekitTransport, + ): ((scope: ObservableScope) => RemoteConnection) => + (scope) => { + const connection = new RemoteConnection( + { + transport, + client: this.client, + scope: scope, + livekitRoomFactory: this.livekitRoomFactory, + }, + this.e2eeLivekitOptions(), + ); + void connection.start(); + return connection; + }; + + const connections = transports.map((transport) => { + const key = + transport.livekit_service_url + "|" + transport.livekit_alias; + return createOrGet(key, createConnection(transport)); + }); + + return connections; + }, + ), + ); + + /** + * + * @param transports$ + */ + public registerTransports( + transports$: Behavior, + ): Connection[] { + if (!this.transportSubscriptions$.value.some((t$) => t$ === transports$)) { + this.transportSubscriptions$.next( + this.transportSubscriptions$.value.concat(transports$), + ); + } + // After updating the subscriptions our connection list is also updated. + return transports$.value + .map((transport) => { + const isConnectionForTransport = (connection: Connection): boolean => + areLivekitTransportsEqual(connection.transport, transport); + return this.connections$.value.find(isConnectionForTransport); + }) + .filter((c) => c !== undefined); + } + + public unregisterTransports( + transports$: Behavior, + ): boolean { + const subscriptions = this.transportSubscriptions$.value; + const subscriptionsUnregistered = subscriptions.filter( + (t$) => t$ !== transports$, + ); + const canUnregister = + subscriptions.length !== subscriptionsUnregistered.length; + if (canUnregister) + this.transportSubscriptions$.next(subscriptionsUnregistered); + return canUnregister; + } + + public unregisterAllTransports(): void { + this.transportSubscriptions$.next([]); + } + + // We have a lost of connections, for each of these these + // connection we create a stream of (participant, connection) tuples. + // Then we combine the several streams (1 per Connection) into a single stream of tuples. + private allParticipantsWithConnection$ = this.scope.behavior( + this.connections$.pipe( + switchMap((connections) => { + const listsOfParticipantWithConnection = connections.map( + (connection) => { + return connection.participantsWithPublishTrack$.pipe( + map((participants) => + participants.map((p) => ({ + participant: p, + connection, + })), + ), + ); + }, + ); + return combineLatest(listsOfParticipantWithConnection).pipe( + map((lists) => lists.flatMap((list) => list)), + ); + }), + ), + ); + + // Filters the livekit participants + public allParticipantsByMemberId$ = this.scope.behavior( + this.allParticipantsWithConnection$.pipe( + map((participantsWithConnections) => { + const participantsByMemberId = participantsWithConnections.reduce( + (acc, test) => { + const { participant, connection } = test; + if (participant.getTrackPublications().length > 0) { + const currentVal = acc.get(participant.identity); + if (!currentVal) { + acc.set(participant.identity, [{ connection, participant }]); + } else { + // already known + // This is user is publishing on several SFUs + currentVal.push({ connection, participant }); + this.logger?.info( + `Participant ${participant.identity} is publishing on several SFUs ${currentVal.join()}`, + ); + } + } + return acc; + }, + new Map() as ParticipantByMemberIdMap, + ); + + return participantsByMemberId; + }), + ), + ); +} +function removeDuplicateTransports( + transports: LivekitTransport[], +): LivekitTransport[] { + return transports.reduce((acc, transport) => { + if (!acc.some((t) => areLivekitTransportsEqual(t, transport))) + acc.push(transport); + return acc; + }, [] as LivekitTransport[]); +} diff --git a/src/state/remoteMembers/matrixLivekitMerger.ts b/src/state/remoteMembers/matrixLivekitMerger.ts index 4cd68663..eb33f5a5 100644 --- a/src/state/remoteMembers/matrixLivekitMerger.ts +++ b/src/state/remoteMembers/matrixLivekitMerger.ts @@ -6,54 +6,22 @@ Please see LICENSE in the repository root for full details. */ import { - LocalParticipant, - Participant, - RemoteParticipant, + type RemoteParticipant, type Participant as LivekitParticipant, - type Room as LivekitRoom, } from "livekit-client"; import { - type MatrixRTCSession, - MatrixRTCSessionEvent, - type CallMembership, - type Transport, - LivekitTransport, isLivekitTransport, - ParticipantId, + type LivekitTransport, + type CallMembership, } from "matrix-js-sdk/lib/matrixrtc"; -import { - combineLatest, - map, - startWith, - switchMap, - type Observable, -} from "rxjs"; +import { combineLatest, map, startWith, type Observable } from "rxjs"; +import type { Room as MatrixRoom, RoomMember } from "matrix-js-sdk"; +// import type { Logger } from "matrix-js-sdk/lib/logger"; +import { type Behavior } from "../Behavior"; import { type ObservableScope } from "../ObservableScope"; -import { type Connection } from "./Connection"; -import { Behavior, constant } from "../Behavior"; -import { Room as MatrixRoom, RoomMember } from "matrix-js-sdk"; +import { type ConnectionManager } from "./ConnectionManager"; import { getRoomMemberFromRtcMember } from "./displayname"; -import { pauseWhen } from "../../utils/observable"; -import { Logger } from "matrix-js-sdk/lib/logger"; - -// TODOs: -// - make ConnectionManager its own actual class -// - write test for scopes (do we really need to bind scope) -class ConnectionManager { - public setTansports(transports$: Behavior): void {} - public readonly connections$: Observable = constant([]); - // connection is used to find the transport (to find matching callmembership) & for the livekitRoom - public readonly participantsByMemberId$: Behavior = - constant(new Map()); -} - -export type ParticipantByMemberIdMap = Map< - ParticipantId, - // It can be an array because a bad behaving client could be publishingParticipants$ - // multiple times to several livekit rooms. - { participant: LivekitParticipant; connection: Connection }[] ->; /** * Represents participant publishing or expected to publish on the connection. @@ -86,21 +54,6 @@ export interface MatrixLivekitItem { // Alternative structure idea: // const livekitMatrixItems$ = (callMemberships$,connectionManager,scope): Observable => { -interface LivekitRoomWithParticipants { - livekitRoom: LivekitRoom; - url: string; // Included for use as a React key - participants: { - // What id is that?? - // Looks like it userId:Deviceid? - id: string; - participant: LocalParticipant | RemoteParticipant | undefined; - // Why do we fetch a full room member here? - // looks like it is only for avatars? - // TODO: Remove that. have some Avatar Provider that can fetch avatar for user ids. - member: RoomMember; - }[]; -} - /** * Combines MatrixRtc and Livekit worlds. * @@ -112,9 +65,13 @@ interface LivekitRoomWithParticipants { * - `remoteMatrixLivekitItems` an observable of MatrixLivekitItem[] to track the remote members and associated livekit data. */ export class MatrixLivekitMerger { - private readonly logger: Logger; + /** + * Stream of all the call members and their associated livekit data (if available). + */ + public matrixLivekitItems$: Behavior; + + // private readonly logger: Logger; - public constructor( private memberships$: Observable, private connectionManager: ConnectionManager, @@ -123,10 +80,64 @@ export class MatrixLivekitMerger { // apparently needed to get a room member to later get the Avatar // => Extract an AvatarService instead? private matrixRoom: MatrixRoom, - parentLogger: Logger, + // parentLogger: Logger, ) { - this.logger = parentLogger.createChildLogger("MatrixLivekitMerger"); - connectionManager.setTansports(this.transports$); + // this.logger = parentLogger.getChild("MatrixLivekitMerger"); + + this.matrixLivekitItems$ = this.scope.behavior( + this.start$().pipe(startWith([])), + ); + } + + // ======================================= + /// PRIVATES + // ======================================= + private start$(): Observable { + const membershipsWithTransport$ = + this.mapMembershipsToMembershipWithTransport$(); + + this.startFeedingConnectionManager(membershipsWithTransport$); + + return combineLatest([ + membershipsWithTransport$, + this.connectionManager.allParticipantsByMemberId$, + ]).pipe( + map(([memberships, participantsByMemberId]) => { + const items = memberships.map(({ membership, transport }) => { + const participantsWithConnection = participantsByMemberId.get( + membership.membershipID, + ); + const participant = + transport && + participantsWithConnection?.find((p) => + areLivekitTransportsEqual(p.connection.transport, transport), + ); + return { + livekitParticipant: participant, + membership, + // This makes sense to add the the js-sdk callMembership (we only need the avatar so probably the call memberhsip just should aquire the avatar) + member: + // Why a member error? if we have a call membership there is a room member + getRoomMemberFromRtcMember(membership, this.matrixRoom)?.member, + } as MatrixLivekitItem; + }); + return items; + }), + ); + } + + private startFeedingConnectionManager( + membershipsWithTransport$: Behavior< + { membership: CallMembership; transport?: LivekitTransport }[] + >, + ): void { + const transports$ = this.scope.behavior( + membershipsWithTransport$.pipe( + map((mts) => mts.flatMap(({ transport: t }) => (t ? [t] : []))), + ), + ); + // duplicated transports will be elimiated by the connection manager + this.connectionManager.registerTransports(transports$); } /** @@ -137,127 +148,30 @@ export class MatrixLivekitMerger { * together when it might change together is what you have to do in RxJS to * avoid reading inconsistent state or observing too many changes.) */ - private readonly membershipsWithTransport$ = this.scope.behavior( - this.memberships$.pipe( - map((memberships) => { - return memberships.map((membership) => { - const oldestMembership = memberships[0] ?? membership; - const transport = membership.getTransport(oldestMembership); - return { - membership, - transport: isLivekitTransport(transport) ? transport : undefined, - }; - }); - }), - ), - ); - - private readonly transports$ = this.scope.behavior( - this.membershipsWithTransport$.pipe( - map((membershipsWithTransport) => - membershipsWithTransport.reduce((acc, { transport }) => { - if ( - transport && - !acc.some((t) => areLivekitTransportsEqual(t, transport)) - ) { - acc.push(transport); - } - return acc; - }, [] as LivekitTransport[]), - ), - ), - ); - - // TODO move this over this the connection manager - // We have a lost of connections, for each of these these - // connection we create a stream of (participant, connection) tuples. - // Then we combine the several streams (1 per Connection) into a single stream of tuples. - private participantsWithConnection$ = - this.connectionManager.connections$.pipe( - switchMap((connections) => { - const listsOfParticipantWithConnection = connections.map( - (connection) => { - return connection.participantsWithPublishTrack$.pipe( - map((participants) => - participants.map((p) => ({ - participant: p, - connection, - })), - ), - ); - }, - ); - return combineLatest(listsOfParticipantWithConnection).pipe( - map((lists) => lists.flatMap((list) => list)), - ); - }), - ); - - // TODO move this over this the connection manager - // Filters the livekit partic - private participantsByMemberId$ = this.participantsWithConnection$.pipe( - map((participantsWithConnections) => { - const participantsByMemberId = participantsWithConnections.reduce( - (acc, test) => { - const { participant, connection } = test; - if (participant.getTrackPublications().length > 0) { - const currentVal = acc.get(participant.identity); - if (!currentVal) { - acc.set(participant.identity, [{ connection, participant }]); - } else { - // already known - // This is user is publishing on several SFUs - currentVal.push({ connection, participant }); - this.logger.info( - `Participant ${participant.identity} is publishing on several SFUs ${currentVal.join()}`, - ); - } - } - return acc; - }, - new Map() as ParticipantByMemberIdMap, - ); - - return participantsByMemberId; - }), - ); - - public readonly matrixLivekitItems$ = this.scope - .behavior( - combineLatest([ - this.membershipsWithTransport$, - this.participantsByMemberId$, - ]).pipe( - map(([memberships, participantsByMemberId]) => { - const items = memberships.map(({ membership, transport }) => { - const participantsWithConnection = participantsByMemberId.get( - membership.membershipID, - ); - const participant = - transport && - participantsWithConnection?.find((p) => - areLivekitTransportsEqual(p.connection.transport, transport), - ); + private mapMembershipsToMembershipWithTransport$(): Observable< + { membership: CallMembership; transport?: LivekitTransport }[] + > { + return this.scope.behavior( + this.memberships$.pipe( + map((memberships) => { + return memberships.map((membership) => { + const oldestMembership = memberships[0] ?? membership; + const transport = membership.getTransport(oldestMembership); return { - livekitParticipant: participant, membership, - // This makes sense to add the the js-sdk callMembership (we only need the avatar so probably the call memberhsip just should aquire the avatar) - member: - // Why a member error? if we have a call membership there is a room member - getRoomMemberFromRtcMember(membership, this.matrixRoom)?.member, - } as MatrixLivekitItem; + transport: isLivekitTransport(transport) ? transport : undefined, + }; }); - return items; }), ), - ) - .pipe(startWith([])); + ); + } } // TODO add back in the callviewmodel pauseWhen(this.pretendToBeDisconnected$) // TODO add this to the JS-SDK -function areLivekitTransportsEqual( +export function areLivekitTransportsEqual( t1: LivekitTransport, t2: LivekitTransport, ): boolean {