- vừa được xem lúc

Delta Lake with Apache Spark

0 0 2

Người đăng: Nguyễn Hiệp

Theo Viblo Asia

Delta Lake with Apache Spark

Source (en): https://karlchris.github.io/data-engineering/projects/delta-spark/

Mở đầu

Sau khoảng thời gian dài trong việc quản lý dữ liệu, Data Warehouse vào khả năng lưu trữ data có cấu trúc (structured data) cũng như hỗ trợ truy vấn (query), từ đó data có thể được sử dụng với nhiều mục đích khác như BI, Machine Learning, Data Mining, ...

Cùng với sự gia tăng không ngừng của dữ liệu và nhu cầu sử dụng đống dữ liệu đó, data warehouse dần trở nên "struggled" khi gặp phải dữ liệu bán cấu trúc (semi-structured data) và phi cấu trúc (structured data), dần bộc lộ ra nhiều hạn chế. Từ đó, Data Lake ra đời, cung cấp giải pháp để lưu trữ gần như mọi loại dữ liệu từ có cấu trúc đến phi cấu trúc. Tuy nhiên Data Lake lại thiếu đi khả năng xử lý và truy các dữ liệu có cấu trúc như Data Warehouse.

Gần đây, với sự kết hợp điểm mạnh của cả Data Warehouse và cả Data Lake, cấu trúc Data LakeHouse được ra đời, cung cấp giải pháp lưu trữ linh hoạt như của data lake và hỗ trợ lưu trữ và truy vấn dữ liệu có cấu trúc như của data warehouse.

Giới thiệu về Delta Lake

Delta Lake là một open-source software, hỗ trợ một định dạng table tối ưu cho các data storage. Delta lake tối ưu hiều mặt của các data storage với nhiều tính năng mới và vượt trội như ACID, Time travel, Unified Batch & Streaming, Schema Enforcement & Evolution, ...

Delta lake sử dụng định dạng .parquet để lưu trữ các file dữ liệu của delta table. Một delta table bao gồm 2 thành phần chính:

  • Parquet files chứa các file dữ liệu dưới định dạng .parquet
  • Transaction log lưu trữ metadata về các transaction với dữ liệu trong

Delta lake đồng hỗ trợ cả ETL và ETL workloads, tuy nhiên ETL sẽ thích hợp hơn đối với delta lake bởi hiệu năng (Performance) và độ tin cậy (Reliability).

  • Performance: các truy vấn được tối ưu nhờ vào
    • Lưu trữ file paths và metadata trong Transaction log
    • Thực hiện partial read thông qua file-skipping
    • Co-locatinng các dữ liệu tương đồng để thực hiện skipping nếu có thể.

Tips: Co-locating similar data: Delta lake lưu các dữ liệu tương đồng ở gần nhau nhằm cải thiện performance thông qua các phương pháp như Z-Ordering hoặc Liquid Clustering.

  • Reliability: Delta lake tối ưu quy trình ETL thông qua việc áp dụng ACID transactions.

Liquid Clustering, Z-ordering and Hive-style partitioning

Cơ bản là đưa các dữ liệu giống nhau lại gần nhau hơn. Liquid Clustering là mới nhất, cũng như tối ưu nhất. ETL workload sẽ được tối ưu từ việc clustering nếu:

  • Thường xuyên filter các cột
  • Dữ liệu bị skew nặng
  • Dữ liệu gia tăng quá nhanh, cần maintenance và tuning
  • Access patternns thay đổi qua thời gian

Query Engine Supported

Delta lake chủ yếu hỗ trợ Apache Spark, tuy nhiên cũng hỗ trợ một số các loại query engine khác như polars

Tối ưu?

Các file nhỏ khiến việc read trở năng chậm ("the small file problem").

Ví dụ: query từ một delta table với 2m dòng, data được partitioned theo cột education. Ví dụ đầu thì có 1440 files mỗi partition và ví dụ sau chi có 1 per partition.

%%time
df = spark.read.format("delta").load("test/delta_table_1440")
res = df.where(df.education == "10th").collect() CPU times: user 175 ms, sys: 20.1 ms, total: 195 ms
Wall time: 16.1 s
%%time
df = spark.read.format("delta").load("test/delta_table_1")
res = df.where(df.education == "10th").collect() CPU times: user 156 ms, sys: 16 ms, total: 172 ms
Wall time: 4.62 s

Khác bọt vcl!

Offline optimize

Giả sử có một luồng ETL stream data vào một delta table đã partitioned và update mỗi phút => 1440 files mỗi partiton at the end of every day.

Có thể manually chạy OPTIMIZE command để optimize số lượng file. Lệnh này sẽ compact các file nhỏ thành file lớn hơn. Default file size trong mỗi partitionn sau khi chạy lệnh này là 1GB.

from delta.tables import *
deltaTable = DeltaTable.forPath(spark, "test/delta_table")
deltaTable.optimize().executeCompaction()

Downstream queries sẽ được tối ưu hơn =))

Optimized Write

Không cần chạy manually, có thể config để chạy automatically.

Optimized Write sẽ combine các file nhỏ trong cùng 1 partition thành cùng 1 file.

Có thể viết option khi lưu như này:

df.write.format("delta").option("optimizeWrite", "True").save("path/to/delta")

Hoặc setting.

  • Cho toàn bộ delta table sử dụng table property: delta.autoOptimize.optimizeWrite
  • Cho cả Spark session: spark.databricks.delta.optimizeWrite.enabled

Warning: cái này lâu do bị shuffle => không được enables by default.

Auto Compaction

Giải quyết điểm yếu của Optimized Write. Cái này sẽ tối ưu bằng cách tự động chạy một small command optimize sau mỗi lệnh write

Setting:

  • Cho toàn bộ delta table sử dụng table property: delta.autoOptimize.autoCompact
  • Cho cả Spark session: spark.databricks.delta.autoCompact.enabled

Vacuum

Auto Compaction tối ưu sau mỗi write operation, các file nhỏ có thể vẫn tồn tại

> # get n files per partition
> !ls delta/census_table_compact/education\=10th/*.parquet | wc -l 1441

1440 files nhỏ (cũ) và 1 file to (mới - sau Auto Compatio) trong 1 partition. Tuy không ảnh hưởng đến hiệu năng của việc read, tuy nhiên nếu muốn loại bỏ thì dùng VACUUM với parameter là thời gian (giờ) preserve.

deltaTable.vacuum(0)

Lệnh này sẽ remove các files mà không còn được actively reference trong Transaction log.

Change Data Feed - Change Data Capture (CDC)

Cơ bản là Change Data Capture, on top of Delta lake, create by Databricks

CDF cho phép Databricks track row-level channge giữa các versionn của Delta table. Khi được enable trong một Delta table, runtime sẽ record sự thay đổi của mọi data được wriet vào table.

CREATE TABLE <table_name> ()
USING DELTA
PARTITION BY (col)
TBLPROPERTIES (delta.enableChangeDataFeed = true)
LOCATION 'path/to/table'

Databricks khuyến khích nên sử dụng CDF với Structured Streaming để incrementally process thay đổi từ các Delta table.

def read_cdc_by_table_name(starting_version): return spark.read.format("delta") \ .option("readChangeFeed", "true") \ .option("startingVersion", str(starting_version)) \ .table("<table_name>") \ .orderBy("_change_type", "id") def stream_cdc_by_table_name(starting_version): return spark.readStream.format("delta") \ .option("readChangeFeed", "true") \ .option("startingVersion", str(starting_version)) \ .table("<table_name>") \ .writeStream \ .format("console") \ .option("numRows", 1000) \ .start()

Conclusion

Bình luận

Bài viết tương tự

- vừa được xem lúc

Cài đặt Apache Spark cho Ubuntu

Apache Spark là một framework dùng trong xử lý dữ liệu lớn. Nền tảng này trở nên phổ biến rộng rãi do dễ sử dụng và tốc độ xử lý dữ liệu được cải thiện hơn Hadoop.

0 0 41

- vừa được xem lúc

Đọc dữ liệu từ một file text và ghi lại dưới dạng file parquet trên HDFS sử dụng Spark (Phần 2)

Các bạn chưa đọc phần 1 thì có thể đọc tại đây nha : Đọc dữ liệu từ một file text và ghi lại dưới dạng file parquet trên HDFS sử dụng Spark (Phần 1). Ghi dữ liệu ra file parquet sử dụng Spark.

0 0 50

- vừa được xem lúc

Đọc dữ liệu từ một file text và ghi lại dưới dạng file parquet trên HDFS sử dụng Spark (Phần 1)

Định dạng text là một định dạng vô cùng phổ biến cả trên HDFS hay bất cứ đâu. Dữ liệu file text được trình bày thành từng dòng, mỗi dòng có thể coi như một bản ghi và đánh dấu kết thúc bằng kí tự "" (

0 0 37

- vừa được xem lúc

Tổng quan về Apache Spark cho hệ thống Big Data

Apache Spark in-memory clusters đang là sự chú ý của nhiều doanh nghiệp trong việc ứng dụng công nghệ vào phân tích và xử lý dữ liệu nhanh chóng. Bài viết này tôi sẽ trình bày một cách tổng quan nhất

0 0 164

- vừa được xem lúc

Tổng hợp bài viết giới thiệu về Hadoop và Spark thông qua khái niệm cơ bản và thực hành

Hadoop. Hadoop là framework dựa trên 1 giải pháp tới từ Google để lưu trữ và xử lý dữ liệu lớn.

0 0 232

- vừa được xem lúc

Spark - Distributed ML model with Pandas UDFs

Hình ảnh mình mượn tại đây nhé Cat&Doc. Why.

0 0 32