一、前言
grpc 是一个由 google 推出的、高性能、开源、通用的 rpc 框架。它是基于 HTTP2 协议标准设计开发,默认采用 Protocol Buffers 数据序列化协议,支持多种开发语言。
一般业务场景下,我们都是使用grpc的simple-rpc模式,也就是每次客户端发起请求,服务端会返回一个响应结果的模式。
image.png
但是grpc除了这种一来一往的请求模式外,还有流式模式,下面我们一一道来。
二 grpc服务端流
服务端流模式是说客户端发起一次请求后,服务端在接受到请求后,可以以流的方式,使用同一连接,不断的向客户端写回响应结果,客户端则可以源源不断的接受到服务端写回的数据。
image.png
下面我们通过简单例子,来说明如何使用,服务端端流。
要实现服务端流,需要把grpc方法定义如下:
message Metric {
google.protobuf.Timestamp timestamp = 1;
int64 metric = 2;
}
message Average {
double val = 1;
}
service MetricsService {
rpc collectServerStream (Metric) returns (stream Average);
}
如上rpc方法的返回值类型前添加stream标识 是服务端流,然后服务端实现代码如下:
public class MetricsServiceImpl extends MetricsServiceGrpc.MetricsServiceImplBase {
public StreamObserver<StreamingExample.Average> responseObserverT;
/**
* 服务端流
*
* @param request
* @param responseObserver
*/
@Override
public void collectServerStream(com.example.server.streaming.StreamingExample.Metric request,
io.grpc.stub.StreamObserver<com.example.server.streaming.StreamingExample.Average> responseObserver) {
//保存流式响应对象
this.responseObserverT = responseObserver;
}
最后启动服务,并当流式对象不为null时候,写回数据到客户端:
public class MetricsServerServerStream {
public static void main(String[] args) throws InterruptedException, IOException {
//启动服务
MetricsServiceImpl metricsService = new MetricsServiceImpl();
Server server = ServerBuilder.forPort(8080).addService(metricsService).build();
server.start();
//获取steam响应对象,不断的向客户端写回数据
new Thread(new Runnable() {
@Override
public void run() {
for (; ; ) {
if (null != metricsService.responseObserverT) {
metricsService.responseObserverT.onNext(StreamingExample.Average.newBuilder()
.setVal(new Random(1000).nextDouble())
.build());
System.out.println("send to client");
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
server.awaitTermination();
}
}
下面我们看客户端代码,客户端代码如下:
public class MetricsClientServerStream {
public static void main(String[] args) throws InterruptedException {
//获取客户端桩对象
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext().build();
MetricsServiceGrpc.MetricsServiceStub stub = MetricsServiceGrpc.newStub(channel);
//发起rpc请求,设置StreamObserver用于监听服务器返回结果
stub.collectServerStream(StreamingExample.Metric.newBuilder().setMetric(1L).build(), new StreamObserver<StreamingExample.Average>() {
@Override
public void onNext(StreamingExample.Average value) {
System.out.println(Thread.currentThread().getName() + "Average: " + value.getVal());
}
@Override
public void onError(Throwable t) {
System.out.println("error:" + t.getLocalizedMessage());
}
@Override
public void onCompleted() {
System.out.println("onCompleted:");
}
});
}
如上启动客户端后,可以看到StreamObserver的onNext方法会源源不断的接受到服务端返回的数据。
服务端流使用场景:
- 客户端请求一次,但是需要服务端源源不断的返回大量数据时候,比如大批量数据查询的场景。
- 比如客户端订阅服务端的一个服务数据,服务端发现有新数据时,源源不断的吧数据推送给客户端。
三 grpc客户端流
客户端流模式是说客户端发起请求与服务端建立链接后,可以使用同一连接,不断的向服务端传送数据,等客户端把全部数据都传送完毕后,服务端才返回一个请求结果。
image.png
下面我们通过简单例子,来说明如何使用,客户端流。
要实现服务端流,需要把grpc方法定义如下:
service MetricsService {
rpc collectClientStream (stream Metric) returns (Average);
}
如上rpc方法的入参类型前添加stream标识 是服务端流,然后服务端实现代码如下:
public class MetricsServiceImpl extends MetricsServiceGrpc.MetricsServiceImplBase {
/**
* 客户端流
*
* @param responseObserver
* @return
*/
@Override
public StreamObserver<StreamingExample.Metric> collectClientStream(StreamObserver<StreamingExample.Average> responseObserver) {
return new StreamObserver<StreamingExample.Metric>() {
private long sum = 0;
private long count = 0;
@Override
public void onNext(StreamingExample.Metric value) {
System.out.println("value: " + value);
sum += value.getMetric();
count++;
}
@Override
public void onError(Throwable t) {
System.out.println("severError:" + t.getLocalizedMessage());
responseObserver.onError(t);
}
@Override
public void onCompleted() {
responseObserver.onNext(StreamingExample.Average.newBuilder()
.setVal(sum / count)
.build());
System.out.println("serverComplete: ");
}
};
}
如上代码,服务端使用流式对象的onNext方法不断接受客户端发来的数据,然后等客户端发送结束后,使用onCompleted方法,把响应结果写回客户端。
下面我们看客户端代码,客户端代码如下:
public class MetricsClient2 {
public static void main(String[] args) throws InterruptedException {
//1.创建客户端桩
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext().build();
MetricsServiceGrpc.MetricsServiceStub stub = MetricsServiceGrpc.newStub(channel);
//2.发起请求,并设置结果回调监听
StreamObserver<StreamingExample.Metric> collect = stub.collectClientStream(new StreamObserver<StreamingExample.Average>() {
@Override
public void onNext(StreamingExample.Average value) {
System.out.println(Thread.currentThread().getName() + "Average: " + value.getVal());
}
@Override
public void onError(Throwable t) {
System.out.println("error:" + t.getLocalizedMessage());
}
@Override
public void onCompleted() {
System.out.println("onCompleted:");
}
});
//3.使用同一个链接,不断向服务端传送数据
Stream.of(1L, 2L, 3L, 4L,5L).map(l -> StreamingExample.Metric.newBuilder().setMetric(l).build())
.forEach(metric -> {
collect.onNext(metric);
System.out.println(metric);
});
Thread.sleep(3000);
collect.onCompleted();
channel.shutdown().awaitTermination(50, TimeUnit.SECONDS);
}
}
如上启动客户端后,可以看到代码3会把数据1,2,3,4,5通过同一个链接发送到服务端,然后等服务端结束完毕数据后,会计算接受到的数据的平均值,然后把平均值写回客户端。然后代码2设置的监听器的onNext方法就会被回调,然后打印出服务端返回的平均值3。
客户端流使用场景:
- 比如数据批量计算场景:如果只用simple rpc的话,服务端就要一次性收到大量数据,并且在收到全部数据之后才能对数据进行计算处理。如果用客户端流 rpc的话,服务端可以在收到一些记录之后就开始处理,也更有实时性。
四 grpc双向流
双向流意味着客户端向服务端发起请求后,客户端可以源源不断向服务端写入数据的同时,服务端可以源源不断向客户端写入数据。
image.png
下面我们通过简单例子,来说明如何使用双向流。
要实现双向流,需要把grpc方法定义如下:
service MetricsService {
rpc collectTwoWayStream (stream Metric) returns (stream Average);
}
如上rpc方法的入参类型前添加stream标识 是客户端流,然后服务端实现代码如下:
public class MetricsServiceImpl extends MetricsServiceGrpc.MetricsServiceImplBase {
public StreamObserver<StreamingExample.Average> responseObserverT;
/**
* 双向流
*
* @param responseObserver
* @return
*/
@Override
public StreamObserver<StreamingExample.Metric> collectTwoWayStream(StreamObserver<StreamingExample.Average> responseObserver) {
this.responseObserverT = responseObserver;
return new StreamObserver<StreamingExample.Metric>() {
private long sum = 0;
private long count = 0;
@Override
public void onNext(StreamingExample.Metric value) {
System.out.println("value: " + value);
sum += value.getMetric();
count++;
}
@Override
public void onError(Throwable t) {
System.out.println("severError:" + t.getLocalizedMessage());
responseObserver.onError(t);
}
@Override
public void onCompleted() {
responseObserver.onNext(StreamingExample.Average.newBuilder()
.setVal(sum / count)
.build());
System.out.println("serverComplete: ");
}
};
}
如上代码,服务端使用流式对象的onNext方法不断接受客户端发来的数据,然后等客户端发送结束后,使用onCompleted方法,把响应结果写回客户端。并且服务端保存了流式对象responseObserverT用来不断的写数据到客户端
双向流使用场景:
- 需要双向数据交互的场景,比如聊天机器人,游戏室等。
五 StreamObserver转换为反应式框架流
StreamObserver是grpc自己定义的一个流式接口,其定义如下:
public interface StreamObserver<V> {
void onNext(V var1);
void onError(Throwable var1);
void onCompleted();
}
grpc虽然提供了流式接口,但是其并没有提供便捷的流操作符,而我们知道Reactor或者Rxjava这些反应式编程框架,本身是提供了丰富便捷的流操作符的。所以我们想看看如何把StreamObserver转换为反应式框架流,由于Reactor是spring5自带的,所以我们看看如何把StreamObserver转换为Reactor的Flux流对象。
转换代码如下:
public class StreamObserverPublisher implements Publisher<StreamingExample.Average>, StreamObserver<StreamingExample.Average> {
private Subscriber<? super StreamingExample.Average> subscriber;
@Override
public void onNext(StreamingExample.Average l) {
subscriber.onNext(l);
}
@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
}
@Override
public void onCompleted() {
subscriber.onComplete();
}
@Override
public void subscribe(Subscriber<? super StreamingExample.Average> subscriber) {
this.subscriber = subscriber;
this.subscriber.onSubscribe(new BaseSubscriber() {
});
}
}
public class MetricsClientTwoWay {
public static void main(String[] args) throws InterruptedException {
//创建客户端桩
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext().build();
MetricsServiceGrpc.MetricsServiceStub stub = MetricsServiceGrpc.newStub(channel);
//转换StreamObserver流为Flux流
StreamObserverPublisher streamObserverPublisher = new StreamObserverPublisher();
Flux<StreamingExample.Average> flux = Flux.from(streamObserverPublisher);
//订阅流,缓存,并消费
flux.buffer(4).subscribe(o -> System.out.println("ele:" + o.size())); // must be done before executing the gRPC request
//发起rpc请求
StreamObserver<StreamingExample.Metric> collect = stub.collectTwoWayStream(streamObserverPublisher);
}
六 总结
grpc除了提供了simple-rpc还提供了双向流操作,大家可以结合自己的业务场景,选择性使用。另外为了使用反应式框架丰富的流操作符,我们可以便捷的把StreamObserver流转换为Flux流。
作者:加多,资深Java , 著《Java并发编程之美》 ,《深度剖析Apache Dubbo核心技术内幕》,《Java异步编程实战》,公众号:技术原始积累
原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/175014.html