This commit is contained in:
Timo K
2025-12-01 12:43:17 +01:00
parent 2d8ffc0ccd
commit 284a52c23c
12 changed files with 296 additions and 251 deletions

View File

@@ -382,17 +382,15 @@ describe("Publishing participants observations", () => {
const bobIsAPublisher = Promise.withResolvers<void>();
const danIsAPublisher = Promise.withResolvers<void>();
const observedPublishers: PublishingParticipant[][] = [];
const s = connection.remoteParticipantsWithTracks$.subscribe(
(publishers) => {
observedPublishers.push(publishers);
if (publishers.some((p) => p.identity === "@bob:example.org:DEV111")) {
bobIsAPublisher.resolve();
}
if (publishers.some((p) => p.identity === "@dan:example.org:DEV333")) {
danIsAPublisher.resolve();
}
},
);
const s = connection.remoteParticipants$.subscribe((publishers) => {
observedPublishers.push(publishers);
if (publishers.some((p) => p.identity === "@bob:example.org:DEV111")) {
bobIsAPublisher.resolve();
}
if (publishers.some((p) => p.identity === "@dan:example.org:DEV333")) {
danIsAPublisher.resolve();
}
});
onTestFinished(() => s.unsubscribe());
// The publishingParticipants$ observable is derived from the current members of the
// livekitRoom and the rtc membership in order to publish the members that are publishing
@@ -437,11 +435,9 @@ describe("Publishing participants observations", () => {
const connection = setupRemoteConnection();
let observedPublishers: PublishingParticipant[][] = [];
const s = connection.remoteParticipantsWithTracks$.subscribe(
(publishers) => {
observedPublishers.push(publishers);
},
);
const s = connection.remoteParticipants$.subscribe((publishers) => {
observedPublishers.push(publishers);
});
onTestFinished(() => s.unsubscribe());
let participants: RemoteParticipant[] = [

View File

@@ -19,7 +19,7 @@ import {
RoomEvent,
} from "livekit-client";
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { BehaviorSubject, map, type Observable } from "rxjs";
import { BehaviorSubject, type Observable } from "rxjs";
import { type Logger } from "matrix-js-sdk/lib/logger";
import {
@@ -146,6 +146,10 @@ export class Connection {
transport: this.transport,
livekitConnectionState$: connectionStateObserver(this.livekitRoom),
});
this.logger.info(
"Connected to LiveKit room",
this.transport.livekit_service_url,
);
} catch (error) {
this.logger.debug(`Failed to connect to LiveKit room: ${error}`);
this._state$.next({
@@ -189,9 +193,7 @@ export class Connection {
* This is derived from `participantsIncludingSubscribers$` and `remoteTransports$`.
* It filters the participants to only those that are associated with a membership that claims to publish on this connection.
*/
public readonly remoteParticipantsWithTracks$: Behavior<
PublishingParticipant[]
>;
public readonly remoteParticipants$: Behavior<PublishingParticipant[]>;
/**
* The media transport to connect to.
@@ -213,7 +215,7 @@ export class Connection {
public constructor(opts: ConnectionOpts, logger: Logger) {
this.logger = logger.getChild("[Connection]");
this.logger.info(
`[Connection] Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias}`,
`Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias}`,
);
const { transport, client, scope } = opts;
@@ -223,20 +225,21 @@ export class Connection {
// REMOTE participants with track!!!
// this.remoteParticipantsWithTracks$
this.remoteParticipantsWithTracks$ = scope.behavior(
this.remoteParticipants$ = scope.behavior(
// only tracks remote participants
connectedParticipantsObserver(this.livekitRoom, {
additionalRoomEvents: [
RoomEvent.TrackPublished,
RoomEvent.TrackUnpublished,
],
}).pipe(
map((participants) => {
return participants.filter(
(participant) => participant.getTrackPublications().length > 0,
);
}),
),
}),
// .pipe(
// map((participants) => {
// return participants.filter(
// (participant) => participant.getTrackPublications().length > 0,
// );
// }),
// )
[],
);

View File

@@ -13,7 +13,7 @@ import {
type BaseKeyProvider,
} from "livekit-client";
import { type Logger } from "matrix-js-sdk/lib/logger";
import E2EEWorker from "livekit-client/e2ee-worker?worker";
import E2EEWorker from "livekit-client/e2ee-worker?worker&inline";
import { type ObservableScope } from "../../ObservableScope.ts";
import { Connection } from "./Connection.ts";

View File

@@ -289,47 +289,47 @@ describe("connectionManagerData$ stream", () => {
a: expect.toSatisfy((e) => {
const data: ConnectionManagerData = e.value;
expect(data.getConnections().length).toBe(2);
expect(data.getParticipantForTransport(TRANSPORT_1).length).toBe(0);
expect(data.getParticipantForTransport(TRANSPORT_2).length).toBe(0);
expect(data.getParticipantsForTransport(TRANSPORT_1).length).toBe(0);
expect(data.getParticipantsForTransport(TRANSPORT_2).length).toBe(0);
return true;
}),
b: expect.toSatisfy((e) => {
const data: ConnectionManagerData = e.value;
expect(data.getConnections().length).toBe(2);
expect(data.getParticipantForTransport(TRANSPORT_1).length).toBe(1);
expect(data.getParticipantForTransport(TRANSPORT_2).length).toBe(0);
expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toBe(
"user1A",
);
expect(data.getParticipantsForTransport(TRANSPORT_1).length).toBe(1);
expect(data.getParticipantsForTransport(TRANSPORT_2).length).toBe(0);
expect(
data.getParticipantsForTransport(TRANSPORT_1)[0].identity,
).toBe("user1A");
return true;
}),
c: expect.toSatisfy((e) => {
const data: ConnectionManagerData = e.value;
expect(data.getConnections().length).toBe(2);
expect(data.getParticipantForTransport(TRANSPORT_1).length).toBe(1);
expect(data.getParticipantForTransport(TRANSPORT_2).length).toBe(1);
expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toBe(
"user1A",
);
expect(data.getParticipantForTransport(TRANSPORT_2)[0].identity).toBe(
"user2A",
);
expect(data.getParticipantsForTransport(TRANSPORT_1).length).toBe(1);
expect(data.getParticipantsForTransport(TRANSPORT_2).length).toBe(1);
expect(
data.getParticipantsForTransport(TRANSPORT_1)[0].identity,
).toBe("user1A");
expect(
data.getParticipantsForTransport(TRANSPORT_2)[0].identity,
).toBe("user2A");
return true;
}),
d: expect.toSatisfy((e) => {
const data: ConnectionManagerData = e.value;
expect(data.getConnections().length).toBe(2);
expect(data.getParticipantForTransport(TRANSPORT_1).length).toBe(2);
expect(data.getParticipantForTransport(TRANSPORT_2).length).toBe(1);
expect(data.getParticipantForTransport(TRANSPORT_1)[0].identity).toBe(
"user1A",
);
expect(data.getParticipantForTransport(TRANSPORT_1)[1].identity).toBe(
"user1B",
);
expect(data.getParticipantForTransport(TRANSPORT_2)[0].identity).toBe(
"user2A",
);
expect(data.getParticipantsForTransport(TRANSPORT_1).length).toBe(2);
expect(data.getParticipantsForTransport(TRANSPORT_2).length).toBe(1);
expect(
data.getParticipantsForTransport(TRANSPORT_1)[0].identity,
).toBe("user1A");
expect(
data.getParticipantsForTransport(TRANSPORT_1)[1].identity,
).toBe("user1B");
expect(
data.getParticipantsForTransport(TRANSPORT_2)[0].identity,
).toBe("user2A");
return true;
}),
});

View File

@@ -24,7 +24,10 @@ import { type ConnectionFactory } from "./ConnectionFactory.ts";
export class ConnectionManagerData {
private readonly store: Map<
string,
[Connection, (LocalParticipant | RemoteParticipant)[]]
{
connection: Connection;
participants: (LocalParticipant | RemoteParticipant)[];
}
> = new Map();
public constructor() {}
@@ -36,9 +39,9 @@ export class ConnectionManagerData {
const key = this.getKey(connection.transport);
const existing = this.store.get(key);
if (!existing) {
this.store.set(key, [connection, participants]);
this.store.set(key, { connection, participants });
} else {
existing[1].push(...participants);
existing.participants.push(...participants);
}
}
@@ -47,25 +50,26 @@ export class ConnectionManagerData {
}
public getConnections(): Connection[] {
return Array.from(this.store.values()).map(([connection]) => connection);
return Array.from(this.store.values()).map(({ connection }) => connection);
}
public getConnectionForTransport(
transport: LivekitTransport,
): Connection | null {
return this.store.get(this.getKey(transport))?.[0] ?? null;
return this.store.get(this.getKey(transport))?.connection ?? null;
}
public getParticipantForTransport(
public getParticipantsForTransport(
transport: LivekitTransport,
): (LocalParticipant | RemoteParticipant)[] {
const key = transport.livekit_service_url + "|" + transport.livekit_alias;
const existing = this.store.get(key);
if (existing) {
return existing[1];
return existing.participants;
}
return [];
}
/**
* Get all connections where the given participant is publishing.
* In theory, there could be several connections where the same participant is publishing but with
@@ -76,8 +80,12 @@ export class ConnectionManagerData {
participantId: ParticipantId,
): Connection[] {
const connections: Connection[] = [];
for (const [connection, participants] of this.store.values()) {
if (participants.some((p) => p.identity === participantId)) {
for (const { connection, participants } of this.store.values()) {
if (
participants.some(
(participant) => participant?.identity === participantId,
)
) {
connections.push(connection);
}
}
@@ -183,23 +191,24 @@ export function createConnectionManager$({
const epoch = connections.epoch;
// Map the connections to list of {connection, participants}[]
const listOfConnectionsWithPublishingParticipants =
connections.value.map((connection) => {
return connection.remoteParticipantsWithTracks$.pipe(
const listOfConnectionsWithParticipants = connections.value.map(
(connection) => {
return connection.remoteParticipants$.pipe(
map((participants) => ({
connection,
participants,
})),
);
});
},
);
// probably not required
if (listOfConnectionsWithPublishingParticipants.length === 0) {
if (listOfConnectionsWithParticipants.length === 0) {
return of(new Epoch(new ConnectionManagerData(), epoch));
}
// combineLatest the several streams into a single stream with the ConnectionManagerData
return combineLatest(listOfConnectionsWithPublishingParticipants).pipe(
return combineLatest(listOfConnectionsWithParticipants).pipe(
map(
(lists) =>
new Epoch(

View File

@@ -91,7 +91,7 @@ test("should signal participant not yet connected to livekit", () => {
}),
);
const matrixLivekitMember$ = createMatrixLivekitMembers$({
const { matrixLivekitMembers$ } = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: testScope.behavior(membershipsWithTransport$),
connectionManager: {
@@ -99,21 +99,24 @@ test("should signal participant not yet connected to livekit", () => {
} as unknown as IConnectionManager,
});
expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", {
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expectObservable(data[0].membership$).toBe("a", {
a: bobMembership,
});
expectObservable(data[0].participant$).toBe("a", {
a: null,
});
expectObservable(data[0].connection$).toBe("a", {
a: null,
});
return true;
}),
});
expectObservable(matrixLivekitMembers$.pipe(map((e) => e.value))).toBe(
"a",
{
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expectObservable(data[0].membership$).toBe("a", {
a: bobMembership,
});
expectObservable(data[0].participant$).toBe("a", {
a: null,
});
expectObservable(data[0].connection$).toBe("a", {
a: null,
});
return true;
}),
},
);
});
});
@@ -171,7 +174,7 @@ test("should signal participant on a connection that is publishing", () => {
}),
);
const matrixLivekitMember$ = createMatrixLivekitMembers$({
const { matrixLivekitMembers$ } = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: testScope.behavior(membershipsWithTransport$),
connectionManager: {
@@ -179,25 +182,28 @@ test("should signal participant on a connection that is publishing", () => {
} as unknown as IConnectionManager,
});
expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", {
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expectObservable(data[0].membership$).toBe("a", {
a: bobMembership,
});
expectObservable(data[0].participant$).toBe("a", {
a: expect.toSatisfy((participant) => {
expect(participant).toBeDefined();
expect(participant!.identity).toEqual(bobParticipantId);
return true;
}),
});
expectObservable(data[0].connection$).toBe("a", {
a: connection,
});
return true;
}),
});
expectObservable(matrixLivekitMembers$.pipe(map((e) => e.value))).toBe(
"a",
{
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expectObservable(data[0].membership$).toBe("a", {
a: bobMembership,
});
expectObservable(data[0].participant$).toBe("a", {
a: expect.toSatisfy((participant) => {
expect(participant).toBeDefined();
expect(participant!.identity).toEqual(bobParticipantId);
return true;
}),
});
expectObservable(data[0].connection$).toBe("a", {
a: connection,
});
return true;
}),
},
);
});
});
@@ -222,7 +228,7 @@ test("should signal participant on a connection that is not publishing", () => {
}),
);
const matrixLivekitMember$ = createMatrixLivekitMembers$({
const { matrixLivekitMembers$ } = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: testScope.behavior(membershipsWithTransport$),
connectionManager: {
@@ -230,21 +236,24 @@ test("should signal participant on a connection that is not publishing", () => {
} as unknown as IConnectionManager,
});
expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe("a", {
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expectObservable(data[0].membership$).toBe("a", {
a: bobMembership,
});
expectObservable(data[0].participant$).toBe("a", {
a: null,
});
expectObservable(data[0].connection$).toBe("a", {
a: connection,
});
return true;
}),
});
expectObservable(matrixLivekitMembers$.pipe(map((e) => e.value))).toBe(
"a",
{
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
expect(data.length).toEqual(1);
expectObservable(data[0].membership$).toBe("a", {
a: bobMembership,
});
expectObservable(data[0].participant$).toBe("a", {
a: null,
});
expectObservable(data[0].connection$).toBe("a", {
a: connection,
});
return true;
}),
},
);
});
});
@@ -283,7 +292,7 @@ describe("Publication edge case", () => {
}),
);
const matrixLivekitMember$ = createMatrixLivekitMembers$({
const { matrixLivekitMembers$ } = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: testScope.behavior(
membershipsWithTransport$,
@@ -293,7 +302,7 @@ describe("Publication edge case", () => {
} as unknown as IConnectionManager,
});
expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe(
expectObservable(matrixLivekitMembers$.pipe(map((e) => e.value))).toBe(
"a",
{
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {
@@ -349,7 +358,7 @@ describe("Publication edge case", () => {
}),
);
const matrixLivekitMember$ = createMatrixLivekitMembers$({
const { matrixLivekitMembers$ } = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$: testScope.behavior(
membershipsWithTransport$,
@@ -359,7 +368,7 @@ describe("Publication edge case", () => {
} as unknown as IConnectionManager,
});
expectObservable(matrixLivekitMember$.pipe(map((e) => e.value))).toBe(
expectObservable(matrixLivekitMembers$.pipe(map((e) => e.value))).toBe(
"a",
{
a: expect.toSatisfy((data: MatrixLivekitMember[]) => {

View File

@@ -61,12 +61,12 @@ export function createMatrixLivekitMembers$({
scope,
membershipsWithTransport$,
connectionManager,
}: Props): Behavior<Epoch<MatrixLivekitMember[]>> {
}: Props): { matrixLivekitMembers$: Behavior<Epoch<MatrixLivekitMember[]>> } {
/**
* Stream of all the call members and their associated livekit data (if available).
*/
return scope.behavior(
const matrixLivekitMembers$ = scope.behavior(
combineLatest([
membershipsWithTransport$,
connectionManager.connectionManagerData$,
@@ -91,7 +91,7 @@ export function createMatrixLivekitMembers$({
const participantId = /*membership.membershipID*/ `${membership.userId}:${membership.deviceId}`;
const participants = transport
? managerData.getParticipantForTransport(transport)
? managerData.getParticipantsForTransport(transport)
: [];
const participant =
participants.find((p) => p.identity == participantId) ?? null;
@@ -121,6 +121,11 @@ export function createMatrixLivekitMembers$({
),
),
);
return {
matrixLivekitMembers$,
// TODO add only publishing participants... maybe. disucss at least
// scope.behavior(matrixLivekitMembers$.pipe(map((items) => items.value.map((i)=>{ i.}))))
};
}
// TODO add back in the callviewmodel pauseWhen(this.pretendToBeDisconnected$)

View File

@@ -124,14 +124,14 @@ test("bob, carl, then bob joining no tracks yet", () => {
logger: logger,
});
const matrixLivekitItems$ = createMatrixLivekitMembers$({
const { matrixLivekitMembers$ } = createMatrixLivekitMembers$({
scope: testScope,
membershipsWithTransport$:
membershipsAndTransports.membershipsWithTransport$,
connectionManager,
});
expectObservable(matrixLivekitItems$).toBe(vMarble, {
expectObservable(matrixLivekitMembers$).toBe(vMarble, {
a: expect.toSatisfy((e: Epoch<MatrixLivekitMember[]>) => {
const items = e.value;
expect(items.length).toBe(1);