Akka(42): Http:身份验证 – authentication, authorization and use of raw headers

   当我们把Akka-http作为数据库数据交换工具时,数据是以Source[ROW,_]形式存放在Entity里的。很多时候,除数据之外我们还需要传递一些附加信息,例如对数据的具体处理方式等。这类附加的自定义消息可以放在Akka-http的raw-header里传递,服务端再用Akka-http提供的header筛选功能把它读取出来。在客户端,我们把附加消息放在HttpRequest的raw header里,如下:

  import akka.http.scaladsl.model.headers._
  // Client side: attach a custom "action" raw header to the upload request.
  // Its value ("insert:county") tells the server what the uploaded rows are for.
  val actionHeader = RawHeader("action", "insert:county")
  val request = HttpRequest(method = HttpMethods.POST, uri = "http://localhost:8011/rows")
    .addHeader(actionHeader)

在这里客户端注明上传数据应插入county表。服务端可以像下面这样获取这项信息:

             // Server side: read the optional custom "action" request header and
             // branch on whether the client supplied it.
             optionalHeaderValueByName("action") {
               case Some(action) =>
                 entity(asSourceOf[County]) { source =>
                   // Consume the uploaded rows, accumulating the county names.
                   val futofNames: Future[List[String]] =
                     source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))
                   // Reply only after the whole stream has been consumed: the original
                   // dropped this Future, so the response went out before the rows were
                   // read and any stream failure was silently ignored.
                   onSuccess(futofNames) { _ =>
                     complete(s"Received rows for $action")
                   }
                 }
               case None => complete ("No action specified!")
             }

Akka-http通过Credential类的Directive提供了authentication和authorization。在客户端可以用下面的方法提供自己的用户身份信息:

  import akka.http.scaladsl.model.headers._
  // POST request carrying both the custom "action" raw header and the
  // caller's HTTP Basic credentials.
  val credentials = BasicHttpCredentials("john", "p4ssw0rd")
  val request = HttpRequest(method = HttpMethods.POST, uri = "http://localhost:8011/rows")
    .addHeader(RawHeader("action", "insert:county"))
    .addCredentials(credentials)

服务端对客户端的身份验证处理方法如下:

  import akka.http.scaladsl.server.directives.Credentials

  /**
   * Asynchronous HTTP Basic authenticator.
   *
   * @param credentials credentials extracted from the request, if any
   * @return `Some(User(id))` when credentials were provided and the secret
   *         verifies against the demo password, `None` otherwise
   */
  def myUserPassAuthenticator(credentials: Credentials): Future[Option[User]] = {
    // The Future below runs on this dispatcher; it must be configured under
    // this name, otherwise the lookup fails.
    implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")
    credentials match {
      case p @ Credentials.Provided(id) =>
        Future {
          // Potentially slow in a real system (e.g. a credential store lookup),
          // which is presumably why it runs on the separate dispatcher above.
          if (p.verify("p4ssw0rd")) Some(User(id))
          else None
        }
      case _ => Future.successful(None)
    }
  }

  /** An authenticated user, identified by the id supplied in the credentials. */
  final case class User(name: String)

  /** Names of the users that pass the authorization check. */
  val validUsers = Set("john","peter","tiger","susan")

  /**
   * Authorization check: succeeds when the user's name is in `validUsers`.
   *
   * The result is already computed, so `Future.successful` needs no execution
   * context; the unused per-call dispatcher lookup has been removed.
   */
  def hasAdminPermissions(user: User): Future[Boolean] =
    Future.successful(validUsers.contains(user.name))

下面是Credential-Directive的使用方法:

         // Authenticate with HTTP Basic, then authorize the resulting user,
         // before the uploaded entity is touched. The authenticator and the
         // permission check are the ones defined in the snippet above.
         authenticateBasicAsync(realm = "secure site", myUserPassAuthenticator) { user =>
           authorizeAsync(_ => hasAdminPermissions(user)) {
             withoutSizeLimit {
               handleExceptions(postExceptionHandler) {
                 optionalHeaderValueByName("action") {
                   case Some(action) =>
                     entity(asSourceOf[County]) { source =>
                       // Consume the uploaded rows, accumulating the county names.
                       val futofNames: Future[List[String]] =
                         source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))
                       // Wait for the stream to finish before replying. A failed
                       // Future now surfaces inside handleExceptions instead of
                       // being dropped.
                       onSuccess(futofNames) { _ =>
                         complete(s"Received rows for $action sent from $user")
                       }
                     }
                   case None => complete(s"$user did not specify action for uploaded rows!")
                 }
               }
             }
           }
         }

下面是本次讨论的示范代码:

客户端:

import akka.actor._ 
import akka.stream._ 
import akka.stream.scaladsl._ 
import akka.http.scaladsl.Http 
import scala.util._ 
import akka._ 
import akka.http.scaladsl.common._ 
import spray.json.DefaultJsonProtocol 
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport 
import akka.http.scaladsl.common.EntityStreamingSupport 
import akka.http.scaladsl.model._ 
import spray.json._ 
 
/** Base trait combining spray-json (un)marshalling support with the default JSON formats. */
trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol

/** Row model and its JSON format, shared by the client code via `import Converters._`. */
object Converters extends MyFormats {
  /** One county row: numeric id plus display name. */
  final case class County(id: Int, name: String)

  // Implicit definitions get an explicit type so that implicit resolution does
  // not depend on inference order.
  implicit val countyFormat: RootJsonFormat[County] = jsonFormat2(County)
}
 
/**
 * Demo client: builds a stream of five `County` rows, serializes each row to
 * JSON bytes and POSTs the stream to `http://localhost:8011/rows`, sending a
 * custom "action" raw header and HTTP Basic credentials along with it.
 * The response body (or the failure) is printed to stdout.
 */
object HttpClientDemo extends App {
  import Converters._

  implicit val sys = ActorSystem("ClientSys")
  implicit val mat = ActorMaterializer()
  implicit val ec = sys.dispatcher

  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()

  import akka.util.ByteString
  import akka.http.scaladsl.model.HttpEntity.limitableByteSource

  // Sample rows to upload.
  val source: Source[County,NotUsed] = Source(1 to 5).map {i => County(i, s"广西壮族自治区地市县编号 #$i")}

  /** Serializes one row to its compact JSON text as bytes. */
  def countyToByteString(c: County) = {
    ByteString(c.toJson.toString)
  }
  val flowCountyToByteString : Flow[County,ByteString,NotUsed] = Flow.fromFunction(countyToByteString)

  // Byte stream that becomes the request entity.
  val rowBytes = limitableByteSource(source via flowCountyToByteString)

  import akka.http.scaladsl.model.headers._
  // The "action" header tells the server what the rows are for; the
  // credentials identify the caller.
  val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")
    .addHeader(RawHeader("action","insert:county"))
    .addCredentials(BasicHttpCredentials("john", "p4ssw0rd"))

  val data = HttpEntity(
    ContentTypes.`application/json`,
    rowBytes
  )

  /**
   * Sends `request` with `dataEntity` as its body and reports the outcome on stdout.
   *
   * @param request    request template (method, URI, headers)
   * @param dataEntity entity to upload
   * @return the future response, unchanged by the reporting side effects
   */
  def uploadRows(request: HttpRequest, dataEntity: RequestEntity) = {
    val futResp = Http(sys).singleRequest(
      request.copy(entity = dataEntity)
    )
    futResp
      .andThen {
        // The published listing had "[email protected](...)" here: an e-mail
        // obfuscation artifact that replaced the `r @ HttpResponse` binder.
        case Success(HttpResponse(StatusCodes.OK, _, entity, _)) =>
          entity.dataBytes.map(_.utf8String).runForeach(println)
        case Success(r @ HttpResponse(code, _, _, _)) =>
          println(s"Upload request failed, response code: $code")
          // The entity must be consumed or discarded even when it is not needed.
          r.discardEntityBytes()
        case Success(_) => println("Unable to Upload file!")
        case Failure(err) => println(s"Upload failed: ${err.getMessage}")

      }
  }


  uploadRows(request,data)

  // Keep the JVM alive until the user presses Enter, then shut down.
  scala.io.StdIn.readLine()

  sys.terminate()

}

服务端:

import akka.actor._ 
import akka.stream._ 
import akka.stream.scaladsl._ 
import akka.http.scaladsl.Http 
import akka._ 
import akka.http.scaladsl.common._ 
import spray.json.DefaultJsonProtocol 
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport 
import scala.concurrent._ 
import akka.http.scaladsl.server._ 
import akka.http.scaladsl.server.Directives._ 
import akka.http.scaladsl.model._ 
 
/** Base trait combining spray-json (un)marshalling support with the default JSON formats. */
trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol

/** Row model, sample data and JSON format used by the server via `import Converters._`. */
object Converters extends MyFormats {
  /** One county row: numeric id plus display name. */
  final case class County(id: Int, name: String)

  /** Five sample rows; this is what the server streams back on GET. */
  val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }

  // Implicit definitions get an explicit type so that implicit resolution does
  // not depend on inference order. Fully qualified because this listing does
  // not import spray.json._.
  implicit val countyFormat: spray.json.RootJsonFormat[County] = jsonFormat2(County)
}
 
/**
 * Demo server bound to localhost:8011 with a single `/rows` path.
 *
 *  - GET streams the sample rows from `Converters.source` as JSON.
 *  - POST requires HTTP Basic authentication followed by an authorization
 *    check, then consumes the uploaded stream of `County` rows. The optional
 *    custom "action" header is echoed back in the reply.
 */
object HttpServerDemo extends App {

  import Converters._

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher


  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
    .withParallelMarshalling(parallelism = 8, unordered = false)

  /** Turns a RuntimeException raised while handling an upload into a 500 reply. */
  def postExceptionHandler: ExceptionHandler =
    ExceptionHandler {
      case _: RuntimeException =>
        extractRequest { req =>
          req.discardEntityBytes()
          complete((StatusCodes.InternalServerError.intValue, "Upload Failed!"))
        }
    }

  import akka.http.scaladsl.server.directives.Credentials

  /**
   * Asynchronous HTTP Basic authenticator.
   *
   * @param credentials credentials extracted from the request, if any
   * @return `Some(User(id))` when credentials were provided and the secret
   *         verifies against the demo password, `None` otherwise
   */
  def userPassAuthenticator(credentials: Credentials): Future[Option[User]] = {
    // The Future below runs on this dispatcher; it must be configured under
    // this name, otherwise the lookup fails.
    implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")
    credentials match {
      case p @ Credentials.Provided(id) =>
        Future {
          // Potentially slow in a real system (e.g. a credential store lookup),
          // which is presumably why it runs on the separate dispatcher above.
          if (p.verify("p4ssw0rd")) Some(User(id))
          else None
        }
      case _ => Future.successful(None)
    }
  }

  /** An authenticated user, identified by the id supplied in the credentials. */
  case class User(name: String)

  /** Names of the users that pass the authorization check. */
  val validUsers = Set("john","peter","tiger","susan")

  /**
   * Authorization check: succeeds when the user's name is in `validUsers`.
   *
   * The result is already computed, so `Future.successful` needs no execution
   * context; the unused per-call dispatcher lookup has been removed.
   */
  def hasPermissions(user: User): Future[Boolean] =
    Future.successful(validUsers.contains(user.name))

  val route =
    path("rows") {
      get {
        complete {
          source
        }
      } ~
        post {
          authenticateBasicAsync(realm = "secure site", userPassAuthenticator) { user =>
            authorizeAsync(_ => hasPermissions(user)) {
              withoutSizeLimit {
                handleExceptions(postExceptionHandler) {
                  optionalHeaderValueByName("action") {
                    case Some(action) =>
                      // `source` here is the uploaded stream; it shadows Converters.source.
                      entity(asSourceOf[County]) { source =>
                        val futofNames: Future[List[String]] =
                          source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))
                        // Wait for the stream to finish before replying. The
                        // original dropped this Future, so the reply went out
                        // early and stream failures never reached
                        // postExceptionHandler.
                        onSuccess(futofNames) { _ =>
                          complete(s"Received rows for $action sent from $user")
                        }
                      }
                    case None => complete(s"$user did not specify action for uploaded rows!")
                  }
                }
              }
            }
          }
        }
    }

  val (port, host) = (8011,"localhost")

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  // Block until a line is read from stdin, then unbind and stop the actor system.
  scala.io.StdIn.readLine()

  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())

}

 

 

 

 

 

 

 

 

 

 

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/12819.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论