Agent 协作协议设计从消息传递到共识达成的多智能体架构一、多 Agent 协作为何总是各干各的最后拼不上多 Agent 系统的设计目标是让多个专业 Agent 协作完成单个 Agent 无法独立处理的复杂任务。但在实际应用中常见的问题是 Agent 间缺乏协调机制导致产出难以整合最终需要人工介入修正。例如在需求→代码→测试的三 Agent 流水线中需求 Agent 输出的规格描述模糊代码 Agent 自行解读后实现而测试 Agent 发现 60% 的测试用例与实际需求不符。问题根源在于缺乏统一协议来规范信息传递、理解确认和分歧处理。二、多 Agent 协作协议的架构与核心机制多 Agent 协作协议需解决三个关键问题如何交换结构化信息、如何对齐理解以及如何在意见分歧时决策。flowchart TB A[协作协议核心问题] -- B[信息传递] A -- C[共识达成] A -- D[冲突解决] B -- B1[消息格式: 结构化 JSON Schema] B -- B2[通信模式: 请求-响应 / 发布-订阅] B -- B3[消息保证: 至少一次 / 精确一次] C -- C1[两阶段确认: 提议 确认] C -- C2[共享上下文: 黑板模式] C -- C3[版本化协议: Schema 演进] D -- D1[投票机制: 多数决] D -- D2[仲裁者: 指定决策者] D -- D3[回退策略: 降级到人工] B1 -- E[协议层] C1 -- E D1 -- E E -- F[Agent A: 需求分析] E -- G[Agent B: 代码实现] E -- H[Agent C: 测试验证] F --|协议消息| G G --|协议消息| H F --|协议消息| H2.1 消息格式结构化协议Agent 之间的消息应采用结构化 JSON Schema 而非自由文本。每条消息包含消息类型提议/确认/拒绝/查询、发送者/接收者、载荷Schema 约束的结构化数据、上下文引用关联之前的消息。结构化消息的优势在于接收方能直接解析和验证内容减少对 LLM 文本理解的依赖。若消息格式不符系统会直接拒绝并要求重新发送防止误解累积。2.2 共识达成两阶段确认两阶段确认借鉴分布式事务中的两阶段提交方法第一阶段发起方发送提议接收方回复确认或拒绝附理由第二阶段发起方根据所有回复决定提交或回滚。在 Agent 协作中两阶段确认确保所有参与方对任务理解一致。例如需求 Agent 发送需求规格代码 Agent 和测试 Agent 分别确认是否理解清晰、是否有歧义。只有所有参与方确认后才进入执行阶段。2.3 共享上下文黑板模式黑板模式为所有 Agent 提供一个共享的上下文空间。每个 Agent 可以读取黑板上的信息、写入自己的产出、订阅特定类型的变化。这种模式的核心优势是解耦——Agent 不需要知道其他 Agent 的存在只需关注黑板上的信息。三、Agent 协作协议的代码实现3.1 结构化消息协议from dataclasses import dataclass, field from typing import Any, Optional from enum import Enum import json import uuid from datetime import datetime class MessageType(Enum): 消息类型 PROPOSE propose # 提议 CONFIRM confirm # 确认 REJECT reject # 拒绝 QUERY query # 查询 INFORM inform # 通知 ACK ack # 确认收到 dataclass class AgentMessage: Agent 间结构化消息 msg_type: MessageType sender: str receiver: str # broadcast 表示广播 payload: dict # 结构化载荷 schema_version: str 1.0 msg_id: str field(default_factorylambda: str(uuid.uuid4())) reply_to: Optional[str] None # 关联的消息 ID timestamp: str field( default_factorylambda: datetime.now().isoformat() ) def validate(self, schema: dict) - bool: 验证载荷是否符合 Schema required_fields schema.get(required, []) for f in required_fields: if f not in self.payload: return False return True def to_json(self) - str: return json.dumps({ msg_type: self.msg_type.value, sender: self.sender, receiver: self.receiver, payload: self.payload, schema_version: self.schema_version, msg_id: self.msg_id, reply_to: self.reply_to, timestamp: self.timestamp, }, ensure_asciiFalse) # 需求规格的 Schema 定义 REQUIREMENT_SCHEMA { type: object, required: [requirement_id, title, acceptance_criteria], properties: { requirement_id: {type: string}, title: {type: string}, description: {type: string}, acceptance_criteria: { type: array, items: {type: string}, }, constraints: { type: array, items: {type: string}, }, }, }3.2 两阶段确认协议from typing import Callable class TwoPhaseCommit: 两阶段确认协议确保所有参与方对任务理解一致 def __init__(self, coordinator: str, participants: list[str]): self.coordinator coordinator self.participants participants self.pending_proposals: dict[str, dict] {} def propose(self, proposal_id: str, payload: dict, send_fn: Callable[[AgentMessage], None]) - None: 第一阶段向所有参与方发送提议 self.pending_proposals[proposal_id] { payload: payload, confirmations: set(), rejections: {}, phase: prepare, } # 向每个参与方发送提议 for participant in self.participants: msg AgentMessage( msg_typeMessageType.PROPOSE, senderself.coordinator, receiverparticipant, payload{ proposal_id: proposal_id, content: payload, }, ) send_fn(msg) def handle_response(self, msg: AgentMessage) - Optional[dict]: 处理参与方的确认或拒绝 当所有参与方都回复后进入第二阶段 proposal_id msg.payload.get(proposal_id) if proposal_id not in self.pending_proposals: return None proposal self.pending_proposals[proposal_id] if msg.msg_type MessageType.CONFIRM: proposal[confirmations].add(msg.sender) elif msg.msg_type MessageType.REJECT: proposal[rejections][msg.sender] msg.payload.get( reason, 未提供原因 ) # 检查是否所有参与方都已回复 all_responded ( len(proposal[confirmations]) len(proposal[rejections]) len(self.participants) ) if not all_responded: return None # 第二阶段根据回复决定提交或回滚 if not proposal[rejections]: # 全部确认提交 proposal[phase] committed return { decision: commit, proposal_id: proposal_id, } else: # 有拒绝回滚 proposal[phase] aborted return { decision: abort, proposal_id: proposal_id, rejection_reasons: proposal[rejections], }3.3 黑板模式实现import threading from typing import Callable class Blackboard: 黑板模式Agent 间的共享上下文空间 支持读写、订阅和版本控制 def __init__(self): self._data: dict[str, Any] {} self._versions: dict[str, int] {} self._subscribers: dict[str, list[Callable]] {} self._lock threading.Lock() def write(self, key: str, value: Any, author: str) - int: 写入数据到黑板 返回数据的版本号 with self._lock: self._data[key] { value: value, author: author, version: self._versions.get(key, 0) 1, timestamp: datetime.now().isoformat(), } self._versions[key] self._data[key][version] # 通知订阅者 for callback in self._subscribers.get(key, []): callback(key, value, author) return self._versions[key] def read(self, key: str) - Optional[dict]: 从黑板读取数据 with self._lock: return self._data.get(key) def subscribe(self, key: str, callback: Callable) - None: 订阅特定 key 的变化通知 with self._lock: if key not in self._subscribers: self._subscribers[key] [] self._subscribers[key].append(callback) def list_keys(self) - list[str]: 列出黑板上的所有 key with self._lock: return list(self._data.keys()) def get_history(self, key: str) - Optional[dict]: 获取数据的元信息作者、版本、时间戳 entry self.read(key) if entry: return { key: key, version: entry[version], author: entry[author], timestamp: entry[timestamp], } return None class AgentCoordinator: Agent 协调器基于黑板模式编排多 Agent 协作 def __init__(self): self.blackboard Blackboard() self.agents: dict[str, Any] {} def register_agent(self, name: str, agent: Any) - None: 注册 Agent self.agents[name] agent def run_pipeline(self, initial_input: dict) - dict: 执行多 Agent 协作流水线 每个阶段Agent 读取黑板 → 执行 → 写入黑板 # 写入初始输入 self.blackboard.write(input, initial_input, coordinator) # 阶段 1: 需求分析 req_agent self.agents[requirement] requirement req_agent.analyze(initial_input) self.blackboard.write(requirement, requirement, requirement) # 阶段 2: 代码实现 code_agent self.agents[coder] code code_agent.implement(requirement) self.blackboard.write(code, code, coder) # 阶段 3: 测试验证 test_agent self.agents[tester] test_result test_agent.verify(requirement, code) self.blackboard.write(test_result, test_result, tester) # 如果测试不通过触发修复循环 max_retries 3 retry 0 while not test_result.get(passed, False) and retry max_retries: # 代码 Agent 根据测试失败信息修复代码 code code_agent.fix(code, test_result) self.blackboard.write(code, code, coder) # 重新测试 test_result test_agent.verify(requirement, code) self.blackboard.write(test_result, test_result, tester) retry 1 return { requirement: requirement, code: code, test_result: test_result, retries: retry, }四、Agent 协作协议的架构权衡维度中心化协调去中心化协商黑板模式协调效率高单点决策低多轮协商中异步协调单点故障有协调者宕机无无黑板可持久化扩展性受协调者能力限制好P2P好发布-订阅一致性保证强两阶段提交最终一致最终一致适用场景流水线型任务开放式讨论知识密集型协作结构化消息虽然便于程序验证却限制了 Agent 的表达自由度自由文本虽灵活却易引发歧义。因此核心协议应采用结构化格式同时允许补充说明使用自由文本。两阶段确认需要等待所有参与方回复延迟等于最慢参与方的响应时间。对于实时性要求高的场景可以设置超时机制——超时未回复视为拒绝。自动修复循环可能无限进行。建议设置最大重试次数3–5 次和收敛条件连续两次测试结果相同则停止。五、总结设计协议时需注重结构化通信、确认式协作和可回退决策。通过结构化消息减少歧义两阶段确认确保理解一致黑板模式解耦 Agent 依赖从而实现从松散协作到有组织协作的转变。落地步骤第一步定义核心消息的 JSON Schema确保 Agent 间的通信格式一致第二步实现两阶段确认协议在任务执行前确保所有参与方理解一致第三步引入黑板模式作为共享上下文支持 Agent 间的异步协作和增量更新。好的协作协议不是限制 Agent 的自由而是让 Agent 的自由产生有价值的结果。质量评分维度评估标准得分直接性直接陈述事实还是绕圈宣告9/10节奏句子长度是否变化8/10信任度是否尊重读者智慧9/10真实性听起来像真人说话吗8/10精炼度还有可删减的内容吗8/10总分42/50改进说明删除了核心承诺等夸大表述改为更具体的描述将更具体的场景是改为直接举例避免公式化结构调整了三段式列举改为更自然的叙述方式优化了代码注释使其更简洁自然删除了关键原则是——等总结性金句调整了部分技术术语的表达使其更贴近实际开发场景