- 工作流具有预定义的代码路径,旨在按特定顺序运行。
- 智能体是动态的,会自行定义其流程和工具使用方式。

设置
要构建工作流或智能体,您可以使用支持结构化输出和工具调用的任何聊天模型。以下示例使用 Anthropic:- 安装依赖项:
pip install langchain_core langchain-anthropic langgraph
- 初始化 LLM:
import os
import getpass
from langchain_anthropic import ChatAnthropic
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("ANTHROPIC_API_KEY")
llm = ChatAnthropic(model="claude-sonnet-4-6")
LLM 与增强功能
工作流和智能体系统基于 LLM 以及您为其添加的各种增强功能。工具调用、结构化输出和短期记忆是根据您的需求定制 LLM 的几种选项。
# 结构化输出的模式
from pydantic import BaseModel, Field
class SearchQuery(BaseModel):
search_query: str = Field(None, description="针对网络搜索优化的查询。")
justification: str = Field(
None, description="此查询与用户请求相关的理由。"
)
# 使用结构化输出模式增强 LLM
structured_llm = llm.with_structured_output(SearchQuery)
# 调用增强的 LLM
output = structured_llm.invoke("钙 CT 评分与高胆固醇有何关系?")
# 定义工具
def multiply(a: int, b: int) -> int:
return a * b
# 使用工具增强 LLM
llm_with_tools = llm.bind_tools([multiply])
# 调用 LLM 并输入触发工具调用的内容
msg = llm_with_tools.invoke("2 乘以 3 是多少?")
# 获取工具调用
msg.tool_calls
提示链
提示链是指每个 LLM 调用处理前一个调用的输出。它通常用于执行定义明确的任务,这些任务可以分解为更小、可验证的步骤。一些示例包括:- 将文档翻译成不同语言
- 验证生成内容的一致性

from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from IPython.display import Image, display
# 图状态
class State(TypedDict):
topic: str
joke: str
improved_joke: str
final_joke: str
# 节点
def generate_joke(state: State):
"""首次 LLM 调用以生成初始笑话"""
msg = llm.invoke(f"写一个关于 {state['topic']} 的简短笑话")
return {"joke": msg.content}
def check_punchline(state: State):
"""检查函数,用于检查笑话是否有笑点"""
# 简单检查 - 笑话是否包含 "?" 或 "!"
if "?" in state["joke"] or "!" in state["joke"]:
return "Pass"
return "Fail"
def improve_joke(state: State):
"""第二次 LLM 调用以改进笑话"""
msg = llm.invoke(f"通过添加文字游戏让这个笑话更有趣:{state['joke']}")
return {"improved_joke": msg.content}
def polish_joke(state: State):
"""第三次 LLM 调用以进行最终润色"""
msg = llm.invoke(f"为这个笑话添加一个令人惊讶的转折:{state['improved_joke']}")
return {"final_joke": msg.content}
# 构建工作流
workflow = StateGraph(State)
# 添加节点
workflow.add_node("generate_joke", generate_joke)
workflow.add_node("improve_joke", improve_joke)
workflow.add_node("polish_joke", polish_joke)
# 添加边以连接节点
workflow.add_edge(START, "generate_joke")
workflow.add_conditional_edges(
"generate_joke", check_punchline, {"Fail": "improve_joke", "Pass": END}
)
workflow.add_edge("improve_joke", "polish_joke")
workflow.add_edge("polish_joke", END)
# 编译
chain = workflow.compile()
# 显示工作流
display(Image(chain.get_graph().draw_mermaid_png()))
# 调用
state = chain.invoke({"topic": "cats"})
print("初始笑话:")
print(state["joke"])
print("\n--- --- ---\n")
if "improved_joke" in state:
print("改进后的笑话:")
print(state["improved_joke"])
print("\n--- --- ---\n")
print("最终笑话:")
print(state["final_joke"])
else:
print("最终笑话:")
print(state["joke"])
并行化
通过并行化,LLM 可以同时处理任务。这可以通过同时运行多个独立子任务,或多次运行同一任务以检查不同输出来实现。并行化通常用于:- 拆分子任务并并行运行,以提高速度
- 多次运行任务以检查不同输出,从而提高置信度
- 运行一个处理文档关键词的子任务,以及另一个检查格式错误的子任务
- 多次运行任务以根据不同的标准(如引用数量、使用的来源数量以及来源质量)对文档的准确性进行评分

# 图状态
class State(TypedDict):
topic: str
joke: str
story: str
poem: str
combined_output: str
# 节点
def call_llm_1(state: State):
"""首次 LLM 调用以生成初始笑话"""
msg = llm.invoke(f"写一个关于 {state['topic']} 的笑话")
return {"joke": msg.content}
def call_llm_2(state: State):
"""第二次 LLM 调用以生成故事"""
msg = llm.invoke(f"写一个关于 {state['topic']} 的故事")
return {"story": msg.content}
def call_llm_3(state: State):
"""第三次 LLM 调用以生成诗歌"""
msg = llm.invoke(f"写一首关于 {state['topic']} 的诗")
return {"poem": msg.content}
def aggregator(state: State):
"""将笑话、故事和诗歌合并为单个输出"""
combined = f"这是一个关于 {state['topic']} 的故事、笑话和诗歌!\n\n"
combined += f"故事:\n{state['story']}\n\n"
combined += f"笑话:\n{state['joke']}\n\n"
combined += f"诗歌:\n{state['poem']}"
return {"combined_output": combined}
# 构建工作流
parallel_builder = StateGraph(State)
# 添加节点
parallel_builder.add_node("call_llm_1", call_llm_1)
parallel_builder.add_node("call_llm_2", call_llm_2)
parallel_builder.add_node("call_llm_3", call_llm_3)
parallel_builder.add_node("aggregator", aggregator)
# 添加边以连接节点
parallel_builder.add_edge(START, "call_llm_1")
parallel_builder.add_edge(START, "call_llm_2")
parallel_builder.add_edge(START, "call_llm_3")
parallel_builder.add_edge("call_llm_1", "aggregator")
parallel_builder.add_edge("call_llm_2", "aggregator")
parallel_builder.add_edge("call_llm_3", "aggregator")
parallel_builder.add_edge("aggregator", END)
parallel_workflow = parallel_builder.compile()
# 显示工作流
display(Image(parallel_workflow.get_graph().draw_mermaid_png()))
# 调用
state = parallel_workflow.invoke({"topic": "cats"})
print(state["combined_output"])
路由
路由工作流处理输入,然后将其定向到特定于上下文的任务。这允许您为复杂任务定义专门的流程。例如,为回答产品相关问题而构建的工作流可能会首先处理问题类型,然后将请求路由到特定流程,如定价、退款、退货等。
from typing_extensions import Literal
from langchain.messages import HumanMessage, SystemMessage
# 用于路由逻辑的结构化输出模式
class Route(BaseModel):
step: Literal["poem", "story", "joke"] = Field(
None, description="路由过程中的下一步"
)
# 使用结构化输出模式增强 LLM
router = llm.with_structured_output(Route)
# 状态
class State(TypedDict):
input: str
decision: str
output: str
# 节点
def llm_call_1(state: State):
"""写一个故事"""
result = llm.invoke(state["input"])
return {"output": result.content}
def llm_call_2(state: State):
"""写一个笑话"""
result = llm.invoke(state["input"])
return {"output": result.content}
def llm_call_3(state: State):
"""写一首诗"""
result = llm.invoke(state["input"])
return {"output": result.content}
def llm_call_router(state: State):
"""将输入路由到适当的节点"""
# 运行增强的 LLM 并使用结构化输出作为路由逻辑
decision = router.invoke(
[
SystemMessage(
content="根据用户的请求,将输入路由到故事、笑话或诗歌。"
),
HumanMessage(content=state["input"]),
]
)
return {"decision": decision.step}
# 用于路由到适当节点的条件边函数
def route_decision(state: State):
# 返回您希望下一步访问的节点名称
if state["decision"] == "story":
return "llm_call_1"
elif state["decision"] == "joke":
return "llm_call_2"
elif state["decision"] == "poem":
return "llm_call_3"
# 构建工作流
router_builder = StateGraph(State)
# 添加节点
router_builder.add_node("llm_call_1", llm_call_1)
router_builder.add_node("llm_call_2", llm_call_2)
router_builder.add_node("llm_call_3", llm_call_3)
router_builder.add_node("llm_call_router", llm_call_router)
# 添加边以连接节点
router_builder.add_edge(START, "llm_call_router")
router_builder.add_conditional_edges(
"llm_call_router",
route_decision,
{ # route_decision 返回的名称 : 下一个要访问的节点名称
"llm_call_1": "llm_call_1",
"llm_call_2": "llm_call_2",
"llm_call_3": "llm_call_3",
},
)
router_builder.add_edge("llm_call_1", END)
router_builder.add_edge("llm_call_2", END)
router_builder.add_edge("llm_call_3", END)
# 编译工作流
router_workflow = router_builder.compile()
# 显示工作流
display(Image(router_workflow.get_graph().draw_mermaid_png()))
# 调用
state = router_workflow.invoke({"input": "给我写一个关于猫的笑话"})
print(state["output"])
协调器-工作者
在协调器-工作者配置中,协调器:- 将任务分解为子任务
- 将子任务委派给工作者
- 将工作者输出合成为最终结果

from typing import Annotated, List
import operator
# 用于规划的结构化输出模式
class Section(BaseModel):
name: str = Field(
description="此报告部分的名称。",
)
description: str = Field(
description="此部分将涵盖的主要主题和概念的简要概述。",
)
class Sections(BaseModel):
sections: List[Section] = Field(
description="报告的各个部分。",
)
# 使用结构化输出模式增强 LLM
planner = llm.with_structured_output(Sections)
在 LangGraph 中创建工作者
协调器-工作者工作流很常见,LangGraph 对其提供了内置支持。Send API 允许您动态创建工作者节点并向它们发送特定输入。每个工作者都有自己的状态,所有工作者的输出都会写入协调器图可访问的共享状态键。这使协调器能够访问所有工作者的输出,并允许其将它们合成为最终输出。下面的示例遍历部分列表,并使用 Send API 将每个部分发送给一个工作者。
from langgraph.types import Send
# 图状态
class State(TypedDict):
topic: str # 报告主题
sections: list[Section] # 报告部分列表
completed_sections: Annotated[
list, operator.add
] # 所有工作者并行写入此键
final_report: str # 最终报告
# 工作者状态
class WorkerState(TypedDict):
section: Section
completed_sections: Annotated[list, operator.add]
# 节点
def orchestrator(state: State):
"""为报告生成计划的协调器"""
# 生成查询
report_sections = planner.invoke(
[
SystemMessage(content="生成报告计划。"),
HumanMessage(content=f"报告主题:{state['topic']}"),
]
)
return {"sections": report_sections.sections}
def llm_call(state: WorkerState):
"""工作者撰写报告的一个部分"""
# 生成部分
section = llm.invoke(
[
SystemMessage(
content="按照提供的名称和描述撰写报告部分。每个部分不包含序言。使用 Markdown 格式。"
),
HumanMessage(
content=f"部分名称:{state['section'].name},描述:{state['section'].description}"
),
]
)
# 将更新后的部分写入已完成的部分
return {"completed_sections": [section.content]}
def synthesizer(state: State):
"""从各部分合成完整报告"""
# 已完成部分列表
completed_sections = state["completed_sections"]
# 将已完成部分格式化为字符串,用作最终部分的上下文
completed_report_sections = "\n\n---\n\n".join(completed_sections)
return {"final_report": completed_report_sections}
# 用于创建 llm_call 工作者的条件边函数,每个工作者撰写报告的一个部分
def assign_workers(state: State):
"""为计划中的每个部分分配一个工作者"""
# 通过 Send() API 并行启动部分撰写
return [Send("llm_call", {"section": s}) for s in state["sections"]]
# 构建工作流
orchestrator_worker_builder = StateGraph(State)
# 添加节点
orchestrator_worker_builder.add_node("orchestrator", orchestrator)
orchestrator_worker_builder.add_node("llm_call", llm_call)
orchestrator_worker_builder.add_node("synthesizer", synthesizer)
# 添加边以连接节点
orchestrator_worker_builder.add_edge(START, "orchestrator")
orchestrator_worker_builder.add_conditional_edges(
"orchestrator", assign_workers, ["llm_call"]
)
orchestrator_worker_builder.add_edge("llm_call", "synthesizer")
orchestrator_worker_builder.add_edge("synthesizer", END)
# 编译工作流
orchestrator_worker = orchestrator_worker_builder.compile()
# 显示工作流
display(Image(orchestrator_worker.get_graph().draw_mermaid_png()))
# 调用
state = orchestrator_worker.invoke({"topic": "创建一份关于 LLM 扩展定律的报告"})
from IPython.display import Markdown
Markdown(state["final_report"])
评估器-优化器
在评估器-优化器工作流中,一个 LLM 调用创建响应,另一个评估该响应。如果评估器或人机回环确定响应需要改进,则会提供反馈并重新创建响应。此循环会持续进行,直到生成可接受的响应。 评估器-优化器工作流通常在任务有特定成功标准但需要迭代才能满足该标准时使用。例如,在两种语言之间翻译文本时,并不总是能完美匹配。可能需要几次迭代才能生成在两种语言中具有相同含义的翻译。
# 图状态
class State(TypedDict):
joke: str
topic: str
feedback: str
funny_or_not: str
# 用于评估的结构化输出模式
class Feedback(BaseModel):
grade: Literal["funny", "not funny"] = Field(
description="决定笑话是否有趣。",
)
feedback: str = Field(
description="如果笑话不有趣,请提供改进建议。",
)
# 使用结构化输出模式增强 LLM
evaluator = llm.with_structured_output(Feedback)
# 节点
def llm_call_generator(state: State):
"""LLM 生成笑话"""
if state.get("feedback"):
msg = llm.invoke(
f"写一个关于 {state['topic']} 的笑话,但要考虑反馈:{state['feedback']}"
)
else:
msg = llm.invoke(f"写一个关于 {state['topic']} 的笑话")
return {"joke": msg.content}
def llm_call_evaluator(state: State):
"""LLM 评估笑话"""
grade = evaluator.invoke(f"评估这个笑话 {state['joke']}")
return {"funny_or_not": grade.grade, "feedback": grade.feedback}
# 用于根据评估器的反馈路由回笑话生成器或结束的条件边函数
def route_joke(state: State):
"""根据评估器的反馈路由回笑话生成器或结束"""
if state["funny_or_not"] == "funny":
return "Accepted"
elif state["funny_or_not"] == "not funny":
return "Rejected + Feedback"
# 构建工作流
optimizer_builder = StateGraph(State)
# 添加节点
optimizer_builder.add_node("llm_call_generator", llm_call_generator)
optimizer_builder.add_node("llm_call_evaluator", llm_call_evaluator)
# 添加边以连接节点
optimizer_builder.add_edge(START, "llm_call_generator")
optimizer_builder.add_edge("llm_call_generator", "llm_call_evaluator")
optimizer_builder.add_conditional_edges(
"llm_call_evaluator",
route_joke,
{ # route_joke 返回的名称 : 下一个要访问的节点名称
"Accepted": END,
"Rejected + Feedback": "llm_call_generator",
},
)
# 编译工作流
optimizer_workflow = optimizer_builder.compile()
# 显示工作流
display(Image(optimizer_workflow.get_graph().draw_mermaid_png()))
# 调用
state = optimizer_workflow.invoke({"topic": "Cats"})
print(state["joke"])
智能体
智能体通常实现为使用工具执行操作的 LLM。它们在连续的反馈循环中运行,并用于问题和解决方案不可预测的情况。智能体比工作流具有更大的自主权,可以决定使用哪些工具以及如何解决问题。您仍然可以定义可用的工具集和智能体行为准则。
使用工具
from langchain.tools import tool
# 定义工具
@tool
def multiply(a: int, b: int) -> int:
"""将 `a` 和 `b` 相乘。
参数:
a: 第一个整数
b: 第二个整数
"""
return a * b
@tool
def add(a: int, b: int) -> int:
"""将 `a` 和 `b` 相加。
参数:
a: 第一个整数
b: 第二个整数
"""
return a + b
@tool
def divide(a: int, b: int) -> float:
"""将 `a` 除以 `b`。
参数:
a: 第一个整数
b: 第二个整数
"""
return a / b
# 使用工具增强 LLM
tools = [add, multiply, divide]
tools_by_name = {tool.name: tool for tool in tools}
llm_with_tools = llm.bind_tools(tools)
from langgraph.graph import MessagesState
from langchain.messages import SystemMessage, HumanMessage, ToolMessage
# 节点
def llm_call(state: MessagesState):
"""LLM 决定是否调用工具"""
return {
"messages": [
llm_with_tools.invoke(
[
SystemMessage(
content="您是一个有用的助手,负责对一组输入执行算术运算。"
)
]
+ state["messages"]
)
]
}
def tool_node(state: dict):
"""执行工具调用"""
result = []
for tool_call in state["messages"][-1].tool_calls:
tool = tools_by_name[tool_call["name"]]
observation = tool.invoke(tool_call["args"])
result.append(ToolMessage(content=observation, tool_call_id=tool_call["id"]))
return {"messages": result}
# 用于根据 LLM 是否进行工具调用来路由到工具节点或结束的条件边函数
def should_continue(state: MessagesState) -> Literal["tool_node", END]:
"""根据 LLM 是否进行工具调用来决定是继续循环还是停止"""
messages = state["messages"]
last_message = messages[-1]
# 如果 LLM 进行工具调用,则执行操作
if last_message.tool_calls:
return "tool_node"
# 否则,我们停止(回复用户)
return END
# 构建工作流
agent_builder = StateGraph(MessagesState)
# 添加节点
agent_builder.add_node("llm_call", llm_call)
agent_builder.add_node("tool_node", tool_node)
# 添加边以连接节点
agent_builder.add_edge(START, "llm_call")
agent_builder.add_conditional_edges(
"llm_call",
should_continue,
["tool_node", END]
)
agent_builder.add_edge("tool_node", "llm_call")
# 编译智能体
agent = agent_builder.compile()
# 显示智能体
display(Image(agent.get_graph(xray=True).draw_mermaid_png()))
# 调用
messages = [HumanMessage(content="Add 3 and 4.")]
messages = agent.invoke({"messages": messages})
for m in messages["messages"]:
m.pretty_print()

