
作者: HOS(安全风信子) 日期: 2026-05-24 主要来源平台: GitHub 摘要: 复杂任务需要多个 Agent 协作完成。本文深入探讨 Multi-Agent 系统的设计与实现,围绕 Agent 角色划分(planner、coder、reviewer、tester)、Agent 间通信协议(Blackboard、Message Passing、Shared Memory)、任务分解与分配、协作策略(串行执行、并行执行、层次化执行)、冲突解决机制(投票、仲裁、优先级)以及多 Agent 系统规模化问题展开系统性论述。文中通过 3 个完整代码实现、3 个 Mermaid 架构图表与大量实践案例,展示如何构建高效、稳定、可扩展的 Multi-Agent 协作系统,为 AI IDE 领域的工程实践提供可直接落地的技术方案。
本节为你提供的核心技术价值:理解 Multi-Agent 协作的本质——从角色划分到冲突解决的完整方法论,掌握构建生产级 Multi-Agent 系统的核心设计原则。
在软件开发的复杂任务面前,单一 Agent 的能力往往存在上限。当任务涉及多领域知识的融合、长时间跨度的规划、以及对质量的多重保障时,单 Agent 架构会面临注意力分散、上下文溢出、错误累积等问题。Multi-Agent 系统通过将复杂任务分解给多个专业化 Agent,让每个 Agent 专注于其擅长的领域,通过协作完成单一 Agent 无法胜任的工作。
Multi-Agent 系统的核心价值体现在三个维度:
垂直专业化:每个 Agent 可以成为特定领域的专家。Planner 精通任务分解与规划,Coder 擅长代码生成与实现,Reviewer 专注于代码质量评估,Tester 精于测试用例设计与缺陷发现。这种专业化使得每个 Agent 的决策质量显著高于通用型 Agent。
水平并行化:独立子任务可以同时执行,显著缩短任务完成时间。在一个包含 10 个独立模块的项目中,4 个 Coder Agent 并行工作,理论上可以将编码时间缩短至接近串行执行的 1/4(若各模块耗时相近,10 个模块需要 3 轮才能完成,即约为串行时间的 3/10)。
容错与鲁棒性:多 Agent 相互校验与审核可以有效降低错误率。当 Coder 生成的代码经过 Reviewer 的严格审核后,潜在缺陷在早期被发现,避免了后期修复的高昂成本。
然而,Multi-Agent 系统也带来了额外的复杂度:Agent 间的通信开销、协作策略的设计、冲突解决机制的实现、以及系统规模化后的稳定性维护。这些问题正是本文要深入探讨的核心内容。
在 Multi-Agent 系统中,角色划分是协作的基础。角色决定了 Agent 的职责边界、能力范围以及与其他 Agent 的交互方式。合理的角色划分可以使系统事半功倍,而不合理的角色划分则会导致职责混乱、效率低下。
从角色设计的角度,Agent 可以分为两种范式:Specialist(专家型) 和 Generalist(通才型)。
Specialist 模式为每个 Agent 分配高度专业化的职责。例如:Planner 只负责任务分解与规划,Coder 只负责代码生成与实现,Reviewer 只负责代码质量评估,Tester 只负责测试用例设计与缺陷发现。
Generalist 模式 则让每个 Agent 具备较为全面的能力,能够处理多种类型的任务。这种模式下,Agent 之间的区分度较低,主要通过动态任务分配来实现协作。
Specialist 优势:
| 优势项 | 描述 |
|---|---|
| 深度专业化 | Agent 可在特定领域达到极高水平 |
| 决策质量 | 单一职责减少决策复杂度,提高准确率 |
| 可解释性 | 问题出现时容易定位到具体 Agent |
| 可复用性 | 专业组件可在多个系统中复用 |
| 并行效率 | 独立专业任务可充分并行执行 |
Specialist 局限:
Generalist 优势:
Generalist 局限:
在生产环境中,混合角色模型 被证明是最有效的设计方案。这种模型结合了 Specialist 的深度优势和 Generalist 的灵活性:

核心设计原则:
在实际系统中,角色可以通过以下形式化方式进行定义:
# Formal model of an agent role
class AgentRole:
    """Describe what an agent acting in a role can do and is responsible for.

    ``input_types`` and ``output_types`` start empty and ``priority`` starts
    at 0; callers are expected to fill them in after construction.
    """

    def __init__(self, name: str, capabilities: list, responsibilities: list):
        """Store the role name, its capabilities and its responsibilities."""
        self.name = name
        self.capabilities = capabilities  # what the agent can do
        self.responsibilities = responsibilities  # what the agent is accountable for
        self.input_types = []  # kinds of input the role accepts
        self.output_types = []  # kinds of output the role produces
        self.priority = 0  # priority used when resolving conflicts

    def can_handle(self, task: 'Task') -> bool:
        """Return True if the role has at least one capability the task requires."""
        return any(cap in self.capabilities for cap in task.required_capabilities)

    def get_affinity_score(self, task: 'Task') -> float:
        """Return the fraction (0.0-1.0) of the task's required capabilities this role covers.

        The count is taken over the *required* capabilities, so a role whose
        capability list contains duplicates can never score above 1.0.
        A task with no required capabilities scores 0.0.
        """
        required = task.required_capabilities
        if not required:
            return 0.0
        covered = sum(1 for cap in required if cap in self.capabilities)
        return covered / len(required)
# Assess how specialized an agent is
def evaluate_specialization(agent: 'Agent') -> dict:
"""Build a dict of specialization metrics for ``agent``.

NOTE(review): calculate_concentration and measure_clarity are not defined in
the visible source; confirm where they are provided.
NOTE(review): completion_history / quality_history must expose ``.mean()``;
presumably numeric arrays or Series -- verify against the Agent type.
"""
return {
# number of distinct roles held by the agent
'role_diversity': len(set(agent.roles)),
'capability_concentration': calculate_concentration(agent.capabilities),
'responsibility_clarity': measure_clarity(agent.responsibilities),
'avg_task_completion_rate': agent.completion_history.mean(),
'avg_task_quality_score': agent.quality_history.mean()
}任务分解是 Multi-Agent 系统的核心环节。良好的任务分解应该满足以下原则:
原子性原则:每个子任务应该是原子性的,即不可再分。一个原子任务要么完整执行,要么不执行,不存在部分执行的状态。
独立性原则:子任务之间应尽量减少依赖。依赖越少,可并行化的程度越高,系统效率也就越高。
可观测性原则:每个子任务应该有明确的输入、输出和验收标准,使得任务执行情况可观测、可度量。
均衡性原则:子任务的复杂度应该尽量均衡,避免出现部分子任务耗时过长导致整体进度阻塞的情况。
Task Graph 是任务分解的结构化表示,它不仅描述了任务之间的包含关系,还描述了任务之间的依赖关系。
from dataclasses import dataclass, field
from typing import List, Dict, Set, Optional, Any
from enum import Enum
import uuid
class TaskStatus(Enum):
    """Lifecycle states a task can be in."""

    PENDING = "pending"      # selectable for execution by the schedulers
    RUNNING = "running"      # set by a coordinator while the agent executes
    COMPLETED = "completed"  # set after the agent returned successfully
    FAILED = "failed"        # set when execution raised or no agent was found
    BLOCKED = "blocked"
class TaskPriority(Enum):
    """Task priority levels; a larger value means a higher priority level."""

    LOW = 1
    NORMAL = 2
    HIGH = 3
    CRITICAL = 4
@dataclass
class Task:
    """A node in the task dependency graph.

    A task only stores the *ids* of related tasks; resolving them to objects
    requires the owning ``TaskGraph``.
    """

    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    description: str = ""
    assigned_agent: Optional[str] = None
    # Dependency edges
    dependencies: Set[str] = field(default_factory=set)  # ids that must finish before this task
    dependents: Set[str] = field(default_factory=set)  # ids that wait on this task
    # State and attributes
    status: TaskStatus = TaskStatus.PENDING
    priority: TaskPriority = TaskPriority.NORMAL
    # Input / output payloads
    input_data: Any = None
    output_data: Any = None
    # Metrics
    estimated_duration: float = 0.0  # estimated run time in minutes
    actual_duration: float = 0.0
    retry_count: int = 0
    max_retries: int = 3
    # Free-form metadata
    metadata: Dict[str, Any] = field(default_factory=dict)

    def can_execute(self) -> bool:
        """Return True while the task itself is still PENDING.

        Only the task's own status is inspected. Whether its dependencies
        have completed cannot be decided here (the task holds ids only) and
        must be checked against the owning graph.
        """
        return self.status == TaskStatus.PENDING

    def add_dependency(self, task_id: str):
        """Record ``task_id`` as a prerequisite of this task."""
        self.dependencies.add(task_id)

    def add_dependent(self, task_id: str):
        """Record ``task_id`` as a task that waits on this one."""
        self.dependents.add(task_id)

    def get_depth(self, task_graph: 'TaskGraph') -> int:
        """Return the length of the longest dependency chain leading to this task.

        A task without dependencies has depth 0. Depths of shared ancestors
        are memoised for the duration of the call, so diamond-shaped graphs
        are handled in linear time.

        Raises:
            KeyError: if a dependency id is not present in ``task_graph``.
            ValueError: if the dependency chain contains a cycle.
        """
        if not self.dependencies:
            return 0

        known: Dict[str, int] = {}
        # Ids on the current resolution path; seeing one again means a cycle.
        on_path: Set[str] = {self.id}

        def depth_of(task_id: str) -> int:
            if task_id in known:
                return known[task_id]
            if task_id in on_path:
                raise ValueError(f"dependency cycle detected at task {task_id}")
            on_path.add(task_id)
            parents = task_graph.tasks[task_id].dependencies
            depth = max((depth_of(parent) + 1 for parent in parents), default=0)
            on_path.discard(task_id)
            known[task_id] = depth
            return depth

        return max(depth_of(dep) + 1 for dep in self.dependencies)
class TaskGraph:
    """Dependency graph of ``Task`` nodes keyed by task id.

    Edges live on the tasks themselves: ``task.dependencies`` holds the ids
    that must finish first, ``task.dependents`` the ids waiting on the task.
    """

    def __init__(self, root_task: Optional[Task] = None):
        """Create a graph, registering ``root_task`` as its root when given."""
        self.tasks: Dict[str, Task] = {}
        self.root_task = root_task
        if root_task:
            self.tasks[root_task.id] = root_task

    def add_task(self, task: Task, parent_ids: Optional[List[str]] = None) -> None:
        """Add ``task`` and link it to each parent id that is already in the graph.

        Parent ids that are not present in the graph are ignored.
        """
        self.tasks[task.id] = task
        for parent_id in parent_ids or ():
            parent = self.tasks.get(parent_id)
            if parent is None:
                continue
            parent.add_dependent(task.id)
            task.add_dependency(parent_id)

    def get_executable_tasks(self) -> List[Task]:
        """Return the PENDING tasks whose dependencies are all COMPLETED."""
        return [
            task
            for task in self.tasks.values()
            if task.status == TaskStatus.PENDING
            and all(
                self.tasks[dep].status == TaskStatus.COMPLETED
                for dep in task.dependencies
            )
        ]

    def get_parallel_groups(self) -> List[List[Task]]:
        """Partition the PENDING tasks into successive groups that can run in parallel.

        Every task in a group depends only on tasks from earlier groups or on
        tasks that are not PENDING. Non-PENDING tasks are filtered out before
        grouping so the result does not depend on iteration order; within a
        group, tasks keep their insertion order. Tasks caught in a dependency
        cycle never become ready and are left out.
        """
        remaining = {
            task_id
            for task_id, task in self.tasks.items()
            if task.status == TaskStatus.PENDING
        }
        groups: List[List[Task]] = []
        while remaining:
            ready_ids = [
                task_id
                for task_id, task in self.tasks.items()
                if task_id in remaining and not (task.dependencies & remaining)
            ]
            if not ready_ids:
                # Only cyclic / unsatisfiable tasks are left.
                break
            groups.append([self.tasks[task_id] for task_id in ready_ids])
            # Treat this group as finished so the next layer unlocks.
            remaining.difference_update(ready_ids)
        return groups

    def validate(self) -> tuple[bool, List[str]]:
        """Check the graph for cycles and isolated nodes.

        Returns:
            ``(ok, errors)`` where ``ok`` is True when ``errors`` is empty.
        """
        errors = []
        if self.has_cycle():
            errors.append("检测到循环依赖")
        # A node with no edges at all is isolated, unless it is the root.
        for task in self.tasks.values():
            if task.dependencies or task.dependents:
                continue
            if task != self.root_task:
                errors.append(f"任务 {task.id} ({task.name}) 是孤立节点")
        return len(errors) == 0, errors

    def has_cycle(self) -> bool:
        """Return True if following ``dependencies`` edges ever revisits a task on the current path."""
        visited = set()
        on_stack = set()

        def dfs(task_id: str) -> bool:
            visited.add(task_id)
            on_stack.add(task_id)
            for dep_id in self.tasks[task_id].dependencies:
                if dep_id in on_stack:
                    return True
                if dep_id not in visited and dfs(dep_id):
                    return True
            on_stack.remove(task_id)
            return False

        return any(
            task_id not in visited and dfs(task_id)
            for task_id in list(self.tasks)
        )

    def calculate_critical_path(self) -> List[str]:
        """Return the ids along the longest-duration chain starting at the root task.

        The chain follows ``dependents`` edges and sums ``estimated_duration``.
        A successor is always included in the path, even when every remaining
        duration is 0. Returns ``[]`` when the graph has no root task.
        """
        memo = {}

        def longest_path_from(task_id: str) -> tuple[float, List[str]]:
            if task_id in memo:
                return memo[task_id]
            task = self.tasks[task_id]
            best_len = 0.0
            best_path: List[str] = []
            for next_id in task.dependents:
                length, path = longest_path_from(next_id)
                # ``not best_path`` keeps the first successor even if its length is 0.
                if not best_path or length > best_len:
                    best_len, best_path = length, path
            memo[task_id] = (task.estimated_duration + best_len, [task_id] + best_path)
            return memo[task_id]

        if not self.root_task:
            return []
        _, path = longest_path_from(self.root_task.id)
        return path

    def visualize(self) -> str:
        """Render the graph as a Mermaid ``flowchart TD`` description."""
        lines = ["flowchart TD"]
        # Node definitions; each node is tagged with a class named after its status.
        for task in self.tasks.values():
            lines.append(f' {task.id}["{task.name} ({task.status.value})"]:::{task.status.value}')
        # Style classes, one per status
        lines.append("")
        lines.append(" classDef pending fill:#f9f,stroke:#333,stroke-width:4px")
        lines.append(" classDef running fill:#ff9,stroke:#333,stroke-width:4px")
        lines.append(" classDef completed fill:#9f9,stroke:#333,stroke-width:4px")
        lines.append(" classDef failed fill:#f99,stroke:#333,stroke-width:4px")
        lines.append(" classDef blocked fill:#ff99,stroke:#333,stroke-width:4px")
        # Dependency edges: prerequisite --> dependent
        lines.append("")
        for task in self.tasks.values():
            for dep_id in task.dependencies:
                lines.append(f" {dep_id} --> {task.id}")
        return "\n".join(lines)
def build_code_development_graph() -> TaskGraph:
"""Build a sample dependency graph for a code-development workflow.

NOTE(review): the spec, design and test-plan tasks are added without
parent_ids, so no edge links them to the root task; the spec task ends up
with no edges at all.
"""
root = Task(
id="root",
name="完成功能开发",
description="完整的代码开发任务"
)
graph = TaskGraph(root)
# First layer: tasks added without any prerequisite
spec_task = Task(id="task_spec", name="编写规格说明书", estimated_duration=30)
design_task = Task(id="task_design", name="系统设计", estimated_duration=60)
test_plan_task = Task(id="task_test_plan", name="测试计划", estimated_duration=20)
graph.add_task(spec_task)
graph.add_task(design_task)
graph.add_task(test_plan_task)
# Implementation depends on the design
impl_task = Task(id="task_impl", name="代码实现", estimated_duration=180)
graph.add_task(impl_task, parent_ids=["task_design"])
# Both test tasks depend on the implementation; integration also needs the test plan
unit_test_task = Task(id="task_unit_test", name="单元测试", estimated_duration=40)
integration_test_task = Task(id="task_integration_test", name="集成测试", estimated_duration=60)
graph.add_task(unit_test_task, parent_ids=["task_impl"])
graph.add_task(integration_test_task, parent_ids=["task_impl", "task_test_plan"])
# Final review waits for both test tasks
final_review_task = Task(id="task_final_review", name="最终评审", estimated_duration=30)
graph.add_task(final_review_task, parent_ids=["task_unit_test", "task_integration_test"])
return graph依赖管理是 Task Graph 维护的核心功能。以下是几种常用的依赖管理策略:
策略一:严格依赖模式
在这种模式下,只有当所有前置任务完全完成后,后续任务才能开始。这种模式适用于任务之间存在强数据依赖的场景。
class StrictDependencyManager:
"""Strict dependency manager: a task may be scheduled only after every dependency is COMPLETED."""
def __init__(self, task_graph: TaskGraph):
self.task_graph = task_graph
def can_schedule(self, task_id: str) -> bool:
"""Return True if the task is PENDING and all of its dependencies are COMPLETED.

Raises KeyError if ``task_id`` or one of its dependency ids is not in the graph.
"""
task = self.task_graph.tasks[task_id]
if task.status != TaskStatus.PENDING:
return False
for dep_id in task.dependencies:
dep_task = self.task_graph.tasks[dep_id]
if dep_task.status != TaskStatus.COMPLETED:
return False
return True
def get_blocking_tasks(self, task_id: str) -> List[Task]:
"""Return the dependencies of ``task_id`` that are not yet COMPLETED."""
task = self.task_graph.tasks[task_id]
blocking = []
for dep_id in task.dependencies:
dep_task = self.task_graph.tasks[dep_id]
if dep_task.status != TaskStatus.COMPLETED:
blocking.append(dep_task)
return blocking策略二:乐观并发模式
在这种模式下,系统会预测任务完成时间,预先调度可能就绪的任务。当预测错误时,系统需要回滚或重新调度。
class OptimisticConcurrencyManager:
"""Optimistic-concurrency dependency manager that schedules on predicted completion times."""
def __init__(self, task_graph: TaskGraph):
self.task_graph = task_graph
# task_id -> cached prediction; entries are never invalidated in the visible code
self.predictions = {}
# NOTE(review): speculative_schedules is not read or written anywhere in the visible code
self.speculative_schedules = {}
def predict_completion_time(self, task_id: str) -> float:
"""Return (and cache) a predicted completion time for ``task_id``.

Starts from estimated_duration, scales it by 1 + 0.2 * retry_count when
the task has been retried, then adds the actual_duration of every
dependency that has a positive one.
"""
if task_id in self.predictions:
return self.predictions[task_id]
task = self.task_graph.tasks[task_id]
base_time = task.estimated_duration
if task.retry_count > 0:
base_time *= (1 + 0.2 * task.retry_count)
for dep_id in task.dependencies:
dep_task = self.task_graph.tasks[dep_id]
if dep_task.actual_duration > 0:
base_time += dep_task.actual_duration
self.predictions[task_id] = base_time
return base_time
def get_speculative_ready_tasks(self, current_time: float) -> List[Task]:
"""Return the PENDING tasks treated as speculatively ready at ``current_time``.

A task is rejected when, for some dependency that is not COMPLETED,
current_time + task.estimated_duration is less than that dependency's
predicted completion time.
"""
ready = []
for task in self.task_graph.tasks.values():
if task.status != TaskStatus.PENDING:
continue
# NOTE(review): max_dep_time is updated below but never read
max_dep_time = 0
all_deps_will_complete = True
for dep_id in task.dependencies:
dep_task = self.task_graph.tasks[dep_id]
if dep_task.status == TaskStatus.COMPLETED:
max_dep_time = max(max_dep_time, dep_task.actual_duration)
else:
pred_time = self.predict_completion_time(dep_id)
if current_time + task.estimated_duration < pred_time:
all_deps_will_complete = False
break
max_dep_time = max(max_dep_time, pred_time)
if all_deps_will_complete:
ready.append(task)
return ready策略三:时间窗口依赖模式
在这种模式下,依赖关系与时间窗口关联。一个任务可能在前置任务完成后的某个时间窗口内随时开始,而不是严格等待完成。
class TimeWindowDependencyManager:
"""Dependency manager in which each dependency may carry a (start, end) time window."""
def __init__(self, task_graph: TaskGraph, default_window_minutes: int = 30):
self.task_graph = task_graph
self.default_window = default_window_minutes
# NOTE(review): completion_windows is not used anywhere in the visible code
self.completion_windows = {}
# task_id -> {dependency_id: (window_start, window_end)}
self.time_windows = {}
def set_dependency_window(
self,
task_id: str,
dependency_id: str,
window_start: int,
window_end: int
):
"""Set the time window for the ``task_id`` -> ``dependency_id`` dependency."""
if task_id not in self.time_windows:
self.time_windows[task_id] = {}
self.time_windows[task_id][dependency_id] = (window_start, window_end)
def can_start_at(
self,
task_id: str,
current_time: float,
dependency_completion_times: Dict[str, float]
) -> bool:
"""Decide whether the task may start at ``current_time``.

Returns False if any dependency has no recorded completion time, or if
the time elapsed since a dependency completed falls outside that
dependency's configured window.
"""
task = self.task_graph.tasks[task_id]
for dep_id in task.dependencies:
dep_completion = dependency_completion_times.get(dep_id)
if dep_completion is None:
return False
if task_id in self.time_windows:
if dep_id in self.time_windows[task_id]:
window_start, window_end = self.time_windows[task_id][dep_id]
time_since_dep = current_time - dep_completion
if time_since_dep < window_start:
return False
if time_since_dep > window_end:
return False
# NOTE(review): indentation was lost, so it is unclear which `if` this elif
# pairs with; confirm when the default window is meant to apply.
elif current_time - dep_completion > self.default_window:
return False
return TrueMulti-Agent 系统的核心挑战之一是 Agent 之间的通信。不同的通信协议适用于不同的场景,选择合适的通信协议对系统性能有决定性影响。

Blackboard(黑板)模式是一种经典的 AI 协作模式。其核心思想是多个 Agent 通过读写共享的"黑板"来进行间接通信。
Blackboard 的核心特性:
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Callable
from datetime import datetime
from enum import Enum
import threading
import uuid
import json
class BlackboardEventType(Enum):
    """Kinds of events that can be recorded against a blackboard."""

    WRITE = "write"
    UPDATE = "update"
    DELETE = "delete"
    READ = "read"
    SUBSCRIBE = "subscribe"
@dataclass
class BlackboardEntry:
    """One versioned value stored on the blackboard."""

    id: str  # unique id of this entry
    key: str  # blackboard key the value is stored under
    value: Any
    timestamp: datetime
    author: str  # id of the agent that produced the entry
    version: int = 1
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class BlackboardEvent:
    """Audit record of one operation performed on the blackboard."""

    event_type: BlackboardEventType
    entry: Optional[BlackboardEntry]  # entry the operation touched, if any
    key: str
    agent_id: str  # agent that performed the operation
    timestamp: datetime
class Blackboard:
    """Shared key/value knowledge space for multiple agents.

    Every mutation is versioned (previous entries are kept per key) and every
    operation is appended to an event log. All public methods take the
    instance's re-entrant lock.

    NOTE(review): subscribers are recorded by ``subscribe`` but nothing in
    this class notifies them, and ``_observers`` is never used here.
    """

    def __init__(self, name: str = "default"):
        self.name = name
        self._storage: Dict[str, BlackboardEntry] = {}  # current entry per key
        self._version_history: Dict[str, List[BlackboardEntry]] = {}  # superseded entries per key
        self._lock = threading.RLock()
        self._observers: Dict[str, List[Callable]] = {}
        self._key_subscribers: Dict[str, List[str]] = {}
        self._event_history: List[BlackboardEvent] = []

    def _record_event(
        self,
        event_type: BlackboardEventType,
        entry: Optional[BlackboardEntry],
        key: str,
        agent_id: str,
    ) -> None:
        """Append an event to the log. The caller must hold ``self._lock``."""
        self._event_history.append(
            BlackboardEvent(
                event_type=event_type,
                entry=entry,
                key=key,
                agent_id=agent_id,
                timestamp=datetime.now(),
            )
        )

    def _commit(
        self,
        key: str,
        value: Any,
        agent_id: str,
        metadata: Optional[Dict],
        event_type: BlackboardEventType,
    ) -> BlackboardEntry:
        """Store a new version for ``key`` and log it. The caller must hold ``self._lock``."""
        previous = self._storage.get(key)
        entry = BlackboardEntry(
            id=str(uuid.uuid4()),
            key=key,
            value=value,
            timestamp=datetime.now(),
            author=agent_id,
            version=1 if previous is None else previous.version + 1,
            metadata=metadata or {},
        )
        if previous is not None:
            # Keep the superseded entry so get_history can return it.
            self._version_history.setdefault(key, []).append(previous)
        self._storage[key] = entry
        self._record_event(event_type, entry, key, agent_id)
        return entry

    def write(self, key: str, value: Any, agent_id: str, metadata: Dict = None) -> BlackboardEntry:
        """Create or overwrite ``key`` and return the new entry.

        Overwriting bumps the version and archives the previous entry.
        """
        with self._lock:
            return self._commit(key, value, agent_id, metadata, BlackboardEventType.WRITE)

    def read(self, key: str, agent_id: str) -> Optional[Any]:
        """Return the current value for ``key`` (logging a READ event), or None if absent."""
        with self._lock:
            entry = self._storage.get(key)
            if entry is None:
                return None
            self._record_event(BlackboardEventType.READ, entry, key, agent_id)
            return entry.value

    def read_entry(self, key: str) -> Optional[BlackboardEntry]:
        """Return the full current entry for ``key`` without logging an event."""
        with self._lock:
            return self._storage.get(key)

    def update(self, key: str, value: Any, agent_id: str, metadata: Dict = None) -> Optional[BlackboardEntry]:
        """Replace the value of an existing ``key``; return None if the key does not exist."""
        with self._lock:
            if key not in self._storage:
                return None
            return self._commit(key, value, agent_id, metadata, BlackboardEventType.UPDATE)

    def delete(self, key: str, agent_id: str) -> bool:
        """Remove ``key``; return True if it existed."""
        with self._lock:
            entry = self._storage.pop(key, None)
            if entry is None:
                return False
            self._record_event(BlackboardEventType.DELETE, entry, key, agent_id)
            return True

    def subscribe(self, key: str, agent_id: str):
        """Register ``agent_id`` as a subscriber of ``key`` (idempotent)."""
        with self._lock:
            subscribers = self._key_subscribers.setdefault(key, [])
            if agent_id not in subscribers:
                subscribers.append(agent_id)

    def get_history(self, key: str, limit: int = 10) -> List[BlackboardEntry]:
        """Return up to ``limit`` most recent superseded entries for ``key``, oldest first.

        A ``limit`` of 0 or less yields an empty list (a plain ``[-limit:]``
        slice would return the whole history for 0).
        """
        with self._lock:
            if limit <= 0:
                return []
            return self._version_history.get(key, [])[-limit:]

    def get_stats(self) -> Dict[str, Any]:
        """Return counts of stored entries, logged events and subscribed keys."""
        with self._lock:
            return {
                'total_entries': len(self._storage),
                'total_events': len(self._event_history),
                'subscribed_keys': len(self._key_subscribers),
            }
class AgentWithBlackboard:
"""Base class for agents that communicate through a Blackboard."""
def __init__(self, agent_id: str, blackboard: Blackboard):
self.agent_id = agent_id
self.blackboard = blackboard
def write_task_result(self, task_id: str, result: Any):
"""Write a task result to the blackboard under the key ``task_result:<task_id>``."""
key = f"task_result:{task_id}"
self.blackboard.write(key, result, self.agent_id)
def read_task_result(self, task_id: str) -> Any:
"""Read the result stored under ``task_result:<task_id>`` from the blackboard."""
key = f"task_result:{task_id}"
return self.blackboard.read(key, self.agent_id)Message Passing(消息传递)模式是 Multi-Agent 系统中最常用的通信方式。
Message Passing 的核心特性:
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Callable
from datetime import datetime
from enum import Enum
import threading
import uuid
import asyncio
from collections import defaultdict
class MessageType(Enum):
    """Categories of messages exchanged over the message bus."""

    REQUEST = "request"
    RESPONSE = "response"
    NOTIFICATION = "notification"
    BROADCAST = "broadcast"
    HEARTBEAT = "heartbeat"
class MessagePriority(Enum):
    """Message priority levels, numbered 1 (LOW) through 4 (CRITICAL)."""

    LOW = 1
    NORMAL = 2
    HIGH = 3
    CRITICAL = 4
@dataclass
class Message:
    """A message sent between agents; an empty ``receiver`` marks a broadcast."""

    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    msg_type: MessageType = MessageType.REQUEST
    sender: str = ""
    receiver: str = ""  # empty string means "no specific receiver"
    subject: str = ""
    content: Any = None
    timestamp: datetime = field(default_factory=datetime.now)
    priority: MessagePriority = MessagePriority.NORMAL
    correlation_id: str = ""
    headers: Dict[str, str] = field(default_factory=dict)

    def is_broadcast(self) -> bool:
        """Return True when no receiver is set."""
        no_receiver = self.receiver == ""
        return no_receiver

    def is_direct(self) -> bool:
        """Return True when a specific receiver is set."""
        has_receiver = self.receiver != ""
        return has_receiver
class MessageBus:
"""Message bus that routes messages to per-agent asyncio queues."""
def __init__(self):
# agent_id -> inbound queue
self._queues: Dict[str, asyncio.Queue] = {}
# NOTE(review): _handlers is not used anywhere in the visible code
self._handlers: Dict[str, List[Callable]] = {}
self._lock = threading.RLock()
# topic -> subscribed agent ids; NOTE(review): nothing in the visible code adds subscribers
self._subscribers: Dict[str, List[str]] = defaultdict(list)
self._message_history: List[Message] = []
self._max_history = 10000
self._stats = {
'total_messages': 0,
'total_broadcasts': 0,
'total_direct': 0
}
def register_agent(self, agent_id: str, queue_size: int = 100):
"""Register an agent by creating its bounded queue (no-op if already registered)."""
with self._lock:
if agent_id not in self._queues:
self._queues[agent_id] = asyncio.Queue(maxsize=queue_size)
def unregister_agent(self, agent_id: str):
"""Unregister an agent and drop its queue."""
with self._lock:
if agent_id in self._queues:
del self._queues[agent_id]
async def send(self, message: Message):
"""Record the message in history and stats, then broadcast it or deliver it directly."""
with self._lock:
self._message_history.append(message)
# Keep only the most recent _max_history messages
if len(self._message_history) > self._max_history:
self._message_history = self._message_history[-self._max_history:]
self._stats['total_messages'] += 1
if message.is_broadcast():
self._stats['total_broadcasts'] += 1
else:
self._stats['total_direct'] += 1
if message.is_broadcast():
await self._broadcast(message)
else:
await self._deliver_direct(message)
async def _broadcast(self, message: Message):
"""Broadcast to the subscribers of message.subject, or to every registered agent if there are none.

The sender is skipped. Each recipient gets its own copy with a fresh id and
correlation_id set to the original message id.
"""
topic = message.subject
with self._lock:
subscribers = list(self._subscribers.get(topic, []))
if not subscribers:
with self._lock:
subscribers = list(self._queues.keys())
tasks = []
for agent_id in subscribers:
if agent_id != message.sender:
msg_copy = Message(
id=str(uuid.uuid4()),
msg_type=message.msg_type,
sender=message.sender,
receiver=agent_id,
subject=message.subject,
content=message.content,
timestamp=datetime.now(),
priority=message.priority,
correlation_id=message.id,
headers=message.headers.copy()
)
tasks.append(self._enqueue(agent_id, msg_copy))
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
async def _deliver_direct(self, message: Message):
"""Deliver the message to its receiver's queue; it is dropped silently if the receiver is not registered."""
if message.receiver in self._queues:
await self._enqueue(message.receiver, message)
async def _enqueue(self, agent_id: str, message: Message):
"""Put the message on the agent's queue, waiting at most 5 seconds."""
try:
queue = self._queues.get(agent_id)
if queue:
await asyncio.wait_for(queue.put(message), timeout=5.0)
# NOTE(review): awaiting Queue.put() blocks on a full queue rather than raising
# QueueFull (put_nowait raises it), so this branch looks unreachable; the
# wait_for timeout would land in the generic handler below -- confirm intent.
except asyncio.QueueFull:
print(f"Queue full for agent {agent_id}")
except Exception as e:
print(f"Error enqueuing message: {e}")
async def receive(self, agent_id: str, timeout: float = None) -> Optional[Message]:
"""Receive the next message for ``agent_id``.

Returns None if the agent is not registered or if ``timeout`` elapses.
A falsy timeout (None or 0) waits indefinitely.
"""
if agent_id not in self._queues:
return None
try:
if timeout:
return await asyncio.wait_for(
self._queues[agent_id].get(),
timeout=timeout
)
else:
return await self._queues[agent_id].get()
except asyncio.TimeoutError:
return None
def get_stats(self) -> Dict:
"""Return a copy of the counters plus the number of registered agents."""
with self._lock:
stats = self._stats.copy()
stats['registered_agents'] = len(self._queues)
return statsShared Memory(共享内存)模式是最简单直接的通信方式。
Shared Memory 的核心特性:
import threading
from typing import Any, Dict, List, Optional
from datetime import datetime
class SharedMemoryStore:
    """In-process key/value store that guards each key with its own lock."""

    def __init__(self):
        self._storage: Dict[str, Any] = {}
        self._locks: Dict[str, threading.Lock] = {}
        self._thread_lock = threading.Lock()  # guards the _locks table itself

    def _get_lock(self, key: str) -> threading.Lock:
        """Return the lock for ``key``, creating it on first use."""
        with self._thread_lock:
            try:
                return self._locks[key]
            except KeyError:
                created = threading.Lock()
                self._locks[key] = created
                return created

    def set(self, key: str, value: Any):
        """Store ``value`` under ``key``."""
        with self._get_lock(key):
            self._storage[key] = value

    def get(self, key: str, default: Any = None) -> Any:
        """Return the value for ``key``, or ``default`` when it is absent."""
        with self._get_lock(key):
            if key in self._storage:
                return self._storage[key]
            return default

    def update(self, key: str, update_func: callable):
        """Apply ``update_func`` to the current value (None if absent) under the key's lock.

        The result is stored and returned.
        """
        with self._get_lock(key):
            replacement = update_func(self._storage.get(key))
            self._storage[key] = replacement
            return replacement

    def delete(self, key: str):
        """Remove ``key`` if present; do nothing otherwise."""
        with self._get_lock(key):
            self._storage.pop(key, None)

    def exists(self, key: str) -> bool:
        """Return True if ``key`` is stored."""
        with self._get_lock(key):
            return key in self._storage
class AtomicCounter:
    """Integer counter whose reads and updates are serialized by a lock."""

    def __init__(self, initial_value: int = 0):
        self._value = {'counter': initial_value}
        self._lock = threading.Lock()

    def increment(self, delta: int = 1) -> int:
        """Add ``delta`` to the counter and return the new value."""
        with self._lock:
            updated = self._value['counter'] + delta
            self._value['counter'] = updated
            return updated

    def decrement(self, delta: int = 1) -> int:
        """Subtract ``delta`` from the counter and return the new value."""
        return self.increment(-delta)

    def get(self) -> int:
        """Return the current value."""
        with self._lock:
            current = self._value['counter']
        return current
class AgentWithSharedMemory:
"""Agent that exchanges data through a SharedMemoryStore."""
def __init__(self, agent_id: str, shared_storage: SharedMemoryStore):
self.agent_id = agent_id
self.shared_storage = shared_storage
def write_code(self, task_id: str, code: str):
"""Store code under ``code:<task_id>`` together with the author id and an ISO timestamp."""
key = f"code:{task_id}"
self.shared_storage.set(key, {
'code': code,
'author': self.agent_id,
'timestamp': datetime.now().isoformat()
})
def read_code(self, task_id: str) -> Optional[Dict]:
"""Return the record stored under ``code:<task_id>``, or None if absent."""
key = f"code:{task_id}"
return self.shared_storage.get(key)
def update_progress(self, task_id: str, progress: float):
"""Store a progress record under ``progress:<task_id>``, replacing any previous one."""
key = f"progress:{task_id}"
self.shared_storage.set(key, {
'progress': progress,
'agent': self.agent_id,
'timestamp': datetime.now().isoformat()
})特性 | Blackboard | Message Passing | Shared Memory |
|---|---|---|---|
耦合度 | 松耦合 | 中等耦合 | 紧耦合 |
通信延迟 | 中等 | 可高可低 | 最低 |
同步方式 | 异步 | 同步/异步可选 | 同步 |
扩展性 | 高 | 高 | 低 |
实现复杂度 | 中等 | 高 | 低 |
适用场景 | 知识共享、协作编辑 | 任务分配、结果汇报 | 高速数据交换 |
容错性 | 高 | 中等 | 低 |
消息持久化 | 可选 | 通常不持久化 | 不持久化 |
协作策略决定了多个 Agent 如何协同完成复杂任务。不同的策略适用于不同的场景。

串行执行是最简单的协作策略,任务按严格的顺序执行。
适用场景:
class SequentialCoordinator:
"""Coordinator that runs tasks one at a time, grouped by dependency depth."""
def __init__(self, agents: Dict[str, Any], task_graph: TaskGraph):
self.agents = agents
self.task_graph = task_graph
self.current_task_id: Optional[str] = None
self.execution_log: List[Dict] = []
async def execute(self) -> bool:
"""Run every task sequentially; stop and return False at the first failure."""
print(f"Starting sequential execution with {len(self.agents)} agents")
tasks_by_depth = self._group_tasks_by_depth()
# NOTE(review): the dict is iterated in insertion order, i.e. the order in which
# depths were first seen while walking task_graph.tasks, not in ascending depth.
# A deeper task could therefore run before its prerequisites; HybridCoordinator
# sorts its levels -- consider doing the same here.
for depth, tasks in tasks_by_depth.items():
print(f"\n=== Depth {depth} ===")
for task in tasks:
success = await self._execute_task(task)
self.execution_log.append({
'task_id': task.id,
'depth': depth,
'success': success,
'timestamp': datetime.now().isoformat()
})
if not success:
return False
return True
def _group_tasks_by_depth(self) -> Dict[int, List[Task]]:
"""Group the graph's tasks by their dependency depth."""
depths = {}
for task in self.task_graph.tasks.values():
depth = task.get_depth(self.task_graph)
if depth not in depths:
depths[depth] = []
depths[depth].append(task)
return depths
async def _execute_task(self, task: Task) -> bool:
"""Run one task on its agent, passing along the outputs of its dependencies.

Returns False when no agent is found (the task status is left unchanged in
that case) or when the agent raises (the task is marked FAILED).
"""
print(f"Executing task: {task.id} - {task.name}")
agent_id = self._get_agent_for_task(task)
agent = self.agents.get(agent_id)
if not agent:
return False
# Collect dependency outputs keyed by dependency id
dep_outputs = {}
for dep_id in task.dependencies:
dep_task = self.task_graph.tasks[dep_id]
dep_outputs[dep_id] = dep_task.output_data
task.status = TaskStatus.RUNNING
self.current_task_id = task.id
try:
output = await agent.execute(task.input_data, dep_outputs)
task.output_data = output
task.status = TaskStatus.COMPLETED
return True
except Exception as e:
print(f"Task {task.id} failed: {e}")
task.status = TaskStatus.FAILED
return False
finally:
self.current_task_id = None
def _get_agent_for_task(self, task: Task) -> Optional[str]:
"""Pick the agent id for a task.

An explicit task.assigned_agent wins; otherwise the 'type' metadata string is
matched by substring, in this order: coder, reviewer, tester, planner.
"""
if task.assigned_agent:
return task.assigned_agent
task_type = task.metadata.get('type', '')
if 'code' in task_type or 'implement' in task_type:
return 'coder'
elif 'review' in task_type or 'check' in task_type:
return 'reviewer'
elif 'test' in task_type:
return 'tester'
elif 'plan' in task_type or 'design' in task_type:
return 'planner'
return None并行执行允许多个 Agent 同时工作,显著提高任务完成效率。
class ParallelCoordinator:
    """Coordinator that runs every ready task concurrently, bounded by a semaphore.

    Each task is scheduled exactly once: a task that is still waiting for a
    semaphore slot stays PENDING, so scheduled tasks are tracked in
    ``_running_tasks`` and skipped on later scheduling passes.
    """

    def __init__(
        self,
        agents: Dict[str, Any],
        task_graph: TaskGraph,
        max_parallel_tasks: int = 4
    ):
        self.agents = agents
        self.task_graph = task_graph
        self.max_parallel_tasks = max_parallel_tasks
        self.execution_log: List[Dict] = []
        # task id -> asyncio task, from scheduling until the task finishes.
        # Holding the reference also keeps the asyncio task from being
        # garbage-collected mid-flight.
        self._running_tasks: Dict[str, asyncio.Task] = {}
        self._semaphore = asyncio.Semaphore(max_parallel_tasks)

    async def execute(self) -> bool:
        """Run the graph to completion; return False if any task ended FAILED.

        Tasks whose dependencies never complete (e.g. because a prerequisite
        failed) are left PENDING.
        """
        print(f"Starting parallel execution with max {self.max_parallel_tasks} concurrent tasks")
        while True:
            for task in self.task_graph.get_executable_tasks():
                if task.id in self._running_tasks:
                    continue  # already scheduled, possibly waiting for a slot
                self._running_tasks[task.id] = asyncio.create_task(
                    self._execute_task_with_semaphore(task)
                )
            if not self._running_tasks:
                break
            # Wake up as soon as any task finishes instead of polling.
            await asyncio.wait(
                list(self._running_tasks.values()),
                return_when=asyncio.FIRST_COMPLETED,
            )
        return not any(
            task.status == TaskStatus.FAILED
            for task in self.task_graph.tasks.values()
        )

    async def _execute_task_with_semaphore(self, task: Task):
        """Run ``task`` once a semaphore slot is free, then release its bookkeeping."""
        try:
            async with self._semaphore:
                await self._execute_task(task)
        finally:
            self._running_tasks.pop(task.id, None)
            # A task that exits without reaching a terminal state would be
            # picked up again forever; treat it as failed.
            if task.status in (TaskStatus.PENDING, TaskStatus.RUNNING):
                task.status = TaskStatus.FAILED

    async def _execute_task(self, task: Task) -> bool:
        """Run one task on its agent, passing along the outputs of its dependencies.

        Marks the task FAILED and returns False when no agent is found or the
        agent raises; otherwise stores the output and marks it COMPLETED.
        """
        print(f"Starting task: {task.id} - {task.name}")
        agent_id = self._get_agent_for_task(task)
        agent = self.agents.get(agent_id)
        if not agent:
            task.status = TaskStatus.FAILED
            return False
        task.status = TaskStatus.RUNNING
        self._running_tasks.setdefault(task.id, asyncio.current_task())
        dep_outputs = {
            dep_id: self.task_graph.tasks[dep_id].output_data
            for dep_id in task.dependencies
        }
        try:
            task.output_data = await agent.execute(task.input_data, dep_outputs)
            task.status = TaskStatus.COMPLETED
            self.execution_log.append({
                'task_id': task.id,
                'agent_id': agent_id,
                'status': 'completed',
                'timestamp': datetime.now().isoformat()
            })
            return True
        except Exception as e:
            task.status = TaskStatus.FAILED
            self.execution_log.append({
                'task_id': task.id,
                'agent_id': agent_id,
                'status': 'failed',
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            })
            return False
        finally:
            self._running_tasks.pop(task.id, None)

    def _get_agent_for_task(self, task: Task) -> Optional[str]:
        """Pick the agent id for a task.

        An explicit ``task.assigned_agent`` wins; otherwise the ``'type'``
        metadata string is matched by substring. The order matters: a type
        such as ``'test_plan'`` routes to the tester, not the planner.
        """
        if task.assigned_agent:
            return task.assigned_agent
        task_type = task.metadata.get('type', '')
        routing = (
            (('code', 'implement'), 'coder'),
            (('review', 'check'), 'reviewer'),
            (('test',), 'tester'),
            (('plan', 'design'), 'planner'),
        )
        for keywords, agent_id in routing:
            if any(keyword in task_type for keyword in keywords):
                return agent_id
        return None
class HierarchicalCoordinator:
"""Coordinator that walks the task graph level by level (by dependency depth)."""
def __init__(self, task_graph: TaskGraph):
self.task_graph = task_graph
# depth -> tasks at that depth
self.levels: Dict[int, List[Task]] = {}
self.level_coordinators: Dict[int, ParallelCoordinator] = {}
# NOTE(review): execution_results is never written in the visible code
self.execution_results: Dict[str, Any] = {}
def build_hierarchy(self) -> int:
"""Group tasks into self.levels by depth and return the maximum depth.

NOTE(review): self.levels is not cleared first, so calling this twice appends
every task a second time.
"""
max_depth = 0
for task in self.task_graph.tasks.values():
depth = task.get_depth(self.task_graph)
max_depth = max(max_depth, depth)
if depth not in self.levels:
self.levels[depth] = []
self.levels[depth].append(task)
return max_depth
async def execute(self, agents: Dict[str, Any], max_parallel: int = 4) -> bool:
"""Run the levels in ascending depth order; return False as soon as a level fails."""
max_depth = self.build_hierarchy()
print(f"Hierarchical execution with {max_depth + 1} levels")
for level in range(max_depth + 1):
if level not in self.levels:
continue
print(f"\n=== Level {level} ===")
# NOTE(review): `tasks` is not passed to the coordinator below; the coordinator
# receives the whole task_graph, so its execute() is not limited to this level.
tasks = self.levels[level]
coordinator = ParallelCoordinator(
agents,
self.task_graph,
max_parallel_tasks=max_parallel
)
self.level_coordinators[level] = coordinator
success = await coordinator.execute()
if not success:
return False
return True在实际应用中,纯粹的串行或并行策略往往不是最优解。混合协作策略结合两者的优点。
class HybridCoordinator:
"""混合协作策略协调器"""
def __init__(
self,
agents: Dict[str, Any],
task_graph: TaskGraph,
strategy_config: Dict[str, Any] = None
):
self.agents = agents
self.task_graph = task_graph
self.config = strategy_config or self._default_config()
self.sequential_threshold = self.config.get('sequential_threshold', 0.3)
self.max_parallel = self.config.get('max_parallel', 4)
self.task_strategies: Dict[str, str] = {}
self._analyze_strategies()
def _default_config(self) -> Dict[str, Any]:
"""默认配置"""
return {
'sequential_threshold': 0.3,
'parallel_threshold': 0.7,
'max_parallel': 4,
'enable_hierarchy': True,
'hierarchy_depth_threshold': 3
}
def _analyze_strategies(self):
"""分析任务图,确定每个任务的最佳策略"""
for task in self.task_graph.tasks.values():
if not task.dependencies:
self.task_strategies[task.id] = 'sequential'
elif len(task.dependencies) > 3:
self.task_strategies[task.id] = 'sequential'
else:
self.task_strategies[task.id] = 'parallel'
async def execute(self) -> bool:
"""混合执行"""
print("Starting hybrid execution")
levels = self._group_by_depth()
for depth, tasks in levels.items():
print(f"\n=== Processing depth {depth} ===")
has_strong_deps = any(
self.task_strategies.get(t.id) == 'sequential'
for t in tasks
)
if has_strong_deps:
print(f"Using sequential execution for {len(tasks)} tasks")
success = await self._execute_sequential(tasks)
else:
print(f"Using parallel execution for {len(tasks)} tasks")
success = await self._execute_parallel(tasks)
if not success:
return False
return True
def _group_by_depth(self) -> Dict[int, List[Task]]:
"""按深度分组"""
levels = {}
for task in self.task_graph.tasks.values():
depth = task.get_depth(self.task_graph)
if depth not in levels:
levels[depth] = []
levels[depth].append(task)
return dict(sorted(levels.items()))
async def _execute_sequential(self, tasks: List[Task]) -> bool:
"""串行执行任务组"""
for task in tasks:
agent_id = self._get_agent_for_task(task)
agent = self.agents.get(agent_id)
if not agent:
return False
task.status = TaskStatus.RUNNING
try:
output = await agent.execute(task.input_data, {})
task.output_data = output
task.status = TaskStatus.COMPLETED
except Exception:
task.status = TaskStatus.FAILED
return False
return True
async def _execute_parallel(self, tasks: List[Task]) -> bool:
    """Run the tasks concurrently, at most ``self.max_parallel`` at a time.

    Returns:
        ``True`` only if every task result is ``True`` or ``None``; a
        ``False`` result or a captured exception makes the group fail.
    """
    limiter = asyncio.Semaphore(self.max_parallel)

    async def guarded(task: Task):
        # The semaphore caps how many agents execute simultaneously.
        async with limiter:
            return await self._execute_single(task)

    outcomes = await asyncio.gather(
        *(guarded(task) for task in tasks),
        return_exceptions=True,
    )
    for outcome in outcomes:
        if outcome is not True and outcome is not None:
            return False
    return True
async def _execute_single(self, task: Task) -> bool:
    """Run one task on the agent selected for it.

    The task status moves to RUNNING, then to COMPLETED (with the agent
    output stored in ``task.output_data``) or FAILED if the agent raised.

    Returns:
        ``True`` on success; ``False`` when no agent is available or the
        agent raised.
    """
    agent = self.agents.get(self._get_agent_for_task(task))
    if not agent:
        return False
    task.status = TaskStatus.RUNNING
    try:
        task.output_data = await agent.execute(task.input_data, {})
        task.status = TaskStatus.COMPLETED
    except Exception:
        task.status = TaskStatus.FAILED
        return False
    return True
def _get_agent_for_task(self, task: Task) -> Optional[str]:
    """Pick the agent id that should execute ``task``.

    An explicit ``task.assigned_agent`` wins. Otherwise the ``'type'`` entry
    of ``task.metadata`` is scanned for role keywords; the first role with a
    matching keyword is returned.
    """
    if task.assigned_agent:
        return task.assigned_agent
    kind = task.metadata.get('type', '')
    # Ordered on purpose: earlier roles take precedence on overlapping types.
    routing = (
        ('coder', ('code', 'implement')),
        ('reviewer', ('review', 'check')),
        ('tester', ('test',)),
        ('planner', ('plan', 'design')),
    )
    for role, keywords in routing:
        if any(keyword in kind for keyword in keywords):
            return role
    # No keyword matched: the result is None.
return None

在 Multi-Agent 系统中,冲突是不可避免的。
冲突的主要类型:
投票机制是一种民主化的冲突解决方法。
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
from datetime import datetime
from enum import Enum
import random
class VoteStrategy(Enum):
    """Named counting strategies that can be selected for a ``VotingSystem``."""
    MAJORITY = "majority"
    WEIGHTED = "weighted"
    UNANIMOUS = "unanimous"
    PREFERENCE = "preference"
@dataclass
class Vote:
    """A single ballot cast by one voter for one candidate."""
    voter_id: str  # who cast the vote
    candidate_id: str  # the option the vote is for
    weight: float = 1.0  # contribution of this vote to weighted totals
    reasoning: str = ""  # optional free-text justification
    timestamp: datetime = field(default_factory=datetime.now)  # set at creation
@dataclass
class VotingResult:
    """Outcome of tallying a set of votes."""
    winner_id: str  # winning candidate id; "" when there were no votes
    total_votes: int  # number of ballots counted
    vote_counts: Dict[str, int]  # candidate id -> number of ballots
    weighted_counts: Dict[str, float]  # candidate id -> summed ballot weights
    confidence: float  # winner's share of the total used by the strategy
    timestamp: datetime = field(default_factory=datetime.now)  # set at creation
class VotingSystem:
"""投票系统"""
def __init__(self, strategy: VoteStrategy = VoteStrategy.MAJORITY):
self.strategy = strategy
self.votes: List[Vote] = []
self.voter_weights: Dict[str, float] = {}
def set_voter_weight(self, voter_id: str, weight: float):
"""设置投票者权重"""
self.voter_weights[voter_id] = weight
def cast_vote(
self,
voter_id: str,
candidate_id: str,
reasoning: str = ""
) -> Vote:
"""投票"""
weight = self.voter_weights.get(voter_id, 1.0)
vote = Vote(
voter_id=voter_id,
candidate_id=candidate_id,
weight=weight,
reasoning=reasoning
)
self.votes.append(vote)
return vote
def tally_votes(self) -> VotingResult:
"""计票"""
if not self.votes:
return VotingResult(
winner_id="",
total_votes=0,
vote_counts={},
weighted_counts={},
confidence=0.0
)
vote_counts: Dict[str, int] = {}
weighted_counts: Dict[str, float] = {}
for vote in self.votes:
vote_counts[vote.candidate_id] = vote_counts.get(vote.candidate_id, 0) + 1
weighted_counts[vote.candidate_id] = (
weighted_counts.get(vote.candidate_id, 0.0) + vote.weight
)
if self.strategy == VoteStrategy.MAJORITY:
winner_id = max(vote_counts, key=vote_counts.get)
confidence = vote_counts[winner_id] / len(self.votes)
elif self.strategy == VoteStrategy.WEIGHTED:
winner_id = max(weighted_counts, key=weighted_counts.get)
total_weight = sum(weighted_counts.values())
confidence = weighted_counts[winner_id] / total_weight if total_weight > 0 else 0
else:
winner_id = max(vote_counts, key=vote_counts.get)
confidence = vote_counts[winner_id] / len(self.votes)
return VotingResult(
winner_id=winner_id,
total_votes=len(self.votes),
vote_counts=vote_counts,
weighted_counts=weighted_counts,
confidence=confidence
)
def clear_votes(self):
"""清除投票记录"""
self.votes = []仲裁机制通过引入第三方来解决冲突。
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
@dataclass
class Dispute:
    """A conflict between parties that is submitted for arbitration."""
    id: str  # unique identifier of the dispute
    dispute_type: str  # category used to select a rule, e.g. 'priority'
    parties: List[str]  # ids of the parties involved
    subject: str  # what the dispute is about
    details: Any  # free-form payload supplied by the caller
    evidence: Dict[str, Any] = field(default_factory=dict)  # data rules decide on
    timestamp: datetime = field(default_factory=datetime.now)  # set at creation
@dataclass
class ArbitrationResult:
    """Decision produced by an arbitrator for one dispute."""
    dispute_id: str  # id of the dispute this decision belongs to
    winner: str  # party the decision favours
    loser: str  # the opposing party
    decision: str  # short statement of the outcome
    reasoning: str  # why the decision was made
    remedies: List[str] = field(default_factory=list)  # optional follow-up actions
class Arbitrator(ABC):
    """Abstract base class for arbitrators."""

    @abstractmethod
    def arbitrate(self, dispute: Dispute) -> ArbitrationResult:
        """Decide ``dispute`` and return the resulting decision."""
        pass

    @abstractmethod
    def can_handle(self, dispute: Dispute) -> bool:
        """Return whether this arbitrator is able to decide ``dispute``."""
        pass
class RuleBasedArbitrator(Arbitrator):
    """Arbitrator that decides disputes through a list of typed rules.

    Each rule is a dict with a ``'name'``, a ``'condition'`` predicate and an
    ``'action'`` that produces the ``ArbitrationResult``. The first rule whose
    condition matches decides the dispute.
    """

    def __init__(self):
        self.rules: List[Dict] = []
        self._load_default_rules()

    def _load_default_rules(self):
        """Install the built-in rules for priority, resource and quality disputes."""
        self.rules = [
            {
                'name': 'priority_rule',
                'condition': lambda d: d.dispute_type == 'priority',
                'action': self._resolve_priority_dispute
            },
            {
                'name': 'resource_rule',
                'condition': lambda d: d.dispute_type == 'resource',
                'action': self._resolve_resource_dispute
            },
            {
                'name': 'quality_rule',
                'condition': lambda d: d.dispute_type == 'quality',
                'action': self._resolve_quality_dispute
            }
        ]

    def can_handle(self, dispute: Dispute) -> bool:
        """Return True if at least one rule matches the dispute."""
        return any(rule['condition'](dispute) for rule in self.rules)

    def arbitrate(self, dispute: Dispute) -> ArbitrationResult:
        """Apply the first matching rule, or fall back to a default decision."""
        for rule in self.rules:
            if rule['condition'](dispute):
                return rule['action'](dispute)
        return self._fallback_result(
            dispute,
            decision="No specific rule matched",
            reasoning="Using default resolution"
        )

    @staticmethod
    def _party_at(dispute: Dispute, index: int) -> str:
        """Return the party at ``index``, or "" when the list is too short."""
        return dispute.parties[index] if len(dispute.parties) > index else ""

    @staticmethod
    def _opponent_of(dispute: Dispute, winner: str) -> str:
        """Return the first party that is not ``winner``, or "" if none exists."""
        return next((party for party in dispute.parties if party != winner), "")

    def _fallback_result(
        self,
        dispute: Dispute,
        decision: str,
        reasoning: str
    ) -> ArbitrationResult:
        """Build the evidence-free decision: first party wins, second loses.

        Missing parties are reported as "" instead of raising IndexError, in
        line with the default branch of ``arbitrate``.
        """
        return ArbitrationResult(
            dispute_id=dispute.id,
            winner=self._party_at(dispute, 0),
            loser=self._party_at(dispute, 1),
            decision=decision,
            reasoning=reasoning
        )

    def _resolve_priority_dispute(self, dispute: Dispute) -> ArbitrationResult:
        """Favour the party with the highest value in ``evidence['priorities']``."""
        priorities = dispute.evidence.get('priorities', {})
        if priorities:
            winner = max(priorities, key=priorities.get)
            return ArbitrationResult(
                dispute_id=dispute.id,
                winner=winner,
                loser=self._opponent_of(dispute, winner),
                decision=f"{winner} wins with higher priority",
                reasoning="Based on explicitly assigned priorities"
            )
        # NOTE: despite the label, this deterministically picks the first party.
        return self._fallback_result(
            dispute,
            decision="Random resolution",
            reasoning="No priority information available"
        )

    def _resolve_resource_dispute(self, dispute: Dispute) -> ArbitrationResult:
        """Favour the party with the lowest value in ``evidence['resource_usage']``."""
        resources = dispute.evidence.get('resource_usage', {})
        if resources:
            winner = min(resources, key=resources.get)
            return ArbitrationResult(
                dispute_id=dispute.id,
                winner=winner,
                loser=self._opponent_of(dispute, winner),
                decision=f"{winner} wins with lower resource usage",
                reasoning="Resource efficiency is prioritized"
            )
        return self._fallback_result(
            dispute,
            decision="First-come-first-served",
            reasoning="No resource usage data available"
        )

    def _resolve_quality_dispute(self, dispute: Dispute) -> ArbitrationResult:
        """Favour the party with the highest value in ``evidence['quality_scores']``."""
        quality_scores = dispute.evidence.get('quality_scores', {})
        if quality_scores:
            winner = max(quality_scores, key=quality_scores.get)
            return ArbitrationResult(
                dispute_id=dispute.id,
                winner=winner,
                loser=self._opponent_of(dispute, winner),
                decision=f"{winner} wins with higher quality score",
                reasoning=f"Quality score: {quality_scores[winner]}",
                remedies=["Lower quality Agent should adopt winner's approach"]
            )
        return self._fallback_result(
            dispute,
            decision="Cannot determine quality winner",
            reasoning="No quality metrics available"
        )
class ArbitrationService:
    """Front door for disputes: records them and routes them to an arbitrator."""

    def __init__(self):
        self.arbitrators: List[Arbitrator] = []
        self.dispute_history: List[Dispute] = []
        self._register_default_arbitrators()

    def _register_default_arbitrators(self):
        """Register the arbitrators that are available out of the box."""
        self.register_arbitrator(RuleBasedArbitrator())

    def register_arbitrator(self, arbitrator: Arbitrator):
        """Append an arbitrator; earlier registrations are consulted first."""
        self.arbitrators.append(arbitrator)

    def raise_dispute(
        self,
        dispute_type: str,
        parties: List[str],
        subject: str,
        details: Any = None,
        evidence: Dict[str, Any] = None
    ) -> Dispute:
        """Create a dispute with a fresh UUID, store it in the history and return it."""
        record = Dispute(
            id=str(uuid.uuid4()),
            dispute_type=dispute_type,
            parties=parties,
            subject=subject,
            details=details,
            evidence=evidence if evidence else {}
        )
        self.dispute_history.append(record)
        return record

    def resolve_dispute(self, dispute: Dispute) -> Optional[ArbitrationResult]:
        """Let the first arbitrator that can handle the dispute decide it."""
        handler = next(
            (candidate for candidate in self.arbitrators if candidate.can_handle(dispute)),
            None
        )
        if handler is not None:
            return handler.arbitrate(dispute)
        # No arbitrator accepted the dispute: the result is None.
return None

优先级机制通过预设的优先级规则来解决冲突。
from enum import IntEnum
from typing import Dict, List, Any, Optional
class AgentPriority(IntEnum):
    """Priority levels for agents; a larger value means a higher priority."""
    LOWEST = 1
    LOW = 2
    NORMAL = 3  # default returned by PriorityManager.get_agent_priority
    HIGH = 4
    HIGHEST = 5
    CRITICAL = 6
class TaskPriority(IntEnum):
    """Priority levels for tasks; a larger value means a higher priority."""
    IDLE = 0
    LOW = 1
    NORMAL = 2  # default returned by PriorityManager.get_task_priority
    HIGH = 3
    URGENT = 4
    CRITICAL = 5
class PriorityManager:
"""优先级管理器"""
def __init__(self):
self.agent_priorities: Dict[str, AgentPriority] = {}
self.task_priorities: Dict[str, TaskPriority] = {}
self.overrides: Dict[str, int] = {}
self.priority_rules: List[callable] = []
def set_agent_priority(self, agent_id: str, priority: AgentPriority):
self.agent_priorities[agent_id] = priority
def get_agent_priority(self, agent_id: str) -> AgentPriority:
if agent_id in self.overrides:
return AgentPriority(self.overrides[agent_id])
return self.agent_priorities.get(agent_id, AgentPriority.NORMAL)
def set_task_priority(self, task_id: str, priority: TaskPriority):
self.task_priorities[task_id] = priority
def get_task_priority(self, task_id: str) -> TaskPriority:
return self.task_priorities.get(task_id, TaskPriority.NORMAL)
def override_priority(self, entity_id: str, priority: int, duration: float = None):
self.overrides[entity_id] = priority
def add_priority_rule(self, rule: callable):
self.priority_rules.append(rule)
def resolve_conflict(
self,
tasks: List[Task],
agents: List[str]
) -> Dict[str, Any]:
"""解决冲突 - 为任务分配最优 Agent"""
if not tasks or not agents:
return {}
assignments = {}
for task in sorted(tasks, key=lambda t: self.get_task_priority(t.id), reverse=True):
best_agent = None
best_score = -1
for agent_id in agents:
if agent_id in assignments.values():
continue
score = self._calculate_assignment_score(task, agent_id)
if score > best_score:
best_score = score
best_agent = agent_id
if best_agent:
assignments[task.id] = best_agent
return assignments
def _calculate_assignment_score(
self,
task: Task,
agent_id: str
) -> float:
agent_priority = self.get_agent_priority(agent_id)
task_priority = self.get_task_priority(task.id)
score = float(agent_priority * 10 + task_priority)
for rule in self.priority_rules:
score = rule(score, task, agent_id, self)
return score随着 Multi-Agent 系统规模的扩大,会面临:
Agent Pool 是一种有效的规模化解决方案。
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Callable
from datetime import datetime
from enum import Enum
import asyncio
import threading
class AgentState(Enum):
    """Lifecycle states of a pooled agent instance."""
    IDLE = "idle"  # available; AgentPool.acquire hands out instances in this state
    BUSY = "busy"  # handed out by AgentPool.acquire
    BLOCKED = "blocked"
    TERMINATED = "terminated"  # set by AgentPool.destroy_instance
@dataclass
class AgentInstance:
    """Bookkeeping record for one agent instance managed by a pool."""
    instance_id: str  # unique id of this instance
    agent_type: str  # pool / role the instance belongs to
    state: AgentState = AgentState.IDLE
    current_task_id: Optional[str] = None  # task being executed, if any
    created_at: datetime = field(default_factory=datetime.now)
    last_used_at: datetime = field(default_factory=datetime.now)
    total_tasks_completed: int = 0
    total_execution_time: float = 0.0
    metadata: Dict[str, Any] = field(default_factory=dict)
class AgentPool:
"""Agent 池"""
def __init__(
self,
pool_config: Dict[str, int] = None,
max_pool_size: int = 100
):
self.pools: Dict[str, List[AgentInstance]] = {}
self.all_instances: Dict[str, AgentInstance] = {}
self.max_pool_size = max_pool_size
self.agent_factories: Dict[str, Callable] = {}
if pool_config:
for agent_type, count in pool_config.items():
self._initialize_pool(agent_type, count)
self._stats = {
'total_created': 0,
'total_destroyed': 0,
'total_tasks_executed': 0
}
self._lock = threading.RLock()
def register_factory(self, agent_type: str, factory: Callable):
self.agent_factories[agent_type] = factory
def _initialize_pool(self, agent_type: str, count: int):
with self._lock:
if agent_type not in self.pools:
self.pools[agent_type] = []
for i in range(count):
instance = self._create_instance(agent_type)
self.pools[agent_type].append(instance)
self.all_instances[instance.instance_id] = instance
def _create_instance(self, agent_type: str) -> AgentInstance:
instance_id = f"{agent_type}_{uuid.uuid4().hex[:8]}"
instance = AgentInstance(
instance_id=instance_id,
agent_type=agent_type
)
self._stats['total_created'] += 1
return instance
def acquire(
self,
agent_type: str,
timeout: float = 30.0
) -> Optional[AgentInstance]:
"""获取可用的 Agent 实例"""
start_time = datetime.now()
while True:
with self._lock:
pool = self.pools.get(agent_type, [])
for instance in pool:
if instance.state == AgentState.IDLE:
instance.state = AgentState.BUSY
instance.last_used_at = datetime.now()
return instance
total_in_pool = len(pool)
if total_in_pool < self.max_pool_size:
instance = self._create_instance(agent_type)
instance.state = AgentState.BUSY
pool.append(instance)
self.all_instances[instance.instance_id] = instance
return instance
elapsed = (datetime.now() - start_time).total_seconds()
if elapsed > timeout:
return None
import time
time.sleep(0.1)
def release(self, instance: AgentInstance):
"""释放 Agent 实例回池"""
with self._lock:
instance.state = AgentState.IDLE
instance.current_task_id = None
instance.last_used_at = datetime.now()
def destroy_instance(self, instance_id: str):
"""销毁 Agent 实例"""
with self._lock:
if instance_id not in self.all_instances:
return
instance = self.all_instances[instance_id]
instance.state = AgentState.TERMINATED
pool = self.pools.get(instance.agent_type, [])
if instance in pool:
pool.remove(instance)
del self.all_instances[instance_id]
self._stats['total_destroyed'] += 1
def get_stats(self) -> Dict[str, Any]:
"""获取池统计信息"""
with self._lock:
stats = {
'total_instances': len(self.all_instances),
'total_created': self._stats['total_created'],
'total_destroyed': self._stats['total_destroyed'],
'by_type': {}
}
for agent_type, pool in self.pools.items():
idle_count = sum(1 for i in pool if i.state == AgentState.IDLE)
busy_count = sum(1 for i in pool if i.state == AgentState.BUSY)
stats['by_type'][agent_type] = {
'total': len(pool),
'idle': idle_count,
'busy': busy_count,
'utilization': (len(pool) - idle_count) / len(pool) if pool else 0
}
return stats
def scale_pool(
self,
agent_type: str,
target_size: int
) -> int:
"""扩展或收缩池大小"""
with self._lock:
pool = self.pools.get(agent_type, [])
current_size = len(pool)
if target_size > current_size:
for _ in range(target_size - current_size):
instance = self._create_instance(agent_type)
pool.append(instance)
self.all_instances[instance.instance_id] = instance
return target_size - current_size
elif target_size < current_size:
removed = 0
for instance in list(pool):
if instance.state == AgentState.IDLE and removed < (current_size - target_size):
self.destroy_instance(instance.instance_id)
removed += 1
return removed
return 0
def get_idle_agents(self, agent_type: str = None) -> List[AgentInstance]:
"""获取所有空闲的 Agent"""
with self._lock:
if agent_type:
pool = self.pools.get(agent_type, [])
return [i for i in pool if i.state == AgentState.IDLE]
else:
return [
i for i in self.all_instances.values()
if i.state == AgentState.IDLE
]动态调度根据实时的系统状态和任务特征,动态分配任务到合适的 Agent。
class DynamicScheduler:
    """Periodically assign pending tasks to agents acquired from a pool."""

    def __init__(self, agent_pool: AgentPool, priority_manager: PriorityManager):
        self.agent_pool = agent_pool
        self.priority_manager = priority_manager
        self.scheduling_history: List[Dict] = []
        self._running = False
        # Strong reference to the background loop: the event loop only keeps
        # weak references to tasks, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._loop_task: Optional[asyncio.Task] = None

    async def start(self):
        """Start the scheduling loop; a loop that is still alive is reused."""
        self._running = True
        if self._loop_task is not None and not self._loop_task.done():
            return
        self._loop_task = asyncio.create_task(self._scheduling_loop())

    async def stop(self):
        """Ask the loop to stop; it exits after its current one-second sleep."""
        self._running = False

    async def _scheduling_loop(self):
        """Run one scheduling pass per second until stopped; errors are printed."""
        while self._running:
            try:
                await self._process_scheduling()
            except Exception as e:
                print(f"Scheduling error: {e}")
            await asyncio.sleep(1.0)

    async def _process_scheduling(self):
        """Assign pending tasks, highest task priority first."""
        pending_tasks = self._get_pending_tasks()
        pending_tasks.sort(
            key=lambda t: self.priority_manager.get_task_priority(t.id),
            reverse=True
        )
        for task in pending_tasks:
            agent = self._select_best_agent(task)
            if agent:
                await self._assign_task(task, agent)

    def _get_pending_tasks(self) -> List[Task]:
        """Return the tasks waiting for an agent (placeholder: always empty)."""
        return []

    def _select_best_agent(self, task: Task) -> Optional[AgentInstance]:
        """Acquire an instance of the agent type inferred for the task.

        NOTE(review): ``AgentPool.acquire`` waits with a blocking sleep, so
        this call can stall the event loop for up to five seconds.
        """
        agent_type = self._infer_agent_type(task)
        return self.agent_pool.acquire(agent_type, timeout=5.0)

    def _infer_agent_type(self, task: Task) -> str:
        """Infer the agent type for a task.

        ``task.assigned_agent`` wins if set; otherwise the ``'type'`` metadata
        entry is scanned for role keywords, with ``'generalist'`` as fallback.
        """
        if task.assigned_agent:
            return task.assigned_agent
        task_type = task.metadata.get('type', '')
        if 'code' in task_type or 'implement' in task_type:
            return 'coder'
        elif 'review' in task_type or 'check' in task_type:
            return 'reviewer'
        elif 'test' in task_type:
            return 'tester'
        elif 'plan' in task_type or 'design' in task_type:
            return 'planner'
        return 'generalist'

    async def _assign_task(self, task: Task, agent: AgentInstance):
        """Bind a task and an agent instance together and log the assignment."""
        task.assigned_agent = agent.instance_id
        agent.current_task_id = task.id
        self.scheduling_history.append({
            'task_id': task.id,
            'agent_id': agent.instance_id,
            'timestamp': datetime.now().isoformat()
        })
class LoadBalancer:
"""负载均衡器"""
def __init__(self, agent_pool: AgentPool):
self.agent_pool = agent_pool
self.assignment_count: Dict[str, int] = {}
def select_agent(
self,
agent_type: str,
strategy: str = "least_loaded"
) -> Optional[AgentInstance]:
"""选择 Agent"""
idle_agents = self.agent_pool.get_idle_agents(agent_type)
if not idle_agents:
return None
if strategy == "least_loaded":
return min(
idle_agents,
key=lambda a: self.assignment_count.get(a.instance_id, 0)
)
elif strategy == "round_robin":
return self._round_robin_select(idle_agents, agent_type)
elif strategy == "random":
import random
return random.choice(idle_agents)
elif strategy == "capability_based":
return self._capability_based_select(idle_agents)
return idle_agents[0]
def _round_robin_select(
self,
agents: List[AgentInstance],
agent_type: str
) -> AgentInstance:
if not agents:
return None
key = f"round_robin_{agent_type}"
current = self.assignment_count.get(key, 0)
selected = agents[current % len(agents)]
self.assignment_count[key] = (current + 1) % len(agents)
return selected
def _capability_based_select(
self,
agents: List[AgentInstance]
) -> AgentInstance:
best = agents[0]
best_score = self._evaluate_capability(best)
for agent in agents[1:]:
score = self._evaluate_capability(agent)
if score > best_score:
best = agent
best_score = score
return best
def _evaluate_capability(self, agent: AgentInstance) -> float:
completed = agent.total_tasks_completed
time_since_use = (datetime.now() - agent.last_used_at).total_seconds()
return completed - (time_since_use / 60)根据负载自动调整 Agent 池的大小。
class AutoScaler:
"""自动扩缩容器"""
def __init__(
self,
agent_pool: AgentPool,
min_size: int = 2,
max_size: int = 20,
scale_up_threshold: float = 0.8,
scale_down_threshold: float = 0.3,
scale_up_cooldown: float = 60.0,
scale_down_cooldown: float = 300.0
):
self.agent_pool = agent_pool
self.min_size = min_size
self.max_size = max_size
self.scale_up_threshold = scale_up_threshold
self.scale_down_threshold = scale_down_threshold
self.scale_up_cooldown = scale_up_cooldown
self.scale_down_cooldown = scale_down_cooldown
self.last_scale_up: Dict[str, datetime] = {}
self.last_scale_down: Dict[str, datetime] = {}
self._running = False
async def start(self):
"""启动自动扩缩容"""
self._running = True
asyncio.create_task(self._scaling_loop())
async def stop(self):
"""停止自动扩缩容"""
self._running = False
async def _scaling_loop(self):
"""扩缩容循环"""
while self._running:
try:
await self._check_and_scale()
except Exception as e:
print(f"Scaling error: {e}")
await asyncio.sleep(10.0)
async def _check_and_scale(self):
"""检查并执行扩缩容"""
stats = self.agent_pool.get_stats()
for agent_type, type_stats in stats.get('by_type', {}).items():
utilization = type_stats['utilization']
current_size = type_stats['total']
await self._handle_scale_up(agent_type, utilization, current_size)
await self._handle_scale_down(agent_type, utilization, current_size)
async def _handle_scale_up(
self,
agent_type: str,
utilization: float,
current_size: int
):
"""处理扩容"""
if utilization < self.scale_up_threshold:
return
last_up = self.last_scale_up.get(agent_type)
if last_up:
elapsed = (datetime.now() - last_up).total_seconds()
if elapsed < self.scale_up_cooldown:
return
if current_size < self.max_size:
target_size = min(current_size + 2, self.max_size)
self.agent_pool.scale_pool(agent_type, target_size)
self.last_scale_up[agent_type] = datetime.now()
print(f"Scaled up {agent_type}: {current_size} -> {target_size}")
async def _handle_scale_down(
self,
agent_type: str,
utilization: float,
current_size: int
):
"""处理缩容"""
if utilization > self.scale_down_threshold:
return
last_down = self.last_scale_down.get(agent_type)
if last_down:
elapsed = (datetime.now() - last_down).total_seconds()
if elapsed < self.scale_down_cooldown:
return
if current_size > self.min_size:
target_size = max(current_size - 2, self.min_size)
removed = self.agent_pool.scale_pool(agent_type, target_size)
self.last_scale_down[agent_type] = datetime.now()
print(f"Scaled down {agent_type}: {current_size} -> {target_size} (removed {removed})")本节实现一个完整的 Planner-Coder-Reviewer 三 Agent 协作系统。

"""
Planner-Coder-Reviewer 三 Agent 协作系统
完整实现代码
"""
import asyncio
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Any, Optional
from enum import Enum
# ========== 基础组件 ==========
class TaskStatus(Enum):
    """Lifecycle states of a task in the collaboration system."""
    PENDING = "pending"  # default status of a new Task
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"
@dataclass
class Task:
    """Unit of work passed between the agents."""
    id: str = field(default_factory=lambda: str(uuid.uuid4()))  # random UUID by default
    name: str = ""
    description: str = ""
    status: TaskStatus = TaskStatus.PENDING
    assigned_agent: str = ""  # id of the agent handling the task, "" if none
    input_data: Any = None
    output_data: Any = None
    dependencies: List[str] = field(default_factory=list)  # ids of prerequisite tasks
    metadata: Dict = field(default_factory=dict)
@dataclass
class ReviewResult:
    """Outcome of reviewing the code produced for a task."""
    task_id: str  # id of the reviewed task
    passed: bool  # whether the review passed
    issues: List[str] = field(default_factory=list)  # problems that were found
    suggestions: List[str] = field(default_factory=list)  # proposed improvements
    score: float = 0.0  # review score
# ========== Agent 基类 ==========
class BaseAgent:
    """Common base of all agents: identity, shared channels and logging."""

    def __init__(self, agent_id: str, name: str):
        self.agent_id, self.name = agent_id, name
        self.blackboard: Optional['Blackboard'] = None
        self.message_bus: Optional['MessageBus'] = None

    def set_blackboard(self, blackboard: 'Blackboard'):
        """Attach the shared blackboard."""
        self.blackboard = blackboard

    def set_message_bus(self, message_bus: 'MessageBus'):
        """Attach the message bus."""
        self.message_bus = message_bus

    async def execute(self, task: Task) -> Any:
        """Process a task; concrete agents must override this."""
        raise NotImplementedError

    def log(self, message: str):
        """Print ``message`` prefixed with the current time and the agent name."""
        now = datetime.now()
        print(f"[{now:%H:%M:%S}] [{self.name}] {message}")
# ========== Planner Agent ==========
class PlannerAgent(BaseAgent):
    """Agent that breaks a task into an ordered chain of subtasks."""

    def __init__(self):
        super().__init__("planner", "Planner")
        self.planning_rules = self._load_planning_rules()

    def _load_planning_rules(self) -> Dict:
        """Return the step list and time estimate for each known task type."""
        feature = {
            'steps': ['analysis', 'design', 'implementation', 'review', 'test'],
            'estimated_time': 120
        }
        bugfix = {
            'steps': ['diagnosis', 'implementation', 'review'],
            'estimated_time': 30
        }
        refactor = {
            'steps': ['analysis', 'planning', 'implementation', 'review', 'test'],
            'estimated_time': 60
        }
        return {'task_types': {'feature': feature, 'bugfix': bugfix, 'refactor': refactor}}

    async def execute(self, task: Task) -> List[Dict]:
        """Plan ``task`` and publish the plan on the blackboard, if attached.

        Returns:
            The generated subtask descriptions, in execution order.
        """
        self.log(f"开始规划任务: {task.name}")
        rule = self.planning_rules['task_types'].get(self._infer_task_type(task), {})
        subtasks = self._create_subtasks(task, rule)
        if self.blackboard:
            for key, value in (('current_plan', subtasks), ('original_task', task)):
                self.blackboard.write(key, value, self.agent_id)
        self.log(f"规划完成,生成 {len(subtasks)} 个子任务")
        return subtasks

    def _infer_task_type(self, task: Task) -> str:
        """Classify the task from keywords in its lower-cased description."""
        text = task.description.lower()
        if any(word in text for word in ('bug', 'fix', '修复')):
            return 'bugfix'
        if any(word in text for word in ('refactor', '重构')):
            return 'refactor'
        return 'feature'

    def _create_subtasks(self, task: Task, rule: Dict) -> List[Dict]:
        """Create one subtask dict per step; each depends on its predecessor."""
        steps = rule.get('steps', ['implementation', 'review'])
        estimated_time = rule.get('estimated_time', 60)
        subtasks: List[Dict] = []
        previous_id = None
        for index, step in enumerate(steps):
            subtask_id = f"{task.id}_sub_{index}"
            subtasks.append({
                'id': subtask_id,
                'name': f"{task.name} - {step}",
                'type': step,
                'status': 'pending',
                'agent': self._infer_agent_for_step(step),
                # The total estimate is split evenly across the steps.
                'estimated_time': estimated_time // len(steps),
                'dependencies': [previous_id] if index else []
            })
            previous_id = subtask_id
        return subtasks

    def _infer_agent_for_step(self, step: str) -> str:
        """Map a plan step to the role that handles it ('planner' by default)."""
        role_by_step = {
            'implementation': 'coder',
            'coding': 'coder',
            'review': 'reviewer',
            'analysis': 'reviewer',
            'test': 'tester',
        }
        return role_by_step.get(step, 'planner')
# ========== Coder Agent ==========
class CoderAgent(BaseAgent):
    """Agent that produces an implementation for a task."""

    def __init__(self):
        super().__init__("coder", "Coder")

    async def execute(self, task: Task) -> Dict:
        """Generate code for ``task`` and publish it on the blackboard.

        When a blackboard is attached, the description of the original task
        replaces ``task.description``, and the code plus an ``'implemented'``
        status are written under ``code_<task id>`` / ``code_status_<task id>``.

        Returns:
            Dict with ``'task_id'``, ``'code'`` and ``'status'``.
        """
        self.log(f"开始实现任务: {task.name}")
        if self.blackboard:
            original_task = self.blackboard.read('original_task', self.agent_id)
            if original_task:
                task.description = original_task.description
        code = self._generate_code(task)
        if self.blackboard:
            self.blackboard.write(f'code_{task.id}', code, self.agent_id)
            self.blackboard.write(f'code_status_{task.id}', 'implemented', self.agent_id)
        self.log(f"代码实现完成: {task.name}")
        return {
            'task_id': task.id,
            'code': code,
            'status': 'implemented'
        }

    def _generate_code(self, task: Task) -> str:
        """Render the implementation template for ``task``.

        The template is indented so that the result is valid Python. The task
        name is embedded with ``repr`` and the header comments are collapsed
        to one line each, so quotes or newlines in the task text cannot break
        the generated source.
        """
        # A newline inside a "#" comment would leak the rest of the text
        # into the generated code.
        name_comment = ' '.join(str(task.name).split())
        description_comment = ' '.join(str(task.description).split())
        lines = [
            "",
            f"# 代码实现: {name_comment}",
            f"# 描述: {description_comment}",
            "class Implementation:",
            "    def execute(self):",
            "        result = {",
            f"            'task': {task.name!r},",
            "            'status': 'implemented',",
            f"            'timestamp': '{datetime.now().isoformat()}'",
            "        }",
            "        return result",
            "if __name__ == '__main__':",
            "    impl = Implementation()",
            "    print(impl.execute())",
            "",
        ]
        return "\n".join(lines)
# ========== Reviewer Agent ==========
class ReviewerAgent(BaseAgent):
    """Agent that reviews generated code with simple textual checks."""

    def __init__(self):
        super().__init__("reviewer", "Reviewer")
        self.review_criteria = self._load_review_criteria()

    def _load_review_criteria(self) -> Dict:
        """Return the check names, their weights and the pass mark."""
        weights = {
            'code_completeness': 0.3,
            'naming_conventions': 0.15,
            'error_handling': 0.25,
            'documentation': 0.15,
            'test_coverage': 0.15
        }
        return {
            'required_checks': [
                'code_completeness',
                'naming_conventions',
                'error_handling',
                'documentation',
                'test_coverage'
            ],
            'min_score': 0.7,
            'weights': weights
        }

    async def execute(self, task: Task) -> ReviewResult:
        """Review the code stored on the blackboard under ``code_<task id>``.

        Returns:
            A failing result with the single issue "代码不存在" when no code
            is found; otherwise the scored review, which is also written to
            the blackboard together with a ``'reviewed'`` status.
        """
        self.log(f"开始审查任务: {task.name}")
        code = self.blackboard.read(f'code_{task.id}', self.agent_id) if self.blackboard else ""
        if not code:
            return ReviewResult(
                task_id=task.id,
                passed=False,
                issues=["代码不存在"],
                score=0.0
            )
        issues = self._perform_review(code)
        suggestions = self._generate_suggestions(code, issues)
        score = self._calculate_score(issues)
        passed = score >= self.review_criteria['min_score']
        outcome = dict(
            task_id=task.id,
            passed=passed,
            issues=issues,
            suggestions=suggestions,
            score=score
        )
        if self.blackboard:
            # The stored result and the returned one are separate objects.
            self.blackboard.write(f'review_result_{task.id}', ReviewResult(**outcome), self.agent_id)
            self.blackboard.write(f'code_status_{task.id}', 'reviewed', self.agent_id)
        self.log(f"审查完成: {task.name}, 通过: {passed}, 评分: {score:.2f}")
        return ReviewResult(**outcome)

    def _perform_review(self, code: str) -> List[str]:
        """Run the textual checks and return the issues found, in check order."""
        checks = (
            (len(code) < 100, "代码过于简短,可能不完整"),
            ('def ' not in code, "缺少函数定义"),
            ('"""' not in code and "'''" not in code, "缺少文档字符串"),
            ('TODO' in code, "存在未完成的 TODO"),
            ('try' not in code and 'except' not in code, "缺少错误处理"),
        )
        return [message for failed, message in checks if failed]

    def _generate_suggestions(self, code: str, issues: List[str]) -> List[str]:
        """Turn the issues into suggestions, one per issue category present."""
        advice = (
            ('文档', "建议添加完整的文档字符串"),
            ('TODO', "请完成所有 TODO 项后再提交审查"),
            ('错误处理', "建议添加 try-except 块处理可能的异常"),
        )
        return [
            suggestion
            for marker, suggestion in advice
            if any(marker in issue for issue in issues)
        ]

    def _calculate_score(self, issues: List[str]) -> float:
        """Start from 1.0, subtract one deduction per recognised issue, floor at 0.0."""
        deductions = {
            '代码过于简短': 0.2,
            '缺少函数定义': 0.25,
            '缺少文档字符串': 0.1,
            '存在未完成的 TODO': 0.15,
            '缺少错误处理': 0.15
        }
        score = 1.0
        for issue in issues:
            # Only the first matching deduction applies to an issue.
            penalty = next(
                (amount for key, amount in deductions.items() if key in issue),
                None
            )
            if penalty is not None:
                score -= penalty
        return max(0.0, score)
# ========== Multi-Agent 协作系统 ==========
class MultiAgentCollaborationSystem:
    """Wire a planner, a coder and a reviewer together and run requests.

    All agents share one blackboard and one message bus. A request is
    planned, its subtasks are executed in plan order by the agent named in
    each subtask, and a final review closes the run.
    """

    def __init__(self):
        self.blackboard = Blackboard("main")
        self.message_bus = MessageBus()
        self.agents: Dict[str, BaseAgent] = {
            'planner': PlannerAgent(),
            'coder': CoderAgent(),
            'reviewer': ReviewerAgent()
        }
        for agent in self.agents.values():
            agent.set_blackboard(self.blackboard)
            agent.set_message_bus(self.message_bus)
        self.scheduler = DynamicScheduler(
            agent_pool=self._create_agent_pool(),
            priority_manager=PriorityManager()
        )
        self.execution_history: List[Dict] = []

    def _create_agent_pool(self) -> 'AgentPool':
        """Create the agent pool with the initial instance count per role."""
        return AgentPool({
            'planner': 2,
            'coder': 4,
            'reviewer': 2
        })

    def _publish_code_for(self, task_id: str, code: Any):
        """Make ``code`` visible to the reviewer under ``code_<task_id>``.

        The reviewer looks code up by the id of the task it reviews, while
        the coder stores it under the id of the implementation subtask.
        Without this bridge every review fails with "代码不存在".
        """
        if code is not None:
            self.blackboard.write(f'code_{task_id}', code, 'coder')

    async def process_request(self, task_description: str) -> Dict:
        """Plan, execute and review one request.

        Subtasks whose agent role is not registered (for example ``'tester'``)
        are skipped.

        Returns:
            Dict with ``'task_id'``, ``'status'``, ``'execution_time'``
            (seconds), ``'review_result'`` (final ``ReviewResult``) and
            ``'subtasks'`` (the per-subtask agent results).
        """
        task_id = str(uuid.uuid4())
        start_time = datetime.now()
        planning_task = Task(
            id=f"{task_id}_planning",
            name="任务规划",
            description=task_description
        )
        planner = self.agents['planner']
        # Prefer the returned plan; reading it back from the blackboard
        # yields None (and a crash below) if the planner could not write it.
        plan = (
            await planner.execute(planning_task)
            or self.blackboard.read('current_plan', 'planner')
            or []
        )
        results = []
        latest_code = None  # most recent code produced by any subtask
        for subtask_info in plan:
            subtask = Task(
                id=subtask_info['id'],
                name=subtask_info['name'],
                description=f"{subtask_info['name']} - {task_description}",
                metadata=subtask_info
            )
            agent_type = subtask_info['agent']
            agent = self.agents.get(agent_type)
            if not agent:
                continue
            if agent_type == 'reviewer':
                self._publish_code_for(subtask.id, latest_code)
            result = await agent.execute(subtask)
            results.append(result)
            if isinstance(result, dict) and result.get('code'):
                latest_code = result['code']
        final_review_task = Task(
            id=f"{task_id}_final_review",
            name="最终审查",
            description=f"审查整体实现 - {task_description}"
        )
        self._publish_code_for(final_review_task.id, latest_code)
        reviewer = self.agents['reviewer']
        final_result = await reviewer.execute(final_review_task)
        end_time = datetime.now()
        execution_time = (end_time - start_time).total_seconds()
        self.execution_history.append({
            'task_id': task_id,
            'description': task_description,
            'start_time': start_time.isoformat(),
            'end_time': end_time.isoformat(),
            'execution_time': execution_time,
            'subtask_count': len(plan),
            'final_review': {
                'passed': final_result.passed,
                'score': final_result.score,
                'issues': final_result.issues
            }
        })
        return {
            'task_id': task_id,
            'status': 'completed',
            'execution_time': execution_time,
            'review_result': final_result,
            'subtasks': results
        }

    def get_system_status(self) -> Dict:
        """Summarise the agents, blackboard statistics and recent executions."""
        return {
            'agents': {
                agent_id: {
                    'name': agent.name,
                    'status': 'running'
                }
                for agent_id, agent in self.agents.items()
            },
            'blackboard_stats': self.blackboard.get_stats(),
            'execution_history': len(self.execution_history),
            'recent_executions': self.execution_history[-5:]
        }
# ========== 运行示例 ==========
async def main():
    """Run the demo: process one sample request and print the outcome."""
    rule = "=" * 60
    print(rule)
    print("Multi-Agent 协作系统演示")
    print(rule)
    system = MultiAgentCollaborationSystem()
    task_description = "实现一个用户认证系统,包括登录、注册、登出功能"
    print(f"\n处理任务: {task_description}\n")
    result = await system.process_request(task_description)
    print("\n" + rule)
    print("执行结果:")
    print(f" 任务ID: {result['task_id']}")
    print(f" 状态: {result['status']}")
    print(f" 执行时间: {result['execution_time']:.2f}秒")
    print(f" 审查通过: {result['review_result'].passed}")
    print(f" 审查评分: {result['review_result'].score:.2f}")
    if result['review_result'].issues:
        print(" 问题列表:")
        for issue in result['review_result'].issues:
            print(f" - {issue}")
    print(rule)
    status = system.get_system_status()
    print("\n系统状态:")
    print(f" Agent 数量: {len(status['agents'])}")
    print(f" 执行历史: {status['execution_history']} 条")
# Script entry point: run the demo only when the file is executed directly.
if __name__ == "__main__":
    asyncio.run(main())

原则一:单一职责原则
每个 Agent 应该只负责一个明确的职责领域。
原则二:最小化通信原则
Agent 之间的通信应该尽可能少。
原则三:容错设计原则
每个 Agent 都应该设计容错机制。
原则四:可观测性原则
系统应该提供足够的日志和监控信息。
星型架构:Planner 作为中央协调者,其他 Agent 只与 Planner 通信。
网状架构:Agent 之间可以直接通信,更加灵活但也更加复杂。
层次架构:适用于大规模系统,将 Agent 分为多个层次。
技巧一:任务批处理 - 将多个小任务合并为一个批次执行
技巧二:结果缓存 - 对于相同的查询,缓存结果以避免重复计算
技巧三:异步优先 - 优先使用异步通信,避免 Agent 相互等待
技巧四:智能调度 - 根据 Agent 的负载和任务特性,动态调整任务分配
本文系统性地探讨了 Multi-Agent 系统的设计与实现:
Multi-Agent 系统的发展趋势包括:
参考链接:
附录(Appendix):
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Set
from datetime import datetime
from enum import Enum
import uuid
class TaskStatus(Enum):
    """Lifecycle states of a task."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"


class AgentState(Enum):
    """Lifecycle states of an agent."""
    IDLE = "idle"
    BUSY = "busy"
    BLOCKED = "blocked"
    TERMINATED = "terminated"


@dataclass
class Task:
    """Unit of work, including its place in the dependency graph."""
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    description: str = ""
    assigned_agent: Optional[str] = None
    dependencies: Set[str] = field(default_factory=set)  # ids this task waits for
    dependents: Set[str] = field(default_factory=set)  # ids waiting for this task
    status: TaskStatus = TaskStatus.PENDING
    input_data: Any = None
    output_data: Any = None
    estimated_duration: float = 0.0
    actual_duration: float = 0.0
    metadata: Dict[str, Any] = field(default_factory=dict)


class Blackboard:
    """Named key-value store shared by agents; every access holds a lock."""

    def __init__(self, name: str = "default"):
        self.name = name
        self._storage: Dict[str, Any] = {}
        self._lock = threading.RLock()

    def write(self, key: str, value: Any, agent_id: str) -> None:
        """Store ``value`` under ``key`` (``agent_id`` is accepted but unused)."""
        with self._lock:
            self._storage[key] = value

    def read(self, key: str, agent_id: str) -> Optional[Any]:
        """Return the value stored under ``key``, or ``None`` if absent."""
        with self._lock:
            return self._storage.get(key)

    def delete(self, key: str) -> bool:
        """Remove ``key``; return whether it was present."""
        with self._lock:
            try:
                del self._storage[key]
            except KeyError:
                return False
            return True


class MessageBus:
    """Per-agent asyncio queues for direct and broadcast messages."""

    def __init__(self):
        self._queues: Dict[str, asyncio.Queue] = {}
        self._lock = threading.RLock()

    def register_agent(self, agent_id: str):
        """Create a fresh queue for ``agent_id``, replacing any existing one."""
        with self._lock:
            self._queues[agent_id] = asyncio.Queue()

    async def send(self, message: 'Message'):
        """Deliver a message.

        A broadcast goes to every registered agent except the sender; any
        other message goes to the queue of ``message.receiver``.
        """
        if not message.is_broadcast():
            await self._queues[message.receiver].put(message)
            return
        for agent_id in self._queues:
            if agent_id != message.sender:
                await self._queues[agent_id].put(message)

    async def receive(self, agent_id: str, timeout: float = None) -> Optional['Message']:
        """Wait for the next message of ``agent_id``; ``None`` on timeout."""
        try:
            message = await asyncio.wait_for(
                self._queues[agent_id].get(),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            message = None
        return message
return Noneclass BaseAgent:
def __init__(self, agent_id: str, name: str):
self.agent_id = agent_id
self.name = name
self.blackboard: Optional[Blackboard] = None
self.message_bus: Optional[MessageBus] = None
def set_blackboard(self, blackboard: Blackboard):
self.blackboard = blackboard
def set_message_bus(self, message_bus: MessageBus):
self.message_bus = message_bus
async def execute(self, task: Task) -> Any:
raise NotImplementedError

    def log(self, message: str):
        timestamp = datetime.now().strftime("%H:%M:%S")
        print(f"[{timestamp}] [{self.name}] {message}")

关键词: Multi-Agent、Agent 协作、任务分解、Task Graph、Blackboard、Message Passing、Shared Memory、冲突解决、Agent Pool、动态调度、Planner、Coder、Reviewer、AI IDE、协作策略
