Refactor local membership publisher and connectionState related logic
This commit is contained in:
@@ -11,6 +11,7 @@ import {
|
||||
ParticipantEvent,
|
||||
type LocalParticipant,
|
||||
type ScreenShareCaptureOptions,
|
||||
ConnectionState,
|
||||
} from "livekit-client";
|
||||
import { observeParticipantEvents } from "@livekit/components-core";
|
||||
import {
|
||||
@@ -22,64 +23,83 @@ import {
|
||||
catchError,
|
||||
combineLatest,
|
||||
distinctUntilChanged,
|
||||
from,
|
||||
map,
|
||||
type Observable,
|
||||
of,
|
||||
scan,
|
||||
startWith,
|
||||
switchMap,
|
||||
tap,
|
||||
} from "rxjs";
|
||||
import { type Logger } from "matrix-js-sdk/lib/logger";
|
||||
import { deepCompare } from "matrix-js-sdk/lib/utils";
|
||||
|
||||
import { type Behavior } from "../../Behavior";
|
||||
import { constant, type Behavior } from "../../Behavior";
|
||||
import { type IConnectionManager } from "../remoteMembers/ConnectionManager";
|
||||
import { ObservableScope } from "../../ObservableScope";
|
||||
import { type Publisher } from "./Publisher";
|
||||
import { type MuteStates } from "../../MuteStates";
|
||||
import { and$ } from "../../../utils/observable";
|
||||
import { ElementCallError, UnknownCallError } from "../../../utils/errors";
|
||||
import {
|
||||
ElementCallError,
|
||||
MembershipManagerError,
|
||||
UnknownCallError,
|
||||
} from "../../../utils/errors";
|
||||
import { ElementWidgetActions, widget } from "../../../widget";
|
||||
import { getUrlParams } from "../../../UrlParams.ts";
|
||||
import { PosthogAnalytics } from "../../../analytics/PosthogAnalytics.ts";
|
||||
import { MatrixRTCMode } from "../../../settings/settings.ts";
|
||||
import { Config } from "../../../config/Config.ts";
|
||||
import {
|
||||
type Connection,
|
||||
type ConnectionState,
|
||||
} from "../remoteMembers/Connection.ts";
|
||||
import { type Connection } from "../remoteMembers/Connection.ts";
|
||||
|
||||
export enum LivekitState {
|
||||
Uninitialized = "uninitialized",
|
||||
Connecting = "connecting",
|
||||
Connected = "connected",
|
||||
export enum RTCBackendState {
|
||||
Error = "error",
|
||||
/** Not even a transport is available to the LocalMembership */
|
||||
WaitingForTransport = "waiting_for_transport",
|
||||
/** A connection appeared so we can initialise the publisher */
|
||||
WaitingForConnection = "waiting_for_connection",
|
||||
/** Connection and transport arrived, publisher Initialized */
|
||||
Initialized = "Initialized",
|
||||
CreatingTracks = "creating_tracks",
|
||||
ReadyToPublish = "ready_to_publish",
|
||||
WaitingToPublish = "waiting_to_publish",
|
||||
Connected = "connected",
|
||||
Disconnected = "disconnected",
|
||||
Disconnecting = "disconnecting",
|
||||
}
|
||||
|
||||
type LocalMemberLivekitState =
|
||||
| { state: LivekitState.Error; error: ElementCallError }
|
||||
| { state: LivekitState.Connected }
|
||||
| { state: LivekitState.Connecting }
|
||||
| { state: LivekitState.Uninitialized }
|
||||
| { state: LivekitState.Disconnected }
|
||||
| { state: LivekitState.Disconnecting };
|
||||
type LocalMemberRtcBackendState =
|
||||
| { state: RTCBackendState.Error; error: ElementCallError }
|
||||
| { state: RTCBackendState.WaitingForTransport }
|
||||
| { state: RTCBackendState.WaitingForConnection }
|
||||
| { state: RTCBackendState.Initialized }
|
||||
| { state: RTCBackendState.CreatingTracks }
|
||||
| { state: RTCBackendState.ReadyToPublish }
|
||||
| { state: RTCBackendState.WaitingToPublish }
|
||||
| { state: RTCBackendState.Connected }
|
||||
| { state: RTCBackendState.Disconnected }
|
||||
| { state: RTCBackendState.Disconnecting };
|
||||
|
||||
export enum MatrixState {
|
||||
WaitingForTransport = "waiting_for_transport",
|
||||
Ready = "ready",
|
||||
Connecting = "connecting",
|
||||
Connected = "connected",
|
||||
Disconnected = "disconnected",
|
||||
Connecting = "connecting",
|
||||
Error = "Error",
|
||||
}
|
||||
|
||||
type LocalMemberMatrixState =
|
||||
| { state: MatrixState.Connected }
|
||||
| { state: MatrixState.WaitingForTransport }
|
||||
| { state: MatrixState.Ready }
|
||||
| { state: MatrixState.Connecting }
|
||||
| { state: MatrixState.Disconnected }
|
||||
| { state: MatrixState.Error; error: Error };
|
||||
|
||||
export interface LocalMemberConnectionState {
|
||||
livekit$: Behavior<LocalMemberLivekitState>;
|
||||
livekit$: Behavior<LocalMemberRtcBackendState>;
|
||||
matrix$: Behavior<LocalMemberMatrixState>;
|
||||
}
|
||||
|
||||
@@ -102,7 +122,7 @@ interface Props {
|
||||
muteStates: MuteStates;
|
||||
connectionManager: IConnectionManager;
|
||||
createPublisherFactory: (connection: Connection) => Publisher;
|
||||
joinMatrixRTC: (trasnport: LivekitTransport) => Promise<void>;
|
||||
joinMatrixRTC: (transport: LivekitTransport) => Promise<void>;
|
||||
homeserverConnected$: Behavior<boolean>;
|
||||
localTransport$: Behavior<LivekitTransport | null>;
|
||||
matrixRTCSession: Pick<
|
||||
@@ -136,48 +156,38 @@ export const createLocalMembership$ = ({
|
||||
muteStates,
|
||||
matrixRTCSession,
|
||||
}: Props): {
|
||||
requestConnect: () => LocalMemberConnectionState;
|
||||
/**
|
||||
* This starts audio and video tracks. They will be reused when calling `requestConnect`.
|
||||
*/
|
||||
startTracks: () => Behavior<LocalTrack[]>;
|
||||
requestDisconnect: () => Observable<LocalMemberLivekitState> | null;
|
||||
/**
|
||||
* This sets a inner state (shouldConnect) to true and instructs the js-sdk and livekit to keep the user
|
||||
* connected to matrix and livekit.
|
||||
*/
|
||||
requestConnect: () => void;
|
||||
requestDisconnect: () => void;
|
||||
connectionState: LocalMemberConnectionState;
|
||||
sharingScreen$: Behavior<boolean>;
|
||||
/**
|
||||
* Callback to toggle screen sharing. If null, screen sharing is not possible.
|
||||
*/
|
||||
toggleScreenSharing: (() => void) | null;
|
||||
tracks$: Behavior<LocalTrack[]>;
|
||||
participant$: Behavior<LocalParticipant | null>;
|
||||
connection$: Behavior<Connection | null>;
|
||||
homeserverConnected$: Behavior<boolean>;
|
||||
// deprecated fields
|
||||
/** @deprecated use state instead*/
|
||||
connected$: Behavior<boolean>;
|
||||
// this needs to be discussed
|
||||
/** @deprecated use state instead*/
|
||||
/**
|
||||
* Whether various media/event sources should pretend to be disconnected from
|
||||
* all network input, even if their connection still technically works.
|
||||
* @deprecated use state instead
|
||||
*/
|
||||
reconnecting$: Behavior<boolean>;
|
||||
} => {
|
||||
const logger = parentLogger.getChild("[LocalMembership]");
|
||||
logger.debug(`Creating local membership..`);
|
||||
const state = {
|
||||
livekit$: new BehaviorSubject<LocalMemberLivekitState>({
|
||||
state: LivekitState.Uninitialized,
|
||||
}),
|
||||
matrix$: new BehaviorSubject<LocalMemberMatrixState>({
|
||||
state: MatrixState.Disconnected,
|
||||
}),
|
||||
};
|
||||
|
||||
// This should be used in a combineLatest with publisher$ to connect.
|
||||
// to make it possible to call startTracks before the preferredTransport$ has resolved.
|
||||
const trackStartRequested$ = new BehaviorSubject(false);
|
||||
|
||||
// This should be used in a combineLatest with publisher$ to connect.
|
||||
// to make it possible to call startTracks before the preferredTransport$ has resolved.
|
||||
const connectRequested$ = new BehaviorSubject(false);
|
||||
|
||||
// This should be used in a combineLatest with publisher$ to connect.
|
||||
const tracks$ = new BehaviorSubject<LocalTrack[]>([]);
|
||||
|
||||
// unwrap the local transport and set the state of the LocalMembership to error in case the transport is an error.
|
||||
// Unwrap the local transport and set the state of the LocalMembership to error in case the transport is an error.
|
||||
const localTransport$ = scope.behavior(
|
||||
localTransportCanThrow$.pipe(
|
||||
catchError((e: unknown) => {
|
||||
@@ -191,7 +201,7 @@ export const createLocalMembership$ = ({
|
||||
: new Error("Unknown error from localTransport"),
|
||||
);
|
||||
}
|
||||
state.livekit$.next({ state: LivekitState.Error, error });
|
||||
setLivekitError(error);
|
||||
return of(null);
|
||||
}),
|
||||
),
|
||||
@@ -203,12 +213,12 @@ export const createLocalMembership$ = ({
|
||||
connectionManager.connectionManagerData$,
|
||||
localTransport$,
|
||||
]).pipe(
|
||||
map(([connectionData, localTransport]) => {
|
||||
map(([{ value: connectionData }, localTransport]) => {
|
||||
if (localTransport === null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return connectionData.value.getConnectionForTransport(localTransport);
|
||||
return connectionData.getConnectionForTransport(localTransport);
|
||||
}),
|
||||
tap((connection) => {
|
||||
logger.info(
|
||||
@@ -218,50 +228,36 @@ export const createLocalMembership$ = ({
|
||||
),
|
||||
);
|
||||
|
||||
const localConnectionState$ = localConnection$.pipe(
|
||||
switchMap((connection) => (connection ? connection.state$ : of(null))),
|
||||
);
|
||||
|
||||
// /**
|
||||
// * Whether we are "fully" connected to the call. Accounts for both the
|
||||
// * connection to the MatrixRTC session and the LiveKit publish connection.
|
||||
// */
|
||||
// // TODO use this in combination with the MemberState.
|
||||
const connected$ = scope.behavior(
|
||||
and$(
|
||||
homeserverConnected$,
|
||||
localConnection$.pipe(
|
||||
switchMap((c) =>
|
||||
c
|
||||
? c.state$.pipe(map((state) => state.state === "ConnectedToLkRoom"))
|
||||
: of(false),
|
||||
),
|
||||
homeserverConnected$.pipe(
|
||||
tap((v) => logger.debug("matrix: Connected state changed", v)),
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
const publisher$ = new BehaviorSubject<Publisher | null>(null);
|
||||
localConnection$.pipe(scope.bind()).subscribe((connection) => {
|
||||
if (connection !== null && publisher$.value === null) {
|
||||
// TODO looks strange to not change publisher if connection changes.
|
||||
// @toger5 will take care of this!
|
||||
publisher$.next(createPublisherFactory(connection));
|
||||
}
|
||||
});
|
||||
|
||||
// const mutestate= publisher$.pipe(switchMap((publisher) => {
|
||||
// return publisher.muteState$
|
||||
// });
|
||||
|
||||
combineLatest([publisher$, trackStartRequested$]).subscribe(
|
||||
([publisher, shouldStartTracks]) => {
|
||||
if (publisher && shouldStartTracks) {
|
||||
publisher
|
||||
.createAndSetupTracks()
|
||||
.then((tracks) => {
|
||||
tracks$.next(tracks);
|
||||
})
|
||||
.catch((error) => {
|
||||
logger.error("Error creating tracks:", error);
|
||||
});
|
||||
}
|
||||
},
|
||||
localConnectionState$.pipe(
|
||||
switchMap((state) => {
|
||||
logger.debug("livekit: Connected state changed", state);
|
||||
if (!state) return of(false);
|
||||
if (state.state === "ConnectedToLkRoom") {
|
||||
logger.debug(
|
||||
"livekit: Connected state changed (inner livekitConnectionState$)",
|
||||
state.livekitConnectionState$.value,
|
||||
);
|
||||
return state.livekitConnectionState$.pipe(
|
||||
map((lkState) => lkState === ConnectionState.Connected),
|
||||
);
|
||||
}
|
||||
return of(false);
|
||||
}),
|
||||
),
|
||||
).pipe(tap((v) => logger.debug("combined: Connected state changed", v))),
|
||||
);
|
||||
|
||||
// MATRIX RELATED
|
||||
@@ -286,90 +282,217 @@ export const createLocalMembership$ = ({
|
||||
),
|
||||
);
|
||||
|
||||
// This should be used in a combineLatest with publisher$ to connect.
|
||||
// to make it possible to call startTracks before the preferredTransport$ has resolved.
|
||||
const trackStartRequested = Promise.withResolvers<void>();
|
||||
|
||||
// This should be used in a combineLatest with publisher$ to connect.
|
||||
// to make it possible to call startTracks before the preferredTransport$ has resolved.
|
||||
const connectRequested$ = new BehaviorSubject(false);
|
||||
|
||||
/**
|
||||
* The publisher is stored in here an abstracts creating and publishing tracks.
|
||||
*/
|
||||
const publisher$ = new BehaviorSubject<Publisher | null>(null);
|
||||
/**
|
||||
* Extract the tracks from the published. Also reacts to changing publishers.
|
||||
*/
|
||||
const tracks$ = scope.behavior(
|
||||
publisher$.pipe(switchMap((p) => (p?.tracks$ ? p.tracks$ : constant([])))),
|
||||
);
|
||||
const publishing$ = scope.behavior(
|
||||
publisher$.pipe(switchMap((p) => p?.publishing$ ?? constant(false))),
|
||||
);
|
||||
|
||||
const startTracks = (): Behavior<LocalTrack[]> => {
|
||||
trackStartRequested$.next(true);
|
||||
trackStartRequested.resolve();
|
||||
return tracks$;
|
||||
};
|
||||
|
||||
combineLatest([publisher$, tracks$]).subscribe(([publisher, tracks]) => {
|
||||
if (
|
||||
tracks.length === 0 ||
|
||||
// change this to !== Publishing
|
||||
state.livekit$.value.state !== LivekitState.Uninitialized
|
||||
) {
|
||||
return;
|
||||
const requestConnect = (): void => {
|
||||
trackStartRequested.resolve();
|
||||
connectRequested$.next(true);
|
||||
};
|
||||
|
||||
const requestDisconnect = (): void => {
|
||||
connectRequested$.next(false);
|
||||
};
|
||||
|
||||
// Take care of the publisher$
|
||||
// create a new one as soon as a local Connection is available
|
||||
//
|
||||
// Recreate a new one once the local connection changes
|
||||
// - stop publishing
|
||||
// - destruct all current streams
|
||||
// - overwrite current publisher
|
||||
scope.reconcile(localConnection$, async (connection) => {
|
||||
if (connection !== null) {
|
||||
publisher$.next(createPublisherFactory(connection));
|
||||
}
|
||||
state.livekit$.next({ state: LivekitState.Connecting });
|
||||
publisher
|
||||
?.startPublishing()
|
||||
.then(() => {
|
||||
state.livekit$.next({ state: LivekitState.Connected });
|
||||
})
|
||||
.catch((error) => {
|
||||
state.livekit$.next({ state: LivekitState.Error, error });
|
||||
});
|
||||
return Promise.resolve(async (): Promise<void> => {
|
||||
await publisher$?.value?.stopPublishing();
|
||||
publisher$?.value?.stopTracks();
|
||||
});
|
||||
});
|
||||
|
||||
combineLatest([localTransport$, connectRequested$]).subscribe(
|
||||
// TODO reconnect when transport changes => create test.
|
||||
([transport, connectRequested]) => {
|
||||
if (
|
||||
transport === null ||
|
||||
!connectRequested ||
|
||||
state.matrix$.value.state !== MatrixState.Disconnected
|
||||
) {
|
||||
logger.info(
|
||||
"Not yet connecting because: ",
|
||||
"transport === null:",
|
||||
transport === null,
|
||||
"!connectRequested:",
|
||||
!connectRequested,
|
||||
"state.matrix$.value.state !== MatrixState.Disconnected:",
|
||||
state.matrix$.value.state !== MatrixState.Disconnected,
|
||||
);
|
||||
return;
|
||||
// Use reconcile here to not run concurrent createAndSetupTracks calls
|
||||
// `tracks$` will update once they are ready.
|
||||
scope.reconcile(
|
||||
scope.behavior(
|
||||
combineLatest([publisher$, tracks$, from(trackStartRequested.promise)]),
|
||||
null,
|
||||
),
|
||||
async (valueIfReady) => {
|
||||
if (!valueIfReady) return;
|
||||
const [publisher, tracks] = valueIfReady;
|
||||
if (publisher && tracks.length === 0) {
|
||||
await publisher.createAndSetupTracks().catch((e) => logger.error(e));
|
||||
}
|
||||
state.matrix$.next({ state: MatrixState.Connecting });
|
||||
logger.info("Matrix State connecting");
|
||||
|
||||
joinMatrixRTC(transport).catch((error) => {
|
||||
logger.error(error);
|
||||
state.matrix$.next({ state: MatrixState.Error, error });
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
// TODO add this and update `state.matrix$` based on it.
|
||||
// useTypedEventEmitter(
|
||||
// rtcSession,
|
||||
// MatrixRTCSessionEvent.MembershipManagerError,
|
||||
// (error) => setExternalError(new ConnectionLostError()),
|
||||
// );
|
||||
// Based on `connectRequested$` we start publishing tracks. (once they are there!)
|
||||
scope.reconcile(
|
||||
scope.behavior(combineLatest([publisher$, tracks$, connectRequested$])),
|
||||
async ([publisher, tracks, shouldConnect]) => {
|
||||
if (shouldConnect === publisher?.publishing$.value) return;
|
||||
if (tracks.length !== 0 && shouldConnect) {
|
||||
try {
|
||||
await publisher?.startPublishing();
|
||||
} catch (error) {
|
||||
setLivekitError(error as ElementCallError);
|
||||
}
|
||||
} else if (tracks.length !== 0 && !shouldConnect) {
|
||||
try {
|
||||
await publisher?.stopPublishing();
|
||||
} catch (error) {
|
||||
setLivekitError(new UnknownCallError(error as Error));
|
||||
}
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
const requestConnect = (): LocalMemberConnectionState => {
|
||||
trackStartRequested$.next(true);
|
||||
connectRequested$.next(true);
|
||||
|
||||
return state;
|
||||
const fatalLivekitError$ = new BehaviorSubject<ElementCallError | null>(null);
|
||||
const setLivekitError = (e: ElementCallError): void => {
|
||||
if (fatalLivekitError$.value !== null)
|
||||
logger.error("Multiple Livkit Errors:", e);
|
||||
else fatalLivekitError$.next(e);
|
||||
};
|
||||
const livekitState$: Behavior<LocalMemberRtcBackendState> = scope.behavior(
|
||||
combineLatest([
|
||||
publisher$,
|
||||
localTransport$,
|
||||
tracks$.pipe(
|
||||
tap((t) => {
|
||||
logger.info("tracks$: ", t);
|
||||
}),
|
||||
),
|
||||
publishing$,
|
||||
connectRequested$,
|
||||
from(trackStartRequested.promise).pipe(
|
||||
map(() => true),
|
||||
startWith(false),
|
||||
),
|
||||
fatalLivekitError$,
|
||||
]).pipe(
|
||||
map(
|
||||
([
|
||||
publisher,
|
||||
localTransport,
|
||||
tracks,
|
||||
publishing,
|
||||
shouldConnect,
|
||||
shouldStartTracks,
|
||||
error,
|
||||
]) => {
|
||||
// read this:
|
||||
// if(!<A>) return {state: ...}
|
||||
// if(!<B>) return {state: <MyState>}
|
||||
//
|
||||
// as:
|
||||
// We do have <A> but not yet <B> so we are in <MyState>
|
||||
if (error !== null) return { state: RTCBackendState.Error, error };
|
||||
const hasTracks = tracks.length > 0;
|
||||
if (!localTransport)
|
||||
return { state: RTCBackendState.WaitingForTransport };
|
||||
if (!publisher)
|
||||
return { state: RTCBackendState.WaitingForConnection };
|
||||
if (!shouldStartTracks) return { state: RTCBackendState.Initialized };
|
||||
if (!hasTracks) return { state: RTCBackendState.CreatingTracks };
|
||||
if (!shouldConnect) return { state: RTCBackendState.ReadyToPublish };
|
||||
if (!publishing) return { state: RTCBackendState.WaitingToPublish };
|
||||
return { state: RTCBackendState.Connected };
|
||||
},
|
||||
),
|
||||
distinctUntilChanged(deepCompare),
|
||||
),
|
||||
);
|
||||
|
||||
const requestDisconnect = (): Behavior<LocalMemberLivekitState> | null => {
|
||||
if (state.livekit$.value.state !== LivekitState.Connected) return null;
|
||||
state.livekit$.next({ state: LivekitState.Disconnecting });
|
||||
combineLatest([publisher$, tracks$], (publisher, tracks) => {
|
||||
publisher
|
||||
?.stopPublishing()
|
||||
.then(() => {
|
||||
tracks.forEach((track) => track.stop());
|
||||
state.livekit$.next({ state: LivekitState.Disconnected });
|
||||
})
|
||||
.catch((error) => {
|
||||
state.livekit$.next({ state: LivekitState.Error, error });
|
||||
});
|
||||
});
|
||||
|
||||
return state.livekit$;
|
||||
const fatalMatrixError$ = new BehaviorSubject<ElementCallError | null>(null);
|
||||
const setMatrixError = (e: ElementCallError): void => {
|
||||
if (fatalMatrixError$.value !== null)
|
||||
logger.error("Multiple Matrix Errors:", e);
|
||||
else fatalMatrixError$.next(e);
|
||||
};
|
||||
const matrixState$: Behavior<LocalMemberMatrixState> = scope.behavior(
|
||||
combineLatest([
|
||||
localTransport$,
|
||||
connectRequested$,
|
||||
homeserverConnected$,
|
||||
]).pipe(
|
||||
map(([localTransport, connectRequested, homeserverConnected]) => {
|
||||
if (!localTransport) return { state: MatrixState.WaitingForTransport };
|
||||
if (!connectRequested) return { state: MatrixState.Ready };
|
||||
if (!homeserverConnected) return { state: MatrixState.Connecting };
|
||||
return { state: MatrixState.Connected };
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
// Keep matrix rtc session in sync with localTransport$, connectRequested$ and muteStates.video.enabled$
|
||||
scope.reconcile(
|
||||
scope.behavior(combineLatest([localTransport$, connectRequested$])),
|
||||
async ([transport, shouldConnect]) => {
|
||||
if (!shouldConnect) return;
|
||||
|
||||
if (!transport) return;
|
||||
try {
|
||||
await joinMatrixRTC(transport);
|
||||
} catch (error) {
|
||||
logger.error("Error entering RTC session", error);
|
||||
if (error instanceof Error)
|
||||
setMatrixError(new MembershipManagerError(error));
|
||||
}
|
||||
|
||||
// Update our member event when our mute state changes.
|
||||
const callIntentScope = new ObservableScope();
|
||||
// because this uses its own scope, we can start another reconciliation for the duration of one connection.
|
||||
callIntentScope.reconcile(
|
||||
muteStates.video.enabled$,
|
||||
async (videoEnabled) =>
|
||||
matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"),
|
||||
);
|
||||
|
||||
return async (): Promise<void> => {
|
||||
callIntentScope.end();
|
||||
try {
|
||||
// Update matrixRTCSession to allow udpating the transport without leaving the session!
|
||||
await matrixRTCSession.leaveRoomSession();
|
||||
} catch (e) {
|
||||
logger.error("Error leaving RTC session", e);
|
||||
}
|
||||
try {
|
||||
await widget?.api.transport.send(ElementWidgetActions.HangupCall, {});
|
||||
} catch (e) {
|
||||
logger.error("Failed to send hangup action", e);
|
||||
}
|
||||
};
|
||||
},
|
||||
);
|
||||
|
||||
const participant$ = scope.behavior(
|
||||
localConnection$.pipe(map((c) => c?.livekitRoom?.localParticipant ?? null)),
|
||||
);
|
||||
|
||||
// Pause upstream of all local media tracks when we're disconnected from
|
||||
// MatrixRTC, because it can be an unpleasant surprise for the app to say
|
||||
@@ -377,12 +500,12 @@ export const createLocalMembership$ = ({
|
||||
// We use matrixConnected$ rather than reconnecting$ because we want to
|
||||
// pause tracks during the initial joining sequence too until we're sure
|
||||
// that our own media is displayed on screen.
|
||||
combineLatest([localConnection$, homeserverConnected$])
|
||||
// TODO refactor this based no livekitState$
|
||||
combineLatest([participant$, homeserverConnected$])
|
||||
.pipe(scope.bind())
|
||||
.subscribe(([connection, connected]) => {
|
||||
if (connection?.state$.value.state !== "ConnectedToLkRoom") return;
|
||||
const publications =
|
||||
connection.livekitRoom.localParticipant.trackPublications.values();
|
||||
.subscribe(([participant, connected]) => {
|
||||
if (!participant) return;
|
||||
const publications = participant.trackPublications.values();
|
||||
if (connected) {
|
||||
for (const p of publications) {
|
||||
if (p.track?.isUpstreamPaused === true) {
|
||||
@@ -419,89 +542,17 @@ export const createLocalMembership$ = ({
|
||||
}
|
||||
}
|
||||
});
|
||||
// TODO: Refactor updateCallIntent to sth like this:
|
||||
// combineLatest([muteStates.video.enabled$,localTransport$, state.matrix$]).pipe(map(()=>{
|
||||
// matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"),
|
||||
// }))
|
||||
//
|
||||
|
||||
// TODO I do not fully understand what this does.
|
||||
// Is it needed?
|
||||
// Is this at the right place?
|
||||
// Can this be simplified?
|
||||
// Start and stop session membership as needed
|
||||
// Discussed in statndup -> It seems we can remove this (there is another call to enterRTCSession in this file)
|
||||
// MAKE SURE TO UNDERSTAND why reconcile is needed and what is potentially missing from the alternative enterRTCSession block.
|
||||
// @toger5 will try to take care of this.
|
||||
scope.reconcile(localTransport$, async (transport) => {
|
||||
if (transport !== null && transport !== undefined) {
|
||||
try {
|
||||
state.matrix$.next({ state: MatrixState.Connecting });
|
||||
await joinMatrixRTC(transport);
|
||||
} catch (e) {
|
||||
logger.error("Error entering RTC session", e);
|
||||
}
|
||||
|
||||
// Update our member event when our mute state changes.
|
||||
const intentScope = new ObservableScope();
|
||||
intentScope.reconcile(muteStates.video.enabled$, async (videoEnabled) =>
|
||||
matrixRTCSession.updateCallIntent(videoEnabled ? "video" : "audio"),
|
||||
);
|
||||
|
||||
return async (): Promise<void> => {
|
||||
intentScope.end();
|
||||
// Only sends Matrix leave event. The LiveKit session will disconnect
|
||||
// as soon as either the stopConnection$ handler above gets to it or
|
||||
// the view model is destroyed.
|
||||
try {
|
||||
await matrixRTCSession.leaveRoomSession();
|
||||
} catch (e) {
|
||||
logger.error("Error leaving RTC session", e);
|
||||
}
|
||||
try {
|
||||
await widget?.api.transport.send(ElementWidgetActions.HangupCall, {});
|
||||
} catch (e) {
|
||||
logger.error("Failed to send hangup action", e);
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
localConnection$
|
||||
.pipe(
|
||||
distinctUntilChanged(),
|
||||
switchMap((c) =>
|
||||
c === null ? of({ state: "Initialized" } as ConnectionState) : c.state$,
|
||||
),
|
||||
map((s) => {
|
||||
logger.trace(`Local connection state update: ${s.state}`);
|
||||
if (s.state == "FailedToStart") {
|
||||
return s.error instanceof ElementCallError
|
||||
? s.error
|
||||
: new UnknownCallError(s.error);
|
||||
}
|
||||
}),
|
||||
scope.bind(),
|
||||
)
|
||||
.subscribe((error) => {
|
||||
if (error !== undefined)
|
||||
state.livekit$.next({ state: LivekitState.Error, error });
|
||||
});
|
||||
|
||||
/**
|
||||
* Whether the user is currently sharing their screen.
|
||||
*/
|
||||
const sharingScreen$ = scope.behavior(
|
||||
localConnection$.pipe(
|
||||
switchMap((c) =>
|
||||
c !== null
|
||||
? observeSharingScreen$(c.livekitRoom.localParticipant)
|
||||
: of(false),
|
||||
),
|
||||
participant$.pipe(
|
||||
switchMap((p) => (p !== null ? observeSharingScreen$(p) : of(false))),
|
||||
),
|
||||
);
|
||||
|
||||
let toggleScreenSharing = null;
|
||||
let toggleScreenSharing: (() => void) | null = null;
|
||||
if (
|
||||
"getDisplayMedia" in (navigator.mediaDevices ?? {}) &&
|
||||
!getUrlParams().hideScreensharing
|
||||
@@ -527,27 +578,26 @@ export const createLocalMembership$ = ({
|
||||
// We also allow screen sharing to be toggled even if the connection
|
||||
// is still initializing or publishing tracks, because there's no
|
||||
// technical reason to disallow this. LiveKit will publish if it can.
|
||||
localConnection$.value?.livekitRoom.localParticipant
|
||||
.setScreenShareEnabled(targetScreenshareState, screenshareSettings)
|
||||
participant$.value
|
||||
?.setScreenShareEnabled(targetScreenshareState, screenshareSettings)
|
||||
.catch(logger.error);
|
||||
};
|
||||
}
|
||||
|
||||
const participant$ = scope.behavior(
|
||||
localConnection$.pipe(map((c) => c?.livekitRoom?.localParticipant ?? null)),
|
||||
);
|
||||
return {
|
||||
startTracks,
|
||||
requestConnect,
|
||||
requestDisconnect,
|
||||
connectionState: state,
|
||||
homeserverConnected$,
|
||||
connectionState: {
|
||||
livekit$: livekitState$,
|
||||
matrix$: matrixState$,
|
||||
},
|
||||
tracks$,
|
||||
participant$,
|
||||
connected$,
|
||||
reconnecting$,
|
||||
|
||||
sharingScreen$,
|
||||
toggleScreenSharing,
|
||||
participant$,
|
||||
connection$: localConnection$,
|
||||
};
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user