-
Notifications
You must be signed in to change notification settings - Fork 5
Upload final model training data #804
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 43 commits
b019ad3
d5b50b5
43b601d
9bc6b2f
fe8f802
93d9cce
dd494bf
1b2a69f
ac554d2
4248b1f
370cece
78f1781
c1d2f3a
d9b3e3b
3f5aa58
036ce36
880b08f
55ea677
4505249
b942b36
e6bcfb2
6d85b17
ca631d9
23b6b6d
c4c933e
42eb7d6
91fbedc
172c0ce
71c57c2
9fe0d17
9692b61
6e7c7da
bef997c
96fc296
1d9ec1c
afe1b13
fb748a8
5e6e3fe
a9cdffa
795b64a
3cbb70a
f7314c8
4b34ef4
9858c2f
483613a
1b17f6f
15f7af5
e622c1a
06e2e76
ac8d278
0b290a5
654dc6f
9f479d5
374c9f5
27feb6e
460bb54
1dfe88b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
import functools

from pyspark.sql.functions import lit


def model(dbt, session):
    """dbt Python model: append final-run model training data to the target.

    Reads run metadata for ``run_type = 'final'`` rows from
    ``model.metadata``, anti-joins out runs already present in the target
    (when running incrementally), then loads each run's assessment-data
    parquet file from S3 (addressed by its DVC md5 hash), tags it with its
    ``run_id``, and returns the union of all new runs.

    Parameters
    ----------
    dbt : dbt Python-model context (provides ``config``, ``is_incremental``,
        and ``this`` for the target relation).
    session : Spark session provided by the dbt runtime.

    Returns
    -------
    A Spark DataFrame of new rows, or a zero-row DataFrame with the target's
    schema when there is nothing new to insert.
    """
    dbt.config(
        materialized="incremental",
        incremental_strategy="insert_overwrite",
        partitions_by=[
            {"field": "year", "data_type": "string"},
            {"field": "run_id", "data_type": "string"},
            {"field": "meta_township_code", "data_type": "string"},
        ],
        on_schema_change="append_new_columns",
    )

    # Fully-qualified target table; used both for the anti-join and for the
    # empty-result schema below. Hoisted so it is built exactly once.
    target_table = f"{dbt.this.schema}.{dbt.this.identifier}"

    # Base metadata: one row per final model run, with the DVC md5 hash that
    # addresses that run's assessment data in S3.
    base_query = """
        SELECT
            CAST(run_id AS STRING) AS run_id,
            CAST(year AS STRING) AS year,
            CAST(dvc_md5_assessment_data AS STRING) AS dvc_md5_assessment_data
        FROM model.metadata
        WHERE run_type = 'final'
    """
    metadata_df = session.sql(base_query)

    if dbt.is_incremental:
        # Anti-join out any run_ids already present in the target so only
        # genuinely new runs are loaded.
        existing = (
            session.table(target_table)
            .select("run_id")
            .distinct()
        )
        metadata_df = metadata_df.join(existing, on="run_id", how="left_anti")

    # If there's nothing new, return an *empty* DataFrame. limit(1).count()
    # avoids counting the full frame just to test emptiness.
    if metadata_df.limit(1).count() == 0:
        print(">>> no new run_id found; skipping incremental update")
        # Zero rows, but preserves the full target schema so the
        # insert_overwrite still type-checks.
        return session.table(target_table).limit(0)

    # Collect the (small) remaining metadata locally to drive the S3 reads.
    metadata = metadata_df.toPandas()

    bucket = "ccao-data-dvc-us-east-1"
    all_dfs = []

    for _, row in metadata.iterrows():
        run_id = row["run_id"]
        year = int(row["year"])
        h = row["dvc_md5_assessment_data"]

        # DVC changed its cache layout: runs through 2023 live at the bucket
        # root, later runs under files/md5/. Keys are <first 2 hex>/<rest>.
        prefix = "" if year <= 2023 else "files/md5/"
        key = f"{prefix}{h[:2]}/{h[2:]}"
        s3p = f"{bucket}/{key}"

        print(f">>> reading all columns for run {run_id!r}")
        print(f"   → S3 key = {s3p}")
        df = session.read.parquet(f"s3://{s3p}")

        # Coerce booleans for mismatched types across run vintages.
        if "ccao_is_active_exe_homeowner" in df.columns:
            df = df.withColumn(
                "ccao_is_active_exe_homeowner",
                df["ccao_is_active_exe_homeowner"].cast("boolean"),
            )

        # Tag every row with its source run_id (also a partition column).
        df = df.withColumn("run_id", lit(run_id))

        all_dfs.append(df)
        # NOTE: df.count() forces a full scan per run; kept to preserve the
        # existing progress output.
        print(f"Processed run_id={run_id}, rows={df.count()}")

    # Union all the new runs together; allowMissingColumns tolerates schema
    # drift between run vintages (missing columns become null).
    return functools.reduce(
        lambda x, y: x.unionByName(y, allowMissingColumns=True), all_dfs
    )
Uh oh!
There was an error while loading. Please reload this page.