Skip to content

Commit 7ce1316

Browse files
authored
Use database name from query if specified (#120)
1 parent 3a30148 commit 7ce1316

File tree

2 files changed

+123
-15
lines changed

2 files changed

+123
-15
lines changed

mysql_ch_replicator/binlog_replicator.py

Lines changed: 58 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import os.path
66
import json
77
import random
8+
import re
89

910
from enum import Enum
1011
from logging import getLogger
@@ -379,6 +380,48 @@ def clear_old_binlog_if_required(self):
379380
self.last_binlog_clear_time = curr_time
380381
self.data_writer.remove_old_files(curr_time - BinlogReplicator.BINLOG_RETENTION_PERIOD)
381382

383+
@classmethod
def _try_parse_db_name_from_query(cls, query: str) -> str:
    """
    Extract the database name from a MySQL CREATE TABLE or ALTER TABLE query.

    Only a database qualifier written directly in front of the table name
    (``db.table``) is recognised. Multiline queries and arbitrary whitespace
    around the separating dot are supported.

    Identifiers are matched according to MySQL's naming rules:
      - backtick-quoted, with a literal backtick written as two backticks;
      - double-quoted (ANSI_QUOTES style), with a literal double quote
        written as two double quotes;
      - unquoted: ASCII letters, digits, ``_``, ``$`` and characters in the
        range U+0080..U+FFFF.

    Examples:
      - CREATE TABLE `mydb`.`mytable` ( ... )                     -> mydb
      - ALTER TABLE mydb.mytable ADD COLUMN id int NOT NULL       -> mydb
      - CREATE TABLE IF NOT EXISTS mydb.mytable ( ... )           -> mydb
      - ALTER TABLE "mydb"."mytable" ...                          -> mydb
      - CREATE TABLE IF NOT EXISTS `multidb` . `multitable` (...) -> multidb
      - CREATE TABLE `replication-test_db`.`test_table_2` ( ... ) -> replication-test_db
      - CREATE TABLE `mydb`.`$tbl` ( ... )                        -> mydb
      - CREATE TABLE mytable ( ... )                              -> '' (no qualifier)

    :param query: SQL text of the statement.
    :return: the database name with quote escapes resolved, or an empty
        string if the statement is not CREATE/ALTER TABLE or carries no
        database qualifier.
    """
    if not query:
        return ''

    # The qualifier is mandatory in this pattern: "no match" and "no database
    # in the query" both map to the same empty-string result, so there is no
    # need to match (and therefore to restrict) the table name itself.
    # re.compile() results are cached by the re module, so building the
    # pattern here does not recompile it on every call.
    pattern = re.compile(
        r'''
        ^\s*                                        # leading whitespace / newlines
        (?:create|alter)\s+table\s+                 # statement keyword
        (?:if\s+not\s+exists\s+)?                   # optional IF NOT EXISTS
        (?:
            `(?P<db_backtick>(?:[^`]|``)+)`         # backtick-quoted identifier
          | "(?P<db_dquote>(?:[^"]|"")+)"           # double-quoted identifier
          | (?P<db_plain>[0-9a-z_$\u0080-\uffff]+)  # unquoted identifier
        )
        \s*\.\s*                                    # qualifier separator
        (?=[`"0-9a-z_$\u0080-\uffff])               # a table identifier must follow
        ''',
        re.IGNORECASE | re.VERBOSE,
    )

    match = pattern.match(query)
    if match is None:
        return ''

    # Resolve the doubled-quote escape so callers receive the real name.
    backtick_name = match.group('db_backtick')
    if backtick_name is not None:
        return backtick_name.replace('``', '`')

    dquote_name = match.group('db_dquote')
    if dquote_name is not None:
        return dquote_name.replace('""', '"')

    return match.group('db_plain') or ''
382425

383426
def run(self):
384427
last_transaction_id = None
@@ -425,12 +468,6 @@ def run(self):
425468
if isinstance(log_event.db_name, bytes):
426469
log_event.db_name = log_event.db_name.decode('utf-8')
427470

428-
if not self.settings.is_database_matches(log_event.db_name):
429-
continue
430-
431-
logger.debug(f'event matched {transaction_id}, {log_event.db_name}, {log_event.table_name}')
432-
433-
log_event.transaction_id = transaction_id
434471
if isinstance(event, UpdateRowsEvent) or isinstance(event, WriteRowsEvent):
435472
log_event.event_type = EventType.ADD_EVENT.value
436473

@@ -440,6 +477,21 @@ def run(self):
440477
if isinstance(event, QueryEvent):
441478
log_event.event_type = EventType.QUERY.value
442479

480+
if log_event.event_type == EventType.UNKNOWN.value:
481+
continue
482+
483+
if log_event.event_type == EventType.QUERY.value:
484+
db_name_from_query = self._try_parse_db_name_from_query(event.query)
485+
if db_name_from_query:
486+
log_event.db_name = db_name_from_query
487+
488+
if not self.settings.is_database_matches(log_event.db_name):
489+
continue
490+
491+
logger.debug(f'event matched {transaction_id}, {log_event.db_name}, {log_event.table_name}')
492+
493+
log_event.transaction_id = transaction_id
494+
443495
if isinstance(event, QueryEvent):
444496
log_event.records = event.query
445497
else:

test_mysql_ch_replicator.py

Lines changed: 65 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from mysql_ch_replicator import config
1414
from mysql_ch_replicator import mysql_api
1515
from mysql_ch_replicator import clickhouse_api
16-
from mysql_ch_replicator.binlog_replicator import State as BinlogState, FileReader, EventType
16+
from mysql_ch_replicator.binlog_replicator import State as BinlogState, FileReader, EventType, BinlogReplicator
1717
from mysql_ch_replicator.db_replicator import State as DbReplicatorState, DbReplicator
1818
from mysql_ch_replicator.converter import MysqlToClickhouseConverter
1919

@@ -69,14 +69,16 @@ def prepare_env(
6969
cfg: config.Settings,
7070
mysql: mysql_api.MySQLApi,
7171
ch: clickhouse_api.ClickhouseApi,
72-
db_name: str = TEST_DB_NAME
72+
db_name: str = TEST_DB_NAME,
73+
set_mysql_db: bool = True
7374
):
7475
if os.path.exists(cfg.binlog_replicator.data_dir):
7576
shutil.rmtree(cfg.binlog_replicator.data_dir)
7677
os.mkdir(cfg.binlog_replicator.data_dir)
7778
mysql.drop_database(db_name)
7879
mysql.create_database(db_name)
79-
mysql.set_database(db_name)
80+
if set_mysql_db:
81+
mysql.set_database(db_name)
8082
ch.drop_database(db_name)
8183
assert_wait(lambda: db_name not in ch.get_databases())
8284

@@ -784,7 +786,7 @@ def _get_last_insert_name():
784786
f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) "
785787
f"VALUES ('TEST_VALUE_{i}_{base_value}', {i});", commit=i % 20 == 0,
786788
)
787-
789+
#`replication-test_db`
788790
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('TEST_VALUE_FINAL', 0);", commit=True)
789791

790792
print("running db_replicator")
@@ -823,12 +825,12 @@ def test_different_types_1():
823825
clickhouse_settings=cfg.clickhouse,
824826
)
825827

826-
prepare_env(cfg, mysql, ch)
828+
prepare_env(cfg, mysql, ch, set_mysql_db=False)
827829

828830
mysql.execute("SET sql_mode = 'ALLOW_INVALID_DATES';")
829831

830832
mysql.execute(f'''
831-
CREATE TABLE `{TEST_TABLE_NAME}` (
833+
CREATE TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` (
832834
`id` int unsigned NOT NULL AUTO_INCREMENT,
833835
name varchar(255),
834836
`employee` int unsigned NOT NULL,
@@ -866,7 +868,7 @@ def test_different_types_1():
866868
''')
867869

868870
mysql.execute(
869-
f"INSERT INTO `{TEST_TABLE_NAME}` (name, modified_date) VALUES ('Ivan', '0000-00-00 00:00:00');",
871+
f"INSERT INTO `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` (name, modified_date) VALUES ('Ivan', '0000-00-00 00:00:00');",
870872
commit=True,
871873
)
872874

@@ -883,15 +885,30 @@ def test_different_types_1():
883885
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)
884886

885887
mysql.execute(
886-
f"INSERT INTO `{TEST_TABLE_NAME}` (name, modified_date) VALUES ('Alex', '0000-00-00 00:00:00');",
888+
f"INSERT INTO `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` (name, modified_date) VALUES ('Alex', '0000-00-00 00:00:00');",
887889
commit=True,
888890
)
889891
mysql.execute(
890-
f"INSERT INTO `{TEST_TABLE_NAME}` (name, modified_date) VALUES ('Givi', '2023-01-08 03:11:09');",
892+
f"INSERT INTO `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` (name, modified_date) VALUES ('Givi', '2023-01-08 03:11:09');",
891893
commit=True,
892894
)
893895
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)
894896

897+
mysql.execute(f'''
898+
CREATE TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME_2}` (
899+
`id` int unsigned NOT NULL AUTO_INCREMENT,
900+
name varchar(255),
901+
PRIMARY KEY (id)
902+
);
903+
''')
904+
905+
mysql.execute(
906+
f"INSERT INTO `{TEST_DB_NAME}`.`{TEST_TABLE_NAME_2}` (name) VALUES ('Ivan');",
907+
commit=True,
908+
)
909+
910+
assert_wait(lambda: TEST_TABLE_NAME_2 in ch.get_tables())
911+
895912
db_replicator_runner.stop()
896913
binlog_replicator_runner.stop()
897914

@@ -1535,3 +1552,42 @@ def test_alter_tokens_split():
15351552
print("Match? ", result == expected)
15361553
print("-" * 60)
15371554
assert result == expected
1555+
1556+
1557+
# (query, expected database name) pairs; an empty expected value means the
# query carries no database qualifier or is not a CREATE/ALTER TABLE statement.
_DB_NAME_FROM_QUERY_CASES = [
    ("CREATE TABLE `mydb`.`mytable` (id INT)", "mydb"),
    ("CREATE TABLE mydb.mytable (id INT)", "mydb"),
    ("ALTER TABLE `mydb`.mytable ADD COLUMN name VARCHAR(50)", "mydb"),
    ("CREATE TABLE IF NOT EXISTS mydb.mytable (id INT)", "mydb"),
    ("CREATE TABLE mytable (id INT)", ""),
    (" CREATE TABLE `mydb` . `mytable` \n ( id INT )", "mydb"),
    ('ALTER TABLE "testdb"."tablename" ADD COLUMN flag BOOLEAN', "testdb"),
    ("create table mydb.mytable (id int)", "mydb"),
    ("DROP DATABASE mydb", ""),
    ("CREATE TABLE mydbmytable (id int)", ""),  # missing dot between DB and table
    ("""
CREATE TABLE IF NOT EXISTS
`multidb`
.
`multitable`
(
id INT,
name VARCHAR(100)
)
""", "multidb"),
    ("""
ALTER TABLE
`justtable`
ADD COLUMN age INT;
""", ""),
    ("""
CREATE TABLE `replication-test_db`.`test_table_2` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
name varchar(255),
PRIMARY KEY (id)
)
""", "replication-test_db"),
    ("BEGIN", ""),
]


@pytest.mark.parametrize(("query", "expected"), _DB_NAME_FROM_QUERY_CASES)
def test_parse_db_name_from_query(query, expected):
    """Check the database name that the binlog replicator extracts from a DDL query."""
    parsed_db_name = BinlogReplicator._try_parse_db_name_from_query(query)
    assert parsed_db_name == expected

0 commit comments

Comments
 (0)