本指南演示了 LangGraph 图 API 的基础知识。它将逐步讲解状态 ,以及如何组合常见的图结构,如序列 、分支 和循环 。它还涵盖了 LangGraph 的控制功能,包括用于 map-reduce 工作流的 Send API 和用于将状态更新与节点间“跳转”相结合的 Command API 。
安装 langgraph:
npm install @langchain/langgraph
设置 LangSmith 以获得更好的调试体验 注册 LangSmith 以快速发现问题并提升 LangGraph 项目的性能。LangSmith 允许您使用跟踪数据来调试、测试和监控使用 LangGraph 构建的 LLM 应用程序——请阅读文档 了解更多关于如何开始的信息。
定义和更新状态
这里我们展示如何在 LangGraph 中定义和更新状态 。我们将演示:
如何使用状态定义图的模式
如何使用归约器 来控制状态更新的处理方式。
定义状态
LangGraph 中的状态 使用 StateSchema 类定义。这提供了一个统一的 API,接受各个字段的标准模式 (如 Zod ),以及特殊值类型,如 ReducedValue、MessagesValue 和 UntrackedValue。
默认情况下,图将具有相同的输入和输出模式,状态决定了该模式。请参阅定义输入和输出模式 以了解如何定义不同的输入和输出模式。
让我们考虑一个使用消息 的简单示例。这代表了多种 LLM 应用程序的状态的通用表述。有关更多详细信息,请参阅我们的概念页面 。
import { StateSchema , MessagesValue } from "@langchain/langgraph" ;
import * as z from "zod" ;
const State = new StateSchema ( {
messages : MessagesValue ,
extraField : z . number () ,
} ) ;
此状态跟踪一个消息 对象列表,以及一个额外的整数字段。
更新状态
让我们构建一个包含单个节点的示例图。我们的节点 只是一个 TypeScript 函数,它读取图的状态并对其进行更新。此函数的第一个参数始终是状态:
import { AIMessage } from "@langchain/core/messages" ;
import { GraphNode } from "@langchain/langgraph" ;
const node : GraphNode < typeof State > = ( state ) => {
const messages = state . messages ;
const newMessage = new AIMessage ( "Hello!" ) ;
return { messages : [newMessage] , extraField : 10 };
};
此节点只是将一条消息附加到我们的消息列表(归约器处理连接),并填充一个额外的字段。
接下来,让我们定义一个包含此节点的简单图。我们使用 StateGraph 来定义一个操作此状态的图。然后我们使用 addNode 来填充我们的图。
import { StateGraph } from "@langchain/langgraph" ;
const graph = new StateGraph (State)
. addNode ( "node" , node)
. addEdge ( "__start__" , "node" )
. compile () ;
LangGraph 提供了内置工具来可视化您的图。让我们检查我们的图。有关可视化的详细信息,请参阅可视化您的图 。
import * as fs from "node:fs/promises" ;
const drawableGraph = await graph . getGraphAsync () ;
const image = await drawableGraph . drawMermaidPng () ;
const imageBuffer = new Uint8Array ( await image . arrayBuffer ()) ;
await fs . writeFile ( "graph.png" , imageBuffer) ;
在这种情况下,我们的图只执行一个节点。让我们继续一个简单的调用:
import { HumanMessage } from "@langchain/core/messages" ;
const result = await graph . invoke ( { messages : [ new HumanMessage ( "Hi" )] , extraField : 0 } ) ;
console . log (result) ;
{ messages: [HumanMessage { content: 'Hi' }, AIMessage { content: 'Hello!' }], extraField: 10 }
请注意:
我们通过更新状态的一个键来启动调用。
我们在调用结果中接收到整个状态。
为方便起见,我们经常通过日志记录检查消息对象 的内容:
for ( const message of result . messages) {
console . log ( ` ${ message . getType () } : ${ message . content } ` ) ;
}
使用归约器处理状态更新
状态中的每个键都可以有自己的独立归约器 函数,该函数控制如何应用来自节点的更新。如果未显式指定归约器函数,则假定对该键的所有更新都应覆盖它。
在我们之前的示例中,我们使用了 MessagesValue,它已经有一个内置的归约器。对于自定义字段,您可以使用 ReducedValue 来定义如何应用更新。
在之前的示例中,我们的节点通过向状态中的 "messages" 键附加一条消息来更新它。MessagesValue 归约器会自动处理此操作:
import { StateSchema , MessagesValue , ReducedValue } from "@langchain/langgraph" ;
import * as z from "zod" ;
// MessagesValue 已经有一个内置的归约器
const State = new StateSchema ( {
messages : MessagesValue ,
extraField : z . number () ,
} ) ;
我们的节点可以简单地返回新消息(归约器处理连接):
import { GraphNode } from "@langchain/langgraph" ;
const node : GraphNode < typeof State > = ( state ) => {
const newMessage = new AIMessage ( "Hello!" ) ;
return { messages : [newMessage] , extraField : 10 };
};
import { START } from "@langchain/langgraph" ;
const graph = new StateGraph (State)
. addNode ( "node" , node)
. addEdge (START , "node" )
. compile () ;
const result = await graph . invoke ( { messages : [ new HumanMessage ( "Hi" )] } ) ;
for ( const message of result . messages) {
console . log ( ` ${ message . getType () } : ${ message . content } ` ) ;
}
MessagesValue
在实践中,更新消息列表还有额外的考虑因素:
LangGraph 包含内置的 MessagesValue 来处理这些考虑因素:
import { StateSchema , StateGraph , MessagesValue , GraphNode , START } from "@langchain/langgraph" ;
import * as z from "zod" ;
const State = new StateSchema ( {
messages : MessagesValue ,
extraField : z . number () ,
} ) ;
const node : GraphNode < typeof State > = ( state ) => {
const newMessage = new AIMessage ( "Hello!" ) ;
return { messages : [newMessage] , extraField : 10 };
};
const graph = new StateGraph (State)
. addNode ( "node" , node)
. addEdge (START , "node" )
. compile () ;
const inputMessage = { role : "user" , content : "Hi" };
const result = await graph . invoke ( { messages : [inputMessage] } ) ;
for ( const message of result . messages) {
console . log ( ` ${ message . getType () } : ${ message . content } ` ) ;
}
这是涉及聊天模型 的应用程序的状态的通用表示。LangGraph 包含预构建的 MessagesValue 以方便使用,因此我们可以拥有:
import { StateSchema , MessagesValue } from "@langchain/langgraph" ;
import * as z from "zod" ;
const State = new StateSchema ( {
messages : MessagesValue ,
extraField : z . number () ,
} ) ;
定义输入和输出模式
默认情况下,StateGraph 使用单一模式操作,所有节点都应使用该模式进行通信。但是,也可以为图定义不同的输入和输出模式。
当指定了不同的模式时,内部模式仍将用于节点之间的通信。输入模式确保提供的输入符合预期结构,而输出模式根据定义的输出模式过滤内部数据,仅返回相关信息。
下面,我们将看到如何定义不同的输入和输出模式。
import { StateGraph , StateSchema , GraphNode , START , END } from "@langchain/langgraph" ;
import * as z from "zod" ;
// 定义输入模式
const InputState = new StateSchema ( {
question : z . string () ,
} ) ;
// 定义输出模式
const OutputState = new StateSchema ( {
answer : z . string () ,
} ) ;
// 定义整体模式,结合输入和输出
const OverallState = new StateSchema ( {
question : z . string () ,
answer : z . string () ,
} ) ;
// 定义处理输入的节点
const answerNode : GraphNode < typeof OverallState > = ( state ) => {
// 示例答案和一个额外的键
return { answer : "bye" , question : state . question };
};
// 构建指定了输入和输出模式的图
const graph = new StateGraph ( {
input : InputState ,
output : OutputState ,
state : OverallState ,
} )
. addNode ( "answerNode" , answerNode)
. addEdge (START , "answerNode" )
. addEdge ( "answerNode" , END)
. compile () ;
// 使用输入调用图并打印结果
console . log ( await graph . invoke ( { question : "hi" } )) ;
注意,调用的输出仅包含输出模式。
在节点之间传递私有状态
在某些情况下,您可能希望节点交换对中间逻辑至关重要但不需要成为图的主要模式一部分的信息。此私有数据与图的整体输入/输出无关,应仅在特定节点之间共享。
下面,我们将创建一个由三个节点(node_1、node_2 和 node_3)组成的顺序图示例,其中私有数据在前两个步骤(node_1 和 node_2)之间传递,而第三步(node_3)只能访问公共整体状态。
import { StateGraph , StateSchema , GraphNode , START , END } from "@langchain/langgraph" ;
import * as z from "zod" ;
// 图的整体状态(这是跨节点共享的公共状态)
const OverallState = new StateSchema ( {
a : z . string () ,
} ) ;
// node1 的输出包含不属于整体状态的私有数据
const Node1Output = new StateSchema ( {
privateData : z . string () ,
} ) ;
// node2 的输入仅请求 node1 之后可用的私有数据
const Node2Input = new StateSchema ( {
privateData : z . string () ,
} ) ;
// 私有数据仅在 node1 和 node2 之间共享
const node1 : GraphNode < typeof OverallState > = ( state ) => {
const output = { privateData : "set by node1" };
console . log ( `Entered node 'node1': \n\t Input: ${ JSON . stringify ( state ) } . \n\t Returned: ${ JSON . stringify ( output ) } ` ) ;
return output ;
};
const node2 : GraphNode < typeof Node2Input > = ( state ) => {
const output = { a : "set by node2" };
console . log ( `Entered node 'node2': \n\t Input: ${ JSON . stringify ( state ) } . \n\t Returned: ${ JSON . stringify ( output ) } ` ) ;
return output ;
};
// node3 仅能访问整体状态(无法访问 node1 的私有数据)
const node3 : GraphNode < typeof OverallState > = ( state ) => {
const output = { a : "set by node3" };
console . log ( `Entered node 'node3': \n\t Input: ${ JSON . stringify ( state ) } . \n\t Returned: ${ JSON . stringify ( output ) } ` ) ;
return output ;
};
// 以序列连接节点
// node2 接受来自 node1 的私有数据,而
// node3 看不到私有数据。
const graph = new StateGraph (OverallState)
. addNode ( "node1" , node1)
. addNode ( "node2" , node2 , { input : Node2Input } )
. addNode ( "node3" , node3)
. addEdge (START , "node1" )
. addEdge ( "node1" , "node2" )
. addEdge ( "node2" , "node3" )
. addEdge ( "node3" , END)
. compile () ;
// 使用初始状态调用图
const response = await graph . invoke ( { a : "set at start" } ) ;
console . log ( ` \n Output of graph invocation: ${ JSON . stringify ( response ) } ` ) ;
Entered node 'node1':
Input: {"a":"set at start"}.
Returned: {"privateData":"set by node1"}
Entered node 'node2':
Input: {"privateData":"set by node1"}.
Returned: {"a":"set by node2"}
Entered node 'node3':
Input: {"a":"set by node2"}.
Returned: {"a":"set by node3"}
Output of graph invocation: {"a":"set by node3"}
替代状态定义
虽然 StateSchema 是定义状态的推荐方法,但 LangGraph 支持其他几种方法。本节涵盖所有可用选项。
Channels API
Channels API 提供了对状态管理的低级控制。LangGraph 提供了几种内置通道类型:
通道类型 行为 用例 LastValue存储最近的值 简单字段,会被覆盖 BinaryOperatorAggregate使用归约器函数组合值 累积值(计数器、列表) Topic将所有值收集到序列中 事件流、审计日志 EphemeralValue在超级步骤之间重置的值 临时计算状态
使用对象简写:
当您传递一个带有 reducer 和 default 的对象时,它会创建一个 BinaryOperatorAggregate 通道。传递 null 会创建一个 LastValue 通道:
import { BaseMessage } from "@langchain/core/messages" ;
import { StateGraph } from "@langchain/langgraph" ;
interface WorkflowState {
messages : BaseMessage[] ;
question : string ;
answer : string ;
}
const workflow = new StateGraph < WorkflowState > ( {
channels : {
// BinaryOperatorAggregate: 使用归约器组合值
messages : {
reducer : ( current , update ) => current . concat (update) ,
default : () => [] ,
},
// LastValue: 存储最近的值(null = 无归约器)
question : null ,
answer : null ,
},
} ) ;
直接使用通道类:
为了获得更多控制,您可以直接实例化通道类:
import { BaseMessage } from "@langchain/core/messages" ;
import { StateGraph , LastValue , BinaryOperatorAggregate , Topic } from "@langchain/langgraph" ;
interface WorkflowState {
messages : BaseMessage[] ;
question : string ;
events : string [] ;
}
const workflow = new StateGraph < WorkflowState > ( {
channels : {
messages : new BinaryOperatorAggregate < BaseMessage[] > (
( current , update ) => current . concat (update) ,
() => []
) ,
question : new LastValue < string > () ,
// Topic 收集执行期间推送的所有值
events : new Topic < string > () ,
},
} ) ;
Annotation.Root
Annotation.Root 提供了一种使用归约器声明性定义状态的方法。它类似于 StateSchema,但使用不同的语法:
import { BaseMessage } from "@langchain/core/messages" ;
import { Annotation , StateGraph , messagesStateReducer } from "@langchain/langgraph" ;
const State = Annotation . Root ( {
messages : Annotation < BaseMessage[] > ( {
reducer : messagesStateReducer ,
default : () => [] ,
} ) ,
question : Annotation < string > () ,
count : Annotation < number > ( {
reducer : ( current , update ) => current + update ,
default : () => 0 ,
} ) ,
} ) ;
const graph = new StateGraph (State) ;
使用 Zod v3 的 Zod 对象
当使用 Zod v3 时,您可以使用普通的 z.object() 模式定义状态。LangGraph 通过 .langgraph 插件扩展了 Zod v3,该插件提供了 .reducer() 和 .metadata() 方法:
import { z } from "zod/v3" ;
import { BaseMessage } from "@langchain/core/messages" ;
import { StateGraph , messagesStateReducer } from "@langchain/langgraph" ;
const State = z . object ( {
// 使用 .langgraph.reducer() 附加归约器函数
messages : z
. array (z . custom < BaseMessage > ())
. default ([])
. langgraph . reducer (messagesStateReducer) ,
// 简单字段直接使用(最后写入获胜)
question : z . string () . optional () ,
answer : z . string () . optional () ,
// 用于累积值的自定义归约器
count : z
. number ()
. default ( 0 )
. langgraph . reducer ( ( current , update ) => current + update) ,
} ) ;
const graph = new StateGraph (State) ;
使用 Zod v4 的 Zod 对象
Zod v4 使用基于注册表的方法。使用 LangGraph 注册表将元数据附加到模式字段:
import * as z from "zod" ;
import { BaseMessage } from "@langchain/core/messages" ;
import { StateGraph , MessagesZodMeta , messagesStateReducer } from "@langchain/langgraph" ;
import { registry } from "@langchain/langgraph/zod" ;
const State = z . object ( {
// 使用 .register() 和 LangGraph 注册表及 MessagesZodMeta
messages : z
. array (z . custom < BaseMessage > ())
. default ([])
. register (registry , MessagesZodMeta) ,
// 简单字段直接使用(最后写入获胜)
question : z . string () . optional () ,
answer : z . string () . optional () ,
// 通过注册表元数据的自定义归约器
count : z
. number ()
. default ( 0 )
. register (registry , { reducer : ( current : number , update : number ) => current + update } ) ,
} ) ;
const graph = new StateGraph (State) ;
比较表
方法 归约器 类型安全 Zod 版本 推荐 StateSchema✅ 内置 ✅ 完整 v3 或 v4 ✅ 是 Channels API ✅ 手动 ⚠️ 部分 N/A 用于高级情况 Annotation.Root✅ 内置 ✅ 完整 N/A 旧版 Zod v3 + .langgraph ✅ 通过插件 ✅ 完整 仅 v3 旧版 Zod v4 + 注册表 ✅ 通过注册表 ✅ 完整 仅 v4 旧版
添加运行时配置
有时您希望在调用图时能够配置它。例如,您可能希望能够在运行时指定要使用的 LLM 或系统提示,而不会用这些参数污染图状态 。
要添加运行时配置:
为您的配置指定一个模式
将配置添加到节点或条件边的函数签名中
将配置传递到图中。
请参阅下面的简单示例:
import { StateGraph , StateSchema , GraphNode , END , START } from "@langchain/langgraph" ;
import * as z from "zod" ;
// 1. 指定配置模式
const ContextSchema = z . object ( {
myRuntimeValue : z . string () ,
} ) ;
// 2. 定义一个在节点中访问配置的图
const State = new StateSchema ( {
myStateValue : z . number () ,
} ) ;
const node : GraphNode < typeof State > = ( state , runtime ) => {
if (runtime ?. context ?. myRuntimeValue === "a" ) {
return { myStateValue : 1 };
} else if (runtime ?. context ?. myRuntimeValue === "b" ) {
return { myStateValue : 2 };
} else {
throw new Error ( "Unknown values." ) ;
}
};
const graph = new StateGraph (State , ContextSchema)
. addNode ( "node" , node)
. addEdge (START , "node" )
. addEdge ( "node" , END)
. compile () ;
// 3. 在运行时传入配置:
console . log ( await graph . invoke ( {}, { context : { myRuntimeValue : "a" } } )) ;
console . log ( await graph . invoke ( {}, { context : { myRuntimeValue : "b" } } )) ;
{ myStateValue: 1 }
{ myStateValue: 2 }
下面我们演示一个实际示例,我们在运行时配置要使用的 LLM。我们将同时使用 OpenAI 和 Anthropic 模型。 import { ChatOpenAI } from "@langchain/openai" ;
import { ChatAnthropic } from "@langchain/anthropic" ;
import { StateGraph , StateSchema , MessagesValue , GraphNode , START , END } from "@langchain/langgraph" ;
import * as z from "zod" ;
const ConfigSchema = z . object ( {
modelProvider : z . string () . default ( "anthropic" ) ,
} ) ;
const State = new StateSchema ( {
messages : MessagesValue ,
} ) ;
const MODELS = {
anthropic : new ChatAnthropic ( { model : "claude-haiku-4-5-20251001" } ) ,
openai : new ChatOpenAI ( { model : "gpt-4.1-mini" } ) ,
};
const callModel : GraphNode < typeof State > = async ( state , config ) => {
const modelProvider = config ?. configurable ?. modelProvider || "anthropic" ;
const model = MODELS[modelProvider as keyof typeof MODELS] ;
const response = await model . invoke (state . messages) ;
return { messages : [response] };
};
const graph = new StateGraph (State , ConfigSchema)
. addNode ( "model" , callModel)
. addEdge (START , "model" )
. addEdge ( "model" , END)
. compile () ;
// 用法
const inputMessage = { role : "user" , content : "hi" };
// 无配置时,使用默认值(Anthropic)
const response1 = await graph . invoke ( { messages : [inputMessage] } ) ;
// 或者,可以设置 OpenAI
const response2 = await graph . invoke (
{ messages : [inputMessage] },
{ configurable : { modelProvider : "openai" } },
) ;
console . log (response1 . messages . at ( - 1 ) ?. response_metadata ?. model) ;
console . log (response2 . messages . at ( - 1 ) ?. response_metadata ?. model) ;
claude-haiku-4-5-20251001
gpt-4.1-mini-2025-04-14
下面我们演示一个实际示例,我们在运行时配置两个参数:要使用的 LLM 和系统消息。 import { ChatOpenAI } from "@langchain/openai" ;
import { ChatAnthropic } from "@langchain/anthropic" ;
import { SystemMessage } from "@langchain/core/messages" ;
import { StateGraph , StateSchema , MessagesValue , GraphNode , START , END } from "@langchain/langgraph" ;
import * as z from "zod" ;
const ConfigSchema = z . object ( {
modelProvider : z . string () . default ( "anthropic" ) ,
systemMessage : z . string () . optional () ,
} ) ;
const State = new StateSchema ( {
messages : MessagesValue ,
} ) ;
const MODELS = {
anthropic : new ChatAnthropic ( { model : "claude-haiku-4-5-20251001" } ) ,
openai : new ChatOpenAI ( { model : "gpt-4.1-mini" } ) ,
};
const callModel : GraphNode < typeof State > = async ( state , config ) => {
const modelProvider = config ?. configurable ?. modelProvider || "anthropic" ;
const systemMessage = config ?. configurable ?. systemMessage ;
const model = MODELS[modelProvider as keyof typeof MODELS] ;
let messages = state . messages ;
if (systemMessage) {
messages = [ new SystemMessage (systemMessage) , ... messages] ;
}
const response = await model . invoke (messages) ;
return { messages : [response] };
};
const graph = new StateGraph (State , ConfigSchema)
. addNode ( "model" , callModel)
. addEdge (START , "model" )
. addEdge ( "model" , END)
. compile () ;
// 用法
const inputMessage = { role : "user" , content : "hi" };
const response = await graph . invoke (
{ messages : [inputMessage] },
{
configurable : {
modelProvider : "openai" ,
systemMessage : "Respond in Italian."
}
}
) ;
for ( const message of response . messages) {
console . log ( ` ${ message . getType () } : ${ message . content } ` ) ;
}
human: hi
ai: Ciao! Come posso aiutarti oggi?
添加重试策略
在许多用例中,您可能希望您的节点具有自定义的重试策略,例如,如果您正在调用 API、查询数据库或调用 LLM 等。LangGraph 允许您向节点添加重试策略。
要配置重试策略,请将 retryPolicy 参数传递给 addNode 。retryPolicy 参数接受一个 RetryPolicy 对象。下面我们使用默认参数实例化一个 RetryPolicy 对象,并将其与节点关联:
import { RetryPolicy } from "@langchain/langgraph" ;
const graph = new StateGraph (State)
. addNode ( "nodeName" , nodeFunction , { retryPolicy : {} } )
. compile () ;
默认情况下,重试策略会在任何异常上重试,但以下情况除外:
TypeError
SyntaxError
ReferenceError
考虑一个从 SQL 数据库读取的示例。下面我们将两个不同的重试策略传递给节点: import Database from "better-sqlite3" ;
import { ChatAnthropic } from "@langchain/anthropic" ;
import { StateGraph , StateSchema , MessagesValue , GraphNode , START , END } from "@langchain/langgraph" ;
import { AIMessage } from "@langchain/core/messages" ;
const State = new StateSchema ( {
messages : MessagesValue ,
} ) ;
// 创建一个内存数据库
const db : typeof Database . prototype = new Database ( ":memory:" ) ;
const model = new ChatAnthropic ( { model : "claude-3-5-sonnet-20240620" } ) ;
const callModel : GraphNode < typeof State > = async ( state ) => {
const response = await model . invoke (state . messages) ;
return { messages : [response] };
};
const queryDatabase : GraphNode < typeof State > = async ( state ) => {
const queryResult : string = JSON . stringify (
db . prepare ( "SELECT * FROM Artist LIMIT 10;" ) . all () ,
) ;
return { messages : [ new AIMessage ( { content : "queryResult" } )] };
};
const workflow = new StateGraph (State)
// 定义我们将循环的两个节点
. addNode ( "call_model" , callModel , { retryPolicy : { maxAttempts : 5 } } )
. addNode ( "query_database" , queryDatabase , {
retryPolicy : {
retryOn : ( e : any ) : boolean => {
if (e instanceof Database . SqliteError ) {
// 在 "SQLITE_BUSY" 错误上重试
return e . code === "SQLITE_BUSY" ;
}
return false ; // 不要在其他错误上重试
},
},
} )
. addEdge (START , "call_model" )
. addEdge ( "call_model" , "query_database" )
. addEdge ( "query_database" , END) ;
const graph = workflow . compile () ;
在节点内访问执行信息
您可以通过 runtime.executionInfo 访问执行标识和重试信息。这公开了线程、运行和检查点标识符以及重试状态,而无需直接从 config 读取。
属性 类型 描述 threadIdstring | undefined当前执行的线程 ID。 runIdstring | undefined当前执行的运行 ID。 checkpointIdstring当前执行的检查点 ID。 checkpointNsstring当前执行的检查点命名空间。 taskIdstring当前执行的任务 ID。 nodeAttemptnumber当前执行尝试次数(从 1 开始索引)。 nodeFirstAttemptTimenumber | undefined第一次尝试开始的 Unix 时间戳(秒)。在重试期间保持不变。
访问线程和运行 ID
使用 executionInfo 在节点内访问线程 ID、运行 ID 和其他标识字段:
import { StateGraph , StateSchema , START , END } from "@langchain/langgraph" ;
import * as z from "zod" ;
const State = new StateSchema ( {
result : z . string () ,
} ) ;
const myNode : GraphNode < typeof State > = async ( state , runtime ) => {
const info = runtime . executionInfo ;
console . log ( `Thread: ${ info . threadId } , Run: ${ info . runId } ` ) ;
return { result : "done" };
};
const graph = new StateGraph (State)
. addNode ( "my_node" , myNode)
. addEdge (START , "my_node" )
. addEdge ( "my_node" , END)
. compile () ;
根据重试状态调整行为
当节点具有重试策略时,使用 executionInfo 检查当前尝试次数,并在第一次尝试失败后切换到后备方案:
import { StateGraph , StateSchema , START , END } from "@langchain/langgraph" ;
import * as z from "zod" ;
const State = new StateSchema ( {
result : z . string () ,
} ) ;
const myNode : GraphNode < typeof State > = async ( state , runtime ) => {
const info = runtime . executionInfo ;
if (info . nodeAttempt > 1 ) {
// 在重试时使用后备方案
return { result : await callFallbackApi () };
}
return { result : await callPrimaryApi () };
};
const graph = new StateGraph (State)
. addNode ( "my_node" , myNode , { retryPolicy : { maxAttempts : 3 } } )
. addEdge (START , "my_node" )
. addEdge ( "my_node" , END)
. compile () ;
executionInfo 在没有重试策略的情况下也可在 Runtime 对象上使用——nodeAttempt 默认为 1,nodeFirstAttemptTime 设置为节点开始执行的时间。
在节点内访问服务器信息
当您的图在 LangGraph Server 上运行时,您可以通过 runtime.serverInfo 访问服务器特定的元数据。
属性 类型 描述 assistantIdstring当前部署的助手 ID。 graphIdstring当前部署的图 ID。 userBaseUser | null经过身份验证的用户(如果配置了自定义身份验证 )。
const myNode : GraphNode < typeof State > = async ( state , runtime ) => {
const server = runtime . serverInfo ;
if (server != null ) {
console . log ( `Assistant: ${ server . assistantId } , Graph: ${ server . graphId } ` ) ;
if (server . user != null ) {
console . log ( `User: ${ server . user . identity } ` ) ;
}
}
return { result : "done" };
};
当图未在 LangGraph Server 上运行时,serverInfo 为 null。
需要 deepagents>=1.9.0(或 @langchain/langgraph>=1.2.8)才能使用 runtime.executionInfo 和 runtime.serverInfo。
创建步骤序列
这里我们演示如何构建一个简单的步骤序列。我们将展示:
如何构建顺序图
用于构建类似图的内置简写形式。
要添加节点序列,我们使用图的 .addNode 和 .addEdge 方法:
import { START , StateGraph } from "@langchain/langgraph" ;
const builder = new StateGraph (State)
. addNode ( "step1" , step1)
. addNode ( "step2" , step2)
. addNode ( "step3" , step3)
. addEdge (START , "step1" )
. addEdge ( "step1" , "step2" )
. addEdge ( "step2" , "step3" ) ;
为什么使用 LangGraph 将应用程序步骤拆分为序列?
LangGraph 使得向您的应用程序添加底层持久化层变得容易。
这允许在节点执行之间对状态进行检查点,因此您的 LangGraph 节点管理:
状态更新如何检查点
人机回环 工作流中的中断如何恢复
我们如何使用 LangGraph 的时间旅行 功能“回退”和分支执行
它们还决定了执行步骤如何流式传输 ,以及您的应用程序如何使用 Studio 进行可视化和调试。 让我们演示一个端到端的示例。我们将创建一个三步序列:
在状态的一个键中填充一个值
更新相同的值
填充一个不同的值
让我们首先定义我们的状态 。这决定了图的模式 ,并可以指定如何应用更新。有关更多详细信息,请参阅使用归约器处理状态更新 。 在我们的例子中,我们将只跟踪两个值: import { StateSchema , GraphNode } from "@langchain/langgraph" ;
import * as z from "zod" ;
const State = new StateSchema ( {
value1 : z . string () ,
value2 : z . number () ,
} ) ;
我们的节点 只是 TypeScript 函数,它们读取图的状态并对其进行更新。此函数的第一个参数始终是状态: const step1 : GraphNode < typeof State > = ( state ) => {
return { value1 : "a" };
};
const step2 : GraphNode < typeof State > = ( state ) => {
const currentValue1 = state . value1 ;
return { value1 : ` ${ currentValue1 } b` };
};
const step3 : GraphNode < typeof State > = ( state ) => {
return { value2 : 10 };
};
请注意,在向状态发出更新时,每个节点只需指定它希望更新的键的值。 默认情况下,这将覆盖 相应键的值。您也可以使用归约器 来控制更新的处理方式——例如,您可以将连续的更新附加到一个键。有关更多详细信息,请参阅使用归约器处理状态更新 。 最后,我们定义图。我们使用 StateGraph 来定义一个操作此状态的图。 然后我们将使用 addNode 和 addEdge 来填充我们的图并定义其控制流。 import { START , StateGraph } from "@langchain/langgraph" ;
const graph = new StateGraph (State)
. addNode ( "step1" , step1)
. addNode ( "step2" , step2)
. addNode ( "step3" , step3)
. addEdge (START , "step1" )
. addEdge ( "step1" , "step2" )
. addEdge ( "step2" , "step3" )
. compile () ;
指定自定义名称
您可以使用 .addNode 为节点指定自定义名称:const graph = new StateGraph (State)
. addNode ( "myNode" , step1)
. compile () ;
请注意:
.addEdge 接受节点的名称,对于函数,默认为 node.name。
我们必须指定图的入口点。为此,我们添加一条指向 START 节点 的边。
当没有更多节点要执行时,图停止。
接下来,我们编译 我们的图。这提供了对图结构的一些基本检查(例如,识别孤立节点)。如果我们通过检查点 向应用程序添加持久化,也会在此处传入。 LangGraph 提供了内置工具来可视化您的图。让我们检查我们的序列。有关可视化的详细信息,请参阅可视化您的图 。 import * as fs from "node:fs/promises" ;
const drawableGraph = await graph . getGraphAsync () ;
const image = await drawableGraph . drawMermaidPng () ;
const imageBuffer = new Uint8Array ( await image . arrayBuffer ()) ;
await fs . writeFile ( "graph.png" , imageBuffer) ;
让我们继续一个简单的调用: const result = await graph . invoke ( { value1 : "c" } ) ;
console . log (result) ;
{ value1: 'a b', value2: 10 }
请注意:
我们通过为单个状态键提供值来启动调用。我们必须始终为至少一个键提供值。
我们传入的值被第一个节点覆盖。
第二个节点更新了该值。
第三个节点填充了一个不同的值。
创建分支
节点的并行执行对于加速整体图操作至关重要。LangGraph 提供了对节点并行执行的原生支持,这可以显著增强基于图的工作流的性能。这种并行化是通过扇出和扇入机制实现的,利用标准边和 conditional_edges 。下面是一些示例,展示了如何添加创建适合您的分支数据流。
并行运行图节点
在此示例中,我们从 Node A 扇出到 B 和 C,然后扇入到 D。使用我们的状态,我们指定归约器添加操作 。这将组合或累积状态中特定键的值,而不是简单地覆盖现有值。对于列表,这意味着将新列表与现有列表连接。有关使用归约器更新状态的更多详细信息,请参阅上面关于状态归约器 的部分。
import { StateGraph , StateSchema , ReducedValue , GraphNode , START , END } from "@langchain/langgraph" ;
import * as z from "zod" ;
const State = new StateSchema ( {
// 归约器使其成为仅追加
aggregate : new ReducedValue (
z . array (z . string ()) . default ( () => []) ,
{ reducer : ( x , y ) => x . concat (y) }
) ,
} ) ;
const nodeA : GraphNode < typeof State > = ( state ) => {
console . log ( `Adding "A" to ${ state . aggregate } ` ) ;
return { aggregate : [ "A" ] };
};
const nodeB : GraphNode < typeof State > = ( state ) => {
console . log ( `Adding "B" to ${ state . aggregate } ` ) ;
return { aggregate : [ "B" ] };
};
const nodeC : GraphNode < typeof State > = ( state ) => {
console . log ( `Adding "C" to ${ state . aggregate } ` ) ;
return { aggregate : [ "C" ] };
};
const nodeD : GraphNode < typeof State > = ( state ) => {
console . log ( `Adding "D" to ${ state . aggregate } ` ) ;
return { aggregate : [ "D" ] };
};
const graph = new StateGraph (State)
. addNode ( "a" , nodeA)
. addNode ( "b" , nodeB)
. addNode ( "c" , nodeC)
. addNode ( "d" , nodeD)
. addEdge (START , "a" )
. addEdge ( "a" , "b" )
. addEdge ( "a" , "c" )
. addEdge ( "b" , "d" )
. addEdge ( "c" , "d" )
. addEdge ( "d" , END)
. compile () ;
import * as fs from "node:fs/promises" ;
const drawableGraph = await graph . getGraphAsync () ;
const image = await drawableGraph . drawMermaidPng () ;
const imageBuffer = new Uint8Array ( await image . arrayBuffer ()) ;
await fs . writeFile ( "graph.png" , imageBuffer) ;
使用归约器,您可以看到每个节点中添加的值被累积。
const result = await graph . invoke ( {
aggregate : [] ,
} ) ;
console . log (result) ;
Adding "A" to []
Adding "B" to ['A']
Adding "C" to ['A']
Adding "D" to ['A', 'B', 'C']
{ aggregate: ['A', 'B', 'C', 'D'] }
在上面的示例中,节点 "b" 和 "c" 在同一个超级步骤 中并发执行。因为它们在同一个步骤中,所以节点 "d" 在 "b" 和 "c" 都完成后执行。 重要的是,来自并行超级步骤的更新可能无法保持一致的顺序。如果您需要来自并行超级步骤的更新具有一致的、预定的顺序,您应该将输出写入状态中的一个单独字段,并附带一个用于排序的值。
LangGraph 在超级步骤 内执行节点,这意味着虽然并行分支是并行执行的,但整个超级步骤是事务性的 。如果其中任何一个分支引发异常,则不会 将任何更新应用到状态(整个超级步骤出错)。 重要的是,当使用检查点 时,超级步骤内成功节点的结果会被保存,并且在恢复时不会重复。 如果您有容易出错的节点(也许想处理不稳定的 API 调用),LangGraph 提供了两种解决方法:
您可以在节点内编写常规 Python 代码来捕获和处理异常。
您可以设置一个**retry_policy ** 来指示图重试引发特定类型异常的节点。只有失败的分支会被重试,因此您无需担心执行冗余工作。
结合使用,这些可以让您执行并行执行并完全控制异常处理。
设置最大并发数
您可以通过在调用图时在配置 中设置 max_concurrency 来控制最大并发任务数。const result = await graph . invoke ( { value1 : "c" }, { configurable : { max_concurrency : 10 }} ) ;
条件分支
如果您的扇出应该在运行时根据状态变化,您可以使用 addConditionalEdges 来使用图状态选择一个或多个路径。请参阅下面的示例,其中节点 a 生成一个状态更新,该更新决定了后续节点。
import { StateGraph , StateSchema , ReducedValue , GraphNode , ConditionalEdgeRouter , START , END } from "@langchain/langgraph" ;
import * as z from "zod" ;
const State = new StateSchema ( {
aggregate : new ReducedValue (
z . array (z . string ()) . default ( () => []) ,
{ reducer : ( x , y ) => x . concat (y) }
) ,
// 向状态添加一个键。我们将设置此键来确定
// 我们如何分支。
which : z . string () ,
} ) ;
const nodeA : GraphNode < typeof State > = ( state ) => {
console . log ( `Adding "A" to ${ state . aggregate } ` ) ;
return { aggregate : [ "A" ] , which : "c" };
};
const nodeB : GraphNode < typeof State > = ( state ) => {
console . log ( `Adding "B" to ${ state . aggregate } ` ) ;
return { aggregate : [ "B" ] };
};
const nodeC : GraphNode < typeof State > = ( state ) => {
console . log ( `Adding "C" to ${ state . aggregate } ` ) ;
return { aggregate : [ "C" ] };
};
const conditionalEdge : ConditionalEdgeRouter < typeof State , " b " | " c " > = ( state ) => {
// 在此处填写使用状态确定下一个节点的任意逻辑
return state . which as "b" | "c" ;
};
const graph = new StateGraph (State)
. addNode ( "a" , nodeA)
. addNode ( "b" , nodeB)
. addNode ( "c" , nodeC)
. addEdge (START , "a" )
. addEdge ( "b" , END)
. addEdge ( "c" , END)
. addConditionalEdges ( "a" , conditionalEdge)
. compile () ;
import * as fs from "node:fs/promises" ;
const drawableGraph = await graph . getGraphAsync () ;
const image = await drawableGraph . drawMermaidPng () ;
const imageBuffer = new Uint8Array ( await image . arrayBuffer ()) ;
await fs . writeFile ( "graph.png" , imageBuffer) ;
const result = await graph . invoke ( { aggregate : [] } ) ;
console . log (result) ;
Adding "A" to []
Adding "C" to ['A']
{ aggregate: ['A', 'C'], which: 'c' }
您的条件边可以路由到多个目标节点。例如: const routeBcOrCd : ConditionalEdgeRouter < typeof State , " b " | " c " | " d " > = ( state ) => {
if (state . which === "cd" ) {
return [ "c" , "d" ] ;
}
return [ "b" , "c" ] ;
};
Map-Reduce 和 send API
LangGraph 使用 Send API 支持 map-reduce 和其他高级分支模式。以下是使用它的示例:
import { StateGraph , StateSchema , ReducedValue , GraphNode , START , END , Send } from "@langchain/langgraph" ;
import * as z from "zod" ;
const OverallState = new StateSchema ( {
topic : z . string () ,
subjects : z . array (z . string ()) ,
jokes : new ReducedValue (
z . array (z . string ()) . default ( () => []) ,
{ reducer : ( x , y ) => x . concat (y) }
) ,
bestSelectedJoke : z . string () ,
} ) ;
const generateTopics : GraphNode < typeof OverallState > = ( state ) => {
return { subjects : [ "lions" , "elephants" , "penguins" ] };
};
const generateJoke : GraphNode < typeof OverallState > = ( state ) => {
const jokeMap : Record < string , string > = {
lions : "Why don't lions like fast food? Because they can't catch it!" ,
elephants : "Why don't elephants use computers? They're afraid of the mouse!" ,
penguins : "Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice."
};
return { jokes : [jokeMap[state . subject]] };
};
const continueToJokes : ConditionalEdgeRouter < typeof OverallState , " generateJoke " > = ( state ) => {
return state . subjects . map ( ( subject ) => new Send ( "generateJoke" , { subject } )) ;
};
const bestJoke : GraphNode < typeof OverallState > = ( state ) => {
return { bestSelectedJoke : "penguins" };
};
const graph = new StateGraph (OverallState)
. addNode ( "generateTopics" , generateTopics)
. addNode ( "generateJoke" , generateJoke)
. addNode ( "bestJoke" , bestJoke)
. addEdge (START , "generateTopics" )
. addConditionalEdges ( "generateTopics" , continueToJokes)
. addEdge ( "generateJoke" , "bestJoke" )
. addEdge ( "bestJoke" , END)
. compile () ;
import * as fs from "node:fs/promises" ;
const drawableGraph = await graph . getGraphAsync () ;
const image = await drawableGraph . drawMermaidPng () ;
const imageBuffer = new Uint8Array ( await image . arrayBuffer ()) ;
await fs . writeFile ( "graph.png" , imageBuffer) ;
// 调用图:这里我们调用它来生成笑话列表
for await ( const step of await graph . stream ( { topic : "animals" } )) {
console . log (step) ;
}
{ generateTopics: { subjects: [ 'lions', 'elephants', 'penguins' ] } }
{ generateJoke: { jokes: [ "Why don't lions like fast food? Because they can't catch it!" ] } }
{ generateJoke: { jokes: [ "Why don't elephants use computers? They're afraid of the mouse!" ] } }
{ generateJoke: { jokes: [ "Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice." ] } }
{ bestJoke: { bestSelectedJoke: 'penguins' } }
创建和控制循环
创建带有循环的图时,我们需要一种终止执行的机制。最常见的是添加一条条件边 ,该边在达到某个终止条件时路由到 END 节点。
您还可以在调用或流式传输图时设置图的递归限制。递归限制设置了图在引发错误之前允许执行的超级步骤 数量。请阅读有关递归限制概念 的更多信息。
让我们考虑一个带有循环的简单图,以更好地理解这些机制的工作原理。
要返回状态的最后一个值而不是收到递归限制错误,请参阅下一节 。
创建循环时,您可以包含一条指定终止条件的条件边:
const route : ConditionalEdgeRouter < typeof State , " b " > = ( state ) => {
if ( terminationCondition (state)) {
return END ;
} else {
return "b" ;
}
};
const graph = new StateGraph (State)
. addNode ( "a" , nodeA)
. addNode ( "b" , nodeB)
. addEdge (START , "a" )
. addConditionalEdges ( "a" , route)
. addEdge ( "b" , "a" )
. compile () ;
要控制递归限制,请在配置中指定 "recursionLimit"。这将引发 GraphRecursionError,您可以捕获并处理它:
import { GraphRecursionError } from "@langchain/langgraph" ;
try {
await graph . invoke (inputs , { recursionLimit : 3 } ) ;
} catch (error) {
if (error instanceof GraphRecursionError ) {
console . log ( "Recursion Error" ) ;
}
}
让我们定义一个带有简单循环的图。请注意,我们使用条件边来实现终止条件。
import { StateGraph , StateSchema , ReducedValue , GraphNode , ConditionalEdgeRouter , START , END } from "@langchain/langgraph" ;
import * as z from "zod" ;
const State = new StateSchema ( {
// 归约器使其成为仅追加
aggregate : new ReducedValue (
z . array (z . string ()) . default ( () => []) ,
{ reducer : ( x , y ) => x . concat (y) }
) ,
} ) ;
const nodeA : GraphNode < typeof State > = ( state ) => {
console . log ( `Node A sees ${ state . aggregate } ` ) ;
return { aggregate : [ "A" ] };
};
const nodeB : GraphNode < typeof State > = ( state ) => {
console . log ( `Node B sees ${ state . aggregate } ` ) ;
return { aggregate : [ "B" ] };
};
// 定义边
const route : ConditionalEdgeRouter < typeof State , " b " > = ( state ) => {
if (state . aggregate . length < 7 ) {
return "b" ;
} else {
return END ;
}
};
const graph = new StateGraph (State)
. addNode ( "a" , nodeA)
. addNode ( "b" , nodeB)
. addEdge (START , "a" )
. addConditionalEdges ( "a" , route)
. addEdge ( "b" , "a" )
. compile () ;
import * as fs from "node:fs/promises" ;
const drawableGraph = await graph . getGraphAsync () ;
const image = await drawableGraph . drawMermaidPng () ;
const imageBuffer = new Uint8Array ( await image . arrayBuffer ()) ;
await fs . writeFile ( "graph.png" , imageBuffer) ;
此架构类似于 ReAct 代理 ,其中节点 "a" 是一个工具调用模型,节点 "b" 代表工具。
在我们的 route 条件边中,我们指定在状态中的 "aggregate" 列表超过阈值长度后结束。
调用图时,我们看到在达到终止条件之前,我们在节点 "a" 和 "b" 之间交替。
const result = await graph . invoke ( { aggregate : [] } ) ;
console . log (result) ;
Node A sees []
Node B sees ['A']
Node A sees ['A', 'B']
Node B sees ['A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B']
Node B sees ['A', 'B', 'A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B', 'A', 'B']
{ aggregate: ['A', 'B', 'A', 'B', 'A', 'B', 'A'] }
施加递归限制
在某些应用程序中,我们可能无法保证会达到给定的终止条件。在这些情况下,我们可以设置图的递归限制 。这将在给定数量的超级步骤 后引发 GraphRecursionError。然后我们可以捕获并处理此异常:
import { GraphRecursionError } from "@langchain/langgraph" ;
try {
await graph . invoke ( { aggregate : [] }, { recursionLimit : 4 } ) ;
} catch (error) {
if (error instanceof GraphRecursionError ) {
console . log ( "Recursion Error" ) ;
}
}
Node A sees []
Node B sees ['A']
Node A sees ['A', 'B']
Node B sees ['A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B']
Recursion Error
使用 Command 结合控制流和状态更新
将控制流(边)和状态更新(节点)结合起来会很有用。例如,您可能希望同时 执行状态更新并 在同一节点中决定下一个要访问的节点。LangGraph 提供了一种方法,通过从节点函数返回 Command 对象来实现:
import { Command } from "@langchain/langgraph" ;
const myNode = ( state : State ) : Command => {
return new Command ( {
// 状态更新
update : { foo : "bar" },
// 控制流
goto : "myOtherNode"
} ) ;
};
我们在下面展示一个端到端的示例。让我们创建一个包含 3 个节点的简单图:A、B 和 C。我们将首先执行节点 A,然后根据节点 A 的输出决定接下来是转到节点 B 还是节点 C。
import { StateGraph , StateSchema , GraphNode , Command , START } from "@langchain/langgraph" ;
import * as z from "zod" ;
// 定义图状态
const State = new StateSchema ( {
foo : z . string () ,
} ) ;
// 定义节点
const nodeA : GraphNode < typeof State , " nodeB " | " nodeC " > = ( state ) => {
console . log ( "Called A" ) ;
const value = Math . random () > 0.5 ? "b" : "c" ;
// 这是条件边函数的替代品
const goto = value === "b" ? "nodeB" : "nodeC" ;
// 注意 Command 如何允许您同时更新图状态并路由到下一个节点
return new Command ( {
// 这是状态更新
update : { foo : value },
// 这是边的替代品
goto ,
} ) ;
};
const nodeB : GraphNode < typeof State > = ( state ) => {
console . log ( "Called B" ) ;
return { foo : state . foo + "b" };
};
const nodeC : GraphNode < typeof State > = ( state ) => {
console . log ( "Called C" ) ;
return { foo : state . foo + "c" };
};
我们现在可以使用上述节点创建 StateGraph。请注意,该图没有用于路由的条件边 !这是因为控制流是在 nodeA 内部用 Command 定义的。
const graph = new StateGraph (State)
. addNode ( "nodeA" , nodeA , {
ends : [ "nodeB" , "nodeC" ] ,
} )
. addNode ( "nodeB" , nodeB)
. addNode ( "nodeC" , nodeC)
. addEdge (START , "nodeA" )
. compile () ;
您可能已经注意到我们使用 ends 来指定 nodeA 可以导航到哪些节点。这对于图渲染是必要的,并告诉 LangGraph nodeA 可以导航到 nodeB 和 nodeC。
import * as fs from "node:fs/promises" ;
const drawableGraph = await graph . getGraphAsync () ;
const image = await drawableGraph . drawMermaidPng () ;
const imageBuffer = new Uint8Array ( await image . arrayBuffer ()) ;
await fs . writeFile ( "graph.png" , imageBuffer) ;
如果我们多次运行图,我们会看到它根据节点 A 中的随机选择采取不同的路径(A -> B 或 A -> C)。
const result = await graph . invoke ( { foo : "" } ) ;
console . log (result) ;
Called A
Called C
{ foo: 'cc' }
导航到父图中的节点
如果您使用子图 ,您可能希望从子图内的节点导航到不同的子图(即父图中的不同节点)。为此,您可以在 Command 中指定 graph=Command.PARENT:
const myNode = ( state : State ) : Command => {
return new Command ( {
update : { foo : "bar" },
goto : "otherSubgraph" , // 其中 `otherSubgraph` 是父图中的一个节点
graph : Command . PARENT
} ) ;
};
让我们使用上面的示例来演示这一点。我们将通过将上面示例中的 nodeA 更改为一个单节点图,然后将其作为子图添加到我们的父图中。
使用 Command.PARENT 进行状态更新
当您从子图节点向父图节点发送对父图和子图状态模式 共享的键的更新时,您必须 为父图状态中要更新的键定义一个归约器 。请参阅下面的示例。
import { StateGraph , StateSchema , ReducedValue , GraphNode , Command , START } from "@langchain/langgraph" ;
import * as z from "zod" ;
const State = new StateSchema ( {
// 注意:我们在这里定义了一个归约器
foo : new ReducedValue (
z . string () . default ( "" ) ,
{ reducer : ( x , y ) => x + y }
) ,
} ) ;
const nodeA : GraphNode < typeof State , " nodeB " | " nodeC " > = ( state ) => {
console . log ( "Called A" ) ;
const value = Math . random () > 0.5 ? "nodeB" : "nodeC" ;
// 注意 Command 如何允许您同时更新图状态并路由到下一个节点
return new Command ( {
update : { foo : "a" },
goto : value ,
// 这告诉 LangGraph 导航到父图中的 nodeB 或 nodeC
// 注意:这将导航到相对于子图的最接近的父图
graph : Command . PARENT ,
} ) ;
};
const subgraph = new StateGraph (State)
. addNode ( "nodeA" , nodeA , { ends : [ "nodeB" , "nodeC" ] } )
. addEdge (START , "nodeA" )
. compile () ;
const nodeB : GraphNode < typeof State > = ( state ) => {
console . log ( "Called B" ) ;
// 注意:由于我们定义了归约器,我们不需要手动将新字符附加到现有的 'foo' 值。
// 相反,归约器会自动附加这些字符
return { foo : "b" };
};
const nodeC : GraphNode < typeof State > = ( state ) => {
console . log ( "Called C" ) ;
return { foo : "c" };
};
const graph = new StateGraph (State)
. addNode ( "subgraph" , subgraph , { ends : [ "nodeB" , "nodeC" ] } )
. addNode ( "nodeB" , nodeB)
. addNode ( "nodeC" , nodeC)
. addEdge (START , "subgraph" )
. compile () ;
const result = await graph . invoke ( { foo : "" } ) ;
console . log (result) ;
Called A
Called C
{ foo: 'ac' }
在工具内部使用
一个常见的用例是从工具内部更新图状态。例如,在客户支持应用程序中,您可能希望在对话开始时根据客户的帐号或 ID 查找客户信息。要从工具更新图状态,您可以从工具返回 Command(update={"my_custom_key": "foo", "messages": [...]}):
import { tool } from "@langchain/core/tools" ;
import { Command } from "@langchain/langgraph" ;
import * as z from "zod" ;
const lookupUserInfo = tool (
async ( input , runtime ) => {
const userId = runtime . serverInfo ?. user ?. identity ;
const userInfo = getUserInfo (userId) ;
return new Command ( {
update : {
// 更新状态键
userInfo : userInfo ,
// 更新消息历史
messages : [ {
role : "tool" ,
content : "Successfully looked up user information" ,
tool_call_id : runtime . toolCall . id
} ]
}
} ) ;
},
{
name : "lookupUserInfo" ,
description : "Use this to look up user information to better assist them with their questions." ,
schema : z . object ( {} ) ,
}
) ;
当从工具返回 Command 时,您必须 在 Command.update 中包含 messages(或任何用于消息历史的状态键),并且 messages 中的消息列表必须 包含 ToolMessage。这对于生成的消息历史有效是必要的(LLM 提供商要求 AI 消息后跟工具调用,然后是工具结果消息)。
如果您使用通过 Command 更新状态的工具,我们建议使用预构建的 ToolNode ,它会自动处理返回 Command 对象的工具,并将它们传播到图状态。如果您正在编写一个调用工具的自定义节点,则需要手动将工具返回的 Command 对象作为节点的更新进行传播。
可视化您的图
这里我们演示如何可视化您创建的图。
您可以可视化任何任意的图 ,包括 StateGraph 。
让我们创建一个简单的示例图来演示可视化。
import { StateGraph , StateSchema , MessagesValue , ReducedValue , GraphNode , ConditionalEdgeRouter , START , END } from "@langchain/langgraph" ;
import * as z from "zod" ;
const State = new StateSchema ( {
messages : MessagesValue ,
value : new ReducedValue (
z . number () . default ( 0 ) ,
{ reducer : ( x , y ) => x + y }
) ,
} ) ;
const node1 : GraphNode < typeof State > = ( state ) => {
return { value : state . value + 1 };
};
const node2 : GraphNode < typeof State > = ( state ) => {
return { value : state . value * 2 };
};
const router : ConditionalEdgeRouter < typeof State , " node2 " > = ( state ) => {
if (state . value < 10 ) {
return "node2" ;
}
return END ;
};
const app = new StateGraph (State)
. addNode ( "node1" , node1)
. addNode ( "node2" , node2)
. addEdge (START , "node1" )
. addConditionalEdges ( "node1" , router)
. addEdge ( "node2" , "node1" )
. compile () ;
Mermaid
我们也可以将图类转换为 Mermaid 语法。
const drawableGraph = await app . getGraphAsync () ;
console . log (drawableGraph . drawMermaid ()) ;
%%{init: {'flowchart': {'curve': 'linear'}}}%%
graph TD;
tart__([<p>__start__</p>]):::first
e1(node1)
e2(node2)
nd__([<p>__end__</p>]):::last
tart__ --> node1;
e1 -.-> node2;
e1 -.-> __end__;
e2 --> node1;
ssDef default fill:#f2f0ff,line-height:1.2
ssDef first fill-opacity:0
ssDef last fill:#bfb6fc
PNG
如果愿意,我们可以将图渲染为 .png。这使用 Mermaid.ink API 生成图表。
import * as fs from "node:fs/promises" ;
const drawableGraph = await app . getGraphAsync () ;
const image = await drawableGraph . drawMermaidPng () ;
const imageBuffer = new Uint8Array ( await image . arrayBuffer ()) ;
await fs . writeFile ( "graph.png" , imageBuffer) ;
连接这些文档 到 Claude、VSCode 等,通过 MCP 获取实时答案。