♻️(backend) update BaseEgress to use custom session from livekit-api
Refactor BaseEgress class to leverage latest livekit-api client's custom session support. Simplifies code by using built-in capability to disable SSL verification in development environments instead of previous workaround.
This commit is contained in:
committed by
aleb_the_flash
parent
a83e5c4b1c
commit
2ef95aa835
@@ -17,7 +17,6 @@ class WorkerServiceConfig:
|
||||
|
||||
output_folder: str
|
||||
server_configurations: Dict[str, Any]
|
||||
verify_ssl: Optional[bool]
|
||||
bucket_args: Optional[dict]
|
||||
|
||||
@classmethod
|
||||
@@ -29,7 +28,6 @@ class WorkerServiceConfig:
|
||||
return cls(
|
||||
output_folder=settings.RECORDING_OUTPUT_FOLDER,
|
||||
server_configurations=settings.LIVEKIT_CONFIGURATION,
|
||||
verify_ssl=settings.RECORDING_VERIFY_SSL,
|
||||
bucket_args={
|
||||
"endpoint": settings.AWS_S3_ENDPOINT_URL,
|
||||
"access_key": settings.AWS_S3_ACCESS_KEY_ID,
|
||||
|
||||
@@ -2,10 +2,11 @@
|
||||
|
||||
# pylint: disable=no-member
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
import aiohttp
|
||||
from asgiref.sync import async_to_sync
|
||||
from livekit import api as livekit_api
|
||||
from livekit.api.egress_service import EgressService
|
||||
|
||||
from ..enums import FileExtension
|
||||
from .exceptions import WorkerConnectionError, WorkerResponseError
|
||||
@@ -29,21 +30,29 @@ class BaseEgressService:
|
||||
async def _handle_request(self, request, method_name: str):
|
||||
"""Handle making a request to the LiveKit API and returns the response."""
|
||||
|
||||
# Use HTTP connector for local development with Tilt,
|
||||
# where cluster communications are unsecure
|
||||
connector = aiohttp.TCPConnector(ssl=self._config.verify_ssl)
|
||||
custom_session = None
|
||||
if not settings.LIVEKIT_VERIFY_SSL:
|
||||
connector = aiohttp.TCPConnector(ssl=False)
|
||||
custom_session = aiohttp.ClientSession(connector=connector)
|
||||
|
||||
async with aiohttp.ClientSession(connector=connector) as session:
|
||||
client = EgressService(session, **self._config.server_configurations)
|
||||
method = getattr(client, method_name)
|
||||
try:
|
||||
response = await method(request)
|
||||
except livekit_api.TwirpError as e:
|
||||
raise WorkerConnectionError(
|
||||
f"LiveKit client connection error, {e.message}."
|
||||
) from e
|
||||
lkapi = livekit_api.LiveKitAPI(
|
||||
session=custom_session, **self._config.server_configurations
|
||||
)
|
||||
|
||||
# ruff: noqa: SLF001
|
||||
# pylint: disable=protected-access
|
||||
method = getattr(lkapi._egress, method_name)
|
||||
|
||||
try:
|
||||
response = await method(request)
|
||||
return response
|
||||
except livekit_api.TwirpError as e:
|
||||
raise WorkerConnectionError(
|
||||
f"LiveKit client connection error, {e.message}."
|
||||
) from e
|
||||
|
||||
finally:
|
||||
await lkapi.aclose()
|
||||
|
||||
def stop(self, worker_id: str) -> str:
|
||||
"""Stop an ongoing egress worker.
|
||||
|
||||
Reference in New Issue
Block a user