流式输出
原文:Streaming
一句话
LangGraph 通过流式输出(streaming)功能,将图(graph)执行过程中的中间结果实时返回,提升用户体验。
什么时候翻这页
- 需要实时显示 Agent 工作流执行进度
- 想要获取 LLM 输出的 token-by-token 流式响应
- 需要监控图(graph)执行状态和检查点(checkpoints)
- 希望自定义节点(node)中的数据流式输出
- 需要调试复杂的图(graph)执行流程
核心概念
- 流模式(stream_mode):控制流式输出内容的类型,包括
updates、values、messages、custom、checkpoints、tasks和debug - 版本(version):
v1(默认)和v2(推荐),v2提供统一的输出格式 - 流式输出部分(StreamPart):
v2中的统一输出格式,包含type、ns和data字段 - 流式写入器(stream writer):用于从节点(node)或工具(tool)发送自定义数据
- 命名空间(namespace):标识子图(subgraph)事件的来源路径
怎么做
基本用法
使用 stream(同步)和 astream(异步)方法获取流式输出:
for chunk in graph.stream(
{"topic": "ice cream"},
stream_mode=["updates", "custom"],
version="v2",
):
if chunk["type"] == "updates":
for node_name, state in chunk["data"].items():
print(f"Node {node_name} updated: {state}")
elif chunk["type"] == "custom":
print(f"Status: {chunk['data']['status']}")
流输出格式(v2)
v2 版本提供统一的输出格式,每个块(chunk)都是 StreamPart 字典:
{
"type": "values" | "updates" | "messages" | "custom" | "checkpoints" | "tasks" | "debug",
"ns": (), # 命名空间元组,子图事件会填充
"data": ..., # 实际负载(类型因流模式而异)
}
流模式详解
图状态(Graph state)
- updates:流式输出每个步骤后状态的更新
- values:流式输出每个步骤后状态的完整值
# updates 模式
for chunk in graph.stream(
{"topic": "ice cream"},
stream_mode="updates",
version="v2",
):
if chunk["type"] == "updates":
for node_name, state in chunk["data"].items():
print(f"Node `{node_name}` updated: {state}")
# values 模式
for chunk in graph.stream(
{"topic": "ice cream"},
stream_mode="values",
version="v2",
):
if chunk["type"] == "values":
print(f"topic: {chunk['data']['topic']}, joke: {chunk['data']['joke']}")
LLM tokens
使用 messages 模式流式输出 LLM 的 token:
for chunk in graph.stream(
{"topic": "ice cream"},
stream_mode="messages",
version="v2",
):
if chunk["type"] == "messages":
message_chunk, metadata = chunk["data"]
if message_chunk.content:
print(message_chunk.content, end="|", flush=True)
自定义数据(Custom data)
使用 get_stream_writer 从节点发送自定义数据:
from langgraph.config import get_stream_writer
def node(state: State):
writer = get_stream_writer()
writer({"custom_key": "Generating custom data inside node"})
return {"answer": "some data"}
# 设置 stream_mode="custom" 接收自定义数据
for chunk in graph.stream(inputs, stream_mode="custom", version="v2"):
if chunk["type"] == "custom":
print(f"Custom event: {chunk['data']['custom_key']}")
检查点(Checkpoints)
使用 checkpoints 模式接收检查点事件:
from langgraph.checkpoint.memory import MemorySaver
graph = graph.compile(checkpointer=MemorySaver())
config = {"configurable": {"thread_id": "1"}}
for chunk in graph.stream(
{"topic": "ice cream"},
config=config,
stream_mode="checkpoints",
version="v2",
):
if chunk["type"] == "checkpoints":
print(chunk["data"])
任务(Tasks)
使用 tasks 模式接收任务开始和结束事件:
for chunk in graph.stream(
{"topic": "ice cream"},
config=config,
stream_mode="tasks",
version="v2",
):
if chunk["type"] == "tasks":
print(chunk["data"])
调试(Debug)
使用 debug 模式流式输出尽可能多的执行信息:
for chunk in graph.stream(
{"topic": "ice cream"},
stream_mode="debug",
version="v2",
):
if chunk["type"] == "debug":
print(chunk["data"])
多模式同时使用
可以同时使用多种流模式:
for chunk in graph.stream(inputs, stream_mode=["updates", "custom"], version="v2"):
if chunk["type"] == "updates":
for node_name, state in chunk["data"].items():
print(f"Node `{node_name}` updated: {state}")
elif chunk["type"] == "custom":
print(f"Custom event: {chunk['data']}")
命令 / API 速查
| 方法/参数 | 描述 | 示例 |
|---|---|---|
graph.stream() | 同步流式执行 | graph.stream(inputs, stream_mode="updates", version="v2") |
graph.astream() | 异步流式执行 | await graph.astream(inputs, stream_mode="messages", version="v2") |
stream_mode | 流式输出模式 | ["updates", "custom"] 或 "values" |
version="v2" | 使用 v2 输出格式 | graph.stream(inputs, version="v2") |
get_stream_writer() | 获取流式写入器 | writer = get_stream_writer(); writer({"key": "value"}) |
subgraphs=True | 包含子图输出 | graph.stream(inputs, subgraphs=True, version="v2") |
streaming=False | 禁用特定模型的流式传输 | model = init_chat_model("model-name", streaming=False) |
与 Hello-Agents / Claude Code 的联系
在 Hello-Agents 中,我们学习了 LangGraph 的基本概念和 StateGraph 的使用。流式输出功能扩展了这些基础概念,允许我们在 Agent 工作流执行过程中实时获取反馈。与 Hello-Agents 中的静态执行不同,流式输出让我们能够:
- 实时监控 Agent 的思考过程
- 逐步显示 LLM 的输出内容
- 跟踪状态变化和节点执行情况
- 实现更好的用户交互体验
初学者易错点
- 忘记设置 version="v2":新版本推荐使用
version="v2"以获得统一的输出格式 - Python < 3.11 的异步处理:在 Python < 3.11 中使用异步代码时,必须显式传递
RunnableConfig给ainvoke() - 流式写入器在异步中的限制:在 Python < 3.11 的异步代码中,不能使用
get_stream_writer(),必须手动传递writer参数 - 混淆流模式:不同流模式返回的数据结构不同,需要正确处理
chunk["type"]来区分 - 忽略子图命名空间:使用子图时,注意
chunk["ns"]字段标识数据来源
相关词条
- StateGraph - 状态图
- Checkpointer - 检查点保存器
- Subgraphs - 子图
- Event Streaming - 事件流式输出(LangGraph v1.2+)
- RunnableConfig - 可运行配置