Stream is one of the core classes in Dart's dart:async library. A Future represents the result of a single asynchronous operation, while a Stream represents a sequence of results delivered over time.
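
As a minimal sketch of that difference (the helper names fetchOnce and countTo are made up for illustration and are not part of any library), a Future yields one value while a Stream can yield several:

```dart
import 'dart:async';

// Hypothetical helper: a Future completes once, with a single value.
Future<int> fetchOnce() async => 42;

// Hypothetical helper: a Stream can deliver several values over time.
Stream<int> countTo(int n) async* {
  for (var i = 1; i <= n; i++) {
    yield i;
  }
}

Future<void> main() async {
  // One await gives the single result of the Future.
  final single = await fetchOnce();
  print('Future result: $single');

  // await for receives every event until the Stream is done.
  await for (final value in countTo(3)) {
    print('Stream event: $value');
  }
}
```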

To implement regular polling via Stream:

import 'dart:async';

// A function that creates a new Future each time it is called
typedef FutureGenerator<T> = Future<T> Function();

class StreamTool {
  /// [interval] Polling interval
  /// [future] Function that starts the task to run on each poll
  /// [maxCount] Maximum number of polls (unlimited when omitted)
  Stream<T> timedPolling<T>(Duration interval, FutureGenerator<T> future,
      [int? maxCount]) {
    late StreamController<T> controller;
    int counter = 0;
    bool polling = true;

    void stopTimer() {
      polling = false;
    }

    void tick() async {
      // Skip a tick that was scheduled before polling was stopped
      if (!polling) return;
      counter++;
      T result = await future();
      if (!controller.isClosed) {
        controller.add(result);
      }
      if (counter == maxCount) {
        stopTimer();
        controller.close();
      } else if (polling) {
        Future.delayed(interval, tick);
      }
    }

    void startTimer() {
      polling = true;
      tick();
    }

    // When the StreamSubscription is paused or cancelled, polling inside the
    // stream is paused or cancelled as well
    controller = StreamController<T>(
      onListen: startTimer,
      onPause: stopTimer,
      onResume: startTimer,
      onCancel: stopTimer,
    );

    return controller.stream;
  }
}

The following is an example of the test method:

// Both methods are assumed to live inside StreamTool, since test() calls
// timedPolling directly. Random requires: import 'dart:math';

// Simulate the future task for testing
Future<String> testFuture() async {
  // Simulate a time-consuming operation
  await Future.delayed(Duration(seconds: 1));
  String randomStr = Random().nextInt(10).toString();
  return randomStr;
}

void test() {
  var pollingStream = timedPolling(const Duration(seconds: 1), testFuture, 15);
  StreamSubscription<String>? subscription;
  int counter = 0;
  // Listen to the stream's events
  subscription = pollingStream.listen((result) {
    counter++;
    print("stream result is $result");
    if (counter == 5) {
      // On the fifth print, pause the subscription. It resumes after 5 seconds,
      // and polling inside the stream is paused during that time
      subscription?.pause(Future.delayed(const Duration(seconds: 5)));
    }
  });
}
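
The pause-with-resume-signal behaviour used in the test can be seen in isolation with the sketch below. It is only an illustration: Stream.periodic stands in for the polling stream, and the function name pauseThenResume and the durations are assumptions chosen to keep the example short.

```dart
import 'dart:async';

/// Pauses the subscription after the third event and lets it resume
/// automatically when the resume-signal Future completes.
/// Returns the events in the order they arrived.
Future<List<int>> pauseThenResume() async {
  final events = <int>[];
  final done = Completer<void>();

  // Stand-in for the polling stream (an assumption for brevity).
  final stream =
      Stream<int>.periodic(const Duration(milliseconds: 10), (i) => i).take(6);

  late final StreamSubscription<int> subscription;
  subscription = stream.listen((value) {
    events.add(value);
    if (events.length == 3) {
      // pause() accepts an optional Future; the subscription resumes
      // once that Future completes.
      subscription
          .pause(Future<void>.delayed(const Duration(milliseconds: 50)));
    }
  }, onDone: done.complete);

  await done.future;
  return events;
}

Future<void> main() async {
  print(await pauseThenResume());
}
```

Passing a Future to pause() avoids having to schedule a matching resume() call by hand, which is why the test above uses the same pattern.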

There are two types of Stream:

  • A single-subscription Stream allows only one listener for its whole lifetime, and events added before that listener subscribes are buffered rather than lost. The code above is of this type, which has the benefit that we can hook into the subscription's lifecycle callbacks:
controller = StreamController<T>(
  onListen: startTimer, // Start polling when a listener subscribes
  onPause: stopTimer, // Stop polling when paused
  onResume: startTimer, // Restart polling when resumed
  onCancel: stopTimer, // Stop polling when cancelled
);
  • A broadcast Stream allows multiple listeners to subscribe at the same time. This type of Stream is commonly used to implement broadcast notifications. If you are interested, read the previous article: Flutter uses a Stream for message notifications

The key code for creating a broadcast Stream is:

StreamController<T>.broadcast().stream
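
To make the contrast between the two types concrete, here is a small sketch. The function names secondListenThrows and broadcastToTwo are hypothetical and exist only for this illustration.

```dart
import 'dart:async';

/// Returns true if a second listen() on a single-subscription stream throws.
bool secondListenThrows() {
  final controller = StreamController<int>();
  controller.stream.listen((_) {});
  try {
    // A single-subscription stream rejects a second listener.
    controller.stream.listen((_) {});
    return false;
  } on StateError {
    return true;
  } finally {
    controller.close();
  }
}

/// Sends one event to two listeners of a broadcast stream and
/// returns what each listener saw.
Future<List<String>> broadcastToTwo() async {
  final seen = <String>[];
  final controller = StreamController<int>.broadcast();
  controller.stream.listen((v) => seen.add('A:$v'));
  controller.stream.listen((v) => seen.add('B:$v'));
  controller.add(1);
  // close() completes once the listeners have received the done event.
  await controller.close();
  return seen;
}

Future<void> main() async {
  print('second listen throws: ${secondListenThrows()}');
  print('broadcast listeners saw: ${await broadcastToTwo()}');
}
```

Note that a broadcast controller only offers onListen and onCancel callbacks, so the pause-aware polling shown earlier relies on the single-subscription type.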