What is a Stream
A Stream is a sequence of asynchronous events. It is similar to an asynchronous Iterable, except that an Iterable gives you the next event when you ask for it, whereas a Stream tells you that the next event is there as soon as it is ready.
| | A single value | Zero or more values |
|---|---|---|
| Sync | int | Iterable<int> |
| Async | Future<int> | Stream<int> |
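The difference between the two rows of the table can be sketched as follows. This is a minimal illustration; the function names sumIterable and sumStream are made up for this example.

```dart
// Synchronous: the caller pulls each value from the Iterable on demand.
int sumIterable(Iterable<int> values) {
  var sum = 0;
  for (final value in values) {
    sum += value;
  }
  return sum;
}

// Asynchronous: the Stream delivers each value when it is ready, and
// `await for` suspends the function until the next event arrives.
Future<int> sumStream(Stream<int> values) async {
  var sum = 0;
  await for (final value in values) {
    sum += value;
  }
  return sum;
}

Future<void> main() async {
  print(sumIterable([1, 2, 3])); // 6
  print(await sumStream(Stream.fromIterable([1, 2, 3]))); // 6
}
```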
The Stream objects
- StreamController: controls the stream as a whole and provides the interfaces for creating various kinds of event streams.
- StreamSink: the entry point for events, through methods such as add and addStream.
- Stream: the event source itself, usually used to listen for events or to transform them, through methods such as listen and where.
- StreamSubscription: the subscription object. On the surface it manages the subscription through operations such as cancel and pause; internally it is also the key link through which events are delivered.
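As a small illustration of managing a subscription, the sketch below cancels a StreamSubscription after two events. The function name takeTwoThenCancel and the 10 ms period are assumptions chosen for this example.

```dart
import 'dart:async';

Future<List<int>> takeTwoThenCancel() async {
  final received = <int>[];
  final finished = Completer<void>();

  // Declared first so that the callback can refer to the subscription.
  late final StreamSubscription<int> subscription;
  subscription =
      Stream<int>.periodic(const Duration(milliseconds: 10), (i) => i)
          .listen((value) {
    received.add(value);
    if (received.length == 2) {
      // cancel() stops delivery; no further events reach the callback.
      subscription.cancel().then((_) => finished.complete());
    }
  });

  await finished.future;
  return received;
}

Future<void> main() async {
  print(await takeTwoThenCancel()); // [0, 1]
}
```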
Relationship between them
Stream is the event source. To make the stream easier to control, the SDK provides StreamController to manage it. The controller exposes a StreamSink as the entry point for events and a stream property, the Stream itself, which offers listen and the transformation methods. Calling listen returns a StreamSubscription, which manages the delivery of events.
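The relationship can be sketched in a few lines. This is an illustrative example rather than a recommended pattern; the function name collectThroughController is made up here.

```dart
import 'dart:async';

Future<List<int>> collectThroughController() async {
  final received = <int>[];
  final done = Completer<void>();

  // StreamController: manages the stream and exposes its sink and stream.
  final controller = StreamController<int>();

  // Stream: the event source. listen() returns a StreamSubscription.
  final StreamSubscription<int> subscription = controller.stream.listen(
    received.add,
    onDone: done.complete,
  );

  // StreamSink: the entry point for events.
  final StreamSink<int> sink = controller.sink;
  sink.add(1);
  sink.add(2);
  await sink.close();

  await done.future;
  // StreamSubscription: manages the subscription (a no-op once done).
  await subscription.cancel();
  return received;
}

Future<void> main() async {
  print(await collectThroughController()); // [1, 2]
}
```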
Stream working principle
1. When the onData callback is passed to Stream.listen, it is handed to the StreamSubscription, which registers it through zone.registerUnaryCallback and obtains the _onData object.
2. When an event is added through the StreamSink, the StreamSubscription's _sendData method is called. It then runs _zone.runUnaryGuarded(_onData, data) with the _onData object obtained in step 1, which triggers the callback that was passed to listen.
```dart
// 1. listen() creates the StreamSubscription and hands the onData callback to it.
StreamSubscription<T> listen(void onData(T data)?,
    {Function? onError, void onDone()?, bool? cancelOnError}) {
  cancelOnError ??= false;
  StreamSubscription<T> subscription =
      _createSubscription(onData, onError, onDone, cancelOnError);
  _onListen(subscription);
  return subscription;
}

// To save space, some code has been omitted here.
// In the subscription: _onData = _registerDataHandler<T>(_zone, onData)
static void Function(T) _registerDataHandler<T>(
    Zone zone, void Function(T)? handleData) {
  return zone.registerUnaryCallback<void, T>(handleData ?? _nullDataHandler);
}
```
```dart
// 2. When the sink adds an event, StreamSubscription._sendData is called,
//    which runs _zone.runUnaryGuarded(_onData, data).
/* _EventDispatch interface. */
void _sendData(T data) {
  assert(!_isCanceled);
  assert(!_isPaused);
  assert(!_inCallback);
  bool wasInputPaused = _isInputPaused;
  _state |= _STATE_IN_CALLBACK;
  _zone.runUnaryGuarded(_onData, data);
  _state &= ~_STATE_IN_CALLBACK;
  _checkState(wasInputPaused);
}
```
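The register-then-dispatch mechanism can be imitated outside the SDK. The sketch below is a toy model: ToySubscription is a hypothetical class written for this example and is not the SDK's subscription implementation. It only uses the public Zone methods registerUnaryCallback and runUnaryGuarded.

```dart
import 'dart:async';

// Hypothetical, simplified stand-in for the SDK's subscription class.
class ToySubscription<T> {
  // Step 1: register the onData callback with the current zone.
  ToySubscription(void Function(T) onData)
      : _zone = Zone.current,
        _onData = Zone.current.registerUnaryCallback<void, T>(onData);

  final Zone _zone;
  final void Function(T) _onData;

  // Step 2: run the registered callback in its zone. "Guarded" means an
  // error thrown by the callback goes to the zone's error handler.
  void sendData(T data) => _zone.runUnaryGuarded(_onData, data);
}

List<String> dispatchThroughToySubscription() {
  final log = <String>[];
  final subscription = ToySubscription<int>((value) => log.add('got $value'));
  subscription.sendData(1);
  subscription.sendData(2);
  return log;
}

void main() {
  print(dispatchThroughToySubscription()); // [got 1, got 2]
}
```

Unlike a real subscription, this toy dispatches synchronously and has no pause, cancel, or buffering state.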
Zones and the zone.registerUnaryCallback and zone.runUnaryGuarded methods are covered in detail elsewhere.
It is worth noting that, by default, a Stream delivers its events in microtasks.
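The effect of this scheduling is visible in the order of side effects. In the sketch below (recordDeliveryOrder is a name made up for this example), the code that follows add runs before the listener sees the event, because a default StreamController delivers the event later rather than during add.

```dart
import 'dart:async';

Future<List<String>> recordDeliveryOrder() async {
  final log = <String>[];
  final controller = StreamController<int>();

  controller.stream.listen((event) => log.add('event $event'));
  controller.add(1);
  // Runs first: the event has been queued but not yet delivered.
  log.add('after add');

  // Awaiting lets the pending delivery run.
  await controller.close();
  return log;
}

Future<void> main() async {
  print(await recordDeliveryOrder()); // [after add, event 1]
}
```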
A Stream can be synchronous or asynchronous (asynchronous by default), and broadcast or single-subscription.
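These variants can be compared with StreamController, which takes a sync flag and has a broadcast constructor. The sketch below is illustrative; the three function names are made up for this example.

```dart
import 'dart:async';

// sync: true delivers the event during add() instead of afterwards.
List<String> syncDelivery() {
  final log = <String>[];
  final controller = StreamController<int>(sync: true);
  controller.stream.listen((event) => log.add('event $event'));
  controller.add(1);
  log.add('after add');
  controller.close();
  return log;
}

// A broadcast stream accepts any number of listeners.
Future<List<String>> broadcastToTwoListeners() async {
  final log = <String>[];
  final controller = StreamController<int>.broadcast();
  controller.stream.listen((event) => log.add('first $event'));
  controller.stream.listen((event) => log.add('second $event'));
  controller.add(1);
  await controller.close();
  return log;
}

// A single-subscription stream throws a StateError on a second listen().
bool secondListenThrows() {
  final controller = StreamController<int>();
  controller.stream.listen((_) {});
  try {
    controller.stream.listen((_) {});
    return false;
  } on StateError {
    return true;
  }
}

Future<void> main() async {
  print(syncDelivery()); // [event 1, after add]
  print(await broadcastToTwoListeners()); // one entry per listener
  print(secondListenThrows()); // true
}
```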
Stream supports the following methods, among others:
- map(): converts each element of this stream into a new stream event.
- where(): creates a new stream from the current stream that discards elements based on a condition.
- distinct(): skips a data event if it is equal to the previous data event.
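The three methods can be chained, since each one returns a new Stream. A minimal sketch, with transformNumbers as a made-up name and an arbitrary input list:

```dart
Future<List<String>> transformNumbers() {
  return Stream.fromIterable([1, 1, 2, 3, 3, 4])
      .distinct() // drops consecutive duplicates: 1, 2, 3, 4
      .where((n) => n.isEven) // keeps even numbers: 2, 4
      .map((n) => 'item $n') // converts each number to a string
      .toList();
}

Future<void> main() async {
  print(await transformNumbers()); // [item 2, item 4]
}
```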
Using a Stream
Created through the Stream constructors
- Stream.fromFuture: creates a single-subscription stream from a Future.
- Stream.fromIterable: creates a single-subscription stream from the data of an Iterable.
- Stream.fromFutures: creates a single-subscription stream from a group of Futures.
- Stream.periodic: creates a stream that repeatedly emits events at a fixed time interval.
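A short sketch of the four constructors follows. The helper names and the 10 ms period are assumptions made for this example. Stream.fromFutures emits results in completion order, so the example does not rely on a particular order for it.

```dart
Future<List<String>> fromFutureExample() =>
    Stream.fromFuture(Future.value('a')).toList();

Future<List<int>> fromIterableExample() =>
    Stream.fromIterable([1, 2, 3]).toList();

Future<List<int>> fromFuturesExample() =>
    Stream.fromFutures([Future.value(1), Future.value(2)]).toList();

// periodic never ends on its own, so take(3) limits it to three events.
Future<List<int>> periodicExample() =>
    Stream<int>.periodic(const Duration(milliseconds: 10), (i) => i)
        .take(3)
        .toList();

Future<void> main() async {
  print(await fromFutureExample()); // [a]
  print(await fromIterableExample()); // [1, 2, 3]
  print(await fromFuturesExample()); // contains 1 and 2
  print(await periodicExample()); // [0, 1, 2]
}
```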
Created by StreamController
```dart
import 'dart:async';

void main() {
  // 1. Create a StreamController object of any type.
  StreamController streamController = StreamController(
      onListen: () => print('listen'),
      onCancel: () => print('cancel'),
      onPause: () => print('pause'),
      onResume: () => print('resume'));

  // 2. Add event data of any type through the sink.
  streamController.sink.add(100);
  streamController.sink.add(100.121212);
  streamController.sink.add('THIS IS STRING');
  // onDone is called back only if close() is called manually to send a done event.
  streamController.sink.close();

  // 3. Register a listener.
  streamController.stream.listen((event) => print(event),
      onDone: () => print('is done'),
      onError: (error, stacktrace) => print('is error, errMsg: $error'),
      cancelOnError: true);
}
```
Created via async*
Note that async* is a Dart keyword indicating that the function returns a Stream. Inside such a function, yield emits a single value, while yield* is followed by a stream and forwards all of its events.
```dart
void main() {
  asyncGet(31).listen((event) {
    print(event);
  });
}

Stream<String> asyncGet(int count) async* {
  // yield* forwards every event of the mapped stream.
  yield* asyncGetString(count).map((event) => event + 'C');
}

Stream<String> asyncGetString(int count) async* {
  for (int i = 0; i < count; i++) {
    // yield emits a single value once the Future completes.
    yield await delayedGet(i);
  }
}

Future<String> delayedGet(int i) async {
  await Future.delayed(const Duration(seconds: 1));
  return i.toString() + 'B';
}
```