langgraph-architecture

Installs: 175
Rank: #4947

Installation

```shell
npx skills add https://github.com/existential-birds/beagle --skill langgraph-architecture
```

# LangGraph Architecture Decisions

## When to Use LangGraph

Use LangGraph when you need:

- **Stateful conversations** - Multi-turn interactions with memory
- **Human-in-the-loop** - Approval gates, corrections, interventions
- **Complex control flow** - Loops, branches, conditional routing
- **Multi-agent coordination** - Multiple LLMs working together
- **Persistence** - Resume from checkpoints, time travel debugging
- **Streaming** - Real-time token streaming, progress updates
- **Reliability** - Retries, error recovery, durability guarantees

Consider alternatives when:

| Scenario | Alternative | Why |
|---|---|---|
| Single LLM call | Direct API call | Overhead not justified |
| Linear pipeline | LangChain LCEL | Simpler abstraction |
| Stateless tool use | Function calling | No persistence needed |
| Simple RAG | LangChain retrievers | Built-in patterns |
| Batch processing | Async tasks | Different execution model |

## State Schema Decisions

### TypedDict vs Pydantic

| TypedDict | Pydantic |
|---|---|
| Lightweight, faster | Runtime validation |
| Dict-like access | Attribute access |
| No validation overhead | Type coercion |
| Simpler serialization | Complex nested models |

Recommendation: Use TypedDict for most cases. Use Pydantic when you need validation or complex nested structures.
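A minimal sketch of the TypedDict side of this trade-off (the field names are illustrative): a TypedDict adds type hints for tooling but is a plain dict at runtime, which is exactly what makes it lightweight and trivially serializable.

```python
from typing import TypedDict

class State(TypedDict):
    messages: list
    context: str

# A TypedDict is a plain dict at runtime: dict-like access, no validation.
state: State = {"messages": [], "context": "session-1"}
state["messages"].append({"role": "user", "content": "hi"})

print(state["context"])     # dict-like access
print(type(state) is dict)  # no wrapper class at runtime
```

Because nothing is validated at runtime, a misspelled key or wrong type passes silently; if you need those checks, that is the point at which Pydantic earns its overhead.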

## Reducer Selection

| Use Case | Reducer | Example |
|---|---|---|
| Chat messages | add_messages | Handles IDs, RemoveMessage |
| Simple append | operator.add | `Annotated[list, operator.add]` |
| Keep latest | None (LastValue) | `field: str` |
| Custom merge | Lambda | `Annotated[list, lambda a, b: ...]` |
| Overwrite list | Overwrite | Bypass reducer |

## State Size Considerations
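A reducer is just a binary function that merges the existing channel value with a node's update, so custom reducers can be written and tested as plain functions. A sketch of a custom merge reducer that appends only unseen items (the name `merge_unique` is illustrative):

```python
import operator
from typing import Annotated, TypedDict

def merge_unique(existing: list, update: list) -> list:
    """Append items from the update that are not already present."""
    return existing + [item for item in update if item not in existing]

class State(TypedDict):
    # Built-in append semantics: concatenates lists on every update.
    log: Annotated[list, operator.add]
    # Custom reducer: deduplicates while preserving order.
    tags: Annotated[list, merge_unique]

# Reducers are plain functions, so they can be exercised directly:
print(merge_unique(["a", "b"], ["b", "c"]))  # ['a', 'b', 'c']
```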

```python
# SMALL STATE (< 1 MB) - put it in the graph state
class State(TypedDict):
    messages: Annotated[list, add_messages]
    context: str
```

```python
# LARGE DATA - keep it in a Store and hold only a reference in state
class State(TypedDict):
    messages: Annotated[list, add_messages]
    document_ref: str  # reference to an entry in the store

def node(state, *, store: BaseStore):
    doc = store.get(namespace, state["document_ref"])
    # process the document without bloating checkpoints
```
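The reference pattern above can be shown without the framework: a stdlib stand-in where a plain dict plays the role of `BaseStore` (the names `doc_store` and `summarize` are illustrative). Only the key travels through the state, so checkpoints stay small no matter how large the payload grows.

```python
from typing import TypedDict

# Stand-in for a LangGraph Store: large payloads live outside the state.
doc_store: dict[str, str] = {"doc-42": "A very long document body..."}

class State(TypedDict):
    document_ref: str  # only the key travels through checkpoints
    summary: str

def summarize(state: State) -> dict:
    doc = doc_store[state["document_ref"]]  # fetch by reference
    return {"summary": doc[:20]}            # the state update stays small

update = summarize({"document_ref": "doc-42", "summary": ""})
print(update["summary"])  # A very long document
```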

## Graph Structure Decisions

### Single Graph vs Subgraphs

Single graph when:

- All nodes share the same state schema
- Simple linear or branching flow
- Fewer than 10 nodes

Subgraphs when:

- Different state schemas needed
- Reusable components across graphs
- Team separation of concerns
- Complex hierarchical workflows

### Conditional Edges vs Command

| Conditional Edges | Command |
|---|---|
| Routing based on state | Routing + state update |
| Separate router function | Decision in node |
| Clearer visualization | More flexible |
| Standard patterns | Dynamic destinations |

```python
# Conditional edge - when routing is the focus
def router(state) -> Literal["a", "b"]:
    return "a" if condition else "b"

builder.add_conditional_edges("node", router)
```

```python
# Command - when combining routing with updates
def node(state) -> Command:
    return Command(goto="next", update={"step": state["step"] + 1})
```

### Static vs Dynamic Routing

Static edges (`add_edge`):

- Fixed flow known at build time
- Clearer graph visualization
- Easier to reason about

Dynamic routing (`add_conditional_edges`, `Command`, `Send`):

- Runtime decisions based on state
- Agent-driven navigation
- Fan-out patterns

## Persistence Strategy

### Checkpointer Selection

| Checkpointer | Use Case | Characteristics |
|---|---|---|
| InMemorySaver | Testing only | Lost on restart |
| SqliteSaver | Development | Single file, local |
| PostgresSaver | Production | Scalable, concurrent |
| Custom | Special needs | Implement BaseCheckpointSaver |

### Checkpointing Scope

```python
# Full persistence (default)
graph = builder.compile(checkpointer=checkpointer)

# Subgraph options (pick exactly one - Python forbids repeating a keyword):
subgraph = sub_builder.compile(checkpointer=None)   # inherit from parent
subgraph = sub_builder.compile(checkpointer=True)   # independent checkpointing
subgraph = sub_builder.compile(checkpointer=False)  # no checkpointing (runs atomically)
```

### When to Disable Checkpointing

- Short-lived subgraphs that should be atomic
- Subgraphs with incompatible state schemas
- Performance-critical paths without need for resume

## Multi-Agent Architecture

### Supervisor Pattern

Best for:

- Clear hierarchy
- Centralized decision making
- Different agent specializations

```
         ┌─────────────┐
         │ Supervisor  │
         └──────┬──────┘
   ┌────────┬───┴───┬────────┐
   ▼        ▼       ▼        ▼
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐
│Agent1│ │Agent2│ │Agent3│ │Agent4│
└──────┘ └──────┘ └──────┘ └──────┘
```
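In LangGraph a supervisor is typically a node that returns a `Command(goto=...)` naming the next worker. Stripped of the framework, the centralized decision is just a routing function over state; a minimal stdlib sketch (the agent names and routing conditions are illustrative):

```python
def supervisor_route(state: dict) -> str:
    """Pick the next specialist based on what the state still lacks."""
    if not state.get("research_done"):
        return "researcher"
    if not state.get("plan"):
        return "planner"
    return "executor"

# Each call inspects the shared state and dispatches to one specialist.
print(supervisor_route({}))                                    # researcher
print(supervisor_route({"research_done": True}))               # planner
print(supervisor_route({"research_done": True, "plan": "x"}))  # executor
```

Because all routing lives in one function, the hierarchy stays visible and testable, which is the main argument for this pattern over peer-to-peer wiring.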

### Peer-to-Peer Pattern

Best for:

- Collaborative agents
- No clear hierarchy
- Flexible communication

```
┌──────┐     ┌──────┐
│Agent1│◄───►│Agent2│
└──┬───┘     └───┬──┘
   │             │
   ▼             ▼
┌──────┐     ┌──────┐
│Agent3│◄───►│Agent4│
└──────┘     └──────┘
```

### Handoff Pattern

Best for:

- Sequential specialization
- Clear stage transitions
- Different capabilities per stage

```
┌────────┐    ┌────────┐    ┌────────┐
│Research│───►│Planning│───►│Execute │
└────────┘    └────────┘    └────────┘
```
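Stripped to its essence, a handoff chain is a fixed sequence of stage functions, each enriching the state before passing it to the next specialist. A stdlib sketch (the stage names and fields are illustrative):

```python
def research(state: dict) -> dict:
    return {**state, "findings": ["fact-1", "fact-2"]}

def plan(state: dict) -> dict:
    return {**state, "plan": f"use {len(state['findings'])} findings"}

def execute(state: dict) -> dict:
    return {**state, "result": "done: " + state["plan"]}

# Each stage hands the accumulated state to the next specialist.
state = {}
for stage in (research, plan, execute):
    state = stage(state)

print(state["result"])  # done: use 2 findings
```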

## Streaming Strategy

### Stream Mode Selection

| Mode | Use Case | Data |
|---|---|---|
| updates | UI updates | Node outputs only |
| values | State inspection | Full state each step |
| messages | Chat UX | LLM tokens |
| custom | Progress/logs | Your data via StreamWriter |
| debug | Debugging | Tasks + checkpoints |

### Subgraph Streaming

```python
# Stream from subgraphs
async for chunk in graph.astream(
    input,
    stream_mode="updates",
    subgraphs=True,  # include subgraph events
):
    namespace, data = chunk  # namespace indicates depth
```

## Human-in-the-Loop Design

### Interrupt Placement

| Strategy | Use Case |
|---|---|
| interrupt_before | Approval before action |
| interrupt_after | Review after completion |
| interrupt() in node | Dynamic, contextual pauses |

### Resume Patterns

```python
# Simple resume (same thread)
graph.invoke(None, config)

# Resume with a value
graph.invoke(Command(resume="approved"), config)

# Resume a specific interrupt
graph.invoke(Command(resume={interrupt_id: value}), config)

# Modify state, then resume
graph.update_state(config, {"field": "new_value"})
graph.invoke(None, config)
```

## Error Handling Strategy

### Retry Configuration

```python
# Per-node retry
RetryPolicy(
    initial_interval=0.5,
    backoff_factor=2.0,
    max_interval=60.0,
    max_attempts=3,
    retry_on=lambda e: isinstance(e, (APIError, TimeoutError)),
)

# Multiple policies (first match wins)
builder.add_node("node", fn, retry_policy=[
    RetryPolicy(retry_on=RateLimitError, max_attempts=5),
    RetryPolicy(retry_on=Exception, max_attempts=2),
])
```
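The retry parameters compose a standard exponential-backoff schedule. A stdlib sketch of how the wait intervals grow under those settings (the helper `backoff_schedule` is illustrative, not a LangGraph API):

```python
def backoff_schedule(initial: float, factor: float,
                     cap: float, attempts: int) -> list[float]:
    """Wait time before each retry: initial * factor**n, capped at cap."""
    return [min(initial * factor**n, cap) for n in range(attempts)]

# With the RetryPolicy values above: 0.5s, 1.0s, 2.0s between attempts.
print(backoff_schedule(0.5, 2.0, 60.0, 3))  # [0.5, 1.0, 2.0]
```

The `max_interval` cap only matters once `initial * factor**n` would exceed it, so with these defaults it takes effect around the seventh retry.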

### Fallback Patterns

```python
def node_with_fallback(state):
    try:
        return primary_operation(state)
    except PrimaryError:
        return fallback_operation(state)

# Or use conditional edges for complex fallback routing
def route_on_error(state) -> Literal["retry", "fallback", "end"]:
    if state.get("error") and state["attempts"] < 3:
        return "retry"
    elif state.get("error"):
        return "fallback"
    return END
```

## Scaling Considerations

### Horizontal Scaling

- Use PostgresSaver for shared state
- Consider LangGraph Platform for managed infrastructure
- Use stores for large data outside checkpoints

### Performance Optimization

- **Minimize state size** - Use references for large data
- **Parallel nodes** - Fan out when possible
- **Cache expensive operations** - Use CachePolicy
- **Async everywhere** - Use ainvoke, astream

### Resource Limits

```python
# Set a recursion limit
config = {"recursion_limit": 50}
graph.invoke(input, config)

# Track remaining steps in state
class State(TypedDict):
    remaining_steps: RemainingSteps

def check_budget(state):
    if state["remaining_steps"] < 5:
        return "wrap_up"
    return "continue"
```

## Decision Checklist

Before implementing:

- Is LangGraph the right tool? (vs simpler alternatives)
- State schema defined with appropriate reducers?
- Persistence strategy chosen? (dev vs prod checkpointer)
- Streaming needs identified?
- Human-in-the-loop points defined?
- Error handling and retry strategy?
- Multi-agent coordination pattern? (if applicable)
- Resource limits configured?
