diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 72351042..6c6301ad 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -66,6 +66,7 @@ import { logger } from "matrix-js-sdk/lib/logger"; import { type CallMembership, isLivekitFocusConfig, + LivekitFocusConfig, type MatrixRTCSession, MatrixRTCSessionEvent, type MatrixRTCSessionEventHandlerMap, @@ -490,7 +491,6 @@ class Connection { for (const track of tracks) { await this.livekitRoom.localParticipant.publishTrack(track); } - // await this.livekitRoom.localParticipant.enableCameraAndMicrophone(); } } @@ -501,17 +501,45 @@ class Connection { this.stopped = true; } - public readonly participants$ = this.scope.behavior( + public readonly participantsIncludingJustSubscribers$ = this.scope.behavior( connectedParticipantsObserver(this.livekitRoom), [], ); + public readonly publishingParticipants$ = ( + memberships$: Behavior, + ): Observable => + this.scope.behavior( + combineLatest([ + connectedParticipantsObserver(this.livekitRoom), + memberships$, + ]).pipe( + map(([participants, memberships]) => { + const publishingMembers = membershipsFocusUrl( + memberships, + this.matrixRTCSession, + ) + .filter((f) => f.livekit_service_url === this.serviceUrl) + .map((f) => f.membership); + return publishingMembers + .map((m) => + participants.find( + (p) => p.identity === `${m.sender}:${m.deviceId}`, + ), + ) + .filter((p): p is RemoteParticipant => !!p); + }), + ), + [], + ); + public constructor( private readonly livekitRoom: LivekitRoom, private readonly serviceUrl: string, private readonly livekitAlias: string, private readonly client: MatrixClient, private readonly scope: ObservableScope, + private readonly matrixRTCSession: MatrixRTCSession, ) {} } @@ -523,7 +551,7 @@ export class CallViewModel extends ViewModel { private readonly livekitAlias = getLivekitAlias(this.matrixRTCSession); - private readonly livekitRoom = new LivekitRoom({ + private readonly localConnectionLivekitRoom = new LivekitRoom({ ...defaultLiveKitOptions, e2ee: this.e2eeOptions, }); @@ -533,11 +561,12 @@ export class CallViewModel extends ViewModel { private readonly localConnection = this.localFocus.then( (focus) => new Connection( - this.livekitRoom, + this.localConnectionLivekitRoom, focus.livekit_service_url, this.livekitAlias, this.matrixRTCSession.room.client, this.scope, + this.matrixRTCSession, ), ); @@ -553,10 +582,9 @@ export class CallViewModel extends ViewModel { map( (memberships) => new Set( - memberships - .map((m) => this.matrixRTCSession.resolveActiveFocus(m)) - .filter((f) => f !== undefined && isLivekitFocusConfig(f)) - .map((f) => f.livekit_service_url), + membershipsFocusUrl(memberships, this.matrixRTCSession).map( + (f) => f.livekit_service_url, + ), ), ), ); @@ -584,6 +612,7 @@ export class CallViewModel extends ViewModel { this.livekitAlias, this.matrixRTCSession.room.client, this.scope, + this.matrixRTCSession, ), ); } @@ -698,7 +727,7 @@ export class CallViewModel extends ViewModel { private readonly remoteParticipants$ = this.scope .behavior< RemoteParticipant[] - >(combineLatest([this.localConnection, this.remoteConnections$], (localConnection, remoteConnections) => combineLatest([localConnection.participants$, ...[...remoteConnections.values()].map((c) => c.participants$)], (...ps) => ps.flat(1))).pipe(switchAll(), startWith([]))) + >(combineLatest([this.localConnection, this.remoteConnections$], (localConnection, remoteConnections) => combineLatest([localConnection.participantsIncludingJustSubscribers$, ...[...remoteConnections.values()].map((c) => c.participantsIncludingJustSubscribers$)], (...ps) => ps.flat(1))).pipe(switchAll(), startWith([]))) .pipe(pauseWhen(this.pretendToBeDisconnected$)); private readonly memberships$ = this.scope.behavior( @@ -781,7 +810,7 @@ export class CallViewModel extends ViewModel { private readonly mediaItems$ = this.scope.behavior( combineLatest([ this.remoteParticipants$, - observeParticipantMedia(this.livekitRoom.localParticipant), + observeParticipantMedia(this.localConnectionLivekitRoom.localParticipant), duplicateTiles.value$, this.memberships$, showNonMemberTiles.value$, @@ -849,7 +878,7 @@ export class CallViewModel extends ViewModel { member, participant, this.options.encryptionSystem, - this.livekitRoom, + this.localConnectionLivekitRoom, this.mediaDevices, this.pretendToBeDisconnected$, this.memberDisplaynames$.pipe( @@ -874,7 +903,7 @@ export class CallViewModel extends ViewModel { member, participant, this.options.encryptionSystem, - this.livekitRoom, + this.localConnectionLivekitRoom, this.pretendToBeDisconnected$, this.memberDisplaynames$.pipe( map((m) => m.get(matrixIdentifier) ?? "[👻]"), @@ -916,7 +945,7 @@ export class CallViewModel extends ViewModel { undefined, participant, this.options.encryptionSystem, - this.livekitRoom, + this.localConnectionLivekitRoom, this.mediaDevices, this.pretendToBeDisconnected$, this.memberDisplaynames$.pipe( @@ -1862,7 +1891,7 @@ export class CallViewModel extends ViewModel { // that our own media is displayed on screen. this.matrixConnected$.pipe(this.scope.bind()).subscribe((connected) => { const publications = - this.livekitRoom.localParticipant.trackPublications.values(); + this.localConnectionLivekitRoom.localParticipant.trackPublications.values(); if (connected) { for (const p of publications) { if (p.track?.isUpstreamPaused === true) { @@ -1904,3 +1933,22 @@ export class CallViewModel extends ViewModel { this.join(); // TODO-MULTI-SFU: Use this view model for the lobby as well, and only call this once 'join' is clicked? } } + +const membershipsFocusUrl = ( + memberships: CallMembership[], + matrixRTCSession: MatrixRTCSession, +): { livekit_service_url: string; membership: CallMembership }[] => { + return memberships + .map( + (m) => + [matrixRTCSession.resolveActiveFocus(m), m] as [ + LivekitFocusConfig | undefined, + CallMembership, + ], + ) + .filter(([f, _]) => f !== undefined && isLivekitFocusConfig(f)) + .map(([f, m]) => ({ + livekit_service_url: f!.livekit_service_url, + membership: m, + })); +};