refactor: PublishConnection extract from giant constructor
This commit is contained in:
@@ -4,7 +4,14 @@ Copyright 2025 New Vector Ltd.
|
|||||||
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
||||||
Please see LICENSE in the repository root for full details.
|
Please see LICENSE in the repository root for full details.
|
||||||
*/
|
*/
|
||||||
import { ConnectionState, type E2EEOptions, LocalVideoTrack, Room as LivekitRoom, type RoomOptions, Track } from "livekit-client";
|
import {
|
||||||
|
ConnectionState,
|
||||||
|
type E2EEOptions,
|
||||||
|
LocalVideoTrack,
|
||||||
|
Room as LivekitRoom,
|
||||||
|
type RoomOptions,
|
||||||
|
Track
|
||||||
|
} from "livekit-client";
|
||||||
import { map, NEVER, type Observable, type Subscription, switchMap } from "rxjs";
|
import { map, NEVER, type Observable, type Subscription, switchMap } from "rxjs";
|
||||||
|
|
||||||
import type { Behavior } from "./Behavior.ts";
|
import type { Behavior } from "./Behavior.ts";
|
||||||
@@ -17,6 +24,7 @@ import { defaultLiveKitOptions } from "../livekit/options.ts";
|
|||||||
import { getValue } from "../utils/observable.ts";
|
import { getValue } from "../utils/observable.ts";
|
||||||
import { observeTrackReference$ } from "./MediaViewModel.ts";
|
import { observeTrackReference$ } from "./MediaViewModel.ts";
|
||||||
import { Connection, type ConnectionOpts } from "./Connection.ts";
|
import { Connection, type ConnectionOpts } from "./Connection.ts";
|
||||||
|
import { type ObservableScope } from "./ObservableScope.ts";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A connection to the publishing LiveKit.e. the local livekit room, the one the user is publishing to.
|
* A connection to the publishing LiveKit.e. the local livekit room, the one the user is publishing to.
|
||||||
@@ -24,6 +32,44 @@ import { Connection, type ConnectionOpts } from "./Connection.ts";
|
|||||||
*/
|
*/
|
||||||
export class PublishConnection extends Connection {
|
export class PublishConnection extends Connection {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new PublishConnection.
|
||||||
|
* @param args - The connection options. {@link ConnectionOpts}
|
||||||
|
* @param devices - The media devices to use for audio and video input.
|
||||||
|
* @param muteStates - The mute states for audio and video.
|
||||||
|
* @param e2eeLivekitOptions - The E2EE options to use for the LiveKit room. Use to share the same key provider across connections!.
|
||||||
|
* @param trackerProcessorState$ - The processor state for the video track processor (e.g. background blur).
|
||||||
|
*/
|
||||||
|
public constructor(
|
||||||
|
args: ConnectionOpts,
|
||||||
|
devices: MediaDevices,
|
||||||
|
private readonly muteStates: MuteStates,
|
||||||
|
e2eeLivekitOptions: E2EEOptions | undefined,
|
||||||
|
trackerProcessorState$: Behavior<ProcessorState>
|
||||||
|
) {
|
||||||
|
const { scope } = args;
|
||||||
|
logger.info("[LivekitRoom] Create LiveKit room");
|
||||||
|
const { controlledAudioDevices } = getUrlParams();
|
||||||
|
|
||||||
|
const factory = args.livekitRoomFactory ?? ((options: RoomOptions): LivekitRoom => new LivekitRoom(options));
|
||||||
|
const room = factory(
|
||||||
|
generateRoomOption(devices, trackerProcessorState$.value, controlledAudioDevices, e2eeLivekitOptions)
|
||||||
|
);
|
||||||
|
room.setE2EEEnabled(e2eeLivekitOptions !== undefined).catch((e) => {
|
||||||
|
logger.error("Failed to set E2EE enabled on room", e);
|
||||||
|
});
|
||||||
|
|
||||||
|
super(room, args);
|
||||||
|
|
||||||
|
// Setup track processor syncing (blur)
|
||||||
|
this.observeTrackProcessors(scope, room, trackerProcessorState$);
|
||||||
|
// Observe mute state changes and update LiveKit microphone/camera states accordingly
|
||||||
|
this.observeMuteStates(scope);
|
||||||
|
// Observe media device changes and update LiveKit active devices accordingly
|
||||||
|
this.observeMediaDevices(scope, devices, controlledAudioDevices);
|
||||||
|
|
||||||
|
this.workaroundRestartAudioInputTrackChrome(devices, scope);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Start the connection to LiveKit and publish local tracks.
|
* Start the connection to LiveKit and publish local tracks.
|
||||||
@@ -56,123 +102,18 @@ export class PublishConnection extends Connection {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// Private methods
|
||||||
|
|
||||||
/**
|
// Restart the audio input track whenever we detect that the active media
|
||||||
* Creates a new PublishConnection.
|
// device has changed to refer to a different hardware device. We do this
|
||||||
* @param args - The connection options. {@link ConnectionOpts}
|
// for the sake of Chrome, which provides a "default" device that is meant
|
||||||
* @param devices - The media devices to use for audio and video input.
|
// to match the system's default audio input, whatever that may be.
|
||||||
* @param muteStates - The mute states for audio and video.
|
// This is special-cased for only audio inputs because we need to dig around
|
||||||
* @param e2eeLivekitOptions - The E2EE options to use for the LiveKit room. Use to share the same key provider across connections!.
|
// in the LocalParticipant object for the track object and there's not a nice
|
||||||
* @param trackerProcessorState$ - The processor state for the video track processor (e.g. background blur).
|
// way to do that generically. There is usually no OS-level default video capture
|
||||||
*/
|
// device anyway, and audio outputs work differently.
|
||||||
public constructor(
|
private workaroundRestartAudioInputTrackChrome(devices: MediaDevices, scope: ObservableScope): void {
|
||||||
args: ConnectionOpts,
|
|
||||||
devices: MediaDevices,
|
|
||||||
private readonly muteStates: MuteStates,
|
|
||||||
e2eeLivekitOptions: E2EEOptions | undefined,
|
|
||||||
trackerProcessorState$: Behavior<ProcessorState>
|
|
||||||
) {
|
|
||||||
const { scope } = args;
|
|
||||||
logger.info("[LivekitRoom] Create LiveKit room");
|
|
||||||
const { controlledAudioDevices } = getUrlParams();
|
|
||||||
|
|
||||||
const factory = args.livekitRoomFactory ?? ((options: RoomOptions): LivekitRoom => new LivekitRoom(options));
|
|
||||||
const room = factory({
|
|
||||||
...defaultLiveKitOptions,
|
|
||||||
videoCaptureDefaults: {
|
|
||||||
...defaultLiveKitOptions.videoCaptureDefaults,
|
|
||||||
deviceId: devices.videoInput.selected$.value?.id,
|
|
||||||
processor: trackerProcessorState$.value.processor
|
|
||||||
},
|
|
||||||
audioCaptureDefaults: {
|
|
||||||
...defaultLiveKitOptions.audioCaptureDefaults,
|
|
||||||
deviceId: devices.audioInput.selected$.value?.id
|
|
||||||
},
|
|
||||||
audioOutput: {
|
|
||||||
// When using controlled audio devices, we don't want to set the
|
|
||||||
// deviceId here, because it will be set by the native app.
|
|
||||||
// (also the id does not need to match a browser device id)
|
|
||||||
deviceId: controlledAudioDevices
|
|
||||||
? undefined
|
|
||||||
: getValue(devices.audioOutput.selected$)?.id
|
|
||||||
},
|
|
||||||
e2ee: e2eeLivekitOptions
|
|
||||||
});
|
|
||||||
room.setE2EEEnabled(e2eeLivekitOptions !== undefined).catch((e) => {
|
|
||||||
logger.error("Failed to set E2EE enabled on room", e);
|
|
||||||
});
|
|
||||||
|
|
||||||
super(room, args);
|
|
||||||
|
|
||||||
// Setup track processor syncing (blur)
|
|
||||||
const track$ = scope.behavior(
|
|
||||||
observeTrackReference$(room.localParticipant, Track.Source.Camera).pipe(
|
|
||||||
map((trackRef) => {
|
|
||||||
const track = trackRef?.publication?.track;
|
|
||||||
return track instanceof LocalVideoTrack ? track : null;
|
|
||||||
})
|
|
||||||
)
|
|
||||||
);
|
|
||||||
trackProcessorSync(track$, trackerProcessorState$);
|
|
||||||
|
|
||||||
this.muteStates.audio.setHandler(async (desired) => {
|
|
||||||
try {
|
|
||||||
await this.livekitRoom.localParticipant.setMicrophoneEnabled(desired);
|
|
||||||
} catch (e) {
|
|
||||||
logger.error("Failed to update LiveKit audio input mute state", e);
|
|
||||||
}
|
|
||||||
return this.livekitRoom.localParticipant.isMicrophoneEnabled;
|
|
||||||
});
|
|
||||||
this.muteStates.video.setHandler(async (desired) => {
|
|
||||||
try {
|
|
||||||
await this.livekitRoom.localParticipant.setCameraEnabled(desired);
|
|
||||||
} catch (e) {
|
|
||||||
logger.error("Failed to update LiveKit video input mute state", e);
|
|
||||||
}
|
|
||||||
return this.livekitRoom.localParticipant.isCameraEnabled;
|
|
||||||
});
|
|
||||||
scope.onEnd(() => {
|
|
||||||
this.muteStates.audio.unsetHandler();
|
|
||||||
this.muteStates.video.unsetHandler();
|
|
||||||
});
|
|
||||||
|
|
||||||
const syncDevice = (
|
|
||||||
kind: MediaDeviceKind,
|
|
||||||
selected$: Observable<SelectedDevice | undefined>
|
|
||||||
): Subscription =>
|
|
||||||
selected$.pipe(scope.bind()).subscribe((device) => {
|
|
||||||
if (this.livekitRoom.state != ConnectionState.Connected) return;
|
|
||||||
// if (this.connectionState$.value !== ConnectionState.Connected) return;
|
|
||||||
logger.info(
|
|
||||||
"[LivekitRoom] syncDevice room.getActiveDevice(kind) !== d.id :",
|
|
||||||
this.livekitRoom.getActiveDevice(kind),
|
|
||||||
" !== ",
|
|
||||||
device?.id
|
|
||||||
);
|
|
||||||
if (
|
|
||||||
device !== undefined &&
|
|
||||||
this.livekitRoom.getActiveDevice(kind) !== device.id
|
|
||||||
) {
|
|
||||||
this.livekitRoom
|
|
||||||
.switchActiveDevice(kind, device.id)
|
|
||||||
.catch((e) =>
|
|
||||||
logger.error(`Failed to sync ${kind} device with LiveKit`, e)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
syncDevice("audioinput", devices.audioInput.selected$);
|
|
||||||
if (!controlledAudioDevices)
|
|
||||||
syncDevice("audiooutput", devices.audioOutput.selected$);
|
|
||||||
syncDevice("videoinput", devices.videoInput.selected$);
|
|
||||||
// Restart the audio input track whenever we detect that the active media
|
|
||||||
// device has changed to refer to a different hardware device. We do this
|
|
||||||
// for the sake of Chrome, which provides a "default" device that is meant
|
|
||||||
// to match the system's default audio input, whatever that may be.
|
|
||||||
// This is special-cased for only audio inputs because we need to dig around
|
|
||||||
// in the LocalParticipant object for the track object and there's not a nice
|
|
||||||
// way to do that generically. There is usually no OS-level default video capture
|
|
||||||
// device anyway, and audio outputs work differently.
|
|
||||||
devices.audioInput.selected$
|
devices.audioInput.selected$
|
||||||
.pipe(
|
.pipe(
|
||||||
switchMap((device) => device?.hardwareDeviceChange$ ?? NEVER),
|
switchMap((device) => device?.hardwareDeviceChange$ ?? NEVER),
|
||||||
@@ -205,4 +146,109 @@ export class PublishConnection extends Connection {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Observe changes in the selected media devices and update the LiveKit room accordingly.
|
||||||
|
private observeMediaDevices(scope: ObservableScope, devices: MediaDevices, controlledAudioDevices: boolean):void {
|
||||||
|
const syncDevice = (
|
||||||
|
kind: MediaDeviceKind,
|
||||||
|
selected$: Observable<SelectedDevice | undefined>
|
||||||
|
): Subscription =>
|
||||||
|
selected$.pipe(scope.bind()).subscribe((device) => {
|
||||||
|
if (this.livekitRoom.state != ConnectionState.Connected) return;
|
||||||
|
// if (this.connectionState$.value !== ConnectionState.Connected) return;
|
||||||
|
logger.info(
|
||||||
|
"[LivekitRoom] syncDevice room.getActiveDevice(kind) !== d.id :",
|
||||||
|
this.livekitRoom.getActiveDevice(kind),
|
||||||
|
" !== ",
|
||||||
|
device?.id
|
||||||
|
);
|
||||||
|
if (
|
||||||
|
device !== undefined &&
|
||||||
|
this.livekitRoom.getActiveDevice(kind) !== device.id
|
||||||
|
) {
|
||||||
|
this.livekitRoom
|
||||||
|
.switchActiveDevice(kind, device.id)
|
||||||
|
.catch((e) =>
|
||||||
|
logger.error(`Failed to sync ${kind} device with LiveKit`, e)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
syncDevice("audioinput", devices.audioInput.selected$);
|
||||||
|
if (!controlledAudioDevices)
|
||||||
|
syncDevice("audiooutput", devices.audioOutput.selected$);
|
||||||
|
syncDevice("videoinput", devices.videoInput.selected$);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Observe changes in the mute states and update the LiveKit room accordingly.
|
||||||
|
* @param scope
|
||||||
|
* @private
|
||||||
|
*/
|
||||||
|
private observeMuteStates(scope: ObservableScope): void {
|
||||||
|
this.muteStates.audio.setHandler(async (desired) => {
|
||||||
|
try {
|
||||||
|
await this.livekitRoom.localParticipant.setMicrophoneEnabled(desired);
|
||||||
|
} catch (e) {
|
||||||
|
logger.error("Failed to update LiveKit audio input mute state", e);
|
||||||
|
}
|
||||||
|
return this.livekitRoom.localParticipant.isMicrophoneEnabled;
|
||||||
|
});
|
||||||
|
this.muteStates.video.setHandler(async (desired) => {
|
||||||
|
try {
|
||||||
|
await this.livekitRoom.localParticipant.setCameraEnabled(desired);
|
||||||
|
} catch (e) {
|
||||||
|
logger.error("Failed to update LiveKit video input mute state", e);
|
||||||
|
}
|
||||||
|
return this.livekitRoom.localParticipant.isCameraEnabled;
|
||||||
|
});
|
||||||
|
scope.onEnd(() => {
|
||||||
|
this.muteStates.audio.unsetHandler();
|
||||||
|
this.muteStates.video.unsetHandler();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private observeTrackProcessors(scope: ObservableScope, room: LivekitRoom, trackerProcessorState$: Behavior<ProcessorState>): void {
|
||||||
|
const track$ = scope.behavior(
|
||||||
|
observeTrackReference$(room.localParticipant, Track.Source.Camera).pipe(
|
||||||
|
map((trackRef) => {
|
||||||
|
const track = trackRef?.publication?.track;
|
||||||
|
return track instanceof LocalVideoTrack ? track : null;
|
||||||
|
})
|
||||||
|
)
|
||||||
|
);
|
||||||
|
trackProcessorSync(track$, trackerProcessorState$);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Generate the initial LiveKit RoomOptions based on the current media devices and processor state.
|
||||||
|
function generateRoomOption(
|
||||||
|
devices: MediaDevices,
|
||||||
|
processorState: ProcessorState,
|
||||||
|
controlledAudioDevices: boolean,
|
||||||
|
e2eeLivekitOptions: E2EEOptions | undefined,
|
||||||
|
): RoomOptions {
|
||||||
|
return {
|
||||||
|
...defaultLiveKitOptions,
|
||||||
|
videoCaptureDefaults: {
|
||||||
|
...defaultLiveKitOptions.videoCaptureDefaults,
|
||||||
|
deviceId: devices.videoInput.selected$.value?.id,
|
||||||
|
processor: processorState.processor
|
||||||
|
},
|
||||||
|
audioCaptureDefaults: {
|
||||||
|
...defaultLiveKitOptions.audioCaptureDefaults,
|
||||||
|
deviceId: devices.audioInput.selected$.value?.id
|
||||||
|
},
|
||||||
|
audioOutput: {
|
||||||
|
// When using controlled audio devices, we don't want to set the
|
||||||
|
// deviceId here, because it will be set by the native app.
|
||||||
|
// (also the id does not need to match a browser device id)
|
||||||
|
deviceId: controlledAudioDevices
|
||||||
|
? undefined
|
||||||
|
: getValue(devices.audioOutput.selected$)?.id
|
||||||
|
},
|
||||||
|
e2ee: e2eeLivekitOptions
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user