Skip to content

Commit a5001fe

Browse files
committed
Set up API_USER and copy boundaries
1 parent 2df2f72 commit a5001fe

File tree

1 file changed

+216
-7
lines changed

1 file changed

+216
-7
lines changed

setup_staging_environment.py

Lines changed: 216 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,31 @@ def __init__(self):
9898
},
9999
]
100100

101+
# API Environment User configuration (for execution containers)
102+
self.api_environment_user = {
103+
"email": os.getenv("API_ENVIRONMENT_USER"),
104+
"password": os.getenv("API_ENVIRONMENT_USER_PASSWORD"),
105+
"name": "API Environment Automation User",
106+
"role": "USER", # Standard user role is sufficient for execution
107+
}
108+
101109
# Check for required test user passwords
102110
required_vars = [
103111
"TEST_SUPERADMIN_PASSWORD",
104112
"TEST_ADMIN_PASSWORD",
105113
"TEST_USER_PASSWORD",
106114
]
115+
116+
# API_ENVIRONMENT_USER is optional but recommended
117+
if (
118+
self.api_environment_user["email"]
119+
and not self.api_environment_user["password"]
120+
):
121+
logger.warning(
122+
"API_ENVIRONMENT_USER is set but API_ENVIRONMENT_USER_PASSWORD is not. "
123+
"Execution containers may fail to authenticate."
124+
)
125+
107126
missing_vars = [var for var in required_vars if not os.getenv(var)]
108127

109128
if missing_vars:
@@ -146,9 +165,7 @@ def refresh_collation_version(self):
146165
cursor = conn.cursor()
147166

148167
db_name = self.staging_db_config["database"]
149-
cursor.execute(
150-
f'ALTER DATABASE "{db_name}" REFRESH COLLATION VERSION'
151-
)
168+
cursor.execute(f'ALTER DATABASE "{db_name}" REFRESH COLLATION VERSION')
152169
logger.info(f"✓ Refreshed collation version for database {db_name}")
153170

154171
except psycopg2.Error as e:
@@ -229,6 +246,61 @@ def create_test_users(self):
229246
)
230247
for user_data in self.test_users:
231248
logger.info(f" - {user_data['role']}: {user_data['email']}")
249+
250+
# Create API Environment User if configured
251+
if (
252+
self.api_environment_user["email"]
253+
and self.api_environment_user["password"]
254+
):
255+
env_user_id = str(uuid.uuid4())
256+
env_hashed_password = generate_password_hash(
257+
self.api_environment_user["password"]
258+
)
259+
260+
logger.info(
261+
f"Creating API environment user: "
262+
f"{self.api_environment_user['email']}"
263+
)
264+
265+
cursor.execute(
266+
"""
267+
INSERT INTO "user" (id, email, name, country, institution,
268+
password, role, created_at, updated_at)
269+
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
270+
ON CONFLICT (email) DO UPDATE SET
271+
name = EXCLUDED.name,
272+
password = EXCLUDED.password,
273+
role = EXCLUDED.role,
274+
updated_at = EXCLUDED.updated_at
275+
RETURNING id;
276+
""",
277+
(
278+
env_user_id,
279+
self.api_environment_user["email"],
280+
self.api_environment_user["name"],
281+
"API Automation",
282+
"Trends.Earth System",
283+
env_hashed_password,
284+
self.api_environment_user["role"],
285+
datetime.now(UTC),
286+
datetime.now(UTC),
287+
),
288+
)
289+
290+
result = cursor.fetchone()
291+
actual_env_user_id = result[0] if result else env_user_id
292+
conn.commit()
293+
294+
logger.info(
295+
f"✓ API environment user {self.api_environment_user['email']} "
296+
f"created/updated with ID: {actual_env_user_id}"
297+
)
298+
else:
299+
logger.warning(
300+
"⚠️ API_ENVIRONMENT_USER not configured - "
301+
"execution containers may fail to authenticate"
302+
)
303+
232304
logger.info("=" * 60)
233305
return superadmin_id
234306

@@ -642,20 +714,19 @@ def copy_recent_status_logs(self):
642714

643715
# Find common columns (excluding 'id' which we'll auto-generate)
644716
common_columns = [
645-
col for col in staging_columns
646-
if col in prod_columns and col != 'id'
717+
col for col in staging_columns if col in prod_columns and col != "id"
647718
]
648719
logger.info(f"Common columns for import: {common_columns}")
649720

650-
if not common_columns or 'timestamp' not in common_columns:
721+
if not common_columns or "timestamp" not in common_columns:
651722
logger.warning(
652723
"No compatible columns found between production and staging "
653724
"status_log tables. Skipping status log import."
654725
)
655726
return
656727

657728
# Build dynamic query with common columns
658-
columns_str = ', '.join(common_columns)
729+
columns_str = ", ".join(common_columns)
659730

660731
# Note: columns_str is derived from information_schema, not user input
661732
# so this is safe from SQL injection
@@ -938,6 +1009,141 @@ def copy_script_logs(self, script_id_mapping):
9381009
prod_conn.close()
9391010
staging_conn.close()
9401011

1012+
def copy_boundary_tables(self):
    """Copy administrative boundary tables from production to staging.

    Copies the following tables:
    - admin_boundary_0_metadata (country-level boundaries)
    - admin_boundary_1_metadata (state/province-level metadata)
    - admin_boundary_1_unit (individual state/province units)

    These tables are required for the boundaries API endpoints.

    The copy runs in three phases:
    1. Discover which boundary tables exist in production, and their
       column lists.
    2. Clear the staging copies child-first, so deletes cannot violate
       foreign key constraints between the boundary tables.
    3. Copy each table parent-first, streaming rows from production in
       batches so large tables never have to fit in memory at once.

    Errors are logged and the staging transaction rolled back; the
    method never raises, matching the best-effort style of the other
    copy_* helpers.
    """
    logger.info("=" * 60)
    logger.info("COPYING BOUNDARY TABLES FROM PRODUCTION")
    logger.info("=" * 60)

    if not self.prod_db_config:
        logger.warning(
            "Production database not configured, skipping boundary import"
        )
        return

    prod_conn = self.connect_to_database(self.prod_db_config)
    staging_conn = self.connect_to_database(self.staging_db_config)

    if not prod_conn or not staging_conn:
        logger.error("Could not connect to databases for boundary copy")
        return

    prod_cursor = None
    staging_cursor = None

    # Tables to copy, listed parent-first so inserts satisfy any
    # foreign key relationships between them. Every table name used in
    # f-string SQL below comes from this hardcoded whitelist, so the
    # statements are safe from SQL injection.
    boundary_tables = [
        "admin_boundary_0_metadata",
        "admin_boundary_1_metadata",
        "admin_boundary_1_unit",
    ]

    try:
        prod_cursor = prod_conn.cursor()
        staging_cursor = staging_conn.cursor()

        # Phase 1: find which tables exist in production and collect
        # their column lists (used both for SELECT and INSERT below).
        table_columns = {}
        for table_name in boundary_tables:
            prod_cursor.execute(
                """
                SELECT EXISTS (
                    SELECT FROM information_schema.tables
                    WHERE table_name = %s
                )
                """,
                (table_name,),
            )
            if not prod_cursor.fetchone()[0]:
                logger.warning(
                    f"Table {table_name} does not exist in production, skipping"
                )
                continue

            prod_cursor.execute(
                """
                SELECT column_name FROM information_schema.columns
                WHERE table_name = %s
                ORDER BY ordinal_position
                """,
                (table_name,),
            )
            columns = [row[0] for row in prod_cursor.fetchall()]

            if not columns:
                logger.warning(f"No columns found for {table_name}, skipping")
                continue

            table_columns[table_name] = columns

        # Phase 2: clear existing staging data. Delete child tables
        # before their parents (reverse of insert order) so any foreign
        # key constraints between the boundary tables are not violated
        # mid-delete.
        for table_name in reversed(boundary_tables):
            if table_name in table_columns:
                staging_cursor.execute(f"DELETE FROM {table_name}")  # noqa: S608
        staging_conn.commit()

        # Phase 3: copy each table parent-first.
        batch_size = 1000
        for table_name in boundary_tables:
            if table_name not in table_columns:
                continue

            logger.info(f"Copying {table_name}...")
            columns = table_columns[table_name]

            # Count records in production (progress reporting only).
            prod_cursor.execute(
                f"SELECT COUNT(*) FROM {table_name}"  # noqa: S608
            )
            total_count = prod_cursor.fetchone()[0]

            if total_count == 0:
                logger.info(f"No records in {table_name}, skipping")
                continue

            columns_str = ", ".join(f'"{col}"' for col in columns)
            prod_cursor.execute(
                f"SELECT {columns_str} FROM {table_name}"  # noqa: S608
            )

            # Build insert query with placeholders.
            placeholders = ", ".join(["%s"] * len(columns))
            insert_query = (
                f"INSERT INTO {table_name} ({columns_str}) "  # noqa: S608
                f"VALUES ({placeholders})"
            )

            # Stream from production with fetchmany() instead of
            # fetchall() so large boundary tables do not have to be
            # held in memory in full. Commit per batch to keep
            # staging transactions small.
            imported = 0
            while True:
                batch = prod_cursor.fetchmany(batch_size)
                if not batch:
                    break
                staging_cursor.executemany(insert_query, batch)
                staging_conn.commit()
                imported += len(batch)
                logger.info(f"Imported {imported}/{total_count} {table_name}...")

            logger.info(
                f"✅ Successfully copied {total_count} records to {table_name}"
            )

        logger.info("=" * 60)
        logger.info("BOUNDARY TABLE COPY COMPLETE")
        logger.info("=" * 60)

    except psycopg2.Error as e:
        logger.error(f"Error copying boundary tables: {e}")
        if staging_conn:
            staging_conn.rollback()
    finally:
        if prod_cursor:
            prod_cursor.close()
        if staging_cursor:
            staging_cursor.close()
        if prod_conn:
            prod_conn.close()
        if staging_conn:
            staging_conn.close()
1146+
9411147
def verify_setup(self):
9421148
"""Verify the staging environment setup."""
9431149
logger.info("Verifying staging database setup...")
@@ -1013,6 +1219,9 @@ def run(self):
10131219
self.copy_recent_status_logs()
10141220
self.copy_script_logs(script_id_mapping)
10151221

1222+
# Copy boundary tables (needed for boundaries API)
1223+
self.copy_boundary_tables()
1224+
10161225
# Verify setup
10171226
self.verify_setup()
10181227

0 commit comments

Comments
 (0)