Building a RAG System with TimescaleDB

Sujithra Kathiravan
7 min read · Jan 8, 2025


Retrieval Augmented Generation (RAG) has emerged as a powerful pattern for building knowledge-intensive applications. However, developers often face significant challenges:

  • Managing multiple databases for vectors and metadata
  • Ensuring data consistency across systems
  • Handling complex queries combining vector search with traditional data
  • Scaling systems cost-effectively

While many solutions focus on specialized vector databases, TimescaleDB offers a compelling alternative: a production-grade PostgreSQL database that handles both vector operations and traditional data workloads. In this article, we’ll build a scalable RAG system using TimescaleDB, exploring how to move beyond proof-of-concept to handle real-world challenges.

Why TimescaleDB for RAG?

The typical RAG architecture juggles multiple databases: a vector store for embeddings and a traditional database for metadata and relationships. TimescaleDB collapses that stack into a single system:

  1. Single Source of Truth: Store vectors, metadata, and relationships in one database
  2. PostgreSQL Compatibility: Leverage the rich PostgreSQL ecosystem
  3. Production-Ready Features: Built-in compression, backup, and monitoring
  4. Cost-Effective: No need for separate vector and traditional databases

Prerequisites

Before we begin, ensure you have the following installed:

# Software Requirements
PostgreSQL >= 14.0
TimescaleDB >= 2.11.0
Python >= 3.8
# Python Dependencies
pip install -r requirements.txt

Create a requirements.txt file with these dependencies:

psycopg2-binary>=2.9.9
langchain>=0.0.267
openai>=0.28.0
python-dotenv>=1.0.0
numpy>=1.24.0
cachetools>=5.3.2

Set up your environment variables:

# .env file
TIMESCALE_HOST=localhost
TIMESCALE_PORT=5432
TIMESCALE_DB=rag_system
TIMESCALE_USER=your_user
TIMESCALE_PASSWORD=your_password
OPENAI_API_KEY=your_openai_key

System Architecture

Our RAG system will include:

  • Document ingestion and chunking pipeline
  • Embedding generation and storage
  • Vector similarity search
  • Integration with an LLM for answer generation
  • Performance monitoring and optimization

Let’s build each component step by step.

Setting Up TimescaleDB with pgvector

Let’s start by setting up our database. We’ll need two main tables: one for documents and another for document chunks with their embeddings.

-- Enable required extensions
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS ai CASCADE;  -- pgai

-- Create our documents table
CREATE TABLE documents (
    id SERIAL PRIMARY KEY,
    title TEXT NOT NULL,
    content TEXT NOT NULL,
    metadata JSONB,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Create table for document chunks
CREATE TABLE document_chunks (
    id SERIAL PRIMARY KEY,
    document_id INTEGER REFERENCES documents(id),
    chunk_index INTEGER,
    content TEXT NOT NULL,
    embedding vector(1536),
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Create an index for vector similarity search
CREATE INDEX ON document_chunks USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);

This initial setup adds vector operations support through pgvector and AI capabilities through pgai. The vector extension enables similarity searches, while pgai will help us integrate with various AI models.

This schema design serves several purposes:

  • The documents table stores original documents with their metadata
  • The document_chunks table holds text chunks and their embeddings
  • Using JSONB for metadata allows flexible document attributes
  • Timestamps enable tracking and maintenance operations

The IVFFlat index with 100 lists is chosen because:

  • It provides a good balance of search speed and accuracy for datasets up to 1 million vectors
  • Each list contains approximately 1% of the vectors, allowing efficient searching
  • Memory usage remains reasonable while maintaining good recall

The lists parameter effectively partitions the vectors into 100 buckets; at query time only the closest buckets are scanned, trading a little recall for speed. For most applications 100 lists is a good starting point, but adjust it based on your data size and accuracy needs, and build the index after loading data so the clusters reflect your actual distribution.
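
If you need to tune recall at query time, pgvector also exposes an ivfflat.probes setting that controls how many of those lists each search scans. A minimal sketch (the numbers follow pgvector's general rules of thumb, not anything specific to this dataset):

-- Scan more lists per query for better recall (the default is 1);
-- can be set per session or per transaction
SET ivfflat.probes = 10;

-- Rules of thumb from the pgvector docs: lists ≈ rows / 1000 (up to ~1M rows)
-- and probes ≈ sqrt(lists); revisit both as the table grows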

Document Processing Pipeline

Here's our Python code for document processing:

import psycopg2
from psycopg2.extras import execute_values, Json
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings

class DocumentProcessor:
    def __init__(self, db_params, batch_size=100):
        self.db_params = db_params
        self.batch_size = batch_size
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50
        )
        self.embeddings_model = OpenAIEmbeddings()

    def process_document(self, title, content, metadata=None):
        # Split document into chunks
        chunks = self.text_splitter.split_text(content)

        with psycopg2.connect(**self.db_params) as conn:
            with conn.cursor() as cur:
                # Insert document (Json() lets psycopg2 adapt the dict to JSONB)
                cur.execute("""
                    INSERT INTO documents (title, content, metadata)
                    VALUES (%s, %s, %s)
                    RETURNING id
                """, (title, content, Json(metadata) if metadata else None))
                document_id = cur.fetchone()[0]

                # Process chunks in batches
                for i in range(0, len(chunks), self.batch_size):
                    batch = chunks[i:i + self.batch_size]

                    # Generate embeddings
                    embeddings = self.embeddings_model.embed_documents(batch)

                    # Prepare data for bulk insert; pgvector accepts the bracketed
                    # text format, so each embedding is serialized as a string
                    chunk_data = [
                        (document_id, i + idx, chunk, str(embedding))
                        for idx, (chunk, embedding) in enumerate(zip(batch, embeddings))
                    ]

                    # Bulk insert chunks and embeddings
                    execute_values(cur, """
                        INSERT INTO document_chunks
                            (document_id, chunk_index, content, embedding)
                        VALUES %s
                    """, chunk_data)

            conn.commit()

This processor class handles three critical tasks:

  1. Splitting documents into manageable chunks
  2. Generating embeddings for each chunk
  3. Storing everything in TimescaleDB

The chunk size of 500 characters with 50-character overlap is a good starting point, but you might need to adjust based on your content. Longer chunks provide more context but increase processing time and token usage.

The process_document method handles document insertion. We use a transaction to ensure the document and its chunks are stored atomically: either everything succeeds, or nothing changes.

Chunks are processed in batches for two reasons:

  1. Memory Efficiency: Processing thousands of chunks at once could overwhelm your system
  2. API Optimization: Most embedding APIs work more efficiently with batches
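
For illustration, ingesting a single document might look like this (db_params is the connection dictionary covered under Configuration Management below; the title, file path, and metadata values are made up):

processor = DocumentProcessor(db_params, batch_size=100)

processor.process_document(
    title="Getting Started Guide",
    content=open("docs/getting_started.md").read(),
    metadata={"source": "docs", "importance": "high"}
)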

Implementing Vector Search

Vector search is where TimescaleDB really shines. Let’s build a search function that combines vector similarity with metadata filtering:

import json

def semantic_search(query, metadata_filter=None, limit=5):
    # embeddings_model and db_params are assumed to be module-level objects
    # (see the configuration sketch in the next section)
    query_embedding = str(embeddings_model.embed_query(query))  # pgvector text format

    with psycopg2.connect(**db_params) as conn:
        with conn.cursor() as cur:
            # Base query with vector similarity
            sql = """
                SELECT
                    dc.content,
                    d.title,
                    d.metadata,
                    1 - (dc.embedding <=> %s) AS similarity
                FROM document_chunks dc
                JOIN documents d ON d.id = dc.document_id
                WHERE 1=1
            """
            params = [query_embedding]

            # Add metadata filters if specified
            if metadata_filter:
                sql += " AND d.metadata @> %s"
                params.append(json.dumps(metadata_filter))

            # Order by similarity and limit results
            sql += """
                ORDER BY dc.embedding <=> %s
                LIMIT %s
            """
            params.extend([query_embedding, limit])

            cur.execute(sql, params)
            results = cur.fetchall()

    return [
        {
            'content': r[0],
            'title': r[1],
            'metadata': r[2],
            'similarity': r[3]
        }
        for r in results
    ]

The <=> operator computes cosine distance. We subtract from 1 to convert distance to similarity (higher is better). This query combines:

  • Vector similarity search
  • Document metadata
  • Relevance scoring

Adding metadata filters makes our search more powerful:

The @> operator checks if metadata contains specific fields, leveraging PostgreSQL's JSONB capabilities. This lets us filter results by any metadata field without changing our schema.
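
For example, a call that restricts results to high-importance documentation might look like this (the metadata keys are illustrative and must match whatever you stored at ingest time):

results = semantic_search(
    "How do I reset my password?",
    metadata_filter={"source": "docs", "importance": "high"},
    limit=3
)

for r in results:
    print(f"{r['similarity']:.3f}  {r['title']}")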

Configuration Management

Create a configuration manager to handle database and API settings:

import os
from typing import Dict, Any
from dotenv import load_dotenv

class Config:
    def __init__(self):
        load_dotenv()

    @property
    def db_params(self) -> Dict[str, Any]:
        """Get database connection parameters."""
        return {
            'host': os.getenv('TIMESCALE_HOST', 'localhost'),
            'port': int(os.getenv('TIMESCALE_PORT', 5432)),
            'database': os.getenv('TIMESCALE_DB', 'rag_system'),
            'user': os.getenv('TIMESCALE_USER'),
            'password': os.getenv('TIMESCALE_PASSWORD')
        }

    @property
    def openai_config(self) -> Dict[str, str]:
        """Get OpenAI API configuration."""
        return {
            'api_key': os.getenv('OPENAI_API_KEY')
        }
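
Putting the pieces together, here is a minimal wiring sketch. It sets up the module-level db_params and embeddings_model that semantic_search() expects; the sample query at the end is made up:

from langchain.embeddings import OpenAIEmbeddings

config = Config()

# Module-level objects used by semantic_search() above
db_params = config.db_params
embeddings_model = OpenAIEmbeddings()  # picks up OPENAI_API_KEY loaded by load_dotenv()

processor = DocumentProcessor(db_params)
# ... ingest documents with processor.process_document(...) ...

results = semantic_search("How does billing work?", limit=3)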

Optimizing for Production

1. Implement Smart Caching

from cachetools import TTLCache

class CachedEmbeddingSearch:
    def __init__(self, cache_ttl=3600, cache_maxsize=10000):
        # Keep entries for an hour, up to 10,000 queries
        self.cache = TTLCache(maxsize=cache_maxsize, ttl=cache_ttl)

This cache serves two purposes:

  • Reduces embedding API costs by caching frequent queries
  • Improves response times for common searches
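
One way to apply this, sketched here as a small standalone helper rather than the full CachedEmbeddingSearch class (the function and cache names are ours):

from cachetools import TTLCache
from langchain.embeddings import OpenAIEmbeddings

_embedding_cache = TTLCache(maxsize=10000, ttl=3600)
_embeddings_model = OpenAIEmbeddings()

def cached_embed_query(query):
    # Reuse the embedding for repeated queries instead of calling the API again
    if query in _embedding_cache:
        return _embedding_cache[query]
    embedding = _embeddings_model.embed_query(query)
    _embedding_cache[query] = embedding
    return embedding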

2. Track Search Metrics in a Hypertable

Create a hypertable for tracking search metrics:

CREATE TABLE search_metrics (
    id SERIAL,
    query TEXT,
    result_count INTEGER,
    execution_time_ms FLOAT,
    timestamp TIMESTAMPTZ DEFAULT NOW(),
    -- unique constraints on a hypertable must include the partitioning column
    PRIMARY KEY (id, timestamp)
);

SELECT create_hypertable('search_metrics', 'timestamp');

Using a hypertable lets us efficiently store and analyze search performance over time. We can track:

  • Query patterns
  • Response times
  • Result quality
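
For example, a time_bucket query over this table surfaces slow periods at a glance (a sketch; adjust the bucket width to taste):

SELECT
    time_bucket('1 hour', timestamp) AS bucket,
    count(*) AS searches,
    avg(execution_time_ms) AS avg_ms,
    max(execution_time_ms) AS worst_ms
FROM search_metrics
GROUP BY bucket
ORDER BY bucket DESC;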

3. Optimize for Common Queries

For frequently accessed data, create a materialized view:

CREATE MATERIALIZED VIEW common_searches AS
SELECT
    dc.embedding,
    dc.content,
    d.title,
    d.metadata
FROM document_chunks dc
JOIN documents d ON d.id = dc.document_id
WHERE d.metadata->>'importance' = 'high';

This approach:

  • Precomputes joins for frequent queries
  • Reduces runtime computation
  • Improves response times for important content
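
The view is queried just like the base tables and refreshed on whatever schedule matches your ingest rate. A sketch (the %s placeholders stand in for a query embedding bound from Python):

-- Rebuild after bulk ingests; materialized views are not refreshed automatically
REFRESH MATERIALIZED VIEW common_searches;

-- Optionally give the view its own vector index
CREATE INDEX ON common_searches USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);

-- Search only the precomputed high-importance subset
SELECT content, title, 1 - (embedding <=> %s) AS similarity
FROM common_searches
ORDER BY embedding <=> %s
LIMIT 5;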

Best Practices and Lessons Learned

1. Chunking Strategy

  • Keep chunks between 300–500 characters for balanced context
  • Use semantic boundaries when possible (paragraphs, sentences)
  • Consider your content type when setting overlap

2. Index Management

  • Run VACUUM ANALYZE regularly after bulk insertions
  • Monitor index usage with pg_stat_user_indexes (see the query after this list)
  • Review and adjust IVFFlat parameters as your data grows

3. Resource Management

  • Implement connection pooling using pgbouncer
  • Set appropriate timeouts for queries
  • Monitor memory usage and connection counts
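
The index-usage check mentioned above can be a simple query against the standard pg_stat_user_indexes view:

-- How often the indexes on document_chunks are actually used
SELECT relname, indexrelname, idx_scan, idx_tup_read
FROM pg_stat_user_indexes
WHERE relname = 'document_chunks';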

Real-World Use Case: Customer Support System

Let’s build a practical example: a customer support system that combines historical tickets, product documentation, and real-time customer interactions.

import os

class SupportSystem:
    def __init__(self):
        # TimescaleDBClient is assumed to be a thin async wrapper (e.g. around asyncpg)
        self.db = TimescaleDBClient(
            host=os.environ['TIMESCALE_HOST'],
            database='support_system'
        )

    async def handle_customer_query(self, query: str, customer_id: str):
        # First, get customer context as a single JSON 'context' column
        customer_context = await self.db.fetch_one("""
            SELECT
                json_build_object(
                    'recent_tickets', (
                        SELECT json_agg(t)
                        FROM (
                            SELECT *
                            FROM tickets
                            WHERE customer_id = $1
                            ORDER BY created_at DESC
                            LIMIT 5
                        ) t
                    ),
                    'subscription', (
                        SELECT json_build_object('plan', plan, 'status', status)
                        FROM subscriptions
                        WHERE customer_id = $1
                    )
                ) AS context
        """, customer_id)

        # Then, perform vector search with context; the query embedding is
        # generated in-database via pgai (the model must match the 1536-dim column)
        results = await self.db.fetch_all("""
            WITH query_embedding AS (
                SELECT ai.openai_embed('text-embedding-ada-002', $1) AS embedding
            )
            SELECT
                dc.content,
                1 - (dc.embedding <=> (SELECT embedding FROM query_embedding)) AS similarity,
                d.source,
                d.metadata
            FROM document_chunks dc
            JOIN documents d ON d.id = dc.document_id
            WHERE d.metadata->>'product' = $2
            ORDER BY dc.embedding <=> (SELECT embedding FROM query_embedding)
            LIMIT 5
        """, query, customer_context['context']['subscription']['plan'])

        return self.format_response(results, customer_context)

This real-world example demonstrates several TimescaleDB advantages:

  • Complex SQL joins with vector operations
  • Rich JSON operations for metadata
  • Transaction support for data consistency
  • No need for multiple databases

Decision Guide: When to Use TimescaleDB for RAG

✅ Choose TimescaleDB when you need:

  • Single source of truth for all data
  • Complex queries combining vectors and traditional data
  • Enterprise-grade reliability and backup
  • Cost-effective scaling
  • Rich monitoring and observability

❌ Consider alternatives when:

  • You only need simple vector similarity search
  • Your data doesn’t have temporal aspects
  • You don’t need ACID compliance

Remember: The true cost of a vector database isn’t just the storage — it’s the entire ecosystem and engineering effort required to maintain multiple databases in sync.

Conclusion

TimescaleDB provides a robust foundation for building production-ready RAG systems. Its combination of vector search capabilities and traditional database features makes it an excellent choice for organizations looking to implement RAG without managing multiple databases.

Key takeaways:

  • Unified storage simplifies architecture
  • Built-in monitoring tools aid optimization
  • PostgreSQL compatibility enables complex queries
  • Scalable from PoC to production

Next Steps

Ready to build your RAG system? Here are some resources to get started:

Feel free to reach out with questions or share your experiences in the comments below!
