Move 'behavior' to be a method on ObservableScope

This commit is contained in:
Robin
2025-07-12 00:20:44 -04:00
parent 32bf1c30d2
commit 2b76d3dd70
7 changed files with 406 additions and 402 deletions

View File

@@ -5,9 +5,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/
import { BehaviorSubject, distinctUntilChanged, Observable } from "rxjs";
import { type ObservableScope } from "./ObservableScope";
import { BehaviorSubject } from "rxjs";
/**
* A stateful, read-only reactive value. As an Observable, it is "hot" and
@@ -26,38 +24,3 @@ export type Behavior<T> = Omit<BehaviorSubject<T>, "next" | "observers">;
export function constant<T>(value: T): Behavior<T> {
return new BehaviorSubject(value);
}
declare module "rxjs" {
interface Observable<T> {
/**
* Converts this Observable into a Behavior. This requires the Observable to
* synchronously emit an initial value.
*/
behavior(scope: ObservableScope): Behavior<T>;
}
}
const nothing = Symbol("nothing");
Observable.prototype.behavior = function <T>(
this: Observable<T>,
scope: ObservableScope,
): Behavior<T> {
const subject$ = new BehaviorSubject<T | typeof nothing>(nothing);
// Push values from the Observable into the BehaviorSubject.
// BehaviorSubjects have an undesirable feature where if you call 'complete',
// they will no longer re-emit their current value upon subscription. We want
// to support Observables that complete (for example `of({})`), so we have to
// take care to not propagate the completion event.
this.pipe(scope.bind(), distinctUntilChanged()).subscribe({
next(value) {
subject$.next(value);
},
error(err) {
subject$.error(err);
},
});
if (subject$.value === nothing)
throw new Error("Behavior failed to synchronously emit an initial value");
return subject$ as Behavior<T>;
};