Spark & Hive 调优,参数配置

无废话篇

Spark

配置于spark-default.conf

  1. #spark.yarn.applicationMaster.waitTries 5
    用于applicationMaster等待Spark master的次数以及SparkContext初始化尝试的次数 (一般不用设置)

2.spark.yarn.am.waitTime 100s

3.spark.yarn.submit.file.replication 3
应用程序上载到HDFS的复制份数

4.spark.preserve.staging.files false
设置为true,在job结束后,将stage相关的文件保留而不是删除。 (一般无需保留,设置成false)

5.spark.yarn.scheduler.heartbeat.interal-ms 5000
Spark application master给YARN ResourceManager 发送心跳的时间间隔(ms)

6.spark.yarn.executor.memoryOverhead 1000
此为vm的开销(根据实际情况调整)

7.spark.shuffle.consolidateFiles true
仅适用于HashShuffleMananger的实现,同样是为了解决生成过多文件的问题,采用的方式是在不同批次运行的Map任务之间重用Shuffle输出文件,也就是说合并的是不同批次的Map任务的输出数据,但是每个Map任务所需要的文件还是取决于Reduce分区的数量,因此,它并不减少同时打开的输出文件的数量,因此对内存使用量的减少并没有帮助。只是HashShuffleManager里的一个折中的解决方案。

8.spark.serializer org.apache.spark.serializer.KryoSerializer
暂时只支持Java serializer和KryoSerializer序列化方式

9.spark.kryoserializer.buffer.max 128m
允许的最大大小的序列化值。

10.spark.storage.memoryFraction 0.3
用来调整cache所占用的内存大小。默认为0.6。如果频繁发生Full GC,可以考虑降低这个比值,这样RDD Cache可用的内存空间减少(剩下的部分Cache数据就需要通过Disk Store写到磁盘上了),会带来一定的性能损失,但是腾出更多的内存空间用于执行任务,减少Full GC发生的次数,反而可能改善程序运行的整体性能。

11.spark.sql.shuffle.partitions 800
一个partition对应着一个task,如果数据量过大,可以调整次参数来减少每个task所需消耗的内存

12.spark.sql.autoBroadcastJoinThreshold -1
当处理join查询时广播到每个worker的表的最大字节数,当设置为-1广播功能将失效。

13.spark.speculation false

如果设置成true,倘若有一个或多个task执行相当缓慢,就会被重启执行。(事实证明,这种做法会造成hdfs中临时文件的丢失,报找不到文件的错)

14.spark.shuffle.manager tungsten-sort

tungsten-sort是一种类似于sort的shuffle方式,shuffle data还有其他两种方式 sort、hash. (不过官网说 tungsten-sort 应用于spark 1.5版本以上)

15.spark.sql.codegen true

Spark SQL在每次执行次,先把SQL查询编译JAVA字节码。针对执行时间长的SQL查询或频繁执行的SQL查询,此配置能加快查询速度,因为它产生特殊的字节码去执行。但是针对很短的查询,可能会增加开销,因为它必须先编译每一个查询

16.spark.shuffle.spill false

如果设置成true,将会把spill的数据存入磁盘

17.spark.shuffle.consolidateFiles true

我们都知道shuffle默认情况下的文件数据为map tasks * reduce tasks,通过设置其为true,可以使spark合并shuffle的中间文件为reduce的tasks数目。

18.代码中 如果filter过滤后 会有很多空的任务或小文件产生,这时我们使用coalesce或repartition去减少RDD中partition数量。

Hive

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

SET mapreduce.map.memory.mb=8192;
SET mapreduce.map.java.opts='-Xmx7400m' -XX:+UseG1GC;
SET mapreduce.child.map.java.opts='-Xmx7400m';
SET hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
SET mapreduce.reduce.memory.mb=8192;
SET mapreduce.reduce.java.opts='-Xmx7400m' -XX:+UseG1GC;
SET hive.merge.mapfiles=true;
SET hive.merge.mapredfiles=true;
SET mapred.max.split.size=250000000;
SET mapred.min.split.size.per.node=128000000;
SET mapred.min.split.size.per.rack=128000000;
SET hive.merge.smallfiles.avgsize=256000000;
SET hive.merge.size.per.task = 256000000;
SET mapred.job.reuse.jvm.num.tasks=10;
SET mapred.tasktracker.map.tasks.maximum=24;
SET mapred.tasktracker.reduce.tasks.maximum=24;
SET mapreduce.job.reduce.slowstart.completedmaps=0.8;
SET hive.exec.parallel=true;
SET hive.exec.parallel.thread.number=10;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.auto.convert.join=true;
SET hive.optimize.skewjoin = true;
SET hive.groupby.skewindata=true;
SET hive.groupby.mapaggr.checkinterval=100000;
SET hive.skewjoin.key=100000;
SET hive.exec.reducers.bytes.per.reducer = 256000000;
SET hive.map.aggr=true;
SET hive.hadoop.supports.splittable.combineinputformat=true;
SET hive.exec.reducers.max=4000;

Misc

1
2
3
4
hive --hiveconf hive.root.logger=DEBUG,console
set hive.support.quoted.identifiers=none

insert overwrite table tb2 partition(dt=xx, hr=xx) select `(dt|hr)?+.+` from tb1

拆分大文件,合并小文件

1
2
3
4
5
6
7
set mapred.max.split.size=256000000;
set mapred.min.split.size.per.node=100000000;
set mapred.min.split.size.per.rack=100000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set hive.merge.smallfiles.avgsize=256000000;
set hive.merge.mapfiles = true;
set hive.merge.mapredfiles = true;