Skip to main content
功能API 允许您以对现有代码的最小改动,将LangGraph的关键功能(持久化记忆人机交互流式处理)添加到您的应用程序中。
关于功能API的概念信息,请参阅功能API

创建简单工作流

定义 entrypoint 时,输入仅限于函数的第一个参数。要传递多个输入,可以使用字典。
@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
    value = inputs["value"]
    another_value = inputs["another_value"]
    ...

my_workflow.invoke({"value": 1, "another_value": 2})
from langchain_core.utils.uuid import uuid7
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

# 检查数字是否为偶数的任务
@task
def is_even(number: int) -> bool:
    return number % 2 == 0

# 格式化消息的任务
@task
def format_message(is_even: bool) -> str:
    return "The number is even." if is_even else "The number is odd."

# 创建用于持久化的检查点保存器
checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(inputs: dict) -> str:
    """用于分类数字的简单工作流。"""
    even = is_even(inputs["number"]).result()
    return format_message(even).result()

# 使用唯一的线程ID运行工作流
config = {"configurable": {"thread_id": str(uuid7())}}
result = workflow.invoke({"number": 7}, config=config)
print(result)
此示例演示了如何在语法上使用 @task@entrypoint 装饰器。 由于提供了检查点保存器,工作流结果将被持久化保存在检查点中。
import uuid
from langchain.chat_models import init_chat_model
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

model = init_chat_model('gpt-3.5-turbo')

# 任务:使用LLM生成文章
@task
def compose_essay(topic: str) -> str:
    """生成关于给定主题的文章。"""
    return model.invoke([
        {"role": "system", "content": "You are a helpful assistant that writes essays."},
        {"role": "user", "content": f"Write an essay about {topic}."}
    ]).content

# 创建用于持久化的检查点保存器
checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(topic: str) -> str:
    """使用LLM生成文章的简单工作流。"""
    return compose_essay(topic).result()

# 执行工作流
config = {"configurable": {"thread_id": str(uuid7())}}
result = workflow.invoke("the history of flight", config=config)
print(result)

并行执行

通过并发调用任务并等待结果,可以并行执行任务。这对于提高IO密集型任务(例如,调用LLM的API)的性能非常有用。
@task
def add_one(number: int) -> int:
    return number + 1

@entrypoint(checkpointer=checkpointer)
def graph(numbers: list[int]) -> list[str]:
    futures = [add_one(i) for i in numbers]
    return [f.result() for f in futures]
此示例演示了如何使用 @task 并行运行多个LLM调用。每个调用生成一个关于不同主题的段落,结果被连接成单个文本输出。
import uuid
from langchain.chat_models import init_chat_model
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

# 初始化LLM模型
model = init_chat_model("gpt-3.5-turbo")

# 生成关于给定主题段落的任务
@task
def generate_paragraph(topic: str) -> str:
    response = model.invoke([
        {"role": "system", "content": "You are a helpful assistant that writes educational paragraphs."},
        {"role": "user", "content": f"Write a paragraph about {topic}."}
    ])
    return response.content

# 创建用于持久化的检查点保存器
checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(topics: list[str]) -> str:
    """并行生成多个段落并组合它们。"""
    futures = [generate_paragraph(topic) for topic in topics]
    paragraphs = [f.result() for f in futures]
    return "\n\n".join(paragraphs)

# 运行工作流
config = {"configurable": {"thread_id": str(uuid7())}}
result = workflow.invoke(["quantum computing", "climate change", "history of aviation"], config=config)
print(result)
此示例使用LangGraph的并发模型来提高执行时间,特别是当任务涉及LLM补全等I/O操作时。

调用图

功能API图API 可以在同一个应用程序中一起使用,因为它们共享相同的底层运行时。
from langgraph.func import entrypoint
from langgraph.graph import StateGraph

builder = StateGraph()
...
some_graph = builder.compile()

@entrypoint()
def some_workflow(some_input: dict) -> int:
    # 调用使用图API定义的图
    result_1 = some_graph.invoke(...)
    # 调用另一个使用图API定义的图
    result_2 = another_graph.invoke(...)
    return {
        "result_1": result_1,
        "result_2": result_2
    }
import uuid
from typing import TypedDict
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph

# 定义共享状态类型
class State(TypedDict):
    foo: int

# 定义一个简单的转换节点
def double(state: State) -> State:
    return {"foo": state["foo"] * 2}

# 使用图API构建图
builder = StateGraph(State)
builder.add_node("double", double)
builder.set_entry_point("double")
graph = builder.compile()

# 定义功能API工作流
checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(x: int) -> dict:
    result = graph.invoke({"foo": x})
    return {"bar": result["foo"]}

# 执行工作流
config = {"configurable": {"thread_id": str(uuid7())}}
print(workflow.invoke(5, config=config))  # 输出: {'bar': 10}

调用其他入口点

您可以从 入口点任务 内部调用其他 入口点
@entrypoint() # 将自动使用父入口点的检查点保存器
def some_other_workflow(inputs: dict) -> int:
    return inputs["value"]

@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
    value = some_other_workflow.invoke({"value": 1})
    return value
import uuid
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver

# 初始化一个检查点保存器
checkpointer = InMemorySaver()

# 一个可重用的子工作流,用于乘法运算
@entrypoint()
def multiply(inputs: dict) -> int:
    return inputs["a"] * inputs["b"]

# 调用子工作流的主工作流
@entrypoint(checkpointer=checkpointer)
def main(inputs: dict) -> dict:
    result = multiply.invoke({"a": inputs["x"], "b": inputs["y"]})
    return {"product": result}

# 执行主工作流
config = {"configurable": {"thread_id": str(uuid7())}}
print(main.invoke({"x": 6, "y": 7}, config=config))  # 输出: {'product': 42}

流式处理

功能API 使用与 图API 相同的流式处理机制。请阅读 流式处理指南 部分了解更多详情。 使用流式处理API同时流式传输更新和自定义数据的示例。
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.config import get_stream_writer   

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def main(inputs: dict) -> int:
    writer = get_stream_writer()
    writer("Started processing")
    result = inputs["x"] * 2
    writer(f"Result is {result}")
    return result

config = {"configurable": {"thread_id": "abc"}}

for mode, chunk in main.stream(
    {"x": 5},
    stream_mode=["custom", "updates"],
    config=config
):
    print(f"{mode}: {chunk}")
  1. langgraph.config 导入 get_stream_writer
  2. 在入口点内获取流写入器实例。
  3. 在计算开始前发出自定义数据。
  4. 在计算结果后发出另一条自定义消息。
  5. 使用 .stream() 处理流式输出。
  6. 指定要使用的流式处理模式。
('updates', {'add_one': 2})
('updates', {'add_two': 3})
('custom', 'hello')
('custom', 'world')
('updates', {'main': 5})
Python < 3.11 的异步处理 如果使用 Python < 3.11 并编写异步代码,使用 get_stream_writer 将不起作用。请改用 StreamWriter 类。有关更多详情,请参阅 Python < 3.11 的异步处理
from langgraph.types import StreamWriter

@entrypoint(checkpointer=checkpointer)
async def main(inputs: dict, writer: StreamWriter) -> int:
...

重试策略

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import RetryPolicy

# 此变量仅用于演示目的,模拟网络故障。
# 在实际代码中不会有这个。
attempts = 0

# 让我们配置RetryPolicy以在ValueError时重试。
# 默认的RetryPolicy针对特定网络错误进行了优化。
retry_policy = RetryPolicy(retry_on=ValueError)

@task(retry_policy=retry_policy)
def get_info():
    global attempts
    attempts += 1

    if attempts < 2:
        raise ValueError('Failure')
    return "OK"

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def main(inputs, writer):
    return get_info().result()

config = {
    "configurable": {
        "thread_id": "1"
    }
}

main.invoke({'any_input': 'foobar'}, config=config)
'OK'
@task 还支持 timeout= 以限制单次尝试的运行时间。有关 @task@entrypointTimeoutPolicy 用法,请参阅 容错性
@task@entrypoint 的超时支持需要 langgraph>=1.2,目前处于alpha阶段。

缓存任务

import time
from langgraph.cache.memory import InMemoryCache
from langgraph.func import entrypoint, task
from langgraph.types import CachePolicy


@task(cache_policy=CachePolicy(ttl=120))
def slow_add(x: int) -> int:
    time.sleep(1)
    return x * 2


@entrypoint(cache=InMemoryCache())
def main(inputs: dict) -> dict[str, int]:
    result1 = slow_add(inputs["x"]).result()
    result2 = slow_add(inputs["x"]).result()
    return {"result1": result1, "result2": result2}


for chunk in main.stream({"x": 5}, stream_mode="updates"):
    print(chunk)

#> {'slow_add': 10}
#> {'slow_add': 10, '__metadata__': {'cached': True}}
#> {'main': {'result1': 10, 'result2': 10}}
  1. ttl 以秒为单位指定。缓存将在此时间后失效。

错误后恢复

import time
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import StreamWriter

# 此变量仅用于演示目的,模拟网络故障。
# 在实际代码中不会有这个。
attempts = 0

@task()
def get_info():
    """
    模拟一个在成功前会失败一次的任务。
    第一次尝试时引发异常,后续尝试返回 "OK"。
    """
    global attempts
    attempts += 1

    if attempts < 2:
        raise ValueError("Failure")  # 模拟第一次尝试失败
    return "OK"

# 初始化一个内存检查点保存器用于持久化
checkpointer = InMemorySaver()

@task
def slow_task():
    """
    通过引入1秒延迟来模拟运行缓慢的任务。
    """
    time.sleep(1)
    return "Ran slow task."

@entrypoint(checkpointer=checkpointer)
def main(inputs, writer: StreamWriter):
    """
    主工作流函数,顺序运行 slow_task 和 get_info 任务。

    参数:
    - inputs:包含工作流输入值的字典。
    - writer:用于流式传输自定义数据的 StreamWriter。

    工作流首先执行 `slow_task`,然后尝试执行 `get_info`,
    后者在第一次调用时会失败。
    """
    slow_task_result = slow_task().result()  # 对 slow_task 的阻塞调用
    get_info().result()  # 第一次尝试时将在此处引发异常
    return slow_task_result

# 使用唯一线程标识符的工作流执行配置
config = {
    "configurable": {
        "thread_id": "1"  # 用于跟踪工作流执行的唯一标识符
    }
}

# 由于 slow_task 的执行,此调用将花费约1秒
try:
    # 由于 `get_info` 任务失败,第一次调用将引发异常
    main.invoke({'any_input': 'foobar'}, config=config)
except ValueError:
    pass  # 优雅地处理失败
当我们恢复执行时,不需要重新运行 slow_task,因为其结果已保存在检查点中。
main.invoke(None, config=config)
'Ran slow task.'

人机交互

功能API支持使用 interrupt 函数和 Command 原语的人机交互工作流。

基本人机交互工作流

我们将创建三个任务
  1. 追加 "bar"
  2. 暂停等待人类输入。恢复时,追加人类输入。
  3. 追加 "qux"
from langgraph.func import entrypoint, task
from langgraph.types import Command, interrupt


@task
def step_1(input_query):
    """追加 bar。"""
    return f"{input_query} bar"


@task
def human_feedback(input_query):
    """追加用户输入。"""
    feedback = interrupt(f"Please provide feedback: {input_query}")
    return f"{input_query} {feedback}"


@task
def step_3(input_query):
    """追加 qux。"""
    return f"{input_query} qux"
我们现在可以在一个入口点中组合这些任务:
from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()


@entrypoint(checkpointer=checkpointer)
def graph(input_query):
    result_1 = step_1(input_query).result()
    result_2 = human_feedback(result_1).result()
    result_3 = step_3(result_2).result()

    return result_3
interrupt() 在任务内部调用,使人类能够审查和编辑前一个任务的输出。先前任务的结果(在本例中为 step_1)被持久化,因此在 interrupt 之后不会再次运行。 让我们发送一个查询字符串:
config = {"configurable": {"thread_id": "1"}}

for event in graph.stream("foo", config):
    print(event)
    print("\n")
请注意,我们在 step_1 之后使用 interrupt 暂停了。该中断提供了恢复运行的说明。要恢复,我们发出一个包含 human_feedback 任务所期望数据的 Command
# 继续执行
for event in graph.stream(Command(resume="baz"), config):
    print(event)
    print("\n")
恢复后,运行继续执行剩余步骤并按预期终止。

审查工具调用

为了在执行前审查工具调用,我们添加了一个 review_tool_call 函数,该函数调用 interrupt。当调用此函数时,执行将暂停,直到我们发出恢复命令。 给定一个工具调用,我们的函数将 interrupt 以供人类审查。此时我们可以:
  • 接受工具调用
  • 修改工具调用并继续
  • 生成自定义工具消息(例如,指示模型重新格式化其工具调用)
from typing import Union

def review_tool_call(tool_call: ToolCall) -> Union[ToolCall, ToolMessage]:
    """审查工具调用,返回经过验证的版本。"""
    human_review = interrupt(
        {
            "question": "Is this correct?",
            "tool_call": tool_call,
        }
    )
    review_action = human_review["action"]
    review_data = human_review.get("data")
    if review_action == "continue":
        return tool_call
    elif review_action == "update":
        updated_tool_call = {**tool_call, **{"args": review_data}}
        return updated_tool_call
    elif review_action == "feedback":
        return ToolMessage(
            content=review_data, name=tool_call["name"], tool_call_id=tool_call["id"]
        )
我们现在可以更新我们的入口点以审查生成的工具调用。如果工具调用被接受或修改,我们以前的方式执行。否则,我们只追加人类提供的 ToolMessage。先前任务的结果(在本例中为初始模型调用)被持久化,因此在 interrupt 之后不会再次运行。
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph.message import add_messages
from langgraph.types import Command, interrupt


checkpointer = InMemorySaver()


@entrypoint(checkpointer=checkpointer)
def agent(messages, previous):
    if previous is not None:
        messages = add_messages(previous, messages)

    model_response = call_model(messages).result()
    while True:
        if not model_response.tool_calls:
            break

        # 审查工具调用
        tool_results = []
        tool_calls = []
        for i, tool_call in enumerate(model_response.tool_calls):
            review = review_tool_call(tool_call)
            if isinstance(review, ToolMessage):
                tool_results.append(review)
            else:  # 是一个经过验证的工具调用
                tool_calls.append(review)
                if review != tool_call:
                    model_response.tool_calls[i] = review  # 更新消息

        # 执行剩余的工具调用
        tool_result_futures = [call_tool(tool_call) for tool_call in tool_calls]
        remaining_tool_results = [fut.result() for fut in tool_result_futures]

        # 追加到消息列表
        messages = add_messages(
            messages,
            [model_response, *tool_results, *remaining_tool_results],
        )

        # 再次调用模型
        model_response = call_model(messages).result()

    # 生成最终响应
    messages = add_messages(messages, model_response)
    return entrypoint.final(value=model_response, save=messages)

短期记忆

短期记忆允许在相同 线程ID 的不同 调用 之间存储信息。有关更多详情,请参阅短期记忆

管理检查点

您可以查看和删除检查点保存器存储的信息。

查看线程状态

config = {
    "configurable": {
        "thread_id": "1",
        # 可选地提供特定检查点的ID,
        # 否则将显示最新的检查点
        # "checkpoint_id": "1f029ca3-1f5b-6704-8004-820c16b69a5a"  #

    }
}
graph.get_state(config)
StateSnapshot(
    values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today?), HumanMessage(content="what's my name?"), AIMessage(content='Your name is Bob.')]}, next=(),
    config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1f5b-6704-8004-820c16b69a5a'}},
    metadata={
        'source': 'loop',
        'writes': {'call_model': {'messages': AIMessage(content='Your name is Bob.')}},
        'step': 4,
        'parents': {},
        'thread_id': '1'
    },
    created_at='2025-05-05T16:01:24.680462+00:00',
    parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1790-6b0a-8003-baf965b6a38f'}},
    tasks=(),
    interrupts=()
)

查看线程历史记录

config = {
    "configurable": {
        "thread_id": "1"
    }
}
list(graph.get_state_history(config))
[
    StateSnapshot(
        values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?'), HumanMessage(content="what's my name?"), AIMessage(content='Your name is Bob.')]},
        next=(),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1f5b-6704-8004-820c16b69a5a'}},
        metadata={'source': 'loop', 'writes': {'call_model': {'messages': AIMessage(content='Your name is Bob.')}}, 'step': 4, 'parents': {}, 'thread_id': '1'},
        created_at='2025-05-05T16:01:24.680462+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1790-6b0a-8003-baf965b6a38f'}},
        tasks=(),
        interrupts=()
    ),
    StateSnapshot(
        values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?'), HumanMessage(content="what's my name?")]},
        next=('call_model',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1790-6b0a-8003-baf965b6a38f'}},
        metadata={'source': 'loop', 'writes': None, 'step': 3, 'parents': {}, 'thread_id': '1'},
        created_at='2025-05-05T16:01:23.863421+00:00',
        parent_config={...}
        tasks=(PregelTask(id='8ab4155e-6b15-b885-9ce5-bed69a2c305c', name='call_model', path=('__pregel_pull', 'call_model'), error=None, interrupts=(), state=None, result={'messages': AIMessage(content='Your name is Bob.')}),),
        interrupts=()
    ),
    StateSnapshot(
        values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?')]},
        next=('__start__',),
        config={...},
        metadata={'source': 'input', 'writes': {'__start__': {'messages': [{'role': 'user', 'content': "what's my name?"}]}}, 'step': 2, 'parents': {}, 'thread_id': '1'},
        created_at='2025-05-05T16:01:23.863173+00:00',
        parent_config={...}
        tasks=(PregelTask(id='24ba39d6-6db1-4c9b-f4c5-682aeaf38dcd', name='__start__', path=('__pregel_pull', '__start__'), error=None, interrupts=(), state=None, result={'messages': [{'role': 'user', 'content': "what's my name?"}]}),),
        interrupts=()
    ),
    StateSnapshot(
        values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?')]},
        next=(),
        config={...},
        metadata={'source': 'loop', 'writes': {'call_model': {'messages': AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?')}}, 'step': 1, 'parents': {}, 'thread_id': '1'},
        created_at='2025-05-05T16:01:23.862295+00:00',
        parent_config={...}
        tasks=(),
        interrupts=()
    ),
    StateSnapshot(
        values={'messages': [HumanMessage(content="hi! I'm bob")]},
        next=('call_model',),
        config={...},
        metadata={'source': 'loop', 'writes': None, 'step': 0, 'parents': {}, 'thread_id': '1'},
        created_at='2025-05-05T16:01:22.278960+00:00',
        parent_config={...}
        tasks=(PregelTask(id='8cbd75e0-3720-b056-04f7-71ac805140a0', name='call_model', path=('__pregel_pull', 'call_model'), error=None, interrupts=(), state=None, result={'messages': AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?')}),),
        interrupts=()
    ),
    StateSnapshot(
        values={'messages': []},
        next=('__start__',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-0870-6ce2-bfff-1f3f14c3e565'}},
        metadata={'source': 'input', 'writes': {'__start__': {'messages': [{'role': 'user', 'content': "hi! I'm bob"}]}}, 'step': -1, 'parents': {}, 'thread_id': '1'},
        created_at='2025-05-05T16:01:22.277497+00:00',
        parent_config=None,
        tasks=(PregelTask(id='d458367b-8265-812c-18e2-33001d199ce6', name='__start__', path=('__pregel_pull', '__start__'), error=None, interrupts=(), state=None, result={'messages': [{'role': 'user', 'content': "hi! I'm bob"}]}),),
        interrupts=()
    )
]

将返回值与保存值解耦

使用 entrypoint.final 将返回给调用者的内容与持久化到检查点的内容解耦。这在以下情况下很有用:
  • 您想返回计算结果(例如,摘要或状态),但保存不同的内部值以供下次调用使用。
  • 您需要控制下次运行时传递给 previous 参数的内容。
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def accumulate(n: int, *, previous: int | None) -> entrypoint.final[int, int]:
    previous = previous or 0
    total = previous + n
    # 将 *previous* 值返回给调用者,但将 *new* 总计保存到检查点。
    return entrypoint.final(value=previous, save=total)

config = {"configurable": {"thread_id": "my-thread"}}

print(accumulate.invoke(1, config=config))  # 0
print(accumulate.invoke(2, config=config))  # 1
print(accumulate.invoke(3, config=config))  # 3

聊天机器人示例

一个使用功能API和 InMemorySaver 检查点保存器的简单聊天机器人示例。 该机器人能够记住之前的对话并从上次中断的地方继续。
from langchain.messages import BaseMessage
from langgraph.graph import add_messages
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver
from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model="claude-sonnet-4-6")

@task
def call_model(messages: list[BaseMessage]):
    response = model.invoke(messages)
    return response

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(inputs: list[BaseMessage], *, previous: list[BaseMessage]):
    if previous:
        inputs = add_messages(previous, inputs)

    response = call_model(inputs).result()
    return entrypoint.final(value=response, save=add_messages(inputs, response))

config = {"configurable": {"thread_id": "1"}}
input_message = {"role": "user", "content": "hi! I'm bob"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()

input_message = {"role": "user", "content": "what's my name?"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()

长期记忆

长期记忆 允许在不同的 线程ID 之间存储信息。这对于在一个对话中学习关于给定用户的信息并在另一个对话中使用它非常有用。

工作流

  • 工作流与代理 指南,包含更多关于如何使用功能API构建工作流的示例。

与其他库集成