Skip to main content
Flyte 是一个开源编排器,用于构建生产级数据和 ML 管道。 它以 Kubernetes 为底层平台,为可扩展性和可复现性而设计。
本 notebook 的目的是演示将 FlyteCallback 集成到 Flyte 任务中,让你能够有效地监控和跟踪 LangChain 实验。

安装与设置

  • 运行命令 pip install flytekit 安装 Flytekit 库。
  • 运行命令 pip install flytekitplugins-envd 安装 Flytekit-Envd 插件。
  • 运行命令 pip install langchain 安装 LangChain。
  • 在你的系统上安装 Docker

Flyte 任务

Flyte 任务 是 Flyte 的基础构建块。 要执行 LangChain 实验,你需要编写定义具体步骤和操作的 Flyte 任务。 注意:入门指南 提供了在本地安装 Flyte 和运行初始 Flyte 管道的详细分步说明。 首先,导入支持 LangChain 实验所需的依赖项。
import os

from flytekit import ImageSpec, task
from langchain.agents import create_agent, load_tools
from langchain.callbacks import FlyteCallbackHandler
from langchain_classic.chains import LLMChain
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from langchain.messages import HumanMessage
设置使用 OpenAI API 和 Serp API 所需的环境变量:
# 设置 OpenAI API 密钥
os.environ["OPENAI_API_KEY"] = "<your_openai_api_key>"

# 设置 Serp API 密钥
os.environ["SERPAPI_API_KEY"] = "<your_serp_api_key>"
<your_openai_api_key><your_serp_api_key> 替换为从 OpenAI 和 Serp API 获取的相应 API 密钥。 为确保管道的可复现性,Flyte 任务是容器化的。 每个 Flyte 任务必须关联一个镜像,该镜像可以在整个 Flyte 工作流 中共享,也可以为每个任务单独提供。 为了简化为每个 Flyte 任务提供所需依赖项的过程,你可以初始化一个 ImageSpec 对象。 这种方法会自动触发 Docker 构建,免去了用户手动创建 Docker 镜像的需要。
custom_image = ImageSpec(
    name="langchain-flyte",
    packages=[
        "langchain",
        "openai",
        "spacy",
        "https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.5.0/en_core_web_sm-3.5.0.tar.gz",
        "textstat",
        "google-search-results",
    ],
    registry="<your-registry>",
)
你可以灵活地将 Docker 镜像推送到你偏好的镜像仓库。 Docker HubGitHub Container Registry (GHCR) 是不错的起始选择。 选择镜像仓库后,你可以继续创建将 LangChain 指标记录到 Flyte Deck 的 Flyte 任务。 以下示例展示了与 OpenAI LLM、链和带工具代理相关的任务:

LLM

@task(disable_deck=False, container_image=custom_image)
def langchain_llm() -> str:
    llm = ChatOpenAI(
        model_name="gpt-3.5-turbo",
        temperature=0.2,
        callbacks=[FlyteCallbackHandler()],
    )
    return llm.invoke([HumanMessage(content="Tell me a joke")]).content

@task(disable_deck=False, container_image=custom_image)
def langchain_chain() -> list[dict[str, str]]:
    template = """You are a playwright. Given the title of play, it is your job to write a synopsis for that title.
Title: {title}
Playwright: This is a synopsis for the above play:"""
    llm = ChatOpenAI(
        model_name="gpt-3.5-turbo",
        temperature=0,
        callbacks=[FlyteCallbackHandler()],
    )
    prompt_template = PromptTemplate(input_variables=["title"], template=template)
    synopsis_chain = LLMChain(
        llm=llm, prompt=prompt_template, callbacks=[FlyteCallbackHandler()]
    )
    test_prompts = [
        {
            "title": "documentary about good video games that push the boundary of game design"
        },
    ]
    return synopsis_chain.apply(test_prompts)

代理

@task(disable_deck=False, container_image=custom_image)
def langchain_agent() -> str:
    llm = ChatOpenAI(
        model_name="gpt-3.5-turbo",
        temperature=0,
        callbacks=[FlyteCallbackHandler()],
    )

    tools = load_tools(
        ["serpapi", "llm-math"],
        llm=llm,
        callbacks=[FlyteCallbackHandler()],
    )

    agent = create_agent(
        model=llm,
        tools=tools,
        callbacks=[FlyteCallbackHandler()],
        verbose=True,
    )

    return agent.invoke(
        "Who is Leonardo DiCaprio's girlfriend? Could you calculate her current age and raise it to the power of 0.43?"
    )
这些任务是在 Flyte 中运行 LangChain 实验的起点。

在 Kubernetes 上执行 Flyte 任务

要在配置好的 Flyte 后端上执行 Flyte 任务,请使用以下命令:
pyflyte run --image <your-image> langchain_flyte.py langchain_llm
此命令将启动 Flyte 后端上 langchain_llm 任务的执行。你可以以类似方式触发其余两个任务。 指标将在 Flyte UI 上显示如下: Flyte Deck 截图,显示 LangChain 指标和依赖树可视化。