Spark2.x中共享变量的累加器是怎样的

这期内容当中小编将会给大家带来有关Spark2.x中共享变量的累加器是怎样的,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

为什么要定义累加器?

    在 Spark 应用程序中,我们经常会有这样的需求,如要需要统计符合某种特性数据的总数,这种需求都需要用到计数器。如果一个变量不被声明为一个累加器,那么它将在被改变时不会在 driver 端进行全局汇总,即在分布式运行时每个 task 运行的只是原始变量的 一个副本,并不能改变原始变量的值,但是当这个变量被声明为累加器后,该变量就会有分布式计数的功能。

  定义了一个累加器sum,而不是普通变量,实例实例代码如下:

package com.hadoop.ljs.spark220.studyimport org.apache.spark.{SparkConf, SparkContext}/**  * @author: Created By lujisen  * @company ChinaUnicom Software JiNan  * @date: 2020-02-20 19:36  * @version: v1.0  * @description: com.hadoop.ljs.spark220.study  */object AccumlatorTest {  def main(args: Array[String]): Unit = {    val sparkConf=new SparkConf().setMaster("local[*]").setAppName("AccumlatorTest")    val sc=new SparkContext(sparkConf)    /*定义一个共享变量:累加器*/    val sum=sc.accumulator(0)    /*输入数据*/    val rdd1=sc.parallelize(List(1,2,3,4,5))    /*求和 ,然后各个元素加1*/    val rdd2=rdd1.map(x=>{      sum+=x      x    })    /*这里是个action操作 没有这个操作,程序不会执行*/    rdd2.collect()    println("求和:"+sum)    sc.stop()  }}

运行结果如下,sum=15,符合我们的期望值:

Spark2.x中共享变量的累加器是怎样的

结合上面的代码说一下累加器的执行过程:

 1).Accumulator需要在Driver进行定义和并初始化,并进行注册,同时Accumulator首先需要在Driver进行序列化,然后发送到Executor端;另外,Driver接收到Task任务完成的状态更新后,会去更新Value的值,然后在Action操作执行后就可以获取到Accumulator的值了。

  2).Executor接收到Task之后会进行反序列化操作,反序列化得到RDD和function,同时在反序列化的同时也去反序列化Accumulator,同时也会向TaskContext完成注册,完成任务计算之后,随着Task结果一起返回给Driver端进行处理。

    这里有执行过程图可以参考下:

Spark2.x中共享变量的累加器是怎样的

累加器特性:

    1.累加器也是也具有懒加载属性,只有在action操作执行时,才会强制触发计算求值;

    2.累加器的值只可以在Driver端定义初始化,在Executor端更新,不能在Executor端进行定义初始化,不能在Executor端通过[.value]获取值,任何工作节点上的Task都不能访问累加器的值;

    3.闭包里的执行器代码可以使用累加器的 += 方法(在Java中是 add )增加累加器的值。


特别提醒:

    累加器在Driver端定义赋初始值,累加器只能在Driver端读取最后的值,在Excutor端更新。

上述就是小编为大家分享的Spark2.x中共享变量的累加器是怎样的了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注亿速云行业资讯频道。

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/opensource/223205.html

(0)
上一篇 2022年1月6日 21:02
下一篇 2022年1月6日 21:02

相关推荐

发表回复

登录后才能评论