FunDA(16)- 示范:整合并行运算 – total parallelism solution

   在上两篇讨论中我们介绍了并行运算的两种体现方式:并行构建数据源及并行运算用户自定义函数,并分别对这两部分进行了示范。本篇我准备示范把这两种情况集成一体的并行运算模式。这次介绍的数据源并行构建方式也与前面描述的有所不同:在前面的讨论里我们预先知道需要从三个独立流来并行构建数据源。但如果我们有一个不知长度的数据流,它的每个元素代表一个不同的数据流,应该如何处理呢?我们知道在AQMRPT表里有从1999年到2xxx年的空气质量测量数据,我们可以试着把按年份生成的数据流并行构建成一个数据源。这里直接使用上期示范中的铺垫代码,包括NORMAQM表初始化和从STATES和COUNTIES里用名称搜索对应id的函数:

  //database handle built from the "h2db" configuration entry
  val db = Database.forConfig("h2db")

  //snapshot of the table metadata, taken once before any DDL is issued
  val futVectorTables = db.run(MTable.getTables)

  //drop original table schema, but only when the snapshot lists the table
  val futDropTable = futVectorTables.flatMap { tables =>
    val tableNames = tables.map(t => t.name.name)
    if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
      db.run(NORMAQMQuery.schema.drop)
    else Future.successful(())   //nothing to drop: already-completed future
  }.andThen {
    case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} dropped successfully! ")
    case Failure(e) => println(s"Failed to drop Table ${NORMAQMQuery.baseTableRow.tableName}, it may not exist! Error: ${e.getMessage}")
  }
  //block until the drop has finished before creating the table again
  Await.ready(futDropTable,Duration.Inf)

  //create new table to refine AQMRawTable
  val actionCreateTable = Models.NORMAQMQuery.schema.create
  val futCreateTable = db.run(actionCreateTable).andThen {
    case Success(_) => println("Table created successfully!")
    case Failure(e) => println(s"Table may exist already! Error: ${e.getMessage}")
  }
  //would carry on even fail to create table
  Await.ready(futCreateTable,Duration.Inf)


  //truncate data, only available in slick 3.2.1
  //note: the existence check reuses the metadata snapshot taken before drop/create
  val futTruncateTable = futVectorTables.flatMap { tables =>
    val tableNames = tables.map(t => t.name.name)
    if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
      db.run(NORMAQMQuery.schema.truncate)
    else Future.successful(())   //nothing to truncate
  }.andThen {
    case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} truncated successfully!")
    case Failure(e) => println(s"Failed to truncate Table ${NORMAQMQuery.baseTableRow.tableName}! Error: ${e.getMessage}")
  }
  //wait for the truncate itself (the original awaited futDropTable a second time,
  //so rows could be inserted while the truncate was still running)
  Await.ready(futTruncateTable,Duration.Inf)
 
  //an artificial lookup task whose purpose is to consume resources:
  //scans the STATES rows for the first name containing `state`
  //returns the matching id, or -1 when no row matches
  def getStateID(state: String): Int = {
    //typed-row conversion used by the view loader
    implicit def toState(row:  StateTable#TableElementType) = StateModel(row.id,row.name)
    val loader = FDAViewLoader(slick.jdbc.H2Profile)(toState _)
    //run the STATES query and keep the typed rows as a Seq
    val rows = loader.fda_typedRows(StateQuery.result)(db).toSeq
    //static FunDA source over the loaded rows
    val stream =  fda_staticSource(rows)()
    //result holder written by the user task below; -1 means "not found"
    var found  = -1
    val locate: FDAUserTask[FDAROW] = candidate =>
      candidate match {
        case StateModel(stid,stname) if stname.contains(state) =>
          found = stid
          fda_break      //stop the stream at the first hit
        case _ => fda_skip   //non-matching or foreign row: move on
      }
    stream.appendTask(locate).startRun
    found
  }
  //a second artificial lookup task whose purpose is to consume resources:
  //scans the COUNTIES rows for the first name containing both `state` and `county`
  //returns the matching id, or -1 when no row matches
  def getCountyID(state: String, county: String): Int = {
    //typed-row conversion used by the view loader
    implicit def toCounty(row:  CountyTable#TableElementType) = CountyModel(row.id,row.name)
    val loader = FDAViewLoader(slick.jdbc.H2Profile)(toCounty _)
    //run the COUNTIES query and keep the typed rows as a Seq
    val rows = loader.fda_typedRows(CountyQuery.result)(db).toSeq
    //static FunDA source over the loaded rows
    val stream =  fda_staticSource(rows)()
    //result holder written by the user task below; -1 means "not found"
    var found  = -1
    val locate: FDAUserTask[FDAROW] = candidate =>
      candidate match {
        case CountyModel(cid,cname) if cname.contains(state) && cname.contains(county) =>
          found = cid
          fda_break      //stop the stream at the first hit
        case _ => fda_skip   //non-matching or foreign row: move on
      }
    stream.appendTask(locate).startRun
    found
  }

以及两个用户自定义函数:

  //user task: for every valid AQMRPTModel row, look up the state and county ids
  //and emit an FDAActionRow holding the NORMAQM insert; all other rows are skipped
  def getIdsThenInsertAction: FDAUserTask[FDAROW] = incoming =>
    incoming match {
      case aqm: AQMRPTModel if aqm.valid =>
        //arguments are evaluated left to right: state lookup first, then county lookup
        val insert = NORMAQMQuery += NORMAQMModel(
          0, aqm.mid,
          getStateID(aqm.state),
          getCountyID(aqm.state,aqm.county),
          aqm.year, aqm.value, aqm.total)
        fda_next(FDAActionRow(insert))
      case _ => fda_skip   //invalid AQM rows and unrelated row types
    }
  //action runner used to execute the insert actions against db
  val runner = FDAActionRunner(slick.jdbc.H2Profile)
  //user task: execute the action carried by an FDAActionRow; never emits a row downstream
  def runInsertAction: FDAUserTask[FDAROW] = actionRow => {
    actionRow match {
      case FDAActionRow(dbAction) => runner.fda_execAction(dbAction)(db)
      case _ => ()   //not an action row: nothing to execute
    }
    //every branch ends the same way: pass nothing on
    fda_skip
  }

跟着是本篇新增代码,我们先构建一个所有年份的流:

 //create parallel sources: start from a stream with one row per distinct year
  //row type wrapping a single year value
  case class Years(year: Int) extends FDAROW

  //conversion from the raw Int column value to the Years row type
  implicit def toYears(y: Int) = Years(y)

  //query for the distinct values of the year column of AQMRPT
  val qryYears = AQMRPTQuery.map(row => row.year).distinct

  //load the years as typed rows, then wrap them in a static source
  val yearViewLoader = FDAViewLoader(slick.jdbc.H2Profile)(toYears _)
  val yearSeq = yearViewLoader.fda_typedRows(qryYears.result)(db).toSeq
  val yearStream = fda_staticSource(yearSeq)()

下面是一个按年份从AQMRPT表读取数据的函数:

  //conversion from a raw AQMRPT table row to the strongly typed AQMRPTModel
  implicit def toAQMRPT(row: AQMRPTTable#TableElementType) =
    AQMRPTModel(
      row.rid, row.mid, row.state, row.county,
      row.year, row.value, row.total, row.valid)

  //one stream loader instance, reused by every per-year load
  val AQMRPTLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRPT _)

  //build a typed stream of the AQMRPT rows whose year equals yr
  //NOTE(review): (256, 256) are presumably fetch/queue sizes - confirm against the FunDA API
  def loadRowsInYear(yr: Int) =
    AQMRPTLoader.fda_typedStream(AQMRPTQuery.filter(_.year === yr).result)(db)(256, 256)()

我们可以预见多个loadRowsInYear函数实例会共享同一个FDAStreamLoader(即AQMRPTLoader)。用户自定义的数据读取函数的类型是FDASourceLoader。下面是FDASourceLoader示范代码:

  //source loader: turns a Years row into the stream of AQMRPT rows for that year
  //any other row type yields a source holding only FDANullRow
  def loadRowsByYear: FDASourceLoader = yearRow =>
    yearRow match {
      case Years(y) => loadRowsInYear(y)
      case _ => fda_appendRow(FDANullRow)
    }

我们用toParSource构建一个并行数据源:

  //get parallel source constructor: pair the stream of years with the
  //per-year loader, so each Years row is handed to loadRowsByYear
  val parSource = yearStream.toParSource(loadRowsByYear)

用fda_par_source来把并行数据源转换成统一数据流:

  //produce a single stream from the parallel sources
  //NOTE(review): 3 is presumably the maximum number of sources open at once - confirm
  val source = fda_par_source(parSource)(3)

source是个FDAPipeLine,可以直接运算:source.startRun,也可以在后面挂上多个环节。下面我们把其它两个用户自定义函数转成并行运算函数后接到source后面:

  //the following composes the whole pipeline from stream combinators:
  //years -> per-year row streams -> insert actions -> executed inserts
  //get parallel source constructor from the stream of years
  val parSource = yearStream.toParSource(loadRowsByYear) 
 
  //implicit val strategy = Strategy.fromCachedDaemonPool("cachedPool") 
  //produce a single stream from the parallel sources
  //NOTE(review): the numeric arguments below are presumably max concurrency levels - confirm
  val source = fda_par_source(parSource)(3) 
  //turn getIdsThenInsertAction into a parallel task over the source rows
  val parTasks = source.toPar(getIdsThenInsertAction) 
  //runPar to produce a new stream, here the stream of FDAActionRow
  val actionStream =fda_runPar(parTasks)(3) 
  //turn runInsertAction into a parallel task over the action rows
  val parRun = actionStream.toPar(runInsertAction) 
  //runPar again, then startRun actually runs the composed pipeline
  fda_runPar(parRun)(2).startRun

下面是本次示范的完整源代码: 

import slick.jdbc.meta._ 
import com.bayakala.funda._ 
import api._ 
import scala.language.implicitConversions 
import scala.concurrent.ExecutionContext.Implicits.global 
import scala.concurrent.duration._ 
import scala.concurrent.{Await, Future} 
import scala.util.{Failure, Success} 
import slick.jdbc.H2Profile.api._ 
import Models._ 
import fs2.Strategy 
 
//Demo program: rebuilds the NORMAQM table, then fills it from AQMRPT using
//parallel per-year sources followed by parallel user tasks.
object ParallelExecution extends App {

  //database handle built from the "h2db" configuration entry
  val db = Database.forConfig("h2db")

  //snapshot of the table metadata, taken once before any DDL is issued
  val futVectorTables = db.run(MTable.getTables)

  //drop original table schema, but only when the snapshot lists the table
  val futDropTable = futVectorTables.flatMap { tables =>
    val tableNames = tables.map(t => t.name.name)
    if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
      db.run(NORMAQMQuery.schema.drop)
    else Future.successful(())   //nothing to drop: already-completed future
  }.andThen {
    case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} dropped successfully! ")
    case Failure(e) => println(s"Failed to drop Table ${NORMAQMQuery.baseTableRow.tableName}, it may not exist! Error: ${e.getMessage}")
  }
  //block until the drop has finished before creating the table again
  Await.ready(futDropTable,Duration.Inf)

  //create new table to refine AQMRawTable
  val actionCreateTable = Models.NORMAQMQuery.schema.create
  val futCreateTable = db.run(actionCreateTable).andThen {
    case Success(_) => println("Table created successfully!")
    case Failure(e) => println(s"Table may exist already! Error: ${e.getMessage}")
  }
  //would carry on even fail to create table
  Await.ready(futCreateTable,Duration.Inf)


  //truncate data, only available in slick 3.2.1
  //note: the existence check reuses the metadata snapshot taken before drop/create
  val futTruncateTable = futVectorTables.flatMap { tables =>
    val tableNames = tables.map(t => t.name.name)
    if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
      db.run(NORMAQMQuery.schema.truncate)
    else Future.successful(())   //nothing to truncate
  }.andThen {
    case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} truncated successfully!")
    case Failure(e) => println(s"Failed to truncate Table ${NORMAQMQuery.baseTableRow.tableName}! Error: ${e.getMessage}")
  }
  //wait for the truncate itself (the original awaited futDropTable a second time,
  //so rows could be inserted while the truncate was still running)
  Await.ready(futTruncateTable,Duration.Inf)

  //a conceived task for the purpose of resource consumption
  //getting id with corresponding name from STATES table
  //returns the id of the first row whose name contains `state`, or -1 if none does
  def getStateID(state: String): Int = {
    //typed-row conversion used by the view loader
    implicit def toState(row:  StateTable#TableElementType) = StateModel(row.id,row.name)
    val stateLoader = FDAViewLoader(slick.jdbc.H2Profile)(toState _)
    //run the STATES query and keep the typed rows as a Seq
    val stateSeq = stateLoader.fda_typedRows(StateQuery.result)(db).toSeq
    //static FunDA source over the loaded rows
    val stateStream =  fda_staticSource(stateSeq)()
    //result holder written by the user task below; -1 means "not found"
    var id  = -1
    def getid: FDAUserTask[FDAROW] = row => {
      row match {
        case StateModel(stid,stname) =>   //target row type
          if (stname.contains(state)) {
            id = stid
            fda_break      //exit
          }
          else fda_skip   //take next row
        case _ => fda_skip
      }
    }
    stateStream.appendTask(getid).startRun
    id
  }
  //another conceived task for the purpose of resource consumption
  //getting id with corresponding names from COUNTIES table
  //returns the id of the first row whose name contains both `state` and `county`, or -1
  def getCountyID(state: String, county: String): Int = {
    //typed-row conversion used by the view loader
    implicit def toCounty(row:  CountyTable#TableElementType) = CountyModel(row.id,row.name)
    val countyLoader = FDAViewLoader(slick.jdbc.H2Profile)(toCounty _)
    //run the COUNTIES query and keep the typed rows as a Seq
    val countySeq = countyLoader.fda_typedRows(CountyQuery.result)(db).toSeq
    //static FunDA source over the loaded rows
    val countyStream =  fda_staticSource(countySeq)()
    //result holder written by the user task below; -1 means "not found"
    var id  = -1
    def getid: FDAUserTask[FDAROW] = row => {
      row match {
        case CountyModel(cid,cname) =>   //target row type
          if (cname.contains(state) && cname.contains(county)) {
            id = cid
            fda_break      //exit
          }
          else fda_skip   //take next row
        case _ => fda_skip
      }
    }
    countyStream.appendTask(getid).startRun
    id
  }

  //process input row and produce action row to insert into NORMAQM
  //only valid AQMRPTModel rows produce an action; everything else is skipped
  def getIdsThenInsertAction: FDAUserTask[FDAROW] = row => {
    row match {
      case aqm: AQMRPTModel =>
        if (aqm.valid) {
          val stateId = getStateID(aqm.state)
          val countyId = getCountyID(aqm.state,aqm.county)
          val action = NORMAQMQuery += NORMAQMModel(0,aqm.mid, stateId, countyId, aqm.year,aqm.value,aqm.total)
          fda_next(FDAActionRow(action))
        }
        else fda_skip
      case _ => fda_skip
    }
  }
  //runner for the action rows
  val runner = FDAActionRunner(slick.jdbc.H2Profile)
  //user task: execute the action carried by an FDAActionRow; never emits a row downstream
  def runInsertAction: FDAUserTask[FDAROW] = row =>
    row match {
      case FDAActionRow(action) =>
        runner.fda_execAction(action)(db)
        fda_skip
      case _ => fda_skip
    }

  //create parallel sources
  //get a stream of years: one Years row per distinct year found in AQMRPT
  val qryYears = AQMRPTQuery.map(_.year).distinct
  case class Years(year: Int) extends FDAROW

  //conversion from the raw Int column value to the Years row type
  implicit def toYears(y: Int) = Years(y)

  val yearViewLoader = FDAViewLoader(slick.jdbc.H2Profile)(toYears _)
  val yearSeq = yearViewLoader.fda_typedRows(qryYears.result)(db).toSeq
  val yearStream = fda_staticSource(yearSeq)()

  //strong row type: conversion from a raw AQMRPT table row to AQMRPTModel
  implicit def toAQMRPT(row: AQMRPTTable#TableElementType) =
    AQMRPTModel(row.rid, row.mid, row.state, row.county, row.year, row.value, row.total, row.valid)

  //shared stream loader when operate in parallel mode
  val AQMRPTLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRPT _)

  //loading rows with year yr
  //NOTE(review): (256, 256) are presumably fetch/queue sizes - confirm against the FunDA API
  def loadRowsInYear(yr: Int) = {
    //a new query
    val query = AQMRPTQuery.filter(row => row.year === yr)
    //reuse same loader
    AQMRPTLoader.fda_typedStream(query.result)(db)(256, 256)()
  }

  //loading rows by year
  //any row that is not a Years row yields a source holding only FDANullRow
  def loadRowsByYear: FDASourceLoader = row => {
    row match {
      case Years(y) => loadRowsInYear(y) //produce stream of the year
      case _ => fda_appendRow(FDANullRow)
    }

  }


  //start counter
  val cnt_start = System.currentTimeMillis()

  //debugging task: prints the rows it recognises and passes nothing downstream
  //(not attached to the pipeline below)
  def showRecord: FDAUserTask[FDAROW] = row => {
    row match {
      case Years(y) => println(y); fda_skip
      case aqm: AQMRPTModel =>
        println(s"${aqm.year}  $aqm")
        fda_skip
      case FDAActionRow(action) =>
        println(s"${action}")
        fda_skip
      case _ => fda_skip
    }
  }

  //the following is a process of composition of stream combinators
  //get parallel source constructor
  val parSource = yearStream.toParSource(loadRowsByYear)

  //implicit val strategy = Strategy.fromCachedDaemonPool("cachedPool")
  //produce a stream from parallel sources
  //NOTE(review): the numeric arguments below are presumably max concurrency levels - confirm
  val source = fda_par_source(parSource)(3)
  //turn getIdsThenInsertAction into parallel task
  val parTasks = source.toPar(getIdsThenInsertAction)
  //runPar to produce a new stream
  val actionStream =fda_runPar(parTasks)(3)
  //turn runInsertAction into parallel task
  val parRun = actionStream.toPar(runInsertAction)
  //runPar and carry out by startRun
  fda_runPar(parRun)(2).startRun

  //elapsed wall-clock time in whole seconds; the row count in the message is hard-coded
  println(s"processing 219400 rows parallelly  in ${(System.currentTimeMillis - cnt_start)/1000} seconds")



}

 

 

 

 

 

 

 

 

 

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/12864.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论