From e92f084afbdcea9fbe205203bdbfc3362d96ea86 Mon Sep 17 00:00:00 2001 From: lebaudantoine Date: Fri, 22 Nov 2024 11:28:08 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=92=A9(summary)=20vendor=20a=20first=20dr?= =?UTF-8?q?aft?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is written in a rush to bootstrap the real logic, with a Celery worker processing meeting recordings. The lack of unit tests is critical; I am not proud of it. I am totally in a hurry for the demo. Not sure we will actually keep this microservice. --- src/summary/compose.yaml | 15 ++++ src/summary/pyproject.toml | 5 ++ src/summary/summary/celery_worker.py | 126 +++++++++++++++++++++++++++ src/summary/summary/config.py | 39 ++++++++- src/summary/summary/main.py | 47 ++++++---- src/summary/summary/prompt.py | 52 +++++++++++ src/summary/summary/security.py | 19 ++++ 7 files changed, 285 insertions(+), 18 deletions(-) create mode 100644 src/summary/summary/celery_worker.py create mode 100644 src/summary/summary/prompt.py create mode 100644 src/summary/summary/security.py diff --git a/src/summary/compose.yaml b/src/summary/compose.yaml index 99470401..fe529b9e 100644 --- a/src/summary/compose.yaml +++ b/src/summary/compose.yaml @@ -1,4 +1,8 @@ services: + redis: + image: redis + ports: + - "6379:6379" app: container_name: app build: . @@ -10,3 +14,14 @@ services: restart: always env_file: ".env" + depends_on: + - redis + celery_worker: + container_name: celery_worker + build: . 
+ command: celery -A summary.celery_worker worker --pool=solo --loglevel=debug + volumes: + - .:/app + depends_on: + - redis + - app diff --git a/src/summary/pyproject.toml b/src/summary/pyproject.toml index 19cf3fd8..5cd37e08 100644 --- a/src/summary/pyproject.toml +++ b/src/summary/pyproject.toml @@ -7,6 +7,11 @@ dependencies = [ "uvicorn>=0.24.0", "pydantic>=2.5.0", "pydantic-settings>=2.1.0", + "celery==5.4.0", + "redis==4.5.4", + "minio==7.2.9", + "openai==1.51.2", + "requests==2.32.3", ] [project.optional-dependencies] diff --git a/src/summary/summary/celery_worker.py b/src/summary/summary/celery_worker.py new file mode 100644 index 00000000..cdf44f07 --- /dev/null +++ b/src/summary/summary/celery_worker.py @@ -0,0 +1,126 @@ +"""Celery workers.""" + +import json +import tempfile +from pathlib import Path + +import openai +from celery import Celery +from celery.utils.log import get_task_logger +from minio import Minio +from requests import Session +from requests.adapters import HTTPAdapter +from urllib3.util import Retry + +from .config import Settings +from .prompt import get_instructions + +settings = Settings() + + +logger = get_task_logger(__name__) + +celery = Celery( + __name__, + broker=settings.celery_broker_url, + backend=settings.celery_result_backend, + broker_connection_retry_on_startup=True, +) + + +def save_audio_stream(audio_stream, chunk_size=32 * 1024): + """Save an audio stream to a temporary OGG file.""" + with tempfile.NamedTemporaryFile(suffix=".ogg", delete=False) as tmp: + tmp.writelines(audio_stream.stream(chunk_size)) + return Path(tmp.name) + + +def create_retry_session(): + """Create an HTTP session configured with retry logic.""" + session = Session() + retries = Retry( + total=settings.webhook_max_retries, + backoff_factor=settings.webhook_backoff_factor, + status_forcelist=settings.webhook_status_forcelist, + allowed_methods={"POST"}, + ) + session.mount("https://", HTTPAdapter(max_retries=retries)) + return session + + +def 
post_with_retries(url, data): + """Send POST request with automatic retries.""" + session = create_retry_session() + session.headers.update({"Authorization": f"Bearer {settings.webhook_api_token}"}) + try: + response = session.post(url, json=data) + response.raise_for_status() + return response + finally: + session.close() + + +@celery.task(max_retries=1) +def process_audio_transcribe_summarize(filename: str, email: str, sub: str): + """Process an audio file by transcribing it and generating a summary. + + This Celery task performs the following operations: + 1. Retrieves the audio file from MinIO storage + 2. Transcribes the audio using OpenAI-compliant API's ASR model + 3. Generates a summary of the transcription using OpenAI-compliant API's LLM + 4. Sends the results via webhook + """ + logger.info("Notification received") + logger.debug("filename: %s", filename) + + minio_client = Minio( + settings.minio_url, + access_key=settings.minio_access_key, + secret_key=settings.minio_secret_key, + ) + + logger.debug("Connection to the Minio bucket successful") + + audio_file_stream = minio_client.get_object( + settings.minio_bucket, object_name=filename + ) + + temp_file_path = save_audio_stream(audio_file_stream) + logger.debug("Recording successfully downloaded, filepath: %s", temp_file_path) + + logger.debug("Initiating OpenAI client") + openai_client = openai.OpenAI( + api_key=settings.openai_api_key, base_url=settings.openai_base_url + ) + + logger.debug("Querying transcription …") + with open(temp_file_path, "rb") as audio_file: + transcription = openai_client.audio.transcriptions.create( + model=settings.openai_asr_model, file=audio_file + ) + + transcription = transcription.text + + logger.debug("Transcription: \n %s", transcription) + + instructions = get_instructions(transcription) + summary_response = openai_client.chat.completions.create( + model=settings.openai_llm_model, messages=instructions + ) + + summary = summary_response.choices[0].message.content + 
logger.debug("Summary: \n %s", summary) + + data = { + "summary": summary, + "email": email, + "sub": sub, + } + + logger.debug("Submitting webhook to %s", settings.webhook_url) + logger.debug("Request payload: %s", json.dumps(data, indent=2)) + + response = post_with_retries(settings.webhook_url, data) + + logger.info("Webhook submitted successfully. Status: %s", response.status_code) + logger.debug("Response body: %s", response.text) diff --git a/src/summary/summary/config.py b/src/summary/summary/config.py index cacb93d9..a907ca0b 100644 --- a/src/summary/summary/config.py +++ b/src/summary/summary/config.py @@ -1,5 +1,9 @@ -"""Module for managing application configuration and settings.""" +"""Application configuration and settings.""" +from functools import lru_cache +from typing import Annotated + +from fastapi import Depends from pydantic_settings import BaseSettings, SettingsConfigDict @@ -8,3 +12,36 @@ class Settings(BaseSettings): app_name: str = "Awesome API" model_config = SettingsConfigDict(env_file=".env") + app_api_token: str + + # Celery settings + celery_broker_url: str = "redis://redis/0" + celery_result_backend: str = "redis://redis/0" + + # Minio settings + minio_bucket: str + minio_url: str + minio_access_key: str + minio_secret_key: str + + # AI-related settings + openai_api_key: str + openai_base_url: str = "https://api.openai.com/v1" + openai_asr_model: str = "whisper-1" + openai_llm_model: str = "gpt-4o" + + # Webhook-related settings + webhook_max_retries: int = 2 + webhook_status_forcelist: list[int] = [502, 503, 504] + webhook_backoff_factor: float = 0.1 + webhook_api_token: str + webhook_url: str + + +@lru_cache +def get_settings(): + """Load and cache application settings.""" + return Settings() + + +SettingsDeps = Annotated[Settings, Depends(get_settings)] diff --git a/src/summary/summary/main.py b/src/summary/summary/main.py index 98743299..3673796e 100644 --- a/src/summary/summary/main.py +++ b/src/summary/summary/main.py @@ -1,26 
+1,15 @@ -"""Application entry point.""" +"""Application endpoint.""" -from functools import lru_cache -from typing import Annotated - -from config import Settings +from celery.result import AsyncResult from fastapi import Depends, FastAPI +from pydantic import BaseModel + +from .celery_worker import process_audio_transcribe_summarize +from .security import verify_token app = FastAPI() -@lru_cache -def get_settings(): - """Load and cache application settings.""" - return Settings() - - -@app.get("/") -async def root(settings: Annotated[Settings, Depends(get_settings)]): - """Root endpoint that returns app name.""" - return {"message": f"Hello World, using {settings.app_name}"} - - @app.get("/__heartbeat__") async def heartbeat(): """Health check endpoint for monitoring.""" @@ -31,3 +20,27 @@ async def heartbeat(): async def lbheartbeat(): """Health check endpoint for load balancer.""" return {"status": "ok"} + + +class NotificationRequest(BaseModel): + """Notification data.""" + + filename: str + email: str + sub: str + + +@app.post("/push") +async def notify(request: NotificationRequest, token: str = Depends(verify_token)): + """Push a notification.""" + task = process_audio_transcribe_summarize.delay( + request.filename, request.email, request.sub + ) + return {"task_id": task.id, "message": "Notification sent"} + + +@app.get("/status/{task_id}") +async def get_status(task_id: str, token: str = Depends(verify_token)): + """Check task status by ID.""" + task = AsyncResult(task_id) + return {"task_id": task_id, "status": task.status} diff --git a/src/summary/summary/prompt.py b/src/summary/summary/prompt.py new file mode 100644 index 00000000..463c9ac5 --- /dev/null +++ b/src/summary/summary/prompt.py @@ -0,0 +1,52 @@ +# ruff: noqa + + +def get_instructions(transcript): + """Declare the summarize instructions.""" + prompt = f""" + Audience: Coworkers. + + **Do:** + - Detect the language of the transcript and provide your entire response in the same language. 
+ - If any part of the transcript is unclear or lacks detail, politely inform the user, specifying which areas need further clarification. + - Ensure the accuracy of all information and refrain from adding unverified details. + - Format the response using proper markdown and structured sections. + - Be concise and avoid repeating yourself between the sections. + - Be super precise on nickname + - Be a nit-picker + - Auto-evaluate your response + + **Don't:** + - Write something your are not sure. + - Write something that is not mention in the transcript. + - Don't make mistake while mentioning someone + **Task:** + Summarize the provided meeting transcript into clear and well-organized meeting minutes. The summary should be structured into the following sections, excluding irrelevant or inapplicable details: + + 1. **Summary**: Write a TL;DR of the meeting. + 2. **Subjects Discussed**: List the key points or issues in bullet points. + 4. **Next Steps**: Provide action items as bullet points, assigning each task to a responsible individual and including deadlines (if mentioned). Format action items as tickable checkboxes. Ensure every action is assigned and, if a deadline is provided, that it is clearly stated. + + **Transcript**: + {transcript} + + **Response:** + + ### Summary [Translate this title based on the transcript’s language] + [Provide a brief overview of the key points discussed] + + ### Subjects Discussed [Translate this title based on the transcript’s language] + - [Summarize each topic concisely] + + ### Next Steps [Translate this title based on the transcript’s language] + - [ ] Action item [Assign to the responsible individual(s) and include a deadline if applicable, follow this strict format: Action - List of owner(s), deadline.] 
+ + """ + + return [ + { + "role": "system", + "content": "You are a concise and structured assistant, that summarizes meeting transcripts.", + }, + {"role": "user", "content": prompt}, + ] diff --git a/src/summary/summary/security.py b/src/summary/summary/security.py new file mode 100644 index 00000000..d8503e91 --- /dev/null +++ b/src/summary/summary/security.py @@ -0,0 +1,19 @@ +"""Application security.""" + +from fastapi import HTTPException, Security +from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer + +from .config import SettingsDeps + +security = HTTPBearer() + + +def verify_token( + settings: SettingsDeps, + credentials: HTTPAuthorizationCredentials = Security(security), # noqa: B008 +): + """Verify the bearer token from the Authorization header.""" + token = credentials.credentials + if token != settings.app_api_token: + raise HTTPException(status_code=401, detail="Invalid token") + return token