Akka(38): Http:Entityof ByteString-数据传输基础详解编程语言

  我们说过Akka-http是一个好的系统集成工具,集成是通过数据交换方式实现的。Http是个在网上传输和接收的规范协议。所以,在使用Akka-http之前,可能我们还是需要把Http模式的网上数据交换细节了解清楚。数据交换双方是通过Http消息类型Request和Response来实现的。在Akka-http中对应的是HttpRequest和HttpResponse。这两个类型都具备HttpEntity类型来装载需要交换的数据。首先,无论如何数据在线上的表现形式肯定是一串bytes。所以,数据交换两头Request,Response中的Entity也必须是以bytes来表达的。在Akka-http里我们把需要传输的数据转换成ByteString,通过网络发送給接收端、接收端再把收到消息Entity中的ByteString转换成目标类型的数据。这两个转换过程就是Akka-http的Marshalling和Unmarshalling过程了。我们先从HttpEntity的构建函数来了解它的定义:

object HttpEntity { 
  implicit def apply(string: String): HttpEntity.Strict = apply(ContentTypes.`text/plain(UTF-8)`, string) 
  implicit def apply(bytes: Array[Byte]): HttpEntity.Strict = apply(ContentTypes.`application/octet-stream`, bytes) 
  implicit def apply(data: ByteString): HttpEntity.Strict = apply(ContentTypes.`application/octet-stream`, data) 
  def apply(contentType: ContentType.NonBinary, string: String): HttpEntity.Strict = 
    if (string.isEmpty) empty(contentType) else apply(contentType, ByteString(string.getBytes(contentType.charset.nioCharset))) 
  def apply(contentType: ContentType, bytes: Array[Byte]): HttpEntity.Strict = 
    if (bytes.length == 0) empty(contentType) else apply(contentType, ByteString(bytes)) 
  def apply(contentType: ContentType, data: ByteString): HttpEntity.Strict = 
    if (data.isEmpty) empty(contentType) else HttpEntity.Strict(contentType, data) 
 
  def apply(contentType: ContentType, contentLength: Long, data: Source[ByteString, Any]): UniversalEntity = 
    if (contentLength == 0) empty(contentType) else HttpEntity.Default(contentType, contentLength, data) 
  def apply(contentType: ContentType, data: Source[ByteString, Any]): HttpEntity.Chunked = 
    HttpEntity.Chunked.fromData(contentType, data) 
...

很明显,HttpEntity可以分两大类,一种是Strict类型的,它的data是ByteString。另一种是UniversalEntity类型,它的数据dataBytes是Source[ByteString,Any]。无论如何最终在线上的还是ByteString。HttpEntity的ContentType注明了传输数据格式,有:

object ContentTypes { 
  val `application/json` = ContentType(MediaTypes.`application/json`) 
  val `application/octet-stream` = ContentType(MediaTypes.`application/octet-stream`) 
  val `text/plain(UTF-8)` = MediaTypes.`text/plain` withCharset HttpCharsets.`UTF-8` 
  val `text/html(UTF-8)` = MediaTypes.`text/html` withCharset HttpCharsets.`UTF-8` 
  val `text/xml(UTF-8)` = MediaTypes.`text/xml` withCharset HttpCharsets.`UTF-8` 
  val `text/csv(UTF-8)` = MediaTypes.`text/csv` withCharset HttpCharsets.`UTF-8` 
 
  // used for explicitly suppressing the rendering of Content-Type headers on requests and responses 
  val NoContentType = ContentType(MediaTypes.NoMediaType) 
}

注意:ContentType只是一种备注,不影响线上数据表达形式,线上的数据永远是ByteString。但是,其中的application/octet-stream类型代表数据必须是Source[ByteString,Any]。我们下面就通过客户端的例子来理解HttpEntity。下面是一个客户端测试函数:

  def runService(request: HttpRequest, rentity: RequestEntity) = { 
   val futResp = for { 
     entity <- Future.successful(rentity) 
     resp <- Http(sys).singleRequest( 
       request.copy(entity = rentity) 
     ) 
   } yield resp 
 
   futResp 
    .andThen { 
      case Success([email protected](StatusCodes.OK, _, entity, _)) => 
        entity.dataBytes.map(_.utf8String).runForeach(println) 
      case Success([email protected](code, _, _, _)) => 
        println(s"Download request failed, response code: $code") 
        r.discardEntityBytes() 
      case Success(_) => println("Unable to download rows!") 
      case Failure(err) => println(s"Download failed: ${err.getMessage}") 
 
    } 
  }

我们只需要对这个函数传入RequestEntity就可以了解返回Response里Entity的许多细节了。首先我们要求服务端发送一个纯字符串Hello World。服务端代码如下:

 } ~ path("text") { 
      get { 
        complete("Hello World!") 
      } ~

虽然complete(“Hello World!”)有些迷糊,不过应该complete做了些字符串到ByteString的转换。我们可以从上面这个runService函数得到证实。下面是这个例子的调用:

  val reqText = HttpRequest(uri = s"http://localhost:8011/text") 
  runService(reqText,HttpEntity.Empty) 
    .andThen{case _ => sys.terminate()}

从显示的结果可以得出runService函数中的entity.dataBytes.map(_.utf8String)已经把ByteString转换成了String,也就是说服务器端发送的Entity里的数据是ByteString。

我们再试着发送一些数据給服务端,然后让服务端把结果通过response entity返回来:

    } ~ path("text") { 
      get { 
        complete("Hello World!") 
      } ~ 
        post { 
          withoutSizeLimit { 
            extractDataBytes { bytes => 
              val data = bytes.runFold(ByteString())(_ ++ _) 
              onComplete(data) { t => 
                complete(t) 
              } 
            } 
          } 
        }

我们看到服务端对request entity的操作是以ByteString进行的。客户端上传一串字符的request如下:

  val postText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/text") 
  val uploadText = HttpEntity( 
    ContentTypes.`text/plain(UTF-8)`, 
    // transform each number to a chunk of bytes 
    ByteString("hello world again") 
  ) 
  runService(postText,uploadText) 
    .andThen{case _ => sys.terminate()}

我们可以看到放进entity里的数据是ByteString。

我们知道Akka-http是基于Akka-Stream的,具备Reactive-Stream功能特性。下面我们就示范一下如何进行stream的上传下载。首先定制一个Source:

  val numbers = Source.fromIterator(() => 
    Iterator.continually(Random.nextInt())) 
    .map(n => ByteString(s"$n/n")) 
  //make conform to withoutSizeLimit constrain 
  val source = limitableByteSource(numbers)

服务端也是用HttpEntity来装载这个Source然后通过HttpRequest传给客户端的:

  path("random") { 
      get { 
        complete( 
          HttpEntity( 
            ContentTypes.`application/octet-stream`, 
            // transform each number to a chunk of bytes 
            source.take(10000) 
          ) 
        ) 
      } ~ 
  

我们在客户端还是用runService来解析传过来的entity。由于接收一个大型的Source,所以需要修改一下接收方式代码:

   futResp 
    .andThen { 
      case Success([email protected](StatusCodes.OK, _, entity, _)) => 
        val futEnt = entity.dataBytes.map(_.utf8String).runForeach(println) 
             Await.result(futEnt, Duration.Inf) // throws if binding fails 
             println("End of stream!!!") 
      case Success([email protected](code, _, _, _)) => 
        println(s"Download request failed, response code: $code") 
        r.discardEntityBytes() 
      case Success(_) => println("Unable to download rows!") 
      case Failure(err) => println(s"Download failed: ${err.getMessage}") 
 
    } 
 

用下面的方式调用

  val reqRandom = HttpRequest(uri = s"http://localhost:8011/random") 
    runService(reqRandom,HttpEntity.Empty) 
     .andThen{case _ => sys.terminate()}

再示范一下在客户端用Source上传数据。服务端代码:

       post { 
          withoutSizeLimit { 
            extractDataBytes { bytes => 
              val data = bytes.runFold(ByteString())(_ ++ _) 
              onComplete(data) { t => 
                complete(t) 
              } 
            } 
          } 
        }

客户端上传数据范例:

 val numbers = Source.fromIterator(() => 
    Iterator.continually(Random.nextInt())) 
    .map(n => ByteString(s"$n/n")) 
  //make conform to withoutSizeLimit constrain 
  val source = limitableByteSource(numbers) 
 
  val bytes = HttpEntity( 
    ContentTypes.`application/octet-stream`, 
    // transform each number to a chunk of bytes 
    source.take(10000) 
  ) 
  val postRandom = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/random") 
  runService(postRandom,bytes) 
    .andThen{case _ => sys.terminate()}

从上面讨论我们了解了在Marshal,Unmarshal下层只是ByteString的操作和转换。下面是本次讨论示范源代码:

服务端:

import akka.actor._ 
import akka.stream._ 
import akka.stream.scaladsl._ 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.server.Directives._ 
import akka.http.scaladsl.model._ 
import akka.util.ByteString 
import akka.http.scaladsl.model.HttpEntity._ 
 
import scala.util.Random 
 
object ServerEntity extends App { 
 
  implicit val httpSys = ActorSystem("httpSystem") 
  implicit val httpMat = ActorMaterializer() 
  implicit val httpEC = httpSys.dispatcher 
 
 
  val numbers = Source.fromIterator(() => 
    Iterator.continually(Random.nextInt())) 
    .map(n => ByteString(s"$n/n")) 
  //make conform to withoutSizeLimit constrain 
  val source = limitableByteSource(numbers) 
 
 
 
  val route = 
    path("random") { 
      get { 
        withoutSizeLimit { 
          complete( 
            HttpEntity( 
              ContentTypes.`application/octet-stream`, 
              // transform each number to a chunk of bytes 
              source.take(1000)) 
          ) 
        } 
      } ~ 
        post { 
          withoutSizeLimit { 
            extractDataBytes { bytes => 
              val data = bytes.runFold(ByteString())(_ ++ _) 
              onComplete(data) { t => 
                complete(t) 
              } 
            } 
          } 
        } 
    } ~ path("text") { 
      get { 
        complete("Hello World!") 
      } ~ 
        post { 
          withoutSizeLimit { 
            extractDataBytes { bytes => 
              val data = bytes.runFold(ByteString())(_ ++ _) 
              onComplete(data) { t => 
                complete(t) 
              } 
            } 
          } 
        } 
    } 
 
 
  val (port, host) = (8011,"localhost") 
 
  val bindingFuture = Http().bindAndHandle(route,host,port) 
 
  println(s"Server running at $host $port. Press any key to exit ...") 
 
  scala.io.StdIn.readLine() 
 
  bindingFuture.flatMap(_.unbind()) 
    .onComplete(_ => httpSys.terminate()) 
 
}

客户端:

import akka.actor._ 
import akka.stream._ 
import akka.stream.scaladsl._ 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.HttpEntity.limitableByteSource 
import akka.http.scaladsl.model._ 
 
import scala.concurrent.duration._ 
import akka.util.ByteString 
 
import scala.concurrent._ 
import scala.util._ 
 
object ClientEntity extends App { 
 
  implicit val sys = ActorSystem("ClientSys") 
  implicit val mat = ActorMaterializer() 
  implicit val ec = sys.dispatcher 
 
  def runService(request: HttpRequest, rentity: RequestEntity) = { 
   val futResp = for { 
     entity <- Future.successful(rentity) 
     resp <- Http(sys).singleRequest( 
       request.copy(entity = rentity) 
     ) 
   } yield resp 
 
   futResp 
    .andThen { 
      case Success([email protected](StatusCodes.OK, _, entity, _)) => 
        val futEnt = entity.dataBytes.map(_.utf8String).runForeach(println) 
             Await.result(futEnt, Duration.Inf) // throws if binding fails 
             println("End of stream!!!") 
      case Success([email protected](code, _, _, _)) => 
        println(s"Download request failed, response code: $code") 
        r.discardEntityBytes() 
      case Success(_) => println("Unable to download rows!") 
      case Failure(err) => println(s"Download failed: ${err.getMessage}") 
 
    } 
  } 
 
  val reqText = HttpRequest(uri = s"http://localhost:8011/text") 
//  runService(reqText,HttpEntity.Empty) 
//    .andThen{case _ => sys.terminate()} 
 
  val postText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/text") 
  val uploadText = HttpEntity( 
    ContentTypes.`text/plain(UTF-8)`, 
    // transform each number to a chunk of bytes 
    ByteString("hello world again") 
  ) 
//  runService(postText,uploadText) 
//    .andThen{case _ => sys.terminate()} 
 
  val reqRandom = HttpRequest(uri = s"http://localhost:8011/random") 
 //   runService(reqRandom,HttpEntity.Empty) 
 //    .andThen{case _ => sys.terminate()} 
 
  val numbers = Source.fromIterator(() => 
    Iterator.continually(Random.nextInt())) 
    .map(n => ByteString(s"$n/n")) 
  //make conform to withoutSizeLimit constrain 
  val source = limitableByteSource(numbers) 
 
  val bytes = HttpEntity( 
    ContentTypes.`application/octet-stream`, 
    // transform each number to a chunk of bytes 
    source.take(10000) 
  ) 
  val postRandom = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/random") 
  runService(postRandom,bytes) 
    .andThen{case _ => sys.terminate()} 
 
 
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/12823.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论