/* Copyright 2024 New Vector Ltd. SPDX-License-Identifier: AGPL-3.0-only Please see LICENSE in the repository root for full details. */ import { distinctUntilChanged, type Observable, shareReplay, Subject, takeUntil, } from "rxjs"; type MonoTypeOperator = (o: Observable) => Observable; /** * A scope which limits the execution lifetime of its bound Observables. */ export class ObservableScope { private readonly ended = new Subject(); private readonly bindImpl: MonoTypeOperator = takeUntil(this.ended); /** * Binds an Observable to this scope, so that it completes when the scope * ends. */ public bind(): MonoTypeOperator { return this.bindImpl; } private readonly stateImpl: MonoTypeOperator = (o) => o.pipe( this.bind(), distinctUntilChanged(), shareReplay({ bufferSize: 1, refCount: false }), ); /** * Transforms an Observable into a hot state Observable which replays its * latest value upon subscription, skips updates with identical values, and * is bound to this scope. */ public state(): MonoTypeOperator { return this.stateImpl; } /** * Ends the scope, causing any bound Observables to complete. */ public end(): void { this.ended.next(); this.ended.complete(); } }