ScalaPB(5): Implementing reactive-gRPC with akka-stream

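  In the previous few discussions we covered the basic features and usage of scala-gRPC, and we have more or less settled on gRPC as an effective tool for internal system integration, mainly because of the service modes it supports: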

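1. Unary-Call: a single, independent client-request/server-response pair, the HTTP-style interaction we use most often

2. Server-Streaming: the client sends one request and then receives a series of responses from the server

3. Client-Streaming: the client sends a series of requests to the server and then receives one response

4. Bidirectional-Streaming: the client opens the connection by sending the first request, after which both ends can keep exchanging messages over that connection.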

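Clearly, gRPC supports streaming in both directions. So if the two gRPC types ListenableFuture and StreamObserver can be converted into basic akka-stream types, we should be able to implement so-called reactive-gRPC. If gRPC service calls can be programmed in akka-stream style, we might end up with the following picture: on the server side we only need to implement an akka-stream Flow that turns incoming requests into outgoing responses, as follows: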

// Unary case 
Flow[Request].map(computeResponse) 
  
// Server streaming 
Flow[Request].flatMapConcat(computeResponses) 
  
// Client streaming 
Flow[Request].fold(defaultResponse)(computeResponse) 
  
// Bidirectional streaming 
Flow[Request].flatMapConcat(computeResponses)

Of course, since this is an akka-stream Flow, we can use anything akka-stream offers inside it, for example:

Flow[Request] 
  .throttle(1, 10.millis, 1, ThrottleMode.Shaping) 
  .map(computeResponse)

On the client side we can call the Flow directly through the client stub, as follows:

Source 
  .single(request) 
  .via(stub.doSomething) 
  .runForeach(println)

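Conveniently, the open-source gRPCAkkaStream project from beyond-the-lines provides exactly such a bridge from gRPC StreamObserver to akka-stream Flow. What follows is a demonstration of how to use gRPCAkkaStream. We start with Unary-Call. Here is the IDL service description in the .proto file: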

syntax = "proto3"; 
package learn.grpc.akka.stream.services; 
message NumPair { 
   int32 num1 = 1; 
   int32 num2 = 2; 
} 
message Num { 
   int32 num = 1; 
} 
message SumResult { 
   int32 result = 1; 
} 
service SumNumbers { 
   rpc SumPair(NumPair) returns (SumResult) {} 
}

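Let's look at some of the relevant types and functions in the SumGrpcAkkaStream.scala file that the compiler generates automatically:

The service interface: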

trait SumNumbers extends AbstractService { 
    override def serviceCompanion = SumNumbers 
    def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed] 
}

We can see that the service function sumPair is an akka-stream Flow[NumPair,SumResult,NotUsed]. Here is the code that implements SumNumbers.sumPair:

class gRPCAkkaStreamService extends SumGrpcAkkaStream.SumNumbers { 
  val logger: Logger = Logger.getLogger(classOf[gRPCAkkaStreamService].getName) 
  override def sumPair: Flow[NumPair, SumResult, NotUsed] = { 
    logger.info(s"*** calling sumPair ...  ***") 
    Flow[NumPair].map { 
      case NumPair(a, b) => { 
        logger.info(s"serving ${a} + ${b} = ???") 
        SumResult(a + b) 
      } 
    } 
  } 
}

The generated client stub source code is as follows:

  class SumNumbersStub( 
    channel: Channel, 
    options: CallOptions = CallOptions.DEFAULT 
  ) extends AbstractStub[SumNumbersStub](channel, options) with SumNumbers { 
    override def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed] = 
      Flow[learn.grpc.akka.stream.services.sum.NumPair].flatMapConcat(request => 
        Source.fromFuture( 
          Grpc.guavaFuture2ScalaFuture( 
            ClientCalls.futureUnaryCall(channel.newCall(METHOD_SUM_PAIR, options), request) 
          ) 
        ) 
      ) 
    // ... (rest of the generated class not shown) 
  } 
   
  def stub(channel: Channel): SumNumbersStub = new SumNumbersStub(channel)
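
The generated stub turns a unary call into a stream by wrapping the call's Future in a Source and splicing it into the Flow with flatMapConcat. The following is a minimal sketch of that pattern on its own, assuming akka-stream is on the classpath. remoteAdd is a hypothetical local stand-in for the gRPC call, and FutureFlowSketch, addFlow and run are names made up for this illustration. Source.fromFuture mirrors the generated code; Akka 2.6 deprecates it in favour of Source.future.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureFlowSketch {
  // Hypothetical stand-in for the remote unary call: the result arrives asynchronously.
  def remoteAdd(pair: (Int, Int)): Future[Int] = Future(pair._1 + pair._2)

  // Same shape as the generated stub: one Future-backed Source per request,
  // concatenated in request order by flatMapConcat.
  val addFlow: Flow[(Int, Int), Int, NotUsed] =
    Flow[(Int, Int)].flatMapConcat(req => Source.fromFuture(remoteAdd(req)))

  // Runs the flow over the given requests and collects all responses.
  def run(pairs: List[(Int, Int)]): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("FutureFlowSketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(Source(pairs).via(addFlow).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }
}
```

Calling FutureFlowSketch.run(List((29, 33), (1, 2))) should yield the sums 62 and 3, in that order, because flatMapConcat finishes each inner Source before starting the next.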

We can call the sumPair method through the stub, as follows:

  val channel = ManagedChannelBuilder 
      .forAddress(host,port) 
      .usePlaintext(true) 
      .build() 
 
  val stub = SumGrpcAkkaStream.stub(channel) 
 
  def addPair(num1: Int, num2: Int): Source[String,NotUsed] = { 
    logger.info(s"Requesting to add $num1, $num2") 
    Source 
      .single(NumPair(num1,num2)) 
      .via(stub.sumPair) 
      .map(r => s"the result: ${r.result}") 
  }

Here is how the Unary-Call is actually invoked:

object UnaryCallClient extends App { 
  implicit val system = ActorSystem("UnaryClient") 
  implicit val mat = ActorMaterializer.create(system) 
 
  val client = new gRPCAkkaStreamClient("localhost", 50051) 
 
  client.addPair(29,33).runForeach(println) 
 
  scala.io.StdIn.readLine() 
  mat.shutdown() 
  system.terminate() 
 
}

In Server-Streaming, one request returns a stream of responses. The IDL description is as follows:

service SumNumbers { 
   rpc SumPair(NumPair) returns (SumResult) {} 
   rpc GenIncsFrom(Num) returns (stream Num) {} 
}

The service trait generated by the compiler is as follows:

 trait SumNumbers extends AbstractService { 
    override def serviceCompanion = SumNumbers 
    def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed] 
    def genIncsFrom: Flow[learn.grpc.akka.stream.services.sum.Num, learn.grpc.akka.stream.services.sum.Num, NotUsed] 
  }

The service function genIncsFrom is a Flow[Num,Num,NotUsed]. Its implementation is as follows:

class gRPCAkkaStreamService extends SumGrpcAkkaStream.SumNumbers { 
  val logger: Logger = Logger.getLogger(classOf[gRPCAkkaStreamService].getName) 
  override def genIncsFrom: Flow[Num, Num, NotUsed] = { 
    logger.info("*** calling genIncsFrom") 
    Flow[Num].mapConcat { 
      n => (1 to n.num).map {m => 
        logger.info(s"genIncFrom producing num: ${m}") 
        Num(m) 
      } 
    } 
  } 
}

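Because the response is a stream, we can produce it with mapConcat, which flattens the Seq built for each request into individual stream elements. On the client side, the service function genIncsFrom is called like this: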

  def genIncNumbers(len: Int): Source[Int,NotUsed] = { 
    logger.info(s"Requesting to produce ${len} inc numbers") 
    Source 
      .single(Num(len)) 
      .via(stub.genIncsFrom) 
      .map(n => n.num) 
  }

We again use runForeach to run this Source:

object ServerStreamingClient extends App { 
  implicit val system = ActorSystem("ServerStreamingClient") 
  implicit val mat = ActorMaterializer.create(system) 
 
  val client = new gRPCAkkaStreamClient("localhost", 50051) 
 
  client.genIncNumbers(5).runForeach(println) 
 
  scala.io.StdIn.readLine() 
  mat.shutdown() 
  system.terminate() 
 
}
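
The mapConcat flattening used by genIncsFrom can be tried without a gRPC server. The sketch below assumes only akka-stream on the classpath and works on plain Int values instead of Num messages; MapConcatSketch and expand are illustrative names, not part of the project.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MapConcatSketch {
  // One request n becomes the stream 1, 2, ..., n, as in genIncsFrom.
  def expand(n: Int): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("MapConcatSketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val collected = Source
        .single(n)
        .mapConcat(k => (1 to k).toList) // each element of the list is emitted separately
        .runWith(Sink.seq)
      Await.result(collected, 5.seconds)
    } finally system.terminate()
  }
}
```

With this sketch, MapConcatSketch.expand(5) collects the five elements 1 to 5, which matches what the ServerStreamingClient above prints for genIncNumbers(5).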

Next, let's see how Client-Streaming is implemented with reactive streams. The IDL service description is as follows:

service SumNumbers { 
   rpc SumPair(NumPair) returns (SumResult) {} 
   rpc GenIncsFrom(Num) returns (stream Num) {} 
   rpc SumStreamNums(stream Num) returns (SumResult) {} 
}

The generated service interface is as follows:

  trait SumNumbers extends AbstractService { 
    override def serviceCompanion = SumNumbers 
    def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed] 
    def genIncsFrom: Flow[learn.grpc.akka.stream.services.sum.Num, learn.grpc.akka.stream.services.sum.Num, NotUsed] 
    def sumStreamNums: Flow[learn.grpc.akka.stream.services.sum.Num, learn.grpc.akka.stream.services.sum.SumResult, NotUsed] 
  }

The sumStreamNums Flow is implemented as follows:

  override def sumStreamNums: Flow[Num, SumResult, NotUsed] = { 
    logger.info("*** calling sumStreamNums") 
    Flow[Num].fold(SumResult(0)) { 
      case (a, b) => 
        logger.info(s"receiving operand ${b.num}") 
        SumResult(b.num + a.result) 
    } 
  }

The request is a stream, so we can use an aggregation to reduce it to a single response. On the client side we call stub.sumStreamNums:

  def sumManyNumbers(nums: Seq[Int]): Source[String,NotUsed] = { 
    logger.info(s"Requesting to sum up ${nums}") 
    Source(nums.map(Num(_)).to[collection.immutable.Iterable]) 
      .via(stub.sumStreamNums) 
      .map(r => s"the result: ${r.result}") 
  } 
 
object ClientStreamingClient extends App { 
  implicit val system = ActorSystem("ClientStreamingClient") 
  implicit val mat = ActorMaterializer.create(system) 
 
  val client = new gRPCAkkaStreamClient("localhost", 50051) 
 
  client.sumManyNumbers(Seq(12,4,8,19)).runForeach(println) 
 
  scala.io.StdIn.readLine() 
  mat.shutdown() 
  system.terminate() 
 
}
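
The fold used by sumStreamNums emits nothing until the upstream completes, and then emits exactly one aggregated element. That is why it fits the client-streaming shape of many requests and one response. A minimal sketch of this behaviour on plain Int values, assuming akka-stream is on the classpath (FoldSketch and total are illustrative names):

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object FoldSketch {
  // Collects everything the folded stream emits, to show that it is a single element.
  def total(nums: List[Int]): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("FoldSketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val collected = Source(nums)
        .fold(0)(_ + _) // accumulates silently, emits once on upstream completion
        .runWith(Sink.seq)
      Await.result(collected, 5.seconds)
    } finally system.terminate()
  }
}
```

For the input used by ClientStreamingClient, FoldSketch.total(List(12, 4, 8, 19)) collects the single value 43.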

Finally, we demonstrate BiDirectional-Streaming. First we use the IDL to define keepAdding, a service function with streaming input and output:

service SumNumbers { 
   rpc SumPair(NumPair) returns (SumResult) {} 
   rpc GenIncsFrom(Num) returns (stream Num) {} 
   rpc SumStreamNums(stream Num) returns (SumResult) {} 
   rpc KeepAdding(stream Num) returns (stream SumResult) {} 
}

The implementation of this function:

  override def keepAdding: Flow[Num, SumResult, NotUsed] = { 
    Flow[Num].scan(SumResult(0)) { 
      case (a,b) => 
        logger.info(s"receiving operand ${b.num}") 
        SumResult(b.num + a.result) 
    } 
  }

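This service function adds up a series of input numbers one by one and outputs the current result each time. We can implement this with scan. Below is sample code for calling the service from the client: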

  def ContSum(nums: Seq[Int]): Source[String,NotUsed] = { 
    logger.info(s"Requesting to sum up ${nums}") 
    Source(nums.map(Num(_)).to[collection.immutable.Iterable]) 
      .throttle(1, 500.millis, 1, ThrottleMode.shaping) 
      .map { n => 
        logger.info(s"Sending number: $n") 
        n 
      } 
      .via(stub.keepAdding) 
      .map(r => s"current sum = ${r.result}") 
  }

Run it with the following code:

object BiDiStreamingClient extends App { 
  implicit val system = ActorSystem("BiDiStreamingClient") 
  implicit val mat = ActorMaterializer.create(system) 
 
  val client = new gRPCAkkaStreamClient("localhost", 50051) 
 
  client.ContSum(Seq(12,4,8,19)).runForeach(println) 
 
  scala.io.StdIn.readLine() 
  mat.shutdown() 
  system.terminate() 
 
}
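
One detail of scan is worth checking in isolation: in akka-stream, scan emits its initial value first and then one running result per input element. With keepAdding, the client should therefore expect a leading "current sum = 0" before the first real sum. The sketch below shows this on plain Int values, assuming akka-stream is on the classpath (ScanSketch and runningSums are illustrative names):

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ScanSketch {
  // Same scan shape as keepAdding, but over Int instead of Num/SumResult.
  def runningSums(nums: List[Int]): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("ScanSketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val collected = Source(nums)
        .scan(0)(_ + _) // emits the zero, then each intermediate sum
        .runWith(Sink.seq)
      Await.result(collected, 5.seconds)
    } finally system.terminate()
  }
}
```

ScanSketch.runningSums(List(12, 4, 8, 19)) collects 0, 12, 16, 24, 43. If the leading zero is unwanted, appending .drop(1) after scan is one way to skip it.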

OK, here is all the source code covered in this discussion:

project/scalapb.sbt

addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18") 
 
resolvers += Resolver.bintrayRepo("beyondthelines", "maven") 
 
libraryDependencies ++= Seq( 
  "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1", 
  "beyondthelines"         %% "grpcakkastreamgenerator" % "0.0.5" 
)

build.sbt

import scalapb.compiler.Version.scalapbVersion 
import scalapb.compiler.Version.grpcJavaVersion 
 
name := "gRPCAkkaStreamDemo" 
 
version := "0.1" 
 
scalaVersion := "2.12.6" 
 
resolvers += Resolver.bintrayRepo("beyondthelines", "maven") 
 
libraryDependencies ++= Seq( 
  "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf", 
  "io.grpc" % "grpc-netty" % grpcJavaVersion, 
  "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion, 
  "io.monix" %% "monix" % "2.3.0", 
  // for GRPC Akkastream 
  "beyondthelines"         %% "grpcakkastreamruntime" % "0.0.5" 
) 
 
PB.targets in Compile := Seq( 
  scalapb.gen() -> (sourceManaged in Compile).value, 
  // generate the akka stream files 
  grpc.akkastreams.generators.GrpcAkkaStreamGenerator() -> (sourceManaged in Compile).value 
)

src/main/protobuf/sum.proto

syntax = "proto3"; 
 
package learn.grpc.akka.stream.services; 
 
 
message NumPair { 
   int32 num1 = 1; 
   int32 num2 = 2; 
} 
 
message Num { 
   int32 num = 1; 
} 
 
message SumResult { 
   int32 result = 1; 
} 
 
service SumNumbers { 
   rpc SumPair(NumPair) returns (SumResult) {} 
   rpc GenIncsFrom(Num) returns (stream Num) {} 
   rpc SumStreamNums(stream Num) returns (SumResult) {} 
   rpc KeepAdding(stream Num) returns (stream SumResult) {} 
}

src/main/scala/gRPCAkkaStreamService.scala

package learn.grpc.akka.stream.services.impl 
 
import akka.NotUsed 
import akka.stream.scaladsl.Flow 
import learn.grpc.akka.stream.services.sum._ 
import java.util.logging.Logger 
 
class gRPCAkkaStreamService extends SumGrpcAkkaStream.SumNumbers { 
  val logger: Logger = Logger.getLogger(classOf[gRPCAkkaStreamService].getName) 
 
  override def sumPair: Flow[NumPair, SumResult, NotUsed] = { 
    logger.info(s"*** calling sumPair ...  ***") 
    Flow[NumPair].map { 
      case NumPair(a, b) => { 
        logger.info(s"serving ${a} + ${b} = ???") 
        SumResult(a + b) 
      } 
    } 
  } 
 
  override def genIncsFrom: Flow[Num, Num, NotUsed] = { 
    logger.info("*** calling genIncsFrom ... ***") 
    Flow[Num].mapConcat { 
      n => 
        (1 to n.num).map { m => 
          logger.info(s"genIncFrom producing num: ${m}") 
          Num(m) 
        } 
    } 
  } 
 
  override def sumStreamNums: Flow[Num, SumResult, NotUsed] = { 
    logger.info("*** calling sumStreamNums ... ***") 
    Flow[Num].fold(SumResult(0)) { 
      case (a, b) => 
        logger.info(s"receiving operand ${b.num}") 
        SumResult(b.num + a.result) 
    } 
  } 
 
  override def keepAdding: Flow[Num, SumResult, NotUsed] = { 
    Flow[Num].scan(SumResult(0)) { 
      case (a,b) => 
        logger.info(s"receiving operand ${b.num}") 
        SumResult(b.num + a.result) 
    } 
  } 
}

src/main/scala/gRPCAkkaStreamServer.scala

package learn.grpc.akka.stream.server 
 
import java.util.logging.Logger 
 
import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import io.grpc.Server 
import learn.grpc.akka.stream.services.impl.gRPCAkkaStreamService 
import io.grpc.ServerBuilder 
import learn.grpc.akka.stream.services.sum._ 
class gRPCServer(server: Server) { 
 
  val logger: Logger = Logger.getLogger(classOf[gRPCServer].getName) 
 
  def start(): Unit = { 
    server.start() 
    logger.info(s"Server started, listening on ${server.getPort}") 
    sys.addShutdownHook { 
      // Use stderr here since the logger may have been reset by its JVM shutdown hook. 
      System.err.println("*** shutting down gRPC server since JVM is shutting down") 
      stop() 
      System.err.println("*** server shut down") 
    } 
    () 
  } 
 
  def stop(): Unit = { 
    server.shutdown() 
  } 
 
  /** 
    * Await termination on the main thread since the grpc library uses daemon threads. 
    */ 
  def blockUntilShutdown(): Unit = { 
    server.awaitTermination() 
  } 
} 
 
object DemoServer extends App { 
  implicit val system = ActorSystem("UnaryServer") 
  implicit val mat = ActorMaterializer.create(system) 
  val server = new gRPCServer( 
    ServerBuilder 
      .forPort(50051) 
        .addService( 
          SumGrpcAkkaStream.bindService( 
           new gRPCAkkaStreamService 
          ) 
      ).build() 
  ) 
  server.start() 
//  server.blockUntilShutdown() 
  scala.io.StdIn.readLine() 
  mat.shutdown() 
  system.terminate() 
 
}

src/main/scala/gRPCAkkaStreamClient.scala

package learn.grpc.akka.stream.client 
import learn.grpc.akka.stream.services.sum._ 
import java.util.logging.Logger 
 
import akka.stream.scaladsl._ 
import akka.NotUsed 
import akka.actor.ActorSystem 
import akka.stream.{ActorMaterializer, ThrottleMode} 
import scala.concurrent.duration._ 
import io.grpc._ 
class gRPCAkkaStreamClient(host: String, port: Int) { 
  val logger: Logger = Logger.getLogger(classOf[gRPCAkkaStreamClient].getName) 
 
  val channel = ManagedChannelBuilder 
      .forAddress(host,port) 
      .usePlaintext(true) 
      .build() 
 
  val stub = SumGrpcAkkaStream.stub(channel) 
 
  def addPair(num1: Int, num2: Int): Source[String,NotUsed] = { 
    logger.info(s"Requesting to add $num1, $num2") 
    Source 
      .single(NumPair(num1,num2)) 
      .via(stub.sumPair) 
      .map(r => s"the result: ${r.result}") 
  } 
  def genIncNumbers(len: Int): Source[Int,NotUsed] = { 
    logger.info(s"Requesting to produce ${len} inc numbers") 
    Source 
      .single(Num(len)) 
      .via(stub.genIncsFrom) 
      .map(n => n.num) 
  } 
  def sumManyNumbers(nums: Seq[Int]): Source[String,NotUsed] = { 
    logger.info(s"Requesting to sum up ${nums}") 
    Source(nums.map(Num(_)).to[collection.immutable.Iterable]) 
      .throttle(1, 500.millis, 1, ThrottleMode.shaping) 
      .map { n => 
        logger.info(s"Sending number: $n") 
        n 
      } 
      .via(stub.sumStreamNums) 
      .map(r => s"the result: ${r.result}") 
  } 
  def ContSum(nums: Seq[Int]): Source[String,NotUsed] = { 
    logger.info(s"Requesting to sum up ${nums}") 
    Source(nums.map(Num(_)).to[collection.immutable.Iterable]) 
      .throttle(1, 500.millis, 1, ThrottleMode.shaping) 
      .map { n => 
        logger.info(s"Sending number: $n") 
        n 
      } 
      .via(stub.keepAdding) 
      .map(r => s"current sum = ${r.result}") 
  } 
} 
 
object UnaryCallClient extends App { 
  implicit val system = ActorSystem("UnaryClient") 
  implicit val mat = ActorMaterializer.create(system) 
 
  val client = new gRPCAkkaStreamClient("localhost", 50051) 
 
  client.addPair(29,33).runForeach(println) 
 
  scala.io.StdIn.readLine() 
  mat.shutdown() 
  system.terminate() 
 
} 
 
object ServerStreamingClient extends App { 
  implicit val system = ActorSystem("ServerStreamingClient") 
  implicit val mat = ActorMaterializer.create(system) 
 
  val client = new gRPCAkkaStreamClient("localhost", 50051) 
 
  client.genIncNumbers(5).runForeach(println) 
 
  scala.io.StdIn.readLine() 
  mat.shutdown() 
  system.terminate() 
 
} 
 
object ClientStreamingClient extends App { 
  implicit val system = ActorSystem("ClientStreamingClient") 
  implicit val mat = ActorMaterializer.create(system) 
 
  val client = new gRPCAkkaStreamClient("localhost", 50051) 
 
  client.sumManyNumbers(Seq(12,4,8,19)).runForeach(println) 
 
  scala.io.StdIn.readLine() 
  mat.shutdown() 
  system.terminate() 
 
} 
 
object BiDiStreamingClient extends App { 
  implicit val system = ActorSystem("BiDiStreamingClient") 
  implicit val mat = ActorMaterializer.create(system) 
 
  val client = new gRPCAkkaStreamClient("localhost", 50051) 
 
  client.ContSum(Seq(12,4,8,19)).runForeach(println) 
 
  scala.io.StdIn.readLine() 
  mat.shutdown() 
  system.terminate() 
 
}

 

Original article by 奋斗. If you repost it, please credit the source: https://blog.ytso.com/12798.html
