- vừa được xem lúc

Intro to Spark

0 0 8

Người đăng: Lê Huỳnh Đức

Theo Viblo Asia

_____________________ 1 _____________________

  • SparkContext: tạo cụm
  • SparkConf(): tạo thuộc tính cho đối tượng

_____________________ 2 _____________________

from pyspark.sql import SparkSession as spark

pd_temp = pd.DataFrame(np.random.random(10))

+ Tạo Spark DataFrame

spark_temp = spark.createDataFrame(pd_temp)

+ Kiểm tra thông tin df

spark.catalog.listTables()

+ Tạo df tạm, nếu df đã tồn tại thì bỏ df này

.createTempView()

+ Tạo df tạm, nếu df đã tồn tại thì thay thế bằng df này

spark_temp.createOrReplaceTempView("temp")

_____________________ 3 _____________________

  • Đọc file .csv

file_path = "/usr/local/share/datasets/airports.csv" airports = spark.read.csv(file_path, header=True) airports.show()

  • Đọc file .log

log_file_path = "path/to/your/log/file.log" log_data = spark.read.text(log_file_path)

_____________________ 4 _____________________

  • Thêm cột vào df (dùng .withColumn)

df = df.withColumn("tên_cột", biểu thức) VD: flights = flights.withColumn("duration_hrs", flights.air_time/60)

_____________________ 5 _____________________

  • Filter (giống WHERE trong SQL)

long_flights = flights.filter("distance > 1000") long_flights.show()

_____________________ 6 _____________________

  • SELECT (có 2 cách select như bên dưới)

selected1 = flights.select("tailnum", "origin", "dest") temp = flights.select(flights.origin, flights.dest, flights.carrier)

#Filter filterA = flights.origin == "SEA" filterB = flights.dest == "PDX"

selected2 = temp.filter(filterA).filter(filterB)

_____________________ 7 _____________________

  • alias

#Định nghĩa avg_speed avg_speed = (flights.distance/(flights.air_time/60)).alias("avg_speed")

#Cách 1 speed1 = flights.select("origin", "dest", "tailnum", avg_speed)

#Cách 2 speed2 = flights.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed")

.select chỉ được lấy cột từ df ban đầu .selectExpr có thể lấy cột từ việc sử dụng các tính toán (ví dụ đổi phút sang giờ rồi sử dụng alias)

_____________________ 8 _____________________

  • min, max, count đều thuộc nhóm phương thức Group

df.groupBy().min("col").show() VD: flights.filter(flights.origin == "PDX").groupBy().min("distance").show()

_____________________ 9 _____________________

  • Function

import pyspark.sql.functions as F

by_month_dest = flights.groupBy("month", "dest") by_month_dest.avg("dep_delay").show() by_month_dest.agg(F.stddev("dep_delay")).show()

-> In ra AVG: +-----+----+-----------------------+ |month|dest| avg(dep_delay) | +-----+----+-----------------------+ | 11| TUS| -2.3333333333333335| | 11| ANC| 7.529411764705882| | 1| BUR| -1.45| +-----+----+--------------------+

-> In ra std: +-----+----+---------------------------+ |month|dest|stddev_samp(dep_delay)| +-----+----+---------------------------+ | 11| TUS| 3.0550504633038935| | 11| ANC| 18.604716401245316| | 1| BUR| 15.22627576540667| +-----+----+---------------------------+

_____________________ 10 _____________________

  • Join

flights_with_airports = flights.join(airports, on="dest", how="leftouter") ** Phải đổi tên 2 cột để trùng nhau khi so sánh

_____________________ 11 _____________________

  • Machine Learning Pipelines
  • Cốt lõi của module pyspark.ml là các lớp Transformer và Estimator. Hầu hết mọi lớp khác trong mô-đun đều hoạt động tương tự như hai lớp cơ bản này.

  • Các lớp Transformer có phương thức .transform() lấy DataFrame và trả về DataFrame mới; thường là bản gốc có cột mới được thêm vào. Ví dụ: bạn có thể sử dụng lớp Bucketizer để tạo các thùng riêng biệt từ một tính năng liên tục hoặc lớp PCA để giảm kích thước của tập dữ liệu bằng cách sử dụng phân tích thành phần chính.

  • Tất cả các lớp ước tính đều triển khai phương thức .fit(). Các phương thức này cũng lấy một DataFrame, nhưng thay vì trả về một DataFrame khác, chúng trả về một đối tượng mô hình. Đây có thể giống như StringIndexerModel để bao gồm dữ liệu phân loại được lưu dưới dạng chuỗi trong mô hình hoặc RandomForestModel sử dụng thuật toán rừng ngẫu nhiên để phân loại hoặc hồi quy.

_____________________ 12 _____________________

- Do Spark chỉ nhận đầu vào là số nên cần chuyển lại kiểu dữ liệu (nếu đang để str)

.cast("kiểu dữ liệu") ** kiểu dữ liệu ở đây là: integer, double

VD: model_data = model_data.withColumn("arr_delay", model_data.arr_delay.cast("integer"))

_____________________ 13 _____________________

  • Để có thể chuyển dữ liệu string sang int, ta sử dụng one-hot-vectors của pyspark.ml.features
  • Việc cần làm là tạo StringIndexer và OneHotEncoder, còn Pipeline sẽ xử lý phần còn lại

  • StringIndexer: StringIndexer là một bước quan trọng trong việc chuyển đổi các giá trị chuỗi thành các số nguyên. StringIndexer thực hiện công việc này bằng cách gán một số nguyên duy nhất cho mỗi giá trị trong cột chuỗi dựa trên tần số xuất hiện của các giá trị đó. Ví dụ, nếu bạn có một cột "color" với các giá trị "red", "green", "blue", thì StringIndexer sẽ gán cho chúng các số 0, 1, và 2 tương ứng.

  • OneHotEncoder: Sau khi đã biến đổi các giá trị chuỗi thành số nguyên bằng StringIndexer, bạn thường sẽ muốn tiếp tục biến đổi chúng thành dạng số nhị phân (binary) để sử dụng trong các mô hình học máy. Đối với mỗi giá trị trong cột đã được biến đổi bằng StringIndexer, OneHotEncoder sẽ tạo ra một vectơ nhị phân với tất cả giá trị là 0 trừ giá trị tương ứng với giá trị ban đầu, nó sẽ được đánh dấu là 1. Điều này giúp mô hình hiểu được rằng mỗi giá trị không phải là một biến liên tục mà là một biến category. Ví dụ: Nếu bạn có cột "color" đã được biến đổi bằng StringIndexer thành [0, 1, 2] và sau đó sử dụng OneHotEncoder, nó có thể trở thành: "red" -> [1, 0, 0] "green" -> [0, 1, 0] "blue" -> [0, 0, 1] Kết hợp cả hai bước này, bạn có thể biến đổi dữ liệu chuỗi thành dạng số thực hiện việc huấn luyện mô hình học máy trên chúng.

_____________________ 14 _____________________

- Code

#Tạo StringIndexer carr_indexer = StringIndexer(inputCol = 'carrier', outputCol = 'carrier_index')

#Tạo OneHotEncoder carr_encoder = OneHotEncoder(inputCol = 'carrier_index', outputCol = 'carrier_fact')

#Tạo VectorAssembler vec_assembler = VectorAssembler(inputCols=["month", "air_time", "carrier_fact", "dest_fact", "plane_age"], outputCol="features")

#Tạo pipeline from pyspark.ml import Pipeline flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler])

#Fit và chuyển đổi dữ liệu piped_data = flights_pipe.fit(model_data).transform(model_data)

#Chia dữ liệu train, test training, test = piped_data.randomSplit([0.6, 0.4])

_____________________ 15 _____________________

  • Logistic regression
  • Dự đoán xác suất xảy ra sự kiện Nếu xác suất lớn hơn giá trị ngưỡng thì -> 1, True hoặc Yes Ngược lại -> 0, False hoặc No

#import from pyspark.ml.classification import LogisticRegression #Khởi tạo LogisticRegression Estimator lr = LogisticRegression()

_____________________ 16 _____________________

  • Xác thực chéo (cross validation)
    • Code

import pyspark.ml.evaluation as evals

#Tạo BinaryClassificationEvaluator (dành cho phân loại nhị phân như Logistic) evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")

#Import tuning submodule import pyspark.ml.tuning as tune

#Tạo lưới tham số (parameter grid) grid = tune.ParamGridBuilder()

#Thêm các hyperparameter grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01)) grid = grid.addGrid(lr.elasticNetParam, [0, 1])

#Build the grid grid = grid.build()

#Tạo CrossValidator cv = tune.CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)

#Call lr.fit() best_lr = lr.fit(training)

#Print best_lr print(best_lr)

_____________________ 17 _____________________

** AUC càng gần 1 càng tốt

#Sử dụng test để đánh giá mô hình test_results = best_lr.transform(test)

#Đánh giá dự đoán print(evaluator.evaluate(test_results))

_____________________ End _____________________

Bình luận

Bài viết tương tự

- vừa được xem lúc

PySpark với một project Machine Learning nho nhỏ

Trong không khi người người MayFest, nhà nhà MayFest, tiếp nối series tự học và khám phá về Data Sience, trong bài viết hôm nay mình sẽ chia sẻ cùng mọi người kiến thức cơ bản cũng như thực hành về Sp

0 0 25

- vừa được xem lúc

PySpark Decorators: Tận dụng sức mạnh của Python Decorators trong việc phân tích dữ liệu lớn

PySpark là một trong những công cụ phổ biến nhất để xử lý và phân tích dữ liệu lớn. Nó cung cấp một API Python để làm việc với Apache Spark, một hệ thống xử lý dữ liệu phân tán mạnh mẽ.

0 0 20

- vừa được xem lúc

Giải thích và Ứng dụng của PySpark.sql.Window trong Xử lý Dữ liệu phân tán

Giới thiệu. Trong việc xử lý dữ liệu phân tán và tính toán song song, PySpark là một trong những công cụ phổ biến và mạnh mẽ nhất. Trong PySpark, pyspark.sql.

0 0 18

- vừa được xem lúc

Churn Customer Prediction

Introduction:. In unraveling this intricate problem, we delve into a detailed examination of Sparkify's data to decipher underlying patterns and insights crucial for predicting customer churn.

0 0 18

- vừa được xem lúc

Xử lý dữ liệu phân tán sử dụng Apache Spark và SageMaker

Apache Spark là một công cụ phân tích hợp nhất để xử lý dữ liệu quy mô lớn. Spark framework thường được sử dụng trong luồng học máy để chuyển đổi dữ liệu hoặc kỹ thuật đặc trưng trên quy mô lớn.

0 0 14

- vừa được xem lúc

Spark streaming với Kafka

Giới thiệu chung về Spark. Trước khi Spark ra đời, Hadoop là một tool mạnh mẽ và phổ biến, tuy nhiên Hadoop có những hạn chế nhất định và Spark ra đời để cải thiện các hạn chế đó.

0 0 10