Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.ProjectedRow;
import org.apache.fluss.row.indexed.IndexedRow;
import org.apache.fluss.types.BigIntType;
import org.apache.fluss.types.DataTypes;
Expand Down Expand Up @@ -289,6 +290,47 @@ void testPutAndLookup() throws Exception {
.isEqualTo(row(1, "a", null));
}

@Test
void testPutAutoIncColumnAndLookup() throws Exception {
Schema schema =
Schema.newBuilder()
.column("col1", DataTypes.STRING())
.withComment("col1 is first column")
.column("col2", DataTypes.BIGINT())
.withComment("col2 is second column, auto increment column")
.setAutoIncrement(true)
.primaryKey("col1")
.build();
TableDescriptor tableDescriptor =
TableDescriptor.builder().schema(schema).distributedBy(2, "col1").build();
// create the table
TablePath data1PkTablePath =
TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "test_pk_table_auto_inc");
createTable(data1PkTablePath, tableDescriptor, true);
Table autoIncTable = conn.getTable(data1PkTablePath);
UpsertWriter upsertWriter = autoIncTable.newUpsert().createWriter();
String[] keys = new String[] {"a", "b", "c", "d"};
for (String key : keys) {
upsertWriter.upsert(row(key, null));
upsertWriter.flush();
}

Lookuper lookuper = autoIncTable.newLookup().createLookuper();
ProjectedRow keyRow = ProjectedRow.from(schema.getPrimaryKeyIndexes());
assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row("a", null))))
.withSchema(schema.getRowType())
.isEqualTo(row("a", 0L));
assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row("b", null))))
.withSchema(schema.getRowType())
.isEqualTo(row("b", 100000L));
assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row("c", null))))
.withSchema(schema.getRowType())
.isEqualTo(row("c", 1L));
assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row("d", null))))
.withSchema(schema.getRowType())
.isEqualTo(row("d", 100001L));
}

@Test
void testPutAndPrefixLookup() throws Exception {
TablePath tablePath = TablePath.of("test_db_1", "test_put_and_prefix_lookup_table");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1424,6 +1424,20 @@ public class ConfigOptions {
+ "The `first_row` merge engine will keep the first row of the same primary key. "
+ "The `versioned` merge engine will keep the row with the largest version of the same primary key.");

public static final ConfigOption<Double> TABLE_AUTO_INC_PREFETCH_LOW_WATER_MARK_RATIO =
key("table.auto-inc.prefetch.low-water-mark-ratio")
.doubleType()
.defaultValue(0.5)
.withDescription(
"The ratio threshold of remaining local cached auto-increment IDs to trigger async prefetch. When the number of remaining IDs in the current segment is less than (batch-size * this ratio), the system will asynchronously fetch a new batch of IDs from the distributed counter. Default: 0.9 (90% of batch size).");

public static final ConfigOption<Long> TABLE_AUTO_INC_BATCH_SIZE =
key("table.auto-inc.batch-size")
.longType()
.defaultValue(100000L)
.withDescription(
"The batch size of auto-increment IDs fetched from the distributed counter each time. This value determines the length of the ID segment cached locally, and works with low-water-mark-ratio to control the prefetch timing. Default: 100000.");

public static final ConfigOption<String> TABLE_MERGE_ENGINE_VERSION_COLUMN =
// we may need to introduce "del-column" in the future to support delete operation
key("table.merge-engine.versioned.ver-column")
Expand Down
57 changes: 53 additions & 4 deletions fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,15 @@ public int[] getPrimaryKeyIndexes() {
.orElseGet(() -> new int[0]);
}

/** Returns the auto-increment column index, if any, otherwise returns -1. */
public int getAutoIncColumnIndex() {
return columns.stream()
.mapToInt(columns::indexOf)
.filter(idx -> columns.get(idx).isAutoInc())
.findFirst()
.orElse(-1);
}

/** Returns the primary key column names, if any, otherwise returns an empty array. */
public List<String> getPrimaryKeyColumnNames() {
return getPrimaryKey().map(PrimaryKey::getColumnNames).orElse(Collections.emptyList());
Expand Down Expand Up @@ -157,6 +166,11 @@ public List<String> getColumnNames(int[] columnIndexes) {
return columnNames;
}

/** Returns the column name in given column index. */
public String getColumnName(int columnIndex) {
return columns.get(columnIndex).columnName;
}

/** Returns the indexes of the fields in the schema. */
public int[] getColumnIndexes(List<String> keyNames) {
int[] keyIndexes = new int[keyNames.size()];
Expand Down Expand Up @@ -320,6 +334,18 @@ public Builder withComment(@Nullable String comment) {
return this;
}

/** set auto increment property to the previous column. */
public Builder setAutoIncrement(boolean autoIncrement) {
if (!columns.isEmpty()) {
columns.get(columns.size() - 1).setAutoIncrement(autoIncrement);
} else {
throw new IllegalArgumentException(
"Method 'enableAutoIncrement(...)' must be called after a column definition, "
+ "but there is no preceding column defined.");
}
return this;
}

/**
* Declares a primary key constraint for a set of given columns. Primary key uniquely
* identify a row in a table. Neither of columns in a primary can be nullable. Adding a
Expand Down Expand Up @@ -422,22 +448,37 @@ public static final class Column implements Serializable {
private final int columnId;
private final String columnName;
private final DataType dataType;
private boolean isAutoInc;
private final @Nullable String comment;

public Column(String columnName, DataType dataType) {
this(columnName, dataType, null, UNKNOWN_COLUMN_ID);
this(columnName, dataType, null, UNKNOWN_COLUMN_ID, false);
}

public Column(String columnName, DataType dataType, @Nullable String comment) {
this(columnName, dataType, comment, UNKNOWN_COLUMN_ID);
this(columnName, dataType, comment, UNKNOWN_COLUMN_ID, false);
}

public Column(
String columnName, DataType dataType, @Nullable String comment, int columnId) {
this(columnName, dataType, comment, columnId, false);
}

public Column(
String columnName,
DataType dataType,
@Nullable String comment,
int columnId,
boolean isAutoInc) {
this.columnName = columnName;
this.dataType = dataType;
this.comment = comment;
this.columnId = columnId;
this.isAutoInc = isAutoInc;
}

public boolean isAutoInc() {
return isAutoInc;
}

public String getName() {
Expand All @@ -460,6 +501,10 @@ public Column withComment(String comment) {
return new Column(columnName, dataType, comment, columnId);
}

public void setAutoIncrement(boolean autoIncrement) {
this.isAutoInc = autoIncrement;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
Expand All @@ -471,6 +516,9 @@ public String toString() {
sb.append(EncodingUtils.escapeSingleQuotes(c));
sb.append("'");
});
if (isAutoInc()) {
sb.append(" AUTO_INCREMENT");
}
return sb.toString();
}

Expand All @@ -486,12 +534,13 @@ public boolean equals(Object o) {
return Objects.equals(columnName, that.columnName)
&& Objects.equals(dataType, that.dataType)
&& Objects.equals(comment, that.comment)
&& Objects.equals(columnId, that.columnId);
&& Objects.equals(columnId, that.columnId)
&& Objects.equals(isAutoInc, that.isAutoInc);
}

@Override
public int hashCode() {
return Objects.hash(columnName, dataType, comment, columnId);
return Objects.hash(columnName, dataType, comment, columnId, isAutoInc);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class ColumnJsonSerde
static final String ID = "id";
static final String DATA_TYPE = "data_type";
static final String COMMENT = "comment";
static final String AUTO_INC = "auto_inc";

@Override
public void serialize(Schema.Column column, JsonGenerator generator) throws IOException {
Expand All @@ -50,6 +51,10 @@ public void serialize(Schema.Column column, JsonGenerator generator) throws IOEx
generator.writeStringField(COMMENT, column.getComment().get());
}
generator.writeNumberField(ID, column.getColumnId());
if (column.isAutoInc()) {
generator.writeFieldName(AUTO_INC);
generator.writeBoolean(column.isAutoInc());
}

generator.writeEndObject();
}
Expand All @@ -64,6 +69,7 @@ public Schema.Column deserialize(JsonNode node) {
columnName,
dataType,
node.hasNonNull(COMMENT) ? node.get(COMMENT).asText() : null,
node.has(ID) ? node.get(ID).asInt() : UNKNOWN_COLUMN_ID);
node.has(ID) ? node.get(ID).asInt() : UNKNOWN_COLUMN_ID,
node.hasNonNull(AUTO_INC) && node.get(AUTO_INC).asBoolean());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,12 @@ public interface SequenceIDCounter {
* @return The previous sequence ID
*/
long getAndIncrement() throws Exception;

/**
* Atomically adds the given delta to the sequence ID.
*
* @param delta The delta to add
* @return The previous sequence ID
*/
long getAndAdd(Long delta) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.server.TabletManagerBase;
import org.apache.fluss.server.kv.autoinc.AutoIncProcessor;
import org.apache.fluss.server.kv.rowmerger.RowMerger;
import org.apache.fluss.server.log.LogManager;
import org.apache.fluss.server.log.LogTablet;
Expand All @@ -46,6 +47,7 @@
import org.apache.fluss.utils.FileUtils;
import org.apache.fluss.utils.FlussPaths;
import org.apache.fluss.utils.MapUtils;
import org.apache.fluss.utils.concurrent.Scheduler;
import org.apache.fluss.utils.types.Tuple2;

import org.rocksdb.RateLimiter;
Expand Down Expand Up @@ -138,13 +140,16 @@ public static RateLimiter getDefaultRateLimiter() {

private volatile boolean isShutdown = false;

private final Scheduler scheduler;

private KvManager(
File dataDir,
Configuration conf,
ZooKeeperClient zkClient,
int recoveryThreadsPerDataDir,
LogManager logManager,
TabletServerMetricGroup tabletServerMetricGroup)
TabletServerMetricGroup tabletServerMetricGroup,
Scheduler scheduler)
throws IOException {
super(TabletType.KV, dataDir, conf, recoveryThreadsPerDataDir);
this.logManager = logManager;
Expand All @@ -157,6 +162,7 @@ private KvManager(
this.sharedRocksDBRateLimiter = createSharedRateLimiter(conf);
this.currentSharedRateLimitBytesPerSec =
conf.get(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC).getBytes();
this.scheduler = scheduler;
}

private static RateLimiter createSharedRateLimiter(Configuration conf) {
Expand All @@ -181,7 +187,8 @@ public static KvManager create(
Configuration conf,
ZooKeeperClient zkClient,
LogManager logManager,
TabletServerMetricGroup tabletServerMetricGroup)
TabletServerMetricGroup tabletServerMetricGroup,
Scheduler scheduler)
throws IOException {
String dataDirString = conf.getString(ConfigOptions.DATA_DIR);
File dataDir = new File(dataDirString).getAbsoluteFile();
Expand All @@ -191,7 +198,8 @@ public static KvManager create(
zkClient,
conf.getInt(ConfigOptions.NETTY_SERVER_NUM_WORKER_THREADS),
logManager,
tabletServerMetricGroup);
tabletServerMetricGroup,
scheduler);
}

public void startup() {
Expand Down Expand Up @@ -248,6 +256,15 @@ public KvTablet getOrCreateKv(

File tabletDir = getOrCreateTabletDir(tablePath, tableBucket);
RowMerger merger = RowMerger.create(tableConfig, kvFormat);
AutoIncProcessor autoIncProcessor =
AutoIncProcessor.create(
tablePath.getTablePath(),
schemaGetter.getLatestSchemaInfo().getSchemaId(),
conf,
tableConfig,
schemaGetter.getLatestSchemaInfo().getSchema(),
zkClient,
scheduler);
KvTablet tablet =
KvTablet.create(
tablePath,
Expand All @@ -263,7 +280,8 @@ public KvTablet getOrCreateKv(
arrowCompressionInfo,
schemaGetter,
tableConfig.getChangelogImage(),
sharedRocksDBRateLimiter);
sharedRocksDBRateLimiter,
autoIncProcessor);
currentKvs.put(tableBucket, tablet);

LOG.info(
Expand Down Expand Up @@ -357,6 +375,15 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti

TableConfig tableConfig = tableInfo.getTableConfig();
RowMerger rowMerger = RowMerger.create(tableConfig, tableConfig.getKvFormat());
AutoIncProcessor autoIncProcessor =
AutoIncProcessor.create(
tablePath,
tableInfo.getSchemaId(),
tableInfo.getProperties(),
tableInfo.getTableConfig(),
tableInfo.getSchema(),
zkClient,
scheduler);
KvTablet kvTablet =
KvTablet.create(
physicalTablePath,
Expand All @@ -372,7 +399,8 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti
tableConfig.getArrowCompressionInfo(),
schemaGetter,
tableConfig.getChangelogImage(),
sharedRocksDBRateLimiter);
sharedRocksDBRateLimiter,
autoIncProcessor);
if (this.currentKvs.containsKey(tableBucket)) {
throw new IllegalStateException(
String.format(
Expand Down
Loading