Skip to content

Commit b3edf78

Browse files
committed
rm prefetch and two segment optimize
1 parent 6d4a2ec commit b3edf78

File tree

15 files changed

+137
-448
lines changed

15 files changed

+137
-448
lines changed

fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java

Lines changed: 41 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -290,47 +290,6 @@ void testPutAndLookup() throws Exception {
290290
.isEqualTo(row(1, "a", null));
291291
}
292292

293-
@Test
294-
void testPutAutoIncColumnAndLookup() throws Exception {
295-
Schema schema =
296-
Schema.newBuilder()
297-
.column("col1", DataTypes.STRING())
298-
.withComment("col1 is first column")
299-
.column("col2", DataTypes.BIGINT())
300-
.withComment("col2 is second column, auto increment column")
301-
.setAutoIncrement(true)
302-
.primaryKey("col1")
303-
.build();
304-
TableDescriptor tableDescriptor =
305-
TableDescriptor.builder().schema(schema).distributedBy(2, "col1").build();
306-
// create the table
307-
TablePath data1PkTablePath =
308-
TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "test_pk_table_auto_inc");
309-
createTable(data1PkTablePath, tableDescriptor, true);
310-
Table autoIncTable = conn.getTable(data1PkTablePath);
311-
UpsertWriter upsertWriter = autoIncTable.newUpsert().createWriter();
312-
String[] keys = new String[] {"a", "b", "c", "d"};
313-
for (String key : keys) {
314-
upsertWriter.upsert(row(key, null));
315-
upsertWriter.flush();
316-
}
317-
318-
Lookuper lookuper = autoIncTable.newLookup().createLookuper();
319-
ProjectedRow keyRow = ProjectedRow.from(schema.getPrimaryKeyIndexes());
320-
assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row("a", null))))
321-
.withSchema(schema.getRowType())
322-
.isEqualTo(row("a", 0L));
323-
assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row("b", null))))
324-
.withSchema(schema.getRowType())
325-
.isEqualTo(row("b", 100000L));
326-
assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row("c", null))))
327-
.withSchema(schema.getRowType())
328-
.isEqualTo(row("c", 1L));
329-
assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row("d", null))))
330-
.withSchema(schema.getRowType())
331-
.isEqualTo(row("d", 100001L));
332-
}
333-
334293
@Test
335294
void testPutAndPrefixLookup() throws Exception {
336295
TablePath tablePath = TablePath.of("test_db_1", "test_put_and_prefix_lookup_table");
@@ -498,6 +457,47 @@ void testInvalidPrefixLookup() throws Exception {
498457
+ "because the lookup columns [b, a] must contain all bucket keys [a, b] in order.");
499458
}
500459

460+
@Test
461+
void testPutAutoIncColumnAndLookup() throws Exception {
462+
Schema schema =
463+
Schema.newBuilder()
464+
.column("col1", DataTypes.STRING())
465+
.withComment("col1 is first column")
466+
.column("col2", DataTypes.BIGINT())
467+
.withComment("col2 is second column, auto increment column")
468+
.enableAutoIncrement("col2")
469+
.primaryKey("col1")
470+
.build();
471+
TableDescriptor tableDescriptor =
472+
TableDescriptor.builder().schema(schema).distributedBy(2, "col1").build();
473+
// create the table
474+
TablePath data1PkTablePath =
475+
TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "test_pk_table_auto_inc");
476+
createTable(data1PkTablePath, tableDescriptor, true);
477+
Table autoIncTable = conn.getTable(data1PkTablePath);
478+
UpsertWriter upsertWriter = autoIncTable.newUpsert().partialUpdate("col1").createWriter();
479+
String[] keys = new String[] {"a", "b", "c", "d"};
480+
for (String key : keys) {
481+
upsertWriter.upsert(row(key, null));
482+
upsertWriter.flush();
483+
}
484+
485+
Lookuper lookuper = autoIncTable.newLookup().createLookuper();
486+
ProjectedRow keyRow = ProjectedRow.from(schema.getPrimaryKeyIndexes());
487+
assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row("a", null))))
488+
.withSchema(schema.getRowType())
489+
.isEqualTo(row("a", 0L));
490+
assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row("b", null))))
491+
.withSchema(schema.getRowType())
492+
.isEqualTo(row("b", 100000L));
493+
assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row("c", null))))
494+
.withSchema(schema.getRowType())
495+
.isEqualTo(row("c", 1L));
496+
assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row("d", null))))
497+
.withSchema(schema.getRowType())
498+
.isEqualTo(row("d", 100001L));
499+
}
500+
501501
@Test
502502
void testLookupForNotReadyTable() throws Exception {
503503
TablePath tablePath = TablePath.of("test_db_1", "test_lookup_unready_table_t1");

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1424,19 +1424,12 @@ public class ConfigOptions {
14241424
+ "The `first_row` merge engine will keep the first row of the same primary key. "
14251425
+ "The `versioned` merge engine will keep the row with the largest version of the same primary key.");
14261426

1427-
public static final ConfigOption<Double> TABLE_AUTO_INC_PREFETCH_LOW_WATER_MARK_RATIO =
1428-
key("table.auto-inc.prefetch.low-water-mark-ratio")
1429-
.doubleType()
1430-
.defaultValue(0.5)
1431-
.withDescription(
1432-
"The ratio threshold of remaining local cached auto-increment IDs to trigger async prefetch. When the number of remaining IDs in the current segment is less than (batch-size * this ratio), the system will asynchronously fetch a new batch of IDs from the distributed counter. Default: 0.9 (90% of batch size).");
1433-
14341427
public static final ConfigOption<Long> TABLE_AUTO_INC_BATCH_SIZE =
14351428
key("table.auto-inc.batch-size")
14361429
.longType()
14371430
.defaultValue(100000L)
14381431
.withDescription(
1439-
"The batch size of auto-increment IDs fetched from the distributed counter each time. This value determines the length of the ID segment cached locally, and works with low-water-mark-ratio to control the prefetch timing. Default: 100000.");
1432+
"The batch size of auto-increment IDs fetched from the distributed counter each time. This value determines the length of the ID segment cached locally. Default: 100000.");
14401433

14411434
public static final ConfigOption<String> TABLE_MERGE_ENGINE_VERSION_COLUMN =
14421435
// we may need to introduce "del-column" in the future to support delete operation

fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java

Lines changed: 7 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -115,13 +115,10 @@ public int[] getPrimaryKeyIndexes() {
115115
.orElseGet(() -> new int[0]);
116116
}
117117

118-
/** Returns the auto-increment column index, if any, otherwise returns -1. */
119-
public int getAutoIncColumnIndex() {
120-
return columns.stream()
121-
.mapToInt(columns::indexOf)
122-
.filter(idx -> columns.get(idx).isAutoInc())
123-
.findFirst()
124-
.orElse(-1);
118+
/** Returns the auto-increment column index, if any, returns an empty array. */
119+
public int[] getAutoIncColumnIndexes() {
120+
final List<String> columns = getColumnNames();
121+
return getAutoIncrementColumnNames().stream().mapToInt(columns::indexOf).toArray();
125122
}
126123

127124
/** Returns the primary key column names, if any, otherwise returns an empty array. */
@@ -334,18 +331,6 @@ public Builder withComment(@Nullable String comment) {
334331
return this;
335332
}
336333

337-
/** set auto increment property to the previous column. */
338-
public Builder setAutoIncrement(boolean autoIncrement) {
339-
if (!columns.isEmpty()) {
340-
columns.get(columns.size() - 1).setAutoIncrement(autoIncrement);
341-
} else {
342-
throw new IllegalArgumentException(
343-
"Method 'enableAutoIncrement(...)' must be called after a column definition, "
344-
+ "but there is no preceding column defined.");
345-
}
346-
return this;
347-
}
348-
349334
/**
350335
* Declares a primary key constraint for a set of given columns. Primary key uniquely
351336
* identify a row in a table. Neither of columns in a primary can be nullable. Adding a
@@ -448,7 +433,6 @@ public static final class Column implements Serializable {
448433
private final int columnId;
449434
private final String columnName;
450435
private final DataType dataType;
451-
private boolean isAutoInc;
452436
private final @Nullable String comment;
453437

454438
public Column(String columnName, DataType dataType) {
@@ -474,11 +458,6 @@ public Column(
474458
this.dataType = dataType;
475459
this.comment = comment;
476460
this.columnId = columnId;
477-
this.isAutoInc = isAutoInc;
478-
}
479-
480-
public boolean isAutoInc() {
481-
return isAutoInc;
482461
}
483462

484463
public String getName() {
@@ -501,10 +480,6 @@ public Column withComment(String comment) {
501480
return new Column(columnName, dataType, comment, columnId);
502481
}
503482

504-
public void setAutoIncrement(boolean autoIncrement) {
505-
this.isAutoInc = autoIncrement;
506-
}
507-
508483
@Override
509484
public String toString() {
510485
final StringBuilder sb = new StringBuilder();
@@ -516,9 +491,6 @@ public String toString() {
516491
sb.append(EncodingUtils.escapeSingleQuotes(c));
517492
sb.append("'");
518493
});
519-
if (isAutoInc()) {
520-
sb.append(" AUTO_INCREMENT");
521-
}
522494
return sb.toString();
523495
}
524496

@@ -533,14 +505,12 @@ public boolean equals(Object o) {
533505
Column that = (Column) o;
534506
return Objects.equals(columnName, that.columnName)
535507
&& Objects.equals(dataType, that.dataType)
536-
&& Objects.equals(comment, that.comment)
537-
&& Objects.equals(columnId, that.columnId)
538-
&& Objects.equals(isAutoInc, that.isAutoInc);
508+
&& Objects.equals(comment, that.comment);
539509
}
540510

541511
@Override
542512
public int hashCode() {
543-
return Objects.hash(columnName, dataType, comment, columnId, isAutoInc);
513+
return Objects.hash(columnName, dataType, comment, columnId);
544514
}
545515
}
546516

@@ -663,7 +633,7 @@ private static List<Column> normalizeColumns(
663633
newColumns.add(
664634
new Column(
665635
column.getName(),
666-
column.getDataType().copy(false),
636+
column.getDataType().copy(true),
667637
column.getComment().isPresent() ? column.getComment().get() : null,
668638
column.getColumnId()));
669639
} else {

fluss-common/src/main/java/org/apache/fluss/utils/json/ColumnJsonSerde.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ public class ColumnJsonSerde
3737
static final String ID = "id";
3838
static final String DATA_TYPE = "data_type";
3939
static final String COMMENT = "comment";
40-
static final String AUTO_INC = "auto_inc";
4140

4241
@Override
4342
public void serialize(Schema.Column column, JsonGenerator generator) throws IOException {
@@ -51,10 +50,6 @@ public void serialize(Schema.Column column, JsonGenerator generator) throws IOEx
5150
generator.writeStringField(COMMENT, column.getComment().get());
5251
}
5352
generator.writeNumberField(ID, column.getColumnId());
54-
if (column.isAutoInc()) {
55-
generator.writeFieldName(AUTO_INC);
56-
generator.writeBoolean(column.isAutoInc());
57-
}
5853

5954
generator.writeEndObject();
6055
}
@@ -69,7 +64,6 @@ public Schema.Column deserialize(JsonNode node) {
6964
columnName,
7065
dataType,
7166
node.hasNonNull(COMMENT) ? node.get(COMMENT).asText() : null,
72-
node.has(ID) ? node.get(ID).asInt() : UNKNOWN_COLUMN_ID,
73-
node.hasNonNull(AUTO_INC) && node.get(AUTO_INC).asBoolean());
67+
node.has(ID) ? node.get(ID).asInt() : UNKNOWN_COLUMN_ID);
7468
}
7569
}

fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.apache.fluss.utils.FileUtils;
4848
import org.apache.fluss.utils.FlussPaths;
4949
import org.apache.fluss.utils.MapUtils;
50-
import org.apache.fluss.utils.concurrent.Scheduler;
5150
import org.apache.fluss.utils.types.Tuple2;
5251

5352
import org.rocksdb.RateLimiter;
@@ -140,16 +139,13 @@ public static RateLimiter getDefaultRateLimiter() {
140139

141140
private volatile boolean isShutdown = false;
142141

143-
private final Scheduler scheduler;
144-
145142
private KvManager(
146143
File dataDir,
147144
Configuration conf,
148145
ZooKeeperClient zkClient,
149146
int recoveryThreadsPerDataDir,
150147
LogManager logManager,
151-
TabletServerMetricGroup tabletServerMetricGroup,
152-
Scheduler scheduler)
148+
TabletServerMetricGroup tabletServerMetricGroup)
153149
throws IOException {
154150
super(TabletType.KV, dataDir, conf, recoveryThreadsPerDataDir);
155151
this.logManager = logManager;
@@ -162,7 +158,6 @@ private KvManager(
162158
this.sharedRocksDBRateLimiter = createSharedRateLimiter(conf);
163159
this.currentSharedRateLimitBytesPerSec =
164160
conf.get(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC).getBytes();
165-
this.scheduler = scheduler;
166161
}
167162

168163
private static RateLimiter createSharedRateLimiter(Configuration conf) {
@@ -187,8 +182,7 @@ public static KvManager create(
187182
Configuration conf,
188183
ZooKeeperClient zkClient,
189184
LogManager logManager,
190-
TabletServerMetricGroup tabletServerMetricGroup,
191-
Scheduler scheduler)
185+
TabletServerMetricGroup tabletServerMetricGroup)
192186
throws IOException {
193187
String dataDirString = conf.getString(ConfigOptions.DATA_DIR);
194188
File dataDir = new File(dataDirString).getAbsoluteFile();
@@ -198,8 +192,7 @@ public static KvManager create(
198192
zkClient,
199193
conf.getInt(ConfigOptions.NETTY_SERVER_NUM_WORKER_THREADS),
200194
logManager,
201-
tabletServerMetricGroup,
202-
scheduler);
195+
tabletServerMetricGroup);
203196
}
204197

205198
public void startup() {
@@ -263,8 +256,7 @@ public KvTablet getOrCreateKv(
263256
conf,
264257
tableConfig,
265258
schemaGetter.getLatestSchemaInfo().getSchema(),
266-
zkClient,
267-
scheduler);
259+
zkClient);
268260
KvTablet tablet =
269261
KvTablet.create(
270262
tablePath,
@@ -382,8 +374,7 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti
382374
tableInfo.getProperties(),
383375
tableInfo.getTableConfig(),
384376
tableInfo.getSchema(),
385-
zkClient,
386-
scheduler);
377+
zkClient);
387378
KvTablet kvTablet =
388379
KvTablet.create(
389380
physicalTablePath,

fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncColumnProcessor.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828
import javax.annotation.Nullable;
2929
import javax.annotation.concurrent.NotThreadSafe;
3030

31+
import java.util.HashMap;
32+
import java.util.Map;
33+
3134
/** A updater to auto increment column . */
3235
@NotThreadSafe
3336
public class AutoIncColumnProcessor implements AutoIncProcessor {
@@ -37,17 +40,19 @@ public class AutoIncColumnProcessor implements AutoIncProcessor {
3740
private final RowEncoder rowEncoder;
3841

3942
private final DataType[] fieldDataTypes;
40-
private final int targetColumnIdx;
41-
private final IncIDGenerator idGenerator;
43+
private final Map<Integer, IncIDGenerator> idGeneratorMap;
4244
private final short schemaId;
4345

4446
public AutoIncColumnProcessor(
4547
KvFormat kvFormat,
4648
short schemaId,
4749
Schema schema,
48-
int targetColumnIdx,
49-
IncIDGenerator incIDGenerator) {
50-
this.targetColumnIdx = targetColumnIdx;
50+
int[] targetColumnIdx,
51+
IncIDGenerator[] incIDGenerator) {
52+
this.idGeneratorMap = new HashMap<>();
53+
for (int i = 0; i < targetColumnIdx.length; i++) {
54+
idGeneratorMap.put(targetColumnIdx[i], incIDGenerator[i]);
55+
}
5156
this.fieldDataTypes = schema.getRowType().getChildren().toArray(new DataType[0]);
5257

5358
// getter for the fields in row
@@ -56,7 +61,6 @@ public AutoIncColumnProcessor(
5661
flussFieldGetters[i] = InternalRow.createFieldGetter(fieldDataTypes[i], i);
5762
}
5863
this.rowEncoder = RowEncoder.create(kvFormat, fieldDataTypes);
59-
this.idGenerator = incIDGenerator;
6064
this.schemaId = schemaId;
6165
}
6266

@@ -65,9 +69,9 @@ public AutoIncColumnProcessor(
6569
public BinaryValue processAutoInc(BinaryValue oldValue) {
6670
rowEncoder.startNewRow();
6771
for (int i = 0; i < fieldDataTypes.length; i++) {
68-
if (i == targetColumnIdx) {
72+
if (idGeneratorMap.containsKey(i)) {
6973
if (oldValue != null && oldValue.row.isNullAt(i)) {
70-
rowEncoder.encodeField(i, idGenerator.nextVal());
74+
rowEncoder.encodeField(i, idGeneratorMap.get(i).nextVal());
7175
}
7276
} else {
7377
// use the old row value

0 commit comments

Comments
 (0)