Skip to content

Commit 697dbcf

Browse files
committed
[kv] Add an implementation of AutoIncIDBuffer
1 parent 9e6f6a4 commit 697dbcf

File tree

21 files changed

+972
-27
lines changed

21 files changed

+972
-27
lines changed

fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.fluss.row.BinaryString;
4949
import org.apache.fluss.row.GenericRow;
5050
import org.apache.fluss.row.InternalRow;
51+
import org.apache.fluss.row.ProjectedRow;
5152
import org.apache.fluss.row.indexed.IndexedRow;
5253
import org.apache.fluss.types.BigIntType;
5354
import org.apache.fluss.types.DataTypes;
@@ -289,6 +290,47 @@ void testPutAndLookup() throws Exception {
289290
.isEqualTo(row(1, "a", null));
290291
}
291292

293+
@Test
294+
void testPutAutoIncColumnAndLookup() throws Exception {
295+
Schema schema =
296+
Schema.newBuilder()
297+
.column("col1", DataTypes.STRING())
298+
.withComment("col1 is first column")
299+
.column("b", DataTypes.BIGINT())
300+
.withComment("col2 is second column, auto increment column")
301+
.setAutoIncrement(true)
302+
.primaryKey("col1")
303+
.build();
304+
TableDescriptor tableDescriptor =
305+
TableDescriptor.builder().schema(schema).distributedBy(2, "col1").build();
306+
// create the table
307+
TablePath data1PkTablePath =
308+
TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "test_pk_table_auto_inc");
309+
createTable(data1PkTablePath, tableDescriptor, true);
310+
Table autoIncTable = conn.getTable(data1PkTablePath);
311+
UpsertWriter upsertWriter = autoIncTable.newUpsert().createWriter();
312+
String[] keys = new String[] {"a", "b", "c", "d"};
313+
for (String key : keys) {
314+
upsertWriter.upsert(row(key, null));
315+
upsertWriter.flush();
316+
}
317+
318+
Lookuper lookuper = autoIncTable.newLookup().createLookuper();
319+
ProjectedRow keyRow = ProjectedRow.from(schema.getPrimaryKeyIndexes());
320+
assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row("a", null))))
321+
.withSchema(schema.getRowType())
322+
.isEqualTo(row("a", 0L));
323+
assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row("b", null))))
324+
.withSchema(schema.getRowType())
325+
.isEqualTo(row("b", 100000L));
326+
assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row("c", null))))
327+
.withSchema(schema.getRowType())
328+
.isEqualTo(row("c", 1L));
329+
assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row("d", null))))
330+
.withSchema(schema.getRowType())
331+
.isEqualTo(row("d", 100001L));
332+
}
333+
292334
@Test
293335
void testPutAndPrefixLookup() throws Exception {
294336
TablePath tablePath = TablePath.of("test_db_1", "test_put_and_prefix_lookup_table");

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1424,6 +1424,20 @@ public class ConfigOptions {
14241424
+ "The `first_row` merge engine will keep the first row of the same primary key. "
14251425
+ "The `versioned` merge engine will keep the row with the largest version of the same primary key.");
14261426

1427+
public static final ConfigOption<Double> TABLE_AUTO_INC_PREFETCH_LOW_WATER_MARK_RATIO =
1428+
key("table.auto-inc.prefetch.low-water-mark-ratio")
1429+
.doubleType()
1430+
.defaultValue(0.5)
1431+
.withDescription(
1432+
"The ratio threshold of remaining local cached auto-increment IDs to trigger async prefetch. When the number of remaining IDs in the current segment is less than (batch-size * this ratio), the system will asynchronously fetch a new batch of IDs from the distributed counter. Default: 0.9 (90% of batch size).");
1433+
1434+
public static final ConfigOption<Long> TABLE_AUTO_INC_BATCH_SIZE =
1435+
key("table.auto-inc.batch-size")
1436+
.longType()
1437+
.defaultValue(100000L)
1438+
.withDescription(
1439+
"The batch size of auto-increment IDs fetched from the distributed counter each time. This value determines the length of the ID segment cached locally, and works with low-water-mark-ratio to control the prefetch timing. Default: 100000.");
1440+
14271441
public static final ConfigOption<String> TABLE_MERGE_ENGINE_VERSION_COLUMN =
14281442
// we may need to introduce "del-column" in the future to support delete operation
14291443
key("table.merge-engine.versioned.ver-column")

fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,15 @@ public int[] getPrimaryKeyIndexes() {
115115
.orElseGet(() -> new int[0]);
116116
}
117117

118+
/** Returns the auto-increment column index, if any, otherwise returns -1. */
119+
public int getAutoIncColumnIndex() {
120+
return columns.stream()
121+
.mapToInt(columns::indexOf)
122+
.filter(idx -> columns.get(idx).isAutoInc())
123+
.findFirst()
124+
.orElse(-1);
125+
}
126+
118127
/** Returns the primary key column names, if any, otherwise returns an empty array. */
119128
public List<String> getPrimaryKeyColumnNames() {
120129
return getPrimaryKey().map(PrimaryKey::getColumnNames).orElse(Collections.emptyList());
@@ -157,6 +166,11 @@ public List<String> getColumnNames(int[] columnIndexes) {
157166
return columnNames;
158167
}
159168

169+
/** Returns the column name in given column index. */
170+
public String getColumnName(int columnIndex) {
171+
return columns.get(columnIndex).columnName;
172+
}
173+
160174
/** Returns the indexes of the fields in the schema. */
161175
public int[] getColumnIndexes(List<String> keyNames) {
162176
int[] keyIndexes = new int[keyNames.size()];
@@ -320,6 +334,18 @@ public Builder withComment(@Nullable String comment) {
320334
return this;
321335
}
322336

337+
/** set auto increment property to the previous column. */
338+
public Builder setAutoIncrement(boolean autoIncrement) {
339+
if (!columns.isEmpty()) {
340+
columns.get(columns.size() - 1).setAutoIncrement(autoIncrement);
341+
} else {
342+
throw new IllegalArgumentException(
343+
"Method 'enableAutoIncrement(...)' must be called after a column definition, "
344+
+ "but there is no preceding column defined.");
345+
}
346+
return this;
347+
}
348+
323349
/**
324350
* Declares a primary key constraint for a set of given columns. Primary key uniquely
325351
* identify a row in a table. Neither of columns in a primary can be nullable. Adding a
@@ -422,22 +448,37 @@ public static final class Column implements Serializable {
422448
private final int columnId;
423449
private final String columnName;
424450
private final DataType dataType;
451+
private boolean isAutoInc;
425452
private final @Nullable String comment;
426453

427454
public Column(String columnName, DataType dataType) {
428-
this(columnName, dataType, null, UNKNOWN_COLUMN_ID);
455+
this(columnName, dataType, null, UNKNOWN_COLUMN_ID, false);
429456
}
430457

431458
public Column(String columnName, DataType dataType, @Nullable String comment) {
432-
this(columnName, dataType, comment, UNKNOWN_COLUMN_ID);
459+
this(columnName, dataType, comment, UNKNOWN_COLUMN_ID, false);
433460
}
434461

435462
public Column(
436463
String columnName, DataType dataType, @Nullable String comment, int columnId) {
464+
this(columnName, dataType, comment, columnId, false);
465+
}
466+
467+
public Column(
468+
String columnName,
469+
DataType dataType,
470+
@Nullable String comment,
471+
int columnId,
472+
boolean isAutoInc) {
437473
this.columnName = columnName;
438474
this.dataType = dataType;
439475
this.comment = comment;
440476
this.columnId = columnId;
477+
this.isAutoInc = isAutoInc;
478+
}
479+
480+
public boolean isAutoInc() {
481+
return isAutoInc;
441482
}
442483

443484
public String getName() {
@@ -460,6 +501,10 @@ public Column withComment(String comment) {
460501
return new Column(columnName, dataType, comment, columnId);
461502
}
462503

504+
public void setAutoIncrement(boolean autoIncrement) {
505+
this.isAutoInc = autoIncrement;
506+
}
507+
463508
@Override
464509
public String toString() {
465510
final StringBuilder sb = new StringBuilder();
@@ -471,6 +516,9 @@ public String toString() {
471516
sb.append(EncodingUtils.escapeSingleQuotes(c));
472517
sb.append("'");
473518
});
519+
if (isAutoInc()) {
520+
sb.append(" AUTO_INCREMENT");
521+
}
474522
return sb.toString();
475523
}
476524

@@ -486,12 +534,13 @@ public boolean equals(Object o) {
486534
return Objects.equals(columnName, that.columnName)
487535
&& Objects.equals(dataType, that.dataType)
488536
&& Objects.equals(comment, that.comment)
489-
&& Objects.equals(columnId, that.columnId);
537+
&& Objects.equals(columnId, that.columnId)
538+
&& Objects.equals(isAutoInc, that.isAutoInc);
490539
}
491540

492541
@Override
493542
public int hashCode() {
494-
return Objects.hash(columnName, dataType, comment, columnId);
543+
return Objects.hash(columnName, dataType, comment, columnId, isAutoInc);
495544
}
496545
}
497546

fluss-common/src/main/java/org/apache/fluss/utils/json/ColumnJsonSerde.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class ColumnJsonSerde
3737
static final String ID = "id";
3838
static final String DATA_TYPE = "data_type";
3939
static final String COMMENT = "comment";
40+
static final String AUTO_INC = "auto_inc";
4041

4142
@Override
4243
public void serialize(Schema.Column column, JsonGenerator generator) throws IOException {
@@ -50,6 +51,8 @@ public void serialize(Schema.Column column, JsonGenerator generator) throws IOEx
5051
generator.writeStringField(COMMENT, column.getComment().get());
5152
}
5253
generator.writeNumberField(ID, column.getColumnId());
54+
generator.writeFieldName(AUTO_INC);
55+
generator.writeBoolean(column.isAutoInc());
5356

5457
generator.writeEndObject();
5558
}
@@ -64,6 +67,7 @@ public Schema.Column deserialize(JsonNode node) {
6467
columnName,
6568
dataType,
6669
node.hasNonNull(COMMENT) ? node.get(COMMENT).asText() : null,
67-
node.has(ID) ? node.get(ID).asInt() : UNKNOWN_COLUMN_ID);
70+
node.has(ID) ? node.get(ID).asInt() : UNKNOWN_COLUMN_ID,
71+
node.required(AUTO_INC).asBoolean());
6872
}
6973
}

fluss-server/src/main/java/org/apache/fluss/server/SequenceIDCounter.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,12 @@ public interface SequenceIDCounter {
2626
* @return The previous sequence ID
2727
*/
2828
long getAndIncrement() throws Exception;
29+
30+
/**
31+
* Atomically adds the given delta to the sequence ID.
32+
*
33+
* @param delta The delta to add
34+
* @return The previous sequence ID
35+
*/
36+
long getAndAdd(Long delta) throws Exception;
2937
}

fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.fluss.metadata.TableInfo;
3737
import org.apache.fluss.metadata.TablePath;
3838
import org.apache.fluss.server.TabletManagerBase;
39+
import org.apache.fluss.server.kv.autoinc.AutoIncProcessor;
3940
import org.apache.fluss.server.kv.rowmerger.RowMerger;
4041
import org.apache.fluss.server.log.LogManager;
4142
import org.apache.fluss.server.log.LogTablet;
@@ -46,6 +47,7 @@
4647
import org.apache.fluss.utils.FileUtils;
4748
import org.apache.fluss.utils.FlussPaths;
4849
import org.apache.fluss.utils.MapUtils;
50+
import org.apache.fluss.utils.concurrent.Scheduler;
4951
import org.apache.fluss.utils.types.Tuple2;
5052

5153
import org.rocksdb.RateLimiter;
@@ -138,13 +140,16 @@ public static RateLimiter getDefaultRateLimiter() {
138140

139141
private volatile boolean isShutdown = false;
140142

143+
private final Scheduler scheduler;
144+
141145
private KvManager(
142146
File dataDir,
143147
Configuration conf,
144148
ZooKeeperClient zkClient,
145149
int recoveryThreadsPerDataDir,
146150
LogManager logManager,
147-
TabletServerMetricGroup tabletServerMetricGroup)
151+
TabletServerMetricGroup tabletServerMetricGroup,
152+
Scheduler scheduler)
148153
throws IOException {
149154
super(TabletType.KV, dataDir, conf, recoveryThreadsPerDataDir);
150155
this.logManager = logManager;
@@ -157,6 +162,7 @@ private KvManager(
157162
this.sharedRocksDBRateLimiter = createSharedRateLimiter(conf);
158163
this.currentSharedRateLimitBytesPerSec =
159164
conf.get(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC).getBytes();
165+
this.scheduler = scheduler;
160166
}
161167

162168
private static RateLimiter createSharedRateLimiter(Configuration conf) {
@@ -181,7 +187,8 @@ public static KvManager create(
181187
Configuration conf,
182188
ZooKeeperClient zkClient,
183189
LogManager logManager,
184-
TabletServerMetricGroup tabletServerMetricGroup)
190+
TabletServerMetricGroup tabletServerMetricGroup,
191+
Scheduler scheduler)
185192
throws IOException {
186193
String dataDirString = conf.getString(ConfigOptions.DATA_DIR);
187194
File dataDir = new File(dataDirString).getAbsoluteFile();
@@ -191,7 +198,8 @@ public static KvManager create(
191198
zkClient,
192199
conf.getInt(ConfigOptions.NETTY_SERVER_NUM_WORKER_THREADS),
193200
logManager,
194-
tabletServerMetricGroup);
201+
tabletServerMetricGroup,
202+
scheduler);
195203
}
196204

197205
public void startup() {
@@ -248,6 +256,15 @@ public KvTablet getOrCreateKv(
248256

249257
File tabletDir = getOrCreateTabletDir(tablePath, tableBucket);
250258
RowMerger merger = RowMerger.create(tableConfig, kvFormat);
259+
AutoIncProcessor autoIncProcessor =
260+
AutoIncProcessor.create(
261+
tablePath.getTablePath(),
262+
schemaGetter.getLatestSchemaInfo().getSchemaId(),
263+
conf,
264+
tableConfig,
265+
schemaGetter.getLatestSchemaInfo().getSchema(),
266+
zkClient,
267+
scheduler);
251268
KvTablet tablet =
252269
KvTablet.create(
253270
tablePath,
@@ -263,7 +280,8 @@ public KvTablet getOrCreateKv(
263280
arrowCompressionInfo,
264281
schemaGetter,
265282
tableConfig.getChangelogImage(),
266-
sharedRocksDBRateLimiter);
283+
sharedRocksDBRateLimiter,
284+
autoIncProcessor);
267285
currentKvs.put(tableBucket, tablet);
268286

269287
LOG.info(
@@ -357,6 +375,15 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti
357375

358376
TableConfig tableConfig = tableInfo.getTableConfig();
359377
RowMerger rowMerger = RowMerger.create(tableConfig, tableConfig.getKvFormat());
378+
AutoIncProcessor autoIncProcessor =
379+
AutoIncProcessor.create(
380+
tablePath,
381+
tableInfo.getSchemaId(),
382+
tableInfo.getProperties(),
383+
tableInfo.getTableConfig(),
384+
tableInfo.getSchema(),
385+
zkClient,
386+
scheduler);
360387
KvTablet kvTablet =
361388
KvTablet.create(
362389
physicalTablePath,
@@ -372,7 +399,8 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti
372399
tableConfig.getArrowCompressionInfo(),
373400
schemaGetter,
374401
tableConfig.getChangelogImage(),
375-
sharedRocksDBRateLimiter);
402+
sharedRocksDBRateLimiter,
403+
autoIncProcessor);
376404
if (this.currentKvs.containsKey(tableBucket)) {
377405
throw new IllegalStateException(
378406
String.format(

0 commit comments

Comments
 (0)