- vừa được xem lúc

Xử lý dữ liệu phân tán sử dụng Apache Spark và SageMaker

0 0 14

Người đăng: Kiên Lý

Theo Viblo Asia

Apache Spark là một công cụ phân tích hợp nhất để xử lý dữ liệu quy mô lớn. Spark framework thường được sử dụng trong luồng học máy để chuyển đổi dữ liệu hoặc kỹ thuật đặc trưng trên quy mô lớn. Amazon SageMaker cung cấp một tập hợp Docker images dựng sẵn bao gồm Apache Spark và các phần phụ thuộc khác cần thiết để chạy các công việc xử lý dữ liệu phân tán trên Amazon SageMaker. Bài viết này trình bày cách sử dụng Spark images dựng sẵn trên SageMaker Treatment bằng SageMaker Python SDK.

Bài viết này cung cấp một số ví dụ minh họa chức năng của SageMaker Spark Container:

  • Chạy chương trình PySpark đơn giản sử dụng lớp PySparkProcessor của SageMaker Python SDK.
  • Xem giao diện người dùng Spark (Spark UI) thông qua chức năng start_history_server() của PySparkProcessor object.
  • Thêm các file phụ thuộc Python và jar cho jobs
  • Chạy basic Java/Scala-based Spark job đơn giản sử dụng lớp SparkJarProcessor của SageMaker Python SDK.
  • Chỉnh sửa cấu hình Spark

Thiết lập

Cài đặt phiên bản SageMaker Python SDK mới nhất

!pip install -U "sagemaker>2.0"

Chạy ứng dụng PySpark cơ bản

Ví dụ đầu tiên là tập lệnh xử lý dữ liệu Spark MLlib cơ bản. Tập lệnh này sẽ lấy một tập dữ liệu thô và thực hiện một số biến đổi trên đó, chẳng hạn như lập chỉ mục chuỗi và mã hóa one hot (one hot encoding).

Thiết lập vị trí và vai trò trong S3

Đầu tiên hãy thiết lập các vị trí SageMaker bucket mặc định để lưu trữ dữ liệu đầu vào raw và kết quả của Spark job. Các vai trò phải được xác định để chạy được tất cả các SageMaker Processing jobs.

import logging
import sagemaker
from time import gmtime, strftime sagemaker_logger = logging.getLogger("sagemaker")
sagemaker_logger.setLevel(logging.INFO)
sagemaker_logger.addHandler(logging.StreamHandler()) sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()
role = sagemaker.get_execution_role()

Tiếp đến, tải xuống tập dữ liệu mẫu từ SageMaker staging bucket.

# Fetch the dataset from the SageMaker bucket
import boto3 s3 = boto3.client("s3")
s3.download_file( f"sagemaker-example-files-prod-{sagemaker_session.boto_region_name}", "datasets/tabular/uci_abalone/abalone.csv", "./data/abalone.csv",
)

Viết mã PySpark

Nguồn cho tập lệnh tiền xử lý nằm trong ô bên dưới. %%writefile được sử dụng để lưu trữ lệnh cục bộ. Tập lệnh này thực hiện một số kỹ thuật cơ bản trên tập dữ liệu thô đầu vào. Trong ví dụ này, tập dữ liệu dùng từ Abalone Data Set và đoạn mã dưới đây thực hiện lập chỉ mục chuỗi, một mã one-hot, tập hợp vectơ và kết hợp chúng thành một đường dẫn để thực hiện các phép biến đổi này theo thứ tự. Sau đó, tập lệnh thực hiện phân chia 80-20 để tạo ra các tập dữ liệu huấn luyện và xác thực làm đầu ra.

%%writefile ./code/preprocess.py
from __future__ import print_function
from __future__ import unicode_literals import argparse
import csv
import os
import shutil
import sys
import time import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import ( OneHotEncoder, StringIndexer, VectorAssembler, VectorIndexer,
)
from pyspark.sql.functions import *
from pyspark.sql.types import ( DoubleType, StringType, StructField, StructType,
) def csv_line(data): r = ",".join(str(d) for d in data[1]) return str(data[0]) + "," + r def main(): parser = argparse.ArgumentParser(description="app inputs and outputs") parser.add_argument("--s3_input_bucket", type=str, help="s3 input bucket") parser.add_argument("--s3_input_key_prefix", type=str, help="s3 input key prefix") parser.add_argument("--s3_output_bucket", type=str, help="s3 output bucket") parser.add_argument("--s3_output_key_prefix", type=str, help="s3 output key prefix") args = parser.parse_args() spark = SparkSession.builder.appName("PySparkApp").getOrCreate() # This is needed to save RDDs which is the only way to write nested Dataframes into CSV format spark.sparkContext._jsc.hadoopConfiguration().set( "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter" ) # Defining the schema corresponding to the input data. The input data does not contain the headers schema = StructType( [ StructField("sex", StringType(), True), StructField("length", DoubleType(), True), StructField("diameter", DoubleType(), True), StructField("height", DoubleType(), True), StructField("whole_weight", DoubleType(), True), StructField("shucked_weight", DoubleType(), True), StructField("viscera_weight", DoubleType(), True), StructField("shell_weight", DoubleType(), True), StructField("rings", DoubleType(), True), ] ) # Downloading the data from S3 into a Dataframe total_df = spark.read.csv( ("s3://" + os.path.join(args.s3_input_bucket, args.s3_input_key_prefix, "abalone.csv")), header=False, schema=schema, ) # StringIndexer on the sex column which has categorical value sex_indexer = StringIndexer(inputCol="sex", outputCol="indexed_sex") # one-hot-encoding is being performed on the string-indexed sex column (indexed_sex) sex_encoder = OneHotEncoder(inputCol="indexed_sex", outputCol="sex_vec") # vector-assembler will bring all the features to a 1D vector for us to save easily into CSV format assembler = VectorAssembler( inputCols=[ "sex_vec", "length", "diameter", "height", "whole_weight", "shucked_weight", "viscera_weight", "shell_weight", ], outputCol="features", ) # The pipeline is comprised of the steps added above pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler]) # This step trains the feature transformers model = pipeline.fit(total_df) # This step transforms the dataset with information obtained from the previous fit transformed_total_df = model.transform(total_df) # Split the overall dataset into 80-20 training and validation (train_df, validation_df) = transformed_total_df.randomSplit([0.8, 0.2]) # Convert the train dataframe to RDD to save in CSV format and upload to S3 train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features)) train_lines = train_rdd.map(csv_line) train_lines.saveAsTextFile( "s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "train") ) # Convert the validation dataframe to RDD to save in CSV format and upload to S3 validation_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features)) validation_lines = validation_rdd.map(csv_line) validation_lines.saveAsTextFile( "s3://" + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, "validation") ) if __name__ == "__main__": main()

Chạy SageMaker Processing Job

Kế tiếp, lớp PySparkProcessor được sử dụng và để xác định Spark job và sử dụng SageMaker Processing chạy nó. Một số điều cần lưu ý trong định nghĩa của PySparkProcessor:

  • Đây là multi-node job với hai phiên bản m5.xlarge (được chỉ định thông qua tham số instance_count và instance_type).
  • Spark framework phiên bản 3.1 được chỉ định thông qua tham số framework_version.
  • Tập lệnh PySpark được xác định ở trên được truyền qua tham số submit_app.
  • Command-line arguments đến tập lệnh PySpark (chẳng hạn như vị trí đầu vào và đầu ra của S3) được chuyển qua tham số arguments.
  • Nhật ký sự kiện Spark (Spark event logs) sẽ được tải xuống vị trí S3 được chỉ định trong spark_event_logs_s3_uri và có thể được sử dụng để xem Spark UI trong khi công việc đang được tiến hành hoặc sau nó hoàn thành.
from sagemaker.spark.processing import PySparkProcessor # Upload the raw input dataset to a unique S3 location
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
prefix = "sagemaker/spark-preprocess-demo/{}".format(timestamp_prefix)
input_prefix_abalone = "{}/input/raw/abalone".format(prefix)
input_preprocessed_prefix_abalone = "{}/input/preprocessed/abalone".format(prefix) sagemaker_session.upload_data( path="./data/abalone.csv", bucket=bucket, key_prefix=input_prefix_abalone
) # Run the processing job
spark_processor = PySparkProcessor( base_job_name="sm-spark", framework_version="3.1", role=role, instance_count=2, instance_type="ml.m5.xlarge", max_runtime_in_seconds=1200,
) spark_processor.run( submit_app="./code/preprocess.py", arguments=[ "--s3_input_bucket", bucket, "--s3_input_key_prefix", input_prefix_abalone, "--s3_output_bucket", bucket, "--s3_output_key_prefix", input_preprocessed_prefix_abalone, ], spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, prefix), logs=False,
)

Xác thực kết quả quá trình sử lý dữ liệu

Tiếp theo, xác thực đầu ra của công việc tiền xử lý dữ liệu bằng cách xem 5 hàng đầu tiên của tập dữ liệu đầu ra:

print("Top 5 rows from s3://{}/{}/train/".format(bucket, input_preprocessed_prefix_abalone))
!aws s3 cp --quiet s3://$bucket/$input_preprocessed_prefix_abalone/train/part-00000 - | head -n5

Xem qua Spark UI

Tiếp theo, Spark UI có thể được xem bằng cách chạy cục bộ máy chủ lịch sử. (Lưu ý: tính năng này sẽ chỉ hoạt động trong môi trường phát triển cục bộ có cài đặt docker hoặc trên Sagemaker Notebook Instance. Tính năng này hiện không hoạt động trong SageMaker Studio.)

# uses docker
spark_processor.start_history_server()

Sau khi xem Spark UI, bạn có thể tắt lịch sử máy chủ trước khi tiếp tục.

spark_processor.terminate_history_server()

Kham khảo:

https://docs.aws.amazon.com/sagemaker/latest/dg/whatis.html

https://github.com/aws/amazon-sagemaker-examples/tree/main

Bình luận

Bài viết tương tự

- vừa được xem lúc

PDF Export, cẩn thận với những input có thể truyền vào

Giới thiệu. Dạo gần đây mình tình cờ gặp rất nhiều lỗi XSS, tuy nhiên trang đó lại có sử dụng dữ liệu người dùng input vào để export ra PDF.

0 0 66

- vừa được xem lúc

Giới thiệu về AWS Batch

Khi sử dụng hệ thống cloud service, điều chúng ta thường phải quan tâm đến không chỉ là hiệu suất hoạt động (performance) mà còn phải chú ý đến cả chi phí bỏ ra để duy trì hoạt động của hệ thống. Chắn hẳn là hệ thống lớn hay nhỏ nào cũng đã từng phải dùng đến những instance chuyên để chạy batch thực

0 0 143

- vừa được xem lúc

Tìm hiểu về AWS KMS

1. AWS KMS là gì. Ở KMS bạn có thể lựa chọn tạo symetric key (khóa đối xứng) hoặc asymetric key (khóa bất đối xứng) để làm CMK (Customer Master Key). Sau khi tạo key thì có thể thiết đặt key policy để control quyền access và sử dụng key.

0 0 66

- vừa được xem lúc

AWS VPC cho người mới bắt đầu

Tuần này, tôi trình bày lại những gì tôi đã học được về Virtual Private Cloud (VPC) của Amazon. Nếu bạn muốn xem những gì tôi đã học được về AWS, hãy xem Tổng quan về DynamoDB và Tổng quan về S3. VPC là gì. Những điều cần lưu ý:.

0 0 84

- vừa được xem lúc

AWS Essentials (Phần 6): Guildline SNS Basic trên AWS

Tiếp tục với chuỗi bài viết về Basic AWS Setting, chúng ta tiếp tục tìm hiểu tiếp tới SNS (Simple Notification Service). Đây là một service của AWS cho phép người dùng setting thực hiện gửi email, text message hay push notification tự động tới mobile device dựa trên event người dùng setting phía AWS

0 0 145

- vừa được xem lúc

Sử dụng Amazon CloudFront Content Delivery Network với Private S3 Bucket — Signing URLs

Trong nhiều trường hợp, thì việc sử dụng CDN là bắt buộc. Mình đã trải nghiệm với một số CDN nhưng cuối cùng mình lựa chọn sử dụng AWS CloudFront.

0 0 117