Skip to content
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 89 additions & 27 deletions src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
import com.hadoop.mapreduce.LzoIndexOutputFormat;
import com.hadoop.mapreduce.LzoSplitInputFormat;
import com.hadoop.mapreduce.LzoSplitRecordReader;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -25,57 +27,112 @@

public class DistributedLzoIndexer extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(DistributedLzoIndexer.class);
private final String LZO_EXTENSION = new LzopCodec().getDefaultExtension();

// File extension produced by the LZOP codec (".lzo"); used to recognize
// candidate files while walking the input paths.
private final String LZO_EXTENSION = new LzopCodec().getDefaultExtension();

// Configuration keys and their defaults for the optional behaviors.
private final String SKIP_INDEXING_SMALL_FILES_KEY = "skip_indexing_small_files";
private final String RECURSIVE_INDEXING_KEY = "recursive_indexing";
private final boolean SKIP_INDEXING_SMALL_FILES_DEFAULT = false;
private final boolean RECURSIVE_INDEXING_DEFAULT = true;
// Effective settings; re-read from the job Configuration in run().
private boolean skipIndexingSmallFiles = this.SKIP_INDEXING_SMALL_FILES_DEFAULT;
private boolean recursiveIndexing = this.RECURSIVE_INDEXING_DEFAULT;

// NOTE(review): this initializer runs at construction time, before
// ToolRunner invokes setConf(), so getConf() presumably returns null here;
// run() reassigns the field — consider dropping the initializer.
private Configuration conf = getConf();

/**
 * Filter that rejects any path whose name ends in "/_temporary"
 * (in-progress MapReduce output directories).
 */
private final PathFilter nonTemporaryFilter = new PathFilter() {
  @Override
  public boolean accept(Path candidate) {
    String name = candidate.toString();
    return !name.endsWith("/_temporary");
  }
};

private void walkPath(Path path, PathFilter pathFilter, List<Path> accumulator) {
/**
* Accepts paths pointing to files with length greater than a block size.
*/
private final PathFilter bigFileFilter = new PathFilter() {
@Override
public boolean accept(Path path) {
FileStatus status;
try {
FileSystem fs = path.getFileSystem(getConf());
status = fs.getFileStatus(path);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not 100% happy with having to query the file status again. I know the PathFilter API doesn't help you there. I think it's perfectly fine to have a simple helper method here rather than using the PathFilter API.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used a PathFilter here because I was attempting to be consistent with the nonTemporary PathFilter use. However, since the bigFileFilter isn't passed to a listStatus() call, a simple helper method does seem like it'd be a better choice.

} catch (IOException e) {
LOG.info("Unable to get status of path " + path);
return false;
}
return status.getLen() >= status.getBlockSize() ? true : false;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is too restrictive. With high compression levels and a large FS block, you might still want to split the FS block to get a spill-less mapper. I would make the threshold configurable.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. Does a default value (if user's config has a bunk value like "abc") of block size make sense?

}
};

private void visitPath(Path path, PathFilter pathFilter, List<Path> accumulator, boolean recursive) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we have lzoRecursiveIndexing as a member variable, we don't need argument "recursive" for this method.

try {
FileSystem fs = path.getFileSystem(getConf());
FileSystem fs = path.getFileSystem(this.conf);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: the style of this code does not use the "this." notation. Can we remove the "this" reference from the changes?

FileStatus fileStatus = fs.getFileStatus(path);

if (fileStatus.isDir()) {
FileStatus[] children = fs.listStatus(path, pathFilter);
for (FileStatus childStatus : children) {
walkPath(childStatus.getPath(), pathFilter, accumulator);
}
} else if (path.toString().endsWith(LZO_EXTENSION)) {
Path lzoIndexPath = path.suffix(LzoIndex.LZO_INDEX_SUFFIX);
if (fs.exists(lzoIndexPath)) {
// If the index exists and is of nonzero size, we're already done.
// We re-index a file with a zero-length index, because every file has at least one block.
if (fs.getFileStatus(lzoIndexPath).getLen() > 0) {
LOG.info("[SKIP] LZO index file already exists for " + path);
return;
} else {
LOG.info("Adding LZO file " + path + " to indexing list (index file exists but is zero length)");
accumulator.add(path);
if (fileStatus.isDirectory()) {
if (recursive) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This requires a discussion. This behavior of "recursive" is a little surprising to me. If I'm not mistaken (from things like FileSystem API), recursive=false means still processing direct file children of the directory but not traversing into subdirectories. But this patch would skip the directory entirely.

I think it may be good to be consistent and consider direct file children still. Thoughts? Either way, I think we need some comments in the code to clarify what it means.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're correct. We should try to be consistent with hadoop's definition of recursive and still process depth = 1 levels of child files.

FileStatus[] children = fs.listStatus(path, pathFilter);
for (FileStatus childStatus : children) {
visitPath(childStatus.getPath(), pathFilter, accumulator, recursive);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the recursive case, I think we are calling Path.getFileSystem() and getFileStatus() redundantly. I know this is carried over from the original, but I think it's a little wasteful. How about something like this?

private void visitPath(FileStatus fileStatus, FileSystem fs, PathFilter pathFilter, List<Path> accumulator) {
  ...
  if (fileStatus.isDirectory()) {
    if (recursive) {
      FileStatus[] children = fs.listStatus(fileStatus.getPath(), pathFilter);
      for (FileStatus childStatus : children) {
        visitPath(childStatus, fs, pathFilter, accumulator);
    ...
}

}
} else {
// If no index exists, we need to index the file.
LOG.info("Adding LZO file " + path + " to indexing list (no index currently exists)");
accumulator.add(path);
LOG.info("[SKIP] Path " + path + " is a directory and recursion is not enabled.");
}
} else if (shouldIndexPath(path, fs)) {
accumulator.add(path);
}
} catch (IOException ioe) {
LOG.warn("Error walking path: " + path, ioe);
}
}

private boolean shouldIndexPath(Path path, FileSystem fs) throws IOException {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this API with FileStatus instead of Path in order to avoid redundant RPC for getFileStatus. We already received FileStatus from listStatus.

if (path.toString().endsWith(LZO_EXTENSION)) {
if (this.skipIndexingSmallFiles && !this.bigFileFilter.accept(path)) {
LOG.info("[SKIP] Skip indexing small files enabled and " + path + " is too small");
return false;
}

Path lzoIndexPath = path.suffix(LzoIndex.LZO_INDEX_SUFFIX);
if (fs.exists(lzoIndexPath)) {
// If the index exists and is of nonzero size, we're already done.
// We re-index a file with a zero-length index, because every file has at least one block.
if (fs.getFileStatus(path).getLen() > 0) {
LOG.info("[SKIP] LZO index file already exists for " + path);
return false;
} else {
LOG.info("Adding LZO file " + path + " to indexing list (index file exists but is zero length)");
return true;
}
} else {
// If no index exists, we need to index the file.
LOG.info("Adding LZO file " + path + " to indexing list (no index currently exists)");
return true;
}
}
return false;
}

public int run(String[] args) throws Exception {
if (args.length == 0 || (args.length == 1 && "--help".equals(args[0]))) {
printUsage();
ToolRunner.printGenericCommandUsage(System.err);
return -1;
}

this.conf = getConf();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this redundant assignment?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remnant of the previous approach where the Configuration wasn't set at class instantiation, will remove in next commit.


this.skipIndexingSmallFiles =
this.conf.getBoolean(SKIP_INDEXING_SMALL_FILES_KEY, this.SKIP_INDEXING_SMALL_FILES_DEFAULT);

// Find paths to index based on recursive/not
this.recursiveIndexing = this.conf.getBoolean(RECURSIVE_INDEXING_KEY, this.RECURSIVE_INDEXING_DEFAULT);
List<Path> inputPaths = new ArrayList<Path>();
for (String strPath: args) {
walkPath(new Path(strPath), nonTemporaryFilter, inputPaths);
for (String strPath : args) {
visitPath(new Path(strPath), nonTemporaryFilter, inputPaths, this.recursiveIndexing);
}

if (inputPaths.isEmpty()) {
Expand All @@ -84,7 +141,7 @@ public int run(String[] args) throws Exception {
return 0;
}

Job job = new Job(getConf());
Job job = new Job(this.conf);
job.setJobName("Distributed Lzo Indexer " + Arrays.toString(args));

job.setOutputKeyClass(Path.class);
Expand Down Expand Up @@ -134,7 +191,12 @@ public static void main(String[] args) throws Exception {
System.exit(exitCode);
}

public static void printUsage() {
System.err.println("Usage: hadoop jar /path/to/this/jar com.hadoop.compression.lzo.DistributedLzoIndexer <file.lzo | directory> [file2.lzo directory3 ...]");
/**
 * Prints command-line usage and the recognized configuration options
 * (with allowed values and defaults) to stderr.
 */
public void printUsage() {
  StringBuilder usage = new StringBuilder();
  usage.append("Command: hadoop jar /path/to/this/jar com.hadoop.compression.lzo.DistributedLzoIndexer <file.lzo | directory> [file2.lzo directory3 ...]");
  usage.append("\nConfiguration options: [values] <default> description");
  usage.append("\n").append(SKIP_INDEXING_SMALL_FILES_KEY)
      .append(" [true,false] <").append(SKIP_INDEXING_SMALL_FILES_DEFAULT)
      .append("> When indexing, skip files smaller than a block in size.");
  usage.append("\n").append(RECURSIVE_INDEXING_KEY)
      .append(" [true,false] <").append(RECURSIVE_INDEXING_DEFAULT)
      .append("> Look for files to index recursively from paths on command line.");
  System.err.println(usage.toString());
}
}