Skip to content

Commit a524c0a

Browse files
author
sdomalap
committed
Don't allow regex in the source topic when a specific table is provided as destination
1 parent efd1df0 commit a524c0a

File tree

1 file changed

+24
-5
lines changed

1 file changed

+24
-5
lines changed

datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnector.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -131,12 +131,31 @@ public void initializeDatastream(Datastream stream, List<Datastream> allDatastre
131131
if (placeholderIndex != -1 && placeholderIndex == destinationConnectionString.lastIndexOf(MM_TOPIC_PLACEHOLDER)) {
132132
LOG.info("Allowing user managed datastream destination with connector {} for datastream {}", stream.getConnectorName(), stream.getName());
133133
} else {
134-
// TODO: Implement validation on the source to confirm that it is not a regex/wildcard source
134+
135+
// validation on the source to confirm that it is not a regex/wildcard source
136+
// handling for DOT character is enough as Kafka allows only alphanumeric characters, periods (.), underscores (_), and hyphens (-) in its topic names
137+
KafkaConnectionString connectionString = KafkaConnectionString.valueOf(stream.getSource().getConnectionString());
138+
String topicName = connectionString.getTopicName();
139+
int indexOfDOT = topicName.indexOf(".");
140+
while (indexOfDOT >= 0) {
141+
if (indexOfDOT == 0 || topicName.charAt(indexOfDOT - 1) != '\\') {
142+
LOG.error("DOT character is not escaped. Regex in the source is not supported with explicit user managed datastream destination.");
143+
throw new DatastreamValidationException(
144+
String.format("Datastream source is invalid for connector %s. Datastream: %s. {}" +
145+
"Regex in the source is not supported with explicit user managed datastream destination.",
146+
stream.getConnectorName(),
147+
stream
148+
)
149+
);
150+
}
151+
indexOfDOT = topicName.indexOf(".", indexOfDOT + 1);
152+
}
153+
135154
LOG.warn("Allowing explicit user managed datastream destination with connector {} for datastream {}", stream.getConnectorName(), stream.getName());
136-
// LOG.error("User managed datastream destination format is invalid for connector {} and datastream {}", stream.getConnectorName(), stream.getName());
137-
// throw new DatastreamValidationException(
138-
// String.format("Datastream destination format is invalid for connector %s. Datastream: %s", stream.getConnectorName(), stream)
139-
// );
155+
//LOG.error("User managed datastream destination format is invalid for connector {} and datastream {}", stream.getConnectorName(), stream.getName());
156+
//throw new DatastreamValidationException(
157+
// String.format("Datastream destination format is invalid for connector %s. Datastream: %s", stream.getConnectorName(), stream)
158+
//);
140159
}
141160
} else {
142161
throw new DatastreamValidationException(

0 commit comments

Comments
 (0)