/*
Copyright 2025 Element Creations Ltd.

SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/

import {
  type LocalParticipant as LocalLivekitParticipant,
  type RemoteParticipant as RemoteLivekitParticipant,
} from "livekit-client";
import {
  type LivekitTransport,
  type CallMembership,
} from "matrix-js-sdk/lib/matrixrtc";
import { combineLatest, filter, map, switchMap } from "rxjs";
import { logger as rootLogger } from "matrix-js-sdk/lib/logger";
import { sha256 } from "matrix-js-sdk/lib/digest";
import { encodeUnpaddedBase64Url } from "matrix-js-sdk";
import { type CallMembershipIdentityParts } from "matrix-js-sdk/lib/matrixrtc/EncryptionManager";

import { type Behavior } from "../../Behavior";
import { type IConnectionManager } from "./ConnectionManager";
import { Epoch, type ObservableScope } from "../../ObservableScope";
import { type Connection } from "./Connection";
import { generateItemsWithEpoch } from "../../../utils/observable";

const logger = rootLogger.getChild("[MatrixLivekitMembers]");

/**
 * Represents a Matrix call member and their associated LiveKit participation.
 * `participant$` emits `null` if the member is not (yet) found among the
 * participants of its livekit transport, or if it has no livekit transport at all.
 * `connection$` emits `null` if the member has no livekit transport.
 */
export interface MatrixLivekitMember {
  membership$: Behavior<CallMembership>;
  participant$: Behavior<
    LocalLivekitParticipant | RemoteLivekitParticipant | null
  >;
  connection$: Behavior<Connection | null>;
  // participantId: string; We do not want a participantId here since it will be generated by the jwt
  // TODO decide if we can also drop the userId. Its in the matrix membership anyways.
  userId: string;
}

/**
 * Inputs of {@link createMatrixLivekitMembers$}.
 */
interface Props {
  /** Scope that owns the returned behavior. */
  scope: ObservableScope;
  /** The call memberships (the matrix side), each with its optional livekit transport. */
  membershipsWithTransport$: Behavior<
    Epoch<{ membership: CallMembership; transport?: LivekitTransport }[]>
  >;
  /** Source of the connections and participants per transport (the livekit side). */
  connectionManager: IConnectionManager;
}

/**
 * Combines MatrixRTC and Livekit worlds.
 *
 * It has a small public interface:
 * - in (via the `Props` argument):
 *   - `membershipsWithTransport$` to track the call members (The matrix side)
 *   - a `ConnectionManager` for the lk rooms (The livekit side)
 * - out (via the returned Behavior):
 *   - an epoch-tagged list of `MatrixLivekitMember` to track the members and
 *     their associated livekit data.
 *
 * @returns A behavior that starts with an empty list at epoch `-1`.
 */
export function createMatrixLivekitMembers$({
  scope,
  membershipsWithTransport$,
  connectionManager,
}: Props): Behavior<Epoch<MatrixLivekitMember[]>> {
  /**
   * This internal observable is used to compute the async sha256 hash of the user's identity.
   * A promise is treated like an observable, so we can switchMap on the promise from the identity computation.
   * The last update to `membershipsWithTransport$` will always be the last promise we pass to switchMap.
   * So we will eventually always end up with the latest memberships and their identities.
   */
  const membershipsWithTransportAndLivekitIdentity$ =
    membershipsWithTransport$.pipe(
      switchMap(async (membershipsWithTransport) => {
        const { value, epoch } = membershipsWithTransport;
        // Compute all identities in parallel; the result order matches `value`.
        const identities = await Promise.all(
          value.map(async ({ membership }) =>
            computeLivekitParticipantIdentity(membership, membership.kind),
          ),
        );
        const membershipsWithTransportAndLkIdentity = value.map(
          ({ transport, membership }, index) => ({
            transport,
            membership,
            identity: identities[index],
          }),
        );
        // Keep the epoch of the input so it can be matched with the connection manager data below.
        return new Epoch(membershipsWithTransportAndLkIdentity, epoch);
      }),
    );

  /**
   * Stream of all the call members and their associated livekit data (if available).
   */
  return scope.behavior(
    combineLatest([
      membershipsWithTransportAndLivekitIdentity$,
      connectionManager.connectionManagerData$,
    ]).pipe(
      // Only continue once both inputs carry the same epoch.
      filter((values) =>
        values.every((value) => value.epoch === values[0].epoch),
      ),
      map(
        ([memberships, managerData]) =>
          new Epoch(
            [memberships.value, managerData.value] as const,
            memberships.epoch,
          ),
      ),
      generateItemsWithEpoch(
        // Generator function.
        // creates an array of `{keys, data}[]`
        // Each change in the keys (new key, missing key) will result in a call to the factory function.
        function* ([membershipsWithTransportAndLivekitIdentity, managerData]) {
          for (const {
            membership,
            transport,
            identity,
          } of membershipsWithTransportAndLivekitIdentity) {
            const participants = transport
              ? managerData.getParticipantForTransport(transport)
              : [];
            const participant =
              participants.find((p) => p.identity === identity) ?? null;
            const connection = transport
              ? managerData.getConnectionForTransport(transport)
              : null;

            yield {
              keys: [identity, membership.userId, membership.deviceId],
              data: { membership, participant, connection },
            };
          }
        },
        // Each update where the keys of the generator array do not change will result in updates to the `data$` observable in the factory.
        (scope, data$, identity, userId, deviceId) => {
          logger.debug(
            `Generating member for livekitIdentity: ${identity}, userId:deviceId: ${userId}:${deviceId}`,
          );
          // Per the key contract described above, this factory runs when a new
          // `identity, userId, deviceId` key combination appears; later changes
          // for the same keys only arrive through `data$`.
          return {
            identity,
            userId,
            ...scope.splitBehavior(data$),
          };
        },
      ),
    ),
    new Epoch([], -1),
  );
}

// TODO add back in the callviewmodel pauseWhen(this.pretendToBeDisconnected$)

// TODO add this to the JS-SDK
/**
 * Compares two livekit transports.
 *
 * @param t1 - First transport, or `null`.
 * @param t2 - Second transport, or `null`.
 * @returns `true` if both transports are set and share the same
 * `livekit_service_url`, or if both are unset; `false` otherwise.
 */
export function areLivekitTransportsEqual(
  t1: LivekitTransport | null,
  t2: LivekitTransport | null,
): boolean {
  if (t1 && t2) return t1.livekit_service_url === t2.livekit_service_url;
  // In case we have different lk rooms in the same SFU (depends on the livekit authorization service)
  // It is only needed in case the livekit authorization service is not behaving as expected (or custom implementation)
  // NOTE(review): only the service url is compared here; the two lines above
  // presumably refer to an additional per-room comparison that is not done - confirm.
  if (!t1 && !t2) return true;
  return false;
}

/** Number of cached identities above which the cache is emptied before the next insert. */
const LIVEKIT_IDENTITY_CACHE_MAX_ENTRIES = 400;

/** Maps the hash input (see `livekitIdentityInput`) to its computed livekit identity. */
const livekitParticipantIdentityCache = new Map<string, string>();

/**
 * The string that is computed based on the membership and used for computing the hash.
 * `${userId}|${deviceId}|${memberId}`
 * as the direct input for: await sha256(input)
 */
export const livekitIdentityInput = ({
  userId,
  deviceId,
  memberId,
}: CallMembershipIdentityParts): string => `${userId}|${deviceId}|${memberId}`;

/**
 * Computes the livekit participant identity for a membership.
 *
 * @param membership - The identity parts (user id, device id, member id) of the membership.
 * @param kind - The membership kind. `"rtc"` yields the unpadded base64url
 * encoded sha256 hash of `livekitIdentityInput(membership)`; anything else
 * yields `${userId}:${deviceId}`.
 * @returns The identity string. Hashed identities are cached per input.
 */
export async function computeLivekitParticipantIdentity(
  membership: CallMembershipIdentityParts,
  kind: "rtc" | "session",
): Promise<string> {
  switch (kind) {
    case "rtc": {
      // TODO use non deprecated memberId
      const input = livekitIdentityInput(membership);
      const cached = livekitParticipantIdentityCache.get(input);
      if (cached !== undefined) return cached;

      const hashBuffer = await sha256(input);
      const hashedString = encodeUnpaddedBase64Url(hashBuffer);
      // Prevent memory leaks in a stupid/simple way. Evicting only on insert
      // means a cache hit never throws away the entry it is about to return.
      if (
        livekitParticipantIdentityCache.size >
        LIVEKIT_IDENTITY_CACHE_MAX_ENTRIES
      ) {
        livekitParticipantIdentityCache.clear();
      }
      livekitParticipantIdentityCache.set(input, hashedString);
      return hashedString;
    }
    case "session":
    default:
      return `${membership.userId}:${membership.deviceId}`;
  }
}