✨(backend) add telephony service for automatic SIP dispatch rules
Implemented a service that automatically creates a SIP dispatch rule when the first WebRTC participant joins a room and removes it when the room becomes empty. Why? I don’t want a SIP participant to join an empty room. The PIN code could be easily leaked, and there is currently no lobby mechanism available for SIP participants. A WebRTC participant is still required to create a room. This behavior is inspired by a proprietary tool. The service uses LiveKit’s webhook notification system to react to room lifecycle events. This is a naive implementation that currently supports only a single SIP trunk and will require refactoring to support multiple trunks. When no trunk is specified, rules are created by default on a fallback trunk. @rouja wrote a minimal Helm chart for LiveKit SIP with Asterisk, which couldn’t be versioned yet due to embedded credentials. I deployed it locally and successfully tested the integration with a remote OVH SIP trunk. One point to note: LiveKit lacks advanced filtering capabilities when listing dispatch rules. Their recommendation is to fetch all rules and filter them within your backend logic. I’ve opened a feature request asking for at least the ability to filter dispatch rules by room, since filtering by trunk is already supported, room-based filtering feels like a natural addition. Until there's an update, I prefer to keep the implementation simple. It works well at our current scale, and can be refactored when higher load or multi-trunk support becomes necessary. While caching dispatch rule IDs could be a performance optimization, I feel it would be premature and potentially error-prone due to the complexity of invalidation. If performance becomes an issue, I’ll consider introducing caching at that point. To handle the edge case where multiple dispatch rules with different PIN codes are present, the service performs an extensive cleanup during room creation to ensure SIP routing remains clean and predictable. This edge case should not happen. In the 'delete_dispatch_rule' if deleting one rule fails, method would exit without deleting the other rules. It's okay IMO for a first iteration. If multiple dispatch rules are often found for room, I would enhance this part.
This commit is contained in:
committed by
aleb_the_flash
parent
d3178eff5d
commit
988e5aa256
@@ -2,12 +2,18 @@
|
||||
|
||||
import uuid
|
||||
from enum import Enum
|
||||
from logging import getLogger
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
from livekit import api
|
||||
|
||||
from core import models
|
||||
|
||||
from .lobby import LobbyService
|
||||
from .telephony import TelephonyException, TelephonyService
|
||||
|
||||
logger = getLogger(__name__)
|
||||
|
||||
|
||||
class LiveKitWebhookError(Exception):
|
||||
@@ -77,6 +83,7 @@ class LiveKitEventsService:
|
||||
)
|
||||
self.webhook_receiver = api.WebhookReceiver(token_verifier)
|
||||
self.lobby_service = LobbyService()
|
||||
self.telephony_service = TelephonyService()
|
||||
|
||||
def receive(self, request):
|
||||
"""Process webhook and route to appropriate handler."""
|
||||
@@ -108,10 +115,54 @@ class LiveKitEventsService:
|
||||
# pylint: disable=not-callable
|
||||
handler(data)
|
||||
|
||||
def _handle_room_finished(self, data):
|
||||
"""Handle 'room_finished' event."""
|
||||
def _handle_room_started(self, data):
|
||||
"""Handle 'room_started' event."""
|
||||
|
||||
try:
|
||||
room_id = uuid.UUID(data.room.name)
|
||||
except ValueError as e:
|
||||
logger.warning(
|
||||
"Ignoring room event: room name '%s' is not a valid UUID format.",
|
||||
data.room.name,
|
||||
)
|
||||
raise ActionFailedError("Failed to process room started event") from e
|
||||
|
||||
try:
|
||||
room = models.Room.objects.get(id=room_id)
|
||||
except models.Room.DoesNotExist as err:
|
||||
raise ActionFailedError(f"Room with ID {room_id} does not exist") from err
|
||||
|
||||
if settings.ROOM_TELEPHONY_ENABLED:
|
||||
try:
|
||||
self.telephony_service.create_dispatch_rule(room)
|
||||
except TelephonyException as e:
|
||||
raise ActionFailedError(
|
||||
f"Failed to create telephony dispatch rule for room {room_id}"
|
||||
) from e
|
||||
|
||||
def _handle_room_finished(self, data):
|
||||
"""Handle 'room_finished' event."""
|
||||
|
||||
try:
|
||||
room_id = uuid.UUID(data.room.name)
|
||||
except ValueError as e:
|
||||
logger.warning(
|
||||
"Ignoring room event: room name '%s' is not a valid UUID format.",
|
||||
data.room.name,
|
||||
)
|
||||
raise ActionFailedError("Failed to process room finished event") from e
|
||||
|
||||
if settings.ROOM_TELEPHONY_ENABLED:
|
||||
try:
|
||||
self.telephony_service.delete_dispatch_rule(room_id)
|
||||
except TelephonyException as e:
|
||||
raise ActionFailedError(
|
||||
f"Failed to delete telephony dispatch rule for room {room_id}"
|
||||
) from e
|
||||
|
||||
try:
|
||||
self.lobby_service.clear_room_cache(room_id)
|
||||
except Exception as e:
|
||||
raise ActionFailedError("Failed to process room finished event") from e
|
||||
raise ActionFailedError(
|
||||
f"Failed to clear room cache for room {room_id}"
|
||||
) from e
|
||||
|
||||
124
src/backend/core/services/telephony.py
Normal file
124
src/backend/core/services/telephony.py
Normal file
@@ -0,0 +1,124 @@
|
||||
"""Telephony service for managing SIP dispatch rules for room access."""
|
||||
|
||||
from logging import getLogger
|
||||
|
||||
from asgiref.sync import async_to_sync
|
||||
from livekit.api import TwirpError
|
||||
from livekit.protocol.sip import (
|
||||
CreateSIPDispatchRuleRequest,
|
||||
DeleteSIPDispatchRuleRequest,
|
||||
ListSIPDispatchRuleRequest,
|
||||
SIPDispatchRule,
|
||||
SIPDispatchRuleDirect,
|
||||
)
|
||||
|
||||
from core import utils
|
||||
|
||||
logger = getLogger(__name__)
|
||||
|
||||
|
||||
class TelephonyException(Exception):
|
||||
"""Exception raised when telephony operations fail."""
|
||||
|
||||
|
||||
class TelephonyService:
|
||||
"""Service for managing participant access through the telephony system (SIP)."""
|
||||
|
||||
def _rule_name(self, room_id):
|
||||
"""Generate the rule name for a room based on its ID."""
|
||||
return f"SIP_{str(room_id)}"
|
||||
|
||||
@async_to_sync
|
||||
async def create_dispatch_rule(self, room):
|
||||
"""Create a SIP inbound dispatch rule for direct room routing.
|
||||
|
||||
Configures telephony to route incoming SIP calls directly to the specified room
|
||||
using the room's ID and PIN code for authentication.
|
||||
"""
|
||||
|
||||
direct_rule = SIPDispatchRule(
|
||||
dispatch_rule_direct=SIPDispatchRuleDirect(
|
||||
room_name=str(room.id), pin=str(room.pin_code)
|
||||
)
|
||||
)
|
||||
|
||||
request = CreateSIPDispatchRuleRequest(
|
||||
rule=direct_rule, name=self._rule_name(room.id)
|
||||
)
|
||||
|
||||
lkapi = utils.create_livekit_client()
|
||||
|
||||
try:
|
||||
await lkapi.sip.create_sip_dispatch_rule(create=request)
|
||||
except TwirpError as e:
|
||||
logger.exception(
|
||||
"Unexpected error creating dispatch rule for room %s", room.id
|
||||
)
|
||||
raise TelephonyException("Could not create dispatch rule") from e
|
||||
|
||||
finally:
|
||||
await lkapi.aclose()
|
||||
|
||||
async def _list_dispatch_rules_ids(self, room_id):
|
||||
"""List SIP dispatch rule IDs for a specific room.
|
||||
|
||||
Fetches all existing SIP dispatch rules and filters them by room name
|
||||
since LiveKit API doesn't support server-side filtering by 'room_name'.
|
||||
This approach is acceptable for moderate scale but may need refactoring
|
||||
for high-volume scenarios.
|
||||
|
||||
Note:
|
||||
Feature request for server-side filtering: livekit/sip#405
|
||||
"""
|
||||
|
||||
lkapi = utils.create_livekit_client()
|
||||
|
||||
try:
|
||||
existing_rules = await lkapi.sip.list_sip_dispatch_rule(
|
||||
list=ListSIPDispatchRuleRequest()
|
||||
)
|
||||
except TwirpError as e:
|
||||
logger.exception("Failed to list dispatch rules for room %s", room_id)
|
||||
raise TelephonyException("Could not list dispatch rules") from e
|
||||
finally:
|
||||
await lkapi.aclose()
|
||||
|
||||
if not existing_rules or not existing_rules.items:
|
||||
return []
|
||||
|
||||
rule_name = self._rule_name(room_id)
|
||||
|
||||
return [
|
||||
existing_rule.sip_dispatch_rule_id
|
||||
for existing_rule in existing_rules.items
|
||||
if existing_rule.name == rule_name
|
||||
]
|
||||
|
||||
@async_to_sync
|
||||
async def delete_dispatch_rule(self, room_id):
|
||||
"""Delete all SIP inbound dispatch rules associated with a specific room."""
|
||||
|
||||
rules_ids = await self._list_dispatch_rules_ids(room_id)
|
||||
|
||||
if not rules_ids:
|
||||
logger.info("No dispatch rules found for room %s", room_id)
|
||||
return False
|
||||
|
||||
if len(rules_ids) > 1:
|
||||
logger.error("Multiple dispatch rules found for room %s", room_id)
|
||||
|
||||
lkapi = utils.create_livekit_client()
|
||||
try:
|
||||
for rule_id in rules_ids:
|
||||
await lkapi.sip.delete_sip_dispatch_rule(
|
||||
delete=DeleteSIPDispatchRuleRequest(sip_dispatch_rule_id=rule_id)
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
except TwirpError as e:
|
||||
logger.exception("Failed to delete dispatch rules for room %s", room_id)
|
||||
raise TelephonyException("Could not delete dispatch rules") from e
|
||||
|
||||
finally:
|
||||
await lkapi.aclose()
|
||||
Reference in New Issue
Block a user