Skip to content

Commit 02f2302

Browse files
authored
Added option to ignore delete operation (#151)
1 parent 6cf0b34 commit 02f2302

7 files changed

+190
-18
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ http_port: 9128 # optional
236236
types_mapping: # optional
237237
'char(36)': 'UUID'
238238

239+
ignore_deletes: false # optional, set to true to ignore DELETE operations
239240

240241
```
241242
@@ -259,6 +260,7 @@ types_mapping: # optional
259260
- `indexes` - you may want to add some indexes to accelerate performance, eg. ngram index for full-text search, etc. To apply indexes you need to start replication from scratch.
260261
- `http_host`, `http_port` - http endpoint to control replication, use `/docs` for available commands
261262
- `types_mapping` - custom types mapping, eg. you can map char(36) to UUID instead of String, etc.
263+
- `ignore_deletes` - when set to `true`, DELETE operations in MySQL will be ignored during replication. This creates an append-only model where data is only added, never removed. In this mode, the replicator doesn't create a temporary database and instead replicates directly to the target database.
262264

263265
Few more tables / dbs examples:
264266

mysql_ch_replicator/clickhouse_api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ def drop_database(self, db_name):
264264
self.execute_command(f'DROP DATABASE IF EXISTS `{db_name}`')
265265

266266
def create_database(self, db_name):
267-
self.cursor.execute(f'CREATE DATABASE `{db_name}`')
267+
self.execute_command(f'CREATE DATABASE `{db_name}`')
268268

269269
def select(self, table_name, where=None, final=None):
270270
query = f'SELECT * FROM {table_name}'

mysql_ch_replicator/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ def __init__(self):
120120
self.types_mapping = {}
121121
self.target_databases = {}
122122
self.initial_replication_threads = 0
123+
self.ignore_deletes = False
123124

124125
def load(self, settings_file):
125126
data = open(settings_file, 'r').read()
@@ -145,6 +146,7 @@ def load(self, settings_file):
145146
self.http_port = data.pop('http_port', 0)
146147
self.target_databases = data.pop('target_databases', {})
147148
self.initial_replication_threads = data.pop('initial_replication_threads', 0)
149+
self.ignore_deletes = data.pop('ignore_deletes', False)
148150

149151
indexes = data.pop('indexes', [])
150152
for index in indexes:

mysql_ch_replicator/db_replicator.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -200,10 +200,22 @@ def run(self):
200200
self.run_realtime_replication()
201201
return
202202

203-
logger.info('recreating database')
204-
self.clickhouse_api.database = self.target_database_tmp
205-
if not self.is_parallel_worker:
206-
self.clickhouse_api.recreate_database()
203+
# If ignore_deletes is enabled, we don't create a temporary DB and don't swap DBs
204+
# We replicate directly into the target DB
205+
if self.config.ignore_deletes:
206+
logger.info(f'using existing database (ignore_deletes=True)')
207+
self.clickhouse_api.database = self.target_database
208+
self.target_database_tmp = self.target_database
209+
210+
# Create database if it doesn't exist
211+
if self.target_database not in self.clickhouse_api.get_databases():
212+
logger.info(f'creating database {self.target_database}')
213+
self.clickhouse_api.create_database(db_name=self.target_database)
214+
else:
215+
logger.info('recreating database')
216+
self.clickhouse_api.database = self.target_database_tmp
217+
if not self.is_parallel_worker:
218+
self.clickhouse_api.recreate_database()
207219

208220
self.state.tables = self.mysql_api.get_tables()
209221
self.state.tables = [

mysql_ch_replicator/db_replicator_initial.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -106,19 +106,22 @@ def perform_initial_replication(self):
106106
# Verify table structures after replication but before swapping databases
107107
self.verify_table_structures_after_replication()
108108

109-
logger.info(f'initial replication - swapping database')
110-
if self.replicator.target_database in self.replicator.clickhouse_api.get_databases():
111-
self.replicator.clickhouse_api.execute_command(
112-
f'RENAME DATABASE `{self.replicator.target_database}` TO `{self.replicator.target_database}_old`',
113-
)
114-
self.replicator.clickhouse_api.execute_command(
115-
f'RENAME DATABASE `{self.replicator.target_database_tmp}` TO `{self.replicator.target_database}`',
116-
)
117-
self.replicator.clickhouse_api.drop_database(f'{self.replicator.target_database}_old')
118-
else:
119-
self.replicator.clickhouse_api.execute_command(
120-
f'RENAME DATABASE `{self.replicator.target_database_tmp}` TO `{self.replicator.target_database}`',
121-
)
109+
# If ignore_deletes is enabled, we don't swap databases, as we're directly replicating
110+
# to the target database
111+
if not self.replicator.config.ignore_deletes:
112+
logger.info(f'initial replication - swapping database')
113+
if self.replicator.target_database in self.replicator.clickhouse_api.get_databases():
114+
self.replicator.clickhouse_api.execute_command(
115+
f'RENAME DATABASE `{self.replicator.target_database}` TO `{self.replicator.target_database}_old`',
116+
)
117+
self.replicator.clickhouse_api.execute_command(
118+
f'RENAME DATABASE `{self.replicator.target_database_tmp}` TO `{self.replicator.target_database}`',
119+
)
120+
self.replicator.clickhouse_api.drop_database(f'{self.replicator.target_database}_old')
121+
else:
122+
self.replicator.clickhouse_api.execute_command(
123+
f'RENAME DATABASE `{self.replicator.target_database_tmp}` TO `{self.replicator.target_database}`',
124+
)
122125
self.replicator.clickhouse_api.database = self.replicator.target_database
123126
logger.info(f'initial replication - done')
124127

mysql_ch_replicator/db_replicator_realtime.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,17 @@ def handle_erase_event(self, event: LogEvent):
148148
f'table: {event.table_name}, '
149149
f'records: {event.records}',
150150
)
151+
152+
# If ignore_deletes is enabled, skip processing delete events
153+
if self.replicator.config.ignore_deletes:
154+
if self.replicator.config.debug_log_level:
155+
logger.debug(
156+
f'ignoring erase event (ignore_deletes=True): {event.transaction_id}, '
157+
f'table: {event.table_name}, '
158+
f'records: {len(event.records)}',
159+
)
160+
return
161+
151162
self.replicator.stats.erase_events_count += 1
152163
self.replicator.stats.erase_records_count += len(event.records)
153164

test_mysql_ch_replicator.py

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2454,3 +2454,145 @@ def test_dynamic_column_addition_user_config():
24542454
db_pid = get_db_replicator_pid(cfg, "test_replication")
24552455
if db_pid:
24562456
kill_process(db_pid)
2457+
2458+
2459+
def test_ignore_deletes():
2460+
# Create a temporary config file with ignore_deletes=True
2461+
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as temp_config_file:
2462+
config_file = temp_config_file.name
2463+
2464+
# Read the original config
2465+
with open(CONFIG_FILE, 'r') as original_config:
2466+
config_data = yaml.safe_load(original_config)
2467+
2468+
# Add ignore_deletes=True
2469+
config_data['ignore_deletes'] = True
2470+
2471+
# Write to the temp file
2472+
yaml.dump(config_data, temp_config_file)
2473+
2474+
try:
2475+
cfg = config.Settings()
2476+
cfg.load(config_file)
2477+
2478+
# Verify the ignore_deletes option was set
2479+
assert cfg.ignore_deletes is True
2480+
2481+
mysql = mysql_api.MySQLApi(
2482+
database=None,
2483+
mysql_settings=cfg.mysql,
2484+
)
2485+
2486+
ch = clickhouse_api.ClickhouseApi(
2487+
database=TEST_DB_NAME,
2488+
clickhouse_settings=cfg.clickhouse,
2489+
)
2490+
2491+
prepare_env(cfg, mysql, ch)
2492+
2493+
# Create a table with a composite primary key
2494+
mysql.execute(f'''
2495+
CREATE TABLE `{TEST_TABLE_NAME}` (
2496+
departments int(11) NOT NULL,
2497+
termine int(11) NOT NULL,
2498+
data varchar(255) NOT NULL,
2499+
PRIMARY KEY (departments,termine)
2500+
)
2501+
''')
2502+
2503+
# Insert initial records
2504+
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (10, 20, 'data1');", commit=True)
2505+
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (30, 40, 'data2');", commit=True)
2506+
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (50, 60, 'data3');", commit=True)
2507+
2508+
# Run the replicator with ignore_deletes=True
2509+
run_all_runner = RunAllRunner(cfg_file=config_file)
2510+
run_all_runner.run()
2511+
2512+
# Wait for replication to complete
2513+
assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
2514+
ch.execute_command(f'USE `{TEST_DB_NAME}`')
2515+
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
2516+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)
2517+
2518+
# Delete some records from MySQL
2519+
mysql.execute(f"DELETE FROM `{TEST_TABLE_NAME}` WHERE departments=10;", commit=True)
2520+
mysql.execute(f"DELETE FROM `{TEST_TABLE_NAME}` WHERE departments=30;", commit=True)
2521+
2522+
# Wait a moment to ensure replication processes the events
2523+
time.sleep(5)
2524+
2525+
# Verify records are NOT deleted in ClickHouse (since ignore_deletes=True)
2526+
# The count should still be 3
2527+
assert len(ch.select(TEST_TABLE_NAME)) == 3, "Deletions were processed despite ignore_deletes=True"
2528+
2529+
# Insert a new record and verify it's added
2530+
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (70, 80, 'data4');", commit=True)
2531+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 4)
2532+
2533+
# Verify the new record is correctly added
2534+
result = ch.select(TEST_TABLE_NAME, where="departments=70 AND termine=80")
2535+
assert len(result) == 1
2536+
assert result[0]['data'] == 'data4'
2537+
2538+
# Clean up
2539+
run_all_runner.stop()
2540+
2541+
# Verify no errors occurred
2542+
assert_wait(lambda: 'stopping db_replicator' in read_logs(TEST_DB_NAME))
2543+
assert('Traceback' not in read_logs(TEST_DB_NAME))
2544+
2545+
# Additional tests for persistence after restart
2546+
2547+
# 1. Remove all entries from table in MySQL
2548+
mysql.execute(f"DELETE FROM `{TEST_TABLE_NAME}` WHERE 1=1;", commit=True)
2549+
2550+
# Add a new row in MySQL before starting the replicator
2551+
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (110, 120, 'offline_data');", commit=True)
2552+
2553+
# 2. Wait 5 seconds
2554+
time.sleep(5)
2555+
2556+
# 3. Remove binlog directory (similar to prepare_env, but without removing tables)
2557+
if os.path.exists(cfg.binlog_replicator.data_dir):
2558+
shutil.rmtree(cfg.binlog_replicator.data_dir)
2559+
os.mkdir(cfg.binlog_replicator.data_dir)
2560+
2561+
2562+
# 4. Create and run a new runner
2563+
new_runner = RunAllRunner(cfg_file=config_file)
2564+
new_runner.run()
2565+
2566+
# 5. Ensure it has all the previous data (should still be 4 records from before + 1 new offline record)
2567+
assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
2568+
ch.execute_command(f'USE `{TEST_DB_NAME}`')
2569+
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
2570+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 5)
2571+
2572+
# Verify we still have all the old data
2573+
assert len(ch.select(TEST_TABLE_NAME, where="departments=10 AND termine=20")) == 1
2574+
assert len(ch.select(TEST_TABLE_NAME, where="departments=30 AND termine=40")) == 1
2575+
assert len(ch.select(TEST_TABLE_NAME, where="departments=50 AND termine=60")) == 1
2576+
assert len(ch.select(TEST_TABLE_NAME, where="departments=70 AND termine=80")) == 1
2577+
2578+
# Verify the offline data was replicated
2579+
assert len(ch.select(TEST_TABLE_NAME, where="departments=110 AND termine=120")) == 1
2580+
offline_data = ch.select(TEST_TABLE_NAME, where="departments=110 AND termine=120")[0]
2581+
assert offline_data['data'] == 'offline_data'
2582+
2583+
# 6. Insert new data and verify it gets added to existing data
2584+
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (90, 100, 'data5');", commit=True)
2585+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 6)
2586+
2587+
# Verify the combined old and new data
2588+
result = ch.select(TEST_TABLE_NAME, where="departments=90 AND termine=100")
2589+
assert len(result) == 1
2590+
assert result[0]['data'] == 'data5'
2591+
2592+
# Make sure we have all 6 records (4 original + 1 offline + 1 new one)
2593+
assert len(ch.select(TEST_TABLE_NAME)) == 6
2594+
2595+
new_runner.stop()
2596+
finally:
2597+
# Clean up the temporary config file
2598+
os.unlink(config_file)

0 commit comments

Comments
 (0)