深入理解 Dart 并发模型核心实现
2026-03-04 | 技术深度解读
Part 1: 基础概念
Part 2: 核心架构
Part 3: 关键机制
Part 4: 高级特性
Isolate 是 Dart 的并发单元,类似于 Actor 模型中的 Actor。
核心特征:
设计哲学:
"Do not communicate by sharing memory; instead, share memory by communicating."
— Go 语言箴言(同样适用于 Dart)
Dart 在每个 Isolate 内部采用单线程执行模型:同一个 Isolate 中的 Dart 代码同一时刻只在一个线程(mutator 线程)上运行。
问题:
Isolate 解决方案:
| 特性 | Isolate | Thread (Java/C++) |
|---|---|---|
| 内存模型 | 对象互相隔离(VM 内同一 IsolateGroup 共享 Heap) | 共享堆内存 |
| 通信方式 | 消息传递 | 共享内存 + 锁 |
| 数据同步 | 无需同步 | 需要锁/Mutex |
| 创建开销 | 较大(~2MB) | 较小(~1MB) |
| 调试难度 | 简单 | 复杂(竞态条件) |
| 错误隔离 | Isolate 崩溃不影响其他 | 可能影响整个进程 |
┌─────────────────────────────────────────────────────┐
│ IsolateGroup │
│ ┌───────────────────────────────────────────────┐ │
│ │ ObjectStore (共享) │ │
│ │ ClassTable (共享) │ │
│ │ Heap (共享) │ │
│ └───────────────────────────────────────────────┘ │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │Isolate 1│ │Isolate 2│ │Isolate N│ │
│ │ Thread │ │ Thread │ │ Thread │ │
│ │ Port │ │ Port │ │ Port │ │
│ │ Queue │ │ Queue │ │ Queue │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ ┌───────────────────────────────────────────────┐ │
│ │ ThreadPool │ │
│ │ Worker1 Worker2 Worker3 ... │ │
│ └───────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
// Excerpt of the VM-side Isolate class: the message ids handled for the
// isolate library, a lookup of the current isolate, and core accessors.
class Isolate : public IntrusiveDListEntry<Isolate> {
public:
// Message types of the Isolate API
enum LibMsgId {
kPauseMsg = 1, kResumeMsg = 2, kPingMsg = 3,
kKillMsg = 4, kAddExitMsg = 5, kDelExitMsg = 6,
kAddErrorMsg = 7, kDelErrorMsg = 8, kErrorFatalMsg = 9,
// Internal messages
kInterruptMsg = 10, kInternalKillMsg = 11,
kDrainServiceExtensionsMsg = 12, kCheckForReload = 13,
};
// Returns the current Isolate, or nullptr when Thread::Current() is nullptr.
static inline Isolate* Current() {
Thread* thread = Thread::Current();
return thread == nullptr ? nullptr : thread->isolate();
}
// Core members
IsolateGroup* group() const { return isolate_group_; }
Dart_Port main_port() const { return main_port_; }
const char* name() const { return name_; }
Thread* mutator_thread() const { return mutator_thread_; }
};
IsolateGroup 是一组共享资源的 Isolate 集合。
共享资源:
// Excerpt of IsolateGroup: lookup of the current group plus accessors for the
// resources owned by the group (heap, class table, object store, thread pool).
class IsolateGroup : public IntrusiveDListEntry<IsolateGroup> {
 public:
  // `class` members are private by default; these accessors are meant to be
  // called from outside the class, so they are declared public explicitly.

  // Returns the IsolateGroup of the current thread, or nullptr when
  // Thread::Current() is nullptr.
  static inline IsolateGroup* Current() {
    Thread* thread = Thread::Current();
    return thread == nullptr ? nullptr : thread->isolate_group();
  }

  // Core resources
  Heap* heap() const { return heap_.get(); }
  ClassTable* class_table() const { return class_table_; }
  ObjectStore* object_store() const { return object_store_.get(); }
  ThreadPool* thread_pool() { return thread_pool_.get(); }
};
// Excerpt of IsolateGroup's private state, grouped by role.
class IsolateGroup {
private:
// Accessed from generated code
ClassTable* class_table_;
AcqRelAtomic<ClassPtr*> cached_class_table_table_;
std::unique_ptr<ObjectStore> object_store_;
// Core resources
std::unique_ptr<Heap> heap_;
std::unique_ptr<ThreadPool> thread_pool_;
std::unique_ptr<SafepointHandler> safepoint_handler_;
std::unique_ptr<ThreadRegistry> thread_registry_;
// Isolate management
std::unique_ptr<SafepointRwLock> isolates_lock_;
IntrusiveDList<Isolate> isolates_;
intptr_t isolate_count_ = 0;
// Mutator management
IntrusiveDList<Thread> mutators_;
intptr_t group_mutator_count_ = 0;
// Concurrency control
Mutex symbols_mutex_;
Mutex type_canonicalization_mutex_;
std::unique_ptr<SafepointRwLock> program_lock_;
};
// Excerpt of the VM Thread class: current-thread lookup, the isolate and
// isolate group it is associated with, and heap/TLAB/safepoint entry points.
class Thread : public ThreadState, public IntrusiveDListEntry<Thread> {
public:
// The currently executing thread
static Thread* Current() {
return static_cast<Thread*>(OSThread::CurrentVMThread());
}
// Associated Isolate and IsolateGroup
Isolate* isolate() const { return isolate_; }
IsolateGroup* isolate_group() const { return isolate_group_; }
// Enter/exit an Isolate
static void EnterIsolate(Isolate* isolate);
static void ExitIsolate(bool isolate_shutdown = false);
// Heap access
Heap* heap() const;
// TLAB memory bounds
uword top() const;
uword end() const;
uword true_end() const;
// Safepoint related
bool IsAtSafepoint() const;
void EnterSafepoint();
void ExitSafepoint();
};
// Excerpt of Thread: the kinds of task a thread can be running and accessors
// for the current kind.
class Thread {
public:
enum TaskKind {
kUnknownTask = 0, // Unknown task
kMutatorTask, // Mutator thread (executes Dart code)
kCompilerTask, // Compiler task
kMarkerTask, // GC marker task
kSweeperTask, // GC sweeper task
kCompactorTask, // GC compactor task
kScavengerTask, // GC scavenger task
kSampleBlockTask, // Sample block task
kIncrementalCompactorTask,// Incremental compactor task
kSpawnTask, // Isolate spawn task
kIsolateGroupBoundCallbackTask, // IsolateGroup-bound callback
};
// Reads the task kind with acquire ordering.
TaskKind task_kind() const {
return task_kind_.load(std::memory_order_acquire);
}
// True when this thread's task kind is kMutatorTask.
bool IsDartMutatorThread() const {
return task_kind_ == kMutatorTask;
}
};
// Excerpt of Thread: execution-state tracking and stack-limit management.
class Thread {
public:
enum ExecutionState {
kThreadInVM = 0, // In VM code
kThreadInGenerated, // In generated code (Dart code)
kThreadInNative, // In native code
kThreadInBlockedState, // Blocked
kThreadInReloadableBlockedState, // Blocked, reloadable
};
// Returns the stored execution state converted to the enum type.
ExecutionState execution_state() const {
return static_cast<ExecutionState>(execution_state_);
}
bool IsExecutingDartCode() const;
bool HasExitedDartCode() const;
// Stack limit (used for stack overflow detection)
uword stack_limit_address() const;
void SetStackLimit(uword value);
void ClearStackLimit();
};
Port 是 Isolate 间通信的端点。
SendPort
ReceivePort
// Dart example: create a ReceivePort and hand its SendPort to a new isolate.
final receivePort = ReceivePort();
final sendPort = receivePort.sendPort;
// In another isolate
Isolate.spawn(entryPoint, sendPort);
// Entry point of the spawned isolate: sends one string back over `port`.
void entryPoint(SendPort port) {
port.send('Hello from isolate!');
}
// Excerpt of PortMap: static API for creating and closing ports and posting
// messages to them, plus the static state backing it.
class PortMap : public AllStatic {
public:
// Allocate a port for the handler
static Dart_Port CreatePort(PortHandler* handler);
// Close a port
static bool ClosePort(Dart_Port id, PortHandler** port_handler = nullptr);
// Close all ports of a handler
static void ClosePorts(MessageHandler* handler);
// Post a message to a port
static bool PostMessage(std::unique_ptr<Message> message,
bool before_events = false);
// Get the origin id of a port
static Dart_Port GetOriginId(Dart_Port id);
// Check whether the port belongs to the current thread
static bool IsOwnedByCurrentThread(Dart_Port id);
private:
// One entry per port: the port id (stored in the base entry) and its handler.
struct Entry : public PortSet<Entry>::Entry {
Entry() : handler(nullptr) {}
Entry(Dart_Port port, PortHandler* handler)
: PortSet<Entry>::Entry(port), handler(handler) {}
PortHandler* handler;
};
static Mutex* mutex_;
static PortSet<Entry>* ports_;
static Random* prng_;
};
// Excerpt of Message: delivery priorities, out-of-band tags, the three
// constructors (snapshot, raw object, persistent handle) and kind predicates.
class Message {
public:
typedef enum {
kNormalPriority = 0, // Delivered when idle
kOOBPriority = 1, // Delivered as soon as possible (out-of-band message)
} Priority;
// Out-of-band message types
typedef enum {
kIllegalOOB = 0,
kServiceOOBMsg = 1, // VM Service message
kIsolateLibOOBMsg = 2, // Isolate library message
kDelayedIsolateLibOOBMsg = 3, // Delayed Isolate library message
} OOBMsgTag;
// Constructors
Message(Dart_Port dest_port, uint8_t* snapshot,
intptr_t snapshot_length, Priority priority);
Message(Dart_Port dest_port, ObjectPtr raw_obj, Priority priority);
Message(Dart_Port dest_port, PersistentHandle* handle, Priority priority);
// Message kind predicates
bool IsOOB() const { return priority_ == kOOBPriority; }
bool IsSnapshot() const;
bool IsRaw() const { return snapshot_length_ == 0; }
bool IsPersistentHandle() const;
};
// Excerpt of MessageQueue: a queue of Message objects tracked through head and
// tail pointers, with an iterator and lookup/debug helpers.
class MessageQueue {
public:
MessageQueue();
// Enqueue
void Enqueue(std::unique_ptr<Message> msg, bool before_events);
// Dequeue (non-blocking)
std::unique_ptr<Message> Dequeue();
// The queue is empty when there is no head message.
bool IsEmpty() { return head_ == nullptr; }
// Remove all messages
void Clear();
// Iterator
class Iterator {
public:
explicit Iterator(const MessageQueue* queue);
bool HasNext();
Message* Next();
private:
Message* next_;
};
intptr_t Length() const;
Message* FindMessageById(intptr_t id);
void PrintJSON(JSONStream* stream);
private:
Message* head_;
Message* tail_;
};
┌────────────────────────────────────────────┐
│ ThreadPool │
│ ┌──────────────────────────────────────┐ │
│ │ pool_mutex_ (保护所有状态) │ │
│ │ │ │
│ │ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Running │ │ Idle │ │ │
│ │ │ Workers │ │ Workers │ │ │
│ │ │ ──────── │ │ ──────── │ │ │
│ │ │ Worker1 │ │ Worker5 │ │ │
│ │ │ Worker2 │ │ Worker6 │ │ │
│ │ │ Worker3 │ │ ... │ │ │
│ │ │ Worker4 │ │ │ │ │
│ │ └──────────┘ └──────────┘ │ │
│ │ │ │
│ │ ┌────────────────────────────────┐ │ │
│ │ │ Pending Tasks │ │ │
│ │ │ Task1 → Task2 → Task3 → ... │ │ │
│ │ └────────────────────────────────┘ │ │
│ └──────────────────────────────────────┘ │
└────────────────────────────────────────────┘
// Excerpt of ThreadPool: the Task base class, the templated Run() entry point
// and the blocking/shutdown API.
class ThreadPool {
public:
// Task base class
class Task : public IntrusiveDListEntry<Task> {
public:
virtual ~Task() {}
// Subclasses implement the actual task logic
virtual void Run() = 0;
};
// Run a task: constructs a T from `args` and hands ownership to RunImpl().
template <typename T, typename... Args>
bool Run(Args&&... args) {
return RunImpl(std::unique_ptr<Task>(
new T(std::forward<Args>(args)...)));
}
// Mark the current worker as blocked/unblocked
void MarkCurrentWorkerAsBlocked();
void MarkCurrentWorkerAsUnBlocked();
// Shut down the thread pool
void Shutdown();
protected:
// Overridable by subclasses: callback invoked when entering idle
virtual void OnEnterIdleLocked(MutexLocker* ml, Worker* worker) {}
private:
bool RunImpl(std::unique_ptr<Task> task);
void WorkerLoop(Worker* worker);
};
// Excerpt of ThreadPool: the Worker class and the pool's bookkeeping state.
class ThreadPool {
protected:
class Worker : public IntrusiveDListEntry<Worker> {
public:
explicit Worker(ThreadPool* pool);
// Start the worker thread
void StartThread();
// Sleep (wait for a task or a timeout); waits on wakeup_cv_ using the pool's
// pool_mutex_.
ConditionVariable::WaitResult Sleep(int64_t timeout_micros) {
return wakeup_cv_.WaitMicros(&pool_->pool_mutex_, timeout_micros);
}
private:
// Worker thread entry point
static void Main(uword args);
ThreadPool* pool_;
ThreadJoinId join_id_;
OSThread* os_thread_ = nullptr;
bool is_blocked_ = false;
ConditionVariable wakeup_cv_;
};
private:
mutable Mutex pool_mutex_;
bool shutting_down_ = false;
uint64_t count_running_ = 0;
uint64_t count_idle_ = 0;
WorkerList running_workers_;
WorkerList idle_workers_;
TaskList tasks_;
};
Safepoint 是一个全局同步点,所有线程在此处停止。
为什么需要 Safepoint?
// Excerpt of Thread: safepoint entry, exit and query API.
class Thread {
public:
// Without an explicit level, checks for a safepoint at SafepointLevel::kGC.
bool IsAtSafepoint() const {
return IsAtSafepoint(SafepointLevel::kGC);
}
void EnterSafepoint();
void ExitSafepoint();
void BlockForSafepoint();
bool IsSafepointRequested() const;
};
// Safepoint levels, ordered from the least to the most inclusive.
enum SafepointLevel {
// Safe to perform GC
kGC,
// Safe to perform GC and deoptimization
kGCAndDeopt,
// Safe to perform GC, deoptimization and hot reload
kGCAndDeoptAndReload,
// Number of levels
kNumLevels,
// No safepoint
kNoSafepoint,
};
层级关系:
// Owning a higher-level safepoint implies owning the lower levels as well
bool OwnsGCSafepoint() const;
bool OwnsDeoptSafepoint() const;
bool OwnsReloadSafepoint() const;
// Excerpt of Thread: bit layout of the safepoint state word and the
// compare-and-swap fast paths for entering and leaving a safepoint.
class Thread {
public:
// Safepoint state encoded as bit fields
using SafepointRequestedField = BitField<uword, bool, 0>;
using DeoptSafepointRequestedField = BitField<uword, bool, 1>;
using ReloadSafepointRequestedField = BitField<uword, bool, 2>;
using AtSafepointField = BitField<uword, bool, 3>;
using BlockedForSafepointField = BitField<uword, bool, 4>;
// Try to enter/exit a safepoint (fast path)
// Succeeds only when the state word is exactly 0; it is then replaced by the
// at-safepoint bits for the current level.
bool TryEnterSafepoint() {
uword old_state = 0;
uword new_state = AtSafepointBits(current_safepoint_level());
return safepoint_state_.compare_exchange_strong(
old_state, new_state, std::memory_order_release);
}
// Succeeds only when the state word holds exactly the at-safepoint bits for
// the current level; it is then reset to 0.
bool TryExitSafepoint() {
uword old_state = AtSafepointBits(current_safepoint_level());
uword new_state = 0;
return safepoint_state_.compare_exchange_strong(
old_state, new_state, std::memory_order_acquire);
}
// Slow path (requires a lock)
void EnterSafepointUsingLock();
void ExitSafepointUsingLock();
};
// 1. Create the IsolateGroup
// The member-initializer list below is truncated in this excerpt (see the
// trailing "..."); the heap starts out as nullptr.
IsolateGroup::IsolateGroup(
std::shared_ptr<IsolateGroupSource> source,
void* embedder_data,
Dart_IsolateFlags api_flags,
bool is_vm_isolate)
: class_table_(nullptr),
object_store_(new ObjectStore()),
thread_pool_(new MutatorThreadPool(this, max_worker_threads)),
isolates_lock_(new SafepointRwLock()),
thread_registry_(new ThreadRegistry()),
safepoint_handler_(new SafepointHandler(this)),
heap_(nullptr),
...
// 2. 创建 Heap
void IsolateGroup::CreateHeap(bool is_vm_isolate,
bool is_service_or_kernel_isolate) {
Heap::Init(this, is_vm_isolate,
FLAG_new_gen_semi_max_size * MBInWords,
FLAG_old_gen_heap_size * MBInWords);
}
// 3. Register the IsolateGroup
IsolateGroup::RegisterIsolateGroup(this);
// 4. Create the first Isolate
// The result is bound to a local so the line is a valid call statement.
Isolate* isolate = Isolate::InitIsolate("main", this, flags);
// Registers `isolate` with this IsolateGroup.
// Runs under the isolates write lock. If the list is empty beforehand, the
// isolate's main port becomes the group's interrupt port. The isolate is then
// appended and the isolate count is incremented.
void IsolateGroup::RegisterIsolate(Isolate* isolate) {
  SafepointWriteRwLocker write_lock(Thread::Current(), isolates_lock_.get());
  const bool is_first_isolate = isolates_.IsEmpty();
  if (is_first_isolate) {
    interrupt_port_ = isolate->main_port();
  }
  isolates_.Append(isolate);
  ++isolate_count_;
}
// Unregisters `isolate` from this IsolateGroup.
// Runs under the isolates write lock. After removal, the interrupt port is
// the main port of the first remaining isolate, or ILLEGAL_PORT when none
// is left.
void IsolateGroup::UnregisterIsolate(Isolate* isolate) {
  SafepointWriteRwLocker write_lock(Thread::Current(), isolates_lock_.get());
  isolates_.Remove(isolate);
  interrupt_port_ = isolates_.IsEmpty() ? ILLEGAL_PORT
                                        : isolates_.First()->main_port();
}
// Decrements the isolate count under the isolates write lock.
// Returns true when the count has reached zero.
bool IsolateGroup::UnregisterIsolateDecrementCount() {
  SafepointWriteRwLocker write_lock(Thread::Current(), isolates_lock_.get());
  const intptr_t remaining = --isolate_count_;
  return remaining == 0;
}
// Excerpt of Isolate: runnable-state management, message handler access and
// interrupt scheduling.
class Isolate {
public:
// Make the Isolate runnable
const char* MakeRunnable();
void MakeRunnableLocked();
// Run the Isolate
void Run();
// Check whether it is runnable (reads IsRunnableBit from isolate_flags_)
bool is_runnable() const {
return isolate_flags_.Read<IsRunnableBit>();
}
void set_is_runnable(bool value) {
isolate_flags_.UpdateBool<IsRunnableBit>(value);
}
// Message handler
MessageHandler* message_handler() const;
// Check whether there are pending messages
bool HasPendingMessages();
// Schedule interrupts
void ScheduleInterrupts(uword interrupt_bits);
};
┌─────────────────────────────────────────────────────┐
│ 消息处理流程 │
│ │
│ SendPort.send() │
│ │ │
│ ▼ │
│ ┌─────────┐ │
│ │序列化 │ → Message (snapshot/raw_obj/handle) │
│ └─────────┘ │
│ │ │
│ ▼ │
│ ┌─────────┐ │
│ │PortMap │ → PostMessage(dest_port) │
│ │.PostMsg │ │
│ └─────────┘ │
│ │ │
│ ▼ │
│ ┌─────────┐ │
│ │Message │ → Enqueue(message) │
│ │Queue │ │
│ └─────────┘ │
│ │ │
│ ▼ │
│ ┌─────────┐ │
│ │Message │ → HandleMessage(message) │
│ │Handler │ │
│ └─────────┘ │
│ │ │
│ ▼ │
│ ┌─────────┐ │
│ │反序列化 │ → 执行 Dart 代码 │
│ └─────────┘ │
└─────────────────────────────────────────────────────┘
// MessageHandler implementation bound to a single Isolate.
class IsolateMessageHandler : public MessageHandler {
public:
explicit IsolateMessageHandler(Isolate* isolate);
// The handler reports the name of its isolate.
const char* name() const override { return isolate_->name(); }
MessageStatus HandleMessage(std::unique_ptr<Message> message) override;
// Handle an Isolate library message
ErrorPtr HandleLibMessage(const Array& message);
// Keep-alive check: true while the isolate has live ports or open native
// callables.
bool KeepAliveLocked() override {
return isolate_->HasLivePorts() ||
isolate_->HasOpenNativeCallables();
}
private:
Isolate* isolate_;
};
// Message status
enum MessageStatus {
OK, // Handled normally
Error, // An error occurred
Shutdown, // Shutdown is required
};
// Dispatches an isolate-library message on the message type stored at index 1
// of `message`.
// NOTE(review): this is an abridged excerpt. `I`, `obj`, `priority`, `msg`,
// `send_port` and `response` are used without visible declarations, and no
// return statement is shown for the paths that break out of the switch.
// Confirm against the full source before reusing.
ErrorPtr IsolateMessageHandler::HandleLibMessage(const Array& message) {
const intptr_t msg_type = Smi::Cast(message.At(1)).Value();
switch (msg_type) {
case Isolate::kPauseMsg:
// [OOB, kPauseMsg, pause_capability, resume_capability]
// The paused counter is bumped only when the capability was newly added.
if (I->AddResumeCapability(Capability::Cast(obj))) {
increment_paused();
}
break;
case Isolate::kResumeMsg:
// The paused counter is decremented only when the capability was removed.
if (I->RemoveResumeCapability(Capability::Cast(obj))) {
decrement_paused();
}
break;
case Isolate::kKillMsg:
case Isolate::kInternalKillMsg:
// With immediate priority, start unwinding and return an UnwindError.
if (priority == Isolate::kImmediateAction) {
Thread::Current()->StartUnwindError();
return UnwindError::New(msg);
}
break;
case Isolate::kPingMsg:
// [OOB, kPingMsg, responsePort, priority, response]
PortMap::PostMessage(SerializeMessage(send_port.Id(), response));
break;
}
}
┌─────────┐
│ Created │
└────┬────┘
│ RegisterIsolate()
▼
┌─────────┐
┌────────│Runnable │←───────┐
│ └────┬────┘ │
│ │ Run() │ Resume
│ ▼ │
│ ┌─────────┐ │
│ │ Running │────────┤
│ └────┬────┘ │
│ │ Pause │
│ ▼ │
│ ┌─────────┐────────┘
│ │ Paused │
│ └────┬────┘
│ │ Kill/Exit
│ ▼
│ ┌─────────┐
└───────→│Shutdown │
└────┬────┘
│ UnregisterIsolate()
▼
┌─────────┐
│ Dead │
└─────────┘
// Tears down this IsolateGroup in six ordered steps and finally deletes the
// object itself; `this` must not be used after the call returns.
void IsolateGroup::Shutdown() {
// 1. Shut down the thread pool (skipped for the VM isolate)
if (!is_vm_isolate_) {
thread_pool_->Shutdown();
thread_pool_.reset();
}
// 2. Increment the pending-shutdown counter
{
MonitorLocker ml(Isolate::isolate_creation_monitor_);
Isolate::pending_shutdowns_++;
}
// 3. Unregister the IsolateGroup
UnregisterIsolateGroup(this);
// 4. Wait for GC tasks to finish
if (heap_ != nullptr) {
PageSpace* old_space = heap_->old_space();
MonitorLocker ml(old_space->tasks_lock());
// Wait on the tasks lock until the old space reports no running tasks.
while (old_space->tasks() > 0) {
ml.Wait();
}
old_space->AbandonMarkingForShutdown();
}
// 5. Invoke the cleanup callback
auto group_shutdown_callback = Isolate::GroupCleanupCallback();
if (group_shutdown_callback != nullptr) {
group_shutdown_callback(embedder_data());
}
// 6. Delete the IsolateGroup
delete this;
}
// Excerpt of IdleTimeHandler: records when idling started and notifies the
// heap about idle periods.
class IdleTimeHandler : public ValueObject {
public:
// Initialize with a heap
void InitializeWithHeap(Heap* heap);
// Update the idle start time
void UpdateStartIdleTime();
// Check whether an idle notification should be sent
bool ShouldNotifyIdle(int64_t* expiry);
// Notify the heap that it is idle (a good time for compaction and similar work)
void NotifyIdle(int64_t deadline);
private:
Mutex mutex_;
Heap* heap_ = nullptr;
intptr_t disabled_counter_ = 0;
int64_t idle_start_time_ = 0;
};
// Scope object that disables the idle timer
class DisableIdleTimerScope : public ValueObject {
public:
explicit DisableIdleTimerScope(IdleTimeHandler* handler);
~DisableIdleTimerScope();
};
// Excerpt of IsolateGroup: accounting of active and waiting mutator threads.
class IsolateGroup {
public:
// Increase the mutator count
void IncreaseMutatorCount(Thread* thread,
bool is_nested_reenter,
bool was_stolen);
// Decrease the mutator count
void DecreaseMutatorCount(bool is_nested_exit);
// Get the current number of mutators (returns active_mutators_)
intptr_t MutatorCount() const { return active_mutators_; }
// Register/unregister a mutator
void RegisterIsolateGroupMutator(Thread* mutator);
void UnregisterIsolateGroupMutator(Thread* mutator);
private:
std::unique_ptr<Monitor> active_mutators_monitor_;
intptr_t active_mutators_ = 0;
intptr_t waiting_mutators_ = 0;
intptr_t max_active_mutators_ = 0;
};
限制 Mutator 数量:防止太多线程竞争 TLAB
// Excerpt of IsolateGroup: the field tables kept at group level.
class IsolateGroup {
public:
// The various field tables
FieldTable* initial_field_table() const;
FieldTable* sentinel_field_table() const;
FieldTable* shared_initial_field_table() const;
FieldTable* shared_field_table() const;
// Set field tables
void set_initial_field_table(std::shared_ptr<FieldTable> field_table);
void set_shared_field_table(Thread* T, FieldTable* shared_field_table);
private:
std::shared_ptr<FieldTable> initial_field_table_;
std::shared_ptr<FieldTable> sentinel_field_table_;
std::shared_ptr<FieldTable> shared_initial_field_table_;
std::shared_ptr<FieldTable> shared_field_table_;
};
// Registers a static field and its initial value with the group's field
// tables. Shared fields are delegated to RegisterSharedStaticField().
void IsolateGroup::RegisterStaticField(const Field& field,
const Object& initial_value) {
ASSERT(field.is_static());
if (field.is_shared()) {
// Shared field
RegisterSharedStaticField(field, initial_value);
return;
}
// Non-shared field
const bool need_to_grow = initial_field_table()->Register(field);
const intptr_t field_id = field.field_id();
// Allocate the matching sentinel-table slot when Register() reported growth.
if (need_to_grow) {
sentinel_field_table()->AllocateIndex(field_id);
}
initial_field_table()->SetAt(field_id, initial_value.ptr());
sentinel_field_table()->SetAt(field_id, Object::sentinel().ptr());
// Update the field tables of all isolates
SafepointReadRwLocker ml(Thread::Current(), isolates_lock_.get());
if (need_to_grow) {
GcSafepointOperationScope scope(Thread::Current());
for (auto isolate : isolates_) {
auto field_table = isolate->field_table();
// Only tables that report being ready to use are updated.
if (field_table->IsReadyToUse()) {
field_table->Register(field, field_id);
field_table->SetAt(field_id, initial_value.ptr());
}
}
}
}
// Excerpt of IsolateGroup: class table access and the reload-time class table
// operations.
class IsolateGroup {
public:
// Main class table
ClassTable* class_table() const { return class_table_; }
// Class table used for GC heap walks (may differ during reload)
ClassTable* heap_walk_class_table() const {
return heap_walk_class_table_;
}
// Register a class
void RegisterClass(const Class& cls);
// Class table operations during reload
void CloneClassTableForReload();
void RestoreOriginalClassTable();
void DropOriginalClassTable();
private:
ClassTable* class_table_;
ClassTableAllocator class_table_allocator_;
ClassTable* heap_walk_class_table_;
};
// Excerpt of IsolateGroup: hot-reload API. It is compiled only when neither
// PRODUCT nor DART_PRECOMPILED_RUNTIME is defined.
class IsolateGroup {
public:
#if !defined(PRODUCT) && !defined(DART_PRECOMPILED_RUNTIME)
// Reload from sources
bool ReloadSources(JSONStream* js,
bool force_reload,
const char* root_script_url = nullptr,
const char* packages_url = nullptr);
// Reload from a kernel buffer
bool ReloadKernel(JSONStream* js,
bool force_reload,
const uint8_t* kernel_buffer = nullptr,
intptr_t kernel_buffer_size = 0);
// Check whether a reload is possible
bool CanReload();
// Whether a reload is in progress (a group reload context exists)
bool IsReloading() const { return group_reload_context_ != nullptr; }
// Reload context
IsolateGroupReloadContext* reload_context();
private:
std::shared_ptr<IsolateGroupReloadContext> group_reload_context_;
ProgramReloadContext* program_reload_context_ = nullptr;
#endif
};
// Excerpt of Isolate: debugger-related API, compiled only when PRODUCT is not
// defined.
class Isolate {
public:
#if !defined(PRODUCT)
Debugger* debugger() const { return debugger_; }
// Breakpoint management
void set_has_resumption_breakpoints(bool value);
bool has_resumption_breakpoints() const;
// Resume requests
bool ResumeRequest() const;
void SetResumeRequest();
bool GetAndClearResumeRequest();
// Paused state
bool IsPaused() const;
// Pause after a service request
bool should_pause_post_service_request() const;
void set_should_pause_post_service_request(bool value);
// Timestamp of the last resume
int64_t last_resume_timestamp() const;
#endif
};
// Excerpt of Isolate: creation and deletion of FFI callbacks and the native
// callable keep-alive counter.
class Isolate {
public:
// Create an async FFI callback
FfiCallbackMetadata::Trampoline CreateAsyncFfiCallback(
Zone* zone, const Function& send_function, Dart_Port send_port);
// Create an isolate-local FFI callback
FfiCallbackMetadata::Trampoline CreateIsolateLocalFfiCallback(
Zone* zone, const Function& trampoline,
const Closure& target, bool keep_isolate_alive);
// Create an IsolateGroup-bound FFI callback
FfiCallbackMetadata::Trampoline CreateIsolateGroupBoundFfiCallback(
Zone* zone, const Function& trampoline, const Closure& target);
// Delete an FFI callback
void DeleteFfiCallback(FfiCallbackMetadata::Trampoline callback);
// Native callable counter management
void UpdateNativeCallableKeepIsolateAliveCounter(intptr_t delta);
bool HasOpenNativeCallables();
};
// Excerpt of Isolate: receive port lifecycle and keep-alive state.
class Isolate {
public:
// Create a receive port
ReceivePortPtr CreateReceivePort(const String& debug_name);
// Set the keep-alive state of a receive port
void SetReceivePortKeepAliveState(const ReceivePort& receive_port,
bool keep_isolate_alive);
// Close a receive port
void CloseReceivePort(const ReceivePort& receive_port);
// Check whether there are live ports
bool HasLivePorts();
};
Keep Alive 机制:
- keep_isolate_alive = true:端口阻止 Isolate 退出
- keep_isolate_alive = false:端口不阻止退出
// SendPort 是跨 Isolate 通信的发送端
// 对应 VM 中的 Dart_Port(int64_t 类型的端口 ID)
// Excerpt of PortMap showing only the message posting entry point.
class PortMap {
public:
// Post a message
static bool PostMessage(std::unique_ptr<Message> message,
bool before_events = false);
};
// Serializes `obj` into a normal-priority Message addressed to `dest_port`.
// The message is written with same_group set to false.
static std::unique_ptr<Message> SerializeMessage(Dart_Port dest_port,
                                                 const Instance& obj) {
  const bool same_group = false;
  const Message::Priority priority = Message::kNormalPriority;
  return WriteMessage(same_group, obj, dest_port, priority);
}
// Message passing within the same group (uses a PersistentHandle)
Message(Dart_Port dest_port, PersistentHandle* handle, Priority priority);
┌──────────────────┐ ┌──────────────────┐
│ Isolate A │ │ Isolate B │
│ │ │ │
│ ┌────────────┐ │ │ ┌────────────┐ │
│ │ ReceivePort│←─┼──────────┼──│ SendPort │ │
│ │ (main_port)│ │ Message │ │ (ref) │ │
│ └────────────┘ │ │ └────────────┘ │
│ │ │ │
│ ┌────────────┐ │ │ ┌────────────┐ │
│ │ SendPort │──┼──────────┼─→│ ReceivePort│ │
│ │ (ref) │ │ Message │ │ │ │
│ └────────────┘ │ │ └────────────┘ │
│ │ │ │
│ MessageQueue │ │ MessageQueue │
└──────────────────┘ └──────────────────┘
消息传递步骤:
1. 序列化对象(跨组)或传递句柄(同组)
2. 通过 PortMap 路由到目标端口
3. 入队到目标 MessageQueue
4. 通知目标 Isolate 有新消息
5. 反序列化并处理
// Excerpt of Message: how the payload kind is derived from snapshot_length_
// and the union of possible payloads.
class Message {
public:
// Message kinds
// A snapshot message is one that is none of the other three kinds.
bool IsSnapshot() const {
return !IsRaw() && !IsPersistentHandle() && !IsFinalizerInvocationRequest();
}
bool IsRaw() const {
return snapshot_length_ == 0; // Immortal object in the VM isolate
}
bool IsPersistentHandle() const {
return snapshot_length_ == kPersistentHandleSnapshotLen; // Same-group transfer
}
private:
union Payload {
uint8_t* snapshot_; // Serialized snapshot
ObjectPtr raw_obj_; // Raw object pointer (VM isolate)
PersistentHandle* persistent_handle_; // Persistent handle (same group)
};
Dart_Port dest_port_;
intptr_t snapshot_length_;
MessageFinalizableData* finalizable_data_;
Priority priority_;
};
// Isolate group flags.
// X-macro: invokes V(name, Name) once per boolean isolate-group flag.
// The per-flag notes are block comments placed before the line-continuation
// backslash. A `//` comment after the backslash would stop the backslash from
// being the last character on the line and break the macro definition.
#define BOOL_ISOLATE_GROUP_FLAG_LIST(V)                                        \
  V(obfuscate, Obfuscate)               /* code obfuscation */                 \
  V(enable_asserts, EnableAsserts)      /* enable assertions */                \
  V(use_field_guards, UseFieldGuards)   /* field guards */                     \
  V(use_osr, UseOsr)                    /* on-stack replacement */             \
  V(branch_coverage, BranchCoverage)    /* branch coverage */                  \
  V(coverage, Coverage)                 /* code coverage */
// Isolate flags.
// X-macro: invokes V(name, Name) once per boolean isolate flag.
// The per-flag notes are block comments placed before the line-continuation
// backslash. A `//` comment after the backslash would stop the backslash from
// being the last character on the line and break the macro definition.
#define BOOL_ISOLATE_FLAG_LIST(V)                                              \
  V(is_system_isolate, IsSystemIsolate)     /* system isolate */               \
  V(is_service_isolate, IsServiceIsolate)   /* service isolate */              \
  V(is_kernel_isolate, IsKernelIsolate)     /* kernel isolate */
// Flag access.
// Excerpt of IsolateGroup: read accessors for some of the group flags.
class IsolateGroup {
 public:
  // `class` members are private by default; the accessors are declared public
  // so that code outside the class can read the flags.
  bool obfuscate() const;
  bool enable_asserts() const;
  bool use_osr() const;
};
消息传递优化
Safepoint 优化
ThreadPool 优化
// Excerpt of IsolateGroup: heap ownership, heap creation, and access to the
// store buffer and marking stacks.
class IsolateGroup {
public:
// Shared heap
Heap* heap() const { return heap_.get(); }
// Create the heap. The old-generation limit is kDefaultMaxOldGenHeapSize for
// service/kernel isolates and FLAG_old_gen_heap_size otherwise; both limits
// are scaled by MBInWords.
void CreateHeap(bool is_vm_isolate, bool is_service_or_kernel_isolate) {
Heap::Init(this, is_vm_isolate,
FLAG_new_gen_semi_max_size * MBInWords,
(is_service_or_kernel_isolate ? kDefaultMaxOldGenHeapSize
: FLAG_old_gen_heap_size)
* MBInWords);
}
// Store buffer (write barrier)
StoreBuffer* store_buffer() const { return store_buffer_.get(); }
// Marking stacks (GC marking)
MarkingStack* old_marking_stack() const;
MarkingStack* new_marking_stack() const;
MarkingStack* deferred_marking_stack() const;
};
Actor 模式
Isolate = Actor,通过消息通信
对象池模式
ThreadPool 管理可复用 Worker
观察者模式
Exit/Error 监听器机制
命令模式
Task 封装可执行操作
生产者-消费者模式
MessageQueue 作为消息缓冲
┌─────────────┐ 1:N ┌──────────────┐
│IsolateGroup │────────────→│ Isolate │
├─────────────┤ ├──────────────┤
│heap_ │ │main_port_ │
│class_table_ │ │name_ │
│thread_pool_ │ │mutator_thread│
│object_store_│ │field_table_ │
└──────┬──────┘ └──────┬───────┘
│ 1 │ 1
│ │
│ N:1 │ 1:1
▼ ▼
┌─────────────┐ ┌──────────────┐
│ Thread │ │MessageHandler│
├─────────────┤ ├──────────────┤
│isolate_ │ │message_queue_│
│isolate_group│ │HandleMessage()│
│task_kind_ │ └──────────────┘
│safepoint_ │
└─────────────┘
1. 合理使用 Isolate
2. 消息传递优化
3. 错误处理
1. 内存泄漏
// ❌ Forgetting to close the ReceivePort
final receivePort = ReceivePort();
Isolate.spawn(entry, receivePort.sendPort);
// Forgotten: receivePort.close();
2. 消息过大
// ❌ Sending a large object
isolate.sendPort.send(largeList); // high copying cost
// ✅ Use a transferable
// NOTE(review): TransferableTypedData.fromList takes a List<TypedData>, so
// largeList is presumably a TypedData such as Uint8List — confirm.
final transferable = TransferableTypedData.fromList([largeList]);
isolate.sendPort.send(transferable);
3. 死锁
// ❌ 双向等待
// Isolate A 等待 B 的响应,B 等待 A 的响应
源码位置
- runtime/vm/isolate.h - Isolate 定义
- runtime/vm/isolate.cc - Isolate 实现
- runtime/vm/thread.h - Thread 定义
- runtime/vm/port.h - Port/PortMap
- runtime/vm/message.h - Message/MessageQueue
- runtime/vm/thread_pool.h - ThreadPool
核心要点
设计哲学
"Do not communicate by sharing memory; instead, share memory by communicating."