J'Blog

3 课 · 核心

语义检索

基于 Chroma 向量库,与 Workspace 知识库分离。用自然语言描述你想查的内容。 非管理员每人每天可检索 5 次(消耗站点 AI 配额)。

工作流与代理

原文:Workflows & Agents

一句话

LangGraph 提供构建工作流(Workflows)和代理(Agents)的能力,工作流具有预定义的代码路径,而代理则动态定义自己的流程和工具使用方式。

什么时候翻这页

当你需要设计复杂的多步骤流程,或者需要构建能够自主决策和执行任务的智能代理时,可以参考本页内容。特别适用于需要处理不确定性问题、需要迭代优化结果,或者需要协调多个子任务的场景。

核心概念

工作流(Workflows)与代理(Agents)的区别

  • 工作流:具有预定义的代码路径,按照特定顺序执行
  • 代理:动态定义自己的流程和工具使用,具有更多自主性

LangGraph 的优势

  • 持久化(persistence)
  • 流式处理(streaming)
  • 调试支持
  • 部署支持

LLM 增强功能

  • 工具调用(tool calling)
  • 结构化输出(structured outputs)
  • 短期记忆(short term memory)

常见工作流模式

  1. 提示链(Prompt chaining):每个 LLM 调用处理前一个调用的输出
  2. 并行化(Parallelization):多个 LLM 同时处理任务
  3. 路由(Routing):处理输入并将其导向特定任务
  4. 编排器-工作者(Orchestrator-worker):编排者分解任务,工作者执行子任务
  5. 评估器-优化器(Evaluator-optimizer):一个 LLM 生成响应,另一个评估响应

怎么做

环境设置

pip install langchain_core langchain-anthropic langgraph
import os
import getpass
from langchain_anthropic import ChatAnthropic

def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")

_set_env("ANTHROPIC_API_KEY")
llm = ChatAnthropic(model="claude-sonnet-4-6")

LLM 增强示例

# 结构化输出
from pydantic import BaseModel, Field

class SearchQuery(BaseModel):
    search_query: str = Field(None, description="优化网络搜索的查询")
    justification: str = Field(None, description="为什么此查询与用户请求相关")

structured_llm = llm.with_structured_output(SearchQuery)

# 工具增强
def multiply(a: int, b: int) -> int:
    return a * b

llm_with_tools = llm.bind_tools([multiply])

提示链(Prompt chaining)示例

from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    topic: str
    joke: str
    improved_joke: str
    final_joke: str

def generate_joke(state: State):
    msg = llm.invoke(f"Write a short joke about {state['topic']}")
    return {"joke": msg.content}

def check_punchline(state: State):
    if "?" in state["joke"] or "!" in state["joke"]:
        return "Pass"
    return "Fail"

def improve_joke(state: State):
    msg = llm.invoke(f"Make this joke funnier by adding wordplay: {state['joke']}")
    return {"improved_joke": msg.content}

def polish_joke(state: State):
    msg = llm.invoke(f"Add a surprising twist to this joke: {state['improved_joke']}")
    return {"final_joke": msg.content}

workflow = StateGraph(State)
workflow.add_node("generate_joke", generate_joke)
workflow.add_node("improve_joke", improve_joke)
workflow.add_node("polish_joke", polish_joke)

workflow.add_edge(START, "generate_joke")
workflow.add_conditional_edges(
    "generate_joke", check_punchline, {"Fail": "improve_joke", "Pass": END}
)
workflow.add_edge("improve_joke", "polish_joke")
workflow.add_edge("polish_joke", END)

chain = workflow.compile()

并行化(Parallelization)示例

class State(TypedDict):
    topic: str
    joke: str
    story: str
    poem: str
    combined_output: str

def call_llm_1(state: State):
    msg = llm.invoke(f"Write a joke about {state['topic']}")
    return {"joke": msg.content}

def call_llm_2(state: State):
    msg = llm.invoke(f"Write a story about {state['topic']}")
    return {"story": msg.content}

def call_llm_3(state: State):
    msg = llm.invoke(f"Write a poem about {state['topic']}")
    return {"poem": msg.content}

def aggregator(state: State):
    combined = f"Here's a story, joke, and poem about {state['topic']}!\n\n"
    combined += f"STORY:\n{state['story']}\n\n"
    combined += f"JOKE:\n{state['joke']}\n\n"
    combined += f"POEM:\n{state['poem']}"
    return {"combined_output": combined}

parallel_builder = StateGraph(State)
parallel_builder.add_node("call_llm_1", call_llm_1)
parallel_builder.add_node("call_llm_2", call_llm_2)
parallel_builder.add_node("call_llm_3", call_llm_3)
parallel_builder.add_node("aggregator", aggregator)

parallel_builder.add_edge(START, "call_llm_1")
parallel_builder.add_edge(START, "call_llm_2")
parallel_builder.add_edge(START, "call_llm_3")
parallel_builder.add_edge("call_llm_1", "aggregator")
parallel_builder.add_edge("call_llm_2", "aggregator")
parallel_builder.add_edge("call_llm_3", "aggregator")
parallel_builder.add_edge("aggregator", END)

parallel_workflow = parallel_builder.compile()

路由(Routing)示例

from typing_extensions import Literal
from langchain.messages import HumanMessage, SystemMessage

class Route(BaseModel):
    step: Literal["poem", "story", "joke"] = Field(
        None, description="路由过程中的下一步"
    )

router = llm.with_structured_output(Route)

class State(TypedDict):
    input: str
    decision: str
    output: str

def llm_call_router(state: State):
    decision = router.invoke([
        SystemMessage(content="根据用户请求将输入路由到故事、笑话或诗歌。"),
        HumanMessage(content=state["input"]),
    ])
    return {"decision": decision.step}

def route_decision(state: State):
    if state["decision"] == "story":
        return "llm_call_1"
    elif state["decision"] == "joke":
        return "llm_call_2"
    elif state["decision"] == "poem":
        return "llm_call_3"

router_builder = StateGraph(State)
router_builder.add_node("llm_call_1", llm_call_1)
router_builder.add_node("llm_call_2", llm_call_2)
router_builder.add_node("llm_call_3", llm_call_3)
router_builder.add_node("llm_call_router", llm_call_router)

router_builder.add_edge(START, "llm_call_router")
router_builder.add_conditional_edges(
    "llm_call_router",
    route_decision,
    {"llm_call_1": "llm_call_1", "llm_call_2": "llm_call_2", "llm_call_3": "llm_call_3"},
)
router_builder.add_edge("llm_call_1", END)
router_builder.add_edge("llm_call_2", END)
router_builder.add_edge("llm_call_3", END)

router_workflow = router_builder.compile()

编排器-工作者(Orchestrator-worker)示例

from typing import Annotated, List
import operator

class Section(BaseModel):
    name: str = Field(description="此报告部分的名称")
    description: str = Field(description="此部分将涵盖的主要主题和概念的简要概述")

class Sections(BaseModel):
    sections: List[Section] = Field(description="报告的各个部分")

planner = llm.with_structured_output(Sections)

class State(TypedDict):
    topic: str
    sections: list[Section]
    completed_sections: Annotated[list, operator.add]
    final_report: str

class WorkerState(TypedDict):
    section: Section
    completed_sections: Annotated[list, operator.add]

def orchestrator(state: State):
    report_sections = planner.invoke([
        SystemMessage(content="为报告生成计划。"),
        HumanMessage(content=f"这是报告主题: {state['topic']}"),
    ])
    return {"sections": report_sections.sections}

def llm_call(state: WorkerState):
    section = llm.invoke([
        SystemMessage(content="根据提供的名称和描述编写报告部分。每个部分不要有前言。使用 markdown 格式。"),
        HumanMessage(content=f"这是部分名称: {state['section'].name} 和描述: {state['section'].description}"),
    ])
    return {"completed_sections": [section.content]}

def synthesizer(state: State):
    completed_report_sections = "\n\n---\n\n".join(state["completed_sections"])
    return {"final_report": completed_report_sections}

def assign_workers(state: State):
    return [Send("llm_call", {"section": s}) for s in state["sections"]]

orchestrator_worker_builder = StateGraph(State)
orchestrator_worker_builder.add_node("orchestrator", orchestrator)
orchestrator_worker_builder.add_node("llm_call", llm_call)
orchestrator_worker_builder.add_node("synthesizer", synthesizer)

orchestrator_worker_builder.add_edge(START, "orchestrator")
orchestrator_worker_builder.add_conditional_edges(
    "orchestrator", assign_workers, ["llm_call"]
)
orchestrator_worker_builder.add_edge("llm_call", "synthesizer")
orchestrator_worker_builder.add_edge("synthesizer", END)

orchestrator_worker = orchestrator_worker_builder.compile()

评估器-优化器(Evaluator-optimizer)示例

class State(TypedDict):
    joke: str
    topic: str
    feedback: str
    funny_or_not: str

class Feedback(BaseModel):
    grade: Literal["funny", "not funny"] = Field(description="判断笑话是否有趣")
    feedback: str = Field(description="如果笑话不好笑,提供如何改进的反馈")

evaluator = llm.with_structured_output(Feedback)

def llm_call_generator(state: State):
    if state.get("feedback"):
        msg = llm.invoke(f"Write a joke about {state['topic']} but take into account the feedback: {state['feedback']}")
    else:
        msg = llm.invoke(f"Write a joke about {state['topic']}")
    return {"joke": msg.content}

def llm_call_evaluator(state: State):
    grade = evaluator.invoke(f"Grade the joke {state['joke']}")
    return {"funny_or_not": grade.grade, "feedback": grade.feedback}

def route_joke(state: State):
    if state["funny_or_not"] == "funny":
        return "Accepted"
    elif state["funny_or_not"] == "not funny":
        return "Rejected + Feedback"

optimizer_builder = StateGraph(State)
optimizer_builder.add_node("llm_call_generator", llm_call_generator)
optimizer_builder.add_node("llm_call_evaluator", llm_call_evaluator)

optimizer_builder.add_edge(START, "llm_call_generator")
optimizer_builder.add_edge("llm_call_generator", "llm_call_evaluator")
optimizer_builder.add_conditional_edges(
    "llm_call_evaluator",
    route_joke,
    {"Accepted": END, "Rejected + Feedback": "llm_call_generator"},
)

optimizer_workflow = optimizer_builder.compile()

代理(Agents)示例

from langchain.tools import tool
from langgraph.graph import MessagesState
from langchain.messages import SystemMessage, HumanMessage, ToolMessage

@tool
def multiply(a: int, b: int) -> int:
    """Multiply `a` and `b`."""
    return a * b

@tool
def add(a: int, b: int) -> int:
    """Adds `a` and `b`."""
    return a + b

@tool
def divide(a: int, b: int) -> float:
    """Divide `a` and `b`."""
    return a / b

tools = [add, multiply, divide]
tools_by_name = {tool.name: tool for tool in tools}
llm_with_tools = llm.bind_tools(tools)

def llm_call(state: MessagesState):
    return {
        "messages": [
            llm_with_tools.invoke(
                [
                    SystemMessage(
                        content="You are a helpful assistant tasked with performing arithmetic on a set of inputs."
                    )
                ]
                + state["messages"]
            )
        ]
    }

def tool_node(state: MessagesState):
    result = []
    for tool_call in state["messages"][-1].tool_calls:
        tool = tools_by_name[tool_call["name"]]
        observation = tool.invoke(tool_call["args"])
        result.append(ToolMessage(content=observation, tool_call_id=tool_call["id"]))
    return {"messages": result}

def should_continue(state: MessagesState) -> Literal["tool_node", END]:
    messages = state["messages"]
    last_message = messages[-1]
    
    if last_message.tool_calls:
        return "tool_node"
    return END

agent_builder = StateGraph(MessagesState)
agent_builder.add_node("llm_call", llm_call)
agent_builder.add_node("tool_node", tool_node)

agent_builder.add_edge(START, "llm_call")
agent_builder.add_conditional_edges(
    "llm_call",
    should_continue,
    ["tool_node", END]
)
agent_builder.add_edge("tool_node", "llm_call")

agent = agent_builder.compile()

ToolNode 使用示例

from langchain.tools import tool
from langgraph.prebuilt import ToolNode
from langgraph.graph import MessagesState, StateGraph

@tool
def search(query: str) -> str:
    """Search for information."""
    return f"Results for: {query}"

@tool
def calculator(expression: str) -> str:
    """Evaluate a math expression."""
    return str(eval(expression))

builder = StateGraph(MessagesState)
builder.add_node("tools", ToolNode([search, calculator]))
# ... add other nodes and edges
graph = builder.compile()

命令 / API 速查

关键类和函数

  • StateGraph: 创建状态图
  • START: 工作流起点
  • END: 工作流终点
  • with_structured_output: 为 LLM 添加结构化输出能力
  • bind_tools: 为 LLM 绑定工具
  • Send: 动态创建工作节点并发送特定输入
  • ToolNode: 预构建节点,执行工具

常用模式

# 创建状态图
workflow = StateGraph(State)

# 添加节点
workflow.add_node("node_name", node_function)

# 添加边
workflow.add_edge(START, "node_name")
workflow.add_edge("node_name", END)

# 添加条件边
workflow.add_conditional_edges(
    "node_name",
    condition_function,
    {"condition_value": "next_node"}
)

# 编译工作流
app = workflow.compile()

与 Hello-Agents / Claude Code 的联系

在 Hello-Agents 中,我们学习了 LangGraph 的基本概念和简单工作流。本页内容扩展了这些概念,介绍了更复杂的工作流模式和代理设计。Hello-Agents 中的基础概念如节点(node)、边(edge)和状态(state)在本页中得到更深入的应用。Claude Code 则提供了将这些概念付诸实践的编码环境,使开发者能够快速构建和测试 LangGraph 工作流。

初学者易错点

  1. 状态管理混乱:确保状态(State)中的数据在节点间正确传递和更新,避免数据丢失或覆盖。

  2. 循环控制不当:在使用条件边和循环时,确保有明确的终止条件,避免无限循环。

  3. 工具调用错误:在使用工具时,确保参数格式正确,工具名称与定义一致。

  4. 并行执行误解:并行节点虽然同时执行,但它们的完成顺序可能影响后续节点,需要合理安排聚合逻辑。

  5. 路由逻辑不完整:在路由模式中,确保所有可能的条件都有对应的路径,避免工作流卡住。

相关词条

官方原文:https://docs.langchain.com/oss/python/langgraph/workflows-agents