🦋 Dart Isolate 机制深度解析

基于 dart-lang/sdk 源码的技术剖析

深入理解 Dart 并发模型核心实现
2026-03-04 | 技术深度解读

📋 目录

Part 1: 基础概念

  • Isolate 概述
  • 为什么需要 Isolate
  • Isolate vs Thread

Part 2: 核心架构

  • Isolate / IsolateGroup / Thread
  • Port 与消息传递
  • ThreadPool 与 Worker

Part 3: 关键机制

  • Safepoint 机制
  • 消息处理流程
  • 生命周期管理

Part 4: 高级特性

  • Hot Reload 支持
  • FFI 回调
  • 性能优化

🦋 Isolate 概述

Isolate 是 Dart 的并发单元,类似于 Actor 模型中的 Actor。

核心特征:

  • 每个 Isolate 拥有独立的内存堆
  • Isolate 之间不共享内存
  • 通过消息传递进行通信
  • 每个 Isolate 有自己的事件循环

设计哲学:

"Do not communicate by sharing memory; instead, share memory by communicating."

— Go 语言箴言(同样适用于 Dart)

🤔 为什么需要 Isolate?

Dart 是单线程执行模型,所有 Dart 代码在一个线程中运行。

问题:

  • CPU 密集型任务会阻塞 UI
  • 长时间计算导致帧丢失
  • 无法利用多核 CPU

Isolate 解决方案:

  • 在独立线程中运行 Dart 代码
  • 不阻塞主 Isolate(UI 线程)
  • 利用多核并行处理
  • 内存隔离保证线程安全

⚖️ Isolate vs Thread

特性 Isolate Thread (Java/C++)
内存模型 独立堆内存 共享堆内存
通信方式 消息传递 共享内存 + 锁
数据同步 无需同步 需要锁/Mutex
创建开销 较大(~2MB) 较小(~1MB)
调试难度 简单 复杂(竞态条件)
错误隔离 Isolate 崩溃不影响其他 可能影响整个进程

🏗️ 核心架构概览

┌─────────────────────────────────────────────────────┐
│                    IsolateGroup                      │
│  ┌───────────────────────────────────────────────┐  │
│  │              ObjectStore (共享)                 │  │
│  │              ClassTable (共享)                  │  │
│  │              Heap (共享)                        │  │
│  └───────────────────────────────────────────────┘  │
│                                                      │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐             │
│  │Isolate 1│  │Isolate 2│  │Isolate N│             │
│  │  Thread │  │  Thread │  │  Thread │             │
│  │  Port   │  │  Port   │  │  Port   │             │
│  │  Queue  │  │  Queue  │  │  Queue  │             │
│  └─────────┘  └─────────┘  └─────────┘             │
│                                                      │
│  ┌───────────────────────────────────────────────┐  │
│  │              ThreadPool                        │  │
│  │  Worker1  Worker2  Worker3  ...               │  │
│  └───────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────┘

📦 Isolate 类定义

class Isolate : public IntrusiveDListEntry<Isolate> {
 public:
  // Isolate API 消息类型
  enum LibMsgId {
    kPauseMsg = 1, kResumeMsg = 2, kPingMsg = 3,
    kKillMsg = 4, kAddExitMsg = 5, kDelExitMsg = 6,
    kAddErrorMsg = 7, kDelErrorMsg = 8, kErrorFatalMsg = 9,
    // 内部消息
    kInterruptMsg = 10, kInternalKillMsg = 11,
    kDrainServiceExtensionsMsg = 12, kCheckForReload = 13,
  };

  // 获取当前 Isolate
  static inline Isolate* Current() {
    Thread* thread = Thread::Current();
    return thread == nullptr ? nullptr : thread->isolate();
  }

  // 核心成员
  IsolateGroup* group() const { return isolate_group_; }
  Dart_Port main_port() const { return main_port_; }
  const char* name() const { return name_; }
  Thread* mutator_thread() const { return mutator_thread_; }
};

👥 IsolateGroup 类

IsolateGroup 是一组共享资源的 Isolate 集合。

共享资源:

  • ObjectStore - 共享对象存储
  • ClassTable - 类定义表
  • Heap - 共享堆内存
  • ThreadPool - 线程池
  • BackgroundCompiler - 后台编译器
class IsolateGroup : public IntrusiveDListEntry<IsolateGroup> {
  // 访问当前 IsolateGroup
  static inline IsolateGroup* Current() {
    Thread* thread = Thread::Current();
    return thread == nullptr ? nullptr : thread->isolate_group();
  }
  
  // 核心资源
  Heap* heap() const { return heap_.get(); }
  ClassTable* class_table() const { return class_table_; }
  ObjectStore* object_store() const { return object_store_.get(); }
  ThreadPool* thread_pool() { return thread_pool_.get(); }
};

📋 IsolateGroup 核心字段

class IsolateGroup {
 private:
  // 从生成代码访问
  ClassTable* class_table_;
  AcqRelAtomic<ClassPtr*> cached_class_table_table_;
  std::unique_ptr<ObjectStore> object_store_;
  
  // 核心资源
  std::unique_ptr<Heap> heap_;
  std::unique_ptr<ThreadPool> thread_pool_;
  std::unique_ptr<SafepointHandler> safepoint_handler_;
  std::unique_ptr<ThreadRegistry> thread_registry_;
  
  // Isolate 管理
  std::unique_ptr<SafepointRwLock> isolates_lock_;
  IntrusiveDList<Isolate> isolates_;
  intptr_t isolate_count_ = 0;
  
  // Mutator 管理
  IntrusiveDList<Thread> mutators_;
  intptr_t group_mutator_count_ = 0;
  
  // 并发控制
  Mutex symbols_mutex_;
  Mutex type_canonicalization_mutex_;
  std::unique_ptr<SafepointRwLock> program_lock_;
};

🧵 Thread 类定义

class Thread : public ThreadState, public IntrusiveDListEntry<Thread> {
 public:
  // 当前执行线程
  static Thread* Current() {
    return static_cast<Thread*>(OSThread::CurrentVMThread());
  }
  
  // 关联的 Isolate 和 IsolateGroup
  Isolate* isolate() const { return isolate_; }
  IsolateGroup* isolate_group() const { return isolate_group_; }
  
  // 进入/退出 Isolate
  static void EnterIsolate(Isolate* isolate);
  static void ExitIsolate(bool isolate_shutdown = false);
  
  // 堆访问
  Heap* heap() const;
  
  // TLAB 内存边界
  uword top() const;
  uword end() const;
  uword true_end() const;
  
  // Safepoint 相关
  bool IsAtSafepoint() const;
  void EnterSafepoint();
  void ExitSafepoint();
};

📝 Thread 任务类型

class Thread {
 public:
  enum TaskKind {
    kUnknownTask = 0,         // 未知任务
    kMutatorTask,             // Mutator 线程(执行 Dart 代码)
    kCompilerTask,            // 编译任务
    kMarkerTask,              // GC 标记任务
    kSweeperTask,             // GC 清扫任务
    kCompactorTask,           // GC 压缩任务
    kScavengerTask,           // GC Scavenger 任务
    kSampleBlockTask,         // 采样块任务
    kIncrementalCompactorTask,// 增量压缩任务
    kSpawnTask,               // Isolate 生成任务
    kIsolateGroupBoundCallbackTask, // IsolateGroup 绑定回调
  };
  
  TaskKind task_kind() const {
    return task_kind_.load(std::memory_order_acquire);
  }
  
  bool IsDartMutatorThread() const { 
    return task_kind_ == kMutatorTask; 
  }
};

🔄 Thread 执行状态

class Thread {
 public:
  enum ExecutionState {
    kThreadInVM = 0,          // 在 VM 代码中
    kThreadInGenerated,       // 在生成代码中(Dart 代码)
    kThreadInNative,          // 在 Native 代码中
    kThreadInBlockedState,    // 阻塞状态
    kThreadInReloadableBlockedState, // 可重载的阻塞状态
  };
  
  ExecutionState execution_state() const {
    return static_cast<ExecutionState>(execution_state_);
  }
  
  bool IsExecutingDartCode() const;
  bool HasExitedDartCode() const;
  
  // 栈限制(用于栈溢出检测)
  uword stack_limit_address() const;
  void SetStackLimit(uword value);
  void ClearStackLimit();
};

📨 Port 与消息传递

Port 是 Isolate 间通信的端点。

SendPort

  • 用于发送消息
  • 可以跨 Isolate 传递
  • 对应一个 ReceivePort

ReceivePort

  • 用于接收消息
  • 每个 Isolate 有 main_port
  • 维护消息队列
// Dart 代码示例
final receivePort = ReceivePort();
final sendPort = receivePort.sendPort;

// 在另一个 Isolate 中
Isolate.spawn(entryPoint, sendPort);

void entryPoint(SendPort port) {
  port.send('Hello from isolate!');
}

🗺️ PortMap 类

class PortMap : public AllStatic {
 public:
  // 为 handler 分配端口
  static Dart_Port CreatePort(PortHandler* handler);
  
  // 关闭端口
  static bool ClosePort(Dart_Port id, PortHandler** port_handler = nullptr);
  
  // 关闭 handler 的所有端口
  static void ClosePorts(MessageHandler* handler);
  
  // 发送消息到端口
  static bool PostMessage(std::unique_ptr<Message> message,
                         bool before_events = false);
  
  // 获取端口来源 ID
  static Dart_Port GetOriginId(Dart_Port id);
  
  // 检查端口是否属于当前线程
  static bool IsOwnedByCurrentThread(Dart_Port id);

 private:
  struct Entry : public PortSet<Entry>::Entry {
    Entry() : handler(nullptr) {}
    Entry(Dart_Port port, PortHandler* handler)
        : PortSet<Entry>::Entry(port), handler(handler) {}
    PortHandler* handler;
  };
  
  static Mutex* mutex_;
  static PortSet<Entry>* ports_;
  static Random* prng_;
};

📬 Message 类

class Message {
 public:
  typedef enum {
    kNormalPriority = 0,  // 空闲时传递
    kOOBPriority = 1,     // 尽快传递(带外消息)
  } Priority;
  
  // 带外消息类型
  typedef enum {
    kIllegalOOB = 0,
    kServiceOOBMsg = 1,         // VM Service 消息
    kIsolateLibOOBMsg = 2,      // Isolate 库消息
    kDelayedIsolateLibOOBMsg = 3, // 延迟的 Isolate 库消息
  } OOBMsgTag;
  
  // 构造函数
  Message(Dart_Port dest_port, uint8_t* snapshot, 
          intptr_t snapshot_length, Priority priority);
  Message(Dart_Port dest_port, ObjectPtr raw_obj, Priority priority);
  Message(Dart_Port dest_port, PersistentHandle* handle, Priority priority);
  
  // 消息类型判断
  bool IsOOB() const { return priority_ == kOOBPriority; }
  bool IsSnapshot() const;
  bool IsRaw() const { return snapshot_length_ == 0; }
  bool IsPersistentHandle() const;
};

📬 MessageQueue 类

class MessageQueue {
 public:
  MessageQueue();
  
  // 入队
  void Enqueue(std::unique_ptr<Message> msg, bool before_events);
  
  // 出队(非阻塞)
  std::unique_ptr<Message> Dequeue();
  
  bool IsEmpty() { return head_ == nullptr; }
  
  // 清空所有消息
  void Clear();
  
  // 迭代器
  class Iterator {
   public:
    explicit Iterator(const MessageQueue* queue);
    bool HasNext();
    Message* Next();
   private:
    Message* next_;
  };
  
  intptr_t Length() const;
  Message* FindMessageById(intptr_t id);
  void PrintJSON(JSONStream* stream);

 private:
  Message* head_;
  Message* tail_;
};

🔧 ThreadPool 架构

┌────────────────────────────────────────────┐
│               ThreadPool                    │
│  ┌──────────────────────────────────────┐  │
│  │  pool_mutex_ (保护所有状态)            │  │
│  │                                      │  │
│  │  ┌──────────┐  ┌──────────┐         │  │
│  │  │ Running  │  │  Idle    │         │  │
│  │  │ Workers  │  │ Workers  │         │  │
│  │  │ ──────── │  │ ──────── │         │  │
│  │  │ Worker1  │  │ Worker5  │         │  │
│  │  │ Worker2  │  │ Worker6  │         │  │
│  │  │ Worker3  │  │ ...      │         │  │
│  │  │ Worker4  │  │          │         │  │
│  │  └──────────┘  └──────────┘         │  │
│  │                                      │  │
│  │  ┌────────────────────────────────┐ │  │
│  │  │      Pending Tasks             │ │  │
│  │  │  Task1 → Task2 → Task3 → ...   │ │  │
│  │  └────────────────────────────────┘ │  │
│  └──────────────────────────────────────┘  │
└────────────────────────────────────────────┘

📋 ThreadPool Task

class ThreadPool {
 public:
  // Task 基类
  class Task : public IntrusiveDListEntry<Task> {
   public:
    virtual ~Task() {}
    // 子类实现具体任务逻辑
    virtual void Run() = 0;
  };
  
  // 运行任务
  template <typename T, typename... Args>
  bool Run(Args&&... args) {
    return RunImpl(std::unique_ptr<Task>(
        new T(std::forward<Args>(args)...)));
  }
  
  // 标记线程阻塞/解除阻塞
  void MarkCurrentWorkerAsBlocked();
  void MarkCurrentWorkerAsUnBlocked();
  
  // 关闭线程池
  void Shutdown();
  
 protected:
  // 子类可重写:进入空闲时的回调
  virtual void OnEnterIdleLocked(MutexLocker* ml, Worker* worker) {}
  
 private:
  bool RunImpl(std::unique_ptr<Task> task);
  void WorkerLoop(Worker* worker);
};

👷 ThreadPool Worker

class ThreadPool {
 protected:
  class Worker : public IntrusiveDListEntry<Worker> {
   public:
    explicit Worker(ThreadPool* pool);
    
    // 启动工作线程
    void StartThread();
    
    // 休眠(等待任务或超时)
    ConditionVariable::WaitResult Sleep(int64_t timeout_micros) {
      return wakeup_cv_.WaitMicros(&pool_->pool_mutex_, timeout_micros);
    }
    
   private:
    // 工作线程入口点
    static void Main(uword args);
    
    ThreadPool* pool_;
    ThreadJoinId join_id_;
    OSThread* os_thread_ = nullptr;
    bool is_blocked_ = false;
    ConditionVariable wakeup_cv_;
  };
  
 private:
  mutable Mutex pool_mutex_;
  bool shutting_down_ = false;
  uint64_t count_running_ = 0;
  uint64_t count_idle_ = 0;
  WorkerList running_workers_;
  WorkerList idle_workers_;
  TaskList tasks_;
};

🛡️ Safepoint 机制

Safepoint 是一个全局同步点,所有线程在此处停止。

为什么需要 Safepoint?

  • GC - 垃圾回收需要一致的堆状态
  • Deopt - 去优化需要安全的栈状态
  • Reload - 热重载需要一致的程序状态
class Thread {
 public:
  bool IsAtSafepoint() const {
    return IsAtSafepoint(SafepointLevel::kGC);
  }
  
  void EnterSafepoint();
  void ExitSafepoint();
  void BlockForSafepoint();
  
  bool IsSafepointRequested() const;
};

📊 SafepointLevel

enum SafepointLevel {
  // 安全进行 GC
  kGC,
  // 安全进行 GC 和去优化
  kGCAndDeopt,
  // 安全进行 GC、去优化和热重载
  kGCAndDeoptAndReload,
  // 层级数量
  kNumLevels,
  // 无 Safepoint
  kNoSafepoint,
};

层级关系:

  • 数字越大,保证越强
  • 达到 safepoint 的延迟随层级增加
  • 可能的安全点频率随层级减少
// 拥有更高层级 safepoint 意味着同时拥有低层级
bool OwnsGCSafepoint() const;
bool OwnsDeoptSafepoint() const;
bool OwnsReloadSafepoint() const;

🔐 Safepoint 状态管理

class Thread {
 public:
  // 位字段编码的 safepoint 状态
  using SafepointRequestedField = BitField<uword, bool, 0>;
  using DeoptSafepointRequestedField = BitField<uword, bool, 1>;
  using ReloadSafepointRequestedField = BitField<uword, bool, 2>;
  using AtSafepointField = BitField<uword, bool, 3>;
  using BlockedForSafepointField = BitField<uword, bool, 4>;
  
  // 尝试进入/退出 safepoint(快速路径)
  bool TryEnterSafepoint() {
    uword old_state = 0;
    uword new_state = AtSafepointBits(current_safepoint_level());
    return safepoint_state_.compare_exchange_strong(
        old_state, new_state, std::memory_order_release);
  }
  
  bool TryExitSafepoint() {
    uword old_state = AtSafepointBits(current_safepoint_level());
    uword new_state = 0;
    return safepoint_state_.compare_exchange_strong(
        old_state, new_state, std::memory_order_acquire);
  }
  
  // 慢速路径(需要锁)
  void EnterSafepointUsingLock();
  void ExitSafepointUsingLock();
};

🚀 Isolate 创建流程

// 1. 创建 IsolateGroup
IsolateGroup::IsolateGroup(
    std::shared_ptr<IsolateGroupSource> source,
    void* embedder_data,
    Dart_IsolateFlags api_flags,
    bool is_vm_isolate)
    : class_table_(nullptr),
      object_store_(new ObjectStore()),
      thread_pool_(new MutatorThreadPool(this, max_worker_threads)),
      isolates_lock_(new SafepointRwLock()),
      thread_registry_(new ThreadRegistry()),
      safepoint_handler_(new SafepointHandler(this)),
      heap_(nullptr),
      ...

// 2. 创建 Heap
void IsolateGroup::CreateHeap(bool is_vm_isolate,
                              bool is_service_or_kernel_isolate) {
  Heap::Init(this, is_vm_isolate,
             FLAG_new_gen_semi_max_size * MBInWords,
             FLAG_old_gen_heap_size * MBInWords);
}

// 3. 注册 IsolateGroup
IsolateGroup::RegisterIsolateGroup(this);

// 4. 创建第一个 Isolate
Isolate* Isolate::InitIsolate("main", this, flags);

📝 Isolate 注册与注销

// 注册 Isolate 到 IsolateGroup
void IsolateGroup::RegisterIsolate(Isolate* isolate) {
  SafepointWriteRwLocker ml(Thread::Current(), isolates_lock_.get());
  if (isolates_.IsEmpty()) {
    interrupt_port_ = isolate->main_port();
  }
  isolates_.Append(isolate);
  isolate_count_++;
}

// 从 IsolateGroup 注销 Isolate
void IsolateGroup::UnregisterIsolate(Isolate* isolate) {
  SafepointWriteRwLocker ml(Thread::Current(), isolates_lock_.get());
  isolates_.Remove(isolate);
  if (isolates_.IsEmpty()) {
    interrupt_port_ = ILLEGAL_PORT;
  } else {
    interrupt_port_ = isolates_.First()->main_port();
  }
}

// 注销并减少计数
bool IsolateGroup::UnregisterIsolateDecrementCount() {
  SafepointWriteRwLocker ml(Thread::Current(), isolates_lock_.get());
  isolate_count_--;
  return isolate_count_ == 0;
}

🏃 Isolate 运行机制

class Isolate {
 public:
  // 使 Isolate 可运行
  const char* MakeRunnable();
  void MakeRunnableLocked();
  
  // 运行 Isolate
  void Run();
  
  // 检查是否可运行
  bool is_runnable() const { 
    return isolate_flags_.Read<IsRunnableBit>(); 
  }
  void set_is_runnable(bool value) {
    isolate_flags_.UpdateBool<IsRunnableBit>(value);
  }
  
  // 消息处理器
  MessageHandler* message_handler() const;
  
  // 检查是否有待处理消息
  bool HasPendingMessages();
  
  // 调度中断
  void ScheduleInterrupts(uword interrupt_bits);
};

📨 消息处理流程

┌─────────────────────────────────────────────────────┐
│               消息处理流程                           │
│                                                     │
│  SendPort.send()                                    │
│       │                                             │
│       ▼                                             │
│  ┌─────────┐                                        │
│  │序列化    │ → Message (snapshot/raw_obj/handle)   │
│  └─────────┘                                        │
│       │                                             │
│       ▼                                             │
│  ┌─────────┐                                        │
│  │PortMap  │ → PostMessage(dest_port)               │
│  │.PostMsg │                                        │
│  └─────────┘                                        │
│       │                                             │
│       ▼                                             │
│  ┌─────────┐                                        │
│  │Message  │ → Enqueue(message)                     │
│  │Queue    │                                        │
│  └─────────┘                                        │
│       │                                             │
│       ▼                                             │
│  ┌─────────┐                                        │
│  │Message  │ → HandleMessage(message)               │
│  │Handler  │                                        │
│  └─────────┘                                        │
│       │                                             │
│       ▼                                             │
│  ┌─────────┐                                        │
│  │反序列化  │ → 执行 Dart 代码                       │
│  └─────────┘                                        │
└─────────────────────────────────────────────────────┘

📬 IsolateMessageHandler

class IsolateMessageHandler : public MessageHandler {
 public:
  explicit IsolateMessageHandler(Isolate* isolate);
  
  const char* name() const override { return isolate_->name(); }
  
  MessageStatus HandleMessage(std::unique_ptr<Message> message) override;
  
  // 处理 Isolate 库消息
  ErrorPtr HandleLibMessage(const Array& message);
  
  // 保持活跃检查
  bool KeepAliveLocked() override {
    return isolate_->HasLivePorts() || 
           isolate_->HasOpenNativeCallables();
  }
  
 private:
  Isolate* isolate_;
};

// 消息状态
enum MessageStatus {
  OK,           // 正常处理
  Error,        // 发生错误
  Shutdown,     // 需要关闭
};

⚡ OOB 消息处理

ErrorPtr IsolateMessageHandler::HandleLibMessage(const Array& message) {
  const intptr_t msg_type = Smi::Cast(message.At(1)).Value();
  
  switch (msg_type) {
    case Isolate::kPauseMsg:
      // [OOB, kPauseMsg, pause_capability, resume_capability]
      if (I->AddResumeCapability(Capability::Cast(obj))) {
        increment_paused();
      }
      break;
      
    case Isolate::kResumeMsg:
      if (I->RemoveResumeCapability(Capability::Cast(obj))) {
        decrement_paused();
      }
      break;
      
    case Isolate::kKillMsg:
    case Isolate::kInternalKillMsg:
      if (priority == Isolate::kImmediateAction) {
        Thread::Current()->StartUnwindError();
        return UnwindError::New(msg);
      }
      break;
      
    case Isolate::kPingMsg:
      // [OOB, kPingMsg, responsePort, priority, response]
      PortMap::PostMessage(SerializeMessage(send_port.Id(), response));
      break;
  }
}

🔄 Isolate 生命周期

                    ┌─────────┐
                    │ Created │
                    └────┬────┘
                         │ RegisterIsolate()
                         ▼
                    ┌─────────┐
          ┌────────│Runnable │←───────┐
          │        └────┬────┘        │
          │             │ Run()       │ Resume
          │             ▼             │
          │        ┌─────────┐        │
          │        │ Running │────────┤
          │        └────┬────┘        │
          │             │ Pause       │
          │             ▼             │
          │        ┌─────────┐────────┘
          │        │ Paused  │
          │        └────┬────┘
          │             │ Kill/Exit
          │             ▼
          │        ┌─────────┐
          └───────→│Shutdown │
                   └────┬────┘
                        │ UnregisterIsolate()
                        ▼
                   ┌─────────┐
                   │  Dead   │
                   └─────────┘

🛑 Isolate 关闭流程

void IsolateGroup::Shutdown() {
  // 1. 关闭线程池
  if (!is_vm_isolate_) {
    thread_pool_->Shutdown();
    thread_pool_.reset();
  }
  
  // 2. 增加待关闭计数
  {
    MonitorLocker ml(Isolate::isolate_creation_monitor_);
    Isolate::pending_shutdowns_++;
  }
  
  // 3. 注销 IsolateGroup
  UnregisterIsolateGroup(this);
  
  // 4. 等待 GC 任务完成
  if (heap_ != nullptr) {
    PageSpace* old_space = heap_->old_space();
    MonitorLocker ml(old_space->tasks_lock());
    while (old_space->tasks() > 0) {
      ml.Wait();
    }
    old_space->AbandonMarkingForShutdown();
  }
  
  // 5. 调用清理回调
  auto group_shutdown_callback = Isolate::GroupCleanupCallback();
  if (group_shutdown_callback != nullptr) {
    group_shutdown_callback(embedder_data());
  }
  
  // 6. 删除 IsolateGroup
  delete this;
}

⏰ IdleTimeHandler

class IdleTimeHandler : public ValueObject {
 public:
  // 使用堆初始化
  void InitializeWithHeap(Heap* heap);
  
  // 更新空闲开始时间
  void UpdateStartIdleTime();
  
  // 检查是否应该通知空闲
  bool ShouldNotifyIdle(int64_t* expiry);
  
  // 通知堆空闲(适合做压缩等操作)
  void NotifyIdle(int64_t deadline);
  
 private:
  Mutex mutex_;
  Heap* heap_ = nullptr;
  intptr_t disabled_counter_ = 0;
  int64_t idle_start_time_ = 0;
};

// 禁用空闲计时器的作用域
class DisableIdleTimerScope : public ValueObject {
 public:
  explicit DisableIdleTimerScope(IdleTimeHandler* handler);
  ~DisableIdleTimerScope();
};

👤 Mutator 管理

class IsolateGroup {
 public:
  // 增加 Mutator 计数
  void IncreaseMutatorCount(Thread* thread, 
                           bool is_nested_reenter,
                           bool was_stolen);
  
  // 减少 Mutator 计数
  void DecreaseMutatorCount(bool is_nested_exit);
  
  // 获取当前 Mutator 数量
  intptr_t MutatorCount() const { return active_mutators_; }
  
  // 注册/注销 Mutator
  void RegisterIsolateGroupMutator(Thread* mutator);
  void UnregisterIsolateGroupMutator(Thread* mutator);

 private:
  std::unique_ptr<Monitor> active_mutators_monitor_;
  intptr_t active_mutators_ = 0;
  intptr_t waiting_mutators_ = 0;
  intptr_t max_active_mutators_ = 0;
};

限制 Mutator 数量:防止太多线程竞争 TLAB

📊 Field Table 管理

class IsolateGroup {
 public:
  // 各类字段表
  FieldTable* initial_field_table() const;
  FieldTable* sentinel_field_table() const;
  FieldTable* shared_initial_field_table() const;
  FieldTable* shared_field_table() const;
  
  // 设置字段表
  void set_initial_field_table(std::shared_ptr<FieldTable> field_table);
  void set_shared_field_table(Thread* T, FieldTable* shared_field_table);

 private:
  std::shared_ptr<FieldTable> initial_field_table_;
  std::shared_ptr<FieldTable> sentinel_field_table_;
  std::shared_ptr<FieldTable> shared_initial_field_table_;
  std::shared_ptr<FieldTable> shared_field_table_;
};

📝 静态字段注册

void IsolateGroup::RegisterStaticField(const Field& field,
                                      const Object& initial_value) {
  ASSERT(field.is_static());
  
  if (field.is_shared()) {
    // 共享字段
    RegisterSharedStaticField(field, initial_value);
    return;
  }
  
  // 非共享字段
  const bool need_to_grow = initial_field_table()->Register(field);
  const intptr_t field_id = field.field_id();
  
  if (need_to_grow) {
    sentinel_field_table()->AllocateIndex(field_id);
  }
  
  initial_field_table()->SetAt(field_id, initial_value.ptr());
  sentinel_field_table()->SetAt(field_id, Object::sentinel().ptr());
  
  // 更新所有 Isolate 的字段表
  SafepointReadRwLocker ml(Thread::Current(), isolates_lock_.get());
  if (need_to_grow) {
    GcSafepointOperationScope scope(Thread::Current());
    for (auto isolate : isolates_) {
      auto field_table = isolate->field_table();
      if (field_table->IsReadyToUse()) {
        field_table->Register(field, field_id);
        field_table->SetAt(field_id, initial_value.ptr());
      }
    }
  }
}

📚 类表管理

class IsolateGroup {
 public:
  // 主类表
  ClassTable* class_table() const { return class_table_; }
  
  // GC 遍历使用的类表(重载时可能不同)
  ClassTable* heap_walk_class_table() const { 
    return heap_walk_class_table_; 
  }
  
  // 注册类
  void RegisterClass(const Class& cls);
  
  // 重载时的类表操作
  void CloneClassTableForReload();
  void RestoreOriginalClassTable();
  void DropOriginalClassTable();

 private:
  ClassTable* class_table_;
  ClassTableAllocator class_table_allocator_;
  ClassTable* heap_walk_class_table_;
};

🔥 Hot Reload 支持

class IsolateGroup {
 public:
#if !defined(PRODUCT) && !defined(DART_PRECOMPILED_RUNTIME)
  // 重载源码
  bool ReloadSources(JSONStream* js,
                    bool force_reload,
                    const char* root_script_url = nullptr,
                    const char* packages_url = nullptr);
  
  // 重载 Kernel
  bool ReloadKernel(JSONStream* js,
                   bool force_reload,
                   const uint8_t* kernel_buffer = nullptr,
                   intptr_t kernel_buffer_size = 0);
  
  // 检查是否可以重载
  bool CanReload();
  
  // 是否正在重载
  bool IsReloading() const { return group_reload_context_ != nullptr; }
  
  // 重载上下文
  IsolateGroupReloadContext* reload_context();

 private:
  std::shared_ptr<IsolateGroupReloadContext> group_reload_context_;
  ProgramReloadContext* program_reload_context_ = nullptr;
#endif
};

🐛 调试器集成

class Isolate {
 public:
#if !defined(PRODUCT)
  Debugger* debugger() const { return debugger_; }
  
  // 断点管理
  void set_has_resumption_breakpoints(bool value);
  bool has_resumption_breakpoints() const;
  
  // 恢复请求
  bool ResumeRequest() const;
  void SetResumeRequest();
  bool GetAndClearResumeRequest();
  
  // 暂停状态
  bool IsPaused() const;
  
  // 暂停后服务请求
  bool should_pause_post_service_request() const;
  void set_should_pause_post_service_request(bool value);
  
  // 最后恢复时间戳
  int64_t last_resume_timestamp() const;
#endif
};

🔗 FFI 回调支持

class Isolate {
 public:
  // 创建异步 FFI 回调
  FfiCallbackMetadata::Trampoline CreateAsyncFfiCallback(
      Zone* zone, const Function& send_function, Dart_Port send_port);
  
  // 创建 Isolate 本地 FFI 回调
  FfiCallbackMetadata::Trampoline CreateIsolateLocalFfiCallback(
      Zone* zone, const Function& trampoline, 
      const Closure& target, bool keep_isolate_alive);
  
  // 创建 IsolateGroup 绑定的 FFI 回调
  FfiCallbackMetadata::Trampoline CreateIsolateGroupBoundFfiCallback(
      Zone* zone, const Function& trampoline, const Closure& target);
  
  // 删除 FFI 回调
  void DeleteFfiCallback(FfiCallbackMetadata::Trampoline callback);
  
  // Native Callable 计数管理
  void UpdateNativeCallableKeepIsolateAliveCounter(intptr_t delta);
  bool HasOpenNativeCallables();
};

📥 ReceivePort 机制

class Isolate {
 public:
  // 创建接收端口
  ReceivePortPtr CreateReceivePort(const String& debug_name);
  
  // 设置接收端口保持活跃状态
  void SetReceivePortKeepAliveState(const ReceivePort& receive_port,
                                    bool keep_isolate_alive);
  
  // 关闭接收端口
  void CloseReceivePort(const ReceivePort& receive_port);
  
  // 检查是否有活跃端口
  bool HasLivePorts();
};

Keep Alive 机制:

  • keep_isolate_alive = true:端口阻止 Isolate 退出
  • keep_isolate_alive = false:端口不阻止退出
  • 当所有活跃端口关闭时,Isolate 可以退出

📤 SendPort 机制

// SendPort 是跨 Isolate 通信的发送端
// 对应 VM 中的 Dart_Port(int64_t 类型的端口 ID)

class PortMap {
 public:
  // 发送消息
  static bool PostMessage(std::unique_ptr<Message> message,
                         bool before_events = false);
};

// 序列化消息
static std::unique_ptr<Message> SerializeMessage(
    Dart_Port dest_port, const Instance& obj) {
  return WriteMessage(/* same_group */ false, obj, dest_port,
                     Message::kNormalPriority);
}

// 同组内的消息传递(使用 PersistentHandle)
Message(Dart_Port dest_port, PersistentHandle* handle, Priority priority);

🔀 跨 Isolate 通信

┌──────────────────┐          ┌──────────────────┐
│   Isolate A      │          │   Isolate B      │
│                  │          │                  │
│  ┌────────────┐  │          │  ┌────────────┐  │
│  │ ReceivePort│←─┼──────────┼──│ SendPort   │  │
│  │ (main_port)│  │  Message │  │ (ref)      │  │
│  └────────────┘  │          │  └────────────┘  │
│                  │          │                  │
│  ┌────────────┐  │          │  ┌────────────┐  │
│  │ SendPort   │──┼──────────┼─→│ ReceivePort│  │
│  │ (ref)      │  │  Message │  │            │  │
│  └────────────┘  │          │  └────────────┘  │
│                  │          │                  │
│  MessageQueue    │          │  MessageQueue    │
└──────────────────┘          └──────────────────┘

消息传递步骤:
1. 序列化对象(跨组)或传递句柄(同组)
2. 通过 PortMap 路由到目标端口
3. 入队到目标 MessageQueue
4. 通知目标 Isolate 有新消息
5. 反序列化并处理

📦 消息序列化

class Message {
 public:
  // 消息类型
  bool IsSnapshot() const {
    return !IsRaw() && !IsPersistentHandle() && !IsFinalizerInvocationRequest();
  }
  
  bool IsRaw() const { 
    return snapshot_length_ == 0;  // VM isolate 中的不朽对象
  }
  
  bool IsPersistentHandle() const {
    return snapshot_length_ == kPersistentHandleSnapshotLen;  // 同组传递
  }

 private:
  union Payload {
    uint8_t* snapshot_;          // 序列化的快照
    ObjectPtr raw_obj_;          // 原始对象指针(VM isolate)
    PersistentHandle* persistent_handle_;  // 持久句柄(同组)
  };
  
  Dart_Port dest_port_;
  intptr_t snapshot_length_;
  MessageFinalizableData* finalizable_data_;
  Priority priority_;
};

🚩 Isolate Flags

// Isolate Group 标志位
#define BOOL_ISOLATE_GROUP_FLAG_LIST(V) \
  V(obfuscate, Obfuscate)               \  // 代码混淆
  V(enable_asserts, EnableAsserts)      \  // 启用断言
  V(use_field_guards, UseFieldGuards)   \  // 字段守卫
  V(use_osr, UseOsr)                    \  // 栈上替换
  V(branch_coverage, BranchCoverage)    \  // 分支覆盖
  V(coverage, Coverage)                 \  // 代码覆盖

// Isolate 标志位
#define BOOL_ISOLATE_FLAG_LIST(V)       \
  V(is_system_isolate, IsSystemIsolate) \  // 系统 Isolate
  V(is_service_isolate, IsServiceIsolate) \  // 服务 Isolate
  V(is_kernel_isolate, IsKernelIsolate)   \  // Kernel Isolate

// 标志访问
class IsolateGroup {
  bool obfuscate() const;
  bool enable_asserts() const;
  bool use_osr() const;
};

⚡ 性能优化策略

消息传递优化

  • 同组使用 PersistentHandle
  • VM isolate 对象直接传递
  • 批量消息处理

Safepoint 优化

  • 快速路径使用 CAS
  • 分层 Safepoint
  • 减少全局同步

ThreadPool 优化

  • Worker 线程复用
  • 阻塞时动态扩展
  • 空闲超时回收

💾 内存管理

class IsolateGroup {
 public:
  // 共享堆
  Heap* heap() const { return heap_.get(); }
  
  // 创建堆
  void CreateHeap(bool is_vm_isolate, bool is_service_or_kernel_isolate) {
    Heap::Init(this, is_vm_isolate,
               FLAG_new_gen_semi_max_size * MBInWords,
               (is_service_or_kernel_isolate ? kDefaultMaxOldGenHeapSize
                                             : FLAG_old_gen_heap_size) 
                   * MBInWords);
  }
  
  // Store Buffer(写屏障)
  StoreBuffer* store_buffer() const { return store_buffer_.get(); }
  
  // Marking Stack(GC 标记)
  MarkingStack* old_marking_stack() const;
  MarkingStack* new_marking_stack() const;
  MarkingStack* deferred_marking_stack() const;
};

🎨 设计模式分析

Actor 模式

Isolate = Actor,通过消息通信

对象池模式

ThreadPool 管理可复用 Worker

观察者模式

Exit/Error 监听器机制

命令模式

Task 封装可执行操作

生产者-消费者模式

MessageQueue 作为消息缓冲

📊 UML 类图

┌─────────────┐     1:N    ┌──────────────┐
│IsolateGroup │────────────→│   Isolate    │
├─────────────┤             ├──────────────┤
│heap_        │             │main_port_    │
│class_table_ │             │name_         │
│thread_pool_ │             │mutator_thread│
│object_store_│             │field_table_  │
└──────┬──────┘             └──────┬───────┘
       │ 1                         │ 1
       │                           │
       │ N:1                       │ 1:1
       ▼                           ▼
┌─────────────┐             ┌──────────────┐
│   Thread    │             │MessageHandler│
├─────────────┤             ├──────────────┤
│isolate_     │             │message_queue_│
│isolate_group│             │HandleMessage()│
│task_kind_   │             └──────────────┘
│safepoint_   │
└─────────────┘

✅ 最佳实践

1. 合理使用 Isolate

  • CPU 密集型任务放入独立 Isolate
  • 避免频繁创建/销毁 Isolate
  • 考虑使用 Isolate.run() 简化代码

2. 消息传递优化

  • 尽量传递简单数据类型
  • 大数据考虑分片传递
  • 使用 TransferableTypedData

3. 错误处理

  • 设置 Exit/Error 监听器
  • 捕获并处理 Isolate 异常
  • 使用 Completer 等待结果

⚠️ 常见陷阱

1. 内存泄漏

// ❌ 忘记关闭 ReceivePort
final receivePort = ReceivePort();
Isolate.spawn(entry, receivePort.sendPort);
// 忘记: receivePort.close();

2. 消息过大

// ❌ 传递大对象
isolate.sendPort.send(largeList);  // 复制开销大

// ✅ 使用 Transferable
final transferable = TransferableTypedData.fromList([largeList]);
isolate.sendPort.send(transferable);

3. 死锁

// ❌ 双向等待
// Isolate A 等待 B 的响应,B 等待 A 的响应

📚 扩展阅读

源码位置

  • runtime/vm/isolate.h - Isolate 定义
  • runtime/vm/isolate.cc - Isolate 实现
  • runtime/vm/thread.h - Thread 定义
  • runtime/vm/port.h - Port/PortMap
  • runtime/vm/message.h - Message/MessageQueue
  • runtime/vm/thread_pool.h - ThreadPool

🎯 总结

核心要点

  • Isolate 是 Dart 并发的基本单位,内存隔离
  • IsolateGroup 共享 Heap、ClassTable、ThreadPool
  • Port 是 Isolate 间通信的端点
  • Message 支持快照、原始对象、句柄三种形式
  • Safepoint 用于全局同步(GC/Deopt/Reload)
  • ThreadPool 管理 Worker 线程执行任务

设计哲学

"Share memory by communicating, don't communicate by sharing memory."