Source Code Deep Dive
2026-04-02 | MLflow Experiment Tracking System
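Part 1: Basic Concepts
Part 2: Core Components
Part 3: Source Code Analysis
Part 4: Advanced Topics
MLflow is an open-source platform for managing the machine learning lifecycle. Originally developed by Databricks, it addresses experiment management, model registration, project packaging, and deployment.
Core Features
Key Advantages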
┌─────────────────────────────────────────────────────────┐
│ MLflow │
├─────────────────────────────────────────────────────────┤
│ Tracking │ Registry │ Projects │ Models │ UI │
├─────────────────────────────────────────────────────────┤
│ MLflow Server (Backend) │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │Tracking API │ │Registry API │ │Projects API │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────┤
│ Backend Store │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Database │ │Artifact DB │ │ Metadata │ │
│ │ (SQLite/ │ │ (File Store │ │ Service │ │
│ │ PostgreSQL)│ │ /S3) │ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────┤
│ Client SDKs │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Python SDK │ │ R SDK │ │ Java SDK │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘
MLflow uses a client-server architecture and supports multiple database and storage backends.
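MLflow 1.x (1.0 released in 2019): core experiment tracking with local file storage; the Model Registry arrived later in the 1.x series
MLflow 2.x (2.0 released in late 2022): UI improvements and, in later releases, LLM evaluation and tracing
MLflow 3.x (3.0 released in 2025): expanded LLM tracking and enterprise-oriented features
Key shift: from a simple experiment logging tool to a complete MLOps platform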
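Core functionality: records experiment parameters, metrics, models, and logs, and supports run comparison and version management.
| Component | Purpose | Implementation |
|---|---|---|
| Experiment | Logical grouping of related runs | Identified by an experiment ID; groups runs |
| Run | A single training execution | Lifecycle management and status tracking |
| Params | Hyperparameter configuration | Key-value storage |
| Metrics | Performance evaluation metrics | Time-series storage |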
┌────────────────────────────────────────────────────────────┐
│ Model Registry │
├────────────────────────────────────────────────────────────┤
│ ┌────────────────────────────────────────────────────┐ │
│ │ Model │ │
│ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ Version │ │ │
│ │ │ ┌────────────────────────────────────┐ │ │ │
│ │ │ │ Stage │ │ │ │
│ │ │ │ ┌────────────────────────────┐ │ │ │ │
│ │ │ │ │ Model Artifact │ │ │ │ │
│ │ │ │ │ (Model + Code + Data) │ │ │ │ │
│ │ │ │ └────────────────────────────┘ │ │ │ │
│ │ │ └────────────────────────────────────┘ │ │ │
│ │ └────────────────────────────────────────────┘ │ │
│ └────────────────────────────────────────────────────┘ │
│ │
│ Operations: Create, Register, Transition, Delete │
│ Stages: None, Staging, Production, Archived │
└────────────────────────────────────────────────────────────┘
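The Model Registry provides model version management, lifecycle control, and staged model releases.
MLflow Projects package a machine learning project in a reproducible format that can be executed locally or remotely.
The MLproject file (YAML syntax; MLflow expects the file to be named MLproject, not project.yaml):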
name: my-ml-project
entry_points:
  main:
    parameters:
      learning_rate: {type: float, default: 0.01}
      batch_size: {type: int, default: 32}
    command: "python train.py --learning_rate={learning_rate} --batch_size={batch_size}"
Running a project (from the current directory, or from a project URI):
mlflow run . -P learning_rate=0.001 -P batch_size=64
mlflow run <project-uri> -P learning_rate=0.001 -P batch_size=64
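How `-P` overrides end up in the entry-point command can be sketched in a few lines. This is not MLflow's implementation: `render_command`, `ENTRY_POINT`, and `_CASTS` are hypothetical names, and rejecting undeclared parameters is a simplification made for this sketch. Defaults are applied first, overrides are coerced to the declared type, and the values are substituted into the command template.

```python
from typing import Any, Dict

# Hypothetical in-memory form of the "main" entry point shown above.
ENTRY_POINT = {
    "parameters": {
        "learning_rate": {"type": "float", "default": 0.01},
        "batch_size": {"type": "int", "default": 32},
    },
    "command": "python train.py --learning_rate={learning_rate} --batch_size={batch_size}",
}

# Casts for the declared types used in this example (assumption of the sketch).
_CASTS = {"float": float, "int": int, "string": str}


def render_command(entry_point: Dict[str, Any], overrides: Dict[str, str]) -> str:
    """Apply defaults, coerce overrides to their declared types, fill the template."""
    declared = entry_point["parameters"]
    unknown = set(overrides) - set(declared)
    if unknown:
        raise ValueError(f"Undeclared parameters: {sorted(unknown)}")
    values: Dict[str, Any] = {}
    for name, spec in declared.items():
        cast = _CASTS[spec["type"]]
        values[name] = cast(overrides.get(name, spec["default"]))
    return entry_point["command"].format(**values)


print(render_command(ENTRY_POINT, {"learning_rate": "0.001", "batch_size": "64"}))
```

Calling `render_command(ENTRY_POINT, {})` falls back to the declared defaults, which is the behaviour the `default:` keys in the project file suggest.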
MLflow Models provide model packaging, deployment, and inference serving.
Model Formats
Deployment Options
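# Python API example
import mlflow
import mlflow.sklearn
from sklearn.linear_model import LogisticRegression
# Select (or create) the experiment; start_run() has no experiment_name argument
mlflow.set_experiment("my_experiment")
# Start a tracked run
with mlflow.start_run() as run:
    # Log parameters
    mlflow.log_param("learning_rate", 0.01)
    mlflow.log_param("batch_size", 32)
    # Log metrics (placeholder values stand in for a real training loop)
    for epoch in range(100):
        loss_value = 1.0 / (epoch + 1)
        accuracy_value = 1.0 - loss_value
        mlflow.log_metric("loss", loss_value, step=epoch)
        mlflow.log_metric("accuracy", accuracy_value, step=epoch)
    # Log the model (a small placeholder estimator fitted on toy data)
    model = LogisticRegression().fit([[0.0], [1.0]], [0, 1])
    mlflow.sklearn.log_model(model, "model")
    # Log an artifact (the file must exist locally)
    with open("logs.txt", "w") as f:
        f.write("training finished\n")
    mlflow.log_artifact("logs.txt")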
# REST API examples
POST /api/2.0/mlflow/experiments/create
POST /api/2.0/mlflow/runs/create
POST /api/2.0/mlflow/runs/log-parameter
POST /api/2.0/mlflow/runs/log-metric
POST /api/2.0/mlflow/registered-models/create
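MLflow's core design philosophy: simple, extensible, and ecosystem-friendly, managing the machine learning lifecycle through standardized interfaces.
Tracked Objects
Lifecycle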
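Experiment lifecycle flow:
1. Experiment Creation
   ├── Create the experiment
   ├── Set the experiment name and attributes
   └── Return the experiment ID
2. Run Start
   ├── mlflow.start_run()
   ├── Set the run name and attributes
   ├── Generate the run ID
   └── Activate the run context
3. Data Logging
   ├── Parameters: mlflow.log_param()
   ├── Metrics: mlflow.log_metric()
   ├── Models: mlflow.<flavor>.log_model(), e.g. mlflow.sklearn.log_model()
   ├── Artifacts: mlflow.log_artifact()
   └── Tags: mlflow.set_tag()
4. Run End
   ├── mlflow.end_run()
   ├── Update the run status
   ├── Persist the data
   └── Clear the run context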
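MLflow run management: every run has its own lifecycle, and MLflow supports parallel runs and run status tracking.
| Status | Description | Transitions |
|---|---|---|
| SCHEDULED | Scheduled, waiting to execute | → RUNNING |
| RUNNING | Currently executing | → FINISHED/FAILED/KILLED |
| FINISHED | Completed normally | None (terminal) |
| FAILED | Execution failed | None (terminal) |
| KILLED | Terminated | None (terminal) |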
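The status table can be encoded as a small state machine. This is a sketch that mirrors the table only; `ALLOWED_TRANSITIONS`, `transition`, and `is_terminal` are hypothetical names, and a real tracking server may be more permissive about status updates than this.

```python
from enum import Enum


class RunStatus(Enum):
    SCHEDULED = "SCHEDULED"
    RUNNING = "RUNNING"
    FINISHED = "FINISHED"
    FAILED = "FAILED"
    KILLED = "KILLED"


# Transition table taken directly from the status table above.
ALLOWED_TRANSITIONS = {
    RunStatus.SCHEDULED: {RunStatus.RUNNING},
    RunStatus.RUNNING: {RunStatus.FINISHED, RunStatus.FAILED, RunStatus.KILLED},
    RunStatus.FINISHED: set(),
    RunStatus.FAILED: set(),
    RunStatus.KILLED: set(),
}


def is_terminal(status: RunStatus) -> bool:
    """A status is terminal when no transition leaves it."""
    return not ALLOWED_TRANSITIONS[status]


def transition(current: RunStatus, target: RunStatus) -> RunStatus:
    """Return the target status, or raise if the table forbids the move."""
    if target not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"Illegal transition: {current.value} -> {target.value}")
    return target


status = transition(RunStatus.SCHEDULED, RunStatus.RUNNING)
status = transition(status, RunStatus.FINISHED)
print(status.value, is_terminal(status))
```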
MLflow data storage layers:
┌──────────────────────────────────────────────────────────┐
│ Application Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │Tracking API │ │Registry API │ │Projects API │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├──────────────────────────────────────────────────────────┤
│ Business Logic Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │Experiment │ │Run │ │Model │ │
│ │Manager │ │Manager │ │Manager │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├──────────────────────────────────────────────────────────┤
│ Data Access Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │DAO Pattern │ │Repository │ │Factory │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├──────────────────────────────────────────────────────────┤
│ Storage Backend │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Database │ │Artifact │ │ File System │ │
│ │ (SQLite/ │ │ Store │ │ Storage │ │
│ │ PostgreSQL) │ │ (Local/S3) │ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└──────────────────────────────────────────────────────────┘
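Storage architecture: metadata and artifacts are stored separately, and each side supports several backends.
Backend storage components:
1. Metadata store
   ├── PostgreSQL: recommended for production
   ├── SQLite: suitable for development
   ├── MySQL/MariaDB: supported
   └── SQLAlchemy ORM: unified interface
2. Artifact store
   ├── Local filesystem
   ├── Amazon S3
   ├── Azure Blob Storage
   ├── Google Cloud Storage
   └── Hadoop HDFS
3. Configuration example (passed as options to the `mlflow server` command)
   mlflow server --backend-store-uri postgresql://localhost/mlflow --default-artifact-root s3://mlflow-bucket/artifacts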
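Both settings are URIs, so the backend can be chosen from the URI scheme. The sketch below illustrates that idea with the standard library only; `classify_backend_store`, `classify_artifact_root`, and the scheme tables are hypothetical and cover just the backends listed above, not everything MLflow accepts.

```python
from urllib.parse import urlparse

# Database dialects from the list above (assumption: dialect is the part before "+").
_DB_DIALECTS = {"sqlite", "postgresql", "mysql"}

# URI scheme -> artifact backend, limited to the backends listed above.
_ARTIFACT_BACKENDS = {
    "": "local filesystem",
    "file": "local filesystem",
    "s3": "Amazon S3",
    "wasbs": "Azure Blob Storage",
    "gs": "Google Cloud Storage",
    "hdfs": "Hadoop HDFS",
}


def classify_backend_store(uri: str) -> str:
    """Return 'database' or 'file' for a metadata store URI."""
    scheme = urlparse(uri).scheme
    dialect = scheme.split("+")[0]  # e.g. "postgresql+psycopg2" -> "postgresql"
    if dialect in _DB_DIALECTS:
        return "database"
    if scheme in ("", "file"):
        return "file"
    raise ValueError(f"Unsupported backend store URI: {uri}")


def classify_artifact_root(uri: str) -> str:
    """Return a human-readable artifact backend name for an artifact root URI."""
    scheme = urlparse(uri).scheme
    try:
        return _ARTIFACT_BACKENDS[scheme]
    except KeyError:
        raise ValueError(f"Unsupported artifact root URI: {uri}") from None


print(classify_backend_store("postgresql://localhost/mlflow"))
print(classify_artifact_root("s3://mlflow-bucket/artifacts"))
```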
Core MLflow data model (simplified; the actual schema differs in some column names and types):
1. Experiment table
   experiment_id: INTEGER (primary key)
   name: VARCHAR(255)
   artifact_location: TEXT
   lifecycle_stage: ENUM('active', 'deleted')
   creation_time: TIMESTAMP
   last_update_time: TIMESTAMP
2. Run table
   run_uuid: UUID (primary key)
   experiment_id: INTEGER (foreign key)
   run_name: VARCHAR(255)
   run_status: ENUM('SCHEDULED', 'RUNNING', 'FINISHED', 'FAILED', 'KILLED')
   lifecycle_stage: ENUM('active', 'deleted')
   start_time: TIMESTAMP
   end_time: TIMESTAMP
   source_type: VARCHAR(50)
   source_name: VARCHAR(255)
   entry_point_name: VARCHAR(255)
3. Param table
   param_id: SERIAL (primary key)
   run_uuid: UUID (foreign key)
   key: VARCHAR(255)
   value: TEXT
   timestamp: TIMESTAMP
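To make the relationships between these tables concrete, here is a reduced version of the model built with the standard library's `sqlite3`. It is an illustrative sketch, not MLflow's schema: several columns are omitted, timestamps are stored as epoch milliseconds, and the `params` table uses a composite `(run_uuid, key)` primary key so that a parameter can be written only once per run. All of those are choices made for this example.

```python
import sqlite3
import time
import uuid

SCHEMA = """
CREATE TABLE experiments (
    experiment_id   INTEGER PRIMARY KEY AUTOINCREMENT,
    name            TEXT NOT NULL UNIQUE,
    artifact_location TEXT,
    lifecycle_stage TEXT NOT NULL DEFAULT 'active'
        CHECK (lifecycle_stage IN ('active', 'deleted'))
);
CREATE TABLE runs (
    run_uuid      TEXT PRIMARY KEY,
    experiment_id INTEGER NOT NULL REFERENCES experiments(experiment_id),
    run_name      TEXT,
    run_status    TEXT NOT NULL
        CHECK (run_status IN ('SCHEDULED', 'RUNNING', 'FINISHED', 'FAILED', 'KILLED')),
    start_time    INTEGER,
    end_time      INTEGER
);
CREATE TABLE params (
    run_uuid TEXT NOT NULL REFERENCES runs(run_uuid),
    key      TEXT NOT NULL,
    value    TEXT NOT NULL,
    PRIMARY KEY (run_uuid, key)
);
"""

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite does not enforce FKs by default
conn.executescript(SCHEMA)

# One experiment, one run, one parameter.
exp_id = conn.execute("INSERT INTO experiments (name) VALUES (?)", ("demo",)).lastrowid
run_id = uuid.uuid4().hex
conn.execute(
    "INSERT INTO runs (run_uuid, experiment_id, run_status, start_time) VALUES (?, ?, ?, ?)",
    (run_id, exp_id, "RUNNING", int(time.time() * 1000)),
)
conn.execute("INSERT INTO params VALUES (?, ?, ?)", (run_id, "learning_rate", "0.01"))
conn.commit()

# Follow the foreign keys from a parameter back to its experiment.
rows = conn.execute(
    """
    SELECT e.name, p.key, p.value
    FROM params p
    JOIN runs r ON r.run_uuid = p.run_uuid
    JOIN experiments e ON e.experiment_id = r.experiment_id
    """
).fetchall()
print(rows)
```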
# MLflow Tracking Server architecture (simplified sketch, not the actual server source)
from flask import Flask

class MLflowTrackingServer:
    def __init__(self, store, artifact_manager):
        self.store = store  # MLflowStore
        self.artifact_manager = artifact_manager  # LocalArtifactManager
        self.experiment_manager = ExperimentManager(store)
        self.run_manager = RunManager(store)
        self.model_manager = ModelRegistryManager(store)

    def start(self, host="0.0.0.0", port=5000):
        app = Flask(__name__)
        self.register_blueprints(app)
        return app.run(host=host, port=port)

    def register_blueprints(self, app):
        # The blueprints below are assumed to be defined elsewhere
        # Experiment management API
        app.register_blueprint(experiments_bp)
        # Run management API
        app.register_blueprint(runs_bp)
        # Model registry API
        app.register_blueprint(model_registry_bp)
        # Artifact management API
        app.register_blueprint(artifact_bp)
The Tracking Server exposes a unified RESTful API for all MLflow functionality.
API design principles: RESTful style, meaningful HTTP status codes, and JSON for data exchange.
Core API endpoints:
Experiment management:
  POST /api/2.0/mlflow/experiments/search
  POST /api/2.0/mlflow/experiments/create
  GET /api/2.0/mlflow/experiments/get
  POST /api/2.0/mlflow/experiments/delete
Run management:
  POST /api/2.0/mlflow/runs/create
  POST /api/2.0/mlflow/runs/search
  GET /api/2.0/mlflow/runs/get
  POST /api/2.0/mlflow/runs/log-parameter
  POST /api/2.0/mlflow/runs/log-metric
  POST /api/2.0/mlflow/runs/log-model
  POST /api/2.0/mlflow/runs/set-tag
  POST /api/2.0/mlflow/runs/delete
Model registry:
  POST /api/2.0/mlflow/registered-models/create
  GET /api/2.0/mlflow/registered-models/get
  POST /api/2.0/mlflow/model-versions/create
  POST /api/2.0/mlflow/model-versions/transition-stage
  DELETE /api/2.0/mlflow/model-versions/delete
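A request to one of these endpoints is an ordinary JSON POST. The sketch below builds such requests with the standard library without sending them, so it runs without a server. `BASE_URL` and `build_request` are assumptions of the example; the paths `experiments/create` and `runs/log-metric` and the body fields shown follow MLflow's REST API, and the `<run-id>` value is a placeholder.

```python
import json
import time
import urllib.request

BASE_URL = "http://localhost:5000"  # assumed address of a local tracking server


def build_request(path: str, payload: dict) -> urllib.request.Request:
    """Build (but do not send) a JSON POST request for an MLflow REST endpoint."""
    return urllib.request.Request(
        url=f"{BASE_URL}/api/2.0/mlflow/{path}",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


create_experiment = build_request("experiments/create", {"name": "my_experiment"})

log_metric = build_request(
    "runs/log-metric",
    {
        "run_id": "<run-id>",                  # placeholder, returned by runs/create
        "key": "loss",
        "value": 0.42,
        "timestamp": int(time.time() * 1000),  # epoch milliseconds
        "step": 0,
    },
)

# Sending requires a running server, so it is left commented out:
# with urllib.request.urlopen(create_experiment) as resp:
#     print(json.load(resp))

print(create_experiment.get_method(), create_experiment.full_url)
```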
# MLflow data collection mechanism (simplified sketch)
# Param and Metric are ORM entities assumed to be defined elsewhere
import math
from datetime import datetime

class DataCollector:
    def __init__(self, run_id, store):
        self.run_id = run_id
        self.store = store
        self.metrics_buffer = []
        self.params_buffer = []

    def log_param(self, key, value):
        """Buffer a parameter."""
        param = Param(
            run_uuid=self.run_id,
            key=key,
            value=str(value),
            timestamp=datetime.utcnow()
        )
        self.params_buffer.append(param)

    def log_metric(self, key, value, step=None, timestamp=None):
        """Buffer a metric."""
        metric = Metric(
            run_uuid=self.run_id,
            key=key,
            value=value,
            step=step,
            timestamp=timestamp or datetime.utcnow(),
            is_nan=math.isnan(value),
            is_inf=math.isinf(value)
        )
        self.metrics_buffer.append(metric)

    def _flush_buffer(self):
        """Write all buffered records to the database in one batch."""
        with self.store.session() as session:
            session.add_all(self.params_buffer)
            session.add_all(self.metrics_buffer)
            session.commit()
        self.params_buffer.clear()
        self.metrics_buffer.clear()
# ExperimentManager core implementation (simplified sketch)
from datetime import datetime

class ExperimentManager:
    def __init__(self, store):
        self.store = store

    def create_experiment(self, name, artifact_location=None, tags=None):
        """Create a new experiment."""
        now = datetime.utcnow()
        experiment = Experiment(
            name=name,
            artifact_location=artifact_location or self._default_artifact_location(),
            lifecycle_stage='active',
            creation_time=now,
            last_update_time=now
        )
        # Attach tags
        if tags:
            for key, value in tags.items():
                experiment.experiment_tags.append(
                    ExperimentTag(key=key, value=value)
                )
        with self.store.session() as session:
            session.add(experiment)
            session.commit()
        return experiment

    def get_experiment(self, experiment_id):
        """Fetch an experiment."""
        return self.store.get_experiment(experiment_id)

    def list_experiments(self, view_type='ALL'):
        """List experiments."""
        return self.store.list_experiments(view_type)

    def delete_experiment(self, experiment_id):
        """Delete an experiment."""
        return self.store.delete_experiment(experiment_id)
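Run log management: supports several kinds of logs, including training, validation, and system logs.
# Run logging mechanism (simplified sketch)
import math
from datetime import datetime
from mlflow.exceptions import MlflowException

class RunLogger:
    MAX_BUFFER_SIZE = 100  # assumed flush threshold; the original does not define it

    def __init__(self, run_uuid, store):
        self.run_uuid = run_uuid
        self.store = store
        self.log_buffer = []

    def log_metric(self, key, value, step=None, timestamp=None,
                   synchronous=False, prefix=None):
        """Log a metric."""
        # Validate the value (bool is rejected although it subclasses int)
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise MlflowException(f"Invalid metric value: {value}")
        # Build the key name
        metric_key = f"{prefix}.{key}" if prefix else key
        # Create the metric record
        metric = Metric(
            run_uuid=self.run_uuid,
            key=metric_key,
            value=value,
            step=step,
            timestamp=timestamp or datetime.utcnow(),
            is_nan=math.isnan(value),
            is_inf=math.isinf(value)
        )
        # Write immediately or buffer for a later batch write
        if synchronous:
            self.store.log_metric(metric)
        else:
            self._buffer_metric(metric)

    def _buffer_metric(self, metric):
        """Buffer a metric and flush once the buffer is full."""
        self.log_buffer.append(metric)
        if len(self.log_buffer) >= self.MAX_BUFFER_SIZE:
            self._flush_logs()

    def _flush_logs(self):
        """Write buffered metrics to the store (one call per metric is an assumption)."""
        for metric in self.log_buffer:
            self.store.log_metric(metric)
        self.log_buffer.clear()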
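# The buffer-then-flush pattern above can be tried in isolation. The sketch below is a self-contained, thread-safe variant; `BufferedMetricLogger` and its `sink` callback are hypothetical names, with the sink standing in for a batched store write.

```python
import threading
from typing import Callable, List, Tuple

Metric = Tuple[str, float, int]  # (key, value, step)


class BufferedMetricLogger:
    """Collect metrics and hand them to `sink` in batches."""

    def __init__(self, sink: Callable[[List[Metric]], None], max_buffer_size: int = 3):
        self._sink = sink
        self._max = max_buffer_size
        self._buffer: List[Metric] = []
        self._lock = threading.Lock()

    def log_metric(self, key: str, value: float, step: int) -> None:
        with self._lock:
            self._buffer.append((key, value, step))
            if len(self._buffer) < self._max:
                return
            # Swap the full buffer out while holding the lock...
            batch, self._buffer = self._buffer, []
        # ...and write it after releasing the lock, so slow I/O does not block loggers.
        self._sink(batch)

    def flush(self) -> None:
        """Write whatever is still buffered, e.g. when the run ends."""
        with self._lock:
            batch, self._buffer = self._buffer, []
        if batch:
            self._sink(batch)


batches: List[List[Metric]] = []
logger = BufferedMetricLogger(batches.append, max_buffer_size=3)
for step in range(7):
    logger.log_metric("loss", 1.0 / (step + 1), step)
logger.flush()

print([len(batch) for batch in batches])
```

# Without the final `flush()` the last, partially filled batch would stay in memory, which is why an end-of-run flush is needed whenever writes are buffered.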
# Core run-tracking functionality (simplified sketch)
import time
import uuid
from datetime import datetime
from mlflow.exceptions import MlflowException

class RunManager:
    def __init__(self, store):
        self.store = store
        self.active_runs = {}  # maps run ID to run instance

    def create_run(self, experiment_id, run_name=None,
                   user_id=None, source_type=None, source_name=None):
        """Create a new run."""
        run_uuid = uuid.uuid4().hex  # store the ID as a string, not a UUID object
        run = Run(
            run_uuid=run_uuid,
            experiment_id=experiment_id,
            run_name=run_name or f"run_{int(time.time())}",
            run_status='RUNNING',  # a newly created run starts out as RUNNING
            lifecycle_stage='active',
            start_time=datetime.utcnow(),
            end_time=None,
            source_type=source_type,
            source_name=source_name,
            user_id=user_id
        )
        with self.store.session() as session:
            session.add(run)
            session.commit()
        # Track the run as active
        self.active_runs[run_uuid] = run
        return run

    def end_run(self, run_uuid, status='FINISHED'):
        """End a run."""
        if run_uuid not in self.active_runs:
            raise MlflowException(f"Run {run_uuid} not active")
        run = self.active_runs[run_uuid]
        run.run_status = status
        run.end_time = datetime.utcnow()
        with self.store.session() as session:
            session.merge(run)
            session.commit()
        # Remove the run from the active set
        del self.active_runs[run_uuid]
        return run
# Model registry manager (simplified sketch)
from datetime import datetime
from mlflow.exceptions import MlflowException

class ModelRegistryManager:
    def __init__(self, store):
        self.store = store

    def create_model(self, name, description=None, tags=None):
        """Create a new registered model."""
        now = datetime.utcnow()
        model = Model(
            name=name,
            description=description,
            lifecycle_stage='active',
            creation_time=now,
            last_update_time=now
        )
        # Attach tags
        if tags:
            for key, value in tags.items():
                model.model_tags.append(ModelTag(key=key, value=value))
        with self.store.session() as session:
            session.add(model)
            session.commit()
        return model

    def register_model_version(self, name, run_id, description=None,
                               tags=None, source=None, run_link=None):
        """Register a model version."""
        # Look up the registered model
        model = self.store.get_model_by_name(name)
        if not model:
            raise MlflowException(f"Model {name} not found")
        # Look up the run
        run = self.store.get_run(run_id)
        # Create the model version
        now = datetime.utcnow()
        model_version = ModelVersion(
            name=name,
            version=model.next_version(),
            run_id=run_id,
            current_stage='None',
            description=description,
            creation_time=now,
            last_update_time=now
        )
        # Save the model files and record where they were stored
        artifact_path = self._save_model_artifacts(run, model_version)
        model_version.source = artifact_path
        with self.store.session() as session:
            session.add(model_version)
            session.commit()
        return model_version
# Artifact manager (simplified sketch for a local filesystem store)
import shutil
from pathlib import Path

class ArtifactManager:
    def __init__(self, artifact_root, artifact_uri):
        self.artifact_root = Path(artifact_root)
        self.artifact_uri = artifact_uri

    def log_artifact(self, run_uuid, local_path, artifact_path=None):
        """Copy a local file or directory into the run's artifact directory."""
        # artifact_path is an optional subdirectory under the run's artifact root
        target_dir = self.artifact_root / run_uuid / (artifact_path or "")
        target_dir.mkdir(parents=True, exist_ok=True)
        # Copy the file or directory, keeping its original name
        source = Path(local_path)
        target = target_dir / source.name
        if source.is_file():
            shutil.copy2(source, target)
        else:
            shutil.copytree(source, target, dirs_exist_ok=True)
        rel_path = target.relative_to(self.artifact_root / run_uuid).as_posix()
        return f"{self.artifact_uri}/{run_uuid}/{rel_path}"

    def list_artifacts(self, run_uuid, path=''):
        """List artifacts under a path."""
        run_dir = self.artifact_root / run_uuid
        if not run_dir.exists():
            return []
        artifacts = []
        full_path = run_dir / path if path else run_dir
        if full_path.is_file():
            artifacts.append({
                'path': path,
                'is_dir': False,
                'file_size': full_path.stat().st_size
            })
        else:
            for item in full_path.iterdir():
                rel_path = str(item.relative_to(run_dir))
                artifacts.append({
                    'path': rel_path,
                    'is_dir': item.is_dir(),
                    'file_size': item.stat().st_size if item.is_file() else 0
                })
        return artifacts
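# The copy-and-list behaviour of a local artifact store can be exercised end to end in a temporary directory. This is a standalone sketch with hypothetical function names; it treats `artifact_path` as a subdirectory under the run's artifact directory.

```python
import shutil
import tempfile
from pathlib import Path
from typing import List


def log_artifact(artifact_root: Path, run_id: str, local_path: Path,
                 artifact_path: str = "") -> Path:
    """Copy a file or directory to <root>/<run_id>/<artifact_path>/<name>."""
    target_dir = artifact_root / run_id / artifact_path
    target_dir.mkdir(parents=True, exist_ok=True)
    target = target_dir / local_path.name
    if local_path.is_file():
        shutil.copy2(local_path, target)
    else:
        # dirs_exist_ok lets a directory be logged more than once
        shutil.copytree(local_path, target, dirs_exist_ok=True)
    return target


def list_artifacts(artifact_root: Path, run_id: str) -> List[str]:
    """Return all artifact files of a run as POSIX-style relative paths."""
    run_dir = artifact_root / run_id
    if not run_dir.exists():
        return []
    return sorted(
        p.relative_to(run_dir).as_posix() for p in run_dir.rglob("*") if p.is_file()
    )


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "artifacts"
    source = Path(tmp) / "logs.txt"
    source.write_text("epoch 0: loss=0.9\n")
    log_artifact(root, "run123", source, artifact_path="logs")
    listing = list_artifacts(root, "run123")
    missing = list_artifacts(root, "no-such-run")

print(listing)
```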
MLflow Client: provides a unified client interface for the tracking and registry services.
# MLflow client core functionality (simplified sketch; the real class is mlflow.MlflowClient)
class MLflowClient:
    def __init__(self, tracking_uri=None, registry_uri=None):
        self.tracking_uri = tracking_uri or "http://localhost:5000"
        # The registry defaults to the resolved tracking URI
        self.registry_uri = registry_uri or self.tracking_uri
        self.tracking_client = TrackingClient(self.tracking_uri)
        self.registry_client = RegistryClient(self.registry_uri)

    # Experiment management
    def create_experiment(self, name, artifact_location=None):
        """Create an experiment."""
        return self.tracking_client.create_experiment(name, artifact_location)

    def get_experiment(self, experiment_id):
        """Fetch an experiment."""
        return self.tracking_client.get_experiment(experiment_id)

    def list_experiments(self):
        """List all experiments."""
        return self.tracking_client.list_experiments()

    # Run management
    def create_run(self, experiment_id, run_name=None):
        """Create a run."""
        return self.tracking_client.create_run(experiment_id, run_name)

    def log_param(self, run_id, key, value):
        """Log a parameter."""
        return self.tracking_client.log_param(run_id, key, value)

    def log_metric(self, run_id, key, value, step=None):
        """Log a metric."""
        return self.tracking_client.log_metric(run_id, key, value, step)

    def log_model(self, run_id, model, artifact_path):
        """Log a model."""
        return self.tracking_client.log_model(run_id, model, artifact_path)
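MLflow Python SDK: provides a high-level API with support for autologging, hyperparameter search, and model evaluation.
# MLflow Python API usage example
from datetime import datetime
import mlflow
import mlflow.sklearn
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
# Set the tracking URI
mlflow.set_tracking_uri("http://localhost:5000")
# Select the experiment (created if it does not exist yet)
experiment_name = "experiment_" + datetime.now().strftime("%Y%m%d")
mlflow.set_experiment(experiment_name)
# Example data (Iris stands in for the training data the original leaves undefined)
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
# Start a run; start_run() has no experiment_name argument
with mlflow.start_run() as run:
    # Log parameters
    n_estimators = 100
    max_depth = 10
    mlflow.log_param("n_estimators", n_estimators)
    mlflow.log_param("max_depth", max_depth)
    # Train the model
    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth
    )
    model.fit(X_train, y_train)
    # Evaluate the model
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    mlflow.log_metric("accuracy", accuracy)
    # Log the model
    mlflow.sklearn.log_model(model, "random_forest_model")
    # Log artifacts (these files must already exist in the working directory)
    mlflow.log_artifact("training_data.csv")
    mlflow.log_artifact("model_summary.txt")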
# MLflowClient in more detail (simplified sketch)
class MLflowClient:
    def __init__(self, tracking_uri=None, registry_uri=None):
        self.tracking_uri = tracking_uri or DEFAULT_TRACKING_URI
        self.registry_uri = registry_uri or DEFAULT_REGISTRY_URI
        # Initialize the HTTP clients
        self.tracking_client = ApiClient(self.tracking_uri)
        self.registry_client = ApiClient(self.registry_uri)
        # Initialize the per-area clients
        self.experiment_client = ExperimentClient(self.tracking_client)
        self.run_client = RunClient(self.tracking_client)
        self.registry_client_obj = ModelRegistryClient(self.registry_client)
        self.artifact_client = ArtifactClient(self.tracking_client)

    def search_runs(self, experiment_ids, filter_string="",
                    max_results=100, order_by=None):
        """Search runs."""
        request = SearchRunsRequest(
            experiment_ids=experiment_ids,
            filter=filter_string,
            max_results=max_results,
            # order_by is a list of clauses; default to newest first
            order_by=order_by or ["start_time DESC"]
        )
        response = self.tracking_client.search_runs(request)
        return [Run.from_proto(run) for run in response.runs]

    def set_experiment(self, experiment_name):
        """Set the active experiment, creating it if necessary."""
        experiments = self.search_experiments(f"name = '{experiment_name}'")
        if experiments:
            self.active_experiment = experiments[0]
        else:
            self.active_experiment = self.create_experiment(experiment_name)
# TrackingClient core implementation (simplified sketch)
import os
import tempfile
from datetime import datetime
import mlflow.sklearn

class TrackingClient:
    def __init__(self, tracking_uri):
        self.tracking_uri = tracking_uri
        self.api_client = TrackingApi(tracking_uri)
        self.store = MLflowStore(tracking_uri)

    def start_run(self, experiment_id=None, run_name=None,
                  tags=None, run_uuid=None):
        """Start a new run."""
        request = StartRunRequest(
            experiment_id=experiment_id,
            run_name=run_name,
            run_uuid=run_uuid,
            start_time=datetime.utcnow()
        )
        response = self.api_client.start_run(request)
        return response.run

    def log_param(self, run_uuid, key, value, timestamp=None):
        """Log a parameter."""
        request = LogParamRequest(
            run_uuid=run_uuid,
            key=key,
            value=str(value),
            timestamp=timestamp or datetime.utcnow()
        )
        self.api_client.log_param(request)

    def log_metric(self, run_uuid, key, value, step=None,
                   timestamp=None, synchronous=False):
        """Log a metric."""
        request = LogMetricRequest(
            run_uuid=run_uuid,
            key=key,
            value=value,
            step=step,
            timestamp=timestamp or datetime.utcnow(),
            synchronous=synchronous
        )
        self.api_client.log_metric(request)

    def log_model(self, run_uuid, model, artifact_path,
                  conda_env=None, registered_model_name=None):
        """Log a model."""
        # Save the model to a temporary directory that is removed afterwards
        with tempfile.TemporaryDirectory() as temp_dir:
            model_path = os.path.join(temp_dir, "model")
            mlflow.sklearn.save_model(model, model_path, conda_env=conda_env)
            # Upload the model artifacts
            artifact_uri = self._upload_artifact(run_uuid, artifact_path, model_path)
        # Build the model info record
        model_info = Model(
            run_uuid=run_uuid,
            key=artifact_path,
            artifact_path=artifact_uri,
            flavors=self._extract_model_flavors(model)
        )
        # Register the model if a name was given
        if registered_model_name:
            self._register_model(registered_model_name, model_info)
        return model_info
# ModelRegistryClient implementation (simplified sketch)
class ModelRegistryClient:
    def __init__(self, registry_uri):
        self.registry_uri = registry_uri
        self.api_client = ModelRegistryApi(registry_uri)
        self.store = MLflowStore(registry_uri)

    def create_model(self, name, description=None, tags=None):
        """Create a new registered model."""
        request = CreateModelRequest(
            name=name,
            description=description,
            tags=tags or {}
        )
        response = self.api_client.create_model(request)
        return response.model

    def register_model_version(self, name, run_id, source=None,
                               run_link=None, tags=None, description=None):
        """Register a model version."""
        # Fetch the run
        run = self.store.get_run(run_id)
        # Build the request; the source defaults to the run's artifact URI
        request = RegisterModelVersionRequest(
            name=name,
            source=source or run.info.artifact_uri,
            run_id=run_id,
            run_link=run_link,
            tags=tags or {},
            description=description
        )
        response = self.api_client.register_model_version(request)
        return response.model_version

    def transition_model_version_stage(self, name, version, stage):
        """Move a model version to another stage."""
        request = TransitionModelVersionStageRequest(
            name=name,
            version=version,
            stage=stage
        )
        self.api_client.transition_model_version_stage(request)

    def search_model_versions(self, filter_string="", max_results=100):
        """Search model versions."""
        request = SearchModelVersionsRequest(
            filter=filter_string,
            max_results=max_results
        )
        response = self.api_client.search_model_versions(request)
        return [ModelVersion.from_proto(mv) for mv in response.model_versions]
MLflow Projects: provide project packaging, parameter management, and remote execution.
# ProjectClient implementation (simplified sketch)
import os
import shutil
import subprocess
import tempfile

class ProjectClient:
    def __init__(self, project_uri=None):
        self.project_uri = project_uri or "."
        self.config = ProjectConfig.load(self.project_uri)

    def run(self, uri=None, params=None, version=None,
            experiment_name=None, entry_point=None):
        """Run a project."""
        # Resolve the project URI
        project_uri = uri or self.project_uri
        project = Project.load(project_uri)
        # Merge user parameters with the entry point defaults
        run_params = self._merge_params(params, project.entry_points)
        # Create a temporary working directory
        temp_dir = tempfile.mkdtemp()
        try:
            # Clone the project
            project.clone(temp_dir)
            # Substitute parameters
            project.replace_params(run_params, temp_dir)
            # Build the environment
            conda_env = self._build_conda_env(project, temp_dir)
            # Execute the project
            return self._execute_project(
                project, temp_dir, run_params,
                experiment_name, entry_point, conda_env
            )
        finally:
            # Remove the temporary directory
            shutil.rmtree(temp_dir)

    def _execute_project(self, project, project_dir, params,
                         experiment_name, entry_point, conda_env):
        """Execute the project's entry point as a subprocess."""
        # Build the command
        cmd = self._build_command(project, params, entry_point)
        # Set environment variables
        env = os.environ.copy()
        if conda_env:
            env.update(conda_env)
        # Run the command
        process = subprocess.Popen(
            cmd, cwd=project_dir, env=env,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        # Wait for completion
        stdout, stderr = process.communicate()
        return ProjectRun(
            status=process.returncode,
            stdout=stdout.decode(),
            stderr=stderr.decode(),
            params=params
        )
Experiment and run management: the core of MLflow, covering the full experiment lifecycle.
# Advanced experiment and run functionality (simplified sketch)
from datetime import datetime

class ExperimentManager:
    def __init__(self, store):
        self.store = store
        self.search_index = ExperimentSearchIndex()

    def search_experiments(self, query, max_results=100):
        """Search experiments."""
        return self.search_index.search(query, max_results)

    def restore_experiment(self, experiment_id):
        """Restore a deleted experiment."""
        experiment = self.store.get_experiment(experiment_id)
        if experiment.lifecycle_stage == 'deleted':
            experiment.lifecycle_stage = 'active'
            experiment.last_update_time = datetime.utcnow()
            with self.store.session() as session:
                session.merge(experiment)
                session.commit()
        return experiment

class RunManager:
    def __init__(self, store):
        self.store = store
        self.run_context = RunContext()

    def restore_run(self, run_uuid):
        """Restore a deleted run."""
        run = self.store.get_run(run_uuid)
        if run.lifecycle_stage == 'deleted':
            run.lifecycle_stage = 'active'
            run.last_update_time = datetime.utcnow()
            with self.store.session() as session:
                session.merge(run)
                session.commit()
        return run

    def get_run_history(self, experiment_id, max_results=100):
        """Return the most recent runs of an experiment, newest first."""
        runs = self.store.list_runs(experiment_id)
        sorted_runs = sorted(
            runs,
            key=lambda r: (r.start_time or datetime.min),
            reverse=True
        )
        return sorted_runs[:max_results]
# Experiment creation and management in detail (simplified sketch)
import uuid
from datetime import datetime
from mlflow.exceptions import MlflowException

class ExperimentManager:
    def create_experiment(self, name, artifact_location=None,
                          tags=None, description=None):
        """Create an experiment, step by step."""
        # 1. Validate the experiment name
        if not name or len(name) > 255:
            raise MlflowException("Experiment name must be 1-255 characters")
        # 2. Check for a name collision
        existing = self.store.get_experiment_by_name(name)
        if existing:
            raise MlflowException(f"Experiment '{name}' already exists")
        # 3. Build the artifact storage path
        if not artifact_location:
            experiment_id = str(uuid.uuid4())
            artifact_location = f"file:///tmp/mlflow/experiments/{experiment_id}"
        # 4. Create the experiment entity
        now = datetime.utcnow()
        experiment = Experiment(
            name=name,
            artifact_location=artifact_location,
            lifecycle_stage='active',
            creation_time=now,
            last_update_time=now,
            description=description
        )
        # 5. Attach tags (the experiment ID is not assigned until the entity is
        #    saved, so the tag is linked through the relationship instead)
        if tags:
            for key, value in tags.items():
                tag = ExperimentTag(key=key, value=str(value))
                experiment.experiment_tags.append(tag)
        # 6. Save the experiment
        with self.store.session() as session:
            session.add(experiment)
            session.commit()
        # 7. Update the search index
        self.search_index.add(experiment)
        return experiment

    def delete_experiment(self, experiment_id, delete_artifacts=False):
        """Soft-delete an experiment."""
        experiment = self.store.get_experiment(experiment_id)
        if not experiment:
            raise MlflowException(f"Experiment {experiment_id} not found")
        # Mark as deleted
        experiment.lifecycle_stage = 'deleted'
        experiment.last_update_time = datetime.utcnow()
        with self.store.session() as session:
            session.merge(experiment)
            session.commit()
        # Optionally delete the artifacts
        if delete_artifacts:
            self._delete_experiment_artifacts(experiment)
        # Update the search index
        self.search_index.remove(experiment)
        return experiment
# Run creation and tracking in detail (simplified sketch)
import time
import uuid
from datetime import datetime
from mlflow.exceptions import MlflowException

class RunManager:
    def start_run(self, experiment_id=None, run_name=None,
                  tags=None, run_uuid=None, experiment_name=None):
        """Start a run, step by step."""
        # 1. Resolve the experiment
        if experiment_name:
            experiment = self.get_experiment_by_name(experiment_name)
            if not experiment:
                experiment = self.create_experiment(experiment_name)
        else:
            experiment = self.get_experiment(experiment_id)
        if not experiment:
            raise MlflowException("Experiment not found")
        # 2. Generate a run ID unless one was supplied
        if not run_uuid:
            run_uuid = uuid.uuid4().hex
        # 3. Create the run entity
        run = Run(
            run_uuid=run_uuid,
            experiment_id=experiment.experiment_id,
            run_name=run_name or f"run_{int(time.time())}",
            run_status='RUNNING',  # the run is executing as soon as it starts
            lifecycle_stage='active',
            start_time=datetime.utcnow(),
            end_time=None,
            source_type='NOTEBOOK',
            source_name='default',
            entry_point_name='main'
        )
        # 4. Attach tags
        if tags:
            for key, value in tags.items():
                tag = RunTag(
                    run_uuid=run_uuid,
                    key=key,
                    value=str(value)
                )
                run.run_tags.append(tag)
        # 5. Save the run
        with self.store.session() as session:
            session.add(run)
            session.commit()
        # 6. Set the active run context
        self.run_context.set_active_run(run)
        return run

    def end_run(self, run_uuid, status='FINISHED'):
        """End a run."""
        run = self.get_run(run_uuid)
        if not run:
            raise MlflowException(f"Run {run_uuid} not found")
        # Update the run status
        now = datetime.utcnow()
        run.run_status = status
        run.end_time = now
        run.last_update_time = now
        with self.store.session() as session:
            session.merge(run)
            session.commit()
        # Clear the active run context
        self.run_context.clear_active_run()
        return run
# Parameter logging in detail (simplified sketch)
from datetime import datetime
from mlflow.exceptions import MlflowException

class ParamManager:
    def log_param(self, run_uuid, key, value, timestamp=None):
        """Log a parameter."""
        # 1. Validate the key and value
        if not key or len(key) > 255:
            raise MlflowException("Parameter key must be 1-255 characters")
        if len(str(value)) > 5000:
            raise MlflowException("Parameter value must be <= 5000 characters")
        # 2. Check that the run exists
        run = self.get_run(run_uuid)
        if not run:
            raise MlflowException(f"Run {run_uuid} not found")
        # 3. Create the parameter record
        param = Param(
            run_uuid=run_uuid,
            key=key,
            value=str(value),
            timestamp=timestamp or datetime.utcnow()
        )
        # 4. Save the parameter and 5. update the run timestamp in one transaction
        run.last_update_time = datetime.utcnow()
        with self.store.session() as session:
            session.add(param)
            session.merge(run)
            session.commit()
        return param

    def get_param(self, run_uuid, key):
        """Return a parameter value."""
        param = self.store.get_param(run_uuid, key)
        if not param:
            raise MlflowException(f"Parameter '{key}' not found for run {run_uuid}")
        return param.value

    def get_all_params(self, run_uuid):
        """Return all parameters of a run."""
        return self.store.get_all_params(run_uuid)

    def delete_param(self, run_uuid, key):
        """Delete a parameter; returns True if a record was removed."""
        with self.store.session() as session:
            param = session.query(Param).filter_by(
                run_uuid=run_uuid,
                key=key
            ).first()
            if param:
                session.delete(param)
                session.commit()
                return True
        return False
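# Metric logging in detail (simplified sketch)
import math
import queue
from datetime import datetime
from mlflow.exceptions import MlflowException

class MetricManager:
    BATCH_SIZE = 100  # assumed flush threshold; the original does not define it

    def __init__(self, store):
        # Added for completeness; the original omits the constructor
        self.store = store
        self.metric_queue = queue.Queue()

    def log_metric(self, run_uuid, key, value, step=None,
                   timestamp=None, synchronous=False):
        """Log a metric."""
        # 1. Validate the key and value
        if not key or len(key) > 255:
            raise MlflowException("Metric key must be 1-255 characters")
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise MlflowException(f"Metric value must be numeric, got {type(value)}")
        # 2. Check that the run exists
        run = self.get_run(run_uuid)
        if not run:
            raise MlflowException(f"Run {run_uuid} not found")
        # 3. Flag special values
        is_nan = math.isnan(value)
        is_inf = math.isinf(value)
        # 4. Create the metric record
        metric = Metric(
            run_uuid=run_uuid,
            key=key,
            value=value,
            step=step,
            timestamp=timestamp or datetime.utcnow(),
            is_nan=is_nan,
            is_inf=is_inf
        )
        # 5. Write immediately or queue for a batch write
        if synchronous:
            self._log_metric_sync(metric)
        else:
            self._log_metric_async(metric)
        # 6. Update the run timestamp
        run.last_update_time = datetime.utcnow()
        with self.store.session() as session:
            session.merge(run)
            session.commit()
        return metric

    def _log_metric_async(self, metric):
        """Queue a metric and flush once a full batch has accumulated."""
        self.metric_queue.put(metric)
        if self.metric_queue.qsize() >= self.BATCH_SIZE:
            self._flush_metrics()

    def _flush_metrics(self):
        """Drain the queue and write the batch in one transaction."""
        batch = []
        while not self.metric_queue.empty():
            batch.append(self.metric_queue.get())
        with self.store.session() as session:
            session.add_all(batch)
            session.commit()

    def _log_metric_sync(self, metric):
        """Write a metric immediately."""
        with self.store.session() as session:
            session.add(metric)
            session.commit()

    def get_metric_history(self, run_uuid, key):
        """Return the full history of a metric."""
        return self.store.get_metric_history(run_uuid, key)

    def get_latest_metrics(self, run_uuid, run_uuids):
        """Return the latest metrics."""
        return self.store.get_latest_metrics(run_uuid, run_uuids)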
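# Two ideas from the class above, flagging special values and keeping a per-key history, can be shown with an in-memory stand-in. `classify` and `MetricHistory` are hypothetical names for this sketch; "latest" is defined here as the point with the highest step, which is an assumption.

```python
import math
from collections import defaultdict
from typing import List, Tuple


def classify(value: float) -> str:
    """Return 'nan', 'inf', or 'finite' for a numeric metric value."""
    if math.isnan(value):
        return "nan"
    if math.isinf(value):
        return "inf"
    return "finite"


class MetricHistory:
    """In-memory stand-in for a metric store: one (step, value) series per key."""

    def __init__(self) -> None:
        self._series = defaultdict(list)

    def log(self, key: str, value: float, step: int) -> None:
        # bool is a subclass of int, so it has to be rejected explicitly.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise TypeError(f"Metric value must be numeric, got {type(value).__name__}")
        self._series[key].append((step, float(value)))

    def history(self, key: str) -> List[Tuple[int, float]]:
        """All logged points for a key, ordered by step."""
        return sorted(self._series[key], key=lambda point: point[0])

    def latest(self, key: str) -> Tuple[int, float]:
        """The point with the highest step."""
        return max(self._series[key], key=lambda point: point[0])


metrics = MetricHistory()
metrics.log("loss", 0.5, step=1)           # logged out of order on purpose
metrics.log("loss", 0.9, step=0)
metrics.log("loss", float("inf"), step=2)  # e.g. a diverged training step

print(metrics.history("loss"))
print(metrics.latest("loss"), classify(metrics.latest("loss")[1]))
```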
# Model logging in detail (simplified sketch)
import os
import shutil
import tempfile
from datetime import datetime
import joblib
import mlflow.sklearn
from sklearn.base import BaseEstimator

class ModelManager:
    def log_model(self, run_uuid, model, artifact_path,
                  conda_env=None, registered_model_name=None,
                  await_registration_for=300, pip_requirements=None):
        """Log a model."""
        # 1. Create a temporary directory
        temp_dir = tempfile.mkdtemp()
        model_path = os.path.join(temp_dir, "model")
        try:
            # 2. Save the model
            self._save_model(model, model_path, conda_env, pip_requirements)
            # 3. Upload the saved model directory
            artifact_uri = self._upload_artifact(
                run_uuid, artifact_path, model_path
            )
            # 4. Build the model info record
            model_info = self._create_model_info(
                run_uuid, artifact_path, artifact_uri, model
            )
            # 5. Save the model info
            self._save_model_info(run_uuid, model_info)
            # 6. Register the model if a name was given
            if registered_model_name:
                self._register_model_version(
                    registered_model_name, run_uuid, model_info
                )
            return model_info
        finally:
            # 7. Remove the temporary directory
            shutil.rmtree(temp_dir)

    def _save_model(self, model, model_path, conda_env, pip_requirements):
        """Save the model locally, dispatching on its type."""
        # hasattr(model, 'predict') would also match non-scikit-learn models
        # (e.g. Keras), so test for a scikit-learn estimator explicitly
        if isinstance(model, BaseEstimator):  # scikit-learn
            mlflow.sklearn.save_model(
                model, model_path,
                conda_env=conda_env,
                pip_requirements=pip_requirements
            )
        elif hasattr(model, 'save'):  # frameworks whose models expose save()
            model.save(model_path)
        else:
            # Fall back to a joblib pickle; the target directory must exist first
            os.makedirs(model_path, exist_ok=True)
            joblib.dump(model, os.path.join(model_path, "model.pkl"))

    def _create_model_info(self, run_uuid, artifact_path, artifact_uri, model):
        """Build the model info record."""
        # Extract the model flavors
        flavors = self._extract_model_flavors(model)
        model_info = Model(
            run_uuid=run_uuid,
            key=artifact_path,
            artifact_path=artifact_uri,
            flavors=flavors,
            timestamp=datetime.utcnow()
        )
        return model_info
# Artifact management in detail (simplified sketch)
import os
import shutil
from datetime import datetime
from mlflow.exceptions import MlflowException

class ArtifactManager:
    def log_artifact(self, run_uuid, local_path, artifact_path=None):
        """Log an artifact."""
        # 1. Validate the input path
        if not os.path.exists(local_path):
            raise MlflowException(f"Local path does not exist: {local_path}")
        # 2. Determine the target path
        if artifact_path is None:
            filename = os.path.basename(local_path)
            artifact_path = filename
        # 3. Create the target directory
        run_artifact_dir = self._get_run_artifact_dir(run_uuid)
        target_dir = os.path.join(run_artifact_dir, os.path.dirname(artifact_path))
        os.makedirs(target_dir, exist_ok=True)
        # 4. Copy the file or directory
        target_path = os.path.join(target_dir, os.path.basename(artifact_path))
        if os.path.isfile(local_path):
            shutil.copy2(local_path, target_path)
        else:
            # dirs_exist_ok allows the same directory to be logged again
            shutil.copytree(local_path, target_path, dirs_exist_ok=True)
        # 5. Create the artifact record
        now = datetime.utcnow()
        artifact = Artifact(
            run_uuid=run_uuid,
            path=artifact_path,
            is_dir=os.path.isdir(local_path),
            file_size=self._get_file_size(target_path),
            creation_time=now,
            last_update_time=now
        )
        # 6. Save the record
        with self.store.session() as session:
            session.add(artifact)
            session.commit()
        return self._artifact_to_uri(artifact_path)

    def list_artifacts(self, run_uuid, path=''):
        """List artifacts under a path."""
        run_artifact_dir = self._get_run_artifact_dir(run_uuid)
        full_path = os.path.join(run_artifact_dir, path)
        if not os.path.exists(full_path):
            return []
        artifacts = []
        for item in os.listdir(full_path):
            item_path = os.path.join(full_path, item)
            rel_path = os.path.join(path, item)
            artifacts.append({
                'path': rel_path,
                'is_dir': os.path.isdir(item_path),
                'file_size': os.path.getsize(item_path) if os.path.isfile(item_path) else 0,
                'creation_time': datetime.fromtimestamp(os.path.getctime(item_path)),
                'last_update_time': datetime.fromtimestamp(os.path.getmtime(item_path))
            })
        return artifacts

    def delete_artifact(self, run_uuid, path):
        """Delete an artifact; returns True if it existed."""
        run_artifact_dir = self._get_run_artifact_dir(run_uuid)
        artifact_path = os.path.join(run_artifact_dir, path)
        if os.path.exists(artifact_path):
            if os.path.isfile(artifact_path):
                os.remove(artifact_path)
            else:
                shutil.rmtree(artifact_path)
            # Delete the database record
            with self.store.session() as session:
                session.query(Artifact).filter_by(
                    run_uuid=run_uuid,
                    path=path
                ).delete()
                session.commit()
            return True
        return False
# Model registry in detail
class ModelRegistryManager:
    def __init__(self, store):
        self.store = store
        # Allowed target stages for each current stage
        self.stage_transitions = {
            'None': ['Staging', 'Archived'],
            'Staging': ['Production', 'Archived'],
            'Production': ['Staging', 'Archived'],
            'Archived': []
        }

    def create_model(self, name, description=None, tags=None):
        """Create a registered model."""
        # Model names must be unique
        existing = self.store.get_model_by_name(name)
        if existing:
            raise MLflowException(f"Model '{name}' already exists")
        # Build the model record
        now = datetime.utcnow()
        model = Model(
            name=name,
            description=description,
            lifecycle_stage='active',
            creation_time=now,
            last_update_time=now
        )
        # Attach tags
        if tags:
            for key, value in tags.items():
                tag = ModelTag(
                    name=name,
                    key=key,
                    value=str(value)
                )
                model.model_tags.append(tag)
        with self.store.session() as session:
            session.add(model)
            session.commit()
        return model
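    def register_model_version(self, name, run_id, source=None,
                               run_link=None, tags=None, description=None):
        """Register a new version of a model."""
        # Look up the model, creating it on first use
        model = self.store.get_model_by_name(name)
        if not model:
            model = self.create_model(name)
        # Look up the run that produced the model
        run = self.store.get_run(run_id)
        if not run:
            raise MLflowException(f"Run {run_id} not found")
        # Build the model version
        version = model.next_version()
        now = datetime.utcnow()
        model_version = ModelVersion(
            name=name,
            version=version,
            run_id=run_id,
            current_stage='None',
            # Fall back to the run's artifact location when no source is given
            source=source or run.info.artifact_uri,
            run_link=run_link,
            description=description,
            creation_time=now,
            last_update_time=now
        )
        # Attach tags (previously accepted but silently dropped)
        if tags:
            for key, value in tags.items():
                model_version.tags.append(
                    ModelVersionTag(name=name, version=version, key=key, value=str(value))
                )
        with self.store.session() as session:
            session.add(model_version)
            session.commit()
        return model_version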
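Model registration flow: the full path from a finished training run to a registered model, including version management and stage transitions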
# End-to-end model registration flow
class ModelRegistrationFlow:
    def register_model_from_run(self, run_id, model_name,
                                description=None, tags=None):
        """Register a model produced by a run."""
        # 1. Look up the run
        run = self.get_run(run_id)
        if not run:
            raise MLflowException(f"Run {run_id} not found")
        # 2. Check that the run logged at least one model
        models = self.store.get_models_by_run(run_id)
        if not models:
            raise MLflowException(f"No models found in run {run_id}")
        # 3. Use the first logged model
        model_artifact = models[0]
        # 4. Get or create the registered model
        model = self.get_or_create_model(model_name, description, tags)
        # 5. Create the model version
        version = self.create_model_version(
            model, run_id, model_artifact, run
        )
        # 6. Place the new version in Staging
        version.current_stage = 'Staging'
        # 7. Persist the change
        with self.store.session() as session:
            session.add(version)
            session.commit()
        # 8. Notify listeners
        self._on_model_registered(version)
        return version
    def transition_model_version_stage(self, name, version, stage,
                                       comment=None):
        """Move a model version to another stage."""
        # 1. Look up the model version
        model_version = self.store.get_model_version(name, version)
        if not model_version:
            raise MLflowException(f"Model version {name}:{version} not found")
        # 2. Validate the transition against the allowed-transition table
        allowed = self.stage_transitions.get(model_version.current_stage, [])
        if stage not in allowed:
            raise MLflowException(
                f"Cannot transition from {model_version.current_stage} to {stage}"
            )
        # 3. Record the transition
        now = datetime.utcnow()
        transition = ModelVersionStageTransition(
            name=name,
            version=version,
            from_stage=model_version.current_stage,
            to_stage=stage,
            comment=comment,
            timestamp=now
        )
        # 4. Update the model version
        model_version.current_stage = stage
        model_version.last_update_time = now
        with self.store.session() as session:
            session.add(transition)
            session.merge(model_version)
            session.commit()
        # 5. Notify listeners
        self._on_stage_transition(model_version, transition)
        return model_version
# Model version management in detail
class ModelVersionManager:
    def __init__(self, store):
        self.store = store
        self.version_cache = {}  # Cache of model versions keyed by "name:version"

    def create_model_version(self, model_name, run_id, source,
                             run_link=None, tags=None, description=None):
        """Create a model version."""
        # 1. Look up the model
        model = self.store.get_model_by_name(model_name)
        if not model:
            raise MLflowException(f"Model {model_name} not found")
        # 2. Look up the run
        run = self.store.get_run(run_id)
        # 3. Determine the version number
        version = model.next_version()
        # 4. Build the model version
        now = datetime.utcnow()
        model_version = ModelVersion(
            name=model_name,
            version=version,
            run_id=run_id,
            current_stage='None',
            source=source,
            run_link=run_link,
            description=description,
            creation_time=now,
            last_update_time=now
        )
        # 5. Attach tags
        if tags:
            for key, value in tags.items():
                tag = ModelVersionTag(
                    name=model_name,
                    version=version,
                    key=key,
                    value=str(value)
                )
                model_version.tags.append(tag)
        # 6. Persist the model version
        with self.store.session() as session:
            session.add(model_version)
            session.commit()
        # 7. Update the cache
        self.version_cache[f"{model_name}:{version}"] = model_version
        return model_version
    def get_model_version(self, name, version):
        """Fetch a model version, consulting the cache first."""
        cache_key = f"{name}:{version}"
        # Check the cache
        if cache_key in self.version_cache:
            return self.version_cache[cache_key]
        # Fall back to the database
        model_version = self.store.get_model_version(name, version)
        if model_version:
            self.version_cache[cache_key] = model_version
        return model_version

    def list_model_versions(self, name=None, max_results=None):
        """List model versions."""
        return self.store.list_model_versions(name, max_results)

    def delete_model_version(self, name, version):
        """Soft-delete a model version."""
        model_version = self.get_model_version(name, version)
        if not model_version:
            raise MLflowException(f"Model version {name}:{version} not found")
        # Mark as deleted rather than removing the row
        model_version.lifecycle_stage = 'deleted'
        model_version.last_update_time = datetime.utcnow()
        with self.store.session() as session:
            session.merge(model_version)
            session.commit()
        # Evict the cached entry, if any
        self.version_cache.pop(f"{name}:{version}", None)
        return model_version
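Model stage management: tracks the lifecycle stage of each model version and supports promotion to production
| Stage | Description | Allowed transitions |
|---|---|---|
| None | Newly created model version | → Staging, Archived |
| Staging | Being prepared for production | → Production, Archived |
| Production | In use in production | → Staging, Archived |
| Archived | Archived and no longer in use | (none) |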
# Stage transition implementation
class StageTransitionManager:
    def __init__(self, store):
        # The store is needed below for lookups and for persisting transitions
        self.store = store
        self.stage_transitions = {
            'None': ['Staging', 'Archived'],
            'Staging': ['Production', 'Archived'],
            'Production': ['Staging', 'Archived'],
            'Archived': []
        }

    def validate_transition(self, from_stage, to_stage):
        """Raise if the transition is not in the allowed-transition table."""
        if to_stage not in self.stage_transitions.get(from_stage, []):
            raise MLflowException(
                f"Invalid stage transition: {from_stage} → {to_stage}"
            )

    def transition_stage(self, name, version, to_stage, comment=None):
        """Perform a stage transition."""
        # 1. Look up the model version
        model_version = self.store.get_model_version(name, version)
        if not model_version:
            raise MLflowException(f"Model version {name}:{version} not found")
        # 2. Validate the transition
        self.validate_transition(model_version.current_stage, to_stage)
        # 3. Record the transition history
        now = datetime.utcnow()
        transition = ModelVersionStageTransition(
            name=name,
            version=version,
            from_stage=model_version.current_stage,
            to_stage=to_stage,
            comment=comment,
            timestamp=now
        )
        # 4. Update the stage
        model_version.current_stage = to_stage
        model_version.last_update_time = now
        # 5. Persist both records
        with self.store.session() as session:
            session.add(transition)
            session.merge(model_version)
            session.commit()
        return model_version
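To make the transition table easier to reason about, here is a minimal standalone sketch that encodes the same four stages and answers two questions: whether a single transition is allowed, and which stages can eventually be reached from a starting stage. The function names are illustrative and the table mirrors this walkthrough's rules only.

```python
from collections import deque

# Same table as in this walkthrough: current stage -> allowed next stages
ALLOWED_TRANSITIONS = {
    "None": {"Staging", "Archived"},
    "Staging": {"Production", "Archived"},
    "Production": {"Staging", "Archived"},
    "Archived": set(),
}


def can_transition(from_stage: str, to_stage: str) -> bool:
    """Return True when a single-step transition is in the table."""
    return to_stage in ALLOWED_TRANSITIONS.get(from_stage, set())


def reachable_stages(start: str) -> set:
    """Breadth-first search over the table, excluding the start stage itself
    unless it can be re-entered through another stage."""
    seen = set()
    queue = deque(ALLOWED_TRANSITIONS.get(start, set()))
    while queue:
        stage = queue.popleft()
        if stage in seen:
            continue
        seen.add(stage)
        queue.extend(ALLOWED_TRANSITIONS.get(stage, set()))
    return seen


if __name__ == "__main__":
    print(can_transition("None", "Production"))
    print(sorted(reachable_stages("None")))
    print(sorted(reachable_stages("Archived")))
```

Under this table, `Archived` is a terminal stage: once a version is archived, no further transition validates.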
# Model management API
class ModelRegistryAPI:
    def __init__(self, client):
        self.client = client

    def create_model(self, name, description=None, tags=None):
        """Create a model."""
        return self.client.create_model(name, description, tags)

    def get_model(self, name):
        """Fetch model information."""
        return self.client.get_model(name)

    def list_models(self, filter_string="", max_results=100):
        """List models."""
        return self.client.list_models(filter_string, max_results)

    def delete_model(self, name):
        """Delete a model."""
        return self.client.delete_model(name)

    def register_model(self, name, source, run_id=None,
                       tags=None, description=None):
        """Register a model version."""
        # Pass keywords: register_model_version takes (name, run_id, source, run_link, ...),
        # so positional forwarding would swap source and run_id
        return self.client.register_model_version(
            name=name, run_id=run_id, source=source,
            tags=tags, description=description
        )

    def get_model_version(self, name, version):
        """Fetch a model version."""
        return self.client.get_model_version(name, version)

    def transition_model_version_stage(self, name, version, stage, comment=None):
        """Move a model version to another stage."""
        return self.client.transition_model_version_stage(
            name, version, stage, comment
        )

    def search_model_versions(self, filter_string="", max_results=100):
        """Search model versions."""
        return self.client.search_model_versions(filter_string, max_results)

    def get_model_version_download_uri(self, name, version):
        """Return the download URI for a model version."""
        return self.client.get_model_version_download_uri(name, version)

    def load_model_version(self, name, version, dst_path=None):
        """Load a model version."""
        return self.client.load_model_version(name, version, dst_path)
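Model serving system: provides model deployment, inference serving, and management, and supports multiple model formats and deployment modes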
# Model serving implementation
import threading

from flask import Flask, jsonify, request


class ModelService:
    def __init__(self, model_registry_client):
        self.registry_client = model_registry_client
        self.model_cache = {}    # Loaded models
        self.model_servers = {}  # Active model servers keyed by "name:version"

    def serve_model(self, name, version=None, host="localhost", port=5000):
        """Serve a model over HTTP."""
        # 1. Resolve the model version
        if version:
            model_version = self.registry_client.get_model_version(name, version)
        else:
            model_version = self.registry_client.get_latest_model_version(name)
        if not model_version:
            raise MLflowException(f"Model {name}:{version or 'latest'} not found")
        # 2. Reuse an existing server; key on the resolved version, not on None
        service_key = f"{name}:{model_version.version}"
        if service_key in self.model_servers:
            return self.model_servers[service_key]
        # 3. Load the model
        model = self._load_model(model_version)
        # 4. Create the HTTP application
        app = self._create_model_app(model, model_version)
        # 5. Start the server in a daemon thread
        server = threading.Thread(
            target=app.run,
            kwargs={"host": host, "port": port}
        )
        server.daemon = True
        server.start()
        # 6. Keep a reference to the running service
        self.model_servers[service_key] = {
            "server": server,
            "app": app,
            "model": model,
            "version": model_version
        }
        return self.model_servers[service_key]

    def _create_model_app(self, model, model_version):
        """Build the Flask application that wraps the model."""
        app = Flask(__name__)

        @app.route("/predict", methods=["POST"])
        def predict():
            data = request.json
            prediction = model.predict(data)
            # NumPy arrays are not JSON-serializable; convert when possible
            if hasattr(prediction, "tolist"):
                prediction = prediction.tolist()
            return jsonify({"prediction": prediction})

        @app.route("/health", methods=["GET"])
        def health():
            return jsonify({"status": "healthy"})

        return app
Model packaging: bundles the model, code, and environment information into a standard format to support cross-platform deployment
# Model packaging implementation
import os
import platform
import shutil
from pathlib import Path

import mlflow
import yaml


class ModelPacker:
    def __init__(self, model_dir):
        self.model_dir = Path(model_dir)
        self.packaged_dir = None

    def package_model(self, model_flavors, artifacts_path=None):
        """Package a model."""
        # 1. Create the package directory
        self.packaged_dir = self.model_dir / "packaged_model"
        self.packaged_dir.mkdir(parents=True, exist_ok=True)
        # 2. Write the MLmodel file
        self._create_mlmodel_file(model_flavors)
        # 3. Copy artifacts
        if artifacts_path:
            self._copy_artifacts(artifacts_path)
        # 4. Write the conda environment
        self._create_conda_env()
        # 5. Write requirements.txt
        self._create_requirements_file()
        # 6. Write the signature file
        self._create_signature()
        return self.packaged_dir

    def _create_mlmodel_file(self, model_flavors):
        """Write the MLmodel file."""
        mlmodel_path = self.packaged_dir / "MLmodel"
        model_spec = {
            "artifact_path": ".",
            "flavors": model_flavors,
            "mlflow_version": mlflow.__version__,
            "signature": self._generate_signature(),
            "loader_module": "mlflow.sklearn",
            # Plain "X.Y.Z" string; sys.version also carries build details
            "python_version": platform.python_version()
        }
        with open(mlmodel_path, 'w') as f:
            yaml.dump(model_spec, f, default_flow_style=False)

    def _copy_artifacts(self, artifacts_path):
        """Copy artifact files into the package directory."""
        if os.path.exists(artifacts_path):
            for item in os.listdir(artifacts_path):
                src = os.path.join(artifacts_path, item)
                dst = self.packaged_dir / item
                if os.path.isfile(src):
                    shutil.copy2(src, dst)
                else:
                    # dirs_exist_ok (Python 3.8+) allows repackaging into the same directory
                    shutil.copytree(src, dst, dirs_exist_ok=True)
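Model deployment strategies: supports several deployment modes, including local serving, containerized deployment, and cloud deployment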
# Model deployment implementation
class ModelDeploymentManager:
    def __init__(self, registry_client):
        self.registry_client = registry_client
        self.deployment_configs = {}

    def deploy_local(self, name, version=None, host="localhost", port=5000):
        """Deploy locally."""
        # 1. Resolve the model version
        model_version = self._get_model_version(name, version)
        # 2. Build the deployment config
        config = {
            "type": "local",
            "host": host,
            "port": port,
            "model": model_version
        }
        # 3. Create the service
        service = self._create_local_service(model_version, config)
        # 4. Start the service
        service.start()
        # 5. Save the config under the resolved version, not a possible None
        self.deployment_configs[f"{name}:{model_version.version}"] = config
        return service

    def deploy_docker(self, name, version=None, image_name=None,
                      port=5000, gpu=False):
        """Deploy with Docker."""
        # 1. Resolve the model version
        model_version = self._get_model_version(name, version)
        # 2. Build the image
        if not image_name:
            image_name = f"mlflow-model-{name}-{model_version.version}"
        dockerfile = self._generate_dockerfile(model_version)
        # 3. Create the container
        container = self._create_docker_container(
            model_version, image_name, port, gpu
        )
        # 4. Start the container
        container.start()
        # 5. Save the config
        config = {
            "type": "docker",
            "image": image_name,
            "container": container,
            "model": model_version
        }
        self.deployment_configs[f"{name}:{model_version.version}"] = config
        return container

    def deploy_cloud(self, name, version=None, provider="aws",
                     region="us-west-2", instance_type="ml.m5.large"):
        """Deploy to a cloud platform."""
        # 1. Resolve the model version
        model_version = self._get_model_version(name, version)
        # 2. Dispatch on the cloud provider
        if provider == "aws":
            return self._deploy_aws(model_version, region, instance_type)
        elif provider == "gcp":
            return self._deploy_gcp(model_version, region)
        elif provider == "azure":
            return self._deploy_azure(model_version, region)
        else:
            raise MLflowException(f"Unsupported cloud provider: {provider}")
Model inference system: provides model inference serving with support for batch inference, asynchronous processing, and load balancing
# Model inference service implementation
import time
from concurrent.futures import TimeoutError as FutureTimeoutError
from queue import Queue


class ModelInferenceService:
    def __init__(self, model_registry_client):
        self.registry_client = model_registry_client
        self.active_models = {}  # Cache of loaded models
        self.request_queue = Queue()
        self.worker_threads = []

    def load_model(self, name, version=None):
        """Load a model into memory."""
        model_key = f"{name}:{version}" if version else f"{name}:latest"
        # Return the cached model if it is already loaded
        if model_key in self.active_models:
            return self.active_models[model_key]
        # Resolve the model version
        if version:
            model_version = self.registry_client.get_model_version(name, version)
        else:
            model_version = self.registry_client.get_latest_model_version(name)
        if not model_version:
            raise MLflowException(f"Model {name} not found")
        # Load the model
        model = self._load_model_from_uri(model_version.source)
        # Cache it
        self.active_models[model_key] = {
            "model": model,
            "version": model_version,
            "loaded_time": time.time()
        }
        return self.active_models[model_key]

    def predict(self, name, input_data, version=None, batch=False):
        """Run inference."""
        model_info = self.load_model(name, version)
        model = model_info["model"]
        if batch:
            # Batch mode: one inference call per item
            return [self._run_inference(model, data) for data in input_data]
        # Single inference
        return self._run_inference(model, input_data)

    def _run_inference(self, model, input_data):
        """Call the model through whichever interface it exposes."""
        try:
            if hasattr(model, 'predict'):
                return model.predict(input_data)
            elif hasattr(model, 'call'):
                return model.call(input_data)
            else:
                return model([input_data])
        except Exception as e:
            raise MLflowException(f"Inference failed: {str(e)}") from e

    def batch_predict(self, name, input_data, version=None,
                      max_batch_size=32, timeout=60):
        """Predict over a large input by splitting it into batches."""
        batches = self._split_into_batches(input_data, max_batch_size)
        results = []
        for batch in batches:
            future = self.submit_prediction(name, batch, version)
            try:
                result = future.result(timeout=timeout)
                results.extend(result)
            # Before Python 3.11 this is not the built-in TimeoutError
            except FutureTimeoutError:
                raise MLflowException("Batch prediction timed out")
        return results
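`batch_predict` above relies on `_split_into_batches` and `submit_prediction`, neither of which is shown. The following is a minimal sketch of how the two pieces could fit together using the standard library's `ThreadPoolExecutor`; the free functions and the `predict_fn` and `max_workers` parameters are assumptions made for illustration, not the walkthrough's actual helpers.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterator, List, Sequence


def split_into_batches(items: Sequence, max_batch_size: int) -> Iterator[Sequence]:
    """Yield consecutive slices of at most max_batch_size items."""
    if max_batch_size < 1:
        raise ValueError("max_batch_size must be at least 1")
    for start in range(0, len(items), max_batch_size):
        yield items[start:start + max_batch_size]


def batch_predict(predict_fn: Callable[[Sequence], List], items: Sequence,
                  max_batch_size: int = 32, timeout: float = 60.0,
                  max_workers: int = 4) -> List:
    """Run predict_fn over every batch in a thread pool, preserving input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(predict_fn, batch)
                   for batch in split_into_batches(items, max_batch_size)]
        results: List = []
        # Collect in submission order so outputs line up with inputs
        for future in futures:
            results.extend(future.result(timeout=timeout))
    return results


if __name__ == "__main__":
    doubled = batch_predict(lambda batch: [x * 2 for x in batch], list(range(70)))
    print(len(doubled), doubled[:3])
```

Submitting every batch before collecting any result lets batches run concurrently, whereas the loop in `batch_predict` above waits for each batch before submitting the next. Note that leaving the `with` block waits for running tasks, so `timeout` bounds the wait for a result rather than the work itself.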
MLflow Projects: provides packaging, parameter management, and reproducible execution for machine learning projects
# Core project management implementation
import os
import subprocess
import sys


class ProjectManager:
    def __init__(self, project_uri=None):
        self.project_uri = project_uri or "."
        self.project_spec = self._load_project_spec()

    def run(self, params=None, version=None, experiment_name=None,
            entry_point=None, timeout=None):
        """Run the project."""
        # 1. Validate parameters
        params = self._validate_params(params)
        # 2. Clone the project when the URI is remote
        if self._is_remote_uri(self.project_uri):
            local_path = self._clone_project(self.project_uri, version)
        else:
            local_path = self.project_uri
        # 3. Build the command
        cmd = self._build_command(local_path, params, entry_point)
        # 4. Set up the environment
        env = self._setup_environment(params, experiment_name)
        # 5. Execute the command
        result = self._execute_command(cmd, env, local_path, timeout)
        # 6. Process the result
        return self._process_project_result(result, experiment_name)

    def _validate_params(self, params):
        """Validate parameters against the project spec."""
        if not params:
            params = {}
        if self.project_spec.entry_points:
            for ep_name, ep_spec in self.project_spec.entry_points.items():
                if ep_name == params.get('entry_point'):
                    self._validate_entry_point_params(params, ep_spec)
        return params

    def _build_command(self, project_path, params, entry_point):
        """Build the command line as an argument list."""
        # Start with the current Python interpreter
        cmd = [sys.executable]
        # Add the entry point script
        if entry_point:
            cmd.append(os.path.join(project_path, entry_point))
        else:
            cmd.append(os.path.join(project_path, self.project_spec.default_entry_point))
        # Add parameters as --key value pairs
        for key, value in params.items():
            if key != 'entry_point':
                cmd.extend([f"--{key}", str(value)])
        return cmd

    def _execute_command(self, cmd, env, cwd, timeout):
        """Run the command and capture its output."""
        process = subprocess.Popen(
            cmd, cwd=cwd, env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        try:
            # timeout=None waits indefinitely
            stdout, stderr = process.communicate(timeout=timeout)
        except subprocess.TimeoutExpired:
            process.kill()
            # Reap the killed child and drain its pipes
            process.communicate()
            raise MLflowException("Project execution timed out")
        return ProjectRunResult(
            exit_code=process.returncode,
            stdout=stdout.decode(),
            stderr=stderr.decode(),
            cmd=cmd
        )
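project.yaml specification: defines the structure of an MLflow project and how it is executed, with support for parameterization and configuration management. (In MLflow itself this file is named `MLproject` and its built-in parameter types are string, float, path, and uri; the `project.yaml` name and the `int`, `str`, and `enum` types below belong to the simplified implementation in this walkthrough.)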
# Example project.yaml configuration
name: "my-ml-project"              # Project name
entry_points:                      # Entry point definitions
  main:
    parameters:                    # Parameter definitions
      learning_rate:
        type: float                # Parameter type
        default: 0.01              # Default value
        description: "Learning rate for training"
      batch_size:
        type: int
        default: 32
        description: "Batch size for training"
      epochs:
        type: int
        default: 100
      optimizer:
        type: enum
        default: "adam"
        enum: ["adam", "sgd", "rmsprop"]
      data_path:
        type: str
        description: "Path to training data"
    command: "python train.py --learning_rate={learning_rate} --batch_size={batch_size} --epochs={epochs} --optimizer={optimizer} --data_path={data_path}"
  evaluate:
    parameters:
      model_path:
        type: str
        description: "Path to model file"
      test_data_path:
        type: str
        description: "Path to test data"
    command: "python evaluate.py --model_path={model_path} --test_data_path={test_data_path}"
# How the project manager parses the configuration
class ProjectSpec:
    def __init__(self, project_dir):
        self.project_dir = Path(project_dir)
        self.spec_file = self.project_dir / "project.yaml"
        self._load_spec()

    def _load_spec(self):
        """Load the project specification."""
        if not self.spec_file.exists():
            raise MLflowException("project.yaml not found")
        with open(self.spec_file, 'r') as f:
            self.spec = yaml.safe_load(f)
        self.name = self.spec.get('name', 'unnamed_project')
        self.entry_points = {}
        self.default_entry_point = self.spec.get('default_entry_point')
        # Parse the entry points
        for ep_name, ep_spec in self.spec.get('entry_points', {}).items():
            self.entry_points[ep_name] = EntryPointSpec(ep_name, ep_spec)
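`ProjectSpec` hands each entry point to an `EntryPointSpec` class that is not shown. The sketch below is one hypothetical way such a class could apply defaults, coerce the `float`, `int`, `str`, and `enum` types used in the example configuration, and render the `command` template into an argument list. The method names `resolve_params` and `build_command` are assumptions, not part of the walkthrough.

```python
import shlex
from typing import Any, Dict, List

# Converters for the scalar types used in the example configuration
_CASTS = {"float": float, "int": int, "str": str}


class EntryPointSpec:
    def __init__(self, name: str, spec: Dict[str, Any]):
        self.name = name
        self.parameters: Dict[str, Dict[str, Any]] = spec.get("parameters", {})
        self.command: str = spec["command"]

    def resolve_params(self, user_params: Dict[str, Any]) -> Dict[str, Any]:
        """Merge user values with defaults and coerce each to its declared type."""
        unknown = set(user_params) - set(self.parameters)
        if unknown:
            raise ValueError(f"Unknown parameters: {sorted(unknown)}")
        resolved = {}
        for key, decl in self.parameters.items():
            if key in user_params:
                raw = user_params[key]
            elif "default" in decl:
                raw = decl["default"]
            else:
                raise ValueError(f"Missing required parameter: {key}")
            ptype = decl.get("type", "str")
            if ptype == "enum":
                if raw not in decl.get("enum", []):
                    raise ValueError(f"Invalid value for {key}: {raw!r}")
                resolved[key] = raw
            else:
                resolved[key] = _CASTS.get(ptype, str)(raw)
        return resolved

    def build_command(self, user_params: Dict[str, Any]) -> List[str]:
        """Fill the command template and split it into an argv list."""
        params = self.resolve_params(user_params)
        # Quote each value so spaces or shell metacharacters stay in one argument
        quoted = {k: shlex.quote(str(v)) for k, v in params.items()}
        return shlex.split(self.command.format(**quoted))


if __name__ == "__main__":
    spec = EntryPointSpec("main", {
        "parameters": {
            "learning_rate": {"type": "float", "default": 0.01},
            "data_path": {"type": "str"},
        },
        "command": "python train.py --learning_rate={learning_rate} --data_path={data_path}",
    })
    print(spec.build_command({"data_path": "data/train set.csv"}))
```

Returning an argument list rather than a shell string means the result can be passed to `subprocess.Popen` without `shell=True`.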
# Project run implementation
class ProjectRunner:
    def __init__(self, project_uri, store):
        self.project_uri = project_uri
        # ModelRegistryManager requires a store; ArtifactManager is assumed to take the same one
        self.artifact_manager = ArtifactManager(store)
        self.model_registry = ModelRegistryManager(store)

    def run_with_mlflow(self, params=None, version=None,
                        experiment_name=None, entry_point=None):
        """Run the project inside an MLflow run."""
        # 1. Select the experiment
        if experiment_name:
            mlflow.set_experiment(experiment_name)
        # 2. Start the run
        with mlflow.start_run() as run:
            # run_id replaces the deprecated run_uuid attribute
            run_id = run.info.run_id
            # 3. Log parameters
            if params:
                for key, value in params.items():
                    mlflow.log_param(key, value)
            # 4. Execute the project
            result = self._run_project(params, version, entry_point)
            # 5. Log results
            self._log_results(result, run_id)
            # 6. Log models, if any were produced
            self._log_models(result, run_id)
            # 7. Log artifacts
            self._log_artifacts(result, run_id)
            return result

    def _run_project(self, params, version, entry_point):
        """Execute the project in a subprocess."""
        # 1. Clone the project first so the spec is read from a local path
        local_path = self._clone_project(version)
        # 2. Load the project spec
        project_spec = ProjectSpec(local_path)
        # 3. Build the command
        cmd = self._build_command(project_spec, params, entry_point)
        # 4. Execute the command
        env = os.environ.copy()
        env.update(self._setup_environment(params))
        process = subprocess.Popen(
            cmd, cwd=local_path, env=env,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        # 5. Wait for completion
        stdout, stderr = process.communicate()
        # 6. Wrap the result
        return ProjectRunResult(
            exit_code=process.returncode,
            stdout=stdout.decode(),
            stderr=stderr.decode(),
            cmd=cmd
        )

    def _log_results(self, result, run_uuid):
        """Log metrics parsed from the run's output."""
        # Metrics parsed from the log
        metrics = self._parse_metrics_from_logs(result.stdout)
        for key, value in metrics.items():
            mlflow.log_metric(key, value)
        # Per-epoch losses parsed from the log
        losses = self._parse_losses_from_logs(result.stdout)
        for epoch, loss in losses.items():
            mlflow.log_metric("loss", loss, step=epoch)
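`_log_results` depends on log-parsing helpers that are not shown, and their behavior is tied to whatever the training script prints. As a minimal sketch, assume the script emits lines such as `epoch=3 loss=0.4521 accuracy=0.81`; that format and the function name `parse_epoch_metrics` are assumptions made for this example only.

```python
import re
from typing import Dict

# key=value pairs where the value is a plain or scientific-notation number
_PAIR = re.compile(
    r"(?P<key>[A-Za-z_]\w*)=(?P<value>[-+]?\d+(?:\.\d+)?(?:[eE][-+]?\d+)?)"
)


def parse_epoch_metrics(stdout: str) -> Dict[int, Dict[str, float]]:
    """Map each epoch number to the metrics printed on the same line."""
    history: Dict[int, Dict[str, float]] = {}
    for line in stdout.splitlines():
        pairs = {m["key"]: float(m["value"]) for m in _PAIR.finditer(line)}
        if "epoch" not in pairs:
            continue  # Ignore lines that do not report an epoch
        epoch = int(pairs.pop("epoch"))
        history.setdefault(epoch, {}).update(pairs)
    return history


if __name__ == "__main__":
    log = "starting\nepoch=1 loss=0.9 accuracy=0.5\nepoch=2 loss=0.45 accuracy=0.75\ndone"
    for epoch, metrics in parse_epoch_metrics(log).items():
        print(epoch, metrics)
```

Each parsed entry can then be forwarded with `mlflow.log_metric(key, value, step=epoch)`. Scraping stdout is fragile, so having the training script call `mlflow.log_metric` directly is usually the more robust choice when the script can be modified.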
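MLflow architecture patterns: several design patterns are combined to achieve a modular architecture with high cohesion and low coupling
Observer pattern
Factory pattern
Strategy pattern
Proxy pattern
MLflow data-flow architecture: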
┌──────────────────────────────────────────────────────────┐
│ Client Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Python SDK │ │ R SDK │ │ Java SDK │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├──────────────────────────────────────────────────────────┤
│ API Gateway │
│ ┌─────────────────────────────────────────────────┐ │
│ │ RESTful API │ │
│ │ /api/2.0/mlflow/ │ │
│ │ POST /runs/create │ │
│ │ POST /runs/log-metric │ │
│ │ POST /model-versions/create │ │
│ └─────────────────────────────────────────────────┘ │
├──────────────────────────────────────────────────────────┤
│ Business Logic │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │Experiment │ │ Run │ │ Model │ │
│ │Manager │ │Manager │ │Registry │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├──────────────────────────────────────────────────────────┤
│ Data Access │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │Repository │ │ DAO │ │ Unit of │ │
│ │Pattern │ │Pattern │ │Work │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├──────────────────────────────────────────────────────────┤
│ Storage Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Database │ │ Artifact │ │ File System │ │
│ │ (PostgreSQL│ │ Store │ │ Storage │ │
│ │ / SQLite) │ │ (S3/Local) │ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└──────────────────────────────────────────────────────────┘
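To make the API layer in the diagram concrete, the sketch below builds, without sending, the HTTP request a client could issue to create a run. The `/api/2.0/mlflow/runs/create` path comes from the diagram; the tracking server address `http://localhost:5000`, the function name, and the choice of fields are assumptions for illustration.

```python
import json
import time
import urllib.request
from typing import Dict, Optional


def build_create_run_request(tracking_uri: str, experiment_id: str,
                             tags: Optional[Dict[str, str]] = None) -> urllib.request.Request:
    """Build a POST request for the runs/create endpoint without sending it."""
    payload = {
        "experiment_id": experiment_id,
        # Start time in milliseconds since the Unix epoch
        "start_time": int(time.time() * 1000),
    }
    if tags:
        # Tags travel as a list of key/value objects
        payload["tags"] = [{"key": k, "value": v} for k, v in tags.items()]
    return urllib.request.Request(
        url=f"{tracking_uri.rstrip('/')}/api/2.0/mlflow/runs/create",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    req = build_create_run_request("http://localhost:5000", "0", {"team": "demo"})
    print(req.get_method(), req.full_url)
    print(req.data.decode("utf-8"))
```

Passing the request to `urllib.request.urlopen` would send it to a running tracking server. In practice the Python SDK wraps these calls, so hand-built requests are mainly useful for debugging or for clients in languages without an SDK.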
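Experiment management
Parameter management
Model management
Performance optimization
Related technologies
Advanced topics
Core features
Architecture characteristics
Technical advantages
Use cases
As an open-source platform for managing the machine learning lifecycle, MLflow offers standardized interfaces and a broad feature set that together provide a complete solution for experiment tracking, model management, and project deployment.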