Skip to main content
本指南涵盖两种使用 REST API 进行追踪的方法:使用 POST /runsPATCH /runs 端点进行基本追踪,以及使用 POST /runs/multipart 进行批量摄取以获得更高吞吐量。 有关端点和请求/响应模式的完整列表,请参阅 API 参考文档
我们强烈建议使用 PythonTypeScript SDK 将追踪数据发送到 LangSmith,而不是直接使用 REST API。SDK 包含批处理和后台发送优化,可防止追踪影响应用程序的性能。如果无法使用 SDK,请注意同步发送追踪数据可能会影响应用程序性能。
我们建议为运行 ID 使用 UUID v7。UUIDv7 嵌入了时间戳,这可以保留追踪中运行的正确时间顺序。使用 LangSmith SDK 中的 uuid7() 生成它们,或参阅指定自定义运行 ID了解更多详情。

基本追踪

记录运行的最简单方法是通过 POST /runsPATCH /runs 端点。此方法需要最少的信息来建立追踪层次结构。
使用 LangSmith REST API 时,请在请求头中提供您的 API 密钥,格式为 "x-api-key"如果您的 API 密钥链接到多个工作区,请在请求头中使用 "x-tenant-id" 指定工作区。在此方法中,您无需设置 dotted_ordertrace_id 字段——系统会自动生成它们。虽然更简单,但比批量摄取更慢,且受更低的速率限制。
以下示例追踪一个聊天补全,包含一个父链运行和一个子 LLM 运行。在子运行上设置 parent_run_id 以将其附加到其父运行:
import openai
import os
import requests
from datetime import datetime, timezone
from langsmith import uuid7

# 在请求头中发送您的 API 密钥
headers = {
    "x-api-key": os.environ["LANGSMITH_API_KEY"],
    "x-tenant-id": os.environ["LANGSMITH_WORKSPACE_ID"]
}

def post_run(run_id, name, run_type, inputs, parent_id=None):
    """向 API 发送新运行的函数。"""
    data = {
        "id": run_id.hex,
        "name": name,
        "run_type": run_type,
        "inputs": inputs,
        "start_time": datetime.utcnow().isoformat(),
        # "session_name": "project-name",  # 要追踪到的项目名称
        # "session_id": "project-id",  # 要追踪到的项目 ID。指定 session_name 或 session_id 之一
    }
    if parent_id:
        data["parent_run_id"] = parent_id.hex

    requests.post(
        "https://api.smith.langchain.com/runs",  # 对于自托管、欧盟 (GCP) (`eu.api...`) 或美国 (AWS) (`aws.api...`) 请更新
        json=data,
        headers=headers
    )

def patch_run(run_id, outputs):
    """使用输出更新运行的函数。"""
    requests.patch(
        f"https://api.smith.langchain.com/runs/{run_id}",
        json={
            "outputs": outputs,
            "end_time": datetime.now(timezone.utc).isoformat(),
        },
        headers=headers,
    )

# 这可以是您应用程序的用户输入
question = "Can you summarize this morning's meetings?"

# 这可以在检索步骤中检索到
context = "During this morning's meeting, we solved all world conflict."

messages = [
    {"role": "system", "content": "You are a helpful assistant. Please respond to the user's request only based on the given context."},
    {"role": "user", "content": f"Question: {question}\nContext: {context}"}
]

# 创建父运行
parent_run_id = uuid7()
post_run(parent_run_id, "Chat Pipeline", "chain", {"question": question})

# 创建子运行
child_run_id = uuid7()
post_run(child_run_id, "OpenAI Call", "llm", {"messages": messages}, parent_run_id)

# 生成补全
client = openai.Client()
chat_completion = client.chat.completions.create(
    model="gpt-5.4-mini",
    messages=messages
)

# 结束运行
patch_run(child_run_id, chat_completion.dict())
patch_run(parent_run_id, {"answer": chat_completion.choices[0].message.content})
更多信息,请参阅运行(跨度)数据格式

批量摄取

为了更快的摄取和更高的速率限制,请使用 POST /runs/multipart 端点。这需要 requests-toolbeltuuid-utils 包。 与基本追踪不同,此端点要求您自行计算并设置 dotted_ordertrace_iddotted_order 编码每个运行的时间戳和 UUID,父条目和子条目用点连接(例如,20240101T000000Z<parent-uuid>.20240101T000001Z<child-uuid>),告诉 LangSmith 运行之间的关系以及它们发生的顺序。trace_id 是根运行的 UUID。 以下示例创建一个父运行和一个子运行,在单个批量请求中发送它们,然后使用它们的输出更新两者:
import json
import os
import uuid
from datetime import datetime, timezone
from typing import Dict, List
import requests
from requests_toolbelt import MultipartEncoder
from uuid_utils.compat import uuid7

def create_dotted_order(
    start_time: datetime | None = None,
    run_id: uuid.UUID | None = None
) -> str:
    """为运行排序和层次结构创建点序字符串。

    点序用于建立运行之间的顺序和关系。
    它将时间戳与唯一标识符结合,以确保正确的排序和追踪。
    """
    st = start_time or datetime.now(timezone.utc)
    id_ = run_id or uuid7()
    return f"{st.strftime('%Y%m%dT%H%M%S%fZ')}{id_}"

def create_run_base(
    name: str,
    run_type: str,
    inputs: dict,
    start_time: datetime
) -> dict:
    """创建运行的基础结构。"""
    run_id = uuid7()
    return {
        "id": str(run_id),
        "trace_id": str(run_id),
        "name": name,
        "start_time": start_time.isoformat(),
        "inputs": inputs,
        "run_type": run_type,
    }

def construct_run(
    name: str,
    run_type: str,
    inputs: dict,
    parent_dotted_order: str | None = None,
) -> dict:
    """使用给定参数构建运行字典。

    此函数创建一个具有唯一 ID 和点序的运行,如果它是子运行,则建立其在追踪层次结构中的位置。
    """
    start_time = datetime.now(timezone.utc)
    run = create_run_base(name, run_type, inputs, start_time)
    current_dotted_order = create_dotted_order(start_time, uuid.UUID(run["id"]))

    if parent_dotted_order:
        current_dotted_order = f"{parent_dotted_order}.{current_dotted_order}"
        run["trace_id"] = parent_dotted_order.split(".")[0].split("Z")[1]
        run["parent_run_id"] = parent_dotted_order.split(".")[-1].split("Z")[1]

    run["dotted_order"] = current_dotted_order
    return run

def serialize_run(operation: str, run_data: dict) -> List[tuple]:
    """为多部分请求序列化运行。

    此函数将运行数据分离为多个部分,以便高效传输和存储。
    主要运行数据和可选字段(输入、输出、事件)分别序列化。
    """
    run_id = run_data.get("id", str(uuid7()))

    # 分离可选字段
    inputs = run_data.pop("inputs", None)
    outputs = run_data.pop("outputs", None)
    events = run_data.pop("events", None)

    parts = []

    # 序列化主要运行数据
    run_data_json = json.dumps(run_data).encode("utf-8")
    parts.append(
        (
            f"{operation}.{run_id}",
            (
                None,
                run_data_json,
                "application/json",
                {"Content-Length": str(len(run_data_json))},
            ),
        )
    )

    # 序列化可选字段
    for key, value in [("inputs", inputs), ("outputs", outputs), ("events", events)]:
        if value:
            serialized_value = json.dumps(value).encode("utf-8")
            parts.append(
                (
                    f"{operation}.{run_id}.{key}",
                    (
                        None,
                        serialized_value,
                        "application/json",
                        {"Content-Length": str(len(serialized_value))},
                    ),
                )
            )

    return parts

def batch_ingest_runs(
    api_url: str,
    api_key: str,
    posts: list[dict] | None = None,
    patches: list[dict] | None = None,
) -> None:
    """在单个批量请求中摄取多个运行。

    此函数处理创建新运行(posts)和更新现有运行(patches)。
    与单独的 API 调用相比,它对于摄取多个运行更有效率。
    """
    boundary = uuid.uuid4().hex
    all_parts = []

    for operation, runs in zip(("post", "patch"), (posts, patches)):
        if runs:
            all_parts.extend(
                [part for run in runs for part in serialize_run(operation, run)]
            )

    encoder = MultipartEncoder(fields=all_parts, boundary=boundary)
    headers = {"Content-Type": encoder.content_type, "x-api-key": api_key}

    try:
        response = requests.post(
            f"{api_url}/runs/multipart",
            data=encoder,
            headers=headers
        )
        response.raise_for_status()
        print("Successfully ingested runs.")
    except requests.RequestException as e:
        print(f"Error ingesting runs: {e}")
        # 在生产环境中,您可能希望记录此错误或更稳健地处理它

# 配置 API URL 和密钥
# 对于生产用途,考虑使用配置文件或环境变量
api_url = "https://api.smith.langchain.com"  # 欧盟 (GCP): eu.api...; 美国 (AWS): aws.api... 用于区域 SaaS
api_key = os.environ.get("LANGSMITH_API_KEY")

if not api_key:
    raise ValueError("LANGSMITH_API_KEY environment variable is not set")

# 创建父运行
parent_run = construct_run(
    name="Parent Run",
    run_type="chain",
    inputs={"main_question": "Tell me about France"},
)

# 创建子运行,链接到父运行
child_run = construct_run(
    name="Child Run",
    run_type="llm",
    inputs={"question": "What is the capital of France?"},
    parent_dotted_order=parent_run["dotted_order"],
)

# 首先,发送运行以创建它们
posts = [parent_run, child_run]
batch_ingest_runs(api_url, api_key, posts=posts)

# 然后,使用结束时间和任何输出更新运行
child_run_update = {
    **child_run,
    "end_time": datetime.now(timezone.utc).isoformat(),
    "outputs": {"answer": "Paris is the capital of France."},
}

parent_run_update = {
    **parent_run,
    "end_time": datetime.now(timezone.utc).isoformat(),
    "outputs": {"summary": "Discussion about France, including its capital."},
}

patches = [parent_run_update, child_run_update]
batch_ingest_runs(api_url, api_key, patches=patches)

# 注意:此示例需要 `requests` 和 `requests_toolbelt` 库。
# 您可以使用 pip 安装它们:
# pip install requests requests_toolbelt

相关内容