Commit 7d3cf8f

[core] Add appendFile to RollingFileWriterImpl
1 parent ec15528 commit 7d3cf8f

8 files changed: 178 additions & 0 deletions

paimon-common/src/main/java/org/apache/paimon/format/FormatWriter.java

Lines changed: 13 additions & 0 deletions
@@ -19,6 +19,8 @@
 package org.apache.paimon.format;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
 
 import javax.annotation.Nullable;
 
@@ -52,6 +54,17 @@ public interface FormatWriter extends Closeable {
      */
     boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException;
 
+    /**
+     * Append raw data from a source file without deserialization. For example, Parquet can append
+     * row groups directly.
+     *
+     * @throws UnsupportedOperationException if the format does not support appending files.
+     */
+    default void appendFile(FileIO fileIO, Path sourcePath) throws IOException {
+        throw new UnsupportedOperationException(
+                getClass().getSimpleName() + " does not support appendFile");
+    }
+
     /**
      * Returns format-specific writer metadata that can be used to extract statistics without
      * re-reading the file. This is useful for object stores (like OSS/S3) where the file may not be
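
Note: the new method defaults to throwing UnsupportedOperationException, so callers that want the raw-append fast path must be ready to fall back to row-by-row copying. A minimal caller-side sketch; the copyInto helper and its fallback reader are hypothetical, not part of this commit:

import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;

import java.io.IOException;

class AppendFileFallback {
    // Hypothetical helper: try the raw append first; formats that keep the
    // default implementation throw UnsupportedOperationException, in which
    // case the caller re-serializes records through addElement instead.
    static void copyInto(FormatWriter writer, FileIO fileIO, Path source) throws IOException {
        try {
            writer.appendFile(fileIO, source);
        } catch (UnsupportedOperationException e) {
            // Fallback: open a format-specific reader on `source` and pass
            // each row to writer.addElement(row). Reader construction is
            // format-specific and omitted here.
        }
    }
}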

paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java

Lines changed: 4 additions & 0 deletions
@@ -22,6 +22,8 @@
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.SimpleStatsCollector;
 import org.apache.paimon.format.avro.AvroFileFormat;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.statistics.NoneSimpleColStatsCollector;
 import org.apache.paimon.statistics.SimpleColStatsCollector;
 import org.apache.paimon.types.RowType;
@@ -44,6 +46,8 @@ public interface RollingFileWriter<T, R> extends FileWriter<T, List<R>> {
 
     void writeBundle(BundleRecords records) throws IOException;
 
+    void appendFile(FileIO fileIO, Path sourcePath, long recordCount) throws IOException;
+
     @VisibleForTesting
     static FileWriterContext createFileWriterContext(
             FileFormat fileFormat,

paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java

Lines changed: 27 additions & 0 deletions
@@ -19,6 +19,8 @@
 package org.apache.paimon.io;
 
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.utils.Preconditions;
 
 import org.slf4j.Logger;
@@ -116,6 +118,31 @@ public void writeBundle(BundleRecords bundle) throws IOException {
         }
     }
 
+    @Override
+    public void appendFile(FileIO fileIO, Path sourcePath, long sourceRecordCount)
+            throws IOException {
+        try {
+            if (currentWriter == null) {
+                openCurrentWriter();
+            }
+
+            currentWriter.appendFile(fileIO, sourcePath, sourceRecordCount);
+            recordCount += sourceRecordCount;
+
+            if (rollingFile(true)) {
+                closeCurrentWriter();
+            }
+        } catch (Throwable e) {
+            LOG.warn(
+                    "Exception occurs when writing file "
+                            + (currentWriter == null ? null : currentWriter.path())
+                            + ". Cleaning up.",
+                    e);
+            abort();
+            throw e;
+        }
+    }
+
     private void openCurrentWriter() {
         currentWriter = writerFactory.get();
     }
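
Note: appendFile mirrors the lifecycle of write: lazily open a writer, delegate the raw copy, then close the current file once rollingFile(true) reports the target size was reached. A hedged usage sketch (the compactSmallFiles helper is illustrative only; the test below shows a complete setup):

// Hypothetical compaction-style helper: append several small files of the
// same schema and format into one rolling writer.
static List<DataFileMeta> compactSmallFiles(
        RollingFileWriterImpl<InternalRow, DataFileMeta> rollingWriter,
        FileIO fileIO,
        Path[] sourcePaths,
        long[] rowCounts)
        throws IOException {
    for (int i = 0; i < sourcePaths.length; i++) {
        // raw copy; the writer may roll to a new output file after each call
        rollingWriter.appendFile(fileIO, sourcePaths[i], rowCounts[i]);
    }
    rollingWriter.close();
    return rollingWriter.result(); // one DataFileMeta per rolled output file
}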

paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java

Lines changed: 7 additions & 0 deletions
@@ -88,6 +88,13 @@ public void write(InternalRow row) throws IOException {
         seqNumCounter.add(1L);
     }
 
+    @Override
+    public void appendFile(FileIO sourceFileIO, Path sourcePath, long sourceRecordCount)
+            throws IOException {
+        super.appendFile(sourceFileIO, sourcePath, sourceRecordCount);
+        seqNumCounter.add(sourceRecordCount);
+    }
+
     @Override
     public void close() throws IOException {
         if (dataFileIndexWriter != null) {

paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java

Lines changed: 16 additions & 0 deletions
@@ -158,6 +158,22 @@ public long recordCount() {
         return recordCount;
     }
 
+    public void appendFile(FileIO sourceFileIO, Path sourcePath, long sourceRecordCount)
+            throws IOException {
+        if (closed) {
+            throw new RuntimeException("Writer has already closed!");
+        }
+
+        try {
+            writer.appendFile(sourceFileIO, sourcePath);
+            recordCount += sourceRecordCount;
+        } catch (Throwable e) {
+            LOG.warn("Exception occurs when writing file {}. Cleaning up.", path, e);
+            abort();
+            throw e;
+        }
+    }
+
     public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException {
         return writer.reachTargetSize(suggestedCheck, targetSize);
     }

paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java

Lines changed: 90 additions & 0 deletions
@@ -41,6 +41,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -139,4 +140,93 @@ public void testStatsDenseStore() throws IOException {
         assertThat(file.valueStatsCols()).isNull();
         assertThat(file.valueStats().minValues().getFieldCount()).isEqualTo(SCHEMA.getFieldCount());
     }
+
+    @Test
+    public void testAppendFile() throws IOException {
+        LocalFileIO fileIO = LocalFileIO.create();
+        FileFormat fileFormat = FileFormat.fromIdentifier("parquet", new Options());
+        DataFilePathFactory sourcePathFactory =
+                new DataFilePathFactory(
+                        new Path(tempDir + "/source"),
+                        CoreOptions.FILE_FORMAT.defaultValue().toString(),
+                        CoreOptions.DATA_FILE_PREFIX.defaultValue(),
+                        CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
+                        CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+                        CoreOptions.FILE_COMPRESSION.defaultValue(),
+                        null);
+
+        // write 3 small source files
+        Path[] sourcePaths = new Path[3];
+        int recordsPerFile = 100;
+        for (int f = 0; f < 3; f++) {
+            Path path = sourcePathFactory.newPath();
+            sourcePaths[f] = path;
+            RowDataFileWriter sourceWriter =
+                    new RowDataFileWriter(
+                            fileIO,
+                            RollingFileWriter.createFileWriterContext(
+                                    fileFormat,
+                                    SCHEMA,
+                                    SimpleColStatsCollector.createFullStatsFactories(
+                                            SCHEMA.getFieldCount()),
+                                    CoreOptions.FILE_COMPRESSION.defaultValue()),
+                            path,
+                            SCHEMA,
+                            0L,
+                            () -> new LongCounter(0),
+                            new FileIndexOptions(),
+                            FileSource.COMPACT,
+                            true,
+                            false,
+                            false,
+                            null);
+            for (int i = 0; i < recordsPerFile; i++) {
+                sourceWriter.write(GenericRow.of(f * recordsPerFile + i));
+            }
+            sourceWriter.close();
+        }
+
+        // use a large target size so all appended files go into one output file
+        DataFilePathFactory outputPathFactory =
+                new DataFilePathFactory(
+                        new Path(tempDir + "/output"),
+                        CoreOptions.FILE_FORMAT.defaultValue().toString(),
+                        CoreOptions.DATA_FILE_PREFIX.defaultValue(),
+                        CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
+                        CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+                        CoreOptions.FILE_COMPRESSION.defaultValue(),
+                        null);
+        RollingFileWriterImpl<InternalRow, DataFileMeta> writer =
+                new RollingFileWriterImpl<>(
+                        () ->
+                                new RowDataFileWriter(
+                                        fileIO,
+                                        RollingFileWriter.createFileWriterContext(
+                                                fileFormat,
+                                                SCHEMA,
+                                                SimpleColStatsCollector.createFullStatsFactories(
+                                                        SCHEMA.getFieldCount()),
+                                                CoreOptions.FILE_COMPRESSION.defaultValue()),
+                                        outputPathFactory.newPath(),
+                                        SCHEMA,
+                                        0L,
+                                        () -> new LongCounter(0),
+                                        new FileIndexOptions(),
+                                        FileSource.COMPACT,
+                                        true,
+                                        false,
+                                        false,
+                                        null),
+                        Long.MAX_VALUE);
+
+        for (int f = 0; f < 3; f++) {
+            writer.appendFile(fileIO, sourcePaths[f], recordsPerFile);
+        }
+        writer.close();
+
+        List<DataFileMeta> results = writer.result();
+        assertThat(results).hasSize(1);
+        assertThat(results.get(0).rowCount()).isEqualTo(recordsPerFile * 3);
+        assertThat(writer.recordCount()).isEqualTo(recordsPerFile * 3);
+    }
 }

paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBulkWriter.java

Lines changed: 15 additions & 0 deletions
@@ -20,7 +20,12 @@
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.parquet.ParquetInputFile;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
 
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 
@@ -53,6 +58,16 @@ public void addElement(InternalRow datum) throws IOException {
         parquetWriter.write(datum);
     }
 
+    @Override
+    public void appendFile(FileIO fileIO, Path sourcePath) throws IOException {
+        long fileLen = fileIO.getFileSize(sourcePath);
+        ParquetInputFile inputFile = ParquetInputFile.fromPath(fileIO, sourcePath, fileLen);
+        try (ParquetFileReader reader =
+                new ParquetFileReader(inputFile, ParquetReadOptions.builder().build(), null)) {
+            reader.appendTo(parquetWriter.getFileWriter());
+        }
+    }
+
     @Override
     public void close() throws IOException {
         parquetWriter.close();
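
Note: the copy itself is ParquetFileReader.appendTo, which transfers the source file's row groups into the target ParquetFileWriter byte-for-byte, with no decode/encode cycle. The same mechanism is reachable with plain parquet-hadoop; a self-contained sketch with illustrative local paths, assuming all inputs share one schema:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.schema.MessageType;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class RowGroupSplice {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // illustrative paths; all sources must share one schema
        List<Path> sources =
                Arrays.asList(new Path("/tmp/a.parquet"), new Path("/tmp/b.parquet"));

        // take the schema from the first file's footer
        MessageType schema;
        try (ParquetFileReader r =
                ParquetFileReader.open(HadoopInputFile.fromPath(sources.get(0), conf))) {
            schema = r.getFooter().getFileMetaData().getSchema();
        }

        ParquetFileWriter writer =
                new ParquetFileWriter(
                        HadoopOutputFile.fromPath(new Path("/tmp/merged.parquet"), conf),
                        schema,
                        ParquetFileWriter.Mode.CREATE,
                        ParquetWriter.DEFAULT_BLOCK_SIZE,
                        ParquetWriter.MAX_PADDING_SIZE_DEFAULT);
        writer.start();
        for (Path source : sources) {
            // raw row-group copy; the writer-side equivalent of reader.appendTo(writer)
            writer.appendFile(HadoopInputFile.fromPath(source, conf));
        }
        writer.end(Collections.emptyMap());
    }
}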

paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java

Lines changed: 6 additions & 0 deletions
@@ -59,6 +59,7 @@ public class ParquetWriter<T> implements Closeable {
     public static final int MAX_PADDING_SIZE_DEFAULT = 8 * 1024 * 1024; // 8MB
 
     private final InternalParquetRecordWriter<T> writer;
+    private final ParquetFileWriter fileWriter;
     private final CompressionCodecFactory codecFactory;
 
     ParquetWriter(
@@ -98,6 +99,7 @@ public class ParquetWriter<T> implements Closeable {
                         encodingProps);
         fileWriter.start();
 
+        this.fileWriter = fileWriter;
         this.codecFactory = codecFactory;
         CompressionCodecFactory.BytesInputCompressor compressor =
                 codecFactory.getCompressor(compressionCodecName);
@@ -171,6 +173,10 @@ public long getDataSize() {
         return writer.getDataSize();
     }
 
+    public ParquetFileWriter getFileWriter() {
+        return fileWriter;
+    }
+
     /**
      * An abstract builder class for ParquetWriter instances.
      *
