Skip to content

Commit 45dc38a

Browse files
beryllwwangjunbo
authored andcommitted
[kv] Add an implementation of AutoIncIDBuffer
1 parent aabe78a commit 45dc38a

File tree

22 files changed

+906
-32
lines changed

22 files changed

+906
-32
lines changed

.idea/vcs.xml

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,71 @@ void testPutAndLookup() throws Exception {
289289
.isEqualTo(row(1, "a", null));
290290
}
291291

292+
@Test
293+
void testPutAndLookupWithAutoIncColumn() throws Exception {
294+
// TODO: add test case for auto inc column.
295+
Schema schema =
296+
Schema.newBuilder()
297+
.column("a", DataTypes.STRING())
298+
.withComment("a is first column")
299+
.column("b", DataTypes.BIGINT())
300+
.withComment("b is second column")
301+
.setAutoIncrement(true)
302+
.primaryKey("a")
303+
.build();
304+
TableDescriptor tableDescriptor =
305+
TableDescriptor.builder().schema(schema).distributedBy(3, "a").build();
306+
// create the table
307+
TablePath data1PkTablePath =
308+
TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "test_pk_table_auto_inc");
309+
createTable(data1PkTablePath, tableDescriptor, true);
310+
Table table2 = conn.getTable(data1PkTablePath);
311+
UpsertWriter upsertWriter = table2.newUpsert().createWriter();
312+
InternalRow row = row(new Object[] {"a", null});
313+
// put data.
314+
upsertWriter.upsert(row);
315+
row = row(new Object[] {"b", null});
316+
upsertWriter.upsert(row);
317+
row = row(new Object[] {"c", null});
318+
upsertWriter.upsert(row);
319+
upsertWriter.flush();
320+
321+
Lookuper lookuper = table2.newLookup().createLookuper();
322+
CompletableFuture<LookupResult> resultFuture =
323+
lookuper.lookup(GenericRow.of(BinaryString.fromString("a")));
324+
resultFuture
325+
.get()
326+
.getRowList()
327+
.forEach(
328+
lrow -> {
329+
System.out.println(lrow.getString(0));
330+
System.out.println(lrow.getLong(1));
331+
});
332+
333+
resultFuture = lookuper.lookup(GenericRow.of(BinaryString.fromString("b")));
334+
resultFuture
335+
.get()
336+
.getRowList()
337+
.forEach(
338+
lrow -> {
339+
System.out.println(lrow.getString(0));
340+
System.out.println(lrow.getLong(1));
341+
});
342+
343+
// note that the increments might not be contiguous because rows might go to different
344+
// buckets
345+
resultFuture = lookuper.lookup(GenericRow.of(BinaryString.fromString("c")));
346+
resultFuture
347+
.get()
348+
.getRowList()
349+
.forEach(
350+
lrow -> {
351+
System.out.println(lrow.getString(0));
352+
System.out.println(lrow.getLong(1));
353+
});
354+
verifyPutAndLookup(table2, new Object[] {"a", 1L});
355+
}
356+
292357
@Test
293358
void testPutAndPrefixLookup() throws Exception {
294359
TablePath tablePath = TablePath.of("test_db_1", "test_put_and_prefix_lookup_table");

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1391,6 +1391,20 @@ public class ConfigOptions {
13911391
+ "The `first_row` merge engine will keep the first row of the same primary key. "
13921392
+ "The `versioned` merge engine will keep the row with the largest version of the same primary key.");
13931393

1394+
public static final ConfigOption<Double> TABLE_AUTO_INC_PREFETCH_LOW_WATER_MARK_RATIO =
1395+
key("table.auto-inc.prefetch.low-water-ratio")
1396+
.doubleType()
1397+
.defaultValue(0.9)
1398+
.withDescription(
1399+
"Controls the auto increment column IDs prefetch threshold. The prefetch task is asynchronously launched when the number local cached IDs is less than "
1400+
+ "the low water mark ratio of the prefetch batch size.");
1401+
1402+
public static final ConfigOption<Long> TABLE_AUTO_INC_BATCH_SIZE =
1403+
key("table.auto-inc.batch-size")
1404+
.longType()
1405+
.defaultValue(100000L)
1406+
.withDescription("The auto increment column IDs batch size.");
1407+
13941408
public static final ConfigOption<String> TABLE_MERGE_ENGINE_VERSION_COLUMN =
13951409
// we may need to introduce "del-column" in the future to support delete operation
13961410
key("table.merge-engine.versioned.ver-column")

fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,15 @@ public int[] getPrimaryKeyIndexes() {
115115
.orElseGet(() -> new int[0]);
116116
}
117117

118+
/** Returns the auto-increment column index, if any, otherwise returns -1. */
119+
public int getAutoIncColumnIndex() {
120+
return columns.stream()
121+
.mapToInt(columns::indexOf)
122+
.filter(idx -> columns.get(idx).isAutoInc())
123+
.findFirst()
124+
.orElse(-1);
125+
}
126+
118127
/** Returns the primary key column names, if any, otherwise returns an empty array. */
119128
public List<String> getPrimaryKeyColumnNames() {
120129
return getPrimaryKey().map(PrimaryKey::getColumnNames).orElse(Collections.emptyList());
@@ -157,6 +166,11 @@ public List<String> getColumnNames(int[] columnIndexes) {
157166
return columnNames;
158167
}
159168

169+
/** Returns the column name in given column index. */
170+
public String getColumnName(int columnIndex) {
171+
return columns.get(columnIndex).columnName;
172+
}
173+
160174
/** Returns the indexes of the fields in the schema. */
161175
public int[] getColumnIndexes(List<String> keyNames) {
162176
int[] keyIndexes = new int[keyNames.size()];
@@ -320,6 +334,18 @@ public Builder withComment(@Nullable String comment) {
320334
return this;
321335
}
322336

337+
/** set auto increment property to the previous column. */
338+
public Builder setAutoIncrement(boolean autoIncrement) {
339+
if (!columns.isEmpty()) {
340+
columns.get(columns.size() - 1).setAutoIncrement(autoIncrement);
341+
} else {
342+
throw new IllegalArgumentException(
343+
"Method 'enableAutoIncrement(...)' must be called after a column definition, "
344+
+ "but there is no preceding column defined.");
345+
}
346+
return this;
347+
}
348+
323349
/**
324350
* Declares a primary key constraint for a set of given columns. Primary key uniquely
325351
* identify a row in a table. Neither of columns in a primary can be nullable. Adding a
@@ -422,22 +448,37 @@ public static final class Column implements Serializable {
422448
private final int columnId;
423449
private final String columnName;
424450
private final DataType dataType;
451+
private boolean isAutoInc;
425452
private final @Nullable String comment;
426453

427454
public Column(String columnName, DataType dataType) {
428-
this(columnName, dataType, null, UNKNOWN_COLUMN_ID);
455+
this(columnName, dataType, null, UNKNOWN_COLUMN_ID, false);
429456
}
430457

431458
public Column(String columnName, DataType dataType, @Nullable String comment) {
432-
this(columnName, dataType, comment, UNKNOWN_COLUMN_ID);
459+
this(columnName, dataType, comment, UNKNOWN_COLUMN_ID, false);
433460
}
434461

435462
public Column(
436463
String columnName, DataType dataType, @Nullable String comment, int columnId) {
464+
this(columnName, dataType, comment, columnId, false);
465+
}
466+
467+
public Column(
468+
String columnName,
469+
DataType dataType,
470+
@Nullable String comment,
471+
int columnId,
472+
boolean isAutoInc) {
437473
this.columnName = columnName;
438474
this.dataType = dataType;
439475
this.comment = comment;
440476
this.columnId = columnId;
477+
this.isAutoInc = isAutoInc;
478+
}
479+
480+
public boolean isAutoInc() {
481+
return isAutoInc;
441482
}
442483

443484
public String getName() {
@@ -460,6 +501,10 @@ public Column withComment(String comment) {
460501
return new Column(columnName, dataType, comment, columnId);
461502
}
462503

504+
public void setAutoIncrement(boolean autoIncrement) {
505+
this.isAutoInc = autoIncrement;
506+
}
507+
463508
@Override
464509
public String toString() {
465510
final StringBuilder sb = new StringBuilder();
@@ -471,6 +516,9 @@ public String toString() {
471516
sb.append(EncodingUtils.escapeSingleQuotes(c));
472517
sb.append("'");
473518
});
519+
if (isAutoInc()) {
520+
sb.append(" AUTO_INCREMENT");
521+
}
474522
return sb.toString();
475523
}
476524

@@ -486,12 +534,13 @@ public boolean equals(Object o) {
486534
return Objects.equals(columnName, that.columnName)
487535
&& Objects.equals(dataType, that.dataType)
488536
&& Objects.equals(comment, that.comment)
489-
&& Objects.equals(columnId, that.columnId);
537+
&& Objects.equals(columnId, that.columnId)
538+
&& Objects.equals(isAutoInc, that.isAutoInc);
490539
}
491540

492541
@Override
493542
public int hashCode() {
494-
return Objects.hash(columnName, dataType, comment, columnId);
543+
return Objects.hash(columnName, dataType, comment, columnId, isAutoInc);
495544
}
496545
}
497546

fluss-common/src/main/java/org/apache/fluss/utils/json/ColumnJsonSerde.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class ColumnJsonSerde
3737
static final String ID = "id";
3838
static final String DATA_TYPE = "data_type";
3939
static final String COMMENT = "comment";
40+
static final String AUTO_INC = "auto_inc";
4041

4142
@Override
4243
public void serialize(Schema.Column column, JsonGenerator generator) throws IOException {
@@ -50,6 +51,8 @@ public void serialize(Schema.Column column, JsonGenerator generator) throws IOEx
5051
generator.writeStringField(COMMENT, column.getComment().get());
5152
}
5253
generator.writeNumberField(ID, column.getColumnId());
54+
generator.writeFieldName(AUTO_INC);
55+
generator.writeBoolean(column.isAutoInc());
5356

5457
generator.writeEndObject();
5558
}
@@ -64,6 +67,7 @@ public Schema.Column deserialize(JsonNode node) {
6467
columnName,
6568
dataType,
6669
node.hasNonNull(COMMENT) ? node.get(COMMENT).asText() : null,
67-
node.has(ID) ? node.get(ID).asInt() : UNKNOWN_COLUMN_ID);
70+
node.has(ID) ? node.get(ID).asInt() : UNKNOWN_COLUMN_ID,
71+
node.required(AUTO_INC).asBoolean());
6872
}
6973
}

fluss-server/src/main/java/org/apache/fluss/server/SequenceIDCounter.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,12 @@ public interface SequenceIDCounter {
2626
* @return The previous sequence ID
2727
*/
2828
long getAndIncrement() throws Exception;
29+
30+
/**
31+
* Atomically adds the given delta to the sequence ID.
32+
*
33+
* @param delta The delta to add
34+
* @return The previous sequence ID
35+
*/
36+
long getAndAdd(Long delta) throws Exception;
2937
}

fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.fluss.metadata.TableInfo;
3434
import org.apache.fluss.metadata.TablePath;
3535
import org.apache.fluss.server.TabletManagerBase;
36+
import org.apache.fluss.server.kv.autoinc.AutoIncProcessor;
3637
import org.apache.fluss.server.kv.rowmerger.RowMerger;
3738
import org.apache.fluss.server.log.LogManager;
3839
import org.apache.fluss.server.log.LogTablet;
@@ -43,6 +44,7 @@
4344
import org.apache.fluss.utils.FileUtils;
4445
import org.apache.fluss.utils.FlussPaths;
4546
import org.apache.fluss.utils.MapUtils;
47+
import org.apache.fluss.utils.concurrent.Scheduler;
4648
import org.apache.fluss.utils.types.Tuple2;
4749

4850
import org.slf4j.Logger;
@@ -89,13 +91,16 @@ public final class KvManager extends TabletManagerBase {
8991

9092
private final FileSystem remoteFileSystem;
9193

94+
private final Scheduler scheduler;
95+
9296
private KvManager(
9397
File dataDir,
9498
Configuration conf,
9599
ZooKeeperClient zkClient,
96100
int recoveryThreadsPerDataDir,
97101
LogManager logManager,
98-
TabletServerMetricGroup tabletServerMetricGroup)
102+
TabletServerMetricGroup tabletServerMetricGroup,
103+
Scheduler scheduler)
99104
throws IOException {
100105
super(TabletType.KV, dataDir, conf, recoveryThreadsPerDataDir);
101106
this.logManager = logManager;
@@ -105,13 +110,15 @@ private KvManager(
105110
this.remoteKvDir = FlussPaths.remoteKvDir(conf);
106111
this.remoteFileSystem = remoteKvDir.getFileSystem();
107112
this.serverMetricGroup = tabletServerMetricGroup;
113+
this.scheduler = scheduler;
108114
}
109115

110116
public static KvManager create(
111117
Configuration conf,
112118
ZooKeeperClient zkClient,
113119
LogManager logManager,
114-
TabletServerMetricGroup tabletServerMetricGroup)
120+
TabletServerMetricGroup tabletServerMetricGroup,
121+
Scheduler scheduler)
115122
throws IOException {
116123
String dataDirString = conf.getString(ConfigOptions.DATA_DIR);
117124
File dataDir = new File(dataDirString).getAbsoluteFile();
@@ -121,7 +128,8 @@ public static KvManager create(
121128
zkClient,
122129
conf.getInt(ConfigOptions.NETTY_SERVER_NUM_WORKER_THREADS),
123130
logManager,
124-
tabletServerMetricGroup);
131+
tabletServerMetricGroup,
132+
scheduler);
125133
}
126134

127135
public void startup() {
@@ -174,6 +182,15 @@ public KvTablet getOrCreateKv(
174182

175183
File tabletDir = getOrCreateTabletDir(tablePath, tableBucket);
176184
RowMerger merger = RowMerger.create(tableConfig, kvFormat);
185+
AutoIncProcessor autoIncProcessor =
186+
AutoIncProcessor.create(
187+
tablePath.getTablePath(),
188+
schemaGetter.getLatestSchemaInfo().getSchemaId(),
189+
conf,
190+
tableConfig,
191+
schemaGetter.getLatestSchemaInfo().getSchema(),
192+
zkClient,
193+
scheduler);
177194
KvTablet tablet =
178195
KvTablet.create(
179196
logTablet,
@@ -185,7 +202,8 @@ public KvTablet getOrCreateKv(
185202
kvFormat,
186203
merger,
187204
arrowCompressionInfo,
188-
schemaGetter);
205+
schemaGetter,
206+
autoIncProcessor);
189207
currentKvs.put(tableBucket, tablet);
190208

191209
LOG.info(
@@ -280,6 +298,15 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti
280298
RowMerger rowMerger =
281299
RowMerger.create(
282300
tableInfo.getTableConfig(), tableInfo.getTableConfig().getKvFormat());
301+
AutoIncProcessor autoIncProcessor =
302+
AutoIncProcessor.create(
303+
tablePath,
304+
tableInfo.getSchemaId(),
305+
tableInfo.getProperties(),
306+
tableInfo.getTableConfig(),
307+
tableInfo.getSchema(),
308+
zkClient,
309+
scheduler);
283310
KvTablet kvTablet =
284311
KvTablet.create(
285312
physicalTablePath,
@@ -293,7 +320,8 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti
293320
tableInfo.getTableConfig().getKvFormat(),
294321
rowMerger,
295322
tableInfo.getTableConfig().getArrowCompressionInfo(),
296-
schemaGetter);
323+
schemaGetter,
324+
autoIncProcessor);
297325
if (this.currentKvs.containsKey(tableBucket)) {
298326
throw new IllegalStateException(
299327
String.format(

0 commit comments

Comments
 (0)