Skip to main content
在对 LangGraph agent 进行原型开发之后,下一步通常是添加测试。本指南介绍在编写单元测试时可使用的一些有用模式。 注意:本指南针对 LangGraph 特有场景并涵盖具有自定义结构的图;如果只是入门,请查看 此部分,其中使用 LangChain 内置的 createAgent

前置条件

首先,确保安装了 vitest
$ npm install -D vitest

开始

由于许多 LangGraph agent 依赖状态,一个有用的模式是在每个测试之前创建图,然后在测试中用新的 checkpointer 实例编译它。 下面示例展示了如何对一个简单的线性图(依次经过 node1node2)进行测试,每个节点更新单个状态键 my_key
import { test, expect } from 'vitest';
import {
  StateGraph,
  StateSchema,
  START,
  END,
  MemorySaver,
} from '@langchain/langgraph';
import * as z from "zod";

const State = new StateSchema({
  my_key: z.string(),
});

const createGraph = () => {
  return new StateGraph(State)
    .addNode('node1', (state) => ({ my_key: 'hello from node1' }))
    .addNode('node2', (state) => ({ my_key: 'hello from node2' }))
    .addEdge(START, 'node1')
    .addEdge('node1', 'node2')
    .addEdge('node2', END);
};

test('basic agent execution', async () => {
  const uncompiledGraph = createGraph();
  const checkpointer = new MemorySaver();
  const compiledGraph = uncompiledGraph.compile({ checkpointer });
  const result = await compiledGraph.invoke(
    { my_key: 'initial_value' },
    { configurable: { thread_id: '1' } }
  );
  expect(result.my_key).toBe('hello from node2');
});

测试单个节点和边

已编译的 LangGraph agent 在 graph.nodes 中暴露每个单独节点的引用。你可以利用这一点来测试 agent 中的单个节点。注意,这种方式会绕过在编译图时传入的任何 checkpointer:
import { test, expect } from 'vitest';
import {
  StateGraph,
  START,
  END,
  MemorySaver,
  StateSchema,
} from '@langchain/langgraph';
import * as z from "zod";

const State = new StateSchema({
  my_key: z.string(),
});

const createGraph = () => {
  return new StateGraph(State)
    .addNode('node1', (state) => ({ my_key: 'hello from node1' }))
    .addNode('node2', (state) => ({ my_key: 'hello from node2' }))
    .addEdge(START, 'node1')
    .addEdge('node1', 'node2')
    .addEdge('node2', END);
};

test('individual node execution', async () => {
  const uncompiledGraph = createGraph();
  // 在此示例中将被忽略
  const checkpointer = new MemorySaver();
  const compiledGraph = uncompiledGraph.compile({ checkpointer });
  // 仅调用 node1
  const result = await compiledGraph.nodes['node1'].invoke(
    { my_key: 'initial_value' },
  );
  expect(result.my_key).toBe('hello from node1');
});

局部执行

对于由较大图构成的 agent,可能希望测试工作流中的局部执行路径而不是端到端的完整流程。在某些情况下,将这些部分重构为子图(subgraphs)是语义上合理的,这样可以像平常一样独立调用它们。 如果不希望更改整体图结构,也可以使用 LangGraph 的持久化机制来模拟这样一种状态:agent 暂停在目标段之前,并在目标段结束时再次暂停。步骤如下:
  1. 使用 checkpointer 编译 agent(测试中可使用内存 checkpointer MemorySaver)。
  2. 使用 asNode 参数调用 agent 的 update_state 方法,将其设置为希望开始测试的节点之前的节点名称。
  3. 使用与 updateState 时相同的 thread_id 并传入 interruptBefore 参数(设置为希望停止的节点名称)来调用 agent。
下面示例仅执行线性图中的第二和第三个节点:
import { test, expect } from 'vitest';
import {
  StateGraph,
  StateSchema,
  START,
  END,
  MemorySaver,
} from '@langchain/langgraph';
import * as z from "zod";

const State = new StateSchema({
  my_key: z.string(),
});

const createGraph = () => {
  return new StateGraph(State)
    .addNode('node1', (state) => ({ my_key: 'hello from node1' }))
    .addNode('node2', (state) => ({ my_key: 'hello from node2' }))
    .addNode('node3', (state) => ({ my_key: 'hello from node3' }))
    .addNode('node4', (state) => ({ my_key: 'hello from node4' }))
    .addEdge(START, 'node1')
    .addEdge('node1', 'node2')
    .addEdge('node2', 'node3')
    .addEdge('node3', 'node4')
    .addEdge('node4', END);
};

test('partial execution from node2 to node3', async () => {
  const uncompiledGraph = createGraph();
  const checkpointer = new MemorySaver();
  const compiledGraph = uncompiledGraph.compile({ checkpointer });
  await compiledGraph.updateState(
    { configurable: { thread_id: '1' } },
    // 传入 node2 的状态 —— 模拟 node1 结束时的状态
    { my_key: 'initial_value' },
    // 将更新保存为来自 node1 的结果
    'node1',
  );
  const result = await compiledGraph.invoke(
    // 通过传入 null 来恢复执行
    null,
    {
      configurable: { thread_id: '1' },
      // 在 node3 之后中断,避免 node4 运行
      interruptAfter: ['node3']
    },
  );
  expect(result.my_key).toBe('hello from node3');
});