Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.ProjectedRow;
import org.apache.fluss.row.indexed.IndexedRow;
import org.apache.fluss.types.BigIntType;
import org.apache.fluss.types.DataTypes;
Expand Down Expand Up @@ -456,6 +457,47 @@ void testInvalidPrefixLookup() throws Exception {
+ "because the lookup columns [b, a] must contain all bucket keys [a, b] in order.");
}

@Test
void testPutAutoIncColumnAndLookup() throws Exception {
Schema schema =
Schema.newBuilder()
.column("col1", DataTypes.STRING())
.withComment("col1 is first column")
.column("col2", DataTypes.BIGINT())
.withComment("col2 is second column, auto increment column")
.enableAutoIncrement("col2")
.primaryKey("col1")
.build();
TableDescriptor tableDescriptor =
TableDescriptor.builder().schema(schema).distributedBy(2, "col1").build();
// create the table
TablePath data1PkTablePath =
TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "test_pk_table_auto_inc");
createTable(data1PkTablePath, tableDescriptor, true);
Table autoIncTable = conn.getTable(data1PkTablePath);
UpsertWriter upsertWriter = autoIncTable.newUpsert().partialUpdate("col1").createWriter();
String[] keys = new String[] {"a", "b", "c", "d"};
for (String key : keys) {
upsertWriter.upsert(row(key, null));
upsertWriter.flush();
}

Lookuper lookuper = autoIncTable.newLookup().createLookuper();
ProjectedRow keyRow = ProjectedRow.from(schema.getPrimaryKeyIndexes());
assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row("a", null))))
.withSchema(schema.getRowType())
.isEqualTo(row("a", 0L));
assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row("b", null))))
.withSchema(schema.getRowType())
.isEqualTo(row("b", 100000L));
assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row("c", null))))
.withSchema(schema.getRowType())
.isEqualTo(row("c", 1L));
assertThatRow(lookupRow(lookuper, keyRow.replaceRow(row("d", null))))
.withSchema(schema.getRowType())
.isEqualTo(row("d", 100001L));
}

@Test
void testLookupForNotReadyTable() throws Exception {
TablePath tablePath = TablePath.of("test_db_1", "test_lookup_unready_table_t1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1424,6 +1424,13 @@ public class ConfigOptions {
+ "The `first_row` merge engine will keep the first row of the same primary key. "
+ "The `versioned` merge engine will keep the row with the largest version of the same primary key.");

public static final ConfigOption<Long> TABLE_AUTO_INC_BATCH_SIZE =
key("table.auto-inc.batch-size")
.longType()
.defaultValue(100000L)
.withDescription(
"The batch size of auto-increment IDs fetched from the distributed counter each time. This value determines the length of the ID segment cached locally. Default: 100000.");

public static final ConfigOption<String> TABLE_MERGE_ENGINE_VERSION_COLUMN =
// we may need to introduce "del-column" in the future to support delete operation
key("table.merge-engine.versioned.ver-column")
Expand Down
15 changes: 12 additions & 3 deletions fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ public int[] getPrimaryKeyIndexes() {
.orElseGet(() -> new int[0]);
}

/** Returns the auto-increment column index, if any, returns an empty array. */
public int[] getAutoIncColumnIndexes() {
final List<String> columns = getColumnNames();
return getAutoIncrementColumnNames().stream().mapToInt(columns::indexOf).toArray();
}

/** Returns the primary key column names, if any, otherwise returns an empty array. */
public List<String> getPrimaryKeyColumnNames() {
return getPrimaryKey().map(PrimaryKey::getColumnNames).orElse(Collections.emptyList());
Expand Down Expand Up @@ -157,6 +163,11 @@ public List<String> getColumnNames(int[] columnIndexes) {
return columnNames;
}

/** Returns the column name in given column index. */
public String getColumnName(int columnIndex) {
return columns.get(columnIndex).columnName;
}

/** Returns the indexes of the fields in the schema. */
public int[] getColumnIndexes(List<String> keyNames) {
int[] keyIndexes = new int[keyNames.size()];
Expand Down Expand Up @@ -608,9 +619,7 @@ private static List<Column> normalizeColumns(
}

// primary key and auto increment column should not nullable
if ((pkSet.contains(column.getName())
|| autoIncrementColumnNames.contains(column.getName()))
&& column.getDataType().isNullable()) {
if (pkSet.contains(column.getName()) && column.getDataType().isNullable()) {
newColumns.add(
new Column(
column.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public class SchemaJsonSerdeTest extends JsonSerdeTestBase<Schema> {
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"BIGINT\"},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"STRING\"},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"TIMESTAMP_WITHOUT_TIME_ZONE\",\"precision\":6},\"comment\":\"c is third column\",\"id\":2}],\"highest_field_id\":2}";

static final String SCHEMA_JSON_4 =
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c is third column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}";
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\"},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c is third column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}";

SchemaJsonSerdeTest() {
super(SchemaJsonSerde.INSTANCE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,12 @@ public interface SequenceIDCounter {
* @return The previous sequence ID
*/
long getAndIncrement() throws Exception;

/**
* Atomically adds the given delta to the sequence ID.
*
* @param delta The delta to add
* @return The previous sequence ID
*/
long getAndAdd(Long delta) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.server.TabletManagerBase;
import org.apache.fluss.server.kv.autoinc.AutoIncProcessor;
import org.apache.fluss.server.kv.rowmerger.RowMerger;
import org.apache.fluss.server.log.LogManager;
import org.apache.fluss.server.log.LogTablet;
Expand Down Expand Up @@ -248,6 +249,14 @@ public KvTablet getOrCreateKv(

File tabletDir = getOrCreateTabletDir(tablePath, tableBucket);
RowMerger merger = RowMerger.create(tableConfig, kvFormat);
AutoIncProcessor autoIncProcessor =
AutoIncProcessor.create(
tablePath.getTablePath(),
schemaGetter.getLatestSchemaInfo().getSchemaId(),
conf,
tableConfig,
schemaGetter.getLatestSchemaInfo().getSchema(),
zkClient);
KvTablet tablet =
KvTablet.create(
tablePath,
Expand All @@ -263,7 +272,8 @@ public KvTablet getOrCreateKv(
arrowCompressionInfo,
schemaGetter,
tableConfig.getChangelogImage(),
sharedRocksDBRateLimiter);
sharedRocksDBRateLimiter,
autoIncProcessor);
currentKvs.put(tableBucket, tablet);

LOG.info(
Expand Down Expand Up @@ -357,6 +367,14 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti

TableConfig tableConfig = tableInfo.getTableConfig();
RowMerger rowMerger = RowMerger.create(tableConfig, tableConfig.getKvFormat());
AutoIncProcessor autoIncProcessor =
AutoIncProcessor.create(
tablePath,
tableInfo.getSchemaId(),
tableInfo.getProperties(),
tableInfo.getTableConfig(),
tableInfo.getSchema(),
zkClient);
KvTablet kvTablet =
KvTablet.create(
physicalTablePath,
Expand All @@ -372,7 +390,8 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti
tableConfig.getArrowCompressionInfo(),
schemaGetter,
tableConfig.getChangelogImage(),
sharedRocksDBRateLimiter);
sharedRocksDBRateLimiter,
autoIncProcessor);
if (this.currentKvs.containsKey(tableBucket)) {
throw new IllegalStateException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.fluss.row.arrow.ArrowWriterPool;
import org.apache.fluss.row.arrow.ArrowWriterProvider;
import org.apache.fluss.row.encode.ValueDecoder;
import org.apache.fluss.server.kv.autoinc.AutoIncProcessor;
import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer;
import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.TruncateReason;
import org.apache.fluss.server.kv.rocksdb.RocksDBKv;
Expand Down Expand Up @@ -112,6 +113,7 @@ public final class KvTablet {
// defines how to merge rows on the same primary key
private final RowMerger rowMerger;
private final ArrowCompressionInfo arrowCompressionInfo;
private final AutoIncProcessor autoIncProcessor;

private final SchemaGetter schemaGetter;

Expand Down Expand Up @@ -142,7 +144,8 @@ private KvTablet(
RowMerger rowMerger,
ArrowCompressionInfo arrowCompressionInfo,
SchemaGetter schemaGetter,
ChangelogImage changelogImage) {
ChangelogImage changelogImage,
AutoIncProcessor autoIncProcessor) {
this.physicalPath = physicalPath;
this.tableBucket = tableBucket;
this.logTablet = logTablet;
Expand All @@ -158,6 +161,7 @@ private KvTablet(
this.arrowCompressionInfo = arrowCompressionInfo;
this.schemaGetter = schemaGetter;
this.changelogImage = changelogImage;
this.autoIncProcessor = autoIncProcessor;
}

public static KvTablet create(
Expand All @@ -174,7 +178,8 @@ public static KvTablet create(
ArrowCompressionInfo arrowCompressionInfo,
SchemaGetter schemaGetter,
ChangelogImage changelogImage,
RateLimiter sharedRateLimiter)
RateLimiter sharedRateLimiter,
AutoIncProcessor autoIncProcessor)
throws IOException {
RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir, sharedRateLimiter);
return new KvTablet(
Expand All @@ -192,7 +197,8 @@ public static KvTablet create(
rowMerger,
arrowCompressionInfo,
schemaGetter,
changelogImage);
changelogImage,
autoIncProcessor);
}

private static RocksDBKv buildRocksDBKv(
Expand Down Expand Up @@ -468,8 +474,9 @@ private long applyInsert(
PaddingRow latestSchemaRow,
long logOffset)
throws Exception {
walBuilder.append(ChangeType.INSERT, latestSchemaRow.replaceRow(currentValue.row));
kvPreWriteBuffer.put(key, currentValue.encodeValue(), logOffset);
BinaryValue newValue = autoIncProcessor.processAutoInc(currentValue);
walBuilder.append(ChangeType.INSERT, latestSchemaRow.replaceRow(newValue.row));
kvPreWriteBuffer.put(key, newValue.encodeValue(), logOffset);
return logOffset + 1;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.kv.autoinc;

import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.record.BinaryValue;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.encode.RowEncoder;
import org.apache.fluss.types.DataType;

import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

import java.util.HashMap;
import java.util.Map;

/** A updater to auto increment column . */
@NotThreadSafe
public class AutoIncColumnProcessor implements AutoIncProcessor {

private final InternalRow.FieldGetter[] flussFieldGetters;

private final RowEncoder rowEncoder;

private final DataType[] fieldDataTypes;
private final Map<Integer, IncIDGenerator> idGeneratorMap;
private final short schemaId;

public AutoIncColumnProcessor(
KvFormat kvFormat,
short schemaId,
Schema schema,
int[] targetColumnIdx,
IncIDGenerator[] incIDGenerator) {
this.idGeneratorMap = new HashMap<>();
for (int i = 0; i < targetColumnIdx.length; i++) {
idGeneratorMap.put(targetColumnIdx[i], incIDGenerator[i]);
}
this.fieldDataTypes = schema.getRowType().getChildren().toArray(new DataType[0]);

// getter for the fields in row
flussFieldGetters = new InternalRow.FieldGetter[fieldDataTypes.length];
for (int i = 0; i < fieldDataTypes.length; i++) {
flussFieldGetters[i] = InternalRow.createFieldGetter(fieldDataTypes[i], i);
}
this.rowEncoder = RowEncoder.create(kvFormat, fieldDataTypes);
this.schemaId = schemaId;
}

@Nullable
@Override
public BinaryValue processAutoInc(BinaryValue oldValue) {
rowEncoder.startNewRow();
for (int i = 0; i < fieldDataTypes.length; i++) {
if (idGeneratorMap.containsKey(i)) {
if (oldValue != null && oldValue.row.isNullAt(i)) {
rowEncoder.encodeField(i, idGeneratorMap.get(i).nextVal());
}
} else {
// use the old row value
if (oldValue == null) {
rowEncoder.encodeField(i, null);
} else {
rowEncoder.encodeField(i, flussFieldGetters[i].getFieldOrNull(oldValue.row));
}
}
}
return new BinaryValue(schemaId, rowEncoder.finishRow());
}
}
Loading