Files
element-call/src/state/CallViewModel/remoteMembers/Connection.ts

233 lines
7.8 KiB
TypeScript
Raw Normal View History

/*
Copyright 2025 New Vector Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
2025-10-07 16:24:02 +02:00
import {
connectedParticipantsObserver,
connectionStateObserver,
} from "@livekit/components-core";
import {
ConnectionError,
2025-10-28 21:18:47 +01:00
type ConnectionState as LivekitConenctionState,
2025-10-30 00:09:07 +01:00
type Room as LivekitRoom,
type LocalParticipant,
type RemoteParticipant,
2025-10-29 18:31:58 +01:00
RoomEvent,
2025-10-07 16:24:02 +02:00
} from "livekit-client";
2025-10-29 18:31:58 +01:00
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
2025-11-10 15:55:01 +01:00
import { BehaviorSubject, map, type Observable } from "rxjs";
2025-10-28 21:18:47 +01:00
import { type Logger } from "matrix-js-sdk/lib/logger";
2025-10-07 16:24:02 +02:00
import {
getSFUConfigWithOpenID,
type OpenIDClientParts,
type SFUConfig,
} from "../../../livekit/openIDSFU.ts";
import { type Behavior } from "../../Behavior.ts";
import { type ObservableScope } from "../../ObservableScope.ts";
import {
InsufficientCapacityError,
SFURoomCreationRestrictedError,
} from "../../../utils/errors.ts";
export type PublishingParticipant = LocalParticipant | RemoteParticipant;
export interface ConnectionOpts {
/** The media transport to connect to. */
transport: LivekitTransport;
/** The Matrix client to use for OpenID and SFU config requests. */
client: OpenIDClientParts;
/** The observable scope to use for this connection. */
scope: ObservableScope;
/** Optional factory to create the LiveKit room, mainly for testing purposes. */
2025-10-30 00:09:07 +01:00
livekitRoomFactory: () => LivekitRoom;
}
2025-10-28 21:18:47 +01:00
export type ConnectionState =
2025-10-07 16:24:02 +02:00
| { state: "Initialized" }
| { state: "FetchingConfig"; transport: LivekitTransport }
| { state: "ConnectingToLkRoom"; transport: LivekitTransport }
| { state: "PublishingTracks"; transport: LivekitTransport }
| { state: "FailedToStart"; error: Error; transport: LivekitTransport }
2025-10-07 16:24:02 +02:00
| {
state: "ConnectedToLkRoom";
2025-10-28 21:18:47 +01:00
livekitConnectionState$: Observable<LivekitConenctionState>;
transport: LivekitTransport;
2025-10-07 16:24:02 +02:00
}
| { state: "Stopped"; transport: LivekitTransport };
/**
* A connection to a Matrix RTC LiveKit backend.
*
* Expose observables for participants and connection state.
*/
export class Connection {
// Private Behavior
2025-10-28 21:18:47 +01:00
private readonly _state$ = new BehaviorSubject<ConnectionState>({
state: "Initialized",
});
/**
* The current state of the connection to the media transport.
*/
2025-10-28 21:18:47 +01:00
public readonly state$: Behavior<ConnectionState> = this._state$;
2025-10-01 16:39:21 +02:00
/**
* Whether the connection has been stopped.
* @see Connection.stop
* */
protected stopped = false;
/**
* Starts the connection.
*
* This will:
* 1. Request an OpenId token `request_token` (allows matrix users to verify their identity with a third-party service.)
* 2. Use this token to request the SFU config to the MatrixRtc authentication service.
* 3. Connect to the configured LiveKit room.
*
2025-10-29 18:31:58 +01:00
* The errors are also represented as a state in the `state$` observable.
* It is safe to ignore those errors and handle them accordingly via the `state$` observable.
* @throws {InsufficientCapacityError} if the LiveKit server indicates that it has insufficient capacity to accept the connection.
* @throws {SFURoomCreationRestrictedError} if the LiveKit server indicates that the room does not exist and cannot be created.
*/
2025-10-29 18:31:58 +01:00
// TODO dont make this throw and instead store a connection error state in this class?
// TODO consider an autostart pattern...
public async start(): Promise<void> {
this.stopped = false;
try {
2025-10-28 21:18:47 +01:00
this._state$.next({
2025-10-07 16:24:02 +02:00
state: "FetchingConfig",
transport: this.transport,
2025-10-07 16:24:02 +02:00
});
const { url, jwt } = await this.getSFUConfigWithOpenID();
// If we were stopped while fetching the config, don't proceed to connect
if (this.stopped) return;
2025-10-28 21:18:47 +01:00
this._state$.next({
2025-10-07 16:24:02 +02:00
state: "ConnectingToLkRoom",
transport: this.transport,
2025-10-07 16:24:02 +02:00
});
try {
await this.livekitRoom.connect(url, jwt);
} catch (e) {
// LiveKit uses 503 to indicate that the server has hit its track limits.
// https://github.com/livekit/livekit/blob/fcb05e97c5a31812ecf0ca6f7efa57c485cea9fb/pkg/service/rtcservice.go#L171
// It also errors with a status code of 200 (yes, really) for room
// participant limits.
// LiveKit Cloud uses 429 for connection limits.
// Either way, all these errors can be explained as "insufficient capacity".
if (e instanceof ConnectionError) {
if (e.status === 503 || e.status === 200 || e.status === 429) {
throw new InsufficientCapacityError();
}
if (e.status === 404) {
// error msg is "Could not establish signal connection: requested room does not exist"
// The room does not exist. There are two different modes of operation for the SFU:
// - the room is created on the fly when connecting (livekit `auto_create` option)
// - Only authorized users can create rooms, so the room must exist before connecting (done by the auth jwt service)
// In the first case there will not be a 404, so we are in the second case.
throw new SFURoomCreationRestrictedError();
}
}
throw e;
}
// If we were stopped while connecting, don't proceed to update state.
if (this.stopped) return;
2025-10-28 21:18:47 +01:00
this._state$.next({
2025-10-07 16:24:02 +02:00
state: "ConnectedToLkRoom",
transport: this.transport,
2025-10-28 21:18:47 +01:00
livekitConnectionState$: connectionStateObserver(this.livekitRoom),
2025-10-07 16:24:02 +02:00
});
} catch (error) {
2025-10-28 21:18:47 +01:00
this._state$.next({
2025-10-07 16:24:02 +02:00
state: "FailedToStart",
error: error instanceof Error ? error : new Error(`${error}`),
transport: this.transport,
2025-10-07 16:24:02 +02:00
});
throw error;
}
}
protected async getSFUConfigWithOpenID(): Promise<SFUConfig> {
return await getSFUConfigWithOpenID(
this.client,
this.transport.livekit_service_url,
this.transport.livekit_alias,
2025-10-07 16:24:02 +02:00
);
}
2025-10-30 00:09:07 +01:00
/**
* Stops the connection.
*
* This will disconnect from the LiveKit room.
* If the connection is already stopped, this is a no-op.
*/
2025-10-01 16:39:21 +02:00
public async stop(): Promise<void> {
2025-09-26 13:20:55 -04:00
if (this.stopped) return;
2025-10-01 16:39:21 +02:00
await this.livekitRoom.disconnect();
2025-10-28 21:18:47 +01:00
this._state$.next({
2025-10-07 16:24:02 +02:00
state: "Stopped",
transport: this.transport,
2025-10-07 16:24:02 +02:00
});
this.stopped = true;
}
/**
* An observable of the participants that are publishing on this connection.
2025-10-08 18:17:42 -04:00
* This is derived from `participantsIncludingSubscribers$` and `remoteTransports$`.
* It filters the participants to only those that are associated with a membership that claims to publish on this connection.
*/
2025-11-10 15:55:01 +01:00
public readonly participants$: Behavior<PublishingParticipant[]>;
/**
* The media transport to connect to.
*/
public readonly transport: LivekitTransport;
private readonly client: OpenIDClientParts;
2025-10-30 00:09:07 +01:00
public readonly livekitRoom: LivekitRoom;
/**
* Creates a new connection to a matrix RTC LiveKit backend.
*
* @param livekitRoom - LiveKit room instance to use.
* @param opts - Connection options {@link ConnectionOpts}.
*
*/
2025-10-30 00:09:07 +01:00
public constructor(opts: ConnectionOpts, logger?: Logger) {
2025-10-28 21:18:47 +01:00
logger?.info(
`[Connection] Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias}`,
);
2025-10-29 18:31:58 +01:00
const { transport, client, scope } = opts;
2025-10-30 00:09:07 +01:00
this.livekitRoom = opts.livekitRoomFactory();
this.transport = transport;
this.client = client;
2025-11-10 15:55:01 +01:00
this.participants$ = scope.behavior(
// only tracks remote participants
2025-11-04 17:13:28 +01:00
connectedParticipantsObserver(this.livekitRoom, {
additionalRoomEvents: [
RoomEvent.TrackPublished,
RoomEvent.TrackUnpublished,
],
2025-11-10 15:55:01 +01:00
}).pipe(
map((participants) => [
this.livekitRoom.localParticipant,
...participants,
]),
),
2025-10-07 16:24:02 +02:00
[],
);
2025-10-01 16:39:21 +02:00
scope.onEnd(() => void this.stop());
}
}