六、spark–spark调优

[TOC]

一、spark调优概论

1.1 什么是spark调优

​ spark的计算本质是分布式计算,程序的性能受集群中的任何因素的影响,如:CPU、网络带宽、内存等。一般情况下,如果内存足够大,那么其他因素影响性能。然后出现调优需求时,更多是因为资源不够用的情况,所以才需要调节资源的使用情况,更加高效的使用资源。比如如果内存比较紧张,不足以存放所有数据(10亿条),需要针对内存的使用,进行调优来减少内存的消耗

1.2 spark调优的主要方向

​ Spark的性能优化,大部分的工作,是对于内存的使用,进行调优。通常情况下,Spark 处理的程序数据量较小,内存足够使用,只要保证网络通常,一般不会出现大的性能问题。但是,Spark应用程序的性能问题往往出现在针对大数据量进行计算时(数据突增)。这种情况往往是现环境是无法满足的,所以可能导致集群崩溃。
​ 除了内存调优之外,还有一些手段可以优化性能。比如spark使用过程中有和mysql交互的话,此时调优也要考虑到mysql的性能问题。

1.3 spark调优的主要技术手段

1、使用高性能序列化类库。目的减少序列化时间以及序列化后数据的大小
2、优化数据结构。目的减少内存占用
3、对多次使用的RDD进行持久化(RDD cache)、checkpoint
4、使用序列化的持久化级别:MEMORY_ONLY不序列化,MEMORY_ONLY_SER序列化。
MEMORY_ONLY比MEMORY_ONLY_SER要占用更多内存空间。
但是要注意,序列化会增加cpu使用成本,所以要权衡好
5、Java虚拟机垃圾回收调优。
6、Shuffle调优,90%的问题都是shuffle导致(1.x版本时此问题严重,到2.x版本,官网基本已优化,所以到2.x版本,这个问题可忽略)

其他性能优化的方式:
提高计算并行度
广播共享数据

下面会针对这6点调优手段进行分析

二、诊断spark内存使用情况

2.1 内存花费(对象内存花费)

1、每个 java/scala对象,由两部分组成,一个是对象头,占用16字节,主要包含对象的一些元信息,比如指向它的类的指针。另一个是对象本身。如果对象比较小,比如int,它的对象头比自己对象本身都大。

2、String对象,会比他内部的原始数据,多出40个字节,用于保存string类型的元信息
String内部使用char数组来保存字符串序列,并且还要保存诸如数组长度之类的信息。String使用UTF-16编码,所以每个字符会占用2个字节。
比如:包含10个字符的String,占用 2*10 + 40 个字节。

3、集合类型,比如HashMap和LinkedList,内部使用链表数据结构,对链表中的每个数据,使用Entry对象包装。Entry对象,不光有对象头,还有指向下一个Entry的指针,占用8个字节。所以一句话就是,这种内部还包含多个对象的类型,占用内存更多。因为对象多了,除了对象本身数据占用内存之外,更多对象也就会有更多对象头,占用了不少内存空间。

4、基本数据类型的集合,比如int集合,内部会使用对象的包装类 Integer来存储元素。

2.2 获取spark程序内存使用情况

到driver日志目录下查看程序运行日志

less ${spark_home}/work/app-xxxxxx/0/stderr
观察到类似如下信息:
INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 320.9 KB, free 366.0 MB)
        19/07/05 05:57:47 INFO MemoryStore: Block rdd_3_1 stored as values in memory (estimated size 26.0 MB, free 339.9 MB)
        19/07/05 05:57:47 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2212 bytes result sent to driver
        19/07/05 05:57:48 INFO MemoryStore: Block rdd_3_0 stored as values in memory (estimated size 26.7 MB, free 313.2 MB)

estimated size 320.9 KB:当前使用的内存大概大小
free 366.0 MB:剩余空闲内存大小

这样就可以知道任务使用内存的情况了

三、spark调优技术手段

2.1 使用高性能序列化类库

2.1.1 spark序列化的使用情况

​ spark作为一个分布式系统,和其他分布式系统一样,都需要序列化。任何一个分布式系统中,序列化都是很重要的一环。如果使用的序列化技术,操作很慢,序列化后数据量大,会导致分布式系统应用程序性能下降很多。所以,Spark性能优化的第一步,就是进行序列化性能的优化。
​ spark在一些地方是会使用序列化,比如shuffle的时候,但是spark对便捷性和性能进行了取舍,spark为了便捷性,默认使用了java的序列化机制,java的序列化机制之前也讲过,性能不高,序列化速度慢,序列化后数据大。所以一般生产中,最好修改spark使用 的序列化机制

2.1.2 配置spark使用kryo来序列化

​ spark支持使用kryo来实现序列化。kryo序列化速度比java快,占用空间小,大概小10倍。但是使用起来,相对没有那么便捷。
配置spark使用kryo:

spark在读取配置时,会读取conf目录下的配置文件,其中有一个 spark-defaults.conf 文件就是用来指定spark的一些工作参数的。

vim spark-defaults.conf
spark.serializer        org.apache.spark.serializer.KryoSerializer

这就配置了使用kryo,当然也可以在spark程序中使用 conf对象来来设置
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")

2.1.3 kryo类库的优化

(1)优化缓存大小
如果注册的序列化的自定义类型,本身特别大,比如包含了100个以上字段,就会导致序列化的对象过大。此时需要对kyro本身进行优化。因为kyro本身内部缓存不够存放这么大的对象。

设置:spark.kryoserializer.buffer.max  参数值调大,即可。

(2)提前注册自定义类型
使用kryo时,为了更高的性能,最好提前注册需要序列化的类,如:

在sparkConf 对象中注册
conf.registerKryoClasses(Array(classof[Student],classof[Teacher]))

注意:这里基本都针对自定义的类,而且用scala编写spark项目时,其实不会涉及到太多自定义类,不像java

2.2 优化数据结构

2.2.1 概述

优化数据结构,主要在于避免语法特性中所导致的额外内存开销。
核心:优化算子函数内部使用到的局部数据或者算子外部的数据。
目的:减少对内存的消耗和占用。

2.2.2 具体手段

(1)优先使用数组以及字符串,而不是集合类。

即:优先使用array,而不是ArrayList,LinkedList,hashMap
使用int[] 比 List<Integer> 节省内存。

前面也说过,集合类包含更多的额外数据,以及复杂的类结构,所以占用内存多。此举就是为了将结构简单化,满足使用的情况下,越简单越好

(2)将对象转换成字符串。

在企业中,将HashMap,List这种数据,统一使用String拼接成特殊格式的字符串。
举例:
Map<Integer,Person> persons = new HashMap<>()
优化为:
id:name,address,idCardNum,family......|id:name,address,idCardNum,family......

(3)避免使用多层嵌套对象结构。

public class Teacher{private List<Student> students = new ArrayList<>()}
以上例子不好,因为Teacher类的内部又嵌套了大量的小的Student对象。
改进:
转成json,处理字符串
{"teacherId":1,....,students[{"studentId":1,.....}]}

(4)对于能够避免的场景,使用int代替String

虽然String性能比List高,但是int占用更少内存。
比如:数据库主键,id,推荐使用自增主键,而不是uuid。

2.3 RDD缓存

这个就很简单了,主要是将多次使用的RDD缓存在内存中,避免再次使用时重复计算。实现方法看前面spark core的文章

2.4 使用序列化来进行缓存

默认情况下,进行RDD缓存时,RDD对象是没有序列化的,也就是持久化级别为 MEMORY_ONLY。建议使用 MEMORY_ONLY_SER进行持久化,因为这种方式同时会进行序列化,序列化后占用更少的内存的空间。实现方法看前面spark core的文章

2.5 jvm调优

2.5.1 背景

​ 如果在持久化RDD的时候,持久化了大量的数据,那么Java虚拟机的垃圾回收就可能成为一个性能瓶颈。Java虚拟机会定期进行垃圾回收,此时就会追踪所有Java对象,并且在垃圾回收时,追中找到那些已经不再使用的对象,清理旧对象,给新对象腾出空间。
​ 垃圾回收的性能开销,和内存中的对象数量成正比。而且要注意一点, 在做Java虚拟机调优前,必须要做好上面其他调优工作,这样才有意义。因为上面的调优工作,是为了节省内存的开销,更好、更高效的使用内存。上面的优化比起进行jvm调优获得的好处要大得多。并且jvm调优好了,但是上层应用没有好的内存使用方式,jvm优化了也白搭。

2.5.2 gc原理

这里提到这个,更多是让读者自己去理解这个原理,随便百度都可以找到了,这里不重复。

2.5.3 检测垃圾回收

我们可以对垃圾回收进行监测,包括多久进行一次垃圾回收,以及每次垃圾回收耗费的时间。
在 spark-submit脚本中,添加一个配置:

--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimesStamps"

注意:输出到worker的日志中,而不是driver日志。

/usr/local/spark-2.1.0-bin-hadoop2.7/work/app-20190705055405-0000/0
这是driver日志

/usr/local/spark-2.1.0-bin-hadoop2.7/logs
这是worker日志

2.5.4 优化Executor内存比例

​ 对于GC调优来说,最重要的调节,RDD缓存占用的内存空间 与 算子执行是创建对象所占用的内存空间 的比例。默认情况下,Spark使用每个Executor 60%的内存空间来缓存RDD,那么在task执行期间创建的对象,只有40%的内存空间来存放对象。
​ 在这种情况下,很有可能因为内存不足,task创建的对象过大,导致40%的内存空间不够用,触发Java虚拟机垃圾回收操作。在极端的情况下,垃圾回收操作会被频繁触发。
​ 根据实际情况,可以增大对象存储空间,减少gc发生概率,方式:

conf.set("spark.storage.memoryFraction",0.5)
将RDD缓存占用空间比例降低到50%

2.6 shuffle

​ 以往在spark1.x版本中,如果有shuffle时,那么每个map task就会根据result task(也可以叫reduce task)的个数,对map结果进行分区,分别给不同的result task处理,每个分区产生一个文件。当map数量很多时,就会产生大量文件,这会带来性能问题。
​ 在spark2.x中,将一个map task输出的数据都放在一个文件中,然后加上一个索引文件,用于标识不同分区数据在文件中的位置,这样就保证了一个task只产生一个文件。从而降低了IO压力

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/193898.html

(0)
上一篇 2021年11月15日
下一篇 2021年11月15日

相关推荐

发表回复

登录后才能评论