-
流式输出图状态 — 使用
updates和values模式获取状态更新/值。 - 流式输出子图结果 — 包含父图和任意嵌套子图的输出。
- 流式输出 LLM token — 从任意位置捕获 token 流:节点内部、子图或工具中。
- 流式输出自定义数据 — 直接从工具函数发送自定义更新或进度信号。
-
使用多种流式模式 — 可选择
values(完整状态)、updates(状态增量)、messages(LLM token + 元数据)、custom(任意用户数据)或debug(详细追踪)。
支持的流式模式
将以下一种或多种流式模式以列表形式传入stream 或 astream 方法:
| 模式 | 描述 |
|---|---|
values | 在图的每一步执行后,流式输出状态的完整值。 |
updates | 在图的每一步执行后,流式输出状态的更新内容。如果在同一步骤中进行了多次更新(例如运行了多个节点),这些更新将分别进行流式输出。 |
custom | 从图节点内部流式输出自定义数据。 |
messages | 从调用了 LLM 的任意图节点中流式输出二元组(LLM token,元数据)。 |
debug | 在图的整个执行过程中尽可能多地流式输出信息。 |
基本使用示例
LangGraph 图提供了stream(同步)和 astream(异步)方法,以迭代器形式产生流式输出。
Copy
for chunk in graph.stream(inputs, stream_mode="updates"):
print(chunk)
扩展示例:流式输出更新
扩展示例:流式输出更新
Copy
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
topic: str
joke: str
def refine_topic(state: State):
return {"topic": state["topic"] + " and cats"}
def generate_joke(state: State):
return {"joke": f"This is a joke about {state['topic']}"}
graph = (
StateGraph(State)
.add_node(refine_topic)
.add_node(generate_joke)
.add_edge(START, "refine_topic")
.add_edge("refine_topic", "generate_joke")
.add_edge("generate_joke", END)
.compile()
)
# The stream() method returns an iterator that yields streamed outputs
for chunk in graph.stream(
{"topic": "ice cream"},
# Set stream_mode="updates" to stream only the updates to the graph state after each node
# Other stream modes are also available. See supported stream modes for details
stream_mode="updates",
):
print(chunk)
Copy
{'refineTopic': {'topic': 'ice cream and cats'}}
{'generateJoke': {'joke': 'This is a joke about ice cream and cats'}}
使用多种流式模式
你可以将列表作为stream_mode 参数传入,以同时启用多种流式模式。
流式输出将以 (mode, chunk) 元组形式返回,其中 mode 为流式模式名称,chunk 为该模式所输出的数据。
Copy
for mode, chunk in graph.stream(inputs, stream_mode=["updates", "custom"]):
print(chunk)
流式输出图状态
使用updates 和 values 流式模式,可在图执行过程中流式输出图的状态。
updates流式输出每一步执行后状态的更新内容。values流式输出每一步执行后状态的完整值。
Copy
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
topic: str
joke: str
def refine_topic(state: State):
return {"topic": state["topic"] + " and cats"}
def generate_joke(state: State):
return {"joke": f"This is a joke about {state['topic']}"}
graph = (
StateGraph(State)
.add_node(refine_topic)
.add_node(generate_joke)
.add_edge(START, "refine_topic")
.add_edge("refine_topic", "generate_joke")
.add_edge("generate_joke", END)
.compile()
)
- updates
- values
使用此模式可仅流式输出每步执行后节点返回的状态更新内容。流式输出包含节点名称及更新内容。
Copy
for chunk in graph.stream(
{"topic": "ice cream"},
stream_mode="updates",
):
print(chunk)
使用此模式可流式输出每步执行后图的完整状态。
Copy
for chunk in graph.stream(
{"topic": "ice cream"},
stream_mode="values",
):
print(chunk)
流式输出子图结果
要在流式输出中包含来自子图的输出,可在父图的.stream() 方法中设置 subgraphs=True。这将同时流式输出父图和任意子图的结果。
输出将以元组 (namespace, data) 形式流式返回,其中 namespace 是一个元组,包含调用子图的节点路径,例如 ("parent_node:<task_id>", "child_node:<task_id>")。
Copy
for chunk in graph.stream(
{"foo": "foo"},
# Set subgraphs=True to stream outputs from subgraphs
subgraphs=True,
stream_mode="updates",
):
print(chunk)
扩展示例:从子图流式输出
扩展示例:从子图流式输出
Copy
from langgraph.graph import START, StateGraph
from typing import TypedDict
# Define subgraph
class SubgraphState(TypedDict):
foo: str # note that this key is shared with the parent graph state
bar: str
def subgraph_node_1(state: SubgraphState):
return {"bar": "bar"}
def subgraph_node_2(state: SubgraphState):
return {"foo": state["foo"] + state["bar"]}
subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_node(subgraph_node_2)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
subgraph = subgraph_builder.compile()
# Define parent graph
class ParentState(TypedDict):
foo: str
def node_1(state: ParentState):
return {"foo": "hi! " + state["foo"]}
builder = StateGraph(ParentState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", subgraph)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
graph = builder.compile()
for chunk in graph.stream(
{"foo": "foo"},
stream_mode="updates",
# Set subgraphs=True to stream outputs from subgraphs
subgraphs=True,
):
print(chunk)
Copy
((), {'node_1': {'foo': 'hi! foo'}})
(('node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2',), {'subgraph_node_1': {'bar': 'bar'}})
(('node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2',), {'subgraph_node_2': {'foo': 'hi! foobar'}})
((), {'node_2': {'foo': 'hi! foobar'}})
调试
使用debug 流式模式,可在图的整个执行过程中尽可能多地流式输出信息。流式输出包含节点名称及完整状态。
Copy
for chunk in graph.stream(
{"topic": "ice cream"},
stream_mode="debug",
):
print(chunk)
LLM token
使用messages 流式模式,可从图的任意位置(包括节点、工具、子图或任务)逐 token 流式输出大语言模型(LLM)的输出内容。
messages 模式的流式输出是一个元组 (message_chunk, metadata),其中:
message_chunk:来自 LLM 的 token 或消息片段。metadata:包含图节点及 LLM 调用详情的字典。
如果你的 LLM 没有 LangChain 集成可用,可改用 custom 模式流式输出其结果。详见与任意 LLM 配合使用。
Python < 3.11 中异步使用需手动传入 config
在 Python < 3.11 的异步代码中,必须显式将
RunnableConfig 传递给 ainvoke(),以确保正确的流式输出。详见 Python < 3.11 的异步用法,或升级至 Python 3.11+。Copy
from dataclasses import dataclass
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START
@dataclass
class MyState:
topic: str
joke: str = ""
model = init_chat_model(model="gpt-4.1-mini")
def call_model(state: MyState):
"""Call the LLM to generate a joke about a topic"""
# Note that message events are emitted even when the LLM is run using .invoke rather than .stream
model_response = model.invoke(
[
{"role": "user", "content": f"Generate a joke about {state.topic}"}
]
)
return {"joke": model_response.content}
graph = (
StateGraph(MyState)
.add_node(call_model)
.add_edge(START, "call_model")
.compile()
)
# The "messages" stream mode returns an iterator of tuples (message_chunk, metadata)
# where message_chunk is the token streamed by the LLM and metadata is a dictionary
# with information about the graph node where the LLM was called and other information
for message_chunk, metadata in graph.stream(
{"topic": "ice cream"},
stream_mode="messages",
):
if message_chunk.content:
print(message_chunk.content, end="|", flush=True)
按 LLM 调用过滤
你可以为 LLM 调用关联tags,以按 LLM 调用过滤流式 token。
Copy
from langchain.chat_models import init_chat_model
# model_1 is tagged with "joke"
model_1 = init_chat_model(model="gpt-4.1-mini", tags=['joke'])
# model_2 is tagged with "poem"
model_2 = init_chat_model(model="gpt-4.1-mini", tags=['poem'])
graph = ... # define a graph that uses these LLMs
# The stream_mode is set to "messages" to stream LLM tokens
# The metadata contains information about the LLM invocation, including the tags
async for msg, metadata in graph.astream(
{"topic": "cats"},
stream_mode="messages",
):
# Filter the streamed tokens by the tags field in the metadata to only include
# the tokens from the LLM invocation with the "joke" tag
if metadata["tags"] == ["joke"]:
print(msg.content, end="|", flush=True)
扩展示例:按 tag 过滤
扩展示例:按 tag 过滤
Copy
from typing import TypedDict
from langchain.chat_models import init_chat_model
from langgraph.graph import START, StateGraph
# The joke_model is tagged with "joke"
joke_model = init_chat_model(model="gpt-4.1-mini", tags=["joke"])
# The poem_model is tagged with "poem"
poem_model = init_chat_model(model="gpt-4.1-mini", tags=["poem"])
class State(TypedDict):
topic: str
joke: str
poem: str
async def call_model(state, config):
topic = state["topic"]
print("Writing joke...")
# Note: Passing the config through explicitly is required for python < 3.11
# Since context var support wasn't added before then: https://docs.python.org/3/library/asyncio-task.html#creating-tasks
# The config is passed through explicitly to ensure the context vars are propagated correctly
# This is required for Python < 3.11 when using async code. Please see the async section for more details
joke_response = await joke_model.ainvoke(
[{"role": "user", "content": f"Write a joke about {topic}"}],
config,
)
print("\n\nWriting poem...")
poem_response = await poem_model.ainvoke(
[{"role": "user", "content": f"Write a short poem about {topic}"}],
config,
)
return {"joke": joke_response.content, "poem": poem_response.content}
graph = (
StateGraph(State)
.add_node(call_model)
.add_edge(START, "call_model")
.compile()
)
# The stream_mode is set to "messages" to stream LLM tokens
# The metadata contains information about the LLM invocation, including the tags
async for msg, metadata in graph.astream(
{"topic": "cats"},
stream_mode="messages",
):
if metadata["tags"] == ["joke"]:
print(msg.content, end="|", flush=True)
按节点过滤
若只需从特定节点流式输出 token,可使用stream_mode="messages" 并按流式元数据中的 langgraph_node 字段过滤输出:
Copy
# The "messages" stream mode returns a tuple of (message_chunk, metadata)
# where message_chunk is the token streamed by the LLM and metadata is a dictionary
# with information about the graph node where the LLM was called and other information
for msg, metadata in graph.stream(
inputs,
stream_mode="messages",
):
# Filter the streamed tokens by the langgraph_node field in the metadata
# to only include the tokens from the specified node
if msg.content and metadata["langgraph_node"] == "some_node_name":
...
扩展示例:从特定节点流式输出 LLM token
扩展示例:从特定节点流式输出 LLM token
Copy
from typing import TypedDict
from langgraph.graph import START, StateGraph
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4.1-mini")
class State(TypedDict):
topic: str
joke: str
poem: str
def write_joke(state: State):
topic = state["topic"]
joke_response = model.invoke(
[{"role": "user", "content": f"Write a joke about {topic}"}]
)
return {"joke": joke_response.content}
def write_poem(state: State):
topic = state["topic"]
poem_response = model.invoke(
[{"role": "user", "content": f"Write a short poem about {topic}"}]
)
return {"poem": poem_response.content}
graph = (
StateGraph(State)
.add_node(write_joke)
.add_node(write_poem)
# write both the joke and the poem concurrently
.add_edge(START, "write_joke")
.add_edge(START, "write_poem")
.compile()
)
# The "messages" stream mode returns a tuple of (message_chunk, metadata)
# where message_chunk is the token streamed by the LLM and metadata is a dictionary
# with information about the graph node where the LLM was called and other information
for msg, metadata in graph.stream(
{"topic": "cats"},
stream_mode="messages",
):
# Filter the streamed tokens by the langgraph_node field in the metadata
# to only include the tokens from the write_poem node
if msg.content and metadata["langgraph_node"] == "write_poem":
print(msg.content, end="|", flush=True)
流式输出自定义数据
要从 LangGraph 节点或工具内部发送用户自定义数据,请按以下步骤操作:- 使用
get_stream_writer获取流写入器并发出自定义数据。 - 调用
.stream()或.astream()时设置stream_mode="custom"以获取流中的自定义数据。可以组合多种模式(例如["updates", "custom"]),但其中至少有一个必须为"custom"。
Python < 3.11 的异步代码中不支持
get_stream_writer
在 Python < 3.11 的异步代码中,get_stream_writer 将无法正常工作。
请改为在节点或工具中添加 writer 参数并手动传入。
详见 Python < 3.11 的异步用法。- node
- tool
Copy
from typing import TypedDict
from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, START
class State(TypedDict):
query: str
answer: str
def node(state: State):
# Get the stream writer to send custom data
writer = get_stream_writer()
# Emit a custom key-value pair (e.g., progress update)
writer({"custom_key": "Generating custom data inside node"})
return {"answer": "some data"}
graph = (
StateGraph(State)
.add_node(node)
.add_edge(START, "node")
.compile()
)
inputs = {"query": "example"}
# Set stream_mode="custom" to receive the custom data in the stream
for chunk in graph.stream(inputs, stream_mode="custom"):
print(chunk)
Copy
from langchain.tools import tool
from langgraph.config import get_stream_writer
@tool
def query_database(query: str) -> str:
"""Query the database."""
# Access the stream writer to send custom data
writer = get_stream_writer()
# Emit a custom key-value pair (e.g., progress update)
writer({"data": "Retrieved 0/100 records", "type": "progress"})
# perform query
# Emit another custom key-value pair
writer({"data": "Retrieved 100/100 records", "type": "progress"})
return "some-answer"
graph = ... # define a graph that uses this tool
# Set stream_mode="custom" to receive the custom data in the stream
for chunk in graph.stream(inputs, stream_mode="custom"):
print(chunk)
与任意 LLM 配合使用
你可以使用stream_mode="custom" 从任意 LLM API 流式输出数据——即使该 API 未实现 LangChain 聊天模型接口。
这使你能够集成原始 LLM 客户端或提供自有流式接口的外部服务,让 LangGraph 在自定义场景下具备极高的灵活性。
Copy
from langgraph.config import get_stream_writer
def call_arbitrary_model(state):
"""Example node that calls an arbitrary model and streams the output"""
# Get the stream writer to send custom data
writer = get_stream_writer()
# Assume you have a streaming client that yields chunks
# Generate LLM tokens using your custom streaming client
for chunk in your_custom_streaming_client(state["topic"]):
# Use the writer to send custom data to the stream
writer({"custom_llm_chunk": chunk})
return {"result": "completed"}
graph = (
StateGraph(State)
.add_node(call_arbitrary_model)
# Add other nodes and edges as needed
.compile()
)
# Set stream_mode="custom" to receive the custom data in the stream
for chunk in graph.stream(
{"topic": "cats"},
stream_mode="custom",
):
# The chunk will contain the custom data streamed from the llm
print(chunk)
扩展示例:流式输出任意聊天模型
扩展示例:流式输出任意聊天模型
Copy
import operator
import json
from typing import TypedDict
from typing_extensions import Annotated
from langgraph.graph import StateGraph, START
from openai import AsyncOpenAI
openai_client = AsyncOpenAI()
model_name = "gpt-4.1-mini"
async def stream_tokens(model_name: str, messages: list[dict]):
response = await openai_client.chat.completions.create(
messages=messages, model=model_name, stream=True
)
role = None
async for chunk in response:
delta = chunk.choices[0].delta
if delta.role is not None:
role = delta.role
if delta.content:
yield {"role": role, "content": delta.content}
# this is our tool
async def get_items(place: str) -> str:
"""Use this tool to list items one might find in a place you're asked about."""
writer = get_stream_writer()
response = ""
async for msg_chunk in stream_tokens(
model_name,
[
{
"role": "user",
"content": (
"Can you tell me what kind of items "
f"i might find in the following place: '{place}'. "
"List at least 3 such items separating them by a comma. "
"And include a brief description of each item."
),
}
],
):
response += msg_chunk["content"]
writer(msg_chunk)
return response
class State(TypedDict):
messages: Annotated[list[dict], operator.add]
# this is the tool-calling graph node
async def call_tool(state: State):
ai_message = state["messages"][-1]
tool_call = ai_message["tool_calls"][-1]
function_name = tool_call["function"]["name"]
if function_name != "get_items":
raise ValueError(f"Tool {function_name} not supported")
function_arguments = tool_call["function"]["arguments"]
arguments = json.loads(function_arguments)
function_response = await get_items(**arguments)
tool_message = {
"tool_call_id": tool_call["id"],
"role": "tool",
"name": function_name,
"content": function_response,
}
return {"messages": [tool_message]}
graph = (
StateGraph(State)
.add_node(call_tool)
.add_edge(START, "call_tool")
.compile()
)
AIMessage 调用图:Copy
inputs = {
"messages": [
{
"content": None,
"role": "assistant",
"tool_calls": [
{
"id": "1",
"function": {
"arguments": '{"place":"bedroom"}',
"name": "get_items",
},
"type": "function",
}
],
}
]
}
async for chunk in graph.astream(
inputs,
stream_mode="custom",
):
print(chunk["content"], end="|", flush=True)
为特定聊天模型禁用流式输出
如果你的应用程序混合使用了支持流式输出和不支持流式输出的模型,可能需要为不支持流式输出的模型显式禁用该功能。 初始化模型时设置streaming=False。
- init_chat_model
- Chat model interface
Copy
from langchain.chat_models import init_chat_model
model = init_chat_model(
"claude-sonnet-4-6",
# Set streaming=False to disable streaming for the chat model
streaming=False
)
Copy
from langchain_openai import ChatOpenAI
# Set streaming=False to disable streaming for the chat model
model = ChatOpenAI(model="o1-preview", streaming=False)
并非所有聊天模型集成都支持
streaming 参数。如果你的模型不支持该参数,请改用 disable_streaming=True。该参数通过基类在所有聊天模型上均可使用。Python < 3.11 的异步用法
在 Python < 3.11 版本中,asyncio 任务不支持context 参数。
这限制了 LangGraph 自动传播上下文的能力,并在两个关键方面影响 LangGraph 的流式机制:
- 必须显式将
RunnableConfig传递给异步 LLM 调用(例如ainvoke()),因为回调不会自动传播。 - 不能在异步节点或工具中使用
get_stream_writer——必须直接传入writer参数。
扩展示例:带手动 config 的异步 LLM 调用
扩展示例:带手动 config 的异步 LLM 调用
Copy
from typing import TypedDict
from langgraph.graph import START, StateGraph
from langchain.chat_models import init_chat_model
model = init_chat_model(model="gpt-4.1-mini")
class State(TypedDict):
topic: str
joke: str
# Accept config as an argument in the async node function
async def call_model(state, config):
topic = state["topic"]
print("Generating joke...")
# Pass config to model.ainvoke() to ensure proper context propagation
joke_response = await model.ainvoke(
[{"role": "user", "content": f"Write a joke about {topic}"}],
config,
)
return {"joke": joke_response.content}
graph = (
StateGraph(State)
.add_node(call_model)
.add_edge(START, "call_model")
.compile()
)
# Set stream_mode="messages" to stream LLM tokens
async for chunk, metadata in graph.astream(
{"topic": "ice cream"},
stream_mode="messages",
):
if chunk.content:
print(chunk.content, end="|", flush=True)
扩展示例:使用流写入器的异步自定义流式输出
扩展示例:使用流写入器的异步自定义流式输出
Copy
from typing import TypedDict
from langgraph.types import StreamWriter
class State(TypedDict):
topic: str
joke: str
# Add writer as an argument in the function signature of the async node or tool
# LangGraph will automatically pass the stream writer to the function
async def generate_joke(state: State, writer: StreamWriter):
writer({"custom_key": "Streaming custom data while generating a joke"})
return {"joke": f"This is a joke about {state['topic']}"}
graph = (
StateGraph(State)
.add_node(generate_joke)
.add_edge(START, "generate_joke")
.compile()
)
# Set stream_mode="custom" to receive the custom data in the stream #
async for chunk in graph.astream(
{"topic": "ice cream"},
stream_mode="custom",
):
print(chunk)
连接这些文档 到 Claude、VSCode 等,通过 MCP 获取实时答案。

