Replace generateKeyed$ with a redesigned generateItems operator
And use it to clean up a number of code smells, fix some reactivity bugs, and avoid some resource leaks.
This commit is contained in:
@@ -9,7 +9,7 @@ import { test } from "vitest";
|
||||
import { Subject } from "rxjs";
|
||||
|
||||
import { withTestScheduler } from "./test";
|
||||
import { generateKeyed$, pauseWhen } from "./observable";
|
||||
import { generateItems, pauseWhen } from "./observable";
|
||||
|
||||
test("pauseWhen", () => {
|
||||
withTestScheduler(({ behavior, expectObservable }) => {
|
||||
@@ -24,7 +24,7 @@ test("pauseWhen", () => {
|
||||
});
|
||||
});
|
||||
|
||||
test("generateKeyed$ has the right output and ends scopes at the right times", () => {
|
||||
test("generateItems", () => {
|
||||
const scope1$ = new Subject<string>();
|
||||
const scope2$ = new Subject<string>();
|
||||
const scope3$ = new Subject<string>();
|
||||
@@ -44,18 +44,27 @@ test("generateKeyed$ has the right output and ends scopes at the right times", (
|
||||
const scope4Marbles = " ----yn";
|
||||
|
||||
expectObservable(
|
||||
generateKeyed$(hot<string>(inputMarbles), (input, createOrGet) => {
|
||||
for (let i = 1; i <= +input; i++) {
|
||||
createOrGet(i.toString(), (scope) => {
|
||||
hot<string>(inputMarbles).pipe(
|
||||
generateItems(
|
||||
function* (input) {
|
||||
for (let i = 1; i <= +input; i++) {
|
||||
yield { keys: [i], data: undefined };
|
||||
}
|
||||
},
|
||||
(scope, data$, i) => {
|
||||
scopeSubjects[i - 1].next("y");
|
||||
scope.onEnd(() => scopeSubjects[i - 1].next("n"));
|
||||
return i.toString();
|
||||
});
|
||||
}
|
||||
return "abcd"[+input - 1];
|
||||
}),
|
||||
},
|
||||
),
|
||||
),
|
||||
subscriptionMarbles,
|
||||
).toBe(outputMarbles);
|
||||
).toBe(outputMarbles, {
|
||||
a: ["1"],
|
||||
b: ["1", "2"],
|
||||
c: ["1", "2", "3"],
|
||||
d: ["1", "2", "3", "4"],
|
||||
});
|
||||
|
||||
expectObservable(scope1$).toBe(scope1Marbles);
|
||||
expectObservable(scope2$).toBe(scope2Marbles);
|
||||
|
||||
@@ -20,10 +20,12 @@ import {
|
||||
takeWhile,
|
||||
tap,
|
||||
withLatestFrom,
|
||||
BehaviorSubject,
|
||||
type OperatorFunction,
|
||||
} from "rxjs";
|
||||
|
||||
import { type Behavior } from "../state/Behavior";
|
||||
import { ObservableScope } from "../state/ObservableScope";
|
||||
import { Epoch, ObservableScope } from "../state/ObservableScope";
|
||||
|
||||
const nothing = Symbol("nothing");
|
||||
|
||||
@@ -119,70 +121,156 @@ export function pauseWhen<T>(pause$: Behavior<boolean>) {
|
||||
);
|
||||
}
|
||||
|
||||
interface ItemHandle<Data, Item> {
|
||||
scope: ObservableScope;
|
||||
data$: BehaviorSubject<Data>;
|
||||
item: Item;
|
||||
}
|
||||
|
||||
/**
|
||||
* Maps a changing input value to an output value consisting of items that have
|
||||
* automatically generated ObservableScopes tied to a key. Items will be
|
||||
* automatically created when their key is requested for the first time, reused
|
||||
* when the same key is requested at a later time, and destroyed (have their
|
||||
* scope ended) when the key is no longer requested.
|
||||
* Maps a changing input value to a collection of items that each capture some
|
||||
* dynamic data and are tied to a key. Items will be automatically created when
|
||||
* their key is requested for the first time, reused when the same key is
|
||||
* requested at a later time, and destroyed (have their scope ended) when the
|
||||
* key is no longer requested.
|
||||
*
|
||||
* @param input$ The input value to be mapped.
|
||||
* @param project A function mapping input values to output values. This
|
||||
* function receives an additional callback `createOrGet` which can be used
|
||||
* within the function body to request that an item be generated for a certain
|
||||
* key. The caller provides a factory which will be used to create the item if
|
||||
* it is being requested for the first time. Otherwise, the item previously
|
||||
* existing under that key will be returned.
|
||||
* @param generator A generator function yielding a tuple of keys and the
|
||||
* currently associated data for each item that it wants to exist.
|
||||
* @param factory A function constructing an individual item, given the item's key,
|
||||
* dynamic data, and an automatically managed ObservableScope for the item.
|
||||
*/
|
||||
export function generateKeyed$<In, Item, Out>(
|
||||
input$: Observable<In>,
|
||||
project: (
|
||||
input: In,
|
||||
createOrGet: (
|
||||
key: string,
|
||||
factory: (scope: ObservableScope) => Item,
|
||||
) => Item,
|
||||
) => Out,
|
||||
): Observable<Out> {
|
||||
return input$.pipe(
|
||||
// Keep track of the existing items over time, so we can reuse them
|
||||
scan<
|
||||
In,
|
||||
{
|
||||
items: Map<string, { item: Item; scope: ObservableScope }>;
|
||||
output: Out;
|
||||
},
|
||||
{ items: Map<string, { item: Item; scope: ObservableScope }> }
|
||||
>(
|
||||
(state, data) => {
|
||||
const nextItems = new Map<
|
||||
string,
|
||||
{ item: Item; scope: ObservableScope }
|
||||
>();
|
||||
export function generateItems<
|
||||
Input,
|
||||
Keys extends [unknown, ...unknown[]],
|
||||
Data,
|
||||
Item,
|
||||
>(
|
||||
generator: (
|
||||
input: Input,
|
||||
) => Generator<{ keys: readonly [...Keys]; data: Data }, void, void>,
|
||||
factory: (
|
||||
scope: ObservableScope,
|
||||
data$: Behavior<Data>,
|
||||
...keys: Keys
|
||||
) => Item,
|
||||
): OperatorFunction<Input, Item[]> {
|
||||
return generateItemsInternal(generator, factory, (items) => items);
|
||||
}
|
||||
|
||||
const output = project(data, (key, factory) => {
|
||||
let item = state.items.get(key);
|
||||
if (item === undefined) {
|
||||
// First time requesting the key; create the item
|
||||
const scope = new ObservableScope();
|
||||
item = { item: factory(scope), scope };
|
||||
}
|
||||
nextItems.set(key, item);
|
||||
return item.item;
|
||||
});
|
||||
|
||||
// Destroy all items that are no longer being requested
|
||||
for (const [key, { scope }] of state.items)
|
||||
if (!nextItems.has(key)) scope.end();
|
||||
|
||||
return { items: nextItems, output };
|
||||
},
|
||||
{ items: new Map() },
|
||||
),
|
||||
finalizeValue((state) => {
|
||||
// Destroy all remaining items when no longer subscribed
|
||||
for (const { scope } of state.items.values()) scope.end();
|
||||
}),
|
||||
map(({ output }) => output),
|
||||
/**
|
||||
* Same as generateItems, but preserves epoch data.
|
||||
*/
|
||||
export function generateItemsWithEpoch<
|
||||
Input,
|
||||
Keys extends [unknown, ...unknown[]],
|
||||
Data,
|
||||
Item,
|
||||
>(
|
||||
generator: (
|
||||
input: Input,
|
||||
) => Generator<{ keys: readonly [...Keys]; data: Data }, void, void>,
|
||||
factory: (
|
||||
scope: ObservableScope,
|
||||
data$: Behavior<Data>,
|
||||
...keys: Keys
|
||||
) => Item,
|
||||
): OperatorFunction<Epoch<Input>, Epoch<Item[]>> {
|
||||
return generateItemsInternal(
|
||||
function* (input) {
|
||||
yield* generator(input.value);
|
||||
},
|
||||
factory,
|
||||
(items, input) => new Epoch(items, input.epoch),
|
||||
);
|
||||
}
|
||||
|
||||
function generateItemsInternal<
|
||||
Input,
|
||||
Keys extends [unknown, ...unknown[]],
|
||||
Data,
|
||||
Item,
|
||||
Output,
|
||||
>(
|
||||
generator: (
|
||||
input: Input,
|
||||
) => Generator<{ keys: readonly [...Keys]; data: Data }, void, void>,
|
||||
factory: (
|
||||
scope: ObservableScope,
|
||||
data$: Behavior<Data>,
|
||||
...keys: Keys
|
||||
) => Item,
|
||||
project: (items: Item[], input: Input) => Output,
|
||||
): OperatorFunction<Input, Output> {
|
||||
/* eslint-disable @typescript-eslint/no-explicit-any */
|
||||
return (input$) =>
|
||||
input$.pipe(
|
||||
// Keep track of the existing items over time, so they can persist
|
||||
scan<
|
||||
Input,
|
||||
{
|
||||
map: Map<any, any>;
|
||||
items: Set<ItemHandle<Data, Item>>;
|
||||
input: Input;
|
||||
},
|
||||
{ map: Map<any, any>; items: Set<ItemHandle<Data, Item>> }
|
||||
>(
|
||||
({ map: prevMap, items: prevItems }, input) => {
|
||||
const nextMap = new Map();
|
||||
const nextItems = new Set<ItemHandle<Data, Item>>();
|
||||
|
||||
for (const { keys, data } of generator(input)) {
|
||||
// Disable type checks for a second to grab the item out of a nested map
|
||||
let i: any = prevMap;
|
||||
for (const key of keys) i = i?.get(key);
|
||||
let item = i as ItemHandle<Data, Item> | undefined;
|
||||
|
||||
if (item === undefined) {
|
||||
// First time requesting the key; create the item
|
||||
const scope = new ObservableScope();
|
||||
const data$ = new BehaviorSubject(data);
|
||||
item = { scope, data$, item: factory(scope, data$, ...keys) };
|
||||
} else {
|
||||
item.data$.next(data);
|
||||
}
|
||||
|
||||
// Likewise, disable type checks to insert the item in the nested map
|
||||
let m: Map<any, any> = nextMap;
|
||||
for (let i = 0; i < keys.length - 1; i++) {
|
||||
let inner = m.get(keys[i]);
|
||||
if (inner === undefined) {
|
||||
inner = new Map();
|
||||
m.set(keys[i], inner);
|
||||
}
|
||||
m = inner;
|
||||
}
|
||||
const finalKey = keys[keys.length - 1];
|
||||
if (m.has(finalKey))
|
||||
throw new Error(
|
||||
`Keys must be unique (tried to generate multiple items for key ${keys})`,
|
||||
);
|
||||
m.set(keys[keys.length - 1], item);
|
||||
nextItems.add(item);
|
||||
}
|
||||
|
||||
// Destroy all items that are no longer being requested
|
||||
for (const item of prevItems)
|
||||
if (!nextItems.has(item)) item.scope.end();
|
||||
|
||||
return { map: nextMap, items: nextItems, input };
|
||||
},
|
||||
{ map: new Map(), items: new Set() },
|
||||
),
|
||||
finalizeValue(({ items }) => {
|
||||
// Destroy all remaining items when no longer subscribed
|
||||
for (const { scope } of items) scope.end();
|
||||
}),
|
||||
map(({ items, input }) =>
|
||||
project(
|
||||
[...items].map(({ item }) => item),
|
||||
input,
|
||||
),
|
||||
),
|
||||
);
|
||||
/* eslint-enable @typescript-eslint/no-explicit-any */
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ Please see LICENSE in the repository root for full details.
|
||||
*/
|
||||
|
||||
import { type CallMembership } from "matrix-js-sdk/lib/matrixrtc";
|
||||
import { BehaviorSubject, of } from "rxjs";
|
||||
import { BehaviorSubject } from "rxjs";
|
||||
import { vitest } from "vitest";
|
||||
import { type RelationsContainer } from "matrix-js-sdk/lib/models/relations-container";
|
||||
import EventEmitter from "events";
|
||||
@@ -158,7 +158,7 @@ export function getBasicCallViewModelEnvironment(
|
||||
},
|
||||
handRaisedSubject$,
|
||||
reactionsSubject$,
|
||||
of({ processor: undefined, supported: false }),
|
||||
constant({ processor: undefined, supported: false }),
|
||||
);
|
||||
return {
|
||||
vm,
|
||||
|
||||
@@ -304,18 +304,20 @@ export function createLocalMedia(
|
||||
localParticipant: LocalParticipant,
|
||||
mediaDevices: MediaDevices,
|
||||
): LocalUserMediaViewModel {
|
||||
const member = mockMatrixRoomMember(localRtcMember, roomMember);
|
||||
return new LocalUserMediaViewModel(
|
||||
testScope(),
|
||||
"local",
|
||||
mockMatrixRoomMember(localRtcMember, roomMember),
|
||||
member.userId,
|
||||
constant(localParticipant),
|
||||
{
|
||||
kind: E2eeType.PER_PARTICIPANT,
|
||||
},
|
||||
mockLivekitRoom({ localParticipant }),
|
||||
"https://rtc-example.org",
|
||||
constant(mockLivekitRoom({ localParticipant })),
|
||||
constant("https://rtc-example.org"),
|
||||
mediaDevices,
|
||||
constant(roomMember.rawDisplayName ?? "nodisplayname"),
|
||||
constant(member.rawDisplayName ?? "nodisplayname"),
|
||||
constant(member.getMxcAvatarUrl()),
|
||||
constant(null),
|
||||
constant(null),
|
||||
);
|
||||
@@ -339,19 +341,23 @@ export function createRemoteMedia(
|
||||
roomMember: Partial<RoomMember>,
|
||||
participant: Partial<RemoteParticipant>,
|
||||
): RemoteUserMediaViewModel {
|
||||
const member = mockMatrixRoomMember(localRtcMember, roomMember);
|
||||
const remoteParticipant = mockRemoteParticipant(participant);
|
||||
return new RemoteUserMediaViewModel(
|
||||
testScope(),
|
||||
"remote",
|
||||
mockMatrixRoomMember(localRtcMember, roomMember),
|
||||
member.userId,
|
||||
of(remoteParticipant),
|
||||
{
|
||||
kind: E2eeType.PER_PARTICIPANT,
|
||||
},
|
||||
mockLivekitRoom({}, { remoteParticipants$: of([remoteParticipant]) }),
|
||||
"https://rtc-example.org",
|
||||
constant(
|
||||
mockLivekitRoom({}, { remoteParticipants$: of([remoteParticipant]) }),
|
||||
),
|
||||
constant("https://rtc-example.org"),
|
||||
constant(false),
|
||||
constant(roomMember.rawDisplayName ?? "nodisplayname"),
|
||||
constant(member.rawDisplayName ?? "nodisplayname"),
|
||||
constant(member.getMxcAvatarUrl()),
|
||||
constant(null),
|
||||
constant(null),
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user