Map-Reduce和Spark详解大数据

Overview

  1. 回顾python中的函数式编程
  2. python中的map和reduce函数
  3. 用map写并行代码
  4. Map-Reduce编程模型
  5. 用python写spark程序

Reading

  • Introduction to Parallel Computing, Blaise Barney, Lawrence Livermore National Laboratory.

  • Dean, J., & Ghemawat, S. (2008). MapReduce: Simplified Data Processing on Large Clusters. Communications of the ACM, 51(1), 107–113.

  • Spark Programming Guide

  • Chapters 1 and 3 of Karau, H., Wendell, P., & Zaharia, M. (2015). Learning Spark: Lightning-Fast Big Data Analysis. O’Reilly.

Functional programming

考虑以下代码:
def double_everything_in(data):     result = []     for i in data:         result.append(2 * i)     return result  def quadruple_everything_in(data):     result = []     for i in data:         result.append(4 * i)     return result  double_everything_in([1, 2, 3, 4, 5])  [2, 4, 6, 8, 10]  quadruple_everything_in([1, 2, 3, 4, 5])  [4, 8, 12, 16, 20]
  • 上述代码没有很好的践行软件工程中“不要重复自己”的原则。
  • 应该如何避免重复呢?
def multiply_by_x_everything_in(x, data):     result = []     for i in data:         result.append(x * i)     return result  multiply_by_x_everything_in(2, [1, 2, 3, 4, 5]) [2, 4, 6, 8, 10] multiply_by_x_everything_in(4, [1, 2, 3, 4, 5]) [4, 8, 12, 16, 20]
  • 再考虑下面的代码
def squared(x):     return x*x  def double(x):     return x*2  def square_everything_in(data):     result = []     for i in data:         result.append(squared(i))     return result  def double_everything_in(data):     result = []     for i in data:         result.append(double(i))     return result  square_everything_in([1, 2, 3, 4, 5]) [1, 4, 9, 16, 25] double_everything_in([1, 2, 3, 4, 5]) [2, 4, 6, 8, 10]
  • 应该如何避免重复呢

把函数作为值

def apply_f_to_everything_in(f, data): 
    result = [] 
    for x in data: 
        result.append(f(x)) 
    return result 
apply_f_to_everything_in(squared, [1, 2, 3, 4, 5]) 
[1, 4, 9, 16, 25] 
apply_f_to_everything_in(double, [1, 2, 3, 4, 5]) 
[2, 4, 6, 8, 10]
  • Lambda表达式:每次想用map的时候又不得不定义一个函数的时候可以用匿名函数。
apply_f_to_everything_in(lambda x: x*x, [1, 2, 3, 4, 5])


Python’s map function

python中有一个内置的mpo函数比我们自己写的快很多。
map(lambda x: x*x, [1, 2, 3, 4, 5]) 
[1, 4, 9, 16, 25]

Implementing reduce


  • reduce函数有一个fold的例子
  • 有好几种实现fold的方法
  • 下面的方式叫做left fold
def foldl(f, data, z): 
    if (len(data) == 0): 
        print z 
        return z 
    else: 
        head = data[0] 
        tail = data[1:] 
        print "Folding", head, "with", tail, "using", z 
        partial_result = f(z, data[0]) 
        print "Partial result is", partial_result 
        return foldl(f, tail, partial_result)  
 
def add(x, y): 
    return x + y 
 
foldl(add, [1, 2, 3, 4, 5], 0) 
 
 
Folding 1 with [2, 3, 4, 5] using 0 
Partial result is 1 
Folding 2 with [3, 4, 5] using 1 
Partial result is 3 
Folding 3 with [4, 5] using 3 
Partial result is 6 
Folding 4 with [5] using 6 
Partial result is 10 
Folding 5 with [] using 10 
Partial result is 15 
15
  • 用lambda表达式也一样
foldl(lambda x, y: x + y, [1, 2, 3, 4, 5], 0) 
 
Folding 1 with [2, 3, 4, 5] using 0 
Partial result is 1 
Folding 2 with [3, 4, 5] using 1 
Partial result is 3 
Folding 3 with [4, 5] using 3 
Partial result is 6 
Folding 4 with [5] using 6 
Partial result is 10 
Folding 5 with [] using 10 
Partial result is 15 
15

Python's reduce function.

python内部的reduce函数是left fold

reduce(lambda x, y: x + y, [1, 2, 3, 4, 5]) 
15 
reduce(lambda x, y: x - y, [1, 2, 3, 4, 5], 0) 
-15 

Functional programming and parallelism

  • 函数式编程用在并行编程里
  • map函数可以通过数据级的并行化轻松实现并行计算
  •     把函数作为参数传递进去可以避免一些副作用
def perform_computation(f, result, data, i):     print "Computing the ", i, "th result..."     # This could be scheduled on a different CPU     result[i] = f(data[i])  def my_map(f, data):     result = [None] * len(data)     for i in range(len(data)):         perform_computation(f, result, data, i)     # Wait for other CPUs to finish, and then..     return result  my_map(lambda x: x * x, [1, 2, 3, 4, 5])  Computing the  0 th result... Computing the  1 th result... Computing the  2 th result... Computing the  3 th result... Computing the  4 th result... [1, 4, 9, 16, 25]

A multi-threaded map function

from threading import Thread  def schedule_computation_threaded(f, result, data, threads, i):         # Each function evaluation is scheduled on a different core.     def my_job():          print "Processing data:", data[i], "... "         result[i] = f(data[i])         print "Finished job #", i             print "Result was", result[i]             threads[i] = Thread(target=my_job)      def my_map_multithreaded(f, data):     n = len(data)     result = [None] * n     threads = [None] * n     print "Scheduling jobs.. "     for i in range(n):         schedule_computation_threaded(f, result, data, threads, i)     print "Starting jobs.. "     for i in range(n):         threads[i].start()     print "Waiting for jobs to finish.. "     for i in range(n):         threads[i].join()     print "All done."     return result  my_map_multithreaded(lambda x: x*x, [1, 2, 3, 4, 5])  Scheduling jobs..  Starting jobs..  Processing data: 1 ...  Finished job # 0 Result was 1 Processing data: 2 ...  Finished job # 1 Result was 4 Processing data: 3 ...  Finished job # 2 Result was 9 Processing data: 4 ...  Finished job # 3 Result was 16 Processing data: 5 ...  Finished job # 4 Result was 25 Waiting for jobs to finish..  All done. [1, 4, 9, 16, 25]

from numpy.random import uniform 
from time import sleep 
 
def a_function_which_takes_a_long_time(x): 
    sleep(uniform(2, 10))  # Simulate some long computation 
    return x*x 
 
my_map_multithreaded(a_function_which_takes_a_long_time, [1, 2, 3, 4, 5]) 
 
Scheduling jobs..  
Starting jobs..  
Processing data: 1 ...  
Processing data: 2 ...  
Processing data: 3 ...  
Processing data: 4 ...  
Processing data: 5 ...  
Waiting for jobs to finish..  
Finished job # 4 
Result was 25 
Finished job # 0 
Result was 1 
Finished job # 3 
Result was 16 
Finished job # 2 
Result was 9 
Finished job # 1 
Result was 4 
All done. 
Out[31]: 
[1, 4, 9, 16, 25]

Map Reduce

  • map reduce是一种大规模并行处理的编程模型
  • 大规模意味着它可以借助大量的计算集群来处理大数据
  • 有很多实现:hadoop和spark
  • 我们可以用任何语言实现map-reduce:hadoop里用java,spark用scala,但也有python接口
  • python或者scala非常适合map-reduce模型,但我们不必函数式编程
  • mapreduce的实现中关注了底层的功能操作,我们不必担心。

Typical steps in a Map Reduce Computation

  1. ETL一个数据集
  2. Map操作:每一行提取你关心的信息
  3. "Shuffle and Sort":task/node allocation
  4. Reduce操作:aggregate、summaries、filter or transform
  5. 保存结果

Callbacks for Map Reduce

  • 数据集和每一步的计算状态,都以键值对的形式表现
  • map(k,v)k,v
  • reduce(k,k,v)k,v
  • *指的是值的collection
  • colletions并不是有序的

Resilient Distributed Data

  • 在map-reduce计算中这些collections被称作RDDs:
    • 数据在分布在节点之间
    • 单个节点的损坏不会导致数据丢失
    • 数据一般存在HBase或者HDFS中
  • map和reduce函数可以不同的keys、elements实现并行化

Word Count Example

  • 在这个例子里,输入是一系列URL,每一个记录篇一篇文档
  • 问题:在数据集中每个单词出现了多少次

Word Count: Map

  •  输入数据进行map:
    • Key: URL
    • Value: 文档内容

document1,tobeornottobe

  • 我们需要用map处理给定的URL 
    • Key: word
    • Value: 1
  • 我们原始的数据集会被转化成:
to,1
 
be,1
 
or,1
 
not,1
 
to,1
 
be,1

Word Count: Reduce

  • reduce操作按照key来对values进行分组,然后执行在每个key上执行reduce。
  • mapreduce会折叠计算数算来最小化数据复制的操作。
  • 不同分区的数据单独进行reduce
  • 算子的选择是很重要的,需要累加和结合。
  • 在这个例子是函数是+运算符
be,2



not,1



or,1



to,2

MiniMapReduce

  • 为了理解map-reduce编程模型是如何运作的,我们在python里实现一个自己的map-reduce框架
  • 但这并不是hadoop或者spark的实际实现方式
########################################################## 
# 
#   MiniMapReduce 
# 
# A non-parallel, non-scalable Map-Reduce implementation 
########################################################## 
 
def groupByKey(data): 
    result = dict() 
    for key, value in data: 
        if key in result: 
            result[key].append(value) 
        else: 
            result[key] = [value] 
    return result 
         
def reduceByKey(f, data): 
    key_values = groupByKey(data) 
    return map(lambda key:  
                   (key, reduce(f, key_values[key])),  
                       key_values)

Word-count using MiniMapReduce

data = map(lambda x: (x, 1), "to be or not to be".split()) 
data 
[('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)] 
groupByKey(data) 
{'be': [1, 1], 'not': [1], 'or': [1], 'to': [1, 1]} 
reduceByKey(lambda x, y: x + y, data) 
[('not', 1), ('to', 2), ('or', 1), ('be', 2)]

Parallelising MiniMapReduce

我们可以轻松的把刚才的map-reduce实现改成并行框架,利用刚才的my_map_mutithreaded函数就可以。

def reduceByKey_multithreaded(f, data): 
    key_values = groupByKey(data) 
    return my_map_multithreaded( 
        lambda key: (key, reduce(f, key_values[key])), key_values.keys()) 
reduceByKey_multithreaded(lambda x, y: x + y, data) 
 
Scheduling jobs..  
Starting jobs..  
Processing data: not ...  
Finished job # 0 
Result was ('not', 1) 
Processing data: to ...  
Finished job # 1 
Result was ('to', 2) 
Processing data: or ...  
Finished job # 2 
Result was ('or', 1) 
Processing data: be ...  
Finished job # 3 
Result was ('be', 2) 
Waiting for jobs to finish..  
All done. 
 
[('not', 1), ('to', 2), ('or', 1), ('be', 2)] 

Parallelising the reduce step

  1. 假设我们的算子事累加和结合的,我们也可以并行reduce操作
  2. 把数据大概分成相等的子集
  3. 在单独的计算核心上独立reduce每一个子集
  4. 最后把各个结果组合起来

Partitioning the data

def split_data(data, split_points): 
    partitions = [] 
    n = 0 
    for i in split_points: 
        partitions.append(data[n:i]) 
        n = i 
    partitions.append(data[n:]) 
    return partitions 
 
data = ['a', 'b', 'c', 'd', 'e', 'f', 'g'] 
partitioned_data = split_data(data, [3]) 
partitioned_data

Reducing across partitions in parallel

from threading import Thread 
 
def parallel_reduce(f, partitions): 
 
    n = len(partitions) 
    results = [None] * n 
    threads = [None] * n 
     
    def job(i): 
        results[i] = reduce(f, partitions[i]) 
 
    for i in range(n): 
        threads[i] = Thread(target = lambda: job(i)) 
        threads[i].start() 
     
    for i in range(n): 
        threads[i].join() 
     
    return reduce(f, results) 
 
parallel_reduce(lambda x, y: x + y, partitioned_data)

Apache Spark and Map-Reduce

  • 我们可以用高层函数map一个RDDs到一个新的RDDs.
  • 每个RDD的实例都至少有两个相应的MapRuduce工作流:map , reduceByKey
  • 这些方法和我们之间定义的标准python collections工作原理是相同的
  • 在Apache Spark API中还有额外的RDD方法

Word-count in Apache Spark

words = "to be or not to be".split() 

The SparkContext class

  • 当我们用Spark的时候我们需要初始化一个SparkContext.
  • paralleled 方法在SparkContext中可以把任何python collection转成RDD
    • 通常情况下我们通过一个大文件或者HBase表中创建RDD
words_rdd = sc.parallelize(words) 
words_rdd 
 
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423

Mapping an RDD

现在当我们在my_rdd上面执行map或者reduceByKey操作的时候可以在集群上建立一个并行计算的任务。

word_tuples_rdd = words_rdd.map(lambda x: (x, 1)) 
word_tuples_rdd 
PythonRDD[1] at RDD at PythonRDD.scala:43
  • 注意我们现在还没有产生结果
  • 计算操作在我们请求最终结果被collect之间是不会执行的
  • 通过collect()方法来激活这个计算
word_tuples_rdd.collect() 
[('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]

Reducing an RDD

  • 但是,我们需要额外处理
word_counts_rdd = word_tuples_rdd.reduceByKey(lambda x, y: x + y) 
word_counts_rdd 
PythonRDD[6] at RDD at PythonRDD.scala:43
  • 现在来请求最终的结果
word_counts = word_counts_rdd.collect() 
word_counts 
[('not', 1), ('to', 2), ('or', 1), ('be', 2)]

Lazy evaluation

  • 只有当我们进行collect()的时候集群才会进行计算
  • collect() 会同时激活map和reduceByKey操作
  • 如果结果collection非常大,那么这个操作开销是很大的

The head of an RDD

  •  take方法和collect类似,但是只返回前n个元素。
  •  take在测试的时候非常有用
word_counts_rdd.take(2) 
[('not', 1), ('to', 2)]

The complete word-count example

text = "to be or not to be".split() 
rdd = sc.parallelize(text) 
counts = rdd.map(lambda word: (word, 1)) / 
             .reduceByKey(lambda x, y: x + y) 
counts.collect() 
[('not', 1), ('to', 2), ('or', 1), ('be', 2)]

Additional RDD transformations

spark提供了很多额外的collections上的操作
  • Sorting: sortByKey, sortBy, takeOrdered
  • Mapping: flatMap
  • Filtering: filter
  • Counting: count
  • Set-theoretic: intersection, union

Creating an RDD from a text file

  • 上面的例子例,我们从collection对象中创建了一个RDD
  • 这不是典型的处理大数据的方法
  • 更常用的方式直接从HDFS文件或者HBase表中创建RDD
  • 下面的例子将会从一个纯ext4文件系统中创建RDD
  • 每一个RDD对应了文本中的一行
genome = sc.textFile('/tmp/genome.txt')

Genome example

  • 我们将会中这个RDD进行计算,并且根据词频来进行排序
  • 首先我们定义一个函数,把序列切成指定大小
def group_characters(line, n=5): 
    result = '' 
    i = 0 
    for ch in line: 
        result = result + ch 
        i = i + 1 
        if (i % n) == 0: 
            yield result 
            result = '' 
 
def group_and_split(line): 
    return [sequence for sequence in group_characters(line)] 
 
group_and_split('abcdefghijklmno') 
['abcde', 'fghij', 'klmno']
  • 现在我们要把原始的RDD转换成键值对的形式,key是这个序列,value是1
  • 注意如果我们简单的把每一行进行map,会得到一个高维的数据
genome.map(group_and_split).take(2) 
[[u'CAGGG', 
  u'GCACA', 
  u'GTCTC', 
  u'GGCTC', 
  u'ACTTC', 
  u'GACCT', 
  u'CTGCC', 
  u'TCCCC', 
  u'AGTTC', 
  u'AAGTG', 
  u'ATTCT', 
  u'CCTGC', 
  u'CTCAG', 
  u'TCTCC'], 
 [u'TGAGT', 
  u'AGCTG', 
  u'GGATG', 
  u'ACAGG', 
  u'AGTGG', 
  u'AGCAT', 
  u'GCCTA', 
  u'GCTAA', 
  u'TCTTT', 
  u'GTATT', 
  u'TCTAG', 
  u'TAGAG', 
  u'ATGCG', 
  u'GTTTT']]

Flattening an RDD using flatMap

  • 我们需要把数据转成序列形式的,所以使用flatMap方法
sequences = genome.flatMap(group_and_split) 
sequences.take(3) 
[u'CAGGG', u'GCACA', u'GTCTC']
counts = / 
    sequences.map( 
        lambda w: (w, 1)).reduceByKey(lambda x, y: x + y) 
counts.take(10) 
[(u'TGTCA', 1), 
 (u'GCCCA', 3), 
 (u'CCAAG', 5), 
 (u'GCCCC', 4), 
 (u'CATGT', 1), 
 (u'AGATT', 1), 
 (u'TGTTT', 1), 
 (u'CCTAT', 4), 
 (u'TCAGT', 1), 
 (u'CAGCG', 2)] 
  • 我们根据计数队序列进行排序
  • 因此key(第一个元素)应该是计数值
  • 我们需要颠倒一下tuples的顺序
def reverse_tuple(key_value_pair): 
    return (key_value_pair[1], key_value_pair[0]) 
 
sequences = counts.map(reverse_tuple) 
sequences.take(10) 
 
[(1, u'TGTCA'), 
 (3, u'GCCCA'), 
 (5, u'CCAAG'), 
 (4, u'GCCCC'), 
 (1, u'CATGT'), 
 (1, u'AGATT'), 
 (1, u'TGTTT'), 
 (4, u'CCTAT'), 
 (1, u'TCAGT'), 
 (2, u'CAGCG')]

Sorting an RDD

  • 现在我们可以降序的方法对key进行排序
sequences_sorted = sequences.sortByKey(False) 
top_ten_sequences = sequences_sorted.take(10) 
top_ten_sequences 
[(15, u'AAAAA'), 
 (9, u'GCAGG'), 
 (8, u'ACAAA'), 
 (7, u'GGCCA'), 
 (7, u'AATTA'), 
 (7, u'AGGTT'), 
 (7, u'AGGGA'), 
 (7, u'CCAGG'), 
 (7, u'GAGCC'), 
 (7, u'AAAAC')]

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/9067.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论