- interrupt(value)
- — pauses execution, surfaces a value to the caller
- Command(resume=value)
- — resumes execution, providing the value back to
- interrupt()
- Checkpointer
- — required to save state while paused
- Thread ID
- — required to identify which paused execution to resume
- Requirements
- Three things are required for interrupts to work:
- Checkpointer
- — compile with
- checkpointer=InMemorySaver()
- (dev) or
- PostgresSaver
- (prod)
- Thread ID
- — pass
- {"configurable": {"thread_id": "..."}}
- to every
- invoke
- /
- stream
- call
- JSON-serializable payload
- — the value passed to
- interrupt()
- must be JSON-serializable
- Basic Interrupt + Resume
- interrupt(value)
- pauses the graph. The value surfaces in the result under
- interrupt
- .
- Command(resume=value)
- resumes — the resume value becomes the return value of
- interrupt()
- .
- Critical
- when the graph resumes, the node restarts from the beginning — all code before interrupt() re-runs. class State(TypedDict): approved: bool def approval_node(state: State):
Pause and ask for approval
approved = interrupt("Do you approve this action?")
When resumed, Command(resume=...) returns that value here
return {"approved": approved}
checkpointer = InMemorySaver()
graph = (
StateGraph(State)
.add_node("approval", approval_node)
.add_edge(START, "approval")
.add_edge("approval", END)
.compile(checkpointer=checkpointer)
)
config = {"configurable": {"thread_id": "thread-1"}}
Initial run — hits interrupt and pauses
result = graph.invoke({"approved": False}, config)
print(result["
interrupt
"])
[Interrupt(value='Do you approve this action?')]
Resume with the human's response
result = graph.invoke(Command(resume=True), config)
print(result["approved"]) # True
interrupt() must come first — any code before it will re-run on resume
human_decision = interrupt({ "email_id": state.get("email_content", ""), "draft_response": state.get("draft_response", ""), "urgency": classification.get("urgency"), "action": "Please review and approve/edit this response" })
Process the human's decision
if human_decision.get("approved"): return Command( update={"draft_response": human_decision.get("edited_response", state.get("draft_response", ""))}, goto="send_reply" ) else:
Rejection — human will handle directly
return Command(update={}, goto=END)
Validate the input
if isinstance(answer, int) and answer > 0: break else:
Invalid input — ask again with a more specific prompt
prompt = f"'{answer}' is not a valid age. Please enter a positive number."
return {"age": answer}
Each Command(resume=...) call provides the next answer. If invalid, the loop re-interrupts with a clearer message.
```python
config = {"configurable": {"thread_id": "form-1"}}
first = graph.invoke({"age": None}, config)
interrupt: "What is your age?"
retry = graph.invoke(Command(resume="thirty"), config)
interrupt: "'thirty' is not a valid age..."
final = graph.invoke(Command(resume=30), config)
print(final["age"]) # 30
const getAgeNode = (state: typeof State.State) => {
let prompt = "What is your age?";
while (true) {
const answer = interrupt(prompt);
// Validate the input
if (typeof answer === "number" && answer > 0) {
return { age: answer };
} else {
// Invalid input — ask again with a more specific prompt
prompt = '${answer}' is not a valid age. Please enter a positive number.;
}
}
};
Multiple Interrupts
When parallel branches each call interrupt(), resume all of them in a single invocation by mapping each interrupt ID to its resume value.
Both parallel nodes hit interrupt() and pause
result = graph.invoke({"vals": []}, config)
result["interrupt"] contains both Interrupt objects with IDs
Resume all pending interrupts at once using a map of id -> value
resume_map = { i.id: f"answer for {i.value}" for i in result["interrupt"] } result = graph.invoke(Command(resume=resume_map), config)
result["vals"] = ["a:answer for question_a", "b:answer for question_b"]
const State = Annotation.Root({
vals: Annotationinterrupt() to pause and collect missing data — that's the pattern covered by this skill. For the full 4-tier error handling strategy (RetryPolicy, Command error loops, etc.), see the fundamentals skill.
Side Effects Before Interrupt Must Be Idempotent
When the graph resumes, the node restarts from the beginning — ALL code before interrupt() re-runs. In subgraphs, BOTH the parent node and the subgraph node re-execute.
interrupt()
- Use check-before-create patterns
- Place side effects after interrupt() when possible
- Separate side effects into their own nodes
Don't:
- Create new records before interrupt() — duplicates on each resume
- Append to lists before interrupt() — duplicate entries on each resume
GOOD: Upsert is idempotent — safe before interrupt
def node_a(state: State): db.upsert_user(user_id=state["user_id"], status="pending_approval") approved = interrupt("Approve this change?") return {"approved": approved}
GOOD: Side effect AFTER interrupt — only runs once
def node_a(state: State): approved = interrupt("Approve this change?") if approved: db.create_audit_log(user_id=state["user_id"], action="approved") return {"approved": approved}
BAD: Insert creates duplicates on each resume!
def node_a(state: State):
audit_id = db.create_audit_log({ # Runs again on resume!
"user_id": state["user_id"],
"action": "pending_approval",
})
approved = interrupt("Approve this change?")
return {"approved": approved}
// GOOD: Side effect AFTER interrupt — only runs once
const nodeA = async (state: typeof State.State) => {
const approved = interrupt("Approve this change?");
if (approved) {
await db.createAuditLog({ userId: state.userId, action: "approved" });
}
return { approved };
};
// BAD: Insert creates duplicates on each resume!
const nodeA = async (state: typeof State.State) => {
await db.createAuditLog({ // Runs again on resume!
userId: state.userId,
action: "pending_approval",
});
const approved = interrupt("Approve this change?");
return { approved };
};
Subgraph re-execution on resume
When a subgraph contains an interrupt(), resuming re-executes BOTH the parent node (that invoked the subgraph) AND the subgraph node (that called interrupt()):
...
def node_in_subgraph(state: State): some_other_code() # <-- Also re-executes on resume result = interrupt("What's your name?")
...
async function nodeInSubgraph(state: State) { someOtherCode(); // <-- Also re-executes on resume const result = interrupt("What's your name?"); // ... }
Command(resume) Warning
Command(resume=...) is the only Command pattern intended as input to invoke()/stream(). Do NOT pass Command(update=...) as input — it resumes from the latest checkpoint and the graph appears stuck. See the fundamentals skill for the full antipattern explanation.
Fixes
WRONG
graph = builder.compile()
CORRECT
graph = builder.compile(checkpointer=InMemorySaver())
// CORRECT
const graph = builder.compile({ checkpointer: new MemorySaver() });
WRONG
graph.invoke({"resume_data": "approve"}, config)
CORRECT
graph.invoke(Command(resume="approve"), config)
// CORRECT
await graph.invoke(new Command({ resume: "approve" }), config);
What You Should NOT Do
- Use interrupts without a checkpointer — will fail
- Resume without the same thread_id — creates a new thread instead of resuming
- Pass
Command(update=...)as invoke input — graph appears stuck (use plain dict) - Perform non-idempotent side effects before
interrupt()— creates duplicates on resume - Assume code before
interrupt()only runs once — it re-runs every resume