从文件到Mapper

输入分片InputSplit

@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class InputSplit {

    //以字节(B)为单位的切片长度
    //使用切片长度进行排序, 优先处理最大的分片, 从而最小化作业运行事件
    public abstract long getLength() throws IOException, InterruptedException;

    //切片的存储位置(一组主机的hostname)
    public abstract String[] getLocations() throws IOException, InterruptedException;

    @Evolving
    public SplitLocationInfo[] getLocationInfo() throws IOException {
        return null;
    }
}

FileSplit

  • getPath(): 获取包含split的文件路径
  • getStart(): 获取待处理文件的第一个字节所在的位置
  • getLength(): 获取待处理文件字节总数(文件长度)

CombineFileSplilt

InputFormat

@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class InputFormat<K, V> {

    //获取job任务所有输入文件集合的逻辑分片(split)
    //逻辑分片(split) != 物理块(chunk)
    public abstract List<InputSplit> getSplits(JobContext context) 
        throws IOException, InterruptedException;

    //对于每一个给定的split创建一个record reader
    public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) 
        throws IOException, InterruptedException;
}
  • jobClient.getSplits(): 计算split的数量(分片数)

  • mapTask任务把输入的split^1传给InputFormat.createRecordReader()方法, 从而获取这个分片的RecordReader.

    RecordReader就是记录records的迭代器[^2]

  • map任务用一个RecordReader来生成记录的键值对, 然后传递给Mapper.map()函数

FileInputFormat

  • 定义了哪些文件包含在一个作业的输入中
  • 为输入文件生成分片, 把split分割成record的任务交给其子类完成
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
    public static final String INPUT_DIR = "mapreduce.input.fileinputformat.inputdir";
    public static final String SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize";
    public static final String SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize";
    public static final String PATHFILTER_CLASS = "mapreduce.input.pathFilter.class";
    public static final String NUM_INPUT_FILES = "mapreduce.input.fileinputformat.numinputfiles";
    public static final String INPUT_DIR_RECURSIVE = "mapreduce.input.fileinputformat.input.dir.recursive";
    public static final String INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS = "mapreduce.input.fileinputformat.input.dir.nonrecursive.ignore.subdirs";
    public static final String LIST_STATUS_NUM_THREADS = "mapreduce.input.fileinputformat.list-status.num-threads";
    public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;

    private static final Logger LOG = LoggerFactory.getLogger(FileInputFormat.class);

    private static final double SPLIT_SLOP = 1.1;   // 10% slop

    @Deprecated
    public enum Counter {
        BYTES_READ
    }

    private static final PathFilter hiddenFileFilter = new PathFilter(){
        public boolean accept(Path p){
            String name = p.getName(); 
            return !name.startsWith("_") && !name.startsWith("."); 
        }
    }; 

    private static class MultiPathFilter implements PathFilter {
        private List<PathFilter> filters;

        public MultiPathFilter(List<PathFilter> filters) {
            this.filters = filters;
        }

        public boolean accept(Path path) {
            for (PathFilter filter : filters) {
                if (!filter.accept(path)) {
                    return false;
                }
            }
            return true;
        }
    }

    //设置递归地处理文件夹
    public static void setInputDirRecursive(Job job, boolean inputDirRecursive) {
        job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE, inputDirRecursive);
    }

    public static boolean getInputDirRecursive(JobContext job) {
        return job.getConfiguration().getBoolean(INPUT_DIR_RECURSIVE, false);
    }

    protected long getFormatMinSplitSize() {
        return 1;
    }

    //文件是否切分, 即是否一个文件必须作为一个整体处理
    protected boolean isSplitable(JobContext context, Path filename) {
        return true;
    }

    //设置文件过滤器, 用以排除特定的文件
    public static void setInputPathFilter(Job job, Class<? extends PathFilter> filter) {
        job.getConfiguration().setClass(PATHFILTER_CLASS, filter, PathFilter.class);
    }

    public static void setMinInputSplitSize(Job job, long size) {
        job.getConfiguration().setLong(SPLIT_MINSIZE, size);
    }

    public static long getMinSplitSize(JobContext job) {
        return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
    }


    public static void setMaxInputSplitSize(Job job, long size) {
        job.getConfiguration().setLong(SPLIT_MAXSIZE, size);
    }


    public static long getMaxSplitSize(JobContext context) {
        return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);
    }

    public static PathFilter getInputPathFilter(JobContext context) {
        Configuration conf = context.getConfiguration();
        Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null, PathFilter.class);
        return (filterClass != null) ? (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
    }


    protected List<FileStatus> listStatus(JobContext job) throws IOException {
        Path[] dirs = getInputPaths(job);
        if (dirs.length == 0) {
            throw new IOException("No input paths specified in job");
        }

        TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job.getConfiguration());

        boolean recursive = getInputDirRecursive(job);

        List<PathFilter> filters = new ArrayList<PathFilter>();
        filters.add(hiddenFileFilter);
        PathFilter jobFilter = getInputPathFilter(job);
        if (jobFilter != null) {
            filters.add(jobFilter);
        }
        PathFilter inputFilter = new MultiPathFilter(filters);

        List<FileStatus> result = null;

        int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS, DEFAULT_LIST_STATUS_NUM_THREADS);
        StopWatch sw = new StopWatch().start();
        if (numThreads == 1) {
            result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
        } else {
            Iterable<FileStatus> locatedFiles = null;
            try {
                LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(job.getConfiguration(), dirs, recursive, inputFilter, true);
                locatedFiles = locatedFileStatusFetcher.getFileStatuses();
            } catch (InterruptedException e) {
                throw (IOException)
                    new InterruptedIOException(
                    "Interrupted while getting file statuses")
                    .initCause(e);
            }
            result = Lists.newArrayList(locatedFiles);
        }

        sw.stop();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Time taken to get FileStatuses: " + sw.now(TimeUnit.MILLISECONDS));
        }
        LOG.info("Total input files to process : " + result.size());
        return result;
    }

    private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs, PathFilter inputFilter, boolean recursive) throws IOException {
        List<FileStatus> result = new ArrayList<FileStatus>();
        List<IOException> errors = new ArrayList<IOException>();
        for (int i=0; i < dirs.length; ++i) {
            Path p = dirs[i];
            FileSystem fs = p.getFileSystem(job.getConfiguration()); 
            FileStatus[] matches = fs.globStatus(p, inputFilter);
            if (matches == null) {
                errors.add(new IOException("Input path does not exist: " + p));
            } else if (matches.length == 0) {
                errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
            } else {
                for (FileStatus globStat: matches) {
                    if (globStat.isDirectory()) {
                        RemoteIterator<LocatedFileStatus> iter =
                            fs.listLocatedStatus(globStat.getPath());
                        while (iter.hasNext()) {
                            LocatedFileStatus stat = iter.next();
                            if (inputFilter.accept(stat.getPath())) {
                                if (recursive && stat.isDirectory()) {
                                    addInputPathRecursively(result, fs, stat.getPath(),
                                                            inputFilter);
                                } else {
                                    result.add(stat);
                                }
                            }
                        }
                    } else {
                        result.add(globStat);
                    }
                }
            }
        }

        if (!errors.isEmpty()) {
            throw new InvalidInputException(errors);
        }
        return result;
    }

    protected void addInputPathRecursively(List<FileStatus> result, FileSystem fs, Path path, PathFilter inputFilter) 
        throws IOException {
        RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
        while (iter.hasNext()) {
            LocatedFileStatus stat = iter.next();
            if (inputFilter.accept(stat.getPath())) {
                if (stat.isDirectory()) {
                    addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
                } else {
                    result.add(stat);
                }
            }
        }
    }

    //产生FileSplit切片
    protected FileSplit makeSplit(Path file, long start, long length, String[] hosts) {
        return new FileSplit(file, start, length, hosts);
    }


    protected FileSplit makeSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) {
        return new FileSplit(file, start, length, hosts, inMemoryHosts);
    }

    //files->split的方法
    //FileInputFormat的getSplits()方法决定: 1个file <-> 1个split
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        StopWatch sw = new StopWatch().start();
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);

        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        List<FileStatus> files = listStatus(job);

        boolean ignoreDirs = !getInputDirRecursive(job) && job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
        for (FileStatus file: files) {
            if (ignoreDirs && file.isDirectory()) {
                continue;
            }
            Path path = file.getPath();
            long length = file.getLen();
            if (length != 0) {
                BlockLocation[] blkLocations;
                if (file instanceof LocatedFileStatus) {
                    blkLocations = ((LocatedFileStatus) file).getBlockLocations();
                } else {
                    FileSystem fs = path.getFileSystem(job.getConfiguration());
                    blkLocations = fs.getFileBlockLocations(file, 0, length);
                }
                if (isSplitable(job, path)) {
                    long blockSize = file.getBlockSize();
                    long splitSize = computeSplitSize(blockSize, minSize, maxSize);

                    long bytesRemaining = length;
                    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                        int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                        splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                                             blkLocations[blkIndex].getHosts(),
                                             blkLocations[blkIndex].getCachedHosts()));
                        bytesRemaining -= splitSize;
                    }

                    if (bytesRemaining != 0) {
                        int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                        splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                                             blkLocations[blkIndex].getHosts(),
                                             blkLocations[blkIndex].getCachedHosts()));
                    }
                } else { // not splitable
                    if (LOG.isDebugEnabled()) {
                        // Log only if the file is big enough to be splitted
                        if (length > Math.min(file.getBlockSize(), minSize)) {
                            LOG.debug("File is not splittable so no parallelization is possible: " + file.getPath());
                        }
                    }
                    splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts()));
                }
            } else { 
                //Create empty hosts array for zero length files
                splits.add(makeSplit(path, 0, length, new String[0]));
            }
        }
        // Save the number of input files for metrics/loadgen
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
        sw.stop();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Total # of splits generated by getSplits: " + splits.size()
                      + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
        }
        return splits;
    }

    protected long computeSplitSize(long blockSize, long minSize,
                                    long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    protected int getBlockIndex(BlockLocation[] blkLocations, 
                                long offset) {
        for (int i = 0 ; i < blkLocations.length; i++) {
            // is the offset inside this block?
            if ((blkLocations[i].getOffset() <= offset) &&
                (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
                return i;
            }
        }
        BlockLocation last = blkLocations[blkLocations.length -1];
        long fileLength = last.getOffset() + last.getLength() -1;
        throw new IllegalArgumentException("Offset " + offset + 
                                           " is outside of file (0.." +
                                           fileLength + ")");
    }

    public static void setInputPaths(Job job, 
                                     String commaSeparatedPaths
                                    ) throws IOException {
        setInputPaths(job, StringUtils.stringToPath(
            getPathStrings(commaSeparatedPaths)));
    }


    public static void addInputPaths(Job job, 
                                     String commaSeparatedPaths
                                    ) throws IOException {
        for (String str : getPathStrings(commaSeparatedPaths)) {
            addInputPath(job, new Path(str));
        }
    }

    public static void setInputPaths(Job job, 
                                     Path... inputPaths) throws IOException {
        Configuration conf = job.getConfiguration();
        Path path = inputPaths[0].getFileSystem(conf).makeQualified(inputPaths[0]);
        StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
        for(int i = 1; i < inputPaths.length;i++) {
            str.append(StringUtils.COMMA_STR);
            path = inputPaths[i].getFileSystem(conf).makeQualified(inputPaths[i]);
            str.append(StringUtils.escapeString(path.toString()));
        }
        conf.set(INPUT_DIR, str.toString());
    }

    public static void addInputPath(Job job, 
                                    Path path) throws IOException {
        Configuration conf = job.getConfiguration();
        path = path.getFileSystem(conf).makeQualified(path);
        String dirStr = StringUtils.escapeString(path.toString());
        String dirs = conf.get(INPUT_DIR);
        conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
    }

    // This method escapes commas in the glob pattern of the given paths.
    private static String[] getPathStrings(String commaSeparatedPaths) {
        int length = commaSeparatedPaths.length();
        int curlyOpen = 0;
        int pathStart = 0;
        boolean globPattern = false;
        List<String> pathStrings = new ArrayList<String>();

        for (int i=0; i<length; i++) {
            char ch = commaSeparatedPaths.charAt(i);
            switch(ch) {
                case '{' : {
                    curlyOpen++;
                    if (!globPattern) {
                        globPattern = true;
                    }
                    break;
                }
                case '}' : {
                    curlyOpen--;
                    if (curlyOpen == 0 && globPattern) {
                        globPattern = false;
                    }
                    break;
                }
                case ',' : {
                    if (!globPattern) {
                        pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
                        pathStart = i + 1 ;
                    }
                    break;
                }
                default:
                    continue; // nothing special to do for this character
            }
        }
        pathStrings.add(commaSeparatedPaths.substring(pathStart, length));

        return pathStrings.toArray(new String[0]);
    }

    public static Path[] getInputPaths(JobContext context) {
        String dirs = context.getConfiguration().get(INPUT_DIR, "");
        String [] list = StringUtils.split(dirs);
        Path[] result = new Path[list.length];
        for (int i = 0; i < list.length; i++) {
            result[i] = new Path(StringUtils.unEscapeString(list[i]));
        }
        return result;
    }

}

Mapper

模板方法设计模式

  • 流程控制方法

    run()是唯一在外部被调用的方法

  • 具体操作方法

    • setup()
    • map(): 从输入的R
    • clearnup()
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

    public abstract class Context implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
    }

    protected void setup(Context context) throws IOException, InterruptedException {
    }

    //对于某一个具体的MapReduce程序, 应该重载map()方法
    //默认实现相当于将RecordReader中的key,value不做任何处理
    @SuppressWarnings("unchecked")
    protected void map(KEYIN key, VALUEIN value, Context context) 
        throws IOException, InterruptedException {
        context.write((KEYOUT) key, (VALUEOUT) value);
    }

    protected void cleanup(Context context) throws IOException, InterruptedException {
    }

    //每个mapTask执行一次
    public void run(Context context) throws IOException, InterruptedException {
        //每个split执行一次
        setup(context);
        try {
            //每条record执行一次Mapper.map()方法
            while (context.nextKeyValue()) {
                //context.getCurrentKey(): RecordReader中设置的key
                //context.getCurrentValue(): RecordReader中设置的value
                map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
        } finally {
            cleanup(context);
        }
    }
}

[^2]:1个split = 多个record


   转载规则


《》 熊水斌 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
SpringData架构对各种数据库操作的中间件, 提供一套统一的数据访问API, 支持关系型数据库和非关系型数据库 SpringData的主要模块 Spring Data Common: Spring Data的核心模块, 定义了Sp
2022-11-26
下一篇 
导入组件的方式: @Component + @ComponentScan 搭配使用 @Bean @Import ImportSelector ImportBeanDefinitionRegistry FactoryBean 接
2022-11-18
  目录