💩(summary) vendor a first draft

This is written in a rush, boostrap the real logic with celery
worker to process meeting recording.

Lack of unit tests is critical, I am not proud of a it. I am
totally in a hurry for the demo. Not sure we will actually keep
this microservice.
This commit is contained in:
lebaudantoine
2024-11-22 11:28:08 +01:00
committed by aleb_the_flash
parent 4a53005ae3
commit e92f084afb
7 changed files with 285 additions and 18 deletions

View File

@@ -1,4 +1,8 @@
services:
redis:
image: redis
ports:
- "6379:6379"
app:
container_name: app
build: .
@@ -10,3 +14,14 @@ services:
restart: always
env_file:
".env"
depends_on:
- redis
celery_worker:
container_name: celery_worker
build: .
command: celery -A summary.celery_worker worker --pool=solo --loglevel=debug
volumes:
- .:/app
depends_on:
- redis
- app

View File

@@ -7,6 +7,11 @@ dependencies = [
"uvicorn>=0.24.0",
"pydantic>=2.5.0",
"pydantic-settings>=2.1.0",
"celery==5.4.0",
"redis==4.5.4",
"minio==7.2.9",
"openai==1.51.2",
"requests==2.32.3",
]
[project.optional-dependencies]

View File

@@ -0,0 +1,126 @@
"""Celery workers."""
import json
import tempfile
from pathlib import Path
import openai
from celery import Celery
from celery.utils.log import get_task_logger
from minio import Minio
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util import Retry
from .config import Settings
from .prompt import get_instructions
settings = Settings()
logger = get_task_logger(__name__)
celery = Celery(
__name__,
broker=settings.celery_broker_url,
backend=settings.celery_result_backend,
broker_connection_retry_on_startup=True,
)
def save_audio_stream(audio_stream, chunk_size=32 * 1024):
"""Save an audio stream to a temporary OGG file."""
with tempfile.NamedTemporaryFile(suffix=".ogg", delete=False) as tmp:
tmp.writelines(audio_stream.stream(chunk_size))
return Path(tmp.name)
def create_retry_session():
"""Create an HTTP session configured with retry logic."""
session = Session()
retries = Retry(
total=settings.webhook_max_retries,
backoff_factor=settings.webhook_backoff_factor,
status_forcelist=settings.webhook_status_forcelist,
allowed_methods={"POST"},
)
session.mount("https://", HTTPAdapter(max_retries=retries))
return session
def post_with_retries(url, data):
"""Send POST request with automatic retries."""
session = create_retry_session()
session.headers.update({"Authorization": f"Bearer {settings.webhook_api_token}"})
try:
response = session.post(url, json=data)
response.raise_for_status()
return response
finally:
session.close()
@celery.task(max_retries=1)
def process_audio_transcribe_summarize(filename: str, email: str, sub: str):
"""Process an audio file by transcribing it and generating a summary.
This Celery task performs the following operations:
1. Retrieves the audio file from MinIO storage
2. Transcribes the audio using OpenAI-compliant API's ASR model
3. Generates a summary of the transcription using OpenAI-compliant API's LLM
4. Sends the results via webhook
"""
logger.info("Notification received")
logger.debug("filename: %s", filename)
minio_client = Minio(
settings.minio_url,
access_key=settings.minio_access_key,
secret_key=settings.minio_secret_key,
)
logger.debug("Connection to the Minio bucket successful")
audio_file_stream = minio_client.get_object(
settings.minio_bucket, object_name=filename
)
temp_file_path = save_audio_stream(audio_file_stream)
logger.debug("Recording successfully downloaded, filepath: %s", temp_file_path)
logger.debug("Initiating OpenAI client")
openai_client = openai.OpenAI(
api_key=settings.openai_api_key, base_url=settings.openai_base_url
)
logger.debug("Querying transcription …")
with open(temp_file_path, "rb") as audio_file:
transcription = openai_client.audio.transcriptions.create(
model=settings.openai_asr_model, file=audio_file
)
transcription = transcription.text
logger.debug("Transcription: \n %s", transcription)
instructions = get_instructions(transcription)
summary_response = openai_client.chat.completions.create(
model=settings.openai_llm_model, messages=instructions
)
summary = summary_response.choices[0].message.content
logger.debug("Summary: \n %s", summary)
data = {
"summary": summary,
"email": email,
"sub": sub,
}
logger.debug("Submitting webhook to %s", settings.webhook_url)
logger.debug("Request payload: %s", json.dumps(data, indent=2))
response = post_with_retries(settings.webhook_url, data)
logger.info("Webhook submitted successfully. Status: %s", response.status_code)
logger.debug("Response body: %s", response.text)

View File

@@ -1,5 +1,9 @@
"""Module for managing application configuration and settings."""
"""Application configuration and settings."""
from functools import lru_cache
from typing import Annotated
from fastapi import Depends
from pydantic_settings import BaseSettings, SettingsConfigDict
@@ -8,3 +12,36 @@ class Settings(BaseSettings):
app_name: str = "Awesome API"
model_config = SettingsConfigDict(env_file=".env")
app_api_token: str
# Celery settings
celery_broker_url: str = "redis://redis/0"
celery_result_backend: str = "redis://redis/0"
# Minio settings
minio_bucket: str
minio_url: str
minio_access_key: str
minio_secret_key: str
# AI-related settings
openai_api_key: str
openai_base_url: str = "https://api.openai.com/v1"
openai_asr_model: str = "whisper-1"
openai_llm_model: str = "gpt-4o"
# Webhook-related settings
webhook_max_retries: int = 2
webhook_status_forcelist: list[int] = [502, 503, 504]
webhook_backoff_factor: float = 0.1
webhook_api_token: str
webhook_url: str
@lru_cache
def get_settings():
"""Load and cache application settings."""
return Settings()
SettingsDeps = Annotated[Settings, Depends(get_settings)]

View File

@@ -1,26 +1,15 @@
"""Application entry point."""
"""Application endpoint."""
from functools import lru_cache
from typing import Annotated
from config import Settings
from celery.result import AsyncResult
from fastapi import Depends, FastAPI
from pydantic import BaseModel
from .celery_worker import process_audio_transcribe_summarize
from .security import verify_token
app = FastAPI()
@lru_cache
def get_settings():
"""Load and cache application settings."""
return Settings()
@app.get("/")
async def root(settings: Annotated[Settings, Depends(get_settings)]):
"""Root endpoint that returns app name."""
return {"message": f"Hello World, using {settings.app_name}"}
@app.get("/__heartbeat__")
async def heartbeat():
"""Health check endpoint for monitoring."""
@@ -31,3 +20,27 @@ async def heartbeat():
async def lbheartbeat():
"""Health check endpoint for load balancer."""
return {"status": "ok"}
class NotificationRequest(BaseModel):
"""Notification data."""
filename: str
email: str
sub: str
@app.post("/push")
async def notify(request: NotificationRequest, token: str = Depends(verify_token)):
"""Push a notification."""
task = process_audio_transcribe_summarize.delay(
request.filename, request.email, request.sub
)
return {"task_id": task.id, "message": "Notification sent"}
@app.get("/status/{task_id}")
async def get_status(task_id: str, token: str = Depends(verify_token)):
"""Check task status by ID."""
task = AsyncResult(task_id)
return {"task_id": task_id, "status": task.status}

View File

@@ -0,0 +1,52 @@
# ruff: noqa
def get_instructions(transcript):
"""Declare the summarize instructions."""
prompt = f"""
Audience: Coworkers.
**Do:**
- Detect the language of the transcript and provide your entire response in the same language.
- If any part of the transcript is unclear or lacks detail, politely inform the user, specifying which areas need further clarification.
- Ensure the accuracy of all information and refrain from adding unverified details.
- Format the response using proper markdown and structured sections.
- Be concise and avoid repeating yourself between the sections.
- Be super precise on nickname
- Be a nit-picker
- Auto-evaluate your response
**Don't:**
- Write something your are not sure.
- Write something that is not mention in the transcript.
- Don't make mistake while mentioning someone
**Task:**
Summarize the provided meeting transcript into clear and well-organized meeting minutes. The summary should be structured into the following sections, excluding irrelevant or inapplicable details:
1. **Summary**: Write a TL;DR of the meeting.
2. **Subjects Discussed**: List the key points or issues in bullet points.
4. **Next Steps**: Provide action items as bullet points, assigning each task to a responsible individual and including deadlines (if mentioned). Format action items as tickable checkboxes. Ensure every action is assigned and, if a deadline is provided, that it is clearly stated.
**Transcript**:
{transcript}
**Response:**
### Summary [Translate this title based on the transcripts language]
[Provide a brief overview of the key points discussed]
### Subjects Discussed [Translate this title based on the transcripts language]
- [Summarize each topic concisely]
### Next Steps [Translate this title based on the transcripts language]
- [ ] Action item [Assign to the responsible individual(s) and include a deadline if applicable, follow this strict format: Action - List of owner(s), deadline.]
"""
return [
{
"role": "system",
"content": "You are a concise and structured assistant, that summarizes meeting transcripts.",
},
{"role": "user", "content": prompt},
]

View File

@@ -0,0 +1,19 @@
"""Application security."""
from fastapi import HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from .config import SettingsDeps
security = HTTPBearer()
def verify_token(
settings: SettingsDeps,
credentials: HTTPAuthorizationCredentials = Security(security), # noqa: B008
):
"""Verify the bearer token from the Authorization header."""
token = credentials.credentials
if token != settings.app_api_token:
raise HTTPException(status_code=401, detail="Invalid token")
return token