This is the 8th day of my participation in the August More Text Challenge. For details, see: August More Text Challenge juejin.cn/post/698796…

I have used Stream data with EventChannel before. Now I want to learn flutter_bloc, which relies mainly on Stream for asynchronous processing, so first I will learn a little about what a Stream is.

A source of asynchronous data events.

Stream is the main tool for asynchronous event sequences in Dart and Flutter, and similar concepts exist in other programming languages. A Stream delivers a queue of events to its listener: listen registers the onData callback that receives data events, onError receives failures, and onDone is called when the Stream ends.
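A minimal sketch of the three callbacks, assuming a small helper stream. The names recordEvents and sampleStream are made up for this illustration and are not part of the Dart SDK:

```dart
import 'dart:async';

/// Hypothetical helper: listens to [source] and records every callback
/// that fires, in the order it fires.
Future<List<String>> recordEvents(Stream<int> source) {
  final log = <String>[];
  final finished = Completer<List<String>>();
  source.listen(
    (value) => log.add('data: $value'),
    onError: (Object error) => log.add('error: $error'),
    onDone: () {
      log.add('done');
      finished.complete(log);
    },
  );
  return finished.future;
}

/// Hypothetical sample stream: two data events, then an error.
/// Throwing inside async* emits the error and then closes the stream.
Stream<int> sampleStream() async* {
  yield 1;
  yield 2;
  throw StateError('boom');
}

Future<void> main() async {
  final log = await recordEvents(sampleStream());
  print(log); // [data: 1, data: 2, error: Bad state: boom, done]
}
```

The done event arrives last even though the stream failed, so onDone is a reasonable place for cleanup.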

1. Creating a Stream

Dart provides several ways to create a Stream.

1.1 Stream.fromFuture(Future<T> future)

Stream.fromFuture creates a new single-subscription Stream from a Future. When the Future completes, the Stream emits one data or error event and then ends with a done event.

Future<String> getData() async {
  // Simulate a slow asynchronous operation.
  await Future.delayed(Duration(seconds: 3));
  return 'Current time: ${DateTime.now()}';
}

_streamFromFuture() {
  Stream.fromFuture(getData())
      .listen((event) => print('Stream.fromFuture -> $event'))
      .onDone(() => print('Stream.fromFuture -> done'));
}

1.2 Stream.fromFutures(Iterable<Future<T>> futures)

Stream.fromFutures creates a new single-subscription Stream from a collection of Futures. Each Future contributes one data or error event, in the order the Futures complete; when all of them have completed, the Stream ends with a done event. If futures is empty, the Stream closes as soon as possible. The source makes this easy to see: a value/error handler is attached to every Future, and for an empty collection the close is scheduled as a microtask.

// ============ source ============
factory Stream.fromFutures(Iterable<Future<T>> futures) {
  _StreamController<T> controller =
      new _SyncStreamController<T>(null, null, null, null);
  int count = 0;
  var onValue = (T value) {
    if (!controller.isClosed) {
      controller._add(value);
      if (--count == 0) controller._closeUnchecked();
    }
  };
  var onError = (error, StackTrace stack) {
    if (!controller.isClosed) {
      controller._addError(error, stack);
      if (--count == 0) controller._closeUnchecked();
    }
  };
  for (var future in futures) {
    count++;
    future.then(onValue, onError: onError);
  }
  // Use schedule microtask since controller is sync.
  if (count == 0) scheduleMicrotask(controller.close);
  return controller.stream;
}

// ============ test ============
_streamFromFutures() {
  var data = [getData(), getData(), getData()];
  Stream.fromFutures(data)
      .listen((event) => print('Stream.fromFutures -> $event'))
      .onDone(() => print('Stream.fromFutures -> done'));
}
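Because each Future adds its event when it completes, the events arrive in completion order rather than in list order. A small sketch of this, where the delayed helper and the delay values are assumptions chosen for the demonstration:

```dart
/// Hypothetical helper: completes with [label] after [ms] milliseconds.
Future<String> delayed(String label, int ms) =>
    Future.delayed(Duration(milliseconds: ms), () => label);

/// Builds a Stream.fromFutures from three futures listed slowest-first
/// and collects the events in the order they are emitted.
Future<List<String>> completionOrder() {
  final futures = [
    delayed('slow', 300),
    delayed('medium', 200),
    delayed('fast', 100),
  ];
  return Stream.fromFutures(futures).toList();
}

Future<void> main() async {
  print(await completionOrder()); // [fast, medium, slow]
}
```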

1.3 Stream.fromIterable(Iterable<T> elements)

Stream.fromIterable creates a single-subscription Stream from a collection of data. It emits each element in turn and stops iterating when the subscription is cancelled or when Iterator.moveNext returns false or throws.

_streamFromIterable() {
  var data = [1, 2, '3.toString()', true, false, 6];
  Stream.fromIterable(data)
      .listen((event) => print('Stream.fromIterable -> $event'))
      .onDone(() => print('Stream.fromIterable -> done'));
}

1.4 Stream.periodic(Duration period, [T computation(int computationCount)])

Stream.periodic creates a Stream that emits events repeatedly at intervals of period. If computation is omitted, every event that onData receives is null. The Stream never ends by itself; it keeps emitting until the subscription is cancelled.

_streamFromPeriodic() {
  Duration interval = Duration(seconds: 1);
  // No computation: every event is null, so the event type must be nullable.
  Stream<int?> stream = Stream<int?>.periodic(interval);
  stream.listen((event) {
    print('Stream.periodic -> $event');
  }).onDone(() {
    print('Stream.periodic -> done');
  });
  // With computation: each event is the tick count (0, 1, 2, ...).
  Stream<int> streamData = Stream<int>.periodic(interval, (data) => data);
  streamData.listen((event) {
    print('Stream.periodic -> $event');
    if (event >= 10) {}
  }).onDone(() {
    // Never printed: a periodic stream does not emit a done event.
    print('Stream.periodic -> done');
  });
}
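Since a periodic Stream never finishes by itself, the usual way to stop it is to cancel the subscription. The following is only a sketch under assumptions: collectThenCancel is a made-up helper and the 50 ms interval is arbitrary.

```dart
import 'dart:async';

/// Hypothetical helper: listens to a periodic stream and cancels the
/// subscription after [limit] events have been received.
Future<List<int>> collectThenCancel(int limit) {
  final received = <int>[];
  final cancelled = Completer<List<int>>();
  late final StreamSubscription<int> subscription;
  subscription = Stream<int>.periodic(
    const Duration(milliseconds: 50),
    (count) => count,
  ).listen((event) {
    received.add(event);
    if (received.length == limit) {
      // cancel() stops further events. onDone is not called for a
      // cancelled subscription, so completion is signalled by hand.
      subscription.cancel();
      cancelled.complete(received);
    }
  });
  return cancelled.future;
}

Future<void> main() async {
  print(await collectThenCancel(3)); // [0, 1, 2]
}
```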

2. Basic Stream operations

2.1 Stream take(int count)

take(count) returns a Stream that forwards at most the first count data events of the source and then ends with a done event, which makes it a simple way to stop an otherwise endless Stream.periodic. If count is larger than the number of events the source emits before its own done event, the Stream simply delivers all of them.

_streamFromPeriodic() {
  Duration interval = Duration(seconds: 1);
  Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);
  streamData.take(5).listen((event) {
    print('Stream.periodic -> $event');
  }).onDone(() {
    print('Stream.periodic -> done');
  });
}

_streamFromIterable() {
  var data = [1, 2, '3.toString()', true, false, 6];
  Stream.fromIterable(data)
      .take(8)
      .listen((event) => print('Stream.fromIterable -> $event'))
      .onDone(() => print('Stream.fromIterable -> done'));
}

2.2 Stream takeWhile(bool test(T element))

takeWhile achieves the same effect with a predicate: test returns a bool for each event, and the Stream ends the first time it returns false.

_streamFromPeriodic() {
  Duration interval = Duration(seconds: 1);
  Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);
  streamData.takeWhile((element) {
    print('Stream.periodic.takeWhile -> $element');
    return element <= 5;
  }).listen((event) {
    print('Stream.periodic -> $event');
  }).onDone(() {
    print('Stream.periodic -> done');
  });
}

2.3 Stream where(bool test(T event))

where creates a new Stream from the current one that discards every event for which test returns false. I think of it as similar to a database query: only the events that meet the condition pass through. where can be chained multiple times.

Duration interval = Duration(seconds: 1);
Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);
streamData.takeWhile((element) {
  print('Stream.periodic.takeWhile -> $element');
  return element <= 5;
}).where((event) {
  print('Stream.periodic.where -> $event');
  return event > 3;
}).listen((event) {
  print('Stream.periodic -> $event');
}).onDone(() {
  print('Stream.periodic -> done');
});

2.4 Stream distinct([bool equals(T previous, T next)])

As I understand it, distinct removes duplicates only between adjacent events: an event is dropped if it equals the one immediately before it.

var data = [1, 2, '3.toString()', true, true, false, true, 6];
Stream.fromIterable(data)
    .distinct()
    .listen((event) => print('Stream.fromIterable -> $event'))
    .onDone(() => print('Stream.fromIterable -> done'));
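To make the adjacent-only behaviour concrete, here is a small sketch that compares distinct with toSet on the same data. The helper names adjacentDistinct and allDistinct and the sample list are made up for the illustration:

```dart
/// Hypothetical helper: drops an event only when it equals the event
/// immediately before it.
Future<List<int>> adjacentDistinct(List<int> data) =>
    Stream.fromIterable(data).distinct().toList();

/// Hypothetical helper: drops every repeated value, wherever it appears.
Future<Set<int>> allDistinct(List<int> data) =>
    Stream.fromIterable(data).toSet();

Future<void> main() async {
  final data = [1, 1, 2, 1, 3, 3, 1];
  print(await adjacentDistinct(data)); // [1, 2, 1, 3, 1]
  print(await allDistinct(data)); // {1, 2, 3}
}
```

The value 1 survives three times with distinct because its repetitions are not next to each other.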

2.5 Stream skip(int count)

skip ignores the first count data events that reach it and forwards the rest; in the example these are the first two events that passed the where filter.

Duration interval = Duration(seconds: 1);
Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);
streamData.takeWhile((element) {
  print('Stream.periodic.takeWhile -> $element');
  return element <= 6;
}).where((event) {
  print('Stream.periodic.where -> $event');
  return event > 2;
}).skip(2).listen((event) {
  print('Stream.periodic -> $event');
}).onDone(() {
  print('Stream.periodic -> done');
});

2.6 Stream skipWhile(bool test(T element))

skipWhile skips events for as long as test returns true. The first event for which test returns false is delivered, together with every event after it, and test is not consulted again. In the example it runs on the events that already passed where.

Duration interval = Duration(seconds: 1);
Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);
streamData.takeWhile((element) {
  print('Stream.periodic.takeWhile -> $element');
  return element <= 6;
}).where((event) {
  print('Stream.periodic.where -> $event');
  return event > 2;
}).skipWhile((element) {
  print('Stream.periodic.skipWhile -> $element');
  return element <= 4;
}).listen((event) {
  print('Stream.periodic -> $event');
}).onDone(() {
  print('Stream.periodic -> done');
});
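skipWhile and where look alike when the data is increasing, as in the periodic example, but they differ on unordered data. A sketch with made-up sample data and hypothetical helper names:

```dart
/// Hypothetical helper: skips leading events while they are <= 4,
/// then lets everything through.
Future<List<int>> skipLeadingSmall(List<int> data) =>
    Stream.fromIterable(data).skipWhile((e) => e <= 4).toList();

/// Hypothetical helper: tests every event and keeps only those > 4.
Future<List<int>> keepLarge(List<int> data) =>
    Stream.fromIterable(data).where((e) => e > 4).toList();

Future<void> main() async {
  final data = [1, 2, 5, 3, 6, 1];
  print(await skipLeadingSmall(data)); // [5, 3, 6, 1]
  print(await keepLarge(data)); // [5, 6]
}
```

Once 5 fails the skipWhile test, the later 3 and 1 are delivered even though they would have matched it.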

2.7 Stream<S> map<S>(S convert(T event))

map creates a new Stream from the current one by applying convert to each event. onData then receives the converted values.

Duration interval = Duration(seconds: 1);
Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);
streamData.takeWhile((element) {
  print('Stream.periodic.takeWhile -> $element');
  return element <= 6;
}).where((event) {
  print('Stream.periodic.where -> $event');
  return event > 2;
}).skipWhile((element) {
  print('Stream.periodic.skipWhile -> $element');
  return element <= 4;
}).map((event) {
  print('Stream.periodic.map -> $event -> ${event * 100}');
  return event * 100;
}).listen((event) {
  print('Stream.periodic -> $event');
}).onDone(() {
  print('Stream.periodic -> done');
});

2.8 Stream<S> expand<S>(Iterable<S> convert(T element))

expand creates a new Stream from the current one by converting each event into a collection of zero or more events. onData receives every element of each collection in turn.

Duration interval = Duration(seconds: 1);
Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);
streamData.takeWhile((element) {
  print('Stream.periodic.takeWhile -> $element');
  return element <= 6;
}).where((event) {
  print('Stream.periodic.where -> $event');
  return event > 2;
}).skipWhile((element) {
  print('Stream.periodic.skipWhile -> $element');
  return element <= 4;
}).expand((element) {
  print('Stream.periodic.expand -> $element -> ${element * 10} -> ${element * 100}');
  return [element, element * 10, element * 100];
}).listen((event) {
  print('Stream.periodic -> $event');
}).onDone(() {
  print('Stream.periodic -> done');
});
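The difference between map and expand is easiest to see when both receive the same convert function. A sketch with made-up helper names and sample data:

```dart
/// Hypothetical helper: map emits one event per input, so each event
/// here is a whole list.
Future<List<List<int>>> mapped(List<int> data) =>
    Stream.fromIterable(data).map((e) => [e, e * 10]).toList();

/// Hypothetical helper: expand flattens each returned collection into
/// separate events.
Future<List<int>> expanded(List<int> data) =>
    Stream.fromIterable(data).expand((e) => [e, e * 10]).toList();

Future<void> main() async {
  print(await mapped([1, 2])); // [[1, 10], [2, 20]]
  print(await expanded([1, 2])); // [1, 10, 2, 20]
}
```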

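2.9 Future<int> get length

length completes, once the Stream is done, with the number of data events it emitted; in the example that is the number of events that passed the preceding filters. The _streamLength helper also exercises isEmpty, isBroadcast, first, and last through its index argument.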

_streamLength(index) async {
  Duration interval = Duration(seconds: 1);
  Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);
  streamData = streamData.takeWhile((element) {
    print('Stream.periodic.takeWhile -> $element');
    return element <= 6;
  }).where((event) {
    print('Stream.periodic.where -> $event');
    return event > 2;
  }).skipWhile((element) {
    print('Stream.periodic.skipWhile -> $element');
    return element <= 4;
  });
  switch (index) {
    case 1:
      var length = await streamData.length;
      print('Stream.length -> $length');
      break;
    case 2:
      var isEmpty = await streamData.isEmpty;
      print('Stream.isEmpty -> $isEmpty');
      break;
    case 3:
      // isBroadcast is a plain bool getter, so no await is needed.
      var isBroadcast = streamData.isBroadcast;
      print('Stream.isBroadcast -> $isBroadcast');
      break;
    case 4:
      var first = await streamData.first;
      print('Stream.first -> $first');
      break;
    case 5:
      var last = await streamData.last;
      print('Stream.last -> $last');
      break;
  }
}

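2.10 Future<bool> get isEmpty

isEmpty reports whether the Stream emits no data events. It completes with false as soon as the first event arrives and then stops listening, or with true if the Stream finishes without emitting any; in the example, it checks whether any event passed the filters.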

_streamLength(2);

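2.11 Future<T> get first

first completes with the first data event of the Stream, here the first event that passed the filters, and then stops listening. If the Stream ends without emitting anything, the Future completes with an error.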

_streamLength(4);

2.12 Future<T> get last

last completes with the last data event once the Stream is done, here the last event that passed the filters.

_streamLength(5);
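One practical consequence of these getters: isEmpty and first can complete on a Stream that never ends, while length and last need a done event. A sketch using an endless periodic stream; the ticks helper and its 50 ms interval are assumptions made for the demonstration:

```dart
/// Hypothetical helper: an endless stream 1, 2, 3, ... every 50 ms.
Stream<int> ticks() =>
    Stream<int>.periodic(const Duration(milliseconds: 50), (n) => n + 1);

Future<void> main() async {
  // Both complete after the first tick and then cancel the stream.
  print(await ticks().isEmpty); // false
  print(await ticks().first); // 1

  // length and last wait for the done event, so the endless stream
  // is bounded with take(3) first; without it they would never complete.
  print(await ticks().take(3).length); // 3
  print(await ticks().take(3).last); // 3
}
```

Each call to ticks() builds a fresh Stream, because a single-subscription Stream can only be listened to once.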

2.13 Future<List<T>> toList()

toList collects all data events into a List. The operation is asynchronous: the Future completes only after the Stream is done.

_streamToList() async {
  var data = [1, 2, '3.toString()', true, true, false, true, 6];
  Stream stream = Stream.fromIterable(data).distinct();
  List list = await stream.toList();
  if (list != null) {
    print('Stream.toList -> ${list}');
    for (int i = 0; i < list.length; i++) {
      print('Stream.toList -> ${i + 1} -> ${list[i]}');
    }
  }
}

2.14 Future<Set<T>> toSet()

toSet collects all data events into a Set once the Stream is done; because it is a Set, duplicate values are dropped.

_streamToSet() async {
  var data = [1, 2, '3.toString()', true, true, false, true, 6];
  Stream stream = Stream.fromIterable(data);
  Set set = await stream.toSet();
  if (set != null) {
    print('Stream.toSet -> ${set}');
  }
}

2.15 Future forEach(void action(T element))

forEach calls action for each data event; it is another way of listening to a Stream. The returned Future completes when the Stream is done.

_streamForEach() {
  var data = [1, 2, '3.toString()', true, true, false, true, 6];
  Stream stream = Stream.fromIterable(data).distinct();
  stream.forEach((element) => print('Stream.forEach -> $element'));
}
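Because forEach returns a Future, it can be awaited to run code only after every event has been handled. A minimal sketch, with forEachThenReport as a made-up helper name:

```dart
/// Hypothetical helper: processes every event with forEach, then adds
/// one more entry after the returned Future has completed.
Future<List<String>> forEachThenReport() async {
  final log = <String>[];
  await Stream.fromIterable([1, 2, 3]).forEach((e) => log.add('item $e'));
  log.add('after forEach');
  return log;
}

Future<void> main() async {
  print(await forEachThenReport());
  // [item 1, item 2, item 3, after forEach]
}
```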


My experiments with Stream have only just begun, and there are many methods I have not tried yet. My understanding of Stream is still quite basic; if anything here is wrong, corrections are welcome!

Source: Young Monk Atze