Skip to content

Commit aea7b63

Browse files
committed
fix: ENV VAR kv parsing to handle json values
- JOB_KUBE_ANNOTATIONS can have JSON as value - This fixes the parsin of such values
1 parent 27927d8 commit aea7b63

File tree

4 files changed

+129
-28
lines changed

4 files changed

+129
-28
lines changed

airbyte-commons-with-dependencies/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,5 @@ dependencies {
1616

1717
testImplementation(libs.mockito.core)
1818
testImplementation(libs.bundles.micronaut.test)
19+
testImplementation(libs.assertj.core)
1920
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package io.airbyte.commons.workers.config;
2+
3+
import lombok.AccessLevel;
4+
import lombok.NoArgsConstructor;
5+
import lombok.extern.slf4j.Slf4j;
6+
7+
import java.util.Collections;
8+
import java.util.HashMap;
9+
import java.util.Map;
10+
import java.util.regex.Matcher;
11+
import java.util.regex.Pattern;
12+
13+
@Slf4j
14+
@NoArgsConstructor(access = AccessLevel.PRIVATE)
15+
class EnvUtils {
16+
17+
private static final Pattern KEY_JSON_VALUE_PATTERN = Pattern.compile("(?<key>[^=]+)\\s*?=\\s*?(?<value>\\{[^=]+})\\s*,?\\s*");
18+
private static final Pattern KEY_VALUE_PATTERN = Pattern.compile("(?<key>[^=]+)=(?<value>[^=,]+),?");
19+
private static final String KEY_GROUP_ALIAS = "key";
20+
private static final String VALUE_GROUP_ALIAS = "value";
21+
22+
/**
23+
* Splits key value pairs from the input string into a map. Each kv-pair is separated by a ','.
24+
* </br>
25+
* The key and the value are separated by '='.
26+
* <p>
27+
* For example:- The following represents two map entries
28+
* </p>
29+
* - key1=value1,key2=value2
30+
* </br>
31+
* - key1={key11: value1},key2={key22: value2}
32+
* </br>
33+
* - key1={key11: value11, key12: value12},key2={key21: value21, key22: value22}
34+
*
35+
* @param input string
36+
* @return map containing kv pairs
37+
*/
38+
public static Map<String, String> splitKVPairsFromEnvString(final String input) {
39+
if (input == null || input.isBlank()) {
40+
return Map.of();
41+
}
42+
final Map<String, String> jsonValuesMatchResult = match(input, KEY_JSON_VALUE_PATTERN);
43+
return jsonValuesMatchResult.isEmpty()
44+
? getKVPairsMatchedWithSimplePattern(input)
45+
: jsonValuesMatchResult;
46+
}
47+
48+
private static Map<String, String> match(final String input, final Pattern pattern) {
49+
final Matcher matcher = pattern.matcher(input);
50+
final Map<String, String> kvResult = new HashMap<>();
51+
while (matcher.find()) {
52+
kvResult.put(matcher.group(KEY_GROUP_ALIAS).trim(), matcher.group(VALUE_GROUP_ALIAS).trim());
53+
}
54+
return kvResult;
55+
}
56+
57+
private static Map<String, String> getKVPairsMatchedWithSimplePattern(final String input) {
58+
final Map<String, String> stringMatchResult = match(input, KEY_VALUE_PATTERN);
59+
if (stringMatchResult.isEmpty()) {
60+
log.warn("No valid key value pairs found in the input string: {}", input);
61+
return Collections.emptyMap();
62+
}
63+
return stringMatchResult;
64+
}
65+
66+
}

airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/WorkerConfigsProvider.java

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
package io.airbyte.commons.workers.config;
66

7-
import com.google.common.base.Splitter;
87
import com.google.common.base.Strings;
98
import io.airbyte.config.ResourceRequirements;
109
import io.airbyte.config.ResourceRequirementsType;
@@ -208,25 +207,25 @@ private WorkerConfigs getConfig(final KubeResourceKey key) {
208207
.orElseThrow(() -> new NoSuchElementException(String.format("Unable to find config: {variant:%s, type:%s, subtype:%s}",
209208
key.variant, key.type, key.subType)));
210209

211-
final Map<String, String> isolatedNodeSelectors = splitKVPairsFromEnvString(workerConfigsDefaults.isolatedNodeSelectors);
210+
final Map<String, String> isolatedNodeSelectors = EnvUtils.splitKVPairsFromEnvString(workerConfigsDefaults.isolatedNodeSelectors);
212211
validateIsolatedPoolConfigInitialization(workerConfigsDefaults.useCustomNodeSelector(), isolatedNodeSelectors);
213212

214213
// if annotations are not defined for this specific resource, then fallback to the default
215214
// resource's annotations
216215
final Map<String, String> annotations;
217216
if (Strings.isNullOrEmpty(kubeResourceConfig.getAnnotations())) {
218-
annotations = splitKVPairsFromEnvString(workerConfigsDefaults.defaultKubeResourceConfig.getAnnotations());
217+
annotations = EnvUtils.splitKVPairsFromEnvString(workerConfigsDefaults.defaultKubeResourceConfig.getAnnotations());
219218
} else {
220-
annotations = splitKVPairsFromEnvString(kubeResourceConfig.getAnnotations());
219+
annotations = EnvUtils.splitKVPairsFromEnvString(kubeResourceConfig.getAnnotations());
221220
}
222221

223222
return new WorkerConfigs(
224223
getResourceRequirementsFrom(kubeResourceConfig, workerConfigsDefaults.defaultKubeResourceConfig()),
225224
TolerationPOJO.getJobKubeTolerations(workerConfigsDefaults.jobKubeTolerations()),
226-
splitKVPairsFromEnvString(kubeResourceConfig.getNodeSelectors()),
225+
EnvUtils.splitKVPairsFromEnvString(kubeResourceConfig.getNodeSelectors()),
227226
workerConfigsDefaults.useCustomNodeSelector() ? Optional.of(isolatedNodeSelectors) : Optional.empty(),
228227
annotations,
229-
splitKVPairsFromEnvString(kubeResourceConfig.getLabels()),
228+
EnvUtils.splitKVPairsFromEnvString(kubeResourceConfig.getLabels()),
230229
workerConfigsDefaults.mainContainerImagePullSecret(),
231230
workerConfigsDefaults.mainContainerImagePullPolicy());
232231
}
@@ -321,28 +320,6 @@ private Optional<KubeResourceKey> parseKubeResourceKey(final String value) {
321320
return Optional.empty();
322321
}
323322

324-
/**
325-
* Splits key value pairs from the input string into a map. Each kv-pair is separated by a ','. The
326-
* key and the value are separated by '='.
327-
* <p>
328-
* For example:- The following represents two map entries
329-
* </p>
330-
* key1=value1,key2=value2
331-
*
332-
* @param input string
333-
* @return map containing kv pairs
334-
*/
335-
private Map<String, String> splitKVPairsFromEnvString(final String input) {
336-
if (input == null || input.isBlank()) {
337-
return Map.of();
338-
}
339-
return Splitter.on(",")
340-
.splitToStream(input)
341-
.filter(s -> !Strings.isNullOrEmpty(s) && s.contains("="))
342-
.map(s -> s.split("="))
343-
.collect(Collectors.toMap(s -> s[0].trim(), s -> s[1].trim()));
344-
}
345-
346323
private ResourceRequirements getResourceRequirementsFrom(final KubeResourceConfig kubeResourceConfig, final KubeResourceConfig defaultConfig) {
347324
return new ResourceRequirements()
348325
.withCpuLimit(useDefaultIfEmpty(kubeResourceConfig.getCpuLimit(), defaultConfig.getCpuLimit()))
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package io.airbyte.commons.workers.config;
2+
3+
import org.junit.jupiter.params.ParameterizedTest;
4+
import org.junit.jupiter.params.provider.Arguments;
5+
import org.junit.jupiter.params.provider.MethodSource;
6+
7+
import java.util.Collections;
8+
import java.util.Map;
9+
import java.util.stream.Stream;
10+
11+
import static org.assertj.core.api.Assertions.assertThat;
12+
13+
class EnvUtilsTest {
14+
15+
@ParameterizedTest
16+
@MethodSource("splitKVPairsFromEnvString")
17+
void splitKVPairsFromEnvString(String input, Map<String, String> expected) {
18+
final Map<String, String> result = EnvUtils.splitKVPairsFromEnvString(input);
19+
assertThat(result).isEqualTo(expected);
20+
}
21+
22+
private static Stream<Arguments> splitKVPairsFromEnvString() {
23+
return Stream.of(
24+
// unmatched
25+
Arguments.of("key1", Collections.emptyMap()),
26+
Arguments.of("key1,value", Collections.emptyMap()),
27+
Arguments.of("key1-value", Collections.emptyMap()),
28+
Arguments.of("key1:value", Collections.emptyMap()),
29+
// matched k:v pairs
30+
Arguments.of("key1=value1", Map.of("key1", "value1")),
31+
Arguments.of("key1 = value1", Map.of("key1", "value1")),
32+
Arguments.of("key1=value1,key2=value2", Map.of("key1", "value1", "key2", "value2")),
33+
Arguments.of("key1 = value1, key2 = value2", Map.of("key1", "value1", "key2", "value2")),
34+
// matched k:jsonV pairs
35+
Arguments.of("key1={value1}", Map.of("key1", "{value1}")),
36+
Arguments.of("key1={ value1 }", Map.of("key1", "{ value1 }")),
37+
Arguments.of("key1 = { value1 }", Map.of("key1", "{ value1 }")),
38+
Arguments.of("key1={value1},key2={value2}", Map.of("key1", "{value1}", "key2", "{value2}")),
39+
Arguments.of("key1= {value1} , key2={value2}", Map.of("key1", "{value1}", "key2", "{value2}")),
40+
Arguments.of("key1= {value1 } , key2= { value2}", Map.of("key1", "{value1 }", "key2", "{ value2}")),
41+
Arguments.of("key1={key11: value11},key2={key22: value22}", Map.of(
42+
"key1", "{key11: value11}",
43+
"key2", "{key22: value22}"
44+
)),
45+
Arguments.of("key1={key11: value11, key12: value12},key2={key21: value21, key22: value22}", Map.of(
46+
"key1", "{key11: value11, key12: value12}",
47+
"key2", "{key21: value21, key22: value22}"
48+
)),
49+
Arguments.of("key1={key11: value11, key12: value12, key13: {key131: value131}},"
50+
+ "key2={key21: value21, key22: value22, key23: {key231: value231}}", Map.of(
51+
"key1", "{key11: value11, key12: value12, key13: {key131: value131}}",
52+
"key2", "{key21: value21, key22: value22, key23: {key231: value231}}"
53+
))
54+
);
55+
}
56+
57+
}

0 commit comments

Comments
 (0)