Skip to content

Dashboard

[gRPC] - gRPC Server Streaming

Created by Admin on Nov 29, 2021

Bài viết trước chúng ta vừa tìm hiểu về gRPC Client Stream, bài này chúng ta sẽ tìm hiểu về gRPC Server Streaming.

Một số bài viết cùng chủ đề có thể mọi người quan tâm:

Trong gRPC server streaming, Client sẽ gửi một request đến Server và nhận về nhiều response từ Server. Sau khi Server đã gửi tất cả các response cho Client, Server sẽ gửi thông báo đến Client nó đã hoàn thành nhiệm vụ.

Một case study ví dụ như chức năng đặt taxi trên ứng dụng Uber trong đó khách hàng yêu cầu đặt một taxi, sau đó Uber có thể sẽ gửi nhiều phản hồi về cho khách hàng thông tin tài xế là ai, tài xế ở đâu, tài xế sẽ mất bao lâu để đón khách hàng.

Sample application

Trong ví dụ này, chúng ta sẽ tạo một ứng dụng thương mại điện tử để khách hàng có thể đặt hàng. Sau khi máy chủ nhận được yêu cầu đặt hàng, tùy thuộc vào loại đơn hàng, nó sẽ thực hiện các bước nhất định và giao sản phẩm cho khách hàng.

Protobuf – Service Definition

Khi biết nghiệp vụ, request và response cần là gì, chúng ta sẽ định nghĩa service xử lý nghiệp vụ cho chúng. Phương thức placeOrder được implements ở phía Server nhận kiểu dữ liệu đầu vào và trả kiểu dữ liệu đầu ra theo như mong đợi. Chúng ta sử dụng từ khóa stream để chỉ ra rằng đó sẽ là server side streaming response để phản hồi nhiều response cho một request.

  • Để cho đơn giản, chúng ta sẽ tạo ra 2 loại danh mục mặt hàng là Electronics & E-Books.
  • Để giao mặt hàng Electronics sẽ cần đóng gói, vận chuyển, tốn thời gian hơn,..
  • Mặt hàng Ebooks sẽ được giao ngay..

File .proto định nghĩa service trong ví dụ này như sau:

syntax = "proto3";

package ecommerce;

option java_package = "example.ecommerce";
option java_multiple_files = true;

enum Category {
  ELECTRONICS = 0;
  EBOOKS = 1;
}

enum Status {
  PAYMENT_RECEIVED = 0;
  SHIPPED = 1;
  OUT_FOR_DELIVERY = 2;
  DELIVERED = 3;
}

message OrderRequest {
  Category orderCategory = 1;
}

message OrderResponse {
  Status orderStatus = 1;
}

service ECommerceService {
  // server stream
  rpc placeOrder(OrderRequest) returns (stream OrderResponse);
}

Khi chúng ta chạy lệnh maven dưới đây, maven sẽ tự động tạo code cho client application và server application bằng công cụ protoc.

mvn clean compile

Class ECommerceServiceImplBase là abstract class được tạo tự động khi gen code cần được phía Server implements. Tương tự ECommerceServiceStub là class mà phía client application sử dụng để gửi yêu cầu đến server.

gRPC Server Streaming – Server Side

Service Implementation: class này kế thừa abstract class ECommerceServiceImplBase để implement phương thức placeOrder (triển khai nghiệp vụ) và phản hồi lại cho request gọi phương thức placeOrder ở phía Client. Server sẽ nhận được yêu cầu đặt hàng. Phụ thuộc vào loại đơn hàng, Server sẽ gửi phản hồi thích hợp cho Client.

public class AmazonOnlineService extends ECommerceServiceGrpc.ECommerceServiceImplBase {

    Map<Category, Consumer<StreamObserver<OrderResponse>>> categoryHandler = new HashMap<>();

    public void initCategoryList(){
        categoryHandler.put(Category.ELECTRONICS, this::handleElectronics);
        categoryHandler.put(Category.EBOOKS, this::handleEBooks);
    }


    @Override
    public void placeOrder(OrderRequest request, StreamObserver<OrderResponse> responseObserver) {
        initCategoryList();
        categoryHandler.get(request.getOrderCategory()).accept(responseObserver);
        responseObserver.onCompleted();
    }

    private void handleElectronics(StreamObserver<OrderResponse> responseStreamObserver){
        Stream.of(PAYMENT_RECEIVED, SHIPPED, OUT_FOR_DELIVERY, DELIVERED)
                .map(OrderResponse.newBuilder()::setOrderStatus)
                .map(OrderResponse.Builder::build)
                .peek(i -> Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS))
                .forEach(responseStreamObserver::onNext);
    }

    private void handleEBooks(StreamObserver<OrderResponse> responseStreamObserver){
        Stream.of(PAYMENT_RECEIVED, DELIVERED)
                .map(OrderResponse.newBuilder()::setOrderStatus)
                .map(OrderResponse.Builder::build)
                .forEach(responseStreamObserver::onNext);
    }

}

Sau khi implements xong, chúng ta cần start gRPC server để cung cấp dịch vụ cho Client.

public class AmazonServer {

    public static void main(String[] args) throws IOException, InterruptedException {

        // build gRPC server
        Server server = ServerBuilder.forPort(6565)
                .addService(new AmazonOnlineService())
                .build();

        // start
        server.start();

        // shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Amazon server is shutting down!");
            server.shutdown();
        }));

        server.awaitTermination();

    }

}

Bây giờ thì gRPC server đã sẵn sàng. Chúng ta sẽ qua Client Side.

gRPC Server Streaming – Client Side

Trên phía Client chúng ta cần thực hiện các bước sau để gửi request và nhận lại response. Bước đầu tiên để thực hiện gửi request, chúng ta cần có một implementation của StreamObserver. Bởi vì chúng ta đang mong đợi nhiều phản hồi không đồng bộ từ Server. Chúng ta sẽ in trạng thái đơn hàng khi Client nhận được phản hồi từ máy chủ.

public class OrderResponseStreamObserver implements StreamObserver<OrderResponse> {

    @Override
    public void onNext(OrderResponse orderResponse) {
        System.out.println("=========== onNext ===========");
        System.out.println(LocalDateTime.now() + " : " + orderResponse.getOrderStatus());
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("=========== onError ===========");
        throwable.getMessage();
        throwable.printStackTrace();
    }

    @Override
    public void onCompleted() {
        System.out.println("=========== onCompleted ===========");
    }

}

Tiếp theo là tạo kết nối và gửi request:

  • Tạo kênh (create channel): client apllication phải tạo một kênh kết nối với gRPC server.
  • Stub: Client apllication sẽ sử dụng non-blocking stub để thực hiện truyền tham số và gửi request.

Trong ví dụ này chúng ta sẽ tạo một class JUnit để hoạt động như một gRPC client. Hãy lưu ý rằng client application (hay gRPC client) có thể là bất cứ ngôn ngữ gì. Nó thậm chí có thể là một microservice khác.

public class ServerStreamingTest {

    private ManagedChannel channel;
    private ECommerceServiceGrpc.ECommerceServiceStub clientStub;

    @Before
    public void setup(){
        this.channel = ManagedChannelBuilder.forAddress("localhost", 6565)
                .usePlaintext()
                .build();
        this.clientStub = ECommerceServiceGrpc.newStub(channel);
    }

    @Test
    public void bookStreamingTest() throws InterruptedException {
        OrderRequest ebook = OrderRequest.newBuilder().setOrderCategory(Category.EBOOKS).build();
        this.clientStub.placeOrder(ebook, new OrderResponseStreamObserver());


        // just for testing
        Thread.sleep(10000);
    }

    @Test
    public void electronicStreamingTest() throws InterruptedException {
        OrderRequest electronic = OrderRequest.newBuilder().setOrderCategory(Category.ELECTRONICS).build();
        this.clientStub.placeOrder(electronic, new OrderResponseStreamObserver());

        // just for testing
        Thread.sleep(10000);
    }

    @After
    public void teardown(){
        this.channel.shutdown();
    }

}

Khi chạy test tùy thuộc vào loại danh mục sản phẩm sẽ nhận được trạng thái khác nhau.

ebooks

2021-10-29T15:38:18.583667 : PAYMENT_RECEIVED
2021-10-29T15:38:18.584584 : DELIVERED

electronic

2021-10-29T15:32:49.542910 : PAYMENT_RECEIVED
2021-10-29T15:32:52.532445 : SHIPPED
2021-10-29T15:32:55.522148 : OUT_FOR_DELIVERY

Tổng kết

Vậy là chúng ta vừa làm quen cách tạo một ứng dụng gRPC Server Streaming. Hi vọng bài viết hữu ích với mọi người.

Nguồn: https://thenewstack.wordpress.com/2021/11/24/grpc-grpc-server-streaming/

Follow me: thenewstack.wordpress.com

Source: https://viblo.asia/p/grpc-grpc-server-streaming-WAyK8BENlxX