Skip to main content
推荐的查询 运行(LangSmith 追踪中的 span 数据)的方法是使用 SDK 中的 list_runs 方法或 API 中的 /runs/query 端点。LangSmith 以 运行(span)数据格式 中指定的简单格式存储追踪。 本页涵盖:
如果您需要导出大量追踪,我们建议您使用批量数据导出功能,因为它能更好地处理大量数据,并支持自动重试和跨分区并行处理。

使用筛选参数

对于简单查询,您不必依赖我们的查询语法。您可以使用 筛选参数参考 中指定的筛选参数。
前提条件在运行以下代码片段之前,请初始化客户端。
from langsmith import Client

client = Client()
以下是一些使用关键字参数列出运行的示例:

列出项目中的所有运行

project_runs = client.list_runs(project_name="<your_project>")

列出过去 24 小时内的 LLM 和聊天运行

todays_llm_runs = client.list_runs(
    project_name="<your_project>",
    start_time=datetime.now() - timedelta(days=1),
    run_type="llm",
)

列出项目中的根运行

根运行是没有父级的运行。这些运行的 is_root 值为 True。您可以使用此值来筛选根运行。
root_runs = client.list_runs(
    project_name="<your_project>",
    is_root=True
)

列出没有错误的运行

correct_runs = client.list_runs(project_name="<your_project>", error=False)

按运行 ID 列出运行

忽略其他参数如果您按上述方式提供运行 ID 列表,它将忽略所有其他筛选参数(如 project_namerun_type 等),并直接返回与给定 ID 匹配的运行。
如果您有一个运行 ID 列表,您可以直接列出它们:
run_ids = ['a36092d2-4ad5-4fb4-9c0d-0dba9a2ed836','9398e6be-964f-4aa4-8ae9-ad78cd4b7074']
selected_runs = client.list_runs(id=run_ids)

通过 ID 获取单个运行

要通过 ID 获取单个运行(追踪),请使用 read_run 方法。当您有一个特定的追踪 ID(例如,来自 LangSmith 分享链接,如 https://smith.langchain.com/public/<trace-id>/r)并希望检索其完整数据时,此方法很有用。
run_id = "a36092d2-4ad5-4fb4-9c0d-0dba9a2ed836"
run = client.read_run(run_id)

# 访问运行数据
print(run.inputs)
print(run.outputs)
print(run.name)
使用 LangGraph 在本地重放追踪如果您正在使用带有检查点的 LangGraph,您可以从 LangSmith 获取追踪并在本地重放以进行调试。有关从检查点恢复执行的详细信息,请参阅 LangGraph 的时间旅行和重放文档

使用筛选查询语言

对于更复杂的查询,您可以使用 筛选查询语言参考 中描述的查询语言。

列出对话线程中的所有根运行

这是获取对话线程中运行的方法。有关设置线程的更多信息,请参阅我们的设置线程操作指南。 线程通过设置共享的线程 ID 进行分组。LangSmith UI 允许您使用以下三个元数据键中的任何一个:session_idconversation_idthread_id。会话 ID 也称为追踪项目 ID。以下查询匹配其中任何一个。
group_key = "<your_thread_id>"
filter_string = f'and(in(metadata_key, ["session_id","conversation_id","thread_id"]), eq(metadata_value, "{group_key}"))'
thread_runs = client.list_runs(
    project_name="<your_project>",
    filter=filter_string,
    is_root=True
)

列出所有名为 “extractor” 且其追踪根节点被分配了 “user_score” 反馈分数为 1 的运行

client.list_runs(
    project_name="<your_project>",
    filter='eq(name, "extractor")',
    trace_filter='and(eq(feedback_key, "user_score"), eq(feedback_score, 1))'
)

列出具有 “star_rating” 键且分数大于 4 的运行

client.list_runs(
    project_name="<your_project>",
    filter='and(eq(feedback_key, "star_rating"), gt(feedback_score, 4))'
)

列出完成时间超过 5 秒的运行

client.list_runs(project_name="<your_project>", filter='gt(latency, "5s")')

列出所有 “error” 不等于 null 的运行

client.list_runs(project_name="<your_project>", filter='neq(error, null)')

列出所有 start_time 大于特定时间戳的运行

client.list_runs(project_name="<your_project>", filter='gt(start_time, "2023-07-15T12:34:56Z")')

列出所有包含字符串 “substring” 的运行

client.list_runs(project_name="<your_project>", filter='search("substring")')

列出所有标记为 git 哈希值 “2aa1cf4” 的运行

client.list_runs(project_name="<your_project>", filter='has(tags, "2aa1cf4")')

列出所有在特定时间戳之后开始,并且 “error” 不等于 null 或 “Correctness” 反馈分数等于 0 的运行

client.list_runs(
  project_name="<your_project>",
  filter='and(gt(start_time, "2023-07-15T12:34:56Z"), or(neq(error, null), and(eq(feedback_key, "Correctness"), eq(feedback_score, 0.0))))'
)

复杂查询:列出所有标签包含 “experimental” 或 “beta” 且延迟大于 2 秒的运行

client.list_runs(
  project_name="<your_project>",
  filter='and(or(has(tags, "experimental"), has(tags, "beta")), gt(latency, 2))'
)

通过全文搜索追踪树

您可以使用不带任何特定字段的 search() 函数对运行中的所有字符串字段进行全文搜索。这使您可以快速找到与搜索词匹配的追踪。
client.list_runs(
  project_name="<your_project>",
  filter='search("image classification")'
)

检查元数据是否存在

如果您想检查元数据是否存在,可以使用 eq 运算符,也可以选择使用 and 语句按值匹配。如果您想记录有关运行的更多结构化信息,这很有用。
to_search = {
    "user_id": ""
}

# 检查任何具有 "user_id" 元数据键的运行
client.list_runs(
  project_name="default",
  filter="eq(metadata_key, 'user_id')"
)
# 检查 user_id=4070f233-f61e-44eb-bff1-da3c163895a3 的运行
client.list_runs(
  project_name="default",
  filter="and(eq(metadata_key, 'user_id'), eq(metadata_value, '4070f233-f61e-44eb-bff1-da3c163895a3'))"
)

检查元数据中的环境详细信息

一个常见的模式是通过元数据向追踪添加环境信息。如果您想筛选包含环境元数据的运行,可以使用与上述相同的模式:
client.list_runs(
  project_name="default",
  filter="and(eq(metadata_key, 'environment'), eq(metadata_value, 'production'))"
)

检查元数据中的会话 ID

另一种关联同一对话中追踪的常见方法是使用共享的会话 ID。如果您想基于会话 ID 以这种方式筛选运行,可以在元数据中搜索该 ID。
client.list_runs(
  project_name="default",
  filter="and(eq(metadata_key, 'conversation_id'), eq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

键值对的否定筛选

您可以对元数据、输入和输出键值对使用否定筛选,以从结果中排除特定运行。以下是一些元数据键值对的示例,但相同的逻辑也适用于输入和输出键值对。
# 查找所有元数据不包含 "conversation_id" 键的运行
client.list_runs(
  project_name="default",
  filter="and(neq(metadata_key, 'conversation_id'))"
)

# 查找所有元数据中 conversation_id 不是 "a1b2c3d4-e5f6-7890" 的运行
client.list_runs(
  project_name="default",
  filter="and(eq(metadata_key, 'conversation_id'), neq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

# 查找所有没有 "conversation_id" 元数据键且 "a1b2c3d4-e5f6-7890" 值不存在的运行
client.list_runs(
  project_name="default",
  filter="and(neq(metadata_key, 'conversation_id'), neq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

# 查找所有 conversation_id 元数据键不存在但 "a1b2c3d4-e5f6-7890" 值存在的运行
client.list_runs(
  project_name="default",
  filter="and(neq(metadata_key, 'conversation_id'), eq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

组合多个筛选器

如果您想组合多个条件来细化搜索,可以使用 and 运算符以及其他筛选函数。以下是如何搜索名为 “ChatOpenAI” 且其元数据中具有特定 conversation_id 的运行:
client.list_runs(
  project_name="default",
  filter="and(eq(name, 'ChatOpenAI'), eq(metadata_key, 'conversation_id'), eq(metadata_value, '69b12c91-b1e2-46ce-91de-794c077e8151'))"
)

树筛选器

列出所有名为 “RetrieveDocs” 的运行,其根运行具有 “user_score” 反馈为 1,并且完整追踪中的任何运行名为 “ExpandQuery”。 如果您想提取特定运行,但条件是追踪中达到了各种状态或步骤,这种类型的查询很有用。
client.list_runs(
    project_name="<your_project>",
    filter='eq(name, "RetrieveDocs")',
    trace_filter='and(eq(feedback_key, "user_score"), eq(feedback_score, 1))',
    tree_filter='eq(name, "ExpandQuery")'
)

高级:导出包含子工具使用情况的扁平化追踪视图

以下 Python 示例演示了如何导出追踪的扁平化视图,包括每个追踪中代理使用的工具信息(来自嵌套运行)。 这可用于分析代理在多个追踪中的行为。 此示例查询指定天数内的所有工具运行,并按其父(根)运行 ID 进行分组。然后获取每个根运行的相关信息,例如运行名称、输入、输出,并将这些信息与子运行信息组合。 为了优化查询,该示例:
  1. 在查询工具运行时仅选择必要的字段以减少查询时间。
  2. 在并发处理工具运行的同时批量获取根运行。
from collections import defaultdict
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime, timedelta

from langsmith import Client
from tqdm.auto import tqdm

client = Client()
project_name = "my-project"
num_days = 30

# 列出所有工具运行
tool_runs = client.list_runs(
    project_name=project_name,
    start_time=datetime.now() - timedelta(days=num_days),
    run_type="tool",
    # 我们不需要获取输入、输出和其他可能增加查询时间的值
    select=["trace_id", "name", "run_type"],
)

data = []
futures: list[Future] = []
trace_cursor = 0
trace_batch_size = 50

tool_runs_by_parent = defaultdict(lambda: defaultdict(set))
# 不要超过速率限制
with ThreadPoolExecutor(max_workers=2) as executor:
    # 按父运行 ID 对工具运行进行分组
    for run in tqdm(tool_runs):
        # 收集给定追踪中调用的所有工具
        tool_runs_by_parent[run.trace_id]["tools_involved"].add(run.name)
        # 也许向服务器发送一批父运行 ID
        # 这让我们可以批量查询根运行
        # 同时仍在处理工具运行
        if len(tool_runs_by_parent) % trace_batch_size == 0:
            if this_batch := list(tool_runs_by_parent.keys())[
                trace_cursor : trace_cursor + trace_batch_size
            ]:
                trace_cursor += trace_batch_size
                futures.append(
                    executor.submit(
                        client.list_runs,
                        project_name=project_name,
                        run_ids=this_batch,
                        select=["name", "inputs", "outputs", "run_type"],
                    )
                )
    if this_batch := list(tool_runs_by_parent.keys())[trace_cursor:]:
        futures.append(
            executor.submit(
                client.list_runs,
                project_name=project_name,
                run_ids=this_batch,
                select=["name", "inputs", "outputs", "run_type"],
            )
        )

for future in tqdm(futures):
    root_runs = future.result()
    for root_run in root_runs:
        root_data = tool_runs_by_parent[root_run.id]
        data.append(
            {
                "run_id": root_run.id,
                "run_name": root_run.name,
                "run_type": root_run.run_type,
                "inputs": root_run.inputs,
                "outputs": root_run.outputs,
                "tools_involved": list(root_data["tools_involved"]),
            }
        )

# (可选):转换为 pandas DataFrame
import pandas as pd

df = pd.DataFrame(data)
df.head()

高级:导出具有反馈的追踪的检索器 IO

如果您想根据检索器行为微调嵌入或诊断端到端系统性能问题,此查询很有用。 以下 Python 示例演示了如何导出具有特定反馈分数的追踪中的检索器输入和输出。
from collections import defaultdict
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime, timedelta

import pandas as pd
from langsmith import Client
from tqdm.auto import tqdm

client = Client()
project_name = "your-project-name"
num_days = 1

# 列出所有工具运行
retriever_runs = client.list_runs(
    project_name=project_name,
    start_time=datetime.now() - timedelta(days=num_days),
    run_type="retriever",
    # 这次我们确实想获取输入和输出,因为它们
    # 可能会被查询扩展步骤调整。
    select=["trace_id", "name", "run_type", "inputs", "outputs"],
    trace_filter='eq(feedback_key, "user_score")',
)

data = []
futures: list[Future] = []
trace_cursor = 0
trace_batch_size = 50

retriever_runs_by_parent = defaultdict(lambda: defaultdict(list))
# 不要超过速率限制
with ThreadPoolExecutor(max_workers=2) as executor:
    # 按父运行 ID 对检索器运行进行分组
    for run in tqdm(retriever_runs):
        # 收集给定追踪中调用的所有检索器调用
        for k, v in run.inputs.items():
            retriever_runs_by_parent[run.trace_id][f"retriever.inputs.{k}"].append(v)
        for k, v in (run.outputs or {}).items():
            # 扩展文档
            retriever_runs_by_parent[run.trace_id][f"retriever.outputs.{k}"].extend(v)
        # 也许向服务器发送一批父运行 ID
        # 这让我们可以批量查询根运行
        # 同时仍在处理检索器运行
        if len(retriever_runs_by_parent) % trace_batch_size == 0:
            if this_batch := list(retriever_runs_by_parent.keys())[
                trace_cursor : trace_cursor + trace_batch_size
            ]:
                trace_cursor += trace_batch_size
                futures.append(
                    executor.submit(
                        client.list_runs,
                        project_name=project_name,
                        run_ids=this_batch,
                        select=[
                            "name",
                            "inputs",
                            "outputs",
                            "run_type",
                            "feedback_stats",
                        ],
                    )
                )
    if this_batch := list(retriever_runs_by_parent.keys())[trace_cursor:]:
        futures.append(
            executor.submit(
                client.list_runs,
                project_name=project_name,
                run_ids=this_batch,
                select=["name", "inputs", "outputs", "run_type"],
            )
        )

for future in tqdm(futures):
    root_runs = future.result()
    for root_run in root_runs:
        root_data = retriever_runs_by_parent[root_run.id]
        feedback = {
            f"feedback.{k}": v.get("avg")
            for k, v in (root_run.feedback_stats or {}).items()
        }
        inputs = {f"inputs.{k}": v for k, v in root_run.inputs.items()}
        outputs = {f"outputs.{k}": v for k, v in (root_run.outputs or {}).items()}
        data.append(
            {
                "run_id": root_run.id,
                "run_name": root_run.name,
                **inputs,
                **outputs,
                **feedback,
                **root_data,
            }
        )

# (可选):转换为 pandas DataFrame
import pandas as pd
df = pd.DataFrame(data)
df.head()

速率限制

POST /runs/query 端点(Python 中的 list_runs,JavaScript 中的 listRuns)具有每个租户的速率限制,根据查询参数而异:
查询类型限制时间窗口
短时间窗口(≤ 7 天)10 个请求10 秒
大时间窗口(> 7 天)3 个请求10 秒
全文搜索,短时间窗口(≤ 7 天)3 个请求10 秒
全文搜索,大时间窗口(> 7 天)1 个请求10 秒
选择 child_run_ids,短时间窗口(≤ 7 天)3 个请求10 秒
选择 child_run_ids,大时间窗口(> 7 天)1 个请求10 秒
时间窗口由 end_time - start_time 确定。如果未提供 end_time,LangSmith 将使用当前时间。没有 start_time 的查询被视为大时间窗口查询。

最佳实践

为避免达到速率限制并减少查询时间,特别是对于具有大输入/输出的运行:
  • 设置 start_time:省略它会触发大时间窗口速率限制层(每 10 秒 3 个请求而不是 10 个)。尽可能使用 7 天或更短的时间窗口。
  • 使用 select:默认情况下返回所有字段。仅指定您需要的字段(例如,select=["inputs", "outputs"])可以显著减少响应大小和查询时间,特别是对于具有大输入/输出的运行。
  • 设置 limit:如果您不需要分页浏览所有内容,请限制结果数量。
  • 避免全文搜索filter='search("...")' 具有最严格的速率限制;尽可能使用结构化筛选器(例如,eq()has())。
  • 避免选择 child_run_ids:这也会触发更严格的速率限制层。
当您超过这些限制时,API 会返回 429 Too Many Requests 响应。有关一般速率限制信息,请参阅管理概览