/*
Copyright 2024 New Vector Ltd.

SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/

import {
  BehaviorSubject,
  distinctUntilChanged,
  type Observable,
  Subject,
  takeUntil,
} from "rxjs";

import { type Behavior } from "./Behavior";

/**
 * An operator that maps an Observable to another Observable of the same
 * element type.
 */
type MonoTypeOperator = <T>(o: Observable<T>) => Observable<T>;

/**
 * Sentinel marking "no initial value was supplied". A unique symbol is used so
 * that it can never collide with a legitimate value of the Behavior (including
 * `undefined` and `null`).
 */
const nothing = Symbol("nothing");

/**
 * A scope which limits the execution lifetime of its bound Observables.
 */
export class ObservableScope {
  /** Emits once (then completes) when the scope is ended via `end()`. */
  private readonly ended$ = new Subject<void>();

  /** Shared operator instance handed out by `bind()`. */
  private readonly bindImpl: MonoTypeOperator = takeUntil(this.ended$);

  /**
   * Binds an Observable to this scope, so that it completes when the scope
   * ends.
   *
   * @returns An operator to be used inside `pipe(...)`.
   */
  public bind(): MonoTypeOperator {
    return this.bindImpl;
  }

  /**
   * Converts an Observable to a Behavior. If no initial value is specified, the
   * Observable must synchronously emit an initial value.
   *
   * @param setValue$ - The Observable whose (distinct) values become the
   *   Behavior's value. It is subscribed to immediately and bound to this scope.
   * @param initialValue - Optional value the Behavior holds until `setValue$`
   *   first emits.
   * @returns A Behavior reflecting the latest value of `setValue$`.
   * @throws Error if no initial value was given and `setValue$` did not emit
   *   synchronously upon subscription.
   */
  public behavior<T>(
    setValue$: Observable<T>,
    initialValue: T | typeof nothing = nothing,
  ): Behavior<T> {
    const subject$ = new BehaviorSubject<T | typeof nothing>(initialValue);

    // Push values from the Observable into the BehaviorSubject.
    // BehaviorSubjects have an undesirable feature where if you call 'complete',
    // they will no longer re-emit their current value upon subscription. We want
    // to support Observables that complete (for example `of({})`), so we have to
    // take care to not propagate the completion event: no `complete` handler
    // is registered below.
    const subscription = setValue$
      .pipe(this.bind(), distinctUntilChanged())
      .subscribe({
        next(value) {
          subject$.next(value);
        },
        error(err: unknown) {
          subject$.error(err);
        },
      });

    if (subject$.value === nothing) {
      // Nobody will ever receive this subject, so don't leave the source
      // subscribed until the scope ends (which may be never).
      subscription.unsubscribe();
      throw new Error("Behavior failed to synchronously emit an initial value");
    }

    // The sentinel has been ruled out above, so the subject now only ever
    // holds values of type T.
    return subject$ as Behavior<T>;
  }

  /**
   * Ends the scope, causing any bound Observables to complete.
   */
  public end(): void {
    this.ended$.next();
    this.ended$.complete();
  }
}

/**
 * The global scope, a scope which never ends.
 */
export const globalScope = new ObservableScope();