工作流与代理
一句话
LangGraph 提供构建工作流(Workflows)和代理(Agents)的能力,工作流具有预定义的代码路径,而代理则动态定义自己的流程和工具使用方式。
什么时候翻这页
当你需要设计复杂的多步骤流程,或者需要构建能够自主决策和执行任务的智能代理时,可以参考本页内容。特别适用于需要处理不确定性问题、需要迭代优化结果,或者需要协调多个子任务的场景。
核心概念
工作流(Workflows)与代理(Agents)的区别
- 工作流:具有预定义的代码路径,按照特定顺序执行
- 代理:动态定义自己的流程和工具使用,具有更多自主性
LangGraph 的优势
- 持久化(persistence)
- 流式处理(streaming)
- 调试支持
- 部署支持
LLM 增强功能
- 工具调用(tool calling)
- 结构化输出(structured outputs)
- 短期记忆(short term memory)
常见工作流模式
- 提示链(Prompt chaining):每个 LLM 调用处理前一个调用的输出
- 并行化(Parallelization):多个 LLM 同时处理任务
- 路由(Routing):处理输入并将其导向特定任务
- 编排器-工作者(Orchestrator-worker):编排者分解任务,工作者执行子任务
- 评估器-优化器(Evaluator-optimizer):一个 LLM 生成响应,另一个评估响应
怎么做
环境设置
pip install langchain_core langchain-anthropic langgraph
import os
import getpass
from langchain_anthropic import ChatAnthropic
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("ANTHROPIC_API_KEY")
llm = ChatAnthropic(model="claude-sonnet-4-6")
LLM 增强示例
# 结构化输出
from pydantic import BaseModel, Field
class SearchQuery(BaseModel):
search_query: str = Field(None, description="优化网络搜索的查询")
justification: str = Field(None, description="为什么此查询与用户请求相关")
structured_llm = llm.with_structured_output(SearchQuery)
# 工具增强
def multiply(a: int, b: int) -> int:
return a * b
llm_with_tools = llm.bind_tools([multiply])
提示链(Prompt chaining)示例
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
topic: str
joke: str
improved_joke: str
final_joke: str
def generate_joke(state: State):
msg = llm.invoke(f"Write a short joke about {state['topic']}")
return {"joke": msg.content}
def check_punchline(state: State):
if "?" in state["joke"] or "!" in state["joke"]:
return "Pass"
return "Fail"
def improve_joke(state: State):
msg = llm.invoke(f"Make this joke funnier by adding wordplay: {state['joke']}")
return {"improved_joke": msg.content}
def polish_joke(state: State):
msg = llm.invoke(f"Add a surprising twist to this joke: {state['improved_joke']}")
return {"final_joke": msg.content}
workflow = StateGraph(State)
workflow.add_node("generate_joke", generate_joke)
workflow.add_node("improve_joke", improve_joke)
workflow.add_node("polish_joke", polish_joke)
workflow.add_edge(START, "generate_joke")
workflow.add_conditional_edges(
"generate_joke", check_punchline, {"Fail": "improve_joke", "Pass": END}
)
workflow.add_edge("improve_joke", "polish_joke")
workflow.add_edge("polish_joke", END)
chain = workflow.compile()
并行化(Parallelization)示例
class State(TypedDict):
topic: str
joke: str
story: str
poem: str
combined_output: str
def call_llm_1(state: State):
msg = llm.invoke(f"Write a joke about {state['topic']}")
return {"joke": msg.content}
def call_llm_2(state: State):
msg = llm.invoke(f"Write a story about {state['topic']}")
return {"story": msg.content}
def call_llm_3(state: State):
msg = llm.invoke(f"Write a poem about {state['topic']}")
return {"poem": msg.content}
def aggregator(state: State):
combined = f"Here's a story, joke, and poem about {state['topic']}!\n\n"
combined += f"STORY:\n{state['story']}\n\n"
combined += f"JOKE:\n{state['joke']}\n\n"
combined += f"POEM:\n{state['poem']}"
return {"combined_output": combined}
parallel_builder = StateGraph(State)
parallel_builder.add_node("call_llm_1", call_llm_1)
parallel_builder.add_node("call_llm_2", call_llm_2)
parallel_builder.add_node("call_llm_3", call_llm_3)
parallel_builder.add_node("aggregator", aggregator)
parallel_builder.add_edge(START, "call_llm_1")
parallel_builder.add_edge(START, "call_llm_2")
parallel_builder.add_edge(START, "call_llm_3")
parallel_builder.add_edge("call_llm_1", "aggregator")
parallel_builder.add_edge("call_llm_2", "aggregator")
parallel_builder.add_edge("call_llm_3", "aggregator")
parallel_builder.add_edge("aggregator", END)
parallel_workflow = parallel_builder.compile()
路由(Routing)示例
from typing_extensions import Literal
from langchain.messages import HumanMessage, SystemMessage
class Route(BaseModel):
step: Literal["poem", "story", "joke"] = Field(
None, description="路由过程中的下一步"
)
router = llm.with_structured_output(Route)
class State(TypedDict):
input: str
decision: str
output: str
def llm_call_router(state: State):
decision = router.invoke([
SystemMessage(content="根据用户请求将输入路由到故事、笑话或诗歌。"),
HumanMessage(content=state["input"]),
])
return {"decision": decision.step}
def route_decision(state: State):
if state["decision"] == "story":
return "llm_call_1"
elif state["decision"] == "joke":
return "llm_call_2"
elif state["decision"] == "poem":
return "llm_call_3"
router_builder = StateGraph(State)
router_builder.add_node("llm_call_1", llm_call_1)
router_builder.add_node("llm_call_2", llm_call_2)
router_builder.add_node("llm_call_3", llm_call_3)
router_builder.add_node("llm_call_router", llm_call_router)
router_builder.add_edge(START, "llm_call_router")
router_builder.add_conditional_edges(
"llm_call_router",
route_decision,
{"llm_call_1": "llm_call_1", "llm_call_2": "llm_call_2", "llm_call_3": "llm_call_3"},
)
router_builder.add_edge("llm_call_1", END)
router_builder.add_edge("llm_call_2", END)
router_builder.add_edge("llm_call_3", END)
router_workflow = router_builder.compile()
编排器-工作者(Orchestrator-worker)示例
from typing import Annotated, List
import operator
class Section(BaseModel):
name: str = Field(description="此报告部分的名称")
description: str = Field(description="此部分将涵盖的主要主题和概念的简要概述")
class Sections(BaseModel):
sections: List[Section] = Field(description="报告的各个部分")
planner = llm.with_structured_output(Sections)
class State(TypedDict):
topic: str
sections: list[Section]
completed_sections: Annotated[list, operator.add]
final_report: str
class WorkerState(TypedDict):
section: Section
completed_sections: Annotated[list, operator.add]
def orchestrator(state: State):
report_sections = planner.invoke([
SystemMessage(content="为报告生成计划。"),
HumanMessage(content=f"这是报告主题: {state['topic']}"),
])
return {"sections": report_sections.sections}
def llm_call(state: WorkerState):
section = llm.invoke([
SystemMessage(content="根据提供的名称和描述编写报告部分。每个部分不要有前言。使用 markdown 格式。"),
HumanMessage(content=f"这是部分名称: {state['section'].name} 和描述: {state['section'].description}"),
])
return {"completed_sections": [section.content]}
def synthesizer(state: State):
completed_report_sections = "\n\n---\n\n".join(state["completed_sections"])
return {"final_report": completed_report_sections}
def assign_workers(state: State):
return [Send("llm_call", {"section": s}) for s in state["sections"]]
orchestrator_worker_builder = StateGraph(State)
orchestrator_worker_builder.add_node("orchestrator", orchestrator)
orchestrator_worker_builder.add_node("llm_call", llm_call)
orchestrator_worker_builder.add_node("synthesizer", synthesizer)
orchestrator_worker_builder.add_edge(START, "orchestrator")
orchestrator_worker_builder.add_conditional_edges(
"orchestrator", assign_workers, ["llm_call"]
)
orchestrator_worker_builder.add_edge("llm_call", "synthesizer")
orchestrator_worker_builder.add_edge("synthesizer", END)
orchestrator_worker = orchestrator_worker_builder.compile()
评估器-优化器(Evaluator-optimizer)示例
class State(TypedDict):
joke: str
topic: str
feedback: str
funny_or_not: str
class Feedback(BaseModel):
grade: Literal["funny", "not funny"] = Field(description="判断笑话是否有趣")
feedback: str = Field(description="如果笑话不好笑,提供如何改进的反馈")
evaluator = llm.with_structured_output(Feedback)
def llm_call_generator(state: State):
if state.get("feedback"):
msg = llm.invoke(f"Write a joke about {state['topic']} but take into account the feedback: {state['feedback']}")
else:
msg = llm.invoke(f"Write a joke about {state['topic']}")
return {"joke": msg.content}
def llm_call_evaluator(state: State):
grade = evaluator.invoke(f"Grade the joke {state['joke']}")
return {"funny_or_not": grade.grade, "feedback": grade.feedback}
def route_joke(state: State):
if state["funny_or_not"] == "funny":
return "Accepted"
elif state["funny_or_not"] == "not funny":
return "Rejected + Feedback"
optimizer_builder = StateGraph(State)
optimizer_builder.add_node("llm_call_generator", llm_call_generator)
optimizer_builder.add_node("llm_call_evaluator", llm_call_evaluator)
optimizer_builder.add_edge(START, "llm_call_generator")
optimizer_builder.add_edge("llm_call_generator", "llm_call_evaluator")
optimizer_builder.add_conditional_edges(
"llm_call_evaluator",
route_joke,
{"Accepted": END, "Rejected + Feedback": "llm_call_generator"},
)
optimizer_workflow = optimizer_builder.compile()
代理(Agents)示例
from langchain.tools import tool
from langgraph.graph import MessagesState
from langchain.messages import SystemMessage, HumanMessage, ToolMessage
@tool
def multiply(a: int, b: int) -> int:
"""Multiply `a` and `b`."""
return a * b
@tool
def add(a: int, b: int) -> int:
"""Adds `a` and `b`."""
return a + b
@tool
def divide(a: int, b: int) -> float:
"""Divide `a` and `b`."""
return a / b
tools = [add, multiply, divide]
tools_by_name = {tool.name: tool for tool in tools}
llm_with_tools = llm.bind_tools(tools)
def llm_call(state: MessagesState):
return {
"messages": [
llm_with_tools.invoke(
[
SystemMessage(
content="You are a helpful assistant tasked with performing arithmetic on a set of inputs."
)
]
+ state["messages"]
)
]
}
def tool_node(state: MessagesState):
result = []
for tool_call in state["messages"][-1].tool_calls:
tool = tools_by_name[tool_call["name"]]
observation = tool.invoke(tool_call["args"])
result.append(ToolMessage(content=observation, tool_call_id=tool_call["id"]))
return {"messages": result}
def should_continue(state: MessagesState) -> Literal["tool_node", END]:
messages = state["messages"]
last_message = messages[-1]
if last_message.tool_calls:
return "tool_node"
return END
agent_builder = StateGraph(MessagesState)
agent_builder.add_node("llm_call", llm_call)
agent_builder.add_node("tool_node", tool_node)
agent_builder.add_edge(START, "llm_call")
agent_builder.add_conditional_edges(
"llm_call",
should_continue,
["tool_node", END]
)
agent_builder.add_edge("tool_node", "llm_call")
agent = agent_builder.compile()
ToolNode 使用示例
from langchain.tools import tool
from langgraph.prebuilt import ToolNode
from langgraph.graph import MessagesState, StateGraph
@tool
def search(query: str) -> str:
"""Search for information."""
return f"Results for: {query}"
@tool
def calculator(expression: str) -> str:
"""Evaluate a math expression."""
return str(eval(expression))
builder = StateGraph(MessagesState)
builder.add_node("tools", ToolNode([search, calculator]))
# ... add other nodes and edges
graph = builder.compile()
命令 / API 速查
关键类和函数
StateGraph: 创建状态图START: 工作流起点END: 工作流终点with_structured_output: 为 LLM 添加结构化输出能力bind_tools: 为 LLM 绑定工具Send: 动态创建工作节点并发送特定输入ToolNode: 预构建节点,执行工具
常用模式
# 创建状态图
workflow = StateGraph(State)
# 添加节点
workflow.add_node("node_name", node_function)
# 添加边
workflow.add_edge(START, "node_name")
workflow.add_edge("node_name", END)
# 添加条件边
workflow.add_conditional_edges(
"node_name",
condition_function,
{"condition_value": "next_node"}
)
# 编译工作流
app = workflow.compile()
与 Hello-Agents / Claude Code 的联系
在 Hello-Agents 中,我们学习了 LangGraph 的基本概念和简单工作流。本页内容扩展了这些概念,介绍了更复杂的工作流模式和代理设计。Hello-Agents 中的基础概念如节点(node)、边(edge)和状态(state)在本页中得到更深入的应用。Claude Code 则提供了将这些概念付诸实践的编码环境,使开发者能够快速构建和测试 LangGraph 工作流。
初学者易错点
-
状态管理混乱:确保状态(State)中的数据在节点间正确传递和更新,避免数据丢失或覆盖。
-
循环控制不当:在使用条件边和循环时,确保有明确的终止条件,避免无限循环。
-
工具调用错误:在使用工具时,确保参数格式正确,工具名称与定义一致。
-
并行执行误解:并行节点虽然同时执行,但它们的完成顺序可能影响后续节点,需要合理安排聚合逻辑。
-
路由逻辑不完整:在路由模式中,确保所有可能的条件都有对应的路径,避免工作流卡住。
相关词条
- StateGraph: 状态图,LangGraph 的核心构建块
- Persistence: 持久化功能,保存工作流状态
- Streaming: 流式处理,实时获取工作流输出
- Tool calling: 工具调用,扩展 LLM 能力
- Structured outputs: 结构化输出,确保 LLM 输出符合特定格式
- Human-in-the-loop: 人机交互,在工作流中引入人工干预