Akka(30): Http: High-Level-Api, Routing DSL

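  In the previous post we introduced the Akka-http Low-Level-Api. In essence, that Api gives the server a hook for plugging in a custom Flow or conversion function that processes incoming Http requests and produces responses. Let's look at the following example from the official documentation: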

import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.HttpMethods._ 
import akka.http.scaladsl.model._ 
import akka.stream.ActorMaterializer 
import scala.io.StdIn 
 
object WebServer { 
 
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem() 
    implicit val materializer = ActorMaterializer() 
    // needed for the future map/flatmap in the end 
    implicit val executionContext = system.dispatcher 
 
    val requestHandler: HttpRequest => HttpResponse = { 
      case HttpRequest(GET, Uri.Path("/"), _, _, _) => 
        HttpResponse(entity = HttpEntity( 
          ContentTypes.`text/html(UTF-8)`, 
          "<html><body>Hello world!</body></html>")) 
 
      case HttpRequest(GET, Uri.Path("/ping"), _, _, _) => 
        HttpResponse(entity = "PONG!") 
 
      case HttpRequest(GET, Uri.Path("/crash"), _, _, _) => 
        sys.error("BOOM!") 
 
      case r: HttpRequest => 
        r.discardEntityBytes() // important to drain incoming HTTP Entity stream 
        HttpResponse(404, entity = "Unknown resource!") 
    } 
 
    val bindingFuture = Http().bindAndHandleSync(requestHandler, "localhost", 8080) 
    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
    StdIn.readLine() // let it run until user presses return 
    bindingFuture 
      .flatMap(_.unbind()) // trigger unbinding from the port 
      .onComplete(_ => system.terminate()) // and shutdown when done 
  } 
}

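  The requestHandler function in the example above uses pattern matching to map each HttpRequest it may receive to a corresponding HttpResponse. Along the way it may also run some server-side computation that the request asks for, as a Rest-Api service would. For a large service, however, pattern matching becomes bloated and rigid. Akka-http provides a routing DSL as the main part of its High-Level-Api. Replacing the Low-Level-Api's pattern matching with the routing DSL lets us write the HttpRequest-to-HttpResponse conversion more concisely and build large, modern Rest-Api services more flexibly. The routing DSL implements a Rest-Api service by building a Route: a multi-layered, sandwich-like structure composed of Directives. Route is a type: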

  type Route = RequestContext ⇒ Future[RouteResult]

Here is an example of a Route:

    val route: Flow[HttpRequest, HttpResponse, NotUsed]= 
      get { 
        pathSingleSlash { 
          complete(HttpEntity(ContentTypes.`text/html(UTF-8)`,"<html><body>Hello world!</body></html>")) 
        } ~ 
          path("ping") { 
            complete("PONG!") 
          } ~ 
          path("crash") { 
            sys.error("BOOM!") 
          } 
      }

In the example discussed in the previous post, we can use route like this:

  val futBinding: Future[Http.ServerBinding] = 
    connSource.to { Sink.foreach{ connection => 
      println(s"client address ${connection.remoteAddress}") 
      //    connection handleWith flow 
      //    connection handleWithSyncHandler syncHandler 
      //connection handleWithAsyncHandler asyncHandler 
      connection handleWith route 
    }}.run()

But shouldn't the argument of handleWith(flow) be a Flow[HttpRequest,HttpResponse,_]? To see how a Route ends up as a Flow, let's first look at the RouteResult object:

/** 
 * The result of handling a request. 
 * 
 * As a user you typically don't create RouteResult instances directly. 
 * Instead, use the methods on the [[RequestContext]] to achieve the desired effect. 
 */ 
sealed trait RouteResult extends javadsl.server.RouteResult 
 
object RouteResult { 
  final case class Complete(response: HttpResponse) extends javadsl.server.Complete with RouteResult { 
    override def getResponse = response 
  } 
  final case class Rejected(rejections: immutable.Seq[Rejection]) extends javadsl.server.Rejected with RouteResult { 
    override def getRejections = rejections.map(r ⇒ r: javadsl.server.Rejection).toIterable.asJava 
  } 
 
  implicit def route2HandlerFlow(route: Route)(implicit
    routingSettings:  RoutingSettings,
    parserSettings:   ParserSettings,
    materializer:     Materializer,
    routingLog:       RoutingLog,
    executionContext: ExecutionContext = null,
    rejectionHandler: RejectionHandler = RejectionHandler.default,
    exceptionHandler: ExceptionHandler = null): Flow[HttpRequest, HttpResponse, NotUsed] =
    Route.handlerFlow(route)
}

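It contains an implicit conversion, route2HandlerFlow, that turns a Route into a Flow[HttpRequest,HttpResponse,NotUsed]. That solves the puzzle.

From type Route = RequestContext => Future[RouteResult] we can see that a Route is simply a function from RequestContext to Future[RouteResult]. A RequestContext essentially wraps a request together with the environment, configuration and tools needed to operate on it: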

/** 
 * This class is not meant to be extended by user code. 
 * 
 * Immutable object encapsulating the context of an [[akka.http.scaladsl.model.HttpRequest]] 
 * as it flows through a akka-http Route structure. 
 */ 
@DoNotInherit 
trait RequestContext { 
 
  /** The request this context represents. Modelled as a `val` so as to enable an `import ctx.request._`. */ 
  val request: HttpRequest 
 
  /** The unmatched path of this context. Modelled as a `val` so as to enable an `import ctx.unmatchedPath._`. */ 
  val unmatchedPath: Uri.Path 
 
  /** 
   * The default ExecutionContext to be used for scheduling asynchronous logic related to this request. 
   */ 
  implicit def executionContext: ExecutionContextExecutor 
... 
}

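Routes are composable components: simple Routes can be combined into Routes with more layers. There are several ways to compose Routes:

1. Route transformation: transform the incoming request and/or the outgoing response, then delegate the actual work to the next, inner Route

2. Route filtering: only let requests that satisfy a given condition pass and reject all others

3. Route chaining: if one Route rejects the request, try the next Route. This is done with the ~ operator

In Akka-http's routing DSL these Route compositions are implemented with Directives. Akka-http ships a large number of ready-made Directives, and we can also define custom Directives for special purposes; see the official documentation or the API docs for details.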
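The three composition styles can be illustrated without Akka at all. The following is a minimal sketch under simplifying assumptions: Request, Result, mapBody, filter and concat are hypothetical stand-ins invented for this illustration, and the model is synchronous, whereas a real Akka-http Route returns Future[RouteResult].

```scala
object RouteModel {
  // Simplified stand-ins for HttpRequest and RouteResult (not the Akka-http types).
  final case class Request(method: String, path: String)
  sealed trait Result
  final case class Complete(body: String) extends Result
  case object Rejected extends Result

  // A real Route is RequestContext => Future[RouteResult]; this model is synchronous.
  type Route = Request => Result

  def complete(body: String): Route = _ => Complete(body)

  // 1. Transformation: rewrite the response produced by the inner route.
  def mapBody(f: String => String)(inner: Route): Route = req =>
    inner(req) match {
      case Complete(body) => Complete(f(body))
      case Rejected       => Rejected
    }

  // 2. Filtering: only requests that satisfy the predicate reach the inner route.
  def filter(p: Request => Boolean)(inner: Route): Route = req =>
    if (p(req)) inner(req) else Rejected

  // 3. Chaining: the second route is tried only if the first one rejects.
  def concat(first: Route, second: Route): Route = req =>
    first(req) match {
      case Rejected => second(req)
      case done     => done
    }

  val isGet: Request => Boolean = _.method == "GET"

  // GET requests get an upper-cased answer; everything else falls through.
  val route: Route =
    concat(
      filter(isGet)(mapBody(_.toUpperCase)(complete("received get"))),
      complete("received something else"))
}
```

In this model each combinator takes a Route and returns a Route, which is what lets the pieces nest and chain freely. Directives play the same role in the real DSL.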

A Directive takes the following form:

dirname(arguments) { extractions =>
  ... // inner route
}

Here are some examples of using Directives. The following three routes are equivalent:

val route: Route = { ctx => 
  if (ctx.request.method == HttpMethods.GET) 
    ctx.complete("Received GET") 
  else 
    ctx.complete("Received something else") 
} 
 
val route = 
  get { 
    complete("Received GET") 
  } ~ 
  complete("Received something else") 
   
val route = 
  get { ctx => 
    ctx.complete("Received GET") 
  } ~ 
  complete("Received something else")

Below are some examples of composing Directives:

val route: Route = 
  path("order" / IntNumber) { id => 
    get { 
      complete { 
        "Received GET request for order " + id 
      } 
    } ~ 
    put { 
      complete { 
        "Received PUT request for order " + id 
      } 
    } 
  } 
 
def innerRoute(id: Int): Route = 
  get { 
    complete { 
      "Received GET request for order " + id 
    } 
  } ~ 
  put { 
    complete { 
      "Received PUT request for order " + id 
    } 
  } 
val route: Route = path("order" / IntNumber) { id => innerRoute(id) } 
 
val route = 
  path("order" / IntNumber) { id => 
    (get | put) { ctx => 
      ctx.complete(s"Received ${ctx.request.method.name} request for order $id") 
    } 
  } 
 
val route = 
  path("order" / IntNumber) { id => 
    (get | put) { 
      extractMethod { m => 
        complete(s"Received ${m.name} request for order $id") 
      } 
    } 
  } 
 
val getOrPut = get | put 
val route = 
  path("order" / IntNumber) { id => 
    getOrPut { 
      extractMethod { m => 
        complete(s"Received ${m.name} request for order $id") 
      } 
    } 
  } 
 
val route = 
  (path("order" / IntNumber) & getOrPut & extractMethod) { (id, m) => 
    complete(s"Received ${m.name} request for order $id") 
  } 
 
val orderGetOrPutWithMethod = 
  path("order" / IntNumber) & (get | put) & extractMethod 
val route = 
  orderGetOrPutWithMethod { (id, m) => 
    complete(s"Received ${m.name} request for order $id") 
  }

The ~, & and | operators used in the examples above are defined as follows:

object RouteConcatenation extends RouteConcatenation { 
 
  class RouteWithConcatenation(route: Route) { 
    /** 
     * Returns a Route that chains two Routes. If the first Route rejects the request the second route is given a 
     * chance to act upon the request. 
     */ 
    def ~(other: Route): Route = { ctx ⇒ 
      import ctx.executionContext 
      route(ctx).fast.flatMap { 
        case x: RouteResult.Complete ⇒ FastFuture.successful(x) 
        case RouteResult.Rejected(outerRejections) ⇒ 
          other(ctx).fast.map { 
            case x: RouteResult.Complete               ⇒ x 
            case RouteResult.Rejected(innerRejections) ⇒ RouteResult.Rejected(outerRejections ++ innerRejections) 
          } 
      } 
    } 
  } 
}

// The next two methods are members of abstract class Directive[L]:
  /** 
   * Joins two directives into one which runs the second directive if the first one rejects. 
   */ 
  def |[R >: L](that: Directive[R]): Directive[R] = 
    recover(rejections ⇒ directives.BasicDirectives.mapRejections(rejections ++ _) & that)(that.ev) 
 
  /** 
   * Joins two directives into one which extracts the concatenation of its base directive extractions. 
   * NOTE: Extraction joining is an O(N) operation with N being the number of extractions on the right-side. 
   */ 
  def &(magnet: ConjunctionMagnet[L]): magnet.Out = magnet(this)
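To see how ~ accumulates rejections, here is a small standalone model of the same logic. It is a sketch under stated assumptions, not the library code: Result, Complete, Rejected, method and run are hypothetical names, a request is reduced to its method name, and a same-thread ExecutionContext stands in for the one taken from ctx.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ConcatModel {
  // Simplified stand-ins, not the Akka-http types: a request is just its method name.
  sealed trait Result
  final case class Complete(body: String) extends Result
  final case class Rejected(reasons: List[String]) extends Result
  type Route = String => Future[Result]

  // Runs callbacks on the calling thread so the sketch needs no thread pool.
  implicit val sameThread: ExecutionContext = new ExecutionContext {
    def execute(runnable: Runnable): Unit = runnable.run()
    def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
  }

  implicit class RouteOps(route: Route) {
    // Same shape as RouteConcatenation.~ : a completed result wins,
    // otherwise the other route runs and both rejection lists are merged.
    def ~(other: Route): Route = req =>
      route(req).flatMap {
        case done: Complete => Future.successful(done)
        case Rejected(outer) =>
          other(req).map {
            case done: Complete  => done
            case Rejected(inner) => Rejected(outer ++ inner)
          }
      }
  }

  // Hypothetical helper: completes when the method matches, otherwise rejects with a reason.
  def method(name: String)(body: String): Route = req =>
    Future.successful(
      if (req == name) Complete(body) else Rejected(List(s"expected $name")))

  val route: Route = method("GET")("Received GET") ~ method("PUT")("Received PUT")

  def run(req: String): Result = Await.result(route(req), 1.second)
}
```

Calling ConcatModel.run("DELETE") returns a Rejected value that carries the reasons from both branches. In the real library, that accumulated list is what allows a RejectionHandler to report every alternative that was tried.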

From these examples we can conclude that the composability of Directives is the core of the routing DSL. Let's see how it is implemented. The Directive class is defined as follows:

//#basic 
abstract class Directive[L](implicit val ev: Tuple[L]) { 
 
  /** 
   * Calls the inner route with a tuple of extracted values of type `L`. 
   * 
   * `tapply` is short for "tuple-apply". Usually, you will use the regular `apply` method instead, 
   * which is added by an implicit conversion (see `Directive.addDirectiveApply`). 
   */ 
  def tapply(f: L ⇒ Route): Route 
  ... 
}

object Directive {
  /** 
   * Constructs a directive from a function literal. 
   */ 
  def apply[T: Tuple](f: (T ⇒ Route) ⇒ Route): Directive[T] = 
    new Directive[T] { def tapply(inner: T ⇒ Route) = f(inner) } 
 
  /** 
   * A Directive that always passes the request on to its inner route (i.e. does nothing). 
   */ 
  val Empty: Directive0 = Directive(_(())) 
... 
  implicit class SingleValueModifiers[T](underlying: Directive1[T]) extends AnyRef { 
    def map[R](f: T ⇒ R)(implicit tupler: Tupler[R]): Directive[tupler.Out] = 
      underlying.tmap { case Tuple1(value) ⇒ f(value) } 
 
    def flatMap[R: Tuple](f: T ⇒ Directive[R]): Directive[R] = 
      underlying.tflatMap { case Tuple1(value) ⇒ f(value) } 
 
    def require(predicate: T ⇒ Boolean, rejections: Rejection*): Directive0 = 
      underlying.filter(predicate, rejections: _*).tflatMap(_ ⇒ Empty) 
 
    def filter(predicate: T ⇒ Boolean, rejections: Rejection*): Directive1[T] = 
      underlying.tfilter({ case Tuple1(value) ⇒ predicate(value) }, rejections: _*) 
  } 
}

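Note that implicit ev: Tuple[L] is evidence for the compiler: it requires an instance of Tuple[L] to be in implicit scope. Akka-http provides implicit Tuple instances for all 22 TupleN types. Also note implicit class SingleValueModifiers[T]: its map method resolves an implicit Tupler, which automatically flattens nested Directives and yields the following conversions: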

Directive1[T] = Directive[Tuple1[T]] 
Directive1[Tuple2[M,N]] = Directive[Tuple1[Tuple2[M,N]]] = Directive[Tuple2[M,N]] 
Directive1[Tuple3[M,N,G]] = ... = Directive[Tuple3[M,N,G]] 
Directive1[Tuple4[M1,M2,M3,M4]] = ... = Directive[Tuple4[M1,M2,M3,M4]] 
... 
Directive1[Unit] = Directive0

Directive1 and Directive0 are type aliases:

  type Directive0 = Directive[Unit] 
  type Directive1[T] = Directive[Tuple1[T]]

These kinds of Directive are used as follows:

 

  dirname { route }                  //Directive0 
  dirname[L] { L => route }          //Directive1[L] 
  dirname[T] { (T1,T2...) => route}  //Directive[T]
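The way tapply makes Directives composable can be sketched with a stripped-down model. This is an illustration under assumptions, not the Akka-http implementation: it drops the Tuple evidence, pairs extractions as plain Tuple2 values instead of concatenating tuples, and uses Option[String] for the result, with None standing for a rejection. The names Request, method and intAfter are hypothetical.

```scala
import scala.util.Try

object DirectiveModel {
  final case class Request(method: String, path: String)
  type Route = Request => Option[String] // None stands for a rejection

  // A directive builds a Route from an inner route that depends on an extracted value L.
  abstract class Directive[L] { self =>
    def tapply(inner: L => Route): Route

    // Conjunction: both directives must pass; their extractions are paired.
    def &[R](that: Directive[R]): Directive[(L, R)] =
      Directive[(L, R)](inner => self.tapply(l => that.tapply(r => inner((l, r)))))

    // Alternative: fall back to `that` when this directive rejects.
    def |(that: Directive[L]): Directive[L] =
      Directive[L](inner => req => self.tapply(inner)(req).orElse(that.tapply(inner)(req)))

    def map[R](f: L => R): Directive[R] =
      Directive[R](inner => self.tapply(l => inner(f(l))))
  }

  object Directive {
    // Same idea as Directive.apply in the excerpt: wrap a function literal.
    def apply[L](f: (L => Route) => Route): Directive[L] =
      new Directive[L] { def tapply(inner: L => Route): Route = f(inner) }
  }

  // Directive0 analogue: extracts nothing and filters on the method.
  def method(name: String): Directive[Unit] =
    Directive[Unit](inner => req => if (req.method == name) inner(())(req) else None)

  // Directive1 analogue: extracts the integer that follows a path prefix.
  def intAfter(prefix: String): Directive[Int] =
    Directive[Int] { inner => req =>
      if (req.path.startsWith(prefix))
        Try(req.path.stripPrefix(prefix).toInt).toOption.flatMap(id => inner(id)(req))
      else None
    }

  val get: Directive[Unit] = method("GET")
  val put: Directive[Unit] = method("PUT")

  // The model yields (Int, Unit) here, so the Unit is dropped by hand with map.
  val order: Directive[Int] = (intAfter("/order/") & (get | put)).map(_._1)

  val route: Route = order.tapply { id => req =>
    Some(s"Received ${req.method} request for order $id")
  }
}
```

The manual map(_._1) shows what the model lacks. The real library joins extractions through its Tuple and Tupler machinery, so a Directive0 adds nothing to the tuple passed to the inner route.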

 

The automatic conversion of a value of any type into a Tuple is implemented by the Tupler type class:

/** 
 * Provides a way to convert a value into an Tuple. 
 * If the value is already a Tuple then it is returned unchanged, otherwise it's wrapped in a Tuple1 instance. 
 */ 
trait Tupler[T] { 
  type Out 
  def OutIsTuple: Tuple[Out] 
  def apply(value: T): Out 
} 
 
object Tupler extends LowerPriorityTupler { 
  implicit def forTuple[T: Tuple]: Tupler[T] { type Out = T } = 
    new Tupler[T] { 
      type Out = T 
      def OutIsTuple = implicitly[Tuple[Out]] 
      def apply(value: T) = value 
    } 
} 
 
private[server] abstract class LowerPriorityTupler { 
  implicit def forAnyRef[T]: Tupler[T] { type Out = Tuple1[T] } = 
    new Tupler[T] { 
      type Out = Tuple1[T] 
      def OutIsTuple = implicitly[Tuple[Out]] 
      def apply(value: T) = Tuple1(value) 
    } 
}
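The implicit-priority trick used by Tupler can be tried in isolation. The sketch below is a reduced copy of the pattern, assuming a hypothetical IsTuple evidence class that covers only arities 1 and 2 in place of the library's Tuple, and a hypothetical helper named tupled.

```scala
object TuplerModel {
  // Evidence that T is a tuple type; only arities 1 and 2 are covered in this sketch.
  final class IsTuple[T]
  object IsTuple {
    implicit def tuple1[A]: IsTuple[Tuple1[A]] = new IsTuple[Tuple1[A]]
    implicit def tuple2[A, B]: IsTuple[(A, B)] = new IsTuple[(A, B)]
  }

  trait Tupler[T] {
    type Out
    def apply(value: T): Out
  }

  // Lower priority: any value can be wrapped in a Tuple1.
  abstract class LowPriorityTupler {
    implicit def forAnyValue[T]: Tupler[T] { type Out = Tuple1[T] } =
      new Tupler[T] {
        type Out = Tuple1[T]
        def apply(value: T): Tuple1[T] = Tuple1(value)
      }
  }

  // Higher priority: a value that is already a tuple is returned unchanged.
  object Tupler extends LowPriorityTupler {
    implicit def forTuple[T: IsTuple]: Tupler[T] { type Out = T } =
      new Tupler[T] {
        type Out = T
        def apply(value: T): T = value
      }
  }

  // Hypothetical helper: the result type depends on which instance was chosen.
  def tupled[T](value: T)(implicit tupler: Tupler[T]): tupler.Out = tupler(value)
}
```

Because forTuple lives in the companion object and forAnyValue in its parent class, the compiler prefers forTuple whenever IsTuple evidence exists and falls back to wrapping otherwise. So TuplerModel.tupled(42) is a Tuple1, while TuplerModel.tupled((1, "a")) stays a Tuple2.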

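As I understand it, the work Directives do inside a Route falls into two parts: first, selecting an operation, much like picking an item from a program menu; second, reading and writing the Request, Response and Entity. We leave the second part for later posts. Below are some Rest-Api examples of menu selection: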

trait UsersApi extends JsonMappings{ 
  val usersApi = 
    (path("users") & get ) { 
       complete (UsersDao.findAll.map(_.toJson)) 
    }~ 
    (path("users"/IntNumber) & get) { id => 
        complete (UsersDao.findById(id).map(_.toJson)) 
    }~ 
    (path("users") & post) { entity(as[User]) { user => 
        complete (UsersDao.create(user).map(_.toJson)) 
      } 
    }~ 
    (path("users"/IntNumber) & put) { id => entity(as[User]) { user => 
        complete (UsersDao.update(user, id).map(_.toJson)) 
      } 
    }~ 
    (path("users"/IntNumber) & delete) { userId => 
      complete (UsersDao.delete(userId).map(_.toJson)) 
    } 
} 
 
trait CommentsApi extends JsonMappings{ 
  val commentsApi = 
    (path("users"/IntNumber/"posts"/IntNumber/"comments") & get ) {(userId, postId) => 
       complete (CommentsDao.findAll(userId, postId).map(_.toJson)) 
    }~ 
      (path("users"/IntNumber/"posts"/IntNumber/"comments"/IntNumber) & get) { (userId, postId, commentId) => 
        complete (CommentsDao.findById(userId, postId, commentId).map(_.toJson)) 
    }~ 
      (path("comments") & post) { entity(as[Comment]) { comment => 
        complete (CommentsDao.create(comment).map(_.toJson)) 
      } 
    }~ 
      (path("users"/IntNumber/"posts"/IntNumber/"comments"/IntNumber) & put) { (userId, postId, commentId) => entity(as[Comment]) { comment => 
        complete (CommentsDao.update(comment, commentId).map(_.toJson)) 
      } 
    }~ 
      (path("comments"/IntNumber) & delete) { commentId => 
        complete (CommentsDao.delete(commentId).map(_.toJson)) 
    } 
} 
 
trait PostsApi extends JsonMappings{ 
  val postsApi = 
    (path("users"/IntNumber/"posts") & get){ userId => 
      complete (PostsDao.findUserPosts(userId).map(_.toJson)) 
    }~ 
    (path("users"/IntNumber/"posts"/IntNumber) & get) { (userId,postId) => 
      complete (PostsDao.findByUserIdAndId(userId, postId).map(_.toJson)) 
    }~ 
    (path("users"/IntNumber/"posts") & post) { userId => entity(as[Post]) { post => 
      complete (PostsDao.create(post).map(_.toJson)) 
    }}~ 
    (path("users"/IntNumber/"posts"/IntNumber) & put) { (userId, id) => entity(as[Post]) { post => 
      complete (PostsDao.update(post, id).map(_.toJson)) 
    }}~ 
    (path("users"/IntNumber/"posts"/IntNumber) & delete) { (userId, postId) => 
      complete (PostsDao.delete(postId).map(_.toJson)) 
    } 
} 
 
  val routes = 
    pathPrefix("v1") { 
      usersApi ~ 
      postsApi ~ 
      commentsApi 
    } ~ path("")(getFromResource("public/index.html"))

Original article by 奋斗. If you repost it, please credit the source: https://blog.ytso.com/12831.html
