- StateGraph
-
- Main class for building stateful graphs
- Nodes
-
- Functions that perform work and update state
- Edges
-
- Define execution order (static or conditional)
- START/END
-
- Special nodes marking entry and exit points
- State with Reducers
-
- Control how state updates are merged
- Graphs must be
- compile()
- d before execution.
- Designing a LangGraph application
- Follow these 5 steps when building a new graph:
- Map out discrete steps
- — sketch a flowchart of your workflow. Each step becomes a node.
- Identify what each step does
- — categorize nodes: LLM step, data step, action step, or user input step. For each, determine static context (prompt), dynamic context (from state), retry strategy, and desired outcome.
- Design your state
- — state is shared memory for all nodes. Store raw data, format prompts on-demand inside nodes.
- Build your nodes
- — implement each step as a function that takes state and returns partial updates.
- Wire it together
- — connect nodes with edges, add conditional routing, compile with a checkpointer if needed.
- Use LangGraph When
- Use Alternatives When
- Need fine-grained control over agent orchestration
- Quick prototyping → LangChain agents
- Building complex workflows with branching/loops
- Simple stateless workflows → LangChain direct
- Require human-in-the-loop, persistence
- Batteries-included features → Deep Agents
- State Management
- Need
- Solution
- Example
- Overwrite value
- No reducer (default)
- Simple fields like counters
- Append to list
- Reducer (operator.add / concat)
- Message history, logs
- Custom logic
- Custom reducer function
- Complex merging
- class State(TypedDict):
- name: str # Default: overwrites on update
- messages: Annotated[list, operator.add] # Appends to list
- total: Annotated[int, operator.add] # Sums integers
- Use StateSchema with ReducedValue for accumulating arrays.
- ```typescript
- import { StateSchema, ReducedValue, MessagesValue } from "@langchain/langgraph";
- import { z } from "zod";
- const State = new StateSchema({
- name: z.string(), // Default: overwrites
- messages: MessagesValue, // Built-in for messages
- items: new ReducedValue(
- z.array(z.string()).default(() => []),
- { reducer: (current, update) => current.concat(update) }
- ),
- });
- Node 1 returns:
- Node 2 returns:
- Final: {"messages": ["B"]} # "A" is LOST!
- CORRECT: Use Annotated with operator.add
- from typing import Annotated
- import operator
- class State(TypedDict):
- messages: Annotated[list, operator.add]
- Final:
- Without ReducedValue, arrays are overwritten not appended.
- ```typescript
- // WRONG: Array will be overwritten
- const State = new StateSchema({
- items: z.array(z.string()), // No reducer!
- });
- // Node 1: { items: ["A"] }, Node 2: { items: ["B"] }
- // Final: { items: ["B"] } // A is lost!
- // CORRECT: Use ReducedValue
- const State = new StateSchema({
- items: new ReducedValue(
- z.array(z.string()).default(() => []),
- { reducer: (current, update) => current.concat(update) }
- ),
- });
- // Final:
- CORRECT: Return dict with only the updates
- def my_node(state: State) -> dict:
- return
- Return partial updates only, not the full state object.
- ```typescript
- // WRONG: Returning entire state
- const myNode = async (state: typeof State.State) => {
- state.field = "updated";
- return state; // Don't do this!
- };
- // CORRECT: Return partial updates
- const myNode = async (state: typeof State.State) => {
- return { field: "updated" };
- };
- Nodes
- Node functions accept these arguments:
- Signature
- When to Use
- def node(state: State)
- Simple nodes that only need state
- def node(state: State, config: RunnableConfig)
- Need thread_id, tags, or configurable values
- def node(state: State, runtime: Runtime[Context])
- Need runtime context, store, or stream_writer
- from
- langchain_core
- .
- runnables
- import
- RunnableConfig
- from
- langgraph
- .
- runtime
- import
- Runtime
- def
- plain_node
- (
- state
- :
- State
- )
- :
- return
- {
- "results"
- :
- "done"
- }
- def
- node_with_config
- (
- state
- :
- State
- ,
- config
- :
- RunnableConfig
- )
- :
- thread_id
- =
- config
- [
- "configurable"
- ]
- [
- "thread_id"
- ]
- return
- {
- "results"
- :
- f"Thread:
- {
- thread_id
- }
- "
- }
- def
- node_with_runtime
- (
- state
- :
- State
- ,
- runtime
- :
- Runtime
- [
- Context
- ]
- )
- :
- user_id
- =
- runtime
- .
- context
- .
- user_id
- return
- {
- "results"
- :
- f"User:
- {
- user_id
- }
- "
- }
- Signature
- When to Use
- (state) =>
- Simple nodes that only need state
- (state, config) =>
- Need thread_id, tags, or configurable values
- import
- {
- GraphNode
- ,
- StateSchema
- }
- from
- "@langchain/langgraph"
- ;
- const
- plainNode
- :
- GraphNode
- <
- typeof
- State
- >
- =
- (
- state
- )
- =>
- {
- return
- {
- results
- :
- "done"
- }
- ;
- }
- ;
- const
- nodeWithConfig
- :
- GraphNode
- <
- typeof
- State
- >
- =
- (
- state
- ,
- config
- )
- =>
- {
- const
- threadId
- =
- config
- ?.
- configurable
- ?.
- thread_id
- ;
- return
- {
- results
- :
- `
- Thread:
- ${
- threadId
- }
- `
- }
- ;
- }
- ;
- Edges
- Need
- Edge Type
- When to Use
- Always go to same node
- add_edge()
- Fixed, deterministic flow
- Route based on state
- add_conditional_edges()
- Dynamic branching
- Update state AND route
- Command
- Combine logic in single node
- Fan-out to multiple nodes
- Send
- Parallel processing with dynamic inputs
- class State(TypedDict):
- input: str
- output: str
- def process_input(state: State) -> dict:
- return {"output": f"Processed: {state['input']}"}
- def finalize(state: State) -> dict:
- return
- graph = (
- StateGraph(State)
- .add_node("process", process_input)
- .add_node("finalize", finalize)
- .add_edge(START, "process")
- .add_edge("process", "finalize")
- .add_edge("finalize", END)
- .compile()
- )
- result = graph.invoke({"input": "hello"})
- print(result["output"]) # "PROCESSED: HELLO"
- Chain nodes with addEdge and compile before invoking.
- ```typescript
- import { StateGraph, StateSchema, START, END } from "@langchain/langgraph";
- import { z } from "zod";
- const State = new StateSchema({
- input: z.string(),
- output: z.string().default(""),
- });
- const processInput = async (state: typeof State.State) => {
- return { output:
Processed: ${state.input}}; - };
- const finalize = async (state: typeof State.State) => {
- return { output: state.output.toUpperCase() };
- };
- const graph = new StateGraph(State)
- .addNode("process", processInput)
- .addNode("finalize", finalize)
- .addEdge(START, "process")
- .addEdge("process", "finalize")
- .addEdge("finalize", END)
- .compile();
- const result = await graph.invoke({ input: "hello" });
- console.log(result.output); // "PROCESSED: HELLO"
- class State(TypedDict):
- query: str
- route: str
- result: str
- def classify(state: State) -> dict:
- if "weather" in state["query"].lower():
- return
- return
- def route_query(state: State) -> Literal["weather", "general"]:
- return state["route"]
- graph = (
- StateGraph(State)
- .add_node("classify", classify)
- .add_node("weather", lambda s: {"result": "Sunny, 72F"})
- .add_node("general", lambda s: {"result": "General response"})
- .add_edge(START, "classify")
- .add_conditional_edges("classify", route_query, ["weather", "general"])
- .add_edge("weather", END)
- .add_edge("general", END)
- .compile()
- )
- addConditionalEdges routes based on function return value.
- ```typescript
- import { StateGraph, StateSchema, START, END } from "@langchain/langgraph";
- import { z } from "zod";
- const State = new StateSchema({
- query: z.string(),
- route: z.string().default(""),
- result: z.string().default(""),
- });
- const classify = async (state: typeof State.State) => {
- if (state.query.toLowerCase().includes("weather")) {
- return { route: "weather" };
- }
- return { route: "general" };
- };
- const routeQuery = (state: typeof State.State) => state.route;
- const graph = new StateGraph(State)
- .addNode("classify", classify)
- .addNode("weather", async () => ({ result: "Sunny, 72F" }))
- .addNode("general", async () => ({ result: "General response" }))
- .addEdge(START, "classify")
- .addConditionalEdges("classify", routeQuery, ["weather", "general"])
- .addEdge("weather", END)
- .addEdge("general", END)
- .compile();
- Command
- Command combines state updates and routing in a single return value. Fields:
- update
-
- State updates to apply (like returning a dict from a node)
- goto
-
- Node name(s) to navigate to next
- resume
-
- Value to resume after
- interrupt()
- — see human-in-the-loop skill
- class State(TypedDict):
- count: int
- result: str
- def node_a(state: State) -> Command[Literal["node_b", "node_c"]]:
- """Update state AND decide next node in one return."""
- new_count = state["count"] + 1
- if new_count > 5:
- return Command(update={"count": new_count}, goto="node_c")
- return Command(update={"count": new_count}, goto="node_b")
- graph = (
- StateGraph(State)
- .add_node("node_a", node_a)
- .add_node("node_b", lambda s: {"result": "B"})
- .add_node("node_c", lambda s: {"result": "C"})
- .add_edge(START, "node_a")
- .add_edge("node_b", END)
- .add_edge("node_c", END)
- .compile()
- )
- Return Command with update and goto to combine state change with routing.
- ```typescript
- import { StateGraph, StateSchema, START, END, Command } from "@langchain/langgraph";
- import { z } from "zod";
- const State = new StateSchema({
- count: z.number().default(0),
- result: z.string().default(""),
- });
- const nodeA = async (state: typeof State.State) => {
- const newCount = state.count + 1;
- if (newCount > 5) {
- return new Command({ update: { count: newCount }, goto: "node_c" });
- }
- return new Command({ update: { count: newCount }, goto: "node_b" });
- };
- const graph = new StateGraph(State)
- .addNode("node_a", nodeA, { ends: ["node_b", "node_c"] })
- .addNode("node_b", async () => ({ result: "B" }))
- .addNode("node_c", async () => ({ result: "C" }))
- .addEdge(START, "node_a")
- .addEdge("node_b", END)
- .addEdge("node_c", END)
- .compile();
- Python
-
- Use
- Command[Literal["node_a", "node_b"]]
- as the return type annotation to declare valid goto destinations.
- TypeScript
-
- Pass
- { ends: ["node_a", "node_b"] }
- as the third argument to
- addNode
- to declare valid goto destinations.
- Warning
- :
- Command
- only adds
- dynamic
- edges — static edges defined with
- add_edge
- /
- addEdge
- still execute. If
- node_a
- returns
- Command(goto="node_c")
- and you also have
- graph.add_edge("node_a", "node_b")
- ,
- both
- node_b
- and
- node_c
- will run.
- Send API
- Fan-out with
- Send
- return
[Send("worker", {...})]
from a conditional edge to spawn parallel workers. Requires a reducer on the results field.
class OrchestratorState(TypedDict):
tasks: list[str]
results: Annotated[list, operator.add]
summary: str
def orchestrator(state: OrchestratorState):
"""Fan out tasks to workers."""
return [Send("worker", {"task": task}) for task in state["tasks"]]
def worker(state: dict) -> dict:
return {"results": [f"Completed: {state['task']}"]}
def synthesize(state: OrchestratorState) -> dict:
return {"summary": f"Processed {len(state['results'])} tasks"}
graph = (
StateGraph(OrchestratorState)
.add_node("worker", worker)
.add_node("synthesize", synthesize)
.add_conditional_edges(START, orchestrator, ["worker"])
.add_edge("worker", "synthesize")
.add_edge("synthesize", END)
.compile()
)
result = graph.invoke({"tasks": ["Task A", "Task B", "Task C"]})
Fan out tasks to parallel workers using the Send API and aggregate results. typescript import { Send, StateGraph, StateSchema, ReducedValue, START, END } from "@langchain/langgraph"; import { z } from "zod"; const State = new StateSchema({ tasks: z.array(z.string()), results: new ReducedValue( z.array(z.string()).default(() => []), { reducer: (curr, upd) => curr.concat(upd) } ), summary: z.string().default(""), }); const orchestrator = (state: typeof State.State) => { return state.tasks.map((task) => new Send("worker", { task })); }; const worker = async (state: { task: string }) => { return { results: [`Completed: ${state.task}`] }; }; const synthesize = async (state: typeof State.State) => { return { summary: `Processed ${state.results.length} tasks` }; }; const graph = new StateGraph(State) .addNode("worker", worker) .addNode("synthesize", synthesize) .addConditionalEdges(START, orchestrator, ["worker"]) .addEdge("worker", "synthesize") .addEdge("synthesize", END) .compile(); CORRECT class State(TypedDict): results: Annotated[list, operator.add] # Accumulates </python> <typescript> Use ReducedValue to accumulate parallel worker results.typescript // WRONG: No reducer const State = new StateSchema({ results: z.array(z.string()) }); // CORRECT const State = new StateSchema({ results: new ReducedValue(z.array(z.string()).default(() => []), { reducer: (curr, upd) => curr.concat(upd) }), }); Running Graphs: Invoke and Stream Call graph.invoke(input, config) to run a graph to completion and return the final state. Mode What it Streams Use Case values Full state after each step Monitor complete state updates State deltas Track incremental updates messages LLM tokens + metadata Chat UIs custom User-defined data Progress indicators def my_node(state): writer = get_stream_writer() writer("Processing step 1...")
Do work
writer("Complete!")
return {"result": "done"}
for chunk in graph.stream({"data": "test"}, stream_mode="custom"):
print(chunk)
typescript
import { getWriter } from "@langchain/langgraph";
const myNode = async (state: typeof State.State) => {
const writer = getWriter();
writer("Processing step 1...");
// Do work
writer("Complete!");
return { result: "done" };
};
for await (const chunk of graph.stream({ data: "test" }, { streamMode: "custom" })) {
console.log(chunk);
}
Error Handling
Match the error type to the right handler:
Error Type
Who Fixes
Strategy
Example
Transient (network, rate limits)
System
RetryPolicy(max_attempts=3)
add_node(..., retry_policy=...)
LLM-recoverable (tool failures)
LLM
ToolNode(tools, handle_tool_errors=True)
Error returned as ToolMessage
User-fixable (missing info)
Human
interrupt({"message": ...})
Collect missing data (see HITL skill)
Unexpected
Developer
Let bubble up
raise
workflow.add_node(
"search_documentation",
search_documentation,
retry_policy=RetryPolicy(max_attempts=3, initial_interval=1.0)
)
</python>
<typescript>
Use retryPolicy for transient errors.typescript
workflow.addNode(
"searchDocumentation",
searchDocumentation,
{
retryPolicy: { maxAttempts: 3, initialInterval: 1.0 },
},
);
tool_node = ToolNode(tools, handle_tool_errors=True)
workflow.add_node("tools", tool_node)
typescript
import { ToolNode } from "@langchain/langgraph/prebuilt";
const toolNode = new ToolNode(tools, { handleToolErrors: true });
workflow.addNode("tools", toolNode);
Common Fixes
CORRECT
graph = builder.compile()
graph.invoke({"input": "test"})
</python>
<typescript>
Must compile() to get executable graph.typescript
// WRONG
await builder.invoke({ input: "test" });
// CORRECT
const graph = builder.compile();
await graph.invoke({ input: "test" });
CORRECT
def should_continue(state):
return END if state["count"] > 10 else "node_b"
builder.add_conditional_edges("node_a", should_continue)
typescript
// WRONG: Loops forever
builder.addEdge("node_a", "node_b").addEdge("node_b", "node_a");
// CORRECT
builder.addConditionalEdges("node_a", (state) => state.count > 10 ? END : "node_b");
Command return type needs Literal for routing destinations (Python)
def node_a(state) -> Command[Literal["node_b", "node_c"]]:
return Command(goto="node_b")
START is entry-only - cannot route back to it
builder.add_edge("node_a", START) # WRONG!
builder.add_edge("node_a", "entry") # Use a named entry node instead
Reducer expects matching types
return {"items": ["item"]} # List for list reducer, not a stringtypescript
// Always await graph.invoke() - it returns a Promise
const result = await graph.invoke({ input: "test" });
// TS Command nodes need { ends } to declare routing destinations
builder.addNode("router", routerFn, { ends: ["node_b", "node_c"] });
Mutate state directly — always return partial update dicts from nodes
Route back to START — it's entry-only; use a named node instead
Forget reducers on list fields — without one, last write wins
Mix static edges with Command goto without understanding both will execute