LangGraph SDK 允许你以多种模式从 LangSmith 部署 API 流式传输输出,从每一步后的完整状态快照到逐个令牌的 LLM 输出。线程流式传输还支持可恢复性:如果连接断开,可以使用最后一个事件 ID 重新连接,从断点处继续。
LangGraph SDK 和 Agent Server 是 LangSmith 的一部分。
基本用法
基本用法示例:
from langgraph_sdk import get_client
client = get_client ( url =< DEPLOYMENT_URL > , api_key =< API_KEY > )
# 使用以名称 "agent" 部署的图
assistant_id = "agent"
# 创建一个线程
thread = await client . threads . create ()
thread_id = thread [ " thread_id " ]
# 创建一个流式运行
async for chunk in client . runs . stream (
thread_id ,
assistant_id ,
input = inputs ,
stream_mode = "updates"
):
print ( chunk . data )
import { Client } from "@langchain/langgraph-sdk" ;
const client = new Client ( { apiUrl : < DEPLOYMENT_URL > , apiKey: < API_KEY > });
// 使用以名称 "agent" 部署的图
const assistantID = "agent";
// 创建一个线程
const thread = await client.threads.create();
const threadID = thread["thread_id"];
// 创建一个流式运行
const streamResponse = client.runs.stream(
threadID,
assistantID,
{
input ,
streamMode: "updates"
}
);
for await (const chunk of streamResponse) {
console . log (chunk . data);
}
创建一个线程: curl --request POST \
--url < DEPLOYMENT_UR L > /threads \
--header 'Content-Type: application/json' \
--data '{}'
创建一个流式运行: curl --request POST \
--url < DEPLOYMENT_UR L > /threads/ < THREAD_I D > /runs/stream \
--header 'Content-Type: application/json' \
--header 'x-api-key: <API_KEY>'
--data "{
\" assistant_id \" : \" agent \" ,
\" input \" : <inputs>,
\" stream_mode \" : \" updates \"
}"
这是一个你可以在 Agent Server 中运行的示例图。
更多详情请参阅 LangSmith 快速入门 。 # graph.py
from typing import TypedDict
from langgraph . graph import StateGraph , START , END
class State ( TypedDict ):
topic : str
joke : str
def refine_topic ( state : State ):
return { "topic" : state [ " topic " ] + " and cats" }
def generate_joke ( state : State ):
return { "joke" : f "This is a joke about { state [ ' topic ' ] } " }
graph = (
StateGraph ( State )
. add_node ( refine_topic )
. add_node ( generate_joke )
. add_edge ( START , "refine_topic" )
. add_edge ( "refine_topic" , "generate_joke" )
. add_edge ( "generate_joke" , END )
. compile ()
)
一旦你有了一个运行的 Agent Server,你可以使用
LangGraph SDK 与之交互。 from langgraph_sdk import get_client
client = get_client ( url =< DEPLOYMENT_URL > )
# 使用以名称 "agent" 部署的图
assistant_id = "agent"
# 创建一个线程
thread = await client . threads . create ()
thread_id = thread [ " thread_id " ]
# 创建一个流式运行
async for chunk in client . runs . stream ( # (1)!
thread_id ,
assistant_id ,
input = { "topic" : "ice cream" },
stream_mode = "updates" # (2)!
):
print ( chunk . data )
client.runs.stream() 方法返回一个迭代器,该迭代器产生流式输出。
2. 设置 stream_mode="updates" 以仅流式传输每个节点后图状态的更新。其他流模式也可用。详情请参阅支持的流模式 。
import { Client } from "@langchain/langgraph-sdk" ;
const client = new Client ( { apiUrl : < DEPLOYMENT_URL > });
// 使用以名称 "agent" 部署的图
const assistantID = "agent";
// 创建一个线程
const thread = await client.threads.create();
const threadID = thread["thread_id"];
// 创建一个流式运行
const streamResponse = client.runs.stream( // (1)!
threadID,
assistantID,
{
input: { topic : "ice cream" },
streamMode: "updates" // (2)!
}
);
for await (const chunk of streamResponse) {
console . log (chunk . data);
}
client.runs.stream() 方法返回一个迭代器,该迭代器产生流式输出。
设置 streamMode: "updates" 以仅流式传输每个节点后图状态的更新。其他流模式也可用。详情请参阅支持的流模式 。
创建一个线程: curl --request POST \
--url < DEPLOYMENT_UR L > /threads \
--header 'Content-Type: application/json' \
--data '{}'
创建一个流式运行: curl --request POST \
--url < DEPLOYMENT_UR L > /threads/ < THREAD_I D > /runs/stream \
--header 'Content-Type: application/json' \
--data "{
\" assistant_id \" : \" agent \" ,
\" input \" : { \" topic \" : \" ice cream \" },
\" stream_mode \" : \" updates \"
}"
{ 'run_id' : '1f02c2b3-3cef-68de-b720-eec2a4a8e920' , 'attempt' : 1 }
{ 'refine_topic' : { 'topic' : 'ice cream and cats' }}
{ 'generate_joke' : { 'joke' : 'This is a joke about ice cream and cats' }}
支持的流模式
模式 描述 LangGraph 库方法 values在每个超级步骤 后流式传输完整的图状态。 .stream() / .astream() 配合 stream_mode="values"updates流式传输图每一步后状态的更新。如果在同一步中进行了多次更新(例如,运行了多个节点),这些更新将分别流式传输。 .stream() / .astream() 配合 stream_mode="updates"messages-tuple流式传输调用 LLM 的图节点的 LLM 令牌和元数据(对聊天应用很有用)。 .stream() / .astream() 配合 stream_mode="messages"debug在图执行过程中流式传输尽可能多的信息。 .stream() / .astream() 配合 stream_mode="debug"custom从你的图内部流式传输自定义数据 .stream() / .astream() 配合 stream_mode="custom"events流式传输所有事件(包括图的状态);在迁移大型 LCEL 应用时主要使用。 .astream_events()
流式传输多种模式
你可以将一个列表作为 stream_mode 参数传递,以同时流式传输多种模式。
流式输出将是 (mode, chunk) 的元组,其中 mode 是流模式的名称,chunk 是该模式流式传输的数据。
async for chunk in client . runs . stream (
thread_id ,
assistant_id ,
input = inputs ,
stream_mode = [ "updates" , "custom" ]
):
print ( chunk )
const streamResponse = client . runs . stream (
threadID ,
assistantID ,
{
input ,
streamMode : [ "updates" , "custom" ]
}
) ;
for await ( const chunk of streamResponse) {
console . log (chunk) ;
}
curl --request POST \
--url < DEPLOYMENT_UR L > /threads/ < THREAD_I D > /runs/stream \
--header 'Content-Type: application/json' \
--data "{
\" assistant_id \" : \" agent \" ,
\" input \" : <inputs>,
\" stream_mode \" : [
\" updates \"
\" custom \"
]
}"
流式传输图状态
使用流模式 updates 和 values 在图执行时流式传输其状态。
updates 流式传输图每一步后状态的更新 。
values 流式传输图每一步后状态的完整值 。
from typing import TypedDict
from langgraph . graph import StateGraph , START , END
class State ( TypedDict ):
topic : str
joke : str
def refine_topic ( state : State ):
return { "topic" : state [ " topic " ] + " and cats" }
def generate_joke ( state : State ):
return { "joke" : f "This is a joke about { state [ ' topic ' ] } " }
graph = (
StateGraph ( State )
. add_node ( refine_topic )
. add_node ( generate_joke )
. add_edge ( START , "refine_topic" )
. add_edge ( "refine_topic" , "generate_joke" )
. add_edge ( "generate_joke" , END )
. compile ()
)
有状态运行
下面的示例假设你希望将流式运行的输出持久化 到检查点 数据库中,并且已经创建了一个线程。要创建一个线程:from langgraph_sdk import get_client
client = get_client ( url =< DEPLOYMENT_URL > )
# 使用以名称 "agent" 部署的图
assistant_id = "agent"
# 创建一个线程
thread = await client . threads . create ()
thread_id = thread [ " thread_id " ]
import { Client } from "@langchain/langgraph-sdk" ;
const client = new Client ( { apiUrl : < DEPLOYMENT_URL > });
// 使用以名称 "agent" 部署的图
const assistantID = "agent";
// 创建一个线程
const thread = await client.threads.create();
const threadID = thread["thread_id"]
curl --request POST \
--url < DEPLOYMENT_UR L > /threads \
--header 'Content-Type: application/json' \
--data '{}'
如果你不需要持久化运行的输出,可以在流式传输时传递 None 而不是 thread_id。
流模式:updates
使用此模式仅流式传输节点在每一步后返回的状态更新 。流式输出包括节点的名称以及更新内容。
async for chunk in client . runs . stream (
thread_id ,
assistant_id ,
input = { "topic" : "ice cream" },
stream_mode = "updates"
):
print ( chunk . data )
const streamResponse = client . runs . stream (
threadID ,
assistantID ,
{
input : { topic : "ice cream" },
streamMode : "updates"
}
) ;
for await ( const chunk of streamResponse) {
console . log (chunk . data) ;
}
curl --request POST \
--url < DEPLOYMENT_UR L > /threads/ < THREAD_I D > /runs/stream \
--header 'Content-Type: application/json' \
--data "{
\" assistant_id \" : \" agent \" ,
\" input \" : { \" topic \" : \" ice cream \" },
\" stream_mode \" : \" updates \"
}"
流模式:values
使用此模式在每一步后流式传输图的完整状态 。
async for chunk in client . runs . stream (
thread_id ,
assistant_id ,
input = { "topic" : "ice cream" },
stream_mode = "values"
):
print ( chunk . data )
const streamResponse = client . runs . stream (
threadID ,
assistantID ,
{
input : { topic : "ice cream" },
streamMode : "values"
}
) ;
for await ( const chunk of streamResponse) {
console . log (chunk . data) ;
}
curl --request POST \
--url < DEPLOYMENT_UR L > /threads/ < THREAD_I D > /runs/stream \
--header 'Content-Type: application/json' \
--data "{
\" assistant_id \" : \" agent \" ,
\" input \" : { \" topic \" : \" ice cream \" },
\" stream_mode \" : \" values \"
}"
要将子图 的输出包含在流式输出中,你可以在父图的 .stream() 方法中设置 subgraphs=True。这将同时流式传输父图和任何子图的输出。
async for chunk in client . runs . stream (
thread_id ,
assistant_id ,
input = { "foo" : "foo" },
stream_subgraphs = True , # (1)!
stream_mode = "updates" ,
):
print ( chunk )
设置 stream_subgraphs=True 以流式传输子图的输出。
使用 debug 流模式在图执行过程中流式传输尽可能多的信息。流式输出包括节点的名称以及完整的状态。
async for chunk in client . runs . stream (
thread_id ,
assistant_id ,
input = { "topic" : "ice cream" },
stream_mode = "debug"
):
print ( chunk . data )
const streamResponse = client . runs . stream (
threadID ,
assistantID ,
{
input : { topic : "ice cream" },
streamMode : "debug"
}
) ;
for await ( const chunk of streamResponse) {
console . log (chunk . data) ;
}
curl --request POST \
--url < DEPLOYMENT_UR L > /threads/ < THREAD_I D > /runs/stream \
--header 'Content-Type: application/json' \
--data "{
\" assistant_id \" : \" agent \" ,
\" input \" : { \" topic \" : \" ice cream \" },
\" stream_mode \" : \" debug \"
}"
LLM 令牌
使用 messages-tuple 流模式从图的任何部分(包括节点、工具、子图或任务)逐个令牌 地流式传输大型语言模型 (LLM) 的输出。
messages-tuple 模式 的流式输出是一个元组 (message_chunk, metadata),其中:
message_chunk:来自 LLM 的令牌或消息片段。
metadata:一个包含图节点和 LLM 调用详细信息的字典。
from dataclasses import dataclass
from langchain . chat_models import init_chat_model
from langgraph . graph import StateGraph , START
@dataclass
class MyState :
topic : str
joke : str = ""
model = init_chat_model ( model = "gpt-5.4-mini" )
def call_model ( state : MyState ):
"""调用 LLM 生成关于某个主题的笑话"""
model_response = model . invoke ( # (1)!
[
{ "role" : "user" , "content" : f "Generate a joke about { state . topic } " }
]
)
return { "joke" : model_response . content }
graph = (
StateGraph ( MyState )
. add_node ( call_model )
. add_edge ( START , "call_model" )
. compile ()
)
注意,即使 LLM 是使用 invoke 而不是 stream 运行的,也会发出消息事件。
async for chunk in client . runs . stream (
thread_id ,
assistant_id ,
input = { "topic" : "ice cream" },
stream_mode = "messages-tuple" ,
):
if chunk . event != "messages" :
continue
message_chunk , metadata = chunk . data # (1)!
if message_chunk [ " content " ]:
print ( message_chunk [ " content " ], end = "|" , flush = True )
“messages-tuple” 流模式返回一个元组 (message_chunk, metadata) 的迭代器,其中 message_chunk 是 LLM 流式传输的令牌,metadata 是一个包含有关调用 LLM 的图节点信息和其他信息的字典。
const streamResponse = client . runs . stream (
threadID ,
assistantID ,
{
input : { topic : "ice cream" },
streamMode : "messages-tuple"
}
) ;
for await ( const chunk of streamResponse) {
if (chunk . event !== "messages" ) {
continue ;
}
console . log (chunk . data[ 0 ][ "content" ]) ; // (1)!
}
“messages-tuple” 流模式返回一个元组 (message_chunk, metadata) 的迭代器,其中 message_chunk 是 LLM 流式传输的令牌,metadata 是一个包含有关调用 LLM 的图节点信息和其他信息的字典。
curl --request POST \
--url < DEPLOYMENT_UR L > /threads/ < THREAD_I D > /runs/stream \
--header 'Content-Type: application/json' \
--data "{
\" assistant_id \" : \" agent \" ,
\" input \" : { \" topic \" : \" ice cream \" },
\" stream_mode \" : \" messages-tuple \"
}"
过滤 LLM 令牌
流式传输自定义数据
要发送自定义的用户定义数据 :
async for chunk in client . runs . stream (
thread_id ,
assistant_id ,
input = { "query" : "example" },
stream_mode = "custom"
):
print ( chunk . data )
const streamResponse = client . runs . stream (
threadID ,
assistantID ,
{
input : { query : "example" },
streamMode : "custom"
}
) ;
for await ( const chunk of streamResponse) {
console . log (chunk . data) ;
}
curl --request POST \
--url < DEPLOYMENT_UR L > /threads/ < THREAD_I D > /runs/stream \
--header 'Content-Type: application/json' \
--data "{
\" assistant_id \" : \" agent \" ,
\" input \" : { \" query \" : \" example \" },
\" stream_mode \" : \" custom \"
}"
流式传输事件
要流式传输所有事件,包括图的状态:
async for chunk in client . runs . stream (
thread_id ,
assistant_id ,
input = { "topic" : "ice cream" },
stream_mode = "events"
):
print ( chunk . data )
const streamResponse = client . runs . stream (
threadID ,
assistantID ,
{
input : { topic : "ice cream" },
streamMode : "events"
}
) ;
for await ( const chunk of streamResponse) {
console . log (chunk . data) ;
}
curl --request POST \
--url < DEPLOYMENT_UR L > /threads/ < THREAD_I D > /runs/stream \
--header 'Content-Type: application/json' \
--data "{
\" assistant_id \" : \" agent \" ,
\" input \" : { \" topic \" : \" ice cream \" },
\" stream_mode \" : \" events \"
}"
无状态运行
如果你不想将流式运行的输出持久化 到检查点 数据库中,你可以在不创建线程的情况下创建一个无状态运行:
from langgraph_sdk import get_client
client = get_client ( url =< DEPLOYMENT_URL > , api_key =< API_KEY > )
async for chunk in client . runs . stream (
None , # (1)!
assistant_id ,
input = inputs ,
stream_mode = "updates"
):
print ( chunk . data )
我们传递 None 而不是 thread_id UUID。
import { Client } from "@langchain/langgraph-sdk" ;
const client = new Client ( { apiUrl : < DEPLOYMENT_URL > , apiKey: < API_KEY > });
# 创建一个流式运行
const streamResponse = client.runs.stream(
null, # (1)!
assistantID,
{
input ,
streamMode: "updates"
}
);
for await (const chunk of streamResponse) {
console . log (chunk . data);
}
我们传递 None 而不是 thread_id UUID。
curl --request POST \
--url < DEPLOYMENT_UR L > /runs/stream \
--header 'Content-Type: application/json' \
--header 'x-api-key: <API_KEY>'
--data "{
\" assistant_id \" : \" agent \" ,
\" input \" : <inputs>,
\" stream_mode \" : \" updates \"
}"
加入并流式传输
LangSmith 允许你加入一个活动的后台运行 并从中流式传输输出。为此,你可以使用 LangGraph SDK 的 client.runs.join_stream 方法:
from langgraph_sdk import get_client
client = get_client ( url =< DEPLOYMENT_URL > , api_key =< API_KEY > )
async for chunk in client . runs . join_stream (
thread_id ,
run_id , # (1)!
):
print ( chunk )
这是你想要加入的现有运行的 run_id。
import { Client } from "@langchain/langgraph-sdk" ;
const client = new Client ( { apiUrl : < DEPLOYMENT_URL > , apiKey: < API_KEY > });
const streamResponse = client.runs.joinStream(
threadID,
runId // (1)!
);
for await (const chunk of streamResponse) {
console . log (chunk);
}
这是你想要加入的现有运行的 run_id。
curl --request GET \
--url < DEPLOYMENT_UR L > /threads/ < THREAD_I D > /runs/ < RUN_I D > /stream \
--header 'Content-Type: application/json' \
--header 'x-api-key: <API_KEY>'
输出未缓冲
当你使用 .join_stream 时,输出不会被缓冲,因此加入之前产生的任何输出都不会被接收到。
流式传输线程
线程流式传输为线程打开一个长期连接,并流式传输在该线程上执行的每次运行 的输出。这使你可以从单个连接监控线程上的所有活动,例如,在聊天 UI 中,可能通过后续消息、人机交互 恢复或后台运行 随时间触发多次运行。要通过 ID 加入特定的现有运行,请参阅加入并流式传输 。
比较线程流式传输和运行流式传输
线程流式传输 运行流式传输 SDK 方法 client.threads.join_stream()client.runs.stream()REST 端点 GET /threads/{thread_id}/streamPOST /threads/{thread_id}/runs/stream范围 线程上的所有运行 单次运行 连接生命周期 无限期打开 运行完成时关闭 创建运行 否 是 用例 监控正在进行的线程活动 执行并流式传输单次交互
基本用法
from langgraph_sdk import get_client
client = get_client ( url =< DEPLOYMENT_URL > , api_key =< API_KEY > )
thread = await client . threads . create ()
thread_id = thread [ " thread_id " ]
async for chunk in client . threads . join_stream ( thread_id ):
print ( chunk )
import { Client } from "@langchain/langgraph-sdk" ;
const client = new Client ( { apiUrl : < DEPLOYMENT_URL > , apiKey: < API_KEY > });
const thread = await client.threads.create();
const threadID = thread["thread_id"];
for await (const chunk of client.threads.joinStream(threadID)) {
console . log (chunk);
}
curl --request GET \
--url < DEPLOYMENT_UR L > /threads/ < THREAD_I D > /stream \
--header 'x-api-key: <API_KEY>'
线程流模式
线程流式传输支持三种流模式,用于控制返回哪些事件。通过 stream_mode 参数传递一个或多个模式。
模式 描述 run_modes(默认)流式传输所有运行事件,等同于 client.runs.stream() 的输出。 lifecycle仅流式传输运行开始和结束事件。用于轻量级监控运行状态,无需完整输出。 state_update仅流式传输状态更新事件,在每次运行完成后提供线程状态。
async for chunk in client . threads . join_stream (
thread_id ,
stream_mode = [ "lifecycle" , "state_update" ],
):
print ( chunk . event , chunk . data )
for await ( const chunk of client . threads . joinStream (threadID , {
streamMode : [ "lifecycle" , "state_update" ] ,
} )) {
console . log (chunk . event , chunk . data) ;
}
curl --request GET \
--url '<DEPLOYMENT_URL>/threads/<THREAD_ID>/stream?stream_modes=lifecycle&stream_modes=state_update' \
--header 'x-api-key: <API_KEY>'
从最后一个事件恢复
线程流式传输通过 Last-Event-ID 头支持可恢复性。如果连接断开,传递你接收到的最后一个事件的 ID 以恢复而不丢失事件。传递 "-" 以从头开始重放。
async for chunk in client . threads . join_stream (
thread_id ,
last_event_id = "<LAST_EVENT_ID>" ,
):
print ( chunk )
for await ( const chunk of client . threads . joinStream (threadID , {
lastEventId : "<LAST_EVENT_ID>" ,
} )) {
console . log (chunk) ;
}
curl --request GET \
--url < DEPLOYMENT_UR L > /threads/ < THREAD_I D > /stream \
--header 'x-api-key: <API_KEY>' \
--header 'Last-Event-ID: <LAST_EVENT_ID>'
API 参考
有关 API 用法和实现,请参阅 API 参考 。
将这些文档连接 到 Claude、VSCode 等,通过 MCP 获取实时答案。