Skip to content

Commit 76ba065

Browse files
authored
Add table filters in Rds source configuration (#5494)
* Add table filters and refactor schema manager methods Signed-off-by: Hai Yan <[email protected]> * Update tests Signed-off-by: Hai Yan <[email protected]> * Check included tables during stream processing Signed-off-by: Hai Yan <[email protected]> * Fix a few issues from tests Signed-off-by: Hai Yan <[email protected]> * Address review comments Signed-off-by: Hai Yan <[email protected]> --------- Signed-off-by: Hai Yan <[email protected]>
1 parent f793741 commit 76ba065

20 files changed

+614
-222
lines changed

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java

+12-9
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
1515
import org.opensearch.dataprepper.model.record.Record;
1616
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
17+
import org.opensearch.dataprepper.plugins.source.rds.configuration.TableFilterConfig;
1718
import org.opensearch.dataprepper.plugins.source.rds.export.DataFileScheduler;
1819
import org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler;
1920
import org.opensearch.dataprepper.plugins.source.rds.export.ExportTaskManager;
@@ -42,9 +43,9 @@
4243
import java.util.ArrayList;
4344
import java.util.List;
4445
import java.util.Map;
46+
import java.util.Set;
4547
import java.util.concurrent.ExecutorService;
4648
import java.util.concurrent.Executors;
47-
import java.util.stream.Collectors;
4849

4950
public class RdsService {
5051
private static final Logger LOG = LoggerFactory.getLogger(RdsService.class);
@@ -168,7 +169,8 @@ public void shutdown() {
168169
}
169170
}
170171

171-
private SchemaManager getSchemaManager(final RdsSourceConfig sourceConfig, final DbMetadata dbMetadata) {
172+
// Visible for testing
173+
SchemaManager getSchemaManager(final RdsSourceConfig sourceConfig, final DbMetadata dbMetadata) {
172174
final ConnectionManager connectionManager = new ConnectionManagerFactory(sourceConfig, dbMetadata).getConnectionManager();
173175
return new SchemaManagerFactory(connectionManager).getSchemaManager();
174176
}
@@ -196,7 +198,9 @@ private String getS3PathPrefix() {
196198
final String s3PathPrefix;
197199
if (sourceCoordinator.getPartitionPrefix() != null ) {
198200
// The prefix will be used in RDS export, which has a limit of 60 characters.
199-
s3PathPrefix = s3UserPathPrefix + S3_PATH_DELIMITER + IdentifierShortener.shortenIdentifier(sourceCoordinator.getPartitionPrefix(), MAX_SOURCE_IDENTIFIER_LENGTH);
201+
final String uniqueIdentifier = IdentifierShortener.shortenIdentifier(sourceCoordinator.getPartitionPrefix(), MAX_SOURCE_IDENTIFIER_LENGTH);
202+
s3PathPrefix = s3UserPathPrefix + S3_PATH_DELIMITER + uniqueIdentifier;
203+
LOG.info("Unique identifier used in S3 path prefix is {}", uniqueIdentifier);
200204
} else {
201205
s3PathPrefix = s3UserPathPrefix;
202206
}
@@ -209,11 +213,10 @@ private DbTableMetadata getDbTableMetadata(final DbMetadata dbMetadata, final Sc
209213
}
210214

211215
private Map<String, Map<String, String>> getColumnDataTypeMap(final SchemaManager schemaManager) {
212-
return sourceConfig.getTableNames().stream()
213-
.collect(Collectors.toMap(
214-
fullTableName -> fullTableName,
215-
fullTableName -> schemaManager.getColumnDataTypes(fullTableName)
216-
));
216+
TableFilterConfig tableFilterConfig = sourceConfig.getTables();
217+
Set<String> tableNames = schemaManager.getTableNames(tableFilterConfig.getDatabase());
218+
tableFilterConfig.applyTableFilter(tableNames);
219+
LOG.info("These tables will be include in processing: {}", tableNames);
220+
return schemaManager.getColumnDataTypes(new ArrayList<>(tableNames));
217221
}
218-
219222
}

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceConfig.java

+5-8
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414
import org.opensearch.dataprepper.plugins.source.rds.configuration.AwsAuthenticationConfig;
1515
import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType;
1616
import org.opensearch.dataprepper.plugins.source.rds.configuration.ExportConfig;
17+
import org.opensearch.dataprepper.plugins.source.rds.configuration.TableFilterConfig;
1718
import org.opensearch.dataprepper.plugins.source.rds.configuration.TlsConfig;
1819
import software.amazon.awssdk.regions.Region;
1920

2021
import java.time.Duration;
21-
import java.util.List;
2222

2323
/**
2424
* Configuration for RDS Source
@@ -43,11 +43,8 @@ public class RdsSourceConfig {
4343
@NotNull
4444
private EngineType engine;
4545

46-
/**
47-
* The table name is in the format of `database.table` for MySQL engine
48-
*/
49-
@JsonProperty("table_names")
50-
private List<String> tableNames;
46+
@JsonProperty("tables")
47+
private TableFilterConfig tableFilterConfig;
5148

5249
@JsonProperty("aws")
5350
@NotNull
@@ -114,8 +111,8 @@ public boolean isAurora() {
114111
return engine.isAurora();
115112
}
116113

117-
public List<String> getTableNames() {
118-
return tableNames;
114+
public TableFilterConfig getTables() {
115+
return tableFilterConfig;
119116
}
120117

121118
public AwsAuthenticationConfig getAwsAuthenticationConfig() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins.source.rds.configuration;
11+
12+
import com.fasterxml.jackson.annotation.JsonProperty;
13+
import jakarta.validation.constraints.NotEmpty;
14+
import jakarta.validation.constraints.Size;
15+
import lombok.Getter;
16+
17+
import java.util.Collections;
18+
import java.util.List;
19+
import java.util.Set;
20+
import java.util.stream.Collectors;
21+
22+
@Getter
23+
public class TableFilterConfig {
24+
25+
@JsonProperty("database")
26+
@NotEmpty
27+
private String database;
28+
29+
@JsonProperty("include")
30+
@Size(max = 1000, message = "Table filter list should not be more than 1000")
31+
private List<String> include = Collections.emptyList();
32+
33+
@JsonProperty("exclude")
34+
@Size(max = 1000, message = "Table filter list should not be more than 1000")
35+
private List<String> exclude = Collections.emptyList();
36+
37+
/**
38+
* This method applies the table filter configuration to the given set of table names.
39+
*
40+
* @param tableNames The set of table names to be filtered
41+
*/
42+
public void applyTableFilter(Set<String> tableNames) {
43+
if (!getInclude().isEmpty()) {
44+
List<String> includeTableList = getInclude().stream()
45+
.map(item -> getDatabase() + "." + item)
46+
.collect(Collectors.toList());
47+
tableNames.retainAll(includeTableList);
48+
}
49+
50+
if (!getExclude().isEmpty()) {
51+
List<String> excludeTableList = getExclude().stream()
52+
.map(item -> getDatabase() + "." + item)
53+
.collect(Collectors.toList());
54+
excludeTableList.forEach(tableNames::remove);
55+
}
56+
}
57+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins.source.rds.exception;
11+
12+
/**
 * Unchecked exception signalling that metadata could not be retrieved from a SQL database.
 */
public class SqlMetadataException extends RuntimeException {

    /**
     * Creates the exception with a description only.
     *
     * @param message description of the failure
     */
    public SqlMetadataException(final String message) {
        super(message);
    }

    /**
     * Creates the exception with a description and the underlying cause.
     *
     * @param message   description of the failure
     * @param throwable the cause that is preserved on this exception
     */
    public SqlMetadataException(final String message, final Throwable throwable) {
        super(message, throwable);
    }
}

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java

+12-18
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@
2727
import org.slf4j.LoggerFactory;
2828

2929
import java.time.Duration;
30+
import java.util.ArrayList;
3031
import java.util.List;
3132
import java.util.Map;
3233
import java.util.Optional;
3334
import java.util.Set;
3435
import java.util.UUID;
35-
import java.util.stream.Collectors;
3636

3737
import static org.opensearch.dataprepper.plugins.source.rds.RdsService.S3_PATH_DELIMITER;
3838

@@ -46,22 +46,24 @@ public class LeaderScheduler implements Runnable {
4646
private final RdsSourceConfig sourceConfig;
4747
private final String s3Prefix;
4848
private final SchemaManager schemaManager;
49-
private final DbTableMetadata dbTableMetadataMetadata;
49+
private final DbTableMetadata dbTableMetadata;
5050

5151
private LeaderPartition leaderPartition;
52+
private List<String> tableNames;
5253
private StreamPartition streamPartition = null;
5354
private volatile boolean shutdownRequested = false;
5455

5556
public LeaderScheduler(final EnhancedSourceCoordinator sourceCoordinator,
5657
final RdsSourceConfig sourceConfig,
5758
final String s3Prefix,
5859
final SchemaManager schemaManager,
59-
final DbTableMetadata dbTableMetadataMetadata) {
60+
final DbTableMetadata dbTableMetadata) {
6061
this.sourceCoordinator = sourceCoordinator;
6162
this.sourceConfig = sourceConfig;
6263
this.s3Prefix = s3Prefix;
6364
this.schemaManager = schemaManager;
64-
this.dbTableMetadataMetadata = dbTableMetadataMetadata;
65+
this.dbTableMetadata = dbTableMetadata;
66+
tableNames = new ArrayList<>(dbTableMetadata.getTableColumnDataTypeMap().keySet());
6567
}
6668

6769
@Override
@@ -137,7 +139,7 @@ private void init() {
137139
// Create a Global state in the coordination table for rds cluster/instance information.
138140
// Global State here is designed to be able to read whenever needed
139141
// So that the jobs can refer to the configuration.
140-
sourceCoordinator.createPartition(new GlobalState(sourceConfig.getDbIdentifier(), dbTableMetadataMetadata.toMap()));
142+
sourceCoordinator.createPartition(new GlobalState(sourceConfig.getDbIdentifier(), dbTableMetadata.toMap()));
141143
LOG.debug("Created global state for DB: {}", sourceConfig.getDbIdentifier());
142144

143145
if (sourceConfig.isExportEnabled()) {
@@ -162,7 +164,7 @@ private void createExportPartition(RdsSourceConfig sourceConfig) {
162164
progressState.setBucket(sourceConfig.getS3Bucket());
163165
// This prefix is for data exported from RDS
164166
progressState.setPrefix(getS3PrefixForExport(s3Prefix));
165-
progressState.setTables(sourceConfig.getTableNames());
167+
progressState.setTables(tableNames);
166168
progressState.setKmsKeyId(sourceConfig.getExport().getKmsKeyId());
167169
progressState.setPrimaryKeyMap(getPrimaryKeyMap());
168170
ExportPartition exportPartition = new ExportPartition(sourceConfig.getDbIdentifier(), sourceConfig.isCluster(), progressState);
@@ -174,19 +176,11 @@ private String getS3PrefixForExport(final String givenS3Prefix) {
174176
}
175177

176178
private Map<String, List<String>> getPrimaryKeyMap() {
177-
return sourceConfig.getTableNames().stream()
178-
.collect(Collectors.toMap(
179-
fullTableName -> fullTableName,
180-
fullTableName -> schemaManager.getPrimaryKeys(fullTableName)
181-
));
179+
return schemaManager.getPrimaryKeys(tableNames);
182180
}
183181

184182
private Map<String, Set<String>> getPostgresEnumColumnsByTable() {
185-
return sourceConfig.getTableNames().stream()
186-
.collect(Collectors.toMap(
187-
fullTableName -> fullTableName,
188-
fullTableName -> ((PostgresSchemaManager)schemaManager).getEnumColumns(fullTableName)
189-
));
183+
return ((PostgresSchemaManager) schemaManager).getEnumColumns(tableNames);
190184
}
191185

192186
private void createStreamPartition(RdsSourceConfig sourceConfig) {
@@ -197,14 +191,14 @@ private void createStreamPartition(RdsSourceConfig sourceConfig) {
197191
if (sourceConfig.getEngine().isMySql()) {
198192
final MySqlStreamState mySqlStreamState = new MySqlStreamState();
199193
getCurrentBinlogPosition().ifPresent(mySqlStreamState::setCurrentPosition);
200-
mySqlStreamState.setForeignKeyRelations(((MySqlSchemaManager)schemaManager).getForeignKeyRelations(sourceConfig.getTableNames()));
194+
mySqlStreamState.setForeignKeyRelations(((MySqlSchemaManager)schemaManager).getForeignKeyRelations(tableNames));
201195
progressState.setMySqlStreamState(mySqlStreamState);
202196
} else {
203197
// Postgres
204198
// Create replication slot, which will mark the starting point for stream
205199
final String publicationName = generatePublicationName();
206200
final String slotName = generateReplicationSlotName();
207-
((PostgresSchemaManager)schemaManager).createLogicalReplicationSlot(sourceConfig.getTableNames(), publicationName, slotName);
201+
((PostgresSchemaManager)schemaManager).createLogicalReplicationSlot(tableNames, publicationName, slotName);
208202
final PostgresStreamState postgresStreamState = new PostgresStreamState();
209203
postgresStreamState.setPublicationName(publicationName);
210204
postgresStreamState.setReplicationSlotName(slotName);

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/ConnectionManagerFactory.java

+1-7
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@
1313
import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig;
1414
import org.opensearch.dataprepper.plugins.source.rds.model.DbMetadata;
1515

16-
import java.util.List;
17-
1816
public class ConnectionManagerFactory {
1917
private final RdsSourceConfig sourceConfig;
2018
private final DbMetadata dbMetadata;
@@ -40,10 +38,6 @@ public ConnectionManager getConnectionManager() {
4038
sourceConfig.getAuthenticationConfig().getUsername(),
4139
sourceConfig.getAuthenticationConfig().getPassword(),
4240
sourceConfig.isTlsEnabled(),
43-
getDatabaseName(sourceConfig.getTableNames()));
44-
}
45-
46-
private String getDatabaseName(List<String> tableNames) {
47-
return tableNames.get(0).split("\\.")[0];
41+
sourceConfig.getTables().getDatabase());
4842
}
4943
}

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/MySqlConnectionManager.java

+5
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,16 @@
55

66
package org.opensearch.dataprepper.plugins.source.rds.schema;
77

8+
import org.slf4j.Logger;
9+
810
import java.sql.Connection;
911
import java.sql.DriverManager;
1012
import java.sql.SQLException;
1113
import java.util.Properties;
1214

1315
public class MySqlConnectionManager implements ConnectionManager {
16+
private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(MySqlConnectionManager.class);
17+
1418
static final String JDBC_URL_FORMAT = "jdbc:mysql://%s:%d";
1519
static final String USERNAME_KEY = "user";
1620
static final String PASSWORD_KEY = "password";
@@ -46,6 +50,7 @@ public Connection getConnection() throws SQLException {
4650
}
4751
props.setProperty(TINY_INT_ONE_IS_BIT_KEY, FALSE_VALUE);
4852
final String jdbcUrl = String.format(JDBC_URL_FORMAT, hostName, port);
53+
LOG.debug("Connecting to JDBC URL: {}", jdbcUrl);
4954
return doGetConnection(jdbcUrl, props);
5055
}
5156

0 commit comments

Comments
 (0)