-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathmodel.training_data.py
More file actions
80 lines (65 loc) · 2.47 KB
/
model.training_data.py
File metadata and controls
80 lines (65 loc) · 2.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import functools
from pyspark.sql.functions import lit
def model(dbt, session):
    """dbt Python model: incrementally append training-data snapshots.

    For each finalized model run recorded in ``model.metadata`` that is not
    yet present in the target table, read its training-data parquet file
    from S3 (content-addressed by DVC MD5 hash), tag it with ``run_id`` and
    ``assessment_year``, and return the union of all new snapshots.

    Parameters:
        dbt: dbt Python-model context (provides ``config``, ``is_incremental``,
            and ``this`` for the target relation).
        session: active Spark session supplied by dbt.

    Returns:
        A Spark DataFrame of all newly-processed runs' training data, or an
        empty DataFrame with the target's schema when nothing new exists.

    Raises:
        ValueError: on a non-incremental run when ``model.metadata`` contains
            no qualifying rows, so there is nothing to build from.
    """
    dbt.config(
        materialized="incremental",
        incremental_strategy="insert_overwrite",
        partitioned_by=["assessment_year", "run_id", "meta_township_code"],
        on_schema_change="append_new_columns",
    )

    # One row per finalized model run; dvc_md5_training_data addresses the
    # run's training-data parquet in the DVC cache bucket.
    base_query = """
    SELECT
    run_id,
    year,
    assessment_year,
    dvc_md5_training_data
    FROM model.metadata
    WHERE run_type = 'final'
    """
    metadata_df = session.sql(base_query)

    if dbt.is_incremental:
        # Anti-join out any run_ids already in the target so only new runs
        # are (re)processed.
        existing = (
            session.table(f"{dbt.this.schema}.{dbt.this.identifier}")
            .select("run_id")
            .distinct()
        )
        metadata_df = metadata_df.join(existing, on="run_id", how="left_anti")

        # If there's nothing new, return an *empty* DataFrame.
        if metadata_df.limit(1).count() == 0:
            print(">>> no new run_id found; skipping incremental update")
            # This returns zero rows but preserves the full target schema.
            return session.table(
                f"{dbt.this.schema}.{dbt.this.identifier}"
            ).limit(0)

    # Metadata is small (one row per run), so collecting to the driver for
    # iteration is cheap.
    metadata = metadata_df.toPandas()
    bucket = "ccao-data-dvc-us-east-1"
    all_dfs = []
    for _, row in metadata.iterrows():
        run_id = row["run_id"]
        year = int(row["year"])
        h = row["dvc_md5_training_data"]
        # DVC cache layout changed after 2023: newer objects live under a
        # "files/md5/" prefix; the hash is split dir/file as <2 chars>/<rest>.
        prefix = "" if year <= 2023 else "files/md5/"
        key = f"{prefix}{h[:2]}/{h[2:]}"
        s3p = f"{bucket}/{key}"
        print(f">>> reading all columns for run {run_id!r}")
        print(f" → S3 key = {s3p}")
        df = session.read.parquet(f"s3://{s3p}")

        # Coerce booleans for mismatched types across training-data vintages,
        # so the later unionByName doesn't hit a type conflict.
        if "ccao_is_active_exe_homeowner" in df.columns:
            df = df.withColumn(
                "ccao_is_active_exe_homeowner",
                df["ccao_is_active_exe_homeowner"].cast("boolean"),
            )

        # Tag each snapshot with its run metadata (both are partition keys).
        df = df.withColumn("run_id", lit(run_id)).withColumn(
            "assessment_year", lit(row["assessment_year"])
        )
        all_dfs.append(df)
        # NOTE: df.count() forces a full scan of each run's data purely for
        # logging; kept for parity with existing log output.
        print(f"Processed run_id={run_id}, rows={df.count()}")

    # BUG FIX: functools.reduce() raises an opaque TypeError on an empty
    # sequence. This can only happen on a non-incremental run (the
    # incremental branch already returned early), i.e. a first build with no
    # qualifying metadata rows — fail with an actionable message instead.
    if not all_dfs:
        raise ValueError(
            "No training data to build: model.metadata has no rows with "
            "run_type = 'final'"
        )

    # Union all new runs, aligning columns by name and null-filling columns
    # missing from any vintage (schemas drift across model years).
    return functools.reduce(
        lambda x, y: x.unionByName(y, allowMissingColumns=True), all_dfs
    )