- vừa được xem lúc

[gRPC] - gRPC Client Streaming

0 0 23

Người đăng: TheLight

Theo Viblo Asia

Với framework gRPC, chúng ta có thể gửi nhiều messages giữa Client và Server thông qua một kết nối TCP duy nhất. Nó được gọi là Multiplexing.

Trong gRPC client streaming, Client có thể gửi nhiều request đến Server. Sau khi Client xác nhận rằng nó đã gửi tất cả các request, Server sẽ gửi lại một response duy nhất cho Client.

Một case study ví dụ như chức năng upload file, trong đó Client upload một file lớn bằng cách chia thành nhiều phần nhỏ.

Sample Application

Để mọi thứ đơn giản, chúng ta sẽ giả sử rằng Client gửi nhiều số đến Server, Server sẽ gửi kết quả là tổng của tất cả các số khi Client thực hiện xong tất cả các request của nó.

Protobuf – Service Definition

Khi biết nghiệp vụ, request và response cần là gì, chúng ta sẽ định nghĩa service xử lý nghiệp vụ cho chúng. Phương thức sumAll được implements ở phía Server nhận kiểu dữ liệu đầu vào và trả kiểu dữ liệu đầu ra theo như mong đợi. Chúng ta sử dụng từ khóa stream để chỉ ra rằng đó sẽ là client side streaming request. File .proto định nghĩa service trong ví dụ này như sau:

syntax = "proto3"; package calculator; option java_package = "example.calculator";
option java_multiple_files = true; message NumberRequest { int32 number = 1;
} message NumberResponse { int64 result = 1;
} service CalculatorService { // client stream rpc sumAll(stream NumberRequest) returns (NumberResponse) {};
}

Khi chúng ta chạy lệnh maven dưới đây, maven sẽ tự động tạo code cho client application và server application bằng công cụ protoc.

mvn clean compile

Class CalculatorServiceImplBase là abstract class được tạo tự động khi gen code cần được phía Server implements. Tương tự CalculatorServiceStub là class được tạo tự động khi gen code mà phía client application sử dụng để gửi yêu cầu đến server.

gRPC Client Streaming – Server Side

Service Implementation: class này kế thừa abstract class CalculatorServiceImplBase để implement phương thức sumAll (triển khai nghiệp vụ) và phản hồi lại cho request gọi phương thức sumAll ở phía Client. Khi Client gọi phương thức onCompleted, nó sẽ thông báo cho Server rằng nó đã gửi hết request. Lúc đó Server sẽ thực hiện gửi lại response.

  • Success response: Server tính tổng cho tất cả các số mà Client gửi đến và response lại bằng cách sử dụng responseObserver. Dữ liệu trả về thông qua phương thức onNext tới Client đang gọi và cũng thông báo cho client rằng server đã hoàn thành công việc bằng cách gọi phương thức onCompleted.
  • Error response: Trong trường hợp có bất kỳ lỗi nào trong quá trình xử lý, server sẽ sử dụng phương thức onError để thông báo cho client. Chúng ta sẽ thảo luận về điều cách xử lý lỗi trong một bài viết khác.
public class NumberRequestObserver implements StreamObserver<NumberRequest> { private int sum = 0; private final StreamObserver<NumberResponse> responseObserver; public NumberRequestObserver(StreamObserver<NumberResponse> responseObserver) { this.responseObserver = responseObserver; } @Override public void onNext(NumberRequest numberRequest) { System.out.println("=========== client onNext ==========="); System.out.println("numberRequest: " + numberRequest.getNumber()); sum = sum + numberRequest.getNumber(); } @Override public void onError(Throwable throwable) { System.out.println("=========== client onError ==========="); throwable.getMessage(); throwable.printStackTrace(); } @Override public void onCompleted() { System.out.println("=========== client onCompleted ==========="); NumberResponse response = NumberResponse.newBuilder().setResult(sum).build(); responseObserver.onNext(response); responseObserver.onCompleted(); } }
public class CalculatorService extends CalculatorServiceGrpc.CalculatorServiceImplBase { @Override public StreamObserver<NumberRequest> sumAll(StreamObserver<NumberResponse> responseObserver) { return new NumberRequestObserver(responseObserver); }
} 

Sau khi implements xong, chúng ta cần start gRPC server để cung cấp dịch vụ cho Client.

public class CalculatorGrpcServer { public static void main(String[] args) throws IOException, InterruptedException { // build gRPC server Server server = ServerBuilder.forPort(6565) .addService(new CalculatorService()) .build(); // start server.start(); // shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { System.out.println("gRPC server is shutting down!"); server.shutdown(); })); server.awaitTermination(); } }

Bây giờ thì gRPC server đã sẵn sàng. Chúng ta sẽ qua Client Side.

gRPC Client Streaming – Client Side

Trên phía Client chúng ta cần thực hiện các bước sau để gửi request và nhận lại response. Bước đầu tiên để thực hiện gửi request, chúng ta cần có một implementation của StreamObserver. Điều này là để in giá trị kết quả trả về khi Client nhận được phản hồi từ Server.

public class NumberResponseObserver implements StreamObserver<NumberResponse> { @Override public void onNext(NumberResponse response) { System.out.println("=========== server onNext ==========="); System.out.println("Result : " + response.getResult()); } @Override public void onError(Throwable throwable) { System.out.println("=========== server onError ==========="); throwable.getMessage(); throwable.printStackTrace(); } @Override public void onCompleted() { System.out.println("=========== server onCompleted ==========="); } } 

Tiếp theo là tạo kết nối và gửi request:

  • Tạo kênh (create channel): client apllication phải tạo một kênh kết nối với gRPC server.
  • Stub: Client apllication sẽ sử dụng non-blocking stub để thực hiện truyền tham số và gửi request.

Trong ví dụ này chúng ta sẽ tạo một class JUnit để hoạt động như một gRPC client. Hãy lưu ý rằng client application (hay gRPC client) có thể là bất cứ ngôn ngữ gì. Nó thậm chí có thể là một microservice khác.

public class ClientStreamingTest { private ManagedChannel channel; private CalculatorServiceGrpc.CalculatorServiceStub clientStub; @Before public void setup(){ this.channel = ManagedChannelBuilder.forAddress("localhost", 6565) .usePlaintext() .build(); this.clientStub = CalculatorServiceGrpc.newStub(channel); } @Test public void clientStreamingSumTest() throws InterruptedException { // pass the output stream observer & receive the input stream observer StreamObserver<NumberRequest> inputStreamObserver = this.clientStub.sumAll(new NumberResponseObserver()); for (int i = 0; i <= 100; i++) { // build the request object NumberRequest input = NumberRequest.newBuilder() .setNumber(i) .build(); // pass the request object via input stream observer inputStreamObserver.onNext(input); } // client side is done. this method make the server respond with the sum value inputStreamObserver.onCompleted(); // just for testing Thread.sleep(3000); } @After public void teardown(){ this.channel.shutdown(); } }

Bây giờ chúng ta đã sẵn sàng để gửi request và nhận response từ Server. Client sẽ gửi 100 số từ 1 đến 100 đến Server. Khi nó gọi onCompleted, Server gửi về response với giá trị bằng tổng của 100 số gửi đến và nó được in trên console phía Client.

=========== server onNext ===========
Result : 5050
=========== server onCompleted ===========

Tổng kết

Vậy là chúng ta vừa làm quen cách tạo một ứng dụng gRPC Client Streaming. Hi vọng bài viết hữu ích với mọi người.

Nguồn: https://thenewstack.wordpress.com/2021/11/23/grpc-grpc-client-streaming/

Follow me: thenewstack.wordpress.com

Bình luận

Bài viết tương tự

- vừa được xem lúc

gRPC - Nó là gì và có nên sử dụng hay không?

Nhân một ngày rảnh rỗi, mình ngồi đọc lại RPC cũng như gRPC viết lại để nhớ lâu hơn. Vấn đề là gì và tại sao cần nó .

0 0 112

- vừa được xem lúc

Build gRPC client iOS đơn giản

Introduction. Trong bài viết trước, chúng ta đã cùng nhau tìm hiểu về gRPC và cách để build một gRPC server bằng node.js với các chức năng CRUD đơn giản:. https://viblo.

0 0 24

- vừa được xem lúc

Build CRUD Server đơn giản với gRPC và node.js

. gRPC là một framework RPC (remote procedure call) do Google phát triển, đã thu hút được nhiều sự quan tâm của cộng đồng software developer trong những năm vừa qua. Đặc biệt, gRPC được ứng dụng nhiều trong các hệ thống microservice bởi nhiều đặc tính vượt trội như: open source, không phụ thuộc vào

0 0 154

- vừa được xem lúc

Microservice với Golang, NodeJS và gRPC (Phần 2)

Tiếp tục phần 1, phần này mình sẽ tạo một con node server để connect đến core server và cũng chỉ để hiển thị hello world. Node Server.

0 0 103

- vừa được xem lúc

Microservice với Golang, NodeJS và gRPC (Phần 1)

Đặt vấn đề. .

0 0 40

- vừa được xem lúc

Biến ứng dụng Laravel của bạn trở nên phức tạp hơn với gRPC

gRPC là gì . RPC.

0 0 278