1. Tổng quan
Như đã trao đổi ở bài viết trước, dữ liệu chỉ khả dụng, sẵn sàng cho consumer có thể nhìn thấy và đọc sau khi đã được cam kết (commit) với Kafka—tức là đã được ghi vào tất cả các bản sao đồng bộ. Điều này đảm bảo rằng consumer nhận được dữ liệu nhất quán. Việc duy nhất consumer cần làm là theo dõi xem đã đọc những message nào và chưa đọc những message nào. Đây là yếu tố rất quan trọng để không mất dữ liệu trong quá trình tiêu thụ.
Khi đọc dữ liệu từ một partition, consumer sẽ lấy một batch message, kiểm tra offset cuối cùng trong batch, sau đó yêu cầu một batch message khác bắt đầu từ offset cuối cùng đã nhận. Điều này đảm bảo rằng consumer luôn nhận được dữ liệu mới theo đúng thứ tự mà không bỏ lỡ bất kỳ message nào.
Khi một consumer dừng lại, một consumer khác cần biết bắt đầu công việc từ đâu—offset cuối cùng mà consumer trước đó đã xử lý trước khi dừng lại là gì? "Consumer khác" thậm chí có thể là chính consumer ban đầu sau khi khởi động lại. Một consumer nào đó sẽ tiếp tục tiêu thụ từ partition đó, và nó cần biết bắt đầu từ offset nào. Đây là lý do vì sao consumer cần "commit" các offset của chúng. Đối với mỗi partition mà nó đang tiêu thụ, consumer sẽ lưu vị trí hiện tại của nó để nó hoặc một consumer khác sẽ biết bắt đầu tiếp tục từ đâu sau khi khởi động lại. Trường hợp mà các consumer có thể mất message là khi đã commit các offset cho các sự kiện mà chúng đã đọc nhưng chưa xử lý xong. Bằng cách này, khi một consumer khác tiếp tục công việc, nó sẽ bỏ qua những message đó và chúng sẽ không bao giờ được xử lý. Do đó, việc chú ý kỹ đến thời điểm và cách commit các offset là rất quan trọng.
Một lưu ý quan trọng cần phải ghi nhớ trước khi chung ta đi sâu vào việc phân tích
Message đã được commit khác với các offset đã được commit.
Như đã đề cập trước đây, một message được commit là message đã được ghi vào tất cả các bản sao đồng bộ và có sẵn cho các consumer. Trong khi đó, các offset đã được commit là những offset mà consumer gửi đến Kafka để xác nhận rằng nó đã nhận và xử lý tất cả các message trong một partition cho đến offset cụ thể này.
2. Các thuộc tính cấu hình quan trọng của consumer để quá trình xử lý đảm bảo tin cậy
Có bốn thuộc tính cấu hình của consumer mà chúng ta cần hiểu rõ để cấu hình consumer với hành vi tin cậy mong muốn.
- Đầu tiên là
group.id
. Nếu hai consumer có cùng group ID và đăng ký cùng một topic, mỗi consumer sẽ được gán một tập hợp partition trong topic và do đó, chỉ đọc một phần tập message (nhưng cả nhóm sẽ đọc tất cả các message). Nếu cần một consumer nhìn thấy tất cả message trong các topic mà nó đăng ký, thì nó cần có mộtgroup.id
duy nhất. - Cấu hình thứ hai cần quan tâm là auto.offset.reset. Tham số này điều khiển hành vi của consumer khi không có offset nào được commit (ví dụ: khi consumer khởi động lần đầu) hoặc khi consumer yêu cầu offset không tồn tại trên broker. Có hai tùy chọn ở đây. Nếu chọn earliest, consumer sẽ bắt đầu từ đầu partition bất cứ khi nào không có offset hợp lệ. Điều này có thể khiến consumer xử lý lại nhiều message, nhưng đảm bảo giảm thiểu việc mất mát dữ liệu. Nếu chọn latest, consumer sẽ bắt đầu từ cuối partition. Điều này giảm thiểu việc xử lý trùng lặp của consumer nhưng gần như chắc chắn sẽ bỏ qua một số message.
- Cấu hình thứ ba liên quan là enable.auto.commit. Đây là một quyết định quan trọng: chúng ta sẽ để consumer tự động commit offset theo lịch trình, hay chúng ta sẽ commit offset thủ công trong code? Lợi ích chính của commit offset tự động là không cần phải lo lắng khi sử dụng consumer trong ứng dụng. Khi chúng ta xử lý tất cả các bản ghi đã tiêu thụ trong vòng lặp poll của consumer, commit tự động đảm bảo không commit nhầm offset chưa được xử lý. Tuy nhiên, nhược điểm của commit tự động là không kiểm soát được số lượng bản ghi trùng lặp mà ứng dụng có thể xử lý, do ứng dụng có thể dừng sau khi xử lý một số bản ghi nhưng trước khi commit tự động diễn ra. Khi ứng dụng có quy trình phức tạp hơn, như chuyển bản ghi cho một luồng khác xử lý, commit offset thủ công là lựa chọn duy nhất, vì commit tự động có thể commit các offset mà consumer đã đọc nhưng chưa xử lý.
- Cấu hình thứ tư liên quan, auto.commit.interval.ms, gắn liền với cấu hình thứ ba. Nếu chúng ta chọn commit offset tự động, cấu hình này cho phép chúng ta điều chỉnh tần suất commit. Mặc định là mỗi 5 giây. Nói chung, commit thường xuyên hơn sẽ tạo thêm chi phí nhưng giảm số lượng bản ghi trùng lặp khi consumer dừng lại.
Mặc dù không liên quan trực tiếp đến xử lý dữ liệu đáng tin cậy, nhưng rất khó để coi một consumer là đáng tin cậy nếu nó thường xuyên dừng việc tiêu thụ message và gây ra quá trình rebalance.
3. Commit Offset một cách rõ ràng trong các consumer
Nếu chúng ta quyết định cần nhiều sự kiểm soát hơn và chọn cách commit offset thủ công, chúng ta cần quan tâm đến tính chính xác và các tác động liên quan đến hiệu suất.
Chúng ta sẽ không đi sâu vào cơ chế và API liên quan đến việc commit offset ở đây. Thay vào đó, chúng ta sẽ xem xét những điểm quan trọng khi phát triển một consumer để xử lý dữ liệu một cách đáng tin cậy. Chúng ta sẽ bắt đầu với những điểm đơn giản và có thể rất hiển nhiên, sau đó chuyển sang những pattern phức tạp hơn.
Luôn commit offset sau khi xử lý xong message
Nếu chúng ta thực hiện toàn bộ quá trình xử lý trong vòng lặp poll và không duy trì trạng thái giữa các vòng lặp (ví dụ: trong quá trình tổng hợp dữ liệu), việc này sẽ khá đơn giản. Chúng ta có thể sử dụng cấu hình tự động cam kết (auto-commit), commit offset ở cuối vòng lặp poll, hoặc commit offset bên trong vòng lặp với tần suất cân bằng giữa yêu cầu về chi phí và việc tránh xử lý trùng lặp. Nếu có các luồng xử lý bổ sung hoặc xử lý có trạng thái, vấn đề này sẽ trở nên phức tạp hơn, đặc biệt vì đối tượng consumer không an toàn với các luồng (thread-safe).
Tần suất commit là sự đánh đổi giữa hiệu suất và số lượng bản sao trong trường hợp xảy ra sự cố
Ngay cả trong trường hợp đơn giản nhất khi chúng ta thực hiện toàn bộ quá trình xử lý trong vòng lặp poll và không duy trì trạng thái giữa các vòng lặp, chúng ta có thể chọn commit nhiều lần trong một vòng lặp hoặc chỉ commit sau một số vòng lặp. Việc commit có tác động lớn đến hiệu suất, tương tự như việc produce với acks=all, nhưng tất cả các commit offset của một consumer group được gửi đến cùng một broker, điều này có thể dẫn đến quá tải. Tần suất commit cần cân bằng giữa yêu cầu về hiệu suất và việc tránh trùng lặp. Việc commit sau mỗi message chỉ nên được thực hiện với các topic có lưu lượng (throughtput) rất thấp.
Commit đúng offset vào đúng thời điểm
Một lỗi phổ biến khi commit giữa vòng lặp poll là vô tình commit offset cuối cùng đã đọc khi polling thay vì offset sau offset cuối cùng đã xử lý. Hãy nhớ rằng, điều quan trọng là luôn commit offset cho các message sau khi chúng đã được xử lý—việc commit offset cho các message đã đọc nhưng chưa được xử lý có thể khiến consumer bỏ lỡ các message. Bài viết có các ví dụ chỉ ra cách làm điều này một cách chính xác.
Tái cân bằng (Rebalances)
Khi thiết kế một ứng dụng, chúng ta cần nhớ rằng quá trình rebalance consumer sẽ xảy ra, và chúng ta cần xử lý chúng đúng cách. Bài viết chứa một vài ví dụ về điều này. Việc xử lý thường bao gồm việc commit offset trước khi các partition bị thu hồi và dọn dẹp bất kỳ trạng thái nào mà ứng dụng duy trì khi được gán các partition mới.
Consumer có thể cần phải thử lại (retry)
Trong một số trường hợp, sau khi gọi poll và xử lý các bản ghi, có những bản ghi chưa được xử lý hoàn toàn và cần được xử lý sau đó. Ví dụ, chúng ta có thể cố gắng ghi các bản ghi từ Kafka vào một cơ sở dữ liệu nhưng phát hiện cơ sở dữ liệu không khả dụng vào thời điểm đó và cần phải thử lại sau. Lưu ý rằng, không giống như các hệ thống message pub/sub truyền thống, consumer Kafka commit offset và không "xác nhận" từng message riêng lẻ. Điều này có nghĩa là nếu chúng ta thất bại trong việc xử lý bản ghi #30 và thành công với bản ghi #31, chúng ta không nên commit offset #31—việc này sẽ đánh dấu là đã xử lý tất cả các bản ghi đến #31 bao gồm cả #30, điều này thường không phải là điều chúng ta muốn. Thay vào đó, hãy thử theo một trong hai mô hình sau:
Một lựa chọn khi gặp lỗi có thể thử lại là commit bản ghi cuối cùng đã được xử lý thành công. Sau đó, chúng ta sẽ lưu các bản ghi cần xử lý lại vào một bộ đệm (để lượt poll tiếp theo không ghi đè chúng), sử dụng phương thức pause() của consumer để đảm bảo rằng các poll tiếp theo sẽ không trả về dữ liệu, và tiếp tục thử xử lý các bản ghi đó. Một lựa chọn thứ hai khi gặp lỗi có thể thử lại là ghi lỗi vào một topic riêng biệt và tiếp tục. Một consumer group dùng riêng có thể được sử dụng để xử lý các bản ghi từ retry topic, hoặc một consumer có thể đăng ký cả main topic và retry topic, nhưng tạm dừng retry topic giữa các lần thử lại. Mô hình này tương tự như hệ thống dead-letter-queue được sử dụng trong nhiều hệ thống message queue.
Consumer có thể cần duy trì trạng thái
Trong một số ứng dụng, chúng ta cần duy trì trạng thái qua nhiều lần gọi poll. Ví dụ, nếu chúng ta muốn tính toán giá trị trung bình của số tiền giao dịch sau mỗi phút, chúng ta sẽ cần cập nhật giá trị trung bình sau mỗi lần poll Kafka để lấy message mới. Nếu quy trình của chúng ta bị khởi động lại, chúng ta không chỉ cần bắt đầu tiêu thụ từ offset cuối cùng mà còn cần khôi phục giá trị trung bình tương ứng. Một cách để làm điều này là ghi giá trị tích lũy mới nhất vào một "results topic" cùng lúc ứng dụng commit offset để lưu lại trạng thái tính toán. Điều này có nghĩa là khi một luồng khởi động lại, nó có thể lấy giá trị tích lũy mới nhất và tiếp tục từ nơi nó đã dừng.
4. Thông tin kết nối
Nếu anh em muốn trao đổi thêm về bài viết, hãy kết nối với mình qua LinkedIn và Facebook:
- LinkedIn: https://www.linkedin.com/in/nguyentrungnam/
- Facebook: https://www.facebook.com/trungnam.nguyen.395/
Rất mong được kết nối và cùng thảo luận!