2323import org .apache .paimon .compact .CompactManager ;
2424import org .apache .paimon .compression .CompressOptions ;
2525import org .apache .paimon .data .InternalRow ;
26- import org .apache .paimon .data .serializer .InternalRowSerializer ;
2726import org .apache .paimon .disk .IOManager ;
2827import org .apache .paimon .disk .RowBuffer ;
2928import org .apache .paimon .fileindex .FileIndexOptions ;
4544import org .apache .paimon .utils .BatchRecordWriter ;
4645import org .apache .paimon .utils .CommitIncrement ;
4746import org .apache .paimon .utils .IOFunction ;
48- import org .apache .paimon .utils .IOUtils ;
4947import org .apache .paimon .utils .LongCounter ;
5048import org .apache .paimon .utils .Preconditions ;
5149import org .apache .paimon .utils .RecordWriter ;
50+ import org .apache .paimon .utils .SinkWriter ;
51+ import org .apache .paimon .utils .SinkWriter .BufferedSinkWriter ;
52+ import org .apache .paimon .utils .SinkWriter .DirectSinkWriter ;
5253
5354import javax .annotation .Nullable ;
5455
55- import java .io .IOException ;
5656import java .util .ArrayList ;
5757import java .util .Collection ;
5858import java .util .Collections ;
@@ -72,25 +72,25 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
7272 private final RowType writeSchema ;
7373 private final DataFilePathFactory pathFactory ;
7474 private final CompactManager compactManager ;
75- private final IOFunction <List <DataFileMeta >, RecordReaderIterator <InternalRow >> bucketFileRead ;
75+ private final IOFunction <List <DataFileMeta >, RecordReaderIterator <InternalRow >> dataFileRead ;
7676 private final boolean forceCompact ;
7777 private final boolean asyncFileWrite ;
7878 private final boolean statsDenseStore ;
7979 private final List <DataFileMeta > newFiles ;
8080 private final List <DataFileMeta > deletedFiles ;
8181 private final List <DataFileMeta > compactBefore ;
8282 private final List <DataFileMeta > compactAfter ;
83- @ Nullable private CompactDeletionFile compactDeletionFile ;
8483 private final LongCounter seqNumCounter ;
8584 private final String fileCompression ;
8685 private final CompressOptions spillCompression ;
87- private SinkWriter sinkWriter ;
8886 private final SimpleColStatsCollector .Factory [] statsCollectors ;
8987 @ Nullable private final IOManager ioManager ;
9088 private final FileIndexOptions fileIndexOptions ;
89+ private final MemorySize maxDiskSize ;
9190
91+ @ Nullable private CompactDeletionFile compactDeletionFile ;
92+ private SinkWriter <InternalRow > sinkWriter ;
9293 private MemorySegmentPool memorySegmentPool ;
93- private final MemorySize maxDiskSize ;
9494
9595 public AppendOnlyWriter (
9696 FileIO fileIO ,
@@ -101,7 +101,7 @@ public AppendOnlyWriter(
101101 RowType writeSchema ,
102102 long maxSequenceNumber ,
103103 CompactManager compactManager ,
104- IOFunction <List <DataFileMeta >, RecordReaderIterator <InternalRow >> bucketFileRead ,
104+ IOFunction <List <DataFileMeta >, RecordReaderIterator <InternalRow >> dataFileRead ,
105105 boolean forceCompact ,
106106 DataFilePathFactory pathFactory ,
107107 @ Nullable CommitIncrement increment ,
@@ -121,7 +121,7 @@ public AppendOnlyWriter(
121121 this .writeSchema = writeSchema ;
122122 this .pathFactory = pathFactory ;
123123 this .compactManager = compactManager ;
124- this .bucketFileRead = bucketFileRead ;
124+ this .dataFileRead = dataFileRead ;
125125 this .forceCompact = forceCompact ;
126126 this .asyncFileWrite = asyncFileWrite ;
127127 this .statsDenseStore = statsDenseStore ;
@@ -139,8 +139,8 @@ public AppendOnlyWriter(
139139
140140 this .sinkWriter =
141141 useWriteBuffer
142- ? new BufferedSinkWriter (spillable , maxDiskSize , spillCompression )
143- : new DirectSinkWriter ( );
142+ ? createBufferedSinkWriter (spillable )
143+ : new DirectSinkWriter <>( this :: createRollingRowWriter );
144144
145145 if (increment != null ) {
146146 newFiles .addAll (increment .newFilesIncrement ().newFiles ());
@@ -151,6 +151,18 @@ public AppendOnlyWriter(
151151 }
152152 }
153153
/**
 * Creates a spill-aware {@code BufferedSinkWriter} for append-only rows.
 *
 * <p>The two {@code t -> t} lambdas are the to-buffer / from-buffer row converters — identity
 * here because append-only writing buffers {@code InternalRow} values unchanged.
 * NOTE(review): converter-parameter semantics inferred from the argument positions; confirm
 * against {@code SinkWriter.BufferedSinkWriter}'s constructor javadoc.
 *
 * @param spillable whether the in-memory row buffer may spill to disk (bounded by
 *     {@code maxDiskSize}, compressed with {@code spillCompression})
 */
private BufferedSinkWriter<InternalRow> createBufferedSinkWriter(boolean spillable) {
    return new BufferedSinkWriter<>(
            this::createRollingRowWriter,
            t -> t,
            t -> t,
            ioManager,
            writeSchema,
            spillable,
            maxDiskSize,
            spillCompression);
}
165+
154166 @ Override
155167 public void write (InternalRow rowData ) throws Exception {
156168 Preconditions .checkArgument (
@@ -178,7 +190,7 @@ public void writeBundle(BundleRecords bundle) throws Exception {
178190 write (row );
179191 }
180192 } else {
181- ((DirectSinkWriter ) sinkWriter ).writeBundle (bundle );
193+ ((DirectSinkWriter <?> ) sinkWriter ).writeBundle (bundle );
182194 }
183195 }
184196
@@ -252,16 +264,16 @@ public void close() throws Exception {
252264 }
253265
254266 public void toBufferedWriter () throws Exception {
255- if (sinkWriter != null && !sinkWriter .bufferSpillableWriter () && bucketFileRead != null ) {
267+ if (sinkWriter != null && !sinkWriter .bufferSpillableWriter () && dataFileRead != null ) {
256268 // fetch the written results
257269 List <DataFileMeta > files = sinkWriter .flush ();
258270
259271 sinkWriter .close ();
260- sinkWriter = new BufferedSinkWriter (true , maxDiskSize , spillCompression );
272+ sinkWriter = createBufferedSinkWriter (true );
261273 sinkWriter .setMemoryPool (memorySegmentPool );
262274
263275 // rewrite small files
264- try (RecordReaderIterator <InternalRow > reader = bucketFileRead .apply (files )) {
276+ try (RecordReaderIterator <InternalRow > reader = dataFileRead .apply (files )) {
265277 while (reader .hasNext ()) {
266278 sinkWriter .write (reader .next ());
267279 }
@@ -356,7 +368,7 @@ public void flushMemory() throws Exception {
356368 @ VisibleForTesting
357369 public RowBuffer getWriteBuffer () {
358370 if (sinkWriter instanceof BufferedSinkWriter ) {
359- return ((BufferedSinkWriter ) sinkWriter ).writeBuffer ;
371+ return ((BufferedSinkWriter <?> ) sinkWriter ).rowBuffer () ;
360372 } else {
361373 return null ;
362374 }
@@ -366,176 +378,4 @@ public RowBuffer getWriteBuffer() {
/**
 * Returns the data files newly written by this writer (package-private inspection hook).
 *
 * <p>Exposes the internal {@code newFiles} list directly — no defensive copy is made.
 */
List<DataFileMeta> getNewFiles() {
    return newFiles;
}
369-
/** Internal interface to Sink Data from input. */
private interface SinkWriter {

    /**
     * Writes one row. NOTE(review): the boolean presumably signals whether the row was
     * accepted (e.g. buffer not full) — confirm against the implementations.
     */
    boolean write(InternalRow data) throws IOException;

    /** Flushes all buffered/pending rows to data files and returns their metadata. */
    List<DataFileMeta> flush() throws IOException;

    /** Attempts to release memory (e.g. by spilling); returns whether anything was freed. */
    boolean flushMemory() throws IOException;

    /** Bytes of pooled memory currently held by this writer. */
    long memoryOccupancy();

    /** Releases all resources without flushing; pending data is discarded or aborted. */
    void close();

    /** Supplies the shared memory pool; buffered implementations allocate from it. */
    void setMemoryPool(MemorySegmentPool memoryPool);

    /** Whether this writer buffers rows and may spill them to disk. */
    boolean bufferSpillableWriter();
}
387-
/**
 * Directly sink data to file, no memory cache here, use OrcWriter/ParquetWrite/etc directly
 * write data. May cause out-of-memory.
 */
private class DirectSinkWriter implements SinkWriter {

    // Lazily created on first write; null again after flush() or close().
    private RowDataRollingFileWriter writer;

    /** Writes one row straight to the rolling file writer; always accepts the row. */
    @Override
    public boolean write(InternalRow data) throws IOException {
        if (writer == null) {
            writer = createRollingRowWriter();
        }
        writer.write(data);
        return true;
    }

    /** Writes a whole bundle of records in one call (bulk path, no per-row loop here). */
    public void writeBundle(BundleRecords bundle) throws IOException {
        if (writer == null) {
            writer = createRollingRowWriter();
        }
        writer.writeBundle(bundle);
    }

    /**
     * Closes the current rolling writer and returns the files it produced.
     * The writer is dropped so the next write starts a fresh one.
     */
    @Override
    public List<DataFileMeta> flush() throws IOException {
        List<DataFileMeta> flushedFiles = new ArrayList<>();
        if (writer != null) {
            writer.close();
            flushedFiles.addAll(writer.result());
            writer = null;
        }
        return flushedFiles;
    }

    /** Nothing is buffered in memory, so there is never anything to free. */
    @Override
    public boolean flushMemory() throws IOException {
        return false;
    }

    /** No pooled memory is held by the direct path. */
    @Override
    public long memoryOccupancy() {
        return 0;
    }

    /** Aborts (not closes) any in-flight file writer, discarding partially written output. */
    @Override
    public void close() {
        if (writer != null) {
            writer.abort();
            writer = null;
        }
    }

    @Override
    public void setMemoryPool(MemorySegmentPool memoryPool) {
        // do nothing
    }

    /** Direct writing never spills — it does not buffer at all. */
    @Override
    public boolean bufferSpillableWriter() {
        return false;
    }
}
451-
/**
 * Use buffered writer, segment pooled from segment pool. When spillable, may delay checkpoint
 * acknowledge time. When non-spillable, may cause too many small files.
 */
private class BufferedSinkWriter implements SinkWriter {

    // Whether the row buffer may spill to disk when the memory pool is exhausted.
    private final boolean spillable;

    // Upper bound on on-disk spill usage (only meaningful when spillable).
    private final MemorySize maxDiskSize;

    // Compression applied to spilled data.
    private final CompressOptions compression;

    // Created in setMemoryPool(); null before that and again after close().
    // NOTE(review): write(), memoryOccupancy() and flushMemory() dereference this without a
    // null check, so setMemoryPool() must be called before any of them — confirm callers
    // uphold that ordering.
    private RowBuffer writeBuffer;

    private BufferedSinkWriter(
            boolean spillable, MemorySize maxDiskSize, CompressOptions compression) {
        this.spillable = spillable;
        this.maxDiskSize = maxDiskSize;
        this.compression = compression;
    }

    /**
     * Buffers one row. NOTE(review): the return value is whatever {@code RowBuffer.put}
     * reports — presumably false when the buffer cannot accept the row; confirm.
     */
    @Override
    public boolean write(InternalRow data) throws IOException {
        return writeBuffer.put(data);
    }

    /**
     * Drains the buffer through a fresh rolling file writer, returning the produced files
     * and resetting the buffer for reuse.
     */
    @Override
    public List<DataFileMeta> flush() throws IOException {
        List<DataFileMeta> flushedFiles = new ArrayList<>();
        if (writeBuffer != null) {
            // Seal the buffer so it can be iterated.
            writeBuffer.complete();
            RowDataRollingFileWriter writer = createRollingRowWriter();
            IOException exception = null;
            try (RowBuffer.RowBufferIterator iterator = writeBuffer.newIterator()) {
                while (iterator.advanceNext()) {
                    writer.write(iterator.getRow());
                }
            } catch (IOException e) {
                exception = e;
            } finally {
                if (exception != null) {
                    // Close quietly so the original write failure is rethrown rather than
                    // being masked by a secondary failure from close().
                    IOUtils.closeQuietly(writer);
                    // cleanup code that might throw another exception
                    throw exception;
                }
                writer.close();
            }
            flushedFiles.addAll(writer.result());
            // reuse writeBuffer
            writeBuffer.reset();
        }
        return flushedFiles;
    }

    /** Bytes of pooled memory currently held by the row buffer. */
    @Override
    public long memoryOccupancy() {
        return writeBuffer.memoryOccupancy();
    }

    /** Drops all buffered rows without writing them; the buffer is discarded. */
    @Override
    public void close() {
        if (writeBuffer != null) {
            writeBuffer.reset();
            writeBuffer = null;
        }
    }

    /** Builds the backing row buffer from the shared pool; must precede any write. */
    @Override
    public void setMemoryPool(MemorySegmentPool memoryPool) {
        writeBuffer =
                RowBuffer.getBuffer(
                        ioManager,
                        memoryPool,
                        new InternalRowSerializer(writeSchema),
                        spillable,
                        maxDiskSize,
                        compression);
    }

    @Override
    public boolean bufferSpillableWriter() {
        return spillable;
    }

    /** Delegates to the buffer, which may spill in-memory segments to disk. */
    @Override
    public boolean flushMemory() throws IOException {
        return writeBuffer.flushMemory();
    }
}
541381}
0 commit comments