Skip to main content
本指南回顾常见工作流和代理模式。
  • 工作流具有预定的代码路径,旨在按特定顺序运行。
  • 代理是动态的,定义自己的流程和工具使用。
代理工作流 LangGraph 在构建代理和工作流时提供多项优势,包括 持久性流式处理 和调试支持以及 部署

设置

要构建工作流或代理,您可以使用 任何支持结构化输出和工具调用的聊天模型。以下示例使用 Anthropic:
  1. 安装依赖项:
pip install langchain_core langchain-anthropic langgraph
  1. 初始化 LLM:
import os
import getpass

from langchain_anthropic import ChatAnthropic

def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("ANTHROPIC_API_KEY")

llm = ChatAnthropic(model="claude-sonnet-4-6")

LLM 和增强

工作流和代理系统基于 LLM 和您添加到其中的各种增强功能。工具调用结构化输出短期记忆 是定制 LLM 以满足您需求的几个选项。 LLM 增强
# Schema for structured output
from pydantic import BaseModel, Field


class SearchQuery(BaseModel):
    search_query: str = Field(None, description="Query that is optimized web search.")
    justification: str = Field(
        None, description="Why this query is relevant to the user's request."
    )


# Augment the LLM with schema for structured output
structured_llm = llm.with_structured_output(SearchQuery)

# Invoke the augmented LLM
output = structured_llm.invoke("How does Calcium CT score relate to high cholesterol?")

# Define a tool
def multiply(a: int, b: int) -> int:
    return a * b

# Augment the LLM with tools
llm_with_tools = llm.bind_tools([multiply])

# Invoke the LLM with input that triggers the tool call
msg = llm_with_tools.invoke("What is 2 times 3?")

# Get the tool call
msg.tool_calls

提示链接

提示链接是当每个 LLM 调用处理前一个调用的输出时。它通常用于执行可以分解为较小的、可验证步骤的定义明确的任务。一些例子包括:
  • 将文件转换为不同的语言
  • 验证生成的内容是否一致
提示链接
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from IPython.display import Image, display

# Graph state

class State(TypedDict):
topic: str
joke: str
improved_joke: str
final_joke: str

# Nodes

def generate_joke(state: State):
"""First LLM call to generate initial joke"""

    msg = llm.invoke(f"Write a short joke about {state['topic']}")
    return {"joke": msg.content}

def check_punchline(state: State):
"""Gate function to check if the joke has a punchline"""

    # Simple check - does the joke contain "?" or "!"
    if "?" in state["joke"] or "!" in state["joke"]:
        return "Pass"
    return "Fail"

def improve_joke(state: State):
"""Second LLM call to improve the joke"""

    msg = llm.invoke(f"Make this joke funnier by adding wordplay: {state['joke']}")
    return {"improved_joke": msg.content}

def polish_joke(state: State):
"""Third LLM call for final polish"""
msg = llm.invoke(f"Add a surprising twist to this joke: {state['improved_joke']}")
return {"final_joke": msg.content}

# Build workflow

workflow = StateGraph(State)

# Add nodes

workflow.add_node("generate_joke", generate_joke)
workflow.add_node("improve_joke", improve_joke)
workflow.add_node("polish_joke", polish_joke)

# Add edges to connect nodes

workflow.add_edge(START, "generate_joke")
workflow.add_conditional_edges(
"generate_joke", check_punchline, {"Fail": "improve_joke", "Pass": END}
)
workflow.add_edge("improve_joke", "polish_joke")
workflow.add_edge("polish_joke", END)

# Compile

chain = workflow.compile()

# Show workflow

display(Image(chain.get_graph().draw_mermaid_png()))

# Invoke

state = chain.invoke({"topic": "cats"})
print("Initial joke:")
print(state["joke"])
print("\n--- --- ---\n")
if "improved_joke" in state:
print("Improved joke:")
print(state["improved_joke"])
print("\n--- --- ---\n")

    print("Final joke:")
    print(state["final_joke"])

else:
print("Final joke:")
print(state["joke"])

并行化

并行化时,LLM 同时在一个任务上工作。这是通过同时运行多个独立的子任务,或多次运行同一任务以检查不同输出来完成的。并行化通常用于:
  • 拆分子任务并并行运行它们,这会增加速度
  • 多次运行任务以检查不同的输出,这会增加信心
一些例子包括:
  • 运行一个处理文档关键字的子任务,以及第二个子任务检查格式错误
  • 多次运行一个根据不同标准对文档准确性进行评分的任务,例如引用数量、使用的源,和源的质量
并行化
# Graph state
class State(TypedDict):
    topic: str
    joke: str
    story: str
    poem: str
    combined_output: str

# Nodes

def call_llm_1(state: State):
"""First LLM call to generate initial joke"""

    msg = llm.invoke(f"Write a joke about {state['topic']}")
    return {"joke": msg.content}

def call_llm_2(state: State):
"""Second LLM call to generate story"""

    msg = llm.invoke(f"Write a story about {state['topic']}")
    return {"story": msg.content}

def call_llm_3(state: State):
"""Third LLM call to generate poem"""

    msg = llm.invoke(f"Write a poem about {state['topic']}")
    return {"poem": msg.content}

def aggregator(state: State):
"""Combine the joke, story and poem into a single output"""

    combined = f"Here's a story, joke, and poem about {state['topic']}!\n\n"
    combined += f"STORY:\n{state['story']}\n\n"
    combined += f"JOKE:\n{state['joke']}\n\n"
    combined += f"POEM:\n{state['poem']}"
    return {"combined_output": combined}

# Build workflow

parallel_builder = StateGraph(State)

# Add nodes

parallel_builder.add_node("call_llm_1", call_llm_1)
parallel_builder.add_node("call_llm_2", call_llm_2)
parallel_builder.add_node("call_llm_3", call_llm_3)
parallel_builder.add_node("aggregator", aggregator)

# Add edges to connect nodes

parallel_builder.add_edge(START, "call_llm_1")
parallel_builder.add_edge(START, "call_llm_2")
parallel_builder.add_edge(START, "call_llm_3")
parallel_builder.add_edge("call_llm_1", "aggregator")
parallel_builder.add_edge("call_llm_2", "aggregator")
parallel_builder.add_edge("call_llm_3", "aggregator")
parallel_builder.add_edge("aggregator", END)
parallel_workflow = parallel_builder.compile()

# Show workflow

display(Image(parallel_workflow.get_graph().draw_mermaid_png()))

# Invoke

state = parallel_workflow.invoke({"topic": "cats"})
print(state["combined_output"])

路由

路由工作流处理输入,然后将其定向到特定于上下文的任务。这允许您为复杂任务定义专用流程。例如,为回答产品相关问题而构建的工作流可能首先处理问题的类型,然后将请求路由到定价、退款、退货等的特定处理流程。 routing.png
from typing_extensions import Literal
from langchain.messages import HumanMessage, SystemMessage

# Schema for structured output to use as routing logic

class Route(BaseModel):
step: Literal["poem", "story", "joke"] = Field(
None, description="The next step in the routing process"
)

# Augment the LLM with schema for structured output

router = llm.with_structured_output(Route)

# State

class State(TypedDict):
input: str
decision: str
output: str

# Nodes

def llm_call_1(state: State):
"""Write a story"""

    result = llm.invoke(state["input"])
    return {"output": result.content}

def llm_call_2(state: State):
"""Write a joke"""

    result = llm.invoke(state["input"])
    return {"output": result.content}

def llm_call_3(state: State):
"""Write a poem"""

    result = llm.invoke(state["input"])
    return {"output": result.content}

def llm_call_router(state: State):
"""Route the input to the appropriate node"""

    # Run the augmented LLM with structured output to serve as routing logic
    decision = router.invoke(
        [
            SystemMessage(
                content="Route the input to story, joke, or poem based on the user's request."
            ),
            HumanMessage(content=state["input"]),
        ]
    )

    return {"decision": decision.step}

# Conditional edge function to route to the appropriate node

def route_decision(state: State): # Return the node name you want to visit next
if state["decision"] == "story":
return "llm_call_1"
elif state["decision"] == "joke":
return "llm_call_2"
elif state["decision"] == "poem":
return "llm_call_3"

# Build workflow

router_builder = StateGraph(State)

# Add nodes

router_builder.add_node("llm_call_1", llm_call_1)
router_builder.add_node("llm_call_2", llm_call_2)
router_builder.add_node("llm_call_3", llm_call_3)
router_builder.add_node("llm_call_router", llm_call_router)

# Add edges to connect nodes

router_builder.add_edge(START, "llm_call_router")
router_builder.add_conditional_edges(
"llm_call_router",
route_decision,
{ # Name returned by route_decision : Name of next node to visit
"llm_call_1": "llm_call_1",
"llm_call_2": "llm_call_2",
"llm_call_3": "llm_call_3",
},
)
router_builder.add_edge("llm_call_1", END)
router_builder.add_edge("llm_call_2", END)
router_builder.add_edge("llm_call_3", END)

# Compile workflow

router_workflow = router_builder.compile()

# Show the workflow

display(Image(router_workflow.get_graph().draw_mermaid_png()))

# Invoke

state = router_workflow.invoke({"input": "Write me a joke about cats"})
print(state["output"])

网络粗分管理员

在网络粗分管理员配置中,网络粗分管理员:
  • 将任务餆裂成小任务
  • 将小任务分配给业务员
  • 将业务员输出结成一个最终结果
worker.png 网络粗分管理员配置提供更大的糕殊性,并经常用于无法为且允求颛物庀法了提前其写试上的任务。此方式对于需为冶整幼室多个了的文件鲁鹰历乘法了特文库了个次九任务那山地算坪了。例对,需要方司式整幼室提示吃產了整為第二个文檔了干了放不上格平网帓。
from typing import Annotated, List
import operator

# Schema for structured output to use in planning

class Section(BaseModel):
name: str = Field(
description="Name for this section of the report.",
)
description: str = Field(
description="Brief overview of the main topics and concepts to be covered in this section.",
)

class Sections(BaseModel):
sections: List[Section] = Field(
description="Sections of the report.",
)

# Augment the LLM with schema for structured output

planner = llm.with_structured_output(Sections)

Creating workers in LangGraph

Orchestrator-worker workflows are common and LangGraph has built-in support for them. The Send API lets you dynamically create worker nodes and send them specific inputs. Each worker has its own state, and all worker outputs are written to a shared state key that is accessible to the orchestrator graph. This gives the orchestrator access to all worker output and allows it to synthesize them into a final output. The example below iterates over a list of sections and uses the Send API to send a section to each worker.
from langgraph.types import Send


# Graph state
class State(TypedDict):
    topic: str  # Report topic
    sections: list[Section]  # List of report sections
    completed_sections: Annotated[
        list, operator.add
    ]  # All workers write to this key in parallel
    final_report: str  # Final report


# Worker state
class WorkerState(TypedDict):
    section: Section
    completed_sections: Annotated[list, operator.add]


# Nodes
def orchestrator(state: State):
    """Orchestrator that generates a plan for the report"""

    # Generate queries
    report_sections = planner.invoke(
        [
            SystemMessage(content="Generate a plan for the report."),
            HumanMessage(content=f"Here is the report topic: {state['topic']}"),
        ]
    )

    return {"sections": report_sections.sections}


def llm_call(state: WorkerState):
    """Worker writes a section of the report"""

    # Generate section
    section = llm.invoke(
        [
            SystemMessage(
                content="Write a report section following the provided name and description. Include no preamble for each section. Use markdown formatting."
            ),
            HumanMessage(
                content=f"Here is the section name: {state['section'].name} and description: {state['section'].description}"
            ),
        ]
    )

    # Write the updated section to completed sections
    return {"completed_sections": [section.content]}


def synthesizer(state: State):
    """Synthesize full report from sections"""

    # List of completed sections
    completed_sections = state["completed_sections"]

    # Format completed section to str to use as context for final sections
    completed_report_sections = "\n\n---\n\n".join(completed_sections)

    return {"final_report": completed_report_sections}


# Conditional edge function to create llm_call workers that each write a section of the report
def assign_workers(state: State):
    """Assign a worker to each section in the plan"""

    # Kick off section writing in parallel via Send() API
    return [Send("llm_call", {"section": s}) for s in state["sections"]]


# Build workflow
orchestrator_worker_builder = StateGraph(State)

# Add the nodes
orchestrator_worker_builder.add_node("orchestrator", orchestrator)
orchestrator_worker_builder.add_node("llm_call", llm_call)
orchestrator_worker_builder.add_node("synthesizer", synthesizer)

# Add edges to connect nodes
orchestrator_worker_builder.add_edge(START, "orchestrator")
orchestrator_worker_builder.add_conditional_edges(
    "orchestrator", assign_workers, ["llm_call"]
)
orchestrator_worker_builder.add_edge("llm_call", "synthesizer")
orchestrator_worker_builder.add_edge("synthesizer", END)

# Compile the workflow
orchestrator_worker = orchestrator_worker_builder.compile()

# Show the workflow
display(Image(orchestrator_worker.get_graph().draw_mermaid_png()))

# Invoke
state = orchestrator_worker.invoke({"topic": "Create a report on LLM scaling laws"})

from IPython.display import Markdown
Markdown(state["final_report"])

Evaluator-optimizer

In evaluator-optimizer workflows, one LLM call creates a response and the other evaluates that response. If the evaluator or a human-in-the-loop determines the response needs refinement, feedback is provided and the response is recreated. This loop continues until an acceptable response is generated. Evaluator-optimizer workflows are commonly used when there’s particular success criteria for a task, but iteration is required to meet that criteria. For example, there’s not always a perfect match when translating text between two languages. It might take a few iterations to generate a translation with the same meaning across the two languages. evaluator_optimizer.png
# Graph state
class State(TypedDict):
    joke: str
    topic: str
    feedback: str
    funny_or_not: str

# Schema for structured output to use in evaluation

class Feedback(BaseModel):
grade: Literal["funny", "not funny"] = Field(
description="Decide if the joke is funny or not.",
)
feedback: str = Field(
description="If the joke is not funny, provide feedback on how to improve it.",
)

# Augment the LLM with schema for structured output

evaluator = llm.with_structured_output(Feedback)

# Nodes

def llm_call_generator(state: State):
"""LLM generates a joke"""

    if state.get("feedback"):
        msg = llm.invoke(
            f"Write a joke about {state['topic']} but take into account the feedback: {state['feedback']}"
        )
    else:
        msg = llm.invoke(f"Write a joke about {state['topic']}")
    return {"joke": msg.content}

def llm_call_evaluator(state: State):
"""LLM evaluates the joke"""

    grade = evaluator.invoke(f"Grade the joke {state['joke']}")
    return {"funny_or_not": grade.grade, "feedback": grade.feedback}

# Conditional edge function to route back to joke generator or end based upon feedback from the evaluator

def route_joke(state: State):
"""Route back to joke generator or end based upon feedback from the evaluator"""

    if state["funny_or_not"] == "funny":
        return "Accepted"
    elif state["funny_or_not"] == "not funny":
        return "Rejected + Feedback"

# Build workflow

optimizer_builder = StateGraph(State)

# Add the nodes

optimizer_builder.add_node("llm_call_generator", llm_call_generator)
optimizer_builder.add_node("llm_call_evaluator", llm_call_evaluator)

# Add edges to connect nodes

optimizer_builder.add_edge(START, "llm_call_generator")
optimizer_builder.add_edge("llm_call_generator", "llm_call_evaluator")
optimizer_builder.add_conditional_edges(
"llm_call_evaluator",
route_joke,
{ # Name returned by route_joke : Name of next node to visit
"Accepted": END,
"Rejected + Feedback": "llm_call_generator",
},
)

# Compile the workflow

optimizer_workflow = optimizer_builder.compile()

# Show the workflow

display(Image(optimizer_workflow.get_graph().draw_mermaid_png()))

# Invoke

state = optimizer_workflow.invoke({"topic": "Cats"})
print(state["joke"])

代理

代理通常作为使用工具执行操作的 LLM 应用。它们在持续反馈循环中运行,并用于问题和解决方案不可预测的情况。代理比工作流有更高的自主权,可以对其使用的工具和解决问题的方案做出决策。您仍然可以定义可用的工具集和代理行为的指南。 agent.png
要开始使用代理,请参阅快速开始指南或在 LangChain 中了解更多关于其工作方式的信息。
使用工具
from langchain.tools import tool


# 定义工具
@tool
def multiply(a: int, b: int) -> int:
    """Multiply `a` and `b`.

    Args:
        a: First int
        b: Second int
    """
    return a * b


@tool
def add(a: int, b: int) -> int:
    """Adds `a` and `b`.

    Args:
        a: First int
        b: Second int
    """
    return a + b


@tool
def divide(a: int, b: int) -> float:
    """Divide `a` and `b`.

    Args:
        a: First int
        b: Second int
    """
    return a / b


# Augment the LLM with tools
tools = [add, multiply, divide]
tools_by_name = {tool.name: tool for tool in tools}
llm_with_tools = llm.bind_tools(tools)
from langgraph.graph import MessagesState
from langchain.messages import SystemMessage, HumanMessage, ToolMessage

# Nodes

def llm_call(state: MessagesState):
"""LLM decides whether to call a tool or not"""

    return {
        "messages": [
            llm_with_tools.invoke(
                [
                    SystemMessage(
                        content="You are a helpful assistant tasked with performing arithmetic on a set of inputs."
                    )
                ]
                + state["messages"]
            )
        ]
    }

def tool_node(state: dict):
"""Performs the tool call"""

    result = []
    for tool_call in state["messages"][-1].tool_calls:
        tool = tools_by_name[tool_call["name"]]
        observation = tool.invoke(tool_call["args"])
        result.append(ToolMessage(content=observation, tool_call_id=tool_call["id"]))
    return {"messages": result}

# 条件边函数,简庅放到工具节点。

def should_continue(state: MessagesState) -> Literal["tool_node", END]:
"""\u663e决是否根据 LLM 是否调用工具来\u7ee7\u7eed\u975e\u6b7b\u5e9c\u6607\u505c\u7b2c\u4e00\u4e2a\u8282\u70b9\u3002"""

    messages = state["messages"]
    last_message = messages[-1]

    # 此\u4e2d LLM \u6279\u51fa\u4e00\u4e2a\u5de5\u5177\u8c03\u7528\uff0c\u7136\u540e\u6267\u884c\u64cd\u4f5c
    if last_message.tool_calls:
        return "tool_node"

    # \u4e0d\u7136\uff0c\u6211\u4eec\u505c\u4e0b\u6765\uff08\u56de\u590d\u7528\u6237\uff09
    return END

# Build workflow

agent_builder = StateGraph(MessagesState)

# Add nodes

agent_builder.add_node("llm_call", llm_call)
agent_builder.add_node("tool_node", tool_node)

# Add edges to connect nodes

agent_builder.add_edge(START, "llm_call")
agent_builder.add_conditional_edges(
"llm_call",
should_continue,
["tool_node", END]
)
agent_builder.add_edge("tool_node", "llm_call")

# \u8bbf\u95ee\u4ee3\u7406

agent = agent_builder.compile()

# \u6027\u6307\u8f85\u84dd\u56fe

display(Image(agent.get_graph(xray=True).draw_mermaid_png()))

# Invoke

messages = [HumanMessage(content="Add 3 and 4.")]
messages = agent.invoke({"messages": messages})
for m in messages["messages"]:
m.pretty_print()