Tác giả: Danica Fine | Ngày đăng: 24/09/2024
Link bài viết gốc | Link series tiếng Việt
Chào mừng bạn đến với phần thứ hai của loạt bài viết nhằm tìm hiểu hoạt động bên trong của chiếc hộp đen tuyệt vời mang tên Apache Kafka®.
Chúng ta đào sâu vào Kafka để xem cách chúng ta thực sự tương tác với cụm (cluster) thông qua các Producer và Consumer. Trong quá trình này, chúng ta sẽ khám phá các cấu hình ảnh hưởng đến từng bước của hành trình hoành tráng này và các chỉ số (metrics) giúp giám sát quá trình hiệu quả hơn.
Trong bài viết trước, chúng ta đã khám phá những gì Producer client của Kafka thực hiện phía sau hậu trường mỗi khi gọi producer.send()
(hoặc tương tự, tùy thuộc vào ngôn ngữ bạn chọn). Trong bài viết này, chúng ta theo chân nhân vật chính dũng cảm của chúng ta – một Request từ Producer được tạo hoàn chỉnh – trên hành trình đến broker để được xử lý và lưu trữ dữ liệu trên cluster.
Để biết thêm thông tin về phần còn lại của hành trình, hãy xem các bài viết khác trong loạt bài:
- Cách Producer Hoạt Động: Khám phá Producer đã làm gì phía sau hậu trường để chuẩn bị dữ liệu sự kiện thô (raw event data) để gửi đến Broker.
- Xử Lý Request Từ Producer: Tìm hiểu cách dữ liệu đi từ Producer Client đến ổ đĩa trên Broker.
- Chuẩn Bị Cho Yêu Cầu Truy Vấn Của Consumer: Xem cách Consumer gửi request lấy dữ liệu.
- Xử Lý Request Truy Vấn Của Consumer: Đi sâu vào hoạt động bên trong của Broker khi chúng cố gắng cung cấp dữ liệu cho Consumer.
Và dưới đây là bài thứ hai trong bốn phần, bàn về:
Xử Lý Request Từ Producer: Tìm hiểu cách dữ liệu đi từ Producer Client đến ổ đĩa trên Broker.
1. Xử Lý Request Producer
Bên trong Broker
1.1. Bộ Đệm Nhận Dữ Liệu Của Socket (Socket receive buffer)
Bước đầu tiên cho các request khi đến broker là được lưu vào bộ đệm nhận dữ liệu của server socket. Đây là một khu vực tiếp nhận dữ liệu đến; tại đây, request sẽ chờ được các luồng mạng (network threads) xử lý.
Có một số tham số cấu hình cấp thấp để điều chỉnh hiệu suất của bộ đệm này, như kích thước toàn bộ bộ đệm (socket.receive.buffer.bytes
) và kích thước tối đa của một request được gửi đến (socket.request.max.bytes
). Thông thường, bạn không cần thay đổi các giá trị mặc định, và nếu bạn thực sự nghĩ cần thay đổi, hãy bàn bạc với Kafka cluster administrator của bạn.
Note của dịch giả:
Hãy hình dung như sau nhé:
-Socket là cánh cửa kết nối giữa ứng dụng của bạn và mạng lưới bên ngoài.
Dữ liệu là những "hàng hóa" đi qua cánh cửa đó.
-Socket receive buffer là một kho chứa tạm thời ngay phía trong cánh cửa (socket). Khi hàng hóa (dữ liệu) được chuyển qua cánh cửa, chúng sẽ được đặt vào kho này trước. Ứng dụng của bạn sẽ đến kho này để lấy hàng ra xử lý khi sẵn sàng.
1.2. Luồng Mạng (Network threads)
Sau một khoảng thời gian ngắn "lưu trú" trong bộ đệm nhận dữ liệu của socket, request sẽ được một network thread có sẵn từ bể (pool) tiếp nhận. Điều quan trọng cần lưu ý là network thread nào tiếp nhận request sẽ xử lý request ấy trong suốt vòng đời của nó.
Nhiệm vụ đầu tiên của network thread là đọc request từ socket buffer, chuyển nó thành một produce request object (khác với consumer fetch request), và thêm vào hàng đợi xử lý.
Có một cấu hình nâng cao tại bước này; thường bạn không cần thay đổi, nhưng cũng nên biết về nó. num.network.threads
xác định số lượng network thread hoạt động tại bước này. Mặc định là 3, và giới hạn trên thường bằng số lõi CPU trên server.
Cách để giám sát các luồng này là sử dụng NetworkProcessorAvgIdlePercent
. Giá trị dao động từ 0 (luồng được sử dụng hoàn toàn) đến 1 (luồng hoàn toàn rảnh rỗi). Bạn sẽ muốn giá trị này càng gần 1 càng tốt, cho thấy các luồng không bị quá tải.
1.3. Hàng Đợi Yêu Cầu (Request queue)
Một chặng ngắn nữa cho "người hùng" của chúng ta. Request của chúng ta có một khoảng thời gian ngắn trong request queue, nơi nó chờ được xử lý thêm bởi các I/O thread của máy chủ.
Tại bước này, bạn có thể kiểm soát số lượng request (queued.max.requests
) và kích thước tối đa của request (queued.max.request.bytes
). Nếu muốn thay đổi, một nguyên tắc chung là giới hạn queued.max.requests
bằng số lượng client đang hoạt động; điều này giúp đảm bảo thứ tự nghiêm ngặt của các message.
Bạn có thể giám sát kích thước request queue và thời gian một request nằm trong queue bằng các số liệu RequestQueueSize
và RequestQueueTimeMs
. Các số liệu này cùng nhau là đại diện cho hiệu năng của các I/O thread và tình trạng quá tải của broker. Theo dõi hai số liệu này là cần thiết vì nếu request queue bị đầy, nó sẽ chặn và ngăn các network thread thêm request mới.
1.4. Các Luồng I/O (Xử Lý Request)
Tiếp theo, request sẽ được một I/O thread đang rảnh - còn gọi là luồng xử lý request - lấy ra khỏi request queue.
Khi luồng này truy cập request, công việc đầu tiên là kiểm tra tính toàn vẹn của dữ liệu bên trong bằng cơ chế kiểm tra dự phòng tuần hoàn (cyclic redundancy check). Đây chỉ là một bước bổ sung để đảm bảo dữ liệu không bị vấn đề gì trong quá trình truyền.
Vì các I/O thread có khá nhiều việc phải làm trong việc xử lý dữ liệu và lưu vào ổ đĩa, bạn có thể cấu hình số lượng luồng này. Mặc định là 8, nhưng bạn có thể thay đổi số lượng bằng num.io.threads
.
Nếu bạn muốn theo dõi hiệu năng của các luồng, bạn có thể sử dụng RequestHandlerAvgIdlePercent
. Một lần nữa, bạn sẽ muốn giá trị này càng gần 1 càng tốt, thể hiện các luồng của bạn thường rảnh rỗi. Giá trị gần 0 có nghĩa là các luồng hiếm khi rảnh và đang làm việc hết công suất
Note của dịch giả:
Cyclic Redundancy Check (CRC)
Đây là một phép tính (checksum):
B1: Tính toán giá trị kiểm tra từ dữ liệu ban đầu.
B2: So sánh với giá trị kiểm tra kèm theo request.
B3: Phát hiện và loại bỏ các request có dữ liệu bị hỏng hoặc thay đổi không mong muốn.
1.5. Page cache and disk
Cuối cùng, khoảnh khắc bạn hằng chờ đợi: "người hùng" của chúng ta đã bước sâu vào hành trình sử thi và đối mặt với nhiệm vụ tối thượng… lưu dữ liệu xuống ổ đĩa. Thực ra, các I/O thread sẽ đảm nhiệm phần này của hành trình.
Nói sơ qua về cấu trúc lưu trữ trong Kafka. Kafka sử dụng log - cụ thể là commit log. Trên ổ đĩa, commit log gồm một tập hợp các thành phần gọi là segments. Mỗi segment lại bao gồm một vài tệp:
- Tệp
.log
, chứa dữ liệu sự kiện thực tế, các raw byte mà chúng ta muốn lưu trữ. - Tệp
.index
, rất quan trọng. Nó lưu trữ index ánh xạ từ offset của bản ghi (record) đến vị trí vật lý của bản ghi đó trong tệp.log
tương ứng. - Tệp
.timeindex
để truy cập các bản ghi sử dụng offset dựa trên thời gian (time-based offsets); dùng trong các tình huống khôi phục thảm họa (disaster recovery) cho consumer. - Tệp
.snapshot
lưu trữ thông tin về số thứ tự của producer (producer sequence numbers), hữu ích cho tính idempotence (tính bất biến khi lặp thao tác).
Có khá nhiều tệp để các I/O thread ghi vào. Và chúng ta đều biết việc ghi xuống ổ đĩa có thể tốn tài nguyên như thế nào. Vì vậy, khi các I/O thread ghi event vào các tệp .log
và .index
, để hiệu quả, các cập nhật sẽ được ghi hết lại trong page cache. Sau đó, chúng mới được đồng bộ (flush) xuống ổ đĩa.
Một số người có thể thấy hơi lo vì Kafka không dùng fsync
, nhưng như đồng nghiệp của tôi, Jack Vanlightly, đã nói trong blog của anh ấy về chủ đề này, “Giao thức nhân bản dữ liệu của Kafka được thiết kế để chạy an toàn mà không cần dùng fsync
.” Hơn nữa, bạn có thể tùy chỉnh một số cấu hình để kiểm soát tần suất ghi dữ liệu xuống đĩa:
- log.flush.interval.ms: Giới hạn thời gian cho việc flush tệp
.log
xuống ổ đĩa. - log.flush.interval.messages: Số lượng message có thể được ghi vào tệp
.log
trước khi tệp được flush xuống ổ đĩa. - log.segment.bytes: Kích thước tối đa của tệp
.log
trước khi các tệp chuyển sang segment kế tiếp.
Ngoài ra, có những cấu hình có thể ghi đè ở cấp độ topic. Một cấu hình phổ biến là cleanup.policy
, có thể nhận giá trị delete
và compact
. Giá trị delete
có nghĩa là các log cũ sẽ bị xóa sau một thời gian nhất định hoặc khi vượt ngưỡng kích thước tệp, trong khi giá trị compact
có nghĩa là sẽ giữ lại ít nhất một giá trị cho mỗi key, xóa các bản ghi cũ hơn của key đó.
Nếu bạn tò mò, cách tốt nhất để giám sát việc các log được flush xuống ổ đĩa như thế nào là sử dụng LogFlushRateAndTimeMs
. Một chỉ số (metrics) tốt khác là LocalTimeMs
, đo khoảng thời gian từ khi các I/O thread nhận request đến khi dữ liệu được ghi vào page cache.
Rõ ràng các I/O thread là những "siêu sao" thực sự trong quá trình này. Do đó, chúng ta cần để các I/O thread hoàn thành việc ghi dữ liệu nhanh nhất có thể để tiếp tục xử lý. Nhưng liệu page cache có đủ để đảm bảo dữ liệu của chúng ta an toàn không?
Nói chung thì có. Nhưng bạn vẫn nên đảm bảo chắc chắn dữ liệu đã được sao chép - acks
, và đó là lý do chúng ta có…
1.6. Vùng chờ (Purgatory)
Chúng ta đã thấy ở bước trước rằng dữ liệu không được ghi đồng bộ xuống ổ đĩa từ page cache, vì vậy việc Kafka sử dụng replication để đảm bảo dữ liệu được lưu trữ bền vững là rất quan trọng.
Vậy "vùng chờ" (purgatory) đóng vai trò gì? Purgatory là một cấu trúc dữ liệu giống như map dựa trên cơ chế hierarchical timing wheel, nơi các request được giữ lại cho đến khi quá trình nhân bản hoàn tất. Nhớ lại acks
từ Bài đầu tiên trong loạt bài này chứ? Theo mặc định, Kafka sẽ không gửi phản hồi lại cho producer cho đến khi dữ liệu được sao chép đến tất cả các broker.
Cluster administrator của bạn có thể thiết lập hệ số nhân bản trên toàn bộ cluster bằng default.replication.factor
, nhưng bạn có thể ghi đè cho từng topic riêng lẻ bằng replication.factor
.
Bạn có thể tự hỏi điều gì xảy ra khi request đang nằm trong vùng chờ. Câu trả lời là: hầu như không có gì xảy ra. Broker đang lưu request này chỉ đơn giản chờ các broker khác – những broker đang giữ bản sao phụ (follower replicas) – gửi fetch request để đồng bộ bản sao của mình. Mỗi broker biết rõ topic-partition nào là replica và broker nào giữ bản gốc (“golden copy”), rồi chúng sẽ fetch từ broker gốc để cập nhật.
Theo mặc định, các follower broker sẽ fetch thông tin này mỗi 500 ms, nhưng bạn có thể cấu hình lại với replica.fetch.wait.max.ms
. Bạn cũng có thể điều chỉnh số luồng mà follower broker dùng để fetch dữ liệu thông qua num.replica.fetchers
.
Để đảm bảo hoạt động ổn định cho cluster, bạn nên giám sát thời gian của quá trình replication – tức là thời gian request "chờ" trong purgatory – bằng chỉ số (metric) RemoteTimeMs
.
1.7. Hàng Đợi Phản Hồi (Response queue)
Khi đã đáp ứng đủ điều kiện acks
cho request ban đầu, broker sẽ lấy request ra khỏi vùng chờ (purgatory) và bắt đầu tạo response để gửi lại cho Producer client. Điểm dừng đầu tiên của response là hàng đợi phản hồi (response queue).
Chúng ta không thể cấu hình response queue như request queue, nhưng chúng ta có thể giám sát nó bằng các số liệu tương tự, như số lượng response trong hàng đợi (ResponseQueueSize
) và thời gian response ở trong hàng đợi (ResponseQueueTimeMs
).
1.8. Trở Lại Luồng Mạng (Network thread handoff)
Khi response sẵn sàng, nó lại được chuyển về các network thread!
Về mặt kỹ thuật, network thread đã tiếp nhận request ban đầu chính là thread đã xử lý mọi thứ cho đến giờ. Nhiệm vụ cuối cùng của nó lúc này là lấy response object đã tạo và gửi trả lại cho Producer client.
1.9. Bộ Đệm Gửi Dữ Liệu Của Socket (Socket send buffer)
Cuối cùng, response sẽ được đặt vào bộ đệm gửi dữ liệu của socket (socket send buffer) để chờ producer nhận. Thực tế, broker sẽ đợi cho đến khi producer nhận đầy đủ response rồi mới tiếp tục.
Với socket.send.buffer.bytes
, administrator của bạn có thể kiểm soát kích thước tổng thể của bộ đệm này. Và bạn nên theo dõi thời gian gửi response mất bao lâu; bạn có thể giám sát bằng ResponseSendTimeMs
.
1.10. Trở Lại Producer
Chúng ta đã trở lại Producer client với một response – hy vọng là một response tốt!
Điều đầu tiên, Producer sẽ kết thúc mọi bộ đếm thời gian và thu thập số liệu về request; ví dụ, với TotalTimeMs
, ta biết được request mất bao lâu để xử lý.
Tiếp theo, nếu dữ liệu được lưu thành công, producer client sẽ dọn dẹp và giải phóng batch trong buffer. Nếu không, nó sẽ thực hiện retry theo cấu hình và có thể gửi lại dữ liệu.
1.11. Phew!
Và đó là vòng đời của một request đến từ Producer. Đơn giản vờ lờ
Chúng ta đều biết đó là một quá trình dài và chi tiết, nhưng đó là những gì bạn sẽ thu được khi khám phá “hộp đen” Kafka.
Apache Kafka là một công nghệ phức tạp. Hành trình đào sâu vào nó không nhằm mục đích làm bạn ngợp. Thay vào đó, nó giúp bạn hiểu rõ quy trình và chuẩn bị tốt hơn cho lần debug tiếp theo.
2. Tóm tắttt
Trong bài viết này, nội dung tập trung vào việc theo dõi hành trình của một request gửi dữ liệu từ phía producer client đến broker, nơi nó sẽ được xử lý và ghi vào ổ đĩa trong Kafka cluster.
Bài viết cũng nhấn mạnh:
- Các thiết lập cấu hình ảnh hưởng đến từng bước trong quá trình xử lý.
- Các chỉ số giám sát (metrics) giúp theo dõi hiệu suất và tình trạng hệ thống hiệu quả hơn.
Loạt bài gồm bốn phần:
- Cách Producer Hoạt Động: Khám phá Producer đã làm gì phía sau hậu trường để chuẩn bị dữ liệu sự kiện thô (raw event data) để gửi đến Broker.
- Xử Lý Request Từ Producer: Tìm hiểu cách dữ liệu đi từ Producer Client đến ổ đĩa trên Broker.
- Chuẩn Bị Cho request Truy Vấn Của Consumer: Xem cách Consumer gửi request lấy dữ liệu.
- Xử Lý Request Truy Vấn Của Consumer: Đi sâu vào hoạt động bên trong của Broker khi chúng cố gắng cung cấp dữ liệu cho Consumer.
Và dưới đây là bài thứ hai trong bốn phần, bàn về:
Xử Lý Request Từ Producer: Tìm hiểu cách dữ liệu đi từ Producer Client đến ổ đĩa trên Broker.
Vòng đời Request của Producer
Mọi thứ diễn ra bên trong Broker
1. Socket Receive Buffer
- Bước đầu tiên khi Broker nhận request từ Producer sẽ đi vào
- Nó đóng vai trò như một vùng tiếp nhận tạm thời, chờ các luồng mạng (network threads) đến xử lý.
- Có 2 cấu hình quan trọng để tối ưu:
socket.receive.buffer.bytes
: kích thước tổng của buffer.socket.request.max.bytes
: kích thước tối đa của một request đến.
- Thông thường không cần thay đổi, trừ khi có request cần tối ưu đặc biệt.
2. Network Threads
- Một network thread trong pool sẽ nhận request và xử lý nó trong suốt vòng đời.
- Nhiệm vụ:
- Đọc từ buffer.
- Tạo thành request object.
- Đưa vào request queue để tiếp tục xử lý.
- Số lượng network thread điều chỉnh bằng
num.network.threads
(mặc định: 3). - Theo dõi hiệu suất bằng
NetworkProcessorAvgIdlePercent
(gần 1 nghĩa là luồng rảnh (tốt), gần 0 nghĩa là đang hoạt động hết công suất).
3. Request Queue
- Khu vực chờ trước khi I/O thread tiếp quản.
- Cấu hình:
queued.max.requests
: số lượng request tối đa trong hàng đợi.queued.max.request.bytes
: kích thước tối đa theo byte.
- Chỉ số:
RequestQueueSize
: số lượng request trong hàng đợi.RequestQueueTimeMs
: thời gian trung bình request nằm chờ.
→ Nếu hàng đợi đầy, network thread không thể đưa thêm request mới vào.
4. I/O Thread (Request Handlers)
- I/O Thread sẽ lấy request từ hàng đợi và xử lý nó.
- Việc đầu tiên là thực hiện Cyclic Redundancy Check để xác thực tính toàn vẹn của dữ liệu.
- Cấu hình:
- num.io.threads: số luồng I/O, mặc định là 8, có thể thay đổi nếu tải cao.
- Chỉ số:
- RequestHandlerAvgIdlePercent: gần 1 là tốt (luồng rảnh nhiều), gần 0 là dấu hiệu luồng đang quá tải.
5. Page cache and disk
Cấu trúc lưu trữ trong Kafka
Kafka lưu dữ liệu theo dạng commit log, được chia thành các segment. Mỗi segment gồm các file:
.log
: chứa dữ liệu thô (raw bytes)..index
: ánh xạ giữa offset và vị trí tương ứng trong.log
..timeindex
: hỗ trợ truy xuất theo thời gian (hữu ích khi khôi phục)..snapshot
: lưu số thứ tự producer (phục vụ idempotence).
Ghi dữ liệu: từ I/O Thread đến Page Cache
- Luồng I/O sẽ ghi dữ liệu vào
.log
và.index
. - Để tăng hiệu suất, thao tác này ghi vào page cache trước.
- Dữ liệu sẽ được flush ra đĩa sau đó.
Cấu hình flush dữ liệu:
log.flush.interval.ms
: khoảng thời gian tối đa giữa các lần flush.log.flush.interval.messages
: số message tối đa trước khi flush.log.segment.bytes
: kích thước tối đa của file.log
trước khi tạo segment mới.cleanup.policy
:delete
: xóa log cũ theo thời gian hoặc kích thước.compact
: giữ lại 1 bản ghi cuối cùng trên mỗi key.
Theo dõi hiệu suất ghi đĩa:
LogFlushRateAndTimeMs
: tốc độ & thời gian flush log.LocalTimeMs
: thời gian từ khi luồng I/O xử lý đến khi ghi vào page cache.
6. Purgatory
- Purgatory là nơi Kafka tạm giữ request sau khi ghi vào page cache nhưng trước khi gửi response, để đảm bảo replication đã hoàn tất.
- Đây là cấu trúc dạng map, dựa trên hierarchical timing wheel.
- Kafka chỉ phản hồi producer khi dữ liệu được replicate đến đủ số lượng broker, theo cấu hình
acks
.
Cấu hình liên quan:
default.replication.factor
: hệ số nhân bản mặc định của cluster.replication.factor
: hệ số riêng cho từng topic.
Cơ chế hoạt động:
- Mỗi broker biết partition nào mình là follower, và broker nào giữ bản gốc (golden copy).
- Các broker follower sẽ gửi fetch request định kỳ (mặc định mỗi 500ms, cấu hình qua
replica.fetch.wait.max.ms
) để cập nhật dữ liệu. - Có thể điều chỉnh số luồng fetch qua
num.replica.fetchers
.
Giám sát độ trễ:
- Dùng
RemoteTimeMs
để đo thời gian request nằm trong purgatory, phản ánh độ trễ của replication.
7. Response queue
- Khi replication xong, broker lấy request ra khỏi purgatory và bắt đầu tạo response.
- Response được đưa vào response queue (mỗi network thread có một).
- Không cấu hình được nhưng có thể theo dõi bằng:
ResponseQueueSize
: số lượng response trong hàng đợi.ResponseQueueTimeMS
: thời gian trung bình response nằm chờ.
8. Network thread handoff
- Cùng sẽ tiếp tục đảm nhiệm việc gửi response về producer.
9. Socket send buffer
- Response được đặt vào socket send buffer, chờ producer nhận.
- Kafka sẽ chờ tới khi toàn bộ response được producer nhận xong mới tiếp tục.
- Quản lý bằng:
socket.send.buffer.bytes
: kích thước tổng thể của bộ đệm- Theo dõi thời gian gửi qua
ResponseSendTimeMs
10. Back to the producer
- Khi nhận phản hồi:
- Producer dừng các timer và ghi nhận metric như
TotalTimeMs
(tổng thời gian xử lý yêu cầu). - Nếu thành công: xóa batch khỏi bộ đệm.
- Nếu thất bại: thực hiện retry theo cấu hình.
- Producer dừng các timer và ghi nhận metric như
Lời Kết
Mình xin gửi lời cảm ơn chân thành đến Danica Fine và đội ngũ Confluent đã mang đến loạt bài sâu sắc này - nhờ đó mà mình có cơ hội dịch thuật và chia sẻ kiến thức quý giá với mọi người.
Cảm ơn bạn đọc đã dành thời gian theo dõi blog, hy vọng phần dịch và ghi chú của mình sẽ giúp các bạn nắm bắt rõ hơn về cách Kafka vận hành “hộp đen” và tự tin hơn khi triển khai, vận hành hệ thống cũng như gỡ lỗi.
Nếu bạn có góp ý, câu hỏi hay kinh nghiệm thực tế muốn chia sẻ, đừng ngần ngại để lại bình luận hoặc liên hệ với mình. Mình rất mong được thảo luận và học hỏi thêm từ cộng đồng!
Chúc các bạn học tập và làm việc hiệu quả với Apache Kafka!