Akka(37): Http:客户端操作模式详解编程语言

   Akka-http的客户端连接模式除Connection-Level和Host-Level之外还有一种非常便利的模式:Request-Level-Api。这种模式免除了连接Connection的概念,任何时候可以直接调用singleRequest来与服务端沟通。下面我们用几个例子来示范singleRequest的用法:

  (for { 
    response <- Http().singleRequest(HttpRequest(method=HttpMethods.GET,uri="http://localhost:8011/message")) 
    message <- Unmarshal(response.entity).to[String] 
  } yield message).andThen { 
    case Success(msg) => println(s"Received message: $msg") 
    case Failure(err) => println(s"Error: ${err.getMessage}") 
  }.andThen {case _ => sys.terminate()}

这是一个GET操作:用Http().singleRequest直接把HttpRequest发送给服务端uri并获取返回的HttpResponse。我们看到,整组函数的返回类型都是Future[?],所以用for-comprehension来把所有实际运算包嵌在Future运算模式内(context)。下面这个例子是客户端上传数据示范:

 (for { 
    entity <- Marshal("Wata hell you doing?").to[RequestEntity] 
    response <- Http().singleRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/message",entity=entity)) 
    message <- Unmarshal(response.entity).to[String] 
  } yield message).andThen { 
    case Success(msg) => println(s"Received message: $msg") 
    case Failure(err) => println(s"Error: ${err.getMessage}") 
  }.andThen {case _ => sys.terminate()}

以上是个PUT操作。我们需要先构建数据载体HttpEntity。格式转换函数Marshal也返回Future[HttpEntity],所以也可以包含在for语句内。关注一下这个andThen,它可以连接一串多个monadic运算,在不影响上游运算结果的情况下实现一些副作用计算。值得注意的是上面这两个例子虽然表现形式很简洁,但我们无法对数据转换过程中的异常及response的状态码等进行监控。所以我们应该把整个过程拆分成两部分:先获取response,再具体处理response,包括核对状态,处理数据等:

  case class Item(id: Int, name: String, price: Double) 
 
  def getItem(itemId: Int): Future[HttpResponse] = for { 
    response <- Http().singleRequest(HttpRequest(method=HttpMethods.GET,uri = s"http://localhost:8011/item/$itemId")) 
  } yield response 
 
  def extractEntity[T](futResp: Future[HttpResponse])(implicit um: Unmarshaller[ResponseEntity,T]) = { 
    futResp.andThen { 
      case Success(HttpResponse(StatusCodes.OK, _, entity, _)) => 
        Unmarshal(entity).to[T] 
          .onComplete { 
            case Success(t) => println(s"Got response entity: ${t}") 
            case Failure(e) => println(s"Unmarshalling failed: ${e.getMessage}") 
          } 
      case Success(_) => println("Exception in response!") 
      case Failure(err) => println(s"Response Failed: ${err.getMessage}") 
    } 
  } 
  extractEntity[Item](getItem(13))

现在这个extractEntity[Item](getItem(13))可以实现全过程的监控管理了。用同样的模式实现PUT操作:

  def putItem(item: Item): Future[HttpResponse] = 
   for { 
    reqEntity <- Marshal(item).to[RequestEntity] 
    response <- Http().singleRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/item",entity=reqEntity)) 
   } yield response 
 
  extractEntity[Item](putItem(Item(23,"Item#23", 46.0))) 
     .andThen { case _ => sys.terminate()}

当然,我们还是使用了前面几篇讨论里的Marshalling方式来进行数据格式的自动转换:

import de.heikoseeberger.akkahttpjson4s.Json4sSupport 
import org.json4s.jackson 
... 
trait JsonCodec extends Json4sSupport { 
  import org.json4s.DefaultFormats 
  import org.json4s.ext.JodaTimeSerializers 
  implicit val serilizer = jackson.Serialization 
  implicit val formats = DefaultFormats ++ JodaTimeSerializers.all 
} 
object JsConverters extends JsonCodec 
... 
  import JsConverters._ 
 
  implicit val jsonStreamingSupport = EntityStreamingSupport.json() 
    .withParallelMarshalling(parallelism = 8, unordered = false)

如果我们需要对数据交换过程进行更细致的管控,用Host-Level-Api会更加适合。下面我们就针对Host-Level-Api构建一个客户端的工具库:

class PooledClient(host: String, port: Int, poolSettings: ConnectionPoolSettings) 
                  (implicit sys: ActorSystem, mat: ActorMaterializer) { 
 
  import sys.dispatcher 
 
  private val cnnPool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool] = 
    Http().cachedHostConnectionPool[Int](host = host, port = port, settings = poolSettings) 
//单一request 
  def requestSingleResponse(req: HttpRequest): Future[HttpResponse] = { 
    Source.single(req -> 1) 
      .via(cnnPool) 
      .runWith(Sink.head).flatMap { 
      case (Success(resp), _) => Future.successful(resp) 
      case (Failure(fail), _) => Future.failed(fail) 
    } 
  } 
//组串request 
  def orderedResponses(reqs: Iterable[HttpRequest]): Future[Iterable[HttpResponse]] = { 
    Source(reqs.zipWithIndex.toMap) 
      .via(cnnPool) 
      .runFold(SortedMap[Int, Future[HttpResponse]]()) { 
        case (m, (Success(r), idx)) => m + (idx -> Future.successful(r)) 
        case (m, (Failure(f), idx)) => m + (idx -> Future.failed(f)) 
      }.flatMap { m => Future.sequence(m.values) } 
  } 
}

下面是一种比较安全的模式:使用了queue来暂存request从而解决因发送方与接收方速率不同所产生的问题:

class QueuedRequestsClient(host: String, port: Int, poolSettings: ConnectionPoolSettings) 
                          (qsize: Int = 10, overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew) 
                  (implicit sys: ActorSystem, mat: ActorMaterializer) { 
  import sys.dispatcher 
  private val cnnPool: Flow[(HttpRequest,Promise[HttpResponse]),(Try[HttpResponse],Promise[HttpResponse]),Http.HostConnectionPool] = 
    Http().cachedHostConnectionPool[Promise[HttpResponse]](host=host,port=port,settings=poolSettings) 
 
  val queue = 
    Source.queue[(HttpRequest, Promise[HttpResponse])](qsize, overflowStrategy) 
      .via(cnnPool) 
      .to(Sink.foreach({ 
        case ((Success(resp), p)) => p.success(resp) 
        case ((Failure(e), p))    => p.failure(e) 
      })).run() 
 
  def queueRequest(request: HttpRequest): Future[HttpResponse] = { 
      val responsePromise = Promise[HttpResponse]() 
      queue.offer(request -> responsePromise).flatMap { 
        case QueueOfferResult.Enqueued    => responsePromise.future 
        case QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed. Try again later.")) 
        case QueueOfferResult.Failure(ex) => Future.failed(ex) 
        case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later.")) 
      } 
  } 
}

下面是这些工具函数的具体使用示范:

  val settings = ConnectionPoolSettings(sys) 
    .withMaxConnections(8) 
    .withMaxOpenRequests(8) 
    .withMaxRetries(3) 
    .withPipeliningLimit(4) 
  val pooledClient = new PooledClient("localhost",8011,settings) 
 
  def getItemByPool(itemId: Int): Future[HttpResponse] = for { 
    response <- pooledClient.requestSingleResponse(HttpRequest(method=HttpMethods.GET,uri = s"http://localhost:8011/item/$itemId")) 
  } yield response 
 
  extractEntity[Item](getItemByPool(13)) 
 
  def getItemsByPool(itemIds: List[Int]): Future[Iterable[HttpResponse]] = { 
    val reqs = itemIds.map { id => 
      HttpRequest(method = HttpMethods.GET, uri = s"http://localhost:8011/item/$id") 
    } 
    val rets = (for { 
      responses <- pooledClient.orderedResponses(reqs) 
    } yield responses) 
    rets 
  } 
  val futResps = getItemsByPool(List(3,5,7)) 
 
  futResps.andThen { 
    case Success(listOfResps) => { 
      listOfResps.foreach { r => 
        r match { 
          case HttpResponse(StatusCodes.OK, _, entity, _) => 
            Unmarshal(entity).to[Item] 
              .onComplete { 
                case Success(t) => println(s"Got response entity: ${t}") 
                case Failure(e) => println(s"Unmarshalling failed: ${e.getMessage}") 
              } 
          case _ => println("Exception in response!") 
        } 
      } 
    } 
    case _ => println("Failed to get list of responses!") 
  } 
 
  val queuedClient = new QueuedRequestsClient("localhost",8011,settings)() 
 
 
  def putItemByQueue(item: Item): Future[HttpResponse] = 
    for { 
      reqEntity <- Marshal(item).to[RequestEntity] 
      response <- queuedClient.queueRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/item",entity=reqEntity)) 
    } yield response 
 
  extractEntity[Item](putItemByQueue(Item(23,"Item#23", 46.0))) 
    .andThen { case _ => sys.terminate()}

下面是本次讨论的示范源代码:

服务端代码:

import akka.actor._ 
import akka.stream._ 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.server.Directives._ 
 
import de.heikoseeberger.akkahttpjson4s.Json4sSupport 
import org.json4s.jackson 
trait JsonCodec extends Json4sSupport { 
  import org.json4s.DefaultFormats 
  import org.json4s.ext.JodaTimeSerializers 
  implicit val serilizer = jackson.Serialization 
  implicit val formats = DefaultFormats ++ JodaTimeSerializers.all 
} 
object JsConverters extends JsonCodec 
 
 
object TestServer extends App with JsonCodec { 
  implicit val httpSys = ActorSystem("httpSystem") 
  implicit val httpMat = ActorMaterializer() 
  implicit val httpEC = httpSys.dispatcher 
 
  import JsConverters._ 
 
  case class Item(id: Int, name: String, price: Double) 
  val messages = path("message") { 
    get { 
      complete("hello, how are you?") 
    } ~ 
    put { 
      entity(as[String]) {msg => 
        complete(msg) 
      } 
    } 
  } 
  val items = 
    (path("item" / IntNumber) & get) { id => 
       get { 
         complete(Item(id, s"item#$id", id * 2.0)) 
       } 
    } ~ 
      (path("item") & put) { 
        entity(as[Item]) {item => 
          complete(item) 
        } 
     } 
 
  val route = messages ~ items 
 
  val (host, port) = ("localhost", 8011) 
 
  val bindingFuture = Http().bindAndHandle(route,host,port) 
 
 
  println(s"Server running at $host $port. Press any key to exit ...") 
 
  scala.io.StdIn.readLine() 
 
  bindingFuture.flatMap(_.unbind()) 
    .onComplete(_ => httpSys.terminate()) 
 
}

客户端源代码: 

import akka.actor._ 
import akka.http.scaladsl.settings.ConnectionPoolSettings 
import akka.stream._ 
import akka.stream.scaladsl._ 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model._ 
import scala.util._ 
import de.heikoseeberger.akkahttpjson4s.Json4sSupport 
import org.json4s.jackson 
import scala.concurrent._ 
import akka.http.scaladsl.unmarshalling.Unmarshal 
import akka.http.scaladsl.unmarshalling._ 
import akka.http.scaladsl.marshalling.Marshal 
import scala.collection.SortedMap 
import akka.http.scaladsl.common._ 
trait JsonCodec extends Json4sSupport { 
import org.json4s.DefaultFormats 
import org.json4s.ext.JodaTimeSerializers 
implicit val serilizer = jackson.Serialization 
implicit val formats = DefaultFormats ++ JodaTimeSerializers.all 
} 
object JsConverters extends JsonCodec 
class PooledClient(host: String, port: Int, poolSettings: ConnectionPoolSettings) 
(implicit sys: ActorSystem, mat: ActorMaterializer) { 
import sys.dispatcher 
private val cnnPool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool] = 
Http().cachedHostConnectionPool[Int](host = host, port = port, settings = poolSettings) 
def requestSingleResponse(req: HttpRequest): Future[HttpResponse] = { 
Source.single(req -> 1) 
.via(cnnPool) 
.runWith(Sink.head).flatMap { 
case (Success(resp), _) => Future.successful(resp) 
case (Failure(fail), _) => Future.failed(fail) 
} 
} 
def orderedResponses(reqs: Iterable[HttpRequest]): Future[Iterable[HttpResponse]] = { 
Source(reqs.zipWithIndex.toMap) 
.via(cnnPool) 
.runFold(SortedMap[Int, Future[HttpResponse]]()) { 
case (m, (Success(r), idx)) => m + (idx -> Future.successful(r)) 
case (m, (Failure(f), idx)) => m + (idx -> Future.failed(f)) 
}.flatMap { m => Future.sequence(m.values) } 
} 
} 
class QueuedRequestsClient(host: String, port: Int, poolSettings: ConnectionPoolSettings) 
(qsize: Int = 10, overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew) 
(implicit sys: ActorSystem, mat: ActorMaterializer) { 
import sys.dispatcher 
private val cnnPool: Flow[(HttpRequest,Promise[HttpResponse]),(Try[HttpResponse],Promise[HttpResponse]),Http.HostConnectionPool] = 
Http().cachedHostConnectionPool[Promise[HttpResponse]](host=host,port=port,settings=poolSettings) 
val queue = 
Source.queue[(HttpRequest, Promise[HttpResponse])](qsize, overflowStrategy) 
.via(cnnPool) 
.to(Sink.foreach({ 
case ((Success(resp), p)) => p.success(resp) 
case ((Failure(e), p))    => p.failure(e) 
})).run() 
def queueRequest(request: HttpRequest): Future[HttpResponse] = { 
val responsePromise = Promise[HttpResponse]() 
queue.offer(request -> responsePromise).flatMap { 
case QueueOfferResult.Enqueued    => responsePromise.future 
case QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed. Try again later.")) 
case QueueOfferResult.Failure(ex) => Future.failed(ex) 
case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later.")) 
} 
} 
} 
object ClientRequesting extends App { 
import JsConverters._ 
implicit val sys = ActorSystem("sysClient") 
implicit val mat = ActorMaterializer() 
implicit val ec = sys.dispatcher 
implicit val jsonStreamingSupport = EntityStreamingSupport.json() 
.withParallelMarshalling(parallelism = 8, unordered = false) 
case class Item(id: Int, name: String, price: Double) 
def extractEntity[T](futResp: Future[HttpResponse])(implicit um: Unmarshaller[ResponseEntity,T]) = { 
futResp.andThen { 
case Success(HttpResponse(StatusCodes.OK, _, entity, _)) => 
Unmarshal(entity).to[T] 
.onComplete { 
case Success(t) => println(s"Got response entity: ${t}") 
case Failure(e) => println(s"Unmarshalling failed: ${e.getMessage}") 
} 
case Success(_) => println("Exception in response!") 
case Failure(err) => println(s"Response Failed: ${err.getMessage}") 
} 
} 
(for { 
response <- Http().singleRequest(HttpRequest(method=HttpMethods.GET,uri="http://localhost:8011/message")) 
message <- Unmarshal(response.entity).to[String] 
} yield message).andThen { 
case Success(msg) => println(s"Received message: $msg") 
case Failure(err) => println(s"Error: ${err.getMessage}") 
}  //.andThen {case _ => sys.terminate()} 
 
(for { 
entity <- Marshal("Wata hell you doing?").to[RequestEntity] 
response <- Http().singleRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/message",entity=entity)) 
message <- Unmarshal(response.entity).to[String] 
} yield message).andThen { 
case Success(msg) => println(s"Received message: $msg") 
case Failure(err) => println(s"Error: ${err.getMessage}") 
} //.andThen {case _ => sys.terminate()} 
 
def getItem(itemId: Int): Future[HttpResponse] = for { 
response <- Http().singleRequest(HttpRequest(method=HttpMethods.GET,uri = s"http://localhost:8011/item/$itemId")) 
} yield response 
extractEntity[Item](getItem(13)) 
def putItem(item: Item): Future[HttpResponse] = 
for { 
reqEntity <- Marshal(item).to[RequestEntity] 
response <- Http().singleRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/item",entity=reqEntity)) 
} yield response 
extractEntity[Item](putItem(Item(23,"Item#23", 46.0))) 
.andThen { case _ => sys.terminate()} 
val settings = ConnectionPoolSettings(sys) 
.withMaxConnections(8) 
.withMaxOpenRequests(8) 
.withMaxRetries(3) 
.withPipeliningLimit(4) 
val pooledClient = new PooledClient("localhost",8011,settings) 
def getItemByPool(itemId: Int): Future[HttpResponse] = for { 
response <- pooledClient.requestSingleResponse(HttpRequest(method=HttpMethods.GET,uri = s"http://localhost:8011/item/$itemId")) 
} yield response 
extractEntity[Item](getItemByPool(13)) 
def getItemsByPool(itemIds: List[Int]): Future[Iterable[HttpResponse]] = { 
val reqs = itemIds.map { id => 
HttpRequest(method = HttpMethods.GET, uri = s"http://localhost:8011/item/$id") 
} 
val rets = (for { 
responses <- pooledClient.orderedResponses(reqs) 
} yield responses) 
rets 
} 
val futResps = getItemsByPool(List(3,5,7)) 
futResps.andThen { 
case Success(listOfResps) => { 
listOfResps.foreach { r => 
r match { 
case HttpResponse(StatusCodes.OK, _, entity, _) => 
Unmarshal(entity).to[Item] 
.onComplete { 
case Success(t) => println(s"Got response entity: ${t}") 
case Failure(e) => println(s"Unmarshalling failed: ${e.getMessage}") 
} 
case _ => println("Exception in response!") 
} 
} 
} 
case _ => println("Failed to get list of responses!") 
} 
val queuedClient = new QueuedRequestsClient("localhost",8011,settings)() 
def putItemByQueue(item: Item): Future[HttpResponse] = 
for { 
reqEntity <- Marshal(item).to[RequestEntity] 
response <- queuedClient.queueRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/item",entity=reqEntity)) 
} yield response 
extractEntity[Item](putItemByQueue(Item(23,"Item#23", 46.0))) 
.andThen { case _ => sys.terminate()} 
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/12824.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论