Giới thiệu
Trong việc xử lý dữ liệu phân tán và tính toán song song, PySpark là một trong những công cụ phổ biến và mạnh mẽ nhất. Trong PySpark, pyspark.sql.Window
là một lớp quan trọng trong module pyspark.sql
, cho phép chúng ta thực hiện các phép tính và phân tích cửa sổ trên các DataFrame. Trên thực tế, pyspark.sql.Window
cung cấp một cách tiện lợi để phân chia dữ liệu thành các nhóm (cửa sổ) và thực hiện các tính toán dựa trên cửa sổ đó. Trong bài viết này, chúng ta sẽ tìm hiểu về pyspark.sql.Window
và cách áp dụng nó trong xử lý dữ liệu phân tán.
pyspark.sql.Window
là gì?
Trong PySpark, pyspark.sql.Window
là một lớp được sử dụng để định nghĩa và xác định cửa sổ dữ liệu trong DataFrame. Nó cho phép chúng ta xác định cách phân chia dữ liệu thành các nhóm và sắp xếp chúng trong từng nhóm. Bằng cách sử dụng pyspark.sql.Window
, chúng ta có thể thực hiện các tính toán trên các nhóm dữ liệu, như tính toán tổng, trung bình, độ dốc, lệch chuẩn, v.v. Chính xác hơn, pyspark.sql.Window
cho phép chúng ta định nghĩa:
- Các cột để phân chia dữ liệu thành các nhóm.
- Cách sắp xếp dữ liệu bên trong mỗi nhóm.
- Phạm vi của các dòng trong mỗi cửa sổ.
Các phương thức quan trọng trong pyspark.sql.Window
-
Window.currentRow
: Đại diện cho dòng hiện tại trong cửa sổ. Khi sử dụngWindow.currentRow
, chúng ta chỉ định rõ rằng phạm vi tính toán hoặc thực hiện hàm phân tích chỉ áp dụng cho dòng hiện tại. -
partitionBy(*cols)
: Phương thức này xác định các cột để phân chia dữ liệu thành các nhóm. Các cột được đưa vàopartitionBy
sẽ làm cơ sở cho việc phân chia dữ liệu và tính toán cửa sổ theo từng nhóm riêng biệt. Ví dụ, nếu chúng ta muốn tính tổng theo từng nhóm dữ liệu của cột "category", chúng ta có thể sử dụngpartitionBy("category")
.
from pyspark.sql import Window
from pyspark.sql.functions import row_number
df = spark.createDataFrame( [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"]) window = Window.partitionBy("category").orderBy("id")
df.withColumn("row_number", row_number().over(window)).show() -----
+---+--------+----------+
| id|category|row_number|
+---+--------+----------+
| 1| a| 1|
| 1| a| 2|
| 2| a| 3|
| 1| b| 1|
| 2| b| 2|
| 3| b| 3|
+---+--------+----------+
orderBy(*cols)
: Phương thức này xác định cách sắp xếp dữ liệu bên trong mỗi nhóm. Các cột được đưa vàoorderBy
sẽ được sắp xếp theo thứ tự tăng dần để xác định thứ tự của các dòng trong cửa sổ. Ví dụ, nếu chúng ta muốn sắp xếp theo cột "date" và "time", chúng ta có thể sử dụngorderBy("date", "time")
.
from pyspark.sql import Window
from pyspark.sql.functions import row_number
df = spark.createDataFrame( [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"]) window = Window.partitionBy("id").orderBy("category")
df.withColumn("row_number", row_number().over(window)).show() ----
+---+--------+----------+
| id|category|row_number|
+---+--------+----------+
| 1| a| 1|
| 1| a| 2|
| 1| b| 3|
| 2| a| 1|
| 2| b| 2|
| 3| b| 1|
+---+--------+----------+
rowsBetween(start, end)
: Phương thức này xác định phạm vi của các dòng trong mỗi cửa sổ. Các tham sốstart
vàend
có thể nhận các giá trị nhưWindow.unboundedPreceding
,Window.unboundedFollowing
, hoặc các số nguyên đại diện cho số lượng dòng trước và sau dòng hiện tại. Ví dụ, nếu chúng ta muốn tính tổng các dòng từ dòng hiện tại trở về 2 dòng trước, chúng ta có thể sử dụngrowsBetween(-2, 0)
.
from pyspark.sql import Window
from pyspark.sql import functions as func
df = spark.createDataFrame( [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"])
Tính tổng id trong phạm vi từ currentRow đến currentRow + 1 trong danh mục phân vùng
window = Window.partitionBy("category").orderBy("id").rowsBetween(Window.currentRow, 1)
df.withColumn("sum", func.sum("id").over(window)).sort("id", "category", "sum").show() -----
+---+--------+---+
| id|category|sum|
+---+--------+---+
| 1| a| 2|
| 1| a| 3|
| 1| b| 3|
| 2| a| 2|
| 2| b| 5|
| 3| b| 3|
+---+--------+---+
-
rangeBetween(start, end)
: Phương thức này xác định phạm vi của các dòng trong mỗi cửa sổ dựa trên giá trị của một cột được sắp xếp (thông quaorderBy
). Các tham sốstart
vàend
có thể là các giá trị sau:Window.unboundedPreceding
: Đại diện cho không giới hạn phía trước. Nghĩa là phạm vi sẽ bắt đầu từ dòng đầu tiên của cửa sổ.Window.unboundedFollowing
: Đại diện cho không giới hạn phía sau. Nghĩa là phạm vi sẽ kết thúc ở dòng cuối cùng của cửa sổ.Window.currentRow
: Đại diện cho dòng hiện tại trong cửa sổ.
Ví dụ,
rangeBetween(Window.unboundedPreceding, Window.currentRow)
sẽ xác định phạm vi từ dòng đầu tiên của cửa sổ đến dòng hiện tại trong cửa sổ.
from pyspark.sql import Window
from pyspark.sql import functions as func
df = spark.createDataFrame( [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"])
Tính tổng id trong phạm vi từ id của currentRow đến id của currentRow + 1 trong danh mục phân vùng.
window = Window.partitionBy("category").orderBy("id").rangeBetween(Window.currentRow, 1)
df.withColumn("sum", func.sum("id").over(window)).sort("id", "category").show() -----
+---+--------+---+
| id|category|sum|
+---+--------+---+
| 1| a| 4|
| 1| a| 4|
| 1| b| 3|
| 2| a| 2|
| 2| b| 5|
| 3| b| 3|
+---+--------+---+
-
Window.unboundedFollowing
: Đại diện cho không giới hạn phía sau. Khi sử dụngWindow.unboundedFollowing
, chúng ta xác định rằng phạm vi tính toán hoặc thực hiện hàm phân tích áp dụng cho tất cả các dòng từ dòng hiện tại đến cuối cùng của cửa sổ. -
Window.unboundedPreceding
: Đại diện cho không giới hạn phía trước. Khi sử dụngWindow.unboundedPreceding
, chúng ta xác định rằng phạm vi tính toán hoặc thực hiện hàm phân tích áp dụng cho tất cả các dòng từ đầu cửa sổ đến dòng hiện tại.
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import sum # Tạo SparkSession
spark = SparkSession.builder.getOrCreate() # Tạo DataFrame mẫu
data = [("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5)]
df = spark.createDataFrame(data, ["category", "value"]) # Định nghĩa cửa sổ
window = Window.partitionBy("category").orderBy("value") # Sử dụng Window.unboundedPreceding để tính tổng tích lũy
df.withColumn("cumulative_sum", sum("value").over(window.rowsBetween(Window.unboundedPreceding, Window.currentRow))).show() # Sử dụng Window.unboundedFollowing để tính tổng tích lũy ngược
df.withColumn("reverse_cumulative_sum", sum("value").over(window.rowsBetween(Window.currentRow, Window.unboundedFollowing))).show() # Kết quả
+--------+-----+--------------+
|category|value|cumulative_sum|
+--------+-----+--------------+
| A| 1| 1|
| A| 2| 3|
| A| 3| 6|
| B| 4| 4|
| B| 5| 9|
+--------+-----+--------------+ +--------+-----+---------------------+
|category|value|reverse_cumulative_sum|
+--------+-----+---------------------+
| A| 1| 6|
| A| 2| 5|
| A| 3| 3|
| B| 4| 5|
| B| 5| 0|
+--------+-----+---------------------+
Các giá trị Window.unboundedPreceding
và Window.unboundedFollowing
là cách chúng ta xác định rằng không có giới hạn trước hoặc sau đối với phạm vi tính toán hoặc hàm phân tích.
Các phương thức và giá trị trên cho phép chúng ta linh hoạt xác định phạm vi các dòng trong mỗi cửa sổ dữ liệu, từ đó tính toán và áp dụng các hàm phân tích phức tạp trên dữ liệu.
Ứng dụng trong xử lý dữ liệu phân tán
pyspark.sql.Window
rất hữu ích khi chúng ta cần thực hiện các tính toán trên các nhóm dữ liệu hoặc áp dụng các hàm phân tích cửa sổ cho từng dòng dữ liệu. Ví dụ, chúng ta có thể sử dụng pyspark.sql.Window
để tính toán tổng các dòng trước đó, tính toán độ dốc, lấy giá trị trước/sau của một dòng, hoặc tính toán các thống kê như trung bình, độ lệch chuẩn, v.v.
Ví dụ, để tính tổng cột "sales" theo từng nhóm dữ liệu của cột "category", chúng ta có thể sử dụng đoạn mã sau:
from pyspark.sql import Window
from pyspark.sql.functions import sum window = Window.partitionBy("category")
df.withColumn("category_total_sales", sum("sales").over(window))
Đoạn mã trên tạo một cửa sổ dữ liệu bằng cách sử dụng partitionBy("category")
và sau đó tính tổng cột "sales" trong mỗi nhóm bằng cách sử dụng hàm sum("sales").over(window)
.
Kết luận
pyspark.sql.Window
là một công cụ mạnh mẽ trong PySpark cho phép chúng ta thực hiện tính toán và phân tích trên các nhóm dữ liệu. Bằng cách sử dụng các phương thức như partitionBy
, orderBy
, và rowsBetween
, chúng ta có thể xác định các cửa sổ dữ liệu theo những tiêu chí cụ thể và thực hiện các tính toán phức tạp trên từng cửa sổ. Với khả năng xử lý dữ liệu phân tán và tính toán song song của PySpark, pyspark.sql.Window
là một công cụ quan trọng trong kho công cụ của chúng ta khi làm việc với dữ liệu lớn và phân tán.
Kham khảo:
https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/window.html