ScalaPB(3): gRPC streaming

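  Following the previous installment on gRPC unary services, this post covers gRPC streaming in its three forms: server streaming, client streaming, and bidirectional streaming. We start by describing a server-streaming service in IDL in the .proto file: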

/* 
 *  responding stream of increment results 
 */ 
service SumOneToMany { 
  rpc AddOneToMany(SumRequest) returns (stream SumResponse) {} 
} 
 
message SumRequest { 
  int32 toAdd = 1; 
} 
 
message SumResponse { 
  int32 currentResult = 1; 
}

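In the SumOneToMany service, the AddOneToMany function takes a single SumRequest and returns a stream of SumResponse; that is all there is to it. Compiling the .proto file generates SumOneToManyGrpc.scala, which provides the API for the RPC operations. Let's see what Scala function protoc produced from the service function described in IDL: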

def addOneToMany(request: SumRequest, responseObserver: StreamObserver[SumResponse]): Unit 

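Calling the Scala function addOneToMany takes two arguments, a SumRequest and a StreamObserver[SumResponse], and the caller has to prepare both. The caller builds the StreamObserver before making the call and passes it in; the server's results are then delivered back to the caller through it. gRPC implements data streaming through instances of StreamObserver. An instance can be built like this: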

 

    val responseObserver = new StreamObserver[SumResponse] { 
      def onError(t: Throwable): Unit = println(s"ON_ERROR: $t") 
      def onCompleted(): Unit         = println("ON_COMPLETED") 
      def onNext(value: SumResponse): Unit = 
        println(s"ON_NEXT: Current sum: ${value.currentResult}") 
    }

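The server sends results back to the client one at a time by calling onNext, and they arrive in the responseObserver that was built on the client side. Here is the implementation of SumOneToMany: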

  class SumOne2ManyService extends SumOneToManyGrpc.SumOneToMany {
    override def addOneToMany(request: SumRequest, responseObserver: StreamObserver[SumResponse]): Unit = {
      val currentSum: AtomicInt = Atomic(0)
      // emit 1, 2, ..., request.toAdd; foreach because only the side effect matters
      (1 to request.toAdd).foreach { _ =>
        responseObserver.onNext(SumResponse().withCurrentResult(currentSum.incrementAndGet()))
      }
      Thread.sleep(1000)     // delay, then finish
      responseObserver.onCompleted()   // signal the end of the response stream
    }
  }

The addOneToMany service function returns the numbers from 1 to request.toAdd to the caller one by one through responseObserver. The client calls the service like this:

    // get async stub
    val client: SumOneToManyGrpc.SumOneToManyStub = SumOneToManyGrpc.stub(channel)
    // prepare stream observer
    val streamObserver = new StreamObserver[SumResponse] {
      override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
      override def onCompleted(): Unit = println("Done incrementing !!!")
      override def onNext(value: SumResponse): Unit = println(s"current value: ${value.currentResult}")
    }
    // call service with stream observer
    client.addOneToMany(SumRequest().withToAdd(6), streamObserver)
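
Because the call returns immediately and the results arrive later through the observer, the client needs some way to wait for the end of the stream. The following is a minimal sketch of one option, not part of the original demo: an observer that collects every element and completes a Promise when the stream ends. To keep it runnable without a gRPC dependency, `Observer` is a hypothetical local stand-in with the same three callbacks as io.grpc.stub.StreamObserver, and `fakeAddOneToMany` is an assumed in-process substitute for the server.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical stand-in for io.grpc.stub.StreamObserver (same three callbacks).
trait Observer[T] {
  def onNext(value: T): Unit
  def onError(t: Throwable): Unit
  def onCompleted(): Unit
}

object CollectingObserverDemo {
  // Buffers every element and completes `result` when the stream ends.
  final class CollectingObserver[T] extends Observer[T] {
    private val buffer = ListBuffer.empty[T]
    private val done   = Promise[List[T]]()
    def result: Future[List[T]] = done.future
    def onNext(value: T): Unit = buffer.synchronized { buffer += value; () }
    def onError(t: Throwable): Unit = { done.tryFailure(t); () }
    def onCompleted(): Unit = { done.trySuccess(buffer.synchronized(buffer.toList)); () }
  }

  // Assumed in-process substitute for the server: emits 1..n, then completes.
  def fakeAddOneToMany(n: Int, observer: Observer[Int]): Unit = {
    (1 to n).foreach(observer.onNext)
    observer.onCompleted()
  }

  def main(args: Array[String]): Unit = {
    val observer = new CollectingObserver[Int]
    fakeAddOneToMany(6, observer)
    // Block until onCompleted or onError has been called.
    val values = Await.result(observer.result, 5.seconds)
    println(s"received: $values")
  }
}
```

With the real stub, an observer of the same shape could presumably be passed to client.addOneToMany and awaited in place of a blocking readLine.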

The IDL for the client-streaming service is:

/* 
 *  responding a result from a request of stream of numbers 
 */ 
service SumManyToOne { 
  rpc AddManyToOne(stream SumRequest ) returns (SumResponse) {} 
}

It takes a stream of SumRequest and returns a single SumResponse. The addManyToOne function in the Scala code generated by ScalaPB has this shape:

def addManyToOne(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest]

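The caller supplies a StreamObserver[SumResponse] to receive the result, and the function returns the StreamObserver[SumRequest] that the client uses to send the request stream. Note: although AddManyToOne returns a single SumResponse in the .proto file, the generated Scala function takes a StreamObserver[SumResponse], so keep in mind that onNext may be called on it only once. Here is the implementation of the service: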

  class Many2OneService extends SumManyToOneGrpc.SumManyToOne {
     override def addManyToOne(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
       new StreamObserver[SumRequest] {
         // one running sum per call
         val currentSum: AtomicInt = Atomic(0)
         override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
         override def onCompleted(): Unit = println("Done summing!")
         override def onNext(value: SumRequest): Unit = {
           if (value.toAdd > 0)
             currentSum.add(value.toAdd)   // positive numbers are accumulated
           else {
             // a non-positive number marks the end of input: reply once, then close
             responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
             responseObserver.onCompleted()
           }
         }
       }
   }
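
The service above treats a non-positive number as the end-of-input marker. A common alternative, sketched here under assumptions rather than taken from the original demo, is to accumulate in onNext and send the single response when the client signals the end of its stream with onCompleted. `Observer` is a hypothetical local stand-in for io.grpc.stub.StreamObserver, plain Int values stand in for SumRequest and SumResponse, and `run` is a helper invented for the illustration.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for io.grpc.stub.StreamObserver (same three callbacks).
trait Observer[T] {
  def onNext(value: T): Unit
  def onError(t: Throwable): Unit
  def onCompleted(): Unit
}

object SumOnCompletedDemo {
  // Returns the request observer for one call; replies exactly once, at end of input.
  def addManyToOne(response: Observer[Int]): Observer[Int] = new Observer[Int] {
    private var sum = 0   // assumes the callbacks of one call arrive sequentially
    def onNext(value: Int): Unit = sum += value
    def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
    def onCompleted(): Unit = {
      response.onNext(sum)     // the one and only response
      response.onCompleted()   // then close the response side
    }
  }

  // Runs one simulated call and returns the events seen by the caller.
  def run(numbers: List[Int]): List[String] = {
    val events = ListBuffer.empty[String]
    val response = new Observer[Int] {
      def onNext(value: Int): Unit = { events += s"result=$value"; () }
      def onError(t: Throwable): Unit = { events += s"error=${t.getMessage}"; () }
      def onCompleted(): Unit = { events += "completed"; () }
    }
    val requests = addManyToOne(response)
    numbers.foreach(requests.onNext)
    requests.onCompleted()   // end of the request stream triggers the reply
    events.toList
  }

  def main(args: Array[String]): Unit =
    println(run(List(2, 5, 8, 4)))
}
```

With this design no sentinel value is needed, so 0 and negative numbers can be summed like any other input.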

The client calls it like this:

    //pass to server for result 
    val respStreamObserver = new StreamObserver[SumResponse] { 
      override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}") 
      override def onCompleted(): Unit = println("Done responding!") 
      override def onNext(value: SumResponse): Unit = 
        println(s"Result: ${value.currentResult}") 
    } 
    //get async stub 
    val client = SumManyToOneGrpc.stub(channel) 
 
    //get request stream observer from server 
    val reqStreamObserver = client.addManyToOne(respStreamObserver) 
 
    // send the numbers; the trailing 0 tells the server to reply with the sum
    List(2,5,8,4,0).foreach { n =>
      reqStreamObserver.onNext(SumRequest(n))
    }

The IDL for bidirectional streaming is:

/* 
 * Sums up numbers received from the client and returns the current result after each received request. 
 */ 
service SumInter { 
  rpc AddInter(stream SumRequest) returns (stream SumResponse) {} 
}

The SumInter service describes a mode of operation with a stream of SumRequest going in and a stream of SumResponse coming out. The corresponding generated Scala function is:

def addInter(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] 

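The shape of this function is the same as that of the client-streaming service function. Here, however, we may send any number of SumResponse messages through responseObserver. The service is implemented like this: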

  class Many2ManyService extends SumInterGrpc.SumInter { 
    override def addInter(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] = 
      new StreamObserver[SumRequest] { 
        val currentSum: AtomicInt = Atomic(0) 
        override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}") 
        override def onCompleted(): Unit = println("Done requesting!") 
        override def onNext(value: SumRequest): Unit = { 
          responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd))) 
        } 
      } 
  }
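
To make the one-response-per-request behavior concrete, here is a small in-process sketch. It is an illustration under assumptions, not the generated gRPC code: `Observer` is a hypothetical local stand-in for io.grpc.stub.StreamObserver, plain Int values replace SumRequest and SumResponse, and java.util.concurrent.atomic.AtomicInteger replaces the Monix AtomicInt. Unlike the service above, the sketch also completes the response stream when the requests end.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for io.grpc.stub.StreamObserver (same three callbacks).
trait Observer[T] {
  def onNext(value: T): Unit
  def onError(t: Throwable): Unit
  def onCompleted(): Unit
}

object RunningSumDemo {
  // Each incoming number triggers one response carrying the running total.
  def addInter(response: Observer[Int]): Observer[Int] = new Observer[Int] {
    private val currentSum = new AtomicInteger(0)
    def onNext(value: Int): Unit = response.onNext(currentSum.addAndGet(value))
    def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
    def onCompleted(): Unit = response.onCompleted()
  }

  // Feeds the numbers through one simulated call and returns the totals received.
  def totals(numbers: List[Int]): List[Int] = {
    val seen = ListBuffer.empty[Int]
    val response = new Observer[Int] {
      def onNext(value: Int): Unit = { seen += value; () }
      def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
      def onCompleted(): Unit = println("response stream completed")
    }
    val requests = addInter(response)
    numbers.foreach(requests.onNext)
    requests.onCompleted()
    seen.toList
  }

  def main(args: Array[String]): Unit =
    println(totals(List(2, 5, 8)))
}
```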

responseObserver.onNext can be called as many times as needed. The client source code is:

    //create stream observer for result stream 
    val responseObserver = new StreamObserver[SumResponse] { 
      def onError(t: Throwable): Unit = println(s"ON_ERROR: $t") 
      def onCompleted(): Unit         = println("ON_COMPLETED") 
      def onNext(value: SumResponse): Unit = 
        println(s"ON_NEXT: Current sum: ${value.currentResult}") 
    } 
    //get request container 
    val requestObserver = client.addInter(responseObserver) 
 
    scheduler.scheduleWithFixedDelay(0.seconds, 1.seconds) { 
      val toBeAdded = Random.nextInt(11) 
      println(s"Adding number: $toBeAdded") 
      requestObserver.onNext(SumRequest(toBeAdded)) 
    }
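
The client above keeps sending until the process is stopped and never closes its request stream. As a hedged sketch of how a client might send a fixed number of requests and then call onCompleted, the following uses java.util.concurrent.ScheduledExecutorService in place of the Monix scheduler. `Observer` is a hypothetical local stand-in for io.grpc.stub.StreamObserver, and `sendThenComplete` and its parameters are names invented for this illustration.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Hypothetical stand-in for io.grpc.stub.StreamObserver (same three callbacks).
trait Observer[T] {
  def onNext(value: T): Unit
  def onError(t: Throwable): Unit
  def onCompleted(): Unit
}

object BoundedSenderDemo {
  // Sends 1..count at a fixed delay, then closes the request stream and stops the scheduler.
  def sendThenComplete(requests: Observer[Int], count: Int, delayMillis: Long): Unit = {
    val executor = Executors.newSingleThreadScheduledExecutor()
    val finished = new CountDownLatch(1)
    var sent = 0   // only touched from the single scheduler thread
    val task = new Runnable {
      def run(): Unit =
        if (sent < count) { sent += 1; requests.onNext(sent) }
        else if (finished.getCount > 0) { requests.onCompleted(); finished.countDown() }
    }
    executor.scheduleWithFixedDelay(task, 0L, delayMillis, TimeUnit.MILLISECONDS)
    finished.await()         // wait until onCompleted has been sent
    executor.shutdownNow()   // no further ticks
  }

  def main(args: Array[String]): Unit = {
    val printing = new Observer[Int] {
      def onNext(value: Int): Unit = println(s"sending: $value")
      def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
      def onCompleted(): Unit = println("request stream closed")
    }
    sendThenComplete(printing, 3, 100L)
  }
}
```

In the real client, `requests` would be the requestObserver returned by client.addInter, and the Monix scheduler's returned Cancelable could play the role of the executor shutdown.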

Here is the full source code for this demonstration:

project/scalapb.sbt

addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18") 
libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1"

build.sbt

import scalapb.compiler.Version.scalapbVersion 
import scalapb.compiler.Version.grpcJavaVersion 
 
name := "learn-gRPC" 
 
version := "0.1" 
 
scalaVersion := "2.12.6" 
 
libraryDependencies ++= Seq( 
  "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf", 
  "io.grpc" % "grpc-netty" % grpcJavaVersion, 
  "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion, 
  "io.monix" %% "monix" % "2.3.0" 
) 
 
PB.targets in Compile := Seq( 
  scalapb.gen() -> (sourceManaged in Compile).value 
)

src/main/protobuf/sum.proto

syntax = "proto3"; 
 
package learn.grpc.services; 
 
/* 
 *  responding stream of increment results 
 */ 
service SumOneToMany { 
  rpc AddOneToMany(SumRequest) returns (stream SumResponse) {} 
} 
 
/* 
 *  responding a result from a request of stream of numbers 
 */ 
service SumManyToOne { 
  rpc AddManyToOne(stream SumRequest ) returns (SumResponse) {} 
} 
 
/* 
 * Sums up numbers received from the client and returns the current result after each received request. 
 */ 
service SumInter { 
  rpc AddInter(stream SumRequest) returns (stream SumResponse) {} 
} 
 
message SumRequest { 
  int32 toAdd = 1; 
} 
 
message SumResponse { 
  int32 currentResult = 1; 
}

gRPCServer.scala

package learn.grpc.server 
import io.grpc.{ServerBuilder,ServerServiceDefinition} 
 
trait gRPCServer { 
  def runServer(service: ServerServiceDefinition): Unit = { 
    val server = ServerBuilder 
      .forPort(50051) 
      .addService(service) 
      .build 
      .start 
 
    // make sure our server is stopped when jvm is shut down 
    Runtime.getRuntime.addShutdownHook(new Thread() { 
      override def run(): Unit = server.shutdown() 
    }) 
 
    server.awaitTermination() 
  } 
 
}

OneToManyServer.scala

package learn.grpc.sum.one2many.server 
import io.grpc.stub.StreamObserver 
import learn.grpc.services.sum._ 
import monix.execution.atomic.{Atomic,AtomicInt} 
import learn.grpc.server.gRPCServer 
 
object One2ManyServer extends gRPCServer { 
 
  class SumOne2ManyService extends SumOneToManyGrpc.SumOneToMany { 
    override def addOneToMany(request: SumRequest, responseObserver: StreamObserver[SumResponse]): Unit = { 
      val currentSum: AtomicInt = Atomic(0) 
      // emit 1, 2, ..., request.toAdd; foreach because only the side effect matters
      (1 to request.toAdd).foreach { _ =>
        responseObserver.onNext(SumResponse().withCurrentResult(currentSum.incrementAndGet()))
      }
      Thread.sleep(1000)     // delay, then finish
      responseObserver.onCompleted() 
    } 
  } 
 
  def main(args: Array[String]): Unit = {
    val svc = SumOneToManyGrpc.bindService(new SumOne2ManyService, scala.concurrent.ExecutionContext.global) 
    runServer(svc) 
  } 
 
}

OneToManyClient.scala

package learn.grpc.sum.one2many.client 
import io.grpc.stub.StreamObserver 
import learn.grpc.services.sum._ 
 
object One2ManyClient { 
  def main(args: Array[String]): Unit = { 
 
    //build connection channel 
    val channel = io.grpc.ManagedChannelBuilder 
      .forAddress("localhost",50051)
      .usePlaintext(true) 
      .build() 
 
    // get async stub
    val client: SumOneToManyGrpc.SumOneToManyStub = SumOneToManyGrpc.stub(channel)
    // prepare stream observer
    val streamObserver = new StreamObserver[SumResponse] {
      override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
      override def onCompleted(): Unit = println("Done incrementing !!!")
      override def onNext(value: SumResponse): Unit = println(s"current value: ${value.currentResult}")
    }
    // call service with stream observer
    client.addOneToMany(SumRequest().withToAdd(6), streamObserver)
 
    // wait for async execution 
    scala.io.StdIn.readLine() 
  } 
}

ManyToOneServer.scala

package learn.grpc.sum.many2one.server 
import io.grpc.stub.StreamObserver 
import learn.grpc.services.sum._ 
import learn.grpc.server.gRPCServer 
import monix.execution.atomic.{Atomic,AtomicInt} 
 
object Many2OneServer extends gRPCServer { 
   class Many2OneService extends SumManyToOneGrpc.SumManyToOne {
     override def addManyToOne(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
       new StreamObserver[SumRequest] {
         // one running sum per call
         val currentSum: AtomicInt = Atomic(0)
         override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
         override def onCompleted(): Unit = println("Done summing!")
         override def onNext(value: SumRequest): Unit = {
           if (value.toAdd > 0)
             currentSum.add(value.toAdd)   // positive numbers are accumulated
           else {
             // a non-positive number marks the end of input: reply once, then close
             responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
             responseObserver.onCompleted()
           }
         }
       }
   }
 
   def main(args: Array[String]): Unit = { 
     val svc = SumManyToOneGrpc.bindService(new Many2OneService,scala.concurrent.ExecutionContext.global) 
     runServer(svc) 
   } 
}

ManyToOneClient.scala

package learn.grpc.sum.many2one.client 
import io.grpc.stub.StreamObserver 
import learn.grpc.services.sum._ 
 
object Many2OneClient { 
  def main(args: Array[String]): Unit = { 
    //build channel 
    val channel = io.grpc.ManagedChannelBuilder 
      .forAddress("localhost",50051)
      .usePlaintext(true) 
      .build() 
    //pass to server for result 
    val respStreamObserver = new StreamObserver[SumResponse] { 
      override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}") 
      override def onCompleted(): Unit = println("Done responding!") 
      override def onNext(value: SumResponse): Unit = 
        println(s"Result: ${value.currentResult}") 
    } 
    //get async stub 
    val client = SumManyToOneGrpc.stub(channel) 
 
    //get request stream observer from server 
    val reqStreamObserver = client.addManyToOne(respStreamObserver) 
 
    // send the numbers; the trailing 0 tells the server to reply with the sum
    List(2,5,8,4,0).foreach { n =>
      reqStreamObserver.onNext(SumRequest(n))
    }
    scala.io.StdIn.readLine() 
  } 
}

ManyToManyServer.scala 

package learn.grpc.sum.many2many.server 
import io.grpc.stub.StreamObserver 
import learn.grpc.services.sum._ 
import learn.grpc.server.gRPCServer 
import monix.execution.atomic.{Atomic,AtomicInt} 
object Many2ManyServer extends gRPCServer { 
  class Many2ManyService extends SumInterGrpc.SumInter { 
    override def addInter(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] = 
      new StreamObserver[SumRequest] { 
        val currentSum: AtomicInt = Atomic(0) 
 
        override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}") 
 
        override def onCompleted(): Unit = println("Done requesting!") 
 
        override def onNext(value: SumRequest): Unit = { 
          responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd))) 
        } 
      } 
  } 
  def main(args: Array[String]): Unit = { 
    val svc = SumInterGrpc.bindService(new Many2ManyService, scala.concurrent.ExecutionContext.global) 
    runServer(svc) 
  } 
 
}

ManyToManyClient.scala

package learn.grpc.sum.many2many.client 
import monix.execution.Scheduler.{global => scheduler} 
import learn.grpc.services.sum._ 
 
import scala.concurrent.duration._ 
import scala.util.Random 
import io.grpc._ 
import io.grpc.stub.StreamObserver 
 
object Many2ManyClient { 
  def main(args: Array[String]): Unit = { 
    val channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext(true).build 
    val client  = SumInterGrpc.stub(channel) 
    //create stream observer for result stream 
    val responseObserver = new StreamObserver[SumResponse] { 
      def onError(t: Throwable): Unit = println(s"ON_ERROR: $t") 
      def onCompleted(): Unit         = println("ON_COMPLETED") 
      def onNext(value: SumResponse): Unit = 
        println(s"ON_NEXT: Current sum: ${value.currentResult}") 
    } 
    //get request container 
    val requestObserver = client.addInter(responseObserver) 
 
    scheduler.scheduleWithFixedDelay(0.seconds, 1.seconds) { 
      val toBeAdded = Random.nextInt(11) 
      println(s"Adding number: $toBeAdded") 
      requestObserver.onNext(SumRequest(toBeAdded)) 
    } 
 
    scala.io.StdIn.readLine() 
  } 
 
}

 

Original article by ItWorker. If you repost it, please credit the source: https://blog.ytso.com/12800.html
