@@ -257,6 +257,22 @@ public String getComment() {
public ColumnPosition getPosition() {
return position;
}

@Override
public String toString() {
return "AddColumn{"
+ "name='"
+ name
+ '\''
+ ", dataType="
+ dataType
+ ", comment='"
+ comment
+ '\''
+ ", position="
+ position
+ '}';
}
}

/** A table change to drop a column. */
@@ -270,6 +286,11 @@ private DropColumn(String name) {
public String getName() {
return name;
}

@Override
public String toString() {
return "DropColumn{" + "name='" + name + '\'' + '}';
}
}

/** A table change to modify a column. */
@@ -308,6 +329,22 @@ public String getComment() {
public ColumnPosition getNewPosition() {
return newPosition;
}

@Override
public String toString() {
return "ModifyColumn{"
+ "name='"
+ name
+ '\''
+ ", dataType="
+ dataType
+ ", comment='"
+ comment
+ '\''
+ ", newPosition="
+ newPosition
+ '}';
}
}

/** A table change to modify a column's name. */
@@ -327,6 +364,18 @@ public String getOldColumnName() {
public String getNewColumnName() {
return newColumnName;
}

@Override
public String toString() {
return "RenameColumn{"
+ "oldColumnName='"
+ oldColumnName
+ '\''
+ ", newColumnName='"
+ newColumnName
+ '\''
+ '}';
}
}

/** The position of the modified or added column. */
@@ -19,6 +19,7 @@

import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.InvalidAlterTableException;
import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.lake.lakestorage.LakeCatalog;
@@ -38,6 +39,8 @@
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedHashMap;
import java.util.List;
@@ -54,6 +57,7 @@
/** A Paimon implementation of {@link LakeCatalog}. */
public class PaimonLakeCatalog implements LakeCatalog {

private static final Logger LOG = LoggerFactory.getLogger(PaimonLakeCatalog.class);
public static final LinkedHashMap<String, DataType> SYSTEM_COLUMNS = new LinkedHashMap<>();

static {
@@ -109,13 +113,92 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
throws TableNotExistException {
try {
List<SchemaChange> paimonSchemaChanges = toPaimonSchemaChanges(tableChanges);
alterTable(tablePath, paimonSchemaChanges);
} catch (Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) {
// shouldn't happen before we support schema change
throw new RuntimeException(e);

// Compare current Paimon table schema with expected target schema before altering
if (shouldAlterTable(tablePath, tableChanges)) {
alterTable(tablePath, paimonSchemaChanges);
} else {
// If schemas already match, treat as idempotent success
LOG.info(
"Skipping schema evolution for Paimon table {} because the column(s) to add {} already exist.",
tablePath,
tableChanges);
}
} catch (Catalog.ColumnAlreadyExistException e) {
// This shouldn't happen if shouldAlterTable works correctly, but is kept as a safeguard
throw new InvalidAlterTableException(e.getMessage());
} catch (Catalog.ColumnNotExistException e) {
// This shouldn't happen for AddColumn operations
throw new InvalidAlterTableException(e.getMessage());
}
}

private boolean shouldAlterTable(TablePath tablePath, List<TableChange> tableChanges)
throws TableNotExistException {
try {
Table table = paimonCatalog.getTable(toPaimon(tablePath));
FileStoreTable fileStoreTable = (FileStoreTable) table;
Schema currentSchema = fileStoreTable.schema().toSchema();

for (TableChange change : tableChanges) {
if (change instanceof TableChange.AddColumn) {
TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
if (!isColumnAlreadyExists(currentSchema, addColumn)) {
return true;
}
} else {
return true;
}
}

return false;
} catch (Catalog.TableNotExistException e) {
throw new TableNotExistException("Table " + tablePath + " does not exist.");
}
}

private boolean isColumnAlreadyExists(Schema currentSchema, TableChange.AddColumn addColumn) {
String columnName = addColumn.getName();

for (org.apache.paimon.types.DataField field : currentSchema.fields()) {
if (field.name().equals(columnName)) {
org.apache.paimon.types.DataType expectedType =
addColumn
.getDataType()
.accept(
org.apache.fluss.lake.paimon.utils
.FlussDataTypeToPaimonDataType.INSTANCE);

if (!field.type().equals(expectedType)) {
throw new InvalidAlterTableException(
String.format(
"Column '%s' already exists but with different type. "
+ "Existing: %s, Expected: %s",
columnName, field.type(), expectedType));
}
String existingComment = field.description();
String expectedComment = addColumn.getComment();

boolean commentsMatch =
(existingComment == null && expectedComment == null)
|| (existingComment != null
&& existingComment.equals(expectedComment));

if (!commentsMatch) {
throw new InvalidAlterTableException(
String.format(
"Column %s already exists but with different comment. "
+ "Existing: %s, Expected: %s",
columnName, existingComment, expectedComment));
}

return true;
}
}

return false;
}

private void createTable(TablePath tablePath, Schema schema, boolean isCreatingFlussTable)
throws Catalog.DatabaseNotExistException {
Identifier paimonPath = toPaimon(tablePath);
@@ -134,7 +217,7 @@ private void createTable(TablePath tablePath, Schema schema, boolean isCreatingF
}
} catch (Catalog.TableNotExistException tableNotExistException) {
// shouldn't happen in normal cases
throw new RuntimeException(
throw new InvalidAlterTableException(
String.format(
"Failed to create table %s in Paimon. The table already existed "
+ "during the initial creation attempt, but subsequently "
@@ -27,35 +27,49 @@

import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toRowKind;
import static org.apache.fluss.utils.Preconditions.checkState;

/** To wrap Fluss {@link LogRecord} as paimon {@link InternalRow}. */
public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow {

private final int bucket;
private LogRecord logRecord;
private int originRowFieldCount;
private final int businessFieldCount;
[Review comment, contributor] maybe dataFieldCount?

[Author reply] I prefer businessFieldCount because it clearly distinguishes it from the system columns (__bucket, __offset, __timestamp); dataFieldCount might be confused with the total field count. (See the index-layout sketch after this class.)

private final int bucketFieldIndex;
private final int offsetFieldIndex;
private final int timestampFieldIndex;

public FlussRecordAsPaimonRow(int bucket, RowType tableRowType) {
super(tableRowType);
this.bucket = bucket;
this.businessFieldCount = tableRowType.getFieldCount() - SYSTEM_COLUMNS.size();
this.bucketFieldIndex = businessFieldCount;
this.offsetFieldIndex = businessFieldCount + 1;
this.timestampFieldIndex = businessFieldCount + 2;
}

public void setFlussRecord(LogRecord logRecord) {
this.logRecord = logRecord;
this.internalRow = logRecord.getRow();
this.originRowFieldCount = internalRow.getFieldCount();
checkState(
originRowFieldCount == tableRowType.getFieldCount() - SYSTEM_COLUMNS.size(),
"The paimon table fields count must equals to LogRecord's fields count.");
int flussFieldCount = internalRow.getFieldCount();
if (flussFieldCount > businessFieldCount) {
// Fluss record is wider than Paimon schema, which means Lake schema is not yet
// synchronized. With "Lake First" strategy, this should not happen in normal cases.
throw new IllegalStateException(
String.format(
"Fluss record has %d fields but Paimon schema only has %d business fields. "
+ "This indicates the lake schema is not yet synchronized. "
+ "Please retry the schema change operation.",
flussFieldCount, businessFieldCount));
}
this.originRowFieldCount = flussFieldCount;
}

@Override
public int getFieldCount() {
return
// business (including partitions) + system (three system fields: bucket, offset,
// timestamp)
originRowFieldCount + SYSTEM_COLUMNS.size();
return tableRowType.getFieldCount();
}

@Override
@@ -68,38 +82,60 @@ public boolean isNullAt(int pos) {
if (pos < originRowFieldCount) {
return super.isNullAt(pos);
}
if (pos < businessFieldCount) {
// Pad with NULL for business fields that are missing because the Paimon schema is wider
// than the Fluss record
return true;
}
// otherwise this is one of the last three system fields (bucket, offset, timestamp), which
// are never null
return false;
}

@Override
public int getInt(int pos) {
if (pos == originRowFieldCount) {
if (pos == bucketFieldIndex) {
// bucket system column
return bucket;
}
if (pos >= originRowFieldCount) {
throw new IllegalStateException(
String.format(
"Field %s is NULL because Paimon schema is wider than Fluss record.",
pos));
}
return super.getInt(pos);
}

@Override
public long getLong(int pos) {
if (pos == originRowFieldCount + 1) {
if (pos == offsetFieldIndex) {
// offset system column
return logRecord.logOffset();
} else if (pos == originRowFieldCount + 2) {
} else if (pos == timestampFieldIndex) {
// timestamp system column
return logRecord.timestamp();
}
if (pos >= originRowFieldCount) {
throw new IllegalStateException(
String.format(
"Field %s is NULL because Paimon schema is wider than Fluss record.",
pos));
}
// the origin RowData
return super.getLong(pos);
}

@Override
public Timestamp getTimestamp(int pos, int precision) {
// it's timestamp system column
if (pos == originRowFieldCount + 2) {
if (pos == timestampFieldIndex) {
return Timestamp.fromEpochMillis(logRecord.timestamp());
}
if (pos >= originRowFieldCount) {
throw new IllegalStateException(
String.format(
"Field %s is NULL because Paimon schema is wider than Fluss record.",
pos));
}
return super.getTimestamp(pos, precision);
}
}
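
Note: the index arithmetic used by FlussRecordAsPaimonRow above can be illustrated with a small standalone sketch; the five-column layout below is a hypothetical example, not taken from this change:

class FieldLayoutSketch {
    public static void main(String[] args) {
        // Hypothetical Paimon row type: 2 business columns followed by the 3 system columns
        // __bucket, __offset and __timestamp, which are always appended at the end.
        int totalFieldCount = 5;
        int systemColumnCount = 3;

        int businessFieldCount = totalFieldCount - systemColumnCount; // 2
        int bucketFieldIndex = businessFieldCount; // 2
        int offsetFieldIndex = businessFieldCount + 1; // 3
        int timestampFieldIndex = businessFieldCount + 2; // 4

        // A Fluss record with fewer fields than businessFieldCount is padded with NULLs for the
        // missing business columns; a record with more fields means the lake schema is not yet
        // synchronized.
        System.out.printf(
                "business=%d, bucket=%d, offset=%d, timestamp=%d%n",
                businessFieldCount, bucketFieldIndex, offsetFieldIndex, timestampFieldIndex);
    }
}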
@@ -128,6 +128,31 @@ public static List<SchemaChange> toPaimonSchemaChanges(List<TableChange> tableCh
schemaChanges.add(
SchemaChange.removeOption(
convertFlussPropertyKeyToPaimon(resetOption.getKey())));
} else if (tableChange instanceof TableChange.AddColumn) {
TableChange.AddColumn addColumn = (TableChange.AddColumn) tableChange;

if (!(addColumn.getPosition() instanceof TableChange.Last)) {
throw new UnsupportedOperationException(
"Only support to add column at last for paimon table.");
}

org.apache.fluss.types.DataType flussDataType = addColumn.getDataType();
if (!flussDataType.isNullable()) {
throw new UnsupportedOperationException(
"Only support to add nullable column for paimon table.");
}

org.apache.paimon.types.DataType paimonDataType =
flussDataType.accept(FlussDataTypeToPaimonDataType.INSTANCE);

String firstSystemColumnName = SYSTEM_COLUMNS.keySet().iterator().next();
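// Place the new column before the first system column so that __bucket, __offset and
// __timestamp stay at the end of the Paimon schema.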
schemaChanges.add(
SchemaChange.addColumn(
addColumn.getName(),
paimonDataType,
addColumn.getComment(),
SchemaChange.Move.before(
addColumn.getName(), firstSystemColumnName)));
} else {
throw new UnsupportedOperationException(
"Unsupported table change: " + tableChange.getClass());
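
Note: for illustration, the conversion above turns an AddColumn change into a single Paimon SchemaChange that positions the new column after the business columns but before the system columns. A minimal sketch of the resulting call, using a hypothetical column named "tags":

import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.types.DataTypes;

class AddColumnConversionSketch {
    // Hypothetical: add a nullable STRING column "tags" with no comment, positioned before the
    // first system column so __bucket/__offset/__timestamp remain the last three fields.
    static SchemaChange addTagsColumn() {
        return SchemaChange.addColumn(
                "tags",
                DataTypes.STRING(),
                null,
                SchemaChange.Move.before("tags", "__bucket"));
    }
}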