Cloudflare HTTP Ingestor (Python caller + Worker) #32

@diegoripley

Overview

  1. A Python caller that orchestrates up to 12 concurrent jobs, persists results to SQLite, and produces a Parquet artifact.

Establishes the update semantics and the upload flow to Source Cooperative.

Split out from #26. Depends on the repo reorg in #31.

Functional spec

Python caller

  • Accepts a list of URLs, read from a file.
  • Submits up to 12 concurrent jobs to the Worker (parameterized via --concurrency).
  • Persists per-URL results to SQLite with idempotency:
    • Skips URLs already recorded as success with matching freshness indicators.
  • Retries with exponential backoff and jitter; the maximum number of attempts is configurable.
  • On completion, writes/updates a Parquet artifact.
  • Re-runs append new datasets or updated rows into the Parquet dataset.
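
The concurrency and retry behaviour listed above could be sketched roughly as follows. This is a minimal illustration, not the project's implementation: the names `backoff_delay`, `call_with_retries`, and `run_all` are hypothetical, the "full jitter" variant of backoff is one possible choice, and `job` stands in for whatever function actually calls the Worker.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Full-jitter delay: uniform in [0, min(cap, base * 2**attempt)] seconds."""
    return random.uniform(0.0, min(cap, base * (2 ** attempt)))


def call_with_retries(
    job: Callable[[str], dict],
    url: str,
    max_attempts: int = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Run job(url); on an exception, wait with backoff and try again."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    last_error = ""
    for attempt in range(max_attempts):
        try:
            return job(url)
        except Exception as exc:  # assumption: a real caller would narrow this
            last_error = str(exc)
            if attempt < max_attempts - 1:
                sleep(backoff_delay(attempt))
    # All attempts failed: report a 'failed' result instead of raising.
    return {"url": url, "status": "failed", "error": last_error}


def run_all(
    urls: Iterable[str],
    job: Callable[[str], dict],
    concurrency: int = 12,
    **retry_kwargs,
) -> List[dict]:
    """Process URLs with at most `concurrency` jobs in flight at once."""
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        return list(
            pool.map(lambda u: call_with_retries(job, u, **retry_kwargs), urls)
        )
```

Passing `sleep` as a parameter keeps the retry loop testable without real delays; the thread pool size plays the role of the `--concurrency` value.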

Data model

SQLite schema (downloads table)

CREATE TABLE IF NOT EXISTS downloads (
  url              TEXT PRIMARY KEY,
  dataset_id       TEXT NOT NULL,
  status           TEXT NOT NULL,  -- success | failed | skipped
  http_status      INTEGER,
  error            TEXT,
  started_at       TEXT NOT NULL,
  finished_at      TEXT
);
CREATE INDEX IF NOT EXISTS ix_downloads_dataset ON downloads(dataset_id);
CREATE INDEX IF NOT EXISTS ix_downloads_status  ON downloads(status);
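
As an illustration of how the caller might use this table idempotently, the sketch below upserts one row per URL and checks whether a URL has already succeeded. The helper names (`already_succeeded`, `record_result`, `utc_now`) are hypothetical, and the freshness comparison mentioned in the spec is left out because the schema above defines no freshness columns. The upsert syntax assumes SQLite 3.24 or newer.

```python
import sqlite3
from datetime import datetime, timezone
from typing import Optional

# Table definition copied from the schema above (indexes omitted for brevity).
SCHEMA = """
CREATE TABLE IF NOT EXISTS downloads (
  url              TEXT PRIMARY KEY,
  dataset_id       TEXT NOT NULL,
  status           TEXT NOT NULL,
  http_status      INTEGER,
  error            TEXT,
  started_at       TEXT NOT NULL,
  finished_at      TEXT
);
"""


def utc_now() -> str:
    """Current UTC time as an ISO 8601 string (assumed timestamp format)."""
    return datetime.now(timezone.utc).isoformat()


def already_succeeded(conn: sqlite3.Connection, url: str) -> bool:
    """True if the URL is already recorded with status 'success'."""
    row = conn.execute(
        "SELECT status FROM downloads WHERE url = ?", (url,)
    ).fetchone()
    return row is not None and row[0] == "success"


def record_result(
    conn: sqlite3.Connection,
    url: str,
    dataset_id: str,
    status: str,
    http_status: Optional[int],
    error: Optional[str],
    started_at: str,
) -> None:
    """Insert the row for this URL, or overwrite it if one already exists."""
    conn.execute(
        """
        INSERT INTO downloads
          (url, dataset_id, status, http_status, error, started_at, finished_at)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(url) DO UPDATE SET
          dataset_id  = excluded.dataset_id,
          status      = excluded.status,
          http_status = excluded.http_status,
          error       = excluded.error,
          started_at  = excluded.started_at,
          finished_at = excluded.finished_at
        """,
        (url, dataset_id, status, http_status, error, started_at, utc_now()),
    )
    conn.commit()
```

Because `url` is the primary key, re-running the same URL updates its row rather than adding a second one.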

Parquet dataset

  • Columns mirror the SQLite schema.

CLI

uv run d4c-http-ingestor --urls ca-qc_government_and_municipalities_of_quebec-2026A000224_d4c-datapkg-orthoimagery_orthorectified_imagery_from_quebec_2026-03-10.txt \
  --dataset-id ca-qc_government_and_municipalities_of_quebec-2026A000224_d4c-datapkg-orthoimagery_orthorectified_imagery_from_quebec \
  --worker-url https://cf-data-ingestor.labs.dataforcanada.org/ \
  --db ca-qc_government_and_municipalities_of_quebec-2026A000224_d4c-datapkg-orthoimagery_orthorectified_imagery_from_quebec.sqlite \
  --key_prefix dataforcanada/d4c-datapkg-orthoimagery/archive/ \
  --out parquet/ \
  --concurrency 12
  • Flags for retries, timeout, and --force-refresh to ignore cached freshness.
  • --resume (default true): on next run, pick up failed/unknown rows.
  • Calls the Cloudflare Worker with a POST request to https://cf-data-ingestor.labs.dataforcanada.org/ carrying the following JSON payload:
{
  "download_url": "URL parsed from the ",
  "user_agent": "Data for Canada - ${DATASET_ID}",
  "key_prefix": "dataforcanada/d4c-datapkg-orthoimagery/archive/${DATASET_ID}"
}
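
A rough sketch of how the caller might assemble this payload and the POST request, using only the standard library. The function names `build_payload` and `build_request` are hypothetical, and the `Content-Type` header is an assumption, since the issue does not say which headers the Worker expects. The request is only constructed here, not sent.

```python
import json
import urllib.request


def build_payload(download_url: str, dataset_id: str, key_prefix: str) -> dict:
    """Fill in the three payload fields from the CLI values."""
    return {
        "download_url": download_url,
        "user_agent": f"Data for Canada - {dataset_id}",
        # key_prefix is the --key_prefix value followed by the dataset id
        "key_prefix": f"{key_prefix}{dataset_id}",
    }


def build_request(worker_url: str, payload: dict) -> urllib.request.Request:
    """Create a POST request carrying the payload as a JSON body."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        worker_url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},  # assumed header
    )
```

The resulting object could then be passed to `urllib.request.urlopen` with a timeout, inside whatever retry wrapper the caller uses.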

Acceptance criteria

Running

uv run d4c-http-ingestor ca-qc_government_and_municipalities_of_quebec-2026A000224_d4c-datapkg-orthoimagery_orthorectified_imagery_from_quebec_2026-03-10.txt \
  --dataset-id ca-qc_government_and_municipalities_of_quebec-2026A000224_d4c-datapkg-orthoimagery_orthorectified_imagery_from_quebec \
  --worker-url https://cf-data-ingestor.labs.dataforcanada.org/ \
  --db ca-qc_government_and_municipalities_of_quebec-2026A000224_d4c-datapkg-orthoimagery_orthorectified_imagery_from_quebec.sqlite \
  --key_prefix dataforcanada/d4c-datapkg-orthoimagery/archive/ \
  --out parquet/ \
  --concurrency 12
  • Creates/updates the SQLite database given by --db, with one row per URL.
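
One way this criterion could be checked after a run is sketched below: read the URL list and report any URL that has no row in the downloads table. The helpers `read_urls` and `missing_urls` are hypothetical, and the sketch assumes the URL file holds one URL per line, which the issue does not state explicitly.

```python
import sqlite3
from typing import Iterable, List


def read_urls(path: str) -> List[str]:
    """Read non-empty lines from the URL list (assumed one URL per line)."""
    with open(path, encoding="utf-8") as fh:
        return [line.strip() for line in fh if line.strip()]


def missing_urls(conn: sqlite3.Connection, urls: Iterable[str]) -> List[str]:
    """Return the URLs that have no row in the downloads table."""
    recorded = {row[0] for row in conn.execute("SELECT url FROM downloads")}
    return [u for u in urls if u not in recorded]
```

An empty list from `missing_urls` would indicate that every URL in the file has been recorded, whatever its status.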
