构建统一LLM API调用层:从适配器模式到工厂模式实战
1. 项目缘起一次统一调用的探索最近在做一个需要集成多种大语言模型LLM的智能应用需求很简单根据不同的场景、成本或性能要求动态切换调用不同的模型服务。一开始我天真地为每个模型写了一套独立的调用代码——OpenAI一套Anthropic一套Google一套还有本地部署的模型又是一套。很快代码就变得臃肿不堪维护起来简直是噩梦。每次加个新功能比如流式输出或者函数调用都得在四五个地方重复修改。这让我意识到必须得有一个统一的抽象层用同一套代码去调用不同的LLM API。这个想法听起来简单做起来却处处是坑。每个厂商的API设计哲学、参数命名、返回格式、错误处理乃至计费方式都大相径庭。把“调用模型”这个动作抽象出来背后是对不同服务商技术栈、商业模式和设计理念的一次深度碰撞。我花了相当一段时间从接口设计、错误处理到成本优化踩遍了能踩的坑最终打磨出了一套相对健壮的方案。今天就把这段从混乱到统一的实战经历以及从中提炼出的核心洞察分享给你。无论你是想构建一个多模型的路由网关还是单纯希望自己的应用不那么容易被某个供应商“绑定”这些经验都能让你少走弯路。2. 核心挑战四大API的“方言”差异想要用同一套代码说话首先得听懂各家都在说什么。这不仅仅是换一个api_key和endpoint那么简单而是深入到设计细节的“方言”转换。2.1 消息格式的“巴别塔”最直观的差异就在消息格式上。OpenAI的ChatCompletion API采用了一个清晰的角色role系统通常是system,user,assistant。而Anthropic的Claude则使用了独特的Human:和Assistant:前缀文本块格式。Google的Gemini虽然也类似OpenAI但其content字段的结构化程度更高可以内嵌多模态数据。至于一些开源模型的本地API比如通过vLLM或TGI部署的它们可能遵循OpenAI的格式也可能有自己的一套。我的解决方案是在内部定义一个标准消息格式。我选择以OpenAI的格式为蓝本进行扩展因为它相对通用。内部统一使用{role: ‘system’|‘user’|‘assistant’, content: string}的数组。在调用具体API前通过一个“适配器”Adapter层将这个内部格式转换成目标API所需的格式。例如对于Anthropic适配器需要将消息数组拼接成特定的文本格式# 内部格式 messages [ {“role”: “system”, “content”: “你是一个助手”}, {“role”: “user”, “content”: “你好”} ] # 转换为Anthropic格式简化示意 anthropic_prompt “\n\nHuman: 你是一个助手\n\nHuman: 你好\n\nAssistant:”这个过程需要特别注意system提示词的处理因为不同API对system角色的支持方式和位置不同有的放在独立参数有的需要融入对话历史。2.2 参数映射的“迷阵”即使是最常见的参数如温度temperature和最大生成长度max_tokens在不同API中的名称和取值范围也可能不同。参数含义OpenAIAnthropicGoogle Gemini通用本地API (OpenAI兼容)随机性控制temperature(0-2)temperature(0-1)temperature(0-2)temperature(0-2)确定性采样top_p(0-1)top_p(0-1)top_p(0-1)top_p(0-1)最大生成长度max_tokensmax_tokens_to_samplemax_output_tokensmax_tokens停止序列stop(字符串或数组)stop_sequences(数组)stop_sequences(数组)stop(字符串或数组)注意temperature的取值范围需要特别留意。Anthropic通常建议保持在0到1之间超过1可能导致不可预测的输出。而OpenAI和Gemini的文档显示支持到2。在实际使用中除非有特殊需求否则建议将温度值统一限制在0到1的范围内进行传递以确保跨平台行为的一致性。此外一些高级功能参数差异更大。比如OpenAI的function calling函数调用在Anthropic中对应的是tools工具参数其JSON结构定义方式也不同。流式输出Streaming虽然各家都支持但数据块chunk的格式和解析方式千差万别有的返回纯文本片段有的返回完整的增量JSON对象。2.3 错误处理与速率限制的“丛林法则”这是最让人头疼的部分。每家API的错误码status code、错误信息格式和速率限制Rate Limiting策略都自成体系。OpenAI错误码相对标准如429表示速率限制401表示鉴权失败错误信息JSON结构固定通常包含error字段里面有code和message。它的速率限制通常以RPM每分钟请求数和TPM每分钟令牌数来计量。Anthropic也有429、401等标准错误码但错误信息结构不同。其速率限制主要关注的是“请求数”对令牌数的限制不如OpenAI那么突出但在计费上体现。Google Gemini错误码遵循Google Cloud API的通用规范可能包含更复杂的status字段如RESOURCE_EXHAUSTED对应429。速率限制策略也与Google Cloud项目配额深度绑定。本地API错误处理最不可预测完全取决于部署方式。可能返回标准HTTP错误也可能返回一个包含错误信息的JSON响应体甚至直接抛出连接异常。统一错误处理的关键在于异常封装。我创建了一个统一的LLMAPIError异常类内部捕获所有供应商特定的异常如openai.error.RateLimitError,anthropic.APIConnectionError然后将其转换为内部定义的错误类型和用户友好的消息。同时必须为每个供应商实现独立的退避重试Backoff Retry逻辑因为它们的速率限制阈值和重置时间窗口各不相同。3. 架构设计构建健壮的统一抽象层面对这些差异一个清晰的架构是成功的关键。我的目标是应用层代码只与一个统一的“客户端”交互完全感知不到底层是哪个模型。3.1 核心接口设计我定义了一个核心的LLMProvider抽象基类ABC它规定了所有模型提供商必须实现的方法。from abc import ABC, abstractmethod from typing import List, Dict, Any, AsyncIterator, Optional class LLMProvider(ABC): 大语言模型提供商抽象基类 abstractmethod async def create_completion( self, messages: List[Dict[str, str]], # 标准消息格式 model: str, # 模型标识符如 “gpt-4”, “claude-3-opus” **kwargs # 其他标准化参数温度、最大令牌数等 ) - Dict[str, Any]: 创建非流式补全 pass abstractmethod async def create_completion_stream( self, messages: List[Dict[str, str]], model: str, **kwargs ) - AsyncIterator[str]: 创建流式补全返回一个异步迭代器产出文本块 pass abstractmethod def calculate_cost(self, model: str, prompt_tokens: int, completion_tokens: int) - float: 根据使用量计算成本美元 pass这个接口非常精简只关注最核心的“补全”功能。**kwargs用于传递那些已经过内部适配器统一化的参数如temperature、max_tokens。3.2 具体提供商实现接下来为每个供应商创建具体的实现类如OpenAIProvider、AnthropicProvider、GeminiProvider、LocalProvider。每个实现类内部都包含两个关键组件参数适配器负责将内部标准参数如max_tokens映射到该供应商API的实际参数名如max_tokens_to_sample并进行必要的值域转换或格式化。响应解析器负责解析供应商返回的原始响应无论是JSON还是流式数据块都将其转换为内部标准格式。对于流式响应解析器需要从复杂的数据块中提取出纯文本增量。以AnthropicProvider的流式处理为例Claude的流式响应是一系列SSEServer-Sent Events事件每个事件是一个JSON对象其中可能包含completion字段的增量。解析器需要持续读取这些事件拼接出完整的回复并处理好可能出现的error事件。3.3 工厂模式与动态路由为了让应用层方便地获取所需的提供商实例我使用了工厂模式。一个ProviderFactory根据配置或传入的模型名称字符串返回对应的LLMProvider实例。class ProviderFactory: _providers {} classmethod def register_provider(cls, name: str, provider_class): cls._providers[name] provider_class classmethod def get_provider(cls, model_identifier: str, api_key: str, base_url: Optional[str] None) - LLMProvider: # 根据模型标识符判断提供商例如 “gpt-”开头 - OpenAI, “claude-” - Anthropic if model_identifier.startswith(“gpt-”): provider_class cls._providers[“openai”] elif model_identifier.startswith(“claude-”): provider_class cls._providers[“anthropic”] elif model_identifier.startswith(“gemini-”): provider_class cls._providers[“gemini”] else: # 默认为本地或自定义OpenAI兼容端点 provider_class cls._providers[“local”] return provider_class(api_keyapi_key, base_urlbase_url, default_modelmodel_identifier) # 注册提供商 ProviderFactory.register_provider(“openai”, OpenAIProvider) ProviderFactory.register_provider(“anthropic”, AnthropicProvider) ...这样在业务代码中你只需要这样调用async def ask_llm(question: str, model: str “gpt-4”): provider ProviderFactory.get_provider(model, api_keyos.getenv(f”{model.upper()}_API_KEY”)) messages [{“role”: “user”, “content”: question}] response await provider.create_completion(messagesmessages, modelmodel, temperature0.7) return response[“choices”][0][“message”][“content”]业务逻辑完全与具体的API解耦。如果想切换到Claude只需改变model参数为”claude-3-sonnet”工厂会自动选择正确的提供商。4. 高级功能与边缘案例处理统一了基础调用更复杂的挑战在于高级功能。这些功能往往各家实现差异最大也最能体现抽象层设计的优劣。4.1 流式输出的统一流式输出对于提升用户体验至关重要但如前所述数据格式五花八门。我的策略是在create_completion_stream方法内部将不同供应商的流式响应统一转换为一个异步迭代器Async Iterator每次迭代只产出纯文本字符串片段。# 在 OpenAIProvider 内部 async def create_completion_stream(self, messages, model, **kwargs): # 调用OpenAI原生流式API stream await self.async_client.chat.completions.create( modelmodel, messagesmessages, # 已适配过的消息 streamTrue, **self._adapt_params(kwargs) # 参数适配 ) async for chunk in stream: if chunk.choices[0].delta.content is not None: yield chunk.choices[0].delta.content # 产出文本片段 # 在 AnthropicProvider 内部 async def create_completion_stream(self, messages, model, **kwargs): # 将消息转换为Anthropic格式 prompt self._message_adapter.to_anthropic_prompt(messages) # 调用Anthropic流式API with self.client.messages.stream( max_tokenskwargs.get(“max_tokens”, 1024), messages…, # Anthropic有新的消息格式API modelmodel, **self._adapt_params(kwargs) ) as stream: async for chunk in stream: if chunk.type “content_block_delta”: yield chunk.delta.text # 产出文本片段这样上层应用消费流式响应的代码是完全一致的provider get_provider(“gpt-4”) async for text_chunk in provider.create_completion_stream(messages, “gpt-4”): print(text_chunk, end“”, flushTrue) # 逐块打印4.2 函数调用/工具使用的抽象这是一个高阶特性。OpenAI将其称为function callingAnthropic和Gemini则称为tools。它们的共同点是让模型根据用户需求决定是否以及如何调用外部工具函数并返回结构化的调用参数。我的抽象方式是在内部定义一套工具描述格式。这个格式尽可能通用包含工具名称、描述、参数JSON Schema等。在调用时通过适配器将其转换为供应商特定的格式。更重要的是处理模型的响应模型可能返回一个表示“希望调用工具”的特殊结构。我的统一响应解析器需要识别这种结构并将其标准化为内部格式例如一个包含tool_call_id、tool_name和arguments的对象。然后由应用层执行对应的函数并将执行结果以特定格式如tool_call_id加结果放回对话历史再次调用模型。这个过程非常繁琐但抽象后应用层只需要定义工具和执行工具的逻辑无需关心底层是OpenAI的function还是Anthropic的tool_use。4.3 上下文长度与令牌计算的迷雾不同模型的上下文窗口Context Window大小不同计价方式也不同有的按输入输出总令牌数有的分开计。在统一层中集成一个令牌计数器Token Counter是很有必要的。然而并非所有供应商都公开其分词器Tokenizer。策略1精确对于提供官方分词器的如OpenAI的tiktoken Anthropic的sentencepiece在对应Provider内部使用其分词器进行精确计数。策略2估算对于不提供或难以集成的使用一个通用的估算方法如平均1个token≈4个英文字符或0.75个中文字符。在LocalProvider中如果后端是开源模型可以尝试加载对应的Hugging Face分词器。我实现了一个TokenUsage类在每次调用后尽可能准确地记录prompt_tokens和completion_tokens并调用Provider的calculate_cost方法进行成本估算。这对于多模型负载均衡和成本监控至关重要。5. 实战踩坑与性能优化心得纸上得来终觉浅绝知此事要躬行。下面分享几个在实战中遇到的典型问题和优化技巧。5.1 超时与网络不稳定的应对不同API的响应速度差异巨大。本地部署的模型可能很快而云服务受网络波动影响。必须为每个Provider设置合理的、可能不同的超时Timeout参数。OpenAI的复杂任务可能需要30秒以上而简单的本地模型调用5秒不响应就可能出问题了。我的做法是在Provider的配置中允许自定义超时并为流式响应设置更长的超时或根本不做超时限制依赖心跳机制。对于网络抖动除了重试还要考虑幂等性Idempotency。特别是对于非流式、非确定性的请求temperature 0简单的重试可能导致重复消费和计费。部分API如OpenAI支持传递idempotency_key这是一个很好的实践应在抽象层中支持。5.2 成本监控与限流当你的应用可以自由调用多个模型时成本控制就成了必须项。我在抽象层中集成了一个简单的成本监控装饰器。它包裹create_completion等方法在每次调用成功后通过calculate_cost方法计算花费并累加到全局或用户级别的统计中。可以设置阈值当某个模型或用户的当日消耗接近预算时自动降级到更便宜的模型或直接拒绝请求。def cost_monitor(func): async def wrapper(provider, *args, **kwargs): start_time time.time() try: result await func(provider, *args, **kwargs) # 从result中提取token使用量需各Provider统一返回格式 prompt_tokens result[“usage”][“prompt_tokens”] completion_tokens result[“usage”][“completion_tokens”] cost provider.calculate_cost(kwargs.get(“model”), prompt_tokens, completion_tokens) # 记录到监控系统 CostTracker.record(provider.name, cost, time.time() - start_time) return result except Exception as e: # 记录失败请求可能也有成本如输入令牌 CostTracker.record_failure(provider.name) raise e return wrapper5.3 模型降级与故障转移这是统一API层最大的价值之一——构建韧性。我实现了一个简单的FallbackProvider。它内部维护一个模型优先级列表例如[“gpt-4”, “claude-3-sonnet”, “gpt-3.5-turbo”]。当调用主模型失败超时、达到速率限制、返回特定错误时自动、无缝地降级到列表中的下一个模型进行重试。这对于保证SLA服务等级协议非常有用。class FallbackProvider(LLMProvider): def __init__(self, provider_factory, model_fallback_chain: List[str], **global_kwargs): self.factory provider_factory self.chain model_fallback_chain self.global_kwargs global_kwargs async def create_completion(self, messages, modelNone, **kwargs): # 如果指定了model则以其为链头否则使用默认链 chain [model] self.chain if model else self.chain last_exception None for current_model in chain: try: provider self.factory.get_provider(current_model, …) # 合并参数 merged_kwargs {**self.global_kwargs, **kwargs} return await provider.create_completion(messages, current_model, **merged_kwargs) except (RateLimitError, APITimeoutError, APIConnectionError) as e: logging.warning(f”Model {current_model} failed: {e}. Falling back to next.”) last_exception e continue # 所有模型都失败 raise AllModelsFailedError(“All models in the fallback chain failed.”) from last_exception5.4 测试策略模拟与契约测试为这样一个多后端的系统编写测试颇具挑战。我的测试金字塔如下单元测试针对每个Provider内部的适配器和解析器使用模拟的API响应数据进行测试。集成测试谨慎使用在测试环境中使用真实的API密钥调用各供应商的测试端点或低功耗模型如gpt-3.5-turbo-instruct,claude-instant-1.2验证整个调用链路。这类测试需要网络且可能产生费用应放在CI/CD的特定阶段或手动触发。契约测试这是最有效的方法。为LLMProvider抽象接口定义“契约”——即给定标准的输入应产生何种格式的输出。然后为每个具体Provider运行测试验证其是否符合契约。这能确保无论底层如何变化上层应用的行为是一致的。可以使用pytest的插件来方便地管理这类契约测试。6. 总结与展望构建一个统一的多LLM API调用层初期投入的工作量不小但长期来看它带来的收益是巨大的解耦、韧性、成本优化和可观测性。你的应用不再被某个供应商锁定可以在不同模型间灵活切换利用各家优势。当某个服务出现故障或限流时可以自动故障转移保障服务连续性。统一的日志、监控和成本计算也让管理变得清晰。经过这次实践我深刻体会到在AI应用开发中基础设施的抽象能力正变得越来越重要。随着模型生态的进一步繁荣类似langchain、litellm这样的开源库也在解决同样的问题。我的这套自定义方案更适合对控制力要求高、有特定定制化需求的场景。如果你刚开始接触不妨先使用这些成熟库理解其设计思路。当你遇到瓶颈时再回过头来自己造轮子你会对如何与这些强大的“智能体”高效、稳定地对话有更深刻的理解。最后一个小建议无论用哪种方式请务必为你的抽象层编写清晰的文档特别是记录下每个支持模型的特有参数、限制和成本结构。这将是你和你的团队在未来最宝贵的财富。