- vừa được xem lúc

[Debezium Series] Sử dụng Debezium Engine + PostgreSQL

0 0 12

Người đăng: Chu Văn Hạnh

Theo Viblo Asia

Giới thiệu

Tiếp nối series Debezium cơ bản hôm nay cùng mình tìm hiểu về Debezium Engine và cách sử dụng nó với CSDL PostgreSQL nhé.

Debezium Engine

Debezium hỗ trợ cho cho ae dev 1 bộ module debezium-api được phát triển bằng java cho phép bạn kết nối tới DB để đọc logs từ chúng.

Add dependencies vào file POM nhé

<dependency> <groupId>io.debezium</groupId> <artifactId>debezium-api</artifactId> <version>${version.debezium}</version>
</dependency>
<dependency> <groupId>io.debezium</groupId> <artifactId>debezium-embedded</artifactId> <version>${version.debezium}</version>
</dependency>
<dependency> <groupId>io.debezium</groupId> <artifactId>debezium-connector-postgres</artifactId> <version>${debezium.version}</version>
</dependency>

nếu bạn sài các CSDL khác thì sẽ có những connector khác, ví dụ Mysql thì sẽ sẽ add như sau

<dependency> <groupId>io.debezium</groupId> <artifactId>debezium-connector-mysql</artifactId> <version>${version.debezium}</version>
</dependency>

Tiếp theo mình tiếp hành khởi tạo connector

 this.engine = DebeziumEngine .create(ChangeEventFormat.of(Connect.class)) .using(setConfig()) .notifying(new CdcSummaryBatchHandler2()) .build();

để thực thi connector này mình sẽ tiến hành tạo ra luồng mới và tiến hành chạy.

 private final Executor executor = Executors.newSingleThreadExecutor(); @PostConstruct private void start() { this.executor.execute(engine); } @PreDestroy private void stop() { if (this.engine != null) { try { this.engine.close(); } catch (IOException e) { log.error("Can not close debezium engine"); } } }

trong đó hàm setConfig và class CdcSummaryBatchHandler2 là 2 đối tượng được mình tạo ra để khai báo config và lắng nghe các sự kiện từ CSDL.

	private Properties setConfig() { Properties configProperties = new Properties(); configProperties.put("plugin.name", "pgoutput"); configProperties.put("connector.class", "io.debezium.connector.postgresql.PostgresConnector"); ///KAFKA configProperties.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); configProperties.put("key.converter.schemas.enable", "false"); configProperties.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); configProperties.put("value.converter.schemas.enable", "false"); configProperties.put("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore"); configProperties.put("offset.storage.file.filename", "/Users/vanhanhchu/Documents/file.DAT"); configProperties.put("offset.flush.interval.ms", "60000"); configProperties.put("name", "connectorName"); configProperties.put("database.server.name", applicationProperties.getInventorySummaryBranchCdc().getDatabaseServerName()); configProperties.put("database.hostname", applicationProperties.getInventorySummaryBranchCdc().getDatabaseHostname()); configProperties.put("database.port", applicationProperties.getInventorySummaryBranchCdc().getDatabasePort()); configProperties.put("database.user", applicationProperties.getInventorySummaryBranchCdc().getDatabaseUser()); configProperties.put("database.password", applicationProperties.getInventorySummaryBranchCdc().getDatabasePassword()); configProperties.put("database.dbname", applicationProperties.getInventorySummaryBranchCdc().getDatabaseDbname()); configProperties.put("database.tcpKeepAlive", 600); configProperties.put("table.include.list", "item-services.inventory_summary_branch,item-services.cdc_update_es,item-services.item_model,item-services.cdc_heart_beat"); // advantage config configProperties.put("skipped.operations", "d,u"); configProperties.put("provide.transaction.metadata", "true"); configProperties.put("max.batch.size", "500"); configProperties.put("slot.name", "etl_replication_1"); configProperties.put("snapshot.mode", "never"); configProperties.put("heartbeat.interval.ms", "30000"); configProperties.put("heartbeat.action.query", "UPDATE \"item-services\".cdc_heart_beat SET ts_time = now() WHERE id = 1;"); // return config return configProperties; }

Trong đó: skipped.operations là những event được bỏ qua(d=delete, u=update)

slot.name tên slot replication sẽ được khởi tạo trong CSDL của bạn

max.batch.size số lượng batch tối đa.

offset.storage.file.filename là đường dẫn file lưu trữ offset

table.include.list là danh sách table sẽ được debezium đọc logs

snapshot.mode là chế độ snapshot khi lần đầu khởi tạo, mặc định là initial thì debezium sẽ tiến hành snapshot toàn bộ dữ liệu từ logs và các giá trị bi thiếu. giá trị never thì debezium chỉ đọc các log từ thời điểm tạo replication.

heartbeat.interval.ms là cấu hình cứ sao X ms sẽ thực thi câu truy vấn heartbeat.action.query

heartbeat.action.query là cấu truy vấn được thực thi, việc thêm tác vụ này giúp bạn duy trì được kết nối ổn định tránh tình trạng quá lâu DB của bạn không phát sinh thay đổi.

còn lại là các giá trị mặc định và bạn không cần thay đổi.

Tiếp theo ta tiến hành implements class CdcSummaryBatchHandler2

public class CdcSummaryBatchHandler2 implements DebeziumEngine.ChangeConsumer<RecordChangeEvent<SourceRecord>> { private final Logger log = LoggerFactory.getLogger(CdcSummaryBatchHandler2.class); @Override public void handleBatch(List<RecordChangeEvent<SourceRecord>> records, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer) throws InterruptedException { try { Map<String, List<RecordChangeEvent<SourceRecord>>> group = records.stream().collect(Collectors.groupingBy(x -> x.record().topic())); for (Map.Entry<String, List<RecordChangeEvent<SourceRecord>>> data : group.entrySet()) { log.debug("record value" + data.getValue()); committer.markBatchFinished(); } } catch (Throwable e) { log.error("ETL_ERROR " + e.getMessage()); e.printStackTrace(); } }
}

và đây là kết quả mình nhận được

cụ thể object RecordChangeEvent<SourceRecord> nó sẽ như này

record value[EmbeddedEngineChangeEvent [key=null, value=SourceRecord{sourcePartition={server=item-etl-service}, sourceOffset={transaction_id=13456207, lsn_proc=34194206200, lsn_commit=34194206200, lsn=34194206264, txId=13456207, ts_usec=1711180913672013}} ConnectRecord{topic='item-etl-service.transaction', kafkaPartition=null, key=Struct{id=13456207}, keySchema=Schema{io.debezium.connector.common.TransactionMetadataKey:STRUCT}, value=Struct{status=BEGIN,id=13456207}, valueSchema=Schema{io.debezium.connector.common.TransactionMetadataValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}, sourceRecord=SourceRecord{sourcePartition={server=item-etl-service}, sourceOffset={transaction_id=13456207, lsn_proc=34194206200, lsn_commit=34194206200, lsn=34194206264, txId=13456207, ts_usec=1711180913672013}} ConnectRecord{topic='item-etl-service.transaction', kafkaPartition=null, key=Struct{id=13456207}, keySchema=Schema{io.debezium.connector.common.TransactionMetadataKey:STRUCT}, value=Struct{status=BEGIN,id=13456207}, valueSchema=Schema{io.debezium.connector.common.TransactionMetadataValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}], EmbeddedEngineChangeEvent [key=null, value=SourceRecord{sourcePartition={server=item-etl-service}, sourceOffset={transaction_id=13456207, lsn_proc=34194206400, lsn_commit=34194206400, lsn=34194206400, transaction_data_collection_order_item-services.cdc_heart_beat=1, ts_usec=1711180913672013}} ConnectRecord{topic='item-etl-service.transaction', kafkaPartition=null, key=Struct{id=13456207}, keySchema=Schema{io.debezium.connector.common.TransactionMetadataKey:STRUCT}, value=Struct{status=END,id=13456207,event_count=1,data_collections=[Struct{data_collection=item-services.cdc_heart_beat,event_count=1}]}, valueSchema=Schema{io.debezium.connector.common.TransactionMetadataValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}, sourceRecord=SourceRecord{sourcePartition={server=item-etl-service}, sourceOffset={transaction_id=13456207, lsn_proc=34194206400, lsn_commit=34194206400, lsn=34194206400, transaction_data_collection_order_item-services.cdc_heart_beat=1, ts_usec=1711180913672013}} ConnectRecord{topic='item-etl-service.transaction', kafkaPartition=null, key=Struct{id=13456207}, keySchema=Schema{io.debezium.connector.common.TransactionMetadataKey:STRUCT}, value=Struct{status=END,id=13456207,event_count=1,data_collections=[Struct{data_collection=item-services.cdc_heart_beat,event_count=1}]}, valueSchema=Schema{io.debezium.connector.common.TransactionMetadataValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]]

dựa trên object trên ta có thể convert nó về đối object tương ứng bạn muốn, dưới đây là code example các bạn có thể tham khảo

/* * * * Copyright (c) 2024, T5K - Truyen5k.Com * * author: Hanh.Chu * */
static { { mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true); mapper.configure(SerializationFeature.WRITE_DATE_TIMESTAMPS_AS_NANOSECONDS, false); mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); mapper.configure(SerializationFeature.INDENT_OUTPUT, false); mapper.configure(DeserializationFeature.READ_DATE_TIMESTAMPS_AS_NANOSECONDS, true); mapper.registerModule(new ParameterNamesModule()); mapper.registerModule(new Jdk8Module()); mapper.registerModule(new JavaTimeModule()); } } public static <T> Optional<T> mapToModel(SourceRecord sourceRecord, Class<T> clasz, Operation... opeAction) { Optional<Map<String, Object>> map = sourceRecordToMap(sourceRecord, opeAction); if (map.isPresent()) { return Optional.of(mapper.convertValue(map.get(), clasz)); } return Optional.empty(); } public static Optional<Map<String, Object>> sourceRecordToMap(SourceRecord sourceRecord, Operation... opeAction) { try { // map json to struct Struct sourceRecordValue = (Struct) sourceRecord.value(); if (sourceRecordValue != null && opeAction != null) { Operation operation = Operation.forCode((String) sourceRecordValue.get(OPERATION)); // Only if this is a transaction operation. update and create event if (Arrays.asList(opeAction).contains(operation)) { // Operation.CREATE || operation == Operation.UPDATE Struct struct = null; if (operation == Operation.DELETE) { struct = (Struct) sourceRecordValue.get(BEFORE); } else { struct = (Struct) sourceRecordValue.get(AFTER); } Struct finalStruct = struct; if (finalStruct == null) { return Optional.empty(); } Map<String, Object> message = struct.schema().fields().stream().map(Field::name) .filter(fieldName -> finalStruct.get(fieldName) != null) .map(fieldName -> Pair.of(fieldName, finalStruct.get(fieldName))) .collect(toMap(Pair::getKey, Pair::getValue)); return Optional.of(message); } } } catch (Exception e) { e.printStackTrace(); } return Optional.empty(); }

Reference

T5K - Truyện chữ online (provider source)

debezium.io - using Debezium Engine

Series

Phần 1: Debezium là gì? ứng dụng thực tế.

Phần 2: Cấu hình sử dụng Debezium Engine + PostgreSQL

Phần 3: Cấu hình sử dụng Debezium + PostgreSQL + Kafka Connect (coming soon)

Bình luận

Bài viết tương tự

- vừa được xem lúc

Data Change Capture (CDC) với Debezium

Mở đầu. Làm việc với hệ thống database từ trước đến nay vẫn luôn là công việc khó khăn và bạc đầu, dạo gần đây mình có cơ hội làm việc với một giải pháp mới cho database sử dụng để phát hiện ra sự tha

0 0 61

- vừa được xem lúc

[Debezium Series] Debezium là gì? ứng dụng thực tế.

Debezium là gì. Debezium là một open source giúp bạn phát triển các ứng dụng xử lý dữ liệu realtime, Debezium hoạt động như một connector, nó kết nỗi với các cơ sở dữ liệu bằng cách sử dụng log và phá

0 0 12

- vừa được xem lúc

[Debezium Series] Cấu hình sử dụng Debezium + PostgreSQL + Kafka Connect

Giới thiệu. Tiếp nối series Debezium cơ bản hôm nay cùng mình tìm hiểu về Debezium + PostgreSQL + Kafka Connect nhé.

0 0 18