Deep Agents builds on LangGraph's streaming infrastructure and offers first-class support for subagent streams. When a deep agent delegates work to a subagent, you can stream each subagent's updates independently, tracking progress, LLM tokens, and tool calls in real time.

Enable subgraph streaming

Deep Agents uses LangGraph's subgraph streaming to surface events from subagent execution. To receive subagent events, pass subgraphs=True when streaming:
from deepagents import create_deep_agent

agent = create_deep_agent(
    model="google_genai:gemini-3.1-pro-preview",
    system_prompt="You are a helpful research assistant",
    subagents=[
        {
            "name": "researcher",
            "description": "Researches a topic in depth",
            "system_prompt": "You are a thorough researcher.",
        },
    ],
)

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Research quantum computing advances"}]},
    stream_mode="updates",
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "updates":
        if chunk["ns"]:
            # Subagent event - the namespace identifies the source
            print(f"[subagent: {chunk['ns']}]")
        else:
            # Main agent event
            print("[main agent]")
        print(chunk["data"])

Namespaces

When subgraphs is enabled, every streamed event includes a namespace that identifies which agent produced it. The namespace is a path of node names and task IDs that represents the agent hierarchy.
Namespace → Source
() (empty) → the main agent
("tools:abc123",) → the subagent spawned by the main agent's task tool call abc123
("tools:abc123", "model_request:def456") → the model request node inside that subagent
Use the namespace to route events to the right UI component:
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Plan my vacation"}]},
    stream_mode="updates",
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "updates":
        # Check whether this event came from a subagent
        is_subagent = any(
            segment.startswith("tools:") for segment in chunk["ns"]
        )

        if is_subagent:
            # Extract the tool call ID from the namespace
            tool_call_id = next(
                s.split(":")[1] for s in chunk["ns"] if s.startswith("tools:")
            )
            print(f"Subagent {tool_call_id}: {chunk['data']}")
        else:
            print(f"Main agent: {chunk['data']}")

Subagent progress

Use stream_mode="updates" to track subagent progress as each step completes. This is useful for showing which subagents are active and what work they have finished.
from deepagents import create_deep_agent

agent = create_deep_agent(
    model="google_genai:gemini-3.1-pro-preview",
    system_prompt=(
        "You are a project coordinator. Always delegate research tasks "
        "to your researcher subagent using the task tool. Keep your final response to one sentence."
    ),
    subagents=[
        {
            "name": "researcher",
            "description": "Researches topics thoroughly",
            "system_prompt": (
                "You are a thorough researcher. Research the given topic "
                "and provide a concise summary in 2-3 sentences."
            ),
        },
    ],
)

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Write a short summary about AI safety"}]},
    stream_mode="updates",
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "updates":
        # Main agent updates (empty namespace)
        if not chunk["ns"]:
            for node_name, data in chunk["data"].items():
                if node_name == "tools":
                    # Subagent result returned to the main agent
                    for msg in data.get("messages", []):
                        if msg.type == "tool":
                            print(f"\nSubagent complete: {msg.name}")
                            print(f"  Result: {str(msg.content)[:200]}...")
                else:
                    print(f"[main agent] step: {node_name}")

        # Subagent updates (non-empty namespace)
        else:
            for node_name, data in chunk["data"].items():
                print(f"  [{chunk['ns'][0]}] step: {node_name}")
Output
[main agent] step: model_request
  [tools:call_abc123] step: model_request
  [tools:call_abc123] step: tools
  [tools:call_abc123] step: model_request

Subagent complete: task
  Result: ## AI Safety Report...
[main agent] step: model_request

LLM tokens

Use stream_mode="messages" to stream individual tokens from the main agent and subagents alike. Each message event carries metadata identifying the agent it came from.
current_source = ""

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Research quantum computing advances"}]},
    stream_mode="messages",
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "messages":
        token, metadata = chunk["data"]

        # Check whether this event came from a subagent (namespace contains "tools:")
        is_subagent = any(s.startswith("tools:") for s in chunk["ns"])

        if is_subagent:
            # Token from a subagent
            subagent_ns = next(s for s in chunk["ns"] if s.startswith("tools:"))
            if subagent_ns != current_source:
                print(f"\n\n--- [subagent: {subagent_ns}] ---")
                current_source = subagent_ns
            if token.content:
                print(token.content, end="", flush=True)
        else:
            # Token from the main agent
            if "main" != current_source:
                print("\n\n--- [main agent] ---")
                current_source = "main"
            if token.content:
                print(token.content, end="", flush=True)

print()
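The source-switching logic above can also buffer tokens per source, for example to route them to separate UI panes. A minimal sketch with synthetic events; accumulate is a hypothetical helper, not a Deep Agents API:

```python
from collections import defaultdict


def accumulate(buffers: dict, ns: tuple, text: str) -> None:
    # Key tokens by the first "tools:" namespace segment,
    # or "main" for the root agent.
    source = next((s for s in ns if s.startswith("tools:")), "main")
    buffers[source] += text


# Synthetic (ns, token) events standing in for a live messages stream
buffers = defaultdict(str)
accumulate(buffers, (), "Planning ")
accumulate(buffers, ("tools:call_1", "model_request:xyz"), "Searching...")
accumulate(buffers, (), "done.")
```

Each buffer then holds that agent's full output so far, ready to render in its own pane.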

Tool calls

When subagents use tools, you can stream tool call events to show what each subagent is doing. Tool call chunks appear in the messages stream mode.
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Research recent quantum computing advances"}]},
    stream_mode="messages",
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "messages":
        token, metadata = chunk["data"]

        # Identify the source: "main" or the subagent namespace segment
        is_subagent = any(s.startswith("tools:") for s in chunk["ns"])
        source = next((s for s in chunk["ns"] if s.startswith("tools:")), "main") if is_subagent else "main"

        # Tool call chunks (streamed tool calls)
        if token.tool_call_chunks:
            for tc in token.tool_call_chunks:
                if tc.get("name"):
                    print(f"\n[{source}] Tool call: {tc['name']}")
                # Args stream in chunks - write them incrementally
                if tc.get("args"):
                    print(tc["args"], end="", flush=True)

        # Tool results
        if token.type == "tool":
            print(f"\n[{source}] Tool result [{token.name}]: {str(token.content)[:150]}")

        # Regular AI content (skip tool-call messages)
        if token.type == "ai" and token.content and not token.tool_call_chunks:
            print(token.content, end="", flush=True)

print()

Custom updates

Use get_stream_writer inside a subagent's tools to emit custom progress events:
import time
from langchain.tools import tool
from langgraph.config import get_stream_writer
from deepagents import create_deep_agent


@tool
def analyze_data(topic: str) -> str:
    """Run data analysis on the given topic.

    This tool performs the actual analysis and emits progress updates.
    You MUST call this tool for any analysis request.
    """
    writer = get_stream_writer()

    writer({"status": "starting", "topic": topic, "progress": 0})
    time.sleep(0.5)

    writer({"status": "analyzing", "progress": 50})
    time.sleep(0.5)

    writer({"status": "complete", "progress": 100})
    return (
        f'Analysis of "{topic}": Customer sentiment is 85% positive, '
        "driven by product quality and support response times."
    )


agent = create_deep_agent(
    model="google_genai:gemini-3.1-pro-preview",
    system_prompt=(
        "You are a coordinator. For any analysis request, you MUST delegate "
        "to the analyst subagent using the task tool. Never try to answer directly. "
        "After receiving the result, summarize it in one sentence."
    ),
    subagents=[
        {
            "name": "analyst",
            "description": "Performs data analysis with real-time progress tracking",
            "system_prompt": (
                "You are a data analyst. You MUST call the analyze_data tool "
                "for every analysis request. Do not use any other tools. "
                "After the analysis completes, report the result."
            ),
            "tools": [analyze_data],
        },
    ],
)

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Analyze customer satisfaction trends"}]},
    stream_mode="custom",
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "custom":
        is_subagent = any(s.startswith("tools:") for s in chunk["ns"])
        if is_subagent:
            subagent_ns = next(s for s in chunk["ns"] if s.startswith("tools:"))
            print(f"[{subagent_ns}]", chunk["data"])
        else:
            print("[main]", chunk["data"])
Output
[tools:call_abc123] {'status': 'starting', 'topic': 'customer satisfaction trends', 'progress': 0}
[tools:call_abc123] {'status': 'analyzing', 'progress': 50}
[tools:call_abc123] {'status': 'complete', 'progress': 100}
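Custom payloads like these are plain dicts, so rendering them is entirely up to you. For example, a sketch of a text progress bar for the payload shape used above; render_progress is a hypothetical helper, not part of any library:

```python
def render_progress(event: dict, width: int = 10) -> str:
    # Render a custom event like {"status": "analyzing", "progress": 50}
    # as a fixed-width text progress bar.
    filled = event["progress"] * width // 100
    return f"[{'#' * filled}{'.' * (width - filled)}] {event['status']}"


line = render_progress({"status": "analyzing", "progress": 50})
# "[#####.....] analyzing"
```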

Streaming multiple modes

Combine multiple stream modes to get a complete picture of agent execution:
# Skip internal middleware steps - only show meaningful node names
INTERESTING_NODES = {"model_request", "tools"}

last_source = ""
mid_line = False  # True while we have written tokens without a trailing newline

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Analyze the impact of remote work on team productivity"}]},
    stream_mode=["updates", "messages", "custom"],
    subgraphs=True,
    version="v2",
):
    is_subagent = any(s.startswith("tools:") for s in chunk["ns"])
    source = "subagent" if is_subagent else "main"

    if chunk["type"] == "updates":
        for node_name in chunk["data"]:
            if node_name not in INTERESTING_NODES:
                continue
            if mid_line:
                print()
                mid_line = False
            print(f"[{source}] step: {node_name}")

    elif chunk["type"] == "messages":
        token, metadata = chunk["data"]
        if token.content:
            # Print a header when the source changes
            if source != last_source:
                if mid_line:
                    print()
                    mid_line = False
                print(f"\n[{source}] ", end="")
                last_source = source
            print(token.content, end="", flush=True)
            mid_line = True

    elif chunk["type"] == "custom":
        if mid_line:
            print()
            mid_line = False
        print(f"[{source}] custom event:", chunk["data"])

print()

Common patterns

Tracking subagent lifecycle

Monitor when subagents start, run, and complete:
active_subagents = {}

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Research the latest AI safety developments"}]},
    stream_mode="updates",
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "updates":
        for node_name, data in chunk["data"].items():
            # ─── Phase 1: detect subagent spawn ───────────────────────
            # When the main agent's model_request contains a task tool call,
            # a subagent has been spawned.
            if not chunk["ns"] and node_name == "model_request":
                for msg in data.get("messages", []):
                    for tc in getattr(msg, "tool_calls", []):
                        if tc["name"] == "task":
                            active_subagents[tc["id"]] = {
                                "type": tc["args"].get("subagent_type"),
                                "description": tc["args"].get("description", "")[:80],
                                "status": "pending",
                            }
                            print(
                                f'[lifecycle] PENDING  → subagent "{tc["args"].get("subagent_type")}" '
                                f'({tc["id"]})'
                            )

            # ─── Phase 2: detect subagent running ─────────────────────
            # When we receive events from a tools:UUID namespace, that
            # subagent is actively executing.
            if chunk["ns"] and chunk["ns"][0].startswith("tools:"):
                pregel_id = chunk["ns"][0].split(":")[1]
                # Check whether any pending subagent should be marked running.
                # Note: the pregel task ID differs from the tool_call_id,
                # so we mark any pending subagent as running on the first subagent event.
                for sub_id, sub in active_subagents.items():
                    if sub["status"] == "pending":
                        sub["status"] = "running"
                        print(
                            f'[lifecycle] RUNNING  → subagent "{sub["type"]}" '
                            f"(pregel: {pregel_id})"
                        )
                        break

            # ─── Phase 3: detect subagent completion ──────────────────
            # When the main agent's tools node returns a tool message,
            # the subagent has finished and returned its result.
            if not chunk["ns"] and node_name == "tools":
                for msg in data.get("messages", []):
                    if msg.type == "tool":
                        sub = active_subagents.get(msg.tool_call_id)
                        if sub:
                            sub["status"] = "complete"
                            print(
                                f'[lifecycle] COMPLETE → subagent "{sub["type"]}" '
                                f"({msg.tool_call_id})"
                            )
                            print(f"  Result preview: {str(msg.content)[:120]}...")

# Print final states
print("\n--- Final subagent states ---")
for sub_id, sub in active_subagents.items():
    print(f"  {sub['type']}: {sub['status']}")

v2 streaming format

Requires LangGraph >= 1.1.
All examples on this page use the v2 streaming format (version="v2"), which is the recommended approach. Each chunk is a StreamPart dict with type, ns, and data keys; the shape is identical regardless of stream mode, number of modes, or subgraph settings. The v2 format eliminates nested tuple unpacking, which makes handling subgraph streams in Deep Agents straightforward:
# Unified format - no nested tuple unpacking
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Research quantum computing"}]},
    stream_mode=["updates", "messages", "custom"],
    subgraphs=True,
    version="v2",
):
    print(chunk["type"])  # "updates", "messages", or "custom"
    print(chunk["ns"])    # () for main agent, ("tools:<id>",) for subagent
    print(chunk["data"])  # the payload
See the LangGraph streaming docs for more detail on the v2 format, including type narrowing and Pydantic/dataclass conversion.
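Because every chunk shares the same dict shape, dispatching on the type key narrows what data holds. A small sketch using synthetic StreamPart-shaped dicts; describe is a hypothetical helper, and in a real program the chunks would come from agent.stream(..., version="v2"):

```python
def describe(chunk: dict) -> str:
    # Narrow the payload based on the "type" key of a v2 StreamPart dict.
    if chunk["type"] == "messages":
        token, _metadata = chunk["data"]  # data is a (token, metadata) pair
        return f"token from ns={chunk['ns']}"
    if chunk["type"] == "updates":
        return f"updates for nodes {sorted(chunk['data'])}"  # data maps node -> state
    return f"custom payload: {chunk['data']!r}"  # data is whatever the writer emitted


# Synthetic chunks illustrating each shape
summary = describe({"type": "updates", "ns": (), "data": {"tools": {}, "model_request": {}}})
# "updates for nodes ['model_request', 'tools']"
```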
