langgraph-persistence

Install

npx skills add https://github.com/langchain-ai/langchain-skills --skill langgraph-persistence

  • Checkpointer: saves and loads graph state at every super-step
  • Thread ID: identifies a separate checkpoint sequence (one conversation)
  • Store: cross-thread memory for user preferences and facts

Two memory types:

  • Short-term (checkpointer): thread-scoped conversation history
  • Long-term (store): cross-thread user preferences and facts

| Checkpointer | Use Case | Production Ready |
|---|---|---|
| InMemorySaver | Testing, development | No |
| SqliteSaver | Local development | Partial |
| PostgresSaver | Production | Yes |

Checkpointer Setup

Set up a basic graph with in-memory checkpointing and thread-based state persistence.

```python
import operator
from typing import Annotated
from typing_extensions import TypedDict

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    messages: Annotated[list, operator.add]  # reducer appends new messages


def add_message(state: State) -> dict:
    return {"messages": ["Bot response"]}


checkpointer = InMemorySaver()
graph = (
    StateGraph(State)
    .add_node("respond", add_message)
    .add_edge(START, "respond")
    .add_edge("respond", END)
    .compile(checkpointer=checkpointer)  # Pass at compile time
)

# ALWAYS provide thread_id
config = {"configurable": {"thread_id": "conversation-1"}}
result1 = graph.invoke({"messages": ["Hello"]}, config)
print(len(result1["messages"]))  # 2
result2 = graph.invoke({"messages": ["How are you?"]}, config)
print(len(result2["messages"]))  # 4 (previous + new)
```

```typescript
import { MemorySaver, StateGraph, StateSchema, MessagesValue, START, END } from "@langchain/langgraph";
import { HumanMessage } from "@langchain/core/messages";

const State = new StateSchema({ messages: MessagesValue });

const addMessage = async (state: typeof State.State) => {
  return { messages: [{ role: "assistant", content: "Bot response" }] };
};

const checkpointer = new MemorySaver();
const graph = new StateGraph(State)
  .addNode("respond", addMessage)
  .addEdge(START, "respond")
  .addEdge("respond", END)
  .compile({ checkpointer });

// ALWAYS provide thread_id
const config = { configurable: { thread_id: "conversation-1" } };
const result1 = await graph.invoke({ messages: [new HumanMessage("Hello")] }, config);
console.log(result1.messages.length); // 2
const result2 = await graph.invoke({ messages: [new HumanMessage("How are you?")] }, config);
console.log(result2.messages.length); // 4 (previous + new)
```
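
The message counts in the example (2, then 4) follow from the `operator.add` reducer combined with one saved state per thread. The sketch below is a toy model in plain Python, not LangGraph's implementation; `ToyCheckpointer` and `toy_invoke` are hypothetical names used only for illustration.

```python
import operator


class ToyCheckpointer:
    """Hypothetical stand-in for a checkpointer: one saved state per thread_id."""

    def __init__(self):
        self._saved = {}

    def load(self, thread_id):
        # Return a copy so callers cannot mutate the saved state in place.
        return list(self._saved.get(thread_id, []))

    def save(self, thread_id, messages):
        self._saved[thread_id] = list(messages)


def toy_invoke(checkpointer, thread_id, new_messages):
    """Mimic one run: merge the input via the reducer, run the node, save the result."""
    messages = checkpointer.load(thread_id)
    messages = operator.add(messages, new_messages)      # reducer appends the input
    messages = operator.add(messages, ["Bot response"])  # node output is appended too
    checkpointer.save(thread_id, messages)
    return messages


cp = ToyCheckpointer()
first = toy_invoke(cp, "conversation-1", ["Hello"])
second = toy_invoke(cp, "conversation-1", ["How are you?"])
other = toy_invoke(cp, "conversation-2", ["Hi"])
print(len(first), len(second), len(other))
```

The second call on the same thread starts from the two saved messages, while the call on a different thread starts from an empty list.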

Configure PostgreSQL-backed checkpointing for production deployments.

```python
from langgraph.checkpoint.postgres import PostgresSaver

# `builder` is the uncompiled StateGraph defined elsewhere in your application
with PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost/db"
) as checkpointer:
    checkpointer.setup()  # only needed on first use to create tables
    graph = builder.compile(checkpointer=checkpointer)
```

```typescript
import { PostgresSaver } from "@langchain/langgraph-checkpoint-postgres";

const checkpointer = PostgresSaver.fromConnString(
  "postgresql://user:pass@localhost/db"
);
await checkpointer.setup(); // only needed on first use to create tables
const graph = builder.compile({ checkpointer });
```

Thread Management

Demonstrate isolated state between different thread IDs.

```python
# Different threads maintain separate state
alice_config = {"configurable": {"thread_id": "user-alice"}}
bob_config = {"configurable": {"thread_id": "user-bob"}}

graph.invoke({"messages": ["Hi from Alice"]}, alice_config)
graph.invoke({"messages": ["Hi from Bob"]}, bob_config)
# Alice's state is isolated from Bob's
```

```typescript
// Different threads maintain separate state
const aliceConfig = { configurable: { thread_id: "user-alice" } };
const bobConfig = { configurable: { thread_id: "user-bob" } };

await graph.invoke({ messages: [new HumanMessage("Hi from Alice")] }, aliceConfig);
await graph.invoke({ messages: [new HumanMessage("Hi from Bob")] }, bobConfig);
// Alice's state is isolated from Bob's
```

State History & Time Travel

Time travel: browse checkpoint history and replay or fork from a past state.

```python
config = {"configurable": {"thread_id": "session-1"}}
result = graph.invoke({"messages": ["start"]}, config)

# Browse checkpoint history
states = list(graph.get_state_history(config))

# Replay from a past checkpoint
past = states[-2]
result = graph.invoke(None, past.config)  # None = resume from checkpoint

# Or fork: update state at a past checkpoint, then resume
fork_config = graph.update_state(past.config, {"messages": ["edited"]})
result = graph.invoke(None, fork_config)
```

```typescript
const config = { configurable: { thread_id: "session-1" } };
const result = await graph.invoke({ messages: ["start"] }, config);

// Browse checkpoint history (async iterable, collect to array)
const states: Awaited<ReturnType<typeof graph.getState>>[] = [];
for await (const state of graph.getStateHistory(config)) {
  states.push(state);
}

// Replay from a past checkpoint
const past = states[states.length - 2];
const replayed = await graph.invoke(null, past.config); // null = resume from checkpoint

// Or fork: update state at a past checkpoint, then resume
const forkConfig = await graph.updateState(past.config, { messages: ["edited"] });
const forked = await graph.invoke(null, forkConfig);
```

Manually update graph state before resuming execution.

```python
config = {"configurable": {"thread_id": "session-1"}}

# Modify state before resuming
graph.update_state(config, {"data": "manually_updated"})

# Resume with updated state
result = graph.invoke(None, config)
```

```typescript
const config = { configurable: { thread_id: "session-1" } };

// Modify state before resuming
await graph.updateState(config, { data: "manually_updated" });

// Resume with updated state
const result = await graph.invoke(null, config);
```
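
The difference between replaying and forking can be sketched with a plain list of snapshots. This is a toy model under stated assumptions, not how LangGraph stores checkpoints: `history`, `record`, and `run_from` are hypothetical names, the list is kept oldest first, and an integer index stands in for a checkpoint config.

```python
import operator

history = []  # oldest first; each entry is a full snapshot of the messages list


def record(messages):
    """Append a snapshot and return its index, a stand-in for a checkpoint config."""
    history.append(list(messages))
    return len(history) - 1


def run_from(checkpoint_id, update=None):
    """Resume from a snapshot; an optional update passes through the reducer first."""
    messages = list(history[checkpoint_id])
    if update is not None:
        messages = operator.add(messages, update)  # fork: edit, then continue
    messages = operator.add(messages, ["Bot response"])
    return record(messages), messages


start_id = record(["start"])
_, original = run_from(start_id)
_, replayed = run_from(start_id)                   # replay: same snapshot, same result
_, forked = run_from(start_id, update=["edited"])  # fork: diverges from the original
print(original, replayed, forked)
```

Each run appends a new snapshot instead of overwriting the one it started from, which is what keeps earlier checkpoints available for later replays.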

Subgraph Checkpointer Scoping

When compiling a subgraph, the checkpointer parameter controls persistence behavior. This is critical for subgraphs that use interrupts, need multi-turn memory, or run in parallel.

| Feature | checkpointer=False | None (default) | True |
|---|---|---|---|
| Interrupts (HITL) | No | Yes | Yes |
| Multi-turn memory | No | No | Yes |
| Multiple calls (different subgraphs) | Yes | Yes | Warning (namespace conflicts possible) |
| Multiple calls (same subgraph) | Yes | Yes | No |
| State inspection | No | Warning (current invocation only) | Yes |

When to use each mode

  • checkpointer=False: the subgraph doesn't need interrupts or persistence. Simplest option, with no checkpoint overhead.
  • None (default; omit checkpointer): the subgraph needs interrupt() but not multi-turn memory. Each invocation starts fresh but can pause and resume. Parallel execution works because each invocation gets a unique namespace.
  • checkpointer=True: the subgraph needs to remember state across invocations (multi-turn conversations). Each call picks up where the last left off.
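
The three modes reduce to two questions: does the subgraph need memory across invocations, and does it need interrupts? A minimal sketch of that decision follows; `choose_checkpointer_mode` is a hypothetical helper, not part of LangGraph.

```python
from typing import Optional


def choose_checkpointer_mode(
    needs_interrupts: bool, needs_multi_turn_memory: bool
) -> Optional[bool]:
    """Return the value to pass as `checkpointer` when compiling a subgraph."""
    if needs_multi_turn_memory:
        return True   # stateful: state carries over between invocations
    if needs_interrupts:
        return None   # default: can pause and resume, each invocation starts fresh
    return False      # no interrupts, no persistence, no checkpoint overhead


print(choose_checkpointer_mode(False, False))
print(choose_checkpointer_mode(True, False))
print(choose_checkpointer_mode(True, True))
```

Multi-turn memory is checked first because, per the table, the stateful mode also supports interrupts.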

Warning

Stateful subgraphs (checkpointer=True) do NOT support calling the same subgraph instance multiple times within a single node: the calls write to the same checkpoint namespace and conflict.

Choose the right checkpointer mode for your subgraph.

```python
# No interrupts needed: opt out of checkpointing
subgraph = subgraph_builder.compile(checkpointer=False)

# Need interrupts but not cross-invocation persistence (default)
subgraph = subgraph_builder.compile()

# Need cross-invocation persistence (stateful)
subgraph = subgraph_builder.compile(checkpointer=True)
```

```typescript
// No interrupts needed: opt out of checkpointing
const statelessSubgraph = subgraphBuilder.compile({ checkpointer: false });

// Need interrupts but not cross-invocation persistence (default)
const defaultSubgraph = subgraphBuilder.compile();

// Need cross-invocation persistence (stateful)
const statefulSubgraph = subgraphBuilder.compile({ checkpointer: true });
```

Parallel subgraph namespacing

When multiple different stateful subgraphs run in parallel, wrap each in its own StateGraph with a unique node name for stable namespace isolation:

```python
from langgraph.graph import MessagesState, StateGraph

# create_agent, fruit_info, and veggie_info are assumed to be imported or defined elsewhere


def create_sub_agent(model, *, name, **kwargs):
    """Wrap an agent with a unique node name for namespace isolation."""
    agent = create_agent(model=model, name=name, **kwargs)
    return (
        StateGraph(MessagesState)
        .add_node(name, agent)  # unique name -> stable namespace
        .add_edge("__start__", name)
        .compile()
    )


fruit_agent = create_sub_agent(
    "gpt-4.1-mini", name="fruit_agent",
    tools=[fruit_info], prompt="...", checkpointer=True,
)
veggie_agent = create_sub_agent(
    "gpt-4.1-mini", name="veggie_agent",
    tools=[veggie_info], prompt="...", checkpointer=True,
)
```

```typescript
import { StateGraph, StateSchema, MessagesValue, START } from "@langchain/langgraph";

function createSubAgent(model: string, { name, ...kwargs }: { name: string; [key: string]: any }) {
  const agent = createAgent({ model, name, ...kwargs });
  return new StateGraph(new StateSchema({ messages: MessagesValue }))
    .addNode(name, agent) // unique name -> stable namespace
    .addEdge(START, name)
    .compile();
}

const fruitAgent = createSubAgent("gpt-4.1-mini", {
  name: "fruit_agent", tools: [fruitInfo], prompt: "...", checkpointer: true,
});
const veggieAgent = createSubAgent("gpt-4.1-mini", {
  name: "veggie_agent", tools: [veggieInfo], prompt: "...", checkpointer: true,
});
```

Note: Subgraphs added as nodes (via add_node) already get name-based namespaces automatically and don't need this wrapper.

Long-Term Memory (Store)

Use a Store for cross-thread memory to share user preferences across conversations.

```python
from langgraph.runtime import Runtime
from langgraph.store.memory import InMemoryStore

store = InMemoryStore()

# Save user preference (available across ALL threads)
store.put(("alice", "preferences"), "language", {"preference": "short responses"})


# Node with store: access via runtime
def respond(state, runtime: Runtime):
    prefs = runtime.store.get((state["user_id"], "preferences"), "language")
    return {"response": f"Using preference: {prefs.value['preference']}"}


# Compile with BOTH checkpointer and store
graph = builder.compile(checkpointer=checkpointer, store=store)

# Both threads access same long-term memory
graph.invoke({"user_id": "alice"}, {"configurable": {"thread_id": "thread-1"}})
graph.invoke({"user_id": "alice"}, {"configurable": {"thread_id": "thread-2"}})  # Same preferences!
```

```typescript
import { MemoryStore } from "@langchain/langgraph";

const store = new MemoryStore();

// Save user preference (available across ALL threads)
await store.put(["alice", "preferences"], "language", { preference: "short responses" });

// Node with store: access via runtime
const respond = async (state: typeof State.State, runtime: any) => {
  const item = await runtime.store?.get(["alice", "preferences"], "language");
  return { response: `Using preference: ${item?.value?.preference}` };
};

// Compile with BOTH checkpointer and store
const graph = builder.compile({ checkpointer, store });

// Both threads access same long-term memory
await graph.invoke({ userId: "alice" }, { configurable: { thread_id: "thread-1" } });
await graph.invoke({ userId: "alice" }, { configurable: { thread_id: "thread-2" } }); // Same preferences!
```

The store supports put, get, search, and delete operations.

```python
from langgraph.store.memory import InMemoryStore

store = InMemoryStore()
store.put(("user-123", "facts"), "location", {"city": "San Francisco"})  # Put
item = store.get(("user-123", "facts"), "location")  # Get
results = store.search(("user-123", "facts"), filter={"city": "San Francisco"})  # Search
store.delete(("user-123", "facts"), "location")  # Delete
```
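
Store items are shared across threads because their key is a namespace tuple plus an item key, with no thread_id involved. The toy model below illustrates that idea in plain Python; `ToyStore` is a hypothetical class, and its `search` does only exact namespace and field matching, which is a simplification and not a description of LangGraph's store.

```python
class ToyStore:
    """Hypothetical in-memory store: items are keyed by (namespace tuple, key)."""

    def __init__(self):
        self._items = {}

    def put(self, namespace, key, value):
        self._items[(tuple(namespace), key)] = value

    def get(self, namespace, key):
        return self._items.get((tuple(namespace), key))

    def search(self, namespace, filter=None):
        """Return values in the namespace whose fields match every filter entry."""
        filter = filter or {}
        return [
            value
            for (ns, _key), value in self._items.items()
            if ns == tuple(namespace)
            and all(value.get(field) == wanted for field, wanted in filter.items())
        ]

    def delete(self, namespace, key):
        self._items.pop((tuple(namespace), key), None)


store = ToyStore()
store.put(("alice", "preferences"), "language", {"preference": "short responses"})

# Two different threads read the same item because the key contains no thread_id.
seen_by_thread = {
    thread_id: store.get(("alice", "preferences"), "language")
    for thread_id in ("thread-1", "thread-2")
}
print(seen_by_thread["thread-1"] == seen_by_thread["thread-2"])
```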

Fixes

Always provide thread_id in config to enable state persistence.

```python
# WRONG: No thread_id - state NOT persisted!
graph.invoke({"messages": ["Hello"]})
graph.invoke({"messages": ["What did I say?"]})  # Doesn't remember!

# CORRECT: Always provide thread_id
config = {"configurable": {"thread_id": "session-1"}}
graph.invoke({"messages": ["Hello"]}, config)
graph.invoke({"messages": ["What did I say?"]}, config)  # Remembers!
```

```typescript
// CORRECT: Always provide thread_id
const config = { configurable: { thread_id: "session-1" } };
await graph.invoke({ messages: [new HumanMessage("Hello")] }, config);
await graph.invoke({ messages: [new HumanMessage("What did I say?")] }, config); // Remembers!
```

Use PostgresSaver instead of InMemorySaver for production persistence.

```python
# WRONG: Data lost on process restart
checkpointer = InMemorySaver()  # In-memory only!

# CORRECT: Use persistent storage for production
from langgraph.checkpoint.postgres import PostgresSaver

with PostgresSaver.from_conn_string("postgresql://...") as checkpointer:
    checkpointer.setup()  # only needed on first use to create tables
    graph = builder.compile(checkpointer=checkpointer)
```

```typescript
// CORRECT: Use persistent storage for production
import { PostgresSaver } from "@langchain/langgraph-checkpoint-postgres";

const checkpointer = PostgresSaver.fromConnString("postgresql://...");
await checkpointer.setup(); // only needed on first use to create tables
```

Use Overwrite to replace state values instead of passing through reducers.

```python
from langgraph.types import Overwrite

# State with reducer: items: Annotated[list, operator.add]
# Current state: {"items": ["A", "B"]}

# update_state PASSES THROUGH reducers
graph.update_state(config, {"items": ["C"]})  # Result: ["A", "B", "C"] - Appended!

# To REPLACE instead, use Overwrite
graph.update_state(config, {"items": Overwrite(["C"])})  # Result: ["C"] - Replaced
```

```typescript
// State with reducer: items uses concat reducer
// Current state: { items: ["A", "B"] }

// updateState PASSES THROUGH reducers
await graph.updateState(config, { items: ["C"] }); // Result: ["A", "B", "C"] - Appended!

// To REPLACE instead, use Overwrite
await graph.updateState(config, { items: new Overwrite(["C"]) }); // Result: ["C"] - Replaced
```

Access store via the Runtime object in graph nodes.

```python
# WRONG: Store not available in node
def my_node(state):
    store.put(...)  # NameError! store not defined


# CORRECT: Access store via runtime
from langgraph.runtime import Runtime


def my_node(state, runtime: Runtime):
    runtime.store.put(...)  # Correct store instance
```

```typescript
// CORRECT: Access store via runtime
const myNode = async (state, runtime) => {
  await runtime.store?.put(...); // Correct store instance
};
```
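
The append-versus-replace behavior described for Overwrite can be modeled in a few lines of plain Python. This is a sketch of the semantics only; `ToyOverwrite` and `apply_update` are hypothetical names and do not reflect how LangGraph implements its channels.

```python
import operator
from dataclasses import dataclass
from typing import Any


@dataclass
class ToyOverwrite:
    """Hypothetical marker meaning 'replace the value, skip the reducer'."""

    value: Any


def apply_update(current, update, reducer=operator.add):
    """Merge an update into the current value the way a reducer-backed key would."""
    if isinstance(update, ToyOverwrite):
        return update.value          # bypass the reducer entirely
    return reducer(current, update)  # normal path: reducer combines old and new


items = ["A", "B"]
appended = apply_update(items, ["C"])
replaced = apply_update(items, ToyOverwrite(["C"]))
print(appended, replaced)
```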

What You Should NOT Do

  • Use InMemorySaver in production — data lost on restart; use PostgresSaver
  • Forget thread_id — state won't persist without it
  • Expect update_state to bypass reducers — it passes through them; use Overwrite to replace
  • Run the same stateful subgraph (checkpointer=True) in parallel within one node — namespace conflict
  • Access store directly in a node — use runtime.store via the Runtime param