Skip to main content
Agent2Agent (A2A) 是 Google 用于实现对话式 AI 代理之间通信的协议。LangSmith 实现了 A2A 支持,允许您的代理通过标准化协议与其他兼容 A2A 的代理进行通信。 A2A 端点在 Agent Server 中可用,路径为 /a2a/{assistant_id}

支持的方法

Agent Server 支持以下 A2A RPC 方法:
  • message/send:向助手发送消息并接收完整响应
  • message/stream:发送消息并使用服务器发送事件 (SSE) 实时流式传输响应
  • tasks/get:检索先前创建的任务的状态和结果

代理卡发现

每个助手会自动公开一个 A2A 代理卡,描述其功能并提供其他代理连接所需的信息。您可以使用以下方式检索任何助手的代理卡:
GET /.well-known/agent-card.json?assistant_id={assistant_id}
代理卡包括助手的名称、描述、可用技能、支持的输入/输出模式以及用于通信的 A2A 端点 URL。

要求

要使用 A2A,请确保已安装以下依赖项:
  • langgraph-api >= 0.4.21
使用以下命令安装:
pip install "langgraph-api>=0.4.21"

使用概述

要启用 A2A:
  • 升级以使用 langgraph-api>=0.4.21。
  • 使用基于消息的状态结构部署您的代理。
  • 使用端点与其他兼容 A2A 的代理连接。

创建兼容 A2A 的代理

此示例创建一个兼容 A2A 的代理,该代理使用 OpenAI 的 API 处理传入消息并维护对话状态。代理定义了一个基于消息的状态结构,并处理 A2A 协议的消息格式。 要兼容 A2A “text” 部分,代理的状态中必须有一个 messages 键。 A2A 协议使用两个标识符来维持对话连续性:
  • contextId:将消息分组到一个对话线程中(类似于会话 ID)
  • taskId:标识该对话中的每个单独请求
在第一条消息中,省略 contextIdtaskId - 代理将生成并返回它们。对于对话中的所有后续消息,请包含先前响应中的 contextIdtaskId 以维持线程连续性。 LangSmith 追踪: LangSmith 部署的 A2A 端点会自动将 A2A contextId 转换为 LangSmith 追踪的 thread_id,将对话中的所有消息分组到一个线程下。 例如:
"""LangGraph A2A 对话代理。

支持 A2A 协议,使用消息输入进行对话交互。
"""

from __future__ import annotations

import os
from dataclasses import dataclass
from typing import Any, Dict, List, TypedDict

from langgraph.graph import StateGraph
from langgraph.runtime import Runtime
from openai import AsyncOpenAI


class Context(TypedDict):
    """代理的上下文参数。"""
    my_configurable_param: str


@dataclass
class State:
    """代理的输入状态。

    定义 A2A 对话消息的初始结构。
    """
    messages: List[Dict[str, Any]]


async def call_model(state: State, runtime: Runtime[Context]) -> Dict[str, Any]:
    """处理对话消息并使用 OpenAI 返回输出。"""
    # 初始化 OpenAI 客户端
    client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    # 处理传入的消息
    latest_message = state.messages[-1] if state.messages else {}
    user_content = latest_message.get("content", "No message content")

    # 为 OpenAI API 创建消息
    openai_messages = [
        {
            "role": "system",
            "content": "You are a helpful conversational agent. Keep responses brief and engaging."
        },
        {
            "role": "user",
            "content": user_content
        }
    ]

    try:
        # 调用 OpenAI API
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=openai_messages,
            max_tokens=100,
            temperature=0.7
        )

        ai_response = response.choices[0].message.content

    except Exception as e:
        ai_response = f"I received your message but had trouble processing it. Error: {str(e)[:50]}..."

    # 创建响应消息
    response_message = {
        "role": "assistant",
        "content": ai_response
    }

    return {
        "messages": state.messages + [response_message]
    }


# 定义图
graph = (
    StateGraph(State, context_schema=Context)
    .add_node(call_model)
    .add_edge("__start__", "call_model")
    .compile()
)

代理间通信

一旦您的代理通过 langgraph dev 在本地运行或部署到生产环境,您就可以使用 A2A 协议促进它们之间的通信。 此示例演示了两个代理如何通过向彼此的 A2A 端点发送 JSON-RPC 消息进行通信。该脚本模拟了一个多轮对话,其中每个代理处理对方的响应并继续对话。
#!/usr/bin/env python3
"""使用 LangGraph A2A 端点进行代理间对话模拟。"""

import asyncio
import aiohttp
import os
import uuid


def extract_text(result: dict) -> str:
    """从 A2A 结果中尽力提取响应文本。"""
    for art in result.get("result", {}).get("artifacts", []) or []:
        for part in art.get("parts", []) or []:
            if part.get("kind") == "text" and part.get("text"):
                return part["text"]

    msg = (result.get("result", {}).get("status", {}) or {}).get("message", {}) or {}
    for part in msg.get("parts", []) or []:
        if part.get("kind") == "text" and part.get("text"):
            return part["text"]

    return "(no text found)"


async def send_message(session, port, assistant_id, text, context_id=None, task_id=None):
    """发送 A2A 消息。返回 (response_text, returned_context_id, returned_task_id)。"""
    url = f"http://127.0.0.1:{port}/a2a/{assistant_id}"

    message = {
        "role": "user",
        "parts": [{"kind": "text", "text": text}],
        "messageId": str(uuid.uuid4()),
    }

    # A2A 多轮连续性:在轮次/代理间重用 contextId 和 taskId
    if context_id:
        message["contextId"] = context_id
    if task_id:
        message["taskId"] = task_id

    payload = {
        "jsonrpc": "2.0",
        "id": str(uuid.uuid4()),
        "method": "message/send",
        "params": {"message": message},
    }

    headers = {"Accept": "application/json"}
    async with session.post(url, json=payload, headers=headers) as response:
        result = await response.json()

    returned_context_id = result.get("result", {}).get("contextId") or context_id
    returned_task_id = result.get("result", {}).get("id")
    return extract_text(result), returned_context_id, returned_task_id


async def simulate_conversation():
    """模拟两个代理之间的对话。"""

    # 助手 ID
    agent_a_id = os.getenv("AGENT_A_ID")
    agent_b_id = os.getenv("AGENT_B_ID")

    if not agent_a_id or not agent_b_id:
        print("Set AGENT_A_ID and AGENT_B_ID environment variables")
        return

    message = "Hello! Let's have a conversation."
    context_id = None
    task_id = None

    async with aiohttp.ClientSession() as session:
        for i in range(3):
            print(f"--- Round {i + 1} ---")

            message, context_id, task_id = await send_message(
                session, 2024, agent_a_id, message,
                context_id=context_id,
                task_id=task_id,
            )
            print(f"🔵 Agent A: {message}")

            message, context_id, task_id = await send_message(
                session, 2025, agent_b_id, message,
                context_id=context_id,
                task_id=task_id,
            )
            print(f"🔴 Agent B: {message}\n")


if __name__ == "__main__":
    asyncio.run(simulate_conversation())
有关完整的工作示例,请参阅:

分布式追踪

当多个代理通过 A2A 通信时,LangSmith 可以将它们的所有追踪分组到一个线程中,为您提供整个多代理对话的统一视图。

contextId 如何映射到 thread_id

Agent Server A2A 端点会自动将 A2A contextId 转换为 LangSmith 追踪的 thread_id。这意味着对话中的每条消息,无论来自哪个参与代理,都会在 LangSmith 中分组到同一线程下,无需您进行任何额外配置。 流程如下:
  1. 在第一条消息中,客户端省略 contextId。服务器生成一个并在响应中返回。
  2. 客户端在所有后续消息中传递 contextId 以维持对话连续性。
  3. Agent Server 将 contextId 映射到 LangSmith 元数据中的 thread_id,因此所有轮次都出现在同一线程中。

跨多个代理的追踪

当来自不同框架的代理通过 A2A 通信时,您可以通过在所有代理间共享相同的 thread_id 来在 LangSmith 中统一它们的追踪。使用第一个代理返回的 contextId 作为所有后续请求的 thread_id 以下代码片段演示了关键概念。有关两个代理的完整可运行实现,请参阅 Google ADK + LangChain 示例
import asyncio
import aiohttp
import uuid


async def send_message(session, url, text, context_id=None, task_id=None, thread_id=None):
    """发送 A2A 消息并返回 (response_text, context_id, task_id)。"""

    # --- 1. 构建消息 ---
    # 在后续轮次中,在消息对象内包含 contextId 和 taskId,
    # 以便服务器将它们与正在进行的对话关联起来。
    message = {
        "role": "user",
        "parts": [{"kind": "text", "text": text}],
        "messageId": str(uuid.uuid4()),
    }
    if context_id:
        message["contextId"] = context_id
    if task_id:
        message["taskId"] = task_id

    # --- 2. 在元数据中设置 thread_id ---
    # thread_id 位于 JSON-RPC 载荷的顶层,而不是在 params 内部。
    payload = {
        "jsonrpc": "2.0",
        "id": str(uuid.uuid4()),
        "method": "message/send",
        "params": {"message": message},
        "metadata": {"thread_id": thread_id},
    }

    async with session.post(url, json=payload, headers={"Accept": "application/json"}) as response:
        if response.status != 200:
            raise RuntimeError(f"HTTP {response.status}: {await response.text()}")
        result = await response.json()

    if "error" in result:
        raise RuntimeError(result["error"].get("message", "Unknown error"))

    result_obj = result.get("result", {})
    returned_context_id = result_obj.get("contextId") or context_id
    returned_task_id = result_obj.get("id")
    text_out = next(
        (
            part.get("text", "")
            for art in result_obj.get("artifacts", []) or []
            for part in art.get("parts", []) or []
            if part.get("kind") == "text"
        ),
        "(no text)",
    )
    return text_out, returned_context_id, returned_task_id


async def run_conversation(agent_a_url, agent_b_url):
    # --- 3. 在代理间共享 thread_id ---
    # 预先生成一个共享的 thread_id。一旦服务器返回 contextId,
    # 就使用它——这使 A2A 上下文和 LangSmith 线程保持同步。
    thread_id = str(uuid.uuid4())
    context_id = None
    task_id = None
    message = "Hello! Let's collaborate."

    async with aiohttp.ClientSession() as session:
        for _ in range(3):
            message, context_id, task_id = await send_message(
                session, agent_a_url, message,
                context_id=context_id, task_id=task_id,
                thread_id=context_id or thread_id,
            )

            # 将相同的 thread_id 传递给每个代理,将所有追踪分组到 LangSmith 中
            message, context_id, task_id = await send_message(
                session, agent_b_url, message,
                context_id=context_id, task_id=task_id,
                thread_id=context_id or thread_id,
            )


asyncio.run(run_conversation(
    "http://localhost:2024/a2a/<agent_a_assistant_id>",
    "http://localhost:2025/a2a/<agent_b_assistant_id>",
))
1. 构建消息:在后续轮次中,在 message 对象内包含 contextIdtaskId,以便服务器可以将它们与正在进行的对话关联起来。在第一条消息中省略它们,因为服务器会生成一个 contextId 并在响应中返回。 2. 在元数据中设置 thread_id:在 JSON-RPC 载荷的顶层 metadata 字段中传递 thread_id,而不是在 params 内部。 3. 在代理间共享 thread_id:在第一条消息之前生成一个随机的 thread_id。一旦服务器返回 contextId,就将其用作所有后续请求的 thread_id,这使 A2A 对话上下文和 LangSmith 线程保持同步。将相同的 thread_id 传递给每个代理,以便所有追踪都分组到一个线程中。

在非 LangGraph 代理中接收 thread_id

上一节涵盖了客户端——发送消息时传播 thread_id。如果您的某个代理不是基于 LangGraph 构建的,它还需要在接收端提取并附加 thread_id,以便其追踪落入同一个 LangSmith 线程。使用 langsmith.integrations.otel.configure() 设置自动追踪,并从传入的 A2A 请求元数据中提取 thread_id 以将追踪分组到同一线程中。
from fastapi import FastAPI, Request
from langsmith.integrations.otel import configure as configure_otel
from opentelemetry import trace
import json

# --- 1. 配置 OTel ---
# 为您的非 LangGraph 代理设置自动追踪到 LangSmith。
configure_otel(project_name="my-a2a-project")
tracer = trace.get_tracer(__name__)

app = FastAPI()

@app.middleware("http")
async def set_thread_id_middleware(request: Request, call_next):
    thread_id = None
    if request.method == "POST":
        body_bytes = await request.body()
        if body_bytes:
            # --- 2. 从传入的 A2A 元数据中提取 thread_id ---
            try:
                body = json.loads(body_bytes)
                thread_id = body.get("metadata", {}).get("thread_id")
            except Exception:
                pass
            # 重新注入请求体,以便下游处理程序仍然可以读取它
            async def receive():
                return {"type": "http.request", "body": body_bytes}
            request._receive = receive

    # --- 3. 将 thread_id 附加到追踪 ---
    # langsmith.metadata.thread_id 将此追踪与同一线程中的其他追踪分组。
    with tracer.start_as_current_span("agent") as span:
        if thread_id:
            span.set_attribute("langsmith.metadata.thread_id", thread_id)
        return await call_next(request)
在此中间件之后,在 app 上注册您的代理路由。
在您的环境中设置 LANGSMITH_API_KEY 和可选的 LANGSMITH_PROJECT 以启用追踪。对话中的所有代理应使用相同的项目,以便它们的追踪可以一起查看。

在 LangSmith 中查看追踪

运行多代理对话后,打开 LangSmith UI 并导航到 Threads。所有参与代理的所有轮次将出现在一个线程下,由共享的 thread_id 标识。

禁用 A2A

要禁用 A2A 端点,请在您的 langgraph.json 配置文件中将 disable_a2a 设置为 true
{
  "$schema": "https://langgra.ph/schema.json",
  "http": {
    "disable_a2a": true
  }
}