Hàng đợi thường là thành phần cơ bản trong các mẫu thiết kế phần mềm. Nhưng điều gì sẽ xảy ra nếu có hàng triệu tin nhắn nhận được mỗi giây và multi-process consumers cần có khả năng đọc đầy đủ tất cả các tin nhắn? Java chỉ có thể chứa rất nhiều thông tin trước khi bộ nhớ heap trở lên quá lớn và trở thành yếu tố hạn chế với các bộ sưu tập rác (GC) có tác động lớn, điều này thậm chí tạm dừng JVM trong vài giây hoặc thậm chí vài phút.
Bài viết này đề cập đến cách tạo hàng đợi lớn liên tục trong khi vẫn duy trì độ trễ thấp nhất quán và có thể dự đoán được bằng cách sử dụng Hàng đợi Chronicle mã nguồn mở .
Ứng dụng
Trong bài viết này, mục tiêu là duy trì một hàng đợi các đối tượng từ nguồn cấp dữ liệu thị trường (ví dụ: giá mới nhất cho chứng khoán được giao dịch trên một sàn giao dịch). Các lĩnh vực kinh doanh khác như đầu vào cảm giác từ thiết bị IOT hoặc đọc thông tin ghi lại sự cố trong ngành công nghiệp ô tô cũng có thể được chọn. Nguyên tắc là như nhau.
Để bắt đầu, một lớp nắm giữ dữ liệu được xác định:
public class MarketData extends SelfDescribingMarshallable { int securityId; long time; float last; float high; float low; // Getters and setters not shown for brevity }
Lưu ý: Trong các tình huống thực tế, phải hết sức cẩn thận khi sử dụng float và double để giữ các giá trị tiền tệ vì điều này có thể gây ra các vấn đề về làm tròn. Tuy nhiên, trong bài viết giới thiệu này, tôi muốn giữ mọi thứ đơn giản.
Ngoài ra còn có một hàm tiện ích nhỏ MarketDataUtil::create sẽ tạo và trả về một đối tượng MarketData ngẫu nhiên mới khi được gọi:
static MarketData create() { MarketData marketData = new MarketData(); int id = ThreadLocalRandom.current().nextInt(1000); marketData.setSecurityId(id); float nextFloat = ThreadLocalRandom.current().nextFloat(); float last = 20 + 100 * nextFloat; marketData.setLast(last); marketData.setHigh(last * 1.1f); marketData.setLow(last * 0.9f); marketData.setTime(System.currentTimeMillis()); return marketData; }
Bây giờ, mục tiêu là tạo một hàng đợi có khả năng bền, đồng thời, độ trễ thấp, có thể truy cập từ một số process và có thể chứa hàng tỷ objects.
2. Cách tiếp cận ngây thơ
Sử dụng ConcurrentLinkedQueue:
public static void main(String[] args) { final Queue<MarketData> queue = new ConcurrentLinkedQueue<>(); for (long i = 0; i < 1e9; i++) { queue.add(MarketDataUtil.create()); } }
Điều này sẽ thất bại vì nhiều lý do:
ConcurrentLinkedQueuetạo sẽ tạo một node bao bọc cho mỗi phần tử được thêm vào hàng đợi. Điều này sẽ tăng gấp đôi số lượng đối tượng được tạo một cách hiệu quả.
Các đối tượng được đặt trên Java heap, góp phần gây ra các vấn đề về áp lực bộ nhớ heap và thu gom rác. Trên máy của tôi, điều này dẫn đến toàn bộ JVM của tôi trở nên không phản hồi và cách duy nhất là buộc phải kill tiến trình: sử dụng “kill -9”.
Không thể đọc hàng đợi từ các quy trình khác (tức là các JVM khác).
Khi JVM kết thúc, nội dung của hàng đợi sẽ bị mất. Do đó, hàng đợi không bền.
Nhìn vào các lớp Java tiêu chuẩn khác, có thể kết luận rằng không có sự hỗ trợ nào cho các hàng đợi lớn liên tục.
3. Sử dụng hàng đợi Chronicle
Chronicle Queue là một thư viện mã nguồn mở và được thiết kế để đáp ứng các yêu cầu nêu trên. Đây là một cách để thiết lập và sử dụng nó:
public static void main(String[] args) { final MarketData marketData = new MarketData(); final ChronicleQueue q = ChronicleQueue .single("market-data"); final ExcerptAppender appender = q.acquireAppender(); for (long i = 0; i < 1e9; i++) { try (final DocumentContext document = appender.acquireWritingDocument(false)) { document .wire() .bytes() .writeObject(MarketData.class, MarketDataUtil.recycle(marketData)); } } }
Sử dụng MacBook Pro 2019 với Intel Core i9 8 nhân 2,3 GHz, có thể chèn hơn 3.000.000 tin nhắn mỗi giây chỉ bằng một luồng duy nhất. Hàng đợi được duy trì thông qua tệp ánh xạ bộ nhớ trong thư mục đã cho “ market-data”. Chúng ta mong đợi một đối tượng MarketData chiếm ít nhất 4 (int securityId) + 8 (long time) + 4*3 (float last, high and low) = 24 bytes.
Trong ví dụ trên, 1 tỷ đối tượng đã được thêm vào khiến tệp được ánh xạ chiếm 30.148.657.152 byte, tương đương với khoảng 30 byte cho mỗi thông báo. Theo tôi, điều này thực sự rất hiệu quả.
Như có thể thấy, một phiên bản MarketData đơn lẻ có thể được sử dụng lại nhiều lần vì Hàng đợi Biên niên sử sẽ làm phẳng nội dung của đối tượng hiện tại vào tệp ánh xạ bộ nhớ, cho phép sử dụng lại đối tượng. Điều này làm giảm áp lực bộ nhớ hơn nữa. Đây là cách phương thức tái chế hoạt động:
static MarketData recycle(MarketData marketData) { final int id = ThreadLocalRandom.current().nextInt(1000); marketData.setSecurityId(id); final float nextFloat = ThreadLocalRandom.current().nextFloat(); final float last = 20 + 100 * nextFloat; marketData.setLast(last); marketData.setHigh(last * 1.1f); marketData.setLow(last * 0.9f); marketData.setTime(System.currentTimeMillis()); return marketData; }
4. Đọc từ hàng đợi Chronicle
Đọc từ Hàng đợi Chronicle rất đơn giản. Tiếp tục ví dụ ở trên, phần sau đây cho thấy cách đọc hai đối tượng MarketData đầu tiên từ hàng đợi:
public static void main(String[] args) { final ChronicleQueue q = ChronicleQueue .single("market-data"); final ExcerptTailer tailer = q.createTailer(); for (long i = 0; i < 2; i++) { try (final DocumentContext document = tailer.readingDocument()) { MarketData marketData = document .wire() .bytes() .readObject(MarketData.class); System.out.println(marketData); } } }
Output như sau:
!software.chronicle.sandbox.queuedemo.MarketData { securityId: 202, time: 1634646488837, last: 45.8673, high: 50.454, low: 41.2806 } !software.chronicle.sandbox.queuedemo.MarketData { securityId: 117, time: 1634646488842, last: 34.7567, high: 38.2323, low: 31.281 }
Có các quy định để tìm kiếm hiệu quả vị trí của Tailer, chẳng hạn như đến cuối hàng đợi hoặc đến một chỉ mục nhất định.
5. What’s Next?
Có nhiều tính năng khác nằm ngoài phạm vi của bài viết này. Ví dụ: các tệp hàng đợi có thể được đặt để cuộn theo các khoảng thời gian nhất định (chẳng hạn như mỗi ngày, giờ hoặc phút) tạo ra sự phân tách thông tin một cách hiệu quả để dữ liệu cũ hơn có thể được làm sạch theo thời gian. Ngoài ra còn có các điều khoản để có thể cách ly các CPU và khóa các luồng Java đối với các CPU bị cô lập này, giúp giảm đáng kể hiện tượng rung ứng dụng.
Cuối cùng, có một phiên bản dành cho doanh nghiệp với tính năng sao chép hàng đợi trên các cụm máy chủ, mở đường cho tính khả dụng cao và cải thiện hiệu suất trong các kiến trúc phân tán. Phiên bản doanh nghiệp cũng bao gồm nhiều tính năng khác như mã hóa, luân phiên múi giờ và các trình bổ sung không đồng bộ.