Merge branch 'livekit' into valere/noise_cancellation

This commit is contained in:
Valere
2025-12-02 17:44:06 +01:00
31 changed files with 1178 additions and 456 deletions

View File

@@ -193,7 +193,7 @@ describe("Start connection states", () => {
capturedState = capturedStates.pop();
if (capturedState!.state === "FailedToStart") {
expect(capturedState!.error.message).toEqual("Something went wrong");
expect(capturedState!.transport.livekit_alias).toEqual(
expect(connection.transport.livekit_alias).toEqual(
livekitFocus.livekit_alias,
);
} else {
@@ -249,7 +249,7 @@ describe("Start connection states", () => {
expect(capturedState?.error.message).toContain(
"SFU Config fetch failed with exception Error",
);
expect(capturedState?.transport.livekit_alias).toEqual(
expect(connection.transport.livekit_alias).toEqual(
livekitFocus.livekit_alias,
);
} else {
@@ -313,7 +313,7 @@ describe("Start connection states", () => {
expect(capturedState.error.message).toContain(
"Failed to connect to livekit",
);
expect(capturedState.transport.livekit_alias).toEqual(
expect(connection.transport.livekit_alias).toEqual(
livekitFocus.livekit_alias,
);
} else {

View File

@@ -19,7 +19,7 @@ import {
RoomEvent,
} from "livekit-client";
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { BehaviorSubject, map, type Observable } from "rxjs";
import { BehaviorSubject, map } from "rxjs";
import { type Logger } from "matrix-js-sdk/lib/logger";
import {
@@ -50,16 +50,14 @@ export interface ConnectionOpts {
export type ConnectionState =
| { state: "Initialized" }
| { state: "FetchingConfig"; transport: LivekitTransport }
| { state: "ConnectingToLkRoom"; transport: LivekitTransport }
| { state: "PublishingTracks"; transport: LivekitTransport }
| { state: "FailedToStart"; error: Error; transport: LivekitTransport }
| { state: "FetchingConfig" }
| { state: "ConnectingToLkRoom" }
| {
state: "ConnectedToLkRoom";
livekitConnectionState$: Observable<LivekitConenctionState>;
transport: LivekitTransport;
livekitConnectionState$: Behavior<LivekitConenctionState>;
}
| { state: "Stopped"; transport: LivekitTransport };
| { state: "FailedToStart"; error: Error }
| { state: "Stopped" };
/**
* A connection to a Matrix RTC LiveKit backend.
@@ -77,6 +75,24 @@ export class Connection {
*/
public readonly state$: Behavior<ConnectionState> = this._state$;
/**
* The media transport to connect to.
*/
public readonly transport: LivekitTransport;
public readonly livekitRoom: LivekitRoom;
private scope: ObservableScope;
/**
* An observable of the participants that are publishing on this connection. (Excluding our local participant)
* This is derived from `participantsIncludingSubscribers$` and `remoteTransports$`.
* It filters the participants to only those that are associated with a membership that claims to publish on this connection.
*/
public readonly remoteParticipantsWithTracks$: Behavior<
PublishingParticipant[]
>;
/**
* Whether the connection has been stopped.
* @see Connection.stop
@@ -96,7 +112,6 @@ export class Connection {
* @throws {InsufficientCapacityError} if the LiveKit server indicates that it has insufficient capacity to accept the connection.
* @throws {SFURoomCreationRestrictedError} if the LiveKit server indicates that the room does not exist and cannot be created.
*/
// TODO dont make this throw and instead store a connection error state in this class?
// TODO consider an autostart pattern...
public async start(): Promise<void> {
this.logger.debug("Starting Connection");
@@ -104,7 +119,6 @@ export class Connection {
try {
this._state$.next({
state: "FetchingConfig",
transport: this.transport,
});
const { url, jwt } = await this.getSFUConfigWithOpenID();
// If we were stopped while fetching the config, don't proceed to connect
@@ -112,7 +126,6 @@ export class Connection {
this._state$.next({
state: "ConnectingToLkRoom",
transport: this.transport,
});
try {
await this.livekitRoom.connect(url, jwt);
@@ -143,15 +156,15 @@ export class Connection {
this._state$.next({
state: "ConnectedToLkRoom",
transport: this.transport,
livekitConnectionState$: connectionStateObserver(this.livekitRoom),
livekitConnectionState$: this.scope.behavior(
connectionStateObserver(this.livekitRoom),
),
});
} catch (error) {
this.logger.debug(`Failed to connect to LiveKit room: ${error}`);
this._state$.next({
state: "FailedToStart",
error: error instanceof Error ? error : new Error(`${error}`),
transport: this.transport,
});
throw error;
}
@@ -179,28 +192,11 @@ export class Connection {
await this.livekitRoom.disconnect();
this._state$.next({
state: "Stopped",
transport: this.transport,
});
this.stopped = true;
}
/**
* An observable of the participants that are publishing on this connection. (Excluding our local participant)
* This is derived from `participantsIncludingSubscribers$` and `remoteTransports$`.
* It filters the participants to only those that are associated with a membership that claims to publish on this connection.
*/
public readonly remoteParticipantsWithTracks$: Behavior<
PublishingParticipant[]
>;
/**
* The media transport to connect to.
*/
public readonly transport: LivekitTransport;
private readonly client: OpenIDClientParts;
public readonly livekitRoom: LivekitRoom;
private readonly logger: Logger;
/**
@@ -217,6 +213,7 @@ export class Connection {
);
const { transport, client, scope } = opts;
this.scope = scope;
this.livekitRoom = opts.livekitRoomFactory();
this.transport = transport;
this.client = client;

View File

@@ -92,7 +92,6 @@ interface Props {
}
// TODO - write test for scopes (do we really need to bind scope)
export interface IConnectionManager {
transports$: Behavior<Epoch<LivekitTransport[]>>;
connectionManagerData$: Behavior<Epoch<ConnectionManagerData>>;
}
/**
@@ -216,7 +215,7 @@ export function createConnectionManager$({
new Epoch(new ConnectionManagerData()),
);
return { transports$, connectionManagerData$ };
return { connectionManagerData$ };
}
function removeDuplicateTransports(