Overview
- 回顾python中的函数式编程
- python中的map和reduce函数
- 用map写并行代码
- Map-Reduce编程模型
- 用python写spark程序
Reading
-
Introduction to Parallel Computing, Blaise Barney, Lawrence Livermore National Laboratory.
-
Dean, J., & Ghemawat, S. (2008). MapReduce: Simplified Data Processing on Large Clusters. Communications of the ACM, 51(1), 107–113.
-
Chapters 1 and 3 of Karau, H., Wendell, P., & Zaharia, M. (2015). Learning Spark: Lightning-Fast Big Data Analysis. O’Reilly.
Functional programming
考虑以下代码:
def double_everything_in(data): result = [] for i in data: result.append(2 * i) return result def quadruple_everything_in(data): result = [] for i in data: result.append(4 * i) return result double_everything_in([1, 2, 3, 4, 5]) [2, 4, 6, 8, 10] quadruple_everything_in([1, 2, 3, 4, 5]) [4, 8, 12, 16, 20]
- 上述代码没有很好的践行软件工程中“不要重复自己”的原则。
- 应该如何避免重复呢?
def multiply_by_x_everything_in(x, data): result = [] for i in data: result.append(x * i) return result multiply_by_x_everything_in(2, [1, 2, 3, 4, 5]) [2, 4, 6, 8, 10] multiply_by_x_everything_in(4, [1, 2, 3, 4, 5]) [4, 8, 12, 16, 20]
- 再考虑下面的代码
def squared(x): return x*x def double(x): return x*2 def square_everything_in(data): result = [] for i in data: result.append(squared(i)) return result def double_everything_in(data): result = [] for i in data: result.append(double(i)) return result square_everything_in([1, 2, 3, 4, 5]) [1, 4, 9, 16, 25] double_everything_in([1, 2, 3, 4, 5]) [2, 4, 6, 8, 10]
- 应该如何避免重复呢
把函数作为值
def apply_f_to_everything_in(f, data): result = [] for x in data: result.append(f(x)) return result apply_f_to_everything_in(squared, [1, 2, 3, 4, 5]) [1, 4, 9, 16, 25] apply_f_to_everything_in(double, [1, 2, 3, 4, 5]) [2, 4, 6, 8, 10]
- Lambda表达式:每次想用map的时候又不得不定义一个函数的时候可以用匿名函数。
apply_f_to_everything_in(lambda x: x*x, [1, 2, 3, 4, 5])
Python’s map function
python中有一个内置的mpo函数比我们自己写的快很多。
map(lambda x: x*x, [1, 2, 3, 4, 5]) [1, 4, 9, 16, 25]
Implementing reduce
- reduce函数有一个fold的例子
- 有好几种实现fold的方法
- 下面的方式叫做left fold
def foldl(f, data, z): if (len(data) == 0): print z return z else: head = data[0] tail = data[1:] print "Folding", head, "with", tail, "using", z partial_result = f(z, data[0]) print "Partial result is", partial_result return foldl(f, tail, partial_result) def add(x, y): return x + y foldl(add, [1, 2, 3, 4, 5], 0) Folding 1 with [2, 3, 4, 5] using 0 Partial result is 1 Folding 2 with [3, 4, 5] using 1 Partial result is 3 Folding 3 with [4, 5] using 3 Partial result is 6 Folding 4 with [5] using 6 Partial result is 10 Folding 5 with [] using 10 Partial result is 15 15
- 用lambda表达式也一样
foldl(lambda x, y: x + y, [1, 2, 3, 4, 5], 0) Folding 1 with [2, 3, 4, 5] using 0 Partial result is 1 Folding 2 with [3, 4, 5] using 1 Partial result is 3 Folding 3 with [4, 5] using 3 Partial result is 6 Folding 4 with [5] using 6 Partial result is 10 Folding 5 with [] using 10 Partial result is 15 15
Python's reduce function.
python内部的reduce函数是left fold
reduce(lambda x, y: x + y, [1, 2, 3, 4, 5]) 15 reduce(lambda x, y: x - y, [1, 2, 3, 4, 5], 0) -15
Functional programming and parallelism
- 函数式编程用在并行编程里
- map函数可以通过数据级的并行化轻松实现并行计算
- 把函数作为参数传递进去可以避免一些副作用
def perform_computation(f, result, data, i): print "Computing the ", i, "th result..." # This could be scheduled on a different CPU result[i] = f(data[i]) def my_map(f, data): result = [None] * len(data) for i in range(len(data)): perform_computation(f, result, data, i) # Wait for other CPUs to finish, and then.. return result my_map(lambda x: x * x, [1, 2, 3, 4, 5]) Computing the 0 th result... Computing the 1 th result... Computing the 2 th result... Computing the 3 th result... Computing the 4 th result... [1, 4, 9, 16, 25]
A multi-threaded map function
from threading import Thread def schedule_computation_threaded(f, result, data, threads, i): # Each function evaluation is scheduled on a different core. def my_job(): print "Processing data:", data[i], "... " result[i] = f(data[i]) print "Finished job #", i print "Result was", result[i] threads[i] = Thread(target=my_job) def my_map_multithreaded(f, data): n = len(data) result = [None] * n threads = [None] * n print "Scheduling jobs.. " for i in range(n): schedule_computation_threaded(f, result, data, threads, i) print "Starting jobs.. " for i in range(n): threads[i].start() print "Waiting for jobs to finish.. " for i in range(n): threads[i].join() print "All done." return result my_map_multithreaded(lambda x: x*x, [1, 2, 3, 4, 5]) Scheduling jobs.. Starting jobs.. Processing data: 1 ... Finished job # 0 Result was 1 Processing data: 2 ... Finished job # 1 Result was 4 Processing data: 3 ... Finished job # 2 Result was 9 Processing data: 4 ... Finished job # 3 Result was 16 Processing data: 5 ... Finished job # 4 Result was 25 Waiting for jobs to finish.. All done. [1, 4, 9, 16, 25]
from numpy.random import uniform from time import sleep def a_function_which_takes_a_long_time(x): sleep(uniform(2, 10)) # Simulate some long computation return x*x my_map_multithreaded(a_function_which_takes_a_long_time, [1, 2, 3, 4, 5]) Scheduling jobs.. Starting jobs.. Processing data: 1 ... Processing data: 2 ... Processing data: 3 ... Processing data: 4 ... Processing data: 5 ... Waiting for jobs to finish.. Finished job # 4 Result was 25 Finished job # 0 Result was 1 Finished job # 3 Result was 16 Finished job # 2 Result was 9 Finished job # 1 Result was 4 All done. Out[31]: [1, 4, 9, 16, 25]
Map Reduce
- map reduce是一种大规模并行处理的编程模型
- 大规模意味着它可以借助大量的计算集群来处理大数据
- 有很多实现:hadoop和spark
- 我们可以用任何语言实现map-reduce:hadoop里用java,spark用scala,但也有python接口
- python或者scala非常适合map-reduce模型,但我们不必函数式编程
- mapreduce的实现中关注了底层的功能操作,我们不必担心。
Typical steps in a Map Reduce Computation
- ETL一个数据集
- Map操作:每一行提取你关心的信息
- "Shuffle and Sort":task/node allocation
- Reduce操作:aggregate、summaries、filter or transform
- 保存结果
Callbacks for Map Reduce
- 数据集和每一步的计算状态,都以键值对的形式表现
- map(k,v)→⟨k′,v′⟩∗
- reduce(k′,⟨k′,v′⟩∗)→⟨k′,v″⟩∗
- *指的是值的collection
- colletions并不是有序的
Resilient Distributed Data
- 在map-reduce计算中这些collections被称作RDDs:
- 数据在分布在节点之间
- 单个节点的损坏不会导致数据丢失
- 数据一般存在HBase或者HDFS中
- map和reduce函数可以不同的keys、elements实现并行化
Word Count Example
- 在这个例子里,输入是一系列URL,每一个记录篇一篇文档
- 问题:在数据集中每个单词出现了多少次
Word Count: Map
- 输入数据进行map:
- Key: URL
- Value: 文档内容
⟨document1,tobeornottobe⟩
- 我们需要用map处理给定的URL
- Key: word
- Value: 1
- 我们原始的数据集会被转化成:
⟨to,1⟩
⟨be,1⟩
⟨or,1⟩
⟨not,1⟩
⟨to,1⟩
⟨be,1⟩
⟨be,1⟩
⟨or,1⟩
⟨not,1⟩
⟨to,1⟩
⟨be,1⟩
Word Count: Reduce
- reduce操作按照key来对values进行分组,然后执行在每个key上执行reduce。
- mapreduce会折叠计算数算来最小化数据复制的操作。
- 不同分区的数据单独进行reduce
- 算子的选择是很重要的,需要累加和结合。
- 在这个例子是函数是+运算符
⟨be,2⟩
⟨not,1⟩
⟨or,1⟩
⟨to,2⟩
⟨not,1⟩
⟨or,1⟩
⟨to,2⟩
MiniMapReduce
- 为了理解map-reduce编程模型是如何运作的,我们在python里实现一个自己的map-reduce框架
- 但这并不是hadoop或者spark的实际实现方式
########################################################## # # MiniMapReduce # # A non-parallel, non-scalable Map-Reduce implementation ########################################################## def groupByKey(data): result = dict() for key, value in data: if key in result: result[key].append(value) else: result[key] = [value] return result def reduceByKey(f, data): key_values = groupByKey(data) return map(lambda key: (key, reduce(f, key_values[key])), key_values)
Word-count using MiniMapReduce
data = map(lambda x: (x, 1), "to be or not to be".split()) data [('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)] groupByKey(data) {'be': [1, 1], 'not': [1], 'or': [1], 'to': [1, 1]} reduceByKey(lambda x, y: x + y, data) [('not', 1), ('to', 2), ('or', 1), ('be', 2)]
Parallelising MiniMapReduce
我们可以轻松的把刚才的map-reduce实现改成并行框架,利用刚才的my_map_mutithreaded函数就可以。
def reduceByKey_multithreaded(f, data): key_values = groupByKey(data) return my_map_multithreaded( lambda key: (key, reduce(f, key_values[key])), key_values.keys()) reduceByKey_multithreaded(lambda x, y: x + y, data) Scheduling jobs.. Starting jobs.. Processing data: not ... Finished job # 0 Result was ('not', 1) Processing data: to ... Finished job # 1 Result was ('to', 2) Processing data: or ... Finished job # 2 Result was ('or', 1) Processing data: be ... Finished job # 3 Result was ('be', 2) Waiting for jobs to finish.. All done. [('not', 1), ('to', 2), ('or', 1), ('be', 2)]
Parallelising the reduce step
- 假设我们的算子事累加和结合的,我们也可以并行reduce操作
- 把数据大概分成相等的子集
- 在单独的计算核心上独立reduce每一个子集
- 最后把各个结果组合起来
Partitioning the data¶
def split_data(data, split_points): partitions = [] n = 0 for i in split_points: partitions.append(data[n:i]) n = i partitions.append(data[n:]) return partitions data = ['a', 'b', 'c', 'd', 'e', 'f', 'g'] partitioned_data = split_data(data, [3]) partitioned_data
Reducing across partitions in parallel
from threading import Thread def parallel_reduce(f, partitions): n = len(partitions) results = [None] * n threads = [None] * n def job(i): results[i] = reduce(f, partitions[i]) for i in range(n): threads[i] = Thread(target = lambda: job(i)) threads[i].start() for i in range(n): threads[i].join() return reduce(f, results) parallel_reduce(lambda x, y: x + y, partitioned_data)
Apache Spark and Map-Reduce
- 我们可以用高层函数map一个RDDs到一个新的RDDs.
- 每个RDD的实例都至少有两个相应的MapRuduce工作流:map , reduceByKey
- 这些方法和我们之间定义的标准python collections工作原理是相同的
- 在Apache Spark API中还有额外的RDD方法
Word-count in Apache Spark
words = "to be or not to be".split()
The SparkContext class
- 当我们用Spark的时候我们需要初始化一个SparkContext.
- paralleled 方法在SparkContext中可以把任何python collection转成RDD
- 通常情况下我们通过一个大文件或者HBase表中创建RDD
words_rdd = sc.parallelize(words) words_rdd ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423
Mapping an RDD
现在当我们在my_rdd上面执行map或者reduceByKey操作的时候可以在集群上建立一个并行计算的任务。
word_tuples_rdd = words_rdd.map(lambda x: (x, 1)) word_tuples_rdd PythonRDD[1] at RDD at PythonRDD.scala:43
- 注意我们现在还没有产生结果
- 计算操作在我们请求最终结果被collect之间是不会执行的
- 通过collect()方法来激活这个计算
word_tuples_rdd.collect() [('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]
Reducing an RDD
- 但是,我们需要额外处理
word_counts_rdd = word_tuples_rdd.reduceByKey(lambda x, y: x + y) word_counts_rdd PythonRDD[6] at RDD at PythonRDD.scala:43
- 现在来请求最终的结果
word_counts = word_counts_rdd.collect() word_counts [('not', 1), ('to', 2), ('or', 1), ('be', 2)]
Lazy evaluation
- 只有当我们进行collect()的时候集群才会进行计算
- collect() 会同时激活map和reduceByKey操作
- 如果结果collection非常大,那么这个操作开销是很大的
The head of an RDD
- take方法和collect类似,但是只返回前n个元素。
- take在测试的时候非常有用
word_counts_rdd.take(2) [('not', 1), ('to', 2)]
The complete word-count example
text = "to be or not to be".split() rdd = sc.parallelize(text) counts = rdd.map(lambda word: (word, 1)) / .reduceByKey(lambda x, y: x + y) counts.collect() [('not', 1), ('to', 2), ('or', 1), ('be', 2)]
Additional RDD transformations
spark提供了很多额外的collections上的操作
- Sorting:
sortByKey,
sortBy,
takeOrdered
- Mapping:
flatMap
- Filtering:
filter
- Counting:
count
- Set-theoretic:
intersection,
union
Creating an RDD from a text file
- 上面的例子例,我们从collection对象中创建了一个RDD
- 这不是典型的处理大数据的方法
- 更常用的方式直接从HDFS文件或者HBase表中创建RDD
- 下面的例子将会从一个纯ext4文件系统中创建RDD
- 每一个RDD对应了文本中的一行
genome = sc.textFile('/tmp/genome.txt')
Genome example
- 我们将会中这个RDD进行计算,并且根据词频来进行排序
- 首先我们定义一个函数,把序列切成指定大小
def group_characters(line, n=5): result = '' i = 0 for ch in line: result = result + ch i = i + 1 if (i % n) == 0: yield result result = '' def group_and_split(line): return [sequence for sequence in group_characters(line)] group_and_split('abcdefghijklmno') ['abcde', 'fghij', 'klmno']
- 现在我们要把原始的RDD转换成键值对的形式,key是这个序列,value是1
- 注意如果我们简单的把每一行进行map,会得到一个高维的数据
genome.map(group_and_split).take(2) [[u'CAGGG', u'GCACA', u'GTCTC', u'GGCTC', u'ACTTC', u'GACCT', u'CTGCC', u'TCCCC', u'AGTTC', u'AAGTG', u'ATTCT', u'CCTGC', u'CTCAG', u'TCTCC'], [u'TGAGT', u'AGCTG', u'GGATG', u'ACAGG', u'AGTGG', u'AGCAT', u'GCCTA', u'GCTAA', u'TCTTT', u'GTATT', u'TCTAG', u'TAGAG', u'ATGCG', u'GTTTT']]
Flattening an RDD using flatMap
- 我们需要把数据转成序列形式的,所以使用flatMap方法
sequences = genome.flatMap(group_and_split) sequences.take(3) [u'CAGGG', u'GCACA', u'GTCTC']
counts = / sequences.map( lambda w: (w, 1)).reduceByKey(lambda x, y: x + y) counts.take(10) [(u'TGTCA', 1), (u'GCCCA', 3), (u'CCAAG', 5), (u'GCCCC', 4), (u'CATGT', 1), (u'AGATT', 1), (u'TGTTT', 1), (u'CCTAT', 4), (u'TCAGT', 1), (u'CAGCG', 2)]
- 我们根据计数队序列进行排序
- 因此key(第一个元素)应该是计数值
- 我们需要颠倒一下tuples的顺序
def reverse_tuple(key_value_pair): return (key_value_pair[1], key_value_pair[0]) sequences = counts.map(reverse_tuple) sequences.take(10) [(1, u'TGTCA'), (3, u'GCCCA'), (5, u'CCAAG'), (4, u'GCCCC'), (1, u'CATGT'), (1, u'AGATT'), (1, u'TGTTT'), (4, u'CCTAT'), (1, u'TCAGT'), (2, u'CAGCG')]
Sorting an RDD
- 现在我们可以降序的方法对key进行排序
sequences_sorted = sequences.sortByKey(False) top_ten_sequences = sequences_sorted.take(10) top_ten_sequences [(15, u'AAAAA'), (9, u'GCAGG'), (8, u'ACAAA'), (7, u'GGCCA'), (7, u'AATTA'), (7, u'AGGTT'), (7, u'AGGGA'), (7, u'CCAGG'), (7, u'GAGCC'), (7, u'AAAAC')]
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/9067.html