Signed-off-by: Timo K <toger5@hotmail.de>
This commit is contained in:
Timo K
2025-08-27 14:01:01 +02:00
parent 9b9c08ed61
commit 9011ae4e1f
8 changed files with 461 additions and 314 deletions

View File

@@ -12,7 +12,9 @@ import {
} from "@livekit/components-core";
import {
ConnectionState,
type Room as LivekitRoom,
E2EEOptions,
ExternalE2EEKeyProvider,
Room as LivekitRoom,
type LocalParticipant,
ParticipantEvent,
type RemoteParticipant,
@@ -22,6 +24,7 @@ import {
type EventTimelineSetHandlerMap,
EventType,
RoomEvent,
MatrixClient,
RoomStateEvent,
SyncState,
type Room as MatrixRoom,
@@ -63,6 +66,7 @@ import {
import { logger } from "matrix-js-sdk/lib/logger";
import {
type CallMembership,
isLivekitFocusConfig,
type MatrixRTCSession,
MatrixRTCSessionEvent,
type MatrixRTCSessionEventHandlerMap,
@@ -116,7 +120,16 @@ import { observeSpeaker$ } from "./observeSpeaker";
import { shallowEquals } from "../utils/array";
import { calculateDisplayName, shouldDisambiguate } from "../utils/displayname";
import { type MediaDevices } from "./MediaDevices";
import { constant, type Behavior } from "./Behavior";
import { type Behavior } from "./Behavior";
import { getSFUConfigWithOpenID } from "../livekit/openIDSFU";
import { defaultLiveKitOptions } from "../livekit/options";
import {
enterRTCSession,
getLivekitAlias,
makeFocus,
} from "../rtcSessionHelpers";
import { E2eeType } from "../e2ee/e2eeType";
import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider";
export interface CallViewModelOptions {
encryptionSystem: EncryptionSystem;
@@ -405,6 +418,31 @@ class ScreenShare {
type MediaItem = UserMedia | ScreenShare;
function getE2eeOptions(
e2eeSystem: EncryptionSystem,
rtcSession: MatrixRTCSession,
): E2EEOptions | undefined {
if (e2eeSystem.kind === E2eeType.NONE) return undefined;
if (e2eeSystem.kind === E2eeType.PER_PARTICIPANT) {
const keyProvider = new MatrixKeyProvider();
keyProvider.setRTCSession(rtcSession);
return {
keyProvider,
worker: new E2EEWorker(),
};
} else if (e2eeSystem.kind === E2eeType.SHARED_KEY && e2eeSystem.secret) {
const keyProvider = new ExternalE2EEKeyProvider();
keyProvider
.setKey(e2eeSystem.secret)
.catch((e) => logger.error("Failed to set shared key for E2EE", e));
return {
keyProvider,
worker: new E2EEWorker(),
};
}
}
function getRoomMemberFromRtcMember(
rtcMember: CallMembership,
room: MatrixRoom,
@@ -427,8 +465,151 @@ function getRoomMemberFromRtcMember(
return { id, member };
}
// TODO: Move wayyyy more business logic from the call and lobby views into here
class Connection {
// TODO-MULTI-SFU Add all device syncing logic from useLivekit
private readonly sfuConfig = getSFUConfigWithOpenID(
this.client,
this.serviceUrl,
this.livekitAlias,
);
public async startSubscribing(): Promise<void> {
this.stopped = false;
const { url, jwt } = await this.sfuConfig;
if (!this.stopped) await this.livekitRoom.connect(url, jwt);
}
public async startPublishing(): Promise<void> {
this.stopped = false;
const { url, jwt } = await this.sfuConfig;
if (!this.stopped)
// TODO-MULTI-SFU this should not create a track?
await this.livekitRoom.localParticipant.createTracks({
audio: { deviceId: "default" },
});
if (!this.stopped) await this.livekitRoom.connect(url, jwt);
}
private stopped = false;
public stop(): void {
void this.livekitRoom.disconnect();
this.stopped = true;
}
public readonly participants$ = connectedParticipantsObserver(
this.livekitRoom,
).pipe(this.scope.state());
public constructor(
private readonly livekitRoom: LivekitRoom,
private readonly serviceUrl: string,
private readonly livekitAlias: string,
private readonly client: MatrixClient,
private readonly scope: ObservableScope,
) {}
}
export class CallViewModel extends ViewModel {
private readonly e2eeOptions = getE2eeOptions(
this.encryptionSystem,
this.matrixRTCSession,
);
private readonly livekitAlias = getLivekitAlias(this.matrixRTCSession);
private readonly livekitRoom = new LivekitRoom({
...defaultLiveKitOptions,
e2ee: this.e2eeOptions,
});
private readonly localFocus = makeFocus(this.matrixRTCSession);
private readonly localConnection = this.localFocus.then(
(focus) =>
new Connection(
this.livekitRoom,
focus.livekit_service_url,
this.livekitAlias,
this.matrixRTCSession.room.client,
this.scope,
),
);
private readonly memberships$ = fromEvent(
this.matrixRTCSession,
MatrixRTCSessionEvent.MembershipsChanged,
).pipe(map(() => this.matrixRTCSession.memberships));
private readonly foci$ = this.memberships$.pipe(
map(
(memberships) =>
new Set(
memberships
.map((m) => this.matrixRTCSession.resolveActiveFocus(m))
.filter((f) => f !== undefined && isLivekitFocusConfig(f))
.map((f) => f.livekit_service_url),
),
),
);
private readonly remoteConnections$ = combineLatest([
this.localFocus,
this.foci$,
]).pipe(
accumulate(new Map<string, Connection>(), (prev, [localFocus, foci]) => {
const stopped = new Map(prev);
const next = new Map<string, Connection>();
for (const focus of foci) {
if (focus !== localFocus.livekit_service_url) {
stopped.delete(focus);
next.set(
focus,
prev.get(focus) ??
new Connection(
new LivekitRoom({
...defaultLiveKitOptions,
e2ee: this.e2eeOptions,
}),
focus,
this.livekitAlias,
this.matrixRTCSession.room.client,
this.scope,
),
);
}
}
for (const connection of stopped.values()) connection.stop();
return next;
}),
);
private readonly joined$ = new Subject<void>();
public join(): void {
this.joined$.next();
}
public leave(): void {
// TODO
}
private readonly connectionInstructions$ = this.joined$.pipe(
switchMap(() => this.remoteConnections$),
startWith(new Map<string, Connection>()),
pairwise(),
map(([prev, next]) => {
const start = new Set(next.values());
for (const connection of prev.values()) start.delete(connection);
const stop = new Set(prev.values());
for (const connection of next.values()) stop.delete(connection);
return { start, stop };
}),
);
private readonly userId = this.matrixRoom.client.getUserId();
private readonly matrixConnected$ = this.scope.behavior(
@@ -502,79 +683,13 @@ export class CallViewModel extends ViewModel {
// in a split-brained state.
private readonly pretendToBeDisconnected$ = this.reconnecting$;
/**
* The raw list of RemoteParticipants as reported by LiveKit
*/
private readonly rawRemoteParticipants$ = this.scope.behavior<
RemoteParticipant[]
>(connectedParticipantsObserver(this.livekitRoom), []);
/**
* Lists of RemoteParticipants to "hold" on display, even if LiveKit claims that
* they've left
*/
private readonly remoteParticipantHolds$ = this.scope.behavior<
RemoteParticipant[][]
>(
this.livekitConnectionState$.pipe(
withLatestFrom(this.rawRemoteParticipants$),
mergeMap(([s, ps]) => {
// Whenever we switch focuses, we should retain all the previous
// participants for at least POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS ms to
// give their clients time to switch over and avoid jarring layout shifts
if (s === ECAddonConnectionState.ECSwitchingFocus) {
return concat(
// Hold these participants
of({ hold: ps }),
// Wait for time to pass and the connection state to have changed
forkJoin([
timer(POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS),
this.livekitConnectionState$.pipe(
filter((s) => s !== ECAddonConnectionState.ECSwitchingFocus),
take(1),
),
// Then unhold them
]).pipe(map(() => ({ unhold: ps }))),
);
} else {
return EMPTY;
}
}),
// Accumulate the hold instructions into a single list showing which
// participants are being held
accumulate([] as RemoteParticipant[][], (holds, instruction) =>
"hold" in instruction
? [instruction.hold, ...holds]
: holds.filter((h) => h !== instruction.unhold),
),
),
);
/**
* The RemoteParticipants including those that are being "held" on the screen
*/
private readonly remoteParticipants$ = this.scope
.behavior<RemoteParticipant[]>(
combineLatest(
[this.rawRemoteParticipants$, this.remoteParticipantHolds$],
(raw, holds) => {
const result = [...raw];
const resultIds = new Set(result.map((p) => p.identity));
// Incorporate the held participants into the list
for (const hold of holds) {
for (const p of hold) {
if (!resultIds.has(p.identity)) {
result.push(p);
resultIds.add(p.identity);
}
}
}
return result;
},
),
)
.behavior<
RemoteParticipant[]
>(combineLatest([this.localConnection, this.remoteConnections$], (localConnection, remoteConnections) => combineLatest([localConnection.participants$, ...[...remoteConnections.values()].map((c) => c.participants$)], (...ps) => ps.flat(1))).pipe(switchAll(), startWith([])))
.pipe(pauseWhen(this.pretendToBeDisconnected$));
private readonly memberships$ = this.scope.behavior(
@@ -1685,24 +1800,42 @@ export class CallViewModel extends ViewModel {
),
filter((v) => v.playSounds),
);
// TODO-REBASE: expose connection state observable
public readonly livekitConnectionState$: Observable<ECConnectionState>;
public constructor(
// A call is permanently tied to a single Matrix room and LiveKit room
// A call is permanently tied to a single Matrix room
private readonly matrixRTCSession: MatrixRTCSession,
private readonly matrixRoom: MatrixRoom,
private readonly livekitRoom: LivekitRoom,
private readonly mediaDevices: MediaDevices,
private readonly options: CallViewModelOptions,
public readonly livekitConnectionState$: Behavior<ECConnectionState>,
private readonly handsRaisedSubject$: Observable<
Record<string, RaisedHandInfo>
>,
private readonly reactionsSubject$: Observable<
Record<string, ReactionInfo>
>,
private readonly encryptionSystem: EncryptionSystem,
) {
super();
void this.localConnection.then((c) => c.startPublishing());
this.connectionInstructions$
.pipe(this.scope.bind())
.subscribe(({ start, stop }) => {
for (const connection of start) connection.startSubscribing();
for (const connection of stop) connection.stop();
});
combineLatest([this.localFocus, this.joined$])
.pipe(this.scope.bind())
.subscribe(([localFocus]) => {
enterRTCSession(
this.matrixRTCSession,
localFocus,
this.encryptionSystem.kind !== E2eeType.PER_PARTICIPANT,
);
});
// Pause upstream of all local media tracks when we're disconnected from
// MatrixRTC, because it can be an unpleasant surprise for the app to say
// 'reconnecting' and yet still be transmitting your media to others.