Skip to content

Commit d91ba56

Browse files
committed
[core] Introduce 'pk-clustering-override' to clustering by non-primary key fields
1 parent e63d9ab commit d91ba56

File tree

21 files changed

+2131
-43
lines changed

21 files changed

+2131
-43
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,16 @@
2727
</thead>
2828
<tbody>
2929
<tr>
30-
<td><h5>aggregation.remove-record-on-delete</h5></td>
30+
<td><h5>add-column-before-partition</h5></td>
3131
<td style="word-wrap: break-word;">false</td>
3232
<td>Boolean</td>
33-
<td>Whether to remove the whole row in aggregation engine when -D records are received.</td>
33+
<td>If true, when adding a new column without specifying a position, the column will be placed before the first partition column instead of at the end of the schema. This only takes effect for partitioned tables.</td>
3434
</tr>
3535
<tr>
36-
<td><h5>add-column-before-partition</h5></td>
36+
<td><h5>aggregation.remove-record-on-delete</h5></td>
3737
<td style="word-wrap: break-word;">false</td>
3838
<td>Boolean</td>
39-
<td>If true, when adding a new column without specifying a position, the column will be placed before the first partition column instead of at the end of the schema. This only takes effect for partitioned tables.</td>
39+
<td>Whether to remove the whole row in aggregation engine when -D records are received.</td>
4040
</tr>
4141
<tr>
4242
<td><h5>alter-column-null-to-not-null.disabled</h5></td>
@@ -1055,6 +1055,12 @@
10551055
<td>String</td>
10561056
<td>You can specify a pattern to get a timestamp from partitions. The formatter pattern is defined by 'partition.timestamp-formatter'.<ul><li>By default, read from the first field.</li><li>If the timestamp in the partition is a single field called 'dt', you can use '$dt'.</li><li>If it is spread across multiple fields for year, month, day, and hour, you can use '$year-$month-$day $hour:00:00'.</li><li>If the timestamp is in fields dt and hour, you can use '$dt $hour:00:00'.</li></ul></td>
10571057
</tr>
1058+
<tr>
1059+
<td><h5>pk-clustering-override</h5></td>
1060+
<td style="word-wrap: break-word;">false</td>
1061+
<td>Boolean</td>
1062+
<td>Enables clustering by non-primary key fields. When set to true, the physical sort order of data files is determined by the configured 'clustering.columns' instead of the primary key, optimizing query performance for non-PK columns.</td>
1063+
</tr>
10581064
<tr>
10591065
<td><h5>postpone.batch-write-fixed-bucket</h5></td>
10601066
<td style="word-wrap: break-word;">true</td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2314,6 +2314,15 @@ public InlineElement getDescription() {
23142314
.withDescription(
23152315
"The interval for checking visibility when visibility-callback enabled.");
23162316

2317+
public static final ConfigOption<Boolean> PK_CLUSTERING_OVERRIDE =
2318+
key("pk-clustering-override")
2319+
.booleanType()
2320+
.defaultValue(false)
2321+
.withDescription(
2322+
"Enables clustering by non-primary key fields. When set to true, the physical"
2323+
+ " sort order of data files is determined by the configured 'clustering.columns'"
2324+
+ " instead of the primary key, optimizing query performance for non-PK columns.");
2325+
23172326
private final Options options;
23182327

23192328
public CoreOptions(Map<String, String> options) {
@@ -2371,6 +2380,10 @@ public TableType type() {
23712380
return options.get(TYPE);
23722381
}
23732382

2383+
public boolean pkClusteringOverride() {
2384+
return options.get(PK_CLUSTERING_OVERRIDE);
2385+
}
2386+
23742387
public String formatType() {
23752388
return normalizeFileFormat(options.get(FILE_FORMAT));
23762389
}

paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -474,9 +474,9 @@ public static class Builder {
474474
private long memTableFlushThreshold = 64 * 1024 * 1024; // 64 MB
475475
private long maxSstFileSize = 8 * 1024 * 1024; // 8 MB
476476
private int blockSize = 32 * 1024; // 32 KB
477-
private long cacheSize = 128 * 1024 * 1024; // 128 MB
478477
private int level0FileNumCompactTrigger = 4;
479478
private int sizeRatio = 10;
479+
private CacheManager cacheManager;
480480
private CompressOptions compressOptions = CompressOptions.defaultOptions();
481481
private Comparator<MemorySlice> keyComparator = MemorySlice::compareTo;
482482

@@ -502,9 +502,9 @@ public Builder blockSize(int blockSize) {
502502
return this;
503503
}
504504

505-
/** Set the block cache size in bytes. Default is 128 MB. */
506-
public Builder cacheSize(long cacheSize) {
507-
this.cacheSize = cacheSize;
505+
/** Set the cache manager. */
506+
public Builder cacheManager(CacheManager cacheManager) {
507+
this.cacheManager = cacheManager;
508508
return this;
509509
}
510510

@@ -551,7 +551,9 @@ public SimpleLsmKvDb build() {
551551
}
552552
}
553553

554-
CacheManager cacheManager = new CacheManager(MemorySize.ofBytes(cacheSize));
554+
if (cacheManager == null) {
555+
cacheManager = new CacheManager(MemorySize.ofMebiBytes(8));
556+
}
555557
SortLookupStoreFactory factory =
556558
new SortLookupStoreFactory(
557559
keyComparator, cacheManager, blockSize, compressOptions);

paimon-common/src/main/java/org/apache/paimon/utils/VarLengthIntUtils.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import java.io.DataInput;
2222
import java.io.DataOutput;
2323
import java.io.IOException;
24+
import java.io.InputStream;
25+
import java.io.OutputStream;
2426

2527
/* This file is based on source code of LongPacker from the PalDB Project (https://github.com/linkedin/PalDB), licensed by the Apache
2628
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -126,6 +128,24 @@ public static int encodeInt(DataOutput os, int value) throws IOException {
126128
return i;
127129
}
128130

131+
/** @return bytes length. */
132+
public static int encodeInt(OutputStream os, int value) throws IOException {
133+
134+
if (value < 0) {
135+
throw new IllegalArgumentException("negative value: v=" + value);
136+
}
137+
138+
int i = 1;
139+
while ((value & ~0x7F) != 0) {
140+
os.write(((value & 0x7F) | 0x80));
141+
value >>>= 7;
142+
i++;
143+
}
144+
145+
os.write((byte) value);
146+
return i;
147+
}
148+
129149
public static int decodeInt(DataInput is) throws IOException {
130150
for (int offset = 0, result = 0; offset < 32; offset += 7) {
131151
int b = is.readUnsignedByte();
@@ -136,4 +156,15 @@ public static int decodeInt(DataInput is) throws IOException {
136156
}
137157
throw new Error("Malformed integer.");
138158
}
159+
160+
public static int decodeInt(InputStream is) throws IOException {
161+
for (int offset = 0, result = 0; offset < 32; offset += 7) {
162+
int b = is.read() & 0xFF;
163+
result |= (b & 0x7F) << offset;
164+
if ((b & 0x80) == 0) {
165+
return result;
166+
}
167+
}
168+
throw new Error("Malformed integer.");
169+
}
139170
}

paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ private SimpleLsmKvDb createDb() {
4848
return SimpleLsmKvDb.builder(dataDirectory)
4949
.memTableFlushThreshold(1024)
5050
.blockSize(256)
51-
.cacheSize(4 * 1024 * 1024)
5251
.level0FileNumCompactTrigger(4)
5352
.compressOptions(new CompressOptions("none", 1))
5453
.build();
@@ -197,7 +196,6 @@ public void testCompaction() throws IOException {
197196
SimpleLsmKvDb.builder(dataDirectory)
198197
.memTableFlushThreshold(256)
199198
.blockSize(128)
200-
.cacheSize(4 * 1024 * 1024)
201199
.level0FileNumCompactTrigger(3)
202200
.compressOptions(new CompressOptions("none", 1))
203201
.build();
@@ -320,7 +318,6 @@ public void testCloseFlushesMemTable() throws IOException {
320318
SimpleLsmKvDb.builder(dbDir)
321319
.memTableFlushThreshold(1024 * 1024) // large threshold, won't auto-flush
322320
.blockSize(256)
323-
.cacheSize(4 * 1024 * 1024)
324321
.level0FileNumCompactTrigger(10)
325322
.compressOptions(new CompressOptions("none", 1))
326323
.build();
@@ -349,7 +346,6 @@ public void testWithCompression() throws IOException {
349346
SimpleLsmKvDb.builder(new File(tempDir.toFile(), "compressed-db"))
350347
.memTableFlushThreshold(512)
351348
.blockSize(256)
352-
.cacheSize(4 * 1024 * 1024)
353349
.level0FileNumCompactTrigger(4)
354350
.compressOptions(CompressOptions.defaultOptions())
355351
.build();
@@ -397,7 +393,6 @@ public void testUniversalCompactionTriggeredByRunCount() throws IOException {
397393
SimpleLsmKvDb.builder(new File(tempDir.toFile(), "universal-trigger-db"))
398394
.memTableFlushThreshold(1024 * 1024)
399395
.blockSize(256)
400-
.cacheSize(4 * 1024 * 1024)
401396
.level0FileNumCompactTrigger(3)
402397
.compressOptions(new CompressOptions("none", 1))
403398
.build();
@@ -434,7 +429,6 @@ public void testUniversalCompactionWithOverlappingKeys() throws IOException {
434429
SimpleLsmKvDb.builder(new File(tempDir.toFile(), "universal-overlap-db"))
435430
.memTableFlushThreshold(1024 * 1024)
436431
.blockSize(256)
437-
.cacheSize(4 * 1024 * 1024)
438432
.level0FileNumCompactTrigger(3)
439433
.compressOptions(new CompressOptions("none", 1))
440434
.build();
@@ -470,7 +464,6 @@ public void testUniversalCompactionReducesFileCount() throws IOException {
470464
SimpleLsmKvDb.builder(new File(tempDir.toFile(), "universal-reduce-db"))
471465
.memTableFlushThreshold(1024 * 1024)
472466
.blockSize(256)
473-
.cacheSize(4 * 1024 * 1024)
474467
.level0FileNumCompactTrigger(3)
475468
.compressOptions(new CompressOptions("none", 1))
476469
.build();
@@ -513,7 +506,6 @@ public void testUniversalCompactionMultipleRounds() throws IOException {
513506
SimpleLsmKvDb.builder(new File(tempDir.toFile(), "universal-multi-db"))
514507
.memTableFlushThreshold(512)
515508
.blockSize(128)
516-
.cacheSize(4 * 1024 * 1024)
517509
.level0FileNumCompactTrigger(3)
518510
.sizeRatio(50)
519511
.compressOptions(new CompressOptions("none", 1))
@@ -545,7 +537,6 @@ public void testUniversalCompactionPreservesTombstonesInPartialMerge() throws IO
545537
SimpleLsmKvDb.builder(new File(tempDir.toFile(), "universal-tombstone-db"))
546538
.memTableFlushThreshold(1024 * 1024)
547539
.blockSize(256)
548-
.cacheSize(4 * 1024 * 1024)
549540
.level0FileNumCompactTrigger(4)
550541
.compressOptions(new CompressOptions("none", 1))
551542
.build();
@@ -593,7 +584,6 @@ public void testUniversalCompactionWithUpdatesAcrossRuns() throws IOException {
593584
SimpleLsmKvDb.builder(new File(tempDir.toFile(), "universal-update-db"))
594585
.memTableFlushThreshold(1024 * 1024)
595586
.blockSize(256)
596-
.cacheSize(4 * 1024 * 1024)
597587
.level0FileNumCompactTrigger(3)
598588
.compressOptions(new CompressOptions("none", 1))
599589
.build();
@@ -638,7 +628,6 @@ public void testLargeScaleFlushCompactAndFullCompact() throws IOException {
638628
SimpleLsmKvDb.builder(new File(tempDir.toFile(), "large-scale-db"))
639629
.memTableFlushThreshold(256)
640630
.blockSize(64)
641-
.cacheSize(4 * 1024 * 1024)
642631
.level0FileNumCompactTrigger(3)
643632
.sizeRatio(20)
644633
.compressOptions(new CompressOptions("none", 1))
@@ -792,7 +781,6 @@ public void testLevelStats() throws IOException {
792781
SimpleLsmKvDb.builder(new File(tempDir.toFile(), "stats-db"))
793782
.memTableFlushThreshold(1024 * 1024)
794783
.blockSize(256)
795-
.cacheSize(4 * 1024 * 1024)
796784
.level0FileNumCompactTrigger(10)
797785
.compressOptions(new CompressOptions("none", 1))
798786
.build();
@@ -827,7 +815,6 @@ public int compare(MemorySlice a, MemorySlice b) {
827815
SimpleLsmKvDb.builder(new File(tempDir.toFile(), "reverse-db"))
828816
.memTableFlushThreshold(1024 * 1024)
829817
.blockSize(256)
830-
.cacheSize(4 * 1024 * 1024)
831818
.level0FileNumCompactTrigger(3)
832819
.compressOptions(new CompressOptions("none", 1))
833820
.keyComparator(reverseComparator)
@@ -869,7 +856,6 @@ public int compare(MemorySlice a, MemorySlice b) {
869856
SimpleLsmKvDb.builder(new File(tempDir.toFile(), "reverse-compact-db"))
870857
.memTableFlushThreshold(1024 * 1024)
871858
.blockSize(256)
872-
.cacheSize(4 * 1024 * 1024)
873859
.level0FileNumCompactTrigger(3)
874860
.compressOptions(new CompressOptions("none", 1))
875861
.keyComparator(reverseComparator)
@@ -915,7 +901,6 @@ public int compare(MemorySlice a, MemorySlice b) {
915901
SimpleLsmKvDb.builder(new File(tempDir.toFile(), "reverse-delete-db"))
916902
.memTableFlushThreshold(1024 * 1024)
917903
.blockSize(256)
918-
.cacheSize(4 * 1024 * 1024)
919904
.level0FileNumCompactTrigger(4)
920905
.compressOptions(new CompressOptions("none", 1))
921906
.keyComparator(reverseComparator)
@@ -955,7 +940,6 @@ public void testNonOverlappingFilesSkipMerge() throws IOException {
955940
SimpleLsmKvDb.builder(new File(tempDir.toFile(), "non-overlap-skip-db"))
956941
.memTableFlushThreshold(1024 * 1024)
957942
.blockSize(256)
958-
.cacheSize(4 * 1024 * 1024)
959943
.level0FileNumCompactTrigger(3)
960944
.compressOptions(new CompressOptions("none", 1))
961945
.build();
@@ -985,7 +969,6 @@ public void testOverlappingFilesAreMergedInGroups() throws IOException {
985969
SimpleLsmKvDb.builder(new File(tempDir.toFile(), "overlap-group-db"))
986970
.memTableFlushThreshold(1024 * 1024)
987971
.blockSize(256)
988-
.cacheSize(4 * 1024 * 1024)
989972
.level0FileNumCompactTrigger(3)
990973
.compressOptions(new CompressOptions("none", 1))
991974
.build();
@@ -1019,7 +1002,6 @@ public void testMixedOverlapAndNonOverlapGroups() throws IOException {
10191002
SimpleLsmKvDb.builder(new File(tempDir.toFile(), "mixed-group-db"))
10201003
.memTableFlushThreshold(1024 * 1024)
10211004
.blockSize(256)
1022-
.cacheSize(4 * 1024 * 1024)
10231005
.level0FileNumCompactTrigger(4)
10241006
.compressOptions(new CompressOptions("none", 1))
10251007
.build();
@@ -1068,7 +1050,6 @@ public void testTombstoneFileNotSkippedDuringFullCompact() throws IOException {
10681050
SimpleLsmKvDb.builder(new File(tempDir.toFile(), "tombstone-no-skip-db"))
10691051
.memTableFlushThreshold(1024 * 1024)
10701052
.blockSize(256)
1071-
.cacheSize(4 * 1024 * 1024)
10721053
.level0FileNumCompactTrigger(3)
10731054
.compressOptions(new CompressOptions("none", 1))
10741055
.build();
@@ -1099,7 +1080,6 @@ public void testGroupMergeWithMultipleCompactionRounds() throws IOException {
10991080
SimpleLsmKvDb.builder(new File(tempDir.toFile(), "multi-round-group-db"))
11001081
.memTableFlushThreshold(1024 * 1024)
11011082
.blockSize(256)
1102-
.cacheSize(4 * 1024 * 1024)
11031083
.level0FileNumCompactTrigger(3)
11041084
.compressOptions(new CompressOptions("none", 1))
11051085
.build();
@@ -1143,7 +1123,6 @@ public void testCompactionMergesAllL0RunsAndIncludesL1() throws IOException {
11431123
SimpleLsmKvDb.builder(new File(tempDir.toFile(), "l0-clear-db"))
11441124
.memTableFlushThreshold(1024 * 1024)
11451125
.blockSize(256)
1146-
.cacheSize(4 * 1024 * 1024)
11471126
.level0FileNumCompactTrigger(3)
11481127
.sizeRatio(1)
11491128
.compressOptions(new CompressOptions("none", 1))
@@ -1197,7 +1176,6 @@ public void testCompactionWithManyRoundsNoOverflow() throws IOException {
11971176
SimpleLsmKvDb.builder(new File(tempDir.toFile(), "many-l0-db"))
11981177
.memTableFlushThreshold(1024 * 1024)
11991178
.blockSize(256)
1200-
.cacheSize(4 * 1024 * 1024)
12011179
.level0FileNumCompactTrigger(3)
12021180
.sizeRatio(1)
12031181
.compressOptions(new CompressOptions("none", 1))
@@ -1249,7 +1227,6 @@ public void testCompactionWithOverlappingKeysAcrossL0AndL1() throws IOException
12491227
SimpleLsmKvDb.builder(new File(tempDir.toFile(), "overlap-l0-l1-db"))
12501228
.memTableFlushThreshold(1024 * 1024)
12511229
.blockSize(256)
1252-
.cacheSize(4 * 1024 * 1024)
12531230
.level0FileNumCompactTrigger(3)
12541231
.sizeRatio(1)
12551232
.compressOptions(new CompressOptions("none", 1))
@@ -1308,7 +1285,6 @@ public void testGroupMergePreservesDeleteSemantics() throws IOException {
13081285
SimpleLsmKvDb.builder(new File(tempDir.toFile(), "group-delete-db"))
13091286
.memTableFlushThreshold(1024 * 1024)
13101287
.blockSize(256)
1311-
.cacheSize(4 * 1024 * 1024)
13121288
.level0FileNumCompactTrigger(3)
13131289
.compressOptions(new CompressOptions("none", 1))
13141290
.build();

paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,7 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
290290
bucketMode(),
291291
options.deletionVectorsEnabled(),
292292
options.dataEvolutionEnabled(),
293+
options.pkClusteringOverride(),
293294
newIndexFileHandler(),
294295
snapshotManager,
295296
scanner);

paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@
7272
import java.util.stream.IntStream;
7373

7474
import static org.apache.paimon.lookup.rocksdb.RocksDBOptions.BLOCK_CACHE_SIZE;
75-
import static org.apache.paimon.utils.ListUtils.pickRandomly;
7675
import static org.apache.paimon.utils.Preconditions.checkArgument;
7776

7877
/** Assign UPDATE_BEFORE and bucket for the input record, output record with bucket. */
@@ -133,7 +132,7 @@ public void open(
133132
this.extractor = new RowPartitionAllPrimaryKeyExtractor(table.schema());
134133
this.keyPartExtractor = new KeyPartPartitionKeyExtractor(table.schema());
135134

136-
String tmpDir = pickRandomly(Arrays.asList(ioManager.tempDirs()));
135+
String tmpDir = ioManager.pickRandomTempDir();
137136
this.path = new File(tmpDir, "rocksdb-" + UUID.randomUUID());
138137
if (!this.path.mkdirs()) {
139138
throw new RuntimeException(

paimon-core/src/main/java/org/apache/paimon/disk/IOManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ public interface IOManager extends AutoCloseable {
3838

3939
String[] tempDirs();
4040

41+
String pickRandomTempDir();
42+
4143
Enumerator createChannelEnumerator();
4244

4345
BufferFileWriter createBufferFileWriter(ID channelID) throws IOException;

0 commit comments

Comments
 (0)