@@ -29,19 +29,15 @@ function prepare() {
2929
3030 start_tidb_cluster --workdir $WORK_DIR
3131
32- # record tso before we create tables to skip the system table DDLs
33- start_ts=$( run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1} )
3432 do_retry 5 2 run_sql " CREATE TABLE test.iceberg_append_basic(id INT PRIMARY KEY, val INT);"
33+ # record tso after table creation so the table exists at start-ts
34+ start_ts=$( run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1} )
3535
3636 run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
3737
3838 WAREHOUSE_DIR=" $WORK_DIR /iceberg_warehouse"
3939 SINK_URI=" iceberg://?warehouse=file://$WAREHOUSE_DIR &catalog=hadoop&namespace=ns&mode=append&commit-interval=1s&enable-checkpoint-table=true&enable-global-checkpoint-table=true&partitioning=days(_tidb_commit_time)"
40- create_output=$( cdc_cli_changefeed create --start-ts=$start_ts --sink-uri=" $SINK_URI " )
41- echo " $create_output "
42- changefeed_id=$( echo " $create_output " | grep ' ^ID:' | head -n1 | awk ' {print $2}' )
43- table_id=$( get_table_id test iceberg_append_basic)
44- wait_table_assigned " $changefeed_id " " $table_id "
40+ cdc_cli_changefeed create --start-ts=$start_ts --sink-uri=" $SINK_URI "
4541}
4642
4743function wait_file_exists() {
@@ -59,24 +55,6 @@ function wait_file_exists() {
5955 return 1
6056}
6157
62- function wait_table_assigned() {
63- changefeed_id=$1
64- table_id=$2
65- check_time=${3:- 60}
66- i=0
67- while [ $i -lt $check_time ]; do
68- tables=$( curl -s " http://127.0.0.1:8300/api/v2/changefeeds/${changefeed_id} /tables?keyspace=$KEYSPACE_NAME " )
69- if echo " $tables " | jq -e --argjson tid " $table_id " ' .items[].table_ids[]? | select(. == $tid)' > /dev/null 2>&1 ; then
70- return 0
71- fi
72- (( i++ ))
73- sleep 2
74- done
75- echo " table id ${table_id} not assigned to changefeed ${changefeed_id} after ${check_time} checks"
76- echo " $tables "
77- return 1
78- }
79-
8058function iceberg_check_append_basic() {
8159 do_retry 5 2 run_sql " INSERT INTO test.iceberg_append_basic(id, val) VALUES (1, 1);"
8260 do_retry 5 2 run_sql " INSERT INTO test.iceberg_append_basic(id, val) VALUES (2, 2);"
0 commit comments