Overview
- 回顾python中的函数式编程
- python中的map和reduce函数
- 用map写并行代码
- Map-Reduce编程模型
- 用python写spark程序
Reading
-
Introduction to Parallel Computing, Blaise Barney, Lawrence Livermore National Laboratory.
-
Dean, J., & Ghemawat, S. (2008). MapReduce: Simplified Data Processing on Large Clusters. Communications of the ACM, 51(1), 107–113.
-
Chapters 1 and 3 of Karau, H., Wendell, P., & Zaharia, M. (2015). Learning Spark: Lightning-Fast Big Data Analysis. O’Reilly.
Functional programming
考虑以下代码:
def double_everything_in(data): result = [] for i in data: result.append(2 * i) return result def quadruple_everything_in(data): result = [] for i in data: result.append(4 * i) return result double_everything_in([1, 2, 3, 4, 5]) [2, 4, 6, 8, 10] quadruple_everything_in([1, 2, 3, 4, 5]) [4, 8, 12, 16, 20]
- 上述代码没有很好的践行软件工程中“不要重复自己”的原则。
- 应该如何避免重复呢?
def multiply_by_x_everything_in(x, data): result = [] for i in data: result.append(x * i) return result multiply_by_x_everything_in(2, [1, 2, 3, 4, 5]) [2, 4, 6, 8, 10] multiply_by_x_everything_in(4, [1, 2, 3, 4, 5]) [4, 8, 12, 16, 20]
- 再考虑下面的代码
def squared(x): return x*x def double(x): return x*2 def square_everything_in(data): result = [] for i in data: result.append(squared(i)) return result def double_everything_in(data): result = [] for i in data: result.append(double(i)) return result square_everything_in([1, 2, 3, 4, 5]) [1, 4, 9, 16, 25] double_everything_in([1, 2, 3, 4, 5]) [2, 4, 6, 8, 10]
- 应该如何避免重复呢
把函数作为值
def apply_f_to_everything_in(f, data): result = [] for x in data: result.append(f(x)) return result apply_f_to_everything_in(squared, [1, 2, 3, 4, 5]) [1, 4, 9, 16, 25] apply_f_to_everything_in(double, [1, 2, 3, 4, 5]) [2, 4, 6, 8, 10]
- Lambda表达式:每次想用map的时候又不得不定义一个函数的时候可以用匿名函数。
apply_f_to_everything_in(lambda x: x*x, [1, 2, 3, 4, 5])
Python’s map function
python中有一个内置的mpo函数比我们自己写的快很多。
map(lambda x: x*x, [1, 2, 3, 4, 5])
[1, 4, 9, 16, 25]
Implementing reduce
- reduce函数有一个fold的例子
- 有好几种实现fold的方法
- 下面的方式叫做left fold
def foldl(f, data, z):
if (len(data) == 0):
print z
return z
else:
head = data[0]
tail = data[1:]
print "Folding", head, "with", tail, "using", z
partial_result = f(z, data[0])
print "Partial result is", partial_result
return foldl(f, tail, partial_result)
def add(x, y):
return x + y
foldl(add, [1, 2, 3, 4, 5], 0)
Folding 1 with [2, 3, 4, 5] using 0
Partial result is 1
Folding 2 with [3, 4, 5] using 1
Partial result is 3
Folding 3 with [4, 5] using 3
Partial result is 6
Folding 4 with [5] using 6
Partial result is 10
Folding 5 with [] using 10
Partial result is 15
15
- 用lambda表达式也一样
foldl(lambda x, y: x + y, [1, 2, 3, 4, 5], 0)
Folding 1 with [2, 3, 4, 5] using 0
Partial result is 1
Folding 2 with [3, 4, 5] using 1
Partial result is 3
Folding 3 with [4, 5] using 3
Partial result is 6
Folding 4 with [5] using 6
Partial result is 10
Folding 5 with [] using 10
Partial result is 15
15
Python's reduce function.
python内部的reduce函数是left fold
reduce(lambda x, y: x + y, [1, 2, 3, 4, 5])
15
reduce(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)
-15
Functional programming and parallelism
- 函数式编程用在并行编程里
- map函数可以通过数据级的并行化轻松实现并行计算
- 把函数作为参数传递进去可以避免一些副作用
def perform_computation(f, result, data, i): print "Computing the ", i, "th result..." # This could be scheduled on a different CPU result[i] = f(data[i]) def my_map(f, data): result = [None] * len(data) for i in range(len(data)): perform_computation(f, result, data, i) # Wait for other CPUs to finish, and then.. return result my_map(lambda x: x * x, [1, 2, 3, 4, 5]) Computing the 0 th result... Computing the 1 th result... Computing the 2 th result... Computing the 3 th result... Computing the 4 th result... [1, 4, 9, 16, 25]
A multi-threaded map function
A multi-threaded map function
from threading import Thread def schedule_computation_threaded(f, result, data, threads, i): # Each function evaluation is scheduled on a different core. def my_job(): print "Processing data:", data[i], "... " result[i] = f(data[i]) print "Finished job #", i print "Result was", result[i] threads[i] = Thread(target=my_job) def my_map_multithreaded(f, data): n = len(data) result = [None] * n threads = [None] * n print "Scheduling jobs.. " for i in range(n): schedule_computation_threaded(f, result, data, threads, i) print "Starting jobs.. " for i in range(n): threads[i].start() print "Waiting for jobs to finish.. " for i in range(n): threads[i].join() print "All done." return result my_map_multithreaded(lambda x: x*x, [1, 2, 3, 4, 5]) Scheduling jobs.. Starting jobs.. Processing data: 1 ... Finished job # 0 Result was 1 Processing data: 2 ... Finished job # 1 Result was 4 Processing data: 3 ... Finished job # 2 Result was 9 Processing data: 4 ... Finished job # 3 Result was 16 Processing data: 5 ... Finished job # 4 Result was 25 Waiting for jobs to finish.. All done. [1, 4, 9, 16, 25]
from numpy.random import uniform
from time import sleep
def a_function_which_takes_a_long_time(x):
sleep(uniform(2, 10)) # Simulate some long computation
return x*x
my_map_multithreaded(a_function_which_takes_a_long_time, [1, 2, 3, 4, 5])
Scheduling jobs..
Starting jobs..
Processing data: 1 ...
Processing data: 2 ...
Processing data: 3 ...
Processing data: 4 ...
Processing data: 5 ...
Waiting for jobs to finish..
Finished job # 4
Result was 25
Finished job # 0
Result was 1
Finished job # 3
Result was 16
Finished job # 2
Result was 9
Finished job # 1
Result was 4
All done.
Out[31]:
[1, 4, 9, 16, 25]
Map Reduce
- map reduce是一种大规模并行处理的编程模型
- 大规模意味着它可以借助大量的计算集群来处理大数据
- 有很多实现:hadoop和spark
- 我们可以用任何语言实现map-reduce:hadoop里用java,spark用scala,但也有python接口
- python或者scala非常适合map-reduce模型,但我们不必函数式编程
- mapreduce的实现中关注了底层的功能操作,我们不必担心。
Typical steps in a Map Reduce Computation
- ETL一个数据集
- Map操作:每一行提取你关心的信息
- "Shuffle and Sort":task/node allocation
- Reduce操作:aggregate、summaries、filter or transform
- 保存结果
Callbacks for Map Reduce
- 数据集和每一步的计算状态,都以键值对的形式表现
- map(k,v)→⟨k′,v′⟩∗
- reduce(k′,⟨k′,v′⟩∗)→⟨k′,v″⟩∗
- *指的是值的collection
- colletions并不是有序的
Resilient Distributed Data
- 在map-reduce计算中这些collections被称作RDDs:
- 数据在分布在节点之间
- 单个节点的损坏不会导致数据丢失
- 数据一般存在HBase或者HDFS中
- map和reduce函数可以不同的keys、elements实现并行化
Word Count Example
- 在这个例子里,输入是一系列URL,每一个记录篇一篇文档
- 问题:在数据集中每个单词出现了多少次
Word Count: Map
- 输入数据进行map:
- Key: URL
- Value: 文档内容
⟨document1,tobeornottobe⟩
- 我们需要用map处理给定的URL
- Key: word
- Value: 1
- 我们原始的数据集会被转化成:
⟨to,1⟩
⟨be,1⟩
⟨or,1⟩
⟨not,1⟩
⟨to,1⟩
⟨be,1⟩
Word Count: Reduce
- reduce操作按照key来对values进行分组,然后执行在每个key上执行reduce。
- mapreduce会折叠计算数算来最小化数据复制的操作。
- 不同分区的数据单独进行reduce
- 算子的选择是很重要的,需要累加和结合。
- 在这个例子是函数是+运算符
⟨be,2⟩
⟨not,1⟩
⟨or,1⟩
⟨to,2⟩
MiniMapReduce
- 为了理解map-reduce编程模型是如何运作的,我们在python里实现一个自己的map-reduce框架
- 但这并不是hadoop或者spark的实际实现方式
##########################################################
#
# MiniMapReduce
#
# A non-parallel, non-scalable Map-Reduce implementation
##########################################################
def groupByKey(data):
result = dict()
for key, value in data:
if key in result:
result[key].append(value)
else:
result[key] = [value]
return result
def reduceByKey(f, data):
key_values = groupByKey(data)
return map(lambda key:
(key, reduce(f, key_values[key])),
key_values)
Word-count using MiniMapReduce
data = map(lambda x: (x, 1), "to be or not to be".split())
data
[('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]
groupByKey(data)
{'be': [1, 1], 'not': [1], 'or': [1], 'to': [1, 1]}
reduceByKey(lambda x, y: x + y, data)
[('not', 1), ('to', 2), ('or', 1), ('be', 2)]
Parallelising MiniMapReduce
我们可以轻松的把刚才的map-reduce实现改成并行框架,利用刚才的my_map_mutithreaded函数就可以。
def reduceByKey_multithreaded(f, data):
key_values = groupByKey(data)
return my_map_multithreaded(
lambda key: (key, reduce(f, key_values[key])), key_values.keys())
reduceByKey_multithreaded(lambda x, y: x + y, data)
Scheduling jobs..
Starting jobs..
Processing data: not ...
Finished job # 0
Result was ('not', 1)
Processing data: to ...
Finished job # 1
Result was ('to', 2)
Processing data: or ...
Finished job # 2
Result was ('or', 1)
Processing data: be ...
Finished job # 3
Result was ('be', 2)
Waiting for jobs to finish..
All done.
[('not', 1), ('to', 2), ('or', 1), ('be', 2)]
Parallelising the reduce step
- 假设我们的算子事累加和结合的,我们也可以并行reduce操作
- 把数据大概分成相等的子集
- 在单独的计算核心上独立reduce每一个子集
- 最后把各个结果组合起来
Partitioning the data¶
def split_data(data, split_points):
partitions = []
n = 0
for i in split_points:
partitions.append(data[n:i])
n = i
partitions.append(data[n:])
return partitions
data = ['a', 'b', 'c', 'd', 'e', 'f', 'g']
partitioned_data = split_data(data, [3])
partitioned_data
Reducing across partitions in parallel
from threading import Thread
def parallel_reduce(f, partitions):
n = len(partitions)
results = [None] * n
threads = [None] * n
def job(i):
results[i] = reduce(f, partitions[i])
for i in range(n):
threads[i] = Thread(target = lambda: job(i))
threads[i].start()
for i in range(n):
threads[i].join()
return reduce(f, results)
parallel_reduce(lambda x, y: x + y, partitioned_data)
Apache Spark and Map-Reduce
- 我们可以用高层函数map一个RDDs到一个新的RDDs.
- 每个RDD的实例都至少有两个相应的MapRuduce工作流:map , reduceByKey
- 这些方法和我们之间定义的标准python collections工作原理是相同的
- 在Apache Spark API中还有额外的RDD方法
Word-count in Apache Spark
words = "to be or not to be".split()
The SparkContext class
- 当我们用Spark的时候我们需要初始化一个SparkContext.
- paralleled 方法在SparkContext中可以把任何python collection转成RDD
- 通常情况下我们通过一个大文件或者HBase表中创建RDD
words_rdd = sc.parallelize(words)
words_rdd
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423
Mapping an RDD
现在当我们在my_rdd上面执行map或者reduceByKey操作的时候可以在集群上建立一个并行计算的任务。
word_tuples_rdd = words_rdd.map(lambda x: (x, 1))
word_tuples_rdd
PythonRDD[1] at RDD at PythonRDD.scala:43
- 注意我们现在还没有产生结果
- 计算操作在我们请求最终结果被collect之间是不会执行的
- 通过collect()方法来激活这个计算
word_tuples_rdd.collect()
[('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]
Reducing an RDD
- 但是,我们需要额外处理
word_counts_rdd = word_tuples_rdd.reduceByKey(lambda x, y: x + y)
word_counts_rdd
PythonRDD[6] at RDD at PythonRDD.scala:43
- 现在来请求最终的结果
word_counts = word_counts_rdd.collect()
word_counts
[('not', 1), ('to', 2), ('or', 1), ('be', 2)]
Lazy evaluation
- 只有当我们进行collect()的时候集群才会进行计算
- collect() 会同时激活map和reduceByKey操作
- 如果结果collection非常大,那么这个操作开销是很大的
The head of an RDD
- take方法和collect类似,但是只返回前n个元素。
- take在测试的时候非常有用
word_counts_rdd.take(2)
[('not', 1), ('to', 2)]
The complete word-count example
text = "to be or not to be".split()
rdd = sc.parallelize(text)
counts = rdd.map(lambda word: (word, 1)) /
.reduceByKey(lambda x, y: x + y)
counts.collect()
[('not', 1), ('to', 2), ('or', 1), ('be', 2)]
Additional RDD transformations
spark提供了很多额外的collections上的操作
- Sorting:
sortByKey, sortBy, takeOrdered
- Mapping:
flatMap
- Filtering:
filter
- Counting:
count
- Set-theoretic:
intersection, union
Creating an RDD from a text file
- 上面的例子例,我们从collection对象中创建了一个RDD
- 这不是典型的处理大数据的方法
- 更常用的方式直接从HDFS文件或者HBase表中创建RDD
- 下面的例子将会从一个纯ext4文件系统中创建RDD
- 每一个RDD对应了文本中的一行
genome = sc.textFile('/tmp/genome.txt')
Genome example
- 我们将会中这个RDD进行计算,并且根据词频来进行排序
- 首先我们定义一个函数,把序列切成指定大小
def group_characters(line, n=5):
result = ''
i = 0
for ch in line:
result = result + ch
i = i + 1
if (i % n) == 0:
yield result
result = ''
def group_and_split(line):
return [sequence for sequence in group_characters(line)]
group_and_split('abcdefghijklmno')
['abcde', 'fghij', 'klmno']
- 现在我们要把原始的RDD转换成键值对的形式,key是这个序列,value是1
- 注意如果我们简单的把每一行进行map,会得到一个高维的数据
genome.map(group_and_split).take(2)
[[u'CAGGG',
u'GCACA',
u'GTCTC',
u'GGCTC',
u'ACTTC',
u'GACCT',
u'CTGCC',
u'TCCCC',
u'AGTTC',
u'AAGTG',
u'ATTCT',
u'CCTGC',
u'CTCAG',
u'TCTCC'],
[u'TGAGT',
u'AGCTG',
u'GGATG',
u'ACAGG',
u'AGTGG',
u'AGCAT',
u'GCCTA',
u'GCTAA',
u'TCTTT',
u'GTATT',
u'TCTAG',
u'TAGAG',
u'ATGCG',
u'GTTTT']]
Flattening an RDD using flatMap
- 我们需要把数据转成序列形式的,所以使用flatMap方法
sequences = genome.flatMap(group_and_split)
sequences.take(3)
[u'CAGGG', u'GCACA', u'GTCTC']
counts = /
sequences.map(
lambda w: (w, 1)).reduceByKey(lambda x, y: x + y)
counts.take(10)
[(u'TGTCA', 1),
(u'GCCCA', 3),
(u'CCAAG', 5),
(u'GCCCC', 4),
(u'CATGT', 1),
(u'AGATT', 1),
(u'TGTTT', 1),
(u'CCTAT', 4),
(u'TCAGT', 1),
(u'CAGCG', 2)]
- 我们根据计数队序列进行排序
- 因此key(第一个元素)应该是计数值
- 我们需要颠倒一下tuples的顺序
def reverse_tuple(key_value_pair):
return (key_value_pair[1], key_value_pair[0])
sequences = counts.map(reverse_tuple)
sequences.take(10)
[(1, u'TGTCA'),
(3, u'GCCCA'),
(5, u'CCAAG'),
(4, u'GCCCC'),
(1, u'CATGT'),
(1, u'AGATT'),
(1, u'TGTTT'),
(4, u'CCTAT'),
(1, u'TCAGT'),
(2, u'CAGCG')]
Sorting an RDD
- 现在我们可以降序的方法对key进行排序
sequences_sorted = sequences.sortByKey(False)
top_ten_sequences = sequences_sorted.take(10)
top_ten_sequences
[(15, u'AAAAA'),
(9, u'GCAGG'),
(8, u'ACAAA'),
(7, u'GGCCA'),
(7, u'AATTA'),
(7, u'AGGTT'),
(7, u'AGGGA'),
(7, u'CCAGG'),
(7, u'GAGCC'),
(7, u'AAAAC')]
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/bigdata/9067.html