InputFormat输入接口

FileInputFormat<K,V>

TextInputFormat 默认实现方式

默认实现方式, 一次读取一个文件

//TextInputFormat的输入类型默认为LongWritable和Text, 即KEYIN:LongWritable, VALUEIN: Text
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {

        // 获取分隔符, 并转换成字节数组Byte[]形式
        byte[] recordDelimiterBytes = null;
        String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");
        if (null != delimiter) {
            recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
        }

        // 创建一个行扫描器, 指定分隔符
        return new LineRecordReader(recordDelimiterBytes);
    }

    //默认情况下, 都认为文件是可切分的, 只有一些压缩类型的文件不支持切片
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        final CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
        if (null == codec) {
            return true;
        }
        return codec instanceof SplittableCompressionCodec;
    }
}

CombineFileInputFormat<K,V>

一次读取多个文件, 主要用于处理多个小文件的场景

*虚拟存储过程: *

  1. 如果文件大小满足FileSize > 2 * maxInputSplitSize, 那么不停的分出一片大小为maxInputSplitSize的虚拟文件, 直至FileSize <= 2 * maxInputSplitSize
  2. 如果文件大小满足maxInputSplit < FileSize <= 2 * maxInputSplitSize, 则分成两片大小相等的虚拟文件, 即FileSize/2
  3. 否则, 文件单独为一个虚拟文件

切片过程:

  1. 如果虚拟文件大小 fileSize < maxInputSplitSize, 那么和下一个虚拟文件合并称为一个切片split
  • 设置maxInputSplitSize后的执行效果

    image-20221030192951650

  • 不设置maxInputSplitSize的效果

    使用默认值, 其默认值非常大, 因此, 在大多数情况下, 只有一个切片, 相当于将所有文件打包生成一个文件, 并且不进行切片

  • KeyValueTextInputFormat

  • SequenceFileInputFormat<K,V>

  • NlineInputFormat

  • 自定义

Shuffle过程

Map阶段的Shuffle

Reduce阶段的Shuffle

OutputFormat输出接口

FileOutputFormat

TextOutputFormat(默认方式)

自定义方式

  1. 继承FileOutputFormat抽象类
  2. 实现RecordWriter中的write()方法

   转载规则


《》 熊水斌 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
溢写文件在哪(MapTask 工作流程) Mapper 的 run() 方法 Mapper 的 cleanup() 方法 output.close(mapperContext) 中 collector.flush() 将环形缓冲区中的数据溢
2022-11-11
下一篇 
Hive基本概念Hive: 由Facebook开源, 用于解决海量结构化日志的数据统计工具 Hive是基于Hadoop的一个数据仓库工具, 可以将结构化的数据文件映射为一张表, 并提供类SQL查询功能 Hive的本质: 将HQL(Hive
2022-11-11
  目录