Skip to content

Commit 2e253cf

Browse files
committed
Optimize log imports
1 parent cce8cf0 commit 2e253cf

File tree

1 file changed: +66 additions, −50 deletions

setup_staging_environment.py

Lines changed: 66 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -652,7 +652,6 @@ def copy_recent_status_logs(self):
652652

653653
# Build dynamic query with common columns
654654
columns_str = ', '.join(common_columns)
655-
placeholders = ', '.join(['%s'] * len(common_columns))
656655

657656
# Note: columns_str is derived from information_schema, not user input
658657
# so this is safe from SQL injection
@@ -669,36 +668,42 @@ def copy_recent_status_logs(self):
669668
status_logs = prod_cursor.fetchall()
670669
logger.info(f"Found {len(status_logs)} recent status logs to import")
671670

672-
imported_count = 0
673-
for log in status_logs:
674-
try:
675-
# Insert using dynamic columns (columns derived from
676-
# information_schema, safe from SQL injection)
677-
staging_cursor.execute(
678-
f"""
679-
INSERT INTO status_log ({columns_str})
680-
VALUES ({placeholders})
681-
""", # noqa: S608
682-
log,
683-
)
684-
imported_count += 1
685-
686-
if imported_count % 100 == 0:
687-
logger.info(f"Imported {imported_count} status logs so far...")
688-
689-
except psycopg2.Error as e:
690-
logger.warning(f"Failed to import status log: {e}")
691-
# If it's an integrity error, log more details
692-
if (
693-
"unique constraint" in str(e).lower()
694-
or "duplicate key" in str(e).lower()
695-
):
696-
logger.error(
697-
"Duplicate status log key detected - "
698-
"this indicates a race condition"
671+
if not status_logs:
672+
logger.info("No status logs to import")
673+
else:
674+
# Use execute_values for fast bulk insert
675+
from psycopg2.extras import execute_values
676+
677+
batch_size = 20000
678+
imported_count = 0
679+
for i in range(0, len(status_logs), batch_size):
680+
batch = status_logs[i : i + batch_size]
681+
try:
682+
# Note: columns_str is derived from information_schema,
683+
# not user input, so this is safe from SQL injection
684+
execute_values(
685+
staging_cursor,
686+
f"INSERT INTO status_log ({columns_str}) " # noqa: S608
687+
"VALUES %s",
688+
batch,
689+
page_size=batch_size,
699690
)
700-
# Continue with other logs rather than failing completely
701-
continue
691+
imported_count += len(batch)
692+
logger.info(
693+
f"Imported {imported_count}/{len(status_logs)} "
694+
f"status logs..."
695+
)
696+
except psycopg2.Error as e:
697+
logger.warning(f"Failed to import batch at offset {i}: {e}")
698+
# If it's an integrity error, log more details
699+
if (
700+
"unique constraint" in str(e).lower()
701+
or "duplicate key" in str(e).lower()
702+
):
703+
logger.error(
704+
"Duplicate status log key detected - "
705+
"this indicates a race condition"
706+
)
702707

703708
# Set sequence to start from a safe value after the imported logs
704709
staging_cursor.execute("SELECT MAX(id) FROM status_log")
@@ -861,30 +866,41 @@ def copy_script_logs(self, script_id_mapping):
861866
script_logs = prod_cursor.fetchall()
862867
logger.info(f"Found {len(script_logs)} script logs to import")
863868

864-
imported_count = 0
869+
# Build batch of logs with remapped script IDs
870+
logs_to_insert = []
865871
for log in script_logs:
866-
try:
867-
old_script_id = log[3]
868-
new_script_id = script_id_mapping.get(old_script_id)
869-
870-
if new_script_id:
871-
# Insert without ID to let sequence generate new IDs
872-
staging_cursor.execute(
873-
"""
874-
INSERT INTO script_log (text, register_date, script_id)
875-
VALUES (%s, %s, %s)
876-
""",
877-
(log[1], log[2], new_script_id),
878-
)
879-
imported_count += 1
872+
old_script_id = log[3]
873+
new_script_id = script_id_mapping.get(old_script_id)
874+
if new_script_id:
875+
# (text, register_date, script_id)
876+
logs_to_insert.append((log[1], log[2], new_script_id))
877+
878+
if not logs_to_insert:
879+
logger.info("No script logs to import after ID mapping")
880+
return
880881

881-
if imported_count % 100 == 0:
882-
logger.info(
883-
f"Imported {imported_count} script logs so far..."
884-
)
882+
# Use execute_values for fast bulk insert
883+
from psycopg2.extras import execute_values
885884

885+
batch_size = 20000
886+
imported_count = 0
887+
for i in range(0, len(logs_to_insert), batch_size):
888+
batch = logs_to_insert[i : i + batch_size]
889+
try:
890+
execute_values(
891+
staging_cursor,
892+
"INSERT INTO script_log (text, register_date, script_id) "
893+
"VALUES %s",
894+
batch,
895+
page_size=batch_size,
896+
)
897+
imported_count += len(batch)
898+
logger.info(
899+
f"Imported {imported_count}/{len(logs_to_insert)} "
900+
f"script logs..."
901+
)
886902
except psycopg2.Error as e:
887-
logger.warning(f"Failed to import script log {log[0]}: {e}")
903+
logger.warning(f"Failed to import batch at offset {i}: {e}")
888904

889905
# Set sequence to start from a safe value after the imported logs
890906
try:

0 commit comments

Comments (0)