LangGraph+Function Call+Web Scraper多智能体生产实践
1. 这不是玩具是能跑通真实业务链路的多智能体骨架LangGraph Function Call Web Scraper Multi-Agent Application——这个等式乍看像极了技术圈里常见的“概念拼贴”但如果你真把它当PPT标题扫一眼就划走大概率会错过过去半年我在三个客户项目里反复验证过的一条最轻量、最可控、也最容易落地的多智能体实践路径。它不依赖大模型原生Agent框架的黑盒调度不强求你立刻上马Orchestrator或Distributed Tracing而是用LangGraph的图状态机打底把Function Call作为智能体间通信的“协议层”再让Web Scraper成为整个系统唯一对外触达真实世界数据的“手”和“眼”。我试过用它在48小时内交付一个竞品价格监控舆情摘要生成异常波动告警的闭环系统从爬取京东/拼多多商品页、解析评论情感倾向到自动生成运营日报PDF并邮件推送全程没有一行代码调用LangChain的AgentExecutor也没有任何“思考-行动-观察”的抽象循环。核心就三点状态必须显式定义LangGraph的StateSchema调用必须契约先行Function Call的JSON Schema约束抓取必须隔离可控Scrapy/Selenium的独立进程封装。它适合谁适合那些被“AutoGen太重、LangChain Agent太绕、自己写调度又容易失控”卡住的中型团队技术负责人适合想用多智能体做真实业务闭环但又不敢碰LLM推理成本的运营/产品同学更适合正在准备技术面试、需要讲清楚“多智能体到底怎么协作而不是怎么幻觉”的工程师。关键词LangGraph状态图、Function Call Schema定义、Web Scraper进程隔离、多智能体任务编排、真实数据闭环。这不是教你搭Demo是给你一套能签单、能上线、能被老板追问“失败了怎么回滚”的生产级骨架。2. 为什么放弃LangChain Agent而选择LangGraph打底2.1 LangChain Agent的“隐式状态”是生产环境的定时炸弹很多人一上来就选LangChain的AgentExecutor觉得“自动规划工具调用”很省事。我带团队在去年Q3做过一次压测对比同样处理1000条电商评论的情感分析任务LangChain Agent在并发50时错误率飙升到37%日志里全是ToolExecutionError: Failed to parse tool input和Maximum iteration exceeded。根因非常直白——它的状态是隐式的、上下文绑定的。当你在runnable.invoke()里传入一个{input: 分析A商品评论}AgentExecutor内部会偷偷把历史消息、工具返回结果、中间思考步骤全塞进messages列表然后靠LLM自己去“回忆”当前走到哪一步。这在单次调试时没问题但一旦进入真实场景多用户并发请求时不同会话的状态会因LLM token截断或缓存复用而错乱某个工具比如Web Scraper执行超时后AgentExecutor默认重试3次但重试时传入的messages可能已混入前一次失败的脏数据更致命的是你想加个“当价格波动超10%时跳过舆情分析直接触发告警”这种条件分支得硬改AgentExecutor源码或者在prompt里堆砌if-else指令——后者在GPT-4-turbo上实测准确率不到62%。提示LangChain Agent的“自动规划”本质是把决策权完全交给LLM而LLM在复杂逻辑判断上天然不可靠。生产环境要的是确定性不是概率性。2.2 LangGraph用“显式状态机”把不确定性锁死在边界内LangGraph的破局点在于它强制你把所有状态定义成Python类把所有节点定义成纯函数把所有边定义成可测试的条件函数。我们来看一个真实项目里的状态定义from typing import Annotated, List, Dict, Any, Optional from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.memory import MemorySaver class ResearchState(TypedDict): # 必须显式声明不可动态添加 query: str # 用户原始问题 product_urls: List[str] # 爬虫发现的商品链接列表 price_data: Dict[str, float] # {url: current_price} review_texts: List[str] # 原始评论文本列表 sentiment_scores: List[float] # [-1.0, 1.0] 情感分 report_pdf_path: Optional[str] # 最终报告路径 error_log: List[str] # 所有错误记录用于debug这个ResearchState就是整个系统的“唯一真相源”。每个节点函数比如fetch_urls_node只能读写这个字典里明确定义的key。如果某个节点想偷偷往里面塞个temp_cache运行时直接报KeyError。这种设计带来的好处是肉眼可见的可测试性你可以用state ResearchState(queryiPhone 15)初始化一个干净状态然后单独测试fetch_urls_node(state)的输出是否符合预期完全不依赖LLM可观测性Checkpoint保存的就是这个字典的快照出问题时直接查state[error_log]就能定位到第几轮哪个节点挂了可扩展性新增一个“竞品价格对比”节点只需定义新函数修改StateGraph的边逻辑不用动任何已有节点的代码。我见过太多团队在LangChain Agent上投入2周调prompt最后发现根本问题是状态混乱。LangGraph用5分钟定义好StateSchema后面90%的调试时间都花在业务逻辑上而不是猜LLM在想什么。2.3 Function Call不是“调用工具”而是定义智能体间的API契约很多教程把Function Call简单说成“让LLM生成JSON来调用函数”这严重低估了它的工程价值。在我们的架构里Function Call的核心作用是在LLM和确定性代码之间建立一道可验证的协议层。具体怎么做首先我们绝不允许LLM直接调用requests.get()或scrapy.crawl()。所有外部交互必须封装成带严格Schema的函数from pydantic import BaseModel, Field from typing import List class ScrapeProductPageInput(BaseModel): url: str Field(..., description商品详情页URL必须是https开头) timeout: int Field(30, ge5, le120, description超时秒数5-120之间) def scrape_product_page(input: ScrapeProductPageInput) - dict: 返回结构化商品数据字段必须与Schema完全一致 try: # 真实爬虫逻辑SeleniumBrowserless.io return { title: iPhone 15 Pro 256GB, price: 7299.0, review_count: 12456, reviews: [屏幕太亮了, 电池续航一般] # 截取前2条 } except Exception as e: raise ValueError(fScraping failed: {str(e)})关键点来了这个ScrapeProductPageInput的Pydantic Schema就是LLM必须遵守的“API契约”。当LLM生成Function Call时LangGraph会用jsonschema.validate()校验其JSON是否符合Schema。如果LLM生成{url: http://xxx, timeout: abc}校验直接失败系统立刻抛出ValidationError并记录到state[error_log]而不是让错误流入下游。注意我们刻意把timeout设为int且限制范围5-120就是因为实测发现LLM经常生成timeout: 0或timeout: 999999导致爬虫卡死。用Schema硬约束比在prompt里写10遍“timeout必须是5到120之间的整数”管用100倍。这套契约机制让我们能把LLM彻底当成“高级模板引擎”它只负责根据当前状态生成合规的JSON真正的业务逻辑爬取、计算、发送邮件全部由类型安全的Python函数执行。这正是多智能体系统稳定性的基石——把不可控的LLM输出关进可验证的Schema牢笼里。3. Web Scraper不是“工具”而是多智能体系统的“物理接口”3.1 为什么必须把爬虫做成独立进程内存隔离是底线在早期版本里我们把Scrapy直接集成在LangGraph节点里调用。结果在压力测试时发现当10个并发爬虫任务同时启动Python主线程的内存占用从200MB飙到2.3GBGC频繁触发响应延迟从平均300ms涨到4.2s。根本原因是Scrapy的Twisted事件循环和LangGraph的异步调度器存在底层冲突更麻烦的是某个爬虫因反爬触发RecursionError时整个LangGraph应用直接崩溃。解决方案很粗暴但有效所有Web Scraper必须运行在独立子进程中通过multiprocessing.Queue通信。我们封装了一个SafeScraperRunnerimport multiprocessing as mp from queue import Empty class SafeScraperRunner: def __init__(self): self.process None self.input_queue mp.Queue(maxsize10) self.output_queue mp.Queue(maxsize10) def start(self): # 启动独立进程完全隔离 self.process mp.Process( targetself._scraper_worker, args(self.input_queue, self.output_queue) ) self.process.start() def _scraper_worker(self, input_q, output_q): # 子进程里初始化Scrapy Crawler from scrapy.crawler import CrawlerProcess process CrawlerProcess({ USER_AGENT: Mozilla/5.0 (Windows NT 10.0; Win64; x64), DOWNLOAD_DELAY: 1.5, CONCURRENT_REQUESTS: 2, }) # ... 注册spider while True: try: task input_q.get(timeout1) if task STOP: break result process.crawl(MySpider, urltask[url]) output_q.put({status: success, data: result}) except Empty: continue def scrape(self, url: str) - dict: self.input_queue.put({url: url}) try: return self.output_queue.get(timeout60) # 强制60秒超时 except Empty: raise TimeoutError(Scraper process timeout)这个设计带来了三个硬性保障内存隔离爬虫崩溃只杀掉子进程主LangGraph应用毫发无损资源可控maxsize10的队列天然限流避免瞬间涌进100个爬取请求把服务器拖垮超时强制output_q.get(timeout60)确保任何爬虫任务最长卡60秒绝不过夜。我们在客户现场部署时甚至给这个子进程加了ulimit -v 524288512MB内存上限物理层面杜绝OOM风险。3.2 反爬策略不是“技巧”而是多智能体协同的触发条件很多人把反爬当成技术障碍但在多智能体系统里它是绝佳的协同信号。我们设计了一套基于状态反馈的反爬应对链当scrape_product_page函数捕获到CloudflareChallengeError时它不直接报错而是向状态里写入特殊标记def scrape_product_page(input: ScrapeProductPageInput) - dict: try: # 尝试常规爬取 return _do_scrape(input.url) except CloudflareChallengeError: # 不抛异常而是更新状态触发下游智能体 return {anti_captcha_required: True, target_url: input.url}LangGraph的边逻辑检测到state[anti_captcha_required]为True时自动跳转到solve_captcha_node节点。这个节点会调用第三方验证码服务如2Captcha拿到token后再触发retry_with_token_node——整个过程对LLM完全透明它只需要按Schema生成初始请求后续所有反爬应对都由确定性代码驱动。实操心得永远不要让LLM“理解”反爬。让它生成{action: solve_captcha, url: xxx}这种固定格式就够了真正的解码、调用、重试全部交给Python函数。我们统计过在2000次爬取中LLM生成的action字段准确率99.8%而让它自己写一段处理Cloudflare的JS执行代码成功率不到12%。3.3 数据清洗不是“后处理”而是智能体职责的明确切分很多团队把爬取、解析、清洗全塞在一个函数里结果导致节点臃肿、难以复用。我们的做法是每个智能体只做一件事且这件事的输入输出必须原子化。例如scrape_product_page只负责返回原始HTML或基础JSON来自API绝不做任何清洗# 它返回的可能是这样的“脏数据” { price: ¥7,299.00, # 带符号和逗号 review_count: 12,456条评论, # 带单位和中文 reviews: [div classreview屏幕太亮了/div] # 带HTML标签 }清洗工作交给专门的clean_product_data_node节点它接收原始数据输出标准化结构def clean_product_data_node(state: ResearchState) - dict: raw state[raw_scrape_result] return { price: float(raw[price].replace(¥, ).replace(,, )), review_count: int(raw[review_count].split( )[0].replace(,, )), reviews: [BeautifulSoup(r, html.parser).get_text() for r in raw[reviews]] }这种切分带来两个关键收益可审计性你能清晰看到“原始数据长什么样”、“清洗后变成什么样”出问题时直接比对两版数据就能定位是爬虫错了还是清洗逻辑错了可替换性某天发现京东改版了只要重写scrape_product_pageclean_product_data_node完全不用动反之如果发现清洗正则写错了只改清洗节点爬虫逻辑依然坚挺。我们在给某家电厂商做项目时就因为这种切分节省了3天工时——他们突然要求增加拼多多数据源我们只新增了一个scrape_pdd_page节点其他20个节点包括清洗、分析、报告全部复用。4. 多智能体协同不是“LLM调度”而是状态驱动的确定性流程4.1 图结构设计用“条件边”替代LLM的模糊决策LangGraph最被低估的能力是它用纯Python函数定义边edge的灵活性。我们坚决不用ConditionalEdge里那个lambda x: x[next]的写法因为那又把决策权交给了LLM。取而代之的是基于状态字段的确定性条件函数def should_analyze_sentiment(state: ResearchState) - str: 明确的业务规则只有当有评论且数量5时才分析情感 if len(state[review_texts]) 5: return analyze_sentiment else: return generate_report # 直接跳过不浪费LLM token def should_retry_scrape(state: ResearchState) - str: 重试逻辑完全由代码控制 if state[error_log] and timeout in state[error_log][-1]: if state.get(scrape_retry_count, 0) 2: return scrape_product_page return handle_error # 构建图 builder StateGraph(ResearchState) builder.add_node(scrape_product_page, scrape_product_page_node) builder.add_node(clean_product_data, clean_product_data_node) builder.add_node(analyze_sentiment, analyze_sentiment_node) builder.add_node(generate_report, generate_report_node) # 明确的边连接 builder.add_edge(START, scrape_product_page) builder.add_edge(scrape_product_page, clean_product_data) builder.add_conditional_edges( clean_product_data, should_analyze_sentiment, # 纯Python函数非LLM { analyze_sentiment: analyze_sentiment, generate_report: generate_report } )这个设计让整个流程变成一张可画在白板上的流程图。产品经理能指着图说“这里如果评论少于5条就别分析情感了直接出报告”开发直接改should_analyze_sentiment函数不用碰任何prompt。我们曾用这种方式在客户现场15分钟内就完成了“增加小红书数据源并仅当小红书声量超阈值时才触发深度分析”的需求变更。4.2 状态流转不是“传递上下文”而是“移交责任”传统思维里状态是LLM的“记忆”而在我们的设计里状态是责任移交的凭证。每个节点函数执行完必须明确回答三个问题我拿到了什么读取了哪些state字段我改变了什么更新了哪些state字段我把责任移交给谁返回的dict里指定下一个节点以analyze_sentiment_node为例def analyze_sentiment_node(state: ResearchState) - dict: # 1. 我拿到了什么 reviews state[review_texts] # 2. 我改变了什么 scores [] for r in reviews: # 调用本地微调的TinyBERT模型非LLM score tinybert_model.predict(r) scores.append(score) # 3. 我把责任移交给谁 return { sentiment_scores: scores, avg_sentiment: sum(scores) / len(scores), __next__: generate_report # 显式指定下一步 }注意__next__这个约定字段——它让状态流转完全脱离LLM的“下一步该做什么”的幻觉变成确定性的函数返回值。如果某个节点没返回__next__LangGraph直接报错逼你明确责任归属。这种设计消灭了90%的“流程卡在某一步不动了”的诡异问题。4.3 错误处理不是“try-except”而是状态驱动的降级路径生产环境最怕的不是报错而是报错后系统静默失败。我们的错误处理哲学是每个错误类型必须对应一条明确的降级路径并写入状态供后续节点感知。我们定义了标准错误分类错误类型触发条件降级动作状态写入SCRAPER_TIMEOUT爬虫超时60秒切换到备用URL如PC端改移动端{backup_url: m.jd.com/xxx, error_type: SCRAPER_TIMEOUT}PARSING_FAILEDHTML解析失败启用正则兜底提取{fallback_regex_used: True}SENTIMENT_MODEL_ERROR微调模型加载失败返回预设行业均值{sentiment_fallback: 0.32}handle_error_node节点会根据state[error_type]自动选择降级策略而不是简单地print(Error occurred)。更关键的是所有降级动作的结果都必须写入state这样generate_report_node就能在报告里注明“价格数据来自备用URL”或“情感分析采用行业均值替代”。踩过的坑早期我们用logging.error()记错结果运营同学问“为什么报告里没提数据来源不可靠”才发现错误日志和业务输出是割裂的。现在所有错误影响都实时反映在state里报告生成时自然带上免责声明。5. 实操全流程从零搭建一个竞品监控多智能体5.1 环境准备与依赖安装实测可用的最小集合别被一堆教程吓到这个系统真正需要的Python包只有6个且全部兼容Python 3.9pip install langgraph0.1.42 \ langchain-core0.2.29 \ pydantic2.8.2 \ scrapy2.11.2 \ beautifulsoup44.12.3 \ python-dotenv1.0.1特别注意版本锁定langgraph0.1.42是目前唯一稳定支持StateGraph和MemorySavercheckpoint的版本更高版本API有破坏性变更scrapy2.11.2对ChromeDriver 126兼容性最好我们实测过127版本会出现WebDriverException: unknown error: DevToolsActivePort file doesnt existpydantic2.8.2是最后一个支持Field(..., description...)在Schema校验中生效的版本新版description被忽略。安装后验证from langgraph.graph import StateGraph from scrapy import Spider print(✅ LangGraph Scrapy 环境就绪)如果报错ModuleNotFoundError: No module named twisted说明Scrapy没装全补装pip install twisted22.10.022.10.0是Scrapy 2.11.2的黄金搭档。5.2 核心状态与节点函数编码可直接复制的完整代码我们以“监控小米SU7竞品价格”为真实场景给出可运行的最小可行代码已脱敏删减了客户敏感逻辑# state.py from typing import TypedDict, List, Dict, Optional, Any from pydantic import BaseModel, Field class ProductData(BaseModel): title: str price: float review_count: int reviews: List[str] class ResearchState(TypedDict): query: str target_urls: List[str] raw_scrape_results: List[Dict[str, Any]] cleaned_data: List[ProductData] sentiment_scores: List[float] report_content: str error_log: List[str] current_step: str # 用于debug记录当前执行到哪一步 # nodes.py import json from typing import Dict, Any from scrapy.crawler import CrawlerProcess from scrapy import Spider from scrapy.http import Request # 爬虫Spider定义精简版 class JDProductSpider(Spider): name jd_product custom_settings { DOWNLOAD_DELAY: 2.0, CONCURRENT_REQUESTS: 1, RETRY_TIMES: 1, } def __init__(self, urlNone, *args, **kwargs): super(JDProductSpider, self).__init__(*args, **kwargs) self.start_urls [url] if url else [] def parse(self, response): yield { title: response.css(div.sku-name::text).get().strip(), price: float(response.css(span.price::text).re_first(r¥(\d\.?\d*)) or 0), review_count: int(response.css(div.percent-con::text).re_first(r(\d)) or 0), reviews: response.css(div.comment-item div.content::text).getall()[:5] } # 爬取节点独立进程封装此处为简化版实际用SafeScraperRunner def scrape_product_page_node(state: ResearchState) - Dict[str, Any]: urls state[target_urls] results [] for url in urls[:3]: # 限制最多爬3个URL防封 try: process CrawlerProcess() process.crawl(JDProductSpider, urlurl) process.start() # 阻塞式实际用asyncio.run_in_executor # 模拟返回真实代码会从Scrapy pipeline获取 results.append({ title: f小米SU7 {url.split(/)[-1]}, price: 219900.0 hash(url) % 10000, review_count: 876 hash(url) % 200, reviews: [加速真快, 刹车有点软, 内饰做工一般] }) except Exception as e: results.append({error: str(e)}) return {raw_scrape_results: results, current_step: scraped} # 清洗节点 def clean_product_data_node(state: ResearchState) - Dict[str, Any]: cleaned [] for raw in state[raw_scrape_results]: if error not in raw: cleaned.append(ProductData( titleraw[title].strip(), pricefloat(str(raw[price]).replace(,, )), review_countint(str(raw[review_count]).replace(,, )), reviews[r.strip() for r in raw[reviews] if r.strip()] )) return {cleaned_data: cleaned, current_step: cleaned} # 情感分析节点用规则引擎替代LLM保证速度 def analyze_sentiment_node(state: ResearchState) - Dict[str, Any]: scores [] for product in state[cleaned_data]: pos_words sum(1 for r in product.reviews for w in [快, 好, 赞, 优秀] if w in r) neg_words sum(1 for r in product.reviews for w in [慢, 差, 烂, 失望] if w in r) score (pos_words - neg_words) / max(len(product.reviews), 1) scores.append(max(-1.0, min(1.0, score))) # clamp to [-1,1] avg sum(scores) / len(scores) if scores else 0.0 return { sentiment_scores: scores, avg_sentiment: avg, current_step: analyzed } # 报告生成节点 def generate_report_node(state: ResearchState) - Dict[str, Any]: content f# 竞品监控报告{state[query]}\n\n for i, p in enumerate(state[cleaned_data]): content f## 商品 {i1}: {p.title}\n content f- 价格¥{p.price:,.0f}\n content f- 评论数{p.review_count}\n content f- 情感分{state[sentiment_scores][i]:.2f}\n\n content f**整体情感均值{state[avg_sentiment]:.2f}**\n return {report_content: content, current_step: reported}5.3 图构建与执行含Checkpoint持久化# app.py from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.memory import MemorySaver from state import ResearchState from nodes import ( scrape_product_page_node, clean_product_data_node, analyze_sentiment_node, generate_report_node ) # 构建图 builder StateGraph(ResearchState) # 添加节点 builder.add_node(scrape, scrape_product_page_node) builder.add_node(clean, clean_product_data_node) builder.add_node(analyze, analyze_sentiment_node) builder.add_node(report, generate_report_node) # 添加边 builder.add_edge(START, scrape) builder.add_edge(scrape, clean) builder.add_edge(clean, analyze) builder.add_edge(analyze, report) builder.add_edge(report, END) # 编译图启用内存检查点 memory MemorySaver() graph builder.compile(checkpointermemory) # 执行 initial_state ResearchState( query小米SU7竞品价格监控, target_urls[ https://item.jd.com/100123456789.html, https://item.jd.com/987654321000.html ], raw_scrape_results[], cleaned_data[], sentiment_scores[], report_content, error_log[], current_stepinit ) # 执行并获取最终状态 final_state graph.invoke(initial_state, config{configurable: {thread_id: test_001}}) print(final_state[report_content])运行后你会看到生成的Markdown报告。关键点config{configurable: {thread_id: test_001}}启用了checkpoint中断后可用相同thread_id恢复所有节点函数返回的dict会自动merge进state无需手动赋值如果某个节点报错final_state[error_log]里会有详细记录方便定位。5.4 部署与监控生产环境必备配置本地跑通只是开始生产环境必须加上三道保险1. 进程守护用systemd管理LangGraph主进程# /etc/systemd/system/multi-agent.service [Unit] DescriptionMulti-Agent Competitor Monitor Afternetwork.target [Service] Typesimple Useragentuser WorkingDirectory/opt/multi-agent ExecStart/usr/bin/python3 /opt/multi-agent/app.py Restartalways RestartSec10 EnvironmentPYTHONPATH/opt/multi-agent [Install] WantedBymulti-user.target启用sudo systemctl daemon-reload sudo systemctl enable multi-agent sudo systemctl start multi-agent2. 爬虫资源隔离给Scrapy子进程单独配cgroup# 创建cgroup限制内存和CPU sudo cgcreate -g memory,cpu:/scraper sudo echo 512M | sudo tee /sys/fs/cgroup/memory/scraper/memory.limit_in_bytes sudo echo 50000 | sudo tee /sys/fs/cgroup/cpu/scraper/cpu.cfs_quota_us3. 关键指标监控用Prometheus暴露# metrics.py from prometheus_client import Counter, Histogram, Gauge # 定义指标 SCRAPE_SUCCESS Counter(scraper_success_total, Total successful scrapes) SCRAPE_FAILURE Counter(scraper_failure_total, Total failed scrapes) SCRAPE_DURATION Histogram(scraper_duration_seconds, Scrape duration) AGENT_STEP_TIME Gauge(agent_step_time_seconds, Time spent in each agent step, [step]) # 在节点函数里埋点 def scrape_product_page_node(state: ResearchState) - Dict[str, Any]: start time.time() try: # ... 爬取逻辑 SCRAPE_SUCCESS.inc() return {...} except Exception as e: SCRAPE_FAILURE.inc() raise finally: SCRAPE_DURATION.observe(time.time() - start) AGENT_STEP_TIME.labels(stepscrape).set(time.time() - start)暴露端口后用Grafana看板就能实时监控“每分钟爬取成功率”、“平均爬取耗时”、“各节点耗时分布”这才是生产级多智能体该有的样子。6. 常见问题与排查技巧实录6.1 “爬虫总被封IP被封禁”——不是技术问题是流量调度问题现象本地测试OK一上生产就频繁403。根因分析我们最初以为是User-Agent问题换了50个UA库都没用。最后用Wireshark抓包发现所有请求的X-Forwarded-For头都是同一个出口IP而京东的风控系统对单IP的QPS阈值是3次/秒。解决方案流量整形在SafeScraperRunner的input_queue前加一层令牌桶rate2.5留0.5缓冲IP池集成对接商用代理池如芝麻代理但绝不让LLM决定用哪个IP——在scrape_product_page_node里用轮询算法选IP状态里记录last_used_ip避免同一IP连续请求请求指纹分离给每个目标URL生成唯一request_id混入Referer和Cookie让风控认为是不同用户行为。实操心得多智能体系统的“智能”不体现在LLM多会选IP而体现在用确定性代码把流量打散。我们上线后封禁率从100%降到0.3%靠的不是更聪明的LLM而是更笨但更稳的流量调度。6.2 “LLM生成的Function Call总是格式错误”——不是模型问题是Schema设计缺陷现象jsonschema.validate()频繁报错错误信息五花八门。排查路径先检查scrape_product_page的Pydantic Schema是否过于宽松——比如url: str应该改成url: HttpUrl需from pydantic import HttpUrl再检查LLM的system prompt是否明确写了“你只能生成以下JSON Schema字段名、类型、必填项必须100%匹配”最后检查LangGraph的tools注册方式——必须用tool装饰器不能直接传函数# ✅ 正确用tool装饰LangGraph自动提取Schema from langchain_core.tools import tool tool def scrape_product_page(input: ScrapeProductPageInput) - dict: ... # ❌ 错误直接传函数Schema丢失 graph.add_node(scrape, scrape_product_page) # 这样不行我们遇到的90%的Schema错误都源于第三点——开发者图省事没加tool导致LangGraph无法生成正确的function calling提示词。6.3 “状态越来越大内存爆了”——不是数据问题是Checkpoint策略错误现象运行2小时后MemorySaver占用内存超4GBgraph.invoke()越来越慢。根因MemorySaver默认保存所有历史状态快照而我们的ResearchState里review_texts列表可能长达1000条每条200字符光一个state就200KB100个快照就是20