Skip to content

Commit ba64f0f

Browse files
author
buu.nguyen
committed
✨ feat(tests): add environment variable support for MySQL to Doris E2E tests
- Introduces a new test case `testMySQL2DorisEnvVar` that validates environment variable resolution during data syncing from MySQL to Doris. - Implements methods to set and unset environment variables for testing purposes. - Adds initialization SQL and configuration files to support the new test scenario.
1 parent 785f7c8 commit ba64f0f

3 files changed

Lines changed: 163 additions & 0 deletions

File tree

flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.sql.Statement;
3232
import java.util.Arrays;
3333
import java.util.List;
34+
import java.util.Map;
3435

3536
public class Mysql2DorisE2ECase extends AbstractE2EService {
3637
private static final Logger LOG = LoggerFactory.getLogger(Mysql2DorisE2ECase.class);
@@ -581,6 +582,117 @@ public void testMySQL2DorisMultiDatabase2OneSync() throws Exception {
581582
cancelE2EJob(jobName);
582583
}
583584

585+
@Test
586+
public void testMySQL2DorisEnvVar() throws Exception {
587+
String jobName = "testMySQL2DorisEnvVar";
588+
String resourcePath = "container/e2e/mysql2doris/testMySQL2DorisEnvVar.txt";
589+
590+
// Set environment variables for testing
591+
setEnvironmentVariable("TABLE_PREFIX", "env_");
592+
setEnvironmentVariable("TABLE_SUFFIX", "_test");
593+
594+
try {
595+
initEnvironment(jobName, "container/e2e/mysql2doris/testMySQL2DorisEnvVar_init.sql");
596+
startMysql2DorisJob(jobName, resourcePath);
597+
598+
// wait 2 times checkpoint
599+
Thread.sleep(20000);
600+
LOG.info("Start to verify create table result with environment variable resolution.");
601+
String tblQuery =
602+
String.format(
603+
"SELECT TABLE_NAME \n"
604+
+ "FROM INFORMATION_SCHEMA.TABLES \n"
605+
+ "WHERE TABLE_SCHEMA = '%s'",
606+
"test_e2e_mysql_env");
607+
// Verify that tables are created with env variable resolved prefix and suffix
608+
List<String> expectedTables =
609+
Arrays.asList(
610+
"env_tbl1_test", "env_tbl2_test", "env_tbl3_test", "env_tbl5_test");
611+
ContainerUtils.checkResult(
612+
getDorisQueryConnection(), LOG, expectedTables, tblQuery, 1, false);
613+
614+
LOG.info("Start to verify init result with environment variable resolution.");
615+
List<String> expected =
616+
Arrays.asList(
617+
"doris_env_1,1", "doris_env_2,2", "doris_env_3,3", "doris_env_5,5");
618+
String sql1 =
619+
"select * from ( select * from test_e2e_mysql_env.env_tbl1_test union all select * from test_e2e_mysql_env.env_tbl2_test union all select * from test_e2e_mysql_env.env_tbl3_test union all select * from test_e2e_mysql_env.env_tbl5_test) res order by 1";
620+
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, sql1, 2);
621+
622+
// add incremental data to verify CDC continues working with env vars
623+
ContainerUtils.executeSQLStatement(
624+
getMySQLQueryConnection(),
625+
LOG,
626+
"insert into test_e2e_mysql_env.tbl1 values ('doris_env_1_1',10)",
627+
"insert into test_e2e_mysql_env.tbl2 values ('doris_env_2_1',11)",
628+
"update test_e2e_mysql_env.tbl1 set age=18 where name='doris_env_1'",
629+
"delete from test_e2e_mysql_env.tbl2 where name='doris_env_2'");
630+
Thread.sleep(20000);
631+
632+
LOG.info(
633+
"Start to verify incremental data result with environment variable resolution.");
634+
List<String> expected2 =
635+
Arrays.asList(
636+
"doris_env_1,18",
637+
"doris_env_1_1,10",
638+
"doris_env_2_1,11",
639+
"doris_env_3,3");
640+
String sql2 =
641+
"select * from ( select * from test_e2e_mysql_env.env_tbl1_test union all select * from test_e2e_mysql_env.env_tbl2_test union all select * from test_e2e_mysql_env.env_tbl3_test ) res order by 1";
642+
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected2, sql2, 2);
643+
644+
cancelE2EJob(jobName);
645+
} finally {
646+
// Clean up environment variables
647+
unsetEnvironmentVariable("TABLE_PREFIX");
648+
unsetEnvironmentVariable("TABLE_SUFFIX");
649+
}
650+
}
651+
652+
/** Set environment variable using reflection (for testing purposes only) */
653+
@SuppressWarnings("unchecked")
654+
private void setEnvironmentVariable(String key, String value) {
655+
try {
656+
Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
657+
java.lang.reflect.Field theEnvironmentField =
658+
processEnvironmentClass.getDeclaredField("theEnvironment");
659+
theEnvironmentField.setAccessible(true);
660+
Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null);
661+
env.put(key, value);
662+
663+
java.lang.reflect.Field theCaseInsensitiveEnvironmentField =
664+
processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
665+
theCaseInsensitiveEnvironmentField.setAccessible(true);
666+
Map<String, String> cienv =
667+
(Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
668+
cienv.put(key, value);
669+
} catch (Exception e) {
670+
LOG.warn("Failed to set environment variable: " + key + "=" + value, e);
671+
}
672+
}
673+
674+
/** Unset environment variable using reflection (for testing purposes only) */
675+
@SuppressWarnings("unchecked")
676+
private void unsetEnvironmentVariable(String key) {
677+
try {
678+
Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
679+
java.lang.reflect.Field theEnvironmentField =
680+
processEnvironmentClass.getDeclaredField("theEnvironment");
681+
theEnvironmentField.setAccessible(true);
682+
Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null);
683+
env.remove(key);
684+
685+
java.lang.reflect.Field theCaseInsensitiveEnvironmentField =
686+
processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
687+
theCaseInsensitiveEnvironmentField.setAccessible(true);
688+
Map<String, String> cienv =
689+
(Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
690+
cienv.remove(key);
691+
} catch (Exception e) {
692+
LOG.warn("Failed to unset environment variable: " + key, e);
693+
}
694+
}
695+
584696
@After
585697
public void close() {
586698
try {
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
mysql-sync-database
2+
--database test_e2e_mysql_env
3+
--mysql-conf database-name=test_e2e_mysql_env
4+
--table-prefix ${TABLE_PREFIX}
5+
--table-suffix ${TABLE_SUFFIX}
6+
--including-tables "tbl.*"
7+
--excluding-tables "tbl4"
8+
--sink-conf sink.ignore.update-before=false
9+
--table-conf replication_num=1
10+
--single-sink true
11+
--ignore-default-value false
12+
--schema-change-mode sql_parser
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
DROP DATABASE if EXISTS test_e2e_mysql_env;
2+
CREATE DATABASE if NOT EXISTS test_e2e_mysql_env;
3+
DROP TABLE IF EXISTS test_e2e_mysql_env.tbl1;
4+
CREATE TABLE test_e2e_mysql_env.tbl1 (
5+
`name` varchar(256) primary key,
6+
`age` int
7+
);
8+
insert into test_e2e_mysql_env.tbl1 values ('doris_env_1',1);
9+
10+
11+
DROP TABLE IF EXISTS test_e2e_mysql_env.tbl2;
12+
CREATE TABLE test_e2e_mysql_env.tbl2 (
13+
`name` varchar(256) primary key,
14+
`age` int
15+
);
16+
insert into test_e2e_mysql_env.tbl2 values ('doris_env_2',2);
17+
18+
19+
DROP TABLE IF EXISTS test_e2e_mysql_env.tbl3;
20+
CREATE TABLE test_e2e_mysql_env.tbl3 (
21+
`name` varchar(256) primary key,
22+
`age` int
23+
);
24+
insert into test_e2e_mysql_env.tbl3 values ('doris_env_3',3);
25+
26+
27+
DROP TABLE IF EXISTS test_e2e_mysql_env.tbl4;
28+
CREATE TABLE test_e2e_mysql_env.tbl4 (
29+
`name` varchar(256) primary key,
30+
`age` int
31+
);
32+
insert into test_e2e_mysql_env.tbl4 values ('doris_env_4',4);
33+
34+
DROP TABLE IF EXISTS test_e2e_mysql_env.tbl5;
35+
CREATE TABLE test_e2e_mysql_env.tbl5 (
36+
`name` varchar(256) primary key,
37+
`age` int
38+
);
39+
insert into test_e2e_mysql_env.tbl5 values ('doris_env_5',5);

0 commit comments

Comments
 (0)