vector-database-management

Installs: 40
Rank: #18037

## Installation

```shell
npx skills add https://github.com/manutej/luxor-claude-marketplace --skill vector-database-management
```

# Vector Database Management

## Table of Contents

- Introduction
- Vector Embeddings Fundamentals
- Database Setup & Configuration
- Index Operations
- Vector Operations
- Similarity Search
- Metadata Filtering
- Hybrid Search
- Namespace & Collection Management
- Performance & Scaling
- Production Best Practices
- Cost Optimization

## Introduction

Vector databases are specialized systems designed to store, index, and query high-dimensional vector embeddings efficiently. They power modern AI applications including semantic search, recommendation systems, RAG (Retrieval Augmented Generation), and similarity-based matching.

### Key Concepts

- **Vector Embeddings**: Numerical representations of data (text, images, audio) in high-dimensional space
- **Similarity Search**: Finding vectors that are "close" to a query vector using distance metrics
- **Metadata Filtering**: Combining vector similarity with structured data filtering
- **Indexing**: Optimization structures (HNSW, IVF, etc.) for fast approximate nearest neighbor search

### Database Comparison

| Feature | Pinecone | Weaviate | Chroma |
|---|---|---|---|
| Deployment | Fully managed | Managed or self-hosted | Self-hosted or cloud |
| Index Types | Serverless, Pods | HNSW | HNSW |
| Metadata Filtering | Advanced | GraphQL-based | Simple |
| Hybrid Search | Sparse-Dense | Built-in | Limited |
| Scale | Massive | Large | Small-Medium |
| Best For | Production RAG | Knowledge graphs | Local development |

## Vector Embeddings Fundamentals

### Understanding Vector Representations
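The distance metrics behind similarity search (cosine, dot product, Euclidean) are simple to compute directly; a minimal NumPy sketch (function names are illustrative, not from any vector-database SDK):

```python
import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine of the angle between two vectors (1.0 = identical direction)."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

def euclidean_distance(a: np.ndarray, b: np.ndarray) -> float:
    """Straight-line distance between two vectors (0.0 = identical)."""
    return float(np.linalg.norm(a - b))

# Two toy "embeddings"
a = np.array([1.0, 0.0, 1.0])
b = np.array([1.0, 0.0, 0.0])

print(cosine_similarity(a, b))   # ~0.7071
print(euclidean_distance(a, b))  # 1.0
```

A vector database computes the same quantities, but over millions of stored vectors using an approximate index rather than a brute-force loop.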

Vector embeddings transform unstructured data into numerical arrays that capture semantic meaning:

```python
# Text to embeddings using OpenAI
from openai import OpenAI

client = OpenAI(api_key="YOUR_API_KEY")

def generate_embedding(text: str, model: str = "text-embedding-3-small") -> list[float]:
    """Generate embeddings from text using OpenAI."""
    response = client.embeddings.create(
        input=text,
        model=model
    )
    return response.data[0].embedding

# Example usage
text = "Vector databases enable semantic search capabilities"
embedding = generate_embedding(text)
print(f"Embedding dimension: {len(embedding)}")  # 1536 dimensions
print(f"First 5 values: {embedding[:5]}")
```

### Popular Embedding Models

**1. OpenAI Embeddings (Production-grade)**

```python
from openai import OpenAI

def openai_embeddings(texts: list[str]) -> list[list[float]]:
    """Batch generate OpenAI embeddings."""
    client = OpenAI(api_key="YOUR_API_KEY")
    response = client.embeddings.create(
        input=texts,
        model="text-embedding-3-large"  # 3072 dimensions
    )
    return [item.embedding for item in response.data]
```

**2. Sentence Transformers (Open-source)**

```python
from sentence_transformers import SentenceTransformer

def sentence_transformer_embeddings(texts: list[str]) -> list[list[float]]:
    """Generate embeddings using Sentence Transformers."""
    model = SentenceTransformer('all-MiniLM-L6-v2')  # 384 dimensions
    embeddings = model.encode(texts)
    return embeddings.tolist()
```

**3. Cohere Embeddings**

```python
import cohere

def cohere_embeddings(texts: list[str]) -> list[list[float]]:
    """Generate embeddings using Cohere."""
    co = cohere.Client("YOUR_API_KEY")
    response = co.embed(
        texts=texts,
        model="embed-english-v3.0",
        input_type="search_document"
    )
    return response.embeddings
```

### Embedding Dimensions & Trade-offs

```python
# Different embedding models for different use cases
EMBEDDING_CONFIGS = {
    "openai-small": {
        "model": "text-embedding-3-small",
        "dimensions": 1536,
        "cost_per_1m": 0.02,
        "use_case": "General purpose, cost-effective"
    },
    "openai-large": {
        "model": "text-embedding-3-large",
        "dimensions": 3072,
        "cost_per_1m": 0.13,
        "use_case": "High accuracy requirements"
    },
    "sentence-transformers": {
        "model": "all-MiniLM-L6-v2",
        "dimensions": 384,
        "cost_per_1m": 0.00,  # Open-source
        "use_case": "Local development, privacy-sensitive"
    },
    "cohere-multilingual": {
        "model": "embed-multilingual-v3.0",
        "dimensions": 1024,
        "cost_per_1m": 0.10,
        "use_case": "Multi-language applications"
    }
}
```
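Since `cost_per_1m` is priced per million tokens, a back-of-the-envelope helper makes the trade-off concrete (illustrative only; actual costs depend on the provider's current pricing):

```python
# Per-million-token prices, matching the config table above
COST_PER_1M_TOKENS = {
    "text-embedding-3-small": 0.02,
    "text-embedding-3-large": 0.13,
    "all-MiniLM-L6-v2": 0.00,  # Open-source, self-hosted
}

def estimate_embedding_cost(model: str, num_tokens: int) -> float:
    """Estimated USD cost to embed num_tokens with the given model."""
    return COST_PER_1M_TOKENS[model] * num_tokens / 1_000_000

# Embedding a 10M-token corpus:
print(estimate_embedding_cost("text-embedding-3-small", 10_000_000))
print(estimate_embedding_cost("text-embedding-3-large", 10_000_000))
```

At this volume the large model costs roughly 6.5x the small one, which is why dimension/accuracy requirements should drive the choice rather than defaulting to the largest model.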

## Database Setup & Configuration

### Pinecone Setup

```shell
# Install Pinecone SDK
pip install pinecone-client
```

```python
from pinecone import Pinecone, ServerlessSpec

# Initialize Pinecone client
pc = Pinecone(api_key="YOUR_API_KEY")

# List existing indexes
indexes = pc.list_indexes()
print(f"Existing indexes: {[idx.name for idx in indexes]}")

# Create serverless index (recommended for production)
index_name = "production-search"

if index_name not in [idx.name for idx in pc.list_indexes()]:
    pc.create_index(
        name=index_name,
        dimension=1536,   # Match your embedding model
        metric="cosine",  # cosine, dotproduct, or euclidean
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1"
        ),
        deletion_protection="enabled",  # Prevent accidental deletion
        tags={
            "environment": "production",
            "team": "ml",
            "project": "semantic-search"
        }
    )
    print(f"Created index: {index_name}")

# Connect to index
index = pc.Index(index_name)

# Get index stats
stats = index.describe_index_stats()
print(f"Index stats: {stats}")
```

### Selective Metadata Indexing (Pinecone)

```python
# Configure which metadata fields to index for filtering.
# This optimizes memory usage and query performance.
from pinecone import Pinecone, ServerlessSpec

pc = Pinecone(api_key="YOUR_API_KEY")

# Create index with metadata configuration
pc.create_index(
    name="optimized-index",
    dimension=1536,
    metric="cosine",
    spec=ServerlessSpec(
        cloud="aws",
        region="us-east-1",
        schema={
            "fields": {
                # Index these fields for filtering
                "document_id": {"filterable": True},
                "category": {"filterable": True},
                "created_at": {"filterable": True},
                "tags": {"filterable": True},
                # Store but don't index (saves memory)
                "document_title": {"filterable": False},
                "document_url": {"filterable": False},
                "full_content": {"filterable": False}
            }
        }
    )
)
```

This configuration allows you to:

1. Filter by `document_id`, `category`, `created_at`, and `tags`
2. Retrieve `document_title`, `document_url`, and `full_content` in results
3. Save memory by not indexing the non-filterable fields

### Weaviate Setup

```shell
# Install Weaviate client
pip install weaviate-client
```

```python
import weaviate
from weaviate.classes.config import Configure, Property, DataType

# Connect to Weaviate
client = weaviate.connect_to_local()

# Or connect to Weaviate Cloud:
# client = weaviate.connect_to_wcs(
#     cluster_url="YOUR_WCS_URL",
#     auth_credentials=weaviate.auth.AuthApiKey("YOUR_API_KEY")
# )

# Create collection (schema)
try:
    collection = client.collections.create(
        name="Documents",
        vectorizer_config=Configure.Vectorizer.text2vec_openai(
            model="text-embedding-3-small"
        ),
        properties=[
            Property(name="title", data_type=DataType.TEXT),
            Property(name="content", data_type=DataType.TEXT),
            Property(name="category", data_type=DataType.TEXT),
            Property(name="created_at", data_type=DataType.DATE),
            Property(name="tags", data_type=DataType.TEXT_ARRAY)
        ]
    )
    print("Created collection: Documents")
except Exception as e:
    print(f"Collection exists or error: {e}")

# Get collection
documents = client.collections.get("Documents")

# Check collection info
print(documents.config.get())
```

### Chroma Setup

```shell
# Install Chroma
pip install chromadb
```

```python
import chromadb

# Initialize Chroma client (persistent)
client = chromadb.PersistentClient(path="./chroma_db")

# Or use ephemeral (in-memory):
# client = chromadb.EphemeralClient()

# Create or get collection
collection = client.get_or_create_collection(
    name="documents",
    metadata={
        "description": "Document collection for semantic search",
        "hnsw:space": "cosine"  # cosine, l2, or ip (inner product)
    }
)

# List all collections
collections = client.list_collections()
print(f"Available collections: {[c.name for c in collections]}")

# Get collection info
print(f"Collection count: {collection.count()}")
```

## Index Operations

### Creating Indexes with Different Configurations

```python
from pinecone import Pinecone, ServerlessSpec, PodSpec

pc = Pinecone(api_key="YOUR_API_KEY")

# 1. Serverless index (auto-scaling, pay-per-use)
pc.create_index(
    name="serverless-index",
    dimension=1536,
    metric="cosine",
    spec=ServerlessSpec(
        cloud="aws",
        region="us-east-1"
    )
)

# 2. Pod-based index (dedicated resources)
pc.create_index(
    name="pod-index",
    dimension=1536,
    metric="dotproduct",
    spec=PodSpec(
        environment="us-east-1-aws",
        pod_type="p1.x1",  # Performance tier
        pods=2,            # Number of pods
        replicas=2,        # Replicas for high availability
        shards=1
    )
)

# 3. Sparse index (for BM25-like search)
pc.create_index(
    name="sparse-index",
    dimension=None,  # Sparse vectors don't have a fixed dimension
    metric="dotproduct",
    spec=ServerlessSpec(
        cloud="aws",
        region="us-east-1"
    )
)
```

### Index Management Operations

```python
from pinecone import Pinecone

pc = Pinecone(api_key="YOUR_API_KEY")

# List all indexes
indexes = pc.list_indexes()
for idx in indexes:
    print(f"Name: {idx.name}, Status: {idx.status.state}, Host: {idx.host}")

# Describe specific index
index_info = pc.describe_index("production-search")
print(f"Dimension: {index_info.dimension}")
print(f"Metric: {index_info.metric}")
print(f"Status: {index_info.status}")

# Connect to index
index = pc.Index("production-search")

# Get index statistics
stats = index.describe_index_stats()
print(f"Total vectors: {stats.total_vector_count}")
print(f"Namespaces: {stats.namespaces}")
print(f"Index fullness: {stats.index_fullness}")

# Delete index (be careful!)
pc.delete_index("test-index")
```

### Configuring an Index for Optimal Performance

```python
# Configuration for different use cases

# 1. High-throughput search (many queries/second)
pc.create_index(
    name="high-throughput",
    dimension=1536,
    metric="cosine",
    spec=PodSpec(
        environment="us-east-1-aws",
        pod_type="p2.x1",  # Higher performance tier
        pods=4,
        replicas=3  # More replicas = higher query throughput
    )
)

# 2. Large-scale storage (billions of vectors)
pc.create_index(
    name="large-scale",
    dimension=1536,
    metric="cosine",
    spec=PodSpec(
        environment="us-east-1-aws",
        pod_type="s1.x1",  # Storage-optimized
        pods=8,
        shards=4  # More shards = more storage capacity
    )
)

# 3. Cost-optimized development
pc.create_index(
    name="dev-environment",
    dimension=1536,
    metric="cosine",
    spec=ServerlessSpec(
        cloud="aws",
        region="us-east-1"
    )  # Serverless = pay only for what you use
)
```

## Vector Operations

### Upserting Vectors (Pinecone)

```python
from pinecone import Pinecone
import asyncio
import uuid

pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("production-search")

# 1. Single vector upsert
vector_id = str(uuid.uuid4())
index.upsert(
    vectors=[
        {
            "id": vector_id,
            "values": [0.1, 0.2, 0.3, ...],  # 1536 dimensions
            "metadata": {
                "title": "Introduction to Vector Databases",
                "category": "education",
                "author": "John Doe",
                "created_at": "2024-01-15",
                "tags": ["ml", "ai", "databases"]
            }
        }
    ],
    namespace="documents"
)

# 2. Batch upsert (efficient for large datasets)
batch_size = 100
vectors = []

for i, (doc_id, embedding, metadata) in enumerate(documents):
    vectors.append({
        "id": doc_id,
        "values": embedding,
        "metadata": metadata
    })

    # Upsert in batches
    if len(vectors) >= batch_size or i == len(documents) - 1:
        index.upsert(vectors=vectors, namespace="documents")
        print(f"Upserted batch of {len(vectors)} vectors")
        vectors = []

# 3. Upsert with async_req for better performance
async def upsert_vectors_async(vectors_batch):
    """Async upsert for parallel processing."""
    # async_req=True makes upsert return immediately instead of blocking
    index.upsert(vectors=vectors_batch, namespace="documents", async_req=True)

# Parallel upsert
tasks = [upsert_vectors_async(batch) for batch in batches]
await asyncio.gather(*tasks)
```

### Sparse Vector Operations (Pinecone)

Sparse vectors are useful for keyword-based search (like BM25) and can be combined with dense vectors for hybrid search.

```python
from pinecone import Pinecone

pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("hybrid-search-index")

# Upsert a vector with both dense and sparse components
index.upsert(
    vectors=[
        {
            "id": "doc1",
            "values": [0.1, 0.2, ..., 0.5],  # Dense vector
            "sparse_values": {
                "indices": [10, 45, 123, 234, 678],  # Token IDs
                "values": [0.8, 0.6, 0.9, 0.7, 0.5]  # TF-IDF weights
            },
            "metadata": {"title": "Hybrid Search Document"}
        }
    ],
    namespace="hybrid"
)

# Query with hybrid search
results = index.query(
    vector=[0.1, 0.2, ..., 0.5],  # Dense query vector
    sparse_vector={
        "indices": [10, 45, 123],
        "values": [0.8, 0.7, 0.9]
    },
    top_k=10,
    namespace="hybrid",
    include_metadata=True
)
```

### Vector Operations (Weaviate)

```python
import weaviate

client = weaviate.connect_to_local()
documents = client.collections.get("Documents")

# 1. Insert single object
doc_uuid = documents.data.insert(
    properties={
        "title": "Vector Database Guide",
        "content": "A comprehensive guide to vector databases...",
        "category": "tutorial",
        "created_at": "2024-01-15T10:00:00Z",
        "tags": ["database", "ml", "ai"]
    }
)
print(f"Inserted: {doc_uuid}")

# 2. Batch insert
with documents.batch.dynamic() as batch:
    for doc in document_list:
        batch.add_object(
            properties={
                "title": doc["title"],
                "content": doc["content"],
                "category": doc["category"],
                "created_at": doc["created_at"],
                "tags": doc["tags"]
            }
        )

# 3. Insert with custom vector
documents.data.insert(
    properties={"title": "Custom Vector Doc", "content": "..."},
    vector=[0.1, 0.2, 0.3, ...]  # Your pre-computed vector
)

# 4. Update object
documents.data.update(
    uuid=doc_uuid,
    properties={"title": "Updated Title"}
)

# 5. Delete object
documents.data.delete_by_id(uuid=doc_uuid)
```

### Vector Operations (Chroma)

```python
import chromadb

client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_collection("documents")

# 1. Add documents with auto-embedding
collection.add(
    documents=[
        "This is document 1",
        "This is document 2",
        "This is document 3"
    ],
    metadatas=[
        {"category": "tech", "author": "Alice"},
        {"category": "science", "author": "Bob"},
        {"category": "tech", "author": "Charlie"}
    ],
    ids=["doc1", "doc2", "doc3"]
)

# 2. Add with custom embeddings
collection.add(
    embeddings=[
        [0.1, 0.2, 0.3, ...],
        [0.4, 0.5, 0.6, ...]
    ],
    metadatas=[
        {"title": "Doc 1"},
        {"title": "Doc 2"}
    ],
    ids=["custom1", "custom2"]
)

# 3. Update documents
collection.update(
    ids=["doc1"],
    documents=["Updated document content"],
    metadatas=[{"category": "tech", "updated": True}]
)

# 4. Delete documents
collection.delete(ids=["doc1", "doc2"])

# 5. Get documents by IDs
results = collection.get(
    ids=["doc1", "doc2"],
    include=["documents", "metadatas", "embeddings"]
)
```

## Similarity Search

### Basic Similarity Search (Pinecone)

```python
from pinecone import Pinecone
from openai import OpenAI

# Initialize clients
pc = Pinecone(api_key="PINECONE_API_KEY")
openai_client = OpenAI(api_key="OPENAI_API_KEY")

index = pc.Index("production-search")

# 1. Generate query embedding
query_text = "What are the benefits of vector databases?"
response = openai_client.embeddings.create(
    input=query_text,
    model="text-embedding-3-small"
)
query_embedding = response.data[0].embedding

# 2. Search for similar vectors
results = index.query(
    vector=query_embedding,
    top_k=10,
    namespace="documents",
    include_values=False,
    include_metadata=True
)

# 3. Process results
print(f"Found {len(results.matches)} results")
for match in results.matches:
    print(f"ID: {match.id}")
    print(f"Score: {match.score:.4f}")
    print(f"Title: {match.metadata.get('title')}")
    print(f"Category: {match.metadata.get('category')}")
    print("---")
```

### Search by ID (Query by Example)

```python
# Search using an existing vector as the query
results = index.query(
    id="existing-doc-id",  # Use this document as the query
    top_k=10,
    namespace="documents",
    include_metadata=True
)

# Useful for "find similar items" features
print(f"Documents similar to {results.matches[0].metadata.get('title')}:")
for match in results.matches[1:]:  # Skip first (self)
    print(f"- {match.metadata.get('title')} (score: {match.score:.4f})")
```

### Multi-vector Search (Pinecone)

```python
# Search multiple query vectors in one request
query_embeddings = [
    [0.1, 0.2, ...],  # Query 1
    [0.3, 0.4, ...],  # Query 2
    [0.5, 0.6, ...]   # Query 3
]

results = index.query(
    queries=query_embeddings,
    top_k=5,
    namespace="documents",
    include_metadata=True
)

# Process results for each query
for i, query_results in enumerate(results):
    print(f"\nResults for query {i + 1}:")
    for match in query_results.matches:
        print(f"- {match.metadata.get('title')} (score: {match.score:.4f})")
```

### Similarity Search (Weaviate)

```python
import weaviate
from weaviate.classes.query import MetadataQuery

client = weaviate.connect_to_local()
documents = client.collections.get("Documents")

# 1. Near text search (semantic)
response = documents.query.near_text(
    query="vector database performance optimization",
    limit=10,
    return_metadata=MetadataQuery(distance=True, certainty=True)
)

for obj in response.objects:
    print(f"Title: {obj.properties['title']}")
    print(f"Distance: {obj.metadata.distance:.4f}")
    print(f"Certainty: {obj.metadata.certainty:.4f}")
    print("---")

# 2. Near vector search (with custom embedding)
response = documents.query.near_vector(
    near_vector=[0.1, 0.2, 0.3, ...],
    limit=10
)

# 3. Near object search (find similar to an existing object)
response = documents.query.near_object(
    near_object="uuid-of-reference-object",
    limit=10
)
```

### Similarity Search (Chroma)

```python
import chromadb

client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_collection("documents")

# 1. Query with text (auto-embedding)
results = collection.query(
    query_texts=["What is machine learning?"],
    n_results=10,
    include=["documents", "metadatas", "distances"]
)

print(f"Found {len(results['ids'][0])} results")
for i, doc_id in enumerate(results['ids'][0]):
    print(f"ID: {doc_id}")
    print(f"Distance: {results['distances'][0][i]:.4f}")
    print(f"Document: {results['documents'][0][i][:100]}...")
    print(f"Metadata: {results['metadatas'][0][i]}")
    print("---")

# 2. Query with a custom embedding
results = collection.query(
    query_embeddings=[[0.1, 0.2, 0.3, ...]],
    n_results=10
)
```

## Metadata Filtering

### Pinecone Metadata Filters

```python
from pinecone import Pinecone

pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("production-search")

# 1. Equality filter
results = index.query(
    vector=query_embedding,
    top_k=10,
    filter={"category": {"$eq": "education"}},
    include_metadata=True
)

# 2. Inequality filter
results = index.query(
    vector=query_embedding,
    top_k=10,
    filter={"year": {"$ne": 2023}},
    include_metadata=True
)

# 3. Range filters
results = index.query(
    vector=query_embedding,
    top_k=10,
    filter={
        "$and": [
            {"year": {"$gte": 2020}},
            {"year": {"$lte": 2024}}
        ]
    },
    include_metadata=True
)

# 4. In/Not-in filters
results = index.query(
    vector=query_embedding,
    top_k=10,
    filter={
        "category": {"$in": ["education", "tutorial", "guide"]}
    },
    include_metadata=True
)

# 5. Existence check
results = index.query(
    vector=query_embedding,
    top_k=10,
    filter={"author": {"$exists": True}},
    include_metadata=True
)

# 6. Complex AND/OR queries
results = index.query(
    vector=query_embedding,
    top_k=10,
    filter={
        "$and": [
            {"category": {"$eq": "education"}},
            {
                "$or": [
                    {"year": {"$eq": 2024}},
                    {"featured": {"$eq": True}}
                ]
            },
            {"tags": {"$in": ["ml", "ai"]}}
        ]
    },
    include_metadata=True
)

# 7. Greater than / Less than
results = index.query(
    vector=query_embedding,
    top_k=10,
    filter={
        "view_count": {"$gt": 1000},
        "rating": {"$gte": 4.5}
    },
    include_metadata=True
)
```
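These operators compose like a small query language. A toy evaluator (illustrative only, not Pinecone's implementation, and covering only a subset of operators) shows the matching semantics against an in-memory metadata dict:

```python
def matches_filter(metadata: dict, flt: dict) -> bool:
    """Evaluate a Pinecone-style metadata filter against one record (toy version)."""
    for key, cond in flt.items():
        if key == "$and":
            if not all(matches_filter(metadata, c) for c in cond):
                return False
        elif key == "$or":
            if not any(matches_filter(metadata, c) for c in cond):
                return False
        elif isinstance(cond, dict):
            field = metadata.get(key)
            for op, val in cond.items():
                if op == "$eq" and field != val:
                    return False
                if op == "$ne" and field == val:
                    return False
                if op == "$in" and field not in val:
                    return False
                if op == "$gt" and not (field is not None and field > val):
                    return False
                if op == "$gte" and not (field is not None and field >= val):
                    return False
                if op == "$lte" and not (field is not None and field <= val):
                    return False
                if op == "$exists" and (key in metadata) != val:
                    return False
        else:  # bare value means equality
            if metadata.get(key) != cond:
                return False
    return True

doc = {"category": "education", "year": 2024, "featured": False}
flt = {"$and": [
    {"category": {"$eq": "education"}},
    {"$or": [{"year": {"$eq": 2024}}, {"featured": {"$eq": True}}]},
]}
print(matches_filter(doc, flt))  # True
```

The real database applies the same predicate logic, but during (or before) the approximate-nearest-neighbor traversal rather than as a post-filter over all records.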

### Production Metadata Filter Patterns

```python
# Pattern 1: Time-based filtering (recent content)
from datetime import datetime, timedelta

def search_recent_documents(query_text: str, days: int = 30):
    """Search only documents from the last N days."""
    cutoff_date = (datetime.now() - timedelta(days=days)).isoformat()

    results = index.query(
        vector=generate_embedding(query_text),
        top_k=10,
        filter={
            "created_at": {"$gte": cutoff_date}
        },
        include_metadata=True
    )
    return results

# Pattern 2: User permission filtering
def search_with_permissions(query_text: str, user_id: str, user_roles: list):
    """Search only documents the user has access to."""
    results = index.query(
        vector=generate_embedding(query_text),
        top_k=10,
        filter={
            "$or": [
                {"owner_id": {"$eq": user_id}},
                {"shared_with": {"$in": [user_id]}},
                {"public": {"$eq": True}},
                {"required_roles": {"$in": user_roles}}
            ]
        },
        include_metadata=True
    )
    return results

# Pattern 3: Multi-tenant filtering
def search_tenant_documents(query_text: str, tenant_id: str, category: str = None):
    """Search within a specific tenant's data."""
    filter_dict = {"tenant_id": {"$eq": tenant_id}}

    if category:
        filter_dict["category"] = {"$eq": category}

    results = index.query(
        vector=generate_embedding(query_text),
        top_k=10,
        filter=filter_dict,
        include_metadata=True
    )
    return results

# Pattern 4: Faceted search
def faceted_search(query_text: str, facets: dict):
    """Search with multiple facet filters."""
    filter_conditions = []

    for field, values in facets.items():
        if isinstance(values, list):
            filter_conditions.append({field: {"$in": values}})
        else:
            filter_conditions.append({field: {"$eq": values}})

    results = index.query(
        vector=generate_embedding(query_text),
        top_k=10,
        filter={"$and": filter_conditions} if filter_conditions else {},
        include_metadata=True
    )
    return results

# Usage
results = faceted_search(
    "machine learning tutorials",
    facets={
        "category": ["education", "tutorial"],
        "difficulty": "beginner",
        "language": ["english", "spanish"]
    }
)
```

### Weaviate Metadata Filtering

```python
import weaviate
from weaviate.classes.query import Filter

client = weaviate.connect_to_local()
documents = client.collections.get("Documents")

# 1. Simple equality filter
response = documents.query.near_text(
    query="vector databases",
    limit=10,
    filters=Filter.by_property("category").equal("education")
)

# 2. Greater-than filter
response = documents.query.near_text(
    query="machine learning",
    limit=10,
    filters=Filter.by_property("year").greater_than(2020)
)

# 3. Contains-any filter
response = documents.query.near_text(
    query="AI tutorials",
    limit=10,
    filters=Filter.by_property("tags").contains_any(["ml", "ai", "deep-learning"])
)

# 4. Complex AND/OR filters
response = documents.query.near_text(
    query="database optimization",
    limit=10,
    filters=(
        Filter.by_property("category").equal("tutorial")
        & (Filter.by_property("difficulty").equal("beginner")
           | Filter.by_property("featured").equal(True))
    )
)
```

### Chroma Metadata Filtering

```python
import chromadb

client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_collection("documents")

# 1. Simple equality filter
results = collection.query(
    query_texts=["vector databases"],
    n_results=10,
    where={"category": "education"}
)

# 2. AND conditions
results = collection.query(
    query_texts=["machine learning"],
    n_results=10,
    where={
        "$and": [
            {"category": "education"},
            {"difficulty": "beginner"}
        ]
    }
)

# 3. OR conditions
results = collection.query(
    query_texts=["AI tutorials"],
    n_results=10,
    where={
        "$or": [
            {"category": "education"},
            {"category": "tutorial"}
        ]
    }
)

# 4. Greater than / Less than
results = collection.query(
    query_texts=["recent content"],
    n_results=10,
    where={"year": {"$gte": 2023}}
)

# 5. In operator
results = collection.query(
    query_texts=["programming guides"],
    n_results=10,
    where={"language": {"$in": ["python", "javascript", "go"]}}
)
```

## Hybrid Search

### Pinecone Hybrid Search (Dense + Sparse)

```python
from pinecone import Pinecone
from typing import Dict
import re
from collections import Counter

pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("hybrid-search-index")

def create_sparse_vector(text: str, top_k: int = 100) -> Dict:
    """Create a sparse vector using a simple TF approach."""
    # Tokenize
    tokens = re.findall(r'\w+', text.lower())

    # Calculate term frequencies
    tf = Counter(tokens)

    # Create vocabulary mapping
    vocab = {word: hash(word) % 10000 for word in set(tokens)}

    # Get top-k terms
    top_terms = tf.most_common(top_k)

    # Create sparse vector
    indices = [vocab[term] for term, _ in top_terms]
    values = [float(freq) / len(tokens) for _, freq in top_terms]

    return {
        "indices": indices,
        "values": values
    }

def hybrid_search(query_text: str, top_k: int = 10, alpha: float = 0.5):
    """
    Perform hybrid search combining dense and sparse vectors.
    alpha: weight for dense search (0.0 = sparse only, 1.0 = dense only)
    """
    # Generate dense vector
    dense_vector = generate_embedding(query_text)

    # Generate sparse vector
    sparse_vector = create_sparse_vector(query_text)

    # Apply alpha weighting: scale dense by alpha, sparse by (1 - alpha)
    dense_vector = [v * alpha for v in dense_vector]
    sparse_vector["values"] = [v * (1 - alpha) for v in sparse_vector["values"]]

    # Hybrid query
    results = index.query(
        vector=dense_vector,
        sparse_vector=sparse_vector,
        top_k=top_k,
        include_metadata=True
    )

    return results

# Example usage
results = hybrid_search("machine learning vector databases", top_k=10)
for match in results.matches:
    print(f"{match.metadata['title']}: {match.score:.4f}")
```

### Weaviate Hybrid Search

```python
import weaviate
from weaviate.classes.query import Filter

client = weaviate.connect_to_local()
documents = client.collections.get("Documents")

# Hybrid search (combines dense vector + BM25 keyword search)
response = documents.query.hybrid(
    query="vector database performance",
    limit=10,
    alpha=0.5,  # 0 = pure keyword, 1 = pure vector, 0.5 = balanced
    fusion_type="rankedFusion"  # or "relativeScore"
)

for obj in response.objects:
    print(f"Title: {obj.properties['title']}")
    print(f"Score: {obj.metadata.score}")
    print("---")

# Hybrid search with filters
response = documents.query.hybrid(
    query="machine learning tutorials",
    limit=10,
    alpha=0.7,  # Favor semantic search
    filters=Filter.by_property("category").equal("education")
)

# Hybrid search with a custom vector
response = documents.query.hybrid(
    query="custom query",
    vector=[0.1, 0.2, 0.3, ...],  # Your pre-computed vector
    limit=10,
    alpha=0.5
)
```
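Ranked fusion combines the keyword and vector result lists by rank position rather than raw score. Reciprocal rank fusion is one common rank-based scheme; this toy sketch illustrates the idea (it is not Weaviate's exact formula):

```python
def reciprocal_rank_fusion(result_lists: list[list[str]], k: int = 60) -> list[tuple[str, float]]:
    """Fuse several ranked ID lists: each appearance contributes 1 / (k + rank)."""
    scores: dict[str, float] = {}
    for results in result_lists:
        for rank, doc_id in enumerate(results, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda x: x[1], reverse=True)

keyword_hits = ["doc3", "doc1", "doc7"]  # e.g. a BM25 ranking
vector_hits = ["doc1", "doc5", "doc3"]   # e.g. a semantic ranking
fused = reciprocal_rank_fusion([keyword_hits, vector_hits])
print([doc_id for doc_id, _ in fused])  # doc1 and doc3 rise to the top
```

Because only ranks matter, this kind of fusion is insensitive to the incompatible score scales of BM25 and cosine similarity, which is exactly the problem score-based fusion has to normalize away.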

### BM25 + Vector Hybrid (Custom Implementation)

```python
from rank_bm25 import BM25Okapi
from typing import Dict, List

class HybridSearchEngine:
    """Custom hybrid search combining BM25 and vector search."""

    def __init__(self, index, documents: List[Dict]):
        self.index = index
        self.documents = documents

        # Build BM25 index
        tokenized_docs = [doc['content'].lower().split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized_docs)
        self.doc_ids = [doc['id'] for doc in documents]

    def search(self, query: str, top_k: int = 10, alpha: float = 0.5):
        """
        Hybrid search with custom score fusion.
        alpha: weight for vector search (1 - alpha for BM25)
        """
        # 1. Vector search
        query_embedding = generate_embedding(query)
        vector_results = self.index.query(
            vector=query_embedding,
            top_k=top_k * 2,  # Get more candidates
            include_metadata=True
        )

        # 2. BM25 search
        tokenized_query = query.lower().split()
        bm25_scores = self.bm25.get_scores(tokenized_query)

        # 3. Normalize scores
        vector_scores = {m.id: m.score for m in vector_results.matches}
        max_vec_score = max(vector_scores.values()) if vector_scores else 1.0
        max_bm25_score = max(bm25_scores) if max(bm25_scores) > 0 else 1.0

        # 4. Combine scores
        hybrid_scores = {}
        all_ids = set(vector_scores.keys()) | set(self.doc_ids)

        for doc_id in all_ids:
            vec_score = vector_scores.get(doc_id, 0) / max_vec_score
            idx = self.doc_ids.index(doc_id) if doc_id in self.doc_ids else -1
            bm25_score = bm25_scores[idx] / max_bm25_score if idx >= 0 else 0

            hybrid_scores[doc_id] = (alpha * vec_score) + ((1 - alpha) * bm25_score)

        # 5. Rank and return top-k
        ranked = sorted(hybrid_scores.items(), key=lambda x: x[1], reverse=True)
        return ranked[:top_k]

# Usage
engine = HybridSearchEngine(index, documents)
results = engine.search("machine learning databases", top_k=10, alpha=0.7)
```

## Namespace & Collection Management

### Pinecone Namespaces

```python
from pinecone import Pinecone

pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("production-search")

# 1. Upsert to a specific namespace
index.upsert(
    vectors=[
        {"id": "doc1", "values": [...], "metadata": {...}}
    ],
    namespace="production"
)

# 2. Query a specific namespace
results = index.query(
    vector=[...],
    top_k=10,
    namespace="production",
    include_metadata=True
)

# 3. Get namespace statistics
stats = index.describe_index_stats()
for namespace, info in stats.namespaces.items():
    print(f"Namespace: {namespace}")
    print(f"  Vector count: {info.vector_count}")

# 4. Delete all vectors in a namespace
index.delete(delete_all=True, namespace="test")

# 5. Multi-namespace architecture
NAMESPACES = {
    "production": "Live user-facing data",
    "staging": "Testing before production",
    "development": "Development and experiments",
    "archive": "Historical data"
}

def upsert_with_environment(vectors, environment="production"):
    """Upsert to the appropriate namespace."""
    namespace = environment if environment in NAMESPACES else "development"
    index.upsert(vectors=vectors, namespace=namespace)

def search_across_namespaces(query_vector, namespaces=["production", "archive"]):
    """Search multiple namespaces and combine results."""
    all_results = []

    for ns in namespaces:
        results = index.query(
            vector=query_vector,
            top_k=10,
            namespace=ns,
            include_metadata=True
        )
        for match in results.matches:
            match.metadata["source_namespace"] = ns
            all_results.append(match)

    # Sort by score
    all_results.sort(key=lambda x: x.score, reverse=True)
    return all_results[:10]
```

### Weaviate Collections

```python
import weaviate
from weaviate.classes.config import Configure

client = weaviate.connect_to_local()

# 1. Create multiple collections
collections_config = [
    {
        "name": "Products",
        "properties": ["name", "description", "category", "price"]
    },
    {
        "name": "Users",
        "properties": ["username", "bio", "interests"]
    },
    {
        "name": "Reviews",
        "properties": ["content", "rating", "product_id", "user_id"]
    }
]

for config in collections_config:
    try:
        client.collections.create(
            name=config["name"],
            vectorizer_config=Configure.Vectorizer.text2vec_openai()
        )
    except Exception as e:
        print(f"Collection {config['name']} exists: {e}")

# 2. Cross-collection references
client.collections.create(
    name="Orders",
    references=[
        weaviate.classes.config.ReferenceProperty(
            name="hasProduct",
            target_collection="Products"
        ),
        weaviate.classes.config.ReferenceProperty(
            name="byUser",
            target_collection="Users"
        )
    ]
)

# 3. Multi-collection search
def search_all_collections(query: str):
    """Search across multiple collections."""
    results = {}

    for collection_name in ["Products", "Users", "Reviews"]:
        collection = client.collections.get(collection_name)
        response = collection.query.near_text(
            query=query,
            limit=5
        )
        results[collection_name] = response.objects

    return results

# 4. Delete collection
client.collections.delete("TestCollection")
```

### Chroma Collections

```python
import chromadb

client = chromadb.PersistentClient(path="./chroma_db")

# 1. Create multiple collections
collections = {
    "documents": {
        "metadata": {"description": "Document embeddings"},
        "embedding_function": None  # Use default
    },
    "images": {
        "metadata": {"description": "Image embeddings"},
        "embedding_function": None
    },
    "code": {
        "metadata": {"description": "Code snippets"},
        "embedding_function": None
    }
}

for name, config in collections.items():
    collection = client.get_or_create_collection(
        name=name,
        metadata=config["metadata"]
    )

# 2. List all collections
all_collections = client.list_collections()
for coll in all_collections:
    print(f"Collection: {coll.name}")
    print(f"  Count: {coll.count()}")
    print(f"  Metadata: {coll.metadata}")

# 3. Collection-specific operations
docs_collection = client.get_collection("documents")
docs_collection.add(
    documents=["Document text..."],
    metadatas=[{"type": "article"}],
    ids=["doc1"]
)

# 4. Delete collection
client.delete_collection("test_collection")

# 5. Multi-collection search
def search_all_collections(query: str, n_results: int = 5):
    """Search across all collections."""
    results = {}

    for collection in client.list_collections():
        try:
            collection_results = collection.query(
                query_texts=[query],
                n_results=n_results
            )
            results[collection.name] = collection_results
        except Exception as e:
            print(f"Error searching {collection.name}: {e}")

    return results
```

Performance & Scaling Batch Operations Best Practices from pinecone import Pinecone from typing import List, Dict import asyncio from concurrent.futures import ThreadPoolExecutor import time

pc = Pinecone(api_key="YOUR_API_KEY") index = pc.Index("production-search")

1. Optimal batch size

OPTIMAL_BATCH_SIZE = 100 # Pinecone recommendation

def batch_upsert(vectors: List[Dict], batch_size: int = OPTIMAL_BATCH_SIZE):
    """Efficiently upsert vectors in batches."""
    total_batches = (len(vectors) + batch_size - 1) // batch_size

    for i in range(0, len(vectors), batch_size):
        batch = vectors[i:i + batch_size]
        index.upsert(vectors=batch, namespace="documents")

        if (i // batch_size + 1) % 10 == 0:
            print(f"Processed {i // batch_size + 1}/{total_batches} batches")

2. Parallel batch upsert

def parallel_batch_upsert(vectors: List[Dict], num_workers: int = 4):
    """Parallel upsert using a thread pool."""
    batch_size = 100
    batches = [
        vectors[i:i + batch_size]
        for i in range(0, len(vectors), batch_size)
    ]

    def upsert_batch(batch):
        try:
            index.upsert(vectors=batch, namespace="documents")
            return len(batch)
        except Exception as e:
            print(f"Error upserting batch: {e}")
            return 0

    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        results = list(executor.map(upsert_batch, batches))

    print(f"Successfully upserted {sum(results)} vectors")

3. Rate limiting for API calls

class RateLimiter:
    """Simple rate limiter for API calls."""

    def __init__(self, max_calls: int, time_window: float):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = []

    def wait_if_needed(self):
        """Wait if the rate limit would be exceeded."""
        now = time.time()
        # Drop calls that fall outside the sliding time window
        self.calls = [call_time for call_time in self.calls
                      if now - call_time < self.time_window]

        if len(self.calls) >= self.max_calls:
            sleep_time = self.time_window - (now - self.calls[0])
            if sleep_time > 0:
                time.sleep(sleep_time)
                self.calls = []

        self.calls.append(now)

Usage

rate_limiter = RateLimiter(max_calls=100, time_window=60) # 100 calls/minute

for batch in batches:
    rate_limiter.wait_if_needed()
    index.upsert(vectors=batch)

4. Bulk delete optimization

def bulk_delete_by_filter(filter_dict: Dict, namespace: str = "documents"):
    """Delete vectors matching a metadata filter.

    Note: pod-based indexes also support index.delete(filter=...) directly;
    this query-then-delete pattern works on serverless indexes as well.
    """
    # First, get IDs matching the filter
    results = index.query(
        vector=[0] * 1536,  # Dummy vector; match your index dimension
        top_k=10000,        # Maximum allowed
        filter=filter_dict,
        namespace=namespace,
        include_values=False
    )

    ids_to_delete = [match.id for match in results.matches]

    # Delete in batches
    batch_size = 1000
    for i in range(0, len(ids_to_delete), batch_size):
        batch = ids_to_delete[i:i + batch_size]
        index.delete(ids=batch, namespace=namespace)
        print(f"Deleted {len(batch)} vectors")

Query Optimization

1. Minimize data transfer

results = index.query(
    vector=query_vector,
    top_k=10,
    include_values=False,    # Don't return vectors if not needed
    include_metadata=False,  # Don't return metadata if not needed
    namespace="documents"
)

2. Use appropriate top_k

Smaller top_k = faster queries

results_small = index.query(vector=query_vector, top_k=10)    # Fast
results_large = index.query(vector=query_vector, top_k=1000)  # Slower

3. Filter before vector search when possible

Good: Reduces search space

results = index.query(
    vector=query_vector,
    top_k=10,
    filter={"category": "education"},  # Reduces candidates
    namespace="documents"
)

4. Batch queries when possible

More efficient than individual queries

# Note: multi-vector `queries` batching is only available in some client
# versions; with newer SDKs, issue queries concurrently instead
queries = [embedding1, embedding2, embedding3]
results = index.query(
    queries=queries,
    top_k=10,
    namespace="documents"
)

5. Cache frequent queries

import hashlib
import json

def vector_hash(vector: List[float]) -> str:
    """Create a hash of a vector for caching."""
    return hashlib.md5(json.dumps(vector).encode()).hexdigest()

class CachedIndex:
    """Wrapper with query caching."""

    def __init__(self, index, cache_size: int = 1000):
        self.index = index
        self.cache = {}
        self.cache_size = cache_size

    def query(self, vector: List[float], top_k: int = 10, **kwargs):
        """Query with caching."""
        # sort_keys keeps the cache key stable across kwarg ordering
        cache_key = f"{vector_hash(vector)}_{top_k}_{json.dumps(kwargs, sort_keys=True)}"

        if cache_key in self.cache:
            return self.cache[cache_key]

        results = self.index.query(vector=vector, top_k=top_k, **kwargs)

        if len(self.cache) >= self.cache_size:
            # Evict the oldest entry (insertion order, i.e. FIFO)
            self.cache.pop(next(iter(self.cache)))

        self.cache[cache_key] = results
        return results

Usage

cached_index = CachedIndex(index)
results = cached_index.query(query_vector, top_k=10)  # Cached on subsequent calls

Scaling Strategies

1. Index sizing for scale

def calculate_index_requirements(
    num_vectors: int,
    dimension: int,
    metadata_size_per_vector: int = 1024  # bytes
) -> Dict:
    """Estimate storage and cost for an index."""
    # Approximate calculations
    vector_size = dimension * 4  # 4 bytes per float32
    total_vector_storage = num_vectors * vector_size
    total_metadata_storage = num_vectors * metadata_size_per_vector
    total_storage = total_vector_storage + total_metadata_storage

    # Pinecone pricing (approximate; check current pricing)
    storage_cost_per_gb_month = 0.095  # Serverless pricing
    total_gb = total_storage / (1024 ** 3)
    monthly_storage_cost = total_gb * storage_cost_per_gb_month

    return {
        "num_vectors": num_vectors,
        "total_storage_gb": round(total_gb, 2),
        "monthly_storage_cost_usd": round(monthly_storage_cost, 2),
        "recommended_pod_type": "s1.x1" if num_vectors > 10_000_000 else "p1.x1"
    }

Example

reqs = calculate_index_requirements(
    num_vectors=10_000_000,
    dimension=1536
)
print(f"10M vectors storage: {reqs['total_storage_gb']} GB")
print(f"Monthly cost: ${reqs['monthly_storage_cost_usd']}")

2. Sharding strategy for massive scale

def create_sharded_indexes(
    base_name: str,
    num_shards: int,
    dimension: int,
    metric: str = "cosine"
):
    """Create multiple indexes for horizontal scaling."""
    indexes = []

    for shard_id in range(num_shards):
        index_name = f"{base_name}-shard-{shard_id}"

        pc.create_index(
            name=index_name,
            dimension=dimension,
            metric=metric,
            spec=ServerlessSpec(cloud="aws", region="us-east-1")
        )

        indexes.append(index_name)

    return indexes

def route_to_shard(vector_id: str, num_shards: int) -> int:
    """Determine which shard a vector belongs to.

    Uses a stable hash: Python's built-in hash() is randomized per
    process, so it would route the same ID to different shards
    across runs.
    """
    import hashlib
    digest = hashlib.md5(vector_id.encode()).hexdigest()
    return int(digest, 16) % num_shards

def query_sharded_indexes(query_vector: List[float], indexes: List, top_k: int = 10):
    """Query all shards and merge results."""
    all_results = []

    for index_name in indexes:
        idx = pc.Index(index_name)
        results = idx.query(
            vector=query_vector,
            top_k=top_k,
            include_metadata=True
        )
        all_results.extend(results.matches)

    # Sort by score and return top_k
    all_results.sort(key=lambda x: x.score, reverse=True)
    return all_results[:top_k]
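Sorting the concatenated list works, but the merge step can be done without a full sort using `heapq.nlargest` over plain `(score, id)` tuples. A minimal sketch — `merge_shard_hits` and the sample shard results are illustrative stand-ins for the `results.matches` objects above:

```python
import heapq
from typing import List, Tuple

def merge_shard_hits(
    shard_hits: List[List[Tuple[float, str]]],
    top_k: int = 10,
) -> List[Tuple[float, str]]:
    """Merge per-shard (score, id) hits into a global top_k by score."""
    # nlargest avoids fully sorting the concatenated candidate list
    return heapq.nlargest(top_k, (hit for hits in shard_hits for hit in hits))

# Hypothetical per-shard results, each already ordered by descending score
shard_a = [(0.92, "doc-3"), (0.81, "doc-7")]
shard_b = [(0.88, "doc-1"), (0.75, "doc-9")]

merged = merge_shard_hits([shard_a, shard_b], top_k=3)
```

For a handful of shards the difference is negligible, but with many shards and a large per-shard `top_k` the partial selection keeps merge cost proportional to the result size rather than the candidate count.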

Production Best Practices

Error Handling & Retries

import time
from typing import Any, Optional, Callable
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class PineconeRetryHandler:
    """Robust error handling for Pinecone operations."""

    def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
        self.max_retries = max_retries
        self.base_delay = base_delay

    def retry_with_backoff(
        self,
        operation: Callable,
        *args,
        **kwargs
    ) -> Optional[Any]:
        """Retry an operation with exponential backoff."""
        for attempt in range(self.max_retries):
            try:
                return operation(*args, **kwargs)
            except Exception as e:
                if attempt == self.max_retries - 1:
                    logger.error(f"Operation failed after {self.max_retries} attempts: {e}")
                    raise

                delay = self.base_delay * (2 ** attempt)
                logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
                time.sleep(delay)

        return None

Usage

retry_handler = PineconeRetryHandler(max_retries=3)

Upsert with retry

def safe_upsert(vectors, namespace="documents"):
    return retry_handler.retry_with_backoff(
        index.upsert,
        vectors=vectors,
        namespace=namespace
    )

Query with retry

def safe_query(vector, top_k=10, **kwargs):
    return retry_handler.retry_with_backoff(
        index.query,
        vector=vector,
        top_k=top_k,
        **kwargs
    )

Example

try:
    results = safe_query(query_vector, top_k=10, include_metadata=True)
except Exception as e:
    logger.error(f"Query failed permanently: {e}")

Monitoring & Observability

import time
from dataclasses import dataclass
from typing import Callable, Dict, List
from datetime import datetime

@dataclass
class QueryMetrics:
    """Track query performance metrics."""
    query_time: float
    result_count: int
    top_score: float
    timestamp: datetime
    namespace: str
    filter_used: bool

class VectorDBMonitor:
    """Monitor vector database operations."""

    def __init__(self):
        self.metrics: List[QueryMetrics] = []

    def track_query(
        self,
        query_func: Callable,
        *args,
        **kwargs
    ):
        """Track query execution and metrics."""
        start_time = time.time()
        results = query_func(*args, **kwargs)
        elapsed = time.time() - start_time

        metrics = QueryMetrics(
            query_time=elapsed,
            result_count=len(results.matches),
            top_score=results.matches[0].score if results.matches else 0.0,
            timestamp=datetime.now(),
            namespace=kwargs.get('namespace', 'default'),
            filter_used='filter' in kwargs
        )

        self.metrics.append(metrics)

        # Alert on slow queries
        if elapsed > 1.0:  # 1 second threshold
            logger.warning(f"Slow query detected: {elapsed:.2f}s")

        return results

    def get_stats(self) -> Dict:
        """Get aggregate statistics."""
        if not self.metrics:
            return {}

        query_times = sorted(m.query_time for m in self.metrics)

        return {
            "total_queries": len(self.metrics),
            "avg_query_time": sum(query_times) / len(query_times),
            "p95_query_time": query_times[int(len(query_times) * 0.95)],
            "p99_query_time": query_times[int(len(query_times) * 0.99)],
            "avg_results": sum(m.result_count for m in self.metrics) / len(self.metrics),
            "filtered_queries_pct": sum(1 for m in self.metrics if m.filter_used) / len(self.metrics) * 100
        }

Usage

monitor = VectorDBMonitor()

Wrap queries

results = monitor.track_query(
    index.query,
    vector=query_vector,
    top_k=10,
    namespace="documents",
    filter={"category": "education"}
)

Get statistics

stats = monitor.get_stats()
print(f"Average query time: {stats['avg_query_time']:.3f}s")
print(f"P95 query time: {stats['p95_query_time']:.3f}s")
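As a cross-check on the manual percentile indexing in `get_stats`, the standard library's `statistics.quantiles` computes the same cut points without index arithmetic. A small sketch — the `p95` helper and the sample timings are illustrative, not part of the monitor:

```python
import statistics
from typing import List

def p95(values: List[float]) -> float:
    """95th percentile via statistics.quantiles (99 cut points for n=100)."""
    if not values:
        return 0.0
    if len(values) == 1:
        return values[0]
    return statistics.quantiles(values, n=100)[94]

# Hypothetical query timings in seconds
timings = [0.05, 0.07, 0.06, 0.90, 0.05, 0.06]
slowest = p95(timings)
```

`quantiles` interpolates between samples (exclusive method by default), so for small samples it can differ slightly from the nearest-rank indexing used above; for production dashboards either convention is fine as long as it is applied consistently.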

Data Validation

from typing import List, Dict
import numpy as np

class VectorValidator:
    """Validate vectors and metadata before operations."""

    def __init__(self, expected_dimension: int):
        self.expected_dimension = expected_dimension

    def validate_vector(self, vector: List[float]) -> tuple[bool, str]:
        """Validate vector format and content."""
        # Check type
        if not isinstance(vector, (list, np.ndarray)):
            return False, "Vector must be list or numpy array"

        # Check dimension
        if len(vector) != self.expected_dimension:
            return False, f"Expected {self.expected_dimension} dimensions, got {len(vector)}"

        # Check for NaN or Inf
        if any(not np.isfinite(v) for v in vector):
            return False, "Vector contains NaN or Inf values"

        # Check for zero vector
        if all(v == 0 for v in vector):
            return False, "Zero vector not allowed"

        return True, "Valid"

    def validate_metadata(self, metadata: Dict) -> tuple[bool, str]:
        """Validate metadata format."""
        # Check type
        if not isinstance(metadata, dict):
            return False, "Metadata must be dictionary"

        # Check metadata size (Pinecone limit: 40KB); str() is a rough estimate
        metadata_str = str(metadata)
        if len(metadata_str.encode('utf-8')) > 40_000:
            return False, "Metadata exceeds 40KB limit"

        # Check for required fields (customize as needed)
        required_fields = ["title", "category"]
        for field in required_fields:
            if field not in metadata:
                return False, f"Missing required field: {field}"

        return True, "Valid"

    def validate_batch(
        self,
        vectors: List[Dict]
    ) -> tuple[List[Dict], List[str]]:
        """Validate a batch of vectors; return valid items and errors."""
        valid_vectors = []
        errors = []

        for i, item in enumerate(vectors):
            # Validate vector
            is_valid, error = self.validate_vector(item.get('values', []))
            if not is_valid:
                errors.append(f"Vector {i} ({item.get('id', 'unknown')}): {error}")
                continue

            # Validate metadata
            if 'metadata' in item:
                is_valid, error = self.validate_metadata(item['metadata'])
                if not is_valid:
                    errors.append(f"Metadata {i} ({item.get('id', 'unknown')}): {error}")
                    continue

            valid_vectors.append(item)

        return valid_vectors, errors

Usage

validator = VectorValidator(expected_dimension=1536)

Validate single vector

is_valid, error = validator.validate_vector(embedding)
if not is_valid:
    print(f"Invalid vector: {error}")

Validate batch

valid_vectors, errors = validator.validate_batch(vectors_to_upsert)
if errors:
    for error in errors:
        logger.error(error)

Upsert only valid vectors

if valid_vectors:
    index.upsert(vectors=valid_vectors)

Backup & Disaster Recovery

import json
import gzip
from datetime import datetime
from pathlib import Path

class VectorDBBackup:
    """Backup and restore vector database data."""

    def __init__(self, index, backup_dir: str = "./backups"):
        self.index = index
        self.backup_dir = Path(backup_dir)
        self.backup_dir.mkdir(exist_ok=True)

    def backup_namespace(
        self,
        namespace: str = "documents",
        compress: bool = True
    ) -> str:
        """Backup all vectors in a namespace."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"backup_{namespace}_{timestamp}.json"

        if compress:
            filename += ".gz"

        filepath = self.backup_dir / filename

        # Fetch all vectors (in batches)
        all_vectors = []
        batch_size = 100

        # Simplified example: Pinecone has no "fetch everything" call, so in
        # practice you need to track vector IDs separately and fetch them in
        # batches to populate all_vectors
        stats = self.index.describe_index_stats()

        # Save to file
        data = {
            "namespace": namespace,
            "timestamp": timestamp,
            "vector_count": len(all_vectors),
            "vectors": all_vectors
        }

        if compress:
            with gzip.open(filepath, 'wt', encoding='utf-8') as f:
                json.dump(data, f)
        else:
            with open(filepath, 'w') as f:
                json.dump(data, f, indent=2)

        logger.info(f"Backed up {len(all_vectors)} vectors to {filepath}")
        return str(filepath)

    def restore_from_backup(
        self,
        backup_file: str,
        target_namespace: str = None
    ):
        """Restore vectors from a backup file."""
        filepath = Path(backup_file)

        # Load backup
        if filepath.suffix == '.gz':
            with gzip.open(filepath, 'rt', encoding='utf-8') as f:
                data = json.load(f)
        else:
            with open(filepath, 'r') as f:
                data = json.load(f)

        namespace = target_namespace or data['namespace']
        vectors = data['vectors']

        # Restore in batches
        batch_size = 100
        for i in range(0, len(vectors), batch_size):
            batch = vectors[i:i + batch_size]
            self.index.upsert(vectors=batch, namespace=namespace)
            logger.info(f"Restored {len(batch)} vectors")

        logger.info(f"Restored {len(vectors)} vectors to namespace '{namespace}'")

Usage

backup_manager = VectorDBBackup(index)

Backup

backup_file = backup_manager.backup_namespace("production")

Restore

backup_manager.restore_from_backup(backup_file, target_namespace="production-restored")

Cost Optimization

Storage Optimization

1. Reduce metadata size

Bad: Storing full content in metadata

bad_metadata = {
    "title": "Long document title",
    "full_content": "......",  # Wastes space
    "description": "......",
    "extra_field_1": "...",
    "extra_field_2": "..."
}

Good: Store only necessary metadata

good_metadata = {
    "title": "Long document title",
    "doc_id": "doc-123",  # Reference to an external store
    "category": "education",
    "created_at": "2024-01-15"
}

2. Use selective metadata indexing

Only index fields you'll filter on

from pinecone import PodSpec

pc.create_index(
    name="optimized-index",
    dimension=1536,
    metric="cosine",
    spec=PodSpec(
        environment="us-east-1-aws",
        pod_type="p1.x1",
        # Selective metadata indexing is a pod-index feature:
        # only the fields listed here are filterable; everything
        # else (titles, descriptions) is stored for display only
        metadata_config={"indexed": ["category", "created_at"]}
    )
)

3. Regular cleanup of unused vectors

def cleanup_old_vectors(days_old: int = 90):
    """Delete vectors older than the specified number of days."""
    from datetime import datetime, timedelta

    # Pinecone range operators ($lt, $gt) compare numbers, so store
    # created_at as a Unix timestamp rather than an ISO string
    cutoff_ts = (datetime.now() - timedelta(days=days_old)).timestamp()

    # Delete by metadata filter
    index.delete(
        filter={"created_at": {"$lt": cutoff_ts}},
        namespace="documents"
    )

4. Compress dimensions for smaller models

text-embedding-3-small: 1536 dimensions

all-MiniLM-L6-v2: 384 dimensions (75% storage reduction)

Trade-off: slightly lower accuracy for significant cost savings
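OpenAI's text-embedding-3 models also accept a `dimensions` parameter that shortens embeddings server-side; the equivalent client-side operation is to truncate and re-normalize. A sketch under that assumption — the `shorten` helper and the sample vector are illustrative:

```python
import math
from typing import List

def shorten(embedding: List[float], dim: int) -> List[float]:
    """Truncate an embedding to `dim` dimensions and re-normalize to
    unit length so cosine/dot-product scores stay comparable."""
    truncated = embedding[:dim]
    norm = math.sqrt(sum(v * v for v in truncated))
    if norm == 0:
        raise ValueError("Truncated embedding has zero norm")
    return [v / norm for v in truncated]

# Illustrative 8-dim "embedding" cut down to 4 dims
vec = [0.5, 0.5, 0.5, 0.5, 0.1, 0.1, 0.1, 0.1]
short = shorten(vec, 4)
```

Note that naive truncation only preserves quality for models trained with this in mind (Matryoshka-style embeddings); for other models, switch to a natively smaller model instead.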

Query Cost Optimization

1. Batch queries instead of individual

Bad: Multiple individual queries

for query in queries:
    results = index.query(vector=query, top_k=10)  # N API calls

Good: Single batch query

results = index.query(
    queries=query_vectors,  # 1 API call
    top_k=10
)

2. Use appropriate top_k

Larger top_k = more expensive

results = index.query(
    vector=query_vector,
    top_k=10  # Usually sufficient; top_k=1000 is much more expensive
)

3. Minimize data transfer

results = index.query(
    vector=query_vector,
    top_k=10,
    include_values=False,   # Save bandwidth
    include_metadata=False  # Save bandwidth if not needed
)

4. Use caching for repeated queries

from functools import lru_cache

@lru_cache(maxsize=1000)
def cached_search(query_text: str, top_k: int = 10):
    """Cache search results for identical queries."""
    embedding = generate_embedding(query_text)
    results = index.query(
        vector=embedding,
        top_k=top_k,
        include_metadata=True
    )
    return results

5. Choose serverless vs pods appropriately

Serverless: Low/variable traffic (pay per query)

Pods: High consistent traffic (fixed cost)

def choose_deployment_type(
    queries_per_month: int,
    avg_response_time_requirement: float = 100  # ms
) -> str:
    """Recommend a deployment type based on usage."""
    # Rough cost calculations (update with current pricing)
    serverless_cost_per_query = 0.0001  # Example
    pod_cost_per_month = 70  # p1.x1 pod

    serverless_monthly_cost = queries_per_month * serverless_cost_per_query

    if serverless_monthly_cost < pod_cost_per_month:
        return "serverless"
    else:
        return "pods"
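With the example figures above ($0.0001 per query against a $70/month pod), the break-even point is simply the ratio of the two costs — a quick sketch using those illustrative numbers:

```python
def break_even_queries(pod_cost_per_month: float, cost_per_query: float) -> int:
    """Monthly query volume at which a fixed-cost pod becomes cheaper
    than pay-per-query serverless."""
    return round(pod_cost_per_month / cost_per_query)

# With the example figures: 70 / 0.0001 = 700,000 queries/month
threshold = break_even_queries(70, 0.0001)
```

Below the threshold, serverless wins on cost; above it, a dedicated pod does. Real pricing also depends on storage and write volume, so treat this as a first-order estimate.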

Cost Monitoring

from typing import Dict
from collections import defaultdict

class CostMonitor:
    """Monitor and estimate vector database costs."""

    def __init__(self):
        self.operations = defaultdict(int)
        self.pricing = {
            "serverless_write_units": 0.0000025,  # per write unit
            "serverless_read_units": 0.00000625,  # per read unit
            "serverless_storage_gb": 0.095,       # per GB per month
            "p1_x1_pod": 0.096,  # per hour
            "p2_x1_pod": 0.240,  # per hour
        }

    def track_operation(self, operation_type: str, units: int = 1):
        """Track database operations."""
        self.operations[operation_type] += units

    def estimate_monthly_cost(
        self,
        deployment_type: str,
        storage_gb: float = 0,
        pod_type: str = None
    ) -> Dict:
        """Estimate monthly costs."""
        costs = {}

        if deployment_type == "serverless":
            # Storage cost
            storage_cost = storage_gb * self.pricing["serverless_storage_gb"]

            # Operation costs
            write_cost = (
                self.operations["upsert"] *
                self.pricing["serverless_write_units"]
            )
            read_cost = (
                self.operations["query"] *
                self.pricing["serverless_read_units"]
            )

            costs = {
                "storage": storage_cost,
                "writes": write_cost,
                "reads": read_cost,
                "total": storage_cost + write_cost + read_cost
            }

        elif deployment_type == "pods":
            # Fixed pod cost
            hours_per_month = 730
            pod_cost = self.pricing.get(f"{pod_type}_pod", 0) * hours_per_month

            costs = {
                "pod": pod_cost,
                "total": pod_cost
            }

        return costs

    def get_cost_report(self) -> str:
        """Generate a cost report."""
        report = f"\n{'=' * 50}\n"
        report += "VECTOR DATABASE COST REPORT\n"
        report += f"{'=' * 50}\n\n"
        report += "Operations Summary:\n"

        for operation, count in self.operations.items():
            report += f"  {operation}: {count:,}\n"

        report += f"\n{'=' * 50}\n"
        return report

Usage

cost_monitor = CostMonitor()

Track operations

def monitored_upsert(vectors, **kwargs):
    cost_monitor.track_operation("upsert", len(vectors))
    return index.upsert(vectors=vectors, **kwargs)

def monitored_query(vector, **kwargs):
    cost_monitor.track_operation("query", 1)
    return index.query(vector=vector, **kwargs)

Get cost estimate

monthly_cost = cost_monitor.estimate_monthly_cost(
    deployment_type="serverless",
    storage_gb=10.5
)

print(f"Estimated monthly cost: ${monthly_cost['total']:.2f}")
print(cost_monitor.get_cost_report())

Summary

This guide covers vector database management across Pinecone, Weaviate, and Chroma. Key takeaways:

Choose the right database: Pinecone for production scale, Weaviate for knowledge graphs, Chroma for local development
Optimize embeddings: balance dimension size against accuracy and cost
Use metadata filtering: combine vector similarity with structured filtering for more precise search
Implement hybrid search: combine dense and sparse vectors for the best results
Scale efficiently: use batching, caching, and appropriate index configurations
Monitor and optimize costs: track usage and choose the right deployment type

For more information:

Pinecone Documentation: https://docs.pinecone.io
Weaviate Documentation: https://weaviate.io/developers/weaviate
Chroma Documentation: https://docs.trychroma.com
