Almost running

- NEVER use undefined as the default for behaviors (FOOTGUN)
This commit is contained in:
Timo K
2025-11-07 12:32:29 +01:00
parent 92fdce33ea
commit 28047217b8
13 changed files with 83 additions and 34 deletions

View File

@@ -146,7 +146,7 @@ export function createCallNotificationLifecycle$({
newAndLegacyEvents?.[0].notification_type === "ring", newAndLegacyEvents?.[0].notification_type === "ring",
), ),
map((e) => e as CallNotificationWrapper), map((e) => e as CallNotificationWrapper),
switchMap(([notificastionEvent]) => { switchMap(([notificationEvent]) => {
const lifetimeMs = notificationEvent?.lifetime ?? 0; const lifetimeMs = notificationEvent?.lifetime ?? 0;
return concat( return concat(
lifetimeMs === 0 lifetimeMs === 0

View File

@@ -109,6 +109,7 @@ import {
createReceivedDecline$, createReceivedDecline$,
createSentCallNotification$, createSentCallNotification$,
} from "./CallNotificationLifecycle.ts"; } from "./CallNotificationLifecycle.ts";
import { createRoomMembers$ } from "./remoteMembers/displayname.ts";
const logger = rootLogger.getChild("[CallViewModel]"); const logger = rootLogger.getChild("[CallViewModel]");
//TODO //TODO
@@ -266,6 +267,7 @@ export class CallViewModel {
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// CallNotificationLifecycle // CallNotificationLifecycle
// consider inlining these!!!
private sentCallNotification$ = createSentCallNotification$( private sentCallNotification$ = createSentCallNotification$(
this.scope, this.scope,
this.matrixRTCSession, this.matrixRTCSession,
@@ -281,6 +283,9 @@ export class CallViewModel {
localUser: { userId: this.userId, deviceId: this.deviceId }, localUser: { userId: this.userId, deviceId: this.deviceId },
}); });
// ------------------------------------------------------------------------
// ROOM MEMBER tracking TODO
private roomMembers$ = createRoomMembers$(this.scope, this.matrixRoom);
/** /**
* If there is a configuration error with the call (e.g. misconfigured E2EE). * If there is a configuration error with the call (e.g. misconfigured E2EE).
* This is a fatal error that prevents the call from being created/joined. * This is a fatal error that prevents the call from being created/joined.
@@ -440,6 +445,7 @@ export class CallViewModel {
mediaItems.filter((m): m is UserMedia => m instanceof UserMedia), mediaItems.filter((m): m is UserMedia => m instanceof UserMedia),
), ),
), ),
[],
); );
public readonly joinSoundEffect$ = this.userMedia$.pipe( public readonly joinSoundEffect$ = this.userMedia$.pipe(
@@ -465,6 +471,9 @@ export class CallViewModel {
this.memberships$.pipe(map((ms) => ms.value.length)), this.memberships$.pipe(map((ms) => ms.value.length)),
); );
// only public to expose to the view.
public readonly callPickupState$ = this.callLifecycle.callPickupState$;
public readonly leaveSoundEffect$ = combineLatest([ public readonly leaveSoundEffect$ = combineLatest([
this.callLifecycle.callPickupState$, this.callLifecycle.callPickupState$,
this.userMedia$, this.userMedia$,
@@ -645,7 +654,6 @@ export class CallViewModel {
private readonly naturalWindowMode$ = this.scope.behavior<WindowMode>( private readonly naturalWindowMode$ = this.scope.behavior<WindowMode>(
fromEvent(window, "resize").pipe( fromEvent(window, "resize").pipe(
startWith(null),
map(() => { map(() => {
const height = window.innerHeight; const height = window.innerHeight;
const width = window.innerWidth; const width = window.innerWidth;
@@ -658,6 +666,7 @@ export class CallViewModel {
return "normal"; return "normal";
}), }),
), ),
"normal",
); );
/** /**
@@ -687,7 +696,6 @@ export class CallViewModel {
// automatically switch to spotlight mode and reset when screen sharing ends // automatically switch to spotlight mode and reset when screen sharing ends
this.scope.behavior<GridMode>( this.scope.behavior<GridMode>(
this.gridModeUserSelection$.pipe( this.gridModeUserSelection$.pipe(
startWith(null),
switchMap((userSelection) => switchMap((userSelection) =>
(userSelection === "spotlight" (userSelection === "spotlight"
? EMPTY ? EMPTY
@@ -706,6 +714,7 @@ export class CallViewModel {
).pipe(startWith(userSelection ?? "grid")), ).pipe(startWith(userSelection ?? "grid")),
), ),
), ),
"grid",
); );
public setGridMode(value: GridMode): void { public setGridMode(value: GridMode): void {

View File

@@ -288,7 +288,7 @@ export const createLocalMembership$ = ({
}; };
const requestConnect = (): LocalMemberConnectionState => { const requestConnect = (): LocalMemberConnectionState => {
if (state.livekit$.value === null) { if (state.livekit$.value.state === LivekitState.Uninitialized) {
startTracks(); startTracks();
state.livekit$.next({ state: LivekitState.Connecting }); state.livekit$.next({ state: LivekitState.Connecting });
combineLatest([publisher$, tracks$], (publisher, tracks) => { combineLatest([publisher$, tracks$], (publisher, tracks) => {
@@ -302,7 +302,7 @@ export const createLocalMembership$ = ({
}); });
}); });
} }
if (state.matrix$.value.state !== MatrixState.Disconnected) { if (state.matrix$.value.state === MatrixState.Disconnected) {
state.matrix$.next({ state: MatrixState.Connecting }); state.matrix$.next({ state: MatrixState.Connecting });
localTransport$.pipe( localTransport$.pipe(
tap((transport) => { tap((transport) => {
@@ -438,6 +438,7 @@ export const createLocalMembership$ = ({
return of(false); return of(false);
}), }),
), ),
null,
); );
const toggleScreenSharing = const toggleScreenSharing =

View File

@@ -67,18 +67,22 @@ export const createLocalTransport$ = ({
*/ */
const oldestMemberTransport$ = scope.behavior( const oldestMemberTransport$ = scope.behavior(
memberships$.pipe( memberships$.pipe(
mapEpoch((memberships) => memberships[0].getTransport(memberships[0])), mapEpoch(
first((t) => t != undefined && isLivekitTransport(t)), (memberships) => memberships[0]?.getTransport(memberships[0]) ?? null,
),
first((t) => t != null && isLivekitTransport(t)),
), ),
undefined, null,
); );
/** /**
* The transport that we would personally prefer to publish on (if not for the * The transport that we would personally prefer to publish on (if not for the
* transport preferences of others, perhaps). * transport preferences of others, perhaps).
*/ */
const preferredTransport$: Behavior<LivekitTransport | undefined> = const preferredTransport$: Behavior<LivekitTransport | null> = scope.behavior(
scope.behavior(from(makeTransport(client, roomId)), undefined); from(makeTransport(client, roomId)),
null,
);
/** /**
* The transport we should advertise in our MatrixRTC membership. * The transport we should advertise in our MatrixRTC membership.
@@ -89,7 +93,6 @@ export const createLocalTransport$ = ({
(useOldestMember, oldestMemberTransport, preferredTransport) => (useOldestMember, oldestMemberTransport, preferredTransport) =>
useOldestMember ? oldestMemberTransport : preferredTransport, useOldestMember ? oldestMemberTransport : preferredTransport,
).pipe<LivekitTransport>(distinctUntilChanged(deepCompare)), ).pipe<LivekitTransport>(distinctUntilChanged(deepCompare)),
undefined,
); );
return advertisedTransport$; return advertisedTransport$;
}; };
@@ -103,7 +106,6 @@ async function makeTransportInternal(
logger.log("Searching for a preferred transport"); logger.log("Searching for a preferred transport");
//TODO refactor this to use the jwt service returned alias. //TODO refactor this to use the jwt service returned alias.
const livekitAlias = roomId; const livekitAlias = roomId;
// TODO-MULTI-SFU: Either remove this dev tool or make it more official // TODO-MULTI-SFU: Either remove this dev tool or make it more official
const urlFromStorage = const urlFromStorage =
localStorage.getItem("robin-matrixrtc-auth") ?? localStorage.getItem("robin-matrixrtc-auth") ??

View File

@@ -307,6 +307,7 @@ export class Publisher {
return track instanceof LocalVideoTrack ? track : null; return track instanceof LocalVideoTrack ? track : null;
}), }),
), ),
null,
); );
trackProcessorSync(track$, trackerProcessorState$); trackProcessorSync(track$, trackerProcessorState$);
} }

View File

@@ -13,7 +13,7 @@ import {
type LivekitTransport, type LivekitTransport,
type ParticipantId, type ParticipantId,
} from "matrix-js-sdk/lib/matrixrtc"; } from "matrix-js-sdk/lib/matrixrtc";
import { BehaviorSubject, combineLatest, map, switchMap } from "rxjs"; import { BehaviorSubject, combineLatest, map, of, switchMap } from "rxjs";
import { logger as rootLogger } from "matrix-js-sdk/lib/logger"; import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
import { type LocalParticipant, type RemoteParticipant } from "livekit-client"; import { type LocalParticipant, type RemoteParticipant } from "livekit-client";
@@ -191,6 +191,11 @@ export function createConnectionManager$({
); );
}); });
// probably not required
if (listOfConnectionsWithPublishingParticipants.length === 0) {
return of(new Epoch(new ConnectionManagerData(), epoch));
}
// combineLatest the several streams into a single stream with the ConnectionManagerData // combineLatest the several streams into a single stream with the ConnectionManagerData
return combineLatest(listOfConnectionsWithPublishingParticipants).pipe( return combineLatest(listOfConnectionsWithPublishingParticipants).pipe(
map( map(
@@ -206,6 +211,7 @@ export function createConnectionManager$({
); );
}), }),
), ),
new Epoch(new ConnectionManagerData()),
); );
return { transports$, connectionManagerData$, connections$ }; return { transports$, connectionManagerData$, connections$ };

View File

@@ -17,7 +17,6 @@ import { combineLatest, filter, map } from "rxjs";
// eslint-disable-next-line rxjs/no-internal // eslint-disable-next-line rxjs/no-internal
import { type NodeStyleEventEmitter } from "rxjs/internal/observable/fromEvent"; import { type NodeStyleEventEmitter } from "rxjs/internal/observable/fromEvent";
import { type Room as MatrixRoom, type RoomMember } from "matrix-js-sdk"; import { type Room as MatrixRoom, type RoomMember } from "matrix-js-sdk";
import { logger } from "matrix-js-sdk/lib/logger";
import { type Behavior } from "../../Behavior"; import { type Behavior } from "../../Behavior";
import { type IConnectionManager } from "./ConnectionManager"; import { type IConnectionManager } from "./ConnectionManager";
@@ -56,6 +55,7 @@ interface Props {
// => Extract an AvatarService instead? // => Extract an AvatarService instead?
// Better with just `getMember` // Better with just `getMember`
matrixRoom: Pick<MatrixRoom, "getMember"> & NodeStyleEventEmitter; matrixRoom: Pick<MatrixRoom, "getMember"> & NodeStyleEventEmitter;
roomMember$: Behavior<Pick<RoomMember, "userId" | "getMxcAvatarUrl">>;
} }
// Alternative structure idea: // Alternative structure idea:
// const livekitMatrixMember$ = (callMemberships$,connectionManager,scope): Observable<MatrixLivekitMember[]> => { // const livekitMatrixMember$ = (callMemberships$,connectionManager,scope): Observable<MatrixLivekitMember[]> => {

View File

@@ -26,6 +26,17 @@ import {
} from "../../../utils/displayname"; } from "../../../utils/displayname";
import { type Behavior } from "../../Behavior"; import { type Behavior } from "../../Behavior";
export function createRoomMembers$(
scope: ObservableScope,
matrixRoom: MatrixRoom,
): Behavior<Pick<RoomMember, "userId" | "getMxcAvatarUrl">[]> {
return scope.behavior(
fromEvent(matrixRoom, RoomStateEvent.Members).pipe(
map(() => matrixRoom.getMembers()),
),
[],
);
}
/** /**
* Displayname for each member of the call. This will disambiguate * Displayname for each member of the call. This will disambiguate
* any displayname that clashes with another member. Only members * any displayname that clashes with another member. Only members
@@ -37,6 +48,7 @@ import { type Behavior } from "../../Behavior";
export const memberDisplaynames$ = ( export const memberDisplaynames$ = (
scope: ObservableScope, scope: ObservableScope,
matrixRoom: Pick<MatrixRoom, "getMember"> & NodeStyleEventEmitter, matrixRoom: Pick<MatrixRoom, "getMember"> & NodeStyleEventEmitter,
// roomMember$: Behavior<Pick<RoomMember, "userId" | "getMxcAvatarUrl">>;
memberships$: Observable<Epoch<CallMembership[]>>, memberships$: Observable<Epoch<CallMembership[]>>,
): Behavior<Epoch<Map<string, string>>> => ): Behavior<Epoch<Map<string, string>>> =>
scope.behavior( scope.behavior(

View File

@@ -7,8 +7,14 @@ Please see LICENSE in the repository root for full details.
import { describe, expect, it } from "vitest"; import { describe, expect, it } from "vitest";
import { Epoch, mapEpoch, trackEpoch } from "./ObservableScope"; import {
Epoch,
mapEpoch,
ObservableScope,
trackEpoch,
} from "./ObservableScope";
import { withTestScheduler } from "../utils/test"; import { withTestScheduler } from "../utils/test";
import { BehaviorSubject, timer } from "rxjs";
describe("Epoch", () => { describe("Epoch", () => {
it("should map the value correctly", () => { it("should map the value correctly", () => {
@@ -53,4 +59,17 @@ describe("Epoch", () => {
}); });
}); });
}); });
it("obs", () => {
const nothing = Symbol("nothing");
const scope = new ObservableScope();
const sb$ = new BehaviorSubject("initial");
const su$ = new BehaviorSubject(undefined);
expect(sb$.value).toBe("initial");
expect(su$.value).toBe(undefined);
expect(su$.value === nothing).toBe(false);
const a$ = timer(10);
scope.behavior(a$, undefined);
});
}); });

View File

@@ -24,7 +24,7 @@ import { type Behavior } from "./Behavior";
type MonoTypeOperator = <T>(o: Observable<T>) => Observable<T>; type MonoTypeOperator = <T>(o: Observable<T>) => Observable<T>;
const nothing = Symbol("nothing"); export const noInitialValue = Symbol("nothing");
/** /**
* A scope which limits the execution lifetime of its bound Observables. * A scope which limits the execution lifetime of its bound Observables.
@@ -59,10 +59,7 @@ export class ObservableScope {
* Converts an Observable to a Behavior. If no initial value is specified, the * Converts an Observable to a Behavior. If no initial value is specified, the
* Observable must synchronously emit an initial value. * Observable must synchronously emit an initial value.
*/ */
public behavior<T>( public behavior<T>(setValue$: Observable<T>, initialValue?: T): Behavior<T> {
setValue$: Observable<T>,
initialValue: T | typeof nothing = nothing,
): Behavior<T> {
const subject$ = new BehaviorSubject(initialValue); const subject$ = new BehaviorSubject(initialValue);
// Push values from the Observable into the BehaviorSubject. // Push values from the Observable into the BehaviorSubject.
// BehaviorSubjects have an undesirable feature where if you call 'complete', // BehaviorSubjects have an undesirable feature where if you call 'complete',
@@ -77,7 +74,7 @@ export class ObservableScope {
subject$.error(err); subject$.error(err);
}, },
}); });
if (subject$.value === nothing) if (subject$.value === noInitialValue)
throw new Error("Behavior failed to synchronously emit an initial value"); throw new Error("Behavior failed to synchronously emit an initial value");
return subject$ as Behavior<T>; return subject$ as Behavior<T>;
} }
@@ -118,27 +115,27 @@ export class ObservableScope {
value$: Behavior<T>, value$: Behavior<T>,
callback: (value: T) => Promise<(() => Promise<void>) | void>, callback: (value: T) => Promise<(() => Promise<void>) | void>,
): void { ): void {
let latestValue: T | typeof nothing = nothing; let latestValue: T | typeof noInitialValue = noInitialValue;
let reconciledValue: T | typeof nothing = nothing; let reconciledValue: T | typeof noInitialValue = noInitialValue;
let cleanUp: (() => Promise<void>) | void = undefined; let cleanUp: (() => Promise<void>) | void = undefined;
value$ value$
.pipe( .pipe(
catchError(() => EMPTY), // Ignore errors catchError(() => EMPTY), // Ignore errors
this.bind(), // Limit to the duration of the scope this.bind(), // Limit to the duration of the scope
endWith(nothing), // Clean up when the scope ends endWith(noInitialValue), // Clean up when the scope ends
) )
.subscribe((value) => { .subscribe((value) => {
void (async (): Promise<void> => { void (async (): Promise<void> => {
if (latestValue === nothing) { if (latestValue === noInitialValue) {
latestValue = value; latestValue = value;
while (latestValue !== reconciledValue) { while (latestValue !== reconciledValue) {
await cleanUp?.(); // Call the previous value's clean-up handler await cleanUp?.(); // Call the previous value's clean-up handler
reconciledValue = latestValue; reconciledValue = latestValue;
if (latestValue !== nothing) if (latestValue !== noInitialValue)
cleanUp = await callback(latestValue); // Sync current value cleanUp = await callback(latestValue); // Sync current value
} }
// Reset to signal that reconciliation is done for now // Reset to signal that reconciliation is done for now
latestValue = nothing; latestValue = noInitialValue;
} else { } else {
// There's already an instance of the above 'while' loop running // There's already an instance of the above 'while' loop running
// concurrently. Just update the latest value and let it be handled. // concurrently. Just update the latest value and let it be handled.
@@ -176,11 +173,11 @@ export const globalScope = new ObservableScope();
* *
* # Use Epoch * # Use Epoch
* ``` * ```
* const rootObs$ = of(1,2,3).pipe(trackEpoch()); * const ancestorObs$ = of(1,2,3).pipe(trackEpoch());
* const derivedObs$ = rootObs$.pipe( * const derivedObs$ = ancestorObs$.pipe(
* mapEpoch((v)=> "this number: " + v) * mapEpoch((v)=> "this number: " + v)
* ); * );
* const otherDerivedObs$ = rootObs$.pipe( * const otherDerivedObs$ = ancestorObs$.pipe(
* mapEpoch((v)=> "multiplied by: " + v) * mapEpoch((v)=> "multiplied by: " + v)
* ); * );
* const mergedObs$ = combineLatest([derivedObs$, otherDerivedObs$]).pipe( * const mergedObs$ = combineLatest([derivedObs$, otherDerivedObs$]).pipe(
@@ -241,6 +238,7 @@ export function mapEpoch<T, U>(
): OperatorFunction<Epoch<T>, Epoch<U>> { ): OperatorFunction<Epoch<T>, Epoch<U>> {
return map((e) => e.mapInner(mapFn)); return map((e) => e.mapInner(mapFn));
} }
/** /**
* # usage * # usage
* ``` * ```

View File

@@ -15,7 +15,7 @@ import {
import { fromEvent } from "rxjs"; import { fromEvent } from "rxjs";
import { import {
type Epoch, Epoch,
mapEpoch, mapEpoch,
trackEpoch, trackEpoch,
type ObservableScope, type ObservableScope,
@@ -76,5 +76,6 @@ export const createMemberships$ = (
MatrixRTCSessionEvent.MembershipsChanged, MatrixRTCSessionEvent.MembershipsChanged,
(_, memberships: CallMembership[]) => memberships, (_, memberships: CallMembership[]) => memberships,
).pipe(trackEpoch()), ).pipe(trackEpoch()),
new Epoch([]),
); );
}; };

View File

@@ -111,7 +111,7 @@ export class UserMedia {
private readonly presenter$ = this.scope.behavior( private readonly presenter$ = this.scope.behavior(
this.participant$.pipe( this.participant$.pipe(
switchMap((p) => (p === undefined ? of(false) : sharingScreen$(p))), switchMap((p) => (p === null ? of(false) : sharingScreen$(p))),
), ),
); );
@@ -151,7 +151,7 @@ export class UserMedia {
private readonly initialParticipant: private readonly initialParticipant:
| LocalParticipant | LocalParticipant
| RemoteParticipant | RemoteParticipant
| undefined, | null = null,
private readonly encryptionSystem: EncryptionSystem, private readonly encryptionSystem: EncryptionSystem,
private readonly livekitRoom: LivekitRoom, private readonly livekitRoom: LivekitRoom,
private readonly focusURL: string, private readonly focusURL: string,
@@ -163,7 +163,7 @@ export class UserMedia {
) {} ) {}
public updateParticipant( public updateParticipant(
newParticipant: LocalParticipant | RemoteParticipant | undefined, newParticipant: LocalParticipant | RemoteParticipant | null = null,
): void { ): void {
if (this.participant$.value !== newParticipant) { if (this.participant$.value !== newParticipant) {
// Update the BehaviourSubject in the UserMedia. // Update the BehaviourSubject in the UserMedia.