Skip to content

Commit d54c762

Browse files
authored
[core] Introduce withBlobConsumer in TableWrite (apache#7074)
1 parent 68d4838 commit d54c762

File tree

20 files changed

+302
-180
lines changed

20 files changed

+302
-180
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.data;
20+
21+
/** A consumer for blob. */
22+
public interface BlobConsumer {
23+
24+
/**
25+
* Accept a blob descriptor.
26+
*
27+
* @return Whether to flush to output stream.
28+
*/
29+
boolean accept(BlobDescriptor blobDescriptor);
30+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.format;
20+
21+
import org.apache.paimon.fs.Path;
22+
23+
/** A {@link FormatWriter} that can be aware of the file. */
24+
public interface FileAwareFormatWriter extends FormatWriter {
25+
26+
/** Set the path of the file. */
27+
void setFile(Path file);
28+
29+
/** Whether to delete the file upon abort. */
30+
boolean deleteFileUponAbort();
31+
}

paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.paimon.compact.CompactDeletionFile;
2323
import org.apache.paimon.compact.CompactManager;
2424
import org.apache.paimon.compression.CompressOptions;
25+
import org.apache.paimon.data.BlobConsumer;
2526
import org.apache.paimon.data.InternalRow;
2627
import org.apache.paimon.disk.IOManager;
2728
import org.apache.paimon.disk.RowBuffer;
@@ -92,6 +93,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
9293
@Nullable private final IOManager ioManager;
9394
private final FileIndexOptions fileIndexOptions;
9495
private final MemorySize maxDiskSize;
96+
@Nullable private final BlobConsumer blobConsumer;
9597

9698
@Nullable private CompactDeletionFile compactDeletionFile;
9799
private SinkWriter<InternalRow> sinkWriter;
@@ -120,7 +122,8 @@ public AppendOnlyWriter(
120122
MemorySize maxDiskSize,
121123
FileIndexOptions fileIndexOptions,
122124
boolean asyncFileWrite,
123-
boolean statsDenseStore) {
125+
boolean statsDenseStore,
126+
@Nullable BlobConsumer blobConsumer) {
124127
this.fileIO = fileIO;
125128
this.schemaId = schemaId;
126129
this.fileFormat = fileFormat;
@@ -134,6 +137,7 @@ public AppendOnlyWriter(
134137
this.forceCompact = forceCompact;
135138
this.asyncFileWrite = asyncFileWrite;
136139
this.statsDenseStore = statsDenseStore;
140+
this.blobConsumer = blobConsumer;
137141
this.newFiles = new ArrayList<>();
138142
this.deletedFiles = new ArrayList<>();
139143
this.compactBefore = new ArrayList<>();
@@ -311,7 +315,8 @@ private RollingFileWriter<InternalRow, DataFileMeta> createRollingRowWriter() {
311315
fileIndexOptions,
312316
FileSource.APPEND,
313317
asyncFileWrite,
314-
statsDenseStore);
318+
statsDenseStore,
319+
blobConsumer);
315320
}
316321
return new RowDataRollingFileWriter(
317322
fileIO,

paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.paimon.append;
2020

21+
import org.apache.paimon.data.BlobConsumer;
2122
import org.apache.paimon.data.InternalRow;
2223
import org.apache.paimon.fileindex.FileIndexOptions;
2324
import org.apache.paimon.format.FileFormat;
@@ -44,6 +45,8 @@
4445
import org.slf4j.Logger;
4546
import org.slf4j.LoggerFactory;
4647

48+
import javax.annotation.Nullable;
49+
4750
import java.io.IOException;
4851
import java.util.ArrayList;
4952
import java.util.Collections;
@@ -91,7 +94,6 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
9194
RollingFileWriterImpl<InternalRow, DataFileMeta>, List<DataFileMeta>>>
9295
blobWriterFactory;
9396
private final long targetFileSize;
94-
private final long blobTargetFileSize;
9597

9698
// State management
9799
private final List<FileWriterAbortExecutor> closedWriters;
@@ -118,11 +120,10 @@ public RollingBlobFileWriter(
118120
FileIndexOptions fileIndexOptions,
119121
FileSource fileSource,
120122
boolean asyncFileWrite,
121-
boolean statsDenseStore) {
122-
123+
boolean statsDenseStore,
124+
@Nullable BlobConsumer blobConsumer) {
123125
// Initialize basic fields
124126
this.targetFileSize = targetFileSize;
125-
this.blobTargetFileSize = blobTargetFileSize;
126127
this.results = new ArrayList<>();
127128
this.closedWriters = new ArrayList<>();
128129

@@ -161,7 +162,8 @@ public RollingBlobFileWriter(
161162
fileSource,
162163
asyncFileWrite,
163164
statsDenseStore,
164-
blobTargetFileSize);
165+
blobTargetFileSize,
166+
blobConsumer);
165167
}
166168

167169
/** Creates a factory for normal data writers. */
@@ -221,9 +223,10 @@ public RollingBlobFileWriter(
221223
FileSource fileSource,
222224
boolean asyncFileWrite,
223225
boolean statsDenseStore,
224-
long targetFileSize) {
225-
226+
long targetFileSize,
227+
@Nullable BlobConsumer blobConsumer) {
226228
BlobFileFormat blobFileFormat = new BlobFileFormat();
229+
blobFileFormat.setWriteConsumer(blobConsumer);
227230
List<String> blobNames = blobType.getFieldNames();
228231

229232
// Validate blob field count
@@ -375,7 +378,7 @@ private void closeCurrentWriter() throws IOException {
375378
/** Closes the main writer and returns its metadata. */
376379
private DataFileMeta closeMainWriter() throws IOException {
377380
currentWriter.close();
378-
closedWriters.add(currentWriter.writer().abortExecutor());
381+
currentWriter.writer().abortExecutor().ifPresent(closedWriters::add);
379382
return currentWriter.result();
380383
}
381384

paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ protected void closeCurrentWriter() throws IOException {
129129
// only store abort executor in memory
130130
// cannot store whole writer, it includes lots of memory for example column vectors to read
131131
// and write
132-
closedWriters.add(currentWriter.abortExecutor());
132+
currentWriter.abortExecutor().ifPresent(closedWriters::add);
133133
results.add(currentWriter.result());
134134
currentWriter = null;
135135
}

paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.paimon.data.InternalRow;
2222
import org.apache.paimon.format.BundleFormatWriter;
23+
import org.apache.paimon.format.FileAwareFormatWriter;
2324
import org.apache.paimon.format.FormatWriter;
2425
import org.apache.paimon.format.FormatWriterFactory;
2526
import org.apache.paimon.format.SupportsDirectWrite;
@@ -36,6 +37,7 @@
3637

3738
import java.io.IOException;
3839
import java.io.UncheckedIOException;
40+
import java.util.Optional;
3941
import java.util.function.Function;
4042

4143
/**
@@ -52,8 +54,9 @@ public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
5254
protected final Path path;
5355
private final Function<T, InternalRow> converter;
5456

57+
private boolean deleteFileUponAbort;
5558
private FormatWriter writer;
56-
private PositionOutputStream out;
59+
@Nullable private PositionOutputStream out;
5760

5861
@Nullable private Long outputBytes;
5962
private long recordCount;
@@ -69,6 +72,8 @@ public SingleFileWriter(
6972
this.fileIO = fileIO;
7073
this.path = path;
7174
this.converter = converter;
75+
// true first to clean file in exception
76+
this.deleteFileUponAbort = true;
7277

7378
try {
7479
if (factory instanceof SupportsDirectWrite) {
@@ -80,6 +85,12 @@ public SingleFileWriter(
8085
}
8186
writer = factory.create(out, compression);
8287
}
88+
89+
if (writer instanceof FileAwareFormatWriter) {
90+
FileAwareFormatWriter fileAwareFormatWriter = (FileAwareFormatWriter) writer;
91+
fileAwareFormatWriter.setFile(path);
92+
deleteFileUponAbort = fileAwareFormatWriter.deleteFileUponAbort();
93+
}
8394
} catch (IOException e) {
8495
LOG.warn(
8596
"Failed to open the bulk writer, closing the output stream and throw the error.",
@@ -118,7 +129,7 @@ public void writeBundle(BundleRecords bundle) throws IOException {
118129
}
119130
recordCount += bundle.rowCount();
120131
} catch (Throwable e) {
121-
LOG.warn("Exception occurs when writing file " + path + ". Cleaning up.", e);
132+
LOG.warn("Exception occurs when writing file {}. Cleaning up.", path, e);
122133
abort();
123134
throw e;
124135
}
@@ -135,7 +146,7 @@ protected InternalRow writeImpl(T record) throws IOException {
135146
recordCount++;
136147
return rowData;
137148
} catch (Throwable e) {
138-
LOG.warn("Exception occurs when writing file " + path + ". Cleaning up.", e);
149+
LOG.warn("Exception occurs when writing file {}. Cleaning up.", path, e);
139150
abort();
140151
throw e;
141152
}
@@ -160,15 +171,13 @@ public void abort() {
160171
IOUtils.closeQuietly(out);
161172
out = null;
162173
}
163-
fileIO.deleteQuietly(path);
174+
abortExecutor().ifPresent(FileWriterAbortExecutor::abort);
164175
}
165176

166-
public FileWriterAbortExecutor abortExecutor() {
167-
if (!closed) {
168-
throw new RuntimeException("Writer should be closed!");
169-
}
170-
171-
return new FileWriterAbortExecutor(fileIO, path);
177+
public Optional<FileWriterAbortExecutor> abortExecutor() {
178+
return deleteFileUponAbort
179+
? Optional.of(new FileWriterAbortExecutor(fileIO, path))
180+
: Optional.empty();
172181
}
173182

174183
@Override

paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.paimon.annotation.VisibleForTesting;
2525
import org.apache.paimon.compact.CompactDeletionFile;
2626
import org.apache.paimon.data.BinaryRow;
27+
import org.apache.paimon.data.BlobConsumer;
2728
import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
2829
import org.apache.paimon.disk.IOManager;
2930
import org.apache.paimon.index.DynamicBucketIndexMaintainer;
@@ -133,6 +134,11 @@ public FileStoreWrite<T> withMemoryPoolFactory(MemoryPoolFactory memoryPoolFacto
133134
return this;
134135
}
135136

137+
@Override
138+
public FileStoreWrite<T> withBlobConsumer(BlobConsumer blobConsumer) {
139+
return this;
140+
}
141+
136142
@Override
137143
public void withIgnorePreviousFiles(boolean ignorePreviousFiles) {
138144
this.ignorePreviousFiles = ignorePreviousFiles;

paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.paimon.append.cluster.Sorter;
2525
import org.apache.paimon.compact.CompactManager;
2626
import org.apache.paimon.data.BinaryRow;
27+
import org.apache.paimon.data.BlobConsumer;
2728
import org.apache.paimon.data.InternalRow;
2829
import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
2930
import org.apache.paimon.deletionvectors.DeletionVector;
@@ -82,6 +83,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
8283
private @Nullable List<String> writeCols;
8384
private boolean forceBufferSpill = false;
8485
private boolean withBlob;
86+
private @Nullable BlobConsumer blobConsumer;
8587

8688
public BaseAppendFileStoreWrite(
8789
FileIO fileIO,
@@ -109,6 +111,12 @@ public BaseAppendFileStoreWrite(
109111
this.fileIndexOptions = options.indexColumnsOptions();
110112
}
111113

114+
@Override
115+
public BaseAppendFileStoreWrite withBlobConsumer(BlobConsumer blobConsumer) {
116+
this.blobConsumer = blobConsumer;
117+
return this;
118+
}
119+
112120
@Override
113121
protected RecordWriter<InternalRow> createWriter(
114122
BinaryRow partition,
@@ -142,7 +150,8 @@ protected RecordWriter<InternalRow> createWriter(
142150
options.writeBufferSpillDiskSize(),
143151
fileIndexOptions,
144152
options.asyncFileWrite(),
145-
options.statsDenseStore());
153+
options.statsDenseStore(),
154+
blobConsumer);
146155
}
147156

148157
@Override

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.paimon.FileStore;
2222
import org.apache.paimon.data.BinaryRow;
23+
import org.apache.paimon.data.BlobConsumer;
2324
import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
2425
import org.apache.paimon.disk.IOManager;
2526
import org.apache.paimon.index.DynamicBucketIndexMaintainer;
@@ -64,6 +65,8 @@ default void withWriteType(RowType writeType) {
6465
*/
6566
FileStoreWrite<T> withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory);
6667

68+
FileStoreWrite<T> withBlobConsumer(BlobConsumer blobConsumer);
69+
6770
/**
6871
* Set whether the write operation should ignore previously stored files.
6972
*

paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.paimon.CoreOptions;
2222
import org.apache.paimon.casting.DefaultValueRow;
2323
import org.apache.paimon.data.BinaryRow;
24+
import org.apache.paimon.data.BlobConsumer;
2425
import org.apache.paimon.data.InternalRow;
2526
import org.apache.paimon.disk.IOManager;
2627
import org.apache.paimon.fs.FileIO;
@@ -43,14 +44,13 @@
4344
/** {@link TableWrite} implementation for format table. */
4445
public class FormatTableWrite implements BatchTableWrite {
4546

46-
private RowType rowType;
47+
private final RowType rowType;
4748
private final FormatTableFileWriter write;
4849
private final FormatTableRowPartitionKeyExtractor partitionKeyExtractor;
4950

5051
private final int[] notNullFieldIndex;
5152
private final @Nullable DefaultValueRow defaultValueRow;
5253
private final ProjectedRow projectedRow;
53-
private final RowType writeRowType;
5454

5555
public FormatTableWrite(
5656
FileIO fileIO,
@@ -68,7 +68,7 @@ public FormatTableWrite(
6868
.collect(Collectors.toList());
6969
this.notNullFieldIndex = rowType.getFieldIndices(notNullColumnNames);
7070
this.defaultValueRow = DefaultValueRow.create(rowType);
71-
this.writeRowType =
71+
RowType writeRowType =
7272
rowType.project(
7373
rowType.getFieldNames().stream()
7474
.filter(name -> !partitionType.getFieldNames().contains(name))
@@ -117,6 +117,11 @@ public TableWrite withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory) {
117117
return this;
118118
}
119119

120+
@Override
121+
public TableWrite withBlobConsumer(BlobConsumer blobConsumer) {
122+
throw new UnsupportedOperationException();
123+
}
124+
120125
@Override
121126
public BatchTableWrite withIOManager(IOManager ioManager) {
122127
return this;

0 commit comments

Comments
 (0)