What is the Stream

A Stream is a sequence of asynchronous events. It is similar to an asynchronous Iterable, except that it will give you the next event as soon as you fetch it from Iterable, whereas a Stream will not give you the next event as soon as it is ready.

A single Zero or more
Sync int Interable<int>
Async Future<int> Stream<int>

The Stream object

  • StreamController: Used to control the Stream process and provides interfaces for creating various streams of events.
  • StreamSink: the entry point of an event,add.addStreamAnd so on.
  • Stream: the source of the event itself, usually used to listen for events or to transform events, as inlisten,where.
  • StreamSubscription: An event subscription object that is ostensibly used to manage subscribed operations such ascancel,pauseAt the same time, it is also the key to transfer events internally.

Relationship between

There is an event source called Stream. To facilitate Stream control, the StreamController is officially provided as a management. It also provides a StreamSink object as an event entry; Listen and transform are provided for the Stream property, and StreamSubscription manages events.

Stream working principle

  1. StreaminlistenWhen passed inonDataCallback, this callback is passed toStreamSubscriptionMedium, then passzone.registerUnaryCallbackTo register for_onDataobject
  2. StreamSinkWhen an event is added, theStreamSubscriptionIn the_sendDataMethod, and then through_zone.runUnaryGuarded(_onData, data)Obtained in 1_onDataObject, triggerlistenIs passed to the callback method
StreamSubscription<T> listen(void onData(T data)? , {Function? onError, void onDone()? , bool? cancelOnError}) { cancelOnError ?? = false; StreamSubscription<T> subscription = _createSubscription(onData, onError, onDone, cancelOnError); _onListen(subscription); return subscription; } // To save space, some code has been omitted // here, OnData = _registerDataHandler<T>(_zone, onData) Static void Function(T) _registerDataHandler<T>(Zone Zone, void Function(T)? handleData) { return zone.registerUnaryCallback<void, T>(handleData ?? _nullDataHandler); } Duplicate codeCopy the code
//2. Sink added the StreamSubscription._sendData event and called _zone.runUnaryGuarded(_onData, data).  /* _EventDispatch interface. */ void _sendData(T data) { assert(! _isCanceled); assert(! _isPaused); assert(! _inCallback); bool wasInputPaused = _isInputPaused; _state |= _STATE_IN_CALLBACK; _zone.runUnaryGuarded(_onData, data); _state &= ~_STATE_IN_CALLBACK; _checkState(wasInputPaused); }Copy the code

About the zone and zone. RegisterUnaryCallback zone. RunUnaryGuarded, concrete is described here

It is worth noting that the Stream is scheduled in microTask

Stream supports synchronous, asynchronous (asynchronous by default), broadcast, and unbroadcast

Stream supports the following methods:

  • map(): Converts each element of this stream to a new stream event.
  • where(): Creates a new stream from the current stream and discards elements based on conditions
  • distinct(): If the data event is equal to the previous data event, the data event is skipped.
  • .

The use of the Stream

Created through the Stream constructor

  • Stream.fromFutureThrough:FutureCreate a single subscriptionstream
  • Stream.fromIterableThrough:可迭代Create a single subscription of the datastream
  • Stream.fromFutures: Through a groupFutureTo create a single single subscription streamstream
  • Stream.periodic: creates a repeat emitted event by time periodstream

Created by StreamController

import 'dart:async'; Void main() {//1. Create a StreamController object of any type StreamController(onListen: () => print('listen'), onCancel: () => print('cancel'), onPause: () => print('pause'), onResume: () => print('resumr')); / / 2. Through the slit add any type the event data sink streamController. Sink. Add (100); StreamController. Sink. The add (100.121212); streamController.sink.add('THIS IS STRING'); streamController.sink.close(); OnDone will be called back only if the close method is manually called to send a done event. Register to monitor streamController. Stream. Listen ((event) = > print (event), onDone: () = > print (' is done '), onError: (error, stacktrace) => print('is error, errMsg: $error'), cancelOnError: true); }Copy the code

Created via async*

Note that async* is the Dart keyword, indicating that the function returns a stream, and that yield returns a single Iterable, followed by stream


asyncGet(31).listen((event) {
  print(event);
});

Stream<String> asyncGet(int count) async* {
  yield* asyncGetString(count).map((event) => event + 'C');
}

Stream<String> asyncGetString(int count) async* {
  for (int i = 0; i < count; i++) {
    yield await delayedGet(i);
  }
}

Future<String> delayedGet(int i) async {
  await Future.delayed(Duration(seconds: 1));
  return i.toString() + 'B';
}
Copy the code

The resources

Juejin. Cn/post / 684490…

Juejin. Cn/post / 694210…