Skip to content

Commit 1ccb123

Browse files
authored
Skip filtered databases (#4)
1 parent 6835f5e commit 1ccb123

File tree

5 files changed

+21
-14
lines changed

5 files changed

+21
-14
lines changed

mysql_ch_replicator/binlog_replicator.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
)
2121
from .pymysqlreplication.event import QueryEvent
2222

23-
from .config import MysqlSettings, BinlogReplicatorSettings
23+
from .config import Settings, BinlogReplicatorSettings
2424
from .utils import GracefulKiller
2525

2626

@@ -340,17 +340,18 @@ class BinlogReplicator:
340340
BINLOG_RETENTION_PERIOD = 12 * 60 * 60
341341
READ_LOG_INTERVAL = 1
342342

343-
def __init__(self, mysql_settings: MysqlSettings, replicator_settings: BinlogReplicatorSettings):
344-
self.mysql_settings = mysql_settings
345-
self.replicator_settings = replicator_settings
343+
def __init__(self, settings: Settings):
344+
self.settings = settings
345+
self.mysql_settings = settings.mysql
346+
self.replicator_settings = settings.binlog_replicator
346347
mysql_settings = {
347-
'host': mysql_settings.host,
348-
'port': mysql_settings.port,
349-
'user': mysql_settings.user,
350-
'passwd': mysql_settings.password,
348+
'host': self.mysql_settings.host,
349+
'port': self.mysql_settings.port,
350+
'user': self.mysql_settings.user,
351+
'passwd': self.mysql_settings.password,
351352
}
352353
self.data_writer = DataWriter(self.replicator_settings)
353-
self.state = State(os.path.join(replicator_settings.data_dir, 'state.json'))
354+
self.state = State(os.path.join(self.replicator_settings.data_dir, 'state.json'))
354355
logger.info(f'state start position: {self.state.prev_last_seen_transaction}')
355356

356357
log_file, log_pos = None, None
@@ -401,9 +402,13 @@ def run(self):
401402
if hasattr(event, 'table'):
402403
log_event.table_name = event.table
403404
log_event.db_name = event.schema
405+
404406
if isinstance(log_event.db_name, bytes):
405407
log_event.db_name = log_event.db_name.decode('utf-8')
406408

409+
if not self.settings.is_database_matches(log_event.db_name):
410+
continue
411+
407412
log_event.transaction_id = transaction_id
408413
if isinstance(event, UpdateRowsEvent) or isinstance(event, WriteRowsEvent):
409414
log_event.event_type = EventType.ADD_EVENT.value

mysql_ch_replicator/config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import yaml
2+
import fnmatch
23

34
from dataclasses import dataclass
45

@@ -44,3 +45,6 @@ def load(self, settings_file):
4445
self.databases = data['databases']
4546
assert isinstance(self.databases, str)
4647
self.binlog_replicator = BinlogReplicatorSettings(**data['binlog_replicator'])
48+
49+
def is_database_matches(self, db_name):
50+
return fnmatch.fnmatch(db_name, self.databases)

mysql_ch_replicator/main.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@ def set_logging_config(tags):
2020
def run_binlog_replicator(args, config: Settings):
2121
set_logging_config('binlogrepl')
2222
binlog_replicator = BinlogReplicator(
23-
mysql_settings=config.mysql,
24-
replicator_settings=config.binlog_replicator,
23+
settings=config,
2524
)
2625
binlog_replicator.run()
2726

mysql_ch_replicator/runner.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import os
22
import time
33
import sys
4-
import fnmatch
54

65
from logging import getLogger
76

@@ -59,7 +58,7 @@ def run(self):
5958
database=None, mysql_settings=self.config.mysql,
6059
)
6160
databases = mysql_api.get_databases()
62-
databases = [db for db in databases if fnmatch.fnmatch(db, self.databases)]
61+
databases = [db for db in databases if self.config.is_database_matches(db)]
6362

6463
killer = GracefulKiller()
6564

tests_config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,4 @@ binlog_replicator:
1515
data_dir: '/app/binlog/'
1616
records_per_file: 100000
1717

18-
databases: 'database_name_pattern_*'
18+
databases: '*test*'

0 commit comments

Comments
 (0)