Flutter之isolate的使用及通信原理

11,291 阅读17分钟

由于Dart是一种单线程模型语言,所以可以避免多线程环境下产生的一系列降低运行效率问题。但单线程模型却有一个非常严重的缺陷,那就是执行计算密集型任务时会阻塞当前任务的执行,从而产生不好的影响(如UI的卡顿等),这时候就需要提供一个新的线程或类似线程的东西来异步执行计算密集型任务。由于Dart 无法创建线程,所以就提供了Isolate来异步执行计算密集型任务。

在刚开始学习Isolate时,以为它就是一个类似线程一样的东西。但随着学习的深入,发现Isolate远比线程复杂,甚至都可以看作一个进程。也由于isolate比较复杂,所以本文仅分析isolate的使用、创建及通信。

1、Isolate的使用

Dart中,Isolate的使用及通信都较为复杂,主要是通过 Isolate.spawnIsolate.spawnUri来创建IsolateReceivePort来进行Isolate间通信。下面就来看如何使用Isolate

1.1、Isolate单向通信

先来看Isolate间的单向通信,代码如下。

//在父Isolate中调用
Isolate isolate;
start() async {
  ReceivePort receivePort = ReceivePort();
  //创建子Isolate对象
  isolate = await Isolate.spawn(getMsg, receivePort.sendPort);
  //监听子Isolate的返回数据
  receivePort.listen((data) {
    print('data:$data');
    receivePort.close();
    //关闭Isolate对象
    isolate?.kill(priority: Isolate.immediate);
    isolate = null;
  });
}
//子Isolate对象的入口函数,可以在该函数中做耗时操作
getMsg(sendPort) => sendPort.send("hello");

运行代码后,就会输出新创建Isolate对象返回的数据,如下。

1.2、Isolate双向通信

再来看多个Isolate之间的通信实现,代码如下。


//当前函数在父Isolate中
Future<dynamic> asyncFactoriali(n) async {
  //父Isolate对应的ReceivePort对象
  final response = ReceivePort();
  //创建一个子Isolate对象
  await Isolate.spawn(_isolate, response.sendPort);
  final sendPort = await response.first as SendPort;
  final answer = ReceivePort();
  //给子Isolate发送数据
  sendPort.send([n, answer.sendPort]);
  return answer.first;
}

//子Isolate的入口函数,可以在该函数中做耗时操作
//_isolate必须是顶级函数(不能存在任何类中)或者是静态函数(可以存在类中)
_isolate(SendPort initialReplyTo) async {
  //子Isolate对应的ReceivePort对象
  final port = ReceivePort();
  initialReplyTo.send(port.sendPort);
  final message = await port.first as List;
  final data = message[0] as int;
  final send = message[1] as SendPort;
  //给父Isolate的返回数据
  send.send(syncFactorial(data));
}

//运行代码
start() async {
  print("计算结果:${await asyncFactoriali(4)}");
}
start();

通过在新创建的Isolate中计算并返回数据后,得到如下返回结果。

通过上面代码,我们就可以能够通过Isolate来执行异步任务。下面再来看其具体实现原理。

2、isolate的创建与运行

先从下面的时序图来看isolate是如何创建、初始化及运行的。

时序图还是比较复杂的,下面就就来对上图进行详细介绍。

2.1、isolate的创建

首先来看isolate的创建,在上面例子中是通过Isolate.spawn来创建Isolate对象。

class Isolate {
  //声明外部实现
  external static Future<Isolate> spawn<T>(
      void entryPoint(T message), T message,
      {bool paused: false,
      bool errorsAreFatal,
      SendPort onExit,
      SendPort onError,
      @Since("2.3") String debugName});
}

这里的external关键字主要是声明spawn这个函数,具体实现由外部提供。在Dart中,该函数的具体实现是在isolate_patch.dart中。先来看spawn的具体实现。

@patch
class Isolate {
  @patch
  static Future<Isolate> spawn<T>(void entryPoint(T message), T message,
      {bool paused: false,
      bool errorsAreFatal,
      SendPort onExit,
      SendPort onError,
      String debugName}) async {
    // `paused` isn't handled yet.
    RawReceivePort readyPort;
    try {
      //该函数执行是异步的
      _spawnFunction(
          readyPort.sendPort,
          script.toString(),
          entryPoint,
          message,
          paused,
          errorsAreFatal,
          onExit,
          onError,
          null,
          packageConfig,
          debugName);
      return await _spawnCommon(readyPort);
    } catch (e, st) {
      ...
    }
  }

  static Future<Isolate> _spawnCommon(RawReceivePort readyPort) {
    Completer completer = new Completer<Isolate>.sync();
    //监听Isolate是否创建完毕,当子Isolate创建完毕后会通知父Isolate
    readyPort.handler = (readyMessage) {
      //关闭端口
      readyPort.close();
      if (readyMessage is List && readyMessage.length == 2) {//子Isolate创建成功
        SendPort controlPort = readyMessage[0];
        List capabilities = readyMessage[1];
        completer.complete(new Isolate(controlPort,
            pauseCapability: capabilities[0],
            terminateCapability: capabilities[1]));
      } else if (readyMessage is String) {...} else {...}
    };
    return completer.future;
  }
  ......
  //调用虚拟机中的Isolate_spawnFunction函数
  static void _spawnFunction(
      SendPort readyPort,
      String uri,
      Function topLevelFunction,
      var message,
      bool paused,
      bool errorsAreFatal,
      SendPort onExit,
      SendPort onError,
      String packageRoot,
      String packageConfig,
      String debugName) native "Isolate_spawnFunction";

  ......
}

这里的_spawnFunction调用的是Dart VM中的Isolate_spawnFunction函数,该函数就是把Isolate对象的创建交给线程池执行,所以Isolate对象的创建是异步的。这里的线程池是在Dart VM初始化的时候创建的

[->third_party/dart/runtime/lib/isolate.cc]

DEFINE_NATIVE_ENTRY(Isolate_spawnFunction, 0, 11) {
  ...
  if (closure.IsClosure()) {
    ...
      //异步执行,thread_pool是一个线程池,该线程池是在Dart VM创建时创建的
      Dart::thread_pool()->Run<SpawnIsolateTask>(isolate, std::move(state));
      return Object::null();
    }
  }
  ...
  return Object::null();
}

SpawnIsolateTask是一个类似Java中实现了Runable接口的类,在该类中主要是进行子Isolate对象的创建及运行,来看其具体实现。

[->third_party/dart/runtime/lib/isolate.cc]

//在子线程中执行
class SpawnIsolateTask : public ThreadPool::Task {
  void Run() override {
  
    auto group = state_->isolate_group();

    // create_group_callback是在Dart VM创建时初始化的。
    Dart_IsolateGroupCreateCallback create_group_callback =
        Isolate::CreateGroupCallback();
    ...

    // OnIsolateInitialize是在Dart VM初始化时设置的
    Dart_InitializeIsolateCallback initialize_callback =
        Isolate::InitializeCallback();
    ...
    char* error = nullptr;
    Isolate* isolate = nullptr;
    //group及initialize_callback都是在虚拟机初始化的时候设置的
    if (!FLAG_enable_isolate_groups || group == nullptr ||
        initialize_callback == nullptr) {
      Dart_IsolateFlags api_flags = *(state_->isolate_flags());
      //创建一个新的isolate
      isolate = reinterpret_cast<Isolate*>((create_group_callback)(
          state_->script_url(), name, nullptr, state_->package_config(),
          &api_flags, parent_isolate_->init_callback_data(), &error));
      parent_isolate_->DecrementSpawnCount();
      parent_isolate_ = nullptr;
    } else {
      ...
    }
    ...
    // isolate是否是可运行的
    // 是在OnIsolateInitialize中设置的
    if (isolate->is_runnable()) {
      //运行isolate
      //[见2.3小节]
      isolate->Run();
    }
  }
  
};

默认情况下,FLAG_enable_isolate_groups为false,groupinitialize_callback都不为null。所以新的isolate是通过create_group_callback来创建的。

通过Flutter之Dart虚拟机启动一文,可以看到在Dart VM初始化的最后,给Isolate赋予了对应的create_group_callbackinitialize_callback

[->third_party/dart/runtime/vm/dart.cc]

char* Dart::Init(const uint8_t* vm_isolate_snapshot,
                 const uint8_t* instructions_snapshot,
                 Dart_IsolateGroupCreateCallback create_group,
                 Dart_InitializeIsolateCallback initialize_isolate,
                 Dart_IsolateShutdownCallback shutdown,
                 Dart_IsolateCleanupCallback cleanup,
                 Dart_IsolateGroupCleanupCallback cleanup_group,
                 Dart_ThreadExitCallback thread_exit,
                 Dart_FileOpenCallback file_open,
                 Dart_FileReadCallback file_read,
                 Dart_FileWriteCallback file_write,
                 Dart_FileCloseCallback file_close,
                 Dart_EntropySource entropy_source,
                 Dart_GetVMServiceAssetsArchive get_service_assets,
                 bool start_kernel_isolate,
                 Dart_CodeObserver* observer) {
  ...
  Isolate::SetCreateGroupCallback(create_group);
  Isolate::SetInitializeCallback_(initialize_isolate);
  ...
}

这里来看create_group_callback对应的函数实现,它是在调用init函数时传递过来的。根据调用链来看,在DartVM对象初始化时,会将调用init函数。

[->flutter/runtime/dart_vm.cc]

DartVM::DartVM(std::shared_ptr<const DartVMData> vm_data,
               std::shared_ptr<IsolateNameServer> isolate_name_server)
    : settings_(vm_data->GetSettings()),
      concurrent_message_loop_(fml::ConcurrentMessageLoop::Create()),
      skia_concurrent_executor_(
          [runner = concurrent_message_loop_->GetTaskRunner()](
              fml::closure work) { runner->PostTask(work); }),
      vm_data_(vm_data),
      isolate_name_server_(std::move(isolate_name_server)),
      service_protocol_(std::make_shared<ServiceProtocol>()) {
  ...
  {
    Dart_InitializeParams params = {};
    ...
    //create_group_callback对应函数实现
    params.create_group = reinterpret_cast<decltype(params.create_group)>(
        DartIsolate::DartIsolateGroupCreateCallback);
    //initialize_isolate对应函数实现
    params.initialize_isolate =
        reinterpret_cast<decltype(params.initialize_isolate)>(
            DartIsolate::DartIsolateInitializeCallback);
    ...
    char* init_error = Dart_Initialize(&params);
    ...
  }
  ...
}

下面再来看DartIsolateGroupCreateCallback的实现。

[->flutter/runtime/dart_isolate.cc]

Dart_Isolate DartIsolate::DartIsolateGroupCreateCallback(
    const char* advisory_script_uri,
    const char* advisory_script_entrypoint,
    const char* package_root,
    const char* package_config,
    Dart_IsolateFlags* flags,
    std::shared_ptr<DartIsolate>* parent_isolate_data,
    char** error) {
  ... 
  //创建DartIsolate对象,此时DartIsolate对象处于Uninitialized状态
  auto isolate_data = std::make_unique<std::shared_ptr<DartIsolate>>(
      std::shared_ptr<DartIsolate>(new DartIsolate(
          (*isolate_group_data)->GetSettings(),  // settings
          null_task_runners,                     // task_runners
          fml::WeakPtr<SnapshotDelegate>{},      // snapshot_delegate
          fml::WeakPtr<IOManager>{},             // io_manager
          fml::RefPtr<SkiaUnrefQueue>{},         // unref_queue
          fml::WeakPtr<ImageDecoder>{},          // image_decoder
          advisory_script_uri,                   // advisory_script_uri
          advisory_script_entrypoint,            // advisory_script_entrypoint
          false))); 
  //创建isolate对象
  Dart_Isolate vm_isolate = CreateDartIsolateGroup(
      std::move(isolate_group_data), std::move(isolate_data), flags, error);

  ...

  return vm_isolate;
}


Dart_Isolate DartIsolate::CreateDartIsolateGroup(
    std::unique_ptr<std::shared_ptr<DartIsolateGroupData>> isolate_group_data,
    std::unique_ptr<std::shared_ptr<DartIsolate>> isolate_data,
    Dart_IsolateFlags* flags,
    char** error) {

  // 创建Isoalte对象
  Dart_Isolate isolate = Dart_CreateIsolateGroup(
      (*isolate_group_data)->GetAdvisoryScriptURI().c_str(),
      (*isolate_group_data)->GetAdvisoryScriptEntrypoint().c_str(),
      (*isolate_group_data)->GetIsolateSnapshot()->GetDataMapping(),
      (*isolate_group_data)->GetIsolateSnapshot()->GetInstructionsMapping(),
      flags, isolate_group_data.get(), isolate_data.get(), error);

  if (isolate == nullptr) {
    return nullptr;
  }

  // 将Isolate的控制权交给Dart VM
  std::shared_ptr<DartIsolate> embedder_isolate(*isolate_data);
  isolate_group_data.release();
  isolate_data.release();
  
  //初始化isoalte
  if (!InitializeIsolate(std::move(embedder_isolate), isolate, error)) {
    return nullptr;
  }

  return isolate;
}

DartIsolateGroupCreateCallback中通过CreateDartIsolateGroup来调用Dart VM中的Dart_CreateIsolateGroup函数实现Isolate的创建,代码实现如下。

[->third_party/dart/runtime/vm/dart_api_impl.cc]

DART_EXPORT Dart_Isolate
Dart_CreateIsolateGroup(const char* script_uri,
                        const char* name,
                        const uint8_t* snapshot_data,
                        const uint8_t* snapshot_instructions,
                        Dart_IsolateFlags* flags,
                        void* isolate_group_data,
                        void* isolate_data,
                        char** error) {

  Dart_IsolateFlags api_flags;
  if (flags == nullptr) {
    Isolate::FlagsInitialize(&api_flags);
    flags = &api_flags;
  }

  const char* non_null_name = name == nullptr ? "isolate" : name;
  std::unique_ptr<IsolateGroupSource> source(
      new IsolateGroupSource(script_uri, non_null_name, snapshot_data,
                             snapshot_instructions, nullptr, -1, *flags));
  auto group = new IsolateGroup(std::move(source), isolate_group_data);
  IsolateGroup::RegisterIsolateGroup(group);
  //创建新的isolate
  Dart_Isolate isolate =
      CreateIsolate(group, non_null_name, isolate_data, error);
  if (isolate != nullptr) {
    group->set_initial_spawn_successful();
  }
  return isolate;
}
...
static Dart_Isolate CreateIsolate(IsolateGroup* group,
                                  const char* name,
                                  void* isolate_data,
                                  char** error) {

  auto source = group->source();
  Isolate* I = Dart::CreateIsolate(name, source->flags, group);
  ...

  Dart::ShutdownIsolate();
  return reinterpret_cast<Dart_Isolate>(NULL);
}

经过一系列调用,最终调用dart.cc中的CreateIsolate函数,该函数很简单,就是创建一个新的Isolate对象。

[->third_party/dart/runtime/vm/dart.cc]

Isolate* Dart::CreateIsolate(const char* name_prefix,
                             const Dart_IsolateFlags& api_flags,
                             IsolateGroup* isolate_group) {
  // Create a new isolate.
  Isolate* isolate =
      Isolate::InitIsolate(name_prefix, isolate_group, api_flags);
  return isolate;
}

[->third_party/dart/runtime/vm/isolate.cc]

//初始化Isolate
Isolate* Isolate::InitIsolate(const char* name_prefix,
                              IsolateGroup* isolate_group,
                              const Dart_IsolateFlags& api_flags,
                              bool is_vm_isolate) {
  //1、创建一个Isolate对象
  Isolate* result = new Isolate(isolate_group, api_flags);
  ...
  //2、创建Isolate对应的堆空间,在该堆空间中,存在对象的分配,垃圾回收等。
  Heap::Init(result,
             is_vm_isolate
                 ? 0  // New gen size 0; VM isolate should only allocate in old.
                 : FLAG_new_gen_semi_max_size * MBInWords,//MBInWords值是128kb,
             (is_service_or_kernel_isolate ? kDefaultMaxOldGenHeapSize
                                           : FLAG_old_gen_heap_size) *
                 MBInWords);
  //3、将Isolate与Thread相关联
  if (!Thread::EnterIsolate(result)) {
    // We failed to enter the isolate, it is possible the VM is shutting down,
    // return back a NULL so that CreateIsolate reports back an error.
    if (KernelIsolate::IsKernelIsolate(result)) {
      KernelIsolate::SetKernelIsolate(nullptr);
    }
    if (ServiceIsolate::IsServiceIsolate(result)) {
      ServiceIsolate::SetServiceIsolate(nullptr);
    }
    delete result;
    return nullptr;
  }

  // Setup the isolate message handler.
  //4、设置isolate的消息处理器
  MessageHandler* handler = new IsolateMessageHandler(result);
  result->set_message_handler(handler);

  // Setup the Dart API state.
  //5、启动Dart API状态
  ApiState* state = new ApiState();
  result->set_api_state(state);
  
  //6、设置主端口
  result->set_main_port(PortMap::CreatePort(result->message_handler()));

  // Add to isolate list. Shutdown and delete the isolate on failure.
  //7、将当前的Isolate添加到链表中(一个单链表)
  if (!AddIsolateToList(result)) {
    //添加失败,销毁该Isolate
    result->LowLevelShutdown();
    //取消线程与Isolate的关联
    Thread::ExitIsolate();
    //如果是虚拟机内部的Isolate
    if (KernelIsolate::IsKernelIsolate(result)) {
      KernelIsolate::SetKernelIsolate(nullptr);
    }
    //如果是Service Isolate
    if (ServiceIsolate::IsServiceIsolate(result)) {
      ServiceIsolate::SetServiceIsolate(nullptr);
    }
    //删除当前Isolate对象
    delete result;
    return nullptr;
  }

  return result;
}

InitIsolate函数比较重要,主要做了以下事情。

  1. 创建Isolate对象
  2. 创建Isolate中的堆空间,在Isolate仅有一块堆空间。存在堆空间也就会存在对象分配、垃圾回收等。
  3. Isolate对象与一个线程进行关联,也就是可以说一个线程对应着一个Isolate对象。
  4. 设置消息处理器(IsolateMessageHandler),主要是对于Isolate中的消息处理。子Isolate可以通过端口向父IsolateMessageHandler中添加消息,反之亦然。这也是Isolate间的通信的实现。
  5. 设置api state,暂时没搞懂这个是干啥的。
  6. 设置主端口。
  7. 将当前Isolate添加到链表中。

当上面的一些操作执行完毕后,一个Isolate对象就创建成功了。

2.2、DartIsolate初始化

CreateDartIsolateGroup中调用Dart_CreateIsolateGroup函数创建isolate对象成功后,还会进行DartIsolate对象的初始化,其初始化是在InitializeIsolate函数中实现的。

[->flutter/runtime/dart_isolate.cc]

bool DartIsolate::InitializeIsolate(
    std::shared_ptr<DartIsolate> embedder_isolate,
    Dart_Isolate isolate,
    char** error) {
  //isolate的初始化,此时DartIsolate对象处于Initialized状态
  if (!embedder_isolate->Initialize(isolate)) {
    return false;
  }
  
  //加载library,此时DartIsolate对象处于LibrariesSetup状态
  if (!embedder_isolate->LoadLibraries()) {
    return false;
  }

  //如果是非RootIsolate,那么DartIsolate对象将处于Ready状态,也只有DartIsolate对象处于Ready状态时,Isolate才可以运行。
  if (!embedder_isolate->IsRootIsolate()) {
    auto child_isolate_preparer =
        embedder_isolate->GetIsolateGroupData().GetChildIsolatePreparer();
    if (!child_isolate_preparer(embedder_isolate.get())) {
      return false;
    }
  }

  return true;
}

DartIsolate初始化后,DartIsolate对象就由Uninitialized状态切换为Ready状态,这时候isolate也就是可运行的。

2.3、isolate的运行

isolate可运行后,接下来就会来运行isolate。在SpawnIsolateTask类中,运行isolate调用的是其Run函数,实现如下。

[->third_party/dart/runtime/vm/isolate.cc]

void Isolate::Run() {
  //向消息处理器中添加的第一个消息
  //记住该RunIsolate函数,在后面会说到
  message_handler()->Run(Dart::thread_pool(), RunIsolate, ShutdownIsolate,
                         reinterpret_cast<uword>(this));
}

[->third_party/dart/runtime/vm/MessageHandler.cc]

void MessageHandler::Run(ThreadPool* pool,
                         StartCallback start_callback,
                         EndCallback end_callback,
                         CallbackData data) {
  MonitorLocker ml(&monitor_);
  pool_ = pool;
  start_callback_ = start_callback;
  end_callback_ = end_callback;
  callback_data_ = data;
  task_running_ = true;
  //在线程池中执行任务
  const bool launched_successfully = pool_->Run<MessageHandlerTask>(this);
}

然后继续异步执行,但这次是在子Isolate中执行的。下面再来看MessageHandlerTask,在MessageHandlerTaskrun函数中执行的是TaskCallback函数。

[->third_party/dart/runtime/vm/message_handler.cc]

void MessageHandler::TaskCallback() {
  MessageStatus status = kOK;
  bool run_end_callback = false;
  bool delete_me = false;
  EndCallback end_callback = NULL;
  CallbackData callback_data = 0;
  {
    ...

    if (status == kOK) {
      //仅当子Isolate第一次运行时,start_callback_才不为null
      if (start_callback_ != nullptr) {
        ml.Exit();
        //调用Isolate的第一个函数(允许多线程并发执行)
        status = start_callback_(callback_data_);
        ASSERT(Isolate::Current() == NULL);
        start_callback_ = NULL;
        ml.Enter();
      }
      ...
    }

    ...
  }

  ...
}

先不管消息处理[见小结3],这里重点来看start_callback_,它对应着RunIsolate这个函数。

[->third_party/dart/runtime/vm/isolate.cc]

//运行Isolate
static MessageHandler::MessageStatus RunIsolate(uword parameter) {
  ...
  {
    ...

    //args是调用Dart层_startIsolate函数所需的参数集合
    const Array& args = Array::Handle(Array::New(7));
    args.SetAt(0, SendPort::Handle(SendPort::New(state->parent_port())));
    args.SetAt(1, Instance::Handle(func.ImplicitStaticClosure()));
    args.SetAt(2, Instance::Handle(state->BuildArgs(thread)));
    args.SetAt(3, Instance::Handle(state->BuildMessage(thread)));
    args.SetAt(4, is_spawn_uri ? Bool::True() : Bool::False());
    args.SetAt(5, ReceivePort::Handle(ReceivePort::New(
                      isolate->main_port(), true /* control port */)));
    args.SetAt(6, capabilities);

    //调用Dart层的_startIsolate函数,该函数在isolate_patch.dart文件中
    const Library& lib = Library::Handle(Library::IsolateLibrary());
    const String& entry_name = String::Handle(String::New("_startIsolate"));
    const Function& entry_point =
        Function::Handle(lib.LookupLocalFunction(entry_name));
    ASSERT(entry_point.IsFunction() && !entry_point.IsNull());

    result = DartEntry::InvokeFunction(entry_point, args);
    if (result.IsError()) {
      return StoreError(thread, Error::Cast(result));
    }
  }
  return MessageHandler::kOK;
}

RunIsolate中,会调用isolate_patch.dart中的_startIsolate函数,从而调用创建Isolate对象时传递的初始化函数。

@pragma("vm:entry-point", "call")
void _startIsolate(
    SendPort parentPort,
    Function entryPoint,
    List<String> args,
    var message,
    bool isSpawnUri,
    RawReceivePort controlPort,
    List capabilities) {
  // The control port (aka the main isolate port) does not handle any messages.
  if (controlPort != null) {
    controlPort.handler = (_) {}; // Nobody home on the control port.
  }

  if (parentPort != null) {
    // Build a message to our parent isolate providing access to the
    // current isolate's control port and capabilities.
    //
    // TODO(floitsch): Send an error message if we can't find the entry point.
    var readyMessage = new List(2);
    readyMessage[0] = controlPort.sendPort;
    readyMessage[1] = capabilities;

    // Out of an excess of paranoia we clear the capabilities from the
    // stack.  Not really necessary.
    capabilities = null;
    //告诉父Isolate,当前`Isolate`已经创建成功
    parentPort.send(readyMessage);
  }

  // Delay all user code handling to the next run of the message loop. This
  // allows us to intercept certain conditions in the event dispatch, such as
  // starting in paused state.
  RawReceivePort port = new RawReceivePort();
  port.handler = (_) {
    port.close();

    if (isSpawnUri) {
      if (entryPoint is _BinaryFunction) {
        (entryPoint as dynamic)(args, message);
      } else if (entryPoint is _UnaryFunction) {
        (entryPoint as dynamic)(args);
      } else {
        entryPoint();
      }
    } else {
      //初始化函数
      entryPoint(message);
    }
  };
  // Make sure the message handler is triggered.
  port.sendPort.send(null);
}

_startIsolate函数中主要是做了以下几件事。

  1. 告诉父Isolate,子Isolate已经创建成功。
  2. 调用子Isolate的初始化函数,也就是入口函数。

到此,一个新的Isolate就已经创建完毕。在创建过程中,会从Dart SDK调用虚拟机函数,然后在新的Isolate对象中通过异步的方式调用入口函数。

注意:主Isolate的入口函数就是熟悉的main函数。

3、isolate之间的通信原理

通过前面一节,知道了Dart是如何创建一个新的Isolate对象的。但也还是省略了很多东西的,比如子Isolate通知父Isolate的原理,也就是Isolate间的通信原理。

3.1、ReceivePort与SendPort

Isolate给另外一个Isolate发送消息之前,需要先来熟悉ReceivePortSendPort。代码如下。

abstract class ReceivePort implements Stream {
  //声明外部实现
  external factory ReceivePort();
}

//在isolate_patch.dart中
@patch
class ReceivePort {
  @patch
  factory ReceivePort() => new _ReceivePortImpl();

  @patch
  factory ReceivePort.fromRawReceivePort(RawReceivePort rawPort) {
    return new _ReceivePortImpl.fromRawReceivePort(rawPort);
  }
}

class _ReceivePortImpl extends Stream implements ReceivePort {
  _ReceivePortImpl() : this.fromRawReceivePort(new RawReceivePort());

  _ReceivePortImpl.fromRawReceivePort(this._rawPort) {
    _controller = new StreamController(onCancel: close, sync: true);
    _rawPort.handler = _controller.add;
  }
  //返回一个SendPort对象
  SendPort get sendPort {
    return _rawPort.sendPort;
  }
  //监听发送的消息
  StreamSubscription listen(void onData(var message),
      {Function onError, void onDone(), bool cancelOnError}) {
    return _controller.stream.listen(onData,
        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
  }

  ...
}

@patch
class RawReceivePort {
  @patch
  factory RawReceivePort([Function handler]) {
    _RawReceivePortImpl result = new _RawReceivePortImpl();
    result.handler = handler;
    return result;
  }
}
@pragma("vm:entry-point")
class _RawReceivePortImpl implements RawReceivePort {
  factory _RawReceivePortImpl() native "RawReceivePortImpl_factory";
  ...

  SendPort get sendPort {
    return _get_sendport();
  }

 ...

  /**** Internal implementation details ****/
  _get_id() native "RawReceivePortImpl_get_id";
  _get_sendport() native "RawReceivePortImpl_get_sendport";
  ...
}

在代码中,一个ReceivePort对象包含一个RawReceivePort对象及SendPort对象。其中RawReceivePort对象是在虚拟机中创建的,它对应着虚拟机中的ReceivePort类。代码如下。

[->third_party/dart/runtime/lib.isolate.cc]

DEFINE_NATIVE_ENTRY(RawReceivePortImpl_factory, 0, 1) {
  ASSERT(
      TypeArguments::CheckedHandle(zone, arguments->NativeArgAt(0)).IsNull());
  //创建一个Entry对象并返回一个端口号。
  Dart_Port port_id = PortMap::CreatePort(isolate->message_handler());
  //创建ReceivePort对象
  return ReceivePort::New(port_id, false /* not control port */);
}

在创建ReceivePort对象对象之前,首先会将当前Isolate中的MessageHandler对象添加到map中。这里是一个全局的map,在Dart VM初始化的时候创建,每个元素都是一个Entry对象,在Entry中,有一个MessageHandler对象,一个端口号及该端口的状态。

  typedef struct {
    //端口号
    Dart_Port port;
    //消息处理器
    MessageHandler* handler;
    //端口号状态
    PortState state;
  } Entry;

[->third_party/dart/runtime/vm/port.cc]

Dart_Port PortMap::CreatePort(MessageHandler* handler) {
  ...
  Entry entry;
  //分配一个端口号
  entry.port = AllocatePort();
  //设置消息处理器
  entry.handler = handler;
  //端口号状态
  entry.state = kNewPort;
  //查找当前entry的位置
  intptr_t index = entry.port % capacity_;
  Entry cur = map_[index];
  // Stop the search at the first found unused (free or deleted) slot.
  //找到空闲或将要被删除的Entry。
  while (cur.port != 0) {
    index = (index + 1) % capacity_;
    cur = map_[index];
  }

  if (map_[index].handler == deleted_entry_) {
    // Consuming a deleted entry.
    deleted_--;
  }
  //插入到map中
  map_[index] = entry;

  // Increment number of used slots and grow if necessary.
  used_++;
  //检查是否需要扩容
  MaintainInvariants();

  ...
  //返回端口号
  return entry.port;
}

注意: 这里的map的初始容量是8,当达到容量的3/4时,会进行扩容,新的容量是旧的容量2倍。熟悉Java的就知道,这跟HashMap类似,初始容量为8,加载因子为0.75,扩容是指数级增长。

再来看ReceivePort对象的创建。

[->third_party/dart/runtime/vm/object.cc]

RawReceivePort* ReceivePort::New(Dart_Port id,
                                 bool is_control_port,
                                 Heap::Space space) {
  Thread* thread = Thread::Current();
  Zone* zone = thread->zone();
  const SendPort& send_port =
      //创建SendPort对象
      SendPort::Handle(zone, SendPort::New(id, thread->isolate()->origin_id()));

  ReceivePort& result = ReceivePort::Handle(zone);
  { 
    //创建ReceivePort对象
    RawObject* raw = Object::Allocate(ReceivePort::kClassId,//classId
                                      ReceivePort::InstanceSize(),//对象大小
                                      space);
    NoSafepointScope no_safepoint;
    result ^= raw;
    result.StorePointer(&result.raw_ptr()->send_port_, send_port.raw());
  }
  if (is_control_port) {
    //更新端口的状态,设为kControlPort
    PortMap::SetPortState(id, PortMap::kControlPort);
  } else {
    //更新端口的状态,设为kLivePort
    PortMap::SetPortState(id, PortMap::kLivePort);
  }
  return result.raw();
}

[->third_party/dart/runtime/vm/object.cc]

RawSendPort* SendPort::New(Dart_Port id,
                           Dart_Port origin_id,
                           Heap::Space space) {
  SendPort& result = SendPort::Handle();
  { 
    //创建SendPort对象
    RawObject* raw =
        Object::Allocate(SendPort::kClassId, //classId
                         SendPort::InstanceSize(), //对象ID
                         space);
    NoSafepointScope no_safepoint;
    result ^= raw;
    result.StoreNonPointer(&result.raw_ptr()->id_, id);
    result.StoreNonPointer(&result.raw_ptr()->origin_id_, origin_id);
  }
  return result.raw();
}

这里创建对象时传递的classId是在Isolate对象初始化时注册的,然后根据该classId来创建相应的对象。在这里,ReceivePort对应着Dart SDK中的_RawReceivePortImpl对象,SendPort对应着Dart SDK中的_SendPortImpl对象。

也就是当创建ReceivePort对象时,会通过Dart VM来创建对应的_RawReceivePortImpl对象及SendPort对应的_SendPortImpl对象。

3.2、isolate间通信

ReceivePort创建成功后,就可以通过调用_SendPortImplsend函数来发送消息。

@pragma("vm:entry-point")
class _SendPortImpl implements SendPort {
  ...
  /*--- public interface ---*/
  @pragma("vm:entry-point", "call")
  void send(var message) {
    _sendInternal(message);
  }

  ...

  // Forward the implementation of sending messages to the VM.
  void _sendInternal(var message) native "SendPortImpl_sendInternal_";
}

_sendInternal的具体实现在Dart VM中。

[->third_party/dart/runtime/lib/isolate.cc]

DEFINE_NATIVE_ENTRY(SendPortImpl_sendInternal_, 0, 2) {
  ...

  //目标Isolate所对应端口号
  const Dart_Port destination_port_id = port.Id();
  const bool can_send_any_object = isolate->origin_id() == port.origin_id();

  if (ApiObjectConverter::CanConvert(obj.raw())) {//如果发送消息为null或者发送消息不是堆对象
    PortMap::PostMessage(
        Message::New(destination_port_id, obj.raw(), Message::kNormalPriority));
  } else {
    //创建一个MessageWriter对象——writer
    MessageWriter writer(can_send_any_object);
    // TODO(turnidge): Throw an exception when the return value is false?
    PortMap::PostMessage(writer.WriteMessage(obj, destination_port_id,
                                             Message::kNormalPriority));
  }
  return Object::null();
}

[->third_party/dart/runtime/vm/port.cc]

bool PortMap::PostMessage(std::unique_ptr<Message> message,
                          bool before_events) {
  MutexLocker ml(mutex_);
  //在map中根据目标端口号寻找Entry所在的位置
  intptr_t index = FindPort(message->dest_port());
  if (index < 0) {
    return false;
  }
  //从map中拿到Entry对象并取出MessageHandler对象
  MessageHandler* handler = map_[index].handler;
  //这里的handler是目标Isolate中的MessageHandler
  handler->PostMessage(std::move(message), before_events);
  return true;
}

到这里就已经成功将消息加入到了目标IsolateMessageHandler中,成功完成了Isolate间消息的传递,但还尚未对消息进行处理。

再来看Isolate对于消息的处理。

[->third_party/dart/runtime/vm/message_handler.cc]

void MessageHandler::PostMessage(std::unique_ptr<Message> message,
                                 bool before_events) {
  Message::Priority saved_priority;

  {
    MonitorLocker ml(&monitor_);
    ...

    saved_priority = message->priority();
    if (message->IsOOB()) {
      //加入到OOB类型消息的队列中
      oob_queue_->Enqueue(std::move(message), before_events);
    } else {
      //加入到普通消息队列中
      queue_->Enqueue(std::move(message), before_events);
    }
    if (paused_for_messages_) {
      ml.Notify();
    }

    if (pool_ != nullptr && !task_running_) {
      task_running_ = true;
      //异步处理
      const bool launched_successfully = pool_->Run<MessageHandlerTask>(this);
    }
  }

  // Invoke any custom message notification.
  //如果自定义了消息通知函数,那么在消息处理完毕后会调用该函数
  MessageNotify(saved_priority);
}

PostMessage中主要是做了以下操作。

  1. 根据消息级别将消息加入到不同的队列中。主要有OOB消息及普通消息两个级别,OOB消息在队列oob_queue_中,普通消息在队列queue_中。OOB消息级别高于普通消息,会立即处理。
  2. 将一个消息处理任务MessageHandlerTask加入到线程中。

这里的线程池是在Dart VM创建的时候创建的,在Isolate运行时传递给MessageHandler的。

下面再来看MessageHandlerTask,在MessageHandlerTaskrun函数中执行的是TaskCallback函数。

[->third_party/dart/runtime/vm/message_handler.cc]

void MessageHandler::TaskCallback() {
  MessageStatus status = kOK;
  bool run_end_callback = false;
  bool delete_me = false;
  EndCallback end_callback = NULL;
  CallbackData callback_data = 0;
  {
    ...

    if (status == kOK) {
      ...
      bool handle_messages = true;
      while (handle_messages) {
        handle_messages = false;
        // Handle any pending messages for this message handler.
        if (status != kShutdown) {
          //处理消息
          status = HandleMessages(&ml, (status == kOK), true);
        }
        if (status == kOK && HasLivePorts()) {
          handle_messages = CheckIfIdleLocked(&ml);
        }
      }
    }
    ...
  }

  ...
}

消息的处理是在HandleMessages函数中进行的。

[->third_party/dart/runtime/vm/message_handler.cc]

MessageHandler::MessageStatus MessageHandler::HandleMessages(
    MonitorLocker* ml,
    bool allow_normal_messages,
    bool allow_multiple_normal_messages) {
  ...
  //从队列中获取一个消息,优先OOB消息
  std::unique_ptr<Message> message = DequeueMessage(min_priority);
  //没有消息时退出循环,停止消息的处理
  while (message != nullptr) {
  
    //获取消息的长度
    intptr_t message_len = message->Size();

    ...
    //获取消息级别
    Message::Priority saved_priority = message->priority();
    Dart_Port saved_dest_port = message->dest_port();
    MessageStatus status = kOK;
    {
      DisableIdleTimerScope disable_idle_timer(idle_time_handler);
      //消息的处理
      status = HandleMessage(std::move(message));
    }
    ...
    //如果是已关闭状态,将清除OOB类型消息
    if (status == kShutdown) {
      ClearOOBQueue();
      break;
    }
    ...
    //继续从队列中获取消息
    message = DequeueMessage(min_priority);
  }
  return max_status;
}

HandleMessages函数中会根据消息的优先级别来遍历所有消息并一一处理,直至处理完毕。具体消息处理是在HandleMessage函数中进行的。该函数在其子类IsolateMessageHandler中实现。

[->third_party/dart/runtime/vm/isolate.cc]

MessageHandler::MessageStatus IsolateMessageHandler::HandleMessage(
    std::unique_ptr<Message> message) {
  ...
  //如果是普通消息
  if (!message->IsOOB() && (message->dest_port() != Message::kIllegalPort)) {
    //调用Dart层的_lookupHandler函数,返回该函数在isolate_patch.dart中
    msg_handler = DartLibraryCalls::LookupHandler(message->dest_port());
    ...
  }

  ...

  MessageStatus status = kOK;
  if (message->IsOOB()) {//处理OOB消息
    ...
  } else if (message->dest_port() == Message::kIllegalPort) {//处理OOB消息,主要是处理延迟OOB消息
    ...
  } else {//处理普通消息
    ...
    //调用Dart层的_RawReceivePortImpl对象中的_handleMessage函数,该函数在isolate_patch.dart中
    const Object& result =
        Object::Handle(zone, DartLibraryCalls::HandleMessage(msg_handler, msg));
    if (result.IsError()) {
      status = ProcessUnhandledException(Error::Cast(result));
    } else {
      ...
    }
  }
  return status;
}

在这里先暂时不管OOB消息的处理,来看普通消息的处理。

  1. 首先调用Dart SDK中_RawReceivePortImpl对象的_lookupHandler函数,返回一个在创建_RawReceivePortImpl对象时注册的一个自定义函数。
  2. 调用Dart SDK中_RawReceivePortImpl对象的_handleMessage函数并传入1中返回的自定义函数,通过该自定义函数将消息分发出去。

至此,一个Isolate就已经成功的向另外一个Isolate成功发送并接收消息。而双向通信也很简单,在父Isolate中创建一个端口,并在创建子Isolate时,将这个端口传递给子Isolate。然后在子Isolate调用其入口函数时也创建一个新端口,并通过父Isolate传递过来的端口把子Isolate创建的端口传递给父Isolate,这样父Isolate与子Isolate分别拥有对方的一个端口号,从而实现了通信。具体代码[见小节1.2]。

4、总结

通过上面的内容就可以对isolate的的创建、运行及通信的实现有了一个基本的了解,当然isolate也不仅仅只有上述的一些东西。但由于篇幅限制,其他内容(堆的内存分配、对象的垃圾回收等),后面再来一一分析。

【参考资料】

Glossary of VM Terms

Dart asynchronous programming: Isolates and event loops

Introduction to Dart VM