Skip to content

Commit fbf3494

Browse files
committed
fix some problem
1 parent cc63767 commit fbf3494

File tree

6 files changed

+124
-68
lines changed

6 files changed

+124
-68
lines changed

fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,22 @@ public String getComment() {
257257
public ColumnPosition getPosition() {
258258
return position;
259259
}
260+
261+
@Override
262+
public String toString() {
263+
return "AddColumn{"
264+
+ "name='"
265+
+ name
266+
+ '\''
267+
+ ", dataType="
268+
+ dataType
269+
+ ", comment='"
270+
+ comment
271+
+ '\''
272+
+ ", position="
273+
+ position
274+
+ '}';
275+
}
260276
}
261277

262278
/** A table change to drop a column. */
@@ -270,6 +286,11 @@ private DropColumn(String name) {
270286
public String getName() {
271287
return name;
272288
}
289+
290+
@Override
291+
public String toString() {
292+
return "DropColumn{" + "name='" + name + '\'' + '}';
293+
}
273294
}
274295

275296
/** A table change to modify a column. */
@@ -308,6 +329,22 @@ public String getComment() {
308329
public ColumnPosition getNewPosition() {
309330
return newPosition;
310331
}
332+
333+
@Override
334+
public String toString() {
335+
return "ModifyColumn{"
336+
+ "name='"
337+
+ name
338+
+ '\''
339+
+ ", dataType="
340+
+ dataType
341+
+ ", comment='"
342+
+ comment
343+
+ '\''
344+
+ ", newPosition="
345+
+ newPosition
346+
+ '}';
347+
}
311348
}
312349

313350
/** A table change to modify a column's name. */
@@ -327,6 +364,18 @@ public String getOldColumnName() {
327364
public String getNewColumnName() {
328365
return newColumnName;
329366
}
367+
368+
@Override
369+
public String toString() {
370+
return "RenameColumn{"
371+
+ "oldColumnName='"
372+
+ oldColumnName
373+
+ '\''
374+
+ ", newColumnName='"
375+
+ newColumnName
376+
+ '\''
377+
+ '}';
378+
}
330379
}
331380

332381
/** The position of the modified or added column. */

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.annotation.VisibleForTesting;
2121
import org.apache.fluss.config.Configuration;
22+
import org.apache.fluss.exception.InvalidAlterTableException;
2223
import org.apache.fluss.exception.TableAlreadyExistException;
2324
import org.apache.fluss.exception.TableNotExistException;
2425
import org.apache.fluss.lake.lakestorage.LakeCatalog;
@@ -38,6 +39,8 @@
3839
import org.apache.paimon.table.Table;
3940
import org.apache.paimon.types.DataType;
4041
import org.apache.paimon.types.DataTypes;
42+
import org.slf4j.Logger;
43+
import org.slf4j.LoggerFactory;
4144

4245
import java.util.LinkedHashMap;
4346
import java.util.List;
@@ -54,6 +57,7 @@
5457
/** A Paimon implementation of {@link LakeCatalog}. */
5558
public class PaimonLakeCatalog implements LakeCatalog {
5659

60+
private static final Logger LOG = LoggerFactory.getLogger(PaimonLakeCatalog.class);
5761
public static final LinkedHashMap<String, DataType> SYSTEM_COLUMNS = new LinkedHashMap<>();
5862

5963
static {
@@ -113,14 +117,19 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
113117
// Compare current Paimon table schema with expected target schema before altering
114118
if (shouldAlterTable(tablePath, tableChanges)) {
115119
alterTable(tablePath, paimonSchemaChanges);
120+
} else {
121+
// If schemas already match, treat as idempotent success
122+
LOG.info(
123+
"Skipping schema evolution for Paimon table {} because the column(s) to add {} already exist.",
124+
tablePath,
125+
tableChanges);
116126
}
117-
// If schemas already match, treat as idempotent success
118127
} catch (Catalog.ColumnAlreadyExistException e) {
119128
// This shouldn't happen if shouldAlterTable works correctly, but keep as safeguard
120-
throw new RuntimeException("Unexpected ColumnAlreadyExistException", e);
129+
throw new InvalidAlterTableException(e.getMessage());
121130
} catch (Catalog.ColumnNotExistException e) {
122131
// This shouldn't happen for AddColumn operations
123-
throw new RuntimeException(e);
132+
throw new InvalidAlterTableException(e.getMessage());
124133
}
125134
}
126135

@@ -161,9 +170,9 @@ private boolean isColumnAlreadyExists(Schema currentSchema, TableChange.AddColum
161170
.FlussDataTypeToPaimonDataType.INSTANCE);
162171

163172
if (!field.type().equals(expectedType)) {
164-
throw new IllegalStateException(
173+
throw new InvalidAlterTableException(
165174
String.format(
166-
"Column %s already exists but with different type. "
175+
"Column '%s' already exists but with different type. "
167176
+ "Existing: %s, Expected: %s",
168177
columnName, field.type(), expectedType));
169178
}
@@ -176,7 +185,7 @@ private boolean isColumnAlreadyExists(Schema currentSchema, TableChange.AddColum
176185
&& existingComment.equals(expectedComment));
177186

178187
if (!commentsMatch) {
179-
throw new IllegalStateException(
188+
throw new InvalidAlterTableException(
180189
String.format(
181190
"Column %s already exists but with different comment. "
182191
+ "Existing: %s, Expected: %s",
@@ -208,7 +217,7 @@ private void createTable(TablePath tablePath, Schema schema, boolean isCreatingF
208217
}
209218
} catch (Catalog.TableNotExistException tableNotExistException) {
210219
// shouldn't happen in normal cases
211-
throw new RuntimeException(
220+
throw new InvalidAlterTableException(
212221
String.format(
213222
"Failed to create table %s in Paimon. The table already existed "
214223
+ "during the initial creation attempt, but subsequently "

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.lake.paimon;
1919

2020
import org.apache.fluss.config.Configuration;
21+
import org.apache.fluss.exception.InvalidAlterTableException;
2122
import org.apache.fluss.exception.TableNotExistException;
2223
import org.apache.fluss.lake.lakestorage.TestingLakeCatalogContext;
2324
import org.apache.fluss.metadata.Schema;
@@ -191,6 +192,57 @@ tablePath, changes, new TestingLakeCatalogContext()))
191192
.hasMessage("Only support to add nullable column for paimon table.");
192193
}
193194

195+
@Test
196+
void testAlterTableAddExistingColumn() {
197+
String database = "test_alter_table_add_existing_column_db";
198+
String tableName = "test_alter_table_add_existing_column_table";
199+
TablePath tablePath = TablePath.of(database, tableName);
200+
createTable(database, tableName);
201+
202+
List<TableChange> changes =
203+
Collections.singletonList(
204+
TableChange.addColumn(
205+
"address",
206+
DataTypes.STRING(),
207+
null,
208+
TableChange.ColumnPosition.last()));
209+
210+
// no exception thrown when adding existing column
211+
flussPaimonCatalog.alterTable(tablePath, changes, new TestingLakeCatalogContext());
212+
213+
List<TableChange> changes2 =
214+
Collections.singletonList(
215+
TableChange.addColumn(
216+
"address",
217+
DataTypes.INT(),
218+
null,
219+
TableChange.ColumnPosition.last()));
220+
221+
assertThatThrownBy(
222+
() ->
223+
flussPaimonCatalog.alterTable(
224+
tablePath, changes2, new TestingLakeCatalogContext()))
225+
.isInstanceOf(InvalidAlterTableException.class)
226+
.hasMessage(
227+
"Column 'address' already exists but with different type. Existing: STRING, Expected: INT");
228+
229+
List<TableChange> changes3 =
230+
Collections.singletonList(
231+
TableChange.addColumn(
232+
"address",
233+
DataTypes.STRING(),
234+
"the address comment",
235+
TableChange.ColumnPosition.last()));
236+
237+
assertThatThrownBy(
238+
() ->
239+
flussPaimonCatalog.alterTable(
240+
tablePath, changes3, new TestingLakeCatalogContext()))
241+
.isInstanceOf(InvalidAlterTableException.class)
242+
.hasMessage(
243+
"Column address already exists but with different comment. Existing: null, Expected: the address comment");
244+
}
245+
194246
private void createTable(String database, String tableName) {
195247
Schema flussSchema =
196248
Schema.newBuilder()

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java

Lines changed: 0 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -167,66 +167,6 @@ void testPrimaryKeyTableRecord() {
167167
assertThat(flussRecordAsPaimonRow.getRowKind()).isEqualTo(RowKind.DELETE);
168168
}
169169

170-
@Test
171-
void testPaimonSchemaWiderThanFlussRecord() {
172-
int tableBucket = 0;
173-
RowType tableRowType =
174-
RowType.of(
175-
new org.apache.paimon.types.BooleanType(),
176-
new org.apache.paimon.types.VarCharType(),
177-
// append three system columns: __bucket, __offset,__timestamp
178-
new org.apache.paimon.types.IntType(),
179-
new org.apache.paimon.types.BigIntType(),
180-
new org.apache.paimon.types.LocalZonedTimestampType(3));
181-
182-
FlussRecordAsPaimonRow flussRecordAsPaimonRow =
183-
new FlussRecordAsPaimonRow(tableBucket, tableRowType);
184-
185-
long logOffset = 7L;
186-
long timeStamp = System.currentTimeMillis();
187-
GenericRow genericRow = new GenericRow(1);
188-
genericRow.setField(0, true);
189-
LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
190-
flussRecordAsPaimonRow.setFlussRecord(logRecord);
191-
192-
assertThat(flussRecordAsPaimonRow.getFieldCount()).isEqualTo(5);
193-
194-
assertThat(flussRecordAsPaimonRow.getBoolean(0)).isTrue();
195-
assertThat(flussRecordAsPaimonRow.isNullAt(1)).isTrue();
196-
assertThat(flussRecordAsPaimonRow.getInt(2)).isEqualTo(tableBucket);
197-
assertThat(flussRecordAsPaimonRow.getLong(3)).isEqualTo(logOffset);
198-
assertThat(flussRecordAsPaimonRow.getLong(4)).isEqualTo(timeStamp);
199-
}
200-
201-
@Test
202-
void testFlussRecordWiderThanPaimonSchema() {
203-
// With "Lake First" strategy, Fluss record wider than Paimon schema should throw exception
204-
int tableBucket = 0;
205-
RowType tableRowType =
206-
RowType.of(
207-
new org.apache.paimon.types.BooleanType(),
208-
// append three system columns: __bucket, __offset,__timestamp
209-
new org.apache.paimon.types.IntType(),
210-
new org.apache.paimon.types.BigIntType(),
211-
new org.apache.paimon.types.LocalZonedTimestampType(3));
212-
213-
FlussRecordAsPaimonRow flussRecordAsPaimonRow =
214-
new FlussRecordAsPaimonRow(tableBucket, tableRowType);
215-
216-
long logOffset = 7L;
217-
long timeStamp = System.currentTimeMillis();
218-
GenericRow genericRow = new GenericRow(2);
219-
genericRow.setField(0, true);
220-
genericRow.setField(1, BinaryString.fromString("extra"));
221-
LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
222-
223-
// Should throw exception instead of silently truncating data
224-
assertThatThrownBy(() -> flussRecordAsPaimonRow.setFlussRecord(logRecord))
225-
.isInstanceOf(IllegalStateException.class)
226-
.hasMessageContaining(
227-
"Fluss record has 2 fields but Paimon schema only has 1 business fields");
228-
}
229-
230170
@Test
231171
void testArrayTypeWithIntElements() {
232172
int tableBucket = 0;

fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,11 @@ public void alterTableSchema(
343343
// Update Fluss schema (ZK) after Lake sync succeeds
344344
if (!newSchema.equals(table.getSchema())) {
345345
zookeeperClient.registerSchema(tablePath, newSchema, table.getSchemaId() + 1);
346+
} else {
347+
LOG.info(
348+
"Skipping schema evolution for table {} because the column(s) to add {} already exist.",
349+
tablePath,
350+
schemaChanges);
346351
}
347352
}
348353
} catch (Exception e) {

fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ private SchemaUpdate addColumn(TableChange.AddColumn addColumn) {
8888
if (existingColumn != null) {
8989
// Allow idempotent retries: if column name/type/comment match existing, treat as no-op
9090
if (!existingColumn.getDataType().equals(addColumn.getDataType())
91-
|| !Objects.equals(existingColumn.getComment(), addColumn.getComment())) {
91+
|| !Objects.equals(
92+
existingColumn.getComment().orElse(null), addColumn.getComment())) {
9293
throw new IllegalArgumentException(
9394
"Column " + addColumn.getName() + " already exists.");
9495
}

0 commit comments

Comments
 (0)