POST /runs 和 PATCH /runs 端点进行基本追踪,以及使用 POST /runs/multipart 进行批量摄取以获得更高吞吐量。
有关端点和请求/响应模式的完整列表,请参阅 API 参考文档。
我们强烈建议使用 Python 或 TypeScript SDK 将追踪数据发送到 LangSmith,而不是直接使用 REST API。SDK 包含批处理和后台发送优化,可防止追踪影响应用程序的性能。如果无法使用 SDK,请注意同步发送追踪数据可能会影响应用程序性能。
我们建议为运行 ID 使用 UUID v7。UUIDv7 嵌入了时间戳,这可以保留追踪中运行的正确时间顺序。使用 LangSmith SDK 中的
uuid7() 生成它们,或参阅指定自定义运行 ID了解更多详情。基本追踪
记录运行的最简单方法是通过POST /runs 和 PATCH /runs 端点。此方法需要最少的信息来建立追踪层次结构。
使用 LangSmith REST API 时,请在请求头中提供您的 API 密钥,格式为
"x-api-key"。如果您的 API 密钥链接到多个工作区,请在请求头中使用 "x-tenant-id" 指定工作区。在此方法中,您无需设置 dotted_order 或 trace_id 字段——系统会自动生成它们。虽然更简单,但比批量摄取更慢,且受更低的速率限制。parent_run_id 以将其附加到其父运行:
import openai
import os
import requests
from datetime import datetime, timezone
from langsmith import uuid7
# 在请求头中发送您的 API 密钥
headers = {
"x-api-key": os.environ["LANGSMITH_API_KEY"],
"x-tenant-id": os.environ["LANGSMITH_WORKSPACE_ID"]
}
def post_run(run_id, name, run_type, inputs, parent_id=None):
"""向 API 发送新运行的函数。"""
data = {
"id": run_id.hex,
"name": name,
"run_type": run_type,
"inputs": inputs,
"start_time": datetime.utcnow().isoformat(),
# "session_name": "project-name", # 要追踪到的项目名称
# "session_id": "project-id", # 要追踪到的项目 ID。指定 session_name 或 session_id 之一
}
if parent_id:
data["parent_run_id"] = parent_id.hex
requests.post(
"https://api.smith.langchain.com/runs", # 对于自托管、欧盟 (GCP) (`eu.api...`) 或美国 (AWS) (`aws.api...`) 请更新
json=data,
headers=headers
)
def patch_run(run_id, outputs):
"""使用输出更新运行的函数。"""
requests.patch(
f"https://api.smith.langchain.com/runs/{run_id}",
json={
"outputs": outputs,
"end_time": datetime.now(timezone.utc).isoformat(),
},
headers=headers,
)
# 这可以是您应用程序的用户输入
question = "Can you summarize this morning's meetings?"
# 这可以在检索步骤中检索到
context = "During this morning's meeting, we solved all world conflict."
messages = [
{"role": "system", "content": "You are a helpful assistant. Please respond to the user's request only based on the given context."},
{"role": "user", "content": f"Question: {question}\nContext: {context}"}
]
# 创建父运行
parent_run_id = uuid7()
post_run(parent_run_id, "Chat Pipeline", "chain", {"question": question})
# 创建子运行
child_run_id = uuid7()
post_run(child_run_id, "OpenAI Call", "llm", {"messages": messages}, parent_run_id)
# 生成补全
client = openai.Client()
chat_completion = client.chat.completions.create(
model="gpt-5.4-mini",
messages=messages
)
# 结束运行
patch_run(child_run_id, chat_completion.dict())
patch_run(parent_run_id, {"answer": chat_completion.choices[0].message.content})
批量摄取
为了更快的摄取和更高的速率限制,请使用POST /runs/multipart 端点。这需要 requests-toolbelt 和 uuid-utils 包。
与基本追踪不同,此端点要求您自行计算并设置 dotted_order 和 trace_id。dotted_order 编码每个运行的时间戳和 UUID,父条目和子条目用点连接(例如,20240101T000000Z<parent-uuid>.20240101T000001Z<child-uuid>),告诉 LangSmith 运行之间的关系以及它们发生的顺序。trace_id 是根运行的 UUID。
以下示例创建一个父运行和一个子运行,在单个批量请求中发送它们,然后使用它们的输出更新两者:
import json
import os
import uuid
from datetime import datetime, timezone
from typing import Dict, List
import requests
from requests_toolbelt import MultipartEncoder
from uuid_utils.compat import uuid7
def create_dotted_order(
start_time: datetime | None = None,
run_id: uuid.UUID | None = None
) -> str:
"""为运行排序和层次结构创建点序字符串。
点序用于建立运行之间的顺序和关系。
它将时间戳与唯一标识符结合,以确保正确的排序和追踪。
"""
st = start_time or datetime.now(timezone.utc)
id_ = run_id or uuid7()
return f"{st.strftime('%Y%m%dT%H%M%S%fZ')}{id_}"
def create_run_base(
name: str,
run_type: str,
inputs: dict,
start_time: datetime
) -> dict:
"""创建运行的基础结构。"""
run_id = uuid7()
return {
"id": str(run_id),
"trace_id": str(run_id),
"name": name,
"start_time": start_time.isoformat(),
"inputs": inputs,
"run_type": run_type,
}
def construct_run(
name: str,
run_type: str,
inputs: dict,
parent_dotted_order: str | None = None,
) -> dict:
"""使用给定参数构建运行字典。
此函数创建一个具有唯一 ID 和点序的运行,如果它是子运行,则建立其在追踪层次结构中的位置。
"""
start_time = datetime.now(timezone.utc)
run = create_run_base(name, run_type, inputs, start_time)
current_dotted_order = create_dotted_order(start_time, uuid.UUID(run["id"]))
if parent_dotted_order:
current_dotted_order = f"{parent_dotted_order}.{current_dotted_order}"
run["trace_id"] = parent_dotted_order.split(".")[0].split("Z")[1]
run["parent_run_id"] = parent_dotted_order.split(".")[-1].split("Z")[1]
run["dotted_order"] = current_dotted_order
return run
def serialize_run(operation: str, run_data: dict) -> List[tuple]:
"""为多部分请求序列化运行。
此函数将运行数据分离为多个部分,以便高效传输和存储。
主要运行数据和可选字段(输入、输出、事件)分别序列化。
"""
run_id = run_data.get("id", str(uuid7()))
# 分离可选字段
inputs = run_data.pop("inputs", None)
outputs = run_data.pop("outputs", None)
events = run_data.pop("events", None)
parts = []
# 序列化主要运行数据
run_data_json = json.dumps(run_data).encode("utf-8")
parts.append(
(
f"{operation}.{run_id}",
(
None,
run_data_json,
"application/json",
{"Content-Length": str(len(run_data_json))},
),
)
)
# 序列化可选字段
for key, value in [("inputs", inputs), ("outputs", outputs), ("events", events)]:
if value:
serialized_value = json.dumps(value).encode("utf-8")
parts.append(
(
f"{operation}.{run_id}.{key}",
(
None,
serialized_value,
"application/json",
{"Content-Length": str(len(serialized_value))},
),
)
)
return parts
def batch_ingest_runs(
api_url: str,
api_key: str,
posts: list[dict] | None = None,
patches: list[dict] | None = None,
) -> None:
"""在单个批量请求中摄取多个运行。
此函数处理创建新运行(posts)和更新现有运行(patches)。
与单独的 API 调用相比,它对于摄取多个运行更有效率。
"""
boundary = uuid.uuid4().hex
all_parts = []
for operation, runs in zip(("post", "patch"), (posts, patches)):
if runs:
all_parts.extend(
[part for run in runs for part in serialize_run(operation, run)]
)
encoder = MultipartEncoder(fields=all_parts, boundary=boundary)
headers = {"Content-Type": encoder.content_type, "x-api-key": api_key}
try:
response = requests.post(
f"{api_url}/runs/multipart",
data=encoder,
headers=headers
)
response.raise_for_status()
print("Successfully ingested runs.")
except requests.RequestException as e:
print(f"Error ingesting runs: {e}")
# 在生产环境中,您可能希望记录此错误或更稳健地处理它
# 配置 API URL 和密钥
# 对于生产用途,考虑使用配置文件或环境变量
api_url = "https://api.smith.langchain.com" # 欧盟 (GCP): eu.api...; 美国 (AWS): aws.api... 用于区域 SaaS
api_key = os.environ.get("LANGSMITH_API_KEY")
if not api_key:
raise ValueError("LANGSMITH_API_KEY environment variable is not set")
# 创建父运行
parent_run = construct_run(
name="Parent Run",
run_type="chain",
inputs={"main_question": "Tell me about France"},
)
# 创建子运行,链接到父运行
child_run = construct_run(
name="Child Run",
run_type="llm",
inputs={"question": "What is the capital of France?"},
parent_dotted_order=parent_run["dotted_order"],
)
# 首先,发送运行以创建它们
posts = [parent_run, child_run]
batch_ingest_runs(api_url, api_key, posts=posts)
# 然后,使用结束时间和任何输出更新运行
child_run_update = {
**child_run,
"end_time": datetime.now(timezone.utc).isoformat(),
"outputs": {"answer": "Paris is the capital of France."},
}
parent_run_update = {
**parent_run,
"end_time": datetime.now(timezone.utc).isoformat(),
"outputs": {"summary": "Discussion about France, including its capital."},
}
patches = [parent_run_update, child_run_update]
batch_ingest_runs(api_url, api_key, patches=patches)
# 注意:此示例需要 `requests` 和 `requests_toolbelt` 库。
# 您可以使用 pip 安装它们:
# pip install requests requests_toolbelt
相关内容
将这些文档通过 MCP 连接到 Claude、VSCode 等,以获取实时答案。

