- vừa được xem lúc

Hướng dẫn chi tiết cách cài đặt và chạy Debezium với Kafka, PostgreSQL trên Docker

0 0 1

Người đăng: Hanh Kiều Văn

Theo Viblo Asia

Nếu các bạn chưa biết Debezium là gì thì có thể theo dõi bài viết này trước: https://viblo.asia/p/gioi-thieu-ve-debezium-y3RL1wMp4ao

Công cụ cần thiết:

  • Docker
  • Docker compose

Các bước cài đặt

1. Tạo file docker-compose.yml

📌Tạo một thư mục, vào thư mục đó và tạo file docker-compose.yml:

version: '3.8'
services: zookeeper: image: confluentinc/cp-zookeeper:latest container_name: zookeeper environment: ZOOKEEPER_CLIENT_PORT: 2181 ports: - "2181:2181" kafka: image: confluentinc/cp-kafka:latest container_name: kafka depends_on: - zookeeper environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 ports: - "9092:9092" postgres: image: debezium/postgres:16-alpine container_name: postgres environment: POSTGRES_USER: debezium POSTGRES_PASSWORD: debezium POSTGRES_DB: testdb ports: - "5432:5432" debezium-connect: image: debezium/connect:2.5 # Sửa lại từ "latest" thành phiên bản cụ thể container_name: debezium-connect depends_on: - kafka - postgres environment: BOOTSTRAP_SERVERS: kafka:9092 GROUP_ID: "1" CONFIG_STORAGE_TOPIC: debezium_connect_configs OFFSET_STORAGE_TOPIC: debezium_connect_offsets STATUS_STORAGE_TOPIC: debezium_connect_status ports: - "8083:8083" kafka-ui: image: provectuslabs/kafka-ui:latest container_name: kafka-ui depends_on: - kafka environment: KAFKA_CLUSTERS_0_NAME: local KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 ports: - "8080:8080" 

📌Lưu file này lại và chạy lệnh sau để khởi động Docker containers:

docker-compose up -d

2. Tạo table muốn CDC trong PostgreSQL

📌 Sau khi khởi động thành công, chúng ta sẽ vào PostgreSQL để tạo table . Ví dụ

CREATE TABLE orders ( id SERIAL PRIMARY KEY, product_name TEXT NOT NULL, status TEXT NOT NULL
);

📌 Kích hoạt Logical Replication cho PostgreSQL
Debezium sử dụng Logical Replication để theo dõi các thay đổi trong database. Do đó, chúng ta cần:
Bật chế độ logical replication trong PostgreSQL

ALTER SYSTEM SET wal_level = logical; #Cho phép PostgreSQL ghi lại dữ liệu thay đổi (change stream) dưới dạng logical, thay vì physical.
SELECT pg_reload_conf(); #Load lại cấu hình mà không cần restart database.

📌 Tạo publication cho các bảng cần theo dõi:
Tạo publication để chỉ định bảng nào cần theo dõi

CREATE PUBLICATION debezium_pub FOR TABLE orders; #Chỉ định rằng chỉ cần theo dõi bảng orders

3. Cấu hình Debezium để theo dõi PostgreSQL

Mở Gitbash lên và chạy curl :

curl -X POST "http://localhost:8083/connectors" \ -H "Content-Type: application/json" \ -d '{ "name": "postgres-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "postgres", "database.port": "5432", "database.user": "debezium", "database.password": "debezium", "database.dbname": "testdb", "database.server.name": "postgres_server", "table.include.list": "public.orders", "plugin.name": "pgoutput", "slot.name": "debezium_slot", "publication.name": "debezium_pub", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "schema-changes", "topic.prefix": "dbserver1" #tạo topic với prefix là dbserver1 } }' 

4. Thử nghiệm thêm dữ liệu

📌 Mở terminal và truy cập PostgreSQL:

docker exec -it postgres psql -U debezium -d testdb

insert vào bảng order

INSERT INTO orders (product_name, status) VALUES ('Laptop', 'PENDING');

5. Kiểm tra dữ liệu trên Kafka

📌 Mở Kafka UI tại 👉 http://localhost:8080

📌 Chọn topic postgresserver.public.orders để xem dữ liệu. Đây là topic mà debezium publish message lên : image.png

Và đây là thành quả : image.png

Các bạn cũng có thể custom lại được message này trước khi publish message lên topic.


Tớ sẽ hướng dẫn mọi người cách custom message ở bài sau.
Nếu các bạn có bất kỳ thắc mắc nào hãy comment ở bên dưới mình sẽ giải đáp. 😁😁😁😁

Bình luận

Bài viết tương tự

- vừa được xem lúc

Giới thiệu về Change Data Capture

Hiện tại, có nhiều cách để thực hiện việc lưu lại change data khi thực hiện Insert, Update, Delete như : triggers , so sánh bảng nguồn và bảng đích, cdc, change tracking, Row version, time stamps,… Mỗ

0 0 57

- vừa được xem lúc

Data Change Capture (CDC) với Debezium

Mở đầu. Làm việc với hệ thống database từ trước đến nay vẫn luôn là công việc khó khăn và bạc đầu, dạo gần đây mình có cơ hội làm việc với một giải pháp mới cho database sử dụng để phát hiện ra sự tha

0 0 62

Giới thiệu về Debezium

1. Giới thiệu.

0 0 0

- vừa được xem lúc

001: Message-driven programming với Message broker và Apache Kafka

Bài viết nằm trong series Apache Kafka từ zero đến one. . . Asynchronous programming.

0 0 165

- vừa được xem lúc

002: Apache Kafka topic, partition, offset và broker

Bài viết nằm trong series Apache Kafka từ zero đến one. Nói qua về lịch sử, Kafka được phát triển bởi LinkedIn (các anh em dev chắc chẳng xa lạ gì) và viết bằng ngôn ngữ JVM, cụ thể là Java và Scala.

0 0 155

- vừa được xem lúc

003: Gửi và nhận message trong Apache Kafka

Bài viết nằm trong series Apache Kafka từ zero đến one. . . Nếu muốn các message được lưu trên cùng một partition để đảm bảo thứ tự thì làm cách nào.

0 0 224