hadoop初探

MapReduce:Job

0.术语
    MapTask : map任务
    Reduce  : task
1.编程模型
    map(映射) + reduce (化简)
2.

Stage

  • 1.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16

    //main()
    job.waitForCompletion();


    //提交作业给cluster,并等待完成

    org.apache.hadoop.mapreduce.job

    submit(){
    1.确保状态
    2.使用新型API
    3.connect() //创建cluster对象
    4.创建JobSubmitter
    5.JobSubmitter.submitJobInternal()
    }
  • 2.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    org.apache.hadoop.mapreduce.JobSubmitter
    submitJobInternal{
    1.检查输出目录
    2.确立hdfs的临时目录
    3.取得本地ip
    4.创建JobID
    5.copyAndConfigureFiles //使用命令行参数设置conf信息
    6.int maps = writeSplites() //在临时目录下创建切片文件
    7.setConf(maps.map)
    8.writeConf() //提交job.xml到提交目录
    9.LocalJobRunner.submitJob() //通过执行器提交作业

    }
  • 3.
    1
    2
    3
    4
    5
    6
    mapred.LocalJobRunner
    submitJob(){
    1.创建LocalJobRunner.Job内部类对象
    2.setXxxxx...

    }
  • 4.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37

    LocalJobRunner.Job
    Job(){
    1.通过临时目录下job.xml创建JobConf对象
    2.this.start() // ###启动Job线程
    }

    // 看看Job的run方法
    run(){

    1.得到切片信息 SplitTaskMetaInfo[]

    2.得到Reduce任务数

    3.mapRunnables=getMapTaskRunnables(..)
    // 得到mapper对应的runnable [Runner.Job.MapTaskRunnable]

    4.mapService
    //创建供mapper阶段使用的线程池

    5.runTask(mapRunnables,mapService,"map")

    6.reduceRunnables=getReduceTaskRunnables(jobId, mapOutputFiles)
    // 得到reduce对应的Runnable [Runner.Job.ReduceTaskRunnable]

    7.reduceService
    //创建供reducer阶段使用的线程池

    8.runtask(reduceRunnables,reduceService,"reduce")

    }

    runtask(){
    for (Runnable r : runnables){
    service.submit(r);
    }
    }

What is going on after startting the Job?

  • Map & Reduce

Phase.1 Map

  • 1.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    LocalJobRunner.Job.MapTaskRunnable
    run(){
    1.创建MapAttemptId
    2.创建map=MapTask(splitInfo,job.xml)
    3.创建mapOutputFiles
    4.map.setXxx
    5.map.run()

    }
  • 2.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
org.apache.hadoop.mapred.MapTask
run(){
1.
2.runNewMapper(...)
}
runNewMapper(){
1.创建taskContext对象
2.通过反射创建自定义Mapper对象
//获取自定义mapper对象

3.创建inputFormat(创建输出格式)
4.重建Split (重建切片)
5.创建NewOutputCollector


6.mappperContext = 通过newOutputCollecotr获取mapperContext对象
//获取Context对象

7.mapper.run(mapperContext)
}
  • 3.
1
2
3
4
5
6
7
8
9
10
11
12
13
MyMaxTempMapper

run(){

setup()
while(){
map()
// 这个抽象方法就是需要实现的映射逻辑
}
cleanup();
}

}

Phase 2 Reduce

  • 1.

    1
    2
    3
    4
    5
    6
    7
    8
    LocalJobRunner.Job.ReduceTaskRunnable
    run(){
    1.创建ReduceAttemptId
    2.创建reduce=ReduceTask(systemJobFile(结果集输出路径),taskId,mapId.size(映射结果个数),numSlotsRequire(默认:1))
    3.创建mapOutputFiles
    4.reduce.setXxx
    5.reduce.run()
    }
  • 2.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
org.apache.hadoop.mapred.ReduceTask
run(){
1.condition -> isMapOrReduce
2.boolean useNewApi = job.getUseNewReducer()
// 通过配置项 mapred.reducer.new-api 判断是否使用新api
3.initialize()
4.获取Context
ShuffleConsumerPlugin.Context shuffleContext =
new ShuffleConsumerPlugin.Context()
5.sortPhase.complete()
6.runNewReducer()
}

runNewReducer(){
1.创建taskContext对象
2.通过反射创建自定义Reducer对象
//获取自定义Reducer对象

3.创建inputFormat(创建输出格式)
4.重建Split (重建切片)
5.创建NewOutputCollector
6.reducerContext = 通过Reducer对象获取reducerContext对象
//获取reducerContext

7.reducer.run(reducerContext)
}
  • 3.
1
2
3
4
5
6
7
8
9
10
11
12
13
MyMaxTempReducer

run(){

setup()
while(){
reduce()
// 这个抽象方法就是需要实现的对映射结果集化简的逻辑
}
cleanup();
}

}