LangGraph Architecture Decisions When to Use LangGraph Use LangGraph When You Need: Stateful conversations - Multi-turn interactions with memory Human-in-the-loop - Approval gates, corrections, interventions Complex control flow - Loops, branches, conditional routing Multi-agent coordination - Multiple LLMs working together Persistence - Resume from checkpoints, time travel debugging Streaming - Real-time token streaming, progress updates Reliability - Retries, error recovery, durability guarantees Consider Alternatives When: Scenario Alternative Why Single LLM call Direct API call Overhead not justified Linear pipeline LangChain LCEL Simpler abstraction Stateless tool use Function calling No persistence needed Simple RAG LangChain retrievers Built-in patterns Batch processing Async tasks Different execution model State Schema Decisions TypedDict vs Pydantic TypedDict Pydantic Lightweight, faster Runtime validation Dict-like access Attribute access No validation overhead Type coercion Simpler serialization Complex nested models
Recommendation: Use TypedDict for most cases. Use Pydantic when you need validation or complex nested structures.
Reducer Selection Use Case Reducer Example Chat messages add_messages Handles IDs, RemoveMessage Simple append operator.add Annotated[list, operator.add] Keep latest None (LastValue) field: str Custom merge Lambda Annotated[list, lambda a, b: ...] Overwrite list Overwrite Bypass reducer State Size Considerations
SMALL STATE (< 1MB) - Put in state
class State(TypedDict): messages: Annotated[list, add_messages] context: str
LARGE DATA - Use Store
class State(TypedDict): messages: Annotated[list, add_messages] document_ref: str # Reference to store
def node(state, *, store: BaseStore): doc = store.get(namespace, state["document_ref"]) # Process without bloating checkpoints
Graph Structure Decisions Single Graph vs Subgraphs
Single Graph when:
All nodes share the same state schema Simple linear or branching flow < 10 nodes
Subgraphs when:
Different state schemas needed Reusable components across graphs Team separation of concerns Complex hierarchical workflows Conditional Edges vs Command Conditional Edges Command Routing based on state Routing + state update Separate router function Decision in node Clearer visualization More flexible Standard patterns Dynamic destinations
Conditional Edge - when routing is the focus
def router(state) -> Literal["a", "b"]: return "a" if condition else "b" builder.add_conditional_edges("node", router)
Command - when combining routing with updates
def node(state) -> Command: return Command(goto="next", update={"step": state["step"] + 1})
Static vs Dynamic Routing
Static Edges (add_edge):
Fixed flow known at build time Clearer graph visualization Easier to reason about
Dynamic Routing (add_conditional_edges, Command, Send):
Runtime decisions based on state Agent-driven navigation Fan-out patterns Persistence Strategy Checkpointer Selection Checkpointer Use Case Characteristics InMemorySaver Testing only Lost on restart SqliteSaver Development Single file, local PostgresSaver Production Scalable, concurrent Custom Special needs Implement BaseCheckpointSaver Checkpointing Scope
Full persistence (default)
graph = builder.compile(checkpointer=checkpointer)
Subgraph options
subgraph = sub_builder.compile( checkpointer=None, # Inherit from parent checkpointer=True, # Independent checkpointing checkpointer=False, # No checkpointing (runs atomically) )
When to Disable Checkpointing Short-lived subgraphs that should be atomic Subgraphs with incompatible state schemas Performance-critical paths without need for resume Multi-Agent Architecture Supervisor Pattern
Best for:
Clear hierarchy Centralized decision making Different agent specializations ┌─────────────┐ │ Supervisor │ └──────┬──────┘ ┌────────┬───┴───┬────────┐ ▼ ▼ ▼ ▼ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │Agent1│ │Agent2│ │Agent3│ │Agent4│ └──────┘ └──────┘ └──────┘ └──────┘
Peer-to-Peer Pattern
Best for:
Collaborative agents No clear hierarchy Flexible communication ┌──────┐ ┌──────┐ │Agent1│◄───►│Agent2│ └──┬───┘ └───┬──┘ │ │ ▼ ▼ ┌──────┐ ┌──────┐ │Agent3│◄───►│Agent4│ └──────┘ └──────┘
Handoff Pattern
Best for:
Sequential specialization Clear stage transitions Different capabilities per stage ┌────────┐ ┌────────┐ ┌────────┐ │Research│───►│Planning│───►│Execute │ └────────┘ └────────┘ └────────┘
Streaming Strategy Stream Mode Selection Mode Use Case Data updates UI updates Node outputs only values State inspection Full state each step messages Chat UX LLM tokens custom Progress/logs Your data via StreamWriter debug Debugging Tasks + checkpoints Subgraph Streaming
Stream from subgraphs
async for chunk in graph.astream( input, stream_mode="updates", subgraphs=True # Include subgraph events ): namespace, data = chunk # namespace indicates depth
Human-in-the-Loop Design Interrupt Placement Strategy Use Case interrupt_before Approval before action interrupt_after Review after completion interrupt() in node Dynamic, contextual pauses Resume Patterns
Simple resume (same thread)
graph.invoke(None, config)
Resume with value
graph.invoke(Command(resume="approved"), config)
Resume specific interrupt
graph.invoke(Command(resume={interrupt_id: value}), config)
Modify state and resume
graph.update_state(config, {"field": "new_value"}) graph.invoke(None, config)
Error Handling Strategy Retry Configuration
Per-node retry
RetryPolicy( initial_interval=0.5, backoff_factor=2.0, max_interval=60.0, max_attempts=3, retry_on=lambda e: isinstance(e, (APIError, TimeoutError)) )
Multiple policies (first match wins)
builder.add_node("node", fn, retry_policy=[ RetryPolicy(retry_on=RateLimitError, max_attempts=5), RetryPolicy(retry_on=Exception, max_attempts=2), ])
Fallback Patterns def node_with_fallback(state): try: return primary_operation(state) except PrimaryError: return fallback_operation(state)
Or use conditional edges for complex fallback routing
def route_on_error(state) -> Literal["retry", "fallback", "end"]: if state.get("error") and state["attempts"] < 3: return "retry" elif state.get("error"): return "fallback" return END
Scaling Considerations Horizontal Scaling Use PostgresSaver for shared state Consider LangGraph Platform for managed infrastructure Use stores for large data outside checkpoints Performance Optimization Minimize state size - Use references for large data Parallel nodes - Fan out when possible Cache expensive operations - Use CachePolicy Async everywhere - Use ainvoke, astream Resource Limits
Set recursion limit
config = {"recursion_limit": 50} graph.invoke(input, config)
Track remaining steps in state
class State(TypedDict): remaining_steps: RemainingSteps
def check_budget(state): if state["remaining_steps"] < 5: return "wrap_up" return "continue"
Decision Checklist
Before implementing:
Is LangGraph the right tool? (vs simpler alternatives) State schema defined with appropriate reducers? Persistence strategy chosen? (dev vs prod checkpointer) Streaming needs identified? Human-in-the-loop points defined? Error handling and retry strategy? Multi-agent coordination pattern? (if applicable) Resource limits configured?