- vừa được xem lúc

Cứ thực hành Airflow dễ hiểu và đơn giản đã, chưa làm gì phức tạp cả

0 0 27

Người đăng: Trung Đức

Theo Viblo Asia

Mở đầu

Tiếp nối bài viết chỉ toàn lý thuyết Bài viết về Airflow cho người mới như mình thì chúng ta đi ngay tới bài thực hành này thôi

Chú ý là mình sẽ thực hành cùng với Python nhé ạ, bài viết sẽ gồm 2 phần thực hành chính

  • Thực hành với các tác vụ đơn giản
  • Thực hành với bài toán đào tạo mô hình Deep Learning

Cài đặt môi trường

  • Trong bài viết của anh Hoàng, có đề cập tới việc setup nhanh chóng với docker-compose, các bạn có thể tham khảo nhé
  • Mình hướng tới một cái gì đó chân chất, dễ hiểu, dễ tiếp cận cho người mới, ít động vào nhiều cái liên quan nên mình sẽ setup tay =)))

Chuẩn bị trước

  • Python: 3.7, 3.8, 3.9, 3.10
  • Minimum memory: 4 gb

Cài đặt Airflow bằng pip

  • Cài đặt các dependencies của Linux:
sudo apt-get install libmysqlclient-dev
sudo apt-get install libssl-dev
sudo apt-get install libkrb5-dev
  • Setup đường dẫn tới Airflow
export AIRFLOW_HOME=~/airflow
  • Assign 3 biến environment: AIRFLOW_VERSION, PYTHON_VERSION và CONSTRAINT_URL
export AIRFLOW_VERSION=2.3.3
export PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
export CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
  • Khi bạn assign một biến environment bằng lệnh export, biến này chỉ tồn tại trong phiên làm việc hiện tại của terminal. Để biến environment này tồn tại trong các phiên làm việc khác, bạn cần thêm lệnh export vào tệp ~/.bashrc
  • Nhớ lưu lại các thay đổi
source .bashrc
  • Cài đặt Airflow bằng pip
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
  • Khởi chạy các components của airflow
airflow webserver -p 8080
airflow scheduler

Các bạn mở trình duyệt lên, vào localhost:8080 để xem kết quả nhé

Thử với một tác vụ cơ bản nào

Cứ đi từ dễ nhất trước nhé, để chúng ta hiểu hơn về cấu trúc một file DAG. Thử một chương trình Hello world xem sao. Chúng ta sẽ cùng tạo 1 file my_dag.py có nội dung như sau

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime
from pytz import timezone
import os local_tz = timezone('Asia/Ho_Chi_Minh') # Định nghĩa các hàm xử lý dữ liệu
def process_data(): print('process data') def save_data(): print('save data') print() # Định nghĩa DAG
default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2023, 6, 7, tzinfo=local_tz), 'retries': 1, 'retry_delay': timedelta(minutes=5),
} dag = DAG( dag_id='my_dag', default_args=default_args, description='A simple DAG', schedule_interval='* * * * *'
) # Định nghĩa các Task
task1 = BashOperator( task_id='task1', bash_command='echo "Task 1"', dag=dag,
) task2 = PythonOperator( task_id='task2', python_callable=process_data, dag=dag,
) task3 = PythonOperator( task_id='task3', python_callable=save_data, dag=dag,
) # Thiết lập phụ thuộc giữa các Task
task1 >> task2 >> task3 if __name__ == "__main__": dag.cli()

Trước khi giải thích các phần trong mã nguồn trên, sẽ có một số lưu ý như sau:

  • Tên file mã nguồn cần trung với dag_id
  • File mã nguồn cần được lưu trong thư mục dags của airflow, mặc định sẽ là AIRFLOW_HOME/dags
  • Mỗi khi tạo 1 file DAG và đưa vào thư mục dags cần khởi động lại airflow webserver

Chúng ta sẽ cùng phân tích từng thành phần có trong file mã nguồn trên

  • ĐỊnh nghĩa DAG: đây là phần chúng ta sẽ định danh 1 số thông tin như dag_id để phân biệt các DAGs, người sở hữu, có phụ thuộc vào DAG nào đó hay không, ngày bắt đầu chạy, số lần thử lại nếu lỗi, thời gian thử lại nếu lỗi, mô tả DAG, thời gian tự lặp lại chạy DAG, ...
# Định nghĩa DAG
default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2023, 6, 7, tzinfo=local_tz), 'retries': 1, 'retry_delay': timedelta(minutes=5),
} dag = DAG( dag_id='my_dag', default_args=default_args, description='A simple DAG', schedule_interval='* * * * *'
)
  • Định nghĩa các Tasks có trong DAG: Tùy vào các loại Task, chúng ta sẽ lựa chọn Operator hoặc Sensor sao cho phù hợp. MÌnh có giới thiệu các Operator và Sensor ở bài 1, các bạn có thể xem lại. Ví dụ mình dùng BashOperator, và lệnh thực thi của mình là echo "Task 1". Thông tin tối thiểu cần định nghĩa cho Task đó là task_id, dag và thực thi cái gì
# Định nghĩa các Task
task1 = BashOperator( task_id='task1', bash_command='echo "Task 1"', dag=dag,
) task2 = PythonOperator( task_id='task2', python_callable=process_data, dag=dag,
) task3 = PythonOperator( task_id='task3', python_callable=save_data, dag=dag,
)
  • Thiết lập phụ thuộc giữa các Tasks: Dưới đây là những kiểu phụ thuộc thông dụng và ký hiệu tương ứng:

    • >>: Tác vụ bên trái phải được thực thi trước khi tác vụ bên phải được thực thi. Đây là kiểu phụ thuộc mặc định giữa các tác vụ trong Airflow.
    • <<: Tác vụ bên phải phải được thực thi trước khi tác vụ bên trái được thực thi.
    • >> và <<: Cả hai tác vụ phải được thực thi trước khi tác vụ được yêu cầu được thực thi.
    • >> với trigger_rule='all_success': Tất cả các tác vụ phải hoàn thành thành công trước khi tác vụ được yêu cầu được thực thi.
    • >> với trigger_rule='one_success': Một trong các tác vụ phải hoàn thành thành công trước khi tác vụ được yêu cầu được thực thi.
    • >> với trigger_rule='all_failed': Tất cả các tác vụ phải hoàn thành với lỗi trước khi tác vụ được yêu cầu được thực thi.
    • >> với trigger_rule='one_failed': Một trong các tác vụ phải hoàn thành với lỗi trước khi tác vụ được yêu cầu được thực thi.
    • Ví dụ về trigger_rule:
    t1 >> t2 >> t3
    t3.trigger_rule = 'all_failed'
    
  • Chạy DAG: Bạn có thể sử dụng dag.run() để chạy DAG của bạn trong một ứng dụng Python hoặc sử dụng dag.cli() để chạy các lệnh dòng lệnh Airflow từ DAG của bạn.

Vậy chúng ta cùng xem kết quả nào. Trước hêt nhớ những cái mình vừa note bên trên nhé, sau đó chúng ta sẽ thấy DAG của mình đã xuất hiện trên Webserver

Chúng ta sẽ chạy thử DAG của mình bằng nút gạt bên trái. Hiện tại code mình đang cho nó 1 phút chạy 1 lần và mỗi task sẽ in ra một dòng chữ. Trạng thái và số lượng những task theo từng trạng thái được hiển thị trong các ô tròn, các bạn trỏ chuột vào đó sẽ hiện ra những cái hint notes

Những thông tin in ra sẽ được lưu trữ trong log, mặc định ở thư mục AIRFLOW_HOME/logs

Xuất phát từ dân AI thì thử với Deep learning xem sao

Về nội dung phần này có lẽ mình sẽ nói chung chung một chút. Mình đặt ra một quy trình đơn giản trong việc thiết kế và xây dựng mô hình học máy như hình vẽ trên, trong đó chúng ta giả sử lựa chọn ra 3 kiến trúc mô hình phù hợp cho bài toán của mình, muốn tự động hóa việc training, lựa chọn mô hình có độ chính xác tốt nhất, sau đó đánh giá lại xem nó có hiệu quả hay không

Mình sẽ không trình bày các code liên quan đến training, evaluate quá sâu trong bài viết này

Khởi tạo DAG

Một công đoạn không thể thiếu khi chúng ta triển khai AirFlow

dag = DAG("my_dag", # Dag id start_date=datetime(2023, 1 ,1), # start date, the 1st of January 2021  schedule_interval='@daily', # Cron expression, here it is a preset of Airflow, @daily means once every day. description='A simple ML flow with DAG',

Training mô hình

Ở đây chúng ta sẽ có 3 task đại diện cho việc training 3 môn hình

def _training_model(): return random.randint(0, 10) # Tasks are implemented under the dag object
training_model_A = PythonOperator( task_id="training_model_A", python_callable=_training_model, dag=dag
)
training_model_B = PythonOperator( task_id="training_model_B", python_callable=_training_model, dag=dag
)
training_model_C = PythonOperator( task_id="training_model_C", python_callable=_training_model, dag=dag
)

Kiểm tra xem có mô hình đạt yêu cầu hay không

def _choosing_best_model(ti): accuracies = ti.xcom_pull(task_ids=[ 'training_model_A', 'training_model_B', 'training_model_C' ]) if max(accuracies) > 8: return 'accurate' return 'inaccurate' choosing_best_model = BranchPythonOperator( task_id="choosing_best_model", python_callable=_choosing_best_model, dag=dag
)

Ở đây các bạn sẽ bắt gặp một từ khóa mới, đó là xcom_pull. Các bạn có thể hiểu đơn giản như sau:

  • XCOM (Cross-Communication Messages) là một cơ chế cho phép dữ liệu đổi giữa các tác vụ DAG
  • Hàm _choosing_best_model được sử dụng để lấy thông tin về độ chính xác của 3 task training mô hình A, B, C, nếu 1 trong 3 độ chính xác này đạt một ngưỡng nào đó (ở đây việc training mình cho trả về random 1 giá trị từ 0 đến 10) thì trả về "đạt"

Hậu xử lý khi tối thiểu 1 trong 3 mô hình đã đạt kỳ vọng

Sau khi training 3 mô hình và xảy ra 2 trường hợp là tối thiểu 1 trong 3 mô hình đã đạt kỳ vọng hoặc không có mô hình nào đạt, thì cần có những hành động tiếp theo. Dưới đây mình có ví dụ dễ hiểu 1 chút

accurate = BashOperator( task_id="accurate", bash_command="echo 'Prediction'"
)
inaccurate = BashOperator( task_id="inaccurate", bash_command=" echo 'Retraining'"
)

Xác định phụ thuộc cho các task

training_model_tasks = [ PythonOperator( task_id=f"training_model_{model_id}", python_callable=_training_model, op_kwargs={ "model": model_id } ) for model_id in ['A', 'B', 'C']
]
choosing_best_model = BranchPythonOperator( task_id="choosing_best_model", python_callable=_choosing_best_model
) training_model_tasks >> choosing_best_model >> [accurate, inaccurate]

Tổng kết

Trên đây mình có giới thiệu qua một chút về việc thực hành với Airflow một cách đơn giản để hiểu quy tình cũng như các bước cơ bản khi làm việc với nó. Thời gian tới khi có dịp làm việc với Airflow nhiều hơn, mình sẽ chia sẻ thêm với các bạn. Cảm ơn mọi người đã đọc đến những dòng cuối này ^^

Tài liệu tham khảo

Bình luận

Bài viết tương tự

- vừa được xem lúc

Nhập môn lý thuyết cơ sở dữ liệu - Phần 2: Mô hình thực thể liên kết

**Chào các bạn, hôm nay mình tiếp tục viết tiếp phần 2 cho series Nhập môn lý thuyết cơ sở dữ liệu. Chắc hẳn qua bài trước các bạn tìm được lý do vì sao mình phải học môn này rồi chứ.

0 0 70

- vừa được xem lúc

Các vai trò chính trong Data Ecosystem - [Data Analyst Series]

Ngày nay, các tổ chức đang sử dụng dữ liệu để khám phá các cơ hội và mang lại lợi ích trong tương lai. Điển hình là tạo các mô hình trong các giao dịch tài chính để phát hiện gian lận, sử dụng các côn

0 0 48

- vừa được xem lúc

Tìm hiểu về Apache Spark

Ngày nay có rất nhiều hệ thống đang sử dụng Hadoop để phân tích và xử lý dữ liệu lớn. Ưu điểm lớn nhất của Hadoop là được dựa trên một mô hình lập trình song song với xử lý dữ liệu lớn là MapReduce, m

0 0 52

- vừa được xem lúc

Data Warehouse là gì? Top 7 ứng dụng quan trọng của kho dữ liệu

Data Warehouse là gì? Lợi ích và ứng dụng của kho dữ liệu Data Warehouse là gì? Với sự bùng nổ về mặt thông tin và dữ liệu như hiệu này thì đây luôn là những câu hỏi được rất nhiều bạn thắc mắc, đặc b

0 0 35

- vừa được xem lúc

Phân biệt: Database, Data Warehouse, Data Mart, Data Lake, Data Lakehouse, Data Fabric, Data Mesh

Chào mọi người,. Hôm nay, tiếp tục Series Phân tích dữ liệu kinh doanh, mình sẽ chia sẻ với mọi người những khái niệm phổ biến nhất liên quan về thiết kế hệ thống dữ liệu bên dưới nhé, vì khi làm phân

0 0 39

- vừa được xem lúc

Kỹ sư dữ liệu và lộ trình trở thành data engineer (DE) với 4 bước

Data Engineer hay còn gọi là kỹ sư dữ liệu là một trong những vị trí quan trọng trong lĩnh vực khoa học dữ liệu. Với sự phát triển của kỷ nguyên số, nhu cầu chuyển đổi số của các doanh nghiệp ngày càn

0 0 36