Akka(41): Http: DBTable-rows streaming – exchanging database table rows

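  In the previous discussion we covered exchanging files over HTTP. Because file content is just a sequence of bytes, and the data part of an HTTP message is also bytes, we could read a file directly as a Source[ByteString,_] and put it into an HttpEntity. We also noted that database data can be exchanged the same way by representing table rows as a Source[ROW,_], provided that a ROW -> ByteString conversion is done first. As mentioned last time, this conversion is really ROW -> Json -> ByteString, or the reverse, which Akka-http calls Marshalling and Unmarshalling. Akka-http implements Marshalling with the type-class pattern: for each type to be converted to or from Json, an implicit instance of type Marshaller[A,B] must be available in scope. The default Json library for Akka-http is Spray-Json. It is built around case classes and requires a format created with jsonFormatN(case-class), where N is the number of parameters of the case class, which makes it slightly cumbersome to use. Still, as the companion library of Akka-http it should have some advantage as Akka-http continues to evolve, so we use it for the demonstrations below.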
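
To make the type-class idea concrete, here is a minimal plain-Scala sketch. The Marshaller trait below is a hypothetical, simplified stand-in (Akka-http's real Marshaller[A,B] is asynchronous and content-type aware), and the Json text is assembled by hand rather than by Spray-Json. It only illustrates how one implicit instance per conversion lets a generic function perform ROW -> Json -> bytes.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object MarshallerSketch {
  // Hypothetical, simplified type class; NOT Akka-http's real Marshaller.
  trait Marshaller[A, B] { def apply(a: A): B }

  final case class County(id: Int, name: String)
  final case class Json(text: String) // stand-in for a JSON AST

  // Instance 1: ROW -> Json (hand-written; assumes name needs no escaping).
  implicit val countyToJson: Marshaller[County, Json] =
    new Marshaller[County, Json] {
      def apply(c: County): Json = Json(s"""{"id":${c.id},"name":"${c.name}"}""")
    }

  // Instance 2: Json -> bytes.
  implicit val jsonToBytes: Marshaller[Json, Array[Byte]] =
    new Marshaller[Json, Array[Byte]] {
      def apply(j: Json): Array[Byte] = j.text.getBytes(UTF_8)
    }

  // Generic entry point: compiles only if both instances are in implicit scope.
  def toBytes[A](a: A)(implicit m1: Marshaller[A, Json],
                       m2: Marshaller[Json, Array[Byte]]): Array[Byte] =
    m2(m1(a))

  def main(args: Array[String]): Unit =
    println(new String(toBytes(County(1, "Guangzhou")), UTF_8))
}
```

Calling toBytes on a type without a Marshaller[A, Json] instance in scope fails at compile time, which is the same kind of feedback Akka-http gives when a JsonFormat is missing.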

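Let's start writing some code. First, we use a case class to represent the structure of a database table row, then use it as the stream element type to build a Source: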

  case class County(id: Int, name: String) 
  val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }

We start with the download part on the server side:

import akka.actor._ 
import akka.stream._ 
import akka.stream.scaladsl._ 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.server.Directives._ 
import akka._ 
import akka.http.scaladsl.common._ 
import spray.json.DefaultJsonProtocol 
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport 
 
 
trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol 
object Converters extends MyFormats { 
  case class County(id: Int, name: String) 
  val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") } 
  implicit val countyFormat = jsonFormat2(County) 
} 
 
object HttpDBServer extends App { 
  import Converters._ 
 
  implicit val httpSys = ActorSystem("httpSystem") 
  implicit val httpMat = ActorMaterializer() 
  implicit val httpEC = httpSys.dispatcher 
 
 
  implicit val jsonStreamingSupport = EntityStreamingSupport.json() 
    .withParallelMarshalling(parallelism = 8, unordered = false) 
 
  val route = 
    path("rows") { 
      get { 
        complete { 
          source 
        } 
      } 
    } 
 
  val (port, host) = (8011,"localhost") 
 
  val bindingFuture = Http().bindAndHandle(route,host,port) 
 
  println(s"Server running at $host $port. Press any key to exit ...") 
 
  scala.io.StdIn.readLine() 
 
  bindingFuture.flatMap(_.unbind()) 
    .onComplete(_ => httpSys.terminate()) 
 
}
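
As a rough picture of what complete(source) is expected to put on the wire: each element is rendered to Json and the elements are framed as one Json array. The plain-Scala sketch below mimics that with mkString. It rests on the assumption that the default EntityStreamingSupport.json() frames elements with "[", "," and "]", and the hand-written render function is only a stand-in for the Spray-Json format.

```scala
object JsonArrayFramingSketch {
  final case class County(id: Int, name: String)

  // Hand-written stand-in for the JSON format (assumes no escaping is needed).
  def render(c: County): String = s"""{"id":${c.id},"name":"${c.name}"}"""

  // Frame the rendered elements as a single JSON array.
  def frame(rows: Seq[County]): String =
    rows.map(render).mkString("[", ",", "]")

  def main(args: Array[String]): Unit =
    println(frame((1 to 3).map(i => County(i, s"county-$i"))))
}
```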

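In the code above we put source directly into complete() and expect the directive, given the implicit EntityStreamingSupport, to use the ToEntityMarshaller[County] instance via Spray-Json to convert the Source[County,NotUsed] into a Source[ByteString,NotUsed] and place it in the HttpEntity of the HttpResponse. The result of the conversion can only be verified on the client side. We know that Entity.dataBytes of an HttpResponse is a Source[ByteString,_]; we can unmarshal it into a Source[County,_] and then process it with Akka-stream: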

     case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          val futSource = Unmarshal(entity).to[Source[County,NotUsed]]
          // foreach replaces the deprecated Future.onSuccess
          futSource.foreach { source =>
            source.runForeach(println)
          }
 

The Unmarshal call above relies on the following implicit FromEntityUnmarshaller[Source[T, NotUsed]] instance:

  // support for as[Source[T, NotUsed]] 
  implicit def sprayJsonSourceReader[T](implicit reader: RootJsonReader[T], support: EntityStreamingSupport): FromEntityUnmarshaller[Source[T, NotUsed]] = 
    Unmarshaller.withMaterializer { implicit ec ⇒ implicit mat ⇒ e ⇒ 
      if (support.supported.matches(e.contentType)) { 
        val frames = e.dataBytes.via(support.framingDecoder) 
        val unmarshal = sprayJsonByteStringUnmarshaller(reader)(_) 
        val unmarshallingFlow = 
          if (support.unordered) Flow[ByteString].mapAsyncUnordered(support.parallelism)(unmarshal) 
          else Flow[ByteString].mapAsync(support.parallelism)(unmarshal) 
        val elements = frames.viaMat(unmarshallingFlow)(Keep.right) 
        FastFuture.successful(elements) 
      } else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(support.supported)) 
    }

This implicit instance comes from Akka-http's Spray-Json support module, in SprayJsonSupport.scala.
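
The support.framingDecoder step is what cuts the incoming bytes into one frame per Json object before each frame is unmarshalled. The sketch below is a simplified, non-streaming illustration of brace-counting framing in plain Scala, not the actual Akka implementation; the name splitObjects is hypothetical. It shows why both an array of objects and plain concatenated objects can be split into the same frames.

```scala
object JsonObjectScannerSketch {
  // Split a string holding several top-level JSON objects into one string per
  // object by tracking brace depth; braces inside string literals are ignored.
  def splitObjects(input: String): List[String] = {
    val out = List.newBuilder[String]
    val cur = new StringBuilder
    var depth = 0
    var inString = false
    var escaped = false
    for (ch <- input) {
      if (depth > 0) cur.append(ch)       // inside an object: keep every char
      if (inString) {
        if (escaped) escaped = false      // char after a backslash is literal
        else if (ch == '\\') escaped = true
        else if (ch == '"') inString = false
      } else ch match {
        case '"' if depth > 0 => inString = true
        case '{' =>
          if (depth == 0) cur.append(ch)  // opening brace of a new object
          depth += 1
        case '}' if depth > 0 =>
          depth -= 1
          if (depth == 0) { out += cur.toString; cur.clear() }
        case _ => ()                      // separators such as [ , ] and spaces
      }
    }
    out.result()
  }

  def main(args: Array[String]): Unit =
    splitObjects("""[{"id":1,"name":"a"},{"id":2,"name":"b"}]""").foreach(println)
}
```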
Here is the complete client code for this part:

import akka.actor._ 
import akka.stream._ 
import akka.stream.scaladsl._ 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model._ 
import scala.util._ 
import akka._ 
import akka.http.scaladsl.common._ 
import spray.json.DefaultJsonProtocol 
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport 
import akka.http.scaladsl.unmarshalling.Unmarshal 
 
trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol 
object Converters extends MyFormats { 
  case class County(id: Int, name: String) 
  implicit val countyFormat = jsonFormat2(County) 
} 
 
object HttpDBClient extends App { 
  import Converters._ 
 
  implicit val sys = ActorSystem("ClientSys") 
  implicit val mat = ActorMaterializer() 
  implicit val ec = sys.dispatcher 
 
  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json() 
 
  def downloadRows(request: HttpRequest) = { 
    val futResp = Http(sys).singleRequest(request) 
    futResp 
      .andThen { 
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          val futSource = Unmarshal(entity).to[Source[County,NotUsed]]
          // foreach replaces the deprecated Future.onSuccess
          futSource.foreach { source =>
            source.runForeach(println)
          }
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"download request failed, response code: $code") 
          r.discardEntityBytes() 
        case Success(_) => println("Unable to download rows!") 
        case Failure(err) => println(s"download failed: ${err.getMessage}") 
 
      } 
  } 
  downloadRows(HttpRequest(HttpMethods.GET,uri = s"http://localhost:8011/rows")) 
 
  scala.io.StdIn.readLine() 
 
  sys.terminate() 
 
}

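We have now implemented downloading a batch of database table rows from the server to the client and processing the downloaded data with Akka-stream. For the reverse exchange, uploading rows from the client, a Source[T,_] has to be converted into a Source[ByteString,_] and placed in the HttpEntity of an HttpRequest. After receiving the data, the server performs the reverse conversion, turning Request.Entity.dataBytes from Source[ByteString,_] back into Source[T,_]. On the client side Akka-http offers nothing as powerful and automatic as complete. We might have to define and provide an implicit instance such as ToRequestMarshaller[Source[T,_]] ourselves, but Akka-http's Marshalling type-class system is very complex. If all we want is a Source[ByteString,_], can we call the Spray-Json functions directly to do the ROW -> Json -> ByteString conversion? As follows: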

  import akka.util.ByteString
  import akka.http.scaladsl.model.HttpEntity.limitableByteSource
  import spray.json._ // brings the toJson enrichment into scope
 
  val source: Source[County,NotUsed] = Source(1 to 5).map {i => County(i, s"广西壮族自治区地市县编号 #$i")} 
  def countyToByteString(c: County) = { 
    ByteString(c.toJson.toString) 
  } 
  val flowCountyToByteString : Flow[County,ByteString,NotUsed] = Flow.fromFunction(countyToByteString) 
 
  val rowBytes = limitableByteSource(source via flowCountyToByteString) 
 
  val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows") 
  val data = HttpEntity( 
    ContentTypes.`application/json`, 
    rowBytes 
  ) 
 
We implemented flowCountyToByteString by calling toJson directly for the County -> Json conversion. toJson is a function provided by Spray-Json:
package json { 
 
  case class DeserializationException(msg: String, cause: Throwable = null, fieldNames: List[String] = Nil) extends RuntimeException(msg, cause) 
  class SerializationException(msg: String) extends RuntimeException(msg) 
 
  private[json] class PimpedAny[T](any: T) { 
    def toJson(implicit writer: JsonWriter[T]): JsValue = writer.write(any) 
  } 
 
  private[json] class PimpedString(string: String) { 
    @deprecated("deprecated in favor of parseJson", "1.2.6") 
    def asJson: JsValue = parseJson 
    def parseJson: JsValue = JsonParser(string) 
  } 
}

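Assuming the server, after receiving the data, processes it as an Akka-stream and returns the result as a List, we can test the feature with the following method: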

  def uploadRows(request: HttpRequest, dataEntity: RequestEntity) = { 
    val futResp = Http(sys).singleRequest( 
      request.copy(entity = dataEntity) 
    ) 
    futResp 
      .andThen { 
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          entity.dataBytes.map(_.utf8String).runForeach(println)
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"Upload request failed, response code: $code") 
          r.discardEntityBytes() 
        case Success(_) => println("Unable to Upload file!") 
        case Failure(err) => println(s"Upload failed: ${err.getMessage}") 
 
      } 
  }

The server-side handling of the received data looks like this:

     post { 
        withoutSizeLimit { 
          entity(asSourceOf[County]) { source => 
            val futofNames: Future[List[String]] = 
              source.runFold(List[String](""))((acc, b) => acc ++ List(b.name)) 
            complete { 
              futofNames 
            } 
          } 
        } 
      }
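
The runFold above behaves like foldLeft on an ordinary collection, which makes it easy to check in plain Scala. The sketch below uses made-up sample rows and shows that the seed List("") stays in the result as a leading empty string, while an empty seed yields the names only.

```scala
object FoldSketch {
  final case class County(id: Int, name: String)

  // Sample rows, invented for this illustration.
  val rows: List[County] = (1 to 3).map(i => County(i, s"county-$i")).toList

  // Same shape as the route's runFold: the seed List("") stays in the result.
  val withSeed: List[String] =
    rows.foldLeft(List[String](""))((acc, b) => acc ++ List(b.name))

  // Starting from an empty list yields the names only.
  val namesOnly: List[String] =
    rows.foldLeft(List.empty[String])((acc, b) => acc :+ b.name)

  def main(args: Array[String]): Unit = {
    println(withSeed)
    println(namesOnly)
  }
}
```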

Exceptions may occur during data conversion, so an exception handler is needed to drain the request entity and release backpressure:

  def postExceptionHandler: ExceptionHandler = 
    ExceptionHandler { 
      case _: RuntimeException => 
        extractRequest { req => 
          req.discardEntityBytes() 
          complete((StatusCodes.InternalServerError.intValue,"Upload Failed!")) 
        } 
    } 
 
      post { 
        withoutSizeLimit { 
          handleExceptions(postExceptionHandler) { 
            entity(asSourceOf[County]) { source => 
              val futofNames: Future[List[String]] = 
                source.runFold(List[String](""))((acc, b) => acc ++ List(b.name)) 
              complete { 
                futofNames 
              } 
            } 
          } 
        } 
      }

A trial run on the client displays the returned result:

  uploadRows(request,data) 
 
["","广西壮族自治区地市县编号 #1","广西壮族自治区地市县编号 #2","广西壮族自治区地市县编号 #3","广西壮族自治区地市县编号 #4","广西壮族自治区地市县编号 #5"]

This is exactly the result we expected.

Here is the sample code for this discussion:

Server:

import akka.actor._ 
import akka.stream._ 
import akka.stream.scaladsl._ 
import akka.http.scaladsl.Http 
import akka._ 
import akka.http.scaladsl.common._ 
import spray.json.DefaultJsonProtocol 
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport 
import scala.concurrent._ 
import akka.http.scaladsl.server._ 
import akka.http.scaladsl.server.Directives._ 
import akka.http.scaladsl.model._ 
 
trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol 
object Converters extends MyFormats { 
  case class County(id: Int, name: String) 
  val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") } 
  implicit val countyFormat = jsonFormat2(County) 
} 
 
object HttpDBServer extends App { 
  import Converters._ 
 
  implicit val httpSys = ActorSystem("httpSystem") 
  implicit val httpMat = ActorMaterializer() 
  implicit val httpEC = httpSys.dispatcher 
 
 
  implicit val jsonStreamingSupport = EntityStreamingSupport.json() 
    .withParallelMarshalling(parallelism = 8, unordered = false) 
 
  def postExceptionHandler: ExceptionHandler = 
    ExceptionHandler { 
      case _: RuntimeException => 
        extractRequest { req => 
          req.discardEntityBytes() 
          complete((StatusCodes.InternalServerError.intValue,"Upload Failed!")) 
        } 
    } 
 
  val route = 
    path("rows") { 
      get { 
        complete { 
          source 
        } 
      } ~ 
      post { 
        withoutSizeLimit { 
          handleExceptions(postExceptionHandler) { 
            entity(asSourceOf[County]) { source => 
              val futofNames: Future[List[String]] = 
                source.runFold(List[String](""))((acc, b) => acc ++ List(b.name)) 
              complete { 
                futofNames 
              } 
            } 
          } 
        } 
      } 
    } 
 
  val (port, host) = (8011,"localhost") 
 
  val bindingFuture = Http().bindAndHandle(route,host,port) 
 
  println(s"Server running at $host $port. Press any key to exit ...") 
 
  scala.io.StdIn.readLine() 
 
  bindingFuture.flatMap(_.unbind()) 
    .onComplete(_ => httpSys.terminate()) 
 
}

Client:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import scala.util._
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import spray.json._ // brings the toJson enrichment into scope
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.unmarshalling._

trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends MyFormats {
  case class County(id: Int, name: String)
  implicit val countyFormat = jsonFormat2(County)
}

object HttpDBClient extends App {
  import Converters._

  implicit val sys = ActorSystem("ClientSys")
  implicit val mat = ActorMaterializer()
  implicit val ec = sys.dispatcher

  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()

  def downloadRows(request: HttpRequest) = {
    val futResp = Http(sys).singleRequest(request)
    futResp
      .andThen {
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          val futSource = Unmarshal(entity).to[Source[County,NotUsed]]
          // foreach replaces the deprecated Future.onSuccess
          futSource.foreach { source =>
            source.runForeach(println)
          }
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"download request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to download rows!")
        case Failure(err) => println(s"download failed: ${err.getMessage}")
      }
  }
  downloadRows(HttpRequest(HttpMethods.GET,uri = s"http://localhost:8011/rows"))

  import akka.util.ByteString
  import akka.http.scaladsl.model.HttpEntity.limitableByteSource

  val source: Source[County,NotUsed] = Source(1 to 5).map {i => County(i, s"广西壮族自治区地市县编号 #$i")}
  def countyToByteString(c: County) = {
    ByteString(c.toJson.toString)
  }
  val flowCountyToByteString : Flow[County,ByteString,NotUsed] = Flow.fromFunction(countyToByteString)

  val rowBytes = limitableByteSource(source via flowCountyToByteString)

  val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")
  val data = HttpEntity(
    ContentTypes.`application/json`,
    rowBytes
  )

  def uploadRows(request: HttpRequest, dataEntity: RequestEntity) = {
    val futResp = Http(sys).singleRequest(
      request.copy(entity = dataEntity)
    )
    futResp
      .andThen {
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          entity.dataBytes.map(_.utf8String).runForeach(println)
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"Upload request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to Upload file!")
        case Failure(err) => println(s"Upload failed: ${err.getMessage}")
      }
  }

  uploadRows(request,data)

  scala.io.StdIn.readLine()

  sys.terminate()

}

Original article by 奋斗. If you repost it, please credit the source: https://blog.ytso.com/12820.html
