Merge branch 'livekit' into local-remote-confusion
This commit is contained in:
@@ -12,7 +12,6 @@ import {
|
||||
} from "@livekit/components-core";
|
||||
import {
|
||||
ConnectionError,
|
||||
type ConnectionState as LivekitConenctionState,
|
||||
type Room as LivekitRoom,
|
||||
type RemoteParticipant,
|
||||
RoomEvent,
|
||||
@@ -29,8 +28,10 @@ import {
|
||||
import { type Behavior } from "../../Behavior.ts";
|
||||
import { type ObservableScope } from "../../ObservableScope.ts";
|
||||
import {
|
||||
ElementCallError,
|
||||
InsufficientCapacityError,
|
||||
SFURoomCreationRestrictedError,
|
||||
UnknownCallError,
|
||||
} from "../../../utils/errors.ts";
|
||||
|
||||
export interface ConnectionOpts {
|
||||
@@ -44,17 +45,30 @@ export interface ConnectionOpts {
|
||||
/** Optional factory to create the LiveKit room, mainly for testing purposes. */
|
||||
livekitRoomFactory: () => LivekitRoom;
|
||||
}
|
||||
export class FailedToStartError extends Error {
|
||||
public constructor(message: string) {
|
||||
super(message);
|
||||
this.name = "FailedToStartError";
|
||||
}
|
||||
}
|
||||
|
||||
export type ConnectionState =
|
||||
| { state: "Initialized" }
|
||||
| { state: "FetchingConfig" }
|
||||
| { state: "ConnectingToLkRoom" }
|
||||
| {
|
||||
state: "ConnectedToLkRoom";
|
||||
livekitConnectionState$: Behavior<LivekitConenctionState>;
|
||||
}
|
||||
| { state: "FailedToStart"; error: Error }
|
||||
| { state: "Stopped" };
|
||||
export enum ConnectionState {
|
||||
/** The start state of a connection. It has been created but nothing has loaded yet. */
|
||||
Initialized = "Initialized",
|
||||
/** `start` has been called on the connection. It aquires the jwt info to conenct to the LK Room */
|
||||
FetchingConfig = "FetchingConfig",
|
||||
Stopped = "Stopped",
|
||||
/** The same as ConnectionState.Disconnected from `livekit-client` */
|
||||
LivekitDisconnected = "disconnected",
|
||||
/** The same as ConnectionState.Connecting from `livekit-client` */
|
||||
LivekitConnecting = "connecting",
|
||||
/** The same as ConnectionState.Connected from `livekit-client` */
|
||||
LivekitConnected = "connected",
|
||||
/** The same as ConnectionState.Reconnecting from `livekit-client` */
|
||||
LivekitReconnecting = "reconnecting",
|
||||
/** The same as ConnectionState.SignalReconnecting from `livekit-client` */
|
||||
LivekitSignalReconnecting = "signalReconnecting",
|
||||
}
|
||||
|
||||
/**
|
||||
* A connection to a Matrix RTC LiveKit backend.
|
||||
@@ -63,14 +77,14 @@ export type ConnectionState =
|
||||
*/
|
||||
export class Connection {
|
||||
// Private Behavior
|
||||
private readonly _state$ = new BehaviorSubject<ConnectionState>({
|
||||
state: "Initialized",
|
||||
});
|
||||
private readonly _state$ = new BehaviorSubject<
|
||||
ConnectionState | ElementCallError
|
||||
>(ConnectionState.Initialized);
|
||||
|
||||
/**
|
||||
* The current state of the connection to the media transport.
|
||||
*/
|
||||
public readonly state$: Behavior<ConnectionState> = this._state$;
|
||||
public readonly state$: Behavior<ConnectionState | Error> = this._state$;
|
||||
|
||||
/**
|
||||
* The media transport to connect to.
|
||||
@@ -112,16 +126,24 @@ export class Connection {
|
||||
this.logger.debug("Starting Connection");
|
||||
this.stopped = false;
|
||||
try {
|
||||
this._state$.next({
|
||||
state: "FetchingConfig",
|
||||
});
|
||||
this._state$.next(ConnectionState.FetchingConfig);
|
||||
// We should already have this information after creating the localTransport.
|
||||
// It would probably be better to forward this here.
|
||||
const { url, jwt } = await this.getSFUConfigWithOpenID();
|
||||
// If we were stopped while fetching the config, don't proceed to connect
|
||||
if (this.stopped) return;
|
||||
|
||||
this._state$.next({
|
||||
state: "ConnectingToLkRoom",
|
||||
});
|
||||
// Setup observer once we are done with getSFUConfigWithOpenID
|
||||
connectionStateObserver(this.livekitRoom)
|
||||
.pipe(
|
||||
this.scope.bind(),
|
||||
map((s) => s as unknown as ConnectionState),
|
||||
)
|
||||
.subscribe((lkState) => {
|
||||
// It is save to cast lkState to ConnectionState as they are fully overlapping.
|
||||
this._state$.next(lkState);
|
||||
});
|
||||
|
||||
try {
|
||||
await this.livekitRoom.connect(url, jwt);
|
||||
} catch (e) {
|
||||
@@ -136,7 +158,8 @@ export class Connection {
|
||||
throw new InsufficientCapacityError();
|
||||
}
|
||||
if (e.status === 404) {
|
||||
// error msg is "Could not establish signal connection: requested room does not exist"
|
||||
// error msg is "Failed to create call"
|
||||
// error description is "Call creation might be restricted to authorized users only. Try again later, or contact your server admin if the problem persists."
|
||||
// The room does not exist. There are two different modes of operation for the SFU:
|
||||
// - the room is created on the fly when connecting (livekit `auto_create` option)
|
||||
// - Only authorized users can create rooms, so the room must exist before connecting (done by the auth jwt service)
|
||||
@@ -148,19 +171,16 @@ export class Connection {
|
||||
}
|
||||
// If we were stopped while connecting, don't proceed to update state.
|
||||
if (this.stopped) return;
|
||||
|
||||
this._state$.next({
|
||||
state: "ConnectedToLkRoom",
|
||||
livekitConnectionState$: this.scope.behavior(
|
||||
connectionStateObserver(this.livekitRoom),
|
||||
),
|
||||
});
|
||||
} catch (error) {
|
||||
this.logger.debug(`Failed to connect to LiveKit room: ${error}`);
|
||||
this._state$.next({
|
||||
state: "FailedToStart",
|
||||
error: error instanceof Error ? error : new Error(`${error}`),
|
||||
});
|
||||
this._state$.next(
|
||||
error instanceof ElementCallError
|
||||
? error
|
||||
: error instanceof Error
|
||||
? new UnknownCallError(error)
|
||||
: new UnknownCallError(new Error(`${error}`)),
|
||||
);
|
||||
// Its okay to ignore the throw. The error is part of the state.
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
@@ -185,9 +205,7 @@ export class Connection {
|
||||
);
|
||||
if (this.stopped) return;
|
||||
await this.livekitRoom.disconnect();
|
||||
this._state$.next({
|
||||
state: "Stopped",
|
||||
});
|
||||
this._state$.next(ConnectionState.Stopped);
|
||||
this.stopped = true;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user