Skip to main content
在用户最有可能响应时,让 AI 智能体主动与用户互动。

概述

Valthera 是一个开源框架,使 LLM 智能体能够以更有意义的方式与用户互动。它基于 BJ Fogg 的行为模型(B=MAT),并利用来自多个数据源(如 HubSpot、PostHog 和 Snowflake)的数据,在触发行动之前评估用户的动机能力 在本指南中,您将学习:
  • 核心概念:各组件概述(数据聚合器、评分器、推理引擎和触发生成器)。
  • 系统架构:数据如何在系统中流转以及如何做出决策。
  • 自定义:如何扩展连接器、评分指标和决策规则以满足您的需求。
让我们开始吧!

设置

本节涵盖依赖项的安装以及为 Valthera 设置自定义数据连接器。
pip install openai langchain langchain_openai valthera langchain_valthera langgraph
from typing import Any, Dict, List

from valthera.connectors.base import BaseConnector


class MockHubSpotConnector(BaseConnector):
    """
    模拟从 HubSpot 获取数据。提供潜在客户评分、
    生命周期阶段和营销指标等信息。
    """

    def get_user_data(self, user_id: str) -> Dict[str, Any]:
        """
        为指定用户获取模拟 HubSpot 数据。

        Args:
            user_id: 用户的唯一标识符

        Returns:
            包含 HubSpot 用户数据的字典
        """
        return {
            "hubspot_contact_id": "999-ZZZ",
            "lifecycle_stage": "opportunity",
            "lead_status": "engaged",
            "hubspot_lead_score": 100,
            "company_name": "MaxMotivation Corp.",
            "last_contacted_date": "2023-09-20",
            "hubspot_marketing_emails_opened": 20,
            "marketing_emails_clicked": 10,
        }


class MockPostHogConnector(BaseConnector):
    """
    模拟从 PostHog 获取数据。提供会话数据和参与事件。
    """

    def get_user_data(self, user_id: str) -> Dict[str, Any]:
        """
        为指定用户获取模拟 PostHog 数据。

        Args:
            user_id: 用户的唯一标识符

        Returns:
            包含 PostHog 用户数据的字典
        """
        return {
            "distinct_ids": [user_id, f"email_{user_id}"],
            "last_event_timestamp": "2023-09-20T12:34:56Z",
            "feature_flags": ["beta_dashboard", "early_access"],
            "posthog_session_count": 30,
            "avg_session_duration_sec": 400,
            "recent_event_types": ["pageview", "button_click", "premium_feature_used"],
            "posthog_events_count_past_30days": 80,
            "posthog_onboarding_steps_completed": 5,
        }


class MockSnowflakeConnector(BaseConnector):
    """
    模拟从 Snowflake 获取额外的用户画像数据。
    """

    def get_user_data(self, user_id: str) -> Dict[str, Any]:
        """
        为指定用户获取模拟 Snowflake 数据。

        Args:
            user_id: 用户的唯一标识符

        Returns:
            包含 Snowflake 用户数据的字典
        """
        return {
            "user_id": user_id,
            "email": f"{user_id}@example.com",
            "subscription_status": "paid",
            "plan_tier": "premium",
            "account_creation_date": "2023-01-01",
            "preferred_language": "en",
            "last_login_datetime": "2023-09-20T12:00:00Z",
            "behavior_complexity": 3,
        }

实例化

本节实例化核心组件。首先,创建数据聚合器以合并来自自定义连接器的数据。然后,配置动机和能力的评分指标。
from valthera.aggregator import DataAggregator

# 配置常量
LEAD_SCORE_MAX = 100
EVENTS_COUNT_MAX = 50
EMAILS_OPENED_FACTOR = 10.0
SESSION_COUNT_FACTOR_1 = 5.0
ONBOARDING_STEPS_FACTOR = 5.0
SESSION_COUNT_FACTOR_2 = 10.0
BEHAVIOR_COMPLEXITY_MAX = 5.0

# 初始化数据聚合器
data_aggregator = DataAggregator(
    connectors={
        "hubspot": MockHubSpotConnector(),
        "posthog": MockPostHogConnector(),
        "snowflake": MockSnowflakeConnector(),
    }
)

# 现在可以调用 data_aggregator.get_user_context(user_id) 来获取统一的用户数据
from typing import Callable, Union

from valthera.scorer import ValtheraScorer


# 定义带有适当类型注解的转换函数
def transform_lead_score(x: Union[int, float]) -> float:
    """将潜在客户评分转换为 0 到 1 之间的值。"""
    return min(x, LEAD_SCORE_MAX) / LEAD_SCORE_MAX


def transform_events_count(x: Union[int, float]) -> float:
    """将事件数量转换为 0 到 1 之间的值。"""
    return min(x, EVENTS_COUNT_MAX) / EVENTS_COUNT_MAX


def transform_emails_opened(x: Union[int, float]) -> float:
    """将邮件打开数转换为 0 到 1 之间的值。"""
    return min(x / EMAILS_OPENED_FACTOR, 1.0)


def transform_session_count_1(x: Union[int, float]) -> float:
    """将动机的会话数转换为 0 到 1 之间的值。"""
    return min(x / SESSION_COUNT_FACTOR_1, 1.0)


def transform_onboarding_steps(x: Union[int, float]) -> float:
    """将引导步骤数转换为 0 到 1 之间的值。"""
    return min(x / ONBOARDING_STEPS_FACTOR, 1.0)


def transform_session_count_2(x: Union[int, float]) -> float:
    """将能力的会话数转换为 0 到 1 之间的值。"""
    return min(x / SESSION_COUNT_FACTOR_2, 1.0)


def transform_behavior_complexity(x: Union[int, float]) -> float:
    """将行为复杂度转换为 0 到 1 之间的值。"""
    return 1 - (min(x, BEHAVIOR_COMPLEXITY_MAX) / BEHAVIOR_COMPLEXITY_MAX)


# 用户动机评分配置
motivation_config = [
    {"key": "hubspot_lead_score", "weight": 0.30, "transform": transform_lead_score},
    {
        "key": "posthog_events_count_past_30days",
        "weight": 0.30,
        "transform": transform_events_count,
    },
    {
        "key": "hubspot_marketing_emails_opened",
        "weight": 0.20,
        "transform": transform_emails_opened,
    },
    {
        "key": "posthog_session_count",
        "weight": 0.20,
        "transform": transform_session_count_1,
    },
]

# 用户能力评分配置
ability_config = [
    {
        "key": "posthog_onboarding_steps_completed",
        "weight": 0.30,
        "transform": transform_onboarding_steps,
    },
    {
        "key": "posthog_session_count",
        "weight": 0.30,
        "transform": transform_session_count_2,
    },
    {
        "key": "behavior_complexity",
        "weight": 0.40,
        "transform": transform_behavior_complexity,
    },
]

# 实例化评分器
scorer = ValtheraScorer(motivation_config, ability_config)

调用

接下来,我们设置推理引擎和触发生成器,然后实例化 Valthera 工具将所有组件整合在一起。最后,执行智能体工作流以处理输入消息。
import os

from langchain_openai import ChatOpenAI
from valthera.reasoning_engine import ReasoningEngine

# 将阈值定义为常量
SCORE_THRESHOLD = 0.75


# 安全获取 API 密钥的函数
def get_openai_api_key() -> str:
    """获取 OpenAI API 密钥,含错误处理。"""
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        raise ValueError("OPENAI_API_KEY not found in environment variables")
    return api_key


# 使用常量的决策规则
decision_rules = [
    {
        "condition": f"motivation >= {SCORE_THRESHOLD} and ability >= {SCORE_THRESHOLD}",
        "action": "trigger",
        "description": "两项评分均足够高。",
    },
    {
        "condition": f"motivation < {SCORE_THRESHOLD}",
        "action": "improve_motivation",
        "description": "用户动机偏低。",
    },
    {
        "condition": f"ability < {SCORE_THRESHOLD}",
        "action": "improve_ability",
        "description": "用户能力偏低。",
    },
    {
        "condition": "otherwise",
        "action": "defer",
        "description": "当前无需采取行动。",
    },
]

try:
    api_key = get_openai_api_key()
    reasoning_engine = ReasoningEngine(
        llm=ChatOpenAI(
            model_name="gpt-4-turbo", temperature=0.0, openai_api_key=api_key
        ),
        decision_rules=decision_rules,
    )
except ValueError as e:
    print(f"Error initializing reasoning engine: {e}")
from valthera.trigger_generator import TriggerGenerator

try:
    api_key = get_openai_api_key()  # 复用函数以保持一致性
    trigger_generator = TriggerGenerator(
        llm=ChatOpenAI(
            model_name="gpt-4-turbo", temperature=0.7, openai_api_key=api_key
        )
    )
except ValueError as e:
    print(f"Error initializing trigger generator: {e}")
from langchain_valthera.tools import ValtheraTool
from langchain.agents import create_agent


try:
    api_key = get_openai_api_key()

    # 初始化 Valthera 工具
    valthera_tool = ValtheraTool(
        data_aggregator=data_aggregator,
        motivation_config=motivation_config,
        ability_config=ability_config,
        reasoning_engine=reasoning_engine,
        trigger_generator=trigger_generator,
    )

    # 使用 LLM 创建智能体
    model = ChatOpenAI(model_name="gpt-4-turbo", temperature=0.0, openai_api_key=api_key)
    tools = [valthera_tool]
    graph = create_agent(model, tools=tools)

    # 定义用于测试的输入消息
    inputs = {
        "messages": [("user", "Evaluate behavior for user_12345: Finish Onboarding")]
    }

    # 处理输入并显示响应
    print("Running Valthera agent workflow...")
    for response in graph.stream(inputs, stream_mode="values"):
        print(response)

except Exception as e:
    print(f"Error running Valthera workflow: {e}")

链式调用

该集成目前不支持链式操作。未来版本可能会包含链式支持。

API 参考

以下是 Valthera 集成提供的主要 API 概述:
  • 数据聚合器:使用 data_aggregator.get_user_context(user_id) 获取聚合后的用户数据。
  • 评分器ValtheraScorer 根据提供的配置计算动机和能力评分。
  • 推理引擎ReasoningEngine 评估决策规则以确定适当的行动(触发、提升动机、提升能力或延迟)。
  • 触发生成器:使用 LLM 生成个性化触发消息。
  • Valthera 工具:整合所有组件以处理输入并执行智能体工作流。
有关详细用法,请参阅源代码中的内联文档。