Connection: add Connection state and handle error on start
This commit is contained in:
@@ -470,12 +470,22 @@ export class CallViewModel extends ViewModel {
|
|||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
public readonly livekitConnectionState$ = this.scope.behavior(
|
public readonly livekitConnectionState$ =
|
||||||
combineLatest([this.localConnection]).pipe(
|
this.scope.behavior(
|
||||||
switchMap(([c]) => c.connectionState$),
|
from(this.localConnection).pipe(
|
||||||
startWith(ConnectionState.Disconnected),
|
switchMap((c) =>
|
||||||
),
|
c.focusedConnectionState$.pipe(
|
||||||
);
|
map((s) => {
|
||||||
|
if (s.state === "ConnectedToLkRoom") return s.connectionState;
|
||||||
|
return ConnectionState.Disconnected
|
||||||
|
}),
|
||||||
|
distinctUntilChanged(),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
startWith(ConnectionState.Disconnected),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The MatrixRTC session participants.
|
* The MatrixRTC session participants.
|
||||||
|
|||||||
@@ -6,9 +6,9 @@ Please see LICENSE in the repository root for full details.
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
import { connectedParticipantsObserver, connectionStateObserver } from "@livekit/components-core";
|
import { connectedParticipantsObserver, connectionStateObserver } from "@livekit/components-core";
|
||||||
import { type ConnectionState, type E2EEOptions, Room as LivekitRoom } from "livekit-client";
|
import { type ConnectionState, type E2EEOptions, Room as LivekitRoom, type RoomOptions } from "livekit-client";
|
||||||
import { type CallMembership, type LivekitFocus } from "matrix-js-sdk/lib/matrixrtc";
|
import { type CallMembership, type LivekitFocus } from "matrix-js-sdk/lib/matrixrtc";
|
||||||
import { combineLatest } from "rxjs";
|
import { BehaviorSubject, combineLatest } from "rxjs";
|
||||||
|
|
||||||
import { getSFUConfigWithOpenID, type OpenIDClientParts, type SFUConfig } from "../livekit/openIDSFU";
|
import { getSFUConfigWithOpenID, type OpenIDClientParts, type SFUConfig } from "../livekit/openIDSFU";
|
||||||
import { type Behavior } from "./Behavior";
|
import { type Behavior } from "./Behavior";
|
||||||
@@ -24,7 +24,20 @@ export interface ConnectionOpts {
|
|||||||
scope: ObservableScope;
|
scope: ObservableScope;
|
||||||
/** An observable of the current RTC call memberships and their associated focus. */
|
/** An observable of the current RTC call memberships and their associated focus. */
|
||||||
membershipsFocusMap$: Behavior<{ membership: CallMembership; focus: LivekitFocus }[]>;
|
membershipsFocusMap$: Behavior<{ membership: CallMembership; focus: LivekitFocus }[]>;
|
||||||
|
|
||||||
|
/** Optional factory to create the Livekit room, mainly for testing purposes. */
|
||||||
|
livekitRoomFactory?: (options?: RoomOptions) => LivekitRoom;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export type FocusConnectionState =
|
||||||
|
| { state: 'Initialized' }
|
||||||
|
| { state: 'FetchingConfig', focus: LivekitFocus }
|
||||||
|
| { state: 'ConnectingToLkRoom', focus: LivekitFocus }
|
||||||
|
| { state: 'PublishingTracks', focus: LivekitFocus }
|
||||||
|
| { state: 'FailedToStart', error: Error, focus: LivekitFocus }
|
||||||
|
| { state: 'ConnectedToLkRoom', connectionState: ConnectionState, focus: LivekitFocus }
|
||||||
|
| { state: 'Stopped', focus: LivekitFocus };
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A connection to a Matrix RTC LiveKit backend.
|
* A connection to a Matrix RTC LiveKit backend.
|
||||||
*
|
*
|
||||||
@@ -32,6 +45,15 @@ export interface ConnectionOpts {
|
|||||||
*/
|
*/
|
||||||
export class Connection {
|
export class Connection {
|
||||||
|
|
||||||
|
// Private Behavior
|
||||||
|
private readonly _focusedConnectionState$ = new BehaviorSubject<FocusConnectionState>({ state: 'Initialized' });
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The current state of the connection to the focus server.
|
||||||
|
*/
|
||||||
|
public get focusedConnectionState$(): Behavior<FocusConnectionState> {
|
||||||
|
return this._focusedConnectionState$;
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Whether the connection has been stopped.
|
* Whether the connection has been stopped.
|
||||||
* @see Connection.stop
|
* @see Connection.stop
|
||||||
@@ -48,10 +70,23 @@ export class Connection {
|
|||||||
*/
|
*/
|
||||||
public async start(): Promise<void> {
|
public async start(): Promise<void> {
|
||||||
this.stopped = false;
|
this.stopped = false;
|
||||||
// TODO could this be loaded earlier to save time?
|
try {
|
||||||
const { url, jwt } = await this.getSFUConfigWithOpenID();
|
this._focusedConnectionState$.next({ state: 'FetchingConfig', focus: this.targetFocus });
|
||||||
|
// TODO could this be loaded earlier to save time?
|
||||||
|
const { url, jwt } = await this.getSFUConfigWithOpenID();
|
||||||
|
// If we were stopped while fetching the config, don't proceed to connect
|
||||||
|
if (this.stopped) return;
|
||||||
|
|
||||||
if (!this.stopped) await this.livekitRoom.connect(url, jwt);
|
this._focusedConnectionState$.next({ state: 'ConnectingToLkRoom', focus: this.targetFocus });
|
||||||
|
await this.livekitRoom.connect(url, jwt);
|
||||||
|
// If we were stopped while connecting, don't proceed to update state.
|
||||||
|
if (this.stopped) return;
|
||||||
|
|
||||||
|
this._focusedConnectionState$.next({ state: 'ConnectedToLkRoom', focus: this.targetFocus, connectionState: this.livekitRoom.state });
|
||||||
|
} catch (error) {
|
||||||
|
this._focusedConnectionState$.next({ state: 'FailedToStart', error: error instanceof Error ? error : new Error(`${error}`), focus: this.targetFocus });
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -71,6 +106,7 @@ export class Connection {
|
|||||||
public stop(): void {
|
public stop(): void {
|
||||||
if (this.stopped) return;
|
if (this.stopped) return;
|
||||||
void this.livekitRoom.disconnect();
|
void this.livekitRoom.disconnect();
|
||||||
|
this._focusedConnectionState$.next({ state: 'Stopped', focus: this.targetFocus });
|
||||||
this.stopped = true;
|
this.stopped = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -87,13 +123,6 @@ export class Connection {
|
|||||||
*/
|
*/
|
||||||
protected readonly targetFocus: LivekitFocus;
|
protected readonly targetFocus: LivekitFocus;
|
||||||
|
|
||||||
/**
|
|
||||||
* An observable of the livekit connection state.
|
|
||||||
* Converts the livekit room events StateChange to an observable.
|
|
||||||
*/
|
|
||||||
public connectionState$: Behavior<ConnectionState>;
|
|
||||||
|
|
||||||
|
|
||||||
private readonly client: OpenIDClientParts;
|
private readonly client: OpenIDClientParts;
|
||||||
/**
|
/**
|
||||||
* Creates a new connection to a matrix RTC LiveKit backend.
|
* Creates a new connection to a matrix RTC LiveKit backend.
|
||||||
@@ -140,9 +169,16 @@ export class Connection {
|
|||||||
),
|
),
|
||||||
[]
|
[]
|
||||||
);
|
);
|
||||||
this.connectionState$ = scope.behavior<ConnectionState>(
|
|
||||||
|
scope.behavior<ConnectionState>(
|
||||||
connectionStateObserver(this.livekitRoom)
|
connectionStateObserver(this.livekitRoom)
|
||||||
);
|
).subscribe((connectionState) => {
|
||||||
|
const current = this.focusedConnectionState$.value;
|
||||||
|
// Only update the state if we are already connected to the LiveKit room.
|
||||||
|
if (current.state === 'ConnectedToLkRoom') {
|
||||||
|
this.focusedConnectionState$.next({ state: 'ConnectedToLkRoom', connectionState, focus: current.focus });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
scope.onEnd(() => this.stop());
|
scope.onEnd(() => this.stop());
|
||||||
}
|
}
|
||||||
@@ -162,7 +198,8 @@ export class RemoteConnection extends Connection {
|
|||||||
* @param sharedE2eeOption - The shared E2EE options to use for the connection.
|
* @param sharedE2eeOption - The shared E2EE options to use for the connection.
|
||||||
*/
|
*/
|
||||||
public constructor(opts: ConnectionOpts, sharedE2eeOption: E2EEOptions | undefined) {
|
public constructor(opts: ConnectionOpts, sharedE2eeOption: E2EEOptions | undefined) {
|
||||||
const livekitRoom = new LivekitRoom({
|
const factory = opts.livekitRoomFactory ?? ((options: RoomOptions): LivekitRoom => new LivekitRoom(options));
|
||||||
|
const livekitRoom = factory({
|
||||||
...defaultLiveKitOptions,
|
...defaultLiveKitOptions,
|
||||||
e2ee: sharedE2eeOption
|
e2ee: sharedE2eeOption
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ Copyright 2025 New Vector Ltd.
|
|||||||
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
||||||
Please see LICENSE in the repository root for full details.
|
Please see LICENSE in the repository root for full details.
|
||||||
*/
|
*/
|
||||||
import { ConnectionState, type E2EEOptions, LocalVideoTrack, Room as LivekitRoom, Track } from "livekit-client";
|
import { ConnectionState, type E2EEOptions, LocalVideoTrack, Room as LivekitRoom, type RoomOptions, Track } from "livekit-client";
|
||||||
import { map, NEVER, type Observable, type Subscription, switchMap } from "rxjs";
|
import { map, NEVER, type Observable, type Subscription, switchMap } from "rxjs";
|
||||||
|
|
||||||
import type { Behavior } from "./Behavior.ts";
|
import type { Behavior } from "./Behavior.ts";
|
||||||
@@ -39,18 +39,20 @@ export class PublishConnection extends Connection {
|
|||||||
|
|
||||||
await super.start()
|
await super.start()
|
||||||
|
|
||||||
if (!this.stopped) {
|
if (this.stopped) return;
|
||||||
// TODO this can throw errors? It will also prompt for permissions if not already granted
|
|
||||||
const tracks = await this.livekitRoom.localParticipant.createTracks({
|
// TODO this can throw errors? It will also prompt for permissions if not already granted
|
||||||
audio: this.muteStates.audio.enabled$.value,
|
const tracks = await this.livekitRoom.localParticipant.createTracks({
|
||||||
video: this.muteStates.video.enabled$.value
|
audio: this.muteStates.audio.enabled$.value,
|
||||||
});
|
video: this.muteStates.video.enabled$.value
|
||||||
for (const track of tracks) {
|
});
|
||||||
// TODO: handle errors? Needs the signaling connection to be up, but it has some retries internally
|
if (this.stopped) return;
|
||||||
// with a timeout.
|
for (const track of tracks) {
|
||||||
await this.livekitRoom.localParticipant.publishTrack(track);
|
// TODO: handle errors? Needs the signaling connection to be up, but it has some retries internally
|
||||||
// TODO: check if the connection is still active? and break the loop if not?
|
// with a timeout.
|
||||||
}
|
await this.livekitRoom.localParticipant.publishTrack(track);
|
||||||
|
if (this.stopped) return;
|
||||||
|
// TODO: check if the connection is still active? and break the loop if not?
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -74,7 +76,8 @@ export class PublishConnection extends Connection {
|
|||||||
logger.info("[LivekitRoom] Create LiveKit room");
|
logger.info("[LivekitRoom] Create LiveKit room");
|
||||||
const { controlledAudioDevices } = getUrlParams();
|
const { controlledAudioDevices } = getUrlParams();
|
||||||
|
|
||||||
const room = new LivekitRoom({
|
const factory = args.livekitRoomFactory ?? ((options: RoomOptions): LivekitRoom => new LivekitRoom(options));
|
||||||
|
const room = factory({
|
||||||
...defaultLiveKitOptions,
|
...defaultLiveKitOptions,
|
||||||
videoCaptureDefaults: {
|
videoCaptureDefaults: {
|
||||||
...defaultLiveKitOptions.videoCaptureDefaults,
|
...defaultLiveKitOptions.videoCaptureDefaults,
|
||||||
@@ -99,17 +102,7 @@ export class PublishConnection extends Connection {
|
|||||||
logger.error("Failed to set E2EE enabled on room", e);
|
logger.error("Failed to set E2EE enabled on room", e);
|
||||||
});
|
});
|
||||||
|
|
||||||
super(
|
super(room, args);
|
||||||
room,
|
|
||||||
args,
|
|
||||||
// focus,
|
|
||||||
// livekitAlias,
|
|
||||||
// client,
|
|
||||||
// scope,
|
|
||||||
// membershipsFocusMap$,
|
|
||||||
// e2eeLivekitOptions,
|
|
||||||
// room
|
|
||||||
);
|
|
||||||
|
|
||||||
// Setup track processor syncing (blur)
|
// Setup track processor syncing (blur)
|
||||||
const track$ = scope.behavior(
|
const track$ = scope.behavior(
|
||||||
@@ -148,7 +141,8 @@ export class PublishConnection extends Connection {
|
|||||||
selected$: Observable<SelectedDevice | undefined>
|
selected$: Observable<SelectedDevice | undefined>
|
||||||
): Subscription =>
|
): Subscription =>
|
||||||
selected$.pipe(scope.bind()).subscribe((device) => {
|
selected$.pipe(scope.bind()).subscribe((device) => {
|
||||||
if (this.connectionState$.value !== ConnectionState.Connected) return;
|
if (this.livekitRoom.state != ConnectionState.Connected) return;
|
||||||
|
// if (this.connectionState$.value !== ConnectionState.Connected) return;
|
||||||
logger.info(
|
logger.info(
|
||||||
"[LivekitRoom] syncDevice room.getActiveDevice(kind) !== d.id :",
|
"[LivekitRoom] syncDevice room.getActiveDevice(kind) !== d.id :",
|
||||||
this.livekitRoom.getActiveDevice(kind),
|
this.livekitRoom.getActiveDevice(kind),
|
||||||
@@ -185,7 +179,7 @@ export class PublishConnection extends Connection {
|
|||||||
scope.bind()
|
scope.bind()
|
||||||
)
|
)
|
||||||
.subscribe(() => {
|
.subscribe(() => {
|
||||||
if (this.connectionState$.value !== ConnectionState.Connected) return;
|
if (this.livekitRoom.state != ConnectionState.Connected) return;
|
||||||
const activeMicTrack = Array.from(
|
const activeMicTrack = Array.from(
|
||||||
this.livekitRoom.localParticipant.audioTrackPublications.values()
|
this.livekitRoom.localParticipant.audioTrackPublications.values()
|
||||||
).find((d) => d.source === Track.Source.Microphone)?.track;
|
).find((d) => d.source === Track.Source.Microphone)?.track;
|
||||||
|
|||||||
Reference in New Issue
Block a user