Goals of this section

  • Creating a Stream
  • Controlling a stream with StreamController
  • Subscribing with StreamSubscription
  • Converting events with StreamTransformer
  • Modifying data with Sink, StreamSink, and EventSink

video

www.bilibili.com/video/BV1YK…

code

Github.com/ducafecat/f…

Main text

Core classes

Name: Description
Stream: a stream (pipe) of events
StreamController: event manager
StreamSubscription: manages an event subscription, e.g. cancel and pause
StreamSink: stream sink (input) entry point, e.g. add, addStream
EventSink: event sink (input) entry point
StreamTransformer: stream transformer

Preparation: helper functions

/// Prints every event emitted by [stream], one event per `print` call.
///
/// The returned future completes once [stream] has finished.
printStream(Stream<Object> stream) async {
  await stream.forEach(print);
}

// async function I
/// A future that completes with `100`.
Future<int> funi = Future<int>(() => 100);

/// Asynchronous value II: a future that completes with `200`.
Future<int> funii = Future<int>(() => 200);
Copy the code

Creating streams

Periodic interval

/// Prints a counter that ticks once per second.
///
/// A periodic stream does not close on its own, so the `await` below is not
/// expected to complete.
periodic() async {
  final ticks = Stream<int>.periodic(
    const Duration(seconds: 1),
    (count) => count,
  );
  await printStream(ticks);
}
Copy the code

Future data source

/// Wraps the single future [funi] in a stream and prints the event it yields.
fromFuture() async {
  final single = Stream<int>.fromFuture(funi);
  await printStream(single);
}
Copy the code

Multiple Futures as data sources

/// Builds one stream from the futures [funi] and [funii] and prints every
/// event it yields.
fromFutures() async {
  final sources = <Future<int>>[funi, funii];
  final merged = Stream<int>.fromFutures(sources);
  await printStream(merged);
}
Copy the code

Listening to a stream

Single subscription (one-to-one)

/// Attaches a single listener to a one-second periodic stream.
///
/// Every event is printed; an error is printed too, and because
/// `cancelOnError` is `true` the subscription is cancelled after the first
/// error. The subscription is not kept, so nothing here ever cancels it
/// otherwise.
listen() async {
  Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), (val) => val);
  stream.listen(
    print,
    onError: (err) {
      print(err);
    },
    // Intentionally empty: the demo has nothing to do when the stream ends.
    onDone: () {},
    cancelOnError: true,
  );
}

Broadcast

/// Shares one periodic source between two listeners via a broadcast stream.
///
/// Both listeners print each event they receive.
boardcast() async {
  final shared = Stream<int>.periodic(
    const Duration(seconds: 1),
    (count) => count,
  ).asBroadcastStream();

  // A broadcast stream accepts more than one listener.
  shared
    ..listen(print)
    ..listen(print);
}
Copy the code

Operations: take and skip

/// Demonstrates stream operators: prints only the first three of five values.
///
/// The listener is attached but not awaited, so the values are printed after
/// this function has returned.
opt() async {
  Stream<int> stream = Stream<int>.fromIterable([1, 2, 3, 4, 5]);
  stream = stream.take(3);
  // Alternative operator to try instead of `take`:
  // stream = stream.skip(2);

  stream.listen(print);
}
Copy the code

StreamController: the stream control class

Single subscription

/// Walks a single-subscription [StreamController] through its lifecycle:
/// listen, add, pause, resume, cancel, close.
///
/// Each lifecycle callback prints its own name when the controller invokes it.
scListen() async {
  final sc = StreamController<int>(
      onListen: () => print("onListen"),
      onPause: () => print("onPause"),
      onResume: () => print("onResume"),
      onCancel: () => print("onCancel"),
      sync: false);

  // Subscribe to the controller's stream.
  final StreamSubscription<int> ss = sc.stream.listen(print);

  // NOTE(review): the subscription is cancelled below without yielding to the
  // event loop, so this event is probably never delivered — confirm.
  sc.add(100);

  // Pause
  ss.pause();
  // Resume
  ss.resume();
  // Cancel
  await ss.cancel();

  // Close the stream
  await sc.close();
}
Copy the code

Broadcast

/// Feeds five integers through a broadcast [StreamController] that has two
/// listeners, each of which prints what it receives.
scBroadcast() async {
  final sc = StreamController<int>.broadcast();

  // Two listeners on the same broadcast stream.
  sc.stream.listen(print);
  sc.stream.listen(print);

  // Wait for the source to be fully forwarded before closing, instead of
  // sleeping for a fixed time and hoping it has finished.
  await sc.addStream(Stream<int>.fromIterable([1, 2, 3, 4, 5]));
  await sc.close();
}

Copy the code

StreamTransformer: stream transformation

/// Doubles each integer from a broadcast controller and emits it as a
/// `double`, using a [StreamTransformer] built from handlers.
///
/// The transformed stream is listened to twice; each listener prints the
/// values it receives.
scTransformer() async {
  final sc = StreamController<int>.broadcast();

  final stf = StreamTransformer<int, double>.fromHandlers(
    handleData: (data, sink) {
      sink.add((data * 2).toDouble());
    },
    handleError: (error, stacktrace, sink) {
      sink.addError('wrong: $error');
    },
    handleDone: (sink) {
      sink.close();
    },
  );

  final stream = sc.stream.transform(stf);
  stream.listen(print);
  stream.listen(print);

  // Wait for the source to be fully forwarded before closing, instead of
  // sleeping for a fixed time and hoping it has finished.
  await sc.addStream(Stream<int>.fromIterable([1, 2, 3, 4, 5]));
  await sc.close();
}
Copy the code

Running


/// Entry point: prints a start marker and an end marker.
///
/// Every demo call is commented out; un-comment the one you want to run.
main(List<String> args) async {
  print('--- start ---');

  // Creating streams
  // await periodic();
  // await fromFuture();
  // await fromFutures();

  // Listening and stream operations
  // await listen();
  // await boardcast();
  // await opt();

  // StreamController and StreamTransformer
  // await scListen();
  // await scBroadcast();
  // await scTransformer();

  print('--- end ---');
}

Copy the code

© Cat Brother (ducafecat)

ducafecat.tech

ducafecat.gitee.io