Giới thiệu
Trong những năm gần đây, với sự bùng nổ của AI thì domain Big data, vốn đã rất phổ biến, cũng theo đó được quan tâm ngày càng nhiều hơn. Và trong những luồng Data streaming, ETL, v.v. thì có một thành phần trung chuyển không thể thiếu là Message queue. Message queue thì hiện có nhiều loại phổ biến và được dùng nhiều, tuy vậy, khi đề cập tới Big data, Apache Kafka được tin tưởng và sử dụng bởi rất nhiều đơn vị, bao gồm cả các corp lớn như Netflix, Tesla, Meta, etc. Chi tiết hơn về Apache Kafka các bạn có thể tìm hiểu thêm trên mạng, vì đã có rất nhiều bài viết phân tích về con Kafka này rồi 😄, trong bài này mình sẽ chỉ đề cập tới việc cài đặt nó với multi broker trên Docker.
Cài đặt
Các thành phần
Trước hết thì các bạn cần cài đặt Docker, các bước rất chi tiết và đơn giản đã được ghi rõ trên document của Docker, có thể xem tại đây. Trong bài viết này, mình sẽ cài đặt Kafka bao gồm 1 Zookeeper, 3 broker và 1 service Kafdrop cho mục đích visualization.
Docker compose
Bật Docker lên, tạo 1 file docker-compose.yml tại 1 directory bất kỳ và bắt đầu thôi.
version: "3" services: zookeeper: image: zookeeper:3.4.9 hostname: zookeeper ports: - "2181:2181" environment: ZOO_MY_ID: 1 ZOO_PORT: 2181 ZOO_SERVERS: server.1=zookeeper:2888:3888 volumes: - ./data/zookeeper/data:/data - ./data/zookeeper/datalog:/datalog kafka1: image: confluentinc/cp-kafka:5.3.0 hostname: kafka1 container_name: kafka-broker-1 ports: - "9091:9091" environment: KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_BROKER_ID: 1 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://kafka1:9091 KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL volumes: - ./data/kafka1/data:/var/lib/kafka/data depends_on: - zookeeper kafka2: image: confluentinc/cp-kafka:5.3.0 hostname: kafka2 container_name: kafka-broker-2 ports: - "9092:9092" environment: KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_BROKER_ID: 2 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:29092,LISTENER_DOCKER_EXTERNAL://kafka2:9092 KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:29092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL volumes: - ./data/kafka2/data:/var/lib/kafka/data depends_on: - zookeeper kafka3: image: confluentinc/cp-kafka:5.3.0 hostname: kafka3 container_name: kafka-broker-3 ports: - "9093:9093" environment: KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_BROKER_ID: 3 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:29093,LISTENER_DOCKER_EXTERNAL://kafka3:9093 KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:29093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL volumes: - ./data/kafka3/data:/var/lib/kafka/data depends_on: - zookeeper kafdrop: image: obsidiandynamics/kafdrop:latest depends_on: - kafka1 - kafka2 - kafka3 ports: - "9123:9123" environment: SERVER_PORT: 9123 MANAGEMENT_SERVER_PORT: 9123 KAFKA_BROKERCONNECT: kafka-broker-1:19091,kafka-broker-2:29092,kafka-broker-3:29093
Zookeeper
Service đầu tiên mình cài đặt sẽ là Zookeeper, tiên quyết để các broker chạy vì thằng này sẽ có vai trò điều phối các broker. Hiện tại đã có cách cài đặt Kafka không cần Zookeeper, nhưng mình sẽ chưa đề cập đến trong bài này 😁. \
- Trong service Zookeeper, field
image
để xác định image được dùng,hostname
để đặt hostname cho container. Hostname được đặt cho container nằm xác định được container đó trong 1 network, giúp các container có thể giao tiếp với nhau. - Tại
port
, ta có thể thấy có 2 port được gắn vào. Port đầu tiên được gọi làHOST_PORT
, và port còn lại làCONTAINER_PORT
. Port này khá gây lú cho ae mới bắt đầu, có thể hiểu đơn giản là các container trong 1 network sẽ gọi nhau thông qua CONTAINER_PORT, còn các client bên ngoài gọi tới service sẽ thông qua CONTAINER_PORT environment
để xác định các cấu hìnhvolumes
để mount directory từ máy tới container, giúp không bị mất data khi stop/restart container. Volumes sẽ được viết theo định dạng[SOURCE]:[TARGET]:[MODE]
, trong đó source là directory tại máy, và target là directory trong container, mode mình thường để luôn là mặc định là read-write.
Kafka broker
Các field như port, image, v.v. trong service broker tương tự như với Zookeeper nên mình sẽ không viết thêm ở đây 😉. Ngoài ra, depends_on
để xác định thời điểm chạy container sau khi container mà nó depend đã được chạy. Trong này mình sẽ mô tả kỹ hơn các biến trong environment
\
KAFKA_ZOOKEEPER_CONNECT
: xác định ZookeeperKAFKA_BROKER_ID
: id của broker, các broker cần được đảm bảo có unique idKAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
: replication factor của các topic, lưu ý value cho field này cần <= số brokerKAFKA_LISTENERS
: xác định physical network interface mà Kafka listenKAFKA_ADVERTISED_LISTENERS
: xác định phương thức mà client có thể gọi tới, là metadata sẽ được gửi lại cho client.KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
: các cặp key-value để định nghĩa các phương thức cho các listener khác nhauKAFKA_INTER_BROKER_LISTENER_NAME
: xác định listener dành cho các lời gọi internal
Trong cài đặt, ví dụ với broker 1, client ở trong Docker network sẽ connect với listener LISTENER_DOCKER_INTERNAL
với port 19091 và host kafka1. Ngược lại, client phía ngoài sẽ connect tới broker 1 thông qua listener LISTENER_DOCKER_EXTERNAL
với port 9091 và host là localhost.
Kafdrop
Kafdrop là một service được dùng cho mục đích visualization, cung cấp UI để người dùng dễ dàng thao tác với Kafka cluster. Cấu hình của Kafdrop khá đơn giản, chỉ cần xác định các broker trong biến KAFKA_BROKERCONNECT
. Ngoài ra field SERVER_PORT
và MANAGEMENT_SERVER_PORT
để mình chỉnh port cho Kafdrop, vì port default của nó là 9000 nhưng mà mình đang chạy 1 service khác ở 9000 mất rồi 😄.
Trong con Kafdrop này thì ae có thể view các message trong các partition của các broker, check các topic và các thông số liên quan như replication factor, leader, tạo topic, etc. Nói chung là dùng khá tiện và nên dùng 🤌.
Test
Sau khi đã cài đặt và mở Docker desktop lên check các container chạy ngon nghẻ, ae viết vài dòng code để check xem cụm Kafka của mình có hoạt động đúng không. Ở đây mình dùng Python vì nó nhanh và nó tiện, còn các bạn có thể dùng Java, nhưng nó sẽ dài và viết lâu hơn. Để gửi message vào Kafka viết bằng Python, ae cần install 1 lib là kafka-python, sau đó vào vụt luôn thôi.
Ở đây mình tạo 1 file kafka_prod.py
có nội dung như sau:
from kafka import KafkaProducer
import json
import datetime producer = KafkaProducer(bootstrap_servers=['localhost:9091', 'localhost:9092', 'localhost:9093'], value_serializer=lambda x: json.dumps(x).encode('utf-8'))
topic = 'tuan' msg = { "current_time": str(datetime.datetime.now())
} producer.send(topic, value=msg)
producer.flush()
# print(msg)
Chạy file này bằng cmd python kafka_prod.py
, và khi thấy nó print ra terminal dict như đã viết là OK, sau đó vào Kafdrop check thử, thấy message vừa gửi là đã đẩy message vào thành công rồi 👌.
Tham khảo
https://www.confluent.io/blog/kafka-listeners-explained/
https://medium.com/@fintechdevlondon/kafka-listeners-and-advertised-listeners-what-are-they-and-what-do-they-do-9b004e2eb93d
https://demanejar.github.io/posts/Kafka-In-Depth/