More testing and cleaning up

This commit is contained in:
Timo K
2025-11-27 14:42:23 +01:00
parent d22d7460fe
commit e5117f705d
4 changed files with 321 additions and 127 deletions

View File

@@ -14,7 +14,7 @@ import { describe, expect, it, vi } from "vitest";
import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery";
import { BehaviorSubject, map, of } from "rxjs"; import { BehaviorSubject, map, of } from "rxjs";
import { logger } from "matrix-js-sdk/lib/logger"; import { logger } from "matrix-js-sdk/lib/logger";
import { type LocalParticipant } from "livekit-client"; import { type LocalParticipant, type LocalTrack } from "livekit-client";
import { MatrixRTCMode } from "../../../settings/settings"; import { MatrixRTCMode } from "../../../settings/settings";
import { import {
@@ -34,6 +34,7 @@ import { Epoch, ObservableScope } from "../../ObservableScope";
import { constant } from "../../Behavior"; import { constant } from "../../Behavior";
import { ConnectionManagerData } from "../remoteMembers/ConnectionManager"; import { ConnectionManagerData } from "../remoteMembers/ConnectionManager";
import { type Connection } from "../remoteMembers/Connection"; import { type Connection } from "../remoteMembers/Connection";
import { type Publisher } from "./Publisher";
const MATRIX_RTC_MODE = MatrixRTCMode.Legacy; const MATRIX_RTC_MODE = MatrixRTCMode.Legacy;
const getUrlParams = vi.hoisted(() => vi.fn(() => ({}))); const getUrlParams = vi.hoisted(() => vi.fn(() => ({})));
@@ -235,44 +236,54 @@ describe("LocalMembership", () => {
}); });
}); });
it("recreates publisher if new connection is used", async () => { const aTransport = {
livekit_service_url: "a",
} as LivekitTransport;
const bTransport = {
livekit_service_url: "b",
} as LivekitTransport;
const connectionManagerData = new ConnectionManagerData();
connectionManagerData.add(
{
livekitRoom: mockLivekitRoom({
localParticipant: {
isScreenShareEnabled: false,
trackPublications: [],
} as unknown as LocalParticipant,
}),
state$: constant({
state: "ConnectedToLkRoom",
}),
transport: aTransport,
} as unknown as Connection,
[],
);
connectionManagerData.add(
{
state$: constant({
state: "ConnectedToLkRoom",
}),
transport: bTransport,
} as unknown as Connection,
[],
);
it("recreates publisher if new connection is used and ENDS always unpublish and end tracks", async () => {
const scope = new ObservableScope(); const scope = new ObservableScope();
const aTransport = {
livekit_service_url: "a",
} as LivekitTransport;
const bTransport = {
livekit_service_url: "b",
} as LivekitTransport;
const localTransport$ = new BehaviorSubject(aTransport); const localTransport$ = new BehaviorSubject(aTransport);
const connectionManagerData = new ConnectionManagerData(); const publishers: Publisher[] = [];
connectionManagerData.add( defaultCreateLocalMemberValues.createPublisherFactory.mockImplementation(
{ () => {
livekitRoom: mockLivekitRoom({ const p = { stopPublishing: vi.fn(), stopTracks: vi.fn() };
localParticipant: { publishers.push(p as unknown as Publisher);
isScreenShareEnabled: false, return p;
trackPublications: [], },
} as unknown as LocalParticipant,
}),
state$: constant({
state: "ConnectedToLkRoom",
}),
transport: aTransport,
} as unknown as Connection,
[],
); );
connectionManagerData.add(
{
state$: constant({
state: "ConnectedToLkRoom",
}),
transport: bTransport,
} as unknown as Connection,
[],
);
const publisherFactory = const publisherFactory =
defaultCreateLocalMemberValues.createPublisherFactory as ReturnType< defaultCreateLocalMemberValues.createPublisherFactory as ReturnType<
typeof vi.fn typeof vi.fn
@@ -290,7 +301,182 @@ describe("LocalMembership", () => {
localTransport$.next(bTransport); localTransport$.next(bTransport);
await flushPromises(); await flushPromises();
expect(publisherFactory).toHaveBeenCalledTimes(2); expect(publisherFactory).toHaveBeenCalledTimes(2);
expect(publishers.length).toBe(2);
// stop the first Publisher and let the second one life.
expect(publishers[0].stopTracks).toHaveBeenCalled();
expect(publishers[1].stopTracks).not.toHaveBeenCalled();
expect(publishers[0].stopPublishing).toHaveBeenCalled();
expect(publishers[1].stopPublishing).not.toHaveBeenCalled();
expect(publisherFactory.mock.calls[0][0].transport).toBe(aTransport); expect(publisherFactory.mock.calls[0][0].transport).toBe(aTransport);
expect(publisherFactory.mock.calls[1][0].transport).toBe(bTransport); expect(publisherFactory.mock.calls[1][0].transport).toBe(bTransport);
scope.end();
await flushPromises();
// stop all tracks after ending scopes
expect(publishers[1].stopPublishing).toHaveBeenCalled();
expect(publishers[1].stopTracks).toHaveBeenCalled();
defaultCreateLocalMemberValues.createPublisherFactory.mockReset();
});
it("only start tracks if requested", async () => {
const scope = new ObservableScope();
const localTransport$ = new BehaviorSubject(aTransport);
const publishers: Publisher[] = [];
const tracks$ = new BehaviorSubject<LocalTrack[]>([]);
const publishing$ = new BehaviorSubject<boolean>(false);
defaultCreateLocalMemberValues.createPublisherFactory.mockImplementation(
() => {
const p = {
stopPublishing: vi.fn(),
stopTracks: vi.fn(),
createAndSetupTracks: vi.fn().mockImplementation(async () => {
tracks$.next([{}, {}] as LocalTrack[]);
return Promise.resolve();
}),
tracks$,
publishing$,
};
publishers.push(p as unknown as Publisher);
return p;
},
);
const publisherFactory =
defaultCreateLocalMemberValues.createPublisherFactory as ReturnType<
typeof vi.fn
>;
const localMembership = createLocalMembership$({
scope,
...defaultCreateLocalMemberValues,
connectionManager: {
connectionManagerData$: constant(new Epoch(connectionManagerData)),
},
localTransport$,
});
await flushPromises();
expect(publisherFactory).toHaveBeenCalledOnce();
expect(localMembership.tracks$.value.length).toBe(0);
localMembership.startTracks();
await flushPromises();
expect(localMembership.tracks$.value.length).toBe(2);
scope.end();
await flushPromises();
// stop all tracks after ending scopes
expect(publishers[0].stopPublishing).toHaveBeenCalled();
expect(publishers[0].stopTracks).toHaveBeenCalled();
});
// TODO add an integration test combining publisher and localMembership
//
it("tracks livekit state correctly", async () => {
const scope = new ObservableScope();
const localTransport$ = new BehaviorSubject<null | LivekitTransport>(null);
const connectionManagerData$ = new BehaviorSubject<
Epoch<ConnectionManagerData>
>(new Epoch(new ConnectionManagerData()));
const publishers: Publisher[] = [];
const tracks$ = new BehaviorSubject<LocalTrack[]>([]);
const publishing$ = new BehaviorSubject<boolean>(false);
const createTrackResolver = Promise.withResolvers<void>();
const publishResolver = Promise.withResolvers<void>();
defaultCreateLocalMemberValues.createPublisherFactory.mockImplementation(
() => {
const p = {
stopPublishing: vi.fn(),
stopTracks: vi.fn().mockImplementation(() => {
logger.info("stopTracks");
tracks$.next([]);
}),
createAndSetupTracks: vi.fn().mockImplementation(async () => {
await createTrackResolver.promise;
tracks$.next([{}, {}] as LocalTrack[]);
}),
startPublishing: vi.fn().mockImplementation(async () => {
await publishResolver.promise;
publishing$.next(true);
}),
tracks$,
publishing$,
};
publishers.push(p as unknown as Publisher);
return p;
},
);
const publisherFactory =
defaultCreateLocalMemberValues.createPublisherFactory as ReturnType<
typeof vi.fn
>;
const localMembership = createLocalMembership$({
scope,
...defaultCreateLocalMemberValues,
connectionManager: {
connectionManagerData$,
},
localTransport$,
});
await flushPromises();
expect(localMembership.connectionState.livekit$.value).toStrictEqual({
state: LivekitState.WaitingForTransport,
});
localTransport$.next(aTransport);
await flushPromises();
expect(localMembership.connectionState.livekit$.value).toStrictEqual({
state: LivekitState.WaitingForConnection,
});
connectionManagerData$.next(new Epoch(connectionManagerData));
await flushPromises();
expect(localMembership.connectionState.livekit$.value).toStrictEqual({
state: LivekitState.Initialized,
});
expect(publisherFactory).toHaveBeenCalledOnce();
expect(localMembership.tracks$.value.length).toBe(0);
// -------
localMembership.startTracks();
// -------
await flushPromises();
expect(localMembership.connectionState.livekit$.value).toStrictEqual({
state: LivekitState.CreatingTracks,
});
createTrackResolver.resolve();
await flushPromises();
expect(localMembership.connectionState.livekit$.value).toStrictEqual({
state: LivekitState.ReadyToPublish,
});
// -------
localMembership.requestConnect();
// -------
expect(localMembership.connectionState.livekit$.value).toStrictEqual({
state: LivekitState.WaitingToPublish,
});
publishResolver.resolve();
await flushPromises();
expect(localMembership.connectionState.livekit$.value).toStrictEqual({
state: LivekitState.Connected,
});
expect(publishers[0].stopPublishing).not.toHaveBeenCalled();
expect(localMembership.connectionState.livekit$.isStopped).toBe(false);
scope.end();
await flushPromises();
expect(localMembership.connectionState.livekit$.isStopped).toBe(true);
// stays in connected state because it is stopped before the update to tracks update the state.
expect(localMembership.connectionState.livekit$.value).toStrictEqual({
state: LivekitState.Connected,
});
// stop all tracks after ending scopes
expect(publishers[0].stopPublishing).toHaveBeenCalled();
expect(publishers[0].stopTracks).toHaveBeenCalled();
}); });
}); });

View File

@@ -22,10 +22,12 @@ import {
catchError, catchError,
combineLatest, combineLatest,
distinctUntilChanged, distinctUntilChanged,
from,
map, map,
type Observable, type Observable,
of, of,
scan, scan,
startWith,
switchMap, switchMap,
tap, tap,
} from "rxjs"; } from "rxjs";
@@ -54,13 +56,13 @@ export enum LivekitState {
Error = "error", Error = "error",
/** Not even a transport is available to the LocalMembership */ /** Not even a transport is available to the LocalMembership */
WaitingForTransport = "waiting_for_transport", WaitingForTransport = "waiting_for_transport",
/** A transport is and we are loading the connection based on the transport */ /** A connection appeared so we can initialise the publisher */
Connecting = "connecting", WaitingForConnection = "waiting_for_connection",
InitialisingPublisher = "uninitialized", /** Connection and transport arrived, publisher Initialized */
Initialized = "Initialized", Initialized = "Initialized",
CreatingTracks = "creating_tracks", CreatingTracks = "creating_tracks",
ReadyToPublish = "ready_to_publish", ReadyToPublish = "ready_to_publish",
WaitingToPublish = "publishing", WaitingToPublish = "waiting_to_publish",
Connected = "connected", Connected = "connected",
Disconnected = "disconnected", Disconnected = "disconnected",
Disconnecting = "disconnecting", Disconnecting = "disconnecting",
@@ -69,8 +71,7 @@ export enum LivekitState {
type LocalMemberLivekitState = type LocalMemberLivekitState =
| { state: LivekitState.Error; error: ElementCallError } | { state: LivekitState.Error; error: ElementCallError }
| { state: LivekitState.WaitingForTransport } | { state: LivekitState.WaitingForTransport }
| { state: LivekitState.Connecting } | { state: LivekitState.WaitingForConnection }
| { state: LivekitState.InitialisingPublisher }
| { state: LivekitState.Initialized } | { state: LivekitState.Initialized }
| { state: LivekitState.CreatingTracks } | { state: LivekitState.CreatingTracks }
| { state: LivekitState.ReadyToPublish } | { state: LivekitState.ReadyToPublish }
@@ -163,12 +164,10 @@ export const createLocalMembership$ = ({
* Callback to toggle screen sharing. If null, screen sharing is not possible. * Callback to toggle screen sharing. If null, screen sharing is not possible.
*/ */
toggleScreenSharing: (() => void) | null; toggleScreenSharing: (() => void) | null;
tracks$: Behavior<LocalTrack[]>;
participant$: Behavior<LocalParticipant | null>; participant$: Behavior<LocalParticipant | null>;
connection$: Behavior<Connection | null>; connection$: Behavior<Connection | null>;
homeserverConnected$: Behavior<boolean>; homeserverConnected$: Behavior<boolean>;
// deprecated fields
/** @deprecated use state instead*/
connected$: Behavior<boolean>;
// this needs to be discussed // this needs to be discussed
/** @deprecated use state instead*/ /** @deprecated use state instead*/
reconnecting$: Behavior<boolean>; reconnecting$: Behavior<boolean>;
@@ -217,20 +216,19 @@ export const createLocalMembership$ = ({
), ),
); );
const localConnectionState$ = localConnection$.pipe(
switchMap((connection) => (connection ? connection.state$ : of(null))),
);
// /** // /**
// * Whether we are "fully" connected to the call. Accounts for both the // * Whether we are "fully" connected to the call. Accounts for both the
// * connection to the MatrixRTC session and the LiveKit publish connection. // * connection to the MatrixRTC session and the LiveKit publish connection.
// */ // */
// // TODO use this in combination with the MemberState.
const connected$ = scope.behavior( const connected$ = scope.behavior(
and$( and$(
homeserverConnected$, homeserverConnected$,
localConnection$.pipe( localConnectionState$.pipe(
switchMap((c) => map((state) => (state ? state.state === "ConnectedToLkRoom" : false)),
c
? c.state$.pipe(map((state) => state.state === "ConnectedToLkRoom"))
: of(false),
),
), ),
), ),
); );
@@ -259,7 +257,7 @@ export const createLocalMembership$ = ({
// This should be used in a combineLatest with publisher$ to connect. // This should be used in a combineLatest with publisher$ to connect.
// to make it possible to call startTracks before the preferredTransport$ has resolved. // to make it possible to call startTracks before the preferredTransport$ has resolved.
const trackStartRequested$ = new BehaviorSubject(false); const trackStartRequested = Promise.withResolvers<void>();
// This should be used in a combineLatest with publisher$ to connect. // This should be used in a combineLatest with publisher$ to connect.
// to make it possible to call startTracks before the preferredTransport$ has resolved. // to make it possible to call startTracks before the preferredTransport$ has resolved.
@@ -273,19 +271,21 @@ export const createLocalMembership$ = ({
* Extract the tracks from the published. Also reacts to changing publishers. * Extract the tracks from the published. Also reacts to changing publishers.
*/ */
const tracks$ = scope.behavior( const tracks$ = scope.behavior(
publisher$.pipe(switchMap((p) => (p ? p.tracks$ : constant([])))), publisher$.pipe(switchMap((p) => (p?.tracks$ ? p.tracks$ : constant([])))),
); );
const publishing$ = scope.behavior( const publishing$ = scope.behavior(
publisher$.pipe(switchMap((p) => (p ? p.publishing$ : constant(false)))), publisher$.pipe(
switchMap((p) => (p?.publishing$ ? p.publishing$ : constant(false))),
),
); );
const startTracks = (): Behavior<LocalTrack[]> => { const startTracks = (): Behavior<LocalTrack[]> => {
trackStartRequested$.next(true); trackStartRequested.resolve();
return tracks$; return tracks$;
}; };
const requestConnect = (): void => { const requestConnect = (): void => {
trackStartRequested$.next(true); trackStartRequested.resolve();
connectRequested$.next(true); connectRequested$.next(true);
}; };
@@ -310,37 +310,18 @@ export const createLocalMembership$ = ({
}); });
}); });
// const mutestate= publisher$.pipe(switchMap((publisher) => {
// return publisher.muteState$
// });
// For each publisher create the descired tracks
// If we recreate a new publisher we remember the trackStartRequested$ value and immediately create the tracks
// THIS might be fine without a reconcile. There is no cleanup needed. We always get a working publisher
// track start request can than just toggle the tracks.
// TODO does this need `reconcile` to make sure we wait for createAndSetupTracks before we stop tracks?
combineLatest([publisher$, trackStartRequested$]).subscribe(
([publisher, shouldStartTracks]) => {
if (publisher && shouldStartTracks) {
publisher.createAndSetupTracks().catch(
// TODO make this set some error state
(e) => logger.error(e),
);
} else if (publisher) {
publisher.stopTracks();
}
},
);
// Use reconcile here to not run concurrent createAndSetupTracks calls // Use reconcile here to not run concurrent createAndSetupTracks calls
// `tracks$` will update once they are ready. // `tracks$` will update once they are ready.
scope.reconcile( scope.reconcile(
scope.behavior(combineLatest([publisher$, trackStartRequested$])), scope.behavior(
async ([publisher, shouldStartTracks]) => { combineLatest([publisher$, tracks$, from(trackStartRequested.promise)]),
if (publisher && shouldStartTracks) { null,
),
async (valueIfReady) => {
if (!valueIfReady) return;
const [publisher, tracks] = valueIfReady;
if (publisher && tracks.length === 0) {
await publisher.createAndSetupTracks().catch((e) => logger.error(e)); await publisher.createAndSetupTracks().catch((e) => logger.error(e));
} else if (publisher) {
publisher.stopTracks();
} }
}, },
); );
@@ -349,8 +330,7 @@ export const createLocalMembership$ = ({
scope.reconcile( scope.reconcile(
scope.behavior(combineLatest([publisher$, tracks$, connectRequested$])), scope.behavior(combineLatest([publisher$, tracks$, connectRequested$])),
async ([publisher, tracks, shouldConnect]) => { async ([publisher, tracks, shouldConnect]) => {
if (shouldConnect === publisher?.publishing$.value) if (shouldConnect === publisher?.publishing$.value) return;
return Promise.resolve();
if (tracks.length !== 0 && shouldConnect) { if (tracks.length !== 0 && shouldConnect) {
try { try {
await publisher?.startPublishing(); await publisher?.startPublishing();
@@ -374,46 +354,53 @@ export const createLocalMembership$ = ({
logger.error("Multiple Livkit Errors:", e); logger.error("Multiple Livkit Errors:", e);
else fatalLivekitError$.next(e); else fatalLivekitError$.next(e);
}; };
const livekitState$: Observable<LocalMemberLivekitState> = combineLatest([ const livekitState$: Behavior<LocalMemberLivekitState> = scope.behavior(
publisher$, combineLatest([
localTransport$, publisher$,
localConnection$, localTransport$,
tracks$, tracks$.pipe(
publishing$, tap((t) => {
connectRequested$, logger.info("tracks$: ", t);
trackStartRequested$, }),
fatalLivekitError$, ),
]).pipe( publishing$,
map( connectRequested$,
([ from(trackStartRequested.promise).pipe(
publisher, map(() => true),
localTransport, startWith(false),
localConnection, ),
tracks, fatalLivekitError$,
publishing, ]).pipe(
shouldConnect, map(
shouldStartTracks, ([
error, publisher,
]) => { localTransport,
// read this: tracks,
// if(!<A>) return {state: ...} publishing,
// if(!<B>) return {state: <MyState>} shouldConnect,
// shouldStartTracks,
// as: error,
// We do have <A> but not yet <B> so we are in <MyState> ]) => {
if (error !== null) return { state: LivekitState.Error, error }; // read this:
const hasTracks = tracks.length > 0; // if(!<A>) return {state: ...}
if (!localTransport) return { state: LivekitState.WaitingForTransport }; // if(!<B>) return {state: <MyState>}
if (!localConnection) return { state: LivekitState.Connecting }; //
if (!publisher) return { state: LivekitState.InitialisingPublisher }; // as:
if (!shouldStartTracks) return { state: LivekitState.Initialized }; // We do have <A> but not yet <B> so we are in <MyState>
if (!hasTracks) return { state: LivekitState.CreatingTracks }; if (error !== null) return { state: LivekitState.Error, error };
if (!shouldConnect) return { state: LivekitState.ReadyToPublish }; const hasTracks = tracks.length > 0;
if (!publishing) return { state: LivekitState.WaitingToPublish }; if (!localTransport)
return { state: LivekitState.Connected }; return { state: LivekitState.WaitingForTransport };
}, if (!publisher) return { state: LivekitState.WaitingForConnection };
if (!shouldStartTracks) return { state: LivekitState.Initialized };
if (!hasTracks) return { state: LivekitState.CreatingTracks };
if (!shouldConnect) return { state: LivekitState.ReadyToPublish };
if (!publishing) return { state: LivekitState.WaitingToPublish };
return { state: LivekitState.Connected };
},
),
distinctUntilChanged(deepCompare),
), ),
distinctUntilChanged(deepCompare),
); );
const fatalMatrixError$ = new BehaviorSubject<ElementCallError | null>(null); const fatalMatrixError$ = new BehaviorSubject<ElementCallError | null>(null);
@@ -577,15 +564,15 @@ export const createLocalMembership$ = ({
requestConnect, requestConnect,
requestDisconnect, requestDisconnect,
connectionState: { connectionState: {
livekit$: scope.behavior(livekitState$), livekit$: livekitState$,
matrix$: matrixState$, matrix$: matrixState$,
}, },
tracks$,
participant$,
homeserverConnected$, homeserverConnected$,
connected$,
reconnecting$, reconnecting$,
sharingScreen$, sharingScreen$,
toggleScreenSharing, toggleScreenSharing,
participant$,
connection$: localConnection$, connection$: localConnection$,
}; };
}; };

View File

@@ -15,6 +15,7 @@ import {
} from "livekit-client"; } from "livekit-client";
import { import {
BehaviorSubject, BehaviorSubject,
combineLatest,
map, map,
NEVER, NEVER,
type Observable, type Observable,
@@ -80,6 +81,23 @@ export class Publisher {
); );
void this.stopPublishing(); void this.stopPublishing();
}); });
// TODO move mute state handling here using reconcile (instead of inside the mute state class)
// this.scope.reconcile(
// this.scope.behavior(
// combineLatest([this.muteStates.video.enabled$, this.tracks$]),
// ),
// async ([videoEnabled, tracks]) => {
// const track = tracks.find((t) => t.kind == Track.Kind.Video);
// if (!track) return;
// if (videoEnabled) {
// await track.unmute();
// } else {
// await track.mute();
// }
// },
// );
} }
private _tracks$ = new BehaviorSubject<LocalTrack<Track.Kind>[]>([]); private _tracks$ = new BehaviorSubject<LocalTrack<Track.Kind>[]>([]);

View File

@@ -80,8 +80,11 @@ export class ObservableScope {
error(err: unknown) { error(err: unknown) {
subject$.error(err); subject$.error(err);
}, },
complete() {
subject$.complete();
},
}); });
if (subject$.value === nothing) if (subject$.value === nothing && !subject$.isStopped)
throw new Error("Behavior failed to synchronously emit an initial value"); throw new Error("Behavior failed to synchronously emit an initial value");
return subject$ as Behavior<T>; return subject$ as Behavior<T>;
} }
@@ -125,11 +128,11 @@ export class ObservableScope {
let latestValue: T | typeof nothing = nothing; let latestValue: T | typeof nothing = nothing;
let reconcilePromise: Promise<void> | undefined = undefined; let reconcilePromise: Promise<void> | undefined = undefined;
let cleanUp: (() => Promise<void>) | void = undefined; let cleanUp: (() => Promise<void>) | void = undefined;
let prevVal: T | typeof nothing = nothing;
// While this loop runs it will process the latest from `value$` until it caught up with the updates. // While this loop runs it will process the latest from `value$` until it caught up with the updates.
// It might skip updates from `value$` and only process the newest value after callback has resolved. // It might skip updates from `value$` and only process the newest value after callback has resolved.
const reconcileLoop = async (): Promise<void> => { const reconcileLoop = async (): Promise<void> => {
let prevVal: T | typeof nothing = nothing;
while (latestValue !== prevVal) { while (latestValue !== prevVal) {
await cleanUp?.(); // Call the previous value's clean-up handler await cleanUp?.(); // Call the previous value's clean-up handler
prevVal = latestValue; prevVal = latestValue;