cleanup and tests

This commit is contained in:
Timo K
2025-12-09 15:23:30 +01:00
parent 7c40b0e177
commit bf801364a6
8 changed files with 302 additions and 275 deletions

View File

@@ -94,14 +94,13 @@ import {
type SpotlightLandscapeLayoutMedia, type SpotlightLandscapeLayoutMedia,
type SpotlightPortraitLayoutMedia, type SpotlightPortraitLayoutMedia,
} from "../layout-types.ts"; } from "../layout-types.ts";
import { type ElementCallError } from "../../utils/errors.ts"; import { ElementCallError } from "../../utils/errors.ts";
import { type ObservableScope } from "../ObservableScope.ts"; import { type ObservableScope } from "../ObservableScope.ts";
import { createHomeserverConnected$ } from "./localMember/HomeserverConnected.ts"; import { createHomeserverConnected$ } from "./localMember/HomeserverConnected.ts";
import { import {
createLocalMembership$, createLocalMembership$,
enterRTCSession, enterRTCSession,
RTCBackendState, } from "./localMember/LocalMember.ts";
} from "./localMember/LocalMembership.ts";
import { createLocalTransport$ } from "./localMember/LocalTransport.ts"; import { createLocalTransport$ } from "./localMember/LocalTransport.ts";
import { import {
createMemberships$, createMemberships$,
@@ -452,13 +451,13 @@ export function createCallViewModel$(
const localMembership = createLocalMembership$({ const localMembership = createLocalMembership$({
scope: scope, scope: scope,
homeserverConnected$: createHomeserverConnected$( homeserverConnected: createHomeserverConnected$(
scope, scope,
client, client,
matrixRTCSession, matrixRTCSession,
), ),
muteStates: muteStates, muteStates: muteStates,
joinMatrixRTC: async (transport: LivekitTransport) => { joinMatrixRTC: (transport: LivekitTransport) => {
return enterRTCSession( return enterRTCSession(
matrixRTCSession, matrixRTCSession,
transport, transport,
@@ -1455,7 +1454,7 @@ export function createCallViewModel$(
ringOverlay$: ringOverlay$, ringOverlay$: ringOverlay$,
leave$: leave$, leave$: leave$,
hangup: (): void => userHangup$.next(), hangup: (): void => userHangup$.next(),
join: localMembership.requestConnect, join: localMembership.requestJoinAndPublish,
toggleScreenSharing: toggleScreenSharing, toggleScreenSharing: toggleScreenSharing,
sharingScreen$: sharingScreen$, sharingScreen$: sharingScreen$,
@@ -1465,9 +1464,8 @@ export function createCallViewModel$(
unhoverScreen: (): void => screenUnhover$.next(), unhoverScreen: (): void => screenUnhover$.next(),
fatalError$: scope.behavior( fatalError$: scope.behavior(
localMembership.connectionState.livekit$.pipe( localMembership.localMemberState$.pipe(
filter((v) => v.state === RTCBackendState.Error), filter((v) => v instanceof ElementCallError),
map((s) => s.error),
), ),
null, null,
), ),

View File

@@ -7,7 +7,7 @@ Please see LICENSE in the repository root for full details.
*/ */
import { import {
Status, Status as RTCMemberStatus,
type LivekitTransport, type LivekitTransport,
type MatrixRTCSession, type MatrixRTCSession,
} from "matrix-js-sdk/lib/matrixrtc"; } from "matrix-js-sdk/lib/matrixrtc";
@@ -15,11 +15,7 @@ import { describe, expect, it, vi } from "vitest";
import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery";
import { BehaviorSubject, map, of } from "rxjs"; import { BehaviorSubject, map, of } from "rxjs";
import { logger } from "matrix-js-sdk/lib/logger"; import { logger } from "matrix-js-sdk/lib/logger";
import { import { type LocalParticipant, type LocalTrack } from "livekit-client";
ConnectionState as LivekitConnectionState,
type LocalParticipant,
type LocalTrack,
} from "livekit-client";
import { MatrixRTCMode } from "../../../settings/settings"; import { MatrixRTCMode } from "../../../settings/settings";
import { import {
@@ -30,16 +26,19 @@ import {
withTestScheduler, withTestScheduler,
} from "../../../utils/test"; } from "../../../utils/test";
import { import {
TransportState,
createLocalMembership$, createLocalMembership$,
enterRTCSession, enterRTCSession,
RTCBackendState, PublishState,
} from "./LocalMembership"; TrackState,
} from "./LocalMember";
import { MatrixRTCTransportMissingError } from "../../../utils/errors"; import { MatrixRTCTransportMissingError } from "../../../utils/errors";
import { Epoch, ObservableScope } from "../../ObservableScope"; import { Epoch, ObservableScope } from "../../ObservableScope";
import { constant } from "../../Behavior"; import { constant } from "../../Behavior";
import { ConnectionManagerData } from "../remoteMembers/ConnectionManager"; import { ConnectionManagerData } from "../remoteMembers/ConnectionManager";
import { type Connection } from "../remoteMembers/Connection"; import { ConnectionState, type Connection } from "../remoteMembers/Connection";
import { type Publisher } from "./Publisher"; import { type Publisher } from "./Publisher";
import { C } from "vitest/dist/chunks/global.d.MAmajcmJ.js";
const MATRIX_RTC_MODE = MatrixRTCMode.Legacy; const MATRIX_RTC_MODE = MatrixRTCMode.Legacy;
const getUrlParams = vi.hoisted(() => vi.fn(() => ({}))); const getUrlParams = vi.hoisted(() => vi.fn(() => ({})));
@@ -200,21 +199,18 @@ describe("LocalMembership", () => {
joinMatrixRTC: async (): Promise<void> => {}, joinMatrixRTC: async (): Promise<void> => {},
homeserverConnected: { homeserverConnected: {
combined$: constant(true), combined$: constant(true),
rtsSession$: constant(Status.Connected), rtsSession$: constant(RTCMemberStatus.Connected),
}, },
}; };
it("throws error on missing RTC config error", () => { it("throws error on missing RTC config error", () => {
withTestScheduler(({ scope, hot, expectObservable }) => { withTestScheduler(({ scope, hot, expectObservable }) => {
const goodTransport = { const localTransport$ = scope.behavior<null | LivekitTransport>(
livekit_service_url: "other",
} as LivekitTransport;
const localTransport$ = scope.behavior<LivekitTransport>(
hot("1ms #", {}, new MatrixRTCTransportMissingError("domain.com")), hot("1ms #", {}, new MatrixRTCTransportMissingError("domain.com")),
goodTransport, null,
); );
// we do not need any connection data since we want to fail before reaching that.
const mockConnectionManager = { const mockConnectionManager = {
transports$: scope.behavior( transports$: scope.behavior(
localTransport$.pipe(map((t) => new Epoch([t]))), localTransport$.pipe(map((t) => new Epoch([t]))),
@@ -230,15 +226,11 @@ describe("LocalMembership", () => {
connectionManager: mockConnectionManager, connectionManager: mockConnectionManager,
localTransport$, localTransport$,
}); });
localMembership.requestJoinAndPublish();
expectObservable(localMembership.connectionState.livekit$).toBe("ne", { expectObservable(localMembership.localMemberState$).toBe("ne", {
n: { state: RTCBackendState.WaitingForConnection }, n: TransportState.Waiting,
e: { e: expect.toSatisfy((e) => e instanceof MatrixRTCTransportMissingError),
state: RTCBackendState.Error,
error: expect.toSatisfy(
(e) => e instanceof MatrixRTCTransportMissingError,
),
},
}); });
}); });
}); });
@@ -250,32 +242,24 @@ describe("LocalMembership", () => {
livekit_service_url: "b", livekit_service_url: "b",
} as LivekitTransport; } as LivekitTransport;
const connectionManagerData = new ConnectionManagerData(); const connectionTransportAConnected = {
livekitRoom: mockLivekitRoom({
connectionManagerData.add( localParticipant: {
{ isScreenShareEnabled: false,
livekitRoom: mockLivekitRoom({ trackPublications: [],
localParticipant: { } as unknown as LocalParticipant,
isScreenShareEnabled: false, }),
trackPublications: [], state$: constant(ConnectionState.LivekitConnected),
} as unknown as LocalParticipant, transport: aTransport,
}), } as unknown as Connection;
state$: constant({ const connectionTransportAConnecting = {
state: LivekitConnectionState.Connected, ...connectionTransportAConnected,
}), state$: constant(ConnectionState.LivekitConnecting),
transport: aTransport, } as unknown as Connection;
} as unknown as Connection, const connectionTransportBConnected = {
[], state$: constant(ConnectionState.LivekitConnected),
); transport: bTransport,
connectionManagerData.add( } as unknown as Connection;
{
state$: constant({
state: LivekitConnectionState.Connected,
}),
transport: bTransport,
} as unknown as Connection,
[],
);
it("recreates publisher if new connection is used and ENDS always unpublish and end tracks", async () => { it("recreates publisher if new connection is used and ENDS always unpublish and end tracks", async () => {
const scope = new ObservableScope(); const scope = new ObservableScope();
@@ -300,6 +284,9 @@ describe("LocalMembership", () => {
typeof vi.fn typeof vi.fn
>; >;
const connectionManagerData = new ConnectionManagerData();
connectionManagerData.add(connectionTransportAConnected, []);
connectionManagerData.add(connectionTransportBConnected, []);
createLocalMembership$({ createLocalMembership$({
scope, scope,
...defaultCreateLocalMemberValues, ...defaultCreateLocalMemberValues,
@@ -359,6 +346,9 @@ describe("LocalMembership", () => {
typeof vi.fn typeof vi.fn
>; >;
const connectionManagerData = new ConnectionManagerData();
connectionManagerData.add(connectionTransportAConnected, []);
// connectionManagerData.add(connectionTransportB, []);
const localMembership = createLocalMembership$({ const localMembership = createLocalMembership$({
scope, scope,
...defaultCreateLocalMemberValues, ...defaultCreateLocalMemberValues,
@@ -385,10 +375,11 @@ describe("LocalMembership", () => {
it("tracks livekit state correctly", async () => { it("tracks livekit state correctly", async () => {
const scope = new ObservableScope(); const scope = new ObservableScope();
const connectionManagerData = new ConnectionManagerData();
const localTransport$ = new BehaviorSubject<null | LivekitTransport>(null); const localTransport$ = new BehaviorSubject<null | LivekitTransport>(null);
const connectionManagerData$ = new BehaviorSubject< const connectionManagerData$ = new BehaviorSubject(
Epoch<ConnectionManagerData> new Epoch(connectionManagerData),
>(new Epoch(new ConnectionManagerData())); );
const publishers: Publisher[] = []; const publishers: Publisher[] = [];
const tracks$ = new BehaviorSubject<LocalTrack[]>([]); const tracks$ = new BehaviorSubject<LocalTrack[]>([]);
@@ -434,19 +425,45 @@ describe("LocalMembership", () => {
}); });
await flushPromises(); await flushPromises();
expect(localMembership.connectionState.livekit$.value).toStrictEqual({ expect(localMembership.localMemberState$.value).toStrictEqual(
state: RTCBackendState.WaitingForTransport, TransportState.Waiting,
}); );
localTransport$.next(aTransport); localTransport$.next(aTransport);
await flushPromises(); await flushPromises();
expect(localMembership.connectionState.livekit$.value).toStrictEqual({ expect(localMembership.localMemberState$.value).toStrictEqual({
state: RTCBackendState.WaitingForConnection, matrix: RTCMemberStatus.Connected,
media: { connection: null, tracks: TrackState.WaitingForUser },
}); });
connectionManagerData$.next(new Epoch(connectionManagerData));
const connectionManagerData2 = new ConnectionManagerData();
connectionManagerData2.add(
// clone because we will mutate this later.
{ ...connectionTransportAConnecting } as unknown as Connection,
[],
);
connectionManagerData$.next(new Epoch(connectionManagerData2));
await flushPromises(); await flushPromises();
expect(localMembership.connectionState.livekit$.value).toStrictEqual({ expect(localMembership.localMemberState$.value).toStrictEqual({
state: LivekitConnectionState.Connected, matrix: RTCMemberStatus.Connected,
media: {
connection: ConnectionState.LivekitConnecting,
tracks: TrackState.WaitingForUser,
},
}); });
(
connectionManagerData2.getConnectionForTransport(aTransport)!
.state$ as BehaviorSubject<ConnectionState>
).next(ConnectionState.LivekitConnected);
expect(localMembership.localMemberState$.value).toStrictEqual({
matrix: RTCMemberStatus.Connected,
media: {
connection: ConnectionState.LivekitConnected,
tracks: TrackState.WaitingForUser,
},
});
expect(publisherFactory).toHaveBeenCalledOnce(); expect(publisherFactory).toHaveBeenCalledOnce();
expect(localMembership.tracks$.value.length).toBe(0); expect(localMembership.tracks$.value.length).toBe(0);
@@ -455,37 +472,46 @@ describe("LocalMembership", () => {
// ------- // -------
await flushPromises(); await flushPromises();
expect(localMembership.connectionState.livekit$.value).toStrictEqual({ expect(localMembership.localMemberState$.value).toStrictEqual({
state: RTCBackendState.CreatingTracks, matrix: RTCMemberStatus.Connected,
media: {
tracks: TrackState.Creating,
connection: ConnectionState.LivekitConnected,
},
}); });
createTrackResolver.resolve(); createTrackResolver.resolve();
await flushPromises(); await flushPromises();
expect(localMembership.connectionState.livekit$.value).toStrictEqual({ expect(
state: RTCBackendState.ReadyToPublish, // eslint-disable-next-line @typescript-eslint/no-explicit-any
}); (localMembership.localMemberState$.value as any).media,
).toStrictEqual(PublishState.WaitingForUser);
// ------- // -------
localMembership.requestConnect(); localMembership.requestJoinAndPublish();
// ------- // -------
expect(localMembership.connectionState.livekit$.value).toStrictEqual({ expect(
state: RTCBackendState.WaitingToPublish, // eslint-disable-next-line @typescript-eslint/no-explicit-any
}); (localMembership.localMemberState$.value as any).media,
).toStrictEqual(PublishState.Starting);
publishResolver.resolve(); publishResolver.resolve();
await flushPromises(); await flushPromises();
expect(localMembership.connectionState.livekit$.value).toStrictEqual({ expect(
state: RTCBackendState.ConnectedAndPublishing, // eslint-disable-next-line @typescript-eslint/no-explicit-any
}); (localMembership.localMemberState$.value as any).media,
).toStrictEqual(PublishState.Publishing);
expect(publishers[0].stopPublishing).not.toHaveBeenCalled(); expect(publishers[0].stopPublishing).not.toHaveBeenCalled();
expect(localMembership.connectionState.livekit$.isStopped).toBe(false); expect(localMembership.localMemberState$.isStopped).toBe(false);
scope.end(); scope.end();
await flushPromises(); await flushPromises();
// stays in connected state because it is stopped before the update to tracks update the state. // stays in connected state because it is stopped before the update to tracks update the state.
expect(localMembership.connectionState.livekit$.value).toStrictEqual({ expect(
state: RTCBackendState.ConnectedAndPublishing, // eslint-disable-next-line @typescript-eslint/no-explicit-any
}); (localMembership.localMemberState$.value as any).media,
).toStrictEqual(PublishState.Publishing);
// stop all tracks after ending scopes // stop all tracks after ending scopes
expect(publishers[0].stopPublishing).toHaveBeenCalled(); expect(publishers[0].stopPublishing).toHaveBeenCalled();
expect(publishers[0].stopTracks).toHaveBeenCalled(); expect(publishers[0].stopTracks).toHaveBeenCalled();

View File

@@ -11,7 +11,6 @@ import {
ParticipantEvent, ParticipantEvent,
type LocalParticipant, type LocalParticipant,
type ScreenShareCaptureOptions, type ScreenShareCaptureOptions,
ConnectionState as LivekitConnectionState,
} from "livekit-client"; } from "livekit-client";
import { observeParticipantEvents } from "@livekit/components-core"; import { observeParticipantEvents } from "@livekit/components-core";
import { import {
@@ -36,62 +35,66 @@ import {
import { type Logger } from "matrix-js-sdk/lib/logger"; import { type Logger } from "matrix-js-sdk/lib/logger";
import { deepCompare } from "matrix-js-sdk/lib/utils"; import { deepCompare } from "matrix-js-sdk/lib/utils";
import { constant, type Behavior } from "../../Behavior"; import { constant, type Behavior } from "../../Behavior.ts";
import { type IConnectionManager } from "../remoteMembers/ConnectionManager"; import { type IConnectionManager } from "../remoteMembers/ConnectionManager.ts";
import { type ObservableScope } from "../../ObservableScope"; import { type ObservableScope } from "../../ObservableScope.ts";
import { type Publisher } from "./Publisher"; import { type Publisher } from "./Publisher.ts";
import { type MuteStates } from "../../MuteStates"; import { type MuteStates } from "../../MuteStates.ts";
import { import {
ElementCallError, ElementCallError,
MembershipManagerError, MembershipManagerError,
UnknownCallError, UnknownCallError,
} from "../../../utils/errors"; } from "../../../utils/errors.ts";
import { ElementWidgetActions, widget } from "../../../widget"; import { ElementWidgetActions, widget } from "../../../widget.ts";
import { getUrlParams } from "../../../UrlParams.ts"; import { getUrlParams } from "../../../UrlParams.ts";
import { PosthogAnalytics } from "../../../analytics/PosthogAnalytics.ts"; import { PosthogAnalytics } from "../../../analytics/PosthogAnalytics.ts";
import { MatrixRTCMode } from "../../../settings/settings.ts"; import { MatrixRTCMode } from "../../../settings/settings.ts";
import { Config } from "../../../config/Config.ts"; import { Config } from "../../../config/Config.ts";
import { import {
type ConnectionState, ConnectionState,
type Connection, type Connection,
type FailedToStartError,
} from "../remoteMembers/Connection.ts"; } from "../remoteMembers/Connection.ts";
import { type HomeserverConnected } from "./HomeserverConnected.ts"; import { type HomeserverConnected } from "./HomeserverConnected.ts";
export enum RTCBackendState { export enum TransportState {
Error = "error",
/** Not even a transport is available to the LocalMembership */ /** Not even a transport is available to the LocalMembership */
WaitingForTransport = "waiting_for_transport", Waiting = "transport_waiting",
/** A connection appeared so we can initialise the publisher */
WaitingForConnection = "waiting_for_connection",
/** Implies lk connection is connected */
CreatingTracks = "creating_tracks",
/** Implies lk connection is connected */
ReadyToPublish = "ready_to_publish",
/** Implies lk connection is connected */
WaitingToPublish = "waiting_to_publish",
/** Implies lk connection is connected */
ConnectedAndPublishing = "fully_connected",
} }
type LocalMemberRTCBackendState = export enum PublishState {
| { state: RTCBackendState.Error; error: ElementCallError } WaitingForUser = "publish_waiting_for_user",
| { state: Exclude<RTCBackendState, RTCBackendState.Error> } /** Implies lk connection is connected */
| ConnectionState; Starting = "publish_start_publishing",
/** Implies lk connection is connected */
export enum MatrixAdditionalState { Publishing = "publish_publishing",
WaitingForTransport = "waiting_for_transport",
} }
type LocalMemberMatrixState = export enum TrackState {
| { state: MatrixAdditionalState.WaitingForTransport } /** The track is waiting for user input to create tracks (waiting to call `startTracks()`) */
| { state: "Error"; error: Error } WaitingForUser = "tracks_waiting_for_user",
| { state: RTCSessionStatus }; /** Implies lk connection is connected */
Creating = "tracks_creating",
export interface LocalMemberConnectionState { /** Implies lk connection is connected */
livekit$: Behavior<LocalMemberRTCBackendState>; Ready = "tracks_ready",
matrix$: Behavior<LocalMemberMatrixState>;
} }
export type LocalMemberMediaState =
| {
tracks: TrackState;
connection: ConnectionState | FailedToStartError;
}
| PublishState
| ElementCallError;
export type LocalMemberMatrixState = Error | RTCSessionStatus;
export type LocalMemberState =
| ElementCallError
| TransportState.Waiting
| {
media: LocalMemberMediaState;
matrix: LocalMemberMatrixState;
};
/* /*
* - get well known * - get well known
* - get oldest membership * - get oldest membership
@@ -146,16 +149,16 @@ export const createLocalMembership$ = ({
matrixRTCSession, matrixRTCSession,
}: Props): { }: Props): {
/** /**
* This starts audio and video tracks. They will be reused when calling `requestConnect`. * This starts audio and video tracks. They will be reused when calling `requestPublish`.
*/ */
startTracks: () => Behavior<LocalTrack[]>; startTracks: () => Behavior<LocalTrack[]>;
/** /**
* This sets a inner state (shouldConnect) to true and instructs the js-sdk and livekit to keep the user * This sets a inner state (shouldPublish) to true and instructs the js-sdk and livekit to keep the user
* connected to matrix and livekit. * connected to matrix and livekit.
*/ */
requestConnect: () => void; requestJoinAndPublish: () => void;
requestDisconnect: () => void; requestDisconnect: () => void;
connectionState: LocalMemberConnectionState; localMemberState$: Behavior<LocalMemberState>;
sharingScreen$: Behavior<boolean>; sharingScreen$: Behavior<boolean>;
/** /**
* Callback to toggle screen sharing. If null, screen sharing is not possible. * Callback to toggle screen sharing. If null, screen sharing is not possible.
@@ -164,11 +167,11 @@ export const createLocalMembership$ = ({
tracks$: Behavior<LocalTrack[]>; tracks$: Behavior<LocalTrack[]>;
participant$: Behavior<LocalParticipant | null>; participant$: Behavior<LocalParticipant | null>;
connection$: Behavior<Connection | null>; connection$: Behavior<Connection | null>;
/** Shorthand for connectionState.matrix.state === Status.Reconnecting /** Shorthand for homeserverConnected.rtcSession === Status.Reconnecting
* Direct translation to the js-sdk membership manager connection `Status`. * Direct translation to the js-sdk membership manager connection `Status`.
*/ */
reconnecting$: Behavior<boolean>; reconnecting$: Behavior<boolean>;
/** Shorthand for connectionState.matrix.state === Status.Disconnected /** Shorthand for homeserverConnected.rtcSession === Status.Disconnected
* Direct translation to the js-sdk membership manager connection `Status`. * Direct translation to the js-sdk membership manager connection `Status`.
*/ */
disconnected$: Behavior<boolean>; disconnected$: Behavior<boolean>;
@@ -190,7 +193,7 @@ export const createLocalMembership$ = ({
: new Error("Unknown error from localTransport"), : new Error("Unknown error from localTransport"),
); );
} }
setLivekitError(error); setTransportError(error);
return of(null); return of(null);
}), }),
), ),
@@ -223,19 +226,13 @@ export const createLocalMembership$ = ({
// MATRIX RELATED // MATRIX RELATED
const reconnecting$ = scope.behavior(
homeserverConnected.rtsSession$.pipe(
map((sessionStatus) => sessionStatus === RTCSessionStatus.Reconnecting),
),
);
// This should be used in a combineLatest with publisher$ to connect. // This should be used in a combineLatest with publisher$ to connect.
// to make it possible to call startTracks before the preferredTransport$ has resolved. // to make it possible to call startTracks before the preferredTransport$ has resolved.
const trackStartRequested = Promise.withResolvers<void>(); const trackStartRequested = Promise.withResolvers<void>();
// This should be used in a combineLatest with publisher$ to connect. // This should be used in a combineLatest with publisher$ to connect.
// to make it possible to call startTracks before the preferredTransport$ has resolved. // to make it possible to call startTracks before the preferredTransport$ has resolved.
const connectRequested$ = new BehaviorSubject(false); const joinAndPublishRequested$ = new BehaviorSubject(false);
/** /**
* The publisher is stored in here an abstracts creating and publishing tracks. * The publisher is stored in here an abstracts creating and publishing tracks.
@@ -256,13 +253,13 @@ export const createLocalMembership$ = ({
return tracks$; return tracks$;
}; };
const requestConnect = (): void => { const requestJoinAndPublish = (): void => {
trackStartRequested.resolve(); trackStartRequested.resolve();
connectRequested$.next(true); joinAndPublishRequested$.next(true);
}; };
const requestDisconnect = (): void => { const requestDisconnect = (): void => {
connectRequested$.next(false); joinAndPublishRequested$.next(false);
}; };
// Take care of the publisher$ // Take care of the publisher$
@@ -300,112 +297,129 @@ export const createLocalMembership$ = ({
// Based on `connectRequested$` we start publishing tracks. (once they are there!) // Based on `connectRequested$` we start publishing tracks. (once they are there!)
scope.reconcile( scope.reconcile(
scope.behavior(combineLatest([publisher$, tracks$, connectRequested$])), scope.behavior(
async ([publisher, tracks, shouldConnect]) => { combineLatest([publisher$, tracks$, joinAndPublishRequested$]),
if (shouldConnect === publisher?.publishing$.value) return; ),
if (tracks.length !== 0 && shouldConnect) { async ([publisher, tracks, shouldJoinAndPublish]) => {
if (shouldJoinAndPublish === publisher?.publishing$.value) return;
if (tracks.length !== 0 && shouldJoinAndPublish) {
try { try {
await publisher?.startPublishing(); await publisher?.startPublishing();
} catch (error) { } catch (error) {
setLivekitError(error as ElementCallError); setMediaError(error as ElementCallError);
} }
} else if (tracks.length !== 0 && !shouldConnect) { } else if (tracks.length !== 0 && !shouldJoinAndPublish) {
try { try {
await publisher?.stopPublishing(); await publisher?.stopPublishing();
} catch (error) { } catch (error) {
setLivekitError(new UnknownCallError(error as Error)); setMediaError(new UnknownCallError(error as Error));
} }
} }
}, },
); );
const fatalLivekitError$ = new BehaviorSubject<ElementCallError | null>(null); const fatalMediaError$ = new BehaviorSubject<ElementCallError | null>(null);
const setLivekitError = (e: ElementCallError): void => { const setMediaError = (e: ElementCallError): void => {
if (fatalLivekitError$.value !== null) if (fatalMediaError$.value !== null)
logger.error("Multiple Livkit Errors:", e); logger.error("Multiple Media Errors:", e);
else fatalLivekitError$.next(e); else fatalMediaError$.next(e);
}; };
const livekitState$: Behavior<LocalMemberRTCBackendState> = scope.behavior(
const fatalTransportError$ = new BehaviorSubject<ElementCallError | null>(
null,
);
const setTransportError = (e: ElementCallError): void => {
if (fatalTransportError$.value !== null)
logger.error("Multiple Transport Errors:", e);
else fatalTransportError$.next(e);
};
const mediaState$: Behavior<LocalMemberMediaState> = scope.behavior(
combineLatest([ combineLatest([
localConnectionState$, localConnectionState$,
publisher$,
localTransport$, localTransport$,
tracks$.pipe( tracks$,
tap((t) => {
logger.info("tracks$: ", t);
}),
),
publishing$, publishing$,
connectRequested$, joinAndPublishRequested$,
from(trackStartRequested.promise).pipe( from(trackStartRequested.promise).pipe(
map(() => true), map(() => true),
startWith(false), startWith(false),
), ),
fatalLivekitError$,
]).pipe( ]).pipe(
map( map(
([ ([
localConnectionState, localConnectionState,
publisher,
localTransport, localTransport,
tracks, tracks,
publishing, publishing,
shouldConnect, shouldPublish,
shouldStartTracks, shouldStartTracks,
error,
]) => { ]) => {
// read this: if (!localTransport) return null;
// if(!<A>) return {state: ...}
// if(!<B>) return {state: <MyState>}
//
// as:
// We do have <A> but not yet <B> so we are in <MyState>
if (error !== null) return { state: RTCBackendState.Error, error };
const hasTracks = tracks.length > 0; const hasTracks = tracks.length > 0;
if (!localTransport) let trackState: TrackState = TrackState.WaitingForUser;
return { state: RTCBackendState.WaitingForTransport }; if (hasTracks && shouldStartTracks) trackState = TrackState.Ready;
if (!localConnectionState) if (!hasTracks && shouldStartTracks) trackState = TrackState.Creating;
return { state: RTCBackendState.WaitingForConnection };
if ( if (
localConnectionState.state !== LivekitConnectionState.Connected || localConnectionState !== ConnectionState.LivekitConnected ||
!publisher trackState !== TrackState.Ready
) )
// pass through the localConnectionState while we do not yet have a publisher or the state return {
// of the connection is not yet connected connection: localConnectionState,
return { state: localConnectionState.state }; tracks: trackState,
if (!shouldStartTracks) };
return { state: LivekitConnectionState.Connected }; if (!shouldPublish) return PublishState.WaitingForUser;
if (!hasTracks) return { state: RTCBackendState.CreatingTracks }; if (!publishing) return PublishState.Starting;
if (!shouldConnect) return { state: RTCBackendState.ReadyToPublish }; return PublishState.Publishing;
if (!publishing) return { state: RTCBackendState.WaitingToPublish };
return { state: RTCBackendState.ConnectedAndPublishing };
}, },
), ),
distinctUntilChanged(deepCompare), distinctUntilChanged(deepCompare),
), ),
); );
const fatalMatrixError$ = new BehaviorSubject<ElementCallError | null>(null); const fatalMatrixError$ = new BehaviorSubject<ElementCallError | null>(null);
const setMatrixError = (e: ElementCallError): void => { const setMatrixError = (e: ElementCallError): void => {
if (fatalMatrixError$.value !== null) if (fatalMatrixError$.value !== null)
logger.error("Multiple Matrix Errors:", e); logger.error("Multiple Matrix Errors:", e);
else fatalMatrixError$.next(e); else fatalMatrixError$.next(e);
}; };
const matrixState$: Behavior<LocalMemberMatrixState> = scope.behavior(
combineLatest([localTransport$, homeserverConnected.rtsSession$]).pipe( const localMemberState$ = scope.behavior<LocalMemberState>(
map(([localTransport, rtcSessionStatus]) => { combineLatest([
if (!localTransport) mediaState$,
return { state: MatrixAdditionalState.WaitingForTransport }; homeserverConnected.rtsSession$,
return { state: rtcSessionStatus }; fatalMatrixError$,
}), fatalTransportError$,
fatalMediaError$,
]).pipe(
map(
([
mediaState,
rtcSessionStatus,
matrixError,
transportError,
mediaError,
]) => {
if (transportError !== null) return transportError;
// `mediaState` will be 'null' until the transport appears.
if (mediaState && rtcSessionStatus)
return {
matrix: matrixError ?? rtcSessionStatus,
media: mediaError ?? mediaState,
};
else {
return TransportState.Waiting;
}
},
),
), ),
); );
// inform the widget about the connect and disconnect intent from the user. // inform the widget about the connect and disconnect intent from the user.
scope scope
.behavior(connectRequested$.pipe(pairwise(), scope.bind()), [ .behavior(joinAndPublishRequested$.pipe(pairwise(), scope.bind()), [
undefined, undefined,
connectRequested$.value, joinAndPublishRequested$.value,
]) ])
.subscribe(([prev, current]) => { .subscribe(([prev, current]) => {
if (!widget) return; if (!widget) return;
@@ -434,7 +448,7 @@ export const createLocalMembership$ = ({
// Keep matrix rtc session in sync with localTransport$, connectRequested$ // Keep matrix rtc session in sync with localTransport$, connectRequested$
scope.reconcile( scope.reconcile(
scope.behavior(combineLatest([localTransport$, connectRequested$])), scope.behavior(combineLatest([localTransport$, joinAndPublishRequested$])),
async ([transport, shouldConnect]) => { async ([transport, shouldConnect]) => {
if (!transport) return; if (!transport) return;
// if shouldConnect=false we will do the disconnect as the cleanup from the previous reconcile iteration. // if shouldConnect=false we will do the disconnect as the cleanup from the previous reconcile iteration.
@@ -555,21 +569,19 @@ export const createLocalMembership$ = ({
return { return {
startTracks, startTracks,
requestConnect, requestJoinAndPublish,
requestDisconnect, requestDisconnect,
connectionState: { localMemberState$,
livekit$: livekitState$,
matrix$: matrixState$,
},
tracks$, tracks$,
participant$, participant$,
reconnecting$, reconnecting$: scope.behavior(
homeserverConnected.rtsSession$.pipe(
map((sessionStatus) => sessionStatus === RTCSessionStatus.Reconnecting),
),
),
disconnected$: scope.behavior( disconnected$: scope.behavior(
matrixState$.pipe( homeserverConnected.rtsSession$.pipe(
map( map((state) => state === RTCSessionStatus.Disconnected),
(sessionStatus) =>
sessionStatus.state === RTCSessionStatus.Disconnected,
),
), ),
), ),
sharingScreen$, sharingScreen$,

View File

@@ -52,9 +52,7 @@ describe("Publisher", () => {
} as unknown as MuteStates; } as unknown as MuteStates;
scope = new ObservableScope(); scope = new ObservableScope();
connection = { connection = {
state$: constant({ state$: constant(LivekitConenctionState.Connected),
state: LivekitConenctionState.Connected,
}),
livekitRoom: mockLivekitRoom({ livekitRoom: mockLivekitRoom({
localParticipant: mockLocalParticipant({}), localParticipant: mockLocalParticipant({}),
}), }),
@@ -110,15 +108,14 @@ describe("Publisher", () => {
// failiour due to connection.state$ // failiour due to connection.state$
const beforeState = connection.state$.value; const beforeState = connection.state$.value;
(connection.state$ as BehaviorSubject<ConnectionState>).next({ (connection.state$ as BehaviorSubject<Error>).next(Error("testStartError"));
state: "FailedToStart",
error: Error("testStartError"),
});
await expect(publisher.startPublishing()).rejects.toThrow( await expect(publisher.startPublishing()).rejects.toThrow(
new FailToStartLivekitConnection("testStartError"), new FailToStartLivekitConnection("testStartError"),
); );
(connection.state$ as BehaviorSubject<ConnectionState>).next(beforeState); (connection.state$ as BehaviorSubject<ConnectionState | Error>).next(
beforeState,
);
// does not try other conenction after the first one failed // does not try other conenction after the first one failed
expect( expect(

View File

@@ -32,7 +32,10 @@ import {
} from "../../../livekit/TrackProcessorContext.tsx"; } from "../../../livekit/TrackProcessorContext.tsx";
import { getUrlParams } from "../../../UrlParams.ts"; import { getUrlParams } from "../../../UrlParams.ts";
import { observeTrackReference$ } from "../../MediaViewModel.ts"; import { observeTrackReference$ } from "../../MediaViewModel.ts";
import { type Connection } from "../remoteMembers/Connection.ts"; import {
ConnectionState,
type Connection,
} from "../remoteMembers/Connection.ts";
import { type ObservableScope } from "../../ObservableScope.ts"; import { type ObservableScope } from "../../ObservableScope.ts";
import { import {
ElementCallError, ElementCallError,
@@ -158,20 +161,17 @@ export class Publisher {
this.logger.debug("startPublishing called"); this.logger.debug("startPublishing called");
const lkRoom = this.connection.livekitRoom; const lkRoom = this.connection.livekitRoom;
const { promise, resolve, reject } = Promise.withResolvers<void>(); const { promise, resolve, reject } = Promise.withResolvers<void>();
const sub = this.connection.state$.subscribe((s) => { const sub = this.connection.state$.subscribe((state) => {
switch (s.state) { if (state instanceof Error) {
case LivekitConnectionState.Connected: const error =
resolve(); state instanceof ElementCallError
break; ? state
case "FailedToStart": : new FailToStartLivekitConnection(state.message);
reject( reject(error);
s.error instanceof ElementCallError } else if (state === ConnectionState.LivekitConnected) {
? s.error resolve();
: new FailToStartLivekitConnection(s.error.message), } else {
); this.logger.info("waiting for connection: ", state);
break;
default:
this.logger.info("waiting for connection: ", s.state);
} }
}); });
try { try {

View File

@@ -30,8 +30,8 @@ import { logger } from "matrix-js-sdk/lib/logger";
import type { LivekitTransport } from "matrix-js-sdk/lib/matrixrtc"; import type { LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { import {
Connection, Connection,
ConnectionState,
type ConnectionOpts, type ConnectionOpts,
type ConnectionState,
type PublishingParticipant, type PublishingParticipant,
} from "./Connection.ts"; } from "./Connection.ts";
import { ObservableScope } from "../../ObservableScope.ts"; import { ObservableScope } from "../../ObservableScope.ts";
@@ -151,7 +151,7 @@ describe("Start connection states", () => {
}; };
const connection = new Connection(opts, logger); const connection = new Connection(opts, logger);
expect(connection.state$.getValue().state).toEqual("Initialized"); expect(connection.state$.getValue()).toEqual("Initialized");
}); });
it("fail to getOpenId token then error state", async () => { it("fail to getOpenId token then error state", async () => {
@@ -167,7 +167,7 @@ describe("Start connection states", () => {
const connection = new Connection(opts, logger); const connection = new Connection(opts, logger);
const capturedStates: ConnectionState[] = []; const capturedStates: (ConnectionState | Error)[] = [];
const s = connection.state$.subscribe((value) => { const s = connection.state$.subscribe((value) => {
capturedStates.push(value); capturedStates.push(value);
}); });
@@ -187,22 +187,20 @@ describe("Start connection states", () => {
let capturedState = capturedStates.pop(); let capturedState = capturedStates.pop();
expect(capturedState).toBeDefined(); expect(capturedState).toBeDefined();
expect(capturedState!.state).toEqual("FetchingConfig"); expect(capturedState!).toEqual("FetchingConfig");
deferred.reject(new FailToGetOpenIdToken(new Error("Failed to get token"))); deferred.reject(new FailToGetOpenIdToken(new Error("Failed to get token")));
await vi.runAllTimersAsync(); await vi.runAllTimersAsync();
capturedState = capturedStates.pop(); capturedState = capturedStates.pop();
if (capturedState!.state === "FailedToStart") { if (capturedState instanceof Error) {
expect(capturedState!.error.message).toEqual("Something went wrong"); expect(capturedState.message).toEqual("Something went wrong");
expect(connection.transport.livekit_alias).toEqual( expect(connection.transport.livekit_alias).toEqual(
livekitFocus.livekit_alias, livekitFocus.livekit_alias,
); );
} else { } else {
expect.fail( expect.fail("Expected FailedToStart state but got " + capturedState);
"Expected FailedToStart state but got " + capturedState?.state,
);
} }
}); });
@@ -219,7 +217,7 @@ describe("Start connection states", () => {
const connection = new Connection(opts, logger); const connection = new Connection(opts, logger);
const capturedStates: ConnectionState[] = []; const capturedStates: (ConnectionState | Error)[] = [];
const s = connection.state$.subscribe((value) => { const s = connection.state$.subscribe((value) => {
capturedStates.push(value); capturedStates.push(value);
}); });
@@ -241,24 +239,22 @@ describe("Start connection states", () => {
let capturedState = capturedStates.pop(); let capturedState = capturedStates.pop();
expect(capturedState).toBeDefined(); expect(capturedState).toBeDefined();
expect(capturedState?.state).toEqual("FetchingConfig"); expect(capturedState).toEqual(ConnectionState.FetchingConfig);
deferredSFU.resolve(); deferredSFU.resolve();
await vi.runAllTimersAsync(); await vi.runAllTimersAsync();
capturedState = capturedStates.pop(); capturedState = capturedStates.pop();
if (capturedState?.state === "FailedToStart") { if (capturedState instanceof Error) {
expect(capturedState?.error.message).toContain( expect(capturedState.message).toContain(
"SFU Config fetch failed with exception Error", "SFU Config fetch failed with exception Error",
); );
expect(connection.transport.livekit_alias).toEqual( expect(connection.transport.livekit_alias).toEqual(
livekitFocus.livekit_alias, livekitFocus.livekit_alias,
); );
} else { } else {
expect.fail( expect.fail("Expected FailedToStart state but got " + capturedState);
"Expected FailedToStart state but got " + capturedState?.state,
);
} }
}); });
@@ -275,7 +271,7 @@ describe("Start connection states", () => {
const connection = new Connection(opts, logger); const connection = new Connection(opts, logger);
const capturedStates: ConnectionState[] = []; const capturedStates: (ConnectionState | Error)[] = [];
const s = connection.state$.subscribe((value) => { const s = connection.state$.subscribe((value) => {
capturedStates.push(value); capturedStates.push(value);
}); });
@@ -305,17 +301,15 @@ describe("Start connection states", () => {
let capturedState = capturedStates.pop(); let capturedState = capturedStates.pop();
expect(capturedState).toBeDefined(); expect(capturedState).toBeDefined();
expect(capturedState?.state).toEqual("FetchingConfig"); expect(capturedState).toEqual(ConnectionState.FetchingConfig);
deferredSFU.resolve(); deferredSFU.resolve();
await vi.runAllTimersAsync(); await vi.runAllTimersAsync();
capturedState = capturedStates.pop(); capturedState = capturedStates.pop();
if (capturedState && capturedState.state === "FailedToStart") { if (capturedState instanceof Error) {
expect(capturedState.error.message).toContain( expect(capturedState.message).toContain("Failed to connect to livekit");
"Failed to connect to livekit",
);
expect(connection.transport.livekit_alias).toEqual( expect(connection.transport.livekit_alias).toEqual(
livekitFocus.livekit_alias, livekitFocus.livekit_alias,
); );
@@ -332,7 +326,7 @@ describe("Start connection states", () => {
const connection = setupRemoteConnection(); const connection = setupRemoteConnection();
const capturedStates: ConnectionState[] = []; const capturedStates: (ConnectionState | Error)[] = [];
const s = connection.state$.subscribe((value) => { const s = connection.state$.subscribe((value) => {
capturedStates.push(value); capturedStates.push(value);
}); });
@@ -342,13 +336,13 @@ describe("Start connection states", () => {
await vi.runAllTimersAsync(); await vi.runAllTimersAsync();
const initialState = capturedStates.shift(); const initialState = capturedStates.shift();
expect(initialState?.state).toEqual("Initialized"); expect(initialState).toEqual(ConnectionState.Initialized);
const fetchingState = capturedStates.shift(); const fetchingState = capturedStates.shift();
expect(fetchingState?.state).toEqual("FetchingConfig"); expect(fetchingState).toEqual(ConnectionState.FetchingConfig);
const connectingState = capturedStates.shift(); const connectingState = capturedStates.shift();
expect(connectingState?.state).toEqual("ConnectingToLkRoom"); expect(connectingState).toEqual(ConnectionState.ConnectingToLkRoom);
const connectedState = capturedStates.shift(); const connectedState = capturedStates.shift();
expect(connectedState?.state).toEqual("connected"); expect(connectedState).toEqual(ConnectionState.LivekitConnected);
}); });
it("shutting down the scope should stop the connection", async () => { it("shutting down the scope should stop the connection", async () => {

View File

@@ -12,7 +12,6 @@ import {
} from "@livekit/components-core"; } from "@livekit/components-core";
import { import {
ConnectionError, ConnectionError,
ConnectionState as LivekitConnectionState,
type Room as LivekitRoom, type Room as LivekitRoom,
type LocalParticipant, type LocalParticipant,
type RemoteParticipant, type RemoteParticipant,
@@ -55,14 +54,21 @@ export class FailedToStartError extends Error {
} }
export enum ConnectionState { export enum ConnectionState {
/** The start state of a connection. It has been created but nothing has loaded yet. */
Initialized = "Initialized", Initialized = "Initialized",
/** `start` has been called on the connection. It aquires the jwt info to conenct to the LK Room */
FetchingConfig = "FetchingConfig", FetchingConfig = "FetchingConfig",
Stopped = "Stopped", Stopped = "Stopped",
ConnectingToLkRoom = "ConnectingToLkRoom", ConnectingToLkRoom = "ConnectingToLkRoom",
/** The same as ConnectionState.Disconnected from `livekit-client` */
LivekitDisconnected = "disconnected", LivekitDisconnected = "disconnected",
/** The same as ConnectionState.Connecting from `livekit-client` */
LivekitConnecting = "connecting", LivekitConnecting = "connecting",
/** The same as ConnectionState.Connected from `livekit-client` */
LivekitConnected = "connected", LivekitConnected = "connected",
/** The same as ConnectionState.Reconnecting from `livekit-client` */
LivekitReconnecting = "reconnecting", LivekitReconnecting = "reconnecting",
/** The same as ConnectionState.SignalReconnecting from `livekit-client` */
LivekitSignalReconnecting = "signalReconnecting", LivekitSignalReconnecting = "signalReconnecting",
} }
@@ -73,15 +79,14 @@ export enum ConnectionState {
*/ */
export class Connection { export class Connection {
// Private Behavior // Private Behavior
private readonly _state$ = new BehaviorSubject< private readonly _state$ = new BehaviorSubject<ConnectionState | Error>(
ConnectionState | FailedToStartError ConnectionState.Initialized,
>(ConnectionState.Initialized); );
/** /**
* The current state of the connection to the media transport. * The current state of the connection to the media transport.
*/ */
public readonly state$: Behavior<ConnectionState | FailedToStartError> = public readonly state$: Behavior<ConnectionState | Error> = this._state$;
this._state$;
/** /**
* The media transport to connect to. * The media transport to connect to.
@@ -161,15 +166,12 @@ export class Connection {
connectionStateObserver(this.livekitRoom) connectionStateObserver(this.livekitRoom)
.pipe(this.scope.bind()) .pipe(this.scope.bind())
.subscribe((lkState) => { .subscribe((lkState) => {
// It si save to cast lkState to ConnectionState as they are fully overlapping. // It is save to cast lkState to ConnectionState as they are fully overlapping.
this._state$.next(lkState as unknown as ConnectionState); this._state$.next(lkState as unknown as ConnectionState);
}); });
} catch (error) { } catch (error) {
this.logger.debug(`Failed to connect to LiveKit room: ${error}`); this.logger.debug(`Failed to connect to LiveKit room: ${error}`);
this._state$.next({ this._state$.next(error instanceof Error ? error : new Error(`${error}`));
state: "FailedToStart",
error: error instanceof Error ? error : new Error(`${error}`),
});
throw error; throw error;
} }
} }
@@ -194,9 +196,7 @@ export class Connection {
); );
if (this.stopped) return; if (this.stopped) return;
await this.livekitRoom.disconnect(); await this.livekitRoom.disconnect();
this._state$.next({ this._state$.next(ConnectionState.Stopped);
state: ConnectionAdditionalState.Stopped,
});
this.stopped = true; this.stopped = true;
} }

View File

@@ -10353,8 +10353,8 @@ __metadata:
linkType: hard linkType: hard
"matrix-js-sdk@npm:^39.2.0": "matrix-js-sdk@npm:^39.2.0":
version: 39.2.0 version: 39.3.0
resolution: "matrix-js-sdk@npm:39.2.0" resolution: "matrix-js-sdk@npm:39.3.0"
dependencies: dependencies:
"@babel/runtime": "npm:^7.12.5" "@babel/runtime": "npm:^7.12.5"
"@matrix-org/matrix-sdk-crypto-wasm": "npm:^15.3.0" "@matrix-org/matrix-sdk-crypto-wasm": "npm:^15.3.0"
@@ -10370,7 +10370,7 @@ __metadata:
sdp-transform: "npm:^3.0.0" sdp-transform: "npm:^3.0.0"
unhomoglyph: "npm:^1.0.6" unhomoglyph: "npm:^1.0.6"
uuid: "npm:13" uuid: "npm:13"
checksum: 10c0/f8b5261de2744305330ba3952821ca9303698170bfd3a0ff8a767b9286d4e8d4ed5aaf6fbaf8a1e8ff9dbd859102a2a47d882787e2da3b3078965bec00157959 checksum: 10c0/031c9ec042e00c32dc531f82fc59c64cc25fb665abfc642b1f0765c530d60684f8bd63daf0cdd0dbe96b4f87ea3f4148f9d3f024a59d57eceaec1ce5d0164755
languageName: node languageName: node
linkType: hard linkType: hard