Skip to content

Commit 99e6f2b

Browse files
committed
add support to vector type
1 parent 7ce8c3d commit 99e6f2b

File tree

18 files changed

+150
-58
lines changed

18 files changed

+150
-58
lines changed

paimon-vortex/paimon-vortex-format/src/main/java/org/apache/paimon/format/vortex/VortexFileFormat.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,12 +175,14 @@ public Void visit(LocalZonedTimestampType localZonedTimestampType) {
175175

176176
@Override
177177
public Void visit(VariantType variantType) {
178-
return null;
178+
throw new UnsupportedOperationException(
179+
"Vortex file format does not support type VARIANT");
179180
}
180181

181182
@Override
182183
public Void visit(BlobType blobType) {
183-
return null;
184+
throw new UnsupportedOperationException(
185+
"Vortex file format does not support type BLOB");
184186
}
185187

186188
@Override

paimon-vortex/paimon-vortex-format/src/main/java/org/apache/paimon/format/vortex/VortexRecordsReader.java

Lines changed: 54 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -48,19 +48,30 @@ public class VortexRecordsReader implements FileRecordReader<InternalRow> {
4848
private final ArrayIterator arrayIterator;
4949
private final File vortexFile;
5050
private VectorSchemaRoot reuse;
51+
private Array currentArray;
5152
private long currentPosition = -1;
5253

54+
// TODO: batchSize is currently unused — Vortex scan batch size is determined by the file layout.
55+
// Consider passing it to ScanOptions if Vortex adds batch size control in the future.
5356
public VortexRecordsReader(Path path, RowType projectedRowType, int batchSize) {
5457
this.filePath = path;
5558
this.allocator =
5659
ArrowAllocation.rootAllocator()
5760
.newChildAllocator("vortex-reader", 0, Long.MAX_VALUE);
58-
this.vortexFile = Files.open(path.toUri().toString());
59-
60-
ImmutableScanOptions.Builder scanBuilder = ImmutableScanOptions.builder();
61-
scanBuilder.addAllColumns(projectedRowType.getFieldNames());
62-
this.arrayIterator = vortexFile.newScan(scanBuilder.build());
63-
61+
try {
62+
this.vortexFile = Files.open(path.toUri().toString());
63+
try {
64+
ImmutableScanOptions.Builder scanBuilder = ImmutableScanOptions.builder();
65+
scanBuilder.addAllColumns(projectedRowType.getFieldNames());
66+
this.arrayIterator = vortexFile.newScan(scanBuilder.build());
67+
} catch (Exception e) {
68+
vortexFile.close();
69+
throw e;
70+
}
71+
} catch (Exception e) {
72+
allocator.close();
73+
throw e;
74+
}
6475
this.arrowBatchReader = new ArrowBatchReader(projectedRowType, true);
6576
}
6677

@@ -71,43 +82,51 @@ public FileRecordIterator<InternalRow> readBatch() throws IOException {
7182
return null;
7283
}
7384

85+
releaseCurrentArray();
7486
Array array = arrayIterator.next();
75-
try {
76-
VectorSchemaRoot vsr = array.exportToArrow(allocator, reuse);
77-
this.reuse = vsr;
78-
Iterator<InternalRow> rows = arrowBatchReader.readBatch(vsr).iterator();
79-
80-
return new FileRecordIterator<InternalRow>() {
81-
@Override
82-
public long returnedPosition() {
83-
return currentPosition;
84-
}
85-
86-
@Override
87-
public Path filePath() {
88-
return filePath;
89-
}
90-
91-
@Nullable
92-
@Override
93-
public InternalRow next() {
94-
if (!rows.hasNext()) {
95-
return null;
96-
}
97-
currentPosition++;
98-
return rows.next();
87+
this.currentArray = array;
88+
VectorSchemaRoot vsr = array.exportToArrow(allocator, reuse);
89+
this.reuse = vsr;
90+
Iterator<InternalRow> rows = arrowBatchReader.readBatch(vsr).iterator();
91+
92+
return new FileRecordIterator<InternalRow>() {
93+
@Override
94+
public long returnedPosition() {
95+
return currentPosition;
96+
}
97+
98+
@Override
99+
public Path filePath() {
100+
return filePath;
101+
}
102+
103+
@Nullable
104+
@Override
105+
public InternalRow next() {
106+
if (!rows.hasNext()) {
107+
return null;
99108
}
109+
currentPosition++;
110+
return rows.next();
111+
}
112+
113+
@Override
114+
public void releaseBatch() {
115+
releaseCurrentArray();
116+
}
117+
};
118+
}
100119

101-
@Override
102-
public void releaseBatch() {}
103-
};
104-
} finally {
105-
array.close();
120+
private void releaseCurrentArray() {
121+
if (currentArray != null) {
122+
currentArray.close();
123+
currentArray = null;
106124
}
107125
}
108126

109127
@Override
110128
public void close() throws IOException {
129+
releaseCurrentArray();
111130
if (reuse != null) {
112131
reuse.close();
113132
}

paimon-vortex/paimon-vortex-format/src/main/java/org/apache/paimon/format/vortex/VortexRecordsWriter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,18 +81,20 @@ public void writeBundle(BundleRecords bundleRecords) throws IOException {
8181

8282
@Override
8383
public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException {
84+
// Note: bytesWritten tracks Arrow IPC serialized bytes, not the actual Vortex file size
85+
// (which may differ due to Vortex's own compression/encoding).
8486
return suggestedCheck && (bytesWritten > targetSize);
8587
}
8688

8789
@Override
8890
public void close() throws IOException {
8991
flush();
90-
LOG.info("Jni cost: " + jniCost + "ms for file: " + path);
92+
LOG.info("Jni cost: {}ms for file: {}", jniCost, path);
9193
long t1 = System.currentTimeMillis();
9294
nativeWriter.close();
9395
arrowFormatWriter.close();
9496
long closeCost = (System.currentTimeMillis() - t1);
95-
LOG.info("Close cost: " + closeCost + "ms for file: " + path);
97+
LOG.info("Close cost: {}ms for file: {}", closeCost, path);
9698
}
9799

98100
private void flush() throws IOException {

paimon-vortex/paimon-vortex-format/src/main/java/org/apache/paimon/format/vortex/VortexTypeUtils.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,15 @@
3131
import org.apache.paimon.types.DoubleType;
3232
import org.apache.paimon.types.FloatType;
3333
import org.apache.paimon.types.IntType;
34+
import org.apache.paimon.types.LocalZonedTimestampType;
3435
import org.apache.paimon.types.RowType;
3536
import org.apache.paimon.types.SmallIntType;
3637
import org.apache.paimon.types.TimeType;
3738
import org.apache.paimon.types.TimestampType;
3839
import org.apache.paimon.types.TinyIntType;
3940
import org.apache.paimon.types.VarBinaryType;
4041
import org.apache.paimon.types.VarCharType;
42+
import org.apache.paimon.types.VectorType;
4143

4244
import dev.vortex.api.DType;
4345

@@ -145,18 +147,44 @@ public DType visit(TimestampType timestampType) {
145147
unit = DType.TimeUnit.SECONDS;
146148
} else if (precision <= 3) {
147149
unit = DType.TimeUnit.MILLISECONDS;
148-
} else {
150+
} else if (precision <= 6) {
149151
unit = DType.TimeUnit.MICROSECONDS;
152+
} else {
153+
unit = DType.TimeUnit.NANOSECONDS;
150154
}
151155
return DType.newTimestamp(unit, Optional.empty(), timestampType.isNullable());
152156
}
153157

158+
@Override
159+
public DType visit(LocalZonedTimestampType lzTimestampType) {
160+
DType.TimeUnit unit;
161+
int precision = lzTimestampType.getPrecision();
162+
if (precision <= 0) {
163+
unit = DType.TimeUnit.SECONDS;
164+
} else if (precision <= 3) {
165+
unit = DType.TimeUnit.MILLISECONDS;
166+
} else if (precision <= 6) {
167+
unit = DType.TimeUnit.MICROSECONDS;
168+
} else {
169+
unit = DType.TimeUnit.NANOSECONDS;
170+
}
171+
return DType.newTimestamp(
172+
unit, Optional.of("UTC"), lzTimestampType.isNullable());
173+
}
174+
154175
@Override
155176
public DType visit(ArrayType arrayType) {
156177
DType elementType = arrayType.getElementType().accept(this);
157178
return DType.newList(elementType, arrayType.isNullable());
158179
}
159180

181+
@Override
182+
public DType visit(VectorType vectorType) {
183+
DType elementType = vectorType.getElementType().accept(this);
184+
return DType.newFixedSizeList(
185+
elementType, vectorType.getLength(), vectorType.isNullable());
186+
}
187+
160188
@Override
161189
public DType visit(RowType rowType) {
162190
return VortexTypeUtils.toStructDType(rowType, rowType.isNullable());

paimon-vortex/paimon-vortex-format/src/main/java/org/apache/paimon/format/vortex/VortexWriterFactory.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,14 @@
3030
import java.io.IOException;
3131
import java.util.function.Supplier;
3232

33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
3336
/** A factory to create Vortex {@link FormatWriter}. */
3437
public class VortexWriterFactory implements FormatWriterFactory, SupportsDirectWrite {
3538

39+
private static final Logger LOG = LoggerFactory.getLogger(VortexWriterFactory.class);
40+
3641
private final RowType rowType;
3742
private final Supplier<ArrowFormatWriter> arrowFormatWriterSupplier;
3843

@@ -52,6 +57,12 @@ public FormatWriter create(PositionOutputStream positionOutputStream, String com
5257

5358
@Override
5459
public FormatWriter create(FileIO fileIO, Path path, String compression) throws IOException {
60+
if (compression != null && !compression.isEmpty()) {
61+
LOG.warn(
62+
"Vortex format uses its own internal compression, "
63+
+ "ignoring configured compression: {}",
64+
compression);
65+
}
5566
return new VortexRecordsWriter(rowType, arrowFormatWriterSupplier.get(), path);
5667
}
5768
}

paimon-vortex/paimon-vortex-format/src/test/java/org/apache/paimon/format/vortex/VortexFileFormatReadWriteTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.paimon.format.vortex;
2020

21+
import org.apache.paimon.data.BinaryVector;
2122
import org.apache.paimon.data.Decimal;
2223
import org.apache.paimon.data.GenericArray;
2324
import org.apache.paimon.data.GenericRow;
@@ -108,7 +109,8 @@ protected RowType rowTypeForFullTypesTest() {
108109
1,
109110
"double1",
110111
DataTypes.DOUBLE().notNull(),
111-
"nested row double field 1"))));
112+
"nested row double field 1"))))
113+
.field("floatVector", DataTypes.VECTOR(3, DataTypes.FLOAT()));
112114

113115
RowType rowType = builder.build();
114116
if (ThreadLocalRandom.current().nextBoolean()) {
@@ -138,7 +140,8 @@ protected GenericRow expectedRowForFullTypesTest() {
138140
Decimal.fromBigDecimal(new BigDecimal("12312455.22"), 38, 2),
139141
Decimal.fromBigDecimal(new BigDecimal("12455.1"), 10, 1),
140142
new GenericArray(
141-
new Object[] {GenericRow.of(1, 0.1D), GenericRow.of(2, 0.2D)}));
143+
new Object[] {GenericRow.of(1, 0.1D), GenericRow.of(2, 0.2D)}),
144+
BinaryVector.fromPrimitiveArray(new float[] {1.0f, 2.0f, 3.0f}));
142145
return GenericRow.of(values.toArray());
143146
}
144147
}

paimon-vortex/paimon-vortex-format/src/test/java/org/apache/paimon/format/vortex/VortexFileFormatTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,24 @@ public void testValidateDataFields_UnsupportedMultisetType() {
8282
assertThrows(UnsupportedOperationException.class, () -> format.validateDataFields(rowType));
8383
}
8484

85+
@Test
86+
public void testValidateDataFields_UnsupportedVariantType() {
87+
VortexFileFormat format =
88+
new VortexFileFormat(
89+
new FileFormatFactory.FormatContext(new Options(), 1024, 1024));
90+
RowType rowType = RowType.of(DataTypes.VARIANT());
91+
assertThrows(UnsupportedOperationException.class, () -> format.validateDataFields(rowType));
92+
}
93+
94+
@Test
95+
public void testValidateDataFields_UnsupportedBlobType() {
96+
VortexFileFormat format =
97+
new VortexFileFormat(
98+
new FileFormatFactory.FormatContext(new Options(), 1024, 1024));
99+
RowType rowType = RowType.of(DataTypes.BLOB());
100+
assertThrows(UnsupportedOperationException.class, () -> format.validateDataFields(rowType));
101+
}
102+
85103
@Test
86104
public void testValidateDataFields_SupportedTypes() {
87105
VortexFileFormat format =
@@ -105,6 +123,7 @@ public void testValidateDataFields_SupportedTypes() {
105123
.field("float", DataTypes.FLOAT())
106124
.field("time", DataTypes.TIME())
107125
.field("ltz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
126+
.field("vector", DataTypes.VECTOR(3, DataTypes.FLOAT()))
108127
.build();
109128

110129
assertDoesNotThrow(() -> format.validateDataFields(rowType));

paimon-vortex/paimon-vortex-jni/pom.xml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ under the License.
3636
<vortex.jni.version>0.24.0</vortex.jni.version>
3737
<protobuf.version>3.25.5</protobuf.version>
3838
<immutables.version>2.10.1</immutables.version>
39-
<!-- vortex-jni uses unshaded Guava, skip spotless/checkstyle rewriting -->
39+
<!-- vortex-jni uses shaded Guava, skip spotless/checkstyle rewriting -->
4040
<spotless.check.skip>true</spotless.check.skip>
4141
<spotless.apply.skip>true</spotless.apply.skip>
4242
<checkstyle.skip>true</checkstyle.skip>
@@ -75,11 +75,11 @@ under the License.
7575
<version>${protobuf.version}</version>
7676
</dependency>
7777

78-
<!-- Guava -->
78+
<!-- Guava (shaded) -->
7979
<dependency>
80-
<groupId>com.google.guava</groupId>
81-
<artifactId>guava</artifactId>
82-
<version>${paimon.shade.guava.version}</version>
80+
<groupId>org.apache.paimon</groupId>
81+
<artifactId>paimon-shade-guava-30</artifactId>
82+
<version>${paimon.shade.guava.version}-${paimon.shade.version}</version>
8383
</dependency>
8484

8585
<!-- Immutables (compile-only annotation processor) -->

paimon-vortex/paimon-vortex-jni/src/main/java/dev/vortex/api/DType.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,12 @@ static DType newList(DType element, boolean isNullable) {
101101
return new JNIDType(NativeDTypeMethods.newList(jniType.getPointer(), isNullable), true);
102102
}
103103

104+
static DType newFixedSizeList(DType element, int size, boolean isNullable) {
105+
JNIDType jniType = (JNIDType) element;
106+
return new JNIDType(
107+
NativeDTypeMethods.newFixedSizeList(jniType.getPointer(), size, isNullable), true);
108+
}
109+
104110
static DType newStruct(String[] fieldNames, DType[] fieldTypes, boolean isNullable) {
105111
long[] ptrs = new long[fieldTypes.length];
106112
for (int i = 0; i < fieldTypes.length; i++) {

paimon-vortex/paimon-vortex-jni/src/main/java/dev/vortex/api/Files.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
package dev.vortex.api;
2020

21-
import com.google.common.base.Preconditions;
21+
import org.apache.paimon.shade.guava30.com.google.common.base.Preconditions;
2222

2323
import dev.vortex.jni.JNIFile;
2424
import dev.vortex.jni.NativeFileMethods;

0 commit comments

Comments
 (0)