Another larger refactor to fix sfu switches and in general proper

cleanup.
This commit is contained in:
Timo K
2025-11-25 20:18:34 +01:00
parent 3491a68362
commit d22d7460fe
12 changed files with 482 additions and 328 deletions

View File

@@ -30,50 +30,68 @@ import {
tap,
} from "rxjs";
import { type Logger } from "matrix-js-sdk/lib/logger";
import { deepCompare } from "matrix-js-sdk/lib/utils";
import { type Behavior } from "../../Behavior";
import { constant, type Behavior } from "../../Behavior";
import { type IConnectionManager } from "../remoteMembers/ConnectionManager";
import { ObservableScope } from "../../ObservableScope";
import { type Publisher } from "./Publisher";
import { type MuteStates } from "../../MuteStates";
import { and$ } from "../../../utils/observable";
import { ElementCallError, UnknownCallError } from "../../../utils/errors";
import {
ElementCallError,
MembershipManagerError,
UnknownCallError,
} from "../../../utils/errors";
import { ElementWidgetActions, widget } from "../../../widget";
import { getUrlParams } from "../../../UrlParams.ts";
import { PosthogAnalytics } from "../../../analytics/PosthogAnalytics.ts";
import { MatrixRTCMode } from "../../../settings/settings.ts";
import { Config } from "../../../config/Config.ts";
import {
type Connection,
type ConnectionState,
} from "../remoteMembers/Connection.ts";
import { type Connection } from "../remoteMembers/Connection.ts";
export enum LivekitState {
Uninitialized = "uninitialized",
Connecting = "connecting",
Connected = "connected",
Error = "error",
/** Not even a transport is available to the LocalMembership */
WaitingForTransport = "waiting_for_transport",
/** A transport is and we are loading the connection based on the transport */
Connecting = "connecting",
InitialisingPublisher = "uninitialized",
Initialized = "Initialized",
CreatingTracks = "creating_tracks",
ReadyToPublish = "ready_to_publish",
WaitingToPublish = "publishing",
Connected = "connected",
Disconnected = "disconnected",
Disconnecting = "disconnecting",
}
type LocalMemberLivekitState =
| { state: LivekitState.Error; error: ElementCallError }
| { state: LivekitState.Connected }
| { state: LivekitState.WaitingForTransport }
| { state: LivekitState.Connecting }
| { state: LivekitState.Uninitialized }
| { state: LivekitState.InitialisingPublisher }
| { state: LivekitState.Initialized }
| { state: LivekitState.CreatingTracks }
| { state: LivekitState.ReadyToPublish }
| { state: LivekitState.WaitingToPublish }
| { state: LivekitState.Connected }
| { state: LivekitState.Disconnected }
| { state: LivekitState.Disconnecting };
export enum MatrixState {
WaitingForTransport = "waiting_for_transport",
Ready = "ready",
Connecting = "connecting",
Connected = "connected",
Disconnected = "disconnected",
Connecting = "connecting",
Error = "Error",
}
type LocalMemberMatrixState =
| { state: MatrixState.Connected }
| { state: MatrixState.WaitingForTransport }
| { state: MatrixState.Ready }
| { state: MatrixState.Connecting }
| { state: MatrixState.Disconnected }
| { state: MatrixState.Error; error: Error };
@@ -102,7 +120,7 @@ interface Props {
muteStates: MuteStates;
connectionManager: IConnectionManager;
createPublisherFactory: (connection: Connection) => Publisher;
joinMatrixRTC: (trasnport: LivekitTransport) => Promise<void>;
joinMatrixRTC: (transport: LivekitTransport) => Promise<void>;
homeserverConnected$: Behavior<boolean>;
localTransport$: Behavior<LivekitTransport | null>;
matrixRTCSession: Pick<
@@ -136,9 +154,9 @@ export const createLocalMembership$ = ({
muteStates,
matrixRTCSession,
}: Props): {
requestConnect: () => LocalMemberConnectionState;
requestConnect: () => void;
startTracks: () => Behavior<LocalTrack[]>;
requestDisconnect: () => Observable<LocalMemberLivekitState> | null;
requestDisconnect: () => void;
connectionState: LocalMemberConnectionState;
sharingScreen$: Behavior<boolean>;
/**
@@ -157,27 +175,8 @@ export const createLocalMembership$ = ({
} => {
const logger = parentLogger.getChild("[LocalMembership]");
logger.debug(`Creating local membership..`);
const state = {
livekit$: new BehaviorSubject<LocalMemberLivekitState>({
state: LivekitState.Uninitialized,
}),
matrix$: new BehaviorSubject<LocalMemberMatrixState>({
state: MatrixState.Disconnected,
}),
};
// This should be used in a combineLatest with publisher$ to connect.
// to make it possible to call startTracks before the preferredTransport$ has resolved.
const trackStartRequested$ = new BehaviorSubject(false);
// This should be used in a combineLatest with publisher$ to connect.
// to make it possible to call startTracks before the preferredTransport$ has resolved.
const connectRequested$ = new BehaviorSubject(false);
// This should be used in a combineLatest with publisher$ to connect.
const tracks$ = new BehaviorSubject<LocalTrack[]>([]);
// unwrap the local transport and set the state of the LocalMembership to error in case the transport is an error.
// Unwrap the local transport and set the state of the LocalMembership to error in case the transport is an error.
const localTransport$ = scope.behavior(
localTransportCanThrow$.pipe(
catchError((e: unknown) => {
@@ -191,7 +190,7 @@ export const createLocalMembership$ = ({
: new Error("Unknown error from localTransport"),
);
}
state.livekit$.next({ state: LivekitState.Error, error });
setLivekitError(error);
return of(null);
}),
),
@@ -203,12 +202,12 @@ export const createLocalMembership$ = ({
connectionManager.connectionManagerData$,
localTransport$,
]).pipe(
map(([connectionData, localTransport]) => {
map(([{ value: connectionData }, localTransport]) => {
if (localTransport === null) {
return null;
}
return connectionData.value.getConnectionForTransport(localTransport);
return connectionData.getConnectionForTransport(localTransport);
}),
tap((connection) => {
logger.info(
@@ -236,34 +235,6 @@ export const createLocalMembership$ = ({
),
);
const publisher$ = new BehaviorSubject<Publisher | null>(null);
localConnection$.pipe(scope.bind()).subscribe((connection) => {
if (connection !== null && publisher$.value === null) {
// TODO looks strange to not change publisher if connection changes.
// @toger5 will take care of this!
publisher$.next(createPublisherFactory(connection));
}
});
// const mutestate= publisher$.pipe(switchMap((publisher) => {
// return publisher.muteState$
// });
combineLatest([publisher$, trackStartRequested$]).subscribe(
([publisher, shouldStartTracks]) => {
if (publisher && shouldStartTracks) {
publisher
.createAndSetupTracks()
.then((tracks) => {
tracks$.next(tracks);
})
.catch((error) => {
logger.error("Error creating tracks:", error);
});
}
},
);
// MATRIX RELATED
// /**
@@ -286,90 +257,230 @@ export const createLocalMembership$ = ({
),
);
// This should be used in a combineLatest with publisher$ to connect.
// to make it possible to call startTracks before the preferredTransport$ has resolved.
const trackStartRequested$ = new BehaviorSubject(false);
// This should be used in a combineLatest with publisher$ to connect.
// to make it possible to call startTracks before the preferredTransport$ has resolved.
const connectRequested$ = new BehaviorSubject(false);
/**
* The publisher is stored in here an abstracts creating and publishing tracks.
*/
const publisher$ = new BehaviorSubject<Publisher | null>(null);
/**
* Extract the tracks from the published. Also reacts to changing publishers.
*/
const tracks$ = scope.behavior(
publisher$.pipe(switchMap((p) => (p ? p.tracks$ : constant([])))),
);
const publishing$ = scope.behavior(
publisher$.pipe(switchMap((p) => (p ? p.publishing$ : constant(false)))),
);
const startTracks = (): Behavior<LocalTrack[]> => {
trackStartRequested$.next(true);
return tracks$;
};
combineLatest([publisher$, tracks$]).subscribe(([publisher, tracks]) => {
if (
tracks.length === 0 ||
// change this to !== Publishing
state.livekit$.value.state !== LivekitState.Uninitialized
) {
return;
const requestConnect = (): void => {
trackStartRequested$.next(true);
connectRequested$.next(true);
};
const requestDisconnect = (): void => {
connectRequested$.next(false);
};
// Take care of the publisher$
// create a new one as soon as a local Connection is available
//
// Recreate a new one once the local connection changes
// - stop publishing
// - destruct all current streams
// - overwrite current publisher
scope.reconcile(localConnection$, async (connection) => {
if (connection !== null) {
publisher$.next(createPublisherFactory(connection));
}
state.livekit$.next({ state: LivekitState.Connecting });
publisher
?.startPublishing()
.then(() => {
state.livekit$.next({ state: LivekitState.Connected });
})
.catch((error) => {
state.livekit$.next({ state: LivekitState.Error, error });
});
return Promise.resolve(async (): Promise<void> => {
await publisher$?.value?.stopPublishing();
publisher$?.value?.stopTracks();
});
});
combineLatest([localTransport$, connectRequested$]).subscribe(
// TODO reconnect when transport changes => create test.
([transport, connectRequested]) => {
if (
transport === null ||
!connectRequested ||
state.matrix$.value.state !== MatrixState.Disconnected
) {
logger.info(
"Not yet connecting because: ",
"transport === null:",
transport === null,
"!connectRequested:",
!connectRequested,
"state.matrix$.value.state !== MatrixState.Disconnected:",
state.matrix$.value.state !== MatrixState.Disconnected,
);
return;
}
state.matrix$.next({ state: MatrixState.Connecting });
logger.info("Matrix State connecting");
// const mutestate= publisher$.pipe(switchMap((publisher) => {
// return publisher.muteState$
// });
joinMatrixRTC(transport).catch((error) => {
logger.error(error);
state.matrix$.next({ state: MatrixState.Error, error });
});
// For each publisher create the descired tracks
// If we recreate a new publisher we remember the trackStartRequested$ value and immediately create the tracks
// THIS might be fine without a reconcile. There is no cleanup needed. We always get a working publisher
// track start request can than just toggle the tracks.
// TODO does this need `reconcile` to make sure we wait for createAndSetupTracks before we stop tracks?
combineLatest([publisher$, trackStartRequested$]).subscribe(
([publisher, shouldStartTracks]) => {
if (publisher && shouldStartTracks) {
publisher.createAndSetupTracks().catch(
// TODO make this set some error state
(e) => logger.error(e),
);
} else if (publisher) {
publisher.stopTracks();
}
},
);
// TODO add this and update `state.matrix$` based on it.
// useTypedEventEmitter(
// rtcSession,
// MatrixRTCSessionEvent.MembershipManagerError,
// (error) => setExternalError(new ConnectionLostError()),
// );
// Use reconcile here to not run concurrent createAndSetupTracks calls
// `tracks$` will update once they are ready.
scope.reconcile(
scope.behavior(combineLatest([publisher$, trackStartRequested$])),
async ([publisher, shouldStartTracks]) => {
if (publisher && shouldStartTracks) {
await publisher.createAndSetupTracks().catch((e) => logger.error(e));
} else if (publisher) {
publisher.stopTracks();
}
},
);
const requestConnect = (): LocalMemberConnectionState => {
trackStartRequested$.next(true);
connectRequested$.next(true);
// Based on `connectRequested$` we start publishing tracks. (once they are there!)
scope.reconcile(
scope.behavior(combineLatest([publisher$, tracks$, connectRequested$])),
async ([publisher, tracks, shouldConnect]) => {
if (shouldConnect === publisher?.publishing$.value)
return Promise.resolve();
if (tracks.length !== 0 && shouldConnect) {
try {
await publisher?.startPublishing();
} catch (error) {
// will take care of "FailedToStartLk" errors.
setLivekitError(error as ElementCallError);
}
} else if (tracks.length !== 0 && !shouldConnect) {
try {
await publisher?.stopPublishing();
} catch (error) {
setLivekitError(new UnknownCallError(error as Error));
}
}
},
);
return state;
const fatalLivekitError$ = new BehaviorSubject<ElementCallError | null>(null);
const setLivekitError = (e: ElementCallError): void => {
if (fatalLivekitError$.value !== null)
logger.error("Multiple Livkit Errors:", e);
else fatalLivekitError$.next(e);
};
const livekitState$: Observable<LocalMemberLivekitState> = combineLatest([
publisher$,
localTransport$,
localConnection$,
tracks$,
publishing$,
connectRequested$,
trackStartRequested$,
fatalLivekitError$,
]).pipe(
map(
([
publisher,
localTransport,
localConnection,
tracks,
publishing,
shouldConnect,
shouldStartTracks,
error,
]) => {
// read this:
// if(!<A>) return {state: ...}
// if(!<B>) return {state: <MyState>}
//
// as:
// We do have <A> but not yet <B> so we are in <MyState>
if (error !== null) return { state: LivekitState.Error, error };
const hasTracks = tracks.length > 0;
if (!localTransport) return { state: LivekitState.WaitingForTransport };
if (!localConnection) return { state: LivekitState.Connecting };
if (!publisher) return { state: LivekitState.InitialisingPublisher };
if (!shouldStartTracks) return { state: LivekitState.Initialized };
if (!hasTracks) return { state: LivekitState.CreatingTracks };
if (!shouldConnect) return { state: LivekitState.ReadyToPublish };
if (!publishing) return { state: LivekitState.WaitingToPublish };
return { state: LivekitState.Connected };
},
),
distinctUntilChanged(deepCompare),
);
const requestDisconnect = (): Behavior<LocalMemberLivekitState> | null => {
if (state.livekit$.value.state !== LivekitState.Connected) return null;
state.livekit$.next({ state: LivekitState.Disconnecting });
combineLatest([publisher$, tracks$], (publisher, tracks) => {
publisher
?.stopPublishing()
.then(() => {
tracks.forEach((track) => track.stop());
state.livekit$.next({ state: LivekitState.Disconnected });
})
.catch((error) => {
state.livekit$.next({ state: LivekitState.Error, error });
});
});
return state.livekit$;
const fatalMatrixError$ = new BehaviorSubject<ElementCallError | null>(null);
const setMatrixError = (e: ElementCallError): void => {
if (fatalMatrixError$.value !== null)
logger.error("Multiple Matrix Errors:", e);
else fatalMatrixError$.next(e);
};
const matrixState$: Behavior<LocalMemberMatrixState> = scope.behavior(
combineLatest([
localTransport$,
connectRequested$,
homeserverConnected$,
]).pipe(
map(([localTransport, connectRequested, homeserverConnected]) => {
if (!localTransport) return { state: MatrixState.WaitingForTransport };
if (!connectRequested) return { state: MatrixState.Ready };
if (!homeserverConnected) return { state: MatrixState.Connecting };
return { state: MatrixState.Connected };
}),
),
);
// Keep matrix rtc session in sync with localTransport$, connectRequested$ and muteStates.video.enabled$
scope.reconcile(
scope.behavior(combineLatest([localTransport$, connectRequested$])),
async ([transport, shouldConnect]) => {
if (!shouldConnect) return;
if (!transport) return;
try {
await joinMatrixRTC(transport);
} catch (error) {
logger.error("Error entering RTC session", error);
if (error instanceof Error)
setMatrixError(new MembershipManagerError(error));
}
// Update our member event when our mute state changes.
const callIntentScope = new ObservableScope();
// because this uses its own scope, we can start another reconciliation for the duration of one connection.
callIntentScope.reconcile(
muteStates.video.enabled$,
async (videoEnabled) =>
matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"),
);
return async (): Promise<void> => {
callIntentScope.end();
try {
// Update matrixRTCSession to allow udpating the transport without leaving the session!
await matrixRTCSession.leaveRoomSession();
} catch (e) {
logger.error("Error leaving RTC session", e);
}
try {
await widget?.api.transport.send(ElementWidgetActions.HangupCall, {});
} catch (e) {
logger.error("Failed to send hangup action", e);
}
};
},
);
const participant$ = scope.behavior(
localConnection$.pipe(map((c) => c?.livekitRoom?.localParticipant ?? null)),
);
// Pause upstream of all local media tracks when we're disconnected from
// MatrixRTC, because it can be an unpleasant surprise for the app to say
@@ -377,12 +488,12 @@ export const createLocalMembership$ = ({
// We use matrixConnected$ rather than reconnecting$ because we want to
// pause tracks during the initial joining sequence too until we're sure
// that our own media is displayed on screen.
combineLatest([localConnection$, homeserverConnected$])
// TODO refactor this based no livekitState$
combineLatest([participant$, homeserverConnected$])
.pipe(scope.bind())
.subscribe(([connection, connected]) => {
if (connection?.state$.value.state !== "ConnectedToLkRoom") return;
const publications =
connection.livekitRoom.localParticipant.trackPublications.values();
.subscribe(([participant, connected]) => {
if (!participant) return;
const publications = participant.trackPublications.values();
if (connected) {
for (const p of publications) {
if (p.track?.isUpstreamPaused === true) {
@@ -419,85 +530,13 @@ export const createLocalMembership$ = ({
}
}
});
// TODO: Refactor updateCallIntent to sth like this:
// combineLatest([muteStates.video.enabled$,localTransport$, state.matrix$]).pipe(map(()=>{
// matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"),
// }))
//
// TODO I do not fully understand what this does.
// Is it needed?
// Is this at the right place?
// Can this be simplified?
// Start and stop session membership as needed
// Discussed in statndup -> It seems we can remove this (there is another call to enterRTCSession in this file)
// MAKE SURE TO UNDERSTAND why reconcile is needed and what is potentially missing from the alternative enterRTCSession block.
// @toger5 will try to take care of this.
scope.reconcile(localTransport$, async (transport) => {
if (transport !== null && transport !== undefined) {
try {
state.matrix$.next({ state: MatrixState.Connecting });
await joinMatrixRTC(transport);
} catch (e) {
logger.error("Error entering RTC session", e);
}
// Update our member event when our mute state changes.
const intentScope = new ObservableScope();
intentScope.reconcile(muteStates.video.enabled$, async (videoEnabled) =>
matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"),
);
return async (): Promise<void> => {
intentScope.end();
// Only sends Matrix leave event. The LiveKit session will disconnect
// as soon as either the stopConnection$ handler above gets to it or
// the view model is destroyed.
try {
await matrixRTCSession.leaveRoomSession();
} catch (e) {
logger.error("Error leaving RTC session", e);
}
try {
await widget?.api.transport.send(ElementWidgetActions.HangupCall, {});
} catch (e) {
logger.error("Failed to send hangup action", e);
}
};
}
});
localConnection$
.pipe(
distinctUntilChanged(),
switchMap((c) =>
c === null ? of({ state: "Initialized" } as ConnectionState) : c.state$,
),
map((s) => {
logger.trace(`Local connection state update: ${s.state}`);
if (s.state == "FailedToStart") {
return s.error instanceof ElementCallError
? s.error
: new UnknownCallError(s.error);
}
}),
scope.bind(),
)
.subscribe((error) => {
if (error !== undefined)
state.livekit$.next({ state: LivekitState.Error, error });
});
/**
* Whether the user is currently sharing their screen.
*/
const sharingScreen$ = scope.behavior(
localConnection$.pipe(
switchMap((c) =>
c !== null
? observeSharingScreen$(c.livekitRoom.localParticipant)
: of(false),
),
participant$.pipe(
switchMap((p) => (p !== null ? observeSharingScreen$(p) : of(false))),
),
);
@@ -527,24 +566,23 @@ export const createLocalMembership$ = ({
// We also allow screen sharing to be toggled even if the connection
// is still initializing or publishing tracks, because there's no
// technical reason to disallow this. LiveKit will publish if it can.
localConnection$.value?.livekitRoom.localParticipant
.setScreenShareEnabled(targetScreenshareState, screenshareSettings)
participant$.value
?.setScreenShareEnabled(targetScreenshareState, screenshareSettings)
.catch(logger.error);
};
}
const participant$ = scope.behavior(
localConnection$.pipe(map((c) => c?.livekitRoom?.localParticipant ?? null)),
);
return {
startTracks,
requestConnect,
requestDisconnect,
connectionState: state,
connectionState: {
livekit$: scope.behavior(livekitState$),
matrix$: matrixState$,
},
homeserverConnected$,
connected$,
reconnecting$,
sharingScreen$,
toggleScreenSharing,
participant$,