  Akka是一种消息驱动运算模式,它实现跨JVM程序运算的方式是通过能跨JVM的消息系统来调动分布在不同JVM上ActorSystem中的Actor进行运算,前题是Akka的地址系统可以支持跨JVM定位。Akka的消息系统最高境界可以实现所谓的Actor位置透明化,这样在Akka编程中就无须关注Actor具体在哪个JVM上运行,分布式Actor编程从方式上跟普通Actor编程就不会有什么区别了。Akka的Remoting是一种点对点的跨JVM消息通道,让一个JVM上ActorSystem中的某个Actor可以连接另一个JVM上ActorSystem中的另一个Actor。两个JVM上的ActorSystem之间只需具备TCP网络连接功能就可以实现Akka Remoting了。Akka-Remoting还没有实现完全的位置透明化,因为用户还必须在代码里或者配置文件里指明目标Actor的具体地址。




Akka-Remoting的主要应用应该是把一些任务部署到远程机上去运算。发起方(Local JVM)在这里面的主要作用是任务分配,有点像Akka-Router。我们可以用下面的例子来示范:模拟一个计算器,可以进行连续的加减乘除,保留累计结果。我们会把这个计算器部署到远程机上,然后从本机与之沟通分配运算任务及获取运算结果。这个计算器就是个简单的Actor:

import akka.actor._ 
object Calculator { 
  sealed trait MathOps 
  case class Num(dnum: Double) extends MathOps 
  case class Add(dnum: Double) extends MathOps 
  case class Sub(dnum: Double) extends MathOps 
  case class Mul(dnum: Double) extends MathOps 
  case class Div(dnum: Double) extends MathOps 
  sealed trait CalcOps 
  case object Clear extends CalcOps 
  case object GetResult extends CalcOps 
class Calcultor extends Actor { 
  import Calculator._ 
  var result: Double = 0.0   //internal state 
  override def receive: Receive = { 
    case Num(d) => result = d 
    case Add(d) => result += d 
    case Sub(d) => result -= d 
    case Mul(d) => result *= d 
    case Div(d) => result = result / d 
    case Clear => result = 0.0 
    case GetResult => 
      sender() ! s"Result of calculation is: $result" 


下面我们会在一个远程机上部署这个Calculator Actor。 先看看这个示范的项目结构:remoteLookup/build.sbt

lazy val commonSettings = seq ( 
  name := "RemoteLookupDemo", 
  version := "1.0", 
  scalaVersion := "2.11.8", 
  libraryDependencies := Seq( 
    "com.typesafe.akka" %% "akka-actor" % "2.5.2", 
    "com.typesafe.akka" %% "akka-remote" % "2.5.2" 
lazy val local = (project in file(".")) 
       name := "localSystem" 
lazy val messages = (project in file("messages")) 
      name := "commands" 
lazy val remote = (project in file("remote")) 
      name := "remoteSystem" 


package remoteLookup.messages 
object Messages { 
  sealed trait MathOps 
  case class Num(dnum: Double) extends MathOps 
  case class Add(dnum: Double) extends MathOps 
  case class Sub(dnum: Double) extends MathOps 
  case class Mul(dnum: Double) extends MathOps 
  case class Div(dnum: Double) extends MathOps 
  sealed trait CalcOps 
  case object Clear extends CalcOps 
  case object GetResult extends CalcOps 

我们看到:这个文件是把上面的Calculator支持的消息拆了出来。这是因为Calculator Actor会在另一个JVM remote上部署,而我们会从local JVM里向Calculator发送操作消息,所以Messages必须是local和remote共享的。这个要求我们通过dependOn(messages)实现了。现在Calculator是在remote项目里定义的:remote/Calculator.scala

package remoteLookup.remote 
import akka.actor._ 
import remoteLookup.messages.Messages._ 
object CalcProps { 
  def props = Props(new Calcultor) 
class Calcultor extends Actor with ActorLogging { 
  var result: Double = 0.0   //internal state 
  override def receive: Receive = { 
    case Num(d) => result = d 
    case Add(d) => result += d 
    case Sub(d) => result -= d 
    case Mul(d) => result *= d 
    case Div(d) => 
      val _ = result.toInt / d.toInt   //yield ArithmeticException 
      result /= d 
    case Clear => result = 0.0 
    case GetResult => 
      sender() ! s"Result of calculation is: $result" 
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = { 
    log.info(s"Restarting calculator: ${reason.getMessage}") 
    super.preRestart(reason, message) 



package remoteLookup.remote 
import akka.actor._ 
import akka.pattern._ 
import remoteLookup.messages.Messages._ 
import scala.concurrent.duration._ 
class SupervisorActor extends Actor { 
  def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = { 
    case _: ArithmeticException => SupervisorStrategy.Resume 
  override def supervisorStrategy: SupervisorStrategy = 
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){ 
  val calcActor = context.actorOf(CalcProps.props,"calculator") 
  override def receive: Receive = { 
    case msg@ _ => calcActor.forward(msg) 
object CalculatorRunner extends App { 
  val remoteSystem = ActorSystem("remoteSystem") 
  val calcActor = remoteSystem.actorOf(Props[SupervisorActor],"supervisorActor") 
  import remoteSystem.dispatcher 
  calcActor ! Clear 
  calcActor ! Num(13.0) 
  calcActor ! Mul(1.5) 
  implicit val timeout = akka.util.Timeout(1 second) 
  ((calcActor ? GetResult).mapTo[String]) foreach println 
  calcActor ! Div(0.0) 
  calcActor ! Div(1.5) 
  calcActor ! Add(100.0) 
  ((calcActor ? GetResult).mapTo[String]) foreach println 


Result of calculation is: 19.5 
Result of calculation is: 113.0 
[WARN] [06/20/2017 19:28:10.720] [remoteSystem-akka.actor.default-dispatcher-4] [akka://remoteSystem/user/parentActor/calculator] / by zero



akka { 
  actor { 
    provider = remote  
  remote { 
    enabled-transports = ["akka.remote.netty.tcp"] 
    netty.tcp { 
      hostname = "" 
      port = 2552 
    log-sent-messages = on 
    log-received-messages = on 

上面这段的意思是:所有向外公开Actor的地址前缀为:akka.tcp:[email protected]:2552/user/???

那么Calculator的完整地址path应该就是:akka.tcp:[email protected]:2552/user/supervisorActor/calculator

Akka-Remoting提供了两种远程查找方式:actorSelection.resolveOne方法和Identify消息确认。无论如何,local都需要进行Remoting配置: local/src/main/resources/application.conf

akka { 
  actor { 
    provider = remote 
  remote { 
    enabled-transports = ["akka.remote.netty.tcp"] 
    netty.tcp { 
      hostname = "" 
      port = 0 


   * Resolve the [[ActorRef]] matching this selection. 
   * The result is returned as a Future that is completed with the [[ActorRef]] 
   * if such an actor exists. It is completed with failure [[ActorNotFound]] if 
   * no such actor exists or the identification didn't complete within the 
   * supplied `timeout`. 
   * Under the hood it talks to the actor to verify its existence and acquire its 
   * [[ActorRef]]. 
  def resolveOne()(implicit timeout: Timeout): Future[ActorRef] = { 
    implicit val ec = ExecutionContexts.sameThreadExecutionContext 
    val p = Promise[ActorRef]() 
    this.ask(Identify(None)) onComplete { 
      case Success(ActorIdentity(_, Some(ref))) ⇒ p.success(ref) 
      case _                                    ⇒ p.failure(ActorNotFound(this)) 


import akka.actor._ 
import akka.util.Timeout 
import scala.concurrent.duration._ 
import akka.pattern._ 
import remoteLookup.messages.Messages._ 
object LocalSelectionDemo extends App { 
  val localSystem = ActorSystem("localSystem") 
  import localSystem.dispatcher 
  val path = "akka.tcp:[email protected]:2552/user/supervisorActor/calculator" 
      implicit val timeout = Timeout(5 seconds) 
      for (calcActor : ActorRef <- localSystem.actorSelection(path).resolveOne()) { 
        calcActor ! Clear 
        calcActor ! Num(13.0) 
        calcActor ! Mul(1.5) 
        ((calcActor ? GetResult).mapTo[String]) foreach println 
        calcActor ! Div(0.0) 
        calcActor ! Div(1.5) 
        calcActor ! Add(100.0) 
        ((calcActor ? GetResult).mapTo[String]) foreach println 


package remoteLookup.remote 
import akka.actor._ 
import akka.pattern._ 
import remoteLookup.messages.Messages._ 
import scala.concurrent.duration._ 
class SupervisorActor extends Actor { 
  def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = { 
    case _: ArithmeticException => SupervisorStrategy.Resume 
  override def supervisorStrategy: SupervisorStrategy = 
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){ 
  val calcActor = context.actorOf(CalcProps.props,"calculator") 
  override def receive: Receive = { 
    case msg@ _ => calcActor.forward(msg) 
object CalculatorRunner extends App { 
  val remoteSystem = ActorSystem("remoteSystem") 
  val calcActor = remoteSystem.actorOf(Props[SupervisorActor],"supervisorActor") 
  import remoteSystem.dispatcher 
  calcActor ! Clear 
  calcActor ! Num(13.0) 
  calcActor ! Mul(1.5) 
  implicit val timeout = akka.util.Timeout(1 second) 
  ((calcActor ? GetResult).mapTo[String]) foreach println 
  calcActor ! Div(0.0) 
  calcActor ! Div(1.5) 
  calcActor ! Add(100.0) 
  ((calcActor ? GetResult).mapTo[String]) foreach println 



INFO] [06/20/2017 21:24:37.955] [main] [akka.remote.Remoting] Starting remoting 
[INFO] [06/20/2017 21:24:38.091] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:2552] 
[INFO] [06/20/2017 21:24:38.092] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:2552]


用sbt run 运行local:

Result of calculation is: 19.5 
Result of calculation is: 113.0



[INFO] [06/20/2017 21:24:37.955] [main] [akka.remote.Remoting] Starting remoting 
[INFO] [06/20/2017 21:24:38.091] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:2552] 
[INFO] [06/20/2017 21:24:38.092] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:2552] 
[WARN] [06/20/2017 21:27:06.330] [remoteSystem-akka.actor.default-dispatcher-4] [akka://remoteSystem/user/supervisorActor/calculator] / by zero 
[ERROR] [06/20/2017 21:27:34.176] [remoteSystem-akka.remote.default-remote-dispatcher-5] [akka.tcp://[email protected]:[email protected]%2FlocalSystem%40127.0.0.1%3A60601-0/endpointWriter] AssociationError [akka.tcp://[email protected]:2552] <- [akka.tcp://[email protected]:60601]: Error [Shut down address: akka.tcp://[email protected]:60601] [ 
akka.remote.ShutDownAssociation: Shut down address: akka.tcp://[email protected]:60601 
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down. 


 * A message all Actors will understand, that when processed will reply with 
 * [[akka.actor.ActorIdentity]] containing the `ActorRef`. The `messageId` 
 * is returned in the `ActorIdentity` message as `correlationId`. 
final case class Identify(messageId: Any) extends AutoReceivedMessage with NotInfluenceReceiveTimeout 
 * Reply to [[akka.actor.Identify]]. Contains 
 * `Some(ref)` with the `ActorRef` of the actor replying to the request or 
 * `None` if no actor matched the request. 
 * The `correlationId` is taken from the `messageId` in 
 * the `Identify` message. 
final case class ActorIdentity(correlationId: Any, ref: Option[ActorRef]) { 
  if (ref.isDefined && ref.get == null) { 
    throw new IllegalArgumentException("ActorIdentity created with ref = Some(null) is not allowed, " + 
      "this could happen when serializing with Scala 2.12 and deserializing with Scala 2.11 which is not supported.") 
   * Java API: `ActorRef` of the actor replying to the request or 
   * null if no actor matched the request. 
  @deprecated("Use getActorRef instead", "2.5.0") 
  def getRef: ActorRef = ref.orNull 
   * Java API: `ActorRef` of the actor replying to the request or 
   * not defined if no actor matched the request. 
  def getActorRef: Optional[ActorRef] = { 
    import scala.compat.java8.OptionConverters._ 

如果拿上面的例子,我们就会向远程机上的Calculator地址发送Identify(path),而Calculator返回ActorIdentity消息,参数包括correlationId = path, ref = Calculator的ActorRef。 下面是使用示范代码:

object LocalIdentifyDemo extends App { 
  class RemoteCalc extends Actor with ActorLogging { 
    val path = "akka.tcp:[email protected]:2552/user/supervisorActor/calculator" 
    context.actorSelection(path) ! Identify(path)  //send req for ActorRef 
    import context.dispatcher 
    implicit val timeout = Timeout(5 seconds) 
    override def receive: Receive = { 
      case ActorIdentity(p,someRef) if p.equals(path) =>  
        someRef foreach { calcActor => 
          calcActor ! Clear 
          calcActor ! Num(13.0) 
          calcActor ! Mul(1.5) 
          ((calcActor ? GetResult).mapTo[String]) foreach println 
          calcActor ! Div(0.0) 
          calcActor ! Div(1.5) 
          calcActor ! Add(100.0) 
          ((calcActor ? GetResult).mapTo[String]) foreach println 
  val localSystem = ActorSystem("localSystem") 
  val localActor = localSystem.actorOf(Props[RemoteCalc],"localActor") 

Identify消息确认机制是一种Actor沟通模式,所以我们需要构建一个RemoteCalc Actor,把程序包嵌在这个Actor里面。当receive收到确认消息ActorIdentity后获取ActorRef运算程序。




lazy val commonSettings = seq ( 
  name := "RemoteLookupDemo", 
  version := "1.0", 
  scalaVersion := "2.11.8", 
  libraryDependencies := Seq( 
    "com.typesafe.akka" %% "akka-actor" % "2.5.2", 
    "com.typesafe.akka" %% "akka-remote" % "2.5.2" 
lazy val local = (project in file(".")) 
       name := "remoteLookupDemo" 
lazy val messages = (project in file("messages")) 
      name := "commands" 
lazy val remote = (project in file("remote")) 
      name := "remoteSystem" 


package remoteLookup.messages 
object Messages { 
  sealed trait MathOps 
  case class Num(dnum: Double) extends MathOps 
  case class Add(dnum: Double) extends MathOps 
  case class Sub(dnum: Double) extends MathOps 
  case class Mul(dnum: Double) extends MathOps 
  case class Div(dnum: Double) extends MathOps 
  sealed trait CalcOps 
  case object Clear extends CalcOps 
  case object GetResult extends CalcOps 


akka { 
  actor { 
    provider = remote 
  remote { 
    enabled-transports = ["akka.remote.netty.tcp"] 
    netty.tcp { 
      hostname = "" 
      port = 2552 
    log-sent-messages = on 
    log-received-messages = on 


package remoteLookup.remote 
import akka.actor._ 
import remoteLookup.messages.Messages._ 
object CalcProps { 
  def props = Props(new Calcultor) 
class Calcultor extends Actor with ActorLogging { 
  var result: Double = 0.0   //internal state 
  override def receive: Receive = { 
    case Num(d) => result = d 
    case Add(d) => result += d 
    case Sub(d) => result -= d 
    case Mul(d) => result *= d 
    case Div(d) => 
      val _ = result.toInt / d.toInt   //yield ArithmeticException 
      result /= d 
    case Clear => result = 0.0 
    case GetResult => 
      sender() ! s"Result of calculation is: $result" 
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = { 
    log.info(s"Restarting calculator: ${reason.getMessage}") 
    super.preRestart(reason, message) 


package remoteLookup.remote 
import akka.actor._ 
import akka.pattern._ 
import remoteLookup.messages.Messages._ 
import scala.concurrent.duration._ 
class SupervisorActor extends Actor { 
  def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = { 
    case _: ArithmeticException => SupervisorStrategy.Resume 
  override def supervisorStrategy: SupervisorStrategy = 
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){ 
  val calcActor = context.actorOf(CalcProps.props,"calculator") 
  override def receive: Receive = { 
    case msg@ _ => calcActor.forward(msg) 
object CalculatorRunner extends App { 
  val remoteSystem = ActorSystem("remoteSystem") 
  val calcActor = remoteSystem.actorOf(Props[SupervisorActor],"supervisorActor") 
  import remoteSystem.dispatcher 
  calcActor ! Clear 
  calcActor ! Num(13.0) 
  calcActor ! Mul(1.5) 
  implicit val timeout = akka.util.Timeout(1 second) 
  ((calcActor ? GetResult).mapTo[String]) foreach println 
  calcActor ! Div(0.0) 
  calcActor ! Div(1.5) 
  calcActor ! Add(100.0) 
  ((calcActor ? GetResult).mapTo[String]) foreach println 


akka { 
  actor { 
    provider = remote 
  remote { 
    enabled-transports = ["akka.remote.netty.tcp"] 
    netty.tcp { 
      hostname = "" 
      port = 0 


import akka.actor._ 
import akka.util.Timeout 
import scala.concurrent.duration._ 
import akka.pattern._ 
import remoteLookup.messages.Messages._ 
object LocalSelectionDemo extends App { 
  val localSystem = ActorSystem("localSystem") 
  import localSystem.dispatcher 
  val path = "akka.tcp:[email protected]:2552/user/supervisorActor/calculator" 
      implicit val timeout = Timeout(5 seconds) 
      for (calcActor : ActorRef <- localSystem.actorSelection(path).resolveOne()) { 
        calcActor ! Clear 
        calcActor ! Num(13.0) 
        calcActor ! Mul(1.5) 
        ((calcActor ? GetResult).mapTo[String]) foreach println 
        calcActor ! Div(0.0) 
        calcActor ! Div(1.5) 
        calcActor ! Add(100.0) 
        ((calcActor ? GetResult).mapTo[String]) foreach println 
object LocalIdentifyDemo extends App { 
  class RemoteCalc extends Actor with ActorLogging { 
    val path = "akka.tcp:[email protected]:2552/user/supervisorActor/calculator" 
    context.actorSelection(path) ! Identify(path)  //semd req for ActorRef 
    import context.dispatcher 
    implicit val timeout = Timeout(5 seconds) 
    override def receive: Receive = { 
      case ActorIdentity(p,someRef) if p.equals(path) => 
        someRef foreach { calcActor => 
          calcActor ! Clear 
          calcActor ! Num(13.0) 
          calcActor ! Mul(1.5) 
          ((calcActor ? GetResult).mapTo[String]) foreach println 
          calcActor ! Div(0.0) 
          calcActor ! Div(1.5) 
          calcActor ! Add(100.0) 
          ((calcActor ? GetResult).mapTo[String]) foreach println 
  val localSystem = ActorSystem("localSystem") 
  val localActor = localSystem.actorOf(Props[RemoteCalc],"localActor") 






























