1. Consumer Api
Trái tim của Consumer API là một vòng lặp đơn giản để liên tục gọi đến kafka server lấy thêm dữ liệu.
Giống như cá mập phải liên tục di chuyển để sống, consumer cũng phải liên tục polling Kafka, nếu không chúng sẽ bị coi là "đã chết" và các partition mà chúng đang tiêu thụ sẽ được chuyển sang cho một consumer khác trong nhóm để tiếp tục tiêu thụ. Tham số chúng ta truyền vào phương thức poll() là một khoảng thời gian chờ và kiểm soát thời gian mà poll() sẽ bị block nếu không có dữ liệu trong bộ đệm của consumer. Nếu giá trị này được đặt là 0 hoặc đã có sẵn bản ghi, poll() sẽ trả về ngay lập tức; nếu không, nó sẽ chờ trong số mili giây được chỉ định.
Vòng lặp poll() làm nhiều việc hơn là chỉ lấy dữ liệu. Lần đầu tiên chúng ta gọi poll() với một consumer mới, nó sẽ có nhiệm vụ tìm GroupCoordinator, tham gia vào consumer group, và nhận partition được gán. Nếu có một rebalance được kích hoạt, nó cũng sẽ được xử lý bên trong vòng lặp poll(), bao gồm cả các callback liên quan. Điều này có nghĩa là hầu hết mọi sự cố có thể xảy ra với một consumer hoặc trong các callback được sử dụng trong listeners của nó đều có khả năng xuất hiện dưới dạng ngoại lệ được ném ra bởi poll().
Hãy nhớ rằng nếu poll() không được gọi trong khoảng thời gian dài hơn max.poll.interval.ms
, consumer sẽ được coi là đã chết và bị loại khỏi consumer group, vì vậy cần tránh làm bất cứ điều gì có thể gây ra việc bị block trong khoảng thời gian không xác định bên trong vòng lặp poll().
2. Thread Safety
Chúng ta không thể có nhiều consumer thuộc cùng một consumer group trong một thread, và chúng ta cũng không thể có nhiều thread sử dụng cùng một consumer một cách an toàn. Quy tắc là mỗi thread chỉ có một consumer. Để chạy nhiều consumer trong cùng một consumer group trong một ứng dụng, chúng ta sẽ cần chạy mỗi consumer trong thread riêng của nó. Trong java, việc gói gọn logic của consumer trong một đối tượng riêng và sau đó sử dụng ExecutorService để khởi chạy nhiều thread.
3. Kết luận
Trong các phiên bản cũ của Kafka, signature đầy đủ của phương thức là poll(long); signature này hiện đã bị ngừng sử dụng và API mới là poll(Duration). Ngoài việc thay đổi kiểu tham số, hành vi block của phương thức cũng thay đổi một chút. Phương thức ban đầu, poll(long), sẽ block cho đến khi có được metadata cần thiết từ Kafka, ngay cả khi thời gian chờ lâu hơn thời lượng timeout. Phương thức mới, poll(Duration), sẽ tuân thủ giới hạn timeout và không chờ metadata. Nếu code consumer của chúng ta hiện tại sử dụng poll(0) như một cách để buộc Kafka lấy metadata mà không tiêu thụ bất kỳ bản ghi nào (một mẹo khá phổ biến), chúng ta không thể chỉ thay đổi nó thành poll(Duration.ofMillis(0)) và mong đợi hành vi giống nhau. Chúng ta sẽ cần tìm một cách mới để đạt được mục tiêu của mình. Thường thì giải pháp là đặt logic trong phương thức rebalanceListener.onPartitionAssignment(), phương thức này được đảm bảo sẽ được gọi sau khi chúng ta có metadata cho các partition được gán nhưng trước khi các message bắt đầu đến.
4. Thông tin kết nối
Nếu anh em muốn trao đổi thêm về bài viết, hãy kết nối với mình qua LinkedIn và Facebook:
- LinkedIn: https://www.linkedin.com/in/nguyentrungnam/
- Facebook: https://www.facebook.com/trungnam.nguyen.395/
Rất mong được kết nối và cùng thảo luận!