spark性能调优

性能调优之在实际项目中分配更多资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
分配更多资源:性能调优的王道,就是增加和分配更多的资源,性能和速度上的提升,是显而易见的;基本上,在一定范围之内,增加资源与性能的提升,是成正比的;写完了一个复杂的spark作业之后,进行性能调优的时候,首先第一步,我觉得,就是要来调节最优的资源配置;在这个基础之上,如果说你的spark作业,能够分配的资源达到了你的能力范围的顶端之后,无法再分配更多的资源了,公司资源有限;那么才是考虑去做后面的这些性能调优的点。
问题:
1、分配哪些资源?
2、在哪里分配这些资源?
3、为什么多分配了这些资源以后,性能会得到提升?
答案:
1、分配哪些资源?executor、cpu per executor、memory per executor、driver memory
2、在哪里分配这些资源?在我们在生产环境中,提交spark作业时,用的spark-submit shell脚本,里面调整对应的参数
/usr/local/spark/bin/spark-submit \
--class cn.spark.sparktest.core.WordCountCluster \
--num-executors 3 \ 配置executor的数量
--driver-memory 100m \ 配置driver的内存(影响不大)
--executor-memory 100m \ 配置每个executor的内存大小
--executor-cores 3 \ 配置每个executor的cpu core数量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
3、调节到多大,算是最大呢?
第一种,Spark Standalone,公司集群上,搭建了一套Spark集群,你心里应该清楚每台机器还能够给你使用的,大概有多少内存,多少cpu core;那么,设置的时候,就根据这个实际的情况,去调节每个spark作业的资源分配。比如说你的每台机器能够给你使用4G内存,2个cpu core;20台机器;executor,20;4G内存,2个cpu core,平均每个executor。
第二种,Yarn。资源队列。资源调度。应该去查看,你的spark作业,要提交到的资源队列,大概有多少资源?500G内存,100个cpu core;executor,50;10G内存,2个cpu core,平均每个executor。
一个原则,你能使用的资源有多大,就尽量去调节到最大的大小(executor的数量,几十个到上百个不等;executor内存;executor cpu core)
4、为什么调节了资源以后,性能可以提升?

image

性能调优之在实际项目中调节并行度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
并行度:其实就是指的是,Spark作业中,各个stage的task数量,也就代表了Spark作业的在各个阶段(stage)的并行度。
如果不调节并行度,导致并行度过低,会怎么样?
假设,现在已经在spark-submit脚本里面,给我们的spark作业分配了足够多的资源,比如50个executor,每个executor有10G内存,每个executor有3个cpu core。基本已经达到了集群或者yarn队列的资源上限。
task没有设置,或者设置的很少,比如就设置了,100个task。50个executor,每个executor有3个cpu core,也就是说,你的Application任何一个stage运行的时候,都有总数在150个cpu core,可以并行运行。但是你现在,只有100个task,平均分配一下,每个executor分配到2个task,ok,那么同时在运行的task,只有100个,每个executor只会并行运行2个task。每个executor剩下的一个cpu core,就浪费掉了。
你的资源虽然分配足够了,但是问题是,并行度没有与资源相匹配,导致你分配下去的资源都浪费掉了。
合理的并行度的设置,应该是要设置的足够大,大到可以完全合理的利用你的集群资源;比如上面的例子,总共集群有150个cpu core,可以并行运行150个task。那么就应该将你的Application的并行度,至少设置成150,才能完全有效的利用你的集群资源,让150个task,并行执行;而且task增加到150个以后,即可以同时并行运行,还可以让每个task要处理的数据量变少;比如总共150G的数据要处理,如果是100个task,每个task计算1.5G的数据;现在增加到150个task,可以并行运行,而且每个task主要处理1G的数据就可以。
很简单的道理,只要合理设置并行度,就可以完全充分利用你的集群计算资源,并且减少每个task要处理的数据量,最终,就是提升你的整个Spark作业的性能和运行速度。
1、task数量,至少设置成与Spark application的总cpu core数量相同(最理想情况,比如总共150个cpu core,分配了150个task,一起运行,差不多同一时间运行完毕)
2、官方是推荐,task数量,设置成spark application总cpu core数量的2~3倍,比如150个cpu core,基本要设置task数量为300~500;
实际情况,与理想情况不同的,有些task会运行的快一点,比如50s就完了,有些task,可能会慢一点,要1分半才运行完,所以如果你的task数量,刚好设置的跟cpu core数量相同,可能还是会导致资源的浪费,因为,比如150个task,10个先运行完了,剩余140个还在运行,但是这个时候,有10个cpu core就空闲出来了,就导致了浪费。那如果task数量设置成cpu core总数的2~3倍,那么一个task运行完了以后,另一个task马上可以补上来,就尽量让cpu core不要空闲,同时也是尽量提升spark作业运行的效率和速度,提升性能。
3、如何设置一个Spark Application的并行度?
spark.default.parallelism
SparkConf conf = new SparkConf()
.set("spark.default.parallelism", "500")
“重剑无锋”:真正有分量的一些技术和点,其实都是看起来比较平凡,看起来没有那么“炫酷”,但是其实是你每次写完一个spark作业,进入性能调优阶段的时候,应该优先调节的事情,就是这些(大部分时候,可能资源和并行度到位了,spark作业就很快了,几分钟就跑完了)
“炫酷”:数据倾斜(100个spark作业,最多10个会出现真正严重的数据倾斜问题),感冒和发烧,你不能上来就用一些偏方(癌症,用癞蛤蟆熬煮汤药);JVM调优;

性能调优之在实际项目中重构RDD架构以及RDD持久化

image
image

首先避免第二种情况(上上图红圈2),优化成第一种情况 然后在第一种情况的基础上去持久化RDD

image
image

性能调优之在实际项目中广播大变量

image

性能调优之在实际项目中使用Kryo序列化

image

性能调优之在实际项目中使用fastutil优化数据格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
fastutil介绍:
fastutil是扩展了Java标准集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的类库,提供了特殊类型的map、set、list和queue;
fastutil能够提供更小的内存占用,更快的存取速度;我们使用fastutil提供的集合类,来替代自己平时使用的JDK的原生的Map、List、Set,好处在于,fastutil集合类,可以减小内存的占用,并且在进行集合的遍历、根据索引(或者key)获取元素的值和设置元素的值的时候,提供更快的存取速度;
fastutil也提供了64位的array、set和list,以及高性能快速的,以及实用的IO类,来处理二进制和文本类型的文件;
fastutil最新版本要求Java 7以及以上版本;
fastutil的每一种集合类型,都实现了对应的Java中的标准接口(比如fastutil的map,实现了Java的Map接口),因此可以直接放入已有系统的任何代码中。
fastutil还提供了一些JDK标准类库中没有的额外功能(比如双向迭代器)。
fastutil除了对象和原始类型为元素的集合,fastutil也提供引用类型的支持,但是对引用类型是使用等于号(=)进行比较的,而不是equals()方法。
fastutil尽量提供了在任何场景下都是速度最快的集合类库。
Spark中应用fastutil的场景:
1、如果算子函数使用了外部变量;那么第一,你可以使用Broadcast广播变量优化;第二,可以使用Kryo序列化类库,提升序列化性能和效率;第三,如果外部变量是某种比较大的集合,那么可以考虑使用fastutil改写外部变量,首先从源头上就减少内存的占用,通过广播变量进一步减少内存占用,再通过Kryo序列化类库进一步减少内存占用。
2、在你的算子函数里,也就是task要执行的计算逻辑里面,如果有逻辑中,出现,要创建比较大的Map、List等集合,可能会占用较大的内存空间,而且可能涉及到消耗性能的遍历、存取等集合操作;那么此时,可以考虑将这些集合类型使用fastutil类库重写,使用了fastutil集合类以后,就可以在一定程度上,减少task创建出来的集合类型的内存占用。避免executor内存频繁占满,频繁唤起GC,导致性能下降。
关于fastutil调优的说明:
fastutil其实没有你想象中的那么强大,也不会跟官网上说的效果那么一鸣惊人。广播变量、Kryo序列化类库、fastutil,都是之前所说的,对于性能来说,类似于一种调味品,烤鸡,本来就很好吃了,然后加了一点特质的孜然麻辣粉调料,就更加好吃了一点。分配资源、并行度、RDD架构与持久化,这三个就是烤鸡;broadcast、kryo、fastutil,类似于调料。
比如说,你的spark作业,经过之前一些调优以后,大概30分钟运行完,现在加上broadcast、kryo、fastutil,也许就是优化到29分钟运行完、或者更好一点,也许就是28分钟、25分钟。
shuffle调优,15分钟;groupByKey用reduceByKey改写,执行本地聚合,也许10分钟;跟公司申请更多的资源,比如资源更大的YARN队列,1分钟。
fastutil的使用:
第一步:在pom.xml中引用fastutil的包
<dependency>
<groupId>fastutil</groupId>
<artifactId>fastutil</artifactId>
<version>5.0.9</version>
</dependency>
速度比较慢,可能是从国外的网去拉取jar包,可能要等待5分钟,甚至几十分钟,不等
List<Integer> => IntList
基本都是类似于IntList的格式,前缀就是集合的元素类型;特殊的就是Map,Int2IntMap,代表了key-value映射的元素类型。除此之外,刚才也看到了,还支持object、reference。

性能调优之在实际项目中调节数据本地化等待时长

这一节印象不是很深
image

我理解这个调优的点在于:好的本地化级别,就像是进程间的本地化级别,它计算起来性能高,没错。但是我可能整个集群资源不是很富裕,可能80%的情况都不是task和数据处在同一进程的情况,那这个时候spark.locality.wait.process 就可以设短一点,因为大部分情况你等不到嘛,那你何苦再花时间去尝试走捷径呢(捷径大概率不存在啊)