Skip to content

Commit 09dcef7

Browse files
committed
完善GaussDB CDC测试流程,确保测试环境隔离和CDC正确进入stream阶段
测试脚本改进: - test_gaussdb_to_gaussdb_cdc.sh: 添加Step 0重置Flink集群,使用docker-compose down/up确保干净环境 - run_gaussdb_to_gaussdb_test.sh: 添加wait_for_cdc_stream_ready()函数等待CDC slots激活后再插入测试数据 - run_gaussdb_to_gaussdb_test.sh: 修改init_test_env()不再DROP/RECREATE表,改为检查表是否存在 部署脚本改进: - deploy_gaussdb_to_gaussdb.sh: 清理所有DN上的旧replication slots - deploy_gaussdb_to_gaussdb.sh: 添加分布式source表创建逻辑 - deploy_gaussdb_to_gaussdb.sh: 插入种子数据确保CDC快照阶段有数据,正确进入stream阶段 关键修复: - 解决空表导致CDC跳过stream阶段的问题 - 解决DN1 CDC slot未激活的问题 - 排除旧Flink状态和残留slots对测试的干扰 测试结果: INSERT/UPDATE/DELETE三项测试全部通过
1 parent f5b1214 commit 09dcef7

3 files changed

Lines changed: 170 additions & 29 deletions

File tree

deploy_gaussdb_to_gaussdb.sh

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,55 @@ docker restart flink-jobmanager flink-taskmanager
134134
echo "⏳ Waiting for cluster to stabilize (25s)..."
135135
sleep 25
136136

137-
# 5. Initialize GaussDB Sink Table (通过 CN 创建)
138-
echo "🗄️ Initializing GaussDB sink table..."
137+
# 5. Initialize GaussDB environment
138+
echo "🗄️ Initializing GaussDB environment..."
139+
140+
# DN 连接信息
141+
DN_HOSTS=("10.250.0.30" "10.250.0.181" "10.250.0.157")
142+
DN_PORTS=("40000" "40020" "40040")
143+
SLOT_NAMES=("flink_cdc_g2g_dn1" "flink_cdc_g2g_dn2" "flink_cdc_g2g_dn3")
144+
145+
# 5.1 清理各 DN 上的旧 replication slots
146+
echo "🧹 Cleaning old replication slots on all DNs..."
147+
for i in "${!DN_HOSTS[@]}"; do
148+
host="${DN_HOSTS[$i]}"
149+
port="${DN_PORTS[$i]}"
150+
slot="${SLOT_NAMES[$i]}"
151+
152+
echo " -> DN$((i+1)) ($host:$port): Cleaning slots..."
153+
PGPASSWORD=Gauss_235 psql -h "$host" -p "$port" -U tom -d db1 -c "
154+
SELECT pg_drop_replication_slot(slot_name)
155+
FROM pg_replication_slots
156+
WHERE slot_name LIKE 'flink_cdc_g2g%' AND active = false;
157+
" 2>/dev/null || true
158+
done
159+
echo -e "${GREEN}✅ Old replication slots cleaned${NC}"
160+
161+
# 5.2 创建 Source 表 (分布式表,通过 CN 创建)
162+
echo "📋 Creating source table (distributed)..."
163+
PGPASSWORD=Gauss_235 psql -h 10.250.0.30 -p 8000 -U tom -d db1 <<EOF
164+
-- 如果表不存在则创建
165+
DO \$\$
166+
BEGIN
167+
IF NOT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'products' AND table_schema = 'public') THEN
168+
CREATE TABLE products (
169+
product_id INTEGER PRIMARY KEY,
170+
product_name VARCHAR(200) NOT NULL,
171+
category VARCHAR(50),
172+
price DECIMAL(10, 2) NOT NULL,
173+
stock INTEGER DEFAULT 0,
174+
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
175+
) DISTRIBUTE BY HASH(product_id);
176+
RAISE NOTICE 'Source table created';
177+
ELSE
178+
RAISE NOTICE 'Source table already exists';
179+
END IF;
180+
END \$\$;
181+
EOF
182+
echo -e "${GREEN}✅ Source table ready${NC}"
183+
184+
# 5.3 创建 Sink 表 (普通表,通过 CN 创建)
185+
echo "📋 Creating sink table..."
139186
PGPASSWORD=Gauss_235 psql -h 10.250.0.30 -p 8000 -U tom -d db1 <<EOF
140187
DROP TABLE IF EXISTS products_sink CASCADE;
141188
CREATE TABLE products_sink (
@@ -146,6 +193,20 @@ CREATE TABLE products_sink (
146193
stock INTEGER
147194
);
148195
EOF
196+
echo -e "${GREEN}✅ Sink table created${NC}"
197+
198+
# 5.4 插入种子数据 (确保快照阶段有数据,CDC 能正确进入 stream 阶段)
199+
echo "🌱 Inserting seed data for CDC initialization..."
200+
PGPASSWORD=Gauss_235 psql -h 10.250.0.30 -p 8000 -U tom -d db1 <<EOF
201+
-- 清除旧种子数据
202+
DELETE FROM products WHERE product_id BETWEEN 1 AND 10;
203+
-- 插入种子数据 (使用 ID 1-10,测试数据使用 2000+)
204+
INSERT INTO products (product_id, product_name, category, price, stock) VALUES
205+
(1, 'Seed Product 1', 'SEED', 10.00, 100),
206+
(2, 'Seed Product 2', 'SEED', 20.00, 200),
207+
(3, 'Seed Product 3', 'SEED', 30.00, 300);
208+
EOF
209+
echo -e "${GREEN}✅ Seed data inserted (3 records)${NC}"
149210

150211
# 6. Submit SQL Job
151212
echo "🚀 Submitting SQL job to Flink..."

run_gaussdb_to_gaussdb_test.sh

Lines changed: 72 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -95,39 +95,83 @@ function cleanup_test_data() {
9595
function init_test_env() {
9696
echo -e "${BLUE}🔧 Initializing GaussDB-to-GaussDB test environment...${NC}"
9797

98-
# 确保 Source 表存在 (分布式表)
99-
run_sql_cn "DROP TABLE IF EXISTS $SOURCE_TABLE CASCADE;" true > /dev/null
100-
101-
local source_ddl="CREATE TABLE $SOURCE_TABLE (
102-
product_id INTEGER PRIMARY KEY,
103-
product_name VARCHAR(200) NOT NULL,
104-
category VARCHAR(50),
105-
price DECIMAL(10, 2) NOT NULL,
106-
stock INTEGER DEFAULT 0,
107-
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
108-
) DISTRIBUTE BY HASH(product_id);"
98+
# 注意:不要 DROP 表!Flink CDC Job 正在监听这些表
99+
# 只检查表是否存在,如果不存在才创建
109100

110-
run_sql_cn "$source_ddl" true > /dev/null
101+
# 检查 Source 表是否已存在
102+
local source_exists=$(PGPASSWORD=$DB_PASS psql -h $CN_HOST -p $CN_PORT -U $DB_USER -d $DB_NAME -t -A -c "SELECT 1 FROM information_schema.tables WHERE table_name='$SOURCE_TABLE' AND table_schema='public';" 2>/dev/null)
111103

112-
# 确保 Sink 表存在 (普通表,用于接收同步数据)
113-
run_sql_cn "DROP TABLE IF EXISTS $SINK_TABLE CASCADE;" true > /dev/null
114-
115-
local sink_ddl="CREATE TABLE $SINK_TABLE (
116-
product_id INTEGER PRIMARY KEY,
117-
product_name VARCHAR(200),
118-
category VARCHAR(50),
119-
price DECIMAL(10, 2),
120-
stock INTEGER
121-
);"
104+
if [ -z "$source_exists" ]; then
105+
echo -e "${YELLOW} Creating source table (not exists)...${NC}"
106+
local source_ddl="CREATE TABLE $SOURCE_TABLE (
107+
product_id INTEGER PRIMARY KEY,
108+
product_name VARCHAR(200) NOT NULL,
109+
category VARCHAR(50),
110+
price DECIMAL(10, 2) NOT NULL,
111+
stock INTEGER DEFAULT 0,
112+
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
113+
) DISTRIBUTE BY HASH(product_id);"
114+
run_sql_cn "$source_ddl" true > /dev/null
115+
else
116+
echo -e "${GREEN} Source table already exists${NC}"
117+
fi
122118

123-
run_sql_cn "$sink_ddl" true > /dev/null
119+
# 检查 Sink 表是否已存在
120+
local sink_exists=$(PGPASSWORD=$DB_PASS psql -h $CN_HOST -p $CN_PORT -U $DB_USER -d $DB_NAME -t -A -c "SELECT 1 FROM information_schema.tables WHERE table_name='$SINK_TABLE' AND table_schema='public';" 2>/dev/null)
121+
122+
if [ -z "$sink_exists" ]; then
123+
echo -e "${YELLOW} Creating sink table (not exists)...${NC}"
124+
local sink_ddl="CREATE TABLE $SINK_TABLE (
125+
product_id INTEGER PRIMARY KEY,
126+
product_name VARCHAR(200),
127+
category VARCHAR(50),
128+
price DECIMAL(10, 2),
129+
stock INTEGER
130+
);"
131+
run_sql_cn "$sink_ddl" true > /dev/null
132+
else
133+
echo -e "${GREEN} Sink table already exists${NC}"
134+
fi
124135

125136
cleanup_test_data
126-
echo "⏳ Waiting for environment stabilization (10s)..."
127-
sleep 10
137+
echo "⏳ Waiting for environment stabilization (5s)..."
138+
sleep 5
128139
echo -e "${GREEN}✅ GaussDB-to-GaussDB test environment initialized${NC}"
129140
}
130141

142+
# 等待 CDC stream 阶段就绪 (等待所有 DN 的 slot 激活)
143+
function wait_for_cdc_stream_ready() {
144+
echo -e "${YELLOW}⏳ Waiting for CDC stream phase to be ready...${NC}"
145+
local max_wait=120
146+
local waited=0
147+
local interval=5
148+
149+
while [ $waited -lt $max_wait ]; do
150+
# 检查所有3个DN上是否有活跃的CDC slot
151+
local active_count=0
152+
for i in "${!DN_HOSTS[@]}"; do
153+
local host="${DN_HOSTS[$i]}"
154+
local port="${DN_PORTS[$i]}"
155+
local has_active=$(PGPASSWORD=$DB_PASS psql -h "$host" -p "$port" -U $DB_USER -d $DB_NAME -t -A -c "SELECT COUNT(*) FROM pg_replication_slots WHERE slot_name LIKE 'flink_cdc_g2g%' AND active = true;" 2>/dev/null)
156+
if [[ "$has_active" =~ ^[0-9]+$ ]] && [ "$has_active" -gt 0 ]; then
157+
active_count=$((active_count + 1))
158+
fi
159+
done
160+
161+
if [ $active_count -ge 3 ]; then
162+
echo -e "${GREEN}✅ All 3 DN CDC slots are active${NC}"
163+
return 0
164+
fi
165+
166+
echo -ne " Waiting... ($waited/${max_wait}s, active slots: $active_count/3)\r"
167+
sleep $interval
168+
waited=$((waited + interval))
169+
done
170+
171+
echo -e "\n${YELLOW}⚠️ CDC slots may not be fully ready after ${max_wait}s, continuing anyway...${NC}"
172+
return 0
173+
}
174+
131175
# 完整的分布测试流程
132176
function run_distributed_test() {
133177
echo -e "${MAGENTA}╔════════════════════════════════════════════════════════════╗${NC}"
@@ -136,6 +180,9 @@ function run_distributed_test() {
136180
echo ""
137181

138182
init_test_env
183+
184+
# 等待 CDC stream 阶段就绪
185+
wait_for_cdc_stream_ready
139186

140187
local total_records=$((TEST_ID_END - TEST_ID_START + 1))
141188

test_gaussdb_to_gaussdb_cdc.sh

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,42 @@ echo ""
5454
echo -e "${YELLOW}⏱️ 开始时间: $(date '+%Y-%m-%d %H:%M:%S')${NC}"
5555
echo ""
5656

57+
# ========== 步骤 0: 重置 Flink 集群 ==========
58+
echo -e "${MAGENTA}╔══════════════════════════════════════════════════════════════════╗${NC}"
59+
echo -e "${MAGENTA}║ 步骤 0: 重置 Flink 集群(确保干净环境) ║${NC}"
60+
echo -e "${MAGENTA}╚══════════════════════════════════════════════════════════════════╝${NC}"
61+
echo ""
62+
63+
DOCKER_COMPOSE_DIR="$SCRIPT_DIR/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-gaussdb-cdc/docker"
64+
65+
if [ -d "$DOCKER_COMPOSE_DIR" ]; then
66+
echo -e "${CYAN}🔄 正在停止并删除旧的 Flink 集群...${NC}"
67+
cd "$DOCKER_COMPOSE_DIR"
68+
docker-compose down --remove-orphans 2>/dev/null || true
69+
70+
# 清理旧的日志和检查点
71+
echo -e "${CYAN}🧹 清理旧日志和检查点...${NC}"
72+
rm -rf ./log/* ./flink-checkpoints/* ./flink-savepoints/* 2>/dev/null || true
73+
74+
echo -e "${CYAN}🚀 正在启动全新的 Flink 集群...${NC}"
75+
docker-compose up -d
76+
77+
echo -e "${YELLOW}⏳ 等待 Flink 集群启动 (20秒)...${NC}"
78+
sleep 20
79+
80+
cd "$SCRIPT_DIR"
81+
echo -e "${GREEN}✅ Flink 集群已重置完成${NC}"
82+
else
83+
echo -e "${YELLOW}⚠️ 未找到 docker-compose 目录,跳过集群重置${NC}"
84+
fi
85+
86+
echo ""
87+
echo -e "${BLUE}━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━${NC}"
88+
echo ""
89+
5790
# ========== 步骤 1: 部署 ==========
5891
echo -e "${MAGENTA}╔══════════════════════════════════════════════════════════════════╗${NC}"
59-
echo -e "${MAGENTA}║ 步骤 1/2: 部署 GaussDB -> GaussDB CDC 配置 ║${NC}"
92+
echo -e "${MAGENTA}║ 步骤 1/3: 部署 GaussDB -> GaussDB CDC 配置 ║${NC}"
6093
echo -e "${MAGENTA}╚══════════════════════════════════════════════════════════════════╝${NC}"
6194
echo ""
6295

@@ -90,7 +123,7 @@ sleep 5
90123
# ========== 步骤 2: 运行分布式测试 ==========
91124
echo ""
92125
echo -e "${MAGENTA}╔══════════════════════════════════════════════════════════════════╗${NC}"
93-
echo -e "${MAGENTA}║ 步骤 2/2: 运行 GaussDB -> GaussDB 增量同步测试 ║${NC}"
126+
echo -e "${MAGENTA}║ 步骤 2/3: 运行 GaussDB -> GaussDB 增量同步测试 ║${NC}"
94127
echo -e "${MAGENTA}╚══════════════════════════════════════════════════════════════════╝${NC}"
95128
echo ""
96129

0 commit comments

Comments
 (0)