Spark闭包中driver及executor程序代码是怎样执行的,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
闭包的作用可以理解为:函数可以访问函数外部定义的变量,但是函数内部对该变量进行的修改,在函数外是不可见的,即对函数外源变量不会产生影响。
其实,在学习Spark时,一个比较难理解的点就是,在集群模式下,定义的变量和方法作用域的范围和生命周期。这在你操作RDD时,比如调用一些函数map、foreach时,访问其外部变量进行操作时,很容易产生疑惑。为什么我本地程序运行良好且结果正确,放到集群上却得不到想要的结果呢?
任务,会将RDD的操作分解为多个task,并且这些task是由executor执行的。
在执行之前,Spark会计算task的闭包即定义的一些变量和方法,比如例子中的counter变量和foreach方法,并且闭包必须对executor而言是可见的,这些闭包会被序列化发送到每个executor。
模式下,driver和executor运行在不同的JVM进程中,发送给每个executor的闭包中的变量是driver端变量的副本。
因此,当foreach函数内引用counter时,其实处理的只是driver端变量的副本,与driver端本身的counter无关。
driver节点的内存中仍有一个计数器,但该变量对executor是不可见的!
executor只能看到序列化闭包的副本。
因此,上述例子输出的counter最终值仍然为零,因为counter上的所有操作都只是引用了序列化闭包内的值。
但是在生产中,我们的任务都是在集群模式下运行,如何能满足这种业务场景呢?
这就必须引出一个后续要重点讲解的概念:Accumulator即累加器。Spark中的累加器专门用于提供一种机制,用于在集群中的各个worker节点之间执行时安全地更新变量。
说,closures – constructs比如循环或本地定义的方法,就不应该被用来改变一些全局状态,Spark并没有定义或保证对从闭包外引用的对象进行更新的行为。
如果你这样操作只会导致一些代码在本地模式下能够达到预期的效果,但是在分布式环境下却事与愿违。
如果需要某些全局聚合,请改用累加器。
对于其他的业务场景,我们适时考虑引入外部存储系统、广播变量等。
首先,对RDD相关的操作需要传入闭包函数,如果这个函数需要访问外部定义的变量,就需要满足一定条件(比如必须可被序列化),否则会抛出运行时异常。闭包函数在最终传入到executor执行,需要经历以下步骤:
1.driver通过反射,运行时找到闭包访问的变量,并封装成一个对象,然后序列化该对象
2.将序列化后的对象通过网络传输到worker节点
3.worker节点反序列化闭包对象
4.worker节点的executor执行闭包函数
driver是运行用户编写Application 的main()函数的地方,具体负责DAG的构建、任务的划分、task的生成与调度等。job,stage,task生成都离不开rdd自身,rdd的相关的操作不能缺少driver端的sparksession/sparkcontext。
最后做个总结:所有对RDD具体数据的操作都是在executor上执行的,所有对rdd自身的操作都是在driver上执行的。比如foreach、foreachPartition都是针对rdd内部数据进行处理的,所以我们传递给这些算子的函数都是执行于executor端的。但是像foreachRDD、transform则是对RDD本身进行一列操作,所以它的参数函数是执行在driver端的,那么它内部是可以使用外部变量,比如在SparkStreaming程序中操作offset、动态更新广播变量等。
关于Spark闭包中driver及executor程序代码是怎样执行的问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注亿速云行业资讯频道了解更多相关知识。
原创文章,作者:kirin,如若转载,请注明出处:https://blog.ytso.com/223290.html