Because Dart uses a single-threaded execution model, it avoids many of the race-condition problems that occur in multithreaded environments. However, the single-threaded model has a serious drawback: a computationally intensive task blocks everything else running on the same thread, which causes undesirable effects such as UI stalls. Such tasks therefore need a new thread, or something thread-like, on which to run asynchronously. Since Dart code cannot create threads directly, Dart provides the Isolate for this purpose.

When I first learned about Isolate, I thought it was just something like a thread. As I studied further, however, I found that an Isolate is much more complex than a thread and can even be regarded as something close to a process. Because of this complexity, this article only analyzes how an Isolate is used, how it is created, and how isolates communicate.

1. Use of Isolate

In Dart, Isolate.spawn and Isolate.spawnUri are used to create an isolate, and ReceivePort is used for communication between isolates. Let’s see how to use Isolate.

1.1. One-way communication between isolates

Let’s look at one-way communication between isolates first. The code is as follows.

import 'dart:isolate';

// Called in the parent isolate
Isolate? isolate;
start() async {
  ReceivePort receivePort = ReceivePort();
  // Create the child isolate
  isolate = await Isolate.spawn(getMsg, receivePort.sendPort);
  // Listen for data returned from the child isolate
  receivePort.listen((data) {
    print('data: $data');
    receivePort.close();
    // Shut down the child isolate
    isolate?.kill(priority: Isolate.immediate);
    isolate = null;
  });
}
// The entry function of the child isolate, in which time-consuming operations can be performed
void getMsg(SendPort sendPort) => sendPort.send("hello");

Running the code prints the data returned by the newly created isolate, that is, data: hello.

1.2. Two-way communication between isolates

Now let’s look at two-way communication, in which the parent and the child isolate each send messages to the other. The code is as follows.


import 'dart:isolate';

// This function runs in the parent isolate
Future<dynamic> asyncFactoriali(n) async {
  // The ReceivePort on which the parent isolate receives the child's SendPort
  final response = ReceivePort();
  // Create the child isolate
  await Isolate.spawn(_isolate, response.sendPort);
  final sendPort = await response.first as SendPort;
  final answer = ReceivePort();
  // Send data to the child isolate
  sendPort.send([n, answer.sendPort]);
  return answer.first;
}

// The entry function of the child isolate, where time-consuming operations can be performed
// _isolate must be a top-level function (outside any class) or a static function (inside a class)
_isolate(SendPort initialReplyTo) async {
  // The ReceivePort that belongs to the child isolate
  final port = ReceivePort();
  initialReplyTo.send(port.sendPort);
  final message = await port.first as List;
  final data = message[0] as int;
  final send = message[1] as SendPort;
  // Return data to the parent isolate
  send.send(syncFactorial(data));
}

// Not shown in the original listing; assumed here to be a plain recursive factorial
int syncFactorial(int n) => n < 2 ? 1 : n * syncFactorial(n - 1);

// Run the code
start() async {
  print("The calculation results are as follows:${await asyncFactoriali(4)}");
}

void main() => start();

The value is computed in the newly created isolate and the result is returned to the parent isolate, which prints it.
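
The port handshake used by asyncFactoriali can be traced step by step. The following is a minimal single-threaded C++ sketch of that message flow, not Dart's isolate API: Msg, Mailboxes, ChildAnnounce, ChildServe, and AsyncFactorialSketch are hypothetical names, ports are plain integers, and the "child" steps run inline on the same thread purely to show the order of the messages.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <map>

// A message carries an integer payload and an optional reply port (0 = none).
struct Msg {
  std::int64_t value;
  int reply_port;
};

// Hypothetical mailbox table: port number -> pending messages.
class Mailboxes {
 public:
  int Open() {
    int port = next_port_++;
    boxes_.emplace(port, std::deque<Msg>{});
    return port;
  }
  void Send(int port, Msg message) { boxes_.at(port).push_back(message); }
  Msg Take(int port) {
    std::deque<Msg>& box = boxes_.at(port);
    assert(!box.empty());
    Msg message = box.front();
    box.pop_front();
    return message;
  }

 private:
  int next_port_ = 1;
  std::map<int, std::deque<Msg>> boxes_;
};

std::int64_t Factorial(std::int64_t n) {
  return n <= 1 ? 1 : n * Factorial(n - 1);
}

// Child, step 1: open its own port and announce it on the port it was given.
int ChildAnnounce(Mailboxes* mail, int initial_reply_to) {
  int child_port = mail->Open();
  mail->Send(initial_reply_to, Msg{child_port, 0});
  return child_port;
}

// Child, step 2: read one request and answer on the reply port it contains.
void ChildServe(Mailboxes* mail, int child_port) {
  Msg request = mail->Take(child_port);
  mail->Send(request.reply_port, Msg{Factorial(request.value), 0});
}

// Parent side; this driver also triggers the child steps for the sketch.
std::int64_t AsyncFactorialSketch(Mailboxes* mail, std::int64_t n) {
  int response = mail->Open();                      // like `response`
  int child_port = ChildAnnounce(mail, response);   // like Isolate.spawn
  int send_port = static_cast<int>(mail->Take(response).value);
  int answer = mail->Open();                        // like `answer`
  mail->Send(send_port, Msg{n, answer});            // like sendPort.send([...])
  ChildServe(mail, child_port);
  return mail->Take(answer).value;                  // like answer.first
}
```

Calling AsyncFactorialSketch with a Mailboxes instance and the value 4 walks through the same four messages as the Dart listing: the child's port, the request, and the reply.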

With the above code, we were able to perform asynchronous tasks using Isolate. Let’s see how it works.

2. Creation and running of an Isolate

Take a look at the sequence diagram below to see how an isolate is created, initialized, and run.

2.1 Creation of an Isolate

Let’s start with the creation of the isolate. In the example above, Isolate.spawn is used to create an Isolate object.

class Isolate {
  // Declare an external implementation
  external static Future<Isolate> spawn<T>(
      void entryPoint(T message), T message,
      {bool paused: false,
      bool errorsAreFatal,
      SendPort onExit,
      SendPort onError,
      @Since("2.3") String debugName});
}

The external keyword means that spawn is only declared here and that its implementation is provided elsewhere. In the Dart SDK, the concrete implementation is in isolate_patch.dart. Let’s take a look at it.

@patch
class Isolate {
  @patch
  static Future<Isolate> spawn<T>(void entryPoint(T message), T message,
      {bool paused: false,
      bool errorsAreFatal,
      SendPort onExit,
      SendPort onError,
      String debugName}) async {
    // `paused` isn't handled yet.
    RawReceivePort readyPort;
    try {
      // This function is executed asynchronously
      _spawnFunction(
          readyPort.sendPort,
          script.toString(),
          entryPoint,
          message,
          paused,
          errorsAreFatal,
          onExit,
          onError,
          null,
          packageConfig,
          debugName);
      return await _spawnCommon(readyPort);
    } catch (e, st) {
      ...
    }
  }

  static Future<Isolate> _spawnCommon(RawReceivePort readyPort) {
    Completer completer = new Completer<Isolate>.sync();
    // Wait for the child isolate to report that it has been created
    readyPort.handler = (readyMessage) {
      // Close the port
      readyPort.close();
      if (readyMessage is List && readyMessage.length == 2) {
        // The child isolate was created successfully
        SendPort controlPort = readyMessage[0];
        List capabilities = readyMessage[1];
        completer.complete(new Isolate(controlPort,
            pauseCapability: capabilities[0],
            terminateCapability: capabilities[1]));
      } else if (readyMessage is String) {
        ...
      } else {
        ...
      }
    };
    return completer.future;
  }

  ...

  // Calls Isolate_spawnFunction in the virtual machine
  static void _spawnFunction(
      SendPort readyPort,
      String uri,
      Function topLevelFunction,
      var message,
      bool paused,
      bool errorsAreFatal,
      SendPort onExit,
      SendPort onError,
      String packageRoot,
      String packageConfig,
      String debugName) native "Isolate_spawnFunction";

  ...
}

_spawnFunction calls Isolate_spawnFunction in the Dart VM, which hands the creation of the isolate over to a thread pool, so the creation is asynchronous. This thread pool is created when the Dart VM is initialized.

[->third_party/dart/runtime/lib/isolate.cc]

DEFINE_NATIVE_ENTRY(Isolate_spawnFunction, 0, 11) {
  ...
  if (closure.IsClosure()) {
    ...
      // thread_pool is the thread pool that was created when the Dart VM was created
      Dart::thread_pool()->Run<SpawnIsolateTask>(isolate, std::move(state));
      return Object::null();
    }
  }
  ...
  return Object::null();
}

SpawnIsolateTask is a C++ class derived from ThreadPool::Task, comparable to a Java class that implements the Runnable interface. It is used to create and run the child isolate.
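
The task-object pattern mentioned here, in which work is wrapped in an object with a Run method and handed to a pool for later execution, can be sketched roughly as follows. This is a simplified illustration, not the Dart VM's ThreadPool: Task, TaskQueue, and SpawnTask are hypothetical names, and the queue runs its tasks on the calling thread when Drain is called, whereas a real pool would use worker threads.

```cpp
#include <cassert>
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical task interface, modelled on the shape of ThreadPool::Task.
class Task {
 public:
  virtual ~Task() = default;
  virtual void Run() = 0;
};

// Hypothetical stand-in for a thread pool. Tasks are only queued by Post();
// they execute later, when Drain() is called.
class TaskQueue {
 public:
  // Constructs a task of type T from the given arguments and queues it.
  template <typename T, typename... Args>
  void Post(Args&&... args) {
    tasks_.push(std::make_unique<T>(std::forward<Args>(args)...));
  }

  // Runs every queued task in FIFO order and returns how many were run.
  int Drain() {
    int count = 0;
    while (!tasks_.empty()) {
      std::unique_ptr<Task> task = std::move(tasks_.front());
      tasks_.pop();
      task->Run();
      ++count;
    }
    return count;
  }

 private:
  std::queue<std::unique_ptr<Task>> tasks_;
};

// Example task: records that a (pretend) isolate with the given name started.
class SpawnTask : public Task {
 public:
  SpawnTask(std::string name, std::vector<std::string>* log)
      : name_(std::move(name)), log_(log) {
    assert(log_ != nullptr);
  }
  void Run() override { log_->push_back("spawned " + name_); }

 private:
  std::string name_;
  std::vector<std::string>* log_;
};
```

Posting a SpawnTask returns immediately and leaves the log empty; the entry appears only after Drain runs, which mirrors why the caller of a spawn-style API has to wait for a notification rather than a return value.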

[->third_party/dart/runtime/lib/isolate.cc]

// Executed on a thread-pool thread
class SpawnIsolateTask : public ThreadPool::Task {
  void Run() override {

    auto group = state_->isolate_group();

    // create_group_callback is set when the Dart VM is created
    Dart_IsolateGroupCreateCallback create_group_callback =
        Isolate::CreateGroupCallback();
    ...
    // initialize_callback (OnIsolateInitialize) is set during Dart VM initialization
    Dart_InitializeIsolateCallback initialize_callback =
        Isolate::InitializeCallback();
    ...
    char* error = nullptr;
    Isolate* isolate = nullptr;
    // group and initialize_callback are set when the virtual machine is initialized
    if (!FLAG_enable_isolate_groups || group == nullptr ||
        initialize_callback == nullptr) {
      Dart_IsolateFlags api_flags = *(state_->isolate_flags());
      // Create a new isolate
      isolate = reinterpret_cast<Isolate*>((create_group_callback)(
          state_->script_url(), name, nullptr, state_->package_config(),
          &api_flags, parent_isolate_->init_callback_data(), &error));
      parent_isolate_->DecrementSpawnCount();
      parent_isolate_ = nullptr;
    } else {
      ...
    }
    ...
    // The isolate is runnable
    // This state is set in OnIsolateInitialize
    if (isolate->is_runnable()) {
      // Run the isolate [see section 2.3]
      isolate->Run();
    }
  }
};

By default, FLAG_enable_isolate_groups is false, so the first branch is taken even though neither group nor initialize_callback is null. The new isolate is therefore created through create_group_callback.

Following the Dart VM startup path in Flutter, create_group_callback and initialize_callback are assigned to Isolate at the end of Dart VM initialization.

[->third_party/dart/runtime/vm/dart.cc]

char* Dart::Init(const uint8_t* vm_isolate_snapshot,
                 const uint8_t* instructions_snapshot,
                 Dart_IsolateGroupCreateCallback create_group,
                 Dart_InitializeIsolateCallback initialize_isolate,
                 Dart_IsolateShutdownCallback shutdown,
                 Dart_IsolateCleanupCallback cleanup,
                 Dart_IsolateGroupCleanupCallback cleanup_group,
                 Dart_ThreadExitCallback thread_exit,
                 Dart_FileOpenCallback file_open,
                 Dart_FileReadCallback file_read,
                 Dart_FileWriteCallback file_write,
                 Dart_FileCloseCallback file_close,
                 Dart_EntropySource entropy_source,
                 Dart_GetVMServiceAssetsArchive get_service_assets,
                 bool start_kernel_isolate,
                 Dart_CodeObserver* observer) {
  ...
  Isolate::SetCreateGroupCallback(create_group);
  Isolate::SetInitializeCallback_(initialize_isolate);
  ...
}

The implementation of create_group_callback is passed in when Dart::Init is called. Following the call chain, Dart::Init is reached through Dart_Initialize, which is called when the DartVM object is constructed.

[->flutter/runtime/dart_vm.cc]

DartVM::DartVM(std::shared_ptr<const DartVMData> vm_data,
               std::shared_ptr<IsolateNameServer> isolate_name_server)
    : settings_(vm_data->GetSettings()),
      concurrent_message_loop_(fml::ConcurrentMessageLoop::Create()),
      skia_concurrent_executor_(
          [runner = concurrent_message_loop_->GetTaskRunner()](
              fml::closure work) { runner->PostTask(work); }),
      vm_data_(vm_data),
      isolate_name_server_(std::move(isolate_name_server)),
      service_protocol_(std::make_shared<ServiceProtocol>()) {
  ...
  {
    Dart_InitializeParams params = {};
    ...
    // The function that implements create_group_callback
    params.create_group = reinterpret_cast<decltype(params.create_group)>(
        DartIsolate::DartIsolateGroupCreateCallback);
    // The function that implements initialize_isolate
    params.initialize_isolate =
        reinterpret_cast<decltype(params.initialize_isolate)>(
            DartIsolate::DartIsolateInitializeCallback);
    ...
    char* init_error = Dart_Initialize(&params);
    ...
  }
  ...
}

Let’s look at the implementation of DartIsolateGroupCreateCallback.

[->flutter/runtime/dart_isolate.cc]

Dart_Isolate DartIsolate::DartIsolateGroupCreateCallback(
    const char* advisory_script_uri,
    const char* advisory_script_entrypoint,
    const char* package_root,
    const char* package_config,
    Dart_IsolateFlags* flags,
    std::shared_ptr<DartIsolate>* parent_isolate_data,
    char** error) {
  ...
  // Create a DartIsolate object that is in the Uninitialized state
  auto isolate_data = std::make_unique<std::shared_ptr<DartIsolate>>(
      std::shared_ptr<DartIsolate>(new DartIsolate(
          (*isolate_group_data)->GetSettings(),  // settings
          null_task_runners,                     // task_runners
          fml::WeakPtr<SnapshotDelegate>{},      // snapshot_delegate
          fml::WeakPtr<IOManager>{},             // io_manager
          fml::RefPtr<SkiaUnrefQueue>{},         // unref_queue
          fml::WeakPtr<ImageDecoder>{},          // image_decoder
          advisory_script_uri,                   // advisory_script_uri
          advisory_script_entrypoint,            // advisory_script_entrypoint
          false))); 
  // Create the isolate
  Dart_Isolate vm_isolate = CreateDartIsolateGroup(
      std::move(isolate_group_data), std::move(isolate_data), flags, error);
  ...
  return vm_isolate;
}


Dart_Isolate DartIsolate::CreateDartIsolateGroup(
    std::unique_ptr<std::shared_ptr<DartIsolateGroupData>> isolate_group_data,
    std::unique_ptr<std::shared_ptr<DartIsolate>> isolate_data,
    Dart_IsolateFlags* flags,
    char** error) {

  // Create the isolate
  Dart_Isolate isolate = Dart_CreateIsolateGroup(
      (*isolate_group_data)->GetAdvisoryScriptURI().c_str(),
      (*isolate_group_data)->GetAdvisoryScriptEntrypoint().c_str(),
      (*isolate_group_data)->GetIsolateSnapshot()->GetDataMapping(),
      (*isolate_group_data)->GetIsolateSnapshot()->GetInstructionsMapping(),
      flags, isolate_group_data.get(), isolate_data.get(), error);

  if (isolate == nullptr) {
    return nullptr;
  }

  // Transfer ownership of the isolate data to the Dart VM
  std::shared_ptr<DartIsolate> embedder_isolate(*isolate_data);
  isolate_group_data.release();
  isolate_data.release();

  // Initialize the isolate
  if (!InitializeIsolate(std::move(embedder_isolate), isolate, error)) {
    return nullptr;
  }

  return isolate;
}

DartIsolateGroupCreateCallback calls CreateDartIsolateGroup, which in turn calls the Dart VM's Dart_CreateIsolateGroup function to create the isolate. The code is as follows.

[->third_party/dart/runtime/vm/dart_api_impl.cc]

DART_EXPORT Dart_Isolate
Dart_CreateIsolateGroup(const char* script_uri,
                        const char* name,
                        const uint8_t* snapshot_data,
                        const uint8_t* snapshot_instructions,
                        Dart_IsolateFlags* flags,
                        void* isolate_group_data,
                        void* isolate_data,
                        char** error) {

  Dart_IsolateFlags api_flags;
  if (flags == nullptr) {
    Isolate::FlagsInitialize(&api_flags);
    flags = &api_flags;
  }

  const char* non_null_name = name == nullptr ? "isolate" : name;
  std::unique_ptr<IsolateGroupSource> source(
      new IsolateGroupSource(script_uri, non_null_name, snapshot_data,
                             snapshot_instructions, nullptr, -1, *flags));
  auto group = new IsolateGroup(std::move(source), isolate_group_data);
  IsolateGroup::RegisterIsolateGroup(group);
  // Create a new isolate
  Dart_Isolate isolate =
      CreateIsolate(group, non_null_name, isolate_data, error);
  if (isolate != nullptr) {
    group->set_initial_spawn_successful();
  }
  return isolate;
}

...

static Dart_Isolate CreateIsolate(IsolateGroup* group,
                                  const char* name,
                                  void* isolate_data,
                                  char** error) {

  auto source = group->source();
  Isolate* I = Dart::CreateIsolate(name, source->flags, group);
  ...
  Dart::ShutdownIsolate();
  return reinterpret_cast<Dart_Isolate>(NULL);
}

After this series of calls, Dart::CreateIsolate in dart.cc is reached, which simply creates a new Isolate object through Isolate::InitIsolate.

[->third_party/dart/runtime/vm/dart.cc]

Isolate* Dart::CreateIsolate(const char* name_prefix,
                             const Dart_IsolateFlags& api_flags,
                             IsolateGroup* isolate_group) {
  // Create a new isolate.
  Isolate* isolate =
      Isolate::InitIsolate(name_prefix, isolate_group, api_flags);
  return isolate;
}

[->third_party/dart/runtime/vm/isolate.cc]

// Initialize the Isolate
Isolate* Isolate::InitIsolate(const char* name_prefix,
                              IsolateGroup* isolate_group,
                              const Dart_IsolateFlags& api_flags,
                              bool is_vm_isolate) {
  //1. Create an Isolate object
  Isolate* result = new Isolate(isolate_group, api_flags);
  ...
  //2. Create the heap that belongs to the isolate; objects are allocated and garbage collected in this heap
  Heap::Init(result,
             is_vm_isolate
                 ? 0  // New gen size 0; VM isolate should only allocate in old.
                 : FLAG_new_gen_semi_max_size * MBInWords,  // MBInWords: number of words in 1 MB (128K on 64-bit)
             (is_service_or_kernel_isolate ? kDefaultMaxOldGenHeapSize
                                           : FLAG_old_gen_heap_size) *
                 MBInWords);
  //3. Associate the isolate with the current thread
  if (!Thread::EnterIsolate(result)) {
    // We failed to enter the isolate, it is possible the VM is shutting down,
    // return back a NULL so that CreateIsolate reports back an error.
    if (KernelIsolate::IsKernelIsolate(result)) {
      KernelIsolate::SetKernelIsolate(nullptr);
    }
    if (ServiceIsolate::IsServiceIsolate(result)) {
      ServiceIsolate::SetServiceIsolate(nullptr);
    }
    delete result;
    return nullptr;
  }

  // Setup the isolate message handler.
  //4. Set the message handler of the isolate
  MessageHandler* handler = new IsolateMessageHandler(result);
  result->set_message_handler(handler);

  // Setup the Dart API state.
  //5. Set up the Dart API state
  ApiState* state = new ApiState();
  result->set_api_state(state);

  //6. Set the main port
  result->set_main_port(PortMap::CreatePort(result->message_handler()));

  // Add to isolate list. Shutdown and delete the isolate on failure.
  //7. Add the current isolate to the isolate list (a singly linked list)
  if (!AddIsolateToList(result)) {
    // Failed to add the isolate
    result->LowLevelShutdown();
    // Dissociate the thread from the isolate
    Thread::ExitIsolate();
    // If it is the kernel isolate
    if (KernelIsolate::IsKernelIsolate(result)) {
      KernelIsolate::SetKernelIsolate(nullptr);
    }
    // If it is Service Isolate
    if (ServiceIsolate::IsServiceIsolate(result)) {
      ServiceIsolate::SetServiceIsolate(nullptr);
    }
    // Delete the current Isolate object
    delete result;
    return nullptr;
  }

  return result;
}

The InitIsolate function is important and does the following.

  1. Create the Isolate object.
  2. Create the heap of the Isolate. Each Isolate has exactly one heap, and object allocation, garbage collection, and so on all take place in it.
  3. Associate the Isolate object with a thread, that is, one thread corresponds to one Isolate object.
  4. Set the message handler (IsolateMessageHandler), which processes the messages sent to the Isolate. Through a port, a child Isolate can add messages to the MessageHandler of its parent Isolate, and vice versa. This is how communication between isolates is implemented.
  5. Set the API state.
  6. Set the main port.
  7. Add the current Isolate to the linked list.

When the above operations are complete, an Isolate object has been created.

2.2 DartIsolate initialization

After Dart_CreateIsolateGroup has successfully created the isolate inside CreateDartIsolateGroup, the DartIsolate object still has to be initialized. This is done in the InitializeIsolate function.

[->flutter/runtime/dart_isolate.cc]

bool DartIsolate::InitializeIsolate(
    std::shared_ptr<DartIsolate> embedder_isolate,
    Dart_Isolate isolate,
    char** error) {
  // Initialize the isolate; the DartIsolate object is now in the Initialized state
  if (!embedder_isolate->Initialize(isolate)) {
    return false;
  }

  // Load the libraries; the DartIsolate object is now in the LibrariesSetup state
  if (!embedder_isolate->LoadLibraries()) {
    return false;
  }

  // If this is not the root isolate, move the DartIsolate object to the Ready state. The isolate can run only once the DartIsolate object is Ready.
  if (!embedder_isolate->IsRootIsolate()) {
    auto child_isolate_preparer =
        embedder_isolate->GetIsolateGroupData().GetChildIsolatePreparer();
    if (!child_isolate_preparer(embedder_isolate.get())) {
      return false;
    }
  }
  return true;
}

After initialization, the DartIsolate object has moved from the Uninitialized state to the Ready state, and the isolate is runnable.
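
The progression through the states named above (Uninitialized, Initialized, LibrariesSetup, Ready) behaves like a small state machine in which each step is only valid from the previous state. The following is a rough sketch of that idea only: Phase and IsolatePhaseSketch are hypothetical names, the method MarkReady is invented for the illustration, and the real DartIsolate class tracks more than these four states.

```cpp
#include <cassert>

// The four states mentioned in the text, in the order they are reached.
enum class Phase { Uninitialized, Initialized, LibrariesSetup, Ready };

// Hypothetical model of the phase checks: every step succeeds only when the
// object is in the state that the step expects, otherwise nothing changes.
class IsolatePhaseSketch {
 public:
  Phase phase() const { return phase_; }

  bool Initialize() {
    return Advance(Phase::Uninitialized, Phase::Initialized);
  }
  bool LoadLibraries() {
    return Advance(Phase::Initialized, Phase::LibrariesSetup);
  }
  bool MarkReady() {
    return Advance(Phase::LibrariesSetup, Phase::Ready);
  }

  // Only a Ready object may be run.
  bool IsRunnable() const { return phase_ == Phase::Ready; }

 private:
  bool Advance(Phase expected, Phase next) {
    if (phase_ != expected) {
      return false;  // Out-of-order call: refuse and keep the current state.
    }
    phase_ = next;
    return true;
  }

  Phase phase_ = Phase::Uninitialized;
};
```

With this shape, calling LoadLibraries before Initialize simply fails, which is the same early-return behaviour that InitializeIsolate relies on when it checks each step's result.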

2.3. Running the Isolate

Once the isolate is runnable, the next step is to run it. In the SpawnIsolateTask class, this is done by calling the isolate's Run function, which is implemented as follows.

[->third_party/dart/runtime/vm/isolate.cc]

void Isolate::Run() {
  // The first message added to the message handler
  // Remember the RunIsolate function, as discussed later
  message_handler()->Run(Dart::thread_pool(), RunIsolate, ShutdownIsolate,
                         reinterpret_cast<uword>(this));
}

[->third_party/dart/runtime/vm/message_handler.cc]

void MessageHandler::Run(ThreadPool* pool,
                         StartCallback start_callback,
                         EndCallback end_callback,
                         CallbackData data) {
  MonitorLocker ml(&monitor_);
  pool_ = pool;
  start_callback_ = start_callback;
  end_callback_ = end_callback;
  callback_data_ = data;
  task_running_ = true;
  // Execute tasks in the thread pool
  const bool launched_successfully = pool_->Run<MessageHandlerTask>(this);
}

Execution then continues asynchronously on the thread pool, this time on behalf of the child isolate. The MessageHandlerTask runs the TaskCallback function.

[->third_party/dart/runtime/vm/message_handler.cc]

void MessageHandler::TaskCallback() {
  MessageStatus status = kOK;
  bool run_end_callback = false;
  bool delete_me = false;
  EndCallback end_callback = NULL;
  CallbackData callback_data = 0;
  {
    ...
    if (status == kOK) {
      // start_callback_ is non-null only the first time the child isolate runs
      if (start_callback_ != nullptr) {
        ml.Exit();
        // Call the isolate's start function (other threads may run concurrently while the monitor is released)
        status = start_callback_(callback_data_);
        ASSERT(Isolate::Current() == NULL);
        start_callback_ = NULL;
        ml.Enter();
      }
      ...
    }
    ...
  }
  ...
}

Leaving message processing aside for now [see section 3], let’s focus on start_callback_, which corresponds to the RunIsolate function.
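
The run-once behaviour of start_callback_ can be illustrated with a small model. This is a sketch under simplifying assumptions rather than the VM's MessageHandler: HandlerSketch and its members are hypothetical, there is no monitor or thread pool, and messages are plain strings that are "processed" by appending them to a vector.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, single-threaded model of a message handler whose start
// callback runs on the first pass only.
class HandlerSketch {
 public:
  using StartCallback = std::function<bool()>;

  // Stores the start callback; nothing is executed yet.
  void Run(StartCallback start) { start_callback_ = std::move(start); }

  // Queues a message for a later pass.
  void Post(std::string message) { queue_.push_back(std::move(message)); }

  // One scheduled pass: run the start callback if it is still set, clear it,
  // then handle all queued messages in arrival order.
  void TaskCallback(std::vector<std::string>* processed) {
    assert(processed != nullptr);
    if (start_callback_) {
      StartCallback start = std::move(start_callback_);
      start_callback_ = nullptr;  // Ensures later passes skip this branch.
      if (!start()) {
        return;  // Start failed: leave the queued messages untouched.
      }
    }
    while (!queue_.empty()) {
      processed->push_back(std::move(queue_.front()));
      queue_.pop_front();
    }
  }

 private:
  StartCallback start_callback_;
  std::deque<std::string> queue_;
};
```

Running TaskCallback twice on the same handler invokes the start callback once and processes the messages from both passes, which is the property the comment in the listing above describes.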

[->third_party/dart/runtime/vm/isolate.cc]

// Run the isolate
static MessageHandler::MessageStatus RunIsolate(uword parameter) {
  ...
  {
    ...
    // args is the set of arguments required to call the Dart-level _startIsolate function
    const Array& args = Array::Handle(Array::New(7));
    args.SetAt(0, SendPort::Handle(SendPort::New(state->parent_port())));
    args.SetAt(1, Instance::Handle(func.ImplicitStaticClosure()));
    args.SetAt(2, Instance::Handle(state->BuildArgs(thread)));
    args.SetAt(3, Instance::Handle(state->BuildMessage(thread)));
    args.SetAt(4, is_spawn_uri ? Bool::True() : Bool::False());
    args.SetAt(5, ReceivePort::Handle(ReceivePort::New(
                      isolate->main_port(), true /* control port */)));
    args.SetAt(6, capabilities);

    // Call the _startIsolate function of the Dart layer, which is in the isolate_patch.dart file
    const Library& lib = Library::Handle(Library::IsolateLibrary());
    const String& entry_name = String::Handle(String::New("_startIsolate"));
    const Function& entry_point =
        Function::Handle(lib.LookupLocalFunction(entry_name));
    ASSERT(entry_point.IsFunction() && !entry_point.IsNull());
    result = DartEntry::InvokeFunction(entry_point, args);
    if (result.IsError()) {
      return StoreError(thread, Error::Cast(result));
    }
  }
  return MessageHandler::kOK;
}

Within RunIsolate, the _startIsolate function in isolate_patch.dart is called, and it in turn calls the entry function that was passed in when the Isolate object was created.

@pragma("vm:entry-point", "call")
void _startIsolate(
    SendPort parentPort,
    Function entryPoint,
    List<String> args,
    var message,
    bool isSpawnUri,
    RawReceivePort controlPort,
    List capabilities) {
  // The control port (aka the main isolate port) does not handle any messages.
  if (controlPort != null) {
    controlPort.handler = (_) {}; // Nobody home on the control port.
  }

  if (parentPort != null) {
    // Build a message to our parent isolate providing access to the
    // current isolate's control port and capabilities.
    //
    // TODO(floitsch): Send an error message if we can't find the entry point.
    var readyMessage = new List(2);
    readyMessage[0] = controlPort.sendPort;
    readyMessage[1] = capabilities;

    // Out of an excess of paranoia we clear the capabilities from the
    // stack. Not really necessary.
    capabilities = null;
    // Tell the parent that the current 'Isolate' has been created successfully
    parentPort.send(readyMessage);
  }

  // Delay all user code handling to the next run of the message loop. This
  // allows us to intercept certain conditions in the event dispatch, such as
  // starting in paused state.
  RawReceivePort port = new RawReceivePort();
  port.handler = (_) {
    port.close();

    if (isSpawnUri) {
      if (entryPoint is _BinaryFunction) {
        (entryPoint as dynamic)(args, message);
      } else if (entryPoint is _UnaryFunction) {
        (entryPoint as dynamic)(args);
      } else {
        entryPoint();
      }
    } else {
      // Call the entry function
      entryPoint(message);
    }
  };
  // Make sure the message handler is triggered.
  port.sendPort.send(null);
}

The _startIsolate function does the following things.

  1. Tell the parent Isolate that the child Isolate has been created successfully.
  2. Call the entry function of the child Isolate.

At this point, a new Isolate has been created. During creation, the Dart SDK calls into the virtual machine, and the virtual machine then calls the entry function asynchronously in the new Isolate.

Note: the entry function of the main Isolate is the familiar main function.
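
The two things _startIsolate does, notifying the parent first and then deferring the entry function to the next turn of the message loop, can be modelled as below. This is only a sketch of the ordering: LoopSketch and StartIsolateSketch are hypothetical names, the parent's inbox is a plain vector of strings, and the loop runs on a single thread.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

using Event = std::function<void()>;

// Hypothetical event loop: events run in the order they were posted.
struct LoopSketch {
  std::deque<Event> events;

  void Post(Event event) { events.push_back(std::move(event)); }

  void RunUntilIdle() {
    while (!events.empty()) {
      Event event = std::move(events.front());
      events.pop_front();
      event();  // May post further events; they run in a later iteration.
    }
  }
};

// Hypothetical analogue of _startIsolate.
void StartIsolateSketch(LoopSketch* child_loop,
                        std::vector<std::string>* parent_inbox,
                        Event entry_point) {
  assert(child_loop != nullptr && parent_inbox != nullptr);
  // 1. Tell the parent that the child has been created.
  parent_inbox->push_back("ready");
  // 2. Do not call the entry point directly. Post it to the child's own loop
  //    so that it runs on the next turn, like port.sendPort.send(null).
  child_loop->Post(std::move(entry_point));
}
```

After StartIsolateSketch returns, the parent's inbox already holds the ready message while the entry point has not yet run; it runs only once RunUntilIdle processes the posted event.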

3. Communication between isolates

The previous section showed how Dart creates a new Isolate object, but it left out a lot, such as how a child isolate communicates with its parent, that is, how isolates communicate with each other.

3.1 ReceivePort and SendPort

Before looking at how one Isolate sends messages to another, you need to be familiar with ReceivePort and SendPort. Here’s the code.

abstract class ReceivePort implements Stream {
  // Declare an external implementation
  external factory ReceivePort();
}

// In isolate_patch.dart
@patch
class ReceivePort {
  @patch
  factory ReceivePort() => new _ReceivePortImpl();

  @patch
  factory ReceivePort.fromRawReceivePort(RawReceivePort rawPort) {
    return new _ReceivePortImpl.fromRawReceivePort(rawPort);
  }
}

class _ReceivePortImpl extends Stream implements ReceivePort {
  _ReceivePortImpl() : this.fromRawReceivePort(new RawReceivePort());

  _ReceivePortImpl.fromRawReceivePort(this._rawPort) {
    _controller = new StreamController(onCancel: close, sync: true);
    _rawPort.handler = _controller.add;
  }
  // Return a SendPort object
  SendPort get sendPort {
    return _rawPort.sendPort;
  }
  // Listen for incoming messages
  StreamSubscription listen(void onData(var message),
      {Function onError, void onDone(), bool cancelOnError}) {
    return _controller.stream.listen(onData,
        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
  }
  ...
}

@patch
class RawReceivePort {
  @patch
  factory RawReceivePort([Function handler]) {
    _RawReceivePortImpl result = new _RawReceivePortImpl();
    result.handler = handler;
    return result;
  }
}

@pragma("vm:entry-point")
class _RawReceivePortImpl implements RawReceivePort {
  factory _RawReceivePortImpl() native "RawReceivePortImpl_factory";
  ...
  SendPort get sendPort {
    return _get_sendport();
  }
  ...
  /**** Internal implementation details ****/
  _get_id() native "RawReceivePortImpl_get_id";
  _get_sendport() native "RawReceivePortImpl_get_sendport";
  ...
}

As the code shows, a ReceivePort object wraps a RawReceivePort object and exposes a SendPort object. The RawReceivePort object is created in the VM and corresponds to the VM's ReceivePort class. Here’s the code.

[->third_party/dart/runtime/lib/isolate.cc]

DEFINE_NATIVE_ENTRY(RawReceivePortImpl_factory, 0, 1) {
  ASSERT(
      TypeArguments::CheckedHandle(zone, arguments->NativeArgAt(0)).IsNull());
  // Create an Entry object and return a port number.
  Dart_Port port_id = PortMap::CreatePort(isolate->message_handler());
  // Create a ReceivePort object
  return ReceivePort::New(port_id, false /* not control port */);
}

Before the ReceivePort object is created, the MessageHandler of the current isolate is registered in a map. This map is global and is created during Dart VM initialization. Each element is an Entry object that holds a MessageHandler, a port number, and the state of that port.

  typedef struct {
    // Port number
    Dart_Port port;
    // Message handler
    MessageHandler* handler;
    // Port number status
    PortState state;
  } Entry;
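
The role of such a port-to-handler table can be sketched as follows: because a port number leads to a handler, anyone who knows the number can deliver a message to that handler's queue. This is a simplified illustration that assumes a standard hash map instead of the VM's own table; HandlerQueue, PortRegistrySketch, and ClosePort are hypothetical names, and port states are left out.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for a message handler: just a queue of messages.
struct HandlerQueue {
  std::deque<std::string> messages;
};

// Hypothetical registry that maps port numbers to handlers.
class PortRegistrySketch {
 public:
  // Registers a handler and returns the newly allocated port number.
  std::int64_t CreatePort(HandlerQueue* handler) {
    assert(handler != nullptr);
    std::int64_t port = next_port_++;
    handlers_[port] = handler;
    return port;
  }

  // Delivers a message to the handler that owns the port.
  // Returns false when the port is unknown or has been closed.
  bool PostMessage(std::int64_t port, std::string message) {
    auto it = handlers_.find(port);
    if (it == handlers_.end()) {
      return false;
    }
    it->second->messages.push_back(std::move(message));
    return true;
  }

  void ClosePort(std::int64_t port) { handlers_.erase(port); }

 private:
  std::int64_t next_port_ = 1;
  std::unordered_map<std::int64_t, HandlerQueue*> handlers_;
};
```

Two handlers registered in the same registry can exchange messages simply by posting to each other's port numbers, and posting to a closed port is reported as a failure instead of reaching a stale handler.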

[->third_party/dart/runtime/vm/port.cc]

Dart_Port PortMap::CreatePort(MessageHandler* handler) {
  ...
  Entry entry;
  // Assign a port number
  entry.port = AllocatePort();
  // Set the message handler
  entry.handler = handler;
  // Port number status
  entry.state = kNewPort;
  // Find the current entry location
  intptr_t index = entry.port % capacity_;
  Entry cur = map_[index];
  // Stop the search at the first found unused (free or deleted) slot.
  // That is, find the first Entry that is free or has been deleted.
  while (cur.port != 0) {
    index = (index + 1) % capacity_;
    cur = map_[index];
  }

  if (map_[index].handler == deleted_entry_) {
    // Consuming a deleted entry.
    deleted_--;
  }
  // Insert into map
  map_[index] = entry;

  // Increment number of used slots and grow if necessary.
  used_++;
  // Check whether the capacity needs to be expanded
  MaintainInvariants();
  ...
  // Return the port number
  return entry.port;
}

Note: The initial capacity of the map is 8. When it is 3/4 full it is expanded, and the new capacity is twice the old one. For those familiar with Java, this resembles HashMap: an initial capacity of 8, a load factor of 0.75, and a capacity that doubles on each expansion.
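
The probing and growth policy described in this note can be tried out with a small open-addressing table. The sketch below makes several assumptions of its own: PortTableSketch is a hypothetical name, it stores only port numbers (no handler or state), it does not support deletion, and it grows once more than 3/4 of the slots are used, which is one reading of the threshold; the exact condition in the VM's MaintainInvariants may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical open-addressing table with linear probing.
// A slot value of 0 means "empty", so 0 is not a valid port here.
class PortTableSketch {
 public:
  PortTableSketch() : slots_(8, 0) {}  // Initial capacity of 8.

  // Inserts a positive port number, then doubles the capacity if more than
  // 3/4 of the slots are in use.
  void Insert(std::int64_t port) {
    assert(port > 0);
    Place(&slots_, port);
    ++used_;
    if (used_ * 4 > slots_.size() * 3) {
      Grow();
    }
  }

  bool Contains(std::int64_t port) const {
    std::size_t index = IndexFor(port, slots_.size());
    // The table always keeps at least one empty slot, so this terminates.
    while (slots_[index] != 0) {
      if (slots_[index] == port) {
        return true;
      }
      index = (index + 1) % slots_.size();
    }
    return false;
  }

  std::size_t capacity() const { return slots_.size(); }

 private:
  static std::size_t IndexFor(std::int64_t port, std::size_t capacity) {
    return static_cast<std::size_t>(port) % capacity;
  }

  // Starts at port % capacity and walks forward to the first empty slot.
  static void Place(std::vector<std::int64_t>* slots, std::int64_t port) {
    std::size_t index = IndexFor(port, slots->size());
    while ((*slots)[index] != 0) {
      index = (index + 1) % slots->size();
    }
    (*slots)[index] = port;
  }

  // Doubles the capacity and re-inserts every stored port.
  void Grow() {
    std::vector<std::int64_t> bigger(slots_.size() * 2, 0);
    for (std::int64_t port : slots_) {
      if (port != 0) {
        Place(&bigger, port);
      }
    }
    slots_.swap(bigger);
  }

  std::vector<std::int64_t> slots_;
  std::size_t used_ = 0;
};
```

Under these assumptions, six entries still fit in the initial eight slots, and the seventh insertion triggers the doubling to sixteen.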

Now look at the creation of the ReceivePort object.

[->third_party/dart/runtime/vm/object.cc]

RawReceivePort* ReceivePort::New(Dart_Port id,
                                 bool is_control_port,
                                 Heap::Space space) {
  Thread* thread = Thread::Current();
  Zone* zone = thread->zone();
  const SendPort& send_port =
      // Create SendPort object
      SendPort::Handle(zone, SendPort::New(id, thread->isolate()->origin_id()));

  ReceivePort& result = ReceivePort::Handle(zone);
  { 
    // Create a ReceivePort object
    RawObject* raw = Object::Allocate(ReceivePort::kClassId,//classId
                                      ReceivePort::InstanceSize(),// Object size
                                      space);
    NoSafepointScope no_safepoint;
    result ^= raw;
    result.StorePointer(&result.raw_ptr()->send_port_, send_port.raw());
  }
  if (is_control_port) {
    // Update the status of the port, set to kControlPort
    PortMap::SetPortState(id, PortMap::kControlPort);
  } else {
    // Update the status of the port, set to kLivePort
    PortMap::SetPortState(id, PortMap::kLivePort);
  }
  return result.raw();
}

[->third_party/dart/runtime/vm/object.cc]

RawSendPort* SendPort::New(Dart_Port id,
                           Dart_Port origin_id,
                           Heap::Space space) {
  SendPort& result = SendPort::Handle();
  { 
    // Create SendPort object
    RawObject* raw =
        Object::Allocate(SendPort::kClassId, //classId
                         SendPort::InstanceSize(), // Object size
                         space);
    NoSafepointScope no_safepoint;
    result ^= raw;
    result.StoreNonPointer(&result.raw_ptr()->id_, id);
    result.StoreNonPointer(&result.raw_ptr()->origin_id_, origin_id);
  }
  return result.raw();
}

The classId passed at object creation was registered when the Isolate was initialized, and the VM allocates the corresponding object based on this classId. Here, ReceivePort corresponds to the _RawReceivePortImpl object in the Dart SDK, and SendPort corresponds to the _SendPortImpl object in the Dart SDK.

In other words, creating a ReceivePort object causes the Dart VM to create both a _RawReceivePortImpl object and a _SendPortImpl object.
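The pairing of the two objects can be illustrated with a small model. This is a sketch under stated assumptions rather than the VM's implementation: PortStateTable, ReceivePortModel, SendPortModel, NewReceivePort, the kUnset state, and kIllegalPortId are hypothetical stand-ins, and only the kLivePort and kControlPort state names come from the listing above.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>

using PortId = std::int64_t;  // stand-in for Dart_Port

// Hypothetical sentinel for "no port"; an assumption of this sketch.
constexpr PortId kIllegalPortId = 0;

// kUnset is a placeholder of this sketch for ports with no recorded state.
enum class PortState { kUnset, kLivePort, kControlPort };

// Model of a send port: the port id plus the id of the originating isolate.
struct SendPortModel {
  PortId id;
  PortId origin_id;
};

// Model of a receive port: it owns the send port created alongside it.
struct ReceivePortModel {
  SendPortModel send_port;
};

// Hypothetical stand-in for the port state bookkeeping.
class PortStateTable {
 public:
  void SetState(PortId id, PortState state) { states_[id] = state; }

  PortState GetState(PortId id) const {
    auto it = states_.find(id);
    return it == states_.end() ? PortState::kUnset : it->second;
  }

 private:
  std::unordered_map<PortId, PortState> states_;
};

// Creates the send port first, embeds it in the receive port, then records
// the port state, mirroring the order of steps in ReceivePort::New.
inline ReceivePortModel NewReceivePort(PortStateTable& table, PortId id,
                                       PortId origin_id,
                                       bool is_control_port) {
  assert(id != kIllegalPortId);
  ReceivePortModel result{SendPortModel{id, origin_id}};
  table.SetState(id, is_control_port ? PortState::kControlPort
                                     : PortState::kLivePort);
  return result;
}
```

The point of the model is that the receive port and its send port share one port id, so whoever holds the send port can address messages to that receive port.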

3.2. Inter-isolate Communication

After ReceivePort is created, you can send messages by calling the send function of _SendPortImpl.

@pragma("vm:entry-point")
class _SendPortImpl implements SendPort {
  ...
  /*--- public interface ---*/
  @pragma("vm:entry-point", "call")
  void send(var message) {
    _sendInternal(message);
  }
  ...
  // Forward the implementation of sending messages to the VM.
  void _sendInternal(var message) native "SendPortImpl_sendInternal_";
}

The actual implementation of _sendInternal is in the Dart VM.

[->third_party/dart/runtime/lib/isolate.cc]

DEFINE_NATIVE_ENTRY(SendPortImpl_sendInternal_, 0, 2) {
  ...
  // Port number of the target Isolate
  const Dart_Port destination_port_id = port.Id();
  const bool can_send_any_object = isolate->origin_id() == port.origin_id();

  if (ApiObjectConverter::CanConvert(obj.raw())) {
    // The message is null or is not a heap object
    PortMap::PostMessage(
        Message::New(destination_port_id, obj.raw(), Message::kNormalPriority));
  } else {
    // Create a MessageWriter object -- writer
    MessageWriter writer(can_send_any_object);
    // TODO(turnidge): Throw an exception when the return value is false?
    PortMap::PostMessage(writer.WriteMessage(obj, destination_port_id,
                                             Message::kNormalPriority));
  }
  return Object::null();
}

[->third_party/dart/runtime/vm/port.cc]

bool PortMap::PostMessage(std::unique_ptr<Message> message,
                          bool before_events) {
  MutexLocker ml(mutex_);
  // Find the Entry position in the map based on the destination port number
  intptr_t index = FindPort(message->dest_port());
  if (index < 0) {
    return false;
  }
  // Get the Entry object from the map and fetch the MessageHandler object
  MessageHandler* handler = map_[index].handler;
  // The handler here is the MessageHandler on the target Isolate
  handler->PostMessage(std::move(message), before_events);
  return true;
}

At this point, the message has been added to the MessageHandler of the target Isolate. The message has crossed from one Isolate to the other, but it has not yet been processed.
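The routing step performed by PortMap::PostMessage can be sketched as a lookup from port id to handler under a lock. The types below (Msg, InboxHandler, PortRegistry) are hypothetical simplifications and an assumption of this sketch: the real PortMap stores entries in its own hash table rather than in std::unordered_map.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

using PortId = std::int64_t;  // stand-in for Dart_Port

// Hypothetical message type: a destination port plus a payload.
struct Msg {
  PortId dest_port;
  std::string payload;
};

// Hypothetical per-isolate handler that only queues incoming messages.
class InboxHandler {
 public:
  void Post(std::unique_ptr<Msg> message) {
    queue_.push_back(std::move(message));
  }
  std::size_t pending() const { return queue_.size(); }

 private:
  std::deque<std::unique_ptr<Msg>> queue_;
};

// Hypothetical registry mapping port ids to the handler that owns them.
class PortRegistry {
 public:
  void Register(PortId id, InboxHandler* handler) {
    assert(handler != nullptr);
    std::lock_guard<std::mutex> lock(mutex_);
    map_[id] = handler;
  }

  // Returns false if the destination port is unknown, otherwise hands the
  // message to the handler that owns the port.
  bool Post(std::unique_ptr<Msg> message) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = map_.find(message->dest_port);
    if (it == map_.end()) {
      return false;
    }
    it->second->Post(std::move(message));
    return true;
  }

 private:
  std::mutex mutex_;
  std::unordered_map<PortId, InboxHandler*> map_;
};
```

As in the listing, the sender only needs the destination port id; the registry decides which handler, and therefore which Isolate, receives the message.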

Let’s look at how the Isolate handles messages.

[->third_party/dart/runtime/vm/message_handler.cc]

void MessageHandler::PostMessage(std::unique_ptr<Message> message,
                                 bool before_events) {
  Message::Priority saved_priority;

  {
    MonitorLocker ml(&monitor_);
    ...
    saved_priority = message->priority();
    if (message->IsOOB()) {
      // Joins the queue for OOB messages
      oob_queue_->Enqueue(std::move(message), before_events);
    } else {
      // Joins the normal message queue
      queue_->Enqueue(std::move(message), before_events);
    }
    if (paused_for_messages_) {
      ml.Notify();
    }

    if (pool_ != nullptr && !task_running_) {
      task_running_ = true;
      // Asynchronous processing
      const bool launched_successfully = pool_->Run<MessageHandlerTask>(this);
    }
  }
  // Invoke any custom message notification.
  // If a custom message notification function is defined, it is called here, once the message has been queued
  MessageNotify(saved_priority);
}

PostMessage does the following.

  1. Adds the message to one of two queues based on its level. There are two levels: OOB messages and ordinary messages. OOB messages go into the queue oob_queue_, and ordinary messages go into the queue queue_. OOB messages have a higher level than ordinary messages and are processed first.
  2. Submits a message-processing task, MessageHandlerTask, to the thread pool.

The thread pool here is created when the Dart VM is created and passed to the MessageHandler when the Isolate runs.
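The two steps of PostMessage can be modeled as below. This is a sketch with assumptions: TwoQueueInbox is a hypothetical class, the thread pool is replaced by a counter of scheduled tasks, and TaskFinished is a simplified stand-in for whatever resets the running flag when the real task ends.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical model of the enqueue path: two queues plus a flag that
// ensures at most one processing task is outstanding at a time.
class TwoQueueInbox {
 public:
  enum class Priority { kNormal, kOOB };

  // Enqueues the message by priority. Returns true if this call had to
  // schedule a new processing task, false if one was already running.
  bool Post(std::string payload, Priority priority) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (priority == Priority::kOOB) {
      oob_queue_.push_back(std::move(payload));
    } else {
      queue_.push_back(std::move(payload));
    }
    if (task_running_) {
      return false;
    }
    task_running_ = true;
    ++tasks_scheduled_;  // stands in for handing a task to the thread pool
    return true;
  }

  // Called when the processing task ends (a simplification of this sketch).
  void TaskFinished() {
    std::lock_guard<std::mutex> lock(mutex_);
    assert(task_running_);
    task_running_ = false;
  }

  std::size_t oob_size() const { return oob_queue_.size(); }
  std::size_t normal_size() const { return queue_.size(); }
  std::size_t tasks_scheduled() const { return tasks_scheduled_; }

 private:
  std::mutex mutex_;
  std::deque<std::string> oob_queue_;  // higher-priority messages
  std::deque<std::string> queue_;      // ordinary messages
  bool task_running_ = false;
  std::size_t tasks_scheduled_ = 0;
};
```

The flag is the design point worth noting: several messages posted in quick succession share a single processing task instead of each launching its own.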

The MessageHandlerTask runs the TaskCallback function.

[->third_party/dart/runtime/vm/message_handler.cc]

void MessageHandler::TaskCallback() {
  MessageStatus status = kOK;
  bool run_end_callback = false;
  bool delete_me = false;
  EndCallback end_callback = NULL;
  CallbackData callback_data = 0;
  {
    ...
    if (status == kOK) {
      ...
      bool handle_messages = true;
      while (handle_messages) {
        handle_messages = false;
        // Handle any pending messages for this message handler.
        if (status != kShutdown) {
          // Process the message
          status = HandleMessages(&ml, (status == kOK), true);
        }
        if (status == kOK && HasLivePorts()) {
          handle_messages = CheckIfIdleLocked(&ml);
        }
      }
    }
    ...
  }
  ...
}

The processing of messages takes place in the HandleMessages function.

[->third_party/dart/runtime/vm/message_handler.cc]

MessageHandler::MessageStatus MessageHandler::HandleMessages(
    MonitorLocker* ml,
    bool allow_normal_messages,
    bool allow_multiple_normal_messages) {
  ...
  // Get a message from the queue, preferentially the OOB message
  std::unique_ptr<Message> message = DequeueMessage(min_priority);
  // Exit the loop and stop processing when there are no more messages
  while (message != nullptr) {
    // Get the length of the message
    intptr_t message_len = message->Size();
    ...
    // Get the message level
    Message::Priority saved_priority = message->priority();
    Dart_Port saved_dest_port = message->dest_port();
    MessageStatus status = kOK;
    {
      DisableIdleTimerScope disable_idle_timer(idle_time_handler);
      // Message processing
      status = HandleMessage(std::move(message));
    }
    ...
    // On shutdown, clear the OOB queue and stop
    if (status == kShutdown) {
      ClearOOBQueue();
      break;
    }
    ...
    // Continue to fetch messages from the queue
    message = DequeueMessage(min_priority);
  }
  return max_status;
}

The HandleMessages function loops over the pending messages, dequeuing them by priority level, until none remain. The actual processing of each message happens in the HandleMessage function, which is implemented in the subclass IsolateMessageHandler.
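The shape of this loop can be sketched as follows. It is a simplified model and not the VM code: PendingQueues, DequeueNext, and DrainMessages are hypothetical names, messages are plain strings, and the per-message work is supplied by the caller as a callback.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>

enum class Status { kOK, kShutdown };

// Hypothetical pair of pending queues: OOB first, then normal messages.
struct PendingQueues {
  std::deque<std::string> oob;
  std::deque<std::string> normal;
};

// Pops the next message, preferring the OOB queue. Returns false when both
// queues are empty.
inline bool DequeueNext(PendingQueues& queues, std::string* out) {
  assert(out != nullptr);
  std::deque<std::string>& source =
      !queues.oob.empty() ? queues.oob : queues.normal;
  if (source.empty()) {
    return false;
  }
  *out = std::move(source.front());
  source.pop_front();
  return true;
}

// Processes messages until none remain. If a message reports shutdown,
// the OOB queue is cleared and the loop stops early.
inline Status DrainMessages(
    PendingQueues& queues,
    const std::function<Status(const std::string&)>& handle) {
  Status result = Status::kOK;
  std::string message;
  while (DequeueNext(queues, &message)) {
    Status status = handle(message);
    if (status == Status::kShutdown) {
      queues.oob.clear();
      result = status;
      break;
    }
  }
  return result;
}
```

In this model, a shutdown status leaves any remaining normal messages in their queue unprocessed, which matches the early break in the listing above.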

[->third_party/dart/runtime/vm/isolate.cc]

MessageHandler::MessageStatus IsolateMessageHandler::HandleMessage(
    std::unique_ptr<Message> message) {
  ...
  // If it is a normal message
  if (!message->IsOOB() && (message->dest_port() != Message::kIllegalPort)) {
    // Call the _lookupHandler function of the Dart layer (in isolate_patch.dart), which returns the handler function
    msg_handler = DartLibraryCalls::LookupHandler(message->dest_port());
    ...
  }
  ...
  MessageStatus status = kOK;
  if (message->IsOOB()) {
    // Process OOB messages
    ...
  } else if (message->dest_port() == Message::kIllegalPort) {
    // Process OOB messages, mainly delayed OOB messages
    ...
  } else {
    // Process normal messages
    ...
    // Call the _handleMessage function of the _RawReceivePortImpl object of the Dart layer, which is in isolate_patch.dart
    const Object& result =
        Object::Handle(zone, DartLibraryCalls::HandleMessage(msg_handler, msg));
    if (result.IsError()) {
      status = ProcessUnhandledException(Error::Cast(result));
    } else {
      ...
    }
  }
  return status;
}


Let’s ignore OOB message processing for a moment and look at normal message processing.

  1. First, the _lookupHandler function of the Dart SDK's _RawReceivePortImpl object is called. It returns the custom function that was registered when the _RawReceivePortImpl object was created.
  2. Then the _handleMessage function of the Dart SDK's _RawReceivePortImpl object is called with the custom function returned in step 1, and the message is dispatched through that function.
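The two steps above amount to a lookup followed by a dispatch. A minimal sketch, assuming a hypothetical ReceivePortTable class that keeps one callback per port id (the real logic lives on the Dart side in isolate_patch.dart and is not reproduced here):

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>

using PortId = std::int64_t;  // stand-in for Dart_Port

// Hypothetical table of per-port callbacks.
class ReceivePortTable {
 public:
  using HandlerFn = std::function<void(const std::string&)>;

  // Registers the custom function for a port, as happens when a receive
  // port is created and given a listener.
  void Register(PortId id, HandlerFn fn) {
    assert(static_cast<bool>(fn));
    handlers_[id] = std::move(fn);
  }

  // Step 1: look up the function registered for the destination port.
  // Returns nullptr if no function is registered.
  const HandlerFn* LookupHandler(PortId id) const {
    auto it = handlers_.find(id);
    return it == handlers_.end() ? nullptr : &it->second;
  }

  // Step 2: dispatch the message through the function found in step 1.
  // Returns false if the port has no registered function.
  bool HandleMessage(PortId id, const std::string& message) const {
    const HandlerFn* fn = LookupHandler(id);
    if (fn == nullptr) {
      return false;
    }
    (*fn)(message);
    return true;
  }

 private:
  std::unordered_map<PortId, HandlerFn> handlers_;
};
```

Registering a lambda for a port and then calling HandleMessage with that port id runs the lambda with the message; a message for an unregistered port is reported as undelivered in this sketch.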

At this point, one Isolate has successfully sent a message to another Isolate, and the other has received it. Two-way communication is also easy. The parent Isolate creates a port and passes it to the child Isolate when the child is created. When the child Isolate runs its entry function, it creates a port of its own and sends that port back to the parent through the port it received from the parent. Each Isolate then holds a port belonging to the other, which makes communication in both directions possible. See Section 1.2 for the code.

4. Summary

The sections above give a basic understanding of how an Isolate is created, how it runs, and how it communicates. There is much more to the Isolate than this. Due to space constraints, other topics (heap memory allocation, garbage collection of objects, and so on) will be discussed later.

References

Glossary of VM Terms

Dart asynchronous programming: Isolates and event loops

Introduction to Dart VM