- vừa được xem lúc

Chinh phục RxDart Flutter trong 3 nốt nhạc. Nốt thứ 2: Tiếp tục với 'Sờ tream'

0 0 180

Người đăng: Nguyễn Thành Minh

Theo Viblo Asia

Lời mở đầu

Mình viết series này với mục đích chia sẻ kiến thức về RxDart trong Flutter. Vì nó có liên quan đến các kiến thức về lập trình bất đồng bộ như Stream nên mình quyết định sẽ đi từ gốc cho đến ngọn ?

bài 1 mình đã chia sẻ khái niệm Stream là gì và các thuật ngữ quan trọng liên quan. Bây giờ chúng ta sẽ tiếp tục Sờ tream.

1. Stream Operators

Như mình đã giới thiệu ở bài 1, operators là những function được cung cấp để giúp ta dễ dàng xử lý event trong Stream. Có thể từ 1 Stream chế biến ra 1 Stream khác (Stream transformation), hoặc chỉ đơn giản là get 1 giá trị nào đó trong Stream (Receiving stream events), khi đó giá trị đó sẽ được trả dưới dạng một Future. Vì vậy mình sẽ chia thành 2 nhóm nhỏ: nhóm trả về Future và nhóm trả về Stream khác. Vì số function trong mỗi nhóm là rất nhiều nên mình sẽ giới thiệu 5 function điển hình, hay dùng.

I. Nhóm function trả về Future

1. elementAt

Hàm này giúp ta get được phần tử tại index bất kỳ trong Stream

void main() async { var future = await testStream().elementAt(2); // get data tại index = 2 print(future); // print ra: 15
} // xuyên suốt các ví dụ mình sẽ sử dụng Stream này
Stream<int> testStream() async* { yield 5; yield 10; yield 15; yield 20;
}

2. length

Hàm này giúp ta get được số lượng phần tử có trong stream.

void main() async { print(await testStream().length); // print: 4
} Stream<int> testStream() async* { yield 5; yield 10; yield 15; yield 20;
}

3. firstWhere

Hàm này giúp ta get được phần tử đầu tiên thỏa mãn một điều kiện được chúng ta cho trước. Ví dụ dưới đây mình muốn get ra phần tử đầu tiên có giá trị lớn hơn 11. Đó chính là 15

void main() async { print(await testStream().firstWhere((element) => (element > 11))); // print: 15
} Stream<int> testStream() async* { yield 5; yield 10; yield 15; yield 20;
}

4. join

Hàm này nó nối tất cả element thành 1 String duy nhất, giữa các element được ngăn cách bởi 1 ký tự separator do chúng ta truyền vào.

void main() async { print(await testStream().join(',')); // print: '5,10,15,20'
} Stream<int> testStream() async* { yield 5; yield 10; yield 15; yield 20;
}

5. await for

Cú pháp await for giúp chúng ta tính tổng các phần tử trong Stream một cách bất đồng bộ. Vì là tính bất đồng bộ nên nó sẽ tính nhanh hơn dùng for thông thường rất nhiều.

void main() async { var sum = 0; await for (final value in testStream()) { sum += value; } print(sum); // print: 50
} Stream<int> testStream() async* { yield 5; yield 10; yield 15; yield 20;
}

Mình thấy trong lớp Stream không có hàm sum mà dùng cú pháp await for đó thì nó dài dòng quá nên chúng ta có thể viết thêm 1 extension function cho nó:

void main() async { print(await testStream().sum()); // chỉ cần gọi hàm sum là print: 50
} Stream<int> testStream() async* { yield 5; yield 10; yield 15; yield 20;
} // extension function của Stream dành để tính tổng
extension StreamExtensions on Stream<int> { Future<int> sum() async { var sum = 0; await for (final value in this) { sum += value; } return sum; }
}

Bonus: Xem thêm các function khác tại đây

II. Nhóm function trả về Stream

Do các hàm này nó transform (biến đổi) một Stream sang một Stream khác nên để tránh nhầm lẫn, chúng ta sẽ thống nhất đặt cho chúng 2 cái tên là Source Stream và Output Stream như sau:

Source Stream -> transform -> Output Stream

1. map

Hàm này cho phép chúng ta truyền vào 1 hàm, hàm này sẽ giúp ta biến đổi từng element của Source Stream.

void main() { var outputStream = testStream().map((event) => event / 5); // chia từng phần tử của Source Stream cho 5 outputStream.listen(print); // subscribe outputStream để nhận kết quả
} Stream<int> testStream() async* { yield 5; yield 10; yield 15; yield 20;
}

Output:

1.0
2.0
3.0
4.0

2. where

Hàm này lọc ra những phần tử trong Source Stream thỏa mãn điều kiện cho trước.

void main() { testStream().where((event) => event > 11).listen(print); // lọc ra những phần tử > 11
} Stream<int> testStream() async* { yield 5; yield 10; yield 15; yield 20;
}

Output:

15
20

3. takeWhile

Hàm này nó sẽ emit ra các phần tử thỏa mãn điều kiện cho trước cho đến khi có một phần tử KHÔNG thỏa mãn điều kiện thì nó dừng Stream.

void main() { testStream().takeWhile((event) => event < 11).listen(print, onDone: () => print('Done!'));
} Stream<int> testStream() async* { yield 5; yield 10; yield 15; // tới đây gặp 15 KHÔNG pass điều kiện nên dừng stream, ko emit ra 15 yield 20;
}

Output:

5
10
Done!

Và như vậy có nghĩa là nếu Source Stream yield 15 trước tiên thì Output Stream sẽ không có phần tử nào.

void main() { testStream().takeWhile((event) => event < 11).listen(print, onDone: () => print('Done!'));
} Stream<int> testStream() async* { yield 15; // tới đây gặp 15 KHÔNG pass điều kiện nên dừng stream, ko emit ra 15 yield 10; yield 5; yield 20;
}

Output:

Done!

4. skipWhile

Hàm này nó sẽ bỏ qua, không emit ra các phần tử thỏa mãn điều kiện cho trước cho đến khi có một phần tử KHÔNG thỏa mãn điều kiện được emit ra.

void main() { testStream().skipWhile((event) => event < 11).listen(print); // bỏ qua các phần tử < 11
} Stream<int> testStream() async* { yield 5; yield 10; yield 15; yield 20;
}

Output:

15
20

Các bạn chú ý là: skipWhile không có lọc theo điều kiện where nhé. Thằng skipWhile một khi đã emit được 1 phần tử đầu tiên rồi thì nó sẽ không skip bất cứ phần tử nào nữa. Ví dụ như case này:

void main() { testStream().skipWhile((event) => event < 11).listen(print);
} Stream<int> testStream() async* { yield 5; // thằng 5 CÓ pass điều kiện nên bị skip yield 15; // thằng 15 KHÔNG pass điều kiện nên được emit ra đầu tiên yield 6; // nhờ vậy mà tất cả phần tử sau 15 đều được emit kể cả 6 yield 20;
}

Output:

15
6
20

5. distinct

Hàm này skip (bỏ qua) các phần tử mà chúng equal (bằng) với phần tử đã được emit trước đó (the previous element).

void main() { testStream().distinct().listen(print, onDone: () => print('Done!'));
} Stream<int> testStream() async* { yield 5; // emit 5 yield 5; // bỏ qua vì phần tử liền trước là 5 yield 5; // bỏ qua vì phần tử liền trước là 5 yield 10; // emit 10 (vì 10 khác thằng liền trước đã emit là 5) yield 5; // emit 5 (vì 5 khác thằng liền trước đã emit là 10) yield 10; // emit 10 (vì 10 khác thằng liền trước đã emit là 5)
}

Output:

5
10
5
10
Done!

Ngoài ra, distinct còn cho phép chúng ta có thể định nghĩa lại thế nào là bằng nhau.

void main() { // định nghĩa lại: 2 số bằng nhau là khi 2 số đó chia lấy phần nguyên với 10 ra kết quả bằng nhau  testStream().distinct((int previous, int next) => previous ~/ 10 == next ~/ 10) .listen(print, onDone: () => print('Done!'));
} Stream<int> testStream() async* { yield 5; yield 10; yield 15; // không được emit ra vì 10 ~/ 10 = 15 ~/10 = 1 yield 20;
}

Output:

5
10
20
Done!

Bonus: Xem thêm các function khác tại đây

2. Handle errors

Stream có một function giúp ta bắt lỗi trong Stream là handleError. Hàm này nó lợi hại hơn onError trong hàm listen mà mình đã giới thiệu ở bài 1 ở chỗ. Nó cung cấp cho ta 1 optional param là hàm test, ở hàm test ta có thể check xem cái lỗi đó có đáng để ta catch hay không, nếu không đáng thì ta có thể cho ném lỗi đó luôn không cần catch (tương tự rethrow lỗi).

Hàm test này return true thì đồng nghĩa với việc chúng ta catch lỗi đó, ngược lại return false thì chúng ta rethrow lỗi đó.

void main() { testStream().handleError(print, test: (error) { if (error is HttpException) { return true; // return true thì catch lỗi đó lại } else { return false; // return false thì rethrow lỗi đó lun } }).listen(print);
} Stream<int> testStream() async* { yield 5; throw FormatException('lỗi rồi'); yield 10;
}

FormatException không phải là HttpException nên hậu quả là máu nhuộm cờn sô lơ ?

Vì hàm testoptional param nên nếu chúng ta không truyền vào thì mặc định Dart sẽ hiểu tất cả lỗi trong Stream sẽ được catch

void main() { testStream().handleError(print).listen(print);
} Stream<int> testStream() async* { yield 5; throw FormatException('lỗi rồi'); yield 10;
}

Output:

5
FormatException: lỗi rồi Process finished with exit code 0

3. Stream Controlller

StreamController đơn giản chỉ là 1 controller bên trong có 1 stream và controller này giúp ta điều khiển stream đó dễ dàng. Chúng ta có thể thoải mái push các events vào stream, lắng nghe nhận data từ stream đó. Nói cách khác, đây là một con đường khác giúp ta tạo ra 1 Stream và thoải mái emit events và nhận events bất cứ thời điểm nào chúng ta muốn.

Trong mỗi StreamController đều có 2 đối tượng là sink(giúp chúng ta push events đến stream) và stream (giúp chúng ta nhận events từ sink)

void main() async { // tạo stream controller var streamController = StreamController(); // lắng nghe streamController.stream.listen(print); // push events streamController.sink.add('Tui là Minh'); streamController.sink.add(100); // Khi không cần sử dụng controller này nữa thì nên close controller await Future.delayed(Duration(seconds: 2)); // sau 2 giây ta sẽ close controller await streamController.close(); // sau khi close mà chúng ta vẫn cố push event sẽ gặp Exception:  // Unhandled exception: Bad state: Cannot add new events after calling close // streamController.sink.add(11); // cố push event sau khi controller đã close
}

Output:

Tui là Minh
100

4. Broadcast StreamController

Giả sử chúng ta muốn có 2 nguồn lắng nghe events từ sink

void main() { // tạo broadcast stream controller var streamController = StreamController(); // subscription thứ nhất streamController.stream.listen((event) { print('subscription thứ 1: $event'); }); // subscription thứ hai sẽ double giá trị của event lên streamController.stream.listen((event) { print('subscription thứ 2: ${event + event}'); // double value lên }); // push events streamController.sink.add('Tui là Minh'); streamController.sink.add(100);
}

Cái mà chúng ta nhận được là máu nhuộm cờn sô lơ:

Nó báo lỗi là Stream này đã được lắng nghe rồi, vậy không có cách nào để có nhiều hơn 1 thằng lắng nghe Stream à?

Đừng lo, vì chúng ta đã có Broadcast StreamController. Với Broadcast StreamController chúng ta có thể tạo ra bao nhiêu thằng lắng nghe Stream đó cũng được.

Để tạo ra một Broadcast StreamController rất đơn giản, thay vì sử dụng constructor : StreamController() để tạo streamController thì ta sử dụng constructor StreamController.broadcast(). Và đối tượng stream trong streamController bây giờ người ta gọi là một Broadcast Stream.

void main() { // tạo Broadcast StreamController var streamController = StreamController.broadcast(); // subscription thứ nhất streamController.stream.listen((event) { // streamController.stream là 1 broadcast stream print('subscription thứ 1: $event'); }); // subscription thứ hai sẽ double giá trị của event lên streamController.stream.listen((event) { // streamController.stream là 1 broadcast stream print('subscription thứ 2: ${event + event}'); }); // push events streamController.sink.add('Tui là Minh'); streamController.sink.add(100);
}

Output:

subscription thứ 1: Tui là Minh
subscription thứ 2: Tui là MinhTui là Minh
subscription thứ 1: 100
subscription thứ 2: 200

Kết luận

Như vậy, chúng ta đã đi hết phần Stream. Sau khi hiểu được Stream bạn đã có thể học được Bloc Pattern mà không cần thiết phải học RxDart. Nếu bạn đang cần học Bloc Pattern thì bạn có thể dễ dàng học ở nguồn khác, có rất nhiều người chia sẻ về nó. Tất nhiên mình vẫn tiếp tục viết về RxDart và cả Bloc Pattern. Cơ mà có bỏ đi thì đừng quên tặng cho mình 1 vote nếu thấy những chia sẻ của mình có giá trị nhé =))

Nguồn tham khảo: https://github.com/boriszv/Programming-Addict-Code-Examples/tree/master/12. Reactive Programming Course

Đọc bài cuối cùng: Chinh phục RxDart Flutter trong 3 nốt nhạc. Nốt nhạc cuối: RxDart không đáng sợ như bạn nghĩ

Bình luận

Bài viết tương tự

- vừa được xem lúc

Giới thiệu Typescript - Sự khác nhau giữa Typescript và Javascript

Typescript là gì. TypeScript là một ngôn ngữ giúp cung cấp quy mô lớn hơn so với JavaScript.

0 0 500

- vừa được xem lúc

Cài đặt WSL / WSL2 trên Windows 10 để code như trên Ubuntu

Sau vài ba năm mình chuyển qua code trên Ubuntu thì thật không thể phủ nhận rằng mình đã yêu em nó. Cá nhân mình sử dụng Ubuntu để code web thì thật là tuyệt vời.

0 0 374

- vừa được xem lúc

Đặt tên commit message sao cho "tình nghĩa anh em chắc chắn bền lâu"????

. Lời mở đầu. .

1 1 701

- vừa được xem lúc

Tìm hiểu về Resource Controller trong Laravel

Giới thiệu. Trong laravel, việc sử dụng các route post, get, group để gọi đến 1 action của Controller đã là quá quen đối với các bạn sử dụng framework này.

0 0 335

- vừa được xem lúc

Phân quyền đơn giản với package Laravel permission

Như các bạn đã biết, phân quyền trong một ứng dụng là một phần không thể thiếu trong việc phát triển phần mềm, dù đó là ứng dụng web hay là mobile. Vậy nên, hôm nay mình sẽ giới thiệu một package có thể giúp các bạn phân quyền nhanh và đơn giản trong một website được viết bằng PHP với framework là L

0 0 421

- vừa được xem lúc

Bạn đã biết các tips này khi làm việc với chuỗi trong JavaScript chưa ?

Hi xin chào các bạn, tiếp tục chuỗi chủ đề về cái thằng JavaScript này, hôm nay mình sẽ giới thiệu cho các bạn một số thủ thuật hay ho khi làm việc với chuỗi trong JavaScript có thể bạn đã hoặc chưa từng dùng. Cụ thể như nào thì hãy cùng mình tìm hiểu trong bài viết này nhé (go).

0 0 414