
Commit 995c7dc

Author: Isha Tarte (committed)
Commit message: add logging for dpo

1 parent 7bdc10f · commit 995c7dc

2 files changed: 27 additions and 0 deletions


bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java

Lines changed: 20 additions & 0 deletions
@@ -354,6 +354,13 @@ public boolean isTablePartitioned(TableId tableId) {
    */
   public Job overwriteDestinationWithTemporaryDynamicPartitons(
       TableId temporaryTableId, TableId destinationTableId) {
+    log.info(
+        "overwriteDestinationWithTemporaryDynamicPartitons called with temporaryTableId: {} and destinationTableId: {}",
+        temporaryTableId,
+        destinationTableId);
+    log.info(
+        "Stack trace for overwriteDestinationWithTemporaryDynamicPartitons:",
+        new RuntimeException("Debug Stack Trace"));

     TableDefinition destinationDefinition = getTable(destinationTableId).getDefinition();
     String sqlQuery = null;
@@ -364,19 +371,26 @@ public Job overwriteDestinationWithTemporaryDynamicPartitons(

     TimePartitioning timePartitioning = sdt.getTimePartitioning();
     if (timePartitioning != null) {
+      log.info(
+          "Destination table {} is time partitioned: {}", destinationTableId, timePartitioning);
       sqlQuery =
           getQueryForTimePartitionedTable(
               destinationTableName, temporaryTableName, sdt, timePartitioning);
     } else {
       RangePartitioning rangePartitioning = sdt.getRangePartitioning();
       if (rangePartitioning != null) {
+        log.info(
+            "Destination table {} is range partitioned: {}",
+            destinationTableId,
+            rangePartitioning);
         sqlQuery =
             getQueryForRangePartitionedTable(
                 destinationTableName, temporaryTableName, sdt, rangePartitioning);
       }
     }

     if (sqlQuery != null) {
+      log.info("Running dynamic partition overwrite query: {}", sqlQuery);
       QueryJobConfiguration queryConfig =
           jobConfigurationFactory
               .createQueryJobConfigurationBuilder(sqlQuery, Collections.emptyMap())
@@ -388,6 +402,9 @@ public Job overwriteDestinationWithTemporaryDynamicPartitons(
     }

     // no partitioning default to statndard overwrite
+    log.info(
+        "No partition definition found for destination table {}, falling back to standard overwrite",
+        destinationTableId);
     return overwriteDestinationWithTemporary(temporaryTableId, destinationTableId);
   }

@@ -402,6 +419,9 @@ public Job overwriteDestinationWithTemporaryDynamicPartitons(
    */
   public Job overwriteDestinationWithTemporary(
       TableId temporaryTableId, TableId destinationTableId) {
+    log.info(
+        "Stack trace for overwriteDestinationWithTemporary:",
+        new RuntimeException("Debug Stack Trace"));
     String queryFormat =
         "MERGE `%s`\n"
             + "USING (SELECT * FROM `%s`)\n"

spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/BigQueryWriteHelper.java

Lines changed: 7 additions & 0 deletions
@@ -133,6 +133,13 @@ public void writeDataFrameToBigQuery() {
     String format = config.getIntermediateFormat().getDataSource();
     data.write().format(format).save(gcsPath.toString());

+    logger.info(
+        "Conditions check: writeDisposition={}, PartitionOverwriteMode={}, tableExists={}, isTablePartitioned={}",
+        writeDisposition,
+        config.getPartitionOverwriteModeValue(),
+        bigQueryClient.tableExists(config.getTableId()),
+        bigQueryClient.isTablePartitioned(config.getTableId()));
+
     if (writeDisposition == JobInfo.WriteDisposition.WRITE_TRUNCATE
         && config.getPartitionOverwriteModeValue() == PartitionOverwriteMode.DYNAMIC
         && bigQueryClient.tableExists(config.getTableId())
