BentoML 源码级深度解析 · 生产级模型 Serving 系统
2026-05-25 | 每日技术深度解读
BentoML 是一个用于构建AI模型推理服务的Python框架
支持任何机器学习框架、模态和推理运行时
从模型训练到生产部署的完整工作流
将任何模型转换为生产就绪的推理API
import bentoml


@bentoml.service(
    image=bentoml.images.Image(python_version="3.11").python_packages(
        "torch", "transformers"
    ),
)
class Summarization:
    """Summarize batches of text with a Hugging Face summarization pipeline."""

    def __init__(self) -> None:
        # Deferred imports: torch/transformers are only needed once the
        # service object is actually instantiated.
        import torch
        from transformers import pipeline

        self.pipeline = pipeline(
            "summarization",
            device="cuda" if torch.cuda.is_available() else "cpu",
        )

    @bentoml.api(batchable=True)
    def summarize(self, texts: list[str]) -> list[str]:
        """Return one summary string per input text, in input order."""
        outputs = self.pipeline(texts)
        summaries = []
        for output in outputs:
            summaries.append(output["summary_text"])
        return summaries
使用 @bentoml.service 装饰器定义服务,@bentoml.api 定义API端点
Bento是BentoML的核心部署单元,包含模型、代码、配置
Service定义API,Bento是完整部署包,Runner执行推理(Runner是1.2之前版本的概念,1.2起推理逻辑统一由Service承担)
覆盖本地运行、构建、容器化与部署的命令行工具,便于接入CI/CD流水线
# Run the service locally
bentoml serve
# Build a Bento package
bentoml build
# Build a Docker image from the Bento
# NOTE(review): the tag assumes the built Bento is named "summarization" — check with `bentoml list`
bentoml containerize summarization:latest
# Run the Docker container
docker run --rm -p 3000:3000 summarization:latest
# Deploy to BentoCloud
bentoml deploy
从开发到部署的完整命令行操作
使用标准Python类型注解自动生成API文档
# With the @bentoml.service / @bentoml.api SDK, input/output schemas are
# derived from the method's type annotations; the legacy bentoml.io
# descriptors (Text/JSON, input=/output= kwargs) are not used here.
import bentoml
from pydantic import BaseModel
from typing import List


class MyInputModel(BaseModel):
    """Example request schema for ``custom_api``."""

    text: str


class MyOutputModel(BaseModel):
    """Example response schema for ``custom_api``."""

    result: str


@bentoml.service
class IOExamples:
    """Collection of endpoints showing different input/output types."""

    # Text in / text out
    @bentoml.api
    def text_process(self, text: str) -> str:
        return text.upper()

    # JSON in / JSON out
    @bentoml.api
    def json_process(self, data: dict) -> dict:
        return {"processed": True, **data}

    # Batch processing: BentoML may merge concurrent requests into one list.
    @bentoml.api(batchable=True)
    def batch_process(self, texts: List[str]) -> List[str]:
        return [t.upper() for t in texts]

    # Custom data models: Pydantic types are validated and documented automatically.
    @bentoml.api
    def custom_api(self, input_data: MyInputModel) -> MyOutputModel:
        # The original returned MyOutputModel(**output_data) with output_data
        # undefined; build the response from the validated input instead.
        return MyOutputModel(result=input_data.text.upper())
支持多种输入输出类型,包括自定义数据模型
BentoML提供统一的模型存储和管理机制
import bentoml

# NOTE: `model` is assumed to be a trained scikit-learn estimator defined earlier.

# Save a model into the local model store; the returned handle carries the
# tag (name plus a version generated by BentoML).
saved = bentoml.sklearn.save_model("my_classifier", model)
print(saved.tag)

# Load the model back as a native scikit-learn object
clf = bentoml.sklearn.load_model("my_classifier:latest")

# Inspect model information through a bentoml.Model handle
# (distinct variable names so the estimator is not shadowed).
bento_model = bentoml.models.get("my_classifier:latest")
print(bento_model.tag)
print(bento_model.path)
print(bento_model.info.metadata)
print(bento_model.info.labels)

# Build a tag object explicitly
tag = bentoml.Tag("my_classifier", version="v1.0.0")

# There is no `bentoml.models.copy`; to move a model between machines or
# stores, export it to a portable archive ...
bentoml.models.export_model("my_classifier:latest", "/tmp/my_classifier.bentomodel")
# ... and import it on the target side (importing into the same store that
# already holds this tag would conflict):
# bentoml.models.import_model("/tmp/my_classifier.bentomodel")
完整的CRUD操作支持,包括版本管理和元数据
内置多种优化技术,最大化硬件利用率
@bentoml.api(batchable=True, max_batch_size=32, max_latency_ms=100)
def predict_batch(self, texts: List[str]) -> List[float]:
    """Inference API with dynamic (adaptive) batching.

    BentoML merges concurrent requests into one ``texts`` list of at most
    ``max_batch_size`` items. ``max_latency_ms=100`` expresses the intended
    0.1 s budget; the original ``batch_timeout=0.1`` is not a
    ``bentoml.api`` parameter.
    """
    # Assumes self.model.predict takes a single text and returns a float
    # (TODO confirm); a single vectorized call on the whole batch would make
    # better use of batching if the model supports it.
    return [self.model.predict(text) for text in texts]
# Alternatively, a simpler async batched API
@bentoml.api(batchable=True, max_batch_size=64)
async def async_predict(self, texts: List[str]) -> List[float]:
    """Async batched inference; awaits the model and converts the result to a list."""
    predictions = await self.model.batch_predict(texts)
    as_list = predictions.tolist()
    return as_list
自动收集请求并批量处理,显著提高吞吐量
原生支持GPU加速,自动检测和优化
import bentoml


@bentoml.service(
    image=bentoml.images.Image(
        python_version="3.11"
    # `device_map="auto"` below requires accelerate in addition to torch/transformers.
    ).python_packages("torch", "transformers", "accelerate"),
)
class GPUInference:
    """Batched text generation with a causal LM, on GPU when one is available."""

    MODEL_ID = "meta-llama/Llama-2-7b-chat-hf"

    def __init__(self) -> None:
        import torch
        from transformers import AutoModelForCausalLM, AutoTokenizer

        use_cuda = torch.cuda.is_available()
        self.device = "cuda" if use_cuda else "cpu"

        self.model = AutoModelForCausalLM.from_pretrained(
            self.MODEL_ID,
            device_map="auto",  # let accelerate place layers across available devices
            # fp16 halves GPU memory; on CPU fall back to fp32, where fp16 is slow or unsupported.
            torch_dtype=torch.float16 if use_cuda else torch.float32,
        )
        # Decoder-only models must be left-padded for correct batched generation.
        self.tokenizer = AutoTokenizer.from_pretrained(self.MODEL_ID, padding_side="left")
        # The Llama tokenizer defines no pad token, so padding=True would raise;
        # reuse EOS as the pad token.
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

    @bentoml.api(batchable=True)
    def generate(self, prompts: list[str]) -> list[str]:
        """Generate a completion for each prompt; returns decoded text per prompt."""
        # torch was only imported inside __init__; import it here too, otherwise
        # torch.no_grad() below raises NameError.
        import torch

        inputs = self.tokenizer(
            prompts,
            return_tensors="pt",
            padding=True,
            truncation=True,
        ).to(self.model.device)  # the device accelerate placed the model's inputs on

        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                # max_length would count prompt tokens too, leaving long prompts
                # no room to generate; bound only the newly generated tokens.
                max_new_tokens=150,
                num_return_sequences=1,
                pad_token_id=self.tokenizer.pad_token_id,
            )

        return self.tokenizer.batch_decode(outputs, skip_special_tokens=True)
自动设备映射,半精度优化,多GPU支持
支持Kubernetes和其他容器编排平台
# docker-compose.yml
# The top-level `version` key is obsolete in Compose v2 (it only triggers a warning), so it is omitted.
services:
  bentoml-service:
    image: summarization:latest
    ports:
      # With 3 replicas on one host, a fixed "3000:3000" mapping makes the
      # 2nd and 3rd replicas fail to bind; publish one host port per replica.
      - "3000-3002:3000"
    environment:
      - BENTOML_SERVICE=summarization
      - BENTOML_PORT=3000
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 2G
          cpus: '1.0'
    healthcheck:
      # BentoML exposes readiness at /readyz (there is no /health route).
      # Use the image's Python instead of assuming curl is installed;
      # urlopen raises on non-2xx, giving a non-zero exit code.
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:3000/readyz')"]
      interval: 30s
      timeout: 10s
      retries: 3
      # Model loading can be slow; don't count failures during startup.
      start_period: 60s
# Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: bentoml-deployment
spec:
  replicas: 3
  selector:
    matchLabels:
      app: bentoml-service
  template:
    metadata:
      labels:
        app: bentoml-service
    spec:
      containers:
        - name: bentoml
          # NOTE: the cluster must be able to pull this image — push it to a
          # registry and prefer an immutable tag over :latest.
          image: summarization:latest
          ports:
            - containerPort: 3000
          resources:
            requests:
              cpu: "1"
              memory: 2Gi
            limits:
              memory: 2Gi
          # BentoML's built-in health endpoints; traffic is only routed once
          # the model has loaded and /readyz succeeds.
          readinessProbe:
            httpGet:
              path: /readyz
              port: 3000
            initialDelaySeconds: 10
            periodSeconds: 10
          livenessProbe:
            httpGet:
              path: /livez
              port: 3000
            initialDelaySeconds: 30
            periodSeconds: 30
Docker Compose和Kubernetes部署配置
内置完整的监控和可观测性支持
import time
from datetime import datetime, timezone

import bentoml

# Custom Prometheus metric. BentoML serves its built-in and custom metrics on
# the service's own /metrics endpoint, so no separate exporter port is needed
# (the original `bentoml.metrics.expose_to_prometheus` does not exist, and
# `bentoml.monitoring.monitor` is not a timing decorator).
inference_duration = bentoml.metrics.Histogram(
    name="inference_duration_seconds",
    documentation="Wall-clock duration of MonitoredService.predict in seconds",
)


@bentoml.service
class MonitoredService:
    """Example service that records a latency histogram and an item counter."""

    def __init__(self) -> None:
        # Counts processed *items*, not HTTP requests: with batchable=True one
        # call may carry several merged requests.
        self.counter = 0

    @bentoml.api(batchable=True)
    def predict(self, data: list[str]) -> list[str]:
        """Upper-case each input string, recording how long the call took."""
        start = time.perf_counter()
        self.counter += len(data)
        time.sleep(0.1)  # simulated processing time
        results = [item.upper() for item in data]
        inference_duration.observe(time.perf_counter() - start)
        return results

    @bentoml.api
    def get_stats(self) -> dict:
        """Return the processed-item count and a timezone-aware UTC timestamp."""
        return {
            "total_requests": self.counter,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
服务端口的 /metrics 端点自动暴露请求数、延迟等 Prometheus 指标,也可通过 bentoml.metrics 定义自定义指标(如推理时长)
内置多层次安全防护机制
import hmac
import os

import bentoml
from bentoml.exceptions import InvalidArgument
from pydantic import BaseModel


class InputModel(BaseModel):
    """Request schema for ``validate_input``; validated by Pydantic."""

    text: str
    max_length: int = 100


class APIKeyMiddleware:
    """Pure-ASGI middleware requiring ``Authorization: Bearer <api_key>``.

    Health probes stay open so orchestrators can check the service. Fails
    closed: when *api_key* is empty, every protected request is rejected.
    """

    OPEN_PATHS = ("/livez", "/readyz", "/healthz")

    def __init__(self, app, api_key: str) -> None:
        self.app = app
        self.expected = f"Bearer {api_key}".encode() if api_key else None

    async def __call__(self, scope, receive, send):
        if scope["type"] not in ("http", "websocket") or scope["path"] in self.OPEN_PATHS:
            await self.app(scope, receive, send)
            return
        # ASGI header names are lower-cased bytes.
        supplied = dict(scope["headers"]).get(b"authorization", b"")
        # compare_digest avoids leaking the key through timing differences.
        if self.expected is not None and hmac.compare_digest(supplied, self.expected):
            await self.app(scope, receive, send)
            return
        if scope["type"] == "websocket":
            await receive()  # consume websocket.connect, then reject the handshake
            await send({"type": "websocket.close", "code": 1008})
            return
        await send({
            "type": "http.response.start",
            "status": 401,
            "headers": [(b"content-type", b"application/json")],
        })
        await send({"type": "http.response.body", "body": b'{"error": "unauthorized"}'})


@bentoml.service
class SecureService:
    """Endpoints protected by APIKeyMiddleware (``api_secret`` is not a service option)."""

    @bentoml.api
    def secure_api(self, data: dict) -> dict:
        """API that requires authentication."""
        return {"message": "success", "data": data}

    @bentoml.api
    def validate_input(self, data: InputModel) -> dict:
        """API with schema validation plus a length check.

        Raises:
            InvalidArgument: text is longer than ``max_length`` (an HTTP 400,
                whereas a plain ValueError would surface as a server error).
        """
        if len(data.text) > data.max_length:
            raise InvalidArgument("Input text too long")
        return {"processed": data.text}


# Read the key from the environment instead of hard-coding it in source.
SecureService.add_asgi_middleware(APIKeyMiddleware, api_key=os.environ.get("API_KEY", ""))
开源版可通过ASGI中间件实现API密钥、JWT等认证方式;BentoCloud另提供托管的访问鉴权
支持多种配置方式,适应不同环境需求
# config.yml
# Application-level settings, with the nesting restored: flattened, `name`
# appeared twice as a top-level key. NOTE(review): nothing here shows which
# keys BentoML itself reads — confirm each one is consumed by your own code
# (worker counts and timeouts can also be passed to @bentoml.service).
service:
  name: "summarization-service"
  port: 3000
  workers: 4
model:
  name: "facebook/bart-large-cnn"
  device: "cuda"
  batch_size: 32
api:
  max_request_size: "10MB"
  timeout: 30  # presumably seconds — confirm with the consuming code
  rate_limit: "100/minute"
monitoring:
  enabled: true
  prometheus_port: 8000
logging:
  level: "INFO"
  format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
YAML配置文件示例(字段是否生效、能否被环境变量覆盖,取决于读取该文件的代码)
内置完善的错误处理机制
import bentoml
from bentoml.exceptions import InvalidArgument


@bentoml.service
class ErrorHandlingService:
    """Let BentoML map exceptions to HTTP status codes instead of masking them."""

    @bentoml.api
    def process(self, data: dict) -> dict:
        """Upper-case ``data["text"]``.

        Raises:
            InvalidArgument: ``text`` is missing/empty or too long; BentoML
                returns it to the client as HTTP 400.
        """
        text = data.get("text")
        if not text:
            raise InvalidArgument("Input text is required")
        try:
            result = self._process_data(text)
        except ValueError as e:
            # Validation failures are client errors, not server errors.
            raise InvalidArgument(str(e)) from e
        # Any other exception propagates so it is reported as a server error,
        # rather than the original catch-all returning HTTP 200 with
        # success=False (and a `code` that was always 500).
        return {"success": True, "result": result}

    def _process_data(self, text: str) -> str:
        """Data-processing logic; rejects texts longer than 1000 characters."""
        if len(text) > 1000:
            raise ValueError("Text too long")
        return text.upper()
内置错误类型,自动HTTP状态码映射
通过挂载ASGI应用(如FastAPI)支持WebSocket,适合实时应用场景
import asyncio
import json
import time
from typing import AsyncGenerator

import bentoml
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

# BentoML has no `bentoml.websocket` decorator; WebSocket endpoints live on an
# ASGI app that is mounted into the service below (served under /ws).
ws_app = FastAPI()


async def _process_message(message: str) -> AsyncGenerator[str, None]:
    """Yield a progressively longer response for *message*, one word at a time."""
    response = []
    for i, word in enumerate(message.split()):
        response.append(f"Word {i+1}: {word}\n")
        yield "".join(response)  # partial response so far
    yield "\nProcessing complete!"


@ws_app.websocket("/chat")
async def chat_stream(websocket: WebSocket) -> None:
    """Chat endpoint: stream partial responses back for each incoming message."""
    await websocket.accept()
    try:
        async for message in websocket.iter_text():
            # _process_message is an async generator: iterate it, don't await it.
            async for partial in _process_message(message):
                await websocket.send_text(partial)
    except WebSocketDisconnect:
        pass  # client went away mid-stream


@ws_app.websocket("/stream")
async def data_stream(websocket: WebSocket) -> None:
    """Push a JSON counter/timestamp once per second until the client disconnects."""
    await websocket.accept()
    count = 0
    try:
        while True:
            await websocket.send_text(json.dumps({"timestamp": time.time(), "count": count}))
            count += 1
            await asyncio.sleep(1)
    except WebSocketDisconnect:
        pass  # stop pushing once the client is gone


@bentoml.service
@bentoml.asgi_app(ws_app, path="/ws")
class WebSocketService:
    """Service exposing the WebSocket endpoints /ws/chat and /ws/stream."""
WebSocket连接支持客户端与服务端双向实时通信
支持与其他微服务无缝集成
import bentoml
from fastapi import FastAPI

# ASGI app integration: routes on this app are served under /api by the service.
fastapi_app = FastAPI()


@fastapi_app.get("/health")
def health_check():
    return {"status": "healthy"}


# `bentoml.asgi_app` is the class decorator that mounts an ASGI app; the
# original `bentoml.mount_asgi_app` method decorator does not exist.
@bentoml.service
@bentoml.asgi_app(fastapi_app, path="/api")
class IntegratedService:
    """Calls another HTTP service, then post-processes its result locally."""

    def __init__(self) -> None:
        self.http_client = bentoml.SyncHTTPClient("http://other-service:8000")

    @bentoml.api
    def integrated_call(self, data: dict) -> dict:
        """Call the external service, then process its result locally."""
        # Assumes the remote service exposes a `process` API accepting this
        # dict and returning a dict — TODO confirm against its schema.
        external_result = self.http_client.process(data)
        return self._process(external_result)

    def _process(self, data: dict) -> dict:
        """Local processing logic."""
        return {"processed": True, **data}

    @bentoml.on_shutdown
    def close_client(self) -> None:
        """Release the HTTP client's connections when the service stops."""
        self.http_client.close()
支持HTTP客户端和ASGI应用挂载
支持流式数据处理,适合实时分析场景
import time
from typing import AsyncIterator, List

import bentoml


@bentoml.service
class StreamProcessingService:
    """Processes incoming records in fixed-size chunks."""

    def __init__(self) -> None:
        self.processed_count = 0

    @bentoml.api(batchable=True)
    async def process_stream(self, data_stream: List[dict]) -> List[dict]:
        """Process records in chunks of 10 and return all results in order.

        Records arrive as a JSON list (an ``AsyncIterator`` parameter cannot be
        built from an HTTP request body); the chunking helper accepts both
        sync and async iterables.
        """
        results: List[dict] = []
        async for batch in self._batch_stream(data_stream, batch_size=10):
            results.extend(await self._process_batch(batch))
        return results

    @staticmethod
    async def _aiter(items):
        """Iterate *items* asynchronously, whether it is a sync or async iterable."""
        if hasattr(items, "__aiter__"):
            async for item in items:
                yield item
        else:
            for item in items:
                yield item

    async def _batch_stream(self, stream, batch_size: int) -> AsyncIterator[List[dict]]:
        """Group *stream* into lists of at most *batch_size* items (last may be shorter)."""
        if batch_size < 1:
            raise ValueError("batch_size must be >= 1")
        batch: List[dict] = []
        async for item in self._aiter(stream):
            batch.append(item)
            if len(batch) >= batch_size:
                yield batch
                batch = []
        if batch:
            yield batch

    async def _process_batch(self, batch: List[dict]) -> List[dict]:
        """Double each record's ``value``; each record must have ``id`` and numeric ``value``."""
        processed = []
        for item in batch:
            processed.append({
                "id": item["id"],
                "value": item["value"] * 2,
                "timestamp": time.time(),
                "processed": True,
            })
        self.processed_count += len(batch)
        return processed
将输入数据按固定大小分块,逐块批量处理
支持事件驱动的处理模式,适合微服务场景
import asyncio
import logging
from typing import Awaitable, Callable, Dict

import bentoml
from bentoml.exceptions import InvalidArgument

logger = logging.getLogger(__name__)

EventHandler = Callable[[dict], Awaitable[None]]


def on_event(event_type: str):
    """Class-body decorator: mark an async method as the handler for *event_type*.

    The original applied the *instance* method ``register_handler`` inside the
    class body, which raises TypeError at class creation (no ``self`` yet).
    """
    def decorator(method):
        method._event_type = event_type
        return method
    return decorator


@bentoml.service
class EventDrivenService:
    """Queues events and dispatches each one's ``data`` payload to a handler."""

    def __init__(self) -> None:
        self.event_handlers: Dict[str, EventHandler] = {}
        self.event_queue: asyncio.Queue = asyncio.Queue()
        self._consumer: "asyncio.Task | None" = None
        # Bind every method tagged with @on_event in the class body.
        cls = type(self)
        for attr_name in dir(cls):
            event_type = getattr(getattr(cls, attr_name, None), "_event_type", None)
            if event_type is not None:
                self.event_handlers[event_type] = getattr(self, attr_name)

    def register_handler(self, event_type: str):
        """Register a handler at runtime (decorator usable on an instance)."""
        def decorator(handler: Callable):
            self.event_handlers[event_type] = handler
            return handler
        return decorator

    @bentoml.api
    async def emit_event(self, event_data: dict) -> dict:
        """Queue an event; it is handled exactly once by the background consumer.

        Raises:
            InvalidArgument: ``type`` is missing (HTTP 400).
        """
        event_type = event_data.get("type")
        if not event_type:
            raise InvalidArgument("Event type is required")
        # Start the consumer lazily: it needs the running event loop.
        if self._consumer is None or self._consumer.done():
            self._consumer = asyncio.create_task(self.process_events())
        await self.event_queue.put(event_data)
        return {"status": "event_emitted", "type": event_type}

    async def process_events(self) -> None:
        """Background loop: handle queued events one at a time, forever."""
        while True:
            event = await self.event_queue.get()
            try:
                await self._handle_event(event)
            except Exception:
                # One failing handler must not kill the consumer loop.
                logger.exception("Event handler failed for type %r", event.get("type"))
            finally:
                self.event_queue.task_done()

    async def _handle_event(self, event: dict) -> None:
        """Dispatch ``event["data"]`` to the handler for ``event["type"]``, if any."""
        handler = self.event_handlers.get(event["type"])
        if handler is not None:
            await handler(event.get("data", {}))

    @on_event("user_action")
    async def _handle_user_action(self, data: dict) -> None:
        """Handle a user-action event."""
        print(f"Processing user action: {data}")

    @on_event("model_update")
    async def _handle_model_update(self, data: dict) -> None:
        """Handle a model-update event by reloading the referenced model."""
        await self._reload_model(data["model_id"])

    async def _reload_model(self, model_id: str) -> None:
        """Placeholder (undefined in the original): replace with real model loading."""
        print(f"Reloading model: {model_id}")
支持事件驱动架构,可扩展的事件处理系统
支持异步任务队列,适合长时间运行的任务
import bentoml
import asyncio
from dataclasses import dataclass
from typing import Dict, Any
@dataclass
class Task:
    """A unit of work queued by TaskQueueService.

    ``priority`` is negated when queued, so larger values are dequeued first;
    ``retry_count`` is incremented on each failed attempt until it reaches
    ``max_retries``.
    """

    id: str  # caller-supplied identifier, used as the key for results
    type: str  # selects which handler _run_task dispatches to
    data: Dict[str, Any]  # payload handed to that handler
    priority: int = 0
    retry_count: int = 0
    max_retries: int = 3
@bentoml.service
class TaskQueueService:
    """Accept tasks over HTTP and run them in the background by priority.

    Higher ``Task.priority`` runs first; failed tasks are re-queued until
    ``Task.max_retries`` is exhausted. Results are kept in memory.
    """

    def __init__(self):
        import itertools

        # Entries are (-priority, seq, task). Negating priority makes the
        # min-heap pop the highest priority first; the strictly increasing seq
        # breaks ties, so two Task objects (not orderable) are never compared —
        # the original raised TypeError on equal priorities — and
        # equal-priority tasks run in submission order.
        self.tasks: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self._seq = itertools.count()
        self.task_results: Dict[str, Any] = {}
        self.running = True
        # Background consumer, started lazily by submit_task (needs a running loop).
        self._worker: "asyncio.Task | None" = None

    async def _enqueue(self, task: Task) -> None:
        """Put *task* on the priority queue with a FIFO tie-breaker."""
        await self.tasks.put((-task.priority, next(self._seq), task))

    @bentoml.api
    async def submit_task(self, task_data: dict) -> dict:
        """Queue a task; ``task_data`` needs ``id``, ``type``, ``data`` and may set ``priority``."""
        task = Task(
            id=task_data["id"],
            type=task_data["type"],
            data=task_data["data"],
            priority=task_data.get("priority", 0),
        )
        await self._enqueue(task)
        if self._worker is None or self._worker.done():
            self._worker = asyncio.create_task(self.process_tasks())
        return {"status": "task_submitted", "task_id": task.id}

    @bentoml.api
    async def get_task_result(self, task_id: str) -> dict:
        """Return the stored result for *task_id*, or ``{"status": "pending"}``."""
        return self.task_results.get(task_id, {"status": "pending"})

    async def process_tasks(self):
        """Background loop: execute queued tasks until stopped or cancelled."""
        while self.running:
            try:
                _, _, task = await self.tasks.get()
                await self._execute_task(task)
            except asyncio.CancelledError:
                break

    async def _execute_task(self, task: Task):
        """Run *task*, recording the result or re-queueing it on failure."""
        import time

        try:
            result = await self._run_task(task)
        except Exception as e:
            if task.retry_count < task.max_retries:
                task.retry_count += 1
                await self._enqueue(task)
            else:
                self.task_results[task.id] = {
                    "status": "failed",
                    "error": str(e),
                    "timestamp": time.time(),
                }
        else:
            self.task_results[task.id] = {
                "status": "completed",
                "result": result,
                "timestamp": time.time(),
            }

    async def _run_task(self, task: Task) -> Any:
        """Dispatch to the handler for ``task.type``.

        NOTE(review): ``_train_model`` and ``_process_data`` are not defined
        in this snippet and must be provided by the service.
        """
        if task.type == "model_training":
            return await self._train_model(task.data)
        elif task.type == "data_processing":
            return await self._process_data(task.data)
        else:
            raise ValueError(f"Unknown task type: {task.type}")
支持优先级队列、重试机制、任务状态跟踪
Service是BentoML的核心,管理API注册、Runner初始化和生命周期(以下为简化示意代码,并非源码原文)
# Simplified sketch of the @bentoml.service decorator (illustrative; not verbatim BentoML source).
def service(
    cls=None,
    *,
    name: str | None = None,
    image: "Image | None" = None,
    resources: dict | None = None,
    traffic: dict | None = None,
    # ... further options elided
) -> "Service":
    """Turn a plain Python class into a BentoML Service.

    Usable both as ``@service`` and as ``@service(name=..., ...)``.
    """
    def decorator(cls):
        # 1. Collect methods marked by @api. Use a distinct loop variable: the
        #    original reused `name`, which made it local to `decorator`, so the
        #    service was named after the last class attribute and the `name`
        #    argument was ignored.
        apis = {
            attr_name: method._bentoml_api
            for attr_name, method in vars(cls).items()
            if hasattr(method, "_bentoml_api")
        }
        # 2. Build the service configuration
        config = ServiceConfig(
            name=name or cls.__name__,
            image=image,
            resources=resources,
            traffic=traffic,
        )
        # 3. Register in the global service store
        svc = Service(cls, config, apis)
        ServiceStore.add(svc)
        return svc

    return decorator if cls is None else decorator(cls)
装饰器收集API方法、创建配置、注册到全局Store
# Simplified sketch of the @bentoml.api decorator (illustrative; not verbatim BentoML source).
def api(
    func=None,
    *,
    input=None,
    output=None,
    route: str | None = None,
    batchable: bool = False,
    max_batch_size: int = 100,
    # Batching wait budget in seconds; the real SDK exposes this as max_latency_ms.
    batch_wait: float = 0.1,
    # ... further options elided
) -> "Callable":
    """Mark a method as an API endpoint.

    Usable both as ``@api`` and as ``@api(...)``. IO specs default to ones
    inferred from the function's signature; the route defaults to
    ``/<function name>``.
    """
    def decorator(func):
        # Infer IO types unless given explicitly
        input_spec = input or _infer_input(func)
        output_spec = output or _infer_output(func)
        # Attach the API descriptor; the service decorator collects it later.
        func._bentoml_api = APIDescriptor(
            name=func.__name__,
            input_spec=input_spec,
            output_spec=output_spec,
            route=route or f"/{func.__name__}",
            batchable=batchable,
            max_batch_size=max_batch_size,
            batch_wait=batch_wait,
        )
        return func

    return decorator if func is None else decorator(func)
自动推断输入输出类型,支持批处理配置
IO描述符定义了API的输入输出类型,支持自动验证和序列化
# Simplified sketch of BentoML's IO descriptors (illustrative).
class IODescriptor(ABC):
    """Base class for IO descriptors: convert between wire formats and Python objects."""

    @abstractmethod
    async def from_http_request(self, request: "Request") -> Any:
        """Parse the API input from an HTTP request."""

    @abstractmethod
    async def to_http_response(self, obj: Any) -> "Response":
        """Convert the API output into an HTTP response."""

    @abstractmethod
    async def from_proto(self, field) -> Any:
        """Parse the API input from a gRPC message field."""

    @abstractmethod
    async def to_proto(self, obj: Any) -> Any:
        """Convert the API output into a gRPC message field."""


class Text(IODescriptor):
    """Plain-text (UTF-8) IO descriptor."""

    async def from_http_request(self, request: "Request") -> str:
        # With a Starlette Request, body() is a coroutine returning bytes; it
        # must be awaited and decoded (the original returned the coroutine).
        body = await request.body()
        return body.decode("utf-8")

    async def to_http_response(self, text: str) -> "Response":
        return Response(text, media_type="text/plain")

    # The proto methods must be implemented: otherwise Text stays abstract and
    # Text() raises TypeError.
    async def from_proto(self, field) -> str:
        # Accept raw bytes or an already-decoded string.
        return field.decode("utf-8") if isinstance(field, bytes) else field

    async def to_proto(self, obj: str) -> str:
        return obj
统一的IO抽象,支持HTTP和gRPC双协议
Bento是自包含的部署单元,包含代码、模型和所有依赖
# bentofile.yaml - Bento build configuration
service: "service:Summarization"  # service entry point (module:ServiceClass)
include:
  - "*.py"
  - "config/*.yml"
exclude:
  - "tests/"
  - "*.md"
# Python dependencies are a top-level `python` section, not part of `docker`.
python:
  packages:
    - torch>=2.0
    - transformers>=4.30
    - numpy
docker:
  # A custom `base_image` overrides the other image options (including
  # system_packages); python:3.11-slim is Debian-based, so request the
  # equivalent distro/Python version and let BentoML install the packages.
  distro: debian
  python_version: "3.11"
  system_packages:
    - libgl1
    - libglib2.0-0
models:
  - "my_classifier:latest"
  - "my_encoder:v1.0.0"
envs:
  - name: MODEL_CACHE_DIR
    value: "/tmp/models"
  - name: LOG_LEVEL
    value: "INFO"
声明式配置,定义服务入口、依赖和模型
Runner是模型推理的执行单元,独立于API服务运行(1.2之前版本的概念;以下为简化示意代码)
# Simplified sketch of a model-inference runner (illustrative).
class Runner:
    """Executes model inference either in-process or across worker processes."""

    def __init__(self, svc, runner_config):
        self.svc = svc
        self.config = runner_config
        self._workers = []
        self._instance = None  # set by init_local()
        self._worker_cycle = None  # round-robin iterator, set by init_server()

    def init_local(self, device_id=None):
        """Local mode: instantiate ``svc`` directly in the current process."""
        self._instance = self.svc()
        return self

    def init_server(self, runner_map=None):
        """Server mode: create ``config.workers`` workers and return ``self``."""
        import itertools

        for i in range(self.config.workers):
            worker = RunnerWorker(
                svc=self.svc,
                device_id=i,  # assumes one device per worker index — TODO confirm device count
                runner_map=runner_map,
            )
            self._workers.append(worker)
        self._worker_cycle = itertools.cycle(self._workers)
        return self

    def _schedule(self):
        """Pick the next worker round-robin.

        Raises:
            RuntimeError: no workers exist (neither init mode was set up).
        """
        if not self._workers:
            raise RuntimeError("Runner has no workers; call init_server() or init_local() first")
        return next(self._worker_cycle)

    async def async_run(self, method_name, *args, **kwargs):
        """Run ``method_name`` locally if initialized in-process, else on a worker."""
        import inspect

        if self._instance is not None:
            result = getattr(self._instance, method_name)(*args, **kwargs)
            # Support both sync and async service methods.
            return await result if inspect.isawaitable(result) else result
        worker = self._schedule()
        return await worker.run(method_name, *args, **kwargs)
Runner管理Worker进程,负责推理调度和资源分配
BentoCloud是BentoML官方的托管推理平台,无需自行维护底层基础设施即可部署
BentoML为AI模型部署提供了一站式解决方案
感谢阅读!
访问 https://atcfu.com/ai-articles/bentoml/ 回顾本文