@@ -215,12 +215,12 @@ protected void processPath(PathData item) throws IOException {
215
215
}
216
216
217
217
protected class TextRecordInputStream extends InputStream {
218
- SequenceFile .Reader r ;
219
- Object key ;
220
- Object val ;
218
+ private final SequenceFile .Reader r ;
219
+ private Object key ;
220
+ private Object val ;
221
221
222
- DataInputBuffer inbuf ;
223
- DataOutputBuffer outbuf ;
222
+ private final DataInputBuffer inbuf ;
223
+ private final DataOutputBuffer outbuf ;
224
224
225
225
public TextRecordInputStream (FileStatus f ) throws IOException {
226
226
final Path fpath = f .getPath ();
@@ -237,30 +237,67 @@ public TextRecordInputStream(FileStatus f) throws IOException {
237
237
public int read () throws IOException {
238
238
int ret ;
239
239
if (null == inbuf || -1 == (ret = inbuf .read ())) {
240
- key = r .next (key );
241
- if (key == null ) {
242
- return -1 ;
240
+ if (!readNextFromSequenceFile ()) {
241
+ ret = -1 ;
243
242
} else {
244
- val = r . getCurrentValue ( val );
243
+ ret = inbuf . read ( );
245
244
}
246
- byte [] tmp = key .toString ().getBytes (StandardCharsets .UTF_8 );
247
- outbuf .write (tmp , 0 , tmp .length );
248
- outbuf .write ('\t' );
249
- tmp = val .toString ().getBytes (StandardCharsets .UTF_8 );
250
- outbuf .write (tmp , 0 , tmp .length );
251
- outbuf .write ('\n' );
252
- inbuf .reset (outbuf .getData (), outbuf .getLength ());
253
- outbuf .reset ();
254
- ret = inbuf .read ();
255
245
}
256
246
return ret ;
257
247
}
258
248
249
+ @ Override
250
+ public int read (byte [] dest , int destPos , int destLen ) throws IOException {
251
+ validateInputStreamReadArguments (dest , destPos , destLen );
252
+
253
+ if (destLen == 0 ) {
254
+ return 0 ;
255
+ }
256
+
257
+ int bytesRead = 0 ;
258
+ while (destLen > 0 ) {
259
+ // Attempt to copy buffered data.
260
+ int copyLen = inbuf .read (dest , destPos , destLen );
261
+ if (-1 == copyLen ) {
262
+ // There was no buffered data.
263
+ if (!readNextFromSequenceFile ()) {
264
+ // There is also no data remaining in the file.
265
+ break ;
266
+ }
267
+ // Reattempt copy now that we have buffered data.
268
+ copyLen = inbuf .read (dest , destPos , destLen );
269
+ }
270
+ bytesRead += copyLen ;
271
+ destPos += copyLen ;
272
+ destLen -= copyLen ;
273
+ }
274
+
275
+ return bytesRead > 0 ? bytesRead : -1 ;
276
+ }
277
+
259
278
@ Override
260
279
public void close () throws IOException {
261
280
r .close ();
262
281
super .close ();
263
282
}
283
+
284
+ private boolean readNextFromSequenceFile () throws IOException {
285
+ key = r .next (key );
286
+ if (key == null ) {
287
+ return false ;
288
+ } else {
289
+ val = r .getCurrentValue (val );
290
+ }
291
+ byte [] tmp = key .toString ().getBytes (StandardCharsets .UTF_8 );
292
+ outbuf .write (tmp , 0 , tmp .length );
293
+ outbuf .write ('\t' );
294
+ tmp = val .toString ().getBytes (StandardCharsets .UTF_8 );
295
+ outbuf .write (tmp , 0 , tmp .length );
296
+ outbuf .write ('\n' );
297
+ inbuf .reset (outbuf .getData (), outbuf .getLength ());
298
+ outbuf .reset ();
299
+ return true ;
300
+ }
264
301
}
265
302
266
303
/**
@@ -270,10 +307,11 @@ public void close() throws IOException {
270
307
protected static class AvroFileInputStream extends InputStream {
271
308
private int pos ;
272
309
private byte [] buffer ;
273
- private ByteArrayOutputStream output ;
274
- private FileReader <?> fileReader ;
275
- private DatumWriter <Object > writer ;
276
- private JsonEncoder encoder ;
310
+ private final ByteArrayOutputStream output ;
311
+ private final FileReader <?> fileReader ;
312
+ private final DatumWriter <Object > writer ;
313
+ private final JsonEncoder encoder ;
314
+ private final byte [] finalSeparator ;
277
315
278
316
public AvroFileInputStream (FileStatus status ) throws IOException {
279
317
pos = 0 ;
@@ -286,31 +324,96 @@ public AvroFileInputStream(FileStatus status) throws IOException {
286
324
writer = new GenericDatumWriter <Object >(schema );
287
325
output = new ByteArrayOutputStream ();
288
326
encoder = EncoderFactory .get ().jsonEncoder (schema , output );
327
+ finalSeparator = System .getProperty ("line.separator" ).getBytes (StandardCharsets .UTF_8 );
289
328
}
290
329
291
330
/**
292
331
* Read a single byte from the stream.
293
332
*/
294
333
@ Override
295
334
public int read () throws IOException {
335
+ if (buffer == null ) {
336
+ return -1 ;
337
+ }
338
+
296
339
if (pos < buffer .length ) {
297
340
return buffer [pos ++];
298
341
}
342
+
299
343
if (!fileReader .hasNext ()) {
344
+ // Unset buffer to signal EOF on future calls.
345
+ buffer = null ;
300
346
return -1 ;
301
347
}
348
+
302
349
writer .write (fileReader .next (), encoder );
303
350
encoder .flush ();
351
+
304
352
if (!fileReader .hasNext ()) {
305
- // Write a new line after the last Avro record.
306
- output .write (System .getProperty ("line.separator" )
307
- .getBytes (StandardCharsets .UTF_8 ));
308
- output .flush ();
353
+ if (buffer .length > 0 ) {
354
+ // Write a new line after the last Avro record.
355
+ output .write (finalSeparator );
356
+ output .flush ();
357
+ }
309
358
}
359
+
360
+ swapBuffer ();
361
+ return read ();
362
+ }
363
+
364
+ @ Override
365
+ public int read (byte [] dest , int destPos , int destLen ) throws IOException {
366
+ validateInputStreamReadArguments (dest , destPos , destLen );
367
+
368
+ if (destLen == 0 ) {
369
+ return 0 ;
370
+ }
371
+
372
+ if (buffer == null ) {
373
+ return -1 ;
374
+ }
375
+
376
+ int bytesRead = 0 ;
377
+ while (destLen > 0 && buffer != null ) {
378
+ if (pos < buffer .length ) {
379
+ // We have buffered data available, either from the Avro file or the final separator.
380
+ int copyLen = Math .min (buffer .length - pos , destLen );
381
+ System .arraycopy (buffer , pos , dest , destPos , copyLen );
382
+ pos += copyLen ;
383
+ bytesRead += copyLen ;
384
+ destPos += copyLen ;
385
+ destLen -= copyLen ;
386
+ } else if (buffer == finalSeparator ) {
387
+ // There is no buffered data, and the last buffer processed was the final separator.
388
+ // Unset buffer to signal EOF on future calls.
389
+ buffer = null ;
390
+ } else if (!fileReader .hasNext ()) {
391
+ if (buffer .length > 0 ) {
392
+ // There is no data remaining in the file. Get ready to write the final separator on
393
+ // the next iteration.
394
+ buffer = finalSeparator ;
395
+ pos = 0 ;
396
+ } else {
397
+ // We never read data into the buffer. This must be an empty file.
398
+ // Immediate EOF, no separator needed.
399
+ buffer = null ;
400
+ return -1 ;
401
+ }
402
+ } else {
403
+ // Read the next data from the file into the buffer.
404
+ writer .write (fileReader .next (), encoder );
405
+ encoder .flush ();
406
+ swapBuffer ();
407
+ }
408
+ }
409
+
410
+ return bytesRead ;
411
+ }
412
+
413
+ private void swapBuffer () {
310
414
pos = 0 ;
311
415
buffer = output .toByteArray ();
312
416
output .reset ();
313
- return read ();
314
417
}
315
418
316
419
/**
@@ -323,4 +426,14 @@ public void close() throws IOException {
323
426
super .close ();
324
427
}
325
428
}
429
+
430
+ private static void validateInputStreamReadArguments (byte [] dest , int destPos , int destLen )
431
+ throws IOException {
432
+ if (dest == null ) {
433
+ throw new NullPointerException ("null destination buffer" );
434
+ } else if (destPos < 0 || destLen < 0 || destLen > dest .length - destPos ) {
435
+ throw new IndexOutOfBoundsException (String .format (
436
+ "invalid destination buffer range: destPos = %d, destLen = %d" , destPos , destLen ));
437
+ }
438
+ }
326
439
}
0 commit comments