- vừa được xem lúc

Cập nhật Celery task real-time với Redis PUB/SUB và Websocket

0 0 89

Người đăng: Trung Tran

Theo Viblo Asia

Mình viết bài này chủ yếu để chia sẻ lại kiến thức trong quá trình nghiên cứu để phát triển ứng dụng web 3D trong đó một số tác vụ time-consuming, complex-computing như render, extract data từ image ...sẽ được xử lí phía server và cập nhật tiến độ cho client để theo dõi.

Đây là layout hệ thống mà mình sử dụng cho riêng tác vụ xử ly task từ user.

Yêu cầu hệ thống:

Chức năng:

    • User upload các loại asset được hỗ trợ ( file 3D: .fbx, .obj ...; file 2D: substance).
    • Hệ thống thực hiện các tác vụ tùy vào loại asset được upload.
    • Các task chạy background và chạy parallei-mode.
    • Update realtime về trạng thái task cho user

Trong khuôn khổ bài viết này, mình không đề cập chi tiết các module như Redis, RabbitMQ, Celery, FastAPI, các bạn có thể tìm thấy rất nhiều tutorial hướng dẫn trên internet. Ở đây mình sẽ chia sẻ cách thức mà team mình xây dựng để kết nối các module trên thành hệ thống để vận hành bài toán đặt ra.

Ok, vào việc luôn nha.

Modules

Celery

Đầu tiên, là một chút giới thiệu về Celery. Đối với các bạn làm Python thì Celery là công cụ khá nổi tiếng cho các tác vụ dạng Distributed Queue. Về cơ bản, nó sẽ phân bổ các task cho các worker, có một broker trung gian để nhận task từ server đưa xuống. Broker trung gian thường sử dụng là RabbitMQ hoặc Redis. Ở đây, mình sẽ sử dụng RabbitMQ là broker, Redis mình sẽ sử dụng chức năng PUB/SUB để publish trạng thái của task cho client.

Cốt lõi của Celery là các task sẽ được thực thi bởi các worker. Chúng ta chuyển function thông thường thành celery task bằng cách sử dụng decorator @task

Ở đây, với mỗi task chúng ta sẽ ghi nhận lại task-ID đồng thời kết hợp với session-ID mình dùng JWT (JSON web token), hai ID sẽ được tổ hơp lại thành channel để sử dụng Redis PUB. Vì user có thể submit nhiều file và mỗi file sẽ tương ứng với một task được thực thi ở background nên channel cần có thông tin của task-ID để sau này server có thể PSUBSCRIBE ( pattern subscribe). Nghe có vẻ rắc rối nhỉ, ?, thêm tí nữa cho nó xoắn não, và session-ID giúp cho WebSocket gửi thông tin lại cho đúng user đã gửi tasks.

Hi vọng layout trên giúp các bạn dễ dàng hình dung được lí do vì sao mình cần session-ID và task-ID.

Phần tiếp theo chỉ đơn giản là cấu hình celery application. Bắt đầu bằng việc định nghĩa các configuration:

Và celery application sẽ run với Configuration object bên trên

Cuối cùng, chỉ còn việc khởi động celery worker, sẵn sàng nhận task khi được yêu cầu:

celery -A services.queue.tasks worker --pool=eventlet --loglevel=info --concurrency=4

FastAPI

Phần tiếp theo, mình sẽ đề cập tóm tắt về HTTP Request và WebSocket của FastAPI. Minh sẽ ví dụ vài HTTP request khi user yêu cầu để thực thi các tác vụ.

WebSocket connection:

WebSocket chỉ được connect khi user đã chứng thực, do đó trong URL của WebSocket mình kèm thêm session-ID được khởi tạo khi user login và được server response bằng JSON web token.

Để tránh web server bị block các synchronous process khi sử dụng redis SUB, mình dùng module aioredis để tạo Pattern Subscribe đến Redis server.

Các bạn có thể thấy mình chọn pattern subscribe tasksession-ID đảm bảo subscribe tất cả các messges từ các tasks mà user đã submit trong 1 session. WebSocket sẽ gửi tất cả các message mỗi khi nhận được một message từ celery task. Ứng dụng web phía client sẽ cập nhật thông tin từ WebSocket và hiển thị process của task tương ứng. Phần này mình xin dành để nói thêm trong phần sau.

Lời kết

Cám ơn mọi người đã đọc tới phần này. Hi vọng đã cung cấp cho mọi người một vài thông tin hữu ích. Thật ra, những phần này mình cóp nhặt đây đó trên mạng và thay đổi tùy chỉnh theo yêu cầu của hệ thống bên mình. Mọi phản hồi và trao đổi team mình đều rất cảm ơn.

Bình luận

Bài viết tương tự

- vừa được xem lúc

Thao tác với File trong Python

Python cung cấp các chức năng cơ bản và phương thức cần thiết để thao tác các file. Bài viết này tôi xin giới thiệu những thao tác cơ bản nhất với file trong Python.

0 0 63

- vừa được xem lúc

Tập tành crawl dữ liệu với Scrapy Framework

Lời mở đầu. Chào mọi người, mấy hôm nay mình có tìm hiểu được 1 chút về Scrapy nên muốn viết vài dòng để xem mình đã học được những gì và làm 1 demo nho nhỏ.

0 0 166

- vừa được xem lúc

Sử dụng Misoca API (oauth2) với Python

Với bài viết này giúp chúng ta có thể nắm được. ・Tìm hiểu cách xử lý API misoca bằng Python.

0 0 49

- vừa được xem lúc

[Series Pandas DataFrame] Phân tích dữ liệu cùng Pandas (Phần 3)

Tiếp tục phần 2 của series Pandas DataFrame nào. Let's go!!. Ở phần trước, các bạn đã biết được cách lấy dữ liệu một row hoặc column trong Pandas DataFame rồi phải không nào. 6 Hoc.

0 0 63

- vừa được xem lúc

Lập trình socket bằng Python

Socket là gì. Một chức năng khác của socket là giúp các tầng TCP hoặc TCP Layer định danh ứng dụng mà dữ liệu sẽ được gửi tới thông qua sự ràng buộc với một cổng port (thể hiện là một con số cụ thể), từ đó tiến hành kết nối giữa client và server.

0 0 79

- vừa được xem lúc

[Series Pandas DataFrame] Phân tích dữ liệu cùng Pandas (Phần 2)

Nào, chúng ta cùng đến với phần 2 của series Pandas DataFrame. Truy xuất Labels và Data. Bạn đã biết cách khởi tạo 1 DataFrame của mình, và giờ bạn có thể truy xuất thông tin từ đó. Với Pandas, bạn có thể thực hiện các thao tác sau:.

0 0 95