/* Copyright 2025 New Vector Ltd. SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ import { ConnectionState, type E2EEOptions, LocalVideoTrack, Room as LivekitRoom, type RoomOptions, Track } from "livekit-client"; import { map, NEVER, type Observable, type Subscription, switchMap } from "rxjs"; import type { Behavior } from "./Behavior.ts"; import type { MediaDevices, SelectedDevice } from "./MediaDevices.ts"; import type { MuteStates } from "./MuteStates.ts"; import { type ProcessorState, trackProcessorSync } from "../livekit/TrackProcessorContext.tsx"; import { logger } from "../../../matrix-js-sdk/lib/logger"; import { getUrlParams } from "../UrlParams.ts"; import { defaultLiveKitOptions } from "../livekit/options.ts"; import { getValue } from "../utils/observable.ts"; import { observeTrackReference$ } from "./MediaViewModel.ts"; import { Connection, type ConnectionOpts } from "./Connection.ts"; /** * A connection to the publishing LiveKit.e. the local livekit room, the one the user is publishing to. * This connection will publish the local user's audio and video tracks. */ export class PublishConnection extends Connection { /** * Start the connection to LiveKit and publish local tracks. * * This will: * 1. Request an OpenId token `request_token` (allows matrix users to verify their identity with a third-party service.) * 2. Use this token to request the SFU config to the MatrixRtc authentication service. * 3. Connect to the configured LiveKit room. * 4. Create local audio and video tracks based on the current mute states and publish them to the room. */ public async start(): Promise { this.stopped = false; await super.start() if (this.stopped) return; // TODO this can throw errors? It will also prompt for permissions if not already granted const tracks = await this.livekitRoom.localParticipant.createTracks({ audio: this.muteStates.audio.enabled$.value, video: this.muteStates.video.enabled$.value }); if (this.stopped) return; for (const track of tracks) { // TODO: handle errors? Needs the signaling connection to be up, but it has some retries internally // with a timeout. await this.livekitRoom.localParticipant.publishTrack(track); if (this.stopped) return; // TODO: check if the connection is still active? and break the loop if not? } }; /** * Creates a new PublishConnection. * @param args - The connection options. {@link ConnectionOpts} * @param devices - The media devices to use for audio and video input. * @param muteStates - The mute states for audio and video. * @param e2eeLivekitOptions - The E2EE options to use for the LiveKit room. Use to share the same key provider across connections!. * @param trackerProcessorState$ - The processor state for the video track processor (e.g. background blur). */ public constructor( args: ConnectionOpts, devices: MediaDevices, private readonly muteStates: MuteStates, e2eeLivekitOptions: E2EEOptions | undefined, trackerProcessorState$: Behavior ) { const { scope } = args; logger.info("[LivekitRoom] Create LiveKit room"); const { controlledAudioDevices } = getUrlParams(); const factory = args.livekitRoomFactory ?? ((options: RoomOptions): LivekitRoom => new LivekitRoom(options)); const room = factory({ ...defaultLiveKitOptions, videoCaptureDefaults: { ...defaultLiveKitOptions.videoCaptureDefaults, deviceId: devices.videoInput.selected$.value?.id, processor: trackerProcessorState$.value.processor }, audioCaptureDefaults: { ...defaultLiveKitOptions.audioCaptureDefaults, deviceId: devices.audioInput.selected$.value?.id }, audioOutput: { // When using controlled audio devices, we don't want to set the // deviceId here, because it will be set by the native app. // (also the id does not need to match a browser device id) deviceId: controlledAudioDevices ? undefined : getValue(devices.audioOutput.selected$)?.id }, e2ee: e2eeLivekitOptions }); room.setE2EEEnabled(e2eeLivekitOptions !== undefined).catch((e) => { logger.error("Failed to set E2EE enabled on room", e); }); super(room, args); // Setup track processor syncing (blur) const track$ = scope.behavior( observeTrackReference$(room.localParticipant, Track.Source.Camera).pipe( map((trackRef) => { const track = trackRef?.publication?.track; return track instanceof LocalVideoTrack ? track : null; }) ) ); trackProcessorSync(track$, trackerProcessorState$); this.muteStates.audio.setHandler(async (desired) => { try { await this.livekitRoom.localParticipant.setMicrophoneEnabled(desired); } catch (e) { logger.error("Failed to update LiveKit audio input mute state", e); } return this.livekitRoom.localParticipant.isMicrophoneEnabled; }); this.muteStates.video.setHandler(async (desired) => { try { await this.livekitRoom.localParticipant.setCameraEnabled(desired); } catch (e) { logger.error("Failed to update LiveKit video input mute state", e); } return this.livekitRoom.localParticipant.isCameraEnabled; }); scope.onEnd(() => { this.muteStates.audio.unsetHandler(); this.muteStates.video.unsetHandler(); }); const syncDevice = ( kind: MediaDeviceKind, selected$: Observable ): Subscription => selected$.pipe(scope.bind()).subscribe((device) => { if (this.livekitRoom.state != ConnectionState.Connected) return; // if (this.connectionState$.value !== ConnectionState.Connected) return; logger.info( "[LivekitRoom] syncDevice room.getActiveDevice(kind) !== d.id :", this.livekitRoom.getActiveDevice(kind), " !== ", device?.id ); if ( device !== undefined && this.livekitRoom.getActiveDevice(kind) !== device.id ) { this.livekitRoom .switchActiveDevice(kind, device.id) .catch((e) => logger.error(`Failed to sync ${kind} device with LiveKit`, e) ); } }); syncDevice("audioinput", devices.audioInput.selected$); if (!controlledAudioDevices) syncDevice("audiooutput", devices.audioOutput.selected$); syncDevice("videoinput", devices.videoInput.selected$); // Restart the audio input track whenever we detect that the active media // device has changed to refer to a different hardware device. We do this // for the sake of Chrome, which provides a "default" device that is meant // to match the system's default audio input, whatever that may be. // This is special-cased for only audio inputs because we need to dig around // in the LocalParticipant object for the track object and there's not a nice // way to do that generically. There is usually no OS-level default video capture // device anyway, and audio outputs work differently. devices.audioInput.selected$ .pipe( switchMap((device) => device?.hardwareDeviceChange$ ?? NEVER), scope.bind() ) .subscribe(() => { if (this.livekitRoom.state != ConnectionState.Connected) return; const activeMicTrack = Array.from( this.livekitRoom.localParticipant.audioTrackPublications.values() ).find((d) => d.source === Track.Source.Microphone)?.track; if ( activeMicTrack && // only restart if the stream is still running: LiveKit will detect // when a track stops & restart appropriately, so this is not our job. // Plus, we need to avoid restarting again if the track is already in // the process of being restarted. activeMicTrack.mediaStreamTrack.readyState !== "ended" ) { // Restart the track, which will cause Livekit to do another // getUserMedia() call with deviceId: default to get the *new* default device. // Note that room.switchActiveDevice() won't work: Livekit will ignore it because // the deviceId hasn't changed (was & still is default). this.livekitRoom.localParticipant .getTrackPublication(Track.Source.Microphone) ?.audioTrack?.restartTrack() .catch((e) => { logger.error(`Failed to restart audio device track`, e); }); } }); } }