33set -eu
44
55CUR=$( cd " $( dirname " ${BASH_SOURCE[0]} " ) " && pwd)
6- source $CUR /../_utils/test_prepare
7- WORK_DIR=$OUT_DIR /$TEST_NAME
6+ source " $CUR /../_utils/test_prepare"
7+ WORK_DIR=" $OUT_DIR /$TEST_NAME "
88CDC_BINARY=cdc.test
9- SINK_TYPE=$1
9+ SINK_TYPE=${1 :- }
1010
11- if [ " $SINK_TYPE " != " iceberg" ]; then
11+ if [ -z " $SINK_TYPE " ] || [ " $SINK_TYPE " != " iceberg" ]; then
1212 echo " skip iceberg integration test, sink type is $SINK_TYPE "
1313 exit 0
1414fi
@@ -25,19 +25,20 @@ if [ "${ICEBERG_SPARK_READBACK}" = "1" ]; then
2525fi
2626
2727function prepare() {
28- rm -rf $WORK_DIR && mkdir -p $WORK_DIR
28+ rm -rf " $WORK_DIR " && mkdir -p " $WORK_DIR "
2929
30- start_tidb_cluster --workdir $WORK_DIR
30+ start_tidb_cluster --workdir " $WORK_DIR "
3131
3232 do_retry 5 2 run_sql " CREATE TABLE test.iceberg_append_basic(id INT PRIMARY KEY, val INT);"
3333 # record tso after table creation so the table exists at start-ts
34- start_ts=$( run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1} )
34+ start_ts=$( run_cdc_cli_tso_query " ${UP_PD_HOST_1} " " ${UP_PD_PORT_1} " )
3535
36- run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
36+ run_cdc_server --workdir " $WORK_DIR " --binary " $CDC_BINARY "
3737
3838 WAREHOUSE_DIR=" $WORK_DIR /iceberg_warehouse"
3939 SINK_URI=" iceberg://?warehouse=file://$WAREHOUSE_DIR &catalog=hadoop&namespace=ns&mode=append&commit-interval=1s&enable-checkpoint-table=true&enable-global-checkpoint-table=true&partitioning=days(_tidb_commit_time)"
40- cdc_cli_changefeed create --start-ts=$start_ts --sink-uri=" $SINK_URI "
40+ changefeed_id=$( cdc_cli_changefeed create --start-ts=" $start_ts " --sink-uri=" $SINK_URI " | grep ' ^ID:' | head -n1 | awk ' {print $2}' )
41+ wait_changefeed_table_assigned " $changefeed_id " " test" " iceberg_append_basic"
4142}
4243
4344function wait_file_exists() {
@@ -55,6 +56,68 @@ function wait_file_exists() {
5556 return 1
5657}
5758
59+ function wait_changefeed_table_assigned() {
60+ changefeed_id=$1
61+ db_name=$2
62+ table_name=$3
63+
64+ if [ " ${TICDC_NEWARCH:- } " != " true" ]; then
65+ return 0
66+ fi
67+
68+ table_id=$( get_table_id " $db_name " " $table_name " )
69+ echo " wait table ${db_name} .${table_name} (id=${table_id} ) assigned to changefeed ${changefeed_id} "
70+
71+ auth_user=${TICDC_USER:- ticdc}
72+ auth_pass=${TICDC_PASSWORD:- ticdc_secret}
73+ url=" http://127.0.0.1:8300/api/v2/changefeeds/${changefeed_id} /tables?keyspace=${KEYSPACE_NAME} "
74+ last_body=" "
75+ last_code=" "
76+
77+ i=0
78+ while [ $i -lt 60 ]; do
79+ resp=$( curl -sS -w ' \n%{http_code}' --user " ${auth_user} :${auth_pass} " " $url " 2>&1 || true)
80+ body=${resp% $' \n ' * }
81+ code=${resp##* $' \n ' }
82+ last_body=$body
83+ last_code=$code
84+
85+ if [ " $code " != " 200" ]; then
86+ echo " wait table ${db_name} .${table_name} : http $code response: $body "
87+ sleep 2
88+ (( i++ ))
89+ continue
90+ fi
91+
92+ if ! echo " $body " | jq -e . > /dev/null 2>&1 ; then
93+ echo " wait table ${db_name} .${table_name} : invalid json response: $body "
94+ sleep 2
95+ (( i++ ))
96+ continue
97+ fi
98+
99+ if echo " $body " | jq -e --argjson tid " $table_id " \
100+ ' def items: (if type=="array" then . else .items // [] end); items[]?.table_ids[]? | select(. == $tid)' > /dev/null; then
101+ echo " table ${db_name} .${table_name} assigned to changefeed ${changefeed_id} "
102+ return 0
103+ fi
104+
105+ if [ $(( i % 10 )) -eq 0 ]; then
106+ items_count=$( echo " $body " | jq -r ' if type=="array" then length else (.items | length) end' 2> /dev/null || echo " unknown" )
107+ echo " wait table ${db_name} .${table_name} : not assigned yet (items=${items_count} )"
108+ fi
109+
110+ sleep 2
111+ (( i++ ))
112+ done
113+
114+ echo " table ${db_name} .${table_name} not assigned to changefeed ${changefeed_id} after $(( i * 2 )) s (last_http=${last_code} )"
115+ if [ -n " $last_body " ]; then
116+ echo " last response: $last_body "
117+ fi
118+ return 1
119+ }
120+
58121function iceberg_check_append_basic() {
59122 do_retry 5 2 run_sql " INSERT INTO test.iceberg_append_basic(id, val) VALUES (1, 1);"
60123 do_retry 5 2 run_sql " INSERT INTO test.iceberg_append_basic(id, val) VALUES (2, 2);"
@@ -143,11 +206,11 @@ function iceberg_check_append_basic() {
143206 fi
144207 fi
145208
146- cleanup_process $CDC_BINARY
209+ cleanup_process " $CDC_BINARY "
147210}
148211
149- trap ' stop_test $WORK_DIR' EXIT
212+ trap ' stop_test " $WORK_DIR" ' EXIT
150213prepare " $@ "
151214iceberg_check_append_basic " $@ "
152- check_logs $WORK_DIR
215+ check_logs " $WORK_DIR "
153216echo " [$( date) ] <<<<<< run test case $TEST_NAME success! >>>>>>"
0 commit comments