Tôi có một business service (tạm gọi là Service W được viết bằng Nodejs) được subscribe vào keyspace của Redis theo cơ chế pub-sub (đang được serve trên Kubernetes), Service W này hoạt động như một worker có nhiệm vụ lắng nghe thông báo khi một redis key hết hạn, tiếp theo đó nó sẽ xử lý một số logic cập nhật xuống database (tôi dùng Prisma Orm để mapping tới Postgres) và publish một message vào Kafka cho một tài nguyên duy nhất
1. Vấn đề gặp phải
Lấy ví dụ dưới đây là một đoạn logic của Service W xử lý khi lắng nghe event từ Redis
const movieName = 'Bleach thousand year blood war' const availableSeat = await prisma.seat.findFirst({ where: { movie: { name: movieName, }, claimedBy: null, },
}) if (!availableSeat) { throw new Error(`Oh no! ${movieName} is all booked.`)
} await prisma.seat.update({ data: { claimedBy: userId, }, where: { id: availableSeat.id, },
}) // Send kafka an event to do something
await producer.send({ topic: 'topic-name', messages: [] })
Đối với đoạn logic phía trên, khi scale Service W lên N-pods (workers) thì sẽ có tương đương được subscribes vào Redis, khi một redis key hết hạn thì N-pods (workers) sẽ nhận được same event ở cùng một thời điểm (cơ chế pub-sub) dẫn tới sẽ gửi cho Kafka N-messages giống nhau (duplicated messages)
2. Cách giải quyết
Theo bài toán ở trên sẽ có nhiều cách để xử lý, nhưng trong bài này tôi sẽ đề cập tới Optimistic concurrency control
Optimistic concurrency control là một mô hình xử lý các hoạt động đồng thời trên một thực thể duy nhất mà không dựa vào locking (bạn có thể tìm hiểu thêm về cơ chế này), hiểu đơn giản OCC sẽ sử dụng mã thông báo (trường version hoặc timestamp trong table) để phát hiện những thay đổi với bản ghi
Chúng ta cần thay đổi logic một chút để thoả điều kiện chỉ một worker được gửi message tới Kafka, tránh duplicated messages
const userEmail = '98savage@gmail.io'
const movieName = 'Bleach thousand year blood war' const availableSeat = await client.seat.findFirst({ where: { Movie: { name: movieName, }, claimedBy: null, },
}) if (!availableSeat) { throw new Error(`Oh no! ${movieName} is all booked.`)
} const seats = await client.seat.updateMany({ data: { claimedBy: userEmail, version: { increment: 1, }, }, where: { id: availableSeat.id, version: 0, },
}) if (seats.count === 0) { throw new Error(`That seat is already booked! Please try again.`)
} // Send kafka an event to do something
await producer.send({ topic: 'topic-name', messages: [] })
Đối với cách thay đổi này, khi N-pods (workers) nhận same event ở cùng một thời điểm, thì chỉ có một worker xử lý và gửi message vào kafka, còn các workers còn lại sẽ throw error bởi vì worker đảm nhiệm nó đã tăng version record từ 0 lên 1, các worker còn lại query version record = 0 không còn match đối với record đó nữa
Related links:
https://www.prisma.io/docs/orm/prisma-client/queries/transactions#optimistic-concurrency-control