Azure Cosmos DB Skill
Load with: base.md + [typescript.md | python.md]
Azure Cosmos DB is a globally distributed, multi-model database with guaranteed low latency, elastic scalability, and multiple consistency models.
Sources: Cosmos DB Docs | Partitioning | SDK
Core Principle
Choose partition key wisely, design for your access patterns, understand consistency tradeoffs.
Cosmos DB distributes data across partitions. Your partition key choice determines scalability, performance, and cost. Design for even distribution and query efficiency.
Cosmos DB APIs

| API | Use Case |
| --- | --- |
| NoSQL (Core) | Document database, most flexible |
| MongoDB | MongoDB wire protocol compatible |
| PostgreSQL | Distributed PostgreSQL (Citus) |
| Apache Cassandra | Wide-column store |
| Apache Gremlin | Graph database |
| Table | Key-value (Azure Table Storage compatible) |
This skill focuses on NoSQL (Core) API - the most common choice.
Key Concepts

| Concept | Description |
| --- | --- |
| Container | Collection of items (like a table) |
| Item | Single document/record (JSON) |
| Partition Key | Determines data distribution |
| Logical Partition | Items with the same partition key |
| Physical Partition | Storage unit (max 50 GB, 10K RU/s) |
| RU (Request Unit) | Throughput currency |

Partition Key Design

Good Partition Keys

    // High cardinality, even distribution, used in queries

    // E-commerce: userId for user data
    { "id": "order-123", "userId": "user-456", ... }  // PK: /userId

    // Multi-tenant: tenantId
    { "id": "doc-1", "tenantId": "tenant-abc", ... }  // PK: /tenantId

    // IoT: deviceId for telemetry
    { "id": "reading-1", "deviceId": "device-789", ... }  // PK: /deviceId

    // Logs: synthetic key (date + category)
    { "id": "log-1", "partitionKey": "2024-01-15_errors", ... }  // PK: /partitionKey
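The synthetic key in the log example can be built at write time. The following is a minimal sketch, assuming the key format `YYYY-MM-DD_category` shown above; the helper name `syntheticPartitionKey` and the `message` field are hypothetical and not part of any SDK.

```typescript
// Hypothetical helper: builds a synthetic partition key of the form "YYYY-MM-DD_category".
function syntheticPartitionKey(timestamp: Date, category: string): string {
  // toISOString() always reports UTC, e.g. "2024-01-15T10:30:00.000Z",
  // so the first 10 characters are the UTC calendar day.
  const day = timestamp.toISOString().slice(0, 10);
  return `${day}_${category}`;
}

// Example log item whose partition key groups entries by day and category.
const logItem = {
  id: 'log-1',
  partitionKey: syntheticPartitionKey(new Date('2024-01-15T10:30:00Z'), 'errors'),
  message: 'Example log entry',
};
```

Combining the date with a category spreads one day's writes over several logical partitions, while a query that knows both values can still target a single partition.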
Hierarchical Partition Keys

    // For multi-level distribution (e.g., tenant → user)
    // Container created with: /tenantId, /userId
    {
      "id": "order-123",
      "tenantId": "acme-corp",
      "userId": "user-456",
      "items": [...]
    }
    // Query within tenant and user efficiently
Bad Partition Keys

    // Avoid:
    // - Low cardinality (status, type, boolean)
    // - Monotonically increasing (timestamp, auto-increment)
    // - Frequently updated fields
    // - Fields not used in queries

    // Bad: a status field with only 3 values → hot partitions
    // Bad: a timestamp key → all writes go to the latest partition
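One rough way to compare candidate keys is to measure how much of a sample dataset would land in the largest logical partition. The sketch below is illustrative only: `largestPartitionShare` is a hypothetical helper and the sample values are made up.

```typescript
// Hypothetical helper: fraction of items that fall into the most populated logical partition.
// Values near 1 indicate a hot partition; values near 1/distinctKeys indicate even distribution.
function largestPartitionShare(keys: string[]): number {
  if (keys.length === 0) return 0;
  const counts = new Map<string, number>();
  for (const key of keys) {
    counts.set(key, (counts.get(key) || 0) + 1);
  }
  let largest = 0;
  counts.forEach((count) => {
    if (count > largest) largest = count;
  });
  return largest / keys.length;
}

// Low-cardinality key: most items share one value.
const byStatus = largestPartitionShare(['pending', 'pending', 'pending', 'paid']);
// High-cardinality key: every item has its own value.
const byUserId = largestPartitionShare(['u1', 'u2', 'u3', 'u4']);
```

Running this over an export of real items for each candidate field gives a quick signal of skew before the container is created.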
SDK Setup (TypeScript)

Install

    npm install @azure/cosmos

Initialize Client

    // lib/cosmosdb.ts
    import { CosmosClient, Database, Container } from '@azure/cosmos';

    const endpoint = process.env.COSMOS_ENDPOINT!;
    const key = process.env.COSMOS_KEY!;
    const databaseId = process.env.COSMOS_DATABASE!;

    const client = new CosmosClient({ endpoint, key });

    // Or with connection string
    // const client = new CosmosClient(process.env.COSMOS_CONNECTION_STRING!);

    export const database: Database = client.database(databaseId);

    export function getContainer(containerId: string): Container {
      return database.container(containerId);
    }
Type Definitions

    // types/cosmos.ts
    export interface BaseItem {
      id: string;
      _ts?: number;    // Auto-generated timestamp
      _etag?: string;  // For optimistic concurrency
    }

    export interface User extends BaseItem {
      userId: string;  // Partition key
      email: string;
      name: string;
      createdAt: string;
      updatedAt: string;
    }

    export interface Order extends BaseItem {
      userId: string;  // Partition key
      orderId: string;
      items: OrderItem[];
      total: number;
      status: 'pending' | 'paid' | 'shipped' | 'delivered';
      createdAt: string;
    }

    export interface OrderItem {
      productId: string;
      name: string;
      quantity: number;
      price: number;
    }
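CRUD Operations

Create Item

    import { getContainer } from './cosmosdb';
    import { User } from './types';

    const usersContainer = getContainer('users');

    async function createUser(data: Omit<User, 'id' | 'createdAt' | 'updatedAt'>): Promise<User> {
      const now = new Date().toISOString();
      // Use the partition key value as the item id so the user can be point-read by userId alone
      const user: User = { ...data, id: data.userId, createdAt: now, updatedAt: now };

      const { resource } = await usersContainer.items.create(user);
      return resource as User;
    }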
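Read Item (Point Read)

    // Most efficient read - requires id AND partition key
    async function getUser(userId: string, id: string): Promise<User | undefined> {
      // resource is undefined when the item does not exist
      const { resource } = await usersContainer.item(id, userId).read<User>();
      return resource;
    }

    // If id equals partition key value
    async function getUserById(userId: string): Promise<User | undefined> {
      return getUser(userId, userId);
    }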
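Query Items

    // Query within partition (efficient)
    async function getUserOrders(userId: string): Promise<Order[]> {
      const ordersContainer = getContainer('orders');
      const { resources } = await ordersContainer.items
        .query<Order>(
          {
            query: 'SELECT * FROM c WHERE c.userId = @userId',
            parameters: [{ name: '@userId', value: userId }]
          },
          { partitionKey: userId }  // Scope the query to one logical partition
        )
        .fetchAll();
      return resources;
    }

    // Cross-partition query (use sparingly)
    async function getOrdersByStatus(status: string): Promise<Order[]> {
      const ordersContainer = getContainer('orders');
      const { resources } = await ordersContainer.items
        .query<Order>({
          query: 'SELECT * FROM c WHERE c.status = @status',
          parameters: [{ name: '@status', value: status }]
        })
        .fetchAll();
      return resources;
    }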
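    // Paginated query
    async function getOrdersPaginated(
      userId: string,
      pageSize: number = 10,
      continuationToken?: string
    ): Promise<{ items: Order[]; continuationToken?: string }> {
      const ordersContainer = getContainer('orders');

      const queryIterator = ordersContainer.items.query<Order>(
        {
          query: 'SELECT * FROM c WHERE c.userId = @userId',
          parameters: [{ name: '@userId', value: userId }]
        },
        { partitionKey: userId, maxItemCount: pageSize, continuationToken }
      );

      // fetchNext() returns one page plus the token for the following page
      const { resources, continuationToken: nextToken } = await queryIterator.fetchNext();

      return { items: resources, continuationToken: nextToken };
    }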
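Update Item

    // Replace entire item
    async function updateUser(userId: string, id: string, updates: Partial<User>): Promise<User> {
      const { resource: existing } = await usersContainer.item(id, userId).read<User>();
      if (!existing) {
        throw new Error('User not found');
      }

      const updated: User = { ...existing, ...updates, updatedAt: new Date().toISOString() };

      const { resource } = await usersContainer.item(id, userId).replace(updated);
      return resource as User;
    }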
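    // Partial update (patch operations)
    import { PatchOperation } from '@azure/cosmos';

    async function patchUser(userId: string, id: string, operations: PatchOperation[]): Promise<User> {
      const { resource } = await usersContainer.item(id, userId).patch<User>(operations);
      return resource as User;
    }

    // Usage:
    await patchUser('user-123', 'user-123', [
      { op: 'set', path: '/name', value: 'New Name' },
      { op: 'set', path: '/updatedAt', value: new Date().toISOString() },
      { op: 'incr', path: '/loginCount', value: 1 }
    ]);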
Delete Item

    async function deleteUser(userId: string, id: string): Promise<void> {
      await usersContainer.item(id, userId).delete();
    }
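Optimistic Concurrency (ETags)

    async function updateUserWithETag(
      userId: string,
      id: string,
      updates: Partial<User>,
      etag: string  // The _etag value from the caller's last read of this item
    ): Promise<User> {
      const { resource: existing } = await usersContainer.item(id, userId).read<User>();
      if (!existing) {
        throw new Error('User not found');
      }

      const updated: User = { ...existing, ...updates, updatedAt: new Date().toISOString() };

      try {
        // The replace succeeds only if the stored _etag still matches
        const { resource } = await usersContainer.item(id, userId).replace(updated, {
          accessCondition: { type: 'IfMatch', condition: etag }
        });
        return resource as User;
      } catch (error: any) {
        if (error.code === 412) {
          throw new Error('Document was modified by another process');
        }
        throw error;
      }
    }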
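The 412 behaviour can be illustrated without a database. The sketch below uses an in-memory stand-in, not the Cosmos DB SDK; `FakeStore`, `replaceIfMatch`, and the `v1`/`v2` etag format are all hypothetical and exist only to show why the second of two writers holding the same etag is rejected.

```typescript
type Doc = { id: string; name: string; _etag: string };

// In-memory stand-in for a container. This is NOT the Cosmos DB SDK.
class FakeStore {
  private docs = new Map<string, Doc>();
  private version = 0;

  // Writes the document and assigns a fresh etag, as the service does on every write.
  upsert(id: string, name: string): Doc {
    this.version += 1;
    const doc: Doc = { id, name, _etag: `v${this.version}` };
    this.docs.set(id, doc);
    return doc;
  }

  // Mimics a replace with an IfMatch condition: succeeds only when the etag is still current.
  replaceIfMatch(id: string, name: string, etag: string): { statusCode: number; doc?: Doc } {
    const current = this.docs.get(id);
    if (!current) return { statusCode: 404 };
    if (current._etag !== etag) return { statusCode: 412 };
    return { statusCode: 200, doc: this.upsert(id, name) };
  }
}

const store = new FakeStore();
const original = store.upsert('user-1', 'Ada');

// Two writers both read the same version, then try to write.
const writerA = store.replaceIfMatch('user-1', 'Ada L.', original._etag); // etag is current
const writerB = store.replaceIfMatch('user-1', 'Ada B.', original._etag); // etag is now stale
```

The usual response to a 412 is to re-read the item, reapply the change to the fresh copy, and retry the conditional replace.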
Consistency Levels

| Level | Guarantees | Latency | Use Case |
| --- | --- | --- | --- |
| Strong | Linearizable reads | Highest | Financial, inventory |
| Bounded Staleness | Consistent within bounds | High | Leaderboards, counters |
| Session | Read your writes | Medium | User sessions (default) |
| Consistent Prefix | Ordered reads | Low | Social feeds |
| Eventual | No ordering guarantee | Lowest | Analytics, logs |
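Set Consistency Per Request

    // Override the default consistency for a single read.
    // A request can only relax the account default (e.g. Session → Eventual), never strengthen it.
    const { resource } = await usersContainer.item(id, userId).read<User>({
      consistencyLevel: 'Eventual'
    });

    // For queries
    const { resources } = await container.items.query(
      { query: 'SELECT * FROM c' },
      { consistencyLevel: 'ConsistentPrefix' }
    ).fetchAll();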
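A per-request override can only weaken the account's default consistency; asking for a stronger level than the account default is not honoured. The sketch below encodes that rule as a pure function. `LEVELS`, `Level`, and `canOverride` are hypothetical names for illustration, not SDK exports.

```typescript
// Consistency levels ordered from strongest to weakest, matching the table above.
const LEVELS = ['Strong', 'BoundedStaleness', 'Session', 'ConsistentPrefix', 'Eventual'] as const;
type Level = typeof LEVELS[number];

// Hypothetical check: a request may use the account default or any weaker level.
function canOverride(accountDefault: Level, requested: Level): boolean {
  return LEVELS.indexOf(requested) >= LEVELS.indexOf(accountDefault);
}

const relaxOk = canOverride('Session', 'Eventual');                // weaker than the default
const strengthenBad = canOverride('Session', 'BoundedStaleness');  // stronger than the default
```

A check like this is useful in a data-access layer that accepts a consistency option from callers, so a misconfigured request fails early instead of silently running at the account default.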
Batch Operations

Transactional Batch (Same Partition)

    async function createOrderWithItems(userId: string, order: Order, items: any[]): Promise<void> {
      const ordersContainer = getContainer('orders');

      // Every operation in a transactional batch must target the same partition key value
      const operations = [
        { operationType: 'Create' as const, resourceBody: order },
        ...items.map(item => ({
          operationType: 'Create' as const,
          resourceBody: { ...item, userId, orderId: order.orderId }
        }))
      ];

      const { result } = await ordersContainer.items.batch(operations, userId);

      // The batch is all-or-nothing: if any operation failed, nothing was committed
      if (!result || result.some(r => r.statusCode >= 400)) {
        throw new Error('Batch operation failed');
      }
    }
Bulk Operations

    // For large-scale imports (not transactional)
    async function bulkImportUsers(users: User[]): Promise<void> {
      const operations = users.map(user => ({
        operationType: 'Create' as const,
        partitionKey: user.userId,
        resourceBody: user
      }));

      // Process in chunks
      const chunkSize = 100;
      for (let i = 0; i < operations.length; i += chunkSize) {
        const chunk = operations.slice(i, i + chunkSize);
        await usersContainer.items.bulk(chunk);
      }
    }
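The chunking loop above can be factored into a small reusable helper. This is a minimal sketch; `chunk` is a hypothetical utility, not part of `@azure/cosmos`, and the chunk size of 100 is simply the value used in the example above.

```typescript
// Hypothetical utility: splits an array into consecutive slices of at most `size` elements.
function chunk<T>(items: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const result: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    result.push(items.slice(i, i + size));
  }
  return result;
}

// 250 operations split into chunks of 100 yields slices of 100, 100, and 50.
const sizes = chunk(new Array(250).fill(0), 100).map((part) => part.length);
```

Keeping the chunk size as a parameter makes it easy to lower it if bulk requests start being throttled.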
Change Feed

Process Changes

    import { ChangeFeedStartFrom } from '@azure/cosmos';

    const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

    async function processChangeFeed(): Promise<void> {
      const container = getContainer('orders');

      // Pull-model iterator (available in @azure/cosmos v4)
      const changeFeedIterator = container.items.getChangeFeedIterator({
        changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
      });

      while (changeFeedIterator.hasMoreResults) {
        const { result: items, statusCode } = await changeFeedIterator.readNext();

        if (statusCode === 304) {
          // No new changes
          await sleep(1000);
          continue;
        }

        for (const item of items) {
          console.log('Changed item:', item);
          // Process the change...
        }
      }
    }
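Change Feed Processor Pattern

    // For production, use a change feed processor with a lease container so that
    // several workers can share the feed and resume from checkpoints.
    // NOTE: the builder chain below is pseudocode that illustrates the pattern.
    // These methods may not exist in @azure/cosmos; a change feed processor
    // is provided by the .NET and Java SDKs. Verify against your SDK version.
    async function startChangeFeedProcessor(): Promise<void> {
      const changeFeedProcessor = sourceContainer.items.changeFeed
        .for(item => {
          // Process each change
          console.log('Processing:', item);
        })
        .withLeaseContainer(leaseContainer)
        .build();

      await changeFeedProcessor.start();
    }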
Python SDK

Install

    pip install azure-cosmos

Setup and Operations

    # cosmos_db.py
    import os
    import uuid
    from datetime import datetime, timezone
    from typing import List, Optional

    from azure.cosmos import CosmosClient, PartitionKey
    from azure.cosmos.exceptions import CosmosResourceNotFoundError

    # Initialize client
    endpoint = os.environ['COSMOS_ENDPOINT']
    key = os.environ['COSMOS_KEY']
    database_name = os.environ['COSMOS_DATABASE']

    client = CosmosClient(endpoint, key)
    database = client.get_database_client(database_name)

    def get_container(container_name: str):
        return database.get_container_client(container_name)

    # CRUD Operations
    users_container = get_container('users')

    def create_user(email: str, name: str, user_id: Optional[str] = None) -> dict:
        user_id = user_id or str(uuid.uuid4())
        # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
        now = datetime.now(timezone.utc).isoformat()

        user = {
            'id': user_id,
            'userId': user_id,  # Partition key
            'email': email,
            'name': name,
            'createdAt': now,
            'updatedAt': now
        }

        return users_container.create_item(user)

    def get_user(user_id: str) -> Optional[dict]:
        try:
            return users_container.read_item(item=user_id, partition_key=user_id)
        except CosmosResourceNotFoundError:
            return None

    def query_users(email_domain: str) -> List[dict]:
        query = "SELECT * FROM c WHERE CONTAINS(c.email, @domain)"
        parameters = [{'name': '@domain', 'value': email_domain}]

        return list(users_container.query_items(
            query=query,
            parameters=parameters,
            enable_cross_partition_query=True
        ))

    def update_user(user_id: str, **updates) -> dict:
        user = get_user(user_id)
        if not user:
            raise ValueError('User not found')

        user.update(updates)
        user['updatedAt'] = datetime.now(timezone.utc).isoformat()

        return users_container.replace_item(item=user_id, body=user)

    def delete_user(user_id: str) -> None:
        users_container.delete_item(item=user_id, partition_key=user_id)
    # Paginated query
    def get_users_paginated(page_size: int = 10, continuation_token: Optional[str] = None):
        query = "SELECT * FROM c ORDER BY c.createdAt DESC"

        items = users_container.query_items(
            query=query,
            enable_cross_partition_query=True,
            max_item_count=page_size
        )

        # The continuation token is passed to by_page(), not to query_items()
        pages = items.by_page(continuation_token)
        results = list(next(pages))

        return {
            'items': results,
            'continuation_token': pages.continuation_token
        }
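Indexing

Custom Indexing Policy

An indexing policy must list the root path `/*` as either an included or an excluded path. The policy below indexes everything except the large `content` subtree, and adds a composite index for queries that filter on `userId` and sort by `createdAt`.

    {
      "indexingMode": "consistent",
      "automatic": true,
      "includedPaths": [
        { "path": "/*" }
      ],
      "excludedPaths": [
        { "path": "/content/*" },
        { "path": "/_etag/?" }
      ],
      "compositeIndexes": [
        [
          { "path": "/userId", "order": "ascending" },
          { "path": "/createdAt", "order": "descending" }
        ]
      ]
    }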
Create Container with Index

    await database.containers.createIfNotExists({
      id: 'orders',
      partitionKey: { paths: ['/userId'] },
      indexingPolicy: {
        indexingMode: 'consistent',
        includedPaths: [
          { path: '/userId/?' },
          { path: '/status/?' },
          { path: '/createdAt/?' }
        ],
        excludedPaths: [
          { path: '/*' }  // Exclude all by default
        ]
      }
    });
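Throughput Management

Provisioned Throughput

    // Container level
    await database.containers.createIfNotExists({
      id: 'orders',
      partitionKey: { paths: ['/userId'] },
      throughput: 1000  // RU/s
    });

    // Scale throughput: in the JS SDK, throughput is changed through the container's offer.
    // `client` is the CosmosClient created in lib/cosmosdb.ts.
    const container = database.container('orders');
    const { resource: offer } = await container.readOffer();
    if (offer && offer.content) {
      offer.content.offerThroughput = 2000;
      await client.offer(offer.id).replace(offer);
    }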
Autoscale

    await database.containers.createIfNotExists({
      id: 'orders',
      partitionKey: { paths: ['/userId'] },
      maxThroughput: 10000  // Autoscales between 10% and 100% of this value (1,000 to 10,000 RU/s)
    });
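The 10% to 100% rule makes the billing floor easy to compute from `maxThroughput`. The sketch below is illustrative; `autoscaleRange` is a hypothetical helper, and the validation assumes the service rule that the autoscale maximum starts at 1,000 RU/s and is set in increments of 1,000.

```typescript
// Hypothetical helper: RU/s range an autoscale container moves within for a given maximum.
function autoscaleRange(maxThroughput: number): { min: number; max: number } {
  // Assumption: autoscale maximums are multiples of 1,000 RU/s, with 1,000 as the lowest value.
  if (maxThroughput < 1000 || maxThroughput % 1000 !== 0) {
    throw new RangeError('maxThroughput must be a multiple of 1000 and at least 1000');
  }
  // The floor is 10% of the configured maximum.
  return { min: maxThroughput / 10, max: maxThroughput };
}

const range = autoscaleRange(10000);
```

For the container above, this gives a floor of 1,000 RU/s, which is the minimum throughput billed even when the container is idle.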
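Serverless

    // Serverless is chosen when the Cosmos DB account is created, not per container.
    // On a serverless account, containers are created without any throughput setting.
    // Pay per request (good for dev/test, intermittent workloads)
    await database.containers.createIfNotExists({
      id: 'orders',
      partitionKey: { paths: ['/userId'] }
      // No throughput option on a serverless account
    });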
CLI Quick Reference

    # Azure CLI
    az cosmosdb create --name myaccount --resource-group mygroup
    az cosmosdb sql database create --account-name myaccount --name mydb --resource-group mygroup
    az cosmosdb sql container create \
      --account-name myaccount \
      --resource-group mygroup \
      --database-name mydb \
      --name orders \
      --partition-key-path /userId \
      --throughput 400

    # Query
    # (This subcommand may not be available in your CLI version; check `az cosmosdb sql --help`.
    #  Queries can also be run from Data Explorer or an SDK.)
    az cosmosdb sql query --account-name myaccount --database-name mydb \
      --container-name orders --query "SELECT * FROM c"

    # Keys
    az cosmosdb keys list --name myaccount --resource-group mygroup
    az cosmosdb keys list --name myaccount --resource-group mygroup --type connection-strings
Cost Optimization

| Strategy | Impact |
| --- | --- |
| Right partition key | Avoid hot partitions (wasted RUs) |
| Index only what you query | Reduce write RU cost |
| Use point reads | 1 RU vs 3+ RU for queries |
| Serverless for dev/test | Pay per request |
| Autoscale for production | Scale down during low traffic |
| TTL for temporary data | Auto-delete old items |

Time-to-Live (TTL)

    // Enable TTL on container
    await database.containers.createIfNotExists({
      id: 'sessions',
      partitionKey: { paths: ['/userId'] },
      defaultTtl: 3600  // 1 hour
    });

    // Per-item TTL
    const session = {
      id: 'session-123',
      userId: 'user-456',
      ttl: 1800  // Override: 30 minutes
    };
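How the container default and the per-item `ttl` combine can be written as a small pure function. This is a sketch of the resolution rules as I understand them: an item `ttl` only takes effect when the container has a `defaultTtl`, and `-1` means never expire. `effectiveTtlSeconds` is a hypothetical helper, not an SDK function.

```typescript
// Hypothetical helper: resolves the TTL (in seconds) that applies to an item.
// Returns null when the item never expires.
function effectiveTtlSeconds(
  containerDefaultTtl: number | undefined,
  itemTtl: number | undefined
): number | null {
  // TTL is disabled entirely when the container has no defaultTtl; item values are ignored.
  if (containerDefaultTtl === undefined) return null;
  // An item-level ttl overrides the container default; -1 means "never expire".
  const ttl = itemTtl !== undefined ? itemTtl : containerDefaultTtl;
  return ttl === -1 ? null : ttl;
}

// Matches the example above: the container default is 3600, the item overrides it with 1800.
const sessionTtl = effectiveTtlSeconds(3600, 1800);
```

The expiry clock starts from the item's last modification time (`_ts`), so updating an item resets its countdown.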
Anti-Patterns

- Bad partition key - Low cardinality causes hot partitions
- Cross-partition queries - Expensive; design for single-partition queries
- Over-indexing - Increases write cost; index only queried paths
- Large items - Max 2 MB; store blobs in Azure Blob Storage
- Ignoring RU cost - Monitor and optimize expensive queries
- Strong consistency everywhere - Use Session (default) unless required
- No retry logic - Handle 429 (throttling) with exponential backoff
- Missing TTL - Set TTL for temporary/session data
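The retry advice in the last list can be sketched as a small wrapper. The SDK already retries throttled requests a limited number of times on its own, so a wrapper like this is mainly for the case where those built-in retries are exhausted. Everything here is illustrative: `backoffDelayMs` and `withRetry` are hypothetical helpers, the base and cap values are arbitrary, and the `code` and `retryAfterInMs` properties on the error are assumptions about the error shape.

```typescript
// Hypothetical helper: exponential backoff delay, capped at capMs.
function backoffDelayMs(attempt: number, baseMs = 100, capMs = 5000): number {
  return Math.min(capMs, baseMs * 2 ** attempt);
}

// Hypothetical wrapper: retries an operation only when it fails with status 429.
// The caller supplies `sleep`, e.g. (ms) => new Promise(resolve => setTimeout(resolve, ms)).
async function withRetry<T>(
  operation: () => Promise<T>,
  sleep: (ms: number) => Promise<void>,
  maxAttempts = 5
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await operation();
    } catch (error: any) {
      const throttled = error && error.code === 429;
      if (!throttled || attempt + 1 >= maxAttempts) throw error;
      // Assumption: the error may carry a server hint in retryAfterInMs; wait at least that long.
      const hinted = typeof error.retryAfterInMs === 'number' ? error.retryAfterInMs : 0;
      await sleep(Math.max(hinted, backoffDelayMs(attempt)));
    }
  }
}

// Delays for the first five attempts with the default base and cap.
const delays = [0, 1, 2, 3, 4].map((attempt) => backoffDelayMs(attempt));
```

A call site would wrap a single request, for example `withRetry(() => container.items.create(item), sleep)`. Adding random jitter to the delay is a common refinement when many clients are throttled at once.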