
Multi-Agent Collaboration: Division of Labor, Cooperation, and Conflict Resolution

安全风信子
Published 2026-05-28 08:15:24
Column: AI SPPECH

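Author: HOS (安全风信子)
Date: 2026-05-24
Main source platform: GitHub
Abstract: Complex tasks require several Agents working together. This article examines the design and implementation of Multi-Agent systems: Agent role division (planner, coder, reviewer, tester), inter-Agent communication protocols (Blackboard, Message Passing, Shared Memory), task decomposition and assignment, collaboration strategies (serial, parallel, and hierarchical execution), conflict resolution mechanisms (voting, arbitration, priority), and the problems of scaling a Multi-Agent system. Through 3 complete code implementations, 3 Mermaid architecture diagrams, and numerous practical cases, it shows how to build an efficient, stable, and scalable Multi-Agent collaboration system, and offers a technical approach that can be put into practice in AI IDE engineering.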

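Contents
  • 1 The Nature and Value of Multi-Agent Systems
  • 2 Role Models: The Trade-off Between Specialist and Generalist
    • 2.1 Theoretical Basis of Role Division
    • 2.2 Strengths and Limitations of Specialists
    • 2.3 Strengths and Limitations of Generalists
    • 2.4 The Hybrid Role Model: The Practical Optimum
    • 2.5 Formal Description of Role Definitions
  • 3 Task Decomposition: Task Graph and Dependency Management
    • 3.1 Core Principles of Task Decomposition
    • 3.2 Building the Task Graph
    • 3.3 Dependency Management Strategies
  • 4 Communication Protocols: Blackboard, Message Passing, Shared Memory
    • 4.1 Overview of Communication Protocols
    • 4.2 The Blackboard Pattern
    • 4.3 The Message Passing Pattern
    • 4.4 The Shared Memory Pattern
    • 4.5 Comparing and Choosing Communication Protocols
  • 5 Collaboration Strategies: Serial, Parallel, and Hierarchical Execution
    • 5.1 Overview of Collaboration Strategies
    • 5.2 Serial Execution Strategy
    • 5.3 Parallel Execution Strategy
    • 5.4 Hybrid Collaboration Strategy
  • 6 Conflict Resolution: Voting, Arbitration, and Priority Mechanisms
    • 6.1 Types and Sources of Conflict
    • 6.2 Voting Mechanism
    • 6.3 Arbitration Mechanism
    • 6.4 Priority Mechanism
  • 7 Scaling: Agent Pool and Dynamic Scheduling
    • 7.1 Challenges of Scaling
    • 7.2 Agent Pool Architecture
    • 7.3 Dynamic Scheduling
    • 7.4 Autoscaling
  • 8 Practice: A Planner-Coder-Reviewer Three-Agent Collaboration System
    • 8.1 System Architecture
    • 8.2 Complete Implementation
  • 9 Best Practices and Design Patterns
    • 9.1 Multi-Agent System Design Principles
    • 9.2 Common Architecture Patterns
    • 9.3 Performance Optimization Techniques
  • 10 Summary and Outlook
    • 10.1 Key Points Revisited
    • 10.2 Future Directions
  • Appendix A: Complete Code Listing
    • A.1 Core Data Structures and Type Definitions
    • A.2 Blackboard Implementation
    • A.3 MessageBus Implementation
    • A.4 Agent Base Class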

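Core technical value of this section: understand the essence of Multi-Agent collaboration, a complete methodology running from role division to conflict resolution, and master the core design principles for building production-grade Multi-Agent systems.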

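1 The Nature and Value of Multi-Agent Systems

For complex software development tasks, the capability of a single Agent has a ceiling. When a task requires combining knowledge from several domains, planning over a long time span, and multiple layers of quality assurance, a single-Agent architecture runs into scattered attention, context overflow, and error accumulation. A Multi-Agent system decomposes the complex task across several specialized Agents, lets each one focus on what it does best, and completes through collaboration the work that a single Agent cannot handle.

The core value of a Multi-Agent system shows up in three dimensions:

Vertical specialization: Each Agent can become an expert in a specific domain. The Planner is strong at task decomposition and planning, the Coder at code generation and implementation, the Reviewer at code quality assessment, and the Tester at test case design and defect discovery. Within its own domain, a specialized Agent tends to make better decisions than a general-purpose Agent.

Horizontal parallelization: Independent subtasks can run at the same time, which significantly shortens completion time. In a project with 10 independent modules, 4 Coder Agents working in parallel can in theory bring coding time close to 1/4 of serial execution. With equally sized modules the ideal figure is 3/10, because 10 modules need three rounds on 4 Agents.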
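
The speedup figure above is an upper bound. As a rough sketch, assuming every module takes the same time and ignoring coordination overhead (both are simplifications made here, not measurements), the ideal wall-clock ratio is the number of scheduling rounds divided by the number of modules. The function name `ideal_parallel_ratio` is made up for this illustration.

```python
import math


def ideal_parallel_ratio(num_tasks: int, num_agents: int) -> float:
    """Wall-clock time relative to serial execution for equal-sized independent tasks."""
    if num_tasks <= 0 or num_agents <= 0:
        raise ValueError("num_tasks and num_agents must be positive")
    # Each round runs at most num_agents tasks at once.
    rounds = math.ceil(num_tasks / num_agents)
    return rounds / num_tasks


print(ideal_parallel_ratio(10, 4))  # 0.3  (3 rounds for 10 modules)
print(ideal_parallel_ratio(12, 4))  # 0.25 (the work divides evenly)
```

The 1/4 ratio is reached only when the number of modules is a multiple of the number of Agents; uneven module sizes and communication cost push the real figure higher.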

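Fault tolerance and robustness: Cross-checking and review among Agents can lower the error rate. When code produced by the Coder passes a strict review by the Reviewer, potential defects are found early, which avoids the high cost of fixing them later.

However, a Multi-Agent system also adds complexity: communication overhead between Agents, the design of collaboration strategies, the implementation of conflict resolution mechanisms, and keeping the system stable once it scales. These problems are the core topics of this article.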

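2 Role Models: The Trade-off Between Specialist and Generalist

2.1 Theoretical Basis of Role Division

In a Multi-Agent system, role division is the foundation of collaboration. A role determines an Agent's responsibility boundaries, its range of capabilities, and how it interacts with other Agents. A sound role division makes the system far more effective; a poor one leads to confused responsibilities and low efficiency.

From the role design perspective, Agents fall into two paradigms: Specialist and Generalist.

Specialist mode assigns each Agent a highly specialized responsibility. For example:

  • Planner Agent: task decomposition, schedule planning, priority ordering
  • Coder Agent: writing code and implementing specific features
  • Reviewer Agent: code review, quality assessment, improvement suggestions
  • Tester Agent: test case design, test execution, defect reporting

Generalist mode gives each Agent fairly broad capabilities so that it can handle many kinds of tasks. In this mode the Agents differ little from one another, and collaboration comes mainly from dynamic task assignment.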

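2.2 Strengths and Limitations of Specialists

Specialist strengths

Strength | Description
Deep specialization | An Agent can reach a very high level in a specific domain
Decision quality | A single responsibility reduces decision complexity and improves accuracy
Explainability | When a problem occurs, it is easy to trace it to a specific Agent
Reusability | Specialized components can be reused across systems
Parallel efficiency | Independent specialized tasks can run fully in parallel

Specialist limitations

  1. Communication overhead: handing work from one specialty to another requires extra information transfer
  2. Single point of failure: the failure of one specialized Agent can block the whole pipeline
  3. Uneven resource utilization: some specialized Agents may sit idle for long periods
  4. Context breaks: context can be lost when a task crosses specialties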
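
2.3 Strengths and Limitations of Generalists

Generalist strengths

  1. High flexibility: adapts to many kinds of tasks
  2. Balanced resources: tasks are distributed more evenly
  3. Strong fault tolerance: when one Agent fails, another can take over
  4. Continuous context: a single Agent handles the whole task flow, so context is not interrupted

Generalist limitations

  1. Low specialization: capability in each domain falls short of a Specialist
  2. Uneven decision quality: the quality of complex decisions may be unstable
  3. Low efficiency: constant context switching adds overhead
  4. Hard to optimize: deep optimization for a single kind of task is difficult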
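
2.4 The Hybrid Role Model: The Practical Optimum

In production environments, the hybrid role model is often the most effective design. It combines the depth of Specialists with the flexibility of Generalists:

Core design principles

  1. Specialists at the bottom: Agents at the execution level stay highly specialized
  2. Generalists at the top: Agents at the coordination level keep a broad view
  3. Dynamic role switching: an Agent can temporarily take on another role when the task requires it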
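
The three principles can be sketched as a small routing rule. This is a minimal illustration, not the article's implementation: `RoleAgent`, `Coordinator`, and the "narrowest specialist wins" tie-break are all assumptions introduced here.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Set


@dataclass
class RoleAgent:
    """Hypothetical agent record: a name plus the capabilities it covers."""
    name: str
    capabilities: Set[str]
    is_generalist: bool = False


@dataclass
class Coordinator:
    """Top-level generalist that routes work to bottom-level specialists."""
    agents: List[RoleAgent] = field(default_factory=list)

    def pick(self, required: Set[str]) -> Optional[RoleAgent]:
        # Principle 1: prefer a specialist that covers every required capability.
        specialists = [
            a for a in self.agents
            if not a.is_generalist and required <= a.capabilities
        ]
        if specialists:
            # Assumption: the narrowest matching specialist is the most focused one.
            return min(specialists, key=lambda a: len(a.capabilities))
        # Principle 3: otherwise a generalist temporarily takes on the role.
        for agent in self.agents:
            if agent.is_generalist and required <= agent.capabilities:
                return agent
        return None


coordinator = Coordinator([
    RoleAgent("coder", {"codegen"}),
    RoleAgent("reviewer", {"review"}),
    RoleAgent("generalist", {"codegen", "review", "docs"}, is_generalist=True),
])
print(coordinator.pick({"codegen"}).name)  # coder
print(coordinator.pick({"docs"}).name)     # generalist
```

Keeping the fallback explicit makes it easy to log how often a generalist has to step in, which is one signal that a new specialist role may be worth adding.
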

2.5 Formal Description of Role Definitions

In a real system, a role can be defined formally as follows:

Code language: python
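# Formal model of a role definition
from typing import List


class AgentRole:
    def __init__(self, name: str, capabilities: List[str], responsibilities: List[str]):
        self.name = name
        self.capabilities = capabilities  # what the Agent can do
        self.responsibilities = responsibilities  # what the Agent is responsible for
        self.input_types: List[str] = []  # input types it accepts
        self.output_types: List[str] = []  # output types it produces
        self.priority = 0  # priority when conflicts occur
    
    def can_handle(self, task: 'Task') -> bool:
        """Return True if the role covers at least one capability the task requires."""
        # Assumes `task.required_capabilities` is a collection of capability names.
        return any(cap in self.capabilities for cap in task.required_capabilities)
    
    def get_affinity_score(self, task: 'Task') -> float:
        """Fraction of the task's required capabilities that this role covers (0.0 to 1.0)."""
        required = set(task.required_capabilities)
        matched = required & set(self.capabilities)
        return len(matched) / max(len(required), 1)


# Evaluate how specialized an Agent is
def evaluate_specialization(agent: 'Agent') -> dict:
    """Summarize an Agent's degree of specialization."""
    # `calculate_concentration` and `measure_clarity` are not defined in this article;
    # they stand for scoring helpers the reader has to supply. The history attributes
    # are assumed to expose `.mean()` (for example, a NumPy array or pandas Series).
    return {
        'role_diversity': len(set(agent.roles)),
        'capability_concentration': calculate_concentration(agent.capabilities),
        'responsibility_clarity': measure_clarity(agent.responsibilities),
        'avg_task_completion_rate': agent.completion_history.mean(),
        'avg_task_quality_score': agent.quality_history.mean()
    }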

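3 Task Decomposition: Task Graph and Dependency Management

3.1 Core Principles of Task Decomposition

Task decomposition is a core step in a Multi-Agent system. A good decomposition follows these principles:

Atomicity: Each subtask should be atomic, that is, not worth splitting further. An atomic task either runs to completion or does not run at all; there is no partially executed state.

Independence: Subtasks should depend on one another as little as possible. The fewer the dependencies, the more work can run in parallel and the more efficient the system is.

Observability: Each subtask should have explicit inputs, outputs, and acceptance criteria, so that its execution can be observed and measured.

Balance: Subtasks should be of similar complexity, so that a few long-running subtasks do not block overall progress.

3.2 Building the Task Graph

A Task Graph is the structured representation of a task decomposition. It describes both the containment relationships between tasks and the dependencies between them.

Code language: python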
from dataclasses import dataclass, field
from typing import List, Dict, Set, Optional, Any
from enum import Enum
import uuid


class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"


class TaskPriority(Enum):
    LOW = 1
    NORMAL = 2
    HIGH = 3
    CRITICAL = 4


@dataclass
class Task:
    """任务节点"""
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    description: str = ""
    assigned_agent: Optional[str] = None
    
    # 依赖关系
    dependencies: Set[str] = field(default_factory=set)  # 任务执行前必须完成的任务
    dependents: Set[str] = field(default_factory=set)  # 依赖该任务的任务
    
    # 状态与属性
    status: TaskStatus = TaskStatus.PENDING
    priority: TaskPriority = TaskPriority.NORMAL
    
    # 输入输出
    input_data: Any = None
    output_data: Any = None
    
    # 度量指标
    estimated_duration: float = 0.0  # 估计执行时间(分钟)
    actual_duration: float = 0.0
    retry_count: int = 0
    max_retries: int = 3
    
    # 元数据
    metadata: Dict[str, Any] = field(default_factory=dict)
    
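    def can_execute(self, task_graph: Optional['TaskGraph'] = None) -> bool:
        """Check whether the task can run: it is PENDING and its dependencies are completed."""
        if self.status != TaskStatus.PENDING:
            return False
        if task_graph is None:
            # Dependency states live in the graph; without it only the status is checked.
            return True
        return all(
            task_graph.tasks[dep_id].status == TaskStatus.COMPLETED
            for dep_id in self.dependencies
        )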
    
    def add_dependency(self, task_id: str):
        """添加前置依赖"""
        self.dependencies.add(task_id)
    
    def add_dependent(self, task_id: str):
        """添加依赖者"""
        self.dependents.add(task_id)
    
    def get_depth(self, task_graph: 'TaskGraph') -> int:
        """计算任务在依赖图中的深度"""
        if not self.dependencies:
            return 0
        return max(
            task_graph.tasks[dep].get_depth(task_graph) + 1 
            for dep in self.dependencies
        )


class TaskGraph:
    """任务依赖图"""
    
    def __init__(self, root_task: Optional[Task] = None):
        self.tasks: Dict[str, Task] = {}
        self.root_task = root_task
        
        if root_task:
            self.tasks[root_task.id] = root_task
    
    def add_task(self, task: Task, parent_ids: Optional[List[str]] = None) -> None:
        """Add a task node; each id in parent_ids becomes a dependency of the task."""
        self.tasks[task.id] = task
        
        # 建立父子依赖关系
        if parent_ids:
            for parent_id in parent_ids:
                if parent_id in self.tasks:
                    self.tasks[parent_id].add_dependent(task.id)
                    task.add_dependency(parent_id)
    
    def get_executable_tasks(self) -> List[Task]:
        """获取当前可执行的任务(所有依赖已满足)"""
        executable = []
        for task in self.tasks.values():
            if task.status != TaskStatus.PENDING:
                continue
            
            # 检查所有依赖是否已完成
            all_deps_completed = all(
                self.tasks[dep].status == TaskStatus.COMPLETED
                for dep in task.dependencies
            )
            
            if all_deps_completed:
                executable.append(task)
        
        return executable
    
    def get_parallel_groups(self) -> List[List[Task]]:
        """Group PENDING tasks into waves; tasks within one wave can run in parallel."""
        groups = []
        # Only PENDING tasks still need scheduling. Dependencies outside this set
        # (for example, COMPLETED ones) are treated as already satisfied.
        remaining_tasks = {
            task_id for task_id, task in self.tasks.items()
            if task.status == TaskStatus.PENDING
        }
        
        while remaining_tasks:
            # A task joins the current wave when none of its dependencies is still waiting
            current_group = [
                self.tasks[task_id]
                for task_id in remaining_tasks
                if not (remaining_tasks & self.tasks[task_id].dependencies)
            ]
            
            if not current_group:
                break  # the remaining tasks depend on each other in a cycle
            
            groups.append(current_group)
            
            # Treat the wave as finished to unlock the next one
            for task in current_group:
                remaining_tasks.discard(task.id)
        
        return groups
    
    def validate(self) -> tuple[bool, List[str]]:
        """验证任务图的合理性"""
        errors = []
        
        # 检查循环依赖
        if self.has_cycle():
            errors.append("检测到循环依赖")
        
        # 检查孤立任务
        for task in self.tasks.values():
            if not task.dependencies and not task.dependents:
                if task != self.root_task:
                    errors.append(f"任务 {task.id} ({task.name}) 是孤立节点")
        
        return len(errors) == 0, errors
    
    def has_cycle(self) -> bool:
        """检测图中是否存在环"""
        visited = set()
        rec_stack = set()
        
        def dfs(task_id: str) -> bool:
            visited.add(task_id)
            rec_stack.add(task_id)
            
            for dep_id in self.tasks[task_id].dependencies:
                if dep_id not in visited:
                    if dfs(dep_id):
                        return True
                elif dep_id in rec_stack:
                    return True
            
            rec_stack.remove(task_id)
            return False
        
        for task_id in self.tasks:
            if task_id not in visited:
                if dfs(task_id):
                    return True
        
        return False
    
    def calculate_critical_path(self) -> List[str]:
        """Compute the critical path (the dependency chain with the longest total duration)."""
        memo = {}
        
        def longest_path_from(task_id: str) -> tuple[float, List[str]]:
            if task_id in memo:
                return memo[task_id]
            
            task = self.tasks[task_id]
            best_len = 0.0
            best_path: List[str] = []
            
            for dep_id in task.dependents:
                length, path = longest_path_from(dep_id)
                # `not best_path` keeps a successor even when every duration is 0
                if length > best_len or not best_path:
                    best_len = length
                    best_path = path
            
            memo[task_id] = (task.estimated_duration + best_len, [task_id] + best_path)
            return memo[task_id]
        
        # Start from every entry task (one with no dependencies), not only the root:
        # tasks added without parent_ids are not reachable from the root.
        entry_ids = [tid for tid, task in self.tasks.items() if not task.dependencies]
        if not entry_ids:
            return []
        
        _, path = max(
            (longest_path_from(tid) for tid in entry_ids),
            key=lambda item: item[0]
        )
        return path
    
    def visualize(self) -> str:
        """生成 Mermaid 格式的图描述"""
        lines = ["flowchart TD"]
        
        # 添加节点定义
        for task in self.tasks.values():
            lines.append(f'    {task.id}["{task.name} ({task.status.value})"]:::{task.status.value}')
        
        # 添加样式
        lines.append("")
        lines.append("    classDef pending fill:#f9f,stroke:#333,stroke-width:4px")
        lines.append("    classDef running fill:#ff9,stroke:#333,stroke-width:4px")
        lines.append("    classDef completed fill:#9f9,stroke:#333,stroke-width:4px")
        lines.append("    classDef failed fill:#f99,stroke:#333,stroke-width:4px")
        lines.append("    classDef blocked fill:#fc9,stroke:#333,stroke-width:4px")
        
        # 添加依赖边
        lines.append("")
        for task in self.tasks.values():
            for dep_id in task.dependencies:
                lines.append(f"    {dep_id} --> {task.id}")
        
        return "\n".join(lines)


def build_code_development_graph() -> TaskGraph:
    """Build the dependency graph for a code development job."""
    root = Task(
        id="root",
        name="Complete feature development",
        description="The full code development job"
    )
    graph = TaskGraph(root)
    
    spec_task = Task(id="task_spec", name="Write specification", estimated_duration=30)
    design_task = Task(id="task_design", name="System design", estimated_duration=60)
    test_plan_task = Task(id="task_test_plan", name="Test plan", estimated_duration=20)
    
    # Design and test planning both start from the specification; without these
    # edges task_spec would be reported as an isolated node by validate().
    graph.add_task(spec_task)
    graph.add_task(design_task, parent_ids=["task_spec"])
    graph.add_task(test_plan_task, parent_ids=["task_spec"])
    
    impl_task = Task(id="task_impl", name="Implementation", estimated_duration=180)
    graph.add_task(impl_task, parent_ids=["task_design"])
    
    unit_test_task = Task(id="task_unit_test", name="Unit tests", estimated_duration=40)
    integration_test_task = Task(id="task_integration_test", name="Integration tests", estimated_duration=60)
    
    graph.add_task(unit_test_task, parent_ids=["task_impl"])
    graph.add_task(integration_test_task, parent_ids=["task_impl", "task_test_plan"])
    
    final_review_task = Task(id="task_final_review", name="Final review", estimated_duration=30)
    graph.add_task(final_review_task, parent_ids=["task_unit_test", "task_integration_test"])
    
    return graph
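
For comparison, Python's standard library (3.9+) ships `graphlib.TopologicalSorter`, which can produce the same kind of parallel waves with built-in cycle detection. The sketch below is independent of the classes above; the task ids are borrowed from the code-development example, while the dependency sets and the helper name `parallel_waves` are assumptions chosen for illustration.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Maps each task id to the set of task ids it depends on (illustrative values).
dependencies = {
    "task_impl": {"task_design"},
    "task_unit_test": {"task_impl"},
    "task_integration_test": {"task_impl", "task_test_plan"},
    "task_final_review": {"task_unit_test", "task_integration_test"},
}


def parallel_waves(deps):
    """Return lists of task ids; every task in a wave can run concurrently."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only to make the output stable
        waves.append(ready)
        sorter.done(*ready)  # marking the wave done unlocks the next one
    return waves


for wave in parallel_waves(dependencies):
    print(wave)
# ['task_design', 'task_test_plan']
# ['task_impl']
# ['task_integration_test', 'task_unit_test']
# ['task_final_review']
```

A hand-written graph class is still useful when tasks carry status, retries, and durations, but the standard sorter is a convenient reference for checking the scheduling order.
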
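
3.3 Dependency Management Strategies

Dependency management is the core function of maintaining a Task Graph. Several common strategies follow.

Strategy 1: Strict dependency mode

In this mode, a task can start only after all of its prerequisite tasks have fully completed. It suits scenarios with strong data dependencies between tasks.

Code language: python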
class StrictDependencyManager:
    """严格依赖管理器"""
    
    def __init__(self, task_graph: TaskGraph):
        self.task_graph = task_graph
    
    def can_schedule(self, task_id: str) -> bool:
        """检查任务是否可以调度"""
        task = self.task_graph.tasks[task_id]
        
        if task.status != TaskStatus.PENDING:
            return False
        
        for dep_id in task.dependencies:
            dep_task = self.task_graph.tasks[dep_id]
            if dep_task.status != TaskStatus.COMPLETED:
                return False
        
        return True
    
    def get_blocking_tasks(self, task_id: str) -> List[Task]:
        """获取阻塞指定任务的所有任务"""
        task = self.task_graph.tasks[task_id]
        blocking = []
        
        for dep_id in task.dependencies:
            dep_task = self.task_graph.tasks[dep_id]
            if dep_task.status != TaskStatus.COMPLETED:
                blocking.append(dep_task)
        
        return blocking

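Strategy 2: Optimistic concurrency mode

In this mode, the system predicts task completion times and speculatively schedules tasks that are likely to become ready. When a prediction turns out to be wrong, the system has to roll back or reschedule. The manager below covers only prediction and speculative selection; rollback is not shown.

Code language: python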
class OptimisticConcurrencyManager:
    """乐观并发依赖管理器"""
    
    def __init__(self, task_graph: TaskGraph):
        self.task_graph = task_graph
        self.predictions = {}
        self.speculative_schedules = {}
    
    def predict_completion_time(self, task_id: str) -> float:
        """预测任务完成时间"""
        if task_id in self.predictions:
            return self.predictions[task_id]
        
        task = self.task_graph.tasks[task_id]
        base_time = task.estimated_duration
        
        if task.retry_count > 0:
            base_time *= (1 + 0.2 * task.retry_count)
        
        for dep_id in task.dependencies:
            dep_task = self.task_graph.tasks[dep_id]
            if dep_task.actual_duration > 0:
                base_time += dep_task.actual_duration
        
        self.predictions[task_id] = base_time
        return base_time
    
    def get_speculative_ready_tasks(self, current_time: float) -> List[Task]:
        """获取投机型就绪任务"""
        ready = []
        
        for task in self.task_graph.tasks.values():
            if task.status != TaskStatus.PENDING:
                continue
            
            max_dep_time = 0
            all_deps_will_complete = True
            
            for dep_id in task.dependencies:
                dep_task = self.task_graph.tasks[dep_id]
                if dep_task.status == TaskStatus.COMPLETED:
                    max_dep_time = max(max_dep_time, dep_task.actual_duration)
                else:
                    pred_time = self.predict_completion_time(dep_id)
                    if current_time + task.estimated_duration < pred_time:
                        all_deps_will_complete = False
                        break
                    max_dep_time = max(max_dep_time, pred_time)
            
            if all_deps_will_complete:
                ready.append(task)
        
        return ready

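Strategy 3: Time-window dependency mode

In this mode, each dependency is tied to a time window. Once a prerequisite task has completed, the dependent task may start only inside a window measured from that completion: no earlier than the window opens and no later than it closes. The prerequisite must still complete first; the window narrows when the task may start, not whether it has to wait.

Code language: python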
class TimeWindowDependencyManager:
    """时间窗口依赖管理器"""
    
    def __init__(self, task_graph: TaskGraph, default_window_minutes: int = 30):
        self.task_graph = task_graph
        self.default_window = default_window_minutes
        self.completion_windows = {}
        self.time_windows = {}
    
    def set_dependency_window(
        self, 
        task_id: str, 
        dependency_id: str, 
        window_start: int,
        window_end: int
    ):
        """设置依赖的时间窗口"""
        if task_id not in self.time_windows:
            self.time_windows[task_id] = {}
        
        self.time_windows[task_id][dependency_id] = (window_start, window_end)
    
    def can_start_at(
        self, 
        task_id: str, 
        current_time: float, 
        dependency_completion_times: Dict[str, float]
    ) -> bool:
        """Return True if the task may start at `current_time` (all times in minutes)."""
        task = self.task_graph.tasks[task_id]
        task_windows = self.time_windows.get(task_id, {})
        
        for dep_id in task.dependencies:
            dep_completion = dependency_completion_times.get(dep_id)
            if dep_completion is None:
                return False  # the dependency has not completed yet
            
            # Use the explicit window if one was set for this pair; otherwise fall back
            # to the default window, which opens as soon as the dependency completes.
            window_start, window_end = task_windows.get(dep_id, (0, self.default_window))
            time_since_dep = current_time - dep_completion
            
            if time_since_dep < window_start or time_since_dep > window_end:
                return False
        
        return True
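
A worked example may make the window rule concrete. The standalone function below is a simplified sketch of the same check, detached from the manager class; the names `within_windows`, `build`, and `lint` and all numbers are invented for the example.

```python
from typing import Dict, Optional, Tuple

Window = Tuple[float, float]  # (earliest, latest) minutes after the dependency completes


def within_windows(
    now: float,
    completed_at: Dict[str, Optional[float]],
    windows: Dict[str, Window],
    default_window: Window = (0.0, 30.0),
) -> bool:
    """True when every dependency has finished and `now` falls inside each window."""
    for dep_id, finished in completed_at.items():
        if finished is None:  # the dependency has not completed yet
            return False
        earliest, latest = windows.get(dep_id, default_window)
        elapsed = now - finished
        if not earliest <= elapsed <= latest:
            return False
    return True


completed = {"build": 100.0, "lint": 110.0}
windows = {"build": (5.0, 60.0)}  # "lint" falls back to the default window

print(within_windows(112.0, completed, windows))  # True:  build 12 min, lint 2 min
print(within_windows(150.0, completed, windows))  # False: lint 40 min, past the default 30
print(within_windows(103.0, completed, windows))  # False: build 3 min, window not open yet
```

An upper bound of this kind is useful when a result goes stale, for example a review that should be acted on before the code changes again.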

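4 Communication Protocols: Blackboard, Message Passing, Shared Memory

4.1 Overview of Communication Protocols

One of the core challenges of a Multi-Agent system is communication between Agents. Different protocols suit different scenarios, and the choice of protocol has a major impact on system performance.

4.2 The Blackboard Pattern

The Blackboard pattern is a classic AI collaboration pattern. Its central idea is that Agents communicate indirectly by reading from and writing to a shared "blackboard".

Core characteristics of the Blackboard

  1. Central data store: all Agents share one centralized knowledge base
  2. Asynchronous communication: Agents do not interact directly; they collaborate by reading and writing the blackboard
  3. Loose coupling: Agents do not need to know that the others exist
  4. Dynamic organization: knowledge can be added, modified, or deleted at any time

Code language: python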
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Callable
from datetime import datetime
from enum import Enum
import threading
import uuid
import json


class BlackboardEventType(Enum):
    WRITE = "write"
    UPDATE = "update"
    DELETE = "delete"
    READ = "read"
    SUBSCRIBE = "subscribe"


@dataclass
class BlackboardEntry:
    """黑板条目"""
    id: str
    key: str
    value: Any
    timestamp: datetime
    author: str
    version: int = 1
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class BlackboardEvent:
    """黑板事件"""
    event_type: BlackboardEventType
    entry: Optional[BlackboardEntry]
    key: str
    agent_id: str
    timestamp: datetime


class Blackboard:
    """Blackboard 实现 - Multi-Agent 共享知识空间"""
    
    def __init__(self, name: str = "default"):
        self.name = name
        self._storage: Dict[str, BlackboardEntry] = {}
        self._version_history: Dict[str, List[BlackboardEntry]] = {}
        self._lock = threading.RLock()
        self._observers: Dict[str, List[Callable]] = {}
        self._key_subscribers: Dict[str, List[str]] = {}
        self._event_history: List[BlackboardEvent] = []
    
    def write(self, key: str, value: Any, agent_id: str, metadata: Dict = None) -> BlackboardEntry:
        """写入黑板"""
        with self._lock:
            entry = BlackboardEntry(
                id=str(uuid.uuid4()),
                key=key,
                value=value,
                timestamp=datetime.now(),
                author=agent_id,
                version=1,
                metadata=metadata or {}
            )
            
            if key in self._storage:
                old_entry = self._storage[key]
                entry.version = old_entry.version + 1
                
                if key not in self._version_history:
                    self._version_history[key] = []
                self._version_history[key].append(old_entry)
            
            self._storage[key] = entry
            self._event_history.append(
                BlackboardEvent(
                    event_type=BlackboardEventType.WRITE,
                    entry=entry,
                    key=key,
                    agent_id=agent_id,
                    timestamp=datetime.now()
                )
            )
            
            return entry
    
    def read(self, key: str, agent_id: str) -> Optional[Any]:
        """读取黑板内容"""
        with self._lock:
            if key not in self._storage:
                return None
            
            entry = self._storage[key]
            
            self._event_history.append(
                BlackboardEvent(
                    event_type=BlackboardEventType.READ,
                    entry=entry,
                    key=key,
                    agent_id=agent_id,
                    timestamp=datetime.now()
                )
            )
            
            return entry.value
    
    def read_entry(self, key: str) -> Optional[BlackboardEntry]:
        """读取完整条目"""
        with self._lock:
            return self._storage.get(key)
    
    def update(self, key: str, value: Any, agent_id: str, metadata: Dict = None) -> Optional[BlackboardEntry]:
        """更新黑板内容"""
        with self._lock:
            if key not in self._storage:
                return None
            
            old_entry = self._storage[key]
            
            new_entry = BlackboardEntry(
                id=str(uuid.uuid4()),
                key=key,
                value=value,
                timestamp=datetime.now(),
                author=agent_id,
                version=old_entry.version + 1,
                metadata=metadata or {}
            )
            
            self._storage[key] = new_entry
            
            if key not in self._version_history:
                self._version_history[key] = []
            self._version_history[key].append(old_entry)
            
            self._event_history.append(
                BlackboardEvent(
                    event_type=BlackboardEventType.UPDATE,
                    entry=new_entry,
                    key=key,
                    agent_id=agent_id,
                    timestamp=datetime.now()
                )
            )
            
            return new_entry
    
    def delete(self, key: str, agent_id: str) -> bool:
        """删除黑板条目"""
        with self._lock:
            if key not in self._storage:
                return False
            
            entry = self._storage[key]
            del self._storage[key]
            
            self._event_history.append(
                BlackboardEvent(
                    event_type=BlackboardEventType.DELETE,
                    entry=entry,
                    key=key,
                    agent_id=agent_id,
                    timestamp=datetime.now()
                )
            )
            
            return True
    
    def subscribe(self, key: str, agent_id: str):
        """订阅 key 的变化"""
        with self._lock:
            if key not in self._key_subscribers:
                self._key_subscribers[key] = []
            if agent_id not in self._key_subscribers[key]:
                self._key_subscribers[key].append(agent_id)
    
    def get_history(self, key: str, limit: int = 10) -> List[BlackboardEntry]:
        """获取 key 的历史版本"""
        with self._lock:
            history = self._version_history.get(key, [])
            return history[-limit:]
    
    def get_stats(self) -> Dict[str, Any]:
        """获取黑板统计信息"""
        with self._lock:
            return {
                'total_entries': len(self._storage),
                'total_events': len(self._event_history),
                'subscribed_keys': len(self._key_subscribers)
            }


class AgentWithBlackboard:
    """使用 Blackboard 的 Agent 基类"""
    
    def __init__(self, agent_id: str, blackboard: Blackboard):
        self.agent_id = agent_id
        self.blackboard = blackboard
    
    def write_task_result(self, task_id: str, result: Any):
        """写入任务结果到黑板"""
        key = f"task_result:{task_id}"
        self.blackboard.write(key, result, self.agent_id)
    
    def read_task_result(self, task_id: str) -> Any:
        """从黑板读取任务结果"""
        key = f"task_result:{task_id}"
        return self.blackboard.read(key, self.agent_id)
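
The `Blackboard` class above records subscribers per key but, as listed, does not notify them when a value changes. One way notification could work is sketched below under that observation; `NotifyingBoard` and its callback signature are hypothetical and not part of the article's code.

```python
import threading
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Dict, List

Callback = Callable[[str, Any, str], None]  # (key, value, author_id)


class NotifyingBoard:
    """Tiny key-value board that calls subscribers after each write."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}
        self._callbacks: DefaultDict[str, List[Callback]] = defaultdict(list)
        self._lock = threading.RLock()

    def subscribe(self, key: str, callback: Callback) -> None:
        with self._lock:
            self._callbacks[key].append(callback)

    def write(self, key: str, value: Any, author_id: str) -> None:
        with self._lock:
            self._data[key] = value
            callbacks = list(self._callbacks.get(key, []))
        # Callbacks run outside the lock so a slow subscriber cannot block writers.
        for callback in callbacks:
            callback(key, value, author_id)


board = NotifyingBoard()
seen: List[str] = []
board.subscribe("task_result:t1", lambda k, v, a: seen.append(f"{a} wrote to {k}"))
board.write("task_result:t1", "def add(a, b): return a + b", "coder-1")
board.write("task_result:t2", "ignored", "coder-2")  # nobody subscribed to this key
print(seen)  # ['coder-1 wrote to task_result:t1']
```

Copying the callback list before releasing the lock is a deliberate choice: it keeps the critical section short at the cost of possibly notifying a subscriber that was removed a moment earlier.
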
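
4.3 The Message Passing Pattern

Message Passing is one of the most commonly used communication methods in Multi-Agent systems.

Core characteristics of Message Passing

  1. Direct communication: Agents exchange messages with each other directly
  2. Explicit protocol: messages have a well-defined format and semantics
  3. Synchronous or asynchronous: either style can be chosen as needed
  4. Point-to-point or broadcast: supports one-to-one, one-to-many, many-to-one, and other communication patterns

Code language: python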
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Callable
from datetime import datetime
from enum import Enum
import threading
import uuid
import asyncio
from collections import defaultdict


class MessageType(Enum):
    REQUEST = "request"
    RESPONSE = "response"
    NOTIFICATION = "notification"
    BROADCAST = "broadcast"
    HEARTBEAT = "heartbeat"


class MessagePriority(Enum):
    LOW = 1
    NORMAL = 2
    HIGH = 3
    CRITICAL = 4


@dataclass
class Message:
    """消息"""
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    msg_type: MessageType = MessageType.REQUEST
    sender: str = ""
    receiver: str = ""
    subject: str = ""
    content: Any = None
    timestamp: datetime = field(default_factory=datetime.now)
    priority: MessagePriority = MessagePriority.NORMAL
    correlation_id: str = ""
    headers: Dict[str, str] = field(default_factory=dict)
    
    def is_broadcast(self) -> bool:
        return self.receiver == ""
    
    def is_direct(self) -> bool:
        return self.receiver != ""


class MessageBus:
    """消息总线"""
    
    def __init__(self):
        self._queues: Dict[str, asyncio.Queue] = {}
        self._handlers: Dict[str, List[Callable]] = {}
        self._lock = threading.RLock()
        self._subscribers: Dict[str, List[str]] = defaultdict(list)
        self._message_history: List[Message] = []
        self._max_history = 10000
        
        self._stats = {
            'total_messages': 0,
            'total_broadcasts': 0,
            'total_direct': 0
        }
    
    def register_agent(self, agent_id: str, queue_size: int = 100):
        """注册 Agent"""
        with self._lock:
            if agent_id not in self._queues:
                self._queues[agent_id] = asyncio.Queue(maxsize=queue_size)
    
    def unregister_agent(self, agent_id: str):
        """注销 Agent"""
        with self._lock:
            if agent_id in self._queues:
                del self._queues[agent_id]
    
    async def send(self, message: Message):
        """发送消息"""
        with self._lock:
            self._message_history.append(message)
            if len(self._message_history) > self._max_history:
                self._message_history = self._message_history[-self._max_history:]
            
            self._stats['total_messages'] += 1
            
            if message.is_broadcast():
                self._stats['total_broadcasts'] += 1
            else:
                self._stats['total_direct'] += 1
        
        if message.is_broadcast():
            await self._broadcast(message)
        else:
            await self._deliver_direct(message)
    
    async def _broadcast(self, message: Message):
        """广播消息"""
        topic = message.subject
        
        with self._lock:
            subscribers = list(self._subscribers.get(topic, []))
        
        if not subscribers:
            with self._lock:
                subscribers = list(self._queues.keys())
        
        tasks = []
        for agent_id in subscribers:
            if agent_id != message.sender:
                msg_copy = Message(
                    id=str(uuid.uuid4()),
                    msg_type=message.msg_type,
                    sender=message.sender,
                    receiver=agent_id,
                    subject=message.subject,
                    content=message.content,
                    timestamp=datetime.now(),
                    priority=message.priority,
                    correlation_id=message.id,
                    headers=message.headers.copy()
                )
                tasks.append(self._enqueue(agent_id, msg_copy))
        
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
    
    async def _deliver_direct(self, message: Message):
        """直接发送消息"""
        if message.receiver in self._queues:
            await self._enqueue(message.receiver, message)
    
    async def _enqueue(self, agent_id: str, message: Message):
        """Put a message on the Agent's queue, waiting up to 5 seconds if it is full."""
        try:
            queue = self._queues.get(agent_id)
            if queue is not None:
                await asyncio.wait_for(queue.put(message), timeout=5.0)
        except asyncio.TimeoutError:
            # `await queue.put()` waits while the queue is full instead of raising
            # asyncio.QueueFull, so a full queue shows up here as a timeout.
            print(f"Queue full for agent {agent_id}")
        except Exception as e:
            print(f"Error enqueuing message: {e}")
    
    async def receive(self, agent_id: str, timeout: Optional[float] = None) -> Optional[Message]:
        """Receive the next message for an Agent; return None on timeout or unknown Agent."""
        if agent_id not in self._queues:
            return None
        
        try:
            if timeout is not None:
                return await asyncio.wait_for(
                    self._queues[agent_id].get(), 
                    timeout=timeout
                )
            else:
                return await self._queues[agent_id].get()
        except asyncio.TimeoutError:
            return None
    
    def get_stats(self) -> Dict:
        """获取统计信息"""
        with self._lock:
            stats = self._stats.copy()
            stats['registered_agents'] = len(self._queues)
            return stats
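
The `correlation_id` field is what lets a sender match a reply to its request. The following sketch shows that round trip with plain `asyncio.Queue` objects rather than the `MessageBus` above; `Envelope`, `reviewer`, and the approve/reject rule are placeholders invented for the illustration.

```python
import asyncio
import uuid
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Envelope:
    """Hypothetical minimal message: only the fields needed for correlation."""
    sender: str
    content: Any
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    correlation_id: str = ""


async def reviewer(inbox: asyncio.Queue, outbox: asyncio.Queue) -> None:
    request = await inbox.get()
    verdict = "approve" if "return" in request.content else "reject"
    # Echo the request id so the caller can match the reply to its request.
    await outbox.put(Envelope("reviewer", verdict, correlation_id=request.id))


async def main() -> str:
    to_reviewer: asyncio.Queue = asyncio.Queue()
    to_coder: asyncio.Queue = asyncio.Queue()
    worker = asyncio.create_task(reviewer(to_reviewer, to_coder))

    request = Envelope("coder", "def add(a, b): return a + b")
    await to_reviewer.put(request)
    reply = await asyncio.wait_for(to_coder.get(), timeout=1.0)
    await worker
    assert reply.correlation_id == request.id  # the reply belongs to this request
    return reply.content


result = asyncio.run(main())
print(result)  # approve
```

With several requests in flight, the sender would keep a dictionary from request id to a pending future and resolve the matching one when a reply arrives.
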
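
4.4 The Shared Memory Pattern

Shared Memory is the simplest and most direct way to communicate.

Core characteristics of Shared Memory

  1. Fast access: no serialization or deserialization is needed, so overhead is minimal
  2. Tight coupling: Agents are strongly coupled to one another
  3. Locking is required: a synchronization mechanism is needed to prevent race conditions
  4. Suited to single-machine deployment: typically used for communication between Agents in the same process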
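
Point 3 can be illustrated with a counter shared by several threads. This is a small sketch with invented names (`shared`, `report_done`); each thread stands in for an Agent reporting finished tasks.

```python
import threading
from typing import Dict

shared: Dict[str, int] = {"completed_tasks": 0}
lock = threading.Lock()


def report_done(times: int) -> None:
    for _ in range(times):
        # The read-modify-write below is not atomic on its own; the lock makes it so.
        with lock:
            shared["completed_tasks"] += 1


threads = [threading.Thread(target=report_done, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(shared["completed_tasks"])  # 40000
```

Without the lock, two threads can read the same old value and one increment is lost, so the final count may fall short of 40000.
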
Code language: python
import threading
from typing import Any, Callable, Dict, List, Optional
from datetime import datetime


class SharedMemoryStore:
    """In-process key-value store shared by Agents, with one lock per key."""
    
    def __init__(self):
        self._storage: Dict[str, Any] = {}
        self._locks: Dict[str, threading.Lock] = {}
        self._thread_lock = threading.Lock()  # guards the _locks registry itself
    
    def _get_lock(self, key: str) -> threading.Lock:
        """Return the lock for a key, creating it on first use."""
        with self._thread_lock:
            if key not in self._locks:
                self._locks[key] = threading.Lock()
            return self._locks[key]
    
    def set(self, key: str, value: Any):
        """Set a value."""
        lock = self._get_lock(key)
        
        with lock:
            self._storage[key] = value
    
    def get(self, key: str, default: Any = None) -> Any:
        """Get a value, or default if the key is missing."""
        lock = self._get_lock(key)
        
        with lock:
            return self._storage.get(key, default)
    
    def update(self, key: str, update_func: Callable[[Any], Any]) -> Any:
        """Atomic read-modify-write; update_func runs while the key's lock is held."""
        lock = self._get_lock(key)
        
        # The lock is not reentrant: update_func must not touch the same key in this store
        with lock:
            old_value = self._storage.get(key)
            new_value = update_func(old_value)
            self._storage[key] = new_value
            return new_value
    
    def delete(self, key: str):
        """Delete a key if it exists."""
        lock = self._get_lock(key)
        
        with lock:
            if key in self._storage:
                del self._storage[key]
    
    def exists(self, key: str) -> bool:
        """Check whether a key exists."""
        lock = self._get_lock(key)
        
        with lock:
            return key in self._storage


class AtomicCounter:
    """Thread-safe counter."""
    
    def __init__(self, initial_value: int = 0):
        self._value = initial_value
        self._lock = threading.Lock()
    
    def increment(self, delta: int = 1) -> int:
        """Add delta and return the new value."""
        with self._lock:
            self._value += delta
            return self._value
    
    def decrement(self, delta: int = 1) -> int:
        """Subtract delta and return the new value."""
        return self.increment(-delta)
    
    def get(self) -> int:
        """Return the current value."""
        with self._lock:
            return self._value


class AgentWithSharedMemory:
    """Agent that exchanges data through a shared store."""
    
    def __init__(self, agent_id: str, shared_storage: SharedMemoryStore):
        self.agent_id = agent_id
        self.shared_storage = shared_storage
    
    def write_code(self, task_id: str, code: str):
        """Write generated code to shared memory."""
        key = f"code:{task_id}"
        self.shared_storage.set(key, {
            'code': code,
            'author': self.agent_id,
            'timestamp': datetime.now().isoformat()
        })
    
    def read_code(self, task_id: str) -> Optional[Dict]:
        """Read code from shared memory."""
        key = f"code:{task_id}"
        return self.shared_storage.get(key)
    
    def update_progress(self, task_id: str, progress: float):
        """Publish the progress of a task."""
        key = f"progress:{task_id}"
        self.shared_storage.set(key, {
            'progress': progress,
            'agent': self.agent_id,
            'timestamp': datetime.now().isoformat()
        })
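
Why the store exposes an `update` method in addition to `get` and `set` is easiest to see with several writers: a separate `get` followed by `set` leaves a window in which another thread can write, whereas a single locked read-modify-write does not. The following is a minimal sketch under that assumption. `TinyStore` and `run_workers` are hypothetical names for illustration and use one global lock rather than the per-key locks shown above.

```python
import threading
from typing import Any, Callable, Dict


class TinyStore:
    """Hypothetical minimal store: one lock guards the whole dictionary."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}
        self._lock = threading.Lock()

    def get(self, key: str, default: Any = None) -> Any:
        with self._lock:
            return self._data.get(key, default)

    def update(self, key: str, fn: Callable[[Any], Any]) -> Any:
        # Read, compute and write happen under a single lock acquisition
        with self._lock:
            new_value = fn(self._data.get(key))
            self._data[key] = new_value
            return new_value


def run_workers(store: TinyStore, workers: int = 8, increments: int = 1000) -> int:
    """Start several threads that all bump the same counter through update()."""

    def work() -> None:
        for _ in range(increments):
            store.update("tasks_done", lambda old: (old or 0) + 1)

    threads = [threading.Thread(target=work) for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return store.get("tasks_done")


total = run_workers(TinyStore())
print(total)  # 8 workers x 1000 increments
```

Because every increment goes through `update`, no increment is lost regardless of how the threads interleave.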
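4.5 Comparing and Choosing a Communication Protocol

| Property | Blackboard | Message Passing | Shared Memory |
| --- | --- | --- | --- |
| Coupling | Loose | Moderate | Tight |
| Communication latency | Medium | Varies (high or low) | Lowest |
| Synchronization | Asynchronous | Synchronous or asynchronous | Synchronous |
| Typical use | Knowledge sharing, collaborative editing | Task assignment, result reporting | High-speed data exchange |
| Message persistence | Optional | Usually not persisted | Not persisted |

Scalability

Implementation complexity: Medium

Fault tolerance: Medium
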
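5 Collaboration Strategies: Sequential, Parallel, and Hierarchical Execution

5.1 Overview of Collaboration Strategies

A collaboration strategy determines how multiple Agents work together on a complex task. Different strategies suit different scenarios.

5.2 Sequential Execution

Sequential execution is the simplest collaboration strategy: tasks run one at a time in a strict order.

When to use it

  1. Tasks have strong dependencies on one another
  2. Resources are limited and cannot support several Agents running at once
  3. The workflow is simple and needs no elaborate coordination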
Code language: python
class SequentialCoordinator:
    """Coordinator that runs tasks one at a time."""
    
    def __init__(self, agents: Dict[str, Any], task_graph: TaskGraph):
        self.agents = agents
        self.task_graph = task_graph
        self.current_task_id: Optional[str] = None
        self.execution_log: List[Dict] = []
    
    async def execute(self) -> bool:
        """Run the plan sequentially, stopping at the first failed task."""
        print(f"Starting sequential execution with {len(self.agents)} agents")
        
        tasks_by_depth = self._group_tasks_by_depth()
        
        for depth, tasks in tasks_by_depth.items():
            print(f"\n=== Depth {depth} ===")
            
            for task in tasks:
                success = await self._execute_task(task)
                self.execution_log.append({
                    'task_id': task.id,
                    'depth': depth,
                    'success': success,
                    'timestamp': datetime.now().isoformat()
                })
                
                if not success:
                    return False
        
        return True
    
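    def _group_tasks_by_depth(self) -> Dict[int, List[Task]]:
        """Group tasks by their depth in the task graph, shallowest first."""
        depths: Dict[int, List[Task]] = {}
        for task in self.task_graph.tasks.values():
            depth = task.get_depth(self.task_graph)
            depths.setdefault(depth, []).append(task)
        
        # Sort by depth so dependencies always run before the tasks that need them
        return dict(sorted(depths.items()))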
    
    async def _execute_task(self, task: Task) -> bool:
        """Run a single task on the Agent chosen for it."""
        print(f"Executing task: {task.id} - {task.name}")
        
        agent_id = self._get_agent_for_task(task)
        agent = self.agents.get(agent_id)
        
        if not agent:
            task.status = TaskStatus.FAILED  # no Agent can take this task
            return False
        
        dep_outputs = {}
        for dep_id in task.dependencies:
            dep_task = self.task_graph.tasks[dep_id]
            dep_outputs[dep_id] = dep_task.output_data
        
        task.status = TaskStatus.RUNNING
        self.current_task_id = task.id
        
        try:
            output = await agent.execute(task.input_data, dep_outputs)
            task.output_data = output
            task.status = TaskStatus.COMPLETED
            return True
        except Exception as e:
            print(f"Task {task.id} failed: {e}")
            task.status = TaskStatus.FAILED
            return False
        finally:
            self.current_task_id = None
    
    def _get_agent_for_task(self, task: Task) -> Optional[str]:
        """Decide which Agent should run the task."""
        if task.assigned_agent:
            return task.assigned_agent
        
        task_type = task.metadata.get('type', '')
        
        if 'code' in task_type or 'implement' in task_type:
            return 'coder'
        elif 'review' in task_type or 'check' in task_type:
            return 'reviewer'
        elif 'test' in task_type:
            return 'tester'
        elif 'plan' in task_type or 'design' in task_type:
            return 'planner'
        
        return None
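
A sequential schedule is only valid if every task runs after all of its dependencies. As a cross-check on the depth-based grouping above, the same ordering can be derived with the standard library's `graphlib.TopologicalSorter` (Python 3.9+). The task names and the `dependencies` mapping below are hypothetical and stand in for the article's `TaskGraph`.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical task graph: each key maps to the set of tasks it depends on
dependencies = {
    "plan": set(),
    "implement": {"plan"},
    "review": {"implement"},
    "test": {"implement"},
    "merge": {"review", "test"},
}

# static_order() yields every task after all of its predecessors
order = list(TopologicalSorter(dependencies).static_order())
position = {task_id: index for index, task_id in enumerate(order)}

for task_id, deps in dependencies.items():
    for dep in deps:
        # Every dependency appears earlier in the serial schedule
        assert position[dep] < position[task_id]

print(order)
```

`TopologicalSorter` raises `graphlib.CycleError` when the graph contains a cycle, which makes it a convenient validation step before any coordinator starts executing.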
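5.3 Parallel Execution

Parallel execution lets several Agents work at the same time, which shortens total completion time when tasks do not depend on one another.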

Code language: python
class ParallelCoordinator:
    """Coordinator that runs independent tasks concurrently."""
    
    def __init__(
        self, 
        agents: Dict[str, Any], 
        task_graph: TaskGraph,
        max_parallel_tasks: int = 4
    ):
        self.agents = agents
        self.task_graph = task_graph
        self.max_parallel_tasks = max_parallel_tasks
        self.execution_log: List[Dict] = []
        self._running_tasks: Dict[str, asyncio.Task] = {}
        self._semaphore = asyncio.Semaphore(max_parallel_tasks)
    
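    async def execute(self) -> bool:
        """Run the task graph, starting each task once its dependencies are done."""
        print(f"Starting parallel execution with max {self.max_parallel_tasks} concurrent tasks")
        
        scheduled = set()  # ids already handed to asyncio, so no task is started twice
        pending = set()    # strong references keep the asyncio tasks alive until they finish
        
        while True:
            for task in self.task_graph.get_executable_tasks():
                if task.id not in scheduled:
                    scheduled.add(task.id)
                    pending.add(
                        asyncio.create_task(self._execute_task_with_semaphore(task))
                    )
            
            if not pending:
                break
            
            # Wake up as soon as any task finishes, then look for newly unblocked tasks
            _, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        
        return not any(
            task.status == TaskStatus.FAILED
            for task in self.task_graph.tasks.values()
        )
    
    async def _execute_task_with_semaphore(self, task: Task):
        """Run one task while holding the semaphore that caps concurrency."""
        async with self._semaphore:
            await self._execute_task(task)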
    
    async def _execute_task(self, task: Task) -> bool:
        """Run a single task and record the outcome in the execution log."""
        print(f"Starting task: {task.id} - {task.name}")
        
        agent_id = self._get_agent_for_task(task)
        agent = self.agents.get(agent_id)
        
        if not agent:
            task.status = TaskStatus.FAILED
            return False
        
        task.status = TaskStatus.RUNNING
        self._running_tasks[task.id] = asyncio.current_task()
        
        dep_outputs = {}
        for dep_id in task.dependencies:
            dep_task = self.task_graph.tasks[dep_id]
            dep_outputs[dep_id] = dep_task.output_data
        
        try:
            output = await agent.execute(task.input_data, dep_outputs)
            task.output_data = output
            task.status = TaskStatus.COMPLETED
            
            self.execution_log.append({
                'task_id': task.id,
                'agent_id': agent_id,
                'status': 'completed',
                'timestamp': datetime.now().isoformat()
            })
            
            return True
            
        except Exception as e:
            task.status = TaskStatus.FAILED
            
            self.execution_log.append({
                'task_id': task.id,
                'agent_id': agent_id,
                'status': 'failed',
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            })
            
            return False
        finally:
            if task.id in self._running_tasks:
                del self._running_tasks[task.id]
    
    def _get_agent_for_task(self, task: Task) -> Optional[str]:
        """Decide which Agent should run the task."""
        if task.assigned_agent:
            return task.assigned_agent
        
        task_type = task.metadata.get('type', '')
        
        if 'code' in task_type or 'implement' in task_type:
            return 'coder'
        elif 'review' in task_type or 'check' in task_type:
            return 'reviewer'
        elif 'test' in task_type:
            return 'tester'
        elif 'plan' in task_type or 'design' in task_type:
            return 'planner'
        
        return None


class HierarchicalCoordinator:
    """Coordinator that runs the task graph level by level."""
    
    def __init__(self, task_graph: TaskGraph):
        self.task_graph = task_graph
        self.levels: Dict[int, List[Task]] = {}
        self.level_coordinators: Dict[int, ParallelCoordinator] = {}
        self.execution_results: Dict[str, Any] = {}
    
    def build_hierarchy(self) -> int:
        """Group tasks into levels by depth and return the maximum depth."""
        max_depth = 0
        
        for task in self.task_graph.tasks.values():
            depth = task.get_depth(self.task_graph)
            max_depth = max(max_depth, depth)
            
            if depth not in self.levels:
                self.levels[depth] = []
            self.levels[depth].append(task)
        
        return max_depth
    
    async def execute(self, agents: Dict[str, Any], max_parallel: int = 4) -> bool:
        """Run each level in order; tasks within a level run in parallel."""
        max_depth = self.build_hierarchy()
        print(f"Hierarchical execution with {max_depth + 1} levels")
        
        for level in range(max_depth + 1):
            if level not in self.levels:
                continue
            
            print(f"\n=== Level {level} ===")
            tasks = self.levels[level]
            
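            coordinator = ParallelCoordinator(
                agents, 
                self.task_graph, 
                max_parallel_tasks=max_parallel
            )
            self.level_coordinators[level] = coordinator
            
            # Run only this level's tasks; coordinator.execute() would walk the whole graph
            await asyncio.gather(
                *(coordinator._execute_task_with_semaphore(task) for task in tasks)
            )
            
            if any(task.status == TaskStatus.FAILED for task in tasks):
                return False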
        
        return True
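
The level-by-level idea can be tried in isolation from the article's `TaskGraph` and Agent classes. The sketch below is a simplified illustration under stated assumptions: tasks are plain string ids, `group_by_depth` and `run_levels` are hypothetical helper names, the graph is acyclic, and `asyncio.sleep` stands in for the real `agent.execute(...)` call.

```python
import asyncio
from typing import Dict, List, Set, Tuple


def group_by_depth(dependencies: Dict[str, Set[str]]) -> List[List[str]]:
    """Depth is the length of the longest dependency chain below a task."""
    if not dependencies:
        return []
    depth: Dict[str, int] = {}

    def visit(task_id: str) -> int:
        if task_id not in depth:
            deps = dependencies[task_id]
            # Root tasks have no dependencies and land on depth 0
            depth[task_id] = 1 + max((visit(d) for d in deps), default=-1)
        return depth[task_id]

    for task_id in dependencies:
        visit(task_id)
    levels: List[List[str]] = [[] for _ in range(max(depth.values()) + 1)]
    for task_id, d in depth.items():
        levels[d].append(task_id)
    return levels


async def run_levels(dependencies: Dict[str, Set[str]], max_parallel: int = 2) -> Tuple[List[str], int]:
    semaphore = asyncio.Semaphore(max_parallel)
    finished: List[str] = []
    running = 0
    peak = 0

    async def run(task_id: str) -> None:
        nonlocal running, peak
        async with semaphore:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # placeholder for the real agent call
            running -= 1
            finished.append(task_id)

    for level in group_by_depth(dependencies):
        # A level starts only after the previous level has fully finished
        await asyncio.gather(*(run(task_id) for task_id in level))
    return finished, peak


deps = {
    "plan": set(),
    "api": {"plan"},
    "ui": {"plan"},
    "docs": {"plan"},
    "release": {"api", "ui", "docs"},
}
levels = group_by_depth(deps)
finished, peak = asyncio.run(run_levels(deps, max_parallel=2))
print(levels, finished, peak)
```

The barrier between levels is simple to reason about but can leave capacity idle: a fast task must wait for the slowest task on its level, which is the trade-off against fully dependency-driven scheduling.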
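5.4 Hybrid Collaboration Strategy

In practice, a purely sequential or purely parallel strategy is rarely the best choice. A hybrid strategy combines the strengths of both.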

Code language: python
class HybridCoordinator:
    """Coordinator that mixes sequential and parallel execution."""
    
    def __init__(
        self, 
        agents: Dict[str, Any], 
        task_graph: TaskGraph,
        strategy_config: Optional[Dict[str, Any]] = None
    ):
        self.agents = agents
        self.task_graph = task_graph
        self.config = strategy_config or self._default_config()
        
        self.sequential_threshold = self.config.get('sequential_threshold', 0.3)
        self.max_parallel = self.config.get('max_parallel', 4)
        
        self.task_strategies: Dict[str, str] = {}
        self._analyze_strategies()
    
    def _default_config(self) -> Dict[str, Any]:
        """Return the default configuration."""
        return {
            'sequential_threshold': 0.3,
            'parallel_threshold': 0.7,
            'max_parallel': 4,
            'enable_hierarchy': True,
            'hierarchy_depth_threshold': 3
        }
    
    def _analyze_strategies(self):
        """Walk the task graph and pick an execution strategy for each task."""
        for task in self.task_graph.tasks.values():
            if len(task.dependencies) > 3:
                # Heavily dependent tasks run one at a time
                self.task_strategies[task.id] = 'sequential'
            else:
                # Root tasks and lightly coupled tasks can run concurrently
                self.task_strategies[task.id] = 'parallel'
    
    async def execute(self) -> bool:
        """Run the graph level by level, choosing a strategy per level."""
        print("Starting hybrid execution")
        
        levels = self._group_by_depth()
        
        for depth, tasks in levels.items():
            print(f"\n=== Processing depth {depth} ===")
            
            has_strong_deps = any(
                self.task_strategies.get(t.id) == 'sequential' 
                for t in tasks
            )
            
            if has_strong_deps:
                print(f"Using sequential execution for {len(tasks)} tasks")
                success = await self._execute_sequential(tasks)
            else:
                print(f"Using parallel execution for {len(tasks)} tasks")
                success = await self._execute_parallel(tasks)
            
            if not success:
                return False
        
        return True
    
    def _group_by_depth(self) -> Dict[int, List[Task]]:
        """Group tasks by depth, shallowest first."""
        levels = {}
        for task in self.task_graph.tasks.values():
            depth = task.get_depth(self.task_graph)
            if depth not in levels:
                levels[depth] = []
            levels[depth].append(task)
        return dict(sorted(levels.items()))
    
    async def _execute_sequential(self, tasks: List[Task]) -> bool:
        """Run a group of tasks one at a time, stopping at the first failure."""
        for task in tasks:
            if not await self._execute_single(task):
                return False
        
        return True
    
    async def _execute_parallel(self, tasks: List[Task]) -> bool:
        """Run a group of tasks concurrently, capped by max_parallel."""
        semaphore = asyncio.Semaphore(self.max_parallel)
        
        async def execute_with_semaphore(task: Task):
            async with semaphore:
                return await self._execute_single(task)
        
        results = await asyncio.gather(
            *[execute_with_semaphore(t) for t in tasks],
            return_exceptions=True
        )
        
        return all(r is True or r is None for r in results)
    
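    async def _execute_single(self, task: Task) -> bool:
        """Run one task on its Agent, passing in the outputs of its dependencies."""
        agent_id = self._get_agent_for_task(task)
        agent = self.agents.get(agent_id)
        
        if not agent:
            task.status = TaskStatus.FAILED
            return False
        
        # Collect upstream results; passing {} would hide them from the Agent
        dep_outputs = {
            dep_id: self.task_graph.tasks[dep_id].output_data
            for dep_id in task.dependencies
        }
        
        task.status = TaskStatus.RUNNING
        
        try:
            output = await agent.execute(task.input_data, dep_outputs)
            task.output_data = output
            task.status = TaskStatus.COMPLETED
            return True
        except Exception:
            task.status = TaskStatus.FAILED
            return False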
    
    def _get_agent_for_task(self, task: Task) -> Optional[str]:
        """Decide which Agent should run the task."""
        if task.assigned_agent:
            return task.assigned_agent
        
        task_type = task.metadata.get('type', '')
        
        if 'code' in task_type or 'implement' in task_type:
            return 'coder'
        elif 'review' in task_type or 'check' in task_type:
            return 'reviewer'
        elif 'test' in task_type:
            return 'tester'
        elif 'plan' in task_type or 'design' in task_type:
            return 'planner'
        
        return None

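6 Conflict Resolution: Voting, Arbitration, and Priority Mechanisms

6.1 Types and Sources of Conflict

Conflicts are unavoidable in a Multi-Agent system.

Main types of conflict

  1. Resource conflict: several Agents compete for the same resource
  2. Goal conflict: the goals of different Agents contradict one another
  3. Solution conflict: Agents propose different solutions to the same problem
  4. Priority conflict: Agents disagree on the order in which tasks should run
  5. Data conflict: Agents hold inconsistent views of the data state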
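
Of the five types, resource conflicts are the most mechanical to detect: group the claims by resource and flag every resource with more than one claimant. The sketch below shows one way to do that. The `(agent_id, resource)` claim format and the function name `find_resource_conflicts` are assumptions made for illustration.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def find_resource_conflicts(claims: List[Tuple[str, str]]) -> Dict[str, List[str]]:
    """Return {resource: [agents]} for every resource claimed by more than one Agent."""
    claimants: Dict[str, List[str]] = defaultdict(list)
    for agent_id, resource in claims:
        # Repeated claims by the same Agent are not a conflict
        if agent_id not in claimants[resource]:
            claimants[resource].append(agent_id)
    return {res: agents for res, agents in claimants.items() if len(agents) > 1}


claims = [
    ("coder", "src/app.py"),
    ("reviewer", "src/app.py"),
    ("tester", "tests/test_app.py"),
    ("coder", "src/app.py"),
]
conflicts = find_resource_conflicts(claims)
print(conflicts)
```

The detected conflicts can then be handed to any of the resolution mechanisms described in the rest of this section.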
6.2 Voting

Voting resolves a conflict democratically: each Agent casts a vote and the option with the most support wins.

Code language: python
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
from datetime import datetime
from enum import Enum
import random


class VoteStrategy(Enum):
    MAJORITY = "majority"
    WEIGHTED = "weighted"
    UNANIMOUS = "unanimous"
    PREFERENCE = "preference"


@dataclass
class Vote:
    """A single vote cast by one Agent."""
    voter_id: str
    candidate_id: str
    weight: float = 1.0
    reasoning: str = ""
    timestamp: datetime = field(default_factory=datetime.now)


@dataclass
class VotingResult:
    """Outcome of a tally."""
    winner_id: str
    total_votes: int
    vote_counts: Dict[str, int]
    weighted_counts: Dict[str, float]
    confidence: float
    timestamp: datetime = field(default_factory=datetime.now)


class VotingSystem:
    """Collects votes and tallies them under a chosen strategy."""
    
    def __init__(self, strategy: VoteStrategy = VoteStrategy.MAJORITY):
        self.strategy = strategy
        self.votes: List[Vote] = []
        self.voter_weights: Dict[str, float] = {}
    
    def set_voter_weight(self, voter_id: str, weight: float):
        """Set the weight of a voter (used by the WEIGHTED strategy)."""
        self.voter_weights[voter_id] = weight
    
    def cast_vote(
        self, 
        voter_id: str, 
        candidate_id: str, 
        reasoning: str = ""
    ) -> Vote:
        """Record a vote; the voter's current weight is stored with it."""
        weight = self.voter_weights.get(voter_id, 1.0)
        
        vote = Vote(
            voter_id=voter_id,
            candidate_id=candidate_id,
            weight=weight,
            reasoning=reasoning
        )
        
        self.votes.append(vote)
        return vote
    
    def tally_votes(self) -> VotingResult:
        """Count the votes and determine the winner."""
        if not self.votes:
            return VotingResult(
                winner_id="",
                total_votes=0,
                vote_counts={},
                weighted_counts={},
                confidence=0.0
            )
        
        vote_counts: Dict[str, int] = {}
        weighted_counts: Dict[str, float] = {}
        
        for vote in self.votes:
            vote_counts[vote.candidate_id] = vote_counts.get(vote.candidate_id, 0) + 1
            weighted_counts[vote.candidate_id] = (
                weighted_counts.get(vote.candidate_id, 0.0) + vote.weight
            )
        
        if self.strategy == VoteStrategy.WEIGHTED:
            winner_id = max(weighted_counts, key=weighted_counts.get)
            total_weight = sum(weighted_counts.values())
            confidence = weighted_counts[winner_id] / total_weight if total_weight > 0 else 0
        elif self.strategy == VoteStrategy.UNANIMOUS:
            # There is a winner only when every vote names the same candidate
            top = max(vote_counts, key=vote_counts.get)
            confidence = vote_counts[top] / len(self.votes)
            winner_id = top if len(vote_counts) == 1 else ""
        else:
            # MAJORITY; PREFERENCE has no ranked ballots here, so it is counted the same way
            winner_id = max(vote_counts, key=vote_counts.get)
            confidence = vote_counts[winner_id] / len(self.votes)
        
        return VotingResult(
            winner_id=winner_id,
            total_votes=len(self.votes),
            vote_counts=vote_counts,
            weighted_counts=weighted_counts,
            confidence=confidence
        )
    
    def clear_votes(self):
        """Discard all recorded votes."""
        self.votes = []
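
The practical difference between counting heads and counting weights shows up when a minority voter carries more weight. The following self-contained sketch illustrates it with plain functions. The ballot format, the function names, and the weight given to the reviewer are all hypothetical, and unlisted voters are assumed to default to weight 1.0 as in the class above.

```python
from collections import Counter, defaultdict
from typing import Dict, List, Tuple

Ballot = Tuple[str, str]  # (voter_id, candidate_id)


def majority_winner(ballots: List[Ballot]) -> str:
    """Candidate named on the largest number of ballots."""
    counts = Counter(candidate for _, candidate in ballots)
    return counts.most_common(1)[0][0]


def weighted_winner(ballots: List[Ballot], weights: Dict[str, float]) -> str:
    """Candidate with the largest total voter weight."""
    totals: Dict[str, float] = defaultdict(float)
    for voter, candidate in ballots:
        totals[candidate] += weights.get(voter, 1.0)
    return max(totals, key=totals.get)


ballots = [("coder", "plan_a"), ("tester", "plan_a"), ("reviewer", "plan_b")]
weights = {"reviewer": 3.0}  # hypothetical: the reviewer's judgement counts triple

by_count = majority_winner(ballots)
by_weight = weighted_winner(ballots, weights)
print(by_count, by_weight)
```

Two votes beat one under head counting, while the reviewer's weight of 3.0 outweighs the combined 2.0 of the other two under weighted counting, so the two strategies pick different winners from the same ballots.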
6.3 Arbitration

Arbitration resolves a conflict by bringing in a third party to decide.

Code language: python
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Any, Optional


@dataclass
class Dispute:
    """A dispute raised between Agents."""
    id: str
    dispute_type: str
    parties: List[str]
    subject: str
    details: Any
    evidence: Dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)


@dataclass
class ArbitrationResult:
    """Outcome of an arbitration."""
    dispute_id: str
    winner: str
    loser: str
    decision: str
    reasoning: str
    remedies: List[str] = field(default_factory=list)


class Arbitrator(ABC):
    """Abstract base class for arbitrators."""
    
    @abstractmethod
    def arbitrate(self, dispute: Dispute) -> ArbitrationResult:
        pass
    
    @abstractmethod
    def can_handle(self, dispute: Dispute) -> bool:
        pass


class RuleBasedArbitrator(Arbitrator):
    """Arbitrator that applies the first matching rule."""
    
    def __init__(self):
        self.rules: List[Dict] = []
        self._load_default_rules()
    
    def _load_default_rules(self):
        """Load the default rules."""
        self.rules = [
            {
                'name': 'priority_rule',
                'condition': lambda d: d.dispute_type == 'priority',
                'action': self._resolve_priority_dispute
            },
            {
                'name': 'resource_rule',
                'condition': lambda d: d.dispute_type == 'resource',
                'action': self._resolve_resource_dispute
            },
            {
                'name': 'quality_rule',
                'condition': lambda d: d.dispute_type == 'quality',
                'action': self._resolve_quality_dispute
            }
        ]
    
    def can_handle(self, dispute: Dispute) -> bool:
        return any(rule['condition'](dispute) for rule in self.rules)
    
    def arbitrate(self, dispute: Dispute) -> ArbitrationResult:
        for rule in self.rules:
            if rule['condition'](dispute):
                return rule['action'](dispute)
        
        return ArbitrationResult(
            dispute_id=dispute.id,
            winner=dispute.parties[0] if dispute.parties else "",
            loser=dispute.parties[1] if len(dispute.parties) > 1 else "",
            decision="No specific rule matched",
            reasoning="Using default resolution"
        )
    
    def _resolve_priority_dispute(self, dispute: Dispute) -> ArbitrationResult:
        priorities = dispute.evidence.get('priorities', {})
        
        if priorities:
            winner = max(priorities, key=priorities.get)
            loser = next((p for p in dispute.parties if p != winner), "")
            
            return ArbitrationResult(
                dispute_id=dispute.id,
                winner=winner,
                loser=loser,
                decision=f"{winner} wins with higher priority",
                reasoning="Based on explicitly assigned priorities"
            )
        
        return ArbitrationResult(
            dispute_id=dispute.id,
            winner=dispute.parties[0],
            loser=dispute.parties[1],
            decision="Default to first party",
            reasoning="No priority information available"
        )
    
    def _resolve_resource_dispute(self, dispute: Dispute) -> ArbitrationResult:
        resources = dispute.evidence.get('resource_usage', {})
        
        if resources:
            winner = min(resources, key=resources.get)
            loser = next((p for p in dispute.parties if p != winner), "")
            
            return ArbitrationResult(
                dispute_id=dispute.id,
                winner=winner,
                loser=loser,
                decision=f"{winner} wins with lower resource usage",
                reasoning="Resource efficiency is prioritized"
            )
        
        return ArbitrationResult(
            dispute_id=dispute.id,
            winner=dispute.parties[0],
            loser=dispute.parties[1],
            decision="First-come-first-served",
            reasoning="No resource usage data available"
        )
    
    def _resolve_quality_dispute(self, dispute: Dispute) -> ArbitrationResult:
        quality_scores = dispute.evidence.get('quality_scores', {})
        
        if quality_scores:
            winner = max(quality_scores, key=quality_scores.get)
            loser = next((p for p in dispute.parties if p != winner), "")
            
            return ArbitrationResult(
                dispute_id=dispute.id,
                winner=winner,
                loser=loser,
                decision=f"{winner} wins with higher quality score",
                reasoning=f"Quality score: {quality_scores[winner]}",
                remedies=["Lower quality Agent should adopt winner's approach"]
            )
        
        return ArbitrationResult(
            dispute_id=dispute.id,
            winner=dispute.parties[0],
            loser=dispute.parties[1],
            decision="Cannot determine quality winner",
            reasoning="No quality metrics available"
        )


class ArbitrationService:
    """Service that records disputes and routes them to an arbitrator."""
    
    def __init__(self):
        self.arbitrators: List[Arbitrator] = []
        self.dispute_history: List[Dispute] = []
        self._register_default_arbitrators()
    
    def _register_default_arbitrators(self):
        self.register_arbitrator(RuleBasedArbitrator())
    
    def register_arbitrator(self, arbitrator: Arbitrator):
        self.arbitrators.append(arbitrator)
    
    def raise_dispute(
        self,
        dispute_type: str,
        parties: List[str],
        subject: str,
        details: Any = None,
        evidence: Optional[Dict[str, Any]] = None
    ) -> Dispute:
        dispute = Dispute(
            id=str(uuid.uuid4()),
            dispute_type=dispute_type,
            parties=parties,
            subject=subject,
            details=details,
            evidence=evidence or {}
        )
        
        self.dispute_history.append(dispute)
        return dispute
    
    def resolve_dispute(self, dispute: Dispute) -> Optional[ArbitrationResult]:
        for arbitrator in self.arbitrators:
            if arbitrator.can_handle(dispute):
                result = arbitrator.arbitrate(dispute)
                return result
        
        return None
6.4 Priority

The priority mechanism resolves conflicts with preset priority rules.

Code language: python
from enum import IntEnum
from typing import Dict, List, Any, Optional


class AgentPriority(IntEnum):
    LOWEST = 1
    LOW = 2
    NORMAL = 3
    HIGH = 4
    HIGHEST = 5
    CRITICAL = 6


class TaskPriority(IntEnum):
    IDLE = 0
    LOW = 1
    NORMAL = 2
    HIGH = 3
    URGENT = 4
    CRITICAL = 5


class PriorityManager:
    """Tracks Agent and task priorities and resolves assignment conflicts."""
    
    def __init__(self):
        self.agent_priorities: Dict[str, AgentPriority] = {}
        self.task_priorities: Dict[str, TaskPriority] = {}
        self.overrides: Dict[str, int] = {}
        self.priority_rules: List[callable] = []
    
    def set_agent_priority(self, agent_id: str, priority: AgentPriority):
        self.agent_priorities[agent_id] = priority
    
    def get_agent_priority(self, agent_id: str) -> AgentPriority:
        if agent_id in self.overrides:
            return AgentPriority(self.overrides[agent_id])
        return self.agent_priorities.get(agent_id, AgentPriority.NORMAL)
    
    def set_task_priority(self, task_id: str, priority: TaskPriority):
        self.task_priorities[task_id] = priority
    
    def get_task_priority(self, task_id: str) -> TaskPriority:
        return self.task_priorities.get(task_id, TaskPriority.NORMAL)
    
    def override_priority(self, entity_id: str, priority: int, duration: Optional[float] = None):
        # NOTE: duration is accepted but not enforced; the override stays until replaced
        self.overrides[entity_id] = priority
    
    def add_priority_rule(self, rule: callable):
        self.priority_rules.append(rule)
    
    def resolve_conflict(
        self, 
        tasks: List[Task], 
        agents: List[str]
    ) -> Dict[str, Any]:
        """Resolve a conflict by assigning the best available Agent to each task."""
        if not tasks or not agents:
            return {}
        
        assignments = {}
        
        for task in sorted(tasks, key=lambda t: self.get_task_priority(t.id), reverse=True):
            best_agent = None
            best_score = float('-inf')  # custom rules may push scores below zero
            
            for agent_id in agents:
                if agent_id in assignments.values():
                    continue
                
                score = self._calculate_assignment_score(task, agent_id)
                if score > best_score:
                    best_score = score
                    best_agent = agent_id
            
            if best_agent:
                assignments[task.id] = best_agent
        
        return assignments
    
    def _calculate_assignment_score(
        self, 
        task: Task, 
        agent_id: str
    ) -> float:
        agent_priority = self.get_agent_priority(agent_id)
        task_priority = self.get_task_priority(task.id)
        
        score = float(agent_priority * 10 + task_priority)
        
        for rule in self.priority_rules:
            score = rule(score, task, agent_id, self)
        
        return score
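
Priority-based resolution often comes down to a queue that always hands out the most urgent task next. One common way to build that in Python is `heapq` with a tie-breaking counter, sketched below. The `Priority` scale and the `PriorityTaskQueue` class are hypothetical and only mirror the idea of `TaskPriority`; they are not part of the `PriorityManager` above.

```python
import heapq
import itertools
from enum import IntEnum
from typing import List, Tuple


class Priority(IntEnum):
    """Hypothetical priority scale: larger means more urgent."""
    LOW = 1
    NORMAL = 2
    URGENT = 4


class PriorityTaskQueue:
    """Pops the most urgent task first; equal priorities keep insertion order."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, str]] = []
        self._counter = itertools.count()

    def push(self, task_id: str, priority: Priority) -> None:
        # heapq is a min-heap, so the priority is negated to pop the largest first
        heapq.heappush(self._heap, (-int(priority), next(self._counter), task_id))

    def pop(self) -> str:
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)


queue = PriorityTaskQueue()
queue.push("docs", Priority.LOW)
queue.push("bugfix", Priority.URGENT)
queue.push("feature", Priority.NORMAL)
queue.push("hotfix", Priority.URGENT)

drained = [queue.pop() for _ in range(len(queue))]
print(drained)
```

The counter in the middle of each tuple keeps ordering stable between tasks of equal priority and prevents the heap from ever comparing task ids.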

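7 Scaling: Agent Pool and Dynamic Scheduling

7.1 Challenges of Scaling

As a Multi-Agent system grows, it runs into the following problems:

  1. Resource contention: more Agents compete for limited compute resources
  2. Communication blow-up: with point-to-point messaging, the number of channels grows quadratically with the number of Agents
  3. Coordination overhead: the cost of coordinating a large number of Agents rises
  4. Fault-tolerance complexity: a failure affects a wider part of the system
  5. State consistency: keeping global state consistent becomes harder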
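
The communication cost can be made concrete with a little arithmetic. If every Agent can talk directly to every other Agent, n Agents need n(n-1)/2 channels, whereas routing everything through a central hub such as a Blackboard or a message bus needs only n. The sketch below computes both; the function names are illustrative.

```python
def mesh_channels(n: int) -> int:
    """Channels needed when every pair of Agents talks directly."""
    return n * (n - 1) // 2


def hub_channels(n: int) -> int:
    """Channels needed when every Agent talks only to a central hub."""
    return n


growth = [(n, mesh_channels(n), hub_channels(n)) for n in (4, 16, 64)]
for n, mesh, hub in growth:
    print(f"{n} agents: {mesh} direct channels vs {hub} via a hub")
```

This gap is one reason larger deployments tend to favour hub-style communication over direct Agent-to-Agent links.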
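7.2 Agent Pool Architecture

An Agent Pool is an effective way to scale: Agent instances are kept in a pool and reused across tasks instead of being created for each task.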

Code language: python
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Callable
from datetime import datetime
from enum import Enum
import asyncio
import threading
import time
import uuid


class AgentState(Enum):
    IDLE = "idle"
    BUSY = "busy"
    BLOCKED = "blocked"
    TERMINATED = "terminated"


@dataclass
class AgentInstance:
    """A pooled Agent instance and its usage statistics."""
    instance_id: str
    agent_type: str
    state: AgentState = AgentState.IDLE
    current_task_id: Optional[str] = None
    created_at: datetime = field(default_factory=datetime.now)
    last_used_at: datetime = field(default_factory=datetime.now)
    total_tasks_completed: int = 0
    total_execution_time: float = 0.0
    metadata: Dict[str, Any] = field(default_factory=dict)


class AgentPool:
    """Pool of reusable Agent instances, grouped by agent type."""
    
    def __init__(
        self, 
        pool_config: Optional[Dict[str, int]] = None,
        max_pool_size: int = 100
    ):
        self.pools: Dict[str, List[AgentInstance]] = {}
        self.all_instances: Dict[str, AgentInstance] = {}
        self.max_pool_size = max_pool_size
        
        self.agent_factories: Dict[str, Callable] = {}
        
        self._stats = {
            'total_created': 0,
            'total_destroyed': 0,
            'total_tasks_executed': 0
        }
        
        self._lock = threading.RLock()
        
        # Pre-create instances last: _initialize_pool needs _lock and _stats to exist
        if pool_config:
            for agent_type, count in pool_config.items():
                self._initialize_pool(agent_type, count)
    
    def register_factory(self, agent_type: str, factory: Callable):
        self.agent_factories[agent_type] = factory
    
    def _initialize_pool(self, agent_type: str, count: int):
        with self._lock:
            if agent_type not in self.pools:
                self.pools[agent_type] = []
            
            for i in range(count):
                instance = self._create_instance(agent_type)
                self.pools[agent_type].append(instance)
                self.all_instances[instance.instance_id] = instance
    
    def _create_instance(self, agent_type: str) -> AgentInstance:
        instance_id = f"{agent_type}_{uuid.uuid4().hex[:8]}"
        
        instance = AgentInstance(
            instance_id=instance_id,
            agent_type=agent_type
        )
        
        self._stats['total_created'] += 1
        
        return instance
    
    def acquire(
        self, 
        agent_type: str, 
        timeout: float = 30.0
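    ) -> Optional[AgentInstance]:
        """Get an idle instance of agent_type, creating one if the pool has room.
        
        Blocks the calling thread for up to `timeout` seconds, so call it from a
        worker thread rather than directly inside an asyncio event loop.
        """
        start_time = datetime.now()
        
        while True:
            with self._lock:
                # setdefault keeps instances of a previously unseen type inside the pool
                pool = self.pools.setdefault(agent_type, [])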
                
                for instance in pool:
                    if instance.state == AgentState.IDLE:
                        instance.state = AgentState.BUSY
                        instance.last_used_at = datetime.now()
                        return instance
                
                total_in_pool = len(pool)
                if total_in_pool < self.max_pool_size:
                    instance = self._create_instance(agent_type)
                    instance.state = AgentState.BUSY
                    pool.append(instance)
                    self.all_instances[instance.instance_id] = instance
                    return instance
                
                elapsed = (datetime.now() - start_time).total_seconds()
                if elapsed > timeout:
                    return None
            
            import time
            time.sleep(0.1)
    
    def release(self, instance: AgentInstance):
        """Return an agent instance to the pool."""
        with self._lock:
            instance.state = AgentState.IDLE
            instance.current_task_id = None
            instance.last_used_at = datetime.now()
    
    def destroy_instance(self, instance_id: str):
        """Destroy an agent instance and remove it from its pool."""
        with self._lock:
            if instance_id not in self.all_instances:
                return
            
            instance = self.all_instances[instance_id]
            instance.state = AgentState.TERMINATED
            
            pool = self.pools.get(instance.agent_type, [])
            if instance in pool:
                pool.remove(instance)
            
            del self.all_instances[instance_id]
            self._stats['total_destroyed'] += 1
    
    def get_stats(self) -> Dict[str, Any]:
        """Return pool statistics, overall and per agent type."""
        with self._lock:
            stats = {
                'total_instances': len(self.all_instances),
                'total_created': self._stats['total_created'],
                'total_destroyed': self._stats['total_destroyed'],
                'by_type': {}
            }
            
            for agent_type, pool in self.pools.items():
                idle_count = sum(1 for i in pool if i.state == AgentState.IDLE)
                busy_count = sum(1 for i in pool if i.state == AgentState.BUSY)
                
                stats['by_type'][agent_type] = {
                    'total': len(pool),
                    'idle': idle_count,
                    'busy': busy_count,
                    'utilization': (len(pool) - idle_count) / len(pool) if pool else 0
                }
            
            return stats
    
    def scale_pool(
        self, 
        agent_type: str, 
        target_size: int
    ) -> int:
        """Grow or shrink a pool; return how many instances were added or removed."""
        with self._lock:
            # setdefault so that a pool created here is actually stored.
            pool = self.pools.setdefault(agent_type, [])
            current_size = len(pool)
            
            if target_size > current_size:
                for _ in range(target_size - current_size):
                    instance = self._create_instance(agent_type)
                    pool.append(instance)
                    self.all_instances[instance.instance_id] = instance
                return target_size - current_size
            
            elif target_size < current_size:
                # Only idle instances are removed, so fewer than requested
                # may go away while agents are busy.
                removed = 0
                for instance in list(pool):
                    if removed >= current_size - target_size:
                        break
                    if instance.state == AgentState.IDLE:
                        self.destroy_instance(instance.instance_id)
                        removed += 1
                return removed
            
            return 0
    
    def get_idle_agents(self, agent_type: Optional[str] = None) -> List[AgentInstance]:
        """Return idle agents of one type, or of every type when agent_type is None."""
        with self._lock:
            if agent_type:
                pool = self.pools.get(agent_type, [])
                return [i for i in pool if i.state == AgentState.IDLE]
            else:
                return [
                    i for i in self.all_instances.values() 
                    if i.state == AgentState.IDLE
                ]
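
acquire() and release() have to be paired, otherwise a task that raises leaves its instance marked busy indefinitely. A minimal sketch of one way to guarantee the pairing with a context manager follows. `TinyPool` and `leased` are hypothetical names used only for illustration; they are simplified stand-ins, not part of the AgentPool above.

```python
import threading
from contextlib import contextmanager

class TinyPool:
    """Hypothetical stand-in for AgentPool: tracks which slots are busy."""

    def __init__(self, size: int):
        self._lock = threading.Lock()
        self._idle = list(range(size))   # slot ids that are free
        self.busy = set()

    def acquire(self):
        with self._lock:
            if not self._idle:
                return None
            slot = self._idle.pop()
            self.busy.add(slot)
            return slot

    def release(self, slot):
        with self._lock:
            self.busy.discard(slot)
            self._idle.append(slot)

@contextmanager
def leased(pool: TinyPool):
    """Yield a slot and always hand it back, even if the body raises."""
    slot = pool.acquire()
    if slot is None:
        raise RuntimeError("no idle agent available")
    try:
        yield slot
    finally:
        pool.release(slot)

pool = TinyPool(size=1)
try:
    with leased(pool) as slot:
        raise ValueError("task failed")
except ValueError:
    pass
print(len(pool.busy))  # 0: the slot was released despite the failure
```

The same pattern would presumably apply to AgentPool by wrapping its acquire() and release() calls in the try/finally pair.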

7.3 Dynamic Scheduling

Dynamic scheduling assigns each task to a suitable agent at run time, based on the current system state and the characteristics of the task.

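class DynamicScheduler:
    """Dynamic scheduler."""
    
    def __init__(self, agent_pool: AgentPool, priority_manager: PriorityManager):
        self.agent_pool = agent_pool
        self.priority_manager = priority_manager
        self.scheduling_history: List[Dict] = []
        self._running = False
        # Keep a reference to the loop task; the event loop only holds a
        # weak reference, so an unreferenced task can be garbage-collected.
        self._loop_task: Optional[asyncio.Task] = None
    
    async def start(self):
        """Start the scheduler."""
        self._running = True
        self._loop_task = asyncio.create_task(self._scheduling_loop())
    
    async def stop(self):
        """Stop the scheduler and wait for the loop to finish its current pass."""
        self._running = False
        if self._loop_task is not None:
            await self._loop_task
            self._loop_task = None
    
    async def _scheduling_loop(self):
        """Scheduling loop: one pass per second until stopped."""
        while self._running:
            try:
                await self._process_scheduling()
            except Exception as e:
                print(f"Scheduling error: {e}")
            await asyncio.sleep(1.0)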
    
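    async def _process_scheduling(self):
        """Run one scheduling pass: highest-priority tasks are assigned first."""
        pending_tasks = self._get_pending_tasks()
        
        pending_tasks.sort(
            key=lambda t: self.priority_manager.get_task_priority(t.id),
            reverse=True
        )
        
        for task in pending_tasks:
            # acquire() blocks with time.sleep(), so run it in a worker
            # thread instead of stalling the event loop.
            agent = await asyncio.to_thread(self._select_best_agent, task)
            
            if agent:
                await self._assign_task(task, agent)
    
    def _get_pending_tasks(self) -> List[Task]:
        """Return tasks awaiting scheduling (placeholder: always empty)."""
        return []
    
    def _select_best_agent(self, task: Task) -> Optional[AgentInstance]:
        """Pick an agent for the task; may block for up to 5 seconds."""
        agent_type = self._infer_agent_type(task)
        
        return self.agent_pool.acquire(agent_type, timeout=5.0)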
    
    def _infer_agent_type(self, task: Task) -> str:
        """Infer which agent type the task needs."""
        if task.assigned_agent:
            return task.assigned_agent
        
        task_type = task.metadata.get('type', '')
        
        if 'code' in task_type or 'implement' in task_type:
            return 'coder'
        elif 'review' in task_type or 'check' in task_type:
            return 'reviewer'
        elif 'test' in task_type:
            return 'tester'
        elif 'plan' in task_type or 'design' in task_type:
            return 'planner'
        
        return 'generalist'
    
    async def _assign_task(self, task: Task, agent: AgentInstance):
        """Assign the task to the agent and record the decision."""
        task.assigned_agent = agent.instance_id
        agent.current_task_id = task.id
        
        self.scheduling_history.append({
            'task_id': task.id,
            'agent_id': agent.instance_id,
            'timestamp': datetime.now().isoformat()
        })


class LoadBalancer:
    """Load balancer."""
    
    def __init__(self, agent_pool: AgentPool):
        self.agent_pool = agent_pool
        self.assignment_count: Dict[str, int] = {}
        # Round-robin cursors live in their own dict so they cannot
        # collide with per-instance assignment counts.
        self._rr_cursor: Dict[str, int] = {}
    
    def select_agent(
        self, 
        agent_type: str, 
        strategy: str = "least_loaded"
    ) -> Optional[AgentInstance]:
        """Select an idle agent of the given type using the named strategy."""
        idle_agents = self.agent_pool.get_idle_agents(agent_type)
        
        if not idle_agents:
            return None
        
        if strategy == "least_loaded":
            selected = min(
                idle_agents,
                key=lambda a: self.assignment_count.get(a.instance_id, 0)
            )
        elif strategy == "round_robin":
            selected = self._round_robin_select(idle_agents, agent_type)
        elif strategy == "random":
            import random
            selected = random.choice(idle_agents)
        elif strategy == "capability_based":
            selected = self._capability_based_select(idle_agents)
        else:
            selected = idle_agents[0]
        
        # Count the assignment; without this, "least_loaded" would always
        # compare zeros and degenerate to "first idle agent".
        self.assignment_count[selected.instance_id] = (
            self.assignment_count.get(selected.instance_id, 0) + 1
        )
        return selected
    
    def _round_robin_select(
        self, 
        agents: List[AgentInstance],
        agent_type: str
    ) -> Optional[AgentInstance]:
        if not agents:
            return None
        
        current = self._rr_cursor.get(agent_type, 0)
        selected = agents[current % len(agents)]
        
        self._rr_cursor[agent_type] = current + 1
        return selected
    
    def _capability_based_select(
        self, 
        agents: List[AgentInstance]
    ) -> AgentInstance:
        best = agents[0]
        best_score = self._evaluate_capability(best)
        
        for agent in agents[1:]:
            score = self._evaluate_capability(agent)
            if score > best_score:
                best = agent
                best_score = score
        
        return best
    
    def _evaluate_capability(self, agent: AgentInstance) -> float:
        completed = agent.total_tasks_completed
        time_since_use = (datetime.now() - agent.last_used_at).total_seconds()
        
        return completed - (time_since_use / 60)
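
`_get_pending_tasks` in the scheduler above is a placeholder that always returns an empty list. A minimal sketch of one possible backing store is a priority queue built on the standard `heapq` module. `PendingQueue` and its methods are hypothetical names, and priorities are plain integers here rather than values obtained from PriorityManager.

```python
import heapq
import itertools
from typing import Any, List, Tuple

class PendingQueue:
    """Hypothetical priority queue: higher priority first, FIFO within a priority."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, Any]] = []
        self._counter = itertools.count()  # tie-breaker keeps insertion order

    def push(self, task: Any, priority: int) -> None:
        # heapq is a min-heap, so negate the priority to pop the largest first.
        heapq.heappush(self._heap, (-priority, next(self._counter), task))

    def pop_batch(self, limit: int) -> List[Any]:
        """Remove and return up to `limit` tasks in scheduling order."""
        batch = []
        while self._heap and len(batch) < limit:
            _, _, task = heapq.heappop(self._heap)
            batch.append(task)
        return batch

    def __len__(self) -> int:
        return len(self._heap)

queue = PendingQueue()
queue.push("write docs", priority=1)
queue.push("fix login bug", priority=5)
queue.push("review PR", priority=5)
print(queue.pop_batch(2))  # ['fix login bug', 'review PR']
print(len(queue))          # 1
```

Popping a bounded batch per pass keeps one scheduling pass short, so a long backlog does not delay the next pass.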

7.4 Auto-Scaling

The auto-scaler resizes each agent pool according to its current load.

class AutoScaler:
    """Auto-scaler."""
    
    def __init__(
        self,
        agent_pool: AgentPool,
        min_size: int = 2,
        max_size: int = 20,
        scale_up_threshold: float = 0.8,
        scale_down_threshold: float = 0.3,
        scale_up_cooldown: float = 60.0,
        scale_down_cooldown: float = 300.0
    ):
        self.agent_pool = agent_pool
        self.min_size = min_size
        self.max_size = max_size
        
        self.scale_up_threshold = scale_up_threshold
        self.scale_down_threshold = scale_down_threshold
        self.scale_up_cooldown = scale_up_cooldown
        self.scale_down_cooldown = scale_down_cooldown
        
        self.last_scale_up: Dict[str, datetime] = {}
        self.last_scale_down: Dict[str, datetime] = {}
        
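        self._running = False
        # Keep a reference so the loop task is not garbage-collected.
        self._loop_task: Optional[asyncio.Task] = None
    
    async def start(self):
        """Start auto-scaling."""
        self._running = True
        self._loop_task = asyncio.create_task(self._scaling_loop())
    
    async def stop(self):
        """Stop auto-scaling; the loop exits after its current sleep."""
        self._running = False
    
    async def _scaling_loop(self):
        """Scaling loop: check every 10 seconds until stopped."""
        while self._running:
            try:
                await self._check_and_scale()
            except Exception as e:
                print(f"Scaling error: {e}")
            await asyncio.sleep(10.0)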
    
    async def _check_and_scale(self):
        """Check each pool's utilization and scale it if needed."""
        stats = self.agent_pool.get_stats()
        
        for agent_type, type_stats in stats.get('by_type', {}).items():
            utilization = type_stats['utilization']
            current_size = type_stats['total']
            
            await self._handle_scale_up(agent_type, utilization, current_size)
            await self._handle_scale_down(agent_type, utilization, current_size)
    
    async def _handle_scale_up(
        self, 
        agent_type: str, 
        utilization: float,
        current_size: int
    ):
        """Scale up once utilization reaches the threshold and the cooldown has passed."""
        if utilization < self.scale_up_threshold:
            return
        
        last_up = self.last_scale_up.get(agent_type)
        if last_up:
            elapsed = (datetime.now() - last_up).total_seconds()
            if elapsed < self.scale_up_cooldown:
                return
        
        if current_size < self.max_size:
            target_size = min(current_size + 2, self.max_size)
            self.agent_pool.scale_pool(agent_type, target_size)
            self.last_scale_up[agent_type] = datetime.now()
            print(f"Scaled up {agent_type}: {current_size} -> {target_size}")
    
    async def _handle_scale_down(
        self, 
        agent_type: str, 
        utilization: float,
        current_size: int
    ):
        """Scale down once utilization drops to the threshold and the cooldown has passed."""
        if utilization > self.scale_down_threshold:
            return
        
        last_down = self.last_scale_down.get(agent_type)
        if last_down:
            elapsed = (datetime.now() - last_down).total_seconds()
            if elapsed < self.scale_down_cooldown:
                return
        
        if current_size > self.min_size:
            target_size = max(current_size - 2, self.min_size)
            removed = self.agent_pool.scale_pool(agent_type, target_size)
            self.last_scale_down[agent_type] = datetime.now()
            # Busy instances are never removed, so report the size actually reached.
            print(f"Scaled down {agent_type}: {current_size} -> {current_size - removed} (target {target_size})")
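
The two thresholds leave a dead band between 0.3 and 0.8 in which the pool size is left alone, which keeps the scaler from oscillating. A minimal sketch of that decision as a pure function follows, using the default thresholds and the step of 2 from AutoScaler above. `decide_target_size` is a hypothetical helper, and it deliberately ignores the cooldown timers.

```python
def decide_target_size(
    utilization: float,
    current_size: int,
    min_size: int = 2,
    max_size: int = 20,
    scale_up_threshold: float = 0.8,
    scale_down_threshold: float = 0.3,
    step: int = 2,
) -> int:
    """Return the pool size the scaler would aim for (cooldowns ignored)."""
    if utilization >= scale_up_threshold:
        return min(current_size + step, max_size)
    if utilization <= scale_down_threshold:
        return max(current_size - step, min_size)
    return current_size  # inside the dead band: leave the pool alone

print(decide_target_size(0.90, 4))   # 6
print(decide_target_size(0.50, 4))   # 4
print(decide_target_size(0.10, 3))   # 2
print(decide_target_size(0.95, 19))  # 20
```

Keeping the decision separate from the timers makes the threshold logic easy to unit-test without waiting for cooldowns.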

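8 In Practice: A Planner-Coder-Reviewer Three-Agent System

8.1 System Architecture

This section implements an end-to-end Planner-Coder-Reviewer system in which three agents collaborate through a shared blackboard.

8.2 Complete Implementation

"""
Planner-Coder-Reviewer three-agent collaboration system.

Blackboard, MessageBus, AgentPool, DynamicScheduler and PriorityManager are
not redefined here; this listing assumes the versions from the earlier
sections and Appendix A are in scope.
"""

import asyncio
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Any, Optional
from enum import Enum


# ========== Core components ==========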

class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"


@dataclass
class Task:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    description: str = ""
    status: TaskStatus = TaskStatus.PENDING
    assigned_agent: str = ""
    input_data: Any = None
    output_data: Any = None
    dependencies: List[str] = field(default_factory=list)
    metadata: Dict = field(default_factory=dict)


@dataclass
class ReviewResult:
    task_id: str
    passed: bool
    issues: List[str] = field(default_factory=list)
    suggestions: List[str] = field(default_factory=list)
    score: float = 0.0


# ========== Agent base class ==========

class BaseAgent:
    def __init__(self, agent_id: str, name: str):
        self.agent_id = agent_id
        self.name = name
        self.blackboard: Optional['Blackboard'] = None
        self.message_bus: Optional['MessageBus'] = None
    
    def set_blackboard(self, blackboard: 'Blackboard'):
        self.blackboard = blackboard
    
    def set_message_bus(self, message_bus: 'MessageBus'):
        self.message_bus = message_bus
    
    async def execute(self, task: Task) -> Any:
        raise NotImplementedError
    
    def log(self, message: str):
        timestamp = datetime.now().strftime("%H:%M:%S")
        print(f"[{timestamp}] [{self.name}] {message}")


# ========== Planner Agent ==========

class PlannerAgent(BaseAgent):
    def __init__(self):
        super().__init__("planner", "Planner")
        self.planning_rules = self._load_planning_rules()
    
    def _load_planning_rules(self) -> Dict:
        return {
            'task_types': {
                'feature': {
                    'steps': ['analysis', 'design', 'implementation', 'review', 'test'],
                    'estimated_time': 120
                },
                'bugfix': {
                    'steps': ['diagnosis', 'implementation', 'review'],
                    'estimated_time': 30
                },
                'refactor': {
                    'steps': ['analysis', 'planning', 'implementation', 'review', 'test'],
                    'estimated_time': 60
                }
            }
        }
    
    async def execute(self, task: Task) -> List[Dict]:
        self.log(f"Planning task: {task.name}")
        
        task_type = self._infer_task_type(task)
        planning_rule = self.planning_rules['task_types'].get(task_type, {})
        
        subtasks = self._create_subtasks(task, planning_rule)
        
        if self.blackboard:
            self.blackboard.write('current_plan', subtasks, self.agent_id)
            self.blackboard.write('original_task', task, self.agent_id)
        
        self.log(f"Planning finished: {len(subtasks)} subtasks created")
        
        return subtasks
    
    def _infer_task_type(self, task: Task) -> str:
        description = task.description.lower()
        
        if 'bug' in description or 'fix' in description or '修复' in description:
            return 'bugfix'
        elif 'refactor' in description or '重构' in description:
            return 'refactor'
        else:
            return 'feature'
    
    def _create_subtasks(self, task: Task, rule: Dict) -> List[Dict]:
        steps = rule.get('steps', ['implementation', 'review'])
        estimated_time = rule.get('estimated_time', 60)
        
        subtasks = []
        
        for i, step in enumerate(steps):
            subtask = {
                'id': f"{task.id}_sub_{i}",
                'name': f"{task.name} - {step}",
                'type': step,
                'status': 'pending',
                'agent': self._infer_agent_for_step(step),
                'estimated_time': estimated_time // len(steps),
                'dependencies': [subtasks[-1]['id']] if subtasks else []
            }
            subtasks.append(subtask)
        
        return subtasks
    
    def _infer_agent_for_step(self, step: str) -> str:
        if step in ['implementation', 'coding']:
            return 'coder'
        elif step in ['review', 'analysis']:
            return 'reviewer'
        elif step == 'test':
            return 'tester'
        else:
            return 'planner'


# ========== Coder Agent ==========

class CoderAgent(BaseAgent):
    def __init__(self):
        super().__init__("coder", "Coder")
    
    async def execute(self, task: Task) -> Dict:
        self.log(f"Implementing task: {task.name}")
        
        if self.blackboard:
            original_task = self.blackboard.read('original_task', self.agent_id)
            if original_task:
                task.description = original_task.description
        
        code = self._generate_code(task)
        
        if self.blackboard:
            self.blackboard.write(f'code_{task.id}', code, self.agent_id)
            self.blackboard.write(f'code_status_{task.id}', 'implemented', self.agent_id)
        
        self.log(f"Implementation finished: {task.name}")
        
        return {
            'task_id': task.id,
            'code': code,
            'status': 'implemented'
        }
    
    def _generate_code(self, task: Task) -> str:
        # Placeholder generator: emits a fixed template rather than real code.
        # {task.name!r} quotes the name safely even if it contains quotes.
        code = f"""
# Implementation: {task.name}
# Description: {task.description}

class Implementation:
    def execute(self):
        result = {{
            'task': {task.name!r},
            'status': 'implemented',
            'timestamp': '{datetime.now().isoformat()}'
        }}
        return result

if __name__ == '__main__':
    impl = Implementation()
    print(impl.execute())
"""
        return code


# ========== Reviewer Agent ==========

class ReviewerAgent(BaseAgent):
    def __init__(self):
        super().__init__("reviewer", "Reviewer")
        self.review_criteria = self._load_review_criteria()
    
    def _load_review_criteria(self) -> Dict:
        return {
            'required_checks': [
                'code_completeness',
                'naming_conventions',
                'error_handling',
                'documentation',
                'test_coverage'
            ],
            'min_score': 0.7,
            'weights': {
                'code_completeness': 0.3,
                'naming_conventions': 0.15,
                'error_handling': 0.25,
                'documentation': 0.15,
                'test_coverage': 0.15
            }
        }
    
    async def execute(self, task: Task) -> ReviewResult:
        self.log(f"Reviewing task: {task.name}")
        
        # The code is looked up under this review task's own id, so the
        # caller must have published it as code_<task.id> beforehand.
        code = ""
        if self.blackboard:
            code = self.blackboard.read(f'code_{task.id}', self.agent_id)
        
        if not code:
            return ReviewResult(
                task_id=task.id,
                passed=False,
                issues=["code not found"],
                score=0.0
            )
        
        issues = self._perform_review(code)
        suggestions = self._generate_suggestions(code, issues)
        score = self._calculate_score(issues)
        passed = score >= self.review_criteria['min_score']
        
        # Build the result once; it is both stored and returned.
        review_result = ReviewResult(
            task_id=task.id,
            passed=passed,
            issues=issues,
            suggestions=suggestions,
            score=score
        )
        
        if self.blackboard:
            self.blackboard.write(f'review_result_{task.id}', review_result, self.agent_id)
            self.blackboard.write(f'code_status_{task.id}', 'reviewed', self.agent_id)
        
        self.log(f"Review finished: {task.name}, passed: {passed}, score: {score:.2f}")
        
        return review_result
    
    def _perform_review(self, code: str) -> List[str]:
        # Simple string heuristics; each issue text is matched again by
        # _generate_suggestions and _calculate_score below.
        issues = []
        
        if len(code) < 100:
            issues.append("code is very short and may be incomplete")
        
        if 'def ' not in code:
            issues.append("missing function definition")
        
        if '"""' not in code and "'''" not in code:
            issues.append("missing docstring")
        
        if 'TODO' in code:
            issues.append("unfinished TODO present")
        
        if 'try' not in code and 'except' not in code:
            issues.append("missing error handling")
        
        return issues
    
    def _generate_suggestions(self, code: str, issues: List[str]) -> List[str]:
        suggestions = []
        
        if any('docstring' in issue for issue in issues):
            suggestions.append("Add complete docstrings")
        
        if any('TODO' in issue for issue in issues):
            suggestions.append("Resolve every TODO before submitting for review")
        
        if any('error handling' in issue for issue in issues):
            suggestions.append("Add try-except blocks around code that may raise")
        
        return suggestions
    
    def _calculate_score(self, issues: List[str]) -> float:
        base_score = 1.0
        
        # Deduction per issue, keyed by a substring of the issue text.
        deductions = {
            'code is very short': 0.2,
            'missing function definition': 0.25,
            'missing docstring': 0.1,
            'unfinished TODO': 0.15,
            'missing error handling': 0.15
        }
        
        for issue in issues:
            for key, deduction in deductions.items():
                if key in issue:
                    base_score -= deduction
                    break
        
        return max(0.0, base_score)


# ========== Multi-Agent collaboration system ==========

class MultiAgentCollaborationSystem:
    def __init__(self):
        self.blackboard = Blackboard("main")
        self.message_bus = MessageBus()
        
        self.agents: Dict[str, BaseAgent] = {
            'planner': PlannerAgent(),
            'coder': CoderAgent(),
            'reviewer': ReviewerAgent()
        }
        
        for agent in self.agents.values():
            agent.set_blackboard(self.blackboard)
            agent.set_message_bus(self.message_bus)
        
        self.scheduler = DynamicScheduler(
            agent_pool=self._create_agent_pool(),
            priority_manager=PriorityManager()
        )
        
        self.execution_history: List[Dict] = []
    
    def _create_agent_pool(self) -> 'AgentPool':
        pool = AgentPool({
            'planner': 2,
            'coder': 4,
            'reviewer': 2
        })
        return pool
    
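    async def process_request(self, task_description: str) -> Dict:
        task_id = str(uuid.uuid4())
        start_time = datetime.now()
        
        planning_task = Task(
            id=f"{task_id}_planning",
            name="Task planning",
            description=task_description
        )
        
        planner = self.agents['planner']
        await planner.execute(planning_task)
        
        plan = self.blackboard.read('current_plan', 'planner')
        
        results = []
        latest_code = None  # most recent Coder output, handed to the Reviewer
        
        for subtask_info in plan:
            subtask = Task(
                id=subtask_info['id'],
                name=subtask_info['name'],
                description=f"{subtask_info['name']} - {task_description}",
                metadata=subtask_info
            )
            
            agent_type = subtask_info['agent']
            agent = self.agents.get(agent_type)
            
            # Skip steps with no registered agent (e.g. 'tester') and
            # planner-owned steps: re-running the Planner here would
            # overwrite 'current_plan' and 'original_task' on the blackboard.
            if agent is None or agent_type == 'planner':
                continue
            
            if agent_type == 'reviewer':
                if latest_code is None:
                    continue  # nothing to review yet
                # The Reviewer reads code_<its own task id>, so publish the
                # latest code under that key first.
                self.blackboard.write(f'code_{subtask.id}', latest_code, 'coder')
            
            result = await agent.execute(subtask)
            if agent_type == 'coder':
                latest_code = result['code']
            results.append(result)
        
        final_review_task = Task(
            id=f"{task_id}_final_review",
            name="Final review",
            description=f"Review the overall implementation - {task_description}"
        )
        
        if latest_code is not None:
            self.blackboard.write(
                f'code_{final_review_task.id}', latest_code, 'coder'
            )
        
        reviewer = self.agents['reviewer']
        final_result = await reviewer.execute(final_review_task)
        
        end_time = datetime.now()
        execution_time = (end_time - start_time).total_seconds()
        
        history_entry = {
            'task_id': task_id,
            'description': task_description,
            'start_time': start_time.isoformat(),
            'end_time': end_time.isoformat(),
            'execution_time': execution_time,
            'subtask_count': len(plan),
            'final_review': {
                'passed': final_result.passed,
                'score': final_result.score,
                'issues': final_result.issues
            }
        }
        
        self.execution_history.append(history_entry)
        
        return {
            'task_id': task_id,
            'status': 'completed',
            'execution_time': execution_time,
            'review_result': final_result,
            'subtasks': results
        }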
    
    def get_system_status(self) -> Dict:
        return {
            'agents': {
                agent_id: {
                    'name': agent.name,
                    'status': 'running'
                }
                for agent_id, agent in self.agents.items()
            },
            'blackboard_stats': self.blackboard.get_stats(),
            'execution_history': len(self.execution_history),
            'recent_executions': self.execution_history[-5:]
        }


# ========== Demo ==========

async def main():
    print("=" * 60)
    print("Multi-Agent collaboration demo")
    print("=" * 60)
    
    system = MultiAgentCollaborationSystem()
    
    task_description = "Implement a user authentication system with login, registration, and logout"
    
    print(f"\nProcessing task: {task_description}\n")
    
    result = await system.process_request(task_description)
    
    print("\n" + "=" * 60)
    print("Result:")
    print(f"  Task ID: {result['task_id']}")
    print(f"  Status: {result['status']}")
    print(f"  Execution time: {result['execution_time']:.2f}s")
    print(f"  Review passed: {result['review_result'].passed}")
    print(f"  Review score: {result['review_result'].score:.2f}")
    
    if result['review_result'].issues:
        print("  Issues:")
        for issue in result['review_result'].issues:
            print(f"    - {issue}")
    
    print("=" * 60)
    
    status = system.get_system_status()
    print("\nSystem status:")
    print(f"  Agents: {len(status['agents'])}")
    print(f"  Execution history entries: {status['execution_history']}")


if __name__ == "__main__":
    asyncio.run(main())

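9 Best Practices and Design Patterns

9.1 Design Principles for Multi-Agent Systems

Principle 1: Single responsibility

Each agent should own exactly one clearly defined area of responsibility.

Principle 2: Minimal communication

Communication between agents should be kept to a minimum.

Principle 3: Design for fault tolerance

Every agent should be designed with a fault-tolerance mechanism.

Principle 4: Observability

The system should provide sufficient logging and monitoring information.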
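
The fault-tolerance and observability principles can be combined in a small wrapper around any agent call. The sketch below assumes transient failures are worth retrying and uses a per-attempt timeout with exponential backoff. `run_with_retry` and `flaky_agent_step` are hypothetical names for illustration, and the print call stands in for real logging.

```python
import asyncio
from typing import Any, Awaitable, Callable

async def run_with_retry(
    step: Callable[[], Awaitable[Any]],
    attempts: int = 3,
    timeout: float = 5.0,
    base_delay: float = 0.01,
) -> Any:
    """Run `step` with a per-attempt timeout, retrying with exponential backoff."""
    last_error: Exception = RuntimeError("no attempts made")
    for attempt in range(attempts):
        try:
            return await asyncio.wait_for(step(), timeout=timeout)
        except Exception as error:  # includes asyncio.TimeoutError
            last_error = error
            print(f"attempt {attempt + 1} failed: {error!r}")  # observability hook
            if attempt < attempts - 1:
                await asyncio.sleep(base_delay * 2 ** attempt)
    raise last_error

calls = {"count": 0}

async def flaky_agent_step() -> str:
    """Hypothetical agent call that fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

result = asyncio.run(run_with_retry(flaky_agent_step))
print(result, calls["count"])  # ok 3
```

Retrying is only safe for steps that can be repeated without side effects; a step that writes to the blackboard would need to be idempotent first.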

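9.2 Common Architecture Patterns

Star architecture: the Planner acts as the central coordinator, and every other agent communicates only with the Planner.

Mesh architecture: agents communicate with one another directly, which is more flexible but also more complex.

Hierarchical architecture: agents are organized into multiple layers; this suits large-scale systems.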

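9.3 Performance Optimization Tips

Tip 1: Task batching - merge several small tasks into one batch and execute them together

Tip 2: Result caching - cache the result of identical queries to avoid recomputation

Tip 3: Async first - prefer asynchronous communication so that agents do not wait on each other

Tip 4: Smart scheduling - adjust task assignment dynamically according to agent load and task characteristics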
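
Result caching and async-first execution combine naturally. The sketch below assumes results can be keyed by a plain string and caches the in-flight task itself, so concurrent identical requests share one computation. `ResultCache` and `slow_review` are hypothetical names for illustration.

```python
import asyncio
from typing import Awaitable, Callable, Dict

class ResultCache:
    """Hypothetical async cache: concurrent identical requests share one computation."""

    def __init__(self, compute: Callable[[str], Awaitable[str]]):
        self._compute = compute
        self._tasks: Dict[str, "asyncio.Task[str]"] = {}

    async def get(self, key: str) -> str:
        task = self._tasks.get(key)
        if task is None:
            # Store the task itself so concurrent callers await the same work.
            task = asyncio.create_task(self._compute(key))
            self._tasks[key] = task
        return await task

compute_calls = []

async def slow_review(code: str) -> str:
    """Stand-in for an expensive agent call."""
    compute_calls.append(code)
    await asyncio.sleep(0.01)
    return f"reviewed:{code}"

async def demo():
    cache = ResultCache(slow_review)
    # Async first: the three requests run concurrently instead of one by one.
    return await asyncio.gather(cache.get("a"), cache.get("b"), cache.get("a"))

results = asyncio.run(demo())
print(results)        # ['reviewed:a', 'reviewed:b', 'reviewed:a']
print(compute_calls)  # ['a', 'b']
```

One caveat of this design is that a failed computation stays cached too; a production version would likely evict entries whose task raised.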

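10 Summary and Outlook

10.1 Key Takeaways

This article examined the design and implementation of Multi-Agent systems:

  1. Role model: Specialists and Generalists each have strengths and weaknesses; a hybrid role model works best in practice
  2. Task decomposition: a Task Graph is an effective tool for decomposing tasks
  3. Communication protocols: Blackboard, Message Passing, and Shared Memory each suit different scenarios
  4. Collaboration strategies: serial, parallel, and hierarchical execution should be chosen according to the characteristics of the task
  5. Conflict resolution: voting, arbitration, and priority mechanisms give agents ways to resolve conflicts
  6. Scaling: an Agent Pool with dynamic scheduling is an effective way to handle large-scale systems

10.2 Future Directions

Trends in Multi-Agent systems include:

  1. Adaptive collaboration: agents adjust their collaboration strategy automatically based on task characteristics
  2. Intent understanding: deeper understanding of intent makes agent collaboration more natural
  3. Long-term memory: persistent memory across sessions lets agents learn and accumulate experience
  4. Security and privacy: stronger security mechanisms protect sensitive information exchanged between agents
  5. Standardized interfaces: industry standards allow agents from different vendors to interoperate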

Appendix A: Complete Code Listing

A.1 Core Data Structures and Type Definitions

from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Set
from datetime import datetime
from enum import Enum
import uuid

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"

class AgentState(Enum):
    IDLE = "idle"
    BUSY = "busy"
    BLOCKED = "blocked"
    TERMINATED = "terminated"

@dataclass
class Task:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    description: str = ""
    assigned_agent: Optional[str] = None
    dependencies: Set[str] = field(default_factory=set)
    dependents: Set[str] = field(default_factory=set)
    status: TaskStatus = TaskStatus.PENDING
    input_data: Any = None
    output_data: Any = None
    estimated_duration: float = 0.0
    actual_duration: float = 0.0
    metadata: Dict[str, Any] = field(default_factory=dict)

A.2 Blackboard Implementation

import threading

class Blackboard:
    def __init__(self, name: str = "default"):
        self.name = name
        self._storage: Dict[str, Any] = {}
        self._lock = threading.RLock()
    
    def write(self, key: str, value: Any, agent_id: str) -> None:
        with self._lock:
            self._storage[key] = value
    
    def read(self, key: str, agent_id: str) -> Optional[Any]:
        with self._lock:
            return self._storage.get(key)
    
    def delete(self, key: str) -> bool:
        with self._lock:
            if key in self._storage:
                del self._storage[key]
                return True
            return False
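
MultiAgentCollaborationSystem.get_system_status() in Section 8.2 calls blackboard.get_stats(), which this condensed listing does not include. A minimal sketch of what such a method might return follows. `StatsBlackboard` and its counter names are assumptions made for illustration, not the article's full Blackboard from Section 4.2.

```python
import threading
from typing import Any, Dict, Optional

class StatsBlackboard:
    """Hypothetical blackboard that counts reads and writes per agent."""

    def __init__(self, name: str = "default"):
        self.name = name
        self._storage: Dict[str, Any] = {}
        self._writes: Dict[str, int] = {}
        self._reads: Dict[str, int] = {}
        self._lock = threading.RLock()

    def write(self, key: str, value: Any, agent_id: str) -> None:
        with self._lock:
            self._storage[key] = value
            self._writes[agent_id] = self._writes.get(agent_id, 0) + 1

    def read(self, key: str, agent_id: str) -> Optional[Any]:
        with self._lock:
            self._reads[agent_id] = self._reads.get(agent_id, 0) + 1
            return self._storage.get(key)

    def get_stats(self) -> Dict[str, Any]:
        with self._lock:
            # Copy the counters so callers cannot mutate internal state.
            return {
                "name": self.name,
                "entries": len(self._storage),
                "writes_by_agent": dict(self._writes),
                "reads_by_agent": dict(self._reads),
            }

board = StatsBlackboard("main")
board.write("current_plan", ["implementation", "review"], "planner")
board.write("code_1", "print('hi')", "coder")
board.read("code_1", "reviewer")
print(board.get_stats())
```

Per-agent counters also give a cheap signal for the minimal-communication principle: an agent with an unusually high read count may be polling the blackboard.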

A.3 MessageBus Implementation

import asyncio
import threading

class MessageBus:
    def __init__(self):
        self._queues: Dict[str, asyncio.Queue] = {}
        self._lock = threading.RLock()
    
    def register_agent(self, agent_id: str):
        with self._lock:
            self._queues[agent_id] = asyncio.Queue()
    
    async def send(self, message: 'Message'):
        if message.is_broadcast():
            # Iterate over a snapshot so a concurrent register_agent()
            # cannot change the dict mid-iteration.
            for agent_id, queue in list(self._queues.items()):
                if agent_id != message.sender:
                    await queue.put(message)
        else:
            queue = self._queues.get(message.receiver)
            if queue is None:
                raise KeyError(f"unregistered receiver: {message.receiver}")
            await queue.put(message)
    
    async def receive(self, agent_id: str, timeout: Optional[float] = None) -> Optional['Message']:
        # Returns None when no message arrives within `timeout` seconds.
        try:
            return await asyncio.wait_for(
                self._queues[agent_id].get(), 
                timeout=timeout
            )
        except asyncio.TimeoutError:
            return None
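
The `Message` type is referenced by MessageBus but not defined in this appendix. A minimal sketch of a compatible shape follows, together with a direct send and a broadcast over per-agent queues. The `sender` and `receiver` fields and `is_broadcast()` come from the listing above; the `content` field and the convention that a `receiver` of None means broadcast are assumptions.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass
class Message:
    sender: str
    receiver: Optional[str] = None   # assumption: None means broadcast
    content: Any = None              # assumption: free-form payload

    def is_broadcast(self) -> bool:
        return self.receiver is None

async def demo() -> Dict[str, int]:
    # Queues are created inside the running loop, one per agent.
    queues = {name: asyncio.Queue() for name in ("planner", "coder", "reviewer")}

    async def send(message: Message) -> None:
        targets = (
            [a for a in queues if a != message.sender]
            if message.is_broadcast() else [message.receiver]
        )
        for agent_id in targets:
            await queues[agent_id].put(message)

    await send(Message(sender="planner", receiver="coder", content="implement login"))
    await send(Message(sender="planner", content="plan v2 published"))

    return {name: q.qsize() for name, q in queues.items()}

print(asyncio.run(demo()))  # {'planner': 0, 'coder': 2, 'reviewer': 1}
```

The sender is excluded from its own broadcast, matching the behavior of send() in the listing.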

A.4 Agent Base Class

class BaseAgent:
    def __init__(self, agent_id: str, name: str):
        self.agent_id = agent_id
        self.name = name
        self.blackboard: Optional[Blackboard] = None
        self.message_bus: Optional[MessageBus] = None
    
    def set_blackboard(self, blackboard: Blackboard):
        self.blackboard = blackboard
    
    def set_message_bus(self, message_bus: MessageBus):
        self.message_bus = message_bus
    
    async def execute(self, task: Task) -> Any:
        raise NotImplementedError
    
    def log(self, message: str):
        timestamp = datetime.now().strftime("%H:%M:%S")
        print(f"[{timestamp}] [{self.name}] {message}")

Keywords: Multi-Agent, agent collaboration, task decomposition, Task Graph, Blackboard, Message Passing, Shared Memory, conflict resolution, Agent Pool, dynamic scheduling, Planner, Coder, Reviewer, AI IDE, collaboration strategies

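This article is part of the Tencent Cloud self-media syndication program and is shared from the author's personal site/blog.
Originally published: 2026-05-26. For infringement concerns, contact cloudcommunity@tencent.com to request removal.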