🦀 Tokio 运行时

深度解析 Rust 异步运行时核心原理

基于 Tokio 1.x 源码深度解析
2026-03-07 | 技术深度解读

📑 目录

第一部分:基础架构

  • Tokio 简介
  • 运行时架构
  • 核心组件概览
  • 调度器类型

第二部分:演进历史

  • 异步编程演进
  • Rust 异步生态

第三部分:核心模块

  • Runtime / Handle / Context
  • Scheduler / Worker / Core
  • Task / Harness / Queue

第四部分:核心算法

  • 工作窃取算法
  • LIFO Slot 优化
  • 任务调度流程

🦀 Tokio 简介

Tokio 是 Rust 生态中最流行的异步运行时,提供了构建异步应用所需的核心组件。

核心特性

  • 基于 Future 的异步模型
  • 工作窃取调度器
  • 非阻塞 I/O
  • 零成本抽象
  • 多线程支持

运行时组成

  • Scheduler - 任务调度器
  • I/O Driver - 事件循环
  • Timer - 时间轮定时器
  • Blocking Pool - 阻塞线程池
  • Signal - 信号处理

🏗️ 运行时架构

Tokio 运行时采用分层架构,核心层为调度器,外围为各种驱动。

┌─────────────────────────────────────────────┐
│                  Application                 │
├─────────────────────────────────────────────┤
│    spawn()    │    block_on()    │  handle   │
├───────────────┴──────────────────┴──────────┤
│                 Runtime Core                 │
│  ┌─────────────────────────────────────────┐│
│  │            Scheduler (调度器)            ││
│  │  ┌─────────────┐  ┌──────────────────┐  ││
│  │  │  Worker 0   │  │  Worker 1..N     │  ││
│  │  │  LocalQueue │  │  LocalQueue      │  ││
│  │  └─────────────┘  └──────────────────┘  ││
│  │         │              │                ││
│  │         └──────────────┘                ││
│  │              Injector Queue             ││
│  └─────────────────────────────────────────┘│
│  ┌─────────┐ ┌─────────┐ ┌────────────────┐ │
│  │I/O Driver│ │ Timer  │ │ Blocking Pool  │ │
│  └─────────┘ └─────────┘ └────────────────┘ │
└─────────────────────────────────────────────┘

📦 核心组件概览

组件 文件路径 职责
Runtime runtime/mod.rs 运行时入口,管理生命周期
Scheduler scheduler/mod.rs 任务调度核心
MultiThread scheduler/multi_thread/ 多线程工作窃取调度器
Worker scheduler/multi_thread/worker.rs 工作线程执行逻辑
Queue scheduler/multi_thread/queue.rs 无锁任务队列
Task task/mod.rs 任务抽象与状态管理
Harness task/harness.rs 任务执行引擎

⚖️ 调度器类型

CurrentThread (单线程)

  • 所有任务在单线程执行
  • 适用于轻量级任务
  • 无跨线程同步开销
  • 适合 CLI 工具
// 单线程运行时
#[tokio::main(flavor = "current_thread")]
async fn main() {
    // ...
}

MultiThread (多线程)

  • 工作窃取算法
  • 多核并行执行
  • 自动负载均衡
  • 适合高并发服务
// 多线程运行时 (默认)
#[tokio::main(flavor = "multi_thread")]
async fn main() {
    // ...
}

📜 异步编程演进

Rust 异步编程经历了从回调到 Future 再到 async/await 的演进。

阶段 方式 问题
1.0 回调函数 回调地狱、难以组合
1.19 Future trait 手动状态机、复杂
1.36 async/await 稳定 零成本抽象、简洁
1.39 async fn in traits 更强大的抽象
1.75 async closure 更灵活的异步闭包

🌐 Rust 异步生态

运行时

  • Tokio - 最流行
  • async-std - 标准库风格
  • smol - 轻量级
  • glommio - io_uring

异步原语

  • futures crate
  • async-channel
  • parking_lot

网络框架

  • hyper - HTTP
  • tonic - gRPC
  • warp - Web 框架
  • axum - Web 框架

数据库

  • sqlx
  • sea-orm
  • diesel-async

🔧 Runtime 模块

Runtime 是 Tokio 运行时的入口点,负责初始化和管理所有组件。

pub struct Runtime {
    // 调度器 - 核心组件
    scheduler: Scheduler,
    
    // I/O 驱动
    io_handle: io::Handle,
    
    // 时间驱动
    time_handle: time::Handle,
    
    // 信号驱动
    signal_handle: signal::Handle,
    
    // 阻塞线程池
    blocking_spawner: blocking::Spawner,
}

enum Scheduler {
    // 单线程调度器
    CurrentThread(current_thread::Scheduler),
    // 多线程调度器
    MultiThread(multi_thread::Scheduler),
}

🎛️ Handle 枚举

Handle 是对运行时的句柄,用于在异步上下文中访问运行时功能。

#[derive(Debug, Clone)]
pub(crate) enum Handle {
    // 单线程运行时句柄
    #[cfg(feature = "rt")]
    CurrentThread(Arc<current_thread::Handle>),

    // 多线程运行时句柄
    #[cfg(feature = "rt-multi-thread")]
    MultiThread(Arc<multi_thread::Handle>),
}

impl Handle {
    // 获取当前运行时句柄
    pub(crate) fn current() -> Handle {
        match context::with_current(Clone::clone) {
            Ok(handle) => handle,
            Err(e) => panic!("{}", e),
        }
    }
    
    // 生成新任务
    pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
    where F: Future + Send + 'static {
        match self {
            Handle::CurrentThread(h) => h.spawn(future),
            Handle::MultiThread(h) => h.spawn(future),
        }
    }
}

📝 Context 枚举

Context 保存当前执行上下文,用于任务执行期间访问调度器状态。

pub(super) enum Context {
    // 单线程上下文
    CurrentThread(current_thread::Context),

    // 多线程上下文
    #[cfg(feature = "rt-multi-thread")]
    MultiThread(multi_thread::Context),
}

impl Context {
    // 延迟唤醒任务
    pub(crate) fn defer(&self, waker: &Waker) {
        match self {
            Context::CurrentThread(ctx) => ctx.defer(waker),
            Context::MultiThread(ctx) => ctx.defer(waker),
        }
    }
}

设计要点:Context 通过 thread-local 存储,确保每个线程都能访问自己的执行上下文。

⚙️ Scheduler 模块

Scheduler 是任务调度的核心,负责决定何时何地执行任务。

cfg_rt! {
    pub(crate) mod current_thread;
    pub(crate) use current_thread::CurrentThread;

    mod defer;
    use defer::Defer;

    pub(crate) mod inject;
    pub(crate) use inject::Inject;
}

cfg_rt_multi_thread! {
    mod block_in_place;
    pub(crate) use block_in_place::block_in_place;

    mod lock;
    use lock::Lock;

    pub(crate) mod multi_thread;
    pub(crate) use multi_thread::MultiThread;
}

🔀 MultiThread 调度器

pub(crate) struct MultiThread;

impl MultiThread {
    pub(crate) fn new(
        size: usize,                   // 工作线程数
        driver: Driver,                // I/O 驱动
        driver_handle: driver::Handle, // 驱动句柄
        blocking_spawner: blocking::Spawner,
        seed_generator: RngSeedGenerator,
        config: Config,
        timer_flavor: TimerFlavor,
    ) -> (MultiThread, Arc<Handle>, Launch) {
        let parker = Parker::new(driver);
        let (handle, launch) = worker::create(
            size, parker, driver_handle,
            blocking_spawner, seed_generator,
            config, timer_flavor,
        );
        (MultiThread, handle, launch)
    }

    // 阻塞等待 Future 完成
    pub(crate) fn block_on<F>(&self, handle: &scheduler::Handle, future: F)
        -> F::Output
    where F: Future {
        context::enter_runtime(handle, true, |blocking| {
            blocking.block_on(future).expect("failed to park thread")
        })
    }
}

👷 Worker 结构

Worker 代表一个工作线程,持有调度器句柄和核心数据。

pub(super) struct Worker {
    // 调度器句柄引用
    handle: Arc<Handle>,

    // Worker 索引
    index: usize,

    // 核心数据 (可被其他线程窃取)
    core: AtomicCell<Core>,
}

// 每个线程的上下文
pub(crate) struct Context {
    worker: Arc<Worker>,
    core: RefCell<Option<Box<Core>>>,
    defer: Defer,  // 延迟唤醒队列
}

关键设计:Core 存储在 AtomicCell 中,可以被其他线程"偷走"以支持 block_in_place 功能。

💾 Core 数据结构

struct Core {
    // 调度计数器
    tick: u32,

    // LIFO 插槽 - 最后调度的任务
    lifo_slot: Option<Notified>,

    // LIFO 是否启用
    lifo_enabled: bool,

    // 本地运行队列
    run_queue: queue::Local<Arc<Handle>>,

    // 是否正在搜索工作
    is_searching: bool,

    // 是否已关闭
    is_shutdown: bool,

    // Parker - 线程等待机制
    park: Option<Parker>,

    // 统计信息
    stats: Stats,

    // 全局队列检查间隔
    global_queue_interval: u32,

    // 随机数生成器
    rand: FastRand,
}

🔗 Shared 共享状态

pub(crate) struct Shared {
    // 每个 Worker 的远程状态
    remotes: Box<[Remote]>,

    // 全局注入队列
    pub(super) inject: inject::Shared<Arc<Handle>>,

    // 空闲 Worker 协调
    idle: Idle,

    // 所有活跃任务
    pub(crate) owned: OwnedTasks<Arc<Handle>>,

    // 同步锁保护的数据
    pub(super) synced: Mutex<Synced>,

    // 关闭时的 Core 收集
    shutdown_cores: Mutex<Vec<Box<Core>>>,

    // 配置
    config: Config,

    // 指标
    scheduler_metrics: SchedulerMetrics,
    worker_metrics: Box<[WorkerMetrics]>,
}

📡 Remote 结构

Remote 用于跨线程通信,允许其他线程与 Worker 交互。

struct Remote {
    // 窃取句柄
    pub(super) steal: queue::Steal<Arc<Handle>>,

    // 唤醒 Worker
    unpark: Unparker,
}

pub(crate) struct Synced {
    // Idle 的同步状态
    pub(super) idle: idle::Synced,

    // Inject 的同步状态
    pub(crate) inject: inject::Synced,
}

设计要点:Remote 实现了跨线程的任务窃取和通知机制,是工作窃取算法的基础。

🎯 Context 上下文详解

impl Context {
    fn run(&self, mut core: Box<Core>) -> RunResult {
        // 重置 LIFO 状态
        self.reset_lifo_enabled(&mut core);
        core.stats.start_processing_scheduled_tasks();

        while !core.is_shutdown {
            // 增加调度计数
            core.tick();

            // 运行维护任务
            core = self.maintenance(core);

            // 获取下一个任务
            if let Some(task) = core.next_task(&self.worker) {
                core = self.run_task(task, core)?;
                continue;
            }

            // 尝试从其他 Worker 窃取
            if let Some(task) = core.steal_work(&self.worker) {
                core = self.run_task(task, core)?;
            } else {
                // 没有工作,进入等待
                core = self.park(core);
            }
        }
        // ...
    }
}

📋 Task 模块

Task 是 Tokio 中异步任务的抽象,封装了 Future 及其状态。

//! 任务引用类型:
//! * OwnedTask - 存储在 OwnedTasks 中
//! * JoinHandle - 获取任务输出
//! * Waker - 唤醒任务
//! * Notified - 跟踪任务是否被通知
//! * Unowned - 不属于任何运行时的任务

#[repr(transparent)]
pub(crate) struct Task<S: 'static> {
    raw: RawTask,
    _p: PhantomData<S>,
}

// 被通知的任务
#[repr(transparent)]
pub(crate) struct Notified<S: 'static>(Task<S>);

// 本地通知的任务 (非 Send)
pub(crate) struct LocalNotified<S: 'static> {
    task: Task<S>,
    _not_send: PhantomData<*const ()>,
}

🔖 Task 引用类型

类型 用途 引用计数
Task 拥有的任务句柄 1
Notified 已通知待执行 1
LocalNotified 本地任务 (非 Send) 1
UnownedTask 不属于运行时的任务 2
JoinHandle 获取任务输出 1
Waker 唤醒机制 可变

📊 Task 状态位

//! 状态字段使用原子 usize 存储多个位字段:
//!
//! * RUNNING   - 任务是否正在被 poll 或取消 (作为锁)
//! * COMPLETE  - Future 是否已完成 (完成后永不重置)
//! * NOTIFIED  - 是否存在 Notified 对象
//! * CANCELLED - 任务是否应被取消
//! * JOIN_INTEREST - 是否存在 JoinHandle
//! * JOIN_WAKER - JoinHandle waker 访问控制位
//!
//! 其余位用于引用计数

const RUNNING: usize = 1 << 0;
const COMPLETE: usize = 1 << 1;
const NOTIFIED: usize = 1 << 2;
const CANCELLED: usize = 1 << 3;
const JOIN_INTEREST: usize = 1 << 4;
const JOIN_WAKER: usize = 1 << 5;

// 引用计数在高位
const REF_COUNT_MASK: usize = !((1 << 6) - 1);

📤 Schedule Trait

pub(crate) trait Schedule: Sync + Sized + 'static {
    /// 释放任务,返回任务引用 (如果存在)
    fn release(&self, task: &Task<Self>) -> Option<Task<Self>>;

    /// 调度任务执行
    fn schedule(&self, task: Notified<Self>);

    /// 获取钩子
    fn hooks(&self) -> TaskHarnessScheduleHooks;

    /// 让出当前线程
    fn yield_now(&self, task: Notified<Self>) {
        self.schedule(task);
    }

    /// 处理 panic
    fn unhandled_panic(&self) {
        // 默认不做任何事
    }
}

关键点:Schedule trait 定义了调度器与任务之间的接口,Handle 实现了此 trait。

⚙️ Harness 任务执行

pub(super) struct Harness<T: Future, S: 'static> {
    cell: NonNull<Cell<T, S>>,
}

impl<T, S> Harness<T, S>
where T: Future, S: Schedule {
    /// Poll 内部 Future,消费一个引用计数
    pub(super) fn poll(self) {
        match self.poll_inner() {
            PollFuture::Notified => {
                // 任务被重新通知,让出执行
                self.core().scheduler.yield_now(Notified(...));
                self.drop_reference();
            }
            PollFuture::Complete => {
                self.complete();
            }
            PollFuture::Dealloc => {
                self.dealloc();
            }
            PollFuture::Done => (),
        }
    }
}

🔄 poll 方法详解

fn poll_inner(&self) -> PollFuture {
    match self.state().transition_to_running() {
        TransitionToRunning::Success => {
            // 创建 waker 和 context
            let waker_ref = waker_ref::<S>(&header_ptr);
            let cx = Context::from_waker(&waker_ref);
            
            // Poll Future
            let res = poll_future(self.core(), cx);

            if res == Poll::Ready(()) {
                return PollFuture::Complete;
            }

            // 转换回空闲状态
            let transition_res = self.state().transition_to_idle();
            // ...
        }
        TransitionToRunning::Cancelled => {
            cancel_task(self.core());
            PollFuture::Complete
        }
        // ...
    }
}

✅ complete 完成处理

fn complete(self) {
    // 转换到完成状态
    let snapshot = self.state().transition_to_complete();

    let _ = panic::catch_unwind(|| {
        if !snapshot.is_join_interested() {
            // 没有 JoinHandle,直接丢弃输出
            self.core().drop_future_or_output();
        } else if snapshot.is_join_waker_set() {
            // 唤醒 JoinHandle
            self.trailer().wake_join();
            
            if !self.state().unset_waker_after_complete()
                    .is_join_interested() {
                unsafe { self.trailer().set_waker(None); };
            }
        }
    });

    // 从调度器释放
    let num_release = self.release();
    if self.state().transition_to_terminal(num_release) {
        self.dealloc();
    }
}

📚 Queue 运行队列

Queue 实现了无锁的工作窃取队列,是 Tokio 高性能的关键。

pub(crate) struct Local<T: 'static> {
    inner: Arc<Inner<T>>,
}

pub(crate) struct Steal<T: 'static>(Arc<Inner<T>>);

pub(crate) struct Inner<T: 'static> {
    // Head: LSB = 真实 head, MSB = 窃取者正在操作的 head
    head: AtomicUnsignedLong,

    // Tail: 仅由生产者更新
    tail: AtomicUnsignedShort,

    // 环形缓冲区
    buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>; 256]>,
}

const LOCAL_QUEUE_CAPACITY: usize = 256;

🎯 Local/Steal 句柄

Local (生产者)

  • 单线程访问
  • push_back - 推入任务
  • pop - 弹出任务
  • len - 获取长度
impl<T> Local<T> {
    pub(crate) fn pop(&mut self) 
        -> Option<Notified<T>> {
        // CAS 操作获取任务
    }
}

Steal (消费者)

  • 多线程访问
  • steal_into - 窃取任务
  • is_empty - 检查空
impl<T> Steal<T> {
    pub(crate) fn steal_into(
        &self,
        dst: &mut Local<T>,
    ) -> Option<Notified<T>> {
        // 窃取一半任务
    }
}

📥 push_back_or_overflow

pub(crate) fn push_back_or_overflow<O: Overflow<T>>(
    &mut self,
    mut task: Notified<T>,
    overflow: &O,
) {
    let tail = loop {
        let head = self.inner.head.load(Acquire);
        let (steal, real) = unpack(head);
        let tail = unsafe { self.inner.tail.unsync_load() };

        if tail.wrapping_sub(steal) < 256 {
            // 队列有空间
            break tail;
        } else if steal != real {
            // 正在被窃取,空间会释放
            overflow.push(task);
            return;
        } else {
            // 队列满,移动一半到 inject queue
            match self.push_overflow(task, real, tail, overflow) {
                Ok(_) => return,
                Err(v) => task = v,  // CAS 失败,重试
            }
        }
    };
    self.push_back_finish(task, tail);
}

🔀 steal_into 工作窃取

pub(crate) fn steal_into(
    &self,
    dst: &mut Local<T>,
) -> Option<Notified<T>> {
    let n = loop {
        let (src_head_steal, src_head_real) = unpack(prev_packed);
        let src_tail = self.0.tail.load(Acquire);

        // 如果不匹配,说明有其他线程在窃取
        if src_head_steal != src_head_real {
            return None;
        }

        // 计算要窃取的数量 (一半)
        let n = src_tail.wrapping_sub(src_head_real);
        let n = n - n / 2;  // 窃取一半

        // CAS 更新 head
        let res = self.0.head.compare_exchange(
            prev_packed, next_packed, AcqRel, Acquire
        );
        match res {
            Ok(_) => break n,
            Err(actual) => prev_packed = actual,
        }
    };
    // 复制任务到目标队列...
}

🏗️ create 函数

pub(super) fn create(
    size: usize,  // Worker 数量
    park: Parker,
    // ...其他参数
) -> (Arc<Handle>, Launch) {
    let mut cores = Vec::with_capacity(size);
    let mut remotes = Vec::with_capacity(size);

    // 创建每个 Worker 的本地队列和核心
    for _ in 0..size {
        let (steal, run_queue) = queue::local();
        let unpark = park.clone().unpark();

        cores.push(Box::new(Core {
            tick: 0,
            lifo_slot: None,
            run_queue,
            park: Some(park.clone()),
            // ...
        }));

        remotes.push(Remote { steal, unpark });
    }

    // 创建共享状态
    let handle = Arc::new(Handle { ... });
    (handle, Launch(workers))
}

▶️ run 函数

fn run(worker: Arc<Worker>) {
    // 获取 Core,如果已被其他线程拿走则退出
    let core = match worker.core.take() {
        Some(core) => core,
        None => return,
    };

    // 设置线程上下文
    let handle = scheduler::Handle::MultiThread(worker.handle.clone());
    
    context::enter_runtime(&handle, true, |_| {
        let cx = scheduler::Context::MultiThread(Context {
            worker,
            core: RefCell::new(None),
            defer: Defer::new(),
        });

        context::set_scheduler(&cx, || {
            // 主循环
            assert!(cx.expect_multi_thread().run(core).is_err());
            cx.defer.wake();
        });
    });
}

⚡ run_task 执行任务

fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
    let task = self.worker.handle.shared.owned.assert_owner(task);

    // 确保不在搜索状态
    core.transition_from_searching(&self.worker);

    core.stats.start_poll();

    // 将 Core 放入上下文
    *self.core.borrow_mut() = Some(core);

    // 使用协作式调度执行
    coop::budget(|| {
        task.run();  // 执行任务

        // 处理 LIFO slot 中的任务
        loop {
            let mut core = self.core.borrow_mut().take()?;
            let task = match core.lifo_slot.take() {
                Some(t) => t,
                None => return Ok(core),
            };

            if !coop::has_budget_remaining() {
                // 预算用完,放回队列
                core.run_queue.push_back_or_overflow(task, ...);
                return Ok(core);
            }

            // 执行 LIFO 任务
            *self.core.borrow_mut() = Some(core);
            self.worker.handle.shared.owned.assert_owner(task).run();
        }
    })
}

🔍 steal_work 窃取工作

fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
    // 转换到搜索状态
    if !self.transition_to_searching(worker) {
        return None;  // 已有足够多的搜索者
    }

    let num = worker.handle.shared.remotes.len();
    // 随机起始点
    let start = self.rand.fastrand_n(num as u32) as usize;

    for i in 0..num {
        let i = (start + i) % num;
        if i == worker.index { continue; }  // 跳过自己

        let target = &worker.handle.shared.remotes[i];
        if let Some(task) = target.steal.steal_into(
            &mut self.run_queue, &mut self.stats
        ) {
            return Some(task);
        }
    }

    // 回退到全局队列
    worker.handle.next_remote_task()
}

😴 park 等待机制

fn park(&self, mut core: Box<Core>) -> Box<Core> {
    // 转换到休眠状态
    if core.transition_to_parked(&self.worker) {
        while !core.is_shutdown {
            core.stats.about_to_park();

            // 取出 Parker
            let mut park = core.park.take().expect("park missing");
            *self.core.borrow_mut() = Some(core);

            // 阻塞等待
            park.park(&self.worker.handle.driver);

            core = self.core.borrow_mut().take().expect("core missing");
            core.park = Some(park);

            // 运行维护任务
            core.maintenance(&self.worker);

            if core.transition_from_parked(&self.worker) {
                break;  // 有工作了
            }
        }
    }
    core
}

🧱 block_in_place

block_in_place 允许在异步上下文中执行阻塞操作,而不阻塞整个运行时。

pub(crate) fn block_in_place<F, R>(f: F) -> R
where F: FnOnce() -> R {
    // ...

    // 从上下文取出 Core
    let mut core = cx.core.borrow_mut().take()?;

    // 将 LIFO slot 的任务移到队列
    if let Some(task) = core.lifo_slot.take() {
        core.run_queue.push_back_or_overflow(task, ...);
    }

    // 将 Core 放回 Worker,允许其他线程接管
    cx.worker.core.set(core);

    // 启动新线程运行 Worker
    let worker = cx.worker.clone();
    runtime::spawn_blocking(move || run(worker));

    // 执行阻塞操作
    crate::runtime::context::exit_runtime(f)
}

📤 schedule_task

pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
    with_current(|maybe_cx| {
        if let Some(cx) = maybe_cx {
            // 确保是当前调度器
            if self.ptr_eq(&cx.worker.handle) {
                if let Some(core) = cx.core.borrow_mut().as_mut() {
                    self.schedule_local(core, task, is_yield);
                    return;
                }
            }
        }

        // 远程调度 - 使用 inject queue
        self.push_remote_task(task);
        self.notify_parked_remote();
    });
}

fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
    if is_yield || !core.lifo_enabled {
        // 放入队列尾部
        core.run_queue.push_back_or_overflow(task, ...);
    } else {
        // 放入 LIFO slot
        core.lifo_slot = Some(task);
    }
}

🎯 LIFO Slot 优化

LIFO Slot 是 Tokio 的关键优化,提高了消息传递模式的性能。

工作原理

  • 最后调度的任务进入 LIFO slot
  • 优先执行 LIFO slot 任务
  • 提高缓存局部性
  • 减少消息传递延迟

限制机制

  • 最多连续执行 3 次 LIFO
  • 防止任务饥饿
  • 可配置禁用
  • yield 操作绕过 LIFO
const MAX_LIFO_POLLS_PER_TICK: usize = 3;

if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
    core.lifo_enabled = false;  // 禁用 LIFO
}

🔀 工作窃取算法

工作窃取 (Work Stealing) 实现了自动负载均衡,充分利用多核。

算法步骤

  1. 检查本地队列
  2. 检查 LIFO slot
  3. 检查全局注入队列
  4. 随机选择目标 Worker
  5. 窃取一半任务
  6. 如果失败,尝试下一个

关键特性

  • 窃取一半,而非全部
  • 随机起始点
  • 限制搜索者数量
  • 无锁队列实现

📬 Injector Queue

Injector Queue 是全局任务队列,用于外部提交任务。

pub struct Shared<S: 'static> {
    // 内部队列
    inner: Mutex<Inner<S>>,
    // 关闭标志
    closed: AtomicBool,
}

impl<S: 'static> Shared<S> {
    // 推入任务
    pub fn push(&self, synced: &mut Synced, task: Notified<S>) {
        self.inner.lock().queue.push_back(task);
    }

    // 弹出任务
    pub fn pop(&self, synced: &mut Synced) -> Option<Notified<S>> {
        self.inner.lock().queue.pop_front()
    }

    // 批量弹出
    pub fn pop_n(&self, synced: &mut Synced, n: usize) 
        -> impl Iterator<Item = Notified<S>> { ... }
}

💤 Idle 空闲管理

Idle 组件协调空闲 Worker,决定何时唤醒以及谁来处理新任务。

pub(crate) struct Idle {
    // 同步状态
    synced: Mutex<Synced>,
    // 原子搜索者计数
    num_searching: AtomicUsize,
}

impl Idle {
    // Worker 转换到搜索状态
    pub fn transition_worker_to_searching(&self) -> bool {
        // 限制搜索者数量不超过 Worker 数的一半
    }

    // Worker 转换到休眠状态
    pub fn transition_worker_to_parked(&self, ...) -> bool {
        // 如果是最后一个搜索者,需要最后检查一次
    }

    // 选择要唤醒的 Worker
    pub fn worker_to_notify(&self, shared: &Shared) -> Option<usize> {
        // 返回一个休眠中的 Worker 索引
    }
}

🛑 Shutdown 关闭流程

Shutdown 是分阶段进行的,确保所有任务都能正确清理。

//! 关闭流程:
//! 1. Shared::close() 关闭 inject queue 和 OwnedTasks
//! 2. Worker 在 maintenance 中检测到关闭信号
//! 3. 设置 Core::is_shutdown = true
//! 4. Worker 并行执行 pre_shutdown,清空 OwnedTasks
//! 5. Worker 调用 Shared::shutdown 进入单线程阶段
//! 6. 最后一个 Worker 清空本地队列和注入队列
//! 7. 完成关闭

impl Shared {
    pub(super) fn close(&self) {
        if self.inject.close(&mut synced.inject) {
            self.notify_all();  // 唤醒所有 Worker
        }
    }
}

🎨 架构设计模式

工作窃取模式

  • 每个 Worker 有本地队列
  • 空闲时窃取其他队列
  • 自动负载均衡

Actor 模式

  • 每个 Worker 是独立 Actor
  • 通过消息传递通信
  • 无共享状态

无锁数据结构

  • Atomic CAS 操作
  • 避免锁竞争
  • 高并发性能

协作式调度

  • Budget 机制
  • 公平性保证
  • 防止饥饿

📐 运行时 UML

┌─────────────────────────────────────────────────────────────┐
│                        Runtime                               │
├─────────────────────────────────────────────────────────────┤
│  - scheduler: Scheduler                                     │
│  - io_handle, time_handle, signal_handle                    │
│  - blocking_spawner                                         │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                       Handle (Arc)                           │
├─────────────────────────────────────────────────────────────┤
│  - shared: Shared                                           │
│  - driver: driver::Handle                                   │
│  - blocking_spawner                                         │
└─────────────────────────────────────────────────────────────┘
                              │
          ┌───────────────────┼───────────────────┐
          ▼                   ▼                   ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│    Worker[0]    │ │    Worker[1]    │ │    Worker[N]    │
├─────────────────┤ ├─────────────────┤ ├─────────────────┤
│  - index: 0     │ │  - index: 1     │ │  - index: N     │
│  - core: Core   │ │  - core: Core   │ │  - core: Core   │
│  - run_queue    │ │  - run_queue    │ │  - run_queue    │
└─────────────────┘ └─────────────────┘ └─────────────────┘
          │                   │                   │
          └───────────────────┴───────────────────┘
                              │
                              ▼
                   ┌─────────────────────┐
                   │   Injector Queue    │
                   └─────────────────────┘

🔄 任务调度流程

spawn(task)
    │
    ▼
┌──────────────────────────────────────┐
│  当前是否在 Worker 线程?              │
└──────────────────────────────────────┘
    │                    │
   Yes                  No
    │                    │
    ▼                    ▼
┌──────────────┐   ┌──────────────────┐
│ LIFO Slot?   │   │ Injector Queue   │
│ 或 LocalQueue│   │ (全局队列)        │
└──────────────┘   └──────────────────┘
    │                    │
    │                    ▼
    │            ┌──────────────────┐
    │            │ 通知空闲 Worker   │
    │            └──────────────────┘
    │                    │
    └────────────────────┘
              │
              ▼
    ┌──────────────────┐
    │ Worker 执行任务   │
    └──────────────────┘

🔀 工作窃取流程

Worker 没有本地任务
    │
    ▼
┌─────────────────────────────────────┐
│ transition_to_searching()           │
│ (限制搜索者数量)                     │
└─────────────────────────────────────┘
    │
    ▼
┌─────────────────────────────────────┐
│ 随机选择起始 Worker                  │
└─────────────────────────────────────┘
    │
    ▼
┌─────────────────────────────────────┐
│ 遍历所有 Worker (从随机点开始)        │
│   - 尝试 steal_into()               │
│   - 窃取一半任务                     │
└─────────────────────────────────────┘
    │           │
   成功        失败
    │           │
    ▼           ▼
执行任务    尝试下一个
                │
                ▼
        ┌───────────────────┐
        │ 检查 Injector Queue│
        └───────────────────┘
                │
                ▼
        ┌───────────────────┐
        │ 没有工作则 park    │
        └───────────────────┘

🚀 性能优化策略

缓存优化

  • LIFO Slot 提高局部性
  • 本地队列减少竞争
  • 任务在同一个 Worker

无锁设计

  • CAS 操作代替锁
  • 无锁环形队列
  • 最小化临界区

减少唤醒

  • 批量窃取
  • 延迟通知
  • Defer 队列

协作调度

  • Budget 限制
  • 公平性保证
  • 防止 CPU 饥饿

⚖️ 与其他运行时对比

特性 Tokio async-std Go Runtime
调度算法 工作窃取 工作窃取 工作窃取
栈大小 无栈 无栈 2KB 起
抢占式 协作式 协作式 抢占式
零成本
生态系统 丰富 中等 标准库

✨ 最佳实践

配置建议

  • 默认使用多线程运行时
  • Worker 数 = CPU 核心数
  • 合理使用 block_in_place
// 推荐
#[tokio::main(flavor = "multi_thread",
              worker_threads = 4)]
async fn main() { }

避免阻塞

  • 使用 spawn_blocking
  • 避免长时间计算
  • 正确处理取消
// 阻塞操作
tokio::task::spawn_blocking(|| {
    // 阻塞代码
}).await?;

⚠️ 常见陷阱

阻塞异步上下文

// ❌ 错误
async fn bad() {
    std::thread::sleep(
        Duration::from_secs(1)
    );
}

// ✅ 正确
async fn good() {
    tokio::time::sleep(
        Duration::from_secs(1)
    ).await;
}

持有锁跨 await

// ❌ 错误
async fn bad() {
    let lock = mutex.lock();
    async_op().await;  // 死锁风险
}

// ✅ 正确
async fn good() {
    {
        let lock = mutex.lock();
        // 同步操作
    }
    async_op().await;
}

📖 总结与扩展

Tokio 是 Rust 异步编程的核心,理解其内部机制有助于编写高性能异步代码。

核心要点

  • 工作窃取调度器
  • 无锁队列实现
  • LIFO Slot 优化
  • 协作式调度
  • 零成本抽象

扩展阅读

感谢阅读! 🦀