Nếu các bạn chưa biết Debezium là gì thì có thể theo dõi bài viết này trước: https://viblo.asia/p/gioi-thieu-ve-debezium-y3RL1wMp4ao
Công cụ cần thiết:
- Docker
- Docker compose
Các bước cài đặt
1. Tạo file docker-compose.yml
📌Tạo một thư mục, vào thư mục đó và tạo file docker-compose.yml:
version: '3.8'
services: zookeeper: image: confluentinc/cp-zookeeper:latest container_name: zookeeper environment: ZOOKEEPER_CLIENT_PORT: 2181 ports: - "2181:2181" kafka: image: confluentinc/cp-kafka:latest container_name: kafka depends_on: - zookeeper environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 ports: - "9092:9092" postgres: image: debezium/postgres:16-alpine container_name: postgres environment: POSTGRES_USER: debezium POSTGRES_PASSWORD: debezium POSTGRES_DB: testdb ports: - "5432:5432" debezium-connect: image: debezium/connect:2.5 # Sửa lại từ "latest" thành phiên bản cụ thể container_name: debezium-connect depends_on: - kafka - postgres environment: BOOTSTRAP_SERVERS: kafka:9092 GROUP_ID: "1" CONFIG_STORAGE_TOPIC: debezium_connect_configs OFFSET_STORAGE_TOPIC: debezium_connect_offsets STATUS_STORAGE_TOPIC: debezium_connect_status ports: - "8083:8083" kafka-ui: image: provectuslabs/kafka-ui:latest container_name: kafka-ui depends_on: - kafka environment: KAFKA_CLUSTERS_0_NAME: local KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 ports: - "8080:8080"
📌Lưu file này lại và chạy lệnh sau để khởi động Docker containers:
docker-compose up -d
2. Tạo table muốn CDC trong PostgreSQL
📌 Sau khi khởi động thành công, chúng ta sẽ vào PostgreSQL để tạo table . Ví dụ
CREATE TABLE orders ( id SERIAL PRIMARY KEY, product_name TEXT NOT NULL, status TEXT NOT NULL
);
📌 Kích hoạt Logical Replication cho PostgreSQL
Debezium sử dụng Logical Replication để theo dõi các thay đổi trong database. Do đó, chúng ta cần:
Bật chế độ logical replication trong PostgreSQL
ALTER SYSTEM SET wal_level = logical; #Cho phép PostgreSQL ghi lại dữ liệu thay đổi (change stream) dưới dạng logical, thay vì physical.
SELECT pg_reload_conf(); #Load lại cấu hình mà không cần restart database.
📌 Tạo publication cho các bảng cần theo dõi:
Tạo publication để chỉ định bảng nào cần theo dõi
CREATE PUBLICATION debezium_pub FOR TABLE orders; #Chỉ định rằng chỉ cần theo dõi bảng orders
3. Cấu hình Debezium để theo dõi PostgreSQL
Mở Gitbash lên và chạy curl :
curl -X POST "http://localhost:8083/connectors" \ -H "Content-Type: application/json" \ -d '{ "name": "postgres-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "postgres", "database.port": "5432", "database.user": "debezium", "database.password": "debezium", "database.dbname": "testdb", "database.server.name": "postgres_server", "table.include.list": "public.orders", "plugin.name": "pgoutput", "slot.name": "debezium_slot", "publication.name": "debezium_pub", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "schema-changes", "topic.prefix": "dbserver1" #tạo topic với prefix là dbserver1 } }'
4. Thử nghiệm thêm dữ liệu
📌 Mở terminal và truy cập PostgreSQL:
docker exec -it postgres psql -U debezium -d testdb
insert vào bảng order
INSERT INTO orders (product_name, status) VALUES ('Laptop', 'PENDING');
5. Kiểm tra dữ liệu trên Kafka
📌 Mở Kafka UI tại 👉 http://localhost:8080
📌 Chọn topic postgresserver.public.orders để xem dữ liệu.
Đây là topic mà debezium publish message lên :
Và đây là thành quả :
Các bạn cũng có thể custom lại được message này trước khi publish message lên topic.
Tớ sẽ hướng dẫn mọi người cách custom message ở bài sau.
Nếu các bạn có bất kỳ thắc mắc nào hãy comment ở bên dưới mình sẽ giải đáp. 😁😁😁😁