Backend Architecture

Deep dive into the Syllabi backend service for advanced document processing.

Overview

The backend is an optional Python service that provides:

  • 📄 Advanced PDF parsing
  • 🎥 Video transcription
  • 🎵 Audio transcription
  • 🌐 URL crawling and extraction
  • 📦 Background task processing
  • 🔗 Third-party integrations (Notion, Google Drive)

Note: The frontend works standalone! The backend is only needed for these advanced features.

Tech Stack

  • Framework: FastAPI (Python 3.10+)
  • Task Queue: Celery
  • Message Broker: Redis
  • Database: Supabase (PostgreSQL)
  • Document Parsing: PyMuPDF, pdfplumber
  • Transcription: OpenAI Whisper API
  • Embeddings: OpenAI text-embedding-3-small

Architecture Overview

Frontend Upload
      ↓
FastAPI Endpoint
      ↓
Celery Task (Background)
      ↓
┌─────────────────────────┐
│ 1. Download File        │
│ 2. Parse/Extract        │
│ 3. Chunk Text           │
│ 4. Generate Embeddings  │
│ 5. Store in Database    │
└─────────────────────────┘
      ↓
Update Status in DB
      ↓
Frontend Polls for Completion
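
The last step of this flow, polling for completion, can be sketched as a small client-side loop. This is only an illustration written in Python, not the frontend's actual code: `wait_for_task` and `fetch_status` are hypothetical names, with `fetch_status` standing in for whatever request returns the task's current status. The status strings are the ones used elsewhere on this page ("pending", "processing", "completed", "failed").

```python
import time
from typing import Callable


def wait_for_task(
    fetch_status: Callable[[], str],
    interval: float = 2.0,
    timeout: float = 300.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until the task reaches a terminal status or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status()
        # "completed" and "failed" are terminal; anything else means keep waiting
        if status in ("completed", "failed"):
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task still '{status}' after {timeout}s")
        sleep(interval)
```

Passing `sleep` as a parameter keeps the loop easy to test without real delays. A production client would likely also back off between polls for long-running tasks.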

Project Structure

backend/
├── app/
│   ├── main.py                 # FastAPI app entry
│   ├── api/                    # API routes
│   ├── core/                   # Configuration
│   ├── crud/                   # Database operations
│   ├── schemas/                # Pydantic models
│   ├── services/               # Business logic
│   └── worker/                 # Celery tasks
├── tests/                      # Test files
└── requirements.txt            # Dependencies

Core Components

1. FastAPI Application

Location: app/main.py

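from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.api_v1.api import api_router

app = FastAPI(title="Syllabi Backend API")

# CORS for the frontend. Browsers reject a wildcard origin on credentialed
# requests, so list the allowed origins explicitly when allow_credentials=True.
# "http://localhost:3000" is a placeholder; use your frontend's origin.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Include routers
app.include_router(api_router, prefix="/api/v1")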

2. API Endpoints

Location: app/api/api_v1/endpoints/

Document Upload (documents.py)

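import os
import tempfile

from fastapi import APIRouter, File, Form, UploadFile

router = APIRouter()

@router.post("/documents/upload")
async def upload_document(
    file: UploadFile = File(...),
    # Form fields, so they can be sent alongside the file in one multipart request
    chatbot_id: str = Form(...),
    user_id: str = Form(...),
):
    # 1. Save file temporarily. Use a generated name rather than the
    #    client-supplied filename, which could contain path separators.
    #    The Celery worker must be able to read this path (for example,
    #    via a shared volume when it runs in a separate container).
    suffix = os.path.splitext(file.filename or "")[1]
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as f:
        f.write(await file.read())
        temp_path = f.name

    # 2. Create task record
    task = await crud_task.create_task(
        chatbot_id=chatbot_id,
        file_path=temp_path,
        status="pending"
    )

    # 3. Trigger Celery task
    process_document.delay(task.id, temp_path)

    return {"task_id": task.id, "status": "processing"}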

Multimedia Processing (multimedia.py)

@router.post("/multimedia/transcribe")
async def transcribe_media(
    file_url: str,
    chatbot_id: str,
    content_type: str  # 'video' or 'audio'
):
    # Create task
    task = await crud_task.create_task(
        chatbot_id=chatbot_id,
        file_url=file_url,
        task_type="transcription"
    )
 
    # Trigger background task
    if content_type == "video":
        transcribe_video.delay(task.id, file_url)
    else:
        transcribe_audio.delay(task.id, file_url)
 
    return {"task_id": task.id}

URL Indexing (urls.py)

@router.post("/urls/index")
async def index_url(
    url: str,
    chatbot_id: str
):
    # Create task
    task = await crud_task.create_task(
        chatbot_id=chatbot_id,
        url=url,
        task_type="url_indexing"
    )
 
    # Crawl and index
    crawl_url.delay(task.id, url)
 
    return {"task_id": task.id}
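
The `crawl_url` task itself is not shown here. As a rough idea of what the extraction step might involve, the sketch below pulls visible text out of an HTML page using only the standard library. `TextExtractor` and `extract_text` are hypothetical names, and the real service may well use a dedicated crawling or parsing library instead.

```python
from html.parser import HTMLParser


class TextExtractor(HTMLParser):
    """Collect visible text, skipping the contents of script/style tags."""

    SKIP_TAGS = {"script", "style", "noscript"}

    def __init__(self) -> None:
        super().__init__()
        self._skip_depth = 0          # > 0 while inside a skipped tag
        self._parts: list[str] = []   # text fragments in document order

    def handle_starttag(self, tag, attrs):
        if tag in self.SKIP_TAGS:
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in self.SKIP_TAGS and self._skip_depth > 0:
            self._skip_depth -= 1

    def handle_data(self, data):
        # Keep only non-empty text that is outside skipped tags
        if self._skip_depth == 0 and data.strip():
            self._parts.append(data.strip())

    def text(self) -> str:
        return "\n".join(self._parts)


def extract_text(html: str) -> str:
    """Return the visible text of an HTML document, one fragment per line."""
    parser = TextExtractor()
    parser.feed(html)
    parser.close()
    return parser.text()
```

The extracted text would then go through the same chunking and embedding steps as any other document.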

3. Services Layer

PDF Parsing Service

Location: app/services/pdf_parsing_service.py

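import logging

import fitz  # PyMuPDF
import pdfplumber

logger = logging.getLogger(__name__)

class PDFParsingService:
    def extract_text(self, pdf_path: str) -> list[dict]:
        """Extract text from PDF with page numbers."""
        # Try PyMuPDF first (faster)
        try:
            with fitz.open(pdf_path) as doc:
                return [
                    {"page_number": page_num, "text": page.get_text()}
                    for page_num, page in enumerate(doc, start=1)
                ]
        except Exception:
            # Log the failure instead of silently swallowing it, then fall back.
            # Returning from inside the try block also avoids mixing partial
            # PyMuPDF results with the pdfplumber pages below.
            logger.warning(
                "PyMuPDF failed for %s; falling back to pdfplumber",
                pdf_path,
                exc_info=True,
            )

        # Fallback to pdfplumber (more accurate)
        with pdfplumber.open(pdf_path) as pdf:
            return [
                # Guard against extract_text() returning None for empty pages
                {"page_number": page_num, "text": page.extract_text() or ""}
                for page_num, page in enumerate(pdf.pages, start=1)
            ]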

Chunking Service

Location: app/services/chunking_service.py

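from typing import List

class ChunkingService:
    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200
    ):
        # Sizes are measured in characters, not tokens
        if not 0 <= chunk_overlap < chunk_size:
            raise ValueError("chunk_overlap must be >= 0 and < chunk_size")
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def chunk_text(self, text: str) -> List[str]:
        """Split text into overlapping chunks."""
        chunks = []
        start = 0

        while start < len(text):
            end = start + self.chunk_size
            chunk = text[start:end]

            # Try to break at sentence boundary
            if end < len(text):
                last_period = chunk.rfind('.')
                if last_period > self.chunk_size * 0.8:
                    end = start + last_period + 1
                    chunk = text[start:end]

            chunks.append(chunk.strip())

            # Stop at the end of the text. Stepping back by the overlap here
            # would emit a trailing chunk that only repeats the previous tail.
            if end >= len(text):
                break

            # Always move forward, even when the overlap is large
            start = max(end - self.chunk_overlap, start + 1)

        return chunks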

Embedding Service

Location: app/services/embedding_service.py

from openai import AsyncOpenAI

class EmbeddingService:
    def __init__(self, api_key: str):
        # Use the async client so the awaits below do not block the event loop
        self.client = AsyncOpenAI(api_key=api_key)
        self.model = "text-embedding-3-small"

    async def generate_embedding(self, text: str) -> list[float]:
        """Generate embedding vector for text."""
        response = await self.client.embeddings.create(
            model=self.model,
            input=text
        )
        return response.data[0].embedding

    async def generate_batch_embeddings(
        self,
        texts: list[str]
    ) -> list[list[float]]:
        """Generate embeddings for multiple texts in one request."""
        response = await self.client.embeddings.create(
            model=self.model,
            input=texts
        )
        return [item.embedding for item in response.data]

Transcription Service

Location: app/services/transcription_service.py

import os
import subprocess

from openai import AsyncOpenAI

class TranscriptionService:
    def __init__(self, api_key: str):
        # Async client, so transcribe_audio can await the API call
        self.client = AsyncOpenAI(api_key=api_key)

    def extract_audio_from_video(
        self,
        video_path: str
    ) -> str:
        """Extract audio using ffmpeg."""
        # Swap the extension whatever the container format is (.mp4, .mov,
        # .webm, ...). Replacing only '.mp4' would leave other inputs
        # unchanged, so ffmpeg would be asked to write over its own input.
        audio_path = os.path.splitext(video_path)[0] + '.mp3'

        command = [
            'ffmpeg',
            '-y',  # Overwrite an existing output file without prompting
            '-i', video_path,
            '-vn',  # No video
            '-acodec', 'libmp3lame',
            '-q:a', '2',  # Quality
            audio_path
        ]

        subprocess.run(command, check=True)
        return audio_path

    async def transcribe_audio(
        self,
        audio_path: str
    ) -> dict:
        """Transcribe audio using Whisper API."""
        with open(audio_path, 'rb') as audio_file:
            transcript = await self.client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
                response_format="verbose_json",
                timestamp_granularities=["segment"]
            )

        # Return with timestamps
        return {
            "text": transcript.text,
            "segments": [
                {
                    "text": seg.text,
                    "start": seg.start,
                    "end": seg.end
                }
                for seg in transcript.segments
            ]
        }

4. Celery Workers

Location: app/worker/

Celery Configuration

celery_app.py:

from celery import Celery
from app.core.config import settings
 
celery_app = Celery(
    "syllabi_worker",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL
)
 
celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)

Document Processing Task

tasks_document.py:

import asyncio

from app.core.config import settings
from app.worker.celery_app import celery_app
from app.services.pdf_parsing_service import PDFParsingService
from app.services.chunking_service import ChunkingService
from app.services.embedding_service import EmbeddingService
# Assumed import paths and instance names for the CRUD helpers
from app.crud.crud_chunk import crud_chunk
from app.crud.crud_task import crud_task

async def _process_document(task_id: str, file_path: str):
    try:
        # 1. Update status and load the task record
        await crud_task.update_status(task_id, "processing")
        # crud_task.get is an assumed helper that returns the record
        # created at upload time (needed for chatbot_id below)
        task = await crud_task.get(task_id)

        # 2. Parse PDF
        parser = PDFParsingService()
        pages = parser.extract_text(file_path)

        # 3. Chunk text
        chunker = ChunkingService()
        all_chunks = []
        for page in pages:
            chunks = chunker.chunk_text(page["text"])
            for chunk in chunks:
                all_chunks.append({
                    "text": chunk,
                    "page_number": page["page_number"]
                })

        # 4. Generate embeddings
        embedder = EmbeddingService(settings.OPENAI_API_KEY)
        texts = [c["text"] for c in all_chunks]
        embeddings = await embedder.generate_batch_embeddings(texts)

        # 5. Store in database
        for chunk, embedding in zip(all_chunks, embeddings):
            await crud_chunk.create(
                chatbot_id=task.chatbot_id,
                text=chunk["text"],
                page_number=chunk["page_number"],
                embedding=embedding
            )

        # 6. Mark complete
        await crud_task.update_status(task_id, "completed")

    except Exception as e:
        await crud_task.update_status(
            task_id,
            "failed",
            error=str(e)
        )
        raise

@celery_app.task(bind=True)
def process_document(self, task_id: str, file_path: str):
    # Celery task functions are synchronous, so `await` cannot be used
    # directly; run the async pipeline to completion instead.
    asyncio.run(_process_document(task_id, file_path))

Multimedia Processing Task

tasks_multimedia.py:

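import asyncio
import os

from app.core.config import settings
from app.worker.celery_app import celery_app
from app.services.embedding_service import EmbeddingService
from app.services.transcription_service import TranscriptionService
# Assumed import paths and instance names for the CRUD helpers
from app.crud.crud_chunk import crud_chunk
from app.crud.crud_task import crud_task

async def _transcribe_video(task_id: str, video_url: str):
    video_path = audio_path = None
    try:
        await crud_task.update_status(task_id, "processing")
        # crud_task.get is an assumed helper that returns the task record
        task = await crud_task.get(task_id)

        # 1. Download video (download_file is a helper not shown here)
        video_path = download_file(video_url)

        # 2. Extract audio
        transcriber = TranscriptionService(settings.OPENAI_API_KEY)
        audio_path = transcriber.extract_audio_from_video(video_path)

        # 3. Transcribe
        result = await transcriber.transcribe_audio(audio_path)

        # 4. Chunk by segments
        embedder = EmbeddingService(settings.OPENAI_API_KEY)
        for segment in result["segments"]:
            # Generate embedding
            embedding = await embedder.generate_embedding(
                segment["text"]
            )

            # Store chunk
            await crud_chunk.create(
                chatbot_id=task.chatbot_id,
                text=segment["text"],
                start_time=segment["start"],
                end_time=segment["end"],
                embedding=embedding,
                content_type="video"
            )

        await crud_task.update_status(task_id, "completed")

    except Exception as e:
        await crud_task.update_status(task_id, "failed", error=str(e))
        raise

    finally:
        # 5. Cleanup, even when processing fails
        for path in (video_path, audio_path):
            if path and os.path.exists(path):
                os.remove(path)

@celery_app.task(bind=True)
def transcribe_video(self, task_id: str, video_url: str):
    # Celery task functions are synchronous; run the async pipeline here
    asyncio.run(_transcribe_video(task_id, video_url))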
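
The task above stores each Whisper segment as its own chunk. Segments can be only a few words long, so one possible variation, which the source does not describe, is to merge adjacent segments up to a character budget while keeping the start and end timestamps. The helper name `merge_segments` and the `max_chars` default are assumptions for illustration.

```python
def merge_segments(segments: list[dict], max_chars: int = 1000) -> list[dict]:
    """Merge adjacent transcript segments into chunks of at most max_chars.

    Each input segment is a dict with "text", "start" and "end" keys, the
    same shape returned by TranscriptionService.transcribe_audio.
    """
    merged: list[dict] = []
    for seg in segments:
        text = seg["text"].strip()
        # Extend the current chunk if the segment still fits (+1 for the space)
        if merged and len(merged[-1]["text"]) + 1 + len(text) <= max_chars:
            merged[-1]["text"] += " " + text
            merged[-1]["end"] = seg["end"]
        else:
            merged.append(
                {"text": text, "start": seg["start"], "end": seg["end"]}
            )
    return merged
```

Because each merged chunk keeps the first segment's `start` and the last segment's `end`, search results can still link back to a position in the recording.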

5. Database Operations (CRUD)

Location: app/crud/crud_chunk.py

from app.core.supabase_client import get_supabase_client
 
class CRUDChunk:
    def __init__(self):
        self.supabase = get_supabase_client()
 
    async def create(
        self,
        chatbot_id: str,
        text: str,
        embedding: list[float],
        **kwargs
    ):
        """Create a new document chunk."""
        data = {
            "chatbot_id": chatbot_id,
            "chunk_text": text,
            "embedding": embedding,
            **kwargs
        }
 
        result = self.supabase.table("document_chunks").insert(data).execute()
        return result.data[0]
 
    async def search_similar(
        self,
        chatbot_id: str,
        query_embedding: list[float],
        limit: int = 10
    ):
        """Find similar chunks using vector similarity."""
        result = self.supabase.rpc(
            "match_document_chunks",
            {
                "query_embedding": query_embedding,
                "chatbot_id_param": chatbot_id,
                "match_threshold": 0.7,
                "match_count": limit
            }
        ).execute()
 
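        return result.data


# Module-level instance, matching the crud_chunk name used by the worker tasks
crud_chunk = CRUDChunk()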
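
`match_document_chunks` is a database function that is not shown on this page. Assuming it ranks chunks by cosine similarity (a common choice for vector search, but an assumption here), the effect of `match_threshold` and `match_count` can be illustrated in plain Python. The function names below are hypothetical and exist only for this sketch.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def match_chunks(
    query_embedding: list[float],
    chunks: list[dict],
    match_threshold: float = 0.7,
    match_count: int = 10,
) -> list[dict]:
    """Return the most similar chunks above the threshold, best first.

    Each chunk is a dict with "chunk_text" and "embedding" keys.
    """
    scored = [
        (cosine_similarity(query_embedding, c["embedding"]), c) for c in chunks
    ]
    # Drop weak matches, then keep the top match_count by similarity
    kept = [(s, c) for s, c in scored if s > match_threshold]
    kept.sort(key=lambda pair: pair[0], reverse=True)
    return [{**c, "similarity": s} for s, c in kept[:match_count]]
```

In the real service this work happens inside the database, so only the matching rows travel over the network.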

Deployment

Running with Docker

Dockerfile:

FROM python:3.10-slim
 
WORKDIR /app
 
# Install system dependencies
RUN apt-get update && apt-get install -y \
    ffmpeg \
    && rm -rf /var/lib/apt/lists/*
 
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
 
# Copy application
COPY . .
 
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

Docker Compose

version: '3.8'
 
services:
  backend:
    build: .
    ports:
      - "8000:8000"
    environment:
      - REDIS_URL=redis://redis:6379/0
      - SUPABASE_URL=${SUPABASE_URL}
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - redis
 
  celery-worker:
    build: .
    command: celery -A app.worker.celery_app worker --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379/0
      - SUPABASE_URL=${SUPABASE_URL}
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - redis
 
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"

Start Services

# Development (Redis must be running; start each process in its own terminal)
uvicorn app.main:app --reload
celery -A app.worker.celery_app worker --loglevel=info
 
# Docker
docker-compose up

Testing

Unit Tests

import pytest
from app.services.chunking_service import ChunkingService
 
def test_chunking():
    service = ChunkingService(chunk_size=100, chunk_overlap=20)
    text = "A" * 250
 
    chunks = service.chunk_text(text)
 
    assert len(chunks) == 3  # Overlapping chunks
    assert len(chunks[0]) == 100

Integration Tests

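import asyncio

import pytest
from httpx import ASGITransport, AsyncClient

from app.main import app
from app.crud.crud_task import crud_task  # assumed import path

@pytest.mark.asyncio
async def test_document_processing():
    # Requires a running Redis broker and Celery worker
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        # Upload test PDF
        with open("test.pdf", "rb") as pdf:
            response = await client.post(
                "/api/v1/documents/upload",
                files={"file": pdf},
                data={"chatbot_id": "test-id", "user_id": "test-user"}
            )

    task_id = response.json()["task_id"]

    # Poll for completion (up to 30 s) instead of relying on a fixed sleep
    status = None
    for _ in range(30):
        status = await crud_task.get_status(task_id)
        if status in ("completed", "failed"):
            break
        await asyncio.sleep(1)

    assert status == "completed"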

Performance Considerations

1. Batch Processing

Process multiple embeddings in one API call to reduce latency.
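
A minimal sketch of this idea: split the chunk texts into fixed-size batches and make one embedding call per batch. `embed_batch` is a hypothetical callable (for example, a wrapper around `generate_batch_embeddings`), and the default `batch_size` of 100 is an arbitrary illustration rather than a documented limit.

```python
from typing import Callable, Iterator


def batched(texts: list[str], batch_size: int = 100) -> Iterator[list[str]]:
    """Yield consecutive slices of at most batch_size texts."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for i in range(0, len(texts), batch_size):
        yield texts[i:i + batch_size]


def embed_all(
    texts: list[str],
    embed_batch: Callable[[list[str]], list[list[float]]],
    batch_size: int = 100,
) -> list[list[float]]:
    """Embed every text using one call per batch, preserving input order."""
    embeddings: list[list[float]] = []
    for batch in batched(texts, batch_size):
        embeddings.extend(embed_batch(batch))
    return embeddings
```

Batching also keeps each request under the per-request input limits of the embeddings endpoint, which matters for very large documents.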

2. Caching

Cache frequently accessed data in Redis.
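
As one example, embeddings for repeated text could be cached so the same content is not embedded twice. The sketch below assumes a client exposing `get(key)` and `set(key, value, ex=seconds)`, which matches the redis-py client. `EmbeddingCache`, the key format, and the one-day TTL are illustrative choices, not part of the Syllabi codebase.

```python
import hashlib
import json
from typing import Callable


class EmbeddingCache:
    """Cache embeddings in a key-value store, keyed by model and text hash."""

    def __init__(self, store, model: str, ttl_seconds: int = 86400):
        self.store = store            # e.g. a redis.Redis instance
        self.model = model
        self.ttl_seconds = ttl_seconds

    def _key(self, text: str) -> str:
        # Hash the text so keys stay short and contain no raw content
        digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
        return f"embedding:{self.model}:{digest}"

    def get_or_compute(
        self,
        text: str,
        compute: Callable[[str], list[float]],
    ) -> list[float]:
        """Return the cached embedding, computing and storing it on a miss."""
        key = self._key(text)
        cached = self.store.get(key)
        if cached is not None:
            return json.loads(cached)
        embedding = compute(text)
        self.store.set(key, json.dumps(embedding), ex=self.ttl_seconds)
        return embedding
```

Including the model name in the key avoids mixing vectors from different embedding models if the model is ever changed.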

3. Rate Limiting

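Respect OpenAI API rate limits. The exact limits depend on your account's usage tier (for example, 3,500 requests/min for embeddings), so check the values that apply to your account.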
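
A common way to stay within rate limits is to retry failed calls with exponential backoff and jitter. The helper below is a generic sketch: `with_backoff` and its parameters are hypothetical names. With the OpenAI Python SDK, `is_retryable` could check for `openai.RateLimitError`; the SDK clients also accept a `max_retries` option, which may be enough on its own.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def with_backoff(
    call: Callable[[], T],
    is_retryable: Callable[[Exception], bool],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run call(), retrying retryable errors with exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return call()
        except Exception as exc:
            last_attempt = attempt == max_attempts - 1
            if last_attempt or not is_retryable(exc):
                raise
            # Delay doubles each attempt (1s, 2s, 4s, ...) up to max_delay,
            # plus random jitter so parallel workers do not retry in lockstep
            delay = min(max_delay, base_delay * 2 ** attempt)
            sleep(delay + random.uniform(0, delay / 2))
    raise AssertionError("unreachable")
```

Inside a Celery task, the same effect can often be achieved with Celery's own retry mechanism instead of sleeping in the worker.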

4. Chunking Strategy

  • Documents: 1000 characters, 200 overlap (the ChunkingService defaults)
  • Audio/Video: By natural segments
  • URLs: By sections/paragraphs
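
The paragraph-based strategy for URLs is not shown elsewhere on this page. One way it might look, as a sketch under the assumption that paragraphs are separated by blank lines: group consecutive paragraphs until a character budget is reached. `chunk_by_paragraphs` and `max_chars` are hypothetical names.

```python
def chunk_by_paragraphs(text: str, max_chars: int = 1000) -> list[str]:
    """Group consecutive paragraphs into chunks of at most max_chars.

    A single paragraph longer than max_chars is kept whole rather than
    split, so a chunk can exceed the budget in that case.
    """
    paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
    chunks: list[str] = []
    current = ""
    for paragraph in paragraphs:
        candidate = f"{current}\n\n{paragraph}" if current else paragraph
        if len(candidate) <= max_chars or not current:
            # Still within budget, or this is the first paragraph of a chunk
            current = candidate
        else:
            # Budget exceeded: close the current chunk and start a new one
            chunks.append(current)
            current = paragraph
    if current:
        chunks.append(current)
    return chunks
```

Unlike the fixed-size chunker, this version never cuts a paragraph in the middle, at the cost of less uniform chunk sizes.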

Next Steps