hive_04

Hive

    1. 内部表
      • 数据生命周期
    1. 外部表
      • 删除外部表,并没有删除数据,删掉了schema(rdbms)
    1. 分区表
      • 表目录的子目录
      • create table xxx(…) partitioned by ()
      • alter table add partitions () ..
      • load data local inpath … into table xxx partition (…)
    1. bucket表
      • create table xxx(…) clustered by (fieldName) into n BUCKETS
      • 数据文件.hash

调优


    1. explain
    • 解释执行计划
      1
      explain select sum(*) from
    1. 启用limit调优,避免全表扫描,使用抽样机制
    • select * from xxx limit 1,2
    • hive.limit.optimize.enable=true
    1. JOIN
    • 使用map端连接(/+STREAMTABLE(table_name)/)
    • 连接查询表的大小是从左至右依次增长
    1. 设置本地模式,在单台机器上处理所有任务
    • 使用于小数据情况
      • set hive.exec.mode.local.auto=true 默认false
      • set mapreduce.framework.name=local 不使用yarn
    1. 并行执行job
    • 如果job之间没有依赖关系,可以并行执行,缩短执行时间
      • set hive.exec.parallel=true 默认false
    1. ##严格模式
    • set hive.mapred.mode=strict 总开关,不推荐使用,但是2.3.4版本依然有效
      • 如果设置为strict,依然有效,进入严格模式
    • set hive.strict.checks.large.query=false 默认false,该设置会禁用以下操作
      1. 不指定limit的orderby
      2. 对分区表不指定分区进行查询
      3. 和数据量无关,只是一个查询模式
    • set hive.strict.checks.type.safety=true 严格类型的安全检查,不允许以下操作
      1. BIGINT 和 STRING 之间的比较
      2. BIGINT 和 DOUBLE 之间的比较
    • set hive.strict.checks.cartesian.product=true 不允许笛卡尔积连接
    1. 调整map,reduce个数
    • set hive.exec.reducers.bytes.per.reducer=256000000 每个reducer task的字节数
    • set hive.exec.reducers.max=1009 每个reducer task的最大值,属性为负值时,会使用该属性
    1. ##JVM重用
    • 使得同一个JVM在一个Job(map,reduce)中执行多次,避免启动JVM的开销
    • set mapreduce.job.ubertask.enable=true 是否启用uber,jvm重用
    • set mapreduce.job.ubertask.maxmaps=9 最大map数
    • set mapreduce.job.ubertask.maxreduces=1 最大reduce数
      In order to change the average load for a reducer (in bytes):
      set hive.exec.reducers.bytes.per.reducer=100000000
      In order to limit the maximum number of reducers:
      set hive.exec.reducers.max=5
      In order to set a constant number of reducers:
      set mapreduce.job.reduces=3
    1. #索引
      • 使用index
    1. ##动态分区调整 + BUCKET表
    • set hive.exec.dynamic.partition.mode=strict 动态分区严格模式
    • set hive.exec.max.dynamic.partitions=1000 设置最大分区数
    • set hive.exec.max.dynamic.partitions.pernode=100 设置每个节点最大分区数
    1. 推测执行
    • 让map|reduce多个实例并发执行
      • set mapreduce.map.speculative=true map推测
      • set mapreduce.reduce.speculative=true reduce推测
    1. 多个分组优化
    • 如果多个groupby操作使用的是一个公共的字段,则这些groupby可以生成一个MR,默认true
      • set hive.multigroupby.singlereducer=true
    1. 虚拟列
    • set hive.exec.rowoffset=false 是否启用虚拟列
      • select input__file__name , block__offset__inside__file , id ,name from test2 ;

#压缩

    1. 查看压缩的编解码器
    • hive -e “set io.compression.codecs”
    1. 启用中间结果压缩
    • set hive.exec.compress.intermediate=true 默认值false
    • set mapred.compress.map.output= 控制map的输出压缩设置
    1. 修改map输出结构的压缩,默认值DefaultCodec
    • set mapred.map.output.compression.codec= 修改压缩解码器的默认值,默认值DefaultCodec
      1
      2
      3
      4
      <property>
      <name>mapred.map.output.compression.codec</name>
      <value>org.apache.hadoop.io.compress.SnappyCodec</value>
      </property>
    1. 设置最终的job输出结果为压缩
    • a.设置hive的查询结果是否压缩
      • set hive.exec.compress.output=true 默认false 不压缩
    • b.具体配置要依靠hadoop配置
      1
      2
      3
      4
      5
      6
      [mapred-site]

      <property>
      <name>mapred.output.compression.codec</name>
      <value>org.apache.hadoop.io.compress.GzipCodec</value>
      </property>
    1. 使用sequenceFile作为存储格式
    • create table seqfile(…) as STORED as sequencefile; 使用序列文件存储
      1
      2
      3
      4
      5
      create table t_seq (id int , name string, age int)
      > row format delimited
      > fields terminated by '\t'
      > lines terminated by '\n'
      > stored as sequencefile;
    • insert into t_seq select * from test1; 复制test1数据到t_seq中
    1. 控制sequencefile中map端输出时文件的压缩类型
    • 配置块压缩类型 [mapred-site.xml]
      1
      2
      3
      4
      <property>
      <name>mapred.output.compression.type</name>
      <value>BLOCK</value>
      </property>
    • 产线环境一般使用如下配置
      1
      2
      3
      4
      5
      6
      set mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;//map输出codec
      set hive.exec.compress.intermediate=true;//中间结果是否压缩

      set hive.exec.compress.output=true;//job的输出是否压缩,阀门
      set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;//job输出的codec
      set mapred.output.compression.type=BLOCK;//job的输出block压缩
    1. 对分区归档
    • 只能对内部分区表进行归档,外部表不可以
    • a. 启用hive的分区归档
      • set hive.archive.enabled=true; 是否允许归档操作
    • b. 对表的指定分区进行归档
      • alter table test3 archive partition (province=’hebei’, city=’baoding’);
    • c. 操作
      • create table test3 as select id,name ,age from test2;
    • 注意:运行时缺少HadoopArchive.class类,查看日志/tmp/ubuntu/hive/hive.log可见
      • 复制$HADOOP_HOME/shared/hadoop/tools/hadoop-archive-xxx.jar /soft/hive/auxlib
      • 复制$HADOOP_HOME/shared/hadoop/tools/hadoop-archive-xxx.jar /soft/hive/lib

#函数(UDF: user define function)

    1. procedure + function
    1. 函数操作
    • show functions ; 显示所有函数
    • desc[ribe] function case ; 查看函数的帮助
    • desc[ribe] function extended case ; 查看函数的扩展帮助
    1. UDF
    • 输入是一行或多行,输出单行
    1. UDAF:
    • User Define Aggregate Function 用户定义聚合函数
      • 一行或多行的N个列作为输入,输出一个值
    1. UDTF
    • User Define table function 表生成函数
      • n个输入,多行或者多列作为输入
      • select explode(array(1,2,3)) ;
    1. 自定义函数
    • a.
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      import org.apache.hadoop.hive.ql.exec.Description;
      import org.apache.hadoop.hive.ql.exec.UDF;

      import java.text.ParseException;
      import java.text.SimpleDateFormat;
      import java.util.Date;

      /**
      * 将字符串转换成日期
      *
      * @author Takho
      */
      @Description(name = "to_date", value = "this is my first udf!", extended = "e.g select to_date('2018-12-10 12:12:12')")
      public class ToDate extends UDF {

      public Date evaluate(String date) {
      SimpleDateFormat sdf = new SimpleDateFormat();
      sdf.applyPattern("yyyy/MM/dd HH:mm:ss");
      try {
      return sdf.parse(date);
      } catch (ParseException e) {
      e.printStackTrace();
      }
      return new Date();
      }
      }
    • b. 将函数导出jar包
    • c. 通过hive命令将jar添加到hive的类路径
      • add jar /mnt/hgfs/bigdata/project/myhive210-1.0-SNAPSHOT.jar
    • d. 注册函数
      • create TEMPORARY function to_date as ‘com.tak.function.ToDate’;
    • e. 调用函数

#自定义表生成函数

    1. 创建类UDTF
    • com.tak.function.ForUDTF
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      import org.apache.hadoop.hive.ql.exec.Description;
      import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
      import org.apache.hadoop.hive.ql.metadata.HiveException;
      import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
      import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
      import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
      import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
      import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
      import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
      import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantIntObjectInspector;
      import org.apache.hadoop.io.IntWritable;

      import java.util.ArrayList;

      /**
      * 简单for循环函数
      *
      * @author Takho
      */
      @Description(name = "forx", value = "this is my first UDTF!", extended = "e.g : select forx(1,5,1);")
      public class ForUDTF extends GenericUDTF {
      private IntWritable start;// 起始
      private IntWritable end;// 结束
      private IntWritable inc;// 增量
      private Object[] forwardObj;


      @Override
      public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
      start = ((WritableConstantIntObjectInspector) argOIs[0]).getWritableConstantValue();
      end = ((WritableConstantIntObjectInspector) argOIs[1]).getWritableConstantValue();
      if (argOIs.length == 3) {
      inc = ((WritableConstantIntObjectInspector) argOIs[2]).getWritableConstantValue();
      } else
      inc = new IntWritable(1);
      this.forwardObj = new Object[1];
      ArrayList<String> fieldNames = new ArrayList<>();
      ArrayList<ObjectInspector> fieldOis = new ArrayList<>();
      fieldNames.add("col0");
      fieldOis.add(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveCategory.INT));
      return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOis);
      }

      @Override
      public void process(Object[] objects) throws HiveException {

      for (int i = start.get(); i < end.get(); i = i + inc.get()) {
      this.forwardObj[0] = new Integer(i);
      forward(forwardObj);
      }


      }

      @Override
      public void close() throws HiveException {

      }
      }
    1. 编译、打包、导出
    • mvn package -Dmaven.tests.skip=true
    1. 复制jar到ubuntu
    1. 使用hive的add jar命令 添加jar到classpath
    • add jar /p/xxx.jar;
    1. 创建临时函数
    • create temporary function forx as ‘xx.xx.xx.ForxUDTF’;
    1. 调用函数
    • select forx(1,5,1);

#OLAP

  • 仓库软件,类SQL语言,HiveQL,HQL,结构化数据
  • schema : rdbms(derby – mysql) , hdfs
  • 延迟高