Skip to content

Commit 39b4f56

Browse files
committed
use once admin to apply SchemaChange
1 parent e96cee4 commit 39b4f56

1 file changed

Lines changed: 5 additions & 6 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,22 +92,22 @@ public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
9292
@Override
9393
public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
9494
LOG.info("fluss metadata applier receive schemaChangeEvent {}", schemaChangeEvent);
95+
Admin admin = getAdmin();
9596
if (schemaChangeEvent instanceof CreateTableEvent) {
9697
CreateTableEvent createTableEvent = (CreateTableEvent) schemaChangeEvent;
97-
applyCreateTable(createTableEvent);
98+
applyCreateTable(admin, createTableEvent);
9899
} else if (schemaChangeEvent instanceof DropTableEvent) {
99100
DropTableEvent dropTableEvent = (DropTableEvent) schemaChangeEvent;
100-
applyDropTable(dropTableEvent);
101+
applyDropTable(admin, dropTableEvent);
101102
} else {
102103
throw new IllegalArgumentException(
103104
"fluss metadata applier only support CreateTableEvent now but receives "
104105
+ schemaChangeEvent);
105106
}
106107
}
107108

108-
private void applyCreateTable(CreateTableEvent event) {
109+
private void applyCreateTable(Admin admin, CreateTableEvent event) {
109110
try {
110-
Admin admin = getAdmin();
111111
TableId tableId = event.tableId();
112112
TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName());
113113
String tableIdentifier = tablePath.getDatabaseName() + "." + tablePath.getTableName();
@@ -129,9 +129,8 @@ private void applyCreateTable(CreateTableEvent event) {
129129
}
130130
}
131131

132-
private void applyDropTable(DropTableEvent event) {
132+
private void applyDropTable(Admin admin, DropTableEvent event) {
133133
try {
134-
Admin admin = getAdmin();
135134
TableId tableId = event.tableId();
136135
TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName());
137136
admin.dropTable(tablePath, true).get();

0 commit comments

Comments
 (0)