Akka(36): Http: Client-side-Api, Client-Connections

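   Akka-http's client-side Api is a programming toolkit for exchanging messages over the network, built around operations on HttpRequest. As we know, Akka-http is built on top of Akka-stream, so the channel that a client establishes to a server can also be represented as an Akka-stream Flow. This Flow is obtained by calling Http().outgoingConnection: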

  /** 
   * Creates a [[akka.stream.scaladsl.Flow]] representing a prospective HTTP client connection to the given endpoint. 
   * Every materialization of the produced flow will attempt to establish a new outgoing connection. 
   * 
   * To configure additional settings for requests made using this method, 
   * use the `akka.http.client` config section or pass in a [[akka.http.scaladsl.settings.ClientConnectionSettings]] explicitly. 
   */ 
  def outgoingConnection(host: String, port: Int = 80, 
                         localAddress: Option[InetSocketAddress] = None, 
                         settings:     ClientConnectionSettings  = ClientConnectionSettings(system), 
                         log:          LoggingAdapter            = system.log): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = 
    _outgoingConnection(host, port, settings.withLocalAddressOverride(localAddress), ConnectionContext.noEncryption(), ClientTransport.TCP, log)

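As we can see, this function sets the server address (host + port) and returns a Flow[HttpRequest,HttpResponse,Future[OutgoingConnection]]. The Flow transforms each incoming HttpRequest into an outgoing HttpResponse, and that transformation includes the Request/Response message exchange with the server. Let's try using this Flow to send a request to the server and get the response: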

  val connFlow: Flow[HttpRequest,HttpResponse,Future[Http.OutgoingConnection]] = 
    Http().outgoingConnection("akka.io") 
   
  def sendHttpRequest(req: HttpRequest) = { 
    Source.single(req) 
      .via(connFlow) 
      .runWith(Sink.head) 
  } 
   
  sendHttpRequest(HttpRequest(uri="/")) 
      .andThen{ 
        case Success(resp) => println(s"got response: ${resp.status.intValue()}") 
        case Failure(err) => println(s"request failed: ${err.getMessage}") 
      } 
     .andThen {case _ => sys.terminate()}
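
Since every materialization of the connection flow opens a new connection, several requests can share one connection by feeding them through a single materialization. The following is a minimal sketch of that idea, not part of the original demo: it assumes the Akka HTTP 10.0/10.1-era API used in this article (ActorMaterializer, bindAndHandleSync), and the local echo server, the object name SingleConnectionDemo and the paths are stand-ins invented so the example runs without network access.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, ExecutionContextExecutor}
import scala.concurrent.duration._

object SingleConnectionDemo {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("SingleConnectionDemo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    implicit val ec: ExecutionContextExecutor = system.dispatcher
    try {
      // Hypothetical local server: echoes the request path in the response body.
      // Port 0 asks the OS for any free port.
      val binding = Await.result(
        Http().bindAndHandleSync(
          req => HttpResponse(entity = HttpEntity(req.uri.path.toString)),
          "localhost", 0),
        5.seconds)

      val connFlow = Http().outgoingConnection("localhost", binding.localAddress.getPort)
      val requests = List("/a", "/b", "/c").map(p => HttpRequest(uri = p))

      // One materialization, hence one connection carrying all three requests.
      val bodies = Source(requests)
        .via(connFlow)
        // Read each entity completely before moving on to the next response.
        .mapAsync(1)(_.entity.toStrict(3.seconds).map(_.data.utf8String))
        .runWith(Sink.seq)

      Await.result(bodies, 10.seconds)
    } finally Await.result(system.terminate(), 10.seconds)
  }
}
```

Calling SingleConnectionDemo.run() returns the response bodies in request order, because responses on a single HTTP/1.1 connection come back in the order the requests were sent.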

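The pattern above is the so-called Connection-Level-Client-Side-Api. It gives the user a greater degree of freedom to control how a connection is established and used, and how requests are sent over it. Generally, the system closes the connection automatically once the request stream has completed and the entity of the last pending response has been fully consumed. The api also offers ways to close a connection by hand when necessary, for example: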

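  // Client side: close the connection by cancelling the response entity
  resp.entity.dataBytes.runWith(Sink.cancelled)
  // Server side: a response carrying a `Connection: close` header makes the client close the connection.
  // bindAndHandleSync starts a server, so it must bind a local interface (localhost:8080 is a placeholder).
  Http().bindAndHandleSync(
    { req => HttpResponse(headers = headers.Connection("close") :: Nil) },
    "localhost",
    8080)(mat)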
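
For contrast with cancelling, here is a small sketch of what fully consuming a response entity looks like. It is an illustration under assumptions rather than code from the original demo: the response is built locally as a stand-in for one received from a server, and the object name ConsumeEntityDemo is made up for this example.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.util.ByteString

import scala.concurrent.{Await, ExecutionContextExecutor}
import scala.concurrent.duration._

object ConsumeEntityDemo {
  def run(): String = {
    implicit val system: ActorSystem = ActorSystem("ConsumeEntityDemo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    implicit val ec: ExecutionContextExecutor = system.dispatcher
    try {
      // Stand-in for a response that would normally arrive over a connection.
      val resp = HttpResponse(entity = HttpEntity("hello"))

      // Drain the entity's byte stream to the end and decode it as UTF-8.
      val body = resp.entity.dataBytes
        .runFold(ByteString.empty)(_ ++ _)
        .map(_.utf8String)

      Await.result(body, 5.seconds)
    } finally Await.result(system.terminate(), 10.seconds)
  }
}
```

When the body is not needed at all, calling resp.discardEntityBytes() drains the entity in the same way without keeping the data.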

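Akka-http's client api also has a practical Host-Level-Client-Side-Api mode. This api automatically maintains a connection pool for each endpoint, so the user only has to configure the pool; the system then manages the lifecycle of the pooled connections (opening, closing, activating and idling them) according to that configuration. In the akka.http.host-connection-pool config section, max-connections, max-open-requests and pipelining-limit control the number of connections and in-flight requests and deserve particular attention. The pool for a given endpoint is obtained with Http().cachedHostConnectionPool(endPoint), which again yields a client-flow instance. Because the system maintains the pool automatically, the client-flow can be used freely, however often and at whatever intervals it is called. The HTTPS variant of the function, cachedHostConnectionPoolHttps(), is defined as follows: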

  /** 
   * Same as [[#cachedHostConnectionPool]] but for encrypted (HTTPS) connections. 
   * 
   * If an explicit [[ConnectionContext]] is given then it rather than the configured default [[ConnectionContext]] will be used 
   * for encryption on the connections. 
   * 
   * To configure additional settings for the pool (and requests made using it), 
   * use the `akka.http.host-connection-pool` config section or pass in a [[ConnectionPoolSettings]] explicitly. 
   */ 
  def cachedHostConnectionPoolHttps[T](host: String, port: Int = 443, 
                                       connectionContext: HttpsConnectionContext = defaultClientHttpsContext, 
                                       settings:          ConnectionPoolSettings = defaultConnectionPoolSettings, 
                                       log:               LoggingAdapter         = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { 
    val cps = ConnectionPoolSetup(settings, connectionContext, log) 
    val setup = HostConnectionPoolSetup(host, port, cps) 
    cachedHostConnectionPool(setup) 
  }
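
The settings parameter in the signature above accepts a ConnectionPoolSettings instance, so the pool limits mentioned earlier can also be set in code instead of in the akka.http.host-connection-pool config section. The sketch below shows one way to do this; the numbers are arbitrary illustrative values (max-open-requests has to be a power of 2), and the object name PoolSettingsDemo is invented for the example.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.settings.ConnectionPoolSettings

import scala.concurrent.Await
import scala.concurrent.duration._

object PoolSettingsDemo {
  // Returns (maxConnections, maxOpenRequests, pipeliningLimit) of the customised settings.
  def run(): (Int, Int, Int) = {
    val system = ActorSystem("PoolSettingsDemo")
    try {
      // Start from the values configured for this ActorSystem and override a few of them.
      val settings = ConnectionPoolSettings(system)
        .withMaxConnections(8)    // illustrative value
        .withMaxOpenRequests(64)  // illustrative value, must be a power of 2
        .withPipeliningLimit(1)   // illustrative value

      // The settings would then be passed to the pool, for example:
      //   Http().cachedHostConnectionPool[Int]("akka.io", settings = settings)
      (settings.maxConnections, settings.maxOpenRequests, settings.pipeliningLimit)
    } finally Await.result(system.terminate(), 10.seconds)
  }
}
```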

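The function returns a Flow[(HttpRequest,T),(Try[HttpResponse],T),HostConnectionPool]. Because the pooled connections dispatch requests and receive responses asynchronously, responses are not necessarily returned in the order the requests were sent, so each request is paired in a Tuple2 with a value of type T that tags it and lets the returned response be matched to it. The pool shuts itself down automatically after idle-timeout, and can also be shut down manually with HostConnectionPool.shutdown():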

  /** 
   * Represents a connection pool to a specific target host and pool configuration. 
   */ 
  final case class HostConnectionPool private[http] (setup: HostConnectionPoolSetup)( 
    private[http] val gateway: PoolGateway) { // enable test access 
 
    /** 
     * Asynchronously triggers the shutdown of the host connection pool. 
     * 
     * The produced [[scala.concurrent.Future]] is fulfilled when the shutdown has been completed. 
     */ 
    def shutdown()(implicit ec: ExecutionContextExecutor): Future[Done] = gateway.shutdown() 
 
    private[http] def toJava = new akka.http.javadsl.HostConnectionPool { 
      override def setup = HostConnectionPool.this.setup 
      override def shutdown(executor: ExecutionContextExecutor): CompletionStage[Done] = HostConnectionPool.this.shutdown()(executor).toJava 
    } 
  }

Http().shutdownAllConnectionPools() shuts down all connection pools in the ActorSystem at once:

  /** 
   * Triggers an orderly shutdown of all host connections pools currently maintained by the [[akka.actor.ActorSystem]]. 
   * The returned future is completed when all pools that were live at the time of this method call 
   * have completed their shutdown process. 
   * 
   * If existing pool client flows are re-used or new ones materialized concurrently with or after this 
   * method call the respective connection pools will be restarted and not contribute to the returned future. 
   */ 
  def shutdownAllConnectionPools(): Future[Unit] = { 
    val shutdownCompletedPromise = Promise[Done]() 
    poolMasterActorRef ! ShutdownAll(shutdownCompletedPromise) 
    shutdownCompletedPromise.future.map(_ ⇒ ())(system.dispatcher) 
  }
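
As a rough sketch of how this call might be used, the snippet below shuts down all pools first and terminates the ActorSystem only after that has completed. The object name ShutdownDemo is made up, and the global ExecutionContext is used for the chaining (an assumption of this sketch) so that no callback has to run on a dispatcher that is already shutting down.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ShutdownDemo {
  def run(): Boolean = {
    implicit val system: ActorSystem = ActorSystem("ShutdownDemo")

    val done = for {
      _ <- Http().shutdownAllConnectionPools() // orderly shutdown of every live pool
      _ <- system.terminate()                  // then stop the ActorSystem itself
    } yield true

    Await.result(done, 10.seconds)
  }
}
```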

Once cachedHostConnectionPool has given us a client-flow instance of type Flow[(HttpRequest,T),(Try[HttpResponse],T),HostConnectionPool], we can use it to turn an incoming HttpRequest into an HttpResponse, as in the following example:

  val pooledFlow: Flow[(HttpRequest,Int),(Try[HttpResponse],Int),Http.HostConnectionPool] = 
      Http().cachedHostConnectionPool[Int](host="akka.io",port=80) 
 
  def sendPoolRequest(req: HttpRequest, marker: Int) = { 
    Source.single(req -> marker) 
      .via(pooledFlow) 
      .runWith(Sink.head) 
  } 
 
  sendPoolRequest(HttpRequest(uri="/"), 1) 
    .andThen{ 
      case Success((tryResp, mk)) => 
        tryResp match { 
          case Success(resp) => println(s"got response: ${resp.status.intValue()}") 
          case Failure(err) => println(s"request failed: ${err.getMessage}") 
        } 
      case Failure(err) => println(s"request failed: ${err.getMessage}") 
    } 
    .andThen {case _ => sys.terminate()}
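
To make the role of the tag T more concrete, the sketch below sends several requests through one pool flow and uses an Int tag to pair each response with its request, whatever order the responses come back in. It rests on the same assumptions as the rest of this article (Akka HTTP 10.0/10.1-era API); the local echo server, the object name TaggedPoolDemo and the /item paths are hypothetical stand-ins so that it runs without network access.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, ExecutionContextExecutor, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object TaggedPoolDemo {
  def run(): Map[Int, String] = {
    implicit val system: ActorSystem = ActorSystem("TaggedPoolDemo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    implicit val ec: ExecutionContextExecutor = system.dispatcher
    try {
      // Hypothetical local server: echoes the request path in the response body.
      val binding = Await.result(
        Http().bindAndHandleSync(
          req => HttpResponse(entity = HttpEntity(req.uri.path.toString)),
          "localhost", 0),
        5.seconds)

      val pool = Http().cachedHostConnectionPool[Int]("localhost", binding.localAddress.getPort)

      // Each request travels together with an Int tag.
      val requests = (1 to 3).map(i => HttpRequest(uri = s"/item/$i") -> i)

      val tagged = Source(requests)
        .via(pool)
        .mapAsync(1) {
          // The tag comes back next to the response, so no ordering assumption is needed.
          case (Success(resp), tag) =>
            resp.entity.toStrict(3.seconds).map(e => tag -> e.data.utf8String)
          case (Failure(err), tag) =>
            Future.successful(tag -> s"failed: ${err.getMessage}")
        }
        .runWith(Sink.seq)

      Await.result(tagged, 10.seconds).toMap
    } finally Await.result(system.terminate(), 10.seconds)
  }
}
```

Keying the results by tag, as the returned Map does, is what lets the caller find the response that belongs to a particular request.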

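The sendPoolRequest example actually runs into the same problem as the Connection-Level-Api: the number of connections in the pool is still limited, so the pool can only alleviate the backlog that builds up when requests arrive faster than responses return. For now the most effective approach is to buffer requests in a queue and process them one by one: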

    val QueueSize = 10 
    // This idea came initially from this blog post: 
    // http://kazuhiro.github.io/scala/akka/akka-http/akka-streams/2016/01/31/connection-pooling-with-akka-http-and-source-queue.html 
    val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io") 
    val queue = 
      Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew) 
        .via(poolClientFlow) 
        .toMat(Sink.foreach({ 
          case ((Success(resp), p)) => p.success(resp) 
          case ((Failure(e), p))    => p.failure(e) 
        }))(Keep.left) 
        .run() 
 
    def queueRequest(request: HttpRequest): Future[HttpResponse] = { 
      val responsePromise = Promise[HttpResponse]() 
      queue.offer(request -> responsePromise).flatMap { 
        case QueueOfferResult.Enqueued    => responsePromise.future 
        case QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed. Try again later.")) 
        case QueueOfferResult.Failure(ex) => Future.failed(ex) 
        case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later.")) 
      } 
    } 
 
    val responseFuture: Future[HttpResponse] = queueRequest(HttpRequest(uri = "/")) 
    responseFuture.andThen { 
      case Success(resp) => println(s"got response: ${resp.status.intValue()}") 
      case Failure(err) => println(s"request failed: ${err.getMessage}") 
    }.andThen {case _ => sys.terminate()}

Below is the demonstration source code for this discussion of Akka-http client-side connections:

import akka.actor._ 
import akka.stream._ 
import akka.stream.scaladsl._ 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model._ 
 
import scala.concurrent._ 
import scala.util._ 
 
object ClientApiDemo extends App { 
  implicit val sys = ActorSystem("ClientSys") 
  implicit val mat = ActorMaterializer() 
  implicit val ec = sys.dispatcher 
/* 
  val connFlow: Flow[HttpRequest,HttpResponse,Future[Http.OutgoingConnection]] = 
    Http().outgoingConnection("www.sina.com") 
 
  def sendHttpRequest(req: HttpRequest) = { 
    Source.single(req) 
      .via(connFlow) 
      .runWith(Sink.head) 
  } 
 
  sendHttpRequest(HttpRequest(uri="/")) 
      .andThen{ 
        case Success(resp) => 
          //close connection by cancelling response entity 
          resp.entity.dataBytes.runWith(Sink.cancelled) 
          println(s"got response: ${resp.status.intValue()}") 
        case Failure(err) => println(s"request failed: ${err.getMessage}") 
      } 
  //   .andThen {case _ => sys.terminate()} 
 
 
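  // Server side: a response carrying a `Connection: close` header makes the client close the connection.
  // bindAndHandleSync starts a server, so it must bind a local interface (localhost:8080 is a placeholder).
  Http().bindAndHandleSync(
    { req => HttpResponse(headers = headers.Connection("close") :: Nil) },
    "localhost",
    8080)(mat)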
 
  val pooledFlow: Flow[(HttpRequest,Int),(Try[HttpResponse],Int),Http.HostConnectionPool] = 
      Http().cachedHostConnectionPool[Int](host="akka.io",port=80) 
 
  def sendPoolRequest(req: HttpRequest, marker: Int) = { 
    Source.single(req -> marker) 
      .via(pooledFlow) 
      .runWith(Sink.head) 
  } 
 
  sendPoolRequest(HttpRequest(uri="/"), 1) 
    .andThen{ 
      case Success((tryResp, mk)) => 
        tryResp match { 
          case Success(resp) => println(s"got response: ${resp.status.intValue()}") 
          case Failure(err) => println(s"request failed: ${err.getMessage}") 
        } 
      case Failure(err) => println(s"request failed: ${err.getMessage}") 
    } 
    .andThen {case _ => sys.terminate()} 
*/ 
 
    val QueueSize = 10 
    // This idea came initially from this blog post: 
    // http://kazuhiro.github.io/scala/akka/akka-http/akka-streams/2016/01/31/connection-pooling-with-akka-http-and-source-queue.html 
    val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io") 
    val queue = 
      Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew) 
        .via(poolClientFlow) 
        .toMat(Sink.foreach({ 
          case ((Success(resp), p)) => p.success(resp) 
          case ((Failure(e), p))    => p.failure(e) 
        }))(Keep.left) 
        .run() 
 
    def queueRequest(request: HttpRequest): Future[HttpResponse] = { 
      val responsePromise = Promise[HttpResponse]() 
      queue.offer(request -> responsePromise).flatMap { 
        case QueueOfferResult.Enqueued    => responsePromise.future 
        case QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed. Try again later.")) 
        case QueueOfferResult.Failure(ex) => Future.failed(ex) 
        case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later.")) 
      } 
    } 
 
    val responseFuture: Future[HttpResponse] = queueRequest(HttpRequest(uri = "/")) 
    responseFuture.andThen { 
      case Success(resp) => println(s"got response: ${resp.status.intValue()}") 
      case Failure(err) => println(s"request failed: ${err.getMessage}") 
    }.andThen {case _ => sys.terminate()} 
 
}

Original article by 奋斗. If you repost it, please credit the source: https://blog.ytso.com/12825.html
