前言作为大数据开发者深入理解MapReduce的框架原理至关重要。本文将从InputFormat数据输入、Shuffle机制、Partition分区、Combiner合并、Join应用和数据压缩六大核心模块结合源码与架构图带你彻底搞懂MapReduce的底层设计。一、MapReduce核心架构回顾MapReduce程序运行时有三类实例进程进程职责MrAppMaster负责整个程序的过程调度及状态协调ApplicationMaster的子类MapTask负责Map阶段的整个数据处理流程ReduceTask负责Reduce阶段的整个数据处理流程注意在查看进程时MapTask和ReduceTask显示为yarnchild是YARN的子进程。二、InputFormat数据输入与切片机制2.1 数据块 vs 数据切片概念说明特点数据块BlockHDFS物理上把数据分成一块一块物理切割默认128M存储在不同DataNode数据切片Split逻辑上对输入进行分片逻辑切割不实际切分文件决定MapTask数量关键原则一个Job的Map阶段并行度由切片数决定每一个Split切片分配一个MapTask并行实例处理默认情况下切片大小 BlockSize128M切片时不考虑数据集整体而是逐个针对每一个文件单独切片2.2 切片源码解析// FileInputFormat.getSplits() 核心逻辑longsplitSizecomputeSplitSize(blockSize,minSize,maxSize);// 其中 computeSplitSize Math.max(minSize, Math.min(maxSize, blockSize))参数优先级minSize(1) splitSize maxSize(Long.MAX_VALUE)默认配置下splitSize blockSize 128M2.3 切片大小设置!-- mapred-site.xml --propertynamemapreduce.input.fileinputformat.split.minsize/namevalue1/value/propertypropertynamemapreduce.input.fileinputformat.split.maxsize/namevalueLong.MAX_VALUE/value/property生产环境中若磁盘传输速度快如SSD可将BlockSize和切片大小调整为256M。2.4 FileInputFormat实现类实现类输入K,V类型特点TextInputFormatLongWritable, Text默认实现按行读取K为偏移量KeyValueTextInputFormatText, Text按分隔符切割第0位为K第1位为VNLineInputFormatLongWritable, Text一次读取N行效率更高CombineTextInputFormatLongWritable, Text解决小文件问题合并多个文件为一个切片2.5 CombineTextInputFormat解决小文件问题问题场景大量1KB小文件每个文件开启一个MapTask占用1G内存1个CPU资源严重浪费。解决方案将多个小文件合并成一个切片处理。// 在Driver中设置job.setInputFormatClass(CombineTextInputFormat.class);CombineTextInputFormat.setMaxInputSplitSize(job,4194304);// 4MB虚拟存储与切片过程示例4个小文件1.7M、5.1M、3.4M、6.8M设置maxSize4M虚拟存储后1.7M, (2.55M, 2.55M), 3.4M, (3.4M, 3.4M) 最终切片(1.72.55)M, (2.553.4)M, (3.43.4)M → 3个切片3个MapTask三、MapReduce完整工作流程3.1 工作流程详解阶段操作① 切片分析客户端对原始文件进行切片200M文件分成2片0-128M, 128-200M② 提交三样东西Jar包、切片规划文件、job.xml配置参数③ 启动MrAppMasterYARN启动MrAppMaster读取切片信息决定MapTask数量④ MapTask处理用InputFormat读取数据默认TextInputFormat→LineRecordReaderK为偏移量V为行内容⑤ Mapper业务逻辑用户自定义map()方法处理数据⑥ 输出到环形缓冲区数据进入环形缓冲区左侧存索引右侧存数据⑦ Shuffle阶段分区→排序→溢写→合并→压缩→Reduce拉取→归并排序→分组⑧ Reduce处理用户自定义reduce()方法聚合数据⑨ OutputFormat输出默认TextOutputFormat按行写入HDFS四、Shuffle机制面试重点Shuffle是MapReduce的核心与灵魂是Map方法之后、Reduce方法之前的数据处理过程。4.1 环形缓冲区Map端环形缓冲区结构左侧存储索引Metadata—— 分区号、keystart、valstart、vallen右侧存储真实数据Record—— K,V序列化后的字节默认大小100M溢写阈值80%反向溢写预留20%给溢写线程为什么80%就溢写如果等到100%再溢写必须等所有数据写完磁盘才能继续写入缓冲区效率低。80%时开启溢写线程边写边处理。4.2 Map端Shuffle详细流程Map输出 → 分区(Partition) → 进入环形缓冲区 → 达到80% → 快排(按K索引排序) → 溢写到磁盘(spill.out spill.index) → 多次溢写文件 → 归并排序(Merge) → Combiner(可选) → 压缩(可选) → 等待Reduce拉取关键细节排序不是移动数据快排是对索引排序不是移动真实的K,V数据分区内部有序溢写前按分区号排序同一分区内按Key字典序排序溢写文件结构一个文件包含多个分区数据通过index文件记录每个分区的偏移量4.3 Reduce端Shuffle详细流程ReduceTask启动 → 从各MapTask拉取指定分区数据 → 内存缓冲/磁盘溢写 → 归并排序所有拉取的数据 → 按Key分组 → 相同Key进入同一个Reduce方法Reduce拉取特点主动拉取ReduceTask主动从MapTask拉取数据不是MapTask推送内存磁盘拉取的数据先放内存不够则溢写到磁盘全局排序对所有拉取的数据进行一次归并排序保证相同Key连续4.4 Shuffle参数调优!-- 环形缓冲区大小默认100M --propertynamemapreduce.task.io.sort.mb/namevalue100/value/property!-- 溢写阈值百分比默认0.8 --propertynamemapreduce.map.sort.spill.percent/namevalue0.80/value/property!-- 归并时一次合并的文件数默认10 --propertynamemapreduce.task.io.sort.factor/namevalue10/value/property五、Partition分区机制5.1 默认分区器 HashPartitionerpublicclassHashPartitionerK,VextendsPartitionerK,V{publicintgetPartition(Kkey,Vvalue,intnumReduceTasks){return(key.hashCode()Integer.MAX_VALUE)%numReduceTasks;}}原理key.hashCode() Integer.MAX_VALUE防止负数再对ReduceTask个数取余。5.2 自定义分区案例需求将手机号按归属地输出到不同文件136、137、138、139、其他publicclassProvincePartitionerextendsPartitionerText,FlowBean{OverridepublicintgetPartition(Texttext,FlowBeanflowBean,intnumPartitions){Stringphonetext.toString();StringprePhonephone.substring(0,3);intpartition;if(136.equals(prePhone))partition0;elseif(137.equals(prePhone))partition1;elseif(138.equals(prePhone))partition2;elseif(139.equals(prePhone))partition3;elsepartition4;returnpartition;}}Driver设置job.setPartitionerClass(ProvincePartitioner.class);job.setNumReduceTasks(5);// 必须等于分区数5.3 分区数与ReduceTask数的关系ReduceTask数结果 分区数正常执行每个分区一个输出文件 分区数IOException数据无处写入 分区数正常执行多余ReduceTask输出空文件 1不执行分区过程所有数据进入0号分区输出一个文件源码验证MapTask中执行分区的前提是numReduceTasks 1六、WritableComparable排序机制6.1 排序发生的时机MapReduce中共发生3次排序阶段排序类型算法说明Map端快速排序QuickSort环形缓冲区溢写前对索引排序Map端归并排序MergeSort多个溢写文件合并时Reduce端归并排序MergeSort拉取所有Map数据后全局排序6.2 为什么必须排序核心原因让相同Key的数据连续排列Reduce只需顺序扫描即可判断Key是否相同无需全量遍历。未排序(a,1) (b,1) (a,1) (d,1) → 需要遍历全部判断相同Key 已排序(a,1) (a,1) (b,1) (d,1) → 顺序扫描遇到不同Key即停止6.3 自定义排序实现需求按总流量倒序排序publicclassFlowBeanimplementsWritableComparableFlowBean{privatelongupFlow;privatelongdownFlow;privatelongsumFlow;// 实现compareTo方法OverridepublicintcompareTo(FlowBeano){// 总流量倒序if(this.sumFlowo.sumFlow)return-1;elseif(this.sumFlowo.sumFlow)return1;else{// 二次排序总流量相同按上行流量正序if(this.upFlowo.upFlow)return1;elseif(this.upFlowo.upFlow)return-1;elsereturn0;}}}6.4 排序类型总结排序类型说明部分排序对每个输出文件内部排序默认行为全排序所有数据全局排序通常只有1个ReduceTask二次排序排序条件有两个第一个条件相同按第二个排区内排序每个分区内部独立排序七、Combiner合并机制7.1 Combiner的作用场景Map端输出(a,1)出现1万次不开启Combiner需传输1万条到Reduce开启后传输(a,10000)只需1条。核心价值减少Map到Reduce的网络传输量提前局部聚合。7.2 Combiner使用前提必须满足Combiner的逻辑不影响最终结果。场景是否可用原因求和可用(a,1)(a,1)(a,2)结果正确求平均值不可用(12)/21.5≠(1234)/42.5求最大值可用max(max(a,b),c) max(a,b,c)7.3 Combiner代码实现// 方式一自定义Combiner类publicclassWordCountCombinerextendsReducerText,IntWritable,Text,IntWritable{privateIntWritableoutVnewIntWritable();Overrideprotectedvoidreduce(Textkey,IterableIntWritablevalues,Contextcontext)throwsIOException,InterruptedException{intsum0;for(IntWritablevalue:values){sumvalue.get();}outV.set(sum);context.write(key,outV);}}// Driver中设置job.setCombinerClass(WordCountCombiner.class);// 方式二若Combiner逻辑与Reducer完全相同直接用Reducerjob.setCombinerClass(WordCountReducer.class);注意若setNumReduceTasks(0)则不执行Shuffle阶段Combiner也不生效。八、OutputFormat数据输出8.1 默认输出 TextOutputFormat将每个K,V对按行输出到文本文件。8.2 自定义OutputFormat案例需求将包含atguigu的日志输出到atguigu.log其他输出到other.log。// 1. 自定义OutputFormatpublicclassLogOutputFormatextendsFileOutputFormatText,NullWritable{OverridepublicRecordWriterText,NullWritablegetRecordWriter(TaskAttemptContextjob)throwsIOException,InterruptedException{returnnewLogRecordWriter(job);}}// 2. 自定义RecordWriterpublicclassLogRecordWriterextendsRecordWriterText,NullWritable{privateFSDataOutputStreamatguiguOut;privateFSDataOutputStreamotherOut;publicLogRecordWriter(TaskAttemptContextjob){FileSystemfsFileSystem.get(job.getConfiguration());atguiguOutfs.create(newPath(d:/hadoop/atguigu.log));otherOutfs.create(newPath(d:/hadoop/other.log));}Overridepublicvoidwrite(Textkey,NullWritablevalue)throwsIOException{Stringlogkey.toString();if(log.contains(atguigu)){atguiguOut.writeBytes(log\n);}else{otherOut.writeBytes(log\n);}}Overridepublicvoidclose(TaskAttemptContextcontext)throwsIOException{IOUtils.closeStream(atguiguOut);IOUtils.closeStream(otherOut);// 必须关流否则文件为空}}// 3. Driver设置job.setOutputFormatClass(LogOutputFormat.class);九、MapTask与ReduceTask工作机制源码解析9.1 MapTask五个阶段阶段操作Read通过RecordReader从InputSplit解析K,VMap调用用户自定义map()函数处理Collect调用OutputCollector.collect()分区后写入环形缓冲区Spill缓冲区满80%后溢写到磁盘先快排再按分区写入Merge所有溢写文件归并成一个最终文件每轮合并10个文件9.2 ReduceTask三个阶段阶段操作Copy从各MapTask拉取指定分区数据内存不够则溢写磁盘Sort对内存和磁盘上的文件进行归并排序Reduce调用用户自定义reduce()函数结果写入HDFS9.3 ReduceTask并行度决定// 手动设置ReduceTask数量job.setNumReduceTasks(4);实验结论1GB数据16个MapTaskReduceTask151015162025304560总时间(s)8921461109288100128101145104最佳实践ReduceTask数量并非越多越好需根据数据量和集群资源调优。一般设置为节点数的0.95倍或1.75倍。十、Join应用10.1 Reduce Join原理Map端为数据打标签区分来源以连接字段为Key输出Reduce端按Key分组合并不同来源的数据。缺点合并操作在Reduce端完成Reduce端压力大易产生数据倾斜。// Mapper中通过setup获取文件名打标签Overrideprotectedvoidsetup(Contextcontext)throwsIOException{InputSplitsplitcontext.getInputSplit();FileSplitfileSplit(FileSplit)split;filenamefileSplit.getPath().getName();}Overrideprotectedvoidmap(LongWritablekey,Textvalue,Contextcontext){if(filename.contains(order)){// 订单表处理flagorder}else{// 商品表处理flagpd}}10.2 Map Join推荐适用场景一张表很小可放入内存一张表很大。优势在Map端缓存小表提前处理Join逻辑无需Reduce阶段避免数据倾斜。// Driver中加载缓存文件job.addCacheFile(newURI(file:///D:/input/tablecache/pd.txt));job.setNumReduceTasks(0);// 不需要Reduce// Mapper中setup读取缓存Overrideprotectedvoidsetup(Contextcontext)throwsIOException{URI[]cacheFilescontext.getCacheFiles();PathpathnewPath(cacheFiles[0]);FileSystemfsFileSystem.get(context.getConfiguration());FSDataInputStreamfisfs.open(path);BufferedReaderreadernewBufferedReader(newInputStreamReader(fis));Stringline;while(StringUtils.isNotEmpty(linereader.readLine())){String[]splitline.split(\t);pdMap.put(split[0],split[1]);// pid - pname}}Overrideprotectedvoidmap(LongWritablekey,Textvalue,Contextcontext){// 直接通过pdMap获取pname无需ReduceStringpnamepdMap.get(fields[1]);}十一、Hadoop数据压缩11.1 压缩算法对比压缩格式自带算法可切片压缩率速度适用场景Gzip是DEFLATE否高一般归档Bzip2是bzip2是最高慢冷数据LZO否LZO是一般快需建索引Snappy是Snappy否一般最快热数据、实时11.2 压缩位置选择输入文件 → [InputFormat] → Map输出 → [Shuffle] → Reduce输出 → [OutputFormat] → HDFS ↑压缩可选 ↑强烈建议压缩 ↑压缩可选11.3 压缩参数配置// Map输出端压缩conf.setBoolean(mapreduce.map.output.compress,true);conf.setClass(mapreduce.map.output.compress.codec,BZip2Codec.class,CompressionCodec.class);// Reduce输出端压缩FileOutputFormat.setCompressOutput(job,true);FileOutputFormat.setOutputCompressorClass(job,BZip2Codec.class);十二、MapReduce开发总结组件核心要点InputFormat默认TextInputFormatCombineTextInputFormat解决小文件Mapper实现map()、setup()、cleanup()三个方法Partitioner默认HashPartitioner自定义需配合setNumReduceTasksComparableKey必须实现WritableComparable重写compareTo()Combiner提前聚合减少IO但需保证不影响最终结果Reducer实现reduce()、setup()、cleanup()三个方法OutputFormat默认TextOutputFormat可自定义输出到数据库等十三、常见错误及解决方案错误原因解决方案Illegal partition for xxxPartition和ReduceTask个数不匹配调整ReduceTask个数等于分区数类型转换异常Map输入参数类型错误Mapper第一个输入必须是LongWritable或NullWritable输出文件夹已存在MapReduce不允许覆盖删除旧目录或更换输出路径Unsupported major.minor version 52.0JDK版本不一致统一Windows和Linux的JDK版本找不到缓存文件路径错误或文件名多了.txt检查路径使用绝对路径自定义OutputFormat输出为空RecordWriter未关闭流在close()方法中关闭所有输出流ReduceTask0时Combiner不生效无Shuffle阶段正常现象Combiner依赖Shuffle