1. Deep dive về microservice có độ trễ thấp
Chúng ta sẽ cùng xây dựng một microservice có thể chạy trong JVM của chính nó, có thể thực hiện các truy vấn và cập nhật JDBC thông qua hàng đợi liên tục cho yêu cầu và hàng đợi cho kết quả.
Tôi sẽ coi đây là một Gateway Service vì nó tương tác với một hệ thống nằm ngoài mô hình microservice.
2. Dịch vụ này làm gì?
Dịch vụ này hỗ trợ hai function executeQuery và executeUpdate. Các phương thức này phản chiếu các phương thức tương tự PreparedStatement ngoại trừ kết quả được chuyển dưới dạng thông báo
Hai function xử lý yêu cầu bất đồng bộ được khai báo trong interface sau:
public interface JDBCStatement { void executeQuery(String query, Class<? extends Marshallable> resultType, Object... args); void executeUpdate(String query, Object... args);
}
Hai function xử lý kết quả bất đồng bộ được khai báo trong interface sau:
public interface JDBCResult { void queryResult(Iterator<Marshallable> marshallableList, String query, Object... args); void queryThrown(Throwable t, String query, Object... args); void updateResult(long count, String update, Object... args); void updateThrown(Throwable t, String update, Object... args);
}
3. Các thành phần được bao bọc dưới dạng Dịch vụ
Nhìn vào phương thức executorUpdate:
public class JDBCComponent implements JDBCStatement { private final Connection connection; private final JDBCResult result; public JDBCComponent(ThrowingSupplier<Connection, SQLException> connectionSupplier, JDBCResult result) throws SQLException { connection = connectionSupplier.get(); this.result = result; } @Override public void executeUpdate(String query, Object... args) { try (PreparedStatement ps = connection.prepareStatement(query)) { for (int i = 0; i < args.length; i++) ps.setObject(i + 1, args[i]); int count = ps.executeUpdate(); // record the count. result.updateResult(count, query, args); } catch (Throwable t) { result.updateThrown(t, query, args); } }
Bạn có thể thấy rằng mọi thông báo đầu vào sẽ tạo ra một thông báo đầu ra với kết quả. Điều này sẽ hữu ích sau này để khởi động lại dịch vụ từ nơi nó bắt đầu và theo dõi tiến trình của nó, cũng như thu được kết quả.
Làm thế nào để bọc cái này như một dịch vụ:
public class JDBCService implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(JDBCService.class); private final ChronicleQueue in; private final ChronicleQueue out; private final ExecutorService service; private final ThrowingSupplier<Connection, SQLException> connectionSupplier; private volatile boolean closed = false; public JDBCService(ChronicleQueue in, ChronicleQueue out, ThrowingSupplier<Connection, SQLException> connectionSupplier) throws SQLException { this.in = in; this.out = out; this.connectionSupplier = connectionSupplier; service = Executors.newSingleThreadExecutor( new NamedThreadFactory(in.file().getName() + "-JDBCService", true)); (1) service.execute(this::runLoop); (2) service.shutdown(); // stop when the task exits. } void runLoop() { try { JDBCResult result = out.createAppender() (3) .methodWriterBuilder(JDBCResult.class) .recordHistory(true) .get(); JDBCComponent js = new JDBCComponent(connectionSupplier, result); MethodReader reader = in.createTailer().afterLastWritten(out).methodReader(js); (4) Pauser pauser = new LongPauser(50, 200, 1, 10, TimeUnit.MILLISECONDS); while (!closed) { if (reader.readOne()) (5) pauser.reset(); else pauser.pause(); } } catch (Throwable t) { LOGGER.error("Run loop exited", t); } } @Override public void close() { closed = true; } public JDBCStatement createWriter() { return in.createAppender() (6) .methodWriterBuilder(JDBCStatement.class) .recordHistory(true) .get(); } public MethodReader createReader(JDBCResult result) { return out.createTailer().methodReader(result); }
}
(1) Tạo một chủ đề với một tên có ý nghĩa. Chúng tôi sử dụng ExecutorService trong trường hợp chúng tôi muốn làm điều gì đó phức tạp hơn với nó sau này.
(2) Thêm nhiệm vụ này vào nhóm
(3) Tạo proxy để ghi vào hàng đợi đầu ra
(4) Bắt đầu đọc sau khi tin nhắn cuối cùng được xử lý thành công.
(5) Đọc một tin nhắn tại một thời điểm.
(6) Thêm phương thức trợ giúp để tạo trình ghi vào đầu vào của dịch vụ này
(7) Thêm một phương thức trợ giúp để đọc kết quả của dịch vụ này.
4. Làm thế nào để nó thực hiện?
Tôi đã thử nghiệm việc ghi này vào HSQLDB khá nhanh, thậm chí ghi vào một tệp. Mặc dù vậy, việc sử dụng nó làm Dịch vụ có thể hữu ích cho hoạt động rất bùng nổ vì chúng tôi có thể xử lý nhiều yêu cầu hơn rất nhiều trong một khoảng thời gian.
Bài kiểm tra hiệu suất ghi 200 nghìn tin nhắn nhanh nhất có thể và đợi tất cả hoàn tất. Thời gian đầu tiên là độ trễ trung bình để viết từng yêu cầu và độ trễ thứ hai là thời gian trung bình để nhận kết quả.
Thời gian trung bình để ghi mỗi bản cập nhật 1,5 us, thời gian trung bình để thực hiện mỗi bản cập nhật 29,7 us
Mặc dù HSQLDB có thể persist hơn 33 nghìn bản cập nhật mỗi giây, (1/29,7 us), gói dịch vụ có thể xử lý các đợt ghi hơn 660 nghìn lượt ghi mỗi giây. (1 / 1,5 us) Điều này thể hiện sự cải thiện gấp 20 lần về thông lượng liên tục mà nó có thể hỗ trợ.
5. How long can a burst be?
Cả Linux và Windows đều có xu hướng hoạt động tốt khi có tới 10% bộ nhớ chính bị "dirty" hoặc không được ghi vào đĩa. Ví dụ: nếu bạn có 256 GB, bạn có thể có 25 GB dữ liệu "bẩn". Mặc dù vậy, nếu tốc độ liên tục nhanh hơn dịch vụ tiêu thụ, nhưng đủ chậm để hệ thống phụ của đĩa có thể theo kịp, thì các liên tục của bạn có thể vượt quá kích thước bộ nhớ chính. Nói một cách dễ hiểu, nếu tin nhắn của bạn dài 256 byte, thì dịch vụ có thể bị chậm hơn một tỷ tin nhắn và nó sẽ không bị hết bộ nhớ hoặc bị lỗi. Hạn chế chính trong trường hợp này là dung lượng đĩa trống mà bạn có. Tại thời điểm đăng bài, bạn có thể mua 1 TB SSD Doanh nghiệp với giá dưới 600 đô la và Samsung đang bán ổ SSD 16 TB. Tôi hy vọng chi phí lưu trữ sẽ tiếp tục giảm.
6. Phần kết luận
Việc xây dựng một vi dịch vụ bằng cách gói một thành phần bằng API không đồng bộ với phương tiện vận chuyển để nhắn tin vào và ra đã hoạt động mà không có quá nhiều phức tạp.
Cách tốt nhất để đi nhanh là làm ít việc hơn.