projection-patterns

安装量: 3.1K
排名: #710

安装

npx skills add https://github.com/wshobson/agents --skill projection-patterns

Projection Patterns

Comprehensive guide to building projections and read models for event-sourced systems.

When to Use This Skill

- Building CQRS read models
- Creating materialized views from events
- Optimizing query performance
- Implementing real-time dashboards
- Building search indexes from events
- Aggregating data across streams

Core Concepts

1. Projection Architecture

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│ Event Store │────►│  Projector  │────►│ Read Model  │
│             │     │             │     │ (Database)  │
│ ┌─────────┐ │     │ ┌─────────┐ │     │ ┌─────────┐ │
│ │ Events  │ │     │ │ Handler │ │     │ │ Tables  │ │
│ └─────────┘ │     │ │ Logic   │ │     │ │ Views   │ │
│             │     │ └─────────┘ │     │ │ Cache   │ │
└─────────────┘     └─────────────┘     └─────────────┘

2. Projection Types

| Type       | Description                 | Use Case               |
|------------|-----------------------------|------------------------|
| Live       | Real-time from subscription | Current state queries  |
| Catchup    | Process historical events   | Rebuilding read models |
| Persistent | Stores checkpoint           | Resume after restart   |
| Inline     | Same transaction as write   | Strong consistency     |

Templates

Template 1: Basic Projector

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Any, Callable, List
import asyncpg

@dataclass
class Event:
    """A single event as handed to projections by the event store."""

    stream_id: str  # presumably the id of the stream the event belongs to -- confirm
    event_type: str  # name matched against ``Projection.handles()``
    data: dict  # event payload; handlers index it by key
    version: int  # presumably the per-stream version -- confirm against the store
    global_position: int  # position saved as the projection checkpoint

class Projection(ABC):
    """Base class for projections.

    A concrete projection supplies three things: a ``name`` used as its
    checkpoint key, the list of event types it consumes, and the coroutine
    that applies one event to its read model.  All three are abstract, so a
    subclass that omits any of them cannot be instantiated.
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Unique projection name for checkpointing."""

    @abstractmethod
    def handles(self) -> List[str]:
        """List of event types this projection handles."""

    @abstractmethod
    async def apply(self, event: Event) -> None:
        """Apply event to the read model."""

class Projector:
    """Runs projections from event store.

    For every registered projection the projector reads a batch of events
    starting at that projection's stored checkpoint, applies the events the
    projection declares in ``handles()``, and saves the new checkpoint.
    """

    def __init__(self, event_store, checkpoint_store):
        """Keep the stores and start with no registered projections.

        Args:
            event_store: object exposing ``await read_all(position, batch_size)``.
            checkpoint_store: object exposing awaitable ``get(name)``,
                ``save(name, position)`` and ``delete(name)``.
        """
        self.event_store = event_store
        self.checkpoint_store = checkpoint_store
        self.projections: List[Projection] = []

    def register(self, projection: Projection):
        """Add ``projection`` to the set driven by ``run()``."""
        self.projections.append(projection)

    async def run(self, batch_size: int = 100, poll_interval: float = 0.1):
        """Run all projections continuously.

        Loops forever: one batch per registered projection, then a pause of
        ``poll_interval`` seconds.  It only stops when the task is cancelled
        or an exception propagates out of a store or projection.

        Args:
            batch_size: maximum number of events requested per projection
                per pass.
            poll_interval: seconds to sleep between passes.
        """
        # Imported here because the module's import block does not bring
        # asyncio into scope.
        import asyncio

        while True:
            for projection in self.projections:
                await self._run_projection(projection, batch_size)
            await asyncio.sleep(poll_interval)

    async def _run_projection(self, projection: Projection, batch_size: int):
        """Process one batch of events for ``projection``.

        Returns:
            The ``global_position`` of the last event seen in the batch, or
            ``None`` when the store returned no events.
        """
        checkpoint = await self.checkpoint_store.get(projection.name)
        position = checkpoint or 0  # no checkpoint yet -> start from 0

        # NOTE(review): whether read_all treats ``position`` as inclusive or
        # exclusive is not visible here; if inclusive, the checkpointed event
        # is re-applied on the next batch -- confirm against the store.
        events = await self.event_store.read_all(position, batch_size)

        # The handled set is invariant for the batch; evaluate it once.
        handled = frozenset(projection.handles())

        last_position = None
        for event in events:
            if event.event_type in handled:
                await projection.apply(event)

            # Checkpoint after every event, handled or not, so skipped events
            # are not re-read after a restart.
            await self.checkpoint_store.save(
                projection.name,
                event.global_position
            )
            last_position = event.global_position
        return last_position

    async def rebuild(self, projection: Projection, batch_size: int = 1000):
        """Rebuild a projection from scratch.

        Deletes the projection's checkpoint and then replays the stream batch
        by batch until a batch is empty or no longer advances the position.

        Args:
            projection: the projection to replay.
            batch_size: maximum number of events requested per batch.
        """
        await self.checkpoint_store.delete(projection.name)
        # Optionally clear read model tables
        previous = None
        while True:
            reached = await self._run_projection(projection, batch_size=batch_size)
            # Stop on an empty batch, or when the position stops moving
            # (covers a store that re-reads the checkpointed event).
            if reached is None or reached == previous:
                break
            previous = reached

# Template 2: Order Summary Projection
class OrderSummaryProjection(Projection):
    """Projects order events to a summary read model.

    Every handled event becomes a single statement against the
    ``order_summaries`` table, run on a connection borrowed from the pool.
    """

    # Event type -> name of the method that applies it.  ``handles()`` and
    # ``apply()`` both read this table, so the two cannot drift apart.
    _HANDLERS = {
        "OrderCreated": "_handle_created",
        "OrderItemAdded": "_handle_item_added",
        "OrderItemRemoved": "_handle_item_removed",
        "OrderShipped": "_handle_shipped",
        "OrderCompleted": "_handle_completed",
        "OrderCancelled": "_handle_cancelled",
    }

    def __init__(self, db_pool: asyncpg.Pool):
        """Store the connection pool used by every handler."""
        self.pool = db_pool

    @property
    def name(self) -> str:
        """Checkpoint key of this projection."""
        return "order_summary"

    def handles(self) -> List[str]:
        """Return the order event types this projection consumes."""
        return list(self._HANDLERS)

    async def apply(self, event: Event) -> None:
        """Dispatch ``event`` to its handler; unknown event types are ignored."""
        method_name = self._HANDLERS.get(event.event_type)
        if method_name is not None:
            await getattr(self, method_name)(event)

    async def _execute(self, sql: str, *args) -> None:
        """Run one statement on a connection acquired from the pool."""
        async with self.pool.acquire() as conn:
            await conn.execute(sql, *args)

    async def _handle_created(self, event: Event):
        """Insert the summary row: status 'pending', zero amount, zero items."""
        await self._execute(
            """
            INSERT INTO order_summaries
            (order_id, customer_id, status, total_amount, item_count, created_at)
            VALUES ($1, $2, $3, $4, $5, $6)
            """,
            event.data['order_id'],
            event.data['customer_id'],
            'pending',
            0,
            0,
            event.data['created_at']
        )

    async def _handle_item_added(self, event: Event):
        """Add price * quantity to the total and bump the item count by one."""
        # NOTE(review): item_count grows by 1 regardless of quantity --
        # presumably it counts line items, not units; confirm.
        await self._execute(
            """
            UPDATE order_summaries
            SET total_amount = total_amount + $2,
                item_count = item_count + 1,
                updated_at = NOW()
            WHERE order_id = $1
            """,
            event.data['order_id'],
            event.data['price'] * event.data['quantity']
        )

    async def _handle_item_removed(self, event: Event):
        """Subtract price * quantity from the total and drop the item count by one."""
        await self._execute(
            """
            UPDATE order_summaries
            SET total_amount = total_amount - $2,
                item_count = item_count - 1,
                updated_at = NOW()
            WHERE order_id = $1
            """,
            event.data['order_id'],
            event.data['price'] * event.data['quantity']
        )

    async def _handle_shipped(self, event: Event):
        """Mark the order 'shipped' and record the shipping time."""
        await self._execute(
            """
            UPDATE order_summaries
            SET status = 'shipped',
                shipped_at = $2,
                updated_at = NOW()
            WHERE order_id = $1
            """,
            event.data['order_id'],
            event.data['shipped_at']
        )

    async def _handle_completed(self, event: Event):
        """Mark the order 'completed' and record the completion time."""
        await self._execute(
            """
            UPDATE order_summaries
            SET status = 'completed',
                completed_at = $2,
                updated_at = NOW()
            WHERE order_id = $1
            """,
            event.data['order_id'],
            event.data['completed_at']
        )

    async def _handle_cancelled(self, event: Event):
        """Mark the order 'cancelled'; the reason is optional in the payload."""
        await self._execute(
            """
            UPDATE order_summaries
            SET status = 'cancelled',
                cancelled_at = $2,
                cancellation_reason = $3,
                updated_at = NOW()
            WHERE order_id = $1
            """,
            event.data['order_id'],
            event.data['cancelled_at'],
            event.data.get('reason')
        )

Template 3: Elasticsearch Search Projection

from elasticsearch import AsyncElasticsearch

class ProductSearchProjection(Projection):
    """Projects product events to Elasticsearch for full-text search.

    Documents live in the ``products`` index and are keyed by the event's
    ``product_id``.
    """

    def __init__(self, es_client: AsyncElasticsearch):
        """Store the client and the name of the target index."""
        self.es = es_client
        self.index = "products"

    @property
    def name(self) -> str:
        """Checkpoint key of this projection."""
        return "product_search"

    def handles(self) -> List[str]:
        """Return the product event types this projection consumes."""
        return [
            "ProductCreated",
            "ProductUpdated",
            "ProductPriceChanged",
            "ProductDeleted"
        ]

    async def apply(self, event: Event) -> None:
        """Index, partially update, or delete the product document for ``event``.

        Event types other than the four listed in ``handles()`` are ignored.
        """
        # NOTE(review): update/delete presumably fail when the document does
        # not exist (e.g. on replay after a delete) -- confirm client behavior.
        if event.event_type == "ProductCreated":
            # Full document; ``tags`` is optional and defaults to an empty list.
            await self.es.index(
                index=self.index,
                id=event.data['product_id'],
                document={
                    'name': event.data['name'],
                    'description': event.data['description'],
                    'category': event.data['category'],
                    'price': event.data['price'],
                    'tags': event.data.get('tags', []),
                    'created_at': event.data['created_at']
                }
            )

        elif event.event_type == "ProductUpdated":
            # Partial update of the descriptive fields; price is not touched.
            await self.es.update(
                index=self.index,
                id=event.data['product_id'],
                doc={
                    'name': event.data['name'],
                    'description': event.data['description'],
                    'category': event.data['category'],
                    'tags': event.data.get('tags', []),
                    'updated_at': event.data['updated_at']
                }
            )

        elif event.event_type == "ProductPriceChanged":
            # Partial update of the price fields only.
            await self.es.update(
                index=self.index,
                id=event.data['product_id'],
                doc={
                    'price': event.data['new_price'],
                    'price_updated_at': event.data['changed_at']
                }
            )

        elif event.event_type == "ProductDeleted":
            await self.es.delete(
                index=self.index,
                id=event.data['product_id']
            )

# Template 4: Aggregating Projection
class DailySalesProjection(Projection):
    """Aggregates sales data by day for reporting.

    Completed orders are added to the ``daily_sales`` row of their completion
    day; refunds are subtracted from the row of the original completion day.
    """

    def __init__(self, db_pool: asyncpg.Pool):
        """Store the connection pool used for every statement."""
        self.pool = db_pool

    @property
    def name(self) -> str:
        """Checkpoint key of this projection."""
        return "daily_sales"

    def handles(self) -> List[str]:
        """Return the event types this projection consumes."""
        return ["OrderCompleted", "OrderRefunded"]

    async def apply(self, event: Event) -> None:
        """Route completions to the increment path and refunds to the decrement path."""
        if event.event_type == "OrderCompleted":
            await self._increment_sales(event)
        elif event.event_type == "OrderRefunded":
            await self._decrement_sales(event)

    async def _increment_sales(self, event: Event):
        """Upsert the day's row, adding one order plus its revenue and items."""
        # First ten characters of the timestamp string, i.e. YYYY-MM-DD.
        # NOTE(review): this passes a str as $1; asyncpg presumably needs a
        # datetime.date if the column is a DATE -- confirm against the schema.
        day = event.data['completed_at'][:10]
        upsert_sql = """
            INSERT INTO daily_sales (date, total_orders, total_revenue, total_items)
            VALUES ($1, 1, $2, $3)
            ON CONFLICT (date) DO UPDATE SET
                total_orders = daily_sales.total_orders + 1,
                total_revenue = daily_sales.total_revenue + $2,
                total_items = daily_sales.total_items + $3,
                updated_at = NOW()
            """
        async with self.pool.acquire() as conn:
            await conn.execute(
                upsert_sql,
                day,
                event.data['total_amount'],
                event.data['item_count']
            )

    async def _decrement_sales(self, event: Event):
        """Remove one order and the refunded amount from the original day's row."""
        day = event.data['original_completed_at'][:10]
        # Plain UPDATE, not an upsert: if the day's row is missing nothing is
        # changed.  total_items is not adjusted here.
        refund_sql = """
            UPDATE daily_sales SET
                total_orders = total_orders - 1,
                total_revenue = total_revenue - $2,
                total_refunds = total_refunds + $2,
                updated_at = NOW()
            WHERE date = $1
            """
        async with self.pool.acquire() as conn:
            await conn.execute(
                refund_sql,
                day,
                event.data['refund_amount']
            )

# Template 5: Multi-Table Projection
class CustomerActivityProjection(Projection):
    """Projects customer activity across multiple tables.

    Writes to ``customers``, ``customer_activity_summary`` and
    ``customer_order_history``.  All statements for one event run inside a
    single database transaction.
    """

    def __init__(self, db_pool: asyncpg.Pool):
        """Store the connection pool used for every event."""
        self.pool = db_pool

    @property
    def name(self) -> str:
        """Checkpoint key of this projection."""
        return "customer_activity"

    def handles(self) -> List[str]:
        """Return the event types this projection consumes."""
        return [
            "CustomerCreated",
            "OrderCompleted",
            "ReviewSubmitted",
            "CustomerTierChanged"
        ]

    async def apply(self, event: Event) -> None:
        """Apply ``event`` to the customer tables inside one transaction.

        Event types other than the four listed in ``handles()`` still open a
        connection and a transaction but execute no statement.
        """
        # One connection, one transaction per event: the multi-statement
        # branches below are committed together.
        async with self.pool.acquire() as conn, conn.transaction():
            if event.event_type == "CustomerCreated":
                # Insert into customers table
                await conn.execute(
                    """
                    INSERT INTO customers (customer_id, email, name, tier, created_at)
                    VALUES ($1, $2, $3, 'bronze', $4)
                    """,
                    event.data['customer_id'],
                    event.data['email'],
                    event.data['name'],
                    event.data['created_at']
                )
                # Initialize activity summary
                await conn.execute(
                    """
                    INSERT INTO customer_activity_summary
                    (customer_id, total_orders, total_spent, total_reviews)
                    VALUES ($1, 0, 0, 0)
                    """,
                    event.data['customer_id']
                )

            elif event.event_type == "OrderCompleted":
                # Update activity summary
                await conn.execute(
                    """
                    UPDATE customer_activity_summary SET
                        total_orders = total_orders + 1,
                        total_spent = total_spent + $2,
                        last_order_at = $3
                    WHERE customer_id = $1
                    """,
                    event.data['customer_id'],
                    event.data['total_amount'],
                    event.data['completed_at']
                )
                # Insert into order history
                await conn.execute(
                    """
                    INSERT INTO customer_order_history
                    (customer_id, order_id, amount, completed_at)
                    VALUES ($1, $2, $3, $4)
                    """,
                    event.data['customer_id'],
                    event.data['order_id'],
                    event.data['total_amount'],
                    event.data['completed_at']
                )

            elif event.event_type == "ReviewSubmitted":
                # Count the review and remember when it was submitted
                await conn.execute(
                    """
                    UPDATE customer_activity_summary SET
                        total_reviews = total_reviews + 1,
                        last_review_at = $2
                    WHERE customer_id = $1
                    """,
                    event.data['customer_id'],
                    event.data['submitted_at']
                )

            elif event.event_type == "CustomerTierChanged":
                # Overwrite the tier on the customers row
                await conn.execute(
                    """
                    UPDATE customers SET tier = $2, updated_at = NOW()
                    WHERE customer_id = $1
                    """,
                    event.data['customer_id'],
                    event.data['new_tier']
                )

Best Practices

Do's

- Make projections idempotent - Safe to replay
- Use transactions - For multi-table updates
- Store checkpoints - Resume after failures
- Monitor lag - Alert on projection delays
- Plan for rebuilds - Design for reconstruction

Don'ts

- Don't couple projections - Each is independent
- Don't skip error handling - Log and alert on failures
- Don't ignore ordering - Events must be processed in order
- Don't over-normalize - Denormalize for query patterns

Resources

- CQRS Pattern
- Projection Building Blocks

返回排行榜