加入与重新加入功能允许您在不停止代理的情况下断开与正在运行的代理流的连接,然后稍后重新连接。当客户端离开时,代理继续在服务器端执行,您可以从离开的位置精确地继续流。
为什么需要加入与重新加入?
传统的流式 API 将客户端和服务器紧密耦合:如果客户端断开连接,流就会丢失。加入与重新加入打破了这种耦合,实现了几个重要的模式:
- 网络中断:移动用户在蜂窝基站或 Wi-Fi 网络之间移动时可以无缝恢复
- 页面导航:用户离开聊天页面后稍后返回,而不会丢失进度
- 移动应用后台运行:被操作系统挂起的应用可以在回到前台时重新加入流
- 长时间运行的任务:代理执行多分钟的操作(研究、代码生成、数据分析),用户无需保持页面打开
- 多设备交接:在手机上开始对话,在桌面上重新加入
核心概念
加入/重新加入模式涉及三个关键机制:
| 方法 / 选项 | 目的 |
|---|
stream.stop() | 断开客户端与流的连接,但不停止代理 |
stream.joinStream(runId) | 通过其运行 ID 重新连接到现有流 |
onDisconnect: "continue" | 提交选项,告诉服务器在客户端断开连接后继续运行 |
streamResumable: true | 提交选项,使流可以在稍后重新加入 |
stream.stop() 与取消运行有根本区别。停止仅断开客户端连接。代理继续在服务器端处理。要实际取消代理的执行,您应使用中断或取消机制。
设置 useStream
关键设置步骤是从 onCreated 回调中捕获 run_id,以便稍后可以重新加入。
定义一个与您的代理状态模式匹配的 TypeScript 接口,并将其作为类型参数传递给 useStream,以便类型安全地访问状态值。在下面的示例中,将 typeof myAgent 替换为您的接口名称:
import type { BaseMessage } from "@langchain/core/messages";
interface AgentState {
messages: BaseMessage[];
}
import { useStream } from "@langchain/react";
import { useState } from "react";
function Chat() {
const [savedRunId, setSavedRunId] = useState<string | null>(null);
const stream = useStream<typeof myAgent>({
apiUrl: "http://localhost:2024",
assistantId: "join_rejoin",
onCreated(run) {
setSavedRunId(run.run_id);
},
});
const isConnected = stream.isLoading;
return (
<div>
<ConnectionStatus connected={isConnected} />
<MessageList messages={stream.messages} />
<ChatControls
stream={stream}
savedRunId={savedRunId}
isConnected={isConnected}
/>
</div>
);
}
使用可恢复选项提交
提交消息时,传递 onDisconnect: "continue" 和 streamResumable: true 以启用加入/重新加入流程:
stream.submit(
{ messages: [{ type: "human", content: text }] },
{
onDisconnect: "continue",
streamResumable: true,
}
);
| 选项 | 默认值 | 描述 |
|---|
onDisconnect | "cancel" | 客户端断开连接时发生的情况。"continue" 保持代理运行;"cancel" 停止它。 |
streamResumable | false | 当为 true 时,服务器保留流状态,以便客户端稍后可以重新加入。 |
始终同时使用这两个选项。设置 onDisconnect: "continue" 而不设置 streamResumable: true 意味着代理保持运行,但您无法重新加入流以查看其输出。
从流断开连接
调用 stream.stop() 以断开客户端连接。代理继续在服务器端处理。
调用 stop() 后:
stream.isLoading 变为 false
- 消息列表保留断开连接点之前收到的所有消息
- 代理继续在服务器上运行
- 在重新加入之前不会收到新消息
重新加入流
使用保存的运行 ID 调用 stream.joinStream(runId) 以重新连接:
stream.joinStream(savedRunId);
重新加入后:
stream.isLoading 再次变为 true
- 断开连接期间生成的任何消息都会被传递
- 新的流式消息实时恢复
- 如果代理已经完成,您会立即收到最终状态
构建连接状态指示器
视觉指示器可帮助用户了解他们是否正在主动接收代理的更新。
function ConnectionStatus({ connected }: { connected: boolean }) {
return (
<div className="connection-status">
<span
className={`status-dot ${connected ? "connected" : "disconnected"}`}
/>
<span className="status-text">
{connected ? "Connected" : "Disconnected"}
</span>
</div>
);
}
使用绿色/红色点设置指示器样式:
.status-dot {
width: 8px;
height: 8px;
border-radius: 50%;
display: inline-block;
margin-right: 6px;
}
.status-dot.connected {
background-color: #22c55e;
box-shadow: 0 0 4px #22c55e;
}
.status-dot.disconnected {
background-color: #ef4444;
box-shadow: 0 0 4px #ef4444;
}
断开和重新加入控件
提供显式的断开和重新加入按钮,以便用户拥有完全控制权:
function ChatControls({ stream, savedRunId, isConnected }) {
const [input, setInput] = useState("");
const handleSend = () => {
if (!input.trim()) return;
stream.submit(
{ messages: [{ type: "human", content: input.trim() }] },
{ onDisconnect: "continue", streamResumable: true }
);
setInput("");
};
return (
<div className="controls">
<div className="input-row">
<input
value={input}
onChange={(e) => setInput(e.target.value)}
placeholder="Type a message..."
onKeyDown={(e) => e.key === "Enter" && handleSend()}
/>
<button onClick={handleSend}>Send</button>
</div>
<div className="stream-controls">
{isConnected ? (
<button onClick={() => stream.stop()} className="disconnect-btn">
Disconnect
</button>
) : (
savedRunId && (
<button
onClick={() => stream.joinStream(savedRunId)}
className="rejoin-btn"
>
Rejoin stream
</button>
)
)}
</div>
</div>
);
}
持久化运行 ID
对于跨会话重新加入(例如,用户关闭浏览器后稍后返回),将运行 ID 持久化到存储中:
const stream = useStream<typeof myAgent>({
apiUrl: "http://localhost:2024",
assistantId: "join_rejoin",
onCreated(run) {
localStorage.setItem("activeRunId", run.run_id);
},
});
// 页面加载时,检查是否有活动运行
const existingRunId = localStorage.getItem("activeRunId");
if (existingRunId) {
stream.joinStream(existingRunId);
}
持久化的运行 ID 应在运行完成时清理。监听流完成并移除存储的 ID,以避免尝试重新加入已完成的运行。
错误处理
如果运行已过期、被删除或服务器已重新启动,重新加入可能会失败。请优雅地处理这些情况:
try {
stream.joinStream(savedRunId);
} catch (error) {
console.error("Failed to rejoin stream:", error);
// 清除过时的运行 ID 并通知用户
setSavedRunId(null);
localStorage.removeItem("activeRunId");
}
完整示例
function JoinRejoinChat() {
const [savedRunId, setSavedRunId] = useState<string | null>(null);
const [input, setInput] = useState("");
const stream = useStream<typeof myAgent>({
apiUrl: "http://localhost:2024",
assistantId: "join_rejoin",
onCreated(run) {
setSavedRunId(run.run_id);
},
});
const isConnected = stream.isLoading;
const handleSend = () => {
if (!input.trim()) return;
stream.submit(
{ messages: [{ type: "human", content: input.trim() }] },
{ onDisconnect: "continue", streamResumable: true }
);
setInput("");
};
return (
<div className="chat-container">
<header>
<h2>Join & Rejoin Demo</h2>
<ConnectionStatus connected={isConnected} />
</header>
<div className="messages">
{stream.messages.map((msg, i) => (
<MessageBubble key={i} message={msg} />
))}
</div>
<div className="controls">
<form onSubmit={(e) => { e.preventDefault(); handleSend(); }}>
<input
value={input}
onChange={(e) => setInput(e.target.value)}
placeholder="Type a message..."
/>
<button type="submit">Send</button>
</form>
<div className="stream-actions">
{isConnected ? (
<button onClick={() => stream.stop()}>
Disconnect
</button>
) : (
savedRunId && (
<button onClick={() => stream.joinStream(savedRunId)}>
Rejoin stream
</button>
)
)}
</div>
</div>
</div>
);
}
最佳实践
- 始终保存运行 ID:没有它,无法重新加入。同时使用组件状态和持久化存储以提高弹性。
- 显示清晰的连接状态:用户应始终知道他们是在接收实时更新还是查看快照。
- 在可见性变化时自动重新加入:使用 Page Visibility API 在用户返回标签页时自动重新加入。
- 设置合理的超时:如果重新加入尝试耗时过长,则回退到获取线程历史记录。
- 清理已完成的运行:代理完成时移除持久化的运行 ID,以避免过时的重新加入尝试。