基于 Tokio 1.x 源码深度解析
2026-03-07 | 技术深度解读
第一部分:基础架构
第二部分:演进历史
第三部分:核心模块
第四部分:核心算法
Tokio 是 Rust 生态中最流行的异步运行时,提供了构建异步应用所需的核心组件。
核心特性
运行时组成
Tokio 运行时采用分层架构,核心层为调度器,外围为各种驱动。
┌─────────────────────────────────────────────┐
│ Application │
├─────────────────────────────────────────────┤
│ spawn() │ block_on() │ handle │
├───────────────┴──────────────────┴──────────┤
│ Runtime Core │
│ ┌─────────────────────────────────────────┐│
│ │ Scheduler (调度器) ││
│ │ ┌─────────────┐ ┌──────────────────┐ ││
│ │ │ Worker 0 │ │ Worker 1..N │ ││
│ │ │ LocalQueue │ │ LocalQueue │ ││
│ │ └─────────────┘ └──────────────────┘ ││
│ │ │ │ ││
│ │ └──────────────┘ ││
│ │ Injector Queue ││
│ └─────────────────────────────────────────┘│
│ ┌─────────┐ ┌─────────┐ ┌────────────────┐ │
│ │I/O Driver│ │ Timer │ │ Blocking Pool │ │
│ └─────────┘ └─────────┘ └────────────────┘ │
└─────────────────────────────────────────────┘
| 组件 | 文件路径 | 职责 |
|---|---|---|
| Runtime | runtime/mod.rs | 运行时入口,管理生命周期 |
| Scheduler | scheduler/mod.rs | 任务调度核心 |
| MultiThread | scheduler/multi_thread/ | 多线程工作窃取调度器 |
| Worker | scheduler/multi_thread/worker.rs | 工作线程执行逻辑 |
| Queue | scheduler/multi_thread/queue.rs | 无锁任务队列 |
| Task | task/mod.rs | 任务抽象与状态管理 |
| Harness | task/harness.rs | 任务执行引擎 |
CurrentThread (单线程)
// 单线程运行时
#[tokio::main(flavor = "current_thread")]
async fn main() {
// ...
}
MultiThread (多线程)
// 多线程运行时 (默认)
#[tokio::main(flavor = "multi_thread")]
async fn main() {
// ...
}
Rust 异步编程经历了从回调到 Future 再到 async/await 的演进。
| 阶段 | 方式 | 问题 |
|---|---|---|
| 1.0 | 回调函数 | 回调地狱、难以组合 |
| 1.19 | Future trait | 手动状态机、复杂 |
| 1.36 | async/await 稳定 | 零成本抽象、简洁 |
| 1.39 | async fn in traits | 更强大的抽象 |
| 1.75 | async closure | 更灵活的异步闭包 |
运行时
异步原语
网络框架
数据库
Runtime 是 Tokio 运行时的入口点,负责初始化和管理所有组件。
pub struct Runtime {
// 调度器 - 核心组件
scheduler: Scheduler,
// I/O 驱动
io_handle: io::Handle,
// 时间驱动
time_handle: time::Handle,
// 信号驱动
signal_handle: signal::Handle,
// 阻塞线程池
blocking_spawner: blocking::Spawner,
}
enum Scheduler {
// 单线程调度器
CurrentThread(current_thread::Scheduler),
// 多线程调度器
MultiThread(multi_thread::Scheduler),
}
Handle 是对运行时的句柄,用于在异步上下文中访问运行时功能。
#[derive(Debug, Clone)]
pub(crate) enum Handle {
// 单线程运行时句柄
#[cfg(feature = "rt")]
CurrentThread(Arc<current_thread::Handle>),
// 多线程运行时句柄
#[cfg(feature = "rt-multi-thread")]
MultiThread(Arc<multi_thread::Handle>),
}
impl Handle {
// 获取当前运行时句柄
pub(crate) fn current() -> Handle {
match context::with_current(Clone::clone) {
Ok(handle) => handle,
Err(e) => panic!("{}", e),
}
}
// 生成新任务
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where F: Future + Send + 'static {
match self {
Handle::CurrentThread(h) => h.spawn(future),
Handle::MultiThread(h) => h.spawn(future),
}
}
}
Context 保存当前执行上下文,用于任务执行期间访问调度器状态。
pub(super) enum Context {
// 单线程上下文
CurrentThread(current_thread::Context),
// 多线程上下文
#[cfg(feature = "rt-multi-thread")]
MultiThread(multi_thread::Context),
}
impl Context {
// 延迟唤醒任务
pub(crate) fn defer(&self, waker: &Waker) {
match self {
Context::CurrentThread(ctx) => ctx.defer(waker),
Context::MultiThread(ctx) => ctx.defer(waker),
}
}
}
设计要点:Context 通过 thread-local 存储,确保每个线程都能访问自己的执行上下文。
Scheduler 是任务调度的核心,负责决定何时何地执行任务。
cfg_rt! {
pub(crate) mod current_thread;
pub(crate) use current_thread::CurrentThread;
mod defer;
use defer::Defer;
pub(crate) mod inject;
pub(crate) use inject::Inject;
}
cfg_rt_multi_thread! {
mod block_in_place;
pub(crate) use block_in_place::block_in_place;
mod lock;
use lock::Lock;
pub(crate) mod multi_thread;
pub(crate) use multi_thread::MultiThread;
}
pub(crate) struct MultiThread;
impl MultiThread {
pub(crate) fn new(
size: usize, // 工作线程数
driver: Driver, // I/O 驱动
driver_handle: driver::Handle, // 驱动句柄
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
config: Config,
timer_flavor: TimerFlavor,
) -> (MultiThread, Arc<Handle>, Launch) {
let parker = Parker::new(driver);
let (handle, launch) = worker::create(
size, parker, driver_handle,
blocking_spawner, seed_generator,
config, timer_flavor,
);
(MultiThread, handle, launch)
}
// 阻塞等待 Future 完成
pub(crate) fn block_on<F>(&self, handle: &scheduler::Handle, future: F)
-> F::Output
where F: Future {
context::enter_runtime(handle, true, |blocking| {
blocking.block_on(future).expect("failed to park thread")
})
}
}
Worker 代表一个工作线程,持有调度器句柄和核心数据。
pub(super) struct Worker {
// 调度器句柄引用
handle: Arc<Handle>,
// Worker 索引
index: usize,
// 核心数据 (可被其他线程窃取)
core: AtomicCell<Core>,
}
// 每个线程的上下文
pub(crate) struct Context {
worker: Arc<Worker>,
core: RefCell<Option<Box<Core>>>,
defer: Defer, // 延迟唤醒队列
}
关键设计:Core 存储在 AtomicCell 中,可以被其他线程"偷走"以支持 block_in_place 功能。
struct Core {
// 调度计数器
tick: u32,
// LIFO 插槽 - 最后调度的任务
lifo_slot: Option<Notified>,
// LIFO 是否启用
lifo_enabled: bool,
// 本地运行队列
run_queue: queue::Local<Arc<Handle>>,
// 是否正在搜索工作
is_searching: bool,
// 是否已关闭
is_shutdown: bool,
// Parker - 线程等待机制
park: Option<Parker>,
// 统计信息
stats: Stats,
// 全局队列检查间隔
global_queue_interval: u32,
// 随机数生成器
rand: FastRand,
}
pub(crate) struct Shared {
// 每个 Worker 的远程状态
remotes: Box<[Remote]>,
// 全局注入队列
pub(super) inject: inject::Shared<Arc<Handle>>,
// 空闲 Worker 协调
idle: Idle,
// 所有活跃任务
pub(crate) owned: OwnedTasks<Arc<Handle>>,
// 同步锁保护的数据
pub(super) synced: Mutex<Synced>,
// 关闭时的 Core 收集
shutdown_cores: Mutex<Vec<Box<Core>>>,
// 配置
config: Config,
// 指标
scheduler_metrics: SchedulerMetrics,
worker_metrics: Box<[WorkerMetrics]>,
}
Remote 用于跨线程通信,允许其他线程与 Worker 交互。
struct Remote {
// 窃取句柄
pub(super) steal: queue::Steal<Arc<Handle>>,
// 唤醒 Worker
unpark: Unparker,
}
pub(crate) struct Synced {
// Idle 的同步状态
pub(super) idle: idle::Synced,
// Inject 的同步状态
pub(crate) inject: inject::Synced,
}
设计要点:Remote 实现了跨线程的任务窃取和通知机制,是工作窃取算法的基础。
impl Context {
fn run(&self, mut core: Box<Core>) -> RunResult {
// 重置 LIFO 状态
self.reset_lifo_enabled(&mut core);
core.stats.start_processing_scheduled_tasks();
while !core.is_shutdown {
// 增加调度计数
core.tick();
// 运行维护任务
core = self.maintenance(core);
// 获取下一个任务
if let Some(task) = core.next_task(&self.worker) {
core = self.run_task(task, core)?;
continue;
}
// 尝试从其他 Worker 窃取
if let Some(task) = core.steal_work(&self.worker) {
core = self.run_task(task, core)?;
} else {
// 没有工作,进入等待
core = self.park(core);
}
}
// ...
}
}
Task 是 Tokio 中异步任务的抽象,封装了 Future 及其状态。
//! 任务引用类型:
//! * OwnedTask - 存储在 OwnedTasks 中
//! * JoinHandle - 获取任务输出
//! * Waker - 唤醒任务
//! * Notified - 跟踪任务是否被通知
//! * Unowned - 不属于任何运行时的任务
#[repr(transparent)]
pub(crate) struct Task<S: 'static> {
raw: RawTask,
_p: PhantomData<S>,
}
// 被通知的任务
#[repr(transparent)]
pub(crate) struct Notified<S: 'static>(Task<S>);
// 本地通知的任务 (非 Send)
pub(crate) struct LocalNotified<S: 'static> {
task: Task<S>,
_not_send: PhantomData<*const ()>,
}
| 类型 | 用途 | 引用计数 |
|---|---|---|
| Task | 拥有的任务句柄 | 1 |
| Notified | 已通知待执行 | 1 |
| LocalNotified | 本地任务 (非 Send) | 1 |
| UnownedTask | 不属于运行时的任务 | 2 |
| JoinHandle | 获取任务输出 | 1 |
| Waker | 唤醒机制 | 可变 |
//! 状态字段使用原子 usize 存储多个位字段:
//!
//! * RUNNING - 任务是否正在被 poll 或取消 (作为锁)
//! * COMPLETE - Future 是否已完成 (完成后永不重置)
//! * NOTIFIED - 是否存在 Notified 对象
//! * CANCELLED - 任务是否应被取消
//! * JOIN_INTEREST - 是否存在 JoinHandle
//! * JOIN_WAKER - JoinHandle waker 访问控制位
//!
//! 其余位用于引用计数
const RUNNING: usize = 1 << 0;
const COMPLETE: usize = 1 << 1;
const NOTIFIED: usize = 1 << 2;
const CANCELLED: usize = 1 << 3;
const JOIN_INTEREST: usize = 1 << 4;
const JOIN_WAKER: usize = 1 << 5;
// 引用计数在高位
const REF_COUNT_MASK: usize = !((1 << 6) - 1);
pub(crate) trait Schedule: Sync + Sized + 'static {
/// 释放任务,返回任务引用 (如果存在)
fn release(&self, task: &Task<Self>) -> Option<Task<Self>>;
/// 调度任务执行
fn schedule(&self, task: Notified<Self>);
/// 获取钩子
fn hooks(&self) -> TaskHarnessScheduleHooks;
/// 让出当前线程
fn yield_now(&self, task: Notified<Self>) {
self.schedule(task);
}
/// 处理 panic
fn unhandled_panic(&self) {
// 默认不做任何事
}
}
关键点:Schedule trait 定义了调度器与任务之间的接口,Handle 实现了此 trait。
pub(super) struct Harness<T: Future, S: 'static> {
cell: NonNull<Cell<T, S>>,
}
impl<T, S> Harness<T, S>
where T: Future, S: Schedule {
/// Poll 内部 Future,消费一个引用计数
pub(super) fn poll(self) {
match self.poll_inner() {
PollFuture::Notified => {
// 任务被重新通知,让出执行
self.core().scheduler.yield_now(Notified(...));
self.drop_reference();
}
PollFuture::Complete => {
self.complete();
}
PollFuture::Dealloc => {
self.dealloc();
}
PollFuture::Done => (),
}
}
}
fn poll_inner(&self) -> PollFuture {
match self.state().transition_to_running() {
TransitionToRunning::Success => {
// 创建 waker 和 context
let waker_ref = waker_ref::<S>(&header_ptr);
let cx = Context::from_waker(&waker_ref);
// Poll Future
let res = poll_future(self.core(), cx);
if res == Poll::Ready(()) {
return PollFuture::Complete;
}
// 转换回空闲状态
let transition_res = self.state().transition_to_idle();
// ...
}
TransitionToRunning::Cancelled => {
cancel_task(self.core());
PollFuture::Complete
}
// ...
}
}
fn complete(self) {
// 转换到完成状态
let snapshot = self.state().transition_to_complete();
let _ = panic::catch_unwind(|| {
if !snapshot.is_join_interested() {
// 没有 JoinHandle,直接丢弃输出
self.core().drop_future_or_output();
} else if snapshot.is_join_waker_set() {
// 唤醒 JoinHandle
self.trailer().wake_join();
if !self.state().unset_waker_after_complete()
.is_join_interested() {
unsafe { self.trailer().set_waker(None); };
}
}
});
// 从调度器释放
let num_release = self.release();
if self.state().transition_to_terminal(num_release) {
self.dealloc();
}
}
Queue 实现了无锁的工作窃取队列,是 Tokio 高性能的关键。
pub(crate) struct Local<T: 'static> {
inner: Arc<Inner<T>>,
}
pub(crate) struct Steal<T: 'static>(Arc<Inner<T>>);
pub(crate) struct Inner<T: 'static> {
// Head: LSB = 真实 head, MSB = 窃取者正在操作的 head
head: AtomicUnsignedLong,
// Tail: 仅由生产者更新
tail: AtomicUnsignedShort,
// 环形缓冲区
buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>; 256]>,
}
const LOCAL_QUEUE_CAPACITY: usize = 256;
Local (生产者)
impl<T> Local<T> {
pub(crate) fn pop(&mut self)
-> Option<Notified<T>> {
// CAS 操作获取任务
}
}
Steal (消费者)
impl<T> Steal<T> {
pub(crate) fn steal_into(
&self,
dst: &mut Local<T>,
) -> Option<Notified<T>> {
// 窃取一半任务
}
}
pub(crate) fn push_back_or_overflow<O: Overflow<T>>(
&mut self,
mut task: Notified<T>,
overflow: &O,
) {
let tail = loop {
let head = self.inner.head.load(Acquire);
let (steal, real) = unpack(head);
let tail = unsafe { self.inner.tail.unsync_load() };
if tail.wrapping_sub(steal) < 256 {
// 队列有空间
break tail;
} else if steal != real {
// 正在被窃取,空间会释放
overflow.push(task);
return;
} else {
// 队列满,移动一半到 inject queue
match self.push_overflow(task, real, tail, overflow) {
Ok(_) => return,
Err(v) => task = v, // CAS 失败,重试
}
}
};
self.push_back_finish(task, tail);
}
pub(crate) fn steal_into(
&self,
dst: &mut Local<T>,
) -> Option<Notified<T>> {
let n = loop {
let (src_head_steal, src_head_real) = unpack(prev_packed);
let src_tail = self.0.tail.load(Acquire);
// 如果不匹配,说明有其他线程在窃取
if src_head_steal != src_head_real {
return None;
}
// 计算要窃取的数量 (一半)
let n = src_tail.wrapping_sub(src_head_real);
let n = n - n / 2; // 窃取一半
// CAS 更新 head
let res = self.0.head.compare_exchange(
prev_packed, next_packed, AcqRel, Acquire
);
match res {
Ok(_) => break n,
Err(actual) => prev_packed = actual,
}
};
// 复制任务到目标队列...
}
pub(super) fn create(
size: usize, // Worker 数量
park: Parker,
// ...其他参数
) -> (Arc<Handle>, Launch) {
let mut cores = Vec::with_capacity(size);
let mut remotes = Vec::with_capacity(size);
// 创建每个 Worker 的本地队列和核心
for _ in 0..size {
let (steal, run_queue) = queue::local();
let unpark = park.clone().unpark();
cores.push(Box::new(Core {
tick: 0,
lifo_slot: None,
run_queue,
park: Some(park.clone()),
// ...
}));
remotes.push(Remote { steal, unpark });
}
// 创建共享状态
let handle = Arc::new(Handle { ... });
(handle, Launch(workers))
}
fn run(worker: Arc<Worker>) {
// 获取 Core,如果已被其他线程拿走则退出
let core = match worker.core.take() {
Some(core) => core,
None => return,
};
// 设置线程上下文
let handle = scheduler::Handle::MultiThread(worker.handle.clone());
context::enter_runtime(&handle, true, |_| {
let cx = scheduler::Context::MultiThread(Context {
worker,
core: RefCell::new(None),
defer: Defer::new(),
});
context::set_scheduler(&cx, || {
// 主循环
assert!(cx.expect_multi_thread().run(core).is_err());
cx.defer.wake();
});
});
}
fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
let task = self.worker.handle.shared.owned.assert_owner(task);
// 确保不在搜索状态
core.transition_from_searching(&self.worker);
core.stats.start_poll();
// 将 Core 放入上下文
*self.core.borrow_mut() = Some(core);
// 使用协作式调度执行
coop::budget(|| {
task.run(); // 执行任务
// 处理 LIFO slot 中的任务
loop {
let mut core = self.core.borrow_mut().take()?;
let task = match core.lifo_slot.take() {
Some(t) => t,
None => return Ok(core),
};
if !coop::has_budget_remaining() {
// 预算用完,放回队列
core.run_queue.push_back_or_overflow(task, ...);
return Ok(core);
}
// 执行 LIFO 任务
*self.core.borrow_mut() = Some(core);
self.worker.handle.shared.owned.assert_owner(task).run();
}
})
}
fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
// 转换到搜索状态
if !self.transition_to_searching(worker) {
return None; // 已有足够多的搜索者
}
let num = worker.handle.shared.remotes.len();
// 随机起始点
let start = self.rand.fastrand_n(num as u32) as usize;
for i in 0..num {
let i = (start + i) % num;
if i == worker.index { continue; } // 跳过自己
let target = &worker.handle.shared.remotes[i];
if let Some(task) = target.steal.steal_into(
&mut self.run_queue, &mut self.stats
) {
return Some(task);
}
}
// 回退到全局队列
worker.handle.next_remote_task()
}
fn park(&self, mut core: Box<Core>) -> Box<Core> {
// 转换到休眠状态
if core.transition_to_parked(&self.worker) {
while !core.is_shutdown {
core.stats.about_to_park();
// 取出 Parker
let mut park = core.park.take().expect("park missing");
*self.core.borrow_mut() = Some(core);
// 阻塞等待
park.park(&self.worker.handle.driver);
core = self.core.borrow_mut().take().expect("core missing");
core.park = Some(park);
// 运行维护任务
core.maintenance(&self.worker);
if core.transition_from_parked(&self.worker) {
break; // 有工作了
}
}
}
core
}
block_in_place 允许在异步上下文中执行阻塞操作,而不阻塞整个运行时。
pub(crate) fn block_in_place<F, R>(f: F) -> R
where F: FnOnce() -> R {
// ...
// 从上下文取出 Core
let mut core = cx.core.borrow_mut().take()?;
// 将 LIFO slot 的任务移到队列
if let Some(task) = core.lifo_slot.take() {
core.run_queue.push_back_or_overflow(task, ...);
}
// 将 Core 放回 Worker,允许其他线程接管
cx.worker.core.set(core);
// 启动新线程运行 Worker
let worker = cx.worker.clone();
runtime::spawn_blocking(move || run(worker));
// 执行阻塞操作
crate::runtime::context::exit_runtime(f)
}
pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
with_current(|maybe_cx| {
if let Some(cx) = maybe_cx {
// 确保是当前调度器
if self.ptr_eq(&cx.worker.handle) {
if let Some(core) = cx.core.borrow_mut().as_mut() {
self.schedule_local(core, task, is_yield);
return;
}
}
}
// 远程调度 - 使用 inject queue
self.push_remote_task(task);
self.notify_parked_remote();
});
}
fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
if is_yield || !core.lifo_enabled {
// 放入队列尾部
core.run_queue.push_back_or_overflow(task, ...);
} else {
// 放入 LIFO slot
core.lifo_slot = Some(task);
}
}
LIFO Slot 是 Tokio 的关键优化,提高了消息传递模式的性能。
工作原理
限制机制
const MAX_LIFO_POLLS_PER_TICK: usize = 3;
if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
core.lifo_enabled = false; // 禁用 LIFO
}
工作窃取 (Work Stealing) 实现了自动负载均衡,充分利用多核。
算法步骤
关键特性
Injector Queue 是全局任务队列,用于外部提交任务。
pub struct Shared<S: 'static> {
// 内部队列
inner: Mutex<Inner<S>>,
// 关闭标志
closed: AtomicBool,
}
impl<S: 'static> Shared<S> {
// 推入任务
pub fn push(&self, synced: &mut Synced, task: Notified<S>) {
self.inner.lock().queue.push_back(task);
}
// 弹出任务
pub fn pop(&self, synced: &mut Synced) -> Option<Notified<S>> {
self.inner.lock().queue.pop_front()
}
// 批量弹出
pub fn pop_n(&self, synced: &mut Synced, n: usize)
-> impl Iterator<Item = Notified<S>> { ... }
}
Idle 组件协调空闲 Worker,决定何时唤醒以及谁来处理新任务。
pub(crate) struct Idle {
// 同步状态
synced: Mutex<Synced>,
// 原子搜索者计数
num_searching: AtomicUsize,
}
impl Idle {
// Worker 转换到搜索状态
pub fn transition_worker_to_searching(&self) -> bool {
// 限制搜索者数量不超过 Worker 数的一半
}
// Worker 转换到休眠状态
pub fn transition_worker_to_parked(&self, ...) -> bool {
// 如果是最后一个搜索者,需要最后检查一次
}
// 选择要唤醒的 Worker
pub fn worker_to_notify(&self, shared: &Shared) -> Option<usize> {
// 返回一个休眠中的 Worker 索引
}
}
Shutdown 是分阶段进行的,确保所有任务都能正确清理。
//! 关闭流程:
//! 1. Shared::close() 关闭 inject queue 和 OwnedTasks
//! 2. Worker 在 maintenance 中检测到关闭信号
//! 3. 设置 Core::is_shutdown = true
//! 4. Worker 并行执行 pre_shutdown,清空 OwnedTasks
//! 5. Worker 调用 Shared::shutdown 进入单线程阶段
//! 6. 最后一个 Worker 清空本地队列和注入队列
//! 7. 完成关闭
impl Shared {
pub(super) fn close(&self) {
if self.inject.close(&mut synced.inject) {
self.notify_all(); // 唤醒所有 Worker
}
}
}
工作窃取模式
Actor 模式
无锁数据结构
协作式调度
┌─────────────────────────────────────────────────────────────┐
│ Runtime │
├─────────────────────────────────────────────────────────────┤
│ - scheduler: Scheduler │
│ - io_handle, time_handle, signal_handle │
│ - blocking_spawner │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Handle (Arc) │
├─────────────────────────────────────────────────────────────┤
│ - shared: Shared │
│ - driver: driver::Handle │
│ - blocking_spawner │
└─────────────────────────────────────────────────────────────┘
│
┌───────────────────┼───────────────────┐
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Worker[0] │ │ Worker[1] │ │ Worker[N] │
├─────────────────┤ ├─────────────────┤ ├─────────────────┤
│ - index: 0 │ │ - index: 1 │ │ - index: N │
│ - core: Core │ │ - core: Core │ │ - core: Core │
│ - run_queue │ │ - run_queue │ │ - run_queue │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
└───────────────────┴───────────────────┘
│
▼
┌─────────────────────┐
│ Injector Queue │
└─────────────────────┘
spawn(task)
│
▼
┌──────────────────────────────────────┐
│ 当前是否在 Worker 线程? │
└──────────────────────────────────────┘
│ │
Yes No
│ │
▼ ▼
┌──────────────┐ ┌──────────────────┐
│ LIFO Slot? │ │ Injector Queue │
│ 或 LocalQueue│ │ (全局队列) │
└──────────────┘ └──────────────────┘
│ │
│ ▼
│ ┌──────────────────┐
│ │ 通知空闲 Worker │
│ └──────────────────┘
│ │
└────────────────────┘
│
▼
┌──────────────────┐
│ Worker 执行任务 │
└──────────────────┘
Worker 没有本地任务
│
▼
┌─────────────────────────────────────┐
│ transition_to_searching() │
│ (限制搜索者数量) │
└─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ 随机选择起始 Worker │
└─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ 遍历所有 Worker (从随机点开始) │
│ - 尝试 steal_into() │
│ - 窃取一半任务 │
└─────────────────────────────────────┘
│ │
成功 失败
│ │
▼ ▼
执行任务 尝试下一个
│
▼
┌───────────────────┐
│ 检查 Injector Queue│
└───────────────────┘
│
▼
┌───────────────────┐
│ 没有工作则 park │
└───────────────────┘
缓存优化
无锁设计
减少唤醒
协作调度
| 特性 | Tokio | async-std | Go Runtime |
|---|---|---|---|
| 调度算法 | 工作窃取 | 工作窃取 | 工作窃取 |
| 栈大小 | 无栈 | 无栈 | 2KB 起 |
| 抢占式 | 协作式 | 协作式 | 抢占式 |
| 零成本 | ✅ | ✅ | ❌ |
| 生态系统 | 丰富 | 中等 | 标准库 |
配置建议
// 推荐
#[tokio::main(flavor = "multi_thread",
worker_threads = 4)]
async fn main() { }
避免阻塞
// 阻塞操作
tokio::task::spawn_blocking(|| {
// 阻塞代码
}).await?;
阻塞异步上下文
// ❌ 错误
async fn bad() {
std::thread::sleep(
Duration::from_secs(1)
);
}
// ✅ 正确
async fn good() {
tokio::time::sleep(
Duration::from_secs(1)
).await;
}
持有锁跨 await
// ❌ 错误
async fn bad() {
let lock = mutex.lock();
async_op().await; // 死锁风险
}
// ✅ 正确
async fn good() {
{
let lock = mutex.lock();
// 同步操作
}
async_op().await;
}
Tokio 是 Rust 异步编程的核心,理解其内部机制有助于编写高性能异步代码。
核心要点
感谢阅读! 🦀