🍱 BentoML

AI模型部署框架源码解析

源码级别解析 · 生产级模型 serving 系统 · 源码级深度分析
2026-05-25 | 每日技术深度解读

什么是BentoML

统一的AI模型 serving 解决方案
  • 任何模型的 REST API 服务
  • Docker 容器化部署
  • 高性能推理优化
  • 生产就绪的云原生架构

BentoML 是一个用于构建AI模型推理服务的Python框架

核心特性

企业级模型 serving 能力
  • 🍱 易于构建 Any AI/ML 模型 API
  • 🐳 Docker 容器简化管理
  • 🧭 最大化 CPU/GPU 利用率
  • 👩💻 完全可定制化
  • 🚀 生产就绪

支持任何机器学习框架、模态和推理运行时

BentoML生态系统

┌─────────────────────────────────────────────────────────────┐ │ BentoML 生态系统 │ ├─────────────────────────────────────────────────────────────┤ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ 开发者 │ │ BentoML │ │ 部署 │ │ │ │ (定义API) │ │ (核心框架) │ │ (生产) │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ 模型 │ │ 服务 │ │ BentoCloud│ │ │ │ 训练 │ │ 构建 │ │ 托管 │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ 运行时环境 │ │ │ (Docker/Kubernetes/Serverless) │ │ └─────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────┘

从模型训练到生产部署的完整工作流

快速开始

3步部署模型服务
  • 1. 定义 service.py 文件
  • 2. 配置依赖和环境
  • 3. 运行 bentoml serve

将任何模型转换为生产就绪的推理API

基本服务定义

import bentoml

@bentoml.service(
    image=bentoml.images.Image(python_version="3.11").python_packages("torch", "transformers"),
)
class Summarization:
    def __init__(self) -> None:
        import torch
        from transformers import pipeline
        
        device = "cuda" if torch.cuda.is_available() else "cpu"
        self.pipeline = pipeline('summarization', device=device)

    @bentoml.api(batchable=True)
    def summarize(self, texts: list[str]) -> list[str]:
        results = self.pipeline(texts)
        return [item['summary_text'] for item in results]

使用 @bentoml.service 装饰器定义服务,@bentoml.api 定义API端点

核心概念

BentoML核心组件
  • 🍱 Bento: 标准化部署单元
  • 🏃 Runner: 模型推理执行器
  • 🔌 API: HTTP/gRPC 端点
  • 📦 Model: 模型存储管理
  • 🐳 Container: 容器化运行时

Bento是BentoML的核心部署单元,包含模型、代码、配置

BentoML组件架构

┌─────────────────────────────────────────────────────────────┐ │ BentoML 组件架构 │ ├─────────────────────────────────────────────────────────────┤ │ ┌─────────────────────────────────────────────────────┐ │ │ │ BentoML │ │ │ │ │ │ ┌─────────────┐ ┌─────────────┐ │ │ │ │ Service │ │ Bento │ │ │ │ │ (API定义) │ │ (部署包) │ │ │ │ └─────────────┘ └─────────────┘ │ │ │ │ │ │ │ │ ┌─────────────┐ ┌─────────────┐ │ │ │ │ Runner 1 │ │ Model 1 │ │ │ │ │ Runner 2 │ │ Model 2 │ │ │ │ └─────────────┘ └─────────────┘ │ │ │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ │ HTTP/gRPC Server │ │ │ │ └─────────────────────────────────────────────┘ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ 本地开发 │ │ Docker容器 │ │ BentoCloud│ │ │ │ (调试) │ │ (部署) │ │ (云服务) │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ └─────────────────────────────────────────────────────────────┘

Service定义API,Bento是完整部署包,Runner执行推理

服务生命周期

BentoML服务管理
  • 开发 → 构建 → 测试 → 部署 → 监控 → 扩容

完整的CI/CD流水线支持

服务运行

# 本地运行
bentoml serve

# 构建Bento包
bentoml build

# 生成Docker镜像
bentoml containerize summarization:latest

# 运行Docker容器
docker run --rm -p 3000:3000 summarization:latest

# 部署到BentoCloud
bentoml deploy

从开发到部署的完整命令行操作

API定义

灵活的API接口设计
  • 支持同步/异步函数
  • 自动类型推断
  • 批量处理支持
  • 自定义路由
  • WebSocket支持

使用标准Python类型注解自动生成API文档

API类型支持

from bentoml.io import Text, JSON, JSONDescriptor
from typing import List, Dict

# 文本输入输出
@bentoml.api(input=Text(), output=Text())
def text_process(text: str) -> str:
    return text.upper()

# JSON输入输出
@bentoml.api(input=JSON(), output=JSON())
def json_process(data: dict) -> dict:
    return {"processed": True, **data}

# 批量处理
@bentoml.api(input=Text(), output=Text(), batchable=True)
def batch_process(texts: List[str]) -> List[str]:
    return [t.upper() for t in texts]

# 自定义IO描述
@bentoml.api(
    input=JSONDescriptor.from_pydantic(MyInputModel),
    output=JSONDescriptor.from_pydantic(MyOutputModel)
)
def custom_api(input_data: MyInputModel) -> MyOutputModel:
    return MyOutputModel(**output_data)

支持多种输入输出类型,包括自定义数据模型

模型管理

模型存储和版本控制
  • 📦 自动模型打包
  • 🏷️ 标签化管理
  • 🔄 版本控制
  • 🔄 模型复制
  • 💾 高效存储

BentoML提供统一的模型存储和管理机制

模型操作

import bentoml

# 保存模型
bentoml.sklearn.save_model("my_classifier", model)

# 加载模型
model = bentoml.sklearn.load_model("my_classifier:latest")

# 查看模型信息
model = bentoml.models.get("my_classifier:latest")
print(model.tag)
print(model.path)
print(model.options)

# 模型标签
tag = bentoml.Tag("my_classifier", version="v1.0.0")

# 模型复制
bentoml.models.copy("my_classifier:v1", "my_classifier:v2")

完整的CRUD操作支持,包括版本管理和元数据

高性能优化

推理性能优化技术
  • ⚡ 动态批处理
  • 🔀 模型并行
  • 🎯 多阶段流水线
  • 📊 多模型推理图编排
  • 🎮 GPU加速

内置多种优化技术,最大化硬件利用率

动态批处理

@bentoml.api(batchable=True, max_batch_size=32, batch_timeout=0.1)
def predict_batch(self, texts: List[str]) -> List[float]:
    """支持动态批处理的推理API"""
    results = []
    for text in texts:
        # 模型推理
        result = self.model.predict(text)
        results.append(result)
    return results

# 或者更简单的异步处理
@bentoml.api(batchable=True, max_batch_size=64)
async def async_predict(self, texts: List[str]) -> List[float]:
    """异步批处理推理"""
    batch = await self.model.batch_predict(texts)
    return batch.tolist()

自动收集请求并批量处理,显著提高吞吐量

GPU支持

GPU推理加速
  • 🔥 CUDA设备检测
  • 🔄 多GPU并行
  • 💾 GPU内存优化
  • ⚡ 专用推理引擎
  • 🌊 流式推理

原生支持GPU加速,自动检测和优化

GPU推理配置

import bentoml

@bentoml.service(
    image=bentoml.images.Image(
        python_version="3.11"
    ).python_packages("torch", "transformers"),
)
class GPUInference:
    def __init__(self) -> None:
        import torch
        from transformers import AutoModelForCausalLM, AutoTokenizer
        
        # 自动检测CUDA
        device = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = device
        
        # 加载模型到GPU
        self.model = AutoModelForCausalLM.from_pretrained(
            "meta-llama/Llama-2-7b-chat-hf",
            device_map="auto",  # 自动设备映射
            torch_dtype=torch.float16  # 半精度减少内存
        )
        self.tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-7b-chat-hf")

    @bentoml.api(batchable=True)
    def generate(self, prompts: List[str]) -> List[str]:
        """GPU加速的文本生成"""
        inputs = self.tokenizer(
            prompts, 
            return_tensors="pt", 
            padding=True, 
            truncation=True
        ).to(self.device)
        
        # GPU推理
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_length=150,
                num_return_sequences=1
            )
        
        # 解码结果
        results = self.tokenizer.batch_decode(
            outputs, 
            skip_special_tokens=True
        )
        return results

自动设备映射,半精度优化,多GPU支持

分布式部署

多节点服务编排
  • 🌐 多实例部署
  • ⚖️ 负载均衡
  • 🔄 故障转移
  • 📊 自动扩容
  • 🔌 服务发现

支持Kubernetes和其他容器编排平台

分布式服务配置

# docker-compose.yml
version: '3.8'
services:
  bentoml-service:
    image: summarization:latest
    ports:
      - "3000:3000"
    environment:
      - BENTOML_SERVICE=summarization
      - BENTOML_PORT=3000
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 2G
          cpus: '1.0'
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:3000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

# Kubernetes配置
apiVersion: apps/v1
kind: Deployment
metadata:
  name: bentoml-deployment
spec:
  replicas: 3
  selector:
    matchLabels:
      app: bentoml-service
  template:
    metadata:
      labels:
        app: bentoml-service
    spec:
      containers:
      - name: bentoml
        image: summarization:latest
        ports:
        - containerPort: 3000

Docker Compose和Kubernetes部署配置

监控和观测性

生产环境监控
  • 📊 Prometheus指标
  • 📝 日志聚合
  • 🔍 分布式追踪
  • 🚨 告警系统
  • 📈 实时监控面板

内置完整的监控和可观测性支持

监控配置

import bentoml
from bentoml.monitoring import monitor

@bentoml.service
class MonitoredService:
    def __init__(self):
        self.counter = 0
    
    @bentoml.api(batchable=True)
    @monitor("inference_duration", unit="seconds")
    def predict(self, data: List[str]) -> List[str]:
        """带监控的推理API"""
        # 记录推理次数
        self.counter += len(data)
        
        # 模拟处理时间
        import time
        time.sleep(0.1)
        
        # 业务逻辑
        results = [item.upper() for item in data]
        return results
    
    @bentoml.api
def get_stats(self) -> dict:
        """获取服务统计信息"""
        return {
            "total_requests": self.counter,
            "timestamp": datetime.now().isoformat()
        }

# 指标导出配置
bentoml.metrics.expose_to_prometheus(port=8000)

自动收集推理时长、吞吐量等关键指标

安全性

企业级安全防护
  • 🔐 API认证和授权
  • 🛡️ 输入验证
  • 🔒 网络安全
  • 📋 审计日志
  • 🔑 密钥管理

内置多层次安全防护机制

安全配置

import bentoml
from bentoml.io import JSON
from pydantic import BaseModel

# API认证
@bentoml.service(
    api_secret={
        "api_key": "your-secret-key",
        "require_auth": True
    }
)
class SecureService:
    @bentoml.api(input=JSON(), output=JSON())
    def secure_api(self, data: dict) -> dict:
        """需要认证的API"""
        return {"message": "success", "data": data}

# 输入验证
class InputModel(BaseModel):
    text: str
    max_length: int = 100

@bentoml.api(
    input=bentoml.io.JSON.from_pydantic(InputModel),
    output=bentoml.io.JSON()
)
def validate_input(self, data: InputModel) -> dict:
    """带输入验证的API"""
    if len(data.text) > data.max_length:
        raise ValueError("Input text too long")
    return {"processed": data.text}

支持API密钥、JWT等多种认证方式

配置管理

灵活的配置系统
  • 🔧 环境变量配置
  • 📁 配置文件
  • 🎯 动态配置
  • 🔄 热重载
  • 🏷️ 环境隔离

支持多种配置方式,适应不同环境需求

配置示例

# config.yml
service:
  name: "summarization-service"
  port: 3000
  workers: 4

model:
  name: "facebook/bart-large-cnn"
  device: "cuda"
  batch_size: 32

api:
  max_request_size: "10MB"
  timeout: 30
  rate_limit: "100/minute"

monitoring:
  enabled: true
  prometheus_port: 8000

logging:
  level: "INFO"
  format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

YAML配置文件,支持环境变量覆盖

错误处理

优雅的错误管理
  • 🚨 自动错误捕获
  • 📝 详细的错误信息
  • 🔄 重试机制
  • ⏱️ 超时控制
  • 🎯 自定义错误响应

内置完善的错误处理机制

错误处理示例

import bentoml
from bentoml.exceptions import BentoMLException
from bentoml.io import JSON

@bentoml.service
class ErrorHandlingService:
    @bentoml.api(input=JSON(), output=JSON())
    def process(self, data: dict) -> dict:
        """带错误处理的API"""
        try:
            # 业务逻辑
            if not data.get("text"):
                raise BentoMLException("Input text is required", code=400)
            
            result = self._process_data(data["text"])
            return {"success": True, "result": result}
            
        except Exception as e:
            # 自动捕获并记录错误
            return {
                "success": False,
                "error": str(e),
                "code": getattr(e, 'code', 500)
            }
    
    def _process_data(self, text: str) -> str:
        """数据处理逻辑"""
        # 模拟可能出错的操作
        if len(text) > 1000:
            raise ValueError("Text too long")
        return text.upper()

内置错误类型,自动HTTP状态码映射

WebSockets支持

实时双向通信
  • 🔄 实时数据流
  • 📡 双向通信
  • ⚡ 流式响应
  • 🎮 实时应用
  • 📊 数据推送

支持WebSocket协议,适合实时应用场景

WebSocket示例

import bentoml
import asyncio
from typing import AsyncGenerator

@bentoml.service
class WebSocketService:
    @bentoml.websocket("chat")
    async def chat_stream(self, websocket) -> AsyncGenerator[str, None]:
        """WebSocket聊天接口"""
        async for message in websocket:
            # 流式处理
            response = await self._process_message(message)
            yield response
    
    @bentoml.websocket("stream")
    async def data_stream(self, websocket) -> AsyncGenerator[str, None]:
        """数据流推送"""
        count = 0
        while True:
            # 推送实时数据
            data = {"timestamp": time.time(), "count": count}
            yield json.dumps(data)
            count += 1
            await asyncio.sleep(1)

    async def _process_message(self, message: str) -> str:
        """处理消息并返回流式响应"""
        words = message.split()
        response = []
        for i, word in enumerate(words):
            # 逐步构建响应
            response.append(f"Word {i+1}: {word}\n")
            # 发送部分响应
            yield "".join(response)
        yield "\nProcessing complete!"

支持WebSocket协议,支持双向通信

微服务集成

服务间通信
  • 🔌 ASGI应用集成
  • 🌐 HTTP客户端
  • 🔄 服务发现
  • 📊 负载均衡
  • 🔗 服务编排

支持与其他微服务无缝集成

微服务集成

import bentoml
from fastapi import FastAPI

# ASGI应用集成
fastapi_app = FastAPI()

@fastapi_app.get("/health")
def health_check():
    return {"status": "healthy"}

@bentoml.service
class IntegratedService:
    def __init__(self):
        self.http_client = bentoml.SyncHTTPClient("http://other-service:8000")
    
    @bentoml.api
def integrated_call(self, data: dict) -> dict:
        """调用其他微服务"""
        # 调用外部服务
        external_result = self.http_client.process(data)
        
        # 本地处理
        local_result = self._process(external_result)
        
        return local_result
    
    def _process(self, data: dict) -> dict:
        """本地处理逻辑"""
        return {"processed": True, **data}
    
    @bentoml.mount_asgi_app(fastapi_app, path="/api")
    def mount_fastapi(self, app):
        """挂载FastAPI应用"""
        return app

支持HTTP客户端和ASGI应用挂载

数据流处理

流式数据处理
  • 🔄 流式输入
  • 📊 实时处理
  • 🎯 批量处理
  • 📈 状态管理
  • ⏱️ 时序数据

支持流式数据处理,适合实时分析场景

流式数据处理

import bentoml
import asyncio
from typing import AsyncIterator, List

@bentoml.service
class StreamProcessingService:
    def __init__(self):
        self.processed_count = 0
    
    @bentoml.api(
        input=bentoml.io.JSON(),
        output=bentoml.io.JSON(),
        batchable=True
    )
    async def process_stream(self, data_stream: AsyncIterator[dict]) -> List[dict]:
        """流式数据处理API"""
        results = []
        async for batch in self._batch_stream(data_stream, batch_size=10):
            # 批量处理
            processed = await self._process_batch(batch)
            results.extend(processed)
        
        return results
    
    async def _batch_stream(self, stream: AsyncIterator[dict], batch_size: int) -> AsyncIterator[List[dict]]:
        """批量化流式数据"""
        batch = []
        async for item in stream:
            batch.append(item)
            if len(batch) >= batch_size:
                yield batch
                batch = []
        
        if batch:
            yield batch
    
    async def _process_batch(self, batch: List[dict]) -> List[dict]:
        """批量处理逻辑"""
        processed = []
        for item in batch:
            # 处理每个数据项
            processed_item = {
                "id": item["id"],
                "value": item["value"] * 2,
                "timestamp": time.time(),
                "processed": True
            }
            processed.append(processed_item)
            self.processed_count += 1
        
        return processed

支持流式输入和批量化处理

事件驱动架构

事件驱动处理
  • 🔔 事件监听
  • 📨 消息队列
  • ⚡ 异步处理
  • 🔄 事件转发
  • 📊 事件聚合

支持事件驱动架构,适合微服务架构

事件处理

import bentoml
import asyncio
from typing import Callable, Dict, Any

@bentoml.service
class EventDrivenService:
    def __init__(self):
        self.event_handlers: Dict[str, Callable] = {}
        self.event_queue = asyncio.Queue()
    
    def register_handler(self, event_type: str):
        """注册事件处理器"""
        def decorator(handler: Callable):
            self.event_handlers[event_type] = handler
            return handler
        return decorator
    
    @bentoml.api(input=bentoml.io.JSON(), output=bentoml.io.JSON())
    async def emit_event(self, event_data: dict) -> dict:
        """触发事件"""
        event_type = event_data.get("type")
        if not event_type:
            raise ValueError("Event type is required")
        
        # 将事件加入队列
        await self.event_queue.put(event_data)
        
        # 触发处理器
        if event_type in self.event_handlers:
            await self.event_handlers[event_type](event_data)
        
        return {"status": "event_emitted", "type": event_type}
    
    async def process_events(self):
        """后台事件处理"""
        while True:
            event = await self.event_queue.get()
            await self._handle_event(event)
    
    async def _handle_event(self, event: dict):
        """处理单个事件"""
        event_type = event["type"]
        event_data = event["data"]
        
        # 根据事件类型处理
        if event_type == "user_action":
            await self._handle_user_action(event_data)
        elif event_type == "model_update":
            await self._handle_model_update(event_data)
    
    @register_handler("user_action")
    async def _handle_user_action(self, data: dict):
        """用户动作事件处理"""
        # 处理用户动作
        print(f"Processing user action: {data}")
    
    @register_handler("model_update")
    async def _handle_model_update(self, data: dict):
        """模型更新事件处理"""
        # 重新加载模型
        await self._reload_model(data["model_id"])

支持事件驱动架构,可扩展的事件处理系统

任务队列

后台任务处理
  • 📋 任务调度
  • ⏰ 延迟执行
  • 🔄 重试机制
  • 📊 任务监控
  • 🎯 任务优先级

支持异步任务队列,适合长时间运行的任务

任务队列实现

import bentoml
import asyncio
from dataclasses import dataclass
from typing import Dict, Any

@dataclass
class Task:
    id: str
    type: str
    data: Dict[str, Any]
    priority: int = 0
    retry_count: int = 0
    max_retries: int = 3

@bentoml.service
class TaskQueueService:
    def __init__(self):
        self.tasks: asyncio.Queue[Task] = asyncio.PriorityQueue()
        self.task_results: Dict[str, Any] = {}
        self.running = True
    
    @bentoml.api(input=bentoml.io.JSON(), output=bentoml.io.JSON())
    async def submit_task(self, task_data: dict) -> dict:
        """提交任务"""
        task = Task(
            id=task_data["id"],
            type=task_data["type"],
            data=task_data["data"],
            priority=task_data.get("priority", 0)
        )
        
        await self.tasks.put((-task.priority, task))  # 负值用于优先级排序
        
        return {"status": "task_submitted", "task_id": task.id}
    
    @bentoml.api(input=bentoml.JSON(), output=bentoml.JSON())
    async def get_task_result(self, task_id: str) -> dict:
        """获取任务结果"""
        if task_id in self.task_results:
            return self.task_results[task_id]
        return {"status": "pending"}
    
    async def process_tasks(self):
        """后台任务处理"""
        while self.running:
            try:
                priority, task = await self.tasks.get()
                await self._execute_task(task)
            except asyncio.CancelledError:
                break
    
    async def _execute_task(self, task: Task):
        """执行任务"""
        try:
            result = await self._run_task(task)
            self.task_results[task.id] = {
                "status": "completed",
                "result": result,
                "timestamp": time.time()
            }
        except Exception as e:
            if task.retry_count < task.max_retries:
                # 重试
                task.retry_count += 1
                await self.tasks.put((-task.priority, task))
            else:
                # 标记失败
                self.task_results[task.id] = {
                    "status": "failed",
                    "error": str(e),
                    "timestamp": time.time()
                }
    
    async def _run_task(self, task: Task) -> Any:
        """执行具体任务逻辑"""
        if task.type == "model_training":
            return await self._train_model(task.data)
        elif task.type == "data_processing":
            return await self._process_data(task.data)
        else:
            raise ValueError(f"Unknown task type: {task.type}")

支持优先级队列、重试机制、任务状态跟踪

Service类源码结构

Service (bentoml/_internal/service/service.py) ├── __init__(self, name, ...) ├── _setup_runner() # 初始化Runner ├── _setup_api() # 注册API路由 ├── api() # @api装饰器 │ ├── input_spec # IO描述符 │ ├── output_spec │ ├── route # HTTP路由 │ └── batchable # 批处理标志 ├── on_startup() # 生命周期钩子 ├── on_shutdown() ├── on_asgi_app_startup() └── asgi_app # ASGI应用实例 关键继承链: object -> _Service -> Service └── on_startup/shutdown 钩子

Service是BentoML的核心,管理API注册、Runner初始化和生命周期

@bentoml.service 装饰器源码

# bentoml/_internal/service/service.py 核心逻辑

def service(
    cls=None,
    *,
    name: str | None = None,
    image: Image | None = None,
    resources: dict | None = None,
    traffic: dict | None = None,
    ...,
) -> Service:
    """将普通Python类转为BentoML Service"""
    def decorator(cls):
        # 1. 收集 @api 装饰的方法
        apis = {}
        for name, method in cls.__dict__.items():
            if hasattr(method, '_bentoml_api'):
                apis[name] = method._bentoml_api
        
        # 2. 创建ServiceConfig
        config = ServiceConfig(
            name=name or cls.__name__,
            image=image,
            resources=resources,
            traffic=traffic,
        )
        
        # 3. 注册到全局ServiceStore
        svc = Service(cls, config, apis)
        ServiceStore.add(svc)
        
        return svc
    
    return decorator if cls is None else decorator(cls)

装饰器收集API方法、创建配置、注册到全局Store

@bentoml.api 装饰器源码

# bentoml/_internal/service/api.py

def api(
    func=None,
    *,
    input=None,
    output=None,
    route: str | None = None,
    batchable: bool = False,
    max_batch_size: int = 100,
    batch_wait: float = 0.1,
    ...) -> callable:
    """将方法标记为API端点"""
    def decorator(func):
        # 推断IO类型
        input_spec = input or _infer_input(func)
        output_spec = output or _infer_output(func)
        
        # 构建API描述
        api_desc = APIDescriptor(
            name=func.__name__,
            input_spec=input_spec,
            output_spec=output_spec,
            route=route or f"/{func.__name__}",
            batchable=batchable,
            max_batch_size=max_batch_size,
            batch_wait=batch_wait,
        )
        
        func._bentoml_api = api_desc
        return func
    
    return decorator if func is None else decorator(func)

自动推断输入输出类型,支持批处理配置

IO描述符体系

类型安全的输入输出
  • Text: 纯文本数据
  • JSON: JSON对象
  • NumpyNdarray: NumPy数组
  • PandasDataFrame: DataFrame
  • Image: 图像数据
  • File: 文件上传
  • Multipart: 多部分表单

IO描述符定义了API的输入输出类型,支持自动验证和序列化

IO描述符源码解析

# bentoml/_internal/io_descriptors.py

class IODescriptor(ABC):
    """所有IO描述符的基类"""
    
    @abstractmethod
    def from_http_request(self, request: Request) -> Any:
        """从HTTP请求解析输入"""
    
    @abstractmethod
    def to_http_response(self, obj: Any) -> Response:
        """将输出转为HTTP响应"""
    
    @abstractmethod
    async def from_proto(self, field) -> Any:
        """从gRPC消息解析"""
    
    @abstractmethod
    async def to_proto(self, obj: Any) -> Any:
        """将输出转为gRPC消息"""

class Text(IODescriptor):
    def from_http_request(self, request):
        return request.body()  # 返回文本
    
    def to_http_response(self, text: str):
        return Response(text, media_type="text/plain")

统一的IO抽象,支持HTTP和gRPC双协议

Bento打包流程

bentoml build 流程: service.py │ ▼ 读取 @service 配置 │ ▼ 解析 Docker Image │ ├── base image (Python版本) ├── pip packages ├── conda packages ├── system packages └── copy models │ ▼ 生成 Dockerfile │ ▼ 生成 bentofile.yaml │ ▼ 打包为 Bento └── ~/.bentoml/bentos/<name>/<version>/ ├── bento.yaml ├── Dockerfile ├── requirements.txt ├── src/ └── models/

Bento是自包含的部署单元,包含代码、模型和所有依赖

Bento配置文件

# bentofile.yaml - Bento构建配置
service: "service:Summarization"  # 服务入口
include:
  - "*.py"
  - "config/*.yml"
exclude:
  - "tests/"
  - "*.md"

docker:
  base_image: "python:3.11-slim"
  python:
    packages:
      - torch>=2.0
      - transformers>=4.30
      - numpy
  system:
    packages:
      - libgl1
      - libglib2.0-0

models:
  - "my_classifier:latest"
  - "my_encoder:v1.0.0"

envs:
  - name: MODEL_CACHE_DIR
    value: "/tmp/models"
  - name: LOG_LEVEL
    value: "INFO"

声明式配置,定义服务入口、依赖和模型

Runner系统

模型推理执行引擎
  • 独立进程运行模型
  • 支持多Worker并行
  • 共享内存通信
  • 自动资源管理
  • 与Service解耦

Runner是模型推理的执行单元,独立于API服务运行

Runner源码解析

# bentoml/_internal/runner/runner.py

class Runner:
    """模型推理执行器"""
    
    def __init__(self, svc, runner_config):
        self.svc = svc
        self.config = runner_config
        self._workers = []
    
    def init_local(self, device_id=None):
        """本地模式初始化"""
        # 直接在当前进程创建实例
        self._instance = self.svc()
        return self
    
    def init_server(self, runner_map=None):
        """服务器模式初始化"""
        # 启动多个Worker进程
        for i in range(self.config.workers):
            worker = RunnerWorker(
                svc=self.svc,
                device_id=i,  # GPU设备ID
                runner_map=runner_map,
            )
            self._workers.append(worker)
    
    async def async_run(self, method_name, *args, **kwargs):
        """异步执行推理"""
        # 通过共享内存/NVLink调度到Worker
        worker = self._schedule()
        return await worker.run(method_name, *args, **kwargs)

Runner管理Worker进程,负责推理调度和资源分配

BentoCloud部署

一键云端部署
  • ☁️ bentoml deploy 一键部署
  • 📊 自动扩缩容
  • 🔒 内置安全认证
  • 💰 按用量计费
  • 🔍 A/B测试支持

BentoCloud是BentoML的托管平台,零运维部署

总结

BentoML核心优势
  • 🍱 统一的AI模型 serving 解决方案
  • ⚡ 高性能推理引擎
  • 🐳 完整的容器化部署
  • 🌐 云原生架构支持
  • 🔧 灵活的可扩展设计
  • 📊 完善的监控和观测性
  • 🚀 生产就绪的企业级框架

BentoML为AI模型部署提供了一站式解决方案

参考资料

  • BentoML GitHub: https://github.com/bentoml/BentoML
  • 官方文档: https://docs.bentoml.com
  • 示例项目: https://github.com/bentoml/BentoML/tree/main/examples
  • 社区论坛: https://forum.modular.com/c/bento/31
  • BentoCloud: https://www.bentoml.com

感谢阅读!
访问 https://atcfu.com/ai-articles/bentoml/ 回顾本文