温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

MapTask流程是怎样的

发布时间:2021-12-23 16:08:26 来源:亿速云 阅读:165 作者:iii 栏目:大数据

这篇文章主要讲解了“MapTask流程是怎样的”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“MapTask流程是怎样的”吧!

MapTask流程源码解读

1、从job提交流程的24步,开始mapTask的流程分析,进入submitJob  --LocalJobRunner.java中的788行
Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);  //创建一个可以真正执行的Job
该Job: LocalJobRunner$Job , 且是一个线程   $表示内部类
2、因为当前的Job对象是一个线程,所有执行线程要执行run方法,因此直接找到 LocalJobRunner的run方法进行查看
   --定位到537行
TaskSplitMetaInfo[] taskSplitMetaInfos = 
          SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
//读取切片的metainfo信息,即提交job过程中在临时目录中生成的job.splitmetainfo文件
3、向下走断点,定位到下方代码  --547行
List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(
               taskSplitMetaInfos, jobId, mapOutputFiles);
//根据切片的metainfo信息,可以得出有多少个切片,再生成对应个数的Runnable对象.
每个Runnable对象对应一个线程,每一个MapTask运行在一个线程中(基于本地模式的分析)
Runnable : LocalJobRunnber$Job$MapTaskRunnable  ---联想到线程
4、ExecutorService mapService = createMapExecutor();   //创建线程池对象
runTasks(mapRunnables, mapService, "map");// 将所有的LocalJobRunnber$Job$MapTaskRunnable对象提交给
线程池执行,进入到runTasks方法内部。		--LocalJobRunner中的466行
5、//每个线程负责一个Runnable执行,定位到每个Runnable内部的run方法,查看具体执行(以内部类的方式嵌套)
for (Runnable r : runnables) {
        service.submit(r);
   }
LocalJobRunnber$Job$MapTaskRunnable交给每个线程执行时,会执行到 
LocalJobRunnber$Job$MapTaskRunnable的run方法,因此接下来看
LocalJobRunnber$Job$MapTaskRunnable的run方法     --LocalJobRunner中的248行

MapTask流程是怎样的

6、进入到run方法内部,定位到254行
MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,
            info.getSplitIndex(), 1); 
//创建MapTask对象   --在每一个线程中都会执行,会创建一个mapTask对象

7、进入map.run(localConf, Job.this); --271行 //执行MapTask的run方法,关联到MapTask方法中的run MapTask流程是怎样的

进入到MapTask的run方法内
首先进行分区设置
partitions = jobContext.getNumReduceTasks();
      if (partitions > 1) {
        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
      } else {
        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
          @Override
          public int getPartition(K key, V value, int numPartitions) {
            return partitions - 1;
          }
        };
      }

8、定位到MapTask中run方法的347行,并进入runNewMapper()方法,提前判断下是否使用新的api
进入runNewMapper()方法,定位到MapTask的745行开始读源码
9、--反射的方式创建Mapper对象.  例如: WordCountMapper
	org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)

        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);

   --反射的方式创建Inputformat对象, 例如:  TextInputFormat(默认)
     org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
	  
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);

    --获取当前MapTask所负责的切片信息
     org.apache.hadoop.mapreduce.InputSplit split = null;
    	split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
		
        splitIndex.getStartOffset());

    --获取RecordReader对象
     org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
        (split, inputFormat, reporter, taskContext);

10、向下读取,定位到MapTask的782行 output = new NewOutputCollector(taskContext, job, umbilical, reporter);方法进入

11、定位到MapTask的710行
	collector = createSortingCollector(job, reporter);   //收集器对象,可以理解为缓冲区对象
12、进入到createSortingCollector方法,    --MapTask中的388行
13、collector.init(context);		--初始化缓冲区对象 collector: MapTask$MapOutputBuffer
14、进入到init方法中   --MapTask的968行
15、
①:定位到init方法的980行
--//获取溢写百分比 80%,通过mapreduce.map.sort.spill.percent参数来配置
 final float spillper =
        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);

--//获取缓冲区大小 100M,	通过 mapreduce.task.io.sort.mb 参数来配置
 final int sortmb = job.getInt(MRJobConfig.IO_SORT_MB,
          MRJobConfig.DEFAULT_IO_SORT_MB);

--//获取排序对象  QuickSort.class, 只排索引
sorter = ReflectionUtils.newInstance(job.getClass(
                   MRJobConfig.MAP_SORT_CLASS, QuickSort.class,
                   IndexedSorter.class), job);
--//获取key的比较器对象
	comparator = job.getOutputKeyComparator();
--//获取key的序列化对象	k/v serialization  获取kv的序列化对象
--//获取计数器对象	output counters
--//compression  获取编解码器,进行压缩操作
--//combiner 获取Combiner对象,在溢写及归并可以使用combiner
--//spillThread.start(); 启动溢写线程  ,只有达到溢写百分比才会发生溢写操作
16、mapper.run(mapperContext);执行到Mapper对象中的run方法,例如WordCountMapper中的run方法
进入到mapper.run()方法内
执行 setup(context);	--143行
执行 map(context.getCurrentKey(), context.getCurrentValue(), context);	--146行,
进入到wordCount中的map()方法,是一个循环执行的过程
context.wirte(outK,outV);将map方法中处理好的kv写出
执行cleanup(context);

感谢各位的阅读,以上就是“MapTask流程是怎样的”的内容了,经过本文的学习后,相信大家对MapTask流程是怎样的这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI