Problem
Every asset that touches postgres does its own os.getenv("WAREHOUSE_COOLIFY_URL") + psycopg2.connect(). There are 10+ files doing this independently with no shared connection management.
Proposal
Create a WarehouseResource(ConfigurableResource) that:
- Takes
connection_url via EnvVar("WAREHOUSE_COOLIFY_URL")
- Exposes a
get_connection() context manager
- Gets wired into
Definitions once and injected via type annotations
Then migrate existing assets to use it instead of raw psycopg2.
Files to migrate
defs/airtable_raw_all_bases/definitions.py
defs/airtable_audit_logs/definitions.py
defs/loops_campaign_and_metrics_export/definitions.py
defs/ysws_programs_sync/definitions.py
defs/unified_ysws_db/definitions.py
defs/agh_fulfillment_zenventory/definitions.py
defs/zenventory_inventory_airtable_sync/definitions.py
defs/slack_users_sync/definitions.py
defs/slack/definitions.py
defs/ducklake/definitions.py
Problem
Every asset that touches postgres does its own
os.getenv("WAREHOUSE_COOLIFY_URL")+psycopg2.connect(). There are 10+ files doing this independently with no shared connection management.Proposal
Create a
WarehouseResource(ConfigurableResource)that:connection_urlviaEnvVar("WAREHOUSE_COOLIFY_URL")get_connection()context managerDefinitionsonce and injected via type annotationsThen migrate existing assets to use it instead of raw psycopg2.
Files to migrate
defs/airtable_raw_all_bases/definitions.pydefs/airtable_audit_logs/definitions.pydefs/loops_campaign_and_metrics_export/definitions.pydefs/ysws_programs_sync/definitions.pydefs/unified_ysws_db/definitions.pydefs/agh_fulfillment_zenventory/definitions.pydefs/zenventory_inventory_airtable_sync/definitions.pydefs/slack_users_sync/definitions.pydefs/slack/definitions.pydefs/ducklake/definitions.py