Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ public class FlussMetaDataApplier implements MetadataApplier {
private Set<SchemaChangeEventType> enabledEventTypes =
new HashSet<>(Arrays.asList(CREATE_TABLE, DROP_TABLE));

private transient Connection connection;
private transient Admin admin;

public FlussMetaDataApplier(
Configuration flussClientConfig,
Map<String, String> tableProperties,
Expand Down Expand Up @@ -89,22 +92,22 @@ public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
@Override
public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
LOG.info("fluss metadata applier receive schemaChangeEvent {}", schemaChangeEvent);
Admin admin = getAdmin();
if (schemaChangeEvent instanceof CreateTableEvent) {
CreateTableEvent createTableEvent = (CreateTableEvent) schemaChangeEvent;
applyCreateTable(createTableEvent);
applyCreateTable(admin, createTableEvent);
} else if (schemaChangeEvent instanceof DropTableEvent) {
DropTableEvent dropTableEvent = (DropTableEvent) schemaChangeEvent;
applyDropTable(dropTableEvent);
applyDropTable(admin, dropTableEvent);
} else {
throw new IllegalArgumentException(
"fluss metadata applier only support CreateTableEvent now but receives "
+ schemaChangeEvent);
}
}

private void applyCreateTable(CreateTableEvent event) {
try (Connection connection = ConnectionFactory.createConnection(flussClientConfig);
Admin admin = connection.getAdmin()) {
private void applyCreateTable(Admin admin, CreateTableEvent event) {
try {
TableId tableId = event.tableId();
TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName());
String tableIdentifier = tablePath.getDatabaseName() + "." + tablePath.getTableName();
Expand All @@ -126,9 +129,8 @@ private void applyCreateTable(CreateTableEvent event) {
}
}

private void applyDropTable(DropTableEvent event) {
try (Connection connection = ConnectionFactory.createConnection(flussClientConfig);
Admin admin = connection.getAdmin()) {
private void applyDropTable(Admin admin, DropTableEvent event) {
try {
TableId tableId = event.tableId();
TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName());
admin.dropTable(tablePath, true).get();
Expand All @@ -138,6 +140,24 @@ private void applyDropTable(DropTableEvent event) {
}
}

private Admin getAdmin() {
if (connection == null) {
connection = ConnectionFactory.createConnection(flussClientConfig);
admin = connection.getAdmin();
}
return admin;
}

@Override
public void close() throws Exception {
if (admin != null) {
admin.close();
}
if (connection != null) {
connection.close();
}
}

private void sanityCheck(TableDescriptor inferredFlussTable, TableInfo currentTableInfo) {
List<String> inferredPrimaryKeyColumnNames =
inferredFlussTable.getSchema().getPrimaryKeyColumnNames().stream()
Expand Down
Loading