Agent 技术栈正在经历从原型验证到生产部署的跨越。Google ADK(Agent Development Kit)作为 2025 年末发布的 Agent 开发框架,已经在不少云端项目中跑通了完整闭环。但"能跑"和"能扛"之间,差的就是架构设计。
本文从云部署和 API 工程化的视角出发,拆解 ADK 中 5 种 Skill 编排模式的架构选型逻辑、部署策略和资源开销优化方案。重点不是教你用 ADK,而是帮你在真实云环境中把 Agent 系统搭稳。
Sequential Chain 是最直观的编排方式——Skill A 处理完交给 Skill B,再交给 Skill C。映射到云架构中,本质就是一条同步处理流水线。
from google.adk import Agent, Skill, SequentialRunner
class ExtractSkill(Skill):
"""从 API 响应中提取结构化数据"""
def execute(self, context):
raw_text = context.get("user_input")
entities = self.llm.extract_entities(raw_text)
context.set("entities", entities)
return context
class ValidateSkill(Skill):
"""字段完整性校验——防止脏数据进入下游"""
def execute(self, context):
entities = context.get("entities")
required_fields = ["name", "date", "amount"]
missing = [f for f in required_fields if f not in entities]
if missing:
context.set("validation_error", f"缺少字段: {missing}")
context.set("is_valid", False)
else:
context.set("is_valid", True)
return context
class PersistSkill(Skill):
"""校验通过后写入持久化存储"""
def execute(self, context):
if not context.get("is_valid"):
return context
entities = context.get("entities")
db.insert("records", entities)
context.set("result", "写入成功")
return context
agent = Agent(
skills=[ExtractSkill(), ValidateSkill(), PersistSkill()],
runner=SequentialRunner()
)问题 | 现象 | 工程化方案 |
|---|---|---|
中间节点故障 | 校验 Skill 异常后,持久化 Skill 收到空数据 | 每个 Skill 加防御性校验 + 熔断机制 |
Context 对象泄漏 | 高并发下不同请求的 context 串数据 | 容器级隔离,每次请求创建独立 context 实例 |
链路排查困难 | 出错时不知道是哪一步导致的 | 接入分布式链路追踪,每个 Skill 写 trace span |
选型建议: 流程确定、步骤固定的业务链路首选。但如果涉及条件分支(比如"校验失败走人工审核"),就需要升级到 Router 或 Supervisor 模式。
一个请求同时分发到多个 Skill 并行执行,最后聚合。对应到云端就是典型的扇出-聚合模型,和微服务的并行调用本质一致。
from google.adk import Agent, Skill, ParallelRunner, AggregatorSkill
class SearchDBSkill(Skill):
def execute(self, context):
result = db.query(context.get("query"))
return {"source": "db", "data": result}
class SearchAPISkill(Skill):
def execute(self, context):
result = api.search(context.get("query"))
return {"source": "api", "data": result}
class SearchCacheSkill(Skill):
def execute(self, context):
result = cache.get(context.get("query"))
return {"source": "cache", "data": result}
class MergeResultsSkill(AggregatorSkill):
def aggregate(self, results):
valid = [r for r in results if r["data"] is not None]
priority = {"cache": 0, "db": 1, "api": 2}
valid.sort(key=lambda x: priority.get(x["source"], 99))
return valid[0] if valid else {"error": "所有数据源均无结果"}
agent = Agent(
skills=[SearchDBSkill(), SearchAPISkill(), SearchCacheSkill()],
aggregator=MergeResultsSkill(),
runner=ParallelRunner(timeout_seconds=10)
)方案 | 三源总耗时 | 资源利用率 |
|---|---|---|
串行调用 | 1.2s + 0.8s + 0.1s ≈ 2.1s | 低,大量时间在等待 |
并行扇出 | max(1.2, 0.8, 0.1) ≈ 1.2s | 高,并发利用计算资源 |
并行 + 超时降级 | ≤1.0s(超时丢弃慢源) | 最优,配合 SLA 兜底 |
部署提醒: 并行 Skill 部署时注意连接池配置。3 个 Skill 并发打下游 API,如果连接池太小会变成伪并行。云函数场景下需要关注冷启动对扇出延迟的影响。
用一个决策层根据输入特征,动态选择不同的下游 Skill 执行。等价于 API 网关里的智能路由,但决策引擎可以是 LLM 也可以是规则。
from google.adk import Agent, Skill, RouterSkill
class IntentRouter(RouterSkill):
"""根据请求意图路由到对应处理服务"""
ROUTES = {
"query": "QuerySkill",
"create": "CreateSkill",
"analyze": "AnalyzeSkill",
}
def route(self, context):
user_input = context.get("user_input")
# 方案 A:LLM 分类(适合语义复杂的场景)
intent = self.llm.classify(
user_input,
categories=list(self.ROUTES.keys())
)
# 方案 B:规则匹配(延迟低、零 Token 消耗)
# intent = self.rule_classify(user_input)
target_skill = self.ROUTES.get(intent, "FallbackSkill")
context.set("routed_intent", intent)
return target_skill维度 | LLM 路由 | 规则路由 |
|---|---|---|
准确率 | 95%+(复杂语义场景) | 85%(模糊输入准确率下降) |
单次延迟 | 200-800ms(含 API 调用) | <5ms(本地计算) |
API Token 开销 | ~500 token/次 | 零 |
运维成本 | 调 Prompt 即可迭代 | 正则/关键词表需人工维护 |
工程化方案: 生产环境推荐混合路由——规则层先拦截高频明确意图,命中率低于 90% 的长尾请求再走 LLM 路由。这样既控制 API 调用成本,又保障整体准确率。
一个 Supervisor Skill 持有全局状态,动态规划、调度、评估、重试其他 Skill。这是构建复杂 Agent 系统的核心架构模式,类似于微服务中的编排器(Orchestrator)。
from google.adk import Agent, Skill, SupervisorSkill
class TaskSupervisor(SupervisorSkill):
MAX_RETRIES = 3
def supervise(self, context):
plan = self.llm.plan(context.get("task_description"))
context.set("plan", plan)
for step in plan["steps"]:
skill_name = step["skill"]
retries = 0
while retries < self.MAX_RETRIES:
result = self.dispatch(skill_name, context)
quality = self.llm.evaluate(
task=step["description"],
result=result,
criteria=step.get("success_criteria", "完成且无错误")
)
if quality["passed"]:
context.set(f"step_{step['id']}_result", result)
break
else:
retries += 1
context.set(f"step_{step['id']}_feedback", quality["feedback"])
if retries >= self.MAX_RETRIES:
return {
"status": "failed",
"step": step["id"],
"reason": "超过最大重试次数"
}
return {"status": "completed", "results": context.get_all_results()}Supervisor 模式最大的风险是失控——LLM 不断重试,Token 和 API 调用费用飙升。生产环境必须加硬限制:
# Token 预算 + 时间预算双保险
if total_tokens_used > budget_limit:
return fallback_result()
if elapsed_time > timeout:
return partial_result()策略 | Token 消耗 | 灵活性 | 适用场景 |
|---|---|---|---|
静态(一次性规划) | 基准 | 低 | 流程确定的业务 |
动态(逐步决策) | 3-5 倍 | 高 | 探索性任务 |
架构建议: 先用静态计划上线,观察失败率。如果某类任务静态计划的成功率低于 80%,再针对该类切换到动态计划。避免全局动态导致 API 成本失控。
多个 Skill 扮演不同专业角色,各自独立分析同一问题,最后由一个聚合 Skill 综合所有专家意见输出最终结论。本质是用多视角降低单模型的盲区风险。
class SecurityExpert(Skill):
SYSTEM_PROMPT = "你是一位安全工程师,专注于发现代码中的安全漏洞。"
def execute(self, context):
code = context.get("code_to_review")
analysis = self.llm.analyze(
system=self.SYSTEM_PROMPT,
prompt=f"审查以下代码的安全性:\n{code}"
)
return {"expert": "security", "findings": analysis}
class PerformanceExpert(Skill):
SYSTEM_PROMPT = "你是一位性能优化专家,专注于发现性能瓶颈和优化机会。"
def execute(self, context):
code = context.get("code_to_review")
analysis = self.llm.analyze(
system=self.SYSTEM_PROMPT,
prompt=f"分析以下代码的性能:\n{code}"
)
return {"expert": "performance", "findings": analysis}
class ReviewModerator(AggregatorSkill):
def aggregate(self, expert_results):
combined = "\n".join([
f"【{r['expert']}】{r['findings']}"
for r in expert_results
])
final = self.llm.synthesize(
prompt=f"综合以下专家意见,给出最终代码审查报告:\n{combined}"
)
return final方案 | 发现问题数 | 误报率 | API Token 消耗 |
|---|---|---|---|
单模型直审 | 12 | 25% | 3,200 |
3 专家并行 | 23 | 15% | 9,800 |
3 专家 + 交叉验证 | 21 | 8% | 14,500 |
问题发现率翻倍的同时误报率反降,但 Token 消耗也翻了 3-5 倍。需要根据业务对质量和成本的敏感度做取舍。
模式 | 架构复杂度 | API 成本 | 最佳场景 | 延迟特征 |
|---|---|---|---|---|
Sequential | 低 | 低 | 确定性流水线 | 各步累加 |
Parallel | 中 | 中 | 多源聚合 | 取最慢步 |
Router | 低 | 低-中 | 多类型请求分发 | 路由 + 单步 |
Supervisor | 高 | 高 | 复杂动态编排 | 不可预测 |
Specialist | 中 | 高 | 多维度分析 | 可并行优化 |
当 Router + Supervisor + Specialist 组合使用时,API 调用量会呈指数增长。三个实战经验:
# 路由判断用轻量模型——快且便宜
router.set_model("gemini-2.5-flash")
# 执行层用中端模型
worker.set_model("claude-sonnet-4-6")
# 关键决策走最强模型
supervisor.set_model("claude-opus-4-6")在多模型混用的 Agent 架构中,通过统一的 API 聚合平台管理模型调度能显著降低运维复杂度。ofox 提供了 Claude、GPT、Gemini 等主流模型的统一接口,一套 Key 覆盖多模型调用,省去分别对接多家 API 的工作。
class SmartContextManager:
"""只给每个 Skill 传它需要的最小 context"""
def get_for_skill(self, skill_name, full_context):
required_keys = SKILL_CONTEXT_MAP[skill_name]
return {k: full_context[k] for k in required_keys if k in full_context}把 context 控制在最小范围,既减少 Token 消耗也降低信息泄漏风险。
@skill_cache(ttl=3600, key_fn=lambda ctx: hash(ctx.get("query")))
def execute(self, context):
# 缓存命中直接返回,未命中才调 LLM
pass对重复性高的查询类 Skill,加一层缓存能砍掉 30-60% 的冗余 API 调用。
Q: ADK 和 LangChain 怎么选?
如果基础设施在 GCP 上,ADK 与 Vertex AI 的原生集成更丝滑。LangChain 生态更广泛但框架层抽象较厚,调试成本更高。不在 GCP 上的项目,LangChain 的灵活性可能更适合。
Q: 模式可以嵌套吗?
生产系统几乎都是嵌套的。典型组合是 Router → Supervisor → Parallel。但建议嵌套不超过 3 层,否则链路追踪和错误定位的工程成本会急剧上升。
Q: 单个 Router 能管多少下游?
ADK 没有硬限制,但路由准确率会随候选数增多而下降。经验值:单 Router 不超过 10 个下游 Skill。超出时拆成分层 Router,先粗分类再细路由。
更新日期:2026-03-26
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。