@@ -64,35 +64,48 @@ def load_and_merge!(rows:)
6464 load_job . wait_until_done!
6565 raise load_job . error if load_job . failed?
6666
67- total_rows = load_job . output_rows
68- @logger . info ( "Loaded #{ total_rows } rows to staging table." )
67+ # Use job output rows, fallback to input rows count if zero (sometimes stats are delayed)
68+ total_rows_loaded = load_job . output_rows . to_i
69+ total_rows_loaded = rows . length if total_rows_loaded . zero?
70+
71+ @logger . info ( "Load job finished. output_rows: #{ load_job . output_rows } , rows.length: #{ rows . length } . Using #{ total_rows_loaded } as reference." )
6972
73+ # Fetch a fresh table reference to avoid caching issues
7074 target_table = dataset . table ( table_id )
7175 result = { inserted_count : 0 , updated_count : 0 }
7276
7377 if target_table . nil?
74- @logger . info ( "Target table #{ qualified_table_name ( table_id ) } not found. Creating it for the first time. " )
78+ @logger . info ( "Target table #{ qualified_table_name ( table_id ) } does not exist. Path: :create_table " )
7579 copy_job = @bigquery . copy_job qualified_table_name ( staging_table_id ) , qualified_table_name ( table_id ) , write : "truncate"
7680 copy_job . wait_until_done!
7781 raise copy_job . error if copy_job . failed?
78- @logger . info ( "Target table created successfully ." )
82+ @logger . info ( "Target table created via copy_job ." )
7983
8084 result [ :mode ] = :create_table
81- result [ :inserted_count ] = total_rows
85+ result [ :inserted_count ] = total_rows_loaded
8286 else
87+ @logger . info ( "Target table #{ qualified_table_name ( table_id ) } exists. Path: :merge" )
8388 merge_sql = build_merge_query ( rows . first . keys )
8489 query_job = @bigquery . query_job ( merge_sql )
8590 query_job . wait_until_done!
8691 raise query_job . error if query_job . failed?
8792
88- inserted_count = query_job . respond_to? ( :dml_stats ) && query_job . dml_stats ? query_job . dml_stats . inserted_row_count . to_i : 0
89- updated_count = query_job . respond_to? ( :dml_stats ) && query_job . dml_stats ? query_job . dml_stats . updated_row_count . to_i : 0
90- @logger . info ( "MERGE operation completed. Rows inserted: #{ inserted_count } , Rows updated: #{ updated_count } ." )
93+ # Capture DML stats
94+ if query_job . respond_to? ( :dml_stats ) && query_job . dml_stats
95+ result [ :inserted_count ] = query_job . dml_stats . inserted_row_count . to_i
96+ result [ :updated_count ] = query_job . dml_stats . updated_row_count . to_i
97+ @logger . info ( "DML Stats - Inserted: #{ result [ :inserted_count ] } , Updated: #{ result [ :updated_count ] } " )
98+ else
99+ # Fallback: if dml_stats is nil, try to get from raw statistics or use total affected
100+ affected = query_job . num_dml_affected_rows . to_i
101+ @logger . warn ( "DML Stats not found. num_dml_affected_rows: #{ affected } " )
102+ result [ :inserted_count ] = affected
103+ end
91104
92105 result [ :mode ] = :merge
93- result [ :inserted_count ] = inserted_count
94- result [ :updated_count ] = updated_count
95106 end
107+
108+ @logger . info ( "Final result for notification: #{ result . inspect } " )
96109 result
97110 ensure
98111 temp_file &.close!
0 commit comments