这期内容当中小编将会给大家带来有关Spark2.x中共享变量的累加器是怎样的,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
为什么要定义累加器?
在 Spark 应用程序中,我们经常会有这样的需求,如要需要统计符合某种特性数据的总数,这种需求都需要用到计数器。如果一个变量不被声明为一个累加器,那么它将在被改变时不会在 driver 端进行全局汇总,即在分布式运行时每个 task 运行的只是原始变量的 一个副本,并不能改变原始变量的值,但是当这个变量被声明为累加器后,该变量就会有分布式计数的功能。
定义了一个累加器sum,而不是普通变量,实例实例代码如下:
package com.hadoop.ljs.spark220.studyimport org.apache.spark.{SparkConf, SparkContext}/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-02-20 19:36 * @version: v1.0 * @description: com.hadoop.ljs.spark220.study */object AccumlatorTest { def main(args: Array[String]): Unit = { val sparkConf=new SparkConf().setMaster("local[*]").setAppName("AccumlatorTest") val sc=new SparkContext(sparkConf) /*定义一个共享变量:累加器*/ val sum=sc.accumulator(0) /*输入数据*/ val rdd1=sc.parallelize(List(1,2,3,4,5)) /*求和 ,然后各个元素加1*/ val rdd2=rdd1.map(x=>{ sum+=x x }) /*这里是个action操作 没有这个操作,程序不会执行*/ rdd2.collect() println("求和:"+sum) sc.stop() }}
运行结果如下,sum=15,符合我们的期望值:
结合上面的代码说一下累加器的执行过程:
1).Accumulator需要在Driver进行定义和并初始化,并进行注册,同时Accumulator首先需要在Driver进行序列化,然后发送到Executor端;另外,Driver接收到Task任务完成的状态更新后,会去更新Value的值,然后在Action操作执行后就可以获取到Accumulator的值了。
2).Executor接收到Task之后会进行反序列化操作,反序列化得到RDD和function,同时在反序列化的同时也去反序列化Accumulator,同时也会向TaskContext完成注册,完成任务计算之后,随着Task结果一起返回给Driver端进行处理。
这里有执行过程图可以参考下:
累加器特性:
1.累加器也是也具有懒加载属性,只有在action操作执行时,才会强制触发计算求值;
2.累加器的值只可以在Driver端定义初始化,在Executor端更新,不能在Executor端进行定义初始化,不能在Executor端通过[.value]获取值,任何工作节点上的Task都不能访问累加器的值;
3.闭包里的执行器代码可以使用累加器的 += 方法(在Java中是 add )增加累加器的值。
特别提醒:
累加器在Driver端定义赋初始值,累加器只能在Driver端读取最后的值,在Excutor端更新。
上述就是小编为大家分享的Spark2.x中共享变量的累加器是怎样的了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注亿速云行业资讯频道。
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/opensource/223205.html