Skip to content

Commit 9ee58bd

Browse files
authored
Merge pull request #58 from koichiro/feature/fix-dml-stat
Fix dml_stat error.
2 parents c59a9f6 + 2f6872e commit 9ee58bd

2 files changed

Lines changed: 109 additions & 16 deletions

File tree

lib/airbnb_payous/bigquery_gateway.rb

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -92,18 +92,7 @@ def load_and_merge!(rows:)
9292
query_job.wait_until_done!
9393
raise query_job.error if query_job.failed?
9494

95-
# Capture DML stats with robust fallbacks
96-
if query_job.dml_stats
97-
result[:inserted_count] = query_job.dml_stats.inserted_row_count.to_i
98-
result[:updated_count] = query_job.dml_stats.updated_row_count.to_i
99-
@logger.info("DML Stats - Inserted: #{result[:inserted_count]}, Updated: #{result[:updated_count]}")
100-
elsif (affected = query_job.num_dml_affected_rows.to_i) >= 0
101-
# For our current MERGE query (which only has INSERT), all affected rows are inserts
102-
result[:inserted_count] = affected
103-
@logger.info("DML Stats not found, but num_dml_affected_rows is #{affected}. Assuming all are inserts.")
104-
else
105-
@logger.warn("No DML statistics available for query job #{query_job.job_id}.")
106-
end
95+
result.merge!(extract_dml_counts(query_job))
10796

10897
result[:mode] = :merge
10998
end
@@ -172,6 +161,38 @@ def build_merge_query(columns)
172161
SQL
173162
end
174163

164+
def extract_dml_counts(query_job)
165+
inserted_count = extract_query_job_metric(query_job, :inserted_row_count)
166+
updated_count = extract_query_job_metric(query_job, :updated_row_count)
167+
168+
if inserted_count || updated_count
169+
inserted_count ||= 0
170+
updated_count ||= 0
171+
@logger.info("DML row counts - Inserted: #{inserted_count}, Updated: #{updated_count}")
172+
return { inserted_count:, updated_count: }
173+
end
174+
175+
affected = extract_query_job_metric(query_job, :num_dml_affected_rows)
176+
if affected
177+
# Current MERGE only inserts new rows, so affected rows can be treated as inserts.
178+
@logger.info("DML row counts unavailable; using num_dml_affected_rows=#{affected} as inserted count.")
179+
return { inserted_count: affected, updated_count: 0 }
180+
end
181+
182+
@logger.warn("No DML statistics available for query job #{query_job.job_id}.")
183+
{ inserted_count: 0, updated_count: 0 }
184+
end
185+
186+
def extract_query_job_metric(query_job, method_name)
187+
return nil unless query_job.respond_to?(method_name)
188+
189+
value = query_job.public_send(method_name)
190+
value.nil? ? nil : value.to_i
191+
rescue StandardError => e
192+
@logger.warn("Failed to read #{method_name} from query job #{query_job.job_id}: #{e.message}")
193+
nil
194+
end
195+
175196
def delete_staging_table(dataset)
176197
return if dataset.nil?
177198

test/bigquery_gateway_test.rb

Lines changed: 76 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,20 +91,49 @@ def error; nil; end
9191
class FakeQueryJob < FakeLoadJob
9292
attr_reader :sql
9393

94-
def initialize(inserted: 1, updated: 0)
94+
def initialize(inserted: 1, updated: 0, supports_dml_stats: true, supports_row_counts: true, supports_affected_rows: true)
9595
super()
96+
@inserted = inserted
97+
@updated = updated
98+
@supports_dml_stats = supports_dml_stats
99+
@supports_row_counts = supports_row_counts
100+
@supports_affected_rows = supports_affected_rows
96101
@dml_stats = Struct.new(:inserted_row_count, :updated_row_count).new(inserted, updated)
97102
end
98103

99104
def dml_stats
105+
raise NoMethodError, "undefined method 'dml_stats'" unless @supports_dml_stats
106+
100107
@dml_stats
101108
end
102109

103110
def num_dml_affected_rows
104-
@dml_stats.inserted_row_count + @dml_stats.updated_row_count
111+
raise NoMethodError, "undefined method 'num_dml_affected_rows'" unless @supports_affected_rows
112+
113+
@inserted + @updated
114+
end
115+
116+
def inserted_row_count
117+
raise NoMethodError, "undefined method 'inserted_row_count'" unless @supports_row_counts
118+
119+
@inserted
120+
end
121+
122+
def updated_row_count
123+
raise NoMethodError, "undefined method 'updated_row_count'" unless @supports_row_counts
124+
125+
@updated
105126
end
106127

107128
def error; nil; end
129+
130+
def respond_to?(method_name, include_private = false)
131+
return @supports_dml_stats if method_name == :dml_stats
132+
return @supports_row_counts if [:inserted_row_count, :updated_row_count].include?(method_name)
133+
return @supports_affected_rows if method_name == :num_dml_affected_rows
134+
135+
super
136+
end
108137
end
109138

110139
class FakeTable
@@ -160,9 +189,10 @@ def table(name)
160189
class FakeBigquery
161190
attr_reader :dataset_calls, :create_dataset_calls, :copy_job_calls, :query_job_calls
162191

163-
def initialize(dataset:, created_dataset: nil)
192+
def initialize(dataset:, created_dataset: nil, query_job: nil)
164193
@dataset = dataset
165194
@created_dataset = created_dataset || dataset
195+
@query_job = query_job || FakeQueryJob.new
166196
@dataset_calls = []
167197
@create_dataset_calls = []
168198
@copy_job_calls = []
@@ -186,7 +216,7 @@ def copy_job(source, destination, write:)
186216

187217
def query_job(sql)
188218
@query_job_calls << sql
189-
FakeQueryJob.new
219+
@query_job
190220
end
191221
end
192222

@@ -256,6 +286,48 @@ def test_merges_into_an_existing_target_table
256286
assert_empty @bigquery.copy_job_calls
257287
end
258288

289+
def test_merges_with_query_job_that_exposes_row_counts_but_not_dml_stats
290+
bigquery = FakeBigquery.new(
291+
dataset: @dataset,
292+
query_job: FakeQueryJob.new(inserted: 2, updated: 3, supports_dml_stats: false, supports_row_counts: true)
293+
)
294+
gateway = AirbnbPayous::BigqueryGateway.new(
295+
project_id: "project",
296+
dataset_id: "dataset",
297+
table_id: "table",
298+
logger: @logger,
299+
bigquery: bigquery,
300+
storage: @storage
301+
)
302+
303+
result = gateway.load_and_merge!(rows: @rows)
304+
305+
assert_equal :merge, result[:mode]
306+
assert_equal 2, result[:inserted_count]
307+
assert_equal 3, result[:updated_count]
308+
end
309+
310+
def test_merges_with_query_job_that_only_exposes_affected_rows
311+
bigquery = FakeBigquery.new(
312+
dataset: @dataset,
313+
query_job: FakeQueryJob.new(inserted: 4, updated: 0, supports_dml_stats: false, supports_row_counts: false, supports_affected_rows: true)
314+
)
315+
gateway = AirbnbPayous::BigqueryGateway.new(
316+
project_id: "project",
317+
dataset_id: "dataset",
318+
table_id: "table",
319+
logger: @logger,
320+
bigquery: bigquery,
321+
storage: @storage
322+
)
323+
324+
result = gateway.load_and_merge!(rows: @rows)
325+
326+
assert_equal :merge, result[:mode]
327+
assert_equal 4, result[:inserted_count]
328+
assert_equal 0, result[:updated_count]
329+
end
330+
259331
def test_serializes_dates_and_decimals_for_bigquery_load_jobs
260332
@gateway.load_and_merge!(rows: @rows)
261333

0 commit comments

Comments
 (0)