2025-08-28 13:52:12 +02:00
/ *
2025-11-18 10:13:10 +01:00
Copyright 2025 Element Creations Ltd .
2025-08-28 13:52:12 +02:00
Copyright 2025 New Vector Ltd .
SPDX - License - Identifier : AGPL - 3.0 - only OR LicenseRef - Element - Commercial
Please see LICENSE in the repository root for full details .
* /
2025-10-07 16:24:02 +02:00
import {
connectedParticipantsObserver ,
connectionStateObserver ,
} from "@livekit/components-core" ;
import {
2025-10-14 10:46:57 +02:00
ConnectionError ,
2025-10-30 00:09:07 +01:00
type Room as LivekitRoom ,
2025-11-05 18:57:24 +01:00
type RemoteParticipant ,
2025-10-07 16:24:02 +02:00
} from "livekit-client" ;
2025-10-29 18:31:58 +01:00
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc" ;
2025-12-22 13:35:40 +01:00
import { BehaviorSubject , map } from "rxjs" ;
2025-10-28 21:18:47 +01:00
import { type Logger } from "matrix-js-sdk/lib/logger" ;
2025-08-28 13:52:12 +02:00
2025-10-07 16:24:02 +02:00
import {
getSFUConfigWithOpenID ,
type OpenIDClientParts ,
type SFUConfig ,
2025-11-07 08:44:44 +01:00
} from "../../../livekit/openIDSFU.ts" ;
import { type Behavior } from "../../Behavior.ts" ;
import { type ObservableScope } from "../../ObservableScope.ts" ;
2025-10-14 10:46:57 +02:00
import {
2025-12-10 21:14:13 +01:00
ElementCallError ,
2025-10-14 10:46:57 +02:00
InsufficientCapacityError ,
SFURoomCreationRestrictedError ,
2025-12-10 21:14:13 +01:00
UnknownCallError ,
2025-11-07 08:44:44 +01:00
} from "../../../utils/errors.ts" ;
2025-08-28 13:52:12 +02:00
2025-09-30 17:02:48 +02:00
/**
 * Options accepted by the {@link Connection} constructor.
 */
export interface ConnectionOpts {
  /** The media transport to connect to. */
  transport: LivekitTransport;
  /** The Matrix client to use for OpenID and SFU config requests. */
  client: OpenIDClientParts;
  /** The observable scope to use for this connection. */
  scope: ObservableScope;

  /**
   * Factory that creates the LiveKit room for the connection.
   *
   * This member is required: the `Connection` constructor calls it
   * unconditionally. Having it injectable is mainly useful for testing
   * purposes (e.g. supplying a mock room).
   */
  livekitRoomFactory: () => LivekitRoom;
}
2025-12-05 19:48:02 +01:00
/**
 * Error subclass whose `name` is set to `"FailedToStartError"`.
 *
 * NOTE(review): nothing in the visible part of this file throws it —
 * confirm where it is used before relying on it.
 */
export class FailedToStartError extends Error {
  /**
   * @param message - Human readable description, forwarded to `Error`.
   */
  public constructor(message: string) {
    super(message);
    // Give the instance a distinct name instead of the inherited "Error".
    this.name = "FailedToStartError";
  }
}
2025-10-01 10:06:43 +02:00
2025-12-05 19:48:02 +01:00
/**
 * The lifecycle states of a `Connection`.
 *
 * The `Livekit*` members carry the same string values as the corresponding
 * `ConnectionState` members of `livekit-client`, so a LiveKit state can be
 * forwarded as-is.
 */
export enum ConnectionState {
  /** The start state of a connection. It has been created but nothing has loaded yet. */
  Initialized = "Initialized",
  /** `start` has been called on the connection. It acquires the JWT info to connect to the LK room. */
  FetchingConfig = "FetchingConfig",
  /** `stop` has been called on the connection and the LiveKit room has been disconnected. */
  Stopped = "Stopped",
  /** The same as ConnectionState.Disconnected from `livekit-client` */
  LivekitDisconnected = "disconnected",
  /** The same as ConnectionState.Connecting from `livekit-client` */
  LivekitConnecting = "connecting",
  /** The same as ConnectionState.Connected from `livekit-client` */
  LivekitConnected = "connected",
  /** The same as ConnectionState.Reconnecting from `livekit-client` */
  LivekitReconnecting = "reconnecting",
  /** The same as ConnectionState.SignalReconnecting from `livekit-client` */
  LivekitSignalReconnecting = "signalReconnecting",
}
2025-10-01 10:06:43 +02:00
2025-09-30 11:33:45 +02:00
/**
 * A connection to a Matrix RTC LiveKit backend.
 *
 * Expose observables for participants and connection state.
 */
export class Connection {
  // Private Behavior
  private readonly _state$ = new BehaviorSubject<
    ConnectionState | ElementCallError
  >(ConnectionState.Initialized);

  /**
   * The current state of the connection to the media transport.
   *
   * Holds either a {@link ConnectionState} or, when `start` failed, the error
   * describing the failure.
   */
  public readonly state$: Behavior<ConnectionState | Error> = this._state$;

  /**
   * The media transport to connect to.
   */
  public readonly transport: LivekitTransport;

  /**
   * The LiveKit room backing this connection, created by
   * `ConnectionOpts.livekitRoomFactory` in the constructor.
   */
  public readonly livekitRoom: LivekitRoom;

  /** Scope that bounds the subscriptions created by this connection. */
  private readonly scope: ObservableScope;

  /**
   * The remote LiveKit participants that are visible on this connection.
   *
   * Note that this may include participants that are connected only to
   * subscribe, or publishers that are otherwise unattested in MatrixRTC state.
   * It is therefore more low-level than what should be presented to the user.
   */
  public readonly remoteParticipants$: Behavior<RemoteParticipant[]>;

  /**
   * Whether the connection has been stopped.
   * @see Connection.stop
   */
  protected stopped = false;

  /** The Matrix client used for OpenID and SFU config requests. */
  private readonly client: OpenIDClientParts;

  /** Child logger of the logger handed to the constructor. */
  private readonly logger: Logger;

  /**
   * Starts the connection.
   *
   * This will:
   * 1. Request an OpenId token `request_token` (allows matrix users to verify their identity with a third-party service.)
   * 2. Use this token to request the SFU config to the MatrixRtc authentication service.
   * 3. Connect to the configured LiveKit room.
   *
   * The errors are also represented as a state in the `state$` observable.
   * It is safe to ignore those errors and handle them accordingly via the `state$` observable.
   * @throws {InsufficientCapacityError} if the LiveKit server indicates that it has insufficient capacity to accept the connection.
   * @throws {SFURoomCreationRestrictedError} if the LiveKit server indicates that the room does not exist and cannot be created.
   */
  // TODO consider an autostart pattern...
  public async start(): Promise<void> {
    this.logger.debug("Starting Connection");
    this.stopped = false;
    try {
      this._state$.next(ConnectionState.FetchingConfig);
      // We should already have this information after creating the localTransport.
      // It would probably be better to forward this here.
      const { url, jwt } = await this.getSFUConfigWithOpenID();
      // If we were stopped while fetching the config, don't proceed to connect
      if (this.stopped) return;
      // Setup observer once we are done with getSFUConfigWithOpenID
      connectionStateObserver(this.livekitRoom)
        .pipe(
          this.scope.bind(),
          map((s) => s as unknown as ConnectionState),
        )
        .subscribe((lkState) => {
          // It is safe to cast lkState to ConnectionState as they are fully overlapping.
          this._state$.next(lkState);
        });
      try {
        await this.livekitRoom.connect(url, jwt);
      } catch (e) {
        // LiveKit uses 503 to indicate that the server has hit its track limits.
        // https://github.com/livekit/livekit/blob/fcb05e97c5a31812ecf0ca6f7efa57c485cea9fb/pkg/service/rtcservice.go#L171
        // It also errors with a status code of 200 (yes, really) for room
        // participant limits.
        // LiveKit Cloud uses 429 for connection limits.
        // Either way, all these errors can be explained as "insufficient capacity".
        if (e instanceof ConnectionError) {
          if (e.status === 503 || e.status === 200 || e.status === 429) {
            throw new InsufficientCapacityError();
          }
          if (e.status === 404) {
            // error msg is "Failed to create call"
            // error description is "Call creation might be restricted to authorized users only. Try again later, or contact your server admin if the problem persists."
            // The room does not exist. There are two different modes of operation for the SFU:
            // - the room is created on the fly when connecting (livekit `auto_create` option)
            // - Only authorized users can create rooms, so the room must exist before connecting (done by the auth jwt service)
            // In the first case there will not be a 404, so we are in the second case.
            throw new SFURoomCreationRestrictedError();
          }
        }
        throw e;
      }
      // If we were stopped while connecting, don't proceed to update state.
      if (this.stopped) return;
    } catch (error) {
      this.logger.debug(` Failed to connect to LiveKit room: ${error} `);
      // Publish the failure as a state; anything that is not already an
      // ElementCallError is wrapped in an UnknownCallError.
      this._state$.next(
        error instanceof ElementCallError
          ? error
          : error instanceof Error
            ? new UnknownCallError(error)
            : new UnknownCallError(new Error(` ${error} `)),
      );
      // It's okay to ignore the throw. The error is part of the state.
      throw error;
    }
  }

  /**
   * Fetches the SFU config for this connection's transport, using
   * `transport.livekit_service_url` and `transport.livekit_alias`.
   *
   * Protected so that subclasses can replace it.
   */
  protected async getSFUConfigWithOpenID(): Promise<SFUConfig> {
    return await getSFUConfigWithOpenID(
      this.client,
      this.transport.livekit_service_url,
      this.transport.livekit_alias,
    );
  }

  /**
   * Stops the connection.
   *
   * This will disconnect from the LiveKit room.
   * If the connection is already stopped (or is currently stopping), this is a no-op.
   */
  public async stop(): Promise<void> {
    this.logger.debug(
      ` Stopping connection to ${this.transport.livekit_service_url} `,
    );
    if (this.stopped) return;
    // Raise the flag *before* awaiting the disconnect. Otherwise a second
    // `stop` call would disconnect again, and a `start` that is still awaiting
    // its config would pass its `stopped` check and connect the room while it
    // is being torn down.
    this.stopped = true;
    await this.livekitRoom.disconnect();
    this._state$.next(ConnectionState.Stopped);
  }

  /**
   * Creates a new connection to a matrix RTC LiveKit backend.
   *
   * @param opts - Connection options {@link ConnectionOpts}.
   * @param logger - Parent logger; a `[Connection]` child logger is derived from it.
   */
  public constructor(opts: ConnectionOpts, logger: Logger) {
    this.logger = logger.getChild("[Connection]");
    this.logger.info(
      ` Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias} `,
    );
    const { transport, client, scope } = opts;

    this.scope = scope;
    this.livekitRoom = opts.livekitRoomFactory();
    this.transport = transport;
    this.client = client;
    this.remoteParticipants$ = scope.behavior(
      // Only tracks remote participants
      connectedParticipantsObserver(this.livekitRoom),
    );

    // Tear the connection down together with its scope.
    scope.onEnd(() => {
      this.logger.info(` Connection scope ended, stopping connection `);
      void this.stop();
    });
  }
}