|
6 | 6 | import json
|
7 | 7 | import uuid
|
8 | 8 | import decimal
|
| 9 | +import tempfile |
| 10 | +import yaml |
9 | 11 |
|
10 | 12 | import pytest
|
11 | 13 | import requests
|
@@ -2278,6 +2280,9 @@ def test_schema_evolution_with_db_mapping():
|
2278 | 2280 | clickhouse_settings=cfg.clickhouse,
|
2279 | 2281 | )
|
2280 | 2282 |
|
| 2283 | + ch.drop_database("mapped_target_db") |
| 2284 | + assert_wait(lambda: "mapped_target_db" not in ch.get_databases()) |
| 2285 | + |
2281 | 2286 | prepare_env(cfg, mysql, ch, db_name=TEST_DB_NAME)
|
2282 | 2287 |
|
2283 | 2288 | # Create a test table with some columns using fully qualified name
|
@@ -2352,3 +2357,100 @@ def test_schema_evolution_with_db_mapping():
|
2352 | 2357 | # Clean up
|
2353 | 2358 | db_replicator_runner.stop()
|
2354 | 2359 | binlog_replicator_runner.stop()
|
| 2360 | + |
| 2361 | + |
| 2362 | +def test_dynamic_column_addition_user_config(): |
| 2363 | + """Test to verify handling of dynamically added columns using user's exact configuration. |
| 2364 | + |
| 2365 | + This test reproduces the issue where columns are added on-the-fly via UPDATE |
| 2366 | + rather than through ALTER TABLE statements, leading to an index error in the converter. |
| 2367 | + """ |
| 2368 | + config_path = 'tests_config_dynamic_column.yaml' |
| 2369 | + |
| 2370 | + cfg = config.Settings() |
| 2371 | + cfg.load(config_path) |
| 2372 | + |
| 2373 | + mysql = mysql_api.MySQLApi( |
| 2374 | + database=None, |
| 2375 | + mysql_settings=cfg.mysql, |
| 2376 | + ) |
| 2377 | + |
| 2378 | + ch = clickhouse_api.ClickhouseApi( |
| 2379 | + database=None, |
| 2380 | + clickhouse_settings=cfg.clickhouse, |
| 2381 | + ) |
| 2382 | + |
| 2383 | + prepare_env(cfg, mysql, ch, db_name='test_replication') |
| 2384 | + |
| 2385 | + # Prepare environment - drop and recreate databases |
| 2386 | + mysql.drop_database("test_replication") |
| 2387 | + mysql.create_database("test_replication") |
| 2388 | + mysql.set_database("test_replication") |
| 2389 | + ch.drop_database("test_replication_ch") |
| 2390 | + assert_wait(lambda: "test_replication_ch" not in ch.get_databases()) |
| 2391 | + |
| 2392 | + # Create the exact table structure from the user's example |
| 2393 | + mysql.execute(''' |
| 2394 | + CREATE TABLE test_replication.replication_data ( |
| 2395 | + code VARCHAR(255) NOT NULL PRIMARY KEY, |
| 2396 | + val_1 VARCHAR(255) NOT NULL |
| 2397 | + ); |
| 2398 | + ''') |
| 2399 | + |
| 2400 | + # Insert initial data |
| 2401 | + mysql.execute( |
| 2402 | + "INSERT INTO test_replication.replication_data(code, val_1) VALUE ('test-1', '1');", |
| 2403 | + commit=True, |
| 2404 | + ) |
| 2405 | + |
| 2406 | + # Start the replication processes |
| 2407 | + binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_path) |
| 2408 | + binlog_replicator_runner.run() |
| 2409 | + db_replicator_runner = DbReplicatorRunner("test_replication", cfg_file=config_path) |
| 2410 | + db_replicator_runner.run() |
| 2411 | + |
| 2412 | + # Wait for initial replication to complete |
| 2413 | + assert_wait(lambda: "test_replication_ch" in ch.get_databases()) |
| 2414 | + |
| 2415 | + # Set the database before checking tables |
| 2416 | + ch.execute_command("USE test_replication_ch") |
| 2417 | + assert_wait(lambda: "replication_data" in ch.get_tables()) |
| 2418 | + assert_wait(lambda: len(ch.select("replication_data")) == 1) |
| 2419 | + |
| 2420 | + # Verify initial data was replicated correctly |
| 2421 | + assert_wait(lambda: ch.select("replication_data", where="code='test-1'")[0]['val_1'] == '1') |
| 2422 | + |
| 2423 | + # Update an existing field - this should work fine |
| 2424 | + mysql.execute("UPDATE test_replication.replication_data SET val_1 = '1200' WHERE code = 'test-1';", commit=True) |
| 2425 | + assert_wait(lambda: ch.select("replication_data", where="code='test-1'")[0]['val_1'] == '1200') |
| 2426 | + |
| 2427 | + mysql.execute("USE test_replication"); |
| 2428 | + |
| 2429 | + # Add val_2 column |
| 2430 | + mysql.execute("ALTER TABLE replication_data ADD COLUMN val_2 VARCHAR(255);", commit=True) |
| 2431 | + |
| 2432 | + # Now try to update with a field that doesn't exist |
| 2433 | + # This would have caused an error before our fix |
| 2434 | + mysql.execute("UPDATE test_replication.replication_data SET val_2 = '100' WHERE code = 'test-1';", commit=True) |
| 2435 | + |
| 2436 | + # Verify replication processes are still running |
| 2437 | + binlog_pid = get_binlog_replicator_pid(cfg) |
| 2438 | + db_pid = get_db_replicator_pid(cfg, "test_replication") |
| 2439 | + |
| 2440 | + assert binlog_pid is not None, "Binlog replicator process died" |
| 2441 | + assert db_pid is not None, "DB replicator process died" |
| 2442 | + |
| 2443 | + # Verify the replication is still working after the dynamic column update |
| 2444 | + mysql.execute("UPDATE test_replication.replication_data SET val_1 = '1500' WHERE code = 'test-1';", commit=True) |
| 2445 | + assert_wait(lambda: ch.select("replication_data", where="code='test-1'")[0]['val_1'] == '1500') |
| 2446 | + |
| 2447 | + print("Test passed - dynamic column was skipped without breaking replication") |
| 2448 | + |
| 2449 | + # Cleanup |
| 2450 | + binlog_pid = get_binlog_replicator_pid(cfg) |
| 2451 | + if binlog_pid: |
| 2452 | + kill_process(binlog_pid) |
| 2453 | + |
| 2454 | + db_pid = get_db_replicator_pid(cfg, "test_replication") |
| 2455 | + if db_pid: |
| 2456 | + kill_process(db_pid) |
0 commit comments