Dart is single-threaded, so how can it be asynchronous? Or why not stick to the thread?

Dart code runs in a single thread of execution. The Flutter engine is not single-threaded

The Flutter engine does not create threads and embedder provides 4 Task runners with references to the Flutter engine:

  • Platform Task Runner
  • UI Task Runner
  • GPU Task Runner
  • IO Task Runner

All Dart code runs in the context of an ISOLATE that has all the memory needed to run the Dart code snippet. So in development, we often encounter some time-consuming operations, such as network requests, file reads, etc., then the thread is bound to block, unable to respond to other times, the UI is stuck, so how to handle time-consuming operations in a single thread?

Typically we use a Future object to represent the result of an asynchronous operation that is being processed or I/O that will be completed later. So how does a Future implement single-threaded asynchrony?

Event loop

Events are triggered, such as click, redraw, event loop to fetch the event, process it, discard it. It’s like when the package is delivered to the recipient, it’s opened, it’s taken away, it’s thrown away. The future-modified function is similar to a refrigerator, which is put in the hands of the Courier. Instead, it is thawed and then told the recipient that it can be opened, and the recipient will open it again. During this time, the CPU is scheduled to execute other IO, and when the asynchronous processing is complete, the result is put into an event loop, which processes the result.

When it comes to asynchronous processing, many articles have mentioned it in a single brush. We can’t help wondering why a single thread can handle asynchronous processing. Let’s start with the establishment of Future:

The Future will create a Timer, Dart static method _handleMessage() of _Timer object into static member variable _handlerMap of _RawReceivePortImpld object in isolate_patch.dart; ReceivePort and SendPort are created. Here, just like the Hander that communicates between Android threads, Future will hand off tasks to the operating system and continue to perform other tasks. For example, for network requests, the Socket itself provides a select model to query asynchronously; And file IO, the operating system also provides event-based callback mechanism. After the event is processed, the result is sent back to ReceivePort and the event loop processes the result.

What about other time-consuming, high-load operations? For example, for common time-consuming computing tasks, such as solving factorial, the OS does not have asynchronous interfaces for Dart to call, so asynchronous programming is not helpful. In this case, multiple threads are needed to deal with them. Dart’s multiple threads are called ISOLATE, but the ISOLATE is more like a process than a memory sharing one.

Isolate application

The simplest compute

Usually the network returns JSON, which we need to parse into entity beans, and if JSON is too large and time-consuming, it’s stuck. So it needs to be treated in the ISOLATE.

import 'dart:convert';

main(List<String> args) {
  String jsonString = {"id":"123", "name":" score" : 95}";
  Student student = parseJson(jsonString);
  print(student.name);
}

Student parseJson(String json) {
  Map<String.dynamic> map = jsonDecode(json);
  return Student.fromJson(map);
}

class Student {
  String id;
  String name;
  int score;
  Student({this.id, this.name, this.score});
  factory Student.fromJson(Map parsedJson) {
    return Student(id: parsedJson['id'], name: parsedJson['name'], score: parsedJson['score']); }}Copy the code

We put the above code into the ISOLATE to execute:

Future<Student> loadStudent(String json) {
  return compute(parseJson, json);
}

Student parseJson(String json) {
  Map<String.dynamic> map = jsonDecode(json);
  return Student.fromJson(map);
}
Copy the code

Compute is the API of Flutter, which encapsulates the ISOLATE. It is very simple to use, but it has its limitations. It cannot return results multiple times or perform continuous value transfer computations. In some cases, we can use Compute, but in others, we can only use THE ISOLATE provided by DART.

Unidirectional communication isolate

Let’s implement the above code using isolate:

import 'dart:convert';
import 'dart:isolate';

main(List<String> args) async {
  await start();
}

Isolate isolate;

start() async {
  // Create a receive port to receive messages from child threads
  ReceivePort receivePort = ReceivePort();

  // Create and manage the Isolate and pass it to the main thread sending port
  isolate = await Isolate.spawn(entryPoint, receivePort.sendPort);
  // Listen for child thread messages
  receivePort.listen((data) {
    print('Data:$data');
  });
}

/ / concurrent Isolate
entryPoint(SendPort sendPort) {
  String jsonString = {"id":"123", "name":" score" : 95}";
  Student student = parseJson(jsonString);
  sendPort.send(student);
}

Student parseJson(String json) {
  Map<String.dynamic> map = jsonDecode(json);
  return Student.fromJson(map);
}

class Student {
  String id;
  String name;
  int score;
  Student({this.id, this.name, this.score});
  factory Student.fromJson(Map parsedJson) {
    return Student(id: parsedJson['id'], name: parsedJson['name'], score: parsedJson['score']); }}Copy the code

Sometimes, we need to pass parameters to child threads, or manage the ISOLATE like a thread pool, so we need to implement two-way communication:

import 'dart:isolate';

main(List<String> args) async {
  await start();
  await Future.delayed(Duration(seconds: 1), () {
    threadPort.send('I'm coming to the autonomous thread.');
    print('1');
  });
  await Future.delayed(Duration(seconds: 1), () {
    threadPort.send('I'll do the autonomous thread, too.');
    print('2');
  });
  await Future.delayed(Duration(seconds: 1), () {
    threadPort.send('end');
    print('3');
  });
}

Isolate isolate;
// Send port of the child thread
SendPort threadPort;
start() async {
  // Create a main thread receive port to receive messages from child threads
  ReceivePort receivePort = ReceivePort();

  // Create and manage the Isolate and pass it to the main thread sending port
  isolate = await Isolate.spawn(entryPoint, receivePort.sendPort);
  // Listen for child thread messages
  receivePort.listen((data) {
    print('The main thread received a message from the child thread$data');
    if (data isSendPort) { threadPort = data; }}); }/ / concurrent Isolate
entryPoint(dynamic message) {
  // Create a subthread receive port to receive messages from the main thread
  ReceivePort receivePort = ReceivePort();

  SendPort sendPort;
  print('==entryPoint==$message');
  if (message is SendPort) {
    sendPort = message;
    print('Child thread open');
    sendPort.send(receivePort.sendPort);
    // Listen for child thread messages
    receivePort.listen((data) {
      print(The child thread receives a message from the master thread$data');
      assert(data is String);
      if (data == 'end') { isolate? .kill(); isolate =null;
        print('Child thread terminated');
        return; }});return; }}Copy the code
SendPort Child thread starts the main thread receives a message from the child thread SendPort 1 child thread receives a message from the master thread I am from the master thread 2 child thread receives a message from the master thread I am from the master thread 3 child thread receives a message from the master thread End The child thread endsCopy the code

Two-way communication is complicated, so we need to encapsulate it and make external calls through API:

import 'dart:async';
import 'dart:isolate';

main(List<String> args) async {
  var worker = Worker();
  worker.reuqest('Send message 1').then((data) {
    print('Message after child thread processing:$data');
  });

  Future.delayed(Duration(seconds: 2), () {
    worker.reuqest('Send message 2').then((data) {
      print('Message after child thread processing:$data');
    });
  });
}

class Worker {
  SendPort _sendPort;
  Isolate _isolate;
  final _isolateReady = Completer<void> ();final Map<Capability, Completer> _completers = {};

  Worker() {
    init();
  }

  void dispose() {
    _isolate.kill();
  }

  Future reuqest(dynamic message) async {
    await _isolateReady.future;
    final completer = new Completer();
    final requestId = new Capability();
    _completers[requestId] = completer;
    _sendPort.send(new _Request(requestId, message));
    return completer.future;
  }

  Future<void> init() async {
    final receivePort = ReceivePort();
    final errorPort = ReceivePort();
    errorPort.listen(print);
    receivePort.listen(_handleMessage);
    _isolate = await Isolate.spawn(
      _isolateEntry,
      receivePort.sendPort,
      onError: errorPort.sendPort,
    );
  }

  void _handleMessage(message) {
    if (message is SendPort) {
      _sendPort = message;
      _isolateReady.complete();
      return;
    }
    if (message is _Response) {
      final completer = _completers[message.requestId];
      if (completer == null) {
        print("Invalid request ID received.");
      } else if (message.success) {
        completer.complete(message.message);
      } else {
        completer.completeError(message.message);
      }
      return;
    }
    throw UnimplementedError("Undefined behavior for message: $message");
  }

  static void _isolateEntry(dynamic message) {
    SendPort sendPort;
    final receivePort = ReceivePort();

    receivePort.listen((dynamic message) async {
      if (message is _Request) {
        print('The child thread received:${message.message}');
        sendPort.send(_Response.ok(message.requestId, 'Processed message'));
        return; }});if (message is SendPort) {
      sendPort = message;
      sendPort.send(receivePort.sendPort);
      return; }}}class _Request {
  /// The ID of the request so the response may be associated to the request's future completer.
  final Capability requestId;

  /// The actual message of the request.
  final dynamic message;

  const _Request(this.requestId, this.message);
}

class _Response {
  /// The ID of the request this response is meant to.
  final Capability requestId;

  /// Indicates if the request succeeded.
  final bool success;

  /// If [success] is true, holds the response message.
  /// Otherwise, holds the error that occured.
  final dynamic message;

  const _Response.ok(this.requestId, this.message) : success = true;

  const _Response.error(this.requestId, this.message) : success = false;
}


Copy the code