2023-12-01 17:43:09 -05:00
|
|
|
/*
|
2024-09-06 10:22:13 +02:00
|
|
|
Copyright 2023, 2024 New Vector Ltd.
|
2023-12-01 17:43:09 -05:00
|
|
|
|
2025-02-18 17:59:58 +00:00
|
|
|
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
2024-09-06 10:22:13 +02:00
|
|
|
Please see LICENSE in the repository root for full details.
|
2023-12-01 17:43:09 -05:00
|
|
|
*/
|
|
|
|
|
|
2024-12-11 09:27:55 +00:00
|
|
|
import { type Observable, defer, finalize, scan, startWith, tap } from "rxjs";
|
2023-12-01 17:43:09 -05:00
|
|
|
|
|
|
|
|
const nothing = Symbol("nothing");
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* RxJS operator that invokes a callback when the Observable is finalized,
|
|
|
|
|
* passing the most recently emitted value. If no value was emitted, the
|
|
|
|
|
* callback will not be invoked.
|
|
|
|
|
*/
|
|
|
|
|
export function finalizeValue<T>(callback: (finalValue: T) => void) {
|
2024-12-17 04:01:56 +00:00
|
|
|
return (source$: Observable<T>): Observable<T> =>
|
2023-12-01 17:43:09 -05:00
|
|
|
defer(() => {
|
|
|
|
|
let finalValue: T | typeof nothing = nothing;
|
2024-12-17 04:01:56 +00:00
|
|
|
return source$.pipe(
|
2023-12-01 17:43:09 -05:00
|
|
|
tap((value) => (finalValue = value)),
|
|
|
|
|
finalize(() => {
|
|
|
|
|
if (finalValue !== nothing) callback(finalValue);
|
|
|
|
|
}),
|
|
|
|
|
);
|
|
|
|
|
});
|
|
|
|
|
}
|
2024-07-03 15:08:30 -04:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* RxJS operator that accumulates a state from a source of events. This is like
|
|
|
|
|
* scan, except it emits an initial value immediately before any events arrive.
|
|
|
|
|
*/
|
|
|
|
|
export function accumulate<State, Event>(
|
|
|
|
|
initial: State,
|
|
|
|
|
update: (state: State, event: Event) => State,
|
|
|
|
|
) {
|
2024-12-17 04:01:56 +00:00
|
|
|
return (events$: Observable<Event>): Observable<State> =>
|
|
|
|
|
events$.pipe(scan(update, initial), startWith(initial));
|
2024-07-03 15:08:30 -04:00
|
|
|
}
|
2025-06-20 12:37:25 -04:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Reads the current value of a state Observable without reacting to future
|
|
|
|
|
* changes.
|
|
|
|
|
*
|
|
|
|
|
* This function exists to help with certain cases of bridging Observables into
|
|
|
|
|
* React, where an initial value is needed. You should never use it to create an
|
|
|
|
|
* Observable derived from another Observable; use reactive operators instead.
|
|
|
|
|
*/
|
|
|
|
|
export function getValue<T>(state$: Observable<T>): T {
|
|
|
|
|
let value: T | typeof nothing = nothing;
|
|
|
|
|
state$.subscribe((x) => (value = x)).unsubscribe();
|
|
|
|
|
if (value === nothing) throw new Error("Not a state Observable");
|
|
|
|
|
return value;
|
|
|
|
|
}
|