♻️(backend) extract notify_participant to util function

Move from lobby service to utils for reuse across services. Method is
generic enough for utility status. Future: create dedicated LiveKit
service to encapsulate all LiveKit-related utilities.
This commit is contained in:
lebaudantoine
2025-07-14 17:36:34 +02:00
committed by aleb_the_flash
parent 85bde9633f
commit 17c486f7bf
5 changed files with 171 additions and 181 deletions

View File

@@ -15,7 +15,15 @@ from django.core.files.storage import default_storage
import aiohttp
import botocore
from livekit.api import AccessToken, LiveKitAPI, VideoGrants
from asgiref.sync import async_to_sync
from livekit.api import ( # pylint: disable=E0611
AccessToken,
ListRoomsRequest,
LiveKitAPI,
SendDataRequest,
TwirpError,
VideoGrants,
)
def generate_color(identity: str) -> str:
@@ -158,3 +166,37 @@ def create_livekit_client(custom_configuration=None):
configuration = custom_configuration or settings.LIVEKIT_CONFIGURATION
return LiveKitAPI(session=custom_session, **configuration)
class NotificationError(Exception):
"""Notification delivery to room participants fails."""
@async_to_sync
async def notify_participants(room_name: str, notification_data: dict):
"""Send notification data to all participants in a LiveKit room."""
lkapi = create_livekit_client()
try:
room_response = await lkapi.room.list_rooms(
ListRoomsRequest(
names=[room_name],
)
)
# Check if the room exists
if not room_response.rooms:
return
await lkapi.room.send_data(
SendDataRequest(
room=room_name,
data=json.dumps(notification_data).encode("utf-8"),
kind="RELIABLE",
)
)
except TwirpError as e:
raise NotificationError("Failed to notify room participants") from e
finally:
await lkapi.aclose()