Skip to content

Commit a2c692f

Browse files
authored
CNDB-15608 add TokenOnlyPrimaryKey class and isTokenOnly (#2157)
The patch in CNDB-15608 will remove token from the partition key serialization, thus the case, when only token is provided in primary key, needs to be treated separately. Currently only token primary key case is blurred with complete or deferred primary keys. This commit implements clear separation for primary keys with token only by introducing own `TokenOnlyPrimaryKey` class. It also implement `isOnlyToken()` method, which is used instead of comparing partition key with null in appropriate cases (e.g., deferred partition key was loaded). New tests are added to exercise `TokenOnlyPrimaryKey` more and improve code coverage.
1 parent c00cfa8 commit a2c692f

File tree

7 files changed

+276
-40
lines changed

7 files changed

+276
-40
lines changed

src/java/org/apache/cassandra/index/sai/disk/v1/PartitionAwarePrimaryKeyFactory.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import java.util.Objects;
2222
import java.util.function.Supplier;
2323

24+
import javax.annotation.concurrent.NotThreadSafe;
25+
2426
import io.github.jbellis.jvector.util.RamUsageEstimator;
2527
import org.apache.cassandra.db.Clustering;
2628
import org.apache.cassandra.db.DecoratedKey;
@@ -35,13 +37,6 @@
3537
*/
3638
public class PartitionAwarePrimaryKeyFactory implements PrimaryKey.Factory
3739
{
38-
@Override
39-
public PrimaryKey createTokenOnly(Token token)
40-
{
41-
assert token != null;
42-
return new PartitionAwarePrimaryKey(token, null, null);
43-
}
44-
4540
@Override
4641
public PrimaryKey createDeferred(Token token, Supplier<PrimaryKey> primaryKeySupplier)
4742
{
@@ -56,6 +51,7 @@ public PrimaryKey create(DecoratedKey partitionKey, Clustering clustering)
5651
return new PartitionAwarePrimaryKey(partitionKey.getToken(), partitionKey, null);
5752
}
5853

54+
@NotThreadSafe
5955
private class PartitionAwarePrimaryKey implements PrimaryKey
6056
{
6157
private final Token token;
@@ -72,10 +68,12 @@ private PartitionAwarePrimaryKey(Token token, DecoratedKey partitionKey, Supplie
7268
@Override
7369
public PrimaryKey loadDeferred()
7470
{
75-
if (primaryKeySupplier != null && partitionKey == null)
71+
if (primaryKeySupplier != null)
7672
{
73+
assert partitionKey == null : "While applying existing primaryKeySupplier to load deferred primaryKey the partition key was unexpectedly already set";
7774
this.partitionKey = primaryKeySupplier.get().partitionKey();
7875
primaryKeySupplier = null;
76+
assert this.token.equals(this.partitionKey.getToken()) : "Deferred primary key must contain the same token";
7977
}
8078
return this;
8179
}
@@ -156,12 +154,26 @@ public long ramBytesUsed()
156154
return shallowSize + token.getHeapSize() + preHashedDecoratedKeySize;
157155
}
158156

157+
/**
158+
* Compares this primary key with another for ordering purposes.
159+
* <p>
160+
* This implementation uses a two-tier comparison strategy:
161+
* <ul>
162+
* <li>If the given primary key is token only, compares by token only</li>
163+
* <li>If both partition keys are available, performs full partition key comparison</li>
164+
* </ul>
165+
* Note: This comparison is partition-aware only and does not consider clustering keys.
166+
*
167+
* @param o the primary key to compare with
168+
* @return a negative integer, zero, or a positive integer as this primary key is less than,
169+
* equal to, or greater than the specified primary key
170+
*/
159171
@Override
160172
public int compareTo(PrimaryKey o)
161173
{
162-
if (partitionKey == null || o.partitionKey() == null)
174+
if (o.isTokenOnly())
163175
return token().compareTo(o.token());
164-
return partitionKey.compareTo(o.partitionKey());
176+
return partitionKey().compareTo(o.partitionKey());
165177
}
166178

167179
@Override

src/java/org/apache/cassandra/index/sai/disk/v2/RowAwarePrimaryKeyFactory.java

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,6 @@ public RowAwarePrimaryKeyFactory(ClusteringComparator clusteringComparator)
5050
this.hasEmptyClustering = clusteringComparator.size() == 0;
5151
}
5252

53-
@Override
54-
public PrimaryKey createTokenOnly(Token token)
55-
{
56-
return new RowAwarePrimaryKey(token, null, null, null);
57-
}
58-
5953
@Override
6054
public PrimaryKey createDeferred(Token token, Supplier<PrimaryKey> primaryKeySupplier)
6155
{
@@ -117,12 +111,14 @@ public Clustering clustering()
117111
@Override
118112
public PrimaryKey loadDeferred()
119113
{
120-
if (primaryKeySupplier != null && partitionKey == null)
114+
if (primaryKeySupplier != null)
121115
{
116+
assert partitionKey == null : "While applying existing primaryKeySupplier to load deferred primaryKey the partition key was unexpectedly already set";
122117
PrimaryKey deferredPrimaryKey = primaryKeySupplier.get();
123118
this.partitionKey = deferredPrimaryKey.partitionKey();
124119
this.clustering = deferredPrimaryKey.clustering();
125120
primaryKeySupplier = null;
121+
assert this.token.equals(this.partitionKey.getToken()) : "Deferred primary key must contain the same token";
126122
}
127123
return this;
128124
}
@@ -154,8 +150,7 @@ private ByteSource asComparableBytes(int terminator, ByteComparable.Version vers
154150
loadDeferred();
155151

156152
ByteSource tokenComparable = token.asComparableBytes(version);
157-
ByteSource keyComparable = partitionKey == null ? null
158-
: ByteSource.of(partitionKey.getKey(), version);
153+
ByteSource keyComparable = ByteSource.of(partitionKey.getKey(), version);
159154

160155
// It is important that the ClusteringComparator.asBytesComparable method is used
161156
// to maintain the correct clustering sort order
@@ -166,14 +161,10 @@ private ByteSource asComparableBytes(int terminator, ByteComparable.Version vers
166161
.asComparableBytes(version);
167162

168163
// prefix doesn't include null components
169-
if (isPrefix)
170-
{
171-
if (keyComparable == null)
172-
return ByteSource.withTerminator(terminator, tokenComparable);
173-
else if (clusteringComparable == null)
174-
return ByteSource.withTerminator(terminator, tokenComparable, keyComparable);
175-
}
176-
return ByteSource.withTerminator(terminator, tokenComparable, keyComparable, clusteringComparable);
164+
if (isPrefix && clusteringComparable == null)
165+
return ByteSource.withTerminator(terminator, tokenComparable, keyComparable);
166+
else
167+
return ByteSource.withTerminator(terminator, tokenComparable, keyComparable, clusteringComparable);
177168
}
178169

179170
@Override
@@ -182,10 +173,9 @@ public int compareTo(PrimaryKey o)
182173
int cmp = token().compareTo(o.token());
183174

184175
// If the tokens don't match then we don't need to compare any more of the key.
185-
// Otherwise if this key has no deferred loader and it's partition key is null
186-
// or the other partition key is null then one or both of the keys
187-
// are token only so we can only compare tokens
188-
if ((cmp != 0) || (primaryKeySupplier == null && partitionKey == null) || o.partitionKey() == null)
176+
// Otherwise if either this key or given key are token only,
177+
// then we can only compare tokens
178+
if ((cmp != 0) || isTokenOnly() || o.isTokenOnly())
189179
return cmp;
190180

191181
// Next compare the partition keys. If they are not equal or
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.apache.cassandra.index.sai.disk.v2;
17+
18+
import io.github.jbellis.jvector.util.RamUsageEstimator;
19+
import org.apache.cassandra.db.Clustering;
20+
import org.apache.cassandra.db.DecoratedKey;
21+
import org.apache.cassandra.dht.Token;
22+
import org.apache.cassandra.index.sai.utils.PrimaryKey;
23+
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
24+
import org.apache.cassandra.utils.bytecomparable.ByteComparable.Version;
25+
import org.apache.cassandra.utils.bytecomparable.ByteSource;
26+
27+
public final class TokenOnlyPrimaryKey implements PrimaryKey
28+
{
29+
private final Token token;
30+
31+
public TokenOnlyPrimaryKey(Token token)
32+
{
33+
this.token = token;
34+
}
35+
36+
@Override
37+
public boolean isTokenOnly()
38+
{
39+
return true;
40+
}
41+
42+
@Override
43+
public Token token()
44+
{
45+
return token;
46+
}
47+
48+
@Override
49+
public DecoratedKey partitionKey()
50+
{
51+
return null;
52+
}
53+
54+
@Override
55+
public Clustering<?> clustering()
56+
{
57+
return null;
58+
}
59+
60+
@Override
61+
public ByteSource asComparableBytes(Version version)
62+
{
63+
return asComparableBytes(version == ByteComparable.Version.LEGACY ? ByteSource.END_OF_STREAM : ByteSource.TERMINATOR, version, false);
64+
}
65+
66+
@Override
67+
public ByteSource asComparableBytesMinPrefix(Version version)
68+
{
69+
return asComparableBytes(ByteSource.LT_NEXT_COMPONENT, version, true);
70+
}
71+
72+
@Override
73+
public ByteSource asComparableBytesMaxPrefix(Version version)
74+
{
75+
return asComparableBytes(ByteSource.GT_NEXT_COMPONENT, version, true);
76+
}
77+
78+
private ByteSource asComparableBytes(int terminator, ByteComparable.Version version, boolean isPrefix)
79+
{
80+
ByteSource tokenComparable = token.asComparableBytes(version);
81+
// prefix doesn't include null components
82+
if (isPrefix)
83+
return ByteSource.withTerminator(terminator, tokenComparable);
84+
else
85+
return ByteSource.withTerminator(terminator, tokenComparable, null, null);
86+
}
87+
88+
@Override
89+
public int compareTo(PrimaryKey o)
90+
{
91+
return token().compareTo(o.token());
92+
}
93+
94+
@Override
95+
public long ramBytesUsed()
96+
{
97+
// Object header + 1 reference (token) + implicit outer reference + token size
98+
return RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + RamUsageEstimator.NUM_BYTES_OBJECT_REF + token.getHeapSize();
99+
}
100+
101+
@Override
102+
public PrimaryKey forStaticRow()
103+
{
104+
return this;
105+
}
106+
107+
@Override
108+
public PrimaryKey loadDeferred()
109+
{
110+
return this;
111+
}
112+
113+
@Override
114+
public boolean equals(Object o)
115+
{
116+
if (o instanceof PrimaryKey)
117+
return compareTo((PrimaryKey) o) == 0;
118+
return false;
119+
}
120+
121+
@Override
122+
public int hashCode()
123+
{
124+
return token().hashCode();
125+
}
126+
127+
@Override
128+
public String toString()
129+
{
130+
return String.format("TokenOnlyPrimaryKey: { token: %s }", token());
131+
}
132+
}

src/java/org/apache/cassandra/index/sai/iterators/KeyRangeUnionIterator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ private void maybeSkipCurrentPartition()
125125
private void skipPartition(KeyRangeIterator iterator, DecoratedKey partitionKey)
126126
{
127127
// TODO: Push this logic down to the iterator where it can be more efficient
128-
while (iterator.hasNext() && iterator.peek().partitionKey() != null && iterator.peek().partitionKey().compareTo(partitionKey) <= 0)
128+
while (iterator.hasNext() && !iterator.peek().isTokenOnly() && iterator.peek().partitionKey().compareTo(partitionKey) <= 0)
129129
iterator.next();
130130
}
131131

src/java/org/apache/cassandra/index/sai/memory/MemtableKeyRangeIterator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,9 @@ private static PrimaryKey maxKey(Memtable memtable, PrimaryKey.Factory factory)
9191
@Override
9292
protected void performSkipTo(PrimaryKey nextKey)
9393
{
94-
PartitionPosition start = nextKey.partitionKey() != null
95-
? nextKey.partitionKey()
96-
: nextKey.token().minKeyBound();
94+
PartitionPosition start = nextKey.isTokenOnly()
95+
? nextKey.token().minKeyBound()
96+
: nextKey.partitionKey();
9797
if (!keyRange.right.isMinimum() && start.compareTo(keyRange.right) > 0)
9898
{
9999
partitionIterator = EmptyIterators.unfilteredPartition(memtable.metadata());

src/java/org/apache/cassandra/index/sai/utils/PrimaryKey.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.cassandra.index.sai.disk.format.IndexFeatureSet;
2828
import org.apache.cassandra.index.sai.disk.v1.PartitionAwarePrimaryKeyFactory;
2929
import org.apache.cassandra.index.sai.disk.v2.RowAwarePrimaryKeyFactory;
30+
import org.apache.cassandra.index.sai.disk.v2.TokenOnlyPrimaryKey;
3031
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
3132
import org.apache.cassandra.utils.bytecomparable.ByteSource;
3233

@@ -55,7 +56,11 @@ interface Factory
5556
* @param token the {@link Token}
5657
* @return a {@link PrimaryKey} represented by a token only
5758
*/
58-
PrimaryKey createTokenOnly(Token token);
59+
default PrimaryKey createTokenOnly(Token token)
60+
{
61+
assert token != null;
62+
return new TokenOnlyPrimaryKey(token);
63+
}
5964

6065
/**
6166
* Creates a {@link PrimaryKey} that is represented by a {@link DecoratedKey}.
@@ -118,6 +123,11 @@ static Factory factory(ClusteringComparator clusteringComparator, IndexFeatureSe
118123
*/
119124
PrimaryKey forStaticRow();
120125

126+
default boolean isTokenOnly()
127+
{
128+
return false;
129+
}
130+
121131
/**
122132
* Returns the {@link Token} associated with this primary key.
123133
*

0 commit comments

Comments
 (0)