diff --git a/dbt/models/model/docs.md b/dbt/models/model/docs.md
index 6fcfe4457..c920059a7 100644
--- a/dbt/models/model/docs.md
+++ b/dbt/models/model/docs.md
@@ -173,6 +173,19 @@ Wall time of each stage (train, assess, etc.) for each model run (`run_id`).
 **Primary Key**: `year`, `run_id`
 {% enddocs %}
 
+# training_data
+
+{% docs table_training_data %}
+
+A table containing the training data from the final model runs.
+
+We update this table once per assessment year after choosing the final model
+runs for the year. As such, only final model run IDs should be present in this
+table.
+
+**Primary Key**: `run_id`, `meta_card_num`, `meta_sale_document_num`
+{% enddocs %}
+
 # vw_card_res_input
 
 {% docs view_vw_card_res_input %}
diff --git a/dbt/models/model/model.training_data.py b/dbt/models/model/model.training_data.py
new file mode 100644
index 000000000..7a33de6dd
--- /dev/null
+++ b/dbt/models/model/model.training_data.py
@@ -0,0 +1,85 @@
+import functools
+
+from pyspark.sql.functions import lit
+
+
+def model(dbt, session):
+    """Load training data parquet files from S3 for all final model runs."""
+    dbt.config(
+        materialized="incremental",
+        incremental_strategy="insert_overwrite",
+        partitioned_by=["assessment_year", "run_id", "meta_township_code"],
+        on_schema_change="append_new_columns",
+    )
+
+    # Build the base metadata DataFrame
+    base_query = """
+    SELECT
+        run_id,
+        year,
+        assessment_year,
+        dvc_md5_training_data
+    FROM model.metadata
+    WHERE run_type = 'final'
+    """
+    metadata_df = session.sql(base_query)
+
+    if dbt.is_incremental:
+        # anti-join out any run_ids already in the target
+        existing = (
+            session.table(f"{dbt.this.schema}.{dbt.this.identifier}")
+            .select("run_id")
+            .distinct()
+        )
+        metadata_df = metadata_df.join(existing, on="run_id", how="left_anti")
+
+        # if there's nothing new, return an *empty* DataFrame
+        if metadata_df.limit(1).count() == 0:
+            print(">>> no new run_id found; skipping incremental update")
+            # this returns zero rows but preserves the full target schema
+            return session.table(
+                f"{dbt.this.schema}.{dbt.this.identifier}"
+            ).limit(0)
+
+    # Collect remaining metadata
+    metadata = metadata_df.toPandas()
+
+    bucket = "ccao-data-dvc-us-east-1"
+    all_dfs = []
+
+    for _, row in metadata.iterrows():
+        run_id = row["run_id"]
+        year = int(row["year"])
+        h = row["dvc_md5_training_data"]
+
+        # NOTE(review): hash path layout appears to change after 2023
+        # (files/md5/ prefix, matching the DVC >=3 cache layout) — confirm
+        prefix = "" if year <= 2023 else "files/md5/"
+        key = f"{prefix}{h[:2]}/{h[2:]}"
+        s3p = f"{bucket}/{key}"
+
+        print(f">>> reading all columns for run {run_id!r}")
+        print(f" → S3 key = {s3p}")
+        df = session.read.parquet(f"s3://{s3p}")
+
+        # coerce booleans for mismatched types
+        if "ccao_is_active_exe_homeowner" in df.columns:
+            df = df.withColumn(
+                "ccao_is_active_exe_homeowner",
+                df["ccao_is_active_exe_homeowner"].cast("boolean"),
+            )
+
+        # add run_id and assessment_year columns
+        df = df.withColumn("run_id", lit(run_id)).withColumn(
+            "assessment_year", lit(row["assessment_year"])
+        )
+
+        all_dfs.append(df)
+        print(f"Processed run_id={run_id}, rows={df.count()}")
+
+    # Union all the new runs together. reduce() with no initial value raises
+    # an opaque TypeError on an empty sequence, so fail with a clear message.
+    if not all_dfs:
+        raise ValueError("no training data found for any final model run")
+    return functools.reduce(
+        lambda x, y: x.unionByName(y, allowMissingColumns=True), all_dfs
+    )
diff --git a/dbt/models/model/schema.yml b/dbt/models/model/schema.yml
index 2dfb4d9a2..e9c4ff364 100644
--- a/dbt/models/model/schema.yml
+++ b/dbt/models/model/schema.yml
@@ -230,6 +230,19 @@ models:
        description: |
          Any notes or caveats associated with the model run
 
+  - name: model.training_data
+    description: '{{ doc("table_training_data") }}'
+    config:
+      tags:
+        - load_manual
+    tests:
+      - unique_combination_of_columns:
+          name: model_training_data_unique_card_doc_number_run_id
+          combination_of_columns:
+            - run_id
+            - meta_sale_document_num
+            - meta_card_num
+
   - name: model.vw_pin_shared_input
     description: '{{ doc("view_vw_pin_shared_input") }}'
    config: