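Overview
LangChain's streaming system lets you surface real-time feedback from agent runs in your application. With LangChain streaming you can:
- Stream agent progress: get a state update after each agent step.
- Stream LLM tokens: stream language model tokens as they are generated.
- Stream thinking/reasoning tokens: surface the model's reasoning as it is generated.
- Stream custom updates: emit user-defined signals (for example, "Fetched 10/100 records").
- Stream multiple modes: choose from updates (agent progress), messages (LLM tokens + metadata), or custom (arbitrary user data).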
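Supported stream modes
Pass one stream mode as a string, or several as a list, to the stream or astream methods:
| Mode | Description |
|---|---|
| updates | Streams state updates after each agent step. If multiple updates are made in the same step (for example, multiple nodes run), those updates are streamed separately. |
| messages | Streams (token, metadata) tuples from any graph node where an LLM is invoked. |
| custom | Streams custom data from inside your graph nodes using the stream writer. |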
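Agent progress
To stream agent progress, use the stream or astream methods with stream_mode="updates". This emits an event after every agent step.
For example, if you have an agent that calls a tool once, you should see the following updates:
- LLM node: an AIMessage containing the tool call request
- Tool node: a ToolMessage containing the execution result
- LLM node: the final AI response
Streaming agent progress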
from langchain.agents import create_agent
def get_weather(city: str) -> str:
    """Get weather for a given city."""
    return f"It's always sunny in {city}!"
agent = create_agent(
    model="gpt-5-nano",
    tools=[get_weather],
)
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
    stream_mode="updates",
    version="v2",
):
    if chunk["type"] == "updates":
        # Each key is the name of the node that produced the update.
        for step, data in chunk["data"].items():
            print(f"step: {step}")
            print(f"content: {data['messages'][-1].content_blocks}")
Output
step: model
content: [{'type': 'tool_call', 'name': 'get_weather', 'args': {'city': 'San Francisco'}, 'id': 'call_OW2NYNsNSKhRZpjW0wm2Aszd'}]
step: tools
content: [{'type': 'text', 'text': "It's always sunny in San Francisco!"}]
step: model
content: [{'type': 'text', 'text': 'It\'s always sunny in San Francisco!'}]
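LLM tokens
To stream tokens as the LLM produces them, use stream_mode="messages". The output below shows the agent streaming a tool call followed by the final response.
Streaming LLM tokens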
from langchain.agents import create_agent
def get_weather(city: str) -> str:
    """Get weather for a given city."""
    return f"It's always sunny in {city}!"
agent = create_agent(
    model="gpt-5-nano",
    tools=[get_weather],
)
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
    stream_mode="messages",
    version="v2",
):
    if chunk["type"] == "messages":
        # The payload is a (token, metadata) tuple.
        token, metadata = chunk["data"]
        print(f"node: {metadata['langgraph_node']}")
        print(f"content: {token.content_blocks}")
        print("\n")
Output
node: model
content: [{'type': 'tool_call_chunk', 'id': 'call_vbCyBcP8VuneUzyYlSBZZsVa', 'name': 'get_weather', 'args': '', 'index': 0}]
node: model
content: [{'type': 'tool_call_chunk', 'id': None, 'name': None, 'args': '{"', 'index': 0}]
node: model
content: [{'type': 'tool_call_chunk', 'id': None, 'name': None, 'args': 'city', 'index': 0}]
node: model
content: [{'type': 'tool_call_chunk', 'id': None, 'name': None, 'args': '":"', 'index': 0}]
node: model
content: [{'type': 'tool_call_chunk', 'id': None, 'name': None, 'args': 'San', 'index': 0}]
node: model
content: [{'type': 'tool_call_chunk', 'id': None, 'name': None, 'args': ' Francisco', 'index': 0}]
node: model
content: [{'type': 'tool_call_chunk', 'id': None, 'name': None, 'args': '"}', 'index': 0}]
node: model
content: []
node: tools
content: [{'type': 'text', 'text': "It's always sunny in San Francisco!"}]
node: model
content: []
node: model
content: [{'type': 'text', 'text': 'Here'}]
node: model
content: [{'type': 'text', 'text': "'s"}]
node: model
content: [{'type': 'text', 'text': ' what'}]
node: model
content: [{'type': 'text', 'text': ' I'}]
node: model
content: [{'type': 'text', 'text': ' got'}]
node: model
content: [{'type': 'text', 'text': ':'}]
node: model
content: [{'type': 'text', 'text': ' "'}]
node: model
content: [{'type': 'text', 'text': "It's"}]
node: model
content: [{'type': 'text', 'text': ' always'}]
node: model
content: [{'type': 'text', 'text': ' sunny'}]
node: model
content: [{'type': 'text', 'text': ' in'}]
node: model
content: [{'type': 'text', 'text': ' San'}]
node: model
content: [{'type': 'text', 'text': ' Francisco'}]
node: model
content: [{'type': 'text', 'text': '!"\n\n'}]
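The tool_call_chunk entries in the output above carry the tool arguments as string fragments that only form valid JSON once joined. The following is a minimal sketch of that reassembly in plain Python. The chunk dicts are copied by hand from the sample output as stand-in data, and join_tool_call_args is a hypothetical helper, not a LangChain API; when working with real AIMessageChunk objects, adding the chunks together should give you the merged tool calls without this manual step.

```python
import json

# Stand-in data: tool_call_chunk dicts copied from the sample output above.
chunks = [
    {"type": "tool_call_chunk", "id": "call_vbCyBcP8VuneUzyYlSBZZsVa", "name": "get_weather", "args": "", "index": 0},
    {"type": "tool_call_chunk", "id": None, "name": None, "args": '{"', "index": 0},
    {"type": "tool_call_chunk", "id": None, "name": None, "args": "city", "index": 0},
    {"type": "tool_call_chunk", "id": None, "name": None, "args": '":"', "index": 0},
    {"type": "tool_call_chunk", "id": None, "name": None, "args": "San", "index": 0},
    {"type": "tool_call_chunk", "id": None, "name": None, "args": " Francisco", "index": 0},
    {"type": "tool_call_chunk", "id": None, "name": None, "args": '"}', "index": 0},
]


def join_tool_call_args(chunks: list[dict]) -> dict[int, dict]:
    """Group chunks by index, concatenate their args fragments, then parse the JSON."""
    merged: dict[int, dict] = {}
    for chunk in chunks:
        call = merged.setdefault(chunk["index"], {"name": None, "id": None, "args": ""})
        # name and id are only set on the first chunk of each call.
        call["name"] = call["name"] or chunk["name"]
        call["id"] = call["id"] or chunk["id"]
        call["args"] += chunk["args"] or ""
    for call in merged.values():
        call["args"] = json.loads(call["args"])
    return merged


calls = join_tool_call_args(chunks)
print(calls[0]["name"], calls[0]["args"])
```

Grouping by index matters once the model requests several tools in one turn, because fragments for different calls are distinguished only by that field.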
Custom updates
To stream updates from a tool while it executes, use get_stream_writer.
Streaming custom updates
from langchain.agents import create_agent
from langgraph.config import get_stream_writer
def get_weather(city: str) -> str:
    """Get weather for a given city."""
    writer = get_stream_writer()
    # Stream any arbitrary data
    writer(f"Looking up data for city: {city}")
    writer(f"Acquired data for city: {city}")
    return f"It's always sunny in {city}!"
agent = create_agent(
    model="claude-sonnet-4-6",
    tools=[get_weather],
)
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
    stream_mode="custom",
    version="v2",
):
    if chunk["type"] == "custom":
        print(chunk["data"])
Output
Looking up data for city: San Francisco
Acquired data for city: San Francisco
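If you add get_stream_writer inside your tool, you will not be able to invoke the tool outside of a LangGraph execution context.
Stream multiple modes
You can specify multiple stream modes by passing them as a list: stream_mode=["updates", "custom"].
Each streamed chunk is a StreamPart dict with type, ns, and data keys. Use chunk["type"] to determine the stream mode and chunk["data"] to access the payload.
Streaming multiple modes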
from langchain.agents import create_agent
from langgraph.config import get_stream_writer
def get_weather(city: str) -> str:
    """Get weather for a given city."""
    writer = get_stream_writer()
    writer(f"Looking up data for city: {city}")
    writer(f"Acquired data for city: {city}")
    return f"It's always sunny in {city}!"
agent = create_agent(
    model="gpt-5-nano",
    tools=[get_weather],
)
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
    stream_mode=["updates", "custom"],
    version="v2",
):
    print(f"stream_mode: {chunk['type']}")
    print(f"content: {chunk['data']}")
    print("\n")
Output
stream_mode: updates
content: {'model': {'messages': [AIMessage(content='', response_metadata={'token_usage': {'completion_tokens': 280, 'prompt_tokens': 132, 'total_tokens': 412, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 256, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_provider': 'openai', 'model_name': 'gpt-5-nano-2025-08-07', 'system_fingerprint': None, 'id': 'chatcmpl-C9tlgBzGEbedGYxZ0rTCz5F7OXpL7', 'service_tier': 'default', 'finish_reason': 'tool_calls', 'logprobs': None}, id='lc_run--480c07cb-e405-4411-aa7f-0520fddeed66-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'San Francisco'}, 'id': 'call_KTNQIftMrl9vgNwEfAJMVu7r', 'type': 'tool_call'}], usage_metadata={'input_tokens': 132, 'output_tokens': 280, 'total_tokens': 412, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 256}})]}}
stream_mode: custom
content: Looking up data for city: San Francisco
stream_mode: custom
content: Acquired data for city: San Francisco
stream_mode: updates
content: {'tools': {'messages': [ToolMessage(content="It's always sunny in San Francisco!", name='get_weather', tool_call_id='call_KTNQIftMrl9vgNwEfAJMVu7r')]}}
stream_mode: updates
content: {'model': {'messages': [AIMessage(content='San Francisco weather: It\'s always sunny in San Francisco!\n\n', response_metadata={'token_usage': {'completion_tokens': 764, 'prompt_tokens': 168, 'total_tokens': 932, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 704, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_provider': 'openai', 'model_name': 'gpt-5-nano-2025-08-07', 'system_fingerprint': None, 'id': 'chatcmpl-C9tljDFVki1e1haCyikBptAuXuHYG', 'service_tier': 'default', 'finish_reason': 'stop', 'logprobs': None}, id='lc_run--acbc740a-18fe-4a14-8619-da92a0d0ee90-0', usage_metadata={'input_tokens': 168, 'output_tokens': 764, 'total_tokens': 932, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 704}})]}}
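Because every chunk has the same type/ns/data shape, a streaming loop can route chunks through a small handler table instead of a growing if/elif chain. The sketch below illustrates the idea under stated assumptions: the chunks are hand-written dict stand-ins rather than real agent output, the empty tuple used for ns is an assumed value, and the handler names are hypothetical.

```python
from typing import Any, Callable

seen: list[str] = []


def on_updates(data: dict[str, Any]) -> None:
    # In "updates" mode the payload maps node name -> state update.
    for node in data:
        seen.append(f"updates:{node}")


def on_custom(data: Any) -> None:
    # In "custom" mode the payload is whatever was passed to the stream writer.
    seen.append(f"custom:{data}")


HANDLERS: dict[str, Callable[[Any], None]] = {
    "updates": on_updates,
    "custom": on_custom,
}


def route(chunk: dict[str, Any]) -> None:
    """Send a chunk to the handler for its stream mode; ignore unknown modes."""
    handler = HANDLERS.get(chunk["type"])
    if handler is not None:
        handler(chunk["data"])


# Stand-in chunks; a real loop would iterate over agent.stream(..., version="v2").
fake_stream = [
    {"type": "updates", "ns": (), "data": {"model": {"messages": []}}},
    {"type": "custom", "ns": (), "data": "Fetched 10/100 records"},
    {"type": "updates", "ns": (), "data": {"tools": {"messages": []}}},
]
for chunk in fake_stream:
    route(chunk)

print(seen)
```

Adding a new stream mode then means registering one more handler, and the loop body stays unchanged.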
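Common patterns
The examples below show common use cases for streaming.
Streaming thinking/reasoning tokens
Some models perform internal reasoning before producing a final answer. You can stream these thinking/reasoning tokens as they are generated by filtering standard content blocks for those whose type is "reasoning".
To stream thinking tokens from an agent, use stream_mode="messages" and filter for reasoning content blocks: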
from langchain.agents import create_agent
from langchain.messages import AIMessageChunk
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables import Runnable
def get_weather(city: str) -> str:
    """Get weather for a given city."""
    return f"It's always sunny in {city}!"
model = ChatAnthropic(
    model_name="claude-sonnet-4-6",
    timeout=None,
    stop=None,
    thinking={"type": "enabled", "budget_tokens": 5000},
)
agent: Runnable = create_agent(
    model=model,
    tools=[get_weather],
)
for token, metadata in agent.stream(
    {"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
    stream_mode="messages",
):
    if not isinstance(token, AIMessageChunk):
        continue
    # Split the chunk's standard content blocks into reasoning and answer text.
    reasoning = [b for b in token.content_blocks if b["type"] == "reasoning"]
    text = [b for b in token.content_blocks if b["type"] == "text"]
    if reasoning:
        print(f"[thinking] {reasoning[0]['reasoning']}", end="")
    if text:
        print(text[0]["text"], end="")
Output
[thinking] The user is asking about the weather in San Francisco. I have a tool
[thinking] available to get this information. Let me call the get_weather tool
[thinking] with "San Francisco" as the city parameter.
The weather in San Francisco is: It's always sunny in San Francisco!
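The content_blocks property normalizes provider-specific formats (Anthropic thinking blocks, OpenAI reasoning summaries, and so on) into a standard "reasoning" content block type.
To stream reasoning tokens directly from a chat model (without an agent), see streaming with chat models.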
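When the reasoning and the answer need to land in separate places (for example, two UI regions), the filter above can feed two accumulators. The following sketch uses plain Python and assumes each block is a dict with a "type" key plus a "reasoning" or "text" field, as in the example above. The block lists are hand-written stand-ins for token.content_blocks, and split_blocks is a hypothetical helper.

```python
def split_blocks(block_lists: list[list[dict]]) -> tuple[str, str]:
    """Concatenate reasoning and text fragments from successive content-block lists."""
    reasoning_parts: list[str] = []
    text_parts: list[str] = []
    for blocks in block_lists:
        for block in blocks:
            if block["type"] == "reasoning":
                reasoning_parts.append(block.get("reasoning", ""))
            elif block["type"] == "text":
                text_parts.append(block.get("text", ""))
            # Other block types (for example tool_call_chunk) are skipped.
    return "".join(reasoning_parts), "".join(text_parts)


# Stand-in for the content_blocks of four successive streamed chunks.
streamed = [
    [{"type": "reasoning", "reasoning": "The user wants the weather. "}],
    [{"type": "reasoning", "reasoning": "I should call get_weather."}],
    [{"type": "tool_call_chunk", "name": "get_weather", "args": "", "id": "call_1", "index": 0}],
    [{"type": "text", "text": "It's always sunny in San Francisco!"}],
]
reasoning, text = split_blocks(streamed)
print(f"[thinking] {reasoning}")
print(text)
```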
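Streaming tool calls
You may want to stream both:
- Partial JSON as tool calls are generated
- The completed, parsed tool calls that are executed
stream_mode="messages" streams the incremental message chunks generated by every LLM call in the agent. To access the completed messages with parsed tool calls:
- If those messages are tracked in the state (as in the model node of create_agent), use stream_mode=["messages", "updates"] to access completed messages through state updates (demonstrated below).
- If those messages are not tracked in the state, use custom updates or aggregate the chunks in the streaming loop (next section).
If your agent includes multiple LLMs, see the section below on streaming from sub-agents.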
from langchain.agents import create_agent
from langchain.messages import AIMessage, AIMessageChunk, AnyMessage, ToolMessage
def get_weather(city: str) -> str:
    """Get weather for a given city."""
    return f"It's always sunny in {city}!"
agent = create_agent("openai:gpt-5.4", tools=[get_weather])
def _render_message_chunk(token: AIMessageChunk) -> None:
    if token.text:
        print(token.text, end="|")
    if token.tool_call_chunks:
        print(token.tool_call_chunks)
    # N.B. all content is available through token.content_blocks
def _render_completed_message(message: AnyMessage) -> None:
    if isinstance(message, AIMessage) and message.tool_calls:
        print(f"Tool calls: {message.tool_calls}")
    if isinstance(message, ToolMessage):
        print(f"Tool response: {message.content_blocks}")
input_message = {"role": "user", "content": "What is the weather in Boston?"}
for chunk in agent.stream(
    {"messages": [input_message]},
    stream_mode=["messages", "updates"],
    version="v2",
):
    if chunk["type"] == "messages":
        token, metadata = chunk["data"]
        if isinstance(token, AIMessageChunk):
            _render_message_chunk(token)
    elif chunk["type"] == "updates":
        for source, update in chunk["data"].items():
            if source in ("model", "tools"):  # `source` captures the node name
                _render_completed_message(update["messages"][-1])
Output
[{'name': 'get_weather', 'args': '', 'id': 'call_D3Orjr89KgsLTZ9hTzYv7Hpf', 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '{"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': 'city', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '":"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': 'Boston', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '"}', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
Tool calls: [{'name': 'get_weather', 'args': {'city': 'Boston'}, 'id': 'call_D3Orjr89KgsLTZ9hTzYv7Hpf', 'type': 'tool_call'}]
Tool response: [{'type': 'text', 'text': "It's always sunny in Boston!"}]
The| weather| in| Boston| is| **|sun|ny|**|.|
Accessing completed messages
In some cases, completed messages are not reflected in state updates. If you have access to the agent internals, you can use custom updates to access these messages during streaming. Otherwise, you can aggregate message chunks in the streaming loop (see below).
Consider the example below, where we incorporate a stream writer into a simplified guardrail middleware. This middleware uses tool calling to generate a structured "safe / unsafe" evaluation (structured outputs could also be used for this):
from typing import Any, Literal
from langchain.agents.middleware import after_agent, AgentState
from langgraph.runtime import Runtime
from langchain.messages import AIMessage
from langchain.chat_models import init_chat_model
from langgraph.config import get_stream_writer
from pydantic import BaseModel
class ResponseSafety(BaseModel):
    """Evaluate a response as safe or unsafe."""
    evaluation: Literal["safe", "unsafe"]
safety_model = init_chat_model("openai:gpt-5.4")
@after_agent(can_jump_to=["end"])
def safety_guardrail(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    """Model-based guardrail: use an LLM to evaluate response safety."""
    stream_writer = get_stream_writer()
    # Get the model response
    if not state["messages"]:
        return None
    last_message = state["messages"][-1]
    if not isinstance(last_message, AIMessage):
        return None
    # Use another model to evaluate safety
    model_with_tools = safety_model.bind_tools([ResponseSafety], tool_choice="any")
    result = model_with_tools.invoke(
        [
            {
                "role": "system",
                "content": "Evaluate this AI response as generally safe or unsafe."
            },
            {
                "role": "user",
                "content": f"AI response: {last_message.text}"
            }
        ]
    )
    # Emit the completed evaluation message as a custom stream event
    stream_writer(result)
    tool_call = result.tool_calls[0]
    if tool_call["args"]["evaluation"] == "unsafe":
        last_message.content = "I cannot provide that response. Please rephrase your request."
    return None
# Add the guardrail middleware to the agent and include its custom stream events.
from langchain.agents import create_agent
from langchain.messages import AIMessage, AIMessageChunk, AnyMessage, ToolMessage
def get_weather(city: str) -> str:
    """Get weather for a given city."""
    return f"It's always sunny in {city}!"
agent = create_agent(
    model="openai:gpt-5.4",
    tools=[get_weather],
    middleware=[safety_guardrail],
)
def _render_message_chunk(token: AIMessageChunk) -> None:
    if token.text:
        print(token.text, end="|")
    if token.tool_call_chunks:
        print(token.tool_call_chunks)
def _render_completed_message(message: AnyMessage) -> None:
    if isinstance(message, AIMessage) and message.tool_calls:
        print(f"Tool calls: {message.tool_calls}")
    if isinstance(message, ToolMessage):
        print(f"Tool response: {message.content_blocks}")
input_message = {"role": "user", "content": "What is the weather in Boston?"}
for chunk in agent.stream(
    {"messages": [input_message]},
    stream_mode=["messages", "updates", "custom"],
    version="v2",
):
    if chunk["type"] == "messages":
        token, metadata = chunk["data"]
        if isinstance(token, AIMessageChunk):
            _render_message_chunk(token)
    elif chunk["type"] == "updates":
        for source, update in chunk["data"].items():
            if source in ("model", "tools"):
                _render_completed_message(update["messages"][-1])
    elif chunk["type"] == "custom":
        # Access the completed message in the stream
        print(f"Tool calls: {chunk['data'].tool_calls}")
Output
[{'name': 'get_weather', 'args': '', 'id': 'call_je6LWgxYzuZ84mmoDalTYMJC', 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '{"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': 'city', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '":"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': 'Boston', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '"}', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
Tool calls: [{'name': 'get_weather', 'args': {'city': 'Boston'}, 'id': 'call_je6LWgxYzuZ84mmoDalTYMJC', 'type': 'tool_call'}]
Tool response: [{'type': 'text', 'text': "It's always sunny in Boston!"}]
The| weather| in| **|Boston|**| is| **|sun|ny|**|.|[{'name': 'ResponseSafety', 'args': '', 'id': 'call_O8VJIbOG4Q9nQF0T8ltVi58O', 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '{"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': 'evaluation', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '":"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': 'safe', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '"}', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
Tool calls: [{'name': 'ResponseSafety', 'args': {'evaluation': 'safe'}, 'id': 'call_O8VJIbOG4Q9nQF0T8ltVi58O', 'type': 'tool_call'}]
# Alternative: aggregate message chunks in the streaming loop.
# Reuses the agent and render helpers defined above.
input_message = {"role": "user", "content": "What is the weather in Boston?"}
full_message = None
for chunk in agent.stream(
    {"messages": [input_message]},
    stream_mode=["messages", "updates"],
    version="v2",
):
    if chunk["type"] == "messages":
        token, metadata = chunk["data"]
        if isinstance(token, AIMessageChunk):
            _render_message_chunk(token)
            # Adding chunks merges their content and tool call fragments.
            full_message = token if full_message is None else full_message + token
            if token.chunk_position == "last":
                if full_message.tool_calls:
                    print(f"Tool calls: {full_message.tool_calls}")
                full_message = None
    elif chunk["type"] == "updates":
        for source, update in chunk["data"].items():
            if source == "tools":
                _render_completed_message(update["messages"][-1])
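Streaming with human-in-the-loop
To handle human-in-the-loop interrupts, we build on the example above:
- We configure the agent with human-in-the-loop middleware and a checkpointer
- We collect the interrupts generated during the "updates" stream mode
- We respond to those interrupts with a command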
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langchain.messages import AIMessage, AIMessageChunk, AnyMessage, ToolMessage
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command, Interrupt
def get_weather(city: str) -> str:
    """Get weather for a given city."""
    return f"It's always sunny in {city}!"
checkpointer = InMemorySaver()
agent = create_agent(
    "openai:gpt-5.4",
    tools=[get_weather],
    middleware=[
        HumanInTheLoopMiddleware(interrupt_on={"get_weather": True}),
    ],
    checkpointer=checkpointer,
)
def _render_message_chunk(token: AIMessageChunk) -> None:
    if token.text:
        print(token.text, end="|")
    if token.tool_call_chunks:
        print(token.tool_call_chunks)
def _render_completed_message(message: AnyMessage) -> None:
    if isinstance(message, AIMessage) and message.tool_calls:
        print(f"Tool calls: {message.tool_calls}")
    if isinstance(message, ToolMessage):
        print(f"Tool response: {message.content_blocks}")
def _render_interrupt(interrupt: Interrupt) -> None:
    interrupts = interrupt.value
    for request in interrupts["action_requests"]:
        print(request["description"])
input_message = {
    "role": "user",
    "content": (
        "Can you look up the weather in Boston and San Francisco?"
    ),
}
config = {"configurable": {"thread_id": "some_id"}}
interrupts = []
for chunk in agent.stream(
    {"messages": [input_message]},
    config=config,
    stream_mode=["messages", "updates"],
    version="v2",
):
    if chunk["type"] == "messages":
        token, metadata = chunk["data"]
        if isinstance(token, AIMessageChunk):
            _render_message_chunk(token)
    elif chunk["type"] == "updates":
        for source, update in chunk["data"].items():
            if source in ("model", "tools"):
                _render_completed_message(update["messages"][-1])
            if source == "__interrupt__":
                # Collect interrupts so they can be answered after the stream ends
                interrupts.extend(update)
                _render_interrupt(update[0])
Output
[{'name': 'get_weather', 'args': '', 'id': 'call_GOwNaQHeqMixay2qy80padfE', 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '{"ci', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': 'ty": ', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '"Bosto', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': 'n"}', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': 'get_weather', 'args': '', 'id': 'call_Ndb4jvWm2uMA0JDQXu37wDH6', 'index': 1, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '{"ci', 'id': None, 'index': 1, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': 'ty": ', 'id': None, 'index': 1, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '"San F', 'id': None, 'index': 1, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': 'ranc', 'id': None, 'index': 1, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': 'isco"', 'id': None, 'index': 1, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '}', 'id': None, 'index': 1, 'type': 'tool_call_chunk'}]
Tool calls: [{'name': 'get_weather', 'args': {'city': 'Boston'}, 'id': 'call_GOwNaQHeqMixay2qy80padfE', 'type': 'tool_call'}, {'name': 'get_weather', 'args': {'city': 'San Francisco'}, 'id': 'call_Ndb4jvWm2uMA0JDQXu37wDH6', 'type': 'tool_call'}]
Tool execution requires approval
Tool: get_weather
Args: {'city': 'Boston'}
Tool execution requires approval
Tool: get_weather
Args: {'city': 'San Francisco'}
def _get_interrupt_decisions(interrupt: Interrupt) -> list[dict]:
    # Edit any request that mentions Boston; approve everything else.
    return [
        {
            "type": "edit",
            "edited_action": {
                "name": "get_weather",
                "args": {"city": "Boston, U.K."},
            },
        }
        if "boston" in request["description"].lower()
        else {"type": "approve"}
        for request in interrupt.value["action_requests"]
    ]
decisions = {}
for interrupt in interrupts:
    decisions[interrupt.id] = {
        "decisions": _get_interrupt_decisions(interrupt)
    }
decisions
Output
{
    'a96c40474e429d661b5b32a8d86f0f3e': {
        'decisions': [
            {
                'type': 'edit',
                'edited_action': {
                    'name': 'get_weather',
                    'args': {'city': 'Boston, U.K.'}
                }
            },
            {'type': 'approve'},
        ]
    }
}
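Before resuming, it can help to check that each interrupt received one decision per action request, which is the shape the output above shows. The following sketch of such a check uses plain Python and rests on assumptions: FakeInterrupt is a hypothetical stand-in for langgraph.types.Interrupt that keeps only the id and value attributes used in this section, the request dicts are simplified, and check_decisions is not a LangChain API.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeInterrupt:
    """Hypothetical stand-in exposing only the attributes used in this section."""
    id: str
    value: dict[str, Any]


def check_decisions(interrupts: list[FakeInterrupt], decisions: dict[str, dict]) -> None:
    """Raise ValueError unless every interrupt has one decision per action request."""
    for interrupt in interrupts:
        expected = len(interrupt.value["action_requests"])
        given = decisions.get(interrupt.id, {}).get("decisions", [])
        if len(given) != expected:
            raise ValueError(
                f"interrupt {interrupt.id}: expected {expected} decisions, got {len(given)}"
            )


pending = [
    FakeInterrupt(
        id="a96c40474e429d661b5b32a8d86f0f3e",
        value={
            "action_requests": [
                {"name": "get_weather", "args": {"city": "Boston"}},
                {"name": "get_weather", "args": {"city": "San Francisco"}},
            ]
        },
    )
]
complete = {pending[0].id: {"decisions": [{"type": "approve"}, {"type": "approve"}]}}
incomplete = {pending[0].id: {"decisions": [{"type": "approve"}]}}

check_decisions(pending, complete)  # passes silently
try:
    check_decisions(pending, incomplete)
    mismatch_detected = False
except ValueError:
    mismatch_detected = True
print(mismatch_detected)
```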
interrupts = []
for chunk in agent.stream(
    Command(resume=decisions),
    config=config,
    stream_mode=["messages", "updates"],
    version="v2",
):
    # Streaming loop is unchanged
    if chunk["type"] == "messages":
        token, metadata = chunk["data"]
        if isinstance(token, AIMessageChunk):
            _render_message_chunk(token)
    elif chunk["type"] == "updates":
        for source, update in chunk["data"].items():
            if source in ("model", "tools"):
                _render_completed_message(update["messages"][-1])
            if source == "__interrupt__":
                interrupts.extend(update)
                _render_interrupt(update[0])
Output
Tool response: [{'type': 'text', 'text': "It's always sunny in Boston, U.K.!"}]
Tool response: [{'type': 'text', 'text': "It's always sunny in San Francisco!"}]
-| **|Boston|**|:| It|'s| always| sunny| in| Boston|,| U|.K|.|
|-| **|San| Francisco|**|:| It|'s| always| sunny| in| San| Francisco|!|
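Streaming from sub-agents
When there are multiple LLMs at any point in an agent, it is often necessary to disambiguate the source of messages as they are generated.
To do this, pass a name to each agent when creating it. This name is then available in metadata through the lc_agent_name key when streaming in "messages" mode.
Below, we update the streaming tool calls example:
- We replace our tool with a call_weather_agent tool that invokes an agent internally
- We add a name to each agent
- We specify subgraphs=True when creating the stream
- Our stream processing is the same as before, but we add logic to track which agent is active using the name passed to create_agent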
When you set a name on an agent, that name is also attached to any AIMessage generated by that agent.
from langchain.agents import create_agent
from langchain.chat_models import init_chat_model
from langchain.messages import AIMessage, AIMessageChunk, AnyMessage, ToolMessage
def get_weather(city: str) -> str:
    """Get weather for a given city."""
    return f"It's always sunny in {city}!"
weather_model = init_chat_model("openai:gpt-5.4")
weather_agent = create_agent(
    model=weather_model,
    tools=[get_weather],
    name="weather_agent",
)
def call_weather_agent(query: str) -> str:
    """Query the weather agent."""
    result = weather_agent.invoke({
        "messages": [{"role": "user", "content": query}]
    })
    return result["messages"][-1].text
supervisor_model = init_chat_model("openai:gpt-5.4")
agent = create_agent(
    model=supervisor_model,
    tools=[call_weather_agent],
    name="supervisor",
)
def _render_message_chunk(token: AIMessageChunk) -> None:
    if token.text:
        print(token.text, end="|")
    if token.tool_call_chunks:
        print(token.tool_call_chunks)
def _render_completed_message(message: AnyMessage) -> None:
    if isinstance(message, AIMessage) and message.tool_calls:
        print(f"Tool calls: {message.tool_calls}")
    if isinstance(message, ToolMessage):
        print(f"Tool response: {message.content_blocks}")
input_message = {"role": "user", "content": "What is the weather in Boston?"}
current_agent = None
for chunk in agent.stream(
    {"messages": [input_message]},
    stream_mode=["messages", "updates"],
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "messages":
        token, metadata = chunk["data"]
        # Print a header whenever the active agent changes
        if agent_name := metadata.get("lc_agent_name"):
            if agent_name != current_agent:
                print(f"🤖 {agent_name}: ")
                current_agent = agent_name
        if isinstance(token, AIMessageChunk):
            _render_message_chunk(token)
    elif chunk["type"] == "updates":
        for source, update in chunk["data"].items():
            if source in ("model", "tools"):
                _render_completed_message(update["messages"][-1])
Output
🤖 supervisor:
[{'name': 'call_weather_agent', 'args': '', 'id': 'call_asorzUf0mB6sb7MiKfgojp7I', 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '{"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': 'query', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '":"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': 'Boston', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': ' weather', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': ' right', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': ' now', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': ' and', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': " today's", 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': ' forecast', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '"}', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
Tool calls: [{'name': 'call_weather_agent', 'args': {'query': "Boston weather right now and today's forecast"}, 'id': 'call_asorzUf0mB6sb7MiKfgojp7I', 'type': 'tool_call'}]
🤖 weather_agent:
[{'name': 'get_weather', 'args': '', 'id': 'call_LZ89lT8fW6w8vqck5pZeaDIx', 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '{"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': 'city', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '":"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': 'Boston', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '"}', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
Tool calls: [{'name': 'get_weather', 'args': {'city': 'Boston'}, 'id': 'call_LZ89lT8fW6w8vqck5pZeaDIx', 'type': 'tool_call'}]
Tool response: [{'type': 'text', 'text': "It's always sunny in Boston!"}]
Boston| weather| right| now|:| **|Sunny|**|.
|Today|'s| forecast| for| Boston|:| **|Sunny| all| day|**|.|Tool response: [{'type': 'text', 'text': 'Boston weather right now: **Sunny**.\n\nToday\'s forecast for Boston: **Sunny all day**.'}]
🤖 supervisor:
Boston| weather| right| now|:| **|Sunny|**|.
|Today|'s| forecast| for| Boston|:| **|Sunny| all| day|**|.|
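The current_agent bookkeeping in the loop above amounts to grouping consecutive tokens by their lc_agent_name. The following sketch shows the same idea with itertools.groupby, which can be handy for building a per-agent transcript after the fact. The (text, metadata) pairs are hand-written stand-ins for streamed tokens, and transcript_by_agent is a hypothetical helper.

```python
from itertools import groupby

# Stand-in pairs: (token text, metadata) as they might arrive in "messages" mode.
events = [
    ("Checking", {"lc_agent_name": "supervisor"}),
    (" weather", {"lc_agent_name": "supervisor"}),
    ("Sunny", {"lc_agent_name": "weather_agent"}),
    (".", {"lc_agent_name": "weather_agent"}),
    ("Boston is sunny.", {"lc_agent_name": "supervisor"}),
]


def transcript_by_agent(events: list[tuple[str, dict]]) -> list[tuple[str, str]]:
    """Merge consecutive tokens from the same agent into (agent, text) segments."""
    segments: list[tuple[str, str]] = []
    keyed = groupby(events, key=lambda event: event[1].get("lc_agent_name", "unknown"))
    for agent, group in keyed:
        segments.append((agent, "".join(text for text, _ in group)))
    return segments


segments = transcript_by_agent(events)
for agent, text in segments:
    print(f"{agent}: {text}")
```

Because groupby only merges adjacent items, an agent that speaks twice (as the supervisor does here) yields two separate segments, mirroring the order in which output was streamed.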
Disable streaming
In some applications you may need to disable streaming of individual tokens for a given model. To do so, set streaming=False when initializing the model.
from langchain_openai import ChatOpenAI
model = ChatOpenAI(
    model="gpt-5.4",
    streaming=False,
)
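When deploying to LangSmith, set streaming=False on any model whose output you do not want streamed to the client. This is configured in your graph code before deployment.
Not all chat model integrations support the streaming parameter. If your model does not support it, use disable_streaming=True instead. This parameter is available on all chat models through the base class.
v2 streaming format
Requires LangGraph >= 1.1.
Pass version="v2" to stream() or astream() to get a unified output format. Each chunk is a StreamPart dict with type, ns, and data keys, and has the same shape regardless of the stream mode or the number of modes: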
# Unified format: no more tuple unpacking
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
    stream_mode=["updates", "custom"],
    version="v2",
):
    print(chunk["type"])  # "updates" or "custom"
    print(chunk["data"])  # payload
The v2 format also applies to invoke(), which returns a GraphOutput object with .value and .interrupts attributes, cleanly separating state from interrupt metadata:
result = agent.invoke(
    {"messages": [{"role": "user", "content": "Hello"}]},
    version="v2",
)
print(result.value)  # state (dict, Pydantic model, or dataclass)
print(result.interrupts)  # tuple of Interrupt objects (empty if none)
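Related
- Frontend streaming: build React UIs with useStream for real-time agent interactions
- Streaming with chat models: stream tokens directly from a chat model without using an agent or graph
- Reasoning with chat models: configure and access reasoning output from chat models
- Standard content blocks: understand the standardized content block format for reasoning, text, and other content types
- Streaming with human-in-the-loop: stream agent progress while handling interrupts for human review
- LangGraph streaming: advanced streaming options, including values and debug modes and subgraph streaming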

