源码级别解析 · 框架演进与架构设计
2026-04-29 | 每日技术深度解读
重点解析Keras如何实现框架无关性
Keras始终坚持"Deep Learning for humans"的核心理念
# 安装Keras 3
pip install --upgrade keras
# 配置后端
export KERAS_BACKEND="jax"
# 或者在代码中配置
import os
os.environ["KERAS_BACKEND"] = "jax"
import keras
print(keras.__version__)
# 安装对应后端
pip install tensorflow # TensorFlow后端
pip install jax # JAX后端
pip install torch # PyTorch后端
Keras 3支持JAX、TensorFlow、PyTorch三大后端
Keras 3旨在成为深度学习的"瑞士军刀"
| 后端 | 最低版本 | GPU支持 | 特点 |
|---|---|---|---|
| TensorFlow | 2.16.1 | ✅ | 生产环境成熟 |
| JAX | 0.4.20 | ✅ | 高性能计算 |
| PyTorch | 2.1.0 | ✅ | 研究友好 |
| OpenVINO | 2025.3.0 | ❌ | 推理专用 |
分层设计确保了可扩展性和兼容性
清晰的分层架构实现了真正的框架无关
Model API是Keras与其他框架交互的核心
class Model:
    """Base class for all Keras models (abridged signature excerpt).

    Only the public signatures are reproduced here; every method body is
    elided, so calling these methods does nothing and returns None.

    Args:
        inputs: Input tensor(s).
        outputs: Output tensor(s).
        name: String name of the model.
        trainable: Boolean, whether the model weights are trainable.

    Keras models have:
    - `model.layers`: list of layers in the model
    - `model.inputs`: list of input tensors
    - `model.outputs`: list of output tensors
    - `model.summary()`: prints a string summary
    """

    def __init__(
        self,
        inputs=None,
        outputs=None,
        name=None,
        trainable=True,
    ):
        """Signature only; the constructor body is elided in this excerpt."""

    def compile(
        self,
        optimizer="rmsprop",
        loss=None,
        metrics=None,
        loss_weights=None,
        weighted_metrics=None,
        run_eagerly=False,
        steps_per_execution=None,
        jit_compile=None,
    ):
        """Configures the model for training."""

    def fit(
        self,
        x=None,
        y=None,
        **kwargs,  # the remaining training parameters are elided here
    ):
        """Trains the model for a fixed number of epochs."""
Model类是Keras所有模型的基础类
层的统一设计实现了即插即用的模块化
class Layer:
    """Base class for all Keras layers (abridged excerpt).

    All Keras layers have:
    - `input_spec`: Input specification (list of InputSpec instances)
    - `output_spec`: Output specification (list of InputSpec instances)
    - `trainable_weights`: List of trainable variables
    - `non_trainable_weights`: List of non-trainable variables
    - `weights`: List of all variables
    - `name`: String name of the layer
    """

    def __init__(self, **kwargs):
        # Extra keyword arguments are accepted but unused in this excerpt.
        self._trainable = True
        self._activity_regularizer = None

    def build(self, input_shape):
        """Creates the layer's variables.

        This method is called the first time the layer is used.
        The base implementation does nothing.
        """

    def call(self, inputs, **kwargs):
        """Computes the output of the layer.

        Args:
            inputs: Input tensor.

        The base implementation does nothing and returns None.
        """

    def compute_output_shape(self, input_shape):
        """Computes the output shape of the layer (no-op in the base class)."""
Layer基类定义了所有层的标准接口
后端抽象是Keras 3最大的创新
class Backend:
    """Abstract description of what a Keras backend has to provide.

    Every method raises `NotImplementedError`; concrete backends override
    them. Further core methods are omitted from this excerpt.
    """

    def name(self):
        """Return the backend's name."""
        raise NotImplementedError()

    def convert_to_tensor(self, x, dtype=None):
        """Convert `x` to a backend tensor, optionally with `dtype`."""
        raise NotImplementedError()

    def compute_output_spec(self, layer, input_spec):
        """Compute the output spec of `layer` for the given `input_spec`."""
        raise NotImplementedError()

    def execute_operation(self, operation, inputs, kwargs):
        """Run `operation` on the backend with `inputs` and `kwargs`."""
        raise NotImplementedError()
class TensorFlowBackend(Backend):
    """Backend implementation that delegates to TensorFlow."""

    def name(self):
        """Return the identifier of this backend."""
        backend_id = "tensorflow"
        return backend_id

    def convert_to_tensor(self, x, dtype=None):
        """Convert `x` to a TensorFlow tensor via `tf.convert_to_tensor`."""
        # TensorFlow is imported only when a conversion is requested.
        import tensorflow as tf

        tensor = tf.convert_to_tensor(x, dtype=dtype)
        return tensor
Backend基类定义了所有后端必须实现的方法
Keras Ops是实现跨框架的关键
def dense_op(inputs, weights, bias, activation=None):
    """Dense (fully connected) operation built only from `keras.ops`.

    Args:
        inputs: Input tensor.
        weights: Kernel tensor multiplied with `inputs`.
        bias: Bias tensor added to the matrix product.
        activation: `None` (no activation), `"relu"` or `"sigmoid"`.

    Returns:
        `activation(matmul(inputs, weights) + bias)`.

    Raises:
        ValueError: If `activation` is not one of the supported names.
            Previously an unknown name silently returned the un-activated
            output.
    """
    # Backend-agnostic activations keyed by their public name.
    activations = {
        "relu": keras.ops.relu,
        "sigmoid": keras.ops.sigmoid,
    }
    if activation is not None and activation not in activations:
        raise ValueError(
            f"Unsupported activation {activation!r}; "
            f"expected None or one of {sorted(activations)}"
        )

    # Matrix multiplication and bias addition (backend-agnostic).
    outputs = keras.ops.matmul(inputs, weights) + bias
    if activation is not None:
        outputs = activations[activation](outputs)
    return outputs
# 使用示例
outputs = dense_op(inputs, kernel, bias, activation="relu")
Keras Ops提供了跨框架的统一算子
灵活的后端切换是Keras 3的重要特性
# 后端配置检查
def get_backend():
    """Return the name of the configured Keras backend.

    Lookup order:
      1. the `KERAS_BACKEND` environment variable (if non-empty),
      2. the `"backend"` entry of `~/.keras/keras.json`,
      3. the default, `"tensorflow"`.

    A missing, unreadable or malformed config file is ignored and the
    default is returned instead of raising.
    """
    import json
    import os

    default_backend = "tensorflow"

    # 1. Environment variable wins.
    env_backend = os.environ.get("KERAS_BACKEND")
    if env_backend:
        return env_backend

    # 2. User config file.
    config_path = os.path.expanduser("~/.keras/keras.json")
    try:
        with open(config_path, "r", encoding="utf-8") as f:
            config = json.load(f)
    except (OSError, ValueError):
        # No file, no permission, or invalid JSON: fall back to the default.
        return default_backend
    if isinstance(config, dict):
        return config.get("backend", default_backend)

    # 3. Default backend.
    return default_backend
# 后端验证
def validate_backend(backend_name):
    """Tell whether `backend_name` is one of the known backend identifiers."""
    known_backends = ("tensorflow", "jax", "torch", "openvino")
    is_known = backend_name in known_backends
    return is_known
Keras提供了灵活的后端配置方式
性能优化是Keras 3的重要改进
# JIT-compiled training step on the JAX backend
import os

# The backend must be selected BEFORE `keras` is imported for the first time;
# setting the variable afterwards has no effect.
os.environ["KERAS_BACKEND"] = "jax"

import jax
import jax.numpy as jnp
import keras


# Model definition
def create_model():
    """Build a 784 -> 512 -> 256 -> 10 dense classifier."""
    inputs = keras.Input(shape=(784,))
    x = keras.layers.Dense(512, activation="relu")(inputs)
    x = keras.layers.Dense(256, activation="relu")(x)
    outputs = keras.layers.Dense(10, activation="softmax")(x)
    return keras.Model(inputs, outputs)


model = create_model()
optimizer = keras.optimizers.Adam()
# Create the optimizer slots up front so they can be carried as explicit state.
optimizer.build(model.trainable_variables)
loss_fn = keras.losses.SparseCategoricalCrossentropy()


def compute_loss_and_updates(trainable_variables, non_trainable_variables,
                             x_batch, y_batch):
    """Stateless forward pass; returns the loss and updated non-trainables."""
    predictions, non_trainable_variables = model.stateless_call(
        trainable_variables, non_trainable_variables, x_batch, training=True
    )
    loss = loss_fn(y_batch, predictions)
    return loss, non_trainable_variables


# JAX differentiates pure functions; there is no gradient tape.
grad_fn = jax.value_and_grad(compute_loss_and_updates, has_aux=True)


# Enable JIT compilation. Only arrays are passed through the jitted function:
# all model/optimizer state travels in `state` and is returned updated.
@jax.jit
def train_step(state, x_batch, y_batch):
    """Run one optimization step and return `(loss, new_state)`."""
    trainable_variables, non_trainable_variables, optimizer_variables = state
    (loss, non_trainable_variables), grads = grad_fn(
        trainable_variables, non_trainable_variables, x_batch, y_batch
    )
    trainable_variables, optimizer_variables = optimizer.stateless_apply(
        optimizer_variables, grads, trainable_variables
    )
    return loss, (
        trainable_variables,
        non_trainable_variables,
        optimizer_variables,
    )


# Training loop. `epochs` and `dataset` (an iterable of NumPy/JAX
# `(x_batch, y_batch)` pairs) are expected to be defined by the caller.
state = (
    [v.value for v in model.trainable_variables],
    [v.value for v in model.non_trainable_variables],
    [v.value for v in optimizer.variables],
)
for epoch in range(epochs):
    for x_batch, y_batch in dataset:
        loss, state = train_step(state, x_batch, y_batch)

# Write the trained values back into the model so it can be used normally.
trainable_variables, non_trainable_variables, _ = state
for variable, value in zip(model.trainable_variables, trainable_variables):
    variable.assign(value)
for variable, value in zip(
    model.non_trainable_variables, non_trainable_variables
):
    variable.assign(value)
JAX JIT编译可以显著提升训练速度
XLA编译可进一步提升性能
# 启用XLA编译
def enable_xla(model=None):
    """Enable XLA-related settings for the currently configured backend.

    Args:
        model: Model to (re)compile with `jit_compile=True` when the
            TensorFlow backend is active. When omitted, a module-level
            variable named `model` is used if one exists.

    Raises:
        ValueError: On the TensorFlow backend when no model is available.

    Notes:
        * On JAX, jitted functions always go through XLA, so no extra switch
          is needed (there is no `jax_jit_compile` config option). Only the
          `XLA_FLAGS` environment variable is set here, and it has to be set
          before JAX initializes its backend to take effect.
        * Any pre-existing `XLA_FLAGS` value is overwritten.
    """
    import os

    backend = get_backend()
    if backend == "jax":
        os.environ["XLA_FLAGS"] = (
            "--xla_force_host_platform_device_count=4 "
            "--xla_gpu_cuda_data_dir=/usr/local/cuda"
        )
    elif backend == "tensorflow":
        if model is None:
            # Backward compatibility: fall back to a module-level `model`.
            model = globals().get("model")
        if model is None:
            raise ValueError(
                "enable_xla() needs a model to compile on the TensorFlow backend"
            )
        # XLA is enabled at compile time.
        model.compile(
            optimizer="adam",
            loss="categorical_crossentropy",
            metrics=["accuracy"],
            jit_compile=True,  # enable XLA compilation
        )
def create_xla_model():
    """Build a small CNN classifier and compile it with `jit_compile=True`."""
    image = keras.Input(shape=(28, 28, 1))
    stack = (
        keras.layers.Conv2D(32, 3, activation="relu"),
        keras.layers.MaxPooling2D(),
        keras.layers.Flatten(),
        keras.layers.Dense(10, activation="softmax"),
    )
    features = image
    for layer in stack:
        features = layer(features)
    xla_model = keras.Model(image, features)

    # XLA compilation is requested through the compile() flag.
    compile_options = {
        "optimizer": "adam",
        "loss": "sparse_categorical_crossentropy",
        "metrics": ["accuracy"],
        "jit_compile": True,
    }
    xla_model.compile(**compile_options)
    return xla_model
XLA编译可显著提升计算密集型操作的性能
混合精度训练是现代深度学习的标配
# 自动混合精度设置
import keras
# Enable mixed precision globally
policy = keras.mixed_precision.Policy("mixed_float16")
keras.mixed_precision.set_global_policy(policy)
# Equivalent one-liner through the config API. Keras does not read a
# KERAS_ENABLE_MIXED_PRECISION environment variable, so the policy has to be
# set from code.
keras.config.set_dtype_policy("mixed_float16")
# 模型构建(自动处理精度转换)
def create_mixed_precision_model():
    """Build a small classifier intended for use under a mixed-precision policy.

    The output layer is pinned to float32 so the softmax and the loss are
    evaluated in full precision, as the original comment intended; the other
    layers follow the global dtype policy.
    """
    inputs = keras.Input(shape=(784,), name="input")
    # Under "mixed_float16" the layer computes in float16 while its variables
    # stay in float32.
    x = keras.layers.Dense(
        512,
        activation="relu",
        kernel_initializer="glorot_normal",
        name="dense1",
    )(inputs)
    x = keras.layers.BatchNormalization(name="batch_norm1")(x)
    # Keep the output layer in float32 for numerical stability.
    outputs = keras.layers.Dense(
        10,
        activation="softmax",
        dtype="float32",
        name="output",
    )(x)
    return keras.Model(inputs, outputs)
# 训练时自动处理梯度缩放
optimizer = keras.optimizers.Adam(learning_rate=0.001)
optimizer = keras.mixed_precision.LossScaleOptimizer(optimizer)
混合精度在保持精度的同时提升性能
异步执行最大化硬件利用率
# 异步数据加载器
class AsyncDataLoader:
    """Wrap a `tf.data.Dataset` in a shuffle -> batch -> prefetch pipeline.

    Args:
        dataset: Unbatched `tf.data.Dataset` of examples.
        batch_size: Number of examples per batch.
        shuffle: Whether to shuffle examples before batching.
    """

    def __init__(self, dataset, batch_size=32, shuffle=True):
        self.dataset = dataset
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.prefetch_buffer_size = tf.data.AUTOTUNE

    def get_dataset(self):
        """Return the input pipeline with background prefetching enabled."""
        dataset = self.dataset
        if self.shuffle:
            # Shuffle individual examples, not whole batches.
            dataset = dataset.shuffle(buffer_size=10000)
        dataset = dataset.batch(self.batch_size)
        # Prefetch last so finished batches are prepared while the model
        # trains. The former `interleave(lambda x: x, ...)` step was dropped:
        # its map function must return a Dataset, not the batch tensors.
        return dataset.prefetch(buffer_size=self.prefetch_buffer_size)
# 使用示例
data_loader = AsyncDataLoader(train_dataset, batch_size=64)
train_dataset = data_loader.get_dataset()
# 训练循环(异步执行)
for batch in train_dataset:
x_batch, y_batch = batch
# 模型训练(与前一个batch的数据加载并行)
loss = model.train_on_batch(x_batch, y_batch)
异步数据加载显著提升训练效率
分布式训练支持大规模模型训练
# 多GPU数据并行
def create_distributed_model():
    """Build and compile a dense classifier under a `MirroredStrategy` scope.

    The learning rate is the base rate (0.001) multiplied by the number of
    replicas. NOTE(review): the original indentation was lost; model,
    optimizer and compile are all placed inside the strategy scope here.
    """
    mirrored = tf.distribute.MirroredStrategy()
    print("Number of devices: {}".format(mirrored.num_replicas_in_sync))

    with mirrored.scope():
        # Variables created in this scope are mirrored on every device.
        features_in = keras.Input(shape=(784,))
        hidden = keras.layers.Dense(512, activation="relu")(features_in)
        hidden = keras.layers.Dense(256, activation="relu")(hidden)
        probabilities = keras.layers.Dense(10, activation="softmax")(hidden)
        distributed_model = keras.Model(features_in, probabilities)

        scaled_lr = 0.001 * mirrored.num_replicas_in_sync
        adam = keras.optimizers.Adam(scaled_lr)

        distributed_model.compile(
            optimizer=adam,
            loss="sparse_categorical_crossentropy",
            metrics=["accuracy"],
        )
    return distributed_model
# 使用示例
strategy = tf.distribute.MirroredStrategy()
model = create_distributed_model()
# 分布式训练
model.fit(train_dataset, epochs=10, validation_data=val_dataset)
多GPU训练可以显著加速大规模模型训练
内存管理支持超大模型训练
# 梯度检查点优化
from tensorflow.python.training import py_checkpoint_ops
class GradientCheckpointing:
    """Record the output of every N-th layer passed to `apply`.

    This class only does bookkeeping: it stores selected outputs in
    `saved_tensors` and hands them back from `reconstruct`. It does not
    itself drop or recompute activations.

    Args:
        checkpoint_every: Store the output of every `checkpoint_every`-th
            layer, counted in the order `apply` is called (the first call is
            always stored). Must be >= 1.

    Raises:
        ValueError: If `checkpoint_every` is smaller than 1.
    """

    def __init__(self, checkpoint_every=1):
        if checkpoint_every < 1:
            raise ValueError("checkpoint_every must be a positive integer")
        self.checkpoint_every = checkpoint_every
        self.saved_tensors = {}
        # Number of layers seen so far. The schedule is driven by this
        # counter rather than by `id(layer)`, which is an arbitrary memory
        # address and says nothing about a layer's position.
        self._layers_seen = 0

    def apply(self, layer, inputs, outputs):
        """Register `outputs` of `layer` if it falls on the schedule.

        Returns:
            `outputs`, unchanged.
        """
        position = self._layers_seen
        self._layers_seen += 1
        if position % self.checkpoint_every == 0:
            self.saved_tensors[f"checkpoint_{id(layer)}"] = outputs
        return outputs

    def reconstruct(self, layer_id):
        """Return the stored output for `layer_id` (`id(layer)`), or None."""
        return self.saved_tensors.get(f"checkpoint_{layer_id}")
# 使用示例
checkpointing = GradientCheckpointing(checkpoint_every=3)
# 模型构建
def create_checkpoint_model():
    """Build a 20-layer dense network, passing each layer output through
    the module-level `checkpointing` helper."""
    model_input = keras.Input(shape=(784,))
    hidden = model_input
    depth = 20
    for _ in range(depth):
        dense = keras.layers.Dense(512, activation="relu")
        hidden = dense(hidden)
        # The helper returns the tensor it is given as `outputs`.
        hidden = checkpointing.apply(dense, hidden, hidden)
    probabilities = keras.layers.Dense(10, activation="softmax")(hidden)
    return keras.Model(model_input, probabilities)
梯度检查点用计算换内存,适合超大模型
完善的序列化支持模型部署
# 模型保存与加载
import keras
import tensorflow as tf
# 创建示例模型
def create_sample_model():
    """Build an uncompiled 784 -> 512 -> dropout -> 10 classifier."""
    features_in = keras.Input(shape=(784,))
    hidden = keras.layers.Dense(512, activation="relu")(features_in)
    regularized = keras.layers.Dropout(0.2)(hidden)
    probabilities = keras.layers.Dense(10, activation="softmax")(regularized)
    sample_model = keras.Model(features_in, probabilities)
    return sample_model
# Create the model
model = create_sample_model()

# 1. Native .keras archive (recommended)
model.save("my_model.keras")
# 2. TensorFlow SavedModel: Keras 3 writes it with export(); model.save()
#    only accepts paths ending in .keras or .h5.
model.export("my_model_savedmodel")
# 3. Weights only: the file name must end in ".weights.h5"
model.save_weights("my_weights.weights.h5")
# 4. Legacy HDF5 format
model.save("my_model.h5")

# Loading
# From the .keras file
loaded_model = keras.models.load_model("my_model.keras")
# A SavedModel cannot be passed to load_model() in Keras 3; it is reloaded
# as an inference-only layer instead ("serve" is export()'s default endpoint).
serving_layer = keras.layers.TFSMLayer(
    "my_model_savedmodel", call_endpoint="serve"
)
# Rebuild the architecture, then restore the weights
new_model = create_sample_model()
new_model.load_weights("my_weights.weights.h5")

# Inspect the reloaded model
print("模型结构:")
loaded_model.summary()

# Check that the reloaded model can predict
import numpy as np

test_input = np.random.random((1, 784))
prediction = loaded_model.predict(test_input)
print(f"预测结果:{prediction}")
.keras格式是Keras 3推荐的序列化格式
多种部署选项满足不同场景需求
# 模型导出到不同格式
import keras
import tensorflow as tf
from tf_keras.src import saved_model
# 创建示例模型
def create_export_model():
    """Build an uncompiled CNN classifier for 28x28 single-channel images."""
    image = keras.Input(shape=(28, 28, 1))
    tensor = image
    for layer in (
        keras.layers.Conv2D(32, 3, activation="relu"),
        keras.layers.MaxPooling2D(),
        keras.layers.Flatten(),
        keras.layers.Dense(10, activation="softmax"),
    ):
        tensor = layer(tensor)
    return keras.Model(image, tensor)
model = create_export_model()
model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])

# 1. Export as a TensorFlow SavedModel (public Keras 3 API)
model.export("export_model")

# 2. Export to ONNX
# NOTE(review): tf2onnx targets tf.keras models; confirm it supports the
# installed Keras 3 version before relying on this step.
import tf2onnx

onnx_model, _ = tf2onnx.convert.from_keras(model)
with open("export_model.onnx", "wb") as f:
    f.write(onnx_model.SerializeToString())

# 3. Convert to TensorFlow Lite (mobile) from the SavedModel written in step 1
converter = tf.lite.TFLiteConverter.from_saved_model("export_model")
lite_model = converter.convert()
with open("export_model.tflite", "wb") as f:
    f.write(lite_model)

# 4. Web deployment: export a SavedModel that the TensorFlow.js converter
#    can consume. `signatures` must be a function or a dict of functions,
#    not the string "serving_default", so export() builds it instead.
model.export("export_model_web")
# Inspect the exported signatures from a shell (not Python syntax):
#   saved_model_cli show --dir export_model_web --all
Keras支持导出到多种部署格式
与TensorFlow的无缝集成是Keras的传统优势
# TensorFlow后端配置优化
import os
import tensorflow as tf
import keras
# TensorFlow 2.16+ 后端配置
os.environ["KERAS_BACKEND"] = "tensorflow"
# 1. TF兼容性配置
def setup_tf_compatibility(run_eagerly=False):
    """Apply process-wide TensorFlow settings before building models.

    Args:
        run_eagerly: When True, force `tf.function`s to run eagerly. This is
            a debugging aid only: it disables graph compilation and is
            incompatible with `jit_compile=True`, so it is off by default.

    Notes:
        * `tf.compat.v1.disable_v2_behavior()` is intentionally not called:
          Keras 3 requires TF2 behavior (eager execution is already the
          TF 2.x default).
        * Memory growth must be configured before the GPUs are initialized;
          a `RuntimeError` raised otherwise is printed, not propagated.
    """
    if run_eagerly:
        tf.config.run_functions_eagerly(True)

    # Let GPU memory grow on demand instead of reserving it all up front.
    gpus = tf.config.list_physical_devices('GPU')
    if gpus:
        try:
            for gpu in gpus:
                tf.config.experimental.set_memory_growth(gpu, True)
        except RuntimeError as e:
            print(e)
# 2. 优化TF编译
def create_optimized_tf_model():
    """Build and compile a dense classifier with XLA on the TensorFlow backend.

    Returns:
        A compiled `keras.Model` (784 inputs, 10-way softmax output).
    """
    setup_tf_compatibility()

    inputs = keras.Input(shape=(784,))
    # First block
    x = keras.layers.Dense(512, activation="relu")(inputs)
    x = keras.layers.BatchNormalization()(x)
    x = keras.layers.Dropout(0.2)(x)
    # Second block
    x = keras.layers.Dense(256, activation="relu")(x)
    x = keras.layers.BatchNormalization()(x)
    outputs = keras.layers.Dense(10, activation="softmax")(x)
    model = keras.Model(inputs, outputs)

    # Use the optimizer from the same `keras` package as the model;
    # `tf.keras` may resolve to a different (legacy) Keras installation.
    optimizer = keras.optimizers.Adam(
        learning_rate=0.001,
        beta_1=0.9,
        beta_2=0.999,
        epsilon=1e-07,
    )
    model.compile(
        optimizer=optimizer,
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
        jit_compile=True,  # enable XLA compilation
    )
    return model
# 3. TF数据集优化
def create_tf_dataset(dataset):
    """Return `dataset` as a shuffled, batched and prefetched pipeline.

    Args:
        dataset: Unbatched `tf.data.Dataset`.

    The order matters: shuffle individual examples first, then batch, and
    prefetch last so finished batches are prepared ahead of the consumer.
    """
    return (
        dataset
        .shuffle(10000)
        .batch(32)
        .prefetch(tf.data.AUTOTUNE)
    )
TensorFlow后端提供了丰富的优化选项
JAX后端提供顶级性能
# JAX后端高级配置
import os
import jax
import jax.numpy as jnp
import jax.random as random
import keras
# 配置JAX后端
os.environ["KERAS_BACKEND"] = "jax"
# 1. JAX JIT优化
@jax.jit
def train_step_jax(model, params, batch):
    """Run one jitted training step and return `(loss, new_params)`.

    NOTE(review): `model.apply({"params": ...})`, `model.loss` and
    `model.optimizer.apply_gradients(grads, params)` look like a Flax-style
    interface rather than the `keras.Model` API - confirm which object is
    expected here. `model` is also passed through `jax.jit` as a regular
    (traced) argument - verify that it is a valid pytree.
    """
    features, targets = batch

    def objective(current_params):
        outputs = model.apply({"params": current_params}, features)
        return model.loss(targets, outputs)

    # Loss value and gradients with respect to the parameters.
    loss, grads = jax.value_and_grad(objective)(params)
    # The optimizer call is expected to return the updated parameters.
    new_params = model.optimizer.apply_gradients(grads, params)
    return loss, new_params
# 2. 自定义JAX层
class JAXCustomLayer(keras.layers.Layer):
    """Dense-like custom layer whose forward pass uses `jax.numpy`.

    Args:
        units: Output dimensionality.
        activation: Optional activation identifier (e.g. `"relu"`) resolved
            with `keras.activations.get`; `None` means linear.
        **kwargs: Standard `keras.layers.Layer` keyword arguments.

    Only usable when Keras runs on the JAX backend.
    """

    def __init__(self, units, activation=None, **kwargs):
        # `activation` must not reach Layer.__init__, which rejects unknown
        # keyword arguments.
        super().__init__(**kwargs)
        self.units = units
        self.activation = keras.activations.get(activation)

    def build(self, input_shape):
        # Weights are created through the Keras API so they are tracked,
        # saved and trained like any other layer weight.
        self.kernel = self.add_weight(
            name="kernel",
            shape=(input_shape[-1], self.units),
            initializer="glorot_uniform",
        )
        self.bias = self.add_weight(
            name="bias",
            shape=(self.units,),
            initializer="zeros",
        )

    def call(self, inputs):
        # `.value` yields the underlying JAX arrays of the Keras variables.
        outputs = jnp.dot(inputs, self.kernel.value) + self.bias.value
        return self.activation(outputs)
# 3. JAX模型
def create_jax_model():
    """Build and compile a classifier that stacks two `JAXCustomLayer`s."""
    features_in = keras.Input(shape=(784,))
    # Custom JAX layers with a dropout in between.
    hidden = JAXCustomLayer(512, activation="relu")(features_in)
    hidden = keras.layers.Dropout(0.2)(hidden)
    hidden = JAXCustomLayer(256, activation="relu")(hidden)
    probabilities = keras.layers.Dense(10, activation="softmax")(hidden)

    jax_model = keras.Model(features_in, probabilities)
    compile_options = dict(
        optimizer="adam",
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    jax_model.compile(**compile_options)
    return jax_model
JAX后端提供JAX的所有强大功能
PyTorch后端让PyTorch用户享受Keras的便利
# PyTorch后端配置
import os
import torch
import torch.nn as nn
import torch.optim as optim
import keras
# 配置PyTorch后端
os.environ["KERAS_BACKEND"] = "torch"
# 1. PyTorch数据集适配
class PyTorchDataset:
    """Expose a map-style PyTorch dataset as `(x, y)` NumPy pairs.

    Args:
        torch_dataset: Indexable object with `__len__` whose items are
            `(x, y)` pairs of tensors, arrays or plain Python values.
    """

    def __init__(self, torch_dataset):
        self.torch_dataset = torch_dataset

    def __len__(self):
        return len(self.torch_dataset)

    @staticmethod
    def _to_numpy(value):
        """Convert a tensor, array or plain value to a NumPy array."""
        import numpy as np

        if hasattr(value, "detach"):
            # torch.Tensor: drop autograd tracking and move to host memory
            # first; `.numpy()` fails otherwise.
            return value.detach().cpu().numpy()
        # Plain labels (e.g. int class ids) have no `.numpy()` method.
        return np.asarray(value)

    def __getitem__(self, idx):
        x, y = self.torch_dataset[idx]
        return self._to_numpy(x), self._to_numpy(y)
# 2. PyTorch模块包装
class PyTorchWrapper(keras.Model):
    """Keras model that runs a `torch.nn.Module` on the PyTorch backend.

    Args:
        pytorch_model: The `torch.nn.Module` to wrap.
        **kwargs: Standard `keras.Model` keyword arguments.

    With the torch backend, Keras tensors already are torch tensors, so the
    module is called directly. The previous NumPy round-trip under
    `torch.no_grad()` cut the autograd graph and made training impossible.
    """

    def __init__(self, pytorch_model, **kwargs):
        super().__init__(**kwargs)
        # TorchModuleWrapper registers the module's parameters as Keras
        # weights so that fit()/save() can see them.
        self.pytorch_model = keras.layers.TorchModuleWrapper(pytorch_model)

    def call(self, inputs, training=None):
        # `training` switches the wrapped module between train and eval mode
        # (relevant for dropout / batch norm).
        return self.pytorch_model(inputs, training=training)
# 3. 混合训练示例
def create_hybrid_training():
    """Wrap a small PyTorch MLP in a Keras model and compile it.

    Returns:
        A compiled `PyTorchWrapper` around a 784 -> 512 -> 256 -> 10 network.
    """

    class SimpleNet(nn.Module):
        """Three-layer MLP that returns raw logits."""

        def __init__(self):
            super().__init__()
            self.fc1 = nn.Linear(784, 512)
            self.fc2 = nn.Linear(512, 256)
            self.fc3 = nn.Linear(256, 10)
            self.relu = nn.ReLU()
            self.dropout = nn.Dropout(0.2)

        def forward(self, x):
            x = self.relu(self.fc1(x))
            x = self.dropout(x)
            x = self.relu(self.fc2(x))
            x = self.dropout(x)
            return self.fc3(x)

    pytorch_model = SimpleNet()
    keras_model = PyTorchWrapper(pytorch_model)

    keras_model.compile(
        optimizer="adam",
        # The network ends in a plain Linear layer, i.e. it emits logits, so
        # the loss must apply the softmax itself.
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=["accuracy"],
    )
    return keras_model
PyTorch后端实现了两大框架的无缝集成
图像分类是深度学习的经典应用
# 完整的图像分类实现
import keras
import tensorflow as tf
import numpy as np
from tensorflow import keras as tf_keras
# 1. 数据加载和预处理
def load_and_preprocess_data():
    """Load MNIST and return `(x_train, y_train), (x_test, y_test)`.

    Images are cast to float32, divided by 255 and given a trailing channel
    axis; labels are one-hot encoded over 10 classes.
    """
    train_split, test_split = keras.datasets.mnist.load_data()

    def _prepare(images, labels):
        scaled = images.astype("float32") / 255.0
        with_channel = np.expand_dims(scaled, -1)
        one_hot = keras.utils.to_categorical(labels, 10)
        return with_channel, one_hot

    return _prepare(*train_split), _prepare(*test_split)
# 2. 数据增强
def create_data_augmentation():
    """Return a `keras.Sequential` of random rotation, zoom and flip layers.

    Built from the same `keras` package as the model that consumes it, so
    the pipeline stays backend-agnostic (the previous version used
    `tf.keras`).
    """
    return keras.Sequential([
        keras.layers.RandomRotation(0.1),
        keras.layers.RandomZoom(0.1),
        keras.layers.RandomFlip("horizontal"),
    ])
# 3. CNN模型构建
def create_cnn_model():
    """Build and compile the CNN classifier for 28x28x1 images.

    Layout: augmentation, three conv blocks (32/64/128 filters, each with
    batch norm, max pooling and 0.25 dropout), a 256-unit dense block with
    0.5 dropout, and a 10-way softmax output.
    """
    image = keras.Input(shape=(28, 28, 1), name="inputs")

    # Augmentation layers sit at the front of the graph.
    tensor = create_data_augmentation()(image)

    # Convolutional feature extractor.
    for filters in (32, 64, 128):
        tensor = keras.layers.Conv2D(
            filters, 3, activation="relu", padding="same"
        )(tensor)
        tensor = keras.layers.BatchNormalization()(tensor)
        tensor = keras.layers.MaxPooling2D()(tensor)
        tensor = keras.layers.Dropout(0.25)(tensor)

    # Classification head.
    tensor = keras.layers.Flatten()(tensor)
    tensor = keras.layers.Dense(256, activation="relu")(tensor)
    tensor = keras.layers.BatchNormalization()(tensor)
    tensor = keras.layers.Dropout(0.5)(tensor)
    probabilities = keras.layers.Dense(
        10, activation="softmax", name="outputs"
    )(tensor)

    cnn = keras.Model(image, probabilities)
    cnn.compile(
        optimizer=keras.optimizers.Adam(learning_rate=0.001),
        loss="categorical_crossentropy",
        metrics=["accuracy"],
    )
    return cnn
# 4. 训练和评估
def train_and_evaluate():
    """Train the CNN on MNIST, print the test accuracy, return `(model, history)`."""
    train_split, test_split = load_and_preprocess_data()
    train_images, train_labels = train_split
    test_images, test_labels = test_split

    classifier = create_cnn_model()

    # Stop early on stagnation and lower the learning rate on plateaus.
    training_callbacks = [
        keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True),
        keras.callbacks.ReduceLROnPlateau(factor=0.1, patience=2),
    ]
    fit_history = classifier.fit(
        train_images,
        train_labels,
        batch_size=128,
        epochs=20,
        validation_split=0.2,
        callbacks=training_callbacks,
    )

    _, test_acc = classifier.evaluate(test_images, test_labels)
    print(f"测试准确率: {test_acc:.4f}")
    return classifier, fit_history
# 执行训练
if __name__ == "__main__":
model, history = train_and_evaluate()
CNN是图像分类的标准架构
NLP是Keras的另一个重要应用领域
# 文本分类模型实现
import keras
import tensorflow as tf
from tensorflow import keras as tf_keras
import numpy as np
# 1. 数据预处理
def preprocess_text_data():
    """Load the IMDb review dataset and pad every review to 200 tokens.

    Returns:
        `((x_train, y_train), (x_test, y_test), reverse_word_index)` where
        `reverse_word_index` maps the integer ids of
        `keras.datasets.imdb.get_word_index()` back to words.
    """
    max_len = 200

    # Keep only the 10000 most frequent words.
    (x_train, y_train), (x_test, y_test) = keras.datasets.imdb.load_data(
        num_words=10000
    )

    word_index = keras.datasets.imdb.get_word_index()
    reverse_word_index = {index: word for word, index in word_index.items()}

    # `keras.utils.pad_sequences` is the Keras 3 location of this helper
    # (previously reached through `tf.keras.preprocessing.sequence`).
    x_train = keras.utils.pad_sequences(x_train, maxlen=max_len)
    x_test = keras.utils.pad_sequences(x_test, maxlen=max_len)
    return (x_train, y_train), (x_test, y_test), reverse_word_index
# 2. 文本分类模型
def create_text_classification_model():
    """Build and compile the binary text classifier.

    Pipeline: embedding (10000 x 128) -> LSTM(64, sequences) -> self
    attention -> global average pooling -> dropout -> sigmoid unit.
    """
    token_ids = keras.Input(shape=(200,), name="text_input")

    embed_layer = keras.layers.Embedding(
        input_dim=10000, output_dim=128, name="embedding"
    )
    embedded = embed_layer(token_ids)

    recurrent_layer = keras.layers.LSTM(64, return_sequences=True, name="lstm")
    sequence = recurrent_layer(embedded)

    # Query and value are the same sequence (self-attention).
    attended = keras.layers.Attention(name="attention")([sequence, sequence])

    pooled = keras.layers.GlobalAveragePooling1D(name="global_pool")(attended)
    pooled = keras.layers.Dropout(0.5, name="dropout")(pooled)

    score = keras.layers.Dense(1, activation="sigmoid", name="output")(pooled)

    text_model = keras.Model(token_ids, score)
    text_model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=0.001),
        loss="binary_crossentropy",
        metrics=["accuracy"],
    )
    return text_model
# 3. 模型训练
def train_text_classification():
    """Train the text classifier on IMDb, print test accuracy, return the model."""
    train_split, test_split, _ = preprocess_text_data()
    train_tokens, train_labels = train_split
    test_tokens, test_labels = test_split

    classifier = create_text_classification_model()

    # Early stopping plus a checkpoint of the best model seen so far.
    checkpoint = keras.callbacks.ModelCheckpoint(
        "best_text_classifier.keras", save_best_only=True
    )
    training_callbacks = [keras.callbacks.EarlyStopping(patience=2), checkpoint]

    history = classifier.fit(
        train_tokens,
        train_labels,
        batch_size=32,
        epochs=10,
        validation_split=0.2,
        callbacks=training_callbacks,
    )

    _, test_acc = classifier.evaluate(test_tokens, test_labels)
    print(f"测试准确率: {test_acc:.4f}")
    return classifier
# 执行训练
if __name__ == "__main__":
model = train_text_classification()
LSTM是处理序列数据的经典模型
推荐系统是Keras的重要商业应用
# 神经协同过滤推荐系统
import keras
import tensorflow as tf
import numpy as np
import pandas as pd
class NeuralCollaborativeFiltering:
    """Neural collaborative filtering recommender.

    Args:
        num_users: Size of the user-id vocabulary.
        num_items: Size of the item-id vocabulary.
        embedding_dim: Width of the user and item embeddings.

    Attributes set here: `num_users`, `num_items`, `embedding_dim` and
    `model` (None until `build_model()` has been called).
    """

    def __init__(self, num_users, num_items, embedding_dim=50):
        self.num_users = num_users
        self.num_items = num_items
        self.embedding_dim = embedding_dim
        # Populated by build_model(); predict_rating() depends on it.
        self.model = None

    def build_model(self):
        """Build, compile, store (as `self.model`) and return the model.

        Inputs: `user_id` (1,), `item_id` (1,), `user_features` (10,) and
        `item_features` (15,). Output: a single linear rating.
        """
        # User id -> embedding vector
        user_input = keras.Input(shape=(1,), name="user_id")
        user_embedding = keras.layers.Embedding(
            self.num_users, self.embedding_dim, name="user_embedding"
        )(user_input)
        user_vec = keras.layers.Flatten(name="user_vec")(user_embedding)

        # Item id -> embedding vector
        item_input = keras.Input(shape=(1,), name="item_id")
        item_embedding = keras.layers.Embedding(
            self.num_items, self.embedding_dim, name="item_embedding"
        )(item_input)
        item_vec = keras.layers.Flatten(name="item_vec")(item_embedding)

        # Side features
        user_features = keras.Input(shape=(10,), name="user_features")
        user_dense = keras.layers.Dense(
            32, activation="relu", name="user_dense"
        )(user_features)
        item_features = keras.Input(shape=(15,), name="item_features")
        item_dense = keras.layers.Dense(
            32, activation="relu", name="item_dense"
        )(item_features)

        # Combine everything and regress the rating
        concat = keras.layers.Concatenate(name="concat")(
            [user_vec, item_vec, user_dense, item_dense]
        )
        hidden1 = keras.layers.Dense(
            128, activation="relu", name="hidden1"
        )(concat)
        hidden2 = keras.layers.Dense(
            64, activation="relu", name="hidden2"
        )(hidden1)
        rating_output = keras.layers.Dense(
            1, activation="linear", name="rating"
        )(hidden2)

        model = keras.Model(
            inputs=[user_input, item_input, user_features, item_features],
            outputs=rating_output,
        )
        model.compile(optimizer="adam", loss="mse", metrics=["mae"])

        # Keep a reference so predict_rating() can use the model.
        self.model = model
        return model

    def predict_rating(self, user_id, item_id, user_features, item_features):
        """Predict the rating of one user/item pair.

        Raises:
            RuntimeError: If `build_model()` has not been called yet.
        """
        if self.model is None:
            raise RuntimeError("call build_model() before predict_rating()")

        import numpy as np

        # Every input needs a leading batch axis of size 1.
        return self.model.predict({
            "user_id": np.asarray([[user_id]]),
            "item_id": np.asarray([[item_id]]),
            "user_features": np.asarray([user_features], dtype="float32"),
            "item_features": np.asarray([item_features], dtype="float32"),
        })
# 使用示例
def create_and_train_recommender():
    """Build the recommender for 10000 users / 5000 items, print its summary
    and return the compiled model."""
    user_count, item_count = 10000, 5000
    recommender = NeuralCollaborativeFiltering(user_count, item_count)
    ncf_model = recommender.build_model()
    ncf_model.summary()
    return ncf_model
# 执行创建
if __name__ == "__main__":
model = create_and_train_recommender()
神经协同过滤是现代推荐系统的核心技术
性能测试验证Keras 3的多后端优势
# Keras 3 性能基准测试
import time
import memory_profiler
import keras
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
class PerformanceBenchmark:
    """Benchmark a small dense model across Keras backends.

    Args:
        model_name: Free-form label for this benchmark.
        input_shape: Per-sample input shape; 1000 random samples of this
            shape are generated as training data.
    """

    def __init__(self, model_name, input_shape=(1000, 100)):
        self.model_name = model_name
        self.input_shape = input_shape
        self.test_data = np.random.random((1000,) + tuple(input_shape))

    def build_test_model(self, backend="tensorflow"):
        """Switch Keras to `backend` if needed and build the test model.

        Reloading the module with `importlib.reload` does not change the
        active backend; `keras.config.set_backend` does, after which the
        package has to be re-imported. Objects created under the previous
        backend must not be reused.
        """
        import importlib

        import keras

        if keras.config.backend() != backend:
            keras.config.set_backend(backend)
            # Pick up the freshly re-imported package.
            keras = importlib.import_module("keras")

        inputs = keras.Input(shape=self.input_shape)
        # Flatten so the (batch, 10) output matches the one-hot labels
        # regardless of the rank of `input_shape`.
        x = keras.layers.Flatten()(inputs)
        x = keras.layers.Dense(512, activation="relu")(x)
        x = keras.layers.Dense(256, activation="relu")(x)
        outputs = keras.layers.Dense(10, activation="softmax")(x)
        model = keras.Model(inputs, outputs)
        model.compile(
            optimizer="adam",
            loss="categorical_crossentropy",
            metrics=["accuracy"],
        )
        return model

    def _random_labels(self):
        """Return one-hot random labels, one row per training sample."""
        labels = np.random.randint(0, 10, size=(len(self.test_data),))
        return keras.utils.to_categorical(labels, 10)

    def measure_training_time(self, model, epochs=5):
        """Train for `epochs` epochs and return timing and final metrics."""
        y_train = self._random_labels()

        start_time = time.time()
        history = model.fit(
            self.test_data, y_train,
            epochs=epochs,
            batch_size=32,
            verbose=0,
        )
        training_time = time.time() - start_time
        return {
            "training_time": training_time,
            "avg_time_per_epoch": training_time / epochs,
            "final_loss": history.history["loss"][-1],
            "final_accuracy": history.history["accuracy"][-1],
        }

    def measure_memory_usage(self, model):
        """Return the peak memory reported by `memory_profiler` for a
        3-epoch training run (previously this always returned None)."""
        y_train = self._random_labels()

        def train_for_profiling():
            model.fit(
                self.test_data, y_train,
                epochs=3,
                batch_size=32,
                verbose=0,
            )

        return memory_profiler.memory_usage(
            (train_for_profiling, (), {}), max_usage=True
        )

    def run_benchmark(self, backends=None):
        """Benchmark each backend and return `{backend: results}`.

        Args:
            backends: Iterable of backend names; defaults to
                `("tensorflow", "jax", "torch")`.

        A backend that fails for any reason is recorded as
        `{"error": message}` so the remaining backends still run.
        """
        if backends is None:
            backends = ("tensorflow", "jax", "torch")

        results = {}
        for backend in backends:
            try:
                print(f"测试 {backend} 后端...")
                model = self.build_test_model(backend)
                time_results = self.measure_training_time(model)
                memory_results = self.measure_memory_usage(model)
                results[backend] = {
                    **time_results,
                    "memory_usage": memory_results,
                }
            except Exception as e:  # per-backend boundary: record and go on
                print(f"{backend} 后端测试失败: {e}")
                results[backend] = {"error": str(e)}
        return results
# 执行基准测试
if __name__ == "__main__":
benchmark = PerformanceBenchmark("deep_network")
print("开始Keras 3性能基准测试...")
results = benchmark.run_benchmark()
# 显示结果
print("\n=== 性能基准测试结果 ===")
for backend, data in results.items():
if "error" not in data:
print(f"\n{backend} 后端:")
print(f" 平均训练时间: {data['avg_time_per_epoch']:.2f}s")
print(f" 最终损失: {data['final_loss']:.4f}")
print(f" 最终准确率: {data['final_accuracy']:.4f}")
else:
print(f"\n{backend} 后端: {data['error']}")
性能基准测试帮助选择最适合的后端
强大的调试工具提升开发效率
# Keras 3 模型可视化与监控
import keras
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from tensorflow import keras as tf_keras
# 1. 模型可视化
def visualize_model(model):
    """Return a TensorBoard callback that logs to `logs/fit/`.

    Args:
        model: Unused; kept for interface compatibility.

    The callback comes from the same `keras` package as the models it is
    attached to (the previous version used the `tf_keras` alias).
    """
    log_dir = "logs/fit/"
    tensorboard_callback = keras.callbacks.TensorBoard(
        log_dir=log_dir,
        histogram_freq=1,
        profile_batch="10,20",
    )
    return tensorboard_callback
# 2. 自定义回调函数
class TrainingMonitor(keras.callbacks.Callback):
    """Callback that prints per-epoch metrics and plots them after training.

    Metrics missing from `logs` (for example `val_loss` when training
    without validation data) are printed as "n/a" instead of raising on the
    `:.4f` format.
    """

    @staticmethod
    def _fmt(value):
        """Format a metric value, tolerating missing metrics."""
        return "n/a" if value is None else f"{value:.4f}"

    def on_train_begin(self, logs=None):
        self.train_losses = []
        self.val_losses = []
        self.train_accuracies = []
        self.val_accuracies = []
        print("训练开始...")

    def on_epoch_end(self, epoch, logs=None):
        logs = logs or {}
        loss = logs.get("loss")
        val_loss = logs.get("val_loss")
        accuracy = logs.get("accuracy")
        val_accuracy = logs.get("val_accuracy")

        self.train_losses.append(loss)
        self.val_losses.append(val_loss)
        self.train_accuracies.append(accuracy)
        self.val_accuracies.append(val_accuracy)

        print(f"\nEpoch {epoch + 1}:")
        print(f" Train Loss: {self._fmt(loss)}, Val Loss: {self._fmt(val_loss)}")
        print(f" Train Acc: {self._fmt(accuracy)}, Val Acc: {self._fmt(val_accuracy)}")

    def on_train_end(self, logs=None):
        self.plot_training_history()

    @staticmethod
    def _plot_series(values, label):
        """Plot one metric series, skipping series that were never logged."""
        if any(value is not None for value in values):
            plt.plot(values, label=label)

    def plot_training_history(self):
        """Plot loss and accuracy curves, save them to
        `training_history.png` and show the figure."""
        plt.figure(figsize=(12, 4))

        # Loss curves
        plt.subplot(1, 2, 1)
        self._plot_series(self.train_losses, "Training Loss")
        self._plot_series(self.val_losses, "Validation Loss")
        plt.title("Model Loss")
        plt.xlabel("Epoch")
        plt.ylabel("Loss")
        plt.legend()

        # Accuracy curves
        plt.subplot(1, 2, 2)
        self._plot_series(self.train_accuracies, "Training Accuracy")
        self._plot_series(self.val_accuracies, "Validation Accuracy")
        plt.title("Model Accuracy")
        plt.xlabel("Epoch")
        plt.ylabel("Accuracy")
        plt.legend()

        plt.tight_layout()
        plt.savefig("training_history.png")
        plt.show()
# 3. 梯度检查
class GradientInspector(keras.callbacks.Callback):
    """Callback that reports the mean absolute gradient every 5 epochs.

    Only layers that have a `kernel` attribute and expose a
    `get_gradients()` method are inspected.
    NOTE(review): built-in Keras layers do not appear to provide
    `get_gradients()`; unless custom layers implement it, this callback
    prints nothing - confirm the intended source of gradients.
    """

    def on_epoch_end(self, epoch, logs=None):
        # Inspect on epochs 0, 5, 10, ...
        if epoch % 5 != 0:
            return

        gradients = []
        for layer in self.model.layers:
            # The original repeated the same `hasattr(layer, "kernel")` test
            # twice; a single check is equivalent.
            if not hasattr(layer, "kernel"):
                continue
            if not hasattr(layer, "get_gradients"):
                continue
            grad = layer.get_gradients()
            if grad is not None:
                gradients.append(np.mean(np.abs(grad)))

        if not gradients:
            return

        avg_gradient = np.mean(gradients)
        print(f"\nEpoch {epoch}: 平均梯度大小: {avg_gradient:.6f}")
        # Exploding / vanishing gradient detection
        if avg_gradient > 1.0:
            print("警告: 梯度爆炸!")
        elif avg_gradient < 1e-6:
            print("警告: 梯度消失!")
# 4. 模型训练示例
def train_with_monitoring():
    """Train a small dense classifier on random data with all monitors attached."""
    # The input shape is declared with an explicit Input object; passing
    # `input_shape=` to the first layer is deprecated in Keras 3.
    model = keras.Sequential([
        keras.Input(shape=(784,)),
        keras.layers.Dense(512, activation="relu"),
        keras.layers.BatchNormalization(),
        keras.layers.Dropout(0.2),
        keras.layers.Dense(256, activation="relu"),
        keras.layers.BatchNormalization(),
        keras.layers.Dropout(0.2),
        keras.layers.Dense(10, activation="softmax"),
    ])
    model.compile(
        optimizer="adam",
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )

    # Random data: 1000 samples, 10 classes
    x_train = np.random.random((1000, 784))
    y_train = np.random.randint(0, 10, size=(1000,))

    callbacks = [
        TrainingMonitor(),
        GradientInspector(),
        visualize_model(model),
    ]
    model.fit(
        x_train, y_train,
        epochs=20,
        batch_size=32,
        validation_split=0.2,
        callbacks=callbacks,
    )
# 执行训练
if __name__ == "__main__":
train_with_monitoring()
完善的监控工具确保训练稳定性
遵循最佳实践提升开发效率
# Keras 3 开发最佳实践模板
import keras
import tensorflow as tf
import numpy as np
class KerasProjectTemplate:
"""Keras 3项目开发模板."""
def __init__(self, project_name="my_keras_project"):
self.project_name = project_name
self.model = None
self.history = None
def setup_data_pipeline(self, dataset_path=None):
"""设置数据处理管道."""
# 数据加载
def load_data():
if dataset_path:
# 自定义数据集
# 实现具体的数据加载逻辑
pass
else:
# 使用内置数据集
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
return (x_train, y_train), (x_test, y_test)
# 数据预处理
def preprocess_data(x, y):
# 数据标准化
x = x.astype("float32") / 255.0
# 数据增强(如果需要)
# x = self.data_augmentation(x)
# 类别编码
y = keras.utils.to_categorical(y, num_classes=10)
return x, y
# 数据集创建
(x_train, y_train), (x_test, y_test) = load_data()
x_train, y_train = preprocess_data(x_train, y_train)
x_test, y_test = preprocess_data(x_test, y_test)
return (x_train, y_train), (x_test, y_test)
def data_augmentation(self, x):
"""数据增强."""
augmentation = keras.Sequential([
keras.layers.RandomRotation(0.1),
keras.layers.RandomZoom(0.1),
keras.layers.RandomFlip("horizontal")
])
return augmentation(x)
def build_model(self, input_shape, num_classes):
"""构建模型."""
inputs = keras.Input(shape=input_shape)
# 特征提取层
x = keras.layers.Conv2D(32, 3, activation="relu", padding="same")(inputs)
x = keras.layers.BatchNormalization()(x)
x = keras.layers.MaxPooling2D()(x)
x = keras.layers.Dropout(0.25)(x)
# 深层特征提取
x = keras.layers.Conv2D(64, 3, activation="relu", padding="same")(x)
x = keras.layers.BatchNormalization()(x)
x = keras.layers.MaxPooling2D()(x)
x = keras.layers.Dropout(0.25)(x)
# 分类头
x = keras.layers.Flatten()(x)
x = keras.layers.Dense(128, activation="relu")(x)
x = keras.layers.BatchNormalization()(x)
x = keras.layers.Dropout(0.5)(x)
outputs = keras.layers.Dense(num_classes, activation="softmax")(x)
model = keras.Model(inputs, outputs)
return model
def compile_model(self, model, learning_rate=0.001):
"""编译模型."""
optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
model.compile(
optimizer=optimizer,
loss="categorical_crossentropy",
metrics=["accuracy", "top_k_categorical_accuracy"]
)
return model
def create_callbacks(self):
"""创建回调函数."""
callbacks = [
# 早停
keras.callbacks.EarlyStopping(
patience=10,
restore_best_weights=True,
monitor="val_loss",
mode="min"
),
# 学习率调度
keras.callbacks.ReduceLROnPlateau(
factor=0.1,
patience=5,
min_lr=1e-7,
monitor="val_loss",
mode="min"
),
# 模型检查点
keras.callbacks.ModelCheckpoint(
filepath=f"{self.project_name}_best.keras",
save_best_only=True,
monitor="val_accuracy",
mode="max"
),
# TensorBoard日志
keras.callbacks.TensorBoard(
log_dir=f"logs/{self.project_name}",
histogram_freq=1
)
]
return callbacks
def train(self, batch_size=64, epochs=100):
    """Build, compile and fit the model on the training split.

    The model is stored on ``self.model`` and the fit result on
    ``self.history``. 20% of the training data is held out for validation.

    Args:
        batch_size: Batch size passed to ``fit``.
        epochs: Maximum number of epochs passed to ``fit``.

    Returns:
        The object returned by ``fit`` (also kept as ``self.history``).
    """
    # Only the training split is used here; the test split is unpacked to
    # keep the expected return structure explicit.
    (x_train, y_train), (_x_test, _y_test) = self.setup_data_pipeline()

    sample_shape = x_train.shape[1:]  # drop the batch axis
    class_count = y_train.shape[1]
    self.model = self.build_model(sample_shape, class_count)
    self.model = self.compile_model(self.model)

    fit_options = {
        "batch_size": batch_size,
        "epochs": epochs,
        "validation_split": 0.2,
        "callbacks": self.create_callbacks(),
        "verbose": 1,
    }
    self.history = self.model.fit(x_train, y_train, **fit_options)
    return self.history
def evaluate(self):
    """Evaluate the trained model on the test split and print the results.

    Returns:
        A ``(loss, accuracy, top_k_accuracy)`` tuple as unpacked from the
        model's ``evaluate`` call.

    Raises:
        ValueError: If ``self.model`` is ``None``.
    """
    if self.model is None:
        raise ValueError("请先训练模型")

    # The data pipeline is rebuilt here; only the test split is used.
    (_x_train, _y_train), (x_test, y_test) = self.setup_data_pipeline()

    scores = self.model.evaluate(x_test, y_test)
    test_loss, test_acc, test_top_k = scores
    print(f"测试损失: {test_loss:.4f}")
    print(f"测试准确率: {test_acc:.4f}")
    print(f"Top-K准确率: {test_top_k:.4f}")
    return test_loss, test_acc, test_top_k
# 使用模板
if __name__ == "__main__":
    # Create the project, train it, then evaluate on the test split.
    project = KerasProjectTemplate("mnist_classifier")
    history = project.train(batch_size=128, epochs=50)
    project.evaluate()
模板化开发确保代码质量和一致性
完整的生态系统支持各种深度学习任务
# KerasCV 使用示例 - 图像分类
import keras_cv
import keras
import tensorflow as tf
import numpy as np
# 1. 数据增强
# NOTE(review): this empty-tuple assignment looks like residue of a lost
# ``def create_data_augmentation():`` header -- confirm the intent.
create_data_augmentation = ()
# Augmentation pipeline: flip, rotation, zoom, brightness and contrast.
data_augmentation = keras.Sequential(
    layers=[
        keras_cv.layers.RandomFlip(mode="horizontal"),
        keras_cv.layers.RandomRotation(factor=0.1),
        keras_cv.layers.RandomZoom(height_factor=0.1),
        keras_cv.layers.RandomBrightness(value_factor=0.1),
        keras_cv.layers.RandomContrast(factor=0.1),
    ]
)
# 2. 目标检测模型
def create_object_detection_model():
    """Create a RetinaNet object-detection model on a ResNet50 backbone.

    The model uses the ``"xywh"`` bounding-box format, a 20-class
    classification head and a box-regression head, each with 9 anchors.

    Returns:
        The assembled ``keras_cv.models.RetinaNet`` instance.
    """
    # ResNet50 backbone without the classification top.
    backbone = keras_cv.models.ResNet50(
        include_rescaling=True,
        include_top=False,
        input_shape=(224, 224, 3),
    )
    # A separate FPN object used to be constructed here but was never
    # passed to the model, so it was dead work and has been removed.
    # NOTE(review): if a custom feature pyramid is wanted, pass it to the
    # RetinaNet constructor -- confirm the supported argument name.

    # Detection heads.
    classification_head = keras_cv.layers.RetinaNetClassificationHead(
        num_classes=20,  # 20 object categories
        num_anchors=9,
        name="classification_head",
    )
    box_head = keras_cv.layers.RetinaNetBoxHead(
        num_anchors=9,
        name="box_head",
    )
    # Assemble the detector.
    model = keras_cv.models.RetinaNet(
        backbone=backbone,
        classification_head=classification_head,
        box_head=box_head,
        bounding_box_format="xywh",
    )
    return model
# 3. 图像分割模型
def create_segmentation_model():
    """Create a U-Net segmentation model with 21 output classes.

    The encoder uses 64/128/256/512 filters and the decoder mirrors them in
    reverse; every stage uses 3x3 kernels. Input shape is (256, 256, 3) and
    the output activation is softmax.

    Returns:
        The model returned by ``keras_cv.models.UNet``.
    """
    # NOTE(review): confirm that ``keras_cv.models.UNet`` and these argument
    # names exist in the installed KerasCV release.
    stage_filters = [64, 128, 256, 512]
    stage_kernels = [3, 3, 3, 3]
    return keras_cv.models.UNet(
        num_classes=21,  # 21 classes, as in the VOC dataset
        input_shape=(256, 256, 3),
        encoder_filters=stage_filters,
        encoder_kernel_sizes=stage_kernels,
        decoder_filters=stage_filters[::-1],
        decoder_kernel_sizes=list(stage_kernels),
        output_activation="softmax",
    )
# 4. 训练示例
def train_segmentation():
    """Train the segmentation model on random placeholder data.

    The labels generated below are integer class ids in ``[0, 21)`` with a
    trailing singleton axis, not one-hot vectors. The model is therefore
    compiled with the sparse variant of categorical cross-entropy; the
    dense variant expects one-hot targets matching the 21-channel output.
    """
    model = create_segmentation_model()
    model.compile(
        optimizer="adam",
        # Integer labels require the sparse loss.
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    # A real segmentation dataset should be loaded here; random data is
    # used only as a stand-in.
    x_train = np.random.random((100, 256, 256, 3))
    y_train = np.random.randint(0, 21, size=(100, 256, 256, 1))
    model.fit(
        x_train,
        y_train,
        batch_size=8,
        epochs=10,
        validation_split=0.1,
    )
# 执行训练
if __name__ == "__main__":
    # Run segmentation training only when executed as a script.
    train_segmentation()
KerasCV提供了专业的计算机视觉工具
预训练模型加速AI应用开发
# KerasHub 预训练模型使用示例
import keras_hub
import keras
import tensorflow as tf
import numpy as np
# 1. BERT文本分类
def create_bert_classifier():
    """Create a binary text classifier on a pretrained BERT backbone.

    Raw strings are preprocessed, encoded by the backbone, and the pooled
    output is fed to a single sigmoid unit.

    Returns:
        A ``keras.Model`` mapping a string input to a sigmoid score.
    """
    preset = "bert_base_en_uncased"
    # Load the pretrained preprocessor and backbone from the same preset.
    preprocessor = keras_hub.models.BertPreprocessor.from_preset(preset)
    backbone = keras_hub.models.BertBackbone.from_preset(preset)

    text_input = keras.Input(shape=(), dtype="string")
    # Preprocess -> encode -> take the pooled sequence representation.
    pooled = backbone(preprocessor(text_input))["pooled_output"]

    classifier = keras.layers.Dense(
        1, activation="sigmoid", name="classifier"
    )
    return keras.Model(text_input, classifier(pooled))
# 2. ResNet图像分类
def create_resnet_classifier():
    """Create a 1000-class image classifier on a pretrained ResNet50.

    Returns:
        A ``keras_hub.models.ImageClassifier`` combining the pretrained
        backbone with its matching preprocessor.
    """
    preset = "resnet50_imagenet"
    # Pretrained backbone first, then the matching input preprocessor.
    backbone = keras_hub.models.ResNet50Backbone.from_preset(preset)
    image_preprocessor = (
        keras_hub.models.ResNetImageClassifierPreprocessor.from_preset(preset)
    )
    return keras_hub.models.ImageClassifier(
        backbone=backbone,
        preprocessor=image_preprocessor,
        num_classes=1000,
    )
# 3. GPT文本生成
def create_gpt_generator():
    """Create a GPT-2 causal language model for text generation.

    Returns:
        A ``keras_hub.models.GPT2CausalLM`` built from the pretrained
        backbone and its causal-LM preprocessor.
    """
    preset = "gpt2_base_en"
    # Pretrained backbone, then the preprocessor from the same preset.
    backbone = keras_hub.models.GPT2Backbone.from_preset(preset)
    lm_preprocessor = keras_hub.models.GPT2CausalLMPreprocessor.from_preset(
        preset
    )
    return keras_hub.models.GPT2CausalLM(
        backbone=backbone,
        preprocessor=lm_preprocessor,
    )
# 4. 模型微调示例
def fine_tune_bert():
    """Fine-tune the BERT classifier on a tiny example set and predict.

    Compiles the classifier with Adam (learning rate 2e-5) and binary
    cross-entropy, fits it for 3 epochs on four labelled sentences, then
    prints the prediction for one new sentence.
    """
    model = create_bert_classifier()
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=2e-5),
        loss="binary_crossentropy",
        metrics=["accuracy"],
    )

    # Placeholder examples as (text, label) pairs; use real data in practice.
    labelled_examples = [
        ("This is a positive review", 1),
        ("This movie is terrible", 0),
        ("I love this product", 1),
        ("Terrible experience", 0),
    ]
    x_train = [text for text, _ in labelled_examples]
    y_train = [label for _, label in labelled_examples]

    # NOTE(review): confirm that fit/predict accept plain Python lists for
    # string inputs on the active backend.
    model.fit(x_train, y_train, batch_size=2, epochs=3)

    test_text = ["This is a great movie"]
    prediction = model.predict(test_text)
    print(f"预测结果: {prediction}")
# 执行微调
if __name__ == "__main__":
    # Run the fine-tuning example only when executed as a script.
    fine_tune_bert()
预训练模型极大降低了深度学习的技术门槛
自动化超参数优化提升模型性能
# KerasTuner 超参数优化示例
import keras
import keras_tuner as kt
import tensorflow as tf
import numpy as np
class MyHyperModel(kt.HyperModel):
    """Hypermodel for a tunable dense network on 784-feature inputs."""

    def build(self, hp):
        """Build and compile a model from the hyperparameters in ``hp``.

        Tuned values: number of hidden layers (1-5); per layer the unit
        count (32-512, step 32), activation and dropout rate (0.0-0.5,
        step 0.1); the optimizer; and a log-sampled learning rate.

        Args:
            hp: The hyperparameter container supplied by the tuner.

        Returns:
            The compiled ``keras.Sequential`` model.
        """
        model = keras.Sequential()
        model.add(keras.layers.Input(shape=(784,)))

        # Hidden stack: dense -> batch norm -> dropout for each layer.
        for i in range(hp.Int("num_layers", min_value=1, max_value=5)):
            units = hp.Int(
                f"units_{i}", min_value=32, max_value=512, step=32
            )
            activation = hp.Choice(
                f"activation_{i}", ["relu", "tanh", "sigmoid"]
            )
            dropout_rate = hp.Float(
                f"dropout_{i}", min_value=0.0, max_value=0.5, step=0.1
            )
            for layer in (
                keras.layers.Dense(units, activation=activation),
                keras.layers.BatchNormalization(),
                keras.layers.Dropout(dropout_rate),
            ):
                model.add(layer)

        # Output layer.
        model.add(keras.layers.Dense(10, activation="softmax"))

        # Optimizer lookup; any name not listed falls back to SGD.
        named_optimizers = {
            "adam": keras.optimizers.Adam,
            "rmsprop": keras.optimizers.RMSprop,
        }
        optimizer_name = hp.Choice("optimizer", ["adam", "rmsprop", "sgd"])
        learning_rate = hp.Float(
            "learning_rate", min_value=1e-4, max_value=1e-2, sampling="log"
        )
        optimizer_cls = named_optimizers.get(
            optimizer_name, keras.optimizers.SGD
        )

        model.compile(
            optimizer=optimizer_cls(learning_rate=learning_rate),
            loss="categorical_crossentropy",
            metrics=["accuracy"],
        )
        return model
def create_random_search():
    """Create a random-search tuner for ``MyHyperModel``.

    Returns:
        A ``kt.RandomSearch`` tuner maximizing ``val_accuracy`` over 20
        trials with 2 executions each.
    """
    tuner_options = {
        "objective": "val_accuracy",
        "max_trials": 20,
        "executions_per_trial": 2,
        "directory": "my_tuning",
        "project_name": "random_search",
    }
    return kt.RandomSearch(MyHyperModel(), **tuner_options)
def create_bayesian_search():
    """Create a Bayesian-optimization tuner for ``MyHyperModel``.

    Returns:
        A ``kt.BayesianOptimization`` tuner maximizing ``val_accuracy``
        over 20 trials with 2 executions each.
    """
    tuner_options = {
        "objective": "val_accuracy",
        "max_trials": 20,
        "executions_per_trial": 2,
        "directory": "my_tuning",
        "project_name": "bayesian_optimization",
    }
    return kt.BayesianOptimization(MyHyperModel(), **tuner_options)
def create_grid_search():
    """Create a grid-search tuner for ``MyHyperModel``.

    Returns:
        A ``kt.GridSearch`` tuner maximizing ``val_accuracy`` over at most
        10 trials, with a ``search_space`` attribute attached.
    """
    grid_tuner = kt.GridSearch(
        MyHyperModel(),
        objective="val_accuracy",
        max_trials=10,
        executions_per_trial=1,
        directory="my_tuning",
        project_name="grid_search",
    )
    # Intended parameter grid for the search.
    # NOTE(review): this only sets an attribute on the tuner object; confirm
    # that KerasTuner actually reads ``search_space``, and that the keys
    # match the hyperparameter names registered in ``MyHyperModel.build``.
    grid_tuner.search_space = dict(
        num_layers=[1, 2, 3],
        activation=["relu", "tanh"],
        learning_rate=[0.001, 0.01],
    )
    return grid_tuner
def run_hyperparameter_tuning():
    """Run a hyperparameter search on random data and report the best model.

    Generates random training (1000 samples) and validation (200 samples)
    sets with 784 features and 10 one-hot classes, runs the Bayesian tuner
    for 10 epochs per trial, then prints the best model summary, the best
    hyperparameters and the best model's accuracy on the validation set.
    """

    def make_split(sample_count):
        # Random features plus one-hot encoded random labels.
        features = np.random.random((sample_count, 784))
        labels = np.random.randint(0, 10, size=(sample_count,))
        return features, keras.utils.to_categorical(labels, 10)

    x_train, y_train = make_split(1000)
    x_val, y_val = make_split(200)

    # Pick one search strategy; create_random_search() and
    # create_grid_search() are the alternatives.
    tuner = create_bayesian_search()

    tuner.search(
        x_train,
        y_train,
        validation_data=(x_val, y_val),
        epochs=10,
        batch_size=32,
    )

    # Best model found by the search.
    best_model = tuner.get_best_models(num_models=1)[0]
    best_model.summary()

    # Best hyperparameter values.
    best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
    print(f"最佳超参数: {best_hps.values}")

    # Score the best model on the validation data.
    test_loss, test_acc = best_model.evaluate(x_val, y_val)
    print(f"最佳模型准确率: {test_acc:.4f}")
# 执行超参数优化
if __name__ == "__main__":
    # Run the tuning example only when executed as a script.
    run_hyperparameter_tuning()
自动超参数优化是提升模型性能的关键
Keras持续进化,引领深度学习发展方向
Keras 3成为深度学习开发的理想选择
深入探讨Keras 3的各个方面
感谢阅读!
访问 https://atcfu.com/ai-articles/keras/ 回顾本文