|
5 | 5 | import ast |
6 | 6 | import dlt |
7 | 7 | from pyspark.sql import DataFrame |
8 | | -from pyspark.sql.functions import expr |
| 8 | +from pyspark.sql.functions import expr, struct |
9 | 9 | from pyspark.sql.types import StructType, StructField |
10 | 10 | from src.dataflow_spec import BronzeDataflowSpec, SilverDataflowSpec, DataflowSpecUtils |
11 | 11 | from src.pipeline_writers import AppendFlowWriter, DLTSinkWriter |
@@ -315,9 +315,9 @@ def read_bronze(self) -> DataFrame: |
315 | 315 | if bronze_dataflow_spec.sourceFormat == "cloudFiles": |
316 | 316 | input_df = pipeline_reader.read_dlt_cloud_files() |
317 | 317 | elif bronze_dataflow_spec.sourceFormat == "delta" or bronze_dataflow_spec.sourceFormat == "snapshot": |
318 | | - return pipeline_reader.read_dlt_delta() |
| 318 | + input_df = pipeline_reader.read_dlt_delta() |
319 | 319 | elif bronze_dataflow_spec.sourceFormat == "eventhub" or bronze_dataflow_spec.sourceFormat == "kafka": |
320 | | - return pipeline_reader.read_kafka() |
| 320 | + input_df = pipeline_reader.read_kafka() |
321 | 321 | else: |
322 | 322 | raise Exception(f"{bronze_dataflow_spec.sourceFormat} source format not supported") |
323 | 323 | return self.apply_custom_transform_fun(input_df) |
@@ -630,11 +630,18 @@ def cdc_apply_changes(self): |
630 | 630 | target_table = ( |
631 | 631 | f"{target_cl_name}{target_db_name}.{target_table_name}" |
632 | 632 | ) |
| 633 | + |
| 634 | + # Handle comma-separated sequence columns using struct |
| 635 | + sequence_by = cdc_apply_changes.sequence_by |
| 636 | + if ',' in sequence_by: |
| 637 | + sequence_cols = [col.strip() for col in sequence_by.split(',')] |
| 638 | + sequence_by = struct(*sequence_cols) # Use struct() from pyspark.sql.functions |
| 639 | + |
633 | 640 | dlt.create_auto_cdc_flow( |
634 | 641 | target=target_table, |
635 | 642 | source=self.view_name, |
636 | 643 | keys=cdc_apply_changes.keys, |
637 | | - sequence_by=cdc_apply_changes.sequence_by, |
| 644 | + sequence_by=sequence_by, |
638 | 645 | where=cdc_apply_changes.where, |
639 | 646 | ignore_null_updates=cdc_apply_changes.ignore_null_updates, |
640 | 647 | apply_as_deletes=apply_as_deletes, |
@@ -673,8 +680,17 @@ def modify_schema_for_cdc_changes(self, cdc_apply_changes): |
673 | 680 | for field in struct_schema.fields: |
674 | 681 | if field.name not in cdc_apply_changes.except_column_list: |
675 | 682 | modified_schema.add(field) |
676 | | - if field.name == cdc_apply_changes.sequence_by: |
677 | | - sequenced_by_data_type = field.dataType |
| 683 | + # For SCD Type 2, get data type of first sequence column |
| 684 | + sequence_by = cdc_apply_changes.sequence_by.strip() |
| 685 | + if ',' not in sequence_by: |
| 686 | + # Single column sequence |
| 687 | + if field.name == sequence_by: |
| 688 | + sequenced_by_data_type = field.dataType |
| 689 | + else: |
| 690 | + # Multiple column sequence - use first column's type |
| 691 | + first_sequence_col = sequence_by.split(',')[0].strip() |
| 692 | + if field.name == first_sequence_col: |
| 693 | + sequenced_by_data_type = field.dataType |
678 | 694 | struct_schema = modified_schema |
679 | 695 | else: |
680 | 696 | raise Exception(f"Schema is None for {self.dataflowSpec} for cdc_apply_changes! ") |
|
0 commit comments