@@ -664,19 +664,14 @@ def start_bundle(self):
664664 self .pending_jobs = []
665665 self .schema_cache = {}
666666
667- def process (
668- self ,
669- element ,
670- load_job_name_prefix ,
671- pane_info = beam .DoFn .PaneInfoParam ,
672- * schema_side_inputs ):
667+ def process (self , element , load_job_name_prefix , * schema_side_inputs ):
673668 # Each load job is assumed to have files respecting these constraints:
674669 # 1. Total size of all files < 15 TB (Max size for load jobs)
675670 # 2. Total no. of files in a single load job < 10,000
676671 # This assumption means that there will always be a single load job
677672 # triggered for each partition of files.
678673 destination = element [0 ]
679- partition_key , files = element [1 ]
674+ partition_key , files , pane_index = element [1 ]
680675
681676 if callable (self .schema ):
682677 schema = self .schema (destination , * schema_side_inputs )
@@ -705,7 +700,7 @@ def process(
705700 table_reference .datasetId ,
706701 table_reference .tableId ))
707702 job_name = '%s_%s_pane%s_partition%s' % (
708- load_job_name_prefix , destination_hash , pane_info . index , partition_key )
703+ load_job_name_prefix , destination_hash , pane_index , partition_key )
709704 _LOGGER .info ('Load job has %s files. Job name is %s.' , len (files ), job_name )
710705
711706 create_disposition = self .create_disposition
@@ -1104,6 +1099,8 @@ def _load_data(
11041099 # Load data using temp tables
11051100 trigger_loads_outputs = (
11061101 partitions_using_temp_tables
1102+ | "KeyByPaneIndexWithTempTables" >> beam .ParDo (KeyByPaneIndex ())
1103+ | "ReshuffleBeforeLoadWithTempTables" >> beam .Reshuffle ()
11071104 | "TriggerLoadJobsWithTempTables" >> beam .ParDo (
11081105 TriggerLoadJobs (
11091106 schema = self .schema ,
@@ -1186,6 +1183,8 @@ def _load_data(
11861183 # Load data directly to destination table
11871184 destination_load_job_ids_pc = (
11881185 partitions_direct_to_destination
1186+ | "KeyByPaneIndexWithoutTempTables" >> beam .ParDo (KeyByPaneIndex ())
1187+ | "ReshuffleBeforeLoadWithoutTempTables" >> beam .Reshuffle ()
11891188 | "TriggerLoadJobsWithoutTempTables" >> beam .ParDo (
11901189 TriggerLoadJobs (
11911190 schema = self .schema ,
@@ -1313,3 +1312,9 @@ def expand(self, pcoll):
13131312 self .DESTINATION_FILE_PAIRS : all_destination_file_pairs_pc ,
13141313 self .DESTINATION_COPY_JOBID_PAIRS : destination_copy_job_ids_pc ,
13151314 }
1315+
1316+
class KeyByPaneIndex(beam.DoFn):
  """Attach the pane index to each partition element.

  Input elements have the shape ``(destination, (partition_key, files))``.
  Each one is re-emitted as
  ``(destination, (partition_key, files, pane_index))`` so that the pane
  index travels inside the element itself.
  """
  def process(self, element, pane_info=beam.DoFn.PaneInfoParam):
    """Return a one-element list holding ``element`` plus its pane index.

    Args:
      element: A ``(destination, (partition_key, files))`` pair.
      pane_info: Supplied through ``beam.DoFn.PaneInfoParam``; only its
        ``index`` attribute is read.

    Returns:
      A list with the single tuple
      ``(destination, (partition_key, files, pane_info.index))``.
    """
    destination, partition = element
    partition_key, files = partition
    # NOTE(review): presumably the index is captured here because the pane
    # info is not reliable after the Reshuffle that follows -- confirm.
    keyed = (destination, (partition_key, files, pane_info.index))
    return [keyed]
0 commit comments