Replace ObservableScope.state with Observable.behavior

This commit is contained in:
Robin
2025-06-18 17:14:21 -04:00
parent 7e81eca068
commit 35ed313577
4 changed files with 670 additions and 657 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -51,6 +51,7 @@ import { accumulate } from "../utils/observable";
import { type EncryptionSystem } from "../e2ee/sharedKeyManagement"; import { type EncryptionSystem } from "../e2ee/sharedKeyManagement";
import { E2eeType } from "../e2ee/e2eeType"; import { E2eeType } from "../e2ee/e2eeType";
import { type ReactionOption } from "../reactions"; import { type ReactionOption } from "../reactions";
import { type Behavior } from "./Behavior";
export function observeTrackReference$( export function observeTrackReference$(
participant$: Observable<Participant | undefined>, participant$: Observable<Participant | undefined>,
@@ -223,13 +224,13 @@ abstract class BaseMediaViewModel extends ViewModel {
/** /**
* The LiveKit video track for this media. * The LiveKit video track for this media.
*/ */
public readonly video$: Observable<TrackReferenceOrPlaceholder | undefined>; public readonly video$: Behavior<TrackReferenceOrPlaceholder | undefined>;
/** /**
* Whether there should be a warning that this media is unencrypted. * Whether there should be a warning that this media is unencrypted.
*/ */
public readonly unencryptedWarning$: Observable<boolean>; public readonly unencryptedWarning$: Behavior<boolean>;
public readonly encryptionStatus$: Observable<EncryptionStatus>; public readonly encryptionStatus$: Behavior<EncryptionStatus>;
/** /**
* Whether this media corresponds to the local participant. * Whether this media corresponds to the local participant.
@@ -260,11 +261,11 @@ abstract class BaseMediaViewModel extends ViewModel {
public readonly displayname$: Observable<string>, public readonly displayname$: Observable<string>,
) { ) {
super(); super();
const audio$ = observeTrackReference$(participant$, audioSource).pipe( const audio$ = observeTrackReference$(participant$, audioSource).behavior(
this.scope.state(), this.scope,
); );
this.video$ = observeTrackReference$(participant$, videoSource).pipe( this.video$ = observeTrackReference$(participant$, videoSource).behavior(
this.scope.state(), this.scope,
); );
this.unencryptedWarning$ = combineLatest( this.unencryptedWarning$ = combineLatest(
[audio$, this.video$], [audio$, this.video$],
@@ -272,70 +273,71 @@ abstract class BaseMediaViewModel extends ViewModel {
encryptionSystem.kind !== E2eeType.NONE && encryptionSystem.kind !== E2eeType.NONE &&
(a?.publication?.isEncrypted === false || (a?.publication?.isEncrypted === false ||
v?.publication?.isEncrypted === false), v?.publication?.isEncrypted === false),
).pipe(this.scope.state()); ).behavior(this.scope);
this.encryptionStatus$ = this.participant$.pipe( this.encryptionStatus$ = this.participant$
switchMap((participant): Observable<EncryptionStatus> => { .pipe(
if (!participant) { switchMap((participant): Observable<EncryptionStatus> => {
return of(EncryptionStatus.Connecting); if (!participant) {
} else if ( return of(EncryptionStatus.Connecting);
participant.isLocal || } else if (
encryptionSystem.kind === E2eeType.NONE participant.isLocal ||
) { encryptionSystem.kind === E2eeType.NONE
return of(EncryptionStatus.Okay); ) {
} else if (encryptionSystem.kind === E2eeType.PER_PARTICIPANT) { return of(EncryptionStatus.Okay);
return combineLatest([ } else if (encryptionSystem.kind === E2eeType.PER_PARTICIPANT) {
encryptionErrorObservable$( return combineLatest([
livekitRoom, encryptionErrorObservable$(
participant, livekitRoom,
encryptionSystem, participant,
"MissingKey", encryptionSystem,
), "MissingKey",
encryptionErrorObservable$( ),
livekitRoom, encryptionErrorObservable$(
participant, livekitRoom,
encryptionSystem, participant,
"InvalidKey", encryptionSystem,
), "InvalidKey",
observeRemoteTrackReceivingOkay$(participant, audioSource), ),
observeRemoteTrackReceivingOkay$(participant, videoSource), observeRemoteTrackReceivingOkay$(participant, audioSource),
]).pipe( observeRemoteTrackReceivingOkay$(participant, videoSource),
map(([keyMissing, keyInvalid, audioOkay, videoOkay]) => { ]).pipe(
if (keyMissing) return EncryptionStatus.KeyMissing; map(([keyMissing, keyInvalid, audioOkay, videoOkay]) => {
if (keyInvalid) return EncryptionStatus.KeyInvalid; if (keyMissing) return EncryptionStatus.KeyMissing;
if (audioOkay || videoOkay) return EncryptionStatus.Okay; if (keyInvalid) return EncryptionStatus.KeyInvalid;
return undefined; // no change
}),
filter((x) => !!x),
startWith(EncryptionStatus.Connecting),
);
} else {
return combineLatest([
encryptionErrorObservable$(
livekitRoom,
participant,
encryptionSystem,
"InvalidKey",
),
observeRemoteTrackReceivingOkay$(participant, audioSource),
observeRemoteTrackReceivingOkay$(participant, videoSource),
]).pipe(
map(
([keyInvalid, audioOkay, videoOkay]):
| EncryptionStatus
| undefined => {
if (keyInvalid) return EncryptionStatus.PasswordInvalid;
if (audioOkay || videoOkay) return EncryptionStatus.Okay; if (audioOkay || videoOkay) return EncryptionStatus.Okay;
return undefined; // no change return undefined; // no change
}, }),
), filter((x) => !!x),
filter((x) => !!x), startWith(EncryptionStatus.Connecting),
startWith(EncryptionStatus.Connecting), );
); } else {
} return combineLatest([
}), encryptionErrorObservable$(
this.scope.state(), livekitRoom,
); participant,
encryptionSystem,
"InvalidKey",
),
observeRemoteTrackReceivingOkay$(participant, audioSource),
observeRemoteTrackReceivingOkay$(participant, videoSource),
]).pipe(
map(
([keyInvalid, audioOkay, videoOkay]):
| EncryptionStatus
| undefined => {
if (keyInvalid) return EncryptionStatus.PasswordInvalid;
if (audioOkay || videoOkay) return EncryptionStatus.Okay;
return undefined; // no change
},
),
filter((x) => !!x),
startWith(EncryptionStatus.Connecting),
);
}
}),
)
.behavior(this.scope);
} }
} }
@@ -354,31 +356,33 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel {
/** /**
* Whether the participant is speaking. * Whether the participant is speaking.
*/ */
public readonly speaking$ = this.participant$.pipe( public readonly speaking$ = this.participant$
switchMap((p) => .pipe(
p switchMap((p) =>
? observeParticipantEvents(p, ParticipantEvent.IsSpeakingChanged).pipe( p
map((p) => p.isSpeaking), ? observeParticipantEvents(
) p,
: of(false), ParticipantEvent.IsSpeakingChanged,
), ).pipe(map((p) => p.isSpeaking))
this.scope.state(), : of(false),
); ),
)
.behavior(this.scope);
/** /**
* Whether this participant is sending audio (i.e. is unmuted on their side). * Whether this participant is sending audio (i.e. is unmuted on their side).
*/ */
public readonly audioEnabled$: Observable<boolean>; public readonly audioEnabled$: Behavior<boolean>;
/** /**
* Whether this participant is sending video. * Whether this participant is sending video.
*/ */
public readonly videoEnabled$: Observable<boolean>; public readonly videoEnabled$: Behavior<boolean>;
private readonly _cropVideo$ = new BehaviorSubject(true); private readonly _cropVideo$ = new BehaviorSubject(true);
/** /**
* Whether the tile video should be contained inside the tile or be cropped to fit. * Whether the tile video should be contained inside the tile or be cropped to fit.
*/ */
public readonly cropVideo$: Observable<boolean> = this._cropVideo$; public readonly cropVideo$: Behavior<boolean> = this._cropVideo$;
public constructor( public constructor(
id: string, id: string,
@@ -387,8 +391,8 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel {
encryptionSystem: EncryptionSystem, encryptionSystem: EncryptionSystem,
livekitRoom: LivekitRoom, livekitRoom: LivekitRoom,
displayname$: Observable<string>, displayname$: Observable<string>,
public readonly handRaised$: Observable<Date | null>, public readonly handRaised$: Behavior<Date | null>,
public readonly reaction$: Observable<ReactionOption | null>, public readonly reaction$: Behavior<ReactionOption | null>,
) { ) {
super( super(
id, id,
@@ -401,16 +405,17 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel {
displayname$, displayname$,
); );
const media$ = participant$.pipe( const media$ = participant$
switchMap((p) => (p && observeParticipantMedia(p)) ?? of(undefined)), .pipe(
this.scope.state(), switchMap((p) => (p && observeParticipantMedia(p)) ?? of(undefined)),
); )
this.audioEnabled$ = media$.pipe( .behavior(this.scope);
map((m) => m?.microphoneTrack?.isMuted === false), this.audioEnabled$ = media$
); .pipe(map((m) => m?.microphoneTrack?.isMuted === false))
this.videoEnabled$ = media$.pipe( .behavior(this.scope);
map((m) => m?.cameraTrack?.isMuted === false), this.videoEnabled$ = media$
); .pipe(map((m) => m?.cameraTrack?.isMuted === false))
.behavior(this.scope);
} }
public toggleFitContain(): void { public toggleFitContain(): void {
@@ -436,19 +441,20 @@ export class LocalUserMediaViewModel extends BaseUserMediaViewModel {
/** /**
* Whether the video should be mirrored. * Whether the video should be mirrored.
*/ */
public readonly mirror$ = this.video$.pipe( public readonly mirror$ = this.video$
switchMap((v) => { .pipe(
const track = v?.publication?.track; switchMap((v) => {
if (!(track instanceof LocalTrack)) return of(false); const track = v?.publication?.track;
// Watch for track restarts, because they indicate a camera switch if (!(track instanceof LocalTrack)) return of(false);
return fromEvent(track, TrackEvent.Restarted).pipe( // Watch for track restarts, because they indicate a camera switch
startWith(null), return fromEvent(track, TrackEvent.Restarted).pipe(
// Mirror only front-facing cameras (those that face the user) startWith(null),
map(() => facingModeFromLocalTrack(track).facingMode === "user"), // Mirror only front-facing cameras (those that face the user)
); map(() => facingModeFromLocalTrack(track).facingMode === "user"),
}), );
this.scope.state(), }),
); )
.behavior(this.scope);
/** /**
* Whether to show this tile in a highly visible location near the start of * Whether to show this tile in a highly visible location near the start of
@@ -464,8 +470,8 @@ export class LocalUserMediaViewModel extends BaseUserMediaViewModel {
encryptionSystem: EncryptionSystem, encryptionSystem: EncryptionSystem,
livekitRoom: LivekitRoom, livekitRoom: LivekitRoom,
displayname$: Observable<string>, displayname$: Observable<string>,
handRaised$: Observable<Date | null>, handRaised$: Behavior<Date | null>,
reaction$: Observable<ReactionOption | null>, reaction$: Behavior<ReactionOption | null>,
) { ) {
super( super(
id, id,
@@ -512,43 +518,43 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel {
* The volume to which this participant's audio is set, as a scalar * The volume to which this participant's audio is set, as a scalar
* multiplier. * multiplier.
*/ */
public readonly localVolume$: Observable<number> = merge( public readonly localVolume$: Behavior<number> = merge(
this.locallyMutedToggle$.pipe(map(() => "toggle mute" as const)), this.locallyMutedToggle$.pipe(map(() => "toggle mute" as const)),
this.localVolumeAdjustment$, this.localVolumeAdjustment$,
this.localVolumeCommit$.pipe(map(() => "commit" as const)), this.localVolumeCommit$.pipe(map(() => "commit" as const)),
).pipe( )
accumulate({ volume: 1, committedVolume: 1 }, (state, event) => { .pipe(
switch (event) { accumulate({ volume: 1, committedVolume: 1 }, (state, event) => {
case "toggle mute": switch (event) {
return { case "toggle mute":
...state, return {
volume: state.volume === 0 ? state.committedVolume : 0, ...state,
}; volume: state.volume === 0 ? state.committedVolume : 0,
case "commit": };
// Dragging the slider to zero should have the same effect as case "commit":
// muting: keep the original committed volume, as if it were never // Dragging the slider to zero should have the same effect as
// dragged // muting: keep the original committed volume, as if it were never
return { // dragged
...state, return {
committedVolume: ...state,
state.volume === 0 ? state.committedVolume : state.volume, committedVolume:
}; state.volume === 0 ? state.committedVolume : state.volume,
default: };
// Volume adjustment default:
return { ...state, volume: event }; // Volume adjustment
} return { ...state, volume: event };
}), }
map(({ volume }) => volume), }),
this.scope.state(), map(({ volume }) => volume),
); )
.behavior(this.scope);
/** /**
* Whether this participant's audio is disabled. * Whether this participant's audio is disabled.
*/ */
public readonly locallyMuted$: Observable<boolean> = this.localVolume$.pipe( public readonly locallyMuted$: Behavior<boolean> = this.localVolume$
map((volume) => volume === 0), .pipe(map((volume) => volume === 0))
this.scope.state(), .behavior(this.scope);
);
public constructor( public constructor(
id: string, id: string,
@@ -557,8 +563,8 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel {
encryptionSystem: EncryptionSystem, encryptionSystem: EncryptionSystem,
livekitRoom: LivekitRoom, livekitRoom: LivekitRoom,
displayname$: Observable<string>, displayname$: Observable<string>,
handRaised$: Observable<Date | null>, handRaised$: Behavior<Date | null>,
reaction$: Observable<ReactionOption | null>, reaction$: Behavior<ReactionOption | null>,
) { ) {
super( super(
id, id,

View File

@@ -5,13 +5,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details. Please see LICENSE in the repository root for full details.
*/ */
import { import { type Observable, Subject, takeUntil } from "rxjs";
distinctUntilChanged,
type Observable,
shareReplay,
Subject,
takeUntil,
} from "rxjs";
type MonoTypeOperator = <T>(o: Observable<T>) => Observable<T>; type MonoTypeOperator = <T>(o: Observable<T>) => Observable<T>;
@@ -31,22 +25,6 @@ export class ObservableScope {
return this.bindImpl; return this.bindImpl;
} }
private readonly stateImpl: MonoTypeOperator = (o$) =>
o$.pipe(
this.bind(),
distinctUntilChanged(),
shareReplay({ bufferSize: 1, refCount: false }),
);
/**
* Transforms an Observable into a hot state Observable which replays its
* latest value upon subscription, skips updates with identical values, and
* is bound to this scope.
*/
public state(): MonoTypeOperator {
return this.stateImpl;
}
/** /**
* Ends the scope, causing any bound Observables to complete. * Ends the scope, causing any bound Observables to complete.
*/ */

View File

@@ -47,6 +47,7 @@ import {
} from "../config/ConfigOptions"; } from "../config/ConfigOptions";
import { Config } from "../config/Config"; import { Config } from "../config/Config";
import { type MediaDevices } from "../state/MediaDevices"; import { type MediaDevices } from "../state/MediaDevices";
import { constant } from "../state/Behavior";
export function withFakeTimers(continuation: () => void): void { export function withFakeTimers(continuation: () => void): void {
vi.useFakeTimers(); vi.useFakeTimers();
@@ -217,8 +218,8 @@ export async function withLocalMedia(
}, },
mockLivekitRoom({ localParticipant }), mockLivekitRoom({ localParticipant }),
of(roomMember.rawDisplayName ?? "nodisplayname"), of(roomMember.rawDisplayName ?? "nodisplayname"),
of(null), constant(null),
of(null), constant(null),
); );
try { try {
await continuation(vm); await continuation(vm);
@@ -256,8 +257,8 @@ export async function withRemoteMedia(
}, },
mockLivekitRoom({}, { remoteParticipants$: of([remoteParticipant]) }), mockLivekitRoom({}, { remoteParticipants$: of([remoteParticipant]) }),
of(roomMember.rawDisplayName ?? "nodisplayname"), of(roomMember.rawDisplayName ?? "nodisplayname"),
of(null), constant(null),
of(null), constant(null),
); );
try { try {
await continuation(vm); await continuation(vm);