Akka (9): Distributed Computing: Remoting, Remote Creation

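The previous post discussed Akka-Remoting. Akka-Remoting is a point-to-point communication mechanism that lets two Actors living in ActorSystems on different JVMs talk to each other. It does not yet achieve full Actor location transparency, because an Actor must obtain the exact address of its counterpart before it can start communicating with it. Akka-Remoting supports two modes of communication: "remote lookup" and "remote creation". For reasons of space the previous post covered only remote lookup; this post covers remote creation.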

As before, we start by looking at the project structure:

lazy val local = (project in file(".")) 
  .settings(commonSettings) 
  .settings( 
    name := "remoteCreateDemo" 
  ).aggregate(calculator,remote).dependsOn(calculator) 
 
lazy val calculator = (project in file("calculator")) 
  .settings(commonSettings) 
  .settings( 
    name := "calculator" 
  ) 
 
lazy val remote = (project in file("remote")) 
  .settings(commonSettings) 
  .settings( 
    name := "remoteSystem" 
  ).aggregate(calculator).dependsOn(calculator)

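Remote creation works roughly like this: local tells remote to create an Actor; remote looks up the Actor's class definition in its own classpath and loads it into memory. Because the remote Actor is driven and used from local, the local and remote projects must both share Calculator, including its messages. In the .sbt file each of the two projects therefore declares dependsOn(calculator), which puts the shared classes on its classpath, and aggregate(calculator), which compiles calculator together with it.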

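We also put Calculator's supervisor into the same source file, so calculator is now a complete project containing the supervisor, the calculation logic and the messages. The Calculator source is as follows: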

package remoteCreation.calculator 
 
import akka.actor._ 
import scala.concurrent.duration._ 
 
object Calcultor { 
  sealed trait MathOps 
  case class Num(dnum: Double) extends MathOps 
  case class Add(dnum: Double) extends MathOps 
  case class Sub(dnum: Double) extends MathOps 
  case class Mul(dnum: Double) extends MathOps 
  case class Div(dnum: Double) extends MathOps 
 
  sealed trait CalcOps 
  case object Clear extends CalcOps 
  case object GetResult extends CalcOps 
 
  def props = Props(new Calcultor) 
  def supervisorProps = Props(new SupervisorActor) 
} 
 
class Calcultor extends Actor with ActorLogging { 
  import Calcultor._ 
 
  var result: Double = 0.0   //internal state 
 
  override def receive: Receive = { 
    case Num(d) => result = d 
    case Add(d) => result += d 
    case Sub(d) => result -= d 
    case Mul(d) => result *= d 
    case Div(d) => 
      val _ = result.toInt / d.toInt   //yield ArithmeticException 
      result /= d 
    case Clear => result = 0.0 
    case GetResult => 
      sender() ! s"Result of calculation is: $result" 
  } 
 
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = { 
    log.info(s"Restarting calculator: ${reason.getMessage}") 
    super.preRestart(reason, message) 
  } 
} 
 
class SupervisorActor extends Actor { 
  def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = { 
    case _: ArithmeticException => SupervisorStrategy.Resume 
  } 
 
  override def supervisorStrategy: SupervisorStrategy = 
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){ 
      decider.orElse(SupervisorStrategy.defaultDecider) 
    } 
 
  val calcActor = context.actorOf(Calcultor.props,"calculator") 
 
  override def receive: Receive = { 
    case msg@ _ => calcActor.forward(msg) 
  } 
 
}
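
The Div case above runs an integer division before the real one. The reason is that dividing a Double by 0.0 does not throw on the JVM, so without that line the supervisor would never see a failure. The following is a minimal standalone sketch of the difference; the object and method names (DivideByZeroDemo, doubleDiv, intDiv) are made up for illustration and are not part of the project.

```scala
import scala.util.Try

object DivideByZeroDemo {
  // Floating-point division never throws; dividing by 0.0 yields Infinity.
  def doubleDiv(a: Double, b: Double): Double = a / b

  // Integer division throws ArithmeticException when the divisor is 0.
  // This mirrors the `result.toInt / d.toInt` line in the Div case.
  def intDiv(a: Double, b: Double): Try[Int] = Try(a.toInt / b.toInt)

  def main(args: Array[String]): Unit = {
    println(doubleDiv(19.5, 0.0))   // Infinity
    println(intDiv(19.5, 0.0))      // Failure(java.lang.ArithmeticException: / by zero)
    println(intDiv(19.5, 1.5))      // Success(19)
  }
}
```

One side effect of truncating with toInt is that any divisor between -1 and 1, such as 0.5, also becomes 0 and triggers the exception. That is acceptable for a demonstration whose only purpose is to provoke a failure.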

As in the "remote lookup" example from the previous post, remote has to open a port for Remoting. We can reuse the contents of the same .conf file: remote/src/main/resources/application.conf

akka { 
  actor { 
    provider = remote 
  } 
  remote { 
    enabled-transports = ["akka.remote.netty.tcp"] 
    netty.tcp { 
      hostname = "127.0.0.1" 
      port = 2552 
    } 
    log-sent-messages = on 
    log-received-messages = on 
  } 
}

Because the Actor is created and used from local, all remote has to do is start an ActorSystem:

import com.typesafe.config.ConfigFactory 
import akka.actor._ 
 
object CalculatorRunner extends App { 
  val remoteSystem = ActorSystem("remoteSystem",ConfigFactory.load("application")) 
  println("Remote system started.") 
 
  scala.io.StdIn.readLine() 
  remoteSystem.terminate() 
 
}

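The creation of Calculator is initiated from localSystem, so the local configuration file has to describe the remote deployment target (which is why location transparency is still not achieved): local/src/main/resources/application.conf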

akka { 
  actor { 
    provider = remote, 
    deployment { 
      "/calculator" { 
        remote = "akka.tcp://remoteSystem@127.0.0.1:2552"
      } 
    } 
  } 
  remote { 
    netty.tcp { 
      hostname = "127.0.0.1", 
      port=2554 
    } 
  } 
}

Note: the /calculator entry above actually refers to the SupervisorActor.

Now we can create calculator from local and use it for calculations:

import akka.actor._ 
import remoteCreation.calculator.Calcultor._ 
import scala.concurrent.duration._ 
import akka.pattern._ 
 
object RemotingCreate extends App { 
  val localSystem = ActorSystem("localSystem") 
  val calcActor = localSystem.actorOf(supervisorProps,
    name = "calculator")   //creates the SupervisorActor
 
  import localSystem.dispatcher 
 
  calcActor ! Clear 
  calcActor ! Num(13.0) 
  calcActor ! Mul(1.5) 
 
  implicit val timeout = akka.util.Timeout(1 second) 
 
  ((calcActor ? GetResult).mapTo[String]) foreach println 
  scala.io.StdIn.readLine() 
 
  calcActor ! Div(0.0) 
  calcActor ! Div(1.5) 
  calcActor ! Add(100.0) 
  ((calcActor ? GetResult).mapTo[String]) foreach println 
 
  scala.io.StdIn.readLine() 
  localSystem.terminate() 
 
}

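In code, creating calculator (the SupervisorActor) looks no different from creating an ordinary Actor; all the details live in the configuration file. Note, however, that the name passed to actorOf must match the path configured in the deployment section.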
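
For comparison, Akka also allows the deployment target to be attached to the Props in code instead of in the configuration file. The sketch below is not part of the original demo: the object name ProgrammaticRemoteCreate and the actor name "calculatorByCode" are hypothetical, and it assumes the file is compiled inside the local project, so that Calcultor is on the classpath and the local application.conf still sets provider = remote.

```scala
import akka.actor.{ActorSystem, Address, AddressFromURIString, Deploy, Props}
import akka.remote.RemoteScope
import remoteCreation.calculator.Calcultor

object ProgrammaticRemoteCreate {
  // Address of the remote ActorSystem, matching remote/application.conf.
  val remoteAddress: Address =
    AddressFromURIString("akka.tcp://remoteSystem@127.0.0.1:2552")

  // Same Props as before, with the remote target attached in code.
  val remoteProps: Props =
    Calcultor.supervisorProps.withDeploy(Deploy(scope = RemoteScope(remoteAddress)))

  def main(args: Array[String]): Unit = {
    val localSystem = ActorSystem("localSystem")

    // Hypothetical name with no entry in the deployment config,
    // so only the programmatic Deploy applies to it.
    val calcActor = localSystem.actorOf(remoteProps, "calculatorByCode")
    println(calcActor.path)

    scala.io.StdIn.readLine()
    localSystem.terminate()
  }
}
```

Either way the caller still has to know the remote address, so this variant does not bring location transparency any closer; it only moves the address from the configuration file into the code.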

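The test run gives the same results as the previous example. Note that what is actually created remotely is a SupervisorActor; Calculator is created as part of constructing the SupervisorActor. The calculation results show that this SupervisorActor does its job as well.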
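
To see what "doing its job" means for the numbers, the message sequence sent by local can be traced without Akka. The sketch below is a simplified model, not the actors themselves: it assumes that Resume keeps the calculator's state and drops the failed message, while Restart replaces the actor with a fresh instance whose result starts again at 0.0. All names in it (SupervisionTrace, step, run) are made up for illustration.

```scala
object SupervisionTrace {
  sealed trait Op
  case class Num(d: Double) extends Op
  case class Add(d: Double) extends Op
  case class Mul(d: Double) extends Op
  case class Div(d: Double) extends Op
  case object Clear extends Op

  // How the modelled supervisor reacts to an ArithmeticException.
  sealed trait Directive
  case object Resume extends Directive   // keep state, skip the failed message
  case object Restart extends Directive  // fresh instance, state back to 0.0

  // Same arithmetic as Calcultor.receive, including the integer-division guard.
  def step(result: Double, op: Op): Double = op match {
    case Num(d) => d
    case Add(d) => result + d
    case Mul(d) => result * d
    case Div(d) =>
      if (d.toInt == 0) throw new ArithmeticException("/ by zero")
      result / d
    case Clear => 0.0
  }

  // Fold the messages over the state, applying the directive on failure.
  def run(ops: Seq[Op], onFailure: Directive): Double =
    ops.foldLeft(0.0) { (result, op) =>
      try step(result, op)
      catch {
        case _: ArithmeticException =>
          onFailure match {
            case Resume  => result
            case Restart => 0.0
          }
      }
    }

  // The messages sent by RemotingCreate, in order.
  val ops: Seq[Op] = Seq(Clear, Num(13.0), Mul(1.5), Div(0.0), Div(1.5), Add(100.0))

  def main(args: Array[String]): Unit = {
    println(run(ops, Resume))   // 113.0
    println(run(ops, Restart))  // 100.0
  }
}
```

Under this model, a final result of 113.0 indicates that the failure was handled with Resume, the directive SupervisorActor declares for ArithmeticException. A result of 100.0 would mean the calculator had been restarted instead.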

Here is the source code for this demonstration:

 local/build.sbt

lazy val commonSettings = Seq(
  name := "RemoteCreateDemo", 
  version := "1.0", 
  scalaVersion := "2.11.8", 
  libraryDependencies := Seq( 
    "com.typesafe.akka" %% "akka-actor" % "2.5.2", 
    "com.typesafe.akka" %% "akka-remote" % "2.5.2" 
  ) 
) 
 
 
lazy val local = (project in file(".")) 
  .settings(commonSettings) 
  .settings( 
    name := "remoteCreateDemo" 
  ).aggregate(calculator).dependsOn(calculator) 
 
lazy val calculator = (project in file("calculator")) 
  .settings(commonSettings) 
  .settings( 
    name := "calculator" 
  ) 
 
lazy val remote = (project in file("remote")) 
  .settings(commonSettings) 
  .settings( 
    name := "remoteSystem" 
  ).aggregate(calculator).dependsOn(calculator)

calculator/calculator.scala

package remoteCreation.calculator 
 
import akka.actor._ 
import scala.concurrent.duration._ 
 
object Calcultor { 
  sealed trait MathOps 
  case class Num(dnum: Double) extends MathOps 
  case class Add(dnum: Double) extends MathOps 
  case class Sub(dnum: Double) extends MathOps 
  case class Mul(dnum: Double) extends MathOps 
  case class Div(dnum: Double) extends MathOps 
 
  sealed trait CalcOps 
  case object Clear extends CalcOps 
  case object GetResult extends CalcOps 
 
  def props = Props(new Calcultor) 
  def supervisorProps = Props(new SupervisorActor) 
} 
 
class Calcultor extends Actor with ActorLogging { 
  import Calcultor._ 
 
  var result: Double = 0.0   //internal state 
 
  override def receive: Receive = { 
    case Num(d) => result = d 
    case Add(d) => result += d 
    case Sub(d) => result -= d 
    case Mul(d) => result *= d 
    case Div(d) => 
      val _ = result.toInt / d.toInt   //yield ArithmeticException 
      result /= d 
    case Clear => result = 0.0 
    case GetResult => 
      sender() ! s"Result of calculation is: $result" 
  } 
 
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = { 
    log.info(s"Restarting calculator: ${reason.getMessage}") 
    super.preRestart(reason, message) 
  } 
} 
 
class SupervisorActor extends Actor { 
  def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = { 
    case _: ArithmeticException => SupervisorStrategy.Resume 
  } 
 
  override def supervisorStrategy: SupervisorStrategy = 
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){ 
      decider.orElse(SupervisorStrategy.defaultDecider) 
    } 
 
  val calcActor = context.actorOf(Calcultor.props,"calculator") 
 
  override def receive: Receive = { 
    case msg@ _ => calcActor.forward(msg) 
  } 
 
}

remote/src/main/resources/application.conf

akka { 
  actor { 
    provider = remote 
  } 
  remote { 
    enabled-transports = ["akka.remote.netty.tcp"] 
    netty.tcp { 
      hostname = "127.0.0.1" 
      port = 2552 
    } 
    log-sent-messages = on 
    log-received-messages = on 
  } 
}

remote/CalculatorRunner.scala

package remoteCreation.remote 
import com.typesafe.config.ConfigFactory 
import akka.actor._ 
 
object CalculatorRunner extends App { 
  val remoteSystem = ActorSystem("remoteSystem",ConfigFactory.load("application")) 
  println("Remote system started.") 
 
  scala.io.StdIn.readLine() 
  remoteSystem.terminate() 
 
}

local/src/main/resources/application.conf

akka { 
  actor { 
    provider = remote, 
    deployment { 
      "/calculator" { 
        remote = "akka.tcp://remoteSystem@127.0.0.1:2552"
      } 
    } 
  } 
  remote { 
    netty.tcp { 
      hostname = "127.0.0.1", 
      port=2554 
    } 
  } 
}

local/RemotingCreation.scala

import akka.actor._ 
import remoteCreation.calculator.Calcultor._ 
import scala.concurrent.duration._ 
import akka.pattern._ 
 
object RemotingCreate extends App { 
  val localSystem = ActorSystem("localSystem") 
  val calcActor = localSystem.actorOf(supervisorProps,
    name = "calculator")  //creates the SupervisorActor
 
  import localSystem.dispatcher 
 
  calcActor ! Clear 
  calcActor ! Num(13.0) 
  calcActor ! Mul(1.5) 
 
  implicit val timeout = akka.util.Timeout(1 second) 
 
  ((calcActor ? GetResult).mapTo[String]) foreach println 
  scala.io.StdIn.readLine() 
 
  calcActor ! Div(0.0) 
  calcActor ! Div(1.5) 
  calcActor ! Add(100.0) 
  ((calcActor ? GetResult).mapTo[String]) foreach println 
 
  scala.io.StdIn.readLine() 
  localSystem.terminate() 
 
}

Original article by ItWorker. If you repost it, please credit the source: https://blog.ytso.com/12852.html
