sparkSQL中cache的若干问题

摘要

sparkSQL在使用cache缓存的时候,有时候缓存可能不起作用,可能会发出缓存是假的吧的感慨。现在我们就把这个问题说道说道。
问题

场景描述

当我们通过spark进行统计和处理数据时,发现他是延迟计算的,如果一个应用中出现多个action,而这多个action处理同一个数据源数据时,数据源用时间来过滤数据时,由于有多个action操作,遇到每个action就是一个job,每一个action都会执行数据源获取数据的操作,由于两个action之间的操作存在时间差,这两个action获取的数据有可能不一致。
例如下例
test1表中的数据

1,2018-07-01 10:10:03
2,2018-07-01 11:12:04

代码如下操作
val odsData = spark.sql("""
select
from default.test1
where time < "2018-07-02"
""")
val targetData = odsData.map(fun _)
val targetData.createOrReplaceTempView("data1")
//第一个Action操作
val spark.sql("""
insert overwrite table default.test2
*
from data1
""")

val targetData1 = odsData.map(fun2 _) //引用同一个数据源
targetData1.createOrReplaceTempView("data2")
//第二个action操作
val spark.sql("""
insert table default.test2
*
from data2
""")

如果在运行第二个Action操作前,test1表中又增加了一条记录3,2018-07-01 13:12:04
即执行第一个Action时记录还是两条1和2,而再执行完第一个Action后而又执行第二个Action之前,
增加了一个新的单子:3,2018-07-01 13:12:04
那么在test2表中的数据是怎么样的呢?
第一种情况(因为第二个action是insert而不是insert overwrite)

1,2018-07-01 10:10:03
2,2018-07-01 11:12:04
1,2018-07-01 10:10:03
2,2018-07-01 11:12:04

第二种情况

1,2018-07-01 10:10:03
2,2018-07-01 11:12:04
1,2018-07-01 10:10:03
2,2018-07-01 11:12:04
3,2018-07-01 13:12:04

结果分析

结果是第二中情况。如果认为是第一种情况的对spark的执行计划还是不太熟悉。首先spark是lazy计算的,即不触发action操作,其实不提交作业的。而在这个application中存在两个action,而这两个aciton使用了同一个数据源的rdd,应该称为变量odsData,当遇到第一个action,其会把自己这个执行链上的rdd都执行一遍,包括执行odsData,而遇到第二个aciton的时候,其也会把自己的执行链上的数据又执行了一遍包括odsData,并从数据源中重新取数。有人会疑惑,第一个action在执行的时候,已经执行了odsData,这个RDD的结果不应该缓存起来吗?个人认为,spark还没有那么的智能,并且网上经常说的job,stage,rdd,task的划分应该是在同一个job内进行的。而同一个应用中夸job的stage拆分是不存在的。那么出现这个结果应该怎么办呢?
cache的出场

当出现这样的情况时,我的应用每天就会漏几十条数据,很是烦人,最后发现了上面的问题,当时想解决方案时,第一个就是想到了cache,我把第一次执行Action操作时,把odsData给缓存了,这样应该不会有什么问题了吧。从而可以保证两个action操作,同一个数据源的数据一致性。只能说too young to sample了。这样解决不了上面出现的问题。同样以一个例子来看。
test表中的数据:

1 2017-01-01 01:00:00 2016-05-04 9999-12-31
2 2017-01-01 02:00:00 2016-01-01 9999-12-31
代码:

val curentData = spark.sql(
"""
|select
|*
|from default.test
""".stripMargin)

curentData.cache() //缓存我们的结果

curentData.createOrReplaceTempView("dwData")

//第一个Action
spark.sql(
"""
|INSERT OVERWRITE TABLE default.test1
|SELECT
|
|FROM dwData
""".stripMargin)
//改变数据源表test表的数据并且是第二个Action
spark.sql(
"""
|INSERT OVERWRITE TABLE default.test
|SELECT
| 1,
| "2017",
| "2018",
| "2018"
|FROM default.test
""".stripMargin)
//第三个Action和第一个Action同数据源,并且cache第一次运行的结果。
spark.sql(
"""
|INSERT OVERWRITE TABLE default.test1
|SELECT
|

|FROM dwData
""".stripMargin)
那么test1表中的结果
第一种情况:
1 2017-01-01 01:00:00 2016-05-04 9999-12-31
2 2017-01-01 02:00:00 2016-01-01 9999-12-31
第二种情况

1 2017 2018 2018
1 2017 2018 2018
结果分析

结果是第二种情况,也就是说我们cache根本就没有起到效果,或者说第三个Action根本就没有使用我们cache的数据。这次我把日志都打出来了啊。
第一个Action的声明周期:
sparkSQL中cache的若干问题

第三个Action的日志:
sparkSQL中cache的若干问题

从这两个日志可以看出,我们设置cache其只能在同一个job中生效。而夸job的使用这样的数据缓存数据是不存在的。
如果想更加详细的了解cache的原理和作用,可以去网上搜,大把大把的资料,但是一定要记住,网上说的要限定一个条件,在同一个job内的rdd,夸job的cache是不存在的。
解决方案

我们最终希望解决的事,当两个action想要使用同一个数据源的rdd的时候,如何保证其数据的一致性。
方案:
把第一个Action算子用到的数据源给写入到一个临时表中
然后再第二个Action中,直接读取临时表的数据,而不是直接使用odsData
更好的方案还没有想好,可以根据业务的不同来搞。

第二个方案现在就是我们使用spark提供的checkpoint机制,checkpoint会把我们的数据
自动缓存到hdfs,它就会把这个rdd以前的父rdd的数据全部删除,以后不管哪个job的rdd
需要使用这个rdd的数据时,都会从这个checkpoin的目录中读取数据。
spark.sparkContext.setCheckpointDir("hdfs://hadoop-1:5000/hanfangfang")
curentData.cache().checkpoint
这样就可以使不同的job,同一个数据源数据的一致性。
同时我们也要记住,当程序运行完成,其不会删除checkpoint的数据的,需要们手动删除。

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/191004.html

(0)
上一篇 2021年11月14日
下一篇 2021年11月14日

相关推荐

发表回复

登录后才能评论