Skip to content

Commit 856256a

Browse files
authored
Verify table structure after initial replication (#142)
1 parent 82aa092 commit 856256a

File tree

2 files changed

+112
-1
lines changed

2 files changed

+112
-1
lines changed

mysql_ch_replicator/db_replicator_initial.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,9 @@ def perform_initial_replication(self):
103103
start_table = None
104104

105105
if not self.replicator.is_parallel_worker:
106+
# Verify table structures after replication but before swapping databases
107+
self.verify_table_structures_after_replication()
108+
106109
logger.info(f'initial replication - swapping database')
107110
if self.replicator.target_database in self.replicator.clickhouse_api.get_databases():
108111
self.replicator.clickhouse_api.execute_command(
@@ -216,6 +219,115 @@ def perform_initial_replication_table(self, table_name):
216219
)
217220
self.save_state_if_required(force=True)
218221

222+
def verify_table_structures_after_replication(self):
223+
"""
224+
Verify that MySQL table structures haven't changed during the initial replication process.
225+
This helps ensure data integrity by confirming the source tables are the same as when
226+
replication started.
227+
228+
Raises an exception if any table structure has changed, preventing the completion
229+
of the initial replication process.
230+
"""
231+
logger.info('Verifying table structures after initial replication')
232+
233+
changed_tables = []
234+
235+
for table_name in self.replicator.state.tables:
236+
if not self.replicator.config.is_table_matches(table_name):
237+
continue
238+
239+
if self.replicator.single_table and self.replicator.single_table != table_name:
240+
continue
241+
242+
# Get the current MySQL table structure
243+
current_mysql_create_statement = self.replicator.mysql_api.get_table_create_statement(table_name)
244+
current_mysql_structure = self.replicator.converter.parse_mysql_table_structure(
245+
current_mysql_create_statement, required_table_name=table_name,
246+
)
247+
248+
# Get the original structure used at the start of replication
249+
original_mysql_structure, _ = self.replicator.state.tables_structure.get(table_name, (None, None))
250+
251+
if not original_mysql_structure:
252+
logger.warning(f'Could not find original structure for table {table_name}')
253+
continue
254+
255+
# Compare the structures in a deterministic way
256+
structures_match = self._compare_table_structures(original_mysql_structure, current_mysql_structure)
257+
258+
if not structures_match:
259+
logger.warning(
260+
f'\n\n\n !!! WARNING - TABLE STRUCTURE CHANGED DURING REPLICATION (table "{table_name}") !!!\n\n'
261+
'The MySQL table structure has changed since the initial replication started.\n'
262+
'This may cause data inconsistency and replication issues.\n'
263+
)
264+
logger.error(f'Original structure: {original_mysql_structure}')
265+
logger.error(f'Current structure: {current_mysql_structure}')
266+
changed_tables.append(table_name)
267+
else:
268+
logger.info(f'Table structure verification passed for {table_name}')
269+
270+
# If any tables have changed, raise an exception to abort the replication process
271+
if changed_tables:
272+
error_message = (
273+
f"Table structure changes detected in: {', '.join(changed_tables)}. "
274+
"Initial replication aborted to prevent data inconsistency. "
275+
"Please restart replication after reviewing the changes."
276+
)
277+
logger.error(error_message)
278+
raise Exception(error_message)
279+
280+
logger.info('Table structure verification completed')
281+
282+
def _compare_table_structures(self, struct1, struct2):
283+
"""
284+
Compare two TableStructure objects in a deterministic way.
285+
Returns True if the structures are equivalent, False otherwise.
286+
"""
287+
# Compare basic attributes
288+
if struct1.table_name != struct2.table_name:
289+
logger.error(f"Table name mismatch: {struct1.table_name} vs {struct2.table_name}")
290+
return False
291+
292+
if struct1.charset != struct2.charset:
293+
logger.error(f"Charset mismatch: {struct1.charset} vs {struct2.charset}")
294+
return False
295+
296+
# Compare primary keys (order matters)
297+
if len(struct1.primary_keys) != len(struct2.primary_keys):
298+
logger.error(f"Primary key count mismatch: {len(struct1.primary_keys)} vs {len(struct2.primary_keys)}")
299+
return False
300+
301+
for i, key in enumerate(struct1.primary_keys):
302+
if key != struct2.primary_keys[i]:
303+
logger.error(f"Primary key mismatch at position {i}: {key} vs {struct2.primary_keys[i]}")
304+
return False
305+
306+
# Compare fields (count and attributes)
307+
if len(struct1.fields) != len(struct2.fields):
308+
logger.error(f"Field count mismatch: {len(struct1.fields)} vs {len(struct2.fields)}")
309+
return False
310+
311+
for i, field1 in enumerate(struct1.fields):
312+
field2 = struct2.fields[i]
313+
314+
if field1.name != field2.name:
315+
logger.error(f"Field name mismatch at position {i}: {field1.name} vs {field2.name}")
316+
return False
317+
318+
if field1.field_type != field2.field_type:
319+
logger.error(f"Field type mismatch for {field1.name}: {field1.field_type} vs {field2.field_type}")
320+
return False
321+
322+
# Compare parameters - normalize whitespace to avoid false positives
323+
params1 = ' '.join(field1.parameters.lower().split())
324+
params2 = ' '.join(field2.parameters.lower().split())
325+
if params1 != params2:
326+
logger.error(f"Field parameters mismatch for {field1.name}: {params1} vs {params2}")
327+
return False
328+
329+
return True
330+
219331
def perform_initial_replication_table_parallel(self, table_name):
220332
"""
221333
Execute initial replication for a table using multiple parallel worker processes.

mysql_ch_replicator/mysql_api.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@ def get_records(self, table_name, order_by, limit, start_value=None, worker_id=N
113113
where = f'WHERE {hash_condition} '
114114

115115
query = f'SELECT * FROM `{table_name}` {where}ORDER BY {order_by_str} LIMIT {limit}'
116-
print("query:", query)
117116

118117
# Execute the actual query
119118
self.cursor.execute(query)

0 commit comments

Comments
 (0)