From Files to Mapper
The input split: InputSplit
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class InputSplit {
//length of the split, in bytes (B)
//splits are sorted by length so that the largest splits are processed first, minimizing job running time
public abstract long getLength() throws IOException, InterruptedException;
//storage locations of the split (the hostnames of a set of hosts)
public abstract String[] getLocations() throws IOException, InterruptedException;
@Evolving
public SplitLocationInfo[] getLocationInfo() throws IOException {
return null;
}
}
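To make the contract concrete, here is a hypothetical minimal InputSplit subclass (the name RangeSplit and its fields are invented for this sketch); note that real splits such as FileSplit also implement Writable so the framework can serialize them out to the map tasks:
import java.io.IOException;
import org.apache.hadoop.mapreduce.InputSplit;

//Hypothetical minimal subclass, for illustration only. Real splits
//(e.g. FileSplit) also implement Writable so that the framework can
//ship them to map tasks.
public class RangeSplit extends InputSplit {
private final long length;
private final String[] hosts;
public RangeSplit(long length, String[] hosts) {
this.length = length;
this.hosts = hosts;
}
@Override
public long getLength() throws IOException, InterruptedException {
return length;
}
@Override
public String[] getLocations() throws IOException, InterruptedException {
return hosts;
}
}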
FileSplit
- getPath(): the path of the file containing the split
- getStart(): the position of the first byte in the file to process
- getLength(): the number of bytes in the file to process (the split length)
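These getters are typically used inside RecordReader.initialize(), where the generic InputSplit is cast back to a FileSplit. A minimal sketch (the helper describe() is invented for illustration):
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class FileSplitInfo {
//The cast is safe for splits produced by FileInputFormat subclasses.
static String describe(InputSplit split) {
FileSplit fileSplit = (FileSplit) split;
Path path = fileSplit.getPath(); //file containing the split
long start = fileSplit.getStart(); //offset of the first byte to process
long length = fileSplit.getLength(); //number of bytes to process
return path + " [" + start + ", " + (start + length) + ")";
}
}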
CombineFileSplit
InputFormat
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class InputFormat<K, V> {
//compute the logical splits for the job's whole set of input files
//a logical split != a physical block (chunk)
public abstract List<InputSplit> getSplits(JobContext context)
throws IOException, InterruptedException;
//create a record reader for each given split
public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context)
throws IOException, InterruptedException;
}
jobClient.getSplits(): computes the splits (and hence the split count). Each map task passes its input split[^1] to the InputFormat.createRecordReader() method to obtain a RecordReader for that split; a RecordReader is essentially an iterator over records[^2]. The map task uses the RecordReader to generate record key-value pairs, which it passes on to the Mapper.map() function.
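Conceptually, the framework wires these pieces together roughly as follows (a simplified sketch, not the actual MapTask source):
import java.io.IOException;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class RecordLoopSketch {
//Simplified sketch of what a map task does with its split: obtain a
//RecordReader from the InputFormat and iterate over the records.
static <K, V> void feedMapper(InputFormat<K, V> inputFormat,
InputSplit split,
TaskAttemptContext context)
throws IOException, InterruptedException {
RecordReader<K, V> reader = inputFormat.createRecordReader(split, context);
reader.initialize(split, context);
try {
while (reader.nextKeyValue()) {
K key = reader.getCurrentKey(); //becomes Mapper.map()'s key
V value = reader.getCurrentValue(); //becomes Mapper.map()'s value
//...the real framework calls mapper.map(key, value, mapperContext) here
}
} finally {
reader.close();
}
}
}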
FileInputFormat
- Defines which files are included in a job's input
- Generates splits for the input files; dividing each split into records is delegated to its subclasses
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
public static final String INPUT_DIR = "mapreduce.input.fileinputformat.inputdir";
public static final String SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize";
public static final String SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize";
public static final String PATHFILTER_CLASS = "mapreduce.input.pathFilter.class";
public static final String NUM_INPUT_FILES = "mapreduce.input.fileinputformat.numinputfiles";
public static final String INPUT_DIR_RECURSIVE = "mapreduce.input.fileinputformat.input.dir.recursive";
public static final String INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS = "mapreduce.input.fileinputformat.input.dir.nonrecursive.ignore.subdirs";
public static final String LIST_STATUS_NUM_THREADS = "mapreduce.input.fileinputformat.list-status.num-threads";
public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;
private static final Logger LOG = LoggerFactory.getLogger(FileInputFormat.class);
private static final double SPLIT_SLOP = 1.1; // 10% slop
@Deprecated
public enum Counter {
BYTES_READ
}
private static final PathFilter hiddenFileFilter = new PathFilter(){
public boolean accept(Path p){
String name = p.getName();
return !name.startsWith("_") && !name.startsWith(".");
}
};
private static class MultiPathFilter implements PathFilter {
private List<PathFilter> filters;
public MultiPathFilter(List<PathFilter> filters) {
this.filters = filters;
}
public boolean accept(Path path) {
for (PathFilter filter : filters) {
if (!filter.accept(path)) {
return false;
}
}
return true;
}
}
//set whether input directories are processed recursively
public static void setInputDirRecursive(Job job, boolean inputDirRecursive) {
job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE, inputDirRecursive);
}
public static boolean getInputDirRecursive(JobContext job) {
return job.getConfiguration().getBoolean(INPUT_DIR_RECURSIVE, false);
}
protected long getFormatMinSplitSize() {
return 1;
}
//whether a file can be split; if not, the file must be processed as a whole
protected boolean isSplitable(JobContext context, Path filename) {
return true;
}
//set a path filter, used to exclude particular files
public static void setInputPathFilter(Job job, Class<? extends PathFilter> filter) {
job.getConfiguration().setClass(PATHFILTER_CLASS, filter, PathFilter.class);
}
public static void setMinInputSplitSize(Job job, long size) {
job.getConfiguration().setLong(SPLIT_MINSIZE, size);
}
public static long getMinSplitSize(JobContext job) {
return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}
public static void setMaxInputSplitSize(Job job, long size) {
job.getConfiguration().setLong(SPLIT_MAXSIZE, size);
}
public static long getMaxSplitSize(JobContext context) {
return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);
}
public static PathFilter getInputPathFilter(JobContext context) {
Configuration conf = context.getConfiguration();
Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null, PathFilter.class);
return (filterClass != null) ? (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
}
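//listStatus(): enumerates the input files, combining the built-in hiddenFileFilter
//with any user-supplied PathFilter; listing can be parallelized via LIST_STATUS_NUM_THREADS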
protected List<FileStatus> listStatus(JobContext job) throws IOException {
Path[] dirs = getInputPaths(job);
if (dirs.length == 0) {
throw new IOException("No input paths specified in job");
}
TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job.getConfiguration());
boolean recursive = getInputDirRecursive(job);
List<PathFilter> filters = new ArrayList<PathFilter>();
filters.add(hiddenFileFilter);
PathFilter jobFilter = getInputPathFilter(job);
if (jobFilter != null) {
filters.add(jobFilter);
}
PathFilter inputFilter = new MultiPathFilter(filters);
List<FileStatus> result = null;
int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS, DEFAULT_LIST_STATUS_NUM_THREADS);
StopWatch sw = new StopWatch().start();
if (numThreads == 1) {
result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
} else {
Iterable<FileStatus> locatedFiles = null;
try {
LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(job.getConfiguration(), dirs, recursive, inputFilter, true);
locatedFiles = locatedFileStatusFetcher.getFileStatuses();
} catch (InterruptedException e) {
throw (IOException)
new InterruptedIOException(
"Interrupted while getting file statuses")
.initCause(e);
}
result = Lists.newArrayList(locatedFiles);
}
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Time taken to get FileStatuses: " + sw.now(TimeUnit.MILLISECONDS));
}
LOG.info("Total input files to process : " + result.size());
return result;
}
private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs, PathFilter inputFilter, boolean recursive) throws IOException {
List<FileStatus> result = new ArrayList<FileStatus>();
List<IOException> errors = new ArrayList<IOException>();
for (int i=0; i < dirs.length; ++i) {
Path p = dirs[i];
FileSystem fs = p.getFileSystem(job.getConfiguration());
FileStatus[] matches = fs.globStatus(p, inputFilter);
if (matches == null) {
errors.add(new IOException("Input path does not exist: " + p));
} else if (matches.length == 0) {
errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
} else {
for (FileStatus globStat: matches) {
if (globStat.isDirectory()) {
RemoteIterator<LocatedFileStatus> iter =
fs.listLocatedStatus(globStat.getPath());
while (iter.hasNext()) {
LocatedFileStatus stat = iter.next();
if (inputFilter.accept(stat.getPath())) {
if (recursive && stat.isDirectory()) {
addInputPathRecursively(result, fs, stat.getPath(),
inputFilter);
} else {
result.add(stat);
}
}
}
} else {
result.add(globStat);
}
}
}
}
if (!errors.isEmpty()) {
throw new InvalidInputException(errors);
}
return result;
}
protected void addInputPathRecursively(List<FileStatus> result, FileSystem fs, Path path, PathFilter inputFilter)
throws IOException {
RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
while (iter.hasNext()) {
LocatedFileStatus stat = iter.next();
if (inputFilter.accept(stat.getPath())) {
if (stat.isDirectory()) {
addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
} else {
result.add(stat);
}
}
}
}
//create a FileSplit
protected FileSplit makeSplit(Path file, long start, long length, String[] hosts) {
return new FileSplit(file, start, length, hosts);
}
protected FileSplit makeSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) {
return new FileSplit(file, start, length, hosts, inMemoryHosts);
}
//the files -> splits logic
//in FileInputFormat.getSplits(), every split comes from a single file (splits never span files); a file larger than the split size yields multiple splits
public List<InputSplit> getSplits(JobContext job) throws IOException {
StopWatch sw = new StopWatch().start();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
boolean ignoreDirs = !getInputDirRecursive(job) && job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
for (FileStatus file: files) {
if (ignoreDirs && file.isDirectory()) {
continue;
}
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
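//keep carving off splitSize-byte splits while the remainder exceeds
//SPLIT_SLOP (1.1) x splitSize; a final chunk up to 10% larger than
//splitSize becomes one last split rather than a tiny extra one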
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
if (LOG.isDebugEnabled()) {
// Log only if the file is big enough to be splitted
if (length > Math.min(file.getBlockSize(), minSize)) {
LOG.debug("File is not splittable so no parallelization is possible: " + file.getPath());
}
}
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits;
}
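//splitSize = max(minSize, min(maxSize, blockSize)); with the defaults
//(minSize = 1, maxSize = Long.MAX_VALUE) this equals the block size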
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
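//returns the index of the block that contains the given file offset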
protected int getBlockIndex(BlockLocation[] blkLocations,
long offset) {
for (int i = 0 ; i < blkLocations.length; i++) {
// is the offset inside this block?
if ((blkLocations[i].getOffset() <= offset) &&
(offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
return i;
}
}
BlockLocation last = blkLocations[blkLocations.length -1];
long fileLength = last.getOffset() + last.getLength() -1;
throw new IllegalArgumentException("Offset " + offset +
" is outside of file (0.." +
fileLength + ")");
}
public static void setInputPaths(Job job,
String commaSeparatedPaths
) throws IOException {
setInputPaths(job, StringUtils.stringToPath(
getPathStrings(commaSeparatedPaths)));
}
public static void addInputPaths(Job job,
String commaSeparatedPaths
) throws IOException {
for (String str : getPathStrings(commaSeparatedPaths)) {
addInputPath(job, new Path(str));
}
}
public static void setInputPaths(Job job,
Path... inputPaths) throws IOException {
Configuration conf = job.getConfiguration();
Path path = inputPaths[0].getFileSystem(conf).makeQualified(inputPaths[0]);
StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
for(int i = 1; i < inputPaths.length;i++) {
str.append(StringUtils.COMMA_STR);
path = inputPaths[i].getFileSystem(conf).makeQualified(inputPaths[i]);
str.append(StringUtils.escapeString(path.toString()));
}
conf.set(INPUT_DIR, str.toString());
}
public static void addInputPath(Job job,
Path path) throws IOException {
Configuration conf = job.getConfiguration();
path = path.getFileSystem(conf).makeQualified(path);
String dirStr = StringUtils.escapeString(path.toString());
String dirs = conf.get(INPUT_DIR);
conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
}
// This method escapes commas in the glob pattern of the given paths.
private static String[] getPathStrings(String commaSeparatedPaths) {
int length = commaSeparatedPaths.length();
int curlyOpen = 0;
int pathStart = 0;
boolean globPattern = false;
List<String> pathStrings = new ArrayList<String>();
for (int i=0; i<length; i++) {
char ch = commaSeparatedPaths.charAt(i);
switch(ch) {
case '{' : {
curlyOpen++;
if (!globPattern) {
globPattern = true;
}
break;
}
case '}' : {
curlyOpen--;
if (curlyOpen == 0 && globPattern) {
globPattern = false;
}
break;
}
case ',' : {
if (!globPattern) {
pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
pathStart = i + 1 ;
}
break;
}
default:
continue; // nothing special to do for this character
}
}
pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
return pathStrings.toArray(new String[0]);
}
public static Path[] getInputPaths(JobContext context) {
String dirs = context.getConfiguration().get(INPUT_DIR, "");
String [] list = StringUtils.split(dirs);
Path[] result = new Path[list.length];
for (int i = 0; i < list.length; i++) {
result[i] = new Path(StringUtils.unEscapeString(list[i]));
}
return result;
}
}
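Putting the static helpers together, a driver might configure the input side like this (a minimal sketch; the paths and sizes are illustrative, and TextInputFormat is just one concrete FileInputFormat subclass):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class InputConfigSketch {
public static void main(String[] args) throws Exception {
Job job = Job.getInstance(new Configuration(), "input-config-demo");
job.setInputFormatClass(TextInputFormat.class);
//register an input path (sets/extends mapreduce.input.fileinputformat.inputdir)
FileInputFormat.addInputPath(job, new Path("/data/in"));
FileInputFormat.setInputDirRecursive(job, true);
//constrain the split size: splitSize = max(min, min(max, blockSize))
FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024); //64 MB
FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024); //256 MB
//...set mapper/reducer/output and submit as usual
}
}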
Mapper
Template Method design pattern:
- Flow-control method: run() is the only method invoked from outside.
- Concrete operation methods: setup(), map() (processes each key-value pair from the input RecordReader), cleanup().
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public abstract class Context implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
protected void setup(Context context) throws IOException, InterruptedException {
}
//a concrete MapReduce program should override the map() method
//the default implementation passes the RecordReader's key and value through unchanged
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value, Context context)
throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}
protected void cleanup(Context context) throws IOException, InterruptedException {
}
//executed once per map task
public void run(Context context) throws IOException, InterruptedException {
//setup() runs once per split (one split per map task)
setup(context);
try {
//Mapper.map() is invoked once per record
while (context.nextKeyValue()) {
//context.getCurrentKey(): the key set by the RecordReader
//context.getCurrentValue(): the value set by the RecordReader
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
}
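As a concrete example, a word-count-style Mapper overrides map() and leaves run(), setup() and cleanup() alone (the class name and tokenization here are ours; the pattern itself is standard):
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//A minimal concrete Mapper: the inherited run() calls map() once per record.
public class TokenCounterMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private static final IntWritable ONE = new IntWritable(1);
private final Text word = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//key: byte offset of the line; value: the line itself (TextInputFormat)
for (String token : value.toString().split("\\s+")) {
if (!token.isEmpty()) {
word.set(token);
context.write(word, ONE); //emit (word, 1)
}
}
}
}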
[^2]: one split = many records