Skip to content

Commit 19d0475

Browse files
committed
[kv] Add an implementation of AutoIncIDBuffer
1 parent de2d9a0 commit 19d0475

File tree

21 files changed

+984
-29
lines changed

21 files changed

+984
-29
lines changed

fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,39 @@ void testPutAndLookup() throws Exception {
289289
.isEqualTo(row(1, "a", null));
290290
}
291291

292+
@Test
293+
void testPutAutoIncColumnAndLookup() throws Exception {
294+
Schema schema =
295+
Schema.newBuilder()
296+
.column("col1", DataTypes.STRING())
297+
.withComment("col1 is first column")
298+
.column("b", DataTypes.BIGINT())
299+
.withComment("col2 is second column")
300+
.setAutoIncrement(true)
301+
.primaryKey("col1")
302+
.build();
303+
TableDescriptor tableDescriptor =
304+
TableDescriptor.builder().schema(schema).distributedBy(2, "col1").build();
305+
// create the table
306+
TablePath data1PkTablePath =
307+
TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "test_pk_table_auto_inc");
308+
createTable(data1PkTablePath, tableDescriptor, true);
309+
Table autoIncTable = conn.getTable(data1PkTablePath);
310+
UpsertWriter upsertWriter = autoIncTable.newUpsert().createWriter();
311+
upsertWriter.upsert(row("a", null));
312+
upsertWriter.upsert(row("b", null));
313+
upsertWriter.upsert(row("c", null));
314+
upsertWriter.upsert(row("d", null));
315+
upsertWriter.upsert(row("ddddd", null));
316+
upsertWriter.flush();
317+
318+
verifyPutAndLookup(autoIncTable, new Object[] {"a", 0L});
319+
verifyPutAndLookup(autoIncTable, new Object[] {"b", 1L});
320+
verifyPutAndLookup(autoIncTable, new Object[] {"c", 2L});
321+
verifyPutAndLookup(autoIncTable, new Object[] {"d", 3L});
322+
verifyPutAndLookup(autoIncTable, new Object[] {"ddddd", 4L});
323+
}
324+
292325
@Test
293326
void testPutAndPrefixLookup() throws Exception {
294327
TablePath tablePath = TablePath.of("test_db_1", "test_put_and_prefix_lookup_table");

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1422,6 +1422,20 @@ public class ConfigOptions {
14221422
+ "The `first_row` merge engine will keep the first row of the same primary key. "
14231423
+ "The `versioned` merge engine will keep the row with the largest version of the same primary key.");
14241424

1425+
public static final ConfigOption<Double> TABLE_AUTO_INC_PREFETCH_LOW_WATER_MARK_RATIO =
1426+
key("table.auto-inc.prefetch.low-water-mark-ratio")
1427+
.doubleType()
1428+
.defaultValue(0.9)
1429+
.withDescription(
1430+
"The ratio threshold of remaining local cached auto-increment IDs to trigger async prefetch. When the number of remaining IDs in the current segment is less than (batch-size * this ratio), the system will asynchronously fetch a new batch of IDs from the distributed counter. Default: 0.9 (90% of batch size).");
1431+
1432+
public static final ConfigOption<Long> TABLE_AUTO_INC_BATCH_SIZE =
1433+
key("table.auto-inc.batch-size")
1434+
.longType()
1435+
.defaultValue(100000L)
1436+
.withDescription(
1437+
"The batch size of auto-increment IDs fetched from the distributed counter each time. This value determines the length of the ID segment cached locally, and works with low-water-mark-ratio to control the prefetch timing. Default: 100000.");
1438+
14251439
public static final ConfigOption<String> TABLE_MERGE_ENGINE_VERSION_COLUMN =
14261440
// we may need to introduce "del-column" in the future to support delete operation
14271441
key("table.merge-engine.versioned.ver-column")

fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,15 @@ public int[] getPrimaryKeyIndexes() {
115115
.orElseGet(() -> new int[0]);
116116
}
117117

118+
/** Returns the auto-increment column index, if any, otherwise returns -1. */
119+
public int getAutoIncColumnIndex() {
120+
return columns.stream()
121+
.mapToInt(columns::indexOf)
122+
.filter(idx -> columns.get(idx).isAutoInc())
123+
.findFirst()
124+
.orElse(-1);
125+
}
126+
118127
/** Returns the primary key column names, if any, otherwise returns an empty array. */
119128
public List<String> getPrimaryKeyColumnNames() {
120129
return getPrimaryKey().map(PrimaryKey::getColumnNames).orElse(Collections.emptyList());
@@ -157,6 +166,11 @@ public List<String> getColumnNames(int[] columnIndexes) {
157166
return columnNames;
158167
}
159168

169+
/** Returns the column name in given column index. */
170+
public String getColumnName(int columnIndex) {
171+
return columns.get(columnIndex).columnName;
172+
}
173+
160174
/** Returns the indexes of the fields in the schema. */
161175
public int[] getColumnIndexes(List<String> keyNames) {
162176
int[] keyIndexes = new int[keyNames.size()];
@@ -320,6 +334,18 @@ public Builder withComment(@Nullable String comment) {
320334
return this;
321335
}
322336

337+
/** set auto increment property to the previous column. */
338+
public Builder setAutoIncrement(boolean autoIncrement) {
339+
if (!columns.isEmpty()) {
340+
columns.get(columns.size() - 1).setAutoIncrement(autoIncrement);
341+
} else {
342+
throw new IllegalArgumentException(
343+
"Method 'enableAutoIncrement(...)' must be called after a column definition, "
344+
+ "but there is no preceding column defined.");
345+
}
346+
return this;
347+
}
348+
323349
/**
324350
* Declares a primary key constraint for a set of given columns. Primary key uniquely
325351
* identify a row in a table. Neither of columns in a primary can be nullable. Adding a
@@ -422,22 +448,37 @@ public static final class Column implements Serializable {
422448
private final int columnId;
423449
private final String columnName;
424450
private final DataType dataType;
451+
private boolean isAutoInc;
425452
private final @Nullable String comment;
426453

427454
public Column(String columnName, DataType dataType) {
428-
this(columnName, dataType, null, UNKNOWN_COLUMN_ID);
455+
this(columnName, dataType, null, UNKNOWN_COLUMN_ID, false);
429456
}
430457

431458
public Column(String columnName, DataType dataType, @Nullable String comment) {
432-
this(columnName, dataType, comment, UNKNOWN_COLUMN_ID);
459+
this(columnName, dataType, comment, UNKNOWN_COLUMN_ID, false);
433460
}
434461

435462
public Column(
436463
String columnName, DataType dataType, @Nullable String comment, int columnId) {
464+
this(columnName, dataType, comment, columnId, false);
465+
}
466+
467+
public Column(
468+
String columnName,
469+
DataType dataType,
470+
@Nullable String comment,
471+
int columnId,
472+
boolean isAutoInc) {
437473
this.columnName = columnName;
438474
this.dataType = dataType;
439475
this.comment = comment;
440476
this.columnId = columnId;
477+
this.isAutoInc = isAutoInc;
478+
}
479+
480+
public boolean isAutoInc() {
481+
return isAutoInc;
441482
}
442483

443484
public String getName() {
@@ -460,6 +501,10 @@ public Column withComment(String comment) {
460501
return new Column(columnName, dataType, comment, columnId);
461502
}
462503

504+
public void setAutoIncrement(boolean autoIncrement) {
505+
this.isAutoInc = autoIncrement;
506+
}
507+
463508
@Override
464509
public String toString() {
465510
final StringBuilder sb = new StringBuilder();
@@ -471,6 +516,9 @@ public String toString() {
471516
sb.append(EncodingUtils.escapeSingleQuotes(c));
472517
sb.append("'");
473518
});
519+
if (isAutoInc()) {
520+
sb.append(" AUTO_INCREMENT");
521+
}
474522
return sb.toString();
475523
}
476524

@@ -486,12 +534,13 @@ public boolean equals(Object o) {
486534
return Objects.equals(columnName, that.columnName)
487535
&& Objects.equals(dataType, that.dataType)
488536
&& Objects.equals(comment, that.comment)
489-
&& Objects.equals(columnId, that.columnId);
537+
&& Objects.equals(columnId, that.columnId)
538+
&& Objects.equals(isAutoInc, that.isAutoInc);
490539
}
491540

492541
@Override
493542
public int hashCode() {
494-
return Objects.hash(columnName, dataType, comment, columnId);
543+
return Objects.hash(columnName, dataType, comment, columnId, isAutoInc);
495544
}
496545
}
497546

fluss-common/src/main/java/org/apache/fluss/utils/json/ColumnJsonSerde.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class ColumnJsonSerde
3737
static final String ID = "id";
3838
static final String DATA_TYPE = "data_type";
3939
static final String COMMENT = "comment";
40+
static final String AUTO_INC = "auto_inc";
4041

4142
@Override
4243
public void serialize(Schema.Column column, JsonGenerator generator) throws IOException {
@@ -50,6 +51,8 @@ public void serialize(Schema.Column column, JsonGenerator generator) throws IOEx
5051
generator.writeStringField(COMMENT, column.getComment().get());
5152
}
5253
generator.writeNumberField(ID, column.getColumnId());
54+
generator.writeFieldName(AUTO_INC);
55+
generator.writeBoolean(column.isAutoInc());
5356

5457
generator.writeEndObject();
5558
}
@@ -64,6 +67,7 @@ public Schema.Column deserialize(JsonNode node) {
6467
columnName,
6568
dataType,
6669
node.hasNonNull(COMMENT) ? node.get(COMMENT).asText() : null,
67-
node.has(ID) ? node.get(ID).asInt() : UNKNOWN_COLUMN_ID);
70+
node.has(ID) ? node.get(ID).asInt() : UNKNOWN_COLUMN_ID,
71+
node.required(AUTO_INC).asBoolean());
6872
}
6973
}

fluss-server/src/main/java/org/apache/fluss/server/SequenceIDCounter.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,12 @@ public interface SequenceIDCounter {
2626
* @return The previous sequence ID
2727
*/
2828
long getAndIncrement() throws Exception;
29+
30+
/**
31+
* Atomically adds the given delta to the sequence ID.
32+
*
33+
* @param delta The delta to add
34+
* @return The previous sequence ID
35+
*/
36+
long getAndAdd(Long delta) throws Exception;
2937
}

fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.fluss.metadata.TableInfo;
3434
import org.apache.fluss.metadata.TablePath;
3535
import org.apache.fluss.server.TabletManagerBase;
36+
import org.apache.fluss.server.kv.autoinc.AutoIncProcessor;
3637
import org.apache.fluss.server.kv.rowmerger.RowMerger;
3738
import org.apache.fluss.server.log.LogManager;
3839
import org.apache.fluss.server.log.LogTablet;
@@ -43,6 +44,7 @@
4344
import org.apache.fluss.utils.FileUtils;
4445
import org.apache.fluss.utils.FlussPaths;
4546
import org.apache.fluss.utils.MapUtils;
47+
import org.apache.fluss.utils.concurrent.Scheduler;
4648
import org.apache.fluss.utils.types.Tuple2;
4749

4850
import org.slf4j.Logger;
@@ -89,13 +91,16 @@ public final class KvManager extends TabletManagerBase {
8991

9092
private final FileSystem remoteFileSystem;
9193

94+
private final Scheduler scheduler;
95+
9296
private KvManager(
9397
File dataDir,
9498
Configuration conf,
9599
ZooKeeperClient zkClient,
96100
int recoveryThreadsPerDataDir,
97101
LogManager logManager,
98-
TabletServerMetricGroup tabletServerMetricGroup)
102+
TabletServerMetricGroup tabletServerMetricGroup,
103+
Scheduler scheduler)
99104
throws IOException {
100105
super(TabletType.KV, dataDir, conf, recoveryThreadsPerDataDir);
101106
this.logManager = logManager;
@@ -105,13 +110,15 @@ private KvManager(
105110
this.remoteKvDir = FlussPaths.remoteKvDir(conf);
106111
this.remoteFileSystem = remoteKvDir.getFileSystem();
107112
this.serverMetricGroup = tabletServerMetricGroup;
113+
this.scheduler = scheduler;
108114
}
109115

110116
public static KvManager create(
111117
Configuration conf,
112118
ZooKeeperClient zkClient,
113119
LogManager logManager,
114-
TabletServerMetricGroup tabletServerMetricGroup)
120+
TabletServerMetricGroup tabletServerMetricGroup,
121+
Scheduler scheduler)
115122
throws IOException {
116123
String dataDirString = conf.getString(ConfigOptions.DATA_DIR);
117124
File dataDir = new File(dataDirString).getAbsoluteFile();
@@ -121,7 +128,8 @@ public static KvManager create(
121128
zkClient,
122129
conf.getInt(ConfigOptions.NETTY_SERVER_NUM_WORKER_THREADS),
123130
logManager,
124-
tabletServerMetricGroup);
131+
tabletServerMetricGroup,
132+
scheduler);
125133
}
126134

127135
public void startup() {
@@ -174,6 +182,15 @@ public KvTablet getOrCreateKv(
174182

175183
File tabletDir = getOrCreateTabletDir(tablePath, tableBucket);
176184
RowMerger merger = RowMerger.create(tableConfig, kvFormat);
185+
AutoIncProcessor autoIncProcessor =
186+
AutoIncProcessor.create(
187+
tablePath.getTablePath(),
188+
schemaGetter.getLatestSchemaInfo().getSchemaId(),
189+
conf,
190+
tableConfig,
191+
schemaGetter.getLatestSchemaInfo().getSchema(),
192+
zkClient,
193+
scheduler);
177194
KvTablet tablet =
178195
KvTablet.create(
179196
logTablet,
@@ -186,7 +203,8 @@ public KvTablet getOrCreateKv(
186203
merger,
187204
arrowCompressionInfo,
188205
schemaGetter,
189-
tableConfig.getChangelogImage());
206+
tableConfig.getChangelogImage(),
207+
autoIncProcessor);
190208
currentKvs.put(tableBucket, tablet);
191209

192210
LOG.info(
@@ -280,6 +298,15 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti
280298

281299
TableConfig tableConfig = tableInfo.getTableConfig();
282300
RowMerger rowMerger = RowMerger.create(tableConfig, tableConfig.getKvFormat());
301+
AutoIncProcessor autoIncProcessor =
302+
AutoIncProcessor.create(
303+
tablePath,
304+
tableInfo.getSchemaId(),
305+
tableInfo.getProperties(),
306+
tableInfo.getTableConfig(),
307+
tableInfo.getSchema(),
308+
zkClient,
309+
scheduler);
283310
KvTablet kvTablet =
284311
KvTablet.create(
285312
physicalTablePath,
@@ -294,7 +321,8 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti
294321
rowMerger,
295322
tableConfig.getArrowCompressionInfo(),
296323
schemaGetter,
297-
tableConfig.getChangelogImage());
324+
tableConfig.getChangelogImage(),
325+
autoIncProcessor);
298326
if (this.currentKvs.containsKey(tableBucket)) {
299327
throw new IllegalStateException(
300328
String.format(

0 commit comments

Comments
 (0)