Skip to main content

概述

聊天界面一直主导着我们与人工智能的交互方式,但多模态人工智能的最新突破正在开启令人兴奋的新可能性。高质量的生成模型和富有表现力的文本转语音(TTS)系统现在使得构建感觉更像对话伙伴而非工具的代理成为可能。 语音代理就是这样一个例子。您无需依赖键盘和鼠标向代理输入,而是可以使用口语与之交互。这可以是一种更自然、更吸引人的人工智能交互方式,对于某些特定场景尤其有用。

什么是语音代理?

语音代理是能够与用户进行自然口语对话的代理。这些代理结合了语音识别、自然语言处理、生成式人工智能和文本转语音技术,以创建无缝、自然的对话。 它们适用于多种用例,包括:
  • 客户支持
  • 个人助理
  • 免提界面
  • 辅导和培训

语音代理如何工作?

从高层次来看,每个语音代理都需要处理三个任务:
  1. 倾听 - 捕获音频并将其转录
  2. 思考 - 解释意图、推理、规划
  3. 说话 - 生成音频并将其流式传输回用户
差异在于这些步骤的排序和耦合方式。在实践中,生产代理遵循两种主要架构之一:

1. STT > 代理 > TTS 架构(“三明治”)

三明治架构由三个不同的组件组成:语音转文本(STT)、基于文本的 LangChain 代理和文本转语音(TTS)。 优点:
  • 对每个组件的完全控制(可根据需要更换 STT/TTS 提供商)
  • 访问现代文本模态模型的最新功能
  • 行为透明,组件之间界限清晰
缺点:
  • 需要编排多个服务
  • 管理管道的额外复杂性
  • 从语音到文本的转换会丢失信息(例如,语调、情感)

2. 语音到语音架构(S2S)

语音到语音使用多模态模型,该模型原生处理音频输入并生成音频输出。 优点:
  • 架构更简单,活动部件更少
  • 对于简单交互,通常延迟更低
  • 直接音频处理能捕捉语调和其他语音细微差别
缺点:
  • 模型选项有限,提供商锁定风险更大
  • 功能可能落后于文本模态模型
  • 音频处理方式透明度较低
  • 可控性和自定义选项减少
本指南演示了三明治架构,以平衡性能、可控性和对现代模型功能的访问。使用某些 STT 和 TTS 提供商,三明治架构可以实现低于 700 毫秒的延迟,同时保持对模块化组件的控制。

演示应用程序概述

我们将逐步构建一个基于语音的代理,使用三明治架构。该代理将管理三明治店的订单。该应用程序将演示三明治架构的所有三个组件,使用 AssemblyAI 进行 STT,使用 Cartesia 进行 TTS(尽管可以为大多数提供商构建适配器)。 一个端到端的参考应用程序可在 voice-sandwich-demo 仓库中找到。我们将在此处逐步介绍该应用程序。 演示使用 WebSockets 在浏览器和服务器之间进行实时双向通信。相同的架构可以适应其他传输方式,如电话系统(Twilio、Vonage)或 WebRTC 连接。

架构

演示实现了一个流式管道,其中每个阶段异步处理数据: 客户端(浏览器)
  • 捕获麦克风音频并将其编码为 PCM
  • 建立与后端服务器的 WebSocket 连接
  • 实时将音频块流式传输到服务器
  • 接收并播放合成的语音音频
服务器(Node.js)
  • 接受来自客户端的 WebSocket 连接
  • 编排三步管道:
    • 语音转文本 (STT):将音频转发给 STT 提供商(例如 AssemblyAI),接收转录事件
    • 代理:使用 LangChain 代理处理转录,流式传输响应令牌
    • 文本转语音 (TTS):将代理响应发送给 TTS 提供商(例如 Cartesia),接收音频块
  • 将合成的音频返回给客户端进行播放
该管道使用异步迭代器在每个阶段启用流式传输。这允许下游组件在上游阶段完成之前开始处理,从而最大限度地减少端到端延迟。

设置

有关详细的安装说明和设置,请参阅仓库 README

1. 语音转文本

STT 阶段将传入的音频流转换为文本转录。实现使用生产者-消费者模式来并发处理音频流和转录接收。

关键概念

生产者-消费者模式:音频块与接收转录事件同时发送到 STT 服务。这允许在所有音频到达之前开始转录。 事件类型
  • stt_chunk:STT 服务处理音频时提供的部分转录
  • stt_output:触发代理处理的最终、格式化转录
WebSocket 连接:维护与 AssemblyAI 实时 STT API 的持久连接,配置为 16kHz PCM 音频和自动轮换格式。

实现

import { AssemblyAISTT } from "./assemblyai";
import type { VoiceAgentEvent } from "./types";

async function* sttStream(
  audioStream: AsyncIterable<Uint8Array>
): AsyncGenerator<VoiceAgentEvent> {
  const stt = new AssemblyAISTT({ sampleRate: 16000 });
  const passthrough = writableIterator<VoiceAgentEvent>();

  // 生产者:将音频块泵送到 AssemblyAI
  const producer = (async () => {
    try {
      for await (const audioChunk of audioStream) {
        await stt.sendAudio(audioChunk);
      }
    } finally {
      await stt.close();
    }
  })();

  // 消费者:接收转录事件
  const consumer = (async () => {
    for await (const event of stt.receiveEvents()) {
      passthrough.push(event);
    }
  })();

  try {
    // 在事件到达时产生事件
    yield* passthrough;
  } finally {
    // 等待生产者和消费者完成
    await Promise.all([producer, consumer]);
  }
}
该应用程序实现了一个 AssemblyAI 客户端来管理 WebSocket 连接和消息解析。请参阅下面的实现;可以为其他 STT 提供商构建类似的适配器。
export class AssemblyAISTT {
  protected _bufferIterator = writableIterator<VoiceAgentEvent.STTEvent>();
  protected _connectionPromise: Promise<WebSocket> | null = null;

  async sendAudio(buffer: Uint8Array): Promise<void> {
    const conn = await this._connection;
    conn.send(buffer);
  }

  async *receiveEvents(): AsyncGenerator<VoiceAgentEvent.STTEvent> {
    yield* this._bufferIterator;
  }

  protected get _connection(): Promise<WebSocket> {
    if (this._connectionPromise) return this._connectionPromise;

    this._connectionPromise = new Promise((resolve, reject) => {
      const params = new URLSearchParams({
        sample_rate: this.sampleRate.toString(),
        format_turns: "true",
      });
      const url = `wss://streaming.assemblyai.com/v3/ws?${params}`;
      const ws = new WebSocket(url, {
        headers: { Authorization: this.apiKey },
      });

      ws.on("open", () => resolve(ws));

      ws.on("message", (data) => {
        const message = JSON.parse(data.toString());
        if (message.type === "Turn") {
          if (message.turn_is_formatted) {
            this._bufferIterator.push({
              type: "stt_output",
              transcript: message.transcript,
              ts: Date.now()
            });
          } else {
            this._bufferIterator.push({
              type: "stt_chunk",
              transcript: message.transcript,
              ts: Date.now()
            });
          }
        }
      });
    });

    return this._connectionPromise;
  }
}

2. LangChain 代理

代理阶段通过 LangChain 代理 处理文本转录,并流式传输响应令牌。在这种情况下,我们流式传输代理生成的所有文本内容块

关键概念

流式响应:代理使用 stream_mode="messages" 在生成时发出响应令牌,而不是等待完整响应。这使得 TTS 阶段可以立即开始合成。 对话记忆检查点 使用唯一的线程 ID 维护跨轮次的对话状态。这允许代理在对话中引用先前的交流。

实现

import { createAgent } from "langchain";
import { HumanMessage } from "@langchain/core/messages";
import { MemorySaver } from "@langchain/langgraph";
import { tool } from "@langchain/core/tools";
import { z } from "zod";
import { v7 as uuid7 } from "uuid";

// 定义代理工具
const addToOrder = tool(
  async ({ item, quantity }) => {
    return `Added ${quantity} x ${item} to the order.`;
  },
  {
    name: "add_to_order",
    description: "Add an item to the customer's sandwich order.",
    schema: z.object({
      item: z.string(),
      quantity: z.number(),
    }),
  }
);

const confirmOrder = tool(
  async ({ orderSummary }) => {
    return `Order confirmed: ${orderSummary}. Sending to kitchen.`;
  },
  {
    name: "confirm_order",
    description: "Confirm the final order with the customer.",
    schema: z.object({
      orderSummary: z.string().describe("Summary of the order"),
    }),
  }
);

// 创建带有工具和记忆的代理
const agent = createAgent({
  model: "claude-haiku-4-5",
  tools: [addToOrder, confirmOrder],
  checkpointer: new MemorySaver(),
  systemPrompt: `You are a helpful sandwich shop assistant.
Your goal is to take the user's order. Be concise and friendly.
Do NOT use emojis, special characters, or markdown.
Your responses will be read by a text-to-speech engine.`,
});

async function* agentStream(
  eventStream: AsyncIterable<VoiceAgentEvent>
): AsyncGenerator<VoiceAgentEvent> {
  // 为对话记忆生成唯一的线程 ID
  const threadId = uuidv4();

  for await (const event of eventStream) {
    // 传递所有上游事件
    yield event;

    // 通过代理处理最终转录
    if (event.type === "stt_output") {
      const stream = await agent.stream(
        { messages: [new HumanMessage(event.transcript)] },
        {
          configurable: { thread_id: threadId },
          streamMode: "messages",
        }
      );

      // 在代理响应块到达时产生它们
      for await (const [message] of stream) {
        yield { type: "agent_chunk", text: message.text, ts: Date.now() };
      }
    }
  }
}

3. 文本转语音

TTS 阶段将代理响应文本合成为音频,并将其流式传输回客户端。与 STT 阶段类似,它使用生产者-消费者模式来处理并发文本发送和音频接收。

关键概念

并发处理:实现合并了两个异步流:
  • 上游处理:传递所有事件并将代理文本块发送给 TTS 提供商
  • 音频接收:从 TTS 提供商接收合成的音频块
流式 TTS:一些提供商(如 Cartesia)在收到文本后立即开始合成音频,从而在代理完成生成完整响应之前开始音频播放。 事件传递:所有上游事件保持不变地流过,允许客户端或其他观察者跟踪完整的管道状态。

实现

import { CartesiaTTS } from "./cartesia";

async function* ttsStream(
  eventStream: AsyncIterable<VoiceAgentEvent>
): AsyncGenerator<VoiceAgentEvent> {
  const tts = new CartesiaTTS();
  const passthrough = writableIterator<VoiceAgentEvent>();

  // 生产者:读取上游事件并将文本发送给 Cartesia
  const producer = (async () => {
    try {
      for await (const event of eventStream) {
        passthrough.push(event);
        if (event.type === "agent_chunk") {
          await tts.sendText(event.text);
        }
      }
    } finally {
      await tts.close();
    }
  })();

  // 消费者:从 Cartesia 接收音频
  const consumer = (async () => {
    for await (const event of tts.receiveEvents()) {
      passthrough.push(event);
    }
  })();

  try {
    // 从生产者和消费者产生事件
    yield* passthrough;
  } finally {
    await Promise.all([producer, consumer]);
  }
}
该应用程序实现了一个 Cartesia 客户端来管理 WebSocket 连接和音频流式传输。请参阅下面的实现;可以为其他 TTS 提供商构建类似的适配器。
export class CartesiaTTS {
  protected _bufferIterator = writableIterator<VoiceAgentEvent.TTSEvent>();
  protected _connectionPromise: Promise<WebSocket> | null = null;

  async sendText(text: string | null): Promise<void> {
    if (!text || !text.trim()) return;

    const conn = await this._connection;
    const payload = { text, try_trigger_generation: false };
    conn.send(JSON.stringify(payload));
  }

  async *receiveEvents(): AsyncGenerator<VoiceAgentEvent.TTSEvent> {
    yield* this._bufferIterator;
  }

  protected _generateContextId(): string {
    const timestamp = Date.now();
    const counter = this._contextCounter++;
    return `ctx_${timestamp}_${counter}`;
  }

  protected get _connection(): Promise<WebSocket> {
    if (this._connectionPromise) return this._connectionPromise;

    this._connectionPromise = new Promise((resolve, reject) => {
      const params = new URLSearchParams({
        api_key: this.apiKey,
        cartesia_version: this.cartesiaVersion,
      });
      const url = `wss://api.cartesia.ai/tts/websocket?${params.toString()}`;
      const ws = new WebSocket(url);

      ws.on("open", () => {
        resolve(ws);
      });

      ws.on("message", (data: WebSocket.RawData) => {
        const message: CartesiaTTSResponse = JSON.parse(data.toString());
        if (message.data) {
          this._bufferIterator.push({
            type: "tts_chunk",
            audio: message.data,
            ts: Date.now(),
          });
        } else if (message.error) {
          throw new Error(`Cartesia error: ${message.error}`);
        }
      });
    });

    return this._connectionPromise;
  }
}

LangSmith

您使用 LangChain 构建的许多应用程序将包含多个步骤和多次 LLM 调用。随着这些应用程序变得越来越复杂,能够检查链或代理内部究竟发生了什么变得至关重要。最好的方法是使用 LangSmith 在上面的链接注册后,确保设置环境变量以开始记录跟踪:
export LANGSMITH_TRACING="true"
export LANGSMITH_API_KEY="..."

整合所有部分

完整的管道将三个阶段链接在一起:
// 使用 https://hono.dev/
app.get("/ws", upgradeWebSocket(async () => {
  const inputStream = writableIterator<Uint8Array>();

  // 链接三个阶段
  const transcriptEventStream = sttStream(inputStream);
  const agentEventStream = agentStream(transcriptEventStream);
  const outputEventStream = ttsStream(agentEventStream);

  // 处理管道并将 TTS 音频发送给客户端
  const flushPromise = (async () => {
    for await (const event of outputEventStream) {
      if (event.type === "tts_chunk") {
        currentSocket?.send(event.audio);
      }
    }
  })();

  return {
    onMessage(event) {
      // 将传入音频推送到管道中
      const data = event.data;
      if (Buffer.isBuffer(data)) {
        inputStream.push(new Uint8Array(data));
      }
    },
    async onClose() {
      inputStream.cancel();
      await flushPromise;
    },
  };
}));
每个阶段独立且并发地处理事件:音频转录在音频到达时开始,代理在转录可用时开始推理,语音合成在代理文本生成时开始。这种架构可以实现低于 700 毫秒的延迟,以支持自然对话。 有关使用 LangChain 构建代理的更多信息,请参阅代理指南