Skip to content

Commit d0f8323

Browse files
committed
[core] Add blobFieldName to BlobConsumer
1 parent d54c762 commit d0f8323

File tree

4 files changed

+22
-6
lines changed

4 files changed

+22
-6
lines changed

paimon-common/src/main/java/org/apache/paimon/data/BlobConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,5 @@ public interface BlobConsumer {
2626
*
2727
* @return Whether to flush to output stream.
2828
*/
29-
boolean accept(BlobDescriptor blobDescriptor);
29+
boolean accept(String blobFieldName, BlobDescriptor blobDescriptor);
3030
}

paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,12 @@ public void testBlobConsumer() throws Exception {
6767
FileStoreTable table = getTableDefault();
6868
List<BlobDescriptor> blobs = new ArrayList<>();
6969
try (BatchTableWrite write = table.newBatchWriteBuilder().newWrite()) {
70-
write.withBlobConsumer(blobs::add);
70+
write.withBlobConsumer(
71+
(blobFieldName, blobDescriptor) -> {
72+
assertThat(blobFieldName).isEqualTo("f2");
73+
blobs.add(blobDescriptor);
74+
return true;
75+
});
7176
write.write(dataDefault(0, 0));
7277
write.write(dataDefault(0, 0));
7378
}

paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public FormatReaderFactory createReaderFactory(
7979

8080
@Override
8181
public FormatWriterFactory createWriterFactory(RowType type) {
82-
return new BlobFormatWriterFactory();
82+
return new BlobFormatWriterFactory(type);
8383
}
8484

8585
@Override
@@ -99,9 +99,15 @@ public Optional<SimpleStatsExtractor> createStatsExtractor(
9999

100100
private class BlobFormatWriterFactory implements FormatWriterFactory {
101101

102+
private final RowType type;
103+
104+
private BlobFormatWriterFactory(RowType type) {
105+
this.type = type;
106+
}
107+
102108
@Override
103109
public FormatWriter create(PositionOutputStream out, String compression) {
104-
return new BlobFormatWriter(out, writeConsumer);
110+
return new BlobFormatWriter(out, writeConsumer, type);
105111
}
106112
}
107113

paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.paimon.fs.Path;
2828
import org.apache.paimon.fs.PositionOutputStream;
2929
import org.apache.paimon.fs.SeekableInputStream;
30+
import org.apache.paimon.types.RowType;
3031
import org.apache.paimon.utils.DeltaVarintCompressor;
3132
import org.apache.paimon.utils.LongArrayList;
3233

@@ -48,15 +49,19 @@ public class BlobFormatWriter implements FileAwareFormatWriter {
4849

4950
private final PositionOutputStream out;
5051
@Nullable private final BlobConsumer writeConsumer;
52+
private final String blobFieldName;
5153
private final CRC32 crc32;
5254
private final byte[] tmpBuffer;
5355
private final LongArrayList lengths;
5456

5557
private String pathString;
5658

57-
public BlobFormatWriter(PositionOutputStream out, @Nullable BlobConsumer writeConsumer) {
59+
public BlobFormatWriter(
60+
PositionOutputStream out, @Nullable BlobConsumer writeConsumer, RowType type) {
5861
this.out = out;
5962
this.writeConsumer = writeConsumer;
63+
checkArgument(type.getFieldCount() == 1, "BlobFormatWriter only support one field.");
64+
this.blobFieldName = type.getFieldNames().get(0);
6065
this.crc32 = new CRC32();
6166
this.tmpBuffer = new byte[4096];
6267
this.lengths = new LongArrayList(16);
@@ -102,7 +107,7 @@ public void addElement(InternalRow element) throws IOException {
102107

103108
if (writeConsumer != null) {
104109
BlobDescriptor descriptor = new BlobDescriptor(pathString, blobPos, blobLength);
105-
boolean flush = writeConsumer.accept(descriptor);
110+
boolean flush = writeConsumer.accept(blobFieldName, descriptor);
106111
if (flush) {
107112
out.flush();
108113
}

0 commit comments

Comments
 (0)