声明:本文是《Java虚拟机并发编程》的第五章,感谢华章出版社授权并发编程网站发布此文,禁止以任何形式转载此文。
使用了类型化角色的EnergySource使我们能够以调用函数的形式来掩盖后台顺序处理异步消息的过程,在实现了线程安全的同时又可以免去显式同步的困扰。虽然创建类型化角色并不困难,但此时我们的EnergySource却还是一个丢失了关键特性的半成品——即还没有可以周期性自动补充电量的能力。
在上一章我们所实现的版本中,由于整个动作都是在后台完成,所以电量补充的动作是不需要任何用户介入的。只要我们启动了电源,就会有一个专门的timer负责每秒钟为电源增加一格电量。
然而在使用了类型化角色的版本中,实现这一特性还需要多费一番功夫才行——我们需要确保自动恢复动作不会破坏类型化角色的单线程特性。在真正进入编码阶段之前,让我们先考虑一下有哪些备选的实现方案。
方案一。我们可以将replenish()函数加入到EnergySource接口中,这样一来,电源的使用者就可以每秒一次周期性地调用这个函数。但不幸的是,这种做法不但为电源的使用者增添了不必要的负担,而且还面临着用户忘记执行函数调用的风险。此外,电量自动恢复并非电源的通用能力,其他版本的EnergySource或许根本就没这个功能,所以replenish()并不适合加入到EnergySource接口中。综上所述,我们排除此方案。
方案二。我们可以在类型化角色中创建一个timer,并让这个timer负责周期性地恢复电源电量。此外,TypedActor中还定义了两个名为preStart()和postStop()的特殊函数,前者会在角色被创建之前调用,而后者则会在角色被终止或关闭之后被调用。这俩函数的特性很符合本方案的需求,即我们可以在preStart()函数里启动timer,并在postStop()函数里将其关停。然而虽然粗看起来本方案似乎挺不错,但实际上这种做法会引发其他的问题。
方案二的缺陷在于,timer运行在自己独立的线程中,而我们不想让那些线程修改角色里的可变状态。请记住,我们希望状态是隔离可变的,而不是共享可变的。所以,我们所需要的是一种能够被角色正确执行、可以引发内部函数调用的方法,我称该方法为murmurs。这些murmurs对类型化角色的外部用户来说是透明的,而实际上murmurs是以异步消息的形式运作的,其本质与那些受外部调用而产生消息是相同的。下面让我们来研究如何用这种方式来编写代码。
请记住,类型化角色实际上是加强版的角色,它们同样也像普通角色那样有接收消息的功能。基类TypedActor的receive()函数主要负责从代理中接收消息,并将其分派给类中合适的函数。我们可以覆写该函数来实现针对murmurs的特殊消息,即内部操作。通过这种方式,角色的使用者还是可以调用随接口发布出去的那些函数,而我们的内部类则可以使用这个(未发布出去)的消息,二者互不干扰。此外,如果需要的话,我们甚至还可以增加一个额外的步骤来确保这个消息的发送者真的是我们自己的角色。
后面我们将会看到,用Java实现上述方案还是有些麻烦的,而在Scala中则会简单很多。下面我们还是先研究在Java中如何实现,然后再将代码翻译成Scala版本。
用Java实现Murmurs
所有EnergySourceImpl的外部用户都是通过EnergySource接口与其进行交互的,而在内部,我们将利用一个timer来实现让电源角色每秒向自己发送一个Replenish请求消息的动作。虽然为了阻止外部用户的直接调用,我们会将replenish()函数定义为private,但我们同样也应避免从timer里直接调用该函数,以保持角色消息收发的交互方式不被破坏。下面让我们来看看其中的部分代码:
@SuppressWarnings("unchecked") public class EnergySourceImpl extends TypedActor implements EnergySource { private final long MAXLEVEL = 100L; private long level = MAXLEVEL; private long usageCount = 0L; class Replenish {} @Override public void preStart() { Scheduler.schedule( optionSelf().get(), new Replenish(), 1, 1, TimeUnit.SECONDS); } 196 • Chapter 8. Favoring Isolated Mutability @Override public void postStop() { Scheduler.shutdown(); } private void replenish() { System.out.println("Thread in replenish: " + Thread.currentThread().getName()); if (level < MAXLEVEL) level += 1; }
之前我们曾经提到过,TypedActor定义了一个角色启动之后会被自动调用的preStart()函数,本例中启动timer的工作就是在这个函数里完成的。这里我们所使用的Akka Scheduler是一种基于角色的timer,它提供了一组重载的schedule()函数来执行一次性或重复性任务。我们可以用timer来执行任意函数,亦或向本例那样向角色发送消息。
在preStart()函数中,我们设定了一个每秒向角色发送一个Replenish消息的timer,该timer将在初始化动作结束1秒钟后启动。通过调用TypedActor实例对象的optionSelf()函数,我们可以拿到角色的ActorRef引用的句柄。当角色停止时,timer也需要立刻终止运行,因此这里我们还会用到postStop()函数。在私有的replenish()函数中,我们并没有对Replenish消息进行任何处理,而是简单地将level变量加1便返回了。
类型化角色的用户所使用的代理负责将函数调用转换成消息。而TypedActor基类的receive()函数负责将这些消息转换成实现类的函数调用。如果我们检查receive()函数的定义就会发现,其实该函数的返回值是一个scala.PartialFunction[1]。为了不过多纠结于语言细节,这里你将PartialFunction想像成一个经过修饰的switch语句就可以了,其功能是根据所收到的消息类型将消息分发到不同的代码处理逻辑中去。虽然基类替我们做了消息与函数之间的映射,但我们还希望它把我们的私有消息与函数的映射放在一起处理。换句话说,我们希望把自己的消息处理逻辑合并到基类的receive()函数的处理流程中去。PartialFunction的orElse()函数可以帮助我们很轻松地实现上述需求,所以我们将会像下面这样实现receive()函数:
@Override public PartialFunction receive() { return processMessage().orElse(super.receive()); }
在这个覆写的receive()函数中,我们并将基类的receive()函数所返回的PartialFunction与尚待实现的processMessage()函数所返回的PartialFunction进行了合并。而现在我们可以将注意力转向processMessage()函数的实现上来了。该函数可以接收Replenish消息并调用私有的replenish()函数。由于这是消息处理序列的一部分,所以我们在不知不觉中就已经用基于角色的交互方式解决了线程同步问题。请看一下你杯子里的咖啡是否还够,不够的话请到满吧,因为你可能在实现这个函数的时候需要一些额外的咖啡因来提提神。
PartialFunction是一个Scala trait,而Java中带有实现清单的trait是作为一个接口/抽象类对出现的。所以要想在Java中实现trait,我们需要实现PartialFunction接口,并将调用酌情委派给相应的抽象类。
除了receive()函数之外,我们还需要实现关键函数apply()和isDefinedAt()。前者主要负责处理Replenish消息,而后者主要用于判断我们的PartialFunction是否支持某个特殊的消息格式或类型,而该接口的其他函数则委派给PartialFunction$class就可以了。最后,为了少实现一些接口函数,我们可以继承与PartialFunction共享相同Function1接口的AbstractFunction1类。
private PartialFunction processMessage() { class MyDispatch extends AbstractFunction1 implements PartialFunction { public boolean isDefinedAt(Object message) { return message instanceof Replenish; } public Object apply(Object message) { if (message instanceof Replenish) replenish(); return null; } public Function1 lift() { return PartialFunction$class.lift(this); } public PartialFunction andThen(Function1 function) { return PartialFunction$class.andThen(this, function); } public PartialFunction orElse(PartialFunction function) { return PartialFunction$class.orElse(this, function); } }; return new MyDispatch(); }
在apply()函数中,我们会检查所收到的消息是否Replenish类型,如果是就调用私有函数replenish(),否则返回null。函数isDefinedAt()的作用是指明我们只支持Replenish这一种消息类型,而其他消息将交由基类的receive()函数酌情处理。OK,最后的步骤就是把我们之前在类型化角色版本中定义的几个不需要变动的方法原封不动地搬过来,如下所示:
public long getUnitsAvailable() { return level; } public long getUsageCount() { return usageCount; } public void useEnergy(final long units) { if (units > 0 && level - units >= 0) { System.out.println( "Thread in useEnergy: " + Thread.currentThread().getName()); level -= units; usageCount++; } } }
在这一版本的代码中,由于EnergySource接口没发生任何变化,所以测试用例UseEnergySource可以直接复用上一个版本的代码。下面让我们对新版本的EnergySourceImpl进行编译,并用在上一节中写的UseEnergySource来运行这个新的电源实现类。在上一节中,当测试用例运行到结尾处时,电源中还剩了70格电量。而现在,由于我们有了自动电量恢复机制,所以测试用例完成之后电源应该会比上一节的运行结果多1、2格电量。
Thread in main: main Energy units 100 Firing two requests for use energy Fired two requests for use energy Thread in useEnergy: akka:event-driven:dispatcher:global-2 Thread in useEnergy: akka:event-driven:dispatcher:global-2 Firing one more requests for use energy Thread in useEnergy: akka:event-driven:dispatcher:global-3 Thread in replenish: akka:event-driven:dispatcher:global-4 Energy units 71 Usage 3
从我们在代码里插入的打印语句所输出线程信息来看,消耗电量的请求和恢复电量的请求都是运行在Akka 角色的线程里面的,所以我们从此可以摆脱同步问题的困扰了。由于角色是单线程的,所以如果我们在代码中插入一些sleep调用来迟滞这些任务的话,就可以看到接下来的那些调用角色的动作也同样也会被延迟。
用Scala实现Murmurs
Murmurs方法的本质是将我们自己的PartialFunction实现与基类的receive()函数所返回的PartialFunction合并在一起。这种方式用Java实现起来比较繁琐,但用Scala实现则会简单很多。下面就让我们一起来看看如何在Scala中处理murmus(即内部消息)。
class EnergySourceImpl extends TypedActor with EnergySource { val MAXLEVEL = 100L var level = MAXLEVEL var usageCount = 0L case class Replenish() override def preStart() = Scheduler.schedule(self, Replenish, 1, 1, TimeUnit.SECONDS) override def postStop() = Scheduler.shutdown override def receive = processMessage orElse super.receive def processMessage : Receive = { case Replenish => println("Thread in replenish: " + Thread.currentThread.getName) if (level < MAXLEVEL) level += 1 }
在上面的代码中,preStart()函数和postStop()函数是从Java版代码简单翻译过来的,这里就不再赘述。消息类Replenish在这里变成了一个case类。而receive()函数除了在形式上保持了Scala语法简洁的特性之外,其功能和Java版本完全相同。这里面变动最大的要数processMessage()函数了。由于Scala版本的实现无需对Replenish消息进行模式匹配,所以也就不会出现Java实现版本中那些乱七八糟的继承和委派(delegation)。也正是基于这种简单性,我们还可以将Replenish()函数的逻辑也放在这里,这样我们就不用再创建一个私有函数了。此外,对于processMessage()函数的返回值类型而言,我们既可以将其定义为PartialFunction[Any, Unit],也可以和上例一样使用Receive。由于Receive是PartialFunction[Any,Unit]的别名(alias),所以二者并无本质区别。
下面要实现的几个函数由于功能本身没有任何变化,所以我们直接把前一个Scala版本的代码抄过来即可:
def getUnitsAvailable() = level def getUsageCount() = usageCount def useEnergy(units : Long) = { if (units > 0 && level - units >= 0) { println("Thread in useEnergy: " + Thread.currentThread.getName) level -= units usageCount += 1 } } }
最后,对于EnergySource和UseEnergySource的实现,我们也直接把上一个Scala版本的代码拿过来用即可。下面让我们编译运行新版本的EnergySourceImpl,并将输出结果与之前的版本进行比较:
Thread in main: main Energy units 100 Firing two requests for use energy 200 • Chapter 8. Favoring Isolated Mutability Fired two requests for use energy Thread in useEnergy: akka:event-driven:dispatcher:global-2 Thread in useEnergy: akka:event-driven:dispatcher:global-2 Firing one more requests for use energy Thread in useEnergy: akka:event-driven:dispatcher:global-3 Thread in replenish: akka:event-driven:dispatcher:global-4 Energy units 71 Usage 3
就代码实现角度而言,同样的功能用Scala实现要比用Java实现简单得多。即使两段实例代码都得到了相同的逻辑结果,但是Scala版本明显要比Java版本在代码和逻辑实现方面容易一些。
[1] 其实在技术上,我们应该更精确地将其描述为scala.PartialFunction<Any,Unit>,其中scala.Any可以看成是Java里的Object。但不幸的是,这段代码中Java的类型擦除会导致一些编译期警告。
原创文章,作者:3628473679,如若转载,请注明出处:https://blog.ytso.com/140931.html