preface

There are two ways that Flutter can handle asynchronous operations: a Future for a single asynchronous operation and a Stream for a continuous asynchronous operation. For example, if you pour water into a cup, fill a cup with water into a Future. If you continuously fill multiple cups with water, you will see a Stream.


The Stream,

Stream is an abstract class that represents the source of a sequence of asynchronous data. It is a way of generating sequential events, either data events or error events, as well as completion events at the end of the stream.

abstract class Stream<T{

  Stream();

}

Copy the code

Stream consists of single subscription streams and broadcast streams.

A single-subscription stream allows only one listener to be set up until it has finished sending events, and events will not start to be generated until the listener is set on the stream, and events will stop being sent when the listener is cancelled. Even if the first listener is cancelled, other listeners are not allowed to be set on a single subscription stream. Broadcast streams allow you to set up multiple listeners or add new listeners after canceling the previous one.

A Stream can be synchronous or asynchronous.

The difference is that synchronous flows send events to StreamSubscription, a stream listener, immediately upon execution of add, addError, or close methods, whereas asynchronous flows always send events after code in the event queue completes execution.

Streamfamily

StreamController

Flow with control flow methods. You can send data, errors, and completion events to its stream, and you can check to see if the data stream has been paused and if there are listeners. The sync parameter determines whether the flow is synchronous or asynchronous.

abstract class StreamController<Timplements StreamSink<T{

  Stream<T> get stream;

  / / /...

}



StreamController _streamController = StreamController(

  onCancel: () {},

  onListen: () {},

  onPause: () {},

  onResume: () {},

  syncfalse.

);

Copy the code

StreamSink

Stream event entry. Provide add, addError, and addStream methods to send events to streams.

abstract class StreamSink<Simplements EventSink<S>, StreamConsumer<S{

  Future close();

  / / /...

  Future get done;

}

Copy the code

StreamSubscription

Listeners for streams. Provides cacenL, pause, resume and other methods for management.

abstract class StreamSubscription<T{

  / / /...

}



StreamSubscription subscription = StreamController().stream.listen(print);

subscription.onDone(() => print('done'));

Copy the code

StreamBuilder

Widgets that use streaming data to render UI interfaces.

StreamBuilder(

  / / data flow

  stream: stream,

  // Initial data

  initialData: 'loading... '.

  builder: (context, AsyncSnapshot snapshot) {

    // The AsyncSnapshot object is a data snapshot that caches the current data and status

    // snapshot.connectionState

    // snapshot.data

    if (snapshot.hasData) {

      Map data = snapshot.data;

      return Text(data),

    }

    return CircularProgressIndicator();

  },

)

Copy the code

Create a Stream

There are several ways to create streams in Dart

  1. Generate a new stream from an existing oneStream, the use ofmap.where.takeWhileMethods.
/ / integer flow

Stream<int> intStream = StreamController<int>().stream;

/ / the even flow

Stream<int> evenStream = intStream.where((int n) => n.isEven);

/ / two times the flow

Stream<int> doubleStream = intStream.map((int n) => n * 2);

// Streams with numbers greater than 10

Stream<int> biggerStream = intStream.takeWhile((int n) => n > 10);

Copy the code
  1. useasync*Function.
Stream<int> countStream(int to) async* {

  for (int i = 1; i <= to; i++) {

    yield i;

  }

}



Stream stream = countStream(10);

stream.listen(print);

Copy the code
  1. useStreamController.
StreamController<Map> _streamController = StreamController(

  onCancel: () {},

  onListen: () {},

  onPause: () {},

  onResume: () {},

  syncfalse.

);



Stream _stream = _streamController.stream;

Copy the code
  1. useFutureObject to generate
Future<int> _delay(int seconds) async {

  await Future.delayed(Duration(seconds: seconds));

  return seconds;

}



List<Future> futures = [];

for (int i = 0; i < 10; i++) {

  futures.add(_delay(3));

}



Stream _futuresStream = Stream.fromFutures(futures);

Copy the code

Application of the Stream

Stream Counter

Change the default project of Flutter to a Stream implementation


import 'dart:async';

import 'package:flutter/material.dart';



class StreamCounter extends StatefulWidget {

  @override

  _StreamCounterState createState() => _StreamCounterState();

}



class _StreamCounterState extends State<StreamCounter{

  // Create a StreamController

  StreamController<int> _counterStreamController = StreamController<int> (

    onCancel: () {

      print('cancel');

    },

    onListen: () {

      print('listen');

    },

  );



  int _counter = 0;

  Stream _counterStream;

  StreamSink _counterSink;



  // Use StreamSink to send events to the Stream, and close the Stream when _counter is greater than 9.

  void _incrementCounter() {

    if (_counter > 9) {

      _counterSink.close();

      return;

    }

    _counter++;

    _counterSink.add(_counter);

  }



  // Actively close the stream

  void _closeStream() {

    _counterStreamController.close();

  }



  @override

  void initState() {

    super.initState();

    _counterSink = _counterStreamController.sink;

    _counterStream = _counterStreamController.stream;

  }



  @override

  void dispose() {

    super.dispose();

    _counterSink.close();

    _counterStreamController.close();

  }



  @override

  Widget build(BuildContext context) {

    return Scaffold(

      appBar: AppBar(

        title: Text('Stream Counter'),

      ),

      body: Center(

        child: Column(

          mainAxisAlignment: MainAxisAlignment.center,

          children: <Widget>[

            Text('You have pushed the button this many times:'),

            // Use StreamBuilder to display and update UI

            StreamBuilder<int> (

              stream: _counterStream,

              initialData: _counter,

              builder: (context, snapshot) {

                if (snapshot.connectionState == ConnectionState.done) {

                  return Text(

                    'Done'.

                    style: Theme.of(context).textTheme.bodyText2,

                  );

                }



                int number = snapshot.data;

                return Text(

                  '$number'.

                  style: Theme.of(context).textTheme.bodyText2,

                );

              },

            ),

].

        ),

      ),

      floatingActionButton: Row(

        mainAxisAlignment: MainAxisAlignment.center,

        children: <Widget>[

          FloatingActionButton(

            onPressed: _incrementCounter,

            tooltip: 'Increment'.

            child: Icon(Icons.add),

          ),

          SizedBox(width: 24.0),

          FloatingActionButton(

            onPressed: _closeStream,

            tooltip: 'Close'.

            child: Icon(Icons.close),

          ),

].

      ),

    );

  }

}



Copy the code

NetWork Status

To monitor the network connection status of mobile phones, first add connectivity plug-in

dependencies:

connectivity: ^ 0.4.8 + 2

Copy the code

import 'dart:async';

import 'package:connectivity/connectivity.dart';

import 'package:flutter/material.dart';



class NetWorkStatus extends StatefulWidget {

  @override

  _NetWorkStatusState createState() => _NetWorkStatusState();

}



class _NetWorkStatusState extends State<NetWorkStatus{

  StreamController<ConnectivityResult> _streamController = StreamController();

  StreamSink _streamSink;

  Stream _stream;

  String _result;



  void _checkStatus() async {

    final ConnectivityResult result = await Connectivity().checkConnectivity();



    if (result == ConnectivityResult.mobile) {

      _result = 'mobile';

    } else if (result == ConnectivityResult.wifi) {

      _result = 'wifi';

    } else if (result == ConnectivityResult.none) {

      _result = 'none';

    }



    setState(() {});

  }



  @override

  void initState() {

    super.initState();

    _stream = _streamController.stream;

    _streamSink = _streamController.sink;

    _checkStatus();

    Connectivity().onConnectivityChanged.listen(

      (ConnectivityResult result) {

        _streamSink.add(result);

      },

    );

  }



  @override

  dispose() {

    super.dispose();

    _streamSink.close();

    _streamController.close();

  }



  @override

  Widget build(BuildContext context) {

    return Scaffold(

      appBar: AppBar(

        title: Text('Network Status'),

      ),

      body: Center(

        child: StreamBuilder<ConnectivityResult>(

          stream: _stream,

          builder: (context, AsyncSnapshot snapshot) {

            if (snapshot.hasData) {

              if (snapshot.data == ConnectivityResult.mobile) {

                _result = 'mobile';

              } else if (snapshot.data == ConnectivityResult.wifi) {

                _result = 'wifi';

              } else if (snapshot.data == ConnectivityResult.none) {

                return Text('Not connected yet');

              }

            }



            if (_result == null) {

              return CircularProgressIndicator();

            }



            return ResultText(_result);

          },

        ),

      ),

    );

  }

}



class ResultText extends StatelessWidget {

  final String result;



  const ResultText(this.result);



  @override

  Widget build(BuildContext context) {

    return RichText(

      text: TextSpan(

        style: TextStyle(color: Colors.black),

        text: 'In use'.

        children: [

          TextSpan(

            text: $result '.

            style: TextStyle(

              color: Colors.red,

              fontSize: 20.0.

              fontWeight: FontWeight.bold,

            ),

          ),

          TextSpan(text: 'Linked network'),

].

      ),

    );

  }

}



Copy the code

Random Article

Request network data creation flow

dependencies:

dio: ^ 3.0.9

flutter_html: ^ 0.11.1

Copy the code

import 'dart:async';



import 'package:dio/dio.dart';

import 'package:flutter/material.dart';

import 'package:flutter_html/flutter_html.dart';



class RandomArticle extends StatefulWidget {

  @override

  _RandomArticleState createState() => _RandomArticleState();

}



class _RandomArticleState extends State<RandomArticle{

  static Dio _dio = Dio(

    BaseOptions(baseUrl: 'https://interface.meiriyiwen.com'),

  );



  static Future<Map> _getArticle() async {

    Response response = await _dio.get(

      '/article/random'.

      queryParameters: {"dev"1},

    );



    final data = response.data['data'];

    return data;

  }



  Stream<Map> _futuresStream;



  @override

  void initState() {

    List<Future<Map>> futures = [];

    for (int i = 0; i < 10; i++) {

      / / add the Future

      futures.add(_getArticle());

    }



    / / generated Stream

    _futuresStream = Stream<Map>.fromFutures(futures);

    super.initState();

  }



  @override

  Widget build(BuildContext context) {

    return Scaffold(

      appBar: AppBar(title: Text('Random Article')),

      body: SingleChildScrollView(

        child: Center(

          child: StreamBuilder<Map> (

            stream: _futuresStream,

            builder: (context, AsyncSnapshot snapshot) {

              if (snapshot.hasData) {

                Map article = snapshot.data;



                return Container(

                  child: Column(

                    children: <Widget>[

                      SizedBox(height: 24.0),

                      Text(

                        article['title'].

                        style: TextStyle(fontSize: 24.0),

                      ),

                      Padding(

                        padding: const EdgeInsets.only(

                          top: 12.0.

                          left: 12.0.

                          right: 12.0.

                          bottom: 60.0.

                        ),

                        child: Html(

                          data: article['content'].

                        ),

                      ),

].

                  ),

                );

              }

              return CircularProgressIndicator();

            },

          ),

        ),

      ),

    );

  }

}



Copy the code

Broadcast Stream

Using broadcast streams


import 'dart:async';



import 'package:flutter/material.dart';



class BroadcastStream extends StatefulWidget {

  @override

  _BroadcastStreamState createState() => _BroadcastStreamState();

}



class _BroadcastStreamState extends State<BroadcastStream{

  StreamController<int> _streamController = StreamController<int>.broadcast();

  StreamSubscription _subscription1;

  StreamSubscription _subscription2;

  StreamSubscription _subscription3;



  int _count = 0;

  int _s1 = 0;

  int _s2 = 0;

  int _s3 = 0;



  @override

  void initState() {

    _subscription1 = _streamController.stream.listen((n) {

      setState(() {

        _s1 += 1;

      });

    });



    _subscription2 = _streamController.stream.listen((n) {

      setState(() {

        _s2 += 2;

      });

    });



    _subscription3 = _streamController.stream.listen((n) {

      setState(() {

        _s3 -= 1;

      });

    });



    super.initState();

  }



  void _add() {

    if (_count > 10) {

      // Stop the first subscription when it is greater than 10

      _subscription1.cancel();

    }

    _count++;

    _streamController.add(_count);

  }



  @override

  void dispose() {

    super.dispose();

    _streamController.close();

    _subscription1.cancel();

    _subscription2.cancel();

    _subscription3.cancel();

  }



  @override

  Widget build(BuildContext context) {

    return Scaffold(

      appBar: AppBar(

        title: Text('Broadcast Stream'),

      ),

      body: Container(

        width: double.infinity,

        height: MediaQuery.of(context).size.height,

        child: Column(

          mainAxisAlignment: MainAxisAlignment.center,

          crossAxisAlignment: CrossAxisAlignment.center,

          children: [

            Text('Count: $_count'),

            SizedBox(height: 12.0),

            Text('S1: $_s1'),

            SizedBox(height: 12.0),

            Text('S2: $_s2'),

            SizedBox(height: 12.0),

            Text('S3: $_s3'),

            SizedBox(height: 12.0),

            FloatingActionButton(

              onPressed: _add,

              child: Icon(Icons.plus_one),

            ),

].

        ),

      ),

    );

  }

}



Copy the code

conclusion

Stream is one way to handle asynchronous programming by providing an asynchronous sequence of events and sending it when you’re ready to receive it. Streams in Dart are synchronous and asynchronous, as well as single-subscription and broadcast streams, and there are several ways to create streams.

reference

Asynchronous programming: Use stream

Use Stream in Dart

Understand Stream thoroughly

Building a Widget with StreamBuilder

StreamBuilder (Flutter of the Week widget)

Dart Streams – Focus on Flutter

This article is formatted using MDNICE

PS: [Guangzhou] Welcome to contact me.