Upload final model training data #804
New dbt Python model (the diff adds the whole 80-line file, `@@ -0,0 +1,80 @@`):

```python
import functools

from pyspark.sql.functions import lit


def model(dbt, session):
    dbt.config(
        materialized="incremental",
        incremental_strategy="insert_overwrite",
        partitioned_by=["assessment_year", "run_id", "meta_township_code"],
        on_schema_change="append_new_columns",
    )

    # Build the base metadata DataFrame
    base_query = """
        SELECT
            run_id,
            year,
            assessment_year,
            dvc_md5_training_data
        FROM model.metadata
        WHERE run_type = 'final'
    """
    metadata_df = session.sql(base_query)

    if dbt.is_incremental:
        # Anti-join out any run_ids already in the target
        existing = (
            session.table(f"{dbt.this.schema}.{dbt.this.identifier}")
            .select("run_id")
            .distinct()
        )
        metadata_df = metadata_df.join(existing, on="run_id", how="left_anti")
```
Comment on lines +26 to +33:

Member: Is the if statement just part of how these models are supposed to be built? Or do we expect it to not be true under some circumstance?

Member: This block is a key part of what makes this an incremental dbt model. Basically, if the model is configured as incremental (which we do in the `dbt.config()` call), this branch runs on subsequent builds and filters out any `run_id`s that already exist in the target table, so only new runs get appended. Happy to talk through this in more detail if it's helpful! But I'd start with reading the dbt docs about incremental models I linked above, since I think those docs do a pretty solid job of explaining it.
```python
    # If there's nothing new, return an *empty* DataFrame
    if metadata_df.limit(1).count() == 0:
        print(">>> no new run_id found; skipping incremental update")
        # This returns zero rows but preserves the full target schema
        return session.table(
            f"{dbt.this.schema}.{dbt.this.identifier}"
        ).limit(0)

    # Collect remaining metadata
    metadata = metadata_df.toPandas()

    bucket = "ccao-data-dvc-us-east-1"
    all_dfs = []

    for _, row in metadata.iterrows():
        run_id = row["run_id"]
        year = int(row["year"])
        h = row["dvc_md5_training_data"]

        prefix = "" if year <= 2023 else "files/md5/"
        key = f"{prefix}{h[:2]}/{h[2:]}"
        s3p = f"{bucket}/{key}"

        print(f">>> reading all columns for run {run_id!r}")
```
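As an aside, the DVC object-key construction in the loop above can be pulled out into a standalone helper. This is a sketch assuming only the year-based prefix rule shown in the diff; `dvc_object_key` is a hypothetical name and the hash is made up:

```python
def dvc_object_key(md5_hash: str, year: int) -> str:
    # DVC content-addresses objects by MD5: the first two hex chars
    # become a directory, the rest the object name. Per the diff,
    # years after 2023 use the newer "files/md5/" cache layout.
    prefix = "" if year <= 2023 else "files/md5/"
    return f"{prefix}{md5_hash[:2]}/{md5_hash[2:]}"

print(dvc_object_key("0a1b2c3d", 2022))  # 0a/1b2c3d
print(dvc_object_key("0a1b2c3d", 2024))  # files/md5/0a/1b2c3d
```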
```python
        print(f"  → S3 key = {s3p}")
        df = session.read.parquet(f"s3://{s3p}")

        # Coerce booleans for mismatched types
        if "ccao_is_active_exe_homeowner" in df.columns:
            df = df.withColumn(
                "ccao_is_active_exe_homeowner",
                df["ccao_is_active_exe_homeowner"].cast("boolean"),
            )

        # Add run_id and assessment_year columns
        df = df.withColumn("run_id", lit(run_id)).withColumn(
            "assessment_year", lit(row["assessment_year"])
        )

        all_dfs.append(df)
        print(f"Processed run_id={run_id}, rows={df.count()}")

    # Union all the new runs together
    return functools.reduce(
        lambda x, y: x.unionByName(y, allowMissingColumns=True), all_dfs
    )
```
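The closing `unionByName(allowMissingColumns=True)` reduction is what tolerates schema drift between runs. A rough pandas analogue (hypothetical frames; `pd.concat` with its default outer column alignment stands in for the Spark union):

```python
import functools
import pandas as pd

# Hypothetical extracts whose columns drifted between years.
df_2023 = pd.DataFrame({"run_id": ["a"], "char_beds": [3]})
df_2024 = pd.DataFrame({"run_id": ["b"], "char_baths": [2]})

# Chain pairwise unions, aligning columns by name; columns missing
# from one frame are filled with NaN, mirroring allowMissingColumns.
combined = functools.reduce(
    lambda x, y: pd.concat([x, y], ignore_index=True), [df_2023, df_2024]
)
print(sorted(combined.columns))  # ['char_baths', 'char_beds', 'run_id']
```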
In the dbt schema file (`@@ -230,6 +230,19 @@ models:`), a new entry is added for the table between the existing model entries:

```yaml
        description: |
          Any notes or caveats associated with the model run

      - name: model.training_data
        description: '{{ doc("table_training_data") }}'
        config:
          tags:
            - load_manual
        tests:
          - unique_combination_of_columns:
              name: model_training_data_unique_card_doc_number_run_id
              combination_of_columns:
                - run_id
                - meta_sale_document_num
                - meta_card_num

      - name: model.vw_pin_shared_input
        description: '{{ doc("view_vw_pin_shared_input") }}'
        config:
```

Contributor (Author): For better or worse, outliers and cards are trimmed in stage 1 of the pipeline.
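For illustration, here is a pandas sketch of the invariant the `unique_combination_of_columns` test enforces (hypothetical rows; dbt actually compiles the test to SQL run against the warehouse):

```python
import pandas as pd

# Hypothetical training-data rows: each (run_id, document number,
# card number) tuple must be unique across the table.
rows = pd.DataFrame({
    "run_id": ["r1", "r1", "r2"],
    "meta_sale_document_num": ["d1", "d2", "d1"],
    "meta_card_num": [1, 1, 1],
})

cols = ["run_id", "meta_sale_document_num", "meta_card_num"]
duplicated = rows[rows.duplicated(subset=cols, keep=False)]
print(len(duplicated))  # 0 -> no repeated combination, test passes
```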