lots of fixup in the new classes

This commit is contained in:
Timo K
2025-10-30 00:09:07 +01:00
parent 633a0f9290
commit 6b513534f1
6 changed files with 256 additions and 206 deletions

View File

@@ -16,16 +16,21 @@ import { BehaviorSubject, combineLatest, map, switchMap } from "rxjs";
import { type Logger } from "matrix-js-sdk/lib/logger";
import {
type E2EEOptions,
type Room as LivekitRoom,
Room as LivekitRoom,
type Participant as LivekitParticipant,
type RoomOptions,
} from "livekit-client";
import { type MatrixClient } from "matrix-js-sdk";
import { type Behavior } from "../Behavior";
import { type Connection, RemoteConnection } from "./Connection";
import { Connection } from "./Connection";
import { type ObservableScope } from "../ObservableScope";
import { generateKeyed$ } from "../../utils/observable";
import { areLivekitTransportsEqual } from "./matrixLivekitMerger";
import { getUrlParams } from "../../UrlParams";
import { type ProcessorState } from "../../livekit/TrackProcessorContext";
import { type MediaDevices } from "../MediaDevices";
import { defaultLiveKitOptions } from "../../livekit/options";
export type ParticipantByMemberIdMap = Map<
ParticipantId,
@@ -33,25 +38,57 @@ export type ParticipantByMemberIdMap = Map<
// multiple times to several livekit rooms.
{ participant: LivekitParticipant; connection: Connection }[]
>;
// - write test for scopes (do we really need to bind scope)
// TODO - write test for scopes (do we really need to bind scope)
export class ConnectionManager {
/**
* The transport to use for publishing.
* This extends the list of tranports
*/
private publishTransport$ = new BehaviorSubject<LivekitTransport | undefined>(
undefined,
);
private livekitRoomFactory: () => LivekitRoom;
public constructor(
private client: MatrixClient,
private scope: ObservableScope,
private devices: MediaDevices,
private processorState: ProcessorState,
private e2eeLivekitOptions$: Behavior<E2EEOptions | undefined>,
private logger?: Logger,
livekitRoomFactory?: () => LivekitRoom,
) {
this.scope = scope;
const defaultFactory = (): LivekitRoom =>
new LivekitRoom(
generateRoomOption(
this.devices,
this.processorState,
this.e2eeLivekitOptions$.value,
),
);
this.livekitRoomFactory = livekitRoomFactory ?? defaultFactory;
}
private transportSubscriptions$ = new BehaviorSubject<
/**
* A list of Behaviors each containing a LIST of LivekitTransport.
* Each of these behaviors can be interpreted as subscribed list of transports.
*
* Using `registerTransports` independent external modules can control what connections
* are created by the ConnectionManager.
*
* The connection manager will remove all duplicate transports in each subscibed list.
*
* See `unregisterAllTransports` and `unregisterTransport` for details on how to unsubscribe.
*/
private readonly transportsSubscriptions$ = new BehaviorSubject<
Behavior<LivekitTransport[]>[]
>([]);
private transports$ = this.scope.behavior(
this.transportSubscriptions$.pipe(
/**
* All transports currently managed by the ConnectionManager.
*
* This list does not include duplicate transports.
*
* It is build based on the list of subscribed transports (`transportsSubscriptions$`).
* externally this is modified via `registerTransports()`.
*/
private readonly transports$ = this.scope.behavior(
this.transportsSubscriptions$.pipe(
switchMap((subscriptions) =>
combineLatest(subscriptions.map((s) => s.transports)).pipe(
combineLatest(subscriptions).pipe(
map((transportsNested) => transportsNested.flat()),
map(removeDuplicateTransports),
),
@@ -59,24 +96,6 @@ export class ConnectionManager {
),
);
public constructor(
private client: MatrixClient,
private e2eeLivekitOptions: () => E2EEOptions | undefined,
private scope: ObservableScope,
private logger?: Logger,
private livekitRoomFactory?: () => LivekitRoom,
) {
this.scope = scope;
}
public getOrCreatePublishConnection(
transport: LivekitTransport,
): Connection | undefined {
this.publishTransport$.next(transport);
const equalsRequestedTransport = (c: Connection): boolean =>
areLivekitTransportsEqual(c.transport, transport);
return this.connections$.value.find(equalsRequestedTransport);
}
/**
* Connections for each transport in use by one or more session members.
*/
@@ -87,16 +106,16 @@ export class ConnectionManager {
const createConnection =
(
transport: LivekitTransport,
): ((scope: ObservableScope) => RemoteConnection) =>
): ((scope: ObservableScope) => Connection) =>
(scope) => {
const connection = new RemoteConnection(
const connection = new Connection(
{
transport,
client: this.client,
scope: scope,
livekitRoomFactory: this.livekitRoomFactory,
},
this.e2eeLivekitOptions(),
this.logger,
);
void connection.start();
return connection;
@@ -114,15 +133,23 @@ export class ConnectionManager {
);
/**
* Add an a Behavior containing a list of transports to this ConnectionManager.
*
* @param transports$
* The intended usage is:
* - create a ConnectionManager
* - register one `transports$` behavior using registerTransports
* - add new connections to the `ConnectionManager` by updating the `transports$` behavior
* - remove a single connection by removing the transport.
* - remove this subscription by calling `unregisterTransports` and passing
* the same `transports$` behavior reference.
* @param transports$ The Behavior containing a list of transports to subscribe to.
*/
public registerTransports(
transports$: Behavior<LivekitTransport[]>,
): Connection[] {
if (!this.transportSubscriptions$.value.some((t$) => t$ === transports$)) {
this.transportSubscriptions$.next(
this.transportSubscriptions$.value.concat(transports$),
if (!this.transportsSubscriptions$.value.some((t$) => t$ === transports$)) {
this.transportsSubscriptions$.next(
this.transportsSubscriptions$.value.concat(transports$),
);
}
// After updating the subscriptions our connection list is also updated.
@@ -135,22 +162,30 @@ export class ConnectionManager {
.filter((c) => c !== undefined);
}
/**
* Unsubscribe from the given transports.
* @param transports$ The behavior to unsubscribe from
* @returns
*/
public unregisterTransports(
transports$: Behavior<LivekitTransport[]>,
): boolean {
const subscriptions = this.transportSubscriptions$.value;
const subscriptions = this.transportsSubscriptions$.value;
const subscriptionsUnregistered = subscriptions.filter(
(t$) => t$ !== transports$,
);
const canUnregister =
subscriptions.length !== subscriptionsUnregistered.length;
if (canUnregister)
this.transportSubscriptions$.next(subscriptionsUnregistered);
this.transportsSubscriptions$.next(subscriptionsUnregistered);
return canUnregister;
}
/**
* Unsubscribe from all transports.
*/
public unregisterAllTransports(): void {
this.transportSubscriptions$.next([]);
this.transportsSubscriptions$.next([]);
}
// We have a lost of connections, for each of these these
@@ -161,7 +196,7 @@ export class ConnectionManager {
switchMap((connections) => {
const listsOfParticipantWithConnection = connections.map(
(connection) => {
return connection.participantsWithPublishTrack$.pipe(
return connection.participantsWithTrack$.pipe(
map((participants) =>
participants.map((p) => ({
participant: p,
@@ -178,7 +213,13 @@ export class ConnectionManager {
),
);
// Filters the livekit participants
/**
* This field makes the connection manager to behave as close to a single SFU as possible.
* Each participant that is found on all connections managed by the manager will be listed.
*
* They are stored an a map keyed by `participant.identity`
* (which is equivalent to the `member.id` field in the `m.rtc.member` event)
*/
public allParticipantsByMemberId$ = this.scope.behavior(
this.allParticipantsWithConnection$.pipe(
map((participantsWithConnections) => {
@@ -191,10 +232,10 @@ export class ConnectionManager {
acc.set(participant.identity, [{ connection, participant }]);
} else {
// already known
// This is user is publishing on several SFUs
// This is for users publishing on several SFUs
currentVal.push({ connection, participant });
this.logger?.info(
`Participant ${participant.identity} is publishing on several SFUs ${currentVal.join()}`,
`Participant ${participant.identity} is publishing on several SFUs ${currentVal.map((v) => v.connection.transport.livekit_service_url).join(", ")}`,
);
}
}
@@ -217,3 +258,37 @@ function removeDuplicateTransports(
return acc;
}, [] as LivekitTransport[]);
}
/**
* Generate the initial LiveKit RoomOptions based on the current media devices and processor state.
*/
function generateRoomOption(
devices: MediaDevices,
processorState: ProcessorState,
e2eeLivekitOptions: E2EEOptions | undefined,
): RoomOptions {
const { controlledAudioDevices } = getUrlParams();
return {
...defaultLiveKitOptions,
videoCaptureDefaults: {
...defaultLiveKitOptions.videoCaptureDefaults,
deviceId: devices.videoInput.selected$.value?.id,
processor: processorState.processor,
},
audioCaptureDefaults: {
...defaultLiveKitOptions.audioCaptureDefaults,
deviceId: devices.audioInput.selected$.value?.id,
},
audioOutput: {
// When using controlled audio devices, we don't want to set the
// deviceId here, because it will be set by the native app.
// (also the id does not need to match a browser device id)
deviceId: controlledAudioDevices
? undefined
: devices.audioOutput.selected$.value?.id,
},
e2ee: e2eeLivekitOptions,
// TODO test and consider this:
// webAudioMix: true,
};
}