refactor: Remote / Publish Connection and constructor

This commit is contained in:
Valere
2025-09-30 17:02:48 +02:00
parent edf68d16b7
commit b00f7d5409
4 changed files with 104 additions and 93 deletions

View File

@@ -31,7 +31,6 @@ import {
VolumeOnSolidIcon, VolumeOnSolidIcon,
} from "@vector-im/compound-design-tokens/assets/web/icons"; } from "@vector-im/compound-design-tokens/assets/web/icons";
import { useTranslation } from "react-i18next"; import { useTranslation } from "react-i18next";
import { ConnectionState } from "livekit-client";
import LogoMark from "../icons/LogoMark.svg?react"; import LogoMark from "../icons/LogoMark.svg?react";
import LogoType from "../icons/LogoType.svg?react"; import LogoType from "../icons/LogoType.svg?react";

View File

@@ -122,7 +122,7 @@ import {
} from "../rtcSessionHelpers"; } from "../rtcSessionHelpers";
import { E2eeType } from "../e2ee/e2eeType"; import { E2eeType } from "../e2ee/e2eeType";
import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider"; import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider";
import { Connection } from "./Connection"; import { type Connection, type ConnectionOpts, RemoteConnection } from "./Connection";
import { type MuteStates } from "./MuteStates"; import { type MuteStates } from "./MuteStates";
import { getUrlParams } from "../UrlParams"; import { getUrlParams } from "../UrlParams";
import { type ProcessorState } from "../livekit/TrackProcessorContext"; import { type ProcessorState } from "../livekit/TrackProcessorContext";
@@ -453,18 +453,21 @@ export class CallViewModel extends ViewModel {
private readonly localFocus = makeFocus(this.matrixRTCSession); private readonly localFocus = makeFocus(this.matrixRTCSession);
private readonly localConnection = this.localFocus.then( private readonly localConnection = this.localFocus.then(
(focus) => (focus) => {
new PublishConnection( const args: ConnectionOpts = {
focus, focus,
this.livekitAlias, client: this.matrixRTCSession.room.client,
this.matrixRTCSession.room.client, scope: this.scope,
this.scope, membershipsFocusMap$: this.membershipsAndFocusMap$,
this.membershipsAndFocusMap$, }
return new PublishConnection(
args,
this.mediaDevices, this.mediaDevices,
this.muteStates, this.muteStates,
this.e2eeLivekitOptions(), this.e2eeLivekitOptions(),
this.scope.behavior(this.trackProcessorState$), this.scope.behavior(this.trackProcessorState$),
), )
}
); );
public readonly livekitConnectionState$ = this.scope.behavior( public readonly livekitConnectionState$ = this.scope.behavior(
@@ -521,18 +524,17 @@ export class CallViewModel extends ViewModel {
"SFU remoteConnections$ construct new connection: ", "SFU remoteConnections$ construct new connection: ",
focusUrl, focusUrl,
); );
nextConnection = new Connection( const args: ConnectionOpts = {
{ focus: {
type: "livekit",
livekit_service_url: focusUrl, livekit_service_url: focusUrl,
livekit_alias: this.livekitAlias, livekit_alias: this.livekitAlias,
type: "livekit",
}, },
this.livekitAlias, client: this.matrixRTCSession.room.client,
this.matrixRTCSession.room.client, scope: this.scope,
this.scope, membershipsFocusMap$: this.membershipsAndFocusMap$,
this.membershipsAndFocusMap$, }
this.e2eeLivekitOptions(), nextConnection = new RemoteConnection(args, this.e2eeLivekitOptions());
);
} else { } else {
logger.log( logger.log(
"SFU remoteConnections$ use prev connection: ", "SFU remoteConnections$ use prev connection: ",

View File

@@ -7,15 +7,24 @@ Please see LICENSE in the repository root for full details.
import { connectedParticipantsObserver, connectionStateObserver } from "@livekit/components-core"; import { connectedParticipantsObserver, connectionStateObserver } from "@livekit/components-core";
import { type ConnectionState, type E2EEOptions, Room as LivekitRoom } from "livekit-client"; import { type ConnectionState, type E2EEOptions, Room as LivekitRoom } from "livekit-client";
import { type MatrixClient } from "matrix-js-sdk";
import { type CallMembership, type LivekitFocus } from "matrix-js-sdk/lib/matrixrtc"; import { type CallMembership, type LivekitFocus } from "matrix-js-sdk/lib/matrixrtc";
import { combineLatest } from "rxjs"; import { combineLatest } from "rxjs";
import { getSFUConfigWithOpenID } from "../livekit/openIDSFU"; import { getSFUConfigWithOpenID, type OpenIDClientParts, type SFUConfig } from "../livekit/openIDSFU";
import { type Behavior } from "./Behavior"; import { type Behavior } from "./Behavior";
import { type ObservableScope } from "./ObservableScope"; import { type ObservableScope } from "./ObservableScope";
import { defaultLiveKitOptions } from "../livekit/options"; import { defaultLiveKitOptions } from "../livekit/options";
export interface ConnectionOpts {
/** The focus server to connect to. */
focus: LivekitFocus;
/** The Matrix client to use for OpenID and SFU config requests. */
client: OpenIDClientParts;
/** The observable scope to use for this connection. */
scope: ObservableScope;
/** An observable of the current RTC call memberships and their associated focus. */
membershipsFocusMap$: Behavior<{ membership: CallMembership; focus: LivekitFocus }[]>;
}
/** /**
* A connection to a Matrix RTC LiveKit backend. * A connection to a Matrix RTC LiveKit backend.
* *
@@ -39,10 +48,20 @@ export class Connection {
*/ */
public async start(): Promise<void> { public async start(): Promise<void> {
this.stopped = false; this.stopped = false;
const { url, jwt } = await this.sfuConfig; // TODO could this be loaded earlier to save time?
const { url, jwt } = await this.getSFUConfigWithOpenID();
if (!this.stopped) await this.livekitRoom.connect(url, jwt); if (!this.stopped) await this.livekitRoom.connect(url, jwt);
} }
protected async getSFUConfigWithOpenID(): Promise<SFUConfig> {
return await getSFUConfigWithOpenID(
this.client,
this.targetFocus.livekit_service_url,
this.targetFocus.livekit_alias
)
}
/** /**
* Stops the connection. * Stops the connection.
* *
@@ -55,17 +74,6 @@ export class Connection {
this.stopped = true; this.stopped = true;
} }
protected readonly sfuConfig = getSFUConfigWithOpenID(
this.client,
this.focus.livekit_service_url,
this.focus.livekit_alias
);
/*
* An observable of the participants in the livekit room, including subscribers.
* Converts the livekit room events ParticipantConnected/ParticipantDisconnected/StateChange to an observable.
*/
protected readonly participantsIncludingSubscribers$;
/** /**
* An observable of the participants that are publishing on this connection. * An observable of the participants that are publishing on this connection.
@@ -75,9 +83,9 @@ export class Connection {
public readonly publishingParticipants$; public readonly publishingParticipants$;
/** /**
* The LiveKit room instance. * The focus server to connect to.
*/ */
public readonly livekitRoom: LivekitRoom; protected readonly targetFocus: LivekitFocus;
/** /**
* An observable of the livekit connection state. * An observable of the livekit connection state.
@@ -85,48 +93,39 @@ export class Connection {
*/ */
public connectionState$: Behavior<ConnectionState>; public connectionState$: Behavior<ConnectionState>;
private readonly client: OpenIDClientParts;
/** /**
* Creates a new connection to a matrix RTC LiveKit backend. * Creates a new connection to a matrix RTC LiveKit backend.
* *
* @param livekitRoom - Optional LiveKit room instance to use. If not provided, a new instance will be created. * @param livekitRoom - LiveKit room instance to use.
* @param focus - The focus server to connect to. * @param opts - Connection options {@link ConnectionOpts}.
* @param livekitAlias - The livekit alias to use when connecting to the focus server. TODO duplicate of focus? *
* @param client - The matrix client, used to fetch the OpenId token. TODO refactor to avoid passing the whole client
* @param scope - The observable scope to use for creating observables.
* @param membershipsFocusMap$ - The observable of the current call RTC memberships and their associated focus.
* @param e2eeLivekitOptions - The E2EE options to use for the LiveKit room. Use to share the same key provider across connections!. TODO refactor to avoid passing the whole options?
*/ */
public constructor( protected constructor(
protected readonly focus: LivekitFocus, public readonly livekitRoom: LivekitRoom,
// TODO : remove livekitAlias, it's already in focus? opts: ConnectionOpts,
protected readonly livekitAlias: string,
protected readonly client: MatrixClient,
protected readonly scope: ObservableScope,
protected readonly membershipsFocusMap$: Behavior<
{ membership: CallMembership; focus: LivekitFocus }[]
>,
e2eeLivekitOptions: E2EEOptions | undefined,
livekitRoom: LivekitRoom | undefined = undefined
) { ) {
this.livekitRoom = const { focus, client, scope, membershipsFocusMap$ } =
livekitRoom ?? opts;
new LivekitRoom({
...defaultLiveKitOptions, this.livekitRoom = livekitRoom
e2ee: e2eeLivekitOptions this.targetFocus = focus;
}); this.client = client;
this.participantsIncludingSubscribers$ = this.scope.behavior(
const participantsIncludingSubscribers$ = scope.behavior(
connectedParticipantsObserver(this.livekitRoom), connectedParticipantsObserver(this.livekitRoom),
[] []
); );
this.publishingParticipants$ = this.scope.behavior( this.publishingParticipants$ = scope.behavior(
combineLatest( combineLatest(
[this.participantsIncludingSubscribers$, this.membershipsFocusMap$], [participantsIncludingSubscribers$, membershipsFocusMap$],
(participants, membershipsFocusMap) => (participants, membershipsFocusMap) =>
membershipsFocusMap membershipsFocusMap
// Find all members that claim to publish on this connection // Find all members that claim to publish on this connection
.flatMap(({ membership, focus }) => .flatMap(({ membership, focus }) =>
focus.livekit_service_url === this.focus.livekit_service_url focus.livekit_service_url === this.targetFocus.livekit_service_url
? [membership] ? [membership]
: [] : []
) )
@@ -141,11 +140,32 @@ export class Connection {
), ),
[] []
); );
this.connectionState$ = this.scope.behavior<ConnectionState>( this.connectionState$ = scope.behavior<ConnectionState>(
connectionStateObserver(this.livekitRoom) connectionStateObserver(this.livekitRoom)
); );
this.scope.onEnd(() => this.stop()); scope.onEnd(() => this.stop());
} }
} }
/**
* A remote connection to the Matrix RTC LiveKit backend.
*
* This connection is used for subscribing to remote participants.
* It does not publish any local tracks.
*/
export class RemoteConnection extends Connection {
/**
* Creates a new remote connection to a matrix RTC LiveKit backend.
* @param opts
* @param sharedE2eeOption - The shared E2EE options to use for the connection.
*/
public constructor(opts: ConnectionOpts, sharedE2eeOption: E2EEOptions | undefined) {
const livekitRoom = new LivekitRoom({
...defaultLiveKitOptions,
e2ee: sharedE2eeOption
});
super(livekitRoom, opts);
}
}

View File

@@ -7,9 +7,6 @@ Please see LICENSE in the repository root for full details.
import { ConnectionState, type E2EEOptions, LocalVideoTrack, Room as LivekitRoom, Track } from "livekit-client"; import { ConnectionState, type E2EEOptions, LocalVideoTrack, Room as LivekitRoom, Track } from "livekit-client";
import { map, NEVER, type Observable, type Subscription, switchMap } from "rxjs"; import { map, NEVER, type Observable, type Subscription, switchMap } from "rxjs";
import type { CallMembership, LivekitFocus } from "../../../matrix-js-sdk/lib/matrixrtc";
import type { MatrixClient } from "../../../matrix-js-sdk";
import type { ObservableScope } from "./ObservableScope.ts";
import type { Behavior } from "./Behavior.ts"; import type { Behavior } from "./Behavior.ts";
import type { MediaDevices, SelectedDevice } from "./MediaDevices.ts"; import type { MediaDevices, SelectedDevice } from "./MediaDevices.ts";
import type { MuteStates } from "./MuteStates.ts"; import type { MuteStates } from "./MuteStates.ts";
@@ -19,7 +16,7 @@ import { getUrlParams } from "../UrlParams.ts";
import { defaultLiveKitOptions } from "../livekit/options.ts"; import { defaultLiveKitOptions } from "../livekit/options.ts";
import { getValue } from "../utils/observable.ts"; import { getValue } from "../utils/observable.ts";
import { observeTrackReference$ } from "./MediaViewModel.ts"; import { observeTrackReference$ } from "./MediaViewModel.ts";
import { Connection } from "./Connection.ts"; import { Connection, type ConnectionOpts } from "./Connection.ts";
/** /**
* A connection to the publishing LiveKit.e. the local livekit room, the one the user is publishing to. * A connection to the publishing LiveKit.e. the local livekit room, the one the user is publishing to.
@@ -39,8 +36,8 @@ export class PublishConnection extends Connection {
*/ */
public async start(): Promise<void> { public async start(): Promise<void> {
this.stopped = false; this.stopped = false;
const { url, jwt } = await this.sfuConfig;
if (!this.stopped) await this.livekitRoom.connect(url, jwt); await super.start()
if (!this.stopped) { if (!this.stopped) {
// TODO this can throw errors? It will also prompt for permissions if not already granted // TODO this can throw errors? It will also prompt for permissions if not already granted
@@ -60,29 +57,20 @@ export class PublishConnection extends Connection {
/** /**
* Creates a new PublishConnection. * Creates a new PublishConnection.
* @param focus - The Livekit focus object containing the configuration for the connection. * @param args - The connection options. {@link ConnectionOpts}
* @param livekitAlias - TODO: remove, use focus.livekit_alias instead
* @param client - The Matrix client to use for authentication. TODO: remove only pick OpenIDClientParts
* @param scope - The observable scope to use for managing subscriptions.
* @param membershipsFocusMap$ - An observable of the current RTC call memberships and their associated focus.
* @param devices - The media devices to use for audio and video input. * @param devices - The media devices to use for audio and video input.
* @param muteStates - The mute states for audio and video. * @param muteStates - The mute states for audio and video.
* @param e2eeLivekitOptions - The E2EE options to use for the LiveKit room. Use to share the same key provider across connections!. * @param e2eeLivekitOptions - The E2EE options to use for the LiveKit room. Use to share the same key provider across connections!.
* @param trackerProcessorState$ - The processor state for the video track processor (e.g. background blur). * @param trackerProcessorState$ - The processor state for the video track processor (e.g. background blur).
*/ */
public constructor( public constructor(
focus: LivekitFocus, args: ConnectionOpts,
livekitAlias: string,
client: MatrixClient,
scope: ObservableScope,
membershipsFocusMap$: Behavior<
{ membership: CallMembership; focus: LivekitFocus }[]
>,
devices: MediaDevices, devices: MediaDevices,
private readonly muteStates: MuteStates, private readonly muteStates: MuteStates,
e2eeLivekitOptions: E2EEOptions | undefined, e2eeLivekitOptions: E2EEOptions | undefined,
trackerProcessorState$: Behavior<ProcessorState> trackerProcessorState$: Behavior<ProcessorState>
) { ) {
const { scope } = args;
logger.info("[LivekitRoom] Create LiveKit room"); logger.info("[LivekitRoom] Create LiveKit room");
const { controlledAudioDevices } = getUrlParams(); const { controlledAudioDevices } = getUrlParams();
@@ -112,17 +100,19 @@ export class PublishConnection extends Connection {
}); });
super( super(
focus, room,
livekitAlias, args,
client, // focus,
scope, // livekitAlias,
membershipsFocusMap$, // client,
e2eeLivekitOptions, // scope,
room // membershipsFocusMap$,
// e2eeLivekitOptions,
// room
); );
// Setup track processor syncing (blur) // Setup track processor syncing (blur)
const track$ = this.scope.behavior( const track$ = scope.behavior(
observeTrackReference$(room.localParticipant, Track.Source.Camera).pipe( observeTrackReference$(room.localParticipant, Track.Source.Camera).pipe(
map((trackRef) => { map((trackRef) => {
const track = trackRef?.publication?.track; const track = trackRef?.publication?.track;
@@ -148,7 +138,7 @@ export class PublishConnection extends Connection {
} }
return this.livekitRoom.localParticipant.isCameraEnabled; return this.livekitRoom.localParticipant.isCameraEnabled;
}); });
this.scope.onEnd(() => { scope.onEnd(() => {
this.muteStates.audio.unsetHandler(); this.muteStates.audio.unsetHandler();
this.muteStates.video.unsetHandler(); this.muteStates.video.unsetHandler();
}); });
@@ -157,7 +147,7 @@ export class PublishConnection extends Connection {
kind: MediaDeviceKind, kind: MediaDeviceKind,
selected$: Observable<SelectedDevice | undefined> selected$: Observable<SelectedDevice | undefined>
): Subscription => ): Subscription =>
selected$.pipe(this.scope.bind()).subscribe((device) => { selected$.pipe(scope.bind()).subscribe((device) => {
if (this.connectionState$.value !== ConnectionState.Connected) return; if (this.connectionState$.value !== ConnectionState.Connected) return;
logger.info( logger.info(
"[LivekitRoom] syncDevice room.getActiveDevice(kind) !== d.id :", "[LivekitRoom] syncDevice room.getActiveDevice(kind) !== d.id :",
@@ -192,7 +182,7 @@ export class PublishConnection extends Connection {
devices.audioInput.selected$ devices.audioInput.selected$
.pipe( .pipe(
switchMap((device) => device?.hardwareDeviceChange$ ?? NEVER), switchMap((device) => device?.hardwareDeviceChange$ ?? NEVER),
this.scope.bind() scope.bind()
) )
.subscribe(() => { .subscribe(() => {
if (this.connectionState$.value !== ConnectionState.Connected) return; if (this.connectionState$.value !== ConnectionState.Connected) return;