diff --git a/src/state/remoteMembers/matrixLivekitMerger.ts b/src/state/remoteMembers/matrixLivekitMerger.ts index 935f36cb..ef2fb852 100644 --- a/src/state/remoteMembers/matrixLivekitMerger.ts +++ b/src/state/remoteMembers/matrixLivekitMerger.ts @@ -31,17 +31,18 @@ import { import { type ObservableScope } from "../ObservableScope"; import { type Connection } from "./Connection"; -import { Behavior } from "../Behavior"; -import { RoomMember } from "matrix-js-sdk"; +import { Behavior, constant } from "../Behavior"; +import { Room as MatrixRoom, RoomMember } from "matrix-js-sdk"; import { getRoomMemberFromRtcMember } from "./displayname"; - +import { pauseWhen } from "../../utils/observable"; // TODOs: // - make ConnectionManager its own actual class // - write test for scopes (do we really need to bind scope) class ConnectionManager { constructor(transports$: Observable) {} - public readonly connections$: Observable; + public startWithMemberships(memberships$: Behavior) {} + public readonly connections$: Observable = constant([]); } /** @@ -51,16 +52,14 @@ class ConnectionManager { */ export interface MatrixLivekitItem { - callMembership: CallMembership; + membership: CallMembership; livekitParticipant?: LivekitParticipant; + member?: RoomMember; } // Alternative structure idea: // const livekitMatrixItems$ = (callMemberships$,connectionManager,scope): Observable => { - // Map of Connection -> to (callMembership, LivekitParticipant?)) -type participants = {participant: LocalParticipant | RemoteParticipant}[] - interface LivekitRoomWithParticipants { livekitRoom: LivekitRoom; url: string; // Included for use as a React key @@ -87,14 +86,12 @@ interface LivekitRoomWithParticipants { * - `remoteMatrixLivekitItems` an observable of MatrixLivekitItem[] to track the remote members and associated livekit data. */ export class MatrixLivekitMerger { - public remoteMatrixLivekitItems$: Observable; - /** * The MatrixRTC session participants. */ // Note that MatrixRTCSession already filters the call memberships by users // that are joined to the room; we don't need to perform extra filtering here. - private readonly memberships$ = this.scope.behavior( + public readonly memberships$ = this.scope.behavior( fromEvent( this.matrixRTCSession, MatrixRTCSessionEvent.MembershipsChanged, @@ -108,16 +105,9 @@ export class MatrixLivekitMerger { private matrixRTCSession: MatrixRTCSession, private connectionManager: ConnectionManager, private scope: ObservableScope, + private matrixRoom: MatrixRoom, ) { - const publishingParticipants$ = combineLatest([ - this.memberships$, - connectionManager.connections$, - ]).pipe(map(), this.scope.bind()); - this.remoteMatrixLivekitItems$ = combineLatest([ - callMemberships$, - connectionManager.connections$, - ]).pipe(this.scope.bind()); - // Implementation goes here + connectionManager.startWithMemberships(this.memberships$); } /** @@ -128,6 +118,7 @@ export class MatrixLivekitMerger { * together when it might change together is what you have to do in RxJS to * avoid reading inconsistent state or observing too many changes.) */ + // TODO pass this over to our conncetions private readonly membershipsWithTransport$: Behavior<{ membership: CallMembership; transport?: LivekitTransport; @@ -137,63 +128,47 @@ export class MatrixLivekitMerger { const oldestMembership = this.matrixRTCSession.getOldestMembership(); memberships.map((membership) => { - let transport = membership.getTransport(oldestMembership ?? membership) - return { membership, transport: isLivekitTransport(transport) ? transport : undefined }; - }) + let transport = membership.getTransport( + oldestMembership ?? membership, + ); + return { + membership, + transport: isLivekitTransport(transport) ? transport : undefined, + }; + }); }), ), ); - /** - * Lists the transports used by each MatrixRTC session member other than - * ourselves. - */ - // private readonly remoteTransports$ = this.scope.behavior( - // this.membershipsWithTransport$.pipe( - // map((transports) => transports?.remote ?? []), - // ), - // ); - - /** - * Lists, for each LiveKit room, the LiveKit participants whose media should - * be presented. - */ - private readonly participantsByRoom$ = this.scope.behavior( - // TODO: Move this logic into Connection/PublishConnection if possible - - this.connectionManager.connections$.pipe( - switchMap((connections) => { - connections.map((c)=>c.publishingParticipants$.pipe( - map((publishingParticipants) => { - const participants: { - id: string; - participant: LivekitParticipant | undefined; - member: RoomMember; - }[] = publishingParticipants.map(({ participant, membership }) => ({ - // TODO update to UUID - id: `${membership.userId}:${membership.deviceId}`, - participant, - // This makes sense to add the the js-sdk callMembership (we only need the avatar so probably the call memberhsip just should aquire the avatar) - member: - getRoomMemberFromRtcMember( - membership, - this.matrixRoom, - )?.member ?? memberError(), - })); - - return { - livekitRoom: c.livekitRoom, - url: c.transport.livekit_service_url, - participants, - }; - }), - ), - ), - ), - ), - ); - }), - ) - .pipe(startWith([]), pauseWhen(this.pretendToBeDisconnected$)), + private allPublishingParticipants$ = this.connectionManager.connections$.pipe( + switchMap((connections) => { + const listOfPublishingParticipants = connections.map( + (connection) => connection.publishingParticipants$, + ); + return combineLatest(listOfPublishingParticipants).pipe( + map((list) => list.flatMap((innerList) => innerList)), + ); + }), ); + + public readonly matrixLivekitItems$ = this.scope + .behavior( + this.allPublishingParticipants$.pipe( + map((participants) => { + const matrixLivekitItems: MatrixLivekitItem[] = participants.map( + ({ participant, membership }) => ({ + participant, + membership, + id: `${membership.userId}:${membership.deviceId}`, + // This makes sense to add the the js-sdk callMembership (we only need the avatar so probably the call memberhsip just should aquire the avatar) + member: + getRoomMemberFromRtcMember(membership, this.matrixRoom) + ?.member ?? memberError(), + }), + ); + return matrixLivekitItems; + }), + ), + ) + .pipe(startWith([]), pauseWhen(this.pretendToBeDisconnected$)); }