源码深度解析
2026-03-31 | vLLM High-Throughput Inference
第一部分:基础架构
第二部分:核心实现
第三部分:服务层
第四部分:进阶优化
vLLM 是一个快速、易于使用的 LLM 推理和 Serving 库,由 UC Berkeley Sky Computing Lab 开发,现已成为社区驱动的项目。
核心优势
关键技术
| 类别 | 特性 | 优势 |
|---|---|---|
| 高性能 | 最先进的推理吞吐量 | 相比 HuggingFace Transformers 2-3 倍提升 |
| 内存管理 | PagedAttention | 大幅减少内存占用,支持批处理 |
| 批处理 | 连续批处理 | 动态合并请求,提升 GPU 利用率 |
| 量化 | GPTQ/AWQ/INT4/INT8/FP8 | 在保持质量的同时减少内存使用 |
┌─────────────────────────────────────────────────────────┐
│ vLLM Architecture │
├─────────────────────────────────────────────────────────┤
│ 用户接口层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ LLM 类 │ │ API 服务 │ │ API 服务 │ │
│ │ │ │ (HTTP) │ │ (OpenAI) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────┤
│ 引擎层 │
│ ┌─────────────────────────────────────────────────────┐│
│ │ LLM Engine ││
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ││
│ │ │ 调度器 │ │ Worker │ │ Worker │ ││
│ │ │ Scheduler │ │ (GPU) │ │ (GPU) │ ││
│ │ └─────────────┘ └─────────────┘ └─────────────┘ ││
│ └─────────────────────────────────────────────────────┘│
├─────────────────────────────────────────────────────────┤
│ 核心实现层 │
│ ┌─────────────────────────────────────────────────────┐│
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ││
│ │ │PagedAttention││ KV Cache │ │ 采样策略 │ ││
│ │ │ ││ 管理 │ │ │ ││
│ │ └─────────────┘ └─────────────┘ └─────────────┘ ││
│ └─────────────────────────────────────────────────────┘│
├─────────────────────────────────────────────────────────┤
│ 硬件抽象层 │
│ ┌─────────────────────────────────────────────────────┐│
│ │ CUDA/HIP │ PyTorch │ FlashAttention │ ││
│ │ 优化 │ 框架 │ 集成 │ ││
│ └─────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────┘
2022年:UC Berkeley Sky Computing Lab 启动 vLLM 项目
2023年:发布 PagedAttention 技术,论文入选 ACM SIGOPS
2024年:加入社区贡献,支持更多模型和硬件
2025-2026年:持续优化,成为 LLM 推理事实标准
关键突破:PagedAttention 解决了传统连续批处理的内存碎片问题
引擎层 (Engine)
内存管理
服务层
优化层
# vllm/entrypoints/llm.py - LLM 类主入口
class LLM:
"""LLM 推理核心类"""
def __init__(
self,
model: str,
tensor_parallel_size: int = 1,
gpu_memory_utilization: float = 0.9,
dtype: ModelDType = "auto",
quantization: QuantizationMethods | None = None,
**kwargs,
):
# 1. 解析配置
engine_args = EngineArgs(
model=model,
tensor_parallel_size=tensor_parallel_size,
gpu_memory_utilization=gpu_memory_utilization,
dtype=dtype,
quantization=quantization,
**kwargs,
)
# 2. 创建 LLM 引擎
self.llm_engine = LLMEngine.from_engine_args(
engine_args=engine_args,
usage_context=UsageContext.LLM_CLASS
)
# 3. 初始化相关组件
self.model_config = self.llm_engine.model_config
self.engine_class = type(self.llm_engine)
self.request_counter = Counter()
# vllm/scheduler - 核心调度逻辑
class Scheduler:
"""调度器:负责任务的调度和执行管理"""
def __init__(
self,
*args,
chunked_prefill_enabled: bool = False,
**kwargs,
):
# 连续批处理配置
self.chunked_prefill_enabled = chunked_prefill_enabled
# 请求管理
self.waiting_requests: Dict[RequestSeqId, SequenceGroupMetadata] = {}
self.swapped_requests: Dict[RequestSeqId, SequenceGroup] = {}
self.running_requests: Dict[RequestSeqId, SequenceGroup] = {}
# 调度策略
self.policy = SchedulerPolicy(**kwargs)
# PagedAttention 集成
self.block_manager = block_manager
def schedule(self) -> List[SequenceGroup]:
"""调度等待的请求"""
# 1. 根据优先级选择请求
# 2. 分配内存块
# 3. 调度到 Worker
pass
# vllm/worker - GPU 计算单元
class Worker:
"""Worker:负责任务的具体执行"""
def __init__(
self,
*args,
enable_lora: bool = False,
enable_prompt_adapter: bool = False,
**kwargs,
):
# 模型初始化
self.model = model
self.lora_manager = LoRAManager() if enable_lora else None
# PagedAttention 集成
self.cache_engine = CacheEngine(...)
# 量化支持
self.quant_config = quant_config
def execute_model(self, *args, **kwargs):
"""执行模型推理"""
# 1. 准备输入
# 2. 执行前向计算
# 3. 返回结果
pass
def profile_run(self, *args, **kwargs):
"""性能分析运行"""
pass
PagedAttention 是 vLLM 的核心创新,将 KV Cache 管理类比操作系统的虚拟内存管理,支持按需分配和交换。
┌─────────────────────────────────────────────────────────┐
│ 传统 KV Cache │
├─────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Sequence 1: [AAA][BBB][CCC] │ │
│ │ Sequence 2: [XXX][YYY] │ │
│ │ Sequence 3: [PPP][QQQ][RRR][SSS] │ │
│ │ │ │
│ │ 问题: │ │
│ │ - 需要连续内存空间 │ │
│ │ - 内存碎片严重 │ │
│ │ - 无法动态扩展 │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ PagedAttention │
├─────────────────────────────────────────────────────────┤
│ ┌───────┐ ┌───────┐ ┌───────┐ ┌───────┐ │
│ │ Block │ │ Block │ │ Block │ │ Block │ │
│ │ 0 │ │ 1 │ │ 2 │ │ 3 │ │
│ └───────┘ └───────┘ └───────┘ └───────┘ │
│ ↑ ↑ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Page Table │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Entry 0 │ │ Entry 1 │ │ Entry 2 │ │ │
│ │ │ Block 0 │ │ Block 1 │ │ Block 2 │ │ │
│ │ │ P │ │ P │ │ P │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Sequence 1: Block 0, Block 2, Block 3 │
│ Sequence 2: Block 1 │
│ Sequence 3: Block 0, Block 1, Block 2, Block 3 │
└─────────────────────────────────────────────────────────┘
# vllm/attention/paged_attention.py
class PagedAttention:
"""PagedAttention 实现"""
def __init__(
self,
num_kv_heads: int,
head_size: int,
scale: float,
num_blocks: int,
sliding_window: Optional[int] = None,
**kwargs,
):
self.num_kv_heads = num_kv_heads
self.head_size = head_size
self.scale = scale
self.sliding_window = sliding_window
# KV Cache 缓冲区
self.kv_cache = torch.empty(
(num_blocks, 2, num_kv_heads, head_size),
dtype=torch_dtype,
device=device,
)
# 槽位管理
self.block_tables = torch.full(
(max_num_seqs, max_num_blocks_per_seq),
INVALID_BLOCK_ID,
dtype=torch.long,
device=device,
)
# 自定义 CUDA Kernel
self.kv_cache_ops = KVCacheOps(num_kv_heads, head_size)
def get_output(
self,
query: torch.Tensor,
key_cache: torch.Tensor,
value_cache: torch.Tensor,
num_tokens: int,
block_tables: torch.Tensor,
) -> torch.Tensor:
"""执行 PagedAttention 计算"""
# 使用自定义 CUDA kernel 进行高效计算
return self.kv_cache_ops.attention_kernel(
query, key_cache, value_cache, block_tables, num_tokens
)
# vllm/core/block/sync_manager.py
class BlockSpaceManager:
"""内存块空间管理器"""
def __init__(
self,
block_size: int,
num_gpu_blocks: int,
num_cpu_blocks: int,
sliding_window: Optional[int] = None,
):
self.block_size = block_size
self.num_gpu_blocks = num_gpu_blocks
self.num_cpu_blocks = num_cpu_blocks
# 内存分配
self.gpu_allocator = CudaMallocAllocator()
self.cpu_allocator = MallocAllocator()
# 块状态跟踪
self.gpu_blocks: List[Optional[torch.Tensor]] = [None] * num_gpu_blocks
self.cpu_blocks: List[Optional[torch.Tensor]] = [None] * num_cpu_blocks
# 空闲列表
self.free_gpu_blocks: Deque[int] = deque(range(num_gpu_blocks))
self.free_cpu_blocks: Deque[int] = deque(range(num_cpu_blocks))
def allocate_blocks(self, num_blocks: int, device: str = "cuda") -> List[int]:
"""分配内存块"""
if device == "cuda":
blocks = self._allocate_from_gpu(num_blocks)
else:
blocks = self._allocate_from_cpu(num_blocks)
return blocks
def allocate_sequence_slots(
self,
seq_group: SequenceGroup,
) -> SequenceGroupMetadata:
"""为序列组分配槽位"""
pass
# vllm/core/block/manager.py
class GPUBlockAllocator:
"""GPU 内存块分配器"""
def __init__(self, num_blocks: int, block_size: int):
self.num_blocks = num_blocks
self.block_size = block_size
# 使用 torch.empty 预分配内存
self.kv_cache = torch.empty(
(num_blocks, 2, num_kv_heads, head_size),
dtype=torch_dtype,
device=device,
)
# 自由块位图
self.free_blocks = torch.ones(num_blocks, dtype=torch.bool)
def allocate(self, num_blocks: int) -> List[int]:
"""分配连续的内存块"""
# 找到足够数量的连续空闲块
free_indices = torch.where(self.free_blocks)[0]
if len(free_indices) < num_blocks:
raise OutOfMemoryError
# 分配逻辑
allocated = []
for i in range(num_blocks):
block_id = free_indices[i].item()
allocated.append(block_id)
self.free_blocks[block_id] = False
return allocated
def free(self, block_ids: List[int]):
"""释放内存块"""
for block_id in block_ids:
self.free_blocks[block_id] = True
def get_block(self, block_id: int) -> torch.Tensor:
"""获取内存块"""
return self.kv_cache[block_id]
Continuous Batching 允许在推理过程中动态添加新请求,提高 GPU 利用率。
# 传统批处理 vs 连续批处理
┌─────────────────────────────────────────────────────────┐
│ 传统批处理 │
├─────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 批次 1: [请求1] [请求2] [请求3] │ │
│ │ │ │ │ │ │
│ │ ↓ ↓ ↓ │ │
│ │ [计算] → [输出] → [下一个批次] │ │
│ │ │ │
│ │ 问题: │ │
│ │ - 需要等待整个批次完成 │ │
│ │ - 短请求要等待长请求 │ │
│ │ - GPU 利用率不均匀 │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ 连续批处理 │
├─────────────────────────────────────────────────────────┤
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 等待队列 │ │ 运行队列 │ │ 完成 │ │ 新请求 │ │
│ │ │ │ │ │ │ │ 加入 │ │
│ │ 请求A │ │ 请求B │ │ 请求A │ │ │ │
│ │ 请求C │ │ 请求D │ │ │ │ 请求E │ │
│ │ │ │ │ │ │ │ 请求F │ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
│ ↓ ↓ ↑ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 调度器动态管理 │ │
│ │ - 请求完成立即开始新请求 │ │
│ │ - 按优先级调度 │ │
│ │ - 内存动态分配 │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
# vllm/executor/executor_base.py
class ExecutorBase:
"""执行器基类"""
def execute_model(
self,
*args,
**kwargs,
) -> SequenceGroupOutputs:
"""执行模型推理"""
# 1. 准备批处理输入
batch_inputs = self._prepare_batch_inputs(*args, **kwargs)
# 2. 执行模型前向计算
outputs = self._execute_model(batch_inputs)
# 3. 后处理结果
processed_outputs = self._process_outputs(outputs)
return processed_outputs
def _prepare_batch_inputs(self, *args, **kwargs):
"""准备批处理输入"""
# 合并多个请求的输入
batch_size = len(args)
# 初始化批处理张量
input_ids = torch.full(
(batch_size, self.max_prompt_len),
self.tokenizer.pad_token_id,
dtype=torch.long,
)
# 填充输入
for i, seq_group in enumerate(args):
seq_len = min(len(seq_group.token_ids), self.max_prompt_len)
input_ids[i, :seq_len] = torch.tensor(seq_group.token_ids[:seq_len])
return input_ids
# vllm/worker/worker.py - Worker 执行逻辑
class Worker:
def execute_model(
self,
seq_group_metadata_list: List[SequenceGroupMetadata],
blocks_to_swap_in: Dict[int, torch.Tensor],
blocks_to_swap_out: Dict[int, torch.Tensor],
num_lookahead_slots: int = 0,
) -> List[SequenceGroupOutputs]:
"""Worker 执行模型"""
# 1. 执行内存交换
self._swap_blocks(blocks_to_swap_in, blocks_to_swap_out)
# 2. 执行模型计算
outputs = self.model_runner.execute_model(seq_group_metadata_list)
# 3. 返回结果
return outputs
# vllm/scheduler/policies/Policy.py
class SchedulerPolicy:
"""调度策略基类"""
def __init__(self, *args, **kwargs):
# 调度参数
self.max_num_batched_tokens = max_num_batched_tokens
self.max_num_seqs = max_num_seqs
self.chunked_prefill_enabled = chunked_prefill_enabled
def schedule(
self,
running: List[SequenceGroup],
waiting: List[SequenceGroup],
swapped: List[SequenceGroup],
) -> List[SequenceGroup]:
"""调度算法"""
# 1. 优先运行计算量大的请求
running = self._select_running_requests(running, waiting, swapped)
# 2. 添加新请求到运行队列
if not running:
running = self._add_new_requests(waiting, running)
# 3. 添加预填充请求
if self.chunked_prefill_enabled:
running = self._add_chunked_prefills(waiting, running)
return running
def _select_running_requests(
self,
running: List[SequenceGroup],
waiting: List[SequenceGroup],
swapped: List[SequenceGroup],
) -> List[SequenceGroup]:
"""选择要运行的请求"""
# 按优先级排序
all_requests = running + waiting + swapped
sorted_requests = sorted(
all_requests,
key=lambda x: x.get_priority(),
reverse=True,
)
# 计算可用的计算资源
available_tokens = self._get_available_tokens()
# 选择请求直到资源耗尽
selected = []
total_tokens = 0
for req in sorted_requests:
req_tokens = req.get_num_unprocessed_tokens()
if total_tokens + req_tokens <= available_tokens:
selected.append(req)
total_tokens += req_tokens
return selected
Priority Scheduling 允许根据请求优先级动态调整执行顺序,确保高优先级请求能够快速响应。
# vllm/scheduler/sequence_group.py
class SequenceGroup:
"""序列组:管理多个序列"""
def __init__(
self,
request_id: RequestSeqId,
seqs: List[Sequence],
sampling_params: SamplingParams,
arrival_time: float,
lora_request: LoRARequest | None = None,
is_encoder_decoder: bool = False,
multimodal_data: MultiModalData | None = None,
block_tables: Optional[BlockTables] = None,
num_computed_tokens: int = 0,
eos_token_id: int = None,
logit_bias: Optional[Dict[int, float]] = None,
prompt_adapter_request: PromptAdapterRequest | None = None,
arrival_time_ms: Optional[int] = None,
trace_headers: Optional[SequenceGroupMetadata] = None,
external_group_id: Optional[str] = None,
):
# 请求信息
self.request_id = request_id
self.seqs = seqs
self.sampling_params = sampling_params
self.arrival_time = arrival_time
# 状态管理
self.state = SequenceGroupState.WAITING
self.sampling_params = sampling_params
def get_priority(self) -> float:
"""计算请求优先级"""
# 1. 基础优先级
base_priority = 1.0
# 2. 等待时间权重
wait_time = time.time() - self.arrival_time
wait_priority = min(wait_time / 60.0, 10.0) # 最多10倍优先级
# 3. 模型优先级(MoE 模型优先)
model_priority = 2.0 if hasattr(self, 'is_moe') else 1.0
# 4. LoRA 请求优先级
lora_priority = 1.5 if self.lora_request else 1.0
# 综合优先级
total_priority = base_priority * (1.0 + wait_priority + model_priority + lora_priority)
return total_priority
# vllm/engine/arg_utils.py
class EngineArgs:
"""引擎配置参数"""
def __init__(
self,
model: str,
tokenizer: Optional[str] = None,
tensor_parallel_size: int = 1,
dtype: ModelDType = "auto",
quantization: Optional[QuantizationMethods] = None,
gpu_memory_utilization: float = 0.9,
max_num_batched_tokens: Optional[int] = None,
max_num_seqs: int = 256,
max_model_len: Optional[int] = None,
distributed_executor_backend: Optional[str] = None,
**kwargs,
):
# 模型配置
self.model = model
self.tokenizer = tokenizer
self.tensor_parallel_size = tensor_parallel_size
# 内存配置
self.gpu_memory_utilization = gpu_memory_utilization
self.max_num_batched_tokens = max_num_batched_tokens
self.max_num_seqs = max_num_seqs
self.max_model_len = max_model_len
# 量化配置
self.quantization = quantization
self.dtype = dtype
# 分布式配置
self.distributed_executor_backend = distributed_executor_backend
def verify_with_parallel_config(self, parallel_config: ParallelConfig):
"""验证配置的兼容性"""
# 检查并行配置
if self.tensor_parallel_size > 1:
if parallel_config.world_size % self.tensor_parallel_size != 0:
raise ValueError("Invalid tensor parallel size")
# 检查内存配置
if self.gpu_memory_utilization <= 0 or self.gpu_memory_utilization > 1:
raise ValueError("GPU memory utilization must be between 0 and 1")
# vllm/engine/llm_engine.py
class LLMEngine:
"""LLM 引擎主类"""
@classmethod
def from_engine_args(cls, engine_args: EngineArgs) -> "LLMEngine":
"""从配置参数创建引擎"""
# 1. 解析配置
parallel_config = engine_args.create_parallel_config()
cache_config = engine_args.create_cache_config()
scheduler_config = engine_args.create_scheduler_config()
# 2. 创建模型加载器
model_loader = ModelLoader(
model_config=engine_args.create_model_config(),
parallel_config=parallel_config,
cache_config=cache_config,
)
# 3. 创建缓存引擎
cache_engine = CacheEngine(
num_blocks=cache_config.num_gpu_blocks,
block_size=cache_config.block_size,
sliding_window=cache_config.sliding_window,
dtype=cache_config.dtype,
)
# 4. 创建调度器
scheduler = Scheduler(
*args,
cache_config=cache_config,
parallel_config=parallel_config,
scheduler_config=scheduler_config,
lora_config=engine_args.lora_config,
prompt_adapter_config=engine_args.prompt_adapter_config,
)
# 5. 创建执行器
executor = Executor(
*args,
engine_config=engine_config,
cache_engine=cache_engine,
scheduler=scheduler,
)
return cls(
*args,
executor=executor,
model_config=model_config,
tokenizer_config=tokenizer_config,
cache_config=cache_config,
scheduler_config=scheduler_config,
)
# vllm/entrypoints/llm.py - 用户接口
class LLM:
"""用户使用的 LLM 接口类"""
def __init__(
self,
model: str,
*,
runner: RunnerOption = "auto",
convert: ConvertOption = "auto",
tokenizer: Optional[str] = None,
tensor_parallel_size: int = 1,
dtype: ModelDType = "auto",
quantization: Optional[QuantizationMethods] = None,
**kwargs,
):
"""初始化 LLM"""
# 1. 创建引擎配置
engine_args = EngineArgs(
model=model,
tokenizer=tokenizer,
tensor_parallel_size=tensor_parallel_size,
dtype=dtype,
quantization=quantization,
**kwargs,
)
# 2. 创建 LLM 引擎
self.llm_engine = LLMEngine.from_engine_args(
engine_args=engine_args,
usage_context=UsageContext.LLM_CLASS,
)
# 3. 初始化组件
self.model_config = self.llm_engine.model_config
self.engine_class = type(self.llm_engine)
self.request_counter = Counter()
# 4. 设置默认采样参数
self.default_sampling_params = None
def generate(
self,
prompts: PromptType | Sequence[PromptType],
sampling_params: SamplingParams | None = None,
**kwargs,
) -> list[RequestOutput]:
"""生成文本"""
# 1. 准备输入
engine_inputs = self._preprocess_cmpl(prompts)
# 2. 执行推理
outputs = self._run_completion(
prompts=engine_inputs,
params=sampling_params,
output_type=RequestOutput,
**kwargs,
)
return outputs
# vllm/entrypoints/llm.py - 生成核心逻辑
def generate(
self,
prompts: PromptType | Sequence[PromptType],
sampling_params: SamplingParams | None = None,
*,
use_tqdm: bool = True,
lora_request: Optional[LoRARequest] = None,
**kwargs,
) -> list[RequestOutput]:
"""生成文本的核心方法"""
# 1. 验证运行类型
runner_type = self.model_config.runner_type
if runner_type != "generate":
raise ValueError("LLM.generate() is only supported for generative models")
# 2. 使用默认采样参数
if sampling_params is None:
sampling_params = self.get_default_sampling_params()
# 3. 准备输入并执行
return self._run_completion(
prompts=prompts,
params=sampling_params,
output_type=RequestOutput,
use_tqdm=use_tqdm,
lora_request=lora_request,
**kwargs,
)
def _run_completion(
self,
prompts: Sequence[EngineInput],
params: SamplingParams | Sequence[SamplingParams],
output_type: type[RequestOutput],
**kwargs,
) -> list[RequestOutput]:
"""运行完成推理"""
# 1. 添加请求到引擎
request_ids = self._add_completion_requests(
prompts=prompts,
params=params,
**kwargs,
)
# 2. 等待结果
outputs = self._run_engine(output_type, use_tqdm=kwargs.get("use_tqdm", True))
return outputs
分层配置架构:EngineArgs → ParallelConfig → CacheConfig → SchedulerConfig
# 配置层次结构
┌─────────────────────────────────────────────────────────┐
│ EngineArgs │
├─────────────────────────────────────────────────────────┤
│ - model: str │
│ - tokenizer: Optional[str] │
│ - tensor_parallel_size: int │
│ - gpu_memory_utilization: float │
│ - max_num_batched_tokens: int │
│ - max_num_seqs: int │
│ └─────────────────────────────────────────────────────┤
│ ↓ │
│ ParallelConfig │
│ - world_size: int │
│ - tensor_parallel_size: int │
│ - pipeline_parallel_size: int │
│ └─────────────────────────────────────────────────────┤
│ ↓ │
│ CacheConfig │
│ - num_gpu_blocks: int │
│ - block_size: int │
│ - dtype: torch.dtype │
│ └─────────────────────────────────────────────────────┤
│ ↓ │
│ SchedulerConfig │
│ - max_num_batched_tokens: int │
│ - max_num_seqs: int │
│ - max_model_len: int │
│ └─────────────────────────────────────────────────────┘
张量并行 (Tensor Parallel)
流水线并行
数据并行
MoE 并行
# vllm/sampling_params.py
class SamplingParams:
"""采样参数配置"""
def __init__(
self,
n: int = 1,
best_of: int | None = None,
use_beam_search: bool = False,
temperature: float = 1.0,
top_p: float = 1.0,
top_k: int = -1,
top_ema_p: float = 0.0,
top_ema_d: int = 0,
min_p: float = 0.0,
presence_penalty: float = 0.0,
frequency_penalty: float = 0.0,
repetition_penalty: float = 1.0,
length_penalty: float = 1.0,
early_stopping: bool = False,
stop_token_ids: List[int] | None = None,
stop_strings: List[str] | None = None,
ignore_eos: bool = False,
max_tokens: int | None = None,
logprobs: int | None = None,
prompt_logprobs: int | None = None,
skip_special_tokens: bool = True,
spaces_between_special_tokens: bool = True,
include_stop_str_in_output: bool = False,
multiple_stop_sequences: bool = False,
logit_bias: Dict[int, float] | None = None,
allowed_token_ids: List[int] | None = None,
forbidden_token_ids: List[int] | None = None,
):
# 采样控制
self.n = n
self.best_of = best_of
self.use_beam_search = use_beam_search
# 温度控制
self.temperature = temperature
self.top_p = top_p
self.top_k = top_k
# 惩罚机制
self.presence_penalty = presence_penalty
self.frequency_penalty = frequency_penalty
self.repetition_penalty = repetition_penalty
# 停止条件
self.stop_token_ids = stop_token_ids or []
self.max_tokens = max_tokens
self.ignore_eos = ignore_eos
# vllm/entrypoints/openai/api_server.py
app = FastAPI()
@app.post("/v1/completions")
async def create_completion(request: CompletionRequest):
"""创建完成请求"""
# 1. 解析请求
prompts = request.prompt
sampling_params = SamplingParams(
n=request.n,
temperature=request.temperature,
max_tokens=request.max_tokens,
**request.sampling_params,
)
# 2. 执行推理
llm = LLM(
model=request.model,
**request.engine_args,
)
outputs = llm.generate(prompts, sampling_params)
# 3. 返回结果
return CompletionResponse(
id=outputs[0].request_id,
object="text_completion",
created=int(time.time()),
model=request.model,
choices=[
{
"text": output.text,
"index": i,
"logprobs": output.logprobs,
"finish_reason": output.finish_reason,
}
for i, output in enumerate(outputs[0].outputs)
],
usage=outputs[0].usage,
)
@app.post("/v1/chat/completions")
async def create_chat_completion(request: ChatCompletionRequest):
"""创建聊天完成请求"""
# 支持多轮对话
prompts = request.messages
return await create_completion(request)
完全兼容:vLLM 提供与 OpenAI API 完全兼容的接口,可以无缝替换 OpenAI 的服务
| 端点 | 功能 | 支持特性 |
|---|---|---|
/v1/completions |
文本补全 | n, temperature, max_tokens, stop |
/v1/chat/completions |
聊天补全 | 多轮对话, tools, streaming |
/v1/models |
模型列表 | 模型信息, 权重 |
/v1/embeddings |
文本嵌入 | 向量化表示 |
# vllm/engine/async_llm_engine.py
class AsyncLLMEngine:
"""异步 LLM 引擎"""
def __init__(self, *args, **kwargs):
# 1. 创建同步引擎
self.engine = LLMEngine(*args, **kwargs)
# 2. 创建异步任务队列
self.engine = engine
self.request_counter = Counter()
# 3. 启动后台进程
self.process_manager = ProcessManager(
engine_class=self.engine,
engine_args=engine_args,
)
async def generate(
self,
prompt: PromptType,
sampling_params: SamplingParams,
request_id: str,
**kwargs,
) -> RequestOutput:
"""异步生成"""
# 1. 创建请求
request = Request(
request_id=request_id,
prompt=prompt,
sampling_params=sampling_params,
**kwargs,
)
# 2. 添加到队列
self.request_counter.add_request(request.request_id)
self.process_manager.add_request(request)
# 3. 异步等待结果
output = await self.process_manager.get_output(request.request_id)
return output
async def abort(self, request_id: str) -> None:
"""中止请求"""
self.request_counter.remove_request(request_id)
self.process_manager.abort_request(request_id)
Streaming 支持实时返回生成结果,提供更好的用户体验
# WebSocket 流式响应
@app.websocket("/v1/chat/completions")
async def chat_completion_websocket(websocket: WebSocket):
"""WebSocket 流式聊天完成"""
await websocket.accept()
try:
# 1. 接收请求
request = await websocket.receive_json()
# 2. 创建流式生成器
async for chunk in llm.generate_stream(
request["messages"],
request["sampling_params"],
):
# 3. 发送流式响应
response = {
"id": chunk.request_id,
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": request["model"],
"choices": [
{
"index": i,
"delta": {"content": output.text},
"finish_reason": output.finish_reason,
}
for i, output in enumerate(chunk.outputs)
],
}
await websocket.send_json(response)
# 4. 发送完成信号
await websocket.send_json({
"id": chunk.request_id,
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": request["model"],
"choices": [{"index": i, "finish_reason": "stop"}
for i in range(len(chunk.outputs))],
})
except Exception as e:
await websocket.close(code=4001, reason=str(e))
多模态融合:支持文本+图像、文本+视频等多种模态输入
# 多模态处理流程
┌─────────────────────────────────────────────────────────┐
│ 多模态处理 │
├─────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 文本输入 │ │ 图像输入 │ │ 视频输入 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ ↓ ↓ ↓ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 预处理器 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Tokenizer ││ Vision ││ Video │ │ │
│ │ │ ││ Processor││ Processor│ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 模型融合 │ │
│ │ ┌─────────────────────────────────────────────────┐ │ │
│ │ │ Transformer │ │ │
│ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │
│ │ │ │ 文本编码 ││ 视觉编码 ││ 时序编码 │ │ │ │
│ │ │ └──────────┘ └──────────┘ └──────────┘ │ │ │
│ │ └─────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 生成器 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Language ││ 策略网络 ││ 输出层 │ │ │
│ │ │ Model ││ ││ │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
| 量化方法 | 压缩率 | 精度影响 | 适用场景 |
|---|---|---|---|
| GPTQ | 4-8x | 轻微 | 通用推理 |
| AWQ | 4-8x | 极小 | 高质量推理 |
| INT4 | 8x | 中等 | 内存受限 |
| INT8 | 4x | 较小 | 平衡场景 |
| FP8 | 4x | 可接受 | 现代 GPU |
GPU 支持
特殊硬件
内存优化
多 GPU 策略
多层优化栈:从算法到硬件的全栈优化,确保最佳推理性能
性能优化层次结构:
┌─────────────────────────────────────────────────────────┐
│ 应用层优化 │
├─────────────────────────────────────────────────────────┤
│ • 连续批处理策略 │
│ • 优先级调度 │
│ • 动态内存管理 │
│ • 智能缓存策略 │
│ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 算法层优化 │ │
│ │ • PagedAttention 内存管理 │ │
│ │ • FlashAttention 注意力优化 │ │
│ │ • 滑动窗口机制 │ │
│ │ • MoE 路由优化 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 系统层优化 │ │
│ │ • CUDA Kernel 自定义 │ │
│ │ • 内存池管理 │ │
│ │ • 异步 I/O 优化 │ │
│ │ • 多线程调度 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 硬件层优化 │ │
│ │ • GPU Tensor Core 利用 │ │
│ │ • 显存带宽优化 │ │
│ │ • NVLink 通信优化 │ │
│ │ • CPU-GPU 协同优化 │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
# vllm/attention/paged_attention.py - 关键优化
class PagedAttention:
"""优化的 PagedAttention 实现"""
def __init__(self, *args, **kwargs):
# 1. 初始化自定义 CUDA kernel
self.kv_cache_ops = KVCacheOps(
num_kv_heads=self.num_kv_heads,
head_size=self.head_size,
scale=self.scale,
)
# 2. 预分配内存池
self.kv_cache_buffer = torch.empty(
(self.num_blocks, 2, self.num_kv_heads, self.head_size),
dtype=torch_dtype,
device=device,
)
# 3. 初始化位图管理
self.block_bitmap = torch.zeros(
self.num_blocks, dtype=torch.bool, device=device
)
# 4. 滑动窗口支持
if self.sliding_window is not None:
self.sliding_window_blocks = self.sliding_window // self.block_size
def get_output(
self,
query: torch.Tensor,
block_tables: torch.Tensor,
num_tokens: int,
) -> torch.Tensor:
"""优化的 PagedAttention 计算"""
# 1. 使用自定义 kernel 进行高效计算
output = self.kv_cache_ops.attention_kernel(
query, self.kv_cache_buffer, block_tables, num_tokens
)
# 2. 滑动窗口处理
if self.sliding_window is not None:
output = self._apply_sliding_window(output, block_tables)
return output
def _apply_sliding_window(
self,
output: torch.Tensor,
block_tables: torch.Tensor,
) -> torch.Tensor:
"""应用滑动窗口机制"""
# 遍历序列组,应用滑动窗口
for seq_idx in range(block_tables.size(0)):
blocks = block_tables[seq_idx]
valid_blocks = torch.sum(blocks != INVALID_BLOCK_ID).item()
if valid_blocks > self.sliding_window_blocks:
# 裁剪超出窗口的部分
overflow_blocks = valid_blocks - self.sliding_window_blocks
start_block = overflow_blocks * self.block_size
output[seq_idx, start_block:] = 0
return output
自定义 Kernel:使用 CUDA 编写高性能算子,充分利用 GPU 的并行计算能力
# vllm/attention/kv_cache_ops.cu - CUDA Kernel 实现
__global__ void paged_attention_kernel(
float* output, // 输出 [num_seqs, num_heads, head_size]
const float* query, // 查询 [num_seqs, num_heads, head_size]
const float* key_cache, // Key 缓存 [num_blocks, num_heads, head_size]
const float* value_cache, // Value 缓存 [num_blocks, num_heads, head_size]
const int* block_tables, // 块表 [num_seqs, max_blocks_per_seq]
int num_seqs, // 序列数量
int num_heads, // 头数量
int head_size, // 头大小
int block_size, // 块大小
float scale, // 缩放因子
int max_blocks_per_seq // 每个序列最大块数
) {
// 计算全局线程索引
int seq_idx = blockIdx.x;
int head_idx = blockIdx.y;
int token_idx = threadIdx.x;
// 输出指针
float* output_seq = output + seq_idx * num_heads * head_size + head_idx * head_size;
// 初始化累加器
float acc = 0.0f;
// 遍历块表
for (int block_idx = 0; block_idx < max_blocks_per_seq; block_idx++) {
int block_table_idx = block_tables[seq_idx * max_blocks_per_seq + block_idx];
if (block_table_idx == -1) break;
// 计算注意力
float sum = 0.0f;
// ... 自定义注意力计算逻辑
acc += sum;
}
// 写入结果
if (token_idx < head_size) {
output_seq[token_idx] = acc * scale;
}
}
FlashAttention 是一种高效的注意力计算算法,显著减少内存访问开销
# vllm/attention/flash_attention.py
class FlashAttention:
"""Flash Attention 集成"""
def __init__(self, *args, **kwargs):
# 1. 检查支持
if not has_flash_attn:
raise ImportError("FlashAttention not available")
# 2. 初始化 FlashAttention
self.flash_attn = flash_attn_varlen
# 3. 配置参数
self.window_size = kwargs.get("window_size", 512)
def forward(
self,
query: torch.Tensor,
key: torch.Tensor,
value: torch.Tensor,
seq_lens: torch.Tensor,
) -> torch.Tensor:
"""使用 FlashAttention 计算"""
# 1. 检查输入维度
if query.dim() != 3:
raise ValueError("Query must be 3D tensor")
# 2. 使用 FlashAttention
output = self.flash_attn(
q=query,
k=key,
v=value,
cu_seqlens=seq_lens,
max_seqlen=torch.max(seq_lens).item(),
window_size=self.window_size,
alibi_slopes=None,
softmax_scale=None,
deterministic=False,
training=False,
)
return output
def flash_attention_with_paged_kv(
self,
query: torch.Tensor,
key_cache: torch.Tensor,
value_cache: torch.Tensor,
block_tables: torch.Tensor,
) -> torch.Tensor:
"""带 PagedAttention 的 FlashAttention"""
# 1. 将 Paged KV 转换为连续格式
key, value = self._convert_paged_kv_to_contiguous(
key_cache, value_cache, block_tables
)
# 2. 使用标准 FlashAttention
output = self.flash_attention(query, key, value)
return output
解码策略
高级策略
优化策略
调度策略
广泛的模型兼容性:支持主流的开源模型和商业模型
| 模型类型 | 支持模型 | 特性 |
|---|---|---|
| Transformer | Llama, GPT, BERT | 标准架构 |
| MoE | Mixtral, DeepSeek-V2 | 专家混合 |
| Embedding | E5-Mistral, Sentence-BERT | 向量化 |
| Multi-modal | LLaVA, PaLM-E | 多模态融合 |
专家混合 (Mixture of Experts) 支持万亿参数模型的高效推理
# vllm/model_executor/models/moe.py
class MoEModel:
"""MoE 模型支持"""
def __init__(
self,
num_experts: int,
expert_capacity: int,
hidden_size: int,
intermediate_size: int,
**kwargs,
):
# 专家数量和容量
self.num_experts = num_experts
self.expert_capacity = expert_capacity
# 路由层
self.gate = nn.Linear(hidden_size, num_experts)
# 专家层
self.experts = nn.ModuleList([
nn.Linear(hidden_size, intermediate_size)
for _ in range(num_experts)
])
# 专家负载均衡
self.expert_capacity = expert_capacity
def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
"""MoE 前向传播"""
# 1. 计算专家权重
gate_logits = self.gate(hidden_states)
# 2. 专家路由
top_k_weights, top_k_indices = torch.topk(
gate_logits, k=self.num_experts_per_tok, dim=-1
)
# 3. 专家并行计算
outputs = self._compute_expert_parallel(
hidden_states, top_k_indices, top_k_weights
)
# 4. 负载均衡
outputs = self._balance_experts(outputs, top_k_indices)
return outputs
def _compute_expert_parallel(
self,
hidden_states: torch.Tensor,
top_k_indices: torch.Tensor,
top_k_weights: torch.Tensor,
) -> torch.Tensor:
"""并行计算多个专家"""
# 调整维度用于专家计算
batch_size, seq_len, hidden_dim = hidden_states.shape
hidden_states = hidden_states.view(-1, hidden_dim)
# 收集专家输入
expert_inputs = self._gather_expert_inputs(
hidden_states, top_k_indices
)
# 并行计算专家
expert_outputs = []
for i in range(self.num_experts):
expert_input = expert_inputs[i]
if expert_input.size(0) > 0:
output = self.experts[i](expert_input)
expert_outputs.append(output)
# 合并专家输出
final_output = self._merge_expert_outputs(
expert_outputs, top_k_indices, batch_size, seq_len
)
return final_output
批处理算法
动态批处理
批处理优化
质量优化
智能内存管理:PagedAttention + 动态分配 + 多级缓存
# vllm/core/cache/huggingface_cache.py
class HuggingFaceCache:
"""HuggingFace 兼容的缓存系统"""
def __init__(
self,
model: nn.Module,
cache_config: CacheConfig,
**kwargs,
):
# 缓存配置
self.cache_config = cache_config
self.model = model
# 初始化缓存
self._init_cache()
# 统计信息
self.cache_hit = 0
self.cache_miss = 0
def _init_cache(self):
"""初始化模型缓存"""
# 1. 检查模型层
self.layers = self._get_model_layers()
# 2. 初始化每层的缓存
self.layer_caches = []
for layer in self.layers:
cache = self._create_layer_cache(layer)
self.layer_caches.append(cache)
# 3. 全局缓存管理
self.global_cache = {}
def get_kv_cache(
self,
layer_idx: int,
input_ids: torch.Tensor,
) -> Tuple[torch.Tensor, torch.Tensor]:
"""获取 KV 缓存"""
# 1. 检查缓存命中
cache_key = self._generate_cache_key(input_ids)
if cache_key in self.global_cache:
self.cache_hit += 1
return self.global_cache[cache_key]
# 2. 缓存未命中
self.cache_miss += 1
key, value = self._compute_kv_cache(layer_idx, input_ids)
# 3. 更新缓存
self.global_cache[cache_key] = (key, value)
return key, value
def _compute_kv_cache(
self,
layer_idx: int,
input_ids: torch.Tensor,
) -> Tuple[torch.Tensor, torch.Tensor]:
"""计算 KV 缓存"""
# 1. 获取模型层
layer = self.layers[layer_idx]
# 2. 前向传播
with torch.no_grad():
hidden_states = layer(input_ids)
key, value = layer.attention(
hidden_states, hidden_states
)
return key, value
数据并行
张量并行
流水线并行
专家并行
配置优化
性能调优
模型选择
内存管理
系统性调优方法:从基础配置到高级优化的完整流程
# vllm/performance_tuning.py
class PerformanceTuner:
"""性能调优器"""
def __init__(self, model: str, **kwargs):
self.model = model
self.config = kwargs
# 性能基准
self.baseline = None
self.current_config = kwargs
def run_benchmark(self) -> Dict[str, float]:
"""运行性能基准测试"""
llm = LLM(
model=self.model,
**self.current_config,
)
# 测试集
test_prompts = ["Hello, world!", "What is AI?"] * 100
# 测量性能
start_time = time.time()
outputs = llm.generate(test_prompts, SamplingParams())
end_time = time.time()
# 计算指标
duration = end_time - start_time
throughput = len(outputs) / duration
latency = duration / len(outputs)
results = {
"throughput": throughput,
"latency": latency,
"gpu_util": self._get_gpu_utilization(),
"memory_usage": self._get_memory_usage(),
}
return results
def optimize_config(self) -> Dict[str, Any]:
"""优化配置"""
# 1. 基准测试
self.baseline = self.run_benchmark()
# 2. 参数调优
best_config = self.current_config.copy()
best_throughput = self.baseline["throughput"]
# 调试 gpu_memory_utilization
for util in [0.8, 0.85, 0.9, 0.95]:
config = self.current_config.copy()
config["gpu_memory_utilization"] = util
self.current_config = config
results = self.run_benchmark()
if results["throughput"] > best_throughput:
best_throughput = results["throughput"]
best_config = config.copy()
return best_config
def analyze_bottlenecks(self) -> Dict[str, str]:
"""分析性能瓶颈"""
results = self.run_benchmark()
bottlenecks = {}
# GPU 利用率分析
if results["gpu_util"] < 80:
bottlenecks["gpu"] = "GPU利用率过低,可能受CPU或内存限制"
# 内存使用分析
if results["memory_usage"] > 0.9:
bottlenecks["memory"] = "内存使用过高,考虑增加GPU内存或减少批处理大小"
# 延迟分析
if results["latency"] > 0.5:
bottlenecks["latency"] = "延迟过高,考虑优化模型配置"
return bottlenecks
多层次内存管理:从模型权重到 KV Cache 的全栈优化
# 内存优化策略
┌─────────────────────────────────────────────────────────┐
│ 内存优化层次 │
├─────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 模型权重 │ │
│ │ • Quantization (INT4/INT8/FP8) │ │
│ │ • Weight Sharing (模型共享) │ │
│ │ • CPU Offloading (权重卸载) │ │
│ │ • Pruning (模型剪枝) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┼─────────────────────────────────────────────────────┤ │
│ │ KV Cache │ │
│ │ • PagedAttention (分页管理) │ │
│ │ • Sliding Window (滑动窗口) │ │
│ │ • Prefix Caching (前缀缓存) │ │
│ │ • Block Management (块管理) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┼─────────────────────────────────────────────────────┤ │
│ │ 中间结果 │ │
│ │ • Activation Checkpointing (检查点) │ │
│ │ • Gradient Accumulation (梯度累积) │ │
│ │ • Memory Pool (内存池) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┼─────────────────────────────────────────────────────┤ │
│ │ 系统内存 │ │
│ │ • Memory Pinning (页面锁定) │ │
│ │ • Zero Copy (零拷贝) │ │
│ │ • Unified Memory (统一内存) │ │
│ │ • Memory Prefetching (预取) │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
# 内存优化代码示例
class MemoryOptimizer:
"""内存优化器"""
def __init__(self, model_size: int, available_memory: float):
self.model_size = model_size
self.available_memory = available_memory
def optimize_memory_layout(self) -> Dict[str, Any]:
"""优化内存布局"""
# 1. 计算各组件大小
kv_cache_size = self._estimate_kv_cache_size()
model_weight_size = self.model_size
# 2. 分配内存
memory_plan = {
"model_weights": {
"size": model_weight_size,
"location": "gpu",
"optimization": "quantization" if model_weight_size > self.available_memory * 0.6 else "none"
},
"kv_cache": {
"size": kv_cache_size,
"location": "gpu",
"optimization": "paged_attention"
},
"intermediate": {
"size": kv_cache_size * 0.1,
"location": "gpu",
"optimization": "memory_pool"
}
}
return memory_plan
批处理优化
计算优化
I/O 优化
调度优化
内存不足
# 解决方案:
1. 减少 gpu_memory_utilization (0.9 → 0.8)
2. 启用量化 (quantization="gptq")
3. 增加分页大小 (block_size=32)
4. 减少最大批处理大小
GPU 利用率低
# 解决方案:
1. 增加批处理大小
2. 启用连续批处理
3. 调整滑动窗口
4. 检查 CPU 瓶颈
延迟过高
# 解决方案:
1. 减少 max_num_batched_tokens
2. 启用 chunked prefill
3. 优化模型选择
4. 检查网络带宽
系统级故障
# 检查系统资源
nvidia-smi # GPU 状态
free -h # 内存使用
df -h # 磁盘空间
top # CPU 使用率
# 检查 CUDA 环境
nvcc --version
python -c "import torch; print(torch.cuda.is_available())"
vLLM 特定故障
# 启用详细日志
export VLLM_LOG_LEVEL=debug
# 检查配置兼容性
python -c "from vllm import LLM; LLM('model')"
# 性能分析
python -m cProfile -o profile.txt your_script.py
# 内存分析
python -m memory_profiler your_script.py
关键监控指标
# 核心性能指标
• GPU 利用率 (GPU Utilization)
• 内存使用 (Memory Usage)
• 吞吐量 (Throughput - tokens/sec)
• 延迟 (Latency - p99, p95)
• 请求队列长度 (Queue Length)
• 缓存命中率 (Cache Hit Rate)
• 错误率 (Error Rate)
监控实现
# Prometheus 集成
from prometheus_client import Counter, Histogram, Gauge
# 指标定义
REQUEST_COUNT = Counter('vllm_requests_total', 'Total requests')
REQUEST_DURATION = Histogram('vllm_request_duration_seconds', 'Request duration')
GPU_UTILIZATION = Gauge('vllm_gpu_utilization', 'GPU utilization')
# 更新指标
def update_metrics():
gpu_util = get_gpu_utilization()
GPU_UTILIZATION.set(gpu_util)
核心技术亮点
性能优势
vLLM 重新定义了 LLM 推理的性能标准,通过 PagedAttention 和连续批处理等技术,实现了业界领先的推理性能,已成为 LLM 服务部署的事实标准。
源码地址
https://github.com/vllm-project/vllm