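🧠 MLflow Experiment Tracking System

An in-depth source-code walkthrough of the open-source machine learning lifecycle platform

Source Code Deep Dive
2026-04-02 | MLflow Experiment Tracking System

📑 Contents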

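Part 1: Fundamentals

  • Introduction to MLflow
  • Core Architecture Design
  • Architecture Evolution

Part 2: Core Components

  • Experiment Tracking System
  • Model Registry
  • Projects
  • Model Serving
  • Tracking API

Part 3: Source Code Analysis

  • Core Concepts
  • Backend Storage System
  • Client API
  • Experiment and Run Management
  • Model Registry Implementation

Part 4: Advanced Topics

  • Design Pattern Analysis
  • Data Flow Architecture
  • Best Practices
  • Further Reading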

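🧠 Introduction to MLflow

MLflow is an open-source platform for managing the machine learning lifecycle. Developed by Databricks, it addresses core problems in ML projects: experiment management, model registration, project packaging, and deployment.

Core Features

  • Experiment tracking (Tracking)
  • Model registry (Registry)
  • Project packaging (Projects)
  • Model packaging and serving (Models)

Key Advantages

  • Open source and easy to integrate
  • Multi-language support (Python/R/Java, plus a REST API)
  • Rich ecosystem
  • Enterprise-grade stability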

🏗️ Core Architecture

┌─────────────────────────────────────────────────────────┐
│                        MLflow                           │
├─────────────────────────────────────────────────────────┤
│  Tracking  │  Registry  │  Projects  │  Models  │  UI   │
├─────────────────────────────────────────────────────────┤
│            MLflow Server (Backend)                      │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐      │
│  │Tracking API │  │Registry API │  │Projects API │      │
│  └─────────────┘  └─────────────┘  └─────────────┘      │
├─────────────────────────────────────────────────────────┤
│                  Backend Store                          │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐      │
│  │  Database   │  │  Artifact   │  │  Metadata   │      │
│  │  (SQLite/   │  │   Store     │  │   Service   │      │
│  │  PostgreSQL)│  │ (Local/S3)  │  │             │      │
│  └─────────────┘  └─────────────┘  └─────────────┘      │
├─────────────────────────────────────────────────────────┤
│                  Client SDKs                            │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐      │
│  │  Python SDK │  │  R SDK      │  │  Java SDK   │      │
│  └─────────────┘  └─────────────┘  └─────────────┘      │
└─────────────────────────────────────────────────────────┘

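MLflow uses a client-server architecture and supports several database and storage backends (clients can also write directly to a local file or database store without running a server).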

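📜 Architecture Evolution

MLflow 0.x (2018): first public release with Tracking, Projects, and Models; local file storage

MLflow 1.x (2019-2022): stable 1.0 API (2019), Model Registry (added in 1.4), UI improvements

MLflow 2.x (2022-2025): LLM-oriented features such as LLM evaluation and tracing

MLflow 3.x (2025-): expanded GenAI tracing, evaluation, and model management

Key milestone: evolved from a simple experiment-logging tool into a complete MLOps platform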

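📊 Experiment Tracking System

Core functionality: records experiment parameters, metrics, models, logs, and other artifacts (explicitly through the API or automatically via autologging), and supports comparing runs and managing versions.

Component | Purpose | Implementation
Experiment | Logical grouping of related runs | String ID (an integer in SQL stores); experiments themselves are flat
Run | A single training execution | Hex UUID; lifecycle and status tracking; runs can be nested under a parent run
Params | Hyperparameter configuration | Key-value pairs, written once per run
Metrics | Performance metrics | Time series of (value, timestamp, step) points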
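To make the difference between params and metrics concrete, here is a minimal in-memory sketch. The `RunRecord` and `MetricPoint` classes are hypothetical illustrations, not MLflow's entity classes: params are write-once strings (re-logging the same value is harmless, a different value is an error), while each metric key accumulates a time series. The sketch treats the point with the highest step (ties broken by timestamp) as the "latest" value.

```python
import time
from dataclasses import dataclass, field


@dataclass
class MetricPoint:
    value: float
    timestamp: int  # milliseconds since the epoch
    step: int


@dataclass
class RunRecord:
    """Hypothetical run container illustrating params vs. metrics."""
    run_id: str
    experiment_id: str
    params: dict = field(default_factory=dict)
    metrics: dict = field(default_factory=dict)  # key -> list[MetricPoint]

    def log_param(self, key, value):
        # Params are write-once: the same value again is fine, a new value is rejected
        value = str(value)
        if key in self.params and self.params[key] != value:
            raise ValueError(f"param {key!r} already logged with a different value")
        self.params[key] = value

    def log_metric(self, key, value, step=0):
        # Metrics are time series: every call appends a new point
        point = MetricPoint(float(value), int(time.time() * 1000), step)
        self.metrics.setdefault(key, []).append(point)

    def latest(self, key):
        # "Latest" = highest step, ties broken by the later timestamp
        return max(self.metrics[key], key=lambda p: (p.step, p.timestamp)).value
```

Keeping params immutable is what makes them safe to compare across runs: a run's configuration cannot silently change after the fact, while metrics are expected to evolve over training.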

📦 Model Registry

┌────────────────────────────────────────────────────────────┐
│                    Model Registry                         │
├────────────────────────────────────────────────────────────┤
│  ┌────────────────────────────────────────────────────┐  │
│  │                Model                              │  │
│  │  ┌────────────────────────────────────────────┐   │  │
│  │  │               Version                      │   │  │
│  │  │  ┌────────────────────────────────────┐    │   │  │
│  │  │  │              Stage                  │    │   │  │
│  │  │  │  ┌────────────────────────────┐     │    │   │  │
│  │  │  │  │       Model Artifact      │     │    │   │  │
│  │  │  │  │  (Model + Code + Data)   │     │    │   │  │
│  │  │  │  └────────────────────────────┘     │    │   │  │
│  │  │  └────────────────────────────────────┘    │   │  │
│  │  └────────────────────────────────────────────┘   │  │
│  └────────────────────────────────────────────────────┘  │
│                                                         │
│  Operations: Create, Register, Transition, Delete     │
│  Stages: None, Staging, Production, Archived           │
└────────────────────────────────────────────────────────────┘

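The Model Registry provides model versioning, lifecycle control, and staged model rollout. Note that stages are deprecated as of MLflow 2.9 in favor of model version aliases and tags.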
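The version-and-stage bookkeeping in the diagram can be sketched in a few lines. This is a hypothetical in-memory model, not MLflow's registry code: versions are numbered 1, 2, 3, ... per registered model, and the `archive_existing_versions` flag (the name mirrors the parameter of MLflow's `transition_model_version_stage`) moves any other version already in the target stage to Archived.

```python
VALID_STAGES = {"None", "Staging", "Production", "Archived"}


class RegisteredModelSketch:
    """Hypothetical registry entry mapping version number -> stage."""

    def __init__(self, name):
        self.name = name
        self.stages = {}  # int version -> stage string

    def create_version(self):
        # New versions start at 1 and enter the registry in stage "None"
        version = max(self.stages, default=0) + 1
        self.stages[version] = "None"
        return version

    def transition(self, version, stage, archive_existing_versions=False):
        if stage not in VALID_STAGES:
            raise ValueError(f"invalid stage: {stage}")
        if version not in self.stages:
            raise KeyError(version)
        if archive_existing_versions and stage in ("Staging", "Production"):
            # Archive whichever other versions currently occupy the target stage
            for v, s in self.stages.items():
                if v != version and s == stage:
                    self.stages[v] = "Archived"
        self.stages[version] = stage
```

Promoting a new version with `archive_existing_versions=True` therefore leaves exactly one version in Production.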

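📁 Projects

MLflow Projects packages a machine learning project in a reproducible format that can be run locally or remotely.

MLproject configuration file (YAML, in a file named MLproject with no extension):
name: my-ml-project
entry_points:
  main:
    parameters:
      learning_rate: {type: float, default: 0.01}
      batch_size: {type: int, default: 32}
    command: "python train.py --learning_rate={learning_rate} --batch_size={batch_size}"

How to run (<project-uri> may be a local path or a Git repository URL):
mlflow run . -P learning_rate=0.001 -P batch_size=64
mlflow run <project-uri> -P learning_rate=0.001 -P batch_size=64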
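How do the `-P` overrides reach the `command` template? A minimal sketch, assuming the entry point has already been parsed into a dict (YAML parsing is skipped so the example needs only the standard library): user values override defaults, each value is cast to its declared type as a validation step, then shell-quoted with `shlex.quote` before filling the `{name}` placeholders. `compute_command` and `CASTS` are hypothetical names, not MLflow's internals.

```python
import shlex

# The entry point from the MLproject file above, already parsed into a dict
ENTRY_POINT = {
    "parameters": {
        "learning_rate": {"type": "float", "default": 0.01},
        "batch_size": {"type": "int", "default": 32},
    },
    "command": "python train.py --learning_rate={learning_rate} --batch_size={batch_size}",
}

CASTS = {"float": float, "int": int, "string": str}


def compute_command(entry_point, user_params):
    """Merge -P overrides with defaults, type-check them, and fill the command template."""
    resolved = {}
    for name, spec in entry_point["parameters"].items():
        raw = user_params.get(name, spec.get("default"))
        if raw is None:
            raise ValueError(f"missing required parameter: {name}")
        cast = CASTS[spec.get("type", "string")]
        # Cast to validate the declared type, then quote so values cannot inject shell syntax
        resolved[name] = shlex.quote(str(cast(raw)))
    return entry_point["command"].format(**resolved)


print(compute_command(ENTRY_POINT, {"learning_rate": "0.001", "batch_size": "64"}))
```

Casting before quoting means a malformed value such as `-P batch_size=abc` fails early with a `ValueError` instead of reaching the training script.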

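🚀 Model Serving

MLflow Models provides model packaging, deployment, and inference serving.

Model Formats

  • python_function (pyfunc), MLflow's generic Python format
  • TensorFlow SavedModel
  • PyTorch (including TorchScript models)
  • Custom formats (custom pyfunc models)

Deployment Options

  • Local model serving from the CLI (mlflow models serve)
  • REST API inference server
  • Docker images (mlflow models build-docker)
  • Cloud platform integration (e.g. Amazon SageMaker, Azure ML)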

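🌐 Tracking API

# Python API example (the loss/accuracy values are placeholders for a real training loop;
# `model` is assumed to be a fitted scikit-learn estimator and logs.txt an existing file)
import mlflow
import mlflow.sklearn

# Select the experiment (created if it does not exist); start_run() has no experiment_name parameter
mlflow.set_experiment("my_experiment")

with mlflow.start_run() as run:
    # Log parameters
    mlflow.log_param("learning_rate", 0.01)
    mlflow.log_param("batch_size", 32)

    # Log metrics once per epoch
    for epoch in range(100):
        loss_value = 1.0 / (epoch + 1)  # placeholder for the real training loss
        accuracy_value = 1.0 - loss_value / 2  # placeholder for the real accuracy
        mlflow.log_metric("loss", loss_value, step=epoch)
        mlflow.log_metric("accuracy", accuracy_value, step=epoch)

    # Log the model
    mlflow.sklearn.log_model(model, "model")

    # Log an artifact
    mlflow.log_artifact("logs.txt")

# REST API examples
POST /api/2.0/mlflow/experiments/create
POST /api/2.0/mlflow/runs/create
POST /api/2.0/mlflow/runs/log-parameter
POST /api/2.0/mlflow/runs/log-metric
POST /api/2.0/mlflow/registered-models/create
POST /api/2.0/mlflow/model-versions/create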

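🎯 Core Concepts

MLflow's core design principles are simplicity, extensibility, and ecosystem friendliness; it manages the ML lifecycle through standardized interfaces.

Tracked Objects

  • Experiment: a logical container that groups related runs
  • Run: a single training execution
  • Params: input configuration
  • Metrics: output results
  • Models: training outputs

Lifecycle

  • Experiment creation: create or select an experiment
  • Run start: begin recording
  • Data logging: log params, metrics, and artifacts
  • Model registration: save and register the model
  • Run end: finalize the run and its status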

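🔄 Experiment Lifecycle

Experiment lifecycle flow:

1. Experiment Creation
   ├── Create the experiment (mlflow.create_experiment() or mlflow.set_experiment())
   ├── Set the experiment name and attributes
   └── Return the experiment ID

2. Run Start
   ├── mlflow.start_run()
   ├── Set the run name and attributes
   ├── Generate the run ID
   └── Activate the run context

3. Data Logging
   ├── Params: mlflow.log_param()
   ├── Metrics: mlflow.log_metric()
   ├── Models: mlflow.<flavor>.log_model(), e.g. mlflow.sklearn.log_model()
   ├── Artifacts: mlflow.log_artifact()
   └── Tags: mlflow.set_tag()

4. Run End
   ├── mlflow.end_run()
   ├── Update the run status
   ├── Persist the data
   └── Clear the run context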
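The start, log, end sequence maps naturally onto a context manager, which is how `mlflow.start_run()` is usually used: a clean exit ends the run as FINISHED and an escaping exception ends it as FAILED. The `InMemoryTracker` below is a hypothetical stand-in that shows only that control flow; it does not talk to a tracking store.

```python
import time
import uuid
from contextlib import contextmanager


class InMemoryTracker:
    """Hypothetical tracker illustrating start -> log -> end (not MLflow's fluent API)."""

    def __init__(self):
        self.runs = {}
        self.active_run_id = None

    @contextmanager
    def start_run(self, experiment_id="0"):
        run_id = uuid.uuid4().hex
        self.runs[run_id] = {
            "experiment_id": experiment_id, "status": "RUNNING",
            "start_time": int(time.time() * 1000), "end_time": None,
            "params": {}, "metrics": {},
        }
        self.active_run_id = run_id  # activate the run context
        try:
            yield run_id
        except BaseException:
            self._end_run("FAILED")  # an exception inside the block fails the run
            raise
        else:
            self._end_run("FINISHED")

    def log_param(self, key, value):
        self.runs[self.active_run_id]["params"][key] = str(value)

    def log_metric(self, key, value, step=0):
        self.runs[self.active_run_id]["metrics"].setdefault(key, []).append((step, value))

    def _end_run(self, status):
        run = self.runs[self.active_run_id]
        run["status"] = status
        run["end_time"] = int(time.time() * 1000)
        self.active_run_id = None  # clear the run context
```

The exception is re-raised after the status update, so callers still see the original error while the run record is left in a terminal state.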

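⚙️ Run Mechanics

MLflow run management: each run has its own lifecycle; several runs can be active at once (for example in separate processes, or as nested runs), and each run's status is tracked.

Status | Description | Transitions
SCHEDULED | Scheduled, waiting to execute | → RUNNING
RUNNING | Executing | → FINISHED / FAILED / KILLED
FINISHED | Completed successfully | none (terminal)
FAILED | Execution failed | none (terminal)
KILLED | Terminated | none (terminal)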
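The status table can be encoded directly as a transition map. This is a hypothetical client-side guard written from the table; it does not assume that any particular MLflow backend enforces the same rules.

```python
# Allowed transitions from the status table; terminal states map to an empty set
ALLOWED_TRANSITIONS = {
    "SCHEDULED": {"RUNNING"},
    "RUNNING": {"FINISHED", "FAILED", "KILLED"},
    "FINISHED": set(),
    "FAILED": set(),
    "KILLED": set(),
}


def transition(current, new):
    """Return the new status, or raise if the table does not allow the move."""
    if new not in ALLOWED_TRANSITIONS:
        raise ValueError(f"unknown status: {new}")
    if new not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current} -> {new}")
    return new


def is_terminal(status):
    # A status is terminal when nothing can follow it
    return not ALLOWED_TRANSITIONS[status]
```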

💾 Data Storage Architecture

MLflow data storage layers:

┌──────────────────────────────────────────────────────────┐
│                  Application Layer                       │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐      │
│  │Tracking API │  │Registry API │  │Projects API │      │
│  └─────────────┘  └─────────────┘  └─────────────┘      │
├──────────────────────────────────────────────────────────┤
│                  Business Logic Layer                    │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐      │
│  │Experiment   │  │Run          │  │Model        │      │
│  │Manager      │  │Manager      │  │Manager      │      │
│  └─────────────┘  └─────────────┘  └─────────────┘      │
├──────────────────────────────────────────────────────────┤
│                  Data Access Layer                       │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐      │
│  │DAO Pattern  │  │Repository   │  │Factory      │      │
│  └─────────────┘  └─────────────┘  └─────────────┘      │
├──────────────────────────────────────────────────────────┤
│                  Storage Backend                        │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐      │
│  │ Database    │  │Artifact     │  │ File System │      │
│  │ (SQLite/    │  │ Store       │  │  Storage    │      │
│  │ PostgreSQL) │  │ (Local/S3)  │  │             │      │
│  └─────────────┘  └─────────────┘  └─────────────┘      │
└──────────────────────────────────────────────────────────┘

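🗄️ Backend Storage System

Storage architecture: metadata and artifacts are stored separately, with support for several databases and storage services.

Backend storage components:

1. Metadata store (backend store)
   ├── PostgreSQL: recommended for production
   ├── SQLite: suitable for development
   ├── MySQL/MariaDB: supported
   └── SQLAlchemy ORM: common interface across databases

2. Artifact store
   ├── Local file system
   ├── Amazon S3
   ├── Azure Blob Storage
   ├── Google Cloud Storage
   └── Hadoop HDFS

3. Configuration example (flags of the mlflow server command)
mlflow server --backend-store-uri postgresql://localhost/mlflow --default-artifact-root s3://mlflow-bucket/artifacts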
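MLflow picks a store implementation from the scheme of the tracking URI: database URIs go to a SQLAlchemy-based store, plain paths to a file store, and http(s) URIs to a REST client. The table below is a hypothetical, simplified version of that dispatch; the store names are borrowed from MLflow's class names but are only labels here.

```python
from urllib.parse import urlparse

# Hypothetical scheme -> store-type table illustrating scheme-based dispatch
STORE_BY_SCHEME = {
    "": "FileStore", "file": "FileStore",
    "sqlite": "SqlAlchemyStore", "postgresql": "SqlAlchemyStore",
    "mysql": "SqlAlchemyStore", "mssql": "SqlAlchemyStore",
    "http": "RestStore", "https": "RestStore",
}


def resolve_store(uri):
    scheme = urlparse(uri).scheme
    # SQLAlchemy URIs may carry a driver suffix, e.g. "postgresql+psycopg2"
    base_scheme = scheme.split("+", 1)[0]
    try:
        return STORE_BY_SCHEME[base_scheme]
    except KeyError:
        raise ValueError(f"unsupported tracking URI scheme: {scheme!r}") from None
```

A registry keyed by scheme keeps the client code independent of the storage choice: adding a backend means adding one entry rather than editing every call site.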

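📊 Database Schema

MLflow core data model (simplified; the real tables are created and migrated by MLflow itself):

1. experiments table
   experiment_id: INTEGER (primary key, auto-increment)
   name: VARCHAR(256), unique
   artifact_location: VARCHAR
   lifecycle_stage: VARCHAR ('active' or 'deleted')
   creation_time: BIGINT (milliseconds since the epoch)
   last_update_time: BIGINT (milliseconds since the epoch)

2. runs table
   run_uuid: VARCHAR(32) (primary key, hex UUID)
   experiment_id: INTEGER (foreign key → experiments)
   name: VARCHAR(250)
   status: VARCHAR ('SCHEDULED', 'RUNNING', 'FINISHED', 'FAILED', 'KILLED')
   lifecycle_stage: VARCHAR ('active' or 'deleted')
   start_time: BIGINT (milliseconds since the epoch)
   end_time: BIGINT (milliseconds since the epoch)
   source_type: VARCHAR
   source_name: VARCHAR
   entry_point_name: VARCHAR

3. params table
   key: VARCHAR(250) (composite primary key together with run_uuid)
   value: VARCHAR (maximum length varies by MLflow version)
   run_uuid: VARCHAR(32) (foreign key → runs)
   No timestamp column: a param is logged once per run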
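To see how these tables constrain data, here is a trimmed-down SQLite schema modeled on the experiments, runs, and params tables described above. Columns are reduced and the DDL is an illustrative assumption, not the DDL MLflow's migrations generate. The point to notice is the composite primary key on params, which makes a second value for the same key in the same run a database error.

```python
import sqlite3

SCHEMA = """
CREATE TABLE experiments (
    experiment_id INTEGER PRIMARY KEY AUTOINCREMENT,
    name VARCHAR(256) NOT NULL UNIQUE,
    artifact_location VARCHAR(256),
    lifecycle_stage VARCHAR(32) DEFAULT 'active',
    creation_time BIGINT,
    last_update_time BIGINT
);
CREATE TABLE runs (
    run_uuid VARCHAR(32) PRIMARY KEY,
    experiment_id INTEGER REFERENCES experiments(experiment_id),
    name VARCHAR(250),
    status VARCHAR(9) CHECK (status IN ('SCHEDULED', 'RUNNING', 'FINISHED', 'FAILED', 'KILLED')),
    lifecycle_stage VARCHAR(32) DEFAULT 'active',
    start_time BIGINT,
    end_time BIGINT
);
CREATE TABLE params (
    key VARCHAR(250) NOT NULL,
    value TEXT NOT NULL,
    run_uuid VARCHAR(32) NOT NULL REFERENCES runs(run_uuid),
    PRIMARY KEY (key, run_uuid)
);
"""


def make_db():
    conn = sqlite3.connect(":memory:")
    conn.execute("PRAGMA foreign_keys = ON")  # SQLite only enforces REFERENCES when enabled
    conn.executescript(SCHEMA)
    return conn
```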

🖥️ MLflowTrackingServer

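# MLflow Tracking Server architecture (simplified conceptual sketch, not MLflow's actual server code;
# ExperimentManager, RunManager, ModelRegistryManager and the *_bp blueprints are hypothetical)
from flask import Flask


class MLflowTrackingServer:
    def __init__(self, store, artifact_manager):
        self.store = store  # metadata store (e.g. a SQLAlchemy-backed store)
        self.artifact_manager = artifact_manager  # artifact storage (e.g. local files or S3)
        self.experiment_manager = ExperimentManager(store)
        self.run_manager = RunManager(store)
        self.model_manager = ModelRegistryManager(store)

    def create_app(self):
        app = Flask(__name__)
        self.register_blueprints(app)
        return app

    def start(self, host="0.0.0.0", port=5000):
        # app.run() starts Flask's development server and blocks until it stops
        self.create_app().run(host=host, port=port)

    def register_blueprints(self, app):
        # Experiment management API
        app.register_blueprint(experiments_bp)
        # Run management API
        app.register_blueprint(runs_bp)
        # Model registry API
        app.register_blueprint(model_registry_bp)
        # Artifact management API
        app.register_blueprint(artifact_bp)

The Tracking Server exposes a single REST API that covers experiment tracking, the model registry, and artifact access.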
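At its core, a server like this is a table from (HTTP method, path) to handler functions; as far as I know, MLflow's own server registers its handlers as URL rules on a Flask app. The sketch strips that idea down to a plain dictionary so it runs without a web framework. The handlers and the in-memory `EXPERIMENTS` dict are hypothetical; the error codes follow MLflow's `{"error_code": ...}` JSON convention.

```python
import json

EXPERIMENTS = {}  # hypothetical in-memory stand-in for the metadata store


def create_experiment(body):
    if not body.get("name"):
        raise ValueError("name is required")
    exp_id = str(len(EXPERIMENTS))
    EXPERIMENTS[exp_id] = {"experiment_id": exp_id, "name": body["name"]}
    return {"experiment_id": exp_id}


def get_experiment(params):
    return {"experiment": EXPERIMENTS[params["experiment_id"]]}


# (HTTP method, path) -> handler: the core of registering REST endpoints
ROUTES = {
    ("POST", "/api/2.0/mlflow/experiments/create"): create_experiment,
    ("GET", "/api/2.0/mlflow/experiments/get"): get_experiment,
}


def dispatch(method, path, payload):
    """Return (status_code, json_body) for a request."""
    handler = ROUTES.get((method, path))
    if handler is None:
        return 404, json.dumps({"error_code": "ENDPOINT_NOT_FOUND"})
    try:
        return 200, json.dumps(handler(payload))
    except ValueError as exc:
        return 400, json.dumps({"error_code": "INVALID_PARAMETER_VALUE", "message": str(exc)})
    except KeyError:
        return 404, json.dumps({"error_code": "RESOURCE_DOES_NOT_EXIST"})
```

Mapping exception types to status codes in one place keeps each handler focused on business logic.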

🌐 RESTful API

API design: REST-style endpoints under /api/2.0/mlflow/, JSON request and response bodies, and errors returned as JSON with an error_code and a message alongside the HTTP status code.

Core API endpoints:

Experiments:
POST /api/2.0/mlflow/experiments/search
POST /api/2.0/mlflow/experiments/create
GET /api/2.0/mlflow/experiments/get
GET /api/2.0/mlflow/experiments/get-by-name
POST /api/2.0/mlflow/experiments/delete

Runs:
POST /api/2.0/mlflow/runs/create
POST /api/2.0/mlflow/runs/search
GET /api/2.0/mlflow/runs/get
POST /api/2.0/mlflow/runs/update
POST /api/2.0/mlflow/runs/log-parameter
POST /api/2.0/mlflow/runs/log-metric
POST /api/2.0/mlflow/runs/log-batch
POST /api/2.0/mlflow/runs/log-model
POST /api/2.0/mlflow/runs/set-tag
POST /api/2.0/mlflow/runs/delete

Model Registry:
POST /api/2.0/mlflow/registered-models/create
GET /api/2.0/mlflow/registered-models/get
POST /api/2.0/mlflow/model-versions/create
POST /api/2.0/mlflow/model-versions/transition-stage
DELETE /api/2.0/mlflow/model-versions/delete
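Calling the REST API needs nothing beyond an HTTP client. This sketch builds, but does not send, a request to MLflow's `runs/log-metric` endpoint using only the standard library; the JSON fields (`run_id`, `key`, `value`, `timestamp` in milliseconds, `step`) follow the MLflow REST API documentation, while `build_log_metric_request` itself is a hypothetical helper.

```python
import json
import time
import urllib.request


def build_log_metric_request(base_url, run_id, key, value, step=0, timestamp_ms=None):
    """Build (but do not send) a POST to the log-metric endpoint."""
    body = {
        "run_id": run_id,
        "key": key,
        "value": value,
        "timestamp": timestamp_ms if timestamp_ms is not None else int(time.time() * 1000),
        "step": step,
    }
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/2.0/mlflow/runs/log-metric",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Sending it requires a running tracking server, e.g.:
# urllib.request.urlopen(build_log_metric_request("http://localhost:5000", run_id, "loss", 0.5))
```

For many points at once, the `runs/log-batch` endpoint listed above avoids one HTTP round trip per metric.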

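📊 Data Collector

# MLflow data collection mechanism (simplified sketch; Param and Metric are assumed ORM classes,
# and store.session() is assumed to return a SQLAlchemy-style session)
import math
from datetime import datetime


class DataCollector:
    def __init__(self, run_id, store):
        self.run_id = run_id
        self.store = store
        self.metrics_buffer = []
        self.params_buffer = []

    def log_param(self, key, value):
        """Log a parameter (buffered until flushed)."""
        param = Param(
            run_uuid=self.run_id,
            key=key,
            value=str(value),
            timestamp=datetime.utcnow()
        )
        self.params_buffer.append(param)

    def log_metric(self, key, value, step=None, timestamp=None):
        """Log a metric (buffered until flushed)."""
        metric = Metric(
            run_uuid=self.run_id,
            key=key,
            value=value,
            step=step,
            timestamp=timestamp or datetime.utcnow(),
            is_nan=math.isnan(value),
            is_inf=math.isinf(value)
        )
        self.metrics_buffer.append(metric)

    def _flush_buffer(self):
        """Write all buffered records to the database in one transaction."""
        with self.store.session() as session:
            session.add_all(self.params_buffer)
            session.add_all(self.metrics_buffer)
            session.commit()
        # Clear only after a successful commit so a failed flush can be retried
        self.params_buffer.clear()
        self.metrics_buffer.clear()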
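Buffering pays off because one transaction with many rows is far cheaper than one transaction per row. Here is a runnable version of the same idea against an in-memory SQLite table (the `BufferedMetricWriter` class and the table layout are hypothetical): rows accumulate until `max_buffer` is reached, then go out in a single `executemany` inside one transaction, and the buffer is cleared only after the commit succeeds.

```python
import sqlite3
import time


class BufferedMetricWriter:
    """Hypothetical writer that batches metric rows into one transaction per flush."""

    def __init__(self, conn, run_id, max_buffer=100):
        self.conn = conn
        self.run_id = run_id
        self.max_buffer = max_buffer
        self.buffer = []

    def log_metric(self, key, value, step=0):
        self.buffer.append((self.run_id, key, float(value), int(time.time() * 1000), step))
        if len(self.buffer) >= self.max_buffer:
            self.flush()  # auto-flush once the buffer is full

    def flush(self):
        if not self.buffer:
            return
        with self.conn:  # commits on success, rolls back on error
            self.conn.executemany(
                "INSERT INTO metrics (run_uuid, key, value, timestamp, step) VALUES (?, ?, ?, ?, ?)",
                self.buffer,
            )
        self.buffer.clear()  # reached only if the transaction committed
```

The trade-off is durability: anything still in the buffer is lost if the process dies, so a real client would also flush when the run ends.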

🧪 Experiment Manager

# Core ExperimentManager implementation (simplified sketch; Experiment, ExperimentTag, the store
# methods and _default_artifact_location are assumed to be defined elsewhere)
from datetime import datetime


class ExperimentManager:
    def __init__(self, store):
        self.store = store

    def create_experiment(self, name, artifact_location=None, tags=None):
        """Create a new experiment."""
        experiment = Experiment(
            name=name,
            artifact_location=artifact_location or self._default_artifact_location(),
            lifecycle_stage='active',
            creation_time=datetime.utcnow(),
            last_update_time=datetime.utcnow()
        )

        # Attach tags
        if tags:
            for key, value in tags.items():
                experiment.experiment_tags.append(
                    ExperimentTag(key=key, value=str(value))
                )

        with self.store.session() as session:
            session.add(experiment)
            session.commit()
            return experiment

    def get_experiment(self, experiment_id):
        """Fetch an experiment."""
        return self.store.get_experiment(experiment_id)

    def list_experiments(self, view_type='ALL'):
        """List experiments (view_type: 'ACTIVE_ONLY', 'DELETED_ONLY' or 'ALL')."""
        return self.store.list_experiments(view_type)

    def delete_experiment(self, experiment_id):
        """Soft-delete an experiment (its lifecycle_stage becomes 'deleted')."""
        return self.store.delete_experiment(experiment_id)
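Deletion in MLflow is a soft delete: the experiment row stays and only its lifecycle stage changes, which is what makes restoring possible. The hypothetical `InMemoryExperimentStore` below shows that behavior together with view-type filtering; the view names `ACTIVE_ONLY`, `DELETED_ONLY`, and `ALL` match MLflow's `ViewType`, everything else is illustrative.

```python
import itertools
import time


class InMemoryExperimentStore:
    """Hypothetical store where delete only flips lifecycle_stage."""

    def __init__(self):
        self._ids = itertools.count(1)
        self._experiments = {}

    def create_experiment(self, name):
        # Names stay reserved even while an experiment is deleted
        if any(e["name"] == name for e in self._experiments.values()):
            raise ValueError(f"experiment {name!r} already exists")
        exp_id = str(next(self._ids))
        now = int(time.time() * 1000)
        self._experiments[exp_id] = {
            "experiment_id": exp_id, "name": name, "lifecycle_stage": "active",
            "creation_time": now, "last_update_time": now,
        }
        return exp_id

    def delete_experiment(self, exp_id):
        self._set_stage(exp_id, "deleted")

    def restore_experiment(self, exp_id):
        self._set_stage(exp_id, "active")

    def list_experiments(self, view_type="ACTIVE_ONLY"):
        wanted = {"ACTIVE_ONLY": {"active"}, "DELETED_ONLY": {"deleted"},
                  "ALL": {"active", "deleted"}}[view_type]
        return [e for e in self._experiments.values() if e["lifecycle_stage"] in wanted]

    def _set_stage(self, exp_id, stage):
        exp = self._experiments[exp_id]
        exp["lifecycle_stage"] = stage
        exp["last_update_time"] = int(time.time() * 1000)
```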

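📝 Tracking Logs

Run log management: supports several kinds of logging, including training logs, validation logs, and system logs.

# Run logging mechanism (simplified sketch; Metric, MLflowException and the store methods are assumed)
import math
from datetime import datetime


class RunLogger:
    MAX_BUFFER_SIZE = 1000  # hypothetical flush threshold

    def __init__(self, run_uuid, store):
        self.run_uuid = run_uuid
        self.store = store
        self.log_buffer = []

    def log_metric(self, key, value, step=None, timestamp=None,
                   synchronous=False, prefix=None):
        """Log a metric."""
        # Validate the value (bool is a subclass of int, so reject it explicitly)
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise MLflowException(f"Invalid metric value: {value}")

        # Build the key, e.g. prefix "val" + key "loss" -> "val.loss"
        metric_key = f"{prefix}.{key}" if prefix else key

        # Create the metric record
        metric = Metric(
            run_uuid=self.run_uuid,
            key=metric_key,
            value=value,
            step=step,
            timestamp=timestamp or datetime.utcnow(),
            is_nan=math.isnan(value),
            is_inf=math.isinf(value)
        )

        # Write immediately, or buffer and write in batches
        if synchronous:
            self.store.log_metric(metric)
        else:
            self._buffer_metric(metric)

    def _buffer_metric(self, metric):
        """Buffer a metric and flush once the buffer is full."""
        self.log_buffer.append(metric)
        if len(self.log_buffer) >= self.MAX_BUFFER_SIZE:
            self._flush_logs()

    def _flush_logs(self):
        """Write all buffered metrics in one batch (store.log_metrics is assumed)."""
        self.store.log_metrics(self.log_buffer)
        self.log_buffer.clear()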
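The `is_nan` / `is_inf` flags exist because NaN and infinity are awkward to store and compare in SQL. MLflow's SQL store treats such values specially; the sketch below shows one plausible convention (a placeholder value plus a flag for NaN, infinities clamped to the largest finite float), which should be read as an assumption rather than a description of MLflow's exact code.

```python
import math
import sys


def sanitize_metric_value(value):
    """Return (stored_value, is_nan, is_inf) for a metric value before persisting it."""
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        raise TypeError(f"metric value must be a number, got {value!r}")
    value = float(value)
    if math.isnan(value):
        # NaN never compares equal in SQL, so store a placeholder and keep the flag
        return 0.0, True, False
    if math.isinf(value):
        # Clamp to the largest finite float, keeping the sign
        return math.copysign(sys.float_info.max, value), False, True
    return value, False, False
```

Keeping the flag next to a finite placeholder lets queries such as "order runs by best loss" work without special-casing NaN in every comparison.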

📊 Run Records

# Core run-record functionality (simplified sketch; Run, MLflowException and store are assumed)
import time
import uuid
from datetime import datetime


class RunManager:
    def __init__(self, store):
        self.store = store
        self.active_runs = {}  # maps run ID -> run instance

    def create_run(self, experiment_id, run_name=None,
                   user_id=None, source_type=None, source_name=None):
        """Create a new run."""
        run_uuid = uuid.uuid4().hex  # 32-character hex string, the format MLflow uses for run IDs

        run = Run(
            run_uuid=run_uuid,
            experiment_id=experiment_id,
            run_name=run_name or f"run_{int(time.time())}",
            run_status='RUNNING',  # MLflow's built-in stores create runs directly in RUNNING
            lifecycle_stage='active',
            start_time=datetime.utcnow(),
            end_time=None,
            source_type=source_type,
            source_name=source_name,
            user_id=user_id
        )

        with self.store.session() as session:
            session.add(run)
            session.commit()

        # Track it as an active run
        self.active_runs[run_uuid] = run
        return run

    def end_run(self, run_uuid, status='FINISHED'):
        """End a run with a terminal status (FINISHED, FAILED or KILLED)."""
        if run_uuid not in self.active_runs:
            raise MLflowException(f"Run {run_uuid} not active")
        if status not in ('FINISHED', 'FAILED', 'KILLED'):
            raise MLflowException(f"Invalid terminal status: {status}")

        run = self.active_runs[run_uuid]
        run.run_status = status
        run.end_time = datetime.utcnow()

        with self.store.session() as session:
            session.merge(run)
            session.commit()

        # Remove it from the active runs
        del self.active_runs[run_uuid]

        return run

🤖 Model Management

# Model registry manager (simplified sketch; Model, ModelTag, ModelVersion, ModelVersionTag,
# MLflowException, the store methods and _save_model_artifacts are assumed)
from datetime import datetime


class ModelRegistryManager:
    def __init__(self, store):
        self.store = store

    def create_model(self, name, description=None, tags=None):
        """Create a new registered model."""
        model = Model(
            name=name,
            description=description,
            creation_time=datetime.utcnow(),
            last_update_time=datetime.utcnow()
        )

        # Attach tags
        if tags:
            for key, value in tags.items():
                model.model_tags.append(ModelTag(key=key, value=value))

        with self.store.session() as session:
            session.add(model)
            session.commit()
            return model

    def register_model_version(self, name, run_id, description=None,
                               tags=None, source=None, run_link=None):
        """Register a new model version."""
        # Look up the registered model
        model = self.store.get_model_by_name(name)
        if not model:
            raise MLflowException(f"Model {name} not found")

        # Look up the source run
        run = self.store.get_run(run_id)
        if not run:
            raise MLflowException(f"Run {run_id} not found")

        # Create the model version
        model_version = ModelVersion(
            name=name,
            version=model.next_version(),  # hypothetical helper: highest existing version + 1
            run_id=run_id,
            run_link=run_link,
            current_stage='None',
            description=description,
            creation_time=datetime.utcnow(),
            last_update_time=datetime.utcnow()
        )
        for key, value in (tags or {}).items():
            model_version.tags.append(ModelVersionTag(key=key, value=str(value)))

        # Use the caller-supplied source URI, otherwise copy the run's model files
        model_version.source = source or self._save_model_artifacts(run, model_version)

        with self.store.session() as session:
            session.add(model_version)
            session.commit()
            return model_version
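Because stages are deprecated, current MLflow versions point at a version through an alias, loaded via a URI such as `models:/<name>@<alias>` (the other classic form is `models:/<name>/<version>`). The `AliasRegistry` below is a hypothetical in-memory sketch of that indirection: each alias names exactly one version, and reassigning it moves the alias, so deployment code that loads `@champion` picks up the new version without changing.

```python
import re


class AliasRegistry:
    """Hypothetical registry where each alias names exactly one version of a model."""

    def __init__(self):
        self.versions = {}  # model name -> list of version numbers
        self.aliases = {}   # (model name, alias) -> version

    def register_version(self, name):
        versions = self.versions.setdefault(name, [])
        versions.append(max(versions, default=0) + 1)  # versions are 1, 2, 3, ... per model
        return versions[-1]

    def set_alias(self, name, alias, version):
        if version not in self.versions.get(name, []):
            raise KeyError(f"{name} has no version {version}")
        self.aliases[(name, alias)] = version  # reassigning moves the alias

    def resolve(self, uri):
        """Resolve 'models:/<name>/<version>' or 'models:/<name>@<alias>' to (name, version)."""
        m = re.fullmatch(r"models:/([^/@]+)(?:/(\d+)|@(\w+))", uri)
        if m is None:
            raise ValueError(f"unsupported model URI: {uri}")
        name, version, alias = m.groups()
        if version is not None:
            return name, int(version)
        return name, self.aliases[(name, alias)]
```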

📦 Artifact Storage

# Artifact manager (simplified sketch for a local-filesystem artifact store)
import shutil
from pathlib import Path


class ArtifactManager:
    def __init__(self, artifact_root, artifact_uri):
        self.artifact_root = Path(artifact_root)  # local directory that holds the artifacts
        self.artifact_uri = artifact_uri.rstrip("/")  # URI prefix reported to clients

    def log_artifact(self, run_uuid, local_path, artifact_path=None):
        """Copy a local file or directory into the run's artifacts.

        artifact_path is an optional sub-directory inside the run's artifacts,
        the same meaning it has in mlflow.log_artifact().
        """
        run_uuid = str(run_uuid)
        source = Path(local_path)

        # Destination directory: <root>/<run_uuid>[/<artifact_path>]
        target_dir = self.artifact_root / run_uuid
        if artifact_path:
            target_dir = target_dir / artifact_path
        target_dir.mkdir(parents=True, exist_ok=True)
        target = target_dir / source.name

        # Copy a single file, or a whole directory tree
        if source.is_file():
            shutil.copy2(source, target)
        else:
            shutil.copytree(source, target, dirs_exist_ok=True)

        rel_path = target.relative_to(self.artifact_root / run_uuid).as_posix()
        return f"{self.artifact_uri}/{run_uuid}/{rel_path}"

    def list_artifacts(self, run_uuid, path=''):
        """List the artifacts directly under `path` within a run."""
        run_dir = self.artifact_root / str(run_uuid)
        full_path = run_dir / path if path else run_dir
        if not full_path.exists():
            return []

        if full_path.is_file():
            return [{
                'path': path,
                'is_dir': False,
                'file_size': full_path.stat().st_size
            }]

        artifacts = []
        for item in sorted(full_path.iterdir()):
            artifacts.append({
                'path': item.relative_to(run_dir).as_posix(),
                'is_dir': item.is_dir(),
                'file_size': item.stat().st_size if item.is_file() else 0
            })

        return artifacts
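Clients usually refer to artifacts through a `runs:/<run_id>/<relative_path>` URI rather than the physical storage location, and the server resolves it against the run's artifact root. A minimal resolver, assuming a hypothetical mapping from run ID to artifact root (MLflow keeps that in the run's metadata):

```python
from urllib.parse import urlparse


def parse_runs_uri(uri):
    """Split 'runs:/<run_id>/<relative/path>' into (run_id, relative_path)."""
    parsed = urlparse(uri)
    if parsed.scheme != "runs":
        raise ValueError(f"not a runs:/ URI: {uri}")
    parts = parsed.path.lstrip("/").split("/", 1)
    run_id = parts[0]
    if not run_id:
        raise ValueError(f"missing run id in {uri}")
    return run_id, parts[1] if len(parts) > 1 else ""


def to_artifact_uri(uri, run_artifact_roots):
    """Resolve a runs:/ URI against a run_id -> artifact root mapping (hypothetical)."""
    run_id, rel = parse_runs_uri(uri)
    root = run_artifact_roots[run_id].rstrip("/")
    return f"{root}/{rel}" if rel else root
```

The indirection is what lets the same `runs:/...` reference work whether the artifacts live on local disk or in S3.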

🌐 Client API

MLflow Client: a unified client interface; client libraries exist for several languages, all speaking the same REST protocol.

# Core MLflow Client functionality (simplified facade; TrackingClient and RegistryClient are
# hypothetical. The real Python class is mlflow.tracking.MlflowClient, whose API differs)
class MLflowClient:
    def __init__(self, tracking_uri=None, registry_uri=None):
        self.tracking_uri = tracking_uri or "http://localhost:5000"
        # Fall back to the resolved tracking URI, not the raw (possibly None) argument
        self.registry_uri = registry_uri or self.tracking_uri
        self.tracking_client = TrackingClient(self.tracking_uri)
        self.registry_client = RegistryClient(self.registry_uri)

    # Experiment management
    def create_experiment(self, name, artifact_location=None):
        """Create an experiment."""
        return self.tracking_client.create_experiment(name, artifact_location)

    def get_experiment(self, experiment_id):
        """Fetch an experiment."""
        return self.tracking_client.get_experiment(experiment_id)

    def list_experiments(self):
        """List all experiments (the real MlflowClient replaced this with search_experiments() in MLflow 2.0)."""
        return self.tracking_client.list_experiments()

    # Run management
    def create_run(self, experiment_id, run_name=None):
        """Create a run."""
        return self.tracking_client.create_run(experiment_id, run_name)

    def log_param(self, run_id, key, value):
        """Log a parameter."""
        return self.tracking_client.log_param(run_id, key, value)

    def log_metric(self, run_id, key, value, step=None):
        """Log a metric."""
        return self.tracking_client.log_metric(run_id, key, value, step)

    def log_model(self, run_id, model, artifact_path):
        """Log a model."""
        return self.tracking_client.log_model(run_id, model, artifact_path)

🐍 Python SDK

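MLflow Python SDK: a high-level API with support for autologging, searching runs by parameters and metrics, and model evaluation.

# MLflow Python API usage example (assumes a tracking server at localhost:5000;
# the Iris dataset stands in for real training data)
from datetime import datetime

import mlflow
import mlflow.sklearn
import numpy as np
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

# Set the tracking URI
mlflow.set_tracking_uri("http://localhost:5000")

# Select the experiment; set_experiment() creates it if it does not exist yet,
# whereas create_experiment() would fail when rerun on the same day
experiment_name = "experiment_" + datetime.now().strftime("%Y%m%d")
mlflow.set_experiment(experiment_name)

# Prepare example data
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Start a run
with mlflow.start_run() as run:
    # Log parameters
    n_estimators = 100
    max_depth = 10
    mlflow.log_param("n_estimators", n_estimators)
    mlflow.log_param("max_depth", max_depth)

    # Train the model
    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth
    )
    model.fit(X_train, y_train)

    # Evaluate the model
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    mlflow.log_metric("accuracy", accuracy)

    # Log the model
    mlflow.sklearn.log_model(model, "random_forest_model")

    # Log artifacts (the files must exist locally before they are logged)
    np.savetxt("training_data.csv", X_train, delimiter=",")
    with open("model_summary.txt", "w") as f:
        f.write(f"n_estimators={n_estimators}, max_depth={max_depth}, accuracy={accuracy:.4f}\n")
    mlflow.log_artifact("training_data.csv")
    mlflow.log_artifact("model_summary.txt")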

🔗 MLflowClient

# Detailed MLflowClient sketch (hypothetical internals: ApiClient, ExperimentClient, RunClient,
# ModelRegistryClient, ArtifactClient, SearchRunsRequest, Run and the DEFAULT_* constants are assumed;
# search_experiments() and create_experiment() are assumed to be defined on this class)
class MLflowClient:
    def __init__(self, tracking_uri=None, registry_uri=None):
        self.tracking_uri = tracking_uri or DEFAULT_TRACKING_URI
        self.registry_uri = registry_uri or DEFAULT_REGISTRY_URI

        # Initialize the HTTP clients
        self.tracking_client = ApiClient(self.tracking_uri)
        self.registry_client = ApiClient(self.registry_uri)

        # Initialize the per-resource clients
        self.experiment_client = ExperimentClient(self.tracking_client)
        self.run_client = RunClient(self.tracking_client)
        self.registry_client_obj = ModelRegistryClient(self.registry_client)
        self.artifact_client = ArtifactClient(self.tracking_client)

    def search_runs(self, experiment_ids, filter_string="",
                    max_results=100, order_by=None):
        """Search runs, e.g. filter_string="metrics.accuracy > 0.9"."""
        request = SearchRunsRequest(
            experiment_ids=experiment_ids,
            filter=filter_string,
            max_results=max_results,
            # order_by is a list of clauses; newest runs first by default
            order_by=order_by or ["attributes.start_time DESC"]
        )

        response = self.tracking_client.search_runs(request)
        return [Run.from_proto(run) for run in response.runs]

    def set_experiment(self, experiment_name):
        """Set the active experiment, creating it if it does not exist."""
        experiments = self.search_experiments(f"name = '{experiment_name}'")
        if experiments:
            self.active_experiment = experiments[0]
        else:
            self.active_experiment = self.create_experiment(experiment_name)

📊 TrackingClient

# Core TrackingClient implementation (hypothetical sketch: TrackingApi, MLflowStore, the *Request
# classes, Model and the _upload_artifact/_extract_model_flavors/_register_model helpers are assumed)
import os
import tempfile
from datetime import datetime

import mlflow.sklearn


class TrackingClient:
    def __init__(self, tracking_uri):
        self.tracking_uri = tracking_uri
        self.api_client = TrackingApi(tracking_uri)
        self.store = MLflowStore(tracking_uri)

    def start_run(self, experiment_id=None, run_name=None,
                  tags=None, run_uuid=None):
        """Start a new run."""
        request = StartRunRequest(
            experiment_id=experiment_id,
            run_name=run_name,
            run_uuid=run_uuid,
            tags=tags or {},
            start_time=datetime.utcnow()
        )

        response = self.api_client.start_run(request)
        return response.run

    def log_param(self, run_uuid, key, value, timestamp=None):
        """Log a parameter."""
        request = LogParamRequest(
            run_uuid=run_uuid,
            key=key,
            value=str(value),
            timestamp=timestamp or datetime.utcnow()
        )

        self.api_client.log_param(request)

    def log_metric(self, run_uuid, key, value, step=None,
                   timestamp=None, synchronous=False):
        """Log a metric (synchronous=False is meant to allow asynchronous logging)."""
        request = LogMetricRequest(
            run_uuid=run_uuid,
            key=key,
            value=value,
            step=step,
            timestamp=timestamp or datetime.utcnow(),
            synchronous=synchronous
        )

        self.api_client.log_metric(request)

    def log_model(self, run_uuid, model, artifact_path,
                  conda_env=None, registered_model_name=None):
        """Log a model (scikit-learn only in this sketch; MLflow itself uses
        flavor-specific functions such as mlflow.sklearn.log_model())."""
        # Save the model into a temporary directory that is removed afterwards
        with tempfile.TemporaryDirectory() as temp_dir:
            model_path = os.path.join(temp_dir, "model")
            mlflow.sklearn.save_model(model, model_path, conda_env=conda_env)

            # Upload the model files as run artifacts before the directory is deleted
            artifact_uri = self._upload_artifact(run_uuid, artifact_path, model_path)

        # Build the model metadata
        model_info = Model(
            run_uuid=run_uuid,
            key=artifact_path,
            artifact_path=artifact_uri,
            flavors=self._extract_model_flavors(model)
        )

        # Optionally register the model
        if registered_model_name:
            self._register_model(registered_model_name, model_info)

        return model_info

📦 RegistryClient

# ModelRegistryClient implementation (hypothetical sketch: ModelRegistryApi, MLflowStore,
# ModelVersion and the *Request classes are assumed)
class ModelRegistryClient:
    def __init__(self, registry_uri):
        self.registry_uri = registry_uri
        self.api_client = ModelRegistryApi(registry_uri)
        # Assumed to read run metadata too, i.e. registry and tracking share one backend
        self.store = MLflowStore(registry_uri)

    def create_model(self, name, description=None, tags=None):
        """Create a new registered model."""
        request = CreateModelRequest(
            name=name,
            description=description,
            tags=tags or {}
        )

        response = self.api_client.create_model(request)
        return response.model

    def register_model_version(self, name, run_id, source=None,
                               run_link=None, tags=None, description=None):
        """Register a model version."""
        # Fetch the run
        run = self.store.get_run(run_id)

        # Build the request; without an explicit source, assume the model
        # was logged under the run's "model" artifact directory
        request = RegisterModelVersionRequest(
            name=name,
            source=source or f"{run.info.artifact_uri}/model",
            run_id=run_id,
            run_link=run_link,
            tags=tags or {},
            description=description
        )

        response = self.api_client.register_model_version(request)
        return response.model_version

    def transition_model_version_stage(self, name, version, stage):
        """Move a model version to another stage (stages are deprecated since MLflow 2.9; prefer aliases)."""
        request = TransitionModelVersionStageRequest(
            name=name,
            version=version,
            stage=stage
        )

        self.api_client.transition_model_version_stage(request)

    def search_model_versions(self, filter_string="", max_results=100):
        """Search model versions, e.g. filter_string="name = 'my-model'"."""
        request = SearchModelVersionsRequest(
            filter=filter_string,
            max_results=max_results
        )

        response = self.api_client.search_model_versions(request)
        return [ModelVersion.from_proto(mv) for mv in response.model_versions]

📁 ProjectClient

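MLflow Projects: project packaging, parameter management, and remote execution.

# ProjectClient implementation (hypothetical sketch: ProjectConfig, Project, ProjectRun and the
# _merge_params/_build_conda_env/_build_command helpers are assumed)
import os
import shutil
import subprocess
import tempfile


class ProjectClient:
    def __init__(self, project_uri=None):
        self.project_uri = project_uri or "."
        self.config = ProjectConfig.load(self.project_uri)

    def run(self, uri=None, params=None, version=None,
            experiment_name=None, entry_point="main"):
        """Run a project."""
        # Resolve the project URI
        project_uri = uri or self.project_uri
        project = Project.load(project_uri)

        # Merge user parameters with the entry point's defaults
        run_params = self._merge_params(params, project.entry_points[entry_point])

        # Work in a temporary directory
        temp_dir = tempfile.mkdtemp()

        try:
            # Copy or clone the project (at `version`, e.g. a Git commit)
            project.clone(temp_dir, version=version)

            # Build the environment (assumed to return extra environment variables)
            conda_env = self._build_conda_env(project, temp_dir)

            # Execute the project
            return self._execute_project(
                project, temp_dir, run_params,
                experiment_name, entry_point, conda_env
            )

        finally:
            # Clean up the temporary directory
            shutil.rmtree(temp_dir, ignore_errors=True)

    def _execute_project(self, project, project_dir, params,
                         experiment_name, entry_point, conda_env):
        """Execute the project."""
        # Build the command as an argument list; parameters are substituted here
        cmd = self._build_command(project, params, entry_point)

        # Set environment variables
        env = os.environ.copy()
        if conda_env:
            env.update(conda_env)
        if experiment_name:
            # Runs started by the child process's fluent API pick up this experiment
            env["MLFLOW_EXPERIMENT_NAME"] = experiment_name

        # Run the command and wait for it to finish
        result = subprocess.run(
            cmd, cwd=project_dir, env=env,
            capture_output=True, text=True
        )

        return ProjectRun(
            status=result.returncode,
            stdout=result.stdout,
            stderr=result.stderr,
            params=params
        )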
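The execution step reduces to launching a child process with a prepared environment. A runnable sketch (the `execute_entry_point` name is hypothetical): it passes the command as an argument list so no shell is involved, and, if an experiment name is given, sets `MLFLOW_EXPERIMENT_NAME`, an environment variable that, as far as I know, MLflow's fluent API consults when no experiment has been set explicitly.

```python
import os
import subprocess
import sys


def execute_entry_point(argv, workdir, experiment_name=None):
    """Run a resolved entry-point command in a child process and capture its output."""
    env = os.environ.copy()
    if experiment_name:
        env["MLFLOW_EXPERIMENT_NAME"] = experiment_name
    # argv is a list, so arguments are passed verbatim without shell interpretation
    result = subprocess.run(argv, cwd=workdir, env=env, capture_output=True, text=True)
    return {"returncode": result.returncode, "stdout": result.stdout, "stderr": result.stderr}


out = execute_entry_point(
    [sys.executable, "-c", "import os; print(os.environ['MLFLOW_EXPERIMENT_NAME'])"],
    ".",
    experiment_name="demo",
)
```

Copying `os.environ` rather than replacing it keeps `PATH` and similar variables intact for the child process.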

🧪 Experiments and Runs

Experiment and run management: MLflow's core functionality, covering the full experiment lifecycle.

# Advanced experiment and run features (simplified sketch; ExperimentSearchIndex, RunContext,
# MLflowException and the store methods are assumed)
from datetime import datetime


class ExperimentManager:
    def __init__(self, store):
        self.store = store
        self.search_index = ExperimentSearchIndex()

    def search_experiments(self, query, max_results=100):
        """Search experiments."""
        return self.search_index.search(query, max_results)

    def restore_experiment(self, experiment_id):
        """Restore a soft-deleted experiment."""
        experiment = self.store.get_experiment(experiment_id)
        if experiment is None:
            raise MLflowException(f"Experiment {experiment_id} not found")
        if experiment.lifecycle_stage == 'deleted':
            experiment.lifecycle_stage = 'active'
            experiment.last_update_time = datetime.utcnow()
            with self.store.session() as session:
                session.merge(experiment)
                session.commit()
        return experiment


class RunManager:
    def __init__(self, store):
        self.store = store
        self.run_context = RunContext()

    def restore_run(self, run_uuid):
        """Restore a soft-deleted run."""
        run = self.store.get_run(run_uuid)
        if run is None:
            raise MLflowException(f"Run {run_uuid} not found")
        if run.lifecycle_stage == 'deleted':
            run.lifecycle_stage = 'active'
            run.last_update_time = datetime.utcnow()
            with self.store.session() as session:
                session.merge(run)
                session.commit()
        return run

    def get_run_history(self, experiment_id, max_results=100):
        """Return up to max_results runs, newest first."""
        runs = self.store.list_runs(experiment_id)
        sorted_runs = sorted(
            runs,
            key=lambda r: (r.start_time or datetime.min),  # runs without a start time sort last
            reverse=True
        )
        return sorted_runs[:max_results]

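🏗️ Experiment Creation and Management

# Experiment creation and management in detail (continues the ExperimentManager sketch;
# Experiment, ExperimentTag, MLflowException, self.search_index and
# _delete_experiment_artifacts are assumed)
import uuid
from datetime import datetime


class ExperimentManager:
    def create_experiment(self, name, artifact_location=None,
                          tags=None, description=None):
        """Full experiment-creation flow."""

        # 1. Validate the name
        if not name or len(name) > 255:
            raise MLflowException("Experiment name must be 1-255 characters")

        # 2. Reject duplicates (a unique constraint on name should also guard against races)
        existing = self.store.get_experiment_by_name(name)
        if existing:
            raise MLflowException(f"Experiment '{name}' already exists")

        # 3. Assign the ID up front so the default artifact path and the tags can use it
        #    (a UUID for illustration; MLflow's SQL store uses auto-incremented integer IDs)
        experiment_id = str(uuid.uuid4())
        if not artifact_location:
            artifact_location = f"file:///tmp/mlflow/experiments/{experiment_id}"

        # 4. Build the experiment entity
        experiment = Experiment(
            experiment_id=experiment_id,
            name=name,
            artifact_location=artifact_location,
            lifecycle_stage='active',
            creation_time=datetime.utcnow(),
            last_update_time=datetime.utcnow(),
            description=description
        )

        # 5. Attach tags (experiment.experiment_id is already set)
        if tags:
            for key, value in tags.items():
                tag = ExperimentTag(
                    experiment_id=experiment.experiment_id,
                    key=key,
                    value=str(value)
                )
                experiment.experiment_tags.append(tag)

        # 6. Persist the experiment
        with self.store.session() as session:
            session.add(experiment)
            session.commit()

        # 7. Update the search index
        self.search_index.add(experiment)

        return experiment

    def delete_experiment(self, experiment_id, delete_artifacts=False):
        """Soft-delete an experiment."""
        experiment = self.store.get_experiment(experiment_id)
        if not experiment:
            raise MLflowException(f"Experiment {experiment_id} not found")

        # Mark as deleted; the row is kept so the experiment can be restored
        experiment.lifecycle_stage = 'deleted'
        experiment.last_update_time = datetime.utcnow()

        with self.store.session() as session:
            session.merge(experiment)
            session.commit()

        # Optionally delete artifacts (in MLflow itself, `mlflow gc` permanently
        # removes soft-deleted runs and their artifacts)
        if delete_artifacts:
            self._delete_experiment_artifacts(experiment)

        # Update the search index
        self.search_index.remove(experiment)

        return experiment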

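Because delete_experiment above is a soft delete, a deleted experiment still holds its name. MLflow behaves the same way: creating an experiment whose name belongs to a deleted one fails until that experiment is restored or permanently removed (for example with `mlflow gc`). The in-memory store below is a hypothetical sketch of that rule; `InMemoryExperimentStore`, `ExperimentRecord`, and their methods are illustrative names, not MLflow code.

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class ExperimentRecord:
    name: str
    experiment_id: str = field(default_factory=lambda: uuid.uuid4().hex)
    lifecycle_stage: str = "active"


class InMemoryExperimentStore:
    """Hypothetical store illustrating soft delete and name reservation."""

    def __init__(self):
        self._by_name = {}

    def create(self, name):
        existing = self._by_name.get(name)
        if existing is not None:
            state = ("in deleted state" if existing.lifecycle_stage == "deleted"
                     else "and is active")
            raise ValueError(f"Experiment '{name}' already exists {state}")
        record = ExperimentRecord(name)
        self._by_name[name] = record
        return record

    def delete(self, name):
        # Soft delete: keep the record, only flip the lifecycle stage
        self._by_name[name].lifecycle_stage = "deleted"

    def restore(self, name):
        self._by_name[name].lifecycle_stage = "active"

    def purge(self, name):
        # Permanent delete: only now is the name free again
        if self._by_name[name].lifecycle_stage != "deleted":
            raise ValueError("Only deleted experiments can be purged")
        del self._by_name[name]


store = InMemoryExperimentStore()
store.create("churn-model")
store.delete("churn-model")
try:
    store.create("churn-model")  # still reserved by the deleted record
except ValueError as e:
    print(e)  # Experiment 'churn-model' already exists in deleted state
store.purge("churn-model")
print(store.create("churn-model").lifecycle_stage)  # active
```

The design choice is the usual one for soft deletes: keeping the row makes restore cheap, at the cost of a separate purge step before the name can be reused.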
🏃 Run Creation and Tracking

# Run creation and tracking in detail
import time
import uuid
from datetime import datetime, timezone

from mlflow.exceptions import MlflowException


class RunManager:
    def start_run(self, experiment_id=None, run_name=None,
                  tags=None, run_uuid=None, experiment_name=None):
        """Full flow for starting a run."""

        # 1. Resolve the target experiment (create it by name if missing)
        if experiment_name:
            experiment = self.get_experiment_by_name(experiment_name)
            if not experiment:
                experiment = self.create_experiment(experiment_name)
        else:
            experiment = self.get_experiment(experiment_id)
            if not experiment:
                raise MlflowException(f"Experiment {experiment_id} not found")

        # 2. Generate a run UUID unless the caller supplied one
        if not run_uuid:
            run_uuid = str(uuid.uuid4())

        # 3. Build the run entity. The run starts immediately, so its status
        #    is RUNNING (SCHEDULED is for runs that have not begun yet).
        run = Run(
            run_uuid=run_uuid,
            experiment_id=experiment.experiment_id,
            run_name=run_name or f"run_{int(time.time())}",
            run_status='RUNNING',
            lifecycle_stage='active',
            start_time=datetime.now(timezone.utc),
            end_time=None,
            # Placeholder source metadata; a real client detects these
            source_type='NOTEBOOK',
            source_name='default',
            entry_point_name='main'
        )

        # 4. Attach tags (values are stored as strings)
        if tags:
            for key, value in tags.items():
                tag = RunTag(
                    run_uuid=run_uuid,
                    key=key,
                    value=str(value)
                )
                run.run_tags.append(tag)

        # 5. Persist the run
        with self.store.session() as session:
            session.add(run)
            session.commit()

        # 6. Make it the active run
        self.run_context.set_active_run(run)

        return run

    def end_run(self, run_uuid, status='FINISHED'):
        """End a run with a terminal status."""
        if status not in ('FINISHED', 'FAILED', 'KILLED'):
            raise MlflowException(f"Invalid terminal run status: {status}")

        run = self.get_run(run_uuid)
        if not run:
            raise MlflowException(f"Run {run_uuid} not found")

        # Record the final status and end time
        now = datetime.now(timezone.utc)
        run.run_status = status
        run.end_time = now
        run.last_update_time = now

        with self.store.session() as session:
            session.merge(run)
            session.commit()

        # Clear the active-run context
        self.run_context.clear_active_run()

        return run

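start_run and end_run must always be paired, and a run whose training code raises should end as FAILED rather than FINISHED. MLflow's `with mlflow.start_run():` handles this by ending the run with FINISHED on normal exit and FAILED when an exception escapes. The sketch below reproduces that pattern with `contextlib`; `FakeRunManager` is a hypothetical in-memory stand-in for RunManager.

```python
from contextlib import contextmanager


class FakeRunManager:
    """Hypothetical in-memory stand-in for RunManager."""

    def __init__(self):
        self.status = {}
        self._next = 0

    def start_run(self):
        self._next += 1
        run_id = f"run-{self._next}"
        self.status[run_id] = "RUNNING"
        return run_id

    def end_run(self, run_id, status="FINISHED"):
        self.status[run_id] = status


@contextmanager
def tracked_run(manager):
    """Start a run and always end it: FINISHED on success, FAILED on error."""
    run_id = manager.start_run()
    try:
        yield run_id
    except BaseException:
        manager.end_run(run_id, status="FAILED")
        raise  # re-raise so the caller still sees the error
    else:
        manager.end_run(run_id, status="FINISHED")


mgr = FakeRunManager()
with tracked_run(mgr) as ok_run:
    pass
try:
    with tracked_run(mgr) as bad_run:
        raise RuntimeError("training diverged")
except RuntimeError:
    pass
print(mgr.status)  # {'run-1': 'FINISHED', 'run-2': 'FAILED'}
```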
📝 Parameter Logging

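# Parameter logging in detail
from datetime import datetime, timezone

from mlflow.exceptions import MlflowException


class ParamManager:
    # Illustrative limits; MLflow's actual limits differ between versions
    MAX_KEY_LENGTH = 255
    MAX_VALUE_LENGTH = 5000

    def log_param(self, run_uuid, key, value, timestamp=None):
        """Log a parameter. Params are write-once: re-logging a key with the
        same value is a no-op, and with a different value it is an error."""

        # 1. Validate key and value (values are stored as strings)
        if not key or len(key) > self.MAX_KEY_LENGTH:
            raise MlflowException(
                f"Parameter key must be 1-{self.MAX_KEY_LENGTH} characters")

        value = str(value)
        if len(value) > self.MAX_VALUE_LENGTH:
            raise MlflowException(
                f"Parameter value must be <= {self.MAX_VALUE_LENGTH} characters")

        # 2. Make sure the run exists
        run = self.get_run(run_uuid)
        if not run:
            raise MlflowException(f"Run {run_uuid} not found")

        # 3. Enforce write-once semantics
        existing = self.store.get_param(run_uuid, key)
        if existing is not None:
            if existing.value != value:
                raise MlflowException(
                    f"Parameter '{key}' is already '{existing.value}'; "
                    "param values cannot be changed")
            return existing

        # 4. Save the param and bump the run's update time in one transaction
        now = datetime.now(timezone.utc)
        param = Param(
            run_uuid=run_uuid,
            key=key,
            value=value,
            timestamp=timestamp or now
        )
        run.last_update_time = now

        with self.store.session() as session:
            session.add(param)
            session.merge(run)
            session.commit()

        return param

    def get_param(self, run_uuid, key):
        """Return the value of one parameter."""
        param = self.store.get_param(run_uuid, key)
        if not param:
            raise MlflowException(f"Parameter '{key}' not found for run {run_uuid}")
        return param.value

    def get_all_params(self, run_uuid):
        """Return all parameters of a run."""
        return self.store.get_all_params(run_uuid)

    def delete_param(self, run_uuid, key):
        """Delete a parameter; returns True if one was removed.

        Illustrative only: MLflow's public client API has no call for
        deleting a param.
        """
        with self.store.session() as session:
            param = session.query(Param).filter_by(
                run_uuid=run_uuid,
                key=key
            ).first()

            if param:
                session.delete(param)
                session.commit()
                return True
            return False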

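A parameter describes how a run was configured, so MLflow treats it as write-once: logging the same key again with the same value is accepted, while a different value is rejected. A minimal in-memory sketch of that rule, independent of any database; `ParamStore` is a hypothetical class.

```python
class ParamStore:
    """Hypothetical write-once parameter store keyed by (run_id, key)."""

    def __init__(self):
        self._params = {}

    def log_param(self, run_id, key, value):
        value = str(value)  # params are stored as strings
        existing = self._params.get((run_id, key))
        if existing is not None and existing != value:
            raise ValueError(
                f"Changing param values is not allowed: {key} "
                f"was {existing!r}, got {value!r}"
            )
        self._params[(run_id, key)] = value

    def get_params(self, run_id):
        return {k: v for (r, k), v in self._params.items() if r == run_id}


params = ParamStore()
params.log_param("r1", "lr", 0.01)
params.log_param("r1", "lr", "0.01")  # same string value: accepted
print(params.get_params("r1"))        # {'lr': '0.01'}
```

Comparing the stringified values is what makes `0.01` and `"0.01"` count as the same parameter.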
📊 Metric Logging

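# Metric logging in detail
import math
import queue
from datetime import datetime, timezone

from mlflow.exceptions import MlflowException


class MetricManager:
    BATCH_SIZE = 100  # flush threshold for buffered writes (illustrative)

    def __init__(self, store):
        self.store = store
        self.metric_queue = queue.Queue()  # thread-safe write buffer

    def log_metric(self, run_uuid, key, value, step=None,
                   timestamp=None, synchronous=False):
        """Log one metric value."""

        # 1. Validate key and value
        if not key or len(key) > 255:
            raise MlflowException("Metric key must be 1-255 characters")

        if not isinstance(value, (int, float)):
            raise MlflowException(f"Metric value must be numeric, got {type(value)}")

        # 2. Make sure the run exists
        run = self.get_run(run_uuid)
        if not run:
            raise MlflowException(f"Run {run_uuid} not found")

        # 3. Flag special values (NaN, +/-inf) so storage and UI can handle them
        is_nan = math.isnan(value)
        is_inf = math.isinf(value)

        # 4. Build the metric record; step defaults to 0, as in MLflow
        now = datetime.now(timezone.utc)
        metric = Metric(
            run_uuid=run_uuid,
            key=key,
            value=float(value),
            step=step if step is not None else 0,
            timestamp=timestamp or now,
            is_nan=is_nan,
            is_inf=is_inf
        )

        # 5. Write now, or buffer for a batched write
        if synchronous:
            self._log_metric_sync(metric)
        else:
            self._log_metric_async(metric)

        # 6. Bump the run's last-update time
        run.last_update_time = now
        with self.store.session() as session:
            session.merge(run)
            session.commit()

        return metric

    def _log_metric_async(self, metric):
        """Buffer a metric and flush once the buffer reaches BATCH_SIZE.

        The flush is size-triggered, not periodic: call _flush_metrics()
        when the run ends so that buffered metrics are not lost.
        """
        self.metric_queue.put(metric)
        if self.metric_queue.qsize() >= self.BATCH_SIZE:
            self._flush_metrics()

    def _flush_metrics(self):
        """Drain the buffer and write all pending metrics in one transaction."""
        pending = []
        while True:
            try:
                pending.append(self.metric_queue.get_nowait())
            except queue.Empty:
                break
        if pending:
            with self.store.session() as session:
                session.add_all(pending)
                session.commit()

    def _log_metric_sync(self, metric):
        """Write a single metric immediately."""
        with self.store.session() as session:
            session.add(metric)
            session.commit()

    def get_metric_history(self, run_uuid, key):
        """Return every logged value of one metric."""
        return self.store.get_metric_history(run_uuid, key)

    def get_latest_metrics(self, run_uuid):
        """Return the latest value of each metric in a run."""
        return self.store.get_latest_metrics(run_uuid)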

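The asynchronous path trades durability for throughput: metrics wait in a buffer until BATCH_SIZE is reached, so whatever is still buffered when a run ends has to be flushed explicitly. The sketch below isolates that buffering logic; `BufferedMetricWriter` and its `sink` callback are hypothetical, with the sink standing in for one batched database write.

```python
import threading


class BufferedMetricWriter:
    """Hypothetical buffer that hands metrics to `sink` in batches."""

    def __init__(self, sink, batch_size=3):
        self.sink = sink                # callable receiving a list of metrics
        self.batch_size = batch_size
        self._buffer = []
        self._lock = threading.Lock()   # log() may be called from many threads

    def log(self, key, value, step=0):
        with self._lock:
            self._buffer.append((key, float(value), step))
            full = len(self._buffer) >= self.batch_size
        if full:
            self.flush()

    def flush(self):
        # Swap the buffer out under the lock, then write outside it
        with self._lock:
            batch, self._buffer = self._buffer, []
        if batch:
            self.sink(batch)

    def close(self):
        # Call when the run ends so the tail of the buffer is not lost
        self.flush()


writes = []
w = BufferedMetricWriter(writes.append, batch_size=3)
for step in range(4):
    w.log("loss", 1.0 / (step + 1), step)
w.close()
print([len(b) for b in writes])  # [3, 1]
```

Swapping the list under the lock keeps the critical section short, so slow writes to the sink never block threads that are logging.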
🤖 Model Logging

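# Model logging in detail
import os
import tempfile
from datetime import datetime, timezone

import joblib
import mlflow.sklearn


class ModelManager:
    def log_model(self, run_uuid, model, artifact_path,
                  conda_env=None, registered_model_name=None,
                  await_registration_for=300, pip_requirements=None):
        """Log a model as run artifacts and optionally register it."""

        # 1. Stage files in a temporary directory; the context manager
        #    removes it afterwards, even if a later step fails
        with tempfile.TemporaryDirectory() as temp_dir:
            model_path = os.path.join(temp_dir, "model")

            # 2. Serialize the model locally
            self._save_model(model, model_path, conda_env, pip_requirements)

            # 3. Upload the staged files as run artifacts
            artifact_uri = self._upload_artifact(
                run_uuid, artifact_path, temp_dir
            )

            # 4. Build the model metadata
            model_info = self._create_model_info(
                run_uuid, artifact_path, artifact_uri, model
            )

            # 5. Persist the metadata
            self._save_model_info(run_uuid, model_info)

            # 6. Register a model version if requested
            if registered_model_name:
                self._register_model_version(
                    registered_model_name, run_uuid, model_info,
                    await_registration_for=await_registration_for
                )

            return model_info

    def _save_model(self, model, model_path, conda_env, pip_requirements):
        """Serialize the model to model_path, choosing a format by type."""
        try:
            from sklearn.base import BaseEstimator
        except ImportError:  # scikit-learn not installed
            BaseEstimator = None

        if BaseEstimator is not None and isinstance(model, BaseEstimator):
            # scikit-learn: the sklearn flavor also writes MLmodel and the
            # environment files (pass at most one of conda_env and
            # pip_requirements; MLflow rejects both together)
            mlflow.sklearn.save_model(
                model, model_path,
                conda_env=conda_env,
                pip_requirements=pip_requirements
            )
        elif hasattr(model, 'save'):
            # e.g. Keras models expose .save(); PyTorch modules do not
            # (they use torch.save) and fall through to the pickle branch
            model.save(model_path)
        else:
            # Fallback: pickle via joblib (no MLflow flavor metadata)
            os.makedirs(model_path, exist_ok=True)
            joblib.dump(model, os.path.join(model_path, "model.pkl"))

    def _create_model_info(self, run_uuid, artifact_path, artifact_uri, model):
        """Build the logged-model metadata record.

        `Model` here is this walkthrough's entity, not mlflow.models.Model.
        """
        # Determine which flavors (sklearn, python_function, ...) apply
        flavors = self._extract_model_flavors(model)

        model_info = Model(
            run_uuid=run_uuid,
            artifact_path=artifact_path,  # path inside the run's artifacts
            artifact_uri=artifact_uri,    # full URI of the uploaded files
            flavors=flavors,
            timestamp=datetime.now(timezone.utc)
        )

        return model_info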

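log_model follows a stage-then-upload pattern: serialize into a temporary directory, copy the result into the artifact store, and let the temporary directory disappear whatever happens. The sketch below shows that pattern with the standard library only. Assumptions: a local directory stands in for the artifact store, pickle stands in for a flavor-specific serializer, and `log_model_sketch` is a hypothetical name; the returned `runs:/<run_id>/<path>` string follows MLflow's run-relative URI scheme.

```python
import os
import pickle
import shutil
import tempfile


def log_model_sketch(model, artifact_root, run_id, artifact_path="model"):
    """Stage `model` in a temp dir, then copy it under artifact_root/run_id."""
    with tempfile.TemporaryDirectory() as staging:
        model_dir = os.path.join(staging, artifact_path)
        os.makedirs(model_dir)
        with open(os.path.join(model_dir, "model.pkl"), "wb") as f:
            pickle.dump(model, f)
        # "Upload": copy the staged tree into the run's artifact directory
        dest = os.path.join(artifact_root, run_id, "artifacts", artifact_path)
        shutil.copytree(model_dir, dest, dirs_exist_ok=True)
    # The staging directory is gone here, even if the copy had failed
    return f"runs:/{run_id}/{artifact_path}", dest


root = tempfile.mkdtemp()
uri, path = log_model_sketch({"weights": [0.5, -1.2]}, root, "abc123")
print(uri)  # runs:/abc123/model
with open(os.path.join(path, "model.pkl"), "rb") as f:
    print(pickle.load(f))  # {'weights': [0.5, -1.2]}
```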
📦 Artifact Management

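# Artifact management in detail
import os
import shutil
from datetime import datetime, timezone

from mlflow.exceptions import MlflowException


class ArtifactManager:
    # Assumed helpers (not shown): _get_run_artifact_dir, _get_file_size,
    # _artifact_to_uri; Artifact is an ORM entity.

    def log_artifact(self, run_uuid, local_path, artifact_path=None):
        """Copy a local file or directory into the run's artifacts.

        As in MLflow, artifact_path is the destination *directory* inside
        the run's artifacts; the copied item keeps its own base name.
        """

        # 1. Validate the input path
        if not os.path.exists(local_path):
            raise MlflowException(f"Local path does not exist: {local_path}")

        # 2. Compute the destination relative to the run's artifact root
        item_name = os.path.basename(os.path.normpath(local_path))
        rel_path = os.path.join(artifact_path, item_name) if artifact_path else item_name

        # 3. Create the destination directory
        run_artifact_dir = self._get_run_artifact_dir(run_uuid)
        target_path = os.path.join(run_artifact_dir, rel_path)
        os.makedirs(os.path.dirname(target_path), exist_ok=True)

        # 4. Copy the file or directory, overwriting existing files
        is_dir = os.path.isdir(local_path)
        if is_dir:
            shutil.copytree(local_path, target_path, dirs_exist_ok=True)
        else:
            shutil.copy2(local_path, target_path)

        # 5. Build the artifact record
        now = datetime.now(timezone.utc)
        artifact = Artifact(
            run_uuid=run_uuid,
            path=rel_path,
            is_dir=is_dir,
            file_size=self._get_file_size(target_path),
            creation_time=now,
            last_update_time=now
        )

        # 6. Persist the record
        with self.store.session() as session:
            session.add(artifact)
            session.commit()

        return self._artifact_to_uri(rel_path)

    def list_artifacts(self, run_uuid, path=''):
        """List the direct children of `path` in the run's artifacts."""
        run_artifact_dir = self._get_run_artifact_dir(run_uuid)
        full_path = os.path.join(run_artifact_dir, path)

        if not os.path.exists(full_path):
            return []

        artifacts = []
        for item in os.listdir(full_path):
            item_path = os.path.join(full_path, item)
            rel_path = os.path.join(path, item)

            artifacts.append({
                'path': rel_path,
                'is_dir': os.path.isdir(item_path),
                'file_size': os.path.getsize(item_path) if os.path.isfile(item_path) else 0,
                # getctime is the creation time on Windows but the last
                # metadata change on Unix
                'creation_time': datetime.fromtimestamp(os.path.getctime(item_path)),
                'last_update_time': datetime.fromtimestamp(os.path.getmtime(item_path))
            })

        return artifacts

    def delete_artifact(self, run_uuid, path):
        """Delete a file or directory from the run's artifacts."""
        run_artifact_dir = os.path.realpath(self._get_run_artifact_dir(run_uuid))
        artifact_path = os.path.realpath(os.path.join(run_artifact_dir, path))

        # Refuse paths (e.g. "../other_run") that resolve outside the run's
        # directory, and refuse deleting the root itself
        if (artifact_path == run_artifact_dir or
                os.path.commonpath([run_artifact_dir, artifact_path]) != run_artifact_dir):
            raise MlflowException(f"Invalid artifact path: {path}")

        if os.path.exists(artifact_path):
            if os.path.isfile(artifact_path):
                os.remove(artifact_path)
            else:
                shutil.rmtree(artifact_path)

            # Delete the database record
            with self.store.session() as session:
                session.query(Artifact).filter_by(
                    run_uuid=run_uuid,
                    path=path
                ).delete()
                session.commit()

            return True
        return False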

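list_artifacts returns only the direct children of one directory, so a client that wants a run's whole artifact tree has to recurse. The sketch below walks a directory with `os.walk` and returns every file's relative path (always with `/` separators) and size; `walk_artifacts` is a hypothetical helper, and a temporary directory stands in for a run's artifact root.

```python
import os
import tempfile


def walk_artifacts(root):
    """Return a sorted list of (relative_path, size_in_bytes) under root."""
    entries = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for filename in filenames:
            full = os.path.join(dirpath, filename)
            rel = os.path.relpath(full, root).replace(os.sep, "/")
            entries.append((rel, os.path.getsize(full)))
    return sorted(entries)


root = tempfile.mkdtemp()
os.makedirs(os.path.join(root, "model"))
with open(os.path.join(root, "model", "MLmodel"), "w") as f:
    f.write("flavors: {}\n")
with open(os.path.join(root, "metrics.json"), "w") as f:
    f.write("{}")
print([rel for rel, _ in walk_artifacts(root)])  # ['metrics.json', 'model/MLmodel']
```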
🏪 Model Registry

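# Model registry in detail
from datetime import datetime, timezone

from mlflow.exceptions import MlflowException


class ModelRegistryManager:
    def __init__(self, store):
        self.store = store
        # Allowed stage transitions (this article's policy)
        self.stage_transitions = {
            'None': ['Staging', 'Archived'],
            'Staging': ['Production', 'Archived'],
            'Production': ['Staging', 'Archived'],
            'Archived': []
        }

    def create_model(self, name, description=None, tags=None):
        """Create a registered model."""
        # Registered model names must be unique
        existing = self.store.get_model_by_name(name)
        if existing:
            raise MlflowException(f"Model '{name}' already exists")

        # RegisteredModel / RegisteredModelTag are this walkthrough's ORM
        # entities; the names avoid a clash with the logged-model `Model`
        now = datetime.now(timezone.utc)
        model = RegisteredModel(
            name=name,
            description=description,
            lifecycle_stage='active',
            creation_time=now,
            last_update_time=now
        )

        # Attach tags
        if tags:
            for key, value in tags.items():
                tag = RegisteredModelTag(
                    name=name,
                    key=key,
                    value=str(value)
                )
                model.model_tags.append(tag)

        with self.store.session() as session:
            session.add(model)
            session.commit()

        return model

    def register_model_version(self, name, run_id, source=None,
                               run_link=None, tags=None, description=None):
        """Register a new version of a model from a run."""
        # Get the registered model, creating it on first registration
        model = self.store.get_model_by_name(name)
        if not model:
            model = self.create_model(name)

        # Look up the source run
        run = self.store.get_run(run_id)
        if not run:
            raise MlflowException(f"Run {run_id} not found")

        # Create the version; new versions start in stage 'None'. Without an
        # explicit source this falls back to the run's artifact root, although
        # the model usually lives in a subdirectory (runs:/<run_id>/<path>).
        now = datetime.now(timezone.utc)
        version = model.next_version()
        model_version = ModelVersion(
            name=name,
            version=version,
            run_id=run_id,
            current_stage='None',
            source=source or run.info.artifact_uri,
            run_link=run_link,
            description=description,
            creation_time=now,
            last_update_time=now
        )

        # Attach version tags
        if tags:
            for key, value in tags.items():
                model_version.tags.append(ModelVersionTag(
                    name=name, version=version, key=key, value=str(value)))

        with self.store.session() as session:
            session.add(model_version)
            session.commit()

        return model_version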

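`model.next_version()` is left abstract above. In MLflow, version numbers are assigned per registered model, start at 1, and grow by one with each registration. The in-memory sketch below shows that numbering; `VersionCounter` is hypothetical, and its lock stands in for what a real store must guarantee, namely that the read-increment-write happens atomically (for example inside the same database transaction) so concurrent registrations never receive the same number.

```python
import threading
from collections import defaultdict


class VersionCounter:
    """Hypothetical per-model version allocator: 1, 2, 3, ... per name."""

    def __init__(self):
        self._latest = defaultdict(int)  # model name -> last issued version
        self._lock = threading.Lock()    # makes read-increment-write atomic

    def next_version(self, name):
        with self._lock:
            self._latest[name] += 1
            return self._latest[name]


counter = VersionCounter()
print([counter.next_version("churn") for _ in range(3)])  # [1, 2, 3]
print(counter.next_version("fraud"))                      # 1
```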
🔄 Model Registration Flow

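Model registration flow: the complete path from a finished training run to a registered model, including versioning and stage transitions.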

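# End-to-end model registration flow
from datetime import datetime, timezone

from mlflow.exceptions import MlflowException


class ModelRegistrationFlow:
    def __init__(self, store):
        self.store = store
        # Same illustrative transition policy as ModelRegistryManager
        self.stage_transitions = {
            'None': ['Staging', 'Archived'],
            'Staging': ['Production', 'Archived'],
            'Production': ['Staging', 'Archived'],
            'Archived': []
        }

    def register_model_from_run(self, run_id, model_name,
                                description=None, tags=None):
        """Register the model logged in a run and promote it to Staging."""

        # 1. Fetch the run
        run = self.get_run(run_id)
        if not run:
            raise MlflowException(f"Run {run_id} not found")

        # 2. Make sure the run logged at least one model
        models = self.store.get_models_by_run(run_id)
        if not models:
            raise MlflowException(f"No models found in run {run_id}")

        # 3. Use the first logged model
        model_artifact = models[0]

        # 4. Get or create the registered model
        model = self.get_or_create_model(model_name, description, tags)

        # 5. Create the model version (persisted with stage 'None')
        version = self.create_model_version(
            model, run_id, model_artifact, run
        )

        # 6. Promote None -> Staging through the regular transition path so
        #    the move is validated and recorded in the transition history
        version = self.transition_model_version_stage(
            version.name, version.version, 'Staging',
            comment='Promoted automatically on registration'
        )

        # 7. Fire the registration event
        self._on_model_registered(version)

        return version

    def transition_model_version_stage(self, name, version, stage,
                                       comment=None):
        """Move a model version to another stage."""

        # 1. Fetch the model version
        model_version = self.store.get_model_version(name, version)
        if not model_version:
            raise MlflowException(f"Model version {name}:{version} not found")

        # 2. Validate the transition (an unknown stage allows nothing)
        allowed = self.stage_transitions.get(model_version.current_stage, [])
        if stage not in allowed:
            raise MlflowException(
                f"Cannot transition from {model_version.current_stage} to {stage}"
            )

        # 3. Record the transition
        now = datetime.now(timezone.utc)
        transition = ModelVersionStageTransition(
            name=name,
            version=version,
            from_stage=model_version.current_stage,
            to_stage=stage,
            comment=comment,
            timestamp=now
        )

        # 4. Update the model version
        model_version.current_stage = stage
        model_version.last_update_time = now

        with self.store.session() as session:
            session.add(transition)
            session.merge(model_version)
            session.commit()

        # 5. Fire the transition event
        self._on_stage_transition(model_version, transition)

        return model_version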

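`_on_model_registered` and `_on_stage_transition` above are hooks whose bodies are not shown. One common way to fill them is a small publish/subscribe registry, so that CI jobs, notifications, or audit logs can react to registry events without changing the flow itself. The `EventBus` below is a generic, hypothetical sketch, not an MLflow API; a hook would then reduce to a single `bus.publish(...)` call.

```python
from collections import defaultdict


class EventBus:
    """Hypothetical publish/subscribe hub for registry events."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def subscribe(self, event, handler):
        self._handlers[event].append(handler)

    def publish(self, event, **payload):
        # Call handlers in subscription order; a failing handler must not
        # block the others, so errors are collected and returned.
        errors = []
        for handler in self._handlers[event]:
            try:
                handler(**payload)
            except Exception as exc:
                errors.append(exc)
        return errors


bus = EventBus()
audit_log = []
bus.subscribe("stage_transition",
              lambda name, version, from_stage, to_stage:
              audit_log.append(f"{name} v{version}: {from_stage} -> {to_stage}"))
bus.publish("stage_transition", name="churn", version=3,
            from_stage="None", to_stage="Staging")
print(audit_log)  # ['churn v3: None -> Staging']
```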
📊 Model Version Management

# Model version management in detail
from datetime import datetime, timezone

from mlflow.exceptions import MlflowException


class ModelVersionManager:
    def __init__(self, store):
        self.store = store
        # In-process cache keyed by "name:version"; it is unbounded and does
        # not see writes made by other processes until an entry is evicted
        self.version_cache = {}

    def create_model_version(self, model_name, run_id, source,
                             run_link=None, tags=None, description=None):
        """Create a model version."""

        # 1. Look up the registered model
        model = self.store.get_model_by_name(model_name)
        if not model:
            raise MlflowException(f"Model {model_name} not found")

        # 2. Make sure the source run exists
        run = self.store.get_run(run_id)
        if not run:
            raise MlflowException(f"Run {run_id} not found")

        # 3. Allocate the next version number
        version = model.next_version()

        # 4. Build the model version
        now = datetime.now(timezone.utc)
        model_version = ModelVersion(
            name=model_name,
            version=version,
            run_id=run_id,
            current_stage='None',
            source=source,
            run_link=run_link,
            description=description,
            creation_time=now,
            last_update_time=now
        )

        # 5. Attach tags
        if tags:
            for key, value in tags.items():
                tag = ModelVersionTag(
                    name=model_name,
                    version=version,
                    key=key,
                    value=str(value)
                )
                model_version.tags.append(tag)

        # 6. Persist the model version
        with self.store.session() as session:
            session.add(model_version)
            session.commit()

        # 7. Cache it
        self.version_cache[f"{model_name}:{version}"] = model_version

        return model_version

    def get_model_version(self, name, version):
        """Get a model version, served from the cache when possible."""
        cache_key = f"{name}:{version}"

        # Cache hit
        if cache_key in self.version_cache:
            return self.version_cache[cache_key]

        # Cache miss: load from the store and remember the result
        model_version = self.store.get_model_version(name, version)
        if model_version:
            self.version_cache[cache_key] = model_version

        return model_version

    def list_model_versions(self, name=None, max_results=None):
        """List model versions, optionally for a single model."""
        return self.store.list_model_versions(name, max_results)

    def delete_model_version(self, name, version):
        """Soft-delete a model version."""
        model_version = self.get_model_version(name, version)
        if not model_version:
            raise MlflowException(f"Model version {name}:{version} not found")

        # Mark as deleted
        model_version.lifecycle_stage = 'deleted'
        model_version.last_update_time = datetime.now(timezone.utc)

        with self.store.session() as session:
            session.merge(model_version)
            session.commit()

        # Evict it from the cache
        self.version_cache.pop(f"{name}:{version}", None)

        return model_version

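The plain dict used as version_cache above grows without bound and never notices changes made by other processes. A bounded cache with a time-to-live limits both problems. The sketch below is a hypothetical `TTLCache` built on `OrderedDict`; `max_size` and `ttl_seconds` are illustrative parameters, and the clock is injectable so expiry can be checked deterministically.

```python
import time
from collections import OrderedDict


class TTLCache:
    """Hypothetical LRU cache whose entries also expire after ttl_seconds."""

    def __init__(self, max_size=128, ttl_seconds=60.0, clock=time.monotonic):
        self.max_size = max_size
        self.ttl = ttl_seconds
        self.clock = clock
        self._data = OrderedDict()  # key -> (stored_at, value)

    def get(self, key):
        item = self._data.get(key)
        if item is None:
            return None
        stored_at, value = item
        if self.clock() - stored_at > self.ttl:
            del self._data[key]  # expired: caller reloads from the store
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return value

    def put(self, key, value):
        self._data[key] = (self.clock(), value)
        self._data.move_to_end(key)
        if len(self._data) > self.max_size:
            self._data.popitem(last=False)  # evict least recently used

    def invalidate(self, key):
        self._data.pop(key, None)


now = [0.0]
cache = TTLCache(max_size=2, ttl_seconds=10, clock=lambda: now[0])
cache.put("churn:1", "v1")
cache.put("churn:2", "v2")
cache.get("churn:1")         # touch churn:1
cache.put("churn:3", "v3")   # evicts churn:2, the least recently used
print(cache.get("churn:2"))  # None
now[0] = 11.0
print(cache.get("churn:1"))  # None (expired)
```

The TTL bounds how stale an entry can get; explicit `invalidate` calls on delete, as in delete_model_version, are still needed for changes made in the same process.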
🎯 Model Stage Management

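Stage management: tracks which lifecycle stage each model version is in, which is how production rollouts are managed. Model registry stages have been deprecated since MLflow 2.9 in favor of model version aliases, and the transition rules below are the policy used by this article's code rather than constraints enforced by MLflow.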

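| Stage      | Description                           | Allowed transitions   |
|------------|---------------------------------------|-----------------------|
| None       | Newly created model version           | → Staging, Archived   |
| Staging    | Being prepared for production         | → Production, Archived |
| Production | Serving in production                 | → Staging, Archived   |
| Archived   | Retired; no longer available for use  | (none)                |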
# Stage transition implementation
from datetime import datetime, timezone

from mlflow.exceptions import MlflowException


class StageTransitionManager:
    def __init__(self, store):
        self.store = store
        self.stage_transitions = {
            'None': ['Staging', 'Archived'],
            'Staging': ['Production', 'Archived'],
            'Production': ['Staging', 'Archived'],
            'Archived': []
        }

    def validate_transition(self, from_stage, to_stage):
        """Raise if from_stage -> to_stage is not an allowed transition."""
        if to_stage not in self.stage_transitions.get(from_stage, []):
            raise MlflowException(
                f"Invalid stage transition: {from_stage} → {to_stage}"
            )

    def transition_stage(self, name, version, to_stage, comment=None):
        """Perform a stage transition."""

        # 1. Fetch the model version
        model_version = self.store.get_model_version(name, version)
        if not model_version:
            raise MlflowException(f"Model version {name}:{version} not found")

        # 2. Validate the transition
        self.validate_transition(model_version.current_stage, to_stage)

        # 3. Record the transition history
        now = datetime.now(timezone.utc)
        transition = ModelVersionStageTransition(
            name=name,
            version=version,
            from_stage=model_version.current_stage,
            to_stage=to_stage,
            comment=comment,
            timestamp=now
        )

        # 4. Update the stage
        model_version.current_stage = to_stage
        model_version.last_update_time = now

        # 5. Persist both changes in one transaction
        with self.store.session() as session:
            session.add(transition)
            session.merge(model_version)
            session.commit()

        return model_version

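Promoting a version to Production usually also demotes the version already serving there; otherwise two versions would be in Production at once. MLflow's `transition_model_version_stage` exposes this as `archive_existing_versions=True`. The self-contained sketch below applies the transition table above to an in-memory mapping; `promote`, `ALLOWED`, and the `versions` layout (version number to stage name) are hypothetical.

```python
ALLOWED = {
    "None": {"Staging", "Archived"},
    "Staging": {"Production", "Archived"},
    "Production": {"Staging", "Archived"},
    "Archived": set(),
}


def promote(versions, version, to_stage, archive_existing=False):
    """Move `version` to `to_stage`; optionally archive others in that stage.

    `versions` maps version number -> current stage name.
    """
    from_stage = versions[version]
    if to_stage not in ALLOWED[from_stage]:
        raise ValueError(f"Invalid stage transition: {from_stage} -> {to_stage}")
    if archive_existing and to_stage in ("Staging", "Production"):
        for other, stage in versions.items():
            if other != version and stage == to_stage:
                versions[other] = "Archived"
    versions[version] = to_stage
    return versions


versions = {1: "Production", 2: "Staging"}
promote(versions, 2, "Production", archive_existing=True)
print(versions)  # {1: 'Archived', 2: 'Production'}
```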
🌐 Model API

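# Model management API: a thin wrapper over MlflowClient
import mlflow.pyfunc
from mlflow.tracking import MlflowClient


class ModelRegistryAPI:
    def __init__(self, client=None):
        # Assumes an MlflowClient (or an object exposing the same methods)
        self.client = client or MlflowClient()

    def create_model(self, name, description=None, tags=None):
        """Create a registered model."""
        return self.client.create_registered_model(
            name, tags=tags, description=description
        )

    def get_model(self, name):
        """Get a registered model."""
        return self.client.get_registered_model(name)

    def list_models(self, filter_string=None, max_results=100):
        """List registered models matching an optional filter."""
        return self.client.search_registered_models(
            filter_string=filter_string, max_results=max_results
        )

    def delete_model(self, name):
        """Delete a registered model and all of its versions."""
        return self.client.delete_registered_model(name)

    def register_model(self, name, source, run_id=None,
                       tags=None, description=None):
        """Create a new version of an existing registered model from `source`
        (mlflow.register_model also creates the registered model if needed)."""
        return self.client.create_model_version(
            name, source, run_id=run_id, tags=tags, description=description
        )

    def get_model_version(self, name, version):
        """Get one model version."""
        return self.client.get_model_version(name, version)

    def transition_model_version_stage(self, name, version, stage,
                                       archive_existing_versions=False):
        """Move a version to another stage (stages are deprecated since MLflow 2.9)."""
        return self.client.transition_model_version_stage(
            name, version, stage,
            archive_existing_versions=archive_existing_versions
        )

    def search_model_versions(self, filter_string=None, max_results=100):
        """Search model versions, e.g. filter_string="name='churn'"."""
        return self.client.search_model_versions(
            filter_string=filter_string, max_results=max_results
        )

    def get_model_version_download_uri(self, name, version):
        """Get the artifact URI from which a version can be downloaded."""
        return self.client.get_model_version_download_uri(name, version)

    def load_model_version(self, name, version, dst_path=None):
        """Load a version as a pyfunc model through its models:/ URI."""
        return mlflow.pyfunc.load_model(f"models:/{name}/{version}", dst_path=dst_path)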

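Registry APIs and loaders refer to a registered model by URI. MLflow accepts `models:/<name>/<version>`, `models:/<name>/<stage>`, and, for aliases, `models:/<name>@<alias>`. The hypothetical parser below sketches how such a URI splits into its parts; it handles only these three forms and is not MLflow's own parser.

```python
from typing import NamedTuple, Optional


class ModelRef(NamedTuple):
    name: str
    version: Optional[int] = None
    stage: Optional[str] = None
    alias: Optional[str] = None


def parse_model_uri(uri: str) -> ModelRef:
    """Split a models:/ URI into a name plus a version, stage, or alias."""
    prefix = "models:/"
    if not uri.startswith(prefix):
        raise ValueError(f"Not a models:/ URI: {uri}")
    body = uri[len(prefix):].strip("/")
    if "@" in body:  # models:/<name>@<alias>
        name, alias = body.split("@", 1)
        if not name or not alias:
            raise ValueError(f"Malformed alias URI: {uri}")
        return ModelRef(name, alias=alias)
    parts = body.split("/")
    if len(parts) != 2 or not all(parts):
        raise ValueError(f"Expected models:/<name>/<version-or-stage>: {uri}")
    name, suffix = parts
    if suffix.isdigit():  # models:/<name>/<version>
        return ModelRef(name, version=int(suffix))
    return ModelRef(name, stage=suffix)  # models:/<name>/<stage>


print(parse_model_uri("models:/churn/3"))
# ModelRef(name='churn', version=3, stage=None, alias=None)
print(parse_model_uri("models:/churn@champion").alias)  # champion
```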
🚀 Model Serving

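Model serving: deploys models as HTTP inference endpoints and manages the running services; the version below runs one Flask app, on its own port, per model version.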

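# Model serving implementation (development-grade: Flask's built-in server)
import threading

import mlflow.pyfunc
import numpy as np
import pandas as pd
from flask import Flask, jsonify, request
from mlflow.exceptions import MlflowException


class ModelService:
    def __init__(self, model_registry_client):
        # Registry client assumed to provide get_model_version and
        # get_latest_model_version (the latter is a hypothetical helper)
        self.registry_client = model_registry_client
        self.model_cache = {}    # loaded models keyed by models:/ URI
        self.model_servers = {}  # running services keyed by "name:version"

    def serve_model(self, name, version=None, host="localhost", port=5000):
        """Load a model version and serve it over HTTP."""

        # 1. Resolve the model version (latest when none is given)
        if version:
            model_version = self.registry_client.get_model_version(name, version)
        else:
            model_version = self.registry_client.get_latest_model_version(name)

        if not model_version:
            raise MlflowException(f"Model {name}:{version or 'latest'} not found")

        # 2. Reuse a running service; key on the resolved version so that
        #    "latest" and the explicit version number share one entry
        service_key = f"{name}:{model_version.version}"
        if service_key in self.model_servers:
            return self.model_servers[service_key]

        # 3. Load the model
        model = self._load_model(model_version)

        # 4. Build the HTTP app
        app = self._create_model_app(model, model_version)

        # 5. Run the server in a daemon thread (each service needs its own port)
        server = threading.Thread(
            target=app.run,
            kwargs={"host": host, "port": port, "use_reloader": False},
            daemon=True
        )
        server.start()

        # 6. Keep a reference to the running service
        self.model_servers[service_key] = {
            "server": server,
            "app": app,
            "model": model,
            "version": model_version
        }

        return self.model_servers[service_key]

    def _load_model(self, model_version):
        """Load a model version through the generic pyfunc interface (cached)."""
        uri = f"models:/{model_version.name}/{model_version.version}"
        if uri not in self.model_cache:
            self.model_cache[uri] = mlflow.pyfunc.load_model(uri)
        return self.model_cache[uri]

    def _create_model_app(self, model, model_version):
        """Build a Flask app exposing /predict and /health."""
        app = Flask(__name__)

        @app.route("/predict", methods=["POST"])
        def predict():
            # Expects a JSON list of records, e.g. [{"x1": 1.0, "x2": 2.0}]
            data = pd.DataFrame(request.get_json())
            prediction = model.predict(data)
            # numpy/pandas results are not JSON-serializable as-is
            return jsonify({"prediction": np.asarray(prediction).tolist()})

        @app.route("/health", methods=["GET"])
        def health():
            return jsonify({"status": "healthy"})

        return app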

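Stripped of Flask, the serving endpoint is a request/response function: decode JSON, call `predict`, encode JSON. The standard-library WSGI sketch below makes that contract explicit and can be exercised in-process without opening a socket. `make_app`, `SumModel`, and `call` are hypothetical names, and the model is assumed to be any object whose `predict` takes a list of rows; for production, MLflow ships its own scoring server, started with `mlflow models serve`.

```python
import io
import json
from wsgiref.util import setup_testing_defaults


def make_app(model):
    """Return a WSGI app with POST /predict and GET /health."""
    def app(environ, start_response):
        path, method = environ["PATH_INFO"], environ["REQUEST_METHOD"]
        if path == "/health" and method == "GET":
            status, body = "200 OK", {"status": "healthy"}
        elif path == "/predict" and method == "POST":
            length = int(environ.get("CONTENT_LENGTH") or 0)
            try:
                rows = json.loads(environ["wsgi.input"].read(length))
                status, body = "200 OK", {"prediction": list(model.predict(rows))}
            except (ValueError, TypeError) as exc:  # bad JSON or bad rows
                status, body = "400 Bad Request", {"error": str(exc)}
        else:
            status, body = "404 Not Found", {"error": "not found"}
        payload = json.dumps(body).encode("utf-8")
        start_response(status, [("Content-Type", "application/json"),
                                ("Content-Length", str(len(payload)))])
        return [payload]
    return app


class SumModel:
    def predict(self, rows):
        return [sum(row) for row in rows]


def call(app, method, path, data=b""):
    """Invoke a WSGI app in-process and return (status, decoded JSON)."""
    environ = {"REQUEST_METHOD": method, "PATH_INFO": path,
               "CONTENT_LENGTH": str(len(data)), "wsgi.input": io.BytesIO(data)}
    setup_testing_defaults(environ)  # fills in the remaining WSGI keys
    captured = {}
    body = b"".join(app(environ, lambda s, h: captured.update(status=s)))
    return captured["status"], json.loads(body)


app = make_app(SumModel())
print(call(app, "POST", "/predict", b"[[1, 2], [3, 4]]"))
# ('200 OK', {'prediction': [3, 7]})
```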
📦 Model Packaging

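Model packaging: bundles the model, its code, and its environment information into a standard directory layout described by an MLmodel file, so the model can be deployed across platforms.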

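# Model packaging implementation
import os
import platform
import shutil
from pathlib import Path

import mlflow
import yaml


class ModelPacker:
    def __init__(self, model_dir):
        self.model_dir = Path(model_dir)
        self.packaged_dir = None

    def package_model(self, model_flavors, artifacts_path=None):
        """Assemble a packaged model directory and return its path."""

        # 1. Create the output directory
        self.packaged_dir = self.model_dir / "packaged_model"
        self.packaged_dir.mkdir(parents=True, exist_ok=True)

        # 2. Write the MLmodel metadata file
        self._create_mlmodel_file(model_flavors)

        # 3. Copy artifacts
        if artifacts_path:
            self._copy_artifacts(artifacts_path)

        # 4. Write the conda environment file
        self._create_conda_env()

        # 5. Write requirements.txt
        self._create_requirements_file()

        # 6. Write the input/output signature
        self._create_signature()

        return self.packaged_dir

    def _create_mlmodel_file(self, model_flavors):
        """Write the MLmodel file.

        loader_module and python_version belong inside the python_function
        flavor entry of model_flavors, not at the top level, e.g.
        {"python_function": {"loader_module": "mlflow.sklearn"}, "sklearn": {...}}
        """
        mlmodel_path = self.packaged_dir / "MLmodel"

        # Record the running Python version in the pyfunc flavor if missing
        pyfunc_conf = model_flavors.get("python_function")
        if pyfunc_conf is not None:
            pyfunc_conf.setdefault("python_version", platform.python_version())

        model_spec = {
            "artifact_path": ".",
            "flavors": model_flavors,
            "mlflow_version": mlflow.__version__,
            "signature": self._generate_signature(),  # must be a plain dict
        }

        with open(mlmodel_path, 'w') as f:
            yaml.safe_dump(model_spec, f, default_flow_style=False)

    def _copy_artifacts(self, artifacts_path):
        """Copy the top-level files and directories of artifacts_path."""
        if os.path.exists(artifacts_path):
            for item in os.listdir(artifacts_path):
                src = os.path.join(artifacts_path, item)
                dst = self.packaged_dir / item
                if os.path.isfile(src):
                    shutil.copy2(src, dst)
                else:
                    shutil.copytree(src, dst, dirs_exist_ok=True)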

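In MLflow's MLmodel format, `loader_module` and `python_version` are recorded inside the generic `python_function` flavor, next to the framework-specific flavor entry, rather than at the top level of the file. The sketch below builds such a spec as a plain dict and checks that layout. `build_mlmodel_spec` is hypothetical, the `pickled_model` key in the usage line is only an example of a native-flavor setting, and JSON is used for printing because PyYAML is not in the standard library (the real file is YAML).

```python
import json
import platform


def build_mlmodel_spec(loader_module, native_flavor, native_conf,
                       mlflow_version, artifact_path="model"):
    """Assemble an MLmodel-style spec (hypothetical helper)."""
    return {
        "artifact_path": artifact_path,
        "flavors": {
            # Generic flavor: tells pyfunc loaders which module loads the model
            "python_function": {
                "loader_module": loader_module,
                "python_version": platform.python_version(),
            },
            # Framework-specific ("native") flavor settings
            native_flavor: dict(native_conf),
        },
        "mlflow_version": mlflow_version,
    }


spec = build_mlmodel_spec("mlflow.sklearn", "sklearn",
                          {"pickled_model": "model.pkl"},
                          mlflow_version="2.x")  # placeholder version string
print(json.dumps(spec, indent=2))
```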
🚀 Model Deployment

Deployment strategies: local serving, containerized (Docker) deployment, and deployment to a cloud platform.

# Model deployment implementation
from mlflow.exceptions import MlflowException


class ModelDeploymentManager:
    def __init__(self, registry_client):
        self.registry_client = registry_client
        self.deployment_configs = {}  # active deployments keyed by "name:version"

    def deploy_local(self, name, version=None, host="localhost", port=5000):
        """Deploy as a local HTTP service."""

        # 1. Resolve the model version (the helper resolves None to latest)
        model_version = self._get_model_version(name, version)

        # 2. Build the deployment config
        config = {
            "type": "local",
            "host": host,
            "port": port,
            "model": model_version
        }

        # 3. Create the service
        service = self._create_local_service(model_version, config)

        # 4. Start it
        service.start()

        # 5. Record the config under the resolved version number
        self.deployment_configs[f"{name}:{model_version.version}"] = config

        return service

    def deploy_docker(self, name, version=None, image_name=None,
                      port=5000, gpu=False):
        """Deploy as a Docker container."""

        # 1. Resolve the model version
        model_version = self._get_model_version(name, version)

        # 2. Build the image (Docker image names must be lowercase)
        if not image_name:
            image_name = f"mlflow-model-{name}-{model_version.version}".lower()

        dockerfile = self._generate_dockerfile(model_version)
        self._build_docker_image(dockerfile, image_name)  # hypothetical helper

        # 3. Create the container
        container = self._create_docker_container(
            model_version, image_name, port, gpu
        )

        # 4. Start the container
        container.start()

        # 5. Record the config
        config = {
            "type": "docker",
            "image": image_name,
            "container": container,
            "model": model_version
        }
        self.deployment_configs[f"{name}:{model_version.version}"] = config

        return container

    def deploy_cloud(self, name, version=None, provider="aws",
                     region="us-west-2", instance_type="ml.m5.large"):
        """Deploy to a cloud platform (instance_type is used only for AWS)."""

        # 1. Resolve the model version
        model_version = self._get_model_version(name, version)

        # 2. Dispatch to the provider-specific deployer
        if provider == "aws":
            return self._deploy_aws(model_version, region, instance_type)
        elif provider == "gcp":
            return self._deploy_gcp(model_version, region)
        elif provider == "azure":
            return self._deploy_azure(model_version, region)
        else:
            raise MlflowException(f"Unsupported cloud provider: {provider}")

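`_generate_dockerfile` is not shown. One plausible shape is sketched below, under these assumptions: the model directory has been copied into the Docker build context, and the container simply runs MLflow's built-in scoring server (`mlflow models serve` with `--env-manager local`, so no separate per-model environment is created inside the image). The base image, the `/opt/ml/model` path, and the function names are illustrative; MLflow can also build such an image itself with `mlflow models build-docker`.

```python
import re


def generate_dockerfile(model_dir="model", port=8080,
                        base_image="python:3.11-slim", mlflow_spec="mlflow"):
    """Return Dockerfile text that serves a copied MLflow model directory."""
    return "\n".join([
        f"FROM {base_image}",
        f"RUN pip install --no-cache-dir {mlflow_spec}",
        f"COPY {model_dir} /opt/ml/model",
        # Install the model's own pinned dependencies if it ships them
        "RUN if [ -f /opt/ml/model/requirements.txt ]; then "
        "pip install --no-cache-dir -r /opt/ml/model/requirements.txt; fi",
        f"EXPOSE {port}",
        # Exec-form CMD so the server process receives signals directly
        'CMD ["mlflow", "models", "serve", "-m", "/opt/ml/model", '
        f'"-h", "0.0.0.0", "-p", "{port}", "--env-manager", "local"]',
        "",
    ])


def docker_image_name(model_name, version):
    """Docker repository names must be lowercase; replace other characters."""
    safe = re.sub(r"[^a-z0-9._-]+", "-", model_name.lower()).strip("-.")
    return f"mlflow-model-{safe}-{version}"


print(docker_image_name("Churn Model", 3))  # mlflow-model-churn-model-3
print(generate_dockerfile())
```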
🤖 Model Inference

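Inference: serves predictions efficiently, with batch inference and asynchronous processing; load balancing across multiple service instances would sit in front of this code and is not shown.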

# 模型推理服务实现
class ModelInferenceService:
    def __init__(self, model_registry_client):
        self.registry_client = model_registry_client
        self.active_models = {}  # 活跃模型缓存
        self.request_queue = Queue()
        self.worker_threads = []
        
    def load_model(self, name, version=None):
        """加载模型到内存"""
        
        model_key = f"{name}:{version}" if version else f"{name}:latest"
        
        # 检查是否已加载
        if model_key in self.active_models:
            return self.active_models[model_key]
            
        # 获取模型版本
        if version:
            model_version = self.registry_client.get_model_version(name, version)
        else:
            model_version = self.registry_client.get_latest_model_version(name)
            
        if not model_version:
            raise MLflowException(f"Model {name} not found")
            
        # 加载模型
        model = self._load_model_from_uri(model_version.source)
        
        # 缓存模型
        self.active_models[model_key] = {
            "model": model,
            "version": model_version,
            "loaded_time": time.time()
        }
        
        return self.active_models[model_key]
        
    def predict(self, name, input_data, version=None, batch=False):
        """模型推理"""
        
        # 加载模型
        model_info = self.load_model(name, version)
        model = model_info["model"]
        
        if batch:
            # 批量推理
            predictions = []
            for data in input_data:
                pred = self._run_inference(model, data)
                predictions.append(pred)
            return predictions
        else:
            # 单次推理
            return self._run_inference(model, input_data)
            
    def _run_inference(self, model, input_data):
        """执行推理"""
        try:
            # 根据模型类型处理输入
            if hasattr(model, 'predict'):
                return model.predict(input_data)
            elif hasattr(model, 'call'):
                return model.call(input_data)
            else:
                return model([input_data])
                
        except Exception as e:
            raise MLflowException(f"Inference failed: {str(e)}")
            
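    def batch_predict(self, name, input_data, version=None,
                      max_batch_size=32, timeout=60):
        """Batch prediction: split the input and run the chunks asynchronously."""

        # Split the input into chunks of at most max_batch_size items
        batches = self._split_into_batches(input_data, max_batch_size)

        # Submit every chunk first so the chunks can run concurrently
        futures = [self.submit_prediction(name, batch, version) for batch in batches]

        # Collect results in input order. future.result() raises
        # concurrent.futures.TimeoutError (requires `import concurrent.futures`),
        # which is an alias of the built-in TimeoutError only from Python 3.11 on
        results = []
        for future in futures:
            try:
                results.extend(future.result(timeout=timeout))
            except concurrent.futures.TimeoutError as e:
                raise MLflowException("Batch prediction timed out") from e

        return results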
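The batch path fans work out through futures and gathers it back in order. A self-contained sketch of that flow, assuming a ThreadPoolExecutor as the executor and a plain function as the model; split_into_batches and batch_predict here are hypothetical standalone versions of _split_into_batches and submit_prediction, not the article's exact code:

```python
import concurrent.futures
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def split_into_batches(items: Sequence[T], max_batch_size: int) -> List[List[T]]:
    """Chunk items into lists of at most max_batch_size elements."""
    if max_batch_size <= 0:
        raise ValueError("max_batch_size must be positive")
    return [list(items[i:i + max_batch_size])
            for i in range(0, len(items), max_batch_size)]


def batch_predict(predict_fn: Callable[[List[T]], List[R]], items: Sequence[T],
                  max_batch_size: int = 32, timeout: float = 60.0,
                  max_workers: int = 4) -> List[R]:
    """Submit every batch up front, then collect results in input order."""
    batches = split_into_batches(items, max_batch_size)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(predict_fn, batch) for batch in batches]
        results: List[R] = []
        for future in futures:
            # Raises concurrent.futures.TimeoutError if this batch is not done in time
            results.extend(future.result(timeout=timeout))
    return results


def double_all(batch):
    # Stand-in "model": doubles each input
    return [x * 2 for x in batch]


print(split_into_batches(list(range(5)), 2))                        # [[0, 1], [2, 3], [4]]
print(batch_predict(double_all, list(range(5)), max_batch_size=2))  # [0, 2, 4, 6, 8]
```

Submitting all batches before waiting lets them overlap; the order of the futures list, not completion order, determines the output order.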

📁 Project Management

MLflow Projects: packaging, parameter management, and reproducible execution for machine learning projects

# Core implementation of the project manager
class ProjectManager:
    def __init__(self, project_uri=None):
        self.project_uri = project_uri or "."
        self.project_spec = self._load_project_spec()

    def run(self, params=None, version=None, experiment_name=None,
            entry_point=None, timeout=None):
        """Run the project."""

        # 1. Validate parameters
        params = self._validate_params(params)

        # 2. Clone the project if it lives at a remote URI
        if self._is_remote_uri(self.project_uri):
            local_path = self._clone_project(self.project_uri, version)
        else:
            local_path = self.project_uri

        # 3. Build the command
        cmd = self._build_command(local_path, params, entry_point)

        # 4. Set up the environment
        env = self._setup_environment(params, experiment_name)

        # 5. Execute the command
        result = self._execute_command(cmd, env, local_path, timeout)

        # 6. Process the result
        return self._process_project_result(result, experiment_name)
        
    def _validate_params(self, params):
        """Validate parameters against the project spec."""
        # Copy so the caller's dict is not mutated
        params = dict(params or {})

        # Validate against the selected entry point's declared parameters
        ep_spec = self.project_spec.entry_points.get(params.get('entry_point'))
        if ep_spec:
            self._validate_entry_point_params(params, ep_spec)

        return params
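_validate_entry_point_params itself is not shown. A minimal sketch of what it might do with parameter declarations shaped like the project.yaml example below (types float, int, str and enum, with optional defaults): fill defaults, reject missing required values, and coerce command-line strings to the declared type. The function name, the type table, and the pass-through of undeclared parameters are assumptions for illustration:

```python
from typing import Any, Dict

# Converters for the scalar types used in the example project.yaml
_CONVERTERS = {"float": float, "int": int, "str": str}


def validate_entry_point_params(params: Dict[str, Any],
                                param_specs: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Fill defaults, require missing values, and coerce declared params (hypothetical)."""
    # Undeclared parameters are passed through unchanged
    resolved = {k: v for k, v in params.items() if k not in param_specs}
    for name, spec in param_specs.items():
        if name in params:
            raw = params[name]
        elif "default" in spec:
            raw = spec["default"]
        else:
            raise ValueError(f"Missing required parameter: {name}")

        ptype = spec.get("type", "str")
        if ptype == "enum":
            # Enum values are checked against the allowed list, not converted
            if raw not in spec.get("enum", []):
                raise ValueError(f"{name} must be one of {spec.get('enum')}, got {raw!r}")
            resolved[name] = raw
            continue

        convert = _CONVERTERS.get(ptype)
        if convert is None:
            raise ValueError(f"Unsupported type {ptype!r} for parameter {name}")
        try:
            resolved[name] = convert(raw)
        except (TypeError, ValueError) as e:
            raise ValueError(f"Invalid {ptype} value for {name}: {raw!r}") from e
    return resolved


main_params = {
    "learning_rate": {"type": "float", "default": 0.01},
    "batch_size": {"type": "int", "default": 32},
    "optimizer": {"type": "enum", "default": "adam", "enum": ["adam", "sgd", "rmsprop"]},
    "data_path": {"type": "str"},
}
print(validate_entry_point_params({"batch_size": "64", "data_path": "data.csv"}, main_params))
# {'learning_rate': 0.01, 'batch_size': 64, 'optimizer': 'adam', 'data_path': 'data.csv'}
```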
        
    def _build_command(self, project_path, params, entry_point):
        """Build the command line to execute."""
        # Start with the interpreter running this process
        cmd = [sys.executable]

        # Append the entry-point script, falling back to the spec's default
        script = entry_point or self.project_spec.default_entry_point
        cmd.append(os.path.join(project_path, script))

        # Append parameters as "--key value" pairs (entry_point is a selector, not an argument)
        for key, value in params.items():
            if key != 'entry_point':
                cmd.extend([f"--{key}", str(value)])

        return cmd
        
    def _execute_command(self, cmd, env, cwd, timeout):
        """Execute the command and capture its output."""
        try:
            # subprocess.run accepts timeout=None (wait indefinitely); on timeout
            # it kills the child and waits for it before re-raising
            completed = subprocess.run(
                cmd, cwd=cwd, env=env,
                capture_output=True,
                timeout=timeout
            )
        except subprocess.TimeoutExpired as e:
            raise MLflowException("Project execution timed out") from e

        return ProjectRunResult(
            exit_code=completed.returncode,
            stdout=completed.stdout.decode(),
            stderr=completed.stderr.decode(),
            cmd=cmd
        )
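When a project is launched through Popen directly, a timed-out child must be both killed and reaped. The Python documentation for Popen.communicate recommends calling communicate() a second time after kill() so the process is collected and its pipes are drained; subprocess.run(..., timeout=...) does the same internally. A runnable sketch of the Popen pattern, where communicate_with_timeout is a hypothetical helper:

```python
import subprocess
import sys


def communicate_with_timeout(cmd, timeout):
    """Popen-based run that kills and reaps the child on timeout (sketch)."""
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    try:
        out, err = proc.communicate(timeout=timeout)
        return proc.returncode, out, err, False
    except subprocess.TimeoutExpired:
        proc.kill()
        # A second communicate() reaps the killed child and drains its pipes
        out, err = proc.communicate()
        return proc.returncode, out, err, True


code, out, err, timed_out = communicate_with_timeout(
    [sys.executable, "-c", "print('hello')"], timeout=10)
print(code, out.strip(), timed_out)   # 0 hello False

code, out, err, timed_out = communicate_with_timeout(
    [sys.executable, "-c", "import time; time.sleep(5)"], timeout=0.5)
print(timed_out, code is not None)    # True True
```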

📋 Project Specification

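project.yaml specification: defines the structure and execution of an MLflow project, with support for parameterization and configuration management. In open-source MLflow the equivalent file is named MLproject (YAML syntax); project.yaml is the simplified form used in this walkthrough.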

# Example project.yaml
name: "my-ml-project"                    # project name
entry_points:                           # entry point definitions
  main:
    parameters:                         # parameter definitions
      learning_rate:
        type: float                      # parameter type
        default: 0.01                   # default value
        description: "Learning rate for training"
      batch_size:
        type: int
        default: 32
        description: "Batch size for training"
      epochs:
        type: int
        default: 100
      optimizer:
        type: enum
        default: "adam"
        enum: ["adam", "sgd", "rmsprop"]
      data_path:
        type: str
        description: "Path to training data"
    command: "python train.py --learning_rate={learning_rate} --batch_size={batch_size} --epochs={epochs} --optimizer={optimizer} --data_path={data_path}"
    
  evaluate:
    parameters:
      model_path:
        type: str
        description: "Path to model file"
      test_data_path:
        type: str
        description: "Path to test data"
    command: "python evaluate.py --model_path={model_path} --test_data_path={test_data_path}"

# How the project manager parses the spec
class ProjectSpec:
    def __init__(self, project_dir):
        self.project_dir = Path(project_dir)
        self.spec_file = self.project_dir / "project.yaml"
        self._load_spec()

    def _load_spec(self):
        """Load the project specification."""
        if not self.spec_file.exists():
            raise MLflowException(f"project.yaml not found in {self.project_dir}")

        with open(self.spec_file, 'r', encoding='utf-8') as f:
            # safe_load returns None for an empty file
            self.spec = yaml.safe_load(f) or {}

        self.name = self.spec.get('name', 'unnamed_project')
        self.entry_points = {}
        self.default_entry_point = self.spec.get('default_entry_point')

        # Parse entry points
        for ep_name, ep_spec in self.spec.get('entry_points', {}).items():
            self.entry_points[ep_name] = EntryPointSpec(ep_name, ep_spec)
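Each entry point's command uses {name} placeholders. A sketch of how a runner could render it, assuming parameters have already been validated and defaults filled: values are shell-quoted with shlex.quote, and parameters the template does not mention are appended as --key value, similar to how MLflow passes parameters not declared for an entry point. render_command is a hypothetical helper:

```python
import shlex
from typing import Any, Dict


def render_command(template: str, params: Dict[str, Any]) -> str:
    """Fill {name} placeholders and append unused params as --key value (sketch)."""
    # Quote each value so paths with spaces survive shell parsing
    quoted = {k: shlex.quote(str(v)) for k, v in params.items()}
    # str.format raises KeyError if the template needs a parameter that is missing
    command = template.format(**quoted)
    # Append parameters the template does not reference
    for key in params:
        if "{" + key + "}" not in template:
            command += f" --{key} {quoted[key]}"
    return command


print(render_command(
    "python evaluate.py --model_path={model_path} --test_data_path={test_data_path}",
    {"model_path": "models/m.pkl", "test_data_path": "data/test set.csv", "seed": 7},
))
# python evaluate.py --model_path=models/m.pkl --test_data_path='data/test set.csv' --seed 7
```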

🚀 Running a Project

# Project runner implementation
class ProjectRunner:
    def __init__(self, project_uri):
        self.project_uri = project_uri
        self.artifact_manager = ArtifactManager()
        self.model_registry = ModelRegistryManager()

    def run_with_mlflow(self, params=None, version=None,
                        experiment_name=None, entry_point=None):
        """Run the project inside an MLflow run."""

        # 1. Select the experiment
        if experiment_name:
            mlflow.set_experiment(experiment_name)

        # 2. Start a run
        with mlflow.start_run() as run:
            # run_uuid is deprecated in favor of run_id
            run_id = run.info.run_id

            # 3. Log parameters in a single batched call
            if params:
                mlflow.log_params(params)

            # 4. Execute the project
            result = self._run_project(params, version, entry_point)

            # 5. Log results
            self._log_results(result, run_id)

            # 6. Log models (if any were produced)
            self._log_models(result, run_id)

            # 7. Log artifacts
            self._log_artifacts(result, run_id)

            return result
            
    def _run_project(self, params, version, entry_point):
        """Execute the project."""

        # 1. Clone the project (resolving the requested version)
        local_path = self._clone_project(version)

        # 2. Load the spec from the local checkout, so remote URIs also work
        project_spec = ProjectSpec(local_path)

        # 3. Build the command
        cmd = self._build_command(project_spec, params, entry_point)

        # 4. Prepare the environment: inherit the current one, then overlay project settings
        env = os.environ.copy()
        env.update(self._setup_environment(params))

        process = subprocess.Popen(
            cmd, cwd=local_path, env=env,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )

        # 5. Wait for completion
        stdout, stderr = process.communicate()

        # 6. Wrap the outcome
        return ProjectRunResult(
            exit_code=process.returncode,
            stdout=stdout.decode(),
            stderr=stderr.decode(),
            cmd=cmd
        )
        
    def _log_results(self, result, run_id):
        """Log metrics parsed from the run's output."""
        # Parse final metrics from the log output and log them in one batch
        metrics = self._parse_metrics_from_logs(result.stdout)
        if metrics:
            mlflow.log_metrics(metrics)

        # Parse per-epoch losses and log them as a step series
        losses = self._parse_losses_from_logs(result.stdout)

        for epoch, loss in losses.items():
            mlflow.log_metric("loss", loss, step=epoch)
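_parse_metrics_from_logs and _parse_losses_from_logs depend on a log format the section does not define. Assuming the training script prints lines such as "METRIC accuracy=0.93" and "epoch 3 loss=0.1234" (both formats are hypothetical), the two parsers could be simple regular expressions:

```python
import re
from typing import Dict

# Assumed (hypothetical) log line formats printed by the training script
_NUMBER = r"[-+]?\d*\.?\d+(?:[eE][-+]?\d+)?"
_METRIC_RE = re.compile(rf"METRIC\s+(\w+)=({_NUMBER})")
_LOSS_RE = re.compile(rf"epoch\s+(\d+)\s+loss=({_NUMBER})")


def parse_metrics_from_logs(stdout: str) -> Dict[str, float]:
    """Collect 'METRIC key=value' lines; a later value for the same key wins."""
    metrics = {}
    for line in stdout.splitlines():
        m = _METRIC_RE.fullmatch(line.strip())
        if m:
            metrics[m.group(1)] = float(m.group(2))
    return metrics


def parse_losses_from_logs(stdout: str) -> Dict[int, float]:
    """Collect 'epoch N loss=value' lines into {epoch: loss}."""
    losses = {}
    for line in stdout.splitlines():
        m = _LOSS_RE.fullmatch(line.strip())
        if m:
            losses[int(m.group(1))] = float(m.group(2))
    return losses


log = "epoch 1 loss=0.9\nepoch 2 loss=0.5\nsome noise\nMETRIC accuracy=0.93\nMETRIC f1=8.5e-1\n"
print(parse_metrics_from_logs(log))  # {'accuracy': 0.93, 'f1': 0.85}
print(parse_losses_from_logs(log))   # {1: 0.9, 2: 0.5}
```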

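📐 Design Pattern Analysis

MLflow architectural patterns: several design patterns work together to produce a modular architecture with high cohesion and low coupling

Observer Pattern

  • Notifications of experiment run status changes
  • Notifications of model stage transitions
  • Notifications of asynchronous task completion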
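A minimal observer sketch for the first bullet: a hypothetical RunStatusNotifier records each run's status and calls every subscribed listener when a run actually transitions. The status names RUNNING and FINISHED match MLflow's run statuses; the class itself is illustrative, not MLflow code:

```python
from typing import Callable, Dict, List

Listener = Callable[[str, str, str], None]  # (run_id, old_status, new_status)


class RunStatusNotifier:
    """Hypothetical subject that notifies observers of run status changes."""

    def __init__(self) -> None:
        self._listeners: List[Listener] = []
        self._status: Dict[str, str] = {}

    def subscribe(self, listener: Listener) -> None:
        self._listeners.append(listener)

    def set_status(self, run_id: str, new_status: str) -> None:
        old_status = self._status.get(run_id, "NONE")
        if old_status == new_status:
            return  # no transition, no notification
        self._status[run_id] = new_status
        for listener in self._listeners:
            listener(run_id, old_status, new_status)


events = []
notifier = RunStatusNotifier()
notifier.subscribe(lambda rid, old, new: events.append(f"{rid}: {old} -> {new}"))
notifier.set_status("run-1", "RUNNING")
notifier.set_status("run-1", "RUNNING")   # ignored: status unchanged
notifier.set_status("run-1", "FINISHED")
print(events)  # ['run-1: NONE -> RUNNING', 'run-1: RUNNING -> FINISHED']
```

The subject knows nothing about what listeners do, so a UI refresher, an alerting hook, or a registry trigger can be added without touching the run-tracking code.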

Factory Pattern

  • Creating different model types
  • Creating different storage backends
  • Loading models in different formats
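For storage backends, a factory keyed by URI scheme lets callers pass a URI and get the matching store back. MLflow chooses its tracking store from the tracking URI's scheme in a similar spirit; the StoreFactory, FileStore and SqlStore classes below are hypothetical stand-ins:

```python
from typing import Callable, Dict
from urllib.parse import urlparse


class FileStore:
    def __init__(self, uri: str):
        self.uri = uri


class SqlStore:
    def __init__(self, uri: str):
        self.uri = uri


class StoreFactory:
    """Hypothetical factory that builds a backend store from a URI scheme."""

    def __init__(self) -> None:
        self._builders: Dict[str, Callable[[str], object]] = {}

    def register(self, scheme: str, builder: Callable[[str], object]) -> None:
        self._builders[scheme] = builder

    def create(self, uri: str) -> object:
        # Treat plain paths (no scheme) as local files
        scheme = urlparse(uri).scheme or "file"
        try:
            builder = self._builders[scheme]
        except KeyError:
            raise ValueError(f"No store registered for scheme {scheme!r}") from None
        return builder(uri)


factory = StoreFactory()
factory.register("file", FileStore)
for scheme in ("sqlite", "postgresql"):
    factory.register(scheme, SqlStore)

print(type(factory.create("./mlruns")).__name__)            # FileStore
print(type(factory.create("sqlite:///mlflow.db")).__name__)  # SqlStore
```

New backends plug in through register() without any change to the callers, which only ever see create(uri).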

Strategy Pattern

  • Different metric computation strategies
  • Different model evaluation strategies
  • Different deployment strategies

Proxy Pattern

  • MLflow Client proxy
  • Model serving proxy
  • Storage access proxy

🔄 Data Flow Architecture

MLflow data flow architecture:

┌──────────────────────────────────────────────────────────┐
│                       Client Layer                       │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐       │
│  │ Python SDK  │  │ R SDK       │  │ Java SDK    │       │
│  └─────────────┘  └─────────────┘  └─────────────┘       │
├──────────────────────────────────────────────────────────┤
│                       API Gateway                        │
│  ┌────────────────────────────────────────────────────┐  │
│  │  RESTful API                                       │  │
│  │  /api/2.0/mlflow/                                  │  │
│  │  POST /runs/create                                 │  │
│  │  POST /runs/log-metric                             │  │
│  │  POST /registered-models/create                    │  │
│  └────────────────────────────────────────────────────┘  │
├──────────────────────────────────────────────────────────┤
│                      Business Logic                      │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐       │
│  │ Experiment  │  │ Run         │  │ Model       │       │
│  │ Manager     │  │ Manager     │  │ Registry    │       │
│  └─────────────┘  └─────────────┘  └─────────────┘       │
├──────────────────────────────────────────────────────────┤
│                       Data Access                        │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐       │
│  │ Repository  │  │ DAO         │  │ Unit of     │       │
│  │ Pattern     │  │ Pattern     │  │ Work        │       │
│  └─────────────┘  └─────────────┘  └─────────────┘       │
├──────────────────────────────────────────────────────────┤
│                      Storage Layer                       │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐       │
│  │ Database    │  │ Artifact    │  │ File System │       │
│  │ (PostgreSQL │  │ Store       │  │ Storage     │       │
│  │ / SQLite)   │  │ (S3/Local)  │  │             │       │
│  └─────────────┘  └─────────────┘  └─────────────┘       │
└──────────────────────────────────────────────────────────┘
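The REST layer in the diagram accepts JSON over HTTP POST. A sketch that builds, without sending, requests for two MLflow REST endpoints (runs/create and runs/log-metric), assuming a tracking server at http://localhost:5000 and the default experiment ID "0"; build_request is a hypothetical helper:

```python
import json
import time
import urllib.request

TRACKING_URI = "http://localhost:5000"  # assumed local tracking server


def build_request(endpoint: str, payload: dict) -> urllib.request.Request:
    """Build (but do not send) a POST request to the MLflow REST API."""
    return urllib.request.Request(
        url=f"{TRACKING_URI}/api/2.0/mlflow/{endpoint}",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


now_ms = int(time.time() * 1000)  # MLflow timestamps are milliseconds since the epoch
create_run = build_request("runs/create", {"experiment_id": "0", "start_time": now_ms})
log_metric = build_request("runs/log-metric", {
    "run_id": "<run_id from the runs/create response>",
    "key": "loss", "value": 0.42, "timestamp": now_ms, "step": 1,
})
print(create_run.full_url)      # http://localhost:5000/api/2.0/mlflow/runs/create
print(log_metric.get_method())  # POST
# To actually send against a running server: urllib.request.urlopen(create_run)
```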

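✅ Best Practices

Experiment Management

  • Organize experiments by project or model type
  • Use meaningful experiment names
  • Clean up stale experiments regularly
  • Use tags for categorization

Parameter Management

  • Use meaningful parameter names
  • Record parameter ranges and search spaces
  • Never log sensitive information (credentials, personal data) as parameters
  • Use grid search to explore parameter combinations

Model Management

  • Track semantic versions (for example, as tags); registry version numbers are auto-incremented integers
  • Assign model stages deliberately (MLflow 2.9+ deprecates stages in favor of aliases)
  • Record model evaluation metrics
  • Clean up unused models regularly

Performance Optimization

  • Log metrics in batches
  • Upload artifacts asynchronously
  • Size the database connection pool appropriately
  • Add indexes to speed up queries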
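Batching amortizes per-request overhead when logging metrics. A sketch of a client-side buffer, assuming a send_batch callable where a real batch call such as MlflowClient.log_batch would go; MetricBuffer and its (key, value, timestamp, step) tuple layout are hypothetical:

```python
import time
from typing import Callable, List, Optional, Tuple

Metric = Tuple[str, float, int, int]  # (key, value, timestamp_ms, step)


class MetricBuffer:
    """Hypothetical client-side buffer that sends metrics in batches."""

    def __init__(self, send_batch: Callable[[List[Metric]], None], max_batch: int = 1000):
        self._send_batch = send_batch   # assumed: wraps one batched logging request
        self._max_batch = max_batch
        self._pending: List[Metric] = []

    def log(self, key: str, value: float, step: int = 0,
            timestamp_ms: Optional[int] = None) -> None:
        ts = timestamp_ms if timestamp_ms is not None else int(time.time() * 1000)
        self._pending.append((key, value, ts, step))
        # Send automatically once a full batch has accumulated
        if len(self._pending) >= self._max_batch:
            self.flush()

    def flush(self) -> None:
        # Send everything pending in chunks of at most max_batch metrics
        while self._pending:
            chunk = self._pending[:self._max_batch]
            self._pending = self._pending[self._max_batch:]
            self._send_batch(chunk)


sent = []
buf = MetricBuffer(send_batch=lambda batch: sent.append(len(batch)), max_batch=3)
for step in range(7):
    buf.log("loss", 1.0 / (step + 1), step=step, timestamp_ms=0)
buf.flush()
print(sent)  # [3, 3, 1]
```

Calling flush() when a run ends (or from a timer) ensures a partially filled batch is not lost.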

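📚 Further Reading

Official Documentation

Source Repository

Related Technologies

Advanced Topics

  • Distributed experiment management
  • Model explainability integration
  • Automated ML pipelines
  • Enterprise deployment solutions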

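🎯 Summary

Core Features

  • Experiment tracking: record and manage ML experiments
  • Model Registry: versioning and lifecycle management for models
  • Projects: reproducible packaging of ML projects
  • Model serving: deployment and inference

Architectural Characteristics

  • Modular design with high cohesion and low coupling
  • Multi-language support and a rich ecosystem
  • Highly extensible through custom plugins
  • Enterprise-grade stability, proven in production

Technical Strengths

  • Standardized management of ML metadata
  • Flexible choice of storage backends
  • Efficient handling of concurrent requests
  • Full coverage of the experiment lifecycle

Use Cases

  • Experiment management for individual projects
  • Collaborative experiment tracking within teams
  • Enterprise MLOps platforms
  • Reproducing research experiments

As an open-source platform for managing the machine learning lifecycle, MLflow combines standardized interfaces with a broad feature set to provide an end-to-end solution for ML experimentation, model management, and project deployment.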