Merge branch 'livekit' into toger5/delayed-event-delegation

This commit is contained in:
Timo K
2025-12-28 21:04:49 +01:00
5 changed files with 587 additions and 273 deletions

View File

@@ -6,11 +6,12 @@ Please see LICENSE in the repository root for full details.
*/
import {
type LocalTrack,
type Participant,
ParticipantEvent,
type LocalParticipant,
type ScreenShareCaptureOptions,
RoomEvent,
MediaDeviceFailure,
} from "livekit-client";
import { observeParticipantEvents } from "@livekit/components-core";
import {
@@ -24,6 +25,7 @@ import {
combineLatest,
distinctUntilChanged,
from,
fromEvent,
map,
type Observable,
of,
@@ -36,7 +38,7 @@ import { type Logger } from "matrix-js-sdk/lib/logger";
import { deepCompare } from "matrix-js-sdk/lib/utils";
import { type CallMembershipIdentityParts } from "matrix-js-sdk/lib/matrixrtc/EncryptionManager";
import { constant, type Behavior } from "../../Behavior.ts";
import { type Behavior } from "../../Behavior.ts";
import { type IConnectionManager } from "../remoteMembers/ConnectionManager.ts";
import { type ObservableScope } from "../../ObservableScope.ts";
import { type Publisher } from "./Publisher.ts";
@@ -67,17 +69,23 @@ export enum TransportState {
export enum PublishState {
WaitingForUser = "publish_waiting_for_user",
/** Implies lk connection is connected */
Starting = "publish_start_publishing",
// XXX: This state is removed for now since we do not have full control over
// track publication anymore with the publisher abstraction, might come back in the future?
// /** Implies lk connection is connected */
// Starting = "publish_start_publishing",
/** Implies lk connection is connected */
Publishing = "publish_publishing",
}
// TODO not sure how to map that correctly with the
// new publisher that does not manage tracks itself anymore
export enum TrackState {
/** The track is waiting for user input to create tracks (waiting to call `startTracks()`) */
WaitingForUser = "tracks_waiting_for_user",
/** Implies lk connection is connected */
Creating = "tracks_creating",
// XXX: This state is removed for now since we do not have full control over
// track creation anymore with the publisher abstraction, might come back in the future?
// /** Implies lk connection is connected */
// Creating = "tracks_creating",
/** Implies lk connection is connected */
Ready = "tracks_ready",
}
@@ -151,9 +159,10 @@ export const createLocalMembership$ = ({
matrixRTCSession,
}: Props): {
/**
* This starts audio and video tracks. They will be reused when calling `requestPublish`.
* This request to start audio and video tracks.
* Can be called early to pre-emptively get media permissions and start devices.
*/
startTracks: () => Behavior<LocalTrack[]>;
startTracks: () => void;
/**
* This sets a inner state (shouldPublish) to true and instructs the js-sdk and livekit to keep the user
* connected to matrix and livekit.
@@ -166,7 +175,7 @@ export const createLocalMembership$ = ({
* Callback to toggle screen sharing. If null, screen sharing is not possible.
*/
toggleScreenSharing: (() => void) | null;
tracks$: Behavior<LocalTrack[]>;
// tracks$: Behavior<LocalTrack[]>;
participant$: Behavior<LocalParticipant | null>;
connection$: Behavior<Connection | null>;
/** Shorthand for homeserverConnected.rtcSession === Status.Reconnecting
@@ -222,6 +231,32 @@ export const createLocalMembership$ = ({
),
);
// Tracks error that happen when creating the local tracks.
const mediaErrors$ = localConnection$.pipe(
switchMap((connection) => {
if (!connection) {
return of(null);
} else {
return fromEvent(
connection.livekitRoom,
RoomEvent.MediaDevicesError,
(error: Error) => {
return MediaDeviceFailure.getFailure(error) ?? null;
},
);
}
}),
);
mediaErrors$.pipe(scope.bind()).subscribe((error) => {
if (error) {
logger.error(`Failed to create local tracks:`, error);
setMatrixError(
// TODO is it fatal? Do we need to create a new Specialized Error?
new UnknownCallError(new Error(`Media device error: ${error}`)),
);
}
});
// MATRIX RELATED
// This should be used in a combineLatest with publisher$ to connect.
@@ -236,19 +271,10 @@ export const createLocalMembership$ = ({
* The publisher is stored in here an abstracts creating and publishing tracks.
*/
const publisher$ = new BehaviorSubject<Publisher | null>(null);
/**
* Extract the tracks from the published. Also reacts to changing publishers.
*/
const tracks$ = scope.behavior(
publisher$.pipe(switchMap((p) => (p?.tracks$ ? p.tracks$ : constant([])))),
);
const publishing$ = scope.behavior(
publisher$.pipe(switchMap((p) => p?.publishing$ ?? constant(false))),
);
const startTracks = (): Behavior<LocalTrack[]> => {
const startTracks = (): void => {
trackStartRequested.resolve();
return tracks$;
// This used to return the tracks, but now they are only accessible via the publisher.
};
const requestJoinAndPublish = (): void => {
@@ -274,7 +300,7 @@ export const createLocalMembership$ = ({
// Clean-up callback
return Promise.resolve(async (): Promise<void> => {
await publisher.stopPublishing();
publisher.stopTracks();
await publisher.stopTracks();
});
}
});
@@ -283,13 +309,16 @@ export const createLocalMembership$ = ({
// `tracks$` will update once they are ready.
scope.reconcile(
scope.behavior(
combineLatest([publisher$, tracks$, from(trackStartRequested.promise)]),
combineLatest([
publisher$ /*, tracks$*/,
from(trackStartRequested.promise),
]),
null,
),
async (valueIfReady) => {
if (!valueIfReady) return;
const [publisher, tracks] = valueIfReady;
if (publisher && tracks.length === 0) {
const [publisher] = valueIfReady;
if (publisher) {
await publisher.createAndSetupTracks().catch((e) => logger.error(e));
}
},
@@ -297,12 +326,11 @@ export const createLocalMembership$ = ({
// Based on `connectRequested$` we start publishing tracks. (once they are there!)
scope.reconcile(
scope.behavior(
combineLatest([publisher$, tracks$, joinAndPublishRequested$]),
),
async ([publisher, tracks, shouldJoinAndPublish]) => {
if (shouldJoinAndPublish === publisher?.publishing$.value) return;
if (tracks.length !== 0 && shouldJoinAndPublish) {
scope.behavior(combineLatest([publisher$, joinAndPublishRequested$])),
async ([publisher, shouldJoinAndPublish]) => {
// Get the current publishing state to avoid redundant calls.
const isPublishing = publisher?.shouldPublish === true;
if (shouldJoinAndPublish && !isPublishing) {
try {
await publisher?.startPublishing();
} catch (error) {
@@ -310,7 +338,7 @@ export const createLocalMembership$ = ({
error instanceof Error ? error.message : String(error);
setPublishError(new FailToStartLivekitConnection(message));
}
} else if (tracks.length !== 0 && !shouldJoinAndPublish) {
} else if (isPublishing) {
try {
await publisher?.stopPublishing();
} catch (error) {
@@ -352,8 +380,6 @@ export const createLocalMembership$ = ({
combineLatest([
localConnectionState$,
localTransport$,
tracks$,
publishing$,
joinAndPublishRequested$,
from(trackStartRequested.promise).pipe(
map(() => true),
@@ -364,16 +390,13 @@ export const createLocalMembership$ = ({
([
localConnectionState,
localTransport,
tracks,
publishing,
shouldPublish,
shouldStartTracks,
]) => {
if (!localTransport) return null;
const hasTracks = tracks.length > 0;
let trackState: TrackState = TrackState.WaitingForUser;
if (hasTracks && shouldStartTracks) trackState = TrackState.Ready;
if (!hasTracks && shouldStartTracks) trackState = TrackState.Creating;
const trackState: TrackState = shouldStartTracks
? TrackState.Ready
: TrackState.WaitingForUser;
if (
localConnectionState !== ConnectionState.LivekitConnected ||
@@ -384,7 +407,7 @@ export const createLocalMembership$ = ({
tracks: trackState,
};
if (!shouldPublish) return PublishState.WaitingForUser;
if (!publishing) return PublishState.Starting;
// if (!publishing) return PublishState.Starting;
return PublishState.Publishing;
},
),
@@ -614,7 +637,6 @@ export const createLocalMembership$ = ({
requestJoinAndPublish,
requestDisconnect,
localMemberState$,
tracks$,
participant$,
reconnecting$,
disconnected$: scope.behavior(