Giới thiệu
Tiếp nối series Debezium cơ bản hôm nay cùng mình tìm hiểu về Debezium + PostgreSQL + Kafka Connect nhé.
Debezium + Kafka Connect
như trong phần 1 + 2 mình đã giới thiệu qua về Debezium, khi kết hợp cùng kafka nó đóng vai trò như một connector để gửi records vào Kafka và các sink connectors để truyền records từ các topic Kafka đến các hệ thống khác. Kafka Connect đóng vai trò phát hiện ra các thay đổi và push event vào trong Kafka.
Setup Debezium + Kafka Connect
Trong bài viết này, mình sẽ dùng Postgresql để làm demo, nên cài đặt Postgresql là tất nhiên rồi. Ngoài ra chúng ta sẽ cần cài thêm những cái như sau
- Debezium UI: giao diện web giúp bạn cấu hình và quản lý các connector(thằng này sẽ không có authen đâu nha, nếu muốn thì bạn setup thêm 1 em NGINX ở ngoài)
- Debezium Connect: Nó giúp ae theo dõi và bắt được những thay đổi diễn ra ở phía database. Debezium Connect sẽ đọc transaction log từ source database và phát hiện những thay đổi và gửi đến những streaming service như Kafka, AWS Kinesis...
- Kafka: một nền tảng streaming phân tán, chủ yếu được áp dụng làm hệ thống phân tán, “vận chuyển” message và thu thập, xử lý, lưu trữ và phân tích dữ liệu.
- Zookeeper: một dịch vụ phối hợp nguồn mở, phân tán dành cho các ứng dụng phân tán, tất nhiên nó thường đi kèm với kafka.
Để nhanh nhất có thể nên mình sẽ cung cấp docker file để ae có thể đem về làm demo cho các síp nhanh nhất có thể luôn nha. (mình dùng bản 1.7.2, bản này cũng khá cũ rồi, ae nhớ nâng cấp lên nha, tui lấy luôn docker file từ hồi demo cho síp nên nó hơi cũ)
version: '3'
services: zookeeper: container_name: zookeeper image: quay.io/debezium/zookeeper:1.7.2.Final networks: - ui-network ports: - "2181:2181" kafka: container_name: kafka image: quay.io/debezium/kafka:1.7.2.Final ports: - "9092:9092" depends_on: - zookeeper environment: - ZOOKEEPER_CONNECT=zookeeper:2181 networks: - ui-network db-pg: container_name: db-pg image: quay.io/debezium/example-postgres:1.7.2.Final ports: - "65432:5432" environment: - POSTGRES_USER=postgres - POSTGRES_PASSWORD=postgres networks: - ui-network connect: container_name: connect image: quay.io/debezium/connect:nightly ports: - "8083:8083" depends_on: - kafka - db-pg environment: - BOOTSTRAP_SERVERS=kafka:9092 - GROUP_ID=1 - CONFIG_STORAGE_TOPIC=my_connect_configs - OFFSET_STORAGE_TOPIC=my_connect_offsets - STATUS_STORAGE_TOPIC=my_connect_statuses - ENABLE_DEBEZIUM_KC_REST_EXTENSION=true - ENABLE_DEBEZIUM_SCRIPTING=true - CONNECT_REST_EXTENSION_CLASSES=io.debezium.kcrestextension.DebeziumConnectRestExtension networks: - ui-network debezium-ui: container_name: debezium-ui image: quay.io/debezium/debezium-ui:1.7.2.Final ports: - "8080:8080" environment: - KAFKA_CONNECT_URIS=http://connect:8083 depends_on: - connect networks: - ui-network
networks: ui-network: external: false
Và ae sẽ lên trình duyệt gõ localhost:8080 sẽ nhận được 1 trong 2 hình cái gì đó như này.
Nếu như bị giống hình 1 thì đừng lo lắng quá, vô restart con connect lại là được nha (vì khi start có thể nó start trước con kafka nên bị như vậy thôi)
Tiếp theo ae có thể nhấn vô "Create a connector" vào mò mẫn trong đó, vô vàn thứ hay ho chờ ae trong đó nha.
Mình sẽ cung cấp cho ae 1 connector luôn để tiết kiệm thời gian nha.
POST http://localhost:8080/api/connector/1/postgres
{ "name": "test.31", "config": { "topic.prefix": "prefix", "database.hostname": "db-pg", "database.port":"5432", "database.user": "postgres", "database.password": "postgres", "database.dbname": "postgres", "schema.include.list": "inventory", "table.include.list": "inventory.orders ", "heartbeat.interval.ms": 30000, "heartbeat.action.query": "select * from inventory.orders where id = 10001", "query.fetch.size": 500, "topic.creation.groups": "debezium-etl", "topic.creation.debezium-etl.include": "", "topic.creation.debezium-etl.exclude": "", "topic.creation.default.partitions": -1, "topic.creation.default.replication.factor": -1, "plugin.name": "pgoutput", "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "skipped.operations": "r", "snapshot.mode": "never", "slot.name": "debezium_order", "publication.autocreate.mode": "filtered", "publication.name": "dbz_order_publication" }
}
Nếu bạn chưa hiểu những ý nghĩa config này, hãy đọc lại P2 tại đây
Ngoài ra config publication sẽ là những cái bạn cần đọc chi tiết hơn, nó khá quan trọng trong vận hành connector đó (xem thêm)
Debezium cung cấp 3 loại publication (config publication.autocreate.mode)
- all_tables - Nếu một publication tồn tại, connector sẽ sử dụng nó. Nếu không có publication nào tồn tại, connector sẽ tạo ra một publication cho tất cả các bảng trong cơ sở dữ liệu mà connector đang theo dõi sự thay đổi. Để connector tạo ra một publication, nó phải truy cập cơ sở dữ liệu thông qua một tài khoản người dùng cơ sở dữ liệu có quyền tạo publications và thực hiện các hoạt động sao chép. Bạn cấp quyền cần thiết bằng cách sử dụng lệnh SQL sau: CREATE PUBLICATION <publication_name> FOR ALL TABLES;.
- disabled - Connector không cố gắng tạo ra một publication. Một quản trị viên cơ sở dữ liệu hoặc người dùng được cấu hình để thực hiện các hoạt động sao chép phải đã tạo ra publication trước khi chạy connector. Nếu connector không thể tìm thấy publication, connector sẽ ném ra một ngoại lệ và dừng lại.
- filtered - Nếu một publication tồn tại, connector sẽ sử dụng nó. Nếu không có publication nào tồn tại, connector sẽ tạo ra một publication mới cho các bảng phù hợp với cấu hình bộ lọc hiện tại như được chỉ định bởi các thuộc tính cấu hình connector schema.include.list, schema.exclude.list, và table.include.list, và table.exclude.list. Ví dụ: CREATE PUBLICATION <publication_name> FOR TABLE <tbl1, tbl2, tbl3>. Nếu publication tồn tại, connector cập nhật publication cho các bảng phù hợp với cấu hình bộ lọc hiện tại. Ví dụ: ALTER PUBLICATION <publication_name> SET TABLE <tbl1, tbl2, tbl3>.
Thường thì bạn sẽ rất khó để dùng all_tables vì nó đòi quyền khá cao(gần như quyền root), mình vẫn khuyến kích bạn nên sài filtered
nhé.
và kết quả nhận được là
Bạn có thể thực hiện sau SQL sau
SELECT slot_name, plugin, slot_type
FROM pg_replication_slots;
bạn sẽ nhận thấy có 1 dòng slot_name là debezium_order, đó chính là slot của bạn tạo ra.
Bạn tiếp tục thực hiện câu SQL sau
SELECT * FROM pg_stat_replication;
Bạn sẽ thấy connector của bạn đang được kết nối từ client nào.
SELECT * FROM pg_publication;
SELECT *
FROM pg_publication_tables
where pubname = 'dbz_order_publication';
Câu SQL này cung cấp cho bạn publication được tạo ra, trạng thái của nó và danh sách table đang được kết nối tới connector.
Và đây là kết quả, message đã được bắn lên kafka. Các bạn có thể kết nối tới kafka và xử lý dữ liệu theo nhu cầu của mình(phần 2 có code xử lý mẫu rồi nhen)
Và thế là kết thúc series cơ bản về Debezium, Trong series này mình đã mô tả lại khá kỹ những kiến thức cơ bản và những lưu ý khi sử dụng, chúc các ae tiếp nạp thêm kiến thức mới thành công.
Nếu setup gặp lỗi khó quá thì có thể ib cho mình(thông tin trong phần contact), mình sẽ giúp đỡ nếu có thể. *khó quá không fix được mới ib nhaaaaa.
Series
Phần 1: Debezium là gì? ứng dụng thực tế.
Phần 2: Cấu hình sử dụng Debezium Engine + PostgreSQL
Phần 3: Cấu hình sử dụng Debezium + PostgreSQL + Kafka Connect