Skip to content

chester-hill-solutions/nesrm_etl_pipeline

Repository files navigation

NESRM Ingest

bulk upload dataquickstart

Store data in data/ but start the files with an underscore so it doesn't get committed

make sure the data matches the contact column headers, make sure you delete any "id" columns, use nesrm_id if it is actually the database id

if you want tags you can either make the column be tags with values or you can make the column tags:key and the values be the column values, which will make the tags:key:value,etc:etc

try to use clear tag, but nothing explicit and always nicknames or shortened names e.g. arf or tanny etc

if you do use a tag like coolGuysEvent0415 then you copy the value down, excel sometimes increments the number at the end, be mindful

I think you'd only need these two in .env but runner.js does import handler from index.js so maybe that'll bug out

AWS_API_GATEWAY_BEARER=
AWS_API_GATEWAY_ENDPOINT=

run

node runner.js --gateway --concurrency 50 --force-comms-consent <--dry-run> <--log-payload> data/_filename.csv

this will spit out a file of errors to failed_uploads/ so you can run the exact same command but instead pointed at that folder

AI Slop Instruction Area im pulling from:

This is the lambda function to handle NES Relationship Manager Ingestion.

runner.js quickstart

node runner.js <input-path> > <output-path> --unwrap-body

runner.js can run payloads locally (--local, default) or via API Gateway (--gateway). Add --slow to pause between requests. By default it forces body._meta.submission_source = "cli-runner" (logged at start and per payload); keep existing values with --keep-source_submission (or -k). Enable nested body fixups with --unwrap-body (or -u). Log-only dry runs with --dry-run (or -d).

Concurrency: use --concurrency <n> (or -p <n> / -p<n>) to run up to n payloads at once; default is 1 (sequential). Applies to both --gateway and --local, and still honors --slow between items.

Per-payload logs are written to runner_logs/ (auto-created; ignored by git) for both real sends and dry runs.

  • CSV file (raw columns): node runner.js path/to/file.csv --gateway
    • Each row becomes the request body; default headers applied.
  • CSV file with payload JSONB column (e.g., public.request export): node runner.js path/to/export.csv
    • payload is parsed (even if stringified); inner headers/body strings are parsed; default headers applied if missing. Top-level CSV columns are ignored when payload exists (payload is authoritative).
  • JSON file (single object): node runner.js path/to/payload.json
    • Treated as one payload; wraps with default headers if none.
  • JSON file (array of objects): node runner.js path/to/payloads.json
    • Each array item is sent in order.

Headers

  • If your JSON includes headers and body, runner.js uses them as-is.
  • If your JSON lacks headers, runner.js wraps the object with default headers from runner.js (includes Authorization using AWS_API_GATEWAY_BEARER).
  • CSV rows are treated as bodies without headers; default headers are applied.
  • payload columns in CSV exports are parsed; if they contain headers as strings, those are parsed too.
  • Optional: --unwrap-body/-u will try to unwrap nested body, body.value, or body.values keys inside a payload body and merge them (useful for malformed exports). Off by default.
  • Optional: --dry-run/-d logs the final event (headers + body) that would be sent, without sending.

Examples

# Local lambda handler, CSV input
node runner.js data/upload.csv --local

# Gateway with provided headers in JSON objects
node runner.js data/payloads_with_headers.json --gateway

# Disable submission_source override
node runner.js data/upload.csv --keep-source_submission

# Unwrap nested body/body.value(s) in malformed payloads
node runner.js data/export.csv --unwrap-body

# Dry run to inspect final payloads without sending
node runner.js data/export.csv --dry-run

CSV cleanup helpers

  • scripts/csvCleanupHelpers/remove_empty_cols.py: drops columns that are empty across all rows; optionally --drop-constant to remove columns where every row has the same value. Default output: data/<input_name>-noEmptyCols.csv; override with -o/--output (file or directory).
  • scripts/csvCleanupHelpers/format_4_ingest.py: normalizes common contact headers (firstname, surname, phone, email, address/street address, municipality, postcode, tags:culture, gender, date_of_birth, comms_consent, member, van_id, voted, division_electoral_district, campus_club), reorders them to the front, lowercases headers and replaces spaces with underscores, and writes data/<input-name>-formatted.csv by default; override with -o/--output.
  • scripts/csvCleanupHelpers/fix_utf_column.py: repairs mojibake in a specified column (e.g., Orl√©ans -> Orléans, Beaches‚ÄîEast York -> Beaches—East York) without silently dropping characters, writing to data/<input>-utf_fixed.csv by default; override with -o/--output.
  • scripts/csvCleanupHelpers/prependOLP23.py: prefixes all non-core columns with olp23_ (core: firstname, surname, phone, email, address, municipality, dob, birthdate, birthyear, birthmonth, postcode; also leaves any tag* column untouched). Default output: data/<input>-olp23.csv; override with -o/--output.
  • scripts/csvCleanupHelpers/filter_rows_by_value.py: keeps only rows where a given column matches a value (case-sensitive) and also writes the complement. Default outputs: data/<input-name>-<column>_is_<value>.csv and data/<input-name>-no_<column>_is_<value>.csv; override location/name with -o/--output (file or directory). If no value is provided, it writes one file per unique value (up to 10) or aborts.
  • scripts/csvCleanupHelpers/map_column_values.py: map values from one column to another based on a provided 1:1 list; skip non-empty targets by default (use --overwrite to force); writes <input>-transform_<input_col>_to_<output_col>.csv.
  • scripts/csvCleanupHelpers/cleanDOB.py: normalizes a DOB column (default date_of_birth) to YYYY-MM-DD, populates birthdate/birthmonth/birthyear, and writes successes to data/<input>-dob_fixed.csv, failures to data/<input>-dob_unparsed.csv, and blanks to data/<input>-dob_blank.csv; use -c/--column to target another column and -o/--output to change the output location/name.
  • scripts/csvCleanupHelpers/split_per_riding_ballot_dob.py: splits an input CSV (default data/_100k-noEmptyCols-formatted-olp23.csv) into data/per-riding/<riding>/per-olp23_ballot1-is-<ballot>/ folders, normalizing DOBs and emitting dob_fixed.csv, dob_unparsed.csv, and dob_blank.csv for each riding/ballot combo. Configurable DOB/ballot columns and output base.

Development

npm install -y

Setup DB

npx supabase init
npx supabase start --debug

OLD

Setup roles

Make sure there's a geo_role_passwords.sql file with every riding and region and corresponding passwords

cp supabase/20251023_create_riding_region_roles.sql supabase/roles/20251023_create_riding_region_with_passwords_roles.sql
node scripts/updateRolePasswords.js
psql "postgresql://postgres:postgres@localhost:54322/postgres" -f ./supabase/roles/init_sys_roles.sql
psql "postgresql://postgres:postgres@localhost:54322/postgres" -f ./supabase/roles/20251023_create_riding_region_roles_with_passwords.sql
npx supabase migration up --debug

You may also need touch ./supabase/.temp/profile and echo "supabase" > ./supabase/.temp/profile

Sync local db to cloud schema if not included

# BEFORE YOU DELETE make some sort of backup for the migration folder and copy contents there then
rm -rf ./supabase/migrations && mkdir ./supabase/migrations
rm -rf ./supabase/roles && mkdir ./supabase/roles
npx supabase login --debug
npx supabase link --project-ref <SUPABASE-PROJECT-REF> --debug
npx supabase db dump -f supabase/roles/<TODAYS-YYYY><MM><DD><HH><MN>00roles.sql --role-only --debug
# note that supabase defaults to using UTC time. So if you put your current time, it might run earlier or later in order then you expect
psql "postgresql://postgres:postgres@localhost:54322/postgres" -f ./supabase/roles/<TODAYS-YYYY><MM><DD><HH><MN>roles.sql
npx supabase db pull --debug
npx supabase migration up --debug

Environment

Navigate to http://localhost:54323/project/default/editor/18509?schema=public&showConnect=true&framework=nextjs&tab=frameworks and copy the DATABASE_URL and KEY in .env.local

Setup the rest of .env.local based on .env.template

Active Development

npx supabase start

Deploy

Upload from > .zip file > Upload > Navigate to deploy.zip > Save

TODO:

SWE / DE

Dev Ops

  • sort local trial flow
  • configure aws cli
  • configure lambda build Upload
  • configure github CD process
  • develop tests
    • ingest.storeEvent tests
  • configure test CI process

About

Lambda Function to handle all ingestions from June 2025 onwards

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors