Scalaz(45)- concurrency :Task-函数式多线程编程核心配件详解编程语言

    我们在上一节讨论了scalaz Future,我们说它是一个不完善的类型,最起码没有完整的异常处理机制,只能用在构建类库之类的内部环境。如果scalaz在Future类定义中增加异常处理工具的话,用户就会经常遇到Future[Throwable//A]这样的类型,那么在进行Monadic编程时就必须使用Monad Transformer来匹配类型,程序就会变得不必要的复杂。scalaz的解决方案就是把Future[Throwable//A]包嵌在Task类里,然后把所有Future都统一升格成Task。Task是个Monad, 这样,我们就可以统一方便地用Task来进行多线程函数式编程了。我们先看看Task的定义:scalaz.concurrent/Task.scala

class Task[+A](val get: Future[Throwable // A]) { 
 
  def flatMap[B](f: A => Task[B]): Task[B] = 
    new Task(get flatMap { 
      case -//(e) => Future.now(-//(e)) 
      case //-(a) => Task.Try(f(a)) match { 
        case e @ -//(_) => Future.now(e) 
        case //-(task) => task.get 
      } 
    }) 
 
  def map[B](f: A => B): Task[B] = 
    new Task(get map { _ flatMap {a => Task.Try(f(a))} }) 
...

Task实现了flatMap,所以是个Monad,我们可以在for-comprehension中使用Task。

Task的构建方式与Future一样:

1 val tnow = Task.now { println("run now ..."); 3+4 } 
2                                      //> run now ... 
3                                      //| tnow  : scalaz.concurrent.Task[Int] = [email protected] 
4 val tdelay = Task.delay { println("run delay ...");  3+4 } 
5                                     //> tdelay  : scalaz.concurrent.Task[Int] = [email protected] 
6 val tapply = Task { println("run apply ..."); 3+4 } 
7                                    //> tapply  : scalaz.concurrent.Task[Int] = [email protected]

同样,now函数是即时运算的。它就是一个lifter,能把一个普通运算直接升格为Task。

针对Task有几种运算方法:

 1 tnow.unsafePerformSync                            //> res0: Int = 7 
 2 tdelay.unsafePerformSync                          //> run delay ... 
 3                                                   //| res1: Int = 7 
 4 tnow.unsafePerformAsync { 
 5   case //-(a) => println(s"the result is: $a") 
 6   case -//(e) => println(e.getMessage) 
 7 }                                                 //> the result is: 7 
 8 tdelay.unsafePerformAsync { 
 9   case //-(a) => println(s"the result is: $a") 
10   case -//(e) => println(e.getMessage) 
11 }                                                 //> run delay ... 
12                                                   //| the result is: 7 
13 tapply.unsafePerformAsync { 
14   case //-(a) => println(s"the result is: $a") 
15   case -//(e) => println(e.getMessage) 
16 } 
17 Thread.sleep(1000)                                //> run apply ... 
18                                                   //| the result is: 7

从上面的例子我们可以得出:tnow已经完成了运算,因为运算结果没有”run now …”提示了。tdelay和tapply都是存在trampoline结构里的。但tapply存在更深一层的结构里,所以我们必须拖时间来等待tapply的运算结果。tdelay存放在Future.Suspend结构里,而tapply是存放在Future.Async结构里的,所以tdelay是一种延迟运算,而tapply就是异步运算了:

 1 def delay[A](a: => A): Task[A] = suspend(now(a)) 
 2 def suspend[A](a: => Task[A]): Task[A] = new Task(Future.suspend( 
 3     Try(a.get) match { 
 4       case -//(e) => Future.now(-//(e)) 
 5       case //-(f) => f 
 6   })) 
 7 //Future.suspend: 
 8  def suspend[A](f: => Future[A]): Future[A] = Suspend(() => f) 
 9  
10  def apply[A](a: => A)(implicit pool: ExecutorService = Strategy.DefaultExecutorService): Task[A] = 
11     new Task(Future(Try(a))(pool)) 
12 //Future.apply 
13 def apply[A](a: => A)(implicit pool: ExecutorService = Strategy.DefaultExecutorService): Future[A] = Async { cb => 
14     pool.submit { new Callable[Unit] { def call = cb(a).run }} 
15   }

好了,我们再看看Task是怎样处理异常情况的:

 1 def eval(value: => Int) = Task { Thread.sleep(1000); value } 
 2                                                   //> eval: (value: => Int)scalaz.concurrent.Task[Int] 
 3 eval( 3 * 7 ).onFinish { 
 4   case None => Task { println("finished calculation successfully.") } 
 5   case Some(e) => Task { println(s"caught error [${e.getMessage}]") } 
 6 }.unsafePerformSyncAttempt match { 
 7   case -//(e) => println(s"calculation error [${e.getMessage}]") 
 8   case //-(a) => println(s"the result is: $a") 
 9 }                                                 //> finished calculation successfully. 
10                                                   //| the result is: 21 
11 // 异常处理 
12 eval( 3 * 7 / 0 ).onFinish { 
13   case None => Task { println("finished calculation successfully.") } 
14   case Some(e) => Task { println(s"caught error [${e.getMessage}]") } 
15 }.unsafePerformAsync { 
16   case -//(e) => println(s"calculation error [${e.getMessage}]") 
17   case //-(a) => println(s"the result is: $a") 
18 } 
19 Thread.sleep(2000)                                //> caught error [/ by zero] 
20                                                   //| calculation error [/ by zero]

精准异常处理例子:

1 import java.util.concurrent._ 
2 val timedTask = Task {Thread.sleep(2000); 3+4}    
3                      //> timedTask  : scalaz.concurrent.Task[Int] = [email protected] 
4 timedTask.timed(1 second).handleWith { 
5   case e: TimeoutException => Task { println(s"calculation exceeding time limit: ${e.getMessage}") } 
6 }.unsafePerformSync           //> calculation exceeding time limit: Timed out after 1000 milliseconds 
7                               //| res2: AnyVal{def getClass(): Class[_ >: Int with Unit <: AnyVal]} = ()

再看一些多线程编程例子:

 1 val tasks = (1 |-> 5).map(n => Task{ Thread.sleep(100); n }) 
 2//> tasks  : List[scalaz.concurrent.Task[Int]] = List([email protected] 
 3//| 8b19ad, [email protected], [email protected], s 
 4//| [email protected], [email protected]) 
 5 //并行运算list of tasks 
 6 Task.gatherUnordered(tasks).unsafePerformSync     //> res3: List[Int] = List(1, 2, 3, 4, 5) 
 7 val sb = new StringBuffer                         //> sb  : StringBuffer =  
 8 val t1 = Task.fork { Thread.sleep(100); sb.append("a"); Task.now("a")} 
 9//> t1  : scalaz.concurrent.Task[String] = [email protected] 
10 val t2 = Task.fork { Thread.sleep(800); sb.append("b"); Task.now("b")} 
11//> t2  : scalaz.concurrent.Task[String] = [email protected] 
12 val t3 = Task.fork { Thread.sleep(200); sb.append("c"); Task.now("c")} 
13//> t3  : scalaz.concurrent.Task[String] = [email protected] 
14 val t4 = Task.fork { Thread.sleep(100); sb.append("d"); Task.now("d")} 
15//> t4  : scalaz.concurrent.Task[String] = [email protected] 
16 val t5 = Task.fork { Thread.sleep(400); sb.append("e"); Task.now("e")} 
17//> t5  : scalaz.concurrent.Task[String] = [email protected] 
18 val t6 = Task.fork { Thread.sleep(100); sb.append("f"); Task.now("f")} 
19//> t6  : scalaz.concurrent.Task[String] = [email protected] 
20 val r = Nondeterminism[Task].nmap6(t1,t2,t3,t4,t5,t6)(List(_,_,_,_,_,_)) 
21//> r  : scalaz.concurrent.Task[List[String]] = [email protected]  
22 r.unsafePerformSync              //> res4: List[String] = List(a, b, c, d, e, f)

看个耗时算法的并行运算吧:

 1 def seqFib(n: Int): Task[Int] =  n match { 
 2   case 0 | 1 => Task now  1 
 3   case n => { 
 4     for { 
 5       x <- seqFib(n-1) 
 6       y <- seqFib(n-2) 
 7     } yield x + y 
 8   } 
 9  }                                                //> seqFib: (n: Int)scalaz.concurrent.Task[Int] 
10  //并行算法 
11  def parFib(n: Int): Task[Int] = n match { 
12     case 0 | 1 => Task now 1 
13     case n => { 
14        val ND = Nondeterminism[Task] 
15        for { 
16          pair <- ND.both(parFib(n-1), parFib(n-2)) 
17          (x,y) = pair 
18        } yield x + y 
19     } 
20  }                                                //> parFib: (n: Int)scalaz.concurrent.Task[Int] 
21  def msFib(n: Int, fib: Int => Task[Int]) = for { 
22    b <- Task now { System.currentTimeMillis() } 
23    a <- fib(n) 
24    e <- Task now { System.currentTimeMillis() } 
25  } yield (a, (e-b))                               //> msFib: (n: Int, fib: Int => scalaz.concurrent.Task[Int])scalaz.concurrent.T 
26                                                   //| ask[(Int, Long)] 
27   
28  msFib(20, parFib).unsafePerformSync              //> res3: (Int, Long) = (10946,373) 
29  msFib(20, seqFib).unsafePerformSync              //> res4: (Int, Long) = (10946,17)

哎呀!奇怪了,为什么并行算法要慢很多呢?这个问题暂且放一放,以后再研究。当然,如果有读者能给出个解释就太感激了。
Task的线程池是如何分配的呢?看看Task.apply和Task.fork:

 /** Create a `Task` that will evaluate `a` using the given `ExecutorService`. */ 
  def apply[A](a: => A)(implicit pool: ExecutorService = Strategy.DefaultExecutorService): Task[A] = 
    new Task(Future(Try(a))(pool)) 
def fork[A](a: => Task[A])(implicit pool: ExecutorService = Strategy.DefaultExecutorService): Task[A] = 
    apply(a).join 
//Future.apply 
/** Create a `Future` that will evaluate `a` using the given `ExecutorService`. */ 
  def apply[A](a: => A)(implicit pool: ExecutorService = Strategy.DefaultExecutorService): Future[A] = Async { cb => 
    pool.submit { new Callable[Unit] { def call = cb(a).run }} 
 

这两个函数都包括了一个隐式参数implicit pool: ExecutorService。默认值是Strategy.DefultExecutorService。我们可以这样指定线程池:

1  Task {longProcess}(myExecutorService) 
2  Task.fork { Task {longProcess} }(myExecutorService)

下面是一个动态指定线程池的例子:

 1 import java.util.concurrent.{ExecutorService,Executors} 
 2 type Delegated[A] = Kleisli[Task,ExecutorService,A] 
 3 def delegate: Delegated[ExecutorService] = Kleisli(e => Task.now(e)) 
 4                             //> delegate: => demo.ws.task.Delegated[java.util.concurrent.ExecutorService] 
 5 implicit def delegateTaskToPool[A](ta: Task[A]): Delegated[A] = Kleisli(x => ta) 
 6//> delegateTaskToPool: [A](ta: scalaz.concurrent.Task[A])demo.ws.task.Delegated[A] 
 7 val tPrg = for { 
 8   p <- delegate 
 9   b <- Task("x")(p) 
10   c <- Task("y")(p) 
11 } yield c                   //> tPrg  : scalaz.Kleisli[scalaz.concurrent.Task,java.util.concurrent.Executor 
12                             //| Service,String] = Kleisli(<function1>) 
13 tPrg.run(Executors.newFixedThreadPool(3)).unsafePerformSync 
14                             //> res3: String = y

当然,Task和scala Future之间是可以相互转换的:

 1 import scala.concurrent.{Future => sFuture} 
 2 import scala.util.{Success,Failure} 
 3 import scala.concurrent.ExecutionContext 
 4 def futureToTask[A](fut: sFuture[A])(implicit ec: ExecutionContext): Task[A] = 
 5   Task.async { 
 6     cb => 
 7       fut.onComplete { 
 8         case Success(a) => cb(a.right) 
 9         case Failure(e) => cb(e.left) 
10       } 
11   } 
12 def taskToFuture[A](ta: Task[A]): sFuture[A] = { 
13   val prom = scala.concurrent.Promise[A] 
14   ta.unsafePerformAsync { 
15     case -//(e) => prom.failure(e) 
16     case //-(a) => prom.success(a) 
17   } 
18   prom.future 
19 }

与Future不同的是:Task增加了异常处理机制。

 

 

 

 

 

 

 

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/industrynews/12903.html

(0)
上一篇 2021年7月19日 15:01
下一篇 2021年7月19日 15:01

相关推荐

发表回复

登录后才能评论