RAG System Design Document: Markdown-Based Knowledge Retrieval¶
Note on Implementation Status: This document represents the architectural vision for the RAG system. The current implementation uses ChromaDB for vector storage rather than Milvus, which is planned for future versions. Features marked with "✅" are part of the current implementation; features marked with "🔄" are planned for future implementation.
Executive Summary¶
This document outlines a comprehensive design for a retrieval-augmented generation (RAG) system that processes markdown documents from a monitored directory, creates vector embeddings, stores them in a vector database, and leverages LangChain to create a pipeline that connects with Ollama-hosted LLMs for intelligent querying and response generation.
The design emphasizes:

- ✅ Real-time document processing with change detection
- ✅ State-of-the-art embedding models for optimal semantic understanding
- 🔄 Horizontally scalable vector storage with Milvus (future enhancement, currently using ChromaDB)
- ✅ Modular pipeline architecture for extensibility
- 🔄 Comprehensive evaluation metrics for continuous improvement
1. System Architecture Overview¶
┌──────────────┐     ┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│              │     │              │     │              │     │              │
│   Document   │     │  Embedding   │     │    Vector    │     │   Response   │
│  Processing  │────▶│  Generation  │────▶│    Store     │────▶│  Generation  │
│              │     │              │     │  (ChromaDB)  │     │              │
└──────────────┘     └──────────────┘     └──────────────┘     └──────────────┘
        │                    ▲                                          │
        │                    │                                          │
        │            ┌──────────────┐                                   │
        │            │   Metadata   │                                   │
        └───────────▶│  (Stored in  │                                   │
                     │   ChromaDB)  │                                   │
                     └──────────────┘                                   │
                                                                        │
                     ┌──────────────┐                                   │
                     │     User     │───────────────────────────────────┘
                     │  Interface   │
                     │              │
                     └──────────────┘
Implementation Note: The current implementation uses ChromaDB for both vector storage and metadata storage. The separate metadata store component shown in this diagram represents a future enhancement.
2. Component Design¶
2.1 Document Processing Pipeline¶
Directory Watcher Service¶
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import os

class MarkdownWatcher(FileSystemEventHandler):
    def __init__(self, processor):
        self.processor = processor

    def on_created(self, event):
        if event.is_directory or not event.src_path.endswith('.md'):
            return
        self.processor.process_file(event.src_path)

    def on_modified(self, event):
        if event.is_directory or not event.src_path.endswith('.md'):
            return
        self.processor.process_file(event.src_path)

    def on_deleted(self, event):
        if event.is_directory or not event.src_path.endswith('.md'):
            return
        self.processor.delete_file(event.src_path)

class DirectoryWatcherService:
    def __init__(self, directory_path, processor):
        self.observer = Observer()
        self.directory_path = directory_path
        self.event_handler = MarkdownWatcher(processor)

    def start(self):
        self.observer.schedule(self.event_handler, self.directory_path, recursive=True)
        self.observer.start()

    def stop(self):
        self.observer.stop()
        self.observer.join()
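A minimal usage sketch for the watcher (the ./vault path and the stub LoggingProcessor below are illustrative; in practice the DocumentProcessor defined next provides the same process_file/delete_file interface):

import time

class LoggingProcessor:
    # Stand-in processor for this example only; see DocumentProcessor below for the real one
    def process_file(self, path):
        print(f"processing {path}")

    def delete_file(self, path):
        print(f"removing {path}")

if __name__ == "__main__":
    watcher = DirectoryWatcherService("./vault", LoggingProcessor())
    watcher.start()
    try:
        while True:
            time.sleep(1)  # keep the main thread alive; watchdog observes in a background thread
    except KeyboardInterrupt:
        watcher.stop()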
Document Processor¶
from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import MarkdownTextSplitter, RecursiveCharacterTextSplitter
import frontmatter
import hashlib
import os

class DocumentProcessor:
    def __init__(self, embedding_service, vectorstore_service, metadata_store):
        self.embedding_service = embedding_service
        self.vectorstore_service = vectorstore_service
        self.metadata_store = metadata_store
        self.text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
            chunk_size=512,
            chunk_overlap=50,
            separators=["\n## ", "\n### ", "\n#### ", "\n", " ", ""]
        )

    def process_file(self, file_path):
        # Generate document hash for tracking changes
        file_hash = self._hash_file(file_path)

        # Check if document has changed
        existing_hash = self.metadata_store.get_document_hash(file_path)
        if existing_hash == file_hash:
            print(f"No changes detected for {file_path}")
            return

        # Extract content and metadata
        with open(file_path, 'r') as f:
            content = f.read()
        try:
            post = frontmatter.loads(content)
            metadata = post.metadata
            content_text = post.content
        except Exception:
            metadata = {}
            content_text = content

        # Add file path and hash to metadata
        metadata['source'] = file_path
        metadata['file_hash'] = file_hash

        # Split the document
        docs = self.text_splitter.create_documents([content_text], [metadata])

        # Delete old vectors if they exist
        if existing_hash:
            self.vectorstore_service.delete_document(file_path)

        # Generate embeddings and store
        self.embedding_service.embed_documents(docs)
        self.vectorstore_service.add_documents(docs)

        # Update metadata store
        self.metadata_store.update_document_metadata(file_path, metadata, file_hash)

    def delete_file(self, file_path):
        self.vectorstore_service.delete_document(file_path)
        self.metadata_store.delete_document(file_path)

    def process_all_files(self, directory_path):
        for root, _, files in os.walk(directory_path):
            for file in files:
                if file.endswith('.md'):
                    file_path = os.path.join(root, file)
                    self.process_file(file_path)

    def _hash_file(self, file_path):
        with open(file_path, 'rb') as f:
            return hashlib.md5(f.read()).hexdigest()
2.2 Embedding Generation¶
from langchain_ollama import OllamaEmbeddings
import numpy as np

class EmbeddingService:
    def __init__(self, model_name="mxbai-embed-large", batch_size=32):
        self.embeddings_model = OllamaEmbeddings(
            model=model_name,
            base_url="http://localhost:11434"
        )
        self.batch_size = batch_size

    def embed_documents(self, documents):
        """Generate embeddings for a list of documents"""
        texts = [doc.page_content for doc in documents]

        # Process in batches to avoid memory issues
        all_embeddings = []
        for i in range(0, len(texts), self.batch_size):
            batch_texts = texts[i:i+self.batch_size]
            batch_embeddings = self.embeddings_model.embed_documents(batch_texts)
            all_embeddings.extend(batch_embeddings)

        # Add embeddings to document objects
        for i, doc in enumerate(documents):
            doc.embedding = all_embeddings[i]
        return documents

    def embed_query(self, query):
        """Generate embedding for a query string"""
        return self.embeddings_model.embed_query(query)
2.3 Vector Storage¶
🔄 Planned Future Implementation with Milvus¶
Implementation Note: The current implementation uses ChromaDB instead of Milvus. The code below represents the planned future implementation with Milvus for enhanced scalability.
The Milvus Vector Store service provides efficient storage and retrieval of document embeddings using Milvus's HNSW indexing for fast similarity search. It handles document addition, deletion, and searching with metadata filtering capabilities.

from pymilvus import connections, utility, Collection, FieldSchema, CollectionSchema, DataType
import numpy as np
import uuid

class MilvusVectorStore:
    def __init__(self, host="localhost", port="19530", embedding_dim=1024):
        self.embedding_dim = embedding_dim
        self.collection_name = "markdown_documents"

        # Connect to Milvus
        connections.connect(host=host, port=port)

        # Create collection if it doesn't exist
        if not utility.has_collection(self.collection_name):
            self._create_collection()

    def _create_collection(self):
        fields = [
            FieldSchema(name="id", dtype=DataType.VARCHAR, is_primary=True, max_length=100),
            FieldSchema(name="document_id", dtype=DataType.VARCHAR, max_length=100),
            FieldSchema(name="chunk_id", dtype=DataType.INT64),
            FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
            FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=1000),
            FieldSchema(name="metadata", dtype=DataType.JSON),
            FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=self.embedding_dim)
        ]
        schema = CollectionSchema(fields=fields)
        collection = Collection(name=self.collection_name, schema=schema)

        # Create index for fast retrieval
        index_params = {
            "metric_type": "L2",
            "index_type": "HNSW",
            "params": {"M": 8, "efConstruction": 200}
        }
        collection.create_index(field_name="embedding", index_params=index_params)
        print(f"Created Milvus collection: {self.collection_name}")

    def add_documents(self, documents):
        collection = Collection(self.collection_name)

        entities = []
        for i, doc in enumerate(documents):
            # Generate document ID if needed
            doc_id = doc.metadata.get("source", str(uuid.uuid4()))

            # Prepare entity
            entity = {
                "id": f"{doc_id}_{i}",
                "document_id": doc_id,
                "chunk_id": i,
                "text": doc.page_content,
                "source": doc.metadata.get("source", ""),
                "metadata": doc.metadata,
                "embedding": doc.embedding
            }
            entities.append(entity)

        # Insert data in batches
        collection.insert(entities)
        collection.flush()
        print(f"Added {len(documents)} documents to Milvus")

    def search(self, query_embedding, top_k=5, filter=None):
        collection = Collection(self.collection_name)
        collection.load()

        search_params = {"metric_type": "L2", "params": {"ef": 128}}
        results = collection.search(
            data=[query_embedding],
            anns_field="embedding",
            param=search_params,
            limit=top_k,
            expr=filter,
            output_fields=["text", "source", "metadata"]
        )

        matches = []
        for hits in results:
            for hit in hits:
                matches.append({
                    "id": hit.id,
                    "score": hit.score,
                    "text": hit.entity.get("text"),
                    "source": hit.entity.get("source"),
                    "metadata": hit.entity.get("metadata")
                })

        collection.release()
        return matches

    def delete_document(self, document_id):
        collection = Collection(self.collection_name)
        expr = f'document_id == "{document_id}"'
        collection.delete(expr)
        print(f"Deleted document: {document_id}")
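For comparison, a minimal sketch of the current ChromaDB-backed store using the chromadb client directly (the persistence path, collection name, and result format below are illustrative assumptions rather than the exact current code):

import chromadb

class ChromaVectorStore:
    def __init__(self, persist_directory="./chroma_db"):
        # Local on-disk persistence; ChromaDB manages its own HNSW index with default parameters
        self.client = chromadb.PersistentClient(path=persist_directory)
        self.collection = self.client.get_or_create_collection("markdown_documents")

    def add_documents(self, documents):
        # Assumes each document already carries an embedding (see EmbeddingService above);
        # metadata values must be scalar, so complex values should be filtered or stringified
        self.collection.add(
            ids=[f"{doc.metadata.get('source', 'doc')}_{i}" for i, doc in enumerate(documents)],
            embeddings=[doc.embedding for doc in documents],
            documents=[doc.page_content for doc in documents],
            metadatas=[doc.metadata for doc in documents],
        )

    def search(self, query_embedding, top_k=5, filter=None):
        # ChromaDB filters are dicts (e.g., {"tags": "setup"}), not Milvus expression strings
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k,
            where=filter,
        )
        return [
            {"text": text, "source": meta.get("source", ""), "metadata": meta, "score": dist}
            for text, meta, dist in zip(
                results["documents"][0], results["metadatas"][0], results["distances"][0]
            )
        ]

    def delete_document(self, document_id):
        # Remove every chunk whose source matches the given document
        self.collection.delete(where={"source": document_id})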
2.4 Metadata Store¶
import sqlite3
import json

class SQLiteMetadataStore:
    def __init__(self, db_path="metadata.db"):
        self.db_path = db_path
        self._create_tables()

    def _create_tables(self):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Create documents table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS documents (
                id TEXT PRIMARY KEY,
                title TEXT,
                hash TEXT,
                metadata TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        conn.commit()
        conn.close()

    def update_document_metadata(self, document_id, metadata, document_hash):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Convert metadata to JSON string
        metadata_json = json.dumps(metadata)

        # Check if document exists
        cursor.execute("SELECT id FROM documents WHERE id = ?", (document_id,))
        exists = cursor.fetchone()

        if exists:
            cursor.execute('''
                UPDATE documents SET
                    title = ?,
                    hash = ?,
                    metadata = ?,
                    updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
            ''', (metadata.get('title', document_id), document_hash, metadata_json, document_id))
        else:
            cursor.execute('''
                INSERT INTO documents (id, title, hash, metadata)
                VALUES (?, ?, ?, ?)
            ''', (document_id, metadata.get('title', document_id), document_hash, metadata_json))

        conn.commit()
        conn.close()

    def get_document_hash(self, document_id):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("SELECT hash FROM documents WHERE id = ?", (document_id,))
        result = cursor.fetchone()
        conn.close()
        if result:
            return result[0]
        return None

    def get_document_metadata(self, document_id):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("SELECT metadata FROM documents WHERE id = ?", (document_id,))
        result = cursor.fetchone()
        conn.close()
        if result:
            return json.loads(result[0])
        return None

    def delete_document(self, document_id):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("DELETE FROM documents WHERE id = ?", (document_id,))
        conn.commit()
        conn.close()

    def list_all_documents(self):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("SELECT id, title, updated_at FROM documents ORDER BY updated_at DESC")
        results = cursor.fetchall()
        conn.close()
        return [{"id": r[0], "title": r[1], "updated_at": r[2]} for r in results]
2.5 LangChain RAG Pipeline¶
from langchain.chains import RetrievalQAWithSourcesChain
from langchain.prompts import PromptTemplate
from langchain_ollama import OllamaLLM
from typing import List, Dict, Any

class RAGPipeline:
    def __init__(
        self,
        embedding_service,
        vectorstore_service,
        metadata_store,
        model_name="llama3",
        temperature=0.1,
        top_k=5
    ):
        self.embedding_service = embedding_service
        self.vectorstore_service = vectorstore_service
        self.metadata_store = metadata_store
        self.model_name = model_name
        self.temperature = temperature
        self.top_k = top_k

        # Initialize LLM
        self.llm = OllamaLLM(
            model=model_name,
            temperature=temperature
        )

        # Create QA prompt
        self.qa_prompt = PromptTemplate(
            template="""You are an assistant that helps users find information in a collection of markdown documents.

Answer the question based on the following context:

{context}

Question: {question}

Provide a comprehensive answer. If the answer cannot be found in the context, say so clearly.
Include relevant source information when possible.
""",
            input_variables=["context", "question"]
        )

    def query(self, query_text, filter_metadata=None):
        # Embed the query
        query_embedding = self.embedding_service.embed_query(query_text)

        # Construct filter expression if needed
        filter_expr = None
        if filter_metadata:
            filter_parts = []
            for key, value in filter_metadata.items():
                filter_parts.append(f'metadata["{key}"] == "{value}"')
            filter_expr = " && ".join(filter_parts)

        # Retrieve relevant documents
        results = self.vectorstore_service.search(
            query_embedding=query_embedding,
            top_k=self.top_k,
            filter=filter_expr
        )

        # Construct context from results
        context_texts = []
        sources = []
        for result in results:
            context_texts.append(result["text"])
            if result["source"]:
                sources.append(result["source"])
        context = "\n\n".join(context_texts)

        # Generate response with LLM
        prompt = self.qa_prompt.format(context=context, question=query_text)
        response = self.llm.invoke(prompt)

        return {
            "query": query_text,
            "response": response,
            "sources": list(set(sources)),
            "results": results
        }
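A brief usage sketch (the question and filter values are illustrative; the services are wired up as shown in section 2.6 below):

result = rag_pipeline.query(
    "How is document change detection implemented?",
    filter_metadata={"tags": "architecture"}  # optional metadata filter
)
print(result["response"])
print("Sources:", result["sources"])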
2.6 API and Interface¶
from fastapi import FastAPI, HTTPException, Body
from pydantic import BaseModel
from typing import Dict, List, Optional, Any
import uvicorn
import os

app = FastAPI(title="Markdown RAG API")

# Initialize services
embedding_service = EmbeddingService(model_name="mxbai-embed-large")
metadata_store = SQLiteMetadataStore(db_path="metadata.db")
vectorstore_service = MilvusVectorStore(
    host=os.environ.get("MILVUS_HOST", "localhost"),
    port=os.environ.get("MILVUS_PORT", "19530")
)
document_processor = DocumentProcessor(
    embedding_service=embedding_service,
    vectorstore_service=vectorstore_service,
    metadata_store=metadata_store
)
rag_pipeline = RAGPipeline(
    embedding_service=embedding_service,
    vectorstore_service=vectorstore_service,
    metadata_store=metadata_store,
    model_name=os.environ.get("OLLAMA_MODEL", "llama3")
)

# Initialize directory watcher
watcher_service = DirectoryWatcherService(
    directory_path=os.environ.get("VAULT_DIR", "./vault"),
    processor=document_processor
)

# Start directory watcher on startup
@app.on_event("startup")
async def startup_event():
    # Process all existing files first
    print(f"Processing existing files in {os.environ.get('VAULT_DIR', './vault')}")
    document_processor.process_all_files(os.environ.get("VAULT_DIR", "./vault"))

    # Start watching for changes
    print("Starting directory watcher")
    watcher_service.start()

# Shutdown directory watcher on shutdown
@app.on_event("shutdown")
async def shutdown_event():
    print("Stopping directory watcher")
    watcher_service.stop()

# Define API models
class QueryRequest(BaseModel):
    query: str
    filter_metadata: Optional[Dict[str, Any]] = None
    model: Optional[str] = None
    temperature: Optional[float] = None
    top_k: Optional[int] = None

class QueryResponse(BaseModel):
    query: str
    response: str
    sources: List[str]

# Define API endpoints
@app.post("/api/query", response_model=QueryResponse)
async def query(request: QueryRequest = Body(...)):
    try:
        # Use provided model params or defaults
        model = request.model or rag_pipeline.model_name
        temperature = request.temperature or rag_pipeline.temperature
        top_k = request.top_k or rag_pipeline.top_k

        # Create a customized pipeline if needed
        if model != rag_pipeline.model_name or temperature != rag_pipeline.temperature or top_k != rag_pipeline.top_k:
            custom_pipeline = RAGPipeline(
                embedding_service=embedding_service,
                vectorstore_service=vectorstore_service,
                metadata_store=metadata_store,
                model_name=model,
                temperature=temperature,
                top_k=top_k
            )
            result = custom_pipeline.query(request.query, request.filter_metadata)
        else:
            result = rag_pipeline.query(request.query, request.filter_metadata)

        return {
            "query": result["query"],
            "response": result["response"],
            "sources": result["sources"]
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/documents")
async def list_documents():
    try:
        documents = metadata_store.list_all_documents()
        return documents
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/api/reindex")
async def reindex():
    try:
        document_processor.process_all_files(os.environ.get("VAULT_DIR", "./vault"))
        return {"status": "success", "message": "Reindexing complete"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# Run the API server
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
3. Docker Deployment Configuration¶
version: '3.8'

services:
  # LLM service
  ollama:
    image: ollama/ollama:latest
    ports:
      - "11434:11434"
    volumes:
      - ollama_data:/root/.ollama
    restart: unless-stopped

  # Web UI for Ollama
  openwebui:
    image: ghcr.io/open-webui/open-webui:latest
    depends_on:
      - ollama
    ports:
      - "8080:8080"
    environment:
      - OLLAMA_BASE_URL=http://ollama:11434
    restart: unless-stopped

  # Vector database
  milvus:
    image: milvusdb/milvus:v2.3.2
    ports:
      - "19530:19530"
      - "9091:9091"
    environment:
      - ETCD_ENDPOINTS=etcd:2379
      - MINIO_ADDRESS=minio:9000
    volumes:
      - milvus_data:/var/lib/milvus
    restart: unless-stopped
    depends_on:
      - etcd
      - minio

  etcd:
    image: quay.io/coreos/etcd:v3.5.5
    environment:
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1000
      - ETCD_QUOTA_BACKEND_BYTES=4294967296
      - ETCD_SNAPSHOT_COUNT=50000
    volumes:
      - etcd_data:/etcd
    command: etcd --advertise-client-urls=http://0.0.0.0:2379 --listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
    restart: unless-stopped

  minio:
    image: minio/minio:RELEASE.2023-03-20T20-16-18Z
    environment:
      - MINIO_ACCESS_KEY=minioadmin
      - MINIO_SECRET_KEY=minioadmin
    ports:
      - "9000:9000"
    volumes:
      - minio_data:/data
    command: minio server /data
    restart: unless-stopped

  # RAG API service
  rag-api:
    build:
      context: .
      dockerfile: Dockerfile.rag
    ports:
      - "8000:8000"
    environment:
      - VAULT_DIR=/vault
      - MILVUS_HOST=milvus
      - MILVUS_PORT=19530
      - OLLAMA_URL=http://ollama:11434
      - OLLAMA_MODEL=llama3
    volumes:
      - ./vault:/vault
      - rag_data:/app/data
    depends_on:
      - ollama
      - milvus
    restart: unless-stopped

  # Minimal UI for RAG testing
  streamlit-ui:
    build:
      context: .
      dockerfile: Dockerfile.ui
    ports:
      - "8501:8501"
    environment:
      - RAG_API_URL=http://rag-api:8000
    depends_on:
      - rag-api
    restart: unless-stopped

volumes:
  ollama_data:
  milvus_data:
  etcd_data:
  minio_data:
  rag_data:
4. Implementation Details and Best Practices¶
4.1 Markdown Processing Strategies¶
For optimal handling of markdown files, we implement:

1. Hierarchical Chunking: Split documents based on heading structure to maintain context.
2. Metadata Extraction: Parse YAML frontmatter for enhanced filtering and context.
3. Link Resolution: Handle internal links (e.g., [[wiki-style]] links) to maintain cross-references (a minimal sketch follows this list).
4. Code Block Handling: Special processing for code blocks to preserve formatting and syntax.
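To illustrate the link-resolution step, a minimal sketch that rewrites [[wiki-style]] links into their display text and records the targets in chunk metadata (the regex and the linked_documents key are assumptions, not part of the current implementation):

import re

WIKI_LINK = re.compile(r"\[\[([^\]|]+)(?:\|([^\]]+))?\]\]")  # matches [[Target]] and [[Target|Alias]]

def resolve_wiki_links(content_text, metadata):
    targets = []

    def _replace(match):
        target, alias = match.group(1), match.group(2)
        targets.append(target.strip())
        return alias or target  # keep the readable text in the chunk

    resolved = WIKI_LINK.sub(_replace, content_text)
    metadata["linked_documents"] = ", ".join(targets)  # stored as a string for vector-store compatibility
    return resolved, metadata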
4.2 Embedding Model Selection and Optimization¶
The mxbai-embed-large model provides excellent performance for semantic understanding of technical content. Key considerations:

1. Dimension Reduction: Consider implementing PCA for large collections to reduce storage requirements.
2. Batch Processing: Process embeddings in batches to optimize throughput.
3. Caching: Implement caching for frequently accessed embeddings (see the sketch after this list).
4. Quantization: For larger collections, consider quantizing embeddings to reduce storage and memory footprint.
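As a sketch of the caching idea, a thin wrapper around EmbeddingService that keys query embeddings by a content hash (the class name and unbounded in-memory cache are assumptions; a production version might cap the cache size):

import hashlib

class CachedEmbeddingService:
    def __init__(self, embedding_service):
        self.embedding_service = embedding_service
        self._cache = {}  # content hash -> embedding vector

    def embed_query(self, query):
        key = hashlib.sha256(query.encode("utf-8")).hexdigest()
        if key not in self._cache:
            self._cache[key] = self.embedding_service.embed_query(query)
        return self._cache[key]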
4.3 Vector Database Configuration and Optimization¶
✅ Current ChromaDB Configuration¶
The current implementation uses ChromaDB with the following configuration:
- Persistence: Local directory storage at the configured location
- Embedding Integration: Direct integration with Ollama embedding models
- Default Settings: Using ChromaDB's default HNSW parameters
- Metadata Handling: Filtering of complex data types for compatibility
🔄 Future Milvus Configuration (Planned)¶
For optimal Milvus performance (future implementation):
- Index Selection: HNSW (Hierarchical Navigable Small World) provides the best balance of accuracy and performance
- Parameter Tuning:
  - M: Controls the maximum number of connections per node (8-16 recommended)
  - efConstruction: Controls index build quality (100-200 recommended)
  - ef: Controls search accuracy (50-150 recommended)
- Resource Allocation: Configure adequate memory for Milvus, especially for the index
- Collection Design: Use partitioning for larger collections to improve query performance
4.4 Advanced RAG Techniques¶
Implementation Note: The following techniques are planned for future enhancements and are not part of the current implementation.
🔄 Future RAG Enhancements (Planned)¶
- Query Reformulation: Process user queries to improve retrieval effectiveness (a sketch follows this list).

- Hybrid Search: Combine vector similarity with keyword search for improved recall:

  # PLANNED ENHANCEMENT
  def hybrid_search(query, vectorstore, metadata_store, embedding_service):
      # Vector search
      query_embedding = embedding_service.embed_query(query)
      vector_results = vectorstore.search(query_embedding)
      # Keyword search
      keyword_results = metadata_store.keyword_search(query)
      # Combine results with appropriate weighting
      combined_results = combine_search_results(vector_results, keyword_results)
      return combined_results

- Reranking: Implement a two-stage retrieval process to refine results:

  # PLANNED ENHANCEMENT
  def rerank_results(query, initial_results, reranker_model):
      query_doc_pairs = [(query, result["text"]) for result in initial_results]
      scores = reranker_model.compute_scores(query_doc_pairs)
      # Sort by reranker scores
      reranked_results = [
          (initial_results[i], scores[i])
          for i in range(len(initial_results))
      ]
      reranked_results.sort(key=lambda x: x[1], reverse=True)
      return [item[0] for item in reranked_results]

- LLM Agents for Query Planning:

  # PLANNED ENHANCEMENT
  def agent_based_query(query, rag_pipeline):
      # Analyze query to determine approach
      query_plan = rag_pipeline.llm.invoke(f"""
      Analyze this query and create a search plan:
      Query: {query}

      What kind of information is needed? Should I:
      1. Perform a direct search
      2. Break this into sub-questions
      3. Filter by specific metadata

      Plan:
      """)

      # Execute the plan
      if "sub-questions" in query_plan:
          # Handle multi-hop retrieval
          ...
      else:
          # Direct retrieval
          return rag_pipeline.query(query)
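For the Query Reformulation item above, a minimal sketch that asks the LLM for a retrieval-friendly rewrite before embedding the query (the prompt wording and fallback behaviour are assumptions):

# PLANNED ENHANCEMENT (sketch)
def reformulate_query(query, llm):
    prompt = (
        "Rewrite the following question as a concise search query that "
        "captures its key terms and intent. Return only the rewritten query.\n\n"
        f"Question: {query}"
    )
    rewritten = llm.invoke(prompt).strip()
    return rewritten or query  # fall back to the original query if the LLM returns nothing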
4.5 Evaluation and Monitoring¶
Implementation Note: The following evaluation metrics are planned for future implementation to measure and improve RAG performance.
🔄 Planned Evaluation Framework¶
- Retrieval Evaluation (a computation sketch follows this list):
  - Mean Reciprocal Rank (MRR)
  - Normalized Discounted Cumulative Gain (NDCG)
  - Precision@K and Recall@K
- Response Quality Evaluation:
  - Factual Accuracy
  - Answer Relevance
  - Citation Accuracy
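A sketch of how the retrieval metrics above could be computed against a labelled test set (the function names and the (retrieved_ids, relevant_ids) format are assumptions):

# PLANNED ENHANCEMENT (sketch)
def reciprocal_rank(retrieved_ids, relevant_ids):
    # 1/rank of the first relevant result, or 0.0 if none was retrieved
    for rank, doc_id in enumerate(retrieved_ids, start=1):
        if doc_id in relevant_ids:
            return 1.0 / rank
    return 0.0

def precision_at_k(retrieved_ids, relevant_ids, k=5):
    top_k = retrieved_ids[:k]
    return sum(1 for doc_id in top_k if doc_id in relevant_ids) / k

def mean_reciprocal_rank(labelled_queries):
    # labelled_queries: list of (retrieved_ids, relevant_ids) pairs
    return sum(reciprocal_rank(r, rel) for r, rel in labelled_queries) / len(labelled_queries)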
✅ Current Evaluation Methods¶
The current implementation provides basic statistics and manual evaluation:
- Document count tracking
- Source file listing
- Manual verification of responses
🔄 Planned System Monitoring¶
- System Monitoring:
  - Query latency
  - Embedding generation throughput
  - Vector store query performance
  - LLM response time
5. Extension Points¶
Implementation Note: The following are potential extension points for future development beyond the current implementation.
🔄 Planned Future Extensions¶
The modular design allows for several extensions:
- Multi-Modal Support: Extend to handle images and other media in markdown
- Semantic Caching: Implement a semantic cache for similar queries (a sketch follows this list)
- Custom Embedding Models: Allow customization of embedding models based on domain
- Advanced Vector Database Integration: Support for Milvus and other scalable vector databases
- Hybrid Search Implementation: Combine vector search with keyword-based search
- User Feedback Integration: Capture user feedback to improve retrieval and generation
- Self-Critique and Refinement: Implement self-evaluation and refinement of responses
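For the Semantic Caching extension above, a minimal sketch that reuses a cached response when a new query's embedding is sufficiently close to an earlier one (the cosine-similarity measure and 0.95 threshold are assumptions):

import numpy as np

class SemanticCache:
    def __init__(self, embedding_service, similarity_threshold=0.95):
        self.embedding_service = embedding_service
        self.similarity_threshold = similarity_threshold
        self._entries = []  # list of (embedding, response) pairs

    def lookup(self, query):
        query_vec = np.array(self.embedding_service.embed_query(query))
        for cached_vec, response in self._entries:
            sim = float(np.dot(query_vec, cached_vec) /
                        (np.linalg.norm(query_vec) * np.linalg.norm(cached_vec)))
            if sim >= self.similarity_threshold:
                return response  # cache hit: a sufficiently similar query was answered before
        return None

    def store(self, query, response):
        self._entries.append((np.array(self.embedding_service.embed_query(query)), response))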
6. Testing Strategy¶
Implementation Note: The current implementation includes basic unit tests. The comprehensive testing framework described below is planned for future development.
🔄 Planned Testing Framework¶
Comprehensive testing includes:
- Unit Tests: For individual components
- Integration Tests: For component interactions
- End-to-End Tests: Using a test corpus of markdown documents
- Performance Testing: Under various loads and document sizes
- Regression Testing: To ensure continued quality as the system evolves
✅ Current Testing Implementation¶
The current implementation includes:
- Basic Unit Tests: Testing core functionality of each component
- Service-level Tests: Verifying RAG service integration
- Document Processing Tests: Ensuring correct handling of markdown files
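As an example of the document-processing tests listed above, a minimal pytest sketch that checks frontmatter is separated from the body (the file contents and assertions are illustrative):

import frontmatter

def test_frontmatter_and_content_are_separated(tmp_path):
    md_file = tmp_path / "note.md"
    md_file.write_text("---\ntitle: Test Note\ntags: [rag]\n---\n# Heading\n\nBody text.\n")

    post = frontmatter.loads(md_file.read_text())

    assert post.metadata["title"] == "Test Note"
    assert "Body text." in post.content
    assert "---" not in post.content  # frontmatter delimiters are stripped from the body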
7. Appendix¶
7.1 Installation Instructions¶
# Clone the repository
git clone https://github.com/yourusername/markdown-rag.git
cd markdown-rag
# Build and start services
docker-compose up -d
# Download required models
curl -X POST http://localhost:11434/api/pull -d '{"name": "mxbai-embed-large"}'
curl -X POST http://localhost:11434/api/pull -d '{"name": "llama3"}'
# Verify installation
curl http://localhost:8000/api/documents
7.2 API Documentation¶
The API is documented using OpenAPI and accessible at http://localhost:8000/docs when the service is running.
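A minimal client-side example against the /api/query endpoint defined in section 2.6 (the question text and top_k override are illustrative):

import requests

response = requests.post(
    "http://localhost:8000/api/query",
    json={
        "query": "How does the system detect changed markdown files?",
        "top_k": 3,  # optional override of the default retrieval depth
    },
    timeout=60,
)
response.raise_for_status()
answer = response.json()
print(answer["response"])
print("Sources:", answer["sources"])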
7.3 Performance Benchmarks¶
Initial benchmarks with a corpus of 1,000 markdown files (~10MB total):

- Document processing: ~5 documents/second
- Query latency: ~500ms (including embedding generation and retrieval)
- Memory usage: ~2GB (Milvus) + ~1GB (Python services)