Skip to content

Commit 5461435

Browse files
committed
fix
1 parent b05e0d5 commit 5461435

8 files changed

Lines changed: 16 additions & 13 deletions

File tree

paimon-common/src/main/java/org/apache/paimon/format/FormatWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public interface FormatWriter extends Closeable {
6060
*
6161
* @throws UnsupportedOperationException if the format does not support appending files.
6262
*/
63-
default void appendFile(FileIO fileIO, Path sourcePath) throws IOException {
63+
default void appendFile(FileIO fileIO, Path sourcePath, long fileLength) throws IOException {
6464
throw new UnsupportedOperationException(
6565
getClass().getSimpleName() + " does not support appendFile");
6666
}

paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,7 @@ public void writeBundle(BundleRecords bundle) throws IOException {
403403
}
404404

405405
@Override
406-
public void appendFile(FileIO fileIO, Path sourcePath, long recordCount) {
406+
public void appendFile(FileIO fileIO, Path sourcePath, long fileLength, long recordCount) {
407407
throw new UnsupportedOperationException(
408408
getClass().getSimpleName() + " does not support appendFile");
409409
}

paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ public interface RollingFileWriter<T, R> extends FileWriter<T, List<R>> {
4646

4747
void writeBundle(BundleRecords records) throws IOException;
4848

49-
void appendFile(FileIO fileIO, Path sourcePath, long recordCount) throws IOException;
49+
void appendFile(FileIO fileIO, Path sourcePath, long fileLength, long recordCount)
50+
throws IOException;
5051

5152
@VisibleForTesting
5253
static FileWriterContext createFileWriterContext(

paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,14 +119,14 @@ public void writeBundle(BundleRecords bundle) throws IOException {
119119
}
120120

121121
@Override
122-
public void appendFile(FileIO fileIO, Path sourcePath, long sourceRecordCount)
122+
public void appendFile(FileIO fileIO, Path sourcePath, long fileLength, long sourceRecordCount)
123123
throws IOException {
124124
try {
125125
if (currentWriter == null) {
126126
openCurrentWriter();
127127
}
128128

129-
currentWriter.appendFile(fileIO, sourcePath, sourceRecordCount);
129+
currentWriter.appendFile(fileIO, sourcePath, fileLength, sourceRecordCount);
130130
recordCount += sourceRecordCount;
131131

132132
if (rollingFile(true)) {

paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,10 @@ public void write(InternalRow row) throws IOException {
8989
}
9090

9191
@Override
92-
public void appendFile(FileIO sourceFileIO, Path sourcePath, long sourceRecordCount)
92+
public void appendFile(
93+
FileIO sourceFileIO, Path sourcePath, long fileLength, long sourceRecordCount)
9394
throws IOException {
94-
super.appendFile(sourceFileIO, sourcePath, sourceRecordCount);
95+
super.appendFile(sourceFileIO, sourcePath, fileLength, sourceRecordCount);
9596
seqNumCounter.add(sourceRecordCount);
9697
}
9798

paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,14 +158,15 @@ public long recordCount() {
158158
return recordCount;
159159
}
160160

161-
public void appendFile(FileIO sourceFileIO, Path sourcePath, long sourceRecordCount)
161+
public void appendFile(
162+
FileIO sourceFileIO, Path sourcePath, long fileLength, long sourceRecordCount)
162163
throws IOException {
163164
if (closed) {
164165
throw new RuntimeException("Writer has already closed!");
165166
}
166167

167168
try {
168-
writer.appendFile(sourceFileIO, sourcePath);
169+
writer.appendFile(sourceFileIO, sourcePath, fileLength);
169170
recordCount += sourceRecordCount;
170171
} catch (Throwable e) {
171172
LOG.warn("Exception occurs when writing file {}. Cleaning up.", path, e);

paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,8 @@ public void testAppendFile() throws IOException {
220220
Long.MAX_VALUE);
221221

222222
for (int f = 0; f < 3; f++) {
223-
writer.appendFile(fileIO, sourcePaths[f], recordsPerFile);
223+
long fileLength = fileIO.getFileSize(sourcePaths[f]);
224+
writer.appendFile(fileIO, sourcePaths[f], fileLength, recordsPerFile);
224225
}
225226
writer.close();
226227

paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBulkWriter.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,8 @@ public void addElement(InternalRow datum) throws IOException {
5959
}
6060

6161
@Override
62-
public void appendFile(FileIO fileIO, Path sourcePath) throws IOException {
63-
long fileLen = fileIO.getFileSize(sourcePath);
64-
ParquetInputFile inputFile = ParquetInputFile.fromPath(fileIO, sourcePath, fileLen);
62+
public void appendFile(FileIO fileIO, Path sourcePath, long fileLength) throws IOException {
63+
ParquetInputFile inputFile = ParquetInputFile.fromPath(fileIO, sourcePath, fileLength);
6564
try (ParquetFileReader reader =
6665
new ParquetFileReader(inputFile, ParquetReadOptions.builder().build(), null)) {
6766
reader.appendTo(parquetWriter.getFileWriter());

0 commit comments

Comments
 (0)