refactor local transport testing and local memberhsip initialization

This commit is contained in:
Timo K
2025-11-20 14:42:12 +01:00
parent f6ef872242
commit 1fd9ac93c9
15 changed files with 571 additions and 378 deletions

View File

@@ -21,7 +21,14 @@ export type OpenIDClientParts = Pick<
MatrixClient, MatrixClient,
"getOpenIdToken" | "getDeviceId" "getOpenIdToken" | "getDeviceId"
>; >;
/**
*
* @param client
* @param serviceUrl
* @param matrixRoomId
* @returns
* @throws FailToGetOpenIdToken
*/
export async function getSFUConfigWithOpenID( export async function getSFUConfigWithOpenID(
client: OpenIDClientParts, client: OpenIDClientParts,
serviceUrl: string, serviceUrl: string,

View File

@@ -159,6 +159,7 @@ export const GroupCallView: FC<Props> = ({
}; };
}, [rtcSession]); }, [rtcSession]);
// TODO move this into the callViewModel LocalMembership.ts
useTypedEventEmitter( useTypedEventEmitter(
rtcSession, rtcSession,
MatrixRTCSessionEvent.MembershipManagerError, MatrixRTCSessionEvent.MembershipManagerError,

View File

@@ -266,7 +266,7 @@ export const InCallView: FC<InCallViewProps> = ({
const sharingScreen = useBehavior(vm.sharingScreen$); const sharingScreen = useBehavior(vm.sharingScreen$);
const ringOverlay = useBehavior(vm.ringOverlay$); const ringOverlay = useBehavior(vm.ringOverlay$);
const fatalCallError = useBehavior(vm.configError$); const fatalCallError = useBehavior(vm.fatalError$);
// Stop the rendering and throw for the error boundary // Stop the rendering and throw for the error boundary
if (fatalCallError) throw fatalCallError; if (fatalCallError) throw fatalCallError;

View File

@@ -16,7 +16,10 @@ import { BehaviorSubject } from "rxjs";
* distinction between Behaviors and Observables, see * distinction between Behaviors and Observables, see
* https://monoid.dk/post/behaviors-and-streams-why-both/. * https://monoid.dk/post/behaviors-and-streams-why-both/.
*/ */
export type Behavior<T> = Omit<BehaviorSubject<T>, "next" | "observers">; export type Behavior<T> = Omit<
BehaviorSubject<T>,
"next" | "observers" | "error"
>;
/** /**
* Creates a Behavior which never changes in value. * Creates a Behavior which never changes in value.

View File

@@ -6,8 +6,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details. Please see LICENSE in the repository root for full details.
*/ */
import { test, vi, onTestFinished, it, describe, expect } from "vitest"; import { test, vi, onTestFinished, it, describe } from "vitest";
import EventEmitter from "events";
import { import {
BehaviorSubject, BehaviorSubject,
combineLatest, combineLatest,
@@ -19,12 +18,11 @@ import {
of, of,
switchMap, switchMap,
} from "rxjs"; } from "rxjs";
import { SyncState, type MatrixClient } from "matrix-js-sdk"; import { SyncState } from "matrix-js-sdk";
import { import {
ConnectionState, ConnectionState,
type LocalTrackPublication, type LocalTrackPublication,
type RemoteParticipant, type RemoteParticipant,
type Room as LivekitRoom,
} from "livekit-client"; } from "livekit-client";
import * as ComponentsCore from "@livekit/components-core"; import * as ComponentsCore from "@livekit/components-core";
import { import {
@@ -36,27 +34,18 @@ import {
type LivekitTransport, type LivekitTransport,
} from "matrix-js-sdk/lib/matrixrtc"; } from "matrix-js-sdk/lib/matrixrtc";
import { deepCompare } from "matrix-js-sdk/lib/utils"; import { deepCompare } from "matrix-js-sdk/lib/utils";
import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery";
import { createCallViewModel$ } from "./CallViewModel";
import { type Layout } from "../layout-types.ts"; import { type Layout } from "../layout-types.ts";
import { import {
mockLocalParticipant, mockLocalParticipant,
mockMatrixRoom,
mockMatrixRoomMember, mockMatrixRoomMember,
mockRemoteParticipant, mockRemoteParticipant,
withTestScheduler, withTestScheduler,
mockRtcMembership, mockRtcMembership,
MockRTCSession,
mockMediaDevices,
mockMuteStates,
mockConfig,
testScope, testScope,
mockLivekitRoom,
exampleTransport, exampleTransport,
} from "../../utils/test.ts"; } from "../../utils/test.ts";
import { E2eeType } from "../../e2ee/e2eeType.ts"; import { E2eeType } from "../../e2ee/e2eeType.ts";
import type { RaisedHandInfo, ReactionInfo } from "../../reactions/index.ts";
import { import {
aliceId, aliceId,
aliceParticipant, aliceParticipant,
@@ -71,10 +60,6 @@ import {
import { MediaDevices } from "../MediaDevices.ts"; import { MediaDevices } from "../MediaDevices.ts";
import { getValue } from "../../utils/observable.ts"; import { getValue } from "../../utils/observable.ts";
import { type Behavior, constant } from "../Behavior.ts"; import { type Behavior, constant } from "../Behavior.ts";
import {
type ElementCallError,
MatrixRTCTransportMissingError,
} from "../../utils/errors.ts";
import { withCallViewModel } from "./CallViewModelTestUtils.ts"; import { withCallViewModel } from "./CallViewModelTestUtils.ts";
vi.mock("rxjs", async (importOriginal) => ({ vi.mock("rxjs", async (importOriginal) => ({
@@ -245,71 +230,6 @@ function mockRingEvent(
const mockLegacyRingEvent = {} as { event_id: string } & ICallNotifyContent; const mockLegacyRingEvent = {} as { event_id: string } & ICallNotifyContent;
describe("CallViewModel", () => { describe("CallViewModel", () => {
// TODO: Restore this test. It requires makeTransport to not be mocked, unlike
// the rest of the tests in this file… what do we do?
it.skip("test missing RTC config error", async () => {
const rtcMemberships$ = new BehaviorSubject<CallMembership[]>([]);
const emitter = new EventEmitter();
const client = vi.mocked<MatrixClient>({
on: emitter.on.bind(emitter),
off: emitter.off.bind(emitter),
getSyncState: vi.fn().mockReturnValue(SyncState.Syncing),
getUserId: vi.fn().mockReturnValue("@user:localhost"),
getUser: vi.fn().mockReturnValue(null),
getDeviceId: vi.fn().mockReturnValue("DEVICE"),
credentials: {
userId: "@user:localhost",
},
getCrypto: vi.fn().mockReturnValue(undefined),
getDomain: vi.fn().mockReturnValue("example.org"),
} as unknown as MatrixClient);
const matrixRoom = mockMatrixRoom({
roomId: "!myRoomId:example.com",
client,
getMember: vi.fn().mockReturnValue(undefined),
});
const fakeRtcSession = new MockRTCSession(matrixRoom).withMemberships(
rtcMemberships$,
);
mockConfig({});
vi.spyOn(AutoDiscovery, "getRawClientConfig").mockResolvedValue({});
const callVM = createCallViewModel$(
testScope(),
fakeRtcSession.asMockedSession(),
matrixRoom,
mockMediaDevices({}),
mockMuteStates(),
{
encryptionSystem: { kind: E2eeType.PER_PARTICIPANT },
autoLeaveWhenOthersLeft: false,
livekitRoomFactory: (): LivekitRoom =>
mockLivekitRoom({
localParticipant,
disconnect: async () => Promise.resolve(),
setE2EEEnabled: async () => Promise.resolve(),
}),
},
new BehaviorSubject({} as Record<string, RaisedHandInfo>),
new BehaviorSubject({} as Record<string, ReactionInfo>),
constant({ processor: undefined, supported: false }),
);
const failPromise = Promise.withResolvers<ElementCallError>();
callVM.configError$.subscribe((error) => {
if (error) {
failPromise.resolve(error);
}
});
const error = await failPromise.promise;
expect(error).toBeInstanceOf(MatrixRTCTransportMissingError);
});
test("participants are retained during a focus switch", () => { test("participants are retained during a focus switch", () => {
withTestScheduler(({ behavior, expectObservable }) => { withTestScheduler(({ behavior, expectObservable }) => {
// Participants disappear on frame 2 and come back on frame 3 // Participants disappear on frame 2 and come back on frame 3

View File

@@ -41,7 +41,10 @@ import {
timer, timer,
} from "rxjs"; } from "rxjs";
import { logger as rootLogger } from "matrix-js-sdk/lib/logger"; import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
import { type MatrixRTCSession } from "matrix-js-sdk/lib/matrixrtc"; import {
type LivekitTransport,
type MatrixRTCSession,
} from "matrix-js-sdk/lib/matrixrtc";
import { type IWidgetApiRequest } from "matrix-widget-api"; import { type IWidgetApiRequest } from "matrix-widget-api";
import { import {
@@ -95,7 +98,10 @@ import {
import { type ElementCallError } from "../../utils/errors.ts"; import { type ElementCallError } from "../../utils/errors.ts";
import { type ObservableScope } from "../ObservableScope.ts"; import { type ObservableScope } from "../ObservableScope.ts";
import { import {
createHomeserverConnected$,
createLocalMembership$, createLocalMembership$,
enterRTCSession,
LivekitState,
type LocalMemberConnectionState, type LocalMemberConnectionState,
} from "./localMember/LocalMembership.ts"; } from "./localMember/LocalMembership.ts";
import { createLocalTransport$ } from "./localMember/LocalTransport.ts"; import { createLocalTransport$ } from "./localMember/LocalTransport.ts";
@@ -120,6 +126,8 @@ import {
createMatrixMemberMetadata$, createMatrixMemberMetadata$,
createRoomMembers$, createRoomMembers$,
} from "./remoteMembers/MatrixMemberMetadata.ts"; } from "./remoteMembers/MatrixMemberMetadata.ts";
import { Publisher } from "./localMember/Publisher.ts";
import { type Connection } from "./remoteMembers/Connection.ts";
const logger = rootLogger.getChild("[CallViewModel]"); const logger = rootLogger.getChild("[CallViewModel]");
//TODO //TODO
@@ -230,7 +238,7 @@ export interface CallViewModel {
* This is a fatal error that prevents the call from being created/joined. * This is a fatal error that prevents the call from being created/joined.
* Should render a blocking error screen. * Should render a blocking error screen.
*/ */
configError$: Behavior<ElementCallError | null>; fatalError$: Behavior<ElementCallError | null>;
// participants and counts // participants and counts
/** /**
@@ -446,15 +454,31 @@ export function createCallViewModel$(
const localMembership = createLocalMembership$({ const localMembership = createLocalMembership$({
scope: scope, scope: scope,
homeserverConnected$: createHomeserverConnected$(
scope,
matrixRoom,
matrixRTCSession,
),
muteStates: muteStates, muteStates: muteStates,
mediaDevices: mediaDevices, joinMatrixRTC: async (transport: LivekitTransport) => {
return enterRTCSession(
matrixRTCSession,
transport,
connectOptions$.value,
);
},
createPublisherFactory: (connection: Connection) => {
return new Publisher(
scope,
connection,
mediaDevices,
muteStates,
trackProcessorState$,
);
},
connectionManager: connectionManager, connectionManager: connectionManager,
matrixRTCSession: matrixRTCSession, matrixRTCSession: matrixRTCSession,
matrixRoom: matrixRoom,
localTransport$: localTransport$, localTransport$: localTransport$,
trackProcessorState$: trackProcessorState$,
widget,
options: connectOptions$,
logger: logger.getChild(`[${Date.now()}]`), logger: logger.getChild(`[${Date.now()}]`),
}); });
@@ -1442,7 +1466,14 @@ export function createCallViewModel$(
hoverScreen: (): void => screenHover$.next(), hoverScreen: (): void => screenHover$.next(),
unhoverScreen: (): void => screenUnhover$.next(), unhoverScreen: (): void => screenUnhover$.next(),
configError$: localMembership.configError$, fatalError$: scope.behavior(
localMembership.connectionState.livekit$.pipe(
filter((v) => v.state === LivekitState.Error),
map((s) => s.error),
),
null,
),
participantCount$: participantCount$, participantCount$: participantCount$,
audioParticipants$: audioParticipants$, audioParticipants$: audioParticipants$,

View File

@@ -6,154 +6,234 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details. Please see LICENSE in the repository root for full details.
*/ */
import { type MatrixRTCSession } from "matrix-js-sdk/lib/matrixrtc"; import {
import { expect, test, vi } from "vitest"; type LivekitTransport,
type MatrixRTCSession,
} from "matrix-js-sdk/lib/matrixrtc";
import { describe, expect, it, vi } from "vitest";
import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery"; import { AutoDiscovery } from "matrix-js-sdk/lib/autodiscovery";
import EventEmitter from "events"; import { map } from "rxjs";
import { logger } from "matrix-js-sdk/lib/logger";
import { MatrixRTCMode } from "../../../settings/settings"; import { MatrixRTCMode } from "../../../settings/settings";
import { mockConfig } from "../../../utils/test"; import {
import { enterRTCSession } from "./LocalMembership"; mockConfig,
mockMuteStates,
withTestScheduler,
} from "../../../utils/test";
import {
createLocalMembership$,
enterRTCSession,
LivekitState,
} from "./LocalMembership";
import { MatrixRTCTransportMissingError } from "../../../utils/errors";
import { Epoch } from "../../ObservableScope";
import { constant } from "../../Behavior";
import { ConnectionManagerData } from "../remoteMembers/ConnectionManager";
import { type Publisher } from "./Publisher";
const MATRIX_RTC_MODE = MatrixRTCMode.Legacy; const MATRIX_RTC_MODE = MatrixRTCMode.Legacy;
const getUrlParams = vi.hoisted(() => vi.fn(() => ({}))); const getUrlParams = vi.hoisted(() => vi.fn(() => ({})));
vi.mock("../../../UrlParams", () => ({ getUrlParams })); vi.mock("../../../UrlParams", () => ({ getUrlParams }));
vi.mock("../../../widget", async (importOriginal) => ({ // vi.mock("../../../widget", async (importOriginal) => ({
...(await importOriginal()), // ...(await importOriginal()),
widget: { // widget: {
api: { // api: {
setAlwaysOnScreen: (): void => {}, // setAlwaysOnScreen: (): void => {},
transport: { send: vi.fn(), reply: vi.fn(), stop: vi.fn() }, // transport: { send: vi.fn(), reply: vi.fn(), stop: vi.fn() },
}, // },
lazyActions: new EventEmitter(), // lazyActions: new EventEmitter(),
}, // },
})); // }));
describe("LocalMembership", () => {
test("It joins the correct Session", async () => { describe("enterRTCSession", () => {
const focusFromOlderMembership = { it("It joins the correct Session", async () => {
type: "livekit", const focusFromOlderMembership = {
livekit_service_url: "http://my-oldest-member-service-url.com",
livekit_alias: "my-oldest-member-service-alias",
};
const focusConfigFromWellKnown = {
type: "livekit",
livekit_service_url: "http://my-well-known-service-url.com",
};
const focusConfigFromWellKnown2 = {
type: "livekit",
livekit_service_url: "http://my-well-known-service-url2.com",
};
const clientWellKnown = {
"org.matrix.msc4143.rtc_foci": [
focusConfigFromWellKnown,
focusConfigFromWellKnown2,
],
};
mockConfig({
livekit: { livekit_service_url: "http://my-default-service-url.com" },
});
vi.spyOn(AutoDiscovery, "getRawClientConfig").mockImplementation(
async (domain) => {
if (domain === "example.org") {
return Promise.resolve(clientWellKnown);
}
return Promise.resolve({});
},
);
const mockedSession = vi.mocked({
room: {
roomId: "roomId",
client: {
getDomain: vi.fn().mockReturnValue("example.org"),
getOpenIdToken: vi.fn().mockResolvedValue({
access_token: "ACCCESS_TOKEN",
token_type: "Bearer",
matrix_server_name: "localhost",
expires_in: 10000,
}),
},
},
memberships: [],
getFocusInUse: vi.fn().mockReturnValue(focusFromOlderMembership),
getOldestMembership: vi.fn().mockReturnValue({
getPreferredFoci: vi.fn().mockReturnValue([focusFromOlderMembership]),
}),
joinRoomSession: vi.fn(),
}) as unknown as MatrixRTCSession;
await enterRTCSession(
mockedSession,
{
livekit_alias: "roomId",
livekit_service_url: "http://my-well-known-service-url.com",
type: "livekit",
},
{
encryptMedia: true,
matrixRTCMode: MATRIX_RTC_MODE,
},
);
expect(mockedSession.joinRoomSession).toHaveBeenLastCalledWith(
[
{
livekit_alias: "roomId",
livekit_service_url: "http://my-well-known-service-url.com",
type: "livekit", type: "livekit",
}, livekit_service_url: "http://my-oldest-member-service-url.com",
], livekit_alias: "my-oldest-member-service-alias",
undefined, };
expect.objectContaining({
manageMediaKeys: true,
useLegacyMemberEvents: false,
}),
);
});
test("It should not fail with configuration error if homeserver config has livekit url but not fallback", async () => { const focusConfigFromWellKnown = {
mockConfig({});
vi.spyOn(AutoDiscovery, "getRawClientConfig").mockResolvedValue({
"org.matrix.msc4143.rtc_foci": [
{
type: "livekit", type: "livekit",
livekit_service_url: "http://my-well-known-service-url.com", livekit_service_url: "http://my-well-known-service-url.com",
}, };
], const focusConfigFromWellKnown2 = {
type: "livekit",
livekit_service_url: "http://my-well-known-service-url2.com",
};
const clientWellKnown = {
"org.matrix.msc4143.rtc_foci": [
focusConfigFromWellKnown,
focusConfigFromWellKnown2,
],
};
mockConfig({
livekit: { livekit_service_url: "http://my-default-service-url.com" },
});
vi.spyOn(AutoDiscovery, "getRawClientConfig").mockImplementation(
async (domain) => {
if (domain === "example.org") {
return Promise.resolve(clientWellKnown);
}
return Promise.resolve({});
},
);
const mockedSession = vi.mocked({
room: {
roomId: "roomId",
client: {
getDomain: vi.fn().mockReturnValue("example.org"),
getOpenIdToken: vi.fn().mockResolvedValue({
access_token: "ACCCESS_TOKEN",
token_type: "Bearer",
matrix_server_name: "localhost",
expires_in: 10000,
}),
},
},
memberships: [],
getFocusInUse: vi.fn().mockReturnValue(focusFromOlderMembership),
getOldestMembership: vi.fn().mockReturnValue({
getPreferredFoci: vi.fn().mockReturnValue([focusFromOlderMembership]),
}),
joinRoomSession: vi.fn(),
}) as unknown as MatrixRTCSession;
await enterRTCSession(
mockedSession,
{
livekit_alias: "roomId",
livekit_service_url: "http://my-well-known-service-url.com",
type: "livekit",
},
{
encryptMedia: true,
matrixRTCMode: MATRIX_RTC_MODE,
},
);
expect(mockedSession.joinRoomSession).toHaveBeenLastCalledWith(
[
{
livekit_alias: "roomId",
livekit_service_url: "http://my-well-known-service-url.com",
type: "livekit",
},
],
undefined,
expect.objectContaining({
manageMediaKeys: true,
useLegacyMemberEvents: false,
}),
);
});
it("It should not fail with configuration error if homeserver config has livekit url but not fallback", async () => {
mockConfig({});
vi.spyOn(AutoDiscovery, "getRawClientConfig").mockResolvedValue({
"org.matrix.msc4143.rtc_foci": [
{
type: "livekit",
livekit_service_url: "http://my-well-known-service-url.com",
},
],
});
const mockedSession = vi.mocked({
room: {
roomId: "roomId",
client: {
getDomain: vi.fn().mockReturnValue("example.org"),
getOpenIdToken: vi.fn().mockResolvedValue({
access_token: "ACCCESS_TOKEN",
token_type: "Bearer",
matrix_server_name: "localhost",
expires_in: 10000,
}),
},
},
memberships: [],
getFocusInUse: vi.fn(),
joinRoomSession: vi.fn(),
}) as unknown as MatrixRTCSession;
await enterRTCSession(
mockedSession,
{
livekit_alias: "roomId",
livekit_service_url: "http://my-well-known-service-url.com",
type: "livekit",
},
{
encryptMedia: true,
matrixRTCMode: MATRIX_RTC_MODE,
},
);
});
}); });
const mockedSession = vi.mocked({ const defaultCreateLocalMemberValues = {
room: { options: constant({
roomId: "roomId", encryptMedia: false,
client: { matrixRTCMode: MatrixRTCMode.Matrix_2_0,
getDomain: vi.fn().mockReturnValue("example.org"), }),
getOpenIdToken: vi.fn().mockResolvedValue({ matrixRTCSession: {
access_token: "ACCCESS_TOKEN", updateCallIntent: () => {},
token_type: "Bearer", leaveRoomSession: () => {},
matrix_server_name: "localhost", } as unknown as MatrixRTCSession,
expires_in: 10000, muteStates: mockMuteStates(),
}), isHomeserverConnected: constant(true),
}, trackProcessorState$: constant({
}, supported: false,
memberships: [], processor: undefined,
getFocusInUse: vi.fn(), }),
joinRoomSession: vi.fn(), logger: logger,
}) as unknown as MatrixRTCSession; createPublisherFactory: (): Publisher => ({}) as unknown as Publisher,
joinMatrixRTC: async (): Promise<void> => {},
homeserverConnected$: constant(true),
};
await enterRTCSession( it("throws error on missing RTC config error", () => {
mockedSession, withTestScheduler(({ scope, hot, expectObservable }) => {
{ const goodTransport = {
livekit_alias: "roomId", livekit_service_url: "other",
livekit_service_url: "http://my-well-known-service-url.com", } as LivekitTransport;
type: "livekit",
}, const localTransport$ = scope.behavior<LivekitTransport>(
{ hot("1ms #", {}, new MatrixRTCTransportMissingError("domain.com")),
encryptMedia: true, goodTransport,
matrixRTCMode: MATRIX_RTC_MODE, );
},
); const mockConnectionManager = {
transports$: scope.behavior(
localTransport$.pipe(map((t) => new Epoch([t]))),
),
connectionManagerData$: constant(
new Epoch(new ConnectionManagerData()),
),
};
const localMembership = createLocalMembership$({
scope,
...defaultCreateLocalMemberValues,
connectionManager: mockConnectionManager,
localTransport$,
});
expectObservable(localMembership.connectionState.livekit$).toBe("ne", {
n: { state: LivekitState.Uninitialized },
e: {
state: LivekitState.Error,
error: expect.toSatisfy(
(e) => e instanceof MatrixRTCTransportMissingError,
),
},
});
});
});
}); });

View File

@@ -13,14 +13,14 @@ import {
} from "livekit-client"; } from "livekit-client";
import { observeParticipantEvents } from "@livekit/components-core"; import { observeParticipantEvents } from "@livekit/components-core";
import { import {
type LivekitTransport,
type MatrixRTCSession,
MembershipManagerEvent, MembershipManagerEvent,
Status, Status,
type LivekitTransport,
type MatrixRTCSession,
} from "matrix-js-sdk/lib/matrixrtc"; } from "matrix-js-sdk/lib/matrixrtc";
import { ClientEvent, SyncState, type Room as MatrixRoom } from "matrix-js-sdk";
import { import {
BehaviorSubject, BehaviorSubject,
catchError,
combineLatest, combineLatest,
distinctUntilChanged, distinctUntilChanged,
fromEvent, fromEvent,
@@ -32,23 +32,17 @@ import {
switchMap, switchMap,
tap, tap,
} from "rxjs"; } from "rxjs";
import { type Logger } from "matrix-js-sdk/lib/logger"; import { logger, type Logger } from "matrix-js-sdk/lib/logger";
import { ClientEvent, type Room, SyncState } from "matrix-js-sdk";
import { type Behavior } from "../../Behavior"; import { type Behavior } from "../../Behavior";
import { type IConnectionManager } from "../remoteMembers/ConnectionManager"; import { type IConnectionManager } from "../remoteMembers/ConnectionManager";
import { ObservableScope } from "../../ObservableScope"; import { ObservableScope } from "../../ObservableScope";
import { Publisher } from "./Publisher"; import { type Publisher } from "./Publisher";
import { type MuteStates } from "../../MuteStates"; import { type MuteStates } from "../../MuteStates";
import { type ProcessorState } from "../../../livekit/TrackProcessorContext";
import { type MediaDevices } from "../../MediaDevices";
import { and$ } from "../../../utils/observable"; import { and$ } from "../../../utils/observable";
import { ElementCallError, UnknownCallError } from "../../../utils/errors"; import { ElementCallError, UnknownCallError } from "../../../utils/errors";
import { import { ElementWidgetActions, widget } from "../../../widget";
ElementWidgetActions,
widget,
type WidgetHelpers,
} from "../../../widget";
import { areLivekitTransportsEqual } from "../remoteMembers/MatrixLivekitMembers";
import { getUrlParams } from "../../../UrlParams.ts"; import { getUrlParams } from "../../../UrlParams.ts";
import { PosthogAnalytics } from "../../../analytics/PosthogAnalytics.ts"; import { PosthogAnalytics } from "../../../analytics/PosthogAnalytics.ts";
import { MatrixRTCMode } from "../../../settings/settings.ts"; import { MatrixRTCMode } from "../../../settings/settings.ts";
@@ -68,7 +62,7 @@ export enum LivekitState {
} }
type LocalMemberLivekitState = type LocalMemberLivekitState =
| { state: LivekitState.Error; error: string } | { state: LivekitState.Error; error: ElementCallError }
| { state: LivekitState.Connected } | { state: LivekitState.Connected }
| { state: LivekitState.Connecting } | { state: LivekitState.Connecting }
| { state: LivekitState.Uninitialized } | { state: LivekitState.Uninitialized }
@@ -79,12 +73,14 @@ export enum MatrixState {
Connected = "connected", Connected = "connected",
Disconnected = "disconnected", Disconnected = "disconnected",
Connecting = "connecting", Connecting = "connecting",
Error = "Error",
} }
type LocalMemberMatrixState = type LocalMemberMatrixState =
| { state: MatrixState.Connected } | { state: MatrixState.Connected }
| { state: MatrixState.Connecting } | { state: MatrixState.Connecting }
| { state: MatrixState.Disconnected }; | { state: MatrixState.Disconnected }
| { state: MatrixState.Error; error: Error };
export interface LocalMemberConnectionState { export interface LocalMemberConnectionState {
livekit$: Behavior<LocalMemberLivekitState>; livekit$: Behavior<LocalMemberLivekitState>;
@@ -102,17 +98,21 @@ export interface LocalMemberConnectionState {
* - Publisher.publishTracks() * - Publisher.publishTracks()
* - send join state/sticky event * - send join state/sticky event
*/ */
interface Props { interface Props {
options: Behavior<EnterRTCSessionOptions>; // TODO add a comment into some code style readme or file header callviewmodel
// that the inputs for those createSomething$() functions should NOT contain any js-sdk objectes
scope: ObservableScope; scope: ObservableScope;
mediaDevices: MediaDevices;
muteStates: MuteStates; muteStates: MuteStates;
connectionManager: IConnectionManager; connectionManager: IConnectionManager;
matrixRTCSession: MatrixRTCSession; createPublisherFactory: (connection: Connection) => Publisher;
matrixRoom: MatrixRoom; joinMatrixRTC: (trasnport: LivekitTransport) => Promise<void>;
homeserverConnected$: Behavior<boolean>;
localTransport$: Behavior<LivekitTransport | null>; localTransport$: Behavior<LivekitTransport | null>;
trackProcessorState$: Behavior<ProcessorState>; matrixRTCSession: Pick<
widget: WidgetHelpers | null; MatrixRTCSession,
"updateCallIntent" | "leaveRoomSession"
>;
logger: Logger; logger: Logger;
} }
@@ -131,18 +131,15 @@ interface Props {
*/ */
export const createLocalMembership$ = ({ export const createLocalMembership$ = ({
scope, scope,
options,
muteStates,
mediaDevices,
connectionManager, connectionManager,
matrixRTCSession, localTransport$: localTransportCanThrow$,
localTransport$, homeserverConnected$,
matrixRoom, createPublisherFactory,
trackProcessorState$, joinMatrixRTC,
widget,
logger: parentLogger, logger: parentLogger,
muteStates,
matrixRTCSession,
}: Props): { }: Props): {
// publisher: Publisher
requestConnect: () => LocalMemberConnectionState; requestConnect: () => LocalMemberConnectionState;
startTracks: () => Behavior<LocalTrack[]>; startTracks: () => Behavior<LocalTrack[]>;
requestDisconnect: () => Observable<LocalMemberLivekitState> | null; requestDisconnect: () => Observable<LocalMemberLivekitState> | null;
@@ -154,17 +151,13 @@ export const createLocalMembership$ = ({
toggleScreenSharing: (() => void) | null; toggleScreenSharing: (() => void) | null;
participant$: Behavior<LocalParticipant | null>; participant$: Behavior<LocalParticipant | null>;
connection$: Behavior<Connection | null>; connection$: Behavior<Connection | null>;
// deprecated fields
/** @deprecated use state instead*/
homeserverConnected$: Behavior<boolean>; homeserverConnected$: Behavior<boolean>;
// deprecated fields
/** @deprecated use state instead*/ /** @deprecated use state instead*/
connected$: Behavior<boolean>; connected$: Behavior<boolean>;
// this needs to be discussed // this needs to be discussed
/** @deprecated use state instead*/ /** @deprecated use state instead*/
reconnecting$: Behavior<boolean>; reconnecting$: Behavior<boolean>;
// also needs to be disccues
/** @deprecated use state instead*/
configError$: Behavior<ElementCallError | null>;
} => { } => {
const logger = parentLogger.getChild("[LocalMembership]"); const logger = parentLogger.getChild("[LocalMembership]");
logger.debug(`Creating local membership..`); logger.debug(`Creating local membership..`);
@@ -188,18 +181,32 @@ export const createLocalMembership$ = ({
// This should be used in a combineLatest with publisher$ to connect. // This should be used in a combineLatest with publisher$ to connect.
const tracks$ = new BehaviorSubject<LocalTrack[]>([]); const tracks$ = new BehaviorSubject<LocalTrack[]>([]);
// unwrap the local transport and set the state of the LocalMembership to error in case the transport is an error.
const localTransport$ = scope.behavior(
localTransportCanThrow$.pipe(
catchError((e: unknown) => {
if (e instanceof ElementCallError) {
state.livekit$.next({ state: LivekitState.Error, error: e });
} else {
logger.error("Unknown error from localTransport$", e);
}
return of(null);
}),
),
);
// Drop Epoch data here since we will not combine this anymore // Drop Epoch data here since we will not combine this anymore
const localConnection$ = scope.behavior( const localConnection$ = scope.behavior(
combineLatest([connectionManager.connections$, localTransport$]).pipe( combineLatest([
map(([connections, localTransport]) => { connectionManager.connectionManagerData$,
localTransport$,
]).pipe(
map(([connectionData, localTransport]) => {
if (localTransport === null) { if (localTransport === null) {
return null; return null;
} }
return (
connections.value.find((connection) => return connectionData.value.getConnectionForTransport(localTransport);
areLivekitTransportsEqual(connection.transport, localTransport),
) ?? null
);
}), }),
tap((connection) => { tap((connection) => {
logger.info( logger.info(
@@ -208,40 +215,6 @@ export const createLocalMembership$ = ({
}), }),
), ),
); );
/**
* Whether we are connected to the MatrixRTC session.
*/
const homeserverConnected$ = scope.behavior(
// To consider ourselves connected to MatrixRTC, we check the following:
and$(
// The client is connected to the sync loop
(
fromEvent(matrixRoom.client, ClientEvent.Sync) as Observable<
[SyncState]
>
).pipe(
startWith([matrixRoom.client.getSyncState()]),
map(([state]) => state === SyncState.Syncing),
),
// Room state observed by session says we're connected
fromEvent(matrixRTCSession, MembershipManagerEvent.StatusChanged).pipe(
startWith(null),
map(() => matrixRTCSession.membershipStatus === Status.Connected),
),
// Also watch out for warnings that we've likely hit a timeout and our
// delayed leave event is being sent (this condition is here because it
// provides an earlier warning than the sync loop timeout, and we wouldn't
// see the actual leave event until we reconnect to the sync loop)
fromEvent(matrixRTCSession, MembershipManagerEvent.ProbablyLeft).pipe(
startWith(null),
map(() => matrixRTCSession.probablyLeft !== true),
),
).pipe(
tap((connected) => {
logger.info(`Homeserver connected update: ${connected}`);
}),
),
);
// /** // /**
// * Whether we are "fully" connected to the call. Accounts for both the // * Whether we are "fully" connected to the call. Accounts for both the
@@ -265,18 +238,15 @@ export const createLocalMembership$ = ({
localConnection$.pipe(scope.bind()).subscribe((connection) => { localConnection$.pipe(scope.bind()).subscribe((connection) => {
if (connection !== null && publisher$.value === null) { if (connection !== null && publisher$.value === null) {
// TODO looks strange to not change publisher if connection changes. // TODO looks strange to not change publisher if connection changes.
publisher$.next( // @valere will take care of this!
new Publisher( publisher$.next(createPublisherFactory(connection));
scope,
connection,
mediaDevices,
muteStates,
trackProcessorState$,
),
);
} }
}); });
// const mutestate= publisher$.pipe(switchMap((publisher) => {
// return publisher.muteState$
// });
combineLatest([publisher$, trackStartRequested$]).subscribe( combineLatest([publisher$, trackStartRequested$]).subscribe(
([publisher, shouldStartTracks]) => { ([publisher, shouldStartTracks]) => {
if (publisher && shouldStartTracks) { if (publisher && shouldStartTracks) {
@@ -359,14 +329,21 @@ export const createLocalMembership$ = ({
} }
state.matrix$.next({ state: MatrixState.Connecting }); state.matrix$.next({ state: MatrixState.Connecting });
logger.info("Matrix State connecting"); logger.info("Matrix State connecting");
enterRTCSession(matrixRTCSession, transport, options.value).catch(
(error) => { joinMatrixRTC(transport).catch((error) => {
logger.error(error); logger.error(error);
}, state.matrix$.next({ state: MatrixState.Error, error });
); });
}, },
); );
// TODO add this and update `state.matrix$` based on it.
// useTypedEventEmitter(
// rtcSession,
// MatrixRTCSessionEvent.MembershipManagerError,
// (error) => setExternalError(new ConnectionLostError()),
// );
const requestConnect = (): LocalMemberConnectionState => { const requestConnect = (): LocalMemberConnectionState => {
trackStartRequested$.next(true); trackStartRequested$.next(true);
connectRequested$.next(true); connectRequested$.next(true);
@@ -440,18 +417,25 @@ export const createLocalMembership$ = ({
} }
} }
}); });
// TODO: Refactor updateCallIntent to sth like this:
// combineLatest([muteStates.video.enabled$,localTransport$, state.matrix$]).pipe(map(()=>{
// matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"),
// }))
//
const configError$ = new BehaviorSubject<ElementCallError | null>(null);
// TODO I do not fully understand what this does. // TODO I do not fully understand what this does.
// Is it needed? // Is it needed?
// Is this at the right place? // Is this at the right place?
// Can this be simplified? // Can this be simplified?
// Start and stop session membership as needed // Start and stop session membership as needed
scope.reconcile(localTransport$, async (advertised) => { // Discussed in statndup -> It seems we can remove this (there is another call to enterRTCSession in this file)
if (advertised !== null && advertised !== undefined) { // MAKE SURE TO UNDERSTAND why reconcile is needed and what is potentially missing from the alternative enterRTCSession block.
// @toger5 will try to take care of this.
scope.reconcile(localTransport$, async (transport) => {
if (transport !== null && transport !== undefined) {
try { try {
await enterRTCSession(matrixRTCSession, advertised, options.value); state.matrix$.next({ state: MatrixState.Connecting });
configError$.next(null); await joinMatrixRTC(transport);
} catch (e) { } catch (e) {
logger.error("Error entering RTC session", e); logger.error("Error entering RTC session", e);
} }
@@ -493,14 +477,13 @@ export const createLocalMembership$ = ({
return s.error instanceof ElementCallError return s.error instanceof ElementCallError
? s.error ? s.error
: new UnknownCallError(s.error); : new UnknownCallError(s.error);
} else {
return null;
} }
}), }),
scope.bind(), scope.bind(),
) )
.subscribe((fatalError) => { .subscribe((error) => {
configError$.next(fatalError); if (error !== undefined)
state.livekit$.next({ state: LivekitState.Error, error });
}); });
/** /**
@@ -509,9 +492,9 @@ export const createLocalMembership$ = ({
const sharingScreen$ = scope.behavior( const sharingScreen$ = scope.behavior(
localConnection$.pipe( localConnection$.pipe(
switchMap((c) => switchMap((c) =>
c === null c !== null && c.livekitRoom
? of(false) ? observeSharingScreen$(c.livekitRoom.localParticipant)
: observeSharingScreen$(c.livekitRoom.localParticipant), : of(false),
), ),
), ),
); );
@@ -539,7 +522,7 @@ export const createLocalMembership$ = ({
: null; : null;
const participant$ = scope.behavior( const participant$ = scope.behavior(
localConnection$.pipe(map((c) => c?.livekitRoom.localParticipant ?? null)), localConnection$.pipe(map((c) => c?.livekitRoom?.localParticipant ?? null)),
); );
return { return {
startTracks, startTracks,
@@ -549,7 +532,7 @@ export const createLocalMembership$ = ({
homeserverConnected$, homeserverConnected$,
connected$, connected$,
reconnecting$, reconnecting$,
configError$,
sharingScreen$, sharingScreen$,
toggleScreenSharing, toggleScreenSharing,
participant$, participant$,
@@ -603,6 +586,7 @@ export async function enterRTCSession(
const { sendNotificationType: notificationType, callIntent } = getUrlParams(); const { sendNotificationType: notificationType, callIntent } = getUrlParams();
const multiSFU = matrixRTCMode !== MatrixRTCMode.Legacy; const multiSFU = matrixRTCMode !== MatrixRTCMode.Legacy;
// Multi-sfu does not need a preferred foci list. just the focus that is actually used. // Multi-sfu does not need a preferred foci list. just the focus that is actually used.
// TODO where/how do we track errors originating from the ongoing rtcSession?
rtcSession.joinRoomSession( rtcSession.joinRoomSession(
multiSFU ? [] : [transport], multiSFU ? [] : [transport],
multiSFU ? transport : undefined, multiSFU ? transport : undefined,
@@ -631,3 +615,44 @@ export async function enterRTCSession(
await widget.api.transport.send(ElementWidgetActions.JoinCall, {}); await widget.api.transport.send(ElementWidgetActions.JoinCall, {});
} }
} }
/**
* Whether we are connected to the MatrixRTC session.
*/
export function createHomeserverConnected$(
scope: ObservableScope,
matrixRoom: Room,
matrixRTCSession: MatrixRTCSession,
): Behavior<boolean> {
return scope.behavior(
// To consider ourselves connected to MatrixRTC, we check the following:
and$(
// The client is connected to the sync loop
(
fromEvent(matrixRoom.client, ClientEvent.Sync) as Observable<
[SyncState]
>
).pipe(
startWith([matrixRoom.client.getSyncState()]),
map(([state]) => state === SyncState.Syncing),
),
// Room state observed by session says we're connected
fromEvent(matrixRTCSession, MembershipManagerEvent.StatusChanged).pipe(
startWith(null),
map(() => matrixRTCSession.membershipStatus === Status.Connected),
),
// Also watch out for warnings that we've likely hit a timeout and our
// delayed leave event is being sent (this condition is here because it
// provides an earlier warning than the sync loop timeout, and we wouldn't
// see the actual leave event until we reconnect to the sync loop)
fromEvent(matrixRTCSession, MembershipManagerEvent.ProbablyLeft).pipe(
startWith(null),
map(() => matrixRTCSession.probablyLeft !== true),
),
).pipe(
tap((connected) => {
logger.info(`Homeserver connected update: ${connected}`);
}),
),
);
}

View File

@@ -0,0 +1,120 @@
/*
Copyright 2025 Element Creations Ltd.
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { type CallMembership } from "matrix-js-sdk/lib/matrixrtc";
import { mockConfig, flushPromises } from "../../../utils/test";
import { createLocalTransport$ } from "./LocalTransport";
import { constant } from "../../Behavior";
import { Epoch, ObservableScope } from "../../ObservableScope";
import {
MatrixRTCTransportMissingError,
FailToGetOpenIdToken,
} from "../../../utils/errors";
import * as openIDSFU from "../../../livekit/openIDSFU";
describe("LocalTransport", () => {
let scope: ObservableScope;
beforeEach(() => (scope = new ObservableScope()));
afterEach(() => scope.end());
it("throws if config is missing", async () => {
const localTransport$ = createLocalTransport$({
scope,
roomId: "!room:example.org",
useOldestMember$: constant(false),
memberships$: constant(new Epoch<CallMembership[]>([])),
client: {
getDomain: () => "",
// These won't be called in this error path but satisfy the type
getOpenIdToken: vi.fn(),
getDeviceId: vi.fn(),
},
});
await flushPromises();
expect(() => localTransport$.value).toThrow(
new MatrixRTCTransportMissingError(""),
);
});
it("throws FailToGetOpenIdToken when OpenID fetch fails", async () => {
// Provide a valid config so makeTransportInternal resolves a transport
const scope = new ObservableScope();
mockConfig({
livekit: { livekit_service_url: "https://lk.example.org" },
});
const resolver = Promise.withResolvers<void>();
vi.spyOn(openIDSFU, "getSFUConfigWithOpenID").mockImplementation(
async () => {
await resolver.promise;
throw new FailToGetOpenIdToken(new Error("no openid"));
},
);
const observations: unknown[] = [];
const errors: Error[] = [];
const localTransport$ = createLocalTransport$({
scope,
roomId: "!room:example.org",
useOldestMember$: constant(false),
memberships$: constant(new Epoch<CallMembership[]>([])),
client: {
// Use empty domain to skip .well-known and use config directly
getDomain: () => "",
getOpenIdToken: vi.fn(),
getDeviceId: vi.fn(),
},
});
localTransport$.subscribe(
(o) => observations.push(o),
(e) => errors.push(e),
);
resolver.resolve();
await flushPromises();
const expectedError = new FailToGetOpenIdToken(new Error("no openid"));
expect(observations).toStrictEqual([null]);
expect(errors).toStrictEqual([expectedError]);
expect(() => localTransport$.value).toThrow(expectedError);
});
it("emits preferred transport after OpenID resolves", async () => {
// Use config so transport discovery succeeds, but delay OpenID JWT fetch
mockConfig({
livekit: { livekit_service_url: "https://lk.example.org" },
});
const openIdResolver = Promise.withResolvers<openIDSFU.SFUConfig>();
vi.spyOn(openIDSFU, "getSFUConfigWithOpenID").mockReturnValue(
openIdResolver.promise,
);
const localTransport$ = createLocalTransport$({
scope,
roomId: "!room:example.org",
useOldestMember$: constant(false),
memberships$: constant(new Epoch<CallMembership[]>([])),
client: {
getDomain: () => "",
getOpenIdToken: vi.fn(),
getDeviceId: vi.fn(),
},
});
openIdResolver.resolve?.({ url: "https://lk.example.org", jwt: "jwt" });
expect(localTransport$.value).toBe(null);
await flushPromises();
// final
expect(localTransport$.value).toStrictEqual({
livekit_alias: "!room:example.org",
livekit_service_url: "https://lk.example.org",
type: "livekit",
});
});
});

View File

@@ -21,24 +21,21 @@ import { type Behavior } from "../../Behavior.ts";
import { type Epoch, type ObservableScope } from "../../ObservableScope.ts"; import { type Epoch, type ObservableScope } from "../../ObservableScope.ts";
import { Config } from "../../../config/Config.ts"; import { Config } from "../../../config/Config.ts";
import { MatrixRTCTransportMissingError } from "../../../utils/errors.ts"; import { MatrixRTCTransportMissingError } from "../../../utils/errors.ts";
import { getSFUConfigWithOpenID } from "../../../livekit/openIDSFU.ts"; import {
getSFUConfigWithOpenID,
type OpenIDClientParts,
} from "../../../livekit/openIDSFU.ts";
import { areLivekitTransportsEqual } from "../remoteMembers/MatrixLivekitMembers.ts"; import { areLivekitTransportsEqual } from "../remoteMembers/MatrixLivekitMembers.ts";
/* /*
* - get well known * It figures out “which LiveKit focus URL/alias the local user should use,”
* - get oldest membership * optionally aligning with the oldest member, and ensures the SFU path is primed
* - get transport to use * before advertising that choice.
* - get openId + jwt token
* - wait for createTrack() call
* - create tracks
* - wait for join() call
* - Publisher.publishTracks()
* - send join state/sticky event
*/ */
interface Props { interface Props {
scope: ObservableScope; scope: ObservableScope;
memberships$: Behavior<Epoch<CallMembership[]>>; memberships$: Behavior<Epoch<CallMembership[]>>;
client: MatrixClient; client: Pick<MatrixClient, "getDomain"> & OpenIDClientParts;
roomId: string; roomId: string;
useOldestMember$: Behavior<boolean>; useOldestMember$: Behavior<boolean>;
} }
@@ -49,6 +46,8 @@ interface Props {
* *
* @prop useOldestMember Whether to use the same transport as the oldest member. * @prop useOldestMember Whether to use the same transport as the oldest member.
* This will only update once the first oldest member appears. Will not recompute if the oldest member leaves. * This will only update once the first oldest member appears. Will not recompute if the oldest member leaves.
*
* @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken
*/ */
export const createLocalTransport$ = ({ export const createLocalTransport$ = ({
scope, scope,
@@ -75,6 +74,8 @@ export const createLocalTransport$ = ({
/** /**
* The transport that we would personally prefer to publish on (if not for the * The transport that we would personally prefer to publish on (if not for the
* transport preferences of others, perhaps). * transport preferences of others, perhaps).
*
* @throws
*/ */
const preferredTransport$: Behavior<LivekitTransport | null> = scope.behavior( const preferredTransport$: Behavior<LivekitTransport | null> = scope.behavior(
from(makeTransport(client, roomId)), from(makeTransport(client, roomId)),
@@ -103,10 +104,18 @@ export const createLocalTransport$ = ({
const FOCI_WK_KEY = "org.matrix.msc4143.rtc_foci"; const FOCI_WK_KEY = "org.matrix.msc4143.rtc_foci";
async function makeTransportInternal( /**
client: MatrixClient, *
* @param client
* @param roomId
* @returns
* @throws MatrixRTCTransportMissingError | FailToGetOpenIdToken
*/
async function makeTransport(
client: Pick<MatrixClient, "getDomain"> & OpenIDClientParts,
roomId: string, roomId: string,
): Promise<LivekitTransport> { ): Promise<LivekitTransport> {
let transport: LivekitTransport | undefined;
logger.log("Searching for a preferred transport"); logger.log("Searching for a preferred transport");
//TODO refactor this to use the jwt service returned alias. //TODO refactor this to use the jwt service returned alias.
const livekitAlias = roomId; const livekitAlias = roomId;
@@ -124,7 +133,7 @@ async function makeTransportInternal(
"Using LiveKit transport from local storage: ", "Using LiveKit transport from local storage: ",
transportFromStorage, transportFromStorage,
); );
return transportFromStorage; transport = transportFromStorage;
} }
// Prioritize the .well-known/matrix/client, if available, over the configured SFU // Prioritize the .well-known/matrix/client, if available, over the configured SFU
@@ -136,12 +145,11 @@ async function makeTransportInternal(
FOCI_WK_KEY FOCI_WK_KEY
]; ];
if (Array.isArray(wellKnownFoci)) { if (Array.isArray(wellKnownFoci)) {
const transport: LivekitTransportConfig | undefined = wellKnownFoci.find( const wellKnownTransport: LivekitTransportConfig | undefined =
(f) => f && isLivekitTransportConfig(f), wellKnownFoci.find((f) => f && isLivekitTransportConfig(f));
); if (wellKnownTransport !== undefined) {
if (transport !== undefined) {
logger.log("Using LiveKit transport from .well-known: ", transport); logger.log("Using LiveKit transport from .well-known: ", transport);
return { ...transport, livekit_alias: livekitAlias }; transport = { ...wellKnownTransport, livekit_alias: livekitAlias };
} }
} }
} }
@@ -154,26 +162,15 @@ async function makeTransportInternal(
livekit_alias: livekitAlias, livekit_alias: livekitAlias,
}; };
logger.log("Using LiveKit transport from config: ", transportFromConf); logger.log("Using LiveKit transport from config: ", transportFromConf);
return transportFromConf; transport = transportFromConf;
} }
if (!transport) throw new MatrixRTCTransportMissingError(domain ?? ""); // this will call the jwt/sfu/get endpoint to pre create the livekit room.
throw new MatrixRTCTransportMissingError(domain ?? ""); await getSFUConfigWithOpenID(
} client,
transport.livekit_service_url,
transport.livekit_alias,
);
async function makeTransport(
client: MatrixClient,
roomId: string,
): Promise<LivekitTransport> {
const transport = await makeTransportInternal(client, roomId);
// this will call the jwt/sfu/get endpoint to pre create the livekit room.
try {
await getSFUConfigWithOpenID(
client,
transport.livekit_service_url,
transport.livekit_alias,
);
} catch (e) {
logger.warn(`Failed to get SFU config for transport: ${e}`);
}
return transport; return transport;
} }

View File

@@ -23,6 +23,7 @@ import type { Behavior } from "../../Behavior.ts";
import type { ProcessorState } from "../../../livekit/TrackProcessorContext.tsx"; import type { ProcessorState } from "../../../livekit/TrackProcessorContext.tsx";
import { defaultLiveKitOptions } from "../../../livekit/options.ts"; import { defaultLiveKitOptions } from "../../../livekit/options.ts";
// TODO evaluate if this should be done like the Publisher Factory
export interface ConnectionFactory { export interface ConnectionFactory {
createConnection( createConnection(
transport: LivekitTransport, transport: LivekitTransport,

View File

@@ -11,7 +11,7 @@ import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { type Participant as LivekitParticipant } from "livekit-client"; import { type Participant as LivekitParticipant } from "livekit-client";
import { logger } from "matrix-js-sdk/lib/logger"; import { logger } from "matrix-js-sdk/lib/logger";
import { Epoch, ObservableScope } from "../../ObservableScope.ts"; import { Epoch, mapEpoch, ObservableScope } from "../../ObservableScope.ts";
import { import {
createConnectionManager$, createConnectionManager$,
type ConnectionManagerData, type ConnectionManagerData,
@@ -73,7 +73,7 @@ afterEach(() => {
describe("connections$ stream", () => { describe("connections$ stream", () => {
test("Should create and start new connections for each transports", () => { test("Should create and start new connections for each transports", () => {
withTestScheduler(({ behavior, expectObservable }) => { withTestScheduler(({ behavior, expectObservable }) => {
const { connections$ } = createConnectionManager$({ const { connectionManagerData$ } = createConnectionManager$({
scope: testScope, scope: testScope,
connectionFactory: fakeConnectionFactory, connectionFactory: fakeConnectionFactory,
inputTransports$: behavior("a", { inputTransports$: behavior("a", {
@@ -82,7 +82,9 @@ describe("connections$ stream", () => {
logger: logger, logger: logger,
}); });
expectObservable(connections$).toBe("a", { expectObservable(
connectionManagerData$.pipe(mapEpoch((d) => d.getConnections())),
).toBe("a", {
a: expect.toSatisfy((e: Epoch<Connection[]>) => { a: expect.toSatisfy((e: Epoch<Connection[]>) => {
const connections = e.value; const connections = e.value;
expect(connections.length).toBe(2); expect(connections.length).toBe(2);
@@ -110,7 +112,7 @@ describe("connections$ stream", () => {
test("Should start connection only once", () => { test("Should start connection only once", () => {
withTestScheduler(({ behavior, expectObservable }) => { withTestScheduler(({ behavior, expectObservable }) => {
const { connections$ } = createConnectionManager$({ const { connectionManagerData$ } = createConnectionManager$({
scope: testScope, scope: testScope,
connectionFactory: fakeConnectionFactory, connectionFactory: fakeConnectionFactory,
inputTransports$: behavior("abcdef", { inputTransports$: behavior("abcdef", {
@@ -124,7 +126,9 @@ describe("connections$ stream", () => {
logger: logger, logger: logger,
}); });
expectObservable(connections$).toBe("xxxxxa", { expectObservable(
connectionManagerData$.pipe(mapEpoch((d) => d.getConnections())),
).toBe("xxxxxa", {
x: expect.anything(), x: expect.anything(),
a: expect.toSatisfy((e: Epoch<Connection[]>) => { a: expect.toSatisfy((e: Epoch<Connection[]>) => {
const connections = e.value; const connections = e.value;
@@ -153,7 +157,7 @@ describe("connections$ stream", () => {
test("Should cleanup connections when not needed anymore", () => { test("Should cleanup connections when not needed anymore", () => {
withTestScheduler(({ behavior, expectObservable }) => { withTestScheduler(({ behavior, expectObservable }) => {
const { connections$ } = createConnectionManager$({ const { connectionManagerData$ } = createConnectionManager$({
scope: testScope, scope: testScope,
connectionFactory: fakeConnectionFactory, connectionFactory: fakeConnectionFactory,
inputTransports$: behavior("abc", { inputTransports$: behavior("abc", {
@@ -164,7 +168,9 @@ describe("connections$ stream", () => {
logger: logger, logger: logger,
}); });
expectObservable(connections$).toBe("xab", { expectObservable(
connectionManagerData$.pipe(mapEpoch((d) => d.getConnections())),
).toBe("xab", {
x: expect.anything(), x: expect.anything(),
a: expect.toSatisfy((e: Epoch<Connection[]>) => { a: expect.toSatisfy((e: Epoch<Connection[]>) => {
const connections = e.value; const connections = e.value;

View File

@@ -94,7 +94,6 @@ interface Props {
export interface IConnectionManager { export interface IConnectionManager {
transports$: Behavior<Epoch<LivekitTransport[]>>; transports$: Behavior<Epoch<LivekitTransport[]>>;
connectionManagerData$: Behavior<Epoch<ConnectionManagerData>>; connectionManagerData$: Behavior<Epoch<ConnectionManagerData>>;
connections$: Behavior<Epoch<Connection[]>>;
} }
/** /**
* Crete a `ConnectionManager` * Crete a `ConnectionManager`
@@ -217,7 +216,7 @@ export function createConnectionManager$({
new Epoch(new ConnectionManagerData()), new Epoch(new ConnectionManagerData()),
); );
return { transports$, connectionManagerData$, connections$ }; return { transports$, connectionManagerData$ };
} }
function removeDuplicateTransports( function removeDuplicateTransports(

View File

@@ -157,6 +157,7 @@ export class MuteStates {
private readonly mediaDevices: MediaDevices, private readonly mediaDevices: MediaDevices,
private readonly joined$: Observable<boolean>, private readonly joined$: Observable<boolean>,
) { ) {
logger.log("widget", widget);
if (widget !== null) { if (widget !== null) {
// Sync our mute states with the hosting client // Sync our mute states with the hosting client
const widgetApiState$ = combineLatest( const widgetApiState$ = combineLatest(

View File

@@ -80,6 +80,7 @@ export async function flushPromises(): Promise<void> {
} }
export interface OurRunHelpers extends RunHelpers { export interface OurRunHelpers extends RunHelpers {
scheduler: TestScheduler;
/** /**
* Schedules a sequence of actions to happen, as described by a marble * Schedules a sequence of actions to happen, as described by a marble
* diagram. * diagram.
@@ -123,6 +124,7 @@ export function withTestScheduler(
continuation({ continuation({
...helpers, ...helpers,
scope, scope,
scheduler,
schedule(marbles, actions) { schedule(marbles, actions) {
const actionsObservable$ = helpers const actionsObservable$ = helpers
.cold(marbles) .cold(marbles)