- vừa được xem lúc

Data Change Capture (CDC) với Debezium

0 0 61

Người đăng: Hoàng Việt

Theo Viblo Asia

Mở đầu

Làm việc với hệ thống database từ trước đến nay vẫn luôn là công việc khó khăn và bạc đầu, dạo gần đây mình có cơ hội làm việc với một giải pháp mới cho database sử dụng để phát hiện ra sự thay đổi từ một database và sao chép dữ liệu đó sang một database khác cùng loại hoặc khác loại. Để xử lý bài toán này mình sử dụng giải pháp có tên như title - Change Data Capture hay nói gắn gọn là CDC.

Change Data Capture là gì?

Đúng như cái tên của nó là bắt sự thay đổi dữ liệu, đây là kỹ thuật sử dụng để chúng ta bắt được những sự thay đổi về dữ liệu chứa trong database. Việc bắt được sự thay đổi về dữ liệu sẽ giúp chúng ta xử lý được kha khá các bài toán trong việc xử lý dữ liệu, những bài toán này thế nào cũng ta sẽ tìm hiểu trong phần tiếp theo.

Để mà có thể bắt được sự thay đổi dữ liệu này thì có rất nhiều các khác nhau, nguyên thủy nhất ta có thể sử dụng cơ chế TRIGGER trong các database đã hỗ trợ sẵn để bắt các ACTION về update, insert, delete,... Hoặc nhẹ nhàng hơn chúng ta có thể sử dụng các công cụ để làm việc này, điển hình có công cụ Debezium đang nổi bật nhất.

Lợi ích của CDC

Lợi ích đầu tiên mà chắc ai cũng thấy đó là việc sao chép dữ liệu sang các hệ thống khác, nếu nói về lợi ích này có thể 1 số bạn sẽ nói rằng: Các hệ thống database đều hỗ trợ các cơ chế replica rồi thì sử dụng luôn chứ cần gì phải sử dụng một tool bên ngoài cho rức đầu mà kém ổn định? Okay, đúng! Nếu bạn chỉ cần sao chép dữ liệu từ các database cùng loại (MySQL => MySQL, Mongo => Mongo) thì sử dụng luôn tính năng của database là tốt nhất. Tuy nhiên giờ nếu bạn muốn sao chép dữ liệu từ MySQL qua MongoDB hay từ MySQL qua PostgreSQL thì sẽ không có cơ chế đó. Trong bài toán này CDC sẽ đứng giữa để phát hiện sự thay đổi ở Database cần theo dõi và xử lý dữ liệu, sao đó có thể dùng code để xử lý và đẩy dữ liệu và hệ thống cần sao chép dữ liệu.

Một lợi ích khác cũng không kém phần quan trọng đó là khả năng Backup dữ liệu. Các sự kiện thay đổi dữ liệu sẽ được lưu trữ lại chính vì vậy nếu có chăng không may database của bạn bị drop vào lúc 9h sáng, bạn có thể lấy bản backup lúc 3h sáng và apply lại những thay đổi đã được lưu lại từ 3h đến 9h. Theo lý thuyết nếu bạn không miss bất cứ event nào thì dữ liệu của bạn sẽ được phục hồi đầy đủ như trước lúc bị drop. Quá đã đúng không 😃)))

Tiếp tục với lợi ịch đầu tiên, sau khi ta sao chép dữ liệu sang hệ thống khác ta có thể sử dụng hệ thống này cho việc testing thay vì tương tác trực tiếp trên hệ thống database thật. Không hiếm có những chuyện trong quá trình test Developers chạy những câu query tốn hàng phút để xử lý, thậm chí tệ hơn có thể gây ra lock hệ thống. Vấn đề này nhẹ thì gây giảm hiệu năng hệ thống, nặng thì gây crash luôn. CDC cũng là một cách giúp chúng ta giảm thiểu đi các trường hợp thế này sẽ xảy ra.

Ngoài ra CDC còn hỗ trợ cho một số bài toán đặc thù của từng hệ thống hay xử lý dữ liệu Big Data, bạn nào từng ứng dụng CDC vào các bài toán này rồi chia sẻ với mình ở dưới nhé.

Debezium - CDC Tool

Nói lý thuyết hoài mà không có tý ví dụ để xem thì có ích gì đâu, chính vì thế mình sẽ giới thiệu công cụ mà đã có thời gian làm việc và thấy khá ngon, công cụ này là Debezium. Debezium cốt lỗi của nó sử dụng Kafka để tạo ra các messages tương ứng với các events thay đổi dữ liệu. Debezium sử dụng các Connector để kết nối đến các hệ thống database và bắt ra sự thay đổi, hiện tại Debezium 1.9 hỗ trợ MySQL, PostgreSQL, MongoDB, Oracle, SQL Server, DB2, Cassandra, Vitess. Bạn có thể xem hướng dẫn cho từng connector tại document chính thức: https://debezium.io/documentation/reference/2.0/connectors/index.html

Trong phần này mình sẽ mô tả các bước cài đặt của MySQL, loại database chắc mọi người làm việc nhiều nhất.

Đối với MySQL, Debezium sẽ dựa vào binlog để có thể phát hiện ra sự thay đổi về dữ liệu, chính vì vậy đối với hệ thống cần monitor thì bạn cần enable tính năng binlog này và chắc chắn user dùng để kết nối đến cần có quyền đọc binlog.

Mình làm việc với Kubernetes nhiều nên công cụ này mình sẽ hướng dẫn mọi người dựng trên K8s, đối với các môi trường khác như VM hay Docker thì về cơ bản cũng có các thành phần tương tự.

Debezium khi chạy trên K8s sẽ sử dụng Strimzi Operator (đây là một Operator cho Kafka). Đầu tiên ta tạo một namespace riêng cho ứng dụng này:

kubectl create ns debezium-example

Sau đó ta cần cài đặt Strimzi Operator

curl -sL https://github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.20.0/install.sh | bash -s v0.20.0

Tạo Secret cho database demo

cat << EOF | kubectl create -n debezium-example -f
apiVersion: v1
kind: Secret
metadata: name: debezium-secret namespace: debezium-example
type: Opaque
data: username: ZGViZXppdW0= password: ZGJ6
EOF

Tạo User và phân quyền cho Debezium

cat << EOF | kubectl create -n debezium-example -f
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata: name: connector-configuration-role namespace: debezium-example
rules:
- apiGroups: [""] resources: ["secrets"] resourceNames: ["debezium-secret"] verbs: ["get"]
EOF
$ cat << EOF | kubectl create -n debezium-example -f
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata: name: connector-configuration-role-binding namespace: debezium-example
subjects:
- kind: ServiceAccount name: debezium-connect-cluster-connect namespace: debezium-example
roleRef: kind: Role name: connector-configuration-role apiGroup: rbac.authorization.k8s.io
EOF

Giờ đến phần quan trọng, chúng ta sẽ khởi tạo một cụm Kafka phục vụ cho việc lưu trữ changes event. Cấu hình dưới sẽ tạo 1 pod kafka tương ứng 1 broker và 1 pod zookeeper.

$ cat << EOF | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata: name: debezium-cluster
spec: kafka: replicas: 1 listeners: - name: plain port: 9092 type: internal tls: false - name: tls port: 9093 type: internal tls: true authentication: type: tls - name: external port: 9094 type: nodeport tls: false storage: type: jbod volumes: - id: 0 type: persistent-claim size: 100Gi deleteClaim: false config: offsets.topic.replication.factor: 1 transaction.state.log.replication.factor: 1 transaction.state.log.min.isr: 1 default.replication.factor: 1 min.insync.replicas: 1 zookeeper: replicas: 1 storage: type: persistent-claim size: 100Gi deleteClaim: false entityOperator: topicOperator: {} userOperator: {}
EOF

Tiếp theo ta triển khai một database MySQL để test thử, user và pass của DB này là mysqluser - msqlpw

cat << EOF | kubectl create -n debezium-example -f -
apiVersion: v1
kind: Service
metadata: name: mysql
spec: ports: - port: 3306 selector: app: mysql clusterIP: None
---
apiVersion: apps/v1
kind: Deployment
metadata: name: mysql
spec: selector: matchLabels: app: mysql strategy: type: Recreate template: metadata: labels: app: mysql spec: containers: - image: quay.io/debezium/example-mysql:1.9 name: mysql env: - name: MYSQL_ROOT_PASSWORD value: debezium - name: MYSQL_USER value: mysqluser - name: MYSQL_PASSWORD value: mysqlpw ports: - containerPort: 3306 name: mysql
EOF

Giờ ta sẽ triển khai các thành phần với vai trò kết nối đến MySQL và detect changes. Đầu tiên ta cần tạo KafkaConnect để thực hiện công việc detect changes:

$ cat << EOF | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata: name: debezium-connect-cluster annotations: strimzi.io/use-connector-resources: "true"
spec: version: 3.1.0 replicas: 1 bootstrapServers: debezium-cluster-kafka-bootstrap:9092 config: config.providers: secrets config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider group.id: connect-cluster offset.storage.topic: connect-cluster-offsets config.storage.topic: connect-cluster-configs status.storage.topic: connect-cluster-status # -1 means it will use the default replication factor configured in the broker config.storage.replication.factor: -1 offset.storage.replication.factor: -1 status.storage.replication.factor: -1 build: output: type: docker image: 10.110.154.103/debezium-connect-mysql:latest plugins: - name: debezium-mysql-connector artifacts: - type: tgz url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/{debezium-version}/debezium-connector-mysql-{debezium-version}-plugin.tar.gz
EOF

Sau đó ta triển khai thêm các KafkaConnector để kết nối đến MySQL được gắn vào KafkaConnect đã tạo ở trên

cat << EOF | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata: name: debezium-connector-mysql labels: strimzi.io/cluster: debezium-connect-cluster
spec: class: io.debezium.connector.mysql.MySqlConnector tasksMax: 1 config: tasks.max: 1 database.hostname: mysql database.port: 3306 database.user: ${secrets:debezium-example/debezium-secret:username} database.password: ${secrets:debezium-example/debezium-secret:password} database.server.id: 184054 database.server.name: mysql database.include.list: inventory database.history.kafka.bootstrap.servers: debezium-cluster-kafka-bootstrap:9092 database.history.kafka.topic: schema-changes.inventory
EOF

Như vậy là đã setup xong và giờ ta có thể giám sát được sự thay đổi trong database. Ta chạy câu lệnh này để listen messages trong kafka

kubectl run -n debezium-example -it --rm --image=quay.io/debezium/tooling:1.2 --restart=Never watcher -- kcat -b debezium-cluster-kafka-bootstrap:9092 -C -o beginning -t mysql.inventory.customers

Mở một terminal khác, Giờ ta sẽ truy cập vào DB và tiến hành thêm một bản ghi để test thử:

kubectl run -n debezium-example -it --rm --image=mysql:8.0 --restart=Never --env MYSQL_ROOT_PASSWORD=debezium mysqlterm -- mysql -hmysql -P3306 -uroot -pdebezium

Add thêm một bản ghi:

sql> update customers set first_name="Sally Marie" where id=1001;

Nếu các bạn nhìn thấy có message dạng JSON như sau thì việc setup đã thành công:

{
... "payload": { "before": { "id": 1001, "first_name": "Sally", "last_name": "Thomas", "email": "sally.thomas@acme.com" }, "after": { "id": 1001, "first_name": "Sally Marie", "last_name": "Thomas", "email": "sally.thomas@acme.com" }, "source": { "version": "{debezium-version}", "connector": "mysql", "name": "mysql", "ts_ms": 1646300467000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "customers", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 401, "row": 0, "thread": null, "query": null }, "op": "u", "ts_ms": 1646300467746, "transaction": null }
}

Message thông báo ra sẽ có 3 mục chính gồm source (nguồn chứa dữ liệu, ví dụ file binlog bao nhiêu), before (dữ liệu trước khi thay đổi) và after (dữ liệu sau khi thay đổi).

Kết

Trong quá trình cài mình gặp khá nhiều vấn đề :v, một phần do chưa có nhiều kinh nghiệm với Kafka. Nếu các bạn cài đặt gặp lỗi thì bạn không cô đơn đâu 😃))) Lỗi gì bạn có thể comment để mình hỗ trợ. Hy vọng bài viết này đã giúp bạn có thêm 1 vài kiến thức về Change Data Capture.

Một số source mình tham khảo: https://luminousmen.com/post/change-data-capture https://www.striim.com/tutorial/streaming-data-integration-using-cdc-to-stream-database-changes/

Bình luận

Bài viết tương tự

- vừa được xem lúc

Giới thiệu về Change Data Capture

Hiện tại, có nhiều cách để thực hiện việc lưu lại change data khi thực hiện Insert, Update, Delete như : triggers , so sánh bảng nguồn và bảng đích, cdc, change tracking, Row version, time stamps,… Mỗ

0 0 56

- vừa được xem lúc

001: Message-driven programming với Message broker và Apache Kafka

Bài viết nằm trong series Apache Kafka từ zero đến one. . . Asynchronous programming.

0 0 164

- vừa được xem lúc

002: Apache Kafka topic, partition, offset và broker

Bài viết nằm trong series Apache Kafka từ zero đến one. Nói qua về lịch sử, Kafka được phát triển bởi LinkedIn (các anh em dev chắc chẳng xa lạ gì) và viết bằng ngôn ngữ JVM, cụ thể là Java và Scala.

0 0 153

- vừa được xem lúc

003: Gửi và nhận message trong Apache Kafka

Bài viết nằm trong series Apache Kafka từ zero đến one. . . Nếu muốn các message được lưu trên cùng một partition để đảm bảo thứ tự thì làm cách nào.

0 0 222

- vừa được xem lúc

004: Apache Kafka consumer offset, Broker discovery và Zookeeper

Bài viết nằm trong series Apache Kafka từ zero đến one. 1) Consumer offset.

0 0 130

- vừa được xem lúc

Change account

Tài khoản cũ của mình có chút vấn đề nên mình chuyển sang dùng tài khoản mới để viết bài. Xin cảm ơn mọi người.

0 0 26