Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,18 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** cdc sync tools. */
public class CdcTools {
private static final List<String> EMPTY_KEYS =
Collections.singletonList(DatabaseSyncConfig.PASSWORD);

// Regex pattern to find environment variables like $VAR or ${VAR}
private static final Pattern ENV_VAR_PATTERN =
Pattern.compile("\\$(?:([A-Za-z_][A-Za-z0-9_]*)|\\{([A-Za-z_][A-Za-z0-9_]*)\\})");

private static StreamExecutionEnvironment flinkEnvironmentForTesting;
private static JobClient jobClient;

Expand Down Expand Up @@ -221,7 +228,30 @@ public static Map<String, String> getConfigMap(MultipleParameterTool params, Str
for (String param : params.getMultiParameter(key)) {
String[] kv = param.split("=", 2);
if (kv.length == 2) {
map.put(kv[0].trim(), kv[1].trim());
String originalValue = kv[1].trim();
String resolvedValue = originalValue;

// Use pre-compiled pattern to find environment variables like $VAR or ${VAR}
Matcher matcher = ENV_VAR_PATTERN.matcher(originalValue);
StringBuffer sb = new StringBuffer();
boolean varFound = false;
while (matcher.find()) {
varFound = true;
String varName = matcher.group(1) != null ? matcher.group(1) : matcher.group(2);
String envValue = System.getenv(varName);
if (envValue != null) {
// Replace with environment variable value
matcher.appendReplacement(sb, Matcher.quoteReplacement(envValue));
} else {
// If environment variable is not found, keep the original placeholder
matcher.appendReplacement(sb, Matcher.quoteReplacement(matcher.group(0)));
}
}
if (varFound) {
matcher.appendTail(sb);
resolvedValue = sb.toString();
}
map.put(kv[0].trim(), resolvedValue);
continue;
} else if (kv.length == 1 && EMPTY_KEYS.contains(kv[0])) {
map.put(kv[0].trim(), "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class Mysql2DorisE2ECase extends AbstractE2EService {
private static final Logger LOG = LoggerFactory.getLogger(Mysql2DorisE2ECase.class);
Expand Down Expand Up @@ -581,6 +582,117 @@ public void testMySQL2DorisMultiDatabase2OneSync() throws Exception {
cancelE2EJob(jobName);
}

@Test
public void testMySQL2DorisEnvVar() throws Exception {
String jobName = "testMySQL2DorisEnvVar";
String resourcePath = "container/e2e/mysql2doris/testMySQL2DorisEnvVar.txt";

// Set environment variables for testing
setEnvironmentVariable("TABLE_PREFIX", "env_");
setEnvironmentVariable("TABLE_SUFFIX", "_test");

try {
initEnvironment(jobName, "container/e2e/mysql2doris/testMySQL2DorisEnvVar_init.sql");
startMysql2DorisJob(jobName, resourcePath);

// wait 2 times checkpoint
Thread.sleep(20000);
LOG.info("Start to verify create table result with environment variable resolution.");
String tblQuery =
String.format(
"SELECT TABLE_NAME \n"
+ "FROM INFORMATION_SCHEMA.TABLES \n"
+ "WHERE TABLE_SCHEMA = '%s'",
"test_e2e_mysql_env");
// Verify that tables are created with env variable resolved prefix and suffix
List<String> expectedTables =
Arrays.asList(
"env_tbl1_test", "env_tbl2_test", "env_tbl3_test", "env_tbl5_test");
ContainerUtils.checkResult(
getDorisQueryConnection(), LOG, expectedTables, tblQuery, 1, false);

LOG.info("Start to verify init result with environment variable resolution.");
List<String> expected =
Arrays.asList(
"doris_env_1,1", "doris_env_2,2", "doris_env_3,3", "doris_env_5,5");
String sql1 =
"select * from ( select * from test_e2e_mysql_env.env_tbl1_test union all select * from test_e2e_mysql_env.env_tbl2_test union all select * from test_e2e_mysql_env.env_tbl3_test union all select * from test_e2e_mysql_env.env_tbl5_test) res order by 1";
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, sql1, 2);

// add incremental data to verify CDC continues working with env vars
ContainerUtils.executeSQLStatement(
getMySQLQueryConnection(),
LOG,
"insert into test_e2e_mysql_env.tbl1 values ('doris_env_1_1',10)",
"insert into test_e2e_mysql_env.tbl2 values ('doris_env_2_1',11)",
"update test_e2e_mysql_env.tbl1 set age=18 where name='doris_env_1'",
"delete from test_e2e_mysql_env.tbl2 where name='doris_env_2'");
Thread.sleep(20000);

LOG.info(
"Start to verify incremental data result with environment variable resolution.");
List<String> expected2 =
Arrays.asList(
"doris_env_1,18",
"doris_env_1_1,10",
"doris_env_2_1,11",
"doris_env_3,3");
String sql2 =
"select * from ( select * from test_e2e_mysql_env.env_tbl1_test union all select * from test_e2e_mysql_env.env_tbl2_test union all select * from test_e2e_mysql_env.env_tbl3_test ) res order by 1";
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected2, sql2, 2);

cancelE2EJob(jobName);
} finally {
// Clean up environment variables
unsetEnvironmentVariable("TABLE_PREFIX");
unsetEnvironmentVariable("TABLE_SUFFIX");
}
}

/** Set environment variable using reflection (for testing purposes only) */
@SuppressWarnings("unchecked")
private void setEnvironmentVariable(String key, String value) {
try {
Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
java.lang.reflect.Field theEnvironmentField =
processEnvironmentClass.getDeclaredField("theEnvironment");
theEnvironmentField.setAccessible(true);
Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null);
env.put(key, value);

java.lang.reflect.Field theCaseInsensitiveEnvironmentField =
processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
theCaseInsensitiveEnvironmentField.setAccessible(true);
Map<String, String> cienv =
(Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
cienv.put(key, value);
} catch (Exception e) {
LOG.warn("Failed to set environment variable: " + key + "=" + value, e);
}
}

/** Unset environment variable using reflection (for testing purposes only) */
@SuppressWarnings("unchecked")
private void unsetEnvironmentVariable(String key) {
try {
Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
java.lang.reflect.Field theEnvironmentField =
processEnvironmentClass.getDeclaredField("theEnvironment");
theEnvironmentField.setAccessible(true);
Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null);
env.remove(key);

java.lang.reflect.Field theCaseInsensitiveEnvironmentField =
processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
theCaseInsensitiveEnvironmentField.setAccessible(true);
Map<String, String> cienv =
(Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
cienv.remove(key);
} catch (Exception e) {
LOG.warn("Failed to unset environment variable: " + key, e);
}
}

@After
public void close() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,91 @@ private void assertEquals(
Assert.assertTrue(valueConf.contains(value));
}
}

@Test
public void testGetConfigMapWithEnvironmentVariables() {
// Test cases for environment variable substitution.
// We assume these environment variables are NOT set in the test environment,
// so they should resolve to their placeholder strings.

// Case 1: Simple env var placeholder
MultipleParameterTool params1 =
MultipleParameterTool.fromArgs(
new String[] {"--test-conf", "db.user=$DB_USER_UNSET"});
Map<String, String> expected1 = new HashMap<>();
expected1.put("db.user", "$DB_USER_UNSET");
Assert.assertEquals(expected1, CdcTools.getConfigMap(params1, "test-conf"));

// Case 2: Env var with braces placeholder
MultipleParameterTool params2 =
MultipleParameterTool.fromArgs(
new String[] {"--test-conf", "db.pass=${DB_PASS_UNSET}"});
Map<String, String> expected2 = new HashMap<>();
expected2.put("db.pass", "${DB_PASS_UNSET}");
Assert.assertEquals(expected2, CdcTools.getConfigMap(params2, "test-conf"));

// Case 3: Mix of plain string and env var placeholder
MultipleParameterTool params3 =
MultipleParameterTool.fromArgs(
new String[] {
"--test-conf",
"db.host=localhost",
"--test-conf",
"db.port=$DB_PORT_UNSET"
});
Map<String, String> expected3 = new HashMap<>();
expected3.put("db.host", "localhost");
expected3.put("db.port", "$DB_PORT_UNSET");
Assert.assertEquals(expected3, CdcTools.getConfigMap(params3, "test-conf"));

// Case 4: Env var within a string
MultipleParameterTool params4 =
MultipleParameterTool.fromArgs(
new String[] {
"--test-conf", "conn.string=jdbc:mysql://$DB_HOST_UNSET:3306/mydb"
});
Map<String, String> expected4 = new HashMap<>();
expected4.put("conn.string", "jdbc:mysql://$DB_HOST_UNSET:3306/mydb");
Assert.assertEquals(expected4, CdcTools.getConfigMap(params4, "test-conf"));

// Case 5: Multiple env vars in one string
MultipleParameterTool params5 =
MultipleParameterTool.fromArgs(
new String[] {
"--test-conf", "credentials=user:$USER_UNSET,pass:$PASS_UNSET"
});
Map<String, String> expected5 = new HashMap<>();
expected5.put("credentials", "user:$USER_UNSET,pass:$PASS_UNSET");
Assert.assertEquals(expected5, CdcTools.getConfigMap(params5, "test-conf"));

// Case 6: No env vars (regular behavior)
MultipleParameterTool params6 =
MultipleParameterTool.fromArgs(
new String[] {"--test-conf", "key1=value1", "--test-conf", "key2=value2"});
Map<String, String> expected6 = new HashMap<>();
expected6.put("key1", "value1");
expected6.put("key2", "value2");
Assert.assertEquals(expected6, CdcTools.getConfigMap(params6, "test-conf"));

// Case 7: Env var for a key that allows empty value (e.g. password), resolves to
// placeholder
MultipleParameterTool params7 =
MultipleParameterTool.fromArgs(
new String[] {"--test-conf", "password=$PASSWORD_UNSET"});
Map<String, String> expected7 = new HashMap<>();
expected7.put(
"password", "$PASSWORD_UNSET"); // DatabaseSyncConfig.PASSWORD is in EMPTY_KEYS
Assert.assertEquals(expected7, CdcTools.getConfigMap(params7, "test-conf"));

// Case 8: Env var that resolves to an empty string (if it were set to empty)
// For this test, we simulate it by having the placeholder itself, as we can't set it to
// empty easily here.
// If $EMPTY_VAR was set to "", the result for "key" would be "".
// Since it's not set, it remains "$EMPTY_VAR".
MultipleParameterTool params8 =
MultipleParameterTool.fromArgs(new String[] {"--test-conf", "key=$EMPTY_VAR"});
Map<String, String> expected8 = new HashMap<>();
expected8.put("key", "$EMPTY_VAR");
Assert.assertEquals(expected8, CdcTools.getConfigMap(params8, "test-conf"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
mysql-sync-database
--database test_e2e_mysql_env
--mysql-conf database-name=test_e2e_mysql_env
--table-prefix ${TABLE_PREFIX}
--table-suffix ${TABLE_SUFFIX}
--including-tables "tbl.*"
--excluding-tables "tbl4"
--sink-conf sink.ignore.update-before=false
--table-conf replication_num=1
--single-sink true
--ignore-default-value false
--schema-change-mode sql_parser
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
DROP DATABASE if EXISTS test_e2e_mysql_env;
CREATE DATABASE if NOT EXISTS test_e2e_mysql_env;
DROP TABLE IF EXISTS test_e2e_mysql_env.tbl1;
CREATE TABLE test_e2e_mysql_env.tbl1 (
`name` varchar(256) primary key,
`age` int
);
insert into test_e2e_mysql_env.tbl1 values ('doris_env_1',1);


DROP TABLE IF EXISTS test_e2e_mysql_env.tbl2;
CREATE TABLE test_e2e_mysql_env.tbl2 (
`name` varchar(256) primary key,
`age` int
);
insert into test_e2e_mysql_env.tbl2 values ('doris_env_2',2);


DROP TABLE IF EXISTS test_e2e_mysql_env.tbl3;
CREATE TABLE test_e2e_mysql_env.tbl3 (
`name` varchar(256) primary key,
`age` int
);
insert into test_e2e_mysql_env.tbl3 values ('doris_env_3',3);


DROP TABLE IF EXISTS test_e2e_mysql_env.tbl4;
CREATE TABLE test_e2e_mysql_env.tbl4 (
`name` varchar(256) primary key,
`age` int
);
insert into test_e2e_mysql_env.tbl4 values ('doris_env_4',4);

DROP TABLE IF EXISTS test_e2e_mysql_env.tbl5;
CREATE TABLE test_e2e_mysql_env.tbl5 (
`name` varchar(256) primary key,
`age` int
);
insert into test_e2e_mysql_env.tbl5 values ('doris_env_5',5);
Loading