【Day6】vllm 一条请求的生命周期 2
1. 今日目标same with day5以一条请求的生命周期为切入点找到经典设计的代码入口。行业共识主要是三个设计Continuous batching连续批处理KV cache以存代算Memory-aware Scheduling内存感知调度根据 Day5 概述今日先学习这两步HTTP Request(OpenAI Protocol)│ ▼ ① API Layer(FastAPIOpenAI Serving)│ ▼ ② AsyncLLM Engine(Request Enqueuing)2. Phase1进入 entrypointsEntrypoints 这个文件夹下其实就是 API Layer (请求入口) 它的目的是将外部世界的请求或者命令等转换成内部 engine 调用。这一层会对接不同的服务比如 openai、anthropic 等。2.1 entrypoints/openai兼容 openai 的核心文件vllm/entrypoints/openai/api_server.py - FastAPI 应用入口vllm/entrypoints/openai/chat_completion/serving.py - OpenAIServingChat 类处理 openai 格式的请求vllm/entrypoints/openai/chat_completion/api_router.py - 路由处理什么是 openAI 协议其实就是 HTTP JSON请求格式长得像 OpenAI API。某种程度上就是 openAI 的影响力定义了规则。形如这种POST /v1/chat/completions { model: Qwen/Qwen2.5-3B-Instruct, messages: [ {role: user, content: 你好} ] }2.2 兼容 Anthropic-style APIvllm/vllm/entrypoints/anthropic2.3 entrypoints/cli命令行入口。像下面这些命令vllm serve vllm bench serve vllm bench throughput // 前面看的跑 benchmark vllm chat vllm complete vllm run-batch都会从 entrypoints/cli 这里进。2.4 处理流程以 openai 为例的处理流程HTTP Request (POST /v1/chat/completions)↓api_server.py: build_app() 创建 FastAPI 应用↓api_router.py: create_chat_completion()↓serving.py: openai_serving.render_chat() → Engine Inputs↓engine_client.generate(prompt, sampling_params) → 异步提交请求可以看下 create_chat_completion 代码with_cancellation load_aware_call async def create_chat_completion(request: ChatCompletionRequest, raw_request: Request): metrics_header_format raw_request.headers.get( ENDPOINT_LOAD_METRICS_FORMAT_HEADER_LABEL, ) handler chat(raw_request) if handler is None: raise NotImplementedError(The model does not support Chat Completions API) generator await handler.create_chat_completion(request, raw_request) // 这里 handler 就是一个 OpenAIServingChat 对象 if isinstance(generator, ErrorResponse): return JSONResponse( contentgenerator.model_dump(), status_codegenerator.error.code ) elif isinstance(generator, ChatCompletionResponse): return JSONResponse( contentgenerator.model_dump(), headersmetrics_header(metrics_header_format), ) return StreamingResponse(contentgenerator, media_typetext/event-stream)具体到 openai 的 create_chat_completion 实现把 openai 格式的输入处理成 vllm engine 识别的格式代码路径result await self.render_chat_request(request) 对 conversation, engine_inputs result 进行枚举然后交给 engine generator self.engine_client.generate(engine_inputs, XX) // 这里返回的是一个异步生成器后面流式/非流式都围绕着他消费。遗留问题1、注意到 create_chat_completion 函数里面有一个 logger可以后续进行观察传递给 engine 的参数self._log_inputs( sub_request_id, engine_input, // 模型输入已经由 chat messages 渲染而来 paramssampling_params, // 控制生成行为比如 temperature、top_p、max_tokens、stop 等。 lora_requestlora_request, )2、观察 # stream/full generator 内部看首 token、总生成时间if request.stream: return self.chat_completion_stream_generator(3. Phase2: 进入 AsyncLLM Engine接上面的例子engine_client 就是一个 class EngineClient 类型。EngineClient(ABC) 只是个抽象基类实际功能实现 class AsyncLLM(EngineClient)。这一步就是把请求放进输入队列中等待 EngineCorescheduler 主循环取走scheduler 的核心设计就是决定如何取。3.1 基础流程代码路径这里做了版本管理vllm/enginevllm-main/vllm/v1/engine核心文件 /data1/lixizhang/AI/vllm-main/vllm/v1/engine/async_llm.pyclass AsyncLLM: async def add_request( self, request_id: str, prompt_token_ids: list[int], params: SamplingParams, ) - None: # 1. InputProcessor 处理输入 # 2. 添加到 EngineCoreRequest # 3. 通过 IPC 发送到 EngineCore async def generate( self, request: Union[str, list[int], PromptType], sampling_params: SamplingParams None, request_id: str None, multi_modal_data: MultiModalData None, ) - AsyncIterator[RequestOutput]: # 流式输出的生成器generate 函数核心1、input_processor把外部请求加工成 EngineCoreRequest输出 EngineCoreRequest 2、queue / client把 EngineCoreRequest 发送到 EngineCore 进程 3、scheduler loopEngineCore 取请求scheduler.schedule() 组 batch 4、output_processor把 EngineCoreOutputs 还原成 RequestOutput / streaming 输出每个请求有自己的输出队列/stream注意这代码里面有个控制逻辑is_pooling 这个是算向量一次前项传播就结束了处理更简单。我们关注生成模型。另外还有个控制关于生成多个回答的用的一个 parent 收集多 child 的输出每个 child 的输出都会被加入到 queue 中也都会被作为输入加入到引擎中。# Add the EngineCoreRequest to EngineCore (separate process). await self.engine_core.add_request_async(request)这里的 engine_core在 engine/core_client.py 下# EngineCore (starts the engine in background process). self.engine_core EngineCoreClient.make_async_mp_client( vllm_configvllm_config, executor_classexecutor_class, log_statsself.log_stats, client_addressesclient_addresses, client_countclient_count, client_indexclient_index, )把 EngineCoreRequest 发送到 EngineCore 走的是 IPCInter-process Communication进程间通信。API 层和 EngineCore调度推理是两个进程。为什么要区分成两个进程主要有这些原因1稳定性如果推理崩了只挂 EngineCoreAPI server 还能活2并发 调度EngineCore 专门做 batching、scheduling、KV cache 管理3多 GPU / 多节点EngineCore 可以分布式、多 worker3.2 IPC 模块常见的 IPC 有multiprocessing Queueshared memorypipesocketvllm 选择的是 zero message queueZMQ 序列化msgpack比较适合这种流式的处理因为 EngineCore 是一边算一边把 token 又推回来的。3.3 并行 EngineCore这里其实有三个类AsyncMPClient 所有 api 的请求就对接到同一个 EngineCoreDPAsyncMPClient Data Parallel EngineCore, 外部决定发给哪个 EngineCorechosen_engine self.get_core_engine_for_request(request)DPLBAsyncMPClient LBload balancing内部自己决定发送给哪个 coredef make_async_mp_client( vllm_config: VllmConfig, executor_class: type[Executor], log_stats: bool, client_addresses: dict[str, str] | None None, client_count: int 1, client_index: int 0, ) - AsyncMPClient: parallel_config vllm_config.parallel_config client_args ( vllm_config, executor_class, log_stats, client_addresses, client_count, client_index, ) if parallel_config.data_parallel_size 1: if parallel_config.data_parallel_external_lb: # External load balancer - client per DP rank. return DPAsyncMPClient(*client_args) # Internal load balancer - client balances to all DP ranks. return DPLBAsyncMPClient(*client_args) return AsyncMPClient(*client_args)我的疑问是为什么要起多个 EngineCore 进程ChatGptData Parallel模型复制 N 份每个 GPU 一份。每个 engine 独立的 GPU独立的 KV-cache独立的 scheduler不会相互影响一个 engine 崩了其他不会蹦有容错。这里先持保留意见感觉大有学问后续再来研究Mark。如何配置1、 启动参数可以指定python -m vllm.entrypoints.openai.api_server \ --model xxx \ --tensor-parallel-size 1 \ --pipeline-parallel-size 1 \ --data-parallel-size 2data-parallel-size 2就会起 2 个 EngineCore2、默认–data-parallel-size GPU 数量3.4 load balance 设计DPLBAsyncMPClient 具体是怎么算“最空闲”的源码级分析先略。