声明:本文是《Java虚拟机并发编程》的第五章,感谢华章出版社授权并发编程网站发布此文,禁止以任何形式转载此文。
正如我们在8.7节中所看到的那样,类型化角色是吸取了面向对象的程序设计和基于角色的程序设计二者的精华所孕育出来的新编程模型。该编程模型集所有我们耳熟能详的方法于一身,既可以方便地使用函数调用,又能享受角色所带来的好处。所以,在一个OO应用程序中,相比起普通的角色,我们可能会更倾向于使用类型化的角色。然而与普通角色相类似的是,类型化角色也是各自独立运行且不提供彼此间事务协调功能的,下面我们将会使用调和类型化角色(coordinating typed actor)来解决这个问题。
Akka帮助我们简化了将一个普通的类型化角色转换成一个调和类型化角色的过程。我们只需简单地用一个特殊的Coordinated注解对相应的接口函数进行标记即可。为了标明某个函数调用序列运行在一个协调事务中,我们需要将这些函数调用包裹在一个coordinate()函数里。在进行下一步操作之前,该函数(默认情况下)会等待所有函数都提交或回滚。
这种方案有一个限制,那就是只有void函数才能够用Coordinated注解进行标记。这是因为void函数被翻译为单向调用,并且这些函数可以参与到事务中。而返回值不为void的函数则会被翻译成双向阻塞调用,所以这些函数将无法参与到自由运行的并发事务中。
现在让我们再次重新实现那个我们曾在8.10节中摆弄过很多次的转账示例吧。
在Java中使用调和类型化角色
为了使用类型化角色,我们需要准备一对接口/实现类,所以我们先从Account和AccountService这两个下面将会用到的接口开始入手:
public interface Account { int getBalance(); @Coordinated void deposit(final int amount); @Coordinated void withdraw(final int amount); } public interface AccountService { void transfer(final Account from, final Account to, final int amount); }
在Account接口中,唯一比较特别的部分就是其两个接口函数都用@Coordinated注解进行了标记。通过这两个标记,我们声明了这些函数既可以在其自身的事务中运行,也可以加入到其调用者的事务当中。与此相对的是,由于AccountService的实现类将会自行管理其事务,所以AccountService的接口函数都没有进行标记。
public class AccountImpl extends TypedActor implements Account { private final Ref<Integer> balance = new Ref<Integer>(0); public int getBalance() { return balance.get(); } public void deposit(final int amount) { if (amount > 0) { balance.swap(balance.get() + amount); System.out.println("Received Deposit request " + amount); } } public void withdraw(final int amount) { System.out.println("Received Withdraw request " + amount); if (amount > 0 && balance.get() >= amount) balance.swap(balance.get() - amount); else { System.out.println("...insufficient funds..."); throw new RuntimeException("Insufficient fund"); } } }
通过继承TypedActor,我们将AccountImpl声明为一个角色。在这里我们没有使用简单的本地字段,而是采用了托管的STM引用(Ref)。此外,虽然我们没有写任何针对事务的代码,但是由于AccountImpl类相关接口函数都是用@Coordinated标记过的,所以该类的所有函数都将运行在一个事务中。在deposit()函数中,如果参数amount的值大于0,则deposit()函数将负责把amount的值累加到其当前余额中。相反地,如果当前余额balance的值大于参数amount,则withdraw()函数就会在当前余额中减去amount的值。否则,withdraw()函数将会抛出一个异常以表明当前操作及外围事务执行失败。如果我们没有为这些函数指定事务,则它们将会运行在自身的默认事务中。而在转账的情境下,我们希望存款和取款操作都运行在同一个事务中,所以接下来我们要写一个AccountServiceImpl来对这两个操作进行管理:
public class AccountServiceImpl extends TypedActor implements AccountService { public void transfer( final Account from, final Account to, final int amount) { coordinate(true, new Atomically() { public void atomically() { Coordinating Typed Actors • 211 to.deposit(amount); from.withdraw(amount); } }); } }
在上面的代码中,transfer()函数保证了存取款操作都将在同一事务中完成。需要在事务中执行的代码都被包装在Atomically接口的成员函数atomically()里。而从akka.transactor.Coordination类中静态引入的coordinate()函数则将以事务的形式运行atomically()函数中的代码块。其第一个参数true的作用是指示coordinate()要等待其事务完成(成功或回滚)之后才能返回。所有事务中的函数调用本质上都是(发送)单向消息,所以coordinate()函数只关心事务是否完成,而不会阻塞式地等待各函数的返回结果。
除了角色对象的创建过程略有不同之外,使用这些类型化角色的代码与使用普通对象的代码看起来没什么两样。在创建对象时,我们没有直接把对象new出来,而是使用了一个工厂类来负责创建工作,具体代码如下所示:
public class UseAccountService { public static void main(final String[] args) throws InterruptedException { final Account account1 = TypedActor.newInstance(Account.class, AccountImpl.class); final Account account2 = TypedActor.newInstance(Account.class, AccountImpl.class); final AccountService accountService = TypedActor.newInstance(AccountService.class, AccountServiceImpl.class); account1.deposit(1000); account2.deposit(1000); System.out.println("Account1 balance is " + account1.getBalance()); System.out.println("Account2 balance is " + account2.getBalance()); System.out.println("Let's transfer $20... should succeed"); accountService.transfer(account1, account2, 20); Thread.sleep(1000); System.out.println("Account1 balance is " + account1.getBalance()); System.out.println("Account2 balance is " + account2.getBalance()); 212 • Chapter 8. Favoring Isolated Mutability System.out.println("Let's transfer $2000... should not succeed"); accountService.transfer(account1, account2, 2000); Thread.sleep(6000); System.out.println("Account1 balance is " + account1.getBalance()); System.out.println("Account2 balance is " + account2.getBalance()); Actors.registry().shutdownAll(); } }
上述代码行为与我们之前在8.10节中所使用的示例完全相同。其输出结果如下所示:
Account1 balance is 1000 Received Deposit request 1000 Account2 balance is 1000 Let's transfer $20... should succeed Received Deposit request 20 Received Withdraw request 20 Account1 balance is 980 Account2 balance is 1020 Let's transfer $2000... should not succeed Received Deposit request 2000 Received Withdraw request 2000 ...insufficient funds... Account1 balance is 980 Account2 balance is 1020
正如我们所期待的那样,调和类型化角色版本的输出结果与transactor版本的输出结果基本相同——最后一个失败的转账事务所产生的所有变更最终都被丢弃。
在Scala中使用调和类型化角色
下面让我们将上述Java版本的示例代码翻译成Scala。在Scala中,我们才采用trait来代替接口,而这也是两种语言在实现方面的第一个不同点。
trait Account { def getBalance() : Int @Coordinated def deposit(amount : Int) : Unit @Coordinated def withdraw(amount : Int) : Unit } trait AccountService { def transfer(from : Account, to : Account, amount : Int) : Unit }
Account trait的实现是从Java版本直译过来的:
class AccountImpl extends TypedActor with Account { val balance = Ref(0) def getBalance() = balance.get() def deposit(amount : Int) = { if (amount > 0) { balance.swap(balance.get() + amount) println("Received Deposit request " + amount) } } def withdraw(amount : Int) = { println("Received Withdraw request " + amount) if (amount > 0 && balance.get() >= amount) balance.swap(balance.get() - amount) else { println("...insufficient funds...") throw new RuntimeException("Insufficient fund") } } }
同样地, AccountService trait的实现也从Java代码直译过来即可:
class AccountServiceImpl extends TypedActor with AccountService { def transfer(from : Account, to : Account, amount : Int) = { coordinate { to.deposit(amount) from.withdraw(amount) } } }
在Scala版本的示例中,调用定义了事务各个组成部分的coordinate()函数的方式比在Java版里简化了很多。默认情况下,coordinate()函数需要等待其事务执行完毕(成功或回滚)之后才能返回。同时,由于事务中所有的函数调用本质上都是(发送)单向消息,所以coordinate()函数只关心事务是否完成,而不会阻塞式地等待各函数的返回结果。此外,我们还可以向其传递一个可选参数,如coordinate(wait=false),来告知coordinate()函数不用等待事务完成。
最后,我们还需要一个检验上述代码的测试用例:
object UseAccountService { def main(args : Array[String]) = { val account1 = TypedActor.newInstance(classOf[Account], classOf[AccountImpl]) val account2 = TypedActor.newInstance(classOf[Account], classOf[AccountImpl]) val accountService = TypedActor.newInstance( classOf[AccountService], classOf[AccountServiceImpl]) account1.deposit(1000) account2.deposit(1000) println("Account1 balance is " + account1.getBalance()) println("Account2 balance is " + account2.getBalance()) println("Let's transfer $20... should succeed") accountService.transfer(account1, account2, 20) Thread.sleep(1000) println("Account1 balance is " + account1.getBalance()) println("Account2 balance is " + account2.getBalance()) println("Let's transfer $2000... should not succeed") accountService.transfer(account1, account2, 2000) Thread.sleep(6000) println("Account1 balance is " + account1.getBalance()) println("Account2 balance is " + account2.getBalance()) Actors.registry.shutdownAll } }
正如我们从下面的输出结果中所看到的那样,Scala实现版本的行为与Java版本完全相同:
Received Deposit request 1000 Received Deposit request 1000 Account1 balance is 1000 Account2 balance is 1000 Let's transfer $20... should succeed Received Deposit request 20 Received Withdraw request 20 Account1 balance is 980 Coordinating Typed Actors • 215 Account2 balance is 1020 Let's transfer $2000... should not succeed Received Deposit request 2000 Received Withdraw request 2000 ...insufficient funds... Account1 balance is 980 Account2 balance is 1020
原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/140928.html