Bài viết trước chúng ta vừa tìm hiểu về gRPC Client Stream
, bài này chúng ta sẽ tìm hiểu về gRPC Server Streaming
.
Một số bài viết cùng chủ đề có thể mọi người quan tâm:
Trong gRPC server streaming
, Client sẽ gửi một request đến Server và nhận về nhiều response từ Server. Sau khi Server đã gửi tất cả các response cho Client, Server sẽ gửi thông báo đến Client nó đã hoàn thành nhiệm vụ.
Một case study ví dụ như chức năng đặt taxi trên ứng dụng Uber trong đó khách hàng yêu cầu đặt một taxi, sau đó Uber có thể sẽ gửi nhiều phản hồi về cho khách hàng thông tin tài xế là ai, tài xế ở đâu, tài xế sẽ mất bao lâu để đón khách hàng.
Sample application
Trong ví dụ này, chúng ta sẽ tạo một ứng dụng thương mại điện tử để khách hàng có thể đặt hàng. Sau khi máy chủ nhận được yêu cầu đặt hàng, tùy thuộc vào loại đơn hàng, nó sẽ thực hiện các bước nhất định và giao sản phẩm cho khách hàng.
Protobuf – Service Definition
Khi biết nghiệp vụ, request và response cần là gì, chúng ta sẽ định nghĩa service xử lý nghiệp vụ cho chúng. Phương thức placeOrder được implements ở phía Server nhận kiểu dữ liệu đầu vào và trả kiểu dữ liệu đầu ra theo như mong đợi. Chúng ta sử dụng từ khóa stream để chỉ ra rằng đó sẽ là server side streaming response
để phản hồi nhiều response cho một request.
- Để cho đơn giản, chúng ta sẽ tạo ra 2 loại danh mục mặt hàng là Electronics & E-Books.
- Để giao mặt hàng Electronics sẽ cần đóng gói, vận chuyển, tốn thời gian hơn,..
- Mặt hàng Ebooks sẽ được giao ngay..
File .proto
định nghĩa service trong ví dụ này như sau:
syntax = "proto3";
package ecommerce;
option java_package = "example.ecommerce";
option java_multiple_files = true;
enum Category {
ELECTRONICS = 0;
EBOOKS = 1;
}
enum Status {
PAYMENT_RECEIVED = 0;
SHIPPED = 1;
OUT_FOR_DELIVERY = 2;
DELIVERED = 3;
}
message OrderRequest {
Category orderCategory = 1;
}
message OrderResponse {
Status orderStatus = 1;
}
service ECommerceService {
// server stream
rpc placeOrder(OrderRequest) returns (stream OrderResponse);
}
Khi chúng ta chạy lệnh maven dưới đây, maven sẽ tự động tạo code cho client application và server application bằng công cụ protoc.
mvn clean compile
Class ECommerceServiceImplBase là abstract class được tạo tự động khi gen code cần được phía Server implements. Tương tự ECommerceServiceStub là class mà phía client application sử dụng để gửi yêu cầu đến server.
gRPC Server Streaming – Server Side
Service Implementation: class này kế thừa abstract class ECommerceServiceImplBase để implement phương thức placeOrder (triển khai nghiệp vụ) và phản hồi lại cho request gọi phương thức placeOrder ở phía Client. Server sẽ nhận được yêu cầu đặt hàng. Phụ thuộc vào loại đơn hàng, Server sẽ gửi phản hồi thích hợp cho Client.
public class AmazonOnlineService extends ECommerceServiceGrpc.ECommerceServiceImplBase {
Map<Category, Consumer<StreamObserver<OrderResponse>>> categoryHandler = new HashMap<>();
public void initCategoryList(){
categoryHandler.put(Category.ELECTRONICS, this::handleElectronics);
categoryHandler.put(Category.EBOOKS, this::handleEBooks);
}
@Override
public void placeOrder(OrderRequest request, StreamObserver<OrderResponse> responseObserver) {
initCategoryList();
categoryHandler.get(request.getOrderCategory()).accept(responseObserver);
responseObserver.onCompleted();
}
private void handleElectronics(StreamObserver<OrderResponse> responseStreamObserver){
Stream.of(PAYMENT_RECEIVED, SHIPPED, OUT_FOR_DELIVERY, DELIVERED)
.map(OrderResponse.newBuilder()::setOrderStatus)
.map(OrderResponse.Builder::build)
.peek(i -> Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS))
.forEach(responseStreamObserver::onNext);
}
private void handleEBooks(StreamObserver<OrderResponse> responseStreamObserver){
Stream.of(PAYMENT_RECEIVED, DELIVERED)
.map(OrderResponse.newBuilder()::setOrderStatus)
.map(OrderResponse.Builder::build)
.forEach(responseStreamObserver::onNext);
}
}
Sau khi implements xong, chúng ta cần start gRPC server để cung cấp dịch vụ cho Client.
public class AmazonServer {
public static void main(String[] args) throws IOException, InterruptedException {
// build gRPC server
Server server = ServerBuilder.forPort(6565)
.addService(new AmazonOnlineService())
.build();
// start
server.start();
// shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Amazon server is shutting down!");
server.shutdown();
}));
server.awaitTermination();
}
}
Bây giờ thì gRPC server đã sẵn sàng. Chúng ta sẽ qua Client Side.
gRPC Server Streaming – Client Side
Trên phía Client chúng ta cần thực hiện các bước sau để gửi request và nhận lại response. Bước đầu tiên để thực hiện gửi request, chúng ta cần có một implementation của StreamObserver
. Bởi vì chúng ta đang mong đợi nhiều phản hồi không đồng bộ từ Server. Chúng ta sẽ in trạng thái đơn hàng khi Client nhận được phản hồi từ máy chủ.
public class OrderResponseStreamObserver implements StreamObserver<OrderResponse> {
@Override
public void onNext(OrderResponse orderResponse) {
System.out.println("=========== onNext ===========");
System.out.println(LocalDateTime.now() + " : " + orderResponse.getOrderStatus());
}
@Override
public void onError(Throwable throwable) {
System.out.println("=========== onError ===========");
throwable.getMessage();
throwable.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("=========== onCompleted ===========");
}
}
Tiếp theo là tạo kết nối và gửi request:
- Tạo kênh (create channel): client apllication phải tạo một kênh kết nối với gRPC server.
- Stub: Client apllication sẽ sử dụng
non-blocking stub
để thực hiện truyền tham số và gửi request.
Trong ví dụ này chúng ta sẽ tạo một class JUnit để hoạt động như một gRPC client. Hãy lưu ý rằng client application (hay gRPC client) có thể là bất cứ ngôn ngữ gì. Nó thậm chí có thể là một microservice khác.
public class ServerStreamingTest {
private ManagedChannel channel;
private ECommerceServiceGrpc.ECommerceServiceStub clientStub;
@Before
public void setup(){
this.channel = ManagedChannelBuilder.forAddress("localhost", 6565)
.usePlaintext()
.build();
this.clientStub = ECommerceServiceGrpc.newStub(channel);
}
@Test
public void bookStreamingTest() throws InterruptedException {
OrderRequest ebook = OrderRequest.newBuilder().setOrderCategory(Category.EBOOKS).build();
this.clientStub.placeOrder(ebook, new OrderResponseStreamObserver());
// just for testing
Thread.sleep(10000);
}
@Test
public void electronicStreamingTest() throws InterruptedException {
OrderRequest electronic = OrderRequest.newBuilder().setOrderCategory(Category.ELECTRONICS).build();
this.clientStub.placeOrder(electronic, new OrderResponseStreamObserver());
// just for testing
Thread.sleep(10000);
}
@After
public void teardown(){
this.channel.shutdown();
}
}
Khi chạy test tùy thuộc vào loại danh mục sản phẩm sẽ nhận được trạng thái khác nhau.
ebooks
2021-10-29T15:38:18.583667 : PAYMENT_RECEIVED
2021-10-29T15:38:18.584584 : DELIVERED
electronic
2021-10-29T15:32:49.542910 : PAYMENT_RECEIVED
2021-10-29T15:32:52.532445 : SHIPPED
2021-10-29T15:32:55.522148 : OUT_FOR_DELIVERY
Tổng kết
Vậy là chúng ta vừa làm quen cách tạo một ứng dụng gRPC Server Streaming. Hi vọng bài viết hữu ích với mọi người.
Nguồn: https://thenewstack.wordpress.com/2021/11/24/grpc-grpc-server-streaming/
Follow me: thenewstack.wordpress.com