Skip to main content
深度智能体基于 LangGraph 的流式基础设施构建,并对子智能体流提供一流支持。当深度智能体将工作委托给子智能体时,您可以独立地从每个子智能体流式获取更新——实时跟踪进度、LLM token 和工具调用。 深度智能体流式传输的功能:

启用子图流式传输

深度智能体使用 LangGraph 的子图流式传输来呈现子智能体执行中的事件。要接收子智能体事件,请在流式传输时启用 stream_subgraphs
from deepagents import create_deep_agent

agent = create_deep_agent(
    system_prompt="You are a helpful research assistant",
    subagents=[
        {
            "name": "researcher",
            "description": "Researches a topic in depth",
            "system_prompt": "You are a thorough researcher.",
        },
    ],
)

for namespace, chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Research quantum computing advances"}]},
    stream_mode="updates",
    subgraphs=True,
):
    if namespace:
        # Subagent event — namespace identifies the source
        print(f"[subagent: {namespace}]")
    else:
        # Main agent event
        print("[main agent]")
    print(chunk)

命名空间

启用 subgraphs 后,每个流式事件都包含一个命名空间,用于标识产生该事件的智能体。命名空间是节点名称和任务 ID 的路径,代表智能体层级结构。
命名空间来源
() (空)主智能体
("tools:abc123",)由主智能体 task 工具调用 abc123 派生的子智能体
("tools:abc123", "model_request:def456")子智能体内部的模型请求节点
使用命名空间将事件路由到正确的 UI 组件:
for namespace, chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Plan my vacation"}]},
    stream_mode="updates",
    subgraphs=True,
):
    # Check if this event came from a subagent
    is_subagent = any(
        segment.startswith("tools:") for segment in namespace
    )

    if is_subagent:
        # Extract the tool call ID from the namespace
        tool_call_id = next(
            s.split(":")[1] for s in namespace if s.startswith("tools:")
        )
        print(f"Subagent {tool_call_id}: {chunk}")
    else:
        print(f"Main agent: {chunk}")

子智能体进度

使用 stream_mode="updates" 在每步完成时跟踪子智能体进度。这对于显示哪些子智能体处于活跃状态以及已完成哪些工作非常有用。
from deepagents import create_deep_agent

agent = create_deep_agent(
    system_prompt=(
        "You are a project coordinator. Always delegate research tasks "
        "to your researcher subagent using the task tool. Keep your final response to one sentence."
    ),
    subagents=[
        {
            "name": "researcher",
            "description": "Researches topics thoroughly",
            "system_prompt": (
                "You are a thorough researcher. Research the given topic "
                "and provide a concise summary in 2-3 sentences."
            ),
        },
    ],
)

for namespace, chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Write a short summary about AI safety"}]},
    stream_mode="updates",
    subgraphs=True,
):
    # Main agent updates (empty namespace)
    if not namespace:
        for node_name, data in chunk.items():
            if node_name == "tools":
                # Subagent results returned to main agent
                for msg in data.get("messages", []):
                    if msg.type == "tool":
                        print(f"\nSubagent complete: {msg.name}")
                        print(f"  Result: {str(msg.content)[:200]}...")
            else:
                print(f"[main agent] step: {node_name}")

    # Subagent updates (non-empty namespace)
    else:
        for node_name, data in chunk.items():
            print(f"  [{namespace[0]}] step: {node_name}")
Output
[main agent] step: model_request
  [tools:call_abc123] step: model_request
  [tools:call_abc123] step: tools
  [tools:call_abc123] step: model_request

Subagent complete: task
  Result: ## AI Safety Report...
[main agent] step: model_request

LLM token

使用 stream_mode="messages" 从主智能体和子智能体流式获取单个 token。每个消息事件都包含标识来源智能体的元数据。
current_source = ""

for namespace, chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Research quantum computing advances"}]},
    stream_mode="messages",
    subgraphs=True,
):
    token, metadata = chunk

    # Check if this event came from a subagent (namespace contains "tools:")
    is_subagent = any(s.startswith("tools:") for s in namespace)

    if is_subagent:
        # Token from a subagent
        subagent_ns = next(s for s in namespace if s.startswith("tools:"))
        if subagent_ns != current_source:
            print(f"\n\n--- [subagent: {subagent_ns}] ---")
            current_source = subagent_ns
        if token.content:
            print(token.content, end="", flush=True)
    else:
        # Token from the main agent
        if "main" != current_source:
            print("\n\n--- [main agent] ---")
            current_source = "main"
        if token.content:
            print(token.content, end="", flush=True)

print()

工具调用

当子智能体使用工具时,您可以流式获取工具调用事件,以显示每个子智能体正在执行的操作。工具调用块出现在 messages 流模式中。
for namespace, chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Research recent quantum computing advances"}]},
    stream_mode="messages",
    subgraphs=True,
):
    token, metadata = chunk

    # Identify source: "main" or the subagent namespace segment
    is_subagent = any(s.startswith("tools:") for s in namespace)
    source = next((s for s in namespace if s.startswith("tools:")), "main") if is_subagent else "main"

    # Tool call chunks (streaming tool invocations)
    if token.tool_call_chunks:
        for tc in token.tool_call_chunks:
            if tc.get("name"):
                print(f"\n[{source}] Tool call: {tc['name']}")
            # Args stream in chunks — write them incrementally
            if tc.get("args"):
                print(tc["args"], end="", flush=True)

    # Tool results
    if token.type == "tool":
        print(f"\n[{source}] Tool result [{token.name}]: {str(token.content)[:150]}")

    # Regular AI content (skip tool call messages)
    if token.type == "ai" and token.content and not token.tool_call_chunks:
        print(token.content, end="", flush=True)

print()

自定义更新

在您的子智能体工具内部使用 @[get_stream_writer][langgraph.config.get_stream_writer] 发出自定义进度事件:
import time
from langchain.tools import tool
from langgraph.config import get_stream_writer
from deepagents import create_deep_agent


@tool
def analyze_data(topic: str) -> str:
    """Run a data analysis on a given topic.

    This tool performs the actual analysis and emits progress updates.
    You MUST call this tool for any analysis request.
    """
    writer = get_stream_writer()

    writer({"status": "starting", "topic": topic, "progress": 0})
    time.sleep(0.5)

    writer({"status": "analyzing", "progress": 50})
    time.sleep(0.5)

    writer({"status": "complete", "progress": 100})
    return (
        f'Analysis of "{topic}": Customer sentiment is 85% positive, '
        "driven by product quality and support response times."
    )


agent = create_deep_agent(
    system_prompt=(
        "You are a coordinator. For any analysis request, you MUST delegate "
        "to the analyst subagent using the task tool. Never try to answer directly. "
        "After receiving the result, summarize it in one sentence."
    ),
    subagents=[
        {
            "name": "analyst",
            "description": "Performs data analysis with real-time progress tracking",
            "system_prompt": (
                "You are a data analyst. You MUST call the analyze_data tool "
                "for every analysis request. Do not use any other tools. "
                "After the analysis completes, report the result."
            ),
            "tools": [analyze_data],
        },
    ],
)

for namespace, chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Analyze customer satisfaction trends"}]},
    stream_mode="custom",
    subgraphs=True,
):
    is_subagent = any(s.startswith("tools:") for s in namespace)
    if is_subagent:
        subagent_ns = next(s for s in namespace if s.startswith("tools:"))
        print(f"[{subagent_ns}]", chunk)
    else:
        print("[main]", chunk)
Output
[tools:call_abc123] {'status': 'starting', 'topic': 'customer satisfaction trends', 'progress': 0}
[tools:call_abc123] {'status': 'analyzing', 'progress': 50}
[tools:call_abc123] {'status': 'complete', 'progress': 100}

组合多种流模式

组合多种流模式以全面了解智能体的执行情况:
# Skip internal middleware steps — only show meaningful node names
INTERESTING_NODES = {"model_request", "tools"}

last_source = ""
mid_line = False  # True when we've written tokens without a trailing newline

for namespace, chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Analyze the impact of remote work on team productivity"}]},
    stream_mode=["updates", "messages", "custom"],
    subgraphs=True,
):
    mode, data = chunk[0], chunk[1]

    is_subagent = any(s.startswith("tools:") for s in namespace)
    source = "subagent" if is_subagent else "main"

    if mode == "updates":
        for node_name in data:
            if node_name not in INTERESTING_NODES:
                continue
            if mid_line:
                print()
                mid_line = False
            print(f"[{source}] step: {node_name}")

    elif mode == "messages":
        token, metadata = data
        if token.content:
            # Print a header when the source changes
            if source != last_source:
                if mid_line:
                    print()
                    mid_line = False
                print(f"\n[{source}] ", end="")
                last_source = source
            print(token.content, end="", flush=True)
            mid_line = True

    elif mode == "custom":
        if mid_line:
            print()
            mid_line = False
        print(f"[{source}] custom event:", data)

print()

常见模式

跟踪子智能体生命周期

监控子智能体的启动、运行和完成:
active_subagents = {}

for namespace, chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Research the latest AI safety developments"}]},
    stream_mode="updates",
    subgraphs=True,
):
    for node_name, data in chunk.items():
        # ─── 阶段 1:检测子智能体启动 ────────────────────────
        # 当主智能体的 model_request 包含 task 工具调用时,
        # 表示子智能体已被派生。
        if not namespace and node_name == "model_request":
            for msg in data.get("messages", []):
                for tc in getattr(msg, "tool_calls", []):
                    if tc["name"] == "task":
                        active_subagents[tc["id"]] = {
                            "type": tc["args"].get("subagent_type"),
                            "description": tc["args"].get("description", "")[:80],
                            "status": "pending",
                        }
                        print(
                            f'[lifecycle] PENDING  → subagent "{tc["args"].get("subagent_type")}" '
                            f'({tc["id"]})'
                        )

        # ─── 阶段 2:检测子智能体运行 ─────────────────────────
        # 当我们从 tools:UUID 命名空间接收到事件时,
        # 该子智能体正在主动执行。
        if namespace and namespace[0].startswith("tools:"):
            pregel_id = namespace[0].split(":")[1]
            # 检查是否有待定的子智能体需要标记为运行中。
            # 注意:pregel 任务 ID 与 tool_call_id 不同,
            # 因此在第一个子智能体事件时将任何待定的子智能体标记为运行中。
            for sub_id, sub in active_subagents.items():
                if sub["status"] == "pending":
                    sub["status"] = "running"
                    print(
                        f'[lifecycle] RUNNING  → subagent "{sub["type"]}" '
                        f"(pregel: {pregel_id})"
                    )
                    break

        # ─── 阶段 3:检测子智能体完成 ──────────────────────
        # 当主智能体的 tools 节点返回工具消息时,
        # 子智能体已完成并返回结果。
        if not namespace and node_name == "tools":
            for msg in data.get("messages", []):
                if msg.type == "tool":
                    sub = active_subagents.get(msg.tool_call_id)
                    if sub:
                        sub["status"] = "complete"
                        print(
                            f'[lifecycle] COMPLETE → subagent "{sub["type"]}" '
                            f"({msg.tool_call_id})"
                        )
                        print(f"  Result preview: {str(msg.content)[:120]}...")

# Print final state
print("\n--- Final subagent states ---")
for sub_id, sub in active_subagents.items():
    print(f"  {sub['type']}: {sub['status']}")

相关资源