Skip to main content
useStream React Hook 提供了与 LangGraph 流式功能的无缝集成。它处理流式传输、状态管理和分支逻辑的所有复杂性,让你专注于构建出色的生成式 UI 体验。 主要特性:
  • 消息流式传输 — 处理消息块流以形成完整消息
  • 自动状态管理 — 管理消息、中断、加载状态和错误
  • 对话分支 — 从聊天历史的任意位置创建备用对话路径
  • UI 无关设计 — 使用你自己的组件和样式

安装

在你的 React 应用中安装 LangGraph SDK 以使用 useStream Hook:

基本用法

useStream Hook 可连接到任何 LangGraph 图,无论是运行在你自己的端点,还是通过 LangSmith 部署 托管的图。
import { useStream } from "@langchain/langgraph-sdk/react";

function Chat() {
  const stream = useStream({
    assistantId: "agent",
    // Local development
    apiUrl: "http://localhost:2024",
    // Production deployment (LangSmith hosted)
    // apiUrl: "https://your-deployment.us.langgraph.app"
  });

  const handleSubmit = (message: string) => {
    stream.submit({
      messages: [
        { content: message, type: "human" }
      ],
    });
  };

  return (
    <div>
      {stream.messages.map((message, idx) => (
        <div key={message.id ?? idx}>
          {message.type}: {message.content}
        </div>
      ))}

      {stream.isLoading && <div>Loading...</div>}
      {stream.error && <div>Error: {stream.error.message}</div>}
    </div>
  );
}
了解如何将代理部署到 LangSmith,以获得具有内置可观测性、认证和扩展能力的生产就绪托管。
assistantId
string
required
要连接的代理 ID。使用 LangSmith 部署时,此 ID 必须与部署仪表板中显示的代理 ID 一致。对于自定义 API 部署或本地开发,可以是服务器用于标识代理的任意字符串。
apiUrl
string
Agent Server 的 URL。本地开发默认为 http://localhost:2024
apiKey
string
用于身份验证的 API 密钥。连接到 LangSmith 上部署的代理时需要。
threadId
string
连接到已有线程而非新建线程。适用于恢复对话。
onThreadId
(id: string) => void
新线程创建时调用的回调。用于持久化线程 ID 以备后用。
reconnectOnMount
boolean | (() => Storage)
组件挂载时自动恢复正在进行的运行。设为 true 使用 session storage,或提供自定义存储函数。
onCreated
(run: Run) => void
新运行创建时调用的回调。适用于持久化运行元数据以便恢复。
onError
(error: Error) => void
流式传输期间发生错误时调用的回调。
onFinish
(state: StateType, run?: Run) => void
流式传输成功完成并返回最终状态时调用的回调。
onCustomEvent
(data: unknown, context: { mutate }) => void
处理代理通过 writer 发出的自定义事件。参见自定义流式事件
onUpdateEvent
(data: unknown, context: { mutate }) => void
处理每个图步骤后的状态更新事件。
onMetadataEvent
(metadata: { run_id, thread_id }) => void
处理包含运行和线程信息的元数据事件。
messagesKey
string
default:"messages"
图状态中包含消息数组的键。
throttle
boolean
default:"true"
批量处理状态更新以提升渲染性能。禁用后可立即更新。
initialValues
StateType | null
首次流式加载时显示的初始状态值。适用于立即展示缓存的线程数据。
messages
Message[]
当前线程中的所有消息,包括人类消息和 AI 消息。
values
StateType
当前图状态值。类型根据代理或图类型参数推断。
isLoading
boolean
是否正在进行流式传输。用于显示加载指示器。
error
Error | null
流式传输期间发生的错误。无错误时为 null
interrupt
Interrupt | undefined
需要用户输入的当前中断,例如人机协作的审批请求。
toolCalls
ToolCallWithResult[]
所有消息中的工具调用,包含结果和状态(pendingcompletederror)。
submit
(input, options?) => Promise<void>
向代理提交新输入。从中断恢复时可将输入传为 null 并附带命令。选项包括用于分支的 checkpoint、乐观更新的 optimisticValues 以及乐观线程创建的 threadId
stop
() => void
立即停止当前流式传输。
joinStream
(runId: string) => void
通过运行 ID 恢复已有流式传输。配合 onCreated 实现手动流式恢复。
setBranch
(branch: string) => void
切换到对话历史中的不同分支。
getToolCalls
(message) => ToolCall[]
获取特定 AI 消息的所有工具调用。
getMessagesMetadata
(message) => MessageMetadata
获取消息的元数据,包括流式信息(如用于识别来源节点的 langgraph_node)和用于分支的 firstSeenState
experimental_branchTree
BranchTree
线程的树形表示,适用于非消息图的高级分支控制。

线程管理

通过内置线程管理跟踪对话。你可以访问当前线程 ID,并在新线程创建时收到通知:
import { useState } from "react";
import { useStream } from "@langchain/langgraph-sdk/react";

function Chat() {
  const [threadId, setThreadId] = useState<string | null>(null);

  const stream = useStream({
    apiUrl: "http://localhost:2024",
    assistantId: "agent",
    threadId: threadId,
    onThreadId: setThreadId,
  });

  // threadId is updated when a new thread is created
  // Store it in URL params or localStorage for persistence
}
我们建议存储 threadId,以便用户在页面刷新后可以恢复对话。

页面刷新后恢复

useStream Hook 可在组件挂载时通过设置 reconnectOnMount: true 自动恢复正在进行的运行。这对于在页面刷新后继续流式传输非常有用,确保下线期间生成的消息和事件不会丢失。
const stream = useStream({
  apiUrl: "http://localhost:2024",
  assistantId: "agent",
  reconnectOnMount: true,
});
默认情况下,创建的运行 ID 存储在 window.sessionStorage 中,可以通过传入自定义存储函数来替换:
const stream = useStream({
  apiUrl: "http://localhost:2024",
  assistantId: "agent",
  reconnectOnMount: () => window.localStorage,
});
如需手动控制恢复过程,可使用运行回调持久化元数据,并通过 joinStream 恢复:
import { useStream } from "@langchain/langgraph-sdk/react";
import { useEffect, useRef } from "react";

function Chat({ threadId }: { threadId: string | null }) {
  const stream = useStream({
    apiUrl: "http://localhost:2024",
    assistantId: "agent",
    threadId,
    onCreated: (run) => {
      // Persist run ID when stream starts
      window.sessionStorage.setItem(`resume:${run.thread_id}`, run.run_id);
    },
    onFinish: (_, run) => {
      // Clean up when stream completes
      window.sessionStorage.removeItem(`resume:${run?.thread_id}`);
    },
  });

  // Resume stream on mount if there's a stored run ID
  const joinedThreadId = useRef<string | null>(null);
  useEffect(() => {
    if (!threadId) return;
    const runId = window.sessionStorage.getItem(`resume:${threadId}`);
    if (runId && joinedThreadId.current !== threadId) {
      stream.joinStream(runId);
      joinedThreadId.current = threadId;
    }
  }, [threadId]);

  const handleSubmit = (text: string) => {
    // Use streamResumable to ensure events aren't lost
    stream.submit(
      { messages: [{ type: "human", content: text }] },
      { streamResumable: true }
    );
  };
}

查看会话持久化示例

session-persistence 示例中查看使用 reconnectOnMount 和线程持久化实现流式恢复的完整实现。

乐观更新

你可以在执行网络请求之前乐观地更新客户端状态,立即向用户提供反馈:
const stream = useStream({
  apiUrl: "http://localhost:2024",
  assistantId: "agent",
});

const handleSubmit = (text: string) => {
  const newMessage = { type: "human" as const, content: text };

  stream.submit(
    { messages: [newMessage] },
    {
      optimisticValues(prev) {
        const prevMessages = prev.messages ?? [];
        return { ...prev, messages: [...prevMessages, newMessage] };
      },
    }
  );
};

乐观线程创建

submit 中使用 threadId 选项,可以在线程创建前预先知道线程 ID,实现乐观 UI 模式:
import { useState } from "react";
import { useStream } from "@langchain/langgraph-sdk/react";

function Chat() {
  const [threadId, setThreadId] = useState<string | null>(null);
  const [optimisticThreadId] = useState(() => crypto.randomUUID());

  const stream = useStream({
    apiUrl: "http://localhost:2024",
    assistantId: "agent",
    threadId,
    onThreadId: setThreadId,
  });

  const handleSubmit = (text: string) => {
    // Navigate immediately without waiting for thread creation
    window.history.pushState({}, "", `/threads/${optimisticThreadId}`);

    // Create thread with the predetermined ID
    stream.submit(
      { messages: [{ type: "human", content: text }] },
      { threadId: optimisticThreadId }
    );
  };
}

缓存线程展示

使用 initialValues 选项在从服务器加载历史记录期间立即显示缓存的线程数据:
function Chat({ threadId, cachedData }) {
  const stream = useStream({
    apiUrl: "http://localhost:2024",
    assistantId: "agent",
    threadId,
    initialValues: cachedData?.values,
  });

  // Shows cached messages instantly, then updates when server responds
}

分支

通过编辑历史消息或重新生成 AI 回复来创建备用对话路径。使用 getMessagesMetadata() 访问用于分支的检查点信息:
import { useStream } from "@langchain/langgraph-sdk/react";
import { BranchSwitcher } from "./BranchSwitcher";

function Chat() {
  const stream = useStream({
    apiUrl: "http://localhost:2024",
    assistantId: "agent",
  });

  return (
    <div>
      {stream.messages.map((message) => {
        const meta = stream.getMessagesMetadata(message);
        const parentCheckpoint = meta?.firstSeenState?.parent_checkpoint;

        return (
          <div key={message.id}>
            <div>{message.content as string}</div>

            {/* Edit human messages */}
            {message.type === "human" && (
              <button
                onClick={() => {
                  const newContent = prompt("Edit message:", message.content as string);
                  if (newContent) {
                    stream.submit(
                      { messages: [{ type: "human", content: newContent }] },
                      { checkpoint: parentCheckpoint }
                    );
                  }
                }}
              >
                Edit
              </button>
            )}

            {/* Regenerate AI messages */}
            {message.type === "ai" && (
              <button
                onClick={() => stream.submit(undefined, { checkpoint: parentCheckpoint })}
              >
                Regenerate
              </button>
            )}

            {/* Switch between branches */}
            <BranchSwitcher
              branch={meta?.branch}
              branchOptions={meta?.branchOptions}
              onSelect={(branch) => stream.setBranch(branch)}
            />
          </div>
        );
      })}
    </div>
  );
}
对于高级用例,可使用 experimental_branchTree 属性获取线程的树形表示,适用于非消息图。

查看分支示例

branching-chat 示例中查看包含编辑、重新生成和分支切换的完整对话分支实现。

类型安全流式传输

useStream Hook 在与通过 @[createAgent] 创建的代理或通过 StateGraph 创建的图一起使用时,支持完整的类型推断。将 typeof agenttypeof graph 作为类型参数传入,即可自动推断工具调用类型。

createAgent 配合使用

使用 @[createAgent] 时,工具调用类型会自动从注册到代理的工具中推断:
from langchain import create_agent, tool

@tool
def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: Sunny, 72°F"

agent = create_agent(
    model="openai:gpt-4.1-mini",
    tools=[get_weather],
)

StateGraph 配合使用

对于自定义 StateGraph 应用,状态类型从图的注解中推断:
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated

class State(TypedDict):
    messages: Annotated[list, add_messages]

model = ChatOpenAI(model="gpt-4.1-mini")

async def agent(state: State) -> dict:
    response = await model.ainvoke(state["messages"])
    return {"messages": [response]}

workflow = StateGraph(State)
workflow.add_node("agent", agent)
workflow.add_edge(START, "agent")
workflow.add_edge("agent", END)

graph = workflow.compile()

使用 Annotation 类型

如果你在使用 LangGraph.js,可以复用图的注解类型。确保只导入类型,以避免引入整个 LangGraph.js 运行时:

高级类型配置

你可以为中断、自定义事件和可配置选项指定额外的类型参数:

渲染工具调用

使用 getToolCalls 从 AI 消息中提取并渲染工具调用。工具调用包含调用详情、结果(如已完成)和状态。
from langchain import create_agent, tool

@tool
def get_weather(location: str) -> str:
    """Get the current weather for a location."""
    return f'{{"status": "success", "content": "Weather in {location}: Sunny, 72°F"}}'

agent = create_agent(
    model="openai:gpt-4.1-mini",
    tools=[get_weather],
)

查看工具调用示例

tool-calling-agent 示例中查看包含天气、计算器和笔记工具的完整工具调用渲染实现。

自定义流式事件

在工具或节点中使用 writer 从代理流式传输自定义数据。通过 onCustomEvent 回调在 UI 中处理这些事件。
import asyncio
import time
from langchain import create_agent, tool
from langchain.types import ToolRuntime

@tool
async def analyze_data(data_source: str, *, config: ToolRuntime) -> str:
    """Analyze data with progress updates."""
    steps = ["Connecting...", "Fetching...", "Processing...", "Done!"]

    for i, step in enumerate(steps):
        # Emit progress events during execution
        if config.writer:
            config.writer({
                "type": "progress",
                "id": f"analysis-{int(time.time() * 1000)}",
                "message": step,
                "progress": ((i + 1) / len(steps)) * 100,
            })
        await asyncio.sleep(0.5)

    return '{"result": "Analysis complete"}'

agent = create_agent(
    model="openai:gpt-4.1-mini",
    tools=[analyze_data],
)

查看自定义流式传输示例

custom-streaming 示例中查看包含进度条、状态徽章和文件操作卡片的自定义事件完整实现。

事件处理

useStream Hook 提供了回调选项,让你可以访问不同类型的流式事件。你无需显式配置流模式——只需传入你想处理的事件类型的回调即可:

可用回调

回调描述流模式
onUpdateEvent每个图步骤后收到状态更新时调用updates
onCustomEvent收到图发出的自定义事件时调用custom
onMetadataEvent携带运行和线程元数据时调用metadata
onError发生错误时调用-
onFinish流式传输完成时调用-

多代理流式传输

在使用多代理系统或包含多个节点的图时,可通过消息元数据识别每条消息的来源节点。当多个 LLM 并行运行且你希望以不同视觉样式展示各自输出时,这尤为有用。
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END, Send
from langgraph.graph.state import CompiledStateGraph
from langchain.messages import BaseMessage, AIMessage
from typing import TypedDict, Annotated
import operator

# Use different model instances for variety
analytical_model = ChatOpenAI(model="gpt-4.1-mini", temperature=0.3)
creative_model = ChatOpenAI(model="gpt-4.1-mini", temperature=0.9)
practical_model = ChatOpenAI(model="gpt-4.1-mini", temperature=0.5)

class State(TypedDict):
    messages: Annotated[list[BaseMessage], operator.add]
    topic: str
    analytical_research: str
    creative_research: str
    practical_research: str

def fan_out_to_researchers(state: State) -> list[Send]:
    return [
        Send("researcher_analytical", state),
        Send("researcher_creative", state),
        Send("researcher_practical", state),
    ]

def dispatcher(state: State) -> dict:
    last_message = state["messages"][-1] if state["messages"] else None
    topic = last_message.content if last_message else ""
    return {"topic": topic}

async def researcher_analytical(state: State) -> dict:
    response = await analytical_model.ainvoke([
        {"role": "system", "content": "You are an analytical research expert."},
        {"role": "user", "content": f"Research: {state['topic']}"},
    ])
    return {
        "analytical_research": response.content,
        "messages": [AIMessage(content=response.content, name="researcher_analytical")],
    }

# Similar nodes for creative and practical researchers...

workflow = StateGraph(State)
workflow.add_node("dispatcher", dispatcher)
workflow.add_node("researcher_analytical", researcher_analytical)
workflow.add_node("researcher_creative", researcher_creative)
workflow.add_node("researcher_practical", researcher_practical)
workflow.add_edge(START, "dispatcher")
workflow.add_conditional_edges("dispatcher", fan_out_to_researchers)
workflow.add_edge("researcher_analytical", END)
workflow.add_edge("researcher_creative", END)
workflow.add_edge("researcher_practical", END)

agent: CompiledStateGraph = workflow.compile()

查看并行研究示例

parallel-research 示例中查看包含三个并行研究者和不同视觉样式的多代理流式传输完整实现。

人机协作

处理代理需要人工审批工具执行的中断。在如何处理中断指南中了解更多。
from langchain import create_agent, tool, human_in_the_loop_middleware
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver

model = ChatOpenAI(model="gpt-4.1-mini")

@tool
def send_email(to: str, subject: str, body: str) -> dict:
    """Send an email. Requires human approval."""
    return {
        "status": "success",
        "content": f'Email sent to {to} with subject "{subject}"',
    }

@tool
def delete_file(path: str) -> dict:
    """Delete a file. Requires human approval."""
    return {"status": "success", "content": f'File "{path}" deleted'}

@tool
def read_file(path: str) -> dict:
    """Read file contents. No approval needed."""
    return {"status": "success", "content": f"Contents of {path}..."}

agent = create_agent(
    model=model,
    tools=[send_email, delete_file, read_file],
    middleware=[
        human_in_the_loop_middleware(
            interrupt_on={
                "send_email": {
                    "allowed_decisions": ["approve", "edit", "reject"],
                    "description": "📧 Review email before sending",
                },
                "delete_file": {
                    "allowed_decisions": ["approve", "reject"],
                    "description": "🗑️ Confirm file deletion",
                },
                "read_file": False,  # Safe - auto-approved
            }
        ),
    ],
    checkpointer=MemorySaver(),
)

查看人机协作示例

human-in-the-loop 示例中查看包含批准、拒绝和编辑操作的审批工作流完整实现。

推理模型

扩展推理/思考支持目前处于实验阶段。推理 token 的流式接口因提供商(OpenAI vs. Anthropic)而异,随着抽象层的完善可能会发生变化。
使用具有扩展推理能力的模型(如 OpenAI 的推理模型或 Anthropic 的扩展思考)时,思考过程会嵌入在消息内容中。你需要将其提取并单独展示。
from langchain import create_agent
from langchain_openai import ChatOpenAI

# Use a reasoning-capable model
# For OpenAI: o1, o1-mini, o1-preview
# For Anthropic: claude-sonnet-4-20250514 with extended thinking enabled
model = ChatOpenAI(model="o1-mini")

agent = create_agent(
    model=model,
    tools=[],  # Reasoning models work best for complex reasoning tasks
)

查看推理示例

reasoning-agent 示例中查看使用 OpenAI 和 Anthropic 模型展示推理 token 的完整实现。

自定义状态类型

对于自定义 LangGraph 应用,可将工具调用类型嵌入状态的 messages 属性中。

自定义传输层

对于自定义 API 端点或非标准部署,可使用带有 FetchStreamTransporttransport 选项连接到任意流式 API。

相关资源