❝当 Agent 在生产环境中"黑盒"运行时,你真的知道它在做什么吗?❞
随着 LLM 应用从 Demo 走向生产,「可观测性 (Observability)」 成为了不可忽视的核心能力。与传统微服务不同,Agent 系统存在独特的挑战:
本文将以 trpc-agent-go 框架为例,深入剖析 「如何为 AI Agent 构建企业级可观测体系」,包括分布式追踪、指标采集、以及与 Langfuse 平台的深度集成。
借鉴云原生可观测性的经典理论,Agent 的可观测性同样建立在三大支柱之上:
┌─────────────────────────────────────────────────────────────────────┐
│ Agent 可观测性三大支柱 │
└─────────────────────────────────────────────────────────────────────┘
┌─────────────────┐
│ Traces │ ◄── 调用链追踪
│ 分布式追踪 │ 一次 Agent 执行的完整路径
└────────┬────────┘
│
┌─────────────┼─────────────┐
│ │ │
▼ ▼ ▼
┌───────┐ ┌───────────┐ ┌───────────┐
│Metrics│ │ Logs │ │ LLM 专属 │
│ 指标 │ │ 日志 │ │ Traces │
└───────┘ └───────────┘ └───────────┘
│ │ │
│ 请求量 │ 错误详情 │ Token 消耗
│ 延迟分布 │ 调试信息 │ 模型参数
│ 错误率 │ │ 输入/输出
└─────────────┴─────────────┘
传统的 APM 工具(如 Jaeger、Prometheus)虽然能追踪 HTTP 请求,但对于 Agent 场景存在盲区:
传统 APM | Agent 专属需求 |
|---|---|
请求耗时 | Token 消耗量 |
HTTP 状态码 | LLM 输入/输出内容 |
调用链路 | 工具调用参数和结果 |
QPS | 首 Token 时间 (TTFT) |
这就是为什么我们需要 「LLM 原生的可观测方案」。
trpc-agent-go 采用 「OpenTelemetry (OTel)」 作为可观测性的基础设施,实现了"一次埋点,多处上报"的能力:
┌─────────────────────────────────────────────────────────────────────────────┐
│ 可观测性架构总览 │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────┐
│ 应用层调用 │
│ (Runner/Agent/Tool 等组件) │
└─────────────────┬───────────────────┘
│
│ OTel SDK 埋点
▼
┌───────────────────────────────────────────────────┐
│ OpenTelemetry SDK │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐│
│ │ Tracer │ │ Meter │ │ Logger ││
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘│
└─────────┼────────────────┼────────────────┼───────┘
│ │ │
┌──────────┴──────────┬─────┴─────┐ │
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ OTLP Exporter │ │ Langfuse │ │ Prometheus │
│ (Jaeger/Tempo) │ │ Exporter │ │ Exporter │
└────────┬────────┘ └────────┬────────┘ └────────┬────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Jaeger/Tempo │ │ Langfuse │ │ Prometheus │
│ 分布式追踪 │ │ LLM 可观测 │ │ 指标监控 │
└─────────────────┘ └─────────────────┘ └─────────────────┘
框架定义了标准化的追踪属性,遵循 「OpenTelemetry GenAI 语义约定」:
// 核心追踪属性
const (
// 操作类型
KeyGenAIOperationName = "gen_ai.operation.name"// invoke_agent / chat / execute_tool
// LLM 相关
KeyGenAISystem = "gen_ai.system" // openai / hunyuan / deepseek
KeyGenAIRequestModel = "gen_ai.request.model" // gpt-4 / hunyuan-pro
// Token 统计
KeyGenAIUsageInputTokens = "gen_ai.usage.input_tokens"
KeyGenAIUsageOutputTokens = "gen_ai.usage.output_tokens"
// 工具调用
KeyGenAIToolName = "gen_ai.tool.name"
KeyGenAIToolCallArguments = "gen_ai.tool.call.arguments"
KeyGenAIToolCallResult = "gen_ai.tool.call.result"
)
启动分布式追踪只需几行代码:
import "trpc.group/trpc-go/trpc-agent-go/telemetry/trace"
func main() {
ctx := context.Background()
// 启动 OTLP 追踪
cleanup, err := trace.Start(ctx,
trace.WithEndpoint("otel-collector:4317"), // OTLP Collector 地址
trace.WithServiceName("my-agent-service"), // 服务名
trace.WithServiceNamespace("production"), // 命名空间
trace.WithProtocol("grpc"), // 协议:grpc 或 http
)
if err != nil {
log.Fatal(err)
}
defer cleanup()
// ... 运行 Agent
}
框架会自动为 Agent 执行链路生成层级分明的 Span:
Trace: invoke_agent (用户请求入口)
│
├── Span: invoke_agent [agent=weather-assistant]
│ │
│ ├── Span: chat [model=gpt-4, tokens_in=150, tokens_out=80]
│ │ └── 属性: llm.request, llm.response, gen_ai.usage.*
│ │
│ ├── Span: execute_tool [tool=get_weather]
│ │ └── 属性: tool.call.arguments, tool.call.result
│ │
│ └── Span: chat [model=gpt-4, tokens_in=200, tokens_out=120]
│ └── 最终响应生成
│
└── 总耗时: 2.3s, 总 Token: 550
框架内置了 Agent 场景最关键的指标:
指标名 | 类型 | 说明 |
|---|---|---|
trpc_agent_go.client.request_cnt | Counter | 请求总量 |
gen_ai.client.token.usage | Histogram | Token 使用量分布 |
gen_ai.client.operation.duration | Histogram | 操作耗时分布 |
gen_ai.server.time_to_first_token | Histogram | 首 Token 时间 |
trpc_agent_go.client.output_token_per_time | Histogram | Token 生成速率 |
import "trpc.group/trpc-go/trpc-agent-go/telemetry/metric"
func main() {
ctx := context.Background()
// 创建 MeterProvider
mp, err := metric.NewMeterProvider(ctx,
metric.WithEndpoint("otel-collector:4317"),
metric.WithServiceName("my-agent-service"),
)
if err != nil {
log.Fatal(err)
}
// 初始化全局 MeterProvider
metric.InitMeterProvider(mp)
// 可选:自定义 Histogram 桶边界
metric.SetHistogramBuckets(
"chat", // Meter 名称
"gen_ai.client.operation.duration", // 指标名
[]float64{0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0}, // 桶边界(秒)
)
}
一个亮点是支持「运行时动态修改 Histogram 桶边界」,无需重启服务:
// 场景:发现 LLM 延迟分布变化,需要调整桶边界
metric.SetHistogramBuckets(
"chat",
"gen_ai.client.operation.duration",
[]float64{0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 60.0}, // 新的桶边界
)
这在生产环境中非常实用——当你发现指标分布不合理时,可以热调整而不影响服务。
「Langfuse」 是目前最流行的 LLM 可观测平台之一,专为 AI 应用设计。trpc-agent-go 提供了开箱即用的集成。
┌─────────────────────────────────────────────────────────────────────────────┐
│ Langfuse 平台能力 │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Trace 追踪 │ │ 成本分析 │ │ Prompt 管理 │
│ │ │ │ │ │
│ • Agent 调用链 │ │ • Token 消耗 │ │ • 版本控制 │
│ • LLM 输入输出 │ │ • 模型成本 │ │ • A/B 测试 │
│ • 工具执行详情 │ │ • 用户级统计 │ │ • 效果评估 │
└─────────────────┘ └─────────────────┘ └─────────────────┘
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 用户分析 │ │ 评估系统 │ │ 数据集管理 │
│ │ │ │ │ │
│ • 会话追踪 │ │ • 人工标注 │ │ • 测试用例 │
│ • 用户反馈 │ │ • 自动评估 │ │ • 回归测试 │
│ • 使用模式 │ │ • 质量评分 │ │ • 持续优化 │
└─────────────────┘ └─────────────────┘ └─────────────────┘
Langfuse 实现了 「OpenTelemetry OTLP 协议」,这意味着我们可以复用 OTel 的 SDK,只需要:
┌─────────────────────────────────────────────────────────────────────────────┐
│ Langfuse 集成架构 │
└─────────────────────────────────────────────────────────────────────────────┘
OTel Span SpanProcessor Exporter
│ │ │
│ 1. Span 开始 │ │
├────────────────────────▶│ │
│ │ │
│ 2. 从 Baggage 复制属性 │ │
│ (userId, sessionId) │ │
│ │ │
│ 3. Span 结束 │ │
├────────────────────────▶│ │
│ │ │
│ │ 4. 转换属性格式 │
│ ├───────────────────────▶│
│ │ │
│ │ 5. POST 到 Langfuse │
│ │ /api/public/otel │
│ │ /v1/traces │
│ │ ▼
│ │ ┌─────────────────┐
│ │ │ Langfuse API │
│ │ │ (OTLP 协议) │
│ │ └─────────────────┘
框架的 Langfuse 集成并非简单地使用标准 OTLP Exporter,而是实现了「自定义 SpanProcessor」,这是整个集成的核心技术点。
标准的 OTel Span 虽然能被 Langfuse 接收,但存在两个问题:
langfuse.observation.input,而非 llm.request因此,框架实现了 baggageBatchSpanProcessor,在 Span 生命周期的关键节点进行处理:
┌─────────────────────────────────────────────────────────────────────────────┐
│ 自定义 SpanProcessor 处理流程 │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────┐
│ Span 开始 (OnStart) │
└────────────┬────────────┘
│
▼
┌────────────────────────────────┐
│ 从 Context 提取 Baggage │
│ • langfuse.user.id │
│ • langfuse.session.id │
│ • langfuse.trace.tags │
│ • langfuse.trace.metadata.* │
└────────────────┬───────────────┘
│
▼
┌────────────────────────────────┐
│ 将 Baggage 复制到 Span 属性 │
│ span.SetAttributes(...) │
└────────────────┬───────────────┘
│
▼
┌─────────────────────────┐
│ Span 结束 (OnEnd) │
└────────────┬────────────┘
│
▼
┌────────────────────────────────┐
│ 属性格式转换 (Exporter 中) │
│ • gen_ai.operation.name │
│ → langfuse.observation.type│
│ • llm.request │
│ → langfuse.observation.input│
│ • llm.response │
│ → langfuse.observation.output│
└────────────────┬───────────────┘
│
▼
┌────────────────────────────────┐
│ 上传到 Langfuse API │
│ POST /api/public/otel/v1/traces│
└────────────────────────────────┘
「1. Baggage 传播处理器」
// baggageBatchSpanProcessor 包装了标准的 BatchSpanProcessor
// 在 Span 开始时,将 Baggage 中的属性复制到 Span 上
type baggageBatchSpanProcessor struct {
next sdktrace.SpanProcessor
}
func (p *baggageBatchSpanProcessor) OnStart(ctx context.Context, span sdktrace.ReadWriteSpan) {
// 从 Context 中提取 Baggage
for _, member := range baggage.FromContext(ctx).Members() {
// 只传播 Langfuse 需要的属性(白名单过滤)
if defaultLangfuseTraceAttributeFilter(member) {
span.SetAttributes(attribute.String(member.Key(), member.Value()))
}
}
// 继续传递给下一个处理器
if p.next != nil {
p.next.OnStart(ctx, span)
}
}
「2. Baggage 白名单过滤」
// 限制哪些 Baggage 条目会被传播到 Span 属性
// 避免所有 Baggage 都被无差别复制,造成属性污染
func defaultLangfuseTraceAttributeFilter(member baggage.Member) bool {
k := member.Key()
switch k {
case"langfuse.user.id", "user.id", // 用户 ID
"langfuse.session.id", "session.id", // 会话 ID
"langfuse.version", "langfuse.release", // 版本信息
"langfuse.trace.tags": // 标签
returntrue
default:
// 只传播顶层 metadata 键:langfuse.trace.metadata.<key>
return strings.HasPrefix(k, "langfuse.trace.metadata.")
}
}
「3. Exporter 中的属性转换」
// transformSpan 根据操作类型应用不同的转换规则
func transformSpan(span *tracepb.Span) {
// 查找操作类型
var operationName string
for _, attr := range span.Attributes {
if attr.Key == "gen_ai.operation.name" {
operationName = attr.Value.GetStringValue()
break
}
}
// 根据操作类型应用不同的转换
switch operationName {
case"invoke_agent":
transformInvokeAgent(span) // Agent 调用
case"chat":
transformCallLLM(span) // LLM 调用
case"execute_tool":
transformExecuteTool(span) // 工具执行
}
}
// LLM 调用转换示例
func transformCallLLM(span *tracepb.Span) {
var newAttributes []*commonpb.KeyValue
// 1. 添加 Langfuse 观测类型
newAttributes = append(newAttributes, &commonpb.KeyValue{
Key: "langfuse.observation.type",
Value: &commonpb.AnyValue{
Value: &commonpb.AnyValue_StringValue{StringValue: "generation"},
},
})
// 2. 转换属性
for _, attr := range span.Attributes {
switch attr.Key {
case"llm.request":
// llm.request → langfuse.observation.input
newAttributes = append(newAttributes, &commonpb.KeyValue{
Key: "langfuse.observation.input",
Value: attr.Value,
})
// 额外提取 generation_config 作为模型参数
var req map[string]any
if err := json.Unmarshal([]byte(attr.Value.GetStringValue()), &req); err == nil {
if genConfig, exists := req["generation_config"]; exists {
jsonConfig, _ := json.Marshal(genConfig)
newAttributes = append(newAttributes, &commonpb.KeyValue{
Key: "langfuse.observation.model.parameters",
Value: &commonpb.AnyValue{
Value: &commonpb.AnyValue_StringValue{StringValue: string(jsonConfig)},
},
})
}
}
case"llm.response":
// llm.response → langfuse.observation.output
newAttributes = append(newAttributes, &commonpb.KeyValue{
Key: "langfuse.observation.output",
Value: attr.Value,
})
default:
// 保留其他属性
newAttributes = append(newAttributes, attr)
}
}
span.Attributes = newAttributes
}
baggageBatchSpanProcessor
包装标准处理器,不破坏原有逻辑你可能会问:为什么要在 Exporter 层做转换,而不是直接在 SpanProcessor 的 OnEnd 中修改 Span?
原因是 「多后端兼容性」:
┌─────────────────┐
│ 原始 Span │
│ gen_ai.* │
│ llm.request │
└────────┬────────┘
│
┌─────────────────┼─────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Jaeger │ │ Langfuse │ │ Prometheus │
│ Exporter │ │ Exporter │ │ Exporter │
│ │ │ (转换属性) │ │ │
│ 保持原始格式 │ │ 转换为专属格式│ │ 保持原始格式 │
└─────────────┘ └─────────────┘ └─────────────┘
如果在 SpanProcessor 中直接修改,会影响所有后端。在 Exporter 层转换,可以保证:
「方式一:环境变量配置」
export LANGFUSE_SECRET_KEY=sk-lf-xxx
export LANGFUSE_PUBLIC_KEY=pk-lf-xxx
export LANGFUSE_HOST=cloud.langfuse.com:443
import "trpc.group/trpc-go/trpc-agent-go/telemetry/langfuse"
func main() {
ctx := context.Background()
// 自动读取环境变量
cleanup, err := langfuse.Start(ctx)
if err != nil {
log.Fatal(err)
}
defer cleanup(ctx)
// ... 运行 Agent
}
「方式二:代码配置」
cleanup, err := langfuse.Start(ctx,
langfuse.WithSecretKey("sk-lf-xxx"),
langfuse.WithPublicKey("pk-lf-xxx"),
langfuse.WithHost("cloud.langfuse.com:443"),
)
「方式三:本地开发(自建 Langfuse)」
cleanup, err := langfuse.Start(ctx,
langfuse.WithHost("localhost:3000"),
langfuse.WithInsecure(), // 本地开发使用非安全连接
)
框架自动将 Agent 的操作类型映射为 Langfuse 的 Observation 类型:
Agent 操作 | Langfuse 类型 | UI 展示 |
|---|---|---|
invoke_agent | agent | 🤖 Agent 节点 |
chat (LLM 调用) | generation | 🧠 Generation 节点 |
execute_tool | tool | 🔧 Tool 节点 |
框架会自动将 OTel 通用属性转换为 Langfuse 专属属性:
┌──────────────────────────────────────────────────────────────────────────┐
│ 属性转换示例 │
└──────────────────────────────────────────────────────────────────────────┘
原始 OTel Span 转换后 Langfuse Span
┌────────────────────────────┐ ┌────────────────────────────┐
│ gen_ai.operation.name: │ ───► │ langfuse.observation.type: │
│ "chat" │ │ "generation" │
│ │ │ │
│ llm.request: │ ───► │ langfuse.observation.input:│
│ "{messages:[...]}" │ │ "{messages:[...]}" │
│ │ │ │
│ llm.response: │ ───► │ langfuse.observation. │
│ "{content:...}" │ │ output: "{content:...}" │
│ │ │ │
│ gen_ai.request.model: │ ───► │ langfuse.observation.model.│
│ "gpt-4" │ │ name: "gpt-4" │
│ │ │ │
│ gen_ai.usage.input_tokens: │ ───► │ (保留,用于成本计算) │
│ 150 │ │ │
└────────────────────────────┘ └────────────────────────────┘
通过 「Baggage 传播机制」,可以将用户 ID 和会话 ID 自动传播到所有 Span:
import (
"go.opentelemetry.io/otel/baggage"
)
func handleRequest(ctx context.Context, userID, sessionID string) {
// 设置 Baggage
userMember, _ := baggage.NewMember("langfuse.user.id", userID)
sessionMember, _ := baggage.NewMember("langfuse.session.id", sessionID)
bag, _ := baggage.New(userMember, sessionMember)
ctx = baggage.ContextWithBaggage(ctx, bag)
// 后续所有 Span 都会自动携带这些属性
agent.Run(ctx, request)
}
这样在 Langfuse 中就可以:
在生产环境中,通常需要同时对接多个可观测性平台:
func initTelemetry(ctx context.Context) (cleanup func(), err error) {
var cleanups []func() error
// 1. 初始化 OTLP 追踪(发送到内部 Jaeger/Tempo)
traceCleanup, err := trace.Start(ctx,
trace.WithEndpoint("otel-collector:4317"),
trace.WithServiceName("my-agent"),
)
if err != nil {
returnnil, err
}
cleanups = append(cleanups, traceCleanup)
// 2. 追加 Langfuse(复用同一个 TracerProvider)
langfuseCleanup, err := langfuse.Start(ctx,
langfuse.WithHost("cloud.langfuse.com:443"),
)
if err != nil {
returnnil, err
}
cleanups = append(cleanups, langfuseCleanup)
// 3. 初始化指标
mp, err := metric.NewMeterProvider(ctx,
metric.WithEndpoint("otel-collector:4317"),
)
if err != nil {
returnnil, err
}
metric.InitMeterProvider(mp)
returnfunc() {
for _, c := range cleanups {
c()
}
}, nil
}
对于高流量场景,需要配置合理的采样策略:
// 在 OTLP Collector 配置中设置采样
// otel-collector-config.yaml
processors:
probabilistic_sampler:
sampling_percentage: 10 # 采样 10%
pipelines:
traces:
processors: [probabilistic_sampler]
LLM 的输入输出可能包含敏感信息,需要在导出前脱敏:
// 自定义 SpanProcessor 实现脱敏
type SensitiveDataFilter struct {
next sdktrace.SpanProcessor
}
func (f *SensitiveDataFilter) OnEnd(span sdktrace.ReadOnlySpan) {
// 检查并脱敏敏感属性
// ...
f.next.OnEnd(span)
}
基于采集的指标配置告警规则:
# Prometheus AlertManager 规则示例
groups:
-name:agent-alerts
rules:
# Token 消耗异常
-alert:HighTokenUsage
expr:sum(rate(gen_ai_client_token_usage_sum[5m]))>10000
for:5m
labels:
severity:warning
annotations:
summary:"Token 消耗异常升高"
# LLM 延迟过高
-alert:HighLLMLatency
expr:histogram_quantile(0.95,gen_ai_client_operation_duration_bucket)>10
for:5m
labels:
severity:critical
annotations:
summary:"LLM P95 延迟超过 10 秒"
# 错误率过高
-alert:HighErrorRate
expr:sum(rate(trpc_agent_go_client_request_cnt{status="error"}[5m]))/sum(rate(trpc_agent_go_client_request_cnt[5m]))>0.05
for:5m
labels:
severity:critical
annotations:
summary:"Agent 错误率超过 5%"
┌─────────────────────────────────────────────────────────────────────────────┐
│ Trace: weather-query-12345 Duration: 3.2s│
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ▼ invoke_agent [weather-assistant] 3.2s │
│ │ │
│ ├─▼ chat [gpt-4] 1.1s │
│ │ tokens_in: 150, tokens_out: 45 │
│ │ │
│ ├─▼ execute_tool [get_current_weather] 0.8s │
│ │ input: {"city": "beijing"} │
│ │ output: {"temp": 25, "weather": "sunny"} │
│ │ │
│ └─▼ chat [gpt-4] 1.3s │
│ tokens_in: 200, tokens_out: 120 │
│ response: "北京今天天气晴朗,气温25度..." │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ Langfuse Dashboard │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 📊 今日统计 │
│ ┌─────────────┬─────────────┬─────────────┬─────────────┐ │
│ │ 总请求数 │ Token 消耗 │ 平均延迟 │ 成本 │ │
│ │ 12,345 │ 2.5M │ 2.3s │ $45.6 │ │
│ └─────────────┴─────────────┴─────────────┴─────────────┘ │
│ │
│ 🔍 Trace 详情 │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ 🤖 invoke_agent: weather-assistant ││
│ │ │ ││
│ │ ├── 🧠 generation: gpt-4 ││
│ │ │ Input: [user: 今天北京天气怎么样?] ││
│ │ │ Output: [assistant: 让我查询一下...] ││
│ │ │ Tokens: 150 → 45 | Cost: $0.02 ││
│ │ │ ││
│ │ ├── 🔧 tool: get_current_weather ││
│ │ │ Input: {"city": "beijing"} ││
│ │ │ Output: {"temp": 25, "weather": "sunny"} ││
│ │ │ ││
│ │ └── 🧠 generation: gpt-4 ││
│ │ Input: [tool_result: ...] ││
│ │ Output: [assistant: 北京今天天气晴朗...] ││
│ │ Tokens: 200 → 120 | Cost: $0.03 ││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │
└─────────────────────────────────────────────────────────────────────────────┘
构建 AI Agent 的可观测体系,核心在于:
要点 | 实践 |
|---|---|
「标准化」 | 基于 OpenTelemetry,遵循 GenAI 语义约定 |
「全链路」 | 追踪 Agent → LLM → Tool 的完整调用链 |
「LLM 原生」 | 采集 Token、首 Token 时间、输入输出等专属指标 |
「多后端」 | 同时对接 Jaeger + Langfuse + Prometheus |
「低侵入」 | 框架层自动埋点,业务代码无感知 |
通过 trpc-agent-go 的 telemetry 模块,你可以:
✅ 「一行代码」启用分布式追踪 ✅ 「自动采集」 LLM 专属指标 ✅ 「无缝对接」 Langfuse 平台 ✅ 「灵活配置」多后端同时导出
当 Agent 出现问题时,你不再需要"猜"——完整的调用链、详细的 Token 统计、精确的延迟分布,都在你的指尖。
❝💡 「思考题」:你的 Agent 应用是否已经具备完整的可观测能力?Token 消耗是否在预期范围内?欢迎在评论区分享你的实践经验。❞