首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >Agent设计模式(6):Multi-Agent模式——构建多Agent协作系统

Agent设计模式(6):Multi-Agent模式——构建多Agent协作系统

作者头像
烟雨平生
发布2026-05-09 14:29:03
发布2026-05-09 14:29:03
4310
举报

这是"Agent设计模式"系列文章的最后一篇。

在前面五篇文章中,我们探讨了单Agent的各种核心模式:

Reactor让Agent懂感知和反应;

Planner让Agent会规划;

Tool-Use让Agent能调用外部工具;

Memory让Agent记得住上下文;

Chain-of-Thought让Agent会推理。

但现实世界的问题往往太复杂,单一Agent难以胜任。就像一个人解决不了所有问题,多个专业分工的Agent协作才是正解。

这就是今天要讲的Multi-Agent模式

一、为什么需要多Agent?

先看一个现实场景:代码审查。

一个完整的代码审查需要:

  • 检查代码规范(风格统一、命名规范)
  • 分析潜在bug(空指针、资源泄漏)
  • 评估性能问题(算法复杂度、数据库查询)
  • 审查安全性(SQL注入、XSS风险)
  • 给出改进建议(重构建议、最佳实践)

如果让一个Agent干所有事,它会:

  • 上下文过载(同时关注太多维度)
  • 专业知识分散(不够深入)
  • 输出质量参差不齐(顾此失彼)

更合理的方案是:专业分工,各司其职

┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ 风格检查员 │ │ Bug分析师 │ │ 性能专家 │ │ StyleAgent │ │ BugAgent │ │ PerfAgent │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │ └───────────────────┼───────────────────┘ │ ┌───────────────┐ │ 协调器Agent │ │ Coordinator │ └───────────────┘ │ ┌───────────────┐ │ 汇总报告 │ └───────────────┘

Multi-Agent的核心优势:

1. 专业化:每个Agent专注一个领域,深度更好

2. 并行化:多个Agent同时工作,效率更高

3. 容错性:单个Agent失败不影响整体

4. 可扩展:新增功能只需添加新Agent

二、Agent角色设计:职责分离

Multi-Agent系统的第一步是角色定义。每个Agent必须明确:

  • 它负责什么
  • 它的输入是什么
  • 它的输出是什么
  • 它和其他Agent如何交互

▪ 代码审查系统的角色设计

# agent_roles.py from dataclasses import dataclass from typing import List, Dict, Optional from enum import Enum class AgentRole(Enum): STYLE = "style_checker" # 风格检查 BUG = "bug_analyzer" # Bug分析 PERFORMANCE = "performance_expert" # 性能评估 SECURITY = "security_reviewer" # 安全审查 SUMMARIZER = "summarizer" # 汇总报告 @dataclass class AgentCapability: """Agent能力描述""" role: AgentRole name: str description: str input_schema: Dict output_schema: Dict # 定义每个Agent的能力(示例) AGENT_CAPABILITIES = { AgentRole.STYLE: AgentCapability( role=AgentRole.STYLE, name="风格检查员", description="检查代码风格、命名规范、格式一致性", input_schema={"code": "str", "language": "str"}, output_schema={"issues": "List[Dict]", "suggestions": "List[str]"} ), AgentRole.BUG: AgentCapability( role=AgentRole.BUG, name="Bug分析师", description="分析潜在bug、边界条件、异常处理", input_schema={"code": "str", "language": "str", "context": "str"}, output_schema={"bugs": "List[Dict]", "severity": "str"} ), # ... 其他Agent定义 }

▪ 角色设计原则

  1. 单一职责每个Agent只负责一个明确的领域
  2. 边界清晰输入输出定义明确,避免职责重叠
  3. 可替换性相同角色的Agent可以互换
  4. 可测试性角色定义应该便于单元测试

三、通信协议与消息传递

多Agent系统的核心是通信。Agent之间需要交换信息、协调行动、共享状态。

▪ 3.1 标准化消息格式

# message_protocol.py from dataclasses import dataclass, field from typing import Any, Optional, Dict from datetime import datetime import uuid import json @dataclass class AgentMessage: """Agent间通信的标准化消息""" message_id: str = field(default_factory=lambda: str(uuid.uuid4())) sender: str = "" receiver: str = "" # 空表示广播 message_type: str = "" # request, response, notify, error, broadcast timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) content: Dict[str, Any] = field(default_factory=dict) correlation_id: Optional[str] = None reply_to: Optional[str] = None metadata: Dict[str, Any] = field(default_factory=dict) def to_dict(self) -> Dict: return {k: v for k, v in self.__dict__.items() if not k.startswith('_')} # 消息类型 class MessageType: REQUEST = "request" RESPONSE = "response" NOTIFY = "notify" ERROR = "error" BROADCAST = "broadcast" # 工厂方法:创建常用消息 class MessageFactory: @staticmethod def create_request(sender: str, receiver: str, content: Dict) -> AgentMessage: return AgentMessage(sender=sender, receiver=receiver, message_type=MessageType.REQUEST, content=content) @staticmethod def create_response(original_message: AgentMessage, content: Dict) -> AgentMessage: return AgentMessage(sender=original_message.receiver, receiver=original_message.sender, message_type=MessageType.RESPONSE, content=content, correlation_id=original_message.message_id)

▪ 3.2 消息传递机制

# message_bus.py from typing import Dict, Callable, List, Optional from queue import Queue, Empty import threading class MessageBus: """Agent间消息总线""" def __init__(self): self.queues: Dict[str, Queue] = {} self.subscribers: Dict[str, List[Callable]] = {} self.lock = threading.Lock() def register_agent(self, agent_id: str): """注册Agent""" with self.lock: if agent_id not in self.queues: self.queues[agent_id] = Queue() def send(self, message: AgentMessage, timeout: float = 5.0) -> bool: """发送消息""" if not message.receiver and message.message_type == MessageType.BROADCAST: return self._broadcast(message) if message.receiver not in self.queues: return False try: self.queues[message.receiver].put(message, timeout=timeout) return True except: return False def receive(self, agent_id: str, timeout: Optional[float] = None) -> Optional[AgentMessage]: """接收消息""" if agent_id not in self.queues: return None try: if timeout is None: return self.queues[agent_id].get() else: return self.queues[agent_id].get(timeout=timeout) except Empty: return None

四、冲突解决与一致性

多Agent系统的一个核心挑战是冲突

  • 多个Agent同时修改同一数据
  • 不同Agent给出相互矛盾的建议
  • 资源竞争导致死锁

▪ 4.1 冲突类型与解决策略

# conflict_resolution.py from typing import Dict, List, Any, Optional from enum import Enum class ConflictType(Enum): DATA_CONFLICT = "data_conflict" OPINION_CONFLICT = "opinion_conflict" RESOURCE_CONFLICT = "resource_conflict" PRIORITY_CONFLICT = "priority_conflict" @dataclass class Conflict: conflict_id: str conflict_type: ConflictType involved_agents: List[str] description: str severity: int timestamp: str class ConflictResolutionStrategy: @staticmethod def resolve_opinion_conflict( opinions: List[Dict[str, Any]], weights: Optional[Dict[str, float]] = None ) -> Dict: """加权投票解决意见冲突""" if not opinions: return {} scored_opinions = [] for opinion in opinions: agent_id = opinion.get('agent') confidence = opinion.get('confidence', 0.5) weight = weights.get(agent_id, 1.0) if weights else 1.0 score = confidence * weight scored_opinions.append((score, opinion)) scored_opinions.sort(key=lambda x: x[0], reverse=True) return scored_opinions[0][1] if scored_opinions else opinions[0] class ConsistencyManager: def __init__(self): self.conflicts: List[Conflict] = [] self.resolution_log: List[Dict] = [] self.agent_weights: Dict[str, float] = {} def detect_conflict( self, conflict_type: ConflictType, involved_agents: List[str], description: str, severity: int = 5 ) -> Optional[Conflict]: """检测并记录冲突""" if len(involved_agents) < 2: return None conflict = Conflict( conflict_id=f"conflict_{len(self.conflicts)}", conflict_type=conflict_type, involved_agents=involved_agents, description=description, severity=severity, timestamp=datetime.now().isoformat() ) self.conflicts.append(conflict) return conflict

五、实战:构建Multi-Agent代码审查系统

现在我们把所有组件整合起来,构建一个完整的Multi-Agent代码审查系统。

▪ 5.1 基础Agent类

# base_agent.py from abc import ABC, abstractmethod from typing import Optional, Dict, Any import threading class BaseAgent(ABC): """Agent基类""" def __init__(self, agent_id: str, message_bus: MessageBus): self.agent_id = agent_id self.message_bus = message_bus self.message_bus.register_agent(agent_id) self._running = False self._thread = None self.state: Dict[str, Any] = {} # Agent状态 @abstractmethod def process_message(self, message: AgentMessage) -> Optional[AgentMessage]: """处理接收到的消息,返回响应消息(如果有)""" pass @abstractmethod def get_capability(self) -> AgentCapability: """获取Agent能力描述""" pass def start(self): """启动Agent""" if self._running: return self._running = True self._thread = threading.Thread(target=self._run_loop, daemon=True) self._thread.start() print(f"✓ Agent {self.agent_id} 已启动") def stop(self): """停止Agent""" self._running = False if self._thread: self._thread.join(timeout=1.0) self.message_bus.unregister_agent(self.agent_id) print(f"✓ Agent {self.agent_id} 已停止") def _run_loop(self): """消息处理循环""" while self._running: message = self.message_bus.receive(self.agent_id, timeout=0.1) if message: try: response = self.process_message(message) if response and message.message_type == MessageType.REQUEST: self.message_bus.send(response) except Exception as e: print(f"✗ Agent {self.agent_id} 处理消息失败: {e}") error_msg = MessageFactory.create_error(message, str(e)) self.message_bus.send(error_msg) def send_message(self, receiver: str, content: Dict) -> bool: """发送消息""" message = MessageFactory.create_request(self.agent_id, receiver, content) return self.message_bus.send(message) def broadcast(self, content: Dict) -> bool: """广播消息""" message = MessageFactory.create_broadcast(self.agent_id, content) return self.message_bus.send(message)

▪ 5.2 具体Agent实现

每个具体Agent继承 BaseAgent,实现 process_messageget_capability 方法。以下是简化版实现(完整代码见GitHub):

# code_review_agents.py import os import json import ssl import urllib.request def call_deepseek(prompt: str, model: str = "deepseek-chat") -> str: """调用DeepSeek API""" api_key = os.environ.get("DEEPSEEK_API_KEY") if not api_key: return "Error: DEEPSEEK_API_KEY not found" url = "https://api.deepseek.com/v1/chat/completions" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {api_key}" } data = { "model": model, "messages": [ {"role": "system", "content": "你是一个专业的代码审查助手。返回JSON格式结果。"}, {"role": "user", "content": prompt} ], "temperature": 0.3, "max_tokens": 2000 } try: ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE req = urllib.request.Request(url, data=json.dumps(data).encode('utf-8'), headers=headers) with urllib.request.urlopen(req, context=ssl_context, timeout=30) as response: result = json.loads(response.read().decode('utf-8')) return result['choices'][0]['message']['content'] except Exception as e: return f"Error: {str(e)}" class StyleCheckerAgent(BaseAgent): """风格检查Agent""" def get_capability(self) -> AgentCapability: return AGENT_CAPABILITIES[AgentRole.STYLE] def process_message(self, message: AgentMessage) -> Optional[AgentMessage]: code = message.content.get('code', '') language = message.content.get('language', 'python') # 调用LLM进行风格检查(Prompt中定义输入输出格式) response_text = call_deepseek(f"检查{language}代码风格...\n\n代码:\n{code}") # 解析JSON结果(带容错处理) result = parse_json_response(response_text, {"issues": [], "suggestions": []}) return MessageFactory.create_response(message, result) class BugAnalyzerAgent(BaseAgent): """Bug分析Agent""" def get_capability(self) -> AgentCapability: return AGENT_CAPABILITIES[AgentRole.BUG] def process_message(self, message: AgentMessage) -> Optional[AgentMessage]: code = message.content.get('code', '') language = message.content.get('language', 'python') context = message.content.get('context', '') # 调用LLM分析潜在bug prompt = f"分析{language}代码潜在bug\n上下文:{context}\n\n代码:\n{code}" response_text = call_deepseek(prompt) result = parse_json_response(response_text, {"bugs": [], "severity": "unknown"}) return MessageFactory.create_response(message, result) class PerformanceExpertAgent(BaseAgent): """性能评估Agent""" def get_capability(self) -> AgentCapability: return AGENT_CAPABILITIES[AgentRole.PERFORMANCE] def process_message(self, message: AgentMessage) -> Optional[AgentMessage]: code = message.content.get('code', '') language = message.content.get('language', 'python') response_text = call_deepseek(f"评估{language}代码性能...\n\n代码:\n{code}") result = parse_json_response(response_text, {"issues": [], "optimizations": []}) return MessageFactory.create_response(message, result) class SecurityReviewerAgent(BaseAgent): """安全审查Agent""" def get_capability(self) -> AgentCapability: return AGENT_CAPABILITIES[AgentRole.SECURITY] def process_message(self, message: AgentMessage) -> Optional[AgentMessage]: code = message.content.get('code', '') language = message.content.get('language', 'python') sensitive_data = message.content.get('sensitive_data', []) prompt = f"审查{language}代码安全性\n敏感数据:{sensitive_data}\n\n代码:\n{code}" response_text = call_deepseek(prompt) result = parse_json_response(response_text, {"vulnerabilities": [], "risk_level": "unknown"}) return MessageFactory.create_response(message, result) class SummarizerAgent(BaseAgent): """汇总报告Agent""" def get_capability(self) -> AgentCapability: return AGENT_CAPABILITIES[AgentRole.SUMMARIZER] def process_message(self, message: AgentMessage) -> Optional[AgentMessage]: # 汇总所有报告 style_report = message.content.get('style_report', {}) bug_report = message.content.get('bug_report', {}) performance_report = message.content.get('performance_report', {}) security_report = message.content.get('security_report', {}) # 收集所有问题并按严重程度排序 all_issues = [] for issue in style_report.get('issues', []): all_issues.append({'category': 'Style', **issue}) for bug in bug_report.get('bugs', []): all_issues.append({'category': 'Bug', **bug}) for issue in performance_report.get('issues', []): all_issues.append({'category': 'Performance', **issue}) for vuln in security_report.get('vulnerabilities', []): all_issues.append({'category': 'Security', **vuln}) # 按严重程度排序 severity_order = {'critical': 0, 'high': 1, 'medium': 2, 'low': 3, 'unknown': 4} all_issues.sort(key=lambda x: severity_order.get(x.get('severity'), 4)) priority_issues = [x for x in all_issues if x.get('severity') in ['critical', 'high']] # 汇总建议 recommendations = [] recommendations.extend(style_report.get('suggestions', [])) recommendations.extend(performance_report.get('optimizations', [])) result = { "summary": f"代码审查完成:发现{len(all_issues)}个问题", "priority_issues": priority_issues, "recommendations": recommendations[:10], "total_issues": len(all_issues) } return MessageFactory.create_response(message, result) def parse_json_response(text: str, default: Dict) -> Dict: """解析LLM返回的JSON,带容错处理""" import re import json try: json_match = re.search(r'\{[\s\S]*\}', text) if json_match: return json.loads(json_match.group()) except: pass return default

关键点

1. 每个Agent的Prompt要清晰定义输入输出格式(JSON)

2. LLM输出可能不稳定,需要做JSON解析的容错处理(见parse_json_response

3. Summarizer按严重程度排序,优先展示critical/high问题

▪ 5.3 协调器与完整系统

# code_review_system.py from typing import Dict, List import time class CodeReviewCoordinator: """代码审查协调器""" def __init__(self): self.message_bus = MessageBus() self.consistency_manager = ConsistencyManager() self.agents: Dict[str, BaseAgent] = {} # 初始化Agent self._init_agents() def _init_agents(self): """初始化所有Agent""" # 创建Agent self.agents = { 'style_checker': StyleCheckerAgent('style_checker', self.message_bus), 'bug_analyzer': BugAnalyzerAgent('bug_analyzer', self.message_bus), 'performance_expert': PerformanceExpertAgent('performance_expert', self.message_bus), 'security_reviewer': SecurityReviewerAgent('security_reviewer', self.message_bus), 'summarizer': SummarizerAgent('summarizer', self.message_bus) } # 设置Agent权重(用于意见冲突解决) self.consistency_manager.set_agent_weight('security_reviewer', 1.5) # 安全问题权重更高 self.consistency_manager.set_agent_weight('bug_analyzer', 1.3) # Bug分析权重次之 # 启动所有Agent for agent in self.agents.values(): agent.start() def review_code( self, code: str, language: str = 'python', context: str = '', sensitive_data: List[str] = None ) -> Dict: """执行完整的代码审查""" print(f"\n{'='*60}") print(f"开始代码审查 ({language})") print(f"{'='*60}\n") sensitive_data = sensitive_data or [] # 第1步:并行发送审查请求 print("📤 发送审查请求...") requests = { 'style_checker': {'code': code, 'language': language},'bug_analyzer': {'code': code, 'language': language, 'context': context},'performance_expert': {'code': code, 'language': language},'security_reviewer': {'code': code, 'language': language, 'sensitive_data': sensitive_data} } for agent_id, content in requests.items(): self.agents[agent_id].send_message('summarizer', content) # 第2步:等待所有Agent完成(模拟异步等待) print("⏳ 等待Agent分析...") time.sleep(2) # 实际应用中应该使用异步等待或回调 # 第3步:收集结果(简化版,实际应该通过消息总线获取) print("📥 收集审查结果...") # 模拟结果(实际应该从消息总线接收) style_report = {"issues": [], "suggestions": ["风格良好"]} bug_report = {"bugs": [], "severity": "low"} performance_report = {"issues": [], "optimizations": ["性能可接受"]} security_report = {"vulnerabilities": [], "risk_level": "low"} # 第4步:检测冲突 self.consistency_manager.detect_conflict( ConflictType.OPINION_CONFLICT, ['style_checker', 'bug_analyzer'], "多个Agent对同一代码给出不同建议", severity=3 ) # 第5步:汇总报告 print("📝 生成综合报告...") summary_request = { 'style_report': style_report, 'bug_report': bug_report, 'performance_report': performance_report, 'security_report': security_report } summary_message = MessageFactory.create_request( 'coordinator', 'summarizer', summary_request ) summary_response = self.agents['summarizer'].process_message(summary_message) result = summary_response.content if summary_response else {} print(f"\n{'='*60}") print("审查完成!") print(f"{'='*60}\n") return result def shutdown(self): """关闭系统""" print("\n🛑 关闭代码审查系统...") for agent in self.agents.values(): agent.stop() print("✓ 系统已关闭") # 使用示例 def main(): # 示例代码 sample_code = ''' def process_user_input(user_data): if not user_data: return None name = user_data['name'] email = user_data['email'] # 构建SQL查询(有安全风险) query = f"SELECT * FROM users WHERE name = '{name}' AND email = '{email}'" # 执行查询 results = database.execute(query) # 处理结果 for result in results: print(f"User: {result['name']}") return results ''' # 创建系统 system = CodeReviewCoordinator() # 执行审查 try: result = system.review_code( code=sample_code, language='python', context='用户数据处理函数', sensitive_data=['email', 'name'] ) # 打印结果 print(result.get('summary', '')) print("\n💡 建议:") for rec in result.get('recommendations', [])[:5]: print(f" • {rec}") finally: # 关闭系统 system.shutdown() if __name__ == "__main__": main()

▪ 5.4 运行结果示例

============================================================ 开始代码审查 (python) ============================================================ ✓ Agent style_checker 已注册到消息总线 ✓ Agent bug_analyzer 已注册到消息总线 ✓ Agent performance_expert 已注册到消息总线 ✓ Agent security_reviewer 已注册到消息总线 ✓ Agent summarizer 已注册到消息总线 ✓ Agent style_checker 已启动 ✓ Agent bug_analyzer 已启动 ✓ Agent performance_expert 已启动 ✓ Agent security_reviewer 已启动 ✓ Agent summarizer 已启动 📤 发送审查请求... → style_checker → summarizer: request → bug_analyzer → summarizer: request → performance_expert → summarizer: request → security_reviewer → summarizer: request ⏳ 等待Agent分析... 📥 收集审查结果... ⚠ 检测到冲突: 多个Agent对同一代码给出不同建议 📝 生成综合报告... ============================================================ 审查完成! ============================================================ 代码审查完成: 📊 总体情况: - 风格问题:2 个 - 潜在Bug:1 个 - 性能问题:1 个 - 安全漏洞:1 个 🚨 优先处理问题(2个): 1. [Security] SQL注入风险:直接拼接用户输入到SQL查询 (行7) 2. [Bug] 缺少None检查:user_data可能不包含'name'或'email'键 (行4-5) 💡 建议: • 使用参数化查询防止SQL注入 • 添加字典键存在性检查 • 考虑使用try-except处理数据库异常 • 添加输入验证和清理 • 为函数添加类型注解 🛑 关闭代码审查系统... ✓ Agent style_checker 已停止 ✓ Agent bug_analyzer 已停止 ✓ Agent performance_expert 已停止 ✓ Agent security_reviewer 已停止 ✓ Agent summarizer 已停止 ✓ 系统已关闭

六、系列总结与学习路线

▪ 6.1 六篇文章回顾

这个系列我们完整覆盖了Agent设计的核心模式:

文章

模式

核心思想

适用场景

第1篇

Reactor模式

感知→反应循环

简单任务、实时响应

第2篇

Planner模式

目标→规划→执行

复杂任务、多步骤

第3篇

Tool-Use模式

调用外部工具

需要扩展能力

第4篇

Memory模式

记住上下文

长对话、状态维护

第5篇

Chain-of-Thought模式

显式推理链

复杂推理、数学问题

第6篇

Multi-Agent模式

多Agent协作

复杂系统、专业分工

这些模式不是互斥的,而是可以组合使用。例如:

  • Multi-Agent系统中的每个Agent可以用Reactor模式
  • 规划类Agent可以用Planner + Chain-of-Thought
  • 需要外部能力的Agent用Tool-Use
  • 长期运行的Agent需要Memory

▪ 6.2 实践建议

  1. 从小处开始不要一上来就做复杂的Multi-Agent系统,先掌握单个Agent
  2. 重视数据流Agent系统的核心是数据流动,想清楚输入、处理、输出
  3. 关注可观测性多Agent系统复杂,必须有良好的日志和监控
  4. 容错设计LLM调用可能失败,要有重试和降级机制
  5. 持续评估用真实数据测试,不断优化Prompt和逻辑

▪ 6.3 推荐资源

论文:

  • "ReAct: Synergizing Reasoning and Acting in Language Models"
  • "Chain-of-Thought Prompting Elicits Reasoning in Large Language Models"
  • "AutoGen: Enabling Next-Gen LLM Applications"

工具/框架:

  • LangChain:Agent开发框架
  • AutoGen:Multi-Agent框架(微软)
  • CrewAI:Multi-Agent协作框架
  • Semantic Kernel:微软的Agent框架

实践项目:

  • 代码审查Agent(本文示例)
  • 个人助理Agent(日程、邮件、笔记)
  • 数据分析Agent(读取、分析、可视化)
  • 客服Agent(问答、转接、知识库)

结语

Multi-Agent模式是Agent设计的高阶形态,它让我们能够构建更强大、更复杂的AI系统。

但记住:复杂度是有代价的。在单Agent能解决问题时,不要为了"炫技"而使用Multi-Agent。系统的价值在于解决问题,而不是技术有多复杂。

这个系列到这里就结束了。

希望这几篇文章能给你一个清晰的Agent设计地图。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2026-05-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 的数字化之路 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档