Chuẩn bị cho yêu cầu truy vấn từ Consumer: Bên trong Kafka Producer và Consumer - Phần 3

0 0 0

Người đăng: Tom Riu

Theo Viblo Asia

Tác giả: Danica Fine | Ngày đăng: 15/10/2024

Link bài viết gốc | Link series tiếng Việt

Chào mừng bạn trở lại với phần thứ ba của series, nơi chúng ta đi sâu vào chiếc hộp đen diệu kì mang tên Apache Kafka® để hiểu rõ hơn cách chúng ta tương tác với cluster thông qua các Producer client và Consumer client.

Trong các phần trước của series, chúng ta đã quan sát Kafka Producer để thấy cách client hoạt động, sau đó theo dõi một Request từ Producer khi nó được cluster xử lý.

Trong bài viết này, chúng ta sẽ chuyển sự chú ý sang các Consumer client của Kafka để xem cách Consumer tương tác với các broker, điều phối các phân vùng và gửi yêu cầu để đọc dữ liệu từ các topic của bạn.

Nếu bạn đang tìm kiếm thêm thông tin về các phần còn lại của quy trình, hãy xem các bài viết khác trong series:

  1. Cách Producer Hoạt Động: Khám phá Producer đã làm gì phía sau hậu trường để chuẩn bị dữ liệu sự kiện thô (raw event data) để gửi đến Broker.
  2. Xử Lý Request Từ Producer: Tìm hiểu cách dữ liệu đi từ Producer Client đến ổ đĩa trên Broker.
  3. Chuẩn Bị Cho Yêu Cầu Truy Vấn Của Consumer: Xem cách Consumer chuẩn bị để gửi request lấy dữ liệu.
  4. Xử Lý Request Truy Vấn Của Consumer: Đi sâu vào hoạt động bên trong của Broker khi chúng cố gắng cung cấp dữ liệu cho Consumer.

Và dưới đây là bài thứ ba trong bốn phần, bàn về:
Chuẩn Bị Cho Yêu Cầu Truy Vấn Của Consumer: Xem cách Consumer chuẩn bị để gửi request lấy dữ liệu.

1. Chuẩn Bị Cho Yêu Cầu Truy Vấn Của Consumer

Không để lãng phí thời gian. Hãy bắt đầu thiết lập Consumer của chúng ta và bắt đầu lấy dữ liệu thôi nào! Nhưng mà ta sẽ lấy dữ liệu nào?

Để ôn lại, chúng ta đang đọc các dữ liệu ghi lại vị trí của các hobbit từ topic hobbit-updates với 6 phân vùng (partition). Sự kiện của chúng ta trông như sau:

{ "doc": "Accounting for the whereabouts and current activities of hobbits.", "fields": [ { "doc": "Name of the hobbit in question.", "name": "hobbit_name", "type": "string" }, { "doc": "Current location of the hobbit.", "name": "location", "type": "string" }, { "doc": "Current status of the hobbit.", "name": "status", "type": { "name": "Status", "type": "enum", "symbols": ["EATING", "NAPPING", "SMOKING", "ADVENTURING", "THIEVING"] } } ], "name": "hobbitUpdate", "type": "record"
}

Việc tiêu thụ một sự kiện từ topic này là một vấn đề đã được giải quyết bởi Kafka. Tất cả những gì bạn cần làm là thiết lập một Consumer client bằng ngôn ngữ lập trình yêu thích, gọi consumer.poll(), và bam… bạn đã có dữ liệu!

kafka-to-consumer-client.png

Nhưng không đơn giản như vậy đâu.

Tương tự như Kafka Producer, Kafka Consumer phải vượt qua nhiều bước để lấy dữ liệu từ Kafka cluster. Đó là một hành trình dài, đầy rẫy nguy hiểm và khó khăn.

consumer-client-request-flow.png

Nhưng để bắt đầu hành trình và bắt đầu lấy dữ liệu, trước tiên chúng ta cần thiết lập một Consumer và cấu hình nó để đọc từ topic hobbit-updates.

Bên Trong Consumer

consumer-client.png

1.1. Phân Vùng (Partitions)

Khi đã kết nối với các broker, nhiệm vụ đầu tiên của Consumer là xác định các topic và phân vùng (partition) mà nó sẽ tiêu thụ dữ liệu từ đó.

Nếu chỉ có một Consumer duy nhất, quá trình này rất đơn giản – Consumer sẽ được phân bổ tất cả các partition từ các topic mà nó đăng ký (subscribe). Tuy nhiên, nếu không phải vậy, có một vài cấu hình ảnh hưởng đến cách phân bổ các partition.

1.1.1. Cấu Hình Phân Vùng (Configuring partitions)

Tối thiểu, bạn cần chỉ định cho consumer biết các topic mà nó cần đăng ký (subscribe), nhưng không phải lúc nào consumer cũng đọc tất cả các partition từ topic đó, đặc biệt là khi nó trong một consumer group. Việc thuộc một consumer group có nghĩa là một tập hợp các consumer có thể hợp tác để xử lý song song dữ liệu từ các partition khác nhau trong một Kafka topic.

Chúng ta kích hoạt cơ chế consumer group bằng cách đặt tham số cấu hình group.id giống nhau trong các consumer. Khi group.id được thiết lập, consumer sẽ kết nối với consumer group coordinator – là một broker nằm trong cluster có nhiệm vụ điều phối toàn bộ các consumer group.

Lưu ý rằng nếu bạn gọi Consumer.subscribe(topic) hoặc muốn sử dụng cơ chế quản lý offset tích hợp sẵn của Kafka, bạn phải thiết lập group.id.

Nếu bạn sử dụng consumer group, một cấu hình quan trọng khác cần lưu ý là partition.assignment.strategy; đây chính là cách mà consumer group coordinator quyết định phân bổ các partition cho các consumer trong group. Có một vài kiểu phân bổ (assignor) mà bạn có thể sử dụng:

  • RangeAssignor: Assignor này sắp xếp các partition từ nhiều topic theo thứ tự từ 0 đến n và cố gắng gom các partition có cùng số thứ tự vào cùng một consumer. Trường hợp sử dụng tốt nhất cho assignor này là khi bạn có nhiều topic cùng sử dụng key partitioning strategy (xem lại bài đầu tiên của series) và bạn muốn consumer của mình kết hợp các luồng dữ liệu đó lại với nhau.
  • RoundRobinAssignor: Assignor này khá đơn giản. Nó sẽ duyệt qua tất cả các consumer trong consumer group và toàn bộ các topic-partition đã đăng ký (subscribe). Sau đó, nó phân bổ từng topic-partition luân phiên (round-robin) qua các consumer trong group.
  • StickyAssignor: Để hiểu assignor này, bạn cần hiểu chuyện gì xảy ra khi một consumer tham gia hoặc rời khỏi group. Về cơ bản, toàn bộ hệ thống sẽ "tạm dừng" (stop-the-world), partition đã được gán sẽ bị thu hồi, sau đó phân bổ lại và các consumer mới tiếp tục xử lý. Tuy nhiên, trong một số trường hợp, việc giữ nguyên phân bổ hiện tại cho các consumer không bị ảnh hưởng sẽ tối ưu hơn. StickyAssignor sẽ chỉ chuyển một partition sang một consumer khác nếu consumer được thêm vào hoặc bị loại bỏ khỏi group.
  • CooperativeStickyAssignor: Tương tự StickyAssignor, assignor này sử dụng cơ chế tái cân bằng hợp tác (cooperative rebalancing) thay vì tái cân bằng tức thì (eager rebalancing). Điểm mấu chốt là cooperative rebalancing (hay còn gọi là incremental rebalance) giúp tránh được tình trạng "stop-the-world" – khi không có consumer nào xử lý dữ liệu. Nó làm việc này bằng cách thu hồi từng phần và gán dần các partition bị ảnh hưởng, để những consumer không bị ảnh hưởng vẫn tiếp tục xử lý partition của mình.

1.1.2. Giám Sát Phân Vùng (Monitoring partitions)

Bây giờ, nếu chúng ta muốn theo dõi việc phân vùng và đảm bảo consumer group của mình hoạt động đúng như mong đợi? Đừng lo, Kafka cung cấp sẵn các số liệu (metrics) hữu ích cho việc này!

Dưới đây là các metrics ở cấp độ consumer nhưng chỉ khả dụng khi bạn bật group.id:

  • assigned-partitions: Giám sát số lượng partition được gán cho mỗi client.
  • rebalance-latency-avg: Trong quá trình rebalance, metric này hiển thị thời gian trung bình để group hoàn tất một lần rebalance.
  • rebalance-total: Tổng số lần rebalance đã diễn ra.

Tất nhiên, bạn sẽ mong muốn hai chỉ số cuối càng thấp càng tốt, cho thấy group đang hoạt động ổn định. Bên cạnh đó còn rất nhiều metrics hữu ích khác cho consumer group; Hãy khám phá thêm tại đây!

1.1.3. Phân Bổ Phân Vùng (The partition assignment)

Như bạn còn nhớ, topic hobbit-updates6 partition. Chúng ta có thể hình dung rằng có một số consumer cùng nhau xử lý và phân tích thông tin.

Hãy cùng xem cách gán partition trông như thế nào.

consumer-group

Ví dụ nếu chúng ta sử dụng round-robin partitioner hoặc sticky partitioner, các partition của topic hobbit-updates có thể được phân bổ cho các consumer đang chạy như trên. Giả sử chúng ta là Consumer 0, do đó ta sẽ phải chịu trách nhiệm đọc dữ liệu từ partition 0 và 3.

1.2. Offset

Mặc dù chúng ta đã biết các topic-partition mà consumer sẽ đọc, nhưng vẫn chưa thể bắt đầu đọc – vì ta cần xác định offset bắt đầu cho mỗi partition.

Offset giống như bookmark của các topic trong Kafka. Chúng xác định vị trí của một message trong topic-partition, giúp consumer biết mình đã đọc đến đâu. Vậy làm sao consumer vừa nhận được partition lại biết offset của chúng là bao nhiêu?

1.2.1. Truy Xuất Trước Khi Truy Xuất (The fetch before the fetch)

Đơn giản thôi! Consumer gửi một fetch request để lấy các offset liên quan từ topic nội bộ Kafka: __consumer_offsets, nơi các offset được lưu trữ trên Kafka cluster.

Nhưng nếu không có bất kỳ offset nào trong topic đó thì sao? Hoặc nếu offset không hợp lệ? Đó là lúc tham số cấu hình auto.offset.reset phát huy tác dụng. Chỉ khi có một offset không hợp lệ – không hợp lệ cũng có thể là offset không tồn tại - auto.offset.reset sẽ cho consumer biết nên bắt đầu đọc từ đâu trong Kafka topic. Các tùy chọn có thể là:

  • earliest: đọc lại từ offset đầu tiên
  • latest: đọc từ offset cuối cùng
  • none: consumer báo lỗi và dừng hoạt động, hoặc bạn tự xử lý exception và tìm một offset cụ thể

Về các số liệu (metrics) liên quan đến offset, khi consumer của bạn đang hoạt động, bạn có thể muốn theo dõi độ trễ của consumer (consumer lag) thông qua consumer-lag-offsets. Độ trễ (Lag) nghĩa là chênh lệch giữa offset cuối trên broker và offset cuối mà consumer group đã commit trên mỗi partition.

Note từ dịch giả:

  • Offset cuối trên broker: Vị trí của message mới nhất đã được ghi vào partition.
  • Offset cuối mà consumer group đã commit: Vị trí của message cuối cùng mà consumer group đã xử lý thành công và báo cáo lại cho Kafka.

Một độ trễ lớn cho thấy consumer đang bị tụt lại phía sau và không xử lý dữ liệu kịp thời, điều này có thể ảnh hưởng đến hiệu suất của ứng dụng.

1.3. Gửi Yêu Cầu Truy Xuất Dữ Liệu (Sending a fetch request)

Giờ chúng ta đã sẵn sàng để lấy dữ liệu từ topic hobbit-updates bằng cách gửi request đến broker. Quá trình này diễn ra thế nào?

Như đã đề cập, Consumer duy trì các kết nối socket với một số Kafka broker, nơi chứa các topic-partition mà consumer đang sử dụng và cả broker có vai trò là consumer group coordinator. Các consumer gửi fetch request bằng giao thức nhị phân qua TCP.

Đây là mô hình request-response. Consumer gửi request để fetch dữ liệu, và broker trả về response với kết quả của request đó – thường là kèm theo dữ liệu để xử lý. Chính quá trình request này là bước khởi động thật sự cho hành trình của chúng ta!

1.3.1. Cấu Hình Yêu Cầu Của Consumer (Configuring consumer requests)

Trước khi đi tiếp, có một số cấu hình cần lưu ý:

  • fetch.min.bytes, fetch.max.bytes, và max.partition.fetch.bytes: kiểm soát tổng quát lượng dữ liệu sẽ được trả về bởi mỗi fetch request từ consumer, nhưng lưu ý rằng giới hạn fetch.max.bytes không phải là giới hạn cứng – sẽ làm rõ sau. max.partition.fetch.bytes đặc biệt quan trọng khi một fetch request có thể trả về dữ liệu từ nhiều partition. Trong ba cấu hình trên, fetch.min.bytes lại là quan trọng nhất, vì nó ảnh hưởng đến thời gian broker cần để hoàn thành request.
  • fetch.max.wait.ms: Đây là thời gian server sẽ chặn và chờ để đạt đến fetch.min.bytes trước khi tiếp tục. Thời gian chờ này nên nhỏ hơn thời gian chờ toàn bộ request, được xác định bởi request.timeout.ms.
  • request.timeout.ms: Giới hạn thời gian tối đa cho request.

1.3.2. Giám Sát Yêu Cầu Của Consumer (Monitoring consumer requests)

Trước khi chuyển sang vòng đời của request, đáng chú ý là có một số số liệu liên quan đến cách request được xử lý bởi Consumer.

  • request-rate: Số lượng request được thực hiện mỗi giây bởi Consumer.
  • fetch-latency-avg: Khi fetch request được gửi đến broker, một bộ đếm thời gian bắt đầu. Nó chỉ dừng lại cho đến khi consumer nhận được response. Số liệu này mô tả thời gian thực hiện quá trình đó.
  • fetch-size-avg: Mô tả lượng dữ liệu trung bình được trả về trong mỗi lần fetch.

1.3.3. Yêu Cầu (The request)

Khi đã phân vùng (partition assignment) và xác định xong offset, mỗi yêu cầu truy xuất (fetch request) từ Consumer 0 của chúng ta sẽ trông như thế này:

fetch-request-consumer

Mỗi request đều có một số metadata đi kèm để broker biết các giới hạn mà request này cần tuân theo, bao gồm ngưỡng lấy dữ liệu (fetch thresholds)thời gian chờ tối đa (timeout) mà Consumer đã thiết lập.

Lưu ý rằng mỗi request chỉ được gửi đến một node hoặc broker duy nhất trong cụm (cluster). Trong ví dụ này, giả định rằng các partition 0 và 3 của topic hobbit-updates nằm trên cùng một broker (cùng với partition 1 của topic elf-updates). Nếu các topic-partition này nằm trên nhiều broker khác nhau, thì consumer sẽ gửi nhiều fetch request trong mỗi lần gọi poll (consumer.poll()).

1.4. Và Chúng Ta Bắt Đầu!

Bây giờ, chúng ta sẵn sàng bắt đầu hành trình mà bạn mong đợi. Hãy gửi request này đi và để các broker xử lý nó.

Hãy xem phần cuối cùng của loạt bài để biết những cuộc phiêu lưu nào đang chờ đợi fetch request của Consumer!

2. Tóm tắttt

Trong bài viết thứ ba của series, chúng ta chuyển trọng tâm từ producer sang consumer. Sau khi tìm hiểu cách producer chuẩn bị và gửi dữ liệu, chúng ta sẽ khám phá cách Kafka consumer:

  1. Kết nối với các broker và tham gia consumer group.
  2. Xác định các topic-partition cần đọc (trong ví dụ này là topic hobbit-updates với 6 partition).
  3. Lấy các sự kiện theo định nghĩa schema—mỗi sự kiện bao gồm tên, vị trí và trạng thái của một hobbit—bằng cách gọi consumer.poll().

Mặc dù kéo message về cho consumer trông có vẻ đơn giản (chỉ cần gọi poll()), nhưng thực tế phía sau, consumer phải làm rất nhiều việc trước khi thực sự nhận được message. Bài viết này sẽ hướng dẫn chi tiết các bước đó trước khi đi sâu vào vòng đời fetch request của consumer.

Bên trong Consumer

2.1. Partitions

  • Sau khi kết nối với các broker, consumer phải xác định ra topic và partition nào để đọc.
  • Một consumer đơn lẻ sẽ đọc tất cả partition của các topic đã đăng ký.
  • Trong consumer group, các partition được chia cho nhiều consumer để xử lý song song.

2.1.1. Configuring partitions

  • Đăng ký (Subscription)

    • Sử dụng Consumer.subscribe(topics) để chỉ định topic cần đọc.
    • Phải thiết lập group.id để kích hoạt subscription theo nhóm và quản lý offset tích hợp của Kafka.
  • Consumer Group

    • Các consumer cùng group.id sẽ kết nối với consumer group coordinator (một broker)
    • Coordinator chịu trách nhiệm phân bổ partition và commit offset.
  • Chiến lược Gán Phân vùng (partition.assignment.strategy)

    • RangeAssignor: Sắp xếp phân vùng từ 0→n và phân bổ các partition cùng số thứ tự cho cùng một consumer.
    • RoundRobinAssignor: Phân phối partition lần lượt theo vòng tròn.
    • StickyAssignor: Trong quá trình rebalance, chỉ di chuyển partition của consumer tham gia/rời consumer group.
    • CooperativeStickyAssignor: Sử dụng rebalance từng phần để tránh toàn bộ hệ thống bị dừng ("stop-the-world"), chỉ thu hồi và phân bổ lại các partition bị ảnh hưởng.

2.1.2. Monitoring partitions

  • Chỉ có khi bật group.id.
  • Các Metric Chính:
    • assigned-partitions: Số partition gán cho mỗi consumer.
    • rebalance-latency-avg: Thời gian trung bình để hoàn thành một lần rebalance.
    • rebalance-total: Tổng số lần rebalance đã diễn ra.
  • Mục tiêu là giữ chỉ số latency và số lần rebalance thấp để đảm bảo ổn định.

2.2. Offset

  • Mục đích: Xác định offset để bắt đầu đọc trong mỗi topic-partition (giống "bookmark").
  • Lấy ra offsets: Gửi fetch request đến topic nội bộ Kafka __consumer_offsets.
  • auto.offset.reset: Khi không có offset hoặc offset không hợp lệ, chọn earliest (từ đầu), latest (từ cuối) hoặc none (ném lỗi).
  • Giám sát: Dùng metric consumer-lag-offsets – chênh lệch giữa offset trên broker và offset được commit bởi comsumer group.

2.3. Sending a fetch request

  • Kết nối: Consumer giữ kết nối TCP với các broker và coordinator.
  • Luồng: Consumer gửi request → Broker trả về response với dữ liệu.

2.3.1. Configuring consumer requests

  • fetch.min.bytes / fetch.max.bytes: Giới hạn kích thước dữ liệu trả về (ưu tiên dùng min để điều chỉnh độ trễ).
  • max.partition.fetch.bytes: Giới hạn kích thước dữ liệu tối đa trả về từ mỗi partition
  • fetch.max.wait.ms: Thời gian chờ để đạt fetch.min.bytes trước khi phản hồi.
  • request.timeout.ms: Thời gian tối đa cho toàn bộ request.

2.3.2. Monitoring consumer requests

  • request-rate: Số request mỗi giây.
  • fetch-latency-avg: Thời gian từ khi gửi request đến khi nhận response.
  • fetch-size-avg: Kích thước trung bình dữ liệu trả về mỗi lần fetch.

2.3.3. The request

  • Mỗi request gửi kèm metadata (threshold, timeout) và được gửi tới một broker duy nhất.
  • Nếu các partition bạn cần nằm trên nhiều broker, consumer sẽ gửi nhiều request song song trong mỗi lần gọi poll().

Lời Kết

Mình xin gửi lời cảm ơn chân thành đến Danica Fine và đội ngũ Confluent đã mang đến loạt bài sâu sắc này - nhờ đó mà mình có cơ hội dịch thuật và chia sẻ kiến thức quý giá với mọi người.

Cảm ơn bạn đọc đã dành thời gian theo dõi blog, hy vọng phần dịch và ghi chú của mình sẽ giúp các bạn nắm bắt rõ hơn về cách Kafka vận hành “hộp đen” và tự tin hơn khi triển khai, vận hành hệ thống cũng như gỡ lỗi.

Nếu bạn có góp ý, câu hỏi hay kinh nghiệm thực tế muốn chia sẻ, đừng ngần ngại để lại bình luận hoặc liên hệ với mình. Mình rất mong được thảo luận và học hỏi thêm từ cộng đồng!

Chúc các bạn học tập và làm việc hiệu quả với Apache Kafka!

Link bài viết gốc | Link series tiếng Việt

Bình luận

Bài viết tương tự

- vừa được xem lúc

Kafka là gì?

Apache Kafka® là một nền tảng stream dữ liệu phân tán. . stream data: dòng dữ liệu, hãy tưởng tượng dữ liệu là nước trong 1 con suối. .

0 0 53

- vừa được xem lúc

001: Message-driven programming với Message broker và Apache Kafka

Bài viết nằm trong series Apache Kafka từ zero đến one. . . Asynchronous programming.

0 0 178

- vừa được xem lúc

002: Apache Kafka topic, partition, offset và broker

Bài viết nằm trong series Apache Kafka từ zero đến one. Nói qua về lịch sử, Kafka được phát triển bởi LinkedIn (các anh em dev chắc chẳng xa lạ gì) và viết bằng ngôn ngữ JVM, cụ thể là Java và Scala.

0 0 160

- vừa được xem lúc

003: Gửi và nhận message trong Apache Kafka

Bài viết nằm trong series Apache Kafka từ zero đến one. . . Nếu muốn các message được lưu trên cùng một partition để đảm bảo thứ tự thì làm cách nào.

0 0 236

- vừa được xem lúc

004: Apache Kafka consumer offset, Broker discovery và Zookeeper

Bài viết nằm trong series Apache Kafka từ zero đến one. 1) Consumer offset.

0 0 134

- vừa được xem lúc

Apache Kafka - Producer - Gửi message đến Kafka bằng kafka-python

Overview. Understand how to produce message and send to the Kafka topic. Architecture. .

0 0 77