记一次spark数据倾斜实践


数据倾斜概念

什么是数据倾斜

  大数据下大部分框架的处理原理都是参考mapreduce的思想:分而治之和移动计算,即提前将计算程序生成好然后发送到不同的节点起jvm进程执行任务,每个任务处理一小部分数据,最终将每个任务的处理结果汇总,完成一次计算。
  如果在分配任务的时候,数据分配不均,导致一个任务要处理的数据量远远大于其他任务,那么整个作业一直在等待这个任务完成,而其他机器的资源完全没利用起来,导致效率极差;如果数据量过大,可能发生倾斜的任务会出现OOM(内存溢出)的异常,使得整个作业失败。因此对于数据倾斜要能改则改

记一次spark数据倾斜实践

案例现象

   案例为日志数据,已做清洗,字段如第一行,重点字段是client_ip和target_ip,需求是求不同target_ip的UV。

实现方式大致是:
1.读取文件,按,切分取2个目标字段client_ip和target_ip
2.按target_ip分组,汇总所有client_ip到一个列表
3.对client_ip列表统计去重数量,输出 <target_ip,UV>

因为是倾斜案例,在1中可以过滤出几个样例ip模拟倾斜场景。
记一次spark数据倾斜实践

代码
package skew

import org.apache.spark.{SparkConf, SparkContext}

import java.util


object SkewSample {
  def main(args: Array[String]): Unit = {
    skew1()
  }

  def skew1():Unit = {
//    val conf = new SparkConf().setAppName("DataSkewTest01").setMaster("local[4]")
    val conf = new SparkConf().setAppName("DataSkewTest01")
    val spark = new SparkContext(conf)

    val rawRDD = spark.textFile("/root/data/skewdata.csv")//读取数据源

    /**筛选满足需要的数据,已到达数据倾斜的目的*/
    val filteredRDD = rawRDD.filter(line => {
      val array = line.split(",")
      val target_ip = array(3)
      target_ip.equals("106.38.176.185") || target_ip.equals("106.38.176.117") || target_ip.equals("106.38.176.118") || target_ip.equals("106.38.176.116")
    })


    /**根据目的ip进行汇总,将访问同一个目的ip的所有客户端ip进行汇总*/
    val reducedRDD = filteredRDD.map(line => {
      val array = line.split(",")
      val target_ip = array(3)
      val client_ip = array(0)
      val index = client_ip.lastIndexOf(".")
      val subClientIP = client_ip.substring(0, index) //为了让后续聚合后的value数据量尽可能的少,只取ip的前段部分
      (target_ip,Array(subClientIP))
    }).reduceByKey(_++_,4)//将Array中的元素进行合并,然后将分区调整为已知的4个

    //reducedRDD.foreach(x => println(x._1, x._2.length))  //查看倾斜key

    /**将访问同一个目的ip的客户端,再次根据客户端ip进行进一步统计*/
    val targetRDD = reducedRDD.map(kv => {
      val map = new util.HashMap[String,Int]()
      val target_ip = kv._1
      val clientIPArray = kv._2
      clientIPArray.foreach(clientIP => {
        if (map.containsKey(clientIP)) {
          val sum = map.get(clientIP) + 1
          map.put(clientIP,sum)
        }
        else map.put(clientIP,1)
      })
      (target_ip,map)
    })

    targetRDD.foreach(x => println(x._1, x._2.size()))
//    targetRDD.saveAsTextFile("tmp/DataSkew01") //结果数据保存目录

//    Thread.sleep(600000)
  }

}

倾斜现象

因为数据量小,所以没有执行很长的时间,但是可以看到有一个任务处理的数据量是其他的百倍左右。

问题分析

案例中最终的分区数量,以及分区键,还有倾斜键都是一个确定的值,因此可以考虑两种优化方式:

  1. 单独处理:案例只有一个倾斜键,可以考虑将这个倾斜键和非倾斜键的数据过滤到2个RDD中,单独处理。这种方式会生成2个JOB,读两次源数据,虽然可以用缓存来提速,但是数据量大了以后缓存也是要落盘的,所以不是特别好

  2. 加盐减盐:对于倾斜键进行加盐,即在倾斜键本身后加上0-100的数字,改变它的hash值以便将数据分散到不同的分区中,然后对结果进行聚合,这样可以显著改善倾斜情况,最终还要对加盐的数据进行去盐,即将倾斜键后面的0-100数字去掉,然后再一次汇总,得到最终结果。 实践的时候又可以将去盐分为两步由100->10,10->1这样,降低数据波动,后续有一次去盐和二次去盐的结果对比。

本站声明:
1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;

2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;

3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;

4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;

5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/293836.html

(0)
上一篇 2022年11月27日
下一篇 2022年11月28日

相关推荐

发表回复

登录后才能评论