Delta Lake with Apache Spark
Source (en): https://karlchris.github.io/data-engineering/projects/delta-spark/
Mở đầu
Sau khoảng thời gian dài trong việc quản lý dữ liệu, Data Warehouse vào khả năng lưu trữ data có cấu trúc (structured data) cũng như hỗ trợ truy vấn (query), từ đó data có thể được sử dụng với nhiều mục đích khác như BI, Machine Learning, Data Mining, ...
Cùng với sự gia tăng không ngừng của dữ liệu và nhu cầu sử dụng đống dữ liệu đó, data warehouse dần trở nên "struggled" khi gặp phải dữ liệu bán cấu trúc (semi-structured data) và phi cấu trúc (structured data), dần bộc lộ ra nhiều hạn chế. Từ đó, Data Lake ra đời, cung cấp giải pháp để lưu trữ gần như mọi loại dữ liệu từ có cấu trúc đến phi cấu trúc. Tuy nhiên Data Lake lại thiếu đi khả năng xử lý và truy các dữ liệu có cấu trúc như Data Warehouse.
Gần đây, với sự kết hợp điểm mạnh của cả Data Warehouse và cả Data Lake, cấu trúc Data LakeHouse được ra đời, cung cấp giải pháp lưu trữ linh hoạt như của data lake và hỗ trợ lưu trữ và truy vấn dữ liệu có cấu trúc như của data warehouse.
Giới thiệu về Delta Lake
Delta Lake là một open-source software, hỗ trợ một định dạng table tối ưu cho các data storage. Delta lake tối ưu hiều mặt của các data storage với nhiều tính năng mới và vượt trội như ACID, Time travel, Unified Batch & Streaming, Schema Enforcement & Evolution, ...
Delta lake sử dụng định dạng .parquet
để lưu trữ các file dữ liệu của delta table. Một delta table bao gồm 2 thành phần chính:
- Parquet files chứa các file dữ liệu dưới định dạng
.parquet
- Transaction log lưu trữ metadata về các transaction với dữ liệu trong
Delta lake đồng hỗ trợ cả ETL và ETL workloads, tuy nhiên ETL
sẽ thích hợp hơn đối với delta lake bởi hiệu năng (Performance) và độ tin cậy (Reliability).
- Performance: các truy vấn được tối ưu nhờ vào
- Lưu trữ file paths và metadata trong Transaction log
- Thực hiện partial read thông qua file-skipping
- Co-locatinng các dữ liệu tương đồng để thực hiện skipping nếu có thể.
Tips: Co-locating similar data: Delta lake lưu các dữ liệu tương đồng ở gần nhau nhằm cải thiện performance thông qua các phương pháp như
Z-Ordering
hoặcLiquid Clustering
.
- Reliability: Delta lake tối ưu quy trình ETL thông qua việc áp dụng ACID transactions.
Liquid Clustering, Z-ordering and Hive-style partitioning
Cơ bản là đưa các dữ liệu giống nhau lại gần nhau hơn. Liquid Clustering là mới nhất, cũng như tối ưu nhất. ETL workload sẽ được tối ưu từ việc clustering nếu:
- Thường xuyên
filter
các cột - Dữ liệu bị skew nặng
- Dữ liệu gia tăng quá nhanh, cần maintenance và tuning
- Access patternns thay đổi qua thời gian
Query Engine Supported
Delta lake chủ yếu hỗ trợ Apache Spark, tuy nhiên cũng hỗ trợ một số các loại query engine khác như polars
Tối ưu?
Các file nhỏ khiến việc read
trở năng chậm ("the small file problem").
Ví dụ: query từ một delta table với 2m dòng, data được partitioned theo cột education
. Ví dụ đầu thì có 1440 files mỗi partition và ví dụ sau chi có 1 per partition.
%%time
df = spark.read.format("delta").load("test/delta_table_1440")
res = df.where(df.education == "10th").collect() CPU times: user 175 ms, sys: 20.1 ms, total: 195 ms
Wall time: 16.1 s
%%time
df = spark.read.format("delta").load("test/delta_table_1")
res = df.where(df.education == "10th").collect() CPU times: user 156 ms, sys: 16 ms, total: 172 ms
Wall time: 4.62 s
Khác bọt vcl!
Offline optimize
Giả sử có một luồng ETL stream data vào một delta table đã partitioned và update mỗi phút => 1440 files mỗi partiton at the end of every day.
Có thể manually chạy OPTIMIZE
command để optimize số lượng file. Lệnh này sẽ compact các file nhỏ thành file lớn hơn. Default file size trong mỗi partitionn sau khi chạy lệnh này là 1GB.
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, "test/delta_table")
deltaTable.optimize().executeCompaction()
Downstream queries sẽ được tối ưu hơn =))
Optimized Write
Không cần chạy manually, có thể config để chạy automatically.
Optimized Write sẽ combine các file nhỏ trong cùng 1 partition thành cùng 1 file.
Có thể viết option khi lưu như này:
df.write.format("delta").option("optimizeWrite", "True").save("path/to/delta")
Hoặc setting.
- Cho toàn bộ delta table sử dụng table property: delta.autoOptimize.optimizeWrite
- Cho cả Spark session: spark.databricks.delta.optimizeWrite.enabled
Warning: cái này lâu do bị shuffle => không được enables by default.
Auto Compaction
Giải quyết điểm yếu của Optimized Write. Cái này sẽ tối ưu bằng cách tự động chạy một small command optimize
sau mỗi lệnh write
Setting:
- Cho toàn bộ delta table sử dụng table property: delta.autoOptimize.autoCompact
- Cho cả Spark session: spark.databricks.delta.autoCompact.enabled
Vacuum
Vì Auto Compaction tối ưu sau mỗi write
operation, các file nhỏ có thể vẫn tồn tại
> # get n files per partition
> !ls delta/census_table_compact/education\=10th/*.parquet | wc -l 1441
1440 files nhỏ (cũ) và 1 file to (mới - sau Auto Compatio) trong 1 partition. Tuy không ảnh hưởng đến hiệu năng của việc read
, tuy nhiên nếu muốn loại bỏ thì dùng VACUUM
với parameter là thời gian (giờ) preserve.
deltaTable.vacuum(0)
Lệnh này sẽ remove các files mà không còn được actively reference trong Transaction log.
Change Data Feed - Change Data Capture (CDC)
Cơ bản là Change Data Capture, on top of Delta lake, create by Databricks
CDF cho phép Databricks track row-level channge giữa các versionn của Delta table. Khi được enable trong một Delta table, runtime sẽ record sự thay đổi của mọi data được wriet vào table.
CREATE TABLE <table_name> ()
USING DELTA
PARTITION BY (col)
TBLPROPERTIES (delta.enableChangeDataFeed = true)
LOCATION 'path/to/table'
Databricks khuyến khích nên sử dụng CDF với Structured Streaming để incrementally process thay đổi từ các Delta table.
def read_cdc_by_table_name(starting_version): return spark.read.format("delta") \ .option("readChangeFeed", "true") \ .option("startingVersion", str(starting_version)) \ .table("<table_name>") \ .orderBy("_change_type", "id") def stream_cdc_by_table_name(starting_version): return spark.readStream.format("delta") \ .option("readChangeFeed", "true") \ .option("startingVersion", str(starting_version)) \ .table("<table_name>") \ .writeStream \ .format("console") \ .option("numRows", 1000) \ .start()