Use Behaviors even more consistently

This commit is contained in:
Robin
2025-07-11 23:53:59 -04:00
parent 586a923be3
commit 32bf1c30d2
4 changed files with 94 additions and 83 deletions

View File

@@ -157,10 +157,13 @@ export function useLivekit(
useObservableEagerState( useObservableEagerState(
useObservable( useObservable(
(room$) => (room$) =>
observeTrackReference$( room$.pipe(
room$.pipe(map(([room]) => room.localParticipant)), switchMap(([room]) =>
Track.Source.Camera, observeTrackReference$(
).pipe( room.localParticipant,
Track.Source.Camera,
),
),
map((trackRef) => { map((trackRef) => {
const track = trackRef?.publication?.track; const track = trackRef?.publication?.track;
return track instanceof LocalVideoTrack ? track : null; return track instanceof LocalVideoTrack ? track : null;

View File

@@ -251,8 +251,8 @@ class UserMedia {
LocalParticipant | RemoteParticipant | undefined LocalParticipant | RemoteParticipant | undefined
>; >;
public readonly speaker$: Observable<boolean>; public readonly speaker$: Behavior<boolean>;
public readonly presenter$: Observable<boolean>; public readonly presenter$: Behavior<boolean>;
public constructor( public constructor(
public readonly id: string, public readonly id: string,
member: RoomMember | undefined, member: RoomMember | undefined,
@@ -269,7 +269,7 @@ class UserMedia {
this.vm = new LocalUserMediaViewModel( this.vm = new LocalUserMediaViewModel(
this.id, this.id,
member, member,
this.participant$.asObservable() as Observable<LocalParticipant>, this.participant$ as Behavior<LocalParticipant>,
encryptionSystem, encryptionSystem,
livekitRoom, livekitRoom,
displayname$.behavior(this.scope), displayname$.behavior(this.scope),
@@ -386,21 +386,23 @@ function getRoomMemberFromRtcMember(
// TODO: Move wayyyy more business logic from the call and lobby views into here // TODO: Move wayyyy more business logic from the call and lobby views into here
export class CallViewModel extends ViewModel { export class CallViewModel extends ViewModel {
public readonly localVideo$: Observable<LocalVideoTrack | null> = public readonly localVideo$: Behavior<LocalVideoTrack | null> =
observeTrackReference$( observeTrackReference$(
of(this.livekitRoom.localParticipant), this.livekitRoom.localParticipant,
Track.Source.Camera, Track.Source.Camera,
).pipe( )
map((trackRef) => { .pipe(
const track = trackRef?.publication?.track; map((trackRef) => {
return track instanceof LocalVideoTrack ? track : null; const track = trackRef?.publication?.track;
}), return track instanceof LocalVideoTrack ? track : null;
); }),
)
.behavior(this.scope);
/** /**
* The raw list of RemoteParticipants as reported by LiveKit * The raw list of RemoteParticipants as reported by LiveKit
*/ */
private readonly rawRemoteParticipants$: Observable<RemoteParticipant[]> = private readonly rawRemoteParticipants$: Behavior<RemoteParticipant[]> =
connectedParticipantsObserver(this.livekitRoom) connectedParticipantsObserver(this.livekitRoom)
.pipe(startWith([])) .pipe(startWith([]))
.behavior(this.scope); .behavior(this.scope);
@@ -409,44 +411,46 @@ export class CallViewModel extends ViewModel {
* Lists of RemoteParticipants to "hold" on display, even if LiveKit claims that * Lists of RemoteParticipants to "hold" on display, even if LiveKit claims that
* they've left * they've left
*/ */
private readonly remoteParticipantHolds$: Observable<RemoteParticipant[][]> = private readonly remoteParticipantHolds$: Behavior<RemoteParticipant[][]> =
this.connectionState$.pipe( this.connectionState$
withLatestFrom(this.rawRemoteParticipants$), .pipe(
mergeMap(([s, ps]) => { withLatestFrom(this.rawRemoteParticipants$),
// Whenever we switch focuses, we should retain all the previous mergeMap(([s, ps]) => {
// participants for at least POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS ms to // Whenever we switch focuses, we should retain all the previous
// give their clients time to switch over and avoid jarring layout shifts // participants for at least POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS ms to
if (s === ECAddonConnectionState.ECSwitchingFocus) { // give their clients time to switch over and avoid jarring layout shifts
return concat( if (s === ECAddonConnectionState.ECSwitchingFocus) {
// Hold these participants return concat(
of({ hold: ps }), // Hold these participants
// Wait for time to pass and the connection state to have changed of({ hold: ps }),
forkJoin([ // Wait for time to pass and the connection state to have changed
timer(POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS), forkJoin([
this.connectionState$.pipe( timer(POST_FOCUS_PARTICIPANT_UPDATE_DELAY_MS),
filter((s) => s !== ECAddonConnectionState.ECSwitchingFocus), this.connectionState$.pipe(
take(1), filter((s) => s !== ECAddonConnectionState.ECSwitchingFocus),
), take(1),
// Then unhold them ),
]).pipe(map(() => ({ unhold: ps }))), // Then unhold them
); ]).pipe(map(() => ({ unhold: ps }))),
} else { );
return EMPTY; } else {
} return EMPTY;
}), }
// Accumulate the hold instructions into a single list showing which }),
// participants are being held // Accumulate the hold instructions into a single list showing which
accumulate([] as RemoteParticipant[][], (holds, instruction) => // participants are being held
"hold" in instruction accumulate([] as RemoteParticipant[][], (holds, instruction) =>
? [instruction.hold, ...holds] "hold" in instruction
: holds.filter((h) => h !== instruction.unhold), ? [instruction.hold, ...holds]
), : holds.filter((h) => h !== instruction.unhold),
); ),
)
.behavior(this.scope);
/** /**
* The RemoteParticipants including those that are being "held" on the screen * The RemoteParticipants including those that are being "held" on the screen
*/ */
private readonly remoteParticipants$: Observable<RemoteParticipant[]> = private readonly remoteParticipants$: Behavior<RemoteParticipant[]> =
combineLatest( combineLatest(
[this.rawRemoteParticipants$, this.remoteParticipantHolds$], [this.rawRemoteParticipants$, this.remoteParticipantHolds$],
(raw, holds) => { (raw, holds) => {
@@ -465,7 +469,7 @@ export class CallViewModel extends ViewModel {
return result; return result;
}, },
); ).behavior(this.scope);
/** /**
* Displaynames for each member of the call. This will disambiguate * Displaynames for each member of the call. This will disambiguate
@@ -709,11 +713,13 @@ export class CallViewModel extends ViewModel {
/** /**
* List of MediaItems that we want to display, that are of type UserMedia * List of MediaItems that we want to display, that are of type UserMedia
*/ */
private readonly userMedia$: Observable<UserMedia[]> = this.mediaItems$.pipe( private readonly userMedia$: Behavior<UserMedia[]> = this.mediaItems$
map((mediaItems) => .pipe(
mediaItems.filter((m): m is UserMedia => m instanceof UserMedia), map((mediaItems) =>
), mediaItems.filter((m): m is UserMedia => m instanceof UserMedia),
); ),
)
.behavior(this.scope);
public readonly memberChanges$ = this.userMedia$ public readonly memberChanges$ = this.userMedia$
.pipe(map((mediaItems) => mediaItems.map((m) => m.id))) .pipe(map((mediaItems) => mediaItems.map((m) => m.id)))

View File

@@ -54,24 +54,16 @@ import { type ReactionOption } from "../reactions";
import { type Behavior } from "./Behavior"; import { type Behavior } from "./Behavior";
export function observeTrackReference$( export function observeTrackReference$(
participant$: Observable<Participant | undefined>, participant: Participant,
source: Track.Source, source: Track.Source,
): Observable<TrackReferenceOrPlaceholder | undefined> { ): Observable<TrackReferenceOrPlaceholder> {
return participant$.pipe( return observeParticipantMedia(participant).pipe(
switchMap((p) => { map(() => ({
if (p) { participant: participant,
return observeParticipantMedia(p).pipe( publication: participant.getTrackPublication(source),
map(() => ({ source,
participant: p, })),
publication: p.getTrackPublication(source), distinctUntilKeyChanged("publication"),
source,
})),
distinctUntilKeyChanged("publication"),
);
} else {
return of(undefined);
}
}),
); );
} }
@@ -83,7 +75,7 @@ export function observeRtpStreamStats$(
RTCInboundRtpStreamStats | RTCOutboundRtpStreamStats | undefined RTCInboundRtpStreamStats | RTCOutboundRtpStreamStats | undefined
> { > {
return combineLatest([ return combineLatest([
observeTrackReference$(of(participant), source), observeTrackReference$(participant, source),
interval(1000).pipe(startWith(0)), interval(1000).pipe(startWith(0)),
]).pipe( ]).pipe(
switchMap(async ([trackReference]) => { switchMap(async ([trackReference]) => {
@@ -237,6 +229,18 @@ abstract class BaseMediaViewModel extends ViewModel {
*/ */
public abstract readonly local: boolean; public abstract readonly local: boolean;
private observeTrackReference$(
source: Track.Source,
): Behavior<TrackReferenceOrPlaceholder | undefined> {
return this.participant$
.pipe(
switchMap((p) =>
p === undefined ? of(undefined) : observeTrackReference$(p, source),
),
)
.behavior(this.scope);
}
public constructor( public constructor(
/** /**
* An opaque identifier for this media. * An opaque identifier for this media.
@@ -261,12 +265,10 @@ abstract class BaseMediaViewModel extends ViewModel {
public readonly displayName$: Behavior<string>, public readonly displayName$: Behavior<string>,
) { ) {
super(); super();
const audio$ = observeTrackReference$(participant$, audioSource).behavior(
this.scope, const audio$ = this.observeTrackReference$(audioSource);
); this.video$ = this.observeTrackReference$(videoSource);
this.video$ = observeTrackReference$(participant$, videoSource).behavior(
this.scope,
);
this.unencryptedWarning$ = combineLatest( this.unencryptedWarning$ = combineLatest(
[audio$, this.video$], [audio$, this.video$],
(a, v) => (a, v) =>
@@ -466,7 +468,7 @@ export class LocalUserMediaViewModel extends BaseUserMediaViewModel {
public constructor( public constructor(
id: string, id: string,
member: RoomMember | undefined, member: RoomMember | undefined,
participant$: Observable<LocalParticipant | undefined>, participant$: Behavior<LocalParticipant | undefined>,
encryptionSystem: EncryptionSystem, encryptionSystem: EncryptionSystem,
livekitRoom: LivekitRoom, livekitRoom: LivekitRoom,
displayName$: Behavior<string>, displayName$: Behavior<string>,

View File

@@ -243,7 +243,7 @@ export async function withLocalMedia(
const vm = new LocalUserMediaViewModel( const vm = new LocalUserMediaViewModel(
"local", "local",
mockMatrixRoomMember(localRtcMember, roomMember), mockMatrixRoomMember(localRtcMember, roomMember),
of(localParticipant), constant(localParticipant),
{ {
kind: E2eeType.PER_PARTICIPANT, kind: E2eeType.PER_PARTICIPANT,
}, },
@@ -331,7 +331,7 @@ export class MockRTCSession extends TypedEventEmitter<
} }
public withMemberships( public withMemberships(
rtcMembers$: Observable<Partial<CallMembership>[]>, rtcMembers$: Behavior<Partial<CallMembership>[]>,
): MockRTCSession { ): MockRTCSession {
rtcMembers$.subscribe((m) => { rtcMembers$.subscribe((m) => {
const old = this.memberships; const old = this.memberships;