
Commit 63bd65b

Upload final model training data (#804)

Authored by Damonamajor with wrridgeway and jeancochrane.

Squashed commit history:

* land_nbhd_rate_unique_by_town_nbhd_class_and_year
* Initial draft
* update docs
* Add run_id to query
* add table_
* Switch to training data
* Update schema.yml
* Update model-training_data.R
* Initial draft
* update docs
* Add run_id to query
* add table_
* Switch to training data
* Update schema.yml
* Update model-training_data.R
* Remove unintended commit
* Correct unique columns
* Error if > 2
* Update etl/scripts-ccao-data-warehouse-us-east-1/model/model-training_data.R (Co-authored-by: William Ridgeway)
* Update etl/scripts-ccao-data-warehouse-us-east-1/model/model-training_data.R (Co-authored-by: William Ridgeway)
* updated py script
* styler
* test2
* Fix ref
* Updated push
* updates
* Another attempt
* Resolve SSL errors and use Spark DataFrames for `model.training_data` (#821)
* Remove default file_format
* Add unique key
* possible functional version
* Functional version
* Remove old script
* Remove unique_key
* Commenting
* Update docs.md
* Update dbt/models/model/schema.yml (Co-authored-by: Jean Cochrane)
* Update dbt/models/model/model.training_data.py (Co-authored-by: Jean Cochrane)
* Update dbt/models/model/model.training_data.py (Co-authored-by: Jean Cochrane)
* lintr
* Update dbt/models/model/docs.md (Co-authored-by: Jean Cochrane)
* Update dbt/models/model/model.training_data.py (Co-authored-by: Jean Cochrane)
* Update dbt/models/model/model.training_data.py (Co-authored-by: Jean Cochrane)
* Update dbt/models/model/model.training_data.py (Co-authored-by: Jean Cochrane)
* update to training_data
* update to training_data
* Update schema
* update schema
* push attempt
* update model
* update naming
* update naming
* Update dbt/models/model/schema.yml (Co-authored-by: Jean Cochrane)
* Update dbt/models/model/schema.yml (Co-authored-by: Jean Cochrane)

Co-authored-by: William Ridgeway <10358980+wrridgeway@users.noreply.github.com>
Co-authored-by: Jean Cochrane <jeancochrane@users.noreply.github.com>

1 parent 4ad229f · commit 63bd65b

File tree

3 files changed: +106 −0 lines changed


dbt/models/model/docs.md

Lines changed: 13 additions & 0 deletions

@@ -173,6 +173,19 @@ Wall time of each stage (train, assess, etc.) for each model run (`run_id`).
 **Primary Key**: `year`, `run_id`
 {% enddocs %}
 
+# training_data
+
+{% docs table_training_data %}
+
+A table containing the training data from the final model runs.
+
+We update this table once per assessment year after choosing the final model
+runs for the year. As such, only final model run IDs should be present in this
+table.
+
+**Primary Key**: `run_id`, `meta_card_num`, `meta_sale_document_num`
+{% enddocs %}
+
 # vw_card_res_input
 
 {% docs view_vw_card_res_input %}
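The docs above state that only final model run IDs should be present in `model.training_data`. As a minimal illustration of that constraint (not CCAO code; `metadata_rows` and the sample run IDs are hypothetical stand-ins for `model.metadata`), filtering run metadata down to final runs might look like:

```python
def final_run_ids(metadata_rows):
    """Return the run_ids of rows whose run_type is 'final'.

    metadata_rows is a hypothetical stand-in for model.metadata:
    a list of dicts with at least 'run_id' and 'run_type' keys.
    """
    return {row["run_id"] for row in metadata_rows if row["run_type"] == "final"}


# Hypothetical sample metadata: two final runs and one test run
rows = [
    {"run_id": "2024-02-06-example-a", "run_type": "final"},
    {"run_id": "2024-01-10-example-b", "run_type": "test"},
    {"run_id": "2023-03-14-example-c", "run_type": "final"},
]
```

Only the IDs surviving this filter should ever appear in the table's `run_id` column.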
dbt/models/model/model.training_data.py

Lines changed: 80 additions & 0 deletions

import functools

from pyspark.sql.functions import lit


def model(dbt, session):
    dbt.config(
        materialized="incremental",
        incremental_strategy="insert_overwrite",
        partitioned_by=["assessment_year", "run_id", "meta_township_code"],
        on_schema_change="append_new_columns",
    )

    # Build the base metadata DataFrame of final model runs
    base_query = """
        SELECT
            run_id,
            year,
            assessment_year,
            dvc_md5_training_data
        FROM model.metadata
        WHERE run_type = 'final'
    """
    metadata_df = session.sql(base_query)

    if dbt.is_incremental:
        # Anti-join out any run_ids already present in the target
        existing = (
            session.table(f"{dbt.this.schema}.{dbt.this.identifier}")
            .select("run_id")
            .distinct()
        )
        metadata_df = metadata_df.join(existing, on="run_id", how="left_anti")

        # If there's nothing new, return an *empty* DataFrame
        if metadata_df.limit(1).count() == 0:
            print(">>> no new run_id found; skipping incremental update")
            # This returns zero rows but preserves the full target schema
            return session.table(
                f"{dbt.this.schema}.{dbt.this.identifier}"
            ).limit(0)

    # Collect the remaining metadata locally
    metadata = metadata_df.toPandas()

    bucket = "ccao-data-dvc-us-east-1"
    all_dfs = []

    for _, row in metadata.iterrows():
        run_id = row["run_id"]
        year = int(row["year"])
        h = row["dvc_md5_training_data"]

        # DVC cache layout: runs after 2023 nest objects under files/md5/
        prefix = "" if year <= 2023 else "files/md5/"
        key = f"{prefix}{h[:2]}/{h[2:]}"
        s3p = f"{bucket}/{key}"

        print(f">>> reading all columns for run {run_id!r}")
        print(f"    → S3 key = {s3p}")
        df = session.read.parquet(f"s3://{s3p}")

        # Coerce booleans to avoid mismatched types across years
        if "ccao_is_active_exe_homeowner" in df.columns:
            df = df.withColumn(
                "ccao_is_active_exe_homeowner",
                df["ccao_is_active_exe_homeowner"].cast("boolean"),
            )

        # Add run_id and assessment_year partition columns
        df = df.withColumn("run_id", lit(run_id)).withColumn(
            "assessment_year", lit(row["assessment_year"])
        )

        all_dfs.append(df)
        print(f"Processed run_id={run_id}, rows={df.count()}")

    # Union all the new runs together, aligning columns by name
    return functools.reduce(
        lambda x, y: x.unionByName(y, allowMissingColumns=True), all_dfs
    )
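The loop above derives each run's S3 location from its DVC MD5 hash, switching cache layouts at 2023 (DVC 3.x moved cache objects under a `files/md5/` prefix). That path logic can be factored into a small helper; this is a sketch of the same expression with a hypothetical function name, not part of the committed model:

```python
def dvc_cache_key(md5_hash: str, year: int) -> str:
    """Build the S3 key for a DVC-tracked training data object.

    Runs through 2023 use the legacy two-level cache layout
    (ab/cdef...); later runs use the DVC 3.x layout, which nests
    objects under files/md5/.
    """
    prefix = "" if year <= 2023 else "files/md5/"
    return f"{prefix}{md5_hash[:2]}/{md5_hash[2:]}"
```

Under this layout, the model reads `s3://ccao-data-dvc-us-east-1/<key>` for each run.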

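The Python model ends with `unionByName(..., allowMissingColumns=True)`, which aligns columns by name and null-fills any column a given year's file lacks. The same alignment semantics can be sketched in plain Python over lists of row dicts (an illustration of the behavior, not Spark's implementation):

```python
def union_by_name(*tables):
    """Concatenate tables (lists of row dicts), aligning columns by
    name and filling columns missing from a table with None."""
    columns = []
    for table in tables:
        for row in table:
            for col in row:
                if col not in columns:
                    columns.append(col)
    return [
        {col: row.get(col) for col in columns}
        for table in tables
        for row in table
    ]
```

This is why `on_schema_change="append_new_columns"` pairs well with the union: new columns in later runs simply appear as nulls for earlier runs.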
dbt/models/model/schema.yml

Lines changed: 13 additions & 0 deletions

@@ -230,6 +230,19 @@ models:
         description: |
           Any notes or caveats associated with the model run
+  - name: model.training_data
+    description: '{{ doc("table_training_data") }}'
+    config:
+      tags:
+        - load_manual
+    tests:
+      - unique_combination_of_columns:
+          name: model_training_data_unique_card_doc_number_run_id
+          combination_of_columns:
+            - run_id
+            - meta_sale_document_num
+            - meta_card_num
+
   - name: model.vw_pin_shared_input
     description: '{{ doc("view_vw_pin_shared_input") }}'
     config:
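The `unique_combination_of_columns` test above asserts that no two rows share the same (`run_id`, `meta_sale_document_num`, `meta_card_num`) triple, matching the table's documented primary key. The check it performs can be sketched in plain Python (an illustration of the test's semantics, not dbt's implementation):

```python
from collections import Counter


def duplicated_combinations(rows, key_columns):
    """Return key tuples that appear more than once in rows."""
    counts = Counter(tuple(row[col] for col in key_columns) for row in rows)
    return [key for key, n in counts.items() if n > 1]
```

The test passes only when this function returns an empty list for the full table.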
