Skip to content

Commit 8b3867f

Browse files
authored
Better insert statistics (#63)
1 parent c53bd65 commit 8b3867f

File tree

4 files changed

+101
-6
lines changed

4 files changed

+101
-6
lines changed

mysql_ch_replicator/clickhouse_api.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import clickhouse_connect
44

55
from logging import getLogger
6+
from dataclasses import dataclass, field
7+
from collections import defaultdict
68

79
from .config import ClickhouseSettings
810
from .table_structure import TableStructure, TableField
@@ -28,6 +30,51 @@
2830
'''
2931

3032

33+
@dataclass
34+
class SingleStats:
35+
duration: float = 0.0
36+
events: int = 0
37+
records: int = 0
38+
39+
def to_dict(self):
40+
return self.__dict__
41+
42+
43+
@dataclass
44+
class InsertEraseStats:
45+
inserts: SingleStats = field(default_factory=SingleStats)
46+
erases: SingleStats = field(default_factory=SingleStats)
47+
48+
def to_dict(self):
49+
return {
50+
'inserts': self.inserts.to_dict(),
51+
'erases': self.erases.to_dict(),
52+
}
53+
54+
55+
@dataclass
56+
class GeneralStats:
57+
general: InsertEraseStats = field(default_factory=InsertEraseStats)
58+
table_stats: dict[str, InsertEraseStats] = field(default_factory=lambda: defaultdict(InsertEraseStats))
59+
60+
def on_event(self, table_name: str, is_insert: bool, duration: float, records: int):
61+
targets = []
62+
if is_insert:
63+
targets.append(self.general.inserts)
64+
targets.append(self.table_stats[table_name].inserts)
65+
66+
for target in targets:
67+
target.duration += duration
68+
target.events += 1
69+
target.records += records
70+
71+
def to_dict(self):
72+
results = {'total': self.general.to_dict()}
73+
for table_name, table_stats in self.table_stats.items():
74+
results[table_name] = table_stats.to_dict()
75+
return results
76+
77+
3178
class ClickhouseApi:
3279
MAX_RETRIES = 5
3380
RETRY_INTERVAL = 30
@@ -44,8 +91,14 @@ def __init__(self, database: str | None, clickhouse_settings: ClickhouseSettings
4491
send_receive_timeout=clickhouse_settings.send_receive_timeout,
4592
)
4693
self.tables_last_record_version = {} # table_name => last used row version
94+
self.stats = GeneralStats()
4795
self.execute_command('SET final = 1;')
4896

97+
def get_stats(self):
98+
stats = self.stats.to_dict()
99+
self.stats = GeneralStats()
100+
return stats
101+
49102
def get_tables(self):
50103
result = self.client.query('SHOW TABLES')
51104
tables = result.result_rows
@@ -160,16 +213,27 @@ def insert(self, table_name, records, table_structure: TableStructure = None):
160213
if '.' not in full_table_name:
161214
full_table_name = f'{self.database}.{table_name}'
162215

216+
duration = 0.0
163217
for attempt in range(ClickhouseApi.MAX_RETRIES):
164218
try:
219+
t1 = time.time()
165220
self.client.insert(table=full_table_name, data=records_to_insert)
221+
t2 = time.time()
222+
duration += (t2 - t1)
166223
break
167224
except clickhouse_connect.driver.exceptions.OperationalError as e:
168225
logger.error(f'error inserting data: {e}', exc_info=e)
169226
if attempt == ClickhouseApi.MAX_RETRIES - 1:
170227
raise e
171228
time.sleep(ClickhouseApi.RETRY_INTERVAL)
172229

230+
self.stats.on_event(
231+
table_name=table_name,
232+
duration=duration,
233+
is_insert=True,
234+
records=len(records_to_insert),
235+
)
236+
173237
self.set_last_used_version(table_name, current_version)
174238

175239
def erase(self, table_name, field_name, field_values):
@@ -181,7 +245,16 @@ def erase(self, table_name, field_name, field_values):
181245
'field_name': field_name,
182246
'field_values': field_values,
183247
})
248+
t1 = time.time()
184249
self.execute_command(query)
250+
t2 = time.time()
251+
duration = t2 - t1
252+
self.stats.on_event(
253+
table_name=table_name,
254+
duration=duration,
255+
is_insert=True,
256+
records=len(field_values),
257+
)
185258

186259
def drop_database(self, db_name):
187260
self.execute_command(f'DROP DATABASE IF EXISTS {db_name}')

mysql_ch_replicator/db_optimizer.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from .config import Settings
77
from .mysql_api import MySQLApi
88
from .clickhouse_api import ClickhouseApi
9-
from .utils import GracefulKiller
9+
from .utils import RegularKiller
1010

1111

1212
logger = getLogger(__name__)
@@ -94,9 +94,9 @@ def optimize_database(self, db_name):
9494

9595
def run(self):
9696
logger.info('running optimizer')
97-
killer = GracefulKiller()
97+
RegularKiller('optimizer')
9898
try:
99-
while not killer.kill_now:
99+
while True:
100100
db_to_optimize = self.select_db_to_optimize()
101101
self.mysql_api.close()
102102
if db_to_optimize is None:
@@ -105,4 +105,3 @@ def run(self):
105105
self.optimize_database(db_name=db_to_optimize)
106106
except Exception as e:
107107
logger.error(f'error {e}', exc_info=True)
108-
logger.info('optimizer stopped')

mysql_ch_replicator/db_replicator.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from .converter import MysqlToClickhouseConverter, strip_sql_name, strip_sql_comments
1414
from .table_structure import TableStructure, TableField
1515
from .binlog_replicator import DataReader, LogEvent, EventType
16-
from .utils import GracefulKiller, touch_all_files
16+
from .utils import GracefulKiller, touch_all_files, format_floats
1717

1818

1919
logger = getLogger(__name__)
@@ -526,7 +526,8 @@ def log_stats_if_required(self):
526526

527527
self.last_dump_stats_time = curr_time
528528
self.last_dump_stats_process_time = curr_process_time
529-
logger.info(f'stats: {json.dumps(self.stats.__dict__)}')
529+
logger.info(f'stats: {json.dumps(format_floats(self.stats.__dict__))}')
530+
logger.info(f'ch_stats: {json.dumps(format_floats(self.clickhouse_api.get_stats()))}')
530531
self.stats = Statistics()
531532

532533
def upload_records_if_required(self, table_name):

mysql_ch_replicator/utils.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import signal
22
import subprocess
33
import os
4+
import sys
45
import time
56

67
from pathlib import Path
@@ -19,6 +20,17 @@ def exit_gracefully(self, signum, frame):
1920
self.kill_now = True
2021

2122

23+
class RegularKiller:
24+
def __init__(self, proc_name):
25+
self.proc_name = proc_name
26+
signal.signal(signal.SIGINT, self.exit_gracefully)
27+
signal.signal(signal.SIGTERM, self.exit_gracefully)
28+
29+
def exit_gracefully(self, signum, frame):
30+
logger.info(f'{self.proc_name} stopped')
31+
sys.exit(0)
32+
33+
2234
class ProcessRunner:
2335
def __init__(self, cmd):
2436
self.cmd = cmd
@@ -68,3 +80,13 @@ def touch_all_files(directory_path):
6880
os.utime(item, times=(current_time, current_time))
6981
except Exception as e:
7082
logger.warning(f"Failed to touch {item}: {e}")
83+
84+
85+
def format_floats(data):
86+
if isinstance(data, dict):
87+
return {k: format_floats(v) for k, v in data.items()}
88+
elif isinstance(data, list):
89+
return [format_floats(v) for v in data]
90+
elif isinstance(data, float):
91+
return round(data, 3)
92+
return data

0 commit comments

Comments
 (0)