Bạn cần một hàng đợi (queue) nhẹ để chạy trên nhiều máy mà không cần Redis, RabbitMQ hay dịch vụ đám mây? Trong bài viết này, chúng ta sẽ xây dựng một hàng đợi phân tán đáng ngạc nhiên bằng cách sử dụng SQLite, Python và một chút “ma thuật” khóa tệp.
Mô hình này hoạt động tốt nhất khi bạn cần chia sẻ hàng đợi qua ổ đĩa nội bộ hoặc mạng (như NFS, SMB) — ví dụ cụm cron, hệ thống render, hoặc xử lý batch — mà không cần triển khai hạ tầng phức tạp.
Tại sao sử dụng SQLite làm hàng đợi?
- Không cần dịch vụ ngoài để vận hành
- Tuyệt vời cho các công việc cập nhật không thường xuyên
- Hoạt động tốt trên ổ đĩa mạng được chia sẻ (NFS, SMB)
Bước 1: Tạo bảng hàng đợi
Chúng ta sẽ dùng một bảng đơn giản với các cột đánh dấu đã nhận việc (claimed) và timestamp:
import sqlite3 def init_db(): conn = sqlite3.connect("queue.db") conn.execute(""" CREATE TABLE IF NOT EXISTS jobs ( id INTEGER PRIMARY KEY, task TEXT, claimed_by TEXT, claimed_at DATETIME ) """) conn.commit() conn.close()
Bước 2: Thêm công việc vào hàng đợi
Chèn một dòng vào bảng để thêm công việc mới:
def enqueue(task): conn = sqlite3.connect("queue.db") conn.execute("INSERT INTO jobs (task) VALUES (?)", (task,)) conn.commit() conn.close()
Bước 3: Nhận công việc bằng khóa
Để nhận công việc một cách an toàn giữa nhiều máy, ta sẽ dùng UPDATE có điều kiện:
import datetime, socket def claim_job(): conn = sqlite3.connect("queue.db", isolation_level="IMMEDIATE") conn.row_factory = sqlite3.Row hostname = socket.gethostname() now = datetime.datetime.utcnow().isoformat() cur = conn.execute(""" UPDATE jobs SET claimed_by = ?, claimed_at = ? WHERE id = ( SELECT id FROM jobs WHERE claimed_by IS NULL LIMIT 1 ) RETURNING * """, (hostname, now)) job = cur.fetchone() conn.commit() conn.close() return job
Bước 4: Xử lý và xóa công việc
def process(job): print(f"Processing: {job['task']}") conn = sqlite3.connect("queue.db") conn.execute("DELETE FROM jobs WHERE id = ?", (job["id"],)) conn.commit() conn.close()
Bước 5: Vòng lặp Worker
Bạn có thể chạy đoạn mã này theo lịch (cron) hoặc systemd timer trên nhiều máy:
if __name__ == "__main__": init_db() job = claim_job() if job: process(job)
Ưu và nhược điểm
1. Ưu điểm:
- Không cần cài đặt hay duy trì dịch vụ ngoài
- Dễ kiểm tra và debug
- Hoạt động trên ổ đĩa mạng chia sẻ
2. Nhược điểm:
- Không phù hợp cho tải công việc lớn
- Ghi đồng thời bị tuần tự hóa
- Cách hoạt động của khóa file SQLite có thể khác nhau tùy hệ điều hành
Các lựa chọn thay thế
- Redis + RQ: Phù hợp cho hàng đợi Python mở rộng
- Beanstalkd: Nhẹ, chuyên biệt cho hàng đợi
- Celery: Dư thừa cho các job cục bộ, nhưng rất đầy đủ tính năng
Tóm tắt
Trong các trường hợp không lý tưởng để cài đặt hoặc duy trì hệ thống hàng đợi chuyên dụng, một hàng đợi phân tán dùng SQLite là lựa chọn tối giản. Nó hoàn hảo cho phòng thí nghiệm tại nhà, thiết bị biên (edge node) hoặc bất cứ thứ gì cần xử lý công việc có trạng thái chia sẻ mà không cần hạ tầng phức tạp.
Cảm ơn các bạn đã theo dõi!