功能性 API 允许你在最少改动现有代码的情况下,将 LangGraph 的关键功能(持久化 、记忆 、人工介入 和 流式处理 )添加到你的应用中。
该 API 设计用于将这些功能整合到可能使用标准控制流语法(例如 if 语句、for 循环和函数调用)的现有代码中。与许多要求将代码重构为明确管道或 DAG 的数据编排框架不同,功能性 API 允许你在不强制采用刚性执行模型的情况下集成这些能力。
功能性 API 使用两个关键构建块:
entrypoint – 封装工作流逻辑并管理执行流,包括处理长时任务和中断。
task – 表示一个独立的工作单元(如 API 调用或数据处理步骤),可在 entrypoint 内异步执行。任务返回类似 future 的对象,可被 await 或同步解析。
这为具有状态管理和流式能力的工作流提供了一个最小抽象。
功能性 API 与 图 API
对于喜欢声明式方式的用户,LangGraph 的 图 API 允许你使用图的范式来定义工作流。两个 API 使用相同的运行时,因此可以在同一应用中混合使用。
关键差异:
控制流 :功能性 API 不要求考虑图结构,你可以使用常规 TypeScript/JavaScript 结构来定义工作流,通常能减少需编写的代码量。
短期记忆 :Graph API 要求声明 State ,并可能需要定义 reducers 来管理状态更新。@entrypoint 和 @tasks 不要求显式状态管理,因为它们的状态作用域限于函数本身,函数间不共享状态。
检查点 :两者都会生成并使用检查点。在 Graph API 中,每个 superstep 后都会生成新的检查点。而在功能性 API 中,当任务执行时,其结果会保存到与给定 entrypoint 关联的现有检查点中,而不是创建新的检查点。
可视化 :Graph API 便于将工作流可视化为图,便于调试和交流。功能性 API 没有此可视化支持,因为图是在运行时动态生成的。
下面演示一个简单应用:写一篇文章并通过 interrupts 暂停以请求人工审核。
import { MemorySaver , entrypoint , task , interrupt } from "@langchain/langgraph" ;
const writeEssay = task ( "writeEssay" , async ( topic : string ) => {
// 模拟一个长时间运行的任务。
await new Promise ( ( resolve ) => setTimeout (resolve , 1000 )) ;
return `An essay about topic: ${ topic } ` ;
} ) ;
const workflow = entrypoint (
{ checkpointer : new MemorySaver () , name : "workflow" },
async ( topic : string ) => {
const essay = await writeEssay (topic) ;
const isApproved = interrupt ( {
// 传入到 interrupt 的任何可 JSON 序列化的负载。
// 在从工作流流式传输数据到客户端时,会作为 Interrupt 被展示。
essay , // 我们希望人工审核的文章。
// 可以添加任何附加信息,例如添加一个名为 "action" 的键作为指示。
action : "Please approve/reject the essay" ,
} ) ;
return {
essay , // 生成的文章
isApproved , // 来自人工的响应
};
},
) ;
该工作流将为主题 “cat” 写一篇文章,然后暂停以等待人工审核。工作流可以无限期中断,直到提供审核结果。 当工作流恢复时,它会从开始处执行,但由于 writeEssay 任务的结果已经保存,任务结果将从检查点加载,而不会重新计算。 import { v4 as uuidv4 } from "uuid" ;
import { MemorySaver , entrypoint , task , interrupt } from "@langchain/langgraph" ;
const writeEssay = task ( "writeEssay" , async ( topic : string ) => {
// 这是一个长时间运行任务的占位符。
await new Promise ( ( resolve ) => setTimeout (resolve , 1000 )) ;
return `An essay about topic: ${ topic } ` ;
} ) ;
const workflow = entrypoint (
{ checkpointer : new MemorySaver () , name : "workflow" },
async ( topic : string ) => {
const essay = await writeEssay (topic) ;
const isApproved = interrupt ( {
essay ,
action : "Please approve/reject the essay" ,
} ) ;
return {
essay ,
isApproved ,
};
},
) ;
const threadId = uuidv4 () ;
const config = {
configurable : {
thread_id : threadId ,
},
};
for await ( const item of workflow . stream ( "cat" , config)) {
console . log (item) ;
}
{ writeEssay: 'An essay about topic: cat' }
{
__interrupt__: [{
value: { essay: 'An essay about topic: cat', action: 'Please approve/reject the essay' },
resumable: true,
ns: ['workflow:f7b8508b-21c0-8b4c-5958-4e8de74d2684'],
when: 'during'
}]
}
一篇文章已生成并等待审核。一旦提供审核,我们可以恢复工作流: import { Command } from "@langchain/langgraph" ;
// 从用户处获取审核(例如通过 UI)
// 这里使用布尔值,但可以是任何可 JSON 序列化的值。
const humanReview = true ;
const stream = await workflow . stream (
new Command ( { resume : humanReview } ) ,
config ,
) ;
for await ( const item of stream) {
console . log (item) ;
}
{ workflow: { essay: 'An essay about topic: cat', isApproved: true } }
工作流完成,审核结果已添加到文章中。
Entrypoint
entrypoint 函数可用于从函数创建工作流。它封装了工作流逻辑并管理执行流,包括处理长时任务和 中断 。
entrypoint 通过将配置和函数传入 entrypoint 函数来定义。
该函数必须接受单个位置参数 ,该参数作为工作流输入。如果需要传递多项数据,请将一个对象作为第一个参数的输入类型。
使用函数创建 entrypoint 将生成一个工作流实例,用于管理工作流执行(例如处理流式、恢复和检查点)。
通常需要将 checkpointer 传递给 entrypoint 以启用持久化并使用人工介入等功能。
import { entrypoint } from "@langchain/langgraph" ;
const myWorkflow = entrypoint (
{ checkpointer , name : "workflow" },
async ( someInput : Record < string , any > ) : Promise < number > => {
// 可能涉及长时任务(如 API 调用),并可能被人工中断
return result ;
},
) ;
序列化 Entrypoint 的 输入 与 输出 必须是可 JSON 序列化的,以支持检查点。请参阅 序列化 部分了解更多细节。
使用 entrypoint 函数会返回一个对象,可以使用 invoke 和 stream 方法执行。
const config = {
configurable : {
thread_id : "some_thread_id"
}
};
await myWorkflow . invoke (someInput , config) ; // 等待结果
const config = {
configurable : {
thread_id : "some_thread_id"
}
};
for await ( const chunk of myWorkflow . stream (someInput , config)) {
console . log (chunk) ;
}
恢复执行
在遇到 interrupt 后恢复执行,可以通过向 Command 原语传入 resume 值来完成。
import { Command } from "@langchain/langgraph" ;
const config = {
configurable : {
thread_id : "some_thread_id"
}
};
await myWorkflow . invoke ( new Command ( { resume : someResumeValue } ) , config) ;
import { Command } from "@langchain/langgraph" ;
const config = {
configurable : {
thread_id : "some_thread_id"
}
};
const stream = await myWorkflow . stream (
new Command ( { resume : someResumableValue } ) ,
config ,
)
for await ( const chunk of stream) {
console . log (chunk) ;
}
在错误后恢复
要在错误后恢复,请使用相同的 thread id (config)以 null 作为输入运行 entrypoint。
这假设底层 错误 已被修复并且执行可以继续。
const config = {
configurable : {
thread_id : "some_thread_id"
}
};
await myWorkflow . invoke ( null , config) ;
const config = {
configurable : {
thread_id : "some_thread_id"
}
};
for await ( const chunk of myWorkflow . stream ( null , config)) {
console . log (chunk) ;
}
短期记忆
当 entrypoint 使用 checkpointer 定义时,它会在相同 thread id 的连续调用之间将信息存储在 检查点 中。
这允许使用 getPreviousState 函数访问前一次调用的状态。
默认情况下,getPreviousState 返回前一次调用的返回值。
import { entrypoint , getPreviousState } from "@langchain/langgraph" ;
const myWorkflow = entrypoint (
{ checkpointer , name : "workflow" },
async ( number : number ) => {
const previous = getPreviousState < number > () ?? 0 ;
return number + previous ;
},
) ;
const config = {
configurable : {
thread_id : "some_thread_id" ,
},
};
await myWorkflow . invoke ( 1 , config) ; // 1 (previous 未定义)
await myWorkflow . invoke ( 2 , config) ; // 3 (previous 为上一次调用的 1)
entrypoint.final
entrypoint.final 是一个特殊的原语,可以从 entrypoint 返回,允许将保存在检查点中的值 与entrypoint 的返回值 解耦。
第一个值是返回给调用者的值,第二个值将保存到检查点中。
import { entrypoint , getPreviousState } from "@langchain/langgraph" ;
const myWorkflow = entrypoint (
{ checkpointer , name : "workflow" },
async ( number : number ) => {
const previous = getPreviousState < number > () ?? 0 ;
// 这将把 previous 返回给调用者,同时把 2 * number 保存到检查点中,
// 下次调用时会用作 `previous` 参数。
return entrypoint . final ( {
value : previous ,
save : 2 * number ,
} ) ;
},
) ;
const config = {
configurable : {
thread_id : "1" ,
},
};
await myWorkflow . invoke ( 3 , config) ; // 0 (previous 未定义)
await myWorkflow . invoke ( 1 , config) ; // 6 (previous 为上一次调用保存的 3 * 2)
Task
任务(task) 表示独立的工作单元,例如 API 调用或数据处理步骤。其两个关键特性:
异步执行 :任务设计为异步执行,允许多个操作并发运行而不阻塞。
检查点 :任务结果会保存到检查点,从而在恢复工作流程时可以从上次保存的状态继续。(详见 持久化 )。
任务通过 task 函数定义,它包装了一个常规函数。
import { task } from "@langchain/langgraph" ;
const slowComputation = task ( "slowComputation" , async ( inputValue : any ) => {
// 模拟长时间运行的操作
return result ;
} ) ;
序列化 任务的 输出 必须可 JSON 序列化以支持检查点。
任务 只能在 entrypoint 、另一个 task 或 状态图节点 内被调用。
任务不能 直接从主应用代码中调用。
当调用 task 时,它会返回一个可 await 的 Promise。
const myWorkflow = entrypoint (
{ checkpointer , name : "workflow" },
async ( someInput : number ) : Promise < number > => {
return await slowComputation (someInput) ;
},
) ;
何时使用 task
任务 适合以下场景:
检查点 :需要将长时操作结果保存到检查点,以便恢复时无需重算。
人工介入 :构建需要人工参与的工作流时,必须使用 tasks 来封装任何随机性(例如 API 调用),以确保可以正确恢复。参见 确定性 节了解更多细节。
并行执行 :对于 I/O 密集型任务,tasks 支持并行执行,允许多个操作同时运行(例如调用多个 API)。
可观察性 :将操作包装为 tasks 可以让 LangSmith 跟踪工作流进度并监控单个操作的执行。
可重试工作 :当工作需要重试以应对失败或不一致时,tasks 提供了一种封装并管理重试逻辑的方式。
序列化
LangGraph 中的序列化有两个关键点:
entrypoint 的输入和输出必须可 JSON 序列化。
task 的输出必须可 JSON 序列化。
这些要求对于启用检查点和工作流恢复至关重要。请使用对象、数组、字符串、数字和布尔值等基本类型以保证可序列化。
提供不可序列化的输入或输出会在配置了 checkpointer 的工作流运行时产生错误。
确定性(Determinism)
要利用例如 人工介入 的功能,任何随机性都应被封装在 tasks 内。这样保证在执行被中止(例如等待人工)然后恢复时,会遵循相同的执行步骤序列,即使 task 的结果本身是非确定性的。
LangGraph 通过在执行期间持久化 task 和 子图 的结果来实现这一点。良好设计的工作流确保恢复执行时遵循相同的已记录步骤序列,从而可以正确检索先前计算的结果而无需重新执行。
虽然不同运行可能产生不同结果,但恢复某次特定 运行时应始终遵循相同的记录步骤序列,这允许 LangGraph 有效地查找在中断发生前已经执行的 task 和 subgraph 结果并避免重复计算。
幂等性
幂等性保证多次运行同一操作会产生相同结果。这有助于防止在步骤重试时出现重复的 API 调用或冗余处理。务必将 API 调用放在 tasks 内以便检查点控制,并设计这些调用在重执行时是幂等的,例如使用幂等键或验证已有结果以避免重复。
常见陷阱
处理副作用
将副作用(例如写文件、发送邮件)封装在 tasks 中,以确保在恢复工作流时不会被多次执行。
在该示例中,副作用(写入文件)直接包含在工作流中,因此在恢复工作流时会被重复执行。 import { entrypoint , interrupt } from "@langchain/langgraph" ;
import fs from "fs" ;
const myWorkflow = entrypoint (
{ checkpointer , name : "workflow },
async ( inputs : Record < string , any > ) => {
// 这段代码在恢复工作流时会被重复执行,
// 这通常不是期望的行为。
fs . writeFileSync ( "output.txt" , "Side effect executed" ) ;
const value = interrupt ( "question" ) ;
return value ;
}
);
在该示例中,副作用被封装到 task 中,确保在恢复时执行一致。 import { entrypoint , task , interrupt } from "@langchain/langgraph" ;
import * as fs from "fs" ;
const writeToFile = task ( "writeToFile" , async () => {
fs . writeFileSync ( "output.txt" , "Side effect executed" ) ;
} ) ;
const myWorkflow = entrypoint (
{ checkpointer , name : "workflow" },
async ( inputs : Record < string , any > ) => {
// 副作用现在封装在 task 中。
await writeToFile () ;
const value = interrupt ( "question" ) ;
return value ;
}
) ;
非确定性控制流
可能每次执行都产生不同结果的操作(如获取当前时间或随机数)应封装在 tasks 中,以确保在恢复时返回相同结果。
在 task 中:获取随机数 (5) → interrupt → 恢复 →(返回 5)→ …
不在 task 中:获取随机数 (5) → interrupt → 恢复 → 获取新的随机数 (7) → …
这在包含多个中断调用的 人工介入 工作流中尤为重要。LangGraph 为每个 task/entrypoint 保持一组 resume 值,当遇到中断时,会基于索引将中断与对应的 resume 值匹配。因此 resume 值的顺序应当与中断的顺序一致。
了解更多