Akka (3): Actor Supervision - A Closer Look at BackoffSupervisor

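In the previous post we discussed supervision: in Akka it is a tree of direct parent-child relationships, in which a parent Actor handles the exceptions raised by its direct children. At that point we treated BackoffSupervisor as just one more form of parent-child supervision. In fact, BackoffSupervisor differs from an Actor that defines its own supervisorStrategy, and it is better thought of as a single, self-contained Actor. Its implementation is still a parent-child pair, of course, and the supervision strategy (SupervisorStrategy) is implemented inside BackoffSupervisor. From the outside, BackoffSupervisor looks like one Actor: the business logic is defined in the child, while the built-in parent does nothing beyond supervising the child and forwarding incoming messages to it, and there is nowhere for us to define any other behavior for it. So although we send messages to the BackoffSupervisor, we are really talking to its child. Consider the following example: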

package backoffSupervisorDemo 
import akka.actor._ 
import akka.pattern._ 
import backoffSupervisorDemo.InnerChild.TestMessage 
 
import scala.concurrent.duration._ 
 
object InnerChild { 
  case class TestMessage(msg: String) 
  class ChildException extends Exception 
 
  def props = Props[InnerChild] 
} 
class InnerChild extends Actor with ActorLogging { 
  import InnerChild._ 
  override def receive: Receive = { 
    case TestMessage(msg) => // simulate the child's work
      log.info(s"Child received message: ${msg}") 
  } 
} 
object Supervisor { 
  def props: Props = { // the supervision strategy and the child Actor's Props are defined here
    def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = { 
      case _: InnerChild.ChildException => SupervisorStrategy.Restart 
    } 
 
    val options = Backoff.onFailure(InnerChild.props, "innerChild", 1.second, 5.seconds, 0.0)
      .withManualReset
      .withSupervisorStrategy(
        OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5.seconds)(
          decider.orElse(SupervisorStrategy.defaultDecider)
        )
      )
    BackoffSupervisor.props(options) 
  } 
} 
// Note: this is the parent of Supervisor, not the parent of InnerChild
object ParentalActor { 
  case class SendToSupervisor(msg: InnerChild.TestMessage) 
  case class SendToInnerChild(msg: InnerChild.TestMessage) 
  case class SendToChildSelection(msg: InnerChild.TestMessage) 
  def props = Props[ParentalActor] 
} 
class ParentalActor extends Actor with ActorLogging { 
  import ParentalActor._ 
  // create the child Actor "supervisor" here
  val supervisor = context.actorOf(Supervisor.props,"supervisor")
  supervisor ! BackoffSupervisor.GetCurrentChild // ask the supervisor for its current child Actor
  var innerChild: Option[ActorRef] = None   // the current child ActorRef, filled in once the reply arrives
  val selectedChild = context.actorSelection("/user/parent/supervisor/innerChild") 
  override def receive: Receive = { 
    case BackoffSupervisor.CurrentChild(ref) =>   // reply carrying the current child Actor
      innerChild = ref 
    case SendToSupervisor(msg) => supervisor ! msg 
    case SendToChildSelection(msg) => selectedChild ! msg 
    case SendToInnerChild(msg) => innerChild foreach(child => child ! msg) 
  } 
 
} 
object BackoffSupervisorDemo extends App { 
  import ParentalActor._ 
  val testSystem = ActorSystem("testSystem") 
  val parent = testSystem.actorOf(ParentalActor.props,"parent") 
 
  Thread.sleep(1000)   //wait for BackoffSupervisor.CurrentChild(ref) received 
 
  parent ! SendToSupervisor(TestMessage("Hello message 1 to supervisor")) 
  parent ! SendToInnerChild(TestMessage("Hello message 2 to innerChild")) 
  parent ! SendToChildSelection(TestMessage("Hello message 3 to selectedChild")) 
 
 
  scala.io.StdIn.readLine() 
 
  testSystem.terminate() 
 
}

In the example above we sent messages to supervisor, innerChild, and selectedChild in turn, yet every message was handled by InnerChild, as the log shows:

[INFO] [05/29/2017 16:11:48.167] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/parent/supervisor/innerChild] Child received message: Hello message 1 to supervisor 
[INFO] [05/29/2017 16:11:48.177] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/parent/supervisor/innerChild] Child received message: Hello message 2 to innerChild 
[INFO] [05/29/2017 16:11:48.179] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/parent/supervisor/innerChild] Child received message: Hello message 3 to selectedChild

Above, we sent the supervisor a BackoffSupervisor.GetCurrentChild message to obtain its child Actor. BackoffSupervisor handles a handful of such special messages as follows:

private[akka] trait HandleBackoff { this: Actor ⇒ 
  def childProps: Props 
  def childName: String 
  def reset: BackoffReset 
 
  var child: Option[ActorRef] = None 
  var restartCount = 0 
 
  import BackoffSupervisor._ 
  import context.dispatcher 
 
  override def preStart(): Unit = startChild() 
 
  def startChild(): Unit = { 
    if (child.isEmpty) { 
      child = Some(context.watch(context.actorOf(childProps, childName))) 
    } 
  } 
 
  def handleBackoff: Receive = { 
    case StartChild ⇒ 
      startChild() 
      reset match { 
        case AutoReset(resetBackoff) ⇒ 
          val _ = context.system.scheduler.scheduleOnce(resetBackoff, self, ResetRestartCount(restartCount)) 
        case _ ⇒ // ignore 
      } 
 
    case Reset ⇒ 
      reset match { 
        case ManualReset ⇒ restartCount = 0 
        case msg         ⇒ unhandled(msg) 
      } 
 
    case ResetRestartCount(current) ⇒ 
      if (current == restartCount) { 
        restartCount = 0 
      } 
 
    case GetRestartCount ⇒ 
      sender() ! RestartCount(restartCount) 
 
    case GetCurrentChild ⇒ 
      sender() ! CurrentChild(child) 
 
    case msg if child.contains(sender()) ⇒ 
      // use the BackoffSupervisor as sender 
      context.parent ! msg 
 
    case msg ⇒ child match { 
      case Some(c) ⇒ c.forward(msg) 
      case None    ⇒ context.system.deadLetters.forward(msg) 
    } 
  } 
}

The handleBackoff function shows how each of these messages is handled.
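The restartCount kept by handleBackoff is what drives the backoff delay. As a rough sketch of the idea (not Akka's actual code; delayFor is a hypothetical helper), assume the delay doubles from minBackoff on each restart and is capped at maxBackoff, with the random factor set to 0.0 as in the example:

```scala
import scala.concurrent.duration._

object BackoffDelaySketch {
  // Hypothetical helper, not part of Akka's API: the delay before the next
  // restart, doubling from minBackoff and capped at maxBackoff.
  // Assumes randomFactor = 0.0, so no jitter is added.
  def delayFor(restartCount: Int,
               minBackoff: FiniteDuration,
               maxBackoff: FiniteDuration): FiniteDuration = {
    val doubled = minBackoff.toMillis.toDouble * math.pow(2, restartCount)
    val capped  = math.min(doubled, maxBackoff.toMillis.toDouble)
    capped.toLong.millis
  }
}
```

With the 1 second and 5 seconds passed in Supervisor.props, this yields 1 s, 2 s, 4 s, and then 5 s for every later restart. Because the example uses withManualReset, the count returns to zero only when the supervisor receives BackoffSupervisor.Reset.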

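When we built the Props for Supervisor in the example above, we defined a supervision strategy (SupervisorStrategy) that responds to the ChildException raised by InnerChild with Restart. Let's adjust InnerChild so that it throws exceptions at random: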

object InnerChild { 
  case class TestMessage(msg: String) 
  class ChildException(val errmsg: TestMessage) extends Exception 
  object CException {  //for pattern match of class with parameter 
    def apply(msg: TestMessage) = new ChildException(msg) 
    def unapply(cex: ChildException) = Some(cex.errmsg) 
  } 
  def props = Props[InnerChild] 
} 
class InnerChild extends Actor with ActorLogging { 
  import InnerChild._ 
  context.parent ! BackoffSupervisor.Reset  //reset backoff counts 
  override def receive: Receive = { 
    case TestMessage(msg) => // simulate the child's work
      if (Random.nextBoolean())   // throw an exception at random (needs import scala.util.Random)
        throw new ChildException(TestMessage(msg)) 
      else 
        log.info(s"Child received message: ${msg}") 
  } 
}

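We use Random.nextBoolean to throw exceptions at random. Note that we also turned ChildException into a class with a parameter, because we may want to recover the message that caused the exception before the restart, as follows: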

    def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = { 
      case InnerChild.CException(tmsg) => 
        println(s"Message causing exception: ${tmsg.msg}") //we can extract message here 
        SupervisorStrategy.Restart 
    }
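The pattern above works because CException is an extractor object: the match first checks that the Throwable is a ChildException and only then calls unapply. A minimal Akka-free sketch of the same mechanism, with a hypothetical describe function standing in for the decider:

```scala
object ExtractorSketch {
  final case class TestMessage(msg: String)
  class ChildException(val errmsg: TestMessage) extends Exception

  // Extractor object: lets a plain (non-case) class take part in pattern matching.
  object CException {
    def unapply(cex: ChildException): Option[TestMessage] = Some(cex.errmsg)
  }

  // Hypothetical stand-in for the decider: pulls the failed message out of the exception.
  def describe(t: Throwable): String = t match {
    case CException(TestMessage(text)) => s"failed on: $text"
    case other                         => s"other failure: ${other.getClass.getSimpleName}"
  }
}
```

Any Throwable that is not a ChildException simply falls through to the second case, which is why the decider in the article can be chained with orElse onto the default decider.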

Now all messages can simply be sent to the supervisor:

class ParentalActor extends Actor with ActorLogging { 
  import ParentalActor._ 
  // create the child Actor "supervisor" here
  val supervisor = context.actorOf(Supervisor.props,"supervisor") 
  override def receive: Receive = { 
    case msg@ _ => supervisor ! msg 
  } 
 
} 
object BackoffSupervisorDemo extends App { 
  import ParentalActor._ 
  import InnerChild._ 
  val testSystem = ActorSystem("testSystem") 
  val parent = testSystem.actorOf(ParentalActor.props,"parent") 
   
  parent ! TestMessage("Hello message 1 to supervisor") 
  parent ! TestMessage("Hello message 2 to supervisor") 
  parent ! TestMessage("Hello message 3 to supervisor") 
  parent ! TestMessage("Hello message 4 to supervisor") 
  parent ! TestMessage("Hello message 5 to supervisor") 
  parent ! TestMessage("Hello message 6 to supervisor") 
 
 
  scala.io.StdIn.readLine() 
 
  testSystem.terminate() 
 
}

Running this, we find that once an exception occurs, the messages that follow all end up as dead letters:

[INFO] [05/29/2017 18:22:11.689] [testSystem-akka.actor.default-dispatcher-5] [akka://testSystem/user/parent/supervisor/innerChild] Message [backoffSupervisorDemo.InnerChild$TestMessage] from Actor[akka://testSystem/user/parent#2140150413] to Actor[akka://testSystem/user/parent/supervisor/innerChild#-1047097634] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
....

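This also shows that BackoffSupervisor handles Restart differently: instead of the usual suspend-and-restart, it stops InnerChild, discarding its ActorRef and mailbox, and starts a new instance only after the backoff delay. Every message sent to InnerChild before the new instance is up is therefore routed to the dead-letter queue. In other words, we lose not only the message that caused the exception but also every message that arrives during the restart window.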
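The fallback branch of handleBackoff shown earlier makes this easy to picture: while child is None, incoming messages go to deadLetters. The toy model below (plain Scala, no Akka; ToySupervisor and its members are hypothetical names) mimics only that forwarding rule, not the real restart machinery:

```scala
import scala.collection.mutable.ArrayBuffer

object ForwardingModel {
  // Toy stand-in for the supervisor; `child` is None between stop and restart.
  final class ToySupervisor {
    var child: Option[String => Unit] = None
    val deadLetters: ArrayBuffer[String] = ArrayBuffer.empty[String]

    def receive(msg: String): Unit = child match {
      case Some(deliver) => deliver(msg)            // child is up: forward the message
      case None          => deadLetters += msg; ()  // child is down: the message is lost
    }
  }

  // Runs one stop/restart cycle and returns (delivered, deadLetters).
  def demo(): (List[String], List[String]) = {
    val delivered = ArrayBuffer.empty[String]
    val deliver: String => Unit = m => { delivered += m; () }
    val sup = new ToySupervisor
    sup.child = Some(deliver)
    sup.receive("msg 1")
    sup.child = None           // child stopped, backoff delay running
    sup.receive("msg 2")
    sup.child = Some(deliver)  // new child instance started
    sup.receive("msg 3")
    (delivered.toList, sup.deadLetters.toList)
  }
}
```

Only the message that arrives while the child is absent is lost in this model. In the real system, messages still queued in the stopped child's mailbox are lost as well.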

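Let's now deal with the lost messages. First, how do we resend the message that caused the exception? We can do so in the supervision strategy, just before restarting: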

    def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = { 
      case InnerChild.CException(tmsg) => 
        println(s"Message causing exception: ${tmsg.msg}") //we can extract message here 
        BackoffSupervisorDemo.sendToParent(tmsg)  //resend message  
        SupervisorStrategy.Restart 
    }

The sendToParent function is declared in BackoffSupervisorDemo:

  def sendToParent(msg: TestMessage) = parent ! msg

Next we need to fish the dead letters back out. We can subscribe to DeadLetter messages on Akka's eventStream:

object DeadLetterMonitor { 
  def props(parentRef: ActorRef) = Props(new DeadLetterMonitor(parentRef)) 
} 
class DeadLetterMonitor(receiver: ActorRef) extends Actor with ActorLogging { 
  import InnerChild._ 
  import context.dispatcher 
  override def receive: Receive = { 
    case DeadLetter(msg: TestMessage, _, _) => // resend only TestMessage dead letters; other types are left unhandled
      // wait until InnerChild has finished restarting, then resend
      context.system.scheduler.scheduleOnce(1.second, receiver, msg)
  } 
} 
object BackoffSupervisorDemo extends App { 
  import ParentalActor._ 
  import InnerChild._ 
 
  def sendToParent(msg: TestMessage) = parent ! msg 
 
  val testSystem = ActorSystem("testSystem") 
  val parent = testSystem.actorOf(ParentalActor.props,"parent") 
 
  val deadLetterMonitor = testSystem.actorOf(DeadLetterMonitor.props(parent),"dlmonitor") 
  testSystem.eventStream.subscribe(deadLetterMonitor,classOf[DeadLetter]) //listen to DeadLetter 
 
  parent ! TestMessage("Hello message 1 to supervisor") 
  parent ! TestMessage("Hello message 2 to supervisor") 
  parent ! TestMessage("Hello message 3 to supervisor") 
  parent ! TestMessage("Hello message 4 to supervisor") 
  parent ! TestMessage("Hello message 5 to supervisor") 
  parent ! TestMessage("Hello message 6 to supervisor") 
 
 
  scala.io.StdIn.readLine() 
 
  testSystem.terminate() 
 
}

A test run shows that InnerChild successfully processed all six messages.

Here is the complete sample code for this discussion:

package backoffSupervisorDemo
import akka.actor._
import akka.pattern._
import scala.util.Random
import scala.concurrent.duration._

object InnerChild {
  case class TestMessage(msg: String)
  class ChildException(val errmsg: TestMessage) extends Exception
  object CException {  // extractor for pattern matching on a class with a parameter
    def apply(msg: TestMessage) = new ChildException(msg)
    def unapply(cex: ChildException) = Some(cex.errmsg)
  }
  def props = Props[InnerChild]
}
class InnerChild extends Actor with ActorLogging {
  import InnerChild._
  context.parent ! BackoffSupervisor.Reset  // reset backoff counts
  override def receive: Receive = {
    case TestMessage(msg) => // simulate the child's work
      if (Random.nextBoolean())   // throw an exception at random
        throw new ChildException(TestMessage(msg))
      else
        log.info(s"Child received message: ${msg}")
  }
}
object Supervisor {
  def props: Props = { // the supervision strategy and the child Actor's Props are defined here
    def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
      case InnerChild.CException(tmsg) =>
        println(s"Message causing exception: ${tmsg.msg}") // we can extract the message here
        BackoffSupervisorDemo.sendToParent(tmsg)  // resend message
        SupervisorStrategy.Restart
    }
    val options = Backoff.onFailure(InnerChild.props, "innerChild", 1.second, 5.seconds, 0.0)
      .withManualReset
      .withSupervisorStrategy(
        OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5.seconds)(
          decider.orElse(SupervisorStrategy.defaultDecider)
        )
      )
    BackoffSupervisor.props(options)
  }
}
// Note: this is the parent of Supervisor, not the parent of InnerChild
object ParentalActor {
  case class SendToSupervisor(msg: InnerChild.TestMessage)
  case class SendToInnerChild(msg: InnerChild.TestMessage)
  case class SendToChildSelection(msg: InnerChild.TestMessage)
  def props = Props[ParentalActor]
}
class ParentalActor extends Actor with ActorLogging {
  import ParentalActor._
  // create the child Actor "supervisor" here
  val supervisor = context.actorOf(Supervisor.props,"supervisor")
  override def receive: Receive = {
    case msg@ _ => supervisor ! msg
  }
}
object DeadLetterMonitor {
  def props(parentRef: ActorRef) = Props(new DeadLetterMonitor(parentRef))
}
class DeadLetterMonitor(receiver: ActorRef) extends Actor with ActorLogging {
  import InnerChild._
  import context.dispatcher
  override def receive: Receive = {
    case DeadLetter(msg: TestMessage, _, _) => // resend only TestMessage dead letters
      // wait until InnerChild has finished restarting, then resend
      context.system.scheduler.scheduleOnce(1.second, receiver, msg)
  }
}
object BackoffSupervisorDemo extends App {
  import ParentalActor._
  import InnerChild._

  def sendToParent(msg: TestMessage) = parent ! msg

  val testSystem = ActorSystem("testSystem")
  val parent = testSystem.actorOf(ParentalActor.props,"parent")

  val deadLetterMonitor = testSystem.actorOf(DeadLetterMonitor.props(parent),"dlmonitor")
  testSystem.eventStream.subscribe(deadLetterMonitor,classOf[DeadLetter]) // listen to DeadLetter

  parent ! TestMessage("Hello message 1 to supervisor")
  parent ! TestMessage("Hello message 2 to supervisor")
  parent ! TestMessage("Hello message 3 to supervisor")
  parent ! TestMessage("Hello message 4 to supervisor")
  parent ! TestMessage("Hello message 5 to supervisor")
  parent ! TestMessage("Hello message 6 to supervisor")

  scala.io.StdIn.readLine()

  testSystem.terminate()
}

Original article by ItWorker. If you repost it, please credit the source: https://blog.ytso.com/12858.html
