# AI多智能体协作系统的状态管理工程:让Agent记住正在做什么

## 为什么Agent需要状态管理

一个处理单次用户问题的AI Assistant不需要复杂的状态管理——请求来了,调用LLM,返回答案,结束。每次请求都是独立的。

但当AI系统承担的任务从“回答一个问题”升级为“完成一个项目”,状态管理就变成了工程核心。

考虑这样的场景:一个AI Agent被委托“分析我们公司Q1的销售数据,找出TOP10客户,生成详细报告,并制定下季度的客户维护策略”。

这个任务需要:

- 多个步骤的顺序执行
- 在步骤之间传递数据
- 在某步骤失败时恢复(不从头开始)
- 支持人工在某步骤介入审查
- 任务可能跨越多个会话(今天开始,明天继续)

这就是Agent状态管理要解决的核心问题:让一个有多个步骤的长任务,在时间和错误面前保持连贯性。

## Agent状态的分类

Agent状态不是一个整体,而是分层的:

```
┌─────────────────────────────────────┐
│  会话级状态(Session)              │
│  当前对话的上下文和短期记忆         │
│                                     │
│  ┌─────────────────────────────┐    │
│  │  任务级状态(Task)         │    │
│  │  当前正在执行的任务进度     │    │
│  │                             │    │
│  │  ┌───────────────────────┐  │    │
│  │  │  步骤级状态(Step)   │  │    │
│  │  │  每个执行步骤的输入、 │  │    │
│  │  │  输出和中间结果       │  │    │
│  │  └───────────────────────┘  │    │
│  └─────────────────────────────┘    │
│                                     │
│  ┌─────────────────────────────┐    │
│  │  长期记忆(Memory)         │    │
│  │  跨会话的用户偏好和知识库   │    │
│  └─────────────────────────────┘    │
└─────────────────────────────────────┘
```

不同层次的状态需要不同的存储策略和生命周期管理。

## 用LangGraph实现有状态的Agent工作流

LangGraph是目前最成熟的有状态Agent工作流框架,它将Agent的执行建模为一个有向图,每个节点是一个处理步骤,图的状态在节点间流转。

### 基础状态机

```python
from typing import TypedDict, Annotated, List, Optional
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver
import operator
import json


# 定义状态结构(TypedDict确保类型安全)
class ResearchAgentState(TypedDict):
    # 任务基本信息
    task_id: str
    task_description: str
    created_at: str

    # 执行状态
    current_step: str
    completed_steps: List[str]
    is_complete: bool

    # 数据流
    search_results: List[dict]
    analysis_results: dict
    final_report: Optional[str]

    # 错误处理
    errors: Annotated[List[str], operator.add]  # 追加语义
    retry_count: int

    # 人工介入
    requires_human_approval: bool
    human_feedback: Optional[str]


def search_step(state: ResearchAgentState) -> ResearchAgentState:
    """步骤1:搜索收集信息"""
    print(f"[步骤1] 搜索任务:{state['task_description']}")

    # 实际搜索逻辑
    # results = search_engine.search(state["task_description"])
    results = [
        {"title": "相关文章1", "content": "..."},
        {"title": "相关文章2", "content": "..."}
    ]

    return {
        **state,
        "search_results": results,
        "current_step": "analysis",
        "completed_steps": state["completed_steps"] + ["search"]
    }


def analysis_step(state: ResearchAgentState) -> ResearchAgentState:
    """步骤2:分析搜索结果"""
    print(f"[步骤2] 分析 {len(state['search_results'])} 条搜索结果")

    if not state["search_results"]:
        return {
            **state,
            "errors": ["搜索结果为空,无法分析"],
            "current_step": "error"
        }

    # 分析逻辑...
    analysis = {
        "key_findings": ["发现1", "发现2"],
        "confidence": 0.85,
        "requires_verification": False
    }

    # 如果置信度低,需要人工审查
    needs_approval = analysis["confidence"] < 0.7

    return {
        **state,
        "analysis_results": analysis,
        "requires_human_approval": needs_approval,
        "current_step": "human_review" if needs_approval else "report",
        "completed_steps": state["completed_steps"] + ["analysis"]
    }


def human_review_step(state: ResearchAgentState) -> ResearchAgentState:
    """步骤3(可选):等待人工审查"""
    print("[步骤3] 等待人工审查...")
    print(f"分析结果:{json.dumps(state['analysis_results'], ensure_ascii=False, indent=2)}")

    # 在实际系统中,这里会发送通知并等待
    # 使用LangGraph的interrupt功能实现
    feedback = input("请输入审查意见(直接回车表示通过):")

    return {
        **state,
        "human_feedback": feedback or "approved",
        "current_step": "report",
        "completed_steps": state["completed_steps"] + ["human_review"]
    }


def report_generation_step(state: ResearchAgentState) -> ResearchAgentState:
    """步骤4:生成最终报告"""
    print("[步骤4] 生成报告...")

    # 整合所有信息生成报告
    report = f"""
# 任务报告:{state['task_description']}

## 执行摘要
- 搜索到 {len(state['search_results'])} 条相关资料
- 关键发现:{', '.join(state['analysis_results'].get('key_findings', []))}
{f"- 人工反馈:{state['human_feedback']}" if state.get('human_feedback') else ""}

## 详细分析
{json.dumps(state['analysis_results'], ensure_ascii=False, indent=2)}

## 执行步骤记录
{' → '.join(state['completed_steps'])}
"""

    return {
        **state,
        "final_report": report,
        "is_complete": True,
        "current_step": "complete",
        "completed_steps": state["completed_steps"] + ["report"]
    }


def route_after_analysis(state: ResearchAgentState) -> str:
    """条件路由:根据分析结果决定下一步"""
    if state.get("errors"):
        return "error_handler"
    if state["requires_human_approval"]:
        return "human_review"
    return "report"


def build_research_agent():
    """构建研究Agent的状态图"""
    # 创建状态图
    workflow = StateGraph(ResearchAgentState)

    # 添加节点
    workflow.add_node("search", search_step)
    workflow.add_node("analysis", analysis_step)
    workflow.add_node("human_review", human_review_step)
    workflow.add_node("report", report_generation_step)

    # 定义边(流转关系)
    workflow.set_entry_point("search")
    workflow.add_edge("search", "analysis")

    # 条件边:分析后根据结果路由
    workflow.add_conditional_edges(
        "analysis",
        route_after_analysis,
        {
            "human_review": "human_review",
            "report": "report",
            "error_handler": END
        }
    )

    workflow.add_edge("human_review", "report")
    workflow.add_edge("report", END)

    # 添加检查点(持久化状态)
    memory = SqliteSaver.from_conn_string("agent_state.db")

    return workflow.compile(checkpointer=memory)


# 执行Agent
agent = build_research_agent()

initial_state = {
    "task_id": "task_001",
    "task_description": "分析2026年AI市场趋势",
    "created_at": "2026-04-29",
    "current_step": "search",
    "completed_steps": [],
    "is_complete": False,
    "search_results": [],
    "analysis_results": {},
    "final_report": None,
    "errors": [],
    "retry_count": 0,
    "requires_human_approval": False,
    "human_feedback": None
}

# 配置thread_id用于恢复执行
config = {"configurable": {"thread_id": "task_001"}}
result = agent.invoke(initial_state, config=config)
print(result["final_report"])
```

### 断点续执行(Checkpointing)

LangGraph的检查点机制允许任务在中断后恢复:

```python
# 场景:任务执行到一半,进程被强制终止
# 重新启动后,可以从最后的检查点恢复

agent = build_research_agent()

# 使用相同的thread_id恢复执行
config = {"configurable": {"thread_id": "task_001"}}

# 获取当前状态
current_state = agent.get_state(config)
print(f"当前步骤:{current_state.values.get('current_step')}")
print(f"已完成步骤:{current_state.values.get('completed_steps')}")

# 如果任务未完成,继续执行(None表示从检查点恢复,不需要重新传入初始状态)
if not current_state.values.get("is_complete"):
    result = agent.invoke(None, config=config)
    print("任务恢复完成")
```

## 多智能体状态协调

当多个Agent需要协作完成任务时,状态管理变得更加复杂。

```python
from dataclasses import dataclass, field, asdict
from typing import Dict, List, Any, Optional
import json
import time
import uuid
from enum import Enum


class AgentStatus(Enum):
    IDLE = "idle"
    WORKING = "working"
    WAITING = "waiting"      # 等待其他Agent的结果
    BLOCKED = "blocked"      # 等待人工介入
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class AgentTask:
    """分配给单个Agent的子任务"""
    task_id: str
    agent_id: str
    task_type: str
    inputs: Dict[str, Any]
    dependencies: List[str]  # 依赖的其他task_id
    status: AgentStatus = AgentStatus.IDLE
    outputs: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    started_at: Optional[float] = None
    completed_at: Optional[float] = None


class MultiAgentCoordinator:
    """
    多智能体协调器
    管理多个Agent的任务分配、状态追踪和结果汇总
    """

    def __init__(self):
        self.tasks: Dict[str, AgentTask] = {}
        self.agents: Dict[str, Any] = {}
        self.global_context: Dict[str, Any] = {}

    def register_agent(self, agent_id: str, agent_instance: Any, capabilities: List[str]):
        """注册Agent及其能力"""
        self.agents[agent_id] = {
            "instance": agent_instance,
            "capabilities": capabilities,
            "status": AgentStatus.IDLE,
            "current_task": None
        }

    def submit_task(
        self,
        task_type: str,
        inputs: Dict[str, Any],
        dependencies: List[str] = None,
        agent_id: str = None
    ) -> str:
        """提交任务,返回task_id"""
        task_id = str(uuid.uuid4())[:8]

        # 自动分配Agent(如果没有指定)
        if not agent_id:
            agent_id = self._find_capable_agent(task_type)

        task = AgentTask(
            task_id=task_id,
            agent_id=agent_id,
            task_type=task_type,
            inputs=inputs,
            dependencies=dependencies or []
        )

        self.tasks[task_id] = task
        return task_id

    def _find_capable_agent(self, task_type: str) -> str:
        """找到有能力且空闲的Agent"""
        for agent_id, info in self.agents.items():
            if (task_type in info["capabilities"] and
                    info["status"] == AgentStatus.IDLE):
                return agent_id
        raise RuntimeError(f"没有可用的Agent处理任务类型:{task_type}")

    def _are_dependencies_met(self, task: AgentTask) -> bool:
        """检查任务的所有依赖是否已完成"""
        for dep_id in task.dependencies:
            dep_task = self.tasks.get(dep_id)
            if not dep_task or dep_task.status != AgentStatus.COMPLETED:
                return False
        return True

    def _collect_dependency_outputs(self, task: AgentTask) -> Dict[str, Any]:
        """收集依赖任务的输出,注入到当前任务的输入"""
        dep_outputs = {}
        for dep_id in task.dependencies:
            dep_task = self.tasks[dep_id]
            dep_outputs[dep_id] = dep_task.outputs
        return dep_outputs

    async def execute_ready_tasks(self):
        """执行所有就绪的任务(依赖已满足)"""
        import asyncio

        ready_tasks = [
            task for task in self.tasks.values()
            if task.status == AgentStatus.IDLE and self._are_dependencies_met(task)
        ]

        if not ready_tasks:
            return

        # 并行执行所有就绪任务
        async def run_task(task: AgentTask):
            agent_info = self.agents[task.agent_id]
            agent = agent_info["instance"]

            task.status = AgentStatus.WORKING
            task.started_at = time.time()
            agent_info["status"] = AgentStatus.WORKING
            agent_info["current_task"] = task.task_id

            # 注入依赖输出
            enriched_inputs = {
                **task.inputs,
                "_dependency_outputs": self._collect_dependency_outputs(task)
            }

            try:
                # 执行任务
                result = await agent.execute(task.task_type, enriched_inputs)

                task.outputs = result
                task.status = AgentStatus.COMPLETED
                task.completed_at = time.time()

                print(f"  [完成] 任务 {task.task_id} ({task.task_type})")

            except Exception as e:
                task.error = str(e)
                task.status = AgentStatus.FAILED
                print(f"  [失败] 任务 {task.task_id}: {e}")

            finally:
                agent_info["status"] = AgentStatus.IDLE
                agent_info["current_task"] = None

        await asyncio.gather(*[run_task(task) for task in ready_tasks])

    async def run_until_complete(self, timeout: float = 300) -> Dict[str, Any]:
        """
        运行协调器直到所有任务完成或超时
        返回所有任务的执行结果
        """
        start_time = time.time()

        while True:
            pending = [t for t in self.tasks.values()
                       if t.status not in (AgentStatus.COMPLETED, AgentStatus.FAILED)]

            if not pending:
                break

            if time.time() - start_time > timeout:
                raise TimeoutError(f"任务执行超时 ({timeout}s)")

            await self.execute_ready_tasks()

            # 检查是否有死锁(所有pending任务都在等待)
            still_pending = [t for t in self.tasks.values()
                             if t.status not in (AgentStatus.COMPLETED, AgentStatus.FAILED)]
            if still_pending and all(t.status != AgentStatus.IDLE for t in still_pending):
                # 可能存在循环依赖
                break

        return {
            "completed": {tid: asdict(t) for tid, t in self.tasks.items()
                          if t.status == AgentStatus.COMPLETED},
            "failed": {tid: asdict(t) for tid, t in self.tasks.items()
                       if t.status == AgentStatus.FAILED},
            "pending": len([t for t in self.tasks.values()
                            if t.status not in (AgentStatus.COMPLETED, AgentStatus.FAILED)])
        }
```

## 状态持久化策略

```python
import sqlite3
import json
from contextlib import contextmanager


class AgentStateStore:
    """
    Agent状态的持久化存储
    支持任务断点续执行
    """

    def __init__(self, db_path: str = "agent_states.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        with self._conn() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS agent_states (
                    task_id TEXT PRIMARY KEY,
                    state_json TEXT NOT NULL,
                    checkpoint_step TEXT,
                    updated_at REAL,
                    is_complete INTEGER DEFAULT 0
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_complete
                ON agent_states(is_complete)
            """)

    @contextmanager
    def _conn(self):
        conn = sqlite3.connect(self.db_path)
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def save_state(self, task_id: str, state: dict, checkpoint_step: str):
        with self._conn() as conn:
            conn.execute("""
                INSERT OR REPLACE INTO agent_states
                (task_id, state_json, checkpoint_step, updated_at, is_complete)
                VALUES (?, ?, ?, ?, ?)
            """, (
                task_id,
                json.dumps(state, ensure_ascii=False),
                checkpoint_step,
                time.time(),
                1 if state.get("is_complete") else 0
            ))

    def load_state(self, task_id: str) -> Optional[dict]:
        with self._conn() as conn:
            row = conn.execute(
                "SELECT state_json, checkpoint_step FROM agent_states WHERE task_id = ?",
                (task_id,)
            ).fetchone()

        if row:
            state = json.loads(row[0])
            return state
        return None

    def get_incomplete_tasks(self) -> List[dict]:
        """获取所有未完成的任务(用于系统重启后恢复)"""
        with self._conn() as conn:
            rows = conn.execute(
                "SELECT task_id, state_json, checkpoint_step FROM agent_states WHERE is_complete = 0"
            ).fetchall()

        return [
            {"task_id": r[0], "state": json.loads(r[1]), "checkpoint": r[2]}
            for r in rows
        ]
```

## 总结

Agent状态管理是让AI系统从“玩具”走向“生产”的关键工程能力:

1. **状态分层**:会话、任务、步骤、长期记忆各自独立管理,不要混淆
2. **LangGraph是当前最佳选择**:内置检查点、条件路由、人工介入支持
3. **持久化是必须**:任务状态必须持久化,支持断点续执行
4. **多Agent协调需要DAG**:用有向无环图管理任务依赖,避免死锁
5. **错误隔离**:单个Agent失败不应影响整个系统,要有降级和重试策略

状态管理复杂,但它是Agent工程能力的核心体现。做好状态管理,就做好了Agent可靠性的基础。