2025-10-29 18:31:58 +01:00
|
|
|
/*
|
2025-11-03 13:18:21 +01:00
|
|
|
Copyright 2025 Element Creations Ltd.
|
2025-10-29 18:31:58 +01:00
|
|
|
Copyright 2025 New Vector Ltd.
|
|
|
|
|
|
|
|
|
|
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
|
|
|
|
Please see LICENSE in the repository root for full details.
|
|
|
|
|
*/
|
|
|
|
|
|
2025-12-15 18:23:30 +01:00
|
|
|
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
|
2025-12-29 17:38:54 +01:00
|
|
|
import { combineLatest, map, of, switchMap } from "rxjs";
|
2025-11-14 16:18:31 +01:00
|
|
|
import { type Logger } from "matrix-js-sdk/lib/logger";
|
2025-12-08 23:01:44 -05:00
|
|
|
import { type RemoteParticipant } from "livekit-client";
|
2025-12-17 09:53:49 +01:00
|
|
|
import { type CallMembershipIdentityParts } from "matrix-js-sdk/lib/matrixrtc/EncryptionManager";
|
2025-10-29 18:31:58 +01:00
|
|
|
|
2025-12-29 17:38:54 +01:00
|
|
|
import { constant, type Behavior } from "../../Behavior.ts";
|
2025-11-07 08:44:44 +01:00
|
|
|
import { type Connection } from "./Connection.ts";
|
|
|
|
|
import { Epoch, type ObservableScope } from "../../ObservableScope.ts";
|
2025-11-07 17:36:16 -05:00
|
|
|
import { generateItemsWithEpoch } from "../../../utils/observable.ts";
|
2025-11-06 15:26:17 +01:00
|
|
|
import { areLivekitTransportsEqual } from "./MatrixLivekitMembers.ts";
|
2025-11-03 13:18:21 +01:00
|
|
|
import { type ConnectionFactory } from "./ConnectionFactory.ts";
|
|
|
|
|
|
|
|
|
|
/**
 * Collection of connections together with the remote participants seen on each
 * of them.
 *
 * Entries are indexed by transport, using the transport's
 * `livekit_service_url` and `livekit_alias` as the key.
 */
export class ConnectionManagerData {
  private readonly store: Map<
    string,
    { connection: Connection; participants: RemoteParticipant[] }
  > = new Map();

  public constructor() {}

  /**
   * Records `participants` for the transport of `connection`.
   *
   * If an entry already exists for that transport, the participants are
   * appended to it and the already stored connection is kept.
   *
   * @param connection - The connection the participants belong to.
   * @param participants - Participants to record. The array is never modified.
   */
  public add(connection: Connection, participants: RemoteParticipant[]): void {
    const key = this.getKey(connection.transport);
    const existing = this.store.get(key);
    if (!existing) {
      // Store a copy: a later `add` for the same transport pushes into the
      // stored array, which must not leak into the array owned by the caller.
      this.store.set(key, { connection, participants: [...participants] });
    } else {
      existing.participants.push(...participants);
    }
  }

  /**
   * Builds the lookup key for a transport.
   *
   * @param transport - The transport to compute the key for.
   * @returns `livekit_service_url` and `livekit_alias` joined by `|`.
   */
  private getKey(transport: LivekitTransport): string {
    return transport.livekit_service_url + "|" + transport.livekit_alias;
  }

  /**
   * @returns Every stored connection, in insertion order.
   */
  public getConnections(): Connection[] {
    return Array.from(this.store.values()).map(({ connection }) => connection);
  }

  /**
   * @param transport - The transport to look up.
   * @returns The connection stored for `transport`, or `null` if there is none.
   */
  public getConnectionForTransport(
    transport: LivekitTransport,
  ): Connection | null {
    return this.store.get(this.getKey(transport))?.connection ?? null;
  }

  /**
   * @param transport - The transport to look up.
   * @returns The participants stored for `transport`, or an empty array if the
   * transport is unknown.
   */
  public getParticipantsForTransport(
    transport: LivekitTransport,
  ): RemoteParticipant[] {
    // Use the shared key helper so lookups can never drift from `add`.
    return this.store.get(this.getKey(transport))?.participants ?? [];
  }
}
|
2025-12-15 18:23:30 +01:00
|
|
|
|
2025-11-05 18:57:24 +01:00
|
|
|
/**
 * Configuration accepted by `createConnectionManager$`.
 */
interface Props {
  /** The observable scope used by the connection manager. */
  scope: ObservableScope;
  /** The logger to use. */
  logger: Logger;
  /** Used to create new connections. */
  connectionFactory: ConnectionFactory;
  /** The own membership identity to use. */
  ownMembershipIdentity: CallMembershipIdentityParts;
  /** The local transport to use, or `null` when there is none. */
  localTransport$: Behavior<LivekitTransport | null>;
  /** All other transports; a connection is created for each of them. */
  remoteTransports$: Behavior<Epoch<LivekitTransport[]>>;
  /**
   * Use the old JWT endpoint independent of what the SFU supports.
   * Only applies to the local transport. Optional.
   */
  forceOldJwtEndpointForLocalTransport$?: Behavior<boolean>;
}
|
2025-12-10 12:55:52 -05:00
|
|
|
|
2025-10-30 00:09:07 +01:00
|
|
|
// TODO - write test for scopes (do we really need to bind scope)
|
2025-11-06 15:26:17 +01:00
|
|
|
/**
 * Public surface of the connection manager.
 */
export interface IConnectionManager {
  /**
   * The current `ConnectionManagerData`, wrapped in an `Epoch`.
   */
  connectionManagerData$: Behavior<Epoch<ConnectionManagerData>>;
}
|
2025-12-10 12:55:52 -05:00
|
|
|
|
2025-11-05 18:57:24 +01:00
|
|
|
/**
 * Create a `ConnectionManager`.
 *
 * @param props - Configuration object
 * @param props.scope - The observable scope used by this object
 * @param props.connectionFactory - Used to create new connections
 * @param props.localTransport$ - The local transport to use. (deduplicated with remoteTransports$)
 * @param props.remoteTransports$ - All other transports. The connection manager will create connections for each transport. (deduplicated with localTransport$)
 * @param props.ownMembershipIdentity - The own membership identity to use.
 * @param props.logger - The logger to use.
 * @param props.forceOldJwtEndpointForLocalTransport$ - Use the old JWT endpoint independent of what the sfu supports. Only applies for localTransport$.
 * @returns An object exposing `connectionManagerData$`, which holds the
 * connections in use together with the remote participants of each connection.
 *
 * The local transport (if any) and the remote transports are merged into a
 * single list, with the local transport first. Duplicate transports are removed
 * from that list, and one connection is created (and started) for each
 * remaining transport.
 */
export function createConnectionManager$({
  scope,
  connectionFactory,
  localTransport$,
  remoteTransports$,
  forceOldJwtEndpointForLocalTransport$ = constant(false),
  logger: parentLogger,
  ownMembershipIdentity,
}: Props): IConnectionManager {
  type TaggedTransport = LivekitTransport & { forceOldJwtEndpoint?: boolean };

  const logger = parentLogger.getChild("[ConnectionManager]");
  // TODO logger: only construct one logger from the client and make it compatible via a EC specific sing

  // Local transport (if any) followed by the remote transports, without duplicates.
  const allInputTransports$ = combineLatest([
    localTransport$,
    remoteTransports$,
  ]).pipe(
    map(([localTransport, transports]) => {
      const localTransportAsArray = localTransport ? [localTransport] : [];
      return transports.mapInner((transports) => [
        ...localTransportAsArray,
        ...transports,
      ]);
    }),
    map((transports) => transports.mapInner(removeDuplicateTransports)),
  );

  /**
   * All transports currently managed by the ConnectionManager.
   *
   * This list does not include duplicate transports.
   *
   * The entry matching the local transport additionally carries
   * `forceOldJwtEndpoint`, taken from `forceOldJwtEndpointForLocalTransport$`.
   */
  const transportsWithJwtTag$ = scope.behavior(
    combineLatest([
      allInputTransports$,
      localTransport$,
      forceOldJwtEndpointForLocalTransport$,
    ]).pipe(
      map(
        ([
          transports,
          localTransport,
          forceOldJwtEndpointForLocalTransport,
        ]) => {
          // modify only the local transport with forceOldJwtEndpointForLocalTransport
          const localIndex = transports.value.findIndex((t) =>
            areLivekitTransportsEqual(localTransport, t),
          );
          // Tag a shallow copy instead of writing to the transport object
          // itself: those objects are owned by the upstream behaviors, and a
          // flag written onto them would stick around after the local
          // transport changes.
          const tagged = transports.mapInner((list): TaggedTransport[] =>
            list.map((t, i) =>
              i === localIndex
                ? {
                    ...t,
                    forceOldJwtEndpoint: forceOldJwtEndpointForLocalTransport,
                  }
                : t,
            ),
          );
          logger.trace(
            `Managing transports: ${tagged.value.map((t) => t.livekit_service_url).join(", ")}`,
          );
          return tagged;
        },
      ),
    ),
  );

  /**
   * Connections for each transport in use by one or more session members.
   */
  const connections$ = scope.behavior(
    transportsWithJwtTag$.pipe(
      generateItemsWithEpoch(
        function* (transports) {
          for (const transport of transports)
            yield {
              keys: [
                transport.livekit_service_url,
                transport.livekit_alias,
                transport.forceOldJwtEndpoint,
              ],
              data: undefined,
            };
        },
        (scope, _data$, serviceUrl, alias, forceOldJwtEndpoint) => {
          logger.debug(`Creating connection to ${serviceUrl} (${alias})`);
          const connection = connectionFactory.createConnection(
            {
              type: "livekit",
              livekit_service_url: serviceUrl,
              livekit_alias: alias,
            },
            scope,
            ownMembershipIdentity,
            logger,
            forceOldJwtEndpoint,
          );
          // Start the connection immediately
          // Use connection state to track connection progress
          void connection.start();
          // TODO subscribe to connection state to retry or log issues?
          return connection;
        },
      ),
    ),
  );

  const connectionManagerData$ = scope.behavior(
    connections$.pipe(
      switchMap((connections) => {
        const epoch = connections.epoch;

        // Map the connections to list of {connection, participants}[]
        const listOfConnectionsWithRemoteParticipants = connections.value.map(
          (connection) => {
            return connection.remoteParticipants$.pipe(
              map((participants) => ({
                connection,
                participants,
              })),
            );
          },
        );

        // `combineLatest([])` completes without emitting anything, so the
        // empty case has to emit an empty data set explicitly.
        if (listOfConnectionsWithRemoteParticipants.length === 0) {
          return of(new Epoch(new ConnectionManagerData(), epoch));
        }

        // combineLatest the several streams into a single stream with the ConnectionManagerData
        return combineLatest(listOfConnectionsWithRemoteParticipants).pipe(
          map(
            (lists) =>
              new Epoch(
                lists.reduce((data, { connection, participants }) => {
                  data.add(connection, participants);
                  return data;
                }, new ConnectionManagerData()),
                epoch,
              ),
          ),
        );
      }),
    ),
    // Initial value used until `connections$` produces the first data set.
    new Epoch(new ConnectionManagerData(), -1),
  );

  return { connectionManagerData$ };
}
|
2025-11-03 13:18:21 +01:00
|
|
|
|
2025-12-17 09:53:49 +01:00
|
|
|
/**
 * Drops duplicate transports from a list.
 *
 * Two transports count as duplicates when `areLivekitTransportsEqual` says so.
 * The first occurrence of each transport is kept and the input order is
 * preserved. The input array is not modified.
 *
 * @param transports - The transports to deduplicate.
 * @returns A new array containing each distinct transport once.
 */
function removeDuplicateTransports<T extends LivekitTransport>(
  transports: T[],
): T[] {
  const unique: T[] = [];
  for (const candidate of transports) {
    const alreadyKept = unique.some((kept) =>
      areLivekitTransportsEqual(kept, candidate),
    );
    if (!alreadyKept) unique.push(candidate);
  }
  return unique;
}
|