Với framework gRPC, chúng ta có thể gửi nhiều messages giữa Client và Server thông qua một kết nối TCP duy nhất. Nó được gọi là Multiplexing.
Trong gRPC client streaming
, Client có thể gửi nhiều request đến Server. Sau khi Client xác nhận rằng nó đã gửi tất cả các request, Server sẽ gửi lại một response duy nhất cho Client.
Một case study ví dụ như chức năng upload file, trong đó Client upload một file lớn bằng cách chia thành nhiều phần nhỏ.
Sample Application
Để mọi thứ đơn giản, chúng ta sẽ giả sử rằng Client gửi nhiều số đến Server, Server sẽ gửi kết quả là tổng của tất cả các số khi Client thực hiện xong tất cả các request của nó.
Protobuf – Service Definition
Khi biết nghiệp vụ, request và response cần là gì, chúng ta sẽ định nghĩa service xử lý nghiệp vụ cho chúng. Phương thức sumAll được implements ở phía Server nhận kiểu dữ liệu đầu vào và trả kiểu dữ liệu đầu ra theo như mong đợi. Chúng ta sử dụng từ khóa stream để chỉ ra rằng đó sẽ là client side streaming request
. File .proto định nghĩa service trong ví dụ này như sau:
syntax = "proto3"; package calculator; option java_package = "example.calculator";
option java_multiple_files = true; message NumberRequest { int32 number = 1;
} message NumberResponse { int64 result = 1;
} service CalculatorService { // client stream rpc sumAll(stream NumberRequest) returns (NumberResponse) {};
}
Khi chúng ta chạy lệnh maven dưới đây, maven sẽ tự động tạo code cho client application và server application bằng công cụ protoc.
mvn clean compile
Class CalculatorServiceImplBase là abstract class được tạo tự động khi gen code cần được phía Server implements. Tương tự CalculatorServiceStub là class được tạo tự động khi gen code mà phía client application sử dụng để gửi yêu cầu đến server.
gRPC Client Streaming – Server Side
Service Implementation: class này kế thừa abstract class CalculatorServiceImplBase để implement phương thức sumAll (triển khai nghiệp vụ) và phản hồi lại cho request gọi phương thức sumAll ở phía Client. Khi Client gọi phương thức onCompleted
, nó sẽ thông báo cho Server rằng nó đã gửi hết request. Lúc đó Server sẽ thực hiện gửi lại response.
- Success response: Server tính tổng cho tất cả các số mà Client gửi đến và response lại bằng cách sử dụng
responseObserver
. Dữ liệu trả về thông qua phương thứconNext
tới Client đang gọi và cũng thông báo cho client rằng server đã hoàn thành công việc bằng cách gọi phương thứconCompleted
. - Error response: Trong trường hợp có bất kỳ lỗi nào trong quá trình xử lý, server sẽ sử dụng phương thức
onError
để thông báo cho client. Chúng ta sẽ thảo luận về điều cách xử lý lỗi trong một bài viết khác.
public class NumberRequestObserver implements StreamObserver<NumberRequest> { private int sum = 0; private final StreamObserver<NumberResponse> responseObserver; public NumberRequestObserver(StreamObserver<NumberResponse> responseObserver) { this.responseObserver = responseObserver; } @Override public void onNext(NumberRequest numberRequest) { System.out.println("=========== client onNext ==========="); System.out.println("numberRequest: " + numberRequest.getNumber()); sum = sum + numberRequest.getNumber(); } @Override public void onError(Throwable throwable) { System.out.println("=========== client onError ==========="); throwable.getMessage(); throwable.printStackTrace(); } @Override public void onCompleted() { System.out.println("=========== client onCompleted ==========="); NumberResponse response = NumberResponse.newBuilder().setResult(sum).build(); responseObserver.onNext(response); responseObserver.onCompleted(); } }
public class CalculatorService extends CalculatorServiceGrpc.CalculatorServiceImplBase { @Override public StreamObserver<NumberRequest> sumAll(StreamObserver<NumberResponse> responseObserver) { return new NumberRequestObserver(responseObserver); }
}
Sau khi implements xong, chúng ta cần start gRPC server để cung cấp dịch vụ cho Client.
public class CalculatorGrpcServer { public static void main(String[] args) throws IOException, InterruptedException { // build gRPC server Server server = ServerBuilder.forPort(6565) .addService(new CalculatorService()) .build(); // start server.start(); // shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { System.out.println("gRPC server is shutting down!"); server.shutdown(); })); server.awaitTermination(); } }
Bây giờ thì gRPC server đã sẵn sàng. Chúng ta sẽ qua Client Side.
gRPC Client Streaming – Client Side
Trên phía Client chúng ta cần thực hiện các bước sau để gửi request và nhận lại response. Bước đầu tiên để thực hiện gửi request, chúng ta cần có một implementation của StreamObserver
. Điều này là để in giá trị kết quả trả về khi Client nhận được phản hồi từ Server.
public class NumberResponseObserver implements StreamObserver<NumberResponse> { @Override public void onNext(NumberResponse response) { System.out.println("=========== server onNext ==========="); System.out.println("Result : " + response.getResult()); } @Override public void onError(Throwable throwable) { System.out.println("=========== server onError ==========="); throwable.getMessage(); throwable.printStackTrace(); } @Override public void onCompleted() { System.out.println("=========== server onCompleted ==========="); } }
Tiếp theo là tạo kết nối và gửi request:
- Tạo kênh (create channel): client apllication phải tạo một kênh kết nối với gRPC server.
- Stub: Client apllication sẽ sử dụng
non-blocking stub
để thực hiện truyền tham số và gửi request.
Trong ví dụ này chúng ta sẽ tạo một class JUnit để hoạt động như một gRPC client. Hãy lưu ý rằng client application (hay gRPC client) có thể là bất cứ ngôn ngữ gì. Nó thậm chí có thể là một microservice khác.
public class ClientStreamingTest { private ManagedChannel channel; private CalculatorServiceGrpc.CalculatorServiceStub clientStub; @Before public void setup(){ this.channel = ManagedChannelBuilder.forAddress("localhost", 6565) .usePlaintext() .build(); this.clientStub = CalculatorServiceGrpc.newStub(channel); } @Test public void clientStreamingSumTest() throws InterruptedException { // pass the output stream observer & receive the input stream observer StreamObserver<NumberRequest> inputStreamObserver = this.clientStub.sumAll(new NumberResponseObserver()); for (int i = 0; i <= 100; i++) { // build the request object NumberRequest input = NumberRequest.newBuilder() .setNumber(i) .build(); // pass the request object via input stream observer inputStreamObserver.onNext(input); } // client side is done. this method make the server respond with the sum value inputStreamObserver.onCompleted(); // just for testing Thread.sleep(3000); } @After public void teardown(){ this.channel.shutdown(); } }
Bây giờ chúng ta đã sẵn sàng để gửi request và nhận response từ Server. Client sẽ gửi 100 số từ 1 đến 100 đến Server. Khi nó gọi onCompleted
, Server gửi về response với giá trị bằng tổng của 100 số gửi đến và nó được in trên console phía Client.
=========== server onNext ===========
Result : 5050
=========== server onCompleted ===========
Tổng kết
Vậy là chúng ta vừa làm quen cách tạo một ứng dụng gRPC Client Streaming. Hi vọng bài viết hữu ích với mọi người.
Nguồn: https://thenewstack.wordpress.com/2021/11/23/grpc-grpc-client-streaming/
Follow me: thenewstack.wordpress.com