Reactive programming Stream

A Future represents a single value: data or an error that is delivered asynchronously. A Stream works like a Future, except that it represents a sequence of events and can deliver zero or more data events, or errors, over time.

         Single value    Multiple values
Sync     int             Iterator
Async    Future          Stream

When listening to a Stream, you can register callbacks that run when data arrives, when an error occurs, and when the Stream is done sending.
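As a minimal sketch of those three callbacks, the snippet below listens to a small stream built with Stream.fromIterable and records which callback fired. The function name collectEvents and the log format are made up for illustration.

```dart
import 'dart:async';

/// Listens to a small stream and records which callback fired for each event.
/// (collectEvents is a hypothetical helper, not part of any library.)
Future<List<String>> collectEvents() {
  final log = <String>[];
  final done = Completer<List<String>>();

  // Stream.fromIterable emits each element as a data event, then completes.
  Stream<int>.fromIterable([1, 2, 3]).listen(
    (value) => log.add('data: $value'), // runs for every data event
    onError: (Object error) => log.add('error: $error'), // runs on errors
    onDone: () {
      log.add('done'); // runs once, after the last event
      done.complete(log);
    },
  );

  return done.future;
}

Future<void> main() async {
  print(await collectEvents()); // [data: 1, data: 2, data: 3, done]
}
```

No error occurs in this particular stream, so the onError callback is registered but never called.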

Using StreamController

If a Stream’s events come from different parts of the program, and not only from a Stream or Futures that an asynchronous function can iterate over (see the await for loop described below), we typically use a StreamController to create and populate the Stream.

A StreamController creates a Stream for you and provides methods to add events to that Stream at any time and from anywhere. The Stream has all the logic needed to handle listeners and pausing. Typically you keep the StreamController private and simply return the Stream that the caller needs.

  // A typed broadcast controller allows more than one listener.
  final _controller = StreamController<int>.broadcast();

  Stream<int> get stream => _controller.stream;

  StreamSink<int> get sink => _controller.sink;

  stream.listen(print); // Prints out integers.

You can use stream to listen for events and sink to add events.
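The fragment above can be turned into a small runnable sketch. The class NumberSource and its emit and close methods are hypothetical names chosen for illustration; the point is only that the controller stays private while callers see the Stream.

```dart
import 'dart:async';

/// Hypothetical event source that keeps its controller private.
class NumberSource {
  final _controller = StreamController<int>();

  /// Callers only see the Stream, never the controller itself.
  Stream<int> get stream => _controller.stream;

  /// Adds a data event; may be called from anywhere in the program.
  void emit(int value) => _controller.add(value);

  /// Closing the controller sends the done event to the listener.
  Future<void> close() => _controller.close();
}

Future<List<int>> runNumberSourceDemo() async {
  final source = NumberSource();
  final received = <int>[];
  final subscription = source.stream.listen(received.add);

  source.emit(1);
  source.emit(2);
  await source.close(); // completes once the done event has been delivered

  await subscription.cancel();
  return received;
}

Future<void> main() async {
  print(await runNumberSourceDemo()); // [1, 2]
}
```

Exposing an emit method instead of the sink is a design choice of this sketch; returning the sink, as in the fragment above, works just as well.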

Receiving Stream events in a loop

A Stream can be created in many ways, and just as we iterate over an Iterable using a for loop, we can iterate over the events of a Stream using an asynchronous for loop (often referred to as await for). For example:

import 'dart:async';

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (var value in stream) {
    sum += value;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
  }
}

main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // 55
}

The code above takes each event in the stream of integers, adds them up, and returns the sum wrapped in a Future. When the loop body ends, the function pauses until the next event arrives or the Stream completes.

Error events

A Stream is done when it has no more events to deliver, and the code receiving the events is notified of this just as it is notified when a new event arrives. When events are read with an await for loop, the loop stops when the Stream is done.

Sometimes an error occurs before the Stream completes, such as a failed network request to fetch a file from a remote server, or a bug in the code that creates events. Errors are always possible, and consumers should be notified when they occur.

A Stream can provide error events in the same way it provides data events. Most streams stop after the first error, but they can also provide multiple errors and continue to provide data events after an error occurs.
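To illustrate a stream that keeps going after an error, here is a minimal sketch that uses a StreamController to emit data, an error, more data, and a second error. The function name collectWithErrors and the error messages are invented for this example.

```dart
import 'dart:async';

/// Emits data, then an error, then more data, and records what arrives.
/// (collectWithErrors is a hypothetical helper name.)
Future<List<String>> collectWithErrors() async {
  final controller = StreamController<int>();
  final log = <String>[];

  controller.stream.listen(
    (value) => log.add('data: $value'),
    onError: (Object error) => log.add('error: $error'),
    // cancelOnError defaults to false, so the subscription survives errors.
  );

  controller.add(1);
  controller.addError('first failure');
  controller.add(2);
  controller.addError('second failure');
  await controller.close(); // completes after all events were delivered

  return log;
}

Future<void> main() async {
  print(await collectWithErrors());
  // [data: 1, error: first failure, data: 2, error: second failure]
}
```

With listen and an onError callback the subscription stays active after each error, which is why the later events still arrive.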

When reading a Stream with await for, if an error occurs, it is thrown by the loop statement and the loop ends. You can catch the error with a try-catch statement. The following example throws an error when the loop counter reaches 4:

import 'dart:async';

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  try {
    await for (var value in stream) {
      sum += value;
    }
  } catch (e) {
    return -1;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    if (i == 4) {
      throw new Exception('Intentional exception');
    } else {
      yield i;
    }
  }
}

main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // -1
}

Use Stream to communicate across components

Flutter offers many ways to pass values between components, but using the Stream provided by Dart is among the simplest. We can implement a single data center as a singleton, with a separate Stream for each kind of event.

import 'dart:async';

class DataModel {
  // Lazily created singleton instance.
  static DataModel? _instance;

  static DataModel get instance => _getInstance();

  factory DataModel() => _getInstance();

  DataModel._internal();

  static DataModel _getInstance() {
    return _instance ??= DataModel._internal();
  }

  // A broadcast controller lets several pages listen at the same time.
  // ignore: close_sinks
  final _controller = StreamController<int>.broadcast();

  Stream<int> get stream => _controller.stream;

  StreamSink<int> get sink => _controller.sink;
}

Above is the event center. Page1 receives and displays the data, and Page2 sends a random number.

// page1
  _addDataListener() {
    DataModel().stream.listen((event) {
      print("==_Page1State._addDataListener: event==$event");
      if (mounted) {
        setState(() {
          _num = event;
        });
      }
    });
  }

// page2
_sendData() {
    var nextInt = Random().nextInt(100);
    DataModel().sink.add(nextInt);
  }

Enter Page1, then exit, enter Page2, and send a random number. The console still prints Page1's output and warns of a possible memory leak.

E/flutter ( 7437): This error might indicate a memory leak if setState() is being called because another object is retaining a reference to this State object after it has been removed from the tree. To avoid memory leaks, consider breaking the reference to this object during dispose().

Although data can be passed across components, nothing unsubscribes. We can manually keep the StreamSubscription returned by listen and cancel it in dispose. But doing this every time is cumbersome, and with multiple streams there are multiple StreamSubscriptions to keep track of.
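The manual approach can be sketched without Flutter. In the snippet below, PageListener is a hypothetical stand-in for a page's State object: start plays the role of initState and dispose plays the role of the State's dispose method.

```dart
import 'dart:async';

/// Hypothetical stand-in for a page's State object (no Flutter needed here).
class PageListener {
  final received = <int>[];
  StreamSubscription<int>? _subscription;

  /// Roughly what initState would do: start listening and keep the handle.
  void start(Stream<int> stream) {
    _subscription = stream.listen(received.add);
  }

  /// Roughly what dispose would do: cancel so no more events arrive.
  Future<void> dispose() async {
    await _subscription?.cancel();
    _subscription = null;
  }
}

Future<List<int>> runManualCancelDemo() async {
  final controller = StreamController<int>.broadcast();
  final page = PageListener()..start(controller.stream);

  controller.add(1);
  await Future<void>.delayed(Duration.zero); // let the event be delivered
  await page.dispose();

  controller.add(2); // nobody is listening any more, so this is dropped
  await controller.close();
  return page.received;
}

Future<void> main() async {
  print(await runManualCancelDemo()); // [1]
}
```

Every page that listens needs its own field and its own cancel call, which is the repetition the rest of the article tries to remove.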

Use ValueNotifier to communicate across components

A ValueNotifier notifies its listeners when its value is set to something not equal to the old value. Next we declare a ValueNotifier in DataModel, listen to it in Page1 as before, and set its value in Page2.


  // Declared inside DataModel.
  final ValueNotifier<int> _valueNotifier = ValueNotifier(0);

  ValueNotifier<int> get valueNotifier => _valueNotifier;

// page1
  _addValueListener() {
    DataModel().valueNotifier.addListener(() {
      var value = DataModel().valueNotifier.value;
      print("==_Page1State._addDataListener: event==$value");
      if (mounted) {
        setState(() {
          _value = value;
        });
      }
    });
  }

  ...

      Scaffold(
        appBar: AppBar(
          title: Text('Page1'),
        ),
        body: Center(
          child: Column(
            children: <Widget>[
              Text('The stream received is: $_num'),
              Text('The data received from ValueNotifier is: $_value'),
            ],
          ),
        ),
      );

// page2
  _sendValueData() {
    var nextInt = Random().nextInt(100);
    DataModel().valueNotifier.value = nextInt;
  }

Again, enter Page1, then go to Page2 and click the button to send data. The console prints:

I/flutter ( 7437): ==_Page1State._addDataListener: event==2
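
The notify-only-on-change rule mentioned at the start of this section can be illustrated in plain Dart. SimpleValueNotifier below is a simplified, hypothetical stand-in written for this sketch; it is not Flutter's ValueNotifier implementation and only mimics the behaviour described above.

```dart
/// Simplified stand-in that mimics the notify-on-change rule.
/// This is NOT Flutter's implementation; all names are hypothetical.
class SimpleValueNotifier<T> {
  SimpleValueNotifier(this._value);

  T _value;
  final _listeners = <void Function()>[];

  T get value => _value;

  set value(T newValue) {
    if (_value == newValue) return; // equal to the old value: stay silent
    _value = newValue;
    // Iterate over a copy so listeners may remove themselves while notified.
    for (final listener in List.of(_listeners)) {
      listener();
    }
  }

  void addListener(void Function() listener) => _listeners.add(listener);

  void removeListener(void Function() listener) => _listeners.remove(listener);
}

int countNotifications() {
  final notifier = SimpleValueNotifier<int>(0);
  var calls = 0;
  notifier.addListener(() => calls++);

  notifier.value = 5; // changed: listener runs
  notifier.value = 5; // same value: listener does not run
  notifier.value = 7; // changed again: listener runs
  return calls;
}

void main() {
  print(countNotifications()); // 2
}
```

As with the Stream example, a listener added this way stays registered until removeListener is called.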

Automatic unsubscribe

In all the examples above, each page has to unsubscribe by itself when it exits, which is a real nuisance. Now we use the proxy pattern to unsubscribe automatically.

import 'dart:async';

import 'package:flutter/foundation.dart';

class Disposer {
  final List<StreamSubscription> _subscriptions = [];

  final List<Listenable> _listenables = [];
  final List<VoidCallback> _listeners = [];

  /// Track a stream subscription to be automatically cancelled on dispose.
  void autoDispose(StreamSubscription? subscription) {
    if (subscription == null) return;
    _subscriptions.add(subscription);
  }

  /// Add a listener to a Listenable object that is automatically removed when
  /// cancel is called.
  void addAutoDisposeListener(Listenable? listenable,
      [VoidCallback? listener]) {
    if (listenable == null || listener == null) return;
    _listenables.add(listenable);
    _listeners.add(listener);
    listenable.addListener(listener);
  }

  /// Cancel all listeners added & stream subscriptions.
  ///
  /// It is fine to call this method and then add additional listeners.
  void cancel() {
    for (StreamSubscription subscription in _subscriptions) {
      subscription.cancel();
    }
    _subscriptions.clear();

    assert(_listenables.length == _listeners.length);
    for (int i = 0; i < _listenables.length; ++i) {
      _listenables[i].removeListener(_listeners[i]);
    }
    _listenables.clear();
    _listeners.clear();
  }
}

Disposer maintains a list of StreamSubscriptions. Stream subscriptions are added via autoDispose, and listeners on a Listenable such as ValueNotifier are added via addAutoDisposeListener.
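
The stream half of this bookkeeping can be tried out without Flutter. The sketch below uses a hypothetical SubscriptionTracker class (not the Disposer above) that only handles StreamSubscriptions, to show several subscriptions being cancelled with one call.

```dart
import 'dart:async';

/// Stream-only sketch of the bookkeeping idea (hypothetical class name).
class SubscriptionTracker {
  final List<StreamSubscription<dynamic>> _subscriptions = [];

  /// Remember a subscription so it can be cancelled later.
  void track(StreamSubscription<dynamic> subscription) {
    _subscriptions.add(subscription);
  }

  /// Cancel everything tracked so far and forget it.
  Future<void> cancelAll() async {
    for (final subscription in _subscriptions) {
      await subscription.cancel();
    }
    _subscriptions.clear();
  }
}

Future<List<String>> runTrackerDemo() async {
  final numbers = StreamController<int>.broadcast();
  final words = StreamController<String>.broadcast();
  final log = <String>[];
  final tracker = SubscriptionTracker()
    ..track(numbers.stream.listen((n) => log.add('number $n')))
    ..track(words.stream.listen((w) => log.add('word $w')));

  numbers.add(1);
  words.add('a');
  await Future<void>.delayed(Duration.zero); // let both events be delivered
  await tracker.cancelAll();

  numbers.add(2); // dropped: no listeners remain
  words.add('b'); // dropped: no listeners remain
  await numbers.close();
  await words.close();
  return log;
}

Future<void> main() async {
  print(await runTrackerDemo()); // [number 1, word a]
}
```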

mixin AutoDisposeMixin<T extends StatefulWidget> on State<T>
    implements Disposer {
  final Disposer _delegate = Disposer();

  @override
  void dispose() {
    cancel();
    super.dispose();
  }

  void _refresh() => setState(() {});

  @override
  void addAutoDisposeListener(Listenable? listenable,
      [VoidCallback? listener]) {
    // Without an explicit listener, simply rebuild the widget.
    _delegate.addAutoDisposeListener(listenable, listener ?? _refresh);
  }

  @override
  void autoDispose(StreamSubscription? subscription) {
    _delegate.autoDispose(subscription);
  }

  @override
  void cancel() {
    _delegate.cancel();
  }
}

This mixin delegates to a Disposer to manage subscriptions. Here is how to use it:

class _AutoPageState extends State<AutoPage> with AutoDisposeMixin {
  int _num = 0;
  int _value = 0;

  @override
  void initState() {
    super.initState();
    _addDataListener();
    _addValueListener();
  }

  _addDataListener() {
    autoDispose(DataModel().stream.listen((event) {
      print("==AutoPage._addDataListener: event==$event");
      if (mounted) {
        setState(() {
          _num = event;
        });
      }
    }));
  }

  _addValueListener() {
    addAutoDisposeListener(DataModel().valueNotifier, () {
      var value = DataModel().valueNotifier.value;
      print("==AutoPage._addValueListener: event==$value");
      if (mounted) {
        setState(() {
          _value = value;
        });
      }
    });
  }

  @override
  void dispose() {
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: Text('AutoPage'),
      ),
      body: Center(
        child: Column(
          children: <Widget>[
            Text('The stream received is: $_num'),
            Text('The data received from ValueNotifier is: $_value'),
          ],
        ),
      ),
    );
  }
}

The logic is the same as in Page1, except that the subscriptions are registered through autoDispose and addAutoDisposeListener. Enter AutoPage, then go to Page2 and send data: nothing is printed, which shows that the subscriptions were cancelled successfully.

Of course, not all communication is global. Some of it is scoped to a fixed component only, and some lives in the VM layer rather than the component layer. In that case, as with a presenter in MVP or a view model in MVVM, we need to expose a dispose method that the View layer calls. We implemented a mixin for this as well:

mixin AutoDisposeControllerMixin on DisposableController implements Disposer {
  final Disposer _delegate = Disposer();

  @override
  void dispose() {
    cancel();
    super.dispose();
  }

  @override
  void addAutoDisposeListener(Listenable? listenable,
      [VoidCallback? listener]) {
    _delegate.addAutoDisposeListener(listenable, listener);
  }

  @override
  void autoDispose(StreamSubscription? subscription) {
    _delegate.autoDispose(subscription);
  }

  @override
  void cancel() {
    _delegate.cancel();
  }
}

To use it, extend DisposableController and mix in AutoDisposeControllerMixin. Wherever the subscriptions need to be cancelled, call dispose.
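
A minimal sketch of that usage follows. The article does not show DisposableController, so the definition below is an assumption, and AutoCancelMixin is a simplified stream-only variant written so the example runs without Flutter. CounterViewModel is likewise a hypothetical name.

```dart
import 'dart:async';

/// Assumed base class: the article does not show DisposableController,
/// so this minimal definition is a guess made for the sketch.
abstract class DisposableController {
  void dispose() {}
}

/// Stream-only variant of the mixin, so the sketch runs without Flutter.
mixin AutoCancelMixin on DisposableController {
  final List<StreamSubscription<dynamic>> _subscriptions = [];

  void autoDispose(StreamSubscription<dynamic> subscription) {
    _subscriptions.add(subscription);
  }

  @override
  void dispose() {
    for (final subscription in _subscriptions) {
      subscription.cancel();
    }
    _subscriptions.clear();
    super.dispose();
  }
}

/// Hypothetical view model that sums up a stream of integers.
class CounterViewModel extends DisposableController with AutoCancelMixin {
  CounterViewModel(Stream<int> source) {
    autoDispose(source.listen((value) => total += value));
  }

  int total = 0;
}

Future<int> runViewModelDemo() async {
  final controller = StreamController<int>.broadcast();
  final viewModel = CounterViewModel(controller.stream);

  controller.add(3);
  await Future<void>.delayed(Duration.zero); // let the event be delivered
  viewModel.dispose(); // the view layer calls this when it goes away

  controller.add(4); // ignored: the subscription was cancelled
  await controller.close();
  return viewModel.total;
}

Future<void> main() async {
  print(await runViewModelDemo()); // 3
}
```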

Reference: Asynchronous programming: use stream (dart.dev/tutorials/l…)