Skip to content

Commit f40f888

Browse files
committed
fix comment
1 parent 88c0286 commit f40f888

File tree

2 files changed

+49
-11
lines changed

2 files changed

+49
-11
lines changed

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.Set;
5050

5151
import static org.apache.paimon.table.BucketMode.POSTPONE_BUCKET;
52+
import static org.apache.paimon.utils.Preconditions.checkArgument;
5253

5354
/** Lookup table for primary key which supports to read the LSM tree directly. */
5455
public class PrimaryKeyPartialLookupTable implements LookupTable {
@@ -209,7 +210,8 @@ static class LocalQueryExecutor implements QueryExecutor {
209210
private final StreamTableScan scan;
210211
private final String tableName;
211212

212-
private final Map<BinaryRow, Integer> totalBuckets;
213+
private final Integer defaultNumBuckets;
214+
private final Map<BinaryRow, Integer> numBuckets;
213215

214216
private LocalQueryExecutor(
215217
FileStoreTable table,
@@ -238,13 +240,14 @@ private LocalQueryExecutor(
238240
.newStreamScan();
239241

240242
this.tableName = table.name();
241-
this.totalBuckets = new HashMap<>();
243+
this.defaultNumBuckets = table.bucketSpec().getNumBuckets();
244+
this.numBuckets = new HashMap<>();
242245
}
243246

244247
@Override
245248
@Nullable
246249
public Integer numBuckets(BinaryRow partition) {
247-
return totalBuckets.get(partition);
250+
return numBuckets.get(partition);
248251
}
249252

250253
@Override
@@ -264,18 +267,30 @@ public void refresh() {
264267
}
265268

266269
for (Split split : splits) {
267-
DataSplit dataSplit = (DataSplit) split;
268-
BinaryRow partition = dataSplit.partition();
269-
int bucket = dataSplit.bucket();
270-
List<DataFileMeta> before = dataSplit.beforeFiles();
271-
List<DataFileMeta> after = dataSplit.dataFiles();
272-
273-
tableQuery.refreshFiles(partition, bucket, before, after);
274-
totalBuckets.put(partition, dataSplit.totalBuckets());
270+
refreshSplit((DataSplit) split);
275271
}
276272
}
277273
}
278274

275+
@VisibleForTesting
276+
void refreshSplit(DataSplit split) {
277+
BinaryRow partition = split.partition();
278+
int bucket = split.bucket();
279+
List<DataFileMeta> before = split.beforeFiles();
280+
List<DataFileMeta> after = split.dataFiles();
281+
282+
tableQuery.refreshFiles(partition, bucket, before, after);
283+
Integer totalBuckets = split.totalBuckets();
284+
if (totalBuckets == null) {
285+
// Just for compatibility with older versions
286+
checkArgument(
287+
defaultNumBuckets > 0,
288+
"This is a bug, old version table numBuckets should be greater than 0.");
289+
totalBuckets = defaultNumBuckets;
290+
}
291+
numBuckets.put(partition, totalBuckets);
292+
}
293+
279294
@Override
280295
public void close() throws IOException {
281296
tableQuery.close();

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.paimon.table.sink.CommitMessage;
3838
import org.apache.paimon.table.sink.StreamTableWrite;
3939
import org.apache.paimon.table.sink.TableCommitImpl;
40+
import org.apache.paimon.table.source.DataSplit;
4041
import org.apache.paimon.types.DataType;
4142
import org.apache.paimon.types.DataTypes;
4243
import org.apache.paimon.types.RowType;
@@ -49,6 +50,7 @@
4950
import org.junit.jupiter.params.ParameterizedTest;
5051
import org.junit.jupiter.params.provider.ValueSource;
5152

53+
import java.lang.reflect.Field;
5254
import java.net.InetSocketAddress;
5355
import java.nio.file.Path;
5456
import java.time.Duration;
@@ -62,6 +64,7 @@
6264
import java.util.Random;
6365
import java.util.UUID;
6466

67+
import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
6568
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST;
6669
import static org.apache.paimon.service.ServiceManager.PRIMARY_KEY_LOOKUP;
6770
import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
@@ -148,6 +151,26 @@ public void close() throws Exception {
148151
}
149152
}
150153

154+
@Test
155+
public void testCompatibilityForOldVersion() throws Exception {
156+
createLookupFunction(false, true, false, false);
157+
commit(writeCommit(1));
158+
PrimaryKeyPartialLookupTable lookupTable =
159+
(PrimaryKeyPartialLookupTable) lookupFunction.lookupTable();
160+
LocalQueryExecutor queryExecutor = (LocalQueryExecutor) lookupTable.queryExecutor();
161+
162+
// set totalBuckets to null, for testing old version
163+
DataSplit split = (DataSplit) table.newReadBuilder().newScan().plan().splits().get(0);
164+
Field field = DataSplit.class.getDeclaredField("totalBuckets");
165+
field.setAccessible(true);
166+
field.set(split, null);
167+
assertThat(split.totalBuckets()).isNull();
168+
169+
// assert num buckets should be 2
170+
queryExecutor.refreshSplit(split);
171+
assertThat(queryExecutor.numBuckets(EMPTY_ROW)).isEqualTo(2);
172+
}
173+
151174
@ParameterizedTest
152175
@ValueSource(booleans = {false, true})
153176
public void testDefaultLocalPartial(boolean refreshAsync) throws Exception {

0 commit comments

Comments
 (0)