Skip to content

Commit 8510a86

Browse files
committed
fix: ENV VAR kv parsing to handle json values
- JOB_KUBE_ANNOTATIONS can have JSON as value - This fixes the parsin of such values
1 parent 27927d8 commit 8510a86

File tree

4 files changed

+130
-29
lines changed

4 files changed

+130
-29
lines changed

airbyte-commons-with-dependencies/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,5 @@ dependencies {
1616

1717
testImplementation(libs.mockito.core)
1818
testImplementation(libs.bundles.micronaut.test)
19+
testImplementation(libs.assertj.core)
1920
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright (c) 2020-2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.commons.workers.config;
6+
7+
import java.util.Collections;
8+
import java.util.HashMap;
9+
import java.util.Map;
10+
import java.util.regex.Matcher;
11+
import java.util.regex.Pattern;
12+
import lombok.AccessLevel;
13+
import lombok.NoArgsConstructor;
14+
import lombok.extern.slf4j.Slf4j;
15+
16+
@Slf4j
17+
@NoArgsConstructor(access = AccessLevel.PRIVATE)
18+
class EnvUtils {
19+
20+
private static final Pattern KEY_JSON_VALUE_PATTERN = Pattern.compile("(?<key>[^=]+)\\s*?=\\s*?(?<value>\\{[^=]+})\\s*,?\\s*");
21+
private static final Pattern KEY_VALUE_PATTERN = Pattern.compile("(?<key>[^=]+)=(?<value>[^=,]+),?");
22+
private static final String KEY_GROUP_ALIAS = "key";
23+
private static final String VALUE_GROUP_ALIAS = "value";
24+
25+
/**
26+
* Splits key value pairs from the input string into a map. Each kv-pair is separated by a ','.
27+
* </br>
28+
* The key and the value are separated by '='.
29+
* <p>
30+
* For example:- The following represents two map entries
31+
* </p>
32+
* - key1=value1,key2=value2 </br>
33+
* - key1={key11: value1},key2={key22: value2} </br>
34+
* - key1={key11: value11, key12: value12},key2={key21: value21, key22: value22}
35+
*
36+
* @param input string
37+
* @return map containing kv pairs
38+
*/
39+
public static Map<String, String> splitKVPairsFromEnvString(final String input) {
40+
if (input == null || input.isBlank()) {
41+
return Map.of();
42+
}
43+
final Map<String, String> jsonValuesMatchResult = match(input, KEY_JSON_VALUE_PATTERN);
44+
return jsonValuesMatchResult.isEmpty()
45+
? getKVPairsMatchedWithSimplePattern(input)
46+
: jsonValuesMatchResult;
47+
}
48+
49+
private static Map<String, String> match(final String input, final Pattern pattern) {
50+
final Matcher matcher = pattern.matcher(input);
51+
final Map<String, String> kvResult = new HashMap<>();
52+
while (matcher.find()) {
53+
kvResult.put(matcher.group(KEY_GROUP_ALIAS).trim(), matcher.group(VALUE_GROUP_ALIAS).trim());
54+
}
55+
return kvResult;
56+
}
57+
58+
private static Map<String, String> getKVPairsMatchedWithSimplePattern(final String input) {
59+
final Map<String, String> stringMatchResult = match(input, KEY_VALUE_PATTERN);
60+
if (stringMatchResult.isEmpty()) {
61+
log.warn("No valid key value pairs found in the input string: {}", input);
62+
return Collections.emptyMap();
63+
}
64+
return stringMatchResult;
65+
}
66+
67+
}

airbyte-commons-with-dependencies/src/main/java/io/airbyte/commons/workers/config/WorkerConfigsProvider.java

Lines changed: 5 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
package io.airbyte.commons.workers.config;
66

7-
import com.google.common.base.Splitter;
87
import com.google.common.base.Strings;
98
import io.airbyte.config.ResourceRequirements;
109
import io.airbyte.config.ResourceRequirementsType;
@@ -24,7 +23,6 @@
2423
import java.util.Optional;
2524
import java.util.regex.Matcher;
2625
import java.util.regex.Pattern;
27-
import java.util.stream.Collectors;
2826

2927
/**
3028
* Provide WorkerConfigs.
@@ -208,25 +206,25 @@ private WorkerConfigs getConfig(final KubeResourceKey key) {
208206
.orElseThrow(() -> new NoSuchElementException(String.format("Unable to find config: {variant:%s, type:%s, subtype:%s}",
209207
key.variant, key.type, key.subType)));
210208

211-
final Map<String, String> isolatedNodeSelectors = splitKVPairsFromEnvString(workerConfigsDefaults.isolatedNodeSelectors);
209+
final Map<String, String> isolatedNodeSelectors = EnvUtils.splitKVPairsFromEnvString(workerConfigsDefaults.isolatedNodeSelectors);
212210
validateIsolatedPoolConfigInitialization(workerConfigsDefaults.useCustomNodeSelector(), isolatedNodeSelectors);
213211

214212
// if annotations are not defined for this specific resource, then fallback to the default
215213
// resource's annotations
216214
final Map<String, String> annotations;
217215
if (Strings.isNullOrEmpty(kubeResourceConfig.getAnnotations())) {
218-
annotations = splitKVPairsFromEnvString(workerConfigsDefaults.defaultKubeResourceConfig.getAnnotations());
216+
annotations = EnvUtils.splitKVPairsFromEnvString(workerConfigsDefaults.defaultKubeResourceConfig.getAnnotations());
219217
} else {
220-
annotations = splitKVPairsFromEnvString(kubeResourceConfig.getAnnotations());
218+
annotations = EnvUtils.splitKVPairsFromEnvString(kubeResourceConfig.getAnnotations());
221219
}
222220

223221
return new WorkerConfigs(
224222
getResourceRequirementsFrom(kubeResourceConfig, workerConfigsDefaults.defaultKubeResourceConfig()),
225223
TolerationPOJO.getJobKubeTolerations(workerConfigsDefaults.jobKubeTolerations()),
226-
splitKVPairsFromEnvString(kubeResourceConfig.getNodeSelectors()),
224+
EnvUtils.splitKVPairsFromEnvString(kubeResourceConfig.getNodeSelectors()),
227225
workerConfigsDefaults.useCustomNodeSelector() ? Optional.of(isolatedNodeSelectors) : Optional.empty(),
228226
annotations,
229-
splitKVPairsFromEnvString(kubeResourceConfig.getLabels()),
227+
EnvUtils.splitKVPairsFromEnvString(kubeResourceConfig.getLabels()),
230228
workerConfigsDefaults.mainContainerImagePullSecret(),
231229
workerConfigsDefaults.mainContainerImagePullPolicy());
232230
}
@@ -321,28 +319,6 @@ private Optional<KubeResourceKey> parseKubeResourceKey(final String value) {
321319
return Optional.empty();
322320
}
323321

324-
/**
325-
* Splits key value pairs from the input string into a map. Each kv-pair is separated by a ','. The
326-
* key and the value are separated by '='.
327-
* <p>
328-
* For example:- The following represents two map entries
329-
* </p>
330-
* key1=value1,key2=value2
331-
*
332-
* @param input string
333-
* @return map containing kv pairs
334-
*/
335-
private Map<String, String> splitKVPairsFromEnvString(final String input) {
336-
if (input == null || input.isBlank()) {
337-
return Map.of();
338-
}
339-
return Splitter.on(",")
340-
.splitToStream(input)
341-
.filter(s -> !Strings.isNullOrEmpty(s) && s.contains("="))
342-
.map(s -> s.split("="))
343-
.collect(Collectors.toMap(s -> s[0].trim(), s -> s[1].trim()));
344-
}
345-
346322
private ResourceRequirements getResourceRequirementsFrom(final KubeResourceConfig kubeResourceConfig, final KubeResourceConfig defaultConfig) {
347323
return new ResourceRequirements()
348324
.withCpuLimit(useDefaultIfEmpty(kubeResourceConfig.getCpuLimit(), defaultConfig.getCpuLimit()))
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright (c) 2020-2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.commons.workers.config;
6+
7+
import static org.assertj.core.api.Assertions.assertThat;
8+
9+
import java.util.Collections;
10+
import java.util.Map;
11+
import java.util.stream.Stream;
12+
import org.junit.jupiter.params.ParameterizedTest;
13+
import org.junit.jupiter.params.provider.Arguments;
14+
import org.junit.jupiter.params.provider.MethodSource;
15+
16+
class EnvUtilsTest {
17+
18+
@ParameterizedTest
19+
@MethodSource("splitKVPairsFromEnvString")
20+
void splitKVPairsFromEnvString(String input, Map<String, String> expected) {
21+
final Map<String, String> result = EnvUtils.splitKVPairsFromEnvString(input);
22+
assertThat(result).isEqualTo(expected);
23+
}
24+
25+
private static Stream<Arguments> splitKVPairsFromEnvString() {
26+
return Stream.of(
27+
// unmatched
28+
Arguments.of("key1", Collections.emptyMap()),
29+
Arguments.of("key1,value", Collections.emptyMap()),
30+
Arguments.of("key1-value", Collections.emptyMap()),
31+
Arguments.of("key1:value", Collections.emptyMap()),
32+
// matched k:v pairs
33+
Arguments.of("key1=value1", Map.of("key1", "value1")),
34+
Arguments.of("key1 = value1", Map.of("key1", "value1")),
35+
Arguments.of("key1=value1,key2=value2", Map.of("key1", "value1", "key2", "value2")),
36+
Arguments.of("key1 = value1, key2 = value2", Map.of("key1", "value1", "key2", "value2")),
37+
// matched k:jsonV pairs
38+
Arguments.of("key1={value1}", Map.of("key1", "{value1}")),
39+
Arguments.of("key1={ value1 }", Map.of("key1", "{ value1 }")),
40+
Arguments.of("key1 = { value1 }", Map.of("key1", "{ value1 }")),
41+
Arguments.of("key1={value1},key2={value2}", Map.of("key1", "{value1}", "key2", "{value2}")),
42+
Arguments.of("key1= {value1} , key2={value2}", Map.of("key1", "{value1}", "key2", "{value2}")),
43+
Arguments.of("key1= {value1 } , key2= { value2}", Map.of("key1", "{value1 }", "key2", "{ value2}")),
44+
Arguments.of("key1={key11: value11},key2={key22: value22}", Map.of(
45+
"key1", "{key11: value11}",
46+
"key2", "{key22: value22}")),
47+
Arguments.of("key1={key11: value11, key12: value12},key2={key21: value21, key22: value22}", Map.of(
48+
"key1", "{key11: value11, key12: value12}",
49+
"key2", "{key21: value21, key22: value22}")),
50+
Arguments.of("key1={key11: value11, key12: value12, key13: {key131: value131}},"
51+
+ "key2={key21: value21, key22: value22, key23: {key231: value231}}",
52+
Map.of(
53+
"key1", "{key11: value11, key12: value12, key13: {key131: value131}}",
54+
"key2", "{key21: value21, key22: value22, key23: {key231: value231}}")));
55+
}
56+
57+
}

0 commit comments

Comments
 (0)