azure-cosmosdb

Installs: 70
Rank: #11018

Install

npx skills add https://github.com/alinaqi/claude-bootstrap --skill azure-cosmosdb

Azure Cosmos DB Skill

Load with: base.md + [typescript.md | python.md]

Azure Cosmos DB is a globally distributed, multi-model database with guaranteed low latency, elastic scalability, and multiple consistency models.

Sources: Cosmos DB Docs | Partitioning | SDK

Core Principle

Choose partition key wisely, design for your access patterns, understand consistency tradeoffs.

Cosmos DB distributes data across partitions. Your partition key choice determines scalability, performance, and cost. Design for even distribution and query efficiency.

Cosmos DB APIs

| API | Use Case |
|---|---|
| NoSQL (Core) | Document database, most flexible |
| MongoDB | MongoDB wire protocol compatible |
| PostgreSQL | Distributed PostgreSQL (Citus) |
| Apache Cassandra | Wide-column store |
| Apache Gremlin | Graph database |
| Table | Key-value (Azure Table Storage compatible) |

This skill focuses on NoSQL (Core) API - the most common choice.

Key Concepts

| Concept | Description |
|---|---|
| Container | Collection of items (like a table) |
| Item | Single document/record (JSON) |
| Partition Key | Determines data distribution |
| Logical Partition | Items with same partition key |
| Physical Partition | Storage unit (max 50GB, 10K RU/s) |
| RU (Request Unit) | Throughput currency |

Partition Key Design

Good Partition Keys

    // High cardinality, even distribution, used in queries

    // E-commerce: userId for user data
    { "id": "order-123", "userId": "user-456", ... }  // PK: /userId

    // Multi-tenant: tenantId
    { "id": "doc-1", "tenantId": "tenant-abc", ... }  // PK: /tenantId

    // IoT: deviceId for telemetry
    { "id": "reading-1", "deviceId": "device-789", ... }  // PK: /deviceId

    // Logs: synthetic key (date + category)
    { "id": "log-1", "partitionKey": "2024-01-15_errors", ... }  // PK: /partitionKey
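The synthetic key in the last example has to be computed by the writer before the item is stored. A minimal sketch of that step follows; `buildLogPartitionKey`, `makeLogItem`, and the `LogCategory` values are hypothetical names introduced here for illustration, not part of any SDK.

```typescript
// Hypothetical helper: builds a "date_category" partition key value
// such as "2024-01-15_errors" for a log item.
type LogCategory = 'errors' | 'warnings' | 'info';

function buildLogPartitionKey(timestamp: Date, category: LogCategory): string {
  // Use the UTC calendar date so every writer agrees on the bucket.
  const day = timestamp.toISOString().slice(0, 10); // "YYYY-MM-DD"
  return `${day}_${category}`;
}

interface LogItem {
  id: string;
  partitionKey: string;
  message: string;
}

function makeLogItem(id: string, message: string, category: LogCategory, at: Date): LogItem {
  return { id, partitionKey: buildLogPartitionKey(at, category), message };
}

const item = makeLogItem('log-1', 'disk full', 'errors', new Date('2024-01-15T08:30:00Z'));
console.log(item.partitionKey); // 2024-01-15_errors
```

Combining the date with a category keeps a single day's writes from all landing in one logical partition, at the cost of needing both values to address a partition when reading.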

Hierarchical Partition Keys

    // For multi-level distribution (e.g., tenant → user)
    // Container created with: /tenantId, /userId

    {
      "id": "order-123",
      "tenantId": "acme-corp",
      "userId": "user-456",
      "items": [...]
    }

    // Query within tenant and user efficiently

Bad Partition Keys

    // Avoid:
    // - Low cardinality (status, type, boolean)
    // - Monotonically increasing (timestamp, auto-increment)
    // - Frequently updated fields
    // - Fields not used in queries

    // Bad: Only 3 values → hot partitions

    // Bad: All writes go to latest partition
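One way to spot a low-cardinality key before committing to it is to measure how a sample of items would spread across candidate key values. The sketch below is an illustration only; `partitionSkew` and the sample data are hypothetical, and a real check would use a representative sample of production data.

```typescript
// Hypothetical helper: given sample items and a candidate partition key
// field, report how evenly the items spread across key values.
interface SkewReport {
  distinctKeys: number;
  largestShare: number; // fraction of items held by the most common key (0..1)
}

function partitionSkew(items: Array<Record<string, unknown>>, keyField: string): SkewReport {
  const counts: Record<string, number> = {};
  for (const item of items) {
    const key = String(item[keyField]);
    counts[key] = (counts[key] || 0) + 1;
  }
  const keys = Object.keys(counts);
  let largest = 0;
  for (const k of keys) {
    largest = Math.max(largest, counts[k]);
  }
  return {
    distinctKeys: keys.length,
    largestShare: items.length === 0 ? 0 : largest / items.length
  };
}

const orders = [
  { id: '1', userId: 'u1', status: 'pending' },
  { id: '2', userId: 'u2', status: 'pending' },
  { id: '3', userId: 'u3', status: 'pending' },
  { id: '4', userId: 'u4', status: 'paid' }
];

const byStatus = partitionSkew(orders, 'status');
const byUser = partitionSkew(orders, 'userId');
console.log(byStatus); // { distinctKeys: 2, largestShare: 0.75 }
console.log(byUser);   // { distinctKeys: 4, largestShare: 0.25 }
```

A key whose `largestShare` stays high as the sample grows is a candidate for hot partitions.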

SDK Setup (TypeScript)

Install

    npm install @azure/cosmos

Initialize Client

    // lib/cosmosdb.ts
    import { CosmosClient, Database, Container } from '@azure/cosmos';

    const endpoint = process.env.COSMOS_ENDPOINT!;
    const key = process.env.COSMOS_KEY!;
    const databaseId = process.env.COSMOS_DATABASE!;

    const client = new CosmosClient({ endpoint, key });

    // Or with connection string
    // const client = new CosmosClient(process.env.COSMOS_CONNECTION_STRING!);

    export const database: Database = client.database(databaseId);

    export function getContainer(containerId: string): Container {
      return database.container(containerId);
    }

Type Definitions

    // types/cosmos.ts
    export interface BaseItem {
      id: string;
      _ts?: number;    // Auto-generated timestamp
      _etag?: string;  // For optimistic concurrency
    }

    export interface User extends BaseItem {
      userId: string;  // Partition key
      email: string;
      name: string;
      createdAt: string;
      updatedAt: string;
    }

    export interface Order extends BaseItem {
      userId: string;  // Partition key
      orderId: string;
      items: OrderItem[];
      total: number;
      status: 'pending' | 'paid' | 'shipped' | 'delivered';
      createdAt: string;
    }

    export interface OrderItem {
      productId: string;
      name: string;
      quantity: number;
      price: number;
    }

CRUD Operations

Create Item

    import { getContainer } from './cosmosdb';
    import { User } from './types';

    const usersContainer = getContainer('users');

    // The caller supplies everything except the fields generated here
    async function createUser(
      data: Omit<User, 'id' | 'createdAt' | 'updatedAt'>
    ): Promise<User> {
      const now = new Date().toISOString();
      const user: User = {
        id: crypto.randomUUID(),
        ...data,
        createdAt: now,
        updatedAt: now
      };

      const { resource } = await usersContainer.items.create(user);
      return resource as User;
    }

Read Item (Point Read)

    // Most efficient read - requires id AND partition key
    async function getUser(userId: string, id: string): Promise<User | null> {
      try {
        const { resource } = await usersContainer.item(id, userId).read();
        return resource || null;
      } catch (error: any) {
        if (error.code === 404) return null;
        throw error;
      }
    }

    // If id equals partition key value
    async function getUserById(userId: string): Promise<User | null> {
      return getUser(userId, userId);
    }

Query Items

    // Query within partition (efficient)
    async function getUserOrders(userId: string): Promise<Order[]> {
      const ordersContainer = getContainer('orders');

      const { resources } = await ordersContainer.items
        .query({
          query: 'SELECT * FROM c WHERE c.userId = @userId ORDER BY c.createdAt DESC',
          parameters: [{ name: '@userId', value: userId }]
        })
        .fetchAll();

      return resources;
    }

    // Cross-partition query (use sparingly)
    async function getOrdersByStatus(status: string): Promise<Order[]> {
      const ordersContainer = getContainer('orders');

      const { resources } = await ordersContainer.items
        .query({
          query: 'SELECT * FROM c WHERE c.status = @status',
          parameters: [{ name: '@status', value: status }]
        })
        .fetchAll();

      return resources;
    }

    // Paginated query
    async function getOrdersPaginated(
      userId: string,
      pageSize: number = 10,
      continuationToken?: string
    ): Promise<{ items: Order[]; continuationToken?: string }> {
      const ordersContainer = getContainer('orders');

      const queryIterator = ordersContainer.items.query(
        {
          query: 'SELECT * FROM c WHERE c.userId = @userId ORDER BY c.createdAt DESC',
          parameters: [{ name: '@userId', value: userId }]
        },
        { maxItemCount: pageSize, continuationToken }
      );

      // Fetch a single page; pass the returned token back in to get the next one
      const { resources, continuationToken: nextToken } = await queryIterator.fetchNext();

      return { items: resources, continuationToken: nextToken };
    }
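The queries above pass values as named parameters instead of concatenating them into the query text. When the number of values varies (for example, several statuses at once), the parameter list has to be generated. The following is a sketch of one way to do that; `QuerySpec` is a local type that mirrors the `{ query, parameters }` objects used above, and `buildInQuery` is a hypothetical helper.

```typescript
// Local type mirroring the { query, parameters } objects used above.
interface QuerySpec {
  query: string;
  parameters: Array<{ name: string; value: string | number | boolean }>;
}

// Hypothetical helper: SELECT ... WHERE c.<field> IN (@p0, @p1, ...)
function buildInQuery(field: string, values: Array<string | number | boolean>): QuerySpec {
  // The field name is interpolated into the query text, so restrict it to a
  // plain identifier; only the values travel as parameters.
  if (!/^[A-Za-z_][A-Za-z0-9_]*$/.test(field)) {
    throw new Error(`Invalid field name: ${field}`);
  }
  if (values.length === 0) {
    throw new Error('At least one value is required');
  }
  const parameters = values.map((value, i) => ({ name: `@p${i}`, value }));
  const placeholders = parameters.map(p => p.name).join(', ');
  return {
    query: `SELECT * FROM c WHERE c.${field} IN (${placeholders})`,
    parameters
  };
}

const spec = buildInQuery('status', ['pending', 'paid']);
console.log(spec.query); // SELECT * FROM c WHERE c.status IN (@p0, @p1)
```

The resulting object can be passed to `items.query(...)` in place of the hand-written specs shown earlier.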

Update Item

    import { PatchOperation } from '@azure/cosmos';

    // Replace entire item
    async function updateUser(userId: string, id: string, updates: Partial<User>): Promise<User> {
      const existing = await getUser(userId, id);
      if (!existing) throw new Error('User not found');

      const updated: User = { ...existing, ...updates, updatedAt: new Date().toISOString() };

      const { resource } = await usersContainer.item(id, userId).replace(updated);
      return resource as User;
    }

    // Partial update (patch operations)
    async function patchUser(userId: string, id: string, operations: PatchOperation[]): Promise<User> {
      const { resource } = await usersContainer.item(id, userId).patch(operations);
      return resource as User;
    }

    // Usage:
    await patchUser('user-123', 'user-123', [
      { op: 'set', path: '/name', value: 'New Name' },
      { op: 'set', path: '/updatedAt', value: new Date().toISOString() },
      { op: 'incr', path: '/loginCount', value: 1 }
    ]);

Delete Item

    async function deleteUser(userId: string, id: string): Promise<void> {
      await usersContainer.item(id, userId).delete();
    }

Optimistic Concurrency (ETags)

    async function updateUserWithETag(
      userId: string,
      id: string,
      updates: Partial<User>,
      etag: string
    ): Promise<User> {
      const existing = await getUser(userId, id);
      if (!existing) throw new Error('User not found');

      const updated: User = { ...existing, ...updates, updatedAt: new Date().toISOString() };

      try {
        // The replace succeeds only if the stored _etag still equals `etag`
        const { resource } = await usersContainer.item(id, userId).replace(updated, {
          accessCondition: { type: 'IfMatch', condition: etag }
        });
        return resource as User;
      } catch (error: any) {
        if (error.code === 412) {
          throw new Error('Document was modified by another process');
        }
        throw error;
      }
    }
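To make the 412 case concrete, the sketch below simulates the ETag check with an in-memory store. It is not the `@azure/cosmos` API: `FakeContainer`, `Doc`, and the `v1`, `v2` ETag format are hypothetical stand-ins, assumed only so the conflict can be reproduced without a database.

```typescript
// In-memory stand-in for a container, used only to illustrate the ETag check.
interface Doc {
  id: string;
  name: string;
  _etag?: string;
}

class FakeContainer {
  private docs: Record<string, Doc> = {};
  private version = 0;

  // Every write assigns a fresh ETag.
  upsert(doc: Doc): Doc {
    this.version += 1;
    const stored: Doc = { ...doc, _etag: `v${this.version}` };
    this.docs[doc.id] = stored;
    return { ...stored };
  }

  read(id: string): Doc | undefined {
    const doc = this.docs[id];
    return doc ? { ...doc } : undefined;
  }

  // Replace succeeds only if the caller's ETag matches the stored one.
  replace(doc: Doc, ifMatch: string): Doc {
    const current = this.docs[doc.id];
    if (!current) throw { code: 404, message: 'Not found' };
    if (current._etag !== ifMatch) throw { code: 412, message: 'Precondition failed' };
    return this.upsert(doc);
  }
}

const store = new FakeContainer();
store.upsert({ id: 'user-1', name: 'Ada' });

// Two writers read the same version.
const a = store.read('user-1')!;
const b = store.read('user-1')!;

// Writer A wins; the stored ETag changes.
store.replace({ ...a, name: 'Ada L.' }, a._etag!);

// Writer B still holds the old ETag and is rejected.
let conflictCode = 0;
try {
  store.replace({ ...b, name: 'Ada B.' }, b._etag!);
} catch (error: any) {
  conflictCode = error.code;
}
console.log(conflictCode); // 412
```

On a 412 the usual options are to re-read the item and reapply the change, or to surface the conflict to the caller as `updateUserWithETag` does.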

Consistency Levels

| Level | Guarantees | Latency | Use Case |
|---|---|---|---|
| Strong | Linearizable reads | Highest | Financial, inventory |
| Bounded Staleness | Consistent within bounds | High | Leaderboards, counters |
| Session | Read your writes | Medium | User sessions (default) |
| Consistent Prefix | Ordered reads | Low | Social feeds |
| Eventual | No ordering guarantee | Lowest | Analytics, logs |

Set Consistency Per Request

    // Override the default consistency for one request.
    // A request can only relax the account's default level (e.g. Session -> Eventual);
    // it cannot ask for a stronger level than the account is configured with.
    const { resource } = await usersContainer.item(id, userId).read({
      consistencyLevel: 'Eventual'
    });

    // For queries
    const { resources } = await container.items.query(
      { query: 'SELECT * FROM c' },
      { consistencyLevel: 'ConsistentPrefix' }
    ).fetchAll();

Batch Operations

Transactional Batch (Same Partition)

    async function createOrderWithItems(userId: string, order: Order, items: any[]): Promise<void> {
      const ordersContainer = getContainer('orders');

      const operations = [
        { operationType: 'Create' as const, resourceBody: order },
        ...items.map(item => ({
          operationType: 'Create' as const,
          resourceBody: { ...item, userId, orderId: order.orderId }
        }))
      ];

      // All operations target the same partition key value and commit or fail together
      const { result } = await ordersContainer.items.batch(operations, userId);

      // Check if any operation failed
      if (!result || result.some(r => r.statusCode >= 400)) {
        throw new Error('Batch operation failed');
      }
    }
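Because a transactional batch is scoped to one logical partition, it can help to validate the items before sending them. The sketch below assumes a limit of 100 operations per batch, which is the documented figure as far as I know; treat it as an assumption and confirm it against the current service limits. `checkBatch` and `BatchCheck` are hypothetical names.

```typescript
// Assumed limit for transactional batch; verify against current service limits.
const MAX_BATCH_OPERATIONS = 100;

interface BatchCheck {
  ok: boolean;
  reason?: string;
}

// Hypothetical pre-flight check: every item must carry the same partition key
// value, and the batch must not exceed the operation limit.
function checkBatch(
  items: Array<Record<string, unknown>>,
  partitionKeyField: string,
  partitionKeyValue: string
): BatchCheck {
  if (items.length === 0) {
    return { ok: false, reason: 'empty batch' };
  }
  if (items.length > MAX_BATCH_OPERATIONS) {
    return { ok: false, reason: `more than ${MAX_BATCH_OPERATIONS} operations` };
  }
  for (const item of items) {
    if (item[partitionKeyField] !== partitionKeyValue) {
      return { ok: false, reason: `item ${String(item['id'])} is in another partition` };
    }
  }
  return { ok: true };
}

const good = checkBatch(
  [{ id: 'o1', userId: 'user-456' }, { id: 'o2', userId: 'user-456' }],
  'userId',
  'user-456'
);
const mixed = checkBatch(
  [{ id: 'o1', userId: 'user-456' }, { id: 'o2', userId: 'user-789' }],
  'userId',
  'user-456'
);
console.log(good);  // { ok: true }
console.log(mixed); // { ok: false, reason: 'item o2 is in another partition' }
```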

Bulk Operations

    // For large-scale imports (not transactional)
    async function bulkImportUsers(users: User[]): Promise<void> {
      const operations = users.map(user => ({
        operationType: 'Create' as const,
        resourceBody: user,
        partitionKey: user.userId
      }));

      // Process in chunks
      const chunkSize = 100;
      for (let i = 0; i < operations.length; i += chunkSize) {
        const chunk = operations.slice(i, i + chunkSize);
        await usersContainer.items.bulk(chunk);
      }
    }
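The chunking loop above can be factored into a reusable helper, and since bulk operations are not transactional, each chunk's results are worth inspecting. The sketch below assumes each result exposes an HTTP-style `statusCode`; `chunk`, `failedIndexes`, and `OperationResult` are hypothetical names for illustration.

```typescript
// Split an array into consecutive slices of at most `size` elements.
function chunk<T>(items: T[], size: number): T[][] {
  if (size <= 0 || Math.floor(size) !== size) {
    throw new Error('size must be a positive integer');
  }
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Assumed result shape: one entry per operation with an HTTP-style status code.
interface OperationResult {
  statusCode: number;
}

// Indexes of operations that did not succeed, so they can be retried or logged.
function failedIndexes(results: OperationResult[]): number[] {
  const failed: number[] = [];
  results.forEach((r, i) => {
    if (r.statusCode >= 400) failed.push(i);
  });
  return failed;
}

const batches = chunk(['a', 'b', 'c', 'd', 'e'], 2);
console.log(batches.length); // 3
console.log(failedIndexes([{ statusCode: 201 }, { statusCode: 409 }, { statusCode: 429 }])); // [ 1, 2 ]
```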

Change Feed

Process Changes

    import { ChangeFeedStartFrom } from '@azure/cosmos';

    const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

    async function processChangeFeed(): Promise<void> {
      const container = getContainer('orders');

      // Pull-model iterator (available in @azure/cosmos v4)
      const changeFeedIterator = container.items.getChangeFeedIterator({
        changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
      });

      while (changeFeedIterator.hasMoreResults) {
        const { result: items, statusCode } = await changeFeedIterator.readNext();

        if (statusCode === 304) {
          // No new changes
          await sleep(1000);
          continue;
        }

        for (const item of items) {
          console.log('Changed item:', item);
          // Process the change...
        }
      }
    }

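    // For production, use Change Feed Processor with lease container

Change Feed Processor Pattern

    // NOTE: illustrative pattern only. The builder calls below follow the
    // change feed processor found in the .NET and Java SDKs; confirm that your
    // version of @azure/cosmos exposes an equivalent before relying on it.
    // Otherwise use the pull-model iteration shown under Process Changes or an
    // Azure Functions Cosmos DB trigger.
    async function startChangeFeedProcessor(): Promise<void> {
      const sourceContainer = getContainer('orders');
      const leaseContainer = getContainer('leases');

      const changeFeedProcessor = sourceContainer.items.changeFeed
        .for(item => {
          // Process each change
          console.log('Processing:', item);
        })
        .withLeaseContainer(leaseContainer)
        .build();

      await changeFeedProcessor.start();
    }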

Python SDK

Install

    pip install azure-cosmos

Setup and Operations

    # cosmos_db.py
    import os
    import uuid
    from datetime import datetime
    from typing import List, Optional

    from azure.cosmos import CosmosClient, PartitionKey
    from azure.cosmos.exceptions import CosmosResourceNotFoundError

    # Initialize client
    endpoint = os.environ['COSMOS_ENDPOINT']
    key = os.environ['COSMOS_KEY']
    database_name = os.environ['COSMOS_DATABASE']

    client = CosmosClient(endpoint, key)
    database = client.get_database_client(database_name)

    def get_container(container_name: str):
        return database.get_container_client(container_name)

    # CRUD Operations
    users_container = get_container('users')

    def create_user(email: str, name: str, user_id: Optional[str] = None) -> dict:
        user_id = user_id or str(uuid.uuid4())
        now = datetime.utcnow().isoformat()

        user = {
            'id': user_id,
            'userId': user_id,  # Partition key
            'email': email,
            'name': name,
            'createdAt': now,
            'updatedAt': now
        }

        return users_container.create_item(user)

    def get_user(user_id: str) -> Optional[dict]:
        try:
            return users_container.read_item(item=user_id, partition_key=user_id)
        except CosmosResourceNotFoundError:
            return None

    def query_users(email_domain: str) -> List[dict]:
        query = "SELECT * FROM c WHERE CONTAINS(c.email, @domain)"
        parameters = [{'name': '@domain', 'value': email_domain}]

        return list(users_container.query_items(
            query=query,
            parameters=parameters,
            enable_cross_partition_query=True
        ))

    def update_user(user_id: str, **updates) -> dict:
        user = get_user(user_id)
        if not user:
            raise ValueError('User not found')

        user.update(updates)
        user['updatedAt'] = datetime.utcnow().isoformat()

        return users_container.replace_item(item=user_id, body=user)

    def delete_user(user_id: str) -> None:
        users_container.delete_item(item=user_id, partition_key=user_id)

    # Paginated query
    def get_users_paginated(page_size: int = 10, continuation_token: str = None):
        query = "SELECT * FROM c ORDER BY c.createdAt DESC"

        items = users_container.query_items(
            query=query,
            enable_cross_partition_query=True,
            max_item_count=page_size
        )

        # The continuation token is passed to by_page(), not to query_items()
        pager = items.by_page(continuation_token)
        results = list(next(pager))

        return {
            'items': results,
            'continuation_token': pager.continuation_token
        }

Indexing

Custom Indexing Policy

An indexing policy must list the root path /* in either includedPaths or excludedPaths. The policy below excludes everything by default and indexes only the paths that are queried.

    {
      "indexingMode": "consistent",
      "automatic": true,
      "includedPaths": [
        { "path": "/userId/?" },
        { "path": "/status/?" },
        { "path": "/createdAt/?" }
      ],
      "excludedPaths": [
        { "path": "/*" }
      ],
      "compositeIndexes": [
        [
          { "path": "/userId", "order": "ascending" },
          { "path": "/createdAt", "order": "descending" }
        ]
      ]
    }

Create Container with Index

    await database.containers.createIfNotExists({
      id: 'orders',
      partitionKey: { paths: ['/userId'] },
      indexingPolicy: {
        indexingMode: 'consistent',
        includedPaths: [
          { path: '/userId/?' },
          { path: '/status/?' },
          { path: '/createdAt/?' }
        ],
        excludedPaths: [
          { path: '/*' }  // Exclude all by default
        ]
      }
    });

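Throughput Management

Provisioned Throughput

    // Container level
    await database.containers.createIfNotExists({
      id: 'orders',
      partitionKey: { paths: ['/userId'] },
      throughput: 1000  // RU/s
    });

    // Scale throughput: read the container's offer, then replace it.
    // `client` is the CosmosClient created in lib/cosmosdb.ts (export it if needed).
    const container = database.container('orders');
    const { resource: offer } = await container.readOffer();
    if (offer?.content) {
      offer.content.offerThroughput = 2000;
      await client.offer(offer.id).replace(offer);
    }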

Autoscale

    await database.containers.createIfNotExists({
      id: 'orders',
      partitionKey: { paths: ['/userId'] },
      maxThroughput: 10000  // Auto-scales 10% to 100%
    });
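The "10% to 100%" range can be worked out directly from `maxThroughput`. The sketch below is a simplified model for reasoning about the range only, not a billing calculator; `autoscaleRange` and `scaledThroughput` are hypothetical helpers.

```typescript
// Autoscale floor and ceiling for a given maxThroughput (RU/s),
// using the 10%-to-100% range stated above.
function autoscaleRange(maxThroughput: number): { min: number; max: number } {
  return { min: maxThroughput / 10, max: maxThroughput };
}

// Simplified model: the provisioned level follows demand, clamped to the range.
function scaledThroughput(maxThroughput: number, demandRu: number): number {
  const { min, max } = autoscaleRange(maxThroughput);
  return Math.min(max, Math.max(min, demandRu));
}

console.log(autoscaleRange(10000));          // { min: 1000, max: 10000 }
console.log(scaledThroughput(10000, 250));   // 1000  (never below the floor)
console.log(scaledThroughput(10000, 4200));  // 4200
console.log(scaledThroughput(10000, 25000)); // 10000 (requests beyond this are throttled)
```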

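Serverless

    // Serverless is chosen when the account is created (serverless capacity mode),
    // not per container. On such an account, containers are created without any
    // throughput setting and you pay per request
    // (good for dev/test, intermittent workloads).
    await database.containers.createIfNotExists({
      id: 'orders',
      partitionKey: { paths: ['/userId'] }
      // No throughput setting on a serverless account
    });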

CLI Quick Reference

    # Azure CLI
    az cosmosdb create --name myaccount --resource-group mygroup
    az cosmosdb sql database create --account-name myaccount --name mydb --resource-group mygroup
    az cosmosdb sql container create \
      --account-name myaccount \
      --resource-group mygroup \
      --database-name mydb \
      --name orders \
      --partition-key-path /userId \
      --throughput 400

    # Query (check that this subcommand exists in your Azure CLI version before relying on it)
    az cosmosdb sql query --account-name myaccount --database-name mydb \
      --container-name orders --query "SELECT * FROM c"

    # Keys
    az cosmosdb keys list --name myaccount --resource-group mygroup
    az cosmosdb keys list --name myaccount --resource-group mygroup --type connection-strings

Cost Optimization

| Strategy | Impact |
|---|---|
| Right partition key | Avoid hot partitions (wasted RUs) |
| Index only what you query | Reduce write RU cost |
| Use point reads | 1 RU vs 3+ RU for queries |
| Serverless for dev/test | Pay per request |
| Autoscale for production | Scale down during low traffic |
| TTL for temporary data | Auto-delete old items |

Time-to-Live (TTL)

    // Enable TTL on container
    await database.containers.createIfNotExists({
      id: 'sessions',
      partitionKey: { paths: ['/userId'] },
      defaultTtl: 3600  // 1 hour
    });

    // Per-item TTL
    const session = {
      id: 'session-123',
      userId: 'user-456',
      ttl: 1800  // Override: 30 minutes
    };
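How the container default and the per-item `ttl` combine can be summarized in a small function. This sketch encodes the rules as I understand them from the Cosmos DB documentation: an item `ttl` only takes effect when TTL is enabled on the container, an item value overrides the default, and -1 means "never expire". `effectiveTtl` is a hypothetical helper, not an SDK call.

```typescript
// Returns the number of seconds after the last write at which an item
// expires, or null if it never expires.
function effectiveTtl(containerDefaultTtl: number | undefined, itemTtl?: number): number | null {
  // TTL not enabled on the container: item-level ttl is ignored.
  if (containerDefaultTtl === undefined) return null;
  // Item-level ttl, when present, overrides the container default.
  const ttl = itemTtl !== undefined ? itemTtl : containerDefaultTtl;
  // -1 means "never expire".
  return ttl === -1 ? null : ttl;
}

console.log(effectiveTtl(3600));            // 3600 (container default applies)
console.log(effectiveTtl(3600, 1800));      // 1800 (item override)
console.log(effectiveTtl(undefined, 1800)); // null (TTL disabled on container)
console.log(effectiveTtl(-1, 600));         // 600  (only items with ttl expire)
console.log(effectiveTtl(3600, -1));        // null (this item never expires)
```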

Anti-Patterns

- Bad partition key - Low cardinality causes hot partitions
- Cross-partition queries - Expensive; design for single-partition queries
- Over-indexing - Increases write cost; index only queried paths
- Large items - Max 2MB; store blobs in Azure Blob Storage
- Ignoring RU cost - Monitor and optimize expensive queries
- Strong consistency everywhere - Use Session (default) unless required
- No retry logic - Handle 429 (throttling) with exponential backoff
- Missing TTL - Set TTL for temporary/session data
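For the retry item in the list above, the following is a minimal sketch of exponential backoff on 429 responses. It assumes the thrown error carries a numeric `code` of 429, as the error handling earlier in this skill does for 404 and 412; `backoffDelayMs`, `isThrottled`, and `withBackoff` are hypothetical helpers. The SDK may already retry throttled requests on its own, so a wrapper like this is mainly for cases where those retries are exhausted.

```typescript
// Delay before retry number `attempt` (0-based): base * 2^attempt, capped.
function backoffDelayMs(attempt: number, baseMs: number = 100, capMs: number = 5000): number {
  return Math.min(capMs, baseMs * Math.pow(2, attempt));
}

// Assumption: throttling errors expose `code === 429`.
function isThrottled(error: unknown): boolean {
  return typeof error === 'object' && error !== null
    && (error as { code?: unknown }).code === 429;
}

// Runs `operation`, retrying only on throttling, up to `maxRetries` times.
async function withBackoff<T>(operation: () => Promise<T>, maxRetries: number = 5): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await operation();
    } catch (error) {
      if (!isThrottled(error) || attempt >= maxRetries) throw error;
      const delay = backoffDelayMs(attempt);
      await new Promise<void>(resolve => setTimeout(resolve, delay));
    }
  }
}

console.log([0, 1, 2, 3].map(n => backoffDelayMs(n))); // [ 100, 200, 400, 800 ]

// Usage (inside an async function), wrapping a call defined earlier:
// const user = await withBackoff(() => getUser('user-123', 'user-123'));
```

Adding random jitter to the delay is a common refinement when many clients are throttled at the same time.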
