Skip to content

Commit 1fb7ca5

Browse files
committed
Properly clean up the entire folder for jobs using streaming.
1 parent c43cc30 commit 1fb7ca5

File tree

4 files changed

+72
-11
lines changed

4 files changed

+72
-11
lines changed

spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryUtil.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,19 @@ public static Path createGcsPath(
138138
return gcsPath;
139139
}
140140

141+
public static Path getGcsPathWithApplicationId(
142+
SparkBigQueryConfig config, Configuration conf, String applicationId) {
143+
String bucket = null;
144+
if (config.getPersistentGcsBucket().isPresent()) {
145+
bucket = config.getPersistentGcsBucket().get();
146+
} else if (config.getTemporaryGcsBucket().isPresent()) {
147+
bucket = config.getTemporaryGcsBucket().get();
148+
} else {
149+
bucket = config.getPersistentGcsBucket().get();
150+
}
151+
return new Path(String.format("gs://%s/.spark-bigquery-%s-*", bucket, applicationId));
152+
}
153+
141154
private static Path getUniqueGcsPath(String gcsBucket, String applicationId, Configuration conf)
142155
throws IOException {
143156
boolean needNewPath = true;

spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/BigQueryWriteHelper.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,15 @@ public BigQueryWriteHelper(
9292
this.gcsPath =
9393
SparkBigQueryUtil.createGcsPath(config, conf, sqlContext.sparkContext().applicationId());
9494
this.createTemporaryPathDeleter =
95-
config.getTemporaryGcsBucket().map(unused -> new IntermediateDataCleaner(gcsPath, conf));
95+
config
96+
.getTemporaryGcsBucket()
97+
.map(
98+
unused ->
99+
new IntermediateDataCleaner(
100+
gcsPath,
101+
conf,
102+
SparkBigQueryUtil.getGcsPathWithApplicationId(
103+
config, conf, sqlContext.sparkContext().applicationId())));
96104

97105
Schema schema =
98106
SchemaConverters.from(SchemaConvertersConfiguration.from(config))

spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/IntermediateDataCleaner.java

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
*/
1616
package com.google.cloud.spark.bigquery.write;
1717

18+
import java.io.IOException;
1819
import org.apache.hadoop.conf.Configuration;
20+
import org.apache.hadoop.fs.FileStatus;
1921
import org.apache.hadoop.fs.FileSystem;
2022
import org.apache.hadoop.fs.Path;
2123
import org.slf4j.Logger;
@@ -32,15 +34,18 @@ public class IntermediateDataCleaner extends Thread {
3234
private final Path path;
3335
/** the hadoop configuration */
3436
private final Configuration conf;
37+
/** the path for the job */
38+
private final Path gcsPathPrefix;
3539

36-
public IntermediateDataCleaner(Path path, Configuration conf) {
40+
public IntermediateDataCleaner(Path path, Configuration conf, Path gcsPathPrefix) {
3741
this.path = path;
3842
this.conf = conf;
43+
this.gcsPathPrefix = gcsPathPrefix;
3944
}
4045

4146
@Override
4247
public void run() {
43-
deletePath();
48+
deleteGcsPath();
4449
}
4550

4651
public void deletePath() {
@@ -56,14 +61,44 @@ public void deletePath() {
5661
}
5762
}
5863

59-
public void deleteGcsPath(Path gcsPath) {
64+
// Delete all GCS path matched with the application Id.
65+
public void deleteGcsPath() {
66+
logger.info("Deleting Gcs path " + gcsPathPrefix + " if it exists");
6067
try {
61-
logger.info("Deleting Gcs path " + gcsPath + " if it exists");
62-
FileSystem fs = gcsPath.getFileSystem(conf);
63-
fs.delete(gcsPath, true); // <-- The crucial recursive delete call
64-
logger.info("Successfully deleted main GCS path: {}", gcsPath);
65-
} catch (Exception e) {
66-
logger.error("Failed to delete main GCS path: {}", gcsPath, e);
68+
FileSystem fs = FileSystem.get(gcsPathPrefix.toUri(), conf);
69+
FileStatus[] statuses = fs.globStatus(gcsPathPrefix);
70+
71+
if (statuses == null || statuses.length == 0) {
72+
logger.info("No paths found matching pattern: {}", gcsPathPrefix);
73+
return;
74+
}
75+
76+
logger.info(
77+
"Found {} paths matching the pattern. Starting recursive deletion.", statuses.length);
78+
79+
boolean allSuccess = true;
80+
for (FileStatus status : statuses) {
81+
Path pathToDelete = status.getPath();
82+
FileSystem deleteFs = FileSystem.get(pathToDelete.toUri(), conf);
83+
boolean deleted = deleteFs.delete(pathToDelete, true);
84+
85+
if (deleted) {
86+
logger.info("Successfully deleted path: {}", pathToDelete);
87+
} else {
88+
logger.error("Failed to delete path: {}", pathToDelete);
89+
allSuccess = false;
90+
}
91+
}
92+
93+
if (allSuccess) {
94+
logger.info("Completed cleanup for pattern: {}", gcsPathPrefix);
95+
} else {
96+
logger.warn(
97+
"Completed cleanup, but one or more paths failed to delete for pattern: {}",
98+
gcsPathPrefix);
99+
}
100+
} catch (IOException e) {
101+
throw new RuntimeException(e);
67102
}
68103
}
69104

spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDataSourceWriterModule.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,12 @@ public BigQueryIndirectDataSourceWriterContext provideIndirectDataSourceWriterCo
101101
.map(
102102
ignored ->
103103
new IntermediateDataCleaner(
104-
gcsPath, spark.sparkContext().hadoopConfiguration()));
104+
gcsPath,
105+
spark.sparkContext().hadoopConfiguration(),
106+
SparkBigQueryUtil.getGcsPathWithApplicationId(
107+
tableConfig,
108+
spark.sparkContext().hadoopConfiguration(),
109+
spark.sparkContext().applicationId())));
105110
// based on pmkc's suggestion at https://git.io/JeWRt
106111
intermediateDataCleaner.ifPresent(cleaner -> Runtime.getRuntime().addShutdownHook(cleaner));
107112
return new BigQueryIndirectDataSourceWriterContext(

0 commit comments

Comments
 (0)