
Group project for a data pipeline using GCP, BigQuery, and a Leaflet map front-end for real-time predictions


araneida/pipeline-project

 
 


Data pipeline for traffic accident and weather data, with ML predictions


End-to-end pipeline that:

  • fetches accident data (Brottsplatskartan) and enriches it with weather (Open-Meteo),
  • lands raw newline-delimited JSON (NDJSON) in Google Cloud Storage,
  • loads raw data into BigQuery and transforms it with dbt,
  • trains an ML model on historical accident + weather data and stores a joblib bundle in GCS,
  • fetches forecast data via a Cloud Functions HTTP proxy,
  • serves a Flask prediction API on Cloud Run that writes predictions to BigQuery,
  • is callable from a clickable Leaflet map or other clients.

Predictions are appended to a staging table in BigQuery.

Architecture at a glance

  • Ingest (Python)
    • fetch_raw_accident_data.py → pulls accidents and writes NDJSON to GCS under accident/ingest_date=YYYY-MM-DD/…
    • fetch_weather_data.py → reads the latest accident files from GCS and fetches hourly weather per accident; writes NDJSON to weather/ingest_date=YYYY-MM-DD/….
    • Container images (ingest, dbt, ML) are built and deployed via Cloud Build triggers.
    • Ingestion scripts are containerized and run as Cloud Run Jobs through Cloud Scheduler once a day.
  • Land to warehouse (Python)
    • load_gcs_jsonl_to_bq.py → loads the latest date folder (or a forced date) from GCS into one table per category in BigQuery (raw layer), partitioned by time, on a daily schedule.
  • Transformation (dbt)
    • Staging models build cleaned, de-duplicated tables (team_rs_stg dataset), one each for accident and weather data, ready for modeling.
    • A dbt transformation from staging to mart joins the two tables and adds calculated columns.
    • dbt transformations run on a schedule at 2 AM each night.
  • Forecast proxy (Cloud Functions gen 2)
    • apps/function/main.py exposes a thin HTTP wrapper around Open-Meteo to fetch hourly forecasts by latitude, longitude, starttime, endtime.
  • ML service (Cloud Run)
    • apps/ml/src/train.py + apps/ml/src/model.py train an XGBoost classifier, scale features, and save {model, scaler, feature_columns} as a joblib bundle to gs://$MODEL_BUCKET/models/team_model.joblib.
    • apps/ml/src/app.py (Flask) + apps/ml/src/predict.py fetch forecasts via the function, align features, load the model bundle, predict predicted_injury, and append results to a BigQuery table.
    • apps/ml/src/get_forecast_data.py fetches weather forecast data from apps/function/main.py and transforms it to fit the ML model.
    • apps/ml/src/get_historical_data.py reads historical weather and accident data from BigQuery and transforms it to fit the ML model.
  • Transformations for frontend
    • apps/dbt/team_roadsaftai_rs/models/mart/mart_predicted_injuries.sql is triggered at the end of the ML service run.
    • Transforms dates.
    • Adds calculated columns for display in the Looker Studio frontend.
    • Saves a mart table to BigQuery.
  • Frontend Leaflet & Turf map to trigger predictions
    • apps/frontend/pick_location.html is a Leaflet map in an HTML page, using Turf to make sure predictions are triggered only for the Stockholm Region (a GeoJSON border sets the limit).
    • Calls the Flask endpoint (apps/ml/src/app.py) with the chosen marker's coordinates and a 24-hour timespan.
    • Displays the results of the fetch: number of rows written and coordinates. Optional: add a Looker Studio table connected to team_rs_mart.mart_latest_prediction to display predictions.

Reference docs:
Brottsplatskartan API · Open-Meteo archive API · BigQuery load from GCS · Cloud Run Jobs · Cloud Scheduler jobs · dbt BigQuery setup


Storage layout (GCS)

  • bucket: accident-data-raw (uniform access, same region as the BigQuery dataset)
  • prefixes:
    • accident/ingest_date=YYYY-MM-DD/accident_<category>.jsonl
    • weather/ingest_date=YYYY-MM-DD/weather_<category>.jsonl

Categories: hit_and_run, single, unspecified, wildlife, wounded.


Raw tables (BigQuery)

  • dataset: team_rs_raw
  • tables created/append-loaded per file:
    • accidents: raw_accident_<category>, partitioned by pubdate_iso8601
    • weather: raw_weather_<category>, partitioned by weather_hour

Note: loading is done with BigQuery’s newline-delimited JSON (NDJSON) loader.


1) Ingest – accidents

Script: apps/ingest/src/fetch_raw_accident_data.py

What it does

  • pages through Brottsplatskartan's /api/events with polite backoff and jitter
  • incremental mode: looks up a watermark in BigQuery (max(pubdate_iso8601)), backs off by an overlap buffer (e.g. 24h), and fetches only newer rows (see the sketch below)
  • min-age gate: skips events newer than --min-age-days (default 5) to keep accident and weather data in sync (Open-Meteo's archive may lag a few days)
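
The watermark lookup is roughly the following (a minimal sketch, assuming pubdate_iso8601 is the TIMESTAMP partitioning column; the real logic lives in fetch_raw_accident_data.py):

# Sketch of the incremental watermark lookup (assumed query shape).
from datetime import timedelta
from google.cloud import bigquery

def get_watermark(project, dataset, table, overlap_hours=24):
    client = bigquery.Client(project=project)
    sql = f"SELECT MAX(pubdate_iso8601) AS watermark FROM `{project}.{dataset}.{table}`"
    row = next(iter(client.query(sql).result()), None)
    if row is None or row.watermark is None:
        return None  # first run: no watermark yet, fetch everything
    # back off by the overlap buffer so late-arriving rows are not missed
    return row.watermark - timedelta(hours=overlap_hours)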

Example (to GCS)

python fetch_raw_accident_data.py \
  --area "stockholms län" \
  --event-type "trafikolycka, vilt" \
  --limit 500 \
  --output accident_wildlife.jsonl \
  --gcs-bucket accident-data-raw \
  --gcs-prefix "accident/ingest_date=$(date -u +%F)" \
  --incremental \
  --overlap-hours 24 \
  --bq-project team-roadsaftai-rs \
  --bq-dataset team_rs_raw \
  --bq-table raw_accident_wildlife \
  --min-age-days 5 \
  --app 'tool-name'

Note: The --app parameter is optional but encouraged by the API provider. Use your own app name here.

2) Ingest – weather

Script: apps/ingest/src/fetch_weather_data.py

What it does

  • discovers the latest accident date folder (or accepts an override URI)
  • for each accident row:
    • computes weather_hour (UTC, truncated to the hour)
    • calls the Open-Meteo historical /v1/archive endpoint for that lat/lon/day and picks the closest hour (sketched below)
    • writes one weather record per accident:
      weather_id, accident_id, weather_hour, and corresponding weather variables (temp, precip, wind, etc.)
  • outputs NDJSON to weather/ingest_date=YYYY-MM-DD/weather_<category>.jsonl

Note on lag: we keep accident and weather data in sync by delaying accident ingestion with --min-age-days (default 5).
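
A minimal sketch of the hour alignment (assumed shapes; the real implementation is in fetch_weather_data.py and assumes accident timestamps carry a UTC offset):

from datetime import datetime, timezone

def to_weather_hour(pubdate_iso8601: str) -> datetime:
    # UTC timestamp truncated to the hour
    ts = datetime.fromisoformat(pubdate_iso8601).astimezone(timezone.utc)
    return ts.replace(minute=0, second=0, microsecond=0)

def closest_hour(target: datetime, hourly_times: list[datetime]) -> int:
    # index of the Open-Meteo hourly slot closest to the accident hour
    return min(range(len(hourly_times)), key=lambda i: abs(hourly_times[i] - target))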


3) Load GCS → BigQuery (raw)

Script: apps/ingest/src/load_gcs_jsonl_to_bq.py

What it does

  • finds the latest ingest_date=YYYY-MM-DD/ under accident/ and weather/ (or uses a date forced with --date)
  • skips empty files (0 bytes)
  • submits load jobs (NDJSON) into the per-file tables in team_rs_raw, appending data
  • ensures partitioning field matches the table (accident: pubdate_iso8601, weather: weather_hour)

Example

python load_gcs_jsonl_to_bq.py \
  --project team-roadsaftai-rs \
  --dataset team_rs_raw \
  --bucket accident-data-raw \
  --accidents-prefix accident/ \
  --weather-prefix weather/ \
  --location europe-north2
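
Each of these is a standard BigQuery load job; a minimal sketch of one such job (configuration shapes are assumed, including schema autodetection; the real setup is in load_gcs_jsonl_to_bq.py):

from google.cloud import bigquery

client = bigquery.Client(project="team-roadsaftai-rs")
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    autodetect=True,  # assumption: the real script may define an explicit schema
    time_partitioning=bigquery.TimePartitioning(field="pubdate_iso8601"),
)
uri = "gs://accident-data-raw/accident/ingest_date=2025-03-14/accident_wildlife.jsonl"
client.load_table_from_uri(
    uri, "team_rs_raw.raw_accident_wildlife", job_config=job_config
).result()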

4) dbt – staging and mart models

Project: apps/dbt/team_roadsaftai_rs/

Datasets

  • raw: team_rs_raw (landing tables)
  • staging: team_rs_stg (dbt target dev_stg)
  • mart: team_rs_mart (for visualization)

Models

  • models/staging/stg_accidents – unions raw accident tables, prioritizes rows with title_type 'personskada', normalizes timestamps/columns, and de-duplicates by a combination of description and pubdate_iso8601.
  • models/staging/stg_weather – unions raw weather tables and de-duplicates by accident_id:
    1. pick the row with highest completeness (most non-null fields),
    2. if tied, pick the latest weather_hour.
  • models/mart/mart_accident_weather – left joins stg tables on stg_accident.id = stg_weather.accident_id, adds calculated columns for is_snow_on_ground, is_winter, is_rush_hour and daypart. Converts nulls to 0 where reasonable.

Run locally (for debugging)

# inside apps/dbt/team_roadsaftai_rs
export DBT_PROFILES_DIR=../../../.github/dbt_profiles

# build staging models into team_rs_stg
dbt build --target dev_stg

5) Fetch forecast data (Cloud Functions gen 2)

Goal

Expose a thin HTTP proxy around the Open-Meteo forecast API so downstream services (e.g., the ML API) can fetch hourly weather by latitude, longitude, and a time window.

Source & entry point

  • code: apps/function/main.py
  • entry point (function name): apps (decorated with @functions_framework.http)
  • runtime: Python 3.11
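
A minimal sketch of the handler shape (assumed; the real validation, error messages, and upstream parameter mapping live in apps/function/main.py):

import functions_framework
import requests

OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"
HOURLY_VARS = "temperature_2m,precipitation,snowfall,wind_speed_10m"  # abbreviated; full list below

@functions_framework.http
def apps(request):
    params = request.args.to_dict()
    missing = [p for p in ("latitude", "longitude", "starttime", "endtime") if not params.get(p)]
    if missing:
        return {"error": f"missing query params: {missing}"}, 400
    # upstream parameter names here are an assumption, not the project's exact mapping
    upstream = requests.get(OPEN_METEO_URL, params={
        "latitude": params["latitude"],
        "longitude": params["longitude"],
        "start_hour": params["starttime"],
        "end_hour": params["endtime"],
        "hourly": HOURLY_VARS,
    }, timeout=30)
    return {"json_body": request.get_json(silent=True),
            "query_params": params,
            "response": upstream.json()}, 200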

Deploy

gcloud functions deploy weather-forecast \
  --gen2 \
  --runtime=python311 \
  --region=YOUR-REGION \
  --entry-point=apps \
  --source=apps/function \
  --trigger-http \
  --allow-unauthenticated

Replace YOUR-REGION with your standard region (e.g., europe-north2). Make sure the Cloud Functions API is enabled in the project.

API contract (HTTP)

Endpoint

GET / (deployed as weather-forecast):
https://<REGION>-<PROJECT_ID>.cloudfunctions.net/weather-forecast

Query parameters (all required)

  • latitude — stringified float, e.g. "59.33"
  • longitude — stringified float, e.g. "18.06"
  • starttime — ISO 8601 "YYYY-MM-DDThh:mm"
  • endtime — ISO 8601 "YYYY-MM-DDThh:mm"

Responses

  • 200 OK — JSON object:

    { "json_body": , "query_params": { ...original query string... }, "response": { ...Open-Meteo hourly payload... } }

  • 400 Bad Request — JSON with an error message when validation fails (e.g., missing or malformed starttime/endtime).

Selection of parameters

The function selects the relevant weather parameters to fetch from Open-Meteo. Parameters fetched:

  • temperature_2m
  • dew_point_2m
  • soil_temperature_0_to_7cm
  • precipitation
  • rain
  • snowfall
  • snow_depth
  • wind_speed_10m
  • wind_gusts_10m
  • cloud_cover
  • relative_humidity_2m
  • weather_code
  • visibility

Notes

  • The function validates the time format and returns 400 if invalid.
  • Hourly variables requested from Open-Meteo are defined inside main.py. Keep those aligned with the features used by the ML service.
  • The function returns a pass-through of the upstream json under the response key; the ML service reads from that.
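
A quick way to exercise the deployed function from Python (URL placeholders as above; the "response" key is the pass-through Open-Meteo payload):

import requests

url = "https://REGION-PROJECT_ID.cloudfunctions.net/weather-forecast"
resp = requests.get(url, params={
    "latitude": "59.33",
    "longitude": "18.06",
    "starttime": "2025-03-14T15:00",
    "endtime": "2025-03-14T18:00",
}, timeout=30)
resp.raise_for_status()
hourly = resp.json()["response"]["hourly"]
print(hourly["temperature_2m"][:3])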

6) Local training of the model

Goal

Produce and upload a model bundle {model, scaler, feature_columns} to gs://$MODEL_BUCKET/models/team_model.joblib, based on historical accidents + weather in BigQuery.

Prerequisites

  • the staging dataset exists (e.g., team_rs_stg) and contains joined historical data.
  • the service account you use for training can read from BigQuery and write to the model bucket.
  • env vars available at runtime:
    • GOOGLE_CLOUD_PROJECT – your project id
    • MODEL_BUCKET – target bucket for model artifacts

Setup

  • Training pipeline: apps/ml/src/train.py + apps/ml/src/model.py (feature prep, SMOTE, MinMaxScaler, XGBoost; saves model artifacts).
  • Model I/O helpers: apps/ml/src/utils/model_io.py (save/upload to GCS).
  • Preparation of training data: apps/ml/src/get_historical_data.py (fetches training data → transforms data)

What it does

  • Training ((train.py + get_historical_data.py) → model.py):

    • Reads accident and weather tables from BigQuery staging (team_rs_stg),
    • Joins the tables,
    • Engineers time features (day_of_year, year, hour)
    • Builds binary label injury, balances classes (SMOTE)
    • Scales features
    • Uses train/test split
    • Trains an XGBoost classifier
  • Saves to bucket: the model is saved as a joblib bundle {model, scaler, feature_columns} to gs://$MODEL_BUCKET/models/team_model.joblib (see the sketch below).
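
A minimal sketch of the bundle save/upload (assumed shape; the real helpers live in apps/ml/src/utils/model_io.py):

import os
import tempfile
import joblib
from google.cloud import storage

def save_bundle(model, scaler, feature_columns, bucket_name=None):
    bucket_name = bucket_name or os.environ["MODEL_BUCKET"]
    bundle = {"model": model, "scaler": scaler, "feature_columns": feature_columns}
    with tempfile.NamedTemporaryFile(suffix=".joblib") as tmp:
        joblib.dump(bundle, tmp.name)
        blob = storage.Client().bucket(bucket_name).blob("models/team_model.joblib")
        blob.upload_from_filename(tmp.name)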


Run locally

# run from repo root with valid ADC (or GOOGLE_APPLICATION_CREDENTIALS set)
python apps/ml/src/train.py

Verify the artifact

gsutil ls gs://$MODEL_BUCKET/models/team_model.joblib

Permissions (IAM)

  • Training job / service account:
    • roles/bigquery.dataViewer (read training tables)
    • roles/storage.objectAdmin on $MODEL_BUCKET (write model bundle)
    • roles/logging.logWriter
  • Cloud Scheduler sender (if training runs as a scheduled Cloud Run Job):
    • roles/run.invoker on the ml-train job

7) ML predictions API

Code

  • Flask entrypoint: apps/ml/src/app.py (HTTP GET / → triggers prediction for the given query params).
  • Prediction logic: apps/ml/src/predict.py (fetches transformed forecast data → loads model bundle from GCS → scales → predicts → writes to BigQuery).
  • Model I/O helpers: apps/ml/src/utils/model_io.py (load from GCS).
  • Preparation of prediction data: apps/ml/src/get_forecast_data.py (fetches forecast data → transforms data)
  • Dockerfile: apps/ml/Dockerfile (container for Cloud Run)

What it does

  • ((app.py + get_forecast_data.py) → predict.py): pulls the hourly forecast via the Cloud Function (see step 5), reshapes it to the trained feature set, loads the model bundle from GCS, generates predicted_injury, writes results to BigQuery (in the staging dataset), and then triggers the dbt transformation for the BigQuery mart table team_rs_mart.mart_predicted_injuries (see step 8).
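
A minimal sketch of that flow (assumed shapes; the real logic is in apps/ml/src/predict.py and apps/ml/src/get_forecast_data.py):

import os
import joblib
import pandas as pd
from google.cloud import bigquery, storage

def predict_and_store(forecast_df: pd.DataFrame) -> pd.DataFrame:
    # load the bundle produced by training (step 6)
    blob = storage.Client().bucket(os.environ["MODEL_BUCKET"]).blob("models/team_model.joblib")
    blob.download_to_filename("/tmp/team_model.joblib")
    bundle = joblib.load("/tmp/team_model.joblib")

    # align forecast columns to the trained feature set, scale, predict
    features = forecast_df.reindex(columns=bundle["feature_columns"], fill_value=0)
    forecast_df["predicted_injury"] = bundle["model"].predict(bundle["scaler"].transform(features))

    # append predictions to the staging table read by the mart model (step 8)
    bigquery.Client().load_table_from_dataframe(forecast_df, "team_rs_stg.predicted_injuries").result()
    return forecast_df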

Environment

  • GOOGLE_CLOUD_PROJECT – your GCP project id (used by clients)
  • MODEL_BUCKET – GCS bucket where the model bundle lives (e.g., team-roadsaftai-re-models)
  • PORT – defaults to 8080 for Cloud Run

The service relies on Application Default Credentials on Cloud Run. If running locally, set GOOGLE_APPLICATION_CREDENTIALS to a service-account key with access to BigQuery and the model bucket.

Build and deploy (Cloud Run)

# from repo root – build the image
gcloud builds submit -t gcr.io/$PROJECT_ID/ml-api -f apps/ml/Dockerfile .

# deploy to Cloud Run (adjust region)
gcloud run deploy ml-api \
  --image gcr.io/$PROJECT_ID/ml-api \
  --region europe-north1 \
  --platform managed \
  --allow-unauthenticated \
  --set-env-vars MODEL_BUCKET=team-roadsaftai-re-models,GOOGLE_CLOUD_PROJECT=$PROJECT_ID

Local run (optional)

docker build -f apps/ml/Dockerfile -t ml-api:local .
docker run --rm -p 8080:8080 \
  -e GOOGLE_CLOUD_PROJECT=your-project \
  -e MODEL_BUCKET=your-models-bucket \
  ml-api:local

API (HTTP)

GET / (Cloud Run service URL)

Query params (all optional – sensible defaults exist in get_forecast_data):

  • latitude — stringified float (e.g., "59.33")
  • longitude — stringified float (e.g., "18.06")
  • starttime — "YYYY-MM-DDThh:mm"
  • endtime — "YYYY-MM-DDThh:mm"

Behavior

  • fetches hourly forecast via the function (step 5)
  • aligns columns to the trained feature set
  • loads model bundle from gs://$MODEL_BUCKET/models/team_model.joblib
  • writes predictions to BigQuery (staging dataset)
  • returns a short JSON payload (stringified) from the Flask handler

Quick test

curl -G "https://YOUR-CLOUD-RUN-URL/" \
  --data-urlencode "latitude=59.33" \
  --data-urlencode "longitude=18.06" \
  --data-urlencode "starttime=2025-03-14T15:00" \
  --data-urlencode "endtime=2025-03-14T18:00"

Notes

  • Make sure a model bundle exists in gs://$MODEL_BUCKET/models/team_model.joblib (run training once before calling the API).
  • The prediction write uses pandas → BigQuery; ensure the destination dataset exists and the service account has write permissions.
  • Keep feature names consistent between historical training data and the forecast payload (add column renames where necessary).

8) Transformation of prediction data for frontend

Goal

Convert and enrich the ML prediction output from the staging dataset (team_rs_stg.predicted_injuries) into a clean, analytics-ready mart table with unified timestamps and derived date fields.

Project & model

  • dbt project: apps/dbt/team_roadsaftai_rs/
  • model: models/mart/mart_predicted_injuries.sql
  • runner script: apps/dbt/team_roadsaftai_rs/run_dbt.sh (Cloud Run Job entrypoint)
  • tests: models/mart/mart_predicted_injuries.yml (accepted ranges + not null)
  • output dataset: team_rs_mart

What it does

  • Reads from the staging source team_rs_stg.predicted_injuries via source() and casts all weather features to their analytics types (e.g., temperature_2m, precipitation, wind_speed_10m, etc.).
  • Cleans and standardizes timestamps (ISO-8601, epoch, date-only formats).
  • Outputs a clustered, partitioned table, team_rs_mart.mart_predicted_injuries: partitioned by event_date, clustered by weather_code and predicted_injury. New prediction data is appended. A view in BigQuery can be used to always hold the latest prediction data for visualization and monitoring.

How it runs

  • The model runs as a Cloud Run Job containerized in the dbt image: gcr.io/$PROJECT_ID/team-rs-dbt
  • The Cloud Run Job command triggers run_dbt.sh: bash /app/run_dbt.sh
  • It is triggered automatically each time new prediction rows are written to team_rs_stg.predicted_injuries (from the ML API, app.py) or can be scheduled via Cloud Scheduler.
  • Environment (examples): BQ_PROJECT, BQ_LOCATION, DBT_PROJECT_DIR=/app, DBT_PROFILES_DIR=/app/profiles.

Downstream

A simple BigQuery view selects the latest batch by max prediction_timestamp_clean (or prediction_ts) for dashboards in Looker Studio.
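
A hedged sketch of such a view, created with the BigQuery Python client (column and table names follow the ones used above; adjust to the actual mart schema):

from google.cloud import bigquery

client = bigquery.Client(project="team-roadsaftai-rs")
view = bigquery.Table("team-roadsaftai-rs.team_rs_mart.mart_latest_prediction")
view.view_query = """
    SELECT *
    FROM `team-roadsaftai-rs.team_rs_mart.mart_predicted_injuries`
    WHERE prediction_timestamp_clean = (
        SELECT MAX(prediction_timestamp_clean)
        FROM `team-roadsaftai-rs.team_rs_mart.mart_predicted_injuries`
    )
"""
client.create_table(view, exists_ok=True)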

9) Frontend Leaflet map to trigger real-time predictions

Goal

Provide an interactive, browser-based interface where users can click a location on a map of Stockholm Region to trigger real-time ML predictions for weather-related injury risk.

Source & structure

  • apps/frontend/pick_location.html – standalone HTML + JavaScript file (using Leaflet and Turf.js)
  • apps/frontend/stockholms_lan.geojson – polygon defining the Stockholm Region boundary, used to restrict valid clicks

What it does

  • Displays an interactive Leaflet map centered on the Stockholm region.
  • Loads the region boundary from stockholms_lan.geojson and visually outlines it on the map.
  • When a user clicks inside the region, a marker is placed at that location.
  • The click activates a “Predict risk for injuries here” button.
  • When pressed, the app:
    1. Captures the clicked coordinates (latitude, longitude),
    2. Computes the local start hour and the end hour (24-hour window ahead, in Europe/Stockholm time),
    3. Calls the Flask ML prediction API (Cloud Run, step 6) with those parameters,
    4. Displays the API response directly on the page (JSON, HTML, or text “receipt”).
  • The frontend can be hosted from a public GCS bucket or integrated into another static hosting service.

Example

Open the deployed HTML file (e.g. from your GCS bucket): https://storage.googleapis.com/<YOUR_BUCKET_NAME>/pick_location.html

Click anywhere inside the red boundary of Stockholm County → press the Predict button → view the model’s response inline.
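
The same endpoint can be reached from any HTTP client; a minimal Python sketch of what the Predict button does (replace YOUR-CLOUD-RUN-URL; the 24-hour window mirrors the frontend's behavior):

from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
import requests

now = datetime.now(ZoneInfo("Europe/Stockholm")).replace(minute=0, second=0, microsecond=0)
params = {
    "latitude": "59.33",
    "longitude": "18.06",
    "starttime": now.strftime("%Y-%m-%dT%H:%M"),
    "endtime": (now + timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M"),
}
print(requests.get("https://YOUR-CLOUD-RUN-URL/", params=params, timeout=120).text)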

Tech stack

  • Leaflet.js – map rendering and interaction
  • Turf.js – geographic point-in-polygon validation
  • Vanilla JavaScript – API call construction and dynamic DOM updates
  • OpenStreetMap tiles – base map layer

Deployment

# upload files to a public GCS bucket
gsutil cp pick_location.html stockholms_lan.geojson gs://your-frontend-bucket

# make them public (if organization policy allows)
gsutil iam ch allUsers:objectViewer gs://your-frontend-bucket

# set default home page
gsutil web set -m pick_location.html gs://your-frontend-bucket

Once deployed, the map becomes a lightweight client to the full prediction pipeline — calling the Cloud Run ML API with live coordinates and forecast windows for near-real-time risk estimation.

A note on our solution

We chose to embed the map in Looker Studio and, next to it, display a table with the results of the latest prediction, fetched from a BigQuery view that filters on the maximum value of the prediction_timestamp_clean column. This way we can trigger a prediction, wait about a minute (the free version's refresh interval is ≈ 1 min; Pro allows real-time updates), and reload the page to see the predictions in the table.
