From ea2e5e8609fec81bb19b53396d9fa4051d88036e Mon Sep 17 00:00:00 2001 From: lebaudantoine Date: Fri, 20 Jun 2025 15:54:48 +0200 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8(agents)=20initialize=20LiveKit=20agen?= =?UTF-8?q?t=20from=20multi-user=20transcriber=20example?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Create Python script based on LiveKit's multi-user transcriber example with enhanced request_fnc handler that ensures job uniqueness by room. A transcriber sends segments to every participant present in a room and transcribes every participant's audio. We don't need several transcribers in the same room. Made the worker hidden - by default it uses auto dispatch and is visible as any other participant, but having a transcriber participant would be weird since no other videoconference tool treats this feature as a bot participant joining a call. Job uniqueness is ensured using agent identity by forging a deterministic identity for each transcriber by room. This makes sure two transcribers would never be able to join the same room. It might be a bit harsh, but our API calling to list participants before accepting a new transcription job should already filter out situations where an agent is triggered twice. We chose explicit worker orchestration over auto-dispatch because we want to keep control of this feature which will be challenging to scale. LiveKit agent scaling is documented but we need to experiment in real life situations with their Worker/Job mechanism. Currently uses Deepgram since Arnaud's draft Kyutai plugin isn't ready for production. This allows our ops team to advance on deploying and monitoring agents. Deepgram was a random choice offering 200 hours free, though it only works for English. ASR provider needs to be refactored as a pluggable system selectable through environment variables or settings. Agent dispatch will be triggered via a new REST API endpoint to our backend. This is quite a first naive version of a minimal dockerized LiveKit agent to start playing with the framework. --- src/agents/Dockerfile | 24 ++++ src/agents/multi-user-transcriber.py | 189 +++++++++++++++++++++++++++ src/agents/pyproject.toml | 51 ++++++++ 3 files changed, 264 insertions(+) create mode 100644 src/agents/Dockerfile create mode 100644 src/agents/multi-user-transcriber.py create mode 100644 src/agents/pyproject.toml diff --git a/src/agents/Dockerfile b/src/agents/Dockerfile new file mode 100644 index 00000000..dcd2ebf2 --- /dev/null +++ b/src/agents/Dockerfile @@ -0,0 +1,24 @@ +FROM python:3.13-slim AS base + +FROM base AS builder + +WORKDIR /builder + +COPY pyproject.toml . + +RUN mkdir /install && \ + pip install --prefix=/install . + +FROM base AS production + +WORKDIR /app + +ARG DOCKER_USER +USER ${DOCKER_USER} + +# Un-privileged user running the application +COPY --from=builder /install /usr/local + +COPY . . + +CMD ["python", "multi-user-transcriber.py", "start"] diff --git a/src/agents/multi-user-transcriber.py b/src/agents/multi-user-transcriber.py new file mode 100644 index 00000000..16fb78ff --- /dev/null +++ b/src/agents/multi-user-transcriber.py @@ -0,0 +1,189 @@ +"""Multi user transcription agent.""" + +import asyncio +import logging +import os + +from dotenv import load_dotenv +from livekit import api, rtc +from livekit.agents import ( + Agent, + AgentSession, + AutoSubscribe, + JobContext, + JobProcess, + JobRequest, + RoomInputOptions, + RoomIO, + RoomOutputOptions, + WorkerOptions, + WorkerPermissions, + cli, + utils, +) +from livekit.plugins import deepgram, silero + +load_dotenv() + +logger = logging.getLogger("transcriber") + +TRANSCRIBER_AGENT_NAME = os.getenv("TRANSCRIBER_AGENT_NAME", "multi-user-transcriber") + + +class Transcriber(Agent): + """Create a transcription agent for a specific participant.""" + + def __init__(self, *, participant_identity: str): + """Init transcription agent.""" + super().__init__( + instructions="not-needed", + stt=deepgram.STT(), + ) + self.participant_identity = participant_identity + + +class MultiUserTranscriber: + """Manage transcription sessions for multiple room participants.""" + + def __init__(self, ctx: JobContext): + """Init multi user transcription agent.""" + self.ctx = ctx + self._sessions: dict[str, AgentSession] = {} + self._tasks: set[asyncio.Task] = set() + + def start(self): + """Start listening for participant connection events.""" + self.ctx.room.on("participant_connected", self.on_participant_connected) + self.ctx.room.on("participant_disconnected", self.on_participant_disconnected) + + async def aclose(self): + """Close all sessions and cleanup resources.""" + await utils.aio.cancel_and_wait(*self._tasks) + + await asyncio.gather( + *[self._close_session(session) for session in self._sessions.values()] + ) + + self.ctx.room.off("participant_connected", self.on_participant_connected) + self.ctx.room.off("participant_disconnected", self.on_participant_disconnected) + + def on_participant_connected(self, participant: rtc.RemoteParticipant): + """Handle new participant connection by starting transcription session.""" + if participant.identity in self._sessions: + return + + logger.info(f"starting session for {participant.identity}") + task = asyncio.create_task(self._start_session(participant)) + self._tasks.add(task) + + def on_task_done(task: asyncio.Task): + try: + self._sessions[participant.identity] = task.result() + finally: + self._tasks.discard(task) + + task.add_done_callback(on_task_done) + + def on_participant_disconnected(self, participant: rtc.RemoteParticipant): + """Handle participant disconnection by closing transcription session.""" + if (session := self._sessions.pop(participant.identity)) is None: + return + + logger.info(f"closing session for {participant.identity}") + task = asyncio.create_task(self._close_session(session)) + self._tasks.add(task) + task.add_done_callback(lambda _: self._tasks.discard(task)) + + async def _start_session(self, participant: rtc.RemoteParticipant) -> AgentSession: + """Create and start transcription session for participant.""" + if participant.identity in self._sessions: + return self._sessions[participant.identity] + + session = AgentSession( + vad=self.ctx.proc.userdata["vad"], + ) + room_io = RoomIO( + agent_session=session, + room=self.ctx.room, + participant=participant, + input_options=RoomInputOptions( + text_enabled=False, + ), + output_options=RoomOutputOptions( + transcription_enabled=True, + audio_enabled=False, + ), + ) + await room_io.start() + await session.start( + agent=Transcriber( + participant_identity=participant.identity, + ) + ) + return session + + async def _close_session(self, sess: AgentSession) -> None: + """Close and cleanup transcription session.""" + await sess.drain() + await sess.aclose() + + +async def entrypoint(ctx: JobContext): + """Initialize and run the multi-user transcriber.""" + transcriber = MultiUserTranscriber(ctx) + transcriber.start() + + await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY) + for participant in ctx.room.remote_participants.values(): + transcriber.on_participant_connected(participant) + + async def cleanup(): + await transcriber.aclose() + + ctx.add_shutdown_callback(cleanup) + + +async def handle_transcriber_job_request(job_req: JobRequest) -> None: + """Accept job if no transcriber exists in room, otherwise reject.""" + room_name = job_req.room.name + transcriber_id = f"{TRANSCRIBER_AGENT_NAME}-{room_name}" + + async with api.LiveKitAPI() as lkapi: + try: + response = await lkapi.room.list_participants( + list=api.ListParticipantsRequest(room=room_name) + ) + + transcriber_exists = any( + p.kind == rtc.ParticipantKind.PARTICIPANT_KIND_AGENT + and p.identity == transcriber_id + for p in response.participants + ) + + if transcriber_exists: + logger.info(f"Transcriber exists in {room_name} - rejecting") + await job_req.reject() + else: + logger.info(f"Accepting job for {room_name}") + await job_req.accept(identity=transcriber_id) + + except Exception: + logger.exception(f"Error processing job for {room_name}") + await job_req.reject() + + +def prewarm(proc: JobProcess): + """Preload voice activity detection model.""" + proc.userdata["vad"] = silero.VAD.load() + + +if __name__ == "__main__": + cli.run_app( + WorkerOptions( + entrypoint_fnc=entrypoint, + request_fnc=handle_transcriber_job_request, + prewarm_fnc=prewarm, + agent_name=TRANSCRIBER_AGENT_NAME, + permissions=WorkerPermissions(hidden=True), + ) + ) diff --git a/src/agents/pyproject.toml b/src/agents/pyproject.toml new file mode 100644 index 00000000..53022995 --- /dev/null +++ b/src/agents/pyproject.toml @@ -0,0 +1,51 @@ + +[project] +name = "agents" +version = "0.1.23" +requires-python = ">=3.12" +dependencies = [ + "livekit-agents==1.2.6", + "livekit-plugins-deepgram==1.2.6", + "livekit-plugins-silero==1.2.6", + "python-dotenv==1.1.1" +] + +[project.optional-dependencies] +dev = [ + "ruff==0.12.0", +] + +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" + +[tool.ruff] +target-version = "py313" + +[tool.ruff.lint] +select = [ + "B", # flake8-bugbear + "C4", # flake8-comprehensions + "D", # pydocstyle + "E", # pycodestyle error + "F", # Pyflakes + "I", # Isort + "ISC", # flake8-implicit-str-concat + "PLC", # Pylint Convention + "PLE", # Pylint Error + "PLR", # Pylint Refactor + "PLW", # Pylint Warning + "RUF100", # Ruff unused-noqa + "S", # flake8-bandit + "T20", # flake8-print + "W", # pycodestyle warning +] + +[tool.ruff.lint.per-file-ignores] +"tests/*" = [ + "S101", # use of assert +] + +[tool.ruff.lint.pydocstyle] +# Use Google-style docstrings. +convention = "google"