In this section, the target
- The Stream to create
- StreamController control
- StreamSubscription subscription
- StreamTransformer conversion
- Sink, StreamSink, and EventSink modify data
video
www.bilibili.com/video/BV1YK…
code
Github.com/ducafecat/f…
The body of the
Core classes
The name of the | instructions |
---|---|
Stream | Stream of events or pipes |
StreamController | Incident manager |
StreamSubscription | Manage event subscriptions, such as cacenl and Pause |
StreamSink | Stream Sink entry, such as Add, addStream, etc |
EventSink | Event Sink entrance |
StreamTransformer | Flow switch |
Ready to function
// Prints a list of streams
printStream(Stream<Object> stream) async {
await for (var val in stream) {
print(val); }}// async function I
Future<int> funi = Future(() {
return 100;
});
// asynchronous function ii
Future<int> funii = Future(() {
return 200;
});
Copy the code
The stream to create
Delay interval
periodic() async {
Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), (val) => val);
await printStream(stream);
}
Copy the code
Futrue data source
fromFuture() async {
Stream<int> stream = Stream<int>.fromFuture(funi);
await printStream(stream);
}
Copy the code
Futrues Multiple data sources
fromFutures() async {
Stream<int> stream = Stream<int>.fromFutures([
funi,
funii,
]);
await printStream(stream);
}
Copy the code
The stream to monitor
A one-to-one
listen() async {
Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), (val) => val);
stream.listen(
(event) {
print(event);
},
onError: (err) {
print(err);
},
onDone: () {},
cancelOnError: true,); }Copy the code
radio
boardcast() async {
Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), (val) => val)
.asBroadcastStream();
stream.listen((event) {
print(event);
});
stream.listen((event) {
print(event);
});
}
Copy the code
Operation task skip
opt() async {
Stream<int> stream = Stream<int>.fromIterable([1.2.3.4.5]);
stream = stream.take(3);
// stream = stream.skip(2);
stream.listen((event) {
print(event);
});
}
Copy the code
StreamController Flow control class
A single point
scListen() async {
StreamController sc = StreamController(
onListen: () => print("onListen"),
onPause: () => print("onPause"),
onResume: () => print("onResume"),
onCancel: () => print("onCancel"),
sync: false);
// Subscribe object
StreamSubscription ss = sc.stream.listen(((event) {
print(event);
}));
sc.add(100);
/ / pause
ss.pause();
/ / recovery
ss.resume();
/ / cancel
ss.cancel();
/ / close the flow
sc.close();
}
Copy the code
radio
scBroadcast() async {
StreamController sc = StreamController.broadcast();
StreamSubscription ss1 = sc.stream.listen(print);
StreamSubscription ss2 = sc.stream.listen(print);
sc.addStream(Stream.fromIterable([1.2.3.4.5]));
await Future.delayed(Duration(seconds: 2));
sc.close();
}
Copy the code
StreamTransformer flow conversion
scTransformer() async {
StreamController sc = StreamController<int>.broadcast();
StreamTransformer stf = StreamTransformer<int.double>.fromHandlers(
handleData: (int data, EventSink sink) {
sink.add((data * 2).toDouble());
},
handleError: (error, stacktrace, sink) {
sink.addError('wrong: $error'); }, handleDone: (sink) { sink.close(); }); Stream stream = sc.stream.transform(stf); stream.listen(print);
stream.listen(print);
sc.addStream(Stream<int>.fromIterable([1.2.3.4.5]));
await Future.delayed(Duration(seconds: 2));
sc.close();
}
Copy the code
perform
main(List<String> args) async {
print('--- start ---');
// await periodic();
// await fromFuture();
// await fromFutures();
// await listen();
// await boardcast();
// await opt();
// await scListen();
// await scBroadcast();
// await scTransformer();
print('--- end ---');
}
Copy the code
The elder brother of the © cat
ducafecat.tech
ducafecat.gitee.io