Chúng ta ai ai cũng thích mì Hảo Hảo ăn liền nhanh, gọn, tiện thì Reactive Programming cũng vậy. Nó không block, không đợi, xử lý nhiều việc cùng lúc chỉ với vài thread. Trong bài viết này, chúng ta sẽ cùng tìm hiểu reactive là gì, vì sao nó “ăn liền” như Hảo Hảo, và tại sao context switch quá nhiều lại khiến hệ thống của bạn… “lì như mì để nguội”.
1.Non-blocking I/O - Gốc rễ của reactive
1.1. Blocking I/O là gì - vấn đề của nó là gì ?
Blocking I/O là mô hình lập trình mà một thread (có thể nằm trong thread pool ) phải đợi cho đến khi một hành động I/O hoàn tất thì mới tiếp tục thực hiện được bước tiếp theo.
Các hành I/O phổ biến diễn ra trong quá trình lập trình:
- Truy vấn cơ sở dữ liệu
- Gọi API đến các services khác
- Ghi / đọc dữ liệu từ disk
🧪 Ví dụ: Đặt món trong nhà hàng
Giả sử ta có một hệ thống , coi như microservice luôn đi. Gồm các services như OrderBooking, ingredientPreparation, DeliverySystem ,…
Ông A đặt một món ăn từ cửa hàng, các bước xử lý bao gồm
-
Nhận request từ hệ thống OrderBookkeping
-
Validate dữ liệu có hợp lệ không
-
Nếu hợp lệ gửi yêu cầu đến ingredient* để chuẩn bị món ăn
-
Kiểm tra còn nguyên liệu không
-
Nếu mọi thứ ổn, chuẩn bị món ăn
-
Gửi đến hệ thống deliverySystem để giao món ăn
Các bước xử lý được thực hiện trên một thread, phần màu xanh lá thể hiện việc xử lý trong CPU của service, phần màu đỏ (blocking) thể hiện việc chờ phản hồi từ I/O.
Đi sâu vào 1 action thì sao:
Mỗi bước gọi DB hay API như trên đều đựọc thực hiện bởi một thread xuyên suốt và các action đều là blocking. Mỗi 1 request xử lý bởi 1 thread thì đều phải đứng yên đợi cho I/O trả về, vậy thì thời gian hoạt động sẽ bằng
time = operation time + wait time
OK vậy vấn đề xuất hiện rồi đó =)).
1.2. Vấn đề của Blocking I/O
Với các hệ thống thực tế - đặc biệt là các hệ thống liên quan đến payment, việc xử lý hàng trăm, hàng ngàn request/ s bộc lộ các điểm yếu dễ thấy:
- Số lượng request càng nhìu → số lượng thread càng lớn và chi phí context switch giữa các thread tăng cao
Context switch( dịch ra tiếng ziệt là chi phí chuyển đổi trạng thái) là tài nguyên và thời gian mà hệ thống đang xử lý tiến trình A cần để chuyển sang tiến trình B. Vậy tại sao cần context switch ?
Mik khuyên các bạn nên đọc bài của a Đạt Bùi: https://viblo.asia/p/005-os-doi-xu-voi-thread-the-nao-bJzKmVvYZ9N?utm_source=chatgpt.com Nhưng để nói sơ thì tưởng tự 1 CPU có n cores (processors) mà có tận m threads. Thì việc xử lý sao cho cân bằng được việc m threads được thực thi cần 2 yếu tố: + Nếu đang thực thi thread này mà cần chuyển sang thread khác để thực thi thread khác ( thread blocking, I/O operation , timeout CPU slice - thời gian sử dụng CPU giới hạn) : Lưu trạng thái của tiến trình hoặc luồng hiện tại (ví dụ: giá trị các thanh ghi, bộ đếm chương trình, ngăn xếp). + Tải trạng thái của tiến trình hoặc luồng tiếp theo để tiếp tục thực thi.
- Lãng phí CPU: Thread chờ I/O nhưng vẫn chiếm tài nguyên.
- Khó scale: Khi số lượng kết nối tăng cao, hệ thống dễ bị ngẽn do giới hạn số thread của JVM hoặc OS cores.
Suy cho cùng, về mặt bản chất, chúng ta đang dùng một tài nguyên đắt tiền (thread) chỉ để... chờ.
2. (Non-)Blocking I/O – Gốc rễ của Reactive
2.1. Non-blocking I/O – Cách tiếp cận hiệu quả hơn
Non-blocking I/O thay đổi cách xử lý I/O: thread không cần chờ kết quả từ I/O, mà chỉ đăng ký nhận kết quả khi I/O hoàn tất. Dễ dàng nhận ra điều này giúp ích rất nhiều trong việc:
- Tăng khả năng tận dụng thread hiệu quả: trong lúc I/O xử lý, threads có thể làm việc khác. => viễn cảnh nhiều threads cùng thực thi bắt đầu từ đây
- Hệ thống xử lý được nhiều request hơn với số lượng thread ít hơn.
- Giảm chi phí context switching, tăng throughput.
Một hệ thống non-blocking có thể xử lý hàng chục nghìn kết nối chỉ với vài chục thread.
2.2. Event loop - Trái tim của bất đồng bộ
Event loop - cái tên ko còn gì xa lạ được xử dùng nhìu trong hệ thống non - blocking như nodeJS, netty, reactor.
Mô hình hoạt động:
- Cần hình dung platform gồm những action CPU intensive, event loop sẽ poll/get task từ queue, register callback cho các action I/O đến platform. Sau khi operation complete sẽ trigger (dispatch) callback tới task queue.
- Mô hình này sẽ chạy liên tục đến khi ứng dụng chết, hoặc tạm ngưng khi hết task trong queue.
2.3. Các điểm cần lưu ý
-
❗ Không block trong event loop:
Một khi bạn block (ví dụ gọi
Thread.sleep()
hay truy cập database sync), bạn đang làm nghẽn toàn bộ vòng lặp. -
✅ Tách blocking ra thread pool riêng:
Dùng
.publishOn(Schedulers.boundedElastic())
hoặc tương đương để thực thi các tác vụ blocking một cách cách ly. -
❗ Tránh logic phức tạp ngay trong callback:
Hãy viết code chia nhỏ, rõ ràng, tránh callback hell. Reactive streams với
Mono
vàFlux
giúp xử lý luồng dữ liệu theo pipeline rõ ràng hơn.
3. Các nguyên tắc và khái niệm cốt lõi
3.1. Reactive stream
- Là một đặc tả ( hay specification) cho lập trình non-blocking, hướng luồng dữ liệu ( stream - oriented), xử lý không đồng bộ với backpressure (kiểm soát dòng dữ liệu) - ví dụ như Project Reactor (Spring), RxJava, Akka Streams, hay Java Flow API (Java 9+).
- Reactive Streams định nghĩa 4 interface:
Publisher
,Subscriber
,Subscription
,Processor
để xử lý luồng dữ liệu bất đồng bộ với back‑pressure
Back‑pressure cho phép Subscriber điều khiển tốc độ nhận dữ liệu, tránh tràn bộ đệm và quá tải hệ thống.
Reactive Streams giải quyết bằng cách để Subscriber tự quyết định tốc độ bằng cách gọi request(n).
3.2. Mô hình publisher - subscriber
Reactive streams hoạt động dựa trên mô hình publisher - subcriber, với mục tiêu tách biệt giữa publisher - subcriber và đảm bảo xử lý bất đồng bộ ( async) và có kiểm soát (backpressure aware).
Các thành phần chính:
- Publisher: Nơi phát sinh dữ liệu. Có thể là một API, một sự kiện, hoặc bất kỳ luồng dữ liệu nào.
- Subscriber: Nơi xử lý dữ liệu đầu ra. Có thể là một hệ thống ghi log, lưu vào cơ sở dữ liệu, gửi HTTP request,…
- Subscription: là kết nối giữa
Publisher
vàSubscriber
. Đây là nơi xảy ra cơ chế backpressure:Subscriber
dùngsubscription.request(n)
để yêu cầu bao nhiêu phần tử nó sẵn sàng xử lý. - Processor: là một thành phần đặc biệt vừa là
Subscriber
, vừa làPublisher
, thường dùng để trung gian xử lý giữa 2 bên.
Cơ chế hoạt động:
Subscriber
đăng ký vàoPublisher
thông quasubscribe()
.Publisher
gọionSubscribe()
và truyền vào mộtSubscription
.Subscriber
gọirequest(n)
để yêu cầu dữ liệu.Publisher
bắt đầu gửi từng phần tử quaonNext()
cho đến khi:- Gửi đủ
n
phần tử được yêu cầu. - Gặp lỗi (
onError()
). - Kết thúc (
onComplete()
).
- Gửi đủ
3.3 Backpressure và cơ chế xử lý
Backpressure là cơ chế cho phép Subscriber
kiểm soát lượng dữ liệu nhận từ Publisher
, nhằm tránh tình trạng quá tải, tràn bộ đệm, hoặc treo hệ thống khi nguồn dữ liệu phát quá nhanh.
Reactive Streams định nghĩa backpressure ở cấp độ giao tiếp giữa Publisher
và Subscriber
. Khi Subscriber
chưa sẵn sàng, nó không gọi request(n)
, và dữ liệu sẽ không được gửi tiếp.
Một số chiến lược xử lý khi thiếu backpressure:
.onBackpressureBuffer()
: đệm tạm các phần tử chưa được xử lý. Có thể gây tràn bộ nhớ nếu không giới hạn kích thước..onBackpressureDrop()
: bỏ qua các phần tử không thể xử lý kịp..onBackpressureLatest()
: chỉ giữ phần tử mới nhất, bỏ qua các phần tử cũ..onBackpressureError()
: phát sinh lỗi ngay khi không có demand phù hợp.
3.4. Cold vs Hot Publisher
- Cold Publisher: chỉ bắt đầu phát dữ liệu khi có
Subscriber
, và phát lại từ đầu cho mỗi subscriber. Ví dụ:Flux.range()
,Mono.fromCallable()
. - Hot Publisher: phát dữ liệu ngay cả khi chưa có subscriber, và subscriber sau sẽ chỉ thấy dữ liệu tại thời điểm mình bắt đầu theo dõi. Ví dụ:
ConnectableFlux
,Flux.interval()
.
3.5. Schedulers và Multi-threading
Reactor hỗ trợ schedulers để chuyển đổi thread trong chuỗi xử lý:
Schedulers.immediate()
– chạy trong thread hiện tại.Schedulers.single()
– một thread duy nhất, giống event loop.Schedulers.boundedElastic()
– dành cho các tác vụ blocking (ví dụ: truy cập database, API call).Schedulers.parallel()
– dùng cho xử lý song song, chia workload ra nhiều core.
Sử dụng .publishOn()
hoặc .subscribeOn()
để thay đổi scheduler trong luồng xử
Mono.fromCallable(() -> blockingCall()) .subscribeOn(Schedulers.boundedElastic()) // chạy blocking trong thread pool riêng
3.6. Operators
Tiếp tục move đến Operators - thành phần cốt lõi để biến đổi dữ liệu. Reactive stream sử dụng các toán tử (operators) để xử lý dữ liệu một cách tuần tự, tương tự như trong các ngôn ngữ xử lý tập hợp:
map()
,flatMap()
: chuyển đổi dữ liệufilter()
,take()
,skip()
: lọc dữ liệuconcat()
,merge()
,zip()
: kết hợp nhiều streamdoOnNext()
,doOnError()
: xử lý các side-effect
Mỗi operator trả về một Flux
hoặc Mono
mới, nên bạn có thể chain các bước xử lý rất linh hoạt mà vẫn đảm bảo bất đồng bộ.
4. Ứng dụng thực tiễn của Reactive Programming
Reactive Programming không chỉ là lý thuyết — nó ngày càng được ứng dụng rộng rãi trong các hệ thống hiện đại, đặc biệt là khi xử lý dữ liệu lớn, kết nối đồng thời cao, hoặc tương tác nhiều I/O.
4.1. Hệ thống microservices và API gateway
- Reactive rất phù hợp trong các hệ thống microservices nhờ khả năng mở rộng tốt, xử lý bất đồng bộ và giảm thiểu độ trễ.
- Các API gateway như Spring Cloud Gateway được xây dựng trên nền tảng Project Reactor, giúp dễ dàng xử lý hàng ngàn request đồng thời mà không cần tạo hàng ngàn thread ( vì có thể reuse được).
4.2. Streaming dữ liệu real-time
- Reactive rất mạnh khi áp dụng trong các hệ thống xử lý dữ liệu theo luồng (real-time streaming), ví dụ như:
- Hệ thống gợi ý sản phẩm theo thời gian thực.
- Dashboards theo dõi số liệu trực tiếp.
- Theo dõi cảm biến IoT.
4.3. Hệ thống tích hợp với nhiều nguồn I/O
- Reactive giúp xử lý hiệu quả các tác vụ như:
- Truy xuất nhiều database cùng lúc.
- Gửi/nhận API call đồng thời.
- Kết nối tới nhiều hệ thống message broker (Kafka, RabbitMQ...).
- Tất cả đều được xử lý theo kiểu non-blocking, không cần tạo thread mới cho mỗi I/O, giảm tải tài nguyên và tăng throughput.
5. So sánh với lập trình truyền thống (imperative programming)
Tiêu chí | Lập trình truyền thống (Blocking) | Reactive (Non-blocking) |
---|---|---|
Kiểu xử lý | Tuần tự, đồng bộ | Bất đồng bộ, hướng sự kiện |
Tài nguyên | Cần nhiều thread | Ít thread, tận dụng tối đa CPU |
Hiệu năng khi xử lý nhiều kết nối | Dễ nghẽn, tốn bộ nhớ | Quy mô lớn, nhẹ và ổn định |
Quản lý lỗi | Try/catch theo flow | Operators như .onErrorResume() , .retry() |
Ví dụ | Spring MVC, JDBC | WebFlux, R2DBC, Reactor |
Reactive không thay thế hoàn toàn lập trình truyền thống. Tuy nhiên, trong các hệ thống cần hiệu năng cao, phản hồi nhanh, hoặc xử lý nhiều tác vụ I/O đồng thời, reactive là một lựa chọn rất mạnh mẽ.
6. Kết luận
Những hệ thống lớn hiện tại mà mình từng làm qua cũng đã đổi sang cách tiếp cận này.
Reactive Programming không phải là các gì hoàn toàn mới - nó giúp ta thay đổi cách nhìn nhận dòng dữ liệu (data pipeline) được xem như trung tâm, và backpressure, non-blocking, cùng với event-driven trở thành yếu tố cốt lõi.
Điều cốt yếu mà nó mang lại tổng hợp ở 3 yếu tố: nhẹ, mượt, và linh hoạt với quy mô lớn.
7. Refers:
- https://projectreactor.io/docs/core/release/reference/reactiveProgramming.html
- https://medium.com/swlh/reactive-programming-reactor-core-part1-1053fe3ae1a0
- https://www.ibm.com/docs/en/zos/2.5.0?topic=otap-clientserver-socket-programs-blocking-nonblocking-asynchronous-socket-calls
- My brother: M.Châu
Welcome to my space !!! Mình là Minh, nick name Minh Drake, rất mong nhận được thêm góp ý của các bạn để có thêm góc nhìn cũng như cải thiện thêm.