Skip to content

Commit 7cd1388

Browse files
authored
Fix alter table index error (#145)
1 parent 856256a commit 7cd1388

File tree

3 files changed

+125
-2
lines changed

3 files changed

+125
-2
lines changed

mysql_ch_replicator/converter.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -531,11 +531,14 @@ def get_db_and_table_name(self, token, db_name):
531531
db_name = strip_sql_name(db_name)
532532
table_name = strip_sql_name(table_name)
533533
if self.db_replicator:
534-
if db_name == self.db_replicator.database:
535-
db_name = self.db_replicator.target_database
534+
# Check if database and table match config BEFORE applying mapping
536535
matches_config = (
537536
self.db_replicator.config.is_database_matches(db_name)
538537
and self.db_replicator.config.is_table_matches(table_name))
538+
539+
# Apply database mapping AFTER checking matches_config
540+
if db_name == self.db_replicator.database:
541+
db_name = self.db_replicator.target_database
539542
else:
540543
matches_config = True
541544

test_mysql_ch_replicator.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2257,3 +2257,95 @@ def test_performance_initial_only_replication():
22572257

22582258
# Clean up the temporary config file
22592259
os.remove(parallel_config_file)
2260+
2261+
2262+
def test_schema_evolution_with_db_mapping():
2263+
"""Test case to reproduce issue where schema evolution doesn't work with database mapping."""
2264+
# Use the predefined config file with database mapping
2265+
config_file = "tests_config_db_mapping.yaml"
2266+
2267+
cfg = config.Settings()
2268+
cfg.load(config_file)
2269+
2270+
# Note: Not setting a specific database in MySQL API
2271+
mysql = mysql_api.MySQLApi(
2272+
database=None,
2273+
mysql_settings=cfg.mysql,
2274+
)
2275+
2276+
ch = clickhouse_api.ClickhouseApi(
2277+
database="mapped_target_db",
2278+
clickhouse_settings=cfg.clickhouse,
2279+
)
2280+
2281+
prepare_env(cfg, mysql, ch, db_name=TEST_DB_NAME)
2282+
2283+
# Create a test table with some columns using fully qualified name
2284+
mysql.execute(f'''
2285+
CREATE TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` (
2286+
`id` int NOT NULL,
2287+
`name` varchar(255) NOT NULL,
2288+
PRIMARY KEY (`id`));
2289+
''')
2290+
2291+
mysql.execute(
2292+
f"INSERT INTO `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` (id, name) VALUES (1, 'Original')",
2293+
commit=True,
2294+
)
2295+
2296+
# Start the replication
2297+
binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
2298+
binlog_replicator_runner.run()
2299+
db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file)
2300+
db_replicator_runner.run()
2301+
2302+
# Make sure initial replication works with the database mapping
2303+
assert_wait(lambda: "mapped_target_db" in ch.get_databases())
2304+
ch.execute_command(f'USE `mapped_target_db`')
2305+
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
2306+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)
2307+
2308+
# Now follow user's sequence of operations with fully qualified names (excluding RENAME operation)
2309+
# 1. Add new column
2310+
mysql.execute(f"ALTER TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` ADD COLUMN added_new_column varchar(5)", commit=True)
2311+
2312+
# 2. Modify column type (skipping the rename step)
2313+
mysql.execute(f"ALTER TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` MODIFY added_new_column varchar(10)", commit=True)
2314+
2315+
# 3. Insert data using the modified schema
2316+
mysql.execute(
2317+
f"INSERT INTO `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` (id, name, added_new_column) VALUES (2, 'Second', 'ABCDE')",
2318+
commit=True,
2319+
)
2320+
2321+
# 4. Drop the column - this is where the error was reported
2322+
mysql.execute(f"ALTER TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` DROP COLUMN added_new_column", commit=True)
2323+
2324+
# 5. Add more inserts after schema changes to verify ongoing replication
2325+
mysql.execute(
2326+
f"INSERT INTO `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` (id, name) VALUES (3, 'Third record after drop column')",
2327+
commit=True,
2328+
)
2329+
2330+
# Check if all changes were replicated correctly
2331+
time.sleep(5) # Allow time for processing the changes
2332+
result = ch.select(TEST_TABLE_NAME)
2333+
print(f"ClickHouse table contents: {result}")
2334+
2335+
# Verify all records are present
2336+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)
2337+
2338+
# Verify specific records exist
2339+
records = ch.select(TEST_TABLE_NAME)
2340+
print(f"Record type: {type(records[0])}") # Debug the record type
2341+
2342+
# Access by field name 'id' instead of by position
2343+
record_ids = [record['id'] for record in records]
2344+
assert 1 in record_ids, "Original record (id=1) not found"
2345+
assert 3 in record_ids, "New record (id=3) after schema changes not found"
2346+
2347+
# Note: This test confirms our fix for schema evolution with database mapping
2348+
2349+
# Clean up
2350+
db_replicator_runner.stop()
2351+
binlog_replicator_runner.stop()

tests_config_db_mapping.yaml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
mysql:
2+
host: 'localhost'
3+
port: 9306
4+
user: 'root'
5+
password: 'admin'
6+
7+
clickhouse:
8+
host: 'localhost'
9+
port: 9123
10+
user: 'default'
11+
password: 'admin'
12+
13+
binlog_replicator:
14+
data_dir: '/app/binlog/'
15+
records_per_file: 100000
16+
binlog_retention_period: 43200 # 12 hours in seconds
17+
18+
databases: '*test*'
19+
log_level: 'debug'
20+
optimize_interval: 3
21+
check_db_updated_interval: 3
22+
23+
# This mapping is the key part that causes issues with schema evolution
24+
target_databases:
25+
replication-test_db: mapped_target_db
26+
27+
http_host: 'localhost'
28+
http_port: 9128

0 commit comments

Comments
 (0)