Skip to content

Commit 289f017

Browse files
committed
[FLINK-39732] Introduce TableDiscoverer SPI for flexible table subscription (with default JdbcTableDiscoverer)
1 parent b6275e7 commit 289f017

7 files changed

Lines changed: 727 additions & 0 deletions

File tree

flink-cdc-common/pom.xml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,28 @@ limitations under the License.
2727

2828
<artifactId>flink-cdc-common</artifactId>
2929

30+
<dependencies>
31+
<!-- Test dependencies -->
32+
<dependency>
33+
<groupId>mysql</groupId>
34+
<artifactId>mysql-connector-java</artifactId>
35+
<version>8.0.27</version>
36+
<scope>test</scope>
37+
</dependency>
38+
<dependency>
39+
<groupId>org.testcontainers</groupId>
40+
<artifactId>mysql</artifactId>
41+
<version>${testcontainers.version}</version>
42+
<scope>test</scope>
43+
</dependency>
44+
<dependency>
45+
<groupId>org.testcontainers</groupId>
46+
<artifactId>junit-jupiter</artifactId>
47+
<version>${testcontainers.version}</version>
48+
<scope>test</scope>
49+
</dependency>
50+
</dependencies>
51+
3052
<build>
3153
<plugins>
3254
<plugin>
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.source.discover;
19+
20+
import org.apache.flink.cdc.common.configuration.ConfigOption;
21+
import org.apache.flink.cdc.common.configuration.ConfigOptions;
22+
import org.apache.flink.cdc.common.configuration.Configuration;
23+
import org.apache.flink.cdc.common.event.TableId;
24+
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
import java.sql.Connection;
29+
import java.sql.DriverManager;
30+
import java.sql.ResultSet;
31+
import java.sql.Statement;
32+
import java.util.LinkedHashSet;
33+
import java.util.Set;
34+
35+
/**
36+
* A {@link TableDiscoverer} that reads the list of subscribed tables from a JDBC database table.
37+
*
38+
* <p>This implementation connects to any JDBC-compatible database (e.g., MySQL, PostgreSQL) and
39+
* reads table names from a specified column. The table names are parsed as {@link TableId} objects.
40+
*
41+
* <p><b>Configuration keys</b> (read from the full connector configuration):
42+
*
43+
* <ul>
44+
* <li>{@code table.discoverer.jdbc.url} — JDBC connection URL (required).
45+
* <li>{@code table.discoverer.jdbc.table-name} — The database table storing subscription entries
46+
* (required).
47+
* <li>{@code table.discoverer.jdbc.username} — JDBC username (required).
48+
* <li>{@code table.discoverer.jdbc.password} — JDBC password (required).
49+
* <li>{@code table.discoverer.jdbc.column-name} — The column containing fully-qualified table
50+
* names. Defaults to {@code "subscribe_table_name"}.
51+
* </ul>
52+
*
53+
* <p><b>Expected schema:</b> The target column must contain fully-qualified table names formatted
54+
* as {@code "schemaName.tableName"} (two-part) or {@code "namespace.schemaName.tableName"}
55+
* (three-part). For example:
56+
*
57+
* <pre>{@code
58+
* CREATE TABLE cdc_subscriptions (
59+
* subscribe_table_name VARCHAR(255) PRIMARY KEY
60+
* );
61+
* INSERT INTO cdc_subscriptions VALUES ('source_db.orders'), ('source_db.products');
62+
* }</pre>
63+
*
64+
* <p>Null values and rows that cannot be parsed into a valid {@link TableId} are silently skipped.
65+
*/
66+
public class JdbcTableDiscoverer implements TableDiscoverer {
67+
68+
private static final long serialVersionUID = 1L;
69+
70+
private static final Logger LOG = LoggerFactory.getLogger(JdbcTableDiscoverer.class);
71+
72+
public static final ConfigOption<String> JDBC_URL =
73+
ConfigOptions.key("table.discoverer.jdbc.url")
74+
.stringType()
75+
.noDefaultValue()
76+
.withDescription("The JDBC connection URL for the table discovery database.");
77+
78+
public static final ConfigOption<String> TABLE_NAME =
79+
ConfigOptions.key("table.discoverer.jdbc.table-name")
80+
.stringType()
81+
.noDefaultValue()
82+
.withDescription(
83+
"The name of the database table storing the subscription entries.");
84+
85+
public static final ConfigOption<String> USERNAME =
86+
ConfigOptions.key("table.discoverer.jdbc.username")
87+
.stringType()
88+
.noDefaultValue()
89+
.withDescription("The JDBC username for the table discovery database.");
90+
91+
public static final ConfigOption<String> PASSWORD =
92+
ConfigOptions.key("table.discoverer.jdbc.password")
93+
.stringType()
94+
.noDefaultValue()
95+
.withDescription("The JDBC password for the table discovery database.");
96+
97+
public static final ConfigOption<String> COLUMN_NAME =
98+
ConfigOptions.key("table.discoverer.jdbc.column-name")
99+
.stringType()
100+
.defaultValue("subscribe_table_name")
101+
.withDescription(
102+
"The column name in the subscription table that contains the "
103+
+ "fully-qualified table names to subscribe to.");
104+
105+
private transient Connection connection;
106+
private transient String tableName;
107+
private transient String columnName;
108+
109+
@Override
110+
public void open(Context context) throws Exception {
111+
Configuration config = context.getConfiguration();
112+
113+
String jdbcUrl = config.get(JDBC_URL);
114+
if (jdbcUrl == null || jdbcUrl.isEmpty()) {
115+
throw new IllegalArgumentException(
116+
"'" + JDBC_URL.key() + "' is required for JdbcTableDiscoverer.");
117+
}
118+
tableName = config.get(TABLE_NAME);
119+
if (tableName == null || tableName.isEmpty()) {
120+
throw new IllegalArgumentException(
121+
"'" + TABLE_NAME.key() + "' is required for JdbcTableDiscoverer.");
122+
}
123+
String username = config.get(USERNAME);
124+
if (username == null || username.isEmpty()) {
125+
throw new IllegalArgumentException(
126+
"'" + USERNAME.key() + "' is required for JdbcTableDiscoverer.");
127+
}
128+
String password = config.get(PASSWORD);
129+
if (password == null || password.isEmpty()) {
130+
throw new IllegalArgumentException(
131+
"'" + PASSWORD.key() + "' is required for JdbcTableDiscoverer.");
132+
}
133+
columnName = config.get(COLUMN_NAME);
134+
135+
connection = DriverManager.getConnection(jdbcUrl, username, password);
136+
LOG.info(
137+
"JdbcTableDiscoverer opened connection to '{}', table='{}', column='{}'.",
138+
jdbcUrl,
139+
tableName,
140+
columnName);
141+
}
142+
143+
@Override
144+
public Set<TableId> discover() throws Exception {
145+
Set<TableId> result = new LinkedHashSet<>();
146+
String sql = "SELECT " + columnName + " FROM " + tableName;
147+
try (Statement stmt = connection.createStatement();
148+
ResultSet rs = stmt.executeQuery(sql)) {
149+
while (rs.next()) {
150+
String value = rs.getString(1);
151+
if (value == null || value.isEmpty()) {
152+
continue;
153+
}
154+
try {
155+
result.add(TableId.parse(value));
156+
} catch (IllegalArgumentException e) {
157+
LOG.warn(
158+
"Skipping invalid table name '{}' from subscription table '{}'.",
159+
value,
160+
tableName);
161+
}
162+
}
163+
}
164+
LOG.info(
165+
"JdbcTableDiscoverer discovered {} tables from '{}.{}'.",
166+
result.size(),
167+
tableName,
168+
columnName);
169+
return result;
170+
}
171+
172+
@Override
173+
public void close() throws Exception {
174+
if (connection != null && !connection.isClosed()) {
175+
connection.close();
176+
LOG.info("JdbcTableDiscoverer closed JDBC connection.");
177+
}
178+
}
179+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.source.discover;
19+
20+
/**
21+
* {@link TableDiscovererFactory} for {@link JdbcTableDiscoverer}. Activated when {@code
22+
* table.discoverer.type = 'jdbc'}.
23+
*/
24+
public class JdbcTableDiscovererFactory implements TableDiscovererFactory {
25+
26+
public static final String IDENTIFIER = "jdbc";
27+
28+
@Override
29+
public String identifier() {
30+
return IDENTIFIER;
31+
}
32+
33+
@Override
34+
public TableDiscoverer createDiscoverer() {
35+
return new JdbcTableDiscoverer();
36+
}
37+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.source.discover;
19+
20+
import org.apache.flink.cdc.common.annotation.PublicEvolving;
21+
import org.apache.flink.cdc.common.configuration.Configuration;
22+
import org.apache.flink.cdc.common.event.TableId;
23+
24+
import java.io.Serializable;
25+
import java.util.Set;
26+
27+
/**
28+
* Pluggable abstraction for discovering a set of tables that a source connector should read.
29+
* Implementations are loaded via SPI through {@link TableDiscovererFactory} and determine which
30+
* tables the source should subscribe to.
31+
*
32+
* <p>Lifecycle: {@link #open(Context)} is called once before the first call to {@link #discover()}.
33+
* {@link #close()} is called when the discoverer is no longer needed. Implementations manage their
34+
* own resources (e.g., connections) within this lifecycle.
35+
*
36+
* <p>Built-in implementations include:
37+
*
38+
* <ul>
39+
* <li>{@link JdbcTableDiscoverer} - reads the subscription list from a JDBC database table.
40+
* </ul>
41+
*/
42+
@PublicEvolving
43+
public interface TableDiscoverer extends Serializable, AutoCloseable {
44+
45+
/**
46+
* Opens this discoverer and initializes any resources needed for table discovery.
47+
*
48+
* @param context The context providing configuration and class loader.
49+
* @throws Exception if initialization fails.
50+
*/
51+
void open(Context context) throws Exception;
52+
53+
/**
54+
* Discovers and returns the set of tables to subscribe to.
55+
*
56+
* @return A set of {@link TableId} representing the tables to read.
57+
* @throws Exception if the discovery fails.
58+
*/
59+
Set<TableId> discover() throws Exception;
60+
61+
/**
62+
* Closes this discoverer and releases any resources.
63+
*
64+
* @throws Exception if closing fails.
65+
*/
66+
@Override
67+
void close() throws Exception;
68+
69+
/** Context providing runtime information for the discoverer. */
70+
interface Context {
71+
72+
/**
73+
* Returns the full connector configuration. Discoverer implementations read their own
74+
* configuration keys (e.g., {@code table.discoverer.jdbc.url}) directly from this
75+
* configuration.
76+
*/
77+
Configuration getConfiguration();
78+
79+
/** Returns the user code class loader of the current session. */
80+
ClassLoader getUserCodeClassLoader();
81+
}
82+
}

0 commit comments

Comments
 (0)