End-to-end pipeline that:
- fetches accident data (Brottsplatskartan) and enriches it with weather data (Open-Meteo),
- lands raw newline-delimited JSON (NDJSON) in Google Cloud Storage,
- loads raw data into BigQuery and transforms it with dbt,
- trains an ML model on historical accident + weather data and stores a joblib bundle in GCS,
- fetches forecast data via a Cloud Functions HTTP proxy,
- serves a Flask prediction API on Cloud Run that writes predictions to BigQuery,
- is callable from a clickable Leaflet map or other clients.
Predictions are appended to a staging table in BigQuery.
- Ingest (Python)
  - `fetch_raw_accident_data.py` → pulls accidents and writes NDJSON to GCS under `accident/ingest_date=YYYY-MM-DD/…`
  - `fetch_weather_data.py` → reads the latest accident files from GCS and fetches hourly weather per accident; writes NDJSON to `weather/ingest_date=YYYY-MM-DD/…`
  - Cloud Build triggers build and deploy the container images (ingest, dbt, ML).
  - The ingestion scripts are containerized and run as Cloud Run Jobs, triggered by Cloud Scheduler once a day (a minimal sketch of the NDJSON landing step follows this list).
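For orientation, here is a minimal sketch of the NDJSON-to-GCS landing step the ingest scripts perform. Only the bucket layout (`accident/ingest_date=YYYY-MM-DD/…`) comes from this document; the helper name and record shape are illustrative.

```python
# Minimal sketch (not the actual ingest code): write records as NDJSON to
# gs://<bucket>/accident/ingest_date=YYYY-MM-DD/accident_<category>.jsonl.
import json
from datetime import datetime, timezone

from google.cloud import storage


def upload_ndjson(records: list[dict], bucket_name: str, category: str) -> str:
    """Serialize records as newline-delimited JSON and upload to the dated prefix."""
    ingest_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    blob_path = f"accident/ingest_date={ingest_date}/accident_{category}.jsonl"
    payload = "\n".join(json.dumps(r, ensure_ascii=False) for r in records)

    client = storage.Client()
    blob = client.bucket(bucket_name).blob(blob_path)
    blob.upload_from_string(payload, content_type="application/json")
    return f"gs://{bucket_name}/{blob_path}"
```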
- Land to warehouse (Python)
  - `load_gcs_jsonl_to_bq.py` → loads the latest date folder (or a forced date) from GCS into one table per category in BigQuery (raw layer), partitioned by time, on a daily schedule.
- Transformation (dbt)
  - Staging models build cleaned, de-duplicated tables (in the `team_rs_stg` dataset), one each for accident and weather data, ready for modeling.
  - A dbt transformation from staging to mart joins the two tables and adds calculated columns.
  - The dbt transformations are scheduled to run at 2 AM each night.
- Forecast proxy (Cloud Functions gen 2)
  - `apps/function/main.py` exposes a thin HTTP wrapper around Open-Meteo to fetch hourly forecasts by `latitude`, `longitude`, `starttime`, `endtime`.
- ML service (Cloud Run)
  - `apps/ml/src/train.py` + `apps/ml/src/model.py` train an XGBoost classifier, scale features, and save `{model, scaler, feature_columns}` as a joblib bundle to `gs://$MODEL_BUCKET/models/team_model.joblib`.
  - `apps/ml/src/app.py` (Flask) + `apps/ml/src/predict.py` fetch forecasts via the function, align features, load the model bundle, predict `predicted_injury`, and append results to a BigQuery table.
  - `apps/ml/src/get_forecast_data.py` fetches weather forecast data from `apps/function/main.py` and transforms it to fit the ML model.
  - `apps/ml/src/get_historical_data.py` reads historical weather and accident data from BigQuery and transforms it to fit the ML model.
- Transformations for frontend
  - `apps/dbt/team_roadsaftai_rs/models/mart/mart_predicted_injuries.sql` is triggered at the end of the ML service run.
  - Transforms dates.
  - Adds calculated columns for display in the Looker Studio frontend.
  - Saves a mart table to BigQuery.
- Frontend Leaflet & Turf map to trigger predictions
  - `apps/frontend/pick_location.html` is a Leaflet map in an HTML page, using Turf to make sure predictions are triggered only for the Stockholm Region (a GeoJSON border sets the limit).
  - Calls the Flask endpoint `apps/ml/src/app.py` with the chosen marker's coordinates and a 24 h timespan.
  - Displays the results of the fetch: number of rows written, coordinates.
  - Optional: add a Looker Studio table connected to `team_rs_mart.mart_latest_prediction` to display predictions.
Reference docs:
Brottsplatskartan API · Open-Meteo archive API · BigQuery load from GCS · Cloud Run jobs · Cloud Scheduler jobs · dbt BigQuery setup
- bucket: `accident-raw-data` (uniform access, same region as the BigQuery dataset)
- prefixes:
  - `accident/ingest_date=YYYY-MM-DD/accident_<category>.jsonl`
  - `weather/ingest_date=YYYY-MM-DD/weather_<category>.jsonl`
Categories: hit_and_run, single, unspecified, wildlife, wounded.
- dataset: `team_rs_raw`
- tables created/append-loaded per file:
  - accidents: `raw_accident_<category>` — partitioned by `pubdate_iso8601`
  - weather: `raw_weather_<category>` — partitioned by `weather_hour`
Note: loading is done with BigQuery’s newline-delimited JSON loader (NDJSON).
Script: apps/ingest/src/fetch_raw_accident_data.py
What it does
- pages through Brottsplatskartan's `/api/events` with polite backoff and jitter
- incremental mode: looks up a watermark in BigQuery (`max(pubdate_iso8601)`), backs off by an overlap buffer (e.g. 24 h), and fetches only newer rows (sketched after this list)
- min-age gate: skips events newer than `--min-age-days` (default 5) to keep accident and weather data in sync (Open-Meteo's archive may lag a few days)
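The watermark, overlap-buffer and min-age-gate logic could look roughly like the sketch below. This is a hypothetical illustration, not the script itself; only the `pubdate_iso8601` column, the overlap buffer, and the min-age gate come from the description above.

```python
# Hypothetical sketch of the incremental fetch window; the first-run backfill
# window is an assumption for illustration.
from datetime import datetime, timedelta, timezone

from google.cloud import bigquery


def get_fetch_window(project: str, dataset: str, table: str,
                     overlap_hours: int = 24, min_age_days: int = 5):
    """Return (start, end): watermark minus overlap, up to now minus the min-age gate."""
    client = bigquery.Client(project=project)
    query = f"SELECT MAX(pubdate_iso8601) AS watermark FROM `{project}.{dataset}.{table}`"
    row = next(iter(client.query(query).result()), None)

    now = datetime.now(timezone.utc)
    end = now - timedelta(days=min_age_days)  # min-age gate (Open-Meteo archive lag)
    if row and row.watermark:
        start = row.watermark - timedelta(hours=overlap_hours)  # overlap buffer
    else:
        start = end - timedelta(days=30)  # first run: assumed backfill window
    return start, end
```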
Example (to GCS)
python fetch_raw_accident_data.py \
--area "stockholms län" \
--event-type "trafikolycka, vilt" \
--limit 500 \
--output accident_wildlife.jsonl \
--gcs-bucket accident-data-raw \
--gcs-prefix "accident/ingest_date=$(date -u +%F)" \
--incremental \
--overlap-hours 24 \
--bq-project team-roadsaftai-rs \
--bq-dataset team_rs_raw \
--bq-table raw_accident_wildlife \
--min-age-days 5 \
--app 'tool-name'

Note: The --app parameter is optional but encouraged by the API provider. Use your own app name here.
Script: apps/ingest/src/fetch_weather_data.py
What it does
- discovers the latest accident date folder (or accepts an override URI)
- for each accident row:
  - computes `weather_hour` (UTC, truncated to the hour)
  - calls the Open-Meteo historical `/v1/archive` endpoint for that lat/lon/day and picks the closest hour (sketched after this list)
  - writes one weather record per accident: `weather_id`, `accident_id`, `weather_hour`, and the corresponding weather variables (temp, precip, wind, etc.)
- outputs NDJSON to `weather/ingest_date=YYYY-MM-DD/weather_<category>.jsonl`
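A rough sketch of the closest-hour lookup is shown below. The hourly variable list is truncated for illustration; the real script requests the full set of weather variables it lands in the raw tables.

```python
# Illustrative sketch: call the Open-Meteo archive API for the accident's day
# and pick the hour closest to the accident's weather_hour.
from datetime import datetime

import requests


def fetch_weather_for_accident(lat: float, lon: float, weather_hour: datetime) -> dict:
    weather_hour = weather_hour.replace(tzinfo=None)  # Open-Meteo returns naive UTC times
    day = weather_hour.strftime("%Y-%m-%d")
    resp = requests.get(
        "https://archive-api.open-meteo.com/v1/archive",
        params={
            "latitude": lat,
            "longitude": lon,
            "start_date": day,
            "end_date": day,
            "hourly": "temperature_2m,precipitation,wind_speed_10m",  # subset for illustration
        },
        timeout=30,
    )
    resp.raise_for_status()
    hourly = resp.json()["hourly"]

    # Pick the index whose timestamp is closest to the accident's weather_hour.
    times = [datetime.fromisoformat(t) for t in hourly["time"]]
    idx = min(range(len(times)), key=lambda i: abs(times[i] - weather_hour))
    return {key: values[idx] for key, values in hourly.items()}
```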
Note on lag: we keep accident and weather data in sync by delaying accident ingestion with `--min-age-days` (default 5).
Script: apps/ingest/src/load_gcs_jsonl_to_bq.py
What it does
- finds the latest `ingest_date=YYYY-MM-DD/` under `accident/` and `weather/` (or use `--date`)
- skips empty files (0 bytes)
- submits load jobs (NDJSON) into the per-file tables in `team_rs_raw`, appending data
- ensures the partitioning field matches the table (accident: `pubdate_iso8601`, weather: `weather_hour`); a minimal load-job sketch follows this list
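The core of each load job might look like this sketch. The function name and the commented example call are illustrative; the source format, append disposition and partition fields follow the description above.

```python
# Minimal sketch of one NDJSON load job from GCS into a partitioned raw table.
from google.cloud import bigquery


def load_ndjson(uri: str, table_id: str, partition_field: str,
                project: str, location: str) -> None:
    client = bigquery.Client(project=project, location=location)
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        autodetect=True,  # assumes the existing table schema is compatible
        time_partitioning=bigquery.TimePartitioning(field=partition_field),
    )
    job = client.load_table_from_uri(uri, table_id, job_config=job_config)
    job.result()  # wait for completion
    print(f"Loaded {job.output_rows} rows into {table_id}")


# e.g. load_ndjson(
#     "gs://accident-data-raw/accident/ingest_date=2025-03-14/accident_wildlife.jsonl",
#     "team-roadsaftai-rs.team_rs_raw.raw_accident_wildlife",
#     "pubdate_iso8601", "team-roadsaftai-rs", "europe-north2")
```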
Example
python load_gcs_jsonl_to_bq.py \
--project team-roadsaftai-rs \
--dataset team_rs_raw \
--bucket accident-data-raw \
--accidents-prefix accident/ \
--weather-prefix weather/ \
--location europe-north2

Project: `apps/dbt/team_roadsaftai_rs/`
Datasets
- raw: `team_rs_raw` (landing tables)
- staging: `team_rs_stg` (dbt target `dev_stg`)
- mart: `team_rs_mart` (for visualization)
Models
- `models/staging/stg_accidents` – unions the raw accident tables, prioritizes rows with title_type 'personskada', normalizes timestamps/columns, and de-duplicates by a combination of `description` and `pubdate_iso8601`.
- `models/staging/stg_weather` – unions the raw weather tables and de-duplicates by `accident_id`:
  - pick the row with the highest completeness (most non-null fields),
  - if tied, pick the latest `weather_hour`.
- `models/mart/mart_accident_weather` – left joins the staging tables on `stg_accident.id = stg_weather.accident_id`, adds calculated columns for `is_snow_on_ground`, `is_winter`, `is_rush_hour` and `daypart`. Converts nulls to 0 where reasonable.
Run locally (for debugging)
# inside apps/dbt/team_roadsaftai_rs
export DBT_PROFILES_DIR=../../../.github/dbt_profiles
# build staging models into team_rs_stg
dbt build --target dev_stg

Goal
Expose a thin HTTP proxy around the Open-Meteo forecast API, so downstream services (e.g., the ML API) can fetch hourly weather by latitude, longitude, and a time window.
Source & entry point
- code: `apps/function/main.py`
- entry point (function name): `apps` (decorated with `@functions_framework.http`); a sketch of the handler shape follows this list
- runtime: Python 3.11
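For reference, the general shape of such a gen 2 HTTP function is sketched below. This is not a copy of `apps/function/main.py`; the exact upstream parameters, hourly variable list, and response assembly are defined there.

```python
# Illustrative only; not a copy of apps/function/main.py.
from datetime import datetime

import functions_framework
import requests

OPEN_METEO_FORECAST_URL = "https://api.open-meteo.com/v1/forecast"


@functions_framework.http
def apps(request):
    """Validate query params, call the Open-Meteo forecast API, pass the payload through."""
    params = request.args.to_dict()
    try:
        datetime.strptime(params["starttime"], "%Y-%m-%dT%H:%M")
        datetime.strptime(params["endtime"], "%Y-%m-%dT%H:%M")
        latitude = float(params["latitude"])
        longitude = float(params["longitude"])
    except (KeyError, ValueError) as exc:
        return {"error": f"invalid or missing parameter: {exc}"}, 400

    # The real main.py defines the exact upstream parameters and hourly variable
    # list; this subset is only for illustration.
    upstream = requests.get(
        OPEN_METEO_FORECAST_URL,
        params={
            "latitude": latitude,
            "longitude": longitude,
            "start_hour": params["starttime"],
            "end_hour": params["endtime"],
            "hourly": "temperature_2m,precipitation,wind_speed_10m",
        },
        timeout=30,
    )
    # The deployed function also echoes the original query string and a json_body field.
    return {"query_params": params, "response": upstream.json()}, 200
```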
Deploy
gcloud functions deploy weather-forecast \
--gen2 \
--runtime=python311 \
--region=YOUR-REGION \
--entry-point=apps \
--source=apps/function \
--trigger-http \
--allow-unauthenticated

Replace YOUR-REGION with your standard region (e.g., europe-north2). Make sure the Cloud Functions API is enabled in the project.
API contract (http)
Endpoint
GET / (deployed as weather-forecast):
https://<REGION>-<PROJECT_ID>.cloudfunctions.net/weather-forecast
Query parameters (all required)
- `latitude` — stringified float, e.g. "59.33"
- `longitude` — stringified float, e.g. "18.06"
- `starttime` — ISO 8601 "YYYY-MM-DDThh:mm"
- `endtime` — ISO 8601 "YYYY-MM-DDThh:mm"
Responses
- `200 OK` — JSON object: `{ "json_body": …, "query_params": { ...original query string... }, "response": { ...Open-Meteo hourly payload... } }`
- `400 Bad Request` — JSON with an `error` message when validation fails (e.g., missing or malformed `starttime`/`endtime`).
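A minimal Python client call against the deployed function could look like this; the URL placeholders match the endpoint pattern above, and the exact nesting under `response` is defined in `main.py`.

```python
# Example client call; substitute your region and project in the URL.
import requests

FORECAST_URL = "https://<REGION>-<PROJECT_ID>.cloudfunctions.net/weather-forecast"

resp = requests.get(
    FORECAST_URL,
    params={
        "latitude": "59.33",
        "longitude": "18.06",
        "starttime": "2025-03-14T15:00",
        "endtime": "2025-03-14T18:00",
    },
    timeout=30,
)
resp.raise_for_status()
forecast = resp.json()["response"]  # Open-Meteo payload passed through by the function
print(list(forecast))               # inspect the top-level keys
```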
Selection of parameters

The function selects the relevant weather parameters to fetch from Open-Meteo. List of parameters fetched:
- temperature_2m
- dew_point_2m
- soil_temperature_0_to_7cm
- precipitation
- rain
- snowfall
- snow_depth
- wind_speed_10m
- wind_gusts_10m
- cloud_cover
- relative_humidity_2m
- weather_code
- visibility
Notes
- The function validates the time format and returns 400 if invalid.
- Hourly variables requested from Open-Meteo are defined inside main.py. Keep those aligned with the features used by the ML service.
- The function returns a pass-through of the upstream JSON under the `response` key; the ML service reads from that.
Goal
Produce and upload a model bundle {model, scaler, feature_columns} to gs://$MODEL_BUCKET/models/team_model.joblib, based on historical accidents + weather in BigQuery.
Prerequisites
- the staging dataset exists (e.g., `team_rs_stg`) and contains joined historical data.
- the service account you use for training can read from BigQuery and write to the model bucket.
- env vars available at runtime:
  - `GOOGLE_CLOUD_PROJECT` – your project id
  - `MODEL_BUCKET` – target bucket for model artifacts
Setup
- Training pipeline: `apps/ml/src/train.py` + `apps/ml/src/model.py` (feature prep, SMOTE, MinMaxScaler, XGBoost; saves model artifacts).
- Model I/O helpers: `apps/ml/src/utils/model_io.py` (save/upload to GCS).
- Preparation of training data: `apps/ml/src/get_historical_data.py` (fetches training data → transforms data)
What it does
- Training (`train.py` ↔ `get_historical_data.py` → `model.py`):
  - Reads accident and weather tables from BigQuery staging (`team_rs_stg`),
  - Joins the tables,
  - Engineers time features (`day_of_year`, `year`, `hour`),
  - Builds the binary label `injury` and balances classes (SMOTE),
  - Scales features,
  - Uses a train/test split,
  - Trains an XGBoost classifier.
- Saves to bucket: the model is saved as a joblib bundle `{model, scaler, feature_columns}` to `gs://$MODEL_BUCKET/models/team_model.joblib` (a sketch of this step follows).
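The save/upload step could be sketched as below; the real helpers live in `apps/ml/src/utils/model_io.py`, so this only illustrates the bundle format and target path named above.

```python
# Illustrative sketch of saving the {model, scaler, feature_columns} bundle to GCS.
import os
import tempfile

import joblib
from google.cloud import storage


def save_model_bundle(model, scaler, feature_columns) -> str:
    bundle = {"model": model, "scaler": scaler, "feature_columns": feature_columns}
    bucket_name = os.environ["MODEL_BUCKET"]

    with tempfile.NamedTemporaryFile(suffix=".joblib") as tmp:
        joblib.dump(bundle, tmp.name)
        blob = storage.Client().bucket(bucket_name).blob("models/team_model.joblib")
        blob.upload_from_filename(tmp.name)
    return f"gs://{bucket_name}/models/team_model.joblib"
```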
# run from repo root with valid ADC (or GOOGLE_APPLICATION_CREDENTIALS set)
python apps/ml/src/train.py

gsutil ls gs://$MODEL_BUCKET/models/team_model.joblib

- training job/service account:
  - roles/bigquery.dataViewer (read training tables)
  - roles/storage.objectAdmin on $MODEL_BUCKET (write model bundle)
  - roles/logging.logWriter
- Cloud Scheduler sender (if using option C):
  - roles/run.invoker on the ml-train job
Code
- Flask entrypoint: `apps/ml/src/app.py` (HTTP `GET /` → triggers prediction for the given query params).
- Prediction logic: `apps/ml/src/predict.py` (fetches transformed forecast data → loads model bundle from GCS → scales → predicts → writes to BigQuery).
- Model I/O helpers: `apps/ml/src/utils/model_io.py` (load from GCS).
- Preparation of prediction data: `apps/ml/src/get_forecast_data.py` (fetches forecast data → transforms data)
- Dockerfile: `apps/ml/Dockerfile` (container for Cloud Run)
What it does
- (`app.py` ↔ `get_forecast_data.py`) → `predict.py`: pulls the hourly forecast via the Cloud Function (see step 5), reshapes it to the trained feature set, loads the model bundle from GCS, generates `predicted_injury`, writes the results to BigQuery (in the staging dataset), and then triggers the dbt transformation for the BigQuery mart table `team_rs_mart.mart_predicted_injuries` (see step 8). A condensed sketch follows.
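A condensed, illustrative version of that path is shown below; the real code is split across `predict.py`, `get_forecast_data.py` and `model_io.py`, and the destination table name follows the staging source described in step 8.

```python
# Condensed sketch of the prediction path; details may differ from predict.py.
import os

import joblib
import pandas as pd
from google.cloud import bigquery, storage


def predict_and_store(forecast_df: pd.DataFrame) -> int:
    # Load the trained bundle from GCS.
    bucket = storage.Client().bucket(os.environ["MODEL_BUCKET"])
    bucket.blob("models/team_model.joblib").download_to_filename("/tmp/team_model.joblib")
    bundle = joblib.load("/tmp/team_model.joblib")

    # Align forecast columns to the training feature set, scale, and predict.
    features = forecast_df.reindex(columns=bundle["feature_columns"], fill_value=0)
    scaled = bundle["scaler"].transform(features)
    forecast_df["predicted_injury"] = bundle["model"].predict(scaled)

    # Append the predictions to the staging table in BigQuery.
    client = bigquery.Client(project=os.environ["GOOGLE_CLOUD_PROJECT"])
    table_id = f"{client.project}.team_rs_stg.predicted_injuries"
    job = client.load_table_from_dataframe(
        forecast_df,
        table_id,
        job_config=bigquery.LoadJobConfig(write_disposition="WRITE_APPEND"),
    )
    job.result()
    return len(forecast_df)
```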
Environment
- `GOOGLE_CLOUD_PROJECT` – your GCP project id (used by clients)
- `MODEL_BUCKET` – GCS bucket where the model bundle lives (e.g., team-roadsaftai-re-models)
- `PORT` – defaults to 8080 for Cloud Run
The service relies on Application Default Credentials on Cloud Run. If running locally, set `GOOGLE_APPLICATION_CREDENTIALS` to a service-account key with access to BigQuery and the model bucket.
Build and deploy (Cloud Run)
# from repo root – build the image
gcloud builds submit -t gcr.io/$PROJECT_ID/ml-api -f apps/ml/Dockerfile .
# deploy to Cloud Run (adjust region)
gcloud run deploy ml-api \
--image gcr.io/$PROJECT_ID/ml-api \
--region europe-north1 \
--platform managed \
--allow-unauthenticated \
--set-env-vars MODEL_BUCKET=team-roadsaftai-re-models,GOOGLE_CLOUD_PROJECT=$PROJECT_ID

Local run (optional)
docker build -f apps/ml/Dockerfile -t ml-api:local .
docker run --rm -p 8080:8080 \
-e GOOGLE_CLOUD_PROJECT=your-project \
-e MODEL_BUCKET=your-models-bucket \
ml-api:local
API (HTTP)
GET / (Cloud Run service URL)
Query params (all optional – sensible defaults exist in get_forecast_data):
- `latitude` — stringified float (e.g., "59.33")
- `longitude` — stringified float (e.g., "18.06")
- `starttime` — "YYYY-MM-DDThh:mm"
- `endtime` — "YYYY-MM-DDThh:mm"
Behavior
- fetches hourly forecast via the function (step 5)
- aligns columns to the trained feature set
- loads the model bundle from `gs://$MODEL_BUCKET/models/team_model.joblib`
- writes predictions to BigQuery (staging dataset)
- returns a short JSON payload (stringified) from the Flask handler
Quick test
curl -G "https://YOUR-CLOUD-RUN-URL/" \
--data-urlencode "latitude=59.33" \
--data-urlencode "longitude=18.06" \
--data-urlencode "starttime=2025-03-14T15:00" \
--data-urlencode "endtime=2025-03-14T18:00"

Notes
- Make sure a model bundle exists in `gs://$MODEL_BUCKET/models/team_model.joblib` (run training once before calling the API).
- The prediction write uses pandas → BigQuery; ensure the destination dataset exists and the service account has write permissions.
- Keep feature names consistent between historical training data and the forecast payload (add column renames where necessary; a tiny alignment sketch follows).
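As a small, hypothetical illustration of such renames and alignment (the actual mapping lives in `get_forecast_data.py`, and the rename below is an assumed example):

```python
# Hypothetical example; the rename mapping and fill value are illustrative only.
import pandas as pd

RENAMES = {"windspeed_10m": "wind_speed_10m"}  # assumed forecast-to-training rename


def align_features(df: pd.DataFrame, feature_columns: list[str]) -> pd.DataFrame:
    """Rename forecast columns to the training names, then order/complete them."""
    return df.rename(columns=RENAMES).reindex(columns=feature_columns, fill_value=0)
```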
Goal
Convert and enrich the ML prediction output from the staging dataset (team_rs_stg.predicted_injuries) into a clean, analytics-ready mart table with unified timestamps and derived date fields.
Project & model
- dbt project: `apps/dbt/team_roadsaftai_rs/`
- model: `models/mart/mart_predicted_injuries.sql`
- runner script: `apps/dbt/team_roadsaftai_rs/run_dbt.sh` (Cloud Run Job entrypoint)
- tests: `models/mart/mart_predicted_injuries.yml` (accepted ranges + not null)
- output dataset: `team_rs_mart`
What it does
- Reads from the staging source `team_rs_stg.predicted_injuries` via source() and casts all weather features to their analytics types (e.g., temperature_2m, precipitation, wind_speed_10m, etc.).
- Cleans and standardizes timestamps (ISO 8601, epoch, date-only formats).
- Outputs a clustered, partitioned table, `team_rs_mart.mart_predicted_injuries`: partitioned by event_date, clustered by weather_code and predicted_injury. New prediction data is appended. A view in BigQuery can be used to always hold the latest prediction data for visualization and monitoring.
How it runs
- The model runs as a Cloud Run Job using the containerized dbt image: `gcr.io/$PROJECT_ID/team-rs-dbt`
- The Cloud Run Job command triggers `run_dbt.sh`: `bash run /app/run_dbt.sh`
- It is triggered automatically each time new prediction rows are written to `team_rs_stg.predicted_injuries` (from the ML API, `app.py`), or it can be scheduled via Cloud Scheduler.
- environment (examples): BQ_PROJECT, BQ_LOCATION, DBT_PROJECT_DIR=/app, DBT_PROFILES_DIR=/app/profiles.
Downstream
A simple BigQuery view selects the latest batch by max prediction_timestamp_clean (or prediction_ts) for dashboards in Looker Studio; a query sketch follows.
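For a quick sanity check outside Looker Studio, the same "latest batch" logic can be run from Python. This is a sketch; the project id, table, and timestamp column follow this document, while the view itself is defined in BigQuery.

```python
# Sketch: fetch the most recent prediction batch from the mart table.
from google.cloud import bigquery

client = bigquery.Client(project="team-roadsaftai-rs")
query = """
    SELECT *
    FROM `team_rs_mart.mart_predicted_injuries`
    WHERE prediction_timestamp_clean = (
        SELECT MAX(prediction_timestamp_clean)
        FROM `team_rs_mart.mart_predicted_injuries`
    )
"""
latest = client.query(query).to_dataframe()  # requires pandas + db-dtypes installed
print(latest.head())
```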
Goal
Provide an interactive, browser-based interface where users can click a location on a map of Stockholm Region to trigger real-time ML predictions for weather-related injury risk.
Source & structure
- `apps/frontend/pick_location.html` – standalone HTML + JavaScript file (using Leaflet and Turf.js)
- `apps/frontend/stockholms_lan.geojson` – polygon defining the Stockholm Region boundary, used to restrict valid clicks
What it does
- Displays an interactive Leaflet map centered on the Stockholm region.
- Loads the region boundary from `stockholms_lan.geojson` and visually outlines it on the map.
- When a user clicks inside the region, a marker is placed at that location.
- The click activates a “Predict risk for injuries here” button.
- When pressed, the app:
- Captures the clicked coordinates (latitude, longitude),
- Computes the local start hour and the end hour (24-hour window ahead, in Europe/Stockholm time),
- Calls the Flask ML prediction API (Cloud Run, step 6) with those parameters,
- Displays the API response directly on the page (JSON, HTML, or text “receipt”).
- The frontend can be hosted from a public GCS bucket or integrated into another static hosting service.
Example
Open the deployed HTML file (e.g. from your GCS bucket):
https://storage.googleapis.com/<YOUR_BUCKET_NAME>/pick_location.html
Click anywhere inside the red boundary of Stockholm County → press the Predict button → view the model’s response inline.
Tech stack
- Leaflet.js – map rendering and interaction
- Turf.js – geographic point-in-polygon validation
- Vanilla JavaScript – API call construction and dynamic DOM updates
- OpenStreetMap tiles – base map layer
Deployment
# upload files to a public GCS bucket
gsutil cp pick_location.html stockholms_lan.geojson gs://your-frontend-bucket
# make them public (if organization policy allows)
gsutil iam ch allUsers:objectViewer gs://your-frontend-bucket
# set default home page
gsutil web set -m pick_location.html gs://your-frontend-bucket

Once deployed, the map becomes a lightweight client to the full prediction pipeline, calling the Cloud Run ML API with live coordinates and forecast windows for near-real-time risk estimation.
A note on our solution
We chose to embed the map in Looker Studio and, next to it, display a table with the results of the latest prediction, fetched from a BigQuery view that keeps only the rows with the latest value of prediction_timestamp_clean. This way we can trigger a prediction, wait about a minute (the free version's refresh interval is roughly 1 minute; Pro allows real-time updates), and reload the page to see the predictions in the table.