_____________________ 1 _____________________
- SparkContext: tạo cụm
- SparkConf(): tạo thuộc tính cho đối tượng
_____________________ 2 _____________________
from pyspark.sql import SparkSession as spark
pd_temp = pd.DataFrame(np.random.random(10))
+ Tạo Spark DataFrame
spark_temp = spark.createDataFrame(pd_temp)
+ Kiểm tra thông tin df
spark.catalog.listTables()
+ Tạo df tạm, nếu df đã tồn tại thì bỏ df này
.createTempView()
+ Tạo df tạm, nếu df đã tồn tại thì thay thế bằng df này
spark_temp.createOrReplaceTempView("temp")
_____________________ 3 _____________________
- Đọc file .csv
file_path = "/usr/local/share/datasets/airports.csv" airports = spark.read.csv(file_path, header=True) airports.show()
- Đọc file .log
log_file_path = "path/to/your/log/file.log" log_data = spark.read.text(log_file_path)
_____________________ 4 _____________________
- Thêm cột vào df (dùng .withColumn)
df = df.withColumn("tên_cột", biểu thức) VD: flights = flights.withColumn("duration_hrs", flights.air_time/60)
_____________________ 5 _____________________
- Filter (giống WHERE trong SQL)
long_flights = flights.filter("distance > 1000") long_flights.show()
_____________________ 6 _____________________
- SELECT (có 2 cách select như bên dưới)
selected1 = flights.select("tailnum", "origin", "dest") temp = flights.select(flights.origin, flights.dest, flights.carrier)
#Filter filterA = flights.origin == "SEA" filterB = flights.dest == "PDX"
selected2 = temp.filter(filterA).filter(filterB)
_____________________ 7 _____________________
- alias
#Định nghĩa avg_speed avg_speed = (flights.distance/(flights.air_time/60)).alias("avg_speed")
#Cách 1 speed1 = flights.select("origin", "dest", "tailnum", avg_speed)
#Cách 2 speed2 = flights.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed")
.select chỉ được lấy cột từ df ban đầu .selectExpr có thể lấy cột từ việc sử dụng các tính toán (ví dụ đổi phút sang giờ rồi sử dụng alias)
_____________________ 8 _____________________
- min, max, count đều thuộc nhóm phương thức Group
df.groupBy().min("col").show() VD: flights.filter(flights.origin == "PDX").groupBy().min("distance").show()
_____________________ 9 _____________________
- Function
import pyspark.sql.functions as F
by_month_dest = flights.groupBy("month", "dest") by_month_dest.avg("dep_delay").show() by_month_dest.agg(F.stddev("dep_delay")).show()
-> In ra AVG: +-----+----+-----------------------+ |month|dest| avg(dep_delay) | +-----+----+-----------------------+ | 11| TUS| -2.3333333333333335| | 11| ANC| 7.529411764705882| | 1| BUR| -1.45| +-----+----+--------------------+
-> In ra std: +-----+----+---------------------------+ |month|dest|stddev_samp(dep_delay)| +-----+----+---------------------------+ | 11| TUS| 3.0550504633038935| | 11| ANC| 18.604716401245316| | 1| BUR| 15.22627576540667| +-----+----+---------------------------+
_____________________ 10 _____________________
- Join
flights_with_airports = flights.join(airports, on="dest", how="leftouter") ** Phải đổi tên 2 cột để trùng nhau khi so sánh
_____________________ 11 _____________________
- Machine Learning Pipelines
-
Cốt lõi của module pyspark.ml là các lớp Transformer và Estimator. Hầu hết mọi lớp khác trong mô-đun đều hoạt động tương tự như hai lớp cơ bản này.
-
Các lớp Transformer có phương thức .transform() lấy DataFrame và trả về DataFrame mới; thường là bản gốc có cột mới được thêm vào. Ví dụ: bạn có thể sử dụng lớp Bucketizer để tạo các thùng riêng biệt từ một tính năng liên tục hoặc lớp PCA để giảm kích thước của tập dữ liệu bằng cách sử dụng phân tích thành phần chính.
-
Tất cả các lớp ước tính đều triển khai phương thức .fit(). Các phương thức này cũng lấy một DataFrame, nhưng thay vì trả về một DataFrame khác, chúng trả về một đối tượng mô hình. Đây có thể giống như StringIndexerModel để bao gồm dữ liệu phân loại được lưu dưới dạng chuỗi trong mô hình hoặc RandomForestModel sử dụng thuật toán rừng ngẫu nhiên để phân loại hoặc hồi quy.
_____________________ 12 _____________________
- Do Spark chỉ nhận đầu vào là số nên cần chuyển lại kiểu dữ liệu (nếu đang để str)
.cast("kiểu dữ liệu") ** kiểu dữ liệu ở đây là: integer, double
VD: model_data = model_data.withColumn("arr_delay", model_data.arr_delay.cast("integer"))
_____________________ 13 _____________________
- Để có thể chuyển dữ liệu string sang int, ta sử dụng one-hot-vectors của pyspark.ml.features
-
Việc cần làm là tạo StringIndexer và OneHotEncoder, còn Pipeline sẽ xử lý phần còn lại
-
StringIndexer: StringIndexer là một bước quan trọng trong việc chuyển đổi các giá trị chuỗi thành các số nguyên. StringIndexer thực hiện công việc này bằng cách gán một số nguyên duy nhất cho mỗi giá trị trong cột chuỗi dựa trên tần số xuất hiện của các giá trị đó. Ví dụ, nếu bạn có một cột "color" với các giá trị "red", "green", "blue", thì StringIndexer sẽ gán cho chúng các số 0, 1, và 2 tương ứng.
-
OneHotEncoder: Sau khi đã biến đổi các giá trị chuỗi thành số nguyên bằng StringIndexer, bạn thường sẽ muốn tiếp tục biến đổi chúng thành dạng số nhị phân (binary) để sử dụng trong các mô hình học máy. Đối với mỗi giá trị trong cột đã được biến đổi bằng StringIndexer, OneHotEncoder sẽ tạo ra một vectơ nhị phân với tất cả giá trị là 0 trừ giá trị tương ứng với giá trị ban đầu, nó sẽ được đánh dấu là 1. Điều này giúp mô hình hiểu được rằng mỗi giá trị không phải là một biến liên tục mà là một biến category. Ví dụ: Nếu bạn có cột "color" đã được biến đổi bằng StringIndexer thành [0, 1, 2] và sau đó sử dụng OneHotEncoder, nó có thể trở thành: "red" -> [1, 0, 0] "green" -> [0, 1, 0] "blue" -> [0, 0, 1] Kết hợp cả hai bước này, bạn có thể biến đổi dữ liệu chuỗi thành dạng số thực hiện việc huấn luyện mô hình học máy trên chúng.
_____________________ 14 _____________________
- Code
#Tạo StringIndexer carr_indexer = StringIndexer(inputCol = 'carrier', outputCol = 'carrier_index')
#Tạo OneHotEncoder carr_encoder = OneHotEncoder(inputCol = 'carrier_index', outputCol = 'carrier_fact')
#Tạo VectorAssembler vec_assembler = VectorAssembler(inputCols=["month", "air_time", "carrier_fact", "dest_fact", "plane_age"], outputCol="features")
#Tạo pipeline from pyspark.ml import Pipeline flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler])
#Fit và chuyển đổi dữ liệu piped_data = flights_pipe.fit(model_data).transform(model_data)
#Chia dữ liệu train, test training, test = piped_data.randomSplit([0.6, 0.4])
_____________________ 15 _____________________
- Logistic regression
- Dự đoán xác suất xảy ra sự kiện Nếu xác suất lớn hơn giá trị ngưỡng thì -> 1, True hoặc Yes Ngược lại -> 0, False hoặc No
#import from pyspark.ml.classification import LogisticRegression #Khởi tạo LogisticRegression Estimator lr = LogisticRegression()
_____________________ 16 _____________________
- Xác thực chéo (cross validation)
- Code
import pyspark.ml.evaluation as evals
#Tạo BinaryClassificationEvaluator (dành cho phân loại nhị phân như Logistic) evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")
#Import tuning submodule import pyspark.ml.tuning as tune
#Tạo lưới tham số (parameter grid) grid = tune.ParamGridBuilder()
#Thêm các hyperparameter grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01)) grid = grid.addGrid(lr.elasticNetParam, [0, 1])
#Build the grid grid = grid.build()
#Tạo CrossValidator cv = tune.CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
#Call lr.fit() best_lr = lr.fit(training)
#Print best_lr print(best_lr)
_____________________ 17 _____________________
** AUC càng gần 1 càng tốt
#Sử dụng test để đánh giá mô hình test_results = best_lr.transform(test)
#Đánh giá dự đoán print(evaluator.evaluate(test_results))
_____________________ End _____________________