Skip to content

Commit 5e7ed26

Browse files
committed
Alter approach based on feedback
Signed-off-by: Aindriu Lavelle <aindriu.lavelle@aiven.io>
1 parent 4ade400 commit 5e7ed26

6 files changed

Lines changed: 56 additions & 60 deletions

File tree

pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,11 @@
267267
<artifactId>connect-api</artifactId>
268268
<version>${kafka.version}</version>
269269
</dependency>
270+
<dependency>
271+
<groupId>org.apache.kafka</groupId>
272+
<artifactId>connect-runtime</artifactId>
273+
<version>${kafka.version}</version>
274+
</dependency>
270275
</dependencies>
271276
</plugin>
272277
</plugins>

src/main/java/io/aiven/commons/kafka/config/ConverterType.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@ public enum ConverterType {
3030
/** AvroConverter */
3131
AVRO("io.confluent.connect.avro.AvroConverter");
3232

33-
private final String name;
33+
private final String className;
3434

35-
ConverterType(final String name) {
36-
this.name = name;
35+
ConverterType(final String className) {
36+
this.className = className;
3737
}
3838

3939
/**
@@ -42,10 +42,10 @@ public enum ConverterType {
4242
* @param name is the full name including package of the converter
4343
* @return An enum instance of ConverterType
4444
*/
45-
public static ConverterType forName(final String name) {
45+
public static ConverterType forClassName(final Class<?> name) {
4646
Objects.requireNonNull(name, "name cannot be null");
4747
for (final ConverterType converterType : values()) {
48-
if (converterType.name.equalsIgnoreCase(name)) {
48+
if (converterType.className.equalsIgnoreCase(name.getName())) {
4949
return converterType;
5050
}
5151
}

src/main/java/io/aiven/commons/kafka/config/fragment/CommonConfigFragment.java

Lines changed: 22 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,7 @@ public static Setter setter(final Map<String, String> data) {
5252
@SuppressWarnings("PMD.AvoidUsingHardCodedIP")
5353
public static ConfigDef update(final ConfigDef configDef) {
5454
int orderInGroup = 0;
55-
final String commonGroup = "common";
56-
55+
final String commonGroup = "Common";
5756
SinceInfo tasksMaxSince =
5857
SinceInfo.builder()
5958
.groupId("org.apache.kafka")
@@ -67,56 +66,25 @@ public static ConfigDef update(final ConfigDef configDef) {
6766
SinceInfo.Builder siBuilder =
6867
SinceInfo.builder().groupId("io.aiven.commons").artifactId("kafka-config").version("1.0.0");
6968

70-
return configDef
71-
.define(
72-
ExtendedConfigKey.builder(ConnectorConfig.TASKS_MAX_CONFIG)
73-
.type(ConfigDef.Type.INT)
74-
.defaultValue(1)
75-
.validator(atLeast(1))
76-
.importance(ConfigDef.Importance.HIGH)
77-
.group(commonGroup)
78-
.orderInGroup(++orderInGroup)
79-
.width(ConfigDef.Width.SHORT)
80-
.documentation("Maximum number of tasks to use for this connector.")
81-
.since(tasksMaxSince)
82-
.build())
83-
.define(
84-
ExtendedConfigKey.builder(TASK_ID)
85-
.type(ConfigDef.Type.INT)
86-
.defaultValue(1)
87-
.validator(atLeast(0))
88-
.importance(ConfigDef.Importance.HIGH)
89-
.group(commonGroup)
90-
.orderInGroup(++orderInGroup)
91-
.width(ConfigDef.Width.SHORT)
92-
.internalConfig(true)
93-
.documentation("The task ID that this connector is working with.")
94-
.since(siBuilder.version("1.0.0").build())
95-
.build())
96-
.define(
97-
ExtendedConfigKey.builder(VALUE_CONVERTER)
98-
.type(ConfigDef.Type.STRING)
99-
.validator(new ConfigDef.NonEmptyStringWithoutControlChars())
100-
.importance(ConfigDef.Importance.MEDIUM)
101-
.group(commonGroup)
102-
.orderInGroup(++orderInGroup)
103-
.width(ConfigDef.Width.SHORT)
104-
.internalConfig(true)
105-
.documentation("The converter to use with the value.")
106-
.since(siBuilder.version("1.0.0").build())
107-
.build())
108-
.define(
109-
ExtendedConfigKey.builder(KEY_CONVERTER)
110-
.type(ConfigDef.Type.STRING)
111-
.validator(new ConfigDef.NonEmptyStringWithoutControlChars())
112-
.importance(ConfigDef.Importance.MEDIUM)
113-
.group(commonGroup)
114-
.orderInGroup(++orderInGroup)
115-
.width(ConfigDef.Width.SHORT)
116-
.internalConfig(true)
117-
.documentation("The converter to use with the key.")
118-
.since(siBuilder.version("1.0.0").build())
119-
.build());
69+
for (var configKey : ConnectorConfig.configDef().configKeys().values()) {
70+
if (configKey.hasDefault() && !configDef.configKeys().containsValue(configKey)) {
71+
configDef.define(ExtendedConfigKey.create(configKey));
72+
}
73+
}
74+
75+
return configDef.define(
76+
ExtendedConfigKey.builder(TASK_ID)
77+
.type(ConfigDef.Type.INT)
78+
.defaultValue(1)
79+
.validator(atLeast(0))
80+
.importance(ConfigDef.Importance.HIGH)
81+
.group(commonGroup)
82+
.orderInGroup(++orderInGroup)
83+
.width(ConfigDef.Width.SHORT)
84+
.internalConfig(true)
85+
.documentation("The task ID that this connector is working with.")
86+
.since(siBuilder.version("1.0.0").build())
87+
.build());
12088
}
12189

12290
/**
@@ -152,7 +120,7 @@ public Integer getMaxTasks() {
152120
* @return the converter used for the key portion of the kafka event
153121
*/
154122
public ConverterType getKeyConverter() {
155-
return ConverterType.forName(getString(KEY_CONVERTER));
123+
return ConverterType.forClassName(getClass(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG));
156124
}
157125

158126
/**
@@ -161,7 +129,7 @@ public ConverterType getKeyConverter() {
161129
* @return the converter used for the value portion of the kafka event
162130
*/
163131
public ConverterType getValueConverter() {
164-
return ConverterType.forName(getString(VALUE_CONVERTER));
132+
return ConverterType.forClassName(getClass(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG));
165133
}
166134

167135
/** Setter to programmatically set values in the configuraiotn. */

src/main/java/io/aiven/commons/kafka/config/fragment/ConfigFragment.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,11 @@ public Password getPassword(final String key) {
180180
return dataAccess.getPassword(key);
181181
}
182182

183+
@Override
184+
public Class<?> getClass(String key) {
185+
return (Class<?>) dataAccess.getClass(key);
186+
}
187+
183188
@Override
184189
public <T> T getConfiguredInstance(final String key, final Class<? extends T> clazz) {
185190
return dataAccess.getConfiguredInstance(key, clazz);

src/main/java/io/aiven/commons/kafka/config/fragment/FragmentDataAccess.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ public Password getPassword(final String key) {
8383
return cfg.getPassword(key);
8484
}
8585

86+
@Override
87+
public Class<?> getClass(final String key) {
88+
return cfg.getClass(key);
89+
}
90+
8691
@Override
8792
public <T> T getConfiguredInstance(final String key, final Class<? extends T> clazz) {
8893
return cfg.getConfiguredInstance(key, clazz);
@@ -142,6 +147,11 @@ public Password getPassword(final String key) {
142147
return (Password) configValues.get(key).value();
143148
}
144149

150+
@Override
151+
public Class<?> getClass(final String key) {
152+
return (Class<?>) configValues.get(key).value();
153+
}
154+
145155
@Override
146156
public <T> T getConfiguredInstance(final String key, final Class<? extends T> clazz) {
147157
Object obj = configValues.get(key).value();
@@ -237,6 +247,14 @@ public <T> T getConfiguredInstance(final String key, final Class<? extends T> cl
237247
*/
238248
Password getPassword(String key);
239249

250+
/**
251+
* 1 Get the class associated with the key
252+
*
253+
* @param key the key to look up
254+
* @return the class associated with the key
255+
*/
256+
Class<?> getClass(String key);
257+
240258
/**
241259
* Creates a configured instance of the class specified by the key. The value of the key may be
242260
* either a class name or the instance of the class itsef.

src/site/markdown/CommonConfigDef.md.vm

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ ${esc.hash}${esc.hash} ${configKey.displayName}
3333
#if (${configKey.isDeprecated})**${configKey.deprecated}** #end
3434

3535
- Configuration option: ${configKey.name}
36-
#if ($stringUtils.isNotEmpty(${configKey.since}))- Since: ${configKey.since}#end
36+
#if ($stringUtils.isNotEmpty(${configKey.getSince()}))- Since: ${configKey.getSince()}#end
3737

3838
- Default value: ${configKey.getDefaultValue()|"none"}
3939
- Type: $configKey.type

0 commit comments

Comments
 (0)