Commit 082dd16

[core] Introduce 'pk-clustering-override' to clustering by non-primary key fields

1 parent d032c4e

22 files changed: +2240 −39 lines changed

Lines changed: 108 additions & 0 deletions
@@ -0,0 +1,108 @@
---
title: "PK Clustering Override"
weight: 10
type: docs
aliases:
- /primary-key-table/pk-clustering-override.html
---

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# PK Clustering Override

By default, data files in a primary key table are physically sorted by the primary key. This is optimal for point lookups but can hurt scan performance when queries filter on non-primary-key columns.

**PK Clustering Override** mode changes the physical sort order of data files from the primary key to user-specified clustering columns. This significantly improves scan performance for queries that filter or group by clustering columns, while still maintaining primary key uniqueness through deletion vectors.

## Quick Start

```sql
CREATE TABLE my_table (
    id BIGINT,
    dt STRING,
    city STRING,
    amount DOUBLE,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'pk-clustering-override' = 'true',
    'clustering.columns' = 'city',
    'deletion-vectors.enabled' = 'true',
    'bucket' = '4'
);
```

After this, data files within each bucket will be physically sorted by `city` instead of `id`. Queries like `SELECT * FROM my_table WHERE city = 'Beijing'` can skip irrelevant data files by checking their min/max statistics on the clustering column.
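To make the file-skipping idea concrete, here is a minimal standalone sketch (not Paimon's actual reader code; the `FileStats` record and `canSkip` helper are hypothetical) of pruning files by min/max statistics on the clustering column:

```java
import java.util.List;
import java.util.stream.Collectors;

public class MinMaxSkipDemo {

    /** Hypothetical per-file statistics on the clustering column. */
    record FileStats(String fileName, String min, String max) {}

    /** A file can be skipped for `city = value` if value lies outside [min, max]. */
    static boolean canSkip(FileStats f, String value) {
        return value.compareTo(f.min()) < 0 || value.compareTo(f.max()) > 0;
    }

    static List<String> filesToRead(List<FileStats> files, String value) {
        return files.stream()
                .filter(f -> !canSkip(f, value))
                .map(FileStats::fileName)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<FileStats> files = List.of(
                new FileStats("f0", "Beijing", "Chengdu"),    // range covers 'Beijing'
                new FileStats("f1", "Guangzhou", "Shanghai"), // skippable
                new FileStats("f2", "Shenzhen", "Wuhan"));    // skippable
        System.out.println(filesToRead(files, "Beijing")); // [f0]
    }
}
```

Because clustering sorts files by `city`, each file covers a narrow `city` range, so this min/max check eliminates most files for a selective predicate.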

## How It Works

PK Clustering Override replaces the default LSM compaction with a two-phase clustering compaction:

**Phase 1 — Sort by Clustering Columns**: Newly flushed (level 0) files are read, sorted by the configured clustering columns, and rewritten as sorted (level 1) files. A key index tracks each primary key's file and row position to maintain uniqueness.

**Phase 2 — Merge Overlapping Sections**: Sorted files are grouped into sections based on clustering column range overlap. Overlapping sections are merged together. Adjacent small sections are also consolidated to reduce file count and IO amplification. Non-overlapping large files are left untouched.
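The grouping step in Phase 2 is essentially interval grouping: sort files by the start of their clustering range, then pull a file into the current section whenever its range begins before the section's running end. A simplified sketch (the `Range` record and section logic are illustrative, not the actual implementation):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SectionGroupingDemo {

    /** Illustrative clustering-column range of one sorted data file. */
    record Range(String file, int min, int max) {}

    /** Group files into sections of mutually overlapping ranges. */
    static List<List<String>> groupIntoSections(List<Range> files) {
        List<Range> sorted = new ArrayList<>(files);
        sorted.sort(Comparator.comparingInt(Range::min));

        List<List<String>> sections = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int sectionEnd = Integer.MIN_VALUE;
        for (Range r : sorted) {
            if (!current.isEmpty() && r.min() > sectionEnd) {
                sections.add(current); // gap found: close the current section
                current = new ArrayList<>();
            }
            current.add(r.file());
            sectionEnd = Math.max(sectionEnd, r.max());
        }
        if (!current.isEmpty()) {
            sections.add(current);
        }
        return sections;
    }

    public static void main(String[] args) {
        List<Range> files = List.of(
                new Range("a", 0, 10),
                new Range("b", 5, 20),   // overlaps a -> merged into same section
                new Range("c", 30, 40)); // disjoint -> its own section, left untouched
        System.out.println(groupIntoSections(files)); // [[a, b], [c]]
    }
}
```

Only multi-file sections need rewriting; a singleton section like `c` above corresponds to a non-overlapping file that compaction can leave in place.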

During both phases, deduplication is handled via deletion vectors:

- **Deduplicate mode**: When a key already exists in an older file, the old row is marked as deleted.
- **First-row mode**: When a key already exists, the new row is marked as deleted, keeping the first-seen value.

When the number of files to merge exceeds `sort-spill-threshold`, smaller files are first spilled to row-based temporary files to reduce memory consumption, preventing OOM during multi-way merge.
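The two modes differ only in which occurrence of a duplicate key gets marked deleted. A toy sketch of the key-index bookkeeping (the `Position` record and in-memory map are illustrative; real deletion vectors are per-file bitmaps, not lists):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DedupModeDemo {

    /** Illustrative (file, row) position tracked by the key index. */
    record Position(String file, int row) {}

    /**
     * Replays rows in write order and returns the positions to mark deleted.
     * deduplicate = true  -> the old row is deleted (last write wins).
     * deduplicate = false -> the new row is deleted (first-row mode).
     */
    static List<Position> deletions(List<Map.Entry<Long, Position>> rows, boolean deduplicate) {
        Map<Long, Position> keyIndex = new HashMap<>();
        List<Position> deleted = new ArrayList<>();
        for (Map.Entry<Long, Position> e : rows) {
            Position existing = keyIndex.get(e.getKey());
            if (existing == null) {
                keyIndex.put(e.getKey(), e.getValue());
            } else if (deduplicate) {
                deleted.add(existing);              // mark the old row deleted
                keyIndex.put(e.getKey(), e.getValue());
            } else {
                deleted.add(e.getValue());          // mark the new row deleted
            }
        }
        return deleted;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, Position>> rows = List.of(
                Map.entry(1L, new Position("f0", 0)),
                Map.entry(1L, new Position("f1", 0))); // key 1 written twice
        System.out.println(deletions(rows, true));  // deduplicate: (f0, 0) is deleted
        System.out.println(deletions(rows, false)); // first-row:   (f1, 0) is deleted
    }
}
```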

## Requirements

| Option | Requirement |
|--------|-------------|
| `pk-clustering-override` | `true` |
| `clustering.columns` | Must be set (one or more non-primary-key columns) |
| `deletion-vectors.enabled` | Must be `true` |
| `merge-engine` | `deduplicate` (default) or `first-row` only |
| `sequence.fields` | Must **not** be set |
| `record-level.expire-time` | Must **not** be set |

## Related Options

| Option | Default | Description |
|--------|---------|-------------|
| `clustering.columns` | (none) | Comma-separated column names used as the physical sort order for data files. |
| `sort-spill-threshold` | (auto) | When the number of merge readers exceeds this value, smaller files are spilled to row-based temp files to reduce memory usage. |
| `sort-spill-buffer-size` | `64 mb` | Buffer size used for external sort during the Phase 1 rewrite. |

## When to Use

PK Clustering Override is beneficial when:

- Analytical queries frequently filter or aggregate on non-primary-key columns (e.g., `WHERE city = 'Beijing'`).
- The table uses the `deduplicate` or `first-row` merge engine.
- You want data files physically co-located by a business dimension rather than by the primary key.

It is **not** suitable when:

- Point lookups by primary key are the dominant access pattern (the default LSM sort is already optimal for them).
- You need the `partial-update` or `aggregation` merge engine.
- `sequence.fields` or `record-level.expire-time` is required.

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 6 additions & 0 deletions
@@ -1061,6 +1061,12 @@
         <td>String</td>
         <td>You can specify a pattern to get a timestamp from partitions. The formatter pattern is defined by 'partition.timestamp-formatter'.<ul><li>By default, read from the first field.</li><li>If the timestamp in the partition is a single field called 'dt', you can use '$dt'.</li><li>If it is spread across multiple fields for year, month, day, and hour, you can use '$year-$month-$day $hour:00:00'.</li><li>If the timestamp is in fields dt and hour, you can use '$dt $hour:00:00'.</li></ul></td>
     </tr>
+    <tr>
+        <td><h5>pk-clustering-override</h5></td>
+        <td style="word-wrap: break-word;">false</td>
+        <td>Boolean</td>
+        <td>Enables clustering by non-primary key fields. When set to true, the physical sort order of data files is determined by the configured 'clustering.columns' instead of the primary key, optimizing query performance for non-PK columns.</td>
+    </tr>
     <tr>
         <td><h5>postpone.batch-write-fixed-bucket</h5></td>
         <td style="word-wrap: break-word;">true</td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 13 additions & 0 deletions
@@ -2351,6 +2351,15 @@ public InlineElement getDescription() {
                                     + " Default is 10 * TARGET_FILE_SIZE.")
                     .build());

+    public static final ConfigOption<Boolean> PK_CLUSTERING_OVERRIDE =
+            key("pk-clustering-override")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Enables clustering by non-primary key fields. When set to true, the physical"
+                                    + " sort order of data files is determined by the configured 'clustering.columns'"
+                                    + " instead of the primary key, optimizing query performance for non-PK columns.");
+
     private final Options options;

     public CoreOptions(Map<String, String> options) {
@@ -2408,6 +2417,10 @@ public TableType type() {
         return options.get(TYPE);
     }

+    public boolean pkClusteringOverride() {
+        return options.get(PK_CLUSTERING_OVERRIDE);
+    }
+
     public String formatType() {
         return normalizeFileFormat(options.get(FILE_FORMAT));
     }

paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java

Lines changed: 7 additions & 5 deletions
@@ -474,9 +474,9 @@ public static class Builder {
         private long memTableFlushThreshold = 64 * 1024 * 1024; // 64 MB
         private long maxSstFileSize = 8 * 1024 * 1024; // 8 MB
         private int blockSize = 32 * 1024; // 32 KB
-        private long cacheSize = 128 * 1024 * 1024; // 128 MB
         private int level0FileNumCompactTrigger = 4;
         private int sizeRatio = 10;
+        private CacheManager cacheManager;
         private CompressOptions compressOptions = CompressOptions.defaultOptions();
         private Comparator<MemorySlice> keyComparator = MemorySlice::compareTo;

@@ -502,9 +502,9 @@ public Builder blockSize(int blockSize) {
             return this;
         }

-        /** Set the block cache size in bytes. Default is 128 MB. */
-        public Builder cacheSize(long cacheSize) {
-            this.cacheSize = cacheSize;
+        /** Set the cache manager. */
+        public Builder cacheManager(CacheManager cacheManager) {
+            this.cacheManager = cacheManager;
             return this;
         }

@@ -551,7 +551,9 @@ public SimpleLsmKvDb build() {
         }
     }

-        CacheManager cacheManager = new CacheManager(MemorySize.ofBytes(cacheSize));
+        if (cacheManager == null) {
+            cacheManager = new CacheManager(MemorySize.ofMebiBytes(8));
+        }
         SortLookupStoreFactory factory =
                 new SortLookupStoreFactory(
                         keyComparator, cacheManager, blockSize, compressOptions);

paimon-common/src/main/java/org/apache/paimon/utils/VarLengthIntUtils.java

Lines changed: 36 additions & 0 deletions
@@ -20,7 +20,10 @@

 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.EOFException;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;

 /* This file is based on source code of LongPacker from the PalDB Project (https://github.com/linkedin/PalDB), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -126,6 +129,24 @@ public static int encodeInt(DataOutput os, int value) throws IOException {
         return i;
     }

+    /** @return bytes length. */
+    public static int encodeInt(OutputStream os, int value) throws IOException {
+
+        if (value < 0) {
+            throw new IllegalArgumentException("negative value: v=" + value);
+        }
+
+        int i = 1;
+        while ((value & ~0x7F) != 0) {
+            os.write(((value & 0x7F) | 0x80));
+            value >>>= 7;
+            i++;
+        }
+
+        os.write((byte) value);
+        return i;
+    }
+
     public static int decodeInt(DataInput is) throws IOException {
         for (int offset = 0, result = 0; offset < 32; offset += 7) {
             int b = is.readUnsignedByte();
@@ -136,4 +157,19 @@ public static int decodeInt(DataInput is) throws IOException {
         }
         throw new Error("Malformed integer.");
     }
+
+    public static int decodeInt(InputStream is) throws IOException {
+        for (int offset = 0, result = 0; offset < 32; offset += 7) {
+            int b = is.read();
+            if (b == -1) {
+                throw new EOFException("Reached end of stream while reading var-length int.");
+            }
+            b &= 0xFF;
+            result |= (b & 0x7F) << offset;
+            if ((b & 0x80) == 0) {
+                return result;
+            }
+        }
+        throw new Error("Malformed integer.");
+    }
 }
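The new `OutputStream`/`InputStream` overloads implement the same 7-bits-per-byte varint scheme as the existing `DataOutput`/`DataInput` versions: low 7 bits first, with the high bit of each byte marking continuation. A standalone roundtrip sketch of that encoding (the `VarIntRoundtrip` class is illustrative, not part of the commit):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class VarIntRoundtrip {

    /** Write value 7 bits at a time, low bits first; the high bit marks continuation. */
    static int encodeInt(OutputStream os, int value) throws IOException {
        if (value < 0) {
            throw new IllegalArgumentException("negative value: v=" + value);
        }
        int i = 1;
        while ((value & ~0x7F) != 0) {
            os.write((value & 0x7F) | 0x80);
            value >>>= 7;
            i++;
        }
        os.write(value);
        return i; // number of bytes written
    }

    /** Read bytes until one with a clear continuation bit is seen. */
    static int decodeInt(InputStream is) throws IOException {
        for (int offset = 0, result = 0; offset < 32; offset += 7) {
            int b = is.read();
            if (b == -1) {
                throw new EOFException("end of stream in var-length int");
            }
            result |= (b & 0x7F) << offset;
            if ((b & 0x80) == 0) {
                return result;
            }
        }
        throw new IOException("Malformed integer.");
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int len = encodeInt(out, 300); // 300 needs two 7-bit groups: 0xAC 0x02
        InputStream in = new ByteArrayInputStream(out.toByteArray());
        System.out.println(len + " " + decodeInt(in)); // 2 300
    }
}
```

Values below 128 fit in one byte, which is why the encoding pays off for the small row offsets these utilities serialize.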
