# Backend Architecture
Deep dive into the Syllabi backend service for advanced document processing.
## Overview
The backend is an optional Python service that provides:
- 📄 Advanced PDF parsing
- 🎥 Video transcription
- 🎵 Audio transcription
- 🌐 URL crawling and extraction
- 📦 Background task processing
- 🔗 Third-party integrations (Notion, Google Drive)
> **Note:** The frontend works standalone! The backend is only needed for these advanced features.
## Tech Stack
- **Framework:** FastAPI (Python 3.10+)
- **Task Queue:** Celery
- **Message Broker:** Redis
- **Database:** Supabase (PostgreSQL)
- **Document Parsing:** PyMuPDF, pdfplumber
- **Transcription:** OpenAI Whisper API
- **Embeddings:** OpenAI `text-embedding-3-small`
## Architecture Overview
```
Frontend Upload
      ↓
FastAPI Endpoint
      ↓
Celery Task (Background)
      ↓
┌─────────────────────────┐
│ 1. Download File        │
│ 2. Parse/Extract        │
│ 3. Chunk Text           │
│ 4. Generate Embeddings  │
│ 5. Store in Database    │
└─────────────────────────┘
      ↓
Update Status in DB
      ↓
Frontend Polls for Completion
```
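The task-status route the frontend polls is not shown elsewhere in this document. A minimal sketch, assuming a hypothetical route path and the `crud_task.get_status` helper used in the tests below:

```python
from fastapi import APIRouter

from app.crud import crud_task  # shared CRUD helper (import path assumed)

router = APIRouter()


@router.get("/tasks/{task_id}/status")
async def get_task_status(task_id: str):
    # The frontend calls this every few seconds until the status
    # becomes "completed" or "failed"
    status = await crud_task.get_status(task_id)
    return {"task_id": task_id, "status": status}
```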
## Project Structure

```
backend/
├── app/
│   ├── main.py          # FastAPI app entry
│   ├── api/             # API routes
│   ├── core/            # Configuration
│   ├── crud/            # Database operations
│   ├── schemas/         # Pydantic models
│   ├── services/        # Business logic
│   └── worker/          # Celery tasks
├── tests/               # Test files
└── requirements.txt     # Dependencies
```
## Core Components

### 1. FastAPI Application

Location: `app/main.py`
```python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from app.api.api_v1.api import api_router

app = FastAPI(title="Syllabi Backend API")

# CORS for the frontend; restrict allow_origins to known domains in production
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Include routers
app.include_router(api_router, prefix="/api/v1")
```
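The `settings` object imported throughout the worker code below lives in `app/core/`. A minimal sketch of `app/core/config.py`, assuming `pydantic-settings` is used (field names beyond `REDIS_URL`, `SUPABASE_URL`, and `OPENAI_API_KEY` are guesses):

```python
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    # Values are read from environment variables or a local .env file
    model_config = SettingsConfigDict(env_file=".env")

    REDIS_URL: str = "redis://localhost:6379/0"
    SUPABASE_URL: str = ""
    SUPABASE_KEY: str = ""
    OPENAI_API_KEY: str = ""


settings = Settings()
```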
### 2. API Endpoints

Location: `app/api/api_v1/endpoints/`

#### Document Upload (`documents.py`)
@router.post("/documents/upload")
async def upload_document(
file: UploadFile,
chatbot_id: str,
user_id: str
):
# 1. Save file temporarily
temp_path = f"/tmp/{file.filename}"
with open(temp_path, "wb") as f:
f.write(await file.read())
# 2. Create task record
task = await crud_task.create_task(
chatbot_id=chatbot_id,
file_path=temp_path,
status="pending"
)
# 3. Trigger Celery task
process_document.delay(task.id, temp_path)
return {"task_id": task.id, "status": "processing"}Multimedia Processing (multimedia.py)
@router.post("/multimedia/transcribe")
async def transcribe_media(
file_url: str,
chatbot_id: str,
content_type: str # 'video' or 'audio'
):
# Create task
task = await crud_task.create_task(
chatbot_id=chatbot_id,
file_url=file_url,
task_type="transcription"
)
# Trigger background task
if content_type == "video":
transcribe_video.delay(task.id, file_url)
else:
transcribe_audio.delay(task.id, file_url)
return {"task_id": task.id}URL Indexing (urls.py)
@router.post("/urls/index")
async def index_url(
url: str,
chatbot_id: str
):
# Create task
task = await crud_task.create_task(
chatbot_id=chatbot_id,
url=url,
task_type="url_indexing"
)
# Crawl and index
crawl_url.delay(task.id, url)
return {"task_id": task.id}3. Services Layer
### 3. Services Layer

#### PDF Parsing Service

Location: `app/services/pdf_parsing_service.py`
```python
import fitz  # PyMuPDF
import pdfplumber


class PDFParsingService:
    def extract_text(self, pdf_path: str) -> list[dict]:
        """Extract text from a PDF with page numbers."""
        pages = []

        # Try PyMuPDF first (faster)
        try:
            doc = fitz.open(pdf_path)
            for page_num, page in enumerate(doc):
                text = page.get_text()
                pages.append({
                    "page_number": page_num + 1,
                    "text": text,
                })
            doc.close()
            return pages
        except Exception:
            pass

        # Fall back to pdfplumber (slower, but more accurate on tricky layouts)
        with pdfplumber.open(pdf_path) as pdf:
            for page_num, page in enumerate(pdf.pages):
                text = page.extract_text() or ""  # extract_text() may return None
                pages.append({
                    "page_number": page_num + 1,
                    "text": text,
                })
        return pages
```
#### Chunking Service

Location: `app/services/chunking_service.py`
```python
from typing import List


class ChunkingService:
    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def chunk_text(self, text: str) -> List[str]:
        """Split text into overlapping chunks."""
        chunks = []
        start = 0

        while start < len(text):
            end = start + self.chunk_size
            chunk = text[start:end]

            # Try to break at a sentence boundary, but only when the
            # boundary falls in the last 20% of the chunk
            if end < len(text):
                last_period = chunk.rfind('.')
                if last_period > self.chunk_size * 0.8:
                    end = start + last_period + 1
                    chunk = text[start:end]

            chunks.append(chunk.strip())

            # Stop once the end of the text is reached; otherwise the
            # overlap would emit a duplicate tail chunk
            if end >= len(text):
                break

            start = end - self.chunk_overlap

        return chunks
```
#### Embedding Service

Location: `app/services/embedding_service.py`
```python
from openai import AsyncOpenAI


class EmbeddingService:
    def __init__(self, api_key: str):
        # Async client, so the awaits below actually yield to the event loop
        self.client = AsyncOpenAI(api_key=api_key)
        self.model = "text-embedding-3-small"

    async def generate_embedding(self, text: str) -> list[float]:
        """Generate an embedding vector for a single text."""
        response = await self.client.embeddings.create(
            model=self.model,
            input=text,
        )
        return response.data[0].embedding

    async def generate_batch_embeddings(
        self,
        texts: list[str],
    ) -> list[list[float]]:
        """Generate embeddings for multiple texts in one API call."""
        response = await self.client.embeddings.create(
            model=self.model,
            input=texts,
        )
        return [item.embedding for item in response.data]
```
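At query time these vectors are compared by similarity; the `match_document_chunks` RPC shown later presumably does a cosine-style comparison inside Postgres (e.g. via pgvector). As a quick illustration of the underlying math:

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """dot(a, b) / (|a| * |b|); ranges from -1 to 1, higher = more similar."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)
```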
#### Transcription Service

Location: `app/services/transcription_service.py`
```python
import os
import subprocess

from openai import AsyncOpenAI


class TranscriptionService:
    def __init__(self, api_key: str):
        self.client = AsyncOpenAI(api_key=api_key)

    def extract_audio_from_video(self, video_path: str) -> str:
        """Extract the audio track using ffmpeg."""
        # splitext handles any container extension, not just .mp4
        audio_path = os.path.splitext(video_path)[0] + '.mp3'
        command = [
            'ffmpeg',
            '-i', video_path,
            '-vn',                    # drop the video stream
            '-acodec', 'libmp3lame',
            '-q:a', '2',              # VBR quality (0 = best, 9 = worst)
            audio_path,
        ]
        subprocess.run(command, check=True)
        return audio_path

    async def transcribe_audio(self, audio_path: str) -> dict:
        """Transcribe audio using the Whisper API."""
        with open(audio_path, 'rb') as audio_file:
            transcript = await self.client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
                response_format="verbose_json",
                timestamp_granularities=["segment"],
            )

        # Return the full text along with per-segment timestamps
        return {
            "text": transcript.text,
            "segments": [
                {
                    "text": seg.text,
                    "start": seg.start,
                    "end": seg.end,
                }
                for seg in transcript.segments
            ],
        }
```
### 4. Celery Workers

Location: `app/worker/`

#### Celery Configuration

`celery_app.py`:
```python
from celery import Celery

from app.core.config import settings

celery_app = Celery(
    "syllabi_worker",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL,
)

celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)
```
#### Document Processing Task

`tasks_document.py`:
```python
import asyncio

from app.core.config import settings
from app.crud import crud_chunk, crud_task  # shared CRUD helper instances (assumed)
from app.services.chunking_service import ChunkingService
from app.services.embedding_service import EmbeddingService
from app.services.pdf_parsing_service import PDFParsingService
from app.worker.celery_app import celery_app


@celery_app.task(bind=True)
def process_document(self, task_id: str, file_path: str):
    # Celery tasks run synchronously; drive the async pipeline to completion
    asyncio.run(_process_document(task_id, file_path))


async def _process_document(task_id: str, file_path: str):
    try:
        # 1. Update status and fetch the task record
        await crud_task.update_status(task_id, "processing")
        task = await crud_task.get(task_id)  # helper name assumed

        # 2. Parse PDF
        parser = PDFParsingService()
        pages = parser.extract_text(file_path)

        # 3. Chunk text, keeping each chunk's page number
        chunker = ChunkingService()
        all_chunks = []
        for page in pages:
            chunks = chunker.chunk_text(page["text"])
            for chunk in chunks:
                all_chunks.append({
                    "text": chunk,
                    "page_number": page["page_number"],
                })

        # 4. Generate embeddings in one batched call
        embedder = EmbeddingService(settings.OPENAI_API_KEY)
        texts = [c["text"] for c in all_chunks]
        embeddings = await embedder.generate_batch_embeddings(texts)

        # 5. Store in database
        for chunk, embedding in zip(all_chunks, embeddings):
            await crud_chunk.create(
                chatbot_id=task.chatbot_id,
                text=chunk["text"],
                page_number=chunk["page_number"],
                embedding=embedding,
            )

        # 6. Mark complete
        await crud_task.update_status(task_id, "completed")
    except Exception as e:
        await crud_task.update_status(task_id, "failed", error=str(e))
        raise
```
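`bind=True` exposes the task instance as `self`, which allows transient failures (a flaky download, a rate-limited API call) to be re-queued instead of immediately marked failed. A sketch of that pattern, not part of the code above:

```python
@celery_app.task(bind=True, max_retries=3)
def process_document(self, task_id: str, file_path: str):
    try:
        asyncio.run(_process_document(task_id, file_path))
    except Exception as exc:
        # Re-queue with exponential backoff: 60s, 120s, 240s
        raise self.retry(exc=exc, countdown=60 * 2 ** self.request.retries)
```

If retries are added, the `failed` status update inside the pipeline would need to distinguish retryable from permanent errors.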
#### Multimedia Processing Task

`tasks_multimedia.py`:
```python
import asyncio
import os

from app.core.config import settings
from app.crud import crud_chunk, crud_task  # shared CRUD helper instances (assumed)
from app.services.embedding_service import EmbeddingService
from app.services.transcription_service import TranscriptionService
from app.worker.celery_app import celery_app


@celery_app.task(bind=True)
def transcribe_video(self, task_id: str, video_url: str):
    # Celery tasks run synchronously; drive the async pipeline to completion
    asyncio.run(_transcribe_video(task_id, video_url))


async def _transcribe_video(task_id: str, video_url: str):
    try:
        task = await crud_task.get(task_id)  # helper name assumed

        # 1. Download video (download_file is sketched below)
        video_path = download_file(video_url)

        # 2. Extract audio
        transcriber = TranscriptionService(settings.OPENAI_API_KEY)
        audio_path = transcriber.extract_audio_from_video(video_path)

        # 3. Transcribe
        result = await transcriber.transcribe_audio(audio_path)

        # 4. Store one chunk per Whisper segment, with timestamps
        embedder = EmbeddingService(settings.OPENAI_API_KEY)
        for segment in result["segments"]:
            embedding = await embedder.generate_embedding(segment["text"])
            await crud_chunk.create(
                chatbot_id=task.chatbot_id,
                text=segment["text"],
                start_time=segment["start"],
                end_time=segment["end"],
                embedding=embedding,
                content_type="video",
            )

        # 5. Clean up temporary files
        os.remove(video_path)
        os.remove(audio_path)
        await crud_task.update_status(task_id, "completed")
    except Exception as e:
        await crud_task.update_status(task_id, "failed", error=str(e))
        raise
```
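`download_file` is referenced above but never defined in this document. A minimal sketch that streams the remote file to a temporary path, assuming `httpx`:

```python
import os
import tempfile
from urllib.parse import urlparse

import httpx


def download_file(url: str) -> str:
    """Stream a remote file to a temp path and return that path."""
    suffix = os.path.splitext(urlparse(url).path)[1] or ".bin"
    fd, path = tempfile.mkstemp(suffix=suffix)
    with os.fdopen(fd, "wb") as f:
        with httpx.stream("GET", url, follow_redirects=True) as response:
            response.raise_for_status()
            for chunk in response.iter_bytes():
                f.write(chunk)
    return path
```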
### 5. Database Operations (CRUD)

Location: `app/crud/crud_chunk.py`
```python
from app.core.supabase_client import get_supabase_client


class CRUDChunk:
    def __init__(self):
        self.supabase = get_supabase_client()

    async def create(
        self,
        chatbot_id: str,
        text: str,
        embedding: list[float],
        **kwargs,
    ):
        """Create a new document chunk."""
        data = {
            "chatbot_id": chatbot_id,
            "chunk_text": text,
            "embedding": embedding,
            **kwargs,
        }
        result = self.supabase.table("document_chunks").insert(data).execute()
        return result.data[0]

    async def search_similar(
        self,
        chatbot_id: str,
        query_embedding: list[float],
        limit: int = 10,
    ):
        """Find similar chunks using vector similarity."""
        result = self.supabase.rpc(
            "match_document_chunks",
            {
                "query_embedding": query_embedding,
                "chatbot_id_param": chatbot_id,
                "match_threshold": 0.7,
                "match_count": limit,
            },
        ).execute()
        return result.data
```
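Putting the pieces together, retrieval at query time embeds the user's question and hands it to `search_similar`. A hypothetical sketch (`crud_chunk` is assumed to be a shared `CRUDChunk` instance):

```python
from app.core.config import settings
from app.crud.crud_chunk import CRUDChunk
from app.services.embedding_service import EmbeddingService

crud_chunk = CRUDChunk()


async def retrieve_context(chatbot_id: str, question: str) -> list[dict]:
    """Embed the question and return the most similar stored chunks."""
    embedder = EmbeddingService(settings.OPENAI_API_KEY)
    query_embedding = await embedder.generate_embedding(question)
    return await crud_chunk.search_similar(
        chatbot_id=chatbot_id,
        query_embedding=query_embedding,
        limit=5,
    )
```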
## Deployment

### Running with Docker

`Dockerfile`:
```dockerfile
FROM python:3.10-slim

WORKDIR /app

# Install system dependencies (ffmpeg is needed for audio extraction)
RUN apt-get update && apt-get install -y \
    ffmpeg \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
```
### Docker Compose

```yaml
version: '3.8'

services:
  backend:
    build: .
    ports:
      - "8000:8000"
    environment:
      - REDIS_URL=redis://redis:6379/0
      - SUPABASE_URL=${SUPABASE_URL}
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - redis

  celery-worker:
    build: .
    command: celery -A app.worker.celery_app worker --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379/0
      - SUPABASE_URL=${SUPABASE_URL}
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - redis

  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
```
### Start Services

```bash
# Development
uvicorn app.main:app --reload
celery -A app.worker.celery_app worker --loglevel=info

# Docker
docker-compose up
```
## Testing

### Unit Tests
```python
import pytest

from app.services.chunking_service import ChunkingService


def test_chunking():
    service = ChunkingService(chunk_size=100, chunk_overlap=20)
    text = "A" * 250
    chunks = service.chunk_text(text)

    # 250 chars with 100-char chunks and 20-char overlap -> 3 chunks
    assert len(chunks) == 3
    assert len(chunks[0]) == 100
```
### Integration Tests

```python
import asyncio

import pytest

from app.crud import crud_task  # shared CRUD helper (assumed)


@pytest.mark.asyncio
async def test_document_processing(client):
    # `client`: an async HTTP test client fixture (e.g. httpx.AsyncClient)
    # pointed at the running app. chatbot_id/user_id are query parameters,
    # matching the endpoint signature above.
    response = await client.post(
        "/api/v1/documents/upload",
        params={"chatbot_id": "test-id", "user_id": "test-user"},
        files={"file": open("test.pdf", "rb")},
    )
    task_id = response.json()["task_id"]

    # Wait for background processing
    await asyncio.sleep(5)

    # Check status
    status = await crud_task.get_status(task_id)
    assert status == "completed"
```
## Performance Considerations

### 1. Batch Processing
Generate embeddings for many chunks in a single API call, as `generate_batch_embeddings` does above, rather than one request per chunk; this cuts request overhead and latency.
### 2. Caching
Cache frequently accessed data in Redis, which the stack already runs as the Celery broker.
### 3. Rate Limiting

Respect OpenAI API rate limits (e.g., around 3,500 requests/min for embeddings; exact quotas vary by account tier). One way to handle 429 responses is shown below.
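A common approach is jittered exponential backoff on rate-limit errors, for example with the `tenacity` library (a sketch, not part of the codebase above):

```python
from openai import RateLimitError
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_random_exponential,
)


@retry(
    retry=retry_if_exception_type(RateLimitError),
    wait=wait_random_exponential(min=1, max=60),  # jittered backoff
    stop=stop_after_attempt(6),
)
async def embed_with_backoff(embedder, texts):
    return await embedder.generate_batch_embeddings(texts)
```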
### 4. Chunking Strategy

- Documents: ~1,000 characters per chunk with 200-character overlap (the `ChunkingService` defaults above)
- Audio/Video: by natural Whisper segments
- URLs: by sections/paragraphs