LangGraph框架研究-能力
上篇引言本地主要介绍langgraph能力目录引言1 持久化1.1 使用持久化理由1.2 核心概念1.2.1 线程1.2.2 检查点1.2.3 获取和更新状态1.2.4 记忆存储1.2.5 检查点库2 持久化执行3 流4 中断5 时间回溯6 记忆7 子图1 持久化LangGraph 拥有一个内置的持久化层它以检查点的形式保存图表状态。当你使用检查器来编译图表时系统会在执行的每一步都保存一份状态快照并将这些快照按线程进行组织。这使得人类介入工作流、对话记忆、时间回溯调试以及容错执行成为可能。当你使用 Agent Server 时无需手动实现或配置检查器。服务器会在后台为你处理所有的持久化基础设施。1.1 使用持久化理由持久化是以下功能的基础人类介入检查器通过允许人类检查、中断和批准图表步骤从而促进了人类介入的工作流。这些工作流离不开检查器因为用户必须能够随时查看图表的状态而且在用户对状态进行更新后图表必须能够恢复执行。记忆检查器允许在多次交互之间保留“记忆”。在重复的人类交互如对话场景中任何后续消息都可以发送到该线程而它会保留对之前消息的记忆。时间回溯检查器支持“时间回溯”允许用户重放之前的图表执行过程以审查或调试特定的图表步骤。此外检查器还使得在任意检查点分叉图表状态成为可能从而探索不同的执行路径。容错性检查点提供了容错和错误恢复能力如果在一个超级步骤中有一个或多个节点失败你可以从最后一个成功的步骤重新启动图表。待处理写入当图表节点在某个超级步骤的执行中途失败时LangGraph 会存储来自该步骤中其他已成功完成节点的“待处理检查点写入”。当你从该超级步骤恢复图表执行时你不需要重新运行那些已经成功的节点。1.2 核心概念1.2.1 线程线程是由检查器保存的每个检查点分配的一个唯一 ID或线程标识符。它包含了一系列运行所累积的状态。当运行一次执行时助手底层图表的状态就会被持久化到这个线程中。当使用检查器调用图表时你必须在配置的可配置部分中指定一个thread_id{configurable: {thread_id: 1}}你可以检索线程的当前状态和历史状态。为了持久化状态必须在执行运行之前创建一个线程。LangSmith API 提供了几个用于创建和管理线程及线程状态的端点。有关更多详细信息请参阅 API 参考文档。检查器使用thread_id作为存储和检索检查点的主键。没有它检查器就无法保存状态也无法在中断后恢复执行因为检查器正是依靠 thread_id 来加载已保存的状态。1.2.2 检查点线程在特定时间点的状态被称为检查点。检查点是每个超级步骤保存的图表状态快照并由一个 StateSnapshot 对象表示有关完整的字段参考请参阅 StateSnapshot 字段超级步骤super-stepsLangGraph 会在每个超级步骤的边界处创建一个检查点。超级步骤是图表的一次“跳动”在该步骤中所有计划好的节点都会执行可能是并行的。对于一个像START - A - B - END这样的顺序图表输入、节点 A 和节点 B 分别对应独立的超级步骤——并在每一步之后生成一个检查点。理解超级步骤的边界对于时间回溯非常重要因为你只能从检查点即超级步骤的边界恢复执行。检查点会被持久化并可用于在稍后恢复线程的状态。让我们来看看当一个简单的图表被如下调用时会保存哪些检查点from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.memory import InMemorySaver from langchain_core.runnables import RunnableConfig from typing import Annotated from typing_extensions import TypedDict from operator import add class State(TypedDict): foo: str bar: Annotated[list[str], add] def node_a(state: State): return {foo: a, bar: [a]} def node_b(state: State): return {foo: b, bar: [b]} workflow StateGraph(State) workflow.add_node(node_a) workflow.add_node(node_b) workflow.add_edge(START, node_a) workflow.add_edge(node_a, node_b) workflow.add_edge(node_b, END) checkpointer InMemorySaver() graph workflow.compile(checkpointercheckpointer) config: RunnableConfig {configurable: {thread_id: 1}} graph.invoke({foo: , bar:[]}, config)当我们运行图表后我们应该会看到正好 4 个检查点空检查点下一个要执行的节点是 START。包含用户输入{foo: , bar: []}的检查点下一个要执行的节点是 node_a。包含 node_a 输出{foo: a, bar: [a]}的检查点下一个要执行的节点是 node_b。包含 node_b 输出{foo: b, bar: [a, b]}的检查点没有下一个要执行的节点了执行结束。请注意bar 通道的值包含了来自两个节点的输出因为我们要为 bar 通道配置一个归约器。检查点命名空间每个检查点都有一个checkpoint_ns检查点命名空间字段用于标识它属于哪个图表或子图表 (空字符串)表示该检查点属于父级根图表。node_name:uuid表示该检查点属于作为给定节点被调用的子图表。对于嵌套的子图表命名空间会用|分隔符连接例如outer_node:uuid|inner_node:uuid。你可以通过配置在节点内部访问检查点命名空间from langchain_core.runnables import RunnableConfig def my_node(state: State, config: RunnableConfig): checkpoint_ns config[configurable][checkpoint_ns] # for the parent graph, node_name:uuid for a subgraph1.2.3 获取和更新状态获取状态当与保存的图表状态进行交互时你必须指定一个线程标识符。你可以通过调用graph.get_state(config)来查看图表的最新状态。这将返回一个StateSnapshot对象该对象对应于配置中提供的线程 ID 所关联的最新检查点或者如果提供了特定的检查点 ID则返回该检查点对应的状态。# get the latest state snapshot config {configurable: {thread_id: 1}} graph.get_state(config) # get a state snapshot for a specific checkpoint_id config {configurable: {thread_id: 1, checkpoint_id: 1ef663ba-28fe-6528-8002-5a559208592c}} graph.get_state(config)在我们的示例中get_state的输出结果看起来会是这样的StateSnapshot( values{foo: b, bar: [a, b]}, next(), config{configurable: {thread_id: 1, checkpoint_ns: , checkpoint_id: 1ef663ba-28fe-6528-8002-5a559208592c}}, metadata{source: loop, writes: {node_b: {foo: b, bar: [b]}}, step: 2}, created_at2024-08-29T19:19:38.82174900:00, parent_config{configurable: {thread_id: 1, checkpoint_ns: , checkpoint_id: 1ef663ba-28f9-6ec4-8001-31981c2c39f8}}, tasks() )字段类型描述valuesdict此检查点处的状态通道值。nexttuple[str, ...]接下来要执行的节点名称。如果是空元组()意味着图表执行已完成。configdict包含thread_id线程ID、checkpoint_ns检查点命名空间和checkpoint_id检查点ID。metadatadict执行元数据。包含source来源input、loop 或 update、writes节点输出和step超级步骤计数器。created_atstr创建此检查点时的 ISO 8601 时间戳。parent_configdict | None上一个检查点的配置。如果是第一个检查点则为None。taskstuple[PregelTask, ...]在此步骤中要执行的任务。每个任务包含id、name、error、interrupts以及可选的state当使用subgraphsTrue时为子图表快照。获取状态历史你可以通过调用 graph.get_state_history(config) 来获取指定线程的完整图执行历史。这将返回一个与配置中提供的线程 ID 关联的 StateSnapshot状态快照对象列表。重要的是这些检查点将按时间顺序排列最近的检查点/状态快照会位于列表的最前面config {configurable: {thread_id: 1}} list(graph.get_state_history(config))在我们的示例中get_state_history的输出结果看起来会是这样的[ StateSnapshot( values{foo: b, bar: [a, b]}, next(), config{configurable: {thread_id: 1, checkpoint_ns: , checkpoint_id: 1ef663ba-28fe-6528-8002-5a559208592c}}, metadata{source: loop, writes: {node_b: {foo: b, bar: [b]}}, step: 2}, created_at2024-08-29T19:19:38.82174900:00, parent_config{configurable: {thread_id: 1, checkpoint_ns: , checkpoint_id: 1ef663ba-28f9-6ec4-8001-31981c2c39f8}}, tasks(), ), StateSnapshot( values{foo: a, bar: [a]}, next(node_b,), config{configurable: {thread_id: 1, checkpoint_ns: , checkpoint_id: 1ef663ba-28f9-6ec4-8001-31981c2c39f8}}, metadata{source: loop, writes: {node_a: {foo: a, bar: [a]}}, step: 1}, created_at2024-08-29T19:19:38.81994600:00, parent_config{configurable: {thread_id: 1, checkpoint_ns: , checkpoint_id: 1ef663ba-28f4-6b4a-8000-ca575a13d36a}}, tasks(PregelTask(id6fb7314f-f114-5413-a1f3-d37dfe98ff44, namenode_b, errorNone, interrupts()),), ), StateSnapshot( values{foo: , bar: []}, next(node_a,), config{configurable: {thread_id: 1, checkpoint_ns: , checkpoint_id: 1ef663ba-28f4-6b4a-8000-ca575a13d36a}}, metadata{source: loop, writes: None, step: 0}, created_at2024-08-29T19:19:38.81781300:00, parent_config{configurable: {thread_id: 1, checkpoint_ns: , checkpoint_id: 1ef663ba-28f0-6c66-bfff-6723431e8481}}, tasks(PregelTask(idf1b14528-5ee5-579c-949b-23ef9bfbed58, namenode_a, errorNone, interrupts()),), ), StateSnapshot( values{bar: []}, next(__start__,), config{configurable: {thread_id: 1, checkpoint_ns: , checkpoint_id: 1ef663ba-28f0-6c66-bfff-6723431e8481}}, metadata{source: input, writes: {foo: }, step: -1}, created_at2024-08-29T19:19:38.81620500:00, parent_configNone, tasks(PregelTask(id6d27aa2e-d72b-5504-a36f-8620e54a76dd, name__start__, errorNone, interrupts()),), ) ]查找特定检查点你可以通过筛选状态历史来找到符合特定条件的检查点。history list(graph.get_state_history(config)) # Find the checkpoint before a specific node executed before_node_b next(s for s in history if s.next (node_b,)) # Find a checkpoint by step number step_2 next(s for s in history if s.metadata[step] 2) # Find checkpoints created by update_state forks [s for s in history if s.metadata[source] update] # Find the checkpoint where an interrupt occurred interrupted next( s for s in history if s.tasks and any(t.interrupts for t in s.tasks) )回放Replay“回放”功能会重新执行先前某个检查点checkpoint之后的步骤。你可以通过传入一个先前的checkpoint_id来调用图从而重新运行该检查点之后的所有节点。检查点之前的节点会被跳过因为它们的结果已经被保存了直接读取即可。检查点之后的节点会重新执行这包括所有的 LLM 调用、API 请求或中断——这些操作在回放过程中总是会被重新触发。更新状态你可以使用update_state来编辑图的状态。这一操作会创建一个包含更新值的新检查点——它绝不会修改原始的检查点。这种更新被视为与节点更新完全相同当定义了reducer函数时值会经过这些函数处理。因此带有 reducer 的通道会累积值而不是覆盖它们1.2.4 记忆存储状态模式定义了一组键这些键会在图执行过程中被填充。正如上文所述检查点器可以在图的每一步将状态写入线程从而实现状态持久化。但是如果我们想要跨线程保留某些信息该怎么办呢试想一下聊天机器人的场景我们需要在该用户的所有聊天对话即不同的线程中保留关于该用户的特定信息仅靠检查点器我们无法在线程之间共享信息。这就引出了对Store接口的需求。举个例子我们可以定义一个InMemoryStore来存储跨线程的用户信息。我们只需像以前一样用检查点器编译图并传入这个 store 即可当你使用 LangGraph API 时不需要手动实现或配置存储。API 会在后台自动为你处理所有的存储基础设施InMemoryStore适合开发和测试阶段使用。但在生产环境中请使用持久化存储比如PostgresStore或RedisStore。所有的实现都继承自BaseStore这也是你在编写节点函数签名时应该使用的类型注解基本用法首先让我们在不使用 LangGraph 的情况下单独展示这个功能from langgraph.store.memory import InMemoryStore store InMemoryStore()1.2.5 检查点库在底层检查点功能是由符合BaseCheckpointSaver接口的检查点器对象驱动的。LangGraph 提供了多种检查点器实现它们都是通过独立的、可安装的库来实现的。关于可用的提供商列表请参阅“检查点器集成”文档langgraph-checkpoint这是检查点保存器BaseCheckpointSaver和序列化/反序列化接口SerializerProtocol的基础接口包。它包含了一个用于实验的内存检查点器实现InMemorySaver。LangGraph 默认自带了这个包。langgraph-checkpoint-sqlite这是一个使用 SQLite 数据库的 LangGraph 检查点器实现SqliteSaver/AsyncSqliteSaver。非常适合实验和本地开发工作流。需要单独安装。langgraph-checkpoint-postgres这是一个使用 Postgres 数据库的高级检查点器PostgresSaver/AsyncPostgresSaverLangSmith 内部就在用它。非常适合在生产环境中使用。需要单独安装。langgraph-checkpoint-cosmosdb这是一个使用 Azure Cosmos DB 的 LangGraph 检查点器实现CosmosDBSaver/AsyncCosmosDBSaver。非常适合在 Azure 云环境下进行生产使用。支持同步和异步操作。需要单独安装。检查点器接口每个检查点器都遵循BaseCheckpointSaver接口并实现了以下方法.put- 存储检查点及其配置和元数据。.put_writes- 存储与检查点关联的中间写入内容即待处理的写入。.get_tuple- 根据给定的配置thread_id和checkpoint_id获取检查点元组。这用于在graph.get_state()中填充StateSnapshot。.list- 列出符合给定配置和过滤条件的检查点。这用于在graph.get_state_history()中填充状态历史。若要异步运行你的图你可以使用InMemorySaver或者使用 SQLite/Postgres 检查点器的异步版本——即AsyncSqliteSaver/AsyncPostgresSaver检查点器。序列化器当检查点器保存图状态时它们需要对状态中的通道值进行序列化。这是通过序列化器对象来完成的。langgraph_checkpoint定义了实现序列化器的协议并提供了一个默认实现JsonPlusSerializer它可以处理各种各样的类型包括 LangChain 和 LangGraph 的原语、日期时间、枚举等等。使用 Pickle 进行序列化默认的序列化器JsonPlusSerializer底层使用的是ormsgpack和 JSON但这并不适用于所有类型的对象。如果你想为当前 msgpack 编码器不支持的对象例如 Pandas 数据框回退到使用 pickle你可以使用JsonPlusSerializer的pickle_fallback参数from langgraph.checkpoint.memory import InMemorySaver from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer # ... Define the graph ... graph.compile( checkpointerInMemorySaver(serdeJsonPlusSerializer(pickle_fallbackTrue)) )加密检查点器可以选择加密所有持久化的状态。要启用此功能请将EncryptedSerializer的实例传递给任何BaseCheckpointSaver实现的serde参数。创建加密序列化器最简单的方法是通过from_pycryptodome_aes它会从LANGGRAPH_AES_KEY环境变量中读取 AES 密钥或者接受一个key参数import sqlite3 from langgraph.checkpoint.serde.encrypted import EncryptedSerializer from langgraph.checkpoint.sqlite import SqliteSaver serde EncryptedSerializer.from_pycryptodome_aes() # reads LANGGRAPH_AES_KEY checkpointer SqliteSaver(sqlite3.connect(checkpoint.db), serdeserde)from langgraph.checkpoint.serde.encrypted import EncryptedSerializer from langgraph.checkpoint.postgres import PostgresSaver serde EncryptedSerializer.from_pycryptodome_aes() checkpointer PostgresSaver.from_conn_string(postgresql://..., serdeserde) checkpointer.setup()当在 LangSmith 上运行时只要存在LANGGRAPH_AES_KEY加密功能就会自动启用因此你只需要提供该环境变量即可。你可以通过实现CipherProtocol并将其提供给EncryptedSerializer来使用其他加密方案2 持久化执行持久化执行是一种技术即进程或工作流在关键节点保存进度使其能够暂停并在之后完全从暂停的地方恢复运行。这在以下场景中特别有用人机交互用户可以在继续之前检查、验证或修改流程。长耗时任务任务可能会遇到中断或错误例如调用大语言模型超时。通过保留已完成的工作持久化执行使进程无需重新处理之前的步骤即可恢复——即使间隔了很长时间例如一周后。LangGraph 内置的持久化层为工作流提供了持久化执行能力确保每个执行步骤的状态都被保存到持久化存储中。这种能力保证了如果工作流被中断——无论是由于系统故障还是为了进行人机交互——它都可以从最后记录的状态恢复运行。如果你在使用 LangGraph 时配置了检查点器那么你就已经启用了持久化执行功能。你可以在任意点暂停和恢复工作流即使是在中断或故障之后。为了充分利用持久化执行请确保你的工作流设计是确定性的和幂等的并将任何副作用或非确定性操作封装在任务中。你可以使用来自 StateGraph图 API和函数式 API 的任务2.1 条件要在 LangGraph 中利用持久化执行你需要在工作流中启用持久化指定一个检查点器来保存工作流的进度。在执行工作流时指定线程标识符这将跟踪特定工作流实例的执行历史。将任何非确定性操作例如随机数生成或具有副作用的操作例如文件写入、API 调用封装在任务中这是为了确保当工作流恢复时这些操作不会针对该次运行重复执行而是从持久化层检索它们的结果。更多信息请参见“确定性与一致重放”2.2 确定性与一致性重放当你恢复一次工作流运行时代码不会从执行停止的那一行代码处继续相反它会识别一个合适的起点从中断的地方开始继续。这意味着工作流将从起点重放所有步骤直到到达它停止的位置。因此在编写用于持久化执行的工作流时你必须将任何非确定性操作例如随机数生成和任何具有副作用的操作例如文件写入、API 调用封装在任务或节点中。为了确保你的工作流是确定性的并且可以一致地重放请遵循以下准则避免重复工作如果一个节点包含多个具有副作用的操作例如日志记录、文件写入或网络调用请将每个操作封装在单独的任务中。这确保了当工作流恢复时这些操作不会重复执行而是从持久化层检索它们的结果。封装非确定性操作将任何可能产生非确定性结果的代码例如随机数生成封装在任务或节点中。这确保了在恢复时工作流遵循完全相同的记录步骤序列并获得相同的结果。使用幂等操作在可能的情况下确保副作用例如 API 调用、文件写入是幂等的。这意味着如果工作流失败后重试操作其效果将与第一次执行时相同。这对于涉及数据写入的操作尤为重要。如果一个任务开始但未能成功完成工作流的恢复将重新运行该任务依靠记录的结果来保持一致性。请使用幂等键或验证现有结果以避免意外的重复从而确保工作流执行顺畅且可预测。关于要避免的一些陷阱示例请参阅函数式 API 中的“常见陷阱”部分该部分展示了如何使用任务来构建代码以避免这些问题。同样的原则也适用于 StateGraph图 API2.3 持久化模式LangGraph 支持三种持久性模式允许你根据应用程序的需求在性能和数据持久性之间取得平衡。更高的耐用性模式会给工作流执行增加更多的开销。你可以在调用任何图执行方法时指定耐用性模式graph.stream( {input: test}, durabilitysync )持久性模式从低到高排列如下exitLangGraph 仅在图执行退出时无论是成功、出错还是因人机交互中断才持久化更改。这为长耗时图提供了最佳性能但意味着中间状态不会被保存因此你无法从执行过程中发生的系统故障如进程崩溃中恢复。asyncLangGraph 在下一步执行的同时异步持久化更改。这提供了良好的性能和耐用性但如果进程在执行期间崩溃LangGraph 存在无法写入检查点的微小风险。syncLangGraph 在下一步开始之前同步持久化更改。这确保了 LangGraph 在继续执行之前写入每个检查点以牺牲一定的性能开销为代价提供高耐用性。2.4 节点中使用任务如果一个节点包含多个操作你可能会发现将每个操作转换为任务更容易而不是将这些操作重构为单独的节点#原始 from typing import NotRequired from typing_extensions import TypedDict from langchain_core.utils.uuid import uuid7 from langgraph.checkpoint.memory import InMemorySaver from langgraph.graph import StateGraph, START, END import requests # Define a TypedDict to represent the state class State(TypedDict): url: str result: NotRequired[str] def call_api(state: State): Example node that makes an API request. result requests.get(state[url]).text[:100] # Side-effect # return { result: result } # Create a StateGraph builder and add a node for the call_api function builder StateGraph(State) builder.add_node(call_api, call_api) # Connect the start and end nodes to the call_api node builder.add_edge(START, call_api) builder.add_edge(call_api, END) # Specify a checkpointer checkpointer InMemorySaver() # Compile the graph with the checkpointer graph builder.compile(checkpointercheckpointer) # Define a config with a thread ID. thread_id str(uuid7()) config {configurable: {thread_id: thread_id}} # Invoke the graph graph.invoke({url: https://www.example.com}, config)#with task from typing import NotRequired from typing_extensions import TypedDict from langchain_core.utils.uuid import uuid7 from langgraph.checkpoint.memory import InMemorySaver from langgraph.func import task from langgraph.graph import StateGraph, START, END import requests # Define a TypedDict to represent the state class State(TypedDict): urls: list[str] result: NotRequired[list[str]] #包装成langgraph异步任务 task def _make_request(url: str): Make a request. return requests.get(url).text[:100] def call_api(state: State): Example node that makes an API request. #future,所以下一步要调用result()获取结果 requests [_make_request(url) for url in state[urls]] results [request.result() for request in requests] return { results: results } # Create a StateGraph builder and add a node for the call_api function builder StateGraph(State) builder.add_node(call_api, call_api) # Connect the start and end nodes to the call_api node builder.add_edge(START, call_api) builder.add_edge(call_api, END) # Specify a checkpointer checkpointer InMemorySaver() # Compile the graph with the checkpointer graph builder.compile(checkpointercheckpointer) # Define a config with a thread ID. thread_id str(uuid7()) config {configurable: {thread_id: thread_id}} # Invoke the graph graph.invoke({urls: [https://www.example.com]}, config)2.5 恢复工作流一旦启用了持久执行你可以通过检查点checkpoint机制从上次中断的地方恢复工作流主要应用于以下两种情况暂停与恢复 (Pausing and Resuming)目的通常用于需要人工介入Human-in-the-Loop的场景例如审批、审核或等待外部输入。方法在代码中使用interrupt()函数主动暂停工作流。当需要恢复时通过Command指令可以携带更新后的状态来继续执行。从故障中恢复 (Recovering from Failures)目的在遇到意外异常如 LLM 服务中断、网络问题导致工作流失败后自动从最后一个成功的检查点恢复避免从头开始重跑。方法使用与之前相同的thread_id并以None作为输入值再次执行工作流。LangGraph 会自动识别并跳过已成功执行的步骤从失败点继续。2.6 恢复的起点恢复执行时工作流并非从代码中断的那一行继续而是从一个预定义的“起点”开始重放replay。这个起点取决于你使用的 API 和工作流结构使用 StateGraph (Graph API)恢复起点是执行停止时所在节点的开头。涉及子图 (Subgraph)这是一个嵌套场景。恢复起点有两个层面在父图中起点是调用该子图的那个节点的开头。在子图内部起点是子图中执行停止的那个具体节点的开头。使用 Functional API恢复起点是执行停止时所在的入口点entrypoint的开头。3 流LangGraph 实现了一个流式系统用于呈现实时更新。流式传输对于增强基于大语言模型构建的应用程序的响应能力至关重要。通过逐步显示输出即使在完整响应准备就绪之前流式传输能显著改善用户体验尤其是在处理大语言模型的延迟时3.1 起步基本用法LangGraph 图公开了stream同步和astream异步方法用于以迭代器的形式产出流式输出。你可以传递一个或多个流模式来控制你接收的数据。from typing import TypedDict from langgraph.graph import StateGraph, START, END from langgraph.config import get_stream_writer class State(TypedDict): topic: str joke: str def generate_joke(state: State): writer get_stream_writer() writer({status: thinking of a joke...}) return {joke: fWhy did the {state[topic]} go to school? To get a sundae education!} graph ( StateGraph(State) .add_node(generate_joke) .add_edge(START, generate_joke) .add_edge(generate_joke, END) .compile() ) for chunk in graph.stream( {topic: ice cream}, stream_mode[updates, custom], versionv2, ): if chunk[type] updates: for node_name, state in chunk[data].items(): print(fNode {node_name} updated: {state}) elif chunk[type] custom: print(fStatus: {chunk[data][status]})#输出 Status: thinking of a joke... Node generate_joke updated: {joke: Why did the ice cream go to school? To get a sundae education!}流输出格式v2需要 LangGraph 1.1 版本。本页面所有示例均使用 versionv2向stream()或astream()传递versionv2以获取统一的输出格式。每个数据块都是一个StreamPart字典具有一致的结构——无论流模式、模式数量或子图设置如何{ type: values | updates | messages | custom | checkpoints | tasks | debug, ns: (), # namespace tuple, populated for subgraph events data: ..., # the actual payload (type varies by stream mode) }每种流模式都有对应的TypedDict包含ValuesStreamPart、UpdatesStreamPart、MessagesStreamPart、CustomStreamPart、CheckpointStreamPart、TasksStreamPart、DebugStreamPart。你可以从langgraph.types导入这些类型。联合类型StreamPart是基于part[type]的互斥联合能够在编辑器和类型检查器中实现完整的类型窄化。在 v1默认中输出格式会根据你的流式传输选项而改变单一模式返回原始数据多种模式返回(mode, data)元组子图返回(namespace, data)元组。而在 v2 中格式始终如一#v2 for chunk in graph.stream(inputs, stream_modeupdates, versionv2): print(chunk[type]) # updates print(chunk[ns]) # () print(chunk[data]) # {node_name: {key: value}} #v1 for chunk in graph.stream(inputs, stream_modeupdates): print(chunk) # {node_name: {key: value}}V2 格式还支持类型窄化这意味着你可以通过chunk[type]过滤数据块并获得正确的负载类型。每个分支都会将part[data]窄化为该模式对应的具体类型for part in graph.stream( {topic: ice cream}, stream_mode[values, updates, messages, custom], versionv2, ): if part[type] values: # ValuesStreamPart — full state snapshot after each step print(fState: topic{part[data][topic]}) elif part[type] updates: # UpdatesStreamPart — only the changed keys from each node for node_name, state in part[data].items(): print(fNode {node_name} updated: {state}) elif part[type] messages: # MessagesStreamPart — (message_chunk, metadata) from LLM calls msg, metadata part[data] print(msg.content, end, flushTrue) elif part[type] custom: # CustomStreamPart — arbitrary data from get_stream_writer() print(fProgress: {part[data][progress]}%)3.2 流模式3.3 高级4 中断5 时间回溯6 记忆7 子图