Skip to main content
LangGraph SDK 允许你以多种模式从 LangSmith 部署 API 流式传输输出,从每一步后的完整状态快照到逐个令牌的 LLM 输出。线程流式传输还支持可恢复性:如果连接断开,可以使用最后一个事件 ID 重新连接,从断点处继续。
LangGraph SDK 和 Agent Server 是 LangSmith 的一部分。

基本用法

基本用法示例:
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)

# 使用以名称 "agent" 部署的图
assistant_id = "agent"

# 创建一个线程
thread = await client.threads.create()
thread_id = thread["thread_id"]

# 创建一个流式运行
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input=inputs,
    stream_mode="updates"
):
    print(chunk.data)
这是一个你可以在 Agent Server 中运行的示例图。 更多详情请参阅 LangSmith 快速入门
# graph.py
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    topic: str
    joke: str

def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}

def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}

graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .add_edge("generate_joke", END)
    .compile()
)
一旦你有了一个运行的 Agent Server,你可以使用 LangGraph SDK 与之交互。
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)

# 使用以名称 "agent" 部署的图
assistant_id = "agent"

# 创建一个线程
thread = await client.threads.create()
thread_id = thread["thread_id"]

# 创建一个流式运行
async for chunk in client.runs.stream(  # (1)!
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="updates"  # (2)!
):
    print(chunk.data)
  1. client.runs.stream() 方法返回一个迭代器,该迭代器产生流式输出。 2. 设置 stream_mode="updates" 以仅流式传输每个节点后图状态的更新。其他流模式也可用。详情请参阅支持的流模式
{'run_id': '1f02c2b3-3cef-68de-b720-eec2a4a8e920', 'attempt': 1}
{'refine_topic': {'topic': 'ice cream and cats'}}
{'generate_joke': {'joke': 'This is a joke about ice cream and cats'}}

支持的流模式

模式描述LangGraph 库方法
values在每个超级步骤后流式传输完整的图状态。.stream() / .astream() 配合 stream_mode="values"
updates流式传输图每一步后状态的更新。如果在同一步中进行了多次更新(例如,运行了多个节点),这些更新将分别流式传输。.stream() / .astream() 配合 stream_mode="updates"
messages-tuple流式传输调用 LLM 的图节点的 LLM 令牌和元数据(对聊天应用很有用)。.stream() / .astream() 配合 stream_mode="messages"
debug在图执行过程中流式传输尽可能多的信息。.stream() / .astream() 配合 stream_mode="debug"
custom从你的图内部流式传输自定义数据.stream() / .astream() 配合 stream_mode="custom"
events流式传输所有事件(包括图的状态);在迁移大型 LCEL 应用时主要使用。.astream_events()

流式传输多种模式

你可以将一个列表作为 stream_mode 参数传递,以同时流式传输多种模式。 流式输出将是 (mode, chunk) 的元组,其中 mode 是流模式的名称,chunk 是该模式流式传输的数据。
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input=inputs,
    stream_mode=["updates", "custom"]
):
    print(chunk)

流式传输图状态

使用流模式 updatesvalues 在图执行时流式传输其状态。
  • updates 流式传输图每一步后状态的更新
  • values 流式传输图每一步后状态的完整值
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
  topic: str
  joke: str

def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}

def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}

graph = (
  StateGraph(State)
  .add_node(refine_topic)
  .add_node(generate_joke)
  .add_edge(START, "refine_topic")
  .add_edge("refine_topic", "generate_joke")
  .add_edge("generate_joke", END)
  .compile()
)
有状态运行 下面的示例假设你希望将流式运行的输出持久化检查点数据库中,并且已经创建了一个线程。要创建一个线程:
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)

# 使用以名称 "agent" 部署的图
assistant_id = "agent"
# 创建一个线程
thread = await client.threads.create()
thread_id = thread["thread_id"]
如果你不需要持久化运行的输出,可以在流式传输时传递 None 而不是 thread_id

流模式:updates

使用此模式仅流式传输节点在每一步后返回的状态更新。流式输出包括节点的名称以及更新内容。
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="updates"
):
    print(chunk.data)

流模式:values

使用此模式在每一步后流式传输图的完整状态
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="values"
):
    print(chunk.data)

子图

要将子图的输出包含在流式输出中,你可以在父图的 .stream() 方法中设置 subgraphs=True。这将同时流式传输父图和任何子图的输出。
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"foo": "foo"},
    stream_subgraphs=True, # (1)!
    stream_mode="updates",
):
    print(chunk)
  1. 设置 stream_subgraphs=True 以流式传输子图的输出。
这是一个你可以在 Agent Server 中运行的示例图。 更多详情请参阅 LangSmith 快速入门
# graph.py
from langgraph.graph import START, StateGraph
from typing import TypedDict

# 定义子图
class SubgraphState(TypedDict):
    foo: str  # 注意此键与父图状态共享
    bar: str

def subgraph_node_1(state: SubgraphState):
    return {"bar": "bar"}

def subgraph_node_2(state: SubgraphState):
    return {"foo": state["foo"] + state["bar"]}

subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_node(subgraph_node_2)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
subgraph = subgraph_builder.compile()

# 定义父图
class ParentState(TypedDict):
    foo: str

def node_1(state: ParentState):
    return {"foo": "hi! " + state["foo"]}

builder = StateGraph(ParentState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", subgraph)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
graph = builder.compile()
一旦你有了一个运行的 Agent Server,你可以使用 LangGraph SDK 与之交互。
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)

# 使用以名称 "agent" 部署的图
assistant_id = "agent"

# 创建一个线程
thread = await client.threads.create()
thread_id = thread["thread_id"]

async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"foo": "foo"},
    stream_subgraphs=True, # (1)!
    stream_mode="updates",
):
    print(chunk)
  1. 设置 stream_subgraphs=True 以流式传输子图的输出。
注意,我们不仅接收节点更新,还接收命名空间,这些命名空间告诉我们正在从哪个图(或子图)进行流式传输。

调试

使用 debug 流模式在图执行过程中流式传输尽可能多的信息。流式输出包括节点的名称以及完整的状态。
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="debug"
):
    print(chunk.data)

LLM 令牌

使用 messages-tuple 流模式从图的任何部分(包括节点、工具、子图或任务)逐个令牌地流式传输大型语言模型 (LLM) 的输出。 messages-tuple 模式的流式输出是一个元组 (message_chunk, metadata),其中:
  • message_chunk:来自 LLM 的令牌或消息片段。
  • metadata:一个包含图节点和 LLM 调用详细信息的字典。
from dataclasses import dataclass

from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START

@dataclass
class MyState:
    topic: str
    joke: str = ""

model = init_chat_model(model="gpt-5.4-mini")

def call_model(state: MyState):
    """调用 LLM 生成关于某个主题的笑话"""
    model_response = model.invoke( # (1)!
        [
            {"role": "user", "content": f"Generate a joke about {state.topic}"}
        ]
    )
    return {"joke": model_response.content}

graph = (
    StateGraph(MyState)
    .add_node(call_model)
    .add_edge(START, "call_model")
    .compile()
)
  1. 注意,即使 LLM 是使用 invoke 而不是 stream 运行的,也会发出消息事件。
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="messages-tuple",
):
    if chunk.event != "messages":
        continue

    message_chunk, metadata = chunk.data  # (1)!
    if message_chunk["content"]:
        print(message_chunk["content"], end="|", flush=True)
  1. “messages-tuple” 流模式返回一个元组 (message_chunk, metadata) 的迭代器,其中 message_chunk 是 LLM 流式传输的令牌,metadata 是一个包含有关调用 LLM 的图节点信息和其他信息的字典。

过滤 LLM 令牌

流式传输自定义数据

要发送自定义的用户定义数据
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"query": "example"},
    stream_mode="custom"
):
    print(chunk.data)

流式传输事件

要流式传输所有事件,包括图的状态:
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="events"
):
    print(chunk.data)

无状态运行

如果你不想将流式运行的输出持久化检查点数据库中,你可以在不创建线程的情况下创建一个无状态运行:
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)

async for chunk in client.runs.stream(
    None,  # (1)!
    assistant_id,
    input=inputs,
    stream_mode="updates"
):
    print(chunk.data)
  1. 我们传递 None 而不是 thread_id UUID。

加入并流式传输

LangSmith 允许你加入一个活动的后台运行并从中流式传输输出。为此,你可以使用 LangGraph SDK 的 client.runs.join_stream 方法:
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)

async for chunk in client.runs.join_stream(
    thread_id,
    run_id,  # (1)!
):
    print(chunk)
  1. 这是你想要加入的现有运行的 run_id
输出未缓冲 当你使用 .join_stream 时,输出不会被缓冲,因此加入之前产生的任何输出都不会被接收到。

流式传输线程

线程流式传输为线程打开一个长期连接,并流式传输在该线程上执行的每次运行的输出。这使你可以从单个连接监控线程上的所有活动,例如,在聊天 UI 中,可能通过后续消息、人机交互恢复或后台运行随时间触发多次运行。要通过 ID 加入特定的现有运行,请参阅加入并流式传输

比较线程流式传输和运行流式传输

线程流式传输运行流式传输
SDK 方法client.threads.join_stream()client.runs.stream()
REST 端点GET /threads/{thread_id}/streamPOST /threads/{thread_id}/runs/stream
范围线程上的所有运行单次运行
连接生命周期无限期打开运行完成时关闭
创建运行
用例监控正在进行的线程活动执行并流式传输单次交互

基本用法

from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)

thread = await client.threads.create()
thread_id = thread["thread_id"]

async for chunk in client.threads.join_stream(thread_id):
    print(chunk)

线程流模式

线程流式传输支持三种流模式,用于控制返回哪些事件。通过 stream_mode 参数传递一个或多个模式。
模式描述
run_modes(默认)流式传输所有运行事件,等同于 client.runs.stream() 的输出。
lifecycle仅流式传输运行开始和结束事件。用于轻量级监控运行状态,无需完整输出。
state_update仅流式传输状态更新事件,在每次运行完成后提供线程状态。
async for chunk in client.threads.join_stream(
    thread_id,
    stream_mode=["lifecycle", "state_update"],
):
    print(chunk.event, chunk.data)

从最后一个事件恢复

线程流式传输通过 Last-Event-ID 头支持可恢复性。如果连接断开,传递你接收到的最后一个事件的 ID 以恢复而不丢失事件。传递 "-" 以从头开始重放。
async for chunk in client.threads.join_stream(
    thread_id,
    last_event_id="<LAST_EVENT_ID>",
):
    print(chunk)

API 参考

有关 API 用法和实现,请参阅 API 参考