构建可配置技能路由框架:从硬编码到智能调度的工程实践
1. 项目概述一个技能路由器的诞生最近在折腾一些自动化流程和智能助手发现一个挺普遍的问题不同的工具、脚本或者AI模型各自都有擅长的领域但想让它们协同工作或者根据不同的输入条件自动调用最合适的那个往往需要写一堆胶水代码。比如一个处理文本摘要一个负责翻译另一个能生成图表用户发来一段话我得先判断他想干嘛再手动去调用对应的服务效率很低。这让我开始思考能不能做一个“路由器”但不是转发网络数据包的那种而是转发“技能”或“任务”的。这就是skill-router这个项目名字的由来。它本质上是一个轻量级的、可配置的技能路由框架。你可以把它想象成一个智能调度中心或者一个万能的中控台。它的核心工作很简单接收一个输入比如一段用户指令、一个API请求、或者一个事件然后根据预设的规则自动判断并路由到最匹配的“技能”Skill去执行最后将执行结果返回。这里的“技能”可以是你写的任何一段功能代码一个Python函数、一个调用外部API的封装、一个本地运行的脚本甚至是一个提示词模板驱动的AI模型调用。我之所以动手做这个是因为厌倦了在每一个新项目里重复编写if-else或switch-case这种硬编码的路由逻辑。当技能越来越多规则越来越复杂时这种代码会变得难以维护和扩展。skill-router的目标就是把路由逻辑配置化和外部化让技能的注册、匹配和执行变成一件声明式的事情从而把开发者从繁琐的流程控制中解放出来更专注于单个技能本身的实现。2. 核心设计思路与架构拆解2.1 从“硬编码”到“可配置”的范式转变传统的技能调用模式我们姑且称之为“硬编码路由”。它的代码结构通常是这样的def handle_command(user_input): if 翻译 in user_input: return translate(user_input) elif 摘要 in user_input: return summarize(user_input) elif 画图 in user_input: return draw_image(user_input) else: return 抱歉我不理解您的指令。这种方法在技能少的时候没问题但缺点非常明显紧耦合路由逻辑和业务代码混杂在一起。难以扩展每增加一个新技能都必须修改这个中心调度函数违反了开闭原则。规则僵化匹配规则写在代码里想要调整比如从关键词匹配改为意图分类模型匹配非常麻烦。缺乏动态性技能无法在运行时动态注册或下线。skill-router的设计思路就是解耦。它定义了三个核心概念技能Skill一个可执行的最小单元对外提供统一的执行接口。路由器Router负责管理技能集并根据输入和路由策略选择技能。路由策略Routing Policy决定如何根据输入选择技能的规则这部分是可插拔的。这样技能开发者只需要关心如何实现自己的功能而系统集成者则通过配置文件或API来定义“什么情况下用什么技能”。架构上它倾向于一个微内核的插件化系统。2.2 核心组件交互流程一个典型的路由流程其内部交互可以分解为以下几个步骤技能注册在系统启动或运行时各个技能模块向路由器进行注册。注册信息至少包含技能的唯一标识符ID和技能对象本身。还可以携带元数据如技能的类型、描述、所需参数模式、能力标签等这些元数据将为高级路由策略提供依据。请求接收路由器接收外部输入。这个输入通常是一个结构化的请求对象至少包含原始输入内容也可以包含上下文信息、用户标识、会话ID等。策略评估路由器将输入和所有已注册技能的元数据交给当前启用的路由策略去评估。策略的核心工作是计算每个技能对于当前输入的“匹配度”或“优先级”。最简单的策略可以是关键词匹配复杂的可以是基于向量相似度的语义匹配或者甚至调用一个轻量级AI模型进行意图分类。技能选择根据路由策略的评估结果路由器选择一个匹配度最高的技能。这里可能需要处理平局情况或者设置最低匹配度阈值低于阈值则视为无匹配技能。技能执行路由器调用被选中的技能的execute方法并将输入或解析后的参数传递给它。结果返回与后处理技能执行完毕后将结果返回给路由器。路由器可以对结果进行统一的格式化、日志记录、错误包装等后处理操作最后返回给调用方。这个流程的关键在于第3步的路由策略是完全可替换的。你可以根据业务复杂度从简单的规则引擎升级到机器学习模型而无需改动技能和路由器的核心代码。注意在设计初期就要考虑异步支持。很多技能可能是I/O密集型的如网络请求、数据库查询路由器应该具备处理异步技能的能力避免阻塞整个系统。通常的做法是统一技能接口同时支持同步的execute和异步的execute_async方法或者全部基于异步范式如asyncio来构建。3. 关键技术点实现详解3.1 技能Skill的抽象与定义如何定义一个技能是框架易用性的基础。一个良好的技能抽象应该足够简单又能满足大多数场景。我采用了面向对象的方式定义一个基础的BaseSkill抽象类。from abc import ABC, abstractmethod from typing import Any, Dict, Optional class BaseSkill(ABC): 技能基类所有具体技能必须继承此类。 id: str # 技能唯一标识如 “translator_en_to_zh” name: str # 技能可读名称如 “英译中” description: str # 技能详细描述用于路由策略理解 def __init__(self, skill_id: str, name: str, description: str ): self.id skill_id self.name name self.description description abstractmethod def execute(self, input_data: Dict[str, Any], context: Optional[Dict] None) - Dict[str, Any]: 执行技能的核心方法。 :param input_data: 输入数据字典通常包含用户原始输入或已解析的参数。 :param context: 可选上下文信息如用户会话、历史记录等。 :return: 执行结果字典应至少包含一个 ‘output‘ 字段也可包含状态码、错误信息等。 pass # 可选异步执行接口 # async def execute_async(self, input_data: Dict[str, Any], context: Optional[Dict] None) - Dict[str, Any]: # pass def get_metadata(self) - Dict[str, Any]: 获取技能的元数据用于路由匹配。 return { “id”: self.id, “name”: self.name, “description”: self.description, # 可以添加更多如输入输出模式、标签列表 “tags”: getattr(self, ‘tags‘, []), }为什么这么设计强制接口统一通过抽象基类确保所有技能都有相同的执行入口路由器可以无差别调用。输入输出标准化使用字典作为输入输出提供了最大的灵活性。技能可以约定字典内部的字段格式如input_data[“text“]代表待处理文本这是一种松散的契约。元数据支持get_metadata方法提供了技能的自描述信息这是实现基于内容的路由而不仅仅是ID路由的基础。异步预留注释掉的异步接口展示了扩展方向。在实际项目中如果确定需要高性能异步处理我会直接将execute定义为async方法。实操心得在早期版本中我尝试过更灵活的函数式注册即直接注册一个函数。但后来发现当技能需要维护内部状态如API客户端、模型加载、需要更丰富的元数据时类的方式更有优势。函数式可以作为类的一种轻量级包装核心路由器还是面向类的设计。3.2 路由器Router的核心调度逻辑路由器是大脑它的核心是维护一个技能注册表并提供一个route_and_execute方法。class SkillRouter: def __init__(self, routing_policyNone): # 技能注册表skill_id - Skill 实例 self._skills: Dict[str, BaseSkill] {} # 路由策略可插拔 self._routing_policy routing_policy or DefaultKeywordPolicy() def register_skill(self, skill: BaseSkill): 注册一个技能实例。 if skill.id in self._skills: raise ValueError(f“Skill with id ‘{skill.id}‘ already registered.“) self._skills[skill.id] skill # 通知路由策略有新的技能注册如果策略需要 self._routing_policy.on_skill_registered(skill) def route_and_execute(self, user_input: str, context: Optional[Dict] None) - Dict[str, Any]: 路由并执行的主入口。 1. 根据输入和策略选择最佳技能。 2. 调用该技能的 execute 方法。 3. 返回执行结果。 if not self._skills: return {“error”: “No skills registered.“, “output”: None} # 步骤1: 通过路由策略选择技能 selected_skill_id self._routing_policy.select_skill( user_inputuser_input, contextcontext, available_skillslist(self._skills.values()) # 传入所有可用技能信息 ) if not selected_skill_id or selected_skill_id not in self._skills: # 处理无匹配技能的情况 return self._handle_no_match(user_input, context) # 步骤2: 获取技能实例并执行 skill_to_execute self._skills[selected_skill_id] try: # 这里可以将 user_input 包装成 input_data也可以由技能自己解析 input_data {“raw_input”: user_input, “context”: context} result skill_to_execute.execute(input_data, context) result[“skill_id”] selected_skill_id # 在结果中标注是哪个技能执行的 return result except Exception as e: # 统一的错误处理 return {“error”: f“Skill execution failed: {str(e)}“, “skill_id”: selected_skill_id, “output”: None} def _handle_no_match(self, user_input, context): 无匹配技能时的处理策略可被重写。 # 默认返回一个友好提示也可以配置一个默认的“兜底”技能 return {“error”: “No matching skill found for your input.“, “output”: None}设计要点依赖注入路由策略路由器不关心具体匹配逻辑它只依赖一个实现了select_skill方法的策略对象。这使得替换路由算法变得极其简单。统一的错误处理将技能执行可能抛出的异常捕获并转化为统一的错误返回格式避免因单个技能崩溃导致整个路由进程失败。上下文传递context参数贯穿整个流程可以用于传递用户身份、会话状态等信息供策略和技能使用。3.3 路由策略Routing Policy的实现演进路由策略是框架的“智能”所在。我们可以从简单到复杂实现多种策略。3.3.1 基础关键词匹配策略这是最简单直接的策略适合技能少、指令明确的场景。class DefaultKeywordPolicy: 默认基于关键词的路由策略。 def __init__(self): # 可以预定义一些技能ID到关键词列表的映射 self._skill_keywords: Dict[str, List[str]] {} def on_skill_registered(self, skill: BaseSkill): 技能注册时可以提取其名称、描述中的关键词或由技能自行提供。 # 简单示例将技能名称作为关键词 self._skill_keywords[skill.id] [skill.name] def select_skill(self, user_input: str, context, available_skills) - Optional[str]: user_input_lower user_input.lower() for skill in available_skills: keywords self._skill_keywords.get(skill.id, []) for kw in keywords: if kw.lower() in user_input_lower: return skill.id # 找到第一个匹配的关键词就返回 return None3.3.2 基于向量相似度的语义匹配策略当技能很多或者用户输入非常口语化时关键词匹配就不够了。我们可以引入文本嵌入Embedding和向量相似度计算。# 需要安装 sentence-transformers 或类似库 # pip install sentence-transformers from sentence_transformers import SentenceTransformer import numpy as np class SemanticRoutingPolicy: 基于语义相似度的路由策略。 def __init__(self, model_name‘paraphrase-MiniLM-L6-v2‘): self.model SentenceTransformer(model_name) # 缓存技能描述的向量 self._skill_vectors: Dict[str, np.ndarray] {} self._skill_descriptions: Dict[str, str] {} def on_skill_registered(self, skill: BaseSkill): 技能注册时将其描述文本编码为向量并缓存。 description_text f“{skill.name} {skill.description}“ self._skill_descriptions[skill.id] description_text vector self.model.encode(description_text, convert_to_tensorFalse) self._skill_vectors[skill.id] vector def select_skill(self, user_input: str, context, available_skills) - Optional[str]: if not self._skill_vectors: return None # 将用户输入编码为向量 query_vector self.model.encode(user_input, convert_to_tensorFalse) best_skill_id None best_similarity -1.0 threshold 0.5 # 相似度阈值可调 for skill_id, skill_vector in self._skill_vectors.items(): # 计算余弦相似度 cos_sim np.dot(query_vector, skill_vector) / (np.linalg.norm(query_vector) * np.linalg.norm(skill_vector)) if cos_sim best_similarity and cos_sim threshold: best_similarity cos_sim best_skill_id skill_id return best_skill_id为什么选择 Sentence-BERT 模型像paraphrase-MiniLM-L6-v2这类模型在语义相似度任务上表现好且模型较小约80MB推理速度快非常适合作为轻量级路由的语义理解组件。它比单纯的关键词匹配更能理解“帮我翻译成英文”和“把这段话变成英语”是同一个意图。3.3.3 混合路由策略在实际生产中单一策略可能不够用。我们可以设计一个策略链或策略组合。例如首先用规则引擎如匹配特定命令前缀/summarize。如果规则不匹配再用语义相似度匹配。如果语义匹配分数低于阈值则触发一个澄清提问的默认技能让用户确认意图。class HybridRoutingPolicy: def __init__(self): self.rule_policy RuleBasedPolicy() self.semantic_policy SemanticRoutingPolicy() self.fallback_policy FallbackPolicy() def select_skill(self, user_input, context, available_skills): # 第一层规则匹配优先级最高 skill_id self.rule_policy.select_skill(user_input, context, available_skills) if skill_id: return skill_id # 第二层语义匹配 skill_id self.semantic_policy.select_skill(user_input, context, available_skills) if skill_id: return skill_id # 第三层兜底 return self.fallback_policy.select_skill(user_input, context, available_skills)实操心得策略的可配置化非常重要。最好的方式是通过一个配置文件如YAML来定义策略链和每个策略的参数如相似度阈值。这样调整路由行为不需要修改代码重启服务或热加载配置即可生效。4. 完整配置与使用示例让我们通过一个完整的例子把上面的部分串联起来。假设我们要构建一个简单的个人助手具备翻译、摘要和查天气三个技能。第一步定义具体技能# skills/translator.py class TranslationSkill(BaseSkill): def __init__(self): super().__init__(skill_id“translator_zh_to_en“, name“中译英“, description“将中文文本翻译成英文“) # 这里可以初始化真正的翻译API客户端 # self.client SomeTranslationAPIClient() def execute(self, input_data, contextNone): text_to_translate input_data.get(“raw_input“, “”) # 模拟翻译过程 translated_text f“[TRANSLATED TO EN]: {text_to_translate}“ return {“output”: translated_text, “status”: “success“} # skills/summarizer.py class SummarizationSkill(BaseSkill): def __init__(self): super().__init__(skill_id“summarizer“, name“文本摘要“, description“对长文本进行摘要总结“) def execute(self, input_data, contextNone): long_text input_data.get(“raw_input“, “”) summary f“[SUMMARY]: {long_text[:50]}...“ # 模拟摘要 return {“output”: summary, “status”: “success“} # skills/weather.py class WeatherSkill(BaseSkill): def __init__(self, api_key): super().__init__(skill_id“weather“, name“查询天气“, description“根据城市名称查询当前天气情况“) self.api_key api_key def execute(self, input_data, contextNone): # 这里可以解析城市名例如从输入中提取“北京”或通过上下文获取 city “Beijing“ # 简化处理 # 模拟调用天气API weather_info f“Weather in {city}: Sunny, 25°C“ return {“output”: weather_info, “status”: “success“}第二步配置与组装路由器# app.py from skill_router import SkillRouter, SemanticRoutingPolicy from skills.translator import TranslationSkill from skills.summarizer import SummarizationSkill from skills.weather import WeatherSkill def main(): # 1. 初始化路由器并指定使用语义路由策略 router SkillRouter(routing_policySemanticRoutingPolicy()) # 2. 创建并注册技能 translator TranslationSkill() summarizer SummarizationSkill() weather WeatherSkill(api_key“your_api_key“) router.register_skill(translator) router.register_skill(summarizer) router.register_skill(weather) # 3. 模拟用户请求 test_inputs [ “把这句话翻译成英文今天天气真好“, “请帮我总结一下这篇长文章的主要内容“, “北京今天天气怎么样“, “讲个笑话“ # 这个没有匹配技能 ] for user_input in test_inputs: print(f“\n输入: {user_input}“) result router.route_and_execute(user_input) print(f“结果: {result}“) if __name__ “__main__“: main()第三步预期输出运行上面的程序你可能会得到类似下面的输出语义相似度匹配的结果会根据模型和描述略有不同输入: 把这句话翻译成英文今天天气真好 结果: {‘output‘: ‘[TRANSLATED TO EN]: 把这句话翻译成英文今天天气真好‘, ‘status‘: ‘success‘, ‘skill_id‘: ‘translator_zh_to_en‘} 输入: 请帮我总结一下这篇长文章的主要内容 结果: {‘output‘: ‘[SUMMARY]: 请帮我总结一下这篇长文章的主要内容...‘, ‘status‘: ‘success‘, ‘skill_id‘: ‘summarizer‘} 输入: 北京今天天气怎么样 结果: {‘output‘: ‘Weather in Beijing: Sunny, 25°C‘, ‘status‘: ‘success‘, ‘skill_id‘: ‘weather‘} 输入: 讲个笑话 结果: {‘error‘: ‘No matching skill found for your input.‘, ‘output‘: None}可以看到路由器成功地将不同的用户指令路由到了对应的技能。对于不支持的指令也返回了清晰的错误信息。5. 高级特性与生产级考量一个基础的skill-router框架跑起来后要用于实际项目还需要考虑很多增强特性。5.1 技能的热注册与发现在微服务或长期运行的应用中我们可能希望技能可以动态地注册和注销而无需重启路由器。这可以通过在路由器中暴露管理API来实现。class SkillRouter: # ... 其他代码同上 ... def register_skill_dynamic(self, skill: BaseSkill): 动态注册技能线程安全版本。 with self._registration_lock: # 需要引入线程锁 self.register_skill(skill) def unregister_skill(self, skill_id: str): 注销技能。 with self._registration_lock: if skill_id in self._skills: del self._skills[skill_id] # 同样需要通知路由策略更新 self._routing_policy.on_skill_unregistered(skill_id) def get_available_skills(self) - List[Dict]: 获取当前所有可用技能的信息列表。 return [skill.get_metadata() for skill in self._skills.values()]更进一步可以实现基于服务发现如Consul, Etcd或消息队列的技能自动发现。技能服务在启动时将自己的信息注册到中心注册中心路由器定期从注册中心拉取或监听变更事件自动更新本地的技能注册表。5.2 输入预处理与参数解析目前的例子中技能直接拿到了原始输入raw_input。但在复杂场景下技能可能需要结构化的参数。我们可以在路由器和技能之间增加一个输入处理器Input Processor层。class InputProcessor: 负责将原始输入解析为结构化数据。 def process(self, user_input: str, context: Dict) - Dict[str, Any]: 解析输入。可以包含 - 意图识别可复用路由策略 - 实体抽取如日期、城市名、金额 - 参数标准化 processed_data {“raw_input”: user_input} # 示例简单提取可能存在的城市名非常简单的规则 if “天气” in user_input: # 这里应该用更健壮的NLP方法如正则或模型 for city in [“北京“, “上海“, “广州“]: if city in user_input: processed_data[“city”] city break return processed_data # 在路由器中使用 class SkillRouter: def __init__(self, routing_policyNone, input_processorNone): self._input_processor input_processor or InputProcessor() # ... 其他初始化 ... def route_and_execute(self, user_input, contextNone): # 先预处理输入 structured_input self._input_processor.process(user_input, context or {}) # 将预处理后的数据也作为上下文的一部分供路由策略使用 enhanced_context {**(context or {}), “processed_input”: structured_input} # 路由策略现在可以基于 processed_input 做更精准的判断 selected_skill_id self._routing_policy.select_skill( user_inputuser_input, contextenhanced_context, available_skills... ) # ... 后续执行 ...这样WeatherSkill在执行时就可以直接从input_data[“city”]获取城市参数而不需要自己再去解析原始字符串。5.3 执行链路、中间件与可观测性对于生产环境我们需要监控路由器的健康度和性能。中间件Middleware可以在技能执行前后插入钩子函数用于日志记录、性能监控、权限校验、限流等。这类似于Web框架的中间件机制。class LoggingMiddleware: def pre_execute(self, skill_id, input_data): print(f“[{datetime.now()}] Executing skill: {skill_id} with input: {input_data}“) def post_execute(self, skill_id, result, duration): print(f“[{datetime.now()}] Skill {skill_id} finished in {duration:.2f}s, result: {result}“)链路追踪Tracing为每个请求生成唯一ID并在路由、预处理、技能执行的整个链路中传递这个ID方便在分布式系统中追踪一个请求的完整生命周期。指标收集Metrics收集诸如请求量、路由耗时、技能执行耗时、匹配成功率、错误率等指标并导出到Prometheus等监控系统。5.4 配置化与持久化将路由策略、技能参数、中间件配置等全部外置到配置文件如YAML, JSON中。# config.yaml router: policy: class: “skill_router.policy.HybridRoutingPolicy“ params: rule_first: true semantic_threshold: 0.6 input_processor: “skill_router.processor.BasicProcessor“ middlewares: - “skill_router.middleware.LoggingMiddleware“ - “skill_router.middleware.MetricsMiddleware“ skills: - id: “translator_zh_to_en“ class: “my_skills.TranslationSkill“ params: api_key: ${TRANSLATOR_API_KEY} # 支持环境变量 - id: “weather“ class: “my_skills.WeatherSkill“ params: api_key: ${WEATHER_API_KEY} default_city: “Beijing“应用启动时从配置文件加载并动态创建路由器实例。这带来了极大的灵活性和可维护性。6. 常见问题、排查技巧与优化方向在实际开发和部署skill-router的过程中我遇到了不少坑也总结了一些优化经验。6.1 匹配冲突与优先级问题问题两个技能的描述非常相似或者用户输入同时匹配了多个规则导致路由不稳定。解决方案设置优先级为每个技能定义一个静态优先级字段。在路由策略计算匹配度后如果分数接近比如差值小于0.1则优先选择优先级高的技能。技能互斥标签为技能打上标签并在策略中实现基于标签的过滤。例如给“翻译英文”和“翻译法文”技能都打上translation标签但设置互斥规则确保一次只匹配一个翻译类技能。用户确认当匹配度前两名的技能分数都很高且接近时可以设计一个“澄清”技能将候选技能列表返回给用户让用户选择。6.2 语义路由策略性能瓶颈问题当技能数量达到数百上千时对每个输入都计算与所有技能描述的向量相似度耗时可能成为瓶颈。优化方案向量索引使用专业的向量数据库如FAISS, Milvus, Qdrant或支持向量搜索的关系型数据库如PgVector。在技能注册时将描述向量存入索引。查询时通过近似最近邻搜索快速找到Top-K个最相似的技能而不是全量计算。这对于大规模技能库是质的提升。两阶段路由先用非常快速的规则或关键词过滤掉绝大部分不相关的技能得到一个较小的候选集再对这个候选集进行精细的语义匹配。缓存对常见的、标准的用户输入及其路由结果进行缓存。可以在路由器层面加一个LRU缓存键是用户输入的哈希值是最佳技能ID。6.3 技能执行超时与故障隔离问题某个技能执行时间过长或崩溃不应阻塞整个路由器或影响其他请求。解决方案超时控制为skill.execute()方法设置超时。在Python中可以使用concurrent.futures的ThreadPoolExecutor或asyncio.wait_for来实现。import concurrent.futures def execute_with_timeout(skill, input_data, timeout5.0): with concurrent.futures.ThreadPoolExecutor() as executor: future executor.submit(skill.execute, input_data) try: return future.result(timeouttimeout) except concurrent.futures.TimeoutError: return {“error”: “Skill execution timeout“, “skill_id”: skill.id}熔断器模式为每个技能维护一个熔断器如pybreaker。当技能连续失败多次熔断器“跳闸”短时间内对该技能的所有请求直接失败返回不再真正调用给下游服务恢复的时间。定期允许一个试探请求如果成功则关闭熔断器。6.4 技能版本管理与灰度发布问题如何对某个技能进行升级并控制新版本只对部分流量生效解决方案在技能注册时带上版本号。路由器可以支持基于版本的路由策略或者通过上下文信息如用户ID、实验标签来决定将请求路由到新版本还是旧版本技能。这需要将技能ID细化为skill_id:version的格式并在路由策略中做相应处理。6.5 调试与日志问题路由决策像个黑盒出了问题不好排查为什么选择了A技能而不是B技能。解决方案详细日志在路由策略的select_skill方法中记录下所有候选技能的匹配分数。在调试模式或特定请求头触发下将这些信息返回给客户端或记录到结构化日志中。可视化工具可以开发一个简单的管理界面输入一段文本实时展示路由器内部的计算过程预处理结果、各技能匹配度分数、最终选择结果等。这对于算法策略调优至关重要。skill-router从一个简单的想法逐步演进为一个考虑生产级需求的可扩展框架。它的核心价值在于分离了“做什么”技能实现和“谁来做”路由决策并通过可插拔的策略和丰富的扩展点适应从简单脚本到复杂智能助手的各种场景。在实现过程中从基础的功能抽象到性能优化再到可观测性和可维护性每一步都需要结合具体的业务需求来权衡和设计。