Skip to content

Commit 2786f20

Browse files
committed
[python] Support zstd decompress for btree index
1 parent: 6948f85 · commit: 2786f20

File tree

3 files changed

+115
-3
lines changed

3 files changed

+115
-3
lines changed

paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java

Lines changed: 71 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -81,6 +81,7 @@
8181
import static org.apache.paimon.CoreOptions.ROW_TRACKING_ENABLED;
8282
import static org.apache.paimon.CoreOptions.TARGET_FILE_SIZE;
8383
import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
84+
import static org.apache.paimon.globalindex.btree.BTreeIndexOptions.BTREE_INDEX_COMPRESSION;
8485
import static org.apache.paimon.table.SimpleTableTestBase.getResult;
8586
import static org.assertj.core.api.Assertions.assertThat;
8687

@@ -414,6 +415,7 @@ public void testBtreeIndexWrite() throws Exception {
414415
testBtreeIndexWriteString();
415416
testBtreeIndexWriteInt();
416417
testBtreeIndexWriteBigInt();
418+
testBtreeIndexWriteLarge();
417419
}
418420

419421
private void testBtreeIndexWriteString() throws Exception {
@@ -498,6 +500,75 @@ private <T> void testBtreeIndexWriteGeneric(
498500
assertThat(result).containsOnly("v2");
499501
}
500502

503+
private void testBtreeIndexWriteLarge() throws Exception {
504+
// create table
505+
RowType rowType =
506+
RowType.of(
507+
new DataType[] {DataTypes.STRING(), DataTypes.STRING()},
508+
new String[] {"k", "v"});
509+
Options options = new Options();
510+
Path tablePath = new Path(warehouse.toString() + "/default.db/test_btree_index_large");
511+
options.set(PATH, tablePath.toString());
512+
options.set(ROW_TRACKING_ENABLED, true);
513+
options.set(DATA_EVOLUTION_ENABLED, true);
514+
options.set(GLOBAL_INDEX_ENABLED, true);
515+
options.set(BTREE_INDEX_COMPRESSION, "zstd");
516+
TableSchema tableSchema =
517+
SchemaUtils.forceCommit(
518+
new SchemaManager(LocalFileIO.create(), tablePath),
519+
new Schema(
520+
rowType.getFields(),
521+
Collections.emptyList(),
522+
Collections.emptyList(),
523+
options.toMap(),
524+
""));
525+
AppendOnlyFileStoreTable table =
526+
new AppendOnlyFileStoreTable(
527+
FileIOFinder.find(tablePath),
528+
tablePath,
529+
tableSchema,
530+
CatalogEnvironment.empty());
531+
532+
// write data
533+
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
534+
try (BatchTableWrite write = writeBuilder.newWrite();
535+
BatchTableCommit commit = writeBuilder.newCommit()) {
536+
for (int i = 0; i < 2000; i++) {
537+
write.write(
538+
GenericRow.of(
539+
BinaryString.fromString("k" + i),
540+
BinaryString.fromString("v" + i)));
541+
}
542+
commit.commit(write.prepareCommit());
543+
}
544+
545+
// build index
546+
BTreeGlobalIndexBuilder builder =
547+
new BTreeGlobalIndexBuilder(table).withIndexType("btree").withIndexField("k");
548+
try (BatchTableCommit commit = writeBuilder.newCommit()) {
549+
commit.commit(builder.build(builder.scan(), IOManager.create(warehouse.toString())));
550+
}
551+
552+
// assert index
553+
List<IndexManifestEntry> indexEntries =
554+
table.indexManifestFileReader().read(table.latestSnapshot().get().indexManifest);
555+
assertThat(indexEntries)
556+
.singleElement()
557+
.matches(entry -> entry.indexFile().rowCount() == 2000);
558+
559+
// read index
560+
PredicateBuilder predicateBuilder = new PredicateBuilder(table.rowType());
561+
ReadBuilder readBuilder =
562+
table.newReadBuilder()
563+
.withFilter(predicateBuilder.equal(0, BinaryString.fromString("k2")));
564+
List<String> result = new ArrayList<>();
565+
readBuilder
566+
.newRead()
567+
.createReader(readBuilder.newScan().plan())
568+
.forEachRemaining(r -> result.add(r.getString(1).toString()));
569+
assertThat(result).containsOnly("v2");
570+
}
571+
501572
// Helper method from TableTestBase
502573
protected Identifier identifier(String tableName) {
503574
return new Identifier(database, tableName);

paimon-python/pypaimon/globalindex/btree/sst_file_reader.py

Lines changed: 27 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,18 @@
3030
from typing import Optional, Callable
3131
from typing import BinaryIO
3232

33+
try:
34+
import lz4.block
35+
LZ4_AVAILABLE = True
36+
except ImportError:
37+
LZ4_AVAILABLE = False
38+
39+
try:
40+
import python_lzo
41+
LZO_AVAILABLE = True
42+
except ImportError:
43+
LZO_AVAILABLE = False
44+
3345
from pypaimon.globalindex.btree.btree_file_footer import BlockHandle
3446
from pypaimon.globalindex.btree.block_entry import BlockEntry
3547
from pypaimon.globalindex.btree.block_reader import BlockReader, BlockIterator
@@ -136,9 +148,6 @@ def _read_block(self, block_handle: BlockHandle) -> BlockReader:
136148

137149
trailer_offset = len(block_data) - 5
138150
compression_type = block_data[trailer_offset]
139-
if compression_type != 0:
140-
raise ValueError("Compression type not supported")
141-
142151
crc32_value = struct.unpack('<I', block_data[trailer_offset + 1:trailer_offset + 5])[0]
143152

144153
# Extract block data (without trailer)
@@ -149,6 +158,21 @@ def _read_block(self, block_handle: BlockHandle) -> BlockReader:
149158
if actual_crc32 != crc32_value:
150159
raise ValueError(f"CRC32 mismatch: expected {crc32_value}, got {actual_crc32}")
151160

161+
# Decompress if needed
162+
if compression_type == 1: # ZSTD
163+
import zstandard as zstd
164+
from io import BytesIO
165+
decompressor = zstd.ZstdDecompressor()
166+
memory_input = MemorySliceInput(block_bytes)
167+
expected_len = memory_input.read_var_len_int()
168+
compressed = block_bytes[memory_input.position():]
169+
with decompressor.stream_reader(BytesIO(compressed)) as reader:
170+
block_bytes = reader.read()
171+
if len(block_bytes) != expected_len:
172+
raise ValueError("Corrupted data, decompression failed.")
173+
elif compression_type != 0: # Not NONE
174+
raise ValueError(f"Compression type {compression_type} not supported")
175+
152176
return BlockReader.create(block_bytes, self.comparator)
153177

154178
def create_iterator(self) -> SstFileIterator:

paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py

Lines changed: 17 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -324,6 +324,7 @@ def test_read_btree_index_table(self):
324324
self._test_read_btree_index_generic("test_btree_index_string", "k2", pa.string())
325325
self._test_read_btree_index_generic("test_btree_index_int", 200, pa.int32())
326326
self._test_read_btree_index_generic("test_btree_index_bigint", 2000, pa.int64())
327+
self._test_read_btree_index_large()
327328

328329
def _test_read_btree_index_generic(self, table_name: str, k, k_type):
329330
table = self.catalog.get_table('default.' + table_name)
@@ -344,3 +345,19 @@ def _test_read_btree_index_generic(self, table_name: str, k, k_type):
344345
("v", pa.string())
345346
]))
346347
self.assertEqual(expected, actual)
348+
349+
def _test_read_btree_index_large(self):
350+
table = self.catalog.get_table('default.test_btree_index_large')
351+
read_builder: ReadBuilder = table.new_read_builder()
352+
predicate_builder = read_builder.new_predicate_builder()
353+
354+
# read equal index
355+
read_builder.with_filter(predicate_builder.equal('k', 'k2'))
356+
table_read = read_builder.new_read()
357+
splits = read_builder.new_scan().plan().splits()
358+
actual = table_sort_by(table_read.to_arrow(splits), 'k')
359+
expected = pa.Table.from_pydict({
360+
'k': ["k2"],
361+
'v': ["v2"]
362+
})
363+
self.assertEqual(expected, actual)

0 commit comments

Comments
 (0)