Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ public Class<? extends Task> taskClass() {

@Override
public Config validate(Map<String, String> connectorConfigs) {
log.info("Validating JDBC Source connector configurations for testing");
return new JdbcSourceConnectorValidation(connectorConfigs).validate();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,19 @@ public JdbcSourceConnectorValidation(Map<String, String> connectorConfigs) {
* Perform validation and return the Config object with any validation errors.
*/
public Config validate() {
log.info("Starting testing validation process");
try {
// Run validateAll() if not already done
if (validationResult == null && connectorConfigs != null) {
log.info("Running validateAll on connector configs");
Map<String, ConfigValue> configValuesMap = JdbcSourceConnectorConfig.CONFIG_DEF
.validateAll(connectorConfigs);
List<ConfigValue> configValues = new ArrayList<>(configValuesMap.values());
validationResult = new Config(configValues);
}

log.info("Validation result after validateAll: {}", this.validationResult);

boolean hasValidateAllErrors = validationResult.configValues().stream()
.anyMatch(configValue -> !configValue.errorMessages().isEmpty());

Expand All @@ -81,18 +85,25 @@ public Config validate() {
}

if (config == null && connectorConfigs != null) {
log.info("Creating JdbcSourceConnectorConfig for further validation");
config = new JdbcSourceConnectorConfig(connectorConfigs);
}

boolean validationResult = validateMultiConfigs()
&& validateLegacyNewConfigCompatibility()
&& validateQueryConfigs();

log.info("Validation result after multi-configs and legacy/new config compatibility: {}",
validationResult);

if (validationResult && isUsingNewConfigs()) {
log.info("Using new configs only - performing new config specific validations");
validationResult = validateTableInclusionConfigs()
&& validateTsAndIncModeColumnRequirements();
}

log.info("Validation result after core validations: {}", validationResult);

validationResult = validationResult && validatePluginSpecificNeeds();
if (!validationResult) {
log.info("Validation failed");
Expand All @@ -102,6 +113,7 @@ && validateLegacyNewConfigCompatibility()
} catch (Exception e) {
log.error("Error during validation", e);
}
log.info("Final validation result: {}", this.validationResult);
return this.validationResult;
}

Expand Down Expand Up @@ -301,17 +313,26 @@ private boolean validateTableInclusionConfigs() {
* Both configs should not be set simultaneously to avoid ambiguity.
*/
private boolean validateQueryConfigs() {
log.info("All config keys received: {}", config.originalsStrings().keySet());
log.info("Raw query.masked value: {}", config.originalsStrings().get("query.masked"));

String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG);
log.info("Raw query value: {}", query);
String queryMaskedValue = null;
org.apache.kafka.common.config.types.Password queryMasked =
config.getPassword(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG);
log.info("Retrieved query.masked Password object: {}", queryMasked);
if (queryMasked != null && queryMasked.value() != null) {
queryMaskedValue = queryMasked.value();
}

log.info("Processed query.masked value: {}", queryMaskedValue);

boolean hasQuery = query != null && !query.isEmpty();
boolean hasQueryMasked = queryMaskedValue != null && !queryMaskedValue.isEmpty();

log.info("Has query: {}, Has query.masked: {}", hasQuery, hasQueryMasked);

if (hasQuery && hasQueryMasked) {
String msg = "Both 'query' and 'query.masked' configs cannot be set at the same time. "
+ "Please use only one of them.";
Expand All @@ -320,10 +341,12 @@ private boolean validateQueryConfigs() {
addConfigError(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, msg);

log.error("Validation failed: Both query and query.masked configs are set");
log.info("Exiting validateQueryConfigs with failure");
return false;
}

if (config.getQuery().isPresent() && isUsingTableFilteringConfigs()) {
log.info("Both query and table filtering configs are set");
String msg =
"Do not specify table filtering configs with 'query'. "
+ "Remove table.whitelist / table.blacklist / table.include.list / "
Expand All @@ -344,6 +367,7 @@ private boolean validateQueryConfigs() {
}
return false;
}
log.info("Validating SQL query statement if query is present");

return !config.getQuery().isPresent()
|| validateSqlQueryStatement(config.getQuery().get());
Expand Down Expand Up @@ -481,28 +505,40 @@ protected void addConfigError(String configName, String errorMessage) {

/** Validate that provided query strings start with a SELECT statement. */
private boolean validateSqlQueryStatement(String statement) {
log.info("Validating SQL query statement: {}", statement);
String configName = config.isQueryMasked()
? JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG
: JdbcSourceConnectorConfig.QUERY_CONFIG;
log.info("Using config name for error reporting: {}", configName);

String trimmedStatement = statement.trim();
if (!SELECT_STATEMENT_PATTERN.matcher(trimmedStatement).find()) {
String msg =
"Only SELECT statements are supported for query config value. "
+ "Please provide a statement that starts with SELECT.";
addConfigError("query", msg);
addConfigError(configName, msg);
log.error(msg);
log.info("Exiting validateSqlQueryStatement with failure");
return false;
}
try {
log.info("Validating SQL syntax for statement");
SqlParser.validateSqlSyntax(trimmedStatement);
log.info("SQL syntax validation succeeded for statement");
} catch (JSQLParserException e) {
log.info("SQL syntax validation failed with exception: {}", e.getMessage());
String msg =
"Invalid SQL syntax for query config value. Please provide "
+ "a syntactically correct SELECT statement.";
addConfigError("query", msg);
addConfigError(configName, msg);
log.error(
"SQL syntax validation failed for query config: {}",
msg
);
log.info("Exiting validateSqlQueryStatement with failure");
return false;
}
log.info("Exiting validateSqlQueryStatement with success");
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -847,9 +847,9 @@ public void validate_withQueryMaskedStartingWithUpdate_setsError() {
validate();

assertErrors(1);
assertErrors(QUERY_CONFIG, 1);
assertErrors(QUERY_MASKED_CONFIG, 1);
assertErrorMatches(
QUERY_CONFIG,
QUERY_MASKED_CONFIG,
"Only SELECT statements are supported for query"
);
}
Expand All @@ -876,64 +876,64 @@ public void validate_withInvalidQueryMaskedSyntax_setsError() {
props.put(QUERY_MASKED_CONFIG, "SELECT FROM users");
validate();
assertErrors(1);
assertErrors(QUERY_CONFIG, 1);
assertErrorMatches(QUERY_CONFIG, ".*Invalid SQL syntax for query.*");
assertErrors(QUERY_MASKED_CONFIG, 1);
assertErrorMatches(QUERY_MASKED_CONFIG, ".*Invalid SQL syntax for query.*");

// Test 2: Missing table name after FROM
props.put(QUERY_MASKED_CONFIG, "SELECT * FROM");
validate();
assertErrors(1);
assertErrors(QUERY_CONFIG, 1);
assertErrorMatches(QUERY_CONFIG, ".*Invalid SQL syntax for query.*");
assertErrors(QUERY_MASKED_CONFIG, 1);
assertErrorMatches(QUERY_MASKED_CONFIG, ".*Invalid SQL syntax for query.*");

// Test 3: Incomplete WHERE clause
props.put(QUERY_MASKED_CONFIG, "SELECT * FROM users WHERE");
validate();
assertErrors(1);
assertErrors(QUERY_CONFIG, 1);
assertErrorMatches(QUERY_CONFIG, ".*Invalid SQL syntax for query.*");
assertErrors(QUERY_MASKED_CONFIG, 1);
assertErrorMatches(QUERY_MASKED_CONFIG, ".*Invalid SQL syntax for query.*");

// Test 4: Incomplete condition in WHERE clause
props.put(QUERY_MASKED_CONFIG, "SELECT * FROM users WHERE id =");
validate();
assertErrors(1);
assertErrors(QUERY_CONFIG, 1);
assertErrorMatches(QUERY_CONFIG, ".*Invalid SQL syntax for query.*");
assertErrors(QUERY_MASKED_CONFIG, 1);
assertErrorMatches(QUERY_MASKED_CONFIG, ".*Invalid SQL syntax for query.*");

// Test 5: Incomplete ORDER BY clause
props.put(QUERY_MASKED_CONFIG, "SELECT * FROM users ORDER BY");
validate();
assertErrors(1);
assertErrors(QUERY_CONFIG, 1);
assertErrorMatches(QUERY_CONFIG, ".*Invalid SQL syntax for query.*");
assertErrors(QUERY_MASKED_CONFIG, 1);
assertErrorMatches(QUERY_MASKED_CONFIG, ".*Invalid SQL syntax for query.*");

// Test 6: Trailing comma in column list
props.put(QUERY_MASKED_CONFIG, "SELECT id, name, FROM users");
validate();
assertErrors(1);
assertErrors(QUERY_CONFIG, 1);
assertErrorMatches(QUERY_CONFIG, ".*Invalid SQL syntax for query.*");
assertErrors(QUERY_MASKED_CONFIG, 1);
assertErrorMatches(QUERY_MASKED_CONFIG, ".*Invalid SQL syntax for query.*");

// Test 7: Missing table name in JOIN
props.put(QUERY_MASKED_CONFIG, "SELECT * FROM users JOIN ON id = order_id");
validate();
assertErrors(1);
assertErrors(QUERY_CONFIG, 1);
assertErrorMatches(QUERY_CONFIG, ".*Invalid SQL syntax for query.*");
assertErrors(QUERY_MASKED_CONFIG, 1);
assertErrorMatches(QUERY_MASKED_CONFIG, ".*Invalid SQL syntax for query.*");

// Test 8: Unbalanced parentheses
props.put(QUERY_MASKED_CONFIG, "SELECT * FROM users WHERE (id = 1");
validate();
assertErrors(1);
assertErrors(QUERY_CONFIG, 1);
assertErrorMatches(QUERY_CONFIG, ".*Invalid SQL syntax for query.*");
assertErrors(QUERY_MASKED_CONFIG, 1);
assertErrorMatches(QUERY_MASKED_CONFIG, ".*Invalid SQL syntax for query.*");

// Test 9: Missing GROUP BY column
props.put(QUERY_MASKED_CONFIG, "SELECT COUNT(*) FROM users GROUP BY");
validate();
assertErrors(1);
assertErrors(QUERY_CONFIG, 1);
assertErrorMatches(QUERY_CONFIG, ".*Invalid SQL syntax for query.*");
assertErrors(QUERY_MASKED_CONFIG, 1);
assertErrorMatches(QUERY_MASKED_CONFIG, ".*Invalid SQL syntax for query.*");

// Test 10: Basic SELECT * query
props.put(QUERY_MASKED_CONFIG, "SELECT * FROM users");
Expand Down