diff --git a/src/oss/langgraph/persistence.mdx b/src/oss/langgraph/persistence.mdx index 3a648285b1..bd91b2700b 100644 --- a/src/oss/langgraph/persistence.mdx +++ b/src/oss/langgraph/persistence.mdx @@ -4,7 +4,7 @@ title: Persistence -LangGraph has a built-in persistence layer, implemented through checkpointers. When you compile a graph with a checkpointer, the checkpointer saves a `checkpoint` of the graph state at every super-step. Those checkpoints are saved to a `thread`, which can be accessed after graph execution. Because `threads` allow access to graph's state after execution, several powerful capabilities including human-in-the-loop, memory, time travel, and fault-tolerance are all possible. Below, we'll discuss each of these concepts in more detail. +LangGraph has a built-in persistence layer that saves graph state as checkpoints. When you compile a graph with a checkpointer, a snapshot of the graph state is saved at every step of execution, organized into threads. This enables human-in-the-loop workflows, conversational memory, time travel debugging, and fault-tolerant execution. ![Checkpoints](/oss/images/checkpoints.jpg) @@ -13,7 +13,19 @@ LangGraph has a built-in persistence layer, implemented through checkpointers. W When using the [Agent Server](/langsmith/agent-server), you don't need to implement or configure checkpointers manually. The server handles all persistence infrastructure for you behind the scenes. -## Threads +## Why use persistence + +Persistence is required for the following features: + +- **Human-in-the-loop**: Checkpointers facilitate [human-in-the-loop workflows](/oss/langgraph/interrupts) by allowing humans to inspect, interrupt, and approve graph steps. Checkpointers are needed for these workflows as the person has to be able to view the state of a graph at any point in time, and the graph has to be able to resume execution after the person has made any updates to the state. 
See [Interrupts](/oss/langgraph/interrupts) for examples. +- **Memory**: Checkpointers allow for ["memory"](/oss/concepts/memory) between interactions. In the case of repeated human interactions (like conversations), any follow-up messages can be sent to that thread, which will retain its memory of previous ones. See [Add memory](/oss/langgraph/add-memory) for information on how to add and manage conversation memory using checkpointers. +- **Time travel**: Checkpointers allow for ["time travel"](/oss/langgraph/use-time-travel): users can replay prior graph executions to review and/or debug specific graph steps. In addition, checkpointers make it possible to fork the graph state at arbitrary checkpoints to explore alternative trajectories. +- **Fault-tolerance**: Checkpointing provides fault-tolerance and error recovery: if one or more nodes fail at a given super-step, you can restart your graph from the last successful step. +- **Pending writes**: When a graph node fails mid-execution at a given [super-step](#super-steps), LangGraph stores pending checkpoint writes from any other nodes that completed successfully at that super-step. When you resume graph execution from that super-step, you don't re-run the successful nodes. + +## Core concepts + +### Threads A thread is a unique ID or thread identifier assigned to each checkpoint saved by a checkpointer. It contains the accumulated state of a sequence of [runs](/langsmith/assistants#execution). When a run is executed, the [state](/oss/langgraph/graph-api#state) of the underlying graph of the assistant will be persisted to the thread. @@ -39,15 +51,13 @@ A thread's current and historical state can be retrieved. To persist state, a thread must be specified. The checkpointer uses `thread_id` as the primary key for storing and retrieving checkpoints. Without it, the checkpointer cannot save state or resume execution after an [interrupt](/oss/langgraph/interrupts), since the checkpointer uses `thread_id` to load the saved state. 
-## Checkpoints +### Checkpoints -The state of a thread at a particular point in time is called a checkpoint. Checkpoint is a snapshot of the graph state saved at each super-step and is represented by `StateSnapshot` object with the following key properties: +The state of a thread at a particular point in time is called a checkpoint. A checkpoint is a snapshot of the graph state saved at each [super-step](#super-steps) and is represented by a `StateSnapshot` object (see [StateSnapshot fields](#statesnapshot-fields) for the full field reference). -* `config`: Config associated with this checkpoint. -* `metadata`: Metadata associated with this checkpoint. -* `values`: Values of the state channels at this point in time. -* `next` A tuple of the node names to execute next in the graph. -* `tasks`: A tuple of `PregelTask` objects that contain information about next tasks to be executed. If the step was previously attempted, it will include error information. If a graph was interrupted [dynamically](/oss/langgraph/interrupts#pause-using-interrupt) from within a node, tasks will contain additional data associated with interrupts. +#### Super-steps + +LangGraph creates a checkpoint at each **super-step** boundary. A super-step is a single "tick" of the graph where all nodes scheduled for that step execute (potentially in parallel). For a sequential graph like `START -> A -> B -> END`, there are separate super-steps for the input, node A, and node B — producing a checkpoint after each one. Understanding super-step boundaries is important for [time travel](/oss/langgraph/use-time-travel), because you can only resume execution from a checkpoint (i.e., a super-step boundary). Checkpoints are persisted and can be used to restore the state of a thread at a later time. @@ -145,6 +155,40 @@ After we run the graph, we expect to see exactly 4 checkpoints: Note that the `bar` channel values contain outputs from both nodes as we have a reducer for the `bar` channel. 
::: +#### Checkpoint namespace + +Each checkpoint has a `checkpoint_ns` (checkpoint namespace) field that identifies which graph or subgraph it belongs to: + +- **`""`** (empty string): The checkpoint belongs to the parent (root) graph. +- **`"node_name:uuid"`**: The checkpoint belongs to a subgraph invoked as the given node. For nested subgraphs, namespaces are joined with `|` separators (e.g., `"outer_node:uuid|inner_node:uuid"`). + +You can access the checkpoint namespace from within a node via the config: + +:::python +```python +from langchain_core.runnables import RunnableConfig + +def my_node(state: State, config: RunnableConfig): + checkpoint_ns = config["configurable"]["checkpoint_ns"] + # "" for the parent graph, "node_name:uuid" for a subgraph +``` +::: + +:::js +```typescript +import { RunnableConfig } from "@langchain/core/runnables"; + +function myNode(state: typeof State.Type, config: RunnableConfig) { + const checkpointNs = config.configurable?.checkpoint_ns; + // "" for the parent graph, "node_name:uuid" for a subgraph +} +``` +::: + +See [Subgraphs](/oss/langgraph/use-subgraphs) for more details on working with subgraph state and checkpoints. + +## Get and update state + ### Get state :::python @@ -227,6 +271,36 @@ StateSnapshot { ``` ::: +#### StateSnapshot fields + +:::python + +| Field | Type | Description | +|-------|------|-------------| +| `values` | `dict` | State channel values at this checkpoint. | +| `next` | `tuple[str, ...]` | Node names to execute next. Empty `()` means the graph is complete. | +| `config` | `dict` | Contains `thread_id`, `checkpoint_ns`, and `checkpoint_id`. | +| `metadata` | `dict` | Execution metadata. Contains `source` (`"input"`, `"loop"`, or `"update"`), `writes` (node outputs), and `step` (super-step counter). | +| `created_at` | `str` | ISO 8601 timestamp of when this checkpoint was created. | +| `parent_config` | `dict \| None` | Config of the previous checkpoint. `None` for the first checkpoint. 
| +| `tasks` | `tuple[PregelTask, ...]` | Tasks to execute at this step. Each task has `id`, `name`, `error`, `interrupts`, and optionally `state` (subgraph snapshot, when using `subgraphs=True`). | + +::: + +:::js + +| Field | Type | Description | +|-------|------|-------------| +| `values` | `object` | State channel values at this checkpoint. | +| `next` | `string[]` | Node names to execute next. Empty `[]` means the graph is complete. | +| `config` | `object` | Contains `thread_id`, `checkpoint_ns`, and `checkpoint_id`. | +| `metadata` | `object` | Execution metadata. Contains `source` (`"input"`, `"loop"`, or `"update"`), `writes` (node outputs), and `step` (super-step counter). | +| `createdAt` | `string` | ISO 8601 timestamp of when this checkpoint was created. | +| `parentConfig` | `object \| null` | Config of the previous checkpoint. `null` for the first checkpoint. | +| `tasks` | `PregelTask[]` | Tasks to execute at this step. Each task has `id`, `name`, `error`, `interrupts`, and optionally `state` (subgraph snapshot, when using `subgraphs: true`). | + +::: + ### Get state history :::python @@ -420,142 +494,74 @@ In our example, the output of `getStateHistory` will look like this: ![State](/oss/images/get_state.jpg) -### Replay - -It's also possible to play-back a prior graph execution. If we `invoke` a graph with a `thread_id` and a `checkpoint_id`, then we will _re-play_ the previously executed steps _before_ a checkpoint that corresponds to the `checkpoint_id`, and only execute the steps _after_ the checkpoint. +#### Find a specific checkpoint -* `thread_id` is the ID of a thread. -* `checkpoint_id` is an identifier that refers to a specific checkpoint within a thread. 
- -You must pass these when invoking the graph as part of the `configurable` portion of the config: +You can filter the state history to find checkpoints matching specific criteria: :::python ```python -config = {"configurable": {"thread_id": "1", "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"}} -graph.invoke(None, config=config) -``` -::: - -:::js -```typescript -const config = { - configurable: { - thread_id: "1", - checkpoint_id: "0c62ca34-ac19-445d-bbb0-5b4984975b2a", - }, -}; -await graph.invoke(null, config); -``` -::: - -Importantly, LangGraph knows whether a particular step has been executed previously. If it has, LangGraph simply _re-plays_ that particular step in the graph and does not re-execute the step, but only for the steps _before_ the provided `checkpoint_id`. All of the steps _after_ `checkpoint_id` will be executed (i.e., a new fork), even if they have been executed previously. See this [how to guide on time-travel to learn more about replaying](/oss/langgraph/use-time-travel). - -![Replay](/oss/images/re_play.png) - -### Update state - -:::python -In addition to re-playing the graph from specific `checkpoints`, we can also _edit_ the graph state. We do this using @[`update_state`]. This method accepts three different arguments: -::: - -:::js -In addition to re-playing the graph from specific `checkpoints`, we can also _edit_ the graph state. We do this using `graph.updateState()`. This method accepts three different arguments: -::: - -#### `config` +history = list(graph.get_state_history(config)) -The config should contain `thread_id` specifying which thread to update. When only the `thread_id` is passed, we update (or fork) the current state. Optionally, if we include `checkpoint_id` field, then we fork that selected checkpoint. 
+# Find the checkpoint before a specific node executed +before_node_b = next(s for s in history if s.next == ("node_b",)) -#### `values` +# Find a checkpoint by step number +step_2 = next(s for s in history if s.metadata["step"] == 2) -These are the values that will be used to update the state. Note that this update is treated exactly as any update from a node is treated. This means that these values will be passed to the [reducer](/oss/langgraph/graph-api#reducers) functions, if they are defined for some of the channels in the graph state. This means that @[`update_state`] does NOT automatically overwrite the channel values for every channel, but only for the channels without reducers. Let's walk through an example. +# Find checkpoints created by update_state +forks = [s for s in history if s.metadata["source"] == "update"] -Let's assume you have defined the state of your graph with the following schema (see full example above): - -:::python -```python -from typing import Annotated -from typing_extensions import TypedDict -from operator import add - -class State(TypedDict): - foo: int - bar: Annotated[list[str], add] +# Find the checkpoint where an interrupt occurred +interrupted = next( + s for s in history + if s.tasks and any(t.interrupts for t in s.tasks) +) ``` ::: :::js ```typescript -import { StateSchema, ReducedValue } from "@langchain/langgraph"; -import * as z from "zod"; +const history: StateSnapshot[] = []; +for await (const state of graph.getStateHistory(config)) { + history.push(state); +} -const State = new StateSchema({ - foo: z.number(), - bar: new ReducedValue( - z.array(z.string()).default(() => []), - { - inputSchema: z.array(z.string()), - reducer: (x, y) => x.concat(y), - } - ), -}); -``` -::: +// Find the checkpoint before a specific node executed +const beforeNodeB = history.find((s) => s.next.includes("nodeB")); -Let's now assume the current state of the graph is +// Find a checkpoint by step number +const step2 = history.find((s) => 
s.metadata?.step === 2); -:::python -``` -{"foo": 1, "bar": ["a"]} -``` -::: +// Find checkpoints created by updateState +const forks = history.filter((s) => s.metadata?.source === "update"); -:::js -```typescript -{ foo: 1, bar: ["a"] } +// Find the checkpoint where an interrupt occurred +const interrupted = history.find( + (s) => s.tasks.length > 0 && s.tasks.some((t) => t.interrupts.length > 0) +); ``` ::: -If you update the state as below: +### Replay -:::python -```python -graph.update_state(config, {"foo": 2, "bar": ["b"]}) -``` -::: +Replay re-executes steps from a prior checkpoint. Invoke the graph with a prior `checkpoint_id` to re-run nodes after that checkpoint. Nodes before the checkpoint are skipped (their results are already saved). Nodes after the checkpoint re-execute, including any LLM calls, API requests, or [interrupts](/oss/langgraph/interrupts) — which are always re-triggered during replay. -:::js -```typescript -await graph.updateState(config, { foo: 2, bar: ["b"] }); -``` -::: +See [Time travel](/oss/langgraph/use-time-travel) for full details and code examples on replaying past executions. + +![Replay](/oss/images/re_play.png) -Then the new state of the graph will be: +### Update state :::python -``` -{"foo": 2, "bar": ["a", "b"]} -``` +You can edit the graph state using @[`update_state`]. This creates a new checkpoint with the updated values — it does not modify the original checkpoint. The update is treated the same as a node update: values are passed through [reducer](/oss/langgraph/graph-api#reducers) functions when defined, so channels with reducers _accumulate_ values rather than overwrite them. -The `foo` key (channel) is completely changed (because there is no reducer specified for that channel, so @[`update_state`] overwrites it). However, there is a reducer specified for the `bar` key, and so it appends `"b"` to the state of `bar`. 
+You can optionally specify `as_node` to control which node the update is treated as coming from, which affects which node executes next. See [Time travel: `as_node`](/oss/langgraph/use-time-travel#control-which-node-runs-next-with-as_node) for details. ::: :::js -```typescript -{ foo: 2, bar: ["a", "b"] } -``` +You can edit the graph state using `graph.updateState()`. This creates a new checkpoint with the updated values — it does not modify the original checkpoint. The update is treated the same as a node update: values are passed through [reducer](/oss/langgraph/graph-api#reducers) functions when defined, so channels with reducers _accumulate_ values rather than overwrite them. -The `foo` key (channel) is completely changed (because there is no reducer specified for that channel, so `updateState` overwrites it). However, there is a reducer specified for the `bar` key, and so it appends `"b"` to the state of `bar`. -::: - -#### `as_node` - -:::python -The final thing you can optionally specify when calling @[`update_state`] is `as_node`. If you provided it, the update will be applied as if it came from node `as_node`. If `as_node` is not provided, it will be set to the last node that updated the state, if not ambiguous. The reason this matters is that the next steps to execute depend on the last node to have given an update, so this can be used to control which node executes next. See this [how to guide on time-travel to learn more about forking state](/oss/langgraph/use-time-travel). -::: - -:::js -The final thing you can optionally specify when calling `updateState` is `asNode`. If you provide it, the update will be applied as if it came from node `asNode`. If `asNode` is not provided, it will be set to the last node that updated the state, if not ambiguous. The reason this matters is that the next steps to execute depend on the last node to have given an update, so this can be used to control which node executes next. 
See this [how to guide on time-travel to learn more about forking state](/oss/langgraph/use-time-travel). +You can optionally specify `asNode` to control which node the update is treated as coming from, which affects which node executes next. See [Time travel: `asNode`](/oss/langgraph/use-time-travel#control-which-node-runs-next-with-as_node) for details. ::: ![Update](/oss/images/checkpoints_full_story.jpg) @@ -1159,24 +1165,3 @@ checkpointer.setup() When running on LangSmith, encryption is automatically enabled whenever `LANGGRAPH_AES_KEY` is present, so you only need to provide the environment variable. Other encryption schemes can be used by implementing @[`CipherProtocol`] and supplying it to @[`EncryptedSerializer`]. ::: -## Capabilities - -### Human-in-the-loop - -First, checkpointers facilitate [human-in-the-loop workflows](/oss/langgraph/interrupts) by allowing humans to inspect, interrupt, and approve graph steps. Checkpointers are needed for these workflows as the human has to be able to view the state of a graph at any point in time, and the graph has to be to resume execution after the human has made any updates to the state. See [the how-to guides](/oss/langgraph/interrupts) for examples. - -### Memory - -Second, checkpointers allow for ["memory"](/oss/concepts/memory) between interactions. In the case of repeated human interactions (like conversations) any follow up messages can be sent to that thread, which will retain its memory of previous ones. See [Add memory](/oss/langgraph/add-memory) for information on how to add and manage conversation memory using checkpointers. - -### Time travel - -Third, checkpointers allow for ["time travel"](/oss/langgraph/use-time-travel), allowing users to replay prior graph executions to review and / or debug specific graph steps. In addition, checkpointers make it possible to fork the graph state at arbitrary checkpoints to explore alternative trajectories. 
- -### Fault-tolerance - -Lastly, checkpointing also provides fault-tolerance and error recovery: if one or more nodes fail at a given superstep, you can restart your graph from the last successful step. Additionally, when a graph node fails mid-execution at a given superstep, LangGraph stores pending checkpoint writes from any other nodes that completed successfully at that superstep, so that whenever we resume graph execution from that superstep we don't re-run the successful nodes. - -#### Pending writes - -Additionally, when a graph node fails mid-execution at a given superstep, LangGraph stores pending checkpoint writes from any other nodes that completed successfully at that superstep, so that whenever we resume graph execution from that superstep we don't re-run the successful nodes. diff --git a/src/oss/langgraph/use-time-travel.mdx b/src/oss/langgraph/use-time-travel.mdx index 68b13493b3..a286e35779 100644 --- a/src/oss/langgraph/use-time-travel.mdx +++ b/src/oss/langgraph/use-time-travel.mdx @@ -1,408 +1,511 @@ --- title: Use time-travel sidebarTitle: Time travel +description: Replay past executions and fork to explore alternative paths in LangGraph --- +## Overview +LangGraph supports time travel through [checkpoints](/oss/langgraph/persistence#checkpoints): -When working with non-deterministic systems that make model-based decisions (e.g., agents powered by LLMs), it can be useful to examine their decision-making process in detail: +- **[Replay](#replay)**: Retry from a prior checkpoint. +- **[Fork](#fork)**: Branch from a prior checkpoint with modified state to explore an alternative path. -1. **Understand reasoning**: Analyze the steps that led to a successful result. -2. **Debug mistakes**: Identify where and why errors occurred. -3. **Explore alternatives**: Test different paths to uncover better solutions. +Both work by resuming from a prior checkpoint. Nodes before the checkpoint are not re-executed (results are already saved). 
Nodes after the checkpoint re-execute, including any LLM calls, API requests, and [interrupts](/oss/langgraph/interrupts) (which may produce different results). -LangGraph provides time-travel functionality to support these use cases. Specifically, you can resume execution from a prior checkpoint — either replaying the same state or modifying it to explore alternatives. In all cases, resuming past execution produces a new fork in the history. +## Replay -To use time-travel in LangGraph: +Invoke the graph with a prior checkpoint's config to replay from that point. -:::python -1. [Run the graph](#1-run-the-graph) with initial inputs using @[`invoke`][CompiledStateGraph.invoke] or @[`stream`][CompiledStateGraph.stream] methods. -2. [Identify a checkpoint in an existing thread](#2-identify-a-checkpoint): Use the @[`get_state_history`] method to retrieve the execution history for a specific `thread_id` and locate the desired `checkpoint_id`. - Alternatively, set an [interrupt](/oss/langgraph/interrupts) before the node(s) where you want execution to pause. You can then find the most recent checkpoint recorded up to that interrupt. -3. [Update the graph state (optional)](#3-update-the-state-optional): Use the @[`update_state`] method to modify the graph's state at the checkpoint and resume execution from alternative state. -4. [Resume execution from the checkpoint](#4-resume-execution-from-the-checkpoint): Use the `invoke` or `stream` methods with an input of `None` and a configuration containing the appropriate `thread_id` and `checkpoint_id`. -::: - -:::js -1. [Run the graph](#1-run-the-graph) with initial inputs using @[`invoke`][CompiledStateGraph.invoke] or @[`stream`][CompiledStateGraph.stream] methods. -2. [Identify a checkpoint in an existing thread](#2-identify-a-checkpoint): Use the @[`getStateHistory`] method to retrieve the execution history for a specific `thread_id` and locate the desired `checkpoint_id`. 
- Alternatively, set a [breakpoint](/oss/langgraph/interrupts) before the node(s) where you want execution to pause. You can then find the most recent checkpoint recorded up to that breakpoint. -3. [Update the graph state (optional)](#3-update-the-state-optional): Use the @[`updateState`] method to modify the graph's state at the checkpoint and resume execution from alternative state. -4. [Resume execution from the checkpoint](#4-resume-execution-from-the-checkpoint): Use the `invoke` or `stream` methods with an input of `null` and a configuration containing the appropriate `thread_id` and `checkpoint_id`. -::: + -## In a workflow +Replay re-executes nodes — it doesn't just read from cache. LLM calls, API requests, and [interrupts](/oss/langgraph/interrupts) fire again and may return different results. Replaying from the final checkpoint (no `next` nodes) is a no-op. -This example builds a simple LangGraph workflow that generates a joke topic and writes a joke using an LLM. It demonstrates how to run the graph, retrieve past execution checkpoints, optionally modify the state, and resume execution from a chosen checkpoint to explore alternate outcomes. + -### Setup - -To build this workflow in this example you need to set up the Anthropic LLM and install the required dependencies: +![Replay](/oss/images/re_play.png) :::python -1. Install dependencies: -```bash -pip install langchain_core langchain-anthropic langgraph -``` - -2. Initialize the LLM: - -```python -import os -import getpass - -from langchain_anthropic import ChatAnthropic - -def _set_env(var: str): - if not os.environ.get(var): - os.environ[var] = getpass.getpass(f"{var}: ") - - -_set_env("ANTHROPIC_API_KEY") - -llm = ChatAnthropic(model="claude-sonnet-4-6") -``` -::: - -:::js -1. 
Install dependencies - -```bash npm -npm install @langchain/langgraph @langchain/core -``` - -```bash pnpm -pnpm add @langchain/langgraph @langchain/core -``` - -```bash yarn -yarn add @langchain/langgraph @langchain/core -``` - -```bash bun -bun add @langchain/langgraph @langchain/core -``` - - -2. Initialize the LLM: - -```typescript -import { ChatAnthropic } from "@langchain/anthropic"; +Use @[`get_state_history`] to find the checkpoint you want to replay from, then call @[`invoke`][CompiledStateGraph.invoke] with that checkpoint's config: -const llm = new ChatAnthropic({ - model: "claude-sonnet-4-6", - apiKey: "" -}); -``` -::: - - - Sign up for [LangSmith](https://smith.langchain.com) to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph. You can also [fetch traces from LangSmith](/langsmith/export-traces#fetch-a-single-run-by-id) to replay and debug production issues locally. - - -3. Implement the workflow -The implementation of the workflow is a simple graph with two nodes, one for generating a joke topic, another for writing the joke itself and a state to storing the intermediate values. 
- -:::python ```python -import uuid - -from typing_extensions import TypedDict, NotRequired -from langgraph.graph import StateGraph, START, END -from langchain.chat_models import init_chat_model +from langgraph.graph import StateGraph, START from langgraph.checkpoint.memory import InMemorySaver - +from typing_extensions import TypedDict, NotRequired +import uuid class State(TypedDict): topic: NotRequired[str] joke: NotRequired[str] -model = init_chat_model( - "claude-sonnet-4-6", - temperature=0, -) - - def generate_topic(state: State): - """LLM call to generate a topic for the joke""" - msg = model.invoke("Give me a funny topic for a joke") - return {"topic": msg.content} + return {"topic": "socks in the dryer"} def write_joke(state: State): - """LLM call to write a joke based on the topic""" - msg = model.invoke(f"Write a short joke about {state['topic']}") - return {"joke": msg.content} + return {"joke": f"Why do {state['topic']} disappear? They elope!"} -# Build workflow -workflow = StateGraph(State) +checkpointer = InMemorySaver() +graph = ( + StateGraph(State) + .add_node("generate_topic", generate_topic) + .add_node("write_joke", write_joke) + .add_edge(START, "generate_topic") + .add_edge("generate_topic", "write_joke") + .compile(checkpointer=checkpointer) +) -# Add nodes -workflow.add_node("generate_topic", generate_topic) -workflow.add_node("write_joke", write_joke) +# Step 1: Run the graph +config = {"configurable": {"thread_id": str(uuid.uuid4())}} +result = graph.invoke({}, config) -# Add edges to connect nodes -workflow.add_edge(START, "generate_topic") -workflow.add_edge("generate_topic", "write_joke") -workflow.add_edge("write_joke", END) +# Step 2: Find a checkpoint to replay from +history = list(graph.get_state_history(config)) +# History is in reverse chronological order +for state in history: + print(f"next={state.next}, checkpoint_id={state.config['configurable']['checkpoint_id']}") -# Compile -checkpointer = InMemorySaver() -graph = 
workflow.compile(checkpointer=checkpointer) -graph ``` ::: :::js +Use @[`getStateHistory`] to find the checkpoint you want to replay from, then call @[`invoke`][CompiledStateGraph.invoke] with that checkpoint's config: + ```typescript import { v4 as uuidv4 } from "uuid"; -import * as z from "zod"; -import { StateGraph, StateSchema, GraphNode, START, END, MemorySaver } from "@langchain/langgraph"; -import { ChatAnthropic } from "@langchain/anthropic"; +import { StateGraph, MemorySaver, START, Annotation } from "@langchain/langgraph"; -const State = new StateSchema({ - topic: z.string().optional(), - joke: z.string().optional(), +const StateAnnotation = Annotation.Root({ + topic: Annotation<string>(), + joke: Annotation<string>(), }); -const model = new ChatAnthropic({ - model: "claude-sonnet-4-6", - temperature: 0, -}); +function generateTopic(state: typeof StateAnnotation.State) { + return { topic: "socks in the dryer" }; +} + +function writeJoke(state: typeof StateAnnotation.State) { + return { joke: `Why do ${state.topic} disappear? 
They elope!` }; +} -const generateTopic: GraphNode = async (state) => { - // LLM call to generate a topic for the joke - const msg = await model.invoke("Give me a funny topic for a joke"); - return { topic: msg.content }; -}; - -const writeJoke: GraphNode = async (state) => { - // LLM call to write a joke based on the topic - const msg = await model.invoke(`Write a short joke about ${state.topic}`); - return { joke: msg.content }; -}; - -// Build workflow -const workflow = new StateGraph(State) - // Add nodes +const checkpointer = new MemorySaver(); +const graph = new StateGraph(StateAnnotation) .addNode("generateTopic", generateTopic) .addNode("writeJoke", writeJoke) - // Add edges to connect nodes .addEdge(START, "generateTopic") .addEdge("generateTopic", "writeJoke") - .addEdge("writeJoke", END); + .compile({ checkpointer }); -// Compile -const checkpointer = new MemorySaver(); -const graph = workflow.compile({ checkpointer }); +// Step 1: Run the graph +const config = { configurable: { thread_id: uuidv4() } }; +const result = await graph.invoke({}, config); + +// Step 2: Find a checkpoint to replay from +const states = []; +for await (const state of graph.getStateHistory(config)) { + states.push(state); +} + +// Step 3: Replay from a specific checkpoint +const beforeJoke = states.find((s) => s.next.includes("writeJoke")); +const replayResult = await graph.invoke(null, beforeJoke.config); +// writeJoke re-executes (runs again), generateTopic does not ``` ::: -### 1. Run the graph -To start the workflow, @[`invoke`][CompiledStateGraph.invoke] is called without any inputs. Note the `thread_id` to track this execution and retrieve its checkpoints later. +## Fork + +Fork creates a new branch from a past checkpoint with modified state. Call @[`update_state`] on a prior checkpoint to create the fork, then @[`invoke`][CompiledStateGraph.invoke] with `None` to continue execution. 
+ +![Fork](/oss/images/checkpoints_full_story.jpg) + + + +`update_state` does **not** roll back a thread. It creates a new checkpoint that branches from the specified point. The original execution history remains intact. + + :::python ```python -config = { - "configurable": { - "thread_id": uuid.uuid4(), - } -} -state = graph.invoke({}, config) +# Find checkpoint before write_joke +history = list(graph.get_state_history(config)) +before_joke = next(s for s in history if s.next == ("write_joke",)) + +# Fork: update state to change the topic +fork_config = graph.update_state( + before_joke.config, + values={"topic": "chickens"}, +) -print(state["topic"]) -print() -print(state["joke"]) +# Resume from the fork — write_joke re-executes with the new topic +fork_result = graph.invoke(None, fork_config) +print(fork_result["joke"]) # A joke about chickens, not socks ``` ::: :::js ```typescript -const config = { - configurable: { - thread_id: uuidv4(), - }, -}; +// Find checkpoint before writeJoke +const states = []; +for await (const state of graph.getStateHistory(config)) { + states.push(state); +} +const beforeJoke = states.find((s) => s.next.includes("writeJoke")); -const state = await graph.invoke({}, config); +// Fork: update state to change the topic +const forkConfig = await graph.updateState( + beforeJoke.config, + { topic: "chickens" }, +); -console.log(state.topic); -console.log(); -console.log(state.joke); +// Resume from the fork — writeJoke re-executes with the new topic +const forkResult = await graph.invoke(null, forkConfig); +console.log(forkResult.joke); // A joke about chickens, not socks ``` ::: -**Output:** +### From a specific node -``` -How about "The Secret Life of Socks in the Dryer"? You know, exploring the mysterious phenomenon of how socks go into the laundry as pairs but come out as singles. Where do they go? Are they starting new lives elsewhere? Is there a sock paradise we don't know about? 
There's a lot of comedic potential in the everyday mystery that unites us all! +When you call @[`update_state`], values are applied using the specified node's writers (including [reducers](/oss/langgraph/graph-api#reducers)). The checkpoint records that node as having produced the update, and execution resumes from that node's successors. + +By default, LangGraph infers `as_node` from the checkpoint's version history. When forking from a specific checkpoint, this inference is almost always correct. + +Specify `as_node` explicitly when: -# The Secret Life of Socks in the Dryer +- **Parallel branches**: Multiple nodes updated state in the same step, and LangGraph can't determine which was last (`InvalidUpdateError`). +- **No execution history**: Setting up state on a fresh thread (common in [testing](/oss/langgraph/test)). +- **Skipping nodes**: Set `as_node` to a later node to make the graph think that node already ran. -I finally discovered where all my missing socks go after the dryer. Turns out they're not missing at all—they've just eloped with someone else's socks from the laundromat to start new lives together. +:::python +```python +# graph: generate_topic -> write_joke + +# Treat this update as if generate_topic produced it. +# Execution resumes at write_joke (the successor of generate_topic). +fork_config = graph.update_state( + before_joke.config, + values={"topic": "chickens"}, + as_node="generate_topic", +) +``` +::: -My blue argyle is now living in Bermuda with a red polka dot, posting vacation photos on Sockstagram and sending me lint as alimony. +:::js +```typescript +// graph: generateTopic -> writeJoke + +// Treat this update as if generateTopic produced it. +// Execution resumes at writeJoke (the successor of generateTopic). +const forkConfig = await graph.updateState( + beforeJoke.config, + { topic: "chickens" }, + { asNode: "generateTopic" }, +); ``` +::: + +## Interrupts -### 2. 
Identify a checkpoint
-To continue from a previous point in the graphs run, use @[`get_state_history`] to retrieve all the states and select the one where you want to resume execution.
+If your graph uses @[`interrupt`] for [human-in-the-loop](/oss/langgraph/interrupts) workflows, interrupts are always re-triggered during time travel. The node containing the interrupt re-executes, and `interrupt()` pauses for a new `Command(resume=...)`.

:::python
```python
-# The states are returned in reverse chronological order.
-states = list(graph.get_state_history(config))
+import operator
+from typing import Annotated, TypedDict
+
+from langgraph.types import interrupt, Command

-for state in states:
-    print(state.next)
-    print(state.config["configurable"]["checkpoint_id"])
-    print()
-```
+class State(TypedDict):
+    # operator.add appends each node's update instead of overwriting
+    value: Annotated[list[str], operator.add]
+
+def ask_human(state: State):
+    answer = interrupt("What is your name?")
+    return {"value": [f"Hello, {answer}!"]}
+
+def final_step(state: State):
+    return {"value": ["Done"]}
+
+graph = (
+    StateGraph(State)
+    .add_node("ask_human", ask_human)
+    .add_node("final_step", final_step)
+    .add_edge(START, "ask_human")
+    .add_edge("ask_human", "final_step")
+    .compile(checkpointer=InMemorySaver())
+)

-**Output:**
+config = {"configurable": {"thread_id": "1"}}

-```
-()
-1f02ac4a-ec9f-6524-8002-8f7b0bbeed0e
+# First run: hits interrupt
+graph.invoke({"value": []}, config)
+# Resume with answer
+graph.invoke(Command(resume="Alice"), config)
+
+# Replay from before ask_human
+history = list(graph.get_state_history(config))
+before_ask = [s for s in history if s.next == ("ask_human",)][-1]

-('write_joke',)
-1f02ac4a-ce2a-6494-8001-cb2e2d651227
+replay_result = graph.invoke(None, before_ask.config)
+# Pauses at interrupt — waiting for new Command(resume=...)
-('generate_topic',) -1f02ac4a-a4e0-630d-8000-b73c254ba748 +# Fork from before ask_human +fork_config = graph.update_state(before_ask.config, {"value": ["forked"]}) +fork_result = graph.invoke(None, fork_config) +# Pauses at interrupt — waiting for new Command(resume=...) -('__start__',) -1f02ac4a-a4dd-665e-bfff-e6c8c44315d9 +# Resume the forked interrupt with a different answer +graph.invoke(Command(resume="Bob"), fork_config) +# Result: {"value": ["forked", "Hello, Bob!", "Done"]} ``` ::: :::js ```typescript -// The states are returned in reverse chronological order. -const states = []; -for await (const state of graph.getStateHistory(config)) { - states.push(state); +import { interrupt, Command } from "@langchain/langgraph"; + +function askHuman(state: { value: string[] }) { + const answer = interrupt("What is your name?"); + return { value: [`Hello, ${answer}!`] }; } -for (const state of states) { - console.log(state.next); - console.log(state.config.configurable?.checkpoint_id); - console.log(); +function finalStep(state: { value: string[] }) { + return { value: ["Done"] }; } -``` -**Output:** +// ... build graph with checkpointer ... -``` -[] -1f02ac4a-ec9f-6524-8002-8f7b0bbeed0e +// First run: hits interrupt +await graph.invoke({ value: [] }, config); +// Resume with answer +await graph.invoke(new Command({ resume: "Alice" }), config); + +// Replay from before askHuman +const states = []; +for await (const state of graph.getStateHistory(config)) { + states.push(state); +} +const beforeAsk = states.filter((s) => s.next.includes("askHuman")).pop(); -['writeJoke'] -1f02ac4a-ce2a-6494-8001-cb2e2d651227 +const replayResult = await graph.invoke(null, beforeAsk.config); +// Pauses at interrupt — waiting for new Command({ resume: ... 
}) -['generateTopic'] -1f02ac4a-a4e0-630d-8000-b73c254ba748 +// Fork from before askHuman +const forkConfig = await graph.updateState(beforeAsk.config, { value: ["forked"] }); +const forkResult = await graph.invoke(null, forkConfig); +// Pauses at interrupt — waiting for new Command({ resume: ... }) -['__start__'] -1f02ac4a-a4dd-665e-bfff-e6c8c44315d9 +// Resume the forked interrupt with a different answer +await graph.invoke(new Command({ resume: "Bob" }), forkConfig); +// Result: { value: ["forked", "Hello, Bob!", "Done"] } ``` ::: +### Multiple interrupts + +If your graph collects input at several points (for example, a multi-step form), you can fork from between the interrupts to change a later answer without re-asking earlier questions. + :::python ```python -# This is the state before last (states are listed in chronological order) -selected_state = states[1] -print(selected_state.next) -print(selected_state.values) -``` +def ask_name(state): + name = interrupt("What is your name?") + return {"value": [f"name:{name}"]} -**Output:** +def ask_age(state): + age = interrupt("How old are you?") + return {"value": [f"age:{age}"]} -``` -('write_joke',) -{'topic': 'How about "The Secret Life of Socks in the Dryer"? You know, exploring the mysterious phenomenon of how socks go into the laundry as pairs but come out as singles. Where do they go? Are they starting new lives elsewhere? Is there a sock paradise we don\\'t know about? 
There\\'s a lot of comedic potential in the everyday mystery that unites us all!'} +# Graph: ask_name -> ask_age -> final +# After completing both interrupts: + +# Fork from BETWEEN the two interrupts (after ask_name, before ask_age) +history = list(graph.get_state_history(config)) +between = [s for s in history if s.next == ("ask_age",)][-1] + +fork_config = graph.update_state(between.config, {"value": ["modified"]}) +result = graph.invoke(None, fork_config) +# ask_name result preserved ("name:Alice") +# ask_age pauses at interrupt — waiting for new answer ``` ::: :::js ```typescript -// This is the state before last (states are listed in chronological order) -const selectedState = states[1]; -console.log(selectedState.next); -console.log(selectedState.values); -``` - -**Output:** +// Fork from BETWEEN the two interrupts (after askName, before askAge) +const states = []; +for await (const state of graph.getStateHistory(config)) { + states.push(state); +} +const between = states.filter((s) => s.next.includes("askAge")).pop(); -``` -['writeJoke'] -{'topic': 'How about "The Secret Life of Socks in the Dryer"? You know, exploring the mysterious phenomenon of how socks go into the laundry as pairs but come out as singles. Where do they go? Are they starting new lives elsewhere? Is there a sock paradise we don\\'t know about? There\\'s a lot of comedic potential in the everyday mystery that unites us all!'} +const forkConfig = await graph.updateState(between.config, { value: ["modified"] }); +const result = await graph.invoke(null, forkConfig); +// askName result preserved ("name:Alice") +// askAge pauses at interrupt — waiting for new answer ``` ::: - -### 3. Update the state (optional) +## Subgraphs -:::python -@[`update_state`] will create a new checkpoint. The new checkpoint will be associated with the same thread, but a new checkpoint ID. +Time travel with [subgraphs](/oss/langgraph/use-subgraphs) depends on whether the subgraph has its own checkpointer. 
This determines the granularity of checkpoints you can time travel from. + + + + +By default, a subgraph inherits the parent's checkpointer. The parent treats the entire subgraph as a **single super-step** — there is only one parent-level checkpoint for the whole subgraph execution. Time traveling from before the subgraph re-executes it from scratch. + +You cannot time travel to a point *between* nodes in a default subgraph — you can only time travel from the parent level. +:::python ```python -new_config = graph.update_state(selected_state.config, values={"topic": "chickens"}) -print(new_config) -``` +# Subgraph without its own checkpointer (default) +subgraph = ( + StateGraph(State) + .add_node("step_a", step_a) # Has interrupt() + .add_node("step_b", step_b) # Has interrupt() + .add_edge(START, "step_a") + .add_edge("step_a", "step_b") + .compile() # No checkpointer — inherits from parent +) -**Output:** +graph = ( + StateGraph(State) + .add_node("subgraph_node", subgraph) + .add_edge(START, "subgraph_node") + .compile(checkpointer=InMemorySaver()) +) -``` -{'configurable': {'thread_id': 'c62e2e03-c27b-4cb6-8cea-ea9bfedae006', 'checkpoint_ns': '', 'checkpoint_id': '1f02ac4a-ecee-600b-8002-a1d21df32e4c'}} +config = {"configurable": {"thread_id": "1"}} + +# Complete both interrupts +graph.invoke({"value": []}, config) # Hits step_a interrupt +graph.invoke(Command(resume="Alice"), config) # Hits step_b interrupt +graph.invoke(Command(resume="30"), config) # Completes + +# Time travel from before the subgraph +history = list(graph.get_state_history(config)) +before_sub = [s for s in history if s.next == ("subgraph_node",)][-1] + +fork_config = graph.update_state(before_sub.config, {"value": ["forked"]}) +result = graph.invoke(None, fork_config) +# The entire subgraph re-executes from scratch +# You cannot time travel to a point between step_a and step_b ``` ::: :::js -`updateState` will create a new checkpoint. 
The new checkpoint will be associated with the same thread, but a new checkpoint ID. - ```typescript -const newConfig = await graph.updateState(selectedState.config, { - topic: "chickens", -}); -console.log(newConfig); -``` - -**Output:** +// Subgraph without its own checkpointer (default) +const subgraph = new StateGraph(StateAnnotation) + .addNode("stepA", stepA) // Has interrupt() + .addNode("stepB", stepB) // Has interrupt() + .addEdge(START, "stepA") + .addEdge("stepA", "stepB") + .compile(); // No checkpointer — inherits from parent + +const graph = new StateGraph(StateAnnotation) + .addNode("subgraphNode", subgraph) + .addEdge(START, "subgraphNode") + .compile({ checkpointer }); + +// Complete both interrupts +await graph.invoke({ value: [] }, config); +await graph.invoke(new Command({ resume: "Alice" }), config); +await graph.invoke(new Command({ resume: "30" }), config); + +// Time travel from before the subgraph +const states = []; +for await (const state of graph.getStateHistory(config)) { + states.push(state); +} +const beforeSub = states.filter((s) => s.next.includes("subgraphNode")).pop(); -``` -{'configurable': {'thread_id': 'c62e2e03-c27b-4cb6-8cea-ea9bfedae006', 'checkpoint_ns': '', 'checkpoint_id': '1f02ac4a-ecee-600b-8002-a1d21df32e4c'}} +const forkConfig = await graph.updateState(beforeSub.config, { value: ["forked"] }); +const result = await graph.invoke(null, forkConfig); +// The entire subgraph re-executes from scratch +// You cannot time travel to a point between stepA and stepB ``` ::: -### 4. Resume execution from the checkpoint -For resumings execution from the selected checkpoint, call @[`invoke`][CompiledStateGraph.invoke] with the config that points to the new checkpoint. + + + +Set `checkpointer=True` on the subgraph to give it its own checkpoint history. This creates checkpoints at each step **within** the subgraph, allowing you to time travel from a specific point inside it — for example, between two interrupts. 
+ +Use @[`get_state`] with `subgraphs=True` to access the subgraph's own checkpoint config, then fork from it: :::python ```python -graph.invoke(None, new_config) -``` +# Subgraph with its own checkpointer +subgraph = ( + StateGraph(State) + .add_node("step_a", step_a) # Has interrupt() + .add_node("step_b", step_b) # Has interrupt() + .add_edge(START, "step_a") + .add_edge("step_a", "step_b") + .compile(checkpointer=True) # Own checkpoint history +) -**Output:** +graph = ( + StateGraph(State) + .add_node("subgraph_node", subgraph) + .add_edge(START, "subgraph_node") + .compile(checkpointer=InMemorySaver()) +) -```python -{'topic': 'chickens', - 'joke': 'Why did the chicken join a band?\n\nBecause it had excellent drumsticks!'} +config = {"configurable": {"thread_id": "1"}} + +# Run until step_a interrupt +graph.invoke({"value": []}, config) + +# Resume step_a -> hits step_b interrupt +graph.invoke(Command(resume="Alice"), config) + +# Get the subgraph's own checkpoint (between step_a and step_b) +parent_state = graph.get_state(config, subgraphs=True) +sub_config = parent_state.tasks[0].state.config + +# Fork from the subgraph checkpoint +fork_config = graph.update_state(sub_config, {"value": ["forked"]}) +result = graph.invoke(None, fork_config) +# step_b re-executes, step_a's result is preserved ``` ::: :::js ```typescript -await graph.invoke(null, newConfig); +// Subgraph with its own checkpointer +const subgraph = new StateGraph(StateAnnotation) + .addNode("stepA", stepA) // Has interrupt() + .addNode("stepB", stepB) // Has interrupt() + .addEdge(START, "stepA") + .addEdge("stepA", "stepB") + .compile({ checkpointer: true }); // Own checkpoint history + +const graph = new StateGraph(StateAnnotation) + .addNode("subgraphNode", subgraph) + .addEdge(START, "subgraphNode") + .compile({ checkpointer }); + +// Run until stepA interrupt, then resume -> hits stepB interrupt +await graph.invoke({ value: [] }, config); +await graph.invoke(new Command({ resume: "Alice" 
}), config); + +// Get the subgraph's own checkpoint (between stepA and stepB) +const parentState = await graph.getState(config, { subgraphs: true }); +const subConfig = parentState.tasks[0].state.config; + +// Fork from the subgraph checkpoint +const forkConfig = await graph.updateState(subConfig, { value: ["forked"] }); +const result = await graph.invoke(null, forkConfig); +// stepB re-executes, stepA's result is preserved ``` +::: -**Output:** + + -```typescript -{ - 'topic': 'chickens', - 'joke': 'Why did the chicken join a band?\n\nBecause it had excellent drumsticks!' -} -``` -::: +See [subgraph persistence](/oss/langgraph/use-subgraphs#subgraph-persistence) for more on configuring subgraph checkpointers.
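
As a rough mental model for why a subgraph needs its own checkpointer to be addressable during time travel: checkpoints are keyed by a checkpoint namespace (`checkpoint_ns`) in addition to the thread ID. The root graph writes under an empty namespace, while a subgraph with its own checkpoint history writes under a namespace derived from the parent node. A simplified sketch (the key shape and the `subgraph_node:task-123` namespace string below are illustrative, not the actual storage schema):

```python
# Illustrative only: checkpoints addressed by (thread_id, checkpoint_ns).
# The root graph uses ns == ""; a subgraph with its own checkpointer
# writes step-level checkpoints under its own namespace.
from collections import defaultdict

saved = defaultdict(list)  # (thread_id, checkpoint_ns) -> list of states


def put(thread_id: str, checkpoint_ns: str, state: dict) -> None:
    saved[(thread_id, checkpoint_ns)].append(state)


# One parent-level checkpoint for the whole subgraph super-step...
put("t1", "", {"value": []})
# ...but finer-grained checkpoints inside the subgraph's own namespace
put("t1", "subgraph_node:task-123", {"value": ["name:Alice"]})
put("t1", "subgraph_node:task-123", {"value": ["name:Alice", "age:30"]})

# Parent history sees a single checkpoint; the subgraph namespace holds
# the intermediate ones you can time travel between
assert len(saved[("t1", "")]) == 1
assert len(saved[("t1", "subgraph_node:task-123")]) == 2
```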