1. CircuitBreaker
Circuit Breaker là một mẫu thiết kế (design pattern) được Michael Nygard giới thiệu trong cuốn sách "Release It!". Nó được tạo ra để ngăn chặn lỗi lan rộng trong các hệ thống phân tán, đặc biệt là trong kiến trúc microservices.
Hãy tưởng tượng Circuit Breaker (CB) giống như một cầu dao điện trong nhà bạn: khi phát hiện dòng điện quá tải, cầu dao sẽ ngắt mạch để bảo vệ toàn bộ hệ thống khỏi bị hư hại.
Trong phần mềm, Circuit Breaker cũng hoạt động tương tự:
Khi một service gặp sự cố (quá tải hoặc không phản hồi), CB sẽ ngắt các request đến service đó để tránh tạo thêm áp lực.
Điều này giúp cho service có thời gian phục hồi, đồng thời ngăn không cho lỗi lây lan sang các phần khác trong hệ thống.
Kết quả: Tăng tính ổn định hệ thống và ngăn chặn hiệu ứng domino trong microservices.
Hiểu và áp dụng Circuit Breaker sẽ giúp hệ thống microservices của bạn ổn định hơn, giảm thiểu rủi ro khi các service phụ thuộc gặp vấn đề.
Introduction
CircuitBreaker được triển khai dưới dạng máy trạng thái hữu hạn với ba trạng thái thông thường: CLOSED, OPEN và HALF_OPEN, và ba trạng thái đặc biệt: METRICS_ONLY, DISABLED và FORCED_OPEN.
CircuitBreaker sử dụng cửa sổ trượt (sliding window) để lưu trữ và tổng hợp kết quả của các lần gọi. Có thể lựa chọn giữa cửa sổ trượt theo số lượng (count-based sliding window) và cửa sổ trượt theo thời gian (time-based sliding window). Cửa sổ trượt theo số lượng tổng hợp kết quả của N lần gọi gần nhất. Cửa sổ trượt theo thời gian tổng hợp kết quả của các lần gọi trong N giây gần nhất.
Count-based sliding window
Cửa sổ trượt theo số lượng được triển khai bằng một mảng vòng tròn gồm N phép đo. Nếu kích thước cửa sổ là 10, mảng vòng tròn sẽ luôn có 10 phép đo.
Cửa sổ trượt cập nhật tổng hợp toàn phần một cách gia tăng. Tổng hợp toàn phần được cập nhật khi một kết quả gọi mới được ghi nhận. Khi phép đo cũ nhất bị loại bỏ, giá trị của phép đo đó sẽ bị trừ khỏi tổng hợp toàn phần và vùng chứa (bucket) được đặt lại. (Chiến lược Subtract-on-Evict)
Thời gian truy xuất một Snapshot là hằng số O(1), vì Snapshot đã được tổng hợp sẵn và không phụ thuộc vào kích thước cửa sổ. Yêu cầu bộ nhớ của cách triển khai này là O(n).
Ví dụ sau CB sẽ tính toán ra tổng request là 10, số request fail là 5:
Bucket (N=10) | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 |
---|---|---|---|---|---|---|---|---|---|---|
Total Request | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 |
Total Fail | 1 | 0 | 0 | 1 | 0 | 1 | 1 | 1 | 0 | 0 |
==> Tỉ lệ lỗi = 50%
Time-based sliding window
Cửa sổ trượt theo thời gian được xây dựng bằng một mảng vòng tròn (circular array) gồm N bộ tổng hợp từng phần (partial aggregations), còn gọi là bucket.
Ví dụ: nếu cửa sổ thời gian là 10 giây, thì mảng vòng tròn này sẽ luôn có 10 bucket. Mỗi bucket dùng để tổng hợp kết quả của tất cả các lần gọi (call) xảy ra trong một giây cụ thể (epoch second). Bucket đầu tiên lưu kết quả của các lần gọi trong giây hiện tại, còn các bucket còn lại lưu kết quả của những giây trước đó.
Cửa sổ trượt này không lưu riêng từng lần gọi, mà sẽ cập nhật dần vào các bucket và một bộ tổng hợp chung (total aggregation). Mỗi khi có một kết quả gọi mới, hệ thống sẽ cộng dồn vào bucket tương ứng và tổng hợp chung. Khi một bucket cũ nhất bị loại bỏ (vì quá hạn), dữ liệu trong bucket đó sẽ được trừ ra khỏi tổng, rồi bucket được đặt lại. Cách xử lý này được gọi là Subtract-on-Evict (trừ khi loại bỏ).
Hiệu năng:
-
Thời gian để truy xuất một bản tổng hợp (Snapshot) luôn cố định: O(1) – rất nhanh, không phụ thuộc vào kích thước cửa sổ.
-
Bộ nhớ sử dụng gần như ổn định O(n) vì không lưu chi tiết từng lần gọi, chỉ lưu N bucket và 1 tổng hợp chung.
Cấu trúc của mỗi bucket gồm:
-
3 số nguyên (integer) để đếm:
- số lần gọi thất bại,
- số lần gọi chậm,
- và tổng số lần gọi.
-
1 số nguyên dài (long) để lưu tổng thời gian thực hiện của tất cả các lần gọi trong bucket đó.
Bucket (N=10) seconds Unit | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 |
---|---|---|---|---|---|---|---|---|---|---|
Total Request | 3 | 5 | 6 | 1 | 1 | 2 | 3 | 2 | 8 | 5 |
Total Fail | 1 | 2 | 3 | 2 | 0 | 1 | 1 | 1 | 0 | 1 |
Failure rate and slow call rate thresholds
Ngưỡng tỷ lệ lỗi và tỷ lệ cuộc gọi chậm
Trạng thái của CircuitBreaker chuyển từ CLOSED (ĐÓNG) sang OPEN (MỞ) khi tỷ lệ lỗi (failure rate) bằng hoặc vượt quá một ngưỡng cấu hình trước. Ví dụ, khi hơn 50% số lần gọi bị lỗi.
Mặc định, mọi exception (ngoại lệ) đều được tính là lỗi. Tuy nhiên, bạn có thể chỉ định danh sách các exception được tính là lỗi. Những exception không nằm trong danh sách đó sẽ được tính là thành công (success), trừ khi được bỏ qua.
Các exception cũng có thể được đánh dấu là bị bỏ qua (ignored) – tức là không tính là lỗi cũng không tính là thành công.
CircuitBreaker cũng sẽ chuyển từ CLOSED sang OPEN nếu tỷ lệ gọi chậm (slow call rate) vượt ngưỡng đã cấu hình. Ví dụ, khi hơn 50% số cuộc gọi mất hơn 5 giây để thực hiện.
Cơ chế này giúp giảm tải cho hệ thống bên ngoài trước khi nó rơi vào tình trạng không phản hồi.
Tỷ lệ lỗi và tỷ lệ gọi chậm chỉ được tính toán khi số lượng tối thiểu các cuộc gọi đã được ghi nhận. Ví dụ, nếu yêu cầu tối thiểu là 10 cuộc gọi, thì phải có ít nhất 10 cuộc gọi mới tính được tỷ lệ lỗi. Nếu chỉ có 9 cuộc gọi (dù tất cả đều lỗi), thì CircuitBreaker vẫn không chuyển sang trạng thái OPEN.
Khi ở trạng thái OPEN, CircuitBreaker sẽ từ chối các lần gọi với một exception là CallNotPermittedException.
Sau một khoảng thời gian chờ (wait time), trạng thái sẽ chuyển từ OPEN sang HALF_OPEN. Trong trạng thái này, CircuitBreaker cho phép một số lượng cuộc gọi nhất định để kiểm tra xem hệ thống backend đã phục hồi hay chưa.
Trong lúc đó, các cuộc gọi ngoài danh sách cho phép vẫn sẽ bị từ chối với CallNotPermittedException. Sau khi các cuộc gọi kiểm tra hoàn tất:
- Nếu tỷ lệ lỗi hoặc tỷ lệ gọi chậm vẫn vượt ngưỡng, CircuitBreaker sẽ chuyển lại sang OPEN.
- Nếu các tỷ lệ này thấp hơn ngưỡng, trạng thái sẽ chuyển về CLOSED.
CircuitBreaker hỗ trợ ba trạng thái đặc biệt:
- METRICS_ONLY: luôn cho phép truy cập, ghi nhận số liệu thống kê (metrics) như trạng thái CLOSED, nhưng không bao giờ chuyển sang OPEN.
- DISABLED: luôn cho phép truy cập, không ghi nhận bất kỳ số liệu nào.
- FORCED_OPEN: luôn từ chối truy cập, cũng không ghi nhận số liệu.
Ở hai trạng thái DISABLED và FORCEDOPEN, chỉ có sự kiện thay đổi trạng thái (state transition) được ghi nhận. Cách duy nhất để thoát khỏi hai trạng thái này là thay đổi trạng thái thủ công hoặc reset CircuitBreaker.
Đảm bảo an toàn luồng (thread-safe)
-
Trạng thái của CircuitBreaker được lưu trong AtomicReference.
-
CircuitBreaker sử dụng các thao tác atomic (bất khả phân) để cập nhật trạng thái, không có hiệu ứng phụ (side effect).
-
Việc ghi nhận kết quả gọi và đọc số liệu từ sliding window được đồng bộ hóa (synchronized). → Như vậy, tính nguyên tử (atomicity) được đảm bảo, chỉ một luồng có thể cập nhật trạng thái hoặc sliding window tại một thời điểm.
Tuy nhiên, hàm (function) mà bạn gọi không được đồng bộ hóa, tức là CircuitBreaker không can thiệp vào quá trình thực thi của hàm.
Lý do: nếu đồng bộ hóa cả hàm thì hiệu năng sẽ bị ảnh hưởng nghiêm trọng – một cuộc gọi chậm sẽ gây ra tắc nghẽn toàn hệ thống.
Ví dụ: nếu có 20 luồng đồng thời yêu cầu gọi hàm, và CircuitBreaker đang ở trạng thái CLOSED, tất cả các luồng đều được phép gọi hàm, dù sliding window chỉ chứa tối đa 15 lần gọi gần nhất. Cửa sổ trượt không giới hạn số luồng chạy song song, nó chỉ dùng để thống kê.
Nếu bạn muốn giới hạn số luồng chạy đồng thời, hãy dùng Bulkhead – một cơ chế riêng để kiểm soát luồng. Bạn có thể kết hợp Bulkhead với CircuitBreaker để đạt hiệu quả tối ưu.
Create a CircuitBreakerRegistry
Resilience4j cung cấp một CircuitBreakerRegistry lưu trữ trong bộ nhớ (in-memory), được xây dựng dựa trên ConcurrentHashMap, đảm bảo an toàn luồng (thread safety) và tính nguyên tử (atomicity).
Bạn có thể sử dụng CircuitBreakerRegistry để quản lý các instance của CircuitBreaker – tức là tạo mới hoặc truy xuất lại các CircuitBreaker đã có.
Bạn có thể khởi tạo một CircuitBreakerRegistry với một cấu hình mặc định (CircuitBreakerConfig) áp dụng cho tất cả các CircuitBreaker trong hệ thống của bạn như sau:
CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.ofDefaults();
Create and configure a CircuitBreaker
Bạn có thể cung cấp CircuitBreakerConfig tùy chỉnh của riêng mình để sử dụng làm cấu hình toàn cục (global default). Để tạo một CircuitBreakerConfig tùy chỉnh, bạn có thể sử dụng builder của CircuitBreakerConfig.
Config property | Default Value | Description |
---|---|---|
failureRateThreshold | 50 | Cấu hình ngưỡng tỷ lệ lỗi (failure rate threshold) theo phần trăm. Khi tỷ lệ lỗi bằng hoặc vượt quá ngưỡng này, CircuitBreaker sẽ chuyển sang trạng thái OPEN và bắt đầu ngắt mạch (short-circuit) các cuộc gọi – tức là từ chối thực hiện các lệnh gọi tiếp theo. |
slowCallRateThreshold | 100 | Cấu hình ngưỡng tỷ lệ phần trăm cho các cuộc gọi chậm (slow calls). CircuitBreaker sẽ coi một cuộc gọi là chậm nếu thời gian thực thi của cuộc gọi vượt quá slowCallDurationThreshold (ngưỡng thời gian cho phép của một cuộc gọi). Khi tỷ lệ cuộc gọi chậm bằng hoặc vượt quá ngưỡng này, CircuitBreaker sẽ chuyển sang trạng thái OPEN và bắt đầu ngắt mạch, tức là từ chối thực hiện các cuộc gọi tiếp theo. |
slowCallDurationThreshold | 60000 [ms] | Cấu hình ngưỡng thời gian (duration threshold) – nếu một cuộc gọi có thời gian thực thi vượt quá ngưỡng này, cuộc gọi đó sẽ được coi là chậm (slow call) và sẽ làm tăng tỷ lệ cuộc gọi chậm trong thống kê của CircuitBreaker. |
permittedNumberOfCalls InHalfOpenState | 10 | Cấu hình số lượng cuộc gọi được phép thực hiện khi CircuitBreaker ở trạng thái HALF_OPEN. |
maxWaitDurationInHalfOpenState | 0 [ms] | Cấu hình khoảng thời gian chờ tối đa (maximum wait duration) – đây là giới hạn thời gian lâu nhất mà CircuitBreaker có thể duy trì ở trạng thái HALF_OPEN trước khi tự động chuyển sang trạng thái OPEN. Nếu giá trị được cấu hình là 0, điều đó có nghĩa là CircuitBreaker sẽ chờ vô thời hạn trong trạng thái HALF_OPEN cho đến khi tất cả các cuộc gọi được phép (permitted calls) hoàn tất. |
slidingWindowType | COUNT_BASED | Nếu sliding window (cửa sổ trượt) được cấu hình là COUNT_BASED, thì số lượng cuộc gọi cuối cùng tương ứng với slidingWindowSize sẽ được ghi nhận và tổng hợp. Nếu sliding window được cấu hình là TIME_BASED, thì các cuộc gọi trong khoảng thời gian cuối cùng (tính theo giây) tương ứng với slidingWindowSize sẽ được ghi nhận và tổng hợp. |
slidingWindowSize | 100 | Cấu hình kích thước của sliding window (cửa sổ trượt) – được sử dụng để ghi nhận kết quả của các cuộc gọi khi CircuitBreaker đang ở trạng thái CLOSED. |
minimumNumberOfCalls | 100 | Cấu hình số lượng cuộc gọi tối thiểu cần ghi nhận (trong mỗi chu kỳ của sliding window) trước khi CircuitBreaker có thể tính toán tỷ lệ lỗi hoặc tỷ lệ cuộc gọi chậm. Ví dụ: nếu minimumNumberOfCalls được đặt là 10, thì phải có ít nhất 10 cuộc gọi được ghi nhận thì CircuitBreaker mới bắt đầu tính toán tỷ lệ lỗi. Nếu chỉ mới ghi nhận 9 cuộc gọi, dù tất cả đều thất bại, thì CircuitBreaker vẫn không chuyển sang trạng thái OPEN. |
waitDurationInOpenState | 60000 [ms] | Khoảng thời gian mà CircuitBreaker sẽ chờ trước khi chuyển trạng thái từ OPEN sang HALF_OPEN. |
automaticTransition FromOpenToHalfOpenEnabled | false | Nếu được đặt là true, điều đó có nghĩa là CircuitBreaker sẽ tự động chuyển trạng thái từ OPEN sang HALF_OPEN mà không cần có cuộc gọi nào kích hoạt việc chuyển trạng thái. Một luồng (thread) sẽ được tạo ra để giám sát tất cả các instance của CircuitBreaker, và sẽ chuyển chúng sang trạng thái HALF_OPEN khi thời gian waitDurationInOpenState đã trôi qua. Ngược lại, nếu được đặt là false, thì việc chuyển sang HALF_OPEN chỉ xảy ra khi có một cuộc gọi được thực hiện, ngay cả khi thời gian waitDurationInOpenState đã hết. Ưu điểm trong trường hợp này là không cần luồng nào giám sát trạng thái của tất cả các CircuitBreaker |
recordExceptions | empty | Danh sách các exception sẽ được ghi nhận là thất bại và do đó làm tăng tỷ lệ lỗi. Bất kỳ exception nào khớp hoặc kế thừa từ một trong các exception trong danh sách này đều được tính là lỗi, trừ khi được khai báo rõ ràng trong ignoreExceptions để bỏ qua. Nếu bạn chỉ định một danh sách exception, mọi exception khác sẽ được xem là thành công, trừ khi chúng cũng được liệt kê trong ignoreExceptions để bỏ qua hoàn toàn. |
ignoreExceptions | empty | Danh sách các exception được bỏ qua – không được tính là lỗi cũng không được tính là thành công. Bất kỳ exception nào khớp hoặc kế thừa từ một trong các exception trong danh sách này sẽ không ảnh hưởng đến thống kê, ngay cả khi exception đó có nằm trong danh sách recordExceptions. |
recordFailurePredicate | throwable -> true By default all exceptions are recorded as failures. |
Một Predicate tùy chỉnh dùng để xác định liệu một exception có nên được ghi nhận là lỗi hay không. - Predicate phải trả về true nếu exception được tính là lỗi. - Predicate phải trả về false nếu exception được tính là thành công, trừ khi exception đó đã được chỉ định trong ignoreExceptions để bỏ qua. |
ignoreExceptionPredicate | throwable -> false By default no exception is ignored. |
Một Predicate tùy chỉnh dùng để xác định liệu một exception có nên bị bỏ qua hay không, tức không được tính là lỗi cũng không được tính là thành công. - Nếu Predicate trả về true: exception sẽ bị bỏ qua. - Nếu Predicate trả về false: exception sẽ được tính là lỗi. |
// Create a custom configuration for a CircuitBreaker
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom() .failureRateThreshold(50) .slowCallRateThreshold(50) .waitDurationInOpenState(Duration.ofMillis(1000)) .slowCallDurationThreshold(Duration.ofSeconds(2)) .permittedNumberOfCallsInHalfOpenState(3) .minimumNumberOfCalls(10) .slidingWindowType(SlidingWindowType.TIME_BASED) .slidingWindowSize(5) .recordException(e -> INTERNAL_SERVER_ERROR .equals(getResponse().getStatus())) .recordExceptions(IOException.class, TimeoutException.class) .ignoreExceptions(BusinessException.class, OtherBusinessException.class) .build(); // Create a CircuitBreakerRegistry with a custom global configuration
CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.of(circuitBreakerConfig); // Get or create a CircuitBreaker from the CircuitBreakerRegistry
// with the global default configuration
CircuitBreaker circuitBreakerWithDefaultConfig = circuitBreakerRegistry.circuitBreaker("name1"); // Get or create a CircuitBreaker from the CircuitBreakerRegistry
// with a custom configuration
CircuitBreaker circuitBreakerWithCustomConfig = circuitBreakerRegistry .circuitBreaker("name2", circuitBreakerConfig);
Bạn có thể thêm các cấu hình có thể được dùng chung bởi nhiều instance của CircuitBreaker.
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom() .failureRateThreshold(70) .build(); circuitBreakerRegistry.addConfiguration("someSharedConfig", config); CircuitBreaker circuitBreaker = circuitBreakerRegistry .circuitBreaker("name", "someSharedConfig");
Bạn có thể ghi đè (overwrite) các cấu hình.
CircuitBreakerConfig defaultConfig = circuitBreakerRegistry .getDefaultConfig(); CircuitBreakerConfig overwrittenConfig = CircuitBreakerConfig .from(defaultConfig) .waitDurationInOpenState(Duration.ofSeconds(20)) .build();
Nếu bạn không muốn sử dụng CircuitBreakerRegistry để quản lý các instance của CircuitBreaker, bạn cũng có thể tạo các instance trực tiếp.
// Create a custom configuration for a CircuitBreaker
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom() .recordExceptions(IOException.class, TimeoutException.class) .ignoreExceptions(BusinessException.class, OtherBusinessException.class) .build(); CircuitBreaker customCircuitBreaker = CircuitBreaker .of("testName", circuitBreakerConfig);
Ngoài ra, bạn cũng có thể tạo CircuitBreakerRegistry bằng cách sử dụng các phương thức builder.
Map <String, String> circuitBreakerTags = Map.of("key1", "value1", "key2", "value2"); CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.custom() .withCircuitBreakerConfig(CircuitBreakerConfig.ofDefaults()) .addRegistryEventConsumer(new RegistryEventConsumer() { @Override public void onEntryAddedEvent(EntryAddedEvent entryAddedEvent) { // implementation } @Override public void onEntryRemovedEvent(EntryRemovedEvent entryRemoveEvent) { // implementation } @Override public void onEntryReplacedEvent(EntryReplacedEvent entryReplacedEvent) { // implementation } }) .withTags(circuitBreakerTags) .build(); CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker("testName");
Nếu bạn muốn tích hợp (plug in) implementation riêng của mình cho Registry, bạn có thể cung cấp một implementation tùy chỉnh của interface RegistryStore và tích hợp nó thông qua phương thức builder.
CircuitBreakerRegistry registry = CircuitBreakerRegistry.custom() .withRegistryStore(new YourRegistryStoreImplementation()) .withCircuitBreakerConfig(CircuitBreakerConfig.ofDefaults()) .build();
Decorate and execute a functional interface
Bạn có thể bọc (decorate) bất kỳ Callable, Supplier, Runnable, Consumer, CheckedRunnable, CheckedSupplier, CheckedConsumer hoặc CompletionStage nào với một CircuitBreaker.
Bạn có thể gọi hàm đã được bọc bằng Try.of(…) hoặc Try.run(…) từ thư viện Vavr. Việc này cho phép xâu chuỗi (chain) các hàm tiếp theo với map, flatMap, filter, recover hoặc andThen. Các hàm được xâu chuỗi này chỉ được gọi khi CircuitBreaker đang ở trạng thái CLOSED hoặc HALF_OPEN.
Ví dụ sau đây, Try.of(…) sẽ trả về một Success< String > (Monad) nếu lời gọi hàm thành công. Nếu hàm ném ra exception, một Failure < Throwable> sẽ được trả về và hàm map sẽ không được gọi.
// Given
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("testName"); // When I decorate my function
CheckedFunction0<String> decoratedSupplier = CircuitBreaker .decorateCheckedSupplier(circuitBreaker, () -> "This can be any method which returns: 'Hello"); // and chain an other function with map
Try<String> result = Try.of(decoratedSupplier) .map(value -> value + " world'"); // Then the Try Monad returns a Success<String>, if all functions ran successfully.
assertThat(result.isSuccess()).isTrue();
assertThat(result.get()).isEqualTo("This can be any method which returns: 'Hello world'");
Consume emitted RegistryEvents
Bạn có thể đăng ký một event consumer trên CircuitBreakerRegistry để thực hiện hành động mỗi khi một CircuitBreaker được tạo mới, thay thế hoặc xóa.
CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.ofDefaults();
circuitBreakerRegistry.getEventPublisher() .onEntryAdded(entryAddedEvent -> { CircuitBreaker addedCircuitBreaker = entryAddedEvent.getAddedEntry(); LOG.info("CircuitBreaker {} added", addedCircuitBreaker.getName()); }) .onEntryRemoved(entryRemovedEvent -> { CircuitBreaker removedCircuitBreaker = entryRemovedEvent.getRemovedEntry(); LOG.info("CircuitBreaker {} removed", removedCircuitBreaker.getName()); });
Consume emitted CircuitBreakerEvents
Một CircuitBreakerEvent có thể là:
- một chuyển đổi trạng thái (state transition),
- một lần reset CircuitBreaker,
- một cuộc gọi thành công,
- một lỗi được ghi nhận,
- hoặc một lỗi bị bỏ qua.
Tất cả các sự kiện này đều chứa thông tin bổ sung như thời gian tạo sự kiện và thời gian xử lý cuộc gọi.
Nếu bạn muốn lắng nghe và xử lý các sự kiện này, bạn cần phải đăng ký một event consumer.
circuitBreaker.getEventPublisher() .onSuccess(event -> logger.info(...)) .onError(event -> logger.info(...)) .onIgnoredError(event -> logger.info(...)) .onReset(event -> logger.info(...)) .onStateTransition(event -> logger.info(...));
// Or if you want to register a consumer listening
// to all events, you can do:
circuitBreaker.getEventPublisher() .onEvent(event -> logger.info(...));
Bạn có thể sử dụng CircularEventConsumer để lưu trữ các sự kiện (events) trong một bộ đệm vòng (circular buffer) với dung lượng cố định.
CircularEventConsumer<CircuitBreakerEvent> ringBuffer = new CircularEventConsumer<>(10);
circuitBreaker.getEventPublisher().onEvent(ringBuffer);
List<CircuitBreakerEvent> bufferedEvents = ringBuffer.getBufferedEvents()
Bạn có thể sử dụng RxJava hoặc RxJava2 Adapters để chuyển đổi EventPublisher thành một Reactive Stream.
Override the RegistryStore
Bạn có thể ghi đè RegistryStore mặc định (lưu trữ trong bộ nhớ) bằng một implement tùy chỉnh. Ví dụ: nếu bạn muốn sử dụng một bộ nhớ đệm (cache) có khả năng tự động loại bỏ các instance không còn được sử dụng sau một khoảng thời gian nhất định.
CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.custom() .withRegistryStore(new CacheCircuitBreakerRegistryStore()) .build();
Có thể tham khảo để dựng tại: https://resilience4j.readme.io/docs/examples
public class CircuitBreakerProvider { private static CircuitBreakerProvider cbInstance; private Set<String> checkCbExist = new HashSet<>(); private CircuitBreakerRegistry circuitBreakerRegistry; private static String sLockCBProvider = "LOCK"; private static String sLockCBEvent = "LOCK"; public static CircuitBreakerProvider getInstance() { // Tạo instance duy nhât để tạo CB với cấu hình if (cbInstance == null) { synchronized (sLockCBProvider) { if (cbInstance == null) { AppLog.info("Get new CircuitBreakerProvider"); cbInstance = new CircuitBreakerProvider(); } } } return cbInstance; } private CircuitBreakerProvider() { CircuitBreakerConfig config = CircuitBreakerConfig .custom() .slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED) .minimumNumberOfCalls(Constant.CB_MINIMUM_CALL) .slidingWindowSize(Constant.CB_SLIDE_WINDOW_SIZE) .failureRateThreshold(Constant.CB_FAILURE_RATE_THRESHOLD) .slowCallRateThreshold(Constant.CB_SLOW_CALL_RATE_THRESHOLD) .permittedNumberOfCallsInHalfOpenState(Constant.CB_PERMITTED_NUMBER_OF_CALL_HALF_OPEN_STATE) .waitDurationInOpenState(Duration.ofMillis(Constant.CB_WAIT_DURATION_IN_OPEN_STATE)) .slowCallDurationThreshold(Duration.ofMillis(Constant.CB_SLOW_CALL_DURATION_THRESHOLD)) .recordExceptions(CircuitBreakerException.class) .writableStackTraceEnabled(false) .build(); circuitBreakerRegistry = CircuitBreakerRegistry.of(config); } // Hàm get CB nếu chưa có thì sẽ tạo mới CB cho "name", ngược lại CB sẽ được tạo mới cho name private CircuitBreaker registry(String name) { CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker(name); if (!checkCbExist.contains(name)) { // Phần này nhằm mục đích tạo event duy nhất cho CB, nếu không có thể bị log duplicat AppLog.info("Get new circuitBreaker and set event"); synchronized(sLockCBEvent) { if (!checkCbExist.contains(name)) { AppLog.info("Get new circuitBreaker and set event"); circuitBreaker.getEventPublisher().onStateTransition(e -> AppLog.error("State change for " + name + ": " + e.toString())); circuitBreaker.getEventPublisher().onError(e -> AppLog.error("Error for " + name + ": " + e.toString())); circuitBreaker.getEventPublisher().onSlowCallRateExceeded(e -> AppLog.error("Slow call for " + name + ": " + e.toString())); checkCbExist.add(name); } } } return circuitBreaker; } // Các hàm execute này nhằm mục đích đặt logic thực hiện ở Supplier vào trong CB, url truyền vào sẽ là key của CB, tùy theo logic mà resposse trả lại sẽ khác nhau, trong code hiện tại // chủ yều phục vụ gọi sang ms, logic trả về String, còn đối với grcp thường trả về object. Có thể thay đổi output để phục vụ các nghiệp vụ khác nhau. // Các hàm này sẽ lấy respone từ *FallBackHandler khi có lỗi xảy ra từ logic hoặc là khi CB dựng cờ. public Object executeGrpc(String url, String requestId, Supplier msSupplier) { CircuitBreaker circuitBreaker = registry(url); Supplier decorated = Decorators .ofSupplier(msSupplier) .withCircuitBreaker(circuitBreaker) .withFallback(e -> circuitBreakerGrpcFallBackHandler(url, requestId + " - " + e.toString())) .decorate(); return decorated.get(); } public String execute(String url, String requestId, Supplier<String> msSupplier) { CircuitBreaker circuitBreaker = registry(url); Supplier<String> decorated = Decorators .ofSupplier(msSupplier) .withCircuitBreaker(circuitBreaker) .withFallback(e -> circuitBreakerFallBackHandler(url, requestId + " - " + e.toString())) .decorate(); return decorated.get(); } public static void closeConnectAndThrowException(ManagedChannel channel, String errorMessage) { if (Constant.CB_METHOD_ENABLE.equals("Y")) { if (channel != null) { channel.shutdown(); } throw new CircuitBreakerException(errorMessage); } } // Hàm này wrap lại các exception để CB bắt các exception đó. Cờ CB_METHOD_ENABLE mục đích cho bật tắt CB public static void closeConnectAndThrowException(int responseCode, String responseData, HttpURLConnection conn, OutputStream output, InputStream input) { if (Constant.CB_METHOD_ENABLE.equals("Y")) { if (output != null) { try { output.close(); } catch (IOException ex) { AppLog.error(ex); } } if (input != null) { try { input.close(); } catch (IOException ex) { AppLog.error(ex); } } if (conn != null) { conn.disconnect(); } throw new CircuitBreakerException("ResponseCode=" + responseCode + ": " + responseData); } } private String mapToJson(Object obj) throws Exception { ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); return mapper.writeValueAsString(obj); } public String circuitBreakerFallBackHandler(String cbName, String errorMessage) { AppLog.error("CircuitBreaker " + cbName + " - " + errorMessage); String responseMessage = StringEscapeUtils.unescapeJava("Giao d\\u1ECBch ch\\u01B0a ho\\u00E0n t\\u1EA5t. B\\u1EA1n vui l\\u00F2ng th\\u1EF1c hi\\u1EC7n l\\u1EA1i sau \\u00EDt ph\\u00FAt"); BaseResponse baseResponse = new BaseResponse(); baseResponse.setStatus("503"); baseResponse.setSoaErrorCode(ResponseCode.CIRCUIT_BREAKER); baseResponse.setSoaErrorDesc(responseMessage); baseResponse.setMessage(responseMessage); try { return mapToJson(baseResponse); } catch (Exception e) { AppLog.error(e.toString()); return null; } } public BaseResponse circuitBreakerGrpcFallBackHandler(String cbName, String errorMessage) { AppLog.error("CircuitBreaker " + cbName + " - " + errorMessage); String responseMessage = StringEscapeUtils.unescapeJava("Giao d\\u1ECBch ch\\u01B0a ho\\u00E0n t\\u1EA5t. B\\u1EA1n vui l\\u00F2ng th\\u1EF1c hi\\u1EC7n l\\u1EA1i sau \\u00EDt ph\\u00FAt"); BaseResponse baseResponse = new BaseResponse(); baseResponse.setStatus("503"); baseResponse.setSoaErrorCode(ResponseCode.CIRCUIT_BREAKER_GRPC); baseResponse.setSoaErrorDesc(responseMessage); baseResponse.setMessage(responseMessage); return baseResponse; } }
Sử dụng các hàm trên thực hiện với các logic gọi vào bên trong:
public String sendRequest(Map<String, String> input, MicroserviceConstantUrl urlPath, String requestId, String clientMessageId, String transactionId, Cust cust, String ipMicroservice) { // Tạo supplier cho logic Supplier<String> msSupplier = () -> this.sendRequestLogic(input, urlPath, requestId, clientMessageId, transactionId, cust, ipMicroservice); return CircuitBreakerProvider.getInstance().execute(ipMicroservice + urlPath.getUrl(), requestId, msSupplier); // Thực hiện logic với CB
}
Về mặt cấu hình, hiện tại phần cấu hình cho CB đang được apply cho tất cả logic (phần tách riêng sẽ phát triển sau), có các tham số với các giá trị default sau
CIRCUIT BREAKER
CB_MINIMUM_CALL=100
CB_SLIDE_WINDOW_SIZE=100
CB_FAILURE_RATE_THRESHOLD=20
CB_SLOW_CALL_RATE_THRESHOLD=20
CB_SLOW_CALL_DURATION_THRESHOLD=120000
CB_PERMITTED_NUMBER_OF_CALL_HALF_OPEN_STATE=10
CB_WAIT_DURATION_IN_OPEN_STATE=60000
- CB_MINIMUM_CALL=100 Số lượng request tối thiểu được CB ghi lại trước khi tính ngưỡng lỗi.
- CB_SLIDE_WINDOW_SIZE=100 Cửa sổ đo - số lượng request từ thời điểm hiện tại trở về trước mà CB dùng để kiểm tra trạng thái hiện tại của nó.
- CB_FAILURE_RATE_THRESHOLD=20 Tỉ lệ lỗi, nếu như số lượng lỗi lớn hơn hoặc bằng ngưỡng này, thì CB sẽ cho là hệ thống đích đang có lỗi và chuyển sang trạng thái OPEN. Với cấu hình hiện tại, sẽ có nghĩa là từ thời điểm hiện tại trở về 100 request trước đó, nếu như lớn hơn hoặc bằng 20 request có lỗi CB sẽ chuyển qua trạng thái OPEN và không cho phép bất cứ request nào gọi sang hệ thống đích trong một khoảng thời gian được cấu hình.
- CB_SLOW_CALL_RATE_THRESHOLD=20 Tương tự là tỉ lệ lỗi nhưng đối với trường hợp các request là slow call
- CB_SLOW_CALL_DURATION_THRESHOLD=120000 Ngưỡng khi gọi một request được coi là chậm (120s), phần này với các service hiện tại có thể bỏ qua do đã có cơ chế timeout request. Khi timeout xảy ra, thì CB sẽ ghi nhận như trường hợp lỗi bên trên.
- CB_PERMITTED_NUMBER_OF_CALL_HALF_OPEN_STATE=10 Số lượng request được thực hiện ở trạng thái Half-Open, nếu số request nhiều hơn, thì request thừa sẽ bị reject không cho phép gọi vào hệ thống đích. Ở trạng thái này nếu thực hiện request không có lỗi thì CB sẽ chuyển về trạng thái bình thường (CLOSE), còn nếu lỗi sẽ chuyển lại trạng thái Open. Việc này nhằm giúp kiểm tra hệ thống đích còn lỗi hay không trước khi mở cho toàn bộ request đẩy vào
- CB_WAIT_DURATION_IN_OPEN_STATE=60000 Thời gian chờ ở trạng thái OPEN. Ở trạng thái này bất cứ request nào đến sẽ không được phép gọi sang hệ thống khác, mà trả lại lỗi ngay. Trạng thái này sẽ giữ trong khoảng thời gian waitDuration này.
2. RateLimitter
Rate Limiter là cơ chế hạn chế số lượng request gửi (rate) đến hệ thống trong một khoảng thời gian nào đó. Giả sử như hệ thống thường chỉ xử lý được một trăm request/s chẳng hạn trong khi có đến hàng nghìn request được gửi đến, thì hệ thống sẽ bị quá tải không xử lý hết tất cả các request. Để giải quyết được vấn đề này thì cơ chế RateLimiter đã ra đời. Mục đích của nó chỉ cho phép nhận 1 số lượng request nhất định trong 1 đơn vị thời gian nào đó. Nếu quá sẽ trả về response lỗi. Cơ chế này nhằm mục đích bảo vệ hệ thống xử lý tránh bị quá tải:
1 số ví dụ hay gặp trong Rate Limiting như:
Giới hạn số lượng request đến một API trong một khoảng thời gian nào đó Mỗi địa chỉ IP chỉ có thể tạo được 3 account trong 1 ngày. Mỗi người dùng chúng ta chỉ cho phép gửi 200 request/s. Nếu vượt quá thì sẽ trả về response lỗi. Mỗi người dùng chỉ cho phép nhập sai thẻ credit 3 lần trong 1 ngày ...
Hiện tại sẽ chỉ tập trung sử dụng cho việc giới hạn số lượng request đến API, sử dụng bộ thư viện của Resilience4j. Cách làm tương tự như phần CB:
public class RateLimitProvider { private static RateLimitProvider rateLimitInstance; private Set<String> rateLimitSet = new HashSet<>(); private RateLimiterRegistry registry; private static String sLockRateLimitProvider = "LOCK"; private static String sLockRateLimitEvent = "LOCK"; public static RateLimitProvider getInstance() { // Tạo instance duy nhất để quản lý các RateLimitter với cấu hình default if (rateLimitInstance == null) { synchronized(sLockRateLimitProvider) { if (rateLimitInstance == null) { AppLog.info("Get new RateLimitProvider"); rateLimitInstance = new RateLimitProvider(); } } } return rateLimitInstance; } private RateLimitProvider() { registry = RateLimiterRegistry.of(RateLimiterConfig.ofDefaults()); } // Thực hiện lấy hoặc tạo mới RateLimiter với name và cấu hình. private RateLimiter registry(String name, RateLimiterConfig rateLimiterConfig) { RateLimiter rateLimiter = registry.rateLimiter(name, rateLimiterConfig); if (!rateLimitSet.contains(name)) { synchronized(sLockRateLimitEvent) { //Đảm bảo RateLimiter được tạo với một event duy nhất tương ứng cho việc ghi log. if (!rateLimitSet.contains(name)) { AppLog.info("Get new rateLimiter and set event"); rateLimiter.getEventPublisher().onFailure(e -> AppLog.error("RateLimiter onFailure " + name + ": " + e)); rateLimitSet.add(name); } } } return rateLimiter; } // Hàm wrapper logic cần limit với name tương ứng. Hàm bên dưới app cho phần validate ở tầng Controller ngay đầu vào. Với response là Result. public Result executeValidateRequest(Supplier<Result> resultSupplier, String name) { if (!Constant.rateLimiterConfig.containsKey(name)) { // Kiểm tra nếu như được cấu hình thì mới apply RateLimiter return resultSupplier.get(); } RateLimiter rateLimiter = registry(name, Constant.rateLimiterConfig.get(name)); Supplier<Result> decorateSupplier = RateLimiter.decorateSupplier(rateLimiter, resultSupplier); try { return decorateSupplier.get(); } catch (RequestNotPermitted e) { return new SimpleResult(e.getMessage(), false, ResponseCode.RATE_LIMIT_REQUEST_NOT_PERMIT); } }
}
Thực hiện với phần validate cần limit bằng cách thêm logic cho RateLimit trong hàm validate (ở đây dùng cách wrap lại để không cần chỉnh sửa nhiều):
public Result validate (BaseRequest request) { String uri = httpRequest.getRequestURI().replace("/", ".").toUpperCase(); uri = uri.substring(uri.indexOf('.', uri.indexOf(".") + 1)); // Phần supplier chỉ cần tạo logic đơn giản là ()->Result.OK, Bình thường khi chưa đủ ngưỡng thì hàm executeValidateRequest sẽ trả lại Result là OK và thực hiện logic (doValidate(request)) bình thường. // Khi RateLimit vượt ngưỡng cấu hình thì result sẽ trả ra object lỗi từ bên trong object RateLimitProvider. Result result = RateLimitProvider.getInstance().executeValidateRequest(()->Result.OK, uri); if (!result.isOk()) { return result; } return doValidate(request);
}
Về mặt cấu hình
#Dưới đây là cấu hình Ratelimiter cho api /accountms/getbalance chỉ cho phép thực hiện 10 request /1000ms. Các request khác nếu đã có hơn 10 Request, thì sẽ chờ 100ms nếu như các request trước đó đã xong thì sẽ được phép thực hiện, trái lại sẽ trả ra lỗi. Như ví dụ trên chỉ cấu hình trả lại là Result.OK. thì phần timeout này cần phải set về 0 hoặc không set.
RATELIMITER.ACCOUNTMS.GETBALANCE.LIMIT_PERIOD=10
RATELIMITER.ACCOUNTMS.GETBALANCE.LIMIT_REFRESH_PERIOD=1000
RATELIMITER.ACCOUNTMS.GETBALANCE.LIMIT_TIMEOUT_DUARATION=100
3. Bulkhead
Bulkhead là một design pattern, cũng giống như RateLimit dùng để bảo vệ đầu server tránh khỏi việc bị quá tải bằng cách hạn chế số lượng request đồng thời (concurrent), nghĩa là tại một thời điểm server sẽ chỉ phục vụ một số lượng request nào đó. Nếu như số lượng request vượt ngưỡng thì sẽ trả lại lỗi ngay lập tức và không xử lý request đó. Khác với limit là sẽ hạn chế số request trong một khoảng thời gian, thì bulkhead sẽ hạn chế request đồng thời.
Tương tự như Ratelimiter, sẽ sử dụng Bulkhead để hạn số lượng request đến API tại một thời điểm, sử dụng bộ thư viện của Resilience4j:
public class BulkheadProvider { private static BulkheadProvider bulkheadInstance; private Set<String> bulkheadSet = new HashSet<>(); private BulkheadRegistry registry; private static String sLockBulkheadProvider = "LOCK"; private static String sLockBulkheadEvent = "LOCK"; public static BulkheadProvider getInstance() { // Tạo instance duy nhất để quản lý các Bulkhead với cấu hình default if (bulkheadInstance == null) { synchronized(sLockBulkheadProvider) { if (bulkheadInstance == null) { AppLog.info("Get new BulkheadProvider"); bulkheadInstance = new BulkheadProvider(); } } } return bulkheadInstance; } private BulkheadProvider() { registry = BulkheadRegistry.of(BulkheadConfig.ofDefaults()); } private Bulkhead registry(String name, BulkheadConfig config) { Bulkhead bulkhead = registry.bulkhead(name, config); if (!bulkheadSet.contains(name)) { synchronized(sLockBulkheadEvent) { //Đảm bảo Bulkhead được tạo với một event duy nhất tương ứng cho việc ghi log. if (!bulkheadSet.contains(name)) { AppLog.info("Get new bulkhead and set event"); bulkhead.getEventPublisher().onCallRejected(e -> AppLog.error("BulkHead Call Rejected " + name + ": " + e)); bulkheadSet.add(name); } } } return bulkhead; } // Hàm wrapper logic cần Thực hiện bulk head với name tương ứng. Hàm bên dưới app cho phần validate ở tầng Controller ngay đầu vào. Với response là Result: public Result executeValidateRequest(Supplier<Result> resultSupplier, String name) { if (!Constant.bulkHeadConfig.containsKey(name)) { return resultSupplier.get(); } Bulkhead bulkhead = registry(name, Constant.bulkHeadConfig.get(name)); Supplier<Result> decorateSupplier = Bulkhead.decorateSupplier(bulkhead, resultSupplier); try { return decorateSupplier.get(); } catch (BulkheadFullException e) { return new SimpleResult(e.getMessage(), false, ResponseCode.BULK_HEAD_FULL); } }
}
Apply vào việc validate:
public Result validate (BaseRequest request) { String uri = httpRequest.getRequestURI().replace("/", ".").toUpperCase(); uri = uri.substring(uri.indexOf('.', uri.indexOf(".") + 1)); // Phần supplier chỉ cần tạo logic đơn giản là ()->Result.OK, Bình thường khi chưa đủ ngưỡng thì hàm executeValidateRequest sẽ trả lại Result là OK và thực hiện logic (doValidate(request)) bình thường. // Khi RateLimit vượt ngưỡng cấu hình thì result sẽ trả ra object lỗi từ bên trong object Bulkhead. Result result = BulkheadProvider.getInstance().executeValidateRequest(()->Result.OK, uri); if (!result.isOk()) { return result; } return doValidate(request);
}
Cấu hình:
#Dưới đây là cấu hình Bulkhead cho api /accountms/getbalance chỉ cho phép thực hiện 10 request được xử ý đồng thời, khi đang có hơn 10 Request được xử lý, thì các request khác sẽ chờ timeout=100ms cho đến khi các request trước đó thực hiện xong, ngược lại sẽ trả ra lỗi. Như ví dụ trên chỉ cấu hình trả lại là Result.OK, thì phần waitime này cần phải set về 0 hoặc không set.
BULKHEAD.ACCOUNTMS.GETBALANCE.MAX_CONCURRENT=10
BULKHEAD.ACCOUNTMS.GETBALANCE.WAIT_TIME=100