Tác giả: Danica Fine | Ngày đăng: 25/11/2024
Link bài viết gốc | Link series tiếng Việt
Đây là phần cuối cùng trong bốn phần của chúng ta:
- Cách Producer Hoạt Động: Khám phá Producer đã làm gì phía sau hậu trường để chuẩn bị dữ liệu sự kiện thô (raw event data) để gửi đến Broker.
- Xử Lý Request Từ Producer: Tìm hiểu cách dữ liệu đi từ Producer Client đến ổ đĩa trên Broker.
- Chuẩn Bị Cho Yêu Cầu Truy Vấn Của Consumer: Xem cách Consumer chuẩn bị để gửi request lấy dữ liệu.
- Xử Lý Request Truy Vấn Của Consumer: Đi sâu vào hoạt động bên trong của Broker khi chúng cố gắng cung cấp dữ liệu cho Consumer.
Chúng ta đã đi một chặng đường dài, nhưng cuối cùng đã đến với phần thứ tư và cũng là phần cuối cùng của loạt bài viết. Trong loạt bài này, chúng ta đã lột bỏ từng lớp vỏ của Apache Kafka® để hiểu sâu hơn về cách tương tác hiệu quả với cluster thông qua các producer client và consumer client.
Và dưới đây là bài thứ tư trong bốn phần, bàn về:
Xử Lý Request Truy Vấn Của Consumer: Đi sâu vào hoạt động bên trong của Broker khi chúng cố gắng cung cấp dữ liệu cho Consumer.
Ở góc độ tổng quan, một fetch request gồm hai phần:
- Request Metadata: Đây là cấu hình của request.
- Fetch Request Data: Dữ liệu thực trong message
Hãy đi sâu tìm hiểu nào.
1. Xử Lý Yêu Cầu Truy Xuất Của Consumer
1.1. On Broker
1.1.1. General request handling
Không có gì ngạc nhiên khi quy trình xử lý fetch request từ Consumer có nhiều điểm tương đồng với request từ Producer. Thực tế, các bước giống hệt nhau cho đến đoạn các I/O thread truy cập dữ liệu trên ổ đĩa và gửi lại cho client.
Để cho ngắn gọn, chúng ta sẽ không lặp lại các bước trùng lặp ở đây. Nếu bạn cần ôn lại, hãy xem phần 2 của loạt bài này, nơi chúng ta đã đề cập chi tiết về quy trình xử lý request từ producer.
1.1.2. Page cache and disk
Đây là khoảnh khắc tất cả chúng ta đã chờ đợi: hobbit – nhân vật chính của chúng ta – cuối cùng đối mặt với nhiệm vụ tối thượng là đánh cắp kho báu... hay đúng hơn, đọc dữ liệu từ ổ đĩa.
Tất nhiên, các I/O thread sẽ xử lý phần này. Nhưng trước đó, hãy cùng ôn lại các kiến thức cơ bản về lưu trữ của Kafka.
Các Kafka topic là các commit log. Trên ổ đĩa (disk), các log này được chia thành các phân đoạn (segments). Mỗi segments gồm một vài tệp, nhưng có hai tệp quan trọng là:
Tệp .log
: Chứa dữ liệu sự kiện thực (actual event data) – các raw byte được lưu trữ.Tệp .index
: Rất quan trọng vì nó ánh xạ các offset của bản ghi (record) đến vị trí cụ thể của chúng trong tệp.log
, cho phép truy xuất nhanh chóng.
Khi các I/O thread phân tích một fetch request, chúng kiểm tra offset của các partition mục tiêu. Sử dụng offset đó cùng với tệp .index
, chúng xác định phạm vi của record cần truy xuất. Ngay lập tức, chúng kiểm tra xem có dữ liệu có đủ để đáp ứng fetch.min.bytes
trong request hay không. Nếu có, chúng tính toán vị trí của dữ liệu và tiến hành lấy nó.
- Lưu ý: Bạn có thể tự hỏi –
fetch.max.bytes
có vai trò gì ở đây, và tại sao nó không phải là giới hạn cứng? Một câu hỏi hay. Khi dữ liệu được ghi vào ổ đĩa trong tệp.log
, nó được serialize thành dạng blob – một khối byte liền mạch. Các I/O thread sẽ không chia nhỏ khối này, và cũng không chịu trách nhiệm giải mã các message. Vì vậy, nếu dữ liệu được yêu cầu có tồn tại – ngay cả khi blob lớn hơnfetch.max.bytes
– broker vẫn sẽ trả nó về để Consumer có thể tiếp tục xử lý. Logic tương tự áp dụng ở cấp độ partition vớimax.partition.fetch.bytes
.
Tại giai đoạn này, các I/O thread đã xác định được dữ liệu cần để gửi về cho Consumer. Bây giờ, một trong những trường hợp sau sẽ xảy ra:
- Nếu các tệp segment trên ổ đĩa chứa toàn bộ phạm vi dữ liệu, chúng ta (broker) sẵn sàng tạo phản hồi để gửi về. ✅
- Nếu dữ liệu nằm trong tiered storage, fetch request sẽ được chuyển cho các tiered fetch threads.📦🔄
- Nếu không có đủ dữ liệu để đáp ứng yêu cầu fetch tối thiểu (fetch.min.bytes), do đó request chưa được thực hiện nên sẽ được xếp vào vùng đợi... purgatory ⏳
Note từ dịch giả:
- Tiered Storage (Lưu trữ phân cấp) là một kiến trúc quản lý dữ liệu cho phép Kafka lưu trữ dữ liệu trên nhiều tầng (tier) lưu trữ khác nhau, chẳng hạn như ổ đĩa cục bộ, lưu trữ đám mây, ... Việc này dựa trên tần suất truy cập và yêu cầu về chi phí/hiệu suất.
- Tiered Fetch Threads (Luồng truy xuất dữ liệu phân cấp) là các luồng (thread) đặc biệt trong Kafka broker được thiết kế để xử lý các yêu cầu truy xuất (fetch requests) dữ liệu mà hiện đang nằm trong Tiered Storage.
1.1.3. Mondor Purgatory
Purgatory (tạm dịch là "vùng chờ") là một cấu trúc dữ liệu dạng map, được xây dựng trên cơ chế hierarchical timing wheel, nơi các fetch request được giữ lại tạm thời. Nếu broker chưa thể xử lý ngay, nó sẽ chờ ở đây cho đến khi:
- Đạt ngưỡng dữ liệu tối thiểu (
fetch.min.bytes
), hoặc - Hết thời gian chờ tối đa (
fetch.max.wait.ms
).
Khi một trong hai điều kiện này được thỏa mãn, response được gửi lại cho consumer.
Nếu một request đi vào purgatory, thì khi đạt ngưỡng fetch.min.bytes
hoặc vượt ngưỡng fetch.max.wait.ms
, broker sẽ lấy request ra và tạo response với dữ liệu phù hợp.
Ngược lại, nếu một request có thể được thực hiện ngay lập tức, nó sẽ bỏ qua purgatory và đi thẳng đến response queue.
- Lưu ý: Nếu consumer của bạn không nhận được dữ liệu trong quá trình polling, có thể là do chưa có đủ dữ liệu để đáp ứng request. Hãy kiểm tra lại cấu hình
fetch.min.bytes
để đảm bảo nó không bị đặt giá trị quá cao.
Note từ dịch giả:
- Hierarchical Timing Wheel là một cấu trúc dữ liệu dùng để quản lý các tác vụ hẹn giờ (timer) một cách hiệu quả.
- Hình dung Timing Wheel như một chiếc đồng hồ tròn chia ô theo dạng bánh xe – mỗi ô là một khoảng thời gian (1ms, 100ms,...).
- Các tác vụ (task) được đặt vào ô tương ứng với thời điểm cần thực thi.
- Khi thời gian đến, con trỏ xoay đến ô nào thì thực thi tác vụ của ô đó.
- "Hierarchical" nghĩa là chiếc đồng hồ có nhiều phân cấp, dùng cho các khoảng thời gian ngắn - trung - dài, giúp xử lý hàng triệu timer mà không tốn nhiều bộ nhớ hoặc CPU.
- Ví dụ:
- Timing Wheel 1 (cấp thấp): xử lý timer ngắn (1ms – vài giây)
- Timing Wheel 2 (cấp cao hơn): xử lý timer trung bình (vài giây – phút)
- Timing Wheel 3 (cấp cao nhất): xử lý timer dài (phút – giờ)
- Khi thời gian của một tác vụ vượt quá giới hạn của bánh xe hiện tại, nó sẽ được chuyển lên timing wheel cấp cao hơn.
1.1.4. Socket send buffer
Như đã đề cập trước đó, có nhiều điểm tương đồng giữa việc xử lý request từ producer và fetch request từ consumer. Tuy nhiên, cần làm nổi bật một số khác biệt trong cách dữ liệu được gửi lại cho Consumer client. Hãy cùng tìm hiểu xem nhé.
Cho đến hiện tại, chúng ta đã cùng thảo luận về cách một response được gửi lại cho Consumer như thế nào. Tuy nhiên, ngay cả sau khi đã xác định được dữ liệu cần gửi, dữ liệu đó không được nhúng trực tiếp vào đối tượng response mà broker tạo ra. Tại sao? Bởi vì việc broker phải di chuyển các khối dữ liệu lớn trong bộ nhớ nội bộ sẽ rất tốn tài nguyên, không tối ưu.
Thay vào đó, để tối ưu hóa hiệu suất, Kafka sử dụng cơ chế zero-copy – event data chuyền thẳng từ ổ đĩa đến socket send buffer, giảm tải bộ nhớ và tăng tốc độ truyền dữ liệu.
1.2. Back to the consumer
Bây giờ, chúng ta cuối cùng cũng đã trở lại consumer với một request chứa đầy dữ liệu - hoặc ít nhất, đó là điều chúng ta hướng tới. Nhưng khoan đã – mọi chuyện vẫn chưa xong đâu. Thực tế, phần thú vị giờ mới bắt đầu!
1.2.1. Deserialization
Hãy nhớ rằng, các Kafka broker chỉ làm việc với dữ liệu dạng byte. Vì vậy, event data chúng ta nhận được chỉ là các raw byte stream.
Bước đầu tiên là lấy payload từ request và giải mã (deserialize) nó, bao gồm cả phần key (nếu có). Để thực hiện điều này, bạn cần cấu hình hai tham số quan trọng: key.deserializer
và value.deserializer
. Các cấu hình này chuyển đổi các raw byte thành dữ liệu có thể sử dụng.
1.2.2. Poll and process
Khi dữ liệu đã được deserialize, đây chính là lúc áp dụng logic nghiệp vụ (business logic) mà bạn đã triển khai. Bạn nhớ poll loop bạn đã viết trong code của bạn chứ? Đúng vậy, bây giờ là lúc nó bắt đầu phát huy tác dụng!
Note từ dịch giả:
- Poll loop: Trong Kafka Consumer API, bạn thường sẽ có một vòng lặp vô hạn gọi phương thức consumer.poll(timeout) liên tục. Phương thức này chịu trách nhiệm:
- Tương tác với broker để lấy dữ liệu (nếu cần).
- Xử lý việc tái cân bằng partition (rebalancing).
- Trả về các bản ghi (record) đã được fetch và deserialize để ứng dụng của bạn xử lý.
- consumer.poll(): Phương thức mà consumer gọi để nhận các bản ghi.
1.2.2.1. Fetch ≠ Polling
Từ nãy đến giờ, chúng ta chỉ tập trung vào fetch request của consumer, nhưng có một điều bất ngờ: không phải lần gọi consumer.poll()
nào trong code của bạn cũng gửi đi fetch request. 🕵️♂️
Quá trình truy xuất dữ liệu từ broker (fetching) và consumer.poll()
thực ra là hai quá trình riêng biệt. Poll loop gửi đi một fetch request, consumer nhận được dữ liệu, nó sẽ lưu trữ tạm thời (cache) các bản ghi (record) đó. Sau đó, mỗi lần consumer.poll()
sẽ tiếp tục poll từ cache của nó cho đến khi cache rỗng. Chỉ khi cache rỗng, consumer mới gửi một fetch request mới đến broker.
1.2.2.2. Configuring polling
Vì consumer lưu trữ tạm thời (cache) các bản ghi (record), do đó bạn nên kiểm soát số lượng record được trả về mỗi lần gọi poll()
. May mắn thay, có các cấu hình thực hiện việc này!
Cấu hình quan trọng trong việc này là max.poll.records
, giới hạn số lượng record tối đa được trả về trong một lần poll (mặc định là 500). Nhưng nếu còn nhiều dữ liệu trong cache thì giới hạn này để làm gì?
Việc đặt giới hạn giúp theo dõi hiệu suất của consumer và đảm bảo nó hoạt động bình thường. Poll loop đóng vai trò như một "tín hiệu" để xác định xem liệu consumer còn "sống" hay không, cùng với các "tín hiệu" khác như heartbeat và session timeout.
Để quản lý việc này, chúng ta không chỉ giới hạn số lượng record trả về trên mỗi lần poll mà còn phải đặt giới hạn thời gian xử lý mỗi lần poll với max.poll.interval.ms
(mặc định là 5 phút). Nếu consumer xử lý vượt quá thời gian trên, đó là dấu hiệu cho thấy nó có thể đang bị kẹt – có thể bị treo khi xử lý một vài record nào đó hoặc bị lỗi. Nếu consumer đó không gửi heartbeat hoặc vượt quá session timeout, consumer group coordinator sẽ thu hồi partition được phân cho nó và loại bỏ nó khỏi group.
Note từ dịch giả:
Heartbeat là cơ chế "ping" định kỳ mà consumer gửi đến broker (cụ thể là consumer group coordinator) để thông báo rằng nó vẫn còn hoạt động và không bị treo hoặc mất kết nối.
1.2.3. Committing records
Khi xử lý các record được lấy về, điều quan trọng là consumer phải để ý xem nó đã xử lý đến đâu, để nếu có bị khởi động lại, nó cũng biết nên tiếp tục từ offset nào. Điều này được thực hiện bằng cách committing offsets.
Theo mặc định, việc commit offset là tự động khi cấu hình enable.auto.commit=true
. Trong chế độ này, một bộ xử lý offset sẽ commit các record đã xử lý 5 giây một lần dựa trên timestamps. Bạn có thể điều chỉnh khoảng thời gian này với auto.commit.interval.ms
.
Tuy nhiên, hãy nhớ rằng việc auto-commit này chỉ diễn ra theo lịch trình dựa trên thời gian. Nếu bạn cần commit sau một số lượng record cụ thể, bạn sẽ cần cấu hình thủ công consumer.commit()
. Nhưng hãy cẩn thận – làm như vậy có thể ảnh hưởng đến hiệu suất và thường không được khuyến nghị trừ khi thực sự cần thiết.
Khi các offset được commit, consumer sẽ tiếp tục xử lý các record từ cache của nó và gửi đi các fetch request mới khi cần.
1.2.4. Wash, rinse, and repeat
Miễn là consumer vẫn tiếp tục gửi heartbeat đến consumer group coordinator và không bị timeout, consumer group sẽ tiếp tục hoạt động trơn tru. Và ngay cả khi một consumer bị lỗi, đừng lo – các partition mà nó xử lý sẽ tự động được cân bằng lại, phân bổ lại (rebalanced) cho một phiên bản (instance) khác đang hoạt động, đảm bảo quá trình xử lý liên tục, không bị gián đoạn.
1.3. The End
Wow, đó là một lượng thông tin khổng lồ để có thể tiêu hóa! Nhưng thay vì cảm thấy choáng ngợp, hãy xem đây như là việc mở khóa một cấp độ mới về kiến thức.
Apache Kafka là một công nghệ thú vị, và mặc dù nó phức tạp, nhưng chính sự phức tạp đó làm cho nó mạnh mẽ – và thực sự là một chủ đề thú vị để học hỏi! Việc bạn đã hiểu được cách hoạt động bên trong của producer và consumer có nghĩa là bạn đã được trang bị đầy đủ kiến thức hơn bao giờ hết.
Điều tuyệt vời nhất là? Bạn không còn phải coi Kafka như một "hộp đen" bí ẩn nữa. Lần tới khi có sự cố xảy ra với producer hoặc consumer của bạn, bạn đã có đủ kiến thức để chẩn đoán và tự tin xử lý nó.
2. Tóm tắttt
Phần cuối trong chuỗi series 4 bài viết tìm hiểu sâu về cách hoạt động của Kafka.
2.1. Handling the consumer fetch on the broker
Quá trình xử lý fetch request từ consumer gần giống với request từ producer, trừ phần đọc dữ liệu từ ổ đĩa (page cache and disk).
2.1.1. Page cache and disk
Kafka lưu dữ liệu theo commit log, chia thành các segment chứa:
- Tệp
.log
(chứa raw byte), - Tệp
.index
(chứa index ánh xạ offset của các record → vị trí của các record trong tệp.log
).
⚙️ I/O thread xử lý fetch request::
-
Broker kiểm tra offset trên các partition mà consumer yêu cầu, xác định phạm vi record cần trả về. Sử dụng tệp
.index
để ánh xạ offset sang vị trí raw byte tương ứng trong tệp.log
. -
Kiểm tra nếu đủ dữ liệu theo
fetch.min.bytes
thì bắt đầu truy xuất. -
Dữ liệu được xử lý theo 3 hướng sau:
- ✅ Nếu dữ liệu nằm trong segment (tệp trên đĩa) → tạo response gửi ngay.
- 📦 Nếu nằm ở tiered storage → chuyển sang tiered fetch threads.
- ⏳ Nếu chưa đủ
fetch.min.bytes
→ yêu cầu bị đưa vào vùng chờ (purgatory).
📝 Lưu ý:
fetch.max.bytes
không phải giới hạn cứng vì dữ liệu được lưu thành blob không thể chia nhỏ → Broker vẫn trả về toàn bộ blob dù lớn hơn giới hạn nếu dữ liệu tồn tại.
2.1.2. Mordor Purgatory
-
Purgatory là một cấu trúc dữ liệu dạng map (sử dụng Hierarchical Timing Wheel) dùng để tạm giữ fetch request chưa đủ điều kiện xử lý.
-
Tại đây, fetch request sẽ chờ cho đến khi:
- Đủ dữ liệu theo
fetch.min.bytes
, hoặc - Hết thời gian chờ
fetch.max.wait.ms
.
- Đủ dữ liệu theo
-
Khi một trong hai điều kiện trên được đáp ứng, broker sẽ lấy request ra khỏi purgatory và gửi response về cho consumer.
-
Nếu request có thể xử lý ngay, nó bỏ qua purgatory và chuyển thẳng sang response queue.
🔍 Gợi ý xử lý lỗi: Nếu consumer không nhận được dữ liệu khi polling, có thể do fetch.min.bytes
quá cao, hãy kiểm tra lại cấu hình này.
2.1.3. Socket send buffer
-
Dù dữ liệu đã được xác định, broker không nhúng trực tiếp dữ liệu vào response object.
-
Thay vào đó, để tối ưu hiệu suất, Kafka dùng cơ chế zero-copy:
- Dữ liệu được chuyển trực tiếp từ ổ đĩa sang socket send buffer.
- Giảm chi phí cho việc chuyển dữ liệu qua lại giữa các bộ nhớ trong và tăng tốc độ truyền dữ liệu về client.
2.2. Back to the consumer
Sau khi broker gửi response, consumer nhận được raw bytes (dữ liệu gốc dưới dạng byte).
Việc đầu tiên consumer cần làm là giải mã dữ liệu (deserialization).
2.2.1. Deserialization
- Kafka gửi dữ liệu ở dạng raw byte.
- Consumer phải chuyển đổi các byte này thành dạng dữ liệu có thể dùng được bằng cách cấu hình:
key.deserializer
: để giải mã phần key.value.deserializer
: để giải mã phần giá trị (value).
2.2.2. Poll and process
Sau khi giải mã, consumer thực thi logic nghiệp vụ trong vòng lặp poll()
.
🔄 Fetching ≠ polling
- Gọi
consumer.poll()
không phải lúc nào cũng gửi fetch request lên broker. - Dữ liệu sau khi được fetch sẽ được cache lại, và consumer poll từ cache trước.
- Chỉ khi cache trống, consumer mới gửi fetch request mới.
⚙️ Configuring polling
max.poll.records
: Giới hạn số record trả về mỗi lầnpoll()
(mặc định: 500).
→ Giúp theo dõi và duy trì sức khỏe consumer.max.poll.interval.ms
: Giới hạn thời gian xử lý mỗi lần poll (mặc định: 5 phút).
→ Nếu xử lý quá lâu, consumer bị coi là "treo máy", không ping heartbeat cho consumer group coordinator → bị loại khỏi consumer group.
2.2.3. Committing records
-
Khi consumer xử lý record, cần lưu lại vị trí đã đọc (offset) để nếu bị khởi động lại, có thể tiếp tục đúng chỗ.
-
Mặc định, Kafka tự động commit offset khi
enable.auto.commit=true
:- Commit mỗi 5 giây, có thể thay đổi qua
auto.commit.interval.ms
.
- Commit mỗi 5 giây, có thể thay đổi qua
⚠️ Nếu muốn commit sau một số bản ghi cụ thể, cần gọi consumer.commit()
thủ công → nhưng có thể ảnh hưởng hiệu năng.
2.2.4. Wash, rinse, and repeat
Miễn là:
- Consumer vẫn gửi heartbeat đúng hạn
- Không vượt quá timeout
→ Consumer group sẽ hoạt động ổn định.
Ngay cả khi 1 consumer lỗi, Kafka sẽ tự rebalance partition cho consumer khác → đảm bảo xử lý liên tục.
Lời Kết
Mình xin gửi lời cảm ơn chân thành đến Danica Fine và đội ngũ Confluent đã mang đến loạt bài sâu sắc này - nhờ đó mà mình có cơ hội dịch thuật và chia sẻ kiến thức quý giá với mọi người.
Cảm ơn bạn đọc đã dành thời gian theo dõi blog, hy vọng phần dịch và ghi chú của mình sẽ giúp các bạn nắm bắt rõ hơn về cách Kafka vận hành “hộp đen” và tự tin hơn khi triển khai, vận hành hệ thống cũng như gỡ lỗi.
Nếu bạn có góp ý, câu hỏi hay kinh nghiệm thực tế muốn chia sẻ, đừng ngần ngại để lại bình luận hoặc liên hệ với mình. Mình rất mong được thảo luận và học hỏi thêm từ cộng đồng!
Chúc các bạn học tập và làm việc hiệu quả với Apache Kafka!