start moving over/removing things from the CallViewModel
This commit is contained in:
@@ -137,7 +137,16 @@ import {
|
||||
import { ElementCallError, UnknownCallError } from "../utils/errors.ts";
|
||||
import { ObservableScope } from "./ObservableScope.ts";
|
||||
import { memberDisplaynames$ } from "./remoteMembers/displayname.ts";
|
||||
import { ConnectionManager } from "./remoteMembers/ConnectionManager.ts";
|
||||
import { MatrixLivekitMerger } from "./remoteMembers/matrixLivekitMerger.ts";
|
||||
|
||||
//TODO
|
||||
// Larger rename
|
||||
// member,membership -> rtcMember
|
||||
// participant -> livekitParticipant
|
||||
// matrixLivekitItem -> callMember
|
||||
// js-sdk
|
||||
// callMembership -> rtcMembership
|
||||
export interface CallViewModelOptions {
|
||||
encryptionSystem: EncryptionSystem;
|
||||
autoLeaveWhenOthersLeft?: boolean;
|
||||
@@ -205,6 +214,29 @@ export class CallViewModel {
|
||||
null,
|
||||
);
|
||||
|
||||
private memberships$ = this.scope.behavior(
|
||||
fromEvent(
|
||||
this.matrixRTCSession,
|
||||
MatrixRTCSessionEvent.MembershipsChanged,
|
||||
(_, memberships: CallMembership[]) => memberships,
|
||||
),
|
||||
);
|
||||
|
||||
private connectionManager = new ConnectionManager(
|
||||
this.scope,
|
||||
this.matrixRoom.client,
|
||||
this.mediaDevices,
|
||||
this.trackProcessorState$,
|
||||
this.e2eeLivekitOptions(),
|
||||
);
|
||||
|
||||
private matrixLivekitMerger = new MatrixLivekitMerger(
|
||||
this.scope,
|
||||
this.memberships$,
|
||||
this.connectionManager,
|
||||
this.matrixRoom,
|
||||
);
|
||||
|
||||
/**
|
||||
* If there is a configuration error with the call (e.g. misconfigured E2EE).
|
||||
* This is a fatal error that prevents the call from being created/joined.
|
||||
@@ -221,7 +253,7 @@ export class CallViewModel {
|
||||
this.join$.next();
|
||||
}
|
||||
|
||||
// CODESMALL
|
||||
// CODESMELL?
|
||||
// This is functionally the same Observable as leave$, except here it's
|
||||
// hoisted to the top of the class. This enables the cyclic dependency between
|
||||
// leave$ -> autoLeave$ -> callPickupState$ -> livekitConnectionState$ ->
|
||||
@@ -302,112 +334,28 @@ export class CallViewModel {
|
||||
),
|
||||
);
|
||||
|
||||
// DISCUSSION move to ConnectionManager
|
||||
/**
|
||||
* The local connection over which we will publish our media. It could
|
||||
* possibly also have some remote users' media available on it.
|
||||
* null when not joined.
|
||||
*/
|
||||
private readonly localConnection$: Behavior<Async<PublishConnection> | null> =
|
||||
this.scope.behavior(
|
||||
generateKeyed$<
|
||||
Async<LivekitTransport> | null,
|
||||
PublishConnection,
|
||||
Async<PublishConnection> | null
|
||||
>(
|
||||
this.localTransport$,
|
||||
(transport, createOrGet) =>
|
||||
transport &&
|
||||
mapAsync(transport, (transport) =>
|
||||
createOrGet(
|
||||
// Stable key that uniquely idenifies the transport
|
||||
JSON.stringify({
|
||||
url: transport.livekit_service_url,
|
||||
alias: transport.livekit_alias,
|
||||
}),
|
||||
(scope) =>
|
||||
new PublishConnection(
|
||||
{
|
||||
transport,
|
||||
client: this.matrixRoom.client,
|
||||
scope,
|
||||
remoteTransports$: this.remoteTransports$,
|
||||
livekitRoomFactory: this.options.livekitRoomFactory,
|
||||
},
|
||||
this.mediaDevices,
|
||||
this.muteStates,
|
||||
this.e2eeLivekitOptions(),
|
||||
this.scope.behavior(this.trackProcessorState$),
|
||||
),
|
||||
),
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
// DISCUSSION move to ConnectionManager
|
||||
public readonly livekitConnectionState$ =
|
||||
// TODO: This options.connectionState$ behavior is a small hack inserted
|
||||
// here to facilitate testing. This would likely be better served by
|
||||
// breaking CallViewModel down into more naturally testable components.
|
||||
this.options.connectionState$ ??
|
||||
this.scope.behavior<ConnectionState>(
|
||||
this.localConnection$.pipe(
|
||||
switchMap((c) =>
|
||||
c?.state === "ready"
|
||||
? // TODO mapping to ConnectionState for compatibility, but we should use the full state?
|
||||
c.value.state$.pipe(
|
||||
switchMap((s) => {
|
||||
if (s.state === "ConnectedToLkRoom")
|
||||
return s.connectionState$;
|
||||
return of(ConnectionState.Disconnected);
|
||||
}),
|
||||
)
|
||||
: of(ConnectionState.Disconnected),
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
/**
|
||||
* A list of the connections that should be active at any given time.
|
||||
*/
|
||||
// DISCUSSION move to ConnectionManager
|
||||
private readonly connections$ = this.scope.behavior<Connection[]>(
|
||||
combineLatest(
|
||||
[this.localConnection$, this.remoteConnections$],
|
||||
(local, remote) => [
|
||||
...(local?.state === "ready" ? [local.value] : []),
|
||||
...remote.values(),
|
||||
],
|
||||
),
|
||||
);
|
||||
|
||||
/**
|
||||
* Emits with connections whenever they should be started or stopped.
|
||||
*/
|
||||
// DISCUSSION move to ConnectionManager
|
||||
private readonly connectionInstructions$ = this.connections$.pipe(
|
||||
pairwise(),
|
||||
map(([prev, next]) => {
|
||||
const start = new Set(next.values());
|
||||
for (const connection of prev) start.delete(connection);
|
||||
const stop = new Set(prev.values());
|
||||
for (const connection of next) stop.delete(connection);
|
||||
|
||||
return { start, stop };
|
||||
}),
|
||||
);
|
||||
|
||||
public readonly allLivekitRooms$ = this.scope.behavior(
|
||||
this.connections$.pipe(
|
||||
map((connections) =>
|
||||
[...connections.values()].map((c) => ({
|
||||
room: c.livekitRoom,
|
||||
url: c.transport.livekit_service_url,
|
||||
isLocal: c instanceof PublishConnection,
|
||||
})),
|
||||
),
|
||||
),
|
||||
);
|
||||
// // DISCUSSION move to ConnectionManager
|
||||
// public readonly livekitConnectionState$ =
|
||||
// // TODO: This options.connectionState$ behavior is a small hack inserted
|
||||
// // here to facilitate testing. This would likely be better served by
|
||||
// // breaking CallViewModel down into more naturally testable components.
|
||||
// this.options.connectionState$ ??
|
||||
// this.scope.behavior<ConnectionState>(
|
||||
// this.localConnection$.pipe(
|
||||
// switchMap((c) =>
|
||||
// c?.state === "ready"
|
||||
// ? // TODO mapping to ConnectionState for compatibility, but we should use the full state?
|
||||
// c.value.state$.pipe(
|
||||
// switchMap((s) => {
|
||||
// if (s.state === "ConnectedToLkRoom")
|
||||
// return s.connectionState$;
|
||||
// return of(ConnectionState.Disconnected);
|
||||
// }),
|
||||
// )
|
||||
// : of(ConnectionState.Disconnected),
|
||||
// ),
|
||||
// ),
|
||||
// );
|
||||
|
||||
private readonly userId = this.matrixRoom.client.getUserId()!;
|
||||
private readonly deviceId = this.matrixRoom.client.getDeviceId()!;
|
||||
@@ -450,114 +398,6 @@ export class CallViewModel {
|
||||
),
|
||||
);
|
||||
|
||||
/**
|
||||
* Whether we are "fully" connected to the call. Accounts for both the
|
||||
* connection to the MatrixRTC session and the LiveKit publish connection.
|
||||
*/
|
||||
// DISCUSSION own membership manager
|
||||
private readonly connected$ = this.scope.behavior(
|
||||
and$(
|
||||
this.matrixConnected$,
|
||||
this.livekitConnectionState$.pipe(
|
||||
map((state) => state === ConnectionState.Connected),
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
/**
|
||||
* Whether we should tell the user that we're reconnecting to the call.
|
||||
*/
|
||||
// DISCUSSION own membership manager
|
||||
public readonly reconnecting$ = this.scope.behavior(
|
||||
this.connected$.pipe(
|
||||
// We are reconnecting if we previously had some successful initial
|
||||
// connection but are now disconnected
|
||||
scan(
|
||||
({ connectedPreviously }, connectedNow) => ({
|
||||
connectedPreviously: connectedPreviously || connectedNow,
|
||||
reconnecting: connectedPreviously && !connectedNow,
|
||||
}),
|
||||
{ connectedPreviously: false, reconnecting: false },
|
||||
),
|
||||
map(({ reconnecting }) => reconnecting),
|
||||
),
|
||||
);
|
||||
|
||||
/**
|
||||
* Lists the transports used by ourselves, plus all other MatrixRTC session
|
||||
* members. For completeness this also lists the preferred transport and
|
||||
* whether we are in multi-SFU mode or sticky events mode (because
|
||||
* advertisedTransport$ wants to read them at the same time, and bundling data
|
||||
* together when it might change together is what you have to do in RxJS to
|
||||
* avoid reading inconsistent state or observing too many changes.)
|
||||
*/
|
||||
// TODO-MULTI-SFU find a better name for this. With the addition of sticky events it's no longer just about transports.
|
||||
// DISCUSS move the local part to the own membership file
|
||||
private readonly transports$: Behavior<{
|
||||
local: Async<LivekitTransport>;
|
||||
remote: { membership: CallMembership; transport: LivekitTransport }[];
|
||||
preferred: Async<LivekitTransport>;
|
||||
multiSfu: boolean;
|
||||
preferStickyEvents: boolean;
|
||||
} | null> = this.scope.behavior(
|
||||
this.joined$.pipe(
|
||||
switchMap((joined) =>
|
||||
joined
|
||||
? combineLatest(
|
||||
[
|
||||
this.preferredTransport$,
|
||||
this.memberships$,
|
||||
multiSfu.value$,
|
||||
preferStickyEvents.value$,
|
||||
],
|
||||
(preferred, memberships, preferMultiSfu, preferStickyEvents) => {
|
||||
// Multi-SFU must be implicitly enabled when using sticky events
|
||||
const multiSfu = preferStickyEvents || preferMultiSfu;
|
||||
|
||||
const oldestMembership =
|
||||
this.matrixRTCSession.getOldestMembership();
|
||||
const remote = memberships.flatMap((m) => {
|
||||
if (m.userId === this.userId && m.deviceId === this.deviceId)
|
||||
return [];
|
||||
const t = m.getTransport(oldestMembership ?? m);
|
||||
return t && isLivekitTransport(t)
|
||||
? [{ membership: m, transport: t }]
|
||||
: [];
|
||||
});
|
||||
|
||||
let local = preferred;
|
||||
if (!multiSfu) {
|
||||
const oldest = this.matrixRTCSession.getOldestMembership();
|
||||
if (oldest !== undefined) {
|
||||
const selection = oldest.getTransport(oldest);
|
||||
// TODO selection can be null if no transport is configured should we report an error?
|
||||
if (selection && isLivekitTransport(selection))
|
||||
local = ready(selection);
|
||||
}
|
||||
}
|
||||
|
||||
if (local.state === "error") {
|
||||
this._configError$.next(
|
||||
local.value instanceof ElementCallError
|
||||
? local.value
|
||||
: new UnknownCallError(local.value),
|
||||
);
|
||||
}
|
||||
|
||||
return {
|
||||
local,
|
||||
remote,
|
||||
preferred,
|
||||
multiSfu,
|
||||
preferStickyEvents,
|
||||
};
|
||||
},
|
||||
)
|
||||
: of(null),
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
/**
|
||||
* Whether various media/event sources should pretend to be disconnected from
|
||||
* all network input, even if their connection still technically works.
|
||||
@@ -569,95 +409,7 @@ export class CallViewModel {
|
||||
// DISCUSSION own membership manager ALSO this probably can be simplifis
|
||||
private readonly pretendToBeDisconnected$ = this.reconnecting$;
|
||||
|
||||
/**
|
||||
* Lists, for each LiveKit room, the LiveKit participants whose media should
|
||||
* be presented.
|
||||
*/
|
||||
private readonly participantsByRoom$ = this.scope.behavior<
|
||||
{
|
||||
livekitRoom: LivekitRoom;
|
||||
url: string; // Included for use as a React key
|
||||
participants: {
|
||||
id: string;
|
||||
participant: LocalParticipant | RemoteParticipant | undefined;
|
||||
member: RoomMember;
|
||||
}[];
|
||||
}[]
|
||||
>(
|
||||
// TODO: Move this logic into Connection/PublishConnection if possible
|
||||
this.localConnection$
|
||||
.pipe(
|
||||
switchMap((localConnection) => {
|
||||
if (localConnection?.state !== "ready") return [];
|
||||
const memberError = (): never => {
|
||||
throw new Error("No room member for call membership");
|
||||
};
|
||||
const localParticipant = {
|
||||
id: `${this.userId}:${this.deviceId}`,
|
||||
participant: localConnection.value.livekitRoom.localParticipant,
|
||||
member:
|
||||
this.matrixRoom.getMember(this.userId ?? "") ?? memberError(),
|
||||
};
|
||||
|
||||
return this.remoteConnections$.pipe(
|
||||
switchMap((remoteConnections) =>
|
||||
combineLatest(
|
||||
[localConnection.value, ...remoteConnections].map((c) =>
|
||||
c.publishingParticipants$.pipe(
|
||||
map((ps) => {
|
||||
const participants: {
|
||||
id: string;
|
||||
participant:
|
||||
| LocalParticipant
|
||||
| RemoteParticipant
|
||||
| undefined;
|
||||
member: RoomMember;
|
||||
}[] = ps.map(({ participant, membership }) => ({
|
||||
id: `${membership.userId}:${membership.deviceId}`,
|
||||
participant,
|
||||
member:
|
||||
getRoomMemberFromRtcMember(
|
||||
membership,
|
||||
this.matrixRoom,
|
||||
)?.member ?? memberError(),
|
||||
}));
|
||||
if (c === localConnection.value)
|
||||
participants.push(localParticipant);
|
||||
|
||||
return {
|
||||
livekitRoom: c.livekitRoom,
|
||||
url: c.transport.livekit_service_url,
|
||||
participants,
|
||||
};
|
||||
}),
|
||||
),
|
||||
),
|
||||
),
|
||||
),
|
||||
);
|
||||
}),
|
||||
)
|
||||
.pipe(startWith([]), pauseWhen(this.pretendToBeDisconnected$)),
|
||||
);
|
||||
|
||||
/**
|
||||
* Lists, for each LiveKit room, the LiveKit participants whose audio should
|
||||
* be rendered.
|
||||
*/
|
||||
// (This is effectively just participantsByRoom$ with a stricter type)
|
||||
public readonly audioParticipants$ = this.scope.behavior(
|
||||
this.participantsByRoom$.pipe(
|
||||
map((data) =>
|
||||
data.map(({ livekitRoom, url, participants }) => ({
|
||||
livekitRoom,
|
||||
url,
|
||||
participants: participants.flatMap(({ participant }) =>
|
||||
participant instanceof RemoteParticipant ? [participant] : [],
|
||||
),
|
||||
})),
|
||||
),
|
||||
),
|
||||
);
|
||||
public readonly audioParticipants$; // now will be created based on the connectionmanager
|
||||
|
||||
public readonly handsRaised$ = this.scope.behavior(
|
||||
this.handsRaisedSubject$.pipe(pauseWhen(this.pretendToBeDisconnected$)),
|
||||
@@ -677,17 +429,19 @@ export class CallViewModel {
|
||||
),
|
||||
);
|
||||
|
||||
memberDisplaynames$ = memberDisplaynames$(
|
||||
this.matrixRoom,
|
||||
this.memberships$,
|
||||
this.scope,
|
||||
this.userId,
|
||||
this.deviceId,
|
||||
);
|
||||
// Now will be added to the matricLivekitMerger
|
||||
// memberDisplaynames$ = memberDisplaynames$(
|
||||
// this.matrixRoom,
|
||||
// this.memberships$,
|
||||
// this.scope,
|
||||
// this.userId,
|
||||
// this.deviceId,
|
||||
// );
|
||||
|
||||
/**
|
||||
* List of MediaItems that we want to have tiles for.
|
||||
*/
|
||||
// TODO KEEP THIS!! and adapt it to what our membershipManger returns
|
||||
private readonly mediaItems$ = this.scope.behavior<MediaItem[]>(
|
||||
generateKeyed$<
|
||||
[typeof this.participantsByRoom$.value, number],
|
||||
@@ -790,10 +544,12 @@ export class CallViewModel {
|
||||
* - There can be multiple participants for one Matrix user if they join from
|
||||
* multiple devices.
|
||||
*/
|
||||
// TODO KEEP THIS!! and adapt it to what our membershipManger returns
|
||||
public readonly participantCount$ = this.scope.behavior(
|
||||
this.memberships$.pipe(map((ms) => ms.length)),
|
||||
);
|
||||
|
||||
// TODO convert all ring and all others left logic into one callLifecycleTracker$(didSendCallNotification$,matrixLivekitItem$): {autoLeave$,callPickupState$}
|
||||
private readonly allOthersLeft$ = this.memberships$.pipe(
|
||||
pairwise(),
|
||||
filter(
|
||||
@@ -1687,46 +1443,8 @@ export class CallViewModel {
|
||||
private readonly reactionsSubject$: Observable<
|
||||
Record<string, ReactionInfo>
|
||||
>,
|
||||
private readonly trackProcessorState$: Observable<ProcessorState>,
|
||||
private readonly trackProcessorState$: Behavior<ProcessorState>,
|
||||
) {
|
||||
// Start and stop local and remote connections as needed
|
||||
// DISCUSSION connection manager
|
||||
this.connectionInstructions$
|
||||
.pipe(this.scope.bind())
|
||||
.subscribe(({ start, stop }) => {
|
||||
for (const c of stop) {
|
||||
logger.info(`Disconnecting from ${c.transport.livekit_service_url}`);
|
||||
c.stop().catch((err) => {
|
||||
// TODO: better error handling
|
||||
logger.error(
|
||||
`Fail to stop connection to ${c.transport.livekit_service_url}`,
|
||||
err,
|
||||
);
|
||||
});
|
||||
}
|
||||
for (const c of start) {
|
||||
c.start().then(
|
||||
() =>
|
||||
logger.info(`Connected to ${c.transport.livekit_service_url}`),
|
||||
(e) => {
|
||||
// We only want to report fatal errors `_configError$` for the publish connection.
|
||||
// If there is an error with another connection, it will not terminate the call and will be displayed
|
||||
// on eacn tile.
|
||||
if (
|
||||
c instanceof PublishConnection &&
|
||||
e instanceof ElementCallError
|
||||
) {
|
||||
this._configError$.next(e);
|
||||
}
|
||||
logger.error(
|
||||
`Failed to start connection to ${c.transport.livekit_service_url}`,
|
||||
e,
|
||||
);
|
||||
},
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
// Start and stop session membership as needed
|
||||
this.scope.reconcile(this.advertisedTransport$, async (advertised) => {
|
||||
if (advertised !== null) {
|
||||
|
||||
Reference in New Issue
Block a user