Giới thiệu Stream
Lập trình bất đồng bộ là một thuật ngữ phổ biến trong lập trình. Với ngôn ngữ lập trình Dart, chúng ta đã quen với Future class cung cấp một tính toán không được hoàn thành ngay lập tức và sẽ thông báo kết quả khi sẵn sàng, việc này chỉ thực hiện đúng 1 lần. Thay vào đó Stream là một luồng các sự kiện bất đồng bộ và cho phép chúng ta lắng nghe các sự kiện này khi chúng được bắn ra từ luồng.
A stream is a sequence of asynchronous events.
Phân loại Stream
-
Có 2 loại Stream
- Single subscription streams
- Broadcast streams
Hiểu cơ bản là Single subscription streams chỉ có thể lắng nghe 1 lần duy nhất. (Nếu có sự kiện lắng nghe từ một nơi nào khác hoặc lần thứ 2 trở lên thì sẽ báo lỗi). Trái lại Broadcast streams có thể lắng nghe bất kỳ đâu.
//Sử dụng controller //**Single subscription streams StreamController streamController = StreamController(); Stream stream = streamController.stream; //Broadcast streams- StreamController broadcastStreamController = StreamController.broadcast(); Stream stream = broadcastStreamController .stream; //Sử dụng async* Stream<int> countStream(int to) async* { for (int i = 1; i <= to; i++) { yield i; } } //yield -> bắn ra một event //yield* -> bắn ra 1 stream**
Các phương thức
- Sức mạnh của Stream không chỉ giúp việc lập trình bất đồng bộ dễ dàng mà nó còn có các bộ method hổ trợ rất mạnh mẽ như filter, transform…
- Xêm thêm tại https://dart.dev/tutorials/language/streams
- Thư viện support: extension cho stream Rxdart
- Tham khảo https://reactivex.io/ để tìm hiểu về Reactivex/Functional reactive programming để tìm hiểu về cơ chế, cách hoạt động, cách sử dụng (support rất nhiều ngôn ngữ)
Tại sao nên sử dụng Stream
- Stream là một thư viện/api core của Dart.
- → Dễ dàng tạo ra các Plugin (Tách nhỏ repo)
- → Nắm vững công nghệ
- Stream phù hợp với phong cách *làm mới giao diện (declarative) của Flutter. (***so với cách Imperative ở Native) và mô hình MVVM.
- Giảm phụ thuộc vào các thư viện của các bên thứ 3.
- → Tránh lỗi khi thay đổi version
- → Giảm app size
- Một số thư viện như bloc, getx có cơ chế phụ thuộc vào Stream.
Xây dựng giao diện bằng StreamBuilder
StreamBuilder lắng nghe sự thay đổi của Stream và làm mới lại giao diện.
Để sử dụng StreamBuilder cần gọi
const StreamBuilder({ Key? key, Stream<T>? stream, T? initialData, required AsyncWidgetBuilder<T> builder,
})
Ý nghĩa Parameters:
T? initialData: giá trị mặc định, nếu không truyền vào thì coi như chưa nhận được dữ liệu từ Stream Stream<T>? stream: truyền Stream cần lắng nghe và được xử lý ở hàm builder required AsyncWidgetBuilder<T> builder: Xây dựng giao diện được thiết lập ở đây
builder: ( BuildContext context, AsyncSnapshot<int> snapshot, ) { //return Widget here;
} class AsyncSnapshot<T> { /// Creates an [AsyncSnapshot] with the specified [connectionState], /// and optionally either [data] or [error] with an optional [stackTrace] /// (but not both data and error). const AsyncSnapshot._(this.connectionState, this.data, this.error, this.stackTrace) : assert(connectionState != null), assert(!(data != null && error != null)), assert(stackTrace == null || error != null);
Khi xây dựng UI cần chú ý tới thành phần snapshot trong builder
- Kiểm tra ConnectionState xem tình trạng kết nối với Stream
- snapshot.hasError và snapshot.error: Kiểm tra có lỗi và lấy lỗi. (Xử dụng
addError
để bắn ra sự kiện lỗi) - snapshot.hasData và snapshot.data: Kiểm tra có dữ liệu và lấy dữ liệu. (Xử dụng
add
để bắn ra dữ liệu)
Các trạng thái của ConnectionState
/// The state of connection to an asynchronous computation.
///
/// The usual flow of state is as follows:
///
/// 1. [none], maybe with some initial data.
/// 2. [waiting], indicating that the asynchronous operation has begun,
/// typically with the data being null.
/// 3. [active], with data being non-null, and possible changing over time.
/// 4. [done], with data being non-null.
///
/// See also:
///
/// * [AsyncSnapshot], which augments a connection state with information
/// received from the asynchronous computation.
enum ConnectionState { /// Not currently connected to any asynchronous computation. /// /// For example, a [FutureBuilder] whose [FutureBuilder.future] is null. none, /// Connected to an asynchronous computation and awaiting interaction. waiting, /// Connected to an active asynchronous computation. /// /// For example, a [Stream] that has returned at least one value, but is not /// yet done. active, /// Connected to a terminated asynchronous computation. done,
}
- Khi không truyền Stream vào StreamBuilder (có nghĩa Stream là null) → none
- Khi truyền Stream (initialData có thể bằng null hoặc không) và chưa add sự kiện vào → waiting
- Khi truyền Stream và add sự kiện → active
- Khi truyền Stream và close() → done
Ví dụ về xử lý giao diện:
StreamBuilder<int>( stream: stream, builder: ( BuildContext context, AsyncSnapshot<int> snapshot, ) { if (snapshot.connectionState == ConnectionState.waiting) { return CircularProgressIndicator(); } else if (snapshot.connectionState == ConnectionState.active || snapshot.connectionState == ConnectionState.done) { if (snapshot.hasError) { **//<-- Kiểm tra có lỗi** return const Text('Error'); } else if (snapshot.hasData) { **//<-- Kiểm tra có data** return Text( snapshot.data.toString(), **//<-- Lấy data** style: const TextStyle(color: Colors.red, fontSize: 40) ); } else { return const Text('Empty data'); } } else { return Text('State: ${snapshot.connectionState}'); } },
),
[Demo] Xây dựng ứng dụng đếm ngược thời gian bằng Stream
<aside> 📌 Demo bên dưới không xử lý các trạng thái lỗi </aside>Các tính năng cơ bản:
Bắt đầu - Tạm dừng - Tiếp tục - Làm mới
UI Bắt đầu
UI khi đã Bắt đầu
Các bước thực hiện
<aside> ✍🏻 Các khai báo và các hàm được viết trong State của StatefulWidget </aside>class CountDownCustomCubitPage extends StatefulWidget { const CountDownCustomCubitPage({Key? key, required this.seconds}) : super(key: key); final int seconds; State<CountDownCustomCubitPage> createState() => CountDownCustomCubitPageState();
} class CountDownCustomCubitPageState extends State<CountDownCustomCubitPage> { /* Code here */
}
- Tạo StreamController để quản lý luồng dữ liệu
final StreamController<int> _timeStreamController = StreamController(); Stream<int> get _timeStream => _timeStreamController.stream;
- Tạo StreamSubscription để quản lý việc đếm ngược
_timeSubscription?.pause(); //tạm dừng
_timeSubscription?.resume(); //tiếp tục
_timeSubscription?.cancel(); //huỷ bỏ
StreamSubscription? _timeSubscription; void _onStart() { _timeSubscription = Stream.periodic(const Duration(seconds: 1), (computationCount) => _start - computationCount).listen( (event) { _timeStreamController.add(event); if (event == 0) { _onFinish(); } }, );
} //nhớ _timeSubscription?.dispose(); ở dispose() void _onResume() { if (_timeSubscription?.isPaused ?? false) { _timeSubscription?.resume(); } } void _onPause() { if (!(_timeSubscription?.isPaused ?? true)) { _timeSubscription?.pause(); } } void _onFinish() { _timeSubscription?.cancel(); _timeSubscription = null; } void _onReset() { _timeSubscription?.cancel(); _timeSubscription = null; _timeStreamController.add(_start); }
Stream.periodic(**const** Duration(seconds: 1), (computationCount) => **_start** - computationCount)
tạo một Stream trả về giá trị định kỳ sau 1 giây.
**_timeStreamController**.add(event);
add thời gian mới vào Stream
- Hiển thị dữ liệu lên giao diện
StreamBuilder<int>( initialData: _start, stream: _timeStream, builder: (context, snapshot) { if (snapshot.hasData) { final int time = snapshot.data!; var separateWidget = Padding( padding: const EdgeInsets.symmetric(horizontal: 4), child: Text( ':', style: Theme.of(context).textTheme.headline2?.copyWith( fontFamily: 'BlackOpsOne', ), textAlign: TextAlign.center, ), ); return Row( mainAxisSize: MainAxisSize.min, crossAxisAlignment: CrossAxisAlignment.center, children: [ _TextWidget( number: time.hour.tens, ), _TextWidget( number: time.hour.ones, ), separateWidget, _TextWidget( number: time.minute.tens, ), _TextWidget( number: time.minute.ones, ), separateWidget, _TextWidget( number: time.second.tens, ), _TextWidget( number: time.second.ones, ), ], ),; } return const SizedBox(); }),
- Extension lấy thông tin thời gian từ kiểu int
extension IntToTime on int { ///lấy thông tin giờ int get hour => _getHour(); int _getHour() { return (this / 3600).floor(); // return Duration(seconds: this).inHours; } ///lấy thông tin giờ int get minute => _getMinute(); int _getMinute() { return (this / 60).floor() % 60; } ///lấy thông tin giây int get second => _getSecond(); int _getSecond() { return this % 60; } ///format hiển thị số ở hàng chục int get tens => _getTens(); int _getTens() { if (this >= 10) { return ((this - (this % 10)) / 10).round(); } return 0; } ///format hiển thị số ở hàng đơn vị int get ones => _getOnes(); int _getOnes() { return this % 10; }
}
- Nhằm
giúp cho các công việc không thực hiện lại công việc nó đang thực hiện
thực hiện gọi các function thông qua streamController
void initState() { super.initState(); setTime(); ///việc quản lý các sự kiện bằng stream ở đây ///giúp cho các công việc không thực hiện lại công việc nó đang thực hiện ///bằng hàm distinct() _functionSubscription = _functionController.stream.distinct().listen((event) { switch (event) { case CountDownEvent.start: _onStart(); break; case CountDownEvent.pause: _onPause(); break; case CountDownEvent.resume: _onResume(); break; case CountDownEvent.reset: _onReset(); break; } }); } //ví dụ
Button( onTap: () { _functionController.add(CountDownEvent.resume); }, title: 'Resume',
),
<aside>
📖 Full code →
</aside>
[Demo] Tự tạo flutter_bloc theo style Cubit bằng Stream
- Nhằm tăng tính đọc hiểu dữ liệu và phân chia mã nguồn giữa giao diện và logic chúng ta cần tách biệt xử lý ở các mà
Bước 1: Tạo abstract class để định nghĩa các biến và hàm cơ bản của bloc
abstract class CustomCubit<T> { CustomCubit(T initValue) { _streamCtrl = StreamController.broadcast()..add(initValue); //nếu dùng thêm thư viện rxdart thì xài seed để init value } late StreamController<T> _streamCtrl; Stream<T> get stream => _streamCtrl.stream; void emit(T state) { _streamCtrl.add(state); } void close() { _streamCtrl.close(); }
}
<aside>
⚠️ **`_streamCtrl** = StreamController.broadcast()..add(initValue);` stream không thể nhận được event này bới stream.listen được gọi sau khi hàm hàm constructor chạy.
Có thể tự initValue ở StreamBuilder hoặc dùng thư viện RxDart có hổ trợ chức năng này
</aside>Bước 2:
- Tạo TimerCubit
- Mang hết khai báo và hàm từ vào trong TimerCubit
class TimerCubit extends CustomCubit<int> { TimerCubit(int initValue) : super(initValue) { startTime = initValue; } StreamSubscription? _subscription; late StreamSubscription _controlSubscription; final StreamController<CountDownEvent> _timerController = StreamController.broadcast(); Stream<CountDownEvent> get timerControllerStream => _timerController.stream; int startTime = 0; void close() { _subscription?.cancel(); _controlSubscription.cancel(); _timerController.close(); super.close(); } void init() { _controlSubscription = _timerController.stream.distinct().listen((event) { switch (event) { case CountDownEvent.start: _onStart(); break; case CountDownEvent.pause: _onPause(); break; case CountDownEvent.resume: _onResume(); break; case CountDownEvent.reset: _onReset(); break; } }); } void _setTime(int time) { emit(time); } void _onStart() { if (_subscription != null) { _onReset(); } _subscription = Stream.periodic(const Duration(seconds: 1), (computationCount) => startTime - computationCount).listen( (time) { _setTime(time); if (time == 0) { _onFinish(); } }, ); } void _onResume() { if (_subscription?.isPaused ?? false) { _subscription?.resume(); } } void _onPause() { if (!(_subscription?.isPaused ?? true)) { _subscription?.pause(); } } void _onFinish() { _subscription?.cancel(); _subscription = null; } void _onReset() { _subscription?.cancel(); _subscription = null; } void timerController(CountDownEvent event) { _timerController.add(event); }
}
Bước 3: Tạo BlocProvider
- Việc tạo BlocProvider để quản lý instance của bloc bằng context nếu không muốn làm theo cách này có thể tạo biến toàn cục để quán lý riêng.
- BlocProvider được ứng dùng từ InheritedWidget
// final timerCubit = CustomBlocProvider.of<TimerCubit>(context);
class CustomBlocProvider<T extends CustomCubit> extends InheritedWidget { const CustomBlocProvider({ super.key, required this.bloc, required super.child, }); final T bloc; static CustomBlocProvider? maybeOf<T extends CustomCubit>(BuildContext context) { return context.dependOnInheritedWidgetOfExactType<CustomBlocProvider<T>>(); } static T of<T extends CustomCubit>(BuildContext context) { final CustomBlocProvider? result = maybeOf<T>(context); assert(result != null, 'No BlocProvider found in context'); return result!.bloc as T; } bool updateShouldNotify(CustomBlocProvider oldWidget) => bloc != oldWidget.bloc;
} final timerCubit = context.read<TimerCubit>();
extension ReadCustomBlocProviderOfContext on BuildContext { T read<T extends CustomCubit>() { return CustomBlocProvider.of<T>(this); }
}
Bước 4: Khai báo
class CountDownCustomCubitPage extends StatefulWidget { const CountDownCustomCubitPage({Key? key, required this.seconds}) : super(key: key); final int seconds; State<CountDownCustomCubitPage> createState() => CountDownCustomCubitPageState();
} class CountDownCustomCubitPageState extends State<CountDownCustomCubitPage> { late TimerCubit _timerCubit; void initState() { super.initState(); _timerCubit = TimerCubit(widget.seconds) ..init(); } void dispose() { _timerCubit.close(); super.dispose(); } Widget build(BuildContext context) { return CustomBlocProvider<TimerCubit>( bloc: _timerCubit, child: Scaffold( appBar: AppBar(title: Text("Timer test")), body: Center( child: SingleChildScrollView( child: Column( children: <Widget>[ _Content(), /*Some Code*/ ], ), ), ), ), ); }
}
class _Content extends StatelessWidget { const _Content({Key? key}) : super(key: key); Widget build(BuildContext context) { final timerCubit = context.read<TimerCubit>(); // final timerCubit = CustomBlocProvider.of<TimerCubit>(context); return StreamBuilder<int>( initialData: timerCubit.startTime, stream: timerCubit.stream, builder: (context, snapshot) { if (snapshot.hasData) { final int time = snapshot.data!; var separateWidget = Padding( padding: const EdgeInsets.symmetric(horizontal: 4), child: Text( ':', style: Theme.of(context).textTheme.headline2?.copyWith( fontFamily: 'BlackOpsOne', ), textAlign: TextAlign.center, ), ); return FittedBox( child: InkWell( onTap: () {}, child: Row( mainAxisSize: MainAxisSize.min, crossAxisAlignment: CrossAxisAlignment.center, children: [ _TextWidget( number: time.hour.tens, ), _TextWidget( number: time.hour.ones, ), separateWidget, _TextWidget( number: time.minute.tens, ), _TextWidget( number: time.minute.ones, ), separateWidget, _TextWidget( number: time.second.tens, ), _TextWidget( number: time.second.ones, ), ], ), ), ); } return const SizedBox(); }); }
}
<aside>
📖 Full code →
</aside>
Các tính năng có thể bổ xung
- Có thể custom _Content thành BlocBuilder
- Thêm các tính năng như buildWhen, listenner
- MultiBlocProvider
Tham khảo
https://dart.dev/tutorials/language/streams
https://medium.flutterdevs.com/exploring-streambuilder-in-flutter-5958381bca67