The Functional API lets you add LangGraph's key capabilities (persistence, memory, human-in-the-loop, and streaming) to your applications with minimal changes to your code. It is designed to integrate into existing code that may use standard language primitives for branching and control flow, such as if statements, for loops, and function calls. Unlike many data orchestration frameworks that require restructuring code into an explicit pipeline or DAG, the Functional API lets you incorporate these capabilities without enforcing a rigid execution model. The Functional API uses two key building blocks: the `@entrypoint` decorator, which marks a function as the starting point of a workflow and manages its execution, and the `@task` decorator, which marks a discrete unit of work whose result is checkpointed.
For example, a workflow where `write_essay` is a task and `workflow` is an entrypoint that pauses for a human review via `interrupt` (note that `import time` is needed for the placeholder sleep):

```python
import time

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import interrupt

@task
def write_essay(topic: str) -> str:
    """Write an essay about the given topic."""
    time.sleep(1)  # A placeholder for a long-running task.
    return f"An essay about topic: {topic}"

@entrypoint(checkpointer=InMemorySaver())
def workflow(topic: str) -> dict:
    """A simple workflow that writes an essay and asks for a review."""
    essay = write_essay("cat").result()
    is_approved = interrupt({
        # Any json-serializable payload provided to interrupt as argument.
        # It will be surfaced on the client side as an Interrupt when streaming data
        # from the workflow.
        "essay": essay,  # The essay we want reviewed.
        # We can add any additional information that we need.
        # For example, introduce a key called "action" with some instructions.
        "action": "Please approve/reject the essay",
    })
    return {
        "essay": essay,  # The essay that was generated
        "is_approved": is_approved,  # Response from HIL
    }
```
A more complete version of the same workflow, including the streaming call that surfaces the interrupt:

```python
import time
import uuid

from langgraph.func import entrypoint, task
from langgraph.types import interrupt
from langgraph.checkpoint.memory import InMemorySaver

@task
def write_essay(topic: str) -> str:
    """Write an essay about the given topic."""
    time.sleep(1)  # This is a placeholder for a long-running task.
    return f"An essay about topic: {topic}"

@entrypoint(checkpointer=InMemorySaver())
def workflow(topic: str) -> dict:
    """A simple workflow that writes an essay and asks for a review."""
    essay = write_essay("cat").result()
    is_approved = interrupt(
        {
            # Any json-serializable payload provided to interrupt as argument.
            # It will be surfaced on the client side as an Interrupt when streaming data
            # from the workflow.
            "essay": essay,  # The essay we want reviewed.
            # We can add any additional information that we need.
            # For example, introduce a key called "action" with some instructions.
            "action": "Please approve/reject the essay",
        }
    )
    return {
        "essay": essay,  # The essay that was generated
        "is_approved": is_approved,  # Response from HIL
    }

thread_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}

for item in workflow.stream("cat", config):
    print(item)
# > {'write_essay': 'An essay about topic: cat'}
# > {
# >     '__interrupt__': (
# >         Interrupt(
# >             value={
# >                 'essay': 'An essay about topic: cat',
# >                 'action': 'Please approve/reject the essay'
# >             },
# >             id='b9b2b9d788f482663ced6dc755c9e981'
# >         ),
# >     )
# > }
```
The essay has been written and is awaiting review. Once the review is provided, we can resume the workflow:
```python
from langgraph.types import Command

# Get review from a user (e.g., via a UI)
# In this case, we're using a bool, but this can be any json-serializable value.
human_review = True

for item in workflow.stream(Command(resume=human_review), config):
    print(item)
```
```python
{'workflow': {'essay': 'An essay about topic: cat', 'is_approved': True}}
```
An entrypoint is defined by applying the `@entrypoint` decorator to a function:

```python
from langgraph.func import entrypoint

@entrypoint(checkpointer=checkpointer)
def my_workflow(some_input: dict) -> int:
    # some logic that may involve long-running tasks like API calls,
    # and may be interrupted for human-in-the-loop.
    ...
    return result
```
The same decorator also works with an async function:

```python
from langgraph.func import entrypoint

@entrypoint(checkpointer=checkpointer)
async def my_workflow(some_input: dict) -> int:
    # some logic that may involve long-running tasks like API calls,
    # and may be interrupted for human-in-the-loop
    ...
    return result
```
An entrypoint can access the return value of its previous invocation on the same thread via the `previous` parameter, and `entrypoint.final` decouples the value returned to the caller from the value saved to the checkpoint:

```python
@entrypoint(checkpointer=checkpointer)
def my_workflow(number: int, *, previous: Any = None) -> entrypoint.final[int, int]:
    previous = previous or 0
    # This will return the previous value to the caller, saving
    # 2 * number to the checkpoint, which will be used in the next invocation
    # for the `previous` parameter.
    return entrypoint.final(value=previous, save=2 * number)

config = {"configurable": {"thread_id": "1"}}

my_workflow.invoke(3, config)  # 0 (previous was None)
my_workflow.invoke(1, config)  # 6 (previous was 3 * 2 from the previous invocation)
```
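The round-trip between `previous` and `save` can be illustrated with a plain-Python sketch. The `Final` named tuple and `checkpoints` dict below are hypothetical stand-ins for `entrypoint.final` and LangGraph's checkpointer, not real LangGraph APIs:

```python
from typing import Any, NamedTuple

class Final(NamedTuple):
    """Stand-in for entrypoint.final: `value` goes to the caller, `save` to the checkpoint."""
    value: Any
    save: Any

# Hypothetical in-memory checkpoint store, keyed by thread_id.
checkpoints: dict[str, Any] = {}

def invoke(thread_id: str, number: int) -> int:
    """Mimics my_workflow.invoke: load `previous`, run the body, persist `save`."""
    previous = checkpoints.get(thread_id) or 0
    result = Final(value=previous, save=2 * number)
    checkpoints[thread_id] = result.save
    return result.value

print(invoke("1", 3))  # 0 (no previous checkpoint yet)
print(invoke("1", 1))  # 6 (previous was 2 * 3 from the prior invocation)
```

The caller always sees the *previous* checkpointed value, while the new value is stored for the next invocation on the same thread.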
To use features like human-in-the-loop, any randomness should be encapsulated inside tasks. This guarantees that when execution is halted (e.g., for human-in-the-loop) and then resumed, it will follow the same _sequence of steps_, even if task results are non-deterministic. LangGraph achieves this behavior by persisting task and subgraph results as they execute. A well-designed workflow ensures that resuming execution follows the _same sequence of steps_, allowing previously computed results to be retrieved correctly without re-executing them. This is particularly useful for long-running tasks or tasks with non-deterministic results, since it avoids repeating previously completed work and allows resuming from essentially the same point. While different runs of a workflow can produce different results, resuming a specific run should always follow the same recorded sequence of steps. This allows LangGraph to efficiently look up task and subgraph results that were executed before the graph was interrupted and avoid recomputing them.
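The replay behavior described above can be sketched in plain Python. The `run_task` helper and `task_log` store below are illustrative stand-ins, not LangGraph APIs: completed task results are persisted in order, and a resumed run consumes the log instead of re-executing, so even a non-deterministic task yields the same value on replay:

```python
import random

# Hypothetical persisted log of task results for one workflow run.
task_log: list[int] = []

def run_task(step: int, fn):
    """Return the persisted result for this step if it exists; otherwise execute and persist."""
    if step < len(task_log):
        return task_log[step]  # replay path: no re-execution
    result = fn()
    task_log.append(result)
    return result

def flaky_number() -> int:
    """A non-deterministic task result."""
    return random.randint(0, 10**9)

first = run_task(0, flaky_number)     # executes and persists
replayed = run_task(0, flaky_number)  # replays the persisted result
assert first == replayed
```

This only works if the resumed run asks for the same steps in the same order, which is why randomness in control flow must itself live inside tasks.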
Idempotency ensures that running the same operation multiple times produces the same result. This helps prevent duplicate API calls and redundant processing if a step is re-run after a failure. Always place API calls inside task functions so they are checkpointed, and design them to be idempotent in case of re-execution. Re-execution can happen if a task starts but does not complete successfully; if the workflow then resumes, the task will run again. Use idempotency keys or verify existing results to avoid duplication.
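An idempotency key can be sketched like this. The `call_api_idempotent` wrapper, `completed` store, and `charge_customer` call are all illustrative, not part of LangGraph:

```python
# Hypothetical store of results keyed by idempotency key,
# surviving across task re-executions.
completed: dict[str, str] = {}
calls_made = 0

def charge_customer(order_id: str) -> str:
    """Pretend external API call that must not be repeated for the same order."""
    global calls_made
    calls_made += 1
    return f"charged:{order_id}"

def call_api_idempotent(key: str, order_id: str) -> str:
    """Check for an existing result before calling, so a re-run task becomes a no-op."""
    if key in completed:
        return completed[key]
    result = charge_customer(order_id)
    completed[key] = result
    return result

# Simulate a task that ran, failed before checkpointing, and re-runs after resume:
call_api_idempotent("order-42", "42")
call_api_idempotent("order-42", "42")  # second run hits the store, no second API call
assert calls_made == 1
```

Many real payment and provisioning APIs accept such a key directly (often as a header or request field), which pushes the deduplication to the server side.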
In this example, a side effect (writing to a file) is placed directly in the workflow, so it will be executed a second time when resuming:

```python
@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
    # This code will be executed a second time when resuming the workflow.
    # Which is likely not what you want.
    with open("output.txt", "w") as f:
        f.write("Side effect executed")
    value = interrupt("question")
    return value
```
In this example, the side effect is encapsulated in a task, ensuring it executes consistently on resumption.
```python
from langgraph.func import task

@task
def write_to_file():
    with open("output.txt", "w") as f:
        f.write("Side effect executed")

@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
    # The side effect is now encapsulated in a task.
    write_to_file().result()
    value = interrupt("question")
    return value
```