Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 23 additions & 10 deletions lib/airbnb_payous/bigquery_gateway.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,35 +64,48 @@ def load_and_merge!(rows:)
load_job.wait_until_done!
raise load_job.error if load_job.failed?

total_rows = load_job.output_rows
@logger.info("Loaded #{total_rows} rows to staging table.")
# Use the job's output_rows; fall back to the input row count when it is zero (job statistics are sometimes delayed)
total_rows_loaded = load_job.output_rows.to_i
total_rows_loaded = rows.length if total_rows_loaded.zero?

@logger.info("Load job finished. output_rows: #{load_job.output_rows}, rows.length: #{rows.length}. Using #{total_rows_loaded} as reference.")

# Fetch a fresh table reference to avoid caching issues
target_table = dataset.table(table_id)
result = { inserted_count: 0, updated_count: 0 }

if target_table.nil?
@logger.info("Target table #{qualified_table_name(table_id)} not found. Creating it for the first time.")
@logger.info("Target table #{qualified_table_name(table_id)} does not exist. Path: :create_table")
copy_job = @bigquery.copy_job qualified_table_name(staging_table_id), qualified_table_name(table_id), write: "truncate"
copy_job.wait_until_done!
raise copy_job.error if copy_job.failed?
@logger.info("Target table created successfully.")
@logger.info("Target table created via copy_job.")

result[:mode] = :create_table
result[:inserted_count] = total_rows
result[:inserted_count] = total_rows_loaded
else
@logger.info("Target table #{qualified_table_name(table_id)} exists. Path: :merge")
merge_sql = build_merge_query(rows.first.keys)
query_job = @bigquery.query_job(merge_sql)
query_job.wait_until_done!
raise query_job.error if query_job.failed?

inserted_count = query_job.respond_to?(:dml_stats) && query_job.dml_stats ? query_job.dml_stats.inserted_row_count.to_i : 0
updated_count = query_job.respond_to?(:dml_stats) && query_job.dml_stats ? query_job.dml_stats.updated_row_count.to_i : 0
@logger.info("MERGE operation completed. Rows inserted: #{inserted_count}, Rows updated: #{updated_count}.")
# Capture DML stats
if query_job.respond_to?(:dml_stats) && query_job.dml_stats
result[:inserted_count] = query_job.dml_stats.inserted_row_count.to_i
result[:updated_count] = query_job.dml_stats.updated_row_count.to_i
@logger.info("DML Stats - Inserted: #{result[:inserted_count]}, Updated: #{result[:updated_count]}")
else
# Fallback: when dml_stats is unavailable, use num_dml_affected_rows and report the whole total as inserted, since the insert/update split is unknown
affected = query_job.num_dml_affected_rows.to_i
@logger.warn("DML Stats not found. num_dml_affected_rows: #{affected}")
result[:inserted_count] = affected
end

result[:mode] = :merge
result[:inserted_count] = inserted_count
result[:updated_count] = updated_count
end

@logger.info("Final result for notification: #{result.inspect}")
result
ensure
temp_file&.close!
Expand Down
4 changes: 4 additions & 0 deletions test/bigquery_gateway_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ def dml_stats
@dml_stats
end

# Total number of rows the fake job reports as affected: the inserted
# count plus the updated count taken from the stubbed @dml_stats.
#
# NOTE(review): this raises NoMethodError when @dml_stats is nil; confirm
# whether the fake is ever expected to be used without DML stats.
def num_dml_affected_rows
  stats = @dml_stats
  inserted = stats.inserted_row_count
  updated = stats.updated_row_count
  inserted + updated
end

# Always returns nil, i.e. this fake reports no error.
def error
  nil
end
end

Expand Down
Loading