- vừa được xem lúc

Giải thích và Ứng dụng của PySpark.sql.Window trong Xử lý Dữ liệu phân tán

0 0 18

Người đăng: Kiên Lý

Theo Viblo Asia

Giới thiệu

Trong việc xử lý dữ liệu phân tán và tính toán song song, PySpark là một trong những công cụ phổ biến và mạnh mẽ nhất. Trong PySpark, pyspark.sql.Window là một lớp quan trọng trong module pyspark.sql, cho phép chúng ta thực hiện các phép tính và phân tích cửa sổ trên các DataFrame. Trên thực tế, pyspark.sql.Window cung cấp một cách tiện lợi để phân chia dữ liệu thành các nhóm (cửa sổ) và thực hiện các tính toán dựa trên cửa sổ đó. Trong bài viết này, chúng ta sẽ tìm hiểu về pyspark.sql.Window và cách áp dụng nó trong xử lý dữ liệu phân tán.

pyspark.sql.Window là gì?

Trong PySpark, pyspark.sql.Window là một lớp được sử dụng để định nghĩa và xác định cửa sổ dữ liệu trong DataFrame. Nó cho phép chúng ta xác định cách phân chia dữ liệu thành các nhóm và sắp xếp chúng trong từng nhóm. Bằng cách sử dụng pyspark.sql.Window, chúng ta có thể thực hiện các tính toán trên các nhóm dữ liệu, như tính toán tổng, trung bình, độ dốc, lệch chuẩn, v.v. Chính xác hơn, pyspark.sql.Window cho phép chúng ta định nghĩa:

  1. Các cột để phân chia dữ liệu thành các nhóm.
  2. Cách sắp xếp dữ liệu bên trong mỗi nhóm.
  3. Phạm vi của các dòng trong mỗi cửa sổ.

Các phương thức quan trọng trong pyspark.sql.Window

  1. Window.currentRow: Đại diện cho dòng hiện tại trong cửa sổ. Khi sử dụng Window.currentRow, chúng ta chỉ định rõ rằng phạm vi tính toán hoặc thực hiện hàm phân tích chỉ áp dụng cho dòng hiện tại.

  2. partitionBy(*cols): Phương thức này xác định các cột để phân chia dữ liệu thành các nhóm. Các cột được đưa vào partitionBy sẽ làm cơ sở cho việc phân chia dữ liệu và tính toán cửa sổ theo từng nhóm riêng biệt. Ví dụ, nếu chúng ta muốn tính tổng theo từng nhóm dữ liệu của cột "category", chúng ta có thể sử dụng partitionBy("category").

from pyspark.sql import Window
from pyspark.sql.functions import row_number
df = spark.createDataFrame( [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"]) window = Window.partitionBy("category").orderBy("id")
df.withColumn("row_number", row_number().over(window)).show() -----
+---+--------+----------+
| id|category|row_number|
+---+--------+----------+
| 1| a| 1|
| 1| a| 2|
| 2| a| 3|
| 1| b| 1|
| 2| b| 2|
| 3| b| 3|
+---+--------+----------+
  1. orderBy(*cols): Phương thức này xác định cách sắp xếp dữ liệu bên trong mỗi nhóm. Các cột được đưa vào orderBy sẽ được sắp xếp theo thứ tự tăng dần để xác định thứ tự của các dòng trong cửa sổ. Ví dụ, nếu chúng ta muốn sắp xếp theo cột "date" và "time", chúng ta có thể sử dụng orderBy("date", "time").
from pyspark.sql import Window
from pyspark.sql.functions import row_number
df = spark.createDataFrame( [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"]) window = Window.partitionBy("id").orderBy("category")
df.withColumn("row_number", row_number().over(window)).show() ----
+---+--------+----------+
| id|category|row_number|
+---+--------+----------+
| 1| a| 1|
| 1| a| 2|
| 1| b| 3|
| 2| a| 1|
| 2| b| 2|
| 3| b| 1|
+---+--------+----------+
  1. rowsBetween(start, end): Phương thức này xác định phạm vi của các dòng trong mỗi cửa sổ. Các tham số startend có thể nhận các giá trị như Window.unboundedPreceding, Window.unboundedFollowing, hoặc các số nguyên đại diện cho số lượng dòng trước và sau dòng hiện tại. Ví dụ, nếu chúng ta muốn tính tổng các dòng từ dòng hiện tại trở về 2 dòng trước, chúng ta có thể sử dụng rowsBetween(-2, 0).
from pyspark.sql import Window
from pyspark.sql import functions as func
df = spark.createDataFrame( [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"])

Tính tổng id trong phạm vi từ currentRow đến currentRow + 1 trong danh mục phân vùng

window = Window.partitionBy("category").orderBy("id").rowsBetween(Window.currentRow, 1)
df.withColumn("sum", func.sum("id").over(window)).sort("id", "category", "sum").show() -----
+---+--------+---+
| id|category|sum|
+---+--------+---+
| 1| a| 2|
| 1| a| 3|
| 1| b| 3|
| 2| a| 2|
| 2| b| 5|
| 3| b| 3|
+---+--------+---+
  1. rangeBetween(start, end): Phương thức này xác định phạm vi của các dòng trong mỗi cửa sổ dựa trên giá trị của một cột được sắp xếp (thông qua orderBy). Các tham số startend có thể là các giá trị sau:

    • Window.unboundedPreceding: Đại diện cho không giới hạn phía trước. Nghĩa là phạm vi sẽ bắt đầu từ dòng đầu tiên của cửa sổ.
    • Window.unboundedFollowing: Đại diện cho không giới hạn phía sau. Nghĩa là phạm vi sẽ kết thúc ở dòng cuối cùng của cửa sổ.
    • Window.currentRow: Đại diện cho dòng hiện tại trong cửa sổ.

    Ví dụ, rangeBetween(Window.unboundedPreceding, Window.currentRow) sẽ xác định phạm vi từ dòng đầu tiên của cửa sổ đến dòng hiện tại trong cửa sổ.

from pyspark.sql import Window
from pyspark.sql import functions as func
df = spark.createDataFrame( [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"])

Tính tổng id trong phạm vi từ id của currentRow đến id của currentRow + 1 trong danh mục phân vùng.

window = Window.partitionBy("category").orderBy("id").rangeBetween(Window.currentRow, 1)
df.withColumn("sum", func.sum("id").over(window)).sort("id", "category").show() -----
+---+--------+---+
| id|category|sum|
+---+--------+---+
| 1| a| 4|
| 1| a| 4|
| 1| b| 3|
| 2| a| 2|
| 2| b| 5|
| 3| b| 3|
+---+--------+---+
  1. Window.unboundedFollowing: Đại diện cho không giới hạn phía sau. Khi sử dụng Window.unboundedFollowing, chúng ta xác định rằng phạm vi tính toán hoặc thực hiện hàm phân tích áp dụng cho tất cả các dòng từ dòng hiện tại đến cuối cùng của cửa sổ.

  2. Window.unboundedPreceding: Đại diện cho không giới hạn phía trước. Khi sử dụng Window.unboundedPreceding, chúng ta xác định rằng phạm vi tính toán hoặc thực hiện hàm phân tích áp dụng cho tất cả các dòng từ đầu cửa sổ đến dòng hiện tại.

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import sum # Tạo SparkSession
spark = SparkSession.builder.getOrCreate() # Tạo DataFrame mẫu
data = [("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5)]
df = spark.createDataFrame(data, ["category", "value"]) # Định nghĩa cửa sổ
window = Window.partitionBy("category").orderBy("value") # Sử dụng Window.unboundedPreceding để tính tổng tích lũy
df.withColumn("cumulative_sum", sum("value").over(window.rowsBetween(Window.unboundedPreceding, Window.currentRow))).show() # Sử dụng Window.unboundedFollowing để tính tổng tích lũy ngược
df.withColumn("reverse_cumulative_sum", sum("value").over(window.rowsBetween(Window.currentRow, Window.unboundedFollowing))).show() # Kết quả
+--------+-----+--------------+
|category|value|cumulative_sum|
+--------+-----+--------------+
| A| 1| 1|
| A| 2| 3|
| A| 3| 6|
| B| 4| 4|
| B| 5| 9|
+--------+-----+--------------+ +--------+-----+---------------------+
|category|value|reverse_cumulative_sum|
+--------+-----+---------------------+
| A| 1| 6|
| A| 2| 5|
| A| 3| 3|
| B| 4| 5|
| B| 5| 0|
+--------+-----+---------------------+ 

Các giá trị Window.unboundedPrecedingWindow.unboundedFollowing là cách chúng ta xác định rằng không có giới hạn trước hoặc sau đối với phạm vi tính toán hoặc hàm phân tích.

Các phương thức và giá trị trên cho phép chúng ta linh hoạt xác định phạm vi các dòng trong mỗi cửa sổ dữ liệu, từ đó tính toán và áp dụng các hàm phân tích phức tạp trên dữ liệu.

Ứng dụng trong xử lý dữ liệu phân tán

pyspark.sql.Window rất hữu ích khi chúng ta cần thực hiện các tính toán trên các nhóm dữ liệu hoặc áp dụng các hàm phân tích cửa sổ cho từng dòng dữ liệu. Ví dụ, chúng ta có thể sử dụng pyspark.sql.Window để tính toán tổng các dòng trước đó, tính toán độ dốc, lấy giá trị trước/sau của một dòng, hoặc tính toán các thống kê như trung bình, độ lệch chuẩn, v.v.

Ví dụ, để tính tổng cột "sales" theo từng nhóm dữ liệu của cột "category", chúng ta có thể sử dụng đoạn mã sau:

from pyspark.sql import Window
from pyspark.sql.functions import sum window = Window.partitionBy("category")
df.withColumn("category_total_sales", sum("sales").over(window))

Đoạn mã trên tạo một cửa sổ dữ liệu bằng cách sử dụng partitionBy("category") và sau đó tính tổng cột "sales" trong mỗi nhóm bằng cách sử dụng hàm sum("sales").over(window).

Kết luận

pyspark.sql.Window là một công cụ mạnh mẽ trong PySpark cho phép chúng ta thực hiện tính toán và phân tích trên các nhóm dữ liệu. Bằng cách sử dụng các phương thức như partitionBy, orderBy, và rowsBetween, chúng ta có thể xác định các cửa sổ dữ liệu theo những tiêu chí cụ thể và thực hiện các tính toán phức tạp trên từng cửa sổ. Với khả năng xử lý dữ liệu phân tán và tính toán song song của PySpark, pyspark.sql.Window là một công cụ quan trọng trong kho công cụ của chúng ta khi làm việc với dữ liệu lớn và phân tán.

Kham khảo:

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/window.html

Bình luận

Bài viết tương tự

- vừa được xem lúc

Apache Presto - Hướng dẫn cài đặt

Bài viết này mình sẽ hướng dẫn các bạn cách cài đặt Apache Presto, trước tiên, để làm theo hướng dẫn này thì yêu cầu cơ bản như sau:. .

0 0 44

- vừa được xem lúc

Apache Presto - Giới thiệu tổng quan và kiến trúc của Apache Presto

Sau seri HIVE thì mình sẽ mang đến tiếp tục seri về Apache Presto, thằng này thì có thể sử dụng HIVE như là một connector trong kiến trúc của nó, cùng tìm hiểu về nó nhé, let's start. Apache Presto rất hữu ích để thực hiện các truy vấn thậm chí là hàng petabyte dữ liệu.

0 0 44

- vừa được xem lúc

Đọc dữ liệu từ một file text và ghi lại dưới dạng file parquet trên HDFS sử dụng Spark (Phần 2)

Các bạn chưa đọc phần 1 thì có thể đọc tại đây nha : Đọc dữ liệu từ một file text và ghi lại dưới dạng file parquet trên HDFS sử dụng Spark (Phần 1). Ghi dữ liệu ra file parquet sử dụng Spark.

0 0 50

- vừa được xem lúc

Đọc dữ liệu từ một file text và ghi lại dưới dạng file parquet trên HDFS sử dụng Spark (Phần 1)

Định dạng text là một định dạng vô cùng phổ biến cả trên HDFS hay bất cứ đâu. Dữ liệu file text được trình bày thành từng dòng, mỗi dòng có thể coi như một bản ghi và đánh dấu kết thúc bằng kí tự "" (

0 0 37

- vừa được xem lúc

Blockchain dưới con mắt làng Vũ Đại 4.0

Mở bài. Hey nhô các bạn, lại là mình đây .

0 0 51

- vừa được xem lúc

Khám phá từng ngõ ngách Apache Druid - Phần 1

1. Giới thiệu. Trước khi đi vào nội dung chính mình muốn kể 1 câu chuyện sau:. .

0 0 574