Refactor how we aquire the jwt token for the local user. (only fetch it
once) The local jwt token needs to be aquired via the right endpoint. The endpoint defines how our rtcBackendIdentity is computed. Based on us using sticky events or state events we also need to use the right endpoint. This cannot be done generically in the connection manager. The jwt token now is computed in the localTransport and the resolved sfu config is passed to the connection manager. Add JWT endpoint version and SFU config support Pin matrix-js-sdk to a specific commit and update dev auth image tag. Propagate SFU config and JWT endpoint choice through local transport, ConnectionManager and Connection; add JwtEndpointVersion enum and LocalTransportWithSFUConfig type. Add NO_MATRIX_2 auth error and locale string, thread rtcBackendIdentity through UI props, and include related test, CSS and minor imports updates
This commit is contained in:
@@ -114,6 +114,7 @@ function setupRemoteConnection(): Connection {
|
||||
client: client,
|
||||
transport: livekitFocus,
|
||||
scope: testScope,
|
||||
ownMembershipIdentity: ownMemberMock,
|
||||
livekitRoomFactory: () => fakeLivekitRoom,
|
||||
};
|
||||
|
||||
@@ -138,7 +139,7 @@ function setupRemoteConnection(): Connection {
|
||||
return Promise.resolve();
|
||||
});
|
||||
|
||||
return new Connection(opts, logger, ownMemberMock);
|
||||
return new Connection(opts, logger);
|
||||
}
|
||||
|
||||
afterEach(() => {
|
||||
@@ -155,9 +156,10 @@ describe("Start connection states", () => {
|
||||
client: client,
|
||||
transport: livekitFocus,
|
||||
scope: testScope,
|
||||
ownMembershipIdentity: ownMemberMock,
|
||||
livekitRoomFactory: () => fakeLivekitRoom,
|
||||
};
|
||||
const connection = new Connection(opts, logger, ownMemberMock);
|
||||
const connection = new Connection(opts, logger);
|
||||
|
||||
expect(connection.state$.getValue()).toEqual("Initialized");
|
||||
});
|
||||
@@ -170,10 +172,11 @@ describe("Start connection states", () => {
|
||||
client: client,
|
||||
transport: livekitFocus,
|
||||
scope: testScope,
|
||||
ownMembershipIdentity: ownMemberMock,
|
||||
livekitRoomFactory: () => fakeLivekitRoom,
|
||||
};
|
||||
|
||||
const connection = new Connection(opts, logger, ownMemberMock);
|
||||
const connection = new Connection(opts, logger);
|
||||
|
||||
const capturedStates: (ConnectionState | Error)[] = [];
|
||||
const s = connection.state$.subscribe((value) => {
|
||||
@@ -220,10 +223,11 @@ describe("Start connection states", () => {
|
||||
client: client,
|
||||
transport: livekitFocus,
|
||||
scope: testScope,
|
||||
ownMembershipIdentity: ownMemberMock,
|
||||
livekitRoomFactory: () => fakeLivekitRoom,
|
||||
};
|
||||
|
||||
const connection = new Connection(opts, logger, ownMemberMock);
|
||||
const connection = new Connection(opts, logger);
|
||||
|
||||
const capturedStates: (ConnectionState | Error)[] = [];
|
||||
const s = connection.state$.subscribe((value) => {
|
||||
@@ -277,10 +281,11 @@ describe("Start connection states", () => {
|
||||
client: client,
|
||||
transport: livekitFocus,
|
||||
scope: testScope,
|
||||
ownMembershipIdentity: ownMemberMock,
|
||||
livekitRoomFactory: () => fakeLivekitRoom,
|
||||
};
|
||||
|
||||
const connection = new Connection(opts, logger, ownMemberMock);
|
||||
const connection = new Connection(opts, logger);
|
||||
|
||||
const capturedStates: (ConnectionState | Error)[] = [];
|
||||
const s = connection.state$.subscribe((value) => {
|
||||
|
||||
@@ -33,10 +33,21 @@ import {
|
||||
SFURoomCreationRestrictedError,
|
||||
UnknownCallError,
|
||||
} from "../../../utils/errors.ts";
|
||||
import { type JwtEndpointVersion } from "../localMember/LocalTransport.ts";
|
||||
|
||||
export interface ConnectionOpts {
|
||||
/** Whether we always try to connect to this connection via the legacy jwt endpoint. (no hash identity) */
|
||||
forceOldJwtEndpoint?: boolean;
|
||||
/**
|
||||
* For the local transport we already do know the jwt token and url. We can reuse it.
|
||||
* On top the local transport will send additional data to the jwt server to use delayed event delegation.
|
||||
*/
|
||||
existingSFUConfig?: SFUConfig;
|
||||
/**
|
||||
* For local connections that use the oldest member pattern. here we have not prefetched the sfuConfig
|
||||
* and hence we need to let the connection do the jwt token fetching.
|
||||
*/
|
||||
forceJwtEndpoint?: JwtEndpointVersion;
|
||||
/** The identity parts to use on this connection */
|
||||
ownMembershipIdentity: CallMembershipIdentityParts;
|
||||
/** The media transport to connect to. */
|
||||
transport: LivekitTransport;
|
||||
/** The Matrix client to use for OpenID and SFU config requests. */
|
||||
@@ -132,8 +143,10 @@ export class Connection {
|
||||
try {
|
||||
this._state$.next(ConnectionState.FetchingConfig);
|
||||
// We should already have this information after creating the localTransport.
|
||||
// It would probably be better to forward this here.
|
||||
const { url, jwt } = await this.getSFUConfigWithOpenID();
|
||||
// only call getSFUConfigWithOpenID for connections where we do not have a token yet. (existingJwtTokenData === undefined)
|
||||
const { url, jwt } =
|
||||
this.existingSFUConfig ??
|
||||
(await this.getSFUConfigForRemoteConnection());
|
||||
// If we were stopped while fetching the config, don't proceed to connect
|
||||
if (this.stopped) return;
|
||||
|
||||
@@ -189,17 +202,16 @@ export class Connection {
|
||||
}
|
||||
}
|
||||
|
||||
protected async getSFUConfigWithOpenID(): Promise<SFUConfig> {
|
||||
protected async getSFUConfigForRemoteConnection(): Promise<SFUConfig> {
|
||||
// This will only be called for sfu's where we do not publish ourselves.
|
||||
// For the local connection we will use the existingJwtTokenData
|
||||
return await getSFUConfigWithOpenID(
|
||||
this.client,
|
||||
this.ownMembershipIdentity,
|
||||
this.transport.livekit_service_url,
|
||||
this.forceOldJwtEndpoint,
|
||||
this.transport.livekit_alias,
|
||||
// For the remote members we intentionally do not pass a delayEndpointBaseUrl.
|
||||
undefined,
|
||||
// and no delayId.
|
||||
undefined,
|
||||
// dont pass any custom opts for the subscribe only connections
|
||||
{},
|
||||
this.logger,
|
||||
);
|
||||
}
|
||||
@@ -222,7 +234,8 @@ export class Connection {
|
||||
|
||||
private readonly client: OpenIDClientParts;
|
||||
private readonly logger: Logger;
|
||||
private readonly forceOldJwtEndpoint: boolean;
|
||||
private readonly ownMembershipIdentity: CallMembershipIdentityParts;
|
||||
private readonly existingSFUConfig?: SFUConfig;
|
||||
/**
|
||||
* Creates a new connection to a matrix RTC LiveKit backend.
|
||||
*
|
||||
@@ -230,12 +243,9 @@ export class Connection {
|
||||
*
|
||||
* @param logger - The logger to use.
|
||||
*/
|
||||
public constructor(
|
||||
opts: ConnectionOpts,
|
||||
logger: Logger,
|
||||
private ownMembershipIdentity: CallMembershipIdentityParts,
|
||||
) {
|
||||
this.forceOldJwtEndpoint = opts.forceOldJwtEndpoint ?? false;
|
||||
public constructor(opts: ConnectionOpts, logger: Logger) {
|
||||
this.ownMembershipIdentity = opts.ownMembershipIdentity;
|
||||
this.existingSFUConfig = opts.existingSFUConfig;
|
||||
this.logger = logger.getChild("[Connection]");
|
||||
this.logger.info(
|
||||
`Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias}`,
|
||||
|
||||
@@ -20,7 +20,10 @@ import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc/LivekitTransp
|
||||
|
||||
import { type ObservableScope } from "../../ObservableScope.ts";
|
||||
import { Connection } from "./Connection.ts";
|
||||
import type { OpenIDClientParts } from "../../../livekit/openIDSFU.ts";
|
||||
import type {
|
||||
OpenIDClientParts,
|
||||
SFUConfig,
|
||||
} from "../../../livekit/openIDSFU.ts";
|
||||
import type { MediaDevices } from "../../MediaDevices.ts";
|
||||
import type { Behavior } from "../../Behavior.ts";
|
||||
import type { ProcessorState } from "../../../livekit/TrackProcessorContext.tsx";
|
||||
@@ -29,11 +32,11 @@ import { defaultLiveKitOptions } from "../../../livekit/options.ts";
|
||||
// TODO evaluate if this should be done like the Publisher Factory
|
||||
export interface ConnectionFactory {
|
||||
createConnection(
|
||||
transport: LivekitTransport,
|
||||
scope: ObservableScope,
|
||||
transport: LivekitTransport,
|
||||
ownMembershipIdentity: CallMembershipIdentityParts,
|
||||
logger: Logger,
|
||||
forceOldJwtEndpoint?: boolean,
|
||||
sfuConfig?: SFUConfig,
|
||||
): Connection;
|
||||
}
|
||||
|
||||
@@ -83,30 +86,30 @@ export class ECConnectionFactory implements ConnectionFactory {
|
||||
|
||||
/**
|
||||
*
|
||||
* @param transport The transport to use for this connection.
|
||||
* @param scope The observable scope (used for clean-up)
|
||||
* @param transport The transport to use for this connection.
|
||||
* @param ownMembershipIdentity required to connect (using the jwt service) with the SFU.
|
||||
* @param logger The logger instance to use for this connection.
|
||||
* @param forceOldJwtEndpoint Use the old JWT endpoint independent of what the sfu supports.
|
||||
* @param sfuConfig optional config in case we already have a token for this connection.
|
||||
* @returns
|
||||
*/
|
||||
public createConnection(
|
||||
transport: LivekitTransport,
|
||||
scope: ObservableScope,
|
||||
transport: LivekitTransport,
|
||||
ownMembershipIdentity: CallMembershipIdentityParts,
|
||||
logger: Logger,
|
||||
forceOldJwtEndpoint?: boolean,
|
||||
sfuConfig?: SFUConfig,
|
||||
): Connection {
|
||||
return new Connection(
|
||||
{
|
||||
existingSFUConfig: sfuConfig,
|
||||
transport,
|
||||
client: this.client,
|
||||
scope: scope,
|
||||
livekitRoomFactory: this.livekitRoomFactory,
|
||||
forceOldJwtEndpoint,
|
||||
ownMembershipIdentity,
|
||||
},
|
||||
logger,
|
||||
ownMembershipIdentity,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,12 +12,17 @@ import { type Logger } from "matrix-js-sdk/lib/logger";
|
||||
import { type RemoteParticipant } from "livekit-client";
|
||||
import { type CallMembershipIdentityParts } from "matrix-js-sdk/lib/matrixrtc/EncryptionManager";
|
||||
|
||||
import { constant, type Behavior } from "../../Behavior.ts";
|
||||
import { type Behavior } from "../../Behavior.ts";
|
||||
import { type Connection } from "./Connection.ts";
|
||||
import { Epoch, type ObservableScope } from "../../ObservableScope.ts";
|
||||
import { generateItemsWithEpoch } from "../../../utils/observable.ts";
|
||||
import { areLivekitTransportsEqual } from "./MatrixLivekitMembers.ts";
|
||||
import { type ConnectionFactory } from "./ConnectionFactory.ts";
|
||||
import {
|
||||
isLocalTransportWithSFUConfig,
|
||||
type LocalTransportWithSFUConfig,
|
||||
} from "../localMember/LocalTransport.ts";
|
||||
import { type SFUConfig } from "../../../livekit/openIDSFU.ts";
|
||||
|
||||
export class ConnectionManagerData {
|
||||
private readonly store: Map<
|
||||
@@ -66,9 +71,9 @@ export class ConnectionManagerData {
|
||||
interface Props {
|
||||
scope: ObservableScope;
|
||||
connectionFactory: ConnectionFactory;
|
||||
localTransport$: Behavior<LivekitTransport | null>;
|
||||
localTransport$: Behavior<LocalTransportWithSFUConfig | null>;
|
||||
remoteTransports$: Behavior<Epoch<LivekitTransport[]>>;
|
||||
forceOldJwtEndpointForLocalTransport$?: Behavior<boolean>;
|
||||
|
||||
logger: Logger;
|
||||
ownMembershipIdentity: CallMembershipIdentityParts;
|
||||
}
|
||||
@@ -87,7 +92,7 @@ export interface IConnectionManager {
|
||||
* @param props.remoteTransports$ - All other transports. The connection manager will create connections for each transport. (deduplicated with localTransport$)
|
||||
* @param props.ownMembershipIdentity - The own membership identity to use.
|
||||
* @param props.logger - The logger to use.
|
||||
* @param props.forceOldJwtEndpointForLocalTransport$ - Use the old JWT endpoint independent of what the sfu supports. Only applies for localTransport$.
|
||||
|
||||
*
|
||||
* Each of these behaviors can be interpreted as subscribed list of transports.
|
||||
*
|
||||
@@ -103,7 +108,6 @@ export function createConnectionManager$({
|
||||
connectionFactory,
|
||||
localTransport$,
|
||||
remoteTransports$,
|
||||
forceOldJwtEndpointForLocalTransport$ = constant(false),
|
||||
logger: parentLogger,
|
||||
ownMembershipIdentity,
|
||||
}: Props): IConnectionManager {
|
||||
@@ -118,42 +122,35 @@ export function createConnectionManager$({
|
||||
* It is build based on the list of subscribed transports (`transportsSubscriptions$`).
|
||||
* externally this is modified via `registerTransports()`.
|
||||
*/
|
||||
const transportsWithJwtTag$ = scope.behavior(
|
||||
combineLatest([
|
||||
remoteTransports$,
|
||||
localTransport$,
|
||||
forceOldJwtEndpointForLocalTransport$,
|
||||
]).pipe(
|
||||
// combine local and remote transports into one transport array
|
||||
const localAndRemoteTransports$: Behavior<
|
||||
Epoch<(LivekitTransport | LocalTransportWithSFUConfig)[]>
|
||||
> = scope.behavior(
|
||||
combineLatest([remoteTransports$, localTransport$]).pipe(
|
||||
// Combine local and remote transports into one transport array
|
||||
// and set the forceOldJwtEndpoint property on the local transport
|
||||
map(
|
||||
([
|
||||
remoteTransports,
|
||||
localTransport,
|
||||
forceOldJwtEndpointForLocalTransport,
|
||||
]) => {
|
||||
let localTransportAsArray: (LivekitTransport & {
|
||||
forceOldJwtEndpoint: boolean;
|
||||
})[] = [];
|
||||
if (localTransport) {
|
||||
localTransportAsArray = [
|
||||
{
|
||||
...localTransport,
|
||||
forceOldJwtEndpoint: forceOldJwtEndpointForLocalTransport,
|
||||
},
|
||||
];
|
||||
}
|
||||
return new Epoch(
|
||||
removeDuplicateTransports([
|
||||
...localTransportAsArray,
|
||||
...remoteTransports.value,
|
||||
]) as (LivekitTransport & {
|
||||
forceOldJwtEndpoint?: boolean;
|
||||
})[],
|
||||
remoteTransports.epoch,
|
||||
);
|
||||
},
|
||||
),
|
||||
map(([remoteTransports, localTransport]) => {
|
||||
let localTransportAsArray: LocalTransportWithSFUConfig[] = [];
|
||||
if (localTransport) {
|
||||
localTransportAsArray = [localTransport];
|
||||
}
|
||||
const dedupedRemote = removeDuplicateTransports(remoteTransports.value);
|
||||
const remoteWithoutLocal = dedupedRemote.filter(
|
||||
(transport) =>
|
||||
!localTransportAsArray.find((l) =>
|
||||
areLivekitTransportsEqual(l.transport, transport),
|
||||
),
|
||||
);
|
||||
logger.debug(
|
||||
"remoteWithoutLocal",
|
||||
remoteWithoutLocal,
|
||||
"localTransportAsArray",
|
||||
localTransportAsArray,
|
||||
);
|
||||
return new Epoch(
|
||||
[...localTransportAsArray, ...remoteWithoutLocal],
|
||||
remoteTransports.epoch,
|
||||
);
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
@@ -161,33 +158,51 @@ export function createConnectionManager$({
|
||||
* Connections for each transport in use by one or more session members.
|
||||
*/
|
||||
const connections$ = scope.behavior(
|
||||
transportsWithJwtTag$.pipe(
|
||||
localAndRemoteTransports$.pipe(
|
||||
generateItemsWithEpoch(
|
||||
function* (transports) {
|
||||
for (const transport of transports)
|
||||
yield {
|
||||
keys: [
|
||||
transport.livekit_service_url,
|
||||
transport.livekit_alias,
|
||||
transport.forceOldJwtEndpoint,
|
||||
],
|
||||
data: undefined,
|
||||
};
|
||||
for (const transportWithOrWithoutSfuConfig of transports) {
|
||||
if (
|
||||
isLocalTransportWithSFUConfig(transportWithOrWithoutSfuConfig)
|
||||
) {
|
||||
// This is the local transport only the `LocalTransportWithSFUConfig` has a `sfuConfig` field
|
||||
const { transport, sfuConfig } = transportWithOrWithoutSfuConfig;
|
||||
yield {
|
||||
keys: [
|
||||
transport.livekit_service_url,
|
||||
transport.livekit_alias,
|
||||
sfuConfig,
|
||||
],
|
||||
data: undefined,
|
||||
};
|
||||
} else {
|
||||
const transport = transportWithOrWithoutSfuConfig;
|
||||
yield {
|
||||
keys: [
|
||||
transport.livekit_service_url,
|
||||
transport.livekit_alias,
|
||||
undefined as undefined | SFUConfig,
|
||||
],
|
||||
data: undefined,
|
||||
};
|
||||
}
|
||||
}
|
||||
},
|
||||
(scope, _data$, serviceUrl, alias, forceOldJwtEndpoint) => {
|
||||
(scope, _data$, serviceUrl, alias, sfuConfig) => {
|
||||
logger.debug(
|
||||
`Creating connection to ${serviceUrl} (${alias}, forceOldJwtEndpoint: ${forceOldJwtEndpoint})`,
|
||||
`Creating connection to ${serviceUrl} (${alias}, withSfuConfig (local connection?): ${JSON.stringify(sfuConfig) ?? "no config->remote connection"})`,
|
||||
);
|
||||
|
||||
const connection = connectionFactory.createConnection(
|
||||
scope,
|
||||
{
|
||||
type: "livekit",
|
||||
livekit_service_url: serviceUrl,
|
||||
livekit_alias: alias,
|
||||
},
|
||||
scope,
|
||||
ownMembershipIdentity,
|
||||
logger,
|
||||
forceOldJwtEndpoint,
|
||||
sfuConfig,
|
||||
);
|
||||
// Start the connection immediately
|
||||
// Use connection state to track connection progress
|
||||
|
||||
@@ -77,8 +77,8 @@ describe("ECConnectionFactory - Audio inputs options", () => {
|
||||
noise,
|
||||
);
|
||||
ecConnectionFactory.createConnection(
|
||||
exampleTransport,
|
||||
testScope,
|
||||
exampleTransport,
|
||||
ownMemberMock,
|
||||
logger,
|
||||
);
|
||||
@@ -123,8 +123,8 @@ describe("ECConnectionFactory - ControlledAudioDevice", () => {
|
||||
false,
|
||||
);
|
||||
ecConnectionFactory.createConnection(
|
||||
exampleTransport,
|
||||
testScope,
|
||||
exampleTransport,
|
||||
ownMemberMock,
|
||||
logger,
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user