refactor connection

Signed-off-by: Timo K <toger5@hotmail.de>
This commit is contained in:
Timo K
2025-08-28 15:32:46 +02:00
parent e4a54e3a19
commit 802ebf828d
2 changed files with 82 additions and 69 deletions

View File

@@ -63,6 +63,7 @@ import {
import { logger } from "matrix-js-sdk/lib/logger"; import { logger } from "matrix-js-sdk/lib/logger";
import { import {
type CallMembership, type CallMembership,
isLivekitFocus,
isLivekitFocusConfig, isLivekitFocusConfig,
type LivekitFocusConfig, type LivekitFocusConfig,
type MatrixRTCSession, type MatrixRTCSession,
@@ -476,11 +477,11 @@ export class CallViewModel extends ViewModel {
(focus) => (focus) =>
new PublishConnection( new PublishConnection(
this.localConnectionLivekitRoom, this.localConnectionLivekitRoom,
focus.livekit_service_url, focus,
this.livekitAlias, this.livekitAlias,
this.matrixRTCSession.room.client, this.matrixRTCSession.room.client,
this.scope, this.scope,
this.matrixRTCSession, this.membershipsAndFocusMap$,
), ),
); );
@@ -494,53 +495,67 @@ export class CallViewModel extends ViewModel {
), ),
); );
private readonly foci$ = this.memberships$.pipe( private readonly membershipsAndFocusMap$ = this.scope.behavior(
map( this.memberships$.pipe(
(memberships) => map((memberships) =>
new Set( memberships.flatMap((m) => {
membershipsFocusUrl(memberships, this.matrixRTCSession).map( const f = this.matrixRTCSession.resolveActiveFocus(m);
(f) => f.livekit_service_url, return f && isLivekitFocus(f) ? [{ membership: m, focus: f }] : [];
), }),
), ),
), ),
); );
private readonly focusServiceUrls$ = this.membershipsAndFocusMap$.pipe(
map((v) => new Set(v.map(({ focus }) => focus.livekit_service_url))),
);
private readonly remoteConnections$ = this.scope.behavior( private readonly remoteConnections$ = this.scope.behavior(
combineLatest([this.localFocus, this.foci$]).pipe( combineLatest([this.localFocus, this.focusServiceUrls$]).pipe(
accumulate(new Map<string, Connection>(), (prev, [localFocus, foci]) => { accumulate(
const stopped = new Map(prev); new Map<string, Connection>(),
const next = new Map<string, Connection>(); (prev, [localFocus, focusUrls]) => {
for (const focus of foci) { const stopped = new Map(prev);
if (focus !== localFocus.livekit_service_url) { const next = new Map<string, Connection>();
stopped.delete(focus); for (const focusUrl of focusUrls) {
if (focusUrl !== localFocus.livekit_service_url) {
stopped.delete(focusUrl);
let nextConnection = prev.get(focus); let nextConnection = prev.get(focusUrl);
if (!nextConnection) { if (!nextConnection) {
logger.log( logger.log(
"SFU remoteConnections$ construct new connection: ", "SFU remoteConnections$ construct new connection: ",
focus, focusUrl,
); );
nextConnection = new Connection( nextConnection = new Connection(
new LivekitRoom({ new LivekitRoom({
...defaultLiveKitOptions, ...defaultLiveKitOptions,
e2ee: this.e2eeOptions, e2ee: this.e2eeOptions,
}), }),
focus, {
this.livekitAlias, livekit_service_url: focusUrl,
this.matrixRTCSession.room.client, livekit_alias: this.livekitAlias,
this.scope, type: "livekit",
this.matrixRTCSession, },
); this.livekitAlias,
} else { this.matrixRTCSession.room.client,
logger.log("SFU remoteConnections$ use prev connection: ", focus); this.scope,
this.membershipsAndFocusMap$,
);
} else {
logger.log(
"SFU remoteConnections$ use prev connection: ",
focusUrl,
);
}
next.set(focusUrl, nextConnection);
} }
next.set(focus, nextConnection);
} }
}
for (const connection of stopped.values()) connection.stop(); for (const connection of stopped.values()) connection.stop();
return next; return next;
}), },
),
), ),
); );
@@ -652,11 +667,11 @@ export class CallViewModel extends ViewModel {
(localConnection, remoteConnections) => { (localConnection, remoteConnections) => {
const remoteConnectionsParticipants = [ const remoteConnectionsParticipants = [
...remoteConnections.values(), ...remoteConnections.values(),
].map((c) => c.publishingParticipants$(this.memberships$)); ].map((c) => c.publishingParticipants$);
return combineLatest( return combineLatest(
[ [
localConnection.publishingParticipants$(this.memberships$), localConnection.publishingParticipants$,
...remoteConnectionsParticipants, ...remoteConnectionsParticipants,
], ],
(...ps) => ps.flat(1), (...ps) => ps.flat(1),

View File

@@ -13,20 +13,19 @@ import {
} from "livekit-client"; } from "livekit-client";
import { type MatrixClient } from "matrix-js-sdk"; import { type MatrixClient } from "matrix-js-sdk";
import { import {
type LivekitFocus,
type CallMembership, type CallMembership,
type MatrixRTCSession,
} from "matrix-js-sdk/lib/matrixrtc"; } from "matrix-js-sdk/lib/matrixrtc";
import { combineLatest, map, type Observable } from "rxjs"; import { combineLatest, map, type Observable } from "rxjs";
import { getSFUConfigWithOpenID } from "../livekit/openIDSFU"; import { getSFUConfigWithOpenID } from "../livekit/openIDSFU";
import { type Behavior } from "./Behavior"; import { type Behavior } from "./Behavior";
import { membershipsFocusUrl } from "./CallViewModel";
import { type ObservableScope } from "./ObservableScope"; import { type ObservableScope } from "./ObservableScope";
export class Connection { export class Connection {
protected readonly sfuConfig = getSFUConfigWithOpenID( protected readonly sfuConfig = getSFUConfigWithOpenID(
this.client, this.client,
this.serviceUrl, this.focus.livekit_service_url,
this.livekitAlias, this.livekitAlias,
); );
@@ -48,42 +47,41 @@ export class Connection {
[], [],
); );
public readonly publishingParticipants$ = ( public readonly publishingParticipants$: Observable<RemoteParticipant[]> =
memberships$: Behavior<CallMembership[]>,
): Observable<RemoteParticipant[]> =>
this.scope.behavior( this.scope.behavior(
combineLatest([ combineLatest([
connectedParticipantsObserver(this.livekitRoom), connectedParticipantsObserver(this.livekitRoom),
memberships$, this.membershipsFocusMap$,
]).pipe( ]).pipe(
map(([participants, memberships]) => { map(([participants, membershipsFocusMap]) =>
const publishingMembers = membershipsFocusUrl( membershipsFocusMap
memberships, // Find all members that claim to publish on this connection
this.matrixRTCSession, .flatMap(({ membership, focus }) =>
) focus.livekit_service_url === this.focus.livekit_service_url
.filter((f) => f.livekit_service_url === this.serviceUrl) ? [membership]
.map((f) => f.membership); : [],
)
const publishingP = publishingMembers // Find all associated publishing livekit participant objects
.map((m) => { .flatMap(({ sender, deviceId }) => {
return participants.find((p) => { const participant = participants.find(
return p.identity === `${m.sender}:${m.deviceId}`; (p) => p.identity === `${sender}:${deviceId}`,
}); );
}) return participant ? [participant] : [];
.filter((p): p is RemoteParticipant => !!p); }),
return publishingP; ),
}),
), ),
[], [],
); );
public constructor( public constructor(
protected readonly livekitRoom: LivekitRoom, protected readonly livekitRoom: LivekitRoom,
protected readonly serviceUrl: string, protected readonly focus: LivekitFocus,
protected readonly livekitAlias: string, protected readonly livekitAlias: string,
protected readonly client: MatrixClient, protected readonly client: MatrixClient,
protected readonly scope: ObservableScope, protected readonly scope: ObservableScope,
protected readonly matrixRTCSession: MatrixRTCSession, protected readonly membershipsFocusMap$: Behavior<
{ membership: CallMembership; focus: LivekitFocus }[]
>,
) {} ) {}
} }