small refactor to make it testable.
This commit is contained in:
@@ -0,0 +1,53 @@
|
||||
/*
|
||||
Copyright 2025 New Vector Ltd.
|
||||
|
||||
SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
|
||||
Please see LICENSE in the repository root for full details.
|
||||
*/
|
||||
|
||||
import { encodeUnpaddedBase64Url } from "matrix-js-sdk";
|
||||
import { sha256 } from "matrix-js-sdk/lib/digest";
|
||||
import { type CallMembershipIdentityParts } from "matrix-js-sdk/lib/matrixrtc/EncryptionManager";
|
||||
import { from, type Observable } from "rxjs";
|
||||
|
||||
const livekitParticipantIdentityCache = new Map<string, string>();
|
||||
|
||||
/**
|
||||
* The string that is computed based on the membership and used for the computing the hash.
|
||||
* `${userId}:${deviceId}:${membershipID}`
|
||||
* as the direct imput for: await sha256(input)
|
||||
*/
|
||||
export const livekitIdentityInput = ({
|
||||
userId,
|
||||
deviceId,
|
||||
memberId,
|
||||
}: CallMembershipIdentityParts): string => `${userId}|${deviceId}|${memberId}`;
|
||||
|
||||
export function computeLivekitParticipantIdentity$(
|
||||
membership: CallMembershipIdentityParts,
|
||||
kind: "rtc" | "session",
|
||||
): Observable<string> {
|
||||
const compute = async (): Promise<string> => {
|
||||
switch (kind) {
|
||||
case "rtc": {
|
||||
const input = livekitIdentityInput(membership);
|
||||
if (livekitParticipantIdentityCache.size > 400)
|
||||
// prevent memory leaks in a stupid/simple way
|
||||
livekitParticipantIdentityCache.clear();
|
||||
// TODO use non deprecated memberId
|
||||
if (livekitParticipantIdentityCache.has(input))
|
||||
return livekitParticipantIdentityCache.get(input)!;
|
||||
else {
|
||||
const hashBuffer = await sha256(input);
|
||||
const hashedString = encodeUnpaddedBase64Url(hashBuffer);
|
||||
livekitParticipantIdentityCache.set(input, hashedString);
|
||||
return hashedString;
|
||||
}
|
||||
}
|
||||
case "session":
|
||||
default:
|
||||
return `${membership.userId}:${membership.deviceId}`;
|
||||
}
|
||||
};
|
||||
return from(compute());
|
||||
}
|
||||
@@ -28,7 +28,6 @@ import {
|
||||
flushPromises,
|
||||
mockCallMembership,
|
||||
mockRemoteParticipant,
|
||||
withTestScheduler,
|
||||
} from "../../../utils/test.ts";
|
||||
import { type Connection } from "./Connection.ts";
|
||||
import { constant } from "../../Behavior.ts";
|
||||
|
||||
@@ -15,15 +15,13 @@ import {
|
||||
} from "matrix-js-sdk/lib/matrixrtc";
|
||||
import { combineLatest, filter, map, switchMap } from "rxjs";
|
||||
import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
|
||||
import { sha256 } from "matrix-js-sdk/lib/digest";
|
||||
import { encodeUnpaddedBase64Url } from "matrix-js-sdk";
|
||||
import { type CallMembershipIdentityParts } from "matrix-js-sdk/lib/matrixrtc/EncryptionManager";
|
||||
|
||||
import { type Behavior } from "../../Behavior";
|
||||
import { type IConnectionManager } from "./ConnectionManager";
|
||||
import { Epoch, type ObservableScope } from "../../ObservableScope";
|
||||
import { type Connection } from "./Connection";
|
||||
import { generateItemsWithEpoch } from "../../../utils/observable";
|
||||
import { computeLivekitParticipantIdentity$ } from "./LivekitParticipantIdentity";
|
||||
|
||||
const logger = rootLogger.getChild("[MatrixLivekitMembers]");
|
||||
|
||||
@@ -73,25 +71,28 @@ export function createMatrixLivekitMembers$({
|
||||
*/
|
||||
const membershipsWithTransportAndLivekitIdentity$ =
|
||||
membershipsWithTransport$.pipe(
|
||||
switchMap(async (membershipsWithTransport) => {
|
||||
switchMap((membershipsWithTransport) => {
|
||||
const { value, epoch } = membershipsWithTransport;
|
||||
const membershipsWithTransportAndLkIdentityPromises = value.map(
|
||||
async (obj) => {
|
||||
return computeLivekitParticipantIdentity(
|
||||
(obj) => {
|
||||
return computeLivekitParticipantIdentity$(
|
||||
obj.membership,
|
||||
obj.membership.kind,
|
||||
);
|
||||
},
|
||||
);
|
||||
const identities = await Promise.all(
|
||||
return combineLatest(
|
||||
membershipsWithTransportAndLkIdentityPromises,
|
||||
).pipe(
|
||||
map((identities) => {
|
||||
const membershipsWithTransportAndLkIdentity = value.map(
|
||||
({ transport, membership }, index) => {
|
||||
return { transport, membership, identity: identities[index] };
|
||||
},
|
||||
);
|
||||
return new Epoch(membershipsWithTransportAndLkIdentity, epoch);
|
||||
}),
|
||||
);
|
||||
const membershipsWithTransportAndLkIdentity = value.map(
|
||||
({ transport, membership }, index) => {
|
||||
return { transport, membership, identity: identities[index] };
|
||||
},
|
||||
);
|
||||
return new Epoch(membershipsWithTransportAndLkIdentity, epoch);
|
||||
}),
|
||||
);
|
||||
|
||||
@@ -164,42 +165,3 @@ export function areLivekitTransportsEqual(
|
||||
if (!t1 && !t2) return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
const livekitParticipantIdentityCache = new Map<string, string>();
|
||||
|
||||
/**
|
||||
* The string that is computed based on the membership and used for the computing the hash.
|
||||
* `${userId}:${deviceId}:${membershipID}`
|
||||
* as the direct imput for: await sha256(input)
|
||||
*/
|
||||
export const livekitIdentityInput = ({
|
||||
userId,
|
||||
deviceId,
|
||||
memberId,
|
||||
}: CallMembershipIdentityParts): string => `${userId}|${deviceId}|${memberId}`;
|
||||
|
||||
export async function computeLivekitParticipantIdentity(
|
||||
membership: CallMembershipIdentityParts,
|
||||
kind: "rtc" | "session",
|
||||
): Promise<string> {
|
||||
switch (kind) {
|
||||
case "rtc": {
|
||||
const input = livekitIdentityInput(membership);
|
||||
if (livekitParticipantIdentityCache.size > 400)
|
||||
// prevent memory leaks in a stupid/simple way
|
||||
livekitParticipantIdentityCache.clear();
|
||||
// TODO use non deprecated memberId
|
||||
if (livekitParticipantIdentityCache.has(input))
|
||||
return livekitParticipantIdentityCache.get(input)!;
|
||||
else {
|
||||
const hashBuffer = await sha256(input);
|
||||
const hashedString = encodeUnpaddedBase64Url(hashBuffer);
|
||||
livekitParticipantIdentityCache.set(input, hashedString);
|
||||
return hashedString;
|
||||
}
|
||||
}
|
||||
case "session":
|
||||
default:
|
||||
return `${membership.userId}:${membership.deviceId}`;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ import { ECConnectionFactory } from "./ConnectionFactory.ts";
|
||||
import { type OpenIDClientParts } from "../../../livekit/openIDSFU.ts";
|
||||
import {
|
||||
mockCallMembership,
|
||||
mockComputeLivekitParticipantIdentity$,
|
||||
mockMediaDevices,
|
||||
withTestScheduler,
|
||||
} from "../../../utils/test.ts";
|
||||
@@ -43,6 +44,11 @@ let lkRoomFactory: () => LivekitRoom;
|
||||
|
||||
const createdMockLivekitRooms: Map<string, LivekitRoom> = new Map();
|
||||
|
||||
vi.mock(import("./LivekitParticipantIdentity.ts"), async (importOriginal) => ({
|
||||
...(await importOriginal()),
|
||||
computeLivekitParticipantIdentity$: mockComputeLivekitParticipantIdentity$,
|
||||
}));
|
||||
|
||||
beforeEach(() => {
|
||||
testScope = new ObservableScope();
|
||||
mockClient = {
|
||||
|
||||
Reference in New Issue
Block a user