Skip to content

Commit 2f98f33

Browse files
authored
Revert "Add SqlParser Utility to parse and validate SQL query (#1599)" (#1607)
This reverts commit 2294e24.
1 parent a4721ae commit 2f98f33

File tree

8 files changed

+29
-1422
lines changed

8 files changed

+29
-1422
lines changed

pom.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@
6363
<jtds.driver.version>1.3.1</jtds.driver.version>
6464
<slf4j.version>1.7.36</slf4j.version>
6565
<reload4j.version>1.2.19</reload4j.version>
66-
<jsqlparser.version>4.9</jsqlparser.version>
6766
<licenses.name>Confluent Community License</licenses.name>
6867
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
6968
<project.package.home>target/${project.artifactId}-${project.version}-package</project.package.home>
@@ -111,11 +110,6 @@
111110
<groupId>com.google.re2j</groupId>
112111
<artifactId>re2j</artifactId>
113112
<version>1.7</version>
114-
</dependency>
115-
<dependency>
116-
<groupId>com.github.jsqlparser</groupId>
117-
<artifactId>jsqlparser</artifactId>
118-
<version>${jsqlparser.version}</version>
119113
</dependency>
120114
<!-- JDBC drivers, only included in runtime so they get packaged -->
121115
<dependency>

src/main/java/io/confluent/connect/jdbc/source/TableQuerier.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package io.confluent.connect.jdbc.source;
1717

1818
import io.confluent.connect.jdbc.util.LogUtil;
19-
import io.confluent.connect.jdbc.util.SqlParser;
2019
import org.apache.kafka.connect.source.SourceRecord;
2120
import org.slf4j.Logger;
2221
import org.slf4j.LoggerFactory;
@@ -189,27 +188,6 @@ protected void recordQuery(String query) {
189188
}
190189
}
191190

192-
/**
193-
* Returns the query string with sensitive data redacted using SQL parsing. This method preserves
194-
* the query structure while masking literal values, which is useful for troubleshooting during
195-
* exceptions.
196-
*
197-
* @return the query with sensitive data redacted, or null if query.masked is not configured or no
198-
* query is available
199-
*/
200-
public String getRedactedQueryString() {
201-
if (!shouldRedactSensitiveLogs) {
202-
return null;
203-
}
204-
if (loggedQueryString != null) {
205-
return SqlParser.redactSensitiveData(loggedQueryString);
206-
}
207-
if (query != null) {
208-
return SqlParser.redactSensitiveData(query);
209-
}
210-
return null;
211-
}
212-
213191
@Override
214192
public int compareTo(TableQuerier other) {
215193
if (this.lastUpdate < other.lastUpdate) {

src/main/java/io/confluent/connect/jdbc/source/TableQuerierProcessor.java

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -135,14 +135,8 @@ private void handleNonTransientException(RecordDestination<SourceRecord> destina
135135
TableQuerier querier, SQLNonTransientException sqle) {
136136
SQLException redactedException = shouldRedactSensitiveLogs
137137
? LogUtil.redactSensitiveData(sqle) : sqle;
138-
log.error(
139-
"Non-transient SQL exception while running query for table: {}. Query: {}"
140-
+ ", sqlState: {}, vendorCode: {}.",
141-
querier,
142-
querier.getRedactedQueryString(),
143-
sqle.getSQLState(),
144-
sqle.getErrorCode(),
145-
redactedException);
138+
log.error("Non-transient SQL exception while running query for table: {}",
139+
querier, redactedException);
146140
resetAndRequeueHead(querier, true);
147141
// This task has failed, report failure to destination
148142
destination.failWith(new ConnectException(redactedException));
@@ -153,13 +147,8 @@ private void handleSqlException(RecordDestination<SourceRecord> destination,
153147
SQLException redactedException = shouldRedactSensitiveLogs
154148
? LogUtil.redactSensitiveData(sqle) : sqle;
155149
log.error(
156-
"SQL exception while running query for table: {}. Query: {},"
157-
+ " sqlState: {}, vendorCode: {}."
158-
+ " Attempting retry {} of {} attempts.",
150+
"SQL exception while running query for table: {}." + " Attempting retry {} of {} attempts.",
159151
querier,
160-
querier.getRedactedQueryString(),
161-
sqle.getSQLState(),
162-
sqle.getErrorCode(),
163152
querier.getAttemptedRetryCount() + 1,
164153
maxRetriesPerQuerier,
165154
redactedException);
@@ -174,23 +163,14 @@ private void handleSqlException(RecordDestination<SourceRecord> destination,
174163
}
175164

176165
private void handleInterruptedException(TableQuerier querier, InterruptedException e) {
177-
log.error(
178-
"Interrupted while running query for table: {}. Query: {}",
179-
querier,
180-
querier.getRedactedQueryString(),
181-
e);
182166
resetAndRequeueHead(querier, true);
183167
// Interruption should not be treated as a failure, just stop processing
184168
Thread.currentThread().interrupt();
185169
}
186170

187171
private void handleThrowable(RecordDestination<SourceRecord> destination,
188172
TableQuerier querier, Throwable t) {
189-
log.error(
190-
"Failed to run query for table: {}. Query: {}",
191-
querier,
192-
querier.getRedactedQueryString(),
193-
t);
173+
log.error("Failed to run query for table: {}", querier, t);
194174
resetAndRequeueHead(querier, true);
195175
// This task has failed, report failure to destination
196176
destination.failWith(new ConnectException("Error while processing table querier", t));

src/main/java/io/confluent/connect/jdbc/util/SqlParser.java

Lines changed: 0 additions & 153 deletions
This file was deleted.

src/main/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidation.java

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
import io.confluent.connect.jdbc.dialect.DatabaseDialects;
2020
import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig;
2121
import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.TransactionIsolationMode;
22-
import io.confluent.connect.jdbc.util.SqlParser;
23-
import net.sf.jsqlparser.JSQLParserException;
2422
import org.apache.kafka.common.config.Config;
2523
import org.apache.kafka.common.config.ConfigValue;
2624
import org.slf4j.Logger;
@@ -330,6 +328,7 @@ private boolean validateQueryConfigs() {
330328
+ "table.exclude.list when using query mode"
331329
+ " or 'query' when using table filtering mode.";
332330
addConfigError(JdbcSourceConnectorConfig.QUERY_CONFIG, msg);
331+
addConfigError(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, msg);
333332
if (!config.getTableWhitelistSet().isEmpty()) {
334333
addConfigError(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, msg);
335334
}
@@ -345,8 +344,17 @@ private boolean validateQueryConfigs() {
345344
return false;
346345
}
347346

348-
return !config.getQuery().isPresent()
349-
|| validateSqlQueryStatement(config.getQuery().get());
347+
if (hasQuery
348+
&& !validateSelectStatement(query, JdbcSourceConnectorConfig.QUERY_CONFIG)) {
349+
return false;
350+
}
351+
if (hasQueryMasked
352+
&& !validateSelectStatement(
353+
queryMaskedValue, JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG)) {
354+
return false;
355+
}
356+
357+
return true;
350358
}
351359

352360
/**
@@ -480,29 +488,18 @@ protected void addConfigError(String configName, String errorMessage) {
480488
}
481489

482490
/** Validate that provided query strings start with a SELECT statement. */
483-
private boolean validateSqlQueryStatement(String statement) {
491+
private boolean validateSelectStatement(String statement, String configKey) {
484492
String trimmedStatement = statement.trim();
485493
if (!SELECT_STATEMENT_PATTERN.matcher(trimmedStatement).find()) {
486494
String msg =
487-
"Only SELECT statements are supported for query config value. "
488-
+ "Please provide a statement that starts with SELECT.";
489-
addConfigError("query", msg);
495+
String.format(
496+
"Only SELECT statements are supported for '%s'. Please provide "
497+
+ "a statement that starts with SELECT.",
498+
configKey);
499+
addConfigError(configKey, msg);
490500
log.error(msg);
491501
return false;
492502
}
493-
try {
494-
SqlParser.validateSqlSyntax(trimmedStatement);
495-
} catch (JSQLParserException e) {
496-
String msg =
497-
"Invalid SQL syntax for query config value. Please provide "
498-
+ "a syntactically correct SELECT statement.";
499-
addConfigError("query", msg);
500-
log.error(
501-
"SQL syntax validation failed for query config: {}",
502-
msg
503-
);
504-
return false;
505-
}
506503
return true;
507504
}
508505

src/test/java/io/confluent/connect/jdbc/source/TableQuerierTest.java

Lines changed: 1 addition & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import io.confluent.connect.jdbc.source.TableQuerier.QueryMode;
2020
import io.confluent.connect.jdbc.util.ColumnId;
2121
import io.confluent.connect.jdbc.util.ExpressionBuilder;
22-
import io.confluent.connect.jdbc.util.SqlParser;
2322
import io.confluent.connect.jdbc.util.TableId;
2423

2524
import java.sql.Connection;
@@ -29,7 +28,6 @@
2928
import org.junit.Test;
3029
import org.mockito.Matchers;
3130

32-
import static org.junit.Assert.*;
3331
import static org.mockito.Mockito.mock;
3432
import static org.mockito.Mockito.times;
3533
import static org.mockito.Mockito.verify;
@@ -151,38 +149,5 @@ public void testBulkTableQuerierInQueryModeWithoutSuffix() throws SQLException {
151149
querier.createPreparedStatement(connectionMock);
152150

153151
verify(databaseDialectMock, times(1)).createPreparedStatement(Matchers.any(),Matchers.eq("SELECT * FROM name"));
154-
}
155-
156-
@Test
157-
public void testGetParsedQueryStringRedactsSensitiveValuesWhenRedactionDisabled() {
158-
String query = "SELECT * FROM users WHERE id = 12345 AND name = 'John Doe'";
159-
BulkTableQuerier querier = new BulkTableQuerier(
160-
databaseDialectMock,
161-
QueryMode.QUERY,
162-
query,
163-
null,
164-
"",
165-
true
166-
);
167-
168-
String result = querier.getRedactedQueryString();
169-
String expected = "SELECT * FROM users WHERE id = 0 AND name = " + SqlParser.REDACTED_STRING;
170-
171-
assertEquals(expected, result);
172-
}
173-
174-
@Test
175-
public void testGetParsedQueryStringReturnsNullWhenRedactionDisabled() {
176-
String query = "SELECT * FROM users WHERE id = 12345 AND name = 'John Doe'";
177-
BulkTableQuerier querier = new BulkTableQuerier(
178-
databaseDialectMock,
179-
QueryMode.QUERY,
180-
query,
181-
null,
182-
"",
183-
false
184-
);
185-
186-
assertNull(querier.getRedactedQueryString());
187-
}
152+
}
188153
}

0 commit comments

Comments
 (0)