Mở đầu
Lướt dạo một vòng thì bài viết Airflow trên Viblo cũng có một số bài tương đối chi tiết như Tất tần tật về Airflow (P1) của anh Hoàng hay Một số điểm cần lưu ý khi sử dụng Airflow - Phần 1 của bạn Trung. Các bạn có thể đọc để hiểu hơn về Airflow
Đứng trên phương diện một người mới (đúng nghĩa, chưa từng làm việc với nó) thì bài viết này, mình rất mong muốn có thể vừa để lưu lại kiến thức cho chính bản thân (Mình viết thì bao giờ cũng dễ hiểu hơn đọc của người khác) vừa muốn chia sẻ tới những người mới tinh như mình có thể dễ dàng hiểu được cũng như thực hành được
Mình sẽ ra cùng lúc 2 bài, bài này là lý thuyết và bài tiếp theo là thực hành nhé ạ ^^ Cảm ơn mọi người đã ủng hộ
Apache AirFlow là gì?
- Apache Airflow là một công cụ mã nguồn mở được sử dụng để lập lịch, quản lý, và giám sát các quy trình xử lý dữ liệu. Nó được sử dụng rộng rãi trong các hệ thống xử lý dữ liệu lớn để tự động hóa các quy trình xử lý dữ liệu phức tạp.
- Airflow cung cấp các khái niệm như "DAG" (Directed Acyclic Graph), "Task", "Operator", "Sensor" để mô tả các quy trình xử lý dữ liệu.
DAG
-
Directed Acyclic Graph là một đồ thị có hướng không chu trình, mô tả tất cả các bước xử lý dữ liệu trong một quy trình
-
Quy trình công việc thường được xác định với sự trợ giúp của Đồ thị theo chu kỳ có hướng (DAG)
-
Mỗi DAG được xác định trong 1 file DAG, nó định nghĩa một quy trình xử lý dữ liệu, được biểu diễn dưới dạng một đồ thị có hướng không chu trình, trong đó các nút là các tác vụ (tasks) và các cạnh là các phụ thuộc giữa các tác vụ.
-
Các tác vụ trong DAG thường được xử lý tuần tự hoặc song song theo một lịch trình được định sẵn
-
Khi một DAG được thực thi, nó được gọi là một lần chạy DAG
-
Ví dụ về một DAG như hình dưới, DAG của chúng ta gồm: nhập dữ liệu, phân tích dữ liệu, lưu trữ dữ liệu, tạo báo cáo và kích hoạt các hệ thống khác như báo cáo lỗi qua Email
-
Vòng đời của 1 trạng thái nhiệm vụ gồm có các trạng thái sau
- No status: tác vụ chưa được xếp hàng để thực hiện
- Scheduled: Bộ lập lịch đã xác định rằng các phụ thuộc của nhiệm vụ được đáp ứng và đã lên lịch cho nó chạy
- Removed: Vì một lý do nào đó, tác vụ đã biết mất khỏi DAG kể từ khi bắt đầu chạy
- Upstream failed: tác vụ ngược dòng không thành công
- Queued: Nhiệm vụ đã được giao cho Executor và đang đợi 1 worker có sẵn để thực thi
- Running: Tác vụ đang được chạy bởi một worker
- Sucess: Tác vụ chạy xong không có lỗi
- Failed: Tác vụ có lỗi trong khi thực thi và không chạy được
- Up for retry: Tác vụ không thành công nhưng vẫn còn các lần thử lại và sẽ được lên lịch lại
Vậy lý tưởng nhất cho một tác vụ là gì? Đó là đi từ No status --> Scheduled --> Queued --> Running --> Sucess
Task
- Task là một đơn vị cơ bản để thực hiện một công việc nhỏ trong quy trình xử lý dữ liệu. Mỗi Task là một bước trong quy trình và có thể được lập lịch thực hiện tùy theo các điều kiện cụ thể.
- Một Task trong Airflow có các thuộc tính và phương thức sau:
- task_id: định danh duy nhất của task trong DAG.
- owner: người sở hữu task.
- depends_on_past: xác định liệu task hiện tại có phụ thuộc vào kết quả của task trước đó hay không.
- retries: số lần thử lại nếu task thất bại.
- retry_delay: khoảng thời gian giữa các lần thử lại.
- start_date: thời điểm bắt đầu thực hiện task.
- end_date: thời điểm kết thúc thực hiện task.
- execution_timeout: thời gian tối đa cho phép để thực hiện task.
- on_failure_callback: hàm được gọi khi task thất bại.
- on_success_callback: hàm được gọi khi task thành công.
Operator
- Mỗi operator đại diện cho một công việc cụ thể trong quy trình, ví dụ như đọc dữ liệu từ một nguồn dữ liệu, xử lý dữ liệu, hoặc ghi dữ liệu vào một nguồn dữ liệu khác.
- Các operator trong Airflow được phân loại thành các loại chính sau
- BashOperator: Chạy các lệnh Bash hoặc script Shell.
- PythonOperator: Thực thi các hàm Python.
- EmailOperator: Gửi email thông qua SMTP.
- DummyOperator: Được sử dụng để tạo các kết nối giữa các task.
- PythonVirtualenvOperator: Thực thi các hàm Python trong một môi trường ảo.
- MySqlOperator: Thực hiện các lệnh SQL trên cơ sở dữ liệu MySQL.
- PostgresOperator: Thực hiện các lệnh SQL trên cơ sở dữ liệu PostgreSQL.
- S3FileTransformOperator: Thực hiện các chức năng xử lý file trên Amazon S3.
- SparkSqlOperator: Thực hiện các truy vấn Spark SQL.
- HdfsSensor: Kiểm tra sự tồn tại của một tệp trên Hadoop Distributed File System (HDFS).
Ví dụ, giả sử bạn có một công việc hàng ngày cần đọc dữ liệu từ một tệp CSV, xử lý dữ liệu và lưu kết quả vào một cơ sở dữ liệu PostgreSQL. Trong Airflow, bạn có thể sử dụng các operator như sau:
- FileSensor: Kiểm tra sự tồn tại của tệp CSV trên hệ thống tệp.
- BashOperator: Sử dụng lệnh Bash để di chuyển tệp CSV đến thư mục xử lý.
- PythonOperator: Thực hiện các xử lý dữ liệu, ví dụ như đọc tệp CSV và chuyển đổi dữ liệu thành định dạng phù hợp để lưu vào cơ sở dữ liệu.
- PostgresOperator: Thực hiện các lệnh SQL để lưu kết quả xử lý vào PostgreSQL.
- EmailOperator: Gửi email thông báo cho người dùng khi quy trình xử lý dữ liệu hoàn thành.
Với các operator này, bạn có thể tạo một DAG trong Airflow để tự động hóa quy trình xử lý dữ liệu hàng ngày. DAG sẽ kiểm tra sự tồn tại của tệp CSV, di chuyển nó đến thư mục xử lý, thực hiện các xử lý dữ liệu và lưu kết quả vào cơ sở dữ liệu, sau đó gửi email thông báo cho người dùng khi quy trình hoàn thành.
Sensor
- Sensor là một loại Operator được sử dụng để giám sát các sự kiện và điều kiện, và thực hiện các hành động tương ứng.
- Sensor thường được sử dụng để đợi cho đến khi một điều kiện nào đó xảy ra trước khi tiếp tục thực hiện quy trình.
- Các loại Sensor trong Airflow bao gồm:
- FileSensor: Kiểm tra sự tồn tại của một tệp trên hệ thống tệp.
- TimeSensor: Đợi cho đến khi một khoảng thời gian cụ thể đã trôi qua.
- HttpSensor: Kiểm tra sự phản hồi của một URL cụ thể.
- HdfsSensor: Kiểm tra sự tồn tại của một tệp trên Hadoop Distributed File System (HDFS).
- SqlSensor: Kiểm tra sự tồn tại của một bảng hoặc một số dòng dữ liệu trong cơ sở dữ liệu.
- S3KeySensor: Kiểm tra sự tồn tại của một đối tượng trên Amazon S3.
- ExternalTaskSensor: Kiểm tra trạng thái của một task khác trong DAG.
Ví dụ, nếu bạn muốn tải dữ liệu từ một API bên ngoài vào cơ sở dữ liệu của mình hàng giờ, bạn có thể sử dụng HttpSensor để kiểm tra sự phản hồi của API trước khi tiếp tục thực hiện quy trình. Nếu API không phản hồi, Sensor sẽ giữ cho task đang chạy và thử lại sau một khoảng thời gian cụ thể, giúp đảm bảo rằng không có dữ liệu bị mất hoặc xử lý sai.
Airflow hoạt động thế nào
- Không giống như các công cụ Dữ liệu lớn như Apache Kafka, Apache Storm, Apache Spark,hoặc Flink, Apache Airflow không phải là giải pháp truyền dữ liệu. Nó chủ yếu là một trình quản lý quy trình làm việc
Hình vẽ trên tổng quan về các thành phần cơ bản của Apache Airflow.
- Scheduler: giám sát tất cả các DAG và các tác vụ được liên kết của chúng. Đối với 1 tác vụ, khi các phụ thuộc được đáp ứng, Scheduler sẽ khởi tạo tác vụ đó. Nó kiểm tra các tác vụ đang hoạt động để bắt đầu theo định kỳ
- Executor: xử lý việc chạy các tác vụ này bằng cách đưa chúng cho worker để chạy
- Web server: giao diện người dùng của Airflow, hiện thị trạng thái của nhiệm vụ và cho phép người dùng tương tác với cơ sở dữ liệu cũng như đọc tệp nhật kỹ từ kho lưu trữ từ xa như Google Cloud Storage, S3, ...
- DAG Directory: một thư mục chứa các file DAG của các quy trình xử lý dữ liệu (data pipelines) trong Airflow.
- Metabase Database: được sử dụng bởi Scheduler, Executor và Web Server để lưu trữ thông tin quan trọng của từng DAG, ví dụ như các phiên bản, số liệu thống kê mỗi lần chạy, khoảng thời gian lên lịch, ...
Tạm dừng lý thuyết
Có lẽ lỹ thuyết như vậy cũng đủ dài rồi, chúng ta sẽ cùng thực hành Airflow với 1 bài toán Deep Learning trong phần tiếp theo của mình nhé: Cứ thực hành Airflow dễ hiểu và đơn giản đã, chưa làm gì phức tạp cả