2
2
3
3
import java .io .IOException ;
4
4
import java .net .URI ;
5
+ import java .util .Arrays ;
5
6
import java .util .Iterator ;
7
+ import java .util .stream .Collectors ;
6
8
7
9
import org .apache .hadoop .conf .Configuration ;
8
10
import org .apache .hadoop .fs .FileStatus ;
9
11
import org .apache .hadoop .fs .FileSystem ;
10
12
import org .apache .hadoop .fs .Path ;
11
- import org .apache .hadoop .io .FloatWritable ;
12
- import org .apache .hadoop .io .LongWritable ;
13
13
import org .apache .hadoop .io .SequenceFile ;
14
14
15
+ import edu .jhu .thrax .util .ChainedIterators ;
16
+
15
17
/**
16
18
* A base class for lexical probability tables that will be read from a Hadoop sequence file that is
17
19
* held on disk. This class serves to hide all the horrible Hadoop filesystem plumbing from more
@@ -30,14 +32,15 @@ public SequenceFileLexprobTable(Configuration conf, String fileGlob) throws IOEx
30
32
fs = FileSystem .get (uri , conf );
31
33
files = fs .globStatus (new Path (fileGlob ));
32
34
if (files .length == 0 ) throw new IOException ("no files found in lexprob glob:" + fileGlob );
35
+ Arrays .sort (files ); // some implementations (like local FS) don't return a sorted list of files
33
36
}
34
37
35
38
protected abstract void initialize (Iterable <TableEntry > entries );
36
39
37
40
public abstract float get (int car , int cdr );
38
41
39
42
public abstract boolean contains (int car , int cdr );
40
-
43
+
41
44
/**
42
45
* Return an Iterable that will range over all the entries in a series of globbed files.
43
46
*
@@ -46,77 +49,23 @@ public SequenceFileLexprobTable(Configuration conf, String fileGlob) throws IOEx
46
49
* @param files an array of FileStatus from getGlobStatus
47
50
* @return an Iterable over all entries in all files in the files glob
48
51
*/
49
- protected static Iterable <TableEntry > getSequenceFileIterator (FileSystem theFS ,
52
+ protected static Iterable <TableEntry > getSequenceFileIterator (FileSystem fs ,
50
53
Configuration conf , FileStatus [] files ) {
51
- final LongWritable pair = new LongWritable ();
52
- final FloatWritable d = new FloatWritable (0.0f );
53
- final FileStatus [] theFiles = files ;
54
- final Configuration theConf = conf ;
55
- final FileSystem fs = theFS ;
56
-
57
- final Iterator <TableEntry > iterator = new Iterator <TableEntry >() {
58
- int fileIndex = 0 ;
59
- TableEntry lookahead = null ;
60
- SequenceFile .Reader reader = null ;
61
-
62
- public boolean hasNext () {
63
- try {
64
- // if we've already peeked at the next entry, it can be
65
- // returned
66
- if (lookahead != null ) return true ;
67
- // if the reader is null, we haven't looked at a single
68
- // file yet, so set the reader to read the first file
69
- if (reader == null ) reader = new SequenceFile .Reader (fs , theFiles [0 ].getPath (), theConf );
70
- // reader is not null here, so try to read an entry
71
- boolean gotNew = reader .next (pair , d );
72
- if (gotNew ) {
73
- // there was something to read
74
- lookahead = new TableEntry (pair , d );
75
- return true ;
76
- }
77
- fileIndex ++;
78
- // else, move to the next file
79
- // but if there are no more, return false
80
- if (fileIndex >= theFiles .length ) return false ;
81
- reader .close ();
82
- reader = new SequenceFile .Reader (fs , theFiles [fileIndex ].getPath (), theConf );
83
- // new file, so try again
84
- gotNew = reader .next (pair , d );
85
- if (gotNew ) {
86
- lookahead = new TableEntry (pair , d );
87
- return true ;
88
- }
89
- return false ;
90
- } catch (IOException e ) {
91
- throw new IllegalArgumentException (e );
92
- }
93
- }
94
-
95
- public TableEntry next () {
96
- try {
97
- // return the lookahead, if possible
98
- if (lookahead != null ) {
99
- TableEntry val = lookahead ;
100
- lookahead = null ;
101
- return val ;
102
- }
103
- boolean gotNew = reader .next (pair , d );
104
- if (gotNew )
105
- return new TableEntry (pair , d );
106
- else
107
- return null ;
108
- } catch (IOException e ) {
109
- throw new IllegalArgumentException ();
110
- }
111
- }
112
-
113
- public void remove () {
114
- throw new UnsupportedOperationException ();
115
- }
116
- };
117
54
return new Iterable <TableEntry >() {
55
+
56
+ @ Override
118
57
public Iterator <TableEntry > iterator () {
119
- return iterator ;
58
+ Iterator <? extends Iterator <TableEntry >> fileIterators = Arrays .asList (files ).stream ()
59
+ .map (file -> {
60
+ try {
61
+ return new SequenceFile .Reader (fs , file .getPath (), conf );
62
+ } catch (IOException e ) {
63
+ throw new RuntimeException (e );
64
+ }
65
+ })
66
+ .map (seqFile -> new SequenceFileTableEntryIterator (seqFile ))
67
+ .collect (Collectors .toList ()).iterator ();
68
+ return new ChainedIterators <TableEntry >(fileIterators );
120
69
}
121
70
};
122
71
}
0 commit comments