LangGraph Agent架构实战:构建具备动态规划与执行能力的智能体工作流
1. LangGraph Agent架构概述LangGraph是一个专注于构建有状态、多角色应用程序的库它利用大语言模型LLMs来创建智能体和多智能体工作流。这个框架的核心优势体现在以下几个方面周期性支持LangGraph允许开发者定义包含循环的流程这对大多数智能体架构至关重要。这种能力使得LangGraph能够处理需要重复步骤或迭代循环的复杂任务而不仅仅是基于有向无环图DAG的解决方案。高度可控性LangGraph提供了对应用程序流程和状态的精细控制。这种精细控制对于创建行为可预测、符合预期的智能体至关重要特别是在处理复杂或敏感的应用场景时。持久性功能LangGraph内置了持久性功能这意味着智能体能够跨交互保持上下文和记忆。这对于实现长期任务的一致性和连续性非常关键。持久性还支持高级的人机交互允许人类输入无缝集成到工作流程中使智能体能够通过记忆功能学习和适应。2. LangGraph核心特性解析2.1 循环与分支循环和分支是LangGraph最强大的特性之一。它允许在应用程序中实现循环和条件语句适用于需要重复执行任务或根据不同条件执行不同操作的场景如自动化决策流程、复杂业务逻辑处理等。在实际应用中这意味着你可以构建这样的智能体自动生成代码并自我修正直到通过测试执行Web导航时根据页面内容动态调整操作处理多步骤任务时根据中间结果改变执行路径2.2 状态管理LangGraph的状态管理是其核心优势之一。状态是一个共享的数据结构存储了所有与当前执行上下文相关的信息。状态可以是任何Python类型通常使用TypedDict或PydanticBaseModel。状态管理的关键特点自动维护当一个节点执行并返回更新后的状态时框架确保新状态被传递到下一个节点生命周期状态的生命周期与图的执行周期相匹配从图的初始状态开始在每个节点执行时更新直到图执行结束通信机制状态提供了节点间的通信机制每个节点可以读取前一个节点更新的状态并在此基础上操作2.3 节点与边在LangGraph框架中节点Nodes是Python函数它们编码了智能体的逻辑。节点的第一个位置参数是State第二个可选参数是config节点对应的处理逻辑。边Edges是连接节点的对象它们定义了节点之间的转换逻辑。每条边都连接两个节点一个源节点和一个目标节点。边的主要功能是确定何时应该从源节点跳转到目标节点。LangGraph支持两种类型的边正常边直接从节点A跳转到节点B条件边调用一个函数来确定下一步应该跳转到哪个节点3. 构建Plan-and-Execute智能体3.1 设计思路Plan-and-Execute的核心思想是首先针对用户的目标提出一系列具体操作之后依次完成各个操作如果未达成目标则会重新规划。这种架构的优势在于明确的长期规划有利于在执行规划中使用最好的模型在执行各个任务过程中可以使用较弱的模型以节省成本动态调整能力根据执行结果优化后续步骤3.2 实现步骤典型的Plan-and-Execute工作流包含以下步骤用户提出需求针对需求进行规划生成一系列任务列表执行每一项任务这里使用到各个任务对应的智能体任务执行结束更新状态状态用于确认用户需求是否完成如果达成用户需求则返回结果给用户否则基于当前状态重新生成任务列表跳转到第3步3.3 关键组件实现3.3.1 状态定义我们首先定义一个PlanExecute类来描述执行计划的结构from typing import Annotated, List, Tuple from typing_extensions import TypedDict import operator class PlanExecute(TypedDict): input: str # 原始输入字符串类型 plan: List[str] # 构建计划列表 past_steps: Annotated[List[Tuple], operator.add] # 追踪之前的执行步骤数据类型为元组列表 response: str # 最终输出字符串类型3.3.2 规划节点规划节点负责根据用户输入生成初始计划from pydantic import BaseModel, Field class Plan(BaseModel): 要遵循的未来计划 steps: List[str] Field( description要遵循的不同步骤应按排序顺序排列 ) planner_prompt ChatPromptTemplate.from_messages([ (system, 针对给定目标提出一个简单的分步计划。 这个计划应包含单独的任务如果正确执行将产生正确答案。 不要添加任何多余的步骤。 最后一步的结果应该是最终答案。 确保每个步骤都包含所需的所有信息 - 不要跳过步骤。), (placeholder, {messages}), ]) planner planner_prompt | llm.with_structured_output(Plan)3.3.3 执行节点执行节点负责实际执行计划中的任务async def execute_step(state: PlanExecute): plan state[plan] plan_str \n.join(f{i1}. {step} for i, step in enumerate(plan)) task plan[0] task_formatted f对于以下计划 {plan_str} 你的任务是执行第1步{task}。 agent_response await agent_executor.ainvoke( {messages: [(user, task_formatted)]} ) return { past_steps: [(task, agent_response[messages][-1].content)], }3.3.4 重新规划节点重新规划节点根据当前状态决定是继续执行还是结束from typing import Union class Response(BaseModel): 给用户的响应 response: str class Act(BaseModel): 要执行的动作 action: Union[Response, Plan] Field( description要执行的动作。如果想响应用户使用Response。 如果需要进一步使用工具获取答案使用Plan。 ) replanner_prompt ChatPromptTemplate.from_template( 针对给定目标提出一个简单的分步计划。 这个计划应包含单独的任务如果正确执行将产生正确答案。 不要添加任何多余的步骤。 最后一步的结果应该是最终答案。 确保每个步骤都包含所需的所有信息 - 不要跳过步骤。 你的原始目标是{input} 你的原始计划是{plan} 你目前已完成以下步骤{past_steps} 相应地更新你的计划。如果不需要更多步骤并且可以返回给用户那么就用那个响应。 否则填写计划。只添加计划中仍然需要完成的步骤。不要将之前完成的步骤作为计划的一部分返回。 ) replanner replanner_prompt | llm.with_structured_output(Act)4. 构建完整工作流4.1 图结构定义将各个节点组合成完整的工作流from langgraph.graph import StateGraph, START workflow StateGraph(PlanExecute) # 添加规划节点 workflow.add_node(planner, plan_step) # 添加执行节点 workflow.add_node(agent, execute_step) # 添加重新规划节点 workflow.add_node(replan, replan_step) # 从开始到规划节点 workflow.add_edge(START, planner) # 从规划到执行节点 workflow.add_edge(planner, agent) # 从执行到重新规划节点 workflow.add_edge(agent, replan) # 条件边控制流程 workflow.add_conditional_edges( replan, should_end, [agent, END], ) app workflow.compile()4.2 执行流程完整的执行流程如下用户提出目标目标被拆分成多个步骤开始执行第一个步骤执行完成后将智能体的回答加入到past_steps中使用replan更新计划如果已完成目标则结束否则更新计划后循环到第3步执行4.3 实际应用示例让我们看一个实际的市场调研报告生成示例inputs { input: 调研2024年新能源汽车市场的三大趋势并给出简要分析 } async def main(inputs, config): async for event in app.astream(inputs, configconfig): for k, v in event.items(): if k ! __end__: print(v) config {recursion_limit: 50} asyncio.run(main(inputs, config))这个智能体会自动执行以下步骤识别新能源汽车市场的关键指标搜索最新的市场报告和数据分析并总结出三大趋势验证信息的准确性生成结构化的调研报告如果在任何步骤中发现信息不足或分析不充分智能体会自动调整计划补充新的调研任务直到生成满意的报告为止。5. 高级技巧与优化建议5.1 性能优化对于复杂的任务可以考虑以下优化策略并行执行修改图结构使不依赖的任务可以并行执行缓存机制对重复的子任务结果进行缓存模型选择对规划节点使用强大但昂贵的模型对执行节点使用经济型模型5.2 错误处理健壮的智能体需要处理各种异常情况async def safe_execute_step(state: PlanExecute): try: return await execute_step(state) except Exception as e: return { error: str(e), past_steps: [(failed_step, fStep failed with error: {str(e)})] }5.3 人机协作集成人工审核节点在关键决策点引入人工干预async def human_review_step(state: PlanExecute): important_data state.get(critical_data) send_for_review(important_data) await wait_for_review() return { human_feedback: get_review_result() }5.4 评估与监控建立评估机制来监控智能体性能def evaluate_performance(state: PlanExecute): steps_taken len(state[past_steps]) time_used time.time() - state[start_time] accuracy calculate_accuracy(state[response]) return { performance: { steps: steps_taken, time: time_used, accuracy: accuracy } }6. 实际案例市场调研智能体让我们深入构建一个市场调研智能体的具体实现6.1 状态定义class MarketResearchState(TypedDict): research_topic: str objectives: List[str] data_sources: List[str] collected_data: Dict[str, Any] analysis: str report: str validation: List[str] completed: bool6.2 规划节点research_planner_prompt ChatPromptTemplate.from_messages([ (system, 你是一个市场调研专家。针对给定的调研主题请制定一个详细的调研计划。 计划应包含 1. 明确的调研目标3-5个 2. 推荐的数据来源 3. 分析方法 4. 报告结构建议), (user, {research_topic}) ]) class ResearchPlan(BaseModel): objectives: List[str] data_sources: List[str] analysis_methods: List[str] report_structure: List[str] research_planner research_planner_prompt | llm.with_structured_output(ResearchPlan)6.3 数据收集节点async def collect_data(state: MarketResearchState): tools [ TavilySearchResults(max_results5), FinancialDataTool(), IndustryReportTool() ] agent create_react_agent(llm, tools) collected {} for source in state[data_sources]: result await agent.ainvoke({ messages: [(user, f收集关于{state[research_topic]}的{source}数据)] }) collected[source] result[messages][-1].content return {collected_data: collected}6.4 分析节点async def analyze_data(state: MarketResearchState): analysis_prompt ChatPromptTemplate.from_messages([ (system, 你是一个资深市场分析师。请根据以下数据进行分析 调研主题{research_topic} 调研目标{objectives} 数据 {collected_data} 请提供详细的分析重点关注趋势、模式和关键发现。), ]) analyzer analysis_prompt | llm analysis await analyzer.ainvoke(state) return {analysis: analysis.content}6.5 报告生成节点async def generate_report(state: MarketResearchState): report_prompt ChatPromptTemplate.from_messages([ (system, 根据以下分析生成专业市场调研报告 调研主题{research_topic} 分析结果 {analysis} 请按照以下结构组织报告 1. 执行摘要 2. 主要发现 3. 市场趋势 4. 建议 5. 结论), ]) reporter report_prompt | llm report await reporter.ainvoke(state) return {report: report.content, completed: True}6.6 构建完整工作流research_workflow StateGraph(MarketResearchState) research_workflow.add_node(plan, research_plan_step) research_workflow.add_node(collect, collect_data) research_workflow.add_node(analyze, analyze_data) research_workflow.add_node(report, generate_report) research_workflow.add_edge(START, plan) research_workflow.add_edge(plan, collect) research_workflow.add_edge(collect, analyze) research_workflow.add_edge(analyze, report) research_workflow.add_edge(report, END) research_app research_workflow.compile()这个市场调研智能体可以处理复杂的调研任务自动完成从规划到报告生成的全过程大大提高了市场调研的效率和准确性。