fix playwright tests

This commit is contained in:
Timo K
2025-12-10 18:50:19 +01:00
parent ef2f53c38a
commit 6efce232f8
4 changed files with 129 additions and 80 deletions

View File

@@ -42,6 +42,7 @@ import { type Publisher } from "./Publisher.ts";
import { type MuteStates } from "../../MuteStates.ts";
import {
ElementCallError,
FailToStartLivekitConnection,
MembershipManagerError,
UnknownCallError,
} from "../../../utils/errors.ts";
@@ -56,6 +57,7 @@ import {
type FailedToStartError,
} from "../remoteMembers/Connection.ts";
import { type HomeserverConnected } from "./HomeserverConnected.ts";
import { and$ } from "../../../utils/observable.ts";
export enum TransportState {
/** Not even a transport is available to the LocalMembership */
@@ -86,13 +88,12 @@ export type LocalMemberMediaState =
}
| PublishState
| ElementCallError;
export type LocalMemberMatrixState = Error | RTCSessionStatus;
export type LocalMemberState =
| ElementCallError
| TransportState.Waiting
| {
media: LocalMemberMediaState;
matrix: LocalMemberMatrixState;
matrix: ElementCallError | RTCSessionStatus;
};
/*
@@ -220,10 +221,6 @@ export const createLocalMembership$ = ({
),
);
const localConnectionState$ = localConnection$.pipe(
switchMap((connection) => (connection ? connection.state$ : of(null))),
);
// MATRIX RELATED
// This should be used in a combineLatest with publisher$ to connect.
@@ -308,23 +305,27 @@ export const createLocalMembership$ = ({
try {
await publisher?.startPublishing();
} catch (error) {
setMediaError(error as ElementCallError);
const message =
error instanceof Error ? error.message : String(error);
setPublishError(new FailToStartLivekitConnection(message));
}
} else if (tracks.length !== 0 && !shouldJoinAndPublish) {
try {
await publisher?.stopPublishing();
} catch (error) {
setMediaError(new UnknownCallError(error as Error));
setPublishError(new UnknownCallError(error as Error));
}
}
},
);
const fatalMediaError$ = new BehaviorSubject<ElementCallError | null>(null);
const setMediaError = (e: ElementCallError): void => {
if (fatalMediaError$.value !== null)
logger.error("Multiple Media Errors:", e);
else fatalMediaError$.next(e);
// STATE COMPUTATION
// These are non fatal since we can join a room and concume media even though publishing failed.
const publishError$ = new BehaviorSubject<ElementCallError | null>(null);
const setPublishError = (e: ElementCallError): void => {
if (publishError$.value !== null) logger.error("Multiple Media Errors:", e);
else publishError$.next(e);
};
const fatalTransportError$ = new BehaviorSubject<ElementCallError | null>(
@@ -336,6 +337,10 @@ export const createLocalMembership$ = ({
else fatalTransportError$.next(e);
};
const localConnectionState$ = localConnection$.pipe(
switchMap((connection) => (connection ? connection.state$ : of(null))),
);
const mediaState$: Behavior<LocalMemberMediaState> = scope.behavior(
combineLatest([
localConnectionState$,
@@ -392,22 +397,22 @@ export const createLocalMembership$ = ({
homeserverConnected.rtsSession$,
fatalMatrixError$,
fatalTransportError$,
fatalMediaError$,
publishError$,
]).pipe(
map(
([
mediaState,
rtcSessionStatus,
matrixError,
transportError,
mediaError,
fatalMatrixError,
fatalTransportError,
publishError,
]) => {
if (transportError !== null) return transportError;
// `mediaState` will be 'null' until the transport appears.
if (fatalTransportError !== null) return fatalTransportError;
// `mediaState` will be 'null' until the transport/connection appears.
if (mediaState && rtcSessionStatus)
return {
matrix: matrixError ?? rtcSessionStatus,
media: mediaError ?? mediaState,
matrix: fatalMatrixError ?? rtcSessionStatus,
media: publishError ?? mediaState,
};
return TransportState.Waiting;
},
@@ -415,6 +420,31 @@ export const createLocalMembership$ = ({
),
);
/**
* Whether we are "fully" connected to the call. Accounts for both the
* connection to the MatrixRTC session and the LiveKit publish connection.
*/
const matrixAndLivekitConnected$ = scope.behavior(
and$(
homeserverConnected.combined$,
localConnectionState$.pipe(
map((state) => state === ConnectionState.LivekitConnected),
),
).pipe(
tap((v) => logger.debug("livekit+matrix: Connected state changed", v)),
),
);
/**
* Whether we should tell the user that we're reconnecting to the call.
*/
const reconnecting$ = scope.behavior(
matrixAndLivekitConnected$.pipe(
pairwise(),
map(([prev, current]) => prev === true && current === false),
),
);
// inform the widget about the connect and disconnect intent from the user.
scope
.behavior(joinAndPublishRequested$.pipe(pairwise(), scope.bind()), [
@@ -576,15 +606,7 @@ export const createLocalMembership$ = ({
localMemberState$,
tracks$,
participant$,
reconnecting$: scope.behavior(
localMemberState$.pipe(
map((state) => {
if (typeof state === "object" && "matrix" in state)
return state.matrix === RTCSessionStatus.Reconnecting;
return false;
}),
),
),
reconnecting$,
disconnected$: scope.behavior(
homeserverConnected.rtsSession$.pipe(
map((state) => state === RTCSessionStatus.Disconnected),

View File

@@ -32,15 +32,8 @@ import {
} from "../../../livekit/TrackProcessorContext.tsx";
import { getUrlParams } from "../../../UrlParams.ts";
import { observeTrackReference$ } from "../../MediaViewModel.ts";
import {
ConnectionState,
type Connection,
} from "../remoteMembers/Connection.ts";
import { type Connection } from "../remoteMembers/Connection.ts";
import { type ObservableScope } from "../../ObservableScope.ts";
import {
ElementCallError,
FailToStartLivekitConnection,
} from "../../../utils/errors.ts";
/**
* A wrapper for a Connection object.
@@ -160,27 +153,29 @@ export class Publisher {
public async startPublishing(): Promise<LocalTrack[]> {
this.logger.debug("startPublishing called");
const lkRoom = this.connection.livekitRoom;
const { promise, resolve, reject } = Promise.withResolvers<void>();
const sub = this.connection.state$.subscribe((state) => {
if (state instanceof Error) {
const error =
state instanceof ElementCallError
? state
: new FailToStartLivekitConnection(state.message);
reject(error);
} else if (state === ConnectionState.LivekitConnected) {
resolve();
} else {
this.logger.info("waiting for connection: ", state);
}
});
try {
await promise;
} catch (e) {
throw e;
} finally {
sub.unsubscribe();
}
// we do not need to do this since lk will wait in `localParticipant.publishTrack`
// const { promise, resolve, reject } = Promise.withResolvers<void>();
// const sub = this.connection.state$.subscribe((state) => {
// if (state instanceof Error) {
// const error =
// state instanceof ElementCallError
// ? state
// : new FailToStartLivekitConnection(state.message);
// reject(error);
// } else if (state === ConnectionState.LivekitConnected) {
// resolve();
// } else {
// this.logger.info("waiting for connection: ", state);
// }
// });
// try {
// await promise;
// } catch (e) {
// throw e;
// } finally {
// sub.unsubscribe();
// }
for (const track of this.tracks$.value) {
this.logger.info("publish ", this.tracks$.value.length, "tracks");
@@ -188,9 +183,10 @@ export class Publisher {
// with a timeout.
await lkRoom.localParticipant.publishTrack(track).catch((error) => {
this.logger.error("Failed to publish track", error);
throw new FailToStartLivekitConnection(
error instanceof Error ? error.message : error,
);
// throw new FailToStartLivekitConnection(
// error instanceof Error ? error.message : error,
// );
throw error;
});
this.logger.info("published track ", track.kind, track.id);