第5集:LangGraph 多 Agent 协作!面试官最爱的“状态机编排”手写实战
第5集LangGraph 多 Agent 协作面试官最爱的“状态机编排”手写实战本集解锁内容手写 LangGraph 状态机、实现“巡检→诊断→修复→验证”SOP、条件分支与人工审批Human-in-the-Loop、面试官必问的“为什么用状态机而不是 if-else”。学完本集你能在面试中画出完整的状态流转图并解释状态的持久化与断点续跑。 用户痛点引入单 Agent 只会乱调工具流程一复杂就翻车兄弟们上一集我们给 Agent 装上了自愈手术刀它能自动重启服务、清理磁盘。但如果你直接把这个 Agent 丢到生产环境大概率会遇到这样的问题Agent 一上来就重启服务根本没先做巡检确认。重启后忘了验证服务其实没起来它还报告“修复成功”。多个操作顺序混乱该先清理磁盘的却先杀了进程。遇到需要人工审批的高风险操作比如数据库重启Agent 直接卡住或跳过。执行到一半服务挂了重启后工作流完全丢失不知道修到哪了。面试官一听就知道你的系统没有流程控制。他会追问“你是怎么编排多个 Agent 或工具的怎么保证执行顺序怎么处理异常分支怎么支持人工审批”如果你回答“我写了很多 if-else”那基本就是初级水平了。真正的企业级 AIOps必须用状态机来编排复杂工作流。而 LangGraph 正是为此而生——它把 Agent 从“黑箱”变成了“可解释的流程图”。你可以定义每一步做什么、下一步怎么走、什么情况下暂停等人工审批甚至中间断电重启也能从断点继续。2026 年的 AIOps 面试多 Agent 协作与工作流编排是高级工程师的必考点。今天我们就用 LangGraph 手写一个完整的巡检→诊断→修复→验证状态机并且加入人工审批节点。学完本集面试时你就能轻松画出状态流转图并解释清楚状态持久化、条件路由、断点续跑等核心问题。 环境准备确认 LangGraph 已就绪cd~/aiops-interviewsourcevenv/bin/activate# 确认 LangGraph 已安装第1集已装python-cimport langgraph; print(fLangGraph {langgraph.__version__})# 应输出 0.2.0本集需要 SQLite 持久化支持pipinstalllanggraph-checkpoint-sqlite 分步实操手写 LangGraph 状态机工作流第一步设计工作流面试时直接画这个图我们以最常见的“CPU 飙高处理 SOP”为例设计如下工作流[开始] → 远程巡检CPU ──正常──→ [结束健康] │ 异常 ↓ 诊断分析原因 │ ┌───────┼───────┐ ↓ ↓ ↓ 需要重启 需要清理 无法自动修复 │ │ │ ↓ ↓ ↓ 人工审批 自动清理 人工告警 │ │ 批准 拒绝 │ │ ↓ ↓ 重启 跳过 │ ↓ 验证重启结果 │ ├──成功→ [结束已修复] └──失败→ [结束升级告警]面试时说出这个流程面试官就知道你理解了复杂运维场景的编排。第二步定义状态State—— 工作流的“记忆”新建src/workflow/self_healing_graph.py#!/usr/bin/env python3 LangGraph 状态机工作流 - CPU 飙高处理 SOP 面试重点 1. State 是工作流的核心定义了所有需要跨步骤传递的数据 2. 使用 TypedDict 定义状态保证类型安全 3. 状态通过 Checkpointer 持久化支持断点续跑 fromtypingimportTypedDict,Literal,Optionalfromdatetimeimportdatetimefromlanggraph.graphimportStateGraph,ENDfromlanggraph.checkpoint.sqliteimportSqliteSaverfromlanggraph.typesimportinterrupt,Command# 状态定义面试必讲 classSOPState(TypedDict):工作流共享状态# 事件基础信息incident_id:strtarget_host:strtarget_service:str# 巡检数据cpu_usage:float# CPU 使用率cpu_threshold:float# 阈值# 诊断结果diagnosis:str# 诊断结论suggested_action:str# 建议动作restart / clean / escalate# 审批状态approval_required:boolapproval_granted:Optional[bool]# 执行结果repair_result:strfinal_status:str# healthy / repaired / escalated / failed# 审计actions_log:list# 记录每一步操作# 节点函数 definspect_node(state:SOPState)-SOPState: 巡检节点获取远程 CPU 使用率 面试时可说这里调用实际巡检工具本演示用静态值 print(f[NODE:巡检] 检查{state[target_host]}的 CPU...)# 模拟获取 CPU 数据实际应调用 remote_check_cpu# 这里为了演示从 state 读取传入的值cpustate[cpu_usage]thresholdstate[cpu_threshold]state[actions_log].append(f{datetime.now()}: 巡检 CPU{cpu}%, 阈值{threshold}%)ifcputhreshold:state[final_status]healthystate[diagnosis]CPU 正常else:state[diagnosis]fCPU{cpu}% 超过阈值{threshold}%returnstatedefdiagnose_node(state:SOPState)-SOPState: 诊断节点分析原因并给出建议动作 面试时可说实际场景会调用诊断 Agent 结合历史数据分析 print(f[NODE:诊断] 分析 CPU 异常原因...)cpustate[cpu_usage]# 简化逻辑根据 CPU 值推断动作ifcpu95:actionrestart# 极高可能需要重启elifcpu85:actionclean# 偏高可能是日志或临时文件导致else:actionescalate# 无法自动判断state[suggested_action]action state[actions_log].append(f{datetime.now()}: 诊断建议{action})# 高风险操作标记需要审批ifactionrestart:state[approval_required]Truereturnstatedefhuman_approval_node(state:SOPState)-SOPState: 人工审批节点Human-in-the-Loop 面试重点通过 interrupt 暂停工作流等待外部审批 print(f[NODE:审批] 需要人工审批才能执行{state[suggested_action]}...)# 暂停并发送审批请求answerinterrupt(f⚠️ 需要审批对{state[target_host]}/{state[target_service]}执行{state[suggested_action]}f\n当前状态{state[diagnosis]}f\n输入 yes 批准 / no 拒绝)state[approval_granted](answer.strip().lower()yes)state[actions_log].append(f{datetime.now()}: 审批结果{批准ifstate[approval_granted]else拒绝})ifnotstate[approval_granted]:state[final_status]escalatedstate[repair_result]人工审批拒绝returnstatedefrepair_node(state:SOPState)-SOPState: 修复节点根据建议动作执行修复 实际调用自愈工具restart_service / clean_disk_space 等 actionstate[suggested_action]hoststate[target_host]svcstate[target_service]print(f[NODE:修复] 执行动作:{action}对{host}/{svc})# 模拟执行修复实际调用 remotetoolsifactionrestart:# 模拟重启成功state[repair_result]f已重启{svc}elifactionclean:state[repair_result]f已清理{host}日志else:state[repair_result]未知动作state[actions_log].append(f{datetime.now()}: 执行{action})returnstatedefverify_node(state:SOPState)-SOPState: 验证节点修复后再次检查 print(f[NODE:验证] 检查修复效果...)# 模拟验证修复后 CPU 降回正常state[cpu_usage]45.0# 模拟恢复ifstate[cpu_usage]state[cpu_threshold]:state[final_status]repairedstate[actions_log].append(f{datetime.now()}: 验证通过CPU 已恢复)else:state[final_status]escalatedstate[actions_log].append(f{datetime.now()}: 验证失败需人工介入)returnstatedefescalate_node(state:SOPState)-SOPState:升级节点无法自动修复通知人工print(f[NODE:升级] 无法自动修复升级告警...)state[final_status]escalatedstate[actions_log].append(f{datetime.now()}: 升级为人工处理)returnstate# 条件路由函数面试必讲 defroute_after_inspect(state:SOPState)-Literal[diagnose,__end__]:巡检后路由正常结束异常去诊断ifstate[final_status]healthy:return__end__returndiagnosedefroute_after_diagnose(state:SOPState)-Literal[approve,repair,escalate]:诊断后路由根据建议动作选择路径actionstate[suggested_action]ifactionrestart:returnapprove# 需要审批elifactionclean:returnrepair# 直接修复else:returnescalate# 无法修复defroute_after_approval(state:SOPState)-Literal[repair,escalate]:审批后路由批准去修复拒绝去升级ifstate[approval_granted]:returnrepairreturnescalatedefroute_after_repair(state:SOPState)-Literal[verify,escalate]:修复后路由去验证重启需要验证清理可直接结束ifstate[suggested_action]restart:returnverifyelse:# 清理操作直接标记成功state[final_status]repairedreturnescalate# 实际走 escalte 但 final_status 已修改简化# 构建图面试时可展示这段代码结构 defbuild_workflow():构建 LangGraph 工作流workflowStateGraph(SOPState)# 添加节点workflow.add_node(inspect,inspect_node)workflow.add_node(diagnose,diagnose_node)workflow.add_node(approve,human_approval_node)workflow.add_node(repair,repair_node)workflow.add_node(verify,verify_node)workflow.add_node(escalate,escalate_node)# 设置入口workflow.set_entry_point(inspect)# 添加条件边核心工作流的路由逻辑workflow.add_conditional_edges(inspect,route_after_inspect)workflow.add_conditional_edges(diagnose,route_after_diagnose)workflow.add_conditional_edges(approve,route_after_approval)workflow.add_conditional_edges(repair,route_after_repair)# 终止边workflow.add_edge(verify,END)workflow.add_edge(escalate,END)returnworkflow第三步实现工作流执行器面试演示用在src/workflow/self_healing_graph.py中追加importosimportsqlite3classWorkflowRunner:工作流执行器配合人工审批def__init__(self,db_path:str./data/workflow_checkpoints.db):os.makedirs(os.path.dirname(db_path),exist_okTrue)self.connsqlite3.connect(db_path,check_same_threadFalse)self.checkpointerSqliteSaver(self.conn)self.graphbuild_workflow().compile(checkpointerself.checkpointer)defstart(self,incident_id:str,host:str,service:str,cpu_usage:float,threshold:float80.0)-dict:启动工作流config{configurable:{thread_id:incident_id}}initial_state:SOPState{incident_id:incident_id,target_host:host,target_service:service,cpu_usage:cpu_usage,cpu_threshold:threshold,diagnosis:,suggested_action:,approval_required:False,approval_granted:None,repair_result:,final_status:pending,actions_log:[],}try:resultself.graph.invoke(initial_state,config)returnself._format_result(result)exceptExceptionase:return{error:str(e),incident_id:incident_id}defresume(self,incident_id:str,approval:str)-dict:恢复中断的工作流传入审批结果config{configurable:{thread_id:incident_id}}try:resultself.graph.invoke(Command(resumeapproval),config)returnself._format_result(result)exceptExceptionase:return{error:str(e),incident_id:incident_id}defget_state(self,incident_id:str)-dict:查询工作流当前状态config{configurable:{thread_id:incident_id}}stateself.graph.get_state(config)ifstateandstate.values:returnstate.valuesreturn{error:状态不存在}def_format_result(self,result:dict)-dict:return{incident_id:result.get(incident_id),final_status:result.get(final_status),diagnosis:result.get(diagnosis),actions_log:result.get(actions_log,[]),}# 全局实例_runnerNonedefget_runner()-WorkflowRunner:global_runnerif_runnerisNone:_runnerWorkflowRunner()return_runner✅ 效果验证演示工作流执行第一步启动一个需要审批的工作流模拟 CPU 96%新建scripts/demo_workflow.py#!/usr/bin/env python3工作流面试演示importsys sys.path.insert(0,.)fromsrc.workflow.self_healing_graphimportget_runner runnerget_runner()incident_idinc-001print(*60)print( 启动 LangGraph 工作流)print(f事件ID:{incident_id}| CPU: 96%)print(*60)# 启动会中断在审批节点resultrunner.start(incident_id,server1,order-service,cpu_usage96.0)print(\n 工作流暂停等待审批)print(f状态:{result.get(final_status)})print(f动作:{result.get(actions_log)})print(f建议:{result.get(diagnosis)})# 模拟人工批准并恢复input(\n按回车模拟审批通过...)result2runner.resume(incident_id,yes)print(\n 最终结果:)print(f状态:{result2[final_status]})print(f日志:{result2[actions_log]})运行cd~/aiops-interviewsourcevenv/bin/activate python scripts/demo_workflow.py输出类似 启动 LangGraph 工作流 事件ID: inc-001 | CPU: 96% [NODE:巡检] 检查 server1 的 CPU... [NODE:诊断] 分析 CPU 异常原因... [NODE:审批] 需要人工审批才能执行 restart... 工作流暂停等待审批 状态: pending 动作: [...巡检 CPU96%..., ...诊断建议restart] 按回车模拟审批通过... [NODE:修复] 执行动作: restart 对 server1/order-service [NODE:验证] 检查修复效果... 最终结果: 状态: repaired 日志: [..., ...审批结果批准, 执行 restart, 验证通过CPU 已恢复]面试现场你可以说“这里我们演示了一个带人工审批的故障处理 SOP。LangGraph 通过interrupt在审批节点暂停等人工输入yes后继续执行修复和验证。整个流程的状态都持久化在 SQLite 中即使程序重启也能从断点继续。” 企业级避坑总结 面试标准答案1. 面试官问“为什么用状态机而不是写一堆 if-else”✅ 标准答案“if-else 在简单场景可以但一旦流程变复杂比如加了并行、回退、挂起代码会变成意大利面条。状态机把每一步抽象成独立节点路由逻辑集中在条件函数中修改一个节点的行为不影响其他节点。而且 LangGraph 天然支持状态持久化和暂停/恢复这是 if-else 做不到的。”2. 面试官问“interrupt 暂停后如果程序重启了怎么办”✅ 标准答案“状态通过 Checkpointer 持久化到数据库。只要 thread_id 不变重启后调用get_state就能恢复现场然后用Command(resume...)继续执行。我们生产环境用 PostgresSaver 替代 SQLite多实例共享状态。”3. 面试官问“多个工作流并发执行会冲突吗”✅ 标准答案“每个工作流通过thread_id隔离互不影响。Checkpointer 保证了每个线程的独立状态。LangGraph 底层有锁机制同一个 thread_id 不会并发执行。”4. 面试官问“工作流节点执行失败如何处理”✅ 标准答案“可以在节点内部加 try-except并设置重试逻辑。LangGraph 也支持配置retry策略。如果重试仍失败就跳转到升级节点走人工处理流程。”5. 面试官问“如何动态修改正在运行的工作流”✅ 标准答案“可以通过get_state获取当前状态直接修改状态字典然后用update_state更新。这在需要人工干预流程时非常有用比如强制跳转到某个节点。” 下集预告今天我们让 Agent 学会了“按 SOP 办事”工作流引擎让运维流程变得清晰可控。但面试官还会问“你的 Agent 怎么能快速找到对应的运维知识比如新人问‘Nginx 502 怎么处理’它能自动回答吗”《第6集RAG 知识库 对话记忆让 Agent 成为运维“百科全书”》我们将接入企业内部运维文档Wiki、SOP用 ChromaDB 构建向量知识库并通过 LangChain 的记忆组件实现多轮对话。学完本集面试官问你“知识库怎么做的如何保证检索准确率”你都能对答如流。点赞、收藏、关注下一集让 Agent 拥有“终身学习”的能力面试官听了直呼内行