Trong mô hình gRPC Bidirectional Streaming
, Client và Server có thể trao đổi nhiều request và response qua một kết nối TCP duy nhất. Các request và response này có thể hoàn toàn độc lập với nhau. Client và Server sẽ đóng cuộc gọi khi chúng hoàn tất việc nghiệp vụ.
Hãy xem một số ví dụ ứng dụng streaming ở 2 phía Client và Server.
- Màn hình tìm kiếm của Google: Ngay sau khi chúng ta nhập một từ khóa, nó sẽ được gửi đến máy chủ và máy chủ ngay lập tức phản hồi với các từ khóa tìm kiếm có thể.
- Netflix / YouTube: Dựa trên các video chúng ta tìm kiếm/xem, chúng ta sẽ có thêm đề xuất liên quan đến các video đó.
Sample Application
Trong ví dụ này, chúng ta sẽ tạo ra một ứng dụng GPS cho ô tô. Ví dụ đi từ điểm A đến điểm B cách nhau 100m. Khi bắt đầu lái xe, chúng ta sẽ đồng bộ với máy chủ sau mỗi 3 giây và theo dõi vị trí của chúng ta. Máy chủ sẽ phản hồi khoảng cách còn lại đến điểm đến B và thời gian ước tính cần thiết để đến đích cho đến khi chuyến đi hoàn thành.
Protobuf – Service Definition
Khi biết nghiệp vụ, chúng ta sẽ định nghĩa service xử lý nghiệp vụ cho chúng. Phương thức navigate được implements ở phía Server nhận kiểu dữ liệu đầu vào và trả kiểu dữ liệu đầu ra theo như mong đợi. Chúng ta sử dụng từ khóa stream ở cả request và response để chỉ ra rằng đó sẽ là dịch vụ streaming ở cả 2 chiều.
syntax = "proto3"; package gps; option java_package = "example.gps";
option java_multiple_files = true; message TripRequest { int32 distanceTravelled = 1;
} message TripResponse { int32 remainingDistance = 1; int32 timeToDestination = 2;
} service NavigationService { // grpc bidirectional stream rpc navigate(stream TripRequest) returns (stream TripResponse);
}
Khi chúng ta chạy lệnh maven dưới đây, maven sẽ tự động tạo code cho client application và server application bằng công cụ protoc.
mvn clean compile
Class NavigatorServiceImplBase là abstract class được tạo tự động khi gen code cần được phía Server implements. Tương tự NavigatorServiceStub là class mà phía client application sử dụng để gửi yêu cầu đến server.
gRPC Bidirectional Streaming – Server Side
Service Implementation: class này kế thừa abstract class NavigatorServiceImplBase để implement phương thức navigate (triển khai nghiệp vụ) và phản hồi lại cho request gọi phương thức placeOrder ở phía Client. Client sẽ gửi nhiều request đến Server và nhận về nhiều response. Khi Client gọi phương thức onCompleted
, nó sẽ thông báo cho Server rằng nó đã đến đích. Lúc đó Server cũng có thể kết thúc cuộc gọi.
- Khi bắt đầu, chúng ta khởi tạo khoảng cách là 100m, chúng ta khởi tạo một con số bất kỳ là thời gian bắt đầu.
- Cứ sau 3 giây, Client sẽ gửi khoảng cách mà nó đã di chuyển được trong 3 giây trong lệnh gọi phương thức
onNext
. - Khi Server nhận được cuộc gọi
onNext
từ Client, Server sẽ kiểm tra tổng số quãng đường khách hàng đã đi và tính quãng đường còn lại. - Chúng ta cũng biết tốc độ của khách hàng (Quãng đường đã đi / Thời gian di chuyển). Sử dụng tốc độ này, chúng ta cũng tính toán khoảng thời gian cần thiết để đến đích.
public class TripRequestObserver implements StreamObserver<TripRequest> { private final int totalDistance = 100; private LocalTime startTime = LocalTime.now(); private int distanceTraveled; private final StreamObserver<TripResponse> tripResponseStreamObserver; public TripRequestObserver(StreamObserver<TripResponse> tripResponseStreamObserver) { this.tripResponseStreamObserver = tripResponseStreamObserver; } @Override public void onNext(TripRequest tripRequest) { this.distanceTraveled = Math.min(totalDistance, (this.distanceTraveled + tripRequest.getDistanceTravelled())); int remainingDistance = Math.max(0, (totalDistance - distanceTraveled)); // the client has reached destination if(remainingDistance == 0){ this.tripResponseStreamObserver.onNext(TripResponse.getDefaultInstance()); return; } // client has not yet reached destination long elapsedDuration = Duration.between(this.startTime, LocalTime.now()).getSeconds(); elapsedDuration = elapsedDuration < 1 ? 1 : elapsedDuration; double currentSpeed = (distanceTraveled * 1.0d) / elapsedDuration; int timeToReach = (int) (remainingDistance / currentSpeed); TripResponse tripResponse = TripResponse.newBuilder() .setRemainingDistance(remainingDistance) .setTimeToDestination(timeToReach) .build(); this.tripResponseStreamObserver.onNext(tripResponse); } @Override public void onError(Throwable throwable) { System.out.println("=========== Client onError ==========="); } @Override public void onCompleted() { System.out.println("=========== Client onCompleted ==========="); this.tripResponseStreamObserver.onCompleted(); System.out.println("Client reached safely"); } }
Việc triển khai ở trên là logic nghiệp vụ thực tế cần được thực thi bất cứ khi nào Client gửi vị trí hiện tại đến Server.
public class NavigationService extends NavigationServiceGrpc.NavigationServiceImplBase { @Override public StreamObserver<TripRequest> navigate(StreamObserver<TripResponse> responseObserver) { return new TripRequestObserver(responseObserver); } }
Sau khi implements xong, chúng ta cần start gRPC server để cung cấp dịch vụ cho Client.
public class GPSServer { public static void main(String[] args) throws IOException, InterruptedException { // build gRPC server Server server = ServerBuilder.forPort(6565) .addService(new NavigationService()) .build(); // start server.start(); // shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { System.out.println("GPS is shutting down!"); server.shutdown(); })); server.awaitTermination(); } }
Bây giờ thì gRPC server đã sẵn sàng. Chúng ta sẽ qua Client Side.
gRPC Bidirectional Streaming – Client Side
Protobuf đã gen code cho phía Client application. Trên phía Client chúng ta cần thực hiện các bước sau để gửi request và nhận lại response. Bước đầu tiên để thực hiện gửi request, chúng ta cần có một implementation của StreamObserver
.
- Cứ mỗi 3 giây di chuyển, Client sẽ gửi khoảng cách di chuyển được đến Server. Client tiếp tục làm điều này lặp đi lặp lại cho đến khi Client đến đích.
- Khi Client đến đích, chúng ta sẽ cho Server biết rằng cuộc gọi này có thể kết thúc.
public class TripResponseStreamObserver implements StreamObserver<TripResponse> { private StreamObserver<TripRequest> requestStreamObserver; @Override public void onNext(TripResponse tripResponse) { if(tripResponse.getRemainingDistance() > 0){ print(tripResponse); this.drive(); }else{ this.requestStreamObserver.onCompleted(); } } @Override public void onError(Throwable throwable) { } @Override public void onCompleted() { System.out.println("Trip Completed"); } public void startTrip(StreamObserver<TripRequest> requestStreamObserver){ this.requestStreamObserver = requestStreamObserver; this.drive(); } private void drive(){ Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS); TripRequest tripRequest = TripRequest.newBuilder().setDistanceTravelled(ThreadLocalRandom.current().nextInt(1, 10)).build(); requestStreamObserver.onNext(tripRequest); } private void print(TripResponse tripResponse){ System.out.println(LocalTime.now() + ": Remaining Distance : " + tripResponse.getRemainingDistance()); System.out.println(LocalTime.now() + ": Time To Reach (sec): " + tripResponse.getTimeToDestination()); System.out.println("------------------------------"); } }
Tiếp theo là tạo kết nối và gửi request:
Tạo kênh (create channel): client apllication phải tạo một kênh kết nối với gRPC server.
Sử dụng Stub: Trong trường hợp này Client apllication sẽ sử dụng non-blocking stub
để thực hiện truyền tham số và gửi request.
Trong ví dụ này chúng ta sẽ tạo một class JUnit để hoạt động như một gRPC client. Hãy lưu ý rằng client application (hay gRPC client) có thể là bất cứ ngôn ngữ gì. Nó thậm chí có thể là một microservice khác.
public class BiDirectionalStreamingTest { private ManagedChannel channel; private NavigationServiceGrpc.NavigationServiceStub clientStub; @Before public void setup(){ this.channel = ManagedChannelBuilder.forAddress("localhost", 6565) .usePlaintext() .build(); this.clientStub = NavigationServiceGrpc.newStub(channel); } @Test public void tripTest() throws InterruptedException { TripResponseStreamObserver tripResponseStreamObserver = new TripResponseStreamObserver(); StreamObserver<TripRequest> requestStreamObserver = this.clientStub.navigate(tripResponseStreamObserver); tripResponseStreamObserver.startTrip(requestStreamObserver); } @After public void teardown(){ this.channel.shutdown(); } }
Server Output:
10:19:05.136686: Remaining Distance : 98
10:19:05.136806: Time To Reach (sec): 98
------------------------------
10:19:08.146316: Remaining Distance : 91
10:19:08.146579: Time To Reach (sec): 50
------------------------------
10:19:11.152504: Remaining Distance : 82
10:19:11.152759: Time To Reach (sec): 36 ... 10:19:59.244266: Remaining Distance : 8
10:19:59.244463: Time To Reach (sec): 4
------------------------------
10:20:02.249562: Remaining Distance : 7
10:20:02.249779: Time To Reach (sec): 4
------------------------------
Trip Completed
Client output:
=========== Client onCompleted ===========
Client reached safely
Tổng kết
Vậy là chúng ta vừa làm quen cách tạo một ứng dụng gRPC bidirectional Streaming
. Hi vọng bài viết hữu ích với mọi người.
Nguồn: https://thenewstack.wordpress.com/2021/11/24/grpc-grpc-bidirectional-streaming/
Follow me: thenewstack.wordpress.com