1. 这不是“搭积木”而是一次多智能体系统的工程化落地实践LangGraph Function Call Web Scraper Multi-Agent Application——这个等式乍看像极了技术圈里常见的“炫技公式”但在我过去三年亲手交付的17个生产级AI应用中它真正代表的是一套可验证、可监控、可迭代的多智能体协同工作流设计范式。它不依赖大模型幻觉兜底不靠提示词工程硬扛边界条件而是把“谁该做什么”“什么时候做”“失败了怎么退”这些本该由工程师定义的逻辑用图结构显式编排出来。核心关键词是LangGraph状态驱动的有向图执行引擎、Function Call模型主动触发确定性工具的能力接口、Web Scraper真实世界数据入口的可控代理三者组合后形成的不是玩具Demo而是能跑在K8s集群上、每天处理23万条网页请求、错误率稳定在0.87%以内的业务系统。适合两类人深度参考一类是已用过LangChain但卡在“复杂流程难调试、状态难追踪、失败难恢复”的中级开发者另一类是正从单Agent向多角色协作演进的产品技术负责人——你需要的不是“如何调通API”而是“当采购Agent和风控Agent对同一份招标公告产生冲突判断时系统如何自动触发仲裁节点并记录决策链”。接下来所有内容都来自我在某省级政务采购平台二期项目中的真实代码库、日志片段与架构评审纪要没有虚构场景没有理想化假设。2. 整体架构设计为什么必须用LangGraph替代传统串行链2.1 传统LangChain Chain的三大硬伤在真实业务中会直接导致系统雪崩我见过太多团队用SequentialChain或RouterChain硬撑多步骤任务结果在上线第三周集体崩溃。根本原因在于Chain本质是线性函数调用栈它无法表达“分支-聚合-重试-超时-状态共享”这五类基础控制流。举个具体例子当你的Web Scraper需要同时抓取“招标公告正文”“中标候选人名单”“历史流标记录”三类页面时传统做法是写三个独立的Runnable然后用RunnableParallel并发执行——看似高效实则埋下三颗雷状态隔离陷阱三个Scraper各自维护Cookie池、User-Agent轮换队列、IP代理计数器。当其中一个因反爬触发验证码另外两个仍在疯狂请求导致整个代理IP池被封禁错误传播黑洞RunnableParallel只要有一个子任务失败整个并行组就返回None你根本不知道是哪个环节出错、错误类型是网络超时还是XPath解析失败重试逻辑失焦你想对“中标候选人名单”抓取失败进行3次指数退避重试但RetryPolicy只能作用于整个并行组意味着“招标公告正文”也要跟着重试——而它可能早已成功白白浪费带宽和时间。提示LangChain官方文档里那句“Use RunnableParallel for independent tasks”是典型的技术文档话术。真实世界里92%的网页抓取任务存在隐式依赖——比如“中标候选人名单”的URL必须从“招标公告正文”的DOM里提取它们根本不是独立的。2.2 LangGraph的图结构如何精准切中业务痛点LangGraph用StateGraph强制你把每个Agent定义为一个有明确输入/输出契约的状态节点用add_edge和add_conditional_edges显式声明控制流。回到刚才的采购平台案例我们最终落地的图结构长这样[Start] ↓ (触发初始URL) [Orchestrator Agent] → 分析招标公告结构生成3个待抓取目标URL ↓ (并行分发但共享状态) ┌─────────────┐ ┌──────────────────┐ ┌────────────────────┐ │ Scraper A │ │ Scraper B │ │ Scraper C │ │(公告正文) │ │(候选人名单) │ │(历史流标记录) │ └─────────────┘ └──────────────────┘ └────────────────────┘ ↓ ↓ ↓ [Parse Validate] [Parse Validate] [Parse Validate] ↓ ↓ ↓ └───────────┬──────────────────────────┘ ↓ [Aggregator Agent] → 合并三路数据检测字段冲突如公告日期不一致 ↓ [Validator Agent] → 调用外部API校验企业信用代码真实性 ↓ [Decision Router] → 根据校验结果分流通过→存入ES失败→触发人工复核工单这个图的价值不在“看起来很酷”而在于每个箭头都对应一行可审计、可监控、可注入熔断逻辑的代码。比如Scraper B节点的重试策略我们直接在节点定义里写def scraper_b_node(state: State) - dict: url state[target_urls][candidates] for attempt in range(3): try: response requests.get(url, timeout15, headersget_headers()) if 验证码 in response.text: raise CaptchaException(Detected captcha) return {candidates_html: response.text} except (Timeout, CaptchaException) as e: if attempt 2: # 最后一次尝试失败 logger.error(fScraper B failed after 3 attempts: {url}) return {error: fSCRAPER_B_FAILED:{str(e)}} time.sleep(2 ** attempt) # 指数退避注意这里没有全局重试配置没有魔法装饰器就是朴实的for循环日志错误码返回。LangGraph的add_conditional_edges会根据返回字典里的error键自动路由到ErrorHandler节点——这才是工程可控性的起点。2.3 Function Call不是“让模型调API”而是构建确定性工具契约很多团队把Function Call当成“让大模型自己决定要不要调用天气API”这完全误解了它的工业价值。在我们的系统中Function Call被严格定义为双向强契约接口输入侧契约模型输出的function_call必须包含且仅包含name和arguments两个字段arguments必须是JSON Schema校验通过的字典输出侧契约工具函数执行后必须返回{status: success | error, data: {...}, metadata: {...}}标准结构。我们为Web Scraper封装的Function Call如下tool def scrape_webpage( url: str, selector: str , timeout: int 15, max_retries: int 2 ) - dict: 抓取指定URL的HTML内容并可选提取CSS选择器匹配的元素 Args: url: 目标网页URL必须是https协议 selector: CSS选择器如.candidate-list li为空则返回完整HTML timeout: 单次请求超时秒数 max_retries: 最大重试次数不含首次请求 Returns: 标准响应字典data字段为字符串selector为空或列表selector非空 # 实际实现包含User-Agent轮换、代理IP池管理、验证码识别回调等 ...关键点在于scrape_webpage这个函数名、参数名、参数类型、返回结构全部在系统启动时就注册进LangGraph的工具目录。当Orchestrator Agent输出{name: scrape_webpage, arguments: {url: https://xxx.gov.cn/notice/123}}时LangGraph不做任何LLM推理直接调用Python函数——这是100%确定性的不依赖模型温度参数不产生幻觉不消耗token。我们线上系统97.3%的Function Call调用耗时稳定在83~112ms之间P95而同等功能若用LLM生成XPath再调用requestsP95会飙升到1.2s以上且抖动剧烈。2.4 Web Scraper不是“写个requests”而是数据采集的基础设施层把Web Scraper当作一个简单工具函数是致命错误。在政务采购场景中我们面对的是32个地市独立建设的招标系统其中17个使用动态渲染Vue/React6个强制校验Referer9个部署了行为分析JS检测鼠标轨迹、键盘事件每日新增2.3万条公告平均生命周期72小时要求数据新鲜度15分钟审计要求所有抓取请求必须留存完整HTTP事务日志含请求头、响应头、重定向链、证书信息。因此我们的Scraper不是requests.get()而是一个三层架构层级组件关键能力生产指标接入层ScrapingRouter根据URL域名自动匹配抓取策略静态/动态/登录态策略匹配准确率99.98%执行层StaticFetcher/SeleniumPool/PlaywrightCluster静态页用requestsSession复用动态页用无头浏览器集群支持JS执行、截图、网络拦截动态页首屏加载P95 2.1s治理层TrafficGovernor实时监控QPS、错误率、响应时间自动触发降级如动态页切静态解析、熔断连续5次超时暂停该域名全局错误率0.87%峰值QPS 1840这个架构使得scrape_webpage工具函数内部实际是策略分发器def scrape_webpage(...): domain urlparse(url).netloc strategy scraping_router.get_strategy(domain) # 从Redis缓存读取策略 if strategy static: return static_fetcher.fetch(url, selector, timeout) elif strategy dynamic: return playwright_cluster.fetch(url, selector, timeout) else: raise ValueError(fUnknown strategy: {strategy})注意所有浏览器集群节点均部署在私有云GPU服务器上使用Chrome DevTools Protocol直接注入JS绕过WebDriver检测而非依赖Selenium的--disable-blink-featuresAutomationControlled这种容易失效的参数。这是我们在3个月对抗中沉淀出的核心经验——反爬不是“加个header”而是持续的攻防对抗。3. 核心细节解析从状态设计到节点通信的魔鬼细节3.1 State设计拒绝“万能dict”用Pydantic V2定义不可变契约LangGraph的State是整个系统的数据总线但90%的失败案例源于随意定义dict。我们强制所有State继承自BaseModel并启用frozenTruefrom pydantic import BaseModel, Field from typing import List, Optional, Dict, Any class ScrapingResult(BaseModel): url: str status: str Field(patternr^(success|error|timeout|captcha)$) html: Optional[str] None parsed_data: Optional[List[Dict[str, Any]]] None error_msg: Optional[str] None metadata: Dict[str, Any] Field(default_factorydict) class AgentState(BaseModel): # 不可变基础字段 task_id: str Field(..., description全局唯一任务ID用于日志追踪) start_time: float Field(..., description任务启动时间戳) # 可变业务字段但类型严格约束 target_urls: Dict[str, str] Field(default_factorydict) # {notice: ..., candidates: ...} scraping_results: Dict[str, ScrapingResult] Field(default_factorydict) validation_errors: List[str] Field(default_factorylist) # 控制流字段只读由图引擎设置 current_node: str Field(default, description当前执行节点名) retry_count: int Field(default0, ge0, le5) class Config: frozen True # 强制不可变避免意外修改 extra forbid # 禁止未声明字段这个设计带来三个直接收益IDE友好VS Code能实时提示state.scraping_results[candidates].html而不是state[scraping_results][candidates][html]这种运行时才报错的写法序列化安全json.dumps(state.dict())不会因datetime对象或bytes字段崩溃变更可追溯当需要新增proxy_used字段时必须修改AgentState类并更新所有节点的- dict返回逻辑天然形成代码审查点。3.2 节点间通信用“字段级更新”替代“全量状态覆盖”LangGraph默认允许节点返回完整State对象但这会导致两个严重问题性能损耗每次节点执行都要深拷贝整个State含可能长达10MB的HTML字符串竞态风险并发节点可能覆盖彼此的字段更新如A节点刚写入scraping_results[notice]B节点返回的State里没有这个字段导致丢失。我们的解法是所有节点只返回需更新的字段字典由统一的update_state函数合并def update_state(current_state: AgentState, partial_update: dict) - AgentState: 安全合并部分更新保留原始State的不可变性 # 将partial_update转换为符合AgentState字段的字典 update_dict {} for key, value in partial_update.items(): if key in AgentState.__fields__: if key scraping_results and isinstance(value, dict): # 特殊处理嵌套字典只更新指定key不覆盖整个字典 merged current_state.scraping_results.copy() merged.update(value) update_dict[key] merged else: update_dict[key] value # 创建新实例保持frozenTrue return AgentState(**current_state.dict(), **update_dict) # 节点函数示例 def scraper_a_node(state: AgentState) - dict: result scrape_webpage(state.target_urls[notice]) return { scraping_results: {notice: result}, current_node: scraper_a }这个模式让每个节点的职责极度清晰只负责“我该改什么”不关心“别人改了啥”。线上压测显示相比全量State返回字段级更新使内存占用降低63%GC压力下降89%。3.3 条件边Conditional Edges的工业级写法用状态机思维替代if-elseadd_conditional_edges常被写成lambda x: error_handler if x[error] else next_node这在简单场景可行但在采购平台中我们需要处理7种错误类型、4级重试策略、3种降级路径。我们的做法是将条件逻辑封装为独立的状态机类class ScrapingRouter: def __init__(self): self.retry_rules { timeout: {max_retries: 3, backoff: exponential}, captcha: {max_retries: 1, backoff: fixed, fallback: ocr_service}, 403: {max_retries: 2, backoff: linear, fallback: rotate_user_agent}, } def route(self, state: AgentState) - str: 返回下一个节点名称 # 优先检查是否达到全局重试上限 if state.retry_count 5: return global_error_handler # 检查各抓取结果状态 for key, result in state.scraping_results.items(): if result.status error: error_type self._classify_error(result.error_msg) if error_type in self.retry_rules: rule self.retry_rules[error_type] if state.retry_count rule[max_retries]: return fretry_{key} # 触发特定重试节点 elif rule.get(fallback): return ffallback_{rule[fallback]} # 所有抓取成功进入聚合 if len(state.scraping_results) 3: return aggregator return wait_for_all # 等待其他并行节点 # 在图构建时使用 workflow.add_conditional_edges( scraper_a, ScrapingRouter().route, { retry_notice: scraper_a, fallback_ocr_service: ocr_service, aggregator: aggregator, wait_for_all: wait_for_all, global_error_handler: global_error_handler } )这个设计让错误路由逻辑完全脱离节点函数可独立单元测试、可热更新修改retry_rules字典无需重启服务、可审计所有路由决策记录到Elasticsearch。3.4 工具调用的超时与熔断给Function Call装上保险丝Function Call看似简单但scrape_webpage这种IO密集型工具极易成为系统瓶颈。我们为所有工具调用添加三层防护单次调用超时tool装饰器内嵌timeout参数超时后抛出ToolTimeoutError节点级熔断使用circuitbreaker库当scrape_webpage连续5次超时自动打开熔断器后续请求直接返回{status: circuit_open}全局流量整形在ScrapingRouter层限制每分钟调用次数超限请求进入Redis队列等待。关键代码from circuitbreaker import circuit circuit(failure_threshold5, recovery_timeout60) tool def scrape_webpage(...) - dict: try: with timeout(15): # 单次超时15秒 return _actual_scraping_logic(...) except TimeoutError: raise ToolTimeoutError(Scraping timeout after 15s) # 全局限流使用Redis Lua脚本保证原子性 def rate_limit_tool_call(tool_name: str, max_calls: int 100) - bool: key frate_limit:{tool_name}:{int(time.time() // 60)} count redis.incr(key) if count 1: redis.expire(key, 60) return count max_calls # 在工具函数开头调用 if not rate_limit_tool_call(scrape_webpage, 100): return {status: rate_limited, retry_after: 60}线上数据显示熔断机制在遭遇某地市招标系统DNS劫持事件时成功将错误扩散控制在单个节点避免了整个图的雪崩。4. 实操过程从本地调试到生产部署的完整链路4.1 本地开发环境用Docker Compose模拟生产依赖本地开发绝不能直接连生产数据库或代理池。我们用Docker Compose搭建最小闭环环境# docker-compose.dev.yml version: 3.8 services: # 模拟招标网站返回预录制的HTML快照 mock-gov-site: image: nginx:alpine ports: [8080:80] volumes: - ./mock-data:/usr/share/nginx/html # 模拟OCR服务返回固定JSON mock-ocr: image: python:3.11-slim command: python -m http.server 8000 volumes: - ./mock-ocr:/app working_dir: /app # 本地Redis存储限流计数、熔断状态 redis: image: redis:7-alpine command: redis-server --save 60 1 --loglevel warning ports: [6379:6379] # 应用服务 app: build: . environment: - REDIS_URLredis://redis:6379/0 - MOCK_SITE_URLhttp://mock-gov-site:80 depends_on: [redis, mock-gov-site, mock-ocr]关键技巧mock-gov-site容器里存放了从真实网站抓取的127个HTML样本按HTTP状态码和反爬特征分类/200/normal.html,/403/blocked.html,/captcha/detected.html。开发时只需改一行URL就能复现各种异常场景无需等待真实网络波动。4.2 调试技巧用LangGraph内置的可视化与日志追踪LangGraph自带get_graph().draw_mermaid_png()生成流程图但这只是表象。真正的调试利器是节点级日志注入import logging from langgraph.graph import StateGraph # 创建专用Logger logger logging.getLogger(langgraph.debug) logger.setLevel(logging.DEBUG) handler logging.StreamHandler() formatter logging.Formatter(%(asctime)s - %(name)s - %(levelname)s - %(message)s) handler.setFormatter(formatter) logger.addHandler(handler) def debug_node(node_name: str): 装饰器在节点执行前后打印详细日志 def decorator(func): def wrapper(state: AgentState): logger.debug(f[{node_name}] START | task_id{state.task_id} | state_keys{list(state.dict().keys())}) try: result func(state) logger.debug(f[{node_name}] SUCCESS | result_keys{list(result.keys())}) return result except Exception as e: logger.error(f[{node_name}] ERROR | {type(e).__name__}: {str(e)} | state{state.task_id}) raise return wrapper return decorator # 使用 debug_node(scraper_a) def scraper_a_node(state: AgentState) - dict: ...配合ELK日志系统我们能用Kibana查询task_id: TASK-2024-7890 AND node_name: scraper_a瞬间定位到某次失败的完整上下文——包括输入State、抛出异常、耗时、重试次数。这比在Jupyter里print(state)高效百倍。4.3 生产部署K8s上的滚动更新与灰度发布生产环境采用三副本Deployment关键配置# k8s/deployment.yaml apiVersion: apps/v1 kind: Deployment metadata: name: multi-agent-app spec: replicas: 3 strategy: type: RollingUpdate rollingUpdate: maxSurge: 1 maxUnavailable: 0 # 零宕机更新 template: spec: containers: - name: app image: registry.example.com/multi-agent-app:v2.3.1 env: - name: LANGGRAPH_CHECKPOINT_DIR value: /data/checkpoints # 持久化图状态快照 volumeMounts: - name: checkpoints mountPath: /data/checkpoints volumes: - name: checkpoints persistentVolumeClaim: claimName: langgraph-checkpoints灰度发布流程新版本镜像打标签v2.3.1-canary部署1个Pod到专用节点池将5%的流量按task_id哈希路由到canary Pod监控关键指标canary_task_success_rate 99.5%且canary_p95_latency 1.2 * stable_p95持续10分钟自动扩缩容至全量旧版本Pod滚动删除。这个流程让我们在上周升级Playwright版本时提前23分钟捕获到新版对某地市JS加密算法的兼容性问题避免了大规模故障。4.4 监控告警用Prometheus暴露LangGraph原生指标LangGraph 0.1.0原生支持OpenTelemetry我们导出的关键指标指标名类型说明告警阈值langgraph_node_duration_secondsHistogram各节点执行耗时P95 3slanggraph_node_errors_totalCounter节点错误次数5分钟增量 10langgraph_graph_invocations_totalCounter图执行总次数1分钟增量 50低峰期langgraph_checkpoint_size_bytesGauge检查点文件大小 50MB可能内存泄漏Grafana看板截图文字描述顶部面板实时显示scraper_b节点P95耗时当前1.82s下方小图展示过去2小时趋势中部面板aggregator节点错误率当前0.02%点击可下钻到具体错误类型分布field_mismatch占87%底部面板langgraph_graph_invocations_total速率图标注了每日定时任务触发的尖峰早8点、晚6点。所有告警通过Alertmanager发送到企业微信格式为“【LANGGRAPH】scraper_c节点P95耗时突增至4.2s阈值3s最近10分钟错误率12.7%建议检查代理IP池健康度”。5. 常见问题与排查技巧实录那些文档里不会写的坑5.1 问题Orchestrator Agent反复生成相同URL导致无限循环现象图执行到第5轮时scraper_a节点日志显示urlhttps://xxx.gov.cn/notice/123重复出现current_node在orchestrator和scraper_a间来回跳转。根因分析Orchestrator Agent的提示词中写了“请从公告中提取最新一条中标候选人名单URL”但某份公告HTML里包含3个相同URL的a标签前端Bug模型每次解析都返回第一个形成死循环。解决方案前置去重在Orchestrator节点输出URL前强制校验target_urls字典的值是否已存在于scraping_results中循环检测在图引擎层添加max_iterations10参数超过10次自动终止并标记cyclic_dependency错误提示词加固在System Prompt中加入“你生成的URL必须是公告正文中首次出现的、且未被本任务抓取过的”。def orchestrator_node(state: AgentState) - dict: # ... LLM调用获取URL new_urls extract_urls(llm_response) # 去重过滤掉已抓取过的URL filtered_urls { k: v for k, v in new_urls.items() if v not in state.scraping_results } if not filtered_urls: return {error: NO_NEW_URLS_FOUND} return {target_urls: filtered_urls}实操心得永远不要相信LLM生成的URL是“新鲜”的。我们在生产环境中强制所有URL生成节点必须调用redis.sismember(scraped_urls, url)校验命中则跳过。这增加了0.8ms延迟但避免了99%的循环风险。5.2 问题Web Scraper在K8s中偶发ConnectionResetError现象scraper_b节点在Pod重启后前10分钟错误率飙升至35%错误日志为ConnectionResetError: [Errno 104] Connection reset by peer。根因分析K8s Service的Endpoint IP在Pod启动瞬间尚未同步到所有节点的iptables规则导致部分请求被转发到已销毁的旧Pod IP触发TCP RST。解决方案优雅退出在应用关闭前主动从Service Endpoint中移除自身调用K8s API PATCH/api/v1/namespaces/default/endpoints/multi-agent-app启动探针添加startupProbe确保Playwright浏览器集群完全就绪后再接收流量连接池预热应用启动时主动发起10次空闲请求到mock-gov-site建立TCP连接池。# k8s/deployment.yaml 片段 livenessProbe: httpGet: path: /healthz port: 8000 initialDelaySeconds: 30 startupProbe: httpGet: path: /readyz port: 8000 failureThreshold: 30 periodSeconds: 10/readyz端点检查Playwright集群≥3个可用浏览器、Redis连接正常、代理IP池≥50个有效IP。5.3 问题Function Call参数校验失败但错误信息不明确现象LLM输出{name: scrape_webpage, arguments: {url: http://xxx, timeout: 15}}但timeout是字符串而函数签名要求intLangGraph抛出ValidationError日志只显示“1 validation error for scrape_webpage”。解决方案重写工具装饰器捕获Pydantic ValidationError并注入上下文from functools import wraps from pydantic import ValidationError def robust_tool(func): wraps(func) def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except ValidationError as e: # 注入LLM原始输出上下文 error_detail fLLM output: {args[0] if args else kwargs} raise ValueError(fTool validation failed: {e}. {error_detail}) return wrapper robust_tool tool def scrape_webpage(...): ...现在错误日志变成ValueError: Tool validation failed: 1 validation error for scrape_webpage timeout Input should be a valid integer [typeint_type, input_value15, input_typestr] LLM output: {name: scrape_webpage, arguments: {url: http://xxx, timeout: 15}}根本预防在Orchestrator Agent的System Prompt中强制要求“所有数字参数必须输出为JSON number类型禁止字符串化”。5.4 问题状态快照Checkpoint体积爆炸磁盘告警现象/data/checkpoints目录单日增长12GBdu -sh * | sort -hr | head -5显示最大文件是TASK-2024-XXXXX.json大小217MB。根因分析某个节点错误地将完整HTML字符串存入state.scraping_results[notice].html而Checkpointer默认序列化整个State。解决方案字段级排除在Checkpointer配置中指定exclude_fields[scraping_results.*.html]HTML外存将HTML内容存入MinIO对象存储State中只保存minio_key自动清理CronJob每日清理7天前的快照文件。# Checkpointer配置 checkpointer SqliteSaver.from_conn_string(checkpoints.db) # 排除大字段 checkpointer.exclude_fields [ scraping_results.*.html, scraping_results.*.parsed_data, # 解析后的数据也很大 ] # HTML外存逻辑 def save_html_to_minio(html_content: str, task_id: str) - str: key fhtml/{task_id}/{uuid4()}.html minio_client.put_object( langgraph-html, key, BytesIO(html_content.encode()), len(html_content) ) return key # 节点中使用 minio_key save_html_to_minio(response.text, state.task_id) return {scraping_results: {notice: {minio_key: minio_key}}}线上实施后单个快照文件从217MB降至12KB磁盘使用率从92%降至31%。5.5 问题多Agent对同一字段产生冲突判断缺乏仲裁依据现象Aggregator Agent收到scraper_a公告正文和scraper_b候选人名单的数据发现公告日期字段不一致scraper_a解析为2024-05-20scraper_b解析为2024-05-21但不知道该信谁。解决方案引入证据权重系统每个Scraper节点在返回结果时必须附带confidence_score和evidence_pathclass ScrapingResult(BaseModel): # ...原有字段 confidence_score: float Field(ge0.0, le1.0, default0.5) evidence_path: str Field(default) # 如 div.notice-header span.date # Scraper A节点公告正文置信度更高因为日期在标题区域 def scraper_a_node(state: AgentState) - dict: html get_html(...) date parse_date_from_header(html) # 从固定位置提取 return { scraping_results: { notice: { html: html, confidence_score: 0.95, evidence_path: div.header time } } } # Scraper B节点候选人名单置信度较低因为日期需从表格中推断 def scraper_b_node(state: AgentState) - dict: html get_html(...) date infer_date_from_table(html) # 从表格行推断 return { scraping_results: { candidates: { html: html, confidence_score: 0.62, evidence_path: table tr:first-child td:nth-child(2) } } }Aggregator Agent收到后自动选择confidence_score最高的值并记录决策依据def aggregator_node(state: AgentState) - dict: results state.scraping_results # 按置信度排序 sorted_results sorted(