流式输出
原文:流式输出
一句话
LangChain 实现了流式系统,用于展示 agent 运行的实时更新,显著提升基于 LLM 的应用响应性。
什么时候翻这页
当你需要:
- 实时展示 agent 执行进度
- 流式显示 LLM 生成的 tokens
- 展示模型的推理过程
- 自定义流式更新内容
- 同时使用多种流式模式
核心概念
-
流式模式 (stream_mode): LangChain 支持三种流式模式:
updates: 每个 agent 步骤后流式传输状态更新messages: 从调用 LLM 的图节点中流式传输(token, metadata)元组custom: 使用流式写入器从图节点中流式传输自定义数据
-
流式输出 (streaming): 通过逐步显示输出来改善用户体验,特别是在处理 LLM 延迟时
-
事件流式传输 (event streaming): LangChain v1.3 引入的类型投影 API,为每个投影提供独立的迭代器
怎么做
流式传输 agent 进度
使用 stream_mode="updates" 流式传输 agent 进度:
from langchain.agents import create_agent
from langchain_core.utils.uuid import uuid7
from langgraph.checkpoint.memory import InMemorySaver
def get_weather(city: str) -> str:
"""Get weather for a given city."""
return f"It's always sunny in {city}!"
agent = create_agent(
model="google_genai:gemini-3.5-flash",
tools=[get_weather],
checkpointer=InMemorySaver()
)
config = {"configurable": {"thread_id": str(uuid7())}}
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
config=config,
stream_mode="updates",
version="v2",
):
if chunk["type"] == "updates":
for step, data in chunk["data"].items():
print(f"step: {step}")
print(f"content: {data['messages'][-1].content_blocks}")
流式传输 LLM tokens
使用 stream_mode="messages" 流式传输 LLM 生成的 tokens:
from langchain.agents import create_agent
def get_weather(city: str) -> str:
"""Get weather for a given city."""
return f"It's always sunny in {city}!"
agent = create_agent(
model="gpt-5-nano",
tools=[get_weather],
)
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
stream_mode="messages",
version="v2",
):
if chunk["type"] == "messages":
token, metadata = chunk["data"]
print(f"node: {metadata['langgraph_node']}")
print(f"content: {token.content_blocks}")
流式传输自定义更新
使用 get_stream_writer 流式传输自定义更新:
from langchain.agents import create_agent
from langgraph.config import get_stream_writer
def get_weather(city: str) -> str:
"""Get weather for a given city."""
writer = get_stream_writer()
writer(f"Looking up data for city: {city}")
writer(f"Acquired data for city: {city}")
return f"It's always sunny in {city}!"
agent = create_agent(
model="claude-sonnet-4-6",
tools=[get_weather],
)
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
stream_mode="custom",
version="v2",
):
if chunk["type"] == "custom":
print(chunk["data"])
同时使用多种流式模式
通过将流式模式作为列表传递来指定多种流式模式:
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
stream_mode=["updates", "custom"],
version="v2",
):
print(f"stream_mode: {chunk['type']}")
print(f"content: {chunk['data']}")
流式传输推理/思考 tokens
过滤类型为 "reasoning" 的标准内容块来流式传输推理 tokens:
from langchain.agents import create_agent
from langchain.messages import AIMessageChunk
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables import Runnable
def get_weather(city: str) -> str:
"""Get weather for a given city."""
return f"It's always sunny in {city}!"
model = ChatAnthropic(
model_name="claude-sonnet-4-6",
timeout=None,
stop=None,
thinking={"type": "enabled", "budget_tokens": 5000},
)
agent: Runnable = create_agent(
model=model,
tools=[get_weather],
)
for token, metadata in agent.stream(
{"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
stream_mode="messages",
):
if not isinstance(token, AIMessageChunk):
continue
reasoning = [b for b in token.content_blocks if b["type"] == "reasoning"]
text = [b for b in token.content_blocks if b["type"] == "text"]
if reasoning:
print(f"[thinking] {reasoning[0]['reasoning']}", end="")
if text:
print(text[0]["text"], end="")
禁用流式传输
在初始化模型时设置 streaming=False 来禁用特定模型的流式传输:
from langchain_openai import ChatOpenAI
model = ChatOpenAI(
model="gpt-5.4",
streaming=False
)
使用 v2 流式格式
传递 version="v2" 获取统一的输出格式:
# v2 格式 - 无需解包元组
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
stream_mode=["updates", "custom"],
version="v2",
):
print(chunk["type"]) # "updates" 或 "custom"
print(chunk["data"]) # 载荷
命令 / API 速查
agent.stream(): 流式执行 agentagent.astream(): 异步流式执行 agentstream_mode: 指定流式模式 ("updates","messages","custom")get_stream_writer(): 获取流式写入器version="v2": 使用 v2 流式格式streaming=False: 禁用模型流式传输
与 LangGraph / RAG 手册的联系
- LangGraph 提供更底层的编排 runtime,而 LangChain 提供高层的 harness
- 流式输出在 LangGraph 中有更高级的选项,包括
values和debug模式 - RAG 系统可以利用流式输出实时展示检索和生成过程
初学者易错点
- 忘记为持久化对话历史传递
thread_id,导致无法恢复对话状态 - 在工具外部使用
get_stream_writer(),会导致无法在非 LangGraph 执行上下文中调用工具 - 混淆
stream_mode参数,导致无法获取预期的流式输出类型 - 忽略
version="v2"参数,导致使用旧的元组解包格式 - 未正确处理中断(interrupts),导致人类在循环功能无法正常工作
相关词条
- create_agent - 创建 agent
- tools - 工具定义和使用
- models - 模型配置和推理
- RAG - 检索增强生成
- LangGraph - 底层编排框架
- human-in-the-loop - 人类在循环功能
- frontend-streaming - 前端流式UI