源码深度解读
2026-03-28 | JAX Automatic Differentiation
第一部分:基础概念
第二部分:核心实现
第三部分:高级功能
第四部分:优化与实战
JAX 是 Google 开发的高性能数值计算库,基于 NumPy 和 XLA(Accelerated Linear Algebra)构建,为机器学习和科学计算提供强大的自动微分和 JIT 编译能力。
核心特性
主要优势
┌─────────────────────────────────────────────────────────┐
│ @jax/_src │
├─────────────────────────────────────────────────────────┤
│ Array │ Core │ AD │ JIT │ PMAP │ VMAP │
├─────────────────────────────────────────────────────────┤
│ dtypes │ util │ api │ xla │ mesh │ tree │
├─────────────────────────────────────────────────────────┤
│ Tracer │ Trace │ Jaxpr │ Transform │
├─────────────────────────────────────────────────────────┤
│ XLA Runtime │ MLIR │ HLO │ Hardware │
└─────────────────────────────────────────────────────────┘
JAX 采用分层架构:API 层 → Transform 层 → Core 层 → Runtime 层
自动微分(AD)是 JAX 的核心功能,通过计算函数的导数来实现梯度计算、反向传播等机器学习算法。
┌───────────────────────────────────────────────────────┐
│ 自动微分流程 │
├───────────────────────────────────────────────────────┤
│ 1. 前向计算 │
│ └→ f(x) = x² + 2x + 1 │
│ │
│ 2. 计算图构建 │
│ └→ Jaxpr 中间表示 │
│ │
│ 3. 反向传播 │
│ └→ ∂f/∂x = 2x + 2 │
│ │
│ 4. 梯度计算 │
│ └→ grad(f)(x) = 2x + 2 │
└───────────────────────────────────────────────────────┘
2018 年:JAX 初始版本发布,基于 Autograd 和 XLA
2019 年:引入 Transform 机制(jit, vmap, pmap)
2020 年:完善并行计算和内存管理
2021-2023 年:性能优化和生态系统扩展
关键突破:通过 MLIR 中间表示实现跨硬件平台的统一编译优化
| 特性 | JAX | TensorFlow |
|---|---|---|
| 编程模型 | 函数式 | 命令式 + 声明式 |
| 自动微分 | 基于追踪的函数变换(JVP + 转置) | GradientTape 记录运算后反向回放 |
| 编译优化 | XLA + MLIR | XLA + TensorFlow |
| 内存管理 | 自动垃圾回收 | 显式内存管理 |
| 生态系统 | 简洁专注 | 完整生态 |
核心原语
变换机制
// packages/jax/_src/array.py
class Array:
    """Array record holding shape, dtype, device and a buffer count, plus two tracing slots."""

    def __init__(self,
                 shape: Shape,
                 dtype: DTypeLike,
                 device: Device = None,
                 n_buffer: int = 0):
        # Descriptive attributes, stored exactly as passed in.
        self._shape, self._dtype = shape, dtype
        self._device, self._n_buffer = device, n_buffer
        # Tracing slots start empty; nothing in this class fills them.
        self._aval = self._trace = None

    @property
    def shape(self) -> Shape:
        """Shape given at construction."""
        return self._shape

    @property
    def dtype(self) -> DTypeLike:
        """Dtype given at construction."""
        return self._dtype

    @property
    def device(self) -> Device:
        """Device given at construction (``None`` by default)."""
        return self._device
# 创建 Array
def array(object: Any, *, dtype: DTypeLike = None) -> Array:
    """Create an ``Array`` from ``object`` via ``_array``, with no device and a zero buffer count."""
    creation_options = dict(dtype=dtype, device=None, n_buffer=0)
    return _array(object, **creation_options)
Array 是 JAX 的核心数据类型,在 NumPy ndarray 基础上添加了自动微分和编译支持
// packages/jax/_src/api.py
def transform(f: Callable, *transforms: "Transform") -> Callable:
    """Compose ``transforms`` around ``f`` and return the resulting callable.

    Transforms are applied right-to-left, so the first one listed ends up
    outermost: ``transform(f, a, b)`` behaves like ``a(b(f))``.

    Args:
        f: Function to wrap.
        *transforms: Callables that each take a function and return a function.

    Returns:
        A wrapper carrying ``f``'s metadata that calls the composed function.
    """
    # Compose once, up front. Rebinding ``f`` inside the wrapper would make it
    # a local name there and raise UnboundLocalError on the first call.
    composed = f
    for apply_transform in reversed(transforms):
        composed = apply_transform(composed)

    @functools.wraps(f)
    def wrapped_f(*args, **kwargs):
        return composed(*args, **kwargs)

    return wrapped_f
# Type alias for a transform: takes a callable and returns a callable.
Transform = Callable[[Callable], Callable]
def jit(f: Callable, static_argnums: Sequence[int] = ()) -> Callable:
    """Wrap ``f`` with ``jit_compile`` bound to ``static_argnums``, via ``transform``."""
    compile_step = partial(jit_compile, static_argnums=static_argnums)
    return transform(f, compile_step)
def grad(f: Callable, has_aux: bool = False) -> Callable:
    """Wrap ``f`` with ``compute_grad`` bound to ``has_aux``, via ``transform``."""
    gradient_step = partial(compute_grad, has_aux=has_aux)
    return transform(f, gradient_step)
def vmap(f: Callable, in_axes=0, out_axes=0) -> Callable:
    """Wrap ``f`` with ``vectorize_map`` bound to the given axes, via ``transform``."""
    vectorize_step = partial(vectorize_map, in_axes=in_axes, out_axes=out_axes)
    return transform(f, vectorize_step)
JIT(Just-In-Time)编译是 JAX 的核心特性,将 Python 函数编译为高效的机器代码。
JIT 编译流程:
1. 函数追踪 (Tracing)
↓
2. Jaxpr 生成 (Jaxpr Generation)
↓
3. XLA 编译 (XLA Compilation)
↓
4. 代码优化 (Code Optimization)
↓
5. 执行部署 (Execution)
// packages/jax/_src/interpreters/ad.py
def jvp(fun: lu.WrappedFun, has_aux=False, instantiate=True) -> Any:
    """Forward-mode differentiation (Jacobian-vector product) of ``fun``.

    Returns ``jvpfun`` applied to the sub-traced function. When ``has_aux`` is
    true, the ``aux`` object produced by ``jvp_subtrace_aux`` is returned
    alongside it as a pair.
    """
    if not has_aux:
        return jvpfun(jvp_subtrace(fun), instantiate)
    else:
        fun, aux = jvp_subtrace_aux(fun)
        return jvpfun(fun, instantiate), aux
@lu.transformation2
def jvpfun(f: Callable, instantiate, primals, tangents):
    """Call ``f(tag, primals, tangents)`` under a 'jvp' name stack and return ``(out_primals, out_tangents)``.

    ``instantiate`` is a bool or a per-output sequence; outputs whose flag is
    truthy are passed through ``instantiate_zeros``.
    """
    tag = core.TraceTag()
    # Tangents that are not already ``Zero``, have a ShapedArray type and a
    # float0 dtype are routed through ``p2tz``; all others pass unchanged.
    tangents = [p2tz(t) if not isinstance(t, Zero)
                and isinstance(typeof(t), core.ShapedArray)
                and dtype(t) == float0 else t for t in tangents]
    ctx = (source_info_util.transform_name_stack('jvp'))
    with ctx:
        out_primals, out_tangents = f(tag, primals, tangents)
    # A single bool applies to every output tangent.
    if type(instantiate) is bool:
        instantiate = [instantiate] * len(out_tangents)
    out_tangents = [instantiate_zeros(t) if inst else t
                    for t, inst in zip(out_tangents, instantiate)]
    return out_primals, out_tangents
┌──────────────────────────────────────────────────────────┐
│ JAX 数据结构层次 │
├──────────────────────────────────────────────────────────┤
│ jax.Array (用户层) │
│ ┌────────────────────────────────────────────────────┐ │
│ │ _aval: AbstractValue (抽象值) │ │
│ │ _trace: Trace (追踪信息) │ │
│ │ _device: Device (设备信息) │ │
│ │ _n_buffer: int (缓冲区计数) │ │
│ └────────────────────────────────────────────────────┘ │
│ │
│ AbstractValue (抽象层) │
│ ┌────────────────────────────────────────────────────┐ │
│ │ ShapedArray (形状数组) │ │
│ │ ConcreteArray (具体数组) │ │
│ │ UnshapedArray (无形状数组) │ │
│ └────────────────────────────────────────────────────┘ │
│ │
│ Trace (追踪层) │
│ ┌────────────────────────────────────────────────────┐ │
│ │ DynamicJaxprTrace (动态追踪) │ │
│ │ RewriterTrace (重写追踪) │ │
│ │ CallbackTrace (回调追踪) │ │
│ └────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────┘
// packages/jax/_src/core.py
class Tracer:
    """Base tracer: pairs an abstract value with the trace it belongs to."""

    def __init__(self, trace: 'Trace', aval: 'AbstractValue'):
        self._aval = aval
        self._trace = trace

    @property
    def trace(self) -> 'Trace':
        """Trace passed at construction."""
        return self._trace

    @property
    def aval(self) -> 'AbstractValue':
        """Abstract value passed at construction."""
        return self._aval
class ConcreteArrayTracer(Tracer):
    """Tracer that also carries a concrete value and a constant flag."""

    def __init__(self, trace: 'Trace', aval: 'AbstractValue',
                 const: bool, val: Array):
        super().__init__(trace, aval)
        self.val = val      # concrete value
        self.const = const  # whether the value is a constant

    def full_lower(self):
        """Return the concrete value for constants, otherwise the tracer itself."""
        return self.val if self.const else self
class DynamicJaxprTracer(Tracer):
    """Tracer whose owning trace is a ``DynamicJaxprTrace``."""

    def __init__(self, trace: 'DynamicJaxprTrace', aval: 'AbstractValue'):
        super().__init__(trace, aval)
        # Re-declared with the narrower trace type after the base initialiser runs.
        self._aval: 'AbstractValue' = aval
        self._trace: 'DynamicJaxprTrace' = trace
Tracer 是自动微分系统的核心,它包装原始值并记录计算过程
// packages/jax/_src/core.py
class AbstractValue:
    """Base class for abstract values; it cannot be instantiated directly."""

    def __init__(self):
        message = "AbstractValue is not instantiable"
        raise TypeError(message)

    def at_least_space(self) -> 'AbstractValue':
        """Hook for subclasses; the base implementation raises NotImplementedError."""
        raise NotImplementedError

    def update(self, **kwargs) -> 'AbstractValue':
        """Hook for subclasses to build an updated copy; raises NotImplementedError here."""
        raise NotImplementedError
class ShapedArray(AbstractValue):
    """Abstract value described by a shape and a dtype."""

    def __init__(self, shape: Shape, dtype: DTypeLike):
        # Deliberately does not call the base initialiser, which always raises.
        self._shape, self._dtype = shape, dtype

    @property
    def shape(self) -> Shape:
        """Shape given at construction."""
        return self._shape

    @property
    def dtype(self) -> DTypeLike:
        """Dtype given at construction."""
        return self._dtype

    @property
    def ndim(self) -> int:
        """Number of entries in the shape."""
        return len(self._shape)

    @property
    def size(self) -> int:
        """Product of all shape entries (1 for an empty shape)."""
        total = 1
        for extent in self._shape:
            total = operator.mul(total, extent)
        return total

    def at_least_space(self):
        """Return a same-shape ``ShapedArray`` whose dtype is promoted with ``float_``."""
        promoted = promote_types(self._dtype, float_)
        return ShapedArray(self._shape, promoted)
// packages/jax/_src/core.py
class Primitive:
    """Named operation with a table of registered rules."""

    def __init__(self, name: str, multiple_results: bool = False):
        self.name, self.multiple_results = name, multiple_results
        self.bind: Bindings = {}  # rule table, keyed by rule kind

    def def_impl(self, impl: Callable) -> None:
        """Register ``impl`` under the ``'impl'`` key."""
        self.bind.update({'impl': impl})

    def def_abstract_eval(self, abstract_eval: Callable) -> None:
        """Register ``abstract_eval`` under the ``'abstract_eval'`` key."""
        self.bind.update({'abstract_eval': abstract_eval})

    def def_custom_jvp(self, jvp: Callable) -> None:
        """Register ``jvp`` under the ``'custom_jvp'`` key."""
        self.bind.update({'custom_jvp': jvp})
# Commonly used primitives
def_p = Primitive('def') # definition
call_p = Primitive('call') # call
add_p = Primitive('add') # addition
mul_p = Primitive('mul') # multiplication
设计理念:Primitive 将计算操作与具体实现分离,支持不同的变换和优化策略
// packages/jax/_src/core.py
class Jaxpr:
    """Container for a traced computation: variables, equations and constant values."""

    def __init__(self,
                 constvars: List[Var],
                 invars: List[Var],
                 outvars: List[Var],
                 eqns: List[Eqn],
                 consts: List[Any]):
        # Variables, grouped by role.
        self.constvars, self.invars, self.outvars = constvars, invars, outvars
        # Equations and the values of the constants.
        self.eqns, self.consts = eqns, consts
class Eqn:
    """Single equation: result variables, right-hand expression and the primitive used."""

    def __init__(self,
                 lhs: List[Var],
                 rhs: Expression,
                 primitive: Primitive):
        self.lhs, self.rhs, self.primitive = lhs, rhs, primitive
# Jaxpr 示例:f(x) = x² + 2x + 1
const [2.0, 1.0] # 常量
x = input() # 输入变量
x2 = mul x x # x²
two_x = mul x [2.0] # 2x
x2_plus_2x = add x2 two_x # x² + 2x
result = add x2_plus_2x [1.0] # x² + 2x + 1
output result
// packages/jax/_src/ad_util.py
def add_jaxvals(x: Array, y: Array) -> Array:
    """Add two values, returning the other operand unchanged when one is a ``Zero``."""
    x_is_zero = isinstance(x, Zero)
    if x_is_zero:
        return y
    return x if isinstance(y, Zero) else add_p.bind(x, y)
def p2tz(t: Union[Array, Zero]) -> Union[Array, TangentArray]:
    """Wrap a ``Zero`` in a ``TangentArray``; return any other value unchanged."""
    return TangentArray(t.zeros_like(), t) if isinstance(t, Zero) else t
def p2cz(t: Union[Array, Zero]) -> Union[Array, CotangentArray]:
    """Wrap a ``Zero`` in a ``CotangentArray``; return any other value unchanged."""
    return CotangentArray(t.zeros_like(), t) if isinstance(t, Zero) else t
核心思想:通过零值(Zero)表示不参与导数传播的值,避免不必要的计算
// 线性求导(前向模式)实现
def linearize_subtrace_2(f: Callable, is_vjp: bool,
                         tag: core.TraceTag, nzs_in: Sequence[bool],
                         debug_info: core.DebugInfo, primals):
    """Run ``f`` under a ``LinearizeTrace`` and return ``(out_primals, out_tangents)``.

    Only tangents that are not ``Zero`` are kept in the returned tuple.
    NOTE(review): the original indentation was lost; the extent of the outer
    ``with`` block below is an assumption - confirm against the real source.
    """
    source_info = source_info_util.current()
    # Set up the traces
    with core.take_current_trace() as parent_trace:
        tangent_trace = pe.DynamicJaxprTrace(debug_info, auto_dce=True)
        tangent_trace.tag = tag
        linearize_trace = LinearizeTrace(parent_trace, tangent_trace, is_vjp)
        # Inputs flagged in nzs_in get a LinearizeTracer with a fresh tangent
        # argument; the rest are passed through as plain primals.
        tracers = [LinearizeTracer(linearize_trace, p,
                                   tangent_trace.new_arg(
                                       typeof(p).to_tangent_aval(),
                                       source_info))
                   if nz else p
                   for p, nz in zip(primals, nzs_in)]
        # Run the function
        with core.set_current_trace(linearize_trace, check_leaks=True):
            ans = f(*tracers)
            out_primals, out_tangents = ans.map(
                linearize_trace.to_primal_tangent_pair).unzip2()
            # Drop references before leaving the block (check_leaks=True is set above).
            del linearize_trace, ans, tracers
    # Flag the non-Zero output tangents
    nzs_out = tuple(type(t) is not Zero for t in out_tangents)
    out_tangents = tuple(t for t, nz in zip(out_tangents, nzs_out) if nz)
    return out_primals, out_tangents
// 反向传播(反向模式)实现
def linearize_subtrace(f: Callable, is_vjp: bool,
                       tag: core.TraceTag, nzs_in: Sequence[bool],
                       debug_info: core.DebugInfo, primals):
    """Run a forward then a backward primitive and return ``(primals_out, non-Zero gradients)``.

    NOTE(review): the original indentation was lost; the extent of the
    ``with`` block is an assumption. ``f``, ``is_vjp``, ``nzs_in`` and
    ``source_info`` are not used by the body.
    """
    source_info = source_info_util.current()
    # Create the forward trace
    with core.take_current_trace() as parent_trace:
        forward_trace = pe.DynamicJaxprTrace(debug_info, auto_dce=True)
        forward_trace.tag = tag
    # Forward pass
    primals_out, tracers_out = forward_primitive.bind(
        primals, forward_trace.new_args(primals))
    # Create the backward trace
    backward_trace = RewriterTrace(forward_trace, parent_trace)
    backward_trace.tag = tag
    # Backward pass
    # NOTE(review): the pass is seeded with zeros_like(primals_out); a zero
    # cotangent seed would normally yield zero gradients - confirm intent.
    cotangents_in = zeros_like(primals_out)
    _, ct_out = backward_primitive.bind(
        cotangents_in, *tracers_out)
    # Keep one cotangent per primal input and drop the Zero ones
    grads = ct_out[:len(primals)]
    nzs_out = tuple(type(t) is not Zero for t in grads)
    return primals_out, tuple(t for t, nz in zip(grads, nzs_out) if nz)
// packages/jax/_src/api.py
def grad(fun: Callable,
         argnums: Union[int, Sequence[int]] = 0,
         has_aux: bool = False,
         holomorphic: bool = False,
         reduce_fn: Optional[Callable] = None) -> Callable:
    """Return a function that evaluates the gradient of ``fun``.

    Args:
        fun: Function to differentiate.
        argnums: Position, or positions, of the arguments to differentiate.
        has_aux: Whether ``fun`` returns an ``(output, aux)`` pair.
        holomorphic: Forwarded to ``value_and_grad``.
        reduce_fn: Forwarded to ``value_and_grad``.

    Returns:
        A function returning the gradient, or ``(gradient, aux)`` when
        ``has_aux`` is true.
    """
    def gradfun(*args, **kwargs):
        # Forward ``argnums`` untouched: value_and_grad only unwraps a single
        # gradient when it sees an int, so converting to a tuple here would
        # hand callers a 1-tuple instead of the gradient.
        result = value_and_grad(fun, argnums=argnums,
                                has_aux=has_aux, holomorphic=holomorphic,
                                reduce_fn=reduce_fn)(*args, **kwargs)
        if has_aux:
            (_, aux), g = result
            return g, aux
        # value_and_grad returns (value, gradient); return the gradient only.
        _, g = result
        return g
    return gradfun
def value_and_grad(fun: Callable,
                   argnums: Union[int, Sequence[int]] = 0,
                   has_aux: bool = False,
                   holomorphic: bool = False,
                   reduce_fn: Optional[Callable] = None) -> Callable:
    """Return a function that evaluates both ``fun`` and its gradient.

    Args:
        fun: Function to differentiate.
        argnums: Position, or positions, of the arguments to differentiate.
        has_aux: Whether ``fun`` returns an ``(output, aux)`` pair.
        holomorphic: Accepted for interface compatibility; not used here.
        reduce_fn: Optional function that maps the output to the cotangent
            seed; ``ones_like(output)`` is used when omitted.

    Returns:
        A function returning ``(value, gradient)``, or
        ``((value, aux), gradient)`` when ``has_aux`` is true.

    Assumes ``vjp`` follows the ``jax.vjp`` contract: it returns
    ``(out, vjp_fun)``, or ``(out, vjp_fun, aux)`` when ``has_aux`` is true.
    """
    def gradfun(*args, **kwargs):
        argnums_ = (argnums,) if isinstance(argnums, int) else tuple(argnums)

        def f_wrapper(*dyn_args):
            # vjp only supplies the differentiated arguments; splice them back
            # into place so ``fun`` still receives every positional argument.
            full_args = list(args)
            for position, value in zip(argnums_, dyn_args):
                full_args[position] = value
            return fun(*full_args, **kwargs)

        dyn_inputs = [args[i] for i in argnums_]
        if has_aux:
            y, vjp_fun, aux = vjp(f_wrapper, *dyn_inputs, has_aux=has_aux)
        else:
            y, vjp_fun = vjp(f_wrapper, *dyn_inputs, has_aux=has_aux)
        # Seed the backward pass
        seed = reduce_fn(y) if reduce_fn is not None else ones_like(y)
        g = vjp_fun(seed)
        if isinstance(argnums, int):
            g = g[0]
        if has_aux:
            return (y, aux), g
        return y, g
    return gradfun
// 自定义梯度实现
def custom_jvp(funs: Union[Callable, Tuple[Callable, Callable]]):
    """Attach an optional custom JVP rule to a function.

    Args:
        funs: Either the function itself (decorator use) or a
            ``(function, jvp_rule)`` pair.

    Returns:
        A callable that evaluates the function unchanged and carries two
        extra attributes:

        * ``jvp_rule`` - the registered rule, or ``None``.
        * ``def_custom_jvp(rule)`` - register or replace the rule; accepts a
          bare rule or a ``(function, rule)`` pair and returns the rule.
    """
    import functools

    if isinstance(funs, tuple):
        fun, fun_jvp = funs
    else:
        fun, fun_jvp = funs, None

    @functools.wraps(fun)
    def wrapped(*args, **kwargs):
        return fun(*args, **kwargs)

    def def_custom_jvp(rule):
        # Also accept the (function, rule) pair form; only the rule is stored.
        if isinstance(rule, tuple):
            _, rule = rule
        wrapped.jvp_rule = rule
        return rule

    # Plain functions have no ``def_custom_jvp`` attribute, so the registration
    # hook lives on the wrapper returned to the caller.
    wrapped.jvp_rule = fun_jvp
    wrapped.def_custom_jvp = def_custom_jvp
    return wrapped
# 使用示例
@custom_jvp
def sigmoid(x):
    """Logistic sigmoid, ``1 / (1 + exp(-x))``."""
    return 1 / (1 + jnp.exp(-x))


def sigmoid_jvp(primals, tangents):
    """JVP rule for ``sigmoid``: return ``(primal_out, tangent_out)``.

    Follows the ``jax.custom_jvp`` rule convention of returning the primal
    output together with the output tangent.
    """
    x = primals[0]
    y = sigmoid(x)
    # d/dx sigmoid(x) = sigmoid(x) * (1 - sigmoid(x)); reuse y rather than
    # evaluating the function twice.
    return y, y * (1 - y) * tangents[0]


sigmoid.def_custom_jvp((sigmoid, sigmoid_jvp))
// Vector-Jacobian 乘积实现
def vjp(f: Callable, *primals, has_aux: bool = False):
    """Trace ``f`` at ``primals`` and return a pullback function.

    Returns ``vjpfun``, or ``(vjpfun, aux)`` when ``has_aux`` is true.
    NOTE(review): unlike ``jax.vjp``, the primal output is not returned -
    confirm that callers expect this.
    """
    def vjpfun(*cotangents):
        """Run the backward pass for ``cotangents``."""
        # out_aval, jaxpr and consts are closure variables bound further down,
        # before this function is handed back to the caller.
        if len(cotangents) != len(out_aval):
            raise TypeError(f"Expected {len(out_aval)} cotangents, got {len(cotangents)}")
        # Backward pass
        in_cts = backward_pass(cotangents, jaxpr, consts, *out_aval)
        if has_aux:
            # NOTE(review): assumes backward_pass returns (cotangents, aux) here - confirm.
            in_cts, aux = in_cts
            return (in_cts, aux)
        else:
            return in_cts
    # Forward trace to obtain the jaxpr
    out_aval, jaxpr, consts, out_avals = pe.trace_to_jaxpr_dynamic(f, primals)
    if has_aux:
        out_aval, aux = out_aval
    # Normalise a single output into a one-element list so len() works above.
    if not isinstance(out_aval, (list, tuple)):
        out_aval = [out_aval]
    if has_aux:
        return vjpfun, aux
    else:
        return vjpfun
# 使用示例
def f(x):
    """Return ``sin(x) * cos(x)``."""
    return jnp.sin(x) * jnp.cos(x)


# jax.vjp returns (primal output, pullback). The pullback maps an output
# cotangent to a tuple holding one input cotangent per primal argument.
primal_out, vjp_fn = jax.vjp(f, 1.0)
(gradient_times_vector,) = vjp_fn(0.5)  # f'(1.0) * 0.5
// JIT 编译核心实现
def jit_compile(f: Callable, static_argnums: Sequence[int] = ()) -> Callable:
    """Wrap ``f`` so it is traced and compiled once per distinct set of static arguments.

    Args:
        f: Function to compile.
        static_argnums: Positions of arguments that are part of the cache key
            instead of being traced. Their values must be hashable.

    Returns:
        A wrapper with a ``cache`` dict mapping ``(f.__name__, static_args)``
        to ``(compiled_fn, compiled_consts, out_tree)``.
    """
    @functools.wraps(f)
    def compiled_f(*args, **kwargs):
        # Split static from dynamic arguments
        static_positions = set(static_argnums)
        # A tuple, not a list: the cache key must be hashable.
        static_args = tuple(args[i] for i in static_argnums)
        dynamic_args = [arg for i, arg in enumerate(args)
                        if i not in static_positions]
        cache_key = (f.__name__, static_args)

        entry = compiled_f.cache.get(cache_key)
        if entry is None:
            # Cache miss: trace, then compile
            with core.new_main(core.TraceLevel.STRICT) as main:
                trace = core.DynamicJaxprTrace(main, True)
                with core.set_current_trace(trace):
                    args_flat, in_tree = tree_util.flatten(dynamic_args)
                    in_avals = [trace_to_aval(x) for x in args_flat]
                    jaxpr, (out_avals, consts, out_tree) = trace_to_subjaxpr(
                        f, in_tree, *args_flat)
            compiled_fn, compiled_consts = xla_compile(
                jaxpr, consts, in_avals, out_avals)
            # out_tree is stored too, so a cache hit can rebuild the output.
            entry = (compiled_fn, compiled_consts, out_tree)
            compiled_f.cache[cache_key] = entry

        # Both paths run the compiled function; calling compiled_f again on a
        # cache hit would recurse forever.
        compiled_fn, _, out_tree = entry
        out = compiled_fn(*args, **kwargs)
        return tree_util.unflatten(out_tree, out)

    compiled_f.cache = {}
    return compiled_f
XLA(Accelerated Linear Algebra)是 Google 的线性代数编译器,将 JAX 计算图优化为高效的机器代码。
XLA 优化流程:
1. 计算 Lowering
↓
2. MLIR 转换
↓
3. HLO 优化
↓
├── 算子融合
├── 内存优化
└── 并行化
↓
4. 代码生成
↓
5. 执行部署
// packages/jax/_src/pjit.py
def jvp(fun: lu.WrappedFun, has_aux=False, instantiate=True,
        transform_stack=True) -> Any:
    """Forward-mode differentiation (JVP) of ``fun``.

    Same as the three-argument variant, but ``transform_stack`` is forwarded
    to ``jvpfun``. Returns the transformed function, paired with ``aux`` when
    ``has_aux`` is true.
    """
    if not has_aux:
        return jvpfun(jvp_subtrace(fun), instantiate, transform_stack)
    else:
        fun, aux = jvp_subtrace_aux(fun)
        return jvpfun(fun, instantiate, transform_stack), aux
@lu.transformation2
def jvpfun(f: Callable, instantiate, transform_stack, primals, tangents):
    """Call ``f(tag, primals, tangents)`` and return ``(out_primals, out_tangents)``.

    ``instantiate`` is a bool or a per-output sequence; outputs whose flag is
    truthy are passed through ``instantiate_zeros``.
    """
    tag = core.TraceTag()
    # Tangents that are not already ``Zero``, have a ShapedArray type and a
    # float0 dtype are routed through ``p2tz``; all others pass unchanged.
    tangents = [p2tz(t) if not isinstance(t, Zero)
                and isinstance(typeof(t), core.ShapedArray)
                and dtype(t) == float0 else t for t in tangents]
    # Push a 'jvp' name-stack entry only when transform_stack is truthy.
    ctx = (source_info_util.transform_name_stack('jvp') if transform_stack
           else contextlib.nullcontext())
    with ctx:
        out_primals, out_tangents = f(tag, primals, tangents)
    # A single bool applies to every output tangent.
    if type(instantiate) is bool:
        instantiate = [instantiate] * len(out_tangents)
    out_tangents = [instantiate_zeros(t) if inst else t
                    for t, inst in zip(out_tangents, instantiate)]
    return out_primals, out_tangents
// packages/jax/_src/pjit.py
class PjitInfo(NamedTuple):
    """Immutable record of pjit settings.

    Shardings and layouts are each stored as a ``*_treedef`` / ``*_leaves``
    pair. NOTE(review): presumably the flattened form of the user-supplied
    pytrees - confirm against the code that builds this tuple.
    """
    fun_sourceinfo: str
    fun_signature: inspect.Signature | None
    user_specified_in_shardings: bool
    in_shardings_treedef: PyTreeDef
    in_shardings_leaves: tuple[Any, ...]
    out_shardings_treedef: PyTreeDef
    out_shardings_leaves: tuple[Any, ...]
    in_layouts_treedef: PyTreeDef
    in_layouts_leaves: tuple[Any, ...]
    out_layouts_treedef: PyTreeDef
    out_layouts_leaves: tuple[Any, ...]
    static_argnums: tuple[int, ...]
def pjit(fun: Callable,
         in_shardings: Optional[ShardingSpec] = None,
         out_shardings: Optional[ShardingSpec] = None,
         static_argnums: Optional[Sequence[int]] = None,
         donate_args: Optional[Sequence[int]] = None) -> Callable:
    """Return a wrapper that compiles ``fun`` with a ``ShardedCompiler`` on every call.

    ``static_argnums`` and ``donate_args`` are accepted but not used by this body.
    """
    def pjit_wrapper(*args, **kwargs):
        # Both sharding specs are expanded against the call's positional arguments.
        expanded_in = expand_shardings(in_shardings, args)
        expanded_out = expand_shardings(out_shardings, args)
        sharded_fn = ShardedCompiler(expanded_in, expanded_out).compile(
            fun, args, kwargs)
        return sharded_fn(*args, **kwargs)
    return pjit_wrapper
// packages/jax/_src/mesh.py
class AbstractMesh:
    """Device mesh described by named axes and their sizes."""

    def __init__(self, axis_names: Sequence[str], axis_sizes: Sequence[int]):
        self.axis_names = tuple(axis_names)
        self.axis_sizes = tuple(axis_sizes)
        if len(self.axis_names) != len(self.axis_sizes):
            raise ValueError(
                f"got {len(self.axis_names)} axis names for "
                f"{len(self.axis_sizes)} axis sizes")
        self.devices = self._create_devices()

    def _create_devices(self):
        """Create one ``xc.Device`` per mesh position, numbered from 0."""
        num_devices = functools.reduce(operator.mul, self.axis_sizes, 1)
        return [xc.Device(i) for i in range(num_devices)]

    @property
    def shape(self) -> tuple[int, ...]:
        """Axis sizes, in axis order."""
        return self.axis_sizes

    def index_to_device(self, index: tuple[int, ...]) -> "xc.Device":
        """Return the device at a row-major mesh ``index``.

        ``index`` may be shorter than the number of axes, in which case the
        missing trailing coordinates are taken as 0.

        Raises:
            IndexError: If ``index`` has more entries than the mesh has axes,
                or a coordinate is outside ``[0, axis_size)``.
        """
        if len(index) > len(self.axis_sizes):
            raise IndexError(
                f"index {tuple(index)} has more entries than the "
                f"{len(self.axis_sizes)} mesh axes")
        padded = tuple(index) + (0,) * (len(self.axis_sizes) - len(index))
        flat_index = 0
        for coordinate, size in zip(padded, self.axis_sizes):
            # Reject out-of-range coordinates; otherwise they would silently
            # alias a different device.
            if not 0 <= coordinate < size:
                raise IndexError(
                    f"index {tuple(index)} is out of range for mesh shape "
                    f"{self.axis_sizes}")
            flat_index = flat_index * size + coordinate  # row-major (Horner form)
        return self.devices[flat_index]
# Usage example
mesh = AbstractMesh(['batch', 'model'], [8, 4]) # 8 * 4 = 32 devices
device = mesh.index_to_device((2, 1)) # device at batch index 2, model index 1
Mesh 是 JAX 并行计算的核心概念,定义了计算设备如何组织成多维网格。
// Mesh 分片策略
class Sharding:
    """Pairs a mesh with a partition spec describing how arrays are split over it."""

    # Annotations are quoted: ``PartitionSpec`` is defined after this class, so
    # an unquoted reference would raise NameError when the class is created.
    def __init__(self, mesh: "AbstractMesh", partition_spec: "PartitionSpec"):
        self.mesh = mesh
        self.partition_spec = partition_spec

    def shard_array(self, array: "Array") -> "Array":
        """Shard ``array`` according to this strategy.

        Placeholder: currently returns ``array`` unchanged.
        """
        return array

    def unshard_array(self, array: "Array") -> "Array":
        """Merge a sharded array back into a global array.

        Placeholder: currently returns ``array`` unchanged.
        """
        return array
class PartitionSpec:
    """Per-dimension partition description: an axis name, or ``None``."""

    def __init__(self, *partition_dims: Union[str, None]):
        self.partition_dims = partition_dims

    def __repr__(self):
        rendered_dims = [str(dim) for dim in self.partition_dims]
        return f"PartitionSpec({', '.join(rendered_dims)})"
# Usage example
mesh = AbstractMesh(['batch', 'model'], [8, 4])
sharding = Sharding(mesh, PartitionSpec('batch', None)) # partition along 'batch' only
// 数据并行实现
def data_parallel(fun: Callable, mesh: AbstractMesh,
                  in_axis_resources: AxisResources = None,
                  out_axis_resources: AxisResources = None) -> Callable:
    """Return a wrapper that shards the arguments, runs ``fun`` through ``pmap`` and merges the results.

    Args:
        fun: Function to run in parallel.
        mesh: Mesh handed to the sharding helpers and to ``pmap``.
        in_axis_resources: Forwarded to ``_shard_args`` and ``pmap``.
        out_axis_resources: Forwarded to ``pmap`` and ``_unshard_results``.
    """
    def parallel_wrapper(*args, **kwargs):
        # Shard the inputs
        args_sharded = _shard_args(args, mesh, in_axis_resources)
        # _shard_args returns a mapping keyed by argument position. Unpacking
        # it with ``*`` would pass the positions, so read the values back in
        # positional order instead.
        positional = [args_sharded[i] for i in range(len(args))]
        # Run in parallel
        results_sharded = pmap(fun, mesh, in_axis_resources, out_axis_resources)(
            *positional, **kwargs)
        # Merge the outputs
        return _unshard_results(results_sharded, mesh, out_axis_resources)
    return parallel_wrapper
def _shard_args(args, mesh: AbstractMesh, axis_resources):
    """Return ``{position: argument}``, sharding each position listed in ``axis_resources``.

    With ``axis_resources`` omitted, every position maps to the mesh's first axis name.
    """
    if axis_resources is None:
        axis_resources = {position: mesh.axis_names[0]
                          for position in range(len(args))}

    def place(position, value):
        # Positions without an entry pass through untouched.
        if position not in axis_resources:
            return value
        spec = _create_shard_spec(mesh, axis_resources[position])
        return _shard_array(value, mesh, spec)

    return {position: place(position, value)
            for position, value in enumerate(args)}
// packages/jax/_src/vmap.py
def vmap(fun: Callable, in_axes=0, out_axes=0) -> Callable:
    """Return a vectorised version of ``fun``.

    Args:
        fun: Function to map.
        in_axes: An int applied to every positional argument, or a sequence
            with one entry per positional argument.
        out_axes: An int, or a sequence of output axes.

    Raises:
        ValueError: If ``in_axes`` is a sequence whose length differs from the
            number of positional arguments.
    """
    def vmapped_fun(*args, **kwargs):
        # Normalise into fresh local names: assigning to ``in_axes`` or
        # ``out_axes`` here would make them locals of this function and raise
        # UnboundLocalError on the isinstance checks.
        if isinstance(in_axes, int):
            axes_in = (in_axes,) * len(args)
        elif len(in_axes) != len(args):
            raise ValueError(f"in_axes length {len(in_axes)} != args length {len(args)}")
        else:
            axes_in = in_axes
        axes_out = (out_axes,) if isinstance(out_axes, int) else out_axes
        # Trace the function under a mapping trace
        with core.new_main(core.TraceLevel.MAP) as main:
            trace = core.DynamicJaxprTrace(main, True)
            with core.set_current_trace(trace):
                args_mapped = _map_inputs(trace, args, axes_in)
                result_mapped = fun(*args_mapped, **kwargs)
                result = _map_outputs(trace, result_mapped, axes_out)
        return result
    return vmapped_fun
def _map_inputs(trace, args, in_axes):
"""展开输入维度"""
mapped_args = []
for arg, axis in zip(args, in_axes):
if axis is not None:
# 在指定维度展开
mapped_arg = _expand_dim(trace, arg, axis)
mapped_args.append(mapped_arg)
else:
mapped_args.append(arg)
return mapped_args
// pmap 并行映射实现
def pmap(fun: Callable, mesh: AbstractMesh,
         in_axes: AxisResources = None,
         out_axes: AxisResources = None) -> Callable:
    """Return a wrapper that compiles ``fun`` with a ``ParallelCompiler`` on each call and executes it."""
    def pmapped_fun(*args, **kwargs):
        parallel_compiler = ParallelCompiler(fun, mesh, in_axes, out_axes)
        executable = parallel_compiler.compile(*args, **kwargs)
        return executable.execute(*args, **kwargs)
    return pmapped_fun
class ParallelCompiler:
    """Builds one traced sub-graph per mesh device and compiles them together."""

    def __init__(self, fun: Callable, mesh: "AbstractMesh",
                 in_axes: "AxisResources", out_axes: "AxisResources"):
        self.fun = fun
        self.mesh = mesh
        self.in_axes = in_axes
        self.out_axes = out_axes

    def compile(self, *args, **kwargs):
        """Trace per device, compile the result, and wrap it in a ``CompiledPMap``."""
        parallel_jaxpr = self._generate_parallel_jaxpr(*args, **kwargs)
        xla_program = self._compile_to_xla(parallel_jaxpr)
        return CompiledPMap(xla_program, self.mesh)

    def _generate_parallel_jaxpr(self, *args, **kwargs):
        """Return the list of per-device traces, in device order."""
        device_jaxprs = []
        # The mesh exposes its devices as the ``devices`` list; it has no
        # ``devices_count`` attribute.
        for device_index in range(len(self.mesh.devices)):
            device_args = self._shard_args_for_device(device_index, args)
            device_jaxprs.append(self._trace_function(device_args, **kwargs))
        return device_jaxprs
向量化是将标量函数自动推广为向量函数的过程,通过在虚拟维度上创建计算副本来实现。
// 向量化原理示例
def scalar_sin(x):
    """Return ``jnp.sin(x)``."""
    return jnp.sin(x)
# Vectorise it with vmap
vector_sin = vmap(scalar_sin)
# 原理:
# 输入: [x1, x2, x3, x4] (形状: (4,))
# 虚拟维度: vmapped
#
# 计算图:
#
# vmapped[0] vmapped[1] vmapped[2] vmapped[3]
# ↓ ↓ ↓ ↓
# scalar_sin(x1) scalar_sin(x2) scalar_sin(x3) scalar_sin(x4)
# ↑ ↑ ↑ ↑
# vmapped[0] vmapped[1] vmapped[2] vmapped[3]
#
# 输出: [sin(x1), sin(x2), sin(x3), sin(x4)] (形状: (4,))
装饰器模式
策略模式
设计优势:通过组合模式实现复杂功能,保持代码模块化和可扩展性。
// 装饰器模式示例
def compose_transforms(*transforms):
    """Build a decorator that applies ``transforms`` right-to-left, leaving the first one outermost."""
    def decorator(f):
        wrapped = f
        for step in transforms[::-1]:
            wrapped = step(wrapped)
        return wrapped
    return decorator
# 使用装饰器模式
@compose_transforms(jit, grad, vmap)
def optimized_function(x):
    """Return ``sum(sin(x) * cos(x))``; decorated with jit, grad and vmap."""
    return jnp.sum(jnp.sin(x) * jnp.cos(x))
# 等价于(变换按 reversed 顺序应用,因此列表中第一个变换位于最外层):
# optimized_function = jit(grad(vmap(optimized_function)))
# 装饰器核心实现
def jit(f):
    """Decorator sketch: compile ``f`` with ``_compile_to_xla`` on every call, then run the result."""
    @functools.wraps(f)
    def jit_wrapper(*args, **kwargs):
        executable = _compile_to_xla(f, args, kwargs)
        return executable(*args, **kwargs)
    return jit_wrapper
def grad(f):
    """Decorator sketch: return whatever ``_compute_gradient`` produces for the call."""
    @functools.wraps(f)
    def grad_wrapper(*args, **kwargs):
        return _compute_gradient(f, args, kwargs)
    return grad_wrapper
// 管道模式 - 计算流处理
class TransformPipeline:
    """Applies a fixed list of transforms in order, feeding each result to the next."""

    def __init__(self, *transforms):
        self.transforms = list(transforms)

    def apply(self, data):
        """Run ``data`` through every transform, first to last, and return the result."""
        result = data
        for transform in self.transforms:
            result = transform(result)
        return result

    def __call__(self, data, *args, **kwargs):
        """Apply the pipeline to ``data``.

        With no further arguments this is the same as ``apply(data)``. When
        extra arguments are given, ``data`` is treated as a function: the
        transformed function is called with them and its result is returned.
        """
        transformed = self.apply(data)
        # ``apply`` takes a single value, so extra arguments are meant for the
        # transformed function rather than for ``apply`` itself.
        if args or kwargs:
            return transformed(*args, **kwargs)
        return transformed
# 管道示例
optimization_pipeline = TransformPipeline(
    jit,  # JIT compilation
    grad,  # gradient computation
    vmap,  # vectorisation
    # pmap takes the function first and the mesh second, so bind the mesh
    # here; ``pmap(mesh)`` would pass the mesh as the function.
    lambda f: pmap(f, mesh),  # parallelisation
)


# Function fed through the pipeline
def loss_fn(params, data):
    """Mean squared difference between the model's predictions and ``data``."""
    model = NeuralNetwork(params)
    predictions = model(data)
    return jnp.mean((predictions - data) ** 2)


# Apply the pipeline to loss_fn and call the result
gradient = optimization_pipeline(loss_fn, params, training_data)
// 策略模式 - 不同的微分策略
class DifferentiationStrategy:
    """Interface for differentiation strategies; subclasses override ``differentiate``."""

    def differentiate(self, f, *args, **kwargs):
        """Differentiate ``f``; the base implementation raises NotImplementedError."""
        raise NotImplementedError
class ForwardDiffStrategy(DifferentiationStrategy):
    """Strategy that delegates to ``jvp``."""

    def differentiate(self, f, *args, **kwargs):
        """Return ``jvp(f, *args, **kwargs)``."""
        forward_result = jvp(f, *args, **kwargs)
        return forward_result
class ReverseDiffStrategy(DifferentiationStrategy):
    """Strategy that delegates to ``vjp``."""

    def differentiate(self, f, *args, **kwargs):
        """Return ``vjp(f, *args, **kwargs)``."""
        reverse_result = vjp(f, *args, **kwargs)
        return reverse_result
class MixedDiffStrategy(DifferentiationStrategy):
    """Strategy that chooses forward or reverse mode per function."""

    def differentiate(self, f, *args, **kwargs):
        """Use forward mode when ``_is_linear_function(f)`` is truthy, reverse mode otherwise."""
        if _is_linear_function(f):
            chosen = ForwardDiffStrategy()
        else:
            chosen = ReverseDiffStrategy()
        return chosen.differentiate(f, *args, **kwargs)
# 使用策略模式
def compute_derivative(f, strategy: DifferentiationStrategy, *args, **kwargs):
    """Differentiate ``f`` by delegating to ``strategy.differentiate``."""
    differentiate = strategy.differentiate
    return differentiate(f, *args, **kwargs)
# Pick a strategy and use it
forward_strategy = ForwardDiffStrategy()
reverse_strategy = ReverseDiffStrategy()
gradient = compute_derivative(loss_fn, reverse_strategy, params, data)
数据流分析是编译优化的核心技术,通过分析计算图中的数据依赖关系来优化执行效率。
// 数据流分析实现
class DataFlowAnalyzer:
    """Builds a variable dependency graph for a jaxpr and orders it topologically."""

    def __init__(self, jaxpr: "Jaxpr"):
        self.jaxpr = jaxpr
        self.dependency_graph = self._build_dependency_graph()

    def _build_dependency_graph(self):
        """Return ``{variable: set of variables it depends on}``."""
        # Inputs and constants depend on nothing.
        graph = {var: set()
                 for var in list(self.jaxpr.invars) + list(self.jaxpr.constvars)}
        for eqn in self.jaxpr.eqns:
            rhs_vars = self._extract_variables(eqn.rhs)
            for lhs_var in eqn.lhs:
                # Equation outputs are not pre-registered, so create their
                # entry on first sight instead of indexing a missing key.
                graph.setdefault(lhs_var, set()).update(rhs_vars)
        return graph

    def _extract_variables(self, expr):
        """Collect the ``Var`` instances in ``expr``, recursing into tuples."""
        if isinstance(expr, Var):
            return {expr}
        elif isinstance(expr, tuple):
            return set().union(*[self._extract_variables(e) for e in expr])
        else:
            return set()

    def get_execution_order(self):
        """Return the variables ordered so that dependencies come first."""
        return self._topological_sort(self.dependency_graph)

    def _topological_sort(self, graph):
        """Kahn's algorithm over ``{node: dependencies}``.

        A node is emitted only after all of its dependencies. Nodes that
        appear only as dependencies are included as well.

        Raises:
            ValueError: If the graph contains a cycle.
        """
        from collections import deque

        pending = {node: 0 for node in graph}     # unmet dependency count
        dependents = {node: [] for node in graph}  # reverse edges
        for node, deps in graph.items():
            for dep in deps:
                pending[node] += 1
                dependents.setdefault(dep, []).append(node)
                pending.setdefault(dep, 0)

        queue = deque(node for node, count in pending.items() if count == 0)
        order = []
        while queue:
            node = queue.popleft()
            order.append(node)
            for dependent in dependents[node]:
                pending[dependent] -= 1
                if pending[dependent] == 0:
                    queue.append(dependent)
        if len(order) != len(pending):
            raise ValueError("dependency graph contains a cycle")
        return order
// 计算图构建算法
class ComputationalGraph:
    """Node/edge view of a jaxpr: one node per equation, one edge per equation input."""

    def __init__(self, jaxpr: "Jaxpr"):
        self.jaxpr = jaxpr
        self.graph = self._build_graph()
        self.optimized_graph = None

    def _build_graph(self):
        """Build the graph dict with its nodes, edges and variable lists."""
        graph = {
            'nodes': {},
            'edges': [],
            'input_vars': self.jaxpr.invars,
            'output_vars': self.jaxpr.outvars,
            'const_vars': self.jaxpr.constvars
        }
        for eqn in self.jaxpr.eqns:
            node_id = f"node_{len(graph['nodes'])}"
            inputs = eqn.rhs if isinstance(eqn.rhs, tuple) else (eqn.rhs,)
            graph['nodes'][node_id] = {
                'equation': eqn,
                'inputs': inputs,
                'outputs': eqn.lhs
            }
            # One incoming edge per input; 'from' is None when no node
            # produces the variable.
            for input_var in inputs:
                graph['edges'].append({
                    'from': self._find_node_for_var(input_var, graph),
                    'to': node_id,
                    'data': {'var': input_var}
                })
        return graph

    def _find_node_for_var(self, var, graph):
        """Return the id of the first node whose outputs contain ``var``, else ``None``."""
        for node_id, node in graph['nodes'].items():
            if var in node['outputs']:
                return node_id
        return None

    def optimize_graph(self):
        """Run the optimisation passes in order and return the graph.

        The result is also stored in ``optimized_graph``. All three passes
        are currently placeholders, so the graph is unchanged.
        """
        self._fuse_operations()
        self._eliminate_dead_code()
        self._constant_folding()
        self.optimized_graph = self.graph
        return self.graph

    def _fuse_operations(self):
        """Operator fusion (placeholder, e.g. matmul + bias -> linear layer)."""
        pass

    def _eliminate_dead_code(self):
        """Dead-code elimination (placeholder so ``optimize_graph`` can run)."""
        pass

    def _constant_folding(self):
        """Constant folding (placeholder so ``optimize_graph`` can run)."""
        pass
// JAX 内存管理
class MemoryManager:
    """Reference-counted buffer pool keyed by ``(shape, dtype)``.

    Buffers are tracked by ``id()`` because JAX arrays are not hashable and
    cannot be used as dictionary keys.
    """

    def __init__(self):
        self.buffer_pool = {}        # (shape, dtype) -> list of idle buffers
        self.allocated_buffers = {}  # id(buffer) -> (shape, dtype)
        self.reference_counts = {}   # id(buffer) -> reference count
        self._live_buffers = {}      # id(buffer) -> buffer; keeps ids stable

    def allocate_buffer(self, shape: "Shape", dtype: "DTypeLike") -> "Array":
        """Return a buffer of the given shape and dtype, reusing an idle one when possible.

        Reused buffers are returned as they are, without being re-zeroed.
        """
        buffer_key = (tuple(shape), dtype)
        idle = self.buffer_pool.get(buffer_key)
        if idle:
            buffer = idle.pop()
            if not idle:
                del self.buffer_pool[buffer_key]
        else:
            buffer = jnp.zeros(shape, dtype=dtype)
        # Register pooled and fresh buffers the same way, so a reused buffer
        # can be released again later.
        handle = id(buffer)
        self._live_buffers[handle] = buffer
        self.allocated_buffers[handle] = buffer_key
        self.reference_counts[handle] = 1
        return buffer

    def release_buffer(self, buffer: "Array"):
        """Drop one reference; when none remain, return the buffer to the pool.

        Buffers not handed out by this manager are ignored.
        """
        handle = id(buffer)
        if handle not in self.reference_counts:
            return
        self.reference_counts[handle] -= 1
        if self.reference_counts[handle] <= 0:
            self._recycle(handle)

    def _recycle(self, handle):
        """Move the buffer with this id from the allocated tables into the idle pool."""
        buffer = self._live_buffers.pop(handle)
        buffer_key = self.allocated_buffers.pop(handle)
        del self.reference_counts[handle]
        self.buffer_pool.setdefault(buffer_key, []).append(buffer)

    def reference_counting(self):
        """Recycle every tracked buffer whose reference count is zero or below."""
        unreferenced = [handle for handle, count in self.reference_counts.items()
                        if count <= 0]
        for handle in unreferenced:
            self._recycle(handle)

    def get_memory_usage(self):
        """Return counts of allocated buffers, idle pooled buffers and active references."""
        return {
            'allocated_buffers': len(self.allocated_buffers),
            'buffered_buffers': sum(len(idle) for idle in self.buffer_pool.values()),
            'active_references': sum(self.reference_counts.values())
        }
编译优化
运行时优化
优化原则:减少计算开销,最大化硬件利用率,最小化数据传输成本。
// XLA 编译优化
class XLAOptimizer:
    """Runs a fixed sequence of optimisation passes over an HLO module."""

    def __init__(self):
        self.optimization_passes = [
            self._constant_folding,
            self._dead_code_elimination,
            self._operation_fusion,
            self._memory_optimization,
            self._vectorization
        ]

    def optimize(self, hlo_module: "ir.Module") -> "ir.Module":
        """Apply every pass in order, feeding each result into the next."""
        for optimization_pass in self.optimization_passes:
            hlo_module = optimization_pass(hlo_module)
        return hlo_module

    def _constant_folding(self, hlo_module: "ir.Module") -> "ir.Module":
        """Replace the constants found in entry computations with their values."""
        for computation in hlo_module.body.operations:
            if computation.entry_computation():
                constants = self._find_constants(computation)
                for const in constants:
                    self._replace_with_constant(computation, const)
        return hlo_module

    def _dead_code_elimination(self, hlo_module: "ir.Module") -> "ir.Module":
        """Dead-code elimination (placeholder: returns the module unchanged).

        Defined so the pass list built in ``__init__`` resolves.
        """
        return hlo_module

    def _operation_fusion(self, hlo_module: "ir.Module") -> "ir.Module":
        """Fuse every candidate that passes ``_can_fuse``."""
        fusion_candidates = self._find_fusion_candidates(hlo_module)
        for candidate in fusion_candidates:
            if self._can_fuse(candidate):
                fused_op = self._create_fused_operation(candidate)
                self._replace_operations(candidate, fused_op)
        return hlo_module

    def _memory_optimization(self, hlo_module: "ir.Module") -> "ir.Module":
        """Reuse memory for every analysed op that ``_can_reallocate`` accepts."""
        memory_analysis = self._analyze_memory_usage(hlo_module)
        for memory_op in memory_analysis['memory_ops']:
            if self._can_reallocate(memory_op):
                self._reuse_memory(memory_op)
        return hlo_module

    def _vectorization(self, hlo_module: "ir.Module") -> "ir.Module":
        """Vectorisation (placeholder: returns the module unchanged).

        Defined so the pass list built in ``__init__`` resolves.
        """
        return hlo_module
// 运行时优化实现
class RuntimeOptimizer:
    """Derives execution plans for jaxprs and caches them."""

    def __init__(self):
        self.execution_cache = {}
        self.profile_data = {}

    def optimize_execution(self, jaxpr: "Jaxpr", *args):
        """Return the optimised execution plan for ``jaxpr``, computing it on a cache miss."""
        cache_key = self._get_cache_key(jaxpr, args)
        if cache_key in self.execution_cache:
            return self.execution_cache[cache_key]
        # Forward the arguments individually, matching the callee's
        # ``*args`` signature.
        execution_plan = self._analyze_execution_pattern(jaxpr, *args)
        optimized_plan = self._apply_optimizations(execution_plan)
        self.execution_cache[cache_key] = optimized_plan
        return optimized_plan

    def _analyze_execution_pattern(self, jaxpr: "Jaxpr", *args):
        """Classify equations as parallel or sequential and attach the memory and dependency analyses."""
        execution_plan = {
            'parallel_ops': [],
            'sequential_ops': [],
            'memory_access_pattern': {},
            'computation_dependencies': {}
        }
        for eqn in jaxpr.eqns:
            bucket = 'parallel_ops' if self._is_parallel_operation(eqn) else 'sequential_ops'
            execution_plan[bucket].append(eqn)
        execution_plan['memory_access_pattern'] = self._analyze_memory_access(jaxpr)
        execution_plan['computation_dependencies'] = self._build_dependency_graph(jaxpr)
        return execution_plan

    def _apply_optimizations(self, execution_plan):
        """Return a shallow copy of the plan with optimisation flags added."""
        optimized_plan = execution_plan.copy()
        # Group parallel operations when there is more than one.
        if len(optimized_plan['parallel_ops']) > 1:
            optimized_plan['parallel_group'] = self._group_parallel_operations(
                optimized_plan['parallel_ops'])
        # Enable prefetching for read-heavy plans. A missing ratio counts as
        # 0.0 rather than raising KeyError.
        access_pattern = optimized_plan['memory_access_pattern']
        if access_pattern.get('read_write_ratio', 0.0) > 2.0:
            optimized_plan['memory_prefetch'] = True
        # Pipeline long dependency chains.
        if self._has_long_dependency_chain(optimized_plan):
            optimized_plan['pipeline_execution'] = True
        return optimized_plan
编程建议
性能优化
1. JIT 编译陷阱
// ❌ 错误:在 JIT 中使用 Python 控制流
@jax.jit
def bad_function(x):
    """Counter-example: branches on ``x`` with a Python ``if`` inside a jitted function."""
    # Under jit, ``x`` is a tracer here, so this Python-level comparison is
    # the pattern the example warns against.
    if x > 0:
        return x ** 2
    else:
        return x + 1
// ✅ 正确:使用 jax.lax.cond
@jax.jit
def good_function(x):
    """Return ``x ** 2`` when ``x > 0`` and ``x + 1`` otherwise, using ``jax.lax.cond``."""
    def square(value):
        return value ** 2

    def increment(value):
        return value + 1

    return jax.lax.cond(x > 0, square, increment, x)
2. 内存管理陷阱
// ❌ 错误:内存泄漏
def leak_memory():
    """Counter-example: allocate a fresh (1000, 1000) array on each of 1000 iterations."""
    for i in range(1000):
        arr = jnp.ones((1000, 1000))
        # The article presents this as a leak: arr is never explicitly released.
// ✅ 正确:使用 jax.lax.scan
def scan_memory(steps=1000, shape=(1000, 1000)):
    """Accumulate ``steps`` arrays of ones with ``jax.lax.scan``.

    Args:
        steps: Number of scan iterations.
        shape: Shape of the accumulated array.

    Returns:
        The ``(final_state, ys)`` pair returned by ``jax.lax.scan``; ``ys`` is
        ``None`` because the body emits no per-step output.
    """
    def body_fn(state, _):
        # A scan body takes (carry, x) and must return (new_carry, y).
        return state + jnp.ones(shape), None

    # The initial carry must have the same shape and dtype as the carry the
    # body returns.
    return jax.lax.scan(body_fn, jnp.zeros(shape), jnp.arange(steps))
// 调试工具使用
import jax
from jax import tree_util, grad, jit
import jax.numpy as jnp
# 1. 梯度调试
def debug_gradient(f, *args, **kwargs):
    """Compute ``grad(f)`` at the given arguments, print per-leaf shapes and norms, and return it."""
    gradients = grad(f)(*args, **kwargs)
    leaf_shapes = tree_util.tree_map(lambda leaf: leaf.shape, gradients)
    print(f"Gradient shape: {leaf_shapes}")
    leaf_norms = tree_util.tree_map(jnp.linalg.norm, gradients)
    print(f"Gradient norm: {leaf_norms}")
    return gradients
# 2. JIT 编译调试
def debug_jit(f, *args, **kwargs):
    """Jit ``f``, print the jitted callable, then call it with the given arguments."""
    jitted = jit(f)
    print(f"Compiled function: {jitted}")
    return jitted(*args, **kwargs)
# 3. 数据结构调试
def debug_tree_structure(x):
    """Print the pytree structure and leaf count of ``x``; return the structure."""
    structure = tree_util.tree_structure(x)
    print(f"Tree structure: {structure}")
    leaf_count = len(tree_util.tree_leaves(x))
    print(f"Number of leaves: {leaf_count}")
    return structure
# 4. 内存调试
def debug_memory_usage():
    """Print memory statistics for the first JAX device.

    Reads them from ``Device.memory_stats()``. That call returns ``None`` on
    backends without statistics, in which case both values print as ``None``.
    """
    stats = jax.devices()[0].memory_stats() or {}
    print(f"Allocated memory: {stats.get('bytes_in_use')}")
    print(f"Memory limit: {stats.get('bytes_limit')}")
# 5. 计算图调试
def debug_jaxpr(f, *args, **kwargs):
    """Trace ``f`` with ``jax.make_jaxpr``, print the jaxpr, and return ``(jaxpr, consts)``."""
    # make_jaxpr returns a single ClosedJaxpr object, not a tuple; the jaxpr
    # and its constants are attributes of it.
    closed_jaxpr = jax.make_jaxpr(f)(*args, **kwargs)
    jaxpr, consts = closed_jaxpr.jaxpr, closed_jaxpr.consts
    print(f"JAXPR:\n{jaxpr}")
    return jaxpr, consts
官方文档
技术论文
相关技术
替代方案
JAX vs NumPy 性能对比
// 性能基准测试示例
import time
import jax
import jax.numpy as jnp
import numpy as np
def benchmark_numpy(n=10000):
    """Time one ``np.dot`` of two random ``n`` x ``n`` matrices; return elapsed seconds."""
    lhs = np.random.randn(n, n)
    rhs = np.random.randn(n, n)
    started = time.time()
    np.dot(lhs, rhs)
    finished = time.time()
    return finished - started
def benchmark_jax(n=10000):
    """Time one jitted ``jnp.dot`` of two random ``n`` x ``n`` matrices; return elapsed seconds.

    Compilation is excluded through a warm-up call, and the timer waits for
    the result so that asynchronous dispatch is not mistaken for completion.
    """
    # jax.numpy has no ``random`` submodule; random numbers come from
    # jax.random and need an explicit key.
    key_a, key_b = jax.random.split(jax.random.PRNGKey(0))
    a = jax.random.normal(key_a, (n, n))
    b = jax.random.normal(key_b, (n, n))
    matmul = jax.jit(jnp.dot)
    matmul(a, b).block_until_ready()  # warm-up: triggers compilation
    start = time.perf_counter()
    matmul(a, b).block_until_ready()
    end = time.perf_counter()
    return end - start
# Run both benchmarks with their default size and report the timings
numpy_time = benchmark_numpy()
jax_time = benchmark_jax()
print(f"NumPy time: {numpy_time:.4f}s")
print(f"JAX time: {jax_time:.4f}s")
print(f"Speedup: {numpy_time/jax_time:.2f}x")
核心要点
架构优势
JAX 是一个精心设计的高性能数值计算框架,通过编译优化、自动微分和并行计算为机器学习研究提供了强大的基础设施。
源码地址
https://github.com/google/jax