Simplify and improve locality of the device name request logic
This commit is contained in:
@@ -5,7 +5,17 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
||||
Please see LICENSE in the repository root for full details.
|
||||
*/
|
||||
|
||||
import { type Observable, defer, finalize, scan, startWith, tap } from "rxjs";
|
||||
import {
|
||||
type Observable,
|
||||
concat,
|
||||
defer,
|
||||
finalize,
|
||||
map,
|
||||
scan,
|
||||
startWith,
|
||||
takeWhile,
|
||||
tap,
|
||||
} from "rxjs";
|
||||
|
||||
const nothing = Symbol("nothing");
|
||||
|
||||
@@ -39,6 +49,29 @@ export function accumulate<State, Event>(
|
||||
events$.pipe(scan(update, initial), startWith(initial));
|
||||
}
|
||||
|
||||
const switchSymbol = Symbol("switch");
|
||||
|
||||
/**
|
||||
* RxJS operator which behaves like the input Observable (A) until it emits a
|
||||
* value satisfying the given predicate, then behaves like Observable B.
|
||||
*
|
||||
* The switch is immediate; the value that triggers the switch will not be
|
||||
* present in the output.
|
||||
*/
|
||||
export function switchWhen<A, B>(
|
||||
predicate: (a: A, index: number) => boolean,
|
||||
b$: Observable<B>,
|
||||
) {
|
||||
return (a$: Observable<A>): Observable<A | B> =>
|
||||
concat(
|
||||
a$.pipe(
|
||||
map((a, index) => (predicate(a, index) ? switchSymbol : a)),
|
||||
takeWhile((a) => a !== switchSymbol),
|
||||
) as Observable<A>,
|
||||
b$,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads the current value of a state Observable without reacting to future
|
||||
* changes.
|
||||
|
||||
Reference in New Issue
Block a user