introduce publishingParticipants$

Signed-off-by: Timo K <toger5@hotmail.de>
This commit is contained in:
Timo K
2025-08-27 18:41:03 +02:00
parent 6bdfd7fbd8
commit 55b46b3f33

View File

@@ -66,6 +66,7 @@ import { logger } from "matrix-js-sdk/lib/logger";
import {
type CallMembership,
isLivekitFocusConfig,
LivekitFocusConfig,
type MatrixRTCSession,
MatrixRTCSessionEvent,
type MatrixRTCSessionEventHandlerMap,
@@ -490,7 +491,6 @@ class Connection {
for (const track of tracks) {
await this.livekitRoom.localParticipant.publishTrack(track);
}
// await this.livekitRoom.localParticipant.enableCameraAndMicrophone();
}
}
@@ -501,17 +501,45 @@ class Connection {
this.stopped = true;
}
public readonly participants$ = this.scope.behavior(
public readonly participantsIncludingJustSubscribers$ = this.scope.behavior(
connectedParticipantsObserver(this.livekitRoom),
[],
);
public readonly publishingParticipants$ = (
memberships$: Behavior<CallMembership[]>,
): Observable<RemoteParticipant[]> =>
this.scope.behavior(
combineLatest([
connectedParticipantsObserver(this.livekitRoom),
memberships$,
]).pipe(
map(([participants, memberships]) => {
const publishingMembers = membershipsFocusUrl(
memberships,
this.matrixRTCSession,
)
.filter((f) => f.livekit_service_url === this.serviceUrl)
.map((f) => f.membership);
return publishingMembers
.map((m) =>
participants.find(
(p) => p.identity === `${m.sender}:${m.deviceId}`,
),
)
.filter((p): p is RemoteParticipant => !!p);
}),
),
[],
);
public constructor(
private readonly livekitRoom: LivekitRoom,
private readonly serviceUrl: string,
private readonly livekitAlias: string,
private readonly client: MatrixClient,
private readonly scope: ObservableScope,
private readonly matrixRTCSession: MatrixRTCSession,
) {}
}
@@ -523,7 +551,7 @@ export class CallViewModel extends ViewModel {
private readonly livekitAlias = getLivekitAlias(this.matrixRTCSession);
private readonly livekitRoom = new LivekitRoom({
private readonly localConnectionLivekitRoom = new LivekitRoom({
...defaultLiveKitOptions,
e2ee: this.e2eeOptions,
});
@@ -533,11 +561,12 @@ export class CallViewModel extends ViewModel {
private readonly localConnection = this.localFocus.then(
(focus) =>
new Connection(
this.livekitRoom,
this.localConnectionLivekitRoom,
focus.livekit_service_url,
this.livekitAlias,
this.matrixRTCSession.room.client,
this.scope,
this.matrixRTCSession,
),
);
@@ -553,10 +582,9 @@ export class CallViewModel extends ViewModel {
map(
(memberships) =>
new Set(
memberships
.map((m) => this.matrixRTCSession.resolveActiveFocus(m))
.filter((f) => f !== undefined && isLivekitFocusConfig(f))
.map((f) => f.livekit_service_url),
membershipsFocusUrl(memberships, this.matrixRTCSession).map(
(f) => f.livekit_service_url,
),
),
),
);
@@ -584,6 +612,7 @@ export class CallViewModel extends ViewModel {
this.livekitAlias,
this.matrixRTCSession.room.client,
this.scope,
this.matrixRTCSession,
),
);
}
@@ -698,7 +727,7 @@ export class CallViewModel extends ViewModel {
private readonly remoteParticipants$ = this.scope
.behavior<
RemoteParticipant[]
>(combineLatest([this.localConnection, this.remoteConnections$], (localConnection, remoteConnections) => combineLatest([localConnection.participants$, ...[...remoteConnections.values()].map((c) => c.participants$)], (...ps) => ps.flat(1))).pipe(switchAll(), startWith([])))
>(combineLatest([this.localConnection, this.remoteConnections$], (localConnection, remoteConnections) => combineLatest([localConnection.participantsIncludingJustSubscribers$, ...[...remoteConnections.values()].map((c) => c.participantsIncludingJustSubscribers$)], (...ps) => ps.flat(1))).pipe(switchAll(), startWith([])))
.pipe(pauseWhen(this.pretendToBeDisconnected$));
private readonly memberships$ = this.scope.behavior(
@@ -781,7 +810,7 @@ export class CallViewModel extends ViewModel {
private readonly mediaItems$ = this.scope.behavior<MediaItem[]>(
combineLatest([
this.remoteParticipants$,
observeParticipantMedia(this.livekitRoom.localParticipant),
observeParticipantMedia(this.localConnectionLivekitRoom.localParticipant),
duplicateTiles.value$,
this.memberships$,
showNonMemberTiles.value$,
@@ -849,7 +878,7 @@ export class CallViewModel extends ViewModel {
member,
participant,
this.options.encryptionSystem,
this.livekitRoom,
this.localConnectionLivekitRoom,
this.mediaDevices,
this.pretendToBeDisconnected$,
this.memberDisplaynames$.pipe(
@@ -874,7 +903,7 @@ export class CallViewModel extends ViewModel {
member,
participant,
this.options.encryptionSystem,
this.livekitRoom,
this.localConnectionLivekitRoom,
this.pretendToBeDisconnected$,
this.memberDisplaynames$.pipe(
map((m) => m.get(matrixIdentifier) ?? "[👻]"),
@@ -916,7 +945,7 @@ export class CallViewModel extends ViewModel {
undefined,
participant,
this.options.encryptionSystem,
this.livekitRoom,
this.localConnectionLivekitRoom,
this.mediaDevices,
this.pretendToBeDisconnected$,
this.memberDisplaynames$.pipe(
@@ -1862,7 +1891,7 @@ export class CallViewModel extends ViewModel {
// that our own media is displayed on screen.
this.matrixConnected$.pipe(this.scope.bind()).subscribe((connected) => {
const publications =
this.livekitRoom.localParticipant.trackPublications.values();
this.localConnectionLivekitRoom.localParticipant.trackPublications.values();
if (connected) {
for (const p of publications) {
if (p.track?.isUpstreamPaused === true) {
@@ -1904,3 +1933,22 @@ export class CallViewModel extends ViewModel {
this.join(); // TODO-MULTI-SFU: Use this view model for the lobby as well, and only call this once 'join' is clicked?
}
}
const membershipsFocusUrl = (
memberships: CallMembership[],
matrixRTCSession: MatrixRTCSession,
): { livekit_service_url: string; membership: CallMembership }[] => {
return memberships
.map(
(m) =>
[matrixRTCSession.resolveActiveFocus(m), m] as [
LivekitFocusConfig | undefined,
CallMembership,
],
)
.filter(([f, _]) => f !== undefined && isLivekitFocusConfig(f))
.map(([f, m]) => ({
livekit_service_url: f!.livekit_service_url,
membership: m,
}));
};