Skip to content

Refactoring of sequence file iterator. #15

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 20 additions & 71 deletions src/edu/jhu/thrax/lexprob/SequenceFileLexprobTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@

import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Iterator;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;

import edu.jhu.thrax.util.ChainedIterators;

/**
* A base class for lexical probability tables that will be read from a Hadoop sequence file that is
* held on disk. This class serves to hide all the horrible Hadoop filesystem plumbing from more
Expand All @@ -30,14 +32,15 @@ public SequenceFileLexprobTable(Configuration conf, String fileGlob) throws IOEx
fs = FileSystem.get(uri, conf);
files = fs.globStatus(new Path(fileGlob));
if (files.length == 0) throw new IOException("no files found in lexprob glob:" + fileGlob);
Arrays.sort(files); // some implementations (like local FS) don't return a sorted list of files
}

/**
 * Populates this table from the supplied entries. Subclasses decide how the entries are stored.
 *
 * @param entries the table entries to load (presumably those read from the globbed sequence
 *        files -- confirm against the callers)
 */
protected abstract void initialize(Iterable<TableEntry> entries);

/**
 * Looks up the value stored for the pair ({@code car}, {@code cdr}).
 *
 * @param car first element of the pair (presumably a word id -- confirm)
 * @param cdr second element of the pair (presumably a word id -- confirm)
 * @return the stored value for the pair; behavior for an absent pair is implementation-defined
 */
public abstract float get(int car, int cdr);

/**
 * Reports whether this table holds a value for the pair ({@code car}, {@code cdr}).
 *
 * @param car first element of the pair
 * @param cdr second element of the pair
 * @return {@code true} if the pair is present in the table
 */
public abstract boolean contains(int car, int cdr);

/**
* Return an Iterable that will range over all the entries in a series of globbed files.
*
Expand All @@ -46,77 +49,23 @@ public SequenceFileLexprobTable(Configuration conf, String fileGlob) throws IOEx
* @param files an array of FileStatus from getGlobStatus
* @return an Iterable over all entries in all files in the files glob
*/
protected static Iterable<TableEntry> getSequenceFileIterator(FileSystem theFS,
protected static Iterable<TableEntry> getSequenceFileIterator(FileSystem fs,
Configuration conf, FileStatus[] files) {
final LongWritable pair = new LongWritable();
final FloatWritable d = new FloatWritable(0.0f);
final FileStatus[] theFiles = files;
final Configuration theConf = conf;
final FileSystem fs = theFS;

final Iterator<TableEntry> iterator = new Iterator<TableEntry>() {
int fileIndex = 0;
TableEntry lookahead = null;
SequenceFile.Reader reader = null;

public boolean hasNext() {
try {
// if we've already peeked at the next entry, it can be
// returned
if (lookahead != null) return true;
// if the reader is null, we haven't looked at a single
// file yet, so set the reader to read the first file
if (reader == null) reader = new SequenceFile.Reader(fs, theFiles[0].getPath(), theConf);
// reader is not null here, so try to read an entry
boolean gotNew = reader.next(pair, d);
if (gotNew) {
// there was something to read
lookahead = new TableEntry(pair, d);
return true;
}
fileIndex++;
// else, move to the next file
// but if there are no more, return false
if (fileIndex >= theFiles.length) return false;
reader.close();
reader = new SequenceFile.Reader(fs, theFiles[fileIndex].getPath(), theConf);
// new file, so try again
gotNew = reader.next(pair, d);
if (gotNew) {
lookahead = new TableEntry(pair, d);
return true;
}
return false;
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}

public TableEntry next() {
try {
// return the lookahead, if possible
if (lookahead != null) {
TableEntry val = lookahead;
lookahead = null;
return val;
}
boolean gotNew = reader.next(pair, d);
if (gotNew)
return new TableEntry(pair, d);
else
return null;
} catch (IOException e) {
throw new IllegalArgumentException();
}
}

public void remove() {
throw new UnsupportedOperationException();
}
};
return new Iterable<TableEntry>() {

@Override
public Iterator<TableEntry> iterator() {
return iterator;
Iterator<? extends Iterator<TableEntry>> fileIterators = Arrays.asList(files).stream()
.map(file -> {
try {
return new SequenceFile.Reader(fs, file.getPath(), conf);
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.map(seqFile -> new SequenceFileTableEntryIterator(seqFile))
.collect(Collectors.toList()).iterator();
return new ChainedIterators<TableEntry>(fileIterators);
}
};
}
Expand Down
67 changes: 67 additions & 0 deletions src/edu/jhu/thrax/lexprob/SequenceFileTableEntryIterator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package edu.jhu.thrax.lexprob;

import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;

/**
 * An {@link Iterator} over the {@link TableEntry} records stored in a single Hadoop sequence file.
 *
 * <p>The iterator takes ownership of the reader it is given: the reader is closed as soon as the
 * end of the file is reached or a read fails, so callers that drain the iterator do not have to
 * close it themselves. A reader that is abandoned before exhaustion is NOT closed by this class.
 *
 * <p>Not thread-safe: the key/value buffers and the lookahead slot are shared mutable state.
 */
public class SequenceFileTableEntryIterator implements Iterator<TableEntry> {

  private final SequenceFile.Reader reader;

  // Reusable buffers that reader.next() fills in on each read.
  // NOTE(review): presumably TableEntry copies the values out of these buffers -- confirm.
  private final LongWritable pair = new LongWritable();
  private final FloatWritable d = new FloatWritable(0.0f);

  // The entry that has been read from the file but not yet handed out by next().
  private Optional<TableEntry> lookahead = Optional.empty();
  // Set once the reader reported end-of-file (or failed); the reader is closed at that point.
  private boolean finishedReading = false;

  /**
   * @param reader an open reader positioned at the first record to return; it will be closed by
   *        this iterator once all of its records have been consumed
   */
  public SequenceFileTableEntryIterator(SequenceFile.Reader reader) {
    this.reader = reader;
  }

  /**
   * Reads ahead by one record if necessary.
   *
   * @return {@code true} if another entry is available
   * @throws RuntimeException wrapping the {@link IOException} if reading or closing fails
   */
  @Override
  public boolean hasNext() {
    if (!lookahead.isPresent()) {
      lookahead = tryReadNext();
    }
    return lookahead.isPresent();
  }

  /**
   * @return the next entry in the file
   * @throws NoSuchElementException if the file has no more entries
   * @throws RuntimeException wrapping the {@link IOException} if reading or closing fails
   */
  @Override
  public TableEntry next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    TableEntry nextEntry = lookahead.get();
    lookahead = Optional.empty();
    return nextEntry;
  }

  /**
   * Attempts to read one more record, releasing the reader when the file is exhausted.
   *
   * @return the entry that was read, or an empty Optional once the end of the file is reached
   */
  private Optional<TableEntry> tryReadNext() {
    if (finishedReading) {
      return Optional.empty();
    }
    try {
      if (reader.next(pair, d)) {
        return Optional.of(new TableEntry(pair, d));
      }
      // Mark as finished before closing so a failing close() is never retried.
      finishedReading = true;
      reader.close();
      return Optional.empty();
    } catch (IOException e) {
      if (!finishedReading) {
        // The read itself failed: stop using the reader and release its file handle.
        finishedReading = true;
        try {
          reader.close();
        } catch (IOException ignored) {
          // The original read failure is the more useful error to report.
        }
      }
      throw new RuntimeException(e);
    }
  }

}
55 changes: 55 additions & 0 deletions src/edu/jhu/thrax/util/ChainedIterators.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package edu.jhu.thrax.util;

import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
 * Presents a sequence of iterators as one continuous iterator. Elements are produced in order:
 * everything from the first inner iterator, then everything from the second, and so on. Inner
 * iterators that are empty are skipped.
 *
 * <p>Construction eagerly advances to the first non-empty inner iterator, so {@code hasNext()} is
 * invoked on inner iterators from within the constructor.
 *
 * @param <T> the element type
 */
public class ChainedIterators<T> implements Iterator<T> {

  private final Iterator<? extends Iterator<T>> iteratorOfIterators;
  private Iterator<T> currentIterator;
  private boolean finished = false;

  /**
   * @param iteratorOfIterators the inner iterators, in the order they should be drained
   */
  public ChainedIterators(Iterator<? extends Iterator<T>> iteratorOfIterators) {
    this.iteratorOfIterators = iteratorOfIterators;
    moveToNextIterator();
  }

  /**
   * @param iteratorOfIterators a collection of inner iterators, drained in the collection's
   *        iteration order
   */
  public ChainedIterators(Collection<? extends Iterator<T>> iteratorOfIterators) {
    this(iteratorOfIterators.iterator());
  }

  /** @return {@code true} if any remaining inner iterator still has an element */
  @Override
  public boolean hasNext() {
    // Only look for a new inner iterator when the current one has run dry.
    if (!finished && !currentIterator.hasNext()) {
      moveToNextIterator();
    }
    return !finished;
  }

  /**
   * @return the next element of the chain
   * @throws NoSuchElementException if every inner iterator is exhausted
   */
  @Override
  public T next() {
    if (hasNext()) {
      return currentIterator.next();
    }
    throw new NoSuchElementException();
  }

  /**
   * Advances {@code currentIterator} to the next inner iterator that has at least one element,
   * or marks the chain as finished when none remain.
   */
  private void moveToNextIterator() {
    boolean exhausted = true;
    while (exhausted && iteratorOfIterators.hasNext()) {
      currentIterator = iteratorOfIterators.next();
      exhausted = !currentIterator.hasNext();
    }
    finished = exhausted;
  }

}